1use crate::application::dto::{
2 ConfigureLinkyDeviceDto, ConsumptionStatsDto, CreateIoTReadingDto, DailyAggregateDto,
3 IoTReadingResponseDto, LinkyDeviceResponseDto, LinkySyncResponseDto, MonthlyAggregateDto,
4 QueryIoTReadingsDto, SyncLinkyDataDto,
5};
6use crate::application::ports::{IoTRepository, LinkyApiClient};
7use crate::domain::entities::{DeviceType, IoTReading, LinkyDevice, MetricType};
8use crate::infrastructure::audit::{log_audit_event, AuditEventType};
9use chrono::{DateTime, Utc};
10use std::sync::Arc;
11use uuid::Uuid;
12
/// Application-layer use cases for generic IoT readings: ingestion (single
/// and bulk), querying, aggregation and anomaly detection.
pub struct IoTUseCases {
    /// Repository port that every operation of this service delegates to.
    iot_repo: Arc<dyn IoTRepository>,
}
17
18impl IoTUseCases {
19 pub fn new(iot_repo: Arc<dyn IoTRepository>) -> Self {
20 Self { iot_repo }
21 }
22
23 pub async fn create_reading(
25 &self,
26 dto: CreateIoTReadingDto,
27 user_id: Uuid,
28 organization_id: Uuid,
29 ) -> Result<IoTReadingResponseDto, String> {
30 Self::validate_metric_for_device(&dto.device_type, &dto.metric_type)?;
32
33 let mut reading = IoTReading::new(
34 dto.building_id,
35 dto.device_type,
36 dto.metric_type,
37 dto.value,
38 dto.unit,
39 dto.timestamp,
40 dto.source,
41 )?;
42
43 if let Some(metadata) = dto.metadata {
44 reading = reading.with_metadata(metadata);
45 }
46
47 let created = self.iot_repo.create_reading(&reading).await?;
48
49 tokio::spawn(async move {
51 log_audit_event(
52 AuditEventType::IoTReadingCreated,
53 Some(user_id),
54 Some(organization_id),
55 Some(format!(
56 "IoT reading created: {:?} {} {}",
57 dto.device_type, dto.metric_type, dto.value
58 )),
59 None,
60 )
61 .await;
62 });
63
64 Ok(IoTReadingResponseDto::from(created))
65 }
66
67 pub async fn create_readings_bulk(
69 &self,
70 dtos: Vec<CreateIoTReadingDto>,
71 user_id: Uuid,
72 organization_id: Uuid,
73 ) -> Result<usize, String> {
74 if dtos.is_empty() {
75 return Err("No readings provided".to_string());
76 }
77
78 let readings: Result<Vec<IoTReading>, String> = dtos
79 .iter()
80 .map(|dto| {
81 Self::validate_metric_for_device(&dto.device_type, &dto.metric_type)?;
82 let mut reading = IoTReading::new(
83 dto.building_id,
84 dto.device_type.clone(),
85 dto.metric_type.clone(),
86 dto.value,
87 dto.unit.clone(),
88 dto.timestamp,
89 dto.source.clone(),
90 )?;
91
92 if let Some(ref metadata) = dto.metadata {
93 reading = reading.with_metadata(metadata.clone());
94 }
95
96 Ok(reading)
97 })
98 .collect();
99
100 let readings = readings?;
101 let count = self.iot_repo.create_readings_bulk(&readings).await?;
102
103 tokio::spawn(async move {
105 log_audit_event(
106 AuditEventType::IoTReadingsBulkCreated,
107 Some(user_id),
108 Some(organization_id),
109 Some(format!("Bulk IoT readings created: {} records", count)),
110 None,
111 )
112 .await;
113 });
114
115 Ok(count)
116 }
117
118 pub async fn query_readings(
120 &self,
121 query: QueryIoTReadingsDto,
122 ) -> Result<Vec<IoTReadingResponseDto>, String> {
123 let readings = self
124 .iot_repo
125 .find_readings_by_building(
126 query.building_id,
127 query.device_type,
128 query.metric_type,
129 query.start_date,
130 query.end_date,
131 query.limit,
132 )
133 .await?;
134
135 Ok(readings
136 .into_iter()
137 .map(IoTReadingResponseDto::from)
138 .collect())
139 }
140
141 pub async fn get_consumption_stats(
143 &self,
144 building_id: Uuid,
145 metric_type: MetricType,
146 start_date: DateTime<Utc>,
147 end_date: DateTime<Utc>,
148 ) -> Result<ConsumptionStatsDto, String> {
149 if start_date >= end_date {
150 return Err("start_date must be before end_date".to_string());
151 }
152
153 self.iot_repo
154 .get_consumption_stats(building_id, metric_type, start_date, end_date)
155 .await
156 }
157
158 pub async fn get_daily_aggregates(
160 &self,
161 building_id: Uuid,
162 device_type: DeviceType,
163 metric_type: MetricType,
164 start_date: DateTime<Utc>,
165 end_date: DateTime<Utc>,
166 ) -> Result<Vec<DailyAggregateDto>, String> {
167 Self::validate_metric_for_device(&device_type, &metric_type)?;
168
169 if start_date >= end_date {
170 return Err("start_date must be before end_date".to_string());
171 }
172
173 self.iot_repo
174 .get_daily_aggregates(building_id, device_type, metric_type, start_date, end_date)
175 .await
176 }
177
178 pub async fn get_monthly_aggregates(
180 &self,
181 building_id: Uuid,
182 device_type: DeviceType,
183 metric_type: MetricType,
184 start_date: DateTime<Utc>,
185 end_date: DateTime<Utc>,
186 ) -> Result<Vec<MonthlyAggregateDto>, String> {
187 Self::validate_metric_for_device(&device_type, &metric_type)?;
188
189 if start_date >= end_date {
190 return Err("start_date must be before end_date".to_string());
191 }
192
193 self.iot_repo
194 .get_monthly_aggregates(building_id, device_type, metric_type, start_date, end_date)
195 .await
196 }
197
198 pub async fn detect_anomalies(
200 &self,
201 building_id: Uuid,
202 metric_type: MetricType,
203 threshold_percentage: f64,
204 lookback_days: i64,
205 ) -> Result<Vec<IoTReadingResponseDto>, String> {
206 if threshold_percentage <= 0.0 || threshold_percentage > 100.0 {
207 return Err("threshold_percentage must be between 0 and 100".to_string());
208 }
209
210 if lookback_days <= 0 {
211 return Err("lookback_days must be positive".to_string());
212 }
213
214 let anomalies = self
215 .iot_repo
216 .detect_anomalies(
217 building_id,
218 metric_type,
219 threshold_percentage,
220 lookback_days,
221 )
222 .await?;
223
224 Ok(anomalies
225 .into_iter()
226 .map(IoTReadingResponseDto::from)
227 .collect())
228 }
229
230 fn validate_metric_for_device(
232 device_type: &DeviceType,
233 metric_type: &MetricType,
234 ) -> Result<(), String> {
235 match device_type {
236 DeviceType::ElectricityMeter => match metric_type {
237 MetricType::ElectricityConsumption | MetricType::Power | MetricType::Voltage => {
238 Ok(())
239 }
240 _ => Err(format!(
241 "Metric type {:?} is not compatible with Linky device",
242 metric_type
243 )),
244 },
245 DeviceType::GasMeter => match metric_type {
246 MetricType::GasConsumption => Ok(()),
247 _ => Err(format!(
248 "Metric type {:?} is not compatible with GasMeter device",
249 metric_type
250 )),
251 },
252 DeviceType::PowerMeter => match metric_type {
253 MetricType::Power => Ok(()),
254 _ => Err(format!(
255 "Metric type {:?} is not compatible with PowerMeter device",
256 metric_type
257 )),
258 },
259 DeviceType::WaterMeter => match metric_type {
260 MetricType::WaterConsumption => Ok(()),
261 _ => Err(format!(
262 "Metric type {:?} is not compatible with WaterMeter device",
263 metric_type
264 )),
265 },
266 DeviceType::TemperatureSensor => match metric_type {
267 MetricType::Temperature => Ok(()),
268 _ => Err(format!(
269 "Metric type {:?} is not compatible with TemperatureSensor device",
270 metric_type
271 )),
272 },
273 DeviceType::HumiditySensor => match metric_type {
274 MetricType::Humidity => Ok(()),
275 _ => Err(format!(
276 "Metric type {:?} is not compatible with HumiditySensor device",
277 metric_type
278 )),
279 },
280 }
281 }
282}
283
/// Application-layer use cases for Linky devices: configuration through an
/// authorization-code exchange, consumption sync, lookup, deletion and sync
/// toggling.
pub struct LinkyUseCases {
    /// Repository port used to store devices and the readings a sync produces.
    iot_repo: Arc<dyn IoTRepository>,
    /// Client port used to exchange/refresh tokens and fetch daily consumption.
    linky_client: Arc<dyn LinkyApiClient>,
    /// Redirect URI passed along when exchanging an authorization code.
    oauth_redirect_uri: String,
}
290
291impl LinkyUseCases {
292 pub fn new(
293 iot_repo: Arc<dyn IoTRepository>,
294 linky_client: Arc<dyn LinkyApiClient>,
295 oauth_redirect_uri: String,
296 ) -> Self {
297 Self {
298 iot_repo,
299 linky_client,
300 oauth_redirect_uri,
301 }
302 }
303
304 pub async fn configure_linky_device(
306 &self,
307 dto: ConfigureLinkyDeviceDto,
308 user_id: Uuid,
309 organization_id: Uuid,
310 ) -> Result<LinkyDeviceResponseDto, String> {
311 if let Some(_existing) = self
313 .iot_repo
314 .find_linky_device_by_building(dto.building_id)
315 .await?
316 {
317 return Err("Linky device already configured for this building".to_string());
318 }
319
320 if let Some(_existing) = self
322 .iot_repo
323 .find_linky_device_by_prm(&dto.prm, &dto.provider.to_string())
324 .await?
325 {
326 return Err("This PRM is already registered with this provider".to_string());
327 }
328
329 let token_response = self
331 .linky_client
332 .exchange_authorization_code(&dto.authorization_code, &self.oauth_redirect_uri)
333 .await
334 .map_err(|e| format!("Failed to exchange authorization code: {:?}", e))?;
335
336 let mut device = LinkyDevice::new(
338 dto.building_id,
339 dto.prm,
340 dto.provider,
341 token_response.access_token,
342 )?;
343
344 if let Some(refresh_token) = token_response.refresh_token {
346 let expires_at =
347 chrono::Utc::now() + chrono::Duration::seconds(token_response.expires_in);
348 device = device.with_refresh_token(refresh_token, expires_at);
349 }
350
351 let created = self.iot_repo.create_linky_device(&device).await?;
352
353 let prm = created.prm.clone();
355 let provider = created.provider.clone();
356 tokio::spawn(async move {
357 log_audit_event(
358 AuditEventType::LinkyDeviceConfigured,
359 Some(user_id),
360 Some(organization_id),
361 Some(format!(
362 "Linky device configured: PRM={}, Provider={}",
363 prm, provider
364 )),
365 None,
366 )
367 .await;
368 });
369
370 Ok(LinkyDeviceResponseDto::from(created))
371 }
372
373 pub async fn sync_linky_data(
375 &self,
376 dto: SyncLinkyDataDto,
377 user_id: Uuid,
378 organization_id: Uuid,
379 ) -> Result<LinkySyncResponseDto, String> {
380 let mut device = self
382 .iot_repo
383 .find_linky_device_by_building(dto.building_id)
384 .await?
385 .ok_or("No Linky device configured for this building".to_string())?;
386
387 if !device.sync_enabled {
388 return Err("Sync is disabled for this device".to_string());
389 }
390
391 if device.is_token_expired() {
393 let refresh_token = device
395 .refresh_token_encrypted
396 .as_ref()
397 .ok_or("Refresh token not available")?;
398
399 let token_response = self
400 .linky_client
401 .refresh_access_token(refresh_token)
402 .await
403 .map_err(|e| format!("Failed to refresh access token: {:?}", e))?;
404
405 let expires_at =
407 chrono::Utc::now() + chrono::Duration::seconds(token_response.expires_in);
408
409 device.update_tokens(
410 token_response.access_token,
411 token_response.refresh_token,
412 Some(expires_at),
413 )?;
414
415 self.iot_repo.update_linky_device(&device).await?;
416 }
417
418 let sync_started_at = Utc::now();
419
420 let consumption_data = self
422 .linky_client
423 .get_daily_consumption(
424 &device.prm,
425 &device.api_key_encrypted,
426 dto.start_date,
427 dto.end_date,
428 )
429 .await
430 .map_err(|e| format!("Failed to fetch consumption data: {:?}", e))?;
431
432 let readings: Vec<IoTReading> = consumption_data
434 .iter()
435 .map(|data_point| -> Result<IoTReading, String> {
436 let reading = IoTReading::new(
437 dto.building_id,
438 DeviceType::ElectricityMeter,
439 MetricType::ElectricityConsumption,
440 data_point.value,
441 "kWh".to_string(),
442 data_point.timestamp,
443 device.provider.to_string(),
444 )?;
445
446 Ok(reading.with_metadata(serde_json::json!({
447 "prm": device.prm,
448 "quality": data_point.quality,
449 "sync_id": Uuid::new_v4().to_string(),
450 })))
451 })
452 .collect::<Result<Vec<_>, _>>()?;
453
454 let readings_count = readings.len();
455
456 if readings_count > 0 {
458 self.iot_repo.create_readings_bulk(&readings).await?;
459 }
460
461 device.mark_synced();
463 self.iot_repo.update_linky_device(&device).await?;
464
465 let device_id = device.id;
467 tokio::spawn(async move {
468 log_audit_event(
469 AuditEventType::LinkyDataSynced,
470 Some(user_id),
471 Some(organization_id),
472 Some(format!(
473 "Linky data synced: {} readings fetched",
474 readings_count
475 )),
476 None,
477 )
478 .await;
479 });
480
481 Ok(LinkySyncResponseDto {
482 device_id,
483 sync_started_at,
484 start_date: dto.start_date,
485 end_date: dto.end_date,
486 readings_fetched: readings_count,
487 success: true,
488 error_message: None,
489 })
490 }
491
492 pub async fn get_linky_device(
494 &self,
495 building_id: Uuid,
496 ) -> Result<LinkyDeviceResponseDto, String> {
497 let device = self
498 .iot_repo
499 .find_linky_device_by_building(building_id)
500 .await?
501 .ok_or("No Linky device configured for this building".to_string())?;
502
503 Ok(LinkyDeviceResponseDto::from(device))
504 }
505
506 pub async fn delete_linky_device(
508 &self,
509 building_id: Uuid,
510 user_id: Uuid,
511 organization_id: Uuid,
512 ) -> Result<(), String> {
513 let device = self
514 .iot_repo
515 .find_linky_device_by_building(building_id)
516 .await?
517 .ok_or("No Linky device configured for this building".to_string())?;
518
519 let device_id = device.id;
520 self.iot_repo.delete_linky_device(device_id).await?;
521
522 tokio::spawn(async move {
524 log_audit_event(
525 AuditEventType::LinkyDeviceDeleted,
526 Some(user_id),
527 Some(organization_id),
528 Some(format!("Linky device deleted: {}", device_id)),
529 None,
530 )
531 .await;
532 });
533
534 Ok(())
535 }
536
537 pub async fn toggle_sync(
539 &self,
540 building_id: Uuid,
541 enabled: bool,
542 user_id: Uuid,
543 organization_id: Uuid,
544 ) -> Result<LinkyDeviceResponseDto, String> {
545 let mut device = self
546 .iot_repo
547 .find_linky_device_by_building(building_id)
548 .await?
549 .ok_or("No Linky device configured for this building".to_string())?;
550
551 if enabled {
552 device.enable_sync();
553 } else {
554 device.disable_sync();
555 }
556
557 let updated = self.iot_repo.update_linky_device(&device).await?;
558
559 tokio::spawn(async move {
561 log_audit_event(
562 AuditEventType::LinkySyncToggled,
563 Some(user_id),
564 Some(organization_id),
565 Some(format!(
566 "Linky sync {}",
567 if enabled { "enabled" } else { "disabled" }
568 )),
569 None,
570 )
571 .await;
572 });
573
574 Ok(LinkyDeviceResponseDto::from(updated))
575 }
576
577 pub async fn find_devices_needing_sync(&self) -> Result<Vec<LinkyDeviceResponseDto>, String> {
579 let devices = self.iot_repo.find_devices_needing_sync().await?;
580 Ok(devices
581 .into_iter()
582 .map(LinkyDeviceResponseDto::from)
583 .collect())
584 }
585
586 pub async fn find_devices_with_expired_tokens(
588 &self,
589 ) -> Result<Vec<LinkyDeviceResponseDto>, String> {
590 let devices = self.iot_repo.find_devices_with_expired_tokens().await?;
591 Ok(devices
592 .into_iter()
593 .map(LinkyDeviceResponseDto::from)
594 .collect())
595 }
596}
597
#[cfg(test)]
mod tests {
    //! Unit tests for the device/metric compatibility rules enforced by
    //! `IoTUseCases::validate_metric_for_device`.

