1use std::{
2 fs,
3 path::{Path, PathBuf},
4 time::Duration,
5};
6
7use hopr_crypto_types::{keypairs::Keypair, prelude::ChainKeypair};
8use hopr_db_entity::prelude::{Account, Announcement};
9use hopr_primitive_types::primitives::Address;
10use migration::{MigratorChainLogs, MigratorIndex, MigratorTrait};
11use sea_orm::{EntityTrait, SqlxSqliteConnector};
12use sqlx::{
13 ConnectOptions, SqlitePool,
14 pool::PoolOptions,
15 sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
16};
17use tracing::log::LevelFilter;
18use validator::Validate;
19
20use crate::{
21 cache::{CacheKeyMapper, HoprIndexerDbCaches},
22 errors::{DbSqlError, Result},
23 prelude::model_to_account_entry,
24};
25
26#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault, validator::Validate)]
27pub struct HoprIndexerDbConfig {
28 #[default(true)]
29 pub create_if_missing: bool,
30 #[default(false)]
31 pub force_create: bool,
32 #[default(Duration::from_secs(5))]
33 pub log_slow_queries: Duration,
34}
35
/// Pair of connections to the same index database: one meant for reads only
/// and one for reads and writes.
#[cfg(feature = "sqlite")]
#[derive(Debug, Clone)]
pub(crate) struct DbConnection {
    /// Connection returned by [`DbConnection::read_only`].
    ro: sea_orm::DatabaseConnection,
    /// Connection returned by [`DbConnection::read_write`].
    rw: sea_orm::DatabaseConnection,
}
42
#[cfg(feature = "sqlite")]
impl DbConnection {
    /// Borrows the connection stored in the `ro` field.
    pub fn read_only(&self) -> &sea_orm::DatabaseConnection {
        let Self { ro, .. } = self;
        ro
    }

