hopr_db_node/
db.rs

1use std::{
2    fs,
3    path::{Path, PathBuf},
4    sync::Arc,
5    time::Duration,
6};
7
8use hopr_internal_types::channels::ChannelId;
9use hopr_primitive_types::prelude::HoprBalance;
10use migration::{MigratorPeers, MigratorTickets, MigratorTrait};
11use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
12use sqlx::{
13    ConnectOptions, SqlitePool,
14    pool::PoolOptions,
15    sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
16};
17use tracing::{debug, log::LevelFilter};
18use validator::Validate;
19
20use crate::errors::NodeDbError;
21
/// File name of the SQLite database holding the network peer records.
pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
/// File name of the SQLite database holding the payment tickets.
pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";

/// Default maximum age, in seconds (5 minutes), of a peer's last-seen timestamp for the peer
/// record to survive database initialization.
///
/// Used as the fallback when the environment variable of the same name is unset or does not
/// parse as a `u64`.
pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60;
28
29#[derive(Clone, Debug, validator::Validate, smart_default::SmartDefault)]
30pub struct HoprNodeDbConfig {
31    #[default(true)]
32    pub create_if_missing: bool,
33    #[default(false)]
34    pub force_create: bool,
35    #[default(Duration::from_secs(5))]
36    pub log_slow_queries: Duration,
37}
38
39#[derive(Clone)]
40pub struct HoprNodeDb {
41    pub(crate) tickets_db: sea_orm::DatabaseConnection,
42    pub(crate) peers_db: sea_orm::DatabaseConnection,
43    pub(crate) tickets_write_lock: Arc<async_lock::Mutex<()>>,
44    pub(crate) cfg: HoprNodeDbConfig,
45    // This value must be cached here, due to complicated invalidation logic.
46    pub(crate) unrealized_value: moka::future::Cache<(ChannelId, u32), HoprBalance>,
47}
48
49impl HoprNodeDb {
50    pub async fn new(directory: &Path, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
51        cfg.validate().map_err(|e| NodeDbError::Other(e.into()))?;
52
53        fs::create_dir_all(directory).map_err(|e| NodeDbError::Other(e.into()))?;
54
55        let peers_options = PoolOptions::new()
56            .acquire_timeout(Duration::from_secs(60)) // Default is 30
57            .idle_timeout(Some(Duration::from_secs(10 * 60))) // This is the default
58            .max_lifetime(Some(Duration::from_secs(30 * 60))); // This is the default
59
60        let peers = Self::create_pool(
61            cfg.clone(),
62            directory.to_path_buf(),
63            peers_options,
64            Some(0),
65            Some(300),
66            SQL_DB_PEERS_FILE_NAME,
67        )
68        .await?;
69
70        let tickets = Self::create_pool(
71            cfg.clone(),
72            directory.to_path_buf(),
73            PoolOptions::new(),
74            Some(0),
75            Some(50),
76            SQL_DB_TICKETS_FILE_NAME,
77        )
78        .await?;
79
80        #[cfg(feature = "sqlite")]
81        Self::new_sqlx_sqlite(tickets, peers, cfg).await
82    }
83
84    #[cfg(feature = "sqlite")]
85    pub async fn new_in_memory() -> Result<Self, NodeDbError> {
86        Self::new_sqlx_sqlite(
87            SqlitePool::connect(":memory:")
88                .await
89                .map_err(|e| NodeDbError::Other(e.into()))?,
90            SqlitePool::connect(":memory:")
91                .await
92                .map_err(|e| NodeDbError::Other(e.into()))?,
93            Default::default(),
94        )
95        .await
96    }
97
98    #[cfg(feature = "sqlite")]
99    async fn new_sqlx_sqlite(
100        peers_db_pool: SqlitePool,
101        tickets_db_pool: SqlitePool,
102        cfg: HoprNodeDbConfig,
103    ) -> Result<Self, NodeDbError> {
104        let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);
105        MigratorTickets::up(&tickets_db, None).await?;
106
107        let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db_pool);
108        MigratorPeers::up(&peers_db, None).await?;
109
110        // Reset the peer network information
111        let res = hopr_db_entity::network_peer::Entity::delete_many()
112            .filter(
113                sea_orm::Condition::all().add(
114                    hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
115                        hopr_platform::time::native::current_time()
116                            .checked_sub(std::time::Duration::from_secs(
117                                std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
118                                    .unwrap_or_else(|_| {
119                                        HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
120                                    })
121                                    .parse::<u64>()
122                                    .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
123                            ))
124                            .unwrap_or_else(hopr_platform::time::native::current_time),
125                    )),
126                ),
127            )
128            .exec(&peers_db)
129            .await?;
130        debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
131
132        Ok(Self {
133            tickets_write_lock: Arc::new(async_lock::Mutex::new(())),
134            unrealized_value: moka::future::CacheBuilder::new(10_000)
135                .time_to_idle(std::time::Duration::from_secs(30))
136                .build(),
137            tickets_db,
138            peers_db,
139            cfg,
140        })
141    }
142
143    async fn create_pool(
144        cfg: HoprNodeDbConfig,
145        directory: PathBuf,
146        mut options: PoolOptions<sqlx::Sqlite>,
147        min_conn: Option<u32>,
148        max_conn: Option<u32>,
149        path: &str,
150    ) -> Result<SqlitePool, NodeDbError> {
151        if let Some(min_conn) = min_conn {
152            options = options.min_connections(min_conn);
153        }
154        if let Some(max_conn) = max_conn {
155            options = options.max_connections(max_conn);
156        }
157
158        let sqlite_cfg = SqliteConnectOptions::default()
159            .create_if_missing(cfg.create_if_missing)
160            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
161            .log_statements(LevelFilter::Debug)
162            .journal_mode(SqliteJournalMode::Wal)
163            .synchronous(SqliteSynchronous::Normal)
164            .auto_vacuum(SqliteAutoVacuum::Full)
165            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
166            .page_size(4096)
167            .pragma("cache_size", "-30000") // 32M
168            .pragma("busy_timeout", "1000"); // 1000ms
169
170        let pool = options.connect_with(sqlite_cfg.filename(directory.join(path))).await?;
171
172        Ok(pool)
173    }
174
175    pub fn config(&self) -> &HoprNodeDbConfig {
176        &self.cfg
177    }
178}
179
#[cfg(test)]
mod tests {
    use hopr_api::{db::*, *};
    use hopr_crypto_types::{keypairs::OffchainKeypair, prelude::Keypair};
    use hopr_primitive_types::prelude::SingleSumSMA;
    use rand::{Rng, distributions::Alphanumeric};