    use super::*;

    /// Shorthand: `true` when the device/metric pair passes validation.
    fn accepts(device_type: DeviceType, metric_type: MetricType) -> bool {
        IoTUseCases::validate_metric_for_device(&device_type, &metric_type).is_ok()
    }

    #[test]
    fn test_validate_metric_for_device_linky_valid() {
        assert!(accepts(
            DeviceType::ElectricityMeter,
            MetricType::ElectricityConsumption
        ));
        assert!(accepts(DeviceType::ElectricityMeter, MetricType::Power));
        assert!(accepts(DeviceType::ElectricityMeter, MetricType::Voltage));
    }

    #[test]
    fn test_validate_metric_for_device_linky_invalid() {
        assert!(!accepts(
            DeviceType::ElectricityMeter,
            MetricType::GasConsumption
        ));
        assert!(!accepts(
            DeviceType::ElectricityMeter,
            MetricType::WaterConsumption
        ));
    }

    // NOTE(review): the two "ores" tests exercise the same `DeviceType` as the
    // "linky" ones — presumably Ores meters are also modelled as
    // `ElectricityMeter`; confirm or merge them.
    #[test]
    fn test_validate_metric_for_device_ores_valid() {
        assert!(accepts(
            DeviceType::ElectricityMeter,
            MetricType::ElectricityConsumption
        ));
    }

