koprogo_api/infrastructure/mqtt/
mqtt_energy_adapter.rs

1use crate::application::ports::iot_repository::IoTRepository;
2use crate::application::ports::mqtt_energy_port::{
3    MqttEnergyPort, MqttError, MqttIncomingReadingDto,
4};
5use crate::domain::entities::iot_reading::IoTReading;
6use async_trait::async_trait;
7use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11use tracing::{error, info, warn};
12use uuid::Uuid;
13
/// MQTT connection settings, loaded from environment variables.
///
/// Expected variables:
/// - `MQTT_HOST` (default: "localhost")
/// - `MQTT_PORT` (default: 1883, TLS: 8883)
/// - `MQTT_CLIENT_ID` (default: "koprogo-backend")
/// - `MQTT_USERNAME` / `MQTT_PASSWORD`
/// - `MQTT_TOPIC` (default: "koprogo/+/energy/#")
///
/// `Debug` is implemented by hand (instead of derived) so that the broker
/// password is never written to logs when the configuration is printed.
#[derive(Clone)]
pub struct MqttConfig {
    pub host: String,
    pub port: u16,
    pub client_id: String,
    pub username: String,
    pub password: String,
    pub subscribe_topic: String,
}

impl std::fmt::Debug for MqttConfig {
    /// Formats every field like the derived implementation would, except
    /// `password`, which is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MqttConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("client_id", &self.client_id)
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .field("subscribe_topic", &self.subscribe_topic)
            .finish()
    }
}
31
32impl MqttConfig {
33    pub fn from_env() -> Self {
34        Self {
35            host: std::env::var("MQTT_HOST").unwrap_or_else(|_| "localhost".to_string()),
36            port: std::env::var("MQTT_PORT")
37                .unwrap_or_else(|_| "1883".to_string())
38                .parse()
39                .unwrap_or(1883),
40            client_id: std::env::var("MQTT_CLIENT_ID")
41                .unwrap_or_else(|_| "koprogo-backend".to_string()),
42            username: std::env::var("MQTT_USERNAME").unwrap_or_default(),
43            password: std::env::var("MQTT_PASSWORD").unwrap_or_default(),
44            subscribe_topic: std::env::var("MQTT_TOPIC")
45                .unwrap_or_else(|_| "koprogo/+/energy/#".to_string()),
46        }
47    }
48}
49
50/// Adapter MQTT: subscribe aux topics Home Assistant → dispatch vers IoTRepository.
51///
52/// Flux: Mosquitto broker → rumqttc EventLoop (tokio::spawn)
53///       → parse topic (copropriete_id + unit_id) + deserialize JSON payload
54///       → IoTReading::new() (validation domaine) → iot_repo.create_reading()
55pub struct MqttEnergyAdapter {
56    config: MqttConfig,
57    iot_repo: Arc<dyn IoTRepository>,
58    listener_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
59}
60
61impl MqttEnergyAdapter {
62    pub fn new(config: MqttConfig, iot_repo: Arc<dyn IoTRepository>) -> Self {
63        Self {
64            config,
65            iot_repo,
66            listener_handle: Arc::new(Mutex::new(None)),
67        }
68    }
69
70    /// Parse le topic MQTT pour extraire copropriete_id et unit_id.
71    ///
72    /// Format attendu: koprogo/{copropriete_id}/energy/{unit_id}/{metric}
73    /// Exemple: koprogo/550e8400-.../energy/aaaa-bbbb-.../electricity
74    pub fn parse_topic(topic: &str) -> Result<(Uuid, Uuid), MqttError> {
75        let parts: Vec<&str> = topic.split('/').collect();
76        if parts.len() < 5 || parts[0] != "koprogo" || parts[2] != "energy" {
77            return Err(MqttError::InvalidTopic(format!(
78                "Expected 'koprogo/{{copropriete_id}}/energy/{{unit_id}}/{{metric}}', got: {}",
79                topic
80            )));
81        }
82        let copropriete_id = Uuid::parse_str(parts[1]).map_err(|_| {
83            MqttError::InvalidTopic(format!("Invalid copropriete_id UUID: {}", parts[1]))
84        })?;
85        let unit_id = Uuid::parse_str(parts[3])
86            .map_err(|_| MqttError::InvalidTopic(format!("Invalid unit_id UUID: {}", parts[3])))?;
87        Ok((copropriete_id, unit_id))
88    }
89}
90
91#[async_trait]
92impl MqttEnergyPort for MqttEnergyAdapter {
93    async fn start_listening(&self) -> Result<(), MqttError> {
94        let mut handle = self.listener_handle.lock().await;
95        if handle.is_some() {
96            return Err(MqttError::AlreadyRunning);
97        }
98
99        let mut opts =
100            MqttOptions::new(&self.config.client_id, &self.config.host, self.config.port);
101        if !self.config.username.is_empty() {
102            opts.set_credentials(&self.config.username, &self.config.password);
103        }
104        opts.set_keep_alive(std::time::Duration::from_secs(30));
105        // Persistent session: QoS 1 survit aux reconnexions
106        opts.set_clean_session(false);
107
108        let (client, mut eventloop) = AsyncClient::new(opts, 128);
109
110        client
111            .subscribe(&self.config.subscribe_topic, QoS::AtLeastOnce)
112            .await
113            .map_err(|e| MqttError::SubscriptionFailed {
114                topic: self.config.subscribe_topic.clone(),
115                reason: e.to_string(),
116            })?;
117
118        let iot_repo = Arc::clone(&self.iot_repo);
119        let topic_pattern = self.config.subscribe_topic.clone();
120
121        let join_handle = tokio::spawn(async move {
122            info!("MQTT listener started on topic: {}", topic_pattern);
123            loop {
124                match eventloop.poll().await {
125                    Ok(Event::Incoming(Packet::Publish(msg))) => {
126                        let topic = msg.topic.clone();
127                        match Self::parse_topic(&topic) {
128                            Ok((copropriete_id, _unit_id)) => {
129                                match serde_json::from_slice::<MqttIncomingReadingDto>(&msg.payload)
130                                {
131                                    Ok(dto) => {
132                                        match IoTReading::new(
133                                            copropriete_id,
134                                            dto.device_type,
135                                            dto.metric_type,
136                                            dto.value,
137                                            dto.unit.clone(),
138                                            dto.ts,
139                                            "mqtt_home_assistant".to_string(),
140                                        ) {
141                                            Ok(reading) => {
142                                                if let Err(e) =
143                                                    iot_repo.create_reading(&reading).await
144                                                {
145                                                    error!(
146                                                        "Failed to persist MQTT reading from {}: {}",
147                                                        topic, e
148                                                    );
149                                                }
150                                            }
151                                            Err(e) => {
152                                                warn!(
153                                                    "Domain validation failed for MQTT reading from {}: {}",
154                                                    topic, e
155                                                );
156                                            }
157                                        }
158                                    }
159                                    Err(e) => {
160                                        warn!("Failed to parse MQTT payload on {}: {}", topic, e);
161                                    }
162                                }
163                            }
164                            Err(e) => {
165                                warn!("Invalid MQTT topic {}: {}", topic, e);
166                            }
167                        }
168                    }
169                    Ok(Event::Incoming(Packet::ConnAck(_))) => {
170                        info!("MQTT connected to broker");
171                    }
172                    Ok(Event::Incoming(Packet::Disconnect)) => {
173                        warn!("MQTT disconnected, rumqttc will reconnect automatically");
174                    }
175                    Err(e) => {
176                        error!("MQTT event loop error: {}", e);
177                        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
178                    }
179                    _ => {}
180                }
181            }
182        });
183
184        *handle = Some(join_handle);
185        info!(
186            "MQTT listener spawned for topic pattern: {}",
187            self.config.subscribe_topic
188        );
189        Ok(())
190    }
191
192    async fn stop_listening(&self) -> Result<(), MqttError> {
193        let mut handle = self.listener_handle.lock().await;
194        if let Some(h) = handle.take() {
195            h.abort();
196            info!("MQTT listener stopped");
197        }
198        Ok(())
199    }
200
201    async fn publish_alert(
202        &self,
203        _copropriete_id: Uuid,
204        alert_type: &str,
205        _payload: &str,
206    ) -> Result<(), MqttError> {
207        // TODO Phase 2: garder un Arc<Mutex<AsyncClient>> pour publier depuis le listener
208        info!("MQTT alert publish queued (not yet wired): {}", alert_type);
209        Ok(())
210    }
211
212    async fn is_running(&self) -> bool {
213        self.listener_handle.lock().await.is_some()
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed topic yields both identifiers unchanged.
    #[test]
    fn test_parse_topic_valid() {
        let (expected_copro, expected_unit) = (Uuid::new_v4(), Uuid::new_v4());
        let topic = format!(
            "koprogo/{}/energy/{}/electricity",
            expected_copro, expected_unit
        );

        let parsed = MqttEnergyAdapter::parse_topic(&topic).unwrap();

        assert_eq!(parsed, (expected_copro, expected_unit));
    }

    /// The first segment must be "koprogo".
    #[test]
    fn test_parse_topic_invalid_prefix() {
        let outcome = MqttEnergyAdapter::parse_topic("other/abc/energy/def/electricity");
        assert!(outcome.is_err());
    }

    /// The third segment must be "energy".
    #[test]
    fn test_parse_topic_missing_energy_segment() {
        let outcome = MqttEnergyAdapter::parse_topic("koprogo/abc-def/readings/xyz/power");
        assert!(outcome.is_err());
    }

    /// Identifiers that are not UUIDs are rejected.
    #[test]
    fn test_parse_topic_invalid_uuid() {
        let outcome =
            MqttEnergyAdapter::parse_topic("koprogo/not-a-uuid/energy/also-not-a-uuid/electricity");
        assert!(outcome.is_err());
    }

    /// Topics with fewer than five segments are rejected.
    #[test]
    fn test_parse_topic_too_short() {
        let outcome = MqttEnergyAdapter::parse_topic("koprogo/abc/energy");
        assert!(outcome.is_err());
    }

    /// With the variables unset, `from_env` returns the documented defaults.
    #[test]
    fn test_mqtt_config_defaults() {
        // Clear any pre-existing environment variables so the defaults are exercised.
        for key in ["MQTT_HOST", "MQTT_PORT", "MQTT_CLIENT_ID", "MQTT_TOPIC"].iter() {
            std::env::remove_var(key);
        }

        let MqttConfig {
            host,
            port,
            client_id,
            subscribe_topic,
            ..
        } = MqttConfig::from_env();

        assert_eq!(host, "localhost");
        assert_eq!(port, 1883);
        assert_eq!(client_id, "koprogo-backend");
        assert_eq!(subscribe_topic, "koprogo/+/energy/#");
    }
}