    use super::*;

    /// Returns a unique, not yet existing directory path inside the platform's temporary
    /// directory. `HoprNodeDb::new` creates the directory itself.
    fn random_tmp_dir() -> std::path::PathBuf {
        let suffix: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(15)
            .map(char::from)
            .collect();

        std::env::temp_dir().join(format!("hopr_node_db_test_{suffix}"))
    }

    #[tokio::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory().await?;
        MigratorTickets::status(&db.tickets_db).await?;
        MigratorPeers::status(&db.peers_db).await?;

        Ok(())
    }

    #[tokio::test]
    async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
        let path = random_tmp_dir();

        let peer_id: PeerId = OffchainKeypair::random().public().into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        // First "run" of the node: the peer is added but never updated afterwards.
        {
            let db = HoprNodeDb::new(&path, HoprNodeDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;
        }

        // Second "run": re-opening the database must have purged the stale peer.
        {
            let db = HoprNodeDb::new(&path, HoprNodeDbConfig::default()).await?;

            let not_found_peer = db.get_network_peer(&peer_id).await?;

            assert_eq!(not_found_peer, None);
        }

        // Best-effort clean-up; a failure here must not fail the test.
        let _ = std::fs::remove_dir_all(&path);

        Ok(())
    }

    #[tokio::test]
    async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
        let path = random_tmp_dir();

        let ofk = OffchainKeypair::random();
        let peer_id: PeerId = (*ofk.public()).into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        // First "run" of the node: the peer is added and then marked as seen 10 seconds ago.
        {
            let db = HoprNodeDb::new(&path, HoprNodeDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;

            let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);

            db.update_network_peer(PeerStatus {
                id: (*ofk.public(), peer_id),
                origin: PeerOrigin::Initialization,
                last_seen: ten_seconds_ago,
                last_seen_latency: std::time::Duration::from_millis(10),
                heartbeats_sent: 1,
                heartbeats_succeeded: 1,
                backoff: 1.0,
                ignored_until: None,
                multiaddresses: vec![ma_1.clone(), ma_2.clone()],
                quality: 1.0,
                quality_avg: SingleSumSMA::new(2),
            })
            .await?;
        }

        // Second "run": the recently seen peer must still be present.
        {
            let db = HoprNodeDb::new(&path, HoprNodeDbConfig::default()).await?;

            let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);

            assert_eq!(found_peer, Some(peer_id));
        }

        // Best-effort clean-up; a failure here must not fail the test.
        let _ = std::fs::remove_dir_all(&path);

        Ok(())
    }
}