1use std::{
2 fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5 time::Duration,
6};
7
8use futures::channel::mpsc::UnboundedSender;
9use hopr_crypto_types::{keypairs::Keypair, prelude::ChainKeypair};
10use hopr_db_entity::{
11 prelude::{Account, Announcement},
12 ticket,
13};
14use hopr_internal_types::prelude::{AcknowledgedTicket, AcknowledgedTicketStatus};
15use hopr_primitive_types::primitives::Address;
16use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
17use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
18use sea_query::Expr;
19use sqlx::{
20 ConnectOptions, SqlitePool,
21 pool::PoolOptions,
22 sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
23};
24use tracing::{debug, log::LevelFilter};
25use validator::Validate;
26
27use crate::{
28 HoprDbAllOperations,
29 accounts::model_to_account_entry,
30 cache::HoprDbCaches,
31 errors::{DbSqlError, Result},
32 ticket_manager::TicketManager,
33};
34
/// Default age limit, in seconds, for entries of the peers database at start-up.
///
/// During initialization, peers whose `last_seen` is older than this many seconds are deleted.
/// The value can be overridden through the environment variable of the same name.
pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60;

/// Smallest value accepted by validation for [`HoprDbConfig::surb_ring_buffer_size`].
pub const MIN_SURB_RING_BUFFER_SIZE: usize = 1024;
38
39#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault, validator::Validate)]
40pub struct HoprDbConfig {
41 #[default(true)]
42 pub create_if_missing: bool,
43 #[default(false)]
44 pub force_create: bool,
45 #[default(Duration::from_secs(5))]
46 pub log_slow_queries: Duration,
47 #[default(10_000)]
48 #[validate(range(min = MIN_SURB_RING_BUFFER_SIZE))]
49 pub surb_ring_buffer_size: usize,
50 #[default(1000)]
51 #[validate(range(min = 2))]
52 pub surb_distress_threshold: usize,
53}
54
/// Pair of connections to the same database: one read-only handle and one read-write handle.
#[cfg(feature = "sqlite")]
#[derive(Clone, Debug)]
pub(crate) struct DbConnection {
    /// Handle returned by [`DbConnection::read_only`].
    ro: sea_orm::DatabaseConnection,
    /// Handle returned by [`DbConnection::read_write`].
    rw: sea_orm::DatabaseConnection,
}
61
#[cfg(feature = "sqlite")]
impl DbConnection {
    /// Borrows the read-only connection.
    pub fn read_only(&self) -> &sea_orm::DatabaseConnection {
        let Self { ro, .. } = self;
        ro
    }

    /// Borrows the read-write connection.
    pub fn read_write(&self) -> &sea_orm::DatabaseConnection {
        let Self { rw, .. } = self;
        rw
    }
}
72
73#[derive(Debug, Clone)]
86pub struct HoprDb {
87 pub(crate) index_db: DbConnection,
88 pub(crate) tickets_db: sea_orm::DatabaseConnection,
89 pub(crate) peers_db: sea_orm::DatabaseConnection,
90 pub(crate) logs_db: sea_orm::DatabaseConnection,
91
92 pub(crate) caches: Arc<HoprDbCaches>,
93 pub(crate) chain_key: ChainKeypair,
94 pub(crate) me_onchain: Address,
95 pub(crate) ticket_manager: Arc<TicketManager>,
96 pub(crate) cfg: HoprDbConfig,
97}
98
/// File name of the index database inside the database directory.
pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";

/// File name of the peers database inside the database directory.
pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";

/// File name of the tickets database inside the database directory.
pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";

