1use crate::application::ports::NotificationRepository;
2use crate::domain::entities::{
3 Notification, NotificationChannel, NotificationPriority, NotificationStatus, NotificationType,
4};
5use async_trait::async_trait;
6use sqlx::PgPool;
7use uuid::Uuid;
8
9pub struct PostgresNotificationRepository {
11 pool: PgPool,
12}
13
14impl PostgresNotificationRepository {
15 pub fn new(pool: PgPool) -> Self {
16 Self { pool }
17 }
18
19 fn type_to_db(notification_type: &NotificationType) -> &'static str {
21 match notification_type {
22 NotificationType::ExpenseCreated => "ExpenseCreated",
23 NotificationType::MeetingConvocation => "MeetingConvocation",
24 NotificationType::PaymentReceived => "PaymentReceived",
25 NotificationType::TicketResolved => "TicketResolved",
26 NotificationType::DocumentAdded => "DocumentAdded",
27 NotificationType::BoardMessage => "BoardMessage",
28 NotificationType::PaymentReminder => "PaymentReminder",
29 NotificationType::BudgetApproved => "BudgetApproved",
30 NotificationType::ResolutionVote => "ResolutionVote",
31 NotificationType::System => "System",
32 }
33 }
34
35 fn type_from_db(s: &str) -> Result<NotificationType, String> {
37 match s {
38 "ExpenseCreated" => Ok(NotificationType::ExpenseCreated),
39 "MeetingConvocation" => Ok(NotificationType::MeetingConvocation),
40 "PaymentReceived" => Ok(NotificationType::PaymentReceived),
41 "TicketResolved" => Ok(NotificationType::TicketResolved),
42 "DocumentAdded" => Ok(NotificationType::DocumentAdded),
43 "BoardMessage" => Ok(NotificationType::BoardMessage),
44 "PaymentReminder" => Ok(NotificationType::PaymentReminder),
45 "BudgetApproved" => Ok(NotificationType::BudgetApproved),
46 "ResolutionVote" => Ok(NotificationType::ResolutionVote),
47 "System" => Ok(NotificationType::System),
48 _ => Err(format!("Invalid notification type: {}", s)),
49 }
50 }
51
52 fn channel_to_db(channel: &NotificationChannel) -> &'static str {
54 match channel {
55 NotificationChannel::Email => "Email",
56 NotificationChannel::InApp => "InApp",
57 NotificationChannel::Push => "Push",
58 }
59 }
60
61 fn channel_from_db(s: &str) -> Result<NotificationChannel, String> {
63 match s {
64 "Email" => Ok(NotificationChannel::Email),
65 "InApp" => Ok(NotificationChannel::InApp),
66 "Push" => Ok(NotificationChannel::Push),
67 _ => Err(format!("Invalid notification channel: {}", s)),
68 }
69 }
70
71 fn priority_to_db(priority: &NotificationPriority) -> &'static str {
73 match priority {
74 NotificationPriority::Low => "Low",
75 NotificationPriority::Medium => "Medium",
76 NotificationPriority::High => "High",
77 NotificationPriority::Critical => "Critical",
78 }
79 }
80
81 fn priority_from_db(s: &str) -> Result<NotificationPriority, String> {
83 match s {
84 "Low" => Ok(NotificationPriority::Low),
85 "Medium" => Ok(NotificationPriority::Medium),
86 "High" => Ok(NotificationPriority::High),
87 "Critical" => Ok(NotificationPriority::Critical),
88 _ => Err(format!("Invalid notification priority: {}", s)),
89 }
90 }
91
92 fn status_to_db(status: &NotificationStatus) -> &'static str {
94 match status {
95 NotificationStatus::Pending => "Pending",
96 NotificationStatus::Sent => "Sent",
97 NotificationStatus::Failed => "Failed",
98 NotificationStatus::Read => "Read",
99 }
100 }
101
102 fn status_from_db(s: &str) -> Result<NotificationStatus, String> {
104 match s {
105 "Pending" => Ok(NotificationStatus::Pending),
106 "Sent" => Ok(NotificationStatus::Sent),
107 "Failed" => Ok(NotificationStatus::Failed),
108 "Read" => Ok(NotificationStatus::Read),
109 _ => Err(format!("Invalid notification status: {}", s)),
110 }
111 }
112}
113
114#[async_trait]
115impl NotificationRepository for PostgresNotificationRepository {
116 async fn create(&self, notification: &Notification) -> Result<Notification, String> {
117 let type_str = Self::type_to_db(¬ification.notification_type);
118 let channel_str = Self::channel_to_db(¬ification.channel);
119 let priority_str = Self::priority_to_db(¬ification.priority);
120 let status_str = Self::status_to_db(¬ification.status);
121
122 let row = sqlx::query!(
123 r#"
124 INSERT INTO notifications (
125 id, organization_id, user_id, notification_type, channel, priority, status,
126 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
127 )
128 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
129 RETURNING id, organization_id, user_id, notification_type, channel, priority, status,
130 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
131 "#,
132 notification.id,
133 notification.organization_id,
134 notification.user_id,
135 type_str,
136 channel_str,
137 priority_str,
138 status_str,
139 notification.title,
140 notification.message,
141 notification.link_url,
142 notification.metadata.as_ref().map(|m| serde_json::from_str::<serde_json::Value>(m).ok()).flatten(),
143 notification.sent_at,
144 notification.read_at,
145 notification.created_at,
146 notification.error_message
147 )
148 .fetch_one(&self.pool)
149 .await
150 .map_err(|e| format!("Database error creating notification: {}", e))?;
151
152 Ok(Notification {
153 id: row.id,
154 organization_id: row.organization_id,
155 user_id: row.user_id,
156 notification_type: Self::type_from_db(&row.notification_type)?,
157 channel: Self::channel_from_db(&row.channel)?,
158 priority: Self::priority_from_db(&row.priority)?,
159 status: Self::status_from_db(&row.status)?,
160 title: row.title,
161 message: row.message,
162 link_url: row.link_url,
163 metadata: row.metadata.map(|m| m.to_string()),
164 sent_at: row.sent_at,
165 read_at: row.read_at,
166 created_at: row.created_at,
167 error_message: row.error_message,
168 })
169 }
170
171 async fn find_by_id(&self, id: Uuid) -> Result<Option<Notification>, String> {
172 let row = sqlx::query!(
173 r#"
174 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
175 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
176 FROM notifications
177 WHERE id = $1
178 "#,
179 id
180 )
181 .fetch_optional(&self.pool)
182 .await
183 .map_err(|e| format!("Database error finding notification: {}", e))?;
184
185 match row {
186 Some(r) => Ok(Some(Notification {
187 id: r.id,
188 organization_id: r.organization_id,
189 user_id: r.user_id,
190 notification_type: Self::type_from_db(&r.notification_type)?,
191 channel: Self::channel_from_db(&r.channel)?,
192 priority: Self::priority_from_db(&r.priority)?,
193 status: Self::status_from_db(&r.status)?,
194 title: r.title,
195 message: r.message,
196 link_url: r.link_url,
197 metadata: r.metadata.map(|m| m.to_string()),
198 sent_at: r.sent_at,
199 read_at: r.read_at,
200 created_at: r.created_at,
201 error_message: r.error_message,
202 })),
203 None => Ok(None),
204 }
205 }
206
207 async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<Notification>, String> {
208 let rows = sqlx::query!(
209 r#"
210 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
211 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
212 FROM notifications
213 WHERE user_id = $1
214 ORDER BY created_at DESC
215 "#,
216 user_id
217 )
218 .fetch_all(&self.pool)
219 .await
220 .map_err(|e| format!("Database error finding notifications by user: {}", e))?;
221
222 rows.into_iter()
223 .map(|r| {
224 Ok(Notification {
225 id: r.id,
226 organization_id: r.organization_id,
227 user_id: r.user_id,
228 notification_type: Self::type_from_db(&r.notification_type)?,
229 channel: Self::channel_from_db(&r.channel)?,
230 priority: Self::priority_from_db(&r.priority)?,
231 status: Self::status_from_db(&r.status)?,
232 title: r.title,
233 message: r.message,
234 link_url: r.link_url,
235 metadata: r.metadata.map(|m| m.to_string()),
236 sent_at: r.sent_at,
237 read_at: r.read_at,
238 created_at: r.created_at,
239 error_message: r.error_message,
240 })
241 })
242 .collect()
243 }
244
245 async fn find_by_user_and_status(
246 &self,
247 user_id: Uuid,
248 status: NotificationStatus,
249 ) -> Result<Vec<Notification>, String> {
250 let status_str = Self::status_to_db(&status);
251
252 let rows = sqlx::query!(
253 r#"
254 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
255 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
256 FROM notifications
257 WHERE user_id = $1 AND status = $2
258 ORDER BY created_at DESC
259 "#,
260 user_id,
261 status_str
262 )
263 .fetch_all(&self.pool)
264 .await
265 .map_err(|e| {
266 format!(
267 "Database error finding notifications by user and status: {}",
268 e
269 )
270 })?;
271
272 rows.into_iter()
273 .map(|r| {
274 Ok(Notification {
275 id: r.id,
276 organization_id: r.organization_id,
277 user_id: r.user_id,
278 notification_type: Self::type_from_db(&r.notification_type)?,
279 channel: Self::channel_from_db(&r.channel)?,
280 priority: Self::priority_from_db(&r.priority)?,
281 status: Self::status_from_db(&r.status)?,
282 title: r.title,
283 message: r.message,
284 link_url: r.link_url,
285 metadata: r.metadata.map(|m| m.to_string()),
286 sent_at: r.sent_at,
287 read_at: r.read_at,
288 created_at: r.created_at,
289 error_message: r.error_message,
290 })
291 })
292 .collect()
293 }
294
295 async fn find_by_user_and_channel(
296 &self,
297 user_id: Uuid,
298 channel: NotificationChannel,
299 ) -> Result<Vec<Notification>, String> {
300 let channel_str = Self::channel_to_db(&channel);
301
302 let rows = sqlx::query!(
303 r#"
304 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
305 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
306 FROM notifications
307 WHERE user_id = $1 AND channel = $2
308 ORDER BY created_at DESC
309 "#,
310 user_id,
311 channel_str
312 )
313 .fetch_all(&self.pool)
314 .await
315 .map_err(|e| {
316 format!(
317 "Database error finding notifications by user and channel: {}",
318 e
319 )
320 })?;
321
322 rows.into_iter()
323 .map(|r| {
324 Ok(Notification {
325 id: r.id,
326 organization_id: r.organization_id,
327 user_id: r.user_id,
328 notification_type: Self::type_from_db(&r.notification_type)?,
329 channel: Self::channel_from_db(&r.channel)?,
330 priority: Self::priority_from_db(&r.priority)?,
331 status: Self::status_from_db(&r.status)?,
332 title: r.title,
333 message: r.message,
334 link_url: r.link_url,
335 metadata: r.metadata.map(|m| m.to_string()),
336 sent_at: r.sent_at,
337 read_at: r.read_at,
338 created_at: r.created_at,
339 error_message: r.error_message,
340 })
341 })
342 .collect()
343 }
344
345 async fn find_unread_by_user(&self, user_id: Uuid) -> Result<Vec<Notification>, String> {
346 let rows = sqlx::query!(
347 r#"
348 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
349 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
350 FROM notifications
351 WHERE user_id = $1 AND channel = 'InApp' AND status = 'Sent' AND read_at IS NULL
352 ORDER BY created_at DESC
353 "#,
354 user_id
355 )
356 .fetch_all(&self.pool)
357 .await
358 .map_err(|e| format!("Database error finding unread notifications: {}", e))?;
359
360 rows.into_iter()
361 .map(|r| {
362 Ok(Notification {
363 id: r.id,
364 organization_id: r.organization_id,
365 user_id: r.user_id,
366 notification_type: Self::type_from_db(&r.notification_type)?,
367 channel: Self::channel_from_db(&r.channel)?,
368 priority: Self::priority_from_db(&r.priority)?,
369 status: Self::status_from_db(&r.status)?,
370 title: r.title,
371 message: r.message,
372 link_url: r.link_url,
373 metadata: r.metadata.map(|m| m.to_string()),
374 sent_at: r.sent_at,
375 read_at: r.read_at,
376 created_at: r.created_at,
377 error_message: r.error_message,
378 })
379 })
380 .collect()
381 }
382
383 async fn find_pending(&self) -> Result<Vec<Notification>, String> {
384 let rows = sqlx::query!(
385 r#"
386 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
387 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
388 FROM notifications
389 WHERE status = 'Pending'
390 ORDER BY priority DESC, created_at ASC
391 "#
392 )
393 .fetch_all(&self.pool)
394 .await
395 .map_err(|e| format!("Database error finding pending notifications: {}", e))?;
396
397 rows.into_iter()
398 .map(|r| {
399 Ok(Notification {
400 id: r.id,
401 organization_id: r.organization_id,
402 user_id: r.user_id,
403 notification_type: Self::type_from_db(&r.notification_type)?,
404 channel: Self::channel_from_db(&r.channel)?,
405 priority: Self::priority_from_db(&r.priority)?,
406 status: Self::status_from_db(&r.status)?,
407 title: r.title,
408 message: r.message,
409 link_url: r.link_url,
410 metadata: r.metadata.map(|m| m.to_string()),
411 sent_at: r.sent_at,
412 read_at: r.read_at,
413 created_at: r.created_at,
414 error_message: r.error_message,
415 })
416 })
417 .collect()
418 }
419
420 async fn find_failed(&self) -> Result<Vec<Notification>, String> {
421 let rows = sqlx::query!(
422 r#"
423 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
424 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
425 FROM notifications
426 WHERE status = 'Failed'
427 ORDER BY created_at ASC
428 "#
429 )
430 .fetch_all(&self.pool)
431 .await
432 .map_err(|e| format!("Database error finding failed notifications: {}", e))?;
433
434 rows.into_iter()
435 .map(|r| {
436 Ok(Notification {
437 id: r.id,
438 organization_id: r.organization_id,
439 user_id: r.user_id,
440 notification_type: Self::type_from_db(&r.notification_type)?,
441 channel: Self::channel_from_db(&r.channel)?,
442 priority: Self::priority_from_db(&r.priority)?,
443 status: Self::status_from_db(&r.status)?,
444 title: r.title,
445 message: r.message,
446 link_url: r.link_url,
447 metadata: r.metadata.map(|m| m.to_string()),
448 sent_at: r.sent_at,
449 read_at: r.read_at,
450 created_at: r.created_at,
451 error_message: r.error_message,
452 })
453 })
454 .collect()
455 }
456
457 async fn find_by_organization(
458 &self,
459 organization_id: Uuid,
460 ) -> Result<Vec<Notification>, String> {
461 let rows = sqlx::query!(
462 r#"
463 SELECT id, organization_id, user_id, notification_type, channel, priority, status,
464 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
465 FROM notifications
466 WHERE organization_id = $1
467 ORDER BY created_at DESC
468 "#,
469 organization_id
470 )
471 .fetch_all(&self.pool)
472 .await
473 .map_err(|e| {
474 format!(
475 "Database error finding notifications by organization: {}",
476 e
477 )
478 })?;
479
480 rows.into_iter()
481 .map(|r| {
482 Ok(Notification {
483 id: r.id,
484 organization_id: r.organization_id,
485 user_id: r.user_id,
486 notification_type: Self::type_from_db(&r.notification_type)?,
487 channel: Self::channel_from_db(&r.channel)?,
488 priority: Self::priority_from_db(&r.priority)?,
489 status: Self::status_from_db(&r.status)?,
490 title: r.title,
491 message: r.message,
492 link_url: r.link_url,
493 metadata: r.metadata.map(|m| m.to_string()),
494 sent_at: r.sent_at,
495 read_at: r.read_at,
496 created_at: r.created_at,
497 error_message: r.error_message,
498 })
499 })
500 .collect()
501 }
502
503 async fn update(&self, notification: &Notification) -> Result<Notification, String> {
504 let type_str = Self::type_to_db(¬ification.notification_type);
505 let channel_str = Self::channel_to_db(¬ification.channel);
506 let priority_str = Self::priority_to_db(¬ification.priority);
507 let status_str = Self::status_to_db(¬ification.status);
508
509 let row = sqlx::query!(
510 r#"
511 UPDATE notifications
512 SET organization_id = $2,
513 user_id = $3,
514 notification_type = $4,
515 channel = $5,
516 priority = $6,
517 status = $7,
518 title = $8,
519 message = $9,
520 link_url = $10,
521 metadata = $11,
522 sent_at = $12,
523 read_at = $13,
524 error_message = $14
525 WHERE id = $1
526 RETURNING id, organization_id, user_id, notification_type, channel, priority, status,
527 title, message, link_url, metadata, sent_at, read_at, created_at, error_message
528 "#,
529 notification.id,
530 notification.organization_id,
531 notification.user_id,
532 type_str,
533 channel_str,
534 priority_str,
535 status_str,
536 notification.title,
537 notification.message,
538 notification.link_url,
539 notification.metadata.as_ref().map(|m| serde_json::from_str::<serde_json::Value>(m).ok()).flatten(),
540 notification.sent_at,
541 notification.read_at,
542 notification.error_message
543 )
544 .fetch_one(&self.pool)
545 .await
546 .map_err(|e| format!("Database error updating notification: {}", e))?;
547
548 Ok(Notification {
549 id: row.id,
550 organization_id: row.organization_id,
551 user_id: row.user_id,
552 notification_type: Self::type_from_db(&row.notification_type)?,
553 channel: Self::channel_from_db(&row.channel)?,
554 priority: Self::priority_from_db(&row.priority)?,
555 status: Self::status_from_db(&row.status)?,
556 title: row.title,
557 message: row.message,
558 link_url: row.link_url,
559 metadata: row.metadata.map(|m| m.to_string()),
560 sent_at: row.sent_at,
561 read_at: row.read_at,
562 created_at: row.created_at,
563 error_message: row.error_message,
564 })
565 }
566
567 async fn delete(&self, id: Uuid) -> Result<bool, String> {
568 let result = sqlx::query!(
569 r#"
570 DELETE FROM notifications
571 WHERE id = $1
572 "#,
573 id
574 )
575 .execute(&self.pool)
576 .await
577 .map_err(|e| format!("Database error deleting notification: {}", e))?;
578
579 Ok(result.rows_affected() > 0)
580 }
581
582 async fn count_unread_by_user(&self, user_id: Uuid) -> Result<i64, String> {
583 let row = sqlx::query!(
584 r#"
585 SELECT COUNT(*) as count
586 FROM notifications
587 WHERE user_id = $1 AND channel = 'InApp' AND status = 'Sent' AND read_at IS NULL
588 "#,
589 user_id
590 )
591 .fetch_one(&self.pool)
592 .await
593 .map_err(|e| format!("Database error counting unread notifications: {}", e))?;
594
595 Ok(row.count.unwrap_or(0))
596 }
597
598 async fn count_by_user_and_status(
599 &self,
600 user_id: Uuid,
601 status: NotificationStatus,
602 ) -> Result<i64, String> {
603 let status_str = Self::status_to_db(&status);
604
605 let row = sqlx::query!(
606 r#"
607 SELECT COUNT(*) as count
608 FROM notifications
609 WHERE user_id = $1 AND status = $2
610 "#,
611 user_id,
612 status_str
613 )
614 .fetch_one(&self.pool)
615 .await
616 .map_err(|e| format!("Database error counting notifications by status: {}", e))?;
617
618 Ok(row.count.unwrap_or(0))
619 }
620
621 async fn mark_all_read_by_user(&self, user_id: Uuid) -> Result<i64, String> {
622 let now = chrono::Utc::now();
623
624 let result = sqlx::query!(
625 r#"
626 UPDATE notifications
627 SET status = 'Read',
628 read_at = $2
629 WHERE user_id = $1
630 AND channel = 'InApp'
631 AND status = 'Sent'
632 AND read_at IS NULL
633 "#,
634 user_id,
635 now
636 )
637 .execute(&self.pool)
638 .await
639 .map_err(|e| format!("Database error marking all notifications as read: {}", e))?;
640
641 Ok(result.rows_affected() as i64)
642 }
643
644 async fn delete_older_than(&self, days: i64) -> Result<i64, String> {
645 let threshold = chrono::Utc::now() - chrono::Duration::days(days);
646
647 let result = sqlx::query!(
648 r#"
649 DELETE FROM notifications
650 WHERE created_at < $1
651 "#,
652 threshold
653 )
654 .execute(&self.pool)
655 .await
656 .map_err(|e| format!("Database error deleting old notifications: {}", e))?;
657
658 Ok(result.rows_affected() as i64)
659 }
660}
661
#[cfg(test)]
mod tests {
    //! Unit tests for the enum <-> database string mappings. They need no
    //! database: only the pure conversion helpers are exercised.

