koprogo_api/infrastructure/database/repositories/
energy_bill_upload_repository_impl.rs

1use async_trait::async_trait;
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use crate::application::ports::EnergyBillUploadRepository;
6use crate::domain::entities::{EnergyBillUpload, EnergyType};
7
8pub struct PostgresEnergyBillUploadRepository {
9    pub pool: PgPool,
10}
11
12impl PostgresEnergyBillUploadRepository {
13    pub fn new(pool: PgPool) -> Self {
14        Self { pool }
15    }
16}
17
18#[async_trait]
19impl EnergyBillUploadRepository for PostgresEnergyBillUploadRepository {
20    async fn create(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
21        sqlx::query!(
22            r#"
23            INSERT INTO energy_bill_uploads (
24                id, campaign_id, unit_id, building_id, organization_id,
25                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
26                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
27                uploaded_by, uploaded_at, verified_at, verified_by,
28                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
29                anonymized, retention_until, deleted_at, created_at, updated_at
30            )
31            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28)
32            "#,
33            upload.id,
34            upload.campaign_id,
35            upload.unit_id,
36            upload.building_id,
37            upload.organization_id,
38            upload.bill_period_start,
39            upload.bill_period_end,
40            &upload.total_kwh_encrypted,
41            upload.energy_type.to_string(),
42            upload.provider.as_ref(),
43            upload.postal_code,
44            upload.file_hash,
45            upload.file_path_encrypted,
46            upload.ocr_confidence,
47            upload.manually_verified,
48            upload.uploaded_by,
49            upload.uploaded_at,
50            upload.verified_at,
51            upload.verified_by,
52            upload.consent_timestamp,
53            upload.consent_ip,
54            upload.consent_user_agent,
55            upload.consent_signature_hash,
56            upload.anonymized,
57            upload.retention_until,
58            upload.deleted_at,
59            upload.created_at,
60            upload.updated_at,
61        )
62        .execute(&self.pool)
63        .await
64        .map_err(|e| format!("Failed to create energy bill upload: {}", e))?;
65
66        Ok(upload.clone())
67    }
68
69    async fn find_by_id(&self, id: Uuid) -> Result<Option<EnergyBillUpload>, String> {
70        let row = sqlx::query!(
71            r#"
72            SELECT
73                id, campaign_id, unit_id, building_id, organization_id,
74                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
75                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
76                uploaded_by, uploaded_at, verified_at, verified_by,
77                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
78                anonymized, retention_until, deleted_at, created_at, updated_at
79            FROM energy_bill_uploads
80            WHERE id = $1
81            "#,
82            id
83        )
84        .fetch_optional(&self.pool)
85        .await
86        .map_err(|e| format!("Failed to find energy bill upload: {}", e))?;
87
88        Ok(row.map(|r| EnergyBillUpload {
89            id: r.id,
90            campaign_id: r.campaign_id,
91            unit_id: r.unit_id,
92            building_id: r.building_id,
93            organization_id: r.organization_id,
94            bill_period_start: r.bill_period_start,
95            bill_period_end: r.bill_period_end,
96            total_kwh_encrypted: r.total_kwh_encrypted,
97            energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
98            provider: r.provider,
99            postal_code: r.postal_code,
100            file_hash: r.file_hash,
101            file_path_encrypted: r.file_path_encrypted,
102            ocr_confidence: r.ocr_confidence,
103            manually_verified: r.manually_verified,
104            uploaded_by: r.uploaded_by,
105            uploaded_at: r.uploaded_at,
106            verified_at: r.verified_at,
107            verified_by: r.verified_by,
108            consent_timestamp: r.consent_timestamp,
109            consent_ip: r.consent_ip,
110            consent_user_agent: r.consent_user_agent,
111            consent_signature_hash: r.consent_signature_hash,
112            anonymized: r.anonymized,
113            retention_until: r.retention_until,
114            deleted_at: r.deleted_at,
115            created_at: r.created_at,
116            updated_at: r.updated_at,
117        }))
118    }
119
120    async fn find_by_campaign(&self, campaign_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
121        let rows = sqlx::query!(
122            r#"
123            SELECT
124                id, campaign_id, unit_id, building_id, organization_id,
125                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
126                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
127                uploaded_by, uploaded_at, verified_at, verified_by,
128                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
129                anonymized, retention_until, deleted_at, created_at, updated_at
130            FROM energy_bill_uploads
131            WHERE campaign_id = $1
132            ORDER BY uploaded_at DESC
133            "#,
134            campaign_id
135        )
136        .fetch_all(&self.pool)
137        .await
138        .map_err(|e| format!("Failed to find uploads by campaign: {}", e))?;
139
140        let uploads = rows
141            .into_iter()
142            .map(|r| EnergyBillUpload {
143                id: r.id,
144                campaign_id: r.campaign_id,
145                unit_id: r.unit_id,
146                building_id: r.building_id,
147                organization_id: r.organization_id,
148                bill_period_start: r.bill_period_start,
149                bill_period_end: r.bill_period_end,
150                total_kwh_encrypted: r.total_kwh_encrypted,
151                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
152                provider: r.provider,
153                postal_code: r.postal_code,
154                file_hash: r.file_hash,
155                file_path_encrypted: r.file_path_encrypted,
156                ocr_confidence: r.ocr_confidence,
157                manually_verified: r.manually_verified,
158                uploaded_by: r.uploaded_by,
159                uploaded_at: r.uploaded_at,
160                verified_at: r.verified_at,
161                verified_by: r.verified_by,
162                consent_timestamp: r.consent_timestamp,
163                consent_ip: r.consent_ip,
164                consent_user_agent: r.consent_user_agent,
165                consent_signature_hash: r.consent_signature_hash,
166                anonymized: r.anonymized,
167                retention_until: r.retention_until,
168                deleted_at: r.deleted_at,
169                created_at: r.created_at,
170                updated_at: r.updated_at,
171            })
172            .collect();
173
174        Ok(uploads)
175    }
176
177    async fn find_by_unit(&self, unit_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
178        let rows = sqlx::query!(
179            r#"
180            SELECT
181                id, campaign_id, unit_id, building_id, organization_id,
182                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
183                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
184                uploaded_by, uploaded_at, verified_at, verified_by,
185                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
186                anonymized, retention_until, deleted_at, created_at, updated_at
187            FROM energy_bill_uploads
188            WHERE unit_id = $1
189            ORDER BY uploaded_at DESC
190            "#,
191            unit_id
192        )
193        .fetch_all(&self.pool)
194        .await
195        .map_err(|e| format!("Failed to find uploads by unit: {}", e))?;
196
197        let uploads = rows
198            .into_iter()
199            .map(|r| EnergyBillUpload {
200                id: r.id,
201                campaign_id: r.campaign_id,
202                unit_id: r.unit_id,
203                building_id: r.building_id,
204                organization_id: r.organization_id,
205                bill_period_start: r.bill_period_start,
206                bill_period_end: r.bill_period_end,
207                total_kwh_encrypted: r.total_kwh_encrypted,
208                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
209                provider: r.provider,
210                postal_code: r.postal_code,
211                file_hash: r.file_hash,
212                file_path_encrypted: r.file_path_encrypted,
213                ocr_confidence: r.ocr_confidence,
214                manually_verified: r.manually_verified,
215                uploaded_by: r.uploaded_by,
216                uploaded_at: r.uploaded_at,
217                verified_at: r.verified_at,
218                verified_by: r.verified_by,
219                consent_timestamp: r.consent_timestamp,
220                consent_ip: r.consent_ip,
221                consent_user_agent: r.consent_user_agent,
222                consent_signature_hash: r.consent_signature_hash,
223                anonymized: r.anonymized,
224                retention_until: r.retention_until,
225                deleted_at: r.deleted_at,
226                created_at: r.created_at,
227                updated_at: r.updated_at,
228            })
229            .collect();
230
231        Ok(uploads)
232    }
233
234    async fn find_by_campaign_and_unit(
235        &self,
236        campaign_id: Uuid,
237        unit_id: Uuid,
238    ) -> Result<Option<EnergyBillUpload>, String> {
239        let row = sqlx::query!(
240            r#"
241            SELECT
242                id, campaign_id, unit_id, building_id, organization_id,
243                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
244                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
245                uploaded_by, uploaded_at, verified_at, verified_by,
246                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
247                anonymized, retention_until, deleted_at, created_at, updated_at
248            FROM energy_bill_uploads
249            WHERE campaign_id = $1 AND unit_id = $2
250            "#,
251            campaign_id,
252            unit_id
253        )
254        .fetch_optional(&self.pool)
255        .await
256        .map_err(|e| format!("Failed to find upload by campaign and unit: {}", e))?;
257
258        Ok(row.map(|r| EnergyBillUpload {
259            id: r.id,
260            campaign_id: r.campaign_id,
261            unit_id: r.unit_id,
262            building_id: r.building_id,
263            organization_id: r.organization_id,
264            bill_period_start: r.bill_period_start,
265            bill_period_end: r.bill_period_end,
266            total_kwh_encrypted: r.total_kwh_encrypted,
267            energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
268            provider: r.provider,
269            postal_code: r.postal_code,
270            file_hash: r.file_hash,
271            file_path_encrypted: r.file_path_encrypted,
272            ocr_confidence: r.ocr_confidence,
273            manually_verified: r.manually_verified,
274            uploaded_by: r.uploaded_by,
275            uploaded_at: r.uploaded_at,
276            verified_at: r.verified_at,
277            verified_by: r.verified_by,
278            consent_timestamp: r.consent_timestamp,
279            consent_ip: r.consent_ip,
280            consent_user_agent: r.consent_user_agent,
281            consent_signature_hash: r.consent_signature_hash,
282            anonymized: r.anonymized,
283            retention_until: r.retention_until,
284            deleted_at: r.deleted_at,
285            created_at: r.created_at,
286            updated_at: r.updated_at,
287        }))
288    }
289
290    async fn find_by_uploaded_by(
291        &self,
292        uploaded_by: Uuid,
293    ) -> Result<Vec<EnergyBillUpload>, String> {
294        let rows = sqlx::query!(
295            r#"
296            SELECT
297                id, campaign_id, unit_id, building_id, organization_id,
298                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
299                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
300                uploaded_by, uploaded_at, verified_at, verified_by,
301                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
302                anonymized, retention_until, deleted_at, created_at, updated_at
303            FROM energy_bill_uploads
304            WHERE uploaded_by = $1
305            ORDER BY uploaded_at DESC
306            "#,
307            uploaded_by
308        )
309        .fetch_all(&self.pool)
310        .await
311        .map_err(|e| format!("Failed to find uploads by user: {}", e))?;
312
313        let uploads = rows
314            .into_iter()
315            .map(|r| EnergyBillUpload {
316                id: r.id,
317                campaign_id: r.campaign_id,
318                unit_id: r.unit_id,
319                building_id: r.building_id,
320                organization_id: r.organization_id,
321                bill_period_start: r.bill_period_start,
322                bill_period_end: r.bill_period_end,
323                total_kwh_encrypted: r.total_kwh_encrypted,
324                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
325                provider: r.provider,
326                postal_code: r.postal_code,
327                file_hash: r.file_hash,
328                file_path_encrypted: r.file_path_encrypted,
329                ocr_confidence: r.ocr_confidence,
330                manually_verified: r.manually_verified,
331                uploaded_by: r.uploaded_by,
332                uploaded_at: r.uploaded_at,
333                verified_at: r.verified_at,
334                verified_by: r.verified_by,
335                consent_timestamp: r.consent_timestamp,
336                consent_ip: r.consent_ip,
337                consent_user_agent: r.consent_user_agent,
338                consent_signature_hash: r.consent_signature_hash,
339                anonymized: r.anonymized,
340                retention_until: r.retention_until,
341                deleted_at: r.deleted_at,
342                created_at: r.created_at,
343                updated_at: r.updated_at,
344            })
345            .collect();
346
347        Ok(uploads)
348    }
349
350    async fn find_by_building(&self, building_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
351        let rows = sqlx::query!(
352            r#"
353            SELECT
354                id, campaign_id, unit_id, building_id, organization_id,
355                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
356                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
357                uploaded_by, uploaded_at, verified_at, verified_by,
358                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
359                anonymized, retention_until, deleted_at, created_at, updated_at
360            FROM energy_bill_uploads
361            WHERE building_id = $1
362            ORDER BY uploaded_at DESC
363            "#,
364            building_id
365        )
366        .fetch_all(&self.pool)
367        .await
368        .map_err(|e| format!("Failed to find uploads by building: {}", e))?;
369
370        let uploads = rows
371            .into_iter()
372            .map(|r| EnergyBillUpload {
373                id: r.id,
374                campaign_id: r.campaign_id,
375                unit_id: r.unit_id,
376                building_id: r.building_id,
377                organization_id: r.organization_id,
378                bill_period_start: r.bill_period_start,
379                bill_period_end: r.bill_period_end,
380                total_kwh_encrypted: r.total_kwh_encrypted,
381                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
382                provider: r.provider,
383                postal_code: r.postal_code,
384                file_hash: r.file_hash,
385                file_path_encrypted: r.file_path_encrypted,
386                ocr_confidence: r.ocr_confidence,
387                manually_verified: r.manually_verified,
388                uploaded_by: r.uploaded_by,
389                uploaded_at: r.uploaded_at,
390                verified_at: r.verified_at,
391                verified_by: r.verified_by,
392                consent_timestamp: r.consent_timestamp,
393                consent_ip: r.consent_ip,
394                consent_user_agent: r.consent_user_agent,
395                consent_signature_hash: r.consent_signature_hash,
396                anonymized: r.anonymized,
397                retention_until: r.retention_until,
398                deleted_at: r.deleted_at,
399                created_at: r.created_at,
400                updated_at: r.updated_at,
401            })
402            .collect();
403
404        Ok(uploads)
405    }
406
407    async fn update(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
408        sqlx::query!(
409            r#"
410            UPDATE energy_bill_uploads
411            SET
412                bill_period_start = $2,
413                bill_period_end = $3,
414                total_kwh_encrypted = $4,
415                energy_type = $5,
416                provider = $6,
417                postal_code = $7,
418                file_hash = $8,
419                file_path_encrypted = $9,
420                ocr_confidence = $10,
421                manually_verified = $11,
422                verified_at = $12,
423                verified_by = $13,
424                consent_timestamp = $14,
425                consent_ip = $15,
426                consent_user_agent = $16,
427                consent_signature_hash = $17,
428                anonymized = $18,
429                retention_until = $19,
430                deleted_at = $20,
431                updated_at = $21
432            WHERE id = $1
433            "#,
434            upload.id,
435            upload.bill_period_start,
436            upload.bill_period_end,
437            &upload.total_kwh_encrypted,
438            upload.energy_type.to_string(),
439            upload.provider.as_ref(),
440            upload.postal_code,
441            upload.file_hash,
442            upload.file_path_encrypted,
443            upload.ocr_confidence,
444            upload.manually_verified,
445            upload.verified_at,
446            upload.verified_by,
447            upload.consent_timestamp,
448            upload.consent_ip,
449            upload.consent_user_agent,
450            upload.consent_signature_hash,
451            upload.anonymized,
452            upload.retention_until,
453            upload.deleted_at,
454            upload.updated_at,
455        )
456        .execute(&self.pool)
457        .await
458        .map_err(|e| format!("Failed to update energy bill upload: {}", e))?;
459
460        Ok(upload.clone())
461    }
462
463    async fn delete(&self, id: Uuid) -> Result<(), String> {
464        // Soft delete
465        sqlx::query!(
466            r#"
467            UPDATE energy_bill_uploads
468            SET deleted_at = NOW(), updated_at = NOW()
469            WHERE id = $1
470            "#,
471            id
472        )
473        .execute(&self.pool)
474        .await
475        .map_err(|e| format!("Failed to delete energy bill upload: {}", e))?;
476
477        Ok(())
478    }
479
480    async fn find_expired(&self) -> Result<Vec<EnergyBillUpload>, String> {
481        let rows = sqlx::query!(
482            r#"
483            SELECT
484                id, campaign_id, unit_id, building_id, organization_id,
485                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
486                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
487                uploaded_by, uploaded_at, verified_at, verified_by,
488                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
489                anonymized, retention_until, deleted_at, created_at, updated_at
490            FROM energy_bill_uploads
491            WHERE retention_until < NOW() AND deleted_at IS NULL
492            "#
493        )
494        .fetch_all(&self.pool)
495        .await
496        .map_err(|e| format!("Failed to find expired uploads: {}", e))?;
497
498        let uploads = rows
499            .into_iter()
500            .map(|r| EnergyBillUpload {
501                id: r.id,
502                campaign_id: r.campaign_id,
503                unit_id: r.unit_id,
504                building_id: r.building_id,
505                organization_id: r.organization_id,
506                bill_period_start: r.bill_period_start,
507                bill_period_end: r.bill_period_end,
508                total_kwh_encrypted: r.total_kwh_encrypted,
509                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
510                provider: r.provider,
511                postal_code: r.postal_code,
512                file_hash: r.file_hash,
513                file_path_encrypted: r.file_path_encrypted,
514                ocr_confidence: r.ocr_confidence,
515                manually_verified: r.manually_verified,
516                uploaded_by: r.uploaded_by,
517                uploaded_at: r.uploaded_at,
518                verified_at: r.verified_at,
519                verified_by: r.verified_by,
520                consent_timestamp: r.consent_timestamp,
521                consent_ip: r.consent_ip,
522                consent_user_agent: r.consent_user_agent,
523                consent_signature_hash: r.consent_signature_hash,
524                anonymized: r.anonymized,
525                retention_until: r.retention_until,
526                deleted_at: r.deleted_at,
527                created_at: r.created_at,
528                updated_at: r.updated_at,
529            })
530            .collect();
531
532        Ok(uploads)
533    }
534
535    async fn count_verified_by_campaign(&self, campaign_id: Uuid) -> Result<i32, String> {
536        let result = sqlx::query!(
537            r#"
538            SELECT COUNT(*) as count
539            FROM energy_bill_uploads
540            WHERE campaign_id = $1
541            AND manually_verified = TRUE
542            AND deleted_at IS NULL
543            "#,
544            campaign_id
545        )
546        .fetch_one(&self.pool)
547        .await
548        .map_err(|e| format!("Failed to count verified uploads: {}", e))?;
549
550        Ok(result.count.unwrap_or(0) as i32)
551    }
552
553    async fn find_verified_by_campaign(
554        &self,
555        campaign_id: Uuid,
556    ) -> Result<Vec<EnergyBillUpload>, String> {
557        let rows = sqlx::query!(
558            r#"
559            SELECT
560                id, campaign_id, unit_id, building_id, organization_id,
561                bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
562                file_hash, file_path_encrypted, ocr_confidence, manually_verified,
563                uploaded_by, uploaded_at, verified_at, verified_by,
564                consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
565                anonymized, retention_until, deleted_at, created_at, updated_at
566            FROM energy_bill_uploads
567            WHERE campaign_id = $1
568            AND manually_verified = TRUE
569            AND deleted_at IS NULL
570            ORDER BY uploaded_at DESC
571            "#,
572            campaign_id
573        )
574        .fetch_all(&self.pool)
575        .await
576        .map_err(|e| format!("Failed to find verified uploads: {}", e))?;
577
578        let uploads = rows
579            .into_iter()
580            .map(|r| EnergyBillUpload {
581                id: r.id,
582                campaign_id: r.campaign_id,
583                unit_id: r.unit_id,
584                building_id: r.building_id,
585                organization_id: r.organization_id,
586                bill_period_start: r.bill_period_start,
587                bill_period_end: r.bill_period_end,
588                total_kwh_encrypted: r.total_kwh_encrypted,
589                energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
590                provider: r.provider,
591                postal_code: r.postal_code,
592                file_hash: r.file_hash,
593                file_path_encrypted: r.file_path_encrypted,
594                ocr_confidence: r.ocr_confidence,
595                manually_verified: r.manually_verified,
596                uploaded_by: r.uploaded_by,
597                uploaded_at: r.uploaded_at,
598                verified_at: r.verified_at,
599                verified_by: r.verified_by,
600                consent_timestamp: r.consent_timestamp,
601                consent_ip: r.consent_ip,
602                consent_user_agent: r.consent_user_agent,
603                consent_signature_hash: r.consent_signature_hash,
604                anonymized: r.anonymized,
605                retention_until: r.retention_until,
606                deleted_at: r.deleted_at,
607                created_at: r.created_at,
608                updated_at: r.updated_at,
609            })
610            .collect();
611
612        Ok(uploads)
613    }
614
615    async fn delete_expired(&self) -> Result<i32, String> {
616        let result = sqlx::query!(
617            r#"
618            UPDATE energy_bill_uploads
619            SET deleted_at = NOW(), updated_at = NOW()
620            WHERE retention_until < NOW()
621            AND deleted_at IS NULL
622            "#
623        )
624        .execute(&self.pool)
625        .await
626        .map_err(|e| format!("Failed to delete expired uploads: {}", e))?;
627
628        Ok(result.rows_affected() as i32)
629    }
630}