    #[test]
    fn test_validate_metric_for_device_ores_invalid() {
        assert!(!accepts(
            DeviceType::ElectricityMeter,
            MetricType::GasConsumption
        ));
    }

    #[test]
    fn test_validate_metric_for_device_gas_meter_valid() {
        assert!(accepts(DeviceType::GasMeter, MetricType::GasConsumption));
    }

    #[test]
    fn test_validate_metric_for_device_power_meter_valid() {
        assert!(accepts(DeviceType::PowerMeter, MetricType::Power));
    }

    #[test]
    fn test_validate_metric_for_device_water_meter_valid() {
        assert!(accepts(
            DeviceType::WaterMeter,
            MetricType::WaterConsumption
        ));
    }

    #[test]
    fn test_validate_metric_for_device_temperature_sensor_valid() {
        assert!(accepts(
            DeviceType::TemperatureSensor,
            MetricType::Temperature
        ));
    }

    #[test]
    fn test_validate_metric_for_device_humidity_sensor_valid() {
        assert!(accepts(DeviceType::HumiditySensor, MetricType::Humidity));
    }

    #[test]
    fn test_validate_metric_for_device_rejects_foreign_metrics() {
        // Every non-electricity device accepts exactly one metric; probe each
        // with a metric that belongs to another device.
        assert!(!accepts(DeviceType::GasMeter, MetricType::Power));
        assert!(!accepts(DeviceType::PowerMeter, MetricType::Voltage));
        assert!(!accepts(DeviceType::WaterMeter, MetricType::Temperature));
        assert!(!accepts(
            DeviceType::TemperatureSensor,
            MetricType::Humidity
        ));
        assert!(!accepts(
            DeviceType::HumiditySensor,
            MetricType::Temperature
        ));
    }
}