1pub mod accounts;
8mod cache;
9pub mod channels;
10pub mod corrupted_channels;
11pub mod db;
12pub mod errors;
13pub mod info;
14pub mod logs;
15pub mod peers;
16pub mod protocol;
17pub mod registry;
18pub mod resolver;
19mod ticket_manager;
20pub mod tickets;
21
22use std::path::PathBuf;
23
24use async_trait::async_trait;
25use futures::future::BoxFuture;
26pub use hopr_db_api as api;
27use hopr_db_api::{
28 logs::HoprDbLogOperations, peers::HoprDbPeersOperations, protocol::HoprDbProtocolOperations,
29 resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations,
30};
31use sea_orm::{ConnectionTrait, TransactionTrait};
32pub use sea_orm::{DatabaseConnection, DatabaseTransaction};
33
34use crate::{
35 accounts::HoprDbAccountOperations,
36 channels::HoprDbChannelOperations,
37 corrupted_channels::HoprDbCorruptedChannelOperations,
38 db::HoprDb,
39 errors::{DbSqlError, Result},
40 info::HoprDbInfoOperations,
41 registry::HoprDbRegistryOperations,
42};
43
44pub const SINGULAR_TABLE_FIXED_ID: i32 = 1;
46
47pub type DbTimestamp = chrono::DateTime<chrono::Utc>;
49
50#[derive(Debug)]
55pub struct OpenTransaction(DatabaseTransaction, TargetDb);
56
57impl OpenTransaction {
58 #[tracing::instrument(level = "trace", name = "Sql::perform_in_transaction", skip_all, err)]
61 pub async fn perform<F, T, E>(self, callback: F) -> std::result::Result<T, E>
62 where
63 F: for<'c> FnOnce(&'c OpenTransaction) -> BoxFuture<'c, std::result::Result<T, E>> + Send,
64 T: Send,
65 E: std::error::Error + From<DbSqlError>,
66 {
67 let start = std::time::Instant::now();
68 let res = callback(&self).await;
69
70 if res.is_ok() {
71 self.commit().await?;
72 } else {
73 self.rollback().await?;
74 }
75
76 tracing::trace!(
77 elapsed_ms = start.elapsed().as_millis(),
78 was_successful = res.is_ok(),
79 "transaction completed",
80 );
81
82 res
83 }
84
85 pub async fn commit(self) -> Result<()> {
87 Ok(self.0.commit().await?)
88 }
89
90 pub async fn rollback(self) -> Result<()> {
92 Ok(self.0.rollback().await?)
93 }
94}
95
96impl AsRef<DatabaseTransaction> for OpenTransaction {
97 fn as_ref(&self) -> &DatabaseTransaction {
98 &self.0
99 }
100}
101
102impl From<OpenTransaction> for DatabaseTransaction {
103 fn from(value: OpenTransaction) -> Self {
104 value.0
105 }
106}
107
108pub type OptTx<'a> = Option<&'a OpenTransaction>;
111
112#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
116pub enum TargetDb {
117 #[default]
118 Index,
120 Tickets,
122 Peers,
124 Logs,
126}
127
128#[async_trait]
129pub trait HoprDbGeneralModelOperations {
130 fn conn(&self, target_db: TargetDb) -> &DatabaseConnection;
135
136 async fn begin_transaction_in_db(&self, target: TargetDb) -> Result<OpenTransaction>;
138
139 async fn import_logs_db(self, src_dir: PathBuf) -> Result<()>;
180
181 async fn begin_transaction(&self) -> Result<OpenTransaction> {
183 self.begin_transaction_in_db(Default::default()).await
184 }
185
186 async fn nest_transaction_in_db(&self, tx: OptTx<'_>, target_db: TargetDb) -> Result<OpenTransaction> {
196 if let Some(t) = tx {
197 assert_eq!(t.1, target_db, "attempt to create nest into tx from a different db");
198 Ok(OpenTransaction(t.as_ref().begin().await?, target_db))
199 } else {
200 self.begin_transaction_in_db(target_db).await
201 }
202 }
203
204 async fn nest_transaction(&self, tx: OptTx<'_>) -> Result<OpenTransaction> {
206 self.nest_transaction_in_db(tx, Default::default()).await
207 }
208}
209
210#[async_trait]
211impl HoprDbGeneralModelOperations for HoprDb {
212 fn conn(&self, target_db: TargetDb) -> &DatabaseConnection {
214 match target_db {
215 TargetDb::Index => self.index_db.read_only(), TargetDb::Tickets => &self.tickets_db,
218 TargetDb::Peers => &self.peers_db,
219 TargetDb::Logs => &self.logs_db,
220 }
221 }
222
223 async fn begin_transaction_in_db(&self, target_db: TargetDb) -> Result<OpenTransaction> {
225 match target_db {
226 TargetDb::Index => Ok(OpenTransaction(
227 self.index_db.read_write().begin_with_config(None, None).await?, target_db,
230 )),
231 TargetDb::Tickets => Ok(OpenTransaction(
233 self.tickets_db.begin_with_config(None, None).await?,
234 target_db,
235 )),
236 TargetDb::Peers => Ok(OpenTransaction(
237 self.peers_db.begin_with_config(None, None).await?,
238 target_db,
239 )),
240 TargetDb::Logs => Ok(OpenTransaction(
241 self.logs_db.begin_with_config(None, None).await?,
242 target_db,
243 )),
244 }
245 }
246
247 async fn import_logs_db(self, src_dir: PathBuf) -> Result<()> {
248 let src_db_path = src_dir.join("hopr_logs.db");
249 if !src_db_path.exists() {
250 return Err(DbSqlError::Construction(format!(
251 "Source logs database file does not exist: {}",
252 src_db_path.display()
253 )));
254 }
255
256 let sql = format!(
257 r#"
258 ATTACH DATABASE '{}' AS source_logs;
259 BEGIN TRANSACTION;
260 DELETE FROM log;
261 DELETE FROM log_status;
262 DELETE FROM log_topic_info;
263 INSERT INTO log_topic_info SELECT * FROM source_logs.log_topic_info;
264 INSERT INTO log_status SELECT * FROM source_logs.log_status;
265 INSERT INTO log SELECT * FROM source_logs.log;
266 COMMIT;
267 DETACH DATABASE source_logs;
268 "#,
269 src_db_path.to_string_lossy().replace("'", "''")
270 );
271
272 let logs_conn = self.conn(TargetDb::Logs);
273
274 logs_conn
275 .execute_unprepared(sql.as_str())
276 .await
277 .map_err(|e| DbSqlError::Construction(format!("Failed to import logs data: {e}")))?;
278
279 Ok(())
280 }
281}
282
283pub trait HoprDbAllOperations:
285 HoprDbGeneralModelOperations
286 + HoprDbAccountOperations
287 + HoprDbChannelOperations
288 + HoprDbCorruptedChannelOperations
289 + HoprDbInfoOperations
290 + HoprDbLogOperations
291 + HoprDbPeersOperations
292 + HoprDbProtocolOperations
293 + HoprDbRegistryOperations
294 + HoprDbResolverOperations
295 + HoprDbTicketOperations
296{
297}
298
299#[doc(hidden)]
300pub mod prelude {
301 pub use hopr_db_api::{logs::*, peers::*, protocol::*, resolver::*, tickets::*};
302
303 pub use super::*;
304 pub use crate::{accounts::*, channels::*, corrupted_channels::*, db::*, errors::*, info::*, registry::*};
305}