1use crate::application::dto::{ConsumptionStatsDto, DailyAggregateDto, MonthlyAggregateDto};
2use crate::application::ports::IoTRepository;
3use crate::domain::entities::{DeviceType, IoTReading, LinkyDevice, MetricType};
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use sqlx::PgPool;
7use uuid::Uuid;
8
/// PostgreSQL-backed implementation of the [`IoTRepository`] port.
///
/// Wraps a `sqlx` connection pool; `PgPool` is internally reference-counted,
/// so this struct is cheap to share across request handlers.
pub struct PostgresIoTRepository {
    // Connection pool used for every query issued by this repository.
    pool: PgPool,
}
13
impl PostgresIoTRepository {
    /// Creates a repository backed by the given Postgres connection pool.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}
19
#[async_trait]
impl IoTRepository for PostgresIoTRepository {
    /// Inserts a single IoT reading and returns the row exactly as stored.
    ///
    /// The `device_type` / `metric_type` columns are Postgres enum types; they
    /// are written via a `TEXT -> enum` cast and read back as `TEXT` so that
    /// sqlx can map them without custom enum type support. Errors are
    /// stringified database or parse failures.
    async fn create_reading(&self, reading: &IoTReading) -> Result<IoTReading, String> {
        let record = sqlx::query!(
            r#"
            INSERT INTO iot_readings (
                id, building_id, device_type, metric_type, value,
                unit, timestamp, source, metadata, created_at
            )
            VALUES ($1, $2, $3::TEXT::device_type, $4::TEXT::metric_type, $5, $6, $7, $8, $9, $10)
            RETURNING id, building_id, device_type::text AS "device_type!", metric_type::text AS "metric_type!", value,
                      unit, timestamp, source, metadata, created_at
            "#,
            reading.id,
            reading.building_id,
            reading.device_type.to_string(),
            reading.metric_type.to_string(),
            reading.value,
            reading.unit,
            reading.timestamp,
            reading.source,
            reading.metadata,
            reading.created_at,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to create IoT reading: {}", e))?;

        // Round-trip the TEXT enum columns back into the domain enums.
        Ok(IoTReading {
            id: record.id,
            building_id: record.building_id,
            device_type: record
                .device_type
                .parse()
                .map_err(|e| format!("Invalid device_type: {}", e))?,
            metric_type: record
                .metric_type
                .parse()
                .map_err(|e| format!("Invalid metric_type: {}", e))?,
            value: record.value,
            unit: record.unit,
            timestamp: record.timestamp,
            source: record.source,
            metadata: record.metadata,
            created_at: record.created_at,
        })
    }

    /// Inserts a batch of readings atomically.
    ///
    /// All rows are written inside one transaction: either every reading is
    /// persisted or none are. Returns the number of rows inserted (always
    /// `readings.len()` on success).
    ///
    /// NOTE(review): rows are inserted one statement per reading; for large
    /// batches a multi-row insert (`UNNEST`) or `COPY` would cut round-trips —
    /// confirm expected batch sizes before optimizing.
    async fn create_readings_bulk(&self, readings: &[IoTReading]) -> Result<usize, String> {
        if readings.is_empty() {
            return Ok(0);
        }

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| format!("Failed to begin transaction: {}", e))?;

        let mut count = 0;

        for reading in readings {
            sqlx::query!(
                r#"
                INSERT INTO iot_readings (
                    id, building_id, device_type, metric_type, value,
                    unit, timestamp, source, metadata, created_at
                )
                VALUES ($1, $2, $3::TEXT::device_type, $4::TEXT::metric_type, $5, $6, $7, $8, $9, $10)
                "#,
                reading.id,
                reading.building_id,
                reading.device_type.to_string(),
                reading.metric_type.to_string(),
                reading.value,
                reading.unit,
                reading.timestamp,
                reading.source,
                reading.metadata,
                reading.created_at,
            )
            .execute(&mut *tx)
            .await
            .map_err(|e| format!("Failed to insert reading: {}", e))?;

            count += 1;
        }

        // Any error above aborts before this point, rolling the transaction
        // back when `tx` is dropped.
        tx.commit()
            .await
            .map_err(|e| format!("Failed to commit transaction: {}", e))?;

        Ok(count)
    }

