hopr_db_sql/
db.rs

1use futures::channel::mpsc::UnboundedSender;
2use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
3use sea_query::Expr;
4use sqlx::pool::PoolOptions;
5use sqlx::sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};
6use sqlx::{ConnectOptions, SqlitePool};
7use std::path::Path;
8use std::sync::Arc;
9use std::time::Duration;
10use tracing::debug;
11use tracing::log::LevelFilter;
12
13use hopr_crypto_types::keypairs::Keypair;
14use hopr_crypto_types::prelude::ChainKeypair;
15use hopr_db_entity::ticket;
16use hopr_internal_types::prelude::{AcknowledgedTicket, AcknowledgedTicketStatus};
17use hopr_primitive_types::primitives::Address;
18use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
19
20use crate::cache::HoprDbCaches;
21use crate::errors::Result;
22use crate::ticket_manager::TicketManager;
23use crate::HoprDbAllOperations;
24
25pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60; // 5 minutes
26
27#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
28pub struct HoprDbConfig {
29    #[default(true)]
30    pub create_if_missing: bool,
31    #[default(false)]
32    pub force_create: bool,
33    #[default(Duration::from_secs(5))]
34    pub log_slow_queries: Duration,
35}
36
37#[derive(Debug, Clone)]
38pub struct HoprDb {
39    pub(crate) index_db: sea_orm::DatabaseConnection,
40    pub(crate) tickets_db: sea_orm::DatabaseConnection,
41    pub(crate) peers_db: sea_orm::DatabaseConnection,
42    pub(crate) logs_db: sea_orm::DatabaseConnection,
43    pub(crate) ticket_manager: Arc<TicketManager>,
44    pub(crate) chain_key: ChainKeypair,
45    pub(crate) me_onchain: Address,
46    pub(crate) caches: Arc<HoprDbCaches>,
47}
48
49pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";
50pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
51pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
52pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";
53
54impl HoprDb {
55    pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprDbConfig) -> Result<Self> {
56        std::fs::create_dir_all(directory).map_err(|_e| {
57            crate::errors::DbSqlError::Construction(format!("cannot create main database directory {directory:?}"))
58        })?;
59
60        // Default SQLite config values for all 3 DBs.
61        // Each DB can customize with its own specific values
62        let cfg_template = SqliteConnectOptions::default()
63            .create_if_missing(cfg.create_if_missing)
64            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
65            .log_statements(LevelFilter::Debug)
66            .journal_mode(SqliteJournalMode::Wal)
67            .synchronous(SqliteSynchronous::Normal)
68            .auto_vacuum(SqliteAutoVacuum::Full)
69            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
70            .page_size(4096)
71            .pragma("cache_size", "-30000") // 32M
72            .pragma("busy_timeout", "1000"); // 1000ms
73
74        // Indexer database
75        let index = PoolOptions::new()
76            .min_connections(0)
77            .max_connections(30)
78            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_INDEX_FILE_NAME)))
79            .await
80            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
81
82        // Peers database
83        let peers = PoolOptions::new()
84            .min_connections(0) // Default is 0
85            .acquire_timeout(Duration::from_secs(60)) // Default is 30
86            .idle_timeout(Some(Duration::from_secs(10 * 60))) // This is the default
87            .max_lifetime(Some(Duration::from_secs(30 * 60))) // This is the default
88            .max_connections(300) // Default is 10
89            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_PEERS_FILE_NAME)))
90            .await
91            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
92
93        // Tickets database
94        let tickets = PoolOptions::new()
95            .min_connections(0)
96            .max_connections(50)
97            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_TICKETS_FILE_NAME)))
98            .await
99            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
100
101        let logs = PoolOptions::new()
102            .min_connections(0)
103            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_LOGS_FILE_NAME)))
104            .await
105            .unwrap_or_else(|e| panic!("failed to create logs database: {e}"));
106
107        Self::new_sqlx_sqlite(chain_key, index, peers, tickets, logs).await
108    }
109
110    pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
111        Self::new_sqlx_sqlite(
112            chain_key,
113            SqlitePool::connect(":memory:")
114                .await
115                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
116            SqlitePool::connect(":memory:")
117                .await
118                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
119            SqlitePool::connect(":memory:")
120                .await
121                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
122            SqlitePool::connect(":memory:")
123                .await
124                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
125        )
126        .await
127    }
128
129    async fn new_sqlx_sqlite(
130        chain_key: ChainKeypair,
131        index_db: SqlitePool,
132        peers_db: SqlitePool,
133        tickets_db: SqlitePool,
134        logs_db: SqlitePool,
135    ) -> Result<Self> {
136        let index_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db);
137
138        MigratorIndex::up(&index_db, None)
139            .await
140            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
141
142        let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db);
143
144        MigratorTickets::up(&tickets_db, None)
145            .await
146            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
147
148        let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db);
149
150        MigratorPeers::up(&peers_db, None)
151            .await
152            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
153
154        let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db);
155
156        MigratorChainLogs::up(&logs_db, None)
157            .await
158            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
159
160        // Reset the peer network information
161        let res = hopr_db_entity::network_peer::Entity::delete_many()
162            .filter(
163                sea_orm::Condition::all().add(
164                    hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
165                        hopr_platform::time::native::current_time()
166                            .checked_sub(std::time::Duration::from_secs(
167                                std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
168                                    .unwrap_or_else(|_| {
169                                        HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
170                                    })
171                                    .parse::<u64>()
172                                    .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
173                            ))
174                            .unwrap_or_else(hopr_platform::time::native::current_time),
175                    )),
176                ),
177            )
178            .exec(&peers_db)
179            .await
180            .map_err(|e| crate::errors::DbSqlError::Construction(format!("must reset peers on init: {e}")))?;
181        debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
182
183        // Reset all BeingAggregated ticket states to Untouched
184        ticket::Entity::update_many()
185            .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
186            .col_expr(
187                ticket::Column::State,
188                Expr::value(AcknowledgedTicketStatus::Untouched as u8),
189            )
190            .exec(&tickets_db)
191            .await?;
192
193        let caches = Arc::new(HoprDbCaches::default());
194        caches.invalidate_all();
195
196        Ok(Self {
197            me_onchain: chain_key.public().to_address(),
198            chain_key,
199            index_db,
200            peers_db,
201            logs_db,
202            ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
203            tickets_db,
204            caches,
205        })
206    }
207
208    /// Starts ticket processing by the [TicketManager] with an optional new ticket notifier.
209    /// Without calling this method, tickets will not be persisted into the DB.
210    ///
211    /// If the notifier is given, it will receive notifications once new ticket has been
212    /// persisted into the Tickets DB.
213    pub fn start_ticket_processing(&self, ticket_notifier: Option<UnboundedSender<AcknowledgedTicket>>) -> Result<()> {
214        if let Some(notifier) = ticket_notifier {
215            self.ticket_manager.start_ticket_processing(notifier)
216        } else {
217            self.ticket_manager.start_ticket_processing(futures::sink::drain())
218        }
219    }
220}
221
222impl HoprDbAllOperations for HoprDb {}
223
224#[cfg(test)]
225mod tests {
226    use crate::db::HoprDb;
227    use crate::{HoprDbGeneralModelOperations, TargetDb};
228    use hopr_crypto_types::keypairs::{ChainKeypair, OffchainKeypair};
229    use hopr_crypto_types::prelude::Keypair;
230    use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin};
231    use hopr_primitive_types::sma::SingleSumSMA;
232    use libp2p_identity::PeerId;
233    use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
234    use multiaddr::Multiaddr;
235    use rand::{distributions::Alphanumeric, Rng}; // 0.8
236
237    #[async_std::test]
238    async fn test_basic_db_init() -> anyhow::Result<()> {
239        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
240
241        // NOTE: cfg-if this on Postgres to do only `Migrator::status(db.conn(Default::default)).await.expect("status must be ok");`
242        MigratorIndex::status(db.conn(TargetDb::Index)).await?;
243        MigratorTickets::status(db.conn(TargetDb::Tickets)).await?;
244        MigratorPeers::status(db.conn(TargetDb::Peers)).await?;
245        MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;
246
247        Ok(())
248    }
249
250    #[async_std::test]
251    async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
252        let random_filename: String = rand::thread_rng()
253            .sample_iter(&Alphanumeric)
254            .take(15)
255            .map(char::from)
256            .collect();
257        let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
258
259        let peer_id: PeerId = OffchainKeypair::random().public().into();
260        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
261        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
262
263        let path = std::path::Path::new(&random_tmp_file);
264
265        {
266            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
267
268            db.add_network_peer(
269                &peer_id,
270                PeerOrigin::IncomingConnection,
271                vec![ma_1.clone(), ma_2.clone()],
272                0.0,
273                25,
274            )
275            .await?;
276        }
277        {
278            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
279
280            let not_found_peer = db.get_network_peer(&peer_id).await?;
281
282            assert_eq!(not_found_peer, None);
283        }
284
285        Ok(())
286    }
287
288    #[async_std::test]
289    async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
290        let random_filename: String = rand::thread_rng()
291            .sample_iter(&Alphanumeric)
292            .take(15)
293            .map(char::from)
294            .collect();
295        let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
296
297        let ofk = OffchainKeypair::random();
298        let peer_id: PeerId = (*ofk.public()).into();
299        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
300        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
301
302        let path = std::path::Path::new(&random_tmp_file);
303
304        {
305            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
306
307            db.add_network_peer(
308                &peer_id,
309                PeerOrigin::IncomingConnection,
310                vec![ma_1.clone(), ma_2.clone()],
311                0.0,
312                25,
313            )
314            .await?;
315
316            let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);
317
318            db.update_network_peer(hopr_db_api::peers::PeerStatus {
319                id: (*ofk.public(), peer_id),
320                origin: PeerOrigin::Initialization,
321                is_public: true,
322                last_seen: ten_seconds_ago,
323                last_seen_latency: std::time::Duration::from_millis(10),
324                heartbeats_sent: 1,
325                heartbeats_succeeded: 1,
326                backoff: 1.0,
327                ignored: None,
328                peer_version: None,
329                multiaddresses: vec![ma_1.clone(), ma_2.clone()],
330                quality: 1.0,
331                quality_avg: SingleSumSMA::new(2),
332            })
333            .await?;
334        }
335        {
336            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
337
338            let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);
339
340            assert_eq!(found_peer, Some(peer_id));
341        }
342
343        Ok(())
344    }
345}