1use async_trait::async_trait;
2use futures::TryFutureExt;
3use hopr_crypto_types::prelude::Hash;
4use hopr_db_entity::{
5    chain_info, global_settings, node_info,
6    prelude::{Account, Announcement, ChainInfo, Channel, NetworkEligibility, NetworkRegistry, NodeInfo},
7};
8use hopr_internal_types::prelude::WinningProbability;
9use hopr_primitive_types::prelude::*;
10use sea_orm::{
11    ActiveModelBehavior, ActiveModelTrait, ColumnTrait, EntityOrSelect, EntityTrait, IntoActiveModel, PaginatorTrait,
12    QueryFilter, Set,
13};
14use tracing::trace;
15
16use crate::{
17    HoprDbGeneralModelOperations, HoprIndexerDb, OptTx, SINGULAR_TABLE_FIXED_ID, TargetDb,
18    cache::{CachedValue, CachedValueDiscriminants},
19    errors::{DbSqlError, DbSqlError::MissingFixedTableEntry, Result},
20};
21
22#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
23pub struct IndexerStateInfo {
24    pub latest_block_number: u32,
26    pub latest_log_block_number: u32,
27    pub latest_log_checksum: Hash,
28}
29
30#[derive(Clone, Copy, Debug)]
34pub struct IndexerData {
35    pub ledger_dst: Option<Hash>,
37    pub safe_registry_dst: Option<Hash>,
39    pub channels_dst: Option<Hash>,
41    pub ticket_price: Option<HoprBalance>,
43    pub minimum_incoming_ticket_winning_prob: WinningProbability,
45    pub nr_enabled: bool,
47}
48
/// Selects one of the domain separators stored in [`IndexerData`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DomainSeparator {
    /// Selects `IndexerData::ledger_dst`.
    Ledger,
    /// Selects `IndexerData::safe_registry_dst`.
    SafeRegistry,
    /// Selects `IndexerData::channels_dst`.
    Channel,
}
59
60impl IndexerData {
61    pub fn domain_separator(&self, dst_type: DomainSeparator) -> Option<Hash> {
63        match dst_type {
64            DomainSeparator::Ledger => self.ledger_dst,
65            DomainSeparator::SafeRegistry => self.safe_registry_dst,
66            DomainSeparator::Channel => self.channels_dst,
67        }
68    }
69}
70
71#[async_trait]
89pub trait HoprDbInfoOperations {
90    async fn index_is_empty(&self) -> Result<bool>;
96
97    async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()>;
103
104    async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;
106
107    async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()>;
109
110    async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance>;
112
113    async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()>;
115
116    async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData>;
121
122    async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()>;
127
128    async fn set_minimum_incoming_ticket_win_prob<'a>(
131        &'a self,
132        tx: OptTx<'a>,
133        win_prob: WinningProbability,
134    ) -> Result<()>;
135
136    async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()>;
140
141    async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo>;
143
144    async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()>;
146
147    async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>>;
149
150    async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()>;
155}
156
157#[async_trait]
158impl HoprDbInfoOperations for HoprIndexerDb {
159    async fn index_is_empty(&self) -> Result<bool> {
160        let c = self.conn(TargetDb::Index);
161
162        if Account::find().select().count(c).await? > 1 {
164            return Ok(false);
165        }
166
167        if Announcement::find().one(c).await?.is_some() {
168            return Ok(false);
169        }
170
171        if Channel::find().one(c).await?.is_some() {
172            return Ok(false);
173        }
174
175        if NetworkEligibility::find().one(c).await?.is_some() {
176            return Ok(false);
177        }
178
179        if NetworkRegistry::find().one(c).await?.is_some() {
180            return Ok(false);
181        }
182
183        Ok(true)
184    }
185
186    async fn clear_index_db<'a>(&'a self, tx: OptTx<'a>) -> Result<()> {
187        self.nest_transaction(tx)
188            .await?
189            .perform(|tx| {
190                Box::pin(async move {
191                    Account::delete_many().exec(tx.as_ref()).await?;
192                    Announcement::delete_many().exec(tx.as_ref()).await?;
193                    Channel::delete_many().exec(tx.as_ref()).await?;
194                    NetworkEligibility::delete_many().exec(tx.as_ref()).await?;
195                    NetworkRegistry::delete_many().exec(tx.as_ref()).await?;
196                    ChainInfo::delete_many().exec(tx.as_ref()).await?;
197                    NodeInfo::delete_many().exec(tx.as_ref()).await?;
198
199                    let mut initial_row = chain_info::ActiveModel::new();
203                    initial_row.id = Set(1);
204                    ChainInfo::insert(initial_row).exec(tx.as_ref()).await?;
205
206                    let mut initial_row = node_info::ActiveModel::new();
207                    initial_row.id = Set(1);
208                    NodeInfo::insert(initial_row).exec(tx.as_ref()).await?;
209
210                    Ok::<(), DbSqlError>(())
211                })
212            })
213            .await?;
214
215        Ok(())
216    }
217
218    async fn get_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
219        self.nest_transaction(tx)
220            .await?
221            .perform(|tx| {
222                Box::pin(async move {
223                    node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
224                        .one(tx.as_ref())
225                        .await?
226                        .ok_or(MissingFixedTableEntry("node_info".into()))
227                        .map(|m| HoprBalance::from_be_bytes(m.safe_balance))
228                })
229            })
230            .await
231    }
232
233    async fn set_safe_hopr_balance<'a>(&'a self, tx: OptTx<'a>, new_balance: HoprBalance) -> Result<()> {
234        self.nest_transaction(tx)
235            .await?
236            .perform(|tx| {
237                Box::pin(async move {
238                    Ok::<_, DbSqlError>(
239                        node_info::ActiveModel {
240                            id: Set(SINGULAR_TABLE_FIXED_ID),
241                            safe_balance: Set(new_balance.to_be_bytes().into()),
242                            ..Default::default()
243                        }
244                        .update(tx.as_ref()) .await?,
246                    )
247                })
248            })
249            .await?;
250
251        Ok(())
252    }
253
254    async fn get_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>) -> Result<HoprBalance> {
255        self.nest_transaction(tx)
256            .await?
257            .perform(|tx| {
258                Box::pin(async move {
259                    node_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
260                        .one(tx.as_ref())
261                        .await?
262                        .ok_or(MissingFixedTableEntry("node_info".into()))
263                        .map(|m| HoprBalance::from_be_bytes(m.safe_allowance))
264                })
265            })
266            .await
267    }
268
269    async fn set_safe_hopr_allowance<'a>(&'a self, tx: OptTx<'a>, new_allowance: HoprBalance) -> Result<()> {
270        self.nest_transaction(tx)
271            .await?
272            .perform(|tx| {
273                Box::pin(async move {
274                    node_info::ActiveModel {
275                        id: Set(SINGULAR_TABLE_FIXED_ID),
276                        safe_allowance: Set(new_allowance.amount().to_be_bytes().to_vec()),
277                        ..Default::default()
278                    }
279                    .update(tx.as_ref()) .await?;
281
282                    Ok::<_, DbSqlError>(())
283                })
284            })
285            .await
286    }
287
288    async fn get_indexer_data<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerData> {
289        let myself = self.clone();
290        Ok(self
291            .caches
292            .single_values
293            .try_get_with_by_ref(&CachedValueDiscriminants::IndexerDataCache, async move {
294                tracing::warn!("cache miss on get_indexer_data");
295                myself
296                    .nest_transaction(tx)
297                    .and_then(|op| {
298                        op.perform(|tx| {
299                            Box::pin(async move {
300                                let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
301                                    .one(tx.as_ref())
302                                    .await?
303                                    .ok_or(MissingFixedTableEntry("chain_info".into()))?;
304
305                                let ledger_dst = if let Some(b) = model.ledger_dst {
306                                    Some(Hash::try_from(b.as_ref())?)
307                                } else {
308                                    None
309                                };
310
311                                let safe_registry_dst = if let Some(b) = model.safe_registry_dst {
312                                    Some(Hash::try_from(b.as_ref())?)
313                                } else {
314                                    None
315                                };
316
317                                let channels_dst = if let Some(b) = model.channels_dst {
318                                    Some(Hash::try_from(b.as_ref())?)
319                                } else {
320                                    None
321                                };
322
323                                Ok::<_, DbSqlError>(CachedValue::IndexerDataCache(IndexerData {
324                                    ledger_dst,
325                                    safe_registry_dst,
326                                    channels_dst,
327                                    ticket_price: model.ticket_price.map(HoprBalance::from_be_bytes),
328                                    minimum_incoming_ticket_winning_prob: (model.min_incoming_ticket_win_prob as f64)
329                                        .try_into()?,
330                                    nr_enabled: model.network_registry_enabled,
331                                }))
332                            })
333                        })
334                    })
335                    .await
336            })
337            .await?
338            .try_into()?)
339    }
340
341    async fn set_domain_separator<'a>(&'a self, tx: OptTx<'a>, dst_type: DomainSeparator, value: Hash) -> Result<()> {
342        self.nest_transaction(tx)
343            .await?
344            .perform(|tx| {
345                Box::pin(async move {
346                    let mut active_model = chain_info::ActiveModel {
347                        id: Set(SINGULAR_TABLE_FIXED_ID),
348                        ..Default::default()
349                    };
350
351                    match dst_type {
352                        DomainSeparator::Ledger => {
353                            active_model.ledger_dst = Set(Some(value.as_ref().into()));
354                        }
355                        DomainSeparator::SafeRegistry => {
356                            active_model.safe_registry_dst = Set(Some(value.as_ref().into()));
357                        }
358                        DomainSeparator::Channel => {
359                            active_model.channels_dst = Set(Some(value.as_ref().into()));
360                        }
361                    }
362
363                    active_model.update(tx.as_ref()).await?;
365
366                    Ok::<(), DbSqlError>(())
367                })
368            })
369            .await?;
370
371        self.caches
372            .single_values
373            .invalidate(&CachedValueDiscriminants::IndexerDataCache)
374            .await;
375        Ok(())
376    }
377
378    async fn set_minimum_incoming_ticket_win_prob<'a>(
379        &'a self,
380        tx: OptTx<'a>,
381        win_prob: WinningProbability,
382    ) -> Result<()> {
383        self.nest_transaction(tx)
384            .await?
385            .perform(|tx| {
386                Box::pin(async move {
387                    chain_info::ActiveModel {
388                        id: Set(SINGULAR_TABLE_FIXED_ID),
389                        min_incoming_ticket_win_prob: Set(win_prob.as_f64() as f32),
390                        ..Default::default()
391                    }
392                    .update(tx.as_ref())
393                    .await?;
394
395                    Ok::<(), DbSqlError>(())
396                })
397            })
398            .await?;
399
400        self.caches
401            .single_values
402            .invalidate(&CachedValueDiscriminants::IndexerDataCache)
403            .await;
404        Ok(())
405    }
406
407    async fn update_ticket_price<'a>(&'a self, tx: OptTx<'a>, price: HoprBalance) -> Result<()> {
408        self.nest_transaction(tx)
409            .await?
410            .perform(|tx| {
411                Box::pin(async move {
412                    chain_info::ActiveModel {
413                        id: Set(SINGULAR_TABLE_FIXED_ID),
414                        ticket_price: Set(Some(price.to_be_bytes().into())),
415                        ..Default::default()
416                    }
417                    .update(tx.as_ref())
418                    .await?;
419
420                    Ok::<(), DbSqlError>(())
421                })
422            })
423            .await?;
424
425        self.caches
426            .single_values
427            .invalidate(&CachedValueDiscriminants::IndexerDataCache)
428            .await;
429        Ok(())
430    }
431
432    async fn get_indexer_state_info<'a>(&'a self, tx: OptTx<'a>) -> Result<IndexerStateInfo> {
433        self.nest_transaction(tx)
434            .await?
435            .perform(|tx| {
436                Box::pin(async move {
437                    chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
438                        .one(tx.as_ref())
439                        .await?
440                        .ok_or(DbSqlError::MissingFixedTableEntry("chain_info".into()))
441                        .map(|m| IndexerStateInfo {
442                            latest_block_number: m.last_indexed_block as u32,
443                            ..Default::default()
444                        })
445                })
446            })
447            .await
448    }
449
450    async fn set_indexer_state_info<'a>(&'a self, tx: OptTx<'a>, block_num: u32) -> Result<()> {
451        self.nest_transaction(tx)
452            .await?
453            .perform(|tx| {
454                Box::pin(async move {
455                    let model = chain_info::Entity::find_by_id(SINGULAR_TABLE_FIXED_ID)
456                        .one(tx.as_ref())
457                        .await?
458                        .ok_or(MissingFixedTableEntry("chain_info".into()))?;
459
460                    let current_last_indexed_block = model.last_indexed_block;
461
462                    let mut active_model = model.into_active_model();
463
464                    trace!(
465                        old_block = current_last_indexed_block,
466                        new_block = block_num,
467                        "update block"
468                    );
469
470                    active_model.last_indexed_block = Set(block_num as i32);
471                    active_model.update(tx.as_ref()).await?;
472
473                    Ok::<_, DbSqlError>(())
474                })
475            })
476            .await
477    }
478
479    async fn get_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str) -> Result<Option<Box<[u8]>>> {
480        let k = key.to_owned();
481        self.nest_transaction(tx)
482            .await?
483            .perform(|tx| {
484                Box::pin(async move {
485                    Ok::<Option<Box<[u8]>>, DbSqlError>(
486                        global_settings::Entity::find()
487                            .filter(global_settings::Column::Key.eq(k))
488                            .one(tx.as_ref())
489                            .await?
490                            .map(|m| m.value.into_boxed_slice()),
491                    )
492                })
493            })
494            .await
495    }
496
497    async fn set_global_setting<'a>(&'a self, tx: OptTx<'a>, key: &str, value: Option<&[u8]>) -> Result<()> {
498        let k = key.to_owned();
499        let value = value.map(Vec::from);
500        self.nest_transaction(tx)
501            .await?
502            .perform(|tx| {
503                Box::pin(async move {
504                    if let Some(v) = value {
505                        let mut am = global_settings::Entity::find()
506                            .filter(global_settings::Column::Key.eq(k.clone()))
507                            .one(tx.as_ref())
508                            .await?
509                            .map(|m| m.into_active_model())
510                            .unwrap_or(global_settings::ActiveModel {
511                                key: Set(k),
512                                ..Default::default()
513                            });
514                        am.value = Set(v);
515                        am.save(tx.as_ref()).await?;
516                    } else {
517                        global_settings::Entity::delete_many()
518                            .filter(global_settings::Column::Key.eq(k))
519                            .exec(tx.as_ref())
520                            .await?;
521                    }
522                    Ok::<(), DbSqlError>(())
523                })
524            })
525            .await
526    }
527}
528
#[cfg(test)]
mod tests {
    use hex_literal::hex;
    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
    use hopr_primitive_types::balance::HoprBalance;

