koprogo_api/infrastructure/grid/
boinc_grid_adapter.rs1use crate::application::ports::grid_participation_port::{
2 BoincConsent, GridError, GridParticipationPort, GridTask, GridTaskId, GridTaskStatus,
3};
4use async_trait::async_trait;
5use chrono::Utc;
6use sqlx::PgPool;
7use tokio::process::Command;
8use tracing::{info, warn};
9use uuid::Uuid;
10
11pub struct BoincGridAdapter {
21 pool: PgPool,
22 boinc_rpc_password: String,
23 boinc_host: String,
24 boinc_port: u16,
25}
26
27impl BoincGridAdapter {
28 pub fn new(pool: PgPool) -> Self {
29 Self {
30 pool,
31 boinc_rpc_password: std::env::var("BOINC_RPC_PASSWORD").unwrap_or_default(),
32 boinc_host: std::env::var("BOINC_HOST").unwrap_or_else(|_| "localhost".to_string()),
33 boinc_port: std::env::var("BOINC_PORT")
34 .unwrap_or_else(|_| "31416".to_string())
35 .parse()
36 .unwrap_or(31416),
37 }
38 }
39
40 async fn run_boinccmd(&self, args: &[&str]) -> Result<String, GridError> {
43 let mut cmd = Command::new("boinccmd");
44 cmd.arg("--host")
45 .arg(&self.boinc_host)
46 .arg("--port")
47 .arg(self.boinc_port.to_string())
48 .arg("--passwd")
49 .arg(&self.boinc_rpc_password);
50 for arg in args {
51 cmd.arg(arg);
52 }
53 let output = cmd.output().await.map_err(|e| {
54 GridError::ProcessError(format!("boinccmd not found or spawn failed: {e}"))
55 })?;
56 if !output.status.success() {
57 let stderr = String::from_utf8_lossy(&output.stderr);
58 return Err(GridError::RpcFailed(stderr.to_string()));
59 }
60 Ok(String::from_utf8_lossy(&output.stdout).to_string())
61 }
62}
63
64#[async_trait]
65impl GridParticipationPort for BoincGridAdapter {
66 async fn check_consent(&self, owner_id: Uuid) -> Result<bool, GridError> {
67 let row: Option<(bool,)> = sqlx::query_as(
68 r#"SELECT granted FROM boinc_consents
69 WHERE owner_id = $1 AND revoked_at IS NULL
70 ORDER BY granted_at DESC LIMIT 1"#,
71 )
72 .bind(owner_id)
73 .fetch_optional(&self.pool)
74 .await
75 .map_err(|e| GridError::RpcFailed(e.to_string()))?;
76
77 Ok(row.map(|(g,)| g).unwrap_or(false))
78 }
79
80 async fn get_consent(&self, owner_id: Uuid) -> Result<Option<BoincConsent>, GridError> {
81 let row: Option<(
82 Uuid,
83 Uuid,
84 bool,
85 Option<chrono::DateTime<Utc>>,
86 Option<chrono::DateTime<Utc>>,
87 Option<String>,
88 String,
89 )> = sqlx::query_as(
90 r#"SELECT owner_id, organization_id, granted, granted_at, revoked_at,
91 consent_ip, consent_version
92 FROM boinc_consents
93 WHERE owner_id = $1
94 ORDER BY COALESCE(granted_at, created_at) DESC
95 LIMIT 1"#,
96 )
97 .bind(owner_id)
98 .fetch_optional(&self.pool)
99 .await
100 .map_err(|e| GridError::RpcFailed(e.to_string()))?;
101
102 Ok(row.map(
103 |(oid, org_id, granted, granted_at, revoked_at, consent_ip, consent_version)| {
104 BoincConsent {
105 owner_id: oid,
106 organization_id: org_id,
107 granted,
108 granted_at,
109 revoked_at,
110 consent_ip,
111 consent_version,
112 }
113 },
114 ))
115 }
116
117 async fn grant_consent(
118 &self,
119 owner_id: Uuid,
120 organization_id: Uuid,
121 consent_version: &str,
122 consent_ip: Option<&str>,
123 ) -> Result<BoincConsent, GridError> {
124 let now = Utc::now();
125 sqlx::query(
126 r#"INSERT INTO boinc_consents
127 (id, owner_id, organization_id, granted, granted_at, consent_ip, consent_version)
128 VALUES ($1, $2, $3, true, $4, $5, $6)
129 ON CONFLICT (owner_id) DO UPDATE
130 SET granted = true,
131 granted_at = $4,
132 revoked_at = NULL,
133 consent_ip = $5,
134 consent_version = $6,
135 updated_at = NOW()"#,
136 )
137 .bind(Uuid::new_v4())
138 .bind(owner_id)
139 .bind(organization_id)
140 .bind(now)
141 .bind(consent_ip)
142 .bind(consent_version)
143 .execute(&self.pool)
144 .await
145 .map_err(|e| GridError::RpcFailed(e.to_string()))?;
146
147 info!("BOINC consent granted for owner {}", owner_id);
148 Ok(BoincConsent {
149 owner_id,
150 organization_id,
151 granted: true,
152 granted_at: Some(now),
153 revoked_at: None,
154 consent_ip: consent_ip.map(|s| s.to_string()),
155 consent_version: consent_version.to_string(),
156 })
157 }
158
159 async fn revoke_consent(&self, owner_id: Uuid) -> Result<(), GridError> {
160 sqlx::query(
161 r#"UPDATE boinc_consents
162 SET granted = false, revoked_at = NOW(), updated_at = NOW()
163 WHERE owner_id = $1"#,
164 )
165 .bind(owner_id)
166 .execute(&self.pool)
167 .await
168 .map_err(|e| GridError::RpcFailed(e.to_string()))?;
169
170 info!("BOINC consent revoked for owner {}", owner_id);
171 Ok(())
172 }
173
174 async fn submit_task(&self, task: GridTask) -> Result<GridTaskId, GridError> {
175 let kind_json =
176 serde_json::to_value(&task.kind).map_err(|e| GridError::ProcessError(e.to_string()))?;
177
178 sqlx::query(
179 r#"INSERT INTO grid_tasks
180 (id, copropriete_id, organization_id, kind_json, status, priority, deadline_at)
181 VALUES ($1, $2, $3, $4, 'queued', $5, $6)"#,
182 )
183 .bind(task.internal_id)
184 .bind(task.copropriete_id)
185 .bind(task.organization_id)
186 .bind(kind_json)
187 .bind(task.priority as i32)
188 .bind(task.deadline)
189 .execute(&self.pool)
190 .await
191 .map_err(|e| GridError::RpcFailed(e.to_string()))?;
192
193 match self.run_boinccmd(&["--get_simple_gui_info"]).await {
195 Ok(_) => info!("BOINC daemon reachable, task {} queued", task.internal_id),
196 Err(e) => warn!(
197 "BOINC daemon not reachable (task stored in DB for retry): {}",
198 e
199 ),
200 }
201
202 Ok(GridTaskId(task.internal_id.to_string()))
203 }
204
205 async fn poll_result(&self, task_id: &GridTaskId) -> Result<GridTaskStatus, GridError> {
206 let id =
207 Uuid::parse_str(&task_id.0).map_err(|_| GridError::TaskNotFound(task_id.0.clone()))?;
208
209 let row: Option<(
210 String,
211 Option<String>,
212 Option<chrono::DateTime<Utc>>,
213 Option<chrono::DateTime<Utc>>,
214 Option<chrono::DateTime<Utc>>,
215 Option<String>,
216 )> = sqlx::query_as(
217 r#"SELECT status, result_json::text,
218 started_at, completed_at, failed_at, failure_reason
219 FROM grid_tasks WHERE id = $1"#,
220 )
221 .bind(id)
222 .fetch_optional(&self.pool)
223 .await
224 .map_err(|e| GridError::RpcFailed(e.to_string()))?;
225
226 let (status, result_json, started_at, completed_at, failed_at, failure_reason) =
227 row.ok_or_else(|| GridError::TaskNotFound(task_id.0.clone()))?;
228
229 Ok(match status.as_str() {
230 "running" => GridTaskStatus::Running {
231 started_at: started_at.unwrap_or_else(Utc::now),
232 },
233 "completed" => GridTaskStatus::Completed {
234 completed_at: completed_at.unwrap_or_else(Utc::now),
235 result_json: result_json.unwrap_or_default(),
236 },
237 "failed" => GridTaskStatus::Failed {
238 failed_at: failed_at.unwrap_or_else(Utc::now),
239 reason: failure_reason.unwrap_or_default(),
240 },
241 "cancelled" => GridTaskStatus::Cancelled,
242 _ => GridTaskStatus::Queued,
243 })
244 }
245
246 async fn cancel_task(&self, task_id: &GridTaskId) -> Result<(), GridError> {
247 let id =
248 Uuid::parse_str(&task_id.0).map_err(|_| GridError::TaskNotFound(task_id.0.clone()))?;
249
250 sqlx::query("UPDATE grid_tasks SET status = 'cancelled', updated_at = NOW() WHERE id = $1")
251 .bind(id)
252 .execute(&self.pool)
253 .await
254 .map_err(|e| GridError::RpcFailed(e.to_string()))?;
255
256 Ok(())
257 }
258}