1use async_trait::async_trait;
2use futures::TryFutureExt;
3use hopr_crypto_types::prelude::Hash;
4use hopr_db_api::info::*;
5use hopr_db_entity::{
6 chain_info, global_settings, node_info,
7 prelude::{Account, Announcement, ChainInfo, Channel, NetworkEligibility, NetworkRegistry, NodeInfo},
8};
9use hopr_internal_types::prelude::WinningProbability;
10use hopr_primitive_types::prelude::*;
11use sea_orm::{
12 ActiveModelBehavior, ActiveModelTrait, ColumnTrait, EntityOrSelect, EntityTrait, IntoActiveModel, PaginatorTrait,
13 QueryFilter, Set,
14};
15use tracing::trace;
16
17use crate::{
18 HoprDbGeneralModelOperations, OptTx, SINGULAR_TABLE_FIXED_ID, TargetDb,
19 cache::{CachedValue, CachedValueDiscriminants},
20 db::HoprDb,
21 errors::{DbSqlError, DbSqlError::MissingFixedTableEntry, Result},
22};
23
/// Snapshot of the indexer's progress, as returned by
/// [`HoprDbInfoOperations::get_indexer_state_info`].
///
/// `Default` yields zeroed block numbers and the default [`Hash`].
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct IndexerStateInfo {
    /// Number of the latest indexed block; the `HoprDb` getter in this file fills it
    /// from `chain_info.last_indexed_block`.
    pub latest_block_number: u32,
    /// NOTE(review): presumably the block number of the most recent processed log.
    /// The `HoprDb` getter in this file leaves it at its default value — confirm.
    pub latest_log_block_number: u32,
    /// NOTE(review): presumably a checksum over the processed logs.
    /// The `HoprDb` getter in this file leaves it at its default value — confirm.
    pub latest_log_checksum: Hash,
}
31
/// Database operations on node information, chain/indexer information and generic
/// key-value settings.
///
/// Most methods take an optional transaction `tx`; the `HoprDb` implementation in this
/// file runs its work nested inside it (via `nest_transaction`).
#[async_trait]
pub trait HoprDbInfoOperations {
    /// Reports whether the index is empty.
    ///
    /// The `HoprDb` implementation returns `false` when more than one account row exists
    /// or when any announcement, channel, network-eligibility or network-registry row exists.
    async fn index_is_empty(&self) -> Result<bool>;

    /// Removes all data from the index.
    ///
    /// The `HoprDb` implementation deletes all account, announcement, channel,
    /// network-eligibility, network-registry, chain-info and node-info rows, then inserts
    /// a fresh row into each of the chain-info and node-info tables.
    async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()>;

    /// Reads the stored HOPR balance of the Safe.
    async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;

    /// Stores `new_balance` as the HOPR balance of the Safe.
    async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()>;

    /// Reads the stored HOPR allowance of the Safe.
    async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;

    /// Stores `new_allowance` as the HOPR allowance of the Safe.
    async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()>;

    /// Returns the stored Safe and module addresses, or `None` unless both are set.
    async fn get_safe_info<'a>(&'a self, tx: OptTx<'a>) -> Result<Option<SafeInfo>>;

    /// Stores the Safe and module addresses from `safe_info`.
    async fn set_safe_info<'a>(&'a self, tx: OptTx<'a>, safe_info: SafeInfo) -> Result<()>;

    /// Returns the indexer data: the domain separators, the ticket price, the minimum
    /// incoming ticket winning probability and the network-registry-enabled flag.
    async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData>;

    /// Stores `value` as the domain separator selected by `dst_type`.
    async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()>;

