koprogo_api/infrastructure/database/repositories/
convocation_recipient_repository_impl.rs

1use crate::application::ports::{ConvocationRecipientRepository, RecipientTrackingSummary};
2use crate::domain::entities::{AttendanceStatus, ConvocationRecipient};
3use async_trait::async_trait;
4use sqlx::PgPool;
5use uuid::Uuid;
6
7/// PostgreSQL implementation of ConvocationRecipientRepository
8pub struct PostgresConvocationRecipientRepository {
9    pool: PgPool,
10}
11
12impl PostgresConvocationRecipientRepository {
13    pub fn new(pool: PgPool) -> Self {
14        Self { pool }
15    }
16
17    /// Convert AttendanceStatus enum to database string
18    fn attendance_status_to_db(status: &AttendanceStatus) -> &'static str {
19        status.to_db_string()
20    }
21
22    /// Convert database string to AttendanceStatus enum
23    fn attendance_status_from_db(s: &str) -> Result<AttendanceStatus, String> {
24        AttendanceStatus::from_db_string(s)
25    }
26}
27
28#[async_trait]
29impl ConvocationRecipientRepository for PostgresConvocationRecipientRepository {
30    async fn create(
31        &self,
32        recipient: &ConvocationRecipient,
33    ) -> Result<ConvocationRecipient, String> {
34        let attendance_status_str = Self::attendance_status_to_db(&recipient.attendance_status);
35
36        let row = sqlx::query!(
37            r#"
38            INSERT INTO convocation_recipients (
39                id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
40                email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
41                attendance_status, attendance_updated_at, proxy_owner_id,
42                created_at, updated_at
43            )
44            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11::TEXT::attendance_status, $12, $13, $14, $15)
45            RETURNING id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
46                      email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
47                      attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
48                      created_at, updated_at
49            "#,
50            recipient.id,
51            recipient.convocation_id,
52            recipient.owner_id,
53            recipient.email,
54            recipient.email_sent_at,
55            recipient.email_opened_at,
56            recipient.email_failed,
57            recipient.email_failure_reason,
58            recipient.reminder_sent_at,
59            recipient.reminder_opened_at,
60            attendance_status_str,
61            recipient.attendance_updated_at,
62            recipient.proxy_owner_id,
63            recipient.created_at,
64            recipient.updated_at,
65        )
66        .fetch_one(&self.pool)
67        .await
68        .map_err(|e| format!("Failed to create convocation recipient: {}", e))?;
69
70        Ok(ConvocationRecipient {
71            id: row.id,
72            convocation_id: row.convocation_id,
73            owner_id: row.owner_id,
74            email: row.email,
75            email_sent_at: row.email_sent_at,
76            email_opened_at: row.email_opened_at,
77            email_failed: row.email_failed,
78            email_failure_reason: row.email_failure_reason,
79            reminder_sent_at: row.reminder_sent_at,
80            reminder_opened_at: row.reminder_opened_at,
81            attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
82            attendance_updated_at: row.attendance_updated_at,
83            proxy_owner_id: row.proxy_owner_id,
84            created_at: row.created_at,
85            updated_at: row.updated_at,
86        })
87    }
88
89    async fn create_many(
90        &self,
91        recipients: &[ConvocationRecipient],
92    ) -> Result<Vec<ConvocationRecipient>, String> {
93        if recipients.is_empty() {
94            return Ok(Vec::new());
95        }
96
97        // Use a transaction for bulk insert
98        let mut tx = self
99            .pool
100            .begin()
101            .await
102            .map_err(|e| format!("Failed to begin transaction: {}", e))?;
103
104        let mut created = Vec::new();
105
106        for recipient in recipients {
107            let attendance_status_str = Self::attendance_status_to_db(&recipient.attendance_status);
108
109            let row = sqlx::query!(
110                r#"
111                INSERT INTO convocation_recipients (
112                    id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
113                    email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
114                    attendance_status, attendance_updated_at, proxy_owner_id,
115                    created_at, updated_at
116                )
117                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11::TEXT::attendance_status, $12, $13, $14, $15)
118                RETURNING id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
119                          email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
120                          attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
121                          created_at, updated_at
122                "#,
123                recipient.id,
124                recipient.convocation_id,
125                recipient.owner_id,
126                recipient.email,
127                recipient.email_sent_at,
128                recipient.email_opened_at,
129                recipient.email_failed,
130                recipient.email_failure_reason,
131                recipient.reminder_sent_at,
132                recipient.reminder_opened_at,
133                attendance_status_str,
134                recipient.attendance_updated_at,
135                recipient.proxy_owner_id,
136                recipient.created_at,
137                recipient.updated_at,
138            )
139            .fetch_one(&mut *tx)
140            .await
141            .map_err(|e| format!("Failed to create recipient in batch: {}", e))?;
142
143            created.push(ConvocationRecipient {
144                id: row.id,
145                convocation_id: row.convocation_id,
146                owner_id: row.owner_id,
147                email: row.email,
148                email_sent_at: row.email_sent_at,
149                email_opened_at: row.email_opened_at,
150                email_failed: row.email_failed,
151                email_failure_reason: row.email_failure_reason,
152                reminder_sent_at: row.reminder_sent_at,
153                reminder_opened_at: row.reminder_opened_at,
154                attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
155                attendance_updated_at: row.attendance_updated_at,
156                proxy_owner_id: row.proxy_owner_id,
157                created_at: row.created_at,
158                updated_at: row.updated_at,
159            });
160        }
161
162        tx.commit()
163            .await
164            .map_err(|e| format!("Failed to commit transaction: {}", e))?;
165
166        Ok(created)
167    }
168
169    async fn find_by_id(&self, id: Uuid) -> Result<Option<ConvocationRecipient>, String> {
170        let row = sqlx::query!(
171            r#"
172            SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
173                   email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
174                   attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
175                   created_at, updated_at
176            FROM convocation_recipients
177            WHERE id = $1
178            "#,
179            id
180        )
181        .fetch_optional(&self.pool)
182        .await
183        .map_err(|e| format!("Failed to find convocation recipient by id: {}", e))?;
184
185        match row {
186            Some(row) => Ok(Some(ConvocationRecipient {
187                id: row.id,
188                convocation_id: row.convocation_id,
189                owner_id: row.owner_id,
190                email: row.email,
191                email_sent_at: row.email_sent_at,
192                email_opened_at: row.email_opened_at,
193                email_failed: row.email_failed,
194                email_failure_reason: row.email_failure_reason,
195                reminder_sent_at: row.reminder_sent_at,
196                reminder_opened_at: row.reminder_opened_at,
197                attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
198                attendance_updated_at: row.attendance_updated_at,
199                proxy_owner_id: row.proxy_owner_id,
200                created_at: row.created_at,
201                updated_at: row.updated_at,
202            })),
203            None => Ok(None),
204        }
205    }
206
207    async fn find_by_convocation(
208        &self,
209        convocation_id: Uuid,
210    ) -> Result<Vec<ConvocationRecipient>, String> {
211        let rows = sqlx::query!(
212            r#"
213            SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
214                   email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
215                   attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
216                   created_at, updated_at
217            FROM convocation_recipients
218            WHERE convocation_id = $1
219            ORDER BY created_at ASC
220            "#,
221            convocation_id
222        )
223        .fetch_all(&self.pool)
224        .await
225        .map_err(|e| format!("Failed to find recipients by convocation: {}", e))?;
226
227        rows.into_iter()
228            .map(|row| {
229                Ok(ConvocationRecipient {
230                    id: row.id,
231                    convocation_id: row.convocation_id,
232                    owner_id: row.owner_id,
233                    email: row.email,
234                    email_sent_at: row.email_sent_at,
235                    email_opened_at: row.email_opened_at,
236                    email_failed: row.email_failed,
237                    email_failure_reason: row.email_failure_reason,
238                    reminder_sent_at: row.reminder_sent_at,
239                    reminder_opened_at: row.reminder_opened_at,
240                    attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
241                    attendance_updated_at: row.attendance_updated_at,
242                    proxy_owner_id: row.proxy_owner_id,
243                    created_at: row.created_at,
244                    updated_at: row.updated_at,
245                })
246            })
247            .collect()
248    }
249
250    async fn find_by_convocation_and_owner(
251        &self,
252        convocation_id: Uuid,
253        owner_id: Uuid,
254    ) -> Result<Option<ConvocationRecipient>, String> {
255        let row = sqlx::query!(
256            r#"
257            SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
258                   email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
259                   attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
260                   created_at, updated_at
261            FROM convocation_recipients
262            WHERE convocation_id = $1 AND owner_id = $2
263            "#,
264            convocation_id,
265            owner_id
266        )
267        .fetch_optional(&self.pool)
268        .await
269        .map_err(|e| {
270            format!(
271                "Failed to find recipient by convocation and owner: {}",
272                e
273            )
274        })?;
275
276        match row {
277            Some(row) => Ok(Some(ConvocationRecipient {
278                id: row.id,
279                convocation_id: row.convocation_id,
280                owner_id: row.owner_id,
281                email: row.email,
282                email_sent_at: row.email_sent_at,
283                email_opened_at: row.email_opened_at,
284                email_failed: row.email_failed,
285                email_failure_reason: row.email_failure_reason,
286                reminder_sent_at: row.reminder_sent_at,
287                reminder_opened_at: row.reminder_opened_at,
288                attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
289                attendance_updated_at: row.attendance_updated_at,
290                proxy_owner_id: row.proxy_owner_id,
291                created_at: row.created_at,
292                updated_at: row.updated_at,
293            })),
294            None => Ok(None),
295        }
296    }
297
298    async fn find_by_owner(&self, owner_id: Uuid) -> Result<Vec<ConvocationRecipient>, String> {
299        let rows = sqlx::query!(
300            r#"
301            SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
302                   email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
303                   attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
304                   created_at, updated_at
305            FROM convocation_recipients
306            WHERE owner_id = $1
307            ORDER BY created_at DESC
308            "#,
309            owner_id
310        )
311        .fetch_all(&self.pool)
312        .await
313        .map_err(|e| format!("Failed to find recipients by owner: {}", e))?;
314
315        rows.into_iter()
316            .map(|row| {
317                Ok(ConvocationRecipient {
318                    id: row.id,
319                    convocation_id: row.convocation_id,
320                    owner_id: row.owner_id,
321                    email: row.email,
322                    email_sent_at: row.email_sent_at,
323                    email_opened_at: row.email_opened_at,
324                    email_failed: row.email_failed,
325                    email_failure_reason: row.email_failure_reason,
326                    reminder_sent_at: row.reminder_sent_at,
327                    reminder_opened_at: row.reminder_opened_at,
328                    attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
329                    attendance_updated_at: row.attendance_updated_at,
330                    proxy_owner_id: row.proxy_owner_id,
331                    created_at: row.created_at,
332                    updated_at: row.updated_at,
333                })
334            })
335            .collect()
336    }
337
338    async fn find_by_attendance_status(
339        &self,
340        convocation_id: Uuid,
341        status: AttendanceStatus,
342    ) -> Result<Vec<ConvocationRecipient>, String> {
343        let status_str = Self::attendance_status_to_db(&status);
344
345        let rows = sqlx::query!(
346            r#"
347            SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
348                   email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
349                   attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
350                   created_at, updated_at
351            FROM convocation_recipients
352            WHERE convocation_id = $1 AND attendance_status = $2::TEXT::attendance_status
353            ORDER BY created_at ASC
354            "#,
355            convocation_id,
356            status_str
357        )
358        .fetch_all(&self.pool)
359        .await
360        .map_err(|e| format!("Failed to find recipients by attendance status: {}", e))?;
361
362        rows.into_iter()
363            .map(|row| {
364                Ok(ConvocationRecipient {
365                    id: row.id,
366                    convocation_id: row.convocation_id,
367                    owner_id: row.owner_id,
368                    email: row.email,
369                    email_sent_at: row.email_sent_at,
370                    email_opened_at: row.email_opened_at,
371                    email_failed: row.email_failed,
372                    email_failure_reason: row.email_failure_reason,
373                    reminder_sent_at: row.reminder_sent_at,
374                    reminder_opened_at: row.reminder_opened_at,
375                    attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
376                    attendance_updated_at: row.attendance_updated_at,
377                    proxy_owner_id: row.proxy_owner_id,
378                    created_at: row.created_at,
379                    updated_at: row.updated_at,
380                })
381            })
382            .collect()
383    }
384
385    async fn find_needing_reminder(
386        &self,
387        convocation_id: Uuid,
388    ) -> Result<Vec<ConvocationRecipient>, String> {
389        let rows = sqlx::query!(
390            r#"
391            SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
392                   email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
393                   attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
394                   created_at, updated_at
395            FROM convocation_recipients
396            WHERE convocation_id = $1
397              AND email_sent_at IS NOT NULL
398              AND email_opened_at IS NULL
399              AND reminder_sent_at IS NULL
400              AND email_failed = FALSE
401            ORDER BY created_at ASC
402            "#,
403            convocation_id
404        )
405        .fetch_all(&self.pool)
406        .await
407        .map_err(|e| format!("Failed to find recipients needing reminder: {}", e))?;
408
409        rows.into_iter()
410            .map(|row| {
411                Ok(ConvocationRecipient {
412                    id: row.id,
413                    convocation_id: row.convocation_id,
414                    owner_id: row.owner_id,
415                    email: row.email,
416                    email_sent_at: row.email_sent_at,
417                    email_opened_at: row.email_opened_at,
418                    email_failed: row.email_failed,
419                    email_failure_reason: row.email_failure_reason,
420                    reminder_sent_at: row.reminder_sent_at,
421                    reminder_opened_at: row.reminder_opened_at,
422                    attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
423                    attendance_updated_at: row.attendance_updated_at,
424                    proxy_owner_id: row.proxy_owner_id,
425                    created_at: row.created_at,
426                    updated_at: row.updated_at,
427                })
428            })
429            .collect()
430    }
431
432    async fn find_failed_emails(
433        &self,
434        convocation_id: Uuid,
435    ) -> Result<Vec<ConvocationRecipient>, String> {
436        let rows = sqlx::query!(
437            r#"
438            SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
439                   email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
440                   attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
441                   created_at, updated_at
442            FROM convocation_recipients
443            WHERE convocation_id = $1 AND email_failed = TRUE
444            ORDER BY created_at ASC
445            "#,
446            convocation_id
447        )
448        .fetch_all(&self.pool)
449        .await
450        .map_err(|e| format!("Failed to find failed emails: {}", e))?;
451
452        rows.into_iter()
453            .map(|row| {
454                Ok(ConvocationRecipient {
455                    id: row.id,
456                    convocation_id: row.convocation_id,
457                    owner_id: row.owner_id,
458                    email: row.email,
459                    email_sent_at: row.email_sent_at,
460                    email_opened_at: row.email_opened_at,
461                    email_failed: row.email_failed,
462                    email_failure_reason: row.email_failure_reason,
463                    reminder_sent_at: row.reminder_sent_at,
464                    reminder_opened_at: row.reminder_opened_at,
465                    attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
466                    attendance_updated_at: row.attendance_updated_at,
467                    proxy_owner_id: row.proxy_owner_id,
468                    created_at: row.created_at,
469                    updated_at: row.updated_at,
470                })
471            })
472            .collect()
473    }
474
475    async fn update(
476        &self,
477        recipient: &ConvocationRecipient,
478    ) -> Result<ConvocationRecipient, String> {
479        let attendance_status_str = Self::attendance_status_to_db(&recipient.attendance_status);
480
481        let row = sqlx::query!(
482            r#"
483            UPDATE convocation_recipients
484            SET convocation_id = $2, owner_id = $3, email = $4, email_sent_at = $5, email_opened_at = $6,
485                email_failed = $7, email_failure_reason = $8, reminder_sent_at = $9, reminder_opened_at = $10,
486                attendance_status = $11::TEXT::attendance_status, attendance_updated_at = $12, proxy_owner_id = $13,
487                updated_at = $14
488            WHERE id = $1
489            RETURNING id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
490                      email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
491                      attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
492                      created_at, updated_at
493            "#,
494            recipient.id,
495            recipient.convocation_id,
496            recipient.owner_id,
497            recipient.email,
498            recipient.email_sent_at,
499            recipient.email_opened_at,
500            recipient.email_failed,
501            recipient.email_failure_reason,
502            recipient.reminder_sent_at,
503            recipient.reminder_opened_at,
504            attendance_status_str,
505            recipient.attendance_updated_at,
506            recipient.proxy_owner_id,
507            recipient.updated_at,
508        )
509        .fetch_one(&self.pool)
510        .await
511        .map_err(|e| format!("Failed to update convocation recipient: {}", e))?;
512
513        Ok(ConvocationRecipient {
514            id: row.id,
515            convocation_id: row.convocation_id,
516            owner_id: row.owner_id,
517            email: row.email,
518            email_sent_at: row.email_sent_at,
519            email_opened_at: row.email_opened_at,
520            email_failed: row.email_failed,
521            email_failure_reason: row.email_failure_reason,
522            reminder_sent_at: row.reminder_sent_at,
523            reminder_opened_at: row.reminder_opened_at,
524            attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
525            attendance_updated_at: row.attendance_updated_at,
526            proxy_owner_id: row.proxy_owner_id,
527            created_at: row.created_at,
528            updated_at: row.updated_at,
529        })
530    }
531
532    async fn delete(&self, id: Uuid) -> Result<bool, String> {
533        let result = sqlx::query!(
534            r#"
535            DELETE FROM convocation_recipients
536            WHERE id = $1
537            "#,
538            id
539        )
540        .execute(&self.pool)
541        .await
542        .map_err(|e| format!("Failed to delete convocation recipient: {}", e))?;
543
544        Ok(result.rows_affected() > 0)
545    }
546
547    async fn count_by_convocation(&self, convocation_id: Uuid) -> Result<i64, String> {
548        let row = sqlx::query!(
549            r#"
550            SELECT COUNT(*) AS "count!"
551            FROM convocation_recipients
552            WHERE convocation_id = $1
553            "#,
554            convocation_id
555        )
556        .fetch_one(&self.pool)
557        .await
558        .map_err(|e| format!("Failed to count recipients by convocation: {}", e))?;
559
560        Ok(row.count)
561    }
562
563    async fn count_opened(&self, convocation_id: Uuid) -> Result<i64, String> {
564        let row = sqlx::query!(
565            r#"
566            SELECT COUNT(*) AS "count!"
567            FROM convocation_recipients
568            WHERE convocation_id = $1 AND email_opened_at IS NOT NULL
569            "#,
570            convocation_id
571        )
572        .fetch_one(&self.pool)
573        .await
574        .map_err(|e| format!("Failed to count opened emails: {}", e))?;
575
576        Ok(row.count)
577    }
578
579    async fn count_by_attendance_status(
580        &self,
581        convocation_id: Uuid,
582        status: AttendanceStatus,
583    ) -> Result<i64, String> {
584        let status_str = Self::attendance_status_to_db(&status);
585
586        let row = sqlx::query!(
587            r#"
588            SELECT COUNT(*) AS "count!"
589            FROM convocation_recipients
590            WHERE convocation_id = $1 AND attendance_status = $2::TEXT::attendance_status
591            "#,
592            convocation_id,
593            status_str
594        )
595        .fetch_one(&self.pool)
596        .await
597        .map_err(|e| format!("Failed to count by attendance status: {}", e))?;
598
599        Ok(row.count)
600    }
601
602    async fn get_tracking_summary(
603        &self,
604        convocation_id: Uuid,
605    ) -> Result<RecipientTrackingSummary, String> {
606        let row = sqlx::query!(
607            r#"
608            SELECT
609                COUNT(*) AS "total_count!",
610                COUNT(*) FILTER (WHERE email_opened_at IS NOT NULL) AS "opened_count!",
611                COUNT(*) FILTER (WHERE attendance_status = 'will_attend') AS "will_attend_count!",
612                COUNT(*) FILTER (WHERE attendance_status = 'will_not_attend') AS "will_not_attend_count!",
613                COUNT(*) FILTER (WHERE attendance_status = 'attended') AS "attended_count!",
614                COUNT(*) FILTER (WHERE attendance_status = 'did_not_attend') AS "did_not_attend_count!",
615                COUNT(*) FILTER (WHERE attendance_status = 'pending') AS "pending_count!",
616                COUNT(*) FILTER (WHERE email_failed = TRUE) AS "failed_email_count!"
617            FROM convocation_recipients
618            WHERE convocation_id = $1
619            "#,
620            convocation_id
621        )
622        .fetch_one(&self.pool)
623        .await
624        .map_err(|e| format!("Failed to get tracking summary: {}", e))?;
625
626        Ok(RecipientTrackingSummary {
627            total_count: row.total_count,
628            opened_count: row.opened_count,
629            will_attend_count: row.will_attend_count,
630            will_not_attend_count: row.will_not_attend_count,
631            attended_count: row.attended_count,
632            did_not_attend_count: row.did_not_attend_count,
633            pending_count: row.pending_count,
634            failed_email_count: row.failed_email_count,
635        })
636    }
637}