    /// Fetches readings for a building within `[start_date, end_date]`
    /// (inclusive), newest first, optionally filtered by device and metric
    /// type.
    ///
    /// `limit` defaults to 1000 and is capped at 10_000 to bound result size.
    /// The optional filters are pushed into SQL as `($n IS NULL OR col = $n)`
    /// so a single prepared statement covers every filter combination.
    async fn find_readings_by_building(
        &self,
        building_id: Uuid,
        device_type: Option<DeviceType>,
        metric_type: Option<MetricType>,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
        limit: Option<usize>,
    ) -> Result<Vec<IoTReading>, String> {
        // Clamp caller-supplied limits; Postgres LIMIT takes an i64.
        let limit = limit.unwrap_or(1000).min(10000) as i64;
        let device_type_str = device_type.as_ref().map(|dt| dt.to_string());
        let metric_type_str = metric_type.as_ref().map(|mt| mt.to_string());

        let records = sqlx::query!(
            r#"
            SELECT id, building_id, device_type::TEXT as device_type, metric_type::TEXT as metric_type, value,
                   unit, timestamp, source, metadata, created_at
            FROM iot_readings
            WHERE building_id = $1
              AND timestamp >= $2
              AND timestamp <= $3
              AND ($4::TEXT IS NULL OR device_type::TEXT = $4)
              AND ($5::TEXT IS NULL OR metric_type::TEXT = $5)
            ORDER BY timestamp DESC
            LIMIT $6
            "#,
            building_id,
            start_date,
            end_date,
            device_type_str,
            metric_type_str,
            limit,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to query IoT readings: {}", e))?;

        // The TEXT-cast enum columns come back as Option<String>; a NULL or
        // unparseable value aborts the whole mapping with an error.
        records
            .into_iter()
            .map(|record| {
                Ok(IoTReading {
                    id: record.id,
                    building_id: record.building_id,
                    device_type: record
                        .device_type
                        .ok_or("Device type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid device_type: {}", e))?,
                    metric_type: record
                        .metric_type
                        .ok_or("Metric type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid metric_type: {}", e))?,
                    value: record.value,
                    unit: record.unit,
                    timestamp: record.timestamp,
                    source: record.source,
                    metadata: record.metadata,
                    created_at: record.created_at,
                })
            })
            .collect()
    }

    /// Aggregate consumption statistics for one metric over a time window.
    ///
    /// Returns a zeroed DTO (empty `unit`/`source`) when no readings match.
    ///
    /// NOTE(review): the query groups by `(unit, source)`, so mixed
    /// units/sources would produce several rows while `fetch_optional` keeps
    /// only the first — confirm a building never mixes units for one metric.
    /// Also `average_daily` is `AVG(value)` over readings, not per calendar
    /// day — verify the intended semantics against the DTO consumers.
    async fn get_consumption_stats(
        &self,
        building_id: Uuid,
        metric_type: MetricType,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
    ) -> Result<ConsumptionStatsDto, String> {
        let metric_type_str = metric_type.to_string();

        let record = sqlx::query!(
            r#"
            SELECT
                COUNT(*) as "reading_count!",
                SUM(value) as total_consumption,
                AVG(value) as average_daily,
                MIN(value) as min_value,
                MAX(value) as max_value,
                unit,
                source
            FROM iot_readings
            WHERE building_id = $1
              AND metric_type::TEXT = $2
              AND timestamp >= $3
              AND timestamp <= $4
            GROUP BY unit, source
            "#,
            building_id,
            metric_type_str,
            start_date,
            end_date,
        )
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to get consumption stats: {}", e))?;

        Ok(match record {
            Some(r) => ConsumptionStatsDto {
                building_id,
                metric_type,
                period_start: start_date,
                period_end: end_date,
                // SQL aggregates are NULLable in sqlx's view; default to 0.0.
                total_consumption: r.total_consumption.unwrap_or(0.0),
                average_daily: r.average_daily.unwrap_or(0.0),
                min_value: r.min_value.unwrap_or(0.0),
                max_value: r.max_value.unwrap_or(0.0),
                reading_count: r.reading_count,
                unit: r.unit,
                source: r.source,
            },
            // No rows in the window: report an all-zero stats DTO.
            None => ConsumptionStatsDto {
                building_id,
                metric_type,
                period_start: start_date,
                period_end: end_date,
                total_consumption: 0.0,
                average_daily: 0.0,
                min_value: 0.0,
                max_value: 0.0,
                reading_count: 0,
                unit: String::new(),
                source: String::new(),
            },
        })
    }

