1use async_trait::async_trait;
2use futures::TryFutureExt;
3use hopr_crypto_types::prelude::Hash;
4use hopr_db_entity::{
5 chain_info, global_settings, node_info,
6 prelude::{Account, Announcement, ChainInfo, Channel, NetworkEligibility, NetworkRegistry, NodeInfo},
7};
8use hopr_internal_types::prelude::WinningProbability;
9use hopr_primitive_types::prelude::*;
10use sea_orm::{
11 ActiveModelBehavior, ActiveModelTrait, ColumnTrait, EntityOrSelect, EntityTrait, IntoActiveModel, PaginatorTrait,
12 QueryFilter, Set,
13};
14use tracing::trace;
15
16use crate::{
17 HoprDbGeneralModelOperations, HoprIndexerDb, OptTx, SINGULAR_TABLE_FIXED_ID, TargetDb,
18 cache::{CachedValue, CachedValueDiscriminants},
19 errors::{DbSqlError, DbSqlError::MissingFixedTableEntry, Result},
20};
21
22#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
23pub struct IndexerStateInfo {
24 pub latest_block_number: u32,
26 pub latest_log_block_number: u32,
27 pub latest_log_checksum: Hash,
28}
29
30#[derive(Clone, Copy, Debug)]
34pub struct IndexerData {
35 pub ledger_dst: Option<Hash>,
37 pub safe_registry_dst: Option<Hash>,
39 pub channels_dst: Option<Hash>,
41 pub ticket_price: Option<HoprBalance>,
43 pub minimum_incoming_ticket_winning_prob: WinningProbability,
45 pub nr_enabled: bool,
47}
48
/// Selects one of the domain separators kept in [`IndexerData`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DomainSeparator {
    /// Selects `IndexerData::ledger_dst`.
    Ledger,
    /// Selects `IndexerData::safe_registry_dst`.
    SafeRegistry,
    /// Selects `IndexerData::channels_dst`.
    Channel,
}
59
60impl IndexerData {
61 pub fn domain_separator(&self, dst_type: DomainSeparator) -> Option<Hash> {
63 match dst_type {
64 DomainSeparator::Ledger => self.ledger_dst,
65 DomainSeparator::SafeRegistry => self.safe_registry_dst,
66 DomainSeparator::Channel => self.channels_dst,
67 }
68 }
69}
70
71#[async_trait]
89pub trait HoprDbInfoOperations {
90 async fn index_is_empty(&self) -> Result<bool>;
96
97 async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()>;
103
104 async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;
106
107 async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()>;
109
110 async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;
112
113 async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()>;
115
116 async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData>;
121
122 async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()>;
127
128 async fn set_minimum_incoming_ticket_win_prob<'a>(
131 &'a self,
132 tx: OptTx<'a>,
133 win_prob: WinningProbability,
134 ) -> Result<()>;
135
136 async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()>;
140
141 async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo>;
143
144 async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()>;
146
147 async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>>;
149
150 async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()>;
155}
156
157#[async_trait]
158impl HoprDbInfoOperations for HoprIndexerDb {
159 async fn index_is_empty(&self) -> Result<bool> {
160 let c = self.conn(TargetDb::Index);
161
162 if Account::find().select().count(c).await? > 1 {
164 return Ok(false);
165 }
166
167 if Announcement::find().one(c).await?.is_some() {
168 return Ok(false);
169 }
170
171 if Channel::find().one(c).await?.is_some() {
172 return Ok(false);
173 }
174
175 if NetworkEligibility::find().one(c).await?.is_some() {
176 return Ok(false);
177 }
178
179 if NetworkRegistry::find().one(c).await?.is_some() {
180 return Ok(false);
181 }
182
183 Ok(true)
184 }
185
186 async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()> {
187 self.nest_transaction(tx)
188 .await?
189 .perform(|tx| {
190 Box::pin(async move {
191 Account::delete_many().exec(tx.as_ref()).await?;
192 Announcement::delete_many().exec(tx.as_ref()).await?;
193 Channel::delete_many().exec(tx.as_ref()).await?;
194 NetworkEligibility::delete_many().exec(tx.as_ref()).await?;
195 NetworkRegistry::delete_many().exec(tx.as_ref()).await?;
196 ChainInfo::delete_many().exec(tx.as_ref()).await?;
197 NodeInfo::delete_many().exec(tx.as_ref()).await?;
198
199 let mut initial_row = chain_info::ActiveModel::new();
203 initial_row.id = Set(1);
204 ChainInfo::insert(initial_row).exec(tx.as_ref()).await?;
205
206 let mut initial_row = node_info::ActiveModel::new();
207 initial_row.id = Set(1);
208 NodeInfo::insert(initial_row).exec(tx.as_ref()).await?;
209
210 Ok::<(), DbSqlError>(())
211 })
212 })
213 .await?;
214
215 Ok(())
216 }
217
218 async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
219 self.nest_transaction(tx)
220 .await?
221 .perform(|tx| {
222 Box::pin(async move {
223 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
224 .one(tx.as_ref())
225 .await?
226 .ok_or(MissingFixedTableEntry("node_info".into()))
227 .map(|m| HoprBalance::from_be_bytes(m.safe_balance))
228 })
229 })
230 .await
231 }
232
233 async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()> {
234 self.nest_transaction(tx)
235 .await?
236 .perform(|tx| {
237 Box::pin(async move {
238 Ok::<_, DbSqlError>(
239 node_info::ActiveModel {
240 id: Set(SINGULAR_TABLE_FIXED_ID),
241 safe_balance: Set(new_balance.to_be_bytes().into()),
242 ..Default::default()
243 }
244 .update(tx.as_ref()) .await?,
246 )
247 })
248 })
249 .await?;
250
251 Ok(())
252 }
253
254 async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
255 self.nest_transaction(tx)
256 .await?
257 .perform(|tx| {
258 Box::pin(async move {
259 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
260 .one(tx.as_ref())
261 .await?
262 .ok_or(MissingFixedTableEntry("node_info".into()))
263 .map(|m| HoprBalance::from_be_bytes(m.safe_allowance))
264 })
265 })
266 .await
267 }
268
269 async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()> {
270 self.nest_transaction(tx)
271 .await?
272 .perform(|tx| {
273 Box::pin(async move {
274 node_info::ActiveModel {
275 id: Set(SINGULAR_TABLE_FIXED_ID),
276 safe_allowance: Set(new_allowance.amount().to_be_bytes().to_vec()),
277 ..Default::default()
278 }
279 .update(tx.as_ref()) .await?;
281
282 Ok::<_, DbSqlError>(())
283 })
284 })
285 .await
286 }
287
288 async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData> {
289 let myself = self.clone();
290 Ok(self
291 .caches
292 .single_values
293 .try_get_with_by_ref(&CachedValueDiscriminants::IndexerDataCache, async move {
294 tracing::warn!("cache miss on get_indexer_data");
295 myself
296 .nest_transaction(tx)
297 .and_then(|op| {
298 op.perform(|tx| {
299 Box::pin(async move {
300 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
301 .one(tx.as_ref())
302 .await?
303 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
304
305 let ledger_dst = if let Some(b) = model.ledger_dst {
306 Some(Hash::try_from(b.as_ref())?)
307 } else {
308 None
309 };
310
311 let safe_registry_dst = if let Some(b) = model.safe_registry_dst {
312 Some(Hash::try_from(b.as_ref())?)
313 } else {
314 None
315 };
316
317 let channels_dst = if let Some(b) = model.channels_dst {
318 Some(Hash::try_from(b.as_ref())?)
319 } else {
320 None
321 };
322
323 Ok::<_, DbSqlError>(CachedValue::IndexerDataCache(IndexerData {
324 ledger_dst,
325 safe_registry_dst,
326 channels_dst,
327 ticket_price: model.ticket_price.map(HoprBalance::from_be_bytes),
328 minimum_incoming_ticket_winning_prob: (model.min_incoming_ticket_win_prob as f64)
329 .try_into()?,
330 nr_enabled: model.network_registry_enabled,
331 }))
332 })
333 })
334 })
335 .await
336 })
337 .await?
338 .try_into()?)
339 }
340
341 async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()> {
342 self.nest_transaction(tx)
343 .await?
344 .perform(|tx| {
345 Box::pin(async move {
346 let mut active_model = chain_info::ActiveModel {
347 id: Set(SINGULAR_TABLE_FIXED_ID),
348 ..Default::default()
349 };
350
351 match dst_type {
352 DomainSeparator::Ledger => {
353 active_model.ledger_dst = Set(Some(value.as_ref().into()));
354 }
355 DomainSeparator::SafeRegistry => {
356 active_model.safe_registry_dst = Set(Some(value.as_ref().into()));
357 }
358 DomainSeparator::Channel => {
359 active_model.channels_dst = Set(Some(value.as_ref().into()));
360 }
361 }
362
363 active_model.update(tx.as_ref()).await?;
365
366 Ok::<(), DbSqlError>(())
367 })
368 })
369 .await?;
370
371 self.caches
372 .single_values
373 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
374 .await;
375 Ok(())
376 }
377
378 async fn set_minimum_incoming_ticket_win_prob<'a>(
379 &'a self,
380 tx: OptTx<'a>,
381 win_prob: WinningProbability,
382 ) -> Result<()> {
383 self.nest_transaction(tx)
384 .await?
385 .perform(|tx| {
386 Box::pin(async move {
387 chain_info::ActiveModel {
388 id: Set(SINGULAR_TABLE_FIXED_ID),
389 min_incoming_ticket_win_prob: Set(win_prob.as_f64() as f32),
390 ..Default::default()
391 }
392 .update(tx.as_ref())
393 .await?;
394
395 Ok::<(), DbSqlError>(())
396 })
397 })
398 .await?;
399
400 self.caches
401 .single_values
402 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
403 .await;
404 Ok(())
405 }
406
407 async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()> {
408 self.nest_transaction(tx)
409 .await?
410 .perform(|tx| {
411 Box::pin(async move {
412 chain_info::ActiveModel {
413 id: Set(SINGULAR_TABLE_FIXED_ID),
414 ticket_price: Set(Some(price.to_be_bytes().into())),
415 ..Default::default()
416 }
417 .update(tx.as_ref())
418 .await?;
419
420 Ok::<(), DbSqlError>(())
421 })
422 })
423 .await?;
424
425 self.caches
426 .single_values
427 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
428 .await;
429 Ok(())
430 }
431
432 async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo> {
433 self.nest_transaction(tx)
434 .await?
435 .perform(|tx| {
436 Box::pin(async move {
437 chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
438 .one(tx.as_ref())
439 .await?
440 .ok_or(DbSqlError::MissingFixedTableEntry("chain_info".into()))
441 .map(|m| IndexerStateInfo {
442 latest_block_number: m.last_indexed_block as u32,
443 ..Default::default()
444 })
445 })
446 })
447 .await
448 }
449
450 async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()> {
451 self.nest_transaction(tx)
452 .await?
453 .perform(|tx| {
454 Box::pin(async move {
455 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
456 .one(tx.as_ref())
457 .await?
458 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
459
460 let current_last_indexed_block = model.last_indexed_block;
461
462 let mut active_model = model.into_active_model();
463
464 trace!(
465 old_block = current_last_indexed_block,
466 new_block = block_num,
467 "update block"
468 );
469
470 active_model.last_indexed_block = Set(block_num as i32);
471 active_model.update(tx.as_ref()).await?;
472
473 Ok::<_, DbSqlError>(())
474 })
475 })
476 .await
477 }
478
479 async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>> {
480 let k = key.to_owned();
481 self.nest_transaction(tx)
482 .await?
483 .perform(|tx| {
484 Box::pin(async move {
485 Ok::<Option<Box<[u8]>>, DbSqlError>(
486 global_settings::Entity::find()
487 .filter(global_settings::Column::Key.eq(k))
488 .one(tx.as_ref())
489 .await?
490 .map(|m| m.value.into_boxed_slice()),
491 )
492 })
493 })
494 .await
495 }
496
497 async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()> {
498 let k = key.to_owned();
499 let value = value.map(Vec::from);
500 self.nest_transaction(tx)
501 .await?
502 .perform(|tx| {
503 Box::pin(async move {
504 if let Some(v) = value {
505 let mut am = global_settings::Entity::find()
506 .filter(global_settings::Column::Key.eq(k.clone()))
507 .one(tx.as_ref())
508 .await?
509 .map(|m| m.into_active_model())
510 .unwrap_or(global_settings::ActiveModel {
511 key: Set(k),
512 ..Default::default()
513 });
514 am.value = Set(v);
515 am.save(tx.as_ref()).await?;
516 } else {
517 global_settings::Entity::delete_many()
518 .filter(global_settings::Column::Key.eq(k))
519 .exec(tx.as_ref())
520 .await?;
521 }
522 Ok::<(), DbSqlError>(())
523 })
524 })
525 .await
526 }
527}
528
#[cfg(test)]
mod tests {
    use hex_literal::hex;
    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
    use hopr_primitive_types::{balance::HoprBalance, prelude::Address};

