hopr_db_sql/
accounts.rs

1use async_trait::async_trait;
2use futures::{StreamExt, TryFutureExt, stream::BoxStream};
3use hopr_crypto_types::prelude::OffchainPublicKey;
4use hopr_db_entity::{
5    account, announcement,
6    prelude::{Account, Announcement},
7};
8use hopr_internal_types::{account::AccountType, prelude::AccountEntry};
9use hopr_primitive_types::{
10    errors::GeneralError,
11    prelude::{Address, ToHex},
12};
13use multiaddr::{Multiaddr, PeerId};
14use sea_orm::{
15    ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, ModelTrait, QueryFilter, QueryOrder, Related,
16    Set, sea_query::Expr,
17};
18use sea_query::{Condition, IntoCondition, OnConflict};
19use tracing::instrument;
20
21use crate::{
22    HoprDbGeneralModelOperations, HoprIndexerDb, OptTx,
23    errors::{DbSqlError, DbSqlError::MissingAccount, Result},
24};
25
26/// A type that can represent both [chain public key](Address) and [packet public key](OffchainPublicKey).
27#[allow(clippy::large_enum_variant)] // TODO: use CompactOffchainPublicKey
28#[derive(Copy, Clone, Debug, PartialEq, Eq)]
29pub enum ChainOrPacketKey {
30    /// Represents [chain public key](Address).
31    ChainKey(Address),
32    /// Represents [packet public key](OffchainPublicKey).
33    PacketKey(OffchainPublicKey),
34}
35
36impl From<Address> for ChainOrPacketKey {
37    fn from(value: Address) -> Self {
38        Self::ChainKey(value)
39    }
40}
41
42impl From<OffchainPublicKey> for ChainOrPacketKey {
43    fn from(value: OffchainPublicKey) -> Self {
44        Self::PacketKey(value)
45    }
46}
47
48impl TryFrom<ChainOrPacketKey> for OffchainPublicKey {
49    type Error = GeneralError;
50
51    fn try_from(value: ChainOrPacketKey) -> std::result::Result<Self, Self::Error> {
52        match value {
53            ChainOrPacketKey::ChainKey(_) => Err(GeneralError::InvalidInput),
54            ChainOrPacketKey::PacketKey(k) => Ok(k),
55        }
56    }
57}
58
59impl TryFrom<ChainOrPacketKey> for Address {
60    type Error = GeneralError;
61
62    fn try_from(value: ChainOrPacketKey) -> std::result::Result<Self, Self::Error> {
63        match value {
64            ChainOrPacketKey::ChainKey(k) => Ok(k),
65            ChainOrPacketKey::PacketKey(_) => Err(GeneralError::InvalidInput),
66        }
67    }
68}
69
70impl IntoCondition for ChainOrPacketKey {
71    fn into_condition(self) -> Condition {
72        match self {
73            ChainOrPacketKey::ChainKey(chain_key) => {
74                account::Column::ChainKey.eq(chain_key.to_string()).into_condition()
75            }
76            ChainOrPacketKey::PacketKey(packet_key) => {
77                account::Column::PacketKey.eq(packet_key.to_string()).into_condition()
78            }
79        }
80    }
81}
82
83/// Defines DB API for accessing HOPR accounts and corresponding on-chain announcements.
84///
85/// Accounts store the Chain and Packet key information, so as the
86/// routable network information if the account has been announced as well.
87#[async_trait]
88pub trait HoprDbAccountOperations {
89    /// Retrieves the account entry using a Packet key or Chain key.
90    async fn get_account<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<Option<AccountEntry>>
91    where
92        T: Into<ChainOrPacketKey> + Send + Sync;
93
94    /// Retrieves account entry about this node's account.
95    /// This a unique account in the database that must always be present.
96    async fn get_self_account<'a>(&'a self, tx: OptTx<'a>) -> Result<AccountEntry>;
97
98    async fn stream_accounts<'a>(&'a self, public_only: bool) -> Result<BoxStream<'a, AccountEntry>>;
99
100    /// Inserts a new account entry to the database.
101    /// Fails if such an entry already exists.
102    async fn insert_account<'a>(&'a self, tx: OptTx<'a>, account: AccountEntry) -> Result<()>;
103
104    /// Inserts a routable address announcement linked to a specific entry.
105    ///
106    /// If an account matching the given `key` (chain or off-chain key) does not exist, an
107    /// error is returned.
108    /// If such `multiaddr` has been already announced for the given account `key`, only
109    /// the `at_block` will be updated on that announcement.
110    async fn insert_announcement<'a, T>(
111        &'a self,
112        tx: OptTx<'a>,
113        key: T,
114        multiaddr: Multiaddr,
115        at_block: u32,
116    ) -> Result<AccountEntry>
117    where
118        T: Into<ChainOrPacketKey> + Send + Sync;
119
120    /// Deletes all address announcements for the given account.
121    async fn delete_all_announcements<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
122    where
123        T: Into<ChainOrPacketKey> + Send + Sync;
124
125    /// Deletes account with the given `key` (chain or off-chain).
126    async fn delete_account<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
127    where
128        T: Into<ChainOrPacketKey> + Send + Sync;
129
130    /// Translates the given Chain or Packet key to its counterpart.
131    ///
132    /// If `Address` is given as `key`, the result will contain `OffchainPublicKey` if present.
133    /// If `OffchainPublicKey` is given as `key`, the result will contain `Address` if present.
134    async fn translate_key<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<Option<ChainOrPacketKey>>
135    where
136        T: Into<ChainOrPacketKey> + Send + Sync;
137}
138
139// NOTE: this function currently assumes `announcements` are sorted from latest to earliest
140pub(crate) fn model_to_account_entry(
141    account: account::Model,
142    announcements: Vec<announcement::Model>,
143) -> Result<AccountEntry> {
144    // Currently, we always take only the most recent announcement
145    let announcement = announcements.first();
146
147    Ok(AccountEntry {
148        public_key: OffchainPublicKey::from_hex(&account.packet_key)?,
149        chain_addr: account.chain_key.parse()?,
150        published_at: account.published_at as u32,
151        entry_type: match announcement {
152            None => AccountType::NotAnnounced,
153            Some(a) => AccountType::Announced {
154                multiaddr: a.multiaddress.parse().map_err(|_| DbSqlError::DecodingError)?,
155                updated_block: a.at_block as u32,
156            },
157        },
158    })
159}
160
161#[async_trait]
162impl HoprDbAccountOperations for HoprIndexerDb {
163    async fn get_account<'a, T: Into<ChainOrPacketKey> + Send + Sync>(
164        &'a self,
165        tx: OptTx<'a>,
166        key: T,
167    ) -> Result<Option<AccountEntry>> {
168        let cpk = key.into();
169        self.nest_transaction(tx)
170            .await?
171            .perform(|tx| {
172                Box::pin(async move {
173                    let maybe_model = Account::find()
174                        .find_with_related(Announcement)
175                        .filter(cpk)
176                        .order_by_desc(announcement::Column::AtBlock)
177                        .all(tx.as_ref())
178                        .await?
179                        .pop();
180
181                    Ok::<_, DbSqlError>(if let Some((account, announcements)) = maybe_model {
182                        Some(model_to_account_entry(account, announcements)?)
183                    } else {
184                        None
185                    })
186                })
187            })
188            .await
189    }
190
191    async fn get_self_account<'a>(&'a self, tx: OptTx<'a>) -> Result<AccountEntry> {
192        self.get_account(tx, self.me_onchain)
193            .await?
194            .ok_or(DbSqlError::MissingAccount)
195    }
196
197    async fn stream_accounts<'a>(&'a self, public_only: bool) -> Result<BoxStream<'a, AccountEntry>> {
198        // .stream() cannot do CROSS JOIN, so we need to collect it via .all() first
199        let accounts = Account::find()
200            .find_with_related(Announcement)
201            .filter(if public_only {
202                announcement::Column::Multiaddress.ne("")
203            } else {
204                Expr::value(1)
205            })
206            .order_by_desc(announcement::Column::AtBlock)
207            .all(self.index_db.read_only())
208            .await?
209            .into_iter()
210            .map(|(a, b)| model_to_account_entry(a, b))
211            .collect::<Result<Vec<_>>>()?;
212
213        Ok(futures::stream::iter(accounts).boxed())
214    }
215
216    async fn insert_account<'a>(&'a self, tx: OptTx<'a>, account: AccountEntry) -> Result<()> {
217        let myself = self.clone();
218        self.nest_transaction(tx)
219            .await?
220            .perform(|tx| {
221                Box::pin(async move {
222                    match account::Entity::insert(account::ActiveModel {
223                        chain_key: Set(account.chain_addr.to_hex()),
224                        packet_key: Set(account.public_key.to_hex()),
225                        published_at: Set(account.published_at as i32),
226                        ..Default::default()
227                    })
228                    .on_conflict(
229                        OnConflict::columns([account::Column::ChainKey, account::Column::PacketKey])
230                            .do_nothing()
231                            .to_owned(),
232                    )
233                    .exec(tx.as_ref())
234                    .await
235                    {
236                        // Proceed if succeeded or already exists
237                        res @ Ok(_) | res @ Err(DbErr::RecordNotInserted) => {
238                            myself
239                                .caches
240                                .chain_to_offchain
241                                .insert(account.chain_addr, Some(account.public_key))
242                                .await;
243                            myself
244                                .caches
245                                .offchain_to_chain
246                                .insert(account.public_key, Some(account.chain_addr))
247                                .await;
248
249                            // Update key-id binding only if the account was inserted successfully
250                            // (= not re-announced)
251                            if res.is_ok() {
252                                if let Err(error) = myself.caches.key_id_mapper.update_key_id_binding(&account) {
253                                    tracing::warn!(?account, %error, "keybinding not updated")
254                                }
255                            }
256
257                            if let AccountType::Announced {
258                                multiaddr,
259                                updated_block,
260                            } = account.entry_type
261                            {
262                                myself
263                                    .insert_announcement(Some(tx), account.chain_addr, multiaddr, updated_block)
264                                    .await?;
265                            }
266
267                            Ok::<(), DbSqlError>(())
268                        }
269                        Err(e) => Err(e.into()),
270                    }
271                })
272            })
273            .await
274    }
275
276    async fn insert_announcement<'a, T>(
277        &'a self,
278        tx: OptTx<'a>,
279        key: T,
280        multiaddr: Multiaddr,
281        at_block: u32,
282    ) -> Result<AccountEntry>
283    where
284        T: Into<ChainOrPacketKey> + Send + Sync,
285    {
286        let cpk = key.into();
287        self.nest_transaction(tx)
288            .await?
289            .perform(|tx| {
290                Box::pin(async move {
291                    let (existing_account, mut existing_announcements) = account::Entity::find()
292                        .find_with_related(announcement::Entity)
293                        .filter(cpk)
294                        .order_by_desc(announcement::Column::AtBlock)
295                        .all(tx.as_ref())
296                        .await?
297                        .pop()
298                        .ok_or(MissingAccount)?;
299
300                    if let Some((index, _)) = existing_announcements
301                        .iter()
302                        .enumerate()
303                        .find(|(_, announcement)| announcement.multiaddress == multiaddr.to_string())
304                    {
305                        let mut existing_announcement = existing_announcements.remove(index).into_active_model();
306                        existing_announcement.at_block = Set(at_block as i32);
307                        let updated_announcement = existing_announcement.update(tx.as_ref()).await?;
308
309                        // To maintain the sort order, insert at the original location
310                        existing_announcements.insert(index, updated_announcement);
311                    } else {
312                        let new_announcement = announcement::ActiveModel {
313                            account_id: Set(existing_account.id),
314                            multiaddress: Set(multiaddr.to_string()),
315                            at_block: Set(at_block as i32),
316                            ..Default::default()
317                        }
318                        .insert(tx.as_ref())
319                        .await?;
320
321                        // Assuming this is the latest announcement, so prepend it
322                        existing_announcements.insert(0, new_announcement);
323                    }
324
325                    model_to_account_entry(existing_account, existing_announcements)
326                })
327            })
328            .await
329    }
330
331    async fn delete_all_announcements<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
332    where
333        T: Into<ChainOrPacketKey> + Send + Sync,
334    {
335        let cpk = key.into();
336        self.nest_transaction(tx)
337            .await?
338            .perform(|tx| {
339                Box::pin(async move {
340                    let to_delete = account::Entity::find_related()
341                        .filter(cpk)
342                        .all(tx.as_ref())
343                        .await?
344                        .into_iter()
345                        .map(|x| x.id)
346                        .collect::<Vec<_>>();
347
348                    if !to_delete.is_empty() {
349                        announcement::Entity::delete_many()
350                            .filter(announcement::Column::Id.is_in(to_delete))
351                            .exec(tx.as_ref())
352                            .await?;
353
354                        Ok::<_, DbSqlError>(())
355                    } else {
356                        Err(MissingAccount)
357                    }
358                })
359            })
360            .await
361    }
362
363    async fn delete_account<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
364    where
365        T: Into<ChainOrPacketKey> + Send + Sync,
366    {
367        let myself = self.clone();
368        let cpk = key.into();
369        self.nest_transaction(tx)
370            .await?
371            .perform(|tx| {
372                Box::pin(async move {
373                    if let Some(entry) = account::Entity::find().filter(cpk).one(tx.as_ref()).await? {
374                        let account_entry = model_to_account_entry(entry.clone(), vec![])?;
375                        entry.delete(tx.as_ref()).await?;
376
377                        myself
378                            .caches
379                            .chain_to_offchain
380                            .invalidate(&account_entry.chain_addr)
381                            .await;
382                        myself
383                            .caches
384                            .offchain_to_chain
385                            .invalidate(&account_entry.public_key)
386                            .await;
387                        Ok::<_, DbSqlError>(())
388                    } else {
389                        Err(MissingAccount)
390                    }
391                })
392            })
393            .await
394    }
395
396    #[instrument(level = "trace", skip_all, err)]
397    async fn translate_key<'a, T: Into<ChainOrPacketKey> + Send + Sync>(
398        &'a self,
399        tx: OptTx<'a>,
400        key: T,
401    ) -> Result<Option<ChainOrPacketKey>> {
402        Ok(match key.into() {
403            ChainOrPacketKey::ChainKey(chain_key) => self
404                .caches
405                .chain_to_offchain
406                .try_get_with_by_ref(
407                    &chain_key,
408                    self.nest_transaction(tx).and_then(|op| {
409                        tracing::warn!(?chain_key, "cache miss on chain key lookup");
410                        op.perform(|tx| {
411                            Box::pin(async move {
412                                let maybe_model = Account::find()
413                                    .filter(account::Column::ChainKey.eq(chain_key.to_string()))
414                                    .one(tx.as_ref())
415                                    .await?;
416                                if let Some(m) = maybe_model {
417                                    Ok(Some(OffchainPublicKey::from_hex(&m.packet_key)?))
418                                } else {
419                                    Ok(None)
420                                }
421                            })
422                        })
423                    }),
424                )
425                .await?
426                .map(ChainOrPacketKey::PacketKey),
427            ChainOrPacketKey::PacketKey(packet_key) => self
428                .caches
429                .offchain_to_chain
430                .try_get_with_by_ref(
431                    &packet_key,
432                    self.nest_transaction(tx).and_then(|op| {
433                        tracing::warn!(?packet_key, "cache miss on packet key lookup");
434                        op.perform(|tx| {
435                            Box::pin(async move {
436                                let maybe_model = Account::find()
437                                    .filter(account::Column::PacketKey.eq(packet_key.to_string()))
438                                    .one(tx.as_ref())
439                                    .await?;
440                                if let Some(m) = maybe_model {
441                                    Ok(Some(Address::from_hex(&m.chain_key)?))
442                                } else {
443                                    Ok(None)
444                                }
445                            })
446                        })
447                    }),
448                )
449                .await?
450                .map(ChainOrPacketKey::ChainKey),
451        })
452    }
453}
454
455impl HoprIndexerDb {
456    pub async fn resolve_packet_key(&self, onchain_key: &Address) -> Result<Option<OffchainPublicKey>> {
457        self.translate_key(None, *onchain_key)
458            .await?
459            .map(|k| k.try_into())
460            .inspect(|v: &std::result::Result<OffchainPublicKey, _>| {
461                if let Ok(offchain_pk) = v {
462                    tracing::trace!(
463                        peer = %Into::<PeerId>::into(offchain_pk),
464                        offchain_pk = offchain_pk.to_hex(),
465                        "found offchain key",
466                    );
467                }
468            })
469            .transpose()
470            .map_err(|_e| DbSqlError::LogicalError("failed to transpose the translated key".into()))
471    }
472
473    pub async fn resolve_chain_key(&self, offchain_key: &OffchainPublicKey) -> Result<Option<Address>> {
474        self.translate_key(None, *offchain_key)
475            .await?
476            .map(|k| k.try_into())
477            .transpose()
478            .map_err(|_e| DbSqlError::LogicalError("failed to transpose the translated key".into()))
479    }
480}
481
482#[cfg(test)]
483mod tests {
484    use anyhow::Context;
485    use hopr_crypto_types::prelude::{ChainKeypair, Keypair, OffchainKeypair};
486    use hopr_internal_types::prelude::AccountType::NotAnnounced;
487
488    use super::*;
489    use crate::{
490        HoprDbGeneralModelOperations,
491        errors::{DbSqlError, DbSqlError::DecodingError},
492    };
493
494    #[tokio::test]
495    async fn test_insert_account_announcement() -> anyhow::Result<()> {
496        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
497
498        let chain_1 = ChainKeypair::random().public().to_address();
499        let packet_1 = *OffchainKeypair::random().public();
500
501        db.insert_account(
502            None,
503            AccountEntry {
504                public_key: packet_1,
505                chain_addr: chain_1,
506                published_at: 1,
507                entry_type: AccountType::NotAnnounced,
508            },
509        )
510        .await?;
511
512        let acc = db.get_account(None, chain_1).await?.expect("should contain account");
513        assert_eq!(packet_1, acc.public_key, "pub keys must match");
514        assert_eq!(AccountType::NotAnnounced, acc.entry_type.clone());
515        assert_eq!(1, acc.published_at);
516
517        let maddr: Multiaddr = "/ip4/1.2.3.4/tcp/8000".parse()?;
518        let block = 100;
519
520        let db_acc = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
521
522        let acc = db.get_account(None, chain_1).await?.context("should contain account")?;
523        assert_eq!(Some(maddr.clone()), acc.get_multiaddr(), "multiaddress must match");
524        assert_eq!(Some(block), acc.updated_at());
525        assert_eq!(acc, db_acc);
526
527        let block = 200;
528        let db_acc = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
529
530        let acc = db.get_account(None, chain_1).await?.expect("should contain account");
531        assert_eq!(Some(maddr), acc.get_multiaddr(), "multiaddress must match");
532        assert_eq!(Some(block), acc.updated_at());
533        assert_eq!(acc, db_acc);
534
535        let maddr: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
536        let block = 300;
537        let db_acc = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
538
539        let acc = db.get_account(None, chain_1).await?.expect("should contain account");
540        assert_eq!(Some(maddr), acc.get_multiaddr(), "multiaddress must match");
541        assert_eq!(Some(block), acc.updated_at());
542        assert_eq!(acc, db_acc);
543
544        Ok(())
545    }
546
547    #[tokio::test]
548    async fn test_should_allow_reannouncement() -> anyhow::Result<()> {
549        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
550
551        let chain_1 = ChainKeypair::random().public().to_address();
552        let packet_1 = *OffchainKeypair::random().public();
553
554        db.insert_account(
555            None,
556            AccountEntry {
557                public_key: packet_1,
558                chain_addr: chain_1,
559                published_at: 1,
560                entry_type: AccountType::NotAnnounced,
561            },
562        )
563        .await?;
564
565        db.insert_announcement(None, chain_1, "/ip4/1.2.3.4/tcp/8000".parse()?, 100)
566            .await?;
567
568        let ae = db.get_account(None, chain_1).await?.ok_or(MissingAccount)?;
569
570        assert_eq!(
571            "/ip4/1.2.3.4/tcp/8000",
572            ae.get_multiaddr().expect("has multiaddress").to_string()
573        );
574
575        db.insert_announcement(None, chain_1, "/ip4/1.2.3.4/tcp/8001".parse()?, 110)
576            .await?;
577
578        let ae = db.get_account(None, chain_1).await?.ok_or(MissingAccount)?;
579
580        assert_eq!(
581            "/ip4/1.2.3.4/tcp/8001",
582            ae.get_multiaddr().expect("has multiaddress").to_string()
583        );
584
585        Ok(())
586    }
587
588    #[tokio::test]
589    async fn test_should_not_insert_account_announcement_to_nonexisting_account() -> anyhow::Result<()> {
590        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
591
592        let chain_1 = ChainKeypair::random().public().to_address();
593
594        let maddr: Multiaddr = "/ip4/1.2.3.4/tcp/8000".parse()?;
595        let block = 100;
596
597        let r = db.insert_announcement(None, chain_1, maddr.clone(), block).await;
598        assert!(
599            matches!(r, Err(MissingAccount)),
600            "should not insert announcement to non-existing account"
601        );
602
603        Ok(())
604    }
605
606    #[tokio::test]
607    async fn test_should_allow_duplicate_announcement_per_different_accounts() -> anyhow::Result<()> {
608        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
609
610        let chain_1 = ChainKeypair::random().public().to_address();
611        let packet_1 = *OffchainKeypair::random().public();
612
613        db.insert_account(
614            None,
615            AccountEntry {
616                public_key: packet_1,
617                chain_addr: chain_1,
618                published_at: 1,
619                entry_type: AccountType::NotAnnounced,
620            },
621        )
622        .await?;
623
624        let chain_2 = ChainKeypair::random().public().to_address();
625        let packet_2 = *OffchainKeypair::random().public();
626
627        db.insert_account(
628            None,
629            AccountEntry {
630                public_key: packet_2,
631                chain_addr: chain_2,
632                published_at: 2,
633                entry_type: AccountType::NotAnnounced,
634            },
635        )
636        .await?;
637
638        let maddr: Multiaddr = "/ip4/1.2.3.4/tcp/8000".parse()?;
639        let block = 100;
640
641        let db_acc_1 = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
642        let db_acc_2 = db.insert_announcement(None, chain_2, maddr.clone(), block).await?;
643
644        let acc = db.get_account(None, chain_1).await?.expect("should contain account");
645        assert_eq!(Some(maddr.clone()), acc.get_multiaddr(), "multiaddress must match");
646        assert_eq!(Some(block), acc.updated_at());
647        assert_eq!(acc, db_acc_1);
648
649        let acc = db.get_account(None, chain_2).await?.expect("should contain account");
650        assert_eq!(Some(maddr.clone()), acc.get_multiaddr(), "multiaddress must match");
651        assert_eq!(Some(block), acc.updated_at());
652        assert_eq!(acc, db_acc_2);
653
654        Ok(())
655    }
656
657    #[tokio::test]
658    async fn test_delete_account() -> anyhow::Result<()> {
659        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
660
661        let packet_1 = *OffchainKeypair::random().public();
662        let chain_1 = ChainKeypair::random().public().to_address();
663        db.insert_account(
664            None,
665            AccountEntry {
666                public_key: packet_1,
667                chain_addr: chain_1,
668                published_at: 1,
669                entry_type: AccountType::Announced {
670                    multiaddr: "/ip4/1.2.3.4/tcp/1234".parse()?,
671                    updated_block: 10,
672                },
673            },
674        )
675        .await?;
676
677        assert!(db.get_account(None, chain_1).await?.is_some());
678
679        db.delete_account(None, chain_1).await?;
680
681        assert!(db.get_account(None, chain_1).await?.is_none());
682
683        Ok(())
684    }
685
686    #[tokio::test]
687    async fn test_should_fail_to_delete_nonexistent_account() -> anyhow::Result<()> {
688        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
689
690        let chain_1 = ChainKeypair::random().public().to_address();
691
692        let r = db.delete_account(None, chain_1).await;
693        assert!(
694            matches!(r, Err(MissingAccount)),
695            "should not delete non-existing account"
696        );
697
698        Ok(())
699    }
700
701    #[tokio::test]
702    async fn test_should_not_fail_on_duplicate_account_insert() -> anyhow::Result<()> {
703        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
704
705        let chain_1 = ChainKeypair::random().public().to_address();
706        let packet_1 = *OffchainKeypair::random().public();
707
708        db.insert_account(
709            None,
710            AccountEntry {
711                public_key: packet_1,
712                chain_addr: chain_1,
713                published_at: 1,
714                entry_type: AccountType::NotAnnounced,
715            },
716        )
717        .await?;
718
719        db.insert_account(
720            None,
721            AccountEntry {
722                public_key: packet_1,
723                chain_addr: chain_1,
724                published_at: 1,
725                entry_type: AccountType::NotAnnounced,
726            },
727        )
728        .await?;
729
730        Ok(())
731    }
732
733    #[tokio::test]
734    async fn test_delete_announcements() -> anyhow::Result<()> {
735        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
736
737        let packet_1 = *OffchainKeypair::random().public();
738        let chain_1 = ChainKeypair::random().public().to_address();
739        let mut entry = AccountEntry {
740            public_key: packet_1,
741            chain_addr: chain_1,
742            published_at: 1,
743            entry_type: AccountType::Announced {
744                multiaddr: "/ip4/1.2.3.4/tcp/1234".parse()?,
745                updated_block: 10,
746            },
747        };
748
749        db.insert_account(None, entry.clone()).await?;
750
751        assert_eq!(Some(entry.clone()), db.get_account(None, chain_1).await?);
752
753        db.delete_all_announcements(None, chain_1).await?;
754
755        entry.entry_type = NotAnnounced;
756
757        assert_eq!(Some(entry), db.get_account(None, chain_1).await?);
758
759        Ok(())
760    }
761
762    #[tokio::test]
763    async fn test_should_fail_to_delete_nonexistent_account_announcements() -> anyhow::Result<()> {
764        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
765
766        let chain_1 = ChainKeypair::random().public().to_address();
767
768        let r = db.delete_all_announcements(None, chain_1).await;
769        assert!(
770            matches!(r, Err(MissingAccount)),
771            "should not delete non-existing account"
772        );
773
774        Ok(())
775    }
776
777    #[tokio::test]
778    async fn test_translate_key() -> anyhow::Result<()> {
779        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
780
781        let chain_1 = ChainKeypair::random().public().to_address();
782        let packet_1 = *OffchainKeypair::random().public();
783
784        let chain_2 = ChainKeypair::random().public().to_address();
785        let packet_2 = *OffchainKeypair::random().public();
786
787        let db_clone = db.clone();
788        db.begin_transaction()
789            .await?
790            .perform(|tx| {
791                Box::pin(async move {
792                    db_clone
793                        .insert_account(
794                            tx.into(),
795                            AccountEntry {
796                                public_key: packet_1,
797                                chain_addr: chain_1,
798                                published_at: 1,
799                                entry_type: AccountType::NotAnnounced,
800                            },
801                        )
802                        .await?;
803                    db_clone
804                        .insert_account(
805                            tx.into(),
806                            AccountEntry {
807                                public_key: packet_2,
808                                chain_addr: chain_2,
809                                published_at: 2,
810                                entry_type: AccountType::NotAnnounced,
811                            },
812                        )
813                        .await?;
814                    Ok::<(), DbSqlError>(())
815                })
816            })
817            .await?;
818
819        let a: Address = db
820            .translate_key(None, packet_1)
821            .await?
822            .context("must contain key")?
823            .try_into()?;
824
825        let b: OffchainPublicKey = db
826            .translate_key(None, chain_2)
827            .await?
828            .context("must contain key")?
829            .try_into()?;
830
831        assert_eq!(chain_1, a, "chain keys must match");
832        assert_eq!(packet_2, b, "chain keys must match");
833
834        Ok(())
835    }
836
837    #[tokio::test]
838    async fn test_translate_key_no_cache() -> anyhow::Result<()> {
839        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
840
841        let chain_1 = ChainKeypair::random().public().to_address();
842        let packet_1 = *OffchainKeypair::random().public();
843
844        let chain_2 = ChainKeypair::random().public().to_address();
845        let packet_2 = *OffchainKeypair::random().public();
846
847        let db_clone = db.clone();
848        db.begin_transaction()
849            .await?
850            .perform(|tx| {
851                Box::pin(async move {
852                    db_clone
853                        .insert_account(
854                            tx.into(),
855                            AccountEntry {
856                                public_key: packet_1,
857                                chain_addr: chain_1,
858                                published_at: 1,
859                                entry_type: AccountType::NotAnnounced,
860                            },
861                        )
862                        .await?;
863                    db_clone
864                        .insert_account(
865                            tx.into(),
866                            AccountEntry {
867                                public_key: packet_2,
868                                chain_addr: chain_2,
869                                published_at: 2,
870                                entry_type: AccountType::NotAnnounced,
871                            },
872                        )
873                        .await?;
874                    Ok::<(), DbSqlError>(())
875                })
876            })
877            .await?;
878
879        db.caches.invalidate_all();
880
881        let a: Address = db
882            .translate_key(None, packet_1)
883            .await?
884            .context("must contain key")?
885            .try_into()?;
886
887        let b: OffchainPublicKey = db
888            .translate_key(None, chain_2)
889            .await?
890            .context("must contain key")?
891            .try_into()?;
892
893        assert_eq!(chain_1, a, "chain keys must match");
894        assert_eq!(packet_2, b, "chain keys must match");
895
896        Ok(())
897    }
898
899    #[tokio::test]
900    async fn test_stream_accounts() -> anyhow::Result<()> {
901        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
902
903        let chain_1 = ChainKeypair::random().public().to_address();
904        let chain_2 = ChainKeypair::random().public().to_address();
905        let chain_3 = ChainKeypair::random().public().to_address();
906
907        let db_clone = db.clone();
908        db.begin_transaction()
909            .await?
910            .perform(|tx| {
911                Box::pin(async move {
912                    db_clone
913                        .insert_account(
914                            Some(tx),
915                            AccountEntry {
916                                public_key: *OffchainKeypair::random().public(),
917                                chain_addr: chain_1,
918                                entry_type: AccountType::NotAnnounced,
919                                published_at: 1,
920                            },
921                        )
922                        .await?;
923                    db_clone
924                        .insert_account(
925                            Some(tx),
926                            AccountEntry {
927                                public_key: *OffchainKeypair::random().public(),
928                                chain_addr: chain_2,
929                                entry_type: AccountType::Announced {
930                                    multiaddr: "/ip4/10.10.10.10/tcp/1234".parse().map_err(|_| DecodingError)?,
931                                    updated_block: 10,
932                                },
933                                published_at: 2,
934                            },
935                        )
936                        .await?;
937                    db_clone
938                        .insert_account(
939                            Some(tx),
940                            AccountEntry {
941                                public_key: *OffchainKeypair::random().public(),
942                                chain_addr: chain_3,
943                                entry_type: AccountType::NotAnnounced,
944                                published_at: 3,
945                            },
946                        )
947                        .await?;
948
949                    db_clone
950                        .insert_announcement(
951                            Some(tx),
952                            chain_3,
953                            "/ip4/1.2.3.4/tcp/1234".parse().map_err(|_| DecodingError)?,
954                            12,
955                        )
956                        .await?;
957                    db_clone
958                        .insert_announcement(
959                            Some(tx),
960                            chain_3,
961                            "/ip4/8.8.1.1/tcp/1234".parse().map_err(|_| DecodingError)?,
962                            15,
963                        )
964                        .await?;
965                    db_clone
966                        .insert_announcement(
967                            Some(tx),
968                            chain_3,
969                            "/ip4/1.2.3.0/tcp/234".parse().map_err(|_| DecodingError)?,
970                            14,
971                        )
972                        .await
973                })
974            })
975            .await?;
976
977        let all_accounts = db.stream_accounts(false).await?.collect::<Vec<_>>().await;
978        let public_only = db.stream_accounts(true).await?.collect::<Vec<_>>().await;
979
980        assert_eq!(3, all_accounts.len());
981
982        assert_eq!(2, public_only.len());
983        let acc_1 = public_only
984            .iter()
985            .find(|a| a.chain_addr.eq(&chain_2))
986            .expect("should contain 1");
987
988        let acc_2 = public_only
989            .iter()
990            .find(|a| a.chain_addr.eq(&chain_3))
991            .expect("should contain 2");
992
993        assert_eq!(
994            "/ip4/10.10.10.10/tcp/1234",
995            acc_1.get_multiaddr().expect("should have a multiaddress").to_string()
996        );
997        assert_eq!(
998            "/ip4/8.8.1.1/tcp/1234",
999            acc_2.get_multiaddr().expect("should have a multiaddress").to_string()
1000        );
1001
1002        Ok(())
1003    }
1004
1005    #[tokio::test]
1006    async fn test_get_offchain_key_should_return_nothing_if_a_mapping_to_chain_key_does_not_exist() -> anyhow::Result<()>
1007    {
1008        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1009
1010        let chain = ChainKeypair::random().public().to_address();
1011
1012        let actual_pk = db.resolve_packet_key(&chain).await?;
1013        assert_eq!(actual_pk, None, "offchain key should not be present");
1014        Ok(())
1015    }
1016
1017    #[tokio::test]
1018    async fn test_get_chain_key_should_return_nothing_if_a_mapping_to_offchain_key_does_not_exist() -> anyhow::Result<()>
1019    {
1020        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1021
1022        let packet = *OffchainKeypair::random().public();
1023
1024        let actual_ck = db.resolve_chain_key(&packet).await?;
1025        assert_eq!(actual_ck, None, "chain key should not be present");
1026        Ok(())
1027    }
1028
1029    #[tokio::test]
1030    async fn test_get_chain_key_should_succeed_if_a_mapping_to_offchain_key_exists() -> anyhow::Result<()> {
1031        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1032
1033        // Inserting to the table directly to avoid cache
1034
1035        let chain_1 = ChainKeypair::random().public().to_address();
1036        let packet_1 = *OffchainKeypair::random().public();
1037        let account_1 = hopr_db_entity::account::ActiveModel {
1038            chain_key: Set(chain_1.to_hex()),
1039            packet_key: Set(packet_1.to_hex()),
1040            ..Default::default()
1041        };
1042
1043        let chain_2 = ChainKeypair::random().public().to_address();
1044        let packet_2 = *OffchainKeypair::random().public();
1045        let account_2 = hopr_db_entity::account::ActiveModel {
1046            chain_key: Set(chain_2.to_hex()),
1047            packet_key: Set(packet_2.to_hex()),
1048            ..Default::default()
1049        };
1050
1051        hopr_db_entity::account::Entity::insert_many([account_1, account_2])
1052            .exec(db.index_db.read_write())
1053            .await?;
1054
1055        let actual_ck = db.resolve_chain_key(&packet_1).await?;
1056        assert_eq!(actual_ck, Some(chain_1), "chain keys must match");
1057        Ok(())
1058    }
1059
1060    #[tokio::test]
1061    async fn test_get_chain_key_should_succeed_if_a_mapping_to_offchain_key_exists_with_cache() -> anyhow::Result<()> {
1062        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1063
1064        // Inserting to the table via API to insert into cache as well
1065
1066        let chain_1 = ChainKeypair::random().public().to_address();
1067        let packet_1 = *OffchainKeypair::random().public();
1068        db.insert_account(
1069            None,
1070            AccountEntry {
1071                public_key: packet_1,
1072                chain_addr: chain_1,
1073                entry_type: AccountType::NotAnnounced,
1074                published_at: 1,
1075            },
1076        )
1077        .await?;
1078
1079        let chain_2 = ChainKeypair::random().public().to_address();
1080        let packet_2 = *OffchainKeypair::random().public();
1081        db.insert_account(
1082            None,
1083            AccountEntry {
1084                public_key: packet_2,
1085                chain_addr: chain_2,
1086                entry_type: AccountType::NotAnnounced,
1087                published_at: 1,
1088            },
1089        )
1090        .await?;
1091
1092        let actual_ck = db.resolve_chain_key(&packet_1).await?;
1093        assert_eq!(actual_ck, Some(chain_1), "chain keys must match");
1094        Ok(())
1095    }
1096
1097    #[tokio::test]
1098    async fn test_get_offchain_key_should_succeed_if_a_mapping_to_chain_key_exists() -> anyhow::Result<()> {
1099        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1100
1101        // Inserting to the table directly to avoid cache
1102
1103        let chain_1 = ChainKeypair::random().public().to_address();
1104        let packet_1 = *OffchainKeypair::random().public();
1105        let account_1 = hopr_db_entity::account::ActiveModel {
1106            chain_key: Set(chain_1.to_hex()),
1107            packet_key: Set(packet_1.to_hex()),
1108            ..Default::default()
1109        };
1110
1111        let chain_2 = ChainKeypair::random().public().to_address();
1112        let packet_2 = *OffchainKeypair::random().public();
1113        let account_2 = hopr_db_entity::account::ActiveModel {
1114            chain_key: Set(chain_2.to_hex()),
1115            packet_key: Set(packet_2.to_hex()),
1116            ..Default::default()
1117        };
1118
1119        hopr_db_entity::account::Entity::insert_many([account_1, account_2])
1120            .exec(db.index_db.read_write())
1121            .await?;
1122
1123        let actual_pk = db.resolve_packet_key(&chain_2).await?;
1124
1125        assert_eq!(actual_pk, Some(packet_2), "packet keys must match");
1126        Ok(())
1127    }
1128
1129    #[tokio::test]
1130    async fn test_get_offchain_key_should_succeed_if_a_mapping_to_chain_key_exists_with_cache() -> anyhow::Result<()> {
1131        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1132
1133        // Inserting to the table via API to insert into cache as well
1134
1135        let chain_1 = ChainKeypair::random().public().to_address();
1136        let packet_1 = *OffchainKeypair::random().public();
1137        db.insert_account(
1138            None,
1139            AccountEntry {
1140                public_key: packet_1,
1141                chain_addr: chain_1,
1142                entry_type: AccountType::NotAnnounced,
1143                published_at: 1,
1144            },
1145        )
1146        .await?;
1147
1148        let chain_2 = ChainKeypair::random().public().to_address();
1149        let packet_2 = *OffchainKeypair::random().public();
1150        db.insert_account(
1151            None,
1152            AccountEntry {
1153                public_key: packet_2,
1154                chain_addr: chain_2,
1155                entry_type: AccountType::NotAnnounced,
1156                published_at: 1,
1157            },
1158        )
1159        .await?;
1160
1161        let actual_pk = db.resolve_packet_key(&chain_2).await?;
1162
1163        assert_eq!(actual_pk, Some(packet_2), "packet keys must match");
1164        Ok(())
1165    }
1166}