koprogo_api/infrastructure/database/repositories/
ag_session_repository_impl.rs

1use crate::application::ports::ag_session_repository::AgSessionRepository;
2use crate::domain::entities::ag_session::{AgSession, AgSessionStatus, VideoPlatform};
3use crate::infrastructure::database::pool::DbPool;
4use async_trait::async_trait;
5use sqlx::Row;
6use uuid::Uuid;
7
8pub struct PostgresAgSessionRepository {
9    pool: DbPool,
10}
11
12impl PostgresAgSessionRepository {
13    pub fn new(pool: DbPool) -> Self {
14        Self { pool }
15    }
16}
17
18fn row_to_ag_session(row: &sqlx::postgres::PgRow) -> AgSession {
19    let platform_str: String = row.get("platform");
20    let platform = VideoPlatform::from_db_string(&platform_str).unwrap_or(VideoPlatform::Other);
21
22    let status_str: String = row.get("status");
23    let status = AgSessionStatus::from_db_string(&status_str).unwrap_or(AgSessionStatus::Scheduled);
24
25    AgSession {
26        id: row.get("id"),
27        organization_id: row.get("organization_id"),
28        meeting_id: row.get("meeting_id"),
29        platform,
30        video_url: row.get("video_url"),
31        host_url: row.get("host_url"),
32        status,
33        scheduled_start: row.get("scheduled_start"),
34        actual_start: row.get("actual_start"),
35        actual_end: row.get("actual_end"),
36        remote_attendees_count: row.get("remote_attendees_count"),
37        remote_voting_power: row.get("remote_voting_power"),
38        quorum_remote_contribution: row.get("quorum_remote_contribution"),
39        access_password: row.get("access_password"),
40        waiting_room_enabled: row.get("waiting_room_enabled"),
41        recording_enabled: row.get("recording_enabled"),
42        recording_url: row.get("recording_url"),
43        created_at: row.get("created_at"),
44        updated_at: row.get("updated_at"),
45        created_by: row.get("created_by"),
46    }
47}
48
49#[async_trait]
50impl AgSessionRepository for PostgresAgSessionRepository {
51    async fn create(&self, session: &AgSession) -> Result<AgSession, String> {
52        sqlx::query(
53            r#"
54            INSERT INTO ag_sessions (
55                id, organization_id, meeting_id, platform, video_url, host_url,
56                status, scheduled_start, actual_start, actual_end,
57                remote_attendees_count, remote_voting_power, quorum_remote_contribution,
58                access_password, waiting_room_enabled, recording_enabled, recording_url,
59                created_at, updated_at, created_by
60            ) VALUES (
61                $1, $2, $3, $4::video_platform, $5, $6,
62                $7::ag_session_status, $8, $9, $10,
63                $11, $12, $13,
64                $14, $15, $16, $17,
65                $18, $19, $20
66            )
67            "#,
68        )
69        .bind(session.id)
70        .bind(session.organization_id)
71        .bind(session.meeting_id)
72        .bind(session.platform.to_db_str())
73        .bind(&session.video_url)
74        .bind(&session.host_url)
75        .bind(session.status.to_db_str())
76        .bind(session.scheduled_start)
77        .bind(session.actual_start)
78        .bind(session.actual_end)
79        .bind(session.remote_attendees_count)
80        .bind(session.remote_voting_power)
81        .bind(session.quorum_remote_contribution)
82        .bind(&session.access_password)
83        .bind(session.waiting_room_enabled)
84        .bind(session.recording_enabled)
85        .bind(&session.recording_url)
86        .bind(session.created_at)
87        .bind(session.updated_at)
88        .bind(session.created_by)
89        .execute(&self.pool)
90        .await
91        .map_err(|e| format!("Database error creating ag_session: {}", e))?;
92
93        Ok(session.clone())
94    }
95
96    async fn find_by_id(&self, id: Uuid) -> Result<Option<AgSession>, String> {
97        let row = sqlx::query(
98            r#"
99            SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
100                   status::TEXT, scheduled_start, actual_start, actual_end,
101                   remote_attendees_count, remote_voting_power::FLOAT8,
102                   quorum_remote_contribution::FLOAT8,
103                   access_password, waiting_room_enabled, recording_enabled, recording_url,
104                   created_at, updated_at, created_by
105            FROM ag_sessions
106            WHERE id = $1
107            "#,
108        )
109        .bind(id)
110        .fetch_optional(&self.pool)
111        .await
112        .map_err(|e| format!("Database error finding ag_session: {}", e))?;
113
114        Ok(row.as_ref().map(row_to_ag_session))
115    }
116
117    async fn find_by_meeting_id(&self, meeting_id: Uuid) -> Result<Option<AgSession>, String> {
118        let row = sqlx::query(
119            r#"
120            SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
121                   status::TEXT, scheduled_start, actual_start, actual_end,
122                   remote_attendees_count, remote_voting_power::FLOAT8,
123                   quorum_remote_contribution::FLOAT8,
124                   access_password, waiting_room_enabled, recording_enabled, recording_url,
125                   created_at, updated_at, created_by
126            FROM ag_sessions
127            WHERE meeting_id = $1
128            "#,
129        )
130        .bind(meeting_id)
131        .fetch_optional(&self.pool)
132        .await
133        .map_err(|e| format!("Database error finding ag_session by meeting: {}", e))?;
134
135        Ok(row.as_ref().map(row_to_ag_session))
136    }
137
138    async fn find_by_organization(&self, organization_id: Uuid) -> Result<Vec<AgSession>, String> {
139        let rows = sqlx::query(
140            r#"
141            SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
142                   status::TEXT, scheduled_start, actual_start, actual_end,
143                   remote_attendees_count, remote_voting_power::FLOAT8,
144                   quorum_remote_contribution::FLOAT8,
145                   access_password, waiting_room_enabled, recording_enabled, recording_url,
146                   created_at, updated_at, created_by
147            FROM ag_sessions
148            WHERE organization_id = $1
149            ORDER BY scheduled_start DESC
150            "#,
151        )
152        .bind(organization_id)
153        .fetch_all(&self.pool)
154        .await
155        .map_err(|e| format!("Database error listing ag_sessions: {}", e))?;
156
157        Ok(rows.iter().map(row_to_ag_session).collect())
158    }
159
160    async fn update(&self, session: &AgSession) -> Result<AgSession, String> {
161        sqlx::query(
162            r#"
163            UPDATE ag_sessions SET
164                platform = $2::video_platform,
165                video_url = $3,
166                host_url = $4,
167                status = $5::ag_session_status,
168                scheduled_start = $6,
169                actual_start = $7,
170                actual_end = $8,
171                remote_attendees_count = $9,
172                remote_voting_power = $10,
173                quorum_remote_contribution = $11,
174                access_password = $12,
175                waiting_room_enabled = $13,
176                recording_enabled = $14,
177                recording_url = $15,
178                updated_at = $16
179            WHERE id = $1
180            "#,
181        )
182        .bind(session.id)
183        .bind(session.platform.to_db_str())
184        .bind(&session.video_url)
185        .bind(&session.host_url)
186        .bind(session.status.to_db_str())
187        .bind(session.scheduled_start)
188        .bind(session.actual_start)
189        .bind(session.actual_end)
190        .bind(session.remote_attendees_count)
191        .bind(session.remote_voting_power)
192        .bind(session.quorum_remote_contribution)
193        .bind(&session.access_password)
194        .bind(session.waiting_room_enabled)
195        .bind(session.recording_enabled)
196        .bind(&session.recording_url)
197        .bind(session.updated_at)
198        .execute(&self.pool)
199        .await
200        .map_err(|e| format!("Database error updating ag_session: {}", e))?;
201
202        Ok(session.clone())
203    }
204
205    async fn delete(&self, id: Uuid) -> Result<bool, String> {
206        let result = sqlx::query("DELETE FROM ag_sessions WHERE id = $1")
207            .bind(id)
208            .execute(&self.pool)
209            .await
210            .map_err(|e| format!("Database error deleting ag_session: {}", e))?;
211
212        Ok(result.rows_affected() > 0)
213    }
214
215    async fn find_pending_start(&self) -> Result<Vec<AgSession>, String> {
216        let rows = sqlx::query(
217            r#"
218            SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
219                   status::TEXT, scheduled_start, actual_start, actual_end,
220                   remote_attendees_count, remote_voting_power::FLOAT8,
221                   quorum_remote_contribution::FLOAT8,
222                   access_password, waiting_room_enabled, recording_enabled, recording_url,
223                   created_at, updated_at, created_by
224            FROM ag_sessions
225            WHERE status = 'scheduled'
226              AND scheduled_start <= NOW()
227            ORDER BY scheduled_start ASC
228            "#,
229        )
230        .fetch_all(&self.pool)
231        .await
232        .map_err(|e| format!("Database error finding pending ag_sessions: {}", e))?;
233
234        Ok(rows.iter().map(row_to_ag_session).collect())
235    }
236}