1use crate::application::ports::{ConvocationRecipientRepository, RecipientTrackingSummary};
2use crate::domain::entities::{AttendanceStatus, ConvocationRecipient};
3use async_trait::async_trait;
4use sqlx::PgPool;
5use uuid::Uuid;
6
7pub struct PostgresConvocationRecipientRepository {
9 pool: PgPool,
10}
11
12impl PostgresConvocationRecipientRepository {
13 pub fn new(pool: PgPool) -> Self {
14 Self { pool }
15 }
16
17 fn attendance_status_to_db(status: &AttendanceStatus) -> &'static str {
19 status.to_db_string()
20 }
21
22 fn attendance_status_from_db(s: &str) -> Result<AttendanceStatus, String> {
24 AttendanceStatus::from_db_string(s)
25 }
26}
27
28#[async_trait]
29impl ConvocationRecipientRepository for PostgresConvocationRecipientRepository {
30 async fn create(
31 &self,
32 recipient: &ConvocationRecipient,
33 ) -> Result<ConvocationRecipient, String> {
34 let attendance_status_str = Self::attendance_status_to_db(&recipient.attendance_status);
35
36 let row = sqlx::query!(
37 r#"
38 INSERT INTO convocation_recipients (
39 id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
40 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
41 attendance_status, attendance_updated_at, proxy_owner_id,
42 created_at, updated_at
43 )
44 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11::TEXT::attendance_status, $12, $13, $14, $15)
45 RETURNING id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
46 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
47 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
48 created_at, updated_at
49 "#,
50 recipient.id,
51 recipient.convocation_id,
52 recipient.owner_id,
53 recipient.email,
54 recipient.email_sent_at,
55 recipient.email_opened_at,
56 recipient.email_failed,
57 recipient.email_failure_reason,
58 recipient.reminder_sent_at,
59 recipient.reminder_opened_at,
60 attendance_status_str,
61 recipient.attendance_updated_at,
62 recipient.proxy_owner_id,
63 recipient.created_at,
64 recipient.updated_at,
65 )
66 .fetch_one(&self.pool)
67 .await
68 .map_err(|e| format!("Failed to create convocation recipient: {}", e))?;
69
70 Ok(ConvocationRecipient {
71 id: row.id,
72 convocation_id: row.convocation_id,
73 owner_id: row.owner_id,
74 email: row.email,
75 email_sent_at: row.email_sent_at,
76 email_opened_at: row.email_opened_at,
77 email_failed: row.email_failed,
78 email_failure_reason: row.email_failure_reason,
79 reminder_sent_at: row.reminder_sent_at,
80 reminder_opened_at: row.reminder_opened_at,
81 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
82 attendance_updated_at: row.attendance_updated_at,
83 proxy_owner_id: row.proxy_owner_id,
84 created_at: row.created_at,
85 updated_at: row.updated_at,
86 })
87 }
88
89 async fn create_many(
90 &self,
91 recipients: &[ConvocationRecipient],
92 ) -> Result<Vec<ConvocationRecipient>, String> {
93 if recipients.is_empty() {
94 return Ok(Vec::new());
95 }
96
97 let mut tx = self
99 .pool
100 .begin()
101 .await
102 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
103
104 let mut created = Vec::new();
105
106 for recipient in recipients {
107 let attendance_status_str = Self::attendance_status_to_db(&recipient.attendance_status);
108
109 let row = sqlx::query!(
110 r#"
111 INSERT INTO convocation_recipients (
112 id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
113 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
114 attendance_status, attendance_updated_at, proxy_owner_id,
115 created_at, updated_at
116 )
117 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11::TEXT::attendance_status, $12, $13, $14, $15)
118 RETURNING id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
119 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
120 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
121 created_at, updated_at
122 "#,
123 recipient.id,
124 recipient.convocation_id,
125 recipient.owner_id,
126 recipient.email,
127 recipient.email_sent_at,
128 recipient.email_opened_at,
129 recipient.email_failed,
130 recipient.email_failure_reason,
131 recipient.reminder_sent_at,
132 recipient.reminder_opened_at,
133 attendance_status_str,
134 recipient.attendance_updated_at,
135 recipient.proxy_owner_id,
136 recipient.created_at,
137 recipient.updated_at,
138 )
139 .fetch_one(&mut *tx)
140 .await
141 .map_err(|e| format!("Failed to create recipient in batch: {}", e))?;
142
143 created.push(ConvocationRecipient {
144 id: row.id,
145 convocation_id: row.convocation_id,
146 owner_id: row.owner_id,
147 email: row.email,
148 email_sent_at: row.email_sent_at,
149 email_opened_at: row.email_opened_at,
150 email_failed: row.email_failed,
151 email_failure_reason: row.email_failure_reason,
152 reminder_sent_at: row.reminder_sent_at,
153 reminder_opened_at: row.reminder_opened_at,
154 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
155 attendance_updated_at: row.attendance_updated_at,
156 proxy_owner_id: row.proxy_owner_id,
157 created_at: row.created_at,
158 updated_at: row.updated_at,
159 });
160 }
161
162 tx.commit()
163 .await
164 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
165
166 Ok(created)
167 }
168
169 async fn find_by_id(&self, id: Uuid) -> Result<Option<ConvocationRecipient>, String> {
170 let row = sqlx::query!(
171 r#"
172 SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
173 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
174 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
175 created_at, updated_at
176 FROM convocation_recipients
177 WHERE id = $1
178 "#,
179 id
180 )
181 .fetch_optional(&self.pool)
182 .await
183 .map_err(|e| format!("Failed to find convocation recipient by id: {}", e))?;
184
185 match row {
186 Some(row) => Ok(Some(ConvocationRecipient {
187 id: row.id,
188 convocation_id: row.convocation_id,
189 owner_id: row.owner_id,
190 email: row.email,
191 email_sent_at: row.email_sent_at,
192 email_opened_at: row.email_opened_at,
193 email_failed: row.email_failed,
194 email_failure_reason: row.email_failure_reason,
195 reminder_sent_at: row.reminder_sent_at,
196 reminder_opened_at: row.reminder_opened_at,
197 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
198 attendance_updated_at: row.attendance_updated_at,
199 proxy_owner_id: row.proxy_owner_id,
200 created_at: row.created_at,
201 updated_at: row.updated_at,
202 })),
203 None => Ok(None),
204 }
205 }
206
207 async fn find_by_convocation(
208 &self,
209 convocation_id: Uuid,
210 ) -> Result<Vec<ConvocationRecipient>, String> {
211 let rows = sqlx::query!(
212 r#"
213 SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
214 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
215 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
216 created_at, updated_at
217 FROM convocation_recipients
218 WHERE convocation_id = $1
219 ORDER BY created_at ASC
220 "#,
221 convocation_id
222 )
223 .fetch_all(&self.pool)
224 .await
225 .map_err(|e| format!("Failed to find recipients by convocation: {}", e))?;
226
227 rows.into_iter()
228 .map(|row| {
229 Ok(ConvocationRecipient {
230 id: row.id,
231 convocation_id: row.convocation_id,
232 owner_id: row.owner_id,
233 email: row.email,
234 email_sent_at: row.email_sent_at,
235 email_opened_at: row.email_opened_at,
236 email_failed: row.email_failed,
237 email_failure_reason: row.email_failure_reason,
238 reminder_sent_at: row.reminder_sent_at,
239 reminder_opened_at: row.reminder_opened_at,
240 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
241 attendance_updated_at: row.attendance_updated_at,
242 proxy_owner_id: row.proxy_owner_id,
243 created_at: row.created_at,
244 updated_at: row.updated_at,
245 })
246 })
247 .collect()
248 }
249
250 async fn find_by_convocation_and_owner(
251 &self,
252 convocation_id: Uuid,
253 owner_id: Uuid,
254 ) -> Result<Option<ConvocationRecipient>, String> {
255 let row = sqlx::query!(
256 r#"
257 SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
258 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
259 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
260 created_at, updated_at
261 FROM convocation_recipients
262 WHERE convocation_id = $1 AND owner_id = $2
263 "#,
264 convocation_id,
265 owner_id
266 )
267 .fetch_optional(&self.pool)
268 .await
269 .map_err(|e| {
270 format!(
271 "Failed to find recipient by convocation and owner: {}",
272 e
273 )
274 })?;
275
276 match row {
277 Some(row) => Ok(Some(ConvocationRecipient {
278 id: row.id,
279 convocation_id: row.convocation_id,
280 owner_id: row.owner_id,
281 email: row.email,
282 email_sent_at: row.email_sent_at,
283 email_opened_at: row.email_opened_at,
284 email_failed: row.email_failed,
285 email_failure_reason: row.email_failure_reason,
286 reminder_sent_at: row.reminder_sent_at,
287 reminder_opened_at: row.reminder_opened_at,
288 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
289 attendance_updated_at: row.attendance_updated_at,
290 proxy_owner_id: row.proxy_owner_id,
291 created_at: row.created_at,
292 updated_at: row.updated_at,
293 })),
294 None => Ok(None),
295 }
296 }
297
298 async fn find_by_owner(&self, owner_id: Uuid) -> Result<Vec<ConvocationRecipient>, String> {
299 let rows = sqlx::query!(
300 r#"
301 SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
302 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
303 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
304 created_at, updated_at
305 FROM convocation_recipients
306 WHERE owner_id = $1
307 ORDER BY created_at DESC
308 "#,
309 owner_id
310 )
311 .fetch_all(&self.pool)
312 .await
313 .map_err(|e| format!("Failed to find recipients by owner: {}", e))?;
314
315 rows.into_iter()
316 .map(|row| {
317 Ok(ConvocationRecipient {
318 id: row.id,
319 convocation_id: row.convocation_id,
320 owner_id: row.owner_id,
321 email: row.email,
322 email_sent_at: row.email_sent_at,
323 email_opened_at: row.email_opened_at,
324 email_failed: row.email_failed,
325 email_failure_reason: row.email_failure_reason,
326 reminder_sent_at: row.reminder_sent_at,
327 reminder_opened_at: row.reminder_opened_at,
328 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
329 attendance_updated_at: row.attendance_updated_at,
330 proxy_owner_id: row.proxy_owner_id,
331 created_at: row.created_at,
332 updated_at: row.updated_at,
333 })
334 })
335 .collect()
336 }
337
338 async fn find_by_attendance_status(
339 &self,
340 convocation_id: Uuid,
341 status: AttendanceStatus,
342 ) -> Result<Vec<ConvocationRecipient>, String> {
343 let status_str = Self::attendance_status_to_db(&status);
344
345 let rows = sqlx::query!(
346 r#"
347 SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
348 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
349 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
350 created_at, updated_at
351 FROM convocation_recipients
352 WHERE convocation_id = $1 AND attendance_status = $2::TEXT::attendance_status
353 ORDER BY created_at ASC
354 "#,
355 convocation_id,
356 status_str
357 )
358 .fetch_all(&self.pool)
359 .await
360 .map_err(|e| format!("Failed to find recipients by attendance status: {}", e))?;
361
362 rows.into_iter()
363 .map(|row| {
364 Ok(ConvocationRecipient {
365 id: row.id,
366 convocation_id: row.convocation_id,
367 owner_id: row.owner_id,
368 email: row.email,
369 email_sent_at: row.email_sent_at,
370 email_opened_at: row.email_opened_at,
371 email_failed: row.email_failed,
372 email_failure_reason: row.email_failure_reason,
373 reminder_sent_at: row.reminder_sent_at,
374 reminder_opened_at: row.reminder_opened_at,
375 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
376 attendance_updated_at: row.attendance_updated_at,
377 proxy_owner_id: row.proxy_owner_id,
378 created_at: row.created_at,
379 updated_at: row.updated_at,
380 })
381 })
382 .collect()
383 }
384
385 async fn find_needing_reminder(
386 &self,
387 convocation_id: Uuid,
388 ) -> Result<Vec<ConvocationRecipient>, String> {
389 let rows = sqlx::query!(
390 r#"
391 SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
392 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
393 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
394 created_at, updated_at
395 FROM convocation_recipients
396 WHERE convocation_id = $1
397 AND email_sent_at IS NOT NULL
398 AND email_opened_at IS NULL
399 AND reminder_sent_at IS NULL
400 AND email_failed = FALSE
401 ORDER BY created_at ASC
402 "#,
403 convocation_id
404 )
405 .fetch_all(&self.pool)
406 .await
407 .map_err(|e| format!("Failed to find recipients needing reminder: {}", e))?;
408
409 rows.into_iter()
410 .map(|row| {
411 Ok(ConvocationRecipient {
412 id: row.id,
413 convocation_id: row.convocation_id,
414 owner_id: row.owner_id,
415 email: row.email,
416 email_sent_at: row.email_sent_at,
417 email_opened_at: row.email_opened_at,
418 email_failed: row.email_failed,
419 email_failure_reason: row.email_failure_reason,
420 reminder_sent_at: row.reminder_sent_at,
421 reminder_opened_at: row.reminder_opened_at,
422 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
423 attendance_updated_at: row.attendance_updated_at,
424 proxy_owner_id: row.proxy_owner_id,
425 created_at: row.created_at,
426 updated_at: row.updated_at,
427 })
428 })
429 .collect()
430 }
431
432 async fn find_failed_emails(
433 &self,
434 convocation_id: Uuid,
435 ) -> Result<Vec<ConvocationRecipient>, String> {
436 let rows = sqlx::query!(
437 r#"
438 SELECT id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
439 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
440 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
441 created_at, updated_at
442 FROM convocation_recipients
443 WHERE convocation_id = $1 AND email_failed = TRUE
444 ORDER BY created_at ASC
445 "#,
446 convocation_id
447 )
448 .fetch_all(&self.pool)
449 .await
450 .map_err(|e| format!("Failed to find failed emails: {}", e))?;
451
452 rows.into_iter()
453 .map(|row| {
454 Ok(ConvocationRecipient {
455 id: row.id,
456 convocation_id: row.convocation_id,
457 owner_id: row.owner_id,
458 email: row.email,
459 email_sent_at: row.email_sent_at,
460 email_opened_at: row.email_opened_at,
461 email_failed: row.email_failed,
462 email_failure_reason: row.email_failure_reason,
463 reminder_sent_at: row.reminder_sent_at,
464 reminder_opened_at: row.reminder_opened_at,
465 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
466 attendance_updated_at: row.attendance_updated_at,
467 proxy_owner_id: row.proxy_owner_id,
468 created_at: row.created_at,
469 updated_at: row.updated_at,
470 })
471 })
472 .collect()
473 }
474
475 async fn update(
476 &self,
477 recipient: &ConvocationRecipient,
478 ) -> Result<ConvocationRecipient, String> {
479 let attendance_status_str = Self::attendance_status_to_db(&recipient.attendance_status);
480
481 let row = sqlx::query!(
482 r#"
483 UPDATE convocation_recipients
484 SET convocation_id = $2, owner_id = $3, email = $4, email_sent_at = $5, email_opened_at = $6,
485 email_failed = $7, email_failure_reason = $8, reminder_sent_at = $9, reminder_opened_at = $10,
486 attendance_status = $11::TEXT::attendance_status, attendance_updated_at = $12, proxy_owner_id = $13,
487 updated_at = $14
488 WHERE id = $1
489 RETURNING id, convocation_id, owner_id, email, email_sent_at, email_opened_at,
490 email_failed, email_failure_reason, reminder_sent_at, reminder_opened_at,
491 attendance_status::text AS "attendance_status!", attendance_updated_at, proxy_owner_id,
492 created_at, updated_at
493 "#,
494 recipient.id,
495 recipient.convocation_id,
496 recipient.owner_id,
497 recipient.email,
498 recipient.email_sent_at,
499 recipient.email_opened_at,
500 recipient.email_failed,
501 recipient.email_failure_reason,
502 recipient.reminder_sent_at,
503 recipient.reminder_opened_at,
504 attendance_status_str,
505 recipient.attendance_updated_at,
506 recipient.proxy_owner_id,
507 recipient.updated_at,
508 )
509 .fetch_one(&self.pool)
510 .await
511 .map_err(|e| format!("Failed to update convocation recipient: {}", e))?;
512
513 Ok(ConvocationRecipient {
514 id: row.id,
515 convocation_id: row.convocation_id,
516 owner_id: row.owner_id,
517 email: row.email,
518 email_sent_at: row.email_sent_at,
519 email_opened_at: row.email_opened_at,
520 email_failed: row.email_failed,
521 email_failure_reason: row.email_failure_reason,
522 reminder_sent_at: row.reminder_sent_at,
523 reminder_opened_at: row.reminder_opened_at,
524 attendance_status: Self::attendance_status_from_db(&row.attendance_status)?,
525 attendance_updated_at: row.attendance_updated_at,
526 proxy_owner_id: row.proxy_owner_id,
527 created_at: row.created_at,
528 updated_at: row.updated_at,
529 })
530 }
531
532 async fn delete(&self, id: Uuid) -> Result<bool, String> {
533 let result = sqlx::query!(
534 r#"
535 DELETE FROM convocation_recipients
536 WHERE id = $1
537 "#,
538 id
539 )
540 .execute(&self.pool)
541 .await
542 .map_err(|e| format!("Failed to delete convocation recipient: {}", e))?;
543
544 Ok(result.rows_affected() > 0)
545 }
546
547 async fn count_by_convocation(&self, convocation_id: Uuid) -> Result<i64, String> {
548 let row = sqlx::query!(
549 r#"
550 SELECT COUNT(*) AS "count!"
551 FROM convocation_recipients
552 WHERE convocation_id = $1
553 "#,
554 convocation_id
555 )
556 .fetch_one(&self.pool)
557 .await
558 .map_err(|e| format!("Failed to count recipients by convocation: {}", e))?;
559
560 Ok(row.count)
561 }
562
563 async fn count_opened(&self, convocation_id: Uuid) -> Result<i64, String> {
564 let row = sqlx::query!(
565 r#"
566 SELECT COUNT(*) AS "count!"
567 FROM convocation_recipients
568 WHERE convocation_id = $1 AND email_opened_at IS NOT NULL
569 "#,
570 convocation_id
571 )
572 .fetch_one(&self.pool)
573 .await
574 .map_err(|e| format!("Failed to count opened emails: {}", e))?;
575
576 Ok(row.count)
577 }
578
579 async fn count_by_attendance_status(
580 &self,
581 convocation_id: Uuid,
582 status: AttendanceStatus,
583 ) -> Result<i64, String> {
584 let status_str = Self::attendance_status_to_db(&status);
585
586 let row = sqlx::query!(
587 r#"
588 SELECT COUNT(*) AS "count!"
589 FROM convocation_recipients
590 WHERE convocation_id = $1 AND attendance_status = $2::TEXT::attendance_status
591 "#,
592 convocation_id,
593 status_str
594 )
595 .fetch_one(&self.pool)
596 .await
597 .map_err(|e| format!("Failed to count by attendance status: {}", e))?;
598
599 Ok(row.count)
600 }
601
602 async fn get_tracking_summary(
603 &self,
604 convocation_id: Uuid,
605 ) -> Result<RecipientTrackingSummary, String> {
606 let row = sqlx::query!(
607 r#"
608 SELECT
609 COUNT(*) AS "total_count!",
610 COUNT(*) FILTER (WHERE email_opened_at IS NOT NULL) AS "opened_count!",
611 COUNT(*) FILTER (WHERE attendance_status = 'will_attend') AS "will_attend_count!",
612 COUNT(*) FILTER (WHERE attendance_status = 'will_not_attend') AS "will_not_attend_count!",
613 COUNT(*) FILTER (WHERE attendance_status = 'attended') AS "attended_count!",
614 COUNT(*) FILTER (WHERE attendance_status = 'did_not_attend') AS "did_not_attend_count!",
615 COUNT(*) FILTER (WHERE attendance_status = 'pending') AS "pending_count!",
616 COUNT(*) FILTER (WHERE email_failed = TRUE) AS "failed_email_count!"
617 FROM convocation_recipients
618 WHERE convocation_id = $1
619 "#,
620 convocation_id
621 )
622 .fetch_one(&self.pool)
623 .await
624 .map_err(|e| format!("Failed to get tracking summary: {}", e))?;
625
626 Ok(RecipientTrackingSummary {
627 total_count: row.total_count,
628 opened_count: row.opened_count,
629 will_attend_count: row.will_attend_count,
630 will_not_attend_count: row.will_not_attend_count,
631 attended_count: row.attended_count,
632 did_not_attend_count: row.did_not_attend_count,
633 pending_count: row.pending_count,
634 failed_email_count: row.failed_email_count,
635 })
636 }
637}