koprogo_api/infrastructure/database/repositories/
gdpr_repository_impl.rs

1use crate::application::ports::GdprRepository;
2use crate::domain::entities::gdpr_export::{
3    DocumentData, ExpenseData, GdprExport, MeetingData, OwnerData, UnitOwnershipData, UserData,
4};
5use async_trait::async_trait;
6use sqlx::PgPool;
7use std::sync::Arc;
8use uuid::Uuid;
9
10/// PostgreSQL implementation of GdprRepository
11pub struct PostgresGdprRepository {
12    pool: Arc<PgPool>,
13}
14
15impl PostgresGdprRepository {
16    pub fn new(pool: Arc<PgPool>) -> Self {
17        Self { pool }
18    }
19}
20
21#[async_trait]
22impl GdprRepository for PostgresGdprRepository {
23    async fn aggregate_user_data(
24        &self,
25        user_id: Uuid,
26        organization_id: Option<Uuid>,
27    ) -> Result<GdprExport, String> {
28        // Fetch user data
29        let user_data = self.fetch_user_data(user_id).await?;
30
31        // Fetch owner profiles linked to this user
32        let owner_profiles = self.fetch_owner_profiles(user_id, organization_id).await?;
33
34        // Fetch unit ownership data
35        let units = self.fetch_unit_ownership(user_id, organization_id).await?;
36
37        // Fetch expenses
38        let expenses = self.fetch_expenses(user_id, organization_id).await?;
39
40        // Fetch documents
41        let documents = self.fetch_documents(user_id, organization_id).await?;
42
43        // Fetch meetings
44        let meetings = self.fetch_meetings(user_id, organization_id).await?;
45
46        // Build GdprExport aggregate
47        let mut export = GdprExport::new(user_data);
48
49        for owner in owner_profiles {
50            export.add_owner_profile(owner);
51        }
52
53        for unit in units {
54            export.add_unit_ownership(unit);
55        }
56
57        for expense in expenses {
58            export.add_expense(expense);
59        }
60
61        for document in documents {
62            export.add_document(document);
63        }
64
65        for meeting in meetings {
66            export.add_meeting(meeting);
67        }
68
69        Ok(export)
70    }
71
72    async fn anonymize_user(&self, user_id: Uuid) -> Result<(), String> {
73        let result = sqlx::query!(
74            r#"
75            UPDATE users
76            SET
77                email = CONCAT('anonymized-', id::text, '@deleted.local'),
78                first_name = 'Anonymized',
79                last_name = 'User',
80                is_anonymized = TRUE,
81                anonymized_at = NOW(),
82                updated_at = NOW()
83            WHERE id = $1 AND is_anonymized = FALSE
84            "#,
85            user_id
86        )
87        .execute(self.pool.as_ref())
88        .await
89        .map_err(|e| format!("Failed to anonymize user: {}", e))?;
90
91        if result.rows_affected() == 0 {
92            return Err("User not found or already anonymized".to_string());
93        }
94
95        Ok(())
96    }
97
98    async fn anonymize_owner(&self, owner_id: Uuid) -> Result<(), String> {
99        let anonymized_email = format!("anonymized-{}@deleted.local", owner_id);
100        let result = sqlx::query!(
101            r#"
102            UPDATE owners
103            SET
104                email = $2,
105                phone = NULL,
106                address = 'Anonymized',
107                city = 'Anonymized',
108                postal_code = '0000',
109                country = 'Anonymized',
110                first_name = 'Anonymized',
111                last_name = 'User',
112                is_anonymized = TRUE,
113                anonymized_at = NOW(),
114                updated_at = NOW()
115            WHERE id = $1 AND is_anonymized = FALSE
116            "#,
117            owner_id,
118            anonymized_email
119        )
120        .execute(self.pool.as_ref())
121        .await
122        .map_err(|e| format!("Failed to anonymize owner: {}", e))?;
123
124        if result.rows_affected() == 0 {
125            return Err("Owner not found or already anonymized".to_string());
126        }
127
128        Ok(())
129    }
130
131    async fn find_owner_ids_by_user(
132        &self,
133        user_id: Uuid,
134        organization_id: Option<Uuid>,
135    ) -> Result<Vec<Uuid>, String> {
136        // Fetch owners by user_id foreign key
137        let records = sqlx::query!(
138            r#"
139            SELECT id
140            FROM owners
141            WHERE user_id = $1
142              AND ($2::uuid IS NULL OR organization_id = $2)
143              AND (is_anonymized IS NULL OR is_anonymized = FALSE)
144            "#,
145            user_id,
146            organization_id
147        )
148        .fetch_all(self.pool.as_ref())
149        .await
150        .map_err(|e| format!("Failed to fetch owner IDs: {}", e))?;
151
152        Ok(records.into_iter().map(|r| r.id).collect())
153    }
154
155    async fn check_legal_holds(&self, user_id: Uuid) -> Result<Vec<String>, String> {
156        let mut holds = Vec::new();
157
158        // Check for unpaid expenses linked to user's owner profiles
159        let unpaid_expenses = sqlx::query!(
160            r#"
161            SELECT COUNT(*) as count
162            FROM expenses e
163            INNER JOIN units u ON e.building_id = u.building_id
164            INNER JOIN unit_owners uo ON u.id = uo.unit_id
165            INNER JOIN owners o ON uo.owner_id = o.id
166            WHERE o.user_id = $1 AND e.payment_status != 'paid'
167            "#,
168            user_id
169        )
170        .fetch_one(self.pool.as_ref())
171        .await
172        .map_err(|e| format!("Failed to check unpaid expenses: {}", e))?;
173
174        if unpaid_expenses.count.unwrap_or(0) > 0 {
175            holds.push(format!(
176                "Unpaid expenses: {}",
177                unpaid_expenses.count.unwrap_or(0)
178            ));
179        }
180
181        // Future: Add more legal hold checks here
182        // - Ongoing legal proceedings
183        // - Pending transactions
184        // - Regulatory requirements
185
186        Ok(holds)
187    }
188
189    async fn is_user_anonymized(&self, user_id: Uuid) -> Result<bool, String> {
190        let result = sqlx::query!(
191            r#"
192            SELECT is_anonymized
193            FROM users
194            WHERE id = $1
195            "#,
196            user_id
197        )
198        .fetch_optional(self.pool.as_ref())
199        .await
200        .map_err(|e| format!("Failed to check user anonymization status: {}", e))?;
201
202        match result {
203            Some(record) => Ok(record.is_anonymized),
204            None => Err("User not found".to_string()),
205        }
206    }
207}
208
209impl PostgresGdprRepository {
210    /// Fetch user data from database
211    async fn fetch_user_data(&self, user_id: Uuid) -> Result<UserData, String> {
212        let record = sqlx::query!(
213            r#"
214            SELECT id, email, first_name, last_name, organization_id,
215                   is_active, is_anonymized, created_at, updated_at
216            FROM users
217            WHERE id = $1
218            "#,
219            user_id
220        )
221        .fetch_optional(self.pool.as_ref())
222        .await
223        .map_err(|e| format!("Failed to fetch user data: {}", e))?;
224
225        let record = record.ok_or("User not found".to_string())?;
226
227        Ok(UserData {
228            id: record.id,
229            email: record.email,
230            first_name: record.first_name,
231            last_name: record.last_name,
232            organization_id: record.organization_id,
233            is_active: record.is_active,
234            is_anonymized: record.is_anonymized,
235            created_at: record.created_at,
236            updated_at: record.updated_at,
237        })
238    }
239
240    /// Fetch owner profiles linked to user
241    async fn fetch_owner_profiles(
242        &self,
243        user_id: Uuid,
244        organization_id: Option<Uuid>,
245    ) -> Result<Vec<OwnerData>, String> {
246        let records = sqlx::query!(
247            r#"
248            SELECT id, organization_id, first_name, last_name,
249                   email, phone, address, city, postal_code, country,
250                   is_anonymized, created_at, updated_at
251            FROM owners
252            WHERE user_id = $1
253              AND ($2::uuid IS NULL OR organization_id = $2)
254            "#,
255            user_id,
256            organization_id
257        )
258        .fetch_all(self.pool.as_ref())
259        .await
260        .map_err(|e| format!("Failed to fetch owner profiles: {}", e))?;
261
262        Ok(records
263            .into_iter()
264            .map(|r| OwnerData {
265                id: r.id,
266                organization_id: r.organization_id,
267                first_name: r.first_name,
268                last_name: r.last_name,
269                email: Some(r.email),
270                phone: r.phone,
271                address: Some(r.address),
272                city: Some(r.city),
273                postal_code: Some(r.postal_code),
274                country: Some(r.country),
275                is_anonymized: r.is_anonymized,
276                created_at: r.created_at,
277                updated_at: r.updated_at,
278            })
279            .collect())
280    }
281
282    /// Fetch unit ownership data
283    async fn fetch_unit_ownership(
284        &self,
285        user_id: Uuid,
286        _organization_id: Option<Uuid>,
287    ) -> Result<Vec<UnitOwnershipData>, String> {
288        let records = sqlx::query!(
289            r#"
290            SELECT b.name as building_name, b.address as building_address,
291                   u.unit_number, u.floor, uo.ownership_percentage,
292                   uo.start_date, uo.end_date, uo.is_primary_contact
293            FROM unit_owners uo
294            INNER JOIN units u ON uo.unit_id = u.id
295            INNER JOIN buildings b ON u.building_id = b.id
296            INNER JOIN owners o ON uo.owner_id = o.id
297            WHERE o.user_id = $1
298            ORDER BY b.name, u.unit_number
299            "#,
300            user_id
301        )
302        .fetch_all(self.pool.as_ref())
303        .await
304        .map_err(|e| format!("Failed to fetch unit ownership: {}", e))?;
305
306        Ok(records
307            .into_iter()
308            .map(|r| UnitOwnershipData {
309                building_name: r.building_name,
310                building_address: r.building_address,
311                unit_number: r.unit_number,
312                floor: r.floor,
313                ownership_percentage: r.ownership_percentage,
314                start_date: r.start_date,
315                end_date: r.end_date,
316                is_primary_contact: r.is_primary_contact,
317            })
318            .collect())
319    }
320
321    /// Fetch expenses related to user
322    async fn fetch_expenses(
323        &self,
324        user_id: Uuid,
325        _organization_id: Option<Uuid>,
326    ) -> Result<Vec<ExpenseData>, String> {
327        let records = sqlx::query!(
328            r#"
329            SELECT DISTINCT e.description, e.amount, e.expense_date as due_date,
330                   (e.payment_status = 'paid') as paid, b.name as building_name
331            FROM expenses e
332            INNER JOIN buildings b ON e.building_id = b.id
333            INNER JOIN units u ON b.id = u.building_id
334            INNER JOIN unit_owners uo ON u.id = uo.unit_id
335            INNER JOIN owners o ON uo.owner_id = o.id
336            WHERE o.user_id = $1
337            ORDER BY e.expense_date DESC
338            "#,
339            user_id
340        )
341        .fetch_all(self.pool.as_ref())
342        .await
343        .map_err(|e| format!("Failed to fetch expenses: {}", e))?;
344
345        Ok(records
346            .into_iter()
347            .map(|r| ExpenseData {
348                description: r.description,
349                amount: r.amount,
350                due_date: r.due_date,
351                paid: r.paid.unwrap_or(false),
352                building_name: r.building_name,
353            })
354            .collect())
355    }
356
357    /// Fetch documents related to user
358    async fn fetch_documents(
359        &self,
360        user_id: Uuid,
361        _organization_id: Option<Uuid>,
362    ) -> Result<Vec<DocumentData>, String> {
363        let records = sqlx::query!(
364            r#"
365            SELECT DISTINCT d.title, d.document_type::text as document_type, d.created_at as uploaded_at, b.name as building_name
366            FROM documents d
367            LEFT JOIN buildings b ON d.building_id = b.id
368            WHERE d.uploaded_by = $1
369            ORDER BY d.created_at DESC
370            "#,
371            user_id
372        )
373        .fetch_all(self.pool.as_ref())
374        .await
375        .map_err(|e| format!("Failed to fetch documents: {}", e))?;
376
377        Ok(records
378            .into_iter()
379            .map(|r| DocumentData {
380                title: r.title,
381                document_type: r.document_type.unwrap_or_else(|| "unknown".to_string()),
382                uploaded_at: r.uploaded_at,
383                building_name: Some(r.building_name),
384            })
385            .collect())
386    }
387
388    /// Fetch meetings attended by user
389    async fn fetch_meetings(
390        &self,
391        _user_id: Uuid,
392        _organization_id: Option<Uuid>,
393    ) -> Result<Vec<MeetingData>, String> {
394        let records = sqlx::query!(
395            r#"
396            SELECT DISTINCT m.title, m.scheduled_date as meeting_date, m.agenda::text as agenda, b.name as building_name
397            FROM meetings m
398            INNER JOIN buildings b ON m.building_id = b.id
399            ORDER BY m.scheduled_date DESC
400            "#
401        )
402        .fetch_all(self.pool.as_ref())
403        .await
404        .map_err(|e| format!("Failed to fetch meetings: {}", e))?;
405
406        Ok(records
407            .into_iter()
408            .map(|r| MeetingData {
409                title: r.title,
410                meeting_date: r.meeting_date,
411                agenda: r.agenda,
412                building_name: r.building_name,
413            })
414            .collect())
415    }
416}