hopr_db_node/
db.rs

1use std::{
2    fs,
3    path::{Path, PathBuf},
4    sync::Arc,
5    time::Duration,
6};
7
8use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
9use hopr_internal_types::prelude::ChannelEntry;
10use hopr_primitive_types::primitives::Address;
11use migration::{MigratorPeers, MigratorTickets, MigratorTrait};
12use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
13use sqlx::{
14    ConnectOptions, SqlitePool,
15    pool::PoolOptions,
16    sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
17};
18use tracing::{debug, log::LevelFilter};
19use validator::Validate;
20
21use crate::{cache::NodeDbCaches, errors::NodeDbError, ticket_manager::TicketManager};
22
23/// Filename for the network peers database.
24pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
25/// Filename for the payment tickets database.
26pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
27
28pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60; // 5 minutes
29
30pub const MIN_SURB_RING_BUFFER_SIZE: usize = 1024;
31
32#[derive(Clone, Debug, validator::Validate, smart_default::SmartDefault)]
33pub struct HoprNodeDbConfig {
34    #[default(true)]
35    pub create_if_missing: bool,
36    #[default(false)]
37    pub force_create: bool,
38    #[default(Duration::from_secs(5))]
39    pub log_slow_queries: Duration,
40    #[default(10_000)]
41    #[validate(range(min = MIN_SURB_RING_BUFFER_SIZE))]
42    pub surb_ring_buffer_size: usize,
43    #[default(1000)]
44    #[validate(range(min = 2))]
45    pub surb_distress_threshold: usize,
46}
47
48#[derive(Clone)]
49pub struct HoprNodeDb {
50    pub(crate) tickets_db: sea_orm::DatabaseConnection,
51    pub(crate) peers_db: sea_orm::DatabaseConnection,
52    pub(crate) ticket_manager: Arc<TicketManager>,
53    pub(crate) caches: Arc<NodeDbCaches>,
54    pub(crate) me_onchain: ChainKeypair,
55    pub(crate) me_address: Address,
56    pub(crate) cfg: HoprNodeDbConfig,
57}
58
59impl HoprNodeDb {
60    pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
61        #[cfg(all(feature = "prometheus", not(test)))]
62        {
63            lazy_static::initialize(&crate::protocol::METRIC_RECEIVED_ACKS);
64            lazy_static::initialize(&crate::protocol::METRIC_SENT_ACKS);
65            lazy_static::initialize(&crate::protocol::METRIC_TICKETS_COUNT);
66        }
67
68        cfg.validate().map_err(|e| NodeDbError::Other(e.into()))?;
69
70        fs::create_dir_all(directory).map_err(|e| NodeDbError::Other(e.into()))?;
71
72        let peers_options = PoolOptions::new()
73            .acquire_timeout(Duration::from_secs(60)) // Default is 30
74            .idle_timeout(Some(Duration::from_secs(10 * 60))) // This is the default
75            .max_lifetime(Some(Duration::from_secs(30 * 60))); // This is the default
76
77        let peers = Self::create_pool(
78            cfg.clone(),
79            directory.to_path_buf(),
80            peers_options,
81            Some(0),
82            Some(300),
83            SQL_DB_PEERS_FILE_NAME,
84        )
85        .await?;
86
87        let tickets = Self::create_pool(
88            cfg.clone(),
89            directory.to_path_buf(),
90            PoolOptions::new(),
91            Some(0),
92            Some(50),
93            SQL_DB_TICKETS_FILE_NAME,
94        )
95        .await?;
96
97        #[cfg(feature = "sqlite")]
98        Self::new_sqlx_sqlite(chain_key, tickets, peers, cfg).await
99    }
100
101    #[cfg(feature = "sqlite")]
102    pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self, NodeDbError> {
103        Self::new_sqlx_sqlite(
104            chain_key,
105            SqlitePool::connect(":memory:")
106                .await
107                .map_err(|e| NodeDbError::Other(e.into()))?,
108            SqlitePool::connect(":memory:")
109                .await
110                .map_err(|e| NodeDbError::Other(e.into()))?,
111            Default::default(),
112        )
113        .await
114    }
115
116    #[cfg(feature = "sqlite")]
117    async fn new_sqlx_sqlite(
118        me_onchain: ChainKeypair,
119        peers_db_pool: SqlitePool,
120        tickets_db_pool: SqlitePool,
121        cfg: HoprNodeDbConfig,
122    ) -> Result<Self, NodeDbError> {
123        let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);
124        MigratorTickets::up(&tickets_db, None).await?;
125
126        let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db_pool);
127        MigratorPeers::up(&peers_db, None).await?;
128
129        // Reset the peer network information
130        let res = hopr_db_entity::network_peer::Entity::delete_many()
131            .filter(
132                sea_orm::Condition::all().add(
133                    hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
134                        hopr_platform::time::native::current_time()
135                            .checked_sub(std::time::Duration::from_secs(
136                                std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
137                                    .unwrap_or_else(|_| {
138                                        HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
139                                    })
140                                    .parse::<u64>()
141                                    .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
142                            ))
143                            .unwrap_or_else(hopr_platform::time::native::current_time),
144                    )),
145                ),
146            )
147            .exec(&peers_db)
148            .await?;
149        debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
150
151        let caches = Arc::new(NodeDbCaches::default());
152        caches.invalidate_all();
153
154        Ok(Self {
155            ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
156            tickets_db,
157            peers_db,
158            caches,
159            me_address: me_onchain.public().to_address(),
160            me_onchain,
161            cfg,
162        })
163    }
164
165    async fn create_pool(
166        cfg: HoprNodeDbConfig,
167        directory: PathBuf,
168        mut options: PoolOptions<sqlx::Sqlite>,
169        min_conn: Option<u32>,
170        max_conn: Option<u32>,
171        path: &str,
172    ) -> Result<SqlitePool, NodeDbError> {
173        if let Some(min_conn) = min_conn {
174            options = options.min_connections(min_conn);
175        }
176        if let Some(max_conn) = max_conn {
177            options = options.max_connections(max_conn);
178        }
179
180        let sqlite_cfg = SqliteConnectOptions::default()
181            .create_if_missing(cfg.create_if_missing)
182            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
183            .log_statements(LevelFilter::Debug)
184            .journal_mode(SqliteJournalMode::Wal)
185            .synchronous(SqliteSynchronous::Normal)
186            .auto_vacuum(SqliteAutoVacuum::Full)
187            //.optimize_on_close(true, None) // Removed, because it causes optimization on each connection, due to min_connections being set to 0
188            .page_size(4096)
189            .pragma("cache_size", "-30000") // 32M
190            .pragma("busy_timeout", "1000"); // 1000ms
191
192        let pool = options.connect_with(sqlite_cfg.filename(directory.join(path))).await?;
193
194        Ok(pool)
195    }
196
197    pub async fn invalidate_unrealized_value(&self, channel: &ChannelEntry) {
198        self.caches
199            .unrealized_value
200            .invalidate(&(channel.get_id(), channel.channel_epoch))
201            .await;
202    }
203
204    pub fn config(&self) -> &HoprNodeDbConfig {
205        &self.cfg
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use hopr_api::{db::*, *};
212    use hopr_crypto_types::keypairs::OffchainKeypair;
213    use hopr_primitive_types::prelude::SingleSumSMA;
214    use rand::{Rng, distributions::Alphanumeric};
215
216    use super::*;
217
218    #[tokio::test]
219    async fn test_basic_db_init() -> anyhow::Result<()> {
220        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
221        MigratorTickets::status(&db.tickets_db).await?;
222        MigratorPeers::status(&db.peers_db).await?;
223
224        Ok(())
225    }
226
227    #[tokio::test]
228    async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
229        let random_filename: String = rand::thread_rng()
230            .sample_iter(&Alphanumeric)
231            .take(15)
232            .map(char::from)
233            .collect();
234        let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
235
236        let peer_id: PeerId = OffchainKeypair::random().public().into();
237        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
238        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
239
240        let path = std::path::Path::new(&random_tmp_file);
241
242        {
243            let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
244
245            db.add_network_peer(
246                &peer_id,
247                PeerOrigin::IncomingConnection,
248                vec![ma_1.clone(), ma_2.clone()],
249                0.0,
250                25,
251            )
252            .await?;
253        }
254
255        {
256            let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
257
258            let not_found_peer = db.get_network_peer(&peer_id).await?;
259
260            assert_eq!(not_found_peer, None);
261        }
262
263        Ok(())
264    }
265
266    #[tokio::test]
267    async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
268        let random_filename: String = rand::thread_rng()
269            .sample_iter(&Alphanumeric)
270            .take(15)
271            .map(char::from)
272            .collect();
273        let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
274
275        let ofk = OffchainKeypair::random();
276        let peer_id: PeerId = (*ofk.public()).into();
277        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
278        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
279
280        let path = std::path::Path::new(&random_tmp_file);
281
282        {
283            let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
284
285            db.add_network_peer(
286                &peer_id,
287                PeerOrigin::IncomingConnection,
288                vec![ma_1.clone(), ma_2.clone()],
289                0.0,
290                25,
291            )
292            .await?;
293
294            let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);
295
296            db.update_network_peer(PeerStatus {
297                id: (*ofk.public(), peer_id),
298                origin: PeerOrigin::Initialization,
299                last_seen: ten_seconds_ago,
300                last_seen_latency: std::time::Duration::from_millis(10),
301                heartbeats_sent: 1,
302                heartbeats_succeeded: 1,
303                backoff: 1.0,
304                ignored_until: None,
305                multiaddresses: vec![ma_1.clone(), ma_2.clone()],
306                quality: 1.0,
307                quality_avg: SingleSumSMA::new(2),
308            })
309            .await?;
310        }
311        {
312            let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
313
314            let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);
315
316            assert_eq!(found_peer, Some(peer_id));
317        }
318
319        Ok(())
320    }
321}