1use async_trait::async_trait;
2use futures::TryFutureExt;
3use hopr_crypto_types::prelude::Hash;
4use hopr_db_api::info::*;
5use hopr_db_entity::{
6 chain_info, global_settings, node_info,
7 prelude::{Account, Announcement, ChainInfo, Channel, NetworkEligibility, NetworkRegistry, NodeInfo},
8};
9use hopr_internal_types::prelude::WinningProbability;
10use hopr_primitive_types::prelude::*;
11use sea_orm::{
12 ActiveModelBehavior, ActiveModelTrait, ColumnTrait, EntityOrSelect, EntityTrait, IntoActiveModel, PaginatorTrait,
13 QueryFilter, Set,
14};
15use tracing::trace;
16
17use crate::{
18 HoprDbGeneralModelOperations, OptTx, SINGULAR_TABLE_FIXED_ID, TargetDb,
19 cache::{CachedValue, CachedValueDiscriminants},
20 db::HoprDb,
21 errors::{DbSqlError, DbSqlError::MissingFixedTableEntry, Result},
22};
23
24#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
25pub struct IndexerStateInfo {
26 pub latest_block_number: u32,
28 pub latest_log_block_number: u32,
29 pub latest_log_checksum: Hash,
30}
31
32#[async_trait]
50pub trait HoprDbInfoOperations {
51 async fn index_is_empty(&self) -> Result<bool>;
57
58 async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()>;
64
65 async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;
67
68 async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()>;
70
71 async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;
73
74 async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()>;
76
77 async fn get_safe_info<'a>(&'a self, tx: OptTx<'a>) -> Result<Option<SafeInfo>>;
79
80 async fn set_safe_info<'a>(&'a self, tx: OptTx<'a>, safe_info: SafeInfo) -> Result<()>;
82
83 async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData>;
88
89 async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()>;
94
95 async fn set_minimum_incoming_ticket_win_prob<'a>(
98 &'a self,
99 tx: OptTx<'a>,
100 win_prob: WinningProbability,
101 ) -> Result<()>;
102
103 async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()>;
107
108 async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo>;
110
111 async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()>;
113
114 async fn set_network_registry_enabled<'a>(&'a self, tx: OptTx<'a>, enabled: bool) -> Result<()>;
118
119 async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>>;
121
122 async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()>;
127}
128
129#[async_trait]
130impl HoprDbInfoOperations for HoprDb {
131 async fn index_is_empty(&self) -> Result<bool> {
132 let c = self.conn(TargetDb::Index);
133
134 if Account::find().select().count(c).await? > 1 {
136 return Ok(false);
137 }
138
139 if Announcement::find().one(c).await?.is_some() {
140 return Ok(false);
141 }
142
143 if Channel::find().one(c).await?.is_some() {
144 return Ok(false);
145 }
146
147 if NetworkEligibility::find().one(c).await?.is_some() {
148 return Ok(false);
149 }
150
151 if NetworkRegistry::find().one(c).await?.is_some() {
152 return Ok(false);
153 }
154
155 Ok(true)
156 }
157
158 async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()> {
159 self.nest_transaction(tx)
160 .await?
161 .perform(|tx| {
162 Box::pin(async move {
163 Account::delete_many().exec(tx.as_ref()).await?;
164 Announcement::delete_many().exec(tx.as_ref()).await?;
165 Channel::delete_many().exec(tx.as_ref()).await?;
166 NetworkEligibility::delete_many().exec(tx.as_ref()).await?;
167 NetworkRegistry::delete_many().exec(tx.as_ref()).await?;
168 ChainInfo::delete_many().exec(tx.as_ref()).await?;
169 NodeInfo::delete_many().exec(tx.as_ref()).await?;
170
171 let mut initial_row = chain_info::ActiveModel::new();
175 initial_row.id = Set(1);
176 ChainInfo::insert(initial_row).exec(tx.as_ref()).await?;
177
178 let mut initial_row = node_info::ActiveModel::new();
179 initial_row.id = Set(1);
180 NodeInfo::insert(initial_row).exec(tx.as_ref()).await?;
181
182 Ok::<(), DbSqlError>(())
183 })
184 })
185 .await?;
186
187 Ok(())
188 }
189
190 async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
191 self.nest_transaction(tx)
192 .await?
193 .perform(|tx| {
194 Box::pin(async move {
195 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
196 .one(tx.as_ref())
197 .await?
198 .ok_or(MissingFixedTableEntry("node_info".into()))
199 .map(|m| HoprBalance::from_be_bytes(m.safe_balance))
200 })
201 })
202 .await
203 }
204
205 async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()> {
206 self.nest_transaction(tx)
207 .await?
208 .perform(|tx| {
209 Box::pin(async move {
210 Ok::<_, DbSqlError>(
211 node_info::ActiveModel {
212 id: Set(SINGULAR_TABLE_FIXED_ID),
213 safe_balance: Set(new_balance.to_be_bytes().into()),
214 ..Default::default()
215 }
216 .update(tx.as_ref()) .await?,
218 )
219 })
220 })
221 .await?;
222
223 Ok(())
224 }
225
226 async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
227 self.nest_transaction(tx)
228 .await?
229 .perform(|tx| {
230 Box::pin(async move {
231 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
232 .one(tx.as_ref())
233 .await?
234 .ok_or(MissingFixedTableEntry("node_info".into()))
235 .map(|m| HoprBalance::from_be_bytes(m.safe_allowance))
236 })
237 })
238 .await
239 }
240
241 async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()> {
242 self.nest_transaction(tx)
243 .await?
244 .perform(|tx| {
245 Box::pin(async move {
246 node_info::ActiveModel {
247 id: Set(SINGULAR_TABLE_FIXED_ID),
248 safe_allowance: Set(new_allowance.amount().to_be_bytes().to_vec()),
249 ..Default::default()
250 }
251 .update(tx.as_ref()) .await?;
253
254 Ok::<_, DbSqlError>(())
255 })
256 })
257 .await
258 }
259
260 async fn get_safe_info<'a>(&'a self, tx: OptTx<'a>) -> Result<Option<SafeInfo>> {
261 let myself = self.clone();
262 Ok(self
263 .caches
264 .single_values
265 .try_get_with_by_ref(&CachedValueDiscriminants::SafeInfoCache, async move {
266 myself
267 .nest_transaction(tx)
268 .and_then(|op| {
269 op.perform(|tx| {
270 Box::pin(async move {
271 let info = node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
272 .one(tx.as_ref())
273 .await?
274 .ok_or(MissingFixedTableEntry("node_info".into()))?;
275 Ok::<_, DbSqlError>(info.safe_address.zip(info.module_address))
276 })
277 })
278 })
279 .await
280 .and_then(|addrs| {
281 if let Some((safe_address, module_address)) = addrs {
282 Ok(Some(SafeInfo {
283 safe_address: safe_address.parse()?,
284 module_address: module_address.parse()?,
285 }))
286 } else {
287 Ok(None)
288 }
289 })
290 .map(CachedValue::SafeInfoCache)
291 })
292 .await?
293 .try_into()?)
294 }
295
296 async fn set_safe_info<'a>(&'a self, tx: OptTx<'a>, safe_info: SafeInfo) -> Result<()> {
297 self.nest_transaction(tx)
298 .await?
299 .perform(|tx| {
300 Box::pin(async move {
301 node_info::ActiveModel {
302 id: Set(SINGULAR_TABLE_FIXED_ID),
303 safe_address: Set(Some(safe_info.safe_address.to_hex())),
304 module_address: Set(Some(safe_info.module_address.to_hex())),
305 ..Default::default()
306 }
307 .update(tx.as_ref()) .await?;
309 Ok::<_, DbSqlError>(())
310 })
311 })
312 .await?;
313 self.caches
314 .single_values
315 .insert(
316 CachedValueDiscriminants::SafeInfoCache,
317 CachedValue::SafeInfoCache(Some(safe_info)),
318 )
319 .await;
320 Ok(())
321 }
322
323 async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData> {
324 let myself = self.clone();
325 Ok(self
326 .caches
327 .single_values
328 .try_get_with_by_ref(&CachedValueDiscriminants::IndexerDataCache, async move {
329 tracing::warn!("cache miss on get_indexer_data");
330 myself
331 .nest_transaction(tx)
332 .and_then(|op| {
333 op.perform(|tx| {
334 Box::pin(async move {
335 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
336 .one(tx.as_ref())
337 .await?
338 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
339
340 let ledger_dst = if let Some(b) = model.ledger_dst {
341 Some(Hash::try_from(b.as_ref())?)
342 } else {
343 None
344 };
345
346 let safe_registry_dst = if let Some(b) = model.safe_registry_dst {
347 Some(Hash::try_from(b.as_ref())?)
348 } else {
349 None
350 };
351
352 let channels_dst = if let Some(b) = model.channels_dst {
353 Some(Hash::try_from(b.as_ref())?)
354 } else {
355 None
356 };
357
358 Ok::<_, DbSqlError>(CachedValue::IndexerDataCache(IndexerData {
359 ledger_dst,
360 safe_registry_dst,
361 channels_dst,
362 ticket_price: model.ticket_price.map(HoprBalance::from_be_bytes),
363 minimum_incoming_ticket_winning_prob: (model.min_incoming_ticket_win_prob as f64)
364 .try_into()?,
365 nr_enabled: model.network_registry_enabled,
366 }))
367 })
368 })
369 })
370 .await
371 })
372 .await?
373 .try_into()?)
374 }
375
376 async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()> {
377 self.nest_transaction(tx)
378 .await?
379 .perform(|tx| {
380 Box::pin(async move {
381 let mut active_model = chain_info::ActiveModel {
382 id: Set(SINGULAR_TABLE_FIXED_ID),
383 ..Default::default()
384 };
385
386 match dst_type {
387 DomainSeparator::Ledger => {
388 active_model.ledger_dst = Set(Some(value.as_ref().into()));
389 }
390 DomainSeparator::SafeRegistry => {
391 active_model.safe_registry_dst = Set(Some(value.as_ref().into()));
392 }
393 DomainSeparator::Channel => {
394 active_model.channels_dst = Set(Some(value.as_ref().into()));
395 }
396 }
397
398 active_model.update(tx.as_ref()).await?;
400
401 Ok::<(), DbSqlError>(())
402 })
403 })
404 .await?;
405
406 self.caches
407 .single_values
408 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
409 .await;
410 Ok(())
411 }
412
413 async fn set_minimum_incoming_ticket_win_prob<'a>(
414 &'a self,
415 tx: OptTx<'a>,
416 win_prob: WinningProbability,
417 ) -> Result<()> {
418 self.nest_transaction(tx)
419 .await?
420 .perform(|tx| {
421 Box::pin(async move {
422 chain_info::ActiveModel {
423 id: Set(SINGULAR_TABLE_FIXED_ID),
424 min_incoming_ticket_win_prob: Set(win_prob.as_f64() as f32),
425 ..Default::default()
426 }
427 .update(tx.as_ref())
428 .await?;
429
430 Ok::<(), DbSqlError>(())
431 })
432 })
433 .await?;
434
435 self.caches
436 .single_values
437 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
438 .await;
439 Ok(())
440 }
441
442 async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()> {
443 self.nest_transaction(tx)
444 .await?
445 .perform(|tx| {
446 Box::pin(async move {
447 chain_info::ActiveModel {
448 id: Set(SINGULAR_TABLE_FIXED_ID),
449 ticket_price: Set(Some(price.to_be_bytes().into())),
450 ..Default::default()
451 }
452 .update(tx.as_ref())
453 .await?;
454
455 Ok::<(), DbSqlError>(())
456 })
457 })
458 .await?;
459
460 self.caches
461 .single_values
462 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
463 .await;
464 Ok(())
465 }
466
467 async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo> {
468 self.nest_transaction(tx)
469 .await?
470 .perform(|tx| {
471 Box::pin(async move {
472 chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
473 .one(tx.as_ref())
474 .await?
475 .ok_or(DbSqlError::MissingFixedTableEntry("chain_info".into()))
476 .map(|m| IndexerStateInfo {
477 latest_block_number: m.last_indexed_block as u32,
478 ..Default::default()
479 })
480 })
481 })
482 .await
483 }
484
485 async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()> {
486 self.nest_transaction(tx)
487 .await?
488 .perform(|tx| {
489 Box::pin(async move {
490 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
491 .one(tx.as_ref())
492 .await?
493 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
494
495 let current_last_indexed_block = model.last_indexed_block;
496
497 let mut active_model = model.into_active_model();
498
499 trace!(
500 old_block = current_last_indexed_block,
501 new_block = block_num,
502 "update block"
503 );
504
505 active_model.last_indexed_block = Set(block_num as i32);
506 active_model.update(tx.as_ref()).await?;
507
508 Ok::<_, DbSqlError>(())
509 })
510 })
511 .await
512 }
513
514 async fn set_network_registry_enabled<'a>(&'a self, tx: OptTx<'a>, enabled: bool) -> Result<()> {
515 self.nest_transaction(tx)
516 .await?
517 .perform(|tx| {
518 Box::pin(async move {
519 chain_info::ActiveModel {
520 id: Set(SINGULAR_TABLE_FIXED_ID),
521 network_registry_enabled: Set(enabled),
522 ..Default::default()
523 }
524 .update(tx.as_ref())
525 .await?;
526 Ok::<_, DbSqlError>(())
527 })
528 })
529 .await?;
530
531 self.caches
532 .single_values
533 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
534 .await;
535 Ok(())
536 }
537
538 async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>> {
539 let k = key.to_owned();
540 self.nest_transaction(tx)
541 .await?
542 .perform(|tx| {
543 Box::pin(async move {
544 Ok::<Option<Box<[u8]>>, DbSqlError>(
545 global_settings::Entity::find()
546 .filter(global_settings::Column::Key.eq(k))
547 .one(tx.as_ref())
548 .await?
549 .map(|m| m.value.into_boxed_slice()),
550 )
551 })
552 })
553 .await
554 }
555
556 async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()> {
557 let k = key.to_owned();
558 let value = value.map(Vec::from);
559 self.nest_transaction(tx)
560 .await?
561 .perform(|tx| {
562 Box::pin(async move {
563 if let Some(v) = value {
564 let mut am = global_settings::Entity::find()
565 .filter(global_settings::Column::Key.eq(k.clone()))
566 .one(tx.as_ref())
567 .await?
568 .map(|m| m.into_active_model())
569 .unwrap_or(global_settings::ActiveModel {
570 key: Set(k),
571 ..Default::default()
572 });
573 am.value = Set(v);
574 am.save(tx.as_ref()).await?;
575 } else {
576 global_settings::Entity::delete_many()
577 .filter(global_settings::Column::Key.eq(k))
578 .exec(tx.as_ref())
579 .await?;
580 }
581 Ok::<(), DbSqlError>(())
582 })
583 })
584 .await
585 }
586}
587
#[cfg(test)]
mod tests {
    use hex_literal::hex;
    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
    use hopr_primitive_types::{balance::HoprBalance, prelude::Address};

