hopr_db_sql/
db.rs

1use std::{
2    fs,
3    path::{Path, PathBuf},
4    sync::Arc,
5    time::Duration,
6};
7
8use futures::channel::mpsc::UnboundedSender;
9use hopr_crypto_types::{keypairs::Keypair, prelude::ChainKeypair};
10use hopr_db_entity::{
11    prelude::{Account, Announcement},
12    ticket,
13};
14use hopr_internal_types::prelude::{AcknowledgedTicket, AcknowledgedTicketStatus};
15use hopr_primitive_types::primitives::Address;
16use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
17use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
18use sea_query::Expr;
19use sqlx::{
20    ConnectOptions, SqlitePool,
21    pool::PoolOptions,
22    sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
23};
24use tracing::{debug, log::LevelFilter};
25use validator::Validate;
26
27use crate::{
28    HoprDbAllOperations,
29    accounts::model_to_account_entry,
30    cache::HoprDbCaches,
31    errors::{DbSqlError, Result},
32    ticket_manager::TicketManager,
33};
34
/// Default retention window, in seconds, for peer records that survive a restart.
///
/// During database initialization, peers whose `last_seen` timestamp is older than this many
/// seconds are deleted from the peers database. The window can be overridden through the
/// environment variable of the same name; an unset or unparsable value falls back to this default.
pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60; // 5 minutes

/// Smallest value that configuration validation accepts for
/// [`HoprDbConfig::surb_ring_buffer_size`].
pub const MIN_SURB_RING_BUFFER_SIZE: usize = 1024;
38
/// Configuration of the [`HoprDb`] handle.
///
/// The value is validated in [`HoprDb::new`] before any database is opened.
#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault, validator::Validate)]
pub struct HoprDbConfig {
    /// Forwarded to the SQLite connection options as their `create_if_missing` setting.
    /// Defaults to `true`.
    #[default(true)]
    pub create_if_missing: bool,
    /// Defaults to `false`.
    ///
    /// NOTE(review): this flag is not read anywhere in this file — presumably it requests
    /// re-creation of existing databases; confirm against the callers.
    #[default(false)]
    pub force_create: bool,
    /// Statements that run longer than this duration are logged at `Warn` level.
    /// Defaults to 5 seconds.
    #[default(Duration::from_secs(5))]
    pub log_slow_queries: Duration,
    /// Must be at least [`MIN_SURB_RING_BUFFER_SIZE`]. Defaults to `10_000`.
    ///
    /// NOTE(review): not consumed in this file — presumably the capacity of the SURB ring
    /// buffer; confirm where it is used.
    #[default(10_000)]
    #[validate(range(min = MIN_SURB_RING_BUFFER_SIZE))]
    pub surb_ring_buffer_size: usize,
    /// Must be at least 2. Defaults to `1000`.
    ///
    /// NOTE(review): not consumed in this file — presumably the SURB count below which a
    /// shortage is signalled; confirm where it is used.
    #[default(1000)]
    #[validate(range(min = 2))]
    pub surb_distress_threshold: usize,
}
54
/// Pair of connections to one database: a read-only one and a read-write one.
///
/// For on-disk databases the two connections wrap separate pools opened over the same file;
/// for in-memory databases both wrap clones of the same pool.
#[cfg(feature = "sqlite")]
#[derive(Debug, Clone)]
pub(crate) struct DbConnection {
    /// Connection handed out by [`DbConnection::read_only`].
    ro: sea_orm::DatabaseConnection,
    /// Connection handed out by [`DbConnection::read_write`].
    rw: sea_orm::DatabaseConnection,
}
61
#[cfg(feature = "sqlite")]
impl DbConnection {
    /// Borrows the connection meant for queries that only read data.
    pub fn read_only(&self) -> &sea_orm::DatabaseConnection {
        let Self { ro, .. } = self;
        ro
    }

    /// Borrows the connection meant for queries that may modify data.
    pub fn read_write(&self) -> &sea_orm::DatabaseConnection {
        let Self { rw, .. } = self;
        rw
    }
}
72
/// Main database handle for HOPR node operations.
///
/// Manages multiple SQLite databases for different data domains to avoid
/// locking conflicts and improve performance:
///
/// - **Index DB**: Blockchain indexing and contract data
/// - **Tickets DB**: Payment tickets and acknowledgments
/// - **Peers DB**: Network peer information and metadata
/// - **Logs DB**: Blockchain logs and processing status
///
/// Supports database snapshot imports for fast synchronization via
/// [`crate::HoprDbGeneralModelOperations::import_logs_db`].
#[derive(Debug, Clone)]
pub struct HoprDb {
    /// Read-only and read-write connections to the index database.
    pub(crate) index_db: DbConnection,
    /// Connection to the tickets database.
    pub(crate) tickets_db: sea_orm::DatabaseConnection,
    /// Connection to the peers database.
    pub(crate) peers_db: sea_orm::DatabaseConnection,
    /// Connection to the logs database.
    pub(crate) logs_db: sea_orm::DatabaseConnection,

