koprogo_api/infrastructure/database/repositories/
energy_bill_upload_repository_impl.rs

1use async_trait::async_trait;
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use crate::application::ports::EnergyBillUploadRepository;
6use crate::domain::entities::{EnergyBillUpload, EnergyType};
7
8pub struct PostgresEnergyBillUploadRepository {
9    pub pool: PgPool,
10}
11
12impl PostgresEnergyBillUploadRepository {
13    pub fn new(pool: PgPool) -> Self {
14        Self { pool }
15    }
16}
17
18#[async_trait]
19impl EnergyBillUploadRepository for PostgresEnergyBillUploadRepository {
20    async fn create(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
21        sqlx::query!(
22            r#"
23            INSERT INTO energy_bill_uploads (
24                id, campaign_id, unit_id, building_id, organization_id,
25                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
26                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
27                uploaded_by, uploaded_at, verified_at, verified_by,
28                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
29                anonymized, retention_until, deleted_at, created_at, updated_at
30            )
31            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28)
32            "#,
33            upload.id,
34            upload.campaign_id,
35            upload.unit_id,
36            upload.building_id,
37            upload.organization_id,
38            upload.bill_period_start,
39            upload.bill_period_end,
40            &upload.total_kwh_encrypted,
41            upload.energy_type.to_string(),
42            upload.provider.as_ref(),
43            upload.postal_code,
44            upload.file_hash,
45            upload.file_path_encrypted,
46            upload.ocr_confidence,
47            upload.manually_verified,
48            upload.uploaded_by,
49            upload.uploaded_at,
50            upload.verified_at,
51            upload.verified_by,
52            upload.consent_timestamp,
53            upload.consent_ip,
54            upload.consent_user_agent,
55            upload.consent_signature_hash,
56            upload.anonymized,
57            upload.retention_until,
58            upload.deleted_at,
59            upload.created_at,
60            upload.updated_at,
61        )
62        .execute(&self.pool)
63        .await
64        .map_err(|e| format!("Failed to create energy bill upload: {}", e))?;
65
66        Ok(upload.clone())
67    }
68
69    async fn find_by_id(&self, id: Uuid) -> Result<Option<EnergyBillUpload>, String> {
70        let row = sqlx::query!(
71            r#"
72            SELECT
73                id, campaign_id, unit_id, building_id, organization_id,
74                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
75                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
76                uploaded_by, uploaded_at, verified_at, verified_by,
77                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
78                anonymized, retention_until, deleted_at, created_at, updated_at
79            FROM energy_bill_uploads
80            WHERE id = $1
81            "#,
82            id
83        )
84        .fetch_optional(&self.pool)
85        .await
86        .map_err(|e| format!("Failed to find energy bill upload: {}", e))?;
87
88        Ok(row.map(|r| EnergyBillUpload {
89            id: r.id,
90            campaign_id: r.campaign_id,
91            unit_id: r.unit_id,
92            building_id: r.building_id,
93            organization_id: r.organization_id,
94            bill_period_start: r.bill_period_start,
95            bill_period_end: r.bill_period_end,
96            total_kwh_encrypted: r.total_kwh_encrypted,
97            energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
98            provider: r.provider,
99            postal_code: r.postal_code,
100            file_hash: r.file_hash,
101            file_path_encrypted: r.file_path_encrypted,
102            ocr_confidence: r.ocr_confidence,
103            manually_verified: r.manually_verified,
104            uploaded_by: r.uploaded_by,
105            uploaded_at: r.uploaded_at,
106            verified_at: r.verified_at,
107            verified_by: r.verified_by,
108            consent_timestamp: r.consent_timestamp,
109            consent_ip: r.consent_ip,
110            consent_user_agent: r.consent_user_agent,
111            consent_signature_hash: r.consent_signature_hash,
112            anonymized: r.anonymized,
113            retention_until: r.retention_until,
114            deleted_at: r.deleted_at,
115            created_at: r.created_at,
116            updated_at: r.updated_at,
117        }))
118    }
119
120    async fn find_by_campaign(&self, campaign_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
121        let rows = sqlx::query!(
122            r#"
123            SELECT
124                id, campaign_id, unit_id, building_id, organization_id,
125                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
126                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
127                uploaded_by, uploaded_at, verified_at, verified_by,
128                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
129                anonymized, retention_until, deleted_at, created_at, updated_at
130            FROM energy_bill_uploads
131            WHERE campaign_id = $1
132            ORDER BY uploaded_at DESC
133            "#,
134            campaign_id
135        )
136        .fetch_all(&self.pool)
137        .await
138        .map_err(|e| format!("Failed to find uploads by campaign: {}", e))?;
139
140        let uploads = rows
141            .into_iter()
142            .map(|r| EnergyBillUpload {
143                id: r.id,
144                campaign_id: r.campaign_id,
145                unit_id: r.unit_id,
146                building_id: r.building_id,
147                organization_id: r.organization_id,
148                bill_period_start: r.bill_period_start,
149                bill_period_end: r.bill_period_end,
150                total_kwh_encrypted: r.total_kwh_encrypted,
151                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
152                provider: r.provider,
153                postal_code: r.postal_code,
154                file_hash: r.file_hash,
155                file_path_encrypted: r.file_path_encrypted,
156                ocr_confidence: r.ocr_confidence,
157                manually_verified: r.manually_verified,
158                uploaded_by: r.uploaded_by,
159                uploaded_at: r.uploaded_at,
160                verified_at: r.verified_at,
161                verified_by: r.verified_by,
162                consent_timestamp: r.consent_timestamp,
163                consent_ip: r.consent_ip,
164                consent_user_agent: r.consent_user_agent,
165                consent_signature_hash: r.consent_signature_hash,
166                anonymized: r.anonymized,
167                retention_until: r.retention_until,
168                deleted_at: r.deleted_at,
169                created_at: r.created_at,
170                updated_at: r.updated_at,
171            })
172            .collect();
173
174        Ok(uploads)
175    }
176
177    async fn find_by_unit(&self, unit_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
178        let rows = sqlx::query!(
179            r#"
180            SELECT
181                id, campaign_id, unit_id, building_id, organization_id,
182                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
183                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
184                uploaded_by, uploaded_at, verified_at, verified_by,
185                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
186                anonymized, retention_until, deleted_at, created_at, updated_at
187            FROM energy_bill_uploads
188            WHERE unit_id = $1
189            ORDER BY uploaded_at DESC
190            "#,
191            unit_id
192        )
193        .fetch_all(&self.pool)
194        .await
195        .map_err(|e| format!("Failed to find uploads by unit: {}", e))?;
196
197        let uploads = rows
198            .into_iter()
199            .map(|r| EnergyBillUpload {
200                id: r.id,
201                campaign_id: r.campaign_id,
202                unit_id: r.unit_id,
203                building_id: r.building_id,
204                organization_id: r.organization_id,
205                bill_period_start: r.bill_period_start,
206                bill_period_end: r.bill_period_end,
207                total_kwh_encrypted: r.total_kwh_encrypted,
208                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
209                provider: r.provider,
210                postal_code: r.postal_code,
211                file_hash: r.file_hash,
212                file_path_encrypted: r.file_path_encrypted,
213                ocr_confidence: r.ocr_confidence,
214                manually_verified: r.manually_verified,
215                uploaded_by: r.uploaded_by,
216                uploaded_at: r.uploaded_at,
217                verified_at: r.verified_at,
218                verified_by: r.verified_by,
219                consent_timestamp: r.consent_timestamp,
220                consent_ip: r.consent_ip,
221                consent_user_agent: r.consent_user_agent,
222                consent_signature_hash: r.consent_signature_hash,
223                anonymized: r.anonymized,
224                retention_until: r.retention_until,
225                deleted_at: r.deleted_at,
226                created_at: r.created_at,
227                updated_at: r.updated_at,
228            })
229            .collect();
230
231        Ok(uploads)
232    }
233
234    async fn find_by_campaign_and_unit(
235        &self,
236        campaign_id: Uuid,
237        unit_id: Uuid,
238    ) -> Result<Option<EnergyBillUpload>, String> {
239        let row = sqlx::query!(
240            r#"
241            SELECT
242                id, campaign_id, unit_id, building_id, organization_id,
243                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
244                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
245                uploaded_by, uploaded_at, verified_at, verified_by,
246                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
247                anonymized, retention_until, deleted_at, created_at, updated_at
248            FROM energy_bill_uploads
249            WHERE campaign_id = $1 AND unit_id = $2
250            "#,
251            campaign_id,
252            unit_id
253        )
254        .fetch_optional(&self.pool)
255        .await
256        .map_err(|e| format!("Failed to find upload by campaign and unit: {}", e))?;
257
258        Ok(row.map(|r| EnergyBillUpload {
259            id: r.id,
260            campaign_id: r.campaign_id,
261            unit_id: r.unit_id,
262            building_id: r.building_id,
263            organization_id: r.organization_id,
264            bill_period_start: r.bill_period_start,
265            bill_period_end: r.bill_period_end,
266            total_kwh_encrypted: r.total_kwh_encrypted,
267            energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
268            provider: r.provider,
269            postal_code: r.postal_code,
270            file_hash: r.file_hash,
271            file_path_encrypted: r.file_path_encrypted,
272            ocr_confidence: r.ocr_confidence,
273            manually_verified: r.manually_verified,
274            uploaded_by: r.uploaded_by,
275            uploaded_at: r.uploaded_at,
276            verified_at: r.verified_at,
277            verified_by: r.verified_by,
278            consent_timestamp: r.consent_timestamp,
279            consent_ip: r.consent_ip,
280            consent_user_agent: r.consent_user_agent,
281            consent_signature_hash: r.consent_signature_hash,
282            anonymized: r.anonymized,
283            retention_until: r.retention_until,
284            deleted_at: r.deleted_at,
285            created_at: r.created_at,
286            updated_at: r.updated_at,
287        }))
288    }
289
290    async fn find_by_building(&self, building_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
291        let rows = sqlx::query!(
292            r#"
293            SELECT
294                id, campaign_id, unit_id, building_id, organization_id,
295                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
296                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
297                uploaded_by, uploaded_at, verified_at, verified_by,
298                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
299                anonymized, retention_until, deleted_at, created_at, updated_at
300            FROM energy_bill_uploads
301            WHERE building_id = $1
302            ORDER BY uploaded_at DESC
303            "#,
304            building_id
305        )
306        .fetch_all(&self.pool)
307        .await
308        .map_err(|e| format!("Failed to find uploads by building: {}", e))?;
309
310        let uploads = rows
311            .into_iter()
312            .map(|r| EnergyBillUpload {
313                id: r.id,
314                campaign_id: r.campaign_id,
315                unit_id: r.unit_id,
316                building_id: r.building_id,
317                organization_id: r.organization_id,
318                bill_period_start: r.bill_period_start,
319                bill_period_end: r.bill_period_end,
320                total_kwh_encrypted: r.total_kwh_encrypted,
321                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
322                provider: r.provider,
323                postal_code: r.postal_code,
324                file_hash: r.file_hash,
325                file_path_encrypted: r.file_path_encrypted,
326                ocr_confidence: r.ocr_confidence,
327                manually_verified: r.manually_verified,
328                uploaded_by: r.uploaded_by,
329                uploaded_at: r.uploaded_at,
330                verified_at: r.verified_at,
331                verified_by: r.verified_by,
332                consent_timestamp: r.consent_timestamp,
333                consent_ip: r.consent_ip,
334                consent_user_agent: r.consent_user_agent,
335                consent_signature_hash: r.consent_signature_hash,
336                anonymized: r.anonymized,
337                retention_until: r.retention_until,
338                deleted_at: r.deleted_at,
339                created_at: r.created_at,
340                updated_at: r.updated_at,
341            })
342            .collect();
343
344        Ok(uploads)
345    }
346
347    async fn update(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
348        sqlx::query!(
349            r#"
350            UPDATE energy_bill_uploads
351            SET
352                bill_period_start = $2,
353                bill_period_end = $3,
354                total_kwh_encrypted = $4,
355                energy_type = $5,
356                provider = $6,
357                postal_code = $7,
358                file_hash = $8,
359                file_path_encrypted = $9,
360                ocr_confidence = $10,
361                manually_verified = $11,
362                verified_at = $12,
363                verified_by = $13,
364                consent_timestamp = $14,
365                consent_ip = $15,
366                consent_user_agent = $16,
367                consent_signature_hash = $17,
368                anonymized = $18,
369                retention_until = $19,
370                deleted_at = $20,
371                updated_at = $21
372            WHERE id = $1
373            "#,
374            upload.id,
375            upload.bill_period_start,
376            upload.bill_period_end,
377            &upload.total_kwh_encrypted,
378            upload.energy_type.to_string(),
379            upload.provider.as_ref(),
380            upload.postal_code,
381            upload.file_hash,
382            upload.file_path_encrypted,
383            upload.ocr_confidence,
384            upload.manually_verified,
385            upload.verified_at,
386            upload.verified_by,
387            upload.consent_timestamp,
388            upload.consent_ip,
389            upload.consent_user_agent,
390            upload.consent_signature_hash,
391            upload.anonymized,
392            upload.retention_until,
393            upload.deleted_at,
394            upload.updated_at,
395        )
396        .execute(&self.pool)
397        .await
398        .map_err(|e| format!("Failed to update energy bill upload: {}", e))?;
399
400        Ok(upload.clone())
401    }
402
403    async fn delete(&self, id: Uuid) -> Result<(), String> {
404        // Soft delete
405        sqlx::query!(
406            r#"
407            UPDATE energy_bill_uploads
408            SET deleted_at = NOW(), updated_at = NOW()
409            WHERE id = $1
410            "#,
411            id
412        )
413        .execute(&self.pool)
414        .await
415        .map_err(|e| format!("Failed to delete energy bill upload: {}", e))?;
416
417        Ok(())
418    }
419
420    async fn find_expired(&self) -> Result<Vec<EnergyBillUpload>, String> {
421        let rows = sqlx::query!(
422            r#"
423            SELECT
424                id, campaign_id, unit_id, building_id, organization_id,
425                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
426                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
427                uploaded_by, uploaded_at, verified_at, verified_by,
428                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
429                anonymized, retention_until, deleted_at, created_at, updated_at
430            FROM energy_bill_uploads
431            WHERE retention_until < NOW() AND deleted_at IS NULL
432            "#
433        )
434        .fetch_all(&self.pool)
435        .await
436        .map_err(|e| format!("Failed to find expired uploads: {}", e))?;
437
438        let uploads = rows
439            .into_iter()
440            .map(|r| EnergyBillUpload {
441                id: r.id,
442                campaign_id: r.campaign_id,
443                unit_id: r.unit_id,
444                building_id: r.building_id,
445                organization_id: r.organization_id,
446                bill_period_start: r.bill_period_start,
447                bill_period_end: r.bill_period_end,
448                total_kwh_encrypted: r.total_kwh_encrypted,
449                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
450                provider: r.provider,
451                postal_code: r.postal_code,
452                file_hash: r.file_hash,
453                file_path_encrypted: r.file_path_encrypted,
454                ocr_confidence: r.ocr_confidence,
455                manually_verified: r.manually_verified,
456                uploaded_by: r.uploaded_by,
457                uploaded_at: r.uploaded_at,
458                verified_at: r.verified_at,
459                verified_by: r.verified_by,
460                consent_timestamp: r.consent_timestamp,
461                consent_ip: r.consent_ip,
462                consent_user_agent: r.consent_user_agent,
463                consent_signature_hash: r.consent_signature_hash,
464                anonymized: r.anonymized,
465                retention_until: r.retention_until,
466                deleted_at: r.deleted_at,
467                created_at: r.created_at,
468                updated_at: r.updated_at,
469            })
470            .collect();
471
472        Ok(uploads)
473    }
474
475    async fn count_verified_by_campaign(&self, campaign_id: Uuid) -> Result<i32, String> {
476        let result = sqlx::query!(
477            r#"
478            SELECT COUNT(*) as count
479            FROM energy_bill_uploads
480            WHERE campaign_id = $1
481            AND manually_verified = TRUE
482            AND deleted_at IS NULL
483            "#,
484            campaign_id
485        )
486        .fetch_one(&self.pool)
487        .await
488        .map_err(|e| format!("Failed to count verified uploads: {}", e))?;
489
490        Ok(result.count.unwrap_or(0) as i32)
491    }
492
493    async fn find_verified_by_campaign(
494        &self,
495        campaign_id: Uuid,
496    ) -> Result<Vec<EnergyBillUpload>, String> {
497        let rows = sqlx::query!(
498            r#"
499            SELECT
500                id, campaign_id, unit_id, building_id, organization_id,
501                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
502                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
503                uploaded_by, uploaded_at, verified_at, verified_by,
504                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
505                anonymized, retention_until, deleted_at, created_at, updated_at
506            FROM energy_bill_uploads
507            WHERE campaign_id = $1
508            AND manually_verified = TRUE
509            AND deleted_at IS NULL
510            ORDER BY uploaded_at DESC
511            "#,
512            campaign_id
513        )
514        .fetch_all(&self.pool)
515        .await
516        .map_err(|e| format!("Failed to find verified uploads: {}", e))?;
517
518        let uploads = rows
519            .into_iter()
520            .map(|r| EnergyBillUpload {
521                id: r.id,
522                campaign_id: r.campaign_id,
523                unit_id: r.unit_id,
524                building_id: r.building_id,
525                organization_id: r.organization_id,
526                bill_period_start: r.bill_period_start,
527                bill_period_end: r.bill_period_end,
528                total_kwh_encrypted: r.total_kwh_encrypted,
529                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
530                provider: r.provider,
531                postal_code: r.postal_code,
532                file_hash: r.file_hash,
533                file_path_encrypted: r.file_path_encrypted,
534                ocr_confidence: r.ocr_confidence,
535                manually_verified: r.manually_verified,
536                uploaded_by: r.uploaded_by,
537                uploaded_at: r.uploaded_at,
538                verified_at: r.verified_at,
539                verified_by: r.verified_by,
540                consent_timestamp: r.consent_timestamp,
541                consent_ip: r.consent_ip,
542                consent_user_agent: r.consent_user_agent,
543                consent_signature_hash: r.consent_signature_hash,
544                anonymized: r.anonymized,
545                retention_until: r.retention_until,
546                deleted_at: r.deleted_at,
547                created_at: r.created_at,
548                updated_at: r.updated_at,
549            })
550            .collect();
551
552        Ok(uploads)
553    }
554
555    async fn delete_expired(&self) -> Result<i32, String> {
556        let result = sqlx::query!(
557            r#"
558            UPDATE energy_bill_uploads
559            SET deleted_at = NOW(), updated_at = NOW()
560            WHERE retention_until < NOW()
561            AND deleted_at IS NULL
562            "#
563        )
564        .execute(&self.pool)
565        .await
566        .map_err(|e| format!("Failed to delete expired uploads: {}", e))?;
567
568        Ok(result.rows_affected() as i32)
569    }
570}