hopr_db_sql/
lib.rs

1//! Crate for accessing database(s) of a HOPR node.
2//!
3//! Functionality defined here is meant to be used mostly by other higher-level crates.
4//! The crate provides database operations across multiple SQLite databases for scalability
5//! and supports importing logs database snapshots for fast synchronization.
6
7pub mod accounts;
8mod cache;
9pub mod channels;
10pub mod corrupted_channels;
11pub mod db;
12pub mod errors;
13pub mod info;
14pub mod logs;
15
16use std::path::PathBuf;
17
18use async_trait::async_trait;
19pub use cache::CacheKeyMapper;
20pub use db::{HoprIndexerDb, HoprIndexerDbConfig};
21use errors::{DbSqlError, Result};
22use futures::future::BoxFuture;
23use sea_orm::{ConnectionTrait, TransactionTrait};
24pub use sea_orm::{DatabaseConnection, DatabaseTransaction};
25
26/// Primary key used in tables that contain only a single row.
27pub const SINGULAR_TABLE_FIXED_ID: i32 = 1;
28
29/// Shorthand for the `chrono` based timestamp type used in the database.
30pub type DbTimestamp = chrono::DateTime<chrono::Utc>;
31
32/// Represents an already opened transaction.
33/// This is a thin wrapper over [DatabaseTransaction].
34/// The wrapping behavior is needed to allow transaction agnostic functionalities
35/// of the DB traits.
36#[derive(Debug)]
37pub struct OpenTransaction(DatabaseTransaction, TargetDb);
38
39impl OpenTransaction {
40    /// Executes the given `callback` inside the transaction
41    /// and commits the transaction if it succeeds or rollbacks otherwise.
42    #[tracing::instrument(level = "trace", name = "Sql::perform_in_transaction", skip_all, err)]
43    pub async fn perform<F, T, E>(self, callback: F) -> std::result::Result<T, E>
44    where
45        F: for<'c> FnOnce(&'c OpenTransaction) -> BoxFuture<'c, std::result::Result<T, E>> + Send,
46        T: Send,
47        E: std::error::Error + From<DbSqlError>,
48    {
49        let start = std::time::Instant::now();
50        let res = callback(&self).await;
51
52        if res.is_ok() {
53            self.commit().await?;
54        } else {
55            self.rollback().await?;
56        }
57
58        tracing::trace!(
59            elapsed_ms = start.elapsed().as_millis(),
60            was_successful = res.is_ok(),
61            "transaction completed",
62        );
63
64        res
65    }
66
67    /// Commits the transaction.
68    pub async fn commit(self) -> Result<()> {
69        Ok(self.0.commit().await?)
70    }
71
72    /// Rollbacks the transaction.
73    pub async fn rollback(self) -> Result<()> {
74        Ok(self.0.rollback().await?)
75    }
76}
77
78impl AsRef<DatabaseTransaction> for OpenTransaction {
79    fn as_ref(&self) -> &DatabaseTransaction {
80        &self.0
81    }
82}
83
84impl From<OpenTransaction> for DatabaseTransaction {
85    fn from(value: OpenTransaction) -> Self {
86        value.0
87    }
88}
89
90/// Shorthand for optional transaction.
91/// Useful for transaction nesting (see [`HoprDbGeneralModelOperations::nest_transaction`]).
92pub type OptTx<'a> = Option<&'a OpenTransaction>;
93
94/// When Sqlite is used as a backend, model needs to be split
95/// into 4 different databases to avoid locking the database.
96/// On Postgres backend, these should actually point to the same database.
97#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
98pub enum TargetDb {
99    #[default]
100    /// Indexer database.
101    Index,
102    /// RPC logs database
103    Logs,
104}
105
106#[async_trait]
107pub trait HoprDbGeneralModelOperations {
108    /// Returns reference to the database connection.
109    /// Can be used in case transaction is not needed, but
110    /// users should aim to use [`HoprDbGeneralModelOperations::begin_transaction`]
111    /// and [`HoprDbGeneralModelOperations::nest_transaction`] as much as possible.
112    fn conn(&self, target_db: TargetDb) -> &DatabaseConnection;
113
114    /// Creates a new transaction.
115    async fn begin_transaction_in_db(&self, target: TargetDb) -> Result<OpenTransaction>;
116
117    /// Same as [`HoprDbGeneralModelOperations::begin_transaction_in_db`] with default [TargetDb].
118    async fn begin_transaction(&self) -> Result<OpenTransaction> {
119        self.begin_transaction_in_db(Default::default()).await
120    }
121
122    /// Creates a nested transaction inside the given transaction.
123    ///
124    /// If `None` is given, behaves exactly as [`HoprDbGeneralModelOperations::begin_transaction`].
125    ///
126    /// This method is useful for creating APIs that should be agnostic whether they are being
127    /// run from an existing transaction or without it (via [OptTx]).
128    ///
129    /// If `tx` is `Some`, the `target_db` must match with the one in `tx`. In other words,
130    /// nesting across different databases is forbidden and the method will panic.
131    async fn nest_transaction_in_db(&self, tx: OptTx<'_>, target_db: TargetDb) -> Result<OpenTransaction> {
132        if let Some(t) = tx {
133            assert_eq!(t.1, target_db, "attempt to create nest into tx from a different db");
134            Ok(OpenTransaction(t.as_ref().begin().await?, target_db))
135        } else {
136            self.begin_transaction_in_db(target_db).await
137        }
138    }
139
140    /// Same as [`HoprDbGeneralModelOperations::nest_transaction_in_db`] with default [TargetDb].
141    async fn nest_transaction(&self, tx: OptTx<'_>) -> Result<OpenTransaction> {
142        self.nest_transaction_in_db(tx, Default::default()).await
143    }
144
145    /// Import logs database from a snapshot directory.
146    ///
147    /// Replaces all data in the current logs database with data from a snapshot's
148    /// `hopr_logs.db` file. This is used for fast synchronization during node startup.
149    ///
150    /// # Process
151    ///
152    /// 1. Attaches the source database from the snapshot directory
153    /// 2. Clears existing data from all logs-related tables
154    /// 3. Copies all data from the snapshot database
155    /// 4. Detaches the source database
156    ///
157    /// All operations are performed within a single transaction for atomicity.
158    ///
159    /// # Arguments
160    ///
161    /// * `src_dir` - Directory containing the extracted snapshot with `hopr_logs.db`
162    ///
163    /// # Returns
164    ///
165    /// `Ok(())` on successful import, or [`DbSqlError::Construction`] if the source
166    /// database doesn't exist or the import operation fails.
167    ///
168    /// # Errors
169    ///
170    /// - Returns error if `hopr_logs.db` is not found in the source directory
171    /// - Returns error if SQLite ATTACH, data transfer, or DETACH operations fail
172    /// - All database errors are wrapped in [`DbSqlError::Construction`]
173    ///
174    /// # Example
175    ///
176    /// ```no_run
177    /// # use std::path::PathBuf;
178    /// # use hopr_db_sql::HoprDbGeneralModelOperations;
179    /// # async fn example(db: impl HoprDbGeneralModelOperations) -> Result<(), Box<dyn std::error::Error>> {
180    /// let snapshot_dir = PathBuf::from("/tmp/snapshot_extracted");
181    /// db.import_logs_db(snapshot_dir).await?;
182    /// # Ok(())
183    /// # }
184    /// ```
185    async fn import_logs_db(self, src_dir: PathBuf) -> Result<()>;
186}
187
188#[async_trait]
189impl HoprDbGeneralModelOperations for HoprIndexerDb {
190    /// Retrieves raw database connection to the given [DB](TargetDb).
191    fn conn(&self, target_db: TargetDb) -> &DatabaseConnection {
192        match target_db {
193            TargetDb::Index => self.index_db.read_only(), // TODO: no write access needed here, deserves better
194            TargetDb::Logs => &self.logs_db,
195        }
196    }
197
198    /// Starts a new transaction in the given [DB](TargetDb).
199    async fn begin_transaction_in_db(&self, target_db: TargetDb) -> Result<OpenTransaction> {
200        match target_db {
201            TargetDb::Index => Ok(OpenTransaction(
202                self.index_db.read_write().begin_with_config(None, None).await?, /* TODO: cannot estimate intent,
203                                                                                  * must be readwrite */
204                target_db,
205            )),
206            TargetDb::Logs => Ok(OpenTransaction(
207                self.logs_db.begin_with_config(None, None).await?,
208                target_db,
209            )),
210        }
211    }
212
213    async fn import_logs_db(self, src_dir: PathBuf) -> crate::errors::Result<()> {
214        let src_db_path = src_dir.join("hopr_logs.db");
215        if !src_db_path.exists() {
216            return Err(DbSqlError::Construction(format!(
217                "Source logs database file does not exist: {}",
218                src_db_path.display()
219            )));
220        }
221
222        let sql = format!(
223            r#"
224            ATTACH DATABASE '{}' AS source_logs;
225            BEGIN TRANSACTION;
226            DELETE FROM log;
227            DELETE FROM log_status;
228            DELETE FROM log_topic_info;
229            INSERT INTO log_topic_info SELECT * FROM source_logs.log_topic_info;
230            INSERT INTO log_status SELECT * FROM source_logs.log_status;
231            INSERT INTO log SELECT * FROM source_logs.log;
232            COMMIT;
233            DETACH DATABASE source_logs;
234        "#,
235            src_db_path.to_string_lossy().replace("'", "''")
236        );
237
238        let logs_conn = self.conn(TargetDb::Logs);
239
240        logs_conn
241            .execute_unprepared(sql.as_str())
242            .await
243            .map_err(|e| DbSqlError::Construction(format!("Failed to import logs data: {e}")))?;
244
245        Ok(())
246    }
247}
248
249#[doc(hidden)]
250pub mod prelude {
251    pub use super::*;
252    pub use crate::{accounts::*, channels::*, corrupted_channels::*, db::*, errors::*, info::*};
253}