hopr_db_sql/lib.rs
1//! Crate for accessing database(s) of a HOPR node.
2//!
3//! Functionality defined here is meant to be used mostly by other higher-level crates.
4//! The crate provides database operations across multiple SQLite databases for scalability
5//! and supports importing logs database snapshots for fast synchronization.
6
7pub mod accounts;
8mod cache;
9pub mod channels;
10pub mod corrupted_channels;
11pub mod db;
12pub mod errors;
13pub mod info;
14pub mod logs;
15
16use std::path::PathBuf;
17
18use async_trait::async_trait;
19pub use cache::CacheKeyMapper;
20pub use db::{HoprIndexerDb, HoprIndexerDbConfig};
21use errors::{DbSqlError, Result};
22use futures::future::BoxFuture;
23use sea_orm::{ConnectionTrait, TransactionTrait};
24pub use sea_orm::{DatabaseConnection, DatabaseTransaction};
25
26/// Primary key used in tables that contain only a single row.
27pub const SINGULAR_TABLE_FIXED_ID: i32 = 1;
28
29/// Shorthand for the `chrono` based timestamp type used in the database.
30pub type DbTimestamp = chrono::DateTime<chrono::Utc>;
31
32/// Represents an already opened transaction.
33/// This is a thin wrapper over [DatabaseTransaction].
34/// The wrapping behavior is needed to allow transaction agnostic functionalities
35/// of the DB traits.
36#[derive(Debug)]
37pub struct OpenTransaction(DatabaseTransaction, TargetDb);
38
39impl OpenTransaction {
40 /// Executes the given `callback` inside the transaction
41 /// and commits the transaction if it succeeds or rollbacks otherwise.
42 #[tracing::instrument(level = "trace", name = "Sql::perform_in_transaction", skip_all, err)]
43 pub async fn perform<F, T, E>(self, callback: F) -> std::result::Result<T, E>
44 where
45 F: for<'c> FnOnce(&'c OpenTransaction) -> BoxFuture<'c, std::result::Result<T, E>> + Send,
46 T: Send,
47 E: std::error::Error + From<DbSqlError>,
48 {
49 let start = std::time::Instant::now();
50 let res = callback(&self).await;
51
52 if res.is_ok() {
53 self.commit().await?;
54 } else {
55 self.rollback().await?;
56 }
57
58 tracing::trace!(
59 elapsed_ms = start.elapsed().as_millis(),
60 was_successful = res.is_ok(),
61 "transaction completed",
62 );
63
64 res
65 }
66
67 /// Commits the transaction.
68 pub async fn commit(self) -> Result<()> {
69 Ok(self.0.commit().await?)
70 }
71
72 /// Rollbacks the transaction.
73 pub async fn rollback(self) -> Result<()> {
74 Ok(self.0.rollback().await?)
75 }
76}
77
78impl AsRef<DatabaseTransaction> for OpenTransaction {
79 fn as_ref(&self) -> &DatabaseTransaction {
80 &self.0
81 }
82}
83
84impl From<OpenTransaction> for DatabaseTransaction {
85 fn from(value: OpenTransaction) -> Self {
86 value.0
87 }
88}
89
90/// Shorthand for optional transaction.
91/// Useful for transaction nesting (see [`HoprDbGeneralModelOperations::nest_transaction`]).
92pub type OptTx<'a> = Option<&'a OpenTransaction>;
93
94/// When Sqlite is used as a backend, model needs to be split
95/// into 4 different databases to avoid locking the database.
96/// On Postgres backend, these should actually point to the same database.
97#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
98pub enum TargetDb {
99 #[default]
100 /// Indexer database.
101 Index,
102 /// RPC logs database
103 Logs,
104}
105
106#[async_trait]
107pub trait HoprDbGeneralModelOperations {
108 /// Returns reference to the database connection.
109 /// Can be used in case transaction is not needed, but
110 /// users should aim to use [`HoprDbGeneralModelOperations::begin_transaction`]
111 /// and [`HoprDbGeneralModelOperations::nest_transaction`] as much as possible.
112 fn conn(&self, target_db: TargetDb) -> &DatabaseConnection;
113
114 /// Creates a new transaction.
115 async fn begin_transaction_in_db(&self, target: TargetDb) -> Result<OpenTransaction>;
116
117 /// Same as [`HoprDbGeneralModelOperations::begin_transaction_in_db`] with default [TargetDb].
118 async fn begin_transaction(&self) -> Result<OpenTransaction> {
119 self.begin_transaction_in_db(Default::default()).await
120 }
121
122 /// Creates a nested transaction inside the given transaction.
123 ///
124 /// If `None` is given, behaves exactly as [`HoprDbGeneralModelOperations::begin_transaction`].
125 ///
126 /// This method is useful for creating APIs that should be agnostic whether they are being
127 /// run from an existing transaction or without it (via [OptTx]).
128 ///
129 /// If `tx` is `Some`, the `target_db` must match with the one in `tx`. In other words,
130 /// nesting across different databases is forbidden and the method will panic.
131 async fn nest_transaction_in_db(&self, tx: OptTx<'_>, target_db: TargetDb) -> Result<OpenTransaction> {
132 if let Some(t) = tx {
133 assert_eq!(t.1, target_db, "attempt to create nest into tx from a different db");
134 Ok(OpenTransaction(t.as_ref().begin().await?, target_db))
135 } else {
136 self.begin_transaction_in_db(target_db).await
137 }
138 }
139
140 /// Same as [`HoprDbGeneralModelOperations::nest_transaction_in_db`] with default [TargetDb].
141 async fn nest_transaction(&self, tx: OptTx<'_>) -> Result<OpenTransaction> {
142 self.nest_transaction_in_db(tx, Default::default()).await
143 }
144
145 /// Import logs database from a snapshot directory.
146 ///
147 /// Replaces all data in the current logs database with data from a snapshot's
148 /// `hopr_logs.db` file. This is used for fast synchronization during node startup.
149 ///
150 /// # Process
151 ///
152 /// 1. Attaches the source database from the snapshot directory
153 /// 2. Clears existing data from all logs-related tables
154 /// 3. Copies all data from the snapshot database
155 /// 4. Detaches the source database
156 ///
157 /// All operations are performed within a single transaction for atomicity.
158 ///
159 /// # Arguments
160 ///
161 /// * `src_dir` - Directory containing the extracted snapshot with `hopr_logs.db`
162 ///
163 /// # Returns
164 ///
165 /// `Ok(())` on successful import, or [`DbSqlError::Construction`] if the source
166 /// database doesn't exist or the import operation fails.
167 ///
168 /// # Errors
169 ///
170 /// - Returns error if `hopr_logs.db` is not found in the source directory
171 /// - Returns error if SQLite ATTACH, data transfer, or DETACH operations fail
172 /// - All database errors are wrapped in [`DbSqlError::Construction`]
173 ///
174 /// # Example
175 ///
176 /// ```no_run
177 /// # use std::path::PathBuf;
178 /// # use hopr_db_sql::HoprDbGeneralModelOperations;
179 /// # async fn example(db: impl HoprDbGeneralModelOperations) -> Result<(), Box<dyn std::error::Error>> {
180 /// let snapshot_dir = PathBuf::from("/tmp/snapshot_extracted");
181 /// db.import_logs_db(snapshot_dir).await?;
182 /// # Ok(())
183 /// # }
184 /// ```
185 async fn import_logs_db(self, src_dir: PathBuf) -> Result<()>;
186}
187
188#[async_trait]
189impl HoprDbGeneralModelOperations for HoprIndexerDb {
190 /// Retrieves raw database connection to the given [DB](TargetDb).
191 fn conn(&self, target_db: TargetDb) -> &DatabaseConnection {
192 match target_db {
193 TargetDb::Index => self.index_db.read_only(), // TODO: no write access needed here, deserves better
194 TargetDb::Logs => &self.logs_db,
195 }
196 }
197
198 /// Starts a new transaction in the given [DB](TargetDb).
199 async fn begin_transaction_in_db(&self, target_db: TargetDb) -> Result<OpenTransaction> {
200 match target_db {
201 TargetDb::Index => Ok(OpenTransaction(
202 self.index_db.read_write().begin_with_config(None, None).await?, /* TODO: cannot estimate intent,
203 * must be readwrite */
204 target_db,
205 )),
206 TargetDb::Logs => Ok(OpenTransaction(
207 self.logs_db.begin_with_config(None, None).await?,
208 target_db,
209 )),
210 }
211 }
212
213 async fn import_logs_db(self, src_dir: PathBuf) -> crate::errors::Result<()> {
214 let src_db_path = src_dir.join("hopr_logs.db");
215 if !src_db_path.exists() {
216 return Err(DbSqlError::Construction(format!(
217 "Source logs database file does not exist: {}",
218 src_db_path.display()
219 )));
220 }
221
222 let sql = format!(
223 r#"
224 ATTACH DATABASE '{}' AS source_logs;
225 BEGIN TRANSACTION;
226 DELETE FROM log;
227 DELETE FROM log_status;
228 DELETE FROM log_topic_info;
229 INSERT INTO log_topic_info SELECT * FROM source_logs.log_topic_info;
230 INSERT INTO log_status SELECT * FROM source_logs.log_status;
231 INSERT INTO log SELECT * FROM source_logs.log;
232 COMMIT;
233 DETACH DATABASE source_logs;
234 "#,
235 src_db_path.to_string_lossy().replace("'", "''")
236 );
237
238 let logs_conn = self.conn(TargetDb::Logs);
239
240 logs_conn
241 .execute_unprepared(sql.as_str())
242 .await
243 .map_err(|e| DbSqlError::Construction(format!("Failed to import logs data: {e}")))?;
244
245 Ok(())
246 }
247}
248
249#[doc(hidden)]
250pub mod prelude {
251 pub use super::*;
252 pub use crate::{accounts::*, channels::*, corrupted_channels::*, db::*, errors::*, info::*};
253}