1use crate::application::ports::individual_member_repository::IndividualMemberRepository;
2use crate::domain::entities::individual_member::IndividualMember;
3use crate::infrastructure::database::pool::DbPool;
4use async_trait::async_trait;
5use sqlx::Row;
6use uuid::Uuid;
7
8pub struct PostgresIndividualMemberRepository {
9 pool: DbPool,
10}
11
12impl PostgresIndividualMemberRepository {
13 pub fn new(pool: DbPool) -> Self {
14 Self { pool }
15 }
16}
17
18#[async_trait]
19impl IndividualMemberRepository for PostgresIndividualMemberRepository {
20 async fn create(&self, member: &IndividualMember) -> Result<IndividualMember, String> {
21 sqlx::query(
22 r#"
23 INSERT INTO individual_members (
24 id, campaign_id, email, postal_code, has_gdpr_consent,
25 consent_at, annual_consumption_kwh, current_provider,
26 ean_code, unsubscribed_at, created_at
27 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
28 "#,
29 )
30 .bind(member.id)
31 .bind(member.campaign_id)
32 .bind(&member.email)
33 .bind(&member.postal_code)
34 .bind(member.has_gdpr_consent)
35 .bind(member.consent_at)
36 .bind(member.annual_consumption_kwh)
37 .bind(&member.current_provider)
38 .bind(&member.ean_code)
39 .bind(member.unsubscribed_at)
40 .bind(member.created_at)
41 .execute(&self.pool)
42 .await
43 .map_err(|e| format!("Database error creating individual member: {}", e))?;
44
45 Ok(member.clone())
46 }
47
48 async fn find_by_id(&self, id: Uuid) -> Result<Option<IndividualMember>, String> {
49 let row = sqlx::query(
50 r#"
51 SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
52 consent_at, annual_consumption_kwh, current_provider,
53 ean_code, unsubscribed_at, created_at
54 FROM individual_members
55 WHERE id = $1
56 "#,
57 )
58 .bind(id)
59 .fetch_optional(&self.pool)
60 .await
61 .map_err(|e| format!("Database error: {}", e))?;
62
63 Ok(row.map(|row| IndividualMember {
64 id: row.get("id"),
65 campaign_id: row.get("campaign_id"),
66 email: row.get("email"),
67 postal_code: row.get("postal_code"),
68 has_gdpr_consent: row.get("has_gdpr_consent"),
69 consent_at: row.get("consent_at"),
70 annual_consumption_kwh: row.get("annual_consumption_kwh"),
71 current_provider: row.get("current_provider"),
72 ean_code: row.get("ean_code"),
73 unsubscribed_at: row.get("unsubscribed_at"),
74 created_at: row.get("created_at"),
75 }))
76 }
77
78 async fn find_by_campaign(
79 &self,
80 campaign_id: Uuid,
81 page: i64,
82 per_page: i64,
83 ) -> Result<Vec<IndividualMember>, String> {
84 if page < 1 || per_page < 1 {
85 return Err("Page and per_page must be >= 1".to_string());
86 }
87
88 let offset = (page - 1) * per_page;
89
90 let rows = sqlx::query(
91 r#"
92 SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
93 consent_at, annual_consumption_kwh, current_provider,
94 ean_code, unsubscribed_at, created_at
95 FROM individual_members
96 WHERE campaign_id = $1
97 ORDER BY created_at DESC
98 LIMIT $2 OFFSET $3
99 "#,
100 )
101 .bind(campaign_id)
102 .bind(per_page)
103 .bind(offset)
104 .fetch_all(&self.pool)
105 .await
106 .map_err(|e| format!("Database error: {}", e))?;
107
108 Ok(rows
109 .iter()
110 .map(|row| IndividualMember {
111 id: row.get("id"),
112 campaign_id: row.get("campaign_id"),
113 email: row.get("email"),
114 postal_code: row.get("postal_code"),
115 has_gdpr_consent: row.get("has_gdpr_consent"),
116 consent_at: row.get("consent_at"),
117 annual_consumption_kwh: row.get("annual_consumption_kwh"),
118 current_provider: row.get("current_provider"),
119 ean_code: row.get("ean_code"),
120 unsubscribed_at: row.get("unsubscribed_at"),
121 created_at: row.get("created_at"),
122 })
123 .collect())
124 }
125
126 async fn find_by_email(&self, email: &str) -> Result<Option<IndividualMember>, String> {
127 let row = sqlx::query(
128 r#"
129 SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
130 consent_at, annual_consumption_kwh, current_provider,
131 ean_code, unsubscribed_at, created_at
132 FROM individual_members
133 WHERE email = $1
134 "#,
135 )
136 .bind(email)
137 .fetch_optional(&self.pool)
138 .await
139 .map_err(|e| format!("Database error: {}", e))?;
140
141 Ok(row.map(|row| IndividualMember {
142 id: row.get("id"),
143 campaign_id: row.get("campaign_id"),
144 email: row.get("email"),
145 postal_code: row.get("postal_code"),
146 has_gdpr_consent: row.get("has_gdpr_consent"),
147 consent_at: row.get("consent_at"),
148 annual_consumption_kwh: row.get("annual_consumption_kwh"),
149 current_provider: row.get("current_provider"),
150 ean_code: row.get("ean_code"),
151 unsubscribed_at: row.get("unsubscribed_at"),
152 created_at: row.get("created_at"),
153 }))
154 }
155
156 async fn find_by_email_and_campaign(
157 &self,
158 email: &str,
159 campaign_id: Uuid,
160 ) -> Result<Option<IndividualMember>, String> {
161 let row = sqlx::query(
162 r#"
163 SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
164 consent_at, annual_consumption_kwh, current_provider,
165 ean_code, unsubscribed_at, created_at
166 FROM individual_members
167 WHERE email = $1 AND campaign_id = $2
168 "#,
169 )
170 .bind(email)
171 .bind(campaign_id)
172 .fetch_optional(&self.pool)
173 .await
174 .map_err(|e| format!("Database error: {}", e))?;
175
176 Ok(row.map(|row| IndividualMember {
177 id: row.get("id"),
178 campaign_id: row.get("campaign_id"),
179 email: row.get("email"),
180 postal_code: row.get("postal_code"),
181 has_gdpr_consent: row.get("has_gdpr_consent"),
182 consent_at: row.get("consent_at"),
183 annual_consumption_kwh: row.get("annual_consumption_kwh"),
184 current_provider: row.get("current_provider"),
185 ean_code: row.get("ean_code"),
186 unsubscribed_at: row.get("unsubscribed_at"),
187 created_at: row.get("created_at"),
188 }))
189 }
190
191 async fn find_active_by_campaign(
192 &self,
193 campaign_id: Uuid,
194 page: i64,
195 per_page: i64,
196 ) -> Result<Vec<IndividualMember>, String> {
197 if page < 1 || per_page < 1 {
198 return Err("Page and per_page must be >= 1".to_string());
199 }
200
201 let offset = (page - 1) * per_page;
202
203 let rows = sqlx::query(
204 r#"
205 SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
206 consent_at, annual_consumption_kwh, current_provider,
207 ean_code, unsubscribed_at, created_at
208 FROM individual_members
209 WHERE campaign_id = $1 AND unsubscribed_at IS NULL
210 ORDER BY created_at DESC
211 LIMIT $2 OFFSET $3
212 "#,
213 )
214 .bind(campaign_id)
215 .bind(per_page)
216 .bind(offset)
217 .fetch_all(&self.pool)
218 .await
219 .map_err(|e| format!("Database error: {}", e))?;
220
221 Ok(rows
222 .iter()
223 .map(|row| IndividualMember {
224 id: row.get("id"),
225 campaign_id: row.get("campaign_id"),
226 email: row.get("email"),
227 postal_code: row.get("postal_code"),
228 has_gdpr_consent: row.get("has_gdpr_consent"),
229 consent_at: row.get("consent_at"),
230 annual_consumption_kwh: row.get("annual_consumption_kwh"),
231 current_provider: row.get("current_provider"),
232 ean_code: row.get("ean_code"),
233 unsubscribed_at: row.get("unsubscribed_at"),
234 created_at: row.get("created_at"),
235 })
236 .collect())
237 }
238
239 async fn find_with_consent_by_campaign(
240 &self,
241 campaign_id: Uuid,
242 page: i64,
243 per_page: i64,
244 ) -> Result<Vec<IndividualMember>, String> {
245 if page < 1 || per_page < 1 {
246 return Err("Page and per_page must be >= 1".to_string());
247 }
248
249 let offset = (page - 1) * per_page;
250
251 let rows = sqlx::query(
252 r#"
253 SELECT id, campaign_id, email, postal_code, has_gdpr_consent,
254 consent_at, annual_consumption_kwh, current_provider,
255 ean_code, unsubscribed_at, created_at
256 FROM individual_members
257 WHERE campaign_id = $1 AND has_gdpr_consent = TRUE AND unsubscribed_at IS NULL
258 ORDER BY created_at DESC
259 LIMIT $2 OFFSET $3
260 "#,
261 )
262 .bind(campaign_id)
263 .bind(per_page)
264 .bind(offset)
265 .fetch_all(&self.pool)
266 .await
267 .map_err(|e| format!("Database error: {}", e))?;
268
269 Ok(rows
270 .iter()
271 .map(|row| IndividualMember {
272 id: row.get("id"),
273 campaign_id: row.get("campaign_id"),
274 email: row.get("email"),
275 postal_code: row.get("postal_code"),
276 has_gdpr_consent: row.get("has_gdpr_consent"),
277 consent_at: row.get("consent_at"),
278 annual_consumption_kwh: row.get("annual_consumption_kwh"),
279 current_provider: row.get("current_provider"),
280 ean_code: row.get("ean_code"),
281 unsubscribed_at: row.get("unsubscribed_at"),
282 created_at: row.get("created_at"),
283 })
284 .collect())
285 }
286
287 async fn update(&self, member: &IndividualMember) -> Result<IndividualMember, String> {
288 sqlx::query(
289 r#"
290 UPDATE individual_members
291 SET email = $1,
292 postal_code = $2,
293 has_gdpr_consent = $3,
294 consent_at = $4,
295 annual_consumption_kwh = $5,
296 current_provider = $6,
297 ean_code = $7,
298 unsubscribed_at = $8
299 WHERE id = $9
300 "#,
301 )
302 .bind(&member.email)
303 .bind(&member.postal_code)
304 .bind(member.has_gdpr_consent)
305 .bind(member.consent_at)
306 .bind(member.annual_consumption_kwh)
307 .bind(&member.current_provider)
308 .bind(&member.ean_code)
309 .bind(member.unsubscribed_at)
310 .bind(member.id)
311 .execute(&self.pool)
312 .await
313 .map_err(|e| format!("Database error updating individual member: {}", e))?;
314
315 Ok(member.clone())
316 }
317
318 async fn delete(&self, id: Uuid) -> Result<(), String> {
319 sqlx::query("DELETE FROM individual_members WHERE id = $1")
320 .bind(id)
321 .execute(&self.pool)
322 .await
323 .map_err(|e| format!("Database error deleting individual member: {}", e))?;
324
325 Ok(())
326 }
327
328 async fn withdraw_consent(&self, id: Uuid) -> Result<(), String> {
329 sqlx::query(
330 r#"
331 UPDATE individual_members
332 SET email = CONCAT('withdrawn_', id::text, '@anonymized.invalid'),
333 postal_code = 'ANONYMIZED',
334 current_provider = NULL,
335 ean_code = NULL,
336 has_gdpr_consent = FALSE,
337 unsubscribed_at = NOW()
338 WHERE id = $1
339 "#,
340 )
341 .bind(id)
342 .execute(&self.pool)
343 .await
344 .map_err(|e| format!("Database error withdrawing consent: {}", e))?;
345
346 Ok(())
347 }
348
349 async fn count_by_campaign(&self, campaign_id: Uuid) -> Result<i64, String> {
350 let row =
351 sqlx::query("SELECT COUNT(*) as count FROM individual_members WHERE campaign_id = $1")
352 .bind(campaign_id)
353 .fetch_one(&self.pool)
354 .await
355 .map_err(|e| format!("Database error: {}", e))?;
356
357 Ok(row.get::<i64, _>("count"))
358 }
359
360 async fn count_active_by_campaign(&self, campaign_id: Uuid) -> Result<i64, String> {
361 let row = sqlx::query(
362 "SELECT COUNT(*) as count FROM individual_members WHERE campaign_id = $1 AND unsubscribed_at IS NULL",
363 )
364 .bind(campaign_id)
365 .fetch_one(&self.pool)
366 .await
367 .map_err(|e| format!("Database error: {}", e))?;
368
369 Ok(row.get::<i64, _>("count"))
370 }
371
372 async fn count_with_consent_by_campaign(&self, campaign_id: Uuid) -> Result<i64, String> {
373 let row = sqlx::query(
374 "SELECT COUNT(*) as count FROM individual_members WHERE campaign_id = $1 AND has_gdpr_consent = TRUE AND unsubscribed_at IS NULL",
375 )
376 .bind(campaign_id)
377 .fetch_one(&self.pool)
378 .await
379 .map_err(|e| format!("Database error: {}", e))?;
380
381 Ok(row.get::<i64, _>("count"))
382 }
383}