koprogo_api/infrastructure/database/repositories/
owner_credit_balance_repository_impl.rs1use crate::application::ports::OwnerCreditBalanceRepository;
2use crate::domain::entities::OwnerCreditBalance;
3use crate::infrastructure::database::pool::DbPool;
4use async_trait::async_trait;
5use sqlx::Row;
6use uuid::Uuid;
7
8pub struct PostgresOwnerCreditBalanceRepository {
9 pool: DbPool,
10}
11
12impl PostgresOwnerCreditBalanceRepository {
13 pub fn new(pool: DbPool) -> Self {
14 Self { pool }
15 }
16}
17
18#[async_trait]
19impl OwnerCreditBalanceRepository for PostgresOwnerCreditBalanceRepository {
20 async fn create(&self, balance: &OwnerCreditBalance) -> Result<OwnerCreditBalance, String> {
21 sqlx::query(
22 r#"
23 INSERT INTO owner_credit_balances (
24 owner_id, building_id, credits_earned, credits_spent,
25 balance, total_exchanges, average_rating,
26 created_at, updated_at
27 )
28 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
29 "#,
30 )
31 .bind(balance.owner_id)
32 .bind(balance.building_id)
33 .bind(balance.credits_earned)
34 .bind(balance.credits_spent)
35 .bind(balance.balance)
36 .bind(balance.total_exchanges)
37 .bind(balance.average_rating)
38 .bind(balance.created_at)
39 .bind(balance.updated_at)
40 .execute(&self.pool)
41 .await
42 .map_err(|e| format!("Database error: {}", e))?;
43
44 Ok(balance.clone())
45 }
46
47 async fn find_by_owner_and_building(
48 &self,
49 owner_id: Uuid,
50 building_id: Uuid,
51 ) -> Result<Option<OwnerCreditBalance>, String> {
52 let row = sqlx::query(
53 r#"
54 SELECT
55 owner_id, building_id, credits_earned, credits_spent,
56 balance, total_exchanges, average_rating,
57 created_at, updated_at
58 FROM owner_credit_balances
59 WHERE owner_id = $1 AND building_id = $2
60 "#,
61 )
62 .bind(owner_id)
63 .bind(building_id)
64 .fetch_optional(&self.pool)
65 .await
66 .map_err(|e| format!("Database error: {}", e))?;
67
68 Ok(row.map(|row| map_row_to_balance(&row)))
69 }
70
71 async fn find_by_building(&self, building_id: Uuid) -> Result<Vec<OwnerCreditBalance>, String> {
72 let rows = sqlx::query(
73 r#"
74 SELECT
75 owner_id, building_id, credits_earned, credits_spent,
76 balance, total_exchanges, average_rating,
77 created_at, updated_at
78 FROM owner_credit_balances
79 WHERE building_id = $1
80 ORDER BY balance DESC
81 "#,
82 )
83 .bind(building_id)
84 .fetch_all(&self.pool)
85 .await
86 .map_err(|e| format!("Database error: {}", e))?;
87
88 Ok(rows.iter().map(map_row_to_balance).collect())
89 }
90
91 async fn find_by_owner(&self, owner_id: Uuid) -> Result<Vec<OwnerCreditBalance>, String> {
92 let rows = sqlx::query(
93 r#"
94 SELECT
95 owner_id, building_id, credits_earned, credits_spent,
96 balance, total_exchanges, average_rating,
97 created_at, updated_at
98 FROM owner_credit_balances
99 WHERE owner_id = $1
100 ORDER BY building_id
101 "#,
102 )
103 .bind(owner_id)
104 .fetch_all(&self.pool)
105 .await
106 .map_err(|e| format!("Database error: {}", e))?;
107
108 Ok(rows.iter().map(map_row_to_balance).collect())
109 }
110
111 async fn get_or_create(
112 &self,
113 owner_id: Uuid,
114 building_id: Uuid,
115 ) -> Result<OwnerCreditBalance, String> {
116 if let Some(balance) = self
118 .find_by_owner_and_building(owner_id, building_id)
119 .await?
120 {
121 return Ok(balance);
122 }
123
124 let new_balance = OwnerCreditBalance::new(owner_id, building_id);
126 self.create(&new_balance).await
127 }
128
129 async fn update(&self, balance: &OwnerCreditBalance) -> Result<OwnerCreditBalance, String> {
130 sqlx::query(
131 r#"
132 UPDATE owner_credit_balances
133 SET
134 credits_earned = $3,
135 credits_spent = $4,
136 balance = $5,
137 total_exchanges = $6,
138 average_rating = $7,
139 updated_at = $8
140 WHERE owner_id = $1 AND building_id = $2
141 "#,
142 )
143 .bind(balance.owner_id)
144 .bind(balance.building_id)
145 .bind(balance.credits_earned)
146 .bind(balance.credits_spent)
147 .bind(balance.balance)
148 .bind(balance.total_exchanges)
149 .bind(balance.average_rating)
150 .bind(balance.updated_at)
151 .execute(&self.pool)
152 .await
153 .map_err(|e| format!("Database error: {}", e))?;
154
155 Ok(balance.clone())
156 }
157
158 async fn delete(&self, owner_id: Uuid, building_id: Uuid) -> Result<bool, String> {
159 let result = sqlx::query(
160 "DELETE FROM owner_credit_balances WHERE owner_id = $1 AND building_id = $2",
161 )
162 .bind(owner_id)
163 .bind(building_id)
164 .execute(&self.pool)
165 .await
166 .map_err(|e| format!("Database error: {}", e))?;
167
168 Ok(result.rows_affected() > 0)
169 }
170
171 async fn get_leaderboard(
172 &self,
173 building_id: Uuid,
174 limit: i32,
175 ) -> Result<Vec<OwnerCreditBalance>, String> {
176 let rows = sqlx::query(
177 r#"
178 SELECT
179 owner_id, building_id, credits_earned, credits_spent,
180 balance, total_exchanges, average_rating,
181 created_at, updated_at
182 FROM owner_credit_balances
183 WHERE building_id = $1
184 ORDER BY balance DESC
185 LIMIT $2
186 "#,
187 )
188 .bind(building_id)
189 .bind(limit)
190 .fetch_all(&self.pool)
191 .await
192 .map_err(|e| format!("Database error: {}", e))?;
193
194 Ok(rows.iter().map(map_row_to_balance).collect())
195 }
196
197 async fn count_active_participants(&self, building_id: Uuid) -> Result<i64, String> {
198 let count: i64 = sqlx::query_scalar(
199 r#"
200 SELECT COUNT(*)
201 FROM owner_credit_balances
202 WHERE building_id = $1 AND total_exchanges > 0
203 "#,
204 )
205 .bind(building_id)
206 .fetch_one(&self.pool)
207 .await
208 .map_err(|e| format!("Database error: {}", e))?;
209
210 Ok(count)
211 }
212
213 async fn get_total_credits_in_circulation(&self, building_id: Uuid) -> Result<i32, String> {
214 let total: Option<i64> = sqlx::query_scalar(
215 r#"
216 SELECT COALESCE(SUM(ABS(balance)), 0)
217 FROM owner_credit_balances
218 WHERE building_id = $1
219 "#,
220 )
221 .bind(building_id)
222 .fetch_one(&self.pool)
223 .await
224 .map_err(|e| format!("Database error: {}", e))?;
225
226 Ok(total.unwrap_or(0) as i32)
227 }
228}
229
230fn map_row_to_balance(row: &sqlx::postgres::PgRow) -> OwnerCreditBalance {
232 OwnerCreditBalance {
233 owner_id: row.get("owner_id"),
234 building_id: row.get("building_id"),
235 credits_earned: row.get("credits_earned"),
236 credits_spent: row.get("credits_spent"),
237 balance: row.get("balance"),
238 total_exchanges: row.get("total_exchanges"),
239 average_rating: row.get("average_rating"),
240 created_at: row.get("created_at"),
241 updated_at: row.get("updated_at"),
242 }
243}