    /// Caches shared between all clones of this handle and the ticket manager.
    pub(crate) caches: Arc<HoprDbCaches>,
    /// Chain keypair supplied at construction.
    pub(crate) chain_key: ChainKeypair,
    /// On-chain address derived from the public key of `chain_key`.
    pub(crate) me_onchain: Address,
    /// Ticket manager built over the tickets database and the shared caches.
    pub(crate) ticket_manager: Arc<TicketManager>,
    /// Configuration this handle was created with.
    pub(crate) cfg: HoprDbConfig,
}
98
/// Filename for the blockchain-indexing database.
///
/// Joined onto the directory given to [`HoprDb::new`]; the same file is opened by both the
/// read-write and the read-only index pool.
pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";
/// Filename for the network peers database, joined onto the directory given to [`HoprDb::new`].
pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
/// Filename for the payment tickets database, joined onto the directory given to [`HoprDb::new`].
pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
/// Filename for the blockchain logs database (used in snapshots), joined onto the directory
/// given to [`HoprDb::new`].
pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";
107
108impl HoprDb {
109    pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprDbConfig) -> Result<Self> {
110        #[cfg(all(feature = "prometheus", not(test)))]
111        {
112            lazy_static::initialize(&crate::protocol::METRIC_RECEIVED_ACKS);
113            lazy_static::initialize(&crate::protocol::METRIC_SENT_ACKS);
114            lazy_static::initialize(&crate::protocol::METRIC_TICKETS_COUNT);
115        }
116
117        cfg.validate()
118            .map_err(|e| DbSqlError::Construction(format!("failed configuration validation: {e}")))?;
119
120        fs::create_dir_all(directory)
121            .map_err(|_e| DbSqlError::Construction(format!("cannot create main database directory {directory:?}")))?;
122
123        let index = Self::create_pool(
124            cfg.clone(),
125            directory.to_path_buf(),
126            PoolOptions::new(),
127            Some(0),
128            Some(1),
129            false,
130            SQL_DB_INDEX_FILE_NAME,
131        )
132        .await?;
133
134        #[cfg(feature = "sqlite")]
135        let index_ro = Self::create_pool(
136            cfg.clone(),
137            directory.to_path_buf(),
138            PoolOptions::new(),
139            Some(0),
140            Some(30),
141            true,
142            SQL_DB_INDEX_FILE_NAME,
143        )
144        .await?;
145
146        let peers_options = PoolOptions::new()
147            .acquire_timeout(Duration::from_secs(60)) // Default is 30
148            .idle_timeout(Some(Duration::from_secs(10 * 60))) // This is the default
149            .max_lifetime(Some(Duration::from_secs(30 * 60))); // This is the default
150
151        let peers = Self::create_pool(
152            cfg.clone(),
153            directory.to_path_buf(),
154            peers_options,
155            Some(0),
156            Some(300),
157            false,
158            SQL_DB_PEERS_FILE_NAME,
159        )
160        .await?;
161
162        let tickets = Self::create_pool(
163            cfg.clone(),
164            directory.to_path_buf(),
165            PoolOptions::new(),
166            Some(0),
167            Some(50),
168            false,
169            SQL_DB_TICKETS_FILE_NAME,
170        )
171        .await?;
172
173        let logs = Self::create_pool(
174            cfg.clone(),
175            directory.to_path_buf(),
176            PoolOptions::new(),
177            Some(0),
178            None,
179            false,
180            SQL_DB_LOGS_FILE_NAME,
181        )
182        .await?;
183
184        #[cfg(feature = "sqlite")]
185        Self::new_sqlx_sqlite(chain_key, index, index_ro, peers, tickets, logs, cfg).await
186    }
187
188    #[cfg(feature = "sqlite")]
189    pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
190        let index_db = SqlitePool::connect(":memory:")
191            .await
192            .map_err(|e| DbSqlError::Construction(e.to_string()))?;
193
194        Self::new_sqlx_sqlite(
195            chain_key,
196            index_db.clone(),
197            index_db,
198            SqlitePool::connect(":memory:")
199                .await
200                .map_err(|e| DbSqlError::Construction(e.to_string()))?,
201            SqlitePool::connect(":memory:")
202                .await
203                .map_err(|e| DbSqlError::Construction(e.to_string()))?,
204            SqlitePool::connect(":memory:")
205                .await
206                .map_err(|e| DbSqlError::Construction(e.to_string()))?,
207            Default::default(),
208        )
209        .await
210    }
211
212    #[cfg(feature = "sqlite")]
213    async fn new_sqlx_sqlite(
214        chain_key: ChainKeypair,
215        index_db_pool: SqlitePool,
216        index_db_ro_pool: SqlitePool,
217        peers_db_pool: SqlitePool,
218        tickets_db_pool: SqlitePool,
219        logs_db_pool: SqlitePool,
220        cfg: HoprDbConfig,
221    ) -> Result<Self> {
222        let index_db_rw = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_pool);
223        let index_db_ro = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db_ro_pool);
224
225        MigratorIndex::up(&index_db_rw, None)
226            .await
227            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
228
229        let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);
230
231        MigratorTickets::up(&tickets_db, None)
232            .await
233            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
234
235        let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db_pool);
236
237        MigratorPeers::up(&peers_db, None)
238            .await
239            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
240
241        let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db_pool.clone());
242
243        MigratorChainLogs::up(&logs_db, None)
244            .await
245            .map_err(|e| DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
246
247        // Reset the peer network information
248        let res = hopr_db_entity::network_peer::Entity::delete_many()
249            .filter(
250                sea_orm::Condition::all().add(
251                    hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
252                        hopr_platform::time::native::current_time()
253                            .checked_sub(std::time::Duration::from_secs(
254                                std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
255                                    .unwrap_or_else(|_| {
256                                        HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
257                                    })
258                                    .parse::<u64>()
259                                    .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
260                            ))
261                            .unwrap_or_else(hopr_platform::time::native::current_time),
262                    )),
263                ),
264            )
265            .exec(&peers_db)
266            .await
267            .map_err(|e| DbSqlError::Construction(format!("must reset peers on init: {e}")))?;
268        debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
269
270        // Reset all BeingAggregated ticket states to Untouched
271        ticket::Entity::update_many()
272            .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
273            .col_expr(
274                ticket::Column::State,
275                Expr::value(AcknowledgedTicketStatus::Untouched as u8),
276            )
277            .exec(&tickets_db)
278            .await?;
279
280        let caches = Arc::new(HoprDbCaches::default());
281        caches.invalidate_all();
282
283        // Initialize KeyId mapping for accounts
284        Account::find()
285            .find_with_related(Announcement)
286            .all(&index_db_rw)
287            .await?
288            .into_iter()
289            .try_for_each(|(a, b)| match model_to_account_entry(a, b) {
290                Ok(account) => caches.key_id_mapper.update_key_id_binding(&account),
291                Err(error) => {
292                    // Undecodeable accounts are skipped and will be unreachable
293                    tracing::error!(%error, "undecodeable account");
294                    Ok(())
295                }
296            })?;
297
298        Ok(Self {
299            me_onchain: chain_key.public().to_address(),
300            chain_key,
301            ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
302            caches,
303
304            index_db: DbConnection {
305                ro: index_db_ro,
306                rw: index_db_rw,
307            },
308            tickets_db,
309            peers_db,
310            logs_db,
311            cfg,
312        })
313    }
314
315    /// Starts ticket processing by the `TicketManager` with an optional new ticket notifier.
316    /// Without calling this method, tickets will not be persisted into the DB.
317    ///
318    /// If the notifier is given, it will receive notifications once new ticket has been
319    /// persisted into the Tickets DB.
320    pub fn start_ticket_processing(&self, ticket_notifier: Option<UnboundedSender<AcknowledgedTicket>>) -> Result<()> {
321        if let Some(notifier) = ticket_notifier {
322            self.ticket_manager.start_ticket_processing(notifier)
323        } else {
324            self.ticket_manager.start_ticket_processing(futures::sink::drain())
325        }
326    }
327
328    /// Default SQLite config values for all DBs with RW  (read-write) access.
329    fn common_connection_cfg_rw(cfg: HoprDbConfig) -> SqliteConnectOptions {
330        SqliteConnectOptions::default()
331            .create_if_missing(cfg.create_if_missing)
332            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
333            .log_statements(LevelFilter::Debug)
334            .journal_mode(SqliteJournalMode::Wal)
335            .synchronous(SqliteSynchronous::Normal)
336            .auto_vacuum(SqliteAutoVacuum::Full)
337            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
338            .page_size(4096)
339            .pragma("cache_size", "-30000") // 32M
340            .pragma("busy_timeout", "1000") // 1000ms
341    }
342
343    /// Default SQLite config values for all DBs with RO (read-only) access.
344    fn common_connection_cfg_ro(cfg: HoprDbConfig) -> SqliteConnectOptions {
345        SqliteConnectOptions::default()
346            .create_if_missing(cfg.create_if_missing)
347            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
348            .log_statements(LevelFilter::Debug)
349            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
350            .page_size(4096)
351            .pragma("cache_size", "-30000") // 32M
352            .pragma("busy_timeout", "1000") // 1000ms
353            .read_only(true)
354    }
355
356    pub async fn create_pool(
357        cfg: HoprDbConfig,
358        directory: PathBuf,
359        mut options: PoolOptions<sqlx::Sqlite>,
360        min_conn: Option<u32>,
361        max_conn: Option<u32>,
362        read_only: bool,
363        path: &str,
364    ) -> Result<SqlitePool> {
365        if let Some(min_conn) = min_conn {
366            options = options.min_connections(min_conn);
367        }
368        if let Some(max_conn) = max_conn {
369            options = options.max_connections(max_conn);
370        }
371
372        let cfg = if read_only {
373            Self::common_connection_cfg_ro(cfg)
374        } else {
375            Self::common_connection_cfg_rw(cfg)
376        };
377
378        let pool = options
379            .connect_with(cfg.filename(directory.join(path)))
380            .await
381            .map_err(|e| DbSqlError::Construction(format!("failed to create {path} database: {e}")))?;
382
383        Ok(pool)
384    }
385}
386
// Empty impl: no items are provided here, so the trait needs none from `HoprDb` itself.
// NOTE(review): presumably an umbrella trait over the individual operation traits that
// `HoprDb` implements elsewhere in the crate — confirm at the trait definition.
impl HoprDbAllOperations for HoprDb {}
388
#[cfg(test)]
mod tests {
    use hopr_crypto_types::{
        keypairs::{ChainKeypair, OffchainKeypair},
        prelude::Keypair,
    };
    use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin};
    use hopr_primitive_types::sma::SingleSumSMA;
    use libp2p_identity::PeerId;
    use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
    use multiaddr::Multiaddr;
    use rand::{Rng, distributions::Alphanumeric}; // rand 0.8 API

    use crate::{HoprDbGeneralModelOperations, TargetDb, db::HoprDb};

    /// Returns a randomly named path inside the platform's temporary directory.
    ///
    /// `HoprDb::new` creates a directory at this path and places the database files in it.
    fn random_tmp_db_dir() -> std::path::PathBuf {
        let random_filename: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(15)
            .map(char::from)
            .collect();

        std::env::temp_dir().join(format!("{random_filename}.sqlite"))
    }

    /// An in-memory database must come up with the migration status readable on every database.
    #[tokio::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        // NOTE: cfg-if this on Postgres to do only `Migrator::status(db.conn(Default::default)).await.expect("status
        // must be ok");`
        MigratorIndex::status(db.conn(TargetDb::Index)).await?;
        MigratorTickets::status(db.conn(TargetDb::Tickets)).await?;
        MigratorPeers::status(db.conn(TargetDb::Peers)).await?;
        MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;

        Ok(())
    }

    /// A peer that was only added, without a later status update, must not be found after
    /// the database is reopened.
    #[tokio::test]
    async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
        let path = random_tmp_db_dir();

        let peer_id: PeerId = OffchainKeypair::random().public().into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        {
            let db = HoprDb::new(&path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;
        }

        let not_found_peer = {
            let db = HoprDb::new(&path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.get_network_peer(&peer_id).await?
        };

        // Best-effort clean-up before asserting, so that a failed assertion leaves nothing behind.
        let _ = std::fs::remove_dir_all(&path);

        assert_eq!(not_found_peer, None);

        Ok(())
    }

    /// A peer whose status was updated with a `last_seen` of ten seconds ago must still be
    /// present after the database is reopened.
    #[tokio::test]
    async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
        let path = random_tmp_db_dir();

        let ofk = OffchainKeypair::random();
        let peer_id: PeerId = (*ofk.public()).into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        {
            let db = HoprDb::new(&path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;

            let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);

            db.update_network_peer(hopr_db_api::peers::PeerStatus {
                id: (*ofk.public(), peer_id),
                origin: PeerOrigin::Initialization,
                last_seen: ten_seconds_ago,
                last_seen_latency: std::time::Duration::from_millis(10),
                heartbeats_sent: 1,
                heartbeats_succeeded: 1,
                backoff: 1.0,
                ignored_until: None,
                multiaddresses: vec![ma_1.clone(), ma_2.clone()],
                quality: 1.0,
                quality_avg: SingleSumSMA::new(2),
            })
            .await?;
        }

        let found_peer = {
            let db = HoprDb::new(&path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.get_network_peer(&peer_id).await?.map(|p| p.id.1)
        };

        // Best-effort clean-up before asserting, so that a failed assertion leaves nothing behind.
        let _ = std::fs::remove_dir_all(&path);

        assert_eq!(found_peer, Some(peer_id));

        Ok(())
    }
}