koprogo_api/infrastructure/database/repositories/
owner_repository_impl.rs

1use crate::application::dto::{OwnerFilters, PageRequest};
2use crate::application::ports::OwnerRepository;
3use crate::domain::entities::Owner;
4use crate::infrastructure::database::pool::DbPool;
5use async_trait::async_trait;
6use sqlx::Row;
7use uuid::Uuid;
8
9pub struct PostgresOwnerRepository {
10    pool: DbPool,
11}
12
13impl PostgresOwnerRepository {
14    pub fn new(pool: DbPool) -> Self {
15        Self { pool }
16    }
17}
18
19#[async_trait]
20impl OwnerRepository for PostgresOwnerRepository {
21    async fn create(&self, owner: &Owner) -> Result<Owner, String> {
22        sqlx::query(
23            r#"
24            INSERT INTO owners (id, organization_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at)
25            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
26            "#,
27        )
28        .bind(owner.id)
29        .bind(owner.organization_id)
30        .bind(&owner.first_name)
31        .bind(&owner.last_name)
32        .bind(&owner.email)
33        .bind(&owner.phone)
34        .bind(&owner.address)
35        .bind(&owner.city)
36        .bind(&owner.postal_code)
37        .bind(&owner.country)
38        .bind(owner.created_at)
39        .bind(owner.updated_at)
40        .execute(&self.pool)
41        .await
42        .map_err(|e| format!("Database error: {}", e))?;
43
44        Ok(owner.clone())
45    }
46
47    async fn find_by_id(&self, id: Uuid) -> Result<Option<Owner>, String> {
48        let row = sqlx::query(
49            r#"
50            SELECT id, organization_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
51            FROM owners
52            WHERE id = $1
53            "#,
54        )
55        .bind(id)
56        .fetch_optional(&self.pool)
57        .await
58        .map_err(|e| format!("Database error: {}", e))?;
59
60        Ok(row.map(|row| Owner {
61            id: row.get("id"),
62            organization_id: row.get("organization_id"),
63            first_name: row.get("first_name"),
64            last_name: row.get("last_name"),
65            email: row.get("email"),
66            phone: row.get("phone"),
67            address: row.get("address"),
68            city: row.get("city"),
69            postal_code: row.get("postal_code"),
70            country: row.get("country"),
71            created_at: row.get("created_at"),
72            updated_at: row.get("updated_at"),
73        }))
74    }
75
76    async fn find_by_email(&self, email: &str) -> Result<Option<Owner>, String> {
77        let row = sqlx::query(
78            r#"
79            SELECT id, organization_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
80            FROM owners
81            WHERE email = $1
82            "#,
83        )
84        .bind(email)
85        .fetch_optional(&self.pool)
86        .await
87        .map_err(|e| format!("Database error: {}", e))?;
88
89        Ok(row.map(|row| Owner {
90            id: row.get("id"),
91            organization_id: row.get("organization_id"),
92            first_name: row.get("first_name"),
93            last_name: row.get("last_name"),
94            email: row.get("email"),
95            phone: row.get("phone"),
96            address: row.get("address"),
97            city: row.get("city"),
98            postal_code: row.get("postal_code"),
99            country: row.get("country"),
100            created_at: row.get("created_at"),
101            updated_at: row.get("updated_at"),
102        }))
103    }
104
105    async fn find_all(&self) -> Result<Vec<Owner>, String> {
106        let rows = sqlx::query(
107            r#"
108            SELECT id, organization_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at
109            FROM owners
110            ORDER BY last_name, first_name
111            "#,
112        )
113        .fetch_all(&self.pool)
114        .await
115        .map_err(|e| format!("Database error: {}", e))?;
116
117        Ok(rows
118            .iter()
119            .map(|row| Owner {
120                id: row.get("id"),
121                organization_id: row.get("organization_id"),
122                first_name: row.get("first_name"),
123                last_name: row.get("last_name"),
124                email: row.get("email"),
125                phone: row.get("phone"),
126                address: row.get("address"),
127                city: row.get("city"),
128                postal_code: row.get("postal_code"),
129                country: row.get("country"),
130                created_at: row.get("created_at"),
131                updated_at: row.get("updated_at"),
132            })
133            .collect())
134    }
135
136    async fn find_all_paginated(
137        &self,
138        page_request: &PageRequest,
139        filters: &OwnerFilters,
140    ) -> Result<(Vec<Owner>, i64), String> {
141        // Validate page request
142        page_request.validate()?;
143
144        // Build WHERE clause dynamically
145        let mut where_clauses = Vec::new();
146        let mut param_count = 0;
147
148        if filters.organization_id.is_some() {
149            param_count += 1;
150            where_clauses.push(format!("organization_id = ${}", param_count));
151        }
152
153        if filters.email.is_some() {
154            param_count += 1;
155            where_clauses.push(format!("email ILIKE ${}", param_count));
156        }
157
158        if filters.phone.is_some() {
159            param_count += 1;
160            where_clauses.push(format!("phone ILIKE ${}", param_count));
161        }
162
163        if filters.last_name.is_some() {
164            param_count += 1;
165            where_clauses.push(format!("last_name ILIKE ${}", param_count));
166        }
167
168        if filters.first_name.is_some() {
169            param_count += 1;
170            where_clauses.push(format!("first_name ILIKE ${}", param_count));
171        }
172
173        let where_clause = if where_clauses.is_empty() {
174            String::new()
175        } else {
176            format!("WHERE {}", where_clauses.join(" AND "))
177        };
178
179        // Validate sort column (whitelist)
180        let allowed_columns = ["last_name", "first_name", "email", "created_at"];
181        let sort_column = page_request.sort_by.as_deref().unwrap_or("last_name");
182
183        if !allowed_columns.contains(&sort_column) {
184            return Err(format!("Invalid sort column: {}", sort_column));
185        }
186
187        // Count total items
188        let count_query = format!("SELECT COUNT(*) FROM owners {}", where_clause);
189        let mut count_query = sqlx::query_scalar::<_, i64>(&count_query);
190
191        if let Some(organization_id) = &filters.organization_id {
192            count_query = count_query.bind(organization_id);
193        }
194        if let Some(email) = &filters.email {
195            count_query = count_query.bind(format!("%{}%", email));
196        }
197        if let Some(phone) = &filters.phone {
198            count_query = count_query.bind(format!("%{}%", phone));
199        }
200        if let Some(last_name) = &filters.last_name {
201            count_query = count_query.bind(format!("%{}%", last_name));
202        }
203        if let Some(first_name) = &filters.first_name {
204            count_query = count_query.bind(format!("%{}%", first_name));
205        }
206
207        let total_items = count_query
208            .fetch_one(&self.pool)
209            .await
210            .map_err(|e| format!("Database error: {}", e))?;
211
212        // Fetch paginated data
213        param_count += 1;
214        let limit_param = param_count;
215        param_count += 1;
216        let offset_param = param_count;
217
218        let data_query = format!(
219            "SELECT id, organization_id, first_name, last_name, email, phone, address, city, postal_code, country, created_at, updated_at \
220             FROM owners {} ORDER BY {} {} LIMIT ${} OFFSET ${}",
221            where_clause,
222            sort_column,
223            page_request.order.to_sql(),
224            limit_param,
225            offset_param
226        );
227
228        let mut data_query = sqlx::query(&data_query);
229
230        if let Some(organization_id) = &filters.organization_id {
231            data_query = data_query.bind(organization_id);
232        }
233        if let Some(email) = &filters.email {
234            data_query = data_query.bind(format!("%{}%", email));
235        }
236        if let Some(phone) = &filters.phone {
237            data_query = data_query.bind(format!("%{}%", phone));
238        }
239        if let Some(last_name) = &filters.last_name {
240            data_query = data_query.bind(format!("%{}%", last_name));
241        }
242        if let Some(first_name) = &filters.first_name {
243            data_query = data_query.bind(format!("%{}%", first_name));
244        }
245
246        data_query = data_query
247            .bind(page_request.limit())
248            .bind(page_request.offset());
249
250        let rows = data_query
251            .fetch_all(&self.pool)
252            .await
253            .map_err(|e| format!("Database error: {}", e))?;
254
255        let owners: Vec<Owner> = rows
256            .iter()
257            .map(|row| Owner {
258                id: row.get("id"),
259                organization_id: row.get("organization_id"),
260                first_name: row.get("first_name"),
261                last_name: row.get("last_name"),
262                email: row.get("email"),
263                phone: row.get("phone"),
264                address: row.get("address"),
265                city: row.get("city"),
266                postal_code: row.get("postal_code"),
267                country: row.get("country"),
268                created_at: row.get("created_at"),
269                updated_at: row.get("updated_at"),
270            })
271            .collect();
272
273        Ok((owners, total_items))
274    }
275
276    async fn update(&self, owner: &Owner) -> Result<Owner, String> {
277        sqlx::query(
278            r#"
279            UPDATE owners
280            SET first_name = $2, last_name = $3, email = $4, phone = $5, address = $6, city = $7, postal_code = $8, country = $9, updated_at = $10
281            WHERE id = $1
282            "#,
283        )
284        .bind(owner.id)
285        .bind(&owner.first_name)
286        .bind(&owner.last_name)
287        .bind(&owner.email)
288        .bind(&owner.phone)
289        .bind(&owner.address)
290        .bind(&owner.city)
291        .bind(&owner.postal_code)
292        .bind(&owner.country)
293        .bind(owner.updated_at)
294        .execute(&self.pool)
295        .await
296        .map_err(|e| format!("Database error: {}", e))?;
297
298        Ok(owner.clone())
299    }
300
301    async fn delete(&self, id: Uuid) -> Result<bool, String> {
302        let result = sqlx::query("DELETE FROM owners WHERE id = $1")
303            .bind(id)
304            .execute(&self.pool)
305            .await
306            .map_err(|e| format!("Database error: {}", e))?;
307
308        Ok(result.rows_affected() > 0)
309    }
310}