1use std::{path::Path, sync::Arc, time::Duration};
2
3use futures::channel::mpsc::UnboundedSender;
4use hopr_crypto_types::{keypairs::Keypair, prelude::ChainKeypair};
5use hopr_db_entity::{
6 prelude::{Account, Announcement},
7 ticket,
8};
9use hopr_internal_types::prelude::{AcknowledgedTicket, AcknowledgedTicketStatus};
10use hopr_primitive_types::primitives::Address;
11use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
12use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
13use sea_query::Expr;
14use sqlx::{
15 ConnectOptions, SqlitePool,
16 pool::PoolOptions,
17 sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous},
18};
19use tracing::{debug, log::LevelFilter};
20
21use crate::{
22 HoprDbAllOperations, accounts::model_to_account_entry, cache::HoprDbCaches, errors::Result,
23 ticket_manager::TicketManager,
24};
25
26pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60; #[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
29pub struct HoprDbConfig {
30 #[default(true)]
31 pub create_if_missing: bool,
32 #[default(false)]
33 pub force_create: bool,
34 #[default(Duration::from_secs(5))]
35 pub log_slow_queries: Duration,
36}
37
38#[derive(Debug, Clone)]
39pub struct HoprDb {
40 pub(crate) index_db: sea_orm::DatabaseConnection,
41 pub(crate) tickets_db: sea_orm::DatabaseConnection,
42 pub(crate) peers_db: sea_orm::DatabaseConnection,
43 pub(crate) logs_db: sea_orm::DatabaseConnection,
44 pub(crate) ticket_manager: Arc<TicketManager>,
45 pub(crate) chain_key: ChainKeypair,
46 pub(crate) me_onchain: Address,
47 pub(crate) caches: Arc<HoprDbCaches>,
48}
49
50pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";
51pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
52pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
53pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";
54
55impl HoprDb {
56 pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprDbConfig) -> Result<Self> {
57 #[cfg(all(feature = "prometheus", not(test)))]
58 {
59 lazy_static::initialize(&crate::protocol::METRIC_RECEIVED_ACKS);
60 lazy_static::initialize(&crate::protocol::METRIC_SENT_ACKS);
61 lazy_static::initialize(&crate::protocol::METRIC_TICKETS_COUNT);
62 }
63
64 std::fs::create_dir_all(directory).map_err(|_e| {
65 crate::errors::DbSqlError::Construction(format!("cannot create main database directory {directory:?}"))
66 })?;
67
68 let cfg_template = SqliteConnectOptions::default()
71 .create_if_missing(cfg.create_if_missing)
72 .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
73 .log_statements(LevelFilter::Debug)
74 .journal_mode(SqliteJournalMode::Wal)
75 .synchronous(SqliteSynchronous::Normal)
76 .auto_vacuum(SqliteAutoVacuum::Full)
77 .page_size(4096)
79 .pragma("cache_size", "-30000") .pragma("busy_timeout", "1000"); let index = PoolOptions::new()
84 .min_connections(0)
85 .max_connections(30)
86 .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_INDEX_FILE_NAME)))
87 .await
88 .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
89
90 let peers = PoolOptions::new()
92 .min_connections(0) .acquire_timeout(Duration::from_secs(60)) .idle_timeout(Some(Duration::from_secs(10 * 60))) .max_lifetime(Some(Duration::from_secs(30 * 60))) .max_connections(300) .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_PEERS_FILE_NAME)))
98 .await
99 .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
100
101 let tickets = PoolOptions::new()
103 .min_connections(0)
104 .max_connections(50)
105 .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_TICKETS_FILE_NAME)))
106 .await
107 .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;
108
109 let logs = PoolOptions::new()
110 .min_connections(0)
111 .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_LOGS_FILE_NAME)))
112 .await
113 .unwrap_or_else(|e| panic!("failed to create logs database: {e}"));
114
115 Self::new_sqlx_sqlite(chain_key, index, peers, tickets, logs).await
116 }
117
118 pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
119 Self::new_sqlx_sqlite(
120 chain_key,
121 SqlitePool::connect(":memory:")
122 .await
123 .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
124 SqlitePool::connect(":memory:")
125 .await
126 .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
127 SqlitePool::connect(":memory:")
128 .await
129 .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
130 SqlitePool::connect(":memory:")
131 .await
132 .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
133 )
134 .await
135 }
136
137 async fn new_sqlx_sqlite(
138 chain_key: ChainKeypair,
139 index_db: SqlitePool,
140 peers_db: SqlitePool,
141 tickets_db: SqlitePool,
142 logs_db: SqlitePool,
143 ) -> Result<Self> {
144 let index_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db);
145
146 MigratorIndex::up(&index_db, None)
147 .await
148 .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
149
150 let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db);
151
152 MigratorTickets::up(&tickets_db, None)
153 .await
154 .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
155
156 let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db);
157
158 MigratorPeers::up(&peers_db, None)
159 .await
160 .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
161
162 let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db);
163
164 MigratorChainLogs::up(&logs_db, None)
165 .await
166 .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;
167
168 let res = hopr_db_entity::network_peer::Entity::delete_many()
170 .filter(
171 sea_orm::Condition::all().add(
172 hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
173 hopr_platform::time::native::current_time()
174 .checked_sub(std::time::Duration::from_secs(
175 std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
176 .unwrap_or_else(|_| {
177 HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
178 })
179 .parse::<u64>()
180 .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
181 ))
182 .unwrap_or_else(hopr_platform::time::native::current_time),
183 )),
184 ),
185 )
186 .exec(&peers_db)
187 .await
188 .map_err(|e| crate::errors::DbSqlError::Construction(format!("must reset peers on init: {e}")))?;
189 debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");
190
191 ticket::Entity::update_many()
193 .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
194 .col_expr(
195 ticket::Column::State,
196 Expr::value(AcknowledgedTicketStatus::Untouched as u8),
197 )
198 .exec(&tickets_db)
199 .await?;
200
201 let caches = Arc::new(HoprDbCaches::default());
202 caches.invalidate_all();
203
204 Account::find()
206 .find_with_related(Announcement)
207 .all(&index_db)
208 .await?
209 .into_iter()
210 .try_for_each(|(a, b)| match model_to_account_entry(a, b) {
211 Ok(account) => caches.key_id_mapper.update_key_id_binding(&account),
212 Err(error) => {
213 tracing::error!(%error, "undecodeable account");
215 Ok(())
216 }
217 })?;
218
219 Ok(Self {
220 me_onchain: chain_key.public().to_address(),
221 chain_key,
222 index_db,
223 peers_db,
224 logs_db,
225 ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
226 tickets_db,
227 caches,
228 })
229 }
230
231 pub fn start_ticket_processing(&self, ticket_notifier: Option<UnboundedSender<AcknowledgedTicket>>) -> Result<()> {
237 if let Some(notifier) = ticket_notifier {
238 self.ticket_manager.start_ticket_processing(notifier)
239 } else {
240 self.ticket_manager.start_ticket_processing(futures::sink::drain())
241 }
242 }
243}
244
245impl HoprDbAllOperations for HoprDb {}
246
247#[cfg(test)]
248mod tests {
249 use hopr_crypto_types::{
250 keypairs::{ChainKeypair, OffchainKeypair},
251 prelude::Keypair,
252 };
253 use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin};
254 use hopr_primitive_types::sma::SingleSumSMA;
255 use libp2p_identity::PeerId;
256 use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
257 use multiaddr::Multiaddr;
258 use rand::{Rng, distributions::Alphanumeric};
259
260 use crate::{HoprDbGeneralModelOperations, TargetDb, db::HoprDb}; #[tokio::test]
263 async fn test_basic_db_init() -> anyhow::Result<()> {
264 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
265
266 MigratorIndex::status(db.conn(TargetDb::Index)).await?;
269 MigratorTickets::status(db.conn(TargetDb::Tickets)).await?;
270 MigratorPeers::status(db.conn(TargetDb::Peers)).await?;
271 MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;
272
273 Ok(())
274 }
275
276 #[tokio::test]
277 async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
278 let random_filename: String = rand::thread_rng()
279 .sample_iter(&Alphanumeric)
280 .take(15)
281 .map(char::from)
282 .collect();
283 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
284
285 let peer_id: PeerId = OffchainKeypair::random().public().into();
286 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
287 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
288
289 let path = std::path::Path::new(&random_tmp_file);
290
291 {
292 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
293
294 db.add_network_peer(
295 &peer_id,
296 PeerOrigin::IncomingConnection,
297 vec![ma_1.clone(), ma_2.clone()],
298 0.0,
299 25,
300 )
301 .await?;
302 }
303 {
304 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
305
306 let not_found_peer = db.get_network_peer(&peer_id).await?;
307
308 assert_eq!(not_found_peer, None);
309 }
310
311 Ok(())
312 }
313
314 #[tokio::test]
315 async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
316 let random_filename: String = rand::thread_rng()
317 .sample_iter(&Alphanumeric)
318 .take(15)
319 .map(char::from)
320 .collect();
321 let random_tmp_file = format!("/tmp/{random_filename}.sqlite");
322
323 let ofk = OffchainKeypair::random();
324 let peer_id: PeerId = (*ofk.public()).into();
325 let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
326 let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;
327
328 let path = std::path::Path::new(&random_tmp_file);
329
330 {
331 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
332
333 db.add_network_peer(
334 &peer_id,
335 PeerOrigin::IncomingConnection,
336 vec![ma_1.clone(), ma_2.clone()],
337 0.0,
338 25,
339 )
340 .await?;
341
342 let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);
343
344 db.update_network_peer(hopr_db_api::peers::PeerStatus {
345 id: (*ofk.public(), peer_id),
346 origin: PeerOrigin::Initialization,
347 is_public: true,
348 last_seen: ten_seconds_ago,
349 last_seen_latency: std::time::Duration::from_millis(10),
350 heartbeats_sent: 1,
351 heartbeats_succeeded: 1,
352 backoff: 1.0,
353 ignored: None,
354 peer_version: None,
355 multiaddresses: vec![ma_1.clone(), ma_2.clone()],
356 quality: 1.0,
357 quality_avg: SingleSumSMA::new(2),
358 })
359 .await?;
360 }
361 {
362 let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;
363
364 let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);
365
366 assert_eq!(found_peer, Some(peer_id));
367 }
368
369 Ok(())
370 }
371}