hopr_db_node/
db.rs

1use std::{
2    fs,
3    path::{Path, PathBuf},
4    sync::Arc,
5    time::Duration,
6};
7
8use hopr_internal_types::channels::ChannelId;
9use hopr_primitive_types::prelude::HoprBalance;
10use migration::{MigratorTickets, MigratorTrait};
11use sea_orm::SqlxSqliteConnector;
12use sqlx::{
13    ConnectOptions, SqlitePool,
14    pool::PoolOptions,
15    sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
16};
17use tracing::log::LevelFilter;
18use validator::Validate;
19
20use crate::errors::NodeDbError;
21
/// Filename for the payment tickets database.
///
/// [`HoprNodeDb::new`] joins this name onto the database directory it is given
/// to obtain the full path of the SQLite file.
pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
24
/// Configuration of the node database ([`HoprNodeDb`]).
///
/// The `Default` implementation is derived via `SmartDefault` from the
/// `#[default(...)]` attributes on the individual fields.
#[derive(Clone, Debug, validator::Validate, smart_default::SmartDefault)]
pub struct HoprNodeDbConfig {
    /// Passed to the SQLite connection options as `create_if_missing` when the
    /// on-disk database is opened. Defaults to `true`.
    #[default(true)]
    pub create_if_missing: bool,
    /// Defaults to `false`.
    ///
    /// NOTE(review): this flag is not read anywhere in this file; presumably it
    /// requests re-creation of an existing database — confirm against callers.
    #[default(false)]
    pub force_create: bool,
    /// Threshold passed to `log_slow_statements`: statements running longer than
    /// this are logged at `Warn` level. Defaults to 5 seconds.
    #[default(Duration::from_secs(5))]
    pub log_slow_queries: Duration,
}
34
/// Handle to the node-local database holding the payment tickets.
///
/// The handle derives `Clone`; the write lock is held behind an [`Arc`], so
/// clones refer to the same mutex.
#[derive(Clone)]
pub struct HoprNodeDb {
    /// Connection to the tickets database, created from an SQLite pool after
    /// the `MigratorTickets` migrations have been applied.
    pub(crate) tickets_db: sea_orm::DatabaseConnection,
    /// Async mutex shared between clones of this handle.
    ///
    /// NOTE(review): presumably used to serialize write access to the tickets
    /// database — the locking sites are not in this file, confirm there.
    pub(crate) tickets_write_lock: Arc<async_lock::Mutex<()>>,
    /// Configuration this database was created with; exposed via [`HoprNodeDb::config`].
    pub(crate) cfg: HoprNodeDbConfig,
    // This value must be cached here, due to complicated invalidation logic.
    //
    // NOTE(review): the `u32` part of the key is presumably the channel epoch —
    // confirm against the code that populates the cache.
    pub(crate) unrealized_value: moka::future::Cache<(ChannelId, u32), HoprBalance>,
}
43
44impl HoprNodeDb {
45    pub async fn new(directory: &Path, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
46        cfg.validate().map_err(|e| NodeDbError::Other(e.into()))?;
47
48        fs::create_dir_all(directory).map_err(|e| NodeDbError::Other(e.into()))?;
49
50        let tickets = Self::create_pool(
51            cfg.clone(),
52            directory.to_path_buf(),
53            PoolOptions::new(),
54            Some(0),
55            Some(50),
56            SQL_DB_TICKETS_FILE_NAME,
57        )
58        .await?;
59
60        #[cfg(feature = "sqlite")]
61        Self::new_sqlx_sqlite(tickets, cfg).await
62    }
63
64    #[cfg(feature = "sqlite")]
65    pub async fn new_in_memory() -> Result<Self, NodeDbError> {
66        Self::new_sqlx_sqlite(
67            SqlitePool::connect(":memory:")
68                .await
69                .map_err(|e| NodeDbError::Other(e.into()))?,
70            Default::default(),
71        )
72        .await
73    }
74
75    #[cfg(feature = "sqlite")]
76    async fn new_sqlx_sqlite(tickets_db_pool: SqlitePool, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
77        let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);
78        MigratorTickets::up(&tickets_db, None).await?;
79
80        Ok(Self {
81            tickets_write_lock: Arc::new(async_lock::Mutex::new(())),
82            unrealized_value: moka::future::CacheBuilder::new(10_000)
83                .time_to_idle(std::time::Duration::from_secs(30))
84                .build(),
85            tickets_db,
86            cfg,
87        })
88    }
89
90    async fn create_pool(
91        cfg: HoprNodeDbConfig,
92        directory: PathBuf,
93        mut options: PoolOptions<sqlx::Sqlite>,
94        min_conn: Option<u32>,
95        max_conn: Option<u32>,
96        path: &str,
97    ) -> Result<SqlitePool, NodeDbError> {
98        if let Some(min_conn) = min_conn {
99            options = options.min_connections(min_conn);
100        }
101        if let Some(max_conn) = max_conn {
102            options = options.max_connections(max_conn);
103        }
104
105        let sqlite_cfg = SqliteConnectOptions::default()
106            .create_if_missing(cfg.create_if_missing)
107            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
108            .log_statements(LevelFilter::Debug)
109            .journal_mode(SqliteJournalMode::Wal)
110            .synchronous(SqliteSynchronous::Normal)
111            .auto_vacuum(SqliteAutoVacuum::Full)
112            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
113            .page_size(4096)
114            .pragma("cache_size", "-30000") // 32M
115            .pragma("busy_timeout", "1000"); // 1000ms
116
117        let pool = options.connect_with(sqlite_cfg.filename(directory.join(path))).await?;
118
119        Ok(pool)
120    }
121
122    pub fn config(&self) -> &HoprNodeDbConfig {
123        &self.cfg
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// An in-memory database must come up and be able to report the status of
    /// the tickets migrations without error.
    #[tokio::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let node_db = HoprNodeDb::new_in_memory().await?;
        let connection = &node_db.tickets_db;

        MigratorTickets::status(connection).await?;
        Ok(())
    }
}