1use crate::application::ports::ag_session_repository::AgSessionRepository;
2use crate::domain::entities::ag_session::{AgSession, AgSessionStatus, VideoPlatform};
3use crate::infrastructure::database::pool::DbPool;
4use async_trait::async_trait;
5use sqlx::Row;
6use uuid::Uuid;
7
8pub struct PostgresAgSessionRepository {
9 pool: DbPool,
10}
11
12impl PostgresAgSessionRepository {
13 pub fn new(pool: DbPool) -> Self {
14 Self { pool }
15 }
16}
17
18fn row_to_ag_session(row: &sqlx::postgres::PgRow) -> AgSession {
19 let platform_str: String = row.get("platform");
20 let platform = VideoPlatform::from_db_string(&platform_str).unwrap_or(VideoPlatform::Other);
21
22 let status_str: String = row.get("status");
23 let status = AgSessionStatus::from_db_string(&status_str).unwrap_or(AgSessionStatus::Scheduled);
24
25 AgSession {
26 id: row.get("id"),
27 organization_id: row.get("organization_id"),
28 meeting_id: row.get("meeting_id"),
29 platform,
30 video_url: row.get("video_url"),
31 host_url: row.get("host_url"),
32 status,
33 scheduled_start: row.get("scheduled_start"),
34 actual_start: row.get("actual_start"),
35 actual_end: row.get("actual_end"),
36 remote_attendees_count: row.get("remote_attendees_count"),
37 remote_voting_power: row.get("remote_voting_power"),
38 quorum_remote_contribution: row.get("quorum_remote_contribution"),
39 access_password: row.get("access_password"),
40 waiting_room_enabled: row.get("waiting_room_enabled"),
41 recording_enabled: row.get("recording_enabled"),
42 recording_url: row.get("recording_url"),
43 created_at: row.get("created_at"),
44 updated_at: row.get("updated_at"),
45 created_by: row.get("created_by"),
46 }
47}
48
49#[async_trait]
50impl AgSessionRepository for PostgresAgSessionRepository {
51 async fn create(&self, session: &AgSession) -> Result<AgSession, String> {
52 sqlx::query(
53 r#"
54 INSERT INTO ag_sessions (
55 id, organization_id, meeting_id, platform, video_url, host_url,
56 status, scheduled_start, actual_start, actual_end,
57 remote_attendees_count, remote_voting_power, quorum_remote_contribution,
58 access_password, waiting_room_enabled, recording_enabled, recording_url,
59 created_at, updated_at, created_by
60 ) VALUES (
61 $1, $2, $3, $4::video_platform, $5, $6,
62 $7::ag_session_status, $8, $9, $10,
63 $11, $12, $13,
64 $14, $15, $16, $17,
65 $18, $19, $20
66 )
67 "#,
68 )
69 .bind(session.id)
70 .bind(session.organization_id)
71 .bind(session.meeting_id)
72 .bind(session.platform.to_db_str())
73 .bind(&session.video_url)
74 .bind(&session.host_url)
75 .bind(session.status.to_db_str())
76 .bind(session.scheduled_start)
77 .bind(session.actual_start)
78 .bind(session.actual_end)
79 .bind(session.remote_attendees_count)
80 .bind(session.remote_voting_power)
81 .bind(session.quorum_remote_contribution)
82 .bind(&session.access_password)
83 .bind(session.waiting_room_enabled)
84 .bind(session.recording_enabled)
85 .bind(&session.recording_url)
86 .bind(session.created_at)
87 .bind(session.updated_at)
88 .bind(session.created_by)
89 .execute(&self.pool)
90 .await
91 .map_err(|e| format!("Database error creating ag_session: {}", e))?;
92
93 Ok(session.clone())
94 }
95
96 async fn find_by_id(&self, id: Uuid) -> Result<Option<AgSession>, String> {
97 let row = sqlx::query(
98 r#"
99 SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
100 status::TEXT, scheduled_start, actual_start, actual_end,
101 remote_attendees_count, remote_voting_power::FLOAT8,
102 quorum_remote_contribution::FLOAT8,
103 access_password, waiting_room_enabled, recording_enabled, recording_url,
104 created_at, updated_at, created_by
105 FROM ag_sessions
106 WHERE id = $1
107 "#,
108 )
109 .bind(id)
110 .fetch_optional(&self.pool)
111 .await
112 .map_err(|e| format!("Database error finding ag_session: {}", e))?;
113
114 Ok(row.as_ref().map(row_to_ag_session))
115 }
116
117 async fn find_by_meeting_id(&self, meeting_id: Uuid) -> Result<Option<AgSession>, String> {
118 let row = sqlx::query(
119 r#"
120 SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
121 status::TEXT, scheduled_start, actual_start, actual_end,
122 remote_attendees_count, remote_voting_power::FLOAT8,
123 quorum_remote_contribution::FLOAT8,
124 access_password, waiting_room_enabled, recording_enabled, recording_url,
125 created_at, updated_at, created_by
126 FROM ag_sessions
127 WHERE meeting_id = $1
128 "#,
129 )
130 .bind(meeting_id)
131 .fetch_optional(&self.pool)
132 .await
133 .map_err(|e| format!("Database error finding ag_session by meeting: {}", e))?;
134
135 Ok(row.as_ref().map(row_to_ag_session))
136 }
137
138 async fn find_by_organization(&self, organization_id: Uuid) -> Result<Vec<AgSession>, String> {
139 let rows = sqlx::query(
140 r#"
141 SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
142 status::TEXT, scheduled_start, actual_start, actual_end,
143 remote_attendees_count, remote_voting_power::FLOAT8,
144 quorum_remote_contribution::FLOAT8,
145 access_password, waiting_room_enabled, recording_enabled, recording_url,
146 created_at, updated_at, created_by
147 FROM ag_sessions
148 WHERE organization_id = $1
149 ORDER BY scheduled_start DESC
150 "#,
151 )
152 .bind(organization_id)
153 .fetch_all(&self.pool)
154 .await
155 .map_err(|e| format!("Database error listing ag_sessions: {}", e))?;
156
157 Ok(rows.iter().map(row_to_ag_session).collect())
158 }
159
160 async fn update(&self, session: &AgSession) -> Result<AgSession, String> {
161 sqlx::query(
162 r#"
163 UPDATE ag_sessions SET
164 platform = $2::video_platform,
165 video_url = $3,
166 host_url = $4,
167 status = $5::ag_session_status,
168 scheduled_start = $6,
169 actual_start = $7,
170 actual_end = $8,
171 remote_attendees_count = $9,
172 remote_voting_power = $10,
173 quorum_remote_contribution = $11,
174 access_password = $12,
175 waiting_room_enabled = $13,
176 recording_enabled = $14,
177 recording_url = $15,
178 updated_at = $16
179 WHERE id = $1
180 "#,
181 )
182 .bind(session.id)
183 .bind(session.platform.to_db_str())
184 .bind(&session.video_url)
185 .bind(&session.host_url)
186 .bind(session.status.to_db_str())
187 .bind(session.scheduled_start)
188 .bind(session.actual_start)
189 .bind(session.actual_end)
190 .bind(session.remote_attendees_count)
191 .bind(session.remote_voting_power)
192 .bind(session.quorum_remote_contribution)
193 .bind(&session.access_password)
194 .bind(session.waiting_room_enabled)
195 .bind(session.recording_enabled)
196 .bind(&session.recording_url)
197 .bind(session.updated_at)
198 .execute(&self.pool)
199 .await
200 .map_err(|e| format!("Database error updating ag_session: {}", e))?;
201
202 Ok(session.clone())
203 }
204
205 async fn delete(&self, id: Uuid) -> Result<bool, String> {
206 let result = sqlx::query("DELETE FROM ag_sessions WHERE id = $1")
207 .bind(id)
208 .execute(&self.pool)
209 .await
210 .map_err(|e| format!("Database error deleting ag_session: {}", e))?;
211
212 Ok(result.rows_affected() > 0)
213 }
214
215 async fn find_pending_start(&self) -> Result<Vec<AgSession>, String> {
216 let rows = sqlx::query(
217 r#"
218 SELECT id, organization_id, meeting_id, platform::TEXT, video_url, host_url,
219 status::TEXT, scheduled_start, actual_start, actual_end,
220 remote_attendees_count, remote_voting_power::FLOAT8,
221 quorum_remote_contribution::FLOAT8,
222 access_password, waiting_room_enabled, recording_enabled, recording_url,
223 created_at, updated_at, created_by
224 FROM ag_sessions
225 WHERE status = 'scheduled'
226 AND scheduled_start <= NOW()
227 ORDER BY scheduled_start ASC
228 "#,
229 )
230 .fetch_all(&self.pool)
231 .await
232 .map_err(|e| format!("Database error finding pending ag_sessions: {}", e))?;
233
234 Ok(rows.iter().map(row_to_ag_session).collect())
235 }
236}