    /// Stores `win_prob` as the minimum winning probability of incoming tickets.
    async fn set_minimum_incoming_ticket_win_prob<'a>(
        &'a self,
        tx: OptTx<'a>,
        win_prob: WinningProbability,
    ) -> Result<()>;

    /// Stores `price` as the ticket price.
    async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()>;

    /// Returns the indexer state.
    ///
    /// The `HoprDb` implementation only fills `latest_block_number`; the remaining fields
    /// keep their default values.
    async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo>;

    /// Stores `block_num` as the number of the last indexed block.
    async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()>;

    /// Stores whether the network registry is enabled.
    async fn set_network_registry_enabled<'a>(&'a self, tx: OptTx<'a>, enabled: bool) -> Result<()>;

    /// Returns the value stored under `key` in the global settings, or `None` when there
    /// is no such entry.
    async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>>;

    /// Stores `value` under `key` in the global settings, overwriting an existing entry.
    ///
    /// Passing `None` as `value` deletes the entry for `key`.
    async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()>;
}
128
129#[async_trait]
130impl HoprDbInfoOperations for HoprDb {
131 async fn index_is_empty(&self) -> Result<bool> {
132 let c = self.conn(TargetDb::Index);
133
134 if Account::find().select().count(c).await? > 1 {
136 return Ok(false);
137 }
138
139 if Announcement::find().one(c).await?.is_some() {
140 return Ok(false);
141 }
142
143 if Channel::find().one(c).await?.is_some() {
144 return Ok(false);
145 }
146
147 if NetworkEligibility::find().one(c).await?.is_some() {
148 return Ok(false);
149 }
150
151 if NetworkRegistry::find().one(c).await?.is_some() {
152 return Ok(false);
153 }
154
155 Ok(true)
156 }
157
158 async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()> {
159 self.nest_transaction(tx)
160 .await?
161 .perform(|tx| {
162 Box::pin(async move {
163 Account::delete_many().exec(tx.as_ref()).await?;
164 Announcement::delete_many().exec(tx.as_ref()).await?;
165 Channel::delete_many().exec(tx.as_ref()).await?;
166 NetworkEligibility::delete_many().exec(tx.as_ref()).await?;
167 NetworkRegistry::delete_many().exec(tx.as_ref()).await?;
168 ChainInfo::delete_many().exec(tx.as_ref()).await?;
169 NodeInfo::delete_many().exec(tx.as_ref()).await?;
170
171 let mut initial_row = chain_info::ActiveModel::new();
175 initial_row.id = Set(1);
176 ChainInfo::insert(initial_row).exec(tx.as_ref()).await?;
177
178 let mut initial_row = node_info::ActiveModel::new();
179 initial_row.id = Set(1);
180 NodeInfo::insert(initial_row).exec(tx.as_ref()).await?;
181
182 Ok::<(), DbSqlError>(())
183 })
184 })
185 .await?;
186
187 Ok(())
188 }
189
190 async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
191 self.nest_transaction(tx)
192 .await?
193 .perform(|tx| {
194 Box::pin(async move {
195 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
196 .one(tx.as_ref())
197 .await?
198 .ok_or(MissingFixedTableEntry("node_info".into()))
199 .map(|m| HoprBalance::from_be_bytes(m.safe_balance))
200 })
201 })
202 .await
203 }
204
205 async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()> {
206 self.nest_transaction(tx)
207 .await?
208 .perform(|tx| {
209 Box::pin(async move {
210 Ok::<_, DbSqlError>(
211 node_info::ActiveModel {
212 id: Set(SINGULAR_TABLE_FIXED_ID),
213 safe_balance: Set(new_balance.to_be_bytes().into()),
214 ..Default::default()
215 }
216 .update(tx.as_ref()) .await?,
218 )
219 })
220 })
221 .await?;
222
223 Ok(())
224 }
225
226 async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
227 self.nest_transaction(tx)
228 .await?
229 .perform(|tx| {
230 Box::pin(async move {
231 node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
232 .one(tx.as_ref())
233 .await?
234 .ok_or(MissingFixedTableEntry("node_info".into()))
235 .map(|m| HoprBalance::from_be_bytes(m.safe_allowance))
236 })
237 })
238 .await
239 }
240
241 async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()> {
242 self.nest_transaction(tx)
243 .await?
244 .perform(|tx| {
245 Box::pin(async move {
246 node_info::ActiveModel {
247 id: Set(SINGULAR_TABLE_FIXED_ID),
248 safe_allowance: Set(new_allowance.amount().to_be_bytes().to_vec()),
249 ..Default::default()
250 }
251 .update(tx.as_ref()) .await?;
253
254 Ok::<_, DbSqlError>(())
255 })
256 })
257 .await
258 }
259
260 async fn get_safe_info<'a>(&'a self, tx: OptTx<'a>) -> Result<Option<SafeInfo>> {
261 let myself = self.clone();
262 Ok(self
263 .caches
264 .single_values
265 .try_get_with_by_ref(&CachedValueDiscriminants::SafeInfoCache, async move {
266 myself
267 .nest_transaction(tx)
268 .and_then(|op| {
269 op.perform(|tx| {
270 Box::pin(async move {
271 let info = node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
272 .one(tx.as_ref())
273 .await?
274 .ok_or(MissingFixedTableEntry("node_info".into()))?;
275 Ok::<_, DbSqlError>(info.safe_address.zip(info.module_address))
276 })
277 })
278 })
279 .await
280 .and_then(|addrs| {
281 if let Some((safe_address, module_address)) = addrs {
282 Ok(Some(SafeInfo {
283 safe_address: safe_address.parse()?,
284 module_address: module_address.parse()?,
285 }))
286 } else {
287 Ok(None)
288 }
289 })
290 .map(CachedValue::SafeInfoCache)
291 })
292 .await?
293 .try_into()?)
294 }
295
296 async fn set_safe_info<'a>(&'a self, tx: OptTx<'a>, safe_info: SafeInfo) -> Result<()> {
297 self.nest_transaction(tx)
298 .await?
299 .perform(|tx| {
300 Box::pin(async move {
301 node_info::ActiveModel {
302 id: Set(SINGULAR_TABLE_FIXED_ID),
303 safe_address: Set(Some(safe_info.safe_address.to_hex())),
304 module_address: Set(Some(safe_info.module_address.to_hex())),
305 ..Default::default()
306 }
307 .update(tx.as_ref()) .await?;
309 Ok::<_, DbSqlError>(())
310 })
311 })
312 .await?;
313 self.caches
314 .single_values
315 .insert(
316 CachedValueDiscriminants::SafeInfoCache,
317 CachedValue::SafeInfoCache(Some(safe_info)),
318 )
319 .await;
320 Ok(())
321 }
322
323 async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData> {
324 let myself = self.clone();
325 Ok(self
326 .caches
327 .single_values
328 .try_get_with_by_ref(&CachedValueDiscriminants::IndexerDataCache, async move {
329 myself
330 .nest_transaction(tx)
331 .and_then(|op| {
332 op.perform(|tx| {
333 Box::pin(async move {
334 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
335 .one(tx.as_ref())
336 .await?
337 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
338
339 let ledger_dst = if let Some(b) = model.ledger_dst {
340 Some(Hash::try_from(b.as_ref())?)
341 } else {
342 None
343 };
344
345 let safe_registry_dst = if let Some(b) = model.safe_registry_dst {
346 Some(Hash::try_from(b.as_ref())?)
347 } else {
348 None
349 };
350
351 let channels_dst = if let Some(b) = model.channels_dst {
352 Some(Hash::try_from(b.as_ref())?)
353 } else {
354 None
355 };
356
357 Ok::<_, DbSqlError>(CachedValue::IndexerDataCache(IndexerData {
358 ledger_dst,
359 safe_registry_dst,
360 channels_dst,
361 ticket_price: model.ticket_price.map(HoprBalance::from_be_bytes),
362 minimum_incoming_ticket_winning_prob: (model.min_incoming_ticket_win_prob as f64)
363 .try_into()?,
364 nr_enabled: model.network_registry_enabled,
365 }))
366 })
367 })
368 })
369 .await
370 })
371 .await?
372 .try_into()?)
373 }
374
375 async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()> {
376 self.nest_transaction(tx)
377 .await?
378 .perform(|tx| {
379 Box::pin(async move {
380 let mut active_model = chain_info::ActiveModel {
381 id: Set(SINGULAR_TABLE_FIXED_ID),
382 ..Default::default()
383 };
384
385 match dst_type {
386 DomainSeparator::Ledger => {
387 active_model.ledger_dst = Set(Some(value.as_ref().into()));
388 }
389 DomainSeparator::SafeRegistry => {
390 active_model.safe_registry_dst = Set(Some(value.as_ref().into()));
391 }
392 DomainSeparator::Channel => {
393 active_model.channels_dst = Set(Some(value.as_ref().into()));
394 }
395 }
396
397 active_model.update(tx.as_ref()).await?;
399
400 Ok::<(), DbSqlError>(())
401 })
402 })
403 .await?;
404
405 self.caches
406 .single_values
407 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
408 .await;
409 Ok(())
410 }
411
412 async fn set_minimum_incoming_ticket_win_prob<'a>(
413 &'a self,
414 tx: OptTx<'a>,
415 win_prob: WinningProbability,
416 ) -> Result<()> {
417 self.nest_transaction(tx)
418 .await?
419 .perform(|tx| {
420 Box::pin(async move {
421 chain_info::ActiveModel {
422 id: Set(SINGULAR_TABLE_FIXED_ID),
423 min_incoming_ticket_win_prob: Set(win_prob.as_f64() as f32),
424 ..Default::default()
425 }
426 .update(tx.as_ref())
427 .await?;
428
429 Ok::<(), DbSqlError>(())
430 })
431 })
432 .await?;
433
434 self.caches
435 .single_values
436 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
437 .await;
438 Ok(())
439 }
440
441 async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()> {
442 self.nest_transaction(tx)
443 .await?
444 .perform(|tx| {
445 Box::pin(async move {
446 chain_info::ActiveModel {
447 id: Set(SINGULAR_TABLE_FIXED_ID),
448 ticket_price: Set(Some(price.to_be_bytes().into())),
449 ..Default::default()
450 }
451 .update(tx.as_ref())
452 .await?;
453
454 Ok::<(), DbSqlError>(())
455 })
456 })
457 .await?;
458
459 self.caches
460 .single_values
461 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
462 .await;
463 Ok(())
464 }
465
466 async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo> {
467 self.nest_transaction(tx)
468 .await?
469 .perform(|tx| {
470 Box::pin(async move {
471 chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
472 .one(tx.as_ref())
473 .await?
474 .ok_or(DbSqlError::MissingFixedTableEntry("chain_info".into()))
475 .map(|m| IndexerStateInfo {
476 latest_block_number: m.last_indexed_block as u32,
477 ..Default::default()
478 })
479 })
480 })
481 .await
482 }
483
484 async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()> {
485 self.nest_transaction(tx)
486 .await?
487 .perform(|tx| {
488 Box::pin(async move {
489 let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
490 .one(tx.as_ref())
491 .await?
492 .ok_or(MissingFixedTableEntry("chain_info".into()))?;
493
494 let current_last_indexed_block = model.last_indexed_block;
495
496 let mut active_model = model.into_active_model();
497
498 trace!(
499 old_block = current_last_indexed_block,
500 new_block = block_num,
501 "update block"
502 );
503
504 active_model.last_indexed_block = Set(block_num as i32);
505 active_model.update(tx.as_ref()).await?;
506
507 Ok::<_, DbSqlError>(())
508 })
509 })
510 .await
511 }
512
513 async fn set_network_registry_enabled<'a>(&'a self, tx: OptTx<'a>, enabled: bool) -> Result<()> {
514 self.nest_transaction(tx)
515 .await?
516 .perform(|tx| {
517 Box::pin(async move {
518 chain_info::ActiveModel {
519 id: Set(SINGULAR_TABLE_FIXED_ID),
520 network_registry_enabled: Set(enabled),
521 ..Default::default()
522 }
523 .update(tx.as_ref())
524 .await?;
525 Ok::<_, DbSqlError>(())
526 })
527 })
528 .await?;
529
530 self.caches
531 .single_values
532 .invalidate(&CachedValueDiscriminants::IndexerDataCache)
533 .await;
534 Ok(())
535 }
536
537 async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>> {
538 let k = key.to_owned();
539 self.nest_transaction(tx)
540 .await?
541 .perform(|tx| {
542 Box::pin(async move {
543 Ok::<Option<Box<[u8]>>, DbSqlError>(
544 global_settings::Entity::find()
545 .filter(global_settings::Column::Key.eq(k))
546 .one(tx.as_ref())
547 .await?
548 .map(|m| m.value.into_boxed_slice()),
549 )
550 })
551 })
552 .await
553 }
554
555 async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()> {
556 let k = key.to_owned();
557 let value = value.map(Vec::from);
558 self.nest_transaction(tx)
559 .await?
560 .perform(|tx| {
561 Box::pin(async move {
562 if let Some(v) = value {
563 let mut am = global_settings::Entity::find()
564 .filter(global_settings::Column::Key.eq(k.clone()))
565 .one(tx.as_ref())
566 .await?
567 .map(|m| m.into_active_model())
568 .unwrap_or(global_settings::ActiveModel {
569 key: Set(k),
570 ..Default::default()
571 });
572 am.value = Set(v);
573 am.save(tx.as_ref()).await?;
574 } else {
575 global_settings::Entity::delete_many()
576 .filter(global_settings::Column::Key.eq(k))
577 .exec(tx.as_ref())
578 .await?;
579 }
580 Ok::<(), DbSqlError>(())
581 })
582 })
583 .await
584 }
585}
586
#[cfg(test)]
mod tests {
    use hex_literal::hex;
    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
    use hopr_primitive_types::{balance::HoprBalance, prelude::Address};

