1use async_trait::async_trait;
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use crate::application::ports::EnergyBillUploadRepository;
6use crate::domain::entities::{EnergyBillUpload, EnergyType};
7
8pub struct PostgresEnergyBillUploadRepository {
9 pub pool: PgPool,
10}
11
12impl PostgresEnergyBillUploadRepository {
13 pub fn new(pool: PgPool) -> Self {
14 Self { pool }
15 }
16}
17
18#[async_trait]
19impl EnergyBillUploadRepository for PostgresEnergyBillUploadRepository {
20 async fn create(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
21 sqlx::query!(
22 r#"
23 INSERT INTO energy_bill_uploads (
24 id, campaign_id, unit_id, building_id, organization_id,
25 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
26 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
27 uploaded_by, uploaded_at, verified_at, verified_by,
28 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
29 anonymized, retention_until, deleted_at, created_at, updated_at
30 )
31 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28)
32 "#,
33 upload.id,
34 upload.campaign_id,
35 upload.unit_id,
36 upload.building_id,
37 upload.organization_id,
38 upload.bill_period_start,
39 upload.bill_period_end,
40 &upload.total_kwh_encrypted,
41 upload.energy_type.to_string(),
42 upload.provider.as_ref(),
43 upload.postal_code,
44 upload.file_hash,
45 upload.file_path_encrypted,
46 upload.ocr_confidence,
47 upload.manually_verified,
48 upload.uploaded_by,
49 upload.uploaded_at,
50 upload.verified_at,
51 upload.verified_by,
52 upload.consent_timestamp,
53 upload.consent_ip,
54 upload.consent_user_agent,
55 upload.consent_signature_hash,
56 upload.anonymized,
57 upload.retention_until,
58 upload.deleted_at,
59 upload.created_at,
60 upload.updated_at,
61 )
62 .execute(&self.pool)
63 .await
64 .map_err(|e| format!("Failed to create energy bill upload: {}", e))?;
65
66 Ok(upload.clone())
67 }
68
69 async fn find_by_id(&self, id: Uuid) -> Result<Option<EnergyBillUpload>, String> {
70 let row = sqlx::query!(
71 r#"
72 SELECT
73 id, campaign_id, unit_id, building_id, organization_id,
74 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
75 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
76 uploaded_by, uploaded_at, verified_at, verified_by,
77 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
78 anonymized, retention_until, deleted_at, created_at, updated_at
79 FROM energy_bill_uploads
80 WHERE id = $1
81 "#,
82 id
83 )
84 .fetch_optional(&self.pool)
85 .await
86 .map_err(|e| format!("Failed to find energy bill upload: {}", e))?;
87
88 Ok(row.map(|r| EnergyBillUpload {
89 id: r.id,
90 campaign_id: r.campaign_id,
91 unit_id: r.unit_id,
92 building_id: r.building_id,
93 organization_id: r.organization_id,
94 bill_period_start: r.bill_period_start,
95 bill_period_end: r.bill_period_end,
96 total_kwh_encrypted: r.total_kwh_encrypted,
97 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
98 provider: r.provider,
99 postal_code: r.postal_code,
100 file_hash: r.file_hash,
101 file_path_encrypted: r.file_path_encrypted,
102 ocr_confidence: r.ocr_confidence,
103 manually_verified: r.manually_verified,
104 uploaded_by: r.uploaded_by,
105 uploaded_at: r.uploaded_at,
106 verified_at: r.verified_at,
107 verified_by: r.verified_by,
108 consent_timestamp: r.consent_timestamp,
109 consent_ip: r.consent_ip,
110 consent_user_agent: r.consent_user_agent,
111 consent_signature_hash: r.consent_signature_hash,
112 anonymized: r.anonymized,
113 retention_until: r.retention_until,
114 deleted_at: r.deleted_at,
115 created_at: r.created_at,
116 updated_at: r.updated_at,
117 }))
118 }
119
120 async fn find_by_campaign(&self, campaign_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
121 let rows = sqlx::query!(
122 r#"
123 SELECT
124 id, campaign_id, unit_id, building_id, organization_id,
125 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
126 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
127 uploaded_by, uploaded_at, verified_at, verified_by,
128 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
129 anonymized, retention_until, deleted_at, created_at, updated_at
130 FROM energy_bill_uploads
131 WHERE campaign_id = $1
132 ORDER BY uploaded_at DESC
133 "#,
134 campaign_id
135 )
136 .fetch_all(&self.pool)
137 .await
138 .map_err(|e| format!("Failed to find uploads by campaign: {}", e))?;
139
140 let uploads = rows
141 .into_iter()
142 .map(|r| EnergyBillUpload {
143 id: r.id,
144 campaign_id: r.campaign_id,
145 unit_id: r.unit_id,
146 building_id: r.building_id,
147 organization_id: r.organization_id,
148 bill_period_start: r.bill_period_start,
149 bill_period_end: r.bill_period_end,
150 total_kwh_encrypted: r.total_kwh_encrypted,
151 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
152 provider: r.provider,
153 postal_code: r.postal_code,
154 file_hash: r.file_hash,
155 file_path_encrypted: r.file_path_encrypted,
156 ocr_confidence: r.ocr_confidence,
157 manually_verified: r.manually_verified,
158 uploaded_by: r.uploaded_by,
159 uploaded_at: r.uploaded_at,
160 verified_at: r.verified_at,
161 verified_by: r.verified_by,
162 consent_timestamp: r.consent_timestamp,
163 consent_ip: r.consent_ip,
164 consent_user_agent: r.consent_user_agent,
165 consent_signature_hash: r.consent_signature_hash,
166 anonymized: r.anonymized,
167 retention_until: r.retention_until,
168 deleted_at: r.deleted_at,
169 created_at: r.created_at,
170 updated_at: r.updated_at,
171 })
172 .collect();
173
174 Ok(uploads)
175 }
176
177 async fn find_by_unit(&self, unit_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
178 let rows = sqlx::query!(
179 r#"
180 SELECT
181 id, campaign_id, unit_id, building_id, organization_id,
182 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
183 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
184 uploaded_by, uploaded_at, verified_at, verified_by,
185 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
186 anonymized, retention_until, deleted_at, created_at, updated_at
187 FROM energy_bill_uploads
188 WHERE unit_id = $1
189 ORDER BY uploaded_at DESC
190 "#,
191 unit_id
192 )
193 .fetch_all(&self.pool)
194 .await
195 .map_err(|e| format!("Failed to find uploads by unit: {}", e))?;
196
197 let uploads = rows
198 .into_iter()
199 .map(|r| EnergyBillUpload {
200 id: r.id,
201 campaign_id: r.campaign_id,
202 unit_id: r.unit_id,
203 building_id: r.building_id,
204 organization_id: r.organization_id,
205 bill_period_start: r.bill_period_start,
206 bill_period_end: r.bill_period_end,
207 total_kwh_encrypted: r.total_kwh_encrypted,
208 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
209 provider: r.provider,
210 postal_code: r.postal_code,
211 file_hash: r.file_hash,
212 file_path_encrypted: r.file_path_encrypted,
213 ocr_confidence: r.ocr_confidence,
214 manually_verified: r.manually_verified,
215 uploaded_by: r.uploaded_by,
216 uploaded_at: r.uploaded_at,
217 verified_at: r.verified_at,
218 verified_by: r.verified_by,
219 consent_timestamp: r.consent_timestamp,
220 consent_ip: r.consent_ip,
221 consent_user_agent: r.consent_user_agent,
222 consent_signature_hash: r.consent_signature_hash,
223 anonymized: r.anonymized,
224 retention_until: r.retention_until,
225 deleted_at: r.deleted_at,
226 created_at: r.created_at,
227 updated_at: r.updated_at,
228 })
229 .collect();
230
231 Ok(uploads)
232 }
233
234 async fn find_by_campaign_and_unit(
235 &self,
236 campaign_id: Uuid,
237 unit_id: Uuid,
238 ) -> Result<Option<EnergyBillUpload>, String> {
239 let row = sqlx::query!(
240 r#"
241 SELECT
242 id, campaign_id, unit_id, building_id, organization_id,
243 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
244 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
245 uploaded_by, uploaded_at, verified_at, verified_by,
246 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
247 anonymized, retention_until, deleted_at, created_at, updated_at
248 FROM energy_bill_uploads
249 WHERE campaign_id = $1 AND unit_id = $2
250 "#,
251 campaign_id,
252 unit_id
253 )
254 .fetch_optional(&self.pool)
255 .await
256 .map_err(|e| format!("Failed to find upload by campaign and unit: {}", e))?;
257
258 Ok(row.map(|r| EnergyBillUpload {
259 id: r.id,
260 campaign_id: r.campaign_id,
261 unit_id: r.unit_id,
262 building_id: r.building_id,
263 organization_id: r.organization_id,
264 bill_period_start: r.bill_period_start,
265 bill_period_end: r.bill_period_end,
266 total_kwh_encrypted: r.total_kwh_encrypted,
267 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
268 provider: r.provider,
269 postal_code: r.postal_code,
270 file_hash: r.file_hash,
271 file_path_encrypted: r.file_path_encrypted,
272 ocr_confidence: r.ocr_confidence,
273 manually_verified: r.manually_verified,
274 uploaded_by: r.uploaded_by,
275 uploaded_at: r.uploaded_at,
276 verified_at: r.verified_at,
277 verified_by: r.verified_by,
278 consent_timestamp: r.consent_timestamp,
279 consent_ip: r.consent_ip,
280 consent_user_agent: r.consent_user_agent,
281 consent_signature_hash: r.consent_signature_hash,
282 anonymized: r.anonymized,
283 retention_until: r.retention_until,
284 deleted_at: r.deleted_at,
285 created_at: r.created_at,
286 updated_at: r.updated_at,
287 }))
288 }
289
290 async fn find_by_building(&self, building_id: Uuid) -> Result<Vec<EnergyBillUpload>, String> {
291 let rows = sqlx::query!(
292 r#"
293 SELECT
294 id, campaign_id, unit_id, building_id, organization_id,
295 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
296 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
297 uploaded_by, uploaded_at, verified_at, verified_by,
298 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
299 anonymized, retention_until, deleted_at, created_at, updated_at
300 FROM energy_bill_uploads
301 WHERE building_id = $1
302 ORDER BY uploaded_at DESC
303 "#,
304 building_id
305 )
306 .fetch_all(&self.pool)
307 .await
308 .map_err(|e| format!("Failed to find uploads by building: {}", e))?;
309
310 let uploads = rows
311 .into_iter()
312 .map(|r| EnergyBillUpload {
313 id: r.id,
314 campaign_id: r.campaign_id,
315 unit_id: r.unit_id,
316 building_id: r.building_id,
317 organization_id: r.organization_id,
318 bill_period_start: r.bill_period_start,
319 bill_period_end: r.bill_period_end,
320 total_kwh_encrypted: r.total_kwh_encrypted,
321 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
322 provider: r.provider,
323 postal_code: r.postal_code,
324 file_hash: r.file_hash,
325 file_path_encrypted: r.file_path_encrypted,
326 ocr_confidence: r.ocr_confidence,
327 manually_verified: r.manually_verified,
328 uploaded_by: r.uploaded_by,
329 uploaded_at: r.uploaded_at,
330 verified_at: r.verified_at,
331 verified_by: r.verified_by,
332 consent_timestamp: r.consent_timestamp,
333 consent_ip: r.consent_ip,
334 consent_user_agent: r.consent_user_agent,
335 consent_signature_hash: r.consent_signature_hash,
336 anonymized: r.anonymized,
337 retention_until: r.retention_until,
338 deleted_at: r.deleted_at,
339 created_at: r.created_at,
340 updated_at: r.updated_at,
341 })
342 .collect();
343
344 Ok(uploads)
345 }
346
347 async fn update(&self, upload: &EnergyBillUpload) -> Result<EnergyBillUpload, String> {
348 sqlx::query!(
349 r#"
350 UPDATE energy_bill_uploads
351 SET
352 bill_period_start = $2,
353 bill_period_end = $3,
354 total_kwh_encrypted = $4,
355 energy_type = $5,
356 provider = $6,
357 postal_code = $7,
358 file_hash = $8,
359 file_path_encrypted = $9,
360 ocr_confidence = $10,
361 manually_verified = $11,
362 verified_at = $12,
363 verified_by = $13,
364 consent_timestamp = $14,
365 consent_ip = $15,
366 consent_user_agent = $16,
367 consent_signature_hash = $17,
368 anonymized = $18,
369 retention_until = $19,
370 deleted_at = $20,
371 updated_at = $21
372 WHERE id = $1
373 "#,
374 upload.id,
375 upload.bill_period_start,
376 upload.bill_period_end,
377 &upload.total_kwh_encrypted,
378 upload.energy_type.to_string(),
379 upload.provider.as_ref(),
380 upload.postal_code,
381 upload.file_hash,
382 upload.file_path_encrypted,
383 upload.ocr_confidence,
384 upload.manually_verified,
385 upload.verified_at,
386 upload.verified_by,
387 upload.consent_timestamp,
388 upload.consent_ip,
389 upload.consent_user_agent,
390 upload.consent_signature_hash,
391 upload.anonymized,
392 upload.retention_until,
393 upload.deleted_at,
394 upload.updated_at,
395 )
396 .execute(&self.pool)
397 .await
398 .map_err(|e| format!("Failed to update energy bill upload: {}", e))?;
399
400 Ok(upload.clone())
401 }
402
403 async fn delete(&self, id: Uuid) -> Result<(), String> {
404 sqlx::query!(
406 r#"
407 UPDATE energy_bill_uploads
408 SET deleted_at = NOW(), updated_at = NOW()
409 WHERE id = $1
410 "#,
411 id
412 )
413 .execute(&self.pool)
414 .await
415 .map_err(|e| format!("Failed to delete energy bill upload: {}", e))?;
416
417 Ok(())
418 }
419
420 async fn find_expired(&self) -> Result<Vec<EnergyBillUpload>, String> {
421 let rows = sqlx::query!(
422 r#"
423 SELECT
424 id, campaign_id, unit_id, building_id, organization_id,
425 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
426 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
427 uploaded_by, uploaded_at, verified_at, verified_by,
428 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
429 anonymized, retention_until, deleted_at, created_at, updated_at
430 FROM energy_bill_uploads
431 WHERE retention_until < NOW() AND deleted_at IS NULL
432 "#
433 )
434 .fetch_all(&self.pool)
435 .await
436 .map_err(|e| format!("Failed to find expired uploads: {}", e))?;
437
438 let uploads = rows
439 .into_iter()
440 .map(|r| EnergyBillUpload {
441 id: r.id,
442 campaign_id: r.campaign_id,
443 unit_id: r.unit_id,
444 building_id: r.building_id,
445 organization_id: r.organization_id,
446 bill_period_start: r.bill_period_start,
447 bill_period_end: r.bill_period_end,
448 total_kwh_encrypted: r.total_kwh_encrypted,
449 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
450 provider: r.provider,
451 postal_code: r.postal_code,
452 file_hash: r.file_hash,
453 file_path_encrypted: r.file_path_encrypted,
454 ocr_confidence: r.ocr_confidence,
455 manually_verified: r.manually_verified,
456 uploaded_by: r.uploaded_by,
457 uploaded_at: r.uploaded_at,
458 verified_at: r.verified_at,
459 verified_by: r.verified_by,
460 consent_timestamp: r.consent_timestamp,
461 consent_ip: r.consent_ip,
462 consent_user_agent: r.consent_user_agent,
463 consent_signature_hash: r.consent_signature_hash,
464 anonymized: r.anonymized,
465 retention_until: r.retention_until,
466 deleted_at: r.deleted_at,
467 created_at: r.created_at,
468 updated_at: r.updated_at,
469 })
470 .collect();
471
472 Ok(uploads)
473 }
474
475 async fn count_verified_by_campaign(&self, campaign_id: Uuid) -> Result<i32, String> {
476 let result = sqlx::query!(
477 r#"
478 SELECT COUNT(*) as count
479 FROM energy_bill_uploads
480 WHERE campaign_id = $1
481 AND manually_verified = TRUE
482 AND deleted_at IS NULL
483 "#,
484 campaign_id
485 )
486 .fetch_one(&self.pool)
487 .await
488 .map_err(|e| format!("Failed to count verified uploads: {}", e))?;
489
490 Ok(result.count.unwrap_or(0) as i32)
491 }
492
493 async fn find_verified_by_campaign(
494 &self,
495 campaign_id: Uuid,
496 ) -> Result<Vec<EnergyBillUpload>, String> {
497 let rows = sqlx::query!(
498 r#"
499 SELECT
500 id, campaign_id, unit_id, building_id, organization_id,
501 bill_period_start, bill_period_end, total_kwh_encrypted, energy_type, provider, postal_code,
502 file_hash, file_path_encrypted, ocr_confidence, manually_verified,
503 uploaded_by, uploaded_at, verified_at, verified_by,
504 consent_timestamp, consent_ip, consent_user_agent, consent_signature_hash,
505 anonymized, retention_until, deleted_at, created_at, updated_at
506 FROM energy_bill_uploads
507 WHERE campaign_id = $1
508 AND manually_verified = TRUE
509 AND deleted_at IS NULL
510 ORDER BY uploaded_at DESC
511 "#,
512 campaign_id
513 )
514 .fetch_all(&self.pool)
515 .await
516 .map_err(|e| format!("Failed to find verified uploads: {}", e))?;
517
518 let uploads = rows
519 .into_iter()
520 .map(|r| EnergyBillUpload {
521 id: r.id,
522 campaign_id: r.campaign_id,
523 unit_id: r.unit_id,
524 building_id: r.building_id,
525 organization_id: r.organization_id,
526 bill_period_start: r.bill_period_start,
527 bill_period_end: r.bill_period_end,
528 total_kwh_encrypted: r.total_kwh_encrypted,
529 energy_type: r.energy_type.parse().unwrap_or(EnergyType::Electricity),
530 provider: r.provider,
531 postal_code: r.postal_code,
532 file_hash: r.file_hash,
533 file_path_encrypted: r.file_path_encrypted,
534 ocr_confidence: r.ocr_confidence,
535 manually_verified: r.manually_verified,
536 uploaded_by: r.uploaded_by,
537 uploaded_at: r.uploaded_at,
538 verified_at: r.verified_at,
539 verified_by: r.verified_by,
540 consent_timestamp: r.consent_timestamp,
541 consent_ip: r.consent_ip,
542 consent_user_agent: r.consent_user_agent,
543 consent_signature_hash: r.consent_signature_hash,
544 anonymized: r.anonymized,
545 retention_until: r.retention_until,
546 deleted_at: r.deleted_at,
547 created_at: r.created_at,
548 updated_at: r.updated_at,
549 })
550 .collect();
551
552 Ok(uploads)
553 }
554
555 async fn delete_expired(&self) -> Result<i32, String> {
556 let result = sqlx::query!(
557 r#"
558 UPDATE energy_bill_uploads
559 SET deleted_at = NOW(), updated_at = NOW()
560 WHERE retention_until < NOW()
561 AND deleted_at IS NULL
562 "#
563 )
564 .execute(&self.pool)
565 .await
566 .map_err(|e| format!("Failed to delete expired uploads: {}", e))?;
567
568 Ok(result.rows_affected() as i32)
569 }
570}