    use super::*;

    /// Every notification type maps to its variant name and parses back.
    #[test]
    fn test_type_conversion() {
        let cases = [
            (NotificationType::ExpenseCreated, "ExpenseCreated"),
            (NotificationType::MeetingConvocation, "MeetingConvocation"),
            (NotificationType::PaymentReceived, "PaymentReceived"),
            (NotificationType::TicketResolved, "TicketResolved"),
            (NotificationType::DocumentAdded, "DocumentAdded"),
            (NotificationType::BoardMessage, "BoardMessage"),
            (NotificationType::PaymentReminder, "PaymentReminder"),
            (NotificationType::BudgetApproved, "BudgetApproved"),
            (NotificationType::ResolutionVote, "ResolutionVote"),
            (NotificationType::System, "System"),
        ];

        for (variant, label) in cases.iter() {
            assert_eq!(PostgresNotificationRepository::type_to_db(variant), *label);
            assert_eq!(
                &PostgresNotificationRepository::type_from_db(*label).unwrap(),
                variant
            );
        }
    }

    /// Every channel maps to its variant name and parses back.
    #[test]
    fn test_channel_conversion() {
        let cases = [
            (NotificationChannel::Email, "Email"),
            (NotificationChannel::InApp, "InApp"),
            (NotificationChannel::Push, "Push"),
        ];

        for (variant, label) in cases.iter() {
            assert_eq!(
                PostgresNotificationRepository::channel_to_db(variant),
                *label
            );
            assert_eq!(
                &PostgresNotificationRepository::channel_from_db(*label).unwrap(),
                variant
            );
        }
    }

