1use crate::application::ports::GdprRepository;
2use crate::domain::entities::gdpr_export::{
3 DocumentData, ExpenseData, GdprExport, MeetingData, OwnerData, UnitOwnershipData, UserData,
4};
5use async_trait::async_trait;
6use sqlx::PgPool;
7use std::sync::Arc;
8use uuid::Uuid;
9
10pub struct PostgresGdprRepository {
12 pool: Arc<PgPool>,
13}
14
15impl PostgresGdprRepository {
16 pub fn new(pool: Arc<PgPool>) -> Self {
17 Self { pool }
18 }
19}
20
21#[async_trait]
22impl GdprRepository for PostgresGdprRepository {
23 async fn aggregate_user_data(
24 &self,
25 user_id: Uuid,
26 organization_id: Option<Uuid>,
27 ) -> Result<GdprExport, String> {
28 let user_data = self.fetch_user_data(user_id).await?;
30
31 let owner_profiles = self.fetch_owner_profiles(user_id, organization_id).await?;
33
34 let units = self.fetch_unit_ownership(user_id, organization_id).await?;
36
37 let expenses = self.fetch_expenses(user_id, organization_id).await?;
39
40 let documents = self.fetch_documents(user_id, organization_id).await?;
42
43 let meetings = self.fetch_meetings(user_id, organization_id).await?;
45
46 let mut export = GdprExport::new(user_data);
48
49 for owner in owner_profiles {
50 export.add_owner_profile(owner);
51 }
52
53 for unit in units {
54 export.add_unit_ownership(unit);
55 }
56
57 for expense in expenses {
58 export.add_expense(expense);
59 }
60
61 for document in documents {
62 export.add_document(document);
63 }
64
65 for meeting in meetings {
66 export.add_meeting(meeting);
67 }
68
69 Ok(export)
70 }
71
72 async fn anonymize_user(&self, user_id: Uuid) -> Result<(), String> {
73 let result = sqlx::query!(
74 r#"
75 UPDATE users
76 SET
77 email = CONCAT('anonymized-', id::text, '@deleted.local'),
78 first_name = 'Anonymized',
79 last_name = 'User',
80 is_anonymized = TRUE,
81 anonymized_at = NOW(),
82 updated_at = NOW()
83 WHERE id = $1 AND is_anonymized = FALSE
84 "#,
85 user_id
86 )
87 .execute(self.pool.as_ref())
88 .await
89 .map_err(|e| format!("Failed to anonymize user: {}", e))?;
90
91 if result.rows_affected() == 0 {
92 return Err("User not found or already anonymized".to_string());
93 }
94
95 Ok(())
96 }
97
98 async fn anonymize_owner(&self, owner_id: Uuid) -> Result<(), String> {
99 let result = sqlx::query!(
100 r#"
101 UPDATE owners
102 SET
103 email = NULL,
104 phone = NULL,
105 address = NULL,
106 city = NULL,
107 postal_code = NULL,
108 country = NULL,
109 first_name = 'Anonymized',
110 last_name = 'User',
111 is_anonymized = TRUE,
112 anonymized_at = NOW(),
113 updated_at = NOW()
114 WHERE id = $1 AND is_anonymized = FALSE
115 "#,
116 owner_id
117 )
118 .execute(self.pool.as_ref())
119 .await
120 .map_err(|e| format!("Failed to anonymize owner: {}", e))?;
121
122 if result.rows_affected() == 0 {
123 return Err("Owner not found or already anonymized".to_string());
124 }
125
126 Ok(())
127 }
128
129 async fn find_owner_ids_by_user(
130 &self,
131 user_id: Uuid,
132 organization_id: Option<Uuid>,
133 ) -> Result<Vec<Uuid>, String> {
134 let records = sqlx::query!(
136 r#"
137 SELECT id
138 FROM owners
139 WHERE user_id = $1
140 AND ($2::uuid IS NULL OR organization_id = $2)
141 AND (is_anonymized IS NULL OR is_anonymized = FALSE)
142 "#,
143 user_id,
144 organization_id
145 )
146 .fetch_all(self.pool.as_ref())
147 .await
148 .map_err(|e| format!("Failed to fetch owner IDs: {}", e))?;
149
150 Ok(records.into_iter().map(|r| r.id).collect())
151 }
152
153 async fn check_legal_holds(&self, user_id: Uuid) -> Result<Vec<String>, String> {
154 let mut holds = Vec::new();
155
156 let unpaid_expenses = sqlx::query!(
158 r#"
159 SELECT COUNT(*) as count
160 FROM expenses e
161 INNER JOIN units u ON e.building_id = u.building_id
162 INNER JOIN unit_owners uo ON u.id = uo.unit_id
163 INNER JOIN owners o ON uo.owner_id = o.id
164 WHERE o.user_id = $1 AND e.payment_status != 'paid'
165 "#,
166 user_id
167 )
168 .fetch_one(self.pool.as_ref())
169 .await
170 .map_err(|e| format!("Failed to check unpaid expenses: {}", e))?;
171
172 if unpaid_expenses.count.unwrap_or(0) > 0 {
173 holds.push(format!(
174 "Unpaid expenses: {}",
175 unpaid_expenses.count.unwrap_or(0)
176 ));
177 }
178
179 Ok(holds)
185 }
186
187 async fn is_user_anonymized(&self, user_id: Uuid) -> Result<bool, String> {
188 let result = sqlx::query!(
189 r#"
190 SELECT is_anonymized
191 FROM users
192 WHERE id = $1
193 "#,
194 user_id
195 )
196 .fetch_optional(self.pool.as_ref())
197 .await
198 .map_err(|e| format!("Failed to check user anonymization status: {}", e))?;
199
200 match result {
201 Some(record) => Ok(record.is_anonymized),
202 None => Err("User not found".to_string()),
203 }
204 }
205}
206
207impl PostgresGdprRepository {
208 async fn fetch_user_data(&self, user_id: Uuid) -> Result<UserData, String> {
210 let record = sqlx::query!(
211 r#"
212 SELECT id, email, first_name, last_name, organization_id,
213 is_active, is_anonymized, created_at, updated_at
214 FROM users
215 WHERE id = $1
216 "#,
217 user_id
218 )
219 .fetch_optional(self.pool.as_ref())
220 .await
221 .map_err(|e| format!("Failed to fetch user data: {}", e))?;
222
223 let record = record.ok_or("User not found".to_string())?;
224
225 Ok(UserData {
226 id: record.id,
227 email: record.email,
228 first_name: record.first_name,
229 last_name: record.last_name,
230 organization_id: record.organization_id,
231 is_active: record.is_active,
232 is_anonymized: record.is_anonymized,
233 created_at: record.created_at,
234 updated_at: record.updated_at,
235 })
236 }
237
238 async fn fetch_owner_profiles(
240 &self,
241 user_id: Uuid,
242 organization_id: Option<Uuid>,
243 ) -> Result<Vec<OwnerData>, String> {
244 let records = sqlx::query!(
245 r#"
246 SELECT id, organization_id, first_name, last_name,
247 email, phone, address, city, postal_code, country,
248 is_anonymized, created_at, updated_at
249 FROM owners
250 WHERE user_id = $1
251 AND ($2::uuid IS NULL OR organization_id = $2)
252 "#,
253 user_id,
254 organization_id
255 )
256 .fetch_all(self.pool.as_ref())
257 .await
258 .map_err(|e| format!("Failed to fetch owner profiles: {}", e))?;
259
260 Ok(records
261 .into_iter()
262 .map(|r| OwnerData {
263 id: r.id,
264 organization_id: r.organization_id,
265 first_name: r.first_name,
266 last_name: r.last_name,
267 email: Some(r.email),
268 phone: r.phone,
269 address: Some(r.address),
270 city: Some(r.city),
271 postal_code: Some(r.postal_code),
272 country: Some(r.country),
273 is_anonymized: r.is_anonymized,
274 created_at: r.created_at,
275 updated_at: r.updated_at,
276 })
277 .collect())
278 }
279
280 async fn fetch_unit_ownership(
282 &self,
283 user_id: Uuid,
284 _organization_id: Option<Uuid>,
285 ) -> Result<Vec<UnitOwnershipData>, String> {
286 let records = sqlx::query!(
287 r#"
288 SELECT b.name as building_name, b.address as building_address,
289 u.unit_number, u.floor, uo.ownership_percentage,
290 uo.start_date, uo.end_date, uo.is_primary_contact
291 FROM unit_owners uo
292 INNER JOIN units u ON uo.unit_id = u.id
293 INNER JOIN buildings b ON u.building_id = b.id
294 INNER JOIN owners o ON uo.owner_id = o.id
295 WHERE o.user_id = $1
296 ORDER BY b.name, u.unit_number
297 "#,
298 user_id
299 )
300 .fetch_all(self.pool.as_ref())
301 .await
302 .map_err(|e| format!("Failed to fetch unit ownership: {}", e))?;
303
304 Ok(records
305 .into_iter()
306 .map(|r| UnitOwnershipData {
307 building_name: r.building_name,
308 building_address: r.building_address,
309 unit_number: r.unit_number,
310 floor: r.floor,
311 ownership_percentage: r.ownership_percentage,
312 start_date: r.start_date,
313 end_date: r.end_date,
314 is_primary_contact: r.is_primary_contact,
315 })
316 .collect())
317 }
318
319 async fn fetch_expenses(
321 &self,
322 user_id: Uuid,
323 _organization_id: Option<Uuid>,
324 ) -> Result<Vec<ExpenseData>, String> {
325 let records = sqlx::query!(
326 r#"
327 SELECT DISTINCT e.description, e.amount, e.expense_date as due_date,
328 (e.payment_status = 'paid') as paid, b.name as building_name
329 FROM expenses e
330 INNER JOIN buildings b ON e.building_id = b.id
331 INNER JOIN units u ON b.id = u.building_id
332 INNER JOIN unit_owners uo ON u.id = uo.unit_id
333 INNER JOIN owners o ON uo.owner_id = o.id
334 WHERE o.user_id = $1
335 ORDER BY e.expense_date DESC
336 "#,
337 user_id
338 )
339 .fetch_all(self.pool.as_ref())
340 .await
341 .map_err(|e| format!("Failed to fetch expenses: {}", e))?;
342
343 Ok(records
344 .into_iter()
345 .map(|r| ExpenseData {
346 description: r.description,
347 amount: r.amount,
348 due_date: r.due_date,
349 paid: r.paid.unwrap_or(false),
350 building_name: r.building_name,
351 })
352 .collect())
353 }
354
355 async fn fetch_documents(
357 &self,
358 user_id: Uuid,
359 _organization_id: Option<Uuid>,
360 ) -> Result<Vec<DocumentData>, String> {
361 let records = sqlx::query!(
362 r#"
363 SELECT DISTINCT d.title, d.document_type::text as document_type, d.created_at as uploaded_at, b.name as building_name
364 FROM documents d
365 LEFT JOIN buildings b ON d.building_id = b.id
366 WHERE d.uploaded_by = $1
367 ORDER BY d.created_at DESC
368 "#,
369 user_id
370 )
371 .fetch_all(self.pool.as_ref())
372 .await
373 .map_err(|e| format!("Failed to fetch documents: {}", e))?;
374
375 Ok(records
376 .into_iter()
377 .map(|r| DocumentData {
378 title: r.title,
379 document_type: r.document_type.unwrap_or_else(|| "unknown".to_string()),
380 uploaded_at: r.uploaded_at,
381 building_name: Some(r.building_name),
382 })
383 .collect())
384 }
385
386 async fn fetch_meetings(
388 &self,
389 _user_id: Uuid,
390 _organization_id: Option<Uuid>,
391 ) -> Result<Vec<MeetingData>, String> {
392 let records = sqlx::query!(
393 r#"
394 SELECT DISTINCT m.title, m.scheduled_date as meeting_date, m.agenda::text as agenda, b.name as building_name
395 FROM meetings m
396 INNER JOIN buildings b ON m.building_id = b.id
397 ORDER BY m.scheduled_date DESC
398 "#
399 )
400 .fetch_all(self.pool.as_ref())
401 .await
402 .map_err(|e| format!("Failed to fetch meetings: {}", e))?;
403
404 Ok(records
405 .into_iter()
406 .map(|r| MeetingData {
407 title: r.title,
408 meeting_date: r.meeting_date,
409 agenda: r.agenda,
410 building_name: r.building_name,
411 })
412 .collect())
413 }
414}