use futures::channel::mpsc::UnboundedSender;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, SqlxSqliteConnector};
use sea_query::Expr;
use sqlx::pool::PoolOptions;
use sqlx::sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};
use sqlx::{ConnectOptions, SqlitePool};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tracing::debug;
use tracing::log::LevelFilter;

use hopr_crypto_types::keypairs::Keypair;
use hopr_crypto_types::prelude::ChainKeypair;
use hopr_db_entity::ticket;
use hopr_internal_types::prelude::{AcknowledgedTicket, AcknowledgedTicketStatus};
use hopr_primitive_types::primitives::Address;
use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};

use crate::cache::HoprDbCaches;
use crate::errors::Result;
use crate::ticket_manager::TicketManager;
use crate::HoprDbAllOperations;

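/// Default window (in seconds) within which a peer must have been last seen
/// for its entry to be kept across a node restart. Can be overridden at runtime
/// via the `HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS` environment variable.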
pub const HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS: u64 = 5 * 60;

#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
pub struct HoprDbConfig {
    #[default(true)]
    pub create_if_missing: bool,
    #[default(false)]
    pub force_create: bool,
    #[default(Duration::from_secs(5))]
    pub log_slow_queries: Duration,
}

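/// Database handle of a HOPR node, backed by separate SQLite databases
/// for the chain index, tickets, network peers and chain logs.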
#[derive(Debug, Clone)]
pub struct HoprDb {
    pub(crate) index_db: sea_orm::DatabaseConnection,
    pub(crate) tickets_db: sea_orm::DatabaseConnection,
    pub(crate) peers_db: sea_orm::DatabaseConnection,
    pub(crate) logs_db: sea_orm::DatabaseConnection,
    pub(crate) ticket_manager: Arc<TicketManager>,
    pub(crate) chain_key: ChainKeypair,
    pub(crate) me_onchain: Address,
    pub(crate) caches: Arc<HoprDbCaches>,
}

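// File names of the individual SQLite databases inside the node's database directory.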
pub const SQL_DB_INDEX_FILE_NAME: &str = "hopr_index.db";
pub const SQL_DB_PEERS_FILE_NAME: &str = "hopr_peers.db";
pub const SQL_DB_TICKETS_FILE_NAME: &str = "hopr_tickets.db";
pub const SQL_DB_LOGS_FILE_NAME: &str = "hopr_logs.db";

impl HoprDb {
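    /// Opens (or creates) the node databases inside `directory` using the settings from `cfg`,
    /// then applies all pending migrations and performs the startup cleanup.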
    pub async fn new(directory: &Path, chain_key: ChainKeypair, cfg: HoprDbConfig) -> Result<Self> {
        std::fs::create_dir_all(directory).map_err(|_e| {
            crate::errors::DbSqlError::Construction(format!("cannot create main database directory {directory:?}"))
        })?;

        // Common SQLite connection options shared by all node databases.
        let cfg_template = SqliteConnectOptions::default()
            .create_if_missing(cfg.create_if_missing)
            .log_slow_statements(LevelFilter::Warn, cfg.log_slow_queries)
            .log_statements(LevelFilter::Debug)
            .journal_mode(SqliteJournalMode::Wal)
            .synchronous(SqliteSynchronous::Normal)
            .auto_vacuum(SqliteAutoVacuum::Full)
            .page_size(4096)
            // A negative cache_size is interpreted by SQLite in KiB, i.e. roughly 30 MB of page cache.
            .pragma("cache_size", "-30000")
            // Wait up to 1000 ms on a locked database before returning SQLITE_BUSY.
            .pragma("busy_timeout", "1000");

        let index = PoolOptions::new()
            .min_connections(0)
            .max_connections(30)
            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_INDEX_FILE_NAME)))
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;

        let peers = PoolOptions::new()
            .min_connections(0)
            .acquire_timeout(Duration::from_secs(60))
            .idle_timeout(Some(Duration::from_secs(10 * 60)))
            .max_lifetime(Some(Duration::from_secs(30 * 60)))
            .max_connections(300)
            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_PEERS_FILE_NAME)))
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;

        let tickets = PoolOptions::new()
            .min_connections(0)
            .max_connections(50)
            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_TICKETS_FILE_NAME)))
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?;

        let logs = PoolOptions::new()
            .min_connections(0)
            .connect_with(cfg_template.clone().filename(directory.join(SQL_DB_LOGS_FILE_NAME)))
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(format!("failed to create logs database: {e}")))?;

        Self::new_sqlx_sqlite(chain_key, index, peers, tickets, logs).await
    }

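    /// Creates all databases as in-memory SQLite instances; intended for tests.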
    pub async fn new_in_memory(chain_key: ChainKeypair) -> Result<Self> {
        Self::new_sqlx_sqlite(
            chain_key,
            SqlitePool::connect(":memory:")
                .await
                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
            SqlitePool::connect(":memory:")
                .await
                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
            SqlitePool::connect(":memory:")
                .await
                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
            SqlitePool::connect(":memory:")
                .await
                .map_err(|e| crate::errors::DbSqlError::Construction(e.to_string()))?,
        )
        .await
    }

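    /// Wraps the given SQLx pools into SeaORM connections, applies the pending migrations
    /// of each database, prunes stale peer entries, resets tickets left in the
    /// `BeingAggregated` state and initializes the caches.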
    async fn new_sqlx_sqlite(
        chain_key: ChainKeypair,
        index_db: SqlitePool,
        peers_db: SqlitePool,
        tickets_db: SqlitePool,
        logs_db: SqlitePool,
    ) -> Result<Self> {
        let index_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(index_db);

        MigratorIndex::up(&index_db, None)
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

        let tickets_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(tickets_db);

        MigratorTickets::up(&tickets_db, None)
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

        let peers_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(peers_db);

        MigratorPeers::up(&peers_db, None)
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

        let logs_db = SqlxSqliteConnector::from_sqlx_sqlite_pool(logs_db);

        MigratorChainLogs::up(&logs_db, None)
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(format!("cannot apply database migration: {e}")))?;

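        // Remove peers that have not been seen within the configured persistence window,
        // so stale network entries do not survive a restart.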
        let res = hopr_db_entity::network_peer::Entity::delete_many()
            .filter(
                sea_orm::Condition::all().add(
                    hopr_db_entity::network_peer::Column::LastSeen.lt(chrono::DateTime::<chrono::Utc>::from(
                        hopr_platform::time::native::current_time()
                            .checked_sub(std::time::Duration::from_secs(
                                std::env::var("HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS")
                                    .unwrap_or_else(|_| {
                                        HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS.to_string()
                                    })
                                    .parse::<u64>()
                                    .unwrap_or(HOPR_INTERNAL_DB_PEERS_PERSISTENCE_AFTER_RESTART_IN_SECONDS),
                            ))
                            .unwrap_or_else(hopr_platform::time::native::current_time),
                    )),
                ),
            )
            .exec(&peers_db)
            .await
            .map_err(|e| crate::errors::DbSqlError::Construction(format!("must reset peers on init: {e}")))?;
        debug!(rows = res.rows_affected, "Cleaned up rows from the 'peers' table");

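        // Tickets left in the `BeingAggregated` state by a previous run are reset
        // to `Untouched` so they can be processed again.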
        ticket::Entity::update_many()
            .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
            .col_expr(
                ticket::Column::State,
                Expr::value(AcknowledgedTicketStatus::Untouched as u8),
            )
            .exec(&tickets_db)
            .await?;

        let caches = Arc::new(HoprDbCaches::default());
        caches.invalidate_all();

        Ok(Self {
            me_onchain: chain_key.public().to_address(),
            chain_key,
            index_db,
            peers_db,
            logs_db,
            ticket_manager: Arc::new(TicketManager::new(tickets_db.clone(), caches.clone())),
            tickets_db,
            caches,
        })
    }

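    /// Starts ticket processing in the `TicketManager` with an optional ticket notifier.
    ///
    /// If a `ticket_notifier` is given, each processed acknowledged ticket is forwarded to it;
    /// otherwise processed tickets are not forwarded anywhere.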
    pub fn start_ticket_processing(&self, ticket_notifier: Option<UnboundedSender<AcknowledgedTicket>>) -> Result<()> {
        // Without a notifier, ticket notifications are sent into a drain and thus discarded.
        if let Some(notifier) = ticket_notifier {
            self.ticket_manager.start_ticket_processing(notifier)
        } else {
            self.ticket_manager.start_ticket_processing(futures::sink::drain())
        }
    }
}

impl HoprDbAllOperations for HoprDb {}

#[cfg(test)]
mod tests {
    use crate::db::HoprDb;
    use crate::{HoprDbGeneralModelOperations, TargetDb};
    use hopr_crypto_types::keypairs::{ChainKeypair, OffchainKeypair};
    use hopr_crypto_types::prelude::Keypair;
    use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin};
    use hopr_primitive_types::sma::SingleSumSMA;
    use libp2p_identity::PeerId;
    use migration::{MigratorChainLogs, MigratorIndex, MigratorPeers, MigratorTickets, MigratorTrait};
    use multiaddr::Multiaddr;
    use rand::{distributions::Alphanumeric, Rng};

    #[async_std::test]
    async fn test_basic_db_init() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        MigratorIndex::status(db.conn(TargetDb::Index)).await?;
        MigratorTickets::status(db.conn(TargetDb::Tickets)).await?;
        MigratorPeers::status(db.conn(TargetDb::Peers)).await?;
        MigratorChainLogs::status(db.conn(TargetDb::Logs)).await?;

        Ok(())
    }

    #[async_std::test]
    async fn peers_without_any_recent_updates_should_be_discarded_on_restarts() -> anyhow::Result<()> {
        let random_filename: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(15)
            .map(char::from)
            .collect();
        let random_tmp_file = format!("/tmp/{random_filename}.sqlite");

        let peer_id: PeerId = OffchainKeypair::random().public().into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        let path = std::path::Path::new(&random_tmp_file);

        {
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;
        }
        {
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            let not_found_peer = db.get_network_peer(&peer_id).await?;

            assert_eq!(not_found_peer, None);
        }

        Ok(())
    }

    #[async_std::test]
    async fn peers_with_a_recent_update_should_be_retained_in_the_database() -> anyhow::Result<()> {
        let random_filename: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(15)
            .map(char::from)
            .collect();
        let random_tmp_file = format!("/tmp/{random_filename}.sqlite");

        let ofk = OffchainKeypair::random();
        let peer_id: PeerId = (*ofk.public()).into();
        let ma_1: Multiaddr = format!("/ip4/127.0.0.1/tcp/10000/p2p/{peer_id}").parse()?;
        let ma_2: Multiaddr = format!("/ip4/127.0.0.1/tcp/10002/p2p/{peer_id}").parse()?;

        let path = std::path::Path::new(&random_tmp_file);

        {
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            db.add_network_peer(
                &peer_id,
                PeerOrigin::IncomingConnection,
                vec![ma_1.clone(), ma_2.clone()],
                0.0,
                25,
            )
            .await?;

            let ten_seconds_ago = std::time::SystemTime::now() - std::time::Duration::from_secs(10);

            db.update_network_peer(hopr_db_api::peers::PeerStatus {
                id: (*ofk.public(), peer_id),
                origin: PeerOrigin::Initialization,
                is_public: true,
                last_seen: ten_seconds_ago,
                last_seen_latency: std::time::Duration::from_millis(10),
                heartbeats_sent: 1,
                heartbeats_succeeded: 1,
                backoff: 1.0,
                ignored: None,
                peer_version: None,
                multiaddresses: vec![ma_1.clone(), ma_2.clone()],
                quality: 1.0,
                quality_avg: SingleSumSMA::new(2),
            })
            .await?;
        }
        {
            let db = HoprDb::new(path, ChainKeypair::random(), crate::db::HoprDbConfig::default()).await?;

            let found_peer = db.get_network_peer(&peer_id).await?.map(|p| p.id.1);

            assert_eq!(found_peer, Some(peer_id));
        }

        Ok(())
    }
}