    use crate::{
        db::HoprDb,
        info::{HoprDbInfoOperations, SafeInfo},
    };

    lazy_static::lazy_static! {
        static ref ADDR_1: Address = Address::from(hex!("86fa27add61fafc955e2da17329bba9f31692fe7"));
        static ref ADDR_2: Address = Address::from(hex!("4c8bbd047c2130e702badb23b6b97a88b6562324"));
    }

    /// A fresh database reports a zero Safe balance; a stored balance reads back unchanged.
    #[tokio::test]
    async fn test_set_get_balance() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_balance(None).await?,
            "balance must be 0"
        );

        let balance = HoprBalance::from(10_000);
        db.set_safe_hopr_balance(None, balance).await?;

        assert_eq!(
            balance,
            db.get_safe_hopr_balance(None).await?,
            "balance must be {balance}"
        );
        Ok(())
    }

    /// A fresh database reports a zero Safe allowance; a stored allowance reads back unchanged.
    #[tokio::test]
    async fn test_set_get_allowance() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be 0"
        );

        let allowance = HoprBalance::from(10_000);
        db.set_safe_hopr_allowance(None, allowance).await?;

        assert_eq!(
            allowance,
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be {allowance}"
        );

        Ok(())
    }

    /// Ticket price and minimum winning probability are visible through
    /// `get_indexer_data` after being updated.
    #[tokio::test]
    async fn test_set_get_indexer_data() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let data = db.get_indexer_data(None).await?;
        assert_eq!(data.ticket_price, None);

        let price = HoprBalance::from(10);
        db.update_ticket_price(None, price).await?;

        db.set_minimum_incoming_ticket_win_prob(None, 0.5.try_into()?).await?;

        let data = db.get_indexer_data(None).await?;

        assert_eq!(data.ticket_price, Some(price));
        assert_eq!(data.minimum_incoming_ticket_winning_prob, 0.5);
        Ok(())
    }

    /// Safe info round-trips without the cache being cleared in between.
    #[tokio::test]
    async fn test_set_get_safe_info_with_cache() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(None, db.get_safe_info(None).await?);

        let safe_info = SafeInfo {
            safe_address: *ADDR_1,
            module_address: *ADDR_2,
        };

        db.set_safe_info(None, safe_info).await?;

        assert_eq!(Some(safe_info), db.get_safe_info(None).await?);
        Ok(())
    }

    /// Safe info round-trips through the database after the cache is invalidated.
    #[tokio::test]
    async fn test_set_get_safe_info() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(None, db.get_safe_info(None).await?);

        let safe_info = SafeInfo {
            safe_address: *ADDR_1,
            module_address: *ADDR_2,
        };

        db.set_safe_info(None, safe_info).await?;
        // Drop every cached value so the next read has to hit the database.
        db.caches.single_values.invalidate_all();

        assert_eq!(Some(safe_info), db.get_safe_info(None).await?);
        Ok(())
    }

    /// A global setting can be created, read back and removed again.
    #[tokio::test]
    async fn test_set_get_global_setting() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let key = "test";
        let value = hex!("deadbeef");

        assert_eq!(None, db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, Some(&value)).await?;

        assert_eq!(Some(value.into()), db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, None).await?;

        assert_eq!(None, db.get_global_setting(None, key).await?);
        Ok(())
    }
}