koprogo_api/infrastructure/database/repositories/
iot_repository_impl.rs

1use crate::application::dto::{ConsumptionStatsDto, DailyAggregateDto, MonthlyAggregateDto};
2use crate::application::ports::IoTRepository;
3use crate::domain::entities::{DeviceType, IoTReading, LinkyDevice, MetricType};
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use sqlx::PgPool;
7use uuid::Uuid;
8
9/// PostgreSQL implementation of IoT repository
10pub struct PostgresIoTRepository {
11    pool: PgPool,
12}
13
14impl PostgresIoTRepository {
15    pub fn new(pool: PgPool) -> Self {
16        Self { pool }
17    }
18}
19
20#[async_trait]
21impl IoTRepository for PostgresIoTRepository {
22    async fn create_reading(&self, reading: &IoTReading) -> Result<IoTReading, String> {
23        let record = sqlx::query!(
24            r#"
25            INSERT INTO iot_readings (
26                id, building_id, device_type, metric_type, value,
27                unit, timestamp, source, metadata, created_at
28            )
29            VALUES ($1, $2, $3::TEXT::device_type, $4::TEXT::metric_type, $5, $6, $7, $8, $9, $10)
30            RETURNING id, building_id, device_type::text AS "device_type!", metric_type::text AS "metric_type!", value,
31                      unit, timestamp, source, metadata, created_at
32            "#,
33            reading.id,
34            reading.building_id,
35            reading.device_type.to_string(),
36            reading.metric_type.to_string(),
37            reading.value,
38            reading.unit,
39            reading.timestamp,
40            reading.source,
41            reading.metadata,
42            reading.created_at,
43        )
44        .fetch_one(&self.pool)
45        .await
46        .map_err(|e| format!("Failed to create IoT reading: {}", e))?;
47
48        Ok(IoTReading {
49            id: record.id,
50            building_id: record.building_id,
51            device_type: record
52                .device_type
53                .parse()
54                .map_err(|e| format!("Invalid device_type: {}", e))?,
55            metric_type: record
56                .metric_type
57                .parse()
58                .map_err(|e| format!("Invalid metric_type: {}", e))?,
59            value: record.value,
60            // value is a method, not a field
61            unit: record.unit,
62            timestamp: record.timestamp,
63            source: record.source,
64            metadata: record.metadata,
65            created_at: record.created_at,
66        })
67    }
68
69    async fn create_readings_bulk(&self, readings: &[IoTReading]) -> Result<usize, String> {
70        if readings.is_empty() {
71            return Ok(0);
72        }
73
74        let mut tx = self
75            .pool
76            .begin()
77            .await
78            .map_err(|e| format!("Failed to begin transaction: {}", e))?;
79
80        let mut count = 0;
81
82        for reading in readings {
83            sqlx::query!(
84                r#"
85                INSERT INTO iot_readings (
86                    id, building_id, device_type, metric_type, value,
87                    unit, timestamp, source, metadata, created_at
88                )
89                VALUES ($1, $2, $3::TEXT::device_type, $4::TEXT::metric_type, $5, $6, $7, $8, $9, $10)
90                "#,
91                reading.id,
92                reading.building_id,
93                reading.device_type.to_string(),
94                reading.metric_type.to_string(),
95                reading.value,
96                reading.unit,
97                reading.timestamp,
98                reading.source,
99                reading.metadata,
100                reading.created_at,
101            )
102            .execute(&mut *tx)
103            .await
104            .map_err(|e| format!("Failed to insert reading: {}", e))?;
105
106            count += 1;
107        }
108
109        tx.commit()
110            .await
111            .map_err(|e| format!("Failed to commit transaction: {}", e))?;
112
113        Ok(count)
114    }
115
116    async fn find_readings_by_building(
117        &self,
118        building_id: Uuid,
119        device_type: Option<DeviceType>,
120        metric_type: Option<MetricType>,
121        start_date: DateTime<Utc>,
122        end_date: DateTime<Utc>,
123        limit: Option<usize>,
124    ) -> Result<Vec<IoTReading>, String> {
125        let limit = limit.unwrap_or(1000).min(10000) as i64;
126        let device_type_str = device_type.as_ref().map(|dt| dt.to_string());
127        let metric_type_str = metric_type.as_ref().map(|mt| mt.to_string());
128
129        let records = sqlx::query!(
130            r#"
131            SELECT id, building_id, device_type::TEXT as device_type, metric_type::TEXT as metric_type, value,
132                   unit, timestamp, source, metadata, created_at
133            FROM iot_readings
134            WHERE building_id = $1
135              AND timestamp >= $2
136              AND timestamp <= $3
137              AND ($4::TEXT IS NULL OR device_type::TEXT = $4)
138              AND ($5::TEXT IS NULL OR metric_type::TEXT = $5)
139            ORDER BY timestamp DESC
140            LIMIT $6
141            "#,
142            building_id,
143            start_date,
144            end_date,
145            device_type_str,
146            metric_type_str,
147            limit,
148        )
149        .fetch_all(&self.pool)
150        .await
151        .map_err(|e| format!("Failed to query IoT readings: {}", e))?;
152
153        records
154            .into_iter()
155            .map(|record| {
156                Ok(IoTReading {
157                    id: record.id,
158                    building_id: record.building_id,
159                    device_type: record
160                        .device_type
161                        .ok_or("Device type is required")?
162                        .parse()
163                        .map_err(|e| format!("Invalid device_type: {}", e))?,
164                    metric_type: record
165                        .metric_type
166                        .ok_or("Metric type is required")?
167                        .parse()
168                        .map_err(|e| format!("Invalid metric_type: {}", e))?,
169                    value: record.value,
170                    // value is a method, not a field
171                    unit: record.unit,
172                    timestamp: record.timestamp,
173                    source: record.source,
174                    metadata: record.metadata,
175                    created_at: record.created_at,
176                })
177            })
178            .collect()
179    }
180
181    async fn get_consumption_stats(
182        &self,
183        building_id: Uuid,
184        metric_type: MetricType,
185        start_date: DateTime<Utc>,
186        end_date: DateTime<Utc>,
187    ) -> Result<ConsumptionStatsDto, String> {
188        let metric_type_str = metric_type.to_string();
189
190        let record = sqlx::query!(
191            r#"
192            SELECT
193                COUNT(*) as "reading_count!",
194                SUM(value) as total_consumption,
195                AVG(value) as average_daily,
196                MIN(value) as min_value,
197                MAX(value) as max_value,
198                unit,
199                source
200            FROM iot_readings
201            WHERE building_id = $1
202              AND metric_type::TEXT = $2
203              AND timestamp >= $3
204              AND timestamp <= $4
205            GROUP BY unit, source
206            "#,
207            building_id,
208            metric_type_str,
209            start_date,
210            end_date,
211        )
212        .fetch_one(&self.pool)
213        .await
214        .map_err(|e| format!("Failed to get consumption stats: {}", e))?;
215
216        Ok(ConsumptionStatsDto {
217            building_id,
218            metric_type,
219            period_start: start_date,
220            period_end: end_date,
221            total_consumption: record.total_consumption.unwrap_or(0.0),
222            average_daily: record.average_daily.unwrap_or(0.0),
223            min_value: record.min_value.unwrap_or(0.0),
224            max_value: record.max_value.unwrap_or(0.0),
225            reading_count: record.reading_count,
226            unit: record.unit,
227            source: record.source,
228        })
229    }
230
231    async fn get_daily_aggregates(
232        &self,
233        building_id: Uuid,
234        device_type: DeviceType,
235        metric_type: MetricType,
236        start_date: DateTime<Utc>,
237        end_date: DateTime<Utc>,
238    ) -> Result<Vec<DailyAggregateDto>, String> {
239        let device_type_str = device_type.to_string();
240        let metric_type_str = metric_type.to_string();
241
242        let records = sqlx::query!(
243            r#"
244            SELECT
245                DATE(timestamp) as "day!",
246                AVG(value) as avg_value,
247                MIN(value) as min_value,
248                MAX(value) as max_value,
249                SUM(value) as total_value,
250                COUNT(*) as "reading_count!",
251                source
252            FROM iot_readings
253            WHERE building_id = $1
254              AND device_type::TEXT = $2
255              AND metric_type::TEXT = $3
256              AND timestamp >= $4
257              AND timestamp <= $5
258            GROUP BY DATE(timestamp), source
259            ORDER BY DATE(timestamp) ASC
260            "#,
261            building_id,
262            device_type_str,
263            metric_type_str,
264            start_date,
265            end_date,
266        )
267        .fetch_all(&self.pool)
268        .await
269        .map_err(|e| format!("Failed to get daily aggregates: {}", e))?;
270
271        Ok(records
272            .into_iter()
273            .map(|record| DailyAggregateDto {
274                building_id,
275                device_type,
276                metric_type,
277                day: record.day.and_hms_opt(0, 0, 0).unwrap().and_utc(),
278                avg_value: record.avg_value.unwrap_or(0.0),
279                min_value: record.min_value.unwrap_or(0.0),
280                max_value: record.max_value.unwrap_or(0.0),
281                total_value: record.total_value.unwrap_or(0.0),
282                reading_count: record.reading_count,
283                source: record.source,
284            })
285            .collect())
286    }
287
288    async fn get_monthly_aggregates(
289        &self,
290        building_id: Uuid,
291        device_type: DeviceType,
292        metric_type: MetricType,
293        start_date: DateTime<Utc>,
294        end_date: DateTime<Utc>,
295    ) -> Result<Vec<MonthlyAggregateDto>, String> {
296        let device_type_str = device_type.to_string();
297        let metric_type_str = metric_type.to_string();
298
299        let records = sqlx::query!(
300            r#"
301            SELECT
302                DATE_TRUNC('month', timestamp) as "month!",
303                AVG(value) as avg_value,
304                MIN(value) as min_value,
305                MAX(value) as max_value,
306                SUM(value) as total_value,
307                COUNT(*) as "reading_count!",
308                source
309            FROM iot_readings
310            WHERE building_id = $1
311              AND device_type::TEXT = $2
312              AND metric_type::TEXT = $3
313              AND timestamp >= $4
314              AND timestamp <= $5
315            GROUP BY DATE_TRUNC('month', timestamp), source
316            ORDER BY DATE_TRUNC('month', timestamp) ASC
317            "#,
318            building_id,
319            device_type_str,
320            metric_type_str,
321            start_date,
322            end_date,
323        )
324        .fetch_all(&self.pool)
325        .await
326        .map_err(|e| format!("Failed to get monthly aggregates: {}", e))?;
327
328        Ok(records
329            .into_iter()
330            .map(|record| MonthlyAggregateDto {
331                building_id,
332                device_type,
333                metric_type,
334                month: record.month,
335                avg_value: record.avg_value.unwrap_or(0.0),
336                min_value: record.min_value.unwrap_or(0.0),
337                max_value: record.max_value.unwrap_or(0.0),
338                total_value: record.total_value.unwrap_or(0.0),
339                reading_count: record.reading_count,
340                source: record.source,
341            })
342            .collect())
343    }
344
345    async fn detect_anomalies(
346        &self,
347        building_id: Uuid,
348        metric_type: MetricType,
349        threshold_percentage: f64,
350        lookback_days: i64,
351    ) -> Result<Vec<IoTReading>, String> {
352        let metric_type_str = metric_type.to_string();
353
354        // Calculate average value for the lookback period
355        let avg_record = sqlx::query!(
356            r#"
357            SELECT AVG(value) as avg_value
358            FROM iot_readings
359            WHERE building_id = $1
360              AND metric_type::TEXT = $2
361              AND timestamp >= NOW() - INTERVAL '1 day' * $3
362            "#,
363            building_id,
364            metric_type_str,
365            lookback_days as f64,
366        )
367        .fetch_one(&self.pool)
368        .await
369        .map_err(|e| format!("Failed to calculate average: {}", e))?;
370
371        let avg_value = avg_record.avg_value.unwrap_or(0.0);
372        let threshold = avg_value * (threshold_percentage / 100.0);
373        let upper_bound = avg_value + threshold;
374        let lower_bound = (avg_value - threshold).max(0.0);
375
376        // Find readings outside the threshold
377        let records = sqlx::query!(
378            r#"
379            SELECT id, building_id, device_type::TEXT as device_type, metric_type::TEXT as metric_type, value,
380                   unit, timestamp, source, metadata, created_at
381            FROM iot_readings
382            WHERE building_id = $1
383              AND metric_type::TEXT = $2
384              AND timestamp >= NOW() - INTERVAL '1 day' * $3
385              AND (value > $4 OR value < $5)
386            ORDER BY timestamp DESC
387            "#,
388            building_id,
389            metric_type_str,
390            lookback_days as f64,
391            upper_bound,
392            lower_bound,
393        )
394        .fetch_all(&self.pool)
395        .await
396        .map_err(|e| format!("Failed to detect anomalies: {}", e))?;
397
398        records
399            .into_iter()
400            .map(|record| {
401                Ok(IoTReading {
402                    id: record.id,
403                    building_id: record.building_id,
404                    device_type: record
405                        .device_type
406                        .ok_or("Device type is required")?
407                        .parse()
408                        .map_err(|e| format!("Invalid device_type: {}", e))?,
409                    metric_type: record
410                        .metric_type
411                        .ok_or("Metric type is required")?
412                        .parse()
413                        .map_err(|e| format!("Invalid metric_type: {}", e))?,
414                    value: record.value,
415                    // value is a method, not a field
416                    unit: record.unit,
417                    timestamp: record.timestamp,
418                    source: record.source,
419                    metadata: record.metadata,
420                    created_at: record.created_at,
421                })
422            })
423            .collect()
424    }
425
426    async fn create_linky_device(&self, device: &LinkyDevice) -> Result<LinkyDevice, String> {
427        let provider_str = device.provider.to_string();
428
429        let record = sqlx::query!(
430            r#"
431            INSERT INTO linky_devices (
432                id, building_id, prm, provider, api_key_encrypted, refresh_token_encrypted,
433                token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
434            )
435            VALUES ($1, $2, $3, $4::TEXT::linky_provider, $5, $6, $7, $8, $9, $10, $11)
436            RETURNING id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
437                      token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
438            "#,
439            device.id,
440            device.building_id,
441            device.prm,
442            provider_str,
443            device.api_key_encrypted,
444            device.refresh_token_encrypted,
445            device.token_expires_at,
446            device.sync_enabled,
447            device.last_sync_at,
448            device.created_at,
449            device.updated_at,
450        )
451        .fetch_one(&self.pool)
452        .await
453        .map_err(|e| format!("Failed to create Linky device: {}", e))?;
454
455        Ok(LinkyDevice {
456            id: record.id,
457            building_id: record.building_id,
458            prm: record.prm,
459            provider: record
460                .provider
461                .ok_or("Provider is required")?
462                .parse()
463                .map_err(|e| format!("Invalid provider: {}", e))?,
464            api_key_encrypted: record.api_key_encrypted,
465            refresh_token_encrypted: record.refresh_token_encrypted,
466            token_expires_at: record.token_expires_at,
467            sync_enabled: record.sync_enabled,
468            last_sync_at: record.last_sync_at,
469            created_at: record.created_at,
470            updated_at: record.updated_at,
471        })
472    }
473
474    async fn find_linky_device_by_id(
475        &self,
476        device_id: Uuid,
477    ) -> Result<Option<LinkyDevice>, String> {
478        let record = sqlx::query!(
479            r#"
480            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
481                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
482            FROM linky_devices
483            WHERE id = $1
484            "#,
485            device_id,
486        )
487        .fetch_optional(&self.pool)
488        .await
489        .map_err(|e| format!("Failed to find Linky device: {}", e))?;
490
491        record
492            .map(|r| -> Result<LinkyDevice, String> {
493                Ok(LinkyDevice {
494                    id: r.id,
495                    building_id: r.building_id,
496                    prm: r.prm,
497                    provider: r
498                        .provider
499                        .ok_or("Provider is required")?
500                        .parse()
501                        .map_err(|e| format!("Invalid provider: {}", e))?,
502                    api_key_encrypted: r.api_key_encrypted,
503                    refresh_token_encrypted: r.refresh_token_encrypted,
504                    token_expires_at: r.token_expires_at,
505                    sync_enabled: r.sync_enabled,
506                    last_sync_at: r.last_sync_at,
507                    created_at: r.created_at,
508                    updated_at: r.updated_at,
509                })
510            })
511            .transpose()
512    }
513
514    async fn find_linky_device_by_building(
515        &self,
516        building_id: Uuid,
517    ) -> Result<Option<LinkyDevice>, String> {
518        let record = sqlx::query!(
519            r#"
520            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
521                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
522            FROM linky_devices
523            WHERE building_id = $1
524            "#,
525            building_id,
526        )
527        .fetch_optional(&self.pool)
528        .await
529        .map_err(|e| format!("Failed to find Linky device by building: {}", e))?;
530
531        record
532            .map(|r| -> Result<LinkyDevice, String> {
533                Ok(LinkyDevice {
534                    id: r.id,
535                    building_id: r.building_id,
536                    prm: r.prm,
537                    provider: r
538                        .provider
539                        .ok_or("Provider is required")?
540                        .parse()
541                        .map_err(|e| format!("Invalid provider: {}", e))?,
542                    api_key_encrypted: r.api_key_encrypted,
543                    refresh_token_encrypted: r.refresh_token_encrypted,
544                    token_expires_at: r.token_expires_at,
545                    sync_enabled: r.sync_enabled,
546                    last_sync_at: r.last_sync_at,
547                    created_at: r.created_at,
548                    updated_at: r.updated_at,
549                })
550            })
551            .transpose()
552    }
553
554    async fn find_linky_device_by_prm(
555        &self,
556        prm: &str,
557        provider: &str,
558    ) -> Result<Option<LinkyDevice>, String> {
559        let record = sqlx::query!(
560            r#"
561            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
562                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
563            FROM linky_devices
564            WHERE prm = $1 AND provider::TEXT = $2
565            "#,
566            prm,
567            provider,
568        )
569        .fetch_optional(&self.pool)
570        .await
571        .map_err(|e| format!("Failed to find Linky device by PRM: {}", e))?;
572
573        record
574            .map(|r| -> Result<LinkyDevice, String> {
575                Ok(LinkyDevice {
576                    id: r.id,
577                    building_id: r.building_id,
578                    prm: r.prm,
579                    provider: r
580                        .provider
581                        .ok_or("Provider is required")?
582                        .parse()
583                        .map_err(|e| format!("Invalid provider: {}", e))?,
584                    api_key_encrypted: r.api_key_encrypted,
585                    refresh_token_encrypted: r.refresh_token_encrypted,
586                    token_expires_at: r.token_expires_at,
587                    sync_enabled: r.sync_enabled,
588                    last_sync_at: r.last_sync_at,
589                    created_at: r.created_at,
590                    updated_at: r.updated_at,
591                })
592            })
593            .transpose()
594    }
595
596    async fn update_linky_device(&self, device: &LinkyDevice) -> Result<LinkyDevice, String> {
597        let provider_str = device.provider.to_string();
598
599        let record = sqlx::query!(
600            r#"
601            UPDATE linky_devices
602            SET building_id = $2,
603                prm = $3,
604                provider = $4::TEXT::linky_provider,
605                api_key_encrypted = $5,
606                refresh_token_encrypted = $6,
607                token_expires_at = $7,
608                sync_enabled = $8,
609                last_sync_at = $9,
610                updated_at = $10
611            WHERE id = $1
612            RETURNING id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
613                      token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
614            "#,
615            device.id,
616            device.building_id,
617            device.prm,
618            provider_str,
619            device.api_key_encrypted,
620            device.refresh_token_encrypted,
621            device.token_expires_at,
622            device.sync_enabled,
623            device.last_sync_at,
624            device.updated_at,
625        )
626        .fetch_one(&self.pool)
627        .await
628        .map_err(|e| format!("Failed to update Linky device: {}", e))?;
629
630        Ok(LinkyDevice {
631            id: record.id,
632            building_id: record.building_id,
633            prm: record.prm,
634            provider: record
635                .provider
636                .ok_or("Provider is required")?
637                .parse()
638                .map_err(|e| format!("Invalid provider: {}", e))?,
639            api_key_encrypted: record.api_key_encrypted,
640            refresh_token_encrypted: record.refresh_token_encrypted,
641            token_expires_at: record.token_expires_at,
642            sync_enabled: record.sync_enabled,
643            last_sync_at: record.last_sync_at,
644            created_at: record.created_at,
645            updated_at: record.updated_at,
646        })
647    }
648
649    async fn delete_linky_device(&self, device_id: Uuid) -> Result<(), String> {
650        sqlx::query!(
651            r#"
652            DELETE FROM linky_devices
653            WHERE id = $1
654            "#,
655            device_id,
656        )
657        .execute(&self.pool)
658        .await
659        .map_err(|e| format!("Failed to delete Linky device: {}", e))?;
660
661        Ok(())
662    }
663
664    async fn find_devices_needing_sync(&self) -> Result<Vec<LinkyDevice>, String> {
665        let records = sqlx::query!(
666            r#"
667            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
668                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
669            FROM linky_devices
670            WHERE sync_enabled = true
671              AND (last_sync_at IS NULL OR last_sync_at < NOW() - INTERVAL '1 day')
672            ORDER BY last_sync_at ASC NULLS FIRST
673            "#,
674        )
675        .fetch_all(&self.pool)
676        .await
677        .map_err(|e| format!("Failed to find devices needing sync: {}", e))?;
678
679        records
680            .into_iter()
681            .map(|r| -> Result<LinkyDevice, String> {
682                Ok(LinkyDevice {
683                    id: r.id,
684                    building_id: r.building_id,
685                    prm: r.prm,
686                    provider: r
687                        .provider
688                        .ok_or("Provider is required")?
689                        .parse()
690                        .map_err(|e| format!("Invalid provider: {}", e))?,
691                    api_key_encrypted: r.api_key_encrypted,
692                    refresh_token_encrypted: r.refresh_token_encrypted,
693                    token_expires_at: r.token_expires_at,
694                    sync_enabled: r.sync_enabled,
695                    last_sync_at: r.last_sync_at,
696                    created_at: r.created_at,
697                    updated_at: r.updated_at,
698                })
699            })
700            .collect()
701    }
702
703    async fn find_devices_with_expired_tokens(&self) -> Result<Vec<LinkyDevice>, String> {
704        let records = sqlx::query!(
705            r#"
706            SELECT id, building_id, prm, provider::TEXT as provider, api_key_encrypted, refresh_token_encrypted,
707                   token_expires_at, sync_enabled, last_sync_at, created_at, updated_at
708            FROM linky_devices
709            WHERE token_expires_at < NOW()
710            ORDER BY token_expires_at ASC
711            "#,
712        )
713        .fetch_all(&self.pool)
714        .await
715        .map_err(|e| format!("Failed to find devices with expired tokens: {}", e))?;
716
717        records
718            .into_iter()
719            .map(|r| -> Result<LinkyDevice, String> {
720                Ok(LinkyDevice {
721                    id: r.id,
722                    building_id: r.building_id,
723                    prm: r.prm,
724                    provider: r
725                        .provider
726                        .ok_or("Provider is required")?
727                        .parse()
728                        .map_err(|e| format!("Invalid provider: {}", e))?,
729                    api_key_encrypted: r.api_key_encrypted,
730                    refresh_token_encrypted: r.refresh_token_encrypted,
731                    token_expires_at: r.token_expires_at,
732                    sync_enabled: r.sync_enabled,
733                    last_sync_at: r.last_sync_at,
734                    created_at: r.created_at,
735                    updated_at: r.updated_at,
736                })
737            })
738            .collect()
739    }
740}