koprogo_api/infrastructure/mqtt/
mqtt_energy_adapter.rs1use crate::application::ports::iot_repository::IoTRepository;
2use crate::application::ports::mqtt_energy_port::{
3 MqttEnergyPort, MqttError, MqttIncomingReadingDto,
4};
5use crate::domain::entities::iot_reading::IoTReading;
6use async_trait::async_trait;
7use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tokio::task::JoinHandle;
11use tracing::{error, info, warn};
12use uuid::Uuid;
13
/// Connection settings for the MQTT broker the backend listens to.
///
/// `Debug` is implemented by hand rather than derived so that the broker
/// password never ends up in logs or panic messages.
#[derive(Clone)]
pub struct MqttConfig {
    /// Broker host name or address.
    pub host: String,
    /// Broker TCP port.
    pub port: u16,
    /// Client identifier presented to the broker.
    pub client_id: String,
    /// User name for broker authentication; an empty string means that no
    /// credentials are configured on the connection.
    pub username: String,
    /// Password paired with `username`.
    pub password: String,
    /// Topic filter the listener subscribes to.
    pub subscribe_topic: String,
}

impl std::fmt::Debug for MqttConfig {
    /// Formats every field except `password`, which is replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MqttConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("client_id", &self.client_id)
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .field("subscribe_topic", &self.subscribe_topic)
            .finish()
    }
}
31
32impl MqttConfig {
33 pub fn from_env() -> Self {
34 Self {
35 host: std::env::var("MQTT_HOST").unwrap_or_else(|_| "localhost".to_string()),
36 port: std::env::var("MQTT_PORT")
37 .unwrap_or_else(|_| "1883".to_string())
38 .parse()
39 .unwrap_or(1883),
40 client_id: std::env::var("MQTT_CLIENT_ID")
41 .unwrap_or_else(|_| "koprogo-backend".to_string()),
42 username: std::env::var("MQTT_USERNAME").unwrap_or_default(),
43 password: std::env::var("MQTT_PASSWORD").unwrap_or_default(),
44 subscribe_topic: std::env::var("MQTT_TOPIC")
45 .unwrap_or_else(|_| "koprogo/+/energy/#".to_string()),
46 }
47 }
48}
49
50pub struct MqttEnergyAdapter {
56 config: MqttConfig,
57 iot_repo: Arc<dyn IoTRepository>,
58 listener_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
59}
60
61impl MqttEnergyAdapter {
62 pub fn new(config: MqttConfig, iot_repo: Arc<dyn IoTRepository>) -> Self {
63 Self {
64 config,
65 iot_repo,
66 listener_handle: Arc::new(Mutex::new(None)),
67 }
68 }
69
70 pub fn parse_topic(topic: &str) -> Result<(Uuid, Uuid), MqttError> {
75 let parts: Vec<&str> = topic.split('/').collect();
76 if parts.len() < 5 || parts[0] != "koprogo" || parts[2] != "energy" {
77 return Err(MqttError::InvalidTopic(format!(
78 "Expected 'koprogo/{{copropriete_id}}/energy/{{unit_id}}/{{metric}}', got: {}",
79 topic
80 )));
81 }
82 let copropriete_id = Uuid::parse_str(parts[1]).map_err(|_| {
83 MqttError::InvalidTopic(format!("Invalid copropriete_id UUID: {}", parts[1]))
84 })?;
85 let unit_id = Uuid::parse_str(parts[3])
86 .map_err(|_| MqttError::InvalidTopic(format!("Invalid unit_id UUID: {}", parts[3])))?;
87 Ok((copropriete_id, unit_id))
88 }
89}
90
91#[async_trait]
92impl MqttEnergyPort for MqttEnergyAdapter {
93 async fn start_listening(&self) -> Result<(), MqttError> {
94 let mut handle = self.listener_handle.lock().await;
95 if handle.is_some() {
96 return Err(MqttError::AlreadyRunning);
97 }
98
99 let mut opts =
100 MqttOptions::new(&self.config.client_id, &self.config.host, self.config.port);
101 if !self.config.username.is_empty() {
102 opts.set_credentials(&self.config.username, &self.config.password);
103 }
104 opts.set_keep_alive(std::time::Duration::from_secs(30));
105 opts.set_clean_session(false);
107
108 let (client, mut eventloop) = AsyncClient::new(opts, 128);
109
110 client
111 .subscribe(&self.config.subscribe_topic, QoS::AtLeastOnce)
112 .await
113 .map_err(|e| MqttError::SubscriptionFailed {
114 topic: self.config.subscribe_topic.clone(),
115 reason: e.to_string(),
116 })?;
117
118 let iot_repo = Arc::clone(&self.iot_repo);
119 let topic_pattern = self.config.subscribe_topic.clone();
120
121 let join_handle = tokio::spawn(async move {
122 info!("MQTT listener started on topic: {}", topic_pattern);
123 loop {
124 match eventloop.poll().await {
125 Ok(Event::Incoming(Packet::Publish(msg))) => {
126 let topic = msg.topic.clone();
127 match Self::parse_topic(&topic) {
128 Ok((copropriete_id, _unit_id)) => {
129 match serde_json::from_slice::<MqttIncomingReadingDto>(&msg.payload)
130 {
131 Ok(dto) => {
132 match IoTReading::new(
133 copropriete_id,
134 dto.device_type,
135 dto.metric_type,
136 dto.value,
137 dto.unit.clone(),
138 dto.ts,
139 "mqtt_home_assistant".to_string(),
140 ) {
141 Ok(reading) => {
142 if let Err(e) =
143 iot_repo.create_reading(&reading).await
144 {
145 error!(
146 "Failed to persist MQTT reading from {}: {}",
147 topic, e
148 );
149 }
150 }
151 Err(e) => {
152 warn!(
153 "Domain validation failed for MQTT reading from {}: {}",
154 topic, e
155 );
156 }
157 }
158 }
159 Err(e) => {
160 warn!("Failed to parse MQTT payload on {}: {}", topic, e);
161 }
162 }
163 }
164 Err(e) => {
165 warn!("Invalid MQTT topic {}: {}", topic, e);
166 }
167 }
168 }
169 Ok(Event::Incoming(Packet::ConnAck(_))) => {
170 info!("MQTT connected to broker");
171 }
172 Ok(Event::Incoming(Packet::Disconnect)) => {
173 warn!("MQTT disconnected, rumqttc will reconnect automatically");
174 }
175 Err(e) => {
176 error!("MQTT event loop error: {}", e);
177 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
178 }
179 _ => {}
180 }
181 }
182 });
183
184 *handle = Some(join_handle);
185 info!(
186 "MQTT listener spawned for topic pattern: {}",
187 self.config.subscribe_topic
188 );
189 Ok(())
190 }
191
192 async fn stop_listening(&self) -> Result<(), MqttError> {
193 let mut handle = self.listener_handle.lock().await;
194 if let Some(h) = handle.take() {
195 h.abort();
196 info!("MQTT listener stopped");
197 }
198 Ok(())
199 }
200
201 async fn publish_alert(
202 &self,
203 _copropriete_id: Uuid,
204 alert_type: &str,
205 _payload: &str,
206 ) -> Result<(), MqttError> {
207 info!("MQTT alert publish queued (not yet wired): {}", alert_type);
209 Ok(())
210 }
211
212 async fn is_running(&self) -> bool {
213 self.listener_handle.lock().await.is_some()
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed topic yields both identifiers in topic order.
    #[test]
    fn test_parse_topic_valid() {
        let copro = Uuid::new_v4();
        let unit = Uuid::new_v4();
        let topic = format!("koprogo/{}/energy/{}/electricity", copro, unit);

        let parsed = MqttEnergyAdapter::parse_topic(&topic).unwrap();

        assert_eq!(parsed.0, copro);
        assert_eq!(parsed.1, unit);
    }

    /// The first segment must be the literal `koprogo`.
    #[test]
    fn test_parse_topic_invalid_prefix() {
        let outcome = MqttEnergyAdapter::parse_topic("other/abc/energy/def/electricity");
        assert!(outcome.is_err());
    }

    /// The third segment must be the literal `energy`.
    #[test]
    fn test_parse_topic_missing_energy_segment() {
        let outcome = MqttEnergyAdapter::parse_topic("koprogo/abc-def/readings/xyz/power");
        assert!(outcome.is_err());
    }

    /// Identifiers that are not UUIDs are rejected.
    #[test]
    fn test_parse_topic_invalid_uuid() {
        let outcome =
            MqttEnergyAdapter::parse_topic("koprogo/not-a-uuid/energy/also-not-a-uuid/electricity");
        assert!(outcome.is_err());
    }

    /// Topics with fewer than five segments are rejected.
    #[test]
    fn test_parse_topic_too_short() {
        let outcome = MqttEnergyAdapter::parse_topic("koprogo/abc/energy");
        assert!(outcome.is_err());
    }

    /// With the relevant variables unset, `from_env` falls back to its defaults.
    #[test]
    fn test_mqtt_config_defaults() {
        for key in ["MQTT_HOST", "MQTT_PORT", "MQTT_CLIENT_ID", "MQTT_TOPIC"] {
            std::env::remove_var(key);
        }

        let MqttConfig {
            host,
            port,
            client_id,
            subscribe_topic,
            ..
        } = MqttConfig::from_env();

        assert_eq!(host, "localhost");
        assert_eq!(port, 1883);
        assert_eq!(client_id, "koprogo-backend");
        assert_eq!(subscribe_topic, "koprogo/+/energy/#");
    }
}