    /// Every priority maps to its variant name and parses back.
    #[test]
    fn test_priority_conversion() {
        let cases = [
            (NotificationPriority::Low, "Low"),
            (NotificationPriority::Medium, "Medium"),
            (NotificationPriority::High, "High"),
            (NotificationPriority::Critical, "Critical"),
        ];

        for (variant, label) in cases.iter() {
            assert_eq!(
                PostgresNotificationRepository::priority_to_db(variant),
                *label
            );
            assert_eq!(
                &PostgresNotificationRepository::priority_from_db(*label).unwrap(),
                variant
            );
        }
    }

    /// Every status maps to its variant name and parses back.
    #[test]
    fn test_status_conversion() {
        let cases = [
            (NotificationStatus::Pending, "Pending"),
            (NotificationStatus::Sent, "Sent"),
            (NotificationStatus::Failed, "Failed"),
            (NotificationStatus::Read, "Read"),
        ];

        for (variant, label) in cases.iter() {
            assert_eq!(PostgresNotificationRepository::status_to_db(variant), *label);
            assert_eq!(
                &PostgresNotificationRepository::status_from_db(*label).unwrap(),
                variant
            );
        }
    }

    /// Unknown strings are rejected with a message naming the bad value;
    /// matching is case-sensitive.
    #[test]
    fn test_invalid_values_are_rejected() {
        assert_eq!(
            PostgresNotificationRepository::type_from_db("system").unwrap_err(),
            "Invalid notification type: system"
        );
        assert_eq!(
            PostgresNotificationRepository::channel_from_db("SMS").unwrap_err(),
            "Invalid notification channel: SMS"
        );
        assert_eq!(
            PostgresNotificationRepository::priority_from_db("").unwrap_err(),
            "Invalid notification priority: "
        );
        assert_eq!(
            PostgresNotificationRepository::status_from_db("Delivered").unwrap_err(),
            "Invalid notification status: Delivered"
        );
    }
}