koprogo_api/infrastructure/database/repositories/
audit_log_repository_impl.rs

1use crate::application::dto::PageRequest;
2use crate::application::ports::{AuditLogFilters, AuditLogRepository};
3use crate::infrastructure::audit::{AuditEventType, AuditLogEntry};
4use crate::infrastructure::database::pool::DbPool;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use sqlx::Row;
8use uuid::Uuid;
9
10pub struct PostgresAuditLogRepository {
11    pool: DbPool,
12}
13
14impl PostgresAuditLogRepository {
15    pub fn new(pool: DbPool) -> Self {
16        Self { pool }
17    }
18
19    /// Convert AuditEventType to string for database storage
20    fn event_type_to_string(event_type: &AuditEventType) -> String {
21        format!("{:?}", event_type)
22    }
23
24    /// Convert string from database to AuditEventType
25    fn string_to_event_type(s: &str) -> AuditEventType {
26        match s {
27            "UserLogin" => AuditEventType::UserLogin,
28            "UserLogout" => AuditEventType::UserLogout,
29            "UserRegistration" => AuditEventType::UserRegistration,
30            "TokenRefresh" => AuditEventType::TokenRefresh,
31            "AuthenticationFailed" => AuditEventType::AuthenticationFailed,
32            "BuildingCreated" => AuditEventType::BuildingCreated,
33            "BuildingUpdated" => AuditEventType::BuildingUpdated,
34            "BuildingDeleted" => AuditEventType::BuildingDeleted,
35            "UnitCreated" => AuditEventType::UnitCreated,
36            "UnitAssignedToOwner" => AuditEventType::UnitAssignedToOwner,
37            "OwnerCreated" => AuditEventType::OwnerCreated,
38            "OwnerUpdated" => AuditEventType::OwnerUpdated,
39            "ExpenseCreated" => AuditEventType::ExpenseCreated,
40            "ExpenseMarkedPaid" => AuditEventType::ExpenseMarkedPaid,
41            "MeetingCreated" => AuditEventType::MeetingCreated,
42            "MeetingCompleted" => AuditEventType::MeetingCompleted,
43            "DocumentUploaded" => AuditEventType::DocumentUploaded,
44            "DocumentDeleted" => AuditEventType::DocumentDeleted,
45            "UnauthorizedAccess" => AuditEventType::UnauthorizedAccess,
46            "RateLimitExceeded" => AuditEventType::RateLimitExceeded,
47            "InvalidToken" => AuditEventType::InvalidToken,
48            _ => AuditEventType::UnauthorizedAccess, // Default fallback
49        }
50    }
51
52    /// Map database row to AuditLogEntry
53    fn row_to_entry(row: &sqlx::postgres::PgRow) -> AuditLogEntry {
54        let event_type_str: String = row.get("event_type");
55        let metadata_json: Option<serde_json::Value> = row.get("metadata");
56
57        AuditLogEntry {
58            id: row.get("id"),
59            timestamp: row.get("timestamp"),
60            event_type: Self::string_to_event_type(&event_type_str),
61            user_id: row.get("user_id"),
62            organization_id: row.get("organization_id"),
63            resource_type: row.get("resource_type"),
64            resource_id: row.get("resource_id"),
65            ip_address: row.get("ip_address"),
66            user_agent: row.get("user_agent"),
67            metadata: metadata_json,
68            success: row.get("success"),
69            error_message: row.get("error_message"),
70        }
71    }
72}
73
74#[async_trait]
75impl AuditLogRepository for PostgresAuditLogRepository {
76    async fn create(&self, entry: &AuditLogEntry) -> Result<AuditLogEntry, String> {
77        let event_type_str = Self::event_type_to_string(&entry.event_type);
78
79        sqlx::query(
80            r#"
81            INSERT INTO audit_logs (
82                id, timestamp, event_type, user_id, organization_id,
83                resource_type, resource_id, ip_address, user_agent,
84                metadata, success, error_message, created_at
85            )
86            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
87            "#,
88        )
89        .bind(entry.id)
90        .bind(entry.timestamp)
91        .bind(event_type_str)
92        .bind(entry.user_id)
93        .bind(entry.organization_id)
94        .bind(&entry.resource_type)
95        .bind(entry.resource_id)
96        .bind(&entry.ip_address)
97        .bind(&entry.user_agent)
98        .bind(&entry.metadata)
99        .bind(entry.success)
100        .bind(&entry.error_message)
101        .bind(Utc::now())
102        .execute(&self.pool)
103        .await
104        .map_err(|e| format!("Database error: {}", e))?;
105
106        Ok(entry.clone())
107    }
108
109    async fn find_by_id(&self, id: Uuid) -> Result<Option<AuditLogEntry>, String> {
110        let row = sqlx::query(
111            r#"
112            SELECT id, timestamp, event_type, user_id, organization_id,
113                   resource_type, resource_id, ip_address, user_agent,
114                   metadata, success, error_message
115            FROM audit_logs
116            WHERE id = $1
117            "#,
118        )
119        .bind(id)
120        .fetch_optional(&self.pool)
121        .await
122        .map_err(|e| format!("Database error: {}", e))?;
123
124        Ok(row.map(|r| Self::row_to_entry(&r)))
125    }
126
127    async fn find_all_paginated(
128        &self,
129        page_request: &PageRequest,
130        filters: &AuditLogFilters,
131    ) -> Result<(Vec<AuditLogEntry>, i64), String> {
132        let limit = page_request.per_page.min(100);
133        let offset = (page_request.page - 1) * limit;
134
135        // Build WHERE clause dynamically based on filters
136        let mut where_clauses = Vec::new();
137        let mut param_index = 1;
138
139        if filters.user_id.is_some() {
140            where_clauses.push(format!("user_id = ${}", param_index));
141            param_index += 1;
142        }
143        if filters.organization_id.is_some() {
144            where_clauses.push(format!("organization_id = ${}", param_index));
145            param_index += 1;
146        }
147        if filters.event_type.is_some() {
148            where_clauses.push(format!("event_type = ${}", param_index));
149            param_index += 1;
150        }
151        if filters.success.is_some() {
152            where_clauses.push(format!("success = ${}", param_index));
153            param_index += 1;
154        }
155        if filters.start_date.is_some() {
156            where_clauses.push(format!("timestamp >= ${}", param_index));
157            param_index += 1;
158        }
159        if filters.end_date.is_some() {
160            where_clauses.push(format!("timestamp <= ${}", param_index));
161            param_index += 1;
162        }
163        if filters.resource_type.is_some() {
164            where_clauses.push(format!("resource_type = ${}", param_index));
165            param_index += 1;
166        }
167        if filters.resource_id.is_some() {
168            where_clauses.push(format!("resource_id = ${}", param_index));
169            param_index += 1;
170        }
171
172        let where_clause = if where_clauses.is_empty() {
173            String::new()
174        } else {
175            format!("WHERE {}", where_clauses.join(" AND "))
176        };
177
178        // Count total matching records
179        let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
180        let mut count_query_builder = sqlx::query(&count_query);
181
182        // Bind parameters for count query
183        if let Some(user_id) = filters.user_id {
184            count_query_builder = count_query_builder.bind(user_id);
185        }
186        if let Some(org_id) = filters.organization_id {
187            count_query_builder = count_query_builder.bind(org_id);
188        }
189        if let Some(ref event_type) = filters.event_type {
190            count_query_builder = count_query_builder.bind(Self::event_type_to_string(event_type));
191        }
192        if let Some(success) = filters.success {
193            count_query_builder = count_query_builder.bind(success);
194        }
195        if let Some(start_date) = filters.start_date {
196            count_query_builder = count_query_builder.bind(start_date);
197        }
198        if let Some(end_date) = filters.end_date {
199            count_query_builder = count_query_builder.bind(end_date);
200        }
201        if let Some(ref resource_type) = filters.resource_type {
202            count_query_builder = count_query_builder.bind(resource_type);
203        }
204        if let Some(resource_id) = filters.resource_id {
205            count_query_builder = count_query_builder.bind(resource_id);
206        }
207
208        let count_row = count_query_builder
209            .fetch_one(&self.pool)
210            .await
211            .map_err(|e| format!("Database error: {}", e))?;
212        let total: i64 = count_row.get("count");
213
214        // Fetch paginated results
215        let data_query = format!(
216            r#"
217            SELECT id, timestamp, event_type, user_id, organization_id,
218                   resource_type, resource_id, ip_address, user_agent,
219                   metadata, success, error_message
220            FROM audit_logs
221            {}
222            ORDER BY timestamp DESC
223            LIMIT ${} OFFSET ${}
224            "#,
225            where_clause,
226            param_index,
227            param_index + 1
228        );
229
230        let mut data_query_builder = sqlx::query(&data_query);
231
232        // Bind parameters for data query
233        if let Some(user_id) = filters.user_id {
234            data_query_builder = data_query_builder.bind(user_id);
235        }
236        if let Some(org_id) = filters.organization_id {
237            data_query_builder = data_query_builder.bind(org_id);
238        }
239        if let Some(ref event_type) = filters.event_type {
240            data_query_builder = data_query_builder.bind(Self::event_type_to_string(event_type));
241        }
242        if let Some(success) = filters.success {
243            data_query_builder = data_query_builder.bind(success);
244        }
245        if let Some(start_date) = filters.start_date {
246            data_query_builder = data_query_builder.bind(start_date);
247        }
248        if let Some(end_date) = filters.end_date {
249            data_query_builder = data_query_builder.bind(end_date);
250        }
251        if let Some(ref resource_type) = filters.resource_type {
252            data_query_builder = data_query_builder.bind(resource_type);
253        }
254        if let Some(resource_id) = filters.resource_id {
255            data_query_builder = data_query_builder.bind(resource_id);
256        }
257
258        data_query_builder = data_query_builder.bind(limit).bind(offset);
259
260        let rows = data_query_builder
261            .fetch_all(&self.pool)
262            .await
263            .map_err(|e| format!("Database error: {}", e))?;
264
265        let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
266
267        Ok((entries, total))
268    }
269
270    async fn find_recent(&self, limit: i64) -> Result<Vec<AuditLogEntry>, String> {
271        let rows = sqlx::query(
272            r#"
273            SELECT id, timestamp, event_type, user_id, organization_id,
274                   resource_type, resource_id, ip_address, user_agent,
275                   metadata, success, error_message
276            FROM audit_logs
277            ORDER BY timestamp DESC
278            LIMIT $1
279            "#,
280        )
281        .bind(limit)
282        .fetch_all(&self.pool)
283        .await
284        .map_err(|e| format!("Database error: {}", e))?;
285
286        Ok(rows.iter().map(Self::row_to_entry).collect())
287    }
288
289    async fn find_failed_operations(
290        &self,
291        page_request: &PageRequest,
292        organization_id: Option<Uuid>,
293    ) -> Result<(Vec<AuditLogEntry>, i64), String> {
294        let limit = page_request.per_page.min(100);
295        let offset = (page_request.page - 1) * limit;
296
297        let where_clause = if organization_id.is_some() {
298            "WHERE success = false AND organization_id = $1"
299        } else {
300            "WHERE success = false"
301        };
302
303        // Count
304        let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
305        let count_row = if let Some(org_id) = organization_id {
306            sqlx::query(&count_query)
307                .bind(org_id)
308                .fetch_one(&self.pool)
309                .await
310                .map_err(|e| format!("Database error: {}", e))?
311        } else {
312            sqlx::query(&count_query)
313                .fetch_one(&self.pool)
314                .await
315                .map_err(|e| format!("Database error: {}", e))?
316        };
317        let total: i64 = count_row.get("count");
318
319        // Fetch data
320        let data_query = format!(
321            r#"
322            SELECT id, timestamp, event_type, user_id, organization_id,
323                   resource_type, resource_id, ip_address, user_agent,
324                   metadata, success, error_message
325            FROM audit_logs
326            {}
327            ORDER BY timestamp DESC
328            LIMIT $2 OFFSET $3
329            "#,
330            where_clause
331        );
332
333        let rows = if let Some(org_id) = organization_id {
334            sqlx::query(&data_query)
335                .bind(org_id)
336                .bind(limit)
337                .bind(offset)
338                .fetch_all(&self.pool)
339                .await
340                .map_err(|e| format!("Database error: {}", e))?
341        } else {
342            sqlx::query(&data_query)
343                .bind(limit)
344                .bind(offset)
345                .fetch_all(&self.pool)
346                .await
347                .map_err(|e| format!("Database error: {}", e))?
348        };
349
350        let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
351
352        Ok((entries, total))
353    }
354
355    async fn delete_older_than(&self, timestamp: DateTime<Utc>) -> Result<i64, String> {
356        let result = sqlx::query(
357            r#"
358            DELETE FROM audit_logs
359            WHERE timestamp < $1
360            "#,
361        )
362        .bind(timestamp)
363        .execute(&self.pool)
364        .await
365        .map_err(|e| format!("Database error: {}", e))?;
366
367        Ok(result.rows_affected() as i64)
368    }
369
370    async fn count_by_filters(&self, filters: &AuditLogFilters) -> Result<i64, String> {
371        let mut where_clauses = Vec::new();
372        let mut param_index = 1;
373
374        if filters.user_id.is_some() {
375            where_clauses.push(format!("user_id = ${}", param_index));
376            param_index += 1;
377        }
378        if filters.organization_id.is_some() {
379            where_clauses.push(format!("organization_id = ${}", param_index));
380            param_index += 1;
381        }
382        if filters.event_type.is_some() {
383            where_clauses.push(format!("event_type = ${}", param_index));
384            param_index += 1;
385        }
386        if filters.success.is_some() {
387            where_clauses.push(format!("success = ${}", param_index));
388            param_index += 1;
389        }
390        if filters.start_date.is_some() {
391            where_clauses.push(format!("timestamp >= ${}", param_index));
392            param_index += 1;
393        }
394        if filters.end_date.is_some() {
395            where_clauses.push(format!("timestamp <= ${}", param_index));
396            param_index += 1;
397        }
398        if filters.resource_type.is_some() {
399            where_clauses.push(format!("resource_type = ${}", param_index));
400            param_index += 1;
401        }
402        if filters.resource_id.is_some() {
403            where_clauses.push(format!("resource_id = ${}", param_index));
404        }
405
406        let where_clause = if where_clauses.is_empty() {
407            String::new()
408        } else {
409            format!("WHERE {}", where_clauses.join(" AND "))
410        };
411
412        let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
413        let mut query_builder = sqlx::query(&count_query);
414
415        // Bind parameters
416        if let Some(user_id) = filters.user_id {
417            query_builder = query_builder.bind(user_id);
418        }
419        if let Some(org_id) = filters.organization_id {
420            query_builder = query_builder.bind(org_id);
421        }
422        if let Some(ref event_type) = filters.event_type {
423            query_builder = query_builder.bind(Self::event_type_to_string(event_type));
424        }
425        if let Some(success) = filters.success {
426            query_builder = query_builder.bind(success);
427        }
428        if let Some(start_date) = filters.start_date {
429            query_builder = query_builder.bind(start_date);
430        }
431        if let Some(end_date) = filters.end_date {
432            query_builder = query_builder.bind(end_date);
433        }
434        if let Some(ref resource_type) = filters.resource_type {
435            query_builder = query_builder.bind(resource_type);
436        }
437        if let Some(resource_id) = filters.resource_id {
438            query_builder = query_builder.bind(resource_id);
439        }
440
441        let row = query_builder
442            .fetch_one(&self.pool)
443            .await
444            .map_err(|e| format!("Database error: {}", e))?;
445
446        Ok(row.get("count"))
447    }
448}