koprogo_api/infrastructure/database/repositories/
user_repository_impl.rs

1use crate::application::ports::UserRepository;
2use crate::domain::entities::{User, UserRole};
3use crate::infrastructure::pool::DbPool;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use sqlx::Row;
7use uuid::Uuid;
8
9pub struct PostgresUserRepository {
10    pool: DbPool,
11}
12
13impl PostgresUserRepository {
14    pub fn new(pool: DbPool) -> Self {
15        Self { pool }
16    }
17}
18
19const USER_COLUMNS: &str = "id, email, password_hash, first_name, last_name, role, organization_id, is_active, processing_restricted, processing_restricted_at, marketing_opt_out, marketing_opt_out_at, created_at, updated_at";
20
21fn row_to_user(row: &sqlx::postgres::PgRow) -> Result<User, String> {
22    let role_str: String = row.get("role");
23    let role = role_str
24        .parse::<UserRole>()
25        .map_err(|e| format!("Invalid role: {}", e))?;
26
27    Ok(User {
28        id: row.get("id"),
29        email: row.get("email"),
30        password_hash: row.get("password_hash"),
31        first_name: row.get("first_name"),
32        last_name: row.get("last_name"),
33        role,
34        organization_id: row.get("organization_id"),
35        is_active: row.get("is_active"),
36        processing_restricted: row.get("processing_restricted"),
37        processing_restricted_at: row.get::<Option<DateTime<Utc>>, _>("processing_restricted_at"),
38        marketing_opt_out: row.get("marketing_opt_out"),
39        marketing_opt_out_at: row.get::<Option<DateTime<Utc>>, _>("marketing_opt_out_at"),
40        created_at: row.get("created_at"),
41        updated_at: row.get("updated_at"),
42    })
43}
44
45#[async_trait]
46impl UserRepository for PostgresUserRepository {
47    async fn create(&self, user: &User) -> Result<User, String> {
48        sqlx::query!(
49            r#"
50            INSERT INTO users (id, email, password_hash, first_name, last_name, role, organization_id, is_active, created_at, updated_at)
51            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
52            "#,
53            user.id,
54            user.email,
55            user.password_hash,
56            user.first_name,
57            user.last_name,
58            user.role.to_string(),
59            user.organization_id,
60            user.is_active,
61            user.created_at,
62            user.updated_at,
63        )
64        .execute(&self.pool)
65        .await
66        .map_err(|e| {
67            if let sqlx::Error::Database(ref db_err) = e {
68                if db_err.is_unique_violation() {
69                    return "email_exists".to_string();
70                }
71            }
72            format!("Failed to create user: {}", e)
73        })?;
74
75        Ok(user.clone())
76    }
77
78    async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, String> {
79        let sql = format!("SELECT {} FROM users WHERE id = $1", USER_COLUMNS);
80        let result = sqlx::query(&sql)
81            .bind(id)
82            .fetch_optional(&self.pool)
83            .await
84            .map_err(|e| format!("Failed to find user: {}", e))?;
85
86        match result {
87            Some(row) => Ok(Some(row_to_user(&row)?)),
88            None => Ok(None),
89        }
90    }
91
92    async fn find_by_email(&self, email: &str) -> Result<Option<User>, String> {
93        let sql = format!("SELECT {} FROM users WHERE email = $1", USER_COLUMNS);
94        let result = sqlx::query(&sql)
95            .bind(email)
96            .fetch_optional(&self.pool)
97            .await
98            .map_err(|e| format!("Failed to find user by email: {}", e))?;
99
100        match result {
101            Some(row) => Ok(Some(row_to_user(&row)?)),
102            None => Ok(None),
103        }
104    }
105
106    async fn find_all(&self) -> Result<Vec<User>, String> {
107        let sql = format!(
108            "SELECT {} FROM users ORDER BY created_at DESC",
109            USER_COLUMNS
110        );
111        let rows = sqlx::query(&sql)
112            .fetch_all(&self.pool)
113            .await
114            .map_err(|e| format!("Failed to fetch users: {}", e))?;
115
116        let mut users = Vec::new();
117        for row in &rows {
118            if let Ok(user) = row_to_user(row) {
119                users.push(user);
120            }
121        }
122
123        Ok(users)
124    }
125
126    async fn find_by_organization(&self, org_id: Uuid) -> Result<Vec<User>, String> {
127        let sql = format!(
128            "SELECT {} FROM users WHERE organization_id = $1 ORDER BY created_at DESC",
129            USER_COLUMNS
130        );
131        let rows = sqlx::query(&sql)
132            .bind(org_id)
133            .fetch_all(&self.pool)
134            .await
135            .map_err(|e| format!("Failed to fetch users by organization: {}", e))?;
136
137        let mut users = Vec::new();
138        for row in &rows {
139            if let Ok(user) = row_to_user(row) {
140                users.push(user);
141            }
142        }
143
144        Ok(users)
145    }
146
147    async fn update(&self, user: &User) -> Result<User, String> {
148        sqlx::query(
149            r#"
150            UPDATE users
151            SET email = $2, first_name = $3, last_name = $4, role = $5,
152                organization_id = $6, is_active = $7,
153                processing_restricted = $8, processing_restricted_at = $9,
154                marketing_opt_out = $10, marketing_opt_out_at = $11,
155                updated_at = $12
156            WHERE id = $1
157            "#,
158        )
159        .bind(user.id)
160        .bind(&user.email)
161        .bind(&user.first_name)
162        .bind(&user.last_name)
163        .bind(user.role.to_string())
164        .bind(user.organization_id)
165        .bind(user.is_active)
166        .bind(user.processing_restricted)
167        .bind(user.processing_restricted_at)
168        .bind(user.marketing_opt_out)
169        .bind(user.marketing_opt_out_at)
170        .bind(user.updated_at)
171        .execute(&self.pool)
172        .await
173        .map_err(|e| {
174            if let sqlx::Error::Database(ref db_err) = e {
175                if db_err.is_unique_violation() {
176                    return "email_exists".to_string();
177                }
178            }
179            format!("Failed to update user: {}", e)
180        })?;
181
182        Ok(user.clone())
183    }
184
185    async fn update_password(&self, id: Uuid, password_hash: &str) -> Result<bool, String> {
186        let result =
187            sqlx::query("UPDATE users SET password_hash = $1, updated_at = NOW() WHERE id = $2")
188                .bind(password_hash)
189                .bind(id)
190                .execute(&self.pool)
191                .await
192                .map_err(|e| format!("Failed to update password: {}", e))?;
193
194        Ok(result.rows_affected() > 0)
195    }
196
197    async fn activate(&self, id: Uuid) -> Result<Option<User>, String> {
198        let result = sqlx::query(
199            "UPDATE users SET is_active = true, updated_at = NOW() WHERE id = $1 RETURNING id",
200        )
201        .bind(id)
202        .fetch_optional(&self.pool)
203        .await
204        .map_err(|e| format!("Failed to activate user: {}", e))?;
205
206        if result.is_none() {
207            return Ok(None);
208        }
209        self.find_by_id(id).await
210    }
211
212    async fn deactivate(&self, id: Uuid) -> Result<Option<User>, String> {
213        let result = sqlx::query(
214            "UPDATE users SET is_active = false, updated_at = NOW() WHERE id = $1 RETURNING id",
215        )
216        .bind(id)
217        .fetch_optional(&self.pool)
218        .await
219        .map_err(|e| format!("Failed to deactivate user: {}", e))?;
220
221        if result.is_none() {
222            return Ok(None);
223        }
224        self.find_by_id(id).await
225    }
226
227    async fn delete(&self, id: Uuid) -> Result<bool, String> {
228        let result = sqlx::query!(
229            r#"
230            DELETE FROM users
231            WHERE id = $1
232            "#,
233            id
234        )
235        .execute(&self.pool)
236        .await
237        .map_err(|e| format!("Failed to delete user: {}", e))?;
238
239        Ok(result.rows_affected() > 0)
240    }
241
242    async fn count_by_organization(&self, org_id: Uuid) -> Result<i64, String> {
243        let result = sqlx::query!(
244            r#"
245            SELECT COUNT(*) as count
246            FROM users
247            WHERE organization_id = $1
248            "#,
249            org_id
250        )
251        .fetch_one(&self.pool)
252        .await
253        .map_err(|e| format!("Failed to count users: {}", e))?;
254
255        Ok(result.count.unwrap_or(0))
256    }
257}