koprogo_api/infrastructure/storage/
s3_storage.rs1use super::{metrics::record_storage_operation, StorageProvider};
2use async_trait::async_trait;
3use aws_config::meta::region::RegionProviderChain;
4use aws_config::BehaviorVersion;
5use aws_credential_types::provider::SharedCredentialsProvider;
6use aws_credential_types::Credentials;
7use aws_sdk_s3::config::{Builder as S3ConfigBuilder, Region};
8use aws_sdk_s3::error::SdkError;
9use aws_sdk_s3::primitives::ByteStream;
10use aws_sdk_s3::Client;
11use std::env;
12use std::sync::Arc;
13use std::time::Instant;
14use uuid::Uuid;
15
/// Connection settings for the S3-compatible storage backend.
///
/// `Debug` is implemented by hand (instead of derived) so that `secret_key`
/// never ends up in logs or panic messages.
#[derive(Clone)]
pub struct S3StorageConfig {
    /// Bucket that holds every stored object.
    pub bucket: String,
    /// Explicit region; `None` defers to the SDK's default region chain.
    pub region: Option<String>,
    /// Custom endpoint URL; `None` keeps the SDK default.
    pub endpoint: Option<String>,
    /// Access key id used for static credentials.
    pub access_key: String,
    /// Secret access key used for static credentials (redacted in `Debug`).
    pub secret_key: String,
    /// Whether the client is asked to use path-style addressing.
    pub force_path_style: bool,
    /// Optional prefix placed in front of every generated object key.
    pub key_prefix: Option<String>,
}

impl std::fmt::Debug for S3StorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3StorageConfig")
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("access_key", &self.access_key)
            // Never print the secret itself.
            .field("secret_key", &"<redacted>")
            .field("force_path_style", &self.force_path_style)
            .field("key_prefix", &self.key_prefix)
            .finish()
    }
}

impl S3StorageConfig {
    /// Reads the configuration from the process environment.
    ///
    /// Required: `S3_BUCKET`, `S3_ACCESS_KEY`, `S3_SECRET_KEY`.
    /// Optional: `S3_REGION`, `S3_ENDPOINT`, `S3_KEY_PREFIX`,
    /// `S3_FORCE_PATH_STYLE` (defaults to `true`).
    ///
    /// A variable that is set but blank is treated as unset, so an empty
    /// placeholder does not turn into `Some("")` or an empty bucket name.
    ///
    /// # Errors
    /// Returns a message naming the first required variable that is missing
    /// or blank.
    pub fn from_env() -> Result<Self, String> {
        let bucket = Self::required_var("S3_BUCKET")?;
        let access_key = Self::required_var("S3_ACCESS_KEY")?;
        let secret_key = Self::required_var("S3_SECRET_KEY")?;

        let region = Self::optional_var("S3_REGION");
        let endpoint = Self::optional_var("S3_ENDPOINT");
        let key_prefix = Self::optional_var("S3_KEY_PREFIX");
        let force_path_style = Self::optional_var("S3_FORCE_PATH_STYLE")
            .map_or(true, |raw| Self::parse_flag(&raw, true));

        Ok(Self {
            bucket,
            region,
            endpoint,
            access_key,
            secret_key,
            force_path_style,
            key_prefix,
        })
    }

    /// Returns the variable's value unchanged, or `None` when it is unset,
    /// not valid Unicode, or contains only whitespace.
    fn optional_var(name: &str) -> Option<String> {
        env::var(name).ok().filter(|value| !value.trim().is_empty())
    }

    /// Like `optional_var`, but a missing value is an error.
    fn required_var(name: &str) -> Result<String, String> {
        Self::optional_var(name)
            .ok_or_else(|| format!("{} is required when using s3 storage", name))
    }

