hopr_db_sql/
db.rs

use std::{
    fs,
    path::{Path, PathBuf},
    time::Duration,
};

use hopr_crypto_types::{keypairs::Keypair, prelude::ChainKeypair};
use hopr_db_entity::prelude::{Account, Announcement};
use hopr_primitive_types::primitives::Address;
use migration::{MigratorChainLogs, MigratorIndex, MigratorTrait};
use sea_orm::{EntityTrait, SqlxSqliteConnector};
use sqlx::{
    ConnectOptions, SqlitePool,
    pool::PoolOptions,
    sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
};
use tracing::log::LevelFilter;
use validator::Validate;

use crate::{
    cache::{CacheKeyMapper, HoprIndexerDbCaches},
    errors::{DbSqlError, Result},
    prelude::model_to_account_entry,
};

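/// Configuration of the SQLite databases used by the indexer.
///
/// A minimal sketch of overriding the defaults (not compiled as a doc-test; all
/// other fields keep their default values):
///
/// ```ignore
/// let cfg = HoprIndexerDbConfig {
///     log_slow_queries: std::time::Duration::from_secs(1),
///     ..Default::default()
/// };
/// ```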
#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault, validator::Validate)]
pub struct HoprIndexerDbConfig {
    #[default(true)]
    pub create_if_missing: bool,
    #[default(false)]
    pub force_create: bool,
    #[default(Duration::from_secs(5))]
    pub log_slow_queries: Duration,
}

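/// Holds a read-only and a read-write connection to the same SQLite database.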
#[cfg(feature = "sqlite")]
#[derive(Debug, Clone)]
pub(crate) struct DbConnection {
    ro: sea_orm::DatabaseConnection,
    rw: sea_orm::DatabaseConnection,
}

#[cfg(feature = "sqlite")]
impl DbConnection {
    pub fn read_only(&self) -> &sea_orm::DatabaseConnection {
        &self.ro
    }

    pub fn read_write(&self) -> &sea_orm::DatabaseConnection {
        &self.rw
    }
}

/// Main database handle for the HOPR indexer.
///
/// Manages separate SQLite databases for different data domains to avoid
/// locking conflicts and improve performance:
///
/// - **Index DB**: blockchain indexing and contract data, opened with separate
///   read-only and read-write connection pools
/// - **Logs DB**: blockchain logs and processing status
///
/// Supports database snapshot imports for fast synchronization via
/// [`HoprDbGeneralModelOperations::import_logs_db`].
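///
/// # Example
///
/// A minimal usage sketch (not compiled as a doc-test); the target directory and
/// the node's chain keypair are assumed to be provided by the caller:
///
/// ```ignore
/// let db = HoprIndexerDb::new(
///     Path::new("/path/to/db/dir"),
///     chain_key,
///     HoprIndexerDbConfig::default(),
/// )
/// .await?;
/// ```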
#[derive(Debug, Clone)]
pub struct HoprIndexerDb {
    pub(crate) index_db: DbConnection,
    pub(crate) logs_db: sea_orm::DatabaseConnection,
    pub me_onchain: Address,
    pub(crate) caches: HoprIndexerDbCaches,
}

/// Filename for the blockchain-indexing database.
pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";

/// Filename for the blockchain logs database (used in snapshots).
pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";

impl HoprIndexerDb {
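    /// Opens (or creates) the index and logs databases in `directory` and applies
    /// any pending migrations.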
    pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprIndexerDbConfig) -> Result<Self> {
        cfg.validate()
            .map_err(|e| DbSqlError::Construction(format!("failed configuration validation: {e}")))?;

        fs::create_dir_all(directory)
            .map_err(|_e| DbSqlError::Construction(format!("cannot create main database directory {directory:?}")))?;

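        // Read-write index DB pool, limited to a single connection
        // (SQLite permits only one writer at a time).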
        let index = Self::create_pool(
            cfg.clone(),
            directory.to_path_buf(),
            PoolOptions::new(),
            Some(0),
            Some(1),
            false,
            SQL_DB_INDEX_FILE_NAME,
        )
        .await?;

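        // Read-only index DB pool, allowing up to 30 concurrent reader connections.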
        #[cfg(feature = "sqlite")]
        let index_ro = Self::create_pool(
            cfg.clone(),
            directory.to_path_buf(),
            PoolOptions::new(),
            Some(0),
            Some(30),
            true,
            SQL_DB_INDEX_FILE_NAME,
        )
        .await?;

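        // Read-write logs DB pool, using the pool's default maximum connection count.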
        let logs = Self::create_pool(
            cfg.clone(),
            directory.to_path_buf(),
            PoolOptions::new(),
            Some(0),
            None,
            false,
            SQL_DB_LOGS_FILE_NAME,
        )
        .await?;

        #[cfg(feature = "sqlite")]
        Self::new_sqlx_sqlite(chain_key, index, index_ro, logs).await
    }

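    /// Creates all databases in memory; useful for tests.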
    #[cfg(feature = "sqlite")]
    pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
        let index_db = SqlitePool::connect(":memory:")
            .await
            .map_err(|e| DbSqlError::Construction(e.to_string()))?;

        Self::new_sqlx_sqlite(
            chain_key,
            index_db.clone(),
            index_db,
            SqlitePool::connect(":memory:")
                .await
                .map_err(|e| DbSqlError::Construction(e.to_string()))?,
        )
        .await
    }

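    /// Wraps the given SQLite pools in SeaORM connections, applies pending
    /// migrations, and pre-populates the key-id mapping cache from the stored accounts.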
    #[cfg(feature = "sqlite")]
    async fn new_sqlx_sqlite(
        chain_key: ChainKeypair,
        index_db_pool: SqlitePool,
        index_db_ro_pool: SqlitePool,
        logs_db_pool: SqlitePool,
    ) -> Result<Self> {
        let index_db_rw = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_pool);
        let index_db_ro = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_ro_pool);

        MigratorIndex::up(&index_db_rw, None)
            .await
            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

        let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db_pool.clone());

        MigratorChainLogs::up(&logs_db, None)
            .await
            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

        let caches = HoprIndexerDbCaches::default();

        Account::find()
            .find_with_related(Announcement)
            .all(&index_db_rw)
            .await?
            .into_iter()
            .try_for_each(|(a, b)| match model_to_account_entry(a, b) {
                Ok(account) => caches.key_id_mapper.update_key_id_binding(&account),
                Err(error) => {
                    // Undecodeable accounts are skipped and will be unreachable
                    tracing::error!(%error, "undecodeable account");
                    Ok(())
                }
            })?;

        Ok(Self {
            me_onchain: chain_key.public().to_address(),
            index_db: DbConnection {
                ro: index_db_ro,
                rw: index_db_rw,
            },
            logs_db,
            caches,
        })
    }

    /// Default SQLite config values for all DBs with RW (read-write) access.
    fn common_connection_cfg_rw(cfg: HoprIndexerDbConfig) -> SqliteConnectOptions {
        SqliteConnectOptions::default()
            .create_if_missing(cfg.create_if_missing)
            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
            .log_statements(LevelFilter::Debug)
            .journal_mode(SqliteJournalMode::Wal)
            .synchronous(SqliteSynchronous::Normal)
            .auto_vacuum(SqliteAutoVacuum::Full)
            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
            .page_size(4096)
            .pragma("cache_size", "-30000") // ~30 MB page cache (negative value = size in KiB)
            .pragma("busy_timeout", "1000") // 1000 ms
    }

    /// Default SQLite config values for all DBs with RO (read-only) access.
    fn common_connection_cfg_ro(cfg: HoprIndexerDbConfig) -> SqliteConnectOptions {
        SqliteConnectOptions::default()
            .create_if_missing(cfg.create_if_missing)
            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
            .log_statements(LevelFilter::Debug)
            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
            .page_size(4096)
            .pragma("cache_size", "-30000") // ~30 MB page cache (negative value = size in KiB)
            .pragma("busy_timeout", "1000") // 1000 ms
            .read_only(true)
    }

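    /// Creates an SQLite connection pool for the database file `path` inside `directory`.
    ///
    /// `min_conn` and `max_conn` bound the pool size when given, and `read_only`
    /// selects between the read-only and read-write connection defaults above.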
    pub async fn create_pool(
        cfg: HoprIndexerDbConfig,
        directory: PathBuf,
        mut options: PoolOptions<sqlx::Sqlite>,
        min_conn: Option<u32>,
        max_conn: Option<u32>,
        read_only: bool,
        path: &str,
    ) -> Result<SqlitePool> {
        if let Some(min_conn) = min_conn {
            options = options.min_connections(min_conn);
        }
        if let Some(max_conn) = max_conn {
            options = options.max_connections(max_conn);
        }

        let cfg = if read_only {
            Self::common_connection_cfg_ro(cfg)
        } else {
            Self::common_connection_cfg_rw(cfg)
        };

        let pool = options
            .connect_with(cfg.filename(directory.join(path)))
            .await
            .map_err(|e| DbSqlError::Construction(format!("failed to create {path} database: {e}")))?;

        Ok(pool)
    }

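    /// Returns a reference to the key-id mapper held by the internal caches.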
    pub fn key_id_mapper_ref(&self) -> &CacheKeyMapper {
        &self.caches.key_id_mapper
    }
}

#[cfg(test)]
mod tests {
    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
    use migration::{MigratorChainLogs, MigratorIndex, MigratorTrait};

    use super::*;
    use crate::{HoprDbGeneralModelOperations, TargetDb};

    #[tokio::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        // NOTE: cfg-if this on Postgres to do only `Migrator::status(db.conn(Default::default)).await.expect("status
        // must be ok");`
        MigratorIndex::status(db.conn(TargetDb::Index)).await?;
        MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;

        Ok(())
    }
}