1use std::{
2 fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5 time::Duration,
6};
7
8use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
9use hopr_internal_types::prelude::ChannelEntry;
10use hopr_primitive_types::primitives::Address;
11use migration::{MigratorPeers, MigratorTickets, MigratorTrait};
12use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
13use sqlx::{
14 ConnectOptions, SqlitePool,
15 pool::PoolOptions,
16 sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
17};
18use tracing::{debug, log::LevelFilter};
19use validator::Validate;
20
21use crate::{cache::NodeDbCaches, errors::NodeDbError, ticket_manager::TicketManager};
22
23pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
25pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
27
28pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60; pub const MIN_SURB_RING_BUFFER_SIZE: usize = 1024;
31
32#[derive(Clone, Debug, validator::Validate, smart_default::SmartDefault)]
33pub struct HoprNodeDbConfig {
34 #[default(true)]
35 pub create_if_missing: bool,
36 #[default(false)]
37 pub force_create: bool,
38 #[default(Duration::from_secs(5))]
39 pub log_slow_queries: Duration,
40 #[default(10_000)]
41 #[validate(range(min = MIN_SURB_RING_BUFFER_SIZE))]
42 pub surb_ring_buffer_size: usize,
43 #[default(1000)]
44 #[validate(range(min = 2))]
45 pub surb_distress_threshold: usize,
46}
47
48#[derive(Clone)]
49pub struct HoprNodeDb {
50 pub(crate) tickets_db: sea_orm::DatabaseConnection,
51 pub(crate) peers_db: sea_orm::DatabaseConnection,
52 pub(crate) ticket_manager: Arc<TicketManager>,
53 pub(crate) caches: Arc<NodeDbCaches>,
54 pub(crate) me_onchain: ChainKeypair,
55 pub(crate) me_address: Address,
56 pub(crate) cfg: HoprNodeDbConfig,
57}
58
59impl HoprNodeDb {
60 pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
61 #[cfg(all(feature = "prometheus", not(test)))]
62 {
63 lazy_static::initialize(&crate::protocol::METRIC_RECEIVED_ACKS);
64 lazy_static::initialize(&crate::protocol::METRIC_SENT_ACKS);
65 lazy_static::initialize(&crate::protocol::METRIC_TICKETS_COUNT);
66 }
67
68 cfg.validate().map_err(|e| NodeDbError::Other(e.into()))?;
69
70 fs::create_dir_all(directory).map_err(|e| NodeDbError::Other(e.into()))?;
71
72 let peers_options = PoolOptions::new()
73 .acquire_timeout(Duration::from_secs(60)) .idle_timeout(Some(Duration::from_secs(10 * 60))) .max_lifetime(Some(Duration::from_secs(30 * 60))); let peers = Self::create_pool(
78 cfg.clone(),
79 directory.to_path_buf(),
80 peers_options,
81 Some(0),
82 Some(300),
83 SQL_DB_PEERS_FILE_NAME,
84 )
85 .await?;
86
87 let tickets = Self::create_pool(
88 cfg.clone(),
89 directory.to_path_buf(),
90 PoolOptions::new(),
91 Some(0),
92 Some(50),
93 SQL_DB_TICKETS_FILE_NAME,
94 )
95 .await?;
96
97 #[cfg(feature = "sqlite")]
98 Self::new_sqlx_sqlite(chain_key, tickets, peers, cfg).await
99 }
100
101 #[cfg(feature = "sqlite")]
102 pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self, NodeDbError> {
103 Self::new_sqlx_sqlite(
104 chain_key,
105 SqlitePool::connect(":memory:")
106 .await
107 .map_err(|e| NodeDbError::Other(e.into()))?,
108 SqlitePool::connect(":memory:")
109 .await
110 .map_err(|e| NodeDbError::Other(e.into()))?,
111 Default::default(),
112 )
113 .await
114 }
115
116 #[cfg(feature = "sqlite")]
117 async fn new_sqlx_sqlite(
118 me_onchain: ChainKeypair,
119 peers_db_pool: SqlitePool,
120 tickets_db_pool: SqlitePool,
121 cfg: HoprNodeDbConfig,
122 ) -> Result<Self, NodeDbError> {
123 let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);
124 MigratorTickets::up(&tickets_db, None).await?;
125
126 let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db_pool);
127 MigratorPeers::up(&peers_db, None).await?;
128
129 let res = hopr_db_entity::network_peer::Entity::delete_many()
131 .filter(
132 sea_orm::Condition::all().add(
133 hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
134 hopr_platform::time::native::current_time()
135 .checked_sub(std::time::Duration::from_secs(
136 std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
137 .unwrap_or_else(|_| {
138 HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
139 })
140 .parse::<u64>()
141 .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
142 ))
143 .unwrap_or_else(hopr_platform::time::native::current_time),
144 )),
145 ),
146 )
147 .exec(&peers_db)
148 .await?;
149 debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
150
151 let caches = Arc::new(NodeDbCaches::default());
152 caches.invalidate_all();
153
154 Ok(Self {
155 ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
156 tickets_db,
157 peers_db,
158 caches,
159 me_address: me_onchain.public().to_address(),
160 me_onchain,
161 cfg,
162 })
163 }
164
165 async fn create_pool(
166 cfg: HoprNodeDbConfig,
167 directory: PathBuf,
168 mut options: PoolOptions<sqlx::Sqlite>,
169 min_conn: Option<u32>,
170 max_conn: Option<u32>,
171 path: &str,
172 ) -> Result<SqlitePool, NodeDbError> {
173 if let Some(min_conn) = min_conn {
174 options = options.min_connections(min_conn);
175 }
176 if let Some(max_conn) = max_conn {
177 options = options.max_connections(max_conn);
178 }
179
180 let sqlite_cfg = SqliteConnectOptions::default()
181 .create_if_missing(cfg.create_if_missing)
182 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
183 .log_statements(LevelFilter::Debug)
184 .journal_mode(SqliteJournalMode::Wal)
185 .synchronous(SqliteSynchronous::Normal)
186 .auto_vacuum(SqliteAutoVacuum::Full)
187 .page_size(4096)
189 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000"); let pool = options.connect_with(sqlite_cfg.filename(directory.join(path))).await?;
193
194 Ok(pool)
195 }
196
197 pub async fn invalidate_unrealized_value(&self, channel: &ChannelEntry) {
198 self.caches
199 .unrealized_value
200 .invalidate(&(channel.get_id(), channel.channel_epoch))
201 .await;
202 }
203
204 pub fn config(&self) -> &HoprNodeDbConfig {
205 &self.cfg
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use hopr_api::{db::*, *};
212 use hopr_crypto_types::keypairs::OffchainKeypair;
213 use hopr_primitive_types::prelude::SingleSumSMA;
214 use rand::{Rng, distributions::Alphanumeric};
215
216 use super::*;
217
218 #[tokio::test]
219 async fn test_basic_db_init() -> anyhow::Result<()> {
220 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
221 MigratorTickets::status(&db.tickets_db).await?;
222 MigratorPeers::status(&db.peers_db).await?;
223
224 Ok(())
225 }
226
227 #[tokio::test]
228 async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
229 let random_filename: String = rand::thread_rng()
230 .sample_iter(&Alphanumeric)
231 .take(15)
232 .map(char::from)
233 .collect();
234 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
235
236 let peer_id: PeerId = OffchainKeypair::random().public().into();
237 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
238 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
239
240 let path = std::path::Path::new(&random_tmp_file);
241
242 {
243 let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
244
245 db.add_network_peer(
246 &peer_id,
247 PeerOrigin::IncomingConnection,
248 vec![ma_1.clone(), ma_2.clone()],
249 0.0,
250 25,
251 )
252 .await?;
253 }
254
255 {
256 let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
257
258 let not_found_peer = db.get_network_peer(&peer_id).await?;
259
260 assert_eq!(not_found_peer, None);
261 }
262
263 Ok(())
264 }
265
266 #[tokio::test]
267 async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
268 let random_filename: String = rand::thread_rng()
269 .sample_iter(&Alphanumeric)
270 .take(15)
271 .map(char::from)
272 .collect();
273 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
274
275 let ofk = OffchainKeypair::random();
276 let peer_id: PeerId = (*ofk.public()).into();
277 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
278 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
279
280 let path = std::path::Path::new(&random_tmp_file);
281
282 {
283 let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
284
285 db.add_network_peer(
286 &peer_id,
287 PeerOrigin::IncomingConnection,
288 vec![ma_1.clone(), ma_2.clone()],
289 0.0,
290 25,
291 )
292 .await?;
293
294 let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);
295
296 db.update_network_peer(PeerStatus {
297 id: (*ofk.public(), peer_id),
298 origin: PeerOrigin::Initialization,
299 last_seen: ten_seconds_ago,
300 last_seen_latency: std::time::Duration::from_millis(10),
301 heartbeats_sent: 1,
302 heartbeats_succeeded: 1,
303 backoff: 1.0,
304 ignored_until: None,
305 multiaddresses: vec![ma_1.clone(), ma_2.clone()],
306 quality: 1.0,
307 quality_avg: SingleSumSMA::new(2),
308 })
309 .await?;
310 }
311 {
312 let db = HoprNodeDb::new(path, ChainKeypair::random(), HoprNodeDbConfig::default()).await?;
313
314 let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);
315
316 assert_eq!(found_peer, Some(peer_id));
317 }
318
319 Ok(())
320 }
321}