koprogo_api/infrastructure/database/repositories/
user_repository_impl.rs1use crate::application::ports::UserRepository;
2use crate::domain::entities::{User, UserRole};
3use crate::infrastructure::pool::DbPool;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use sqlx::Row;
7use uuid::Uuid;
8
9pub struct PostgresUserRepository {
10 pool: DbPool,
11}
12
13impl PostgresUserRepository {
14 pub fn new(pool: DbPool) -> Self {
15 Self { pool }
16 }
17}
18
/// Projection used by every `SELECT` whose rows are decoded by `row_to_user`.
///
/// Keeping the column list in one place keeps the queries and the decoder in
/// sync. The trailing `\` continuations strip the following line's leading
/// whitespace, so the value is a single comma-separated line.
const USER_COLUMNS: &str = "id, email, password_hash, first_name, last_name, role, \
    organization_id, is_active, \
    processing_restricted, processing_restricted_at, \
    marketing_opt_out, marketing_opt_out_at, \
    created_at, updated_at";
20
21fn row_to_user(row: &sqlx::postgres::PgRow) -> Result<User, String> {
22 let role_str: String = row.get("role");
23 let role = role_str
24 .parse::<UserRole>()
25 .map_err(|e| format!("Invalid role: {}", e))?;
26
27 Ok(User {
28 id: row.get("id"),
29 email: row.get("email"),
30 password_hash: row.get("password_hash"),
31 first_name: row.get("first_name"),
32 last_name: row.get("last_name"),
33 role,
34 organization_id: row.get("organization_id"),
35 is_active: row.get("is_active"),
36 processing_restricted: row.get("processing_restricted"),
37 processing_restricted_at: row.get::<Option<DateTime<Utc>>, _>("processing_restricted_at"),
38 marketing_opt_out: row.get("marketing_opt_out"),
39 marketing_opt_out_at: row.get::<Option<DateTime<Utc>>, _>("marketing_opt_out_at"),
40 created_at: row.get("created_at"),
41 updated_at: row.get("updated_at"),
42 })
43}
44
45#[async_trait]
46impl UserRepository for PostgresUserRepository {
47 async fn create(&self, user: &User) -> Result<User, String> {
48 sqlx::query!(
49 r#"
50 INSERT INTO users (id, email, password_hash, first_name, last_name, role, organization_id, is_active, created_at, updated_at)
51 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
52 "#,
53 user.id,
54 user.email,
55 user.password_hash,
56 user.first_name,
57 user.last_name,
58 user.role.to_string(),
59 user.organization_id,
60 user.is_active,
61 user.created_at,
62 user.updated_at,
63 )
64 .execute(&self.pool)
65 .await
66 .map_err(|e| format!("Failed to create user: {}", e))?;
67
68 Ok(user.clone())
69 }
70
71 async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, String> {
72 let sql = format!("SELECT {} FROM users WHERE id = $1", USER_COLUMNS);
73 let result = sqlx::query(&sql)
74 .bind(id)
75 .fetch_optional(&self.pool)
76 .await
77 .map_err(|e| format!("Failed to find user: {}", e))?;
78
79 match result {
80 Some(row) => Ok(Some(row_to_user(&row)?)),
81 None => Ok(None),
82 }
83 }
84
85 async fn find_by_email(&self, email: &str) -> Result<Option<User>, String> {
86 let sql = format!("SELECT {} FROM users WHERE email = $1", USER_COLUMNS);
87 let result = sqlx::query(&sql)
88 .bind(email)
89 .fetch_optional(&self.pool)
90 .await
91 .map_err(|e| format!("Failed to find user by email: {}", e))?;
92
93 match result {
94 Some(row) => Ok(Some(row_to_user(&row)?)),
95 None => Ok(None),
96 }
97 }
98
99 async fn find_all(&self) -> Result<Vec<User>, String> {
100 let sql = format!(
101 "SELECT {} FROM users ORDER BY created_at DESC",
102 USER_COLUMNS
103 );
104 let rows = sqlx::query(&sql)
105 .fetch_all(&self.pool)
106 .await
107 .map_err(|e| format!("Failed to fetch users: {}", e))?;
108
109 let mut users = Vec::new();
110 for row in &rows {
111 if let Ok(user) = row_to_user(row) {
112 users.push(user);
113 }
114 }
115
116 Ok(users)
117 }
118
119 async fn find_by_organization(&self, org_id: Uuid) -> Result<Vec<User>, String> {
120 let sql = format!(
121 "SELECT {} FROM users WHERE organization_id = $1 ORDER BY created_at DESC",
122 USER_COLUMNS
123 );
124 let rows = sqlx::query(&sql)
125 .bind(org_id)
126 .fetch_all(&self.pool)
127 .await
128 .map_err(|e| format!("Failed to fetch users by organization: {}", e))?;
129
130 let mut users = Vec::new();
131 for row in &rows {
132 if let Ok(user) = row_to_user(row) {
133 users.push(user);
134 }
135 }
136
137 Ok(users)
138 }
139
140 async fn update(&self, user: &User) -> Result<User, String> {
141 sqlx::query(
142 r#"
143 UPDATE users
144 SET email = $2, first_name = $3, last_name = $4, role = $5,
145 organization_id = $6, is_active = $7,
146 processing_restricted = $8, processing_restricted_at = $9,
147 marketing_opt_out = $10, marketing_opt_out_at = $11,
148 updated_at = $12
149 WHERE id = $1
150 "#,
151 )
152 .bind(user.id)
153 .bind(&user.email)
154 .bind(&user.first_name)
155 .bind(&user.last_name)
156 .bind(user.role.to_string())
157 .bind(user.organization_id)
158 .bind(user.is_active)
159 .bind(user.processing_restricted)
160 .bind(user.processing_restricted_at)
161 .bind(user.marketing_opt_out)
162 .bind(user.marketing_opt_out_at)
163 .bind(user.updated_at)
164 .execute(&self.pool)
165 .await
166 .map_err(|e| format!("Failed to update user: {}", e))?;
167
168 Ok(user.clone())
169 }
170
171 async fn delete(&self, id: Uuid) -> Result<bool, String> {
172 let result = sqlx::query!(
173 r#"
174 DELETE FROM users
175 WHERE id = $1
176 "#,
177 id
178 )
179 .execute(&self.pool)
180 .await
181 .map_err(|e| format!("Failed to delete user: {}", e))?;
182
183 Ok(result.rows_affected() > 0)
184 }
185
186 async fn count_by_organization(&self, org_id: Uuid) -> Result<i64, String> {
187 let result = sqlx::query!(
188 r#"
189 SELECT COUNT(*) as count
190 FROM users
191 WHERE organization_id = $1
192 "#,
193 org_id
194 )
195 .fetch_one(&self.pool)
196 .await
197 .map_err(|e| format!("Failed to count users: {}", e))?;
198
199 Ok(result.count.unwrap_or(0))
200 }
201}