/// File name of the chain logs database inside the database directory.
pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";
107
108impl HoprDb {
109 pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprDbConfig) -> Result<Self> {
110 #[cfg(all(feature = "prometheus", not(test)))]
111 {
112 lazy_static::initialize(&crate::protocol::METRIC_RECEIVED_ACKS);
113 lazy_static::initialize(&crate::protocol::METRIC_SENT_ACKS);
114 lazy_static::initialize(&crate::protocol::METRIC_TICKETS_COUNT);
115 }
116
117 cfg.validate()
118 .map_err(|e| DbSqlError::Construction(format!("failed configuration validation: {e}")))?;
119
120 fs::create_dir_all(directory)
121 .map_err(|_e| DbSqlError::Construction(format!("cannot create main database directory {directory:?}")))?;
122
123 let index = Self::create_pool(
124 cfg.clone(),
125 directory.to_path_buf(),
126 PoolOptions::new(),
127 Some(0),
128 Some(1),
129 false,
130 SQL_DB_INDEX_FILE_NAME,
131 )
132 .await?;
133
134 #[cfg(feature = "sqlite")]
135 let index_ro = Self::create_pool(
136 cfg.clone(),
137 directory.to_path_buf(),
138 PoolOptions::new(),
139 Some(0),
140 Some(30),
141 true,
142 SQL_DB_INDEX_FILE_NAME,
143 )
144 .await?;
145
146 let peers_options = PoolOptions::new()
147 .acquire_timeout(Duration::from_secs(60)) .idle_timeout(Some(Duration::from_secs(10 * 60))) .max_lifetime(Some(Duration::from_secs(30 * 60))); let peers = Self::create_pool(
152 cfg.clone(),
153 directory.to_path_buf(),
154 peers_options,
155 Some(0),
156 Some(300),
157 false,
158 SQL_DB_PEERS_FILE_NAME,
159 )
160 .await?;
161
162 let tickets = Self::create_pool(
163 cfg.clone(),
164 directory.to_path_buf(),
165 PoolOptions::new(),
166 Some(0),
167 Some(50),
168 false,
169 SQL_DB_TICKETS_FILE_NAME,
170 )
171 .await?;
172
173 let logs = Self::create_pool(
174 cfg.clone(),
175 directory.to_path_buf(),
176 PoolOptions::new(),
177 Some(0),
178 None,
179 false,
180 SQL_DB_LOGS_FILE_NAME,
181 )
182 .await?;
183
184 #[cfg(feature = "sqlite")]
185 Self::new_sqlx_sqlite(chain_key, index, index_ro, peers, tickets, logs, cfg).await
186 }
187
/// Creates a [`HoprDb`] backed only by in-memory SQLite databases, using the default
/// [`HoprDbConfig`].
///
/// The index database uses one pool for both its read-write and read-only roles; the
/// peers, tickets and logs databases each get a pool of their own.
///
/// # Errors
///
/// Returns [`DbSqlError::Construction`] if an in-memory pool cannot be opened; errors from
/// `new_sqlx_sqlite` are passed through.
#[cfg(feature = "sqlite")]
pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
    /// Opens one fresh in-memory pool.
    async fn memory_pool() -> Result<SqlitePool> {
        SqlitePool::connect(":memory:")
            .await
            .map_err(|e| DbSqlError::Construction(e.to_string()))
    }

    let index_pool = memory_pool().await?;
    let peers_pool = memory_pool().await?;
    let tickets_pool = memory_pool().await?;
    let logs_pool = memory_pool().await?;

    Self::new_sqlx_sqlite(
        chain_key,
        index_pool.clone(),
        index_pool,
        peers_pool,
        tickets_pool,
        logs_pool,
        HoprDbConfig::default(),
    )
    .await
}
211
/// Builds a [`HoprDb`] from already opened SQLite pools.
///
/// In order, this:
/// 1. applies the index, tickets, peers and chain-logs migrations to their databases
///    (the index migration runs on the read-write pool),
/// 2. deletes peers whose `last_seen` is older than the persistence window,
/// 3. resets tickets left in the `BeingAggregated` state back to `Untouched`,
/// 4. populates the key-id mapper cache from the stored accounts and their announcements.
///
/// The persistence window is read from the
/// `HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS` environment variable and
/// falls back to the constant of the same name when the variable is unset or is not a
/// valid `u64`.
///
/// # Errors
///
/// Returns [`DbSqlError::Construction`] if a migration or the peers clean-up fails. Failures
/// of the ticket reset, the account query or a key-id binding update are propagated via `?`.
/// Accounts that cannot be decoded are logged and skipped instead of failing construction.
#[cfg(feature = "sqlite")]
async fn new_sqlx_sqlite(
    chain_key: ChainKeypair,
    index_db_pool: SqlitePool,
    index_db_ro_pool: SqlitePool,
    peers_db_pool: SqlitePool,
    tickets_db_pool: SqlitePool,
    logs_db_pool: SqlitePool,
    cfg: HoprDbConfig,
) -> Result<Self> {
    let index_db_rw = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_pool);
    let index_db_ro = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_ro_pool);

    MigratorIndex::up(&index_db_rw, None)
        .await
        .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

    let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);

    MigratorTickets::up(&tickets_db, None)
        .await
        .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

    let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db_pool);

    MigratorPeers::up(&peers_db, None)
        .await
        .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

    // The pool is owned by this function and not used again, so it is moved, not cloned.
    let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db_pool);

    MigratorChainLogs::up(&logs_db, None)
        .await
        .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

    // Persistence window: environment override first, built-in default otherwise.
    let persistence_window = Duration::from_secs(
        std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok())
            .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
    );

    // NOTE(review): when the subtraction is not representable, the cut-off falls back to the
    // current time, i.e. every peer last seen before "now" is removed — confirm this is intended.
    let cutoff = hopr_platform::time::native::current_time()
        .checked_sub(persistence_window)
        .unwrap_or_else(hopr_platform::time::native::current_time);

    let res = hopr_db_entity::network_peer::Entity::delete_many()
        .filter(hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(cutoff)))
        .exec(&peers_db)
        .await
        .map_err(|e| DbSqlError::Construction(format!("must reset peers on init: {e}")))?;
    debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");

    // Tickets still flagged as being aggregated are put back into the untouched state.
    ticket::Entity::update_many()
        .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
        .col_expr(
            ticket::Column::State,
            Expr::value(AcknowledgedTicketStatus::Untouched as u8),
        )
        .exec(&tickets_db)
        .await?;

    let caches = Arc::new(HoprDbCaches::default());
    caches.invalidate_all();

    // Seed the key-id mapper from every decodable account stored in the index database.
    let stored_accounts = Account::find()
        .find_with_related(Announcement)
        .all(&index_db_rw)
        .await?;
    for (account_model, announcements) in stored_accounts {
        match model_to_account_entry(account_model, announcements) {
            Ok(account) => caches.key_id_mapper.update_key_id_binding(&account)?,
            // A single undecodable row must not prevent the database from starting.
            Err(error) => tracing::error!(%error, "undecodeable account"),
        }
    }

    Ok(Self {
        me_onchain: chain_key.public().to_address(),
        chain_key,
        ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
        caches,

        index_db: DbConnection {
            ro: index_db_ro,
            rw: index_db_rw,
        },
        tickets_db,
        peers_db,
        logs_db,
        cfg,
    })
}
314
315 pub fn start_ticket_processing(&self, ticket_notifier: Option<UnboundedSender<AcknowledgedTicket>>) -> Result<()> {
321 if let Some(notifier) = ticket_notifier {
322 self.ticket_manager.start_ticket_processing(notifier)
323 } else {
324 self.ticket_manager.start_ticket_processing(futures::sink::drain())
325 }
326 }
327
328 fn common_connection_cfg_rw(cfg: HoprDbConfig) -> SqliteConnectOptions {
330 SqliteConnectOptions::default()
331 .create_if_missing(cfg.create_if_missing)
332 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
333 .log_statements(LevelFilter::Debug)
334 .journal_mode(SqliteJournalMode::Wal)
335 .synchronous(SqliteSynchronous::Normal)
336 .auto_vacuum(SqliteAutoVacuum::Full)
337 .page_size(4096)
339 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000") }
342
343 fn common_connection_cfg_ro(cfg: HoprDbConfig) -> SqliteConnectOptions {
345 SqliteConnectOptions::default()
346 .create_if_missing(cfg.create_if_missing)
347 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
348 .log_statements(LevelFilter::Debug)
349 .page_size(4096)
351 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000") .read_only(true)
354 }
355
356 pub async fn create_pool(
357 cfg: HoprDbConfig,
358 directory: PathBuf,
359 mut options: PoolOptions<sqlx::Sqlite>,
360 min_conn: Option<u32>,
361 max_conn: Option<u32>,
362 read_only: bool,
363 path: &str,
364 ) -> Result<SqlitePool> {
365 if let Some(min_conn) = min_conn {
366 options = options.min_connections(min_conn);
367 }
368 if let Some(max_conn) = max_conn {
369 options = options.max_connections(max_conn);
370 }
371
372 let cfg = if read_only {
373 Self::common_connection_cfg_ro(cfg)
374 } else {
375 Self::common_connection_cfg_rw(cfg)
376 };
377
378 let pool = options
379 .connect_with(cfg.filename(directory.join(path)))
380 .await
381 .map_err(|e| DbSqlError::Construction(format!("failed to create {path} database: {e}")))?;
382
383 Ok(pool)
384 }
385}
386
387impl HoprDbAllOperations for HoprDb {}
388
389#[cfg(test)]
390mod tests {
391 use hopr_crypto_types::{
392 keypairs::{ChainKeypair, OffchainKeypair},
393 prelude::Keypair,
394 };
395 use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin};
396 use hopr_primitive_types::sma::SingleSumSMA;
397 use libp2p_identity::PeerId;
398 use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
399 use multiaddr::Multiaddr;
400 use rand::{Rng, distributions::Alphanumeric};
401
402 use crate::{HoprDbGeneralModelOperations, TargetDb, db::HoprDb}; #[tokio::test]
405 async fn test_basic_db_init() -> anyhow::Result<()> {
406 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
407
408 MigratorIndex::status(db.conn(TargetDb::Index)).await?;
411 MigratorTickets::status(db.conn(TargetDb::Tickets)).await?;
412 MigratorPeers::status(db.conn(TargetDb::Peers)).await?;
413 MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;
414
415 Ok(())
416 }
417
418 #[tokio::test]
419 async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
420 let random_filename: String = rand::thread_rng()
421 .sample_iter(&Alphanumeric)
422 .take(15)
423 .map(char::from)
424 .collect();
425 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
426
427 let peer_id: PeerId = OffchainKeypair::random().public().into();
428 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
429 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
430
431 let path = std::path::Path::new(&random_tmp_file);
432
433 {
434 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
435
436 db.add_network_peer(
437 &peer_id,
438 PeerOrigin::IncomingConnection,
439 vec![ma_1.clone(), ma_2.clone()],
440 0.0,
441 25,
442 )
443 .await?;
444 }
445
446 {
447 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
448
449 let not_found_peer = db.get_network_peer(&peer_id).await?;
450
451 assert_eq!(not_found_peer, None);
452 }
453
454 Ok(())
455 }
456
457 #[tokio::test]
458 async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
459 let random_filename: String = rand::thread_rng()
460 .sample_iter(&Alphanumeric)
461 .take(15)
462 .map(char::from)
463 .collect();
464 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
465
466 let ofk = OffchainKeypair::random();
467 let peer_id: PeerId = (*ofk.public()).into();
468 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
469 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
470
471 let path = std::path::Path::new(&random_tmp_file);
472
473 {
474 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
475
476 db.add_network_peer(
477 &peer_id,
478 PeerOrigin::IncomingConnection,
479 vec![ma_1.clone(), ma_2.clone()],
480 0.0,
481 25,
482 )
483 .await?;
484
485 let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);
486
487 db.update_network_peer(hopr_db_api::peers::PeerStatus {
488 id: (*ofk.public(), peer_id),
489 origin: PeerOrigin::Initialization,
490 last_seen: ten_seconds_ago,
491 last_seen_latency: std::time::Duration::from_millis(10),
492 heartbeats_sent: 1,
493 heartbeats_succeeded: 1,
494 backoff: 1.0,
495 ignored_until: None,
496 multiaddresses: vec![ma_1.clone(), ma_2.clone()],
497 quality: 1.0,
498 quality_avg: SingleSumSMA::new(2),
499 })
500 .await?;
501 }
502 {
503 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
504
505 let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);
506
507 assert_eq!(found_peer, Some(peer_id));
508 }
509
510 Ok(())
511 }
512}