    /// Borrows the connection stored in the `rw` field.
    pub fn read_write(&self) -> &sea_orm::DatabaseConnection {
        let Self { rw, .. } = self;
        rw
    }
}
53
54#[derive(Debug, Clone)]
67pub struct HoprIndexerDb {
68 pub(crate) index_db: DbConnection,
69 pub(crate) logs_db: sea_orm::DatabaseConnection,
70 pub me_onchain: Address,
71 pub(crate) caches: HoprIndexerDbCaches,
72}
73
/// File name, relative to the database directory, of the SQLite index database.
pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";
76
/// File name, relative to the database directory, of the SQLite chain logs database.
pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";
79
80impl HoprIndexerDb {
81 pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprIndexerDbConfig) -> Result<Self> {
82 cfg.validate()
83 .map_err(|e| DbSqlError::Construction(format!("failed configuration validation: {e}")))?;
84
85 fs::create_dir_all(directory)
86 .map_err(|_e| DbSqlError::Construction(format!("cannot create main database directory {directory:?}")))?;
87
88 let index = Self::create_pool(
89 cfg.clone(),
90 directory.to_path_buf(),
91 PoolOptions::new(),
92 Some(0),
93 Some(1),
94 false,
95 SQL_DB_INDEX_FILE_NAME,
96 )
97 .await?;
98
99 #[cfg(feature = "sqlite")]
100 let index_ro = Self::create_pool(
101 cfg.clone(),
102 directory.to_path_buf(),
103 PoolOptions::new(),
104 Some(0),
105 Some(30),
106 true,
107 SQL_DB_INDEX_FILE_NAME,
108 )
109 .await?;
110
111 let logs = Self::create_pool(
112 cfg.clone(),
113 directory.to_path_buf(),
114 PoolOptions::new(),
115 Some(0),
116 None,
117 false,
118 SQL_DB_LOGS_FILE_NAME,
119 )
120 .await?;
121
122 #[cfg(feature = "sqlite")]
123 Self::new_sqlx_sqlite(chain_key, index, index_ro, logs).await
124 }
125
126 #[cfg(feature = "sqlite")]
127 pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
128 let index_db = SqlitePool::connect(":memory:")
129 .await
130 .map_err(|e| DbSqlError::Construction(e.to_string()))?;
131
132 Self::new_sqlx_sqlite(
133 chain_key,
134 index_db.clone(),
135 index_db,
136 SqlitePool::connect(":memory:")
137 .await
138 .map_err(|e| DbSqlError::Construction(e.to_string()))?,
139 )
140 .await
141 }
142
143 #[cfg(feature = "sqlite")]
144 async fn new_sqlx_sqlite(
145 chain_key: ChainKeypair,
146 index_db_pool: SqlitePool,
147 index_db_ro_pool: SqlitePool,
148 logs_db_pool: SqlitePool,
149 ) -> Result<Self> {
150 let index_db_rw = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_pool);
151 let index_db_ro = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_ro_pool);
152
153 MigratorIndex::up(&index_db_rw, None)
154 .await
155 .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
156
157 let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db_pool.clone());
158
159 MigratorChainLogs::up(&logs_db, None)
160 .await
161 .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
162
163 let caches = HoprIndexerDbCaches::default();
164
165 Account::find()
166 .find_with_related(Announcement)
167 .all(&index_db_rw)
168 .await?
169 .into_iter()
170 .try_for_each(|(a, b)| match model_to_account_entry(a, b) {
171 Ok(account) => caches.key_id_mapper.update_key_id_binding(&account),
172 Err(error) => {
173 tracing::error!(%error, "undecodeable account");
175 Ok(())
176 }
177 })?;
178
179 Ok(Self {
180 me_onchain: chain_key.public().to_address(),
181 index_db: DbConnection {
182 ro: index_db_ro,
183 rw: index_db_rw,
184 },
185 logs_db,
186 caches,
187 })
188 }
189
190 fn common_connection_cfg_rw(cfg: HoprIndexerDbConfig) -> SqliteConnectOptions {
192 SqliteConnectOptions::default()
193 .create_if_missing(cfg.create_if_missing)
194 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
195 .log_statements(LevelFilter::Debug)
196 .journal_mode(SqliteJournalMode::Wal)
197 .synchronous(SqliteSynchronous::Normal)
198 .auto_vacuum(SqliteAutoVacuum::Full)
199 .page_size(4096)
201 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000") }
204
205 fn common_connection_cfg_ro(cfg: HoprIndexerDbConfig) -> SqliteConnectOptions {
207 SqliteConnectOptions::default()
208 .create_if_missing(cfg.create_if_missing)
209 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
210 .log_statements(LevelFilter::Debug)
211 .page_size(4096)
213 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000") .read_only(true)
216 }
217
218 pub async fn create_pool(
219 cfg: HoprIndexerDbConfig,
220 directory: PathBuf,
221 mut options: PoolOptions<sqlx::Sqlite>,
222 min_conn: Option<u32>,
223 max_conn: Option<u32>,
224 read_only: bool,
225 path: &str,
226 ) -> Result<SqlitePool> {
227 if let Some(min_conn) = min_conn {
228 options = options.min_connections(min_conn);
229 }
230 if let Some(max_conn) = max_conn {
231 options = options.max_connections(max_conn);
232 }
233
234 let cfg = if read_only {
235 Self::common_connection_cfg_ro(cfg)
236 } else {
237 Self::common_connection_cfg_rw(cfg)
238 };
239
240 let pool = options
241 .connect_with(cfg.filename(directory.join(path)))
242 .await
243 .map_err(|e| DbSqlError::Construction(format!("failed to create {path} database: {e}")))?;
244
245 Ok(pool)
246 }
247
248 pub fn key_id_mapper_ref(&self) -> &CacheKeyMapper {
249 &self.caches.key_id_mapper
250 }
251}
252
#[cfg(test)]
mod tests {
    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
    use migration::{MigratorChainLogs, MigratorIndex, MigratorTrait};

    use super::*;
    use crate::{HoprDbGeneralModelOperations, TargetDb};

    /// An in-memory database must come up, and the migration status of both the
    /// index and the logs database must be queryable afterwards.
    #[tokio::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let chain_key = ChainKeypair::random();
        let db = HoprIndexerDb::new_in_memory(chain_key).await?;

        // Index database first, then the logs database.
        MigratorIndex::status(db.conn(TargetDb::Index)).await?;
        MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;

        Ok(())
    }
}