    /// Parses a boolean flag leniently (case-insensitive, surrounding
    /// whitespace ignored); unrecognised text falls back to `default`.
    fn parse_flag(raw: &str, default: bool) -> bool {
        match raw.trim().to_ascii_lowercase().as_str() {
            "true" | "1" | "yes" | "on" => true,
            "false" | "0" | "no" | "off" => false,
            _ => default,
        }
    }
}
57
58pub struct S3Storage {
59 client: Client,
60 bucket: String,
61 key_prefix: Option<String>,
62}
63
64impl S3Storage {
65 pub async fn from_config(config: S3StorageConfig) -> Result<Self, String> {
67 let S3StorageConfig {
68 bucket,
69 region,
70 endpoint,
71 access_key,
72 secret_key,
73 force_path_style,
74 key_prefix,
75 } = config;
76
77 let region_provider = if let Some(ref region) = region {
78 RegionProviderChain::first_try(Region::new(region.clone()))
79 } else {
80 RegionProviderChain::default_provider()
81 };
82
83 let credentials = SharedCredentialsProvider::new(Credentials::new(
84 access_key,
85 secret_key,
86 None,
87 None,
88 "koprogo-storage",
89 ));
90
91 let shared_config = aws_config::defaults(BehaviorVersion::latest())
92 .region(region_provider)
93 .credentials_provider(credentials)
94 .load()
95 .await;
96
97 let mut builder = S3ConfigBuilder::from(&shared_config);
98
99 if let Some(region) = region {
100 builder = builder.region(Region::new(region));
101 }
102
103 if let Some(endpoint) = endpoint {
104 builder = builder.endpoint_url(endpoint);
105 }
106
107 if force_path_style {
108 builder = builder.force_path_style(true);
109 }
110
111 let client = Client::from_conf(builder.build());
112
113 Self::ensure_bucket(&client, &bucket).await?;
114
115 Ok(Self {
116 client,
117 bucket,
118 key_prefix,
119 })
120 }
121
122 fn build_key(&self, building_id: Uuid, original_name: &str) -> String {
123 let sanitized = Self::sanitize_filename(original_name);
124 let unique = format!("{}_{}", Uuid::new_v4(), sanitized);
125 let key = format!("{}/{}", building_id, unique);
126 if let Some(prefix) = &self.key_prefix {
127 format!("{}/{}", prefix.trim_end_matches('/'), key)
128 } else {
129 key
130 }
131 }
132
133 fn sanitize_filename(filename: &str) -> String {
134 filename.replace("..", "_").replace(['/', '\\'], "_")
135 }
136}
137
138impl S3Storage {
139 async fn ensure_bucket(client: &Client, bucket: &str) -> Result<(), String> {
140 match client.head_bucket().bucket(bucket).send().await {
141 Ok(_) => Ok(()),
142 Err(SdkError::ServiceError(err)) if err.err().is_not_found() => {
143 match client.create_bucket().bucket(bucket).send().await {
144 Ok(_) => Ok(()),
145 Err(SdkError::ServiceError(err))
146 if err.err().is_bucket_already_exists()
147 || err.err().is_bucket_already_owned_by_you() =>
148 {
149 Ok(())
150 }
151 Err(e) => Err(format!("Failed to create bucket `{}`: {}", bucket, e)),
152 }
153 }
154 Err(e) => Err(format!("Failed to verify bucket `{}`: {}", bucket, e)),
155 }
156 }
157}
158
159#[async_trait]
160impl StorageProvider for S3Storage {
161 async fn save_file(
162 &self,
163 building_id: Uuid,
164 filename: &str,
165 content: &[u8],
166 ) -> Result<String, String> {
167 let start = Instant::now();
168 let key = self.build_key(building_id, filename);
169
170 let result = self
171 .client
172 .put_object()
173 .bucket(&self.bucket)
174 .key(&key)
175 .body(ByteStream::from(content.to_vec()))
176 .send()
177 .await
178 .map_err(|e| format!("Failed to upload object: {}", e))
179 .map(|_| key.clone());
180
181 record_storage_operation(
182 "s3",
183 "save_file",
184 start.elapsed(),
185 result.as_ref().map(|_| ()).map_err(|e| e.as_str()),
186 );
187
188 result
189 }
190
191 async fn read_file(&self, relative_path: &str) -> Result<Vec<u8>, String> {
192 let start = Instant::now();
193 let result = match self
194 .client
195 .get_object()
196 .bucket(&self.bucket)
197 .key(relative_path)
198 .send()
199 .await
200 {
201 Ok(output) => match output.body.collect().await {
202 Ok(data) => Ok(data.into_bytes().to_vec()),
203 Err(e) => Err(format!("Failed to read object body: {}", e)),
204 },
205 Err(e) => Err(format!("Failed to fetch object: {}", e)),
206 };
207
208 record_storage_operation(
209 "s3",
210 "read_file",
211 start.elapsed(),
212 result.as_ref().map(|_| ()).map_err(|e| e.as_str()),
213 );
214
215 result
216 }
217
218 async fn delete_file(&self, relative_path: &str) -> Result<(), String> {
219 let start = Instant::now();
220 let result = self
221 .client
222 .delete_object()
223 .bucket(&self.bucket)
224 .key(relative_path)
225 .send()
226 .await
227 .map_err(|e| format!("Failed to delete object: {}", e))
228 .map(|_| ());
229
230 record_storage_operation(
231 "s3",
232 "delete_file",
233 start.elapsed(),
234 result.as_ref().map(|_| ()).map_err(|e| e.as_str()),
235 );
236
237 result
238 }
239
240 async fn file_exists(&self, relative_path: &str) -> bool {
241 let start = Instant::now();
242 let result = self
243 .client
244 .head_object()
245 .bucket(&self.bucket)
246 .key(relative_path)
247 .send()
248 .await;
249
250 let exists = match result {
251 Ok(_) => true,
252 Err(SdkError::ServiceError(err)) if err.err().is_not_found() => false,
253 Err(_) => false,
254 };
255
256 record_storage_operation("s3", "file_exists", start.elapsed(), Ok(()));
257
258 exists
259 }
260}
261
262impl std::fmt::Debug for S3Storage {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 f.debug_struct("S3Storage")
265 .field("bucket", &self.bucket)
266 .field("key_prefix", &self.key_prefix)
267 .finish()
268 }
269}
270
271pub type SharedS3Storage = Arc<S3Storage>;