koprogo_api/infrastructure/grid/
boinc_grid_adapter.rs

1use crate::application::ports::grid_participation_port::{
2    BoincConsent, GridError, GridParticipationPort, GridTask, GridTaskId, GridTaskStatus,
3};
4use async_trait::async_trait;
5use chrono::Utc;
6use sqlx::PgPool;
7use tokio::process::Command;
8use tracing::{info, warn};
9use uuid::Uuid;
10
11/// Adapter BOINC: implémente GridParticipationPort via boinccmd + PostgreSQL.
12///
13/// Consentement GDPR stocké dans la table `boinc_consents`.
14/// Tâches stockées dans `grid_tasks` avec statut polling.
15///
16/// Variables d'environnement:
17/// - BOINC_HOST (défaut: "localhost")
18/// - BOINC_PORT (défaut: 31416)
19/// - BOINC_RPC_PASSWORD
20pub struct BoincGridAdapter {
21    pool: PgPool,
22    boinc_rpc_password: String,
23    boinc_host: String,
24    boinc_port: u16,
25}
26
27impl BoincGridAdapter {
28    pub fn new(pool: PgPool) -> Self {
29        Self {
30            pool,
31            boinc_rpc_password: std::env::var("BOINC_RPC_PASSWORD").unwrap_or_default(),
32            boinc_host: std::env::var("BOINC_HOST").unwrap_or_else(|_| "localhost".to_string()),
33            boinc_port: std::env::var("BOINC_PORT")
34                .unwrap_or_else(|_| "31416".to_string())
35                .parse()
36                .unwrap_or(31416),
37        }
38    }
39
40    /// Exécute boinccmd avec les arguments donnés.
41    /// Retourne stdout en cas de succès, GridError::RpcFailed en cas d'échec.
42    async fn run_boinccmd(&self, args: &[&str]) -> Result<String, GridError> {
43        let mut cmd = Command::new("boinccmd");
44        cmd.arg("--host")
45            .arg(&self.boinc_host)
46            .arg("--port")
47            .arg(self.boinc_port.to_string())
48            .arg("--passwd")
49            .arg(&self.boinc_rpc_password);
50        for arg in args {
51            cmd.arg(arg);
52        }
53        let output = cmd.output().await.map_err(|e| {
54            GridError::ProcessError(format!("boinccmd not found or spawn failed: {e}"))
55        })?;
56        if !output.status.success() {
57            let stderr = String::from_utf8_lossy(&output.stderr);
58            return Err(GridError::RpcFailed(stderr.to_string()));
59        }
60        Ok(String::from_utf8_lossy(&output.stdout).to_string())
61    }
62}
63
64#[async_trait]
65impl GridParticipationPort for BoincGridAdapter {
66    async fn check_consent(&self, owner_id: Uuid) -> Result<bool, GridError> {
67        let row: Option<(bool,)> = sqlx::query_as(
68            r#"SELECT granted FROM boinc_consents
69               WHERE owner_id = $1 AND revoked_at IS NULL
70               ORDER BY granted_at DESC LIMIT 1"#,
71        )
72        .bind(owner_id)
73        .fetch_optional(&self.pool)
74        .await
75        .map_err(|e| GridError::RpcFailed(e.to_string()))?;
76
77        Ok(row.map(|(g,)| g).unwrap_or(false))
78    }
79
80    async fn get_consent(&self, owner_id: Uuid) -> Result<Option<BoincConsent>, GridError> {
81        let row: Option<(
82            Uuid,
83            Uuid,
84            bool,
85            Option<chrono::DateTime<Utc>>,
86            Option<chrono::DateTime<Utc>>,
87            Option<String>,
88            String,
89        )> = sqlx::query_as(
90            r#"SELECT owner_id, organization_id, granted, granted_at, revoked_at,
91                      consent_ip, consent_version
92               FROM boinc_consents
93               WHERE owner_id = $1
94               ORDER BY COALESCE(granted_at, created_at) DESC
95               LIMIT 1"#,
96        )
97        .bind(owner_id)
98        .fetch_optional(&self.pool)
99        .await
100        .map_err(|e| GridError::RpcFailed(e.to_string()))?;
101
102        Ok(row.map(
103            |(oid, org_id, granted, granted_at, revoked_at, consent_ip, consent_version)| {
104                BoincConsent {
105                    owner_id: oid,
106                    organization_id: org_id,
107                    granted,
108                    granted_at,
109                    revoked_at,
110                    consent_ip,
111                    consent_version,
112                }
113            },
114        ))
115    }
116
117    async fn grant_consent(
118        &self,
119        owner_id: Uuid,
120        organization_id: Uuid,
121        consent_version: &str,
122        consent_ip: Option<&str>,
123    ) -> Result<BoincConsent, GridError> {
124        let now = Utc::now();
125        sqlx::query(
126            r#"INSERT INTO boinc_consents
127               (id, owner_id, organization_id, granted, granted_at, consent_ip, consent_version)
128               VALUES ($1, $2, $3, true, $4, $5, $6)
129               ON CONFLICT (owner_id) DO UPDATE
130               SET granted = true,
131                   granted_at = $4,
132                   revoked_at = NULL,
133                   consent_ip = $5,
134                   consent_version = $6,
135                   updated_at = NOW()"#,
136        )
137        .bind(Uuid::new_v4())
138        .bind(owner_id)
139        .bind(organization_id)
140        .bind(now)
141        .bind(consent_ip)
142        .bind(consent_version)
143        .execute(&self.pool)
144        .await
145        .map_err(|e| GridError::RpcFailed(e.to_string()))?;
146
147        info!("BOINC consent granted for owner {}", owner_id);
148        Ok(BoincConsent {
149            owner_id,
150            organization_id,
151            granted: true,
152            granted_at: Some(now),
153            revoked_at: None,
154            consent_ip: consent_ip.map(|s| s.to_string()),
155            consent_version: consent_version.to_string(),
156        })
157    }
158
159    async fn revoke_consent(&self, owner_id: Uuid) -> Result<(), GridError> {
160        sqlx::query(
161            r#"UPDATE boinc_consents
162               SET granted = false, revoked_at = NOW(), updated_at = NOW()
163               WHERE owner_id = $1"#,
164        )
165        .bind(owner_id)
166        .execute(&self.pool)
167        .await
168        .map_err(|e| GridError::RpcFailed(e.to_string()))?;
169
170        info!("BOINC consent revoked for owner {}", owner_id);
171        Ok(())
172    }
173
174    async fn submit_task(&self, task: GridTask) -> Result<GridTaskId, GridError> {
175        let kind_json =
176            serde_json::to_value(&task.kind).map_err(|e| GridError::ProcessError(e.to_string()))?;
177
178        sqlx::query(
179            r#"INSERT INTO grid_tasks
180               (id, copropriete_id, organization_id, kind_json, status, priority, deadline_at)
181               VALUES ($1, $2, $3, $4, 'queued', $5, $6)"#,
182        )
183        .bind(task.internal_id)
184        .bind(task.copropriete_id)
185        .bind(task.organization_id)
186        .bind(kind_json)
187        .bind(task.priority as i32)
188        .bind(task.deadline)
189        .execute(&self.pool)
190        .await
191        .map_err(|e| GridError::RpcFailed(e.to_string()))?;
192
193        // Tentative best-effort de notifier BOINC (ne bloque pas si boinccmd absent)
194        match self.run_boinccmd(&["--get_simple_gui_info"]).await {
195            Ok(_) => info!("BOINC daemon reachable, task {} queued", task.internal_id),
196            Err(e) => warn!(
197                "BOINC daemon not reachable (task stored in DB for retry): {}",
198                e
199            ),
200        }
201
202        Ok(GridTaskId(task.internal_id.to_string()))
203    }
204
205    async fn poll_result(&self, task_id: &GridTaskId) -> Result<GridTaskStatus, GridError> {
206        let id =
207            Uuid::parse_str(&task_id.0).map_err(|_| GridError::TaskNotFound(task_id.0.clone()))?;
208
209        let row: Option<(
210            String,
211            Option<String>,
212            Option<chrono::DateTime<Utc>>,
213            Option<chrono::DateTime<Utc>>,
214            Option<chrono::DateTime<Utc>>,
215            Option<String>,
216        )> = sqlx::query_as(
217            r#"SELECT status, result_json::text,
218                      started_at, completed_at, failed_at, failure_reason
219               FROM grid_tasks WHERE id = $1"#,
220        )
221        .bind(id)
222        .fetch_optional(&self.pool)
223        .await
224        .map_err(|e| GridError::RpcFailed(e.to_string()))?;
225
226        let (status, result_json, started_at, completed_at, failed_at, failure_reason) =
227            row.ok_or_else(|| GridError::TaskNotFound(task_id.0.clone()))?;
228
229        Ok(match status.as_str() {
230            "running" => GridTaskStatus::Running {
231                started_at: started_at.unwrap_or_else(Utc::now),
232            },
233            "completed" => GridTaskStatus::Completed {
234                completed_at: completed_at.unwrap_or_else(Utc::now),
235                result_json: result_json.unwrap_or_default(),
236            },
237            "failed" => GridTaskStatus::Failed {
238                failed_at: failed_at.unwrap_or_else(Utc::now),
239                reason: failure_reason.unwrap_or_default(),
240            },
241            "cancelled" => GridTaskStatus::Cancelled,
242            _ => GridTaskStatus::Queued,
243        })
244    }
245
246    async fn cancel_task(&self, task_id: &GridTaskId) -> Result<(), GridError> {
247        let id =
248            Uuid::parse_str(&task_id.0).map_err(|_| GridError::TaskNotFound(task_id.0.clone()))?;
249
250        sqlx::query("UPDATE grid_tasks SET status = 'cancelled', updated_at = NOW() WHERE id = $1")
251            .bind(id)
252            .execute(&self.pool)
253            .await
254            .map_err(|e| GridError::RpcFailed(e.to_string()))?;
255
256        Ok(())
257    }
258}