koprogo_api/infrastructure/database/repositories/
notification_repository_impl.rs

1use crate::application::ports::NotificationRepository;
2use crate::domain::entities::{
3    Notification, NotificationChannel, NotificationPriority, NotificationStatus, NotificationType,
4};
5use async_trait::async_trait;
6use sqlx::PgPool;
7use uuid::Uuid;
8
9/// PostgreSQL implementation of NotificationRepository
10pub struct PostgresNotificationRepository {
11    pool: PgPool,
12}
13
14impl PostgresNotificationRepository {
15    pub fn new(pool: PgPool) -> Self {
16        Self { pool }
17    }
18
19    /// Convert NotificationType enum to database string
20    fn type_to_db(notification_type: &NotificationType) -> &'static str {
21        match notification_type {
22            NotificationType::ExpenseCreated => "ExpenseCreated",
23            NotificationType::MeetingConvocation => "MeetingConvocation",
24            NotificationType::PaymentReceived => "PaymentReceived",
25            NotificationType::TicketResolved => "TicketResolved",
26            NotificationType::DocumentAdded => "DocumentAdded",
27            NotificationType::BoardMessage => "BoardMessage",
28            NotificationType::PaymentReminder => "PaymentReminder",
29            NotificationType::BudgetApproved => "BudgetApproved",
30            NotificationType::ResolutionVote => "ResolutionVote",
31            NotificationType::System => "System",
32        }
33    }
34
35    /// Convert database string to NotificationType enum
36    fn type_from_db(s: &str) -> Result<NotificationType, String> {
37        match s {
38            "ExpenseCreated" => Ok(NotificationType::ExpenseCreated),
39            "MeetingConvocation" => Ok(NotificationType::MeetingConvocation),
40            "PaymentReceived" => Ok(NotificationType::PaymentReceived),
41            "TicketResolved" => Ok(NotificationType::TicketResolved),
42            "DocumentAdded" => Ok(NotificationType::DocumentAdded),
43            "BoardMessage" => Ok(NotificationType::BoardMessage),
44            "PaymentReminder" => Ok(NotificationType::PaymentReminder),
45            "BudgetApproved" => Ok(NotificationType::BudgetApproved),
46            "ResolutionVote" => Ok(NotificationType::ResolutionVote),
47            "System" => Ok(NotificationType::System),
48            _ => Err(format!("Invalid notification type: {}", s)),
49        }
50    }
51
52    /// Convert NotificationChannel enum to database string
53    fn channel_to_db(channel: &NotificationChannel) -> &'static str {
54        match channel {
55            NotificationChannel::Email => "Email",
56            NotificationChannel::InApp => "InApp",
57            NotificationChannel::Push => "Push",
58        }
59    }
60
61    /// Convert database string to NotificationChannel enum
62    fn channel_from_db(s: &str) -> Result<NotificationChannel, String> {
63        match s {
64            "Email" => Ok(NotificationChannel::Email),
65            "InApp" => Ok(NotificationChannel::InApp),
66            "Push" => Ok(NotificationChannel::Push),
67            _ => Err(format!("Invalid notification channel: {}", s)),
68        }
69    }
70
71    /// Convert NotificationPriority enum to database string
72    fn priority_to_db(priority: &NotificationPriority) -> &'static str {
73        match priority {
74            NotificationPriority::Low => "Low",
75            NotificationPriority::Medium => "Medium",
76            NotificationPriority::High => "High",
77            NotificationPriority::Critical => "Critical",
78        }
79    }
80
81    /// Convert database string to NotificationPriority enum
82    fn priority_from_db(s: &str) -> Result<NotificationPriority, String> {
83        match s {
84            "Low" => Ok(NotificationPriority::Low),
85            "Medium" => Ok(NotificationPriority::Medium),
86            "High" => Ok(NotificationPriority::High),
87            "Critical" => Ok(NotificationPriority::Critical),
88            _ => Err(format!("Invalid notification priority: {}", s)),
89        }
90    }
91
92    /// Convert NotificationStatus enum to database string
93    fn status_to_db(status: &NotificationStatus) -> &'static str {
94        match status {
95            NotificationStatus::Pending => "Pending",
96            NotificationStatus::Sent => "Sent",
97            NotificationStatus::Failed => "Failed",
98            NotificationStatus::Read => "Read",
99        }
100    }
101
102    /// Convert database string to NotificationStatus enum
103    fn status_from_db(s: &str) -> Result<NotificationStatus, String> {
104        match s {
105            "Pending" => Ok(NotificationStatus::Pending),
106            "Sent" => Ok(NotificationStatus::Sent),
107            "Failed" => Ok(NotificationStatus::Failed),
108            "Read" => Ok(NotificationStatus::Read),
109            _ => Err(format!("Invalid notification status: {}", s)),
110        }
111    }
112}
113
114#[async_trait]
115impl NotificationRepository for PostgresNotificationRepository {
116    async fn create(&self, notification: &Notification) -> Result<Notification, String> {
117        let type_str = Self::type_to_db(&notification.notification_type);
118        let channel_str = Self::channel_to_db(&notification.channel);
119        let priority_str = Self::priority_to_db(&notification.priority);
120        let status_str = Self::status_to_db(&notification.status);
121
122        let row = sqlx::query!(
123            r#"
124            INSERT INTO notifications (
125                id, organization_id, user_id, notification_type, channel, priority, status,
126                title, message, link_url, metadata, sent_at, read_at, created_at, error_message
127            )
128            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
129            RETURNING id, organization_id, user_id, notification_type, channel, priority, status,
130                      title, message, link_url, metadata, sent_at, read_at, created_at, error_message
131            "#,
132            notification.id,
133            notification.organization_id,
134            notification.user_id,
135            type_str,
136            channel_str,
137            priority_str,
138            status_str,
139            notification.title,
140            notification.message,
141            notification.link_url,
142            notification.metadata.as_ref().map(|m| serde_json::from_str::<serde_json::Value>(m).ok()).flatten(),
143            notification.sent_at,
144            notification.read_at,
145            notification.created_at,
146            notification.error_message
147        )
148        .fetch_one(&self.pool)
149        .await
150        .map_err(|e| format!("Database error creating notification: {}", e))?;
151
152        Ok(Notification {
153            id: row.id,
154            organization_id: row.organization_id,
155            user_id: row.user_id,
156            notification_type: Self::type_from_db(&row.notification_type)?,
157            channel: Self::channel_from_db(&row.channel)?,
158            priority: Self::priority_from_db(&row.priority)?,
159            status: Self::status_from_db(&row.status)?,
160            title: row.title,
161            message: row.message,
162            link_url: row.link_url,
163            metadata: row.metadata.map(|m| m.to_string()),
164            sent_at: row.sent_at,
165            read_at: row.read_at,
166            created_at: row.created_at,
167            error_message: row.error_message,
168        })
169    }
170
171    async fn find_by_id(&self, id: Uuid) -> Result<Option<Notification>, String> {
172        let row = sqlx::query!(
173            r#"
174            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
175                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
176            FROM notifications
177            WHERE id = $1
178            "#,
179            id
180        )
181        .fetch_optional(&self.pool)
182        .await
183        .map_err(|e| format!("Database error finding notification: {}", e))?;
184
185        match row {
186            Some(r) => Ok(Some(Notification {
187                id: r.id,
188                organization_id: r.organization_id,
189                user_id: r.user_id,
190                notification_type: Self::type_from_db(&r.notification_type)?,
191                channel: Self::channel_from_db(&r.channel)?,
192                priority: Self::priority_from_db(&r.priority)?,
193                status: Self::status_from_db(&r.status)?,
194                title: r.title,
195                message: r.message,
196                link_url: r.link_url,
197                metadata: r.metadata.map(|m| m.to_string()),
198                sent_at: r.sent_at,
199                read_at: r.read_at,
200                created_at: r.created_at,
201                error_message: r.error_message,
202            })),
203            None => Ok(None),
204        }
205    }
206
207    async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<Notification>, String> {
208        let rows = sqlx::query!(
209            r#"
210            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
211                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
212            FROM notifications
213            WHERE user_id = $1
214            ORDER BY created_at DESC
215            "#,
216            user_id
217        )
218        .fetch_all(&self.pool)
219        .await
220        .map_err(|e| format!("Database error finding notifications by user: {}", e))?;
221
222        rows.into_iter()
223            .map(|r| {
224                Ok(Notification {
225                    id: r.id,
226                    organization_id: r.organization_id,
227                    user_id: r.user_id,
228                    notification_type: Self::type_from_db(&r.notification_type)?,
229                    channel: Self::channel_from_db(&r.channel)?,
230                    priority: Self::priority_from_db(&r.priority)?,
231                    status: Self::status_from_db(&r.status)?,
232                    title: r.title,
233                    message: r.message,
234                    link_url: r.link_url,
235                    metadata: r.metadata.map(|m| m.to_string()),
236                    sent_at: r.sent_at,
237                    read_at: r.read_at,
238                    created_at: r.created_at,
239                    error_message: r.error_message,
240                })
241            })
242            .collect()
243    }
244
245    async fn find_by_user_and_status(
246        &self,
247        user_id: Uuid,
248        status: NotificationStatus,
249    ) -> Result<Vec<Notification>, String> {
250        let status_str = Self::status_to_db(&status);
251
252        let rows = sqlx::query!(
253            r#"
254            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
255                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
256            FROM notifications
257            WHERE user_id = $1 AND status = $2
258            ORDER BY created_at DESC
259            "#,
260            user_id,
261            status_str
262        )
263        .fetch_all(&self.pool)
264        .await
265        .map_err(|e| {
266            format!(
267                "Database error finding notifications by user and status: {}",
268                e
269            )
270        })?;
271
272        rows.into_iter()
273            .map(|r| {
274                Ok(Notification {
275                    id: r.id,
276                    organization_id: r.organization_id,
277                    user_id: r.user_id,
278                    notification_type: Self::type_from_db(&r.notification_type)?,
279                    channel: Self::channel_from_db(&r.channel)?,
280                    priority: Self::priority_from_db(&r.priority)?,
281                    status: Self::status_from_db(&r.status)?,
282                    title: r.title,
283                    message: r.message,
284                    link_url: r.link_url,
285                    metadata: r.metadata.map(|m| m.to_string()),
286                    sent_at: r.sent_at,
287                    read_at: r.read_at,
288                    created_at: r.created_at,
289                    error_message: r.error_message,
290                })
291            })
292            .collect()
293    }
294
295    async fn find_by_user_and_channel(
296        &self,
297        user_id: Uuid,
298        channel: NotificationChannel,
299    ) -> Result<Vec<Notification>, String> {
300        let channel_str = Self::channel_to_db(&channel);
301
302        let rows = sqlx::query!(
303            r#"
304            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
305                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
306            FROM notifications
307            WHERE user_id = $1 AND channel = $2
308            ORDER BY created_at DESC
309            "#,
310            user_id,
311            channel_str
312        )
313        .fetch_all(&self.pool)
314        .await
315        .map_err(|e| {
316            format!(
317                "Database error finding notifications by user and channel: {}",
318                e
319            )
320        })?;
321
322        rows.into_iter()
323            .map(|r| {
324                Ok(Notification {
325                    id: r.id,
326                    organization_id: r.organization_id,
327                    user_id: r.user_id,
328                    notification_type: Self::type_from_db(&r.notification_type)?,
329                    channel: Self::channel_from_db(&r.channel)?,
330                    priority: Self::priority_from_db(&r.priority)?,
331                    status: Self::status_from_db(&r.status)?,
332                    title: r.title,
333                    message: r.message,
334                    link_url: r.link_url,
335                    metadata: r.metadata.map(|m| m.to_string()),
336                    sent_at: r.sent_at,
337                    read_at: r.read_at,
338                    created_at: r.created_at,
339                    error_message: r.error_message,
340                })
341            })
342            .collect()
343    }
344
345    async fn find_unread_by_user(&self, user_id: Uuid) -> Result<Vec<Notification>, String> {
346        let rows = sqlx::query!(
347            r#"
348            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
349                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
350            FROM notifications
351            WHERE user_id = $1 AND channel = 'InApp' AND status = 'Sent' AND read_at IS NULL
352            ORDER BY created_at DESC
353            "#,
354            user_id
355        )
356        .fetch_all(&self.pool)
357        .await
358        .map_err(|e| format!("Database error finding unread notifications: {}", e))?;
359
360        rows.into_iter()
361            .map(|r| {
362                Ok(Notification {
363                    id: r.id,
364                    organization_id: r.organization_id,
365                    user_id: r.user_id,
366                    notification_type: Self::type_from_db(&r.notification_type)?,
367                    channel: Self::channel_from_db(&r.channel)?,
368                    priority: Self::priority_from_db(&r.priority)?,
369                    status: Self::status_from_db(&r.status)?,
370                    title: r.title,
371                    message: r.message,
372                    link_url: r.link_url,
373                    metadata: r.metadata.map(|m| m.to_string()),
374                    sent_at: r.sent_at,
375                    read_at: r.read_at,
376                    created_at: r.created_at,
377                    error_message: r.error_message,
378                })
379            })
380            .collect()
381    }
382
383    async fn find_pending(&self) -> Result<Vec<Notification>, String> {
384        let rows = sqlx::query!(
385            r#"
386            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
387                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
388            FROM notifications
389            WHERE status = 'Pending'
390            ORDER BY priority DESC, created_at ASC
391            "#
392        )
393        .fetch_all(&self.pool)
394        .await
395        .map_err(|e| format!("Database error finding pending notifications: {}", e))?;
396
397        rows.into_iter()
398            .map(|r| {
399                Ok(Notification {
400                    id: r.id,
401                    organization_id: r.organization_id,
402                    user_id: r.user_id,
403                    notification_type: Self::type_from_db(&r.notification_type)?,
404                    channel: Self::channel_from_db(&r.channel)?,
405                    priority: Self::priority_from_db(&r.priority)?,
406                    status: Self::status_from_db(&r.status)?,
407                    title: r.title,
408                    message: r.message,
409                    link_url: r.link_url,
410                    metadata: r.metadata.map(|m| m.to_string()),
411                    sent_at: r.sent_at,
412                    read_at: r.read_at,
413                    created_at: r.created_at,
414                    error_message: r.error_message,
415                })
416            })
417            .collect()
418    }
419
420    async fn find_failed(&self) -> Result<Vec<Notification>, String> {
421        let rows = sqlx::query!(
422            r#"
423            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
424                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
425            FROM notifications
426            WHERE status = 'Failed'
427            ORDER BY created_at ASC
428            "#
429        )
430        .fetch_all(&self.pool)
431        .await
432        .map_err(|e| format!("Database error finding failed notifications: {}", e))?;
433
434        rows.into_iter()
435            .map(|r| {
436                Ok(Notification {
437                    id: r.id,
438                    organization_id: r.organization_id,
439                    user_id: r.user_id,
440                    notification_type: Self::type_from_db(&r.notification_type)?,
441                    channel: Self::channel_from_db(&r.channel)?,
442                    priority: Self::priority_from_db(&r.priority)?,
443                    status: Self::status_from_db(&r.status)?,
444                    title: r.title,
445                    message: r.message,
446                    link_url: r.link_url,
447                    metadata: r.metadata.map(|m| m.to_string()),
448                    sent_at: r.sent_at,
449                    read_at: r.read_at,
450                    created_at: r.created_at,
451                    error_message: r.error_message,
452                })
453            })
454            .collect()
455    }
456
457    async fn find_by_organization(
458        &self,
459        organization_id: Uuid,
460    ) -> Result<Vec<Notification>, String> {
461        let rows = sqlx::query!(
462            r#"
463            SELECT id, organization_id, user_id, notification_type, channel, priority, status,
464                   title, message, link_url, metadata, sent_at, read_at, created_at, error_message
465            FROM notifications
466            WHERE organization_id = $1
467            ORDER BY created_at DESC
468            "#,
469            organization_id
470        )
471        .fetch_all(&self.pool)
472        .await
473        .map_err(|e| {
474            format!(
475                "Database error finding notifications by organization: {}",
476                e
477            )
478        })?;
479
480        rows.into_iter()
481            .map(|r| {
482                Ok(Notification {
483                    id: r.id,
484                    organization_id: r.organization_id,
485                    user_id: r.user_id,
486                    notification_type: Self::type_from_db(&r.notification_type)?,
487                    channel: Self::channel_from_db(&r.channel)?,
488                    priority: Self::priority_from_db(&r.priority)?,
489                    status: Self::status_from_db(&r.status)?,
490                    title: r.title,
491                    message: r.message,
492                    link_url: r.link_url,
493                    metadata: r.metadata.map(|m| m.to_string()),
494                    sent_at: r.sent_at,
495                    read_at: r.read_at,
496                    created_at: r.created_at,
497                    error_message: r.error_message,
498                })
499            })
500            .collect()
501    }
502
503    async fn update(&self, notification: &Notification) -> Result<Notification, String> {
504        let type_str = Self::type_to_db(&notification.notification_type);
505        let channel_str = Self::channel_to_db(&notification.channel);
506        let priority_str = Self::priority_to_db(&notification.priority);
507        let status_str = Self::status_to_db(&notification.status);
508
509        let row = sqlx::query!(
510            r#"
511            UPDATE notifications
512            SET organization_id = $2,
513                user_id = $3,
514                notification_type = $4,
515                channel = $5,
516                priority = $6,
517                status = $7,
518                title = $8,
519                message = $9,
520                link_url = $10,
521                metadata = $11,
522                sent_at = $12,
523                read_at = $13,
524                error_message = $14
525            WHERE id = $1
526            RETURNING id, organization_id, user_id, notification_type, channel, priority, status,
527                      title, message, link_url, metadata, sent_at, read_at, created_at, error_message
528            "#,
529            notification.id,
530            notification.organization_id,
531            notification.user_id,
532            type_str,
533            channel_str,
534            priority_str,
535            status_str,
536            notification.title,
537            notification.message,
538            notification.link_url,
539            notification.metadata.as_ref().map(|m| serde_json::from_str::<serde_json::Value>(m).ok()).flatten(),
540            notification.sent_at,
541            notification.read_at,
542            notification.error_message
543        )
544        .fetch_one(&self.pool)
545        .await
546        .map_err(|e| format!("Database error updating notification: {}", e))?;
547
548        Ok(Notification {
549            id: row.id,
550            organization_id: row.organization_id,
551            user_id: row.user_id,
552            notification_type: Self::type_from_db(&row.notification_type)?,
553            channel: Self::channel_from_db(&row.channel)?,
554            priority: Self::priority_from_db(&row.priority)?,
555            status: Self::status_from_db(&row.status)?,
556            title: row.title,
557            message: row.message,
558            link_url: row.link_url,
559            metadata: row.metadata.map(|m| m.to_string()),
560            sent_at: row.sent_at,
561            read_at: row.read_at,
562            created_at: row.created_at,
563            error_message: row.error_message,
564        })
565    }
566
567    async fn delete(&self, id: Uuid) -> Result<bool, String> {
568        let result = sqlx::query!(
569            r#"
570            DELETE FROM notifications
571            WHERE id = $1
572            "#,
573            id
574        )
575        .execute(&self.pool)
576        .await
577        .map_err(|e| format!("Database error deleting notification: {}", e))?;
578
579        Ok(result.rows_affected() > 0)
580    }
581
582    async fn count_unread_by_user(&self, user_id: Uuid) -> Result<i64, String> {
583        let row = sqlx::query!(
584            r#"
585            SELECT COUNT(*) as count
586            FROM notifications
587            WHERE user_id = $1 AND channel = 'InApp' AND status = 'Sent' AND read_at IS NULL
588            "#,
589            user_id
590        )
591        .fetch_one(&self.pool)
592        .await
593        .map_err(|e| format!("Database error counting unread notifications: {}", e))?;
594
595        Ok(row.count.unwrap_or(0))
596    }
597
598    async fn count_by_user_and_status(
599        &self,
600        user_id: Uuid,
601        status: NotificationStatus,
602    ) -> Result<i64, String> {
603        let status_str = Self::status_to_db(&status);
604
605        let row = sqlx::query!(
606            r#"
607            SELECT COUNT(*) as count
608            FROM notifications
609            WHERE user_id = $1 AND status = $2
610            "#,
611            user_id,
612            status_str
613        )
614        .fetch_one(&self.pool)
615        .await
616        .map_err(|e| format!("Database error counting notifications by status: {}", e))?;
617
618        Ok(row.count.unwrap_or(0))
619    }
620
621    async fn mark_all_read_by_user(&self, user_id: Uuid) -> Result<i64, String> {
622        let now = chrono::Utc::now();
623
624        let result = sqlx::query!(
625            r#"
626            UPDATE notifications
627            SET status = 'Read',
628                read_at = $2
629            WHERE user_id = $1
630              AND channel = 'InApp'
631              AND status = 'Sent'
632              AND read_at IS NULL
633            "#,
634            user_id,
635            now
636        )
637        .execute(&self.pool)
638        .await
639        .map_err(|e| format!("Database error marking all notifications as read: {}", e))?;
640
641        Ok(result.rows_affected() as i64)
642    }
643
644    async fn delete_older_than(&self, days: i64) -> Result<i64, String> {
645        let threshold = chrono::Utc::now() - chrono::Duration::days(days);
646
647        let result = sqlx::query!(
648            r#"
649            DELETE FROM notifications
650            WHERE created_at < $1
651            "#,
652            threshold
653        )
654        .execute(&self.pool)
655        .await
656        .map_err(|e| format!("Database error deleting old notifications: {}", e))?;
657
658        Ok(result.rows_affected() as i64)
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use super::*;
665
666    #[test]
667    fn test_type_conversion() {
668        assert_eq!(
669            PostgresNotificationRepository::type_to_db(&NotificationType::ExpenseCreated),
670            "ExpenseCreated"
671        );
672        assert_eq!(
673            PostgresNotificationRepository::type_from_db("MeetingConvocation").unwrap(),
674            NotificationType::MeetingConvocation
675        );
676    }
677
678    #[test]
679    fn test_channel_conversion() {
680        assert_eq!(
681            PostgresNotificationRepository::channel_to_db(&NotificationChannel::Email),
682            "Email"
683        );
684        assert_eq!(
685            PostgresNotificationRepository::channel_from_db("InApp").unwrap(),
686            NotificationChannel::InApp
687        );
688    }
689
690    #[test]
691    fn test_priority_conversion() {
692        assert_eq!(
693            PostgresNotificationRepository::priority_to_db(&NotificationPriority::Critical),
694            "Critical"
695        );
696        assert_eq!(
697            PostgresNotificationRepository::priority_from_db("Low").unwrap(),
698            NotificationPriority::Low
699        );
700    }
701
702    #[test]
703    fn test_status_conversion() {
704        assert_eq!(
705            PostgresNotificationRepository::status_to_db(&NotificationStatus::Pending),
706            "Pending"
707        );
708        assert_eq!(
709            PostgresNotificationRepository::status_from_db("Sent").unwrap(),
710            NotificationStatus::Sent
711        );
712    }
713}