    use super::*;

    // NOTE(review): neither address is referenced by the tests in this module.
    lazy_static::lazy_static! {
        static ref ADDR_1: Address = Address::from(hex!("86fa27add61fafc955e2da17329bba9f31692fe7"));
        static ref ADDR_2: Address = Address::from(hex!("4c8bbd047c2130e702badb23b6b97a88b6562324"));
    }

    /// A fresh database reports a zero Safe balance; a stored balance is read back unchanged.
    #[tokio::test]
    async fn test_set_get_balance() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_balance(None).await?,
            "balance must be 0"
        );

        let balance = HoprBalance::from(10_000);
        db.set_safe_hopr_balance(None, balance).await?;

        assert_eq!(
            balance,
            db.get_safe_hopr_balance(None).await?,
            "balance must be {balance}"
        );
        Ok(())
    }

    /// A fresh database reports a zero Safe allowance; a stored allowance is read back unchanged.
    #[tokio::test]
    async fn test_set_get_allowance() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be 0"
        );

        let allowance = HoprBalance::from(10_000);
        db.set_safe_hopr_allowance(None, allowance).await?;

        assert_eq!(
            allowance,
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be {allowance}"
        );

        Ok(())
    }

    /// Ticket price and minimum winning probability written through the setters
    /// show up in the next `get_indexer_data` call.
    #[tokio::test]
    async fn test_set_get_indexer_data() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let data = db.get_indexer_data(None).await?;
        assert_eq!(data.ticket_price, None);

        let price = HoprBalance::from(10);
        db.update_ticket_price(None, price).await?;

        db.set_minimum_incoming_ticket_win_prob(None, 0.5.try_into()?).await?;

        let data = db.get_indexer_data(None).await?;

        assert_eq!(data.ticket_price, Some(price));
        assert_eq!(data.minimum_incoming_ticket_winning_prob, 0.5);
        Ok(())
    }

    /// A global setting can be created, read back, and removed again by storing `None`.
    #[tokio::test]
    async fn test_set_get_global_setting() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let key = "test";
        let value = hex!("deadbeef");

        assert_eq!(None, db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, Some(&value)).await?;

        assert_eq!(Some(value.into()), db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, None).await?;

        assert_eq!(None, db.get_global_setting(None, key).await?);
        Ok(())
    }
}