1use std::{
2 cmp::Ordering,
3 fmt::{Debug, Formatter},
4 ops::Add,
5 sync::Arc,
6 time::{Duration, SystemTime},
7};
8
9use alloy::{primitives::B256, sol_types::SolEventInterface};
10use async_trait::async_trait;
11use hopr_bindings::{
12 hoprannouncements::HoprAnnouncements::HoprAnnouncementsEvents, hoprchannels::HoprChannels::HoprChannelsEvents,
13 hoprnetworkregistry::HoprNetworkRegistry::HoprNetworkRegistryEvents,
14 hoprnodemanagementmodule::HoprNodeManagementModule::HoprNodeManagementModuleEvents,
15 hoprnodesaferegistry::HoprNodeSafeRegistry::HoprNodeSafeRegistryEvents,
16 hoprticketpriceoracle::HoprTicketPriceOracle::HoprTicketPriceOracleEvents, hoprtoken::HoprToken::HoprTokenEvents,
17 hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::HoprWinningProbabilityOracleEvents,
18};
19use hopr_chain_rpc::{HoprIndexerRpcOperations, Log};
20use hopr_chain_types::{
21 ContractAddresses,
22 chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent},
23};
24use hopr_crypto_types::prelude::*;
25use hopr_db_sql::{
26 HoprDbAllOperations, OpenTransaction,
27 api::{info::DomainSeparator, tickets::TicketSelector},
28 errors::DbSqlError,
29 prelude::TicketMarker,
30};
31use hopr_internal_types::prelude::*;
32use hopr_primitive_types::prelude::*;
33use tracing::{debug, error, info, trace, warn};
34
35use crate::errors::{CoreEthereumIndexerError, Result};
36
// Counter of processed contract logs, labelled by "contract". Each `on_*_event`
// handler in this file increments it with its own label value. It is only
// compiled when the "prometheus" feature is enabled and the build is not a
// test build. The `unwrap` panics if the counter cannot be constructed.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter =
        hopr_metrics::MultiCounter::new(
            "hopr_indexer_contract_log_count",
            "Counts of different HOPR contract logs processed by the Indexer",
            &["contract"]
        ).unwrap();
}
46
/// Handlers that turn decoded on-chain contract events into database updates
/// and, where applicable, into [`ChainEventType`] values.
#[derive(Clone)]
pub struct ContractEventHandlers<T, Db> {
    /// Addresses of the contracts whose logs are dispatched by these handlers.
    addresses: Arc<ContractAddresses>,
    /// Address whose HOPR token balance and allowance are re-read from chain by the handlers.
    safe_address: Address,
    /// Keypair of this node; its public address is compared against event participants.
    chain_key: ChainKeypair,
    /// Database handle used to persist the effects of processed events.
    db: Db,
    /// RPC client used to query the token balance and allowance of `safe_address`.
    rpc_operations: T,
}
65
66impl<T, Db> Debug for ContractEventHandlers<T, Db> {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("ContractEventHandler")
69 .field("addresses", &self.addresses)
70 .field("safe_address", &self.safe_address)
71 .field("chain_key", &self.chain_key)
72 .finish_non_exhaustive()
73 }
74}
75
76impl<T, Db> ContractEventHandlers<T, Db>
77where
78 T: HoprIndexerRpcOperations + Clone + Send + 'static,
79 Db: HoprDbAllOperations + Clone,
80{
81 pub fn new(
99 addresses: ContractAddresses,
100 safe_address: Address,
101 chain_key: ChainKeypair,
102 db: Db,
103 rpc_operations: T,
104 ) -> Self {
105 Self {
106 addresses: Arc::new(addresses),
107 safe_address,
108 chain_key,
109 db,
110 rpc_operations,
111 }
112 }
113
114 async fn on_announcement_event(
115 &self,
116 tx: &OpenTransaction,
117 event: HoprAnnouncementsEvents,
118 block_number: u32,
119 _is_synced: bool,
120 ) -> Result<Option<ChainEventType>> {
121 #[cfg(all(feature = "prometheus", not(test)))]
122 METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
123
124 match event {
125 HoprAnnouncementsEvents::AddressAnnouncement(address_announcement) => {
126 debug!(
127 multiaddress = &address_announcement.baseMultiaddr,
128 address = &address_announcement.node.to_string(),
129 "on_announcement_event: AddressAnnouncement",
130 );
131 if address_announcement.baseMultiaddr.is_empty() {
133 warn!(
134 address = %address_announcement.node,
135 "encountered empty multiaddress announcement",
136 );
137 return Ok(None);
138 }
139 let node_address: Address = address_announcement.node.into();
140
141 return match self
142 .db
143 .insert_announcement(
144 Some(tx),
145 node_address,
146 address_announcement.baseMultiaddr.parse()?,
147 block_number,
148 )
149 .await
150 {
151 Ok(account) => Ok(Some(ChainEventType::Announcement {
152 peer: account.public_key.into(),
153 address: account.chain_addr,
154 multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
155 })),
156 Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
157 Err(e) => Err(e.into()),
158 };
159 }
160 HoprAnnouncementsEvents::KeyBinding(key_binding) => {
161 debug!(
162 address = %key_binding.chain_key,
163 public_key = %key_binding.ed25519_pub_key,
164 "on_announcement_event: KeyBinding",
165 );
166 match KeyBinding::from_parts(
167 key_binding.chain_key.into(),
168 key_binding.ed25519_pub_key.0.try_into()?,
169 OffchainSignature::try_from((key_binding.ed25519_sig_0.0, key_binding.ed25519_sig_1.0))?,
170 ) {
171 Ok(binding) => {
172 match self
173 .db
174 .insert_account(
175 Some(tx),
176 AccountEntry {
177 public_key: binding.packet_key,
178 chain_addr: binding.chain_key,
179 entry_type: AccountType::NotAnnounced,
180 published_at: block_number,
181 },
182 )
183 .await
184 {
185 Ok(_) => (),
186 Err(err) => {
187 error!(%err, "failed to store announcement key binding")
190 }
191 }
192 }
193 Err(e) => {
194 warn!(
195 address = ?key_binding.chain_key,
196 error = %e,
197 "Filtering announcement with invalid signature",
198
199 )
200 }
201 }
202 }
203 HoprAnnouncementsEvents::RevokeAnnouncement(revocation) => {
204 let node_address: Address = revocation.node.into();
205 match self.db.delete_all_announcements(Some(tx), node_address).await {
206 Err(DbSqlError::MissingAccount) => {
207 return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding);
208 }
209 Err(e) => return Err(e.into()),
210 _ => {}
211 }
212 }
213 };
214
215 Ok(None)
216 }
217
/// Handles an event emitted by the channels contract.
///
/// Every channel-specific event first loads the channel via `begin_channel_update`. When the
/// channel is not found, the handlers for balance changes, closure, closure initiation and
/// ticket redemption record a corrupted channel entry and return
/// `CoreEthereumIndexerError::ChannelDoesNotExist`.
///
/// `is_synced` gates the refresh of the safe balance/allowance from chain in the balance
/// handlers; failures of those RPC calls are logged and not propagated.
///
/// Returns the [`ChainEventType`] describing the change, or `Ok(None)` for domain separator
/// updates and for an `Open` event on a channel that is not `Closed`.
async fn on_channel_event(
    &self,
    tx: &OpenTransaction,
    event: HoprChannelsEvents,
    is_synced: bool,
) -> Result<Option<ChainEventType>> {
    #[cfg(all(feature = "prometheus", not(test)))]
    METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);

    match event {
        HoprChannelsEvents::ChannelBalanceDecreased(balance_decreased) => {
            let channel_id = balance_decreased.channelId.0.into();

            let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
                Ok(channel) => channel,
                Err(e) => {
                    error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_decreased_event");
                    return Err(e.into());
                }
            };

            trace!(
                %channel_id,
                is_channel = maybe_channel.is_some(),
                "on_channel_balance_decreased_event",
            );

            if let Some(channel_edits) = maybe_channel {
                let new_balance = HoprBalance::from_be_bytes(balance_decreased.newBalance.to_be_bytes::<12>());
                // Amount by which the balance dropped: stored balance minus the new balance.
                let diff = channel_edits.entry().balance - new_balance;

                let updated_channel = self
                    .db
                    .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
                    .await?
                    .ok_or(CoreEthereumIndexerError::ProcessError(format!(
                        "channel balance decreased event for channel {channel_id} did not return an updated \
                         channel"
                    )))?;

                // Only when synced and this node is the source or destination of the channel,
                // the safe balance is re-read from chain.
                if is_synced
                    && (updated_channel.source == self.chain_key.public().to_address()
                        || updated_channel.destination == self.chain_key.public().to_address())
                {
                    info!("updating safe balance from chain after channel balance decreased event");
                    match self.rpc_operations.get_hopr_balance(self.safe_address).await {
                        Ok(balance) => {
                            self.db.set_safe_hopr_balance(Some(tx), balance).await?;
                        }
                        Err(error) => {
                            error!(%error, "error getting safe balance from chain after channel balance decreased event");
                        }
                    }
                }

                Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
            } else {
                error!(%channel_id, "observed balance decreased event for a channel that does not exist");
                self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::ChannelBalanceIncreased(balance_increased) => {
            let channel_id = balance_increased.channelId.0.into();

            let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
                Ok(channel) => channel,
                Err(e) => {
                    error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_increased_event");
                    return Err(e.into());
                }
            };

            trace!(
                %channel_id,
                is_channel = maybe_channel.is_some(),
                "on_channel_balance_increased_event",
            );

            if let Some(channel_edits) = maybe_channel {
                let new_balance = HoprBalance::from_be_bytes(balance_increased.newBalance.to_be_bytes::<12>());
                // Amount by which the balance grew: new balance minus the stored balance.
                let diff = new_balance - channel_edits.entry().balance;

                let updated_channel = self
                    .db
                    .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
                    .await?
                    .ok_or(CoreEthereumIndexerError::ProcessError(format!(
                        "channel balance increased event for channel {channel_id} did not return an updated \
                         channel"
                    )))?;

                // Unlike the decrease handler, only channels where this node is the source
                // trigger a refresh; both balance and allowance are re-read.
                if updated_channel.source == self.chain_key.public().to_address() && is_synced {
                    info!("updating safe balance from chain after channel balance increased event");
                    match self.rpc_operations.get_hopr_balance(self.safe_address).await {
                        Ok(balance) => {
                            self.db.set_safe_hopr_balance(Some(tx), balance).await?;
                        }
                        Err(error) => {
                            error!(%error, "error getting safe balance from chain after channel balance increased event");
                        }
                    }
                    info!("updating safe allowance from chain after channel balance increased event");
                    match self
                        .rpc_operations
                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
                        .await
                    {
                        Ok(allowance) => {
                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
                        }
                        Err(error) => {
                            error!(%error, "error getting safe allowance from chain after channel balance increased event");
                        }
                    }
                }

                Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
            } else {
                error!(%channel_id, "observed balance increased event for a channel that does not exist");
                self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::ChannelClosed(channel_closed) => {
            let channel_id = channel_closed.channelId.0.into();

            let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
                Ok(channel) => channel,
                Err(e) => {
                    error!(%channel_id, %e, "failed to begin channel update on on_channel_closed_event");
                    return Err(e.into());
                }
            };

            trace!(
                %channel_id,
                is_channel = maybe_channel.is_some(),
                "on_channel_closed_event",
            );

            if let Some(channel_edits) = maybe_channel {
                let channel_id = channel_edits.entry().get_id();
                // `Some` when this node's address is a party of the channel, `None` otherwise.
                let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());

                let updated_channel = if let Some((direction, _)) = orientation {
                    // Own channel: keep the entry, but mark it closed with zero balance and
                    // ticket index 0.
                    let channel_edits = channel_edits
                        .change_status(ChannelStatus::Closed)
                        .change_balance(HoprBalance::zero())
                        .change_ticket_index(0);

                    let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?.ok_or(
                        CoreEthereumIndexerError::ProcessError(format!(
                            "channel closed event for channel {channel_id} did not return an updated channel",
                        )),
                    )?;

                    // NOTE(review): the ticket operations below are called without `tx` —
                    // confirm whether they are meant to run outside the open transaction.
                    match direction {
                        ChannelDirection::Incoming => {
                            // Tickets of the closed incoming channel are marked as neglected.
                            self.db
                                .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
                                .await?;
                        }
                        ChannelDirection::Outgoing => {
                            self.db.reset_outgoing_ticket_index(channel_id).await?;
                        }
                    }
                    updated_channel
                } else {
                    // Channel between other parties: the entry is deleted.
                    let updated_channel = self
                        .db
                        .finish_channel_update(tx.into(), channel_edits.delete())
                        .await?
                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
                            "channel closed event for channel {channel_id} did not return an updated channel",
                        )))?;

                    debug!(%channel_id, "foreign closed closed channel was deleted");
                    updated_channel
                };

                Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
            } else {
                error!(%channel_id, "observed closure finalization event for a channel that does not exist.");
                self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::ChannelOpened(channel_opened) => {
            let source: Address = channel_opened.source.into();
            let destination: Address = channel_opened.destination.into();
            // The event carries no channel id; it is derived from source and destination.
            let channel_id = generate_channel_id(&source, &destination);

            let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
                Ok(channel) => channel,
                Err(e) => {
                    error!(%source, %destination, %channel_id, %e, "failed to begin channel update on on_channel_opened_event");
                    return Err(e.into());
                }
            };

            let channel = if let Some(channel_edits) = maybe_channel {
                // An existing channel may only be re-opened from the `Closed` state. Otherwise
                // it is deleted, recorded as corrupted, and no event is emitted.
                if channel_edits.entry().status != ChannelStatus::Closed {
                    warn!(%source, %destination, %channel_id, "received Open event for a channel that is not Closed, marking it as corrupted");

                    self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;

                    return Ok(None);
                }

                trace!(%source, %destination, %channel_id, "on_channel_reopened_event");

                let current_epoch = channel_edits.entry().channel_epoch;

                // For own channels, tickets of the current epoch are marked neglected and the
                // outgoing ticket index is reset before the epoch is bumped.
                if source == self.chain_key.public().to_address()
                    || destination == self.chain_key.public().to_address()
                {
                    self.db
                        .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
                        .await?;

                    self.db.reset_outgoing_ticket_index(channel_id).await?;
                }

                // Re-open: ticket index 0, epoch incremented by one, status `Open`.
                self.db
                    .finish_channel_update(
                        tx.into(),
                        channel_edits
                            .change_ticket_index(0_u32)
                            .change_epoch(current_epoch.add(1))
                            .change_status(ChannelStatus::Open),
                    )
                    .await?
                    .ok_or(CoreEthereumIndexerError::ProcessError(format!(
                        "channel opened event for channel {channel_id} did not return an updated channel",
                    )))?
            } else {
                trace!(%source, %destination, %channel_id, "on_channel_opened_event");

                // Previously unknown channel: a fresh `Open` entry is stored.
                // NOTE(review): the numeric arguments are presumably balance 0, ticket index 0
                // and epoch 1 — confirm against `ChannelEntry::new`.
                let new_channel = ChannelEntry::new(
                    source,
                    destination,
                    0_u32.into(),
                    0_u32.into(),
                    ChannelStatus::Open,
                    1_u32.into(),
                );

                self.db.upsert_channel(tx.into(), new_channel).await?;
                new_channel
            };

            Ok(Some(ChainEventType::ChannelOpened(channel)))
        }
        HoprChannelsEvents::TicketRedeemed(ticket_redeemed) => {
            let channel_id = ticket_redeemed.channelId.0.into();

            let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
                Ok(channel) => channel,
                Err(e) => {
                    error!(%channel_id, %e, "failed to begin channel update on on_ticket_redeemed_event");
                    return Err(e.into());
                }
            };

            if let Some(channel_edits) = maybe_channel {
                let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
                    Some(ChannelDirection::Incoming) => {
                        // Look for the `BeingRedeemed` ticket on this channel whose
                        // index + index_offset equals the new on-chain ticket index.
                        let mut matching_tickets = self
                            .db
                            .get_tickets(
                                TicketSelector::from(channel_edits.entry())
                                    .with_state(AcknowledgedTicketStatus::BeingRedeemed),
                            )
                            .await?
                            .into_iter()
                            .filter(|ticket| {
                                let ticket_idx = ticket.verified_ticket().index;
                                let ticket_off = ticket.verified_ticket().index_offset as u64;

                                ticket_idx + ticket_off == ticket_redeemed.newTicketIndex.to::<u64>()
                            })
                            .collect::<Vec<_>>();

                        match matching_tickets.len().cmp(&1) {
                            // Exactly one match: mark that ticket as redeemed.
                            Ordering::Equal => {
                                let ack_ticket = matching_tickets.pop().unwrap();

                                self.db
                                    .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
                                    .await?;
                                info!(%ack_ticket, "ticket marked as redeemed");
                                Some(ack_ticket)
                            }
                            // No match: only logged; the channel update below still happens.
                            Ordering::Less => {
                                error!(
                                    idx = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
                                    entry = %channel_edits.entry(),
                                    "could not find acknowledged 'BeingRedeemed' ticket",
                                );
                                None
                            }
                            // Several matches: the channel is deleted, recorded as corrupted and
                            // the event fails with a `ProcessError`.
                            Ordering::Greater => {
                                error!(
                                    count = matching_tickets.len(),
                                    index = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
                                    entry = %channel_edits.entry(),
                                    "found tickets matching 'BeingRedeemed'",
                                );

                                // Render the entry before `channel_edits` is consumed by `delete()`.
                                let entry_str = channel_edits.entry().to_string();

                                self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
                                self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;

                                return Err(CoreEthereumIndexerError::ProcessError(format!(
                                    "multiple tickets matching idx {} found in {}",
                                    ticket_redeemed.newTicketIndex.to::<u64>() - 1,
                                    entry_str
                                )));
                            }
                        }
                    }
                    Some(ChannelDirection::Outgoing) => {
                        // Own outgoing channel: pass the on-chain ticket index to the
                        // compare-and-set of the locally tracked outgoing ticket index.
                        debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
                        self.db
                            .compare_and_set_outgoing_ticket_index(
                                channel_edits.entry().get_id(),
                                ticket_redeemed.newTicketIndex.to::<u64>(),
                            )
                            .await?;
                        None
                    }
                    None => {
                        debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
                        None
                    }
                };

                // In every remaining case the channel's ticket index is set to the new index.
                let channel = self
                    .db
                    .finish_channel_update(
                        tx.into(),
                        channel_edits.change_ticket_index(U256::from_be_bytes(
                            ticket_redeemed.newTicketIndex.to_be_bytes::<6>(),
                        )),
                    )
                    .await?
                    .ok_or(CoreEthereumIndexerError::ProcessError(format!(
                        "ticket redeemed event for channel {channel_id} did not return an updated channel",
                    )))?;

                // Tickets of this channel with an index below the new ticket index are marked
                // as neglected.
                self.db
                    .mark_tickets_as(
                        TicketSelector::from(&channel)
                            .with_index_range(..ticket_redeemed.newTicketIndex.to::<u64>()),
                        TicketMarker::Neglected,
                    )
                    .await?;

                Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
            } else {
                error!(%channel_id, "observed ticket redeem on a channel that we don't have in the DB");
                self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::OutgoingChannelClosureInitiated(closure_initiated) => {
            let channel_id = closure_initiated.channelId.0.into();

            let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
                Ok(channel) => channel,
                Err(e) => {
                    error!(%channel_id, %e, "failed to begin channel update on on_outgoing_channel_closure_initiated_event");
                    return Err(e.into());
                }
            };

            let closure_time: u32 = closure_initiated.closureTime;
            if let Some(channel_edits) = maybe_channel {
                // `closureTime` is interpreted as seconds since the UNIX epoch.
                let new_status = ChannelStatus::PendingToClose(
                    SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_time.into())),
                );

                let channel = self
                    .db
                    .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
                    .await?
                    .ok_or(CoreEthereumIndexerError::ProcessError(format!(
                        "channel closure initiation event for channel {channel_id} did not return an updated \
                         channel",
                    )))?;

                Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
            } else {
                error!(%channel_id, "observed channel closure initiation on a channel that we don't have in the DB");
                self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::DomainSeparatorUpdated(domain_separator_updated) => {
            // Stores the new channel domain separator; no chain event is emitted.
            self.db
                .set_domain_separator(
                    Some(tx),
                    DomainSeparator::Channel,
                    domain_separator_updated.domainSeparator.0.into(),
                )
                .await?;

            Ok(None)
        }
        HoprChannelsEvents::LedgerDomainSeparatorUpdated(ledger_domain_separator_updated) => {
            // Stores the new ledger domain separator; no chain event is emitted.
            self.db
                .set_domain_separator(
                    Some(tx),
                    DomainSeparator::Ledger,
                    ledger_domain_separator_updated.ledgerDomainSeparator.0.into(),
                )
                .await?;

            Ok(None)
        }
    }
}
674
675 async fn on_token_event(
676 &self,
677 tx: &OpenTransaction,
678 event: HoprTokenEvents,
679 is_synced: bool,
680 ) -> Result<Option<ChainEventType>> {
681 #[cfg(all(feature = "prometheus", not(test)))]
682 METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
683
684 match event {
685 HoprTokenEvents::Transfer(transferred) => {
686 let from: Address = transferred.from.into();
687 let to: Address = transferred.to.into();
688
689 trace!(
690 safe_address = %&self.safe_address, %from, %to,
691 "on_token_transfer_event"
692 );
693
694 if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
695 error!(
696 safe_address = %&self.safe_address, %from, %to,
697 "filter misconfiguration: transfer event not involving the safe");
698 return Ok(None);
699 }
700
701 if is_synced {
702 info!("updating safe balance from chain after transfer event");
703 match self.rpc_operations.get_hopr_balance(self.safe_address).await {
704 Ok(balance) => {
705 self.db.set_safe_hopr_balance(Some(tx), balance).await?;
706 }
707 Err(error) => {
708 error!(%error, "error getting safe balance from chain after transfer event");
709 }
710 }
711 info!("updating safe allowance from chain after transfer event");
712 match self
713 .rpc_operations
714 .get_hopr_allowance(self.safe_address, self.addresses.channels)
715 .await
716 {
717 Ok(allowance) => {
718 self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
719 }
720 Err(error) => {
721 error!(%error, "error getting safe allowance from chain after transfer event");
722 }
723 }
724 }
725 }
726 HoprTokenEvents::Approval(approved) => {
727 let owner: Address = approved.owner.into();
728 let spender: Address = approved.spender.into();
729
730 trace!(
731 address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
732 "on_token_approval_event",
733
734 );
735
736 if owner.ne(&self.safe_address) || spender.ne(&self.addresses.channels) {
737 error!(
738 address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
739 "filter misconfiguration: approval event not involving the safe and channels contract");
740 return Ok(None);
741 }
742
743 if is_synced {
745 info!("updating safe allowance from chain after approval event");
746 match self
747 .rpc_operations
748 .get_hopr_allowance(self.safe_address, self.addresses.channels)
749 .await
750 {
751 Ok(allowance) => {
752 self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
753 }
754 Err(error) => {
755 error!(%error, "error getting safe allowance from chain after approval event");
756 }
757 }
758 }
759 }
760 _ => error!("Implement all the other filters for HoprTokenEvents"),
761 }
762
763 Ok(None)
764 }
765
766 async fn on_network_registry_event(
767 &self,
768 tx: &OpenTransaction,
769 event: HoprNetworkRegistryEvents,
770 _is_synced: bool,
771 ) -> Result<Option<ChainEventType>> {
772 #[cfg(all(feature = "prometheus", not(test)))]
773 METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
774
775 match event {
776 HoprNetworkRegistryEvents::DeregisteredByManager(deregistered) => {
777 debug!(
778 node_address = %deregistered.nodeAddress,
779 "on_network_registry_deregistered_by_manager_event",
780 );
781 let node_address: Address = deregistered.nodeAddress.into();
782 self.db
783 .set_access_in_network_registry(Some(tx), node_address, false)
784 .await?;
785
786 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
787 node_address,
788 NetworkRegistryStatus::Denied,
789 )));
790 }
791 HoprNetworkRegistryEvents::Deregistered(deregistered) => {
792 debug!(
793 node_address = %deregistered.nodeAddress,
794 "on_network_registry_deregistered_event",
795 );
796 let node_address: Address = deregistered.nodeAddress.into();
797 self.db
798 .set_access_in_network_registry(Some(tx), node_address, false)
799 .await?;
800
801 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
802 node_address,
803 NetworkRegistryStatus::Denied,
804 )));
805 }
806 HoprNetworkRegistryEvents::RegisteredByManager(registered) => {
807 let node_address: Address = registered.nodeAddress.into();
808 debug!(
809 %node_address,
810 "Node registered by manager, adding to the network registry",
811 );
812 self.db
813 .set_access_in_network_registry(Some(tx), node_address, true)
814 .await?;
815
816 if node_address == self.chain_key.public().to_address() {
817 info!(
818 "This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/."
819 );
820 }
821
822 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
823 node_address,
824 NetworkRegistryStatus::Allowed,
825 )));
826 }
827 HoprNetworkRegistryEvents::Registered(registered) => {
828 let node_address: Address = registered.nodeAddress.into();
829 debug!(
830 %node_address,
831 "Node registered, adding to the network registry",
832 );
833 self.db
834 .set_access_in_network_registry(Some(tx), node_address, true)
835 .await?;
836
837 if node_address == self.chain_key.public().to_address() {
838 info!(
839 "This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/."
840 );
841 }
842
843 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
844 node_address,
845 NetworkRegistryStatus::Allowed,
846 )));
847 }
848 HoprNetworkRegistryEvents::EligibilityUpdated(eligibility_updated) => {
849 debug!(
850 staking_account = %eligibility_updated.stakingAccount,
851 eligibility = %eligibility_updated.eligibility,
852 "Node eligibility updated, updating the safe eligibility",
853 );
854 let account: Address = eligibility_updated.stakingAccount.into();
855 self.db
856 .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
857 .await?;
858 }
859 HoprNetworkRegistryEvents::NetworkRegistryStatusUpdated(enabled) => {
860 debug!(enabled = enabled.isEnabled, "on_network_registry_status_updated_event",);
861 self.db
862 .set_network_registry_enabled(Some(tx), enabled.isEnabled)
863 .await?;
864 }
865 _ => {
866 debug!("on_network_registry_event - not implemented for event: {:?}", event);
867 } };
869
870 Ok(None)
871 }
872
873 async fn on_node_safe_registry_event(
874 &self,
875 tx: &OpenTransaction,
876 event: HoprNodeSafeRegistryEvents,
877 _is_synced: bool,
878 ) -> Result<Option<ChainEventType>> {
879 #[cfg(all(feature = "prometheus", not(test)))]
880 METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
881
882 match event {
883 HoprNodeSafeRegistryEvents::RegisteredNodeSafe(registered) => {
884 if self.chain_key.public().to_address() == registered.nodeAddress.into() {
885 info!(safe_address = %registered.safeAddress, "Node safe registered", );
886 return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safeAddress.into())));
888 }
889 }
890 HoprNodeSafeRegistryEvents::DergisteredNodeSafe(deregistered) => {
891 if self.chain_key.public().to_address() == deregistered.nodeAddress.into() {
892 info!("Node safe unregistered");
893 }
895 }
896 HoprNodeSafeRegistryEvents::DomainSeparatorUpdated(domain_separator_updated) => {
897 self.db
898 .set_domain_separator(
899 Some(tx),
900 DomainSeparator::SafeRegistry,
901 domain_separator_updated.domainSeparator.0.into(),
902 )
903 .await?;
904 }
905 }
906
907 Ok(None)
908 }
909
/// Handles an event emitted by the node management module contract.
///
/// The event itself is ignored: apart from incrementing the log counter metric (when
/// compiled in), this always returns `Ok(None)`.
async fn on_node_management_module_event(
    &self,
    _db: &OpenTransaction,
    _event: HoprNodeManagementModuleEvents,
    _is_synced: bool,
) -> Result<Option<ChainEventType>> {
    #[cfg(all(feature = "prometheus", not(test)))]
    METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);

    Ok(None)
}
922
923 async fn on_ticket_winning_probability_oracle_event(
924 &self,
925 tx: &OpenTransaction,
926 event: HoprWinningProbabilityOracleEvents,
927 _is_synced: bool,
928 ) -> Result<Option<ChainEventType>> {
929 #[cfg(all(feature = "prometheus", not(test)))]
930 METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
931
932 match event {
933 HoprWinningProbabilityOracleEvents::WinProbUpdated(update) => {
934 let old_minimum_win_prob: WinningProbability = update.oldWinProb.to_be_bytes().into();
935 let new_minimum_win_prob: WinningProbability = update.newWinProb.to_be_bytes().into();
936
937 trace!(
938 %old_minimum_win_prob,
939 %new_minimum_win_prob,
940 "on_ticket_minimum_win_prob_updated",
941 );
942
943 self.db
944 .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
945 .await?;
946
947 info!(
948 %old_minimum_win_prob,
949 %new_minimum_win_prob,
950 "minimum ticket winning probability updated"
951 );
952
953 if old_minimum_win_prob.approx_cmp(&new_minimum_win_prob).is_lt() {
956 let mut selector: Option<TicketSelector> = None;
957 for channel in self.db.get_incoming_channels(tx.into()).await? {
958 selector = selector
959 .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
960 .or_else(|| Some(TicketSelector::from(channel)));
961 }
962 if let Some(selector) = selector {
964 let num_rejected = self
965 .db
966 .mark_tickets_as(
967 selector.with_winning_probability(..new_minimum_win_prob),
968 TicketMarker::Rejected,
969 )
970 .await?;
971 info!(
972 count = num_rejected,
973 "unredeemed tickets were rejected, because the minimum winning probability has been \
974 increased"
975 );
976 }
977 }
978 }
979 _ => {
980 }
982 }
983 Ok(None)
984 }
985
986 async fn on_ticket_price_oracle_event(
987 &self,
988 tx: &OpenTransaction,
989 event: HoprTicketPriceOracleEvents,
990 _is_synced: bool,
991 ) -> Result<Option<ChainEventType>> {
992 #[cfg(all(feature = "prometheus", not(test)))]
993 METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
994
995 match event {
996 HoprTicketPriceOracleEvents::TicketPriceUpdated(update) => {
997 trace!(
998 old = update._0.to_string(),
999 new = update._1.to_string(),
1000 "on_ticket_price_updated",
1001 );
1002
1003 self.db
1004 .update_ticket_price(Some(tx), HoprBalance::from_be_bytes(update._1.to_be_bytes::<32>()))
1005 .await?;
1006
1007 info!(price = %update._1, "ticket price updated");
1008 }
1009 HoprTicketPriceOracleEvents::OwnershipTransferred(_event) => {
1010 }
1012 }
1013 Ok(None)
1014 }
1015
1016 #[tracing::instrument(level = "debug", skip(self, slog), fields(log=%slog))]
1017 async fn process_log_event(
1018 &self,
1019 tx: &OpenTransaction,
1020 slog: SerializableLog,
1021 is_synced: bool,
1022 ) -> Result<Option<ChainEventType>> {
1023 trace!(log = %slog, "log content");
1024
1025 let log = Log::from(slog.clone());
1026
1027 let primitive_log = alloy::primitives::Log::new(
1028 slog.address.into(),
1029 slog.topics.iter().map(|h| B256::from_slice(h.as_ref())).collect(),
1030 slog.data.clone().into(),
1031 )
1032 .ok_or_else(|| {
1033 CoreEthereumIndexerError::ProcessError(format!("failed to convert log to primitive log: {slog:?}"))
1034 })?;
1035
1036 if log.address.eq(&self.addresses.announcements) {
1037 let bn = log.block_number as u32;
1038 let event = HoprAnnouncementsEvents::decode_log(&primitive_log)?;
1039 self.on_announcement_event(tx, event.data, bn, is_synced).await
1040 } else if log.address.eq(&self.addresses.channels) {
1041 let event = HoprChannelsEvents::decode_log(&primitive_log)?;
1042 match self.on_channel_event(tx, event.data, is_synced).await {
1043 Ok(res) => Ok(res),
1044 Err(CoreEthereumIndexerError::ChannelDoesNotExist) => {
1045 debug!(
1047 ?log,
1048 "channel didn't exist in the db. Created a corrupted channel entry and ignored event"
1049 );
1050 Ok(None)
1051 }
1052 Err(e) => Err(e),
1053 }
1054 } else if log.address.eq(&self.addresses.network_registry) {
1055 let event = HoprNetworkRegistryEvents::decode_log(&primitive_log)?;
1056 self.on_network_registry_event(tx, event.data, is_synced).await
1057 } else if log.address.eq(&self.addresses.token) {
1058 let event = HoprTokenEvents::decode_log(&primitive_log)?;
1059 self.on_token_event(tx, event.data, is_synced).await
1060 } else if log.address.eq(&self.addresses.safe_registry) {
1061 let event = HoprNodeSafeRegistryEvents::decode_log(&primitive_log)?;
1062 self.on_node_safe_registry_event(tx, event.data, is_synced).await
1063 } else if log.address.eq(&self.addresses.module_implementation) {
1064 let event = HoprNodeManagementModuleEvents::decode_log(&primitive_log)?;
1065 self.on_node_management_module_event(tx, event.data, is_synced).await
1066 } else if log.address.eq(&self.addresses.price_oracle) {
1067 let event = HoprTicketPriceOracleEvents::decode_log(&primitive_log)?;
1068 self.on_ticket_price_oracle_event(tx, event.data, is_synced).await
1069 } else if log.address.eq(&self.addresses.win_prob_oracle) {
1070 let event = HoprWinningProbabilityOracleEvents::decode_log(&primitive_log)?;
1071 self.on_ticket_winning_probability_oracle_event(tx, event.data, is_synced)
1072 .await
1073 } else {
1074 #[cfg(all(feature = "prometheus", not(test)))]
1075 METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
1076
1077 error!(
1078 address = %log.address, log = ?log,
1079 "on_event error - unknown contract address, received log"
1080 );
1081 return Err(CoreEthereumIndexerError::UnknownContract(log.address));
1082 }
1083 }
1084}
1085
1086#[async_trait]
1087impl<T, Db> crate::traits::ChainLogHandler for ContractEventHandlers<T, Db>
1088where
1089 T: HoprIndexerRpcOperations + Clone + Send + Sync + 'static,
1090 Db: HoprDbAllOperations + Clone + Debug + Send + Sync + 'static,
1091{
1092 fn contract_addresses(&self) -> Vec<Address> {
1093 vec![
1094 self.addresses.announcements,
1095 self.addresses.channels,
1096 self.addresses.module_implementation,
1097 self.addresses.network_registry,
1098 self.addresses.price_oracle,
1099 self.addresses.win_prob_oracle,
1100 self.addresses.safe_registry,
1101 self.addresses.token,
1102 ]
1103 }
1104
1105 fn contract_addresses_map(&self) -> Arc<ContractAddresses> {
1106 self.addresses.clone()
1107 }
1108
1109 fn safe_address(&self) -> Address {
1110 self.safe_address
1111 }
1112
1113 fn contract_address_topics(&self, contract: Address) -> Vec<B256> {
1114 if contract.eq(&self.addresses.announcements) {
1115 crate::constants::topics::announcement()
1116 } else if contract.eq(&self.addresses.channels) {
1117 crate::constants::topics::channel()
1118 } else if contract.eq(&self.addresses.module_implementation) {
1119 crate::constants::topics::module_implementation()
1120 } else if contract.eq(&self.addresses.network_registry) {
1121 crate::constants::topics::network_registry()
1122 } else if contract.eq(&self.addresses.price_oracle) {
1123 crate::constants::topics::ticket_price_oracle()
1124 } else if contract.eq(&self.addresses.win_prob_oracle) {
1125 crate::constants::topics::winning_prob_oracle()
1126 } else if contract.eq(&self.addresses.safe_registry) {
1127 crate::constants::topics::node_safe_registry()
1128 } else {
1129 panic!("use of unsupported contract address: {contract}");
1130 }
1131 }
1132
1133 async fn collect_log_event(&self, slog: SerializableLog, is_synced: bool) -> Result<Option<SignificantChainEvent>> {
1134 let myself = self.clone();
1135 self.db
1136 .begin_transaction()
1137 .await?
1138 .perform(move |tx| {
1139 let log = slog.clone();
1140 let tx_hash = Hash::from(log.tx_hash);
1141 let log_id = log.log_index;
1142 let block_id = log.block_number;
1143
1144 Box::pin(async move {
1145 match myself.process_log_event(tx, log, is_synced).await {
1146 Ok(Some(event_type)) => {
1148 let significant_event = SignificantChainEvent { tx_hash, event_type };
1149 debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
1150 Ok(Some(significant_event))
1151 }
1152 Ok(None) => {
1153 debug!(block_id, %tx_hash, log_id, "no significant event in log");
1154 Ok(None)
1155 }
1156 Err(error) => {
1157 error!(block_id, %tx_hash, log_id, %error, "error processing log in tx");
1158 Err(error)
1159 }
1160 }
1161 })
1162 })
1163 .await
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use std::{
1170 sync::{Arc, atomic::Ordering},
1171 time::SystemTime,
1172 };
1173
1174 use alloy::{
1175 dyn_abi::DynSolValue,
1176 primitives::{Address as AlloyAddress, U256},
1177 sol_types::{SolEvent, SolValue},
1178 };
1179 use anyhow::{Context, anyhow};
1180 use hex_literal::hex;
1181 use hopr_chain_rpc::HoprIndexerRpcOperations;
1182 use hopr_chain_types::{
1183 ContractAddresses,
1184 chain_events::{ChainEventType, NetworkRegistryStatus},
1185 };
1186 use hopr_crypto_types::prelude::*;
1187 use hopr_db_sql::{
1188 HoprDbAllOperations, HoprDbGeneralModelOperations,
1189 accounts::{ChainOrPacketKey, HoprDbAccountOperations},
1190 api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
1191 channels::HoprDbChannelOperations,
1192 corrupted_channels::HoprDbCorruptedChannelOperations,
1193 db::HoprDb,
1194 info::HoprDbInfoOperations,
1195 prelude::HoprDbResolverOperations,
1196 registry::HoprDbRegistryOperations,
1197 };
1198 use hopr_internal_types::prelude::*;
1199 use hopr_primitive_types::prelude::*;
1200 use multiaddr::Multiaddr;
1201 use primitive_types::H256;
1202
1203 use super::ContractEventHandlers;
1204
// Shared fixtures for the tests in this module: key material and fixed contract addresses.
lazy_static::lazy_static! {
    // Off-chain keypair of the node under test, built from a fixed secret.
    static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
    // Randomly generated chain keypair (and derived address) of a counterparty.
    static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
    static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
    // Randomly generated chain keypair (and derived address) of the node under test.
    static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
    static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
    // Address used as the handlers' Safe address in `init_handlers`.
    static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
    // Fixed contract addresses used to build the handlers' `ContractAddresses`.
    static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible");
    static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible");
    static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible");
    static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible");
    static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible");
    static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible");
    static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible");
    static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible");
    static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible");
}
1222
// Generates `MockIndexerRpcOperations`, a mockall-based mock of `HoprIndexerRpcOperations`
// on which tests can set call expectations.
mockall::mock! {
    pub(crate) IndexerRpcOperations {}

    #[async_trait::async_trait]
    impl HoprIndexerRpcOperations for IndexerRpcOperations {
        async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;

        async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

        async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;

        async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

        // The returned stream borrows from the mock for `'a`.
        fn try_stream_logs<'a>(
            &'a self,
            start_block_number: u64,
            filters: hopr_chain_rpc::FilterSet,
            is_synced: bool,
        ) -> hopr_chain_rpc::errors::Result<std::pin::Pin<Box<dyn futures::Stream<Item=hopr_chain_rpc::BlockWithLogs> + Send + 'a> > >;
    }
}
1244
/// Cloneable wrapper around [`MockIndexerRpcOperations`].
///
/// The mock is held behind an [`Arc`], so every clone shares the same expectations.
#[derive(Clone)]
pub struct ClonableMockOperations {
    /// The shared mock that all trait calls are forwarded to.
    pub inner: Arc<MockIndexerRpcOperations>,
}
1249
// Forwards every `HoprIndexerRpcOperations` call unchanged to the wrapped mock.
#[async_trait::async_trait]
impl HoprIndexerRpcOperations for ClonableMockOperations {
    /// Delegates to the inner mock's `block_number`.
    async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64> {
        self.inner.block_number().await
    }

    /// Delegates to the inner mock's `get_hopr_allowance`.
    async fn get_hopr_allowance(
        &self,
        owner: Address,
        spender: Address,
    ) -> hopr_chain_rpc::errors::Result<HoprBalance> {
        self.inner.get_hopr_allowance(owner, spender).await
    }

    /// Delegates to the inner mock's `get_xdai_balance`.
    async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance> {
        self.inner.get_xdai_balance(address).await
    }

    /// Delegates to the inner mock's `get_hopr_balance`.
    async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance> {
        self.inner.get_hopr_balance(address).await
    }

    /// Delegates to the inner mock's `try_stream_logs`.
    fn try_stream_logs<'a>(
        &'a self,
        start_block_number: u64,
        filters: hopr_chain_rpc::FilterSet,
        is_synced: bool,
    ) -> hopr_chain_rpc::errors::Result<
        std::pin::Pin<Box<dyn futures::Stream<Item = hopr_chain_rpc::BlockWithLogs> + Send + 'a>>,
    > {
        self.inner.try_stream_logs(start_block_number, filters, is_synced)
    }
}
1283
1284 fn init_handlers<T: Clone, Db: HoprDbAllOperations + Clone>(
1285 rpc_operations: T,
1286 db: Db,
1287 ) -> ContractEventHandlers<T, Db> {
1288 ContractEventHandlers {
1289 addresses: Arc::new(ContractAddresses {
1290 channels: *CHANNELS_ADDR,
1291 token: *TOKEN_ADDR,
1292 network_registry: *NETWORK_REGISTRY_ADDR,
1293 network_registry_proxy: Default::default(),
1294 safe_registry: *NODE_SAFE_REGISTRY_ADDR,
1295 announcements: *ANNOUNCEMENTS_ADDR,
1296 module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
1297 price_oracle: *TICKET_PRICE_ORACLE_ADDR,
1298 win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
1299 stake_factory: Default::default(),
1300 }),
1301 chain_key: SELF_CHAIN_KEY.clone(),
1302 safe_address: *STAKE_ADDRESS,
1303 db,
1304 rpc_operations,
1305 }
1306 }
1307
1308 fn test_log() -> SerializableLog {
1309 SerializableLog { ..Default::default() }
1310 }
1311
1312 #[tokio::test]
1313 async fn announce_keybinding() -> anyhow::Result<()> {
1314 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1315
1316 let rpc_operations = MockIndexerRpcOperations::new();
1317 let clonable_rpc_operations = ClonableMockOperations {
1319 inner: Arc::new(rpc_operations),
1320 };
1321
1322 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1323
1324 let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
1325
1326 let keybinding_log = SerializableLog {
1327 address: handlers.addresses.announcements,
1328 topics: vec![
1329 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::KeyBinding::SIGNATURE_HASH.into(),
1330 ],
1331 data: DynSolValue::Tuple(vec![
1332 DynSolValue::Bytes(keybinding.signature.as_ref().to_vec()),
1333 DynSolValue::Bytes(keybinding.packet_key.as_ref().to_vec()),
1334 DynSolValue::FixedBytes(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()).into_word(), 32),
1335 ])
1336 .abi_encode_packed(),
1337 ..test_log()
1338 };
1339
1340 let account_entry = AccountEntry {
1341 public_key: *SELF_PRIV_KEY.public(),
1342 chain_addr: *SELF_CHAIN_ADDRESS,
1343 entry_type: AccountType::NotAnnounced,
1344 published_at: 0,
1345 };
1346
1347 let event_type = db
1348 .begin_transaction()
1349 .await?
1350 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log, true).await }))
1351 .await?;
1352
1353 assert!(event_type.is_none(), "keybinding does not have a chain event type");
1354
1355 assert_eq!(
1356 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1357 .await?
1358 .context("a value should be present")?,
1359 account_entry
1360 );
1361 Ok(())
1362 }
1363
1364 #[tokio::test]
1365 async fn announce_address_announcement() -> anyhow::Result<()> {
1366 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1367 let rpc_operations = MockIndexerRpcOperations::new();
1368 let clonable_rpc_operations = ClonableMockOperations {
1370 inner: Arc::new(rpc_operations),
1372 };
1373 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1374
1375 let account_entry = AccountEntry {
1377 public_key: *SELF_PRIV_KEY.public(),
1378 chain_addr: *SELF_CHAIN_ADDRESS,
1379 entry_type: AccountType::NotAnnounced,
1380 published_at: 1,
1381 };
1382 db.insert_account(None, account_entry.clone()).await?;
1383
1384 let test_multiaddr_empty: Multiaddr = "".parse()?;
1385
1386 let address_announcement_empty_log_encoded_data = DynSolValue::Tuple(vec![
1387 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1388 DynSolValue::String(test_multiaddr_empty.to_string()),
1389 ])
1390 .abi_encode();
1391
1392 let address_announcement_empty_log = SerializableLog {
1393 address: handlers.addresses.announcements,
1394 topics: vec![
1395 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1396 .into(),
1397 ],
1398 data: address_announcement_empty_log_encoded_data[32..].into(),
1399 ..test_log()
1400 };
1401
1402 let handlers_clone = handlers.clone();
1403 let event_type = db
1404 .begin_transaction()
1405 .await?
1406 .perform(|tx| {
1407 Box::pin(async move {
1408 handlers_clone
1409 .process_log_event(tx, address_announcement_empty_log, true)
1410 .await
1411 })
1412 })
1413 .await?;
1414
1415 assert!(
1416 event_type.is_none(),
1417 "announcement of empty multiaddresses must pass through"
1418 );
1419
1420 assert_eq!(
1421 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1422 .await?
1423 .context("a value should be present")?,
1424 account_entry
1425 );
1426
1427 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1428
1429 let address_announcement_log_encoded_data = DynSolValue::Tuple(vec![
1430 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1431 DynSolValue::String(test_multiaddr.to_string()),
1432 ])
1433 .abi_encode();
1434
1435 let address_announcement_log = SerializableLog {
1436 address: handlers.addresses.announcements,
1437 block_number: 1,
1438 topics: vec![
1439 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1440 .into(),
1441 ],
1442 data: address_announcement_log_encoded_data[32..].into(),
1443 ..test_log()
1444 };
1445
1446 let announced_account_entry = AccountEntry {
1447 public_key: *SELF_PRIV_KEY.public(),
1448 chain_addr: *SELF_CHAIN_ADDRESS,
1449 entry_type: AccountType::Announced {
1450 multiaddr: test_multiaddr.clone(),
1451 updated_block: 1,
1452 },
1453 published_at: 1,
1454 };
1455
1456 let handlers_clone = handlers.clone();
1457 let event_type = db
1458 .begin_transaction()
1459 .await?
1460 .perform(|tx| {
1461 Box::pin(async move {
1462 handlers_clone
1463 .process_log_event(tx, address_announcement_log, true)
1464 .await
1465 })
1466 })
1467 .await?;
1468
1469 assert!(
1470 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1471 "must return the latest announce multiaddress"
1472 );
1473
1474 assert_eq!(
1475 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1476 .await?
1477 .context("a value should be present")?,
1478 announced_account_entry
1479 );
1480
1481 assert_eq!(
1482 Some(*SELF_CHAIN_ADDRESS),
1483 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1484 "must resolve correct chain key"
1485 );
1486
1487 assert_eq!(
1488 Some(*SELF_PRIV_KEY.public()),
1489 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1490 "must resolve correct packet key"
1491 );
1492
1493 let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1494
1495 let address_announcement_dns_log_encoded_data = DynSolValue::Tuple(vec![
1496 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1497 DynSolValue::String(test_multiaddr_dns.to_string()),
1498 ])
1499 .abi_encode();
1500
1501 let address_announcement_dns_log = SerializableLog {
1502 address: handlers.addresses.announcements,
1503 block_number: 2,
1504 topics: vec![
1505 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1506 .into(),
1507 ],
1508 data: address_announcement_dns_log_encoded_data[32..].into(),
1509 ..test_log()
1510 };
1511
1512 let announced_dns_account_entry = AccountEntry {
1513 public_key: *SELF_PRIV_KEY.public(),
1514 chain_addr: *SELF_CHAIN_ADDRESS,
1515 entry_type: AccountType::Announced {
1516 multiaddr: test_multiaddr_dns.clone(),
1517 updated_block: 2,
1518 },
1519 published_at: 1,
1520 };
1521
1522 let event_type = db
1523 .begin_transaction()
1524 .await?
1525 .perform(|tx| {
1526 Box::pin(async move { handlers.process_log_event(tx, address_announcement_dns_log, true).await })
1527 })
1528 .await?;
1529
1530 assert!(
1531 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1532 "must return the latest announce multiaddress"
1533 );
1534
1535 assert_eq!(
1536 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1537 .await?
1538 .context("a value should be present")?,
1539 announced_dns_account_entry
1540 );
1541
1542 assert_eq!(
1543 Some(*SELF_CHAIN_ADDRESS),
1544 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1545 "must resolve correct chain key"
1546 );
1547
1548 assert_eq!(
1549 Some(*SELF_PRIV_KEY.public()),
1550 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1551 "must resolve correct packet key"
1552 );
1553 Ok(())
1554 }
1555
1556 #[tokio::test]
1557 async fn announce_revoke() -> anyhow::Result<()> {
1558 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1559 let rpc_operations = MockIndexerRpcOperations::new();
1560 let clonable_rpc_operations = ClonableMockOperations {
1562 inner: Arc::new(rpc_operations),
1564 };
1565 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1566
1567 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1568
1569 let announced_account_entry = AccountEntry {
1571 public_key: *SELF_PRIV_KEY.public(),
1572 chain_addr: *SELF_CHAIN_ADDRESS,
1573 entry_type: AccountType::Announced {
1574 multiaddr: test_multiaddr,
1575 updated_block: 0,
1576 },
1577 published_at: 1,
1578 };
1579
1580 db.insert_account(None, announced_account_entry).await?;
1581
1582 let encoded_data = (AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()),).abi_encode();
1583
1584 let revoke_announcement_log = SerializableLog {
1585 address: handlers.addresses.announcements,
1586 topics: vec![
1587 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::RevokeAnnouncement::SIGNATURE_HASH
1588 .into(),
1589 ],
1590 data: encoded_data,
1591 ..test_log()
1592 };
1593
1594 let account_entry = AccountEntry {
1595 public_key: *SELF_PRIV_KEY.public(),
1596 chain_addr: *SELF_CHAIN_ADDRESS,
1597 entry_type: AccountType::NotAnnounced,
1598 published_at: 1,
1599 };
1600
1601 let event_type = db
1602 .begin_transaction()
1603 .await?
1604 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log, true).await }))
1605 .await?;
1606
1607 assert!(
1608 event_type.is_none(),
1609 "revoke announcement does not have chain event type"
1610 );
1611
1612 assert_eq!(
1613 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1614 .await?
1615 .context("a value should be present")?,
1616 account_entry
1617 );
1618 Ok(())
1619 }
1620
1621 #[tokio::test]
1622 async fn on_token_transfer_to() -> anyhow::Result<()> {
1623 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1624
1625 let value = U256::MAX;
1626 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
1627 value.to_be_bytes_vec().as_slice(),
1628 ));
1629
1630 let mut rpc_operations = MockIndexerRpcOperations::new();
1631 rpc_operations
1632 .expect_get_hopr_balance()
1633 .times(1)
1634 .return_once(move |_| Ok(target_hopr_balance));
1635 rpc_operations
1636 .expect_get_hopr_allowance()
1637 .times(1)
1638 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1639 let clonable_rpc_operations = ClonableMockOperations {
1640 inner: Arc::new(rpc_operations),
1641 };
1642
1643 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1644
1645 let encoded_data = (value).abi_encode();
1646
1647 let transferred_log = SerializableLog {
1648 address: handlers.addresses.token,
1649 topics: vec![
1650 hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1651 H256::from_slice(&Address::default().to_bytes32()).into(),
1652 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1653 ],
1654 data: encoded_data,
1655 ..test_log()
1656 };
1657
1658 let event_type = db
1659 .begin_transaction()
1660 .await?
1661 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1662 .await?;
1663
1664 assert!(event_type.is_none(), "token transfer does not have chain event type");
1665
1666 assert_eq!(db.get_safe_hopr_balance(None).await?, target_hopr_balance);
1667
1668 Ok(())
1669 }
1670
1671 #[test_log::test(tokio::test)]
1672 async fn on_token_transfer_from() -> anyhow::Result<()> {
1673 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1674
1675 let mut rpc_operations = MockIndexerRpcOperations::new();
1676 rpc_operations
1677 .expect_get_hopr_balance()
1678 .times(1)
1679 .return_once(|_| Ok(HoprBalance::zero()));
1680 rpc_operations
1681 .expect_get_hopr_allowance()
1682 .times(1)
1683 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1684 let clonable_rpc_operations = ClonableMockOperations {
1685 inner: Arc::new(rpc_operations),
1686 };
1687
1688 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1689
1690 let value = U256::MAX;
1691
1692 let encoded_data = (value).abi_encode();
1693
1694 db.set_safe_hopr_balance(
1695 None,
1696 HoprBalance::from(primitive_types::U256::from_big_endian(
1697 value.to_be_bytes_vec().as_slice(),
1698 )),
1699 )
1700 .await?;
1701
1702 let transferred_log = SerializableLog {
1703 address: handlers.addresses.token,
1704 topics: vec![
1705 hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1706 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1707 H256::from_slice(&Address::default().to_bytes32()).into(),
1708 ],
1709 data: encoded_data,
1710 ..test_log()
1711 };
1712
1713 let event_type = db
1714 .begin_transaction()
1715 .await?
1716 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1717 .await?;
1718
1719 assert!(event_type.is_none(), "token transfer does not have chain event type");
1720
1721 assert_eq!(db.get_safe_hopr_balance(None).await?, HoprBalance::zero());
1722
1723 Ok(())
1724 }
1725
1726 #[tokio::test]
1727 async fn on_token_approval_correct() -> anyhow::Result<()> {
1728 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1729
1730 let target_allowance = HoprBalance::from(primitive_types::U256::from(1000u64));
1731 let mut rpc_operations = MockIndexerRpcOperations::new();
1732 rpc_operations
1733 .expect_get_hopr_allowance()
1734 .times(2)
1735 .returning(move |_, _| Ok(target_allowance));
1736 let clonable_rpc_operations = ClonableMockOperations {
1737 inner: Arc::new(rpc_operations),
1738 };
1739
1740 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1741
1742 let encoded_data = (U256::from(1000u64)).abi_encode();
1743
1744 let approval_log = SerializableLog {
1745 address: handlers.addresses.token,
1746 topics: vec![
1747 hopr_bindings::hoprtoken::HoprToken::Approval::SIGNATURE_HASH.into(),
1748 H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1749 H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1750 ],
1751 data: encoded_data,
1752 ..test_log()
1753 };
1754
1755 assert_eq!(db.get_safe_hopr_allowance(None).await?, HoprBalance::zero());
1757
1758 let approval_log_clone = approval_log.clone();
1759 let handlers_clone = handlers.clone();
1760 let event_type = db
1761 .begin_transaction()
1762 .await?
1763 .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone, true).await }))
1764 .await?;
1765
1766 assert!(event_type.is_none(), "token approval does not have chain event type");
1767
1768 assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance.clone());
1770
1771 let _ = db
1773 .set_safe_hopr_allowance(None, HoprBalance::from(primitive_types::U256::from(10u64)))
1774 .await;
1775 assert_eq!(
1776 db.get_safe_hopr_allowance(None).await?,
1777 HoprBalance::from(primitive_types::U256::from(10u64))
1778 );
1779
1780 let handlers_clone = handlers.clone();
1781 let event_type = db
1782 .begin_transaction()
1783 .await?
1784 .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log, true).await }))
1785 .await?;
1786
1787 assert!(event_type.is_none(), "token approval does not have chain event type");
1788
1789 assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance);
1790 Ok(())
1791 }
1792
1793 #[tokio::test]
1794 async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1795 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1796 let rpc_operations = MockIndexerRpcOperations::new();
1797 let clonable_rpc_operations = ClonableMockOperations {
1799 inner: Arc::new(rpc_operations),
1801 };
1802 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1803
1804 db.set_network_registry_enabled(None, true).await?;
1805
1806 let encoded_data = ().abi_encode();
1807
1808 let registered_log = SerializableLog {
1809 address: handlers.addresses.network_registry,
1810 topics: vec![
1811 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Registered::SIGNATURE_HASH.into(),
1812 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1813 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1814 ],
1815 data: encoded_data,
1816 ..test_log()
1818 };
1819
1820 assert!(
1821 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1822 .await?
1823 );
1824
1825 let event_type = db
1826 .begin_transaction()
1827 .await?
1828 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1829 .await?;
1830
1831 assert!(
1832 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1833 "must return correct NR update"
1834 );
1835
1836 assert!(
1837 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1838 .await?,
1839 "must be allowed in NR"
1840 );
1841 Ok(())
1842 }
1843
1844 #[tokio::test]
1845 async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1846 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1847 let rpc_operations = MockIndexerRpcOperations::new();
1848 let clonable_rpc_operations = ClonableMockOperations {
1850 inner: Arc::new(rpc_operations),
1852 };
1853 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1854
1855 db.set_network_registry_enabled(None, true).await?;
1856
1857 let registered_log = SerializableLog {
1858 address: handlers.addresses.network_registry,
1859 topics: vec![
1860 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::RegisteredByManager::SIGNATURE_HASH.into(),
1861 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1863 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1864 ],
1865 data: ().abi_encode(),
1866 ..test_log()
1868 };
1869
1870 assert!(
1871 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1872 .await?
1873 );
1874
1875 let event_type = db
1876 .begin_transaction()
1877 .await?
1878 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1879 .await?;
1880
1881 assert!(
1882 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1883 "must return correct NR update"
1884 );
1885
1886 assert!(
1887 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1888 .await?,
1889 "must be allowed in NR"
1890 );
1891 Ok(())
1892 }
1893
1894 #[tokio::test]
1895 async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1896 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1897 let rpc_operations = MockIndexerRpcOperations::new();
1898 let clonable_rpc_operations = ClonableMockOperations {
1900 inner: Arc::new(rpc_operations),
1902 };
1903 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1904
1905 db.set_network_registry_enabled(None, true).await?;
1906
1907 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1908 .await?;
1909
1910 let encoded_data = ().abi_encode();
1911
1912 let registered_log = SerializableLog {
1913 address: handlers.addresses.network_registry,
1914 topics: vec![
1915 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Deregistered::SIGNATURE_HASH.into(),
1916 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1917 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1918 ],
1919 data: encoded_data,
1920 ..test_log()
1921 };
1922
1923 assert!(
1924 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1925 .await?
1926 );
1927
1928 let event_type = db
1929 .begin_transaction()
1930 .await?
1931 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1932 .await?;
1933
1934 assert!(
1935 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1936 "must return correct NR update"
1937 );
1938
1939 assert!(
1940 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1941 .await?,
1942 "must not be allowed in NR"
1943 );
1944 Ok(())
1945 }
1946
1947 #[tokio::test]
1948 async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1949 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1950 let rpc_operations = MockIndexerRpcOperations::new();
1951 let clonable_rpc_operations = ClonableMockOperations {
1953 inner: Arc::new(rpc_operations),
1955 };
1956 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1957
1958 db.set_network_registry_enabled(None, true).await?;
1959
1960 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1961 .await?;
1962
1963 let encoded_data = ().abi_encode();
1964
1965 let registered_log = SerializableLog {
1966 address: handlers.addresses.network_registry,
1967 topics: vec![
1968 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::DeregisteredByManager::SIGNATURE_HASH.into(),
1969 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1971 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1972 ],
1973 data: encoded_data,
1974 ..test_log()
1975 };
1976
1977 assert!(
1978 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1979 .await?
1980 );
1981
1982 let event_type = db
1983 .begin_transaction()
1984 .await?
1985 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1986 .await?;
1987
1988 assert!(
1989 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1990 "must return correct NR update"
1991 );
1992
1993 assert!(
1994 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1995 .await?,
1996 "must not be allowed in NR"
1997 );
1998 Ok(())
1999 }
2000
2001 #[tokio::test]
2002 async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
2003 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2004 let rpc_operations = MockIndexerRpcOperations::new();
2005 let clonable_rpc_operations = ClonableMockOperations {
2007 inner: Arc::new(rpc_operations),
2009 };
2010 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2011
2012 let encoded_data = ().abi_encode();
2013
2014 let nr_enabled = SerializableLog {
2015 address: handlers.addresses.network_registry,
2016 topics: vec![
2017 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2018 .into(),
2019 H256::from_low_u64_be(1).into(),
2021 ],
2022 data: encoded_data,
2023 ..test_log()
2024 };
2025
2026 let event_type = db
2027 .begin_transaction()
2028 .await?
2029 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled, false).await }))
2030 .await?;
2031
2032 assert!(event_type.is_none(), "there's no chain event type for nr disable");
2033
2034 assert!(db.get_indexer_data(None).await?.nr_enabled);
2035 Ok(())
2036 }
2037
2038 #[tokio::test]
2039 async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
2040 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2041 let rpc_operations = MockIndexerRpcOperations::new();
2042 let clonable_rpc_operations = ClonableMockOperations {
2044 inner: Arc::new(rpc_operations),
2046 };
2047 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2048
2049 db.set_network_registry_enabled(None, true).await?;
2050
2051 let encoded_data = ().abi_encode();
2052
2053 let nr_disabled = SerializableLog {
2054 address: handlers.addresses.network_registry,
2055 topics: vec![
2056 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2057 .into(),
2058 H256::from_low_u64_be(0).into(),
2060 ],
2061 data: encoded_data,
2062 ..test_log()
2063 };
2064
2065 let event_type = db
2066 .begin_transaction()
2067 .await?
2068 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled, false).await }))
2069 .await?;
2070
2071 assert!(event_type.is_none(), "there's no chain event type for nr enable");
2072
2073 assert!(!db.get_indexer_data(None).await?.nr_enabled);
2074 Ok(())
2075 }
2076
2077 #[tokio::test]
2078 async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
2079 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2080 let rpc_operations = MockIndexerRpcOperations::new();
2081 let clonable_rpc_operations = ClonableMockOperations {
2083 inner: Arc::new(rpc_operations),
2085 };
2086 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2087
2088 let encoded_data = ().abi_encode();
2089
2090 let set_eligible = SerializableLog {
2091 address: handlers.addresses.network_registry,
2092 topics: vec![
2093 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2094 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2096 H256::from_low_u64_be(1).into(),
2097 ],
2098 data: encoded_data,
2099 ..test_log()
2100 };
2101
2102 let event_type = db
2103 .begin_transaction()
2104 .await?
2105 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, false).await }))
2106 .await?;
2107
2108 assert!(
2109 event_type.is_none(),
2110 "there's no chain event type for setting nr eligibility"
2111 );
2112
2113 assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2114
2115 Ok(())
2116 }
2117
2118 #[tokio::test]
2119 async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
2120 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2121 let rpc_operations = MockIndexerRpcOperations::new();
2122 let clonable_rpc_operations = ClonableMockOperations {
2124 inner: Arc::new(rpc_operations),
2126 };
2127 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2128
2129 db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
2130
2131 let encoded_data = ().abi_encode();
2132
2133 let set_eligible = SerializableLog {
2134 address: handlers.addresses.network_registry,
2135 topics: vec![
2136 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2137 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2139 H256::from_low_u64_be(0).into(),
2140 ],
2141 data: encoded_data,
2142 ..test_log()
2143 };
2144
2145 let event_type = db
2146 .begin_transaction()
2147 .await?
2148 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, true).await }))
2149 .await?;
2150
2151 assert!(
2152 event_type.is_none(),
2153 "there's no chain event type for unsetting nr eligibility"
2154 );
2155
2156 assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2157
2158 Ok(())
2159 }
2160
2161 #[tokio::test]
2162 async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
2163 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2164
2165 let value = U256::MAX;
2166 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2167 value.to_be_bytes_vec().as_slice(),
2168 ));
2169
2170 let mut rpc_operations = MockIndexerRpcOperations::new();
2171 rpc_operations
2172 .expect_get_hopr_balance()
2173 .times(1)
2174 .return_once(move |_| Ok(target_hopr_balance));
2175 rpc_operations
2176 .expect_get_hopr_allowance()
2177 .times(1)
2178 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
2179 let clonable_rpc_operations = ClonableMockOperations {
2180 inner: Arc::new(rpc_operations),
2181 };
2182 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2183
2184 let channel = ChannelEntry::new(
2185 *SELF_CHAIN_ADDRESS,
2186 *COUNTERPARTY_CHAIN_ADDRESS,
2187 0.into(),
2188 primitive_types::U256::zero(),
2189 ChannelStatus::Open,
2190 primitive_types::U256::one(),
2191 );
2192
2193 db.upsert_channel(None, channel).await?;
2194
2195 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2196 let diff = solidity_balance - channel.balance;
2197
2198 let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2199
2200 let balance_increased_log = SerializableLog {
2201 address: handlers.addresses.channels,
2202 topics: vec![
2203 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2204 H256::from_slice(channel.get_id().as_ref()).into(),
2206 ],
2207 data: encoded_data,
2208 ..test_log()
2209 };
2210
2211 let event_type = db
2212 .begin_transaction()
2213 .await?
2214 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2215 .await?;
2216
2217 let channel = db
2218 .get_channel_by_id(None, &channel.get_id())
2219 .await?
2220 .context("a value should be present")?;
2221
2222 assert!(
2223 matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
2224 "must return updated channel entry and balance diff"
2225 );
2226
2227 assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2228 Ok(())
2229 }
2230
2231 #[tokio::test]
2232 async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
2233 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2234 let rpc_operations = MockIndexerRpcOperations::new();
2235 let clonable_rpc_operations = ClonableMockOperations {
2237 inner: Arc::new(rpc_operations),
2239 };
2240 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2241
2242 let separator = Hash::from(hopr_crypto_random::random_bytes());
2243
2244 let encoded_data = ().abi_encode();
2245
2246 let channels_dst_updated = SerializableLog {
2247 address: handlers.addresses.channels,
2248 topics: vec![
2249 hopr_bindings::hoprchannels::HoprChannels::DomainSeparatorUpdated::SIGNATURE_HASH.into(),
2250 H256::from_slice(separator.as_ref()).into(),
2252 ],
2253 data: encoded_data,
2254 ..test_log()
2255 };
2256
2257 assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
2258
2259 let event_type = db
2260 .begin_transaction()
2261 .await?
2262 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated, true).await }))
2263 .await?;
2264
2265 assert!(
2266 event_type.is_none(),
2267 "there's no chain event type for channel dst update"
2268 );
2269
2270 assert_eq!(
2271 separator,
2272 db.get_indexer_data(None)
2273 .await?
2274 .channels_dst
2275 .context("a value should be present")?,
2276 "separator must be updated"
2277 );
2278 Ok(())
2279 }
2280
2281 #[tokio::test]
2282 async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
2283 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2284
2285 let value = U256::MAX;
2286 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2287 value.to_be_bytes_vec().as_slice(),
2288 ));
2289
2290 let mut rpc_operations = MockIndexerRpcOperations::new();
2291 rpc_operations
2292 .expect_get_hopr_balance()
2293 .times(1)
2294 .return_once(move |_| Ok(target_hopr_balance));
2295 let clonable_rpc_operations = ClonableMockOperations {
2296 inner: Arc::new(rpc_operations),
2297 };
2298 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2299
2300 let channel = ChannelEntry::new(
2301 *SELF_CHAIN_ADDRESS,
2302 *COUNTERPARTY_CHAIN_ADDRESS,
2303 HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2304 primitive_types::U256::zero(),
2305 ChannelStatus::Open,
2306 primitive_types::U256::one(),
2307 );
2308
2309 db.upsert_channel(None, channel).await?;
2310
2311 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 2).into();
2312 let diff = channel.balance - solidity_balance;
2313
2314 let encoded_data = DynSolValue::Tuple(vec![DynSolValue::Uint(
2316 U256::from_be_slice(&solidity_balance.amount().to_be_bytes()),
2317 256,
2318 )])
2319 .abi_encode();
2320
2321 let balance_decreased_log = SerializableLog {
2322 address: handlers.addresses.channels,
2323 topics: vec![
2324 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceDecreased::SIGNATURE_HASH.into(),
2325 H256::from_slice(channel.get_id().as_ref()).into(),
2327 ],
2328 data: encoded_data,
2329 ..test_log()
2330 };
2331
2332 let event_type = db
2333 .begin_transaction()
2334 .await?
2335 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log, true).await }))
2336 .await?;
2337
2338 let channel = db
2339 .get_channel_by_id(None, &channel.get_id())
2340 .await?
2341 .context("a value should be present")?;
2342
2343 assert!(
2344 matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
2345 "must return updated channel entry and balance diff"
2346 );
2347
2348 assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2349 Ok(())
2350 }
2351
2352 #[tokio::test]
2353 async fn on_channel_closed() -> anyhow::Result<()> {
2354 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2355 let rpc_operations = MockIndexerRpcOperations::new();
2356 let clonable_rpc_operations = ClonableMockOperations {
2358 inner: Arc::new(rpc_operations),
2360 };
2361 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2362
2363 let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2364
2365 let channel = ChannelEntry::new(
2366 *SELF_CHAIN_ADDRESS,
2367 *COUNTERPARTY_CHAIN_ADDRESS,
2368 starting_balance,
2369 primitive_types::U256::zero(),
2370 ChannelStatus::Open,
2371 primitive_types::U256::one(),
2372 );
2373
2374 db.upsert_channel(None, channel).await?;
2375
2376 let encoded_data = ().abi_encode();
2377
2378 let channel_closed_log = SerializableLog {
2379 address: handlers.addresses.channels,
2380 topics: vec![
2381 hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2382 H256::from_slice(channel.get_id().as_ref()).into(),
2384 ],
2385 data: encoded_data,
2386 ..test_log()
2387 };
2388
2389 let event_type = db
2390 .begin_transaction()
2391 .await?
2392 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2393 .await?;
2394
2395 let closed_channel = db
2396 .get_channel_by_id(None, &channel.get_id())
2397 .await?
2398 .context("a value should be present")?;
2399
2400 assert!(
2401 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
2402 "must return the updated channel entry"
2403 );
2404
2405 assert_eq!(closed_channel.status, ChannelStatus::Closed);
2406 assert_eq!(closed_channel.ticket_index, 0u64.into());
2407 assert_eq!(
2408 0,
2409 db.get_outgoing_ticket_index(closed_channel.get_id())
2410 .await?
2411 .load(Ordering::Relaxed)
2412 );
2413
2414 assert!(closed_channel.balance.amount().eq(&primitive_types::U256::zero()));
2415 Ok(())
2416 }
2417
2418 #[tokio::test]
2419 async fn on_foreign_channel_closed() -> anyhow::Result<()> {
2420 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2421 let rpc_operations = MockIndexerRpcOperations::new();
2422 let clonable_rpc_operations = ClonableMockOperations {
2424 inner: Arc::new(rpc_operations),
2426 };
2427 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2428
2429 let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2430
2431 let channel = ChannelEntry::new(
2432 Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
2433 Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
2434 starting_balance,
2435 primitive_types::U256::zero(),
2436 ChannelStatus::Open,
2437 primitive_types::U256::one(),
2438 );
2439
2440 db.upsert_channel(None, channel).await?;
2441
2442 let encoded_data = ().abi_encode();
2443
2444 let channel_closed_log = SerializableLog {
2445 address: handlers.addresses.channels,
2446 topics: vec![
2447 hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2448 H256::from_slice(channel.get_id().as_ref()).into(),
2450 ],
2451 data: encoded_data,
2452 ..test_log()
2453 };
2454
2455 let event_type = db
2456 .begin_transaction()
2457 .await?
2458 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2459 .await?;
2460
2461 let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
2462
2463 assert_eq!(None, closed_channel, "foreign channel must be deleted");
2464
2465 assert!(
2466 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
2467 "must return the closed channel entry"
2468 );
2469
2470 Ok(())
2471 }
2472
2473 #[tokio::test]
2474 async fn on_channel_opened() -> anyhow::Result<()> {
2475 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2476 let rpc_operations = MockIndexerRpcOperations::new();
2477 let clonable_rpc_operations = ClonableMockOperations {
2479 inner: Arc::new(rpc_operations),
2481 };
2482 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2483
2484 let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2485
2486 let encoded_data = ().abi_encode();
2487
2488 let channel_opened_log = SerializableLog {
2489 address: handlers.addresses.channels,
2490 topics: vec![
2491 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2492 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2494 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2495 ],
2496 data: encoded_data,
2497 ..test_log()
2498 };
2499
2500 let event_type = db
2501 .begin_transaction()
2502 .await?
2503 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2504 .await?;
2505
2506 let channel = db
2507 .get_channel_by_id(None, &channel_id)
2508 .await?
2509 .context("a value should be present")?;
2510
2511 assert!(
2512 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2513 "must return the updated channel entry"
2514 );
2515
2516 assert_eq!(channel.status, ChannelStatus::Open);
2517 assert_eq!(channel.channel_epoch, 1u64.into());
2518 assert_eq!(channel.ticket_index, 0u64.into());
2519 assert_eq!(
2520 0,
2521 db.get_outgoing_ticket_index(channel.get_id())
2522 .await?
2523 .load(Ordering::Relaxed)
2524 );
2525 Ok(())
2526 }
2527
2528 #[tokio::test]
2529 async fn on_channel_reopened() -> anyhow::Result<()> {
2530 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2531 let rpc_operations = MockIndexerRpcOperations::new();
2532 let clonable_rpc_operations = ClonableMockOperations {
2534 inner: Arc::new(rpc_operations),
2536 };
2537 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2538
2539 let channel = ChannelEntry::new(
2540 *SELF_CHAIN_ADDRESS,
2541 *COUNTERPARTY_CHAIN_ADDRESS,
2542 HoprBalance::zero(),
2543 primitive_types::U256::zero(),
2544 ChannelStatus::Closed,
2545 3.into(),
2546 );
2547
2548 db.upsert_channel(None, channel).await?;
2549
2550 let encoded_data = ().abi_encode();
2551
2552 let channel_opened_log = SerializableLog {
2553 address: handlers.addresses.channels,
2554 topics: vec![
2555 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2556 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2558 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2559 ],
2560 data: encoded_data,
2561 ..test_log()
2562 };
2563
2564 let event_type = db
2565 .begin_transaction()
2566 .await?
2567 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2568 .await?;
2569
2570 let channel = db
2571 .get_channel_by_id(None, &channel.get_id())
2572 .await?
2573 .context("a value should be present")?;
2574
2575 assert!(
2576 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2577 "must return the updated channel entry"
2578 );
2579
2580 assert_eq!(channel.status, ChannelStatus::Open);
2581 assert_eq!(channel.channel_epoch, 4u64.into());
2582 assert_eq!(channel.ticket_index, 0u64.into());
2583
2584 assert_eq!(
2585 0,
2586 db.get_outgoing_ticket_index(channel.get_id())
2587 .await?
2588 .load(Ordering::Relaxed)
2589 );
2590 Ok(())
2591 }
2592
2593 #[tokio::test]
2594 async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2595 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2596 let rpc_operations = MockIndexerRpcOperations::new();
2597 let clonable_rpc_operations = ClonableMockOperations {
2599 inner: Arc::new(rpc_operations),
2601 };
2602 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2603
2604 let channel = ChannelEntry::new(
2605 *SELF_CHAIN_ADDRESS,
2606 *COUNTERPARTY_CHAIN_ADDRESS,
2607 0.into(),
2608 primitive_types::U256::zero(),
2609 ChannelStatus::Open,
2610 3.into(),
2611 );
2612
2613 db.upsert_channel(None, channel).await?;
2614
2615 let encoded_data = ().abi_encode();
2616
2617 let channel_opened_log = SerializableLog {
2618 address: handlers.addresses.channels,
2619 topics: vec![
2620 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2621 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2623 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2624 ],
2625 data: encoded_data,
2626 ..test_log()
2627 };
2628
2629 db.begin_transaction()
2630 .await?
2631 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2632 .await
2633 .context("Channel should stay open, with corrupted flag set")?;
2634
2635 assert!(
2636 db.get_channel_by_id(None, &channel.get_id()).await?.is_none(),
2637 "channel should not be returned as marked as corrupted",
2638 );
2639
2640 db.get_corrupted_channel_by_id(None, &channel.get_id())
2641 .await?
2642 .context("a value should be present")?;
2643
2644 Ok(())
2645 }
2646
2647 #[tokio::test]
2648 async fn event_for_non_existing_channel_should_create_corrupted_channel() -> anyhow::Result<()> {
2649 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2650 let rpc_operations = MockIndexerRpcOperations::new();
2651 let clonable_rpc_operations = ClonableMockOperations {
2653 inner: Arc::new(rpc_operations),
2655 };
2656 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2657
2658 let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2659
2660 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2662
2663 let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2664
2665 let balance_increased_log = SerializableLog {
2666 address: handlers.addresses.channels,
2667 topics: vec![
2668 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2669 H256::from_slice(channel_id.as_ref()).into(),
2671 ],
2672 data: encoded_data,
2673 ..test_log()
2674 };
2675
2676 db.begin_transaction()
2677 .await?
2678 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2679 .await?;
2680
2681 db.get_corrupted_channel_by_id(None, &channel_id)
2683 .await?
2684 .context("channel should be set a corrupted")?;
2685
2686 Ok(())
2687 }
2688
/// Base price used when building mock tickets; `mock_acknowledged_ticket` divides it by the
/// winning probability to obtain the ticket amount.
const PRICE_PER_PACKET: u32 = 20;
2690
2691 fn mock_acknowledged_ticket(
2692 signer: &ChainKeypair,
2693 destination: &ChainKeypair,
2694 index: u64,
2695 win_prob: f64,
2696 ) -> anyhow::Result<AcknowledgedTicket> {
2697 let channel_id = generate_channel_id(&signer.into(), &destination.into());
2698
2699 let channel_epoch = 1u64;
2700 let domain_separator = Hash::default();
2701
2702 let response = Response::try_from(
2703 Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2704 )?;
2705
2706 Ok(TicketBuilder::default()
2707 .direction(&signer.into(), &destination.into())
2708 .amount(primitive_types::U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2709 .index(index)
2710 .index_offset(1)
2711 .win_prob(win_prob.try_into()?)
2712 .channel_epoch(1)
2713 .challenge(response.to_challenge()?)
2714 .build_signed(signer, &domain_separator)?
2715 .into_acknowledged(response))
2716 }
2717
2718 #[tokio::test]
2719 async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2720 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2721 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2722 .await?;
2723 let rpc_operations = MockIndexerRpcOperations::new();
2724 let clonable_rpc_operations = ClonableMockOperations {
2726 inner: Arc::new(rpc_operations),
2728 };
2729 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2730
2731 let channel = ChannelEntry::new(
2732 *COUNTERPARTY_CHAIN_ADDRESS,
2733 *SELF_CHAIN_ADDRESS,
2734 HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2735 primitive_types::U256::zero(),
2736 ChannelStatus::Open,
2737 primitive_types::U256::one(),
2738 );
2739
2740 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2741 let next_ticket_index = ticket_index + 1;
2742
2743 let mut ticket =
2744 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2745 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2746
2747 let ticket_value = ticket.verified_ticket().amount;
2748
2749 db.upsert_channel(None, channel).await?;
2750 db.upsert_ticket(None, ticket.clone()).await?;
2751
2752 let ticket_redeemed_log = SerializableLog {
2753 address: handlers.addresses.channels,
2754 topics: vec![
2755 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2756 H256::from_slice(channel.get_id().as_ref()).into(),
2758 ],
2759 data: DynSolValue::Tuple(vec![DynSolValue::Uint(
2760 U256::from_be_bytes(next_ticket_index.to_be_bytes()),
2761 48,
2762 )])
2763 .abi_encode(),
2764 ..test_log()
2765 };
2766
2767 let outgoing_ticket_index_before = db
2768 .get_outgoing_ticket_index(channel.get_id())
2769 .await?
2770 .load(Ordering::Relaxed);
2771
2772 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2773 assert_eq!(
2774 HoprBalance::zero(),
2775 stats.redeemed_value,
2776 "there should not be any redeemed value"
2777 );
2778 assert_eq!(
2779 HoprBalance::zero(),
2780 stats.neglected_value,
2781 "there should not be any neglected value"
2782 );
2783
2784 let event_type = db
2785 .begin_transaction()
2786 .await?
2787 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2788 .await?;
2789
2790 let channel = db
2791 .get_channel_by_id(None, &channel.get_id())
2792 .await?
2793 .context("a value should be present")?;
2794
2795 assert!(
2796 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2797 "must return the updated channel entry and the redeemed ticket"
2798 );
2799
2800 assert_eq!(
2801 channel.ticket_index, next_ticket_index,
2802 "channel entry must contain next ticket index"
2803 );
2804
2805 let outgoing_ticket_index_after = db
2806 .get_outgoing_ticket_index(channel.get_id())
2807 .await?
2808 .load(Ordering::Relaxed);
2809
2810 assert_eq!(
2811 outgoing_ticket_index_before, outgoing_ticket_index_after,
2812 "outgoing ticket index must not change"
2813 );
2814
2815 let tickets = db.get_tickets((&channel).into()).await?;
2816
2817 assert!(tickets.is_empty(), "there should not be any tickets left");
2818
2819 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2820 assert_eq!(
2821 ticket_value, stats.redeemed_value,
2822 "there should be redeemed value worth 1 ticket"
2823 );
2824 assert_eq!(
2825 HoprBalance::zero(),
2826 stats.neglected_value,
2827 "there should not be any neglected ticket"
2828 );
2829 Ok(())
2830 }
2831
2832 #[tokio::test]
2833 async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2834 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2835 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2836 .await?;
2837 let rpc_operations = MockIndexerRpcOperations::new();
2838 let clonable_rpc_operations = ClonableMockOperations {
2840 inner: Arc::new(rpc_operations),
2842 };
2843 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2844
2845 let channel = ChannelEntry::new(
2846 *COUNTERPARTY_CHAIN_ADDRESS,
2847 *SELF_CHAIN_ADDRESS,
2848 primitive_types::U256::from((1u128 << 96) - 1).into(),
2849 primitive_types::U256::zero(),
2850 ChannelStatus::Open,
2851 primitive_types::U256::one(),
2852 );
2853
2854 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2855 let next_ticket_index = ticket_index + 1;
2856
2857 let mut ticket =
2858 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2859 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2860
2861 let ticket_value = ticket.verified_ticket().amount;
2862
2863 db.upsert_channel(None, channel).await?;
2864 db.upsert_ticket(None, ticket.clone()).await?;
2865
2866 let old_ticket =
2867 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2868 db.upsert_ticket(None, old_ticket.clone()).await?;
2869
2870 let ticket_redeemed_log = SerializableLog {
2871 address: handlers.addresses.channels,
2872 topics: vec![
2873 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2874 H256::from_slice(channel.get_id().as_ref()).into(),
2876 ],
2877 data: Vec::from(next_ticket_index.to_be_bytes()),
2878 ..test_log()
2879 };
2880
2881 let outgoing_ticket_index_before = db
2882 .get_outgoing_ticket_index(channel.get_id())
2883 .await?
2884 .load(Ordering::Relaxed);
2885
2886 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2887 assert_eq!(
2888 HoprBalance::zero(),
2889 stats.redeemed_value,
2890 "there should not be any redeemed value"
2891 );
2892 assert_eq!(
2893 HoprBalance::zero(),
2894 stats.neglected_value,
2895 "there should not be any neglected value"
2896 );
2897
2898 let event_type = db
2899 .begin_transaction()
2900 .await?
2901 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2902 .await?;
2903
2904 let channel = db
2905 .get_channel_by_id(None, &channel.get_id())
2906 .await?
2907 .context("a value should be present")?;
2908
2909 assert!(
2910 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2911 "must return the updated channel entry and the redeemed ticket"
2912 );
2913
2914 assert_eq!(
2915 channel.ticket_index, next_ticket_index,
2916 "channel entry must contain next ticket index"
2917 );
2918
2919 let outgoing_ticket_index_after = db
2920 .get_outgoing_ticket_index(channel.get_id())
2921 .await?
2922 .load(Ordering::Relaxed);
2923
2924 assert_eq!(
2925 outgoing_ticket_index_before, outgoing_ticket_index_after,
2926 "outgoing ticket index must not change"
2927 );
2928
2929 let tickets = db.get_tickets((&channel).into()).await?;
2930 assert!(tickets.is_empty(), "there should not be any tickets left");
2931
2932 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2933 assert_eq!(
2934 ticket_value, stats.redeemed_value,
2935 "there should be redeemed value worth 1 ticket"
2936 );
2937 assert_eq!(
2938 ticket_value, stats.neglected_value,
2939 "there should neglected value worth 1 ticket"
2940 );
2941 Ok(())
2942 }
2943
2944 #[tokio::test]
2945 async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2946 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2947 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2948 .await?;
2949 let rpc_operations = MockIndexerRpcOperations::new();
2950 let clonable_rpc_operations = ClonableMockOperations {
2952 inner: Arc::new(rpc_operations),
2954 };
2955 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2956
2957 let channel = ChannelEntry::new(
2958 *SELF_CHAIN_ADDRESS,
2959 *COUNTERPARTY_CHAIN_ADDRESS,
2960 primitive_types::U256::from((1u128 << 96) - 1).into(),
2961 primitive_types::U256::zero(),
2962 ChannelStatus::Open,
2963 primitive_types::U256::one(),
2964 );
2965
2966 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2967 let next_ticket_index = ticket_index + 1;
2968
2969 db.upsert_channel(None, channel).await?;
2970
2971 let ticket_redeemed_log = SerializableLog {
2972 address: handlers.addresses.channels,
2973 topics: vec![
2974 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2975 H256::from_slice(channel.get_id().as_ref()).into(),
2977 ],
2978 data: Vec::from(next_ticket_index.to_be_bytes()),
2979 ..test_log()
2980 };
2981
2982 let event_type = db
2983 .begin_transaction()
2984 .await?
2985 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2986 .await?;
2987
2988 let channel = db
2989 .get_channel_by_id(None, &channel.get_id())
2990 .await?
2991 .context("a value should be present")?;
2992
2993 assert!(
2994 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2995 "must return update channel entry and no ticket"
2996 );
2997
2998 assert_eq!(
2999 channel.ticket_index, next_ticket_index,
3000 "channel entry must contain next ticket index"
3001 );
3002
3003 let outgoing_ticket_index = db
3004 .get_outgoing_ticket_index(channel.get_id())
3005 .await?
3006 .load(Ordering::Relaxed);
3007
3008 assert!(
3009 outgoing_ticket_index >= ticket_index.as_u64(),
3010 "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
3011 );
3012 assert_eq!(
3013 outgoing_ticket_index,
3014 next_ticket_index.as_u64(),
3015 "outgoing ticket index must be equal to next ticket index"
3016 );
3017 Ok(())
3018 }
3019
3020 #[tokio::test]
3021 async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
3022 {
3023 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3024 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
3025 .await?;
3026 let rpc_operations = MockIndexerRpcOperations::new();
3027 let clonable_rpc_operations = ClonableMockOperations {
3029 inner: Arc::new(rpc_operations),
3031 };
3032 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3033
3034 let channel = ChannelEntry::new(
3035 *COUNTERPARTY_CHAIN_ADDRESS,
3036 *SELF_CHAIN_ADDRESS,
3037 primitive_types::U256::from((1u128 << 96) - 1).into(),
3038 primitive_types::U256::zero(),
3039 ChannelStatus::Open,
3040 primitive_types::U256::one(),
3041 );
3042
3043 db.upsert_channel(None, channel).await?;
3044
3045 let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3046
3047 let ticket_redeemed_log = SerializableLog {
3048 address: handlers.addresses.channels,
3049 topics: vec![
3050 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3051 H256::from_slice(channel.get_id().as_ref()).into(),
3053 ],
3054 data: Vec::from(next_ticket_index.to_be_bytes()),
3055 ..test_log()
3056 };
3057
3058 let event_type = db
3059 .begin_transaction()
3060 .await?
3061 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3062 .await?;
3063
3064 let channel = db
3065 .get_channel_by_id(None, &channel.get_id())
3066 .await?
3067 .context("a value should be present")?;
3068
3069 assert!(
3070 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3071 "must return updated channel entry and no ticket"
3072 );
3073
3074 assert_eq!(
3075 channel.ticket_index, next_ticket_index,
3076 "channel entry must contain next ticket index"
3077 );
3078 Ok(())
3079 }
3080
3081 #[tokio::test]
3082 async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
3083 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3084 let rpc_operations = MockIndexerRpcOperations::new();
3085 let clonable_rpc_operations = ClonableMockOperations {
3087 inner: Arc::new(rpc_operations),
3089 };
3090 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3091
3092 let channel = ChannelEntry::new(
3093 Address::from(hopr_crypto_random::random_bytes()),
3094 Address::from(hopr_crypto_random::random_bytes()),
3095 primitive_types::U256::from((1u128 << 96) - 1).into(),
3096 primitive_types::U256::zero(),
3097 ChannelStatus::Open,
3098 primitive_types::U256::one(),
3099 );
3100
3101 db.upsert_channel(None, channel).await?;
3102
3103 let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3104
3105 let ticket_redeemed_log = SerializableLog {
3106 address: handlers.addresses.channels,
3107 topics: vec![
3108 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3109 H256::from_slice(channel.get_id().as_ref()).into(),
3111 ],
3112 data: Vec::from(next_ticket_index.to_be_bytes()),
3113 ..test_log()
3114 };
3115
3116 let event_type = db
3117 .begin_transaction()
3118 .await?
3119 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3120 .await?;
3121
3122 let channel = db
3123 .get_channel_by_id(None, &channel.get_id())
3124 .await?
3125 .context("a value should be present")?;
3126
3127 assert!(
3128 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3129 "must return updated channel entry and no ticket"
3130 );
3131
3132 assert_eq!(
3133 channel.ticket_index, next_ticket_index,
3134 "channel entry must contain next ticket index"
3135 );
3136 Ok(())
3137 }
3138
3139 #[tokio::test]
3140 async fn on_channel_closure_initiated() -> anyhow::Result<()> {
3141 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3142 let rpc_operations = MockIndexerRpcOperations::new();
3143 let clonable_rpc_operations = ClonableMockOperations {
3145 inner: Arc::new(rpc_operations),
3147 };
3148 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3149
3150 let channel = ChannelEntry::new(
3151 *SELF_CHAIN_ADDRESS,
3152 *COUNTERPARTY_CHAIN_ADDRESS,
3153 primitive_types::U256::from((1u128 << 96) - 1).into(),
3154 primitive_types::U256::zero(),
3155 ChannelStatus::Open,
3156 primitive_types::U256::one(),
3157 );
3158
3159 db.upsert_channel(None, channel).await?;
3160
3161 let timestamp = SystemTime::now();
3162
3163 let encoded_data = (U256::from(timestamp.as_unix_timestamp().as_secs())).abi_encode();
3164
3165 let closure_initiated_log = SerializableLog {
3166 address: handlers.addresses.channels,
3167 topics: vec![
3168 hopr_bindings::hoprchannels::HoprChannels::OutgoingChannelClosureInitiated::SIGNATURE_HASH.into(),
3169 H256::from_slice(channel.get_id().as_ref()).into(),
3171 ],
3172 data: encoded_data,
3173 ..test_log()
3175 };
3176
3177 let event_type = db
3178 .begin_transaction()
3179 .await?
3180 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log, true).await }))
3181 .await?;
3182
3183 let channel = db
3184 .get_channel_by_id(None, &channel.get_id())
3185 .await?
3186 .context("a value should be present")?;
3187
3188 assert!(
3189 matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
3190 "must return updated channel entry"
3191 );
3192
3193 assert_eq!(
3194 channel.status,
3195 ChannelStatus::PendingToClose(timestamp),
3196 "channel status must match"
3197 );
3198 Ok(())
3199 }
3200
3201 #[tokio::test]
3202 async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
3203 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3204 let rpc_operations = MockIndexerRpcOperations::new();
3205 let clonable_rpc_operations = ClonableMockOperations {
3207 inner: Arc::new(rpc_operations),
3209 };
3210 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3211
3212 let encoded_data = ().abi_encode();
3213
3214 let safe_registered_log = SerializableLog {
3215 address: handlers.addresses.safe_registry,
3216 topics: vec![
3217 hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::RegisteredNodeSafe::SIGNATURE_HASH.into(),
3218 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3220 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3221 ],
3222 data: encoded_data,
3223 ..test_log()
3224 };
3225
3226 let event_type = db
3227 .begin_transaction()
3228 .await?
3229 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3230 .await?;
3231
3232 assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
3233
3234 Ok(())
3236 }
3237
3238 #[tokio::test]
3239 async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
3240 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3241 let rpc_operations = MockIndexerRpcOperations::new();
3242 let clonable_rpc_operations = ClonableMockOperations {
3244 inner: Arc::new(rpc_operations),
3246 };
3247 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3248
3249 let encoded_data = ().abi_encode();
3252
3253 let safe_registered_log = SerializableLog {
3254 address: handlers.addresses.safe_registry,
3255 topics: vec![
3256 hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::DergisteredNodeSafe::SIGNATURE_HASH.into(),
3257 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3259 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3260 ],
3261 data: encoded_data,
3262 ..test_log()
3263 };
3264
3265 let event_type = db
3266 .begin_transaction()
3267 .await?
3268 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3269 .await?;
3270
3271 assert!(
3272 event_type.is_none(),
3273 "there's no associated chain event type with safe deregistration"
3274 );
3275
3276 Ok(())
3278 }
3279
3280 #[tokio::test]
3281 async fn ticket_price_update() -> anyhow::Result<()> {
3282 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3283 let rpc_operations = MockIndexerRpcOperations::new();
3284 let clonable_rpc_operations = ClonableMockOperations {
3286 inner: Arc::new(rpc_operations),
3288 };
3289 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3290
3291 let encoded_data = (U256::from(1u64), U256::from(123u64)).abi_encode();
3292
3293 let price_change_log = SerializableLog {
3294 address: handlers.addresses.price_oracle,
3295 topics: vec![
3296 hopr_bindings::hoprticketpriceoracle::HoprTicketPriceOracle::TicketPriceUpdated::SIGNATURE_HASH.into(),
3297 ],
3299 data: encoded_data,
3300 ..test_log()
3302 };
3303
3304 assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
3305
3306 let event_type = db
3307 .begin_transaction()
3308 .await?
3309 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log, true).await }))
3310 .await?;
3311
3312 assert!(
3313 event_type.is_none(),
3314 "there's no associated chain event type with price oracle"
3315 );
3316
3317 assert_eq!(
3318 db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
3319 Some(primitive_types::U256::from(123u64))
3320 );
3321 Ok(())
3322 }
3323
3324 #[tokio::test]
3325 async fn minimum_win_prob_update() -> anyhow::Result<()> {
3326 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3327 let rpc_operations = MockIndexerRpcOperations::new();
3328 let clonable_rpc_operations = ClonableMockOperations {
3330 inner: Arc::new(rpc_operations),
3332 };
3333 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3334
3335 let encoded_data = (
3336 U256::from_be_slice(WinningProbability::ALWAYS.as_ref()),
3337 U256::from_be_slice(WinningProbability::try_from_f64(0.5)?.as_ref()),
3338 )
3339 .abi_encode();
3340
3341 let win_prob_change_log = SerializableLog {
3342 address: handlers.addresses.win_prob_oracle,
3343 topics: vec![
3344 hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into()],
3345 data: encoded_data,
3346 ..test_log()
3347 };
3348
3349 assert_eq!(
3350 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3351 1.0
3352 );
3353
3354 let event_type = db
3355 .begin_transaction()
3356 .await?
3357 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3358 .await?;
3359
3360 assert!(
3361 event_type.is_none(),
3362 "there's no associated chain event type with winning probability change"
3363 );
3364
3365 assert_eq!(
3366 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3367 0.5
3368 );
3369 Ok(())
3370 }
3371
3372 #[tokio::test]
3373 async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
3374 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3375 db.set_minimum_incoming_ticket_win_prob(None, 0.1.try_into()?).await?;
3376
3377 let new_minimum = 0.5;
3378 let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
3379
3380 let channel_1 = ChannelEntry::new(
3381 *COUNTERPARTY_CHAIN_ADDRESS,
3382 *SELF_CHAIN_ADDRESS,
3383 primitive_types::U256::from((1u128 << 96) - 1).into(),
3384 3_u32.into(),
3385 ChannelStatus::Open,
3386 primitive_types::U256::one(),
3387 );
3388
3389 db.upsert_channel(None, channel_1).await?;
3390
3391 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
3392 db.upsert_ticket(None, ticket).await?;
3393
3394 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
3395 db.upsert_ticket(None, ticket).await?;
3396
3397 let tickets = db.get_tickets((&channel_1).into()).await?;
3398 assert_eq!(tickets.len(), 2);
3399
3400 let other_counterparty = ChainKeypair::random();
3403 let channel_2 = ChannelEntry::new(
3404 other_counterparty.public().to_address(),
3405 *SELF_CHAIN_ADDRESS,
3406 primitive_types::U256::from((1u128 << 96) - 1).into(),
3407 3_u32.into(),
3408 ChannelStatus::Open,
3409 primitive_types::U256::one(),
3410 );
3411
3412 db.upsert_channel(None, channel_2).await?;
3413
3414 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
3415 db.upsert_ticket(None, ticket).await?;
3416
3417 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
3418 db.upsert_ticket(None, ticket).await?;
3419
3420 let tickets = db.get_tickets((&channel_2).into()).await?;
3421 assert_eq!(tickets.len(), 2);
3422
3423 let stats = db.get_ticket_statistics(None).await?;
3424 assert_eq!(HoprBalance::zero(), stats.rejected_value);
3425
3426 let rpc_operations = MockIndexerRpcOperations::new();
3427 let clonable_rpc_operations = ClonableMockOperations {
3429 inner: Arc::new(rpc_operations),
3431 };
3432 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3433
3434 let encoded_data = (
3435 U256::from_be_slice(WinningProbability::try_from(0.1)?.as_ref()),
3436 U256::from_be_slice(WinningProbability::try_from(new_minimum)?.as_ref()),
3437 )
3438 .abi_encode();
3439
3440 let win_prob_change_log = SerializableLog {
3441 address: handlers.addresses.win_prob_oracle,
3442 topics: vec![
3443 hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into(),
3444 ],
3445 data: encoded_data,
3446 ..test_log()
3447 };
3448
3449 let event_type = db
3450 .begin_transaction()
3451 .await?
3452 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3453 .await?;
3454
3455 assert!(
3456 event_type.is_none(),
3457 "there's no associated chain event type with winning probability change"
3458 );
3459
3460 assert_eq!(
3461 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3462 new_minimum
3463 );
3464
3465 let tickets = db.get_tickets((&channel_1).into()).await?;
3466 assert_eq!(tickets.len(), 1);
3467
3468 let tickets = db.get_tickets((&channel_2).into()).await?;
3469 assert_eq!(tickets.len(), 0);
3470
3471 let stats = db.get_ticket_statistics(None).await?;
3472 let rejected_value: primitive_types::U256 = ticket_win_probs
3473 .iter()
3474 .filter(|p| **p < new_minimum)
3475 .map(|p| {
3476 primitive_types::U256::from(PRICE_PER_PACKET)
3477 .div_f64(*p)
3478 .expect("must divide")
3479 })
3480 .reduce(|a, b| a + b)
3481 .ok_or(anyhow!("must sum"))?;
3482
3483 assert_eq!(HoprBalance::from(rejected_value), stats.rejected_value);
3484
3485 Ok(())
3486 }
3487}