1use std::{
2 fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5 time::Duration,
6};
7
8use hopr_internal_types::channels::ChannelId;
9use hopr_primitive_types::prelude::HoprBalance;
10use migration::{MigratorPeers, MigratorTickets, MigratorTrait};
11use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
12use sqlx::{
13 ConnectOptions, SqlitePool,
14 pool::PoolOptions,
15 sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
16};
17use tracing::{debug, log::LevelFilter};
18use validator::Validate;
19
20use crate::errors::NodeDbError;
21
22pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
24pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
26
27pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60; #[derive(Clone, Debug, validator::Validate, smart_default::SmartDefault)]
30pub struct HoprNodeDbConfig {
31 #[default(true)]
32 pub create_if_missing: bool,
33 #[default(false)]
34 pub force_create: bool,
35 #[default(Duration::from_secs(5))]
36 pub log_slow_queries: Duration,
37}
38
39#[derive(Clone)]
40pub struct HoprNodeDb {
41 pub(crate) tickets_db: sea_orm::DatabaseConnection,
42 pub(crate) peers_db: sea_orm::DatabaseConnection,
43 pub(crate) tickets_write_lock: Arc<async_lock::Mutex<()>>,
44 pub(crate) cfg: HoprNodeDbConfig,
45 pub(crate) unrealized_value: moka::future::Cache<(ChannelId, u32), HoprBalance>,
47}
48
49impl HoprNodeDb {
50 pub async fn new(directory: &Path, cfg: HoprNodeDbConfig) -> Result<Self, NodeDbError> {
51 cfg.validate().map_err(|e| NodeDbError::Other(e.into()))?;
52
53 fs::create_dir_all(directory).map_err(|e| NodeDbError::Other(e.into()))?;
54
55 let peers_options = PoolOptions::new()
56 .acquire_timeout(Duration::from_secs(60)) .idle_timeout(Some(Duration::from_secs(10 * 60))) .max_lifetime(Some(Duration::from_secs(30 * 60))); let peers = Self::create_pool(
61 cfg.clone(),
62 directory.to_path_buf(),
63 peers_options,
64 Some(0),
65 Some(300),
66 SQL_DB_PEERS_FILE_NAME,
67 )
68 .await?;
69
70 let tickets = Self::create_pool(
71 cfg.clone(),
72 directory.to_path_buf(),
73 PoolOptions::new(),
74 Some(0),
75 Some(50),
76 SQL_DB_TICKETS_FILE_NAME,
77 )
78 .await?;
79
80 #[cfg(feature = "sqlite")]
81 Self::new_sqlx_sqlite(tickets, peers, cfg).await
82 }
83
84 #[cfg(feature = "sqlite")]
85 pub async fn new_in_memory() -> Result<Self, NodeDbError> {
86 Self::new_sqlx_sqlite(
87 SqlitePool::connect(":memory:")
88 .await
89 .map_err(|e| NodeDbError::Other(e.into()))?,
90 SqlitePool::connect(":memory:")
91 .await
92 .map_err(|e| NodeDbError::Other(e.into()))?,
93 Default::default(),
94 )
95 .await
96 }
97
98 #[cfg(feature = "sqlite")]
99 async fn new_sqlx_sqlite(
100 peers_db_pool: SqlitePool,
101 tickets_db_pool: SqlitePool,
102 cfg: HoprNodeDbConfig,
103 ) -> Result<Self, NodeDbError> {
104 let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db_pool);
105 MigratorTickets::up(&tickets_db, None).await?;
106
107 let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db_pool);
108 MigratorPeers::up(&peers_db, None).await?;
109
110 let res = hopr_db_entity::network_peer::Entity::delete_many()
112 .filter(
113 sea_orm::Condition::all().add(
114 hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
115 hopr_platform::time::native::current_time()
116 .checked_sub(std::time::Duration::from_secs(
117 std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
118 .unwrap_or_else(|_| {
119 HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
120 })
121 .parse::<u64>()
122 .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
123 ))
124 .unwrap_or_else(hopr_platform::time::native::current_time),
125 )),
126 ),
127 )
128 .exec(&peers_db)
129 .await?;
130 debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
131
132 Ok(Self {
133 tickets_write_lock: Arc::new(async_lock::Mutex::new(())),
134 unrealized_value: moka::future::CacheBuilder::new(10_000)
135 .time_to_idle(std::time::Duration::from_secs(30))
136 .build(),
137 tickets_db,
138 peers_db,
139 cfg,
140 })
141 }
142
143 async fn create_pool(
144 cfg: HoprNodeDbConfig,
145 directory: PathBuf,
146 mut options: PoolOptions<sqlx::Sqlite>,
147 min_conn: Option<u32>,
148 max_conn: Option<u32>,
149 path: &str,
150 ) -> Result<SqlitePool, NodeDbError> {
151 if let Some(min_conn) = min_conn {
152 options = options.min_connections(min_conn);
153 }
154 if let Some(max_conn) = max_conn {
155 options = options.max_connections(max_conn);
156 }
157
158 let sqlite_cfg = SqliteConnectOptions::default()
159 .create_if_missing(cfg.create_if_missing)
160 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
161 .log_statements(LevelFilter::Debug)
162 .journal_mode(SqliteJournalMode::Wal)
163 .synchronous(SqliteSynchronous::Normal)
164 .auto_vacuum(SqliteAutoVacuum::Full)
165 .page_size(4096)
167 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000"); let pool = options.connect_with(sqlite_cfg.filename(directory.join(path))).await?;
171
172 Ok(pool)
173 }
174
175 pub fn config(&self) -> &HoprNodeDbConfig {
176 &self.cfg
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use hopr_api::{db::*, *};
183 use hopr_crypto_types::{keypairs::OffchainKeypair, prelude::Keypair};
184 use hopr_primitive_types::prelude::SingleSumSMA;
185 use rand::{Rng, distributions::Alphanumeric};
186
187 use super::*;
188
189 #[tokio::test]
190 async fn test_basic_db_init() -> anyhow::Result<()> {
191 let db = HoprNodeDb::new_in_memory().await?;
192 MigratorTickets::status(&db.tickets_db).await?;
193 MigratorPeers::status(&db.peers_db).await?;
194
195 Ok(())
196 }
197
198 #[tokio::test]
199 async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
200 let random_filename: String = rand::thread_rng()
201 .sample_iter(&Alphanumeric)
202 .take(15)
203 .map(char::from)
204 .collect();
205 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
206
207 let peer_id: PeerId = OffchainKeypair::random().public().into();
208 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
209 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
210
211 let path = std::path::Path::new(&random_tmp_file);
212
213 {
214 let db = HoprNodeDb::new(path, HoprNodeDbConfig::default()).await?;
215
216 db.add_network_peer(
217 &peer_id,
218 PeerOrigin::IncomingConnection,
219 vec![ma_1.clone(), ma_2.clone()],
220 0.0,
221 25,
222 )
223 .await?;
224 }
225
226 {
227 let db = HoprNodeDb::new(path, HoprNodeDbConfig::default()).await?;
228
229 let not_found_peer = db.get_network_peer(&peer_id).await?;
230
231 assert_eq!(not_found_peer, None);
232 }
233
234 Ok(())
235 }
236
237 #[tokio::test]
238 async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
239 let random_filename: String = rand::thread_rng()
240 .sample_iter(&Alphanumeric)
241 .take(15)
242 .map(char::from)
243 .collect();
244 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
245
246 let ofk = OffchainKeypair::random();
247 let peer_id: PeerId = (*ofk.public()).into();
248 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
249 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
250
251 let path = std::path::Path::new(&random_tmp_file);
252
253 {
254 let db = HoprNodeDb::new(path, HoprNodeDbConfig::default()).await?;
255
256 db.add_network_peer(
257 &peer_id,
258 PeerOrigin::IncomingConnection,
259 vec![ma_1.clone(), ma_2.clone()],
260 0.0,
261 25,
262 )
263 .await?;
264
265 let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);
266
267 db.update_network_peer(PeerStatus {
268 id: (*ofk.public(), peer_id),
269 origin: PeerOrigin::Initialization,
270 last_seen: ten_seconds_ago,
271 last_seen_latency: std::time::Duration::from_millis(10),
272 heartbeats_sent: 1,
273 heartbeats_succeeded: 1,
274 backoff: 1.0,
275 ignored_until: None,
276 multiaddresses: vec![ma_1.clone(), ma_2.clone()],
277 quality: 1.0,
278 quality_avg: SingleSumSMA::new(2),
279 })
280 .await?;
281 }
282 {
283 let db = HoprNodeDb::new(path, HoprNodeDbConfig::default()).await?;
284
285 let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);
286
287 assert_eq!(found_peer, Some(peer_id));
288 }
289
290 Ok(())
291 }
292}