    /// Per-day aggregates (avg/min/max/sum/count) for one device + metric,
    /// ordered oldest day first.
    ///
    /// Rows are additionally grouped by `source`, so one calendar day can
    /// yield multiple DTOs when several sources reported readings.
    async fn get_daily_aggregates(
        &self,
        building_id: Uuid,
        device_type: DeviceType,
        metric_type: MetricType,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
    ) -> Result<Vec<DailyAggregateDto>, String> {
        let device_type_str = device_type.to_string();
        let metric_type_str = metric_type.to_string();

        let records = sqlx::query!(
            r#"
            SELECT
                DATE(timestamp) as "day!",
                AVG(value) as avg_value,
                MIN(value) as min_value,
                MAX(value) as max_value,
                SUM(value) as total_value,
                COUNT(*) as "reading_count!",
                source
            FROM iot_readings
            WHERE building_id = $1
              AND device_type::TEXT = $2
              AND metric_type::TEXT = $3
              AND timestamp >= $4
              AND timestamp <= $5
            GROUP BY DATE(timestamp), source
            ORDER BY DATE(timestamp) ASC
            "#,
            building_id,
            device_type_str,
            metric_type_str,
            start_date,
            end_date,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to get daily aggregates: {}", e))?;

        Ok(records
            .into_iter()
            .map(|record| DailyAggregateDto {
                building_id,
                device_type,
                metric_type,
                // SQL DATE -> midnight UTC. 00:00:00 is always a valid time,
                // so the unwrap cannot fail.
                day: record.day.and_hms_opt(0, 0, 0).unwrap().and_utc(),
                avg_value: record.avg_value.unwrap_or(0.0),
                min_value: record.min_value.unwrap_or(0.0),
                max_value: record.max_value.unwrap_or(0.0),
                total_value: record.total_value.unwrap_or(0.0),
                reading_count: record.reading_count,
                source: record.source,
            })
            .collect())
    }

    /// Per-month aggregates for one device + metric, ordered oldest month
    /// first. Same `source` grouping caveat as [`Self::get_daily_aggregates`].
    async fn get_monthly_aggregates(
        &self,
        building_id: Uuid,
        device_type: DeviceType,
        metric_type: MetricType,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
    ) -> Result<Vec<MonthlyAggregateDto>, String> {
        let device_type_str = device_type.to_string();
        let metric_type_str = metric_type.to_string();

        let records = sqlx::query!(
            r#"
            SELECT
                DATE_TRUNC('month', timestamp) as "month!",
                AVG(value) as avg_value,
                MIN(value) as min_value,
                MAX(value) as max_value,
                SUM(value) as total_value,
                COUNT(*) as "reading_count!",
                source
            FROM iot_readings
            WHERE building_id = $1
              AND device_type::TEXT = $2
              AND metric_type::TEXT = $3
              AND timestamp >= $4
              AND timestamp <= $5
            GROUP BY DATE_TRUNC('month', timestamp), source
            ORDER BY DATE_TRUNC('month', timestamp) ASC
            "#,
            building_id,
            device_type_str,
            metric_type_str,
            start_date,
            end_date,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to get monthly aggregates: {}", e))?;

        Ok(records
            .into_iter()
            .map(|record| MonthlyAggregateDto {
                building_id,
                device_type,
                metric_type,
                // DATE_TRUNC('month', ...) keeps the timestamptz type, so no
                // conversion is needed here (unlike the daily DATE column).
                month: record.month,
                avg_value: record.avg_value.unwrap_or(0.0),
                min_value: record.min_value.unwrap_or(0.0),
                max_value: record.max_value.unwrap_or(0.0),
                total_value: record.total_value.unwrap_or(0.0),
                reading_count: record.reading_count,
                source: record.source,
            })
            .collect())
    }

    /// Flags readings that deviate more than `threshold_percentage` percent
    /// from the mean over the last `lookback_days` days.
    ///
    /// Two passes: first compute the window mean, then select readings outside
    /// `[mean - t, mean + t]` where `t = mean * threshold_percentage / 100`.
    /// If the window has no readings the mean defaults to 0.0, but the second
    /// query over the same window is then necessarily empty, so no spurious
    /// anomalies are produced.
    async fn detect_anomalies(
        &self,
        building_id: Uuid,
        metric_type: MetricType,
        threshold_percentage: f64,
        lookback_days: i64,
    ) -> Result<Vec<IoTReading>, String> {
        let metric_type_str = metric_type.to_string();

        let avg_record = sqlx::query!(
            r#"
            SELECT AVG(value) as avg_value
            FROM iot_readings
            WHERE building_id = $1
              AND metric_type::TEXT = $2
              AND timestamp >= NOW() - INTERVAL '1 day' * $3
            "#,
            building_id,
            metric_type_str,
            // Interval multiplication binds as double precision, hence the cast.
            lookback_days as f64,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to calculate average: {}", e))?;

        let avg_value = avg_record.avg_value.unwrap_or(0.0);
        let threshold = avg_value * (threshold_percentage / 100.0);
        let upper_bound = avg_value + threshold;
        // Lower bound clamped at 0 — assumes metric values are non-negative;
        // TODO confirm for metrics that can legitimately go below zero.
        let lower_bound = (avg_value - threshold).max(0.0);

        let records = sqlx::query!(
            r#"
            SELECT id, building_id, device_type::TEXT as device_type, metric_type::TEXT as metric_type, value,
                   unit, timestamp, source, metadata, created_at
            FROM iot_readings
            WHERE building_id = $1
              AND metric_type::TEXT = $2
              AND timestamp >= NOW() - INTERVAL '1 day' * $3
              AND (value > $4 OR value < $5)
            ORDER BY timestamp DESC
            "#,
            building_id,
            metric_type_str,
            lookback_days as f64,
            upper_bound,
            lower_bound,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to detect anomalies: {}", e))?;

        records
            .into_iter()
            .map(|record| {
                Ok(IoTReading {
                    id: record.id,
                    building_id: record.building_id,
                    device_type: record
                        .device_type
                        .ok_or("Device type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid device_type: {}", e))?,
                    metric_type: record
                        .metric_type
                        .ok_or("Metric type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid metric_type: {}", e))?,
                    value: record.value,
                    unit: record.unit,
                    timestamp: record.timestamp,
                    source: record.source,
                    metadata: record.metadata,
                    created_at: record.created_at,
                })
            })
            .collect()
    }

    /// Registers a Linky smart-meter device and returns the stored row.
    ///
    /// The `provider` enum uses the same `TEXT <-> pg enum` round-trip as the
    /// reading enums. Credential fields are stored as given — encryption is
    /// presumably the caller's responsibility (names end in `_encrypted`);
    /// confirm against the service layer.
    async fn create_linky_device(&self, device: &LinkyDevice) -> Result<LinkyDevice, String> {
        let provider_str = device.provider.to_string();

        let record = sqlx::query!(
            r#"
            INSERT INTO linky_devices (
                id, building_id, prm, provider, api_key_encrypted, refresh_token_encrypted,
                token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            )
            VALUES ($1, $2, $3, $4::TEXT::linky_provider, $5, $6, $7, $8, $9, $10, $11)
            RETURNING id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                      token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            "#,
            device.id,
            device.building_id,
            device.prm,
            provider_str,
            device.api_key_encrypted,
            device.refresh_token_encrypted,
            device.token_expires_at,
            device.sync_enabled,
            device.last_sync_at,
            device.created_at,
            device.updated_at,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to create Linky device: {}", e))?;

        Ok(LinkyDevice {
            id: record.id,
            building_id: record.building_id,
            prm: record.prm,
            provider: record
                .provider
                .ok_or("Provider is required")?
                .parse()
                .map_err(|e| format!("Invalid provider: {}", e))?,
            api_key_encrypted: record.api_key_encrypted,
            refresh_token_encrypted: record.refresh_token_encrypted,
            token_expires_at: record.token_expires_at,
            sync_enabled: record.sync_enabled,
            last_sync_at: record.last_sync_at,
            created_at: record.created_at,
            updated_at: record.updated_at,
        })
    }

    /// Looks up a Linky device by its primary key. `Ok(None)` when absent.
    async fn find_linky_device_by_id(
        &self,
        device_id: Uuid,
    ) -> Result<Option<LinkyDevice>, String> {
        let record = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE id = $1
            "#,
            device_id,
        )
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to find Linky device: {}", e))?;

        // Option<row> -> Option<LinkyDevice>, surfacing any parse failure.
        record
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .transpose()
    }

    /// Finds the Linky device attached to a building. `Ok(None)` when absent.
    ///
    /// NOTE(review): `fetch_optional` returns at most one row, which assumes
    /// one device per building — confirm a uniqueness constraint exists on
    /// `building_id`.
    async fn find_linky_device_by_building(
        &self,
        building_id: Uuid,
    ) -> Result<Option<LinkyDevice>, String> {
        let record = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE building_id = $1
            "#,
            building_id,
        )
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to find Linky device by building: {}", e))?;

        record
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .transpose()
    }

    /// Finds a Linky device by its PRM (meter point reference) and provider
    /// name. `Ok(None)` when no such device is registered.
    async fn find_linky_device_by_prm(
        &self,
        prm: &str,
        provider: &str,
    ) -> Result<Option<LinkyDevice>, String> {
        let record = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE prm = $1 AND provider::TEXT = $2
            "#,
            prm,
            provider,
        )
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to find Linky device by PRM: {}", e))?;

        record
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .transpose()
    }

