koprogo_api/infrastructure/database/repositories/
local_exchange_repository_impl.rs

1use crate::application::ports::LocalExchangeRepository;
2use crate::domain::entities::{ExchangeStatus, ExchangeType, LocalExchange};
3use crate::infrastructure::database::pool::DbPool;
4use async_trait::async_trait;
5use sqlx::Row;
6use uuid::Uuid;
7
8pub struct PostgresLocalExchangeRepository {
9    pool: DbPool,
10}
11
12impl PostgresLocalExchangeRepository {
13    pub fn new(pool: DbPool) -> Self {
14        Self { pool }
15    }
16}
17
18#[async_trait]
19impl LocalExchangeRepository for PostgresLocalExchangeRepository {
20    async fn create(&self, exchange: &LocalExchange) -> Result<LocalExchange, String> {
21        sqlx::query(
22            r#"
23            INSERT INTO local_exchanges (
24                id, building_id, provider_id, requester_id, exchange_type,
25                title, description, credits, status, offered_at,
26                requested_at, started_at, completed_at, cancelled_at,
27                cancellation_reason, provider_rating, requester_rating,
28                created_at, updated_at
29            )
30            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
31            "#,
32        )
33        .bind(exchange.id)
34        .bind(exchange.building_id)
35        .bind(exchange.provider_id)
36        .bind(exchange.requester_id)
37        .bind(exchange.exchange_type.to_sql())
38        .bind(&exchange.title)
39        .bind(&exchange.description)
40        .bind(exchange.credits)
41        .bind(exchange.status.to_sql())
42        .bind(exchange.offered_at)
43        .bind(exchange.requested_at)
44        .bind(exchange.started_at)
45        .bind(exchange.completed_at)
46        .bind(exchange.cancelled_at)
47        .bind(&exchange.cancellation_reason)
48        .bind(exchange.provider_rating)
49        .bind(exchange.requester_rating)
50        .bind(exchange.created_at)
51        .bind(exchange.updated_at)
52        .execute(&self.pool)
53        .await
54        .map_err(|e| format!("Database error: {}", e))?;
55
56        Ok(exchange.clone())
57    }
58
59    async fn find_by_id(&self, id: Uuid) -> Result<Option<LocalExchange>, String> {
60        let row = sqlx::query(
61            r#"
62            SELECT
63                id, building_id, provider_id, requester_id, exchange_type,
64                title, description, credits, status, offered_at,
65                requested_at, started_at, completed_at, cancelled_at,
66                cancellation_reason, provider_rating, requester_rating,
67                created_at, updated_at
68            FROM local_exchanges
69            WHERE id = $1
70            "#,
71        )
72        .bind(id)
73        .fetch_optional(&self.pool)
74        .await
75        .map_err(|e| format!("Database error: {}", e))?;
76
77        Ok(row.map(|row| map_row_to_exchange(&row)))
78    }
79
80    async fn find_by_building(&self, building_id: Uuid) -> Result<Vec<LocalExchange>, String> {
81        let rows = sqlx::query(
82            r#"
83            SELECT
84                id, building_id, provider_id, requester_id, exchange_type,
85                title, description, credits, status, offered_at,
86                requested_at, started_at, completed_at, cancelled_at,
87                cancellation_reason, provider_rating, requester_rating,
88                created_at, updated_at
89            FROM local_exchanges
90            WHERE building_id = $1
91            ORDER BY offered_at DESC
92            "#,
93        )
94        .bind(building_id)
95        .fetch_all(&self.pool)
96        .await
97        .map_err(|e| format!("Database error: {}", e))?;
98
99        Ok(rows.iter().map(map_row_to_exchange).collect())
100    }
101
102    async fn find_by_building_and_status(
103        &self,
104        building_id: Uuid,
105        status: &str,
106    ) -> Result<Vec<LocalExchange>, String> {
107        let rows = sqlx::query(
108            r#"
109            SELECT
110                id, building_id, provider_id, requester_id, exchange_type,
111                title, description, credits, status, offered_at,
112                requested_at, started_at, completed_at, cancelled_at,
113                cancellation_reason, provider_rating, requester_rating,
114                created_at, updated_at
115            FROM local_exchanges
116            WHERE building_id = $1 AND status = $2
117            ORDER BY offered_at DESC
118            "#,
119        )
120        .bind(building_id)
121        .bind(status)
122        .fetch_all(&self.pool)
123        .await
124        .map_err(|e| format!("Database error: {}", e))?;
125
126        Ok(rows.iter().map(map_row_to_exchange).collect())
127    }
128
129    async fn find_by_provider(&self, provider_id: Uuid) -> Result<Vec<LocalExchange>, String> {
130        let rows = sqlx::query(
131            r#"
132            SELECT
133                id, building_id, provider_id, requester_id, exchange_type,
134                title, description, credits, status, offered_at,
135                requested_at, started_at, completed_at, cancelled_at,
136                cancellation_reason, provider_rating, requester_rating,
137                created_at, updated_at
138            FROM local_exchanges
139            WHERE provider_id = $1
140            ORDER BY offered_at DESC
141            "#,
142        )
143        .bind(provider_id)
144        .fetch_all(&self.pool)
145        .await
146        .map_err(|e| format!("Database error: {}", e))?;
147
148        Ok(rows.iter().map(map_row_to_exchange).collect())
149    }
150
151    async fn find_by_requester(&self, requester_id: Uuid) -> Result<Vec<LocalExchange>, String> {
152        let rows = sqlx::query(
153            r#"
154            SELECT
155                id, building_id, provider_id, requester_id, exchange_type,
156                title, description, credits, status, offered_at,
157                requested_at, started_at, completed_at, cancelled_at,
158                cancellation_reason, provider_rating, requester_rating,
159                created_at, updated_at
160            FROM local_exchanges
161            WHERE requester_id = $1
162            ORDER BY requested_at DESC
163            "#,
164        )
165        .bind(requester_id)
166        .fetch_all(&self.pool)
167        .await
168        .map_err(|e| format!("Database error: {}", e))?;
169
170        Ok(rows.iter().map(map_row_to_exchange).collect())
171    }
172
173    async fn find_by_owner(&self, owner_id: Uuid) -> Result<Vec<LocalExchange>, String> {
174        let rows = sqlx::query(
175            r#"
176            SELECT
177                id, building_id, provider_id, requester_id, exchange_type,
178                title, description, credits, status, offered_at,
179                requested_at, started_at, completed_at, cancelled_at,
180                cancellation_reason, provider_rating, requester_rating,
181                created_at, updated_at
182            FROM local_exchanges
183            WHERE provider_id = $1 OR requester_id = $1
184            ORDER BY offered_at DESC
185            "#,
186        )
187        .bind(owner_id)
188        .fetch_all(&self.pool)
189        .await
190        .map_err(|e| format!("Database error: {}", e))?;
191
192        Ok(rows.iter().map(map_row_to_exchange).collect())
193    }
194
195    async fn find_active_by_building(
196        &self,
197        building_id: Uuid,
198    ) -> Result<Vec<LocalExchange>, String> {
199        let rows = sqlx::query(
200            r#"
201            SELECT
202                id, building_id, provider_id, requester_id, exchange_type,
203                title, description, credits, status, offered_at,
204                requested_at, started_at, completed_at, cancelled_at,
205                cancellation_reason, provider_rating, requester_rating,
206                created_at, updated_at
207            FROM local_exchanges
208            WHERE building_id = $1
209              AND status IN ('Offered', 'Requested', 'InProgress')
210            ORDER BY offered_at DESC
211            "#,
212        )
213        .bind(building_id)
214        .fetch_all(&self.pool)
215        .await
216        .map_err(|e| format!("Database error: {}", e))?;
217
218        Ok(rows.iter().map(map_row_to_exchange).collect())
219    }
220
221    async fn find_available_by_building(
222        &self,
223        building_id: Uuid,
224    ) -> Result<Vec<LocalExchange>, String> {
225        let rows = sqlx::query(
226            r#"
227            SELECT
228                id, building_id, provider_id, requester_id, exchange_type,
229                title, description, credits, status, offered_at,
230                requested_at, started_at, completed_at, cancelled_at,
231                cancellation_reason, provider_rating, requester_rating,
232                created_at, updated_at
233            FROM local_exchanges
234            WHERE building_id = $1 AND status = 'Offered'
235            ORDER BY offered_at DESC
236            "#,
237        )
238        .bind(building_id)
239        .fetch_all(&self.pool)
240        .await
241        .map_err(|e| format!("Database error: {}", e))?;
242
243        Ok(rows.iter().map(map_row_to_exchange).collect())
244    }
245
246    async fn find_by_type(
247        &self,
248        building_id: Uuid,
249        exchange_type: &str,
250    ) -> Result<Vec<LocalExchange>, String> {
251        let rows = sqlx::query(
252            r#"
253            SELECT
254                id, building_id, provider_id, requester_id, exchange_type,
255                title, description, credits, status, offered_at,
256                requested_at, started_at, completed_at, cancelled_at,
257                cancellation_reason, provider_rating, requester_rating,
258                created_at, updated_at
259            FROM local_exchanges
260            WHERE building_id = $1 AND exchange_type = $2
261            ORDER BY offered_at DESC
262            "#,
263        )
264        .bind(building_id)
265        .bind(exchange_type)
266        .fetch_all(&self.pool)
267        .await
268        .map_err(|e| format!("Database error: {}", e))?;
269
270        Ok(rows.iter().map(map_row_to_exchange).collect())
271    }
272
273    async fn update(&self, exchange: &LocalExchange) -> Result<LocalExchange, String> {
274        sqlx::query(
275            r#"
276            UPDATE local_exchanges
277            SET
278                building_id = $2,
279                provider_id = $3,
280                requester_id = $4,
281                exchange_type = $5,
282                title = $6,
283                description = $7,
284                credits = $8,
285                status = $9,
286                offered_at = $10,
287                requested_at = $11,
288                started_at = $12,
289                completed_at = $13,
290                cancelled_at = $14,
291                cancellation_reason = $15,
292                provider_rating = $16,
293                requester_rating = $17,
294                updated_at = $18
295            WHERE id = $1
296            "#,
297        )
298        .bind(exchange.id)
299        .bind(exchange.building_id)
300        .bind(exchange.provider_id)
301        .bind(exchange.requester_id)
302        .bind(exchange.exchange_type.to_sql())
303        .bind(&exchange.title)
304        .bind(&exchange.description)
305        .bind(exchange.credits)
306        .bind(exchange.status.to_sql())
307        .bind(exchange.offered_at)
308        .bind(exchange.requested_at)
309        .bind(exchange.started_at)
310        .bind(exchange.completed_at)
311        .bind(exchange.cancelled_at)
312        .bind(&exchange.cancellation_reason)
313        .bind(exchange.provider_rating)
314        .bind(exchange.requester_rating)
315        .bind(exchange.updated_at)
316        .execute(&self.pool)
317        .await
318        .map_err(|e| format!("Database error: {}", e))?;
319
320        Ok(exchange.clone())
321    }
322
323    async fn delete(&self, id: Uuid) -> Result<bool, String> {
324        let result = sqlx::query("DELETE FROM local_exchanges WHERE id = $1")
325            .bind(id)
326            .execute(&self.pool)
327            .await
328            .map_err(|e| format!("Database error: {}", e))?;
329
330        Ok(result.rows_affected() > 0)
331    }
332
333    async fn count_by_building(&self, building_id: Uuid) -> Result<i64, String> {
334        let count: i64 =
335            sqlx::query_scalar("SELECT COUNT(*) FROM local_exchanges WHERE building_id = $1")
336                .bind(building_id)
337                .fetch_one(&self.pool)
338                .await
339                .map_err(|e| format!("Database error: {}", e))?;
340
341        Ok(count)
342    }
343
344    async fn count_by_building_and_status(
345        &self,
346        building_id: Uuid,
347        status: &str,
348    ) -> Result<i64, String> {
349        let count: i64 = sqlx::query_scalar(
350            "SELECT COUNT(*) FROM local_exchanges WHERE building_id = $1 AND status = $2",
351        )
352        .bind(building_id)
353        .bind(status)
354        .fetch_one(&self.pool)
355        .await
356        .map_err(|e| format!("Database error: {}", e))?;
357
358        Ok(count)
359    }
360
361    async fn get_total_credits_exchanged(&self, building_id: Uuid) -> Result<i32, String> {
362        let total: Option<i64> = sqlx::query_scalar(
363            r#"
364            SELECT COALESCE(SUM(credits), 0)
365            FROM local_exchanges
366            WHERE building_id = $1 AND status = 'Completed'
367            "#,
368        )
369        .bind(building_id)
370        .fetch_one(&self.pool)
371        .await
372        .map_err(|e| format!("Database error: {}", e))?;
373
374        Ok(total.unwrap_or(0) as i32)
375    }
376
377    async fn get_average_exchange_rating(&self, building_id: Uuid) -> Result<Option<f32>, String> {
378        let avg: Option<f32> = sqlx::query_scalar(
379            r#"
380            SELECT AVG((provider_rating + requester_rating) / 2.0)
381            FROM local_exchanges
382            WHERE building_id = $1
383              AND status = 'Completed'
384              AND provider_rating IS NOT NULL
385              AND requester_rating IS NOT NULL
386            "#,
387        )
388        .bind(building_id)
389        .fetch_one(&self.pool)
390        .await
391        .map_err(|e| format!("Database error: {}", e))?;
392
393        Ok(avg)
394    }
395}
396
397/// Helper function to map PostgreSQL row to LocalExchange entity
398fn map_row_to_exchange(row: &sqlx::postgres::PgRow) -> LocalExchange {
399    LocalExchange {
400        id: row.get("id"),
401        building_id: row.get("building_id"),
402        provider_id: row.get("provider_id"),
403        requester_id: row.get("requester_id"),
404        exchange_type: ExchangeType::from_sql(row.get("exchange_type"))
405            .unwrap_or(ExchangeType::Service),
406        title: row.get("title"),
407        description: row.get("description"),
408        credits: row.get("credits"),
409        status: ExchangeStatus::from_sql(row.get("status")).unwrap_or(ExchangeStatus::Offered),
410        offered_at: row.get("offered_at"),
411        requested_at: row.get("requested_at"),
412        started_at: row.get("started_at"),
413        completed_at: row.get("completed_at"),
414        cancelled_at: row.get("cancelled_at"),
415        cancellation_reason: row.get("cancellation_reason"),
416        provider_rating: row.get("provider_rating"),
417        requester_rating: row.get("requester_rating"),
418        created_at: row.get("created_at"),
419        updated_at: row.get("updated_at"),
420    }
421}