hopr_db_sql/
db.rs

1use std::{path::Path, sync::Arc, time::Duration};
2
3use futures::channel::mpsc::UnboundedSender;
4use hopr_crypto_types::{keypairs::Keypair, prelude::ChainKeypair};
5use hopr_db_entity::{
6    prelude::{Account, Announcement},
7    ticket,
8};
9use hopr_internal_types::prelude::{AcknowledgedTicket, AcknowledgedTicketStatus};
10use hopr_primitive_types::primitives::Address;
11use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
12use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
13use sea_query::Expr;
14use sqlx::{
15    ConnectOptions, SqlitePool,
16    pool::PoolOptions,
17    sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
18};
19use tracing::{debug, log::LevelFilter};
20
21use crate::{
22    HoprDbAllOperations, accounts::model_to_account_entry, cache::HoprDbCaches, errors::Result,
23    ticket_manager::TicketManager,
24};
25
/// Default retention window (in seconds) for peer records across a node restart:
/// on initialization, peers whose `last_seen` is older than this window are purged
/// from the peers DB. Can be overridden at startup via the environment variable of
/// the same name (see `new_sqlx_sqlite`).
pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60; // 5 minutes
27
/// Configuration used when constructing a [`HoprDb`] via [`HoprDb::new`].
#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
pub struct HoprDbConfig {
    /// Create the SQLite database files if they do not exist yet.
    #[default(true)]
    pub create_if_missing: bool,
    // NOTE(review): not read anywhere in this file — presumably consumed by a caller
    // or reserved for forced re-creation; confirm before relying on it.
    #[default(false)]
    pub force_create: bool,
    /// Statements running longer than this duration are logged at WARN level.
    #[default(Duration::from_secs(5))]
    pub log_slow_queries: Duration,
}
37
/// Main database handle aggregating the four SQLite-backed databases
/// (index, tickets, peers, logs) plus shared caches and the ticket manager.
///
/// Cloning is cheap: connections and caches are behind `Arc`/pool handles.
#[derive(Debug, Clone)]
pub struct HoprDb {
    // Indexer (chain state) database connection.
    pub(crate) index_db: sea_orm::DatabaseConnection,
    // Tickets database connection (also held by `ticket_manager`).
    pub(crate) tickets_db: sea_orm::DatabaseConnection,
    // Peer network information database connection.
    pub(crate) peers_db: sea_orm::DatabaseConnection,
    // Chain logs database connection.
    pub(crate) logs_db: sea_orm::DatabaseConnection,
    // Coordinates ticket persistence; started via `start_ticket_processing`.
    pub(crate) ticket_manager: Arc<TicketManager>,
    // This node's chain keypair.
    pub(crate) chain_key: ChainKeypair,
    // On-chain address derived from `chain_key` (cached at construction).
    pub(crate) me_onchain: Address,
    // Shared in-memory caches (key-id mappings etc.).
    pub(crate) caches: Arc<HoprDbCaches>,
}
49
/// File name of the indexer (chain state) database inside the DB directory.
pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";
/// File name of the peers database inside the DB directory.
pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
/// File name of the tickets database inside the DB directory.
pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
/// File name of the chain logs database inside the DB directory.
pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";
54
55impl HoprDb {
56    pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprDbConfig) -> Result<Self> {
57        #[cfg(all(feature = "prometheus", not(test)))]
58        {
59            lazy_static::initialize(&crate::protocol::METRIC_RECEIVED_ACKS);
60            lazy_static::initialize(&crate::protocol::METRIC_SENT_ACKS);
61            lazy_static::initialize(&crate::protocol::METRIC_TICKETS_COUNT);
62        }
63
64        std::fs::create_dir_all(directory).map_err(|_e| {
65            crate::errors::DbSqlError::Construction(format!("cannot create main database directory {directory:?}"))
66        })?;
67
68        // Default SQLite config values for all 3 DBs.
69        // Each DB can customize with its own specific values
70        let cfg_template = SqliteConnectOptions::default()
71            .create_if_missing(cfg.create_if_missing)
72            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
73            .log_statements(LevelFilter::Debug)
74            .journal_mode(SqliteJournalMode::Wal)
75            .synchronous(SqliteSynchronous::Normal)
76            .auto_vacuum(SqliteAutoVacuum::Full)
77            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
78            .page_size(4096)
79            .pragma("cache_size", "-30000") // 32M
80            .pragma("busy_timeout", "1000"); // 1000ms
81
82        // Indexer database
83        let index = PoolOptions::new()
84            .min_connections(0)
85            .max_connections(30)
86            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_INDEX_FILE_NAME)))
87            .await
88            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
89
90        // Peers database
91        let peers = PoolOptions::new()
92            .min_connections(0) // Default is 0
93            .acquire_timeout(Duration::from_secs(60)) // Default is 30
94            .idle_timeout(Some(Duration::from_secs(10 * 60))) // This is the default
95            .max_lifetime(Some(Duration::from_secs(30 * 60))) // This is the default
96            .max_connections(300) // Default is 10
97            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_PEERS_FILE_NAME)))
98            .await
99            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
100
101        // Tickets database
102        let tickets = PoolOptions::new()
103            .min_connections(0)
104            .max_connections(50)
105            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_TICKETS_FILE_NAME)))
106            .await
107            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
108
109        let logs = PoolOptions::new()
110            .min_connections(0)
111            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_LOGS_FILE_NAME)))
112            .await
113            .unwrap_or_else(|e| panic!("failed to create logs database: {e}"));
114
115        Self::new_sqlx_sqlite(chain_key, index, peers, tickets, logs).await
116    }
117
118    pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
119        Self::new_sqlx_sqlite(
120            chain_key,
121            SqlitePool::connect(":memory:")
122                .await
123                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
124            SqlitePool::connect(":memory:")
125                .await
126                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
127            SqlitePool::connect(":memory:")
128                .await
129                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
130            SqlitePool::connect(":memory:")
131                .await
132                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
133        )
134        .await
135    }
136
137    async fn new_sqlx_sqlite(
138        chain_key: ChainKeypair,
139        index_db: SqlitePool,
140        peers_db: SqlitePool,
141        tickets_db: SqlitePool,
142        logs_db: SqlitePool,
143    ) -> Result<Self> {
144        let index_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db);
145
146        MigratorIndex::up(&index_db, None)
147            .await
148            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
149
150        let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db);
151
152        MigratorTickets::up(&tickets_db, None)
153            .await
154            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
155
156        let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db);
157
158        MigratorPeers::up(&peers_db, None)
159            .await
160            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
161
162        let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db);
163
164        MigratorChainLogs::up(&logs_db, None)
165            .await
166            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
167
168        // Reset the peer network information
169        let res = hopr_db_entity::network_peer::Entity::delete_many()
170            .filter(
171                sea_orm::Condition::all().add(
172                    hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
173                        hopr_platform::time::native::current_time()
174                            .checked_sub(std::time::Duration::from_secs(
175                                std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
176                                    .unwrap_or_else(|_| {
177                                        HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
178                                    })
179                                    .parse::<u64>()
180                                    .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
181                            ))
182                            .unwrap_or_else(hopr_platform::time::native::current_time),
183                    )),
184                ),
185            )
186            .exec(&peers_db)
187            .await
188            .map_err(|e| crate::errors::DbSqlError::Construction(format!("must reset peers on init: {e}")))?;
189        debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
190
191        // Reset all BeingAggregated ticket states to Untouched
192        ticket::Entity::update_many()
193            .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
194            .col_expr(
195                ticket::Column::State,
196                Expr::value(AcknowledgedTicketStatus::Untouched as u8),
197            )
198            .exec(&tickets_db)
199            .await?;
200
201        let caches = Arc::new(HoprDbCaches::default());
202        caches.invalidate_all();
203
204        // Initialize KeyId mapping for accounts
205        Account::find()
206            .find_with_related(Announcement)
207            .all(&index_db)
208            .await?
209            .into_iter()
210            .try_for_each(|(a, b)| match model_to_account_entry(a, b) {
211                Ok(account) => caches.key_id_mapper.update_key_id_binding(&account),
212                Err(error) => {
213                    // Undecodeable accounts are skipped and will be unreachable
214                    tracing::error!(%error, "undecodeable account");
215                    Ok(())
216                }
217            })?;
218
219        Ok(Self {
220            me_onchain: chain_key.public().to_address(),
221            chain_key,
222            index_db,
223            peers_db,
224            logs_db,
225            ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
226            tickets_db,
227            caches,
228        })
229    }
230
231    /// Starts ticket processing by the [TicketManager] with an optional new ticket notifier.
232    /// Without calling this method, tickets will not be persisted into the DB.
233    ///
234    /// If the notifier is given, it will receive notifications once new ticket has been
235    /// persisted into the Tickets DB.
236    pub fn start_ticket_processing(&self, ticket_notifier: Option<UnboundedSender<AcknowledgedTicket>>) -> Result<()> {
237        if let Some(notifier) = ticket_notifier {
238            self.ticket_manager.start_ticket_processing(notifier)
239        } else {
240            self.ticket_manager.start_ticket_processing(futures::sink::drain())
241        }
242    }
243}
244
// Marker impl: `HoprDbAllOperations` bundles the operation traits, which are
// presumably implemented for `HoprDb` in sibling modules, so the body is empty.
impl HoprDbAllOperations for HoprDb {}
246
#[cfg(test)]
mod tests {
    use hopr_crypto_types::{
        keypairs::{ChainKeypair, OffchainKeypair},
        prelude::Keypair,
    };
    use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin};
    use hopr_primitive_types::sma::SingleSumSMA;
    use libp2p_identity::PeerId;
    use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
    use multiaddr::Multiaddr;
    use rand::{Rng, distributions::Alphanumeric};

    use crate::{HoprDbGeneralModelOperations, TargetDb, db::HoprDb};

    /// Builds a unique temporary directory path for an on-disk test database.
    ///
    /// Uses `std::env::temp_dir()` rather than a hard-coded `/tmp`, so the tests
    /// also run on platforms without a `/tmp` directory.
    fn random_db_dir() -> std::path::PathBuf {
        let random_name: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(15)
            .map(char::from)
            .collect();
        std::env::temp_dir().join(format!("{random_name}.sqlite"))
    }

    #[tokio::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        // NOTE: cfg-if this on Postgres to do only `Migrator::status(db.conn(Default::default)).await.expect("status
        // must be ok");`
        MigratorIndex::status(db.conn(TargetDb::Index)).await?;
        MigratorTickets::status(db.conn(TargetDb::Tickets)).await?;
        MigratorPeers::status(db.conn(TargetDb::Peers)).await?;
        MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;

        Ok(())
    }

    #[tokio::test]
    async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
        let db_dir = random_db_dir();
        let path = db_dir.as_path();

        let peer_id: PeerId = OffchainKeypair::random().public().into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        {
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;
        }
        {
            // Re-opening the database triggers the init-time purge of stale peers.
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            let not_found_peer = db.get_network_peer(&peer_id).await?;

            assert_eq!(not_found_peer, None);
        }

        // Best-effort cleanup of the temporary database directory.
        std::fs::remove_dir_all(path).ok();

        Ok(())
    }

    #[tokio::test]
    async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
        let db_dir = random_db_dir();
        let path = db_dir.as_path();

        let ofk = OffchainKeypair::random();
        let peer_id: PeerId = (*ofk.public()).into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        {
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;

            // A `last_seen` well inside the persistence window must survive the restart purge.
            let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);

            db.update_network_peer(hopr_db_api::peers::PeerStatus {
                id: (*ofk.public(), peer_id),
                origin: PeerOrigin::Initialization,
                is_public: true,
                last_seen: ten_seconds_ago,
                last_seen_latency: std::time::Duration::from_millis(10),
                heartbeats_sent: 1,
                heartbeats_succeeded: 1,
                backoff: 1.0,
                ignored: None,
                peer_version: None,
                multiaddresses: vec![ma_1.clone(), ma_2.clone()],
                quality: 1.0,
                quality_avg: SingleSumSMA::new(2),
            })
            .await?;
        }
        {
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);

            assert_eq!(found_peer, Some(peer_id));
        }

        // Best-effort cleanup of the temporary database directory.
        std::fs::remove_dir_all(path).ok();

        Ok(())
    }
}