koprogo_api/infrastructure/database/repositories/
owner_repository_impl.rs

1use crate::application::dto::{OwnerFilters, PageRequest};
2use crate::application::ports::OwnerRepository;
3use crate::domain::entities::Owner;
4use crate::infrastructure::database::pool::DbPool;
5use async_trait::async_trait;
6use sqlx::Row;
7use uuid::Uuid;
8
9pub struct PostgresOwnerRepository {
10    pool: DbPool,
11}
12
13impl PostgresOwnerRepository {
14    pub fn new(pool: DbPool) -> Self {
15        Self { pool }
16    }
17}
18
19#[async_trait]
20impl OwnerRepository for PostgresOwnerRepository {
21    async fn create(&self, owner: &Owner) -> Result<Owner, String> {
22        sqlx::query(
23            r#"
24            INSERT INTO owners (id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at)
25            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
26            "#,
27        )
28        .bind(owner.id)
29        .bind(owner.organization_id)
30        .bind(owner.user_id)
31        .bind(&owner.first_name)
32        .bind(&owner.last_name)
33        .bind(&owner.email)
34        .bind(&owner.phone)
35        .bind(&owner.address)
36        .bind(&owner.city)
37        .bind(&owner.postal_code)
38        .bind(&owner.country)
39        .bind(owner.created_at)
40        .bind(owner.updated_at)
41        .execute(&self.pool)
42        .await
43        .map_err(|e| format!("Database error: {}", e))?;
44
45        Ok(owner.clone())
46    }
47
48    async fn find_by_id(&self, id: Uuid) -> Result<Option<Owner>, String> {
49        let row = sqlx::query(
50            r#"
51            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
52            FROM owners
53            WHERE id = $1
54            "#,
55        )
56        .bind(id)
57        .fetch_optional(&self.pool)
58        .await
59        .map_err(|e| format!("Database error: {}", e))?;
60
61        Ok(row.map(|row| Owner {
62            id: row.get("id"),
63            organization_id: row.get("organization_id"),
64            user_id: row.get("user_id"),
65            first_name: row.get("first_name"),
66            last_name: row.get("last_name"),
67            email: row.get("email"),
68            phone: row.get("phone"),
69            address: row.get("address"),
70            city: row.get("city"),
71            postal_code: row.get("postal_code"),
72            country: row.get("country"),
73            created_at: row.get("created_at"),
74            updated_at: row.get("updated_at"),
75        }))
76    }
77
78    async fn find_by_user_id(&self, user_id: Uuid) -> Result<Option<Owner>, String> {
79        let row = sqlx::query(
80            r#"
81            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
82            FROM owners
83            WHERE user_id = $1
84            "#,
85        )
86        .bind(user_id)
87        .fetch_optional(&self.pool)
88        .await
89        .map_err(|e| format!("Database error: {}", e))?;
90
91        Ok(row.map(|row| Owner {
92            id: row.get("id"),
93            organization_id: row.get("organization_id"),
94            user_id: row.get("user_id"),
95            first_name: row.get("first_name"),
96            last_name: row.get("last_name"),
97            email: row.get("email"),
98            phone: row.get("phone"),
99            address: row.get("address"),
100            city: row.get("city"),
101            postal_code: row.get("postal_code"),
102            country: row.get("country"),
103            created_at: row.get("created_at"),
104            updated_at: row.get("updated_at"),
105        }))
106    }
107
108    async fn find_by_user_id_and_organization(
109        &self,
110        user_id: Uuid,
111        organization_id: Uuid,
112    ) -> Result<Option<Owner>, String> {
113        let row = sqlx::query(
114            r#"
115            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
116            FROM owners
117            WHERE user_id = $1 AND organization_id = $2
118            "#,
119        )
120        .bind(user_id)
121        .bind(organization_id)
122        .fetch_optional(&self.pool)
123        .await
124        .map_err(|e| format!("Database error: {}", e))?;
125
126        Ok(row.map(|row| Owner {
127            id: row.get("id"),
128            organization_id: row.get("organization_id"),
129            user_id: row.get("user_id"),
130            first_name: row.get("first_name"),
131            last_name: row.get("last_name"),
132            email: row.get("email"),
133            phone: row.get("phone"),
134            address: row.get("address"),
135            city: row.get("city"),
136            postal_code: row.get("postal_code"),
137            country: row.get("country"),
138            created_at: row.get("created_at"),
139            updated_at: row.get("updated_at"),
140        }))
141    }
142
143    async fn find_by_email(&self, email: &str) -> Result<Option<Owner>, String> {
144        let row = sqlx::query(
145            r#"
146            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
147            FROM owners
148            WHERE email = $1
149            "#,
150        )
151        .bind(email)
152        .fetch_optional(&self.pool)
153        .await
154        .map_err(|e| format!("Database error: {}", e))?;
155
156        Ok(row.map(|row| Owner {
157            id: row.get("id"),
158            organization_id: row.get("organization_id"),
159            user_id: row.get("user_id"),
160            first_name: row.get("first_name"),
161            last_name: row.get("last_name"),
162            email: row.get("email"),
163            phone: row.get("phone"),
164            address: row.get("address"),
165            city: row.get("city"),
166            postal_code: row.get("postal_code"),
167            country: row.get("country"),
168            created_at: row.get("created_at"),
169            updated_at: row.get("updated_at"),
170        }))
171    }
172
173    async fn find_all(&self) -> Result<Vec<Owner>, String> {
174        let rows = sqlx::query(
175            r#"
176            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
177            FROM owners
178            ORDER BY last_name, first_name
179            "#,
180        )
181        .fetch_all(&self.pool)
182        .await
183        .map_err(|e| format!("Database error: {}", e))?;
184
185        Ok(rows
186            .iter()
187            .map(|row| Owner {
188                id: row.get("id"),
189                organization_id: row.get("organization_id"),
190                user_id: row.get("user_id"),
191                first_name: row.get("first_name"),
192                last_name: row.get("last_name"),
193                email: row.get("email"),
194                phone: row.get("phone"),
195                address: row.get("address"),
196                city: row.get("city"),
197                postal_code: row.get("postal_code"),
198                country: row.get("country"),
199                created_at: row.get("created_at"),
200                updated_at: row.get("updated_at"),
201            })
202            .collect())
203    }
204
205    async fn find_all_paginated(
206        &self,
207        page_request: &PageRequest,
208        filters: &OwnerFilters,
209    ) -> Result<(Vec<Owner>, i64), String> {
210        // Validate page request
211        page_request.validate()?;
212
213        // Build WHERE clause dynamically
214        let mut where_clauses = Vec::new();
215        let mut param_count = 0;
216
217        if filters.organization_id.is_some() {
218            param_count += 1;
219            where_clauses.push(format!("organization_id = ${}", param_count));
220        }
221
222        if filters.email.is_some() {
223            param_count += 1;
224            where_clauses.push(format!("email ILIKE ${}", param_count));
225        }
226
227        if filters.phone.is_some() {
228            param_count += 1;
229            where_clauses.push(format!("phone ILIKE ${}", param_count));
230        }
231
232        if filters.last_name.is_some() {
233            param_count += 1;
234            where_clauses.push(format!("last_name ILIKE ${}", param_count));
235        }
236
237        if filters.first_name.is_some() {
238            param_count += 1;
239            where_clauses.push(format!("first_name ILIKE ${}", param_count));
240        }
241
242        let where_clause = if where_clauses.is_empty() {
243            String::new()
244        } else {
245            format!("WHERE {}", where_clauses.join(" AND "))
246        };
247
248        // Validate sort column (whitelist)
249        let allowed_columns = ["last_name", "first_name", "email", "created_at"];
250        let sort_column = page_request.sort_by.as_deref().unwrap_or("last_name");
251
252        if !allowed_columns.contains(&sort_column) {
253            return Err(format!("Invalid sort column: {}", sort_column));
254        }
255
256        // Count total items
257        let count_query = format!("SELECT COUNT(*) FROM owners {}", where_clause);
258        let mut count_query = sqlx::query_scalar::<_, i64>(&count_query);
259
260        if let Some(organization_id) = &filters.organization_id {
261            count_query = count_query.bind(organization_id);
262        }
263        if let Some(email) = &filters.email {
264            count_query = count_query.bind(format!("%{}%", email));
265        }
266        if let Some(phone) = &filters.phone {
267            count_query = count_query.bind(format!("%{}%", phone));
268        }
269        if let Some(last_name) = &filters.last_name {
270            count_query = count_query.bind(format!("%{}%", last_name));
271        }
272        if let Some(first_name) = &filters.first_name {
273            count_query = count_query.bind(format!("%{}%", first_name));
274        }
275
276        let total_items = count_query
277            .fetch_one(&self.pool)
278            .await
279            .map_err(|e| format!("Database error: {}", e))?;
280
281        // Fetch paginated data
282        param_count += 1;
283        let limit_param = param_count;
284        param_count += 1;
285        let offset_param = param_count;
286
287        let data_query = format!(
288            "SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at \
289             FROM owners {} ORDER BY {} {} LIMIT ${} OFFSET ${}",
290            where_clause,
291            sort_column,
292            page_request.order.to_sql(),
293            limit_param,
294            offset_param
295        );
296
297        let mut data_query = sqlx::query(&data_query);
298
299        if let Some(organization_id) = &filters.organization_id {
300            data_query = data_query.bind(organization_id);
301        }
302        if let Some(email) = &filters.email {
303            data_query = data_query.bind(format!("%{}%", email));
304        }
305        if let Some(phone) = &filters.phone {
306            data_query = data_query.bind(format!("%{}%", phone));
307        }
308        if let Some(last_name) = &filters.last_name {
309            data_query = data_query.bind(format!("%{}%", last_name));
310        }
311        if let Some(first_name) = &filters.first_name {
312            data_query = data_query.bind(format!("%{}%", first_name));
313        }
314
315        data_query = data_query
316            .bind(page_request.limit())
317            .bind(page_request.offset());
318
319        let rows = data_query
320            .fetch_all(&self.pool)
321            .await
322            .map_err(|e| format!("Database error: {}", e))?;
323
324        let owners: Vec<Owner> = rows
325            .iter()
326            .map(|row| Owner {
327                id: row.get("id"),
328                organization_id: row.get("organization_id"),
329                user_id: row.get("user_id"),
330                first_name: row.get("first_name"),
331                last_name: row.get("last_name"),
332                email: row.get("email"),
333                phone: row.get("phone"),
334                address: row.get("address"),
335                city: row.get("city"),
336                postal_code: row.get("postal_code"),
337                country: row.get("country"),
338                created_at: row.get("created_at"),
339                updated_at: row.get("updated_at"),
340            })
341            .collect();
342
343        Ok((owners, total_items))
344    }
345
346    async fn update(&self, owner: &Owner) -> Result<Owner, String> {
347        sqlx::query(
348            r#"
349            UPDATE owners
350            SET first_name = $2, last_name = $3, email = $4, phone = $5, address = $6, city = $7, postal_code = $8, country = $9, updated_at = $10
351            WHERE id = $1
352            "#,
353        )
354        .bind(owner.id)
355        .bind(&owner.first_name)
356        .bind(&owner.last_name)
357        .bind(&owner.email)
358        .bind(&owner.phone)
359        .bind(&owner.address)
360        .bind(&owner.city)
361        .bind(&owner.postal_code)
362        .bind(&owner.country)
363        .bind(owner.updated_at)
364        .execute(&self.pool)
365        .await
366        .map_err(|e| format!("Database error: {}", e))?;
367
368        Ok(owner.clone())
369    }
370
371    async fn delete(&self, id: Uuid) -> Result<bool, String> {
372        let result = sqlx::query("DELETE FROM owners WHERE id = $1")
373            .bind(id)
374            .execute(&self.pool)
375            .await
376            .map_err(|e| format!("Database error: {}", e))?;
377
378        Ok(result.rows_affected() > 0)
379    }
380}