1use crate::application::dto::PageRequest;
2use crate::application::ports::{AuditLogFilters, AuditLogRepository};
3use crate::infrastructure::audit::{AuditEventType, AuditLogEntry};
4use crate::infrastructure::database::pool::DbPool;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use sqlx::Row;
8use uuid::Uuid;
9
10pub struct PostgresAuditLogRepository {
11 pool: DbPool,
12}
13
14impl PostgresAuditLogRepository {
15 pub fn new(pool: DbPool) -> Self {
16 Self { pool }
17 }
18
19 fn event_type_to_string(event_type: &AuditEventType) -> String {
21 format!("{:?}", event_type)
22 }
23
24 fn string_to_event_type(s: &str) -> AuditEventType {
26 match s {
27 "UserLogin" => AuditEventType::UserLogin,
28 "UserLogout" => AuditEventType::UserLogout,
29 "UserRegistration" => AuditEventType::UserRegistration,
30 "TokenRefresh" => AuditEventType::TokenRefresh,
31 "AuthenticationFailed" => AuditEventType::AuthenticationFailed,
32 "BuildingCreated" => AuditEventType::BuildingCreated,
33 "BuildingUpdated" => AuditEventType::BuildingUpdated,
34 "BuildingDeleted" => AuditEventType::BuildingDeleted,
35 "UnitCreated" => AuditEventType::UnitCreated,
36 "UnitAssignedToOwner" => AuditEventType::UnitAssignedToOwner,
37 "OwnerCreated" => AuditEventType::OwnerCreated,
38 "OwnerUpdated" => AuditEventType::OwnerUpdated,
39 "ExpenseCreated" => AuditEventType::ExpenseCreated,
40 "ExpenseMarkedPaid" => AuditEventType::ExpenseMarkedPaid,
41 "MeetingCreated" => AuditEventType::MeetingCreated,
42 "MeetingCompleted" => AuditEventType::MeetingCompleted,
43 "DocumentUploaded" => AuditEventType::DocumentUploaded,
44 "DocumentDeleted" => AuditEventType::DocumentDeleted,
45 "UnauthorizedAccess" => AuditEventType::UnauthorizedAccess,
46 "RateLimitExceeded" => AuditEventType::RateLimitExceeded,
47 "InvalidToken" => AuditEventType::InvalidToken,
48 _ => AuditEventType::UnauthorizedAccess, }
50 }
51
52 fn row_to_entry(row: &sqlx::postgres::PgRow) -> AuditLogEntry {
54 let event_type_str: String = row.get("event_type");
55 let metadata_json: Option<serde_json::Value> = row.get("metadata");
56
57 AuditLogEntry {
58 id: row.get("id"),
59 timestamp: row.get("timestamp"),
60 event_type: Self::string_to_event_type(&event_type_str),
61 user_id: row.get("user_id"),
62 organization_id: row.get("organization_id"),
63 resource_type: row.get("resource_type"),
64 resource_id: row.get("resource_id"),
65 ip_address: row.get("ip_address"),
66 user_agent: row.get("user_agent"),
67 metadata: metadata_json,
68 success: row.get("success"),
69 error_message: row.get("error_message"),
70 }
71 }
72}
73
74#[async_trait]
75impl AuditLogRepository for PostgresAuditLogRepository {
76 async fn create(&self, entry: &AuditLogEntry) -> Result<AuditLogEntry, String> {
77 let event_type_str = Self::event_type_to_string(&entry.event_type);
78
79 sqlx::query(
80 r#"
81 INSERT INTO audit_logs (
82 id, timestamp, event_type, user_id, organization_id,
83 resource_type, resource_id, ip_address, user_agent,
84 metadata, success, error_message, created_at
85 )
86 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
87 "#,
88 )
89 .bind(entry.id)
90 .bind(entry.timestamp)
91 .bind(event_type_str)
92 .bind(entry.user_id)
93 .bind(entry.organization_id)
94 .bind(&entry.resource_type)
95 .bind(entry.resource_id)
96 .bind(&entry.ip_address)
97 .bind(&entry.user_agent)
98 .bind(&entry.metadata)
99 .bind(entry.success)
100 .bind(&entry.error_message)
101 .bind(Utc::now())
102 .execute(&self.pool)
103 .await
104 .map_err(|e| format!("Database error: {}", e))?;
105
106 Ok(entry.clone())
107 }
108
109 async fn find_by_id(&self, id: Uuid) -> Result<Option<AuditLogEntry>, String> {
110 let row = sqlx::query(
111 r#"
112 SELECT id, timestamp, event_type, user_id, organization_id,
113 resource_type, resource_id, ip_address, user_agent,
114 metadata, success, error_message
115 FROM audit_logs
116 WHERE id = $1
117 "#,
118 )
119 .bind(id)
120 .fetch_optional(&self.pool)
121 .await
122 .map_err(|e| format!("Database error: {}", e))?;
123
124 Ok(row.map(|r| Self::row_to_entry(&r)))
125 }
126
127 async fn find_all_paginated(
128 &self,
129 page_request: &PageRequest,
130 filters: &AuditLogFilters,
131 ) -> Result<(Vec<AuditLogEntry>, i64), String> {
132 let limit = page_request.per_page.min(100);
133 let offset = (page_request.page - 1) * limit;
134
135 let mut where_clauses = Vec::new();
137 let mut param_index = 1;
138
139 if filters.user_id.is_some() {
140 where_clauses.push(format!("user_id = ${}", param_index));
141 param_index += 1;
142 }
143 if filters.organization_id.is_some() {
144 where_clauses.push(format!("organization_id = ${}", param_index));
145 param_index += 1;
146 }
147 if filters.event_type.is_some() {
148 where_clauses.push(format!("event_type = ${}", param_index));
149 param_index += 1;
150 }
151 if filters.success.is_some() {
152 where_clauses.push(format!("success = ${}", param_index));
153 param_index += 1;
154 }
155 if filters.start_date.is_some() {
156 where_clauses.push(format!("timestamp >= ${}", param_index));
157 param_index += 1;
158 }
159 if filters.end_date.is_some() {
160 where_clauses.push(format!("timestamp <= ${}", param_index));
161 param_index += 1;
162 }
163 if filters.resource_type.is_some() {
164 where_clauses.push(format!("resource_type = ${}", param_index));
165 param_index += 1;
166 }
167 if filters.resource_id.is_some() {
168 where_clauses.push(format!("resource_id = ${}", param_index));
169 param_index += 1;
170 }
171
172 let where_clause = if where_clauses.is_empty() {
173 String::new()
174 } else {
175 format!("WHERE {}", where_clauses.join(" AND "))
176 };
177
178 let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
180 let mut count_query_builder = sqlx::query(&count_query);
181
182 if let Some(user_id) = filters.user_id {
184 count_query_builder = count_query_builder.bind(user_id);
185 }
186 if let Some(org_id) = filters.organization_id {
187 count_query_builder = count_query_builder.bind(org_id);
188 }
189 if let Some(ref event_type) = filters.event_type {
190 count_query_builder = count_query_builder.bind(Self::event_type_to_string(event_type));
191 }
192 if let Some(success) = filters.success {
193 count_query_builder = count_query_builder.bind(success);
194 }
195 if let Some(start_date) = filters.start_date {
196 count_query_builder = count_query_builder.bind(start_date);
197 }
198 if let Some(end_date) = filters.end_date {
199 count_query_builder = count_query_builder.bind(end_date);
200 }
201 if let Some(ref resource_type) = filters.resource_type {
202 count_query_builder = count_query_builder.bind(resource_type);
203 }
204 if let Some(resource_id) = filters.resource_id {
205 count_query_builder = count_query_builder.bind(resource_id);
206 }
207
208 let count_row = count_query_builder
209 .fetch_one(&self.pool)
210 .await
211 .map_err(|e| format!("Database error: {}", e))?;
212 let total: i64 = count_row.get("count");
213
214 let data_query = format!(
216 r#"
217 SELECT id, timestamp, event_type, user_id, organization_id,
218 resource_type, resource_id, ip_address, user_agent,
219 metadata, success, error_message
220 FROM audit_logs
221 {}
222 ORDER BY timestamp DESC
223 LIMIT ${} OFFSET ${}
224 "#,
225 where_clause,
226 param_index,
227 param_index + 1
228 );
229
230 let mut data_query_builder = sqlx::query(&data_query);
231
232 if let Some(user_id) = filters.user_id {
234 data_query_builder = data_query_builder.bind(user_id);
235 }
236 if let Some(org_id) = filters.organization_id {
237 data_query_builder = data_query_builder.bind(org_id);
238 }
239 if let Some(ref event_type) = filters.event_type {
240 data_query_builder = data_query_builder.bind(Self::event_type_to_string(event_type));
241 }
242 if let Some(success) = filters.success {
243 data_query_builder = data_query_builder.bind(success);
244 }
245 if let Some(start_date) = filters.start_date {
246 data_query_builder = data_query_builder.bind(start_date);
247 }
248 if let Some(end_date) = filters.end_date {
249 data_query_builder = data_query_builder.bind(end_date);
250 }
251 if let Some(ref resource_type) = filters.resource_type {
252 data_query_builder = data_query_builder.bind(resource_type);
253 }
254 if let Some(resource_id) = filters.resource_id {
255 data_query_builder = data_query_builder.bind(resource_id);
256 }
257
258 data_query_builder = data_query_builder.bind(limit).bind(offset);
259
260 let rows = data_query_builder
261 .fetch_all(&self.pool)
262 .await
263 .map_err(|e| format!("Database error: {}", e))?;
264
265 let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
266
267 Ok((entries, total))
268 }
269
270 async fn find_recent(&self, limit: i64) -> Result<Vec<AuditLogEntry>, String> {
271 let rows = sqlx::query(
272 r#"
273 SELECT id, timestamp, event_type, user_id, organization_id,
274 resource_type, resource_id, ip_address, user_agent,
275 metadata, success, error_message
276 FROM audit_logs
277 ORDER BY timestamp DESC
278 LIMIT $1
279 "#,
280 )
281 .bind(limit)
282 .fetch_all(&self.pool)
283 .await
284 .map_err(|e| format!("Database error: {}", e))?;
285
286 Ok(rows.iter().map(Self::row_to_entry).collect())
287 }
288
289 async fn find_failed_operations(
290 &self,
291 page_request: &PageRequest,
292 organization_id: Option<Uuid>,
293 ) -> Result<(Vec<AuditLogEntry>, i64), String> {
294 let limit = page_request.per_page.min(100);
295 let offset = (page_request.page - 1) * limit;
296
297 let where_clause = if organization_id.is_some() {
298 "WHERE success = false AND organization_id = $1"
299 } else {
300 "WHERE success = false"
301 };
302
303 let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
305 let count_row = if let Some(org_id) = organization_id {
306 sqlx::query(&count_query)
307 .bind(org_id)
308 .fetch_one(&self.pool)
309 .await
310 .map_err(|e| format!("Database error: {}", e))?
311 } else {
312 sqlx::query(&count_query)
313 .fetch_one(&self.pool)
314 .await
315 .map_err(|e| format!("Database error: {}", e))?
316 };
317 let total: i64 = count_row.get("count");
318
319 let data_query = format!(
321 r#"
322 SELECT id, timestamp, event_type, user_id, organization_id,
323 resource_type, resource_id, ip_address, user_agent,
324 metadata, success, error_message
325 FROM audit_logs
326 {}
327 ORDER BY timestamp DESC
328 LIMIT $2 OFFSET $3
329 "#,
330 where_clause
331 );
332
333 let rows = if let Some(org_id) = organization_id {
334 sqlx::query(&data_query)
335 .bind(org_id)
336 .bind(limit)
337 .bind(offset)
338 .fetch_all(&self.pool)
339 .await
340 .map_err(|e| format!("Database error: {}", e))?
341 } else {
342 sqlx::query(&data_query)
343 .bind(limit)
344 .bind(offset)
345 .fetch_all(&self.pool)
346 .await
347 .map_err(|e| format!("Database error: {}", e))?
348 };
349
350 let entries: Vec<AuditLogEntry> = rows.iter().map(Self::row_to_entry).collect();
351
352 Ok((entries, total))
353 }
354
355 async fn delete_older_than(&self, timestamp: DateTime<Utc>) -> Result<i64, String> {
356 let result = sqlx::query(
357 r#"
358 DELETE FROM audit_logs
359 WHERE timestamp < $1
360 "#,
361 )
362 .bind(timestamp)
363 .execute(&self.pool)
364 .await
365 .map_err(|e| format!("Database error: {}", e))?;
366
367 Ok(result.rows_affected() as i64)
368 }
369
370 async fn count_by_filters(&self, filters: &AuditLogFilters) -> Result<i64, String> {
371 let mut where_clauses = Vec::new();
372 let mut param_index = 1;
373
374 if filters.user_id.is_some() {
375 where_clauses.push(format!("user_id = ${}", param_index));
376 param_index += 1;
377 }
378 if filters.organization_id.is_some() {
379 where_clauses.push(format!("organization_id = ${}", param_index));
380 param_index += 1;
381 }
382 if filters.event_type.is_some() {
383 where_clauses.push(format!("event_type = ${}", param_index));
384 param_index += 1;
385 }
386 if filters.success.is_some() {
387 where_clauses.push(format!("success = ${}", param_index));
388 param_index += 1;
389 }
390 if filters.start_date.is_some() {
391 where_clauses.push(format!("timestamp >= ${}", param_index));
392 param_index += 1;
393 }
394 if filters.end_date.is_some() {
395 where_clauses.push(format!("timestamp <= ${}", param_index));
396 param_index += 1;
397 }
398 if filters.resource_type.is_some() {
399 where_clauses.push(format!("resource_type = ${}", param_index));
400 param_index += 1;
401 }
402 if filters.resource_id.is_some() {
403 where_clauses.push(format!("resource_id = ${}", param_index));
404 }
405
406 let where_clause = if where_clauses.is_empty() {
407 String::new()
408 } else {
409 format!("WHERE {}", where_clauses.join(" AND "))
410 };
411
412 let count_query = format!("SELECT COUNT(*) as count FROM audit_logs {}", where_clause);
413 let mut query_builder = sqlx::query(&count_query);
414
415 if let Some(user_id) = filters.user_id {
417 query_builder = query_builder.bind(user_id);
418 }
419 if let Some(org_id) = filters.organization_id {
420 query_builder = query_builder.bind(org_id);
421 }
422 if let Some(ref event_type) = filters.event_type {
423 query_builder = query_builder.bind(Self::event_type_to_string(event_type));
424 }
425 if let Some(success) = filters.success {
426 query_builder = query_builder.bind(success);
427 }
428 if let Some(start_date) = filters.start_date {
429 query_builder = query_builder.bind(start_date);
430 }
431 if let Some(end_date) = filters.end_date {
432 query_builder = query_builder.bind(end_date);
433 }
434 if let Some(ref resource_type) = filters.resource_type {
435 query_builder = query_builder.bind(resource_type);
436 }
437 if let Some(resource_id) = filters.resource_id {
438 query_builder = query_builder.bind(resource_id);
439 }
440
441 let row = query_builder
442 .fetch_one(&self.pool)
443 .await
444 .map_err(|e| format!("Database error: {}", e))?;
445
446 Ok(row.get("count"))
447 }
448}