    use crate::{
        db::HoprDb,
        info::{HoprDbInfoOperations, SafeInfo},
    };

    // Fixed addresses used as the Safe and module addresses in the safe-info tests.
    lazy_static::lazy_static! {
        static ref ADDR_1: Address = Address::from(hex!("86fa27add61fafc955e2da17329bba9f31692fe7"));
        static ref ADDR_2: Address = Address::from(hex!("4c8bbd047c2130e702badb23b6b97a88b6562324"));
    }

    /// The Safe balance starts at zero and round-trips through the setter and getter.
    #[tokio::test]
    async fn test_set_get_balance() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_balance(None).await?,
            "balance must be 0"
        );

        let balance = HoprBalance::from(10_000);
        db.set_safe_hopr_balance(None, balance).await?;

        assert_eq!(
            balance,
            db.get_safe_hopr_balance(None).await?,
            "balance must be {balance}"
        );
        Ok(())
    }

    /// The Safe allowance starts at zero and round-trips through the setter and getter.
    #[tokio::test]
    async fn test_set_get_allowance() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be 0"
        );

        let allowance = HoprBalance::from(10_000);
        db.set_safe_hopr_allowance(None, allowance).await?;

        assert_eq!(
            allowance,
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be {allowance}"
        );

        Ok(())
    }

    /// Ticket price and minimum winning probability become visible in the indexer data
    /// after being set, i.e. the setters invalidate the previously cached value.
    #[tokio::test]
    async fn test_set_get_indexer_data() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let data = db.get_indexer_data(None).await?;
        assert_eq!(data.ticket_price, None);

        let price = HoprBalance::from(10);
        db.update_ticket_price(None, price).await?;

        db.set_minimum_incoming_ticket_win_prob(None, 0.5.try_into()?).await?;

        let data = db.get_indexer_data(None).await?;

        assert_eq!(data.ticket_price, Some(price));
        assert_eq!(data.minimum_incoming_ticket_winning_prob, 0.5);
        Ok(())
    }

    /// Safe info is absent initially and is returned after being set; the cache is left
    /// as populated by the setter.
    #[tokio::test]
    async fn test_set_get_safe_info_with_cache() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(None, db.get_safe_info(None).await?);

        let safe_info = SafeInfo {
            safe_address: *ADDR_1,
            module_address: *ADDR_2,
        };

        db.set_safe_info(None, safe_info).await?;

        assert_eq!(Some(safe_info), db.get_safe_info(None).await?);
        Ok(())
    }

    /// Safe info is returned after being set even when the cache is invalidated in
    /// between, i.e. the value is read back from the database.
    #[tokio::test]
    async fn test_set_get_safe_info() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(None, db.get_safe_info(None).await?);

        let safe_info = SafeInfo {
            safe_address: *ADDR_1,
            module_address: *ADDR_2,
        };

        db.set_safe_info(None, safe_info).await?;
        db.caches.single_values.invalidate_all();

        assert_eq!(Some(safe_info), db.get_safe_info(None).await?);
        Ok(())
    }

    /// A global setting is absent initially, readable after being set, and absent again
    /// after being set to `None`.
    #[tokio::test]
    async fn test_set_get_global_setting() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let key = "test";
        let value = hex!("deadbeef");

        assert_eq!(None, db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, Some(&value)).await?;

        assert_eq!(Some(value.into()), db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, None).await?;

        assert_eq!(None, db.get_global_setting(None, key).await?);
        Ok(())
    }
}