1use async_trait::async_trait;
2use futures::{StreamExt, TryFutureExt, stream::BoxStream};
3use hopr_crypto_types::prelude::OffchainPublicKey;
4use hopr_db_entity::{
5 account, announcement,
6 prelude::{Account, Announcement},
7};
8use hopr_internal_types::{account::AccountType, prelude::AccountEntry};
9use hopr_primitive_types::{
10 errors::GeneralError,
11 prelude::{Address, ToHex},
12};
13use multiaddr::{Multiaddr, PeerId};
14use sea_orm::{
15 ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, ModelTrait, QueryFilter, QueryOrder, Related,
16 Set, sea_query::Expr,
17};
18use sea_query::{Condition, IntoCondition, OnConflict};
19use tracing::instrument;
20
21use crate::{
22 HoprDbGeneralModelOperations, HoprIndexerDb, OptTx,
23 errors::{DbSqlError, DbSqlError::MissingAccount, Result},
24};
25
26#[allow(clippy::large_enum_variant)] #[derive(Copy, Clone, Debug, PartialEq, Eq)]
29pub enum ChainOrPacketKey {
30 ChainKey(Address),
32 PacketKey(OffchainPublicKey),
34}
35
36impl From<Address> for ChainOrPacketKey {
37 fn from(value: Address) -> Self {
38 Self::ChainKey(value)
39 }
40}
41
42impl From<OffchainPublicKey> for ChainOrPacketKey {
43 fn from(value: OffchainPublicKey) -> Self {
44 Self::PacketKey(value)
45 }
46}
47
48impl TryFrom<ChainOrPacketKey> for OffchainPublicKey {
49 type Error = GeneralError;
50
51 fn try_from(value: ChainOrPacketKey) -> std::result::Result<Self, Self::Error> {
52 match value {
53 ChainOrPacketKey::ChainKey(_) => Err(GeneralError::InvalidInput),
54 ChainOrPacketKey::PacketKey(k) => Ok(k),
55 }
56 }
57}
58
59impl TryFrom<ChainOrPacketKey> for Address {
60 type Error = GeneralError;
61
62 fn try_from(value: ChainOrPacketKey) -> std::result::Result<Self, Self::Error> {
63 match value {
64 ChainOrPacketKey::ChainKey(k) => Ok(k),
65 ChainOrPacketKey::PacketKey(_) => Err(GeneralError::InvalidInput),
66 }
67 }
68}
69
70impl IntoCondition for ChainOrPacketKey {
71 fn into_condition(self) -> Condition {
72 match self {
73 ChainOrPacketKey::ChainKey(chain_key) => {
74 account::Column::ChainKey.eq(chain_key.to_string()).into_condition()
75 }
76 ChainOrPacketKey::PacketKey(packet_key) => {
77 account::Column::PacketKey.eq(packet_key.to_string()).into_condition()
78 }
79 }
80 }
81}
82
83#[async_trait]
88pub trait HoprDbAccountOperations {
89 async fn get_account<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<Option<AccountEntry>>
91 where
92 T: Into<ChainOrPacketKey> + Send + Sync;
93
94 async fn get_self_account<'a>(&'a self, tx: OptTx<'a>) -> Result<AccountEntry>;
97
98 async fn stream_accounts<'a>(&'a self, public_only: bool) -> Result<BoxStream<'a, AccountEntry>>;
99
100 async fn insert_account<'a>(&'a self, tx: OptTx<'a>, account: AccountEntry) -> Result<()>;
103
104 async fn insert_announcement<'a, T>(
111 &'a self,
112 tx: OptTx<'a>,
113 key: T,
114 multiaddr: Multiaddr,
115 at_block: u32,
116 ) -> Result<AccountEntry>
117 where
118 T: Into<ChainOrPacketKey> + Send + Sync;
119
120 async fn delete_all_announcements<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
122 where
123 T: Into<ChainOrPacketKey> + Send + Sync;
124
125 async fn delete_account<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
127 where
128 T: Into<ChainOrPacketKey> + Send + Sync;
129
130 async fn translate_key<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<Option<ChainOrPacketKey>>
135 where
136 T: Into<ChainOrPacketKey> + Send + Sync;
137}
138
139pub(crate) fn model_to_account_entry(
141 account: account::Model,
142 announcements: Vec<announcement::Model>,
143) -> Result<AccountEntry> {
144 let announcement = announcements.first();
146
147 Ok(AccountEntry {
148 public_key: OffchainPublicKey::from_hex(&account.packet_key)?,
149 chain_addr: account.chain_key.parse()?,
150 published_at: account.published_at as u32,
151 entry_type: match announcement {
152 None => AccountType::NotAnnounced,
153 Some(a) => AccountType::Announced {
154 multiaddr: a.multiaddress.parse().map_err(|_| DbSqlError::DecodingError)?,
155 updated_block: a.at_block as u32,
156 },
157 },
158 })
159}
160
161#[async_trait]
162impl HoprDbAccountOperations for HoprIndexerDb {
163 async fn get_account<'a, T: Into<ChainOrPacketKey> + Send + Sync>(
164 &'a self,
165 tx: OptTx<'a>,
166 key: T,
167 ) -> Result<Option<AccountEntry>> {
168 let cpk = key.into();
169 self.nest_transaction(tx)
170 .await?
171 .perform(|tx| {
172 Box::pin(async move {
173 let maybe_model = Account::find()
174 .find_with_related(Announcement)
175 .filter(cpk)
176 .order_by_desc(announcement::Column::AtBlock)
177 .all(tx.as_ref())
178 .await?
179 .pop();
180
181 Ok::<_, DbSqlError>(if let Some((account, announcements)) = maybe_model {
182 Some(model_to_account_entry(account, announcements)?)
183 } else {
184 None
185 })
186 })
187 })
188 .await
189 }
190
191 async fn get_self_account<'a>(&'a self, tx: OptTx<'a>) -> Result<AccountEntry> {
192 self.get_account(tx, self.me_onchain)
193 .await?
194 .ok_or(DbSqlError::MissingAccount)
195 }
196
197 async fn stream_accounts<'a>(&'a self, public_only: bool) -> Result<BoxStream<'a, AccountEntry>> {
198 let accounts = Account::find()
200 .find_with_related(Announcement)
201 .filter(if public_only {
202 announcement::Column::Multiaddress.ne("")
203 } else {
204 Expr::value(1)
205 })
206 .order_by_desc(announcement::Column::AtBlock)
207 .all(self.index_db.read_only())
208 .await?
209 .into_iter()
210 .map(|(a, b)| model_to_account_entry(a, b))
211 .collect::<Result<Vec<_>>>()?;
212
213 Ok(futures::stream::iter(accounts).boxed())
214 }
215
216 async fn insert_account<'a>(&'a self, tx: OptTx<'a>, account: AccountEntry) -> Result<()> {
217 let myself = self.clone();
218 self.nest_transaction(tx)
219 .await?
220 .perform(|tx| {
221 Box::pin(async move {
222 match account::Entity::insert(account::ActiveModel {
223 chain_key: Set(account.chain_addr.to_hex()),
224 packet_key: Set(account.public_key.to_hex()),
225 published_at: Set(account.published_at as i32),
226 ..Default::default()
227 })
228 .on_conflict(
229 OnConflict::columns([account::Column::ChainKey, account::Column::PacketKey])
230 .do_nothing()
231 .to_owned(),
232 )
233 .exec(tx.as_ref())
234 .await
235 {
236 res @ Ok(_) | res @ Err(DbErr::RecordNotInserted) => {
238 myself
239 .caches
240 .chain_to_offchain
241 .insert(account.chain_addr, Some(account.public_key))
242 .await;
243 myself
244 .caches
245 .offchain_to_chain
246 .insert(account.public_key, Some(account.chain_addr))
247 .await;
248
249 if res.is_ok() {
252 if let Err(error) = myself.caches.key_id_mapper.update_key_id_binding(&account) {
253 tracing::warn!(?account, %error, "keybinding not updated")
254 }
255 }
256
257 if let AccountType::Announced {
258 multiaddr,
259 updated_block,
260 } = account.entry_type
261 {
262 myself
263 .insert_announcement(Some(tx), account.chain_addr, multiaddr, updated_block)
264 .await?;
265 }
266
267 Ok::<(), DbSqlError>(())
268 }
269 Err(e) => Err(e.into()),
270 }
271 })
272 })
273 .await
274 }
275
276 async fn insert_announcement<'a, T>(
277 &'a self,
278 tx: OptTx<'a>,
279 key: T,
280 multiaddr: Multiaddr,
281 at_block: u32,
282 ) -> Result<AccountEntry>
283 where
284 T: Into<ChainOrPacketKey> + Send + Sync,
285 {
286 let cpk = key.into();
287 self.nest_transaction(tx)
288 .await?
289 .perform(|tx| {
290 Box::pin(async move {
291 let (existing_account, mut existing_announcements) = account::Entity::find()
292 .find_with_related(announcement::Entity)
293 .filter(cpk)
294 .order_by_desc(announcement::Column::AtBlock)
295 .all(tx.as_ref())
296 .await?
297 .pop()
298 .ok_or(MissingAccount)?;
299
300 if let Some((index, _)) = existing_announcements
301 .iter()
302 .enumerate()
303 .find(|(_, announcement)| announcement.multiaddress == multiaddr.to_string())
304 {
305 let mut existing_announcement = existing_announcements.remove(index).into_active_model();
306 existing_announcement.at_block = Set(at_block as i32);
307 let updated_announcement = existing_announcement.update(tx.as_ref()).await?;
308
309 existing_announcements.insert(index, updated_announcement);
311 } else {
312 let new_announcement = announcement::ActiveModel {
313 account_id: Set(existing_account.id),
314 multiaddress: Set(multiaddr.to_string()),
315 at_block: Set(at_block as i32),
316 ..Default::default()
317 }
318 .insert(tx.as_ref())
319 .await?;
320
321 existing_announcements.insert(0, new_announcement);
323 }
324
325 model_to_account_entry(existing_account, existing_announcements)
326 })
327 })
328 .await
329 }
330
331 async fn delete_all_announcements<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
332 where
333 T: Into<ChainOrPacketKey> + Send + Sync,
334 {
335 let cpk = key.into();
336 self.nest_transaction(tx)
337 .await?
338 .perform(|tx| {
339 Box::pin(async move {
340 let to_delete = account::Entity::find_related()
341 .filter(cpk)
342 .all(tx.as_ref())
343 .await?
344 .into_iter()
345 .map(|x| x.id)
346 .collect::<Vec<_>>();
347
348 if !to_delete.is_empty() {
349 announcement::Entity::delete_many()
350 .filter(announcement::Column::Id.is_in(to_delete))
351 .exec(tx.as_ref())
352 .await?;
353
354 Ok::<_, DbSqlError>(())
355 } else {
356 Err(MissingAccount)
357 }
358 })
359 })
360 .await
361 }
362
363 async fn delete_account<'a, T>(&'a self, tx: OptTx<'a>, key: T) -> Result<()>
364 where
365 T: Into<ChainOrPacketKey> + Send + Sync,
366 {
367 let myself = self.clone();
368 let cpk = key.into();
369 self.nest_transaction(tx)
370 .await?
371 .perform(|tx| {
372 Box::pin(async move {
373 if let Some(entry) = account::Entity::find().filter(cpk).one(tx.as_ref()).await? {
374 let account_entry = model_to_account_entry(entry.clone(), vec![])?;
375 entry.delete(tx.as_ref()).await?;
376
377 myself
378 .caches
379 .chain_to_offchain
380 .invalidate(&account_entry.chain_addr)
381 .await;
382 myself
383 .caches
384 .offchain_to_chain
385 .invalidate(&account_entry.public_key)
386 .await;
387 Ok::<_, DbSqlError>(())
388 } else {
389 Err(MissingAccount)
390 }
391 })
392 })
393 .await
394 }
395
396 #[instrument(level = "trace", skip_all, err)]
397 async fn translate_key<'a, T: Into<ChainOrPacketKey> + Send + Sync>(
398 &'a self,
399 tx: OptTx<'a>,
400 key: T,
401 ) -> Result<Option<ChainOrPacketKey>> {
402 Ok(match key.into() {
403 ChainOrPacketKey::ChainKey(chain_key) => self
404 .caches
405 .chain_to_offchain
406 .try_get_with_by_ref(
407 &chain_key,
408 self.nest_transaction(tx).and_then(|op| {
409 tracing::warn!(?chain_key, "cache miss on chain key lookup");
410 op.perform(|tx| {
411 Box::pin(async move {
412 let maybe_model = Account::find()
413 .filter(account::Column::ChainKey.eq(chain_key.to_string()))
414 .one(tx.as_ref())
415 .await?;
416 if let Some(m) = maybe_model {
417 Ok(Some(OffchainPublicKey::from_hex(&m.packet_key)?))
418 } else {
419 Ok(None)
420 }
421 })
422 })
423 }),
424 )
425 .await?
426 .map(ChainOrPacketKey::PacketKey),
427 ChainOrPacketKey::PacketKey(packet_key) => self
428 .caches
429 .offchain_to_chain
430 .try_get_with_by_ref(
431 &packet_key,
432 self.nest_transaction(tx).and_then(|op| {
433 tracing::warn!(?packet_key, "cache miss on packet key lookup");
434 op.perform(|tx| {
435 Box::pin(async move {
436 let maybe_model = Account::find()
437 .filter(account::Column::PacketKey.eq(packet_key.to_string()))
438 .one(tx.as_ref())
439 .await?;
440 if let Some(m) = maybe_model {
441 Ok(Some(Address::from_hex(&m.chain_key)?))
442 } else {
443 Ok(None)
444 }
445 })
446 })
447 }),
448 )
449 .await?
450 .map(ChainOrPacketKey::ChainKey),
451 })
452 }
453}
454
455impl HoprIndexerDb {
456 pub async fn resolve_packet_key(&self, onchain_key: &Address) -> Result<Option<OffchainPublicKey>> {
457 self.translate_key(None, *onchain_key)
458 .await?
459 .map(|k| k.try_into())
460 .inspect(|v: &std::result::Result<OffchainPublicKey, _>| {
461 if let Ok(offchain_pk) = v {
462 tracing::trace!(
463 peer = %Into::<PeerId>::into(offchain_pk),
464 offchain_pk = offchain_pk.to_hex(),
465 "found offchain key",
466 );
467 }
468 })
469 .transpose()
470 .map_err(|_e| DbSqlError::LogicalError("failed to transpose the translated key".into()))
471 }
472
473 pub async fn resolve_chain_key(&self, offchain_key: &OffchainPublicKey) -> Result<Option<Address>> {
474 self.translate_key(None, *offchain_key)
475 .await?
476 .map(|k| k.try_into())
477 .transpose()
478 .map_err(|_e| DbSqlError::LogicalError("failed to transpose the translated key".into()))
479 }
480}
481
482#[cfg(test)]
483mod tests {
484 use anyhow::Context;
485 use hopr_crypto_types::prelude::{ChainKeypair, Keypair, OffchainKeypair};
486 use hopr_internal_types::prelude::AccountType::NotAnnounced;
487
488 use super::*;
489 use crate::{
490 HoprDbGeneralModelOperations,
491 errors::{DbSqlError, DbSqlError::DecodingError},
492 };
493
494 #[tokio::test]
495 async fn test_insert_account_announcement() -> anyhow::Result<()> {
496 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
497
498 let chain_1 = ChainKeypair::random().public().to_address();
499 let packet_1 = *OffchainKeypair::random().public();
500
501 db.insert_account(
502 None,
503 AccountEntry {
504 public_key: packet_1,
505 chain_addr: chain_1,
506 published_at: 1,
507 entry_type: AccountType::NotAnnounced,
508 },
509 )
510 .await?;
511
512 let acc = db.get_account(None, chain_1).await?.expect("should contain account");
513 assert_eq!(packet_1, acc.public_key, "pub keys must match");
514 assert_eq!(AccountType::NotAnnounced, acc.entry_type.clone());
515 assert_eq!(1, acc.published_at);
516
517 let maddr: Multiaddr = "/ip4/1.2.3.4/tcp/8000".parse()?;
518 let block = 100;
519
520 let db_acc = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
521
522 let acc = db.get_account(None, chain_1).await?.context("should contain account")?;
523 assert_eq!(Some(maddr.clone()), acc.get_multiaddr(), "multiaddress must match");
524 assert_eq!(Some(block), acc.updated_at());
525 assert_eq!(acc, db_acc);
526
527 let block = 200;
528 let db_acc = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
529
530 let acc = db.get_account(None, chain_1).await?.expect("should contain account");
531 assert_eq!(Some(maddr), acc.get_multiaddr(), "multiaddress must match");
532 assert_eq!(Some(block), acc.updated_at());
533 assert_eq!(acc, db_acc);
534
535 let maddr: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
536 let block = 300;
537 let db_acc = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
538
539 let acc = db.get_account(None, chain_1).await?.expect("should contain account");
540 assert_eq!(Some(maddr), acc.get_multiaddr(), "multiaddress must match");
541 assert_eq!(Some(block), acc.updated_at());
542 assert_eq!(acc, db_acc);
543
544 Ok(())
545 }
546
547 #[tokio::test]
548 async fn test_should_allow_reannouncement() -> anyhow::Result<()> {
549 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
550
551 let chain_1 = ChainKeypair::random().public().to_address();
552 let packet_1 = *OffchainKeypair::random().public();
553
554 db.insert_account(
555 None,
556 AccountEntry {
557 public_key: packet_1,
558 chain_addr: chain_1,
559 published_at: 1,
560 entry_type: AccountType::NotAnnounced,
561 },
562 )
563 .await?;
564
565 db.insert_announcement(None, chain_1, "/ip4/1.2.3.4/tcp/8000".parse()?, 100)
566 .await?;
567
568 let ae = db.get_account(None, chain_1).await?.ok_or(MissingAccount)?;
569
570 assert_eq!(
571 "/ip4/1.2.3.4/tcp/8000",
572 ae.get_multiaddr().expect("has multiaddress").to_string()
573 );
574
575 db.insert_announcement(None, chain_1, "/ip4/1.2.3.4/tcp/8001".parse()?, 110)
576 .await?;
577
578 let ae = db.get_account(None, chain_1).await?.ok_or(MissingAccount)?;
579
580 assert_eq!(
581 "/ip4/1.2.3.4/tcp/8001",
582 ae.get_multiaddr().expect("has multiaddress").to_string()
583 );
584
585 Ok(())
586 }
587
588 #[tokio::test]
589 async fn test_should_not_insert_account_announcement_to_nonexisting_account() -> anyhow::Result<()> {
590 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
591
592 let chain_1 = ChainKeypair::random().public().to_address();
593
594 let maddr: Multiaddr = "/ip4/1.2.3.4/tcp/8000".parse()?;
595 let block = 100;
596
597 let r = db.insert_announcement(None, chain_1, maddr.clone(), block).await;
598 assert!(
599 matches!(r, Err(MissingAccount)),
600 "should not insert announcement to non-existing account"
601 );
602
603 Ok(())
604 }
605
606 #[tokio::test]
607 async fn test_should_allow_duplicate_announcement_per_different_accounts() -> anyhow::Result<()> {
608 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
609
610 let chain_1 = ChainKeypair::random().public().to_address();
611 let packet_1 = *OffchainKeypair::random().public();
612
613 db.insert_account(
614 None,
615 AccountEntry {
616 public_key: packet_1,
617 chain_addr: chain_1,
618 published_at: 1,
619 entry_type: AccountType::NotAnnounced,
620 },
621 )
622 .await?;
623
624 let chain_2 = ChainKeypair::random().public().to_address();
625 let packet_2 = *OffchainKeypair::random().public();
626
627 db.insert_account(
628 None,
629 AccountEntry {
630 public_key: packet_2,
631 chain_addr: chain_2,
632 published_at: 2,
633 entry_type: AccountType::NotAnnounced,
634 },
635 )
636 .await?;
637
638 let maddr: Multiaddr = "/ip4/1.2.3.4/tcp/8000".parse()?;
639 let block = 100;
640
641 let db_acc_1 = db.insert_announcement(None, chain_1, maddr.clone(), block).await?;
642 let db_acc_2 = db.insert_announcement(None, chain_2, maddr.clone(), block).await?;
643
644 let acc = db.get_account(None, chain_1).await?.expect("should contain account");
645 assert_eq!(Some(maddr.clone()), acc.get_multiaddr(), "multiaddress must match");
646 assert_eq!(Some(block), acc.updated_at());
647 assert_eq!(acc, db_acc_1);
648
649 let acc = db.get_account(None, chain_2).await?.expect("should contain account");
650 assert_eq!(Some(maddr.clone()), acc.get_multiaddr(), "multiaddress must match");
651 assert_eq!(Some(block), acc.updated_at());
652 assert_eq!(acc, db_acc_2);
653
654 Ok(())
655 }
656
657 #[tokio::test]
658 async fn test_delete_account() -> anyhow::Result<()> {
659 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
660
661 let packet_1 = *OffchainKeypair::random().public();
662 let chain_1 = ChainKeypair::random().public().to_address();
663 db.insert_account(
664 None,
665 AccountEntry {
666 public_key: packet_1,
667 chain_addr: chain_1,
668 published_at: 1,
669 entry_type: AccountType::Announced {
670 multiaddr: "/ip4/1.2.3.4/tcp/1234".parse()?,
671 updated_block: 10,
672 },
673 },
674 )
675 .await?;
676
677 assert!(db.get_account(None, chain_1).await?.is_some());
678
679 db.delete_account(None, chain_1).await?;
680
681 assert!(db.get_account(None, chain_1).await?.is_none());
682
683 Ok(())
684 }
685
686 #[tokio::test]
687 async fn test_should_fail_to_delete_nonexistent_account() -> anyhow::Result<()> {
688 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
689
690 let chain_1 = ChainKeypair::random().public().to_address();
691
692 let r = db.delete_account(None, chain_1).await;
693 assert!(
694 matches!(r, Err(MissingAccount)),
695 "should not delete non-existing account"
696 );
697
698 Ok(())
699 }
700
701 #[tokio::test]
702 async fn test_should_not_fail_on_duplicate_account_insert() -> anyhow::Result<()> {
703 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
704
705 let chain_1 = ChainKeypair::random().public().to_address();
706 let packet_1 = *OffchainKeypair::random().public();
707
708 db.insert_account(
709 None,
710 AccountEntry {
711 public_key: packet_1,
712 chain_addr: chain_1,
713 published_at: 1,
714 entry_type: AccountType::NotAnnounced,
715 },
716 )
717 .await?;
718
719 db.insert_account(
720 None,
721 AccountEntry {
722 public_key: packet_1,
723 chain_addr: chain_1,
724 published_at: 1,
725 entry_type: AccountType::NotAnnounced,
726 },
727 )
728 .await?;
729
730 Ok(())
731 }
732
733 #[tokio::test]
734 async fn test_delete_announcements() -> anyhow::Result<()> {
735 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
736
737 let packet_1 = *OffchainKeypair::random().public();
738 let chain_1 = ChainKeypair::random().public().to_address();
739 let mut entry = AccountEntry {
740 public_key: packet_1,
741 chain_addr: chain_1,
742 published_at: 1,
743 entry_type: AccountType::Announced {
744 multiaddr: "/ip4/1.2.3.4/tcp/1234".parse()?,
745 updated_block: 10,
746 },
747 };
748
749 db.insert_account(None, entry.clone()).await?;
750
751 assert_eq!(Some(entry.clone()), db.get_account(None, chain_1).await?);
752
753 db.delete_all_announcements(None, chain_1).await?;
754
755 entry.entry_type = NotAnnounced;
756
757 assert_eq!(Some(entry), db.get_account(None, chain_1).await?);
758
759 Ok(())
760 }
761
762 #[tokio::test]
763 async fn test_should_fail_to_delete_nonexistent_account_announcements() -> anyhow::Result<()> {
764 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
765
766 let chain_1 = ChainKeypair::random().public().to_address();
767
768 let r = db.delete_all_announcements(None, chain_1).await;
769 assert!(
770 matches!(r, Err(MissingAccount)),
771 "should not delete non-existing account"
772 );
773
774 Ok(())
775 }
776
777 #[tokio::test]
778 async fn test_translate_key() -> anyhow::Result<()> {
779 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
780
781 let chain_1 = ChainKeypair::random().public().to_address();
782 let packet_1 = *OffchainKeypair::random().public();
783
784 let chain_2 = ChainKeypair::random().public().to_address();
785 let packet_2 = *OffchainKeypair::random().public();
786
787 let db_clone = db.clone();
788 db.begin_transaction()
789 .await?
790 .perform(|tx| {
791 Box::pin(async move {
792 db_clone
793 .insert_account(
794 tx.into(),
795 AccountEntry {
796 public_key: packet_1,
797 chain_addr: chain_1,
798 published_at: 1,
799 entry_type: AccountType::NotAnnounced,
800 },
801 )
802 .await?;
803 db_clone
804 .insert_account(
805 tx.into(),
806 AccountEntry {
807 public_key: packet_2,
808 chain_addr: chain_2,
809 published_at: 2,
810 entry_type: AccountType::NotAnnounced,
811 },
812 )
813 .await?;
814 Ok::<(), DbSqlError>(())
815 })
816 })
817 .await?;
818
819 let a: Address = db
820 .translate_key(None, packet_1)
821 .await?
822 .context("must contain key")?
823 .try_into()?;
824
825 let b: OffchainPublicKey = db
826 .translate_key(None, chain_2)
827 .await?
828 .context("must contain key")?
829 .try_into()?;
830
831 assert_eq!(chain_1, a, "chain keys must match");
832 assert_eq!(packet_2, b, "chain keys must match");
833
834 Ok(())
835 }
836
837 #[tokio::test]
838 async fn test_translate_key_no_cache() -> anyhow::Result<()> {
839 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
840
841 let chain_1 = ChainKeypair::random().public().to_address();
842 let packet_1 = *OffchainKeypair::random().public();
843
844 let chain_2 = ChainKeypair::random().public().to_address();
845 let packet_2 = *OffchainKeypair::random().public();
846
847 let db_clone = db.clone();
848 db.begin_transaction()
849 .await?
850 .perform(|tx| {
851 Box::pin(async move {
852 db_clone
853 .insert_account(
854 tx.into(),
855 AccountEntry {
856 public_key: packet_1,
857 chain_addr: chain_1,
858 published_at: 1,
859 entry_type: AccountType::NotAnnounced,
860 },
861 )
862 .await?;
863 db_clone
864 .insert_account(
865 tx.into(),
866 AccountEntry {
867 public_key: packet_2,
868 chain_addr: chain_2,
869 published_at: 2,
870 entry_type: AccountType::NotAnnounced,
871 },
872 )
873 .await?;
874 Ok::<(), DbSqlError>(())
875 })
876 })
877 .await?;
878
879 db.caches.invalidate_all();
880
881 let a: Address = db
882 .translate_key(None, packet_1)
883 .await?
884 .context("must contain key")?
885 .try_into()?;
886
887 let b: OffchainPublicKey = db
888 .translate_key(None, chain_2)
889 .await?
890 .context("must contain key")?
891 .try_into()?;
892
893 assert_eq!(chain_1, a, "chain keys must match");
894 assert_eq!(packet_2, b, "chain keys must match");
895
896 Ok(())
897 }
898
899 #[tokio::test]
900 async fn test_stream_accounts() -> anyhow::Result<()> {
901 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
902
903 let chain_1 = ChainKeypair::random().public().to_address();
904 let chain_2 = ChainKeypair::random().public().to_address();
905 let chain_3 = ChainKeypair::random().public().to_address();
906
907 let db_clone = db.clone();
908 db.begin_transaction()
909 .await?
910 .perform(|tx| {
911 Box::pin(async move {
912 db_clone
913 .insert_account(
914 Some(tx),
915 AccountEntry {
916 public_key: *OffchainKeypair::random().public(),
917 chain_addr: chain_1,
918 entry_type: AccountType::NotAnnounced,
919 published_at: 1,
920 },
921 )
922 .await?;
923 db_clone
924 .insert_account(
925 Some(tx),
926 AccountEntry {
927 public_key: *OffchainKeypair::random().public(),
928 chain_addr: chain_2,
929 entry_type: AccountType::Announced {
930 multiaddr: "/ip4/10.10.10.10/tcp/1234".parse().map_err(|_| DecodingError)?,
931 updated_block: 10,
932 },
933 published_at: 2,
934 },
935 )
936 .await?;
937 db_clone
938 .insert_account(
939 Some(tx),
940 AccountEntry {
941 public_key: *OffchainKeypair::random().public(),
942 chain_addr: chain_3,
943 entry_type: AccountType::NotAnnounced,
944 published_at: 3,
945 },
946 )
947 .await?;
948
949 db_clone
950 .insert_announcement(
951 Some(tx),
952 chain_3,
953 "/ip4/1.2.3.4/tcp/1234".parse().map_err(|_| DecodingError)?,
954 12,
955 )
956 .await?;
957 db_clone
958 .insert_announcement(
959 Some(tx),
960 chain_3,
961 "/ip4/8.8.1.1/tcp/1234".parse().map_err(|_| DecodingError)?,
962 15,
963 )
964 .await?;
965 db_clone
966 .insert_announcement(
967 Some(tx),
968 chain_3,
969 "/ip4/1.2.3.0/tcp/234".parse().map_err(|_| DecodingError)?,
970 14,
971 )
972 .await
973 })
974 })
975 .await?;
976
977 let all_accounts = db.stream_accounts(false).await?.collect::<Vec<_>>().await;
978 let public_only = db.stream_accounts(true).await?.collect::<Vec<_>>().await;
979
980 assert_eq!(3, all_accounts.len());
981
982 assert_eq!(2, public_only.len());
983 let acc_1 = public_only
984 .iter()
985 .find(|a| a.chain_addr.eq(&chain_2))
986 .expect("should contain 1");
987
988 let acc_2 = public_only
989 .iter()
990 .find(|a| a.chain_addr.eq(&chain_3))
991 .expect("should contain 2");
992
993 assert_eq!(
994 "/ip4/10.10.10.10/tcp/1234",
995 acc_1.get_multiaddr().expect("should have a multiaddress").to_string()
996 );
997 assert_eq!(
998 "/ip4/8.8.1.1/tcp/1234",
999 acc_2.get_multiaddr().expect("should have a multiaddress").to_string()
1000 );
1001
1002 Ok(())
1003 }
1004
1005 #[tokio::test]
1006 async fn test_get_offchain_key_should_return_nothing_if_a_mapping_to_chain_key_does_not_exist() -> anyhow::Result<()>
1007 {
1008 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1009
1010 let chain = ChainKeypair::random().public().to_address();
1011
1012 let actual_pk = db.resolve_packet_key(&chain).await?;
1013 assert_eq!(actual_pk, None, "offchain key should not be present");
1014 Ok(())
1015 }
1016
1017 #[tokio::test]
1018 async fn test_get_chain_key_should_return_nothing_if_a_mapping_to_offchain_key_does_not_exist() -> anyhow::Result<()>
1019 {
1020 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1021
1022 let packet = *OffchainKeypair::random().public();
1023
1024 let actual_ck = db.resolve_chain_key(&packet).await?;
1025 assert_eq!(actual_ck, None, "chain key should not be present");
1026 Ok(())
1027 }
1028
1029 #[tokio::test]
1030 async fn test_get_chain_key_should_succeed_if_a_mapping_to_offchain_key_exists() -> anyhow::Result<()> {
1031 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1032
1033 let chain_1 = ChainKeypair::random().public().to_address();
1036 let packet_1 = *OffchainKeypair::random().public();
1037 let account_1 = hopr_db_entity::account::ActiveModel {
1038 chain_key: Set(chain_1.to_hex()),
1039 packet_key: Set(packet_1.to_hex()),
1040 ..Default::default()
1041 };
1042
1043 let chain_2 = ChainKeypair::random().public().to_address();
1044 let packet_2 = *OffchainKeypair::random().public();
1045 let account_2 = hopr_db_entity::account::ActiveModel {
1046 chain_key: Set(chain_2.to_hex()),
1047 packet_key: Set(packet_2.to_hex()),
1048 ..Default::default()
1049 };
1050
1051 hopr_db_entity::account::Entity::insert_many([account_1, account_2])
1052 .exec(db.index_db.read_write())
1053 .await?;
1054
1055 let actual_ck = db.resolve_chain_key(&packet_1).await?;
1056 assert_eq!(actual_ck, Some(chain_1), "chain keys must match");
1057 Ok(())
1058 }
1059
1060 #[tokio::test]
1061 async fn test_get_chain_key_should_succeed_if_a_mapping_to_offchain_key_exists_with_cache() -> anyhow::Result<()> {
1062 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1063
1064 let chain_1 = ChainKeypair::random().public().to_address();
1067 let packet_1 = *OffchainKeypair::random().public();
1068 db.insert_account(
1069 None,
1070 AccountEntry {
1071 public_key: packet_1,
1072 chain_addr: chain_1,
1073 entry_type: AccountType::NotAnnounced,
1074 published_at: 1,
1075 },
1076 )
1077 .await?;
1078
1079 let chain_2 = ChainKeypair::random().public().to_address();
1080 let packet_2 = *OffchainKeypair::random().public();
1081 db.insert_account(
1082 None,
1083 AccountEntry {
1084 public_key: packet_2,
1085 chain_addr: chain_2,
1086 entry_type: AccountType::NotAnnounced,
1087 published_at: 1,
1088 },
1089 )
1090 .await?;
1091
1092 let actual_ck = db.resolve_chain_key(&packet_1).await?;
1093 assert_eq!(actual_ck, Some(chain_1), "chain keys must match");
1094 Ok(())
1095 }
1096
1097 #[tokio::test]
1098 async fn test_get_offchain_key_should_succeed_if_a_mapping_to_chain_key_exists() -> anyhow::Result<()> {
1099 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1100
1101 let chain_1 = ChainKeypair::random().public().to_address();
1104 let packet_1 = *OffchainKeypair::random().public();
1105 let account_1 = hopr_db_entity::account::ActiveModel {
1106 chain_key: Set(chain_1.to_hex()),
1107 packet_key: Set(packet_1.to_hex()),
1108 ..Default::default()
1109 };
1110
1111 let chain_2 = ChainKeypair::random().public().to_address();
1112 let packet_2 = *OffchainKeypair::random().public();
1113 let account_2 = hopr_db_entity::account::ActiveModel {
1114 chain_key: Set(chain_2.to_hex()),
1115 packet_key: Set(packet_2.to_hex()),
1116 ..Default::default()
1117 };
1118
1119 hopr_db_entity::account::Entity::insert_many([account_1, account_2])
1120 .exec(db.index_db.read_write())
1121 .await?;
1122
1123 let actual_pk = db.resolve_packet_key(&chain_2).await?;
1124
1125 assert_eq!(actual_pk, Some(packet_2), "packet keys must match");
1126 Ok(())
1127 }
1128
1129 #[tokio::test]
1130 async fn test_get_offchain_key_should_succeed_if_a_mapping_to_chain_key_exists_with_cache() -> anyhow::Result<()> {
1131 let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1132
1133 let chain_1 = ChainKeypair::random().public().to_address();
1136 let packet_1 = *OffchainKeypair::random().public();
1137 db.insert_account(
1138 None,
1139 AccountEntry {
1140 public_key: packet_1,
1141 chain_addr: chain_1,
1142 entry_type: AccountType::NotAnnounced,
1143 published_at: 1,
1144 },
1145 )
1146 .await?;
1147
1148 let chain_2 = ChainKeypair::random().public().to_address();
1149 let packet_2 = *OffchainKeypair::random().public();
1150 db.insert_account(
1151 None,
1152 AccountEntry {
1153 public_key: packet_2,
1154 chain_addr: chain_2,
1155 entry_type: AccountType::NotAnnounced,
1156 published_at: 1,
1157 },
1158 )
1159 .await?;
1160
1161 let actual_pk = db.resolve_packet_key(&chain_2).await?;
1162
1163 assert_eq!(actual_pk, Some(packet_2), "packet keys must match");
1164 Ok(())
1165 }
1166}