koprogo_api/application/use_cases/
iot_use_cases.rs

1use crate::application::dto::{
2    ConfigureLinkyDeviceDto, ConsumptionStatsDto, CreateIoTReadingDto, DailyAggregateDto,
3    IoTReadingResponseDto, LinkyDeviceResponseDto, LinkySyncResponseDto, MonthlyAggregateDto,
4    QueryIoTReadingsDto, SyncLinkyDataDto,
5};
6use crate::application::ports::{IoTRepository, LinkyApiClient};
7use crate::domain::entities::{DeviceType, IoTReading, LinkyDevice, MetricType};
8use crate::infrastructure::audit::{log_audit_event, AuditEventType};
9use chrono::{DateTime, Utc};
10use std::sync::Arc;
11use uuid::Uuid;
12
13/// Use cases for managing IoT readings and consumption data
14pub struct IoTUseCases {
15    iot_repo: Arc<dyn IoTRepository>,
16}
17
18impl IoTUseCases {
19    pub fn new(iot_repo: Arc<dyn IoTRepository>) -> Self {
20        Self { iot_repo }
21    }
22
23    /// Create a single IoT reading
24    pub async fn create_reading(
25        &self,
26        dto: CreateIoTReadingDto,
27        user_id: Uuid,
28        organization_id: Uuid,
29    ) -> Result<IoTReadingResponseDto, String> {
30        // Validate device type and metric type compatibility
31        Self::validate_metric_for_device(&dto.device_type, &dto.metric_type)?;
32
33        let mut reading = IoTReading::new(
34            dto.building_id,
35            dto.device_type,
36            dto.metric_type,
37            dto.value,
38            dto.unit,
39            dto.timestamp,
40            dto.source,
41        )?;
42
43        if let Some(metadata) = dto.metadata {
44            reading = reading.with_metadata(metadata);
45        }
46
47        let created = self.iot_repo.create_reading(&reading).await?;
48
49        // Audit log (async, non-blocking)
50        tokio::spawn(async move {
51            log_audit_event(
52                AuditEventType::IoTReadingCreated,
53                Some(user_id),
54                Some(organization_id),
55                Some(format!(
56                    "IoT reading created: {:?} {} {}",
57                    dto.device_type, dto.metric_type, dto.value
58                )),
59                None,
60            )
61            .await;
62        });
63
64        Ok(IoTReadingResponseDto::from(created))
65    }
66
67    /// Create multiple IoT readings in bulk
68    pub async fn create_readings_bulk(
69        &self,
70        dtos: Vec<CreateIoTReadingDto>,
71        user_id: Uuid,
72        organization_id: Uuid,
73    ) -> Result<usize, String> {
74        if dtos.is_empty() {
75            return Err("No readings provided".to_string());
76        }
77
78        let readings: Result<Vec<IoTReading>, String> = dtos
79            .iter()
80            .map(|dto| {
81                Self::validate_metric_for_device(&dto.device_type, &dto.metric_type)?;
82                let mut reading = IoTReading::new(
83                    dto.building_id,
84                    dto.device_type.clone(),
85                    dto.metric_type.clone(),
86                    dto.value,
87                    dto.unit.clone(),
88                    dto.timestamp,
89                    dto.source.clone(),
90                )?;
91
92                if let Some(ref metadata) = dto.metadata {
93                    reading = reading.with_metadata(metadata.clone());
94                }
95
96                Ok(reading)
97            })
98            .collect();
99
100        let readings = readings?;
101        let count = self.iot_repo.create_readings_bulk(&readings).await?;
102
103        // Audit log (async, non-blocking)
104        tokio::spawn(async move {
105            log_audit_event(
106                AuditEventType::IoTReadingsBulkCreated,
107                Some(user_id),
108                Some(organization_id),
109                Some(format!("Bulk IoT readings created: {} records", count)),
110                None,
111            )
112            .await;
113        });
114
115        Ok(count)
116    }
117
118    /// Query IoT readings with filters
119    pub async fn query_readings(
120        &self,
121        query: QueryIoTReadingsDto,
122    ) -> Result<Vec<IoTReadingResponseDto>, String> {
123        let readings = self
124            .iot_repo
125            .find_readings_by_building(
126                query.building_id,
127                query.device_type,
128                query.metric_type,
129                query.start_date,
130                query.end_date,
131                query.limit,
132            )
133            .await?;
134
135        Ok(readings
136            .into_iter()
137            .map(IoTReadingResponseDto::from)
138            .collect())
139    }
140
141    /// Get consumption statistics for a building and metric type
142    pub async fn get_consumption_stats(
143        &self,
144        building_id: Uuid,
145        metric_type: MetricType,
146        start_date: DateTime<Utc>,
147        end_date: DateTime<Utc>,
148    ) -> Result<ConsumptionStatsDto, String> {
149        if start_date >= end_date {
150            return Err("start_date must be before end_date".to_string());
151        }
152
153        self.iot_repo
154            .get_consumption_stats(building_id, metric_type, start_date, end_date)
155            .await
156    }
157
158    /// Get daily aggregates for a device type and metric
159    pub async fn get_daily_aggregates(
160        &self,
161        building_id: Uuid,
162        device_type: DeviceType,
163        metric_type: MetricType,
164        start_date: DateTime<Utc>,
165        end_date: DateTime<Utc>,
166    ) -> Result<Vec<DailyAggregateDto>, String> {
167        Self::validate_metric_for_device(&device_type, &metric_type)?;
168
169        if start_date >= end_date {
170            return Err("start_date must be before end_date".to_string());
171        }
172
173        self.iot_repo
174            .get_daily_aggregates(building_id, device_type, metric_type, start_date, end_date)
175            .await
176    }
177
178    /// Get monthly aggregates for a device type and metric
179    pub async fn get_monthly_aggregates(
180        &self,
181        building_id: Uuid,
182        device_type: DeviceType,
183        metric_type: MetricType,
184        start_date: DateTime<Utc>,
185        end_date: DateTime<Utc>,
186    ) -> Result<Vec<MonthlyAggregateDto>, String> {
187        Self::validate_metric_for_device(&device_type, &metric_type)?;
188
189        if start_date >= end_date {
190            return Err("start_date must be before end_date".to_string());
191        }
192
193        self.iot_repo
194            .get_monthly_aggregates(building_id, device_type, metric_type, start_date, end_date)
195            .await
196    }
197
198    /// Detect consumption anomalies (values deviating significantly from average)
199    pub async fn detect_anomalies(
200        &self,
201        building_id: Uuid,
202        metric_type: MetricType,
203        threshold_percentage: f64,
204        lookback_days: i64,
205    ) -> Result<Vec<IoTReadingResponseDto>, String> {
206        if threshold_percentage <= 0.0 || threshold_percentage > 100.0 {
207            return Err("threshold_percentage must be between 0 and 100".to_string());
208        }
209
210        if lookback_days <= 0 {
211            return Err("lookback_days must be positive".to_string());
212        }
213
214        let anomalies = self
215            .iot_repo
216            .detect_anomalies(
217                building_id,
218                metric_type,
219                threshold_percentage,
220                lookback_days,
221            )
222            .await?;
223
224        Ok(anomalies
225            .into_iter()
226            .map(IoTReadingResponseDto::from)
227            .collect())
228    }
229
230    /// Validate that a metric type is compatible with a device type
231    fn validate_metric_for_device(
232        device_type: &DeviceType,
233        metric_type: &MetricType,
234    ) -> Result<(), String> {
235        match device_type {
236            DeviceType::ElectricityMeter => match metric_type {
237                MetricType::ElectricityConsumption | MetricType::Power | MetricType::Voltage => {
238                    Ok(())
239                }
240                _ => Err(format!(
241                    "Metric type {:?} is not compatible with Linky device",
242                    metric_type
243                )),
244            },
245            DeviceType::GasMeter => match metric_type {
246                MetricType::GasConsumption => Ok(()),
247                _ => Err(format!(
248                    "Metric type {:?} is not compatible with GasMeter device",
249                    metric_type
250                )),
251            },
252            DeviceType::PowerMeter => match metric_type {
253                MetricType::Power => Ok(()),
254                _ => Err(format!(
255                    "Metric type {:?} is not compatible with PowerMeter device",
256                    metric_type
257                )),
258            },
259            DeviceType::WaterMeter => match metric_type {
260                MetricType::WaterConsumption => Ok(()),
261                _ => Err(format!(
262                    "Metric type {:?} is not compatible with WaterMeter device",
263                    metric_type
264                )),
265            },
266            DeviceType::TemperatureSensor => match metric_type {
267                MetricType::Temperature => Ok(()),
268                _ => Err(format!(
269                    "Metric type {:?} is not compatible with TemperatureSensor device",
270                    metric_type
271                )),
272            },
273            DeviceType::HumiditySensor => match metric_type {
274                MetricType::Humidity => Ok(()),
275                _ => Err(format!(
276                    "Metric type {:?} is not compatible with HumiditySensor device",
277                    metric_type
278                )),
279            },
280        }
281    }
282}
283
284/// Use cases for managing Linky devices and syncing data
285pub struct LinkyUseCases {
286    iot_repo: Arc<dyn IoTRepository>,
287    linky_client: Arc<dyn LinkyApiClient>,
288    oauth_redirect_uri: String,
289}
290
291impl LinkyUseCases {
292    pub fn new(
293        iot_repo: Arc<dyn IoTRepository>,
294        linky_client: Arc<dyn LinkyApiClient>,
295        oauth_redirect_uri: String,
296    ) -> Self {
297        Self {
298            iot_repo,
299            linky_client,
300            oauth_redirect_uri,
301        }
302    }
303
304    /// Configure a new Linky device for a building
305    pub async fn configure_linky_device(
306        &self,
307        dto: ConfigureLinkyDeviceDto,
308        user_id: Uuid,
309        organization_id: Uuid,
310    ) -> Result<LinkyDeviceResponseDto, String> {
311        // Check if device already exists for this building
312        if let Some(_existing) = self
313            .iot_repo
314            .find_linky_device_by_building(dto.building_id)
315            .await?
316        {
317            return Err("Linky device already configured for this building".to_string());
318        }
319
320        // Check if PRM already registered with this provider
321        if let Some(_existing) = self
322            .iot_repo
323            .find_linky_device_by_prm(&dto.prm, &dto.provider.to_string())
324            .await?
325        {
326            return Err("This PRM is already registered with this provider".to_string());
327        }
328
329        // Exchange authorization code for access token
330        let token_response = self
331            .linky_client
332            .exchange_authorization_code(&dto.authorization_code, &self.oauth_redirect_uri)
333            .await
334            .map_err(|e| format!("Failed to exchange authorization code: {:?}", e))?;
335
336        // Create Linky device
337        let mut device = LinkyDevice::new(
338            dto.building_id,
339            dto.prm,
340            dto.provider,
341            token_response.access_token,
342        )?;
343
344        // Add refresh token if present
345        if let Some(refresh_token) = token_response.refresh_token {
346            let expires_at =
347                chrono::Utc::now() + chrono::Duration::seconds(token_response.expires_in);
348            device = device.with_refresh_token(refresh_token, expires_at);
349        }
350
351        let created = self.iot_repo.create_linky_device(&device).await?;
352
353        // Audit log (async, non-blocking)
354        let prm = created.prm.clone();
355        let provider = created.provider.clone();
356        tokio::spawn(async move {
357            log_audit_event(
358                AuditEventType::LinkyDeviceConfigured,
359                Some(user_id),
360                Some(organization_id),
361                Some(format!(
362                    "Linky device configured: PRM={}, Provider={}",
363                    prm, provider
364                )),
365                None,
366            )
367            .await;
368        });
369
370        Ok(LinkyDeviceResponseDto::from(created))
371    }
372
373    /// Sync data from Linky API for a building
374    pub async fn sync_linky_data(
375        &self,
376        dto: SyncLinkyDataDto,
377        user_id: Uuid,
378        organization_id: Uuid,
379    ) -> Result<LinkySyncResponseDto, String> {
380        // Find Linky device for this building
381        let mut device = self
382            .iot_repo
383            .find_linky_device_by_building(dto.building_id)
384            .await?
385            .ok_or("No Linky device configured for this building".to_string())?;
386
387        if !device.sync_enabled {
388            return Err("Sync is disabled for this device".to_string());
389        }
390
391        // Refresh token if expired
392        if device.is_token_expired() {
393            // Unwrap refresh token (must exist if token is expired)
394            let refresh_token = device
395                .refresh_token_encrypted
396                .as_ref()
397                .ok_or("Refresh token not available")?;
398
399            let token_response = self
400                .linky_client
401                .refresh_access_token(refresh_token)
402                .await
403                .map_err(|e| format!("Failed to refresh access token: {:?}", e))?;
404
405            // Convert expires_in to DateTime
406            let expires_at =
407                chrono::Utc::now() + chrono::Duration::seconds(token_response.expires_in);
408
409            device.update_tokens(
410                token_response.access_token,
411                token_response.refresh_token,
412                Some(expires_at),
413            )?;
414
415            self.iot_repo.update_linky_device(&device).await?;
416        }
417
418        let sync_started_at = Utc::now();
419
420        // Fetch daily consumption data
421        let consumption_data = self
422            .linky_client
423            .get_daily_consumption(
424                &device.prm,
425                &device.api_key_encrypted,
426                dto.start_date,
427                dto.end_date,
428            )
429            .await
430            .map_err(|e| format!("Failed to fetch consumption data: {:?}", e))?;
431
432        // Convert consumption data to IoT readings
433        let readings: Vec<IoTReading> = consumption_data
434            .iter()
435            .map(|data_point| -> Result<IoTReading, String> {
436                let reading = IoTReading::new(
437                    dto.building_id,
438                    DeviceType::ElectricityMeter,
439                    MetricType::ElectricityConsumption,
440                    data_point.value,
441                    "kWh".to_string(),
442                    data_point.timestamp,
443                    device.provider.to_string(),
444                )?;
445
446                Ok(reading.with_metadata(serde_json::json!({
447                    "prm": device.prm,
448                    "quality": data_point.quality,
449                    "sync_id": Uuid::new_v4().to_string(),
450                })))
451            })
452            .collect::<Result<Vec<_>, _>>()?;
453
454        let readings_count = readings.len();
455
456        // Bulk insert readings
457        if readings_count > 0 {
458            self.iot_repo.create_readings_bulk(&readings).await?;
459        }
460
461        // Update device last_sync_at
462        device.mark_synced();
463        self.iot_repo.update_linky_device(&device).await?;
464
465        // Audit log (async, non-blocking)
466        let device_id = device.id;
467        tokio::spawn(async move {
468            log_audit_event(
469                AuditEventType::LinkyDataSynced,
470                Some(user_id),
471                Some(organization_id),
472                Some(format!(
473                    "Linky data synced: {} readings fetched",
474                    readings_count
475                )),
476                None,
477            )
478            .await;
479        });
480
481        Ok(LinkySyncResponseDto {
482            device_id,
483            sync_started_at,
484            start_date: dto.start_date,
485            end_date: dto.end_date,
486            readings_fetched: readings_count,
487            success: true,
488            error_message: None,
489        })
490    }
491
492    /// Get Linky device information for a building
493    pub async fn get_linky_device(
494        &self,
495        building_id: Uuid,
496    ) -> Result<LinkyDeviceResponseDto, String> {
497        let device = self
498            .iot_repo
499            .find_linky_device_by_building(building_id)
500            .await?
501            .ok_or("No Linky device configured for this building".to_string())?;
502
503        Ok(LinkyDeviceResponseDto::from(device))
504    }
505
506    /// Delete Linky device configuration
507    pub async fn delete_linky_device(
508        &self,
509        building_id: Uuid,
510        user_id: Uuid,
511        organization_id: Uuid,
512    ) -> Result<(), String> {
513        let device = self
514            .iot_repo
515            .find_linky_device_by_building(building_id)
516            .await?
517            .ok_or("No Linky device configured for this building".to_string())?;
518
519        let device_id = device.id;
520        self.iot_repo.delete_linky_device(device_id).await?;
521
522        // Audit log (async, non-blocking)
523        tokio::spawn(async move {
524            log_audit_event(
525                AuditEventType::LinkyDeviceDeleted,
526                Some(user_id),
527                Some(organization_id),
528                Some(format!("Linky device deleted: {}", device_id)),
529                None,
530            )
531            .await;
532        });
533
534        Ok(())
535    }
536
537    /// Enable/disable sync for a Linky device
538    pub async fn toggle_sync(
539        &self,
540        building_id: Uuid,
541        enabled: bool,
542        user_id: Uuid,
543        organization_id: Uuid,
544    ) -> Result<LinkyDeviceResponseDto, String> {
545        let mut device = self
546            .iot_repo
547            .find_linky_device_by_building(building_id)
548            .await?
549            .ok_or("No Linky device configured for this building".to_string())?;
550
551        if enabled {
552            device.enable_sync();
553        } else {
554            device.disable_sync();
555        }
556
557        let updated = self.iot_repo.update_linky_device(&device).await?;
558
559        // Audit log (async, non-blocking)
560        tokio::spawn(async move {
561            log_audit_event(
562                AuditEventType::LinkySyncToggled,
563                Some(user_id),
564                Some(organization_id),
565                Some(format!(
566                    "Linky sync {}",
567                    if enabled { "enabled" } else { "disabled" }
568                )),
569                None,
570            )
571            .await;
572        });
573
574        Ok(LinkyDeviceResponseDto::from(updated))
575    }
576
577    /// Find devices that need syncing (enabled, not synced recently)
578    pub async fn find_devices_needing_sync(&self) -> Result<Vec<LinkyDeviceResponseDto>, String> {
579        let devices = self.iot_repo.find_devices_needing_sync().await?;
580        Ok(devices
581            .into_iter()
582            .map(LinkyDeviceResponseDto::from)
583            .collect())
584    }
585
586    /// Find devices with expired tokens
587    pub async fn find_devices_with_expired_tokens(
588        &self,
589    ) -> Result<Vec<LinkyDeviceResponseDto>, String> {
590        let devices = self.iot_repo.find_devices_with_expired_tokens().await?;
591        Ok(devices
592            .into_iter()
593            .map(LinkyDeviceResponseDto::from)
594            .collect())
595    }
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns whether `validate_metric_for_device` accepts the given pair.
    fn is_compatible(device: DeviceType, metric: MetricType) -> bool {
        IoTUseCases::validate_metric_for_device(&device, &metric).is_ok()
    }

    #[test]
    fn test_validate_metric_for_device_electricity_meter_valid() {
        assert!(is_compatible(
            DeviceType::ElectricityMeter,
            MetricType::ElectricityConsumption
        ));
        assert!(is_compatible(
            DeviceType::ElectricityMeter,
            MetricType::Power
        ));
        assert!(is_compatible(
            DeviceType::ElectricityMeter,
            MetricType::Voltage
        ));
    }

    #[test]
    fn test_validate_metric_for_device_electricity_meter_invalid() {
        assert!(!is_compatible(
            DeviceType::ElectricityMeter,
            MetricType::GasConsumption
        ));
        assert!(!is_compatible(
            DeviceType::ElectricityMeter,
            MetricType::WaterConsumption
        ));
    }

    #[test]
    fn test_validate_metric_for_device_gas_meter() {
        assert!(is_compatible(
            DeviceType::GasMeter,
            MetricType::GasConsumption
        ));
        assert!(!is_compatible(
            DeviceType::GasMeter,
            MetricType::ElectricityConsumption
        ));
    }

    #[test]
    fn test_validate_metric_for_device_power_meter() {
        assert!(is_compatible(DeviceType::PowerMeter, MetricType::Power));
        // Voltage is accepted for electricity meters only, not for power meters.
        assert!(!is_compatible(DeviceType::PowerMeter, MetricType::Voltage));
    }

    #[test]
    fn test_validate_metric_for_device_water_meter_valid() {
        assert!(is_compatible(
            DeviceType::WaterMeter,
            MetricType::WaterConsumption
        ));
    }

    #[test]
    fn test_validate_metric_for_device_water_meter_invalid() {
        assert!(!is_compatible(
            DeviceType::WaterMeter,
            MetricType::GasConsumption
        ));
    }

    #[test]
    fn test_validate_metric_for_device_temperature_sensor_valid() {
        assert!(is_compatible(
            DeviceType::TemperatureSensor,
            MetricType::Temperature
        ));
    }

    #[test]
    fn test_validate_metric_for_device_temperature_sensor_invalid() {
        assert!(!is_compatible(
            DeviceType::TemperatureSensor,
            MetricType::Humidity
        ));
    }

    #[test]
    fn test_validate_metric_for_device_humidity_sensor_valid() {
        assert!(is_compatible(
            DeviceType::HumiditySensor,
            MetricType::Humidity
        ));
    }

    #[test]
    fn test_validate_metric_for_device_humidity_sensor_invalid() {
        assert!(!is_compatible(
            DeviceType::HumiditySensor,
            MetricType::Temperature
        ));
    }
}