1use crate::application::dto::{ConsumptionStatsDto, DailyAggregateDto, MonthlyAggregateDto};
2use crate::application::ports::IoTRepository;
3use crate::domain::entities::{DeviceType, IoTReading, LinkyDevice, MetricType};
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use sqlx::PgPool;
7use uuid::Uuid;
8
/// PostgreSQL-backed implementation of the `IoTRepository` port.
///
/// All queries go through the wrapped `sqlx` connection pool.
pub struct PostgresIoTRepository {
    // Shared handle to the Postgres connection pool used by every query.
    pool: PgPool,
}
13
14impl PostgresIoTRepository {
15 pub fn new(pool: PgPool) -> Self {
16 Self { pool }
17 }
18}
19
20#[async_trait]
21impl IoTRepository for PostgresIoTRepository {
    /// Inserts one IoT reading and returns the row exactly as persisted.
    ///
    /// `device_type` and `metric_type` are bound as TEXT and cast to their
    /// Postgres enum types on INSERT; the RETURNING clause selects them back
    /// as TEXT so they can be re-parsed into the Rust domain enums.
    ///
    /// # Errors
    /// Returns a descriptive `String` when the INSERT fails or when a
    /// returned enum value cannot be parsed.
    async fn create_reading(&self, reading: &IoTReading) -> Result<IoTReading, String> {
        let record = sqlx::query!(
            r#"
            INSERT INTO iot_readings (
                id, building_id, device_type, metric_type, value,
                unit, timestamp, source, metadata, created_at
            )
            VALUES ($1, $2, $3::TEXT::device_type, $4::TEXT::metric_type, $5, $6, $7, $8, $9, $10)
            RETURNING id, building_id, device_type::text AS "device_type!", metric_type::text AS "metric_type!", value,
                      unit, timestamp, source, metadata, created_at
            "#,
            reading.id,
            reading.building_id,
            reading.device_type.to_string(),
            reading.metric_type.to_string(),
            reading.value,
            reading.unit,
            reading.timestamp,
            reading.source,
            reading.metadata,
            reading.created_at,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to create IoT reading: {}", e))?;

        // Rebuild the domain entity from the returned row; the TEXT-cast enum
        // columns are parsed back into their Rust enum types via FromStr.
        Ok(IoTReading {
            id: record.id,
            building_id: record.building_id,
            device_type: record
                .device_type
                .parse()
                .map_err(|e| format!("Invalid device_type: {}", e))?,
            metric_type: record
                .metric_type
                .parse()
                .map_err(|e| format!("Invalid metric_type: {}", e))?,
            value: record.value,
            unit: record.unit,
            timestamp: record.timestamp,
            source: record.source,
            metadata: record.metadata,
            created_at: record.created_at,
        })
    }
68
69 async fn create_readings_bulk(&self, readings: &[IoTReading]) -> Result<usize, String> {
70 if readings.is_empty() {
71 return Ok(0);
72 }
73
74 let mut tx = self
75 .pool
76 .begin()
77 .await
78 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
79
80 let mut count = 0;
81
82 for reading in readings {
83 sqlx::query!(
84 r#"
85 INSERT INTO iot_readings (
86 id, building_id, device_type, metric_type, value,
87 unit, timestamp, source, metadata, created_at
88 )
89 VALUES ($1, $2, $3::TEXT::device_type, $4::TEXT::metric_type, $5, $6, $7, $8, $9, $10)
90 "#,
91 reading.id,
92 reading.building_id,
93 reading.device_type.to_string(),
94 reading.metric_type.to_string(),
95 reading.value,
96 reading.unit,
97 reading.timestamp,
98 reading.source,
99 reading.metadata,
100 reading.created_at,
101 )
102 .execute(&mut *tx)
103 .await
104 .map_err(|e| format!("Failed to insert reading: {}", e))?;
105
106 count += 1;
107 }
108
109 tx.commit()
110 .await
111 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
112
113 Ok(count)
114 }
115
    /// Fetches readings for a building within `[start_date, end_date]`,
    /// optionally filtered by device and metric type, newest first.
    ///
    /// `limit` defaults to 1000 and is capped at 10000 to bound result size.
    /// The optional filters are bound as NULL-able TEXT parameters so one
    /// prepared statement covers every filter combination.
    ///
    /// # Errors
    /// Returns a `String` when the query fails or a stored enum value cannot
    /// be parsed back into the domain enums.
    async fn find_readings_by_building(
        &self,
        building_id: Uuid,
        device_type: Option<DeviceType>,
        metric_type: Option<MetricType>,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
        limit: Option<usize>,
    ) -> Result<Vec<IoTReading>, String> {
        let limit = limit.unwrap_or(1000).min(10000) as i64;
        let device_type_str = device_type.as_ref().map(|dt| dt.to_string());
        let metric_type_str = metric_type.as_ref().map(|mt| mt.to_string());

        let records = sqlx::query!(
            r#"
            SELECT id, building_id, device_type::TEXT as device_type, metric_type::TEXT as metric_type, value,
                   unit, timestamp, source, metadata, created_at
            FROM iot_readings
            WHERE building_id = $1
            AND timestamp >= $2
            AND timestamp <= $3
            AND ($4::TEXT IS NULL OR device_type::TEXT = $4)
            AND ($5::TEXT IS NULL OR metric_type::TEXT = $5)
            ORDER BY timestamp DESC
            LIMIT $6
            "#,
            building_id,
            start_date,
            end_date,
            device_type_str,
            metric_type_str,
            limit,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to query IoT readings: {}", e))?;

        // Map rows to domain entities; collect() into Result short-circuits on
        // the first row whose enum columns fail to parse.
        records
            .into_iter()
            .map(|record| {
                Ok(IoTReading {
                    id: record.id,
                    building_id: record.building_id,
                    device_type: record
                        .device_type
                        .ok_or("Device type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid device_type: {}", e))?,
                    metric_type: record
                        .metric_type
                        .ok_or("Metric type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid metric_type: {}", e))?,
                    value: record.value,
                    unit: record.unit,
                    timestamp: record.timestamp,
                    source: record.source,
                    metadata: record.metadata,
                    created_at: record.created_at,
                })
            })
            .collect()
    }
180
    /// Aggregates consumption statistics (count/sum/avg/min/max) for one
    /// metric of a building over a time window.
    ///
    /// NOTE(review): the query groups by `(unit, source)` but `fetch_one` is
    /// used, so if readings span more than one unit/source combination only
    /// the first group is returned; and if no readings match at all, the call
    /// fails with a row-not-found error instead of returning zeroed stats.
    /// Confirm both behaviors are intended by callers.
    ///
    /// # Errors
    /// Returns a `String` when the query fails (including the empty-window
    /// case described above).
    async fn get_consumption_stats(
        &self,
        building_id: Uuid,
        metric_type: MetricType,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
    ) -> Result<ConsumptionStatsDto, String> {
        let metric_type_str = metric_type.to_string();

        let record = sqlx::query!(
            r#"
            SELECT
                COUNT(*) as "reading_count!",
                SUM(value) as total_consumption,
                AVG(value) as average_daily,
                MIN(value) as min_value,
                MAX(value) as max_value,
                unit,
                source
            FROM iot_readings
            WHERE building_id = $1
            AND metric_type::TEXT = $2
            AND timestamp >= $3
            AND timestamp <= $4
            GROUP BY unit, source
            "#,
            building_id,
            metric_type_str,
            start_date,
            end_date,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to get consumption stats: {}", e))?;

        // SQL aggregates over zero rows are NULL; default them to 0.0.
        Ok(ConsumptionStatsDto {
            building_id,
            metric_type,
            period_start: start_date,
            period_end: end_date,
            total_consumption: record.total_consumption.unwrap_or(0.0),
            average_daily: record.average_daily.unwrap_or(0.0),
            min_value: record.min_value.unwrap_or(0.0),
            max_value: record.max_value.unwrap_or(0.0),
            reading_count: record.reading_count,
            unit: record.unit,
            source: record.source,
        })
    }
230
    /// Returns per-day aggregates for a (building, device, metric) triple
    /// over a time window, oldest day first.
    ///
    /// The query additionally groups by `source`, so a day with readings from
    /// several sources yields one row per (day, source) pair.
    ///
    /// # Errors
    /// Returns a `String` when the query fails.
    async fn get_daily_aggregates(
        &self,
        building_id: Uuid,
        device_type: DeviceType,
        metric_type: MetricType,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
    ) -> Result<Vec<DailyAggregateDto>, String> {
        let device_type_str = device_type.to_string();
        let metric_type_str = metric_type.to_string();

        let records = sqlx::query!(
            r#"
            SELECT
                DATE(timestamp) as "day!",
                AVG(value) as avg_value,
                MIN(value) as min_value,
                MAX(value) as max_value,
                SUM(value) as total_value,
                COUNT(*) as "reading_count!",
                source
            FROM iot_readings
            WHERE building_id = $1
            AND device_type::TEXT = $2
            AND metric_type::TEXT = $3
            AND timestamp >= $4
            AND timestamp <= $5
            GROUP BY DATE(timestamp), source
            ORDER BY DATE(timestamp) ASC
            "#,
            building_id,
            device_type_str,
            metric_type_str,
            start_date,
            end_date,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to get daily aggregates: {}", e))?;

        Ok(records
            .into_iter()
            .map(|record| DailyAggregateDto {
                building_id,
                device_type,
                metric_type,
                // `day` is a SQL DATE; represent it as midnight UTC. The
                // unwrap is safe: 00:00:00 is always a valid time of day.
                day: record.day.and_hms_opt(0, 0, 0).unwrap().and_utc(),
                // Aggregates over zero rows are NULL; default them to 0.0.
                avg_value: record.avg_value.unwrap_or(0.0),
                min_value: record.min_value.unwrap_or(0.0),
                max_value: record.max_value.unwrap_or(0.0),
                total_value: record.total_value.unwrap_or(0.0),
                reading_count: record.reading_count,
                source: record.source,
            })
            .collect())
    }
287
    /// Returns per-month aggregates for a (building, device, metric) triple
    /// over a time window, oldest month first.
    ///
    /// The query additionally groups by `source`, so a month with readings
    /// from several sources yields one row per (month, source) pair.
    ///
    /// # Errors
    /// Returns a `String` when the query fails.
    async fn get_monthly_aggregates(
        &self,
        building_id: Uuid,
        device_type: DeviceType,
        metric_type: MetricType,
        start_date: DateTime<Utc>,
        end_date: DateTime<Utc>,
    ) -> Result<Vec<MonthlyAggregateDto>, String> {
        let device_type_str = device_type.to_string();
        let metric_type_str = metric_type.to_string();

        let records = sqlx::query!(
            r#"
            SELECT
                DATE_TRUNC('month', timestamp) as "month!",
                AVG(value) as avg_value,
                MIN(value) as min_value,
                MAX(value) as max_value,
                SUM(value) as total_value,
                COUNT(*) as "reading_count!",
                source
            FROM iot_readings
            WHERE building_id = $1
            AND device_type::TEXT = $2
            AND metric_type::TEXT = $3
            AND timestamp >= $4
            AND timestamp <= $5
            GROUP BY DATE_TRUNC('month', timestamp), source
            ORDER BY DATE_TRUNC('month', timestamp) ASC
            "#,
            building_id,
            device_type_str,
            metric_type_str,
            start_date,
            end_date,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to get monthly aggregates: {}", e))?;

        Ok(records
            .into_iter()
            .map(|record| MonthlyAggregateDto {
                building_id,
                device_type,
                metric_type,
                // DATE_TRUNC keeps the timestamp type, so no date conversion
                // is needed here (unlike the daily aggregates).
                month: record.month,
                // Aggregates over zero rows are NULL; default them to 0.0.
                avg_value: record.avg_value.unwrap_or(0.0),
                min_value: record.min_value.unwrap_or(0.0),
                max_value: record.max_value.unwrap_or(0.0),
                total_value: record.total_value.unwrap_or(0.0),
                reading_count: record.reading_count,
                source: record.source,
            })
            .collect())
    }
344
    /// Flags readings whose value deviates from the lookback-window average
    /// by more than `threshold_percentage` percent, newest first.
    ///
    /// Two queries run back to back (not in a transaction): the first
    /// computes the average over the last `lookback_days` days, the second
    /// returns readings outside `[avg - threshold, avg + threshold]`, with
    /// the lower bound clamped at 0. New rows inserted between the two
    /// queries may affect results.
    ///
    /// NOTE(review): when no readings exist, the average defaults to 0.0 and
    /// the band collapses to [0, 0], so any non-zero reading would be flagged;
    /// a negative `threshold_percentage` inverts the bounds. Confirm callers
    /// guard against both cases.
    ///
    /// # Errors
    /// Returns a `String` when either query fails or a stored enum value
    /// cannot be parsed.
    async fn detect_anomalies(
        &self,
        building_id: Uuid,
        metric_type: MetricType,
        threshold_percentage: f64,
        lookback_days: i64,
    ) -> Result<Vec<IoTReading>, String> {
        let metric_type_str = metric_type.to_string();

        let avg_record = sqlx::query!(
            r#"
            SELECT AVG(value) as avg_value
            FROM iot_readings
            WHERE building_id = $1
            AND metric_type::TEXT = $2
            AND timestamp >= NOW() - INTERVAL '1 day' * $3
            "#,
            building_id,
            metric_type_str,
            // Bound as f64 because the interval multiplication expects a
            // double-precision operand.
            lookback_days as f64,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to calculate average: {}", e))?;

        // Derive the acceptance band around the window average.
        let avg_value = avg_record.avg_value.unwrap_or(0.0);
        let threshold = avg_value * (threshold_percentage / 100.0);
        let upper_bound = avg_value + threshold;
        let lower_bound = (avg_value - threshold).max(0.0);

        let records = sqlx::query!(
            r#"
            SELECT id, building_id, device_type::TEXT as device_type, metric_type::TEXT as metric_type, value,
                   unit, timestamp, source, metadata, created_at
            FROM iot_readings
            WHERE building_id = $1
            AND metric_type::TEXT = $2
            AND timestamp >= NOW() - INTERVAL '1 day' * $3
            AND (value > $4 OR value < $5)
            ORDER BY timestamp DESC
            "#,
            building_id,
            metric_type_str,
            lookback_days as f64,
            upper_bound,
            lower_bound,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to detect anomalies: {}", e))?;

        // Map rows to domain entities; collect() short-circuits on the first
        // row whose enum columns fail to parse.
        records
            .into_iter()
            .map(|record| {
                Ok(IoTReading {
                    id: record.id,
                    building_id: record.building_id,
                    device_type: record
                        .device_type
                        .ok_or("Device type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid device_type: {}", e))?,
                    metric_type: record
                        .metric_type
                        .ok_or("Metric type is required")?
                        .parse()
                        .map_err(|e| format!("Invalid metric_type: {}", e))?,
                    value: record.value,
                    unit: record.unit,
                    timestamp: record.timestamp,
                    source: record.source,
                    metadata: record.metadata,
                    created_at: record.created_at,
                })
            })
            .collect()
    }
425
    /// Inserts a Linky (smart meter) device and returns the row as persisted.
    ///
    /// `provider` is bound as TEXT and cast to the `linky_provider` Postgres
    /// enum on INSERT; the RETURNING clause selects it back as TEXT so it can
    /// be re-parsed into the domain enum.
    ///
    /// # Errors
    /// Returns a `String` when the INSERT fails or the returned provider
    /// value cannot be parsed.
    async fn create_linky_device(&self, device: &LinkyDevice) -> Result<LinkyDevice, String> {
        let provider_str = device.provider.to_string();

        let record = sqlx::query!(
            r#"
            INSERT INTO linky_devices (
                id, building_id, prm, provider, api_key_encrypted, refresh_token_encrypted,
                token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            )
            VALUES ($1, $2, $3, $4::TEXT::linky_provider, $5, $6, $7, $8, $9, $10, $11)
            RETURNING id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                      token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            "#,
            device.id,
            device.building_id,
            device.prm,
            provider_str,
            device.api_key_encrypted,
            device.refresh_token_encrypted,
            device.token_expires_at,
            device.sync_enabled,
            device.last_sync_at,
            device.created_at,
            device.updated_at,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to create Linky device: {}", e))?;

        // Rebuild the domain entity from the returned row.
        Ok(LinkyDevice {
            id: record.id,
            building_id: record.building_id,
            prm: record.prm,
            provider: record
                .provider
                .ok_or("Provider is required")?
                .parse()
                .map_err(|e| format!("Invalid provider: {}", e))?,
            api_key_encrypted: record.api_key_encrypted,
            refresh_token_encrypted: record.refresh_token_encrypted,
            token_expires_at: record.token_expires_at,
            sync_enabled: record.sync_enabled,
            last_sync_at: record.last_sync_at,
            created_at: record.created_at,
            updated_at: record.updated_at,
        })
    }
473
474 async fn find_linky_device_by_id(
475 &self,
476 device_id: Uuid,
477 ) -> Result<Option<LinkyDevice>, String> {
478 let record = sqlx::query!(
479 r#"
480 SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
481 token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
482 FROM linky_devices
483 WHERE id = $1
484 "#,
485 device_id,
486 )
487 .fetch_optional(&self.pool)
488 .await
489 .map_err(|e| format!("Failed to find Linky device: {}", e))?;
490
491 record
492 .map(|r| -> Result<LinkyDevice, String> {
493 Ok(LinkyDevice {
494 id: r.id,
495 building_id: r.building_id,
496 prm: r.prm,
497 provider: r
498 .provider
499 .ok_or("Provider is required")?
500 .parse()
501 .map_err(|e| format!("Invalid provider: {}", e))?,
502 api_key_encrypted: r.api_key_encrypted,
503 refresh_token_encrypted: r.refresh_token_encrypted,
504 token_expires_at: r.token_expires_at,
505 sync_enabled: r.sync_enabled,
506 last_sync_at: r.last_sync_at,
507 created_at: r.created_at,
508 updated_at: r.updated_at,
509 })
510 })
511 .transpose()
512 }
513
    /// Looks up the Linky device attached to a building.
    ///
    /// Returns `Ok(None)` when the building has no device. The query has no
    /// LIMIT; `fetch_optional` returns the first row if several exist.
    ///
    /// # Errors
    /// Returns a `String` when the query fails or when the stored provider
    /// value cannot be parsed into the domain enum.
    async fn find_linky_device_by_building(
        &self,
        building_id: Uuid,
    ) -> Result<Option<LinkyDevice>, String> {
        let record = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE building_id = $1
            "#,
            building_id,
        )
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to find Linky device by building: {}", e))?;

        // map + transpose turns Option<Result<…>> into Result<Option<…>>.
        record
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .transpose()
    }
553
    /// Looks up a Linky device by its PRM (meter point identifier) and
    /// provider name.
    ///
    /// Returns `Ok(None)` when no row matches. `provider` is compared as
    /// TEXT against the `linky_provider` enum column.
    ///
    /// # Errors
    /// Returns a `String` when the query fails or when the stored provider
    /// value cannot be parsed into the domain enum.
    async fn find_linky_device_by_prm(
        &self,
        prm: &str,
        provider: &str,
    ) -> Result<Option<LinkyDevice>, String> {
        let record = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE prm = $1 AND provider::TEXT = $2
            "#,
            prm,
            provider,
        )
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| format!("Failed to find Linky device by PRM: {}", e))?;

        // map + transpose turns Option<Result<…>> into Result<Option<…>>.
        record
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .transpose()
    }
595
    /// Updates every mutable column of a Linky device row (matched by id)
    /// and returns the row as persisted.
    ///
    /// NOTE(review): `fetch_one` means a non-existent id surfaces as a
    /// row-not-found error string rather than a dedicated "not found" result;
    /// confirm callers expect that.
    ///
    /// # Errors
    /// Returns a `String` when the UPDATE fails (including no matching row)
    /// or the returned provider value cannot be parsed.
    async fn update_linky_device(&self, device: &LinkyDevice) -> Result<LinkyDevice, String> {
        let provider_str = device.provider.to_string();

        let record = sqlx::query!(
            r#"
            UPDATE linky_devices
            SET building_id = $2,
                prm = $3,
                provider = $4::TEXT::linky_provider,
                api_key_encrypted = $5,
                refresh_token_encrypted = $6,
                token_expires_at = $7,
                sync_enabled = $8,
                last_sync_at = $9,
                updated_at = $10
            WHERE id = $1
            RETURNING id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                      token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            "#,
            device.id,
            device.building_id,
            device.prm,
            provider_str,
            device.api_key_encrypted,
            device.refresh_token_encrypted,
            device.token_expires_at,
            device.sync_enabled,
            device.last_sync_at,
            device.updated_at,
        )
        .fetch_one(&self.pool)
        .await
        .map_err(|e| format!("Failed to update Linky device: {}", e))?;

        // Rebuild the domain entity from the returned row.
        Ok(LinkyDevice {
            id: record.id,
            building_id: record.building_id,
            prm: record.prm,
            provider: record
                .provider
                .ok_or("Provider is required")?
                .parse()
                .map_err(|e| format!("Invalid provider: {}", e))?,
            api_key_encrypted: record.api_key_encrypted,
            refresh_token_encrypted: record.refresh_token_encrypted,
            token_expires_at: record.token_expires_at,
            sync_enabled: record.sync_enabled,
            last_sync_at: record.last_sync_at,
            created_at: record.created_at,
            updated_at: record.updated_at,
        })
    }
648
649 async fn delete_linky_device(&self, device_id: Uuid) -> Result<(), String> {
650 sqlx::query!(
651 r#"
652 DELETE FROM linky_devices
653 WHERE id = $1
654 "#,
655 device_id,
656 )
657 .execute(&self.pool)
658 .await
659 .map_err(|e| format!("Failed to delete Linky device: {}", e))?;
660
661 Ok(())
662 }
663
664 async fn find_devices_needing_sync(&self) -> Result<Vec<LinkyDevice>, String> {
665 let records = sqlx::query!(
666 r#"
667 SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
668 token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
669 FROM linky_devices
670 WHERE sync_enabled = true
671 AND (last_sync_at IS NULL OR last_sync_at < NOW() - INTERVAL '1 day')
672 ORDER BY last_sync_at ASC NULLS FIRST
673 "#,
674 )
675 .fetch_all(&self.pool)
676 .await
677 .map_err(|e| format!("Failed to find devices needing sync: {}", e))?;
678
679 records
680 .into_iter()
681 .map(|r| -> Result<LinkyDevice, String> {
682 Ok(LinkyDevice {
683 id: r.id,
684 building_id: r.building_id,
685 prm: r.prm,
686 provider: r
687 .provider
688 .ok_or("Provider is required")?
689 .parse()
690 .map_err(|e| format!("Invalid provider: {}", e))?,
691 api_key_encrypted: r.api_key_encrypted,
692 refresh_token_encrypted: r.refresh_token_encrypted,
693 token_expires_at: r.token_expires_at,
694 sync_enabled: r.sync_enabled,
695 last_sync_at: r.last_sync_at,
696 created_at: r.created_at,
697 updated_at: r.updated_at,
698 })
699 })
700 .collect()
701 }
702
    /// Returns every device whose OAuth token has already expired, the
    /// longest-expired first.
    ///
    /// Rows with a NULL `token_expires_at` are excluded by the `< NOW()`
    /// comparison.
    ///
    /// # Errors
    /// Returns a `String` when the query fails or when a stored provider
    /// value cannot be parsed into the domain enum.
    async fn find_devices_with_expired_tokens(&self) -> Result<Vec<LinkyDevice>, String> {
        let records = sqlx::query!(
            r#"
            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
            FROM linky_devices
            WHERE token_expires_at < NOW()
            ORDER BY token_expires_at ASC
            "#,
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| format!("Failed to find devices with expired tokens: {}", e))?;

        // Map rows to domain entities; collect() into Result short-circuits
        // on the first row whose provider column fails to parse.
        records
            .into_iter()
            .map(|r| -> Result<LinkyDevice, String> {
                Ok(LinkyDevice {
                    id: r.id,
                    building_id: r.building_id,
                    prm: r.prm,
                    provider: r
                        .provider
                        .ok_or("Provider is required")?
                        .parse()
                        .map_err(|e| format!("Invalid provider: {}", e))?,
                    api_key_encrypted: r.api_key_encrypted,
                    refresh_token_encrypted: r.refresh_token_encrypted,
                    token_expires_at: r.token_expires_at,
                    sync_enabled: r.sync_enabled,
                    last_sync_at: r.last_sync_at,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                })
            })
            .collect()
    }
740}