1use crate::application::dto::PageRequest;
2use crate::application::ports::{AuditLogFilters, AuditLogRepository};
3use crate::infrastructure::audit::{AuditEventType, AuditLogEntry};
4use crate::infrastructure::database::pool::DbPool;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use sqlx::Row;
8use uuid::Uuid;
9
10pub struct PostgresAuditLogRepository {
11 pool: DbPool,
12}
13
14impl PostgresAuditLogRepository {
15 pub fn new(pool: DbPool) -> Self {
16 Self { pool }
17 }
18
19 fn event_type_to_string(event_type: &AuditEventType) -> String {
21 format!("{:?}", event_type)
22 }
23
24 fn string_to_event_type(s: &str) -> AuditEventType {
26 match s {
27 "UserLogin" => AuditEventType::UserLogin,
28 "UserLogout" => AuditEventType::UserLogout,
29 "UserRegistration" => AuditEventType::UserRegistration,
30 "TokenRefresh" => AuditEventType::TokenRefresh,
31 "AuthenticationFailed" => AuditEventType::AuthenticationFailed,
32 "BuildingCreated" => AuditEventType::BuildingCreated,
33 "BuildingUpdated" => AuditEventType::BuildingUpdated,
34 "BuildingDeleted" => AuditEventType::BuildingDeleted,
35 "UnitCreated" => AuditEventType::UnitCreated,
36 "UnitAssignedToOwner" => AuditEventType::UnitAssignedToOwner,
37 "OwnerCreated" => AuditEventType::OwnerCreated,
38 "OwnerUpdated" => AuditEventType::OwnerUpdated,
39 "ExpenseCreated" => AuditEventType::ExpenseCreated,
40 "ExpenseMarkedPaid" => AuditEventType::ExpenseMarkedPaid,
41 "MeetingCreated" => AuditEventType::MeetingCreated,
42 "MeetingCompleted" => AuditEventType::MeetingCompleted,
43 "DocumentUploaded" => AuditEventType::DocumentUploaded,
44 "DocumentDeleted" => AuditEventType::DocumentDeleted,
45 "UnauthorizedAccess" => AuditEventType::UnauthorizedAccess,
46 "RateLimitExceeded" => AuditEventType::RateLimitExceeded,
47 "InvalidToken" => AuditEventType::InvalidToken,
48 "GdprDataExported" => AuditEventType::GdprDataExported,
49 "GdprDataExportFailed" => AuditEventType::GdprDataExportFailed,
50 "GdprDataErased" => AuditEventType::GdprDataErased,
51 "GdprDataErasureFailed" => AuditEventType::GdprDataErasureFailed,
52 "GdprErasureCheckRequested" => AuditEventType::GdprErasureCheckRequested,
53 _ => AuditEventType::UnauthorizedAccess, }
55 }
56
57 fn row_to_entry(row: &sqlx::postgres::PgRow) -> AuditLogEntry {
59 let event_type_str: String = row.get("event_type");
60 let metadata_json: Option<serde_json::Value> = row.get("metadata");
61
62 AuditLogEntry {
63 id: row.get("id"),
64 timestamp: row.get("timestamp"),
65 event_type: Self::string_to_event_type(&event_type_str),
66 user_id: row.get("user_id"),
67 organization_id: row.get("organization_id"),
68 resource_type: row.get("resource_type"),
69 resource_id: row.get("resource_id"),
70 ip_address: row.get("ip_address"),
71 user_agent: row.get("user_agent"),
72 metadata: metadata_json,
73 success: row.get("success"),
74 error_message: row.get("error_message"),
75 }
76 }
77}
78
79#[async_trait]
80impl AuditLogRepository for PostgresAuditLogRepository {
81 async fn create(&self, entry: &AuditLogEntry) -> Result<AuditLogEntry, String> {
82 let event_type_str = Self::event_type_to_string(&entry.event_type);
83
84 sqlx::query(
85 r#"
86 INSERT INTO audit_logs (
87 id, timestamp, event_type, user_id, organization_id,
88 resource_type, resource_id, ip_address, user_agent,
89 metadata, success, error_message, created_at
90 )
91 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
92 "#,
93 )
94 .bind(entry.id)
95 .bind(entry.timestamp)
96 .bind(event_type_str)
97 .bind(entry.user_id)
98 .bind(entry.organization_id)
99 .bind(&entry.resource_type)
100 .bind(entry.resource_id)
101 .bind(&entry.ip_address)
102 .bind(&entry.user_agent)
103 .bind(&entry.metadata)
104 .bind(entry.success)
105 .bind(&entry.error_message)
106 .bind(Utc::now())
107 .execute(&self.pool)
108 .await
109 .map_err(|e| format!("Database error: {}", e))?;
110
111 Ok(entry.clone())
112 }
113
114 async fn find_by_id(&self, id: Uuid) -> Result<Option<AuditLogEntry>, String> {
115 let row = sqlx::query(
116 r#"
117 SELECT id, timestamp, event_type, user_id, organization_id,
118 resource_type, resource_id, ip_address, user_agent,
119 metadata, success, error_message
120 FROM audit_logs
121 WHERE id = $1
122 "#,
123 )
124 .bind(id)
125 .fetch_optional(&self.pool)
126 .await
127 .map_err(|e| format!("Database error: {}", e))?;
128
129 Ok(row.map(|r| Self::row_to_entry(&r)))
130 }
131
132 async fn find_all_paginated(
133 &self,
134 page_request: &PageRequest,
135 filters: &AuditLogFilters,
136 ) -> Result<(Vec<AuditLogEntry>, i64), String> {
137 let limit = page_request.per_page.min(100);
138 let offset = (page_request.page - 1) * limit;
139
140 let mut where_clauses = Vec::new();
142 let mut param_index = 1;
143
144 if filters.user_id.is_some() {
145 where_clauses.push(format!("user_id = ${}", param_index));
146 param_index += 1;
147 }
148 if filters.organization_id.is_some() {
149 where_clauses.push(format!("organization_id = ${}", param_index));
150 param_index += 1;
151 }
152 if filters.event_type.is_some() {
153 where_clauses.push(format!("event_type = ${}", param_index));
154 param_index += 1;
155 }
156 if filters.success.is_some() {
157 where_clauses.push(format!("success = ${}", param_index));
158 param_index += 1;
159 }
160 if filters.start_date.is_some() {
161 where_clauses.push(format!("timestamp >= ${}", param_index));
162 param_index += 1;
163 }
164 if filters.end_date.is_some() {
165 where_clauses.push(format!("timestamp <= ${}", param_index));
166 param_index += 1;
167 }
168 if filters.resource_type.is_some() {
169 where_clauses.push(format!("resource_type = ${}", param_index));
170 param_index += 1;
171 }
172 if filters.resource_id.is_some() {
173 where_clauses.push(format!("resource_id = ${}", param_index));
174 param_index += 1;
175 }
176
177 let where_clause = if where_clauses.is_empty() {
178 String::new()
179 } else {
180 format!("WHERE {}", where_clauses.join(" AND "))
181 };
182
183 let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
185 let mut count_query_builder = sqlx::query(&count_query);
186
187 if let Some(user_id) = filters.user_id {
189 count_query_builder = count_query_builder.bind(user_id);
190 }
191 if let Some(org_id) = filters.organization_id {
192 count_query_builder = count_query_builder.bind(org_id);
193 }
194 if let Some(ref event_type) = filters.event_type {
195 count_query_builder = count_query_builder.bind(Self::event_type_to_string(event_type));
196 }
197 if let Some(success) = filters.success {
198 count_query_builder = count_query_builder.bind(success);
199 }
200 if let Some(start_date) = filters.start_date {
201 count_query_builder = count_query_builder.bind(start_date);
202 }
203 if let Some(end_date) = filters.end_date {
204 count_query_builder = count_query_builder.bind(end_date);
205 }
206 if let Some(ref resource_type) = filters.resource_type {
207 count_query_builder = count_query_builder.bind(resource_type);
208 }
209 if let Some(resource_id) = filters.resource_id {
210 count_query_builder = count_query_builder.bind(resource_id);
211 }
212
213 let count_row = count_query_builder
214 .fetch_one(&self.pool)
215 .await
216 .map_err(|e| format!("Database error: {}", e))?;
217 let total: i64 = count_row.get("count");
218
219 let data_query = format!(
221 r#"
222 SELECT id, timestamp, event_type, user_id, organization_id,
223 resource_type, resource_id, ip_address, user_agent,
224 metadata, success, error_message
225 FROM audit_logs
226 {}
227 ORDER BY timestamp DESC
228 LIMIT ${} OFFSET ${}
229 "#,
230 where_clause,
231 param_index,
232 param_index + 1
233 );
234
235 let mut data_query_builder = sqlx::query(&data_query);
236
237 if let Some(user_id) = filters.user_id {
239 data_query_builder = data_query_builder.bind(user_id);
240 }
241 if let Some(org_id) = filters.organization_id {
242 data_query_builder = data_query_builder.bind(org_id);
243 }
244 if let Some(ref event_type) = filters.event_type {
245 data_query_builder = data_query_builder.bind(Self::event_type_to_string(event_type));
246 }
247 if let Some(success) = filters.success {
248 data_query_builder = data_query_builder.bind(success);
249 }
250 if let Some(start_date) = filters.start_date {
251 data_query_builder = data_query_builder.bind(start_date);
252 }
253 if let Some(end_date) = filters.end_date {
254 data_query_builder = data_query_builder.bind(end_date);
255 }
256 if let Some(ref resource_type) = filters.resource_type {
257 data_query_builder = data_query_builder.bind(resource_type);
258 }
259 if let Some(resource_id) = filters.resource_id {
260 data_query_builder = data_query_builder.bind(resource_id);
261 }
262
263 data_query_builder = data_query_builder.bind(limit).bind(offset);
264
265 let rows = data_query_builder
266 .fetch_all(&self.pool)
267 .await
268 .map_err(|e| format!("Database error: {}", e))?;
269
270 let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
271
272 Ok((entries, total))
273 }
274
275 async fn find_recent(&self, limit: i64) -> Result<Vec<AuditLogEntry>, String> {
276 let rows = sqlx::query(
277 r#"
278 SELECT id, timestamp, event_type, user_id, organization_id,
279 resource_type, resource_id, ip_address, user_agent,
280 metadata, success, error_message
281 FROM audit_logs
282 ORDER BY timestamp DESC
283 LIMIT $1
284 "#,
285 )
286 .bind(limit)
287 .fetch_all(&self.pool)
288 .await
289 .map_err(|e| format!("Database error: {}", e))?;
290
291 Ok(rows.iter().map(Self::row_to_entry).collect())
292 }
293
294 async fn find_failed_operations(
295 &self,
296 page_request: &PageRequest,
297 organization_id: Option<Uuid>,
298 ) -> Result<(Vec<AuditLogEntry>, i64), String> {
299 let limit = page_request.per_page.min(100);
300 let offset = (page_request.page - 1) * limit;
301
302 let where_clause = if organization_id.is_some() {
303 "WHERE success = false AND organization_id = $1"
304 } else {
305 "WHERE success = false"
306 };
307
308 let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
310 let count_row = if let Some(org_id) = organization_id {
311 sqlx::query(&count_query)
312 .bind(org_id)
313 .fetch_one(&self.pool)
314 .await
315 .map_err(|e| format!("Database error: {}", e))?
316 } else {
317 sqlx::query(&count_query)
318 .fetch_one(&self.pool)
319 .await
320 .map_err(|e| format!("Database error: {}", e))?
321 };
322 let total: i64 = count_row.get("count");
323
324 let data_query = format!(
326 r#"
327 SELECT id, timestamp, event_type, user_id, organization_id,
328 resource_type, resource_id, ip_address, user_agent,
329 metadata, success, error_message
330 FROM audit_logs
331 {}
332 ORDER BY timestamp DESC
333 LIMIT $2 OFFSET $3
334 "#,
335 where_clause
336 );
337
338 let rows = if let Some(org_id) = organization_id {
339 sqlx::query(&data_query)
340 .bind(org_id)
341 .bind(limit)
342 .bind(offset)
343 .fetch_all(&self.pool)
344 .await
345 .map_err(|e| format!("Database error: {}", e))?
346 } else {
347 sqlx::query(&data_query)
348 .bind(limit)
349 .bind(offset)
350 .fetch_all(&self.pool)
351 .await
352 .map_err(|e| format!("Database error: {}", e))?
353 };
354
355 let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
356
357 Ok((entries, total))
358 }
359
360 async fn delete_older_than(&self, timestamp: DateTime<Utc>) -> Result<i64, String> {
361 let result = sqlx::query(
362 r#"
363 DELETE FROM audit_logs
364 WHERE timestamp < $1
365 "#,
366 )
367 .bind(timestamp)
368 .execute(&self.pool)
369 .await
370 .map_err(|e| format!("Database error: {}", e))?;
371
372 Ok(result.rows_affected() as i64)
373 }
374
375 async fn count_by_filters(&self, filters: &AuditLogFilters) -> Result<i64, String> {
376 let mut where_clauses = Vec::new();
377 let mut param_index = 1;
378
379 if filters.user_id.is_some() {
380 where_clauses.push(format!("user_id = ${}", param_index));
381 param_index += 1;
382 }
383 if filters.organization_id.is_some() {
384 where_clauses.push(format!("organization_id = ${}", param_index));
385 param_index += 1;
386 }
387 if filters.event_type.is_some() {
388 where_clauses.push(format!("event_type = ${}", param_index));
389 param_index += 1;
390 }
391 if filters.success.is_some() {
392 where_clauses.push(format!("success = ${}", param_index));
393 param_index += 1;
394 }
395 if filters.start_date.is_some() {
396 where_clauses.push(format!("timestamp >= ${}", param_index));
397 param_index += 1;
398 }
399 if filters.end_date.is_some() {
400 where_clauses.push(format!("timestamp <= ${}", param_index));
401 param_index += 1;
402 }
403 if filters.resource_type.is_some() {
404 where_clauses.push(format!("resource_type = ${}", param_index));
405 param_index += 1;
406 }
407 if filters.resource_id.is_some() {
408 where_clauses.push(format!("resource_id = ${}", param_index));
409 }
410
411 let where_clause = if where_clauses.is_empty() {
412 String::new()
413 } else {
414 format!("WHERE {}", where_clauses.join(" AND "))
415 };
416
417 let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
418 let mut query_builder = sqlx::query(&count_query);
419
420 if let Some(user_id) = filters.user_id {
422 query_builder = query_builder.bind(user_id);
423 }
424 if let Some(org_id) = filters.organization_id {
425 query_builder = query_builder.bind(org_id);
426 }
427 if let Some(ref event_type) = filters.event_type {
428 query_builder = query_builder.bind(Self::event_type_to_string(event_type));
429 }
430 if let Some(success) = filters.success {
431 query_builder = query_builder.bind(success);
432 }
433 if let Some(start_date) = filters.start_date {
434 query_builder = query_builder.bind(start_date);
435 }
436 if let Some(end_date) = filters.end_date {
437 query_builder = query_builder.bind(end_date);
438 }
439 if let Some(ref resource_type) = filters.resource_type {
440 query_builder = query_builder.bind(resource_type);
441 }
442 if let Some(resource_id) = filters.resource_id {
443 query_builder = query_builder.bind(resource_id);
444 }
445
446 let row = query_builder
447 .fetch_one(&self.pool)
448 .await
449 .map_err(|e| format!("Database error: {}", e))?;
450
451 Ok(row.get("count"))
452 }
453}