// koprogo_api/infrastructure/database/repositories/audit_log_repository_impl.rs

1use crate::application::dto::PageRequest;
2use crate::application::ports::{AuditLogFilters, AuditLogRepository};
3use crate::infrastructure::audit::{AuditEventType, AuditLogEntry};
4use crate::infrastructure::database::pool::DbPool;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use sqlx::Row;
8use uuid::Uuid;
9
10pub struct PostgresAuditLogRepository {
11    pool: DbPool,
12}
13
14impl PostgresAuditLogRepository {
15    pub fn new(pool: DbPool) -> Self {
16        Self { pool }
17    }
18
19    /// Convert AuditEventType to string for database storage
20    fn event_type_to_string(event_type: &AuditEventType) -> String {
21        format!("{:?}", event_type)
22    }
23
24    /// Convert string from database to AuditEventType
25    fn string_to_event_type(s: &str) -> AuditEventType {
26        match s {
27            "UserLogin" => AuditEventType::UserLogin,
28            "UserLogout" => AuditEventType::UserLogout,
29            "UserRegistration" => AuditEventType::UserRegistration,
30            "TokenRefresh" => AuditEventType::TokenRefresh,
31            "AuthenticationFailed" => AuditEventType::AuthenticationFailed,
32            "BuildingCreated" => AuditEventType::BuildingCreated,
33            "BuildingUpdated" => AuditEventType::BuildingUpdated,
34            "BuildingDeleted" => AuditEventType::BuildingDeleted,
35            "UnitCreated" => AuditEventType::UnitCreated,
36            "UnitAssignedToOwner" => AuditEventType::UnitAssignedToOwner,
37            "OwnerCreated" => AuditEventType::OwnerCreated,
38            "OwnerUpdated" => AuditEventType::OwnerUpdated,
39            "ExpenseCreated" => AuditEventType::ExpenseCreated,
40            "ExpenseMarkedPaid" => AuditEventType::ExpenseMarkedPaid,
41            "MeetingCreated" => AuditEventType::MeetingCreated,
42            "MeetingCompleted" => AuditEventType::MeetingCompleted,
43            "DocumentUploaded" => AuditEventType::DocumentUploaded,
44            "DocumentDeleted" => AuditEventType::DocumentDeleted,
45            "UnauthorizedAccess" => AuditEventType::UnauthorizedAccess,
46            "RateLimitExceeded" => AuditEventType::RateLimitExceeded,
47            "InvalidToken" => AuditEventType::InvalidToken,
48            "GdprDataExported" => AuditEventType::GdprDataExported,
49            "GdprDataExportFailed" => AuditEventType::GdprDataExportFailed,
50            "GdprDataErased" => AuditEventType::GdprDataErased,
51            "GdprDataErasureFailed" => AuditEventType::GdprDataErasureFailed,
52            "GdprErasureCheckRequested" => AuditEventType::GdprErasureCheckRequested,
53            _ => AuditEventType::UnauthorizedAccess, // Default fallback
54        }
55    }
56
57    /// Map database row to AuditLogEntry
58    fn row_to_entry(row: &sqlx::postgres::PgRow) -> AuditLogEntry {
59        let event_type_str: String = row.get("event_type");
60        let metadata_json: Option<serde_json::Value> = row.get("metadata");
61
62        AuditLogEntry {
63            id: row.get("id"),
64            timestamp: row.get("timestamp"),
65            event_type: Self::string_to_event_type(&event_type_str),
66            user_id: row.get("user_id"),
67            organization_id: row.get("organization_id"),
68            resource_type: row.get("resource_type"),
69            resource_id: row.get("resource_id"),
70            ip_address: row.get("ip_address"),
71            user_agent: row.get("user_agent"),
72            metadata: metadata_json,
73            success: row.get("success"),
74            error_message: row.get("error_message"),
75        }
76    }
77}
78
79#[async_trait]
80impl AuditLogRepository for PostgresAuditLogRepository {
81    async fn create(&self, entry: &AuditLogEntry) -> Result<AuditLogEntry, String> {
82        let event_type_str = Self::event_type_to_string(&entry.event_type);
83
84        sqlx::query(
85            r#"
86            INSERT INTO audit_logs (
87                id, timestamp, event_type, user_id, organization_id,
88                resource_type, resource_id, ip_address, user_agent,
89                metadata, success, error_message, created_at
90            )
91            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
92            "#,
93        )
94        .bind(entry.id)
95        .bind(entry.timestamp)
96        .bind(event_type_str)
97        .bind(entry.user_id)
98        .bind(entry.organization_id)
99        .bind(&entry.resource_type)
100        .bind(entry.resource_id)
101        .bind(&entry.ip_address)
102        .bind(&entry.user_agent)
103        .bind(&entry.metadata)
104        .bind(entry.success)
105        .bind(&entry.error_message)
106        .bind(Utc::now())
107        .execute(&self.pool)
108        .await
109        .map_err(|e| format!("Database error: {}", e))?;
110
111        Ok(entry.clone())
112    }
113
114    async fn find_by_id(&self, id: Uuid) -> Result<Option<AuditLogEntry>, String> {
115        let row = sqlx::query(
116            r#"
117            SELECT id, timestamp, event_type, user_id, organization_id,
118                   resource_type, resource_id, ip_address, user_agent,
119                   metadata, success, error_message
120            FROM audit_logs
121            WHERE id = $1
122            "#,
123        )
124        .bind(id)
125        .fetch_optional(&self.pool)
126        .await
127        .map_err(|e| format!("Database error: {}", e))?;
128
129        Ok(row.map(|r| Self::row_to_entry(&r)))
130    }
131
132    async fn find_all_paginated(
133        &self,
134        page_request: &PageRequest,
135        filters: &AuditLogFilters,
136    ) -> Result<(Vec<AuditLogEntry>, i64), String> {
137        let limit = page_request.per_page.min(100);
138        let offset = (page_request.page - 1) * limit;
139
140        // Build WHERE clause dynamically based on filters
141        let mut where_clauses = Vec::new();
142        let mut param_index = 1;
143
144        if filters.user_id.is_some() {
145            where_clauses.push(format!("user_id = ${}", param_index));
146            param_index += 1;
147        }
148        if filters.organization_id.is_some() {
149            where_clauses.push(format!("organization_id = ${}", param_index));
150            param_index += 1;
151        }
152        if filters.event_type.is_some() {
153            where_clauses.push(format!("event_type = ${}", param_index));
154            param_index += 1;
155        }
156        if filters.success.is_some() {
157            where_clauses.push(format!("success = ${}", param_index));
158            param_index += 1;
159        }
160        if filters.start_date.is_some() {
161            where_clauses.push(format!("timestamp >= ${}", param_index));
162            param_index += 1;
163        }
164        if filters.end_date.is_some() {
165            where_clauses.push(format!("timestamp <= ${}", param_index));
166            param_index += 1;
167        }
168        if filters.resource_type.is_some() {
169            where_clauses.push(format!("resource_type = ${}", param_index));
170            param_index += 1;
171        }
172        if filters.resource_id.is_some() {
173            where_clauses.push(format!("resource_id = ${}", param_index));
174            param_index += 1;
175        }
176
177        let where_clause = if where_clauses.is_empty() {
178            String::new()
179        } else {
180            format!("WHERE {}", where_clauses.join(" AND "))
181        };
182
183        // Count total matching records
184        let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
185        let mut count_query_builder = sqlx::query(&count_query);
186
187        // Bind parameters for count query
188        if let Some(user_id) = filters.user_id {
189            count_query_builder = count_query_builder.bind(user_id);
190        }
191        if let Some(org_id) = filters.organization_id {
192            count_query_builder = count_query_builder.bind(org_id);
193        }
194        if let Some(ref event_type) = filters.event_type {
195            count_query_builder = count_query_builder.bind(Self::event_type_to_string(event_type));
196        }
197        if let Some(success) = filters.success {
198            count_query_builder = count_query_builder.bind(success);
199        }
200        if let Some(start_date) = filters.start_date {
201            count_query_builder = count_query_builder.bind(start_date);
202        }
203        if let Some(end_date) = filters.end_date {
204            count_query_builder = count_query_builder.bind(end_date);
205        }
206        if let Some(ref resource_type) = filters.resource_type {
207            count_query_builder = count_query_builder.bind(resource_type);
208        }
209        if let Some(resource_id) = filters.resource_id {
210            count_query_builder = count_query_builder.bind(resource_id);
211        }
212
213        let count_row = count_query_builder
214            .fetch_one(&self.pool)
215            .await
216            .map_err(|e| format!("Database error: {}", e))?;
217        let total: i64 = count_row.get("count");
218
219        // Fetch paginated results
220        let data_query = format!(
221            r#"
222            SELECT id, timestamp, event_type, user_id, organization_id,
223                   resource_type, resource_id, ip_address, user_agent,
224                   metadata, success, error_message
225            FROM audit_logs
226            {}
227            ORDER BY timestamp DESC
228            LIMIT ${} OFFSET ${}
229            "#,
230            where_clause,
231            param_index,
232            param_index + 1
233        );
234
235        let mut data_query_builder = sqlx::query(&data_query);
236
237        // Bind parameters for data query
238        if let Some(user_id) = filters.user_id {
239            data_query_builder = data_query_builder.bind(user_id);
240        }
241        if let Some(org_id) = filters.organization_id {
242            data_query_builder = data_query_builder.bind(org_id);
243        }
244        if let Some(ref event_type) = filters.event_type {
245            data_query_builder = data_query_builder.bind(Self::event_type_to_string(event_type));
246        }
247        if let Some(success) = filters.success {
248            data_query_builder = data_query_builder.bind(success);
249        }
250        if let Some(start_date) = filters.start_date {
251            data_query_builder = data_query_builder.bind(start_date);
252        }
253        if let Some(end_date) = filters.end_date {
254            data_query_builder = data_query_builder.bind(end_date);
255        }
256        if let Some(ref resource_type) = filters.resource_type {
257            data_query_builder = data_query_builder.bind(resource_type);
258        }
259        if let Some(resource_id) = filters.resource_id {
260            data_query_builder = data_query_builder.bind(resource_id);
261        }
262
263        data_query_builder = data_query_builder.bind(limit).bind(offset);
264
265        let rows = data_query_builder
266            .fetch_all(&self.pool)
267            .await
268            .map_err(|e| format!("Database error: {}", e))?;
269
270        let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
271
272        Ok((entries, total))
273    }
274
275    async fn find_recent(&self, limit: i64) -> Result<Vec<AuditLogEntry>, String> {
276        let rows = sqlx::query(
277            r#"
278            SELECT id, timestamp, event_type, user_id, organization_id,
279                   resource_type, resource_id, ip_address, user_agent,
280                   metadata, success, error_message
281            FROM audit_logs
282            ORDER BY timestamp DESC
283            LIMIT $1
284            "#,
285        )
286        .bind(limit)
287        .fetch_all(&self.pool)
288        .await
289        .map_err(|e| format!("Database error: {}", e))?;
290
291        Ok(rows.iter().map(Self::row_to_entry).collect())
292    }
293
294    async fn find_failed_operations(
295        &self,
296        page_request: &PageRequest,
297        organization_id: Option<Uuid>,
298    ) -> Result<(Vec<AuditLogEntry>, i64), String> {
299        let limit = page_request.per_page.min(100);
300        let offset = (page_request.page - 1) * limit;
301
302        let where_clause = if organization_id.is_some() {
303            "WHERE success = false AND organization_id = $1"
304        } else {
305            "WHERE success = false"
306        };
307
308        // Count
309        let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
310        let count_row = if let Some(org_id) = organization_id {
311            sqlx::query(&count_query)
312                .bind(org_id)
313                .fetch_one(&self.pool)
314                .await
315                .map_err(|e| format!("Database error: {}", e))?
316        } else {
317            sqlx::query(&count_query)
318                .fetch_one(&self.pool)
319                .await
320                .map_err(|e| format!("Database error: {}", e))?
321        };
322        let total: i64 = count_row.get("count");
323
324        // Fetch data
325        let data_query = format!(
326            r#"
327            SELECT id, timestamp, event_type, user_id, organization_id,
328                   resource_type, resource_id, ip_address, user_agent,
329                   metadata, success, error_message
330            FROM audit_logs
331            {}
332            ORDER BY timestamp DESC
333            LIMIT $2 OFFSET $3
334            "#,
335            where_clause
336        );
337
338        let rows = if let Some(org_id) = organization_id {
339            sqlx::query(&data_query)
340                .bind(org_id)
341                .bind(limit)
342                .bind(offset)
343                .fetch_all(&self.pool)
344                .await
345                .map_err(|e| format!("Database error: {}", e))?
346        } else {
347            sqlx::query(&data_query)
348                .bind(limit)
349                .bind(offset)
350                .fetch_all(&self.pool)
351                .await
352                .map_err(|e| format!("Database error: {}", e))?
353        };
354
355        let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
356
357        Ok((entries, total))
358    }
359
360    async fn delete_older_than(&self, timestamp: DateTime<Utc>) -> Result<i64, String> {
361        let result = sqlx::query(
362            r#"
363            DELETE FROM audit_logs
364            WHERE timestamp < $1
365            "#,
366        )
367        .bind(timestamp)
368        .execute(&self.pool)
369        .await
370        .map_err(|e| format!("Database error: {}", e))?;
371
372        Ok(result.rows_affected() as i64)
373    }
374
375    async fn count_by_filters(&self, filters: &AuditLogFilters) -> Result<i64, String> {
376        let mut where_clauses = Vec::new();
377        let mut param_index = 1;
378
379        if filters.user_id.is_some() {
380            where_clauses.push(format!("user_id = ${}", param_index));
381            param_index += 1;
382        }
383        if filters.organization_id.is_some() {
384            where_clauses.push(format!("organization_id = ${}", param_index));
385            param_index += 1;
386        }
387        if filters.event_type.is_some() {
388            where_clauses.push(format!("event_type = ${}", param_index));
389            param_index += 1;
390        }
391        if filters.success.is_some() {
392            where_clauses.push(format!("success = ${}", param_index));
393            param_index += 1;
394        }
395        if filters.start_date.is_some() {
396            where_clauses.push(format!("timestamp >= ${}", param_index));
397            param_index += 1;
398        }
399        if filters.end_date.is_some() {
400            where_clauses.push(format!("timestamp <= ${}", param_index));
401            param_index += 1;
402        }
403        if filters.resource_type.is_some() {
404            where_clauses.push(format!("resource_type = ${}", param_index));
405            param_index += 1;
406        }
407        if filters.resource_id.is_some() {
408            where_clauses.push(format!("resource_id = ${}", param_index));
409        }
410
411        let where_clause = if where_clauses.is_empty() {
412            String::new()
413        } else {
414            format!("WHERE {}", where_clauses.join(" AND "))
415        };
416
417        let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
418        let mut query_builder = sqlx::query(&count_query);
419
420        // Bind parameters
421        if let Some(user_id) = filters.user_id {
422            query_builder = query_builder.bind(user_id);
423        }
424        if let Some(org_id) = filters.organization_id {
425            query_builder = query_builder.bind(org_id);
426        }
427        if let Some(ref event_type) = filters.event_type {
428            query_builder = query_builder.bind(Self::event_type_to_string(event_type));
429        }
430        if let Some(success) = filters.success {
431            query_builder = query_builder.bind(success);
432        }
433        if let Some(start_date) = filters.start_date {
434            query_builder = query_builder.bind(start_date);
435        }
436        if let Some(end_date) = filters.end_date {
437            query_builder = query_builder.bind(end_date);
438        }
439        if let Some(ref resource_type) = filters.resource_type {
440            query_builder = query_builder.bind(resource_type);
441        }
442        if let Some(resource_id) = filters.resource_id {
443            query_builder = query_builder.bind(resource_id);
444        }
445
446        let row = query_builder
447            .fetch_one(&self.pool)
448            .await
449            .map_err(|e| format!("Database error: {}", e))?;
450
451        Ok(row.get("count"))
452    }
453}