// koprogo_api/infrastructure/database/repositories/owner_repository_impl.rs

1use crate::application::dto::{OwnerFilters, PageRequest};
2use crate::application::ports::OwnerRepository;
3use crate::domain::entities::Owner;
4use crate::infrastructure::database::pool::DbPool;
5use async_trait::async_trait;
6use sqlx::Row;
7use uuid::Uuid;
8
9pub struct PostgresOwnerRepository {
10    pool: DbPool,
11}
12
13impl PostgresOwnerRepository {
14    pub fn new(pool: DbPool) -> Self {
15        Self { pool }
16    }
17}
18
19#[async_trait]
20impl OwnerRepository for PostgresOwnerRepository {
21    async fn create(&self, owner: &Owner) -> Result<Owner, String> {
22        sqlx::query(
23            r#"
24            INSERT INTO owners (id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at)
25            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
26            "#,
27        )
28        .bind(owner.id)
29        .bind(owner.organization_id)
30        .bind(owner.user_id)
31        .bind(&owner.first_name)
32        .bind(&owner.last_name)
33        .bind(&owner.email)
34        .bind(&owner.phone)
35        .bind(&owner.address)
36        .bind(&owner.city)
37        .bind(&owner.postal_code)
38        .bind(&owner.country)
39        .bind(owner.created_at)
40        .bind(owner.updated_at)
41        .execute(&self.pool)
42        .await
43        .map_err(|e| format!("Database error: {}", e))?;
44
45        Ok(owner.clone())
46    }
47
48    async fn find_by_id(&self, id: Uuid) -> Result<Option<Owner>, String> {
49        let row = sqlx::query(
50            r#"
51            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
52            FROM owners
53            WHERE id = $1
54            "#,
55        )
56        .bind(id)
57        .fetch_optional(&self.pool)
58        .await
59        .map_err(|e| format!("Database error: {}", e))?;
60
61        Ok(row.map(|row| Owner {
62            id: row.get("id"),
63            organization_id: row.get("organization_id"),
64            user_id: row.get("user_id"),
65            first_name: row.get("first_name"),
66            last_name: row.get("last_name"),
67            email: row.get("email"),
68            phone: row.get("phone"),
69            address: row.get("address"),
70            city: row.get("city"),
71            postal_code: row.get("postal_code"),
72            country: row.get("country"),
73            created_at: row.get("created_at"),
74            updated_at: row.get("updated_at"),
75        }))
76    }
77
78    async fn find_by_email(&self, email: &str) -> Result<Option<Owner>, String> {
79        let row = sqlx::query(
80            r#"
81            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
82            FROM owners
83            WHERE email = $1
84            "#,
85        )
86        .bind(email)
87        .fetch_optional(&self.pool)
88        .await
89        .map_err(|e| format!("Database error: {}", e))?;
90
91        Ok(row.map(|row| Owner {
92            id: row.get("id"),
93            organization_id: row.get("organization_id"),
94            user_id: row.get("user_id"),
95            first_name: row.get("first_name"),
96            last_name: row.get("last_name"),
97            email: row.get("email"),
98            phone: row.get("phone"),
99            address: row.get("address"),
100            city: row.get("city"),
101            postal_code: row.get("postal_code"),
102            country: row.get("country"),
103            created_at: row.get("created_at"),
104            updated_at: row.get("updated_at"),
105        }))
106    }
107
108    async fn find_all(&self) -> Result<Vec<Owner>, String> {
109        let rows = sqlx::query(
110            r#"
111            SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
112            FROM owners
113            ORDER BY last_name, first_name
114            "#,
115        )
116        .fetch_all(&self.pool)
117        .await
118        .map_err(|e| format!("Database error: {}", e))?;
119
120        Ok(rows
121            .iter()
122            .map(|row| Owner {
123                id: row.get("id"),
124                organization_id: row.get("organization_id"),
125                user_id: row.get("user_id"),
126                first_name: row.get("first_name"),
127                last_name: row.get("last_name"),
128                email: row.get("email"),
129                phone: row.get("phone"),
130                address: row.get("address"),
131                city: row.get("city"),
132                postal_code: row.get("postal_code"),
133                country: row.get("country"),
134                created_at: row.get("created_at"),
135                updated_at: row.get("updated_at"),
136            })
137            .collect())
138    }
139
140    async fn find_all_paginated(
141        &self,
142        page_request: &PageRequest,
143        filters: &OwnerFilters,
144    ) -> Result<(Vec<Owner>, i64), String> {
145        // Validate page request
146        page_request.validate()?;
147
148        // Build WHERE clause dynamically
149        let mut where_clauses = Vec::new();
150        let mut param_count = 0;
151
152        if filters.organization_id.is_some() {
153            param_count += 1;
154            where_clauses.push(format!("organization_id = ${}", param_count));
155        }
156
157        if filters.email.is_some() {
158            param_count += 1;
159            where_clauses.push(format!("email ILIKE ${}", param_count));
160        }
161
162        if filters.phone.is_some() {
163            param_count += 1;
164            where_clauses.push(format!("phone ILIKE ${}", param_count));
165        }
166
167        if filters.last_name.is_some() {
168            param_count += 1;
169            where_clauses.push(format!("last_name ILIKE ${}", param_count));
170        }
171
172        if filters.first_name.is_some() {
173            param_count += 1;
174            where_clauses.push(format!("first_name ILIKE ${}", param_count));
175        }
176
177        let where_clause = if where_clauses.is_empty() {
178            String::new()
179        } else {
180            format!("WHERE {}", where_clauses.join(" AND "))
181        };
182
183        // Validate sort column (whitelist)
184        let allowed_columns = ["last_name", "first_name", "email", "created_at"];
185        let sort_column = page_request.sort_by.as_deref().unwrap_or("last_name");
186
187        if !allowed_columns.contains(&sort_column) {
188            return Err(format!("Invalid sort column: {}", sort_column));
189        }
190
191        // Count total items
192        let count_query = format!("SELECT COUNT(*) FROM owners {}", where_clause);
193        let mut count_query = sqlx::query_scalar::<_, i64>(&count_query);
194
195        if let Some(organization_id) = &filters.organization_id {
196            count_query = count_query.bind(organization_id);
197        }
198        if let Some(email) = &filters.email {
199            count_query = count_query.bind(format!("%{}%", email));
200        }
201        if let Some(phone) = &filters.phone {
202            count_query = count_query.bind(format!("%{}%", phone));
203        }
204        if let Some(last_name) = &filters.last_name {
205            count_query = count_query.bind(format!("%{}%", last_name));
206        }
207        if let Some(first_name) = &filters.first_name {
208            count_query = count_query.bind(format!("%{}%", first_name));
209        }
210
211        let total_items = count_query
212            .fetch_one(&self.pool)
213            .await
214            .map_err(|e| format!("Database error: {}", e))?;
215
216        // Fetch paginated data
217        param_count += 1;
218        let limit_param = param_count;
219        param_count += 1;
220        let offset_param = param_count;
221
222        let data_query = format!(
223            "SELECT id, organization_id, user_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at \
224             FROM owners {} ORDER BY {} {} LIMIT ${} OFFSET ${}",
225            where_clause,
226            sort_column,
227            page_request.order.to_sql(),
228            limit_param,
229            offset_param
230        );
231
232        let mut data_query = sqlx::query(&data_query);
233
234        if let Some(organization_id) = &filters.organization_id {
235            data_query = data_query.bind(organization_id);
236        }
237        if let Some(email) = &filters.email {
238            data_query = data_query.bind(format!("%{}%", email));
239        }
240        if let Some(phone) = &filters.phone {
241            data_query = data_query.bind(format!("%{}%", phone));
242        }
243        if let Some(last_name) = &filters.last_name {
244            data_query = data_query.bind(format!("%{}%", last_name));
245        }
246        if let Some(first_name) = &filters.first_name {
247            data_query = data_query.bind(format!("%{}%", first_name));
248        }
249
250        data_query = data_query
251            .bind(page_request.limit())
252            .bind(page_request.offset());
253
254        let rows = data_query
255            .fetch_all(&self.pool)
256            .await
257            .map_err(|e| format!("Database error: {}", e))?;
258
259        let owners: Vec<Owner> = rows
260            .iter()
261            .map(|row| Owner {
262                id: row.get("id"),
263                organization_id: row.get("organization_id"),
264                user_id: row.get("user_id"),
265                first_name: row.get("first_name"),
266                last_name: row.get("last_name"),
267                email: row.get("email"),
268                phone: row.get("phone"),
269                address: row.get("address"),
270                city: row.get("city"),
271                postal_code: row.get("postal_code"),
272                country: row.get("country"),
273                created_at: row.get("created_at"),
274                updated_at: row.get("updated_at"),
275            })
276            .collect();
277
278        Ok((owners, total_items))
279    }
280
281    async fn update(&self, owner: &Owner) -> Result<Owner, String> {
282        sqlx::query(
283            r#"
284            UPDATE owners
285            SET first_name = $2, last_name = $3, email = $4, phone = $5, address = $6, city = $7, postal_code = $8, country = $9, updated_at = $10
286            WHERE id = $1
287            "#,
288        )
289        .bind(owner.id)
290        .bind(&owner.first_name)
291        .bind(&owner.last_name)
292        .bind(&owner.email)
293        .bind(&owner.phone)
294        .bind(&owner.address)
295        .bind(&owner.city)
296        .bind(&owner.postal_code)
297        .bind(&owner.country)
298        .bind(owner.updated_at)
299        .execute(&self.pool)
300        .await
301        .map_err(|e| format!("Database error: {}", e))?;
302
303        Ok(owner.clone())
304    }
305
306    async fn delete(&self, id: Uuid) -> Result<bool, String> {
307        let result = sqlx::query("DELETE FROM owners WHERE id = $1")
308            .bind(id)
309            .execute(&self.pool)
310            .await
311            .map_err(|e| format!("Database error: {}", e))?;
312
313        Ok(result.rows_affected() > 0)
314    }
315}