1use std::{
2    fs,
3    path::{Path, PathBuf},
4    time::Duration,
5};
6
7use hopr_crypto_types::{keypairs::Keypair, prelude::ChainKeypair};
8use hopr_db_entity::prelude::{Account, Announcement};
9use hopr_primitive_types::primitives::Address;
10use migration::{MigratorChainLogs, MigratorIndex, MigratorTrait};
11use sea_orm::{EntityTrait, SqlxSqliteConnector};
12use sqlx::{
13    ConnectOptions, SqlitePool,
14    pool::PoolOptions,
15    sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
16};
17use tracing::log::LevelFilter;
18use validator::Validate;
19
20use crate::{
21    cache::{CacheKeyMapper, HoprIndexerDbCaches},
22    errors::{DbSqlError, Result},
23    prelude::model_to_account_entry,
24};
25
26#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault, validator::Validate)]
27pub struct HoprIndexerDbConfig {
28    #[default(true)]
29    pub create_if_missing: bool,
30    #[default(false)]
31    pub force_create: bool,
32    #[default(Duration::from_secs(5))]
33    pub log_slow_queries: Duration,
34}
35
36#[cfg(feature = "sqlite")]
37#[derive(Debug, Clone)]
38pub(crate) struct DbConnection {
39    ro: sea_orm::DatabaseConnection,
40    rw: sea_orm::DatabaseConnection,
41}
42
43#[cfg(feature = "sqlite")]
44impl DbConnection {
45    pub fn read_only(&self) -> &sea_orm::DatabaseConnection {
46        &self.ro
47    }
48
49    pub fn read_write(&self) -> &sea_orm::DatabaseConnection {
50        &self.rw
51    }
52}
53
54#[derive(Debug, Clone)]
67pub struct HoprIndexerDb {
68    pub(crate) index_db: DbConnection,
69    pub(crate) logs_db: sea_orm::DatabaseConnection,
70    pub me_onchain: Address,
71    pub(crate) caches: HoprIndexerDbCaches,
72}
73
74pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";
76
77pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";
79
80impl HoprIndexerDb {
81    pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprIndexerDbConfig) -> Result<Self> {
82        cfg.validate()
83            .map_err(|e| DbSqlError::Construction(format!("failed configuration validation: {e}")))?;
84
85        fs::create_dir_all(directory)
86            .map_err(|_e| DbSqlError::Construction(format!("cannot create main database directory {directory:?}")))?;
87
88        let index = Self::create_pool(
89            cfg.clone(),
90            directory.to_path_buf(),
91            PoolOptions::new(),
92            Some(0),
93            Some(1),
94            false,
95            SQL_DB_INDEX_FILE_NAME,
96        )
97        .await?;
98
99        #[cfg(feature = "sqlite")]
100        let index_ro: sqlx::Pool<sqlx::Sqlite> = Self::create_pool(
101            cfg.clone(),
102            directory.to_path_buf(),
103            PoolOptions::new(),
104            Some(0),
105            Some(30),
106            true,
107            SQL_DB_INDEX_FILE_NAME,
108        )
109        .await?;
110
111        let logs = Self::create_pool(
112            cfg.clone(),
113            directory.to_path_buf(),
114            PoolOptions::new(),
115            Some(0),
116            None,
117            false,
118            SQL_DB_LOGS_FILE_NAME,
119        )
120        .await?;
121
122        #[cfg(feature = "sqlite")]
123        Self::new_sqlx_sqlite(chain_key, index, index_ro, logs).await
124    }
125
126    #[cfg(feature = "sqlite")]
127    pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
128        let index_db = SqlitePool::connect(":memory:")
129            .await
130            .map_err(|e| DbSqlError::Construction(e.to_string()))?;
131
132        Self::new_sqlx_sqlite(
133            chain_key,
134            index_db.clone(),
135            index_db,
136            SqlitePool::connect(":memory:")
137                .await
138                .map_err(|e| DbSqlError::Construction(e.to_string()))?,
139        )
140        .await
141    }
142
143    #[cfg(feature = "sqlite")]
144    async fn new_sqlx_sqlite(
145        chain_key: ChainKeypair,
146        index_db_pool: SqlitePool,
147        index_db_ro_pool: SqlitePool,
148        logs_db_pool: SqlitePool,
149    ) -> Result<Self> {
150        let index_db_rw = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_pool);
151        let index_db_ro = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_ro_pool);
152
153        MigratorIndex::up(&index_db_rw, None)
154            .await
155            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
156
157        let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db_pool.clone());
158
159        MigratorChainLogs::up(&logs_db, None)
160            .await
161            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
162
163        let caches = HoprIndexerDbCaches::default();
164
165        Account::find()
166            .find_with_related(Announcement)
167            .all(&index_db_rw)
168            .await?
169            .into_iter()
170            .try_for_each(|(a, b)| match model_to_account_entry(a, b) {
171                Ok(account) => caches.key_id_mapper.update_key_id_binding(&account),
172                Err(error) => {
173                    tracing::error!(%error, "undecodeable account");
175                    Ok(())
176                }
177            })?;
178
179        Ok(Self {
180            me_onchain: chain_key.public().to_address(),
181            index_db: DbConnection {
182                ro: index_db_ro,
183                rw: index_db_rw,
184            },
185            logs_db,
186            caches,
187        })
188    }
189
190    fn common_connection_cfg_rw(cfg: HoprIndexerDbConfig) -> SqliteConnectOptions {
192        SqliteConnectOptions::default()
193            .create_if_missing(cfg.create_if_missing)
194            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
195            .log_statements(LevelFilter::Debug)
196            .journal_mode(SqliteJournalMode::Wal)
197            .synchronous(SqliteSynchronous::Normal)
198            .auto_vacuum(SqliteAutoVacuum::Full)
199            .page_size(4096)
201            .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000") }
204
205    fn common_connection_cfg_ro(cfg: HoprIndexerDbConfig) -> SqliteConnectOptions {
207        SqliteConnectOptions::default()
208            .create_if_missing(cfg.create_if_missing)
209            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
210            .log_statements(LevelFilter::Debug)
211            .page_size(4096)
213            .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000") .read_only(true)
216    }
217
218    pub async fn create_pool(
219        cfg: HoprIndexerDbConfig,
220        directory: PathBuf,
221        mut options: PoolOptions<sqlx::Sqlite>,
222        min_conn: Option<u32>,
223        max_conn: Option<u32>,
224        read_only: bool,
225        path: &str,
226    ) -> Result<SqlitePool> {
227        if let Some(min_conn) = min_conn {
228            options = options.min_connections(min_conn);
229        }
230        if let Some(max_conn) = max_conn {
231            options = options.max_connections(max_conn);
232        }
233
234        let cfg = if read_only {
235            Self::common_connection_cfg_ro(cfg)
236        } else {
237            Self::common_connection_cfg_rw(cfg)
238        };
239
240        let pool = options
241            .connect_with(cfg.filename(directory.join(path)))
242            .await
243            .map_err(|e| DbSqlError::Construction(format!("failed to create {path} database: {e}")))?;
244
245        Ok(pool)
246    }
247
248    pub fn key_id_mapper_ref(&self) -> &CacheKeyMapper {
249        &self.caches.key_id_mapper
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
256    use migration::{MigratorChainLogs, MigratorIndex, MigratorTrait};
257
258    use super::*;
259    use crate::{HoprDbGeneralModelOperations, TargetDb}; #[tokio::test]
262    async fn test_basic_db_init() -> anyhow::Result<()> {
263        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
264
265        MigratorIndex::status(db.conn(TargetDb::Index)).await?;
268        MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;
269
270        Ok(())
271    }
272}