koprogo_api/infrastructure/database/repositories/
user_repository_impl.rs1use crate::application::ports::UserRepository;
2use crate::domain::entities::{User, UserRole};
3use crate::infrastructure::pool::DbPool;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use sqlx::Row;
7use uuid::Uuid;
8
9pub struct PostgresUserRepository {
10 pool: DbPool,
11}
12
13impl PostgresUserRepository {
14 pub fn new(pool: DbPool) -> Self {
15 Self { pool }
16 }
17}
18
/// Comma-separated column list shared by every `SELECT` that feeds
/// `row_to_user`, so the selected columns and the row mapper stay in sync.
///
/// Built with `concat!` purely for readability; the resulting string is a
/// single line with `", "` between column names.
const USER_COLUMNS: &str = concat!(
    // Identity and profile columns.
    "id, email, password_hash, first_name, last_name, role, organization_id, is_active, ",
    // Processing-restriction flag and its timestamp.
    "processing_restricted, processing_restricted_at, ",
    // Marketing opt-out flag and its timestamp.
    "marketing_opt_out, marketing_opt_out_at, ",
    // Audit timestamps.
    "created_at, updated_at"
);
20
21fn row_to_user(row: &sqlx::postgres::PgRow) -> Result<User, String> {
22 let role_str: String = row.get("role");
23 let role = role_str
24 .parse::<UserRole>()
25 .map_err(|e| format!("Invalid role: {}", e))?;
26
27 Ok(User {
28 id: row.get("id"),
29 email: row.get("email"),
30 password_hash: row.get("password_hash"),
31 first_name: row.get("first_name"),
32 last_name: row.get("last_name"),
33 role,
34 organization_id: row.get("organization_id"),
35 is_active: row.get("is_active"),
36 processing_restricted: row.get("processing_restricted"),
37 processing_restricted_at: row.get::<Option<DateTime<Utc>>, _>("processing_restricted_at"),
38 marketing_opt_out: row.get("marketing_opt_out"),
39 marketing_opt_out_at: row.get::<Option<DateTime<Utc>>, _>("marketing_opt_out_at"),
40 created_at: row.get("created_at"),
41 updated_at: row.get("updated_at"),
42 })
43}
44
45#[async_trait]
46impl UserRepository for PostgresUserRepository {
47 async fn create(&self, user: &User) -> Result<User, String> {
48 sqlx::query!(
49 r#"
50 INSERT INTO users (id, email, password_hash, first_name, last_name, role, organization_id, is_active, created_at, updated_at)
51 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
52 "#,
53 user.id,
54 user.email,
55 user.password_hash,
56 user.first_name,
57 user.last_name,
58 user.role.to_string(),
59 user.organization_id,
60 user.is_active,
61 user.created_at,
62 user.updated_at,
63 )
64 .execute(&self.pool)
65 .await
66 .map_err(|e| {
67 if let sqlx::Error::Database(ref db_err) = e {
68 if db_err.is_unique_violation() {
69 return "email_exists".to_string();
70 }
71 }
72 format!("Failed to create user: {}", e)
73 })?;
74
75 Ok(user.clone())
76 }
77
78 async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, String> {
79 let sql = format!("SELECT {} FROM users WHERE id = $1", USER_COLUMNS);
80 let result = sqlx::query(&sql)
81 .bind(id)
82 .fetch_optional(&self.pool)
83 .await
84 .map_err(|e| format!("Failed to find user: {}", e))?;
85
86 match result {
87 Some(row) => Ok(Some(row_to_user(&row)?)),
88 None => Ok(None),
89 }
90 }
91
92 async fn find_by_email(&self, email: &str) -> Result<Option<User>, String> {
93 let sql = format!("SELECT {} FROM users WHERE email = $1", USER_COLUMNS);
94 let result = sqlx::query(&sql)
95 .bind(email)
96 .fetch_optional(&self.pool)
97 .await
98 .map_err(|e| format!("Failed to find user by email: {}", e))?;
99
100 match result {
101 Some(row) => Ok(Some(row_to_user(&row)?)),
102 None => Ok(None),
103 }
104 }
105
106 async fn find_all(&self) -> Result<Vec<User>, String> {
107 let sql = format!(
108 "SELECT {} FROM users ORDER BY created_at DESC",
109 USER_COLUMNS
110 );
111 let rows = sqlx::query(&sql)
112 .fetch_all(&self.pool)
113 .await
114 .map_err(|e| format!("Failed to fetch users: {}", e))?;
115
116 let mut users = Vec::new();
117 for row in &rows {
118 if let Ok(user) = row_to_user(row) {
119 users.push(user);
120 }
121 }
122
123 Ok(users)
124 }
125
126 async fn find_by_organization(&self, org_id: Uuid) -> Result<Vec<User>, String> {
127 let sql = format!(
128 "SELECT {} FROM users WHERE organization_id = $1 ORDER BY created_at DESC",
129 USER_COLUMNS
130 );
131 let rows = sqlx::query(&sql)
132 .bind(org_id)
133 .fetch_all(&self.pool)
134 .await
135 .map_err(|e| format!("Failed to fetch users by organization: {}", e))?;
136
137 let mut users = Vec::new();
138 for row in &rows {
139 if let Ok(user) = row_to_user(row) {
140 users.push(user);
141 }
142 }
143
144 Ok(users)
145 }
146
147 async fn update(&self, user: &User) -> Result<User, String> {
148 sqlx::query(
149 r#"
150 UPDATE users
151 SET email = $2, first_name = $3, last_name = $4, role = $5,
152 organization_id = $6, is_active = $7,
153 processing_restricted = $8, processing_restricted_at = $9,
154 marketing_opt_out = $10, marketing_opt_out_at = $11,
155 updated_at = $12
156 WHERE id = $1
157 "#,
158 )
159 .bind(user.id)
160 .bind(&user.email)
161 .bind(&user.first_name)
162 .bind(&user.last_name)
163 .bind(user.role.to_string())
164 .bind(user.organization_id)
165 .bind(user.is_active)
166 .bind(user.processing_restricted)
167 .bind(user.processing_restricted_at)
168 .bind(user.marketing_opt_out)
169 .bind(user.marketing_opt_out_at)
170 .bind(user.updated_at)
171 .execute(&self.pool)
172 .await
173 .map_err(|e| {
174 if let sqlx::Error::Database(ref db_err) = e {
175 if db_err.is_unique_violation() {
176 return "email_exists".to_string();
177 }
178 }
179 format!("Failed to update user: {}", e)
180 })?;
181
182 Ok(user.clone())
183 }
184
185 async fn update_password(&self, id: Uuid, password_hash: &str) -> Result<bool, String> {
186 let result =
187 sqlx::query("UPDATE users SET password_hash = $1, updated_at = NOW() WHERE id = $2")
188 .bind(password_hash)
189 .bind(id)
190 .execute(&self.pool)
191 .await
192 .map_err(|e| format!("Failed to update password: {}", e))?;
193
194 Ok(result.rows_affected() > 0)
195 }
196
197 async fn activate(&self, id: Uuid) -> Result<Option<User>, String> {
198 let result = sqlx::query(
199 "UPDATE users SET is_active = true, updated_at = NOW() WHERE id = $1 RETURNING id",
200 )
201 .bind(id)
202 .fetch_optional(&self.pool)
203 .await
204 .map_err(|e| format!("Failed to activate user: {}", e))?;
205
206 if result.is_none() {
207 return Ok(None);
208 }
209 self.find_by_id(id).await
210 }
211
212 async fn deactivate(&self, id: Uuid) -> Result<Option<User>, String> {
213 let result = sqlx::query(
214 "UPDATE users SET is_active = false, updated_at = NOW() WHERE id = $1 RETURNING id",
215 )
216 .bind(id)
217 .fetch_optional(&self.pool)
218 .await
219 .map_err(|e| format!("Failed to deactivate user: {}", e))?;
220
221 if result.is_none() {
222 return Ok(None);
223 }
224 self.find_by_id(id).await
225 }
226
227 async fn delete(&self, id: Uuid) -> Result<bool, String> {
228 let result = sqlx::query!(
229 r#"
230 DELETE FROM users
231 WHERE id = $1
232 "#,
233 id
234 )
235 .execute(&self.pool)
236 .await
237 .map_err(|e| format!("Failed to delete user: {}", e))?;
238
239 Ok(result.rows_affected() > 0)
240 }
241
242 async fn count_by_organization(&self, org_id: Uuid) -> Result<i64, String> {
243 let result = sqlx::query!(
244 r#"
245 SELECT COUNT(*) as count
246 FROM users
247 WHERE organization_id = $1
248 "#,
249 org_id
250 )
251 .fetch_one(&self.pool)
252 .await
253 .map_err(|e| format!("Failed to count users: {}", e))?;
254
255 Ok(result.count.unwrap_or(0))
256 }
257}