    /// Overwrites every mutable column of a Linky device (matched by `id`)
    /// and returns the updated row.
    ///
    /// `fetch_one` turns a missing id into an error ("no rows returned")
    /// rather than a silent no-op.
    async fn update_linky_device(&self, device: &LinkyDevice) -> Result<LinkyDevice, String> {
        let provider_str = device.provider.to_string();

        let record = sqlx::query!(
            r#"
            UPDATE linky_devices
            SET building_id = $2,
                prm = $3,
                provider = $4::TEXT::linky_provider,
                api_key_encrypted = $5,
                refresh_token_encrypted = $6,
                token_expires_at = $7,
                sync_enabled = $8,
                last_sync_at = $9,
                updated_at = $10
            WHERE id = $1
            RETURNING id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                      token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            "#,
            device.id,
            device.building_id,
            device.prm,
            provider_str,
            device.api_key_encrypted,
            device.refresh_token_encrypted,
            device.token_expires_at,
            device.sync_enabled,
            device.last_sync_at,
            device.updated_at,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to update Linky device: {}", e))?;

        Ok(LinkyDevice {
            id: record.id,
            building_id: record.building_id,
            prm: record.prm,
            provider: record
                .provider
                .ok_or("Provider is required")?
                .parse()
                .map_err(|e| format!("Invalid provider: {}", e))?,
            api_key_encrypted: record.api_key_encrypted,
            refresh_token_encrypted: record.refresh_token_encrypted,
            token_expires_at: record.token_expires_at,
            sync_enabled: record.sync_enabled,
            last_sync_at: record.last_sync_at,
            created_at: record.created_at,
            updated_at: record.updated_at,
        })
    }

    /// Deletes a Linky device by id. Succeeds even when the id does not
    /// exist (the DELETE simply affects zero rows).
    async fn delete_linky_device(&self, device_id: Uuid) -> Result<(), String> {
        sqlx::query!(
            r#"
            DELETE FROM linky_devices
            WHERE id = $1
            "#,
            device_id,
        )
        .execute(&self.pool)
        .await
        .map_err(|e| format!("Failed to delete Linky device: {}", e))?;

        Ok(())
    }

    /// Lists devices due for a data sync: sync enabled AND (never synced OR
    /// last synced more than one day ago). Never-synced devices sort first.
    async fn find_devices_needing_sync(&self) -> Result<Vec<LinkyDevice>, String> {
        let records = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE sync_enabled = true
              AND (last_sync_at IS NULL OR last_sync_at < NOW() - INTERVAL '1 day')
            ORDER BY last_sync_at ASC NULLS FIRST
            "#,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to find devices needing sync: {}", e))?;

        records
            .into_iter()
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .collect()
    }

    /// Lists devices whose API token has already expired, oldest expiry
    /// first, so callers can refresh the most overdue tokens first.
    async fn find_devices_with_expired_tokens(&self) -> Result<Vec<LinkyDevice>, String> {
        let records = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE token_expires_at < NOW()
            ORDER BY token_expires_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to find devices with expired tokens: {}", e))?;

        records
            .into_iter()
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .collect()
    }
}