1use async_trait::async_trait;
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use crate::application::ports::EnergyBillUploadRepository;
6use crate::domain::entities::{EnergyBillUpload, EnergyType};
7
8pub struct PostgresEnergyBillUploadRepository {
9 pub pool: PgPool,
10}
11
12impl PostgresEnergyBillUploadRepository {
13 pub fn new(pool: PgPool) -> Self {
14 Self { pool }
15 }
16}
17
18#[async_trait]
19impl EnergyBillUploadRepository for PostgresEnergyBillUploadRepository {
20 async fn create(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
21 sqlx::query!(
22 r#"
23 INSERT INTO energy_bill_uploads (
24 id, campaign_id, unit_id, building_id, organization_id,
25 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
26 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
27 uploaded_by, uploaded_at, verified_at, verified_by,
28 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
29 anonymized, retention_until, deleted_at, created_at, updated_at
30 )
31 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28)
32 "#,
33 upload.id,
34 upload.campaign_id,
35 upload.unit_id,
36 upload.building_id,
37 upload.organization_id,
38 upload.bill_period_start,
39 upload.bill_period_end,
40 &upload.total_kwh_encrypted,
41 upload.energy_type.to_string(),
42 upload.provider.as_ref(),
43 upload.postal_code,
44 upload.file_hash,
45 upload.file_path_encrypted,
46 upload.ocr_confidence,
47 upload.manually_verified,
48 upload.uploaded_by,
49 upload.uploaded_at,
50 upload.verified_at,
51 upload.verified_by,
52 upload.consent_timestamp,
53 upload.consent_ip,
54 upload.consent_user_agent,
55 upload.consent_signature_hash,
56 upload.anonymized,
57 upload.retention_until,
58 upload.deleted_at,
59 upload.created_at,
60 upload.updated_at,
61 )
62 .execute(&self.pool)
63 .await
64 .map_err(|e| format!("Failed to create energy bill upload: {}", e))?;
65
66 Ok(upload.clone())
67 }
68
69 async fn find_by_id(&self, id: Uuid) -> Result<Option<EnergyBillUpload>, String> {
70 let row = sqlx::query!(
71 r#"
72 SELECT
73 id, campaign_id, unit_id, building_id, organization_id,
74 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
75 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
76 uploaded_by, uploaded_at, verified_at, verified_by,
77 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
78 anonymized, retention_until, deleted_at, created_at, updated_at
79 FROM energy_bill_uploads
80 WHERE id = $1
81 "#,
82 id
83 )
84 .fetch_optional(&self.pool)
85 .await
86 .map_err(|e| format!("Failed to find energy bill upload: {}", e))?;
87
88 Ok(row.map(|r| EnergyBillUpload {
89 id: r.id,
90 campaign_id: r.campaign_id,
91 unit_id: r.unit_id,
92 building_id: r.building_id,
93 organization_id: r.organization_id,
94 bill_period_start: r.bill_period_start,
95 bill_period_end: r.bill_period_end,
96 total_kwh_encrypted: r.total_kwh_encrypted,
97 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
98 provider: r.provider,
99 postal_code: r.postal_code,
100 file_hash: r.file_hash,
101 file_path_encrypted: r.file_path_encrypted,
102 ocr_confidence: r.ocr_confidence,
103 manually_verified: r.manually_verified,
104 uploaded_by: r.uploaded_by,
105 uploaded_at: r.uploaded_at,
106 verified_at: r.verified_at,
107 verified_by: r.verified_by,
108 consent_timestamp: r.consent_timestamp,
109 consent_ip: r.consent_ip,
110 consent_user_agent: r.consent_user_agent,
111 consent_signature_hash: r.consent_signature_hash,
112 anonymized: r.anonymized,
113 retention_until: r.retention_until,
114 deleted_at: r.deleted_at,
115 created_at: r.created_at,
116 updated_at: r.updated_at,
117 }))
118 }
119
120 async fn find_by_campaign(&self, campaign_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
121 let rows = sqlx::query!(
122 r#"
123 SELECT
124 id, campaign_id, unit_id, building_id, organization_id,
125 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
126 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
127 uploaded_by, uploaded_at, verified_at, verified_by,
128 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
129 anonymized, retention_until, deleted_at, created_at, updated_at
130 FROM energy_bill_uploads
131 WHERE campaign_id = $1
132 ORDER BY uploaded_at DESC
133 "#,
134 campaign_id
135 )
136 .fetch_all(&self.pool)
137 .await
138 .map_err(|e| format!("Failed to find uploads by campaign: {}", e))?;
139
140 let uploads = rows
141 .into_iter()
142 .map(|r| EnergyBillUpload {
143 id: r.id,
144 campaign_id: r.campaign_id,
145 unit_id: r.unit_id,
146 building_id: r.building_id,
147 organization_id: r.organization_id,
148 bill_period_start: r.bill_period_start,
149 bill_period_end: r.bill_period_end,
150 total_kwh_encrypted: r.total_kwh_encrypted,
151 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
152 provider: r.provider,
153 postal_code: r.postal_code,
154 file_hash: r.file_hash,
155 file_path_encrypted: r.file_path_encrypted,
156 ocr_confidence: r.ocr_confidence,
157 manually_verified: r.manually_verified,
158 uploaded_by: r.uploaded_by,
159 uploaded_at: r.uploaded_at,
160 verified_at: r.verified_at,
161 verified_by: r.verified_by,
162 consent_timestamp: r.consent_timestamp,
163 consent_ip: r.consent_ip,
164 consent_user_agent: r.consent_user_agent,
165 consent_signature_hash: r.consent_signature_hash,
166 anonymized: r.anonymized,
167 retention_until: r.retention_until,
168 deleted_at: r.deleted_at,
169 created_at: r.created_at,
170 updated_at: r.updated_at,
171 })
172 .collect();
173
174 Ok(uploads)
175 }
176
177 async fn find_by_unit(&self, unit_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
178 let rows = sqlx::query!(
179 r#"
180 SELECT
181 id, campaign_id, unit_id, building_id, organization_id,
182 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
183 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
184 uploaded_by, uploaded_at, verified_at, verified_by,
185 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
186 anonymized, retention_until, deleted_at, created_at, updated_at
187 FROM energy_bill_uploads
188 WHERE unit_id = $1
189 ORDER BY uploaded_at DESC
190 "#,
191 unit_id
192 )
193 .fetch_all(&self.pool)
194 .await
195 .map_err(|e| format!("Failed to find uploads by unit: {}", e))?;
196
197 let uploads = rows
198 .into_iter()
199 .map(|r| EnergyBillUpload {
200 id: r.id,
201 campaign_id: r.campaign_id,
202 unit_id: r.unit_id,
203 building_id: r.building_id,
204 organization_id: r.organization_id,
205 bill_period_start: r.bill_period_start,
206 bill_period_end: r.bill_period_end,
207 total_kwh_encrypted: r.total_kwh_encrypted,
208 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
209 provider: r.provider,
210 postal_code: r.postal_code,
211 file_hash: r.file_hash,
212 file_path_encrypted: r.file_path_encrypted,
213 ocr_confidence: r.ocr_confidence,
214 manually_verified: r.manually_verified,
215 uploaded_by: r.uploaded_by,
216 uploaded_at: r.uploaded_at,
217 verified_at: r.verified_at,
218 verified_by: r.verified_by,
219 consent_timestamp: r.consent_timestamp,
220 consent_ip: r.consent_ip,
221 consent_user_agent: r.consent_user_agent,
222 consent_signature_hash: r.consent_signature_hash,
223 anonymized: r.anonymized,
224 retention_until: r.retention_until,
225 deleted_at: r.deleted_at,
226 created_at: r.created_at,
227 updated_at: r.updated_at,
228 })
229 .collect();
230
231 Ok(uploads)
232 }
233
234 async fn find_by_campaign_and_unit(
235 &self,
236 campaign_id: Uuid,
237 unit_id: Uuid,
238 ) -> Result<Option<EnergyBillUpload>, String> {
239 let row = sqlx::query!(
240 r#"
241 SELECT
242 id, campaign_id, unit_id, building_id, organization_id,
243 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
244 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
245 uploaded_by, uploaded_at, verified_at, verified_by,
246 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
247 anonymized, retention_until, deleted_at, created_at, updated_at
248 FROM energy_bill_uploads
249 WHERE campaign_id = $1 AND unit_id = $2
250 "#,
251 campaign_id,
252 unit_id
253 )
254 .fetch_optional(&self.pool)
255 .await
256 .map_err(|e| format!("Failed to find upload by campaign and unit: {}", e))?;
257
258 Ok(row.map(|r| EnergyBillUpload {
259 id: r.id,
260 campaign_id: r.campaign_id,
261 unit_id: r.unit_id,
262 building_id: r.building_id,
263 organization_id: r.organization_id,
264 bill_period_start: r.bill_period_start,
265 bill_period_end: r.bill_period_end,
266 total_kwh_encrypted: r.total_kwh_encrypted,
267 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
268 provider: r.provider,
269 postal_code: r.postal_code,
270 file_hash: r.file_hash,
271 file_path_encrypted: r.file_path_encrypted,
272 ocr_confidence: r.ocr_confidence,
273 manually_verified: r.manually_verified,
274 uploaded_by: r.uploaded_by,
275 uploaded_at: r.uploaded_at,
276 verified_at: r.verified_at,
277 verified_by: r.verified_by,
278 consent_timestamp: r.consent_timestamp,
279 consent_ip: r.consent_ip,
280 consent_user_agent: r.consent_user_agent,
281 consent_signature_hash: r.consent_signature_hash,
282 anonymized: r.anonymized,
283 retention_until: r.retention_until,
284 deleted_at: r.deleted_at,
285 created_at: r.created_at,
286 updated_at: r.updated_at,
287 }))
288 }
289
290 async fn find_by_uploaded_by(
291 &self,
292 uploaded_by: Uuid,
293 ) -> Result<Vec<EnergyBillUpload>, String> {
294 let rows = sqlx::query!(
295 r#"
296 SELECT
297 id, campaign_id, unit_id, building_id, organization_id,
298 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
299 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
300 uploaded_by, uploaded_at, verified_at, verified_by,
301 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
302 anonymized, retention_until, deleted_at, created_at, updated_at
303 FROM energy_bill_uploads
304 WHERE uploaded_by = $1
305 ORDER BY uploaded_at DESC
306 "#,
307 uploaded_by
308 )
309 .fetch_all(&self.pool)
310 .await
311 .map_err(|e| format!("Failed to find uploads by user: {}", e))?;
312
313 let uploads = rows
314 .into_iter()
315 .map(|r| EnergyBillUpload {
316 id: r.id,
317 campaign_id: r.campaign_id,
318 unit_id: r.unit_id,
319 building_id: r.building_id,
320 organization_id: r.organization_id,
321 bill_period_start: r.bill_period_start,
322 bill_period_end: r.bill_period_end,
323 total_kwh_encrypted: r.total_kwh_encrypted,
324 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
325 provider: r.provider,
326 postal_code: r.postal_code,
327 file_hash: r.file_hash,
328 file_path_encrypted: r.file_path_encrypted,
329 ocr_confidence: r.ocr_confidence,
330 manually_verified: r.manually_verified,
331 uploaded_by: r.uploaded_by,
332 uploaded_at: r.uploaded_at,
333 verified_at: r.verified_at,
334 verified_by: r.verified_by,
335 consent_timestamp: r.consent_timestamp,
336 consent_ip: r.consent_ip,
337 consent_user_agent: r.consent_user_agent,
338 consent_signature_hash: r.consent_signature_hash,
339 anonymized: r.anonymized,
340 retention_until: r.retention_until,
341 deleted_at: r.deleted_at,
342 created_at: r.created_at,
343 updated_at: r.updated_at,
344 })
345 .collect();
346
347 Ok(uploads)
348 }
349
350 async fn find_by_building(&self, building_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
351 let rows = sqlx::query!(
352 r#"
353 SELECT
354 id, campaign_id, unit_id, building_id, organization_id,
355 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
356 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
357 uploaded_by, uploaded_at, verified_at, verified_by,
358 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
359 anonymized, retention_until, deleted_at, created_at, updated_at
360 FROM energy_bill_uploads
361 WHERE building_id = $1
362 ORDER BY uploaded_at DESC
363 "#,
364 building_id
365 )
366 .fetch_all(&self.pool)
367 .await
368 .map_err(|e| format!("Failed to find uploads by building: {}", e))?;
369
370 let uploads = rows
371 .into_iter()
372 .map(|r| EnergyBillUpload {
373 id: r.id,
374 campaign_id: r.campaign_id,
375 unit_id: r.unit_id,
376 building_id: r.building_id,
377 organization_id: r.organization_id,
378 bill_period_start: r.bill_period_start,
379 bill_period_end: r.bill_period_end,
380 total_kwh_encrypted: r.total_kwh_encrypted,
381 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
382 provider: r.provider,
383 postal_code: r.postal_code,
384 file_hash: r.file_hash,
385 file_path_encrypted: r.file_path_encrypted,
386 ocr_confidence: r.ocr_confidence,
387 manually_verified: r.manually_verified,
388 uploaded_by: r.uploaded_by,
389 uploaded_at: r.uploaded_at,
390 verified_at: r.verified_at,
391 verified_by: r.verified_by,
392 consent_timestamp: r.consent_timestamp,
393 consent_ip: r.consent_ip,
394 consent_user_agent: r.consent_user_agent,
395 consent_signature_hash: r.consent_signature_hash,
396 anonymized: r.anonymized,
397 retention_until: r.retention_until,
398 deleted_at: r.deleted_at,
399 created_at: r.created_at,
400 updated_at: r.updated_at,
401 })
402 .collect();
403
404 Ok(uploads)
405 }
406
407 async fn update(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
408 sqlx::query!(
409 r#"
410 UPDATE energy_bill_uploads
411 SET
412 bill_period_start = $2,
413 bill_period_end = $3,
414 total_kwh_encrypted = $4,
415 energy_type = $5,
416 provider = $6,
417 postal_code = $7,
418 file_hash = $8,
419 file_path_encrypted = $9,
420 ocr_confidence = $10,
421 manually_verified = $11,
422 verified_at = $12,
423 verified_by = $13,
424 consent_timestamp = $14,
425 consent_ip = $15,
426 consent_user_agent = $16,
427 consent_signature_hash = $17,
428 anonymized = $18,
429 retention_until = $19,
430 deleted_at = $20,
431 updated_at = $21
432 WHERE id = $1
433 "#,
434 upload.id,
435 upload.bill_period_start,
436 upload.bill_period_end,
437 &upload.total_kwh_encrypted,
438 upload.energy_type.to_string(),
439 upload.provider.as_ref(),
440 upload.postal_code,
441 upload.file_hash,
442 upload.file_path_encrypted,
443 upload.ocr_confidence,
444 upload.manually_verified,
445 upload.verified_at,
446 upload.verified_by,
447 upload.consent_timestamp,
448 upload.consent_ip,
449 upload.consent_user_agent,
450 upload.consent_signature_hash,
451 upload.anonymized,
452 upload.retention_until,
453 upload.deleted_at,
454 upload.updated_at,
455 )
456 .execute(&self.pool)
457 .await
458 .map_err(|e| format!("Failed to update energy bill upload: {}", e))?;
459
460 Ok(upload.clone())
461 }
462
463 async fn delete(&self, id: Uuid) -> Result<(), String> {
464 sqlx::query!(
466 r#"
467 UPDATE energy_bill_uploads
468 SET deleted_at = NOW(), updated_at = NOW()
469 WHERE id = $1
470 "#,
471 id
472 )
473 .execute(&self.pool)
474 .await
475 .map_err(|e| format!("Failed to delete energy bill upload: {}", e))?;
476
477 Ok(())
478 }
479
480 async fn find_expired(&self) -> Result<Vec<EnergyBillUpload>, String> {
481 let rows = sqlx::query!(
482 r#"
483 SELECT
484 id, campaign_id, unit_id, building_id, organization_id,
485 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
486 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
487 uploaded_by, uploaded_at, verified_at, verified_by,
488 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
489 anonymized, retention_until, deleted_at, created_at, updated_at
490 FROM energy_bill_uploads
491 WHERE retention_until < NOW() AND deleted_at IS NULL
492 "#
493 )
494 .fetch_all(&self.pool)
495 .await
496 .map_err(|e| format!("Failed to find expired uploads: {}", e))?;
497
498 let uploads = rows
499 .into_iter()
500 .map(|r| EnergyBillUpload {
501 id: r.id,
502 campaign_id: r.campaign_id,
503 unit_id: r.unit_id,
504 building_id: r.building_id,
505 organization_id: r.organization_id,
506 bill_period_start: r.bill_period_start,
507 bill_period_end: r.bill_period_end,
508 total_kwh_encrypted: r.total_kwh_encrypted,
509 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
510 provider: r.provider,
511 postal_code: r.postal_code,
512 file_hash: r.file_hash,
513 file_path_encrypted: r.file_path_encrypted,
514 ocr_confidence: r.ocr_confidence,
515 manually_verified: r.manually_verified,
516 uploaded_by: r.uploaded_by,
517 uploaded_at: r.uploaded_at,
518 verified_at: r.verified_at,
519 verified_by: r.verified_by,
520 consent_timestamp: r.consent_timestamp,
521 consent_ip: r.consent_ip,
522 consent_user_agent: r.consent_user_agent,
523 consent_signature_hash: r.consent_signature_hash,
524 anonymized: r.anonymized,
525 retention_until: r.retention_until,
526 deleted_at: r.deleted_at,
527 created_at: r.created_at,
528 updated_at: r.updated_at,
529 })
530 .collect();
531
532 Ok(uploads)
533 }
534
535 async fn count_verified_by_campaign(&self, campaign_id: Uuid) -> Result<i32, String> {
536 let result = sqlx::query!(
537 r#"
538 SELECT COUNT(*) as count
539 FROM energy_bill_uploads
540 WHERE campaign_id = $1
541 AND manually_verified = TRUE
542 AND deleted_at IS NULL
543 "#,
544 campaign_id
545 )
546 .fetch_one(&self.pool)
547 .await
548 .map_err(|e| format!("Failed to count verified uploads: {}", e))?;
549
550 Ok(result.count.unwrap_or(0) as i32)
551 }
552
553 async fn find_verified_by_campaign(
554 &self,
555 campaign_id: Uuid,
556 ) -> Result<Vec<EnergyBillUpload>, String> {
557 let rows = sqlx::query!(
558 r#"
559 SELECT
560 id, campaign_id, unit_id, building_id, organization_id,
561 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
562 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
563 uploaded_by, uploaded_at, verified_at, verified_by,
564 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
565 anonymized, retention_until, deleted_at, created_at, updated_at
566 FROM energy_bill_uploads
567 WHERE campaign_id = $1
568 AND manually_verified = TRUE
569 AND deleted_at IS NULL
570 ORDER BY uploaded_at DESC
571 "#,
572 campaign_id
573 )
574 .fetch_all(&self.pool)
575 .await
576 .map_err(|e| format!("Failed to find verified uploads: {}", e))?;
577
578 let uploads = rows
579 .into_iter()
580 .map(|r| EnergyBillUpload {
581 id: r.id,
582 campaign_id: r.campaign_id,
583 unit_id: r.unit_id,
584 building_id: r.building_id,
585 organization_id: r.organization_id,
586 bill_period_start: r.bill_period_start,
587 bill_period_end: r.bill_period_end,
588 total_kwh_encrypted: r.total_kwh_encrypted,
589 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
590 provider: r.provider,
591 postal_code: r.postal_code,
592 file_hash: r.file_hash,
593 file_path_encrypted: r.file_path_encrypted,
594 ocr_confidence: r.ocr_confidence,
595 manually_verified: r.manually_verified,
596 uploaded_by: r.uploaded_by,
597 uploaded_at: r.uploaded_at,
598 verified_at: r.verified_at,
599 verified_by: r.verified_by,
600 consent_timestamp: r.consent_timestamp,
601 consent_ip: r.consent_ip,
602 consent_user_agent: r.consent_user_agent,
603 consent_signature_hash: r.consent_signature_hash,
604 anonymized: r.anonymized,
605 retention_until: r.retention_until,
606 deleted_at: r.deleted_at,
607 created_at: r.created_at,
608 updated_at: r.updated_at,
609 })
610 .collect();
611
612 Ok(uploads)
613 }
614
615 async fn delete_expired(&self) -> Result<i32, String> {
616 let result = sqlx::query!(
617 r#"
618 UPDATE energy_bill_uploads
619 SET deleted_at = NOW(), updated_at = NOW()
620 WHERE retention_until < NOW()
621 AND deleted_at IS NULL
622 "#
623 )
624 .execute(&self.pool)
625 .await
626 .map_err(|e| format!("Failed to delete expired uploads: {}", e))?;
627
628 Ok(result.rows_affected() as i32)
629 }
630}