koprogo_api/infrastructure/database/repositories/
individual_member_repository_impl.rs

1use crate::application::ports::individual_member_repository::IndividualMemberRepository;
2use crate::domain::entities::individual_member::IndividualMember;
3use crate::infrastructure::database::pool::DbPool;
4use async_trait::async_trait;
5use sqlx::Row;
6use uuid::Uuid;
7
8pub struct PostgresIndividualMemberRepository {
9    pool: DbPool,
10}
11
12impl PostgresIndividualMemberRepository {
13    pub fn new(pool: DbPool) -> Self {
14        Self { pool }
15    }
16}
17
18#[async_trait]
19impl IndividualMemberRepository for PostgresIndividualMemberRepository {
20    async fn create(&self, member: &IndividualMember) -> Result<IndividualMember, String> {
21        sqlx::query(
22            r#"
23            INSERT INTO individual_members (
24                id, campaign_id, email, postal_code, has_gdpr_consent,
25                consent_at, annual_consumption_kwh, current_provider,
26                ean_code, unsubscribed_at, created_at
27            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
28            "#,
29        )
30        .bind(member.id)
31        .bind(member.campaign_id)
32        .bind(&member.email)
33        .bind(&member.postal_code)
34        .bind(member.has_gdpr_consent)
35        .bind(member.consent_at)
36        .bind(member.annual_consumption_kwh)
37        .bind(&member.current_provider)
38        .bind(&member.ean_code)
39        .bind(member.unsubscribed_at)
40        .bind(member.created_at)
41        .execute(&self.pool)
42        .await
43        .map_err(|e| format!("Database error creating individual member: {}", e))?;
44
45        Ok(member.clone())
46    }
47
48    async fn find_by_id(&self, id: Uuid) -> Result<Option<IndividualMember>, String> {
49        let row = sqlx::query(
50            r#"
51            SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
52                   consent_at, annual_consumption_kwh, current_provider,
53                   ean_code, unsubscribed_at, created_at
54            FROM individual_members
55            WHERE id = $1
56            "#,
57        )
58        .bind(id)
59        .fetch_optional(&self.pool)
60        .await
61        .map_err(|e| format!("Database error: {}", e))?;
62
63        Ok(row.map(|row| IndividualMember {
64            id: row.get("id"),
65            campaign_id: row.get("campaign_id"),
66            email: row.get("email"),
67            postal_code: row.get("postal_code"),
68            has_gdpr_consent: row.get("has_gdpr_consent"),
69            consent_at: row.get("consent_at"),
70            annual_consumption_kwh: row.get("annual_consumption_kwh"),
71            current_provider: row.get("current_provider"),
72            ean_code: row.get("ean_code"),
73            unsubscribed_at: row.get("unsubscribed_at"),
74            created_at: row.get("created_at"),
75        }))
76    }
77
78    async fn find_by_campaign(
79        &self,
80        campaign_id: Uuid,
81        page: i64,
82        per_page: i64,
83    ) -> Result<Vec<IndividualMember>, String> {
84        if page < 1 || per_page < 1 {
85            return Err("Page and per_page must be >= 1".to_string());
86        }
87
88        let offset = (page - 1) * per_page;
89
90        let rows = sqlx::query(
91            r#"
92            SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
93                   consent_at, annual_consumption_kwh, current_provider,
94                   ean_code, unsubscribed_at, created_at
95            FROM individual_members
96            WHERE campaign_id = $1
97            ORDER BY created_at DESC
98            LIMIT $2 OFFSET $3
99            "#,
100        )
101        .bind(campaign_id)
102        .bind(per_page)
103        .bind(offset)
104        .fetch_all(&self.pool)
105        .await
106        .map_err(|e| format!("Database error: {}", e))?;
107
108        Ok(rows
109            .iter()
110            .map(|row| IndividualMember {
111                id: row.get("id"),
112                campaign_id: row.get("campaign_id"),
113                email: row.get("email"),
114                postal_code: row.get("postal_code"),
115                has_gdpr_consent: row.get("has_gdpr_consent"),
116                consent_at: row.get("consent_at"),
117                annual_consumption_kwh: row.get("annual_consumption_kwh"),
118                current_provider: row.get("current_provider"),
119                ean_code: row.get("ean_code"),
120                unsubscribed_at: row.get("unsubscribed_at"),
121                created_at: row.get("created_at"),
122            })
123            .collect())
124    }
125
126    async fn find_by_email(&self, email: &str) -> Result<Option<IndividualMember>, String> {
127        let row = sqlx::query(
128            r#"
129            SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
130                   consent_at, annual_consumption_kwh, current_provider,
131                   ean_code, unsubscribed_at, created_at
132            FROM individual_members
133            WHERE email = $1
134            "#,
135        )
136        .bind(email)
137        .fetch_optional(&self.pool)
138        .await
139        .map_err(|e| format!("Database error: {}", e))?;
140
141        Ok(row.map(|row| IndividualMember {
142            id: row.get("id"),
143            campaign_id: row.get("campaign_id"),
144            email: row.get("email"),
145            postal_code: row.get("postal_code"),
146            has_gdpr_consent: row.get("has_gdpr_consent"),
147            consent_at: row.get("consent_at"),
148            annual_consumption_kwh: row.get("annual_consumption_kwh"),
149            current_provider: row.get("current_provider"),
150            ean_code: row.get("ean_code"),
151            unsubscribed_at: row.get("unsubscribed_at"),
152            created_at: row.get("created_at"),
153        }))
154    }
155
156    async fn find_by_email_and_campaign(
157        &self,
158        email: &str,
159        campaign_id: Uuid,
160    ) -> Result<Option<IndividualMember>, String> {
161        let row = sqlx::query(
162            r#"
163            SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
164                   consent_at, annual_consumption_kwh, current_provider,
165                   ean_code, unsubscribed_at, created_at
166            FROM individual_members
167            WHERE email = $1 AND campaign_id = $2
168            "#,
169        )
170        .bind(email)
171        .bind(campaign_id)
172        .fetch_optional(&self.pool)
173        .await
174        .map_err(|e| format!("Database error: {}", e))?;
175
176        Ok(row.map(|row| IndividualMember {
177            id: row.get("id"),
178            campaign_id: row.get("campaign_id"),
179            email: row.get("email"),
180            postal_code: row.get("postal_code"),
181            has_gdpr_consent: row.get("has_gdpr_consent"),
182            consent_at: row.get("consent_at"),
183            annual_consumption_kwh: row.get("annual_consumption_kwh"),
184            current_provider: row.get("current_provider"),
185            ean_code: row.get("ean_code"),
186            unsubscribed_at: row.get("unsubscribed_at"),
187            created_at: row.get("created_at"),
188        }))
189    }
190
191    async fn find_active_by_campaign(
192        &self,
193        campaign_id: Uuid,
194        page: i64,
195        per_page: i64,
196    ) -> Result<Vec<IndividualMember>, String> {
197        if page < 1 || per_page < 1 {
198            return Err("Page and per_page must be >= 1".to_string());
199        }
200
201        let offset = (page - 1) * per_page;
202
203        let rows = sqlx::query(
204            r#"
205            SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
206                   consent_at, annual_consumption_kwh, current_provider,
207                   ean_code, unsubscribed_at, created_at
208            FROM individual_members
209            WHERE campaign_id = $1 AND unsubscribed_at IS NULL
210            ORDER BY created_at DESC
211            LIMIT $2 OFFSET $3
212            "#,
213        )
214        .bind(campaign_id)
215        .bind(per_page)
216        .bind(offset)
217        .fetch_all(&self.pool)
218        .await
219        .map_err(|e| format!("Database error: {}", e))?;
220
221        Ok(rows
222            .iter()
223            .map(|row| IndividualMember {
224                id: row.get("id"),
225                campaign_id: row.get("campaign_id"),
226                email: row.get("email"),
227                postal_code: row.get("postal_code"),
228                has_gdpr_consent: row.get("has_gdpr_consent"),
229                consent_at: row.get("consent_at"),
230                annual_consumption_kwh: row.get("annual_consumption_kwh"),
231                current_provider: row.get("current_provider"),
232                ean_code: row.get("ean_code"),
233                unsubscribed_at: row.get("unsubscribed_at"),
234                created_at: row.get("created_at"),
235            })
236            .collect())
237    }
238
239    async fn find_with_consent_by_campaign(
240        &self,
241        campaign_id: Uuid,
242        page: i64,
243        per_page: i64,
244    ) -> Result<Vec<IndividualMember>, String> {
245        if page < 1 || per_page < 1 {
246            return Err("Page and per_page must be >= 1".to_string());
247        }
248
249        let offset = (page - 1) * per_page;
250
251        let rows = sqlx::query(
252            r#"
253            SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
254                   consent_at, annual_consumption_kwh, current_provider,
255                   ean_code, unsubscribed_at, created_at
256            FROM individual_members
257            WHERE campaign_id = $1 AND has_gdpr_consent = TRUE AND unsubscribed_at IS NULL
258            ORDER BY created_at DESC
259            LIMIT $2 OFFSET $3
260            "#,
261        )
262        .bind(campaign_id)
263        .bind(per_page)
264        .bind(offset)
265        .fetch_all(&self.pool)
266        .await
267        .map_err(|e| format!("Database error: {}", e))?;
268
269        Ok(rows
270            .iter()
271            .map(|row| IndividualMember {
272                id: row.get("id"),
273                campaign_id: row.get("campaign_id"),
274                email: row.get("email"),
275                postal_code: row.get("postal_code"),
276                has_gdpr_consent: row.get("has_gdpr_consent"),
277                consent_at: row.get("consent_at"),
278                annual_consumption_kwh: row.get("annual_consumption_kwh"),
279                current_provider: row.get("current_provider"),
280                ean_code: row.get("ean_code"),
281                unsubscribed_at: row.get("unsubscribed_at"),
282                created_at: row.get("created_at"),
283            })
284            .collect())
285    }
286
287    async fn update(&self, member: &IndividualMember) -> Result<IndividualMember, String> {
288        sqlx::query(
289            r#"
290            UPDATE individual_members
291            SET email = $1,
292                postal_code = $2,
293                has_gdpr_consent = $3,
294                consent_at = $4,
295                annual_consumption_kwh = $5,
296                current_provider = $6,
297                ean_code = $7,
298                unsubscribed_at = $8
299            WHERE id = $9
300            "#,
301        )
302        .bind(&member.email)
303        .bind(&member.postal_code)
304        .bind(member.has_gdpr_consent)
305        .bind(member.consent_at)
306        .bind(member.annual_consumption_kwh)
307        .bind(&member.current_provider)
308        .bind(&member.ean_code)
309        .bind(member.unsubscribed_at)
310        .bind(member.id)
311        .execute(&self.pool)
312        .await
313        .map_err(|e| format!("Database error updating individual member: {}", e))?;
314
315        Ok(member.clone())
316    }
317
318    async fn delete(&self, id: Uuid) -> Result<(), String> {
319        sqlx::query("DELETE FROM individual_members WHERE id = $1")
320            .bind(id)
321            .execute(&self.pool)
322            .await
323            .map_err(|e| format!("Database error deleting individual member: {}", e))?;
324
325        Ok(())
326    }
327
328    async fn withdraw_consent(&self, id: Uuid) -> Result<(), String> {
329        sqlx::query(
330            r#"
331            UPDATE individual_members
332            SET email = CONCAT('withdrawn_', id::text, '@anonymized.invalid'),
333                postal_code = 'ANONYMIZED',
334                current_provider = NULL,
335                ean_code = NULL,
336                has_gdpr_consent = FALSE,
337                unsubscribed_at = NOW()
338            WHERE id = $1
339            "#,
340        )
341        .bind(id)
342        .execute(&self.pool)
343        .await
344        .map_err(|e| format!("Database error withdrawing consent: {}", e))?;
345
346        Ok(())
347    }
348
349    async fn count_by_campaign(&self, campaign_id: Uuid) -> Result<i64, String> {
350        let row =
351            sqlx::query("SELECT COUNT(*) as count FROM individual_members WHERE campaign_id = $1")
352                .bind(campaign_id)
353                .fetch_one(&self.pool)
354                .await
355                .map_err(|e| format!("Database error: {}", e))?;
356
357        Ok(row.get::<i64, _>("count"))
358    }
359
360    async fn count_active_by_campaign(&self, campaign_id: Uuid) -> Result<i64, String> {
361        let row = sqlx::query(
362            "SELECT COUNT(*) as count FROM individual_members WHERE campaign_id = $1 AND unsubscribed_at IS NULL",
363        )
364        .bind(campaign_id)
365        .fetch_one(&self.pool)
366        .await
367        .map_err(|e| format!("Database error: {}", e))?;
368
369        Ok(row.get::<i64, _>("count"))
370    }
371
372    async fn count_with_consent_by_campaign(&self, campaign_id: Uuid) -> Result<i64, String> {
373        let row = sqlx::query(
374            "SELECT COUNT(*) as count FROM individual_members WHERE campaign_id = $1 AND has_gdpr_consent = TRUE AND unsubscribed_at IS NULL",
375        )
376        .bind(campaign_id)
377        .fetch_one(&self.pool)
378        .await
379        .map_err(|e| format!("Database error: {}", e))?;
380
381        Ok(row.get::<i64, _>("count"))
382    }
383}