hopr_db_sql/
lib.rs

1//! Crate for accessing database(s) of a HOPR node.
2//!
3//! Functionality defined here is meant to be used mostly by other higher-level crates.
4//! The crate provides database operations across multiple SQLite databases for scalability
5//! and supports importing logs database snapshots for fast synchronization.
6
7pub mod accounts;
8mod cache;
9pub mod channels;
10pub mod corrupted_channels;
11pub mod db;
12pub mod errors;
13pub mod info;
14pub mod logs;
15pub mod peers;
16pub mod protocol;
17pub mod registry;
18pub mod resolver;
19mod ticket_manager;
20pub mod tickets;
21
22use std::path::PathBuf;
23
24use async_trait::async_trait;
25use futures::future::BoxFuture;
26pub use hopr_db_api as api;
27use hopr_db_api::{
28    logs::HoprDbLogOperations, peers::HoprDbPeersOperations, protocol::HoprDbProtocolOperations,
29    resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations,
30};
31use sea_orm::{ConnectionTrait, TransactionTrait};
32pub use sea_orm::{DatabaseConnection, DatabaseTransaction};
33
34use crate::{
35    accounts::HoprDbAccountOperations,
36    channels::HoprDbChannelOperations,
37    corrupted_channels::HoprDbCorruptedChannelOperations,
38    db::HoprDb,
39    errors::{DbSqlError, Result},
40    info::HoprDbInfoOperations,
41    registry::HoprDbRegistryOperations,
42};
43
/// Fixed primary key value for tables that hold exactly one row.
pub const SINGULAR_TABLE_FIXED_ID: i32 = 1;
46
47/// Shorthand for the `chrono` based timestamp type used in the database.
48pub type DbTimestamp = chrono::DateTime<chrono::Utc>;
49
50/// Represents an already opened transaction.
51/// This is a thin wrapper over [DatabaseTransaction].
52/// The wrapping behavior is needed to allow transaction agnostic functionalities
53/// of the DB traits.
54#[derive(Debug)]
55pub struct OpenTransaction(DatabaseTransaction, TargetDb);
56
57impl OpenTransaction {
58    /// Executes the given `callback` inside the transaction
59    /// and commits the transaction if it succeeds or rollbacks otherwise.
60    #[tracing::instrument(level = "trace", name = "Sql::perform_in_transaction", skip_all, err)]
61    pub async fn perform<F, T, E>(self, callback: F) -> std::result::Result<T, E>
62    where
63        F: for<'c> FnOnce(&'c OpenTransaction) -> BoxFuture<'c, std::result::Result<T, E>> + Send,
64        T: Send,
65        E: std::error::Error + From<DbSqlError>,
66    {
67        let start = std::time::Instant::now();
68        let res = callback(&self).await;
69
70        if res.is_ok() {
71            self.commit().await?;
72        } else {
73            self.rollback().await?;
74        }
75
76        tracing::trace!(
77            elapsed_ms = start.elapsed().as_millis(),
78            was_successful = res.is_ok(),
79            "transaction completed",
80        );
81
82        res
83    }
84
85    /// Commits the transaction.
86    pub async fn commit(self) -> Result<()> {
87        Ok(self.0.commit().await?)
88    }
89
90    /// Rollbacks the transaction.
91    pub async fn rollback(self) -> Result<()> {
92        Ok(self.0.rollback().await?)
93    }
94}
95
96impl AsRef<DatabaseTransaction> for OpenTransaction {
97    fn as_ref(&self) -> &DatabaseTransaction {
98        &self.0
99    }
100}
101
102impl From<OpenTransaction> for DatabaseTransaction {
103    fn from(value: OpenTransaction) -> Self {
104        value.0
105    }
106}
107
108/// Shorthand for optional transaction.
109/// Useful for transaction nesting (see [`HoprDbGeneralModelOperations::nest_transaction`]).
110pub type OptTx<'a> = Option<&'a OpenTransaction>;
111
/// Selects the database an operation is targeted at.
///
/// When SQLite is used as a backend, the model needs to be split
/// into 4 different databases to avoid locking the database.
/// On a Postgres backend, these should actually point to the same database.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum TargetDb {
    /// Indexer database (this is the default target).
    #[default]
    Index,
    /// Acknowledged winning ticket database.
    Tickets,
    /// Network peers database.
    Peers,
    /// RPC logs database.
    Logs,
}
127
128#[async_trait]
129pub trait HoprDbGeneralModelOperations {
130    /// Returns reference to the database connection.
131    /// Can be used in case transaction is not needed, but
132    /// users should aim to use [`HoprDbGeneralModelOperations::begin_transaction`]
133    /// and [`HoprDbGeneralModelOperations::nest_transaction`] as much as possible.
134    fn conn(&self, target_db: TargetDb) -> &DatabaseConnection;
135
136    /// Creates a new transaction.
137    async fn begin_transaction_in_db(&self, target: TargetDb) -> Result<OpenTransaction>;
138
139    /// Import logs database from a snapshot directory.
140    ///
141    /// Replaces all data in the current logs database with data from a snapshot's
142    /// `hopr_logs.db` file. This is used for fast synchronization during node startup.
143    ///
144    /// # Process
145    ///
146    /// 1. Attaches the source database from the snapshot directory
147    /// 2. Clears existing data from all logs-related tables
148    /// 3. Copies all data from the snapshot database
149    /// 4. Detaches the source database
150    ///
151    /// All operations are performed within a single transaction for atomicity.
152    ///
153    /// # Arguments
154    ///
155    /// * `src_dir` - Directory containing the extracted snapshot with `hopr_logs.db`
156    ///
157    /// # Returns
158    ///
159    /// `Ok(())` on successful import, or [`DbSqlError::Construction`] if the source
160    /// database doesn't exist or the import operation fails.
161    ///
162    /// # Errors
163    ///
164    /// - Returns error if `hopr_logs.db` is not found in the source directory
165    /// - Returns error if SQLite ATTACH, data transfer, or DETACH operations fail
166    /// - All database errors are wrapped in [`DbSqlError::Construction`]
167    ///
168    /// # Example
169    ///
170    /// ```no_run
171    /// # use std::path::PathBuf;
172    /// # use hopr_db_sql::HoprDbGeneralModelOperations;
173    /// # async fn example(db: impl HoprDbGeneralModelOperations) -> Result<(), Box<dyn std::error::Error>> {
174    /// let snapshot_dir = PathBuf::from("/tmp/snapshot_extracted");
175    /// db.import_logs_db(snapshot_dir).await?;
176    /// # Ok(())
177    /// # }
178    /// ```
179    async fn import_logs_db(self, src_dir: PathBuf) -> Result<()>;
180
181    /// Same as [`HoprDbGeneralModelOperations::begin_transaction_in_db`] with default [TargetDb].
182    async fn begin_transaction(&self) -> Result<OpenTransaction> {
183        self.begin_transaction_in_db(Default::default()).await
184    }
185
186    /// Creates a nested transaction inside the given transaction.
187    ///
188    /// If `None` is given, behaves exactly as [`HoprDbGeneralModelOperations::begin_transaction`].
189    ///
190    /// This method is useful for creating APIs that should be agnostic whether they are being
191    /// run from an existing transaction or without it (via [OptTx]).
192    ///
193    /// If `tx` is `Some`, the `target_db` must match with the one in `tx`. In other words,
194    /// nesting across different databases is forbidden and the method will panic.
195    async fn nest_transaction_in_db(&self, tx: OptTx<'_>, target_db: TargetDb) -> Result<OpenTransaction> {
196        if let Some(t) = tx {
197            assert_eq!(t.1, target_db, "attempt to create nest into tx from a different db");
198            Ok(OpenTransaction(t.as_ref().begin().await?, target_db))
199        } else {
200            self.begin_transaction_in_db(target_db).await
201        }
202    }
203
204    /// Same as [`HoprDbGeneralModelOperations::nest_transaction_in_db`] with default [TargetDb].
205    async fn nest_transaction(&self, tx: OptTx<'_>) -> Result<OpenTransaction> {
206        self.nest_transaction_in_db(tx, Default::default()).await
207    }
208}
209
210#[async_trait]
211impl HoprDbGeneralModelOperations for HoprDb {
212    /// Retrieves raw database connection to the given [DB](TargetDb).
213    fn conn(&self, target_db: TargetDb) -> &DatabaseConnection {
214        match target_db {
215            TargetDb::Index => self.index_db.read_only(), // TODO: no write access needed here, deserves better
216            // wrapping
217            TargetDb::Tickets => &self.tickets_db,
218            TargetDb::Peers => &self.peers_db,
219            TargetDb::Logs => &self.logs_db,
220        }
221    }
222
223    /// Starts a new transaction in the given [DB](TargetDb).
224    async fn begin_transaction_in_db(&self, target_db: TargetDb) -> Result<OpenTransaction> {
225        match target_db {
226            TargetDb::Index => Ok(OpenTransaction(
227                self.index_db.read_write().begin_with_config(None, None).await?, /* TODO: cannot estimate intent,
228                                                                                  * must be readwrite */
229                target_db,
230            )),
231            // TODO: when adding Postgres support, redirect `Tickets` and `Peers` into `self.db`
232            TargetDb::Tickets => Ok(OpenTransaction(
233                self.tickets_db.begin_with_config(None, None).await?,
234                target_db,
235            )),
236            TargetDb::Peers => Ok(OpenTransaction(
237                self.peers_db.begin_with_config(None, None).await?,
238                target_db,
239            )),
240            TargetDb::Logs => Ok(OpenTransaction(
241                self.logs_db.begin_with_config(None, None).await?,
242                target_db,
243            )),
244        }
245    }
246
247    async fn import_logs_db(self, src_dir: PathBuf) -> Result<()> {
248        let src_db_path = src_dir.join("hopr_logs.db");
249        if !src_db_path.exists() {
250            return Err(DbSqlError::Construction(format!(
251                "Source logs database file does not exist: {}",
252                src_db_path.display()
253            )));
254        }
255
256        let sql = format!(
257            r#"
258            ATTACH DATABASE '{}' AS source_logs;
259            BEGIN TRANSACTION;
260            DELETE FROM log;
261            DELETE FROM log_status;
262            DELETE FROM log_topic_info;
263            INSERT INTO log_topic_info SELECT * FROM source_logs.log_topic_info;
264            INSERT INTO log_status SELECT * FROM source_logs.log_status;
265            INSERT INTO log SELECT * FROM source_logs.log;
266            COMMIT;
267            DETACH DATABASE source_logs;
268        "#,
269            src_db_path.to_string_lossy().replace("'", "''")
270        );
271
272        let logs_conn = self.conn(TargetDb::Logs);
273
274        logs_conn
275            .execute_unprepared(sql.as_str())
276            .await
277            .map_err(|e| DbSqlError::Construction(format!("Failed to import logs data: {e}")))?;
278
279        Ok(())
280    }
281}
282
283/// Convenience trait that contain all HOPR DB operations crates.
284pub trait HoprDbAllOperations:
285    HoprDbGeneralModelOperations
286    + HoprDbAccountOperations
287    + HoprDbChannelOperations
288    + HoprDbCorruptedChannelOperations
289    + HoprDbInfoOperations
290    + HoprDbLogOperations
291    + HoprDbPeersOperations
292    + HoprDbProtocolOperations
293    + HoprDbRegistryOperations
294    + HoprDbResolverOperations
295    + HoprDbTicketOperations
296{
297}
298
299#[doc(hidden)]
300pub mod prelude {
301    pub use hopr_db_api::{logs::*, peers::*, protocol::*, resolver::*, tickets::*};
302
303    pub use super::*;
304    pub use crate::{accounts::*, channels::*, corrupted_channels::*, db::*, errors::*, info::*, registry::*};
305}