    use super::*;

    /// The Safe balance starts at zero and reads back whatever was stored last.
    #[tokio::test]
    async fn test_set_get_balance() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_balance(None).await?,
            "balance must be 0"
        );

        let balance = HoprBalance::from(10_000);
        db.set_safe_hopr_balance(None, balance).await?;

        assert_eq!(
            balance,
            db.get_safe_hopr_balance(None).await?,
            "balance must be {balance}"
        );
        Ok(())
    }

    /// The Safe allowance starts at zero and reads back whatever was stored last.
    #[tokio::test]
    async fn test_set_get_allowance() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        assert_eq!(
            HoprBalance::zero(),
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be 0"
        );

        let allowance = HoprBalance::from(10_000);
        db.set_safe_hopr_allowance(None, allowance).await?;

        assert_eq!(
            allowance,
            db.get_safe_hopr_allowance(None).await?,
            "allowance must be {allowance}"
        );

        Ok(())
    }

    /// Ticket price and minimum winning probability are visible through `get_indexer_data`
    /// after being updated, i.e. the setters must not leave a stale cached value behind.
    #[tokio::test]
    async fn test_set_get_indexer_data() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let data = db.get_indexer_data(None).await?;
        assert_eq!(data.ticket_price, None);

        let price = HoprBalance::from(10);
        db.update_ticket_price(None, price).await?;

        db.set_minimum_incoming_ticket_win_prob(None, 0.5.try_into()?).await?;

        let data = db.get_indexer_data(None).await?;

        assert_eq!(data.ticket_price, Some(price));
        assert_eq!(data.minimum_incoming_ticket_winning_prob, 0.5);
        Ok(())
    }

    /// A stored domain separator is returned by `get_indexer_data` for the matching kind.
    #[tokio::test]
    async fn test_set_get_domain_separator() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let separator = Hash::default();
        db.set_domain_separator(None, DomainSeparator::Channel, separator)
            .await?;

        let data = db.get_indexer_data(None).await?;
        assert_eq!(
            Some(separator),
            data.domain_separator(DomainSeparator::Channel),
            "channel domain separator must be set"
        );
        Ok(())
    }

    /// The latest indexed block number reads back as stored.
    #[tokio::test]
    async fn test_set_get_indexer_state_info() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        db.set_indexer_state_info(None, 1_234).await?;

        assert_eq!(
            1_234,
            db.get_indexer_state_info(None).await?.latest_block_number,
            "latest block number must be 1234"
        );
        Ok(())
    }

    /// A global setting can be created, read back and removed again.
    #[tokio::test]
    async fn test_set_get_global_setting() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let key = "test";
        let value = hex!("deadbeef");

        assert_eq!(None, db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, Some(&value)).await?;

        assert_eq!(Some(value.into()), db.get_global_setting(None, key).await?);

        db.set_global_setting(None, key, None).await?;

        assert_eq!(None, db.get_global_setting(None, key).await?);
        Ok(())
    }
}