1use async_trait::async_trait;
2use futures::TryFutureExt;
3use sea_orm::{
4 ActiveModelBehavior, ActiveModelTrait, ColumnTrait, EntityOrSelect, EntityTrait, IntoActiveModel, PaginatorTrait,
5 QueryFilter, Set,
6};
7use tracing::trace;
8
9use hopr_crypto_types::prelude::Hash;
10use hopr_db_api::info::*;
11use hopr_db_entity::prelude::{
12 Account, Announcement, ChainInfo, Channel, NetworkEligibility, NetworkRegistry, NodeInfo,
13};
14use hopr_db_entity::{chain_info, global_settings, node_info};
15use hopr_primitive_types::prelude::*;
16
17use crate::cache::{CachedValue, CachedValueDiscriminants};
18use crate::db::HoprDb;
19use crate::errors::DbSqlError::MissingFixedTableEntry;
20use crate::errors::{DbSqlError, Result};
21use crate::{HoprDbGeneralModelOperations, OptTx, TargetDb, SINGULAR_TABLE_FIXED_ID};
22
/// Snapshot of the indexer progress as read from the database.
///
/// The type derives `Default`, so any field that a reader does not populate explicitly
/// carries its default value.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct IndexerStateInfo {
    /// Number of the latest indexed block. `HoprDb::get_indexer_state_info` in this file fills
    /// it from the `last_indexed_block` column of the `chain_info` row.
    pub latest_block_number: u32,
    /// Block number of the latest log.
    /// NOTE(review): nothing in this file populates this field — confirm where it is set.
    pub latest_log_block_number: u32,
    /// Checksum of the latest log.
    /// NOTE(review): nothing in this file populates this field — confirm where it is set.
    pub latest_log_checksum: Hash,
}
30
/// Database operations on node-wide and chain-wide information: index state, Safe balance,
/// allowance and addresses, indexer data and free-form global settings.
///
/// Most methods accept an optional transaction `tx`; the `HoprDb` implementation in this file
/// nests its work into that transaction via `nest_transaction`.
#[async_trait]
pub trait HoprDbInfoOperations {
    /// Checks whether the index holds no indexed data.
    async fn index_is_empty(&self) -> Result<bool>;

    /// Removes all indexed data and restores the initial rows of the fixed single-row tables.
    async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()>;

    /// Gets the stored HOPR balance of the Safe.
    async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<Balance>;

    /// Stores `new_balance` as the HOPR balance of the Safe.
    async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: Balance) -> Result<()>;

    /// Gets the stored HOPR allowance of the Safe.
    async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<Balance>;

    /// Stores `new_allowance` as the HOPR allowance of the Safe.
    async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: Balance) -> Result<()>;

    /// Gets the Safe and module addresses, or `None` when they have not both been stored yet.
    async fn get_safe_info<'a>(&'a self, tx: OptTx<'a>) -> Result<Option<SafeInfo>>;

    /// Stores the Safe and module addresses.
    async fn set_safe_info<'a>(&'a self, tx: OptTx<'a>, safe_info: SafeInfo) -> Result<()>;

    /// Gets the indexer data: domain separators, ticket price, minimum incoming ticket winning
    /// probability and the network registry flag.
    async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData>;

    /// Stores `value` as the domain separator selected by `dst_type`.
    async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()>;

    /// Stores the minimum winning probability required of incoming tickets.
    /// The `HoprDb` implementation rejects values outside of `0.0..=1.0`.
    async fn set_minimum_incoming_ticket_win_prob<'a>(&'a self, tx: OptTx<'a>, win_prob: f64) -> Result<()>;

    /// Stores `price` as the current ticket price.
    async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: Balance) -> Result<()>;

    /// Gets the stored indexer state.
    async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo>;

    /// Stores `block_num` as the latest indexed block.
    async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()>;

    /// Stores whether the network registry is enabled.
    async fn set_network_registry_enabled<'a>(&'a self, tx: OptTx<'a>, enabled: bool) -> Result<()>;

    /// Gets the raw value of the global setting stored under `key`, or `None` if there is no
    /// such setting.
    async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>>;

    /// Sets the global setting stored under `key`: `Some(value)` creates or overwrites it,
    /// `None` removes it.
    async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()>;
}
122
123#[async_trait]
124impl HoprDbInfoOperations for HoprDb {
125 async fn index_is_empty(&self) -> Result<bool> {
126 let c = self.conn(TargetDb::Index);
127
128 if Account::find().select().count(c).await? > 1 {
130 return Ok(false);
131 }
132
133 if Announcement::find().one(c).await?.is_some() {
134 return Ok(false);
135 }
136
137 if Channel::find().one(c).await?.is_some() {
138 return Ok(false);
139 }
140
141 if NetworkEligibility::find().one(c).await?.is_some() {
142 return Ok(false);
143 }
144
145 if NetworkRegistry::find().one(c).await?.is_some() {
146 return Ok(false);
147 }
148
149 Ok(true)
150 }
151
152 async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()> {
153 self.nest_transaction(tx)
154 .await?
155 .perform(|tx| {
156 Box::pin(async move {
157 Account::delete_many().exec(tx.as_ref()).await?;
158 Announcement::delete_many().exec(tx.as_ref()).await?;
159 Channel::delete_many().exec(tx.as_ref()).await?;
160 NetworkEligibility::delete_many().exec(tx.as_ref()).await?;
161 NetworkRegistry::delete_many().exec(tx.as_ref()).await?;
162 ChainInfo::delete_many().exec(tx.as_ref()).await?;
163 NodeInfo::delete_many().exec(tx.as_ref()).await?;
164
165 let mut initial_row = chain_info::ActiveModel::new();
169 initial_row.id = Set(1);
170 ChainInfo::insert(initial_row).exec(tx.as_ref()).await?;
171
172 let mut initial_row = node_info::ActiveModel::new();
173 initial_row.id = Set(1);
174 NodeInfo::insert(initial_row).exec(tx.as_ref()).await?;
175
176 Ok::<(), DbSqlError>(())
177 })
178 })
179 .await?;
180
181 Ok(())
182 }
183
184 async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<Balance> {
185 self.nest_transaction(tx)
186 .await?
187 .perform(|tx| {
188 Box::pin(async move {
189 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
190 .one(tx.as_ref())
191 .await?
192 .ok_or(MissingFixedTableEntry("node_info".into()))
193 .map(|m| BalanceType::HOPR.balance_bytes(m.safe_balance))
194 })
195 })
196 .await
197 }
198
199 async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: Balance) -> Result<()> {
200 self.nest_transaction(tx)
201 .await?
202 .perform(|tx| {
203 Box::pin(async move {
204 Ok::<_, DbSqlError>(
205 node_info::ActiveModel {
206 id: Set(SINGULAR_TABLE_FIXED_ID),
207 safe_balance: Set(new_balance.amount().to_be_bytes().into()),
208 ..Default::default()
209 }
210 .update(tx.as_ref()) .await?,
212 )
213 })
214 })
215 .await?;
216
217 Ok(())
218 }
219
220 async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<Balance> {
221 self.nest_transaction(tx)
222 .await?
223 .perform(|tx| {
224 Box::pin(async move {
225 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
226 .one(tx.as_ref())
227 .await?
228 .ok_or(MissingFixedTableEntry("node_info".into()))
229 .map(|m| BalanceType::HOPR.balance_bytes(m.safe_allowance))
230 })
231 })
232 .await
233 }
234
235 async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: Balance) -> Result<()> {
236 self.nest_transaction(tx)
237 .await?
238 .perform(|tx| {
239 Box::pin(async move {
240 node_info::ActiveModel {
241 id: Set(SINGULAR_TABLE_FIXED_ID),
242 safe_allowance: Set(new_allowance.amount().to_be_bytes().to_vec()),
243 ..Default::default()
244 }
245 .update(tx.as_ref()) .await?;
247
248 Ok::<_, DbSqlError>(())
249 })
250 })
251 .await
252 }
253
254 async fn get_safe_info<'a>(&'a self, tx: OptTx<'a>) -> Result<Option<SafeInfo>> {
255 let myself = self.clone();
256 Ok(self
257 .caches
258 .single_values
259 .try_get_with_by_ref(&CachedValueDiscriminants::SafeInfoCache, async move {
260 myself
261 .nest_transaction(tx)
262 .and_then(|op| {
263 op.perform(|tx| {
264 Box::pin(async move {
265 let info = node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
266 .one(tx.as_ref())
267 .await?
268 .ok_or(MissingFixedTableEntry("node_info".into()))?;
269 Ok::<_, DbSqlError>(info.safe_address.zip(info.module_address))
270 })
271 })
272 })
273 .await
274 .and_then(|addrs| {
275 if let Some((safe_address, module_address)) = addrs {
276 Ok(Some(SafeInfo {
277 safe_address: safe_address.parse()?,
278 module_address: module_address.parse()?,
279 }))
280 } else {
281 Ok(None)
282 }
283 })
284 .map(CachedValue::SafeInfoCache)
285 })
286 .await?
287 .try_into()?)
288 }
289
290 async fn set_safe_info<'a>(&'a self, tx: OptTx<'a>, safe_info: SafeInfo) -> Result<()> {
291 self.nest_transaction(tx)
292 .await?
293 .perform(|tx| {
294 Box::pin(async move {
295 node_info::ActiveModel {
296 id: Set(SINGULAR_TABLE_FIXED_ID),
297 safe_address: Set(Some(safe_info.safe_address.to_hex())),
298 module_address: Set(Some(safe_info.module_address.to_hex())),
299 ..Default::default()
300 }
301 .update(tx.as_ref()) .await?;
303 Ok::<_, DbSqlError>(())
304 })
305 })
306 .await?;
307 self.caches
308 .single_values
309 .insert(
310 CachedValueDiscriminants::SafeInfoCache,
311 CachedValue::SafeInfoCache(Some(safe_info)),
312 )
313 .await;
314 Ok(())
315 }
316
317 async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData> {
318 let myself = self.clone();
319 Ok(self
320 .caches
321 .single_values
322 .try_get_with_by_ref(&CachedValueDiscriminants::IndexerDataCache, async move {
323 myself
324 .nest_transaction(tx)
325 .and_then(|op| {
326 op.perform(|tx| {
327 Box::pin(async move {
328 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
329 .one(tx.as_ref())
330 .await?
331 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
332
333 let ledger_dst = if let Some(b) = model.ledger_dst {
334 Some(Hash::try_from(b.as_ref())?)
335 } else {
336 None
337 };
338
339 let safe_registry_dst = if let Some(b) = model.safe_registry_dst {
340 Some(Hash::try_from(b.as_ref())?)
341 } else {
342 None
343 };
344
345 let channels_dst = if let Some(b) = model.channels_dst {
346 Some(Hash::try_from(b.as_ref())?)
347 } else {
348 None
349 };
350
351 Ok::<_, DbSqlError>(CachedValue::IndexerDataCache(IndexerData {
352 ledger_dst,
353 safe_registry_dst,
354 channels_dst,
355 ticket_price: model.ticket_price.map(|p| BalanceType::HOPR.balance_bytes(p)),
356 minimum_incoming_ticket_winning_prob: model.min_incoming_ticket_win_prob as f64,
357 nr_enabled: model.network_registry_enabled,
358 }))
359 })
360 })
361 })
362 .await
363 })
364 .await?
365 .try_into()?)
366 }
367
368 async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()> {
369 self.nest_transaction(tx)
370 .await?
371 .perform(|tx| {
372 Box::pin(async move {
373 let mut active_model = chain_info::ActiveModel {
374 id: Set(SINGULAR_TABLE_FIXED_ID),
375 ..Default::default()
376 };
377
378 match dst_type {
379 DomainSeparator::Ledger => {
380 active_model.ledger_dst = Set(Some(value.as_ref().into()));
381 }
382 DomainSeparator::SafeRegistry => {
383 active_model.safe_registry_dst = Set(Some(value.as_ref().into()));
384 }
385 DomainSeparator::Channel => {
386 active_model.channels_dst = Set(Some(value.as_ref().into()));
387 }
388 }
389
390 active_model.update(tx.as_ref()).await?;
392
393 Ok::<(), DbSqlError>(())
394 })
395 })
396 .await?;
397
398 self.caches
399 .single_values
400 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
401 .await;
402 Ok(())
403 }
404
405 async fn set_minimum_incoming_ticket_win_prob<'a>(&'a self, tx: OptTx<'a>, win_prob: f64) -> Result<()> {
406 if !(0.0..=1.0).contains(&win_prob) {
407 return Err(DbSqlError::LogicalError(
408 "winning probability must be between 0 and 1".into(),
409 ));
410 }
411
412 self.nest_transaction(tx)
413 .await?
414 .perform(|tx| {
415 Box::pin(async move {
416 chain_info::ActiveModel {
417 id: Set(SINGULAR_TABLE_FIXED_ID),
418 min_incoming_ticket_win_prob: Set(win_prob as f32),
419 ..Default::default()
420 }
421 .update(tx.as_ref())
422 .await?;
423
424 Ok::<(), DbSqlError>(())
425 })
426 })
427 .await?;
428
429 self.caches
430 .single_values
431 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
432 .await;
433 Ok(())
434 }
435
436 async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: Balance) -> Result<()> {
437 self.nest_transaction(tx)
438 .await?
439 .perform(|tx| {
440 Box::pin(async move {
441 chain_info::ActiveModel {
442 id: Set(SINGULAR_TABLE_FIXED_ID),
443 ticket_price: Set(Some(price.amount().to_be_bytes().into())),
444 ..Default::default()
445 }
446 .update(tx.as_ref())
447 .await?;
448
449 Ok::<(), DbSqlError>(())
450 })
451 })
452 .await?;
453
454 self.caches
455 .single_values
456 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
457 .await;
458 Ok(())
459 }
460
461 async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo> {
462 self.nest_transaction(tx)
463 .await?
464 .perform(|tx| {
465 Box::pin(async move {
466 chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
467 .one(tx.as_ref())
468 .await?
469 .ok_or(DbSqlError::MissingFixedTableEntry("chain_info".into()))
470 .map(|m| IndexerStateInfo {
471 latest_block_number: m.last_indexed_block as u32,
472 ..Default::default()
473 })
474 })
475 })
476 .await
477 }
478
479 async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()> {
480 self.nest_transaction(tx)
481 .await?
482 .perform(|tx| {
483 Box::pin(async move {
484 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
485 .one(tx.as_ref())
486 .await?
487 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
488
489 let current_last_indexed_block = model.last_indexed_block;
490
491 let mut active_model = model.into_active_model();
492
493 trace!(
494 old_block = current_last_indexed_block,
495 new_block = block_num,
496 "update block"
497 );
498
499 active_model.last_indexed_block = Set(block_num as i32);
500 active_model.update(tx.as_ref()).await?;
501
502 Ok::<_, DbSqlError>(())
503 })
504 })
505 .await
506 }
507
508 async fn set_network_registry_enabled<'a>(&'a self, tx: OptTx<'a>, enabled: bool) -> Result<()> {
509 self.nest_transaction(tx)
510 .await?
511 .perform(|tx| {
512 Box::pin(async move {
513 chain_info::ActiveModel {
514 id: Set(SINGULAR_TABLE_FIXED_ID),
515 network_registry_enabled: Set(enabled),
516 ..Default::default()
517 }
518 .update(tx.as_ref())
519 .await?;
520 Ok::<_, DbSqlError>(())
521 })
522 })
523 .await?;
524
525 self.caches
526 .single_values
527 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
528 .await;
529 Ok(())
530 }
531
532 async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>> {
533 let k = key.to_owned();
534 self.nest_transaction(tx)
535 .await?
536 .perform(|tx| {
537 Box::pin(async move {
538 Ok::<Option<Box<[u8]>>, DbSqlError>(
539 global_settings::Entity::find()
540 .filter(global_settings::Column::Key.eq(k))
541 .one(tx.as_ref())
542 .await?
543 .map(|m| m.value.into_boxed_slice()),
544 )
545 })
546 })
547 .await
548 }
549
550 async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()> {
551 let k = key.to_owned();
552 let value = value.map(Vec::from);
553 self.nest_transaction(tx)
554 .await?
555 .perform(|tx| {
556 Box::pin(async move {
557 if let Some(v) = value {
558 let mut am = global_settings::Entity::find()
559 .filter(global_settings::Column::Key.eq(k.clone()))
560 .one(tx.as_ref())
561 .await?
562 .map(|m| m.into_active_model())
563 .unwrap_or(global_settings::ActiveModel {
564 key: Set(k),
565 ..Default::default()
566 });
567 am.value = Set(v);
568 am.save(tx.as_ref()).await?;
569 } else {
570 global_settings::Entity::delete_many()
571 .filter(global_settings::Column::Key.eq(k))
572 .exec(tx.as_ref())
573 .await?;
574 }
575 Ok::<(), DbSqlError>(())
576 })
577 })
578 .await
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use hex_literal::hex;
585 use hopr_crypto_types::keypairs::ChainKeypair;
586 use hopr_crypto_types::prelude::Keypair;
587
588 use hopr_primitive_types::prelude::{Address, BalanceType};
589
590 use crate::db::HoprDb;
591 use crate::info::{HoprDbInfoOperations, SafeInfo};
592
    // Fixed addresses shared by the tests below: `ADDR_1` is used as the safe address and
    // `ADDR_2` as the module address in the safe-info tests.
    lazy_static::lazy_static! {
        static ref ADDR_1: Address = Address::from(hex!("86fa27add61fafc955e2da17329bba9f31692fe7"));
        static ref ADDR_2: Address = Address::from(hex!("4c8bbd047c2130e702badb23b6b97a88b6562324"));
    }
597
598 #[async_std::test]
599 async fn test_set_get_balance() -> anyhow::Result<()> {
600 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
601
602 assert_eq!(
603 BalanceType::HOPR.zero(),
604 db.get_safe_hopr_balance(None).await?,
605 "balance must be 0"
606 );
607
608 let balance = BalanceType::HOPR.balance(10_000);
609 db.set_safe_hopr_balance(None, balance).await?;
610
611 assert_eq!(
612 balance,
613 db.get_safe_hopr_balance(None).await?,
614 "balance must be {balance}"
615 );
616 Ok(())
617 }
618
619 #[async_std::test]
620 async fn test_set_get_allowance() -> anyhow::Result<()> {
621 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
622
623 assert_eq!(
624 BalanceType::HOPR.zero(),
625 db.get_safe_hopr_allowance(None).await?,
626 "balance must be 0"
627 );
628
629 let balance = BalanceType::HOPR.balance(10_000);
630 db.set_safe_hopr_allowance(None, balance).await?;
631
632 assert_eq!(
633 balance,
634 db.get_safe_hopr_allowance(None).await?,
635 "balance must be {balance}"
636 );
637
638 Ok(())
639 }
640
641 #[async_std::test]
642 async fn test_set_get_indexer_data() -> anyhow::Result<()> {
643 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
644
645 let data = db.get_indexer_data(None).await?;
646 assert_eq!(data.ticket_price, None);
647
648 let price = BalanceType::HOPR.balance(10);
649 db.update_ticket_price(None, price).await?;
650
651 db.set_minimum_incoming_ticket_win_prob(None, 0.5).await?;
652
653 let data = db.get_indexer_data(None).await?;
654
655 assert_eq!(data.ticket_price, Some(price));
656 assert_eq!(data.minimum_incoming_ticket_winning_prob, 0.5);
657 Ok(())
658 }
659
660 #[async_std::test]
661 async fn test_set_get_safe_info_with_cache() -> anyhow::Result<()> {
662 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
663
664 assert_eq!(None, db.get_safe_info(None).await?);
665
666 let safe_info = SafeInfo {
667 safe_address: *ADDR_1,
668 module_address: *ADDR_2,
669 };
670
671 db.set_safe_info(None, safe_info).await?;
672
673 assert_eq!(Some(safe_info), db.get_safe_info(None).await?);
674 Ok(())
675 }
676
677 #[async_std::test]
678 async fn test_set_get_safe_info() -> anyhow::Result<()> {
679 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
680
681 assert_eq!(None, db.get_safe_info(None).await?);
682
683 let safe_info = SafeInfo {
684 safe_address: *ADDR_1,
685 module_address: *ADDR_2,
686 };
687
688 db.set_safe_info(None, safe_info).await?;
689 db.caches.single_values.invalidate_all();
690
691 assert_eq!(Some(safe_info), db.get_safe_info(None).await?);
692 Ok(())
693 }
694
695 #[async_std::test]
696 async fn test_set_get_global_setting() -> anyhow::Result<()> {
697 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
698
699 let key = "test";
700 let value = hex!("deadbeef");
701
702 assert_eq!(None, db.get_global_setting(None, key).await?);
703
704 db.set_global_setting(None, key, Some(&value)).await?;
705
706 assert_eq!(Some(value.into()), db.get_global_setting(None, key).await?);
707
708 db.set_global_setting(None, key, None).await?;
709
710 assert_eq!(None, db.get_global_setting(None, key).await?);
711 Ok(())
712 }
713}