1use crate::application::ports::GdprRepository;
2use crate::domain::entities::gdpr_export::{
3 DocumentData, ExpenseData, GdprExport, MeetingData, OwnerData, UnitOwnershipData, UserData,
4};
5use async_trait::async_trait;
6use sqlx::PgPool;
7use std::sync::Arc;
8use uuid::Uuid;
9
10pub struct PostgresGdprRepository {
12 pool: Arc<PgPool>,
13}
14
15impl PostgresGdprRepository {
16 pub fn new(pool: Arc<PgPool>) -> Self {
17 Self { pool }
18 }
19}
20
21#[async_trait]
22impl GdprRepository for PostgresGdprRepository {
23 async fn aggregate_user_data(
24 &self,
25 user_id: Uuid,
26 organization_id: Option<Uuid>,
27 ) -> Result<GdprExport, String> {
28 let user_data = self.fetch_user_data(user_id).await?;
30
31 let owner_profiles = self.fetch_owner_profiles(user_id, organization_id).await?;
33
34 let units = self.fetch_unit_ownership(user_id, organization_id).await?;
36
37 let expenses = self.fetch_expenses(user_id, organization_id).await?;
39
40 let documents = self.fetch_documents(user_id, organization_id).await?;
42
43 let meetings = self.fetch_meetings(user_id, organization_id).await?;
45
46 let mut export = GdprExport::new(user_data);
48
49 for owner in owner_profiles {
50 export.add_owner_profile(owner);
51 }
52
53 for unit in units {
54 export.add_unit_ownership(unit);
55 }
56
57 for expense in expenses {
58 export.add_expense(expense);
59 }
60
61 for document in documents {
62 export.add_document(document);
63 }
64
65 for meeting in meetings {
66 export.add_meeting(meeting);
67 }
68
69 Ok(export)
70 }
71
72 async fn anonymize_user(&self, user_id: Uuid) -> Result<(), String> {
73 let result = sqlx::query!(
74 r#"
75 UPDATE users
76 SET
77 email = CONCAT('anonymized-', id::text, '@deleted.local'),
78 first_name = 'Anonymized',
79 last_name = 'User',
80 is_anonymized = TRUE,
81 anonymized_at = NOW(),
82 updated_at = NOW()
83 WHERE id = $1 AND is_anonymized = FALSE
84 "#,
85 user_id
86 )
87 .execute(self.pool.as_ref())
88 .await
89 .map_err(|e| format!("Failed to anonymize user: {}", e))?;
90
91 if result.rows_affected() == 0 {
92 return Err("User not found or already anonymized".to_string());
93 }
94
95 Ok(())
96 }
97
98 async fn anonymize_owner(&self, owner_id: Uuid) -> Result<(), String> {
99 let anonymized_email = format!("anonymized-{}@deleted.local", owner_id);
100 let result = sqlx::query!(
101 r#"
102 UPDATE owners
103 SET
104 email = $2,
105 phone = NULL,
106 address = 'Anonymized',
107 city = 'Anonymized',
108 postal_code = '0000',
109 country = 'Anonymized',
110 first_name = 'Anonymized',
111 last_name = 'User',
112 is_anonymized = TRUE,
113 anonymized_at = NOW(),
114 updated_at = NOW()
115 WHERE id = $1 AND is_anonymized = FALSE
116 "#,
117 owner_id,
118 anonymized_email
119 )
120 .execute(self.pool.as_ref())
121 .await
122 .map_err(|e| format!("Failed to anonymize owner: {}", e))?;
123
124 if result.rows_affected() == 0 {
125 return Err("Owner not found or already anonymized".to_string());
126 }
127
128 Ok(())
129 }
130
131 async fn find_owner_ids_by_user(
132 &self,
133 user_id: Uuid,
134 organization_id: Option<Uuid>,
135 ) -> Result<Vec<Uuid>, String> {
136 let records = sqlx::query!(
138 r#"
139 SELECT id
140 FROM owners
141 WHERE user_id = $1
142 AND ($2::uuid IS NULL OR organization_id = $2)
143 AND (is_anonymized IS NULL OR is_anonymized = FALSE)
144 "#,
145 user_id,
146 organization_id
147 )
148 .fetch_all(self.pool.as_ref())
149 .await
150 .map_err(|e| format!("Failed to fetch owner IDs: {}", e))?;
151
152 Ok(records.into_iter().map(|r| r.id).collect())
153 }
154
155 async fn check_legal_holds(&self, user_id: Uuid) -> Result<Vec<String>, String> {
156 let mut holds = Vec::new();
157
158 let unpaid_expenses = sqlx::query!(
160 r#"
161 SELECT COUNT(*) as count
162 FROM expenses e
163 INNER JOIN units u ON e.building_id = u.building_id
164 INNER JOIN unit_owners uo ON u.id = uo.unit_id
165 INNER JOIN owners o ON uo.owner_id = o.id
166 WHERE o.user_id = $1 AND e.payment_status != 'paid'
167 "#,
168 user_id
169 )
170 .fetch_one(self.pool.as_ref())
171 .await
172 .map_err(|e| format!("Failed to check unpaid expenses: {}", e))?;
173
174 if unpaid_expenses.count.unwrap_or(0) > 0 {
175 holds.push(format!(
176 "Unpaid expenses: {}",
177 unpaid_expenses.count.unwrap_or(0)
178 ));
179 }
180
181 Ok(holds)
187 }
188
189 async fn is_user_anonymized(&self, user_id: Uuid) -> Result<bool, String> {
190 let result = sqlx::query!(
191 r#"
192 SELECT is_anonymized
193 FROM users
194 WHERE id = $1
195 "#,
196 user_id
197 )
198 .fetch_optional(self.pool.as_ref())
199 .await
200 .map_err(|e| format!("Failed to check user anonymization status: {}", e))?;
201
202 match result {
203 Some(record) => Ok(record.is_anonymized),
204 None => Err("User not found".to_string()),
205 }
206 }
207}
208
209impl PostgresGdprRepository {
210 async fn fetch_user_data(&self, user_id: Uuid) -> Result<UserData, String> {
212 let record = sqlx::query!(
213 r#"
214 SELECT id, email, first_name, last_name, organization_id,
215 is_active, is_anonymized, created_at, updated_at
216 FROM users
217 WHERE id = $1
218 "#,
219 user_id
220 )
221 .fetch_optional(self.pool.as_ref())
222 .await
223 .map_err(|e| format!("Failed to fetch user data: {}", e))?;
224
225 let record = record.ok_or("User not found".to_string())?;
226
227 Ok(UserData {
228 id: record.id,
229 email: record.email,
230 first_name: record.first_name,
231 last_name: record.last_name,
232 organization_id: record.organization_id,
233 is_active: record.is_active,
234 is_anonymized: record.is_anonymized,
235 created_at: record.created_at,
236 updated_at: record.updated_at,
237 })
238 }
239
240 async fn fetch_owner_profiles(
242 &self,
243 user_id: Uuid,
244 organization_id: Option<Uuid>,
245 ) -> Result<Vec<OwnerData>, String> {
246 let records = sqlx::query!(
247 r#"
248 SELECT id, organization_id, first_name, last_name,
249 email, phone, address, city, postal_code, country,
250 is_anonymized, created_at, updated_at
251 FROM owners
252 WHERE user_id = $1
253 AND ($2::uuid IS NULL OR organization_id = $2)
254 "#,
255 user_id,
256 organization_id
257 )
258 .fetch_all(self.pool.as_ref())
259 .await
260 .map_err(|e| format!("Failed to fetch owner profiles: {}", e))?;
261
262 Ok(records
263 .into_iter()
264 .map(|r| OwnerData {
265 id: r.id,
266 organization_id: r.organization_id,
267 first_name: r.first_name,
268 last_name: r.last_name,
269 email: Some(r.email),
270 phone: r.phone,
271 address: Some(r.address),
272 city: Some(r.city),
273 postal_code: Some(r.postal_code),
274 country: Some(r.country),
275 is_anonymized: r.is_anonymized,
276 created_at: r.created_at,
277 updated_at: r.updated_at,
278 })
279 .collect())
280 }
281
282 async fn fetch_unit_ownership(
284 &self,
285 user_id: Uuid,
286 _organization_id: Option<Uuid>,
287 ) -> Result<Vec<UnitOwnershipData>, String> {
288 let records = sqlx::query!(
289 r#"
290 SELECT b.name as building_name, b.address as building_address,
291 u.unit_number, u.floor, uo.ownership_percentage,
292 uo.start_date, uo.end_date, uo.is_primary_contact
293 FROM unit_owners uo
294 INNER JOIN units u ON uo.unit_id = u.id
295 INNER JOIN buildings b ON u.building_id = b.id
296 INNER JOIN owners o ON uo.owner_id = o.id
297 WHERE o.user_id = $1
298 ORDER BY b.name, u.unit_number
299 "#,
300 user_id
301 )
302 .fetch_all(self.pool.as_ref())
303 .await
304 .map_err(|e| format!("Failed to fetch unit ownership: {}", e))?;
305
306 Ok(records
307 .into_iter()
308 .map(|r| UnitOwnershipData {
309 building_name: r.building_name,
310 building_address: r.building_address,
311 unit_number: r.unit_number,
312 floor: r.floor,
313 ownership_percentage: r.ownership_percentage,
314 start_date: r.start_date,
315 end_date: r.end_date,
316 is_primary_contact: r.is_primary_contact,
317 })
318 .collect())
319 }
320
321 async fn fetch_expenses(
323 &self,
324 user_id: Uuid,
325 _organization_id: Option<Uuid>,
326 ) -> Result<Vec<ExpenseData>, String> {
327 let records = sqlx::query!(
328 r#"
329 SELECT DISTINCT e.description, e.amount, e.expense_date as due_date,
330 (e.payment_status = 'paid') as paid, b.name as building_name
331 FROM expenses e
332 INNER JOIN buildings b ON e.building_id = b.id
333 INNER JOIN units u ON b.id = u.building_id
334 INNER JOIN unit_owners uo ON u.id = uo.unit_id
335 INNER JOIN owners o ON uo.owner_id = o.id
336 WHERE o.user_id = $1
337 ORDER BY e.expense_date DESC
338 "#,
339 user_id
340 )
341 .fetch_all(self.pool.as_ref())
342 .await
343 .map_err(|e| format!("Failed to fetch expenses: {}", e))?;
344
345 Ok(records
346 .into_iter()
347 .map(|r| ExpenseData {
348 description: r.description,
349 amount: r.amount,
350 due_date: r.due_date,
351 paid: r.paid.unwrap_or(false),
352 building_name: r.building_name,
353 })
354 .collect())
355 }
356
357 async fn fetch_documents(
359 &self,
360 user_id: Uuid,
361 _organization_id: Option<Uuid>,
362 ) -> Result<Vec<DocumentData>, String> {
363 let records = sqlx::query!(
364 r#"
365 SELECT DISTINCT d.title, d.document_type::text as document_type, d.created_at as uploaded_at, b.name as building_name
366 FROM documents d
367 LEFT JOIN buildings b ON d.building_id = b.id
368 WHERE d.uploaded_by = $1
369 ORDER BY d.created_at DESC
370 "#,
371 user_id
372 )
373 .fetch_all(self.pool.as_ref())
374 .await
375 .map_err(|e| format!("Failed to fetch documents: {}", e))?;
376
377 Ok(records
378 .into_iter()
379 .map(|r| DocumentData {
380 title: r.title,
381 document_type: r.document_type.unwrap_or_else(|| "unknown".to_string()),
382 uploaded_at: r.uploaded_at,
383 building_name: Some(r.building_name),
384 })
385 .collect())
386 }
387
388 async fn fetch_meetings(
390 &self,
391 _user_id: Uuid,
392 _organization_id: Option<Uuid>,
393 ) -> Result<Vec<MeetingData>, String> {
394 let records = sqlx::query!(
395 r#"
396 SELECT DISTINCT m.title, m.scheduled_date as meeting_date, m.agenda::text as agenda, b.name as building_name
397 FROM meetings m
398 INNER JOIN buildings b ON m.building_id = b.id
399 ORDER BY m.scheduled_date DESC
400 "#
401 )
402 .fetch_all(self.pool.as_ref())
403 .await
404 .map_err(|e| format!("Failed to fetch meetings: {}", e))?;
405
406 Ok(records
407 .into_iter()
408 .map(|r| MeetingData {
409 title: r.title,
410 meeting_date: r.meeting_date,
411 agenda: r.agenda,
412 building_name: r.building_name,
413 })
414 .collect())
415 }
416}