1use std::{
2 fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5 time::Duration,
6};
7
8use hopr_internal_types::channels::ChannelId;
9use hopr_primitive_types::prelude::HoprBalance;
10use migration::{MigratorTickets, MigratorTrait};
11use sea_orm::SqlxSqliteConnector;
12use sqlx::{
13 ConnectOptions, SqlitePool,
14 pool::PoolOptions,
15 sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
16};
17use tracing::log::LevelFilter;
18use validator::Validate;
19
20use crate::errors::NodeDbError;
21
/// File name of the SQLite tickets database; `HoprNodeDb::new` opens it inside the
/// directory it is given.
pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
24
/// Configuration used when opening the node database.
///
/// `Default` is derived through `SmartDefault`, so the per-field `#[default(..)]`
/// attributes below define the default values.
#[derive(Clone, Debug, validator::Validate, smart_default::SmartDefault)]
pub struct HoprNodeDbConfig {
    /// Forwarded to `SqliteConnectOptions::create_if_missing` when the database file is opened.
    /// Defaults to `true`.
    #[default(true)]
    pub create_if_missing: bool,
    /// Defaults to `false`.
    ///
    /// NOTE(review): nothing in this file reads this flag; presumably it is honored by the
    /// caller that prepares the database directory — confirm.
    #[default(false)]
    pub force_create: bool,
    /// Threshold handed to `log_slow_statements`: statements running longer than this are
    /// logged at `Warn` level. Defaults to 5 seconds.
    #[default(Duration::from_secs(5))]
    pub log_slow_queries: Duration,
}
34
/// Handle to the node database.
///
/// The write lock is stored behind an `Arc`, so clones of this handle share the same lock.
#[derive(Clone)]
pub struct HoprNodeDb {
    /// `sea_orm` connection to the tickets database, built from the SQLite pool.
    pub(crate) tickets_db: sea_orm::DatabaseConnection,
    /// Async mutex shared by all clones.
    // NOTE(review): presumably serializes write access to the tickets database; the users
    // of this lock live outside this file — confirm.
    pub(crate) tickets_write_lock: Arc<async_lock::Mutex<()>>,
    /// Configuration this handle was created with; exposed through `config()`.
    pub(crate) cfg: HoprNodeDbConfig,
    /// Cache of balances keyed by `(ChannelId, u32)`; it is built with a capacity of 10 000
    /// and a 30 second time-to-idle.
    // NOTE(review): the `u32` key component is presumably the channel epoch — TODO confirm.
    pub(crate) unrealized_value: moka::future::Cache<(ChannelId, u32), HoprBalance>,
}
43
44impl HoprNodeDb {
45 pub async fn new(directory: &Path, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
46 cfg.validate().map_err(|e| NodeDbError::Other(e.into()))?;
47
48 fs::create_dir_all(directory).map_err(|e| NodeDbError::Other(e.into()))?;
49
50 let tickets = Self::create_pool(
51 cfg.clone(),
52 directory.to_path_buf(),
53 PoolOptions::new(),
54 Some(0),
55 Some(50),
56 SQL_DB_TICKETS_FILE_NAME,
57 )
58 .await?;
59
60 #[cfg(feature = "sqlite")]
61 Self::new_sqlx_sqlite(tickets, cfg).await
62 }
63
64 #[cfg(feature = "sqlite")]
65 pub async fn new_in_memory() -> Result<Self, NodeDbError> {
66 Self::new_sqlx_sqlite(
67 SqlitePool::connect(":memory:")
68 .await
69 .map_err(|e| NodeDbError::Other(e.into()))?,
70 Default::default(),
71 )
72 .await
73 }
74
75 #[cfg(feature = "sqlite")]
76 async fn new_sqlx_sqlite(tickets_db_pool: SqlitePool, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
77 let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);
78 MigratorTickets::up(&tickets_db, None).await?;
79
80 Ok(Self {
81 tickets_write_lock: Arc::new(async_lock::Mutex::new(())),
82 unrealized_value: moka::future::CacheBuilder::new(10_000)
83 .time_to_idle(std::time::Duration::from_secs(30))
84 .build(),
85 tickets_db,
86 cfg,
87 })
88 }
89
90 async fn create_pool(
91 cfg: HoprNodeDbConfig,
92 directory: PathBuf,
93 mut options: PoolOptions<sqlx::Sqlite>,
94 min_conn: Option<u32>,
95 max_conn: Option<u32>,
96 path: &str,
97 ) -> Result<SqlitePool, NodeDbError> {
98 if let Some(min_conn) = min_conn {
99 options = options.min_connections(min_conn);
100 }
101 if let Some(max_conn) = max_conn {
102 options = options.max_connections(max_conn);
103 }
104
105 let sqlite_cfg = SqliteConnectOptions::default()
106 .create_if_missing(cfg.create_if_missing)
107 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
108 .log_statements(LevelFilter::Debug)
109 .journal_mode(SqliteJournalMode::Wal)
110 .synchronous(SqliteSynchronous::Normal)
111 .auto_vacuum(SqliteAutoVacuum::Full)
112 .page_size(4096)
114 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000"); let pool = options.connect_with(sqlite_cfg.filename(directory.join(path))).await?;
118
119 Ok(pool)
120 }
121
122 pub fn config(&self) -> &HoprNodeDbConfig {
123 &self.cfg
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an in-memory database can be created and its migration status queried.
    #[tokio::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let node_db = HoprNodeDb::new_in_memory().await?;

        // Querying the migration status exercises the freshly initialized connection.
        MigratorTickets::status(&node_db.tickets_db).await?;

        Ok(())
    }
}