koprogo_api/infrastructure/database/repositories/
user_repository_impl.rs

1use crate::application::ports::UserRepository;
2use crate::domain::entities::{User, UserRole};
3use crate::infrastructure::pool::DbPool;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use sqlx::Row;
7use uuid::Uuid;
8
/// PostgreSQL-backed implementation of the [`UserRepository`] port.
///
/// Every query issued by this repository runs against the wrapped pool.
pub struct PostgresUserRepository {
    /// Connection pool the queries are executed on.
    pool: DbPool,
}
12
13impl PostgresUserRepository {
14    pub fn new(pool: DbPool) -> Self {
15        Self { pool }
16    }
17}
18
/// Comma-separated column list shared by every `SELECT` that feeds `row_to_user`.
///
/// Keeping the list in one place ensures each query returns exactly the columns
/// the row mapper reads.
const USER_COLUMNS: &str = concat!(
    "id, email, password_hash, first_name, last_name, role, ",
    "organization_id, is_active, ",
    "processing_restricted, processing_restricted_at, ",
    "marketing_opt_out, marketing_opt_out_at, ",
    "created_at, updated_at",
);
20
21fn row_to_user(row: &sqlx::postgres::PgRow) -> Result<User, String> {
22    let role_str: String = row.get("role");
23    let role = role_str
24        .parse::<UserRole>()
25        .map_err(|e| format!("Invalid role: {}", e))?;
26
27    Ok(User {
28        id: row.get("id"),
29        email: row.get("email"),
30        password_hash: row.get("password_hash"),
31        first_name: row.get("first_name"),
32        last_name: row.get("last_name"),
33        role,
34        organization_id: row.get("organization_id"),
35        is_active: row.get("is_active"),
36        processing_restricted: row.get("processing_restricted"),
37        processing_restricted_at: row.get::<Option<DateTime<Utc>>, _>("processing_restricted_at"),
38        marketing_opt_out: row.get("marketing_opt_out"),
39        marketing_opt_out_at: row.get::<Option<DateTime<Utc>>, _>("marketing_opt_out_at"),
40        created_at: row.get("created_at"),
41        updated_at: row.get("updated_at"),
42    })
43}
44
45#[async_trait]
46impl UserRepository for PostgresUserRepository {
47    async fn create(&self, user: &User) -> Result<User, String> {
48        sqlx::query!(
49            r#"
50            INSERT INTO users (id, email, password_hash, first_name, last_name, role, organization_id, is_active, created_at, updated_at)
51            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
52            "#,
53            user.id,
54            user.email,
55            user.password_hash,
56            user.first_name,
57            user.last_name,
58            user.role.to_string(),
59            user.organization_id,
60            user.is_active,
61            user.created_at,
62            user.updated_at,
63        )
64        .execute(&self.pool)
65        .await
66        .map_err(|e| format!("Failed to create user: {}", e))?;
67
68        Ok(user.clone())
69    }
70
71    async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, String> {
72        let sql = format!("SELECT {} FROM users WHERE id = $1", USER_COLUMNS);
73        let result = sqlx::query(&sql)
74            .bind(id)
75            .fetch_optional(&self.pool)
76            .await
77            .map_err(|e| format!("Failed to find user: {}", e))?;
78
79        match result {
80            Some(row) => Ok(Some(row_to_user(&row)?)),
81            None => Ok(None),
82        }
83    }
84
85    async fn find_by_email(&self, email: &str) -> Result<Option<User>, String> {
86        let sql = format!("SELECT {} FROM users WHERE email = $1", USER_COLUMNS);
87        let result = sqlx::query(&sql)
88            .bind(email)
89            .fetch_optional(&self.pool)
90            .await
91            .map_err(|e| format!("Failed to find user by email: {}", e))?;
92
93        match result {
94            Some(row) => Ok(Some(row_to_user(&row)?)),
95            None => Ok(None),
96        }
97    }
98
99    async fn find_all(&self) -> Result<Vec<User>, String> {
100        let sql = format!(
101            "SELECT {} FROM users ORDER BY created_at DESC",
102            USER_COLUMNS
103        );
104        let rows = sqlx::query(&sql)
105            .fetch_all(&self.pool)
106            .await
107            .map_err(|e| format!("Failed to fetch users: {}", e))?;
108
109        let mut users = Vec::new();
110        for row in &rows {
111            if let Ok(user) = row_to_user(row) {
112                users.push(user);
113            }
114        }
115
116        Ok(users)
117    }
118
119    async fn find_by_organization(&self, org_id: Uuid) -> Result<Vec<User>, String> {
120        let sql = format!(
121            "SELECT {} FROM users WHERE organization_id = $1 ORDER BY created_at DESC",
122            USER_COLUMNS
123        );
124        let rows = sqlx::query(&sql)
125            .bind(org_id)
126            .fetch_all(&self.pool)
127            .await
128            .map_err(|e| format!("Failed to fetch users by organization: {}", e))?;
129
130        let mut users = Vec::new();
131        for row in &rows {
132            if let Ok(user) = row_to_user(row) {
133                users.push(user);
134            }
135        }
136
137        Ok(users)
138    }
139
140    async fn update(&self, user: &User) -> Result<User, String> {
141        sqlx::query(
142            r#"
143            UPDATE users
144            SET email = $2, first_name = $3, last_name = $4, role = $5,
145                organization_id = $6, is_active = $7,
146                processing_restricted = $8, processing_restricted_at = $9,
147                marketing_opt_out = $10, marketing_opt_out_at = $11,
148                updated_at = $12
149            WHERE id = $1
150            "#,
151        )
152        .bind(user.id)
153        .bind(&user.email)
154        .bind(&user.first_name)
155        .bind(&user.last_name)
156        .bind(user.role.to_string())
157        .bind(user.organization_id)
158        .bind(user.is_active)
159        .bind(user.processing_restricted)
160        .bind(user.processing_restricted_at)
161        .bind(user.marketing_opt_out)
162        .bind(user.marketing_opt_out_at)
163        .bind(user.updated_at)
164        .execute(&self.pool)
165        .await
166        .map_err(|e| format!("Failed to update user: {}", e))?;
167
168        Ok(user.clone())
169    }
170
171    async fn delete(&self, id: Uuid) -> Result<bool, String> {
172        let result = sqlx::query!(
173            r#"
174            DELETE FROM users
175            WHERE id = $1
176            "#,
177            id
178        )
179        .execute(&self.pool)
180        .await
181        .map_err(|e| format!("Failed to delete user: {}", e))?;
182
183        Ok(result.rows_affected() > 0)
184    }
185
186    async fn count_by_organization(&self, org_id: Uuid) -> Result<i64, String> {
187        let result = sqlx::query!(
188            r#"
189            SELECT COUNT(*) as count
190            FROM users
191            WHERE organization_id = $1
192            "#,
193            org_id
194        )
195        .fetch_one(&self.pool)
196        .await
197        .map_err(|e| format!("Failed to count users: {}", e))?;
198
199        Ok(result.count.unwrap_or(0))
200    }
201}