// koprogo_api/infrastructure/database/repositories/gdpr_repository_impl.rs

1use crate::application::ports::GdprRepository;
2use crate::domain::entities::gdpr_export::{
3    DocumentData, ExpenseData, GdprExport, MeetingData, OwnerData, UnitOwnershipData, UserData,
4};
5use async_trait::async_trait;
6use sqlx::PgPool;
7use std::sync::Arc;
8use uuid::Uuid;
9
10/// PostgreSQL implementation of GdprRepository
11pub struct PostgresGdprRepository {
12    pool: Arc<PgPool>,
13}
14
15impl PostgresGdprRepository {
16    pub fn new(pool: Arc<PgPool>) -> Self {
17        Self { pool }
18    }
19}
20
21#[async_trait]
22impl GdprRepository for PostgresGdprRepository {
23    async fn aggregate_user_data(
24        &self,
25        user_id: Uuid,
26        organization_id: Option<Uuid>,
27    ) -> Result<GdprExport, String> {
28        // Fetch user data
29        let user_data = self.fetch_user_data(user_id).await?;
30
31        // Fetch owner profiles linked to this user
32        let owner_profiles = self.fetch_owner_profiles(user_id, organization_id).await?;
33
34        // Fetch unit ownership data
35        let units = self.fetch_unit_ownership(user_id, organization_id).await?;
36
37        // Fetch expenses
38        let expenses = self.fetch_expenses(user_id, organization_id).await?;
39
40        // Fetch documents
41        let documents = self.fetch_documents(user_id, organization_id).await?;
42
43        // Fetch meetings
44        let meetings = self.fetch_meetings(user_id, organization_id).await?;
45
46        // Build GdprExport aggregate
47        let mut export = GdprExport::new(user_data);
48
49        for owner in owner_profiles {
50            export.add_owner_profile(owner);
51        }
52
53        for unit in units {
54            export.add_unit_ownership(unit);
55        }
56
57        for expense in expenses {
58            export.add_expense(expense);
59        }
60
61        for document in documents {
62            export.add_document(document);
63        }
64
65        for meeting in meetings {
66            export.add_meeting(meeting);
67        }
68
69        Ok(export)
70    }
71
72    async fn anonymize_user(&self, user_id: Uuid) -> Result<(), String> {
73        let result = sqlx::query!(
74            r#"
75            UPDATE users
76            SET
77                email = CONCAT('anonymized-', id::text, '@deleted.local'),
78                first_name = 'Anonymized',
79                last_name = 'User',
80                is_anonymized = TRUE,
81                anonymized_at = NOW(),
82                updated_at = NOW()
83            WHERE id = $1 AND is_anonymized = FALSE
84            "#,
85            user_id
86        )
87        .execute(self.pool.as_ref())
88        .await
89        .map_err(|e| format!("Failed to anonymize user: {}", e))?;
90
91        if result.rows_affected() == 0 {
92            return Err("User not found or already anonymized".to_string());
93        }
94
95        Ok(())
96    }
97
98    async fn anonymize_owner(&self, owner_id: Uuid) -> Result<(), String> {
99        let result = sqlx::query!(
100            r#"
101            UPDATE owners
102            SET
103                email = NULL,
104                phone = NULL,
105                address = NULL,
106                city = NULL,
107                postal_code = NULL,
108                country = NULL,
109                first_name = 'Anonymized',
110                last_name = 'User',
111                is_anonymized = TRUE,
112                anonymized_at = NOW(),
113                updated_at = NOW()
114            WHERE id = $1 AND is_anonymized = FALSE
115            "#,
116            owner_id
117        )
118        .execute(self.pool.as_ref())
119        .await
120        .map_err(|e| format!("Failed to anonymize owner: {}", e))?;
121
122        if result.rows_affected() == 0 {
123            return Err("Owner not found or already anonymized".to_string());
124        }
125
126        Ok(())
127    }
128
129    async fn find_owner_ids_by_user(
130        &self,
131        user_id: Uuid,
132        organization_id: Option<Uuid>,
133    ) -> Result<Vec<Uuid>, String> {
134        // Fetch owners by user_id foreign key
135        let records = sqlx::query!(
136            r#"
137            SELECT id
138            FROM owners
139            WHERE user_id = $1
140              AND ($2::uuid IS NULL OR organization_id = $2)
141              AND (is_anonymized IS NULL OR is_anonymized = FALSE)
142            "#,
143            user_id,
144            organization_id
145        )
146        .fetch_all(self.pool.as_ref())
147        .await
148        .map_err(|e| format!("Failed to fetch owner IDs: {}", e))?;
149
150        Ok(records.into_iter().map(|r| r.id).collect())
151    }
152
153    async fn check_legal_holds(&self, user_id: Uuid) -> Result<Vec<String>, String> {
154        let mut holds = Vec::new();
155
156        // Check for unpaid expenses linked to user's owner profiles
157        let unpaid_expenses = sqlx::query!(
158            r#"
159            SELECT COUNT(*) as count
160            FROM expenses e
161            INNER JOIN units u ON e.building_id = u.building_id
162            INNER JOIN unit_owners uo ON u.id = uo.unit_id
163            INNER JOIN owners o ON uo.owner_id = o.id
164            WHERE o.user_id = $1 AND e.payment_status != 'paid'
165            "#,
166            user_id
167        )
168        .fetch_one(self.pool.as_ref())
169        .await
170        .map_err(|e| format!("Failed to check unpaid expenses: {}", e))?;
171
172        if unpaid_expenses.count.unwrap_or(0) > 0 {
173            holds.push(format!(
174                "Unpaid expenses: {}",
175                unpaid_expenses.count.unwrap_or(0)
176            ));
177        }
178
179        // Future: Add more legal hold checks here
180        // - Ongoing legal proceedings
181        // - Pending transactions
182        // - Regulatory requirements
183
184        Ok(holds)
185    }
186
187    async fn is_user_anonymized(&self, user_id: Uuid) -> Result<bool, String> {
188        let result = sqlx::query!(
189            r#"
190            SELECT is_anonymized
191            FROM users
192            WHERE id = $1
193            "#,
194            user_id
195        )
196        .fetch_optional(self.pool.as_ref())
197        .await
198        .map_err(|e| format!("Failed to check user anonymization status: {}", e))?;
199
200        match result {
201            Some(record) => Ok(record.is_anonymized),
202            None => Err("User not found".to_string()),
203        }
204    }
205}
206
207impl PostgresGdprRepository {
208    /// Fetch user data from database
209    async fn fetch_user_data(&self, user_id: Uuid) -> Result<UserData, String> {
210        let record = sqlx::query!(
211            r#"
212            SELECT id, email, first_name, last_name, organization_id,
213                   is_active, is_anonymized, created_at, updated_at
214            FROM users
215            WHERE id = $1
216            "#,
217            user_id
218        )
219        .fetch_optional(self.pool.as_ref())
220        .await
221        .map_err(|e| format!("Failed to fetch user data: {}", e))?;
222
223        let record = record.ok_or("User not found".to_string())?;
224
225        Ok(UserData {
226            id: record.id,
227            email: record.email,
228            first_name: record.first_name,
229            last_name: record.last_name,
230            organization_id: record.organization_id,
231            is_active: record.is_active,
232            is_anonymized: record.is_anonymized,
233            created_at: record.created_at,
234            updated_at: record.updated_at,
235        })
236    }
237
238    /// Fetch owner profiles linked to user
239    async fn fetch_owner_profiles(
240        &self,
241        user_id: Uuid,
242        organization_id: Option<Uuid>,
243    ) -> Result<Vec<OwnerData>, String> {
244        let records = sqlx::query!(
245            r#"
246            SELECT id, organization_id, first_name, last_name,
247                   email, phone, address, city, postal_code, country,
248                   is_anonymized, created_at, updated_at
249            FROM owners
250            WHERE user_id = $1
251              AND ($2::uuid IS NULL OR organization_id = $2)
252            "#,
253            user_id,
254            organization_id
255        )
256        .fetch_all(self.pool.as_ref())
257        .await
258        .map_err(|e| format!("Failed to fetch owner profiles: {}", e))?;
259
260        Ok(records
261            .into_iter()
262            .map(|r| OwnerData {
263                id: r.id,
264                organization_id: r.organization_id,
265                first_name: r.first_name,
266                last_name: r.last_name,
267                email: Some(r.email),
268                phone: r.phone,
269                address: Some(r.address),
270                city: Some(r.city),
271                postal_code: Some(r.postal_code),
272                country: Some(r.country),
273                is_anonymized: r.is_anonymized,
274                created_at: r.created_at,
275                updated_at: r.updated_at,
276            })
277            .collect())
278    }
279
280    /// Fetch unit ownership data
281    async fn fetch_unit_ownership(
282        &self,
283        user_id: Uuid,
284        _organization_id: Option<Uuid>,
285    ) -> Result<Vec<UnitOwnershipData>, String> {
286        let records = sqlx::query!(
287            r#"
288            SELECT b.name as building_name, b.address as building_address,
289                   u.unit_number, u.floor, uo.ownership_percentage,
290                   uo.start_date, uo.end_date, uo.is_primary_contact
291            FROM unit_owners uo
292            INNER JOIN units u ON uo.unit_id = u.id
293            INNER JOIN buildings b ON u.building_id = b.id
294            INNER JOIN owners o ON uo.owner_id = o.id
295            WHERE o.user_id = $1
296            ORDER BY b.name, u.unit_number
297            "#,
298            user_id
299        )
300        .fetch_all(self.pool.as_ref())
301        .await
302        .map_err(|e| format!("Failed to fetch unit ownership: {}", e))?;
303
304        Ok(records
305            .into_iter()
306            .map(|r| UnitOwnershipData {
307                building_name: r.building_name,
308                building_address: r.building_address,
309                unit_number: r.unit_number,
310                floor: r.floor,
311                ownership_percentage: r.ownership_percentage,
312                start_date: r.start_date,
313                end_date: r.end_date,
314                is_primary_contact: r.is_primary_contact,
315            })
316            .collect())
317    }
318
319    /// Fetch expenses related to user
320    async fn fetch_expenses(
321        &self,
322        user_id: Uuid,
323        _organization_id: Option<Uuid>,
324    ) -> Result<Vec<ExpenseData>, String> {
325        let records = sqlx::query!(
326            r#"
327            SELECT DISTINCT e.description, e.amount, e.expense_date as due_date,
328                   (e.payment_status = 'paid') as paid, b.name as building_name
329            FROM expenses e
330            INNER JOIN buildings b ON e.building_id = b.id
331            INNER JOIN units u ON b.id = u.building_id
332            INNER JOIN unit_owners uo ON u.id = uo.unit_id
333            INNER JOIN owners o ON uo.owner_id = o.id
334            WHERE o.user_id = $1
335            ORDER BY e.expense_date DESC
336            "#,
337            user_id
338        )
339        .fetch_all(self.pool.as_ref())
340        .await
341        .map_err(|e| format!("Failed to fetch expenses: {}", e))?;
342
343        Ok(records
344            .into_iter()
345            .map(|r| ExpenseData {
346                description: r.description,
347                amount: r.amount,
348                due_date: r.due_date,
349                paid: r.paid.unwrap_or(false),
350                building_name: r.building_name,
351            })
352            .collect())
353    }
354
355    /// Fetch documents related to user
356    async fn fetch_documents(
357        &self,
358        user_id: Uuid,
359        _organization_id: Option<Uuid>,
360    ) -> Result<Vec<DocumentData>, String> {
361        let records = sqlx::query!(
362            r#"
363            SELECT DISTINCT d.title, d.document_type::text as document_type, d.created_at as uploaded_at, b.name as building_name
364            FROM documents d
365            LEFT JOIN buildings b ON d.building_id = b.id
366            WHERE d.uploaded_by = $1
367            ORDER BY d.created_at DESC
368            "#,
369            user_id
370        )
371        .fetch_all(self.pool.as_ref())
372        .await
373        .map_err(|e| format!("Failed to fetch documents: {}", e))?;
374
375        Ok(records
376            .into_iter()
377            .map(|r| DocumentData {
378                title: r.title,
379                document_type: r.document_type.unwrap_or_else(|| "unknown".to_string()),
380                uploaded_at: r.uploaded_at,
381                building_name: Some(r.building_name),
382            })
383            .collect())
384    }
385
386    /// Fetch meetings attended by user
387    async fn fetch_meetings(
388        &self,
389        _user_id: Uuid,
390        _organization_id: Option<Uuid>,
391    ) -> Result<Vec<MeetingData>, String> {
392        let records = sqlx::query!(
393            r#"
394            SELECT DISTINCT m.title, m.scheduled_date as meeting_date, m.agenda::text as agenda, b.name as building_name
395            FROM meetings m
396            INNER JOIN buildings b ON m.building_id = b.id
397            ORDER BY m.scheduled_date DESC
398            "#
399        )
400        .fetch_all(self.pool.as_ref())
401        .await
402        .map_err(|e| format!("Failed to fetch meetings: {}", e))?;
403
404        Ok(records
405            .into_iter()
406            .map(|r| MeetingData {
407                title: r.title,
408                meeting_date: r.meeting_date,
409                agenda: r.agenda,
410                building_name: r.building_name,
411            })
412            .collect())
413    }
414}