1use std::{
2 cmp::Ordering,
3 fmt::{Debug, Formatter},
4 ops::Add,
5 sync::Arc,
6 time::{Duration, SystemTime},
7};
8
9use alloy::{primitives::B256, sol_types::SolEventInterface};
10use async_trait::async_trait;
11use hopr_bindings::{
12 hoprannouncements::HoprAnnouncements::HoprAnnouncementsEvents, hoprchannels::HoprChannels::HoprChannelsEvents,
13 hoprnetworkregistry::HoprNetworkRegistry::HoprNetworkRegistryEvents,
14 hoprnodemanagementmodule::HoprNodeManagementModule::HoprNodeManagementModuleEvents,
15 hoprnodesaferegistry::HoprNodeSafeRegistry::HoprNodeSafeRegistryEvents,
16 hoprticketpriceoracle::HoprTicketPriceOracle::HoprTicketPriceOracleEvents, hoprtoken::HoprToken::HoprTokenEvents,
17 hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::HoprWinningProbabilityOracleEvents,
18};
19use hopr_chain_rpc::{HoprIndexerRpcOperations, Log};
20use hopr_chain_types::{
21 ContractAddresses,
22 chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent},
23};
24use hopr_crypto_types::prelude::*;
25use hopr_db_sql::{
26 HoprDbAllOperations, OpenTransaction,
27 api::{info::DomainSeparator, tickets::TicketSelector},
28 errors::DbSqlError,
29 prelude::TicketMarker,
30};
31use hopr_internal_types::prelude::*;
32use hopr_primitive_types::prelude::*;
33use tracing::{debug, error, info, trace, warn};
34
35use crate::errors::{CoreEthereumIndexerError, Result};
36
// Multi-counter metric of contract logs processed by the handlers in this file.
// It carries a single label, "contract", which each handler sets to a short name
// identifying the contract it serves (e.g. "channels", "token", "unknown").
// Only compiled when the `prometheus` feature is enabled and not under test.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter =
        hopr_metrics::MultiCounter::new(
            "hopr_indexer_contract_log_count",
            "Counts of different HOPR contract logs processed by the Indexer",
            &["contract"]
        ).unwrap();
}
46
/// Handlers that turn decoded on-chain contract logs into database updates and,
/// where applicable, [`ChainEventType`] values.
///
/// `T` is the RPC client used for on-chain balance and allowance look-ups and
/// `Db` is the database backend the handlers write to.
#[derive(Clone)]
pub struct ContractEventHandlers<T, Db> {
    /// Addresses of the contracts whose logs are dispatched by these handlers.
    addresses: Arc<ContractAddresses>,
    /// Address of the Safe whose HOPR balance and allowance are refreshed from chain.
    safe_address: Address,
    /// Chain key pair of this node; its address is compared against channel parties
    /// and registry entries to detect events that concern this node.
    chain_key: ChainKeypair,
    /// Database that all handlers read from and write to.
    db: Db,
    /// RPC client used to query the Safe's HOPR balance and allowance.
    rpc_operations: T,
}
65
66impl<T, Db> Debug for ContractEventHandlers<T, Db> {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("ContractEventHandler")
69 .field("addresses", &self.addresses)
70 .field("safe_address", &self.safe_address)
71 .field("chain_key", &self.chain_key)
72 .finish_non_exhaustive()
73 }
74}
75
76impl<T, Db> ContractEventHandlers<T, Db>
77where
78 T: HoprIndexerRpcOperations + Clone + Send + 'static,
79 Db: HoprDbAllOperations + Clone,
80{
81 pub fn new(
99 addresses: ContractAddresses,
100 safe_address: Address,
101 chain_key: ChainKeypair,
102 db: Db,
103 rpc_operations: T,
104 ) -> Self {
105 Self {
106 addresses: Arc::new(addresses),
107 safe_address,
108 chain_key,
109 db,
110 rpc_operations,
111 }
112 }
113
114 async fn on_announcement_event(
115 &self,
116 tx: &OpenTransaction,
117 event: HoprAnnouncementsEvents,
118 block_number: u32,
119 _is_synced: bool,
120 ) -> Result<Option<ChainEventType>> {
121 #[cfg(all(feature = "prometheus", not(test)))]
122 METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
123
124 match event {
125 HoprAnnouncementsEvents::AddressAnnouncement(address_announcement) => {
126 debug!(
127 multiaddress = &address_announcement.baseMultiaddr,
128 address = &address_announcement.node.to_string(),
129 "on_announcement_event: AddressAnnouncement",
130 );
131 if address_announcement.baseMultiaddr.is_empty() {
133 warn!(
134 address = %address_announcement.node,
135 "encountered empty multiaddress announcement",
136 );
137 return Ok(None);
138 }
139 let node_address: Address = address_announcement.node.into();
140
141 return match self
142 .db
143 .insert_announcement(
144 Some(tx),
145 node_address,
146 address_announcement.baseMultiaddr.parse()?,
147 block_number,
148 )
149 .await
150 {
151 Ok(account) => Ok(Some(ChainEventType::Announcement {
152 peer: account.public_key.into(),
153 address: account.chain_addr,
154 multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
155 })),
156 Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
157 Err(e) => Err(e.into()),
158 };
159 }
160 HoprAnnouncementsEvents::KeyBinding(key_binding) => {
161 debug!(
162 address = %key_binding.chain_key,
163 public_key = %key_binding.ed25519_pub_key,
164 "on_announcement_event: KeyBinding",
165 );
166 match KeyBinding::from_parts(
167 key_binding.chain_key.into(),
168 key_binding.ed25519_pub_key.0.try_into()?,
169 OffchainSignature::try_from((key_binding.ed25519_sig_0.0, key_binding.ed25519_sig_1.0))?,
170 ) {
171 Ok(binding) => {
172 match self
173 .db
174 .insert_account(
175 Some(tx),
176 AccountEntry {
177 public_key: binding.packet_key,
178 chain_addr: binding.chain_key,
179 entry_type: AccountType::NotAnnounced,
180 published_at: block_number,
181 },
182 )
183 .await
184 {
185 Ok(_) => (),
186 Err(err) => {
187 error!(%err, "failed to store announcement key binding")
190 }
191 }
192 }
193 Err(e) => {
194 warn!(
195 address = ?key_binding.chain_key,
196 error = %e,
197 "Filtering announcement with invalid signature",
198
199 )
200 }
201 }
202 }
203 HoprAnnouncementsEvents::RevokeAnnouncement(revocation) => {
204 let node_address: Address = revocation.node.into();
205 match self.db.delete_all_announcements(Some(tx), node_address).await {
206 Err(DbSqlError::MissingAccount) => {
207 return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding);
208 }
209 Err(e) => return Err(e.into()),
210 _ => {}
211 }
212 }
213 };
214
215 Ok(None)
216 }
217
218 async fn on_channel_event(
219 &self,
220 tx: &OpenTransaction,
221 event: HoprChannelsEvents,
222 is_synced: bool,
223 ) -> Result<Option<ChainEventType>> {
224 #[cfg(all(feature = "prometheus", not(test)))]
225 METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);
226
227 match event {
228 HoprChannelsEvents::ChannelBalanceDecreased(balance_decreased) => {
229 let channel_id = balance_decreased.channelId.0.into();
230
231 let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
232 Ok(channel) => channel,
233 Err(e) => {
234 error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_decreased_event");
235 return Err(e.into());
236 }
237 };
238
239 trace!(
240 %channel_id,
241 is_channel = maybe_channel.is_some(),
242 "on_channel_balance_decreased_event",
243 );
244
245 if let Some(channel_edits) = maybe_channel {
246 let new_balance = HoprBalance::from_be_bytes(balance_decreased.newBalance.to_be_bytes::<12>());
247 let diff = channel_edits.entry().balance - new_balance;
248
249 let updated_channel = self
250 .db
251 .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
252 .await?
253 .ok_or(CoreEthereumIndexerError::ProcessError(format!(
254 "channel balance decreased event for channel {channel_id} did not return an updated \
255 channel"
256 )))?;
257
258 if is_synced
259 && (updated_channel.source == self.chain_key.public().to_address()
260 || updated_channel.destination == self.chain_key.public().to_address())
261 {
262 info!("updating safe balance from chain after channel balance decreased event");
263 match self.rpc_operations.get_hopr_balance(self.safe_address).await {
264 Ok(balance) => {
265 self.db.set_safe_hopr_balance(Some(tx), balance).await?;
266 }
267 Err(error) => {
268 error!(%error, "error getting safe balance from chain after channel balance decreased event");
269 }
270 }
271 }
272
273 Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
274 } else {
275 error!(%channel_id, "observed balance decreased event for a channel that does not exist");
276 self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
277 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
278 }
279 }
280 HoprChannelsEvents::ChannelBalanceIncreased(balance_increased) => {
281 let channel_id = balance_increased.channelId.0.into();
282
283 let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
284 Ok(channel) => channel,
285 Err(e) => {
286 error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_increased_event");
287 return Err(e.into());
288 }
289 };
290
291 trace!(
292 %channel_id,
293 is_channel = maybe_channel.is_some(),
294 "on_channel_balance_increased_event",
295 );
296
297 if let Some(channel_edits) = maybe_channel {
298 let new_balance = HoprBalance::from_be_bytes(balance_increased.newBalance.to_be_bytes::<12>());
299 let diff = new_balance - channel_edits.entry().balance;
300
301 let updated_channel = self
302 .db
303 .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
304 .await?
305 .ok_or(CoreEthereumIndexerError::ProcessError(format!(
306 "channel balance increased event for channel {channel_id} did not return an updated \
307 channel"
308 )))?;
309
310 if updated_channel.source == self.chain_key.public().to_address() && is_synced {
311 info!("updating safe balance from chain after channel balance increased event");
312 match self.rpc_operations.get_hopr_balance(self.safe_address).await {
313 Ok(balance) => {
314 self.db.set_safe_hopr_balance(Some(tx), balance).await?;
315 }
316 Err(error) => {
317 error!(%error, "error getting safe balance from chain after channel balance increased event");
318 }
319 }
320 info!("updating safe allowance from chain after channel balance increased event");
321 match self
322 .rpc_operations
323 .get_hopr_allowance(self.safe_address, self.addresses.channels)
324 .await
325 {
326 Ok(allowance) => {
327 self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
328 }
329 Err(error) => {
330 error!(%error, "error getting safe allowance from chain after channel balance increased event");
331 }
332 }
333 }
334
335 Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
336 } else {
337 error!(%channel_id, "observed balance increased event for a channel that does not exist");
338 self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
339 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
340 }
341 }
342 HoprChannelsEvents::ChannelClosed(channel_closed) => {
343 let channel_id = channel_closed.channelId.0.into();
344
345 let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
346 Ok(channel) => channel,
347 Err(e) => {
348 error!(%channel_id, %e, "failed to begin channel update on on_channel_closed_event");
349 return Err(e.into());
350 }
351 };
352
353 trace!(
354 %channel_id,
355 is_channel = maybe_channel.is_some(),
356 "on_channel_closed_event",
357 );
358
359 if let Some(channel_edits) = maybe_channel {
360 let channel_id = channel_edits.entry().get_id();
361 let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());
362
363 let updated_channel = if let Some((direction, _)) = orientation {
366 let channel_edits = channel_edits
368 .change_status(ChannelStatus::Closed)
369 .change_balance(HoprBalance::zero())
370 .change_ticket_index(0);
371
372 let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?.ok_or(
373 CoreEthereumIndexerError::ProcessError(format!(
374 "channel closed event for channel {channel_id} did not return an updated channel",
375 )),
376 )?;
377
378 match direction {
380 ChannelDirection::Incoming => {
381 self.db
383 .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
384 .await?;
385 }
386 ChannelDirection::Outgoing => {
387 self.db.reset_outgoing_ticket_index(channel_id).await?;
389 }
390 }
391 updated_channel
392 } else {
393 let updated_channel = self
395 .db
396 .finish_channel_update(tx.into(), channel_edits.delete())
397 .await?
398 .ok_or(CoreEthereumIndexerError::ProcessError(format!(
399 "channel closed event for channel {channel_id} did not return an updated channel",
400 )))?;
401
402 debug!(%channel_id, "foreign closed closed channel was deleted");
403 updated_channel
404 };
405
406 Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
407 } else {
408 error!(%channel_id, "observed closure finalization event for a channel that does not exist.");
409 self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
410 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
411 }
412 }
413 HoprChannelsEvents::ChannelOpened(channel_opened) => {
414 let source: Address = channel_opened.source.into();
415 let destination: Address = channel_opened.destination.into();
416 let channel_id = generate_channel_id(&source, &destination);
417
418 let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
419 Ok(channel) => channel,
420 Err(e) => {
421 error!(%source, %destination, %channel_id, %e, "failed to begin channel update on on_channel_opened_event");
422 return Err(e.into());
423 }
424 };
425
426 let channel = if let Some(channel_edits) = maybe_channel {
427 if channel_edits.entry().status != ChannelStatus::Closed {
431 warn!(%source, %destination, %channel_id, "received Open event for a channel that is not Closed, marking it as corrupted");
432
433 self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
434 self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
435
436 return Ok(None);
437 }
438
439 trace!(%source, %destination, %channel_id, "on_channel_reopened_event");
440
441 let current_epoch = channel_edits.entry().channel_epoch;
442
443 if source == self.chain_key.public().to_address()
445 || destination == self.chain_key.public().to_address()
446 {
447 self.db
448 .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
449 .await?;
450
451 self.db.reset_outgoing_ticket_index(channel_id).await?;
452 }
453
454 self.db
456 .finish_channel_update(
457 tx.into(),
458 channel_edits
459 .change_ticket_index(0_u32)
460 .change_epoch(current_epoch.add(1))
461 .change_status(ChannelStatus::Open),
462 )
463 .await?
464 .ok_or(CoreEthereumIndexerError::ProcessError(format!(
465 "channel opened event for channel {channel_id} did not return an updated channel",
466 )))?
467 } else {
468 trace!(%source, %destination, %channel_id, "on_channel_opened_event");
469
470 let new_channel = ChannelEntry::new(
471 source,
472 destination,
473 0_u32.into(),
474 0_u32.into(),
475 ChannelStatus::Open,
476 1_u32.into(),
477 );
478
479 self.db.upsert_channel(tx.into(), new_channel).await?;
480 new_channel
481 };
482
483 Ok(Some(ChainEventType::ChannelOpened(channel)))
484 }
485 HoprChannelsEvents::TicketRedeemed(ticket_redeemed) => {
486 let channel_id = ticket_redeemed.channelId.0.into();
487
488 let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
489 Ok(channel) => channel,
490 Err(e) => {
491 error!(%channel_id, %e, "failed to begin channel update on on_ticket_redeemed_event");
492 return Err(e.into());
493 }
494 };
495
496 if let Some(channel_edits) = maybe_channel {
497 let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
498 Some(ChannelDirection::Incoming) => {
501 let mut matching_tickets = self
503 .db
504 .get_tickets(
505 TicketSelector::from(channel_edits.entry())
506 .with_state(AcknowledgedTicketStatus::BeingRedeemed),
507 )
508 .await?
509 .into_iter()
510 .filter(|ticket| {
511 let ticket_idx = ticket.verified_ticket().index;
516 let ticket_off = ticket.verified_ticket().index_offset as u64;
517
518 ticket_idx + ticket_off == ticket_redeemed.newTicketIndex.to::<u64>()
519 })
520 .collect::<Vec<_>>();
521
522 match matching_tickets.len().cmp(&1) {
523 Ordering::Equal => {
524 let ack_ticket = matching_tickets.pop().unwrap();
525
526 self.db
527 .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
528 .await?;
529 info!(%ack_ticket, "ticket marked as redeemed");
530 Some(ack_ticket)
531 }
532 Ordering::Less => {
533 error!(
534 idx = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
535 entry = %channel_edits.entry(),
536 "could not find acknowledged 'BeingRedeemed' ticket",
537 );
538 None
541 }
542 Ordering::Greater => {
543 error!(
544 count = matching_tickets.len(),
545 index = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
546 entry = %channel_edits.entry(),
547 "found tickets matching 'BeingRedeemed'",
548 );
549
550 let entry_str = channel_edits.entry().to_string();
551
552 self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
553 self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
554
555 return Err(CoreEthereumIndexerError::ProcessError(format!(
556 "multiple tickets matching idx {} found in {}",
557 ticket_redeemed.newTicketIndex.to::<u64>() - 1,
558 entry_str
559 )));
560 }
561 }
562 }
563 Some(ChannelDirection::Outgoing) => {
568 debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
570 self.db
571 .compare_and_set_outgoing_ticket_index(
572 channel_edits.entry().get_id(),
573 ticket_redeemed.newTicketIndex.to::<u64>(),
574 )
575 .await?;
576 None
577 }
578 None => {
580 debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
582 None
583 }
584 };
585
586 let channel = self
588 .db
589 .finish_channel_update(
590 tx.into(),
591 channel_edits.change_ticket_index(U256::from_be_bytes(
592 ticket_redeemed.newTicketIndex.to_be_bytes::<6>(),
593 )),
594 )
595 .await?
596 .ok_or(CoreEthereumIndexerError::ProcessError(format!(
597 "ticket redeemed event for channel {channel_id} did not return an updated channel",
598 )))?;
599
600 self.db
603 .mark_tickets_as(
604 TicketSelector::from(&channel)
605 .with_index_range(..ticket_redeemed.newTicketIndex.to::<u64>()),
606 TicketMarker::Neglected,
607 )
608 .await?;
609
610 Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
611 } else {
612 error!(%channel_id, "observed ticket redeem on a channel that we don't have in the DB");
613 self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
614 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
615 }
616 }
617 HoprChannelsEvents::OutgoingChannelClosureInitiated(closure_initiated) => {
618 let channel_id = closure_initiated.channelId.0.into();
619
620 let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
621 Ok(channel) => channel,
622 Err(e) => {
623 error!(%channel_id, %e, "failed to begin channel update on on_outgoing_channel_closure_initiated_event");
624 return Err(e.into());
625 }
626 };
627
628 let closure_time: u32 = closure_initiated.closureTime;
629 if let Some(channel_edits) = maybe_channel {
630 let new_status = ChannelStatus::PendingToClose(
631 SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_time.into())),
632 );
633
634 let channel = self
635 .db
636 .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
637 .await?
638 .ok_or(CoreEthereumIndexerError::ProcessError(format!(
639 "channel closure initiation event for channel {channel_id} did not return an updated \
640 channel",
641 )))?;
642
643 Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
644 } else {
645 error!(%channel_id, "observed channel closure initiation on a channel that we don't have in the DB");
646 self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
647 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
648 }
649 }
650 HoprChannelsEvents::DomainSeparatorUpdated(domain_separator_updated) => {
651 self.db
652 .set_domain_separator(
653 Some(tx),
654 DomainSeparator::Channel,
655 domain_separator_updated.domainSeparator.0.into(),
656 )
657 .await?;
658
659 Ok(None)
660 }
661 HoprChannelsEvents::LedgerDomainSeparatorUpdated(ledger_domain_separator_updated) => {
662 self.db
663 .set_domain_separator(
664 Some(tx),
665 DomainSeparator::Ledger,
666 ledger_domain_separator_updated.ledgerDomainSeparator.0.into(),
667 )
668 .await?;
669
670 Ok(None)
671 }
672 }
673 }
674
675 async fn on_token_event(
676 &self,
677 tx: &OpenTransaction,
678 event: HoprTokenEvents,
679 is_synced: bool,
680 ) -> Result<Option<ChainEventType>> {
681 #[cfg(all(feature = "prometheus", not(test)))]
682 METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
683
684 match event {
685 HoprTokenEvents::Transfer(transferred) => {
686 let from: Address = transferred.from.into();
687 let to: Address = transferred.to.into();
688
689 trace!(
690 safe_address = %&self.safe_address, %from, %to,
691 "on_token_transfer_event"
692 );
693
694 if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
695 error!(
696 safe_address = %&self.safe_address, %from, %to,
697 "filter misconfiguration: transfer event not involving the safe");
698 return Ok(None);
699 }
700
701 if is_synced {
702 info!("updating safe balance from chain after transfer event");
703 match self.rpc_operations.get_hopr_balance(self.safe_address).await {
704 Ok(balance) => {
705 self.db.set_safe_hopr_balance(Some(tx), balance).await?;
706 }
707 Err(error) => {
708 error!(%error, "error getting safe balance from chain after transfer event");
709 }
710 }
711 info!("updating safe allowance from chain after transfer event");
712 match self
713 .rpc_operations
714 .get_hopr_allowance(self.safe_address, self.addresses.channels)
715 .await
716 {
717 Ok(allowance) => {
718 self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
719 }
720 Err(error) => {
721 error!(%error, "error getting safe allowance from chain after transfer event");
722 }
723 }
724 }
725 }
726 HoprTokenEvents::Approval(approved) => {
727 let owner: Address = approved.owner.into();
728 let spender: Address = approved.spender.into();
729
730 trace!(
731 address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
732 "on_token_approval_event",
733
734 );
735
736 if owner.ne(&self.safe_address) || spender.ne(&self.addresses.channels) {
737 error!(
738 address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
739 "filter misconfiguration: approval event not involving the safe and channels contract");
740 return Ok(None);
741 }
742
743 if is_synced {
745 info!("updating safe allowance from chain after approval event");
746 match self
747 .rpc_operations
748 .get_hopr_allowance(self.safe_address, self.addresses.channels)
749 .await
750 {
751 Ok(allowance) => {
752 self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
753 }
754 Err(error) => {
755 error!(%error, "error getting safe allowance from chain after approval event");
756 }
757 }
758 }
759 }
760 _ => error!("Implement all the other filters for HoprTokenEvents"),
761 }
762
763 Ok(None)
764 }
765
766 async fn on_network_registry_event(
767 &self,
768 tx: &OpenTransaction,
769 event: HoprNetworkRegistryEvents,
770 _is_synced: bool,
771 ) -> Result<Option<ChainEventType>> {
772 #[cfg(all(feature = "prometheus", not(test)))]
773 METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
774
775 match event {
776 HoprNetworkRegistryEvents::DeregisteredByManager(deregistered) => {
777 debug!(
778 node_address = %deregistered.nodeAddress,
779 "on_network_registry_deregistered_by_manager_event",
780 );
781 let node_address: Address = deregistered.nodeAddress.into();
782 self.db
783 .set_access_in_network_registry(Some(tx), node_address, false)
784 .await?;
785
786 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
787 node_address,
788 NetworkRegistryStatus::Denied,
789 )));
790 }
791 HoprNetworkRegistryEvents::Deregistered(deregistered) => {
792 debug!(
793 node_address = %deregistered.nodeAddress,
794 "on_network_registry_deregistered_event",
795 );
796 let node_address: Address = deregistered.nodeAddress.into();
797 self.db
798 .set_access_in_network_registry(Some(tx), node_address, false)
799 .await?;
800
801 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
802 node_address,
803 NetworkRegistryStatus::Denied,
804 )));
805 }
806 HoprNetworkRegistryEvents::RegisteredByManager(registered) => {
807 let node_address: Address = registered.nodeAddress.into();
808 debug!(
809 %node_address,
810 "Node registered by manager, adding to the network registry",
811 );
812 self.db
813 .set_access_in_network_registry(Some(tx), node_address, true)
814 .await?;
815
816 if node_address == self.chain_key.public().to_address() {
817 info!(
818 "This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/."
819 );
820 }
821
822 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
823 node_address,
824 NetworkRegistryStatus::Allowed,
825 )));
826 }
827 HoprNetworkRegistryEvents::Registered(registered) => {
828 let node_address: Address = registered.nodeAddress.into();
829 debug!(
830 %node_address,
831 "Node registered, adding to the network registry",
832 );
833 self.db
834 .set_access_in_network_registry(Some(tx), node_address, true)
835 .await?;
836
837 if node_address == self.chain_key.public().to_address() {
838 info!(
839 "This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/."
840 );
841 }
842
843 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
844 node_address,
845 NetworkRegistryStatus::Allowed,
846 )));
847 }
848 HoprNetworkRegistryEvents::EligibilityUpdated(eligibility_updated) => {
849 debug!(
850 staking_account = %eligibility_updated.stakingAccount,
851 eligibility = %eligibility_updated.eligibility,
852 "Node eligibility updated, updating the safe eligibility",
853 );
854 let account: Address = eligibility_updated.stakingAccount.into();
855 self.db
856 .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
857 .await?;
858 }
859 HoprNetworkRegistryEvents::NetworkRegistryStatusUpdated(enabled) => {
860 debug!(enabled = enabled.isEnabled, "on_network_registry_status_updated_event",);
861 self.db
862 .set_network_registry_enabled(Some(tx), enabled.isEnabled)
863 .await?;
864 }
865 _ => {
866 debug!("on_network_registry_event - not implemented for event: {:?}", event);
867 } };
869
870 Ok(None)
871 }
872
873 async fn on_node_safe_registry_event(
874 &self,
875 tx: &OpenTransaction,
876 event: HoprNodeSafeRegistryEvents,
877 _is_synced: bool,
878 ) -> Result<Option<ChainEventType>> {
879 #[cfg(all(feature = "prometheus", not(test)))]
880 METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
881
882 match event {
883 HoprNodeSafeRegistryEvents::RegisteredNodeSafe(registered) => {
884 if self.chain_key.public().to_address() == registered.nodeAddress.into() {
885 info!(safe_address = %registered.safeAddress, "Node safe registered", );
886 return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safeAddress.into())));
888 }
889 }
890 HoprNodeSafeRegistryEvents::DergisteredNodeSafe(deregistered) => {
891 if self.chain_key.public().to_address() == deregistered.nodeAddress.into() {
892 info!("Node safe unregistered");
893 }
895 }
896 HoprNodeSafeRegistryEvents::DomainSeparatorUpdated(domain_separator_updated) => {
897 self.db
898 .set_domain_separator(
899 Some(tx),
900 DomainSeparator::SafeRegistry,
901 domain_separator_updated.domainSeparator.0.into(),
902 )
903 .await?;
904 }
905 }
906
907 Ok(None)
908 }
909
/// Handles events emitted by the node management module contract.
///
/// The event itself is ignored: apart from incrementing the log counter metric (when
/// metrics are compiled in), nothing is stored and no chain event is produced.
async fn on_node_management_module_event(
    &self,
    _db: &OpenTransaction,
    _event: HoprNodeManagementModuleEvents,
    _is_synced: bool,
) -> Result<Option<ChainEventType>> {
    #[cfg(all(feature = "prometheus", not(test)))]
    METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);

    // Nothing to persist or emit for these events.
    Ok(None)
}
922
923 async fn on_ticket_winning_probability_oracle_event(
924 &self,
925 tx: &OpenTransaction,
926 event: HoprWinningProbabilityOracleEvents,
927 _is_synced: bool,
928 ) -> Result<Option<ChainEventType>> {
929 #[cfg(all(feature = "prometheus", not(test)))]
930 METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
931
932 match event {
933 HoprWinningProbabilityOracleEvents::WinProbUpdated(update) => {
934 let old_minimum_win_prob: WinningProbability = update.oldWinProb.to_be_bytes().into();
935 let new_minimum_win_prob: WinningProbability = update.newWinProb.to_be_bytes().into();
936
937 trace!(
938 %old_minimum_win_prob,
939 %new_minimum_win_prob,
940 "on_ticket_minimum_win_prob_updated",
941 );
942
943 self.db
944 .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
945 .await?;
946
947 info!(
948 %old_minimum_win_prob,
949 %new_minimum_win_prob,
950 "minimum ticket winning probability updated"
951 );
952
953 if old_minimum_win_prob.approx_cmp(&new_minimum_win_prob).is_lt() {
956 let mut selector: Option<TicketSelector> = None;
957 for channel in self.db.get_incoming_channels(tx.into()).await? {
958 selector = selector
959 .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
960 .or_else(|| Some(TicketSelector::from(channel)));
961 }
962 if let Some(selector) = selector {
964 let num_rejected = self
965 .db
966 .mark_tickets_as(
967 selector.with_winning_probability(..new_minimum_win_prob),
968 TicketMarker::Rejected,
969 )
970 .await?;
971 info!(
972 count = num_rejected,
973 "unredeemed tickets were rejected, because the minimum winning probability has been \
974 increased"
975 );
976 }
977 }
978 }
979 _ => {
980 }
982 }
983 Ok(None)
984 }
985
986 async fn on_ticket_price_oracle_event(
987 &self,
988 tx: &OpenTransaction,
989 event: HoprTicketPriceOracleEvents,
990 _is_synced: bool,
991 ) -> Result<Option<ChainEventType>> {
992 #[cfg(all(feature = "prometheus", not(test)))]
993 METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
994
995 match event {
996 HoprTicketPriceOracleEvents::TicketPriceUpdated(update) => {
997 trace!(
998 old = update._0.to_string(),
999 new = update._1.to_string(),
1000 "on_ticket_price_updated",
1001 );
1002
1003 self.db
1004 .update_ticket_price(Some(tx), HoprBalance::from_be_bytes(update._1.to_be_bytes::<32>()))
1005 .await?;
1006
1007 info!(price = %update._1, "ticket price updated");
1008 }
1009 HoprTicketPriceOracleEvents::OwnershipTransferred(_event) => {
1010 }
1012 }
1013 Ok(None)
1014 }
1015
1016 #[tracing::instrument(level = "debug", skip(self, slog), fields(log=%slog))]
1017 async fn process_log_event(
1018 &self,
1019 tx: &OpenTransaction,
1020 slog: SerializableLog,
1021 is_synced: bool,
1022 ) -> Result<Option<ChainEventType>> {
1023 trace!(log = %slog, "log content");
1024
1025 let log = Log::from(slog.clone());
1026
1027 let primitive_log = alloy::primitives::Log::new(
1028 slog.address.into(),
1029 slog.topics.iter().map(|h| B256::from_slice(h.as_ref())).collect(),
1030 slog.data.clone().into(),
1031 )
1032 .ok_or_else(|| {
1033 CoreEthereumIndexerError::ProcessError(format!("failed to convert log to primitive log: {slog:?}"))
1034 })?;
1035
1036 if log.address.eq(&self.addresses.announcements) {
1037 let bn = log.block_number as u32;
1038 let event = HoprAnnouncementsEvents::decode_log(&primitive_log)?;
1039 self.on_announcement_event(tx, event.data, bn, is_synced).await
1040 } else if log.address.eq(&self.addresses.channels) {
1041 let event = HoprChannelsEvents::decode_log(&primitive_log)?;
1042 match self.on_channel_event(tx, event.data, is_synced).await {
1043 Ok(res) => Ok(res),
1044 Err(CoreEthereumIndexerError::ChannelDoesNotExist) => {
1045 debug!(
1047 ?log,
1048 "channel didn't exist in the db. Created a corrupted channel entry and ignored event"
1049 );
1050 Ok(None)
1051 }
1052 Err(e) => Err(e),
1053 }
1054 } else if log.address.eq(&self.addresses.network_registry) {
1055 let event = HoprNetworkRegistryEvents::decode_log(&primitive_log)?;
1056 self.on_network_registry_event(tx, event.data, is_synced).await
1057 } else if log.address.eq(&self.addresses.token) {
1058 let event = HoprTokenEvents::decode_log(&primitive_log)?;
1059 self.on_token_event(tx, event.data, is_synced).await
1060 } else if log.address.eq(&self.addresses.safe_registry) {
1061 let event = HoprNodeSafeRegistryEvents::decode_log(&primitive_log)?;
1062 self.on_node_safe_registry_event(tx, event.data, is_synced).await
1063 } else if log.address.eq(&self.addresses.module_implementation) {
1064 let event = HoprNodeManagementModuleEvents::decode_log(&primitive_log)?;
1065 self.on_node_management_module_event(tx, event.data, is_synced).await
1066 } else if log.address.eq(&self.addresses.price_oracle) {
1067 let event = HoprTicketPriceOracleEvents::decode_log(&primitive_log)?;
1068 self.on_ticket_price_oracle_event(tx, event.data, is_synced).await
1069 } else if log.address.eq(&self.addresses.win_prob_oracle) {
1070 let event = HoprWinningProbabilityOracleEvents::decode_log(&primitive_log)?;
1071 self.on_ticket_winning_probability_oracle_event(tx, event.data, is_synced)
1072 .await
1073 } else {
1074 #[cfg(all(feature = "prometheus", not(test)))]
1075 METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
1076
1077 error!(
1078 address = %log.address, log = ?log,
1079 "on_event error - unknown contract address, received log"
1080 );
1081 return Err(CoreEthereumIndexerError::UnknownContract(log.address));
1082 }
1083 }
1084}
1085
1086#[async_trait]
1087impl<T, Db> crate::traits::ChainLogHandler for ContractEventHandlers<T, Db>
1088where
1089 T: HoprIndexerRpcOperations + Clone + Send + Sync + 'static,
1090 Db: HoprDbAllOperations + Clone + Debug + Send + Sync + 'static,
1091{
1092 fn contract_addresses(&self) -> Vec<Address> {
1093 vec![
1094 self.addresses.announcements,
1095 self.addresses.channels,
1096 self.addresses.module_implementation,
1097 self.addresses.network_registry,
1098 self.addresses.price_oracle,
1099 self.addresses.win_prob_oracle,
1100 self.addresses.safe_registry,
1101 self.addresses.token,
1102 ]
1103 }
1104
1105 fn contract_addresses_map(&self) -> Arc<ContractAddresses> {
1106 self.addresses.clone()
1107 }
1108
1109 fn safe_address(&self) -> Address {
1110 self.safe_address
1111 }
1112
1113 fn contract_address_topics(&self, contract: Address) -> Vec<B256> {
1114 if contract.eq(&self.addresses.announcements) {
1115 crate::constants::topics::announcement()
1116 } else if contract.eq(&self.addresses.channels) {
1117 crate::constants::topics::channel()
1118 } else if contract.eq(&self.addresses.module_implementation) {
1119 crate::constants::topics::module_implementation()
1120 } else if contract.eq(&self.addresses.network_registry) {
1121 crate::constants::topics::network_registry()
1122 } else if contract.eq(&self.addresses.price_oracle) {
1123 crate::constants::topics::ticket_price_oracle()
1124 } else if contract.eq(&self.addresses.win_prob_oracle) {
1125 crate::constants::topics::winning_prob_oracle()
1126 } else if contract.eq(&self.addresses.safe_registry) {
1127 crate::constants::topics::node_safe_registry()
1128 } else {
1129 panic!("use of unsupported contract address: {contract}");
1130 }
1131 }
1132
1133 async fn collect_log_event(&self, slog: SerializableLog, is_synced: bool) -> Result<Option<SignificantChainEvent>> {
1134 let myself = self.clone();
1135 self.db
1136 .begin_transaction()
1137 .await?
1138 .perform(move |tx| {
1139 let log = slog.clone();
1140 let tx_hash = Hash::from(log.tx_hash);
1141 let log_id = log.log_index;
1142 let block_id = log.block_number;
1143
1144 Box::pin(async move {
1145 match myself.process_log_event(tx, log, is_synced).await {
1146 Ok(Some(event_type)) => {
1148 let significant_event = SignificantChainEvent { tx_hash, event_type };
1149 debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
1150 Ok(Some(significant_event))
1151 }
1152 Ok(None) => {
1153 debug!(block_id, %tx_hash, log_id, "no significant event in log");
1154 Ok(None)
1155 }
1156 Err(error) => {
1157 error!(block_id, %tx_hash, log_id, %error, "error processing log in tx");
1158 Err(error)
1159 }
1160 }
1161 })
1162 })
1163 .await
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use std::{
1170 sync::{Arc, atomic::Ordering},
1171 time::SystemTime,
1172 };
1173
1174 use alloy::{
1175 dyn_abi::DynSolValue,
1176 primitives::{Address as AlloyAddress, U256},
1177 sol_types::{SolEvent, SolValue},
1178 };
1179 use anyhow::{Context, anyhow};
1180 use hex_literal::hex;
1181 use hopr_chain_rpc::HoprIndexerRpcOperations;
1182 use hopr_chain_types::{
1183 ContractAddresses,
1184 chain_events::{ChainEventType, NetworkRegistryStatus},
1185 };
1186 use hopr_crypto_types::prelude::*;
1187 use hopr_db_sql::{
1188 HoprDbAllOperations, HoprDbGeneralModelOperations,
1189 accounts::{ChainOrPacketKey, HoprDbAccountOperations},
1190 api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
1191 channels::HoprDbChannelOperations,
1192 corrupted_channels::HoprDbCorruptedChannelOperations,
1193 db::HoprDb,
1194 info::HoprDbInfoOperations,
1195 prelude::HoprDbResolverOperations,
1196 registry::HoprDbRegistryOperations,
1197 };
1198 use hopr_internal_types::prelude::*;
1199 use hopr_primitive_types::prelude::*;
1200 use multiaddr::Multiaddr;
1201 use primitive_types::H256;
1202
1203 use super::ContractEventHandlers;
1204
1205 lazy_static::lazy_static! {
1206 static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
1207 static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1208 static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
1209 static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1210 static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
1211 static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
1212 static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible"); static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible"); static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible"); static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible"); static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible"); static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible"); static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible"); }
1222
    // Generates `MockIndexerRpcOperations`, a mockall mock implementing
    // `HoprIndexerRpcOperations`, so tests can script RPC responses.
    mockall::mock! {
        pub(crate) IndexerRpcOperations {}

        #[async_trait::async_trait]
        impl HoprIndexerRpcOperations for IndexerRpcOperations {
            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;

            async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

            async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;

            async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

            fn try_stream_logs<'a>(
                &'a self,
                start_block_number: u64,
                filters: hopr_chain_rpc::FilterSet,
                is_synced: bool,
            ) -> hopr_chain_rpc::errors::Result<std::pin::Pin<Box<dyn futures::Stream<Item=hopr_chain_rpc::BlockWithLogs> + Send + 'a> > >;
        }
    }
1244
    /// Cloneable wrapper around the generated RPC mock.
    ///
    /// `init_handlers` requires `T: Clone`; sharing the mock behind an `Arc`
    /// provides that without needing the mock itself to be cloneable.
    #[derive(Clone)]
    pub struct ClonableMockOperations {
        /// The shared mock that every trait call is forwarded to.
        pub inner: Arc<MockIndexerRpcOperations>,
    }
1249
1250 #[async_trait::async_trait]
1251 impl HoprIndexerRpcOperations for ClonableMockOperations {
1252 async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64> {
1253 self.inner.block_number().await
1254 }
1255
1256 async fn get_hopr_allowance(
1257 &self,
1258 owner: Address,
1259 spender: Address,
1260 ) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1261 self.inner.get_hopr_allowance(owner, spender).await
1262 }
1263
1264 async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance> {
1265 self.inner.get_xdai_balance(address).await
1266 }
1267
1268 async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1269 self.inner.get_hopr_balance(address).await
1270 }
1271
1272 fn try_stream_logs<'a>(
1273 &'a self,
1274 start_block_number: u64,
1275 filters: hopr_chain_rpc::FilterSet,
1276 is_synced: bool,
1277 ) -> hopr_chain_rpc::errors::Result<
1278 std::pin::Pin<Box<dyn futures::Stream<Item = hopr_chain_rpc::BlockWithLogs> + Send + 'a>>,
1279 > {
1280 self.inner.try_stream_logs(start_block_number, filters, is_synced)
1281 }
1282 }
1283
1284 fn init_handlers<T: Clone, Db: HoprDbAllOperations + Clone>(
1285 rpc_operations: T,
1286 db: Db,
1287 ) -> ContractEventHandlers<T, Db> {
1288 ContractEventHandlers {
1289 addresses: Arc::new(ContractAddresses {
1290 channels: *CHANNELS_ADDR,
1291 token: *TOKEN_ADDR,
1292 network_registry: *NETWORK_REGISTRY_ADDR,
1293 network_registry_proxy: Default::default(),
1294 safe_registry: *NODE_SAFE_REGISTRY_ADDR,
1295 announcements: *ANNOUNCEMENTS_ADDR,
1296 module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
1297 price_oracle: *TICKET_PRICE_ORACLE_ADDR,
1298 win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
1299 stake_factory: Default::default(),
1300 }),
1301 chain_key: SELF_CHAIN_KEY.clone(),
1302 safe_address: *STAKE_ADDRESS,
1303 db,
1304 rpc_operations,
1305 }
1306 }
1307
1308 fn test_log() -> SerializableLog {
1309 SerializableLog { ..Default::default() }
1310 }
1311
1312 #[tokio::test]
1313 async fn announce_keybinding() -> anyhow::Result<()> {
1314 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1315
1316 let rpc_operations = MockIndexerRpcOperations::new();
1317 let clonable_rpc_operations = ClonableMockOperations {
1319 inner: Arc::new(rpc_operations),
1320 };
1321
1322 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1323
1324 let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
1325
1326 let keybinding_log = SerializableLog {
1327 address: handlers.addresses.announcements,
1328 topics: vec![
1329 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::KeyBinding::SIGNATURE_HASH.into(),
1330 ],
1331 data: DynSolValue::Tuple(vec![
1332 DynSolValue::Bytes(keybinding.signature.as_ref().to_vec()),
1333 DynSolValue::Bytes(keybinding.packet_key.as_ref().to_vec()),
1334 DynSolValue::FixedBytes(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()).into_word(), 32),
1335 ])
1336 .abi_encode_packed(),
1337 ..test_log()
1338 };
1339
1340 let account_entry = AccountEntry {
1341 public_key: *SELF_PRIV_KEY.public(),
1342 chain_addr: *SELF_CHAIN_ADDRESS,
1343 entry_type: AccountType::NotAnnounced,
1344 published_at: 0,
1345 };
1346
1347 let event_type = db
1348 .begin_transaction()
1349 .await?
1350 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log, true).await }))
1351 .await?;
1352
1353 assert!(event_type.is_none(), "keybinding does not have a chain event type");
1354
1355 assert_eq!(
1356 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1357 .await?
1358 .context("a value should be present")?,
1359 account_entry
1360 );
1361 Ok(())
1362 }
1363
1364 #[tokio::test]
1365 async fn announce_address_announcement() -> anyhow::Result<()> {
1366 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1367 let rpc_operations = MockIndexerRpcOperations::new();
1368 let clonable_rpc_operations = ClonableMockOperations {
1370 inner: Arc::new(rpc_operations),
1372 };
1373 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1374
1375 let account_entry = AccountEntry {
1377 public_key: *SELF_PRIV_KEY.public(),
1378 chain_addr: *SELF_CHAIN_ADDRESS,
1379 entry_type: AccountType::NotAnnounced,
1380 published_at: 1,
1381 };
1382 db.insert_account(None, account_entry.clone()).await?;
1383
1384 let test_multiaddr_empty: Multiaddr = "".parse()?;
1385
1386 let address_announcement_empty_log_encoded_data = DynSolValue::Tuple(vec![
1387 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1388 DynSolValue::String(test_multiaddr_empty.to_string()),
1389 ])
1390 .abi_encode();
1391
1392 let address_announcement_empty_log = SerializableLog {
1393 address: handlers.addresses.announcements,
1394 topics: vec![
1395 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1396 .into(),
1397 ],
1398 data: address_announcement_empty_log_encoded_data[32..].into(),
1399 ..test_log()
1400 };
1401
1402 let handlers_clone = handlers.clone();
1403 let event_type = db
1404 .begin_transaction()
1405 .await?
1406 .perform(|tx| {
1407 Box::pin(async move {
1408 handlers_clone
1409 .process_log_event(tx, address_announcement_empty_log, true)
1410 .await
1411 })
1412 })
1413 .await?;
1414
1415 assert!(
1416 event_type.is_none(),
1417 "announcement of empty multiaddresses must pass through"
1418 );
1419
1420 assert_eq!(
1421 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1422 .await?
1423 .context("a value should be present")?,
1424 account_entry
1425 );
1426
1427 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1428
1429 let address_announcement_log_encoded_data = DynSolValue::Tuple(vec![
1430 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1431 DynSolValue::String(test_multiaddr.to_string()),
1432 ])
1433 .abi_encode();
1434
1435 let address_announcement_log = SerializableLog {
1436 address: handlers.addresses.announcements,
1437 block_number: 1,
1438 topics: vec![
1439 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1440 .into(),
1441 ],
1442 data: address_announcement_log_encoded_data[32..].into(),
1443 ..test_log()
1444 };
1445
1446 let announced_account_entry = AccountEntry {
1447 public_key: *SELF_PRIV_KEY.public(),
1448 chain_addr: *SELF_CHAIN_ADDRESS,
1449 entry_type: AccountType::Announced {
1450 multiaddr: test_multiaddr.clone(),
1451 updated_block: 1,
1452 },
1453 published_at: 1,
1454 };
1455
1456 let handlers_clone = handlers.clone();
1457 let event_type = db
1458 .begin_transaction()
1459 .await?
1460 .perform(|tx| {
1461 Box::pin(async move {
1462 handlers_clone
1463 .process_log_event(tx, address_announcement_log, true)
1464 .await
1465 })
1466 })
1467 .await?;
1468
1469 assert!(
1470 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1471 "must return the latest announce multiaddress"
1472 );
1473
1474 assert_eq!(
1475 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1476 .await?
1477 .context("a value should be present")?,
1478 announced_account_entry
1479 );
1480
1481 assert_eq!(
1482 Some(*SELF_CHAIN_ADDRESS),
1483 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1484 "must resolve correct chain key"
1485 );
1486
1487 assert_eq!(
1488 Some(*SELF_PRIV_KEY.public()),
1489 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1490 "must resolve correct packet key"
1491 );
1492
1493 let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1494
1495 let address_announcement_dns_log_encoded_data = DynSolValue::Tuple(vec![
1496 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1497 DynSolValue::String(test_multiaddr_dns.to_string()),
1498 ])
1499 .abi_encode();
1500
1501 let address_announcement_dns_log = SerializableLog {
1502 address: handlers.addresses.announcements,
1503 block_number: 2,
1504 topics: vec![
1505 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1506 .into(),
1507 ],
1508 data: address_announcement_dns_log_encoded_data[32..].into(),
1509 ..test_log()
1510 };
1511
1512 let announced_dns_account_entry = AccountEntry {
1513 public_key: *SELF_PRIV_KEY.public(),
1514 chain_addr: *SELF_CHAIN_ADDRESS,
1515 entry_type: AccountType::Announced {
1516 multiaddr: test_multiaddr_dns.clone(),
1517 updated_block: 2,
1518 },
1519 published_at: 1,
1520 };
1521
1522 let event_type = db
1523 .begin_transaction()
1524 .await?
1525 .perform(|tx| {
1526 Box::pin(async move { handlers.process_log_event(tx, address_announcement_dns_log, true).await })
1527 })
1528 .await?;
1529
1530 assert!(
1531 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1532 "must return the latest announce multiaddress"
1533 );
1534
1535 assert_eq!(
1536 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1537 .await?
1538 .context("a value should be present")?,
1539 announced_dns_account_entry
1540 );
1541
1542 assert_eq!(
1543 Some(*SELF_CHAIN_ADDRESS),
1544 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1545 "must resolve correct chain key"
1546 );
1547
1548 assert_eq!(
1549 Some(*SELF_PRIV_KEY.public()),
1550 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1551 "must resolve correct packet key"
1552 );
1553 Ok(())
1554 }
1555
1556 #[tokio::test]
1557 async fn announce_revoke() -> anyhow::Result<()> {
1558 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1559 let rpc_operations = MockIndexerRpcOperations::new();
1560 let clonable_rpc_operations = ClonableMockOperations {
1562 inner: Arc::new(rpc_operations),
1564 };
1565 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1566
1567 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1568
1569 let announced_account_entry = AccountEntry {
1571 public_key: *SELF_PRIV_KEY.public(),
1572 chain_addr: *SELF_CHAIN_ADDRESS,
1573 entry_type: AccountType::Announced {
1574 multiaddr: test_multiaddr,
1575 updated_block: 0,
1576 },
1577 published_at: 1,
1578 };
1579
1580 db.insert_account(None, announced_account_entry).await?;
1581
1582 let encoded_data = (AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()),).abi_encode();
1583
1584 let revoke_announcement_log = SerializableLog {
1585 address: handlers.addresses.announcements,
1586 topics: vec![
1587 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::RevokeAnnouncement::SIGNATURE_HASH
1588 .into(),
1589 ],
1590 data: encoded_data,
1591 ..test_log()
1592 };
1593
1594 let account_entry = AccountEntry {
1595 public_key: *SELF_PRIV_KEY.public(),
1596 chain_addr: *SELF_CHAIN_ADDRESS,
1597 entry_type: AccountType::NotAnnounced,
1598 published_at: 1,
1599 };
1600
1601 let event_type = db
1602 .begin_transaction()
1603 .await?
1604 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log, true).await }))
1605 .await?;
1606
1607 assert!(
1608 event_type.is_none(),
1609 "revoke announcement does not have chain event type"
1610 );
1611
1612 assert_eq!(
1613 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1614 .await?
1615 .context("a value should be present")?,
1616 account_entry
1617 );
1618 Ok(())
1619 }
1620
1621 #[tokio::test]
1622 async fn on_token_transfer_to() -> anyhow::Result<()> {
1623 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1624
1625 let value = U256::MAX;
1626 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
1627 value.to_be_bytes_vec().as_slice(),
1628 ));
1629
1630 let mut rpc_operations = MockIndexerRpcOperations::new();
1631 rpc_operations
1632 .expect_get_hopr_balance()
1633 .times(1)
1634 .return_once(move |_| Ok(target_hopr_balance));
1635 rpc_operations
1636 .expect_get_hopr_allowance()
1637 .times(1)
1638 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1639 let clonable_rpc_operations = ClonableMockOperations {
1640 inner: Arc::new(rpc_operations),
1641 };
1642
1643 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1644
1645 let encoded_data = (value).abi_encode();
1646
1647 let transferred_log = SerializableLog {
1648 address: handlers.addresses.token,
1649 topics: vec![
1650 hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1651 H256::from_slice(&Address::default().to_bytes32()).into(),
1652 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1653 ],
1654 data: encoded_data,
1655 ..test_log()
1656 };
1657
1658 let event_type = db
1659 .begin_transaction()
1660 .await?
1661 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1662 .await?;
1663
1664 assert!(event_type.is_none(), "token transfer does not have chain event type");
1665
1666 assert_eq!(db.get_safe_hopr_balance(None).await?, target_hopr_balance);
1667
1668 Ok(())
1669 }
1670
1671 #[test_log::test(tokio::test)]
1672 async fn on_token_transfer_from() -> anyhow::Result<()> {
1673 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1674
1675 let mut rpc_operations = MockIndexerRpcOperations::new();
1676 rpc_operations
1677 .expect_get_hopr_balance()
1678 .times(1)
1679 .return_once(|_| Ok(HoprBalance::zero()));
1680 rpc_operations
1681 .expect_get_hopr_allowance()
1682 .times(1)
1683 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1684 let clonable_rpc_operations = ClonableMockOperations {
1685 inner: Arc::new(rpc_operations),
1686 };
1687
1688 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1689
1690 let value = U256::MAX;
1691
1692 let encoded_data = (value).abi_encode();
1693
1694 db.set_safe_hopr_balance(
1695 None,
1696 HoprBalance::from(primitive_types::U256::from_big_endian(
1697 value.to_be_bytes_vec().as_slice(),
1698 )),
1699 )
1700 .await?;
1701
1702 let transferred_log = SerializableLog {
1703 address: handlers.addresses.token,
1704 topics: vec![
1705 hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1706 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1707 H256::from_slice(&Address::default().to_bytes32()).into(),
1708 ],
1709 data: encoded_data,
1710 ..test_log()
1711 };
1712
1713 let event_type = db
1714 .begin_transaction()
1715 .await?
1716 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1717 .await?;
1718
1719 assert!(event_type.is_none(), "token transfer does not have chain event type");
1720
1721 assert_eq!(db.get_safe_hopr_balance(None).await?, HoprBalance::zero());
1722
1723 Ok(())
1724 }
1725
1726 #[tokio::test]
1727 async fn on_token_approval_correct() -> anyhow::Result<()> {
1728 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1729
1730 let target_allowance = HoprBalance::from(primitive_types::U256::from(1000u64));
1731 let mut rpc_operations = MockIndexerRpcOperations::new();
1732 rpc_operations
1733 .expect_get_hopr_allowance()
1734 .times(2)
1735 .returning(move |_, _| Ok(target_allowance));
1736 let clonable_rpc_operations = ClonableMockOperations {
1737 inner: Arc::new(rpc_operations),
1738 };
1739
1740 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1741
1742 let encoded_data = (U256::from(1000u64)).abi_encode();
1743
1744 let approval_log = SerializableLog {
1745 address: handlers.addresses.token,
1746 topics: vec![
1747 hopr_bindings::hoprtoken::HoprToken::Approval::SIGNATURE_HASH.into(),
1748 H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1749 H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1750 ],
1751 data: encoded_data,
1752 ..test_log()
1753 };
1754
1755 assert_eq!(db.get_safe_hopr_allowance(None).await?, HoprBalance::zero());
1757
1758 let approval_log_clone = approval_log.clone();
1759 let handlers_clone = handlers.clone();
1760 let event_type = db
1761 .begin_transaction()
1762 .await?
1763 .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone, true).await }))
1764 .await?;
1765
1766 assert!(event_type.is_none(), "token approval does not have chain event type");
1767
1768 assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance.clone());
1770
1771 let _ = db
1773 .set_safe_hopr_allowance(None, HoprBalance::from(primitive_types::U256::from(10u64)))
1774 .await;
1775 assert_eq!(
1776 db.get_safe_hopr_allowance(None).await?,
1777 HoprBalance::from(primitive_types::U256::from(10u64))
1778 );
1779
1780 let handlers_clone = handlers.clone();
1781 let event_type = db
1782 .begin_transaction()
1783 .await?
1784 .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log, true).await }))
1785 .await?;
1786
1787 assert!(event_type.is_none(), "token approval does not have chain event type");
1788
1789 assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance);
1790 Ok(())
1791 }
1792
1793 #[tokio::test]
1794 async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1795 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1796 let rpc_operations = MockIndexerRpcOperations::new();
1797 let clonable_rpc_operations = ClonableMockOperations {
1799 inner: Arc::new(rpc_operations),
1801 };
1802 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1803
1804 let encoded_data = ().abi_encode();
1805
1806 let registered_log = SerializableLog {
1807 address: handlers.addresses.network_registry,
1808 topics: vec![
1809 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Registered::SIGNATURE_HASH.into(),
1810 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1811 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1812 ],
1813 data: encoded_data,
1814 ..test_log()
1816 };
1817
1818 assert!(
1819 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1820 .await?
1821 );
1822
1823 let event_type = db
1824 .begin_transaction()
1825 .await?
1826 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1827 .await?;
1828
1829 assert!(
1830 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1831 "must return correct NR update"
1832 );
1833
1834 assert!(
1835 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1836 .await?,
1837 "must be allowed in NR"
1838 );
1839 Ok(())
1840 }
1841
1842 #[tokio::test]
1843 async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1844 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1845 let rpc_operations = MockIndexerRpcOperations::new();
1846 let clonable_rpc_operations = ClonableMockOperations {
1848 inner: Arc::new(rpc_operations),
1850 };
1851 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1852
1853 let registered_log = SerializableLog {
1854 address: handlers.addresses.network_registry,
1855 topics: vec![
1856 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::RegisteredByManager::SIGNATURE_HASH.into(),
1857 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1859 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1860 ],
1861 data: ().abi_encode(),
1862 ..test_log()
1864 };
1865
1866 assert!(
1867 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1868 .await?
1869 );
1870
1871 let event_type = db
1872 .begin_transaction()
1873 .await?
1874 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1875 .await?;
1876
1877 assert!(
1878 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1879 "must return correct NR update"
1880 );
1881
1882 assert!(
1883 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1884 .await?,
1885 "must be allowed in NR"
1886 );
1887 Ok(())
1888 }
1889
1890 #[tokio::test]
1891 async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1892 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1893 let rpc_operations = MockIndexerRpcOperations::new();
1894 let clonable_rpc_operations = ClonableMockOperations {
1896 inner: Arc::new(rpc_operations),
1898 };
1899 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1900
1901 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1902 .await?;
1903
1904 let encoded_data = ().abi_encode();
1905
1906 let registered_log = SerializableLog {
1907 address: handlers.addresses.network_registry,
1908 topics: vec![
1909 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Deregistered::SIGNATURE_HASH.into(),
1910 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1911 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1912 ],
1913 data: encoded_data,
1914 ..test_log()
1915 };
1916
1917 assert!(
1918 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1919 .await?
1920 );
1921
1922 let event_type = db
1923 .begin_transaction()
1924 .await?
1925 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1926 .await?;
1927
1928 assert!(
1929 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1930 "must return correct NR update"
1931 );
1932
1933 assert!(
1934 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1935 .await?,
1936 "must not be allowed in NR"
1937 );
1938 Ok(())
1939 }
1940
1941 #[tokio::test]
1942 async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1943 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1944 let rpc_operations = MockIndexerRpcOperations::new();
1945 let clonable_rpc_operations = ClonableMockOperations {
1947 inner: Arc::new(rpc_operations),
1949 };
1950 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1951
1952 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1953 .await?;
1954
1955 let encoded_data = ().abi_encode();
1956
1957 let registered_log = SerializableLog {
1958 address: handlers.addresses.network_registry,
1959 topics: vec![
1960 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::DeregisteredByManager::SIGNATURE_HASH.into(),
1961 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1963 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1964 ],
1965 data: encoded_data,
1966 ..test_log()
1967 };
1968
1969 assert!(
1970 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1971 .await?
1972 );
1973
1974 let event_type = db
1975 .begin_transaction()
1976 .await?
1977 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1978 .await?;
1979
1980 assert!(
1981 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1982 "must return correct NR update"
1983 );
1984
1985 assert!(
1986 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1987 .await?,
1988 "must not be allowed in NR"
1989 );
1990 Ok(())
1991 }
1992
1993 #[tokio::test]
1994 async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
1995 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1996 let rpc_operations = MockIndexerRpcOperations::new();
1997 let clonable_rpc_operations = ClonableMockOperations {
1999 inner: Arc::new(rpc_operations),
2001 };
2002 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2003
2004 let encoded_data = ().abi_encode();
2005
2006 let nr_enabled = SerializableLog {
2007 address: handlers.addresses.network_registry,
2008 topics: vec![
2009 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2010 .into(),
2011 H256::from_low_u64_be(1).into(),
2013 ],
2014 data: encoded_data,
2015 ..test_log()
2016 };
2017
2018 let event_type = db
2019 .begin_transaction()
2020 .await?
2021 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled, false).await }))
2022 .await?;
2023
2024 assert!(event_type.is_none(), "there's no chain event type for nr disable");
2025
2026 assert!(db.get_indexer_data(None).await?.nr_enabled);
2027 Ok(())
2028 }
2029
2030 #[tokio::test]
2031 async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
2032 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2033 let rpc_operations = MockIndexerRpcOperations::new();
2034 let clonable_rpc_operations = ClonableMockOperations {
2036 inner: Arc::new(rpc_operations),
2038 };
2039 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2040
2041 db.set_network_registry_enabled(None, true).await?;
2042
2043 let encoded_data = ().abi_encode();
2044
2045 let nr_disabled = SerializableLog {
2046 address: handlers.addresses.network_registry,
2047 topics: vec![
2048 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2049 .into(),
2050 H256::from_low_u64_be(0).into(),
2052 ],
2053 data: encoded_data,
2054 ..test_log()
2055 };
2056
2057 let event_type = db
2058 .begin_transaction()
2059 .await?
2060 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled, false).await }))
2061 .await?;
2062
2063 assert!(event_type.is_none(), "there's no chain event type for nr enable");
2064
2065 assert!(!db.get_indexer_data(None).await?.nr_enabled);
2066 Ok(())
2067 }
2068
2069 #[tokio::test]
2070 async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
2071 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2072 let rpc_operations = MockIndexerRpcOperations::new();
2073 let clonable_rpc_operations = ClonableMockOperations {
2075 inner: Arc::new(rpc_operations),
2077 };
2078 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2079
2080 let encoded_data = ().abi_encode();
2081
2082 let set_eligible = SerializableLog {
2083 address: handlers.addresses.network_registry,
2084 topics: vec![
2085 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2086 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2088 H256::from_low_u64_be(1).into(),
2089 ],
2090 data: encoded_data,
2091 ..test_log()
2092 };
2093
2094 let event_type = db
2095 .begin_transaction()
2096 .await?
2097 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, false).await }))
2098 .await?;
2099
2100 assert!(
2101 event_type.is_none(),
2102 "there's no chain event type for setting nr eligibility"
2103 );
2104
2105 assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2106
2107 Ok(())
2108 }
2109
2110 #[tokio::test]
2111 async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
2112 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2113 let rpc_operations = MockIndexerRpcOperations::new();
2114 let clonable_rpc_operations = ClonableMockOperations {
2116 inner: Arc::new(rpc_operations),
2118 };
2119 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2120
2121 db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
2122
2123 let encoded_data = ().abi_encode();
2124
2125 let set_eligible = SerializableLog {
2126 address: handlers.addresses.network_registry,
2127 topics: vec![
2128 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2129 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2131 H256::from_low_u64_be(0).into(),
2132 ],
2133 data: encoded_data,
2134 ..test_log()
2135 };
2136
2137 let event_type = db
2138 .begin_transaction()
2139 .await?
2140 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, true).await }))
2141 .await?;
2142
2143 assert!(
2144 event_type.is_none(),
2145 "there's no chain event type for unsetting nr eligibility"
2146 );
2147
2148 assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2149
2150 Ok(())
2151 }
2152
2153 #[tokio::test]
2154 async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
2155 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2156
2157 let value = U256::MAX;
2158 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2159 value.to_be_bytes_vec().as_slice(),
2160 ));
2161
2162 let mut rpc_operations = MockIndexerRpcOperations::new();
2163 rpc_operations
2164 .expect_get_hopr_balance()
2165 .times(1)
2166 .return_once(move |_| Ok(target_hopr_balance));
2167 rpc_operations
2168 .expect_get_hopr_allowance()
2169 .times(1)
2170 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
2171 let clonable_rpc_operations = ClonableMockOperations {
2172 inner: Arc::new(rpc_operations),
2173 };
2174 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2175
2176 let channel = ChannelEntry::new(
2177 *SELF_CHAIN_ADDRESS,
2178 *COUNTERPARTY_CHAIN_ADDRESS,
2179 0.into(),
2180 primitive_types::U256::zero(),
2181 ChannelStatus::Open,
2182 primitive_types::U256::one(),
2183 );
2184
2185 db.upsert_channel(None, channel).await?;
2186
2187 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2188 let diff = solidity_balance - channel.balance;
2189
2190 let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2191
2192 let balance_increased_log = SerializableLog {
2193 address: handlers.addresses.channels,
2194 topics: vec![
2195 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2196 H256::from_slice(channel.get_id().as_ref()).into(),
2198 ],
2199 data: encoded_data,
2200 ..test_log()
2201 };
2202
2203 let event_type = db
2204 .begin_transaction()
2205 .await?
2206 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2207 .await?;
2208
2209 let channel = db
2210 .get_channel_by_id(None, &channel.get_id())
2211 .await?
2212 .context("a value should be present")?;
2213
2214 assert!(
2215 matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
2216 "must return updated channel entry and balance diff"
2217 );
2218
2219 assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2220 Ok(())
2221 }
2222
2223 #[tokio::test]
2224 async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
2225 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2226 let rpc_operations = MockIndexerRpcOperations::new();
2227 let clonable_rpc_operations = ClonableMockOperations {
2229 inner: Arc::new(rpc_operations),
2231 };
2232 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2233
2234 let separator = Hash::from(hopr_crypto_random::random_bytes());
2235
2236 let encoded_data = ().abi_encode();
2237
2238 let channels_dst_updated = SerializableLog {
2239 address: handlers.addresses.channels,
2240 topics: vec![
2241 hopr_bindings::hoprchannels::HoprChannels::DomainSeparatorUpdated::SIGNATURE_HASH.into(),
2242 H256::from_slice(separator.as_ref()).into(),
2244 ],
2245 data: encoded_data,
2246 ..test_log()
2247 };
2248
2249 assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
2250
2251 let event_type = db
2252 .begin_transaction()
2253 .await?
2254 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated, true).await }))
2255 .await?;
2256
2257 assert!(
2258 event_type.is_none(),
2259 "there's no chain event type for channel dst update"
2260 );
2261
2262 assert_eq!(
2263 separator,
2264 db.get_indexer_data(None)
2265 .await?
2266 .channels_dst
2267 .context("a value should be present")?,
2268 "separator must be updated"
2269 );
2270 Ok(())
2271 }
2272
2273 #[tokio::test]
2274 async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
2275 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2276
2277 let value = U256::MAX;
2278 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2279 value.to_be_bytes_vec().as_slice(),
2280 ));
2281
2282 let mut rpc_operations = MockIndexerRpcOperations::new();
2283 rpc_operations
2284 .expect_get_hopr_balance()
2285 .times(1)
2286 .return_once(move |_| Ok(target_hopr_balance));
2287 let clonable_rpc_operations = ClonableMockOperations {
2288 inner: Arc::new(rpc_operations),
2289 };
2290 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2291
2292 let channel = ChannelEntry::new(
2293 *SELF_CHAIN_ADDRESS,
2294 *COUNTERPARTY_CHAIN_ADDRESS,
2295 HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2296 primitive_types::U256::zero(),
2297 ChannelStatus::Open,
2298 primitive_types::U256::one(),
2299 );
2300
2301 db.upsert_channel(None, channel).await?;
2302
2303 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 2).into();
2304 let diff = channel.balance - solidity_balance;
2305
2306 let encoded_data = DynSolValue::Tuple(vec![DynSolValue::Uint(
2308 U256::from_be_slice(&solidity_balance.amount().to_be_bytes()),
2309 256,
2310 )])
2311 .abi_encode();
2312
2313 let balance_decreased_log = SerializableLog {
2314 address: handlers.addresses.channels,
2315 topics: vec![
2316 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceDecreased::SIGNATURE_HASH.into(),
2317 H256::from_slice(channel.get_id().as_ref()).into(),
2319 ],
2320 data: encoded_data,
2321 ..test_log()
2322 };
2323
2324 let event_type = db
2325 .begin_transaction()
2326 .await?
2327 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log, true).await }))
2328 .await?;
2329
2330 let channel = db
2331 .get_channel_by_id(None, &channel.get_id())
2332 .await?
2333 .context("a value should be present")?;
2334
2335 assert!(
2336 matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
2337 "must return updated channel entry and balance diff"
2338 );
2339
2340 assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2341 Ok(())
2342 }
2343
2344 #[tokio::test]
2345 async fn on_channel_closed() -> anyhow::Result<()> {
2346 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2347 let rpc_operations = MockIndexerRpcOperations::new();
2348 let clonable_rpc_operations = ClonableMockOperations {
2350 inner: Arc::new(rpc_operations),
2352 };
2353 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2354
2355 let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2356
2357 let channel = ChannelEntry::new(
2358 *SELF_CHAIN_ADDRESS,
2359 *COUNTERPARTY_CHAIN_ADDRESS,
2360 starting_balance,
2361 primitive_types::U256::zero(),
2362 ChannelStatus::Open,
2363 primitive_types::U256::one(),
2364 );
2365
2366 db.upsert_channel(None, channel).await?;
2367
2368 let encoded_data = ().abi_encode();
2369
2370 let channel_closed_log = SerializableLog {
2371 address: handlers.addresses.channels,
2372 topics: vec![
2373 hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2374 H256::from_slice(channel.get_id().as_ref()).into(),
2376 ],
2377 data: encoded_data,
2378 ..test_log()
2379 };
2380
2381 let event_type = db
2382 .begin_transaction()
2383 .await?
2384 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2385 .await?;
2386
2387 let closed_channel = db
2388 .get_channel_by_id(None, &channel.get_id())
2389 .await?
2390 .context("a value should be present")?;
2391
2392 assert!(
2393 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
2394 "must return the updated channel entry"
2395 );
2396
2397 assert_eq!(closed_channel.status, ChannelStatus::Closed);
2398 assert_eq!(closed_channel.ticket_index, 0u64.into());
2399 assert_eq!(
2400 0,
2401 db.get_outgoing_ticket_index(closed_channel.get_id())
2402 .await?
2403 .load(Ordering::Relaxed)
2404 );
2405
2406 assert!(closed_channel.balance.amount().eq(&primitive_types::U256::zero()));
2407 Ok(())
2408 }
2409
2410 #[tokio::test]
2411 async fn on_foreign_channel_closed() -> anyhow::Result<()> {
2412 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2413 let rpc_operations = MockIndexerRpcOperations::new();
2414 let clonable_rpc_operations = ClonableMockOperations {
2416 inner: Arc::new(rpc_operations),
2418 };
2419 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2420
2421 let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2422
2423 let channel = ChannelEntry::new(
2424 Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
2425 Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
2426 starting_balance,
2427 primitive_types::U256::zero(),
2428 ChannelStatus::Open,
2429 primitive_types::U256::one(),
2430 );
2431
2432 db.upsert_channel(None, channel).await?;
2433
2434 let encoded_data = ().abi_encode();
2435
2436 let channel_closed_log = SerializableLog {
2437 address: handlers.addresses.channels,
2438 topics: vec![
2439 hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2440 H256::from_slice(channel.get_id().as_ref()).into(),
2442 ],
2443 data: encoded_data,
2444 ..test_log()
2445 };
2446
2447 let event_type = db
2448 .begin_transaction()
2449 .await?
2450 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2451 .await?;
2452
2453 let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
2454
2455 assert_eq!(None, closed_channel, "foreign channel must be deleted");
2456
2457 assert!(
2458 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
2459 "must return the closed channel entry"
2460 );
2461
2462 Ok(())
2463 }
2464
2465 #[tokio::test]
2466 async fn on_channel_opened() -> anyhow::Result<()> {
2467 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2468 let rpc_operations = MockIndexerRpcOperations::new();
2469 let clonable_rpc_operations = ClonableMockOperations {
2471 inner: Arc::new(rpc_operations),
2473 };
2474 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2475
2476 let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2477
2478 let encoded_data = ().abi_encode();
2479
2480 let channel_opened_log = SerializableLog {
2481 address: handlers.addresses.channels,
2482 topics: vec![
2483 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2484 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2486 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2487 ],
2488 data: encoded_data,
2489 ..test_log()
2490 };
2491
2492 let event_type = db
2493 .begin_transaction()
2494 .await?
2495 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2496 .await?;
2497
2498 let channel = db
2499 .get_channel_by_id(None, &channel_id)
2500 .await?
2501 .context("a value should be present")?;
2502
2503 assert!(
2504 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2505 "must return the updated channel entry"
2506 );
2507
2508 assert_eq!(channel.status, ChannelStatus::Open);
2509 assert_eq!(channel.channel_epoch, 1u64.into());
2510 assert_eq!(channel.ticket_index, 0u64.into());
2511 assert_eq!(
2512 0,
2513 db.get_outgoing_ticket_index(channel.get_id())
2514 .await?
2515 .load(Ordering::Relaxed)
2516 );
2517 Ok(())
2518 }
2519
2520 #[tokio::test]
2521 async fn on_channel_reopened() -> anyhow::Result<()> {
2522 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2523 let rpc_operations = MockIndexerRpcOperations::new();
2524 let clonable_rpc_operations = ClonableMockOperations {
2526 inner: Arc::new(rpc_operations),
2528 };
2529 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2530
2531 let channel = ChannelEntry::new(
2532 *SELF_CHAIN_ADDRESS,
2533 *COUNTERPARTY_CHAIN_ADDRESS,
2534 HoprBalance::zero(),
2535 primitive_types::U256::zero(),
2536 ChannelStatus::Closed,
2537 3.into(),
2538 );
2539
2540 db.upsert_channel(None, channel).await?;
2541
2542 let encoded_data = ().abi_encode();
2543
2544 let channel_opened_log = SerializableLog {
2545 address: handlers.addresses.channels,
2546 topics: vec![
2547 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2548 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2550 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2551 ],
2552 data: encoded_data,
2553 ..test_log()
2554 };
2555
2556 let event_type = db
2557 .begin_transaction()
2558 .await?
2559 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2560 .await?;
2561
2562 let channel = db
2563 .get_channel_by_id(None, &channel.get_id())
2564 .await?
2565 .context("a value should be present")?;
2566
2567 assert!(
2568 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2569 "must return the updated channel entry"
2570 );
2571
2572 assert_eq!(channel.status, ChannelStatus::Open);
2573 assert_eq!(channel.channel_epoch, 4u64.into());
2574 assert_eq!(channel.ticket_index, 0u64.into());
2575
2576 assert_eq!(
2577 0,
2578 db.get_outgoing_ticket_index(channel.get_id())
2579 .await?
2580 .load(Ordering::Relaxed)
2581 );
2582 Ok(())
2583 }
2584
2585 #[tokio::test]
2586 async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2587 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2588 let rpc_operations = MockIndexerRpcOperations::new();
2589 let clonable_rpc_operations = ClonableMockOperations {
2591 inner: Arc::new(rpc_operations),
2593 };
2594 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2595
2596 let channel = ChannelEntry::new(
2597 *SELF_CHAIN_ADDRESS,
2598 *COUNTERPARTY_CHAIN_ADDRESS,
2599 0.into(),
2600 primitive_types::U256::zero(),
2601 ChannelStatus::Open,
2602 3.into(),
2603 );
2604
2605 db.upsert_channel(None, channel).await?;
2606
2607 let encoded_data = ().abi_encode();
2608
2609 let channel_opened_log = SerializableLog {
2610 address: handlers.addresses.channels,
2611 topics: vec![
2612 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2613 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2615 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2616 ],
2617 data: encoded_data,
2618 ..test_log()
2619 };
2620
2621 db.begin_transaction()
2622 .await?
2623 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2624 .await
2625 .context("Channel should stay open, with corrupted flag set")?;
2626
2627 assert!(
2628 db.get_channel_by_id(None, &channel.get_id()).await?.is_none(),
2629 "channel should not be returned as marked as corrupted",
2630 );
2631
2632 db.get_corrupted_channel_by_id(None, &channel.get_id())
2633 .await?
2634 .context("a value should be present")?;
2635
2636 Ok(())
2637 }
2638
2639 #[tokio::test]
2640 async fn event_for_non_existing_channel_should_create_corrupted_channel() -> anyhow::Result<()> {
2641 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2642 let rpc_operations = MockIndexerRpcOperations::new();
2643 let clonable_rpc_operations = ClonableMockOperations {
2645 inner: Arc::new(rpc_operations),
2647 };
2648 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2649
2650 let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2651
2652 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2654
2655 let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2656
2657 let balance_increased_log = SerializableLog {
2658 address: handlers.addresses.channels,
2659 topics: vec![
2660 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2661 H256::from_slice(channel_id.as_ref()).into(),
2663 ],
2664 data: encoded_data,
2665 ..test_log()
2666 };
2667
2668 db.begin_transaction()
2669 .await?
2670 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2671 .await?;
2672
2673 db.get_corrupted_channel_by_id(None, &channel_id)
2675 .await?
2676 .context("channel should be set a corrupted")?;
2677
2678 Ok(())
2679 }
2680
2681 const PRICE_PER_PACKET: u32 = 20_u32;
2682
2683 fn mock_acknowledged_ticket(
2684 signer: &ChainKeypair,
2685 destination: &ChainKeypair,
2686 index: u64,
2687 win_prob: f64,
2688 ) -> anyhow::Result<AcknowledgedTicket> {
2689 let channel_id = generate_channel_id(&signer.into(), &destination.into());
2690
2691 let channel_epoch = 1u64;
2692 let domain_separator = Hash::default();
2693
2694 let response = Response::try_from(
2695 Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2696 )?;
2697
2698 Ok(TicketBuilder::default()
2699 .direction(&signer.into(), &destination.into())
2700 .amount(primitive_types::U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2701 .index(index)
2702 .index_offset(1)
2703 .win_prob(win_prob.try_into()?)
2704 .channel_epoch(1)
2705 .challenge(response.to_challenge()?)
2706 .build_signed(signer, &domain_separator)?
2707 .into_acknowledged(response))
2708 }
2709
2710 #[tokio::test]
2711 async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2712 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2713 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2714 .await?;
2715 let rpc_operations = MockIndexerRpcOperations::new();
2716 let clonable_rpc_operations = ClonableMockOperations {
2718 inner: Arc::new(rpc_operations),
2720 };
2721 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2722
2723 let channel = ChannelEntry::new(
2724 *COUNTERPARTY_CHAIN_ADDRESS,
2725 *SELF_CHAIN_ADDRESS,
2726 HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2727 primitive_types::U256::zero(),
2728 ChannelStatus::Open,
2729 primitive_types::U256::one(),
2730 );
2731
2732 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2733 let next_ticket_index = ticket_index + 1;
2734
2735 let mut ticket =
2736 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2737 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2738
2739 let ticket_value = ticket.verified_ticket().amount;
2740
2741 db.upsert_channel(None, channel).await?;
2742 db.upsert_ticket(None, ticket.clone()).await?;
2743
2744 let ticket_redeemed_log = SerializableLog {
2745 address: handlers.addresses.channels,
2746 topics: vec![
2747 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2748 H256::from_slice(channel.get_id().as_ref()).into(),
2750 ],
2751 data: DynSolValue::Tuple(vec![DynSolValue::Uint(
2752 U256::from_be_bytes(next_ticket_index.to_be_bytes()),
2753 48,
2754 )])
2755 .abi_encode(),
2756 ..test_log()
2757 };
2758
2759 let outgoing_ticket_index_before = db
2760 .get_outgoing_ticket_index(channel.get_id())
2761 .await?
2762 .load(Ordering::Relaxed);
2763
2764 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2765 assert_eq!(
2766 HoprBalance::zero(),
2767 stats.redeemed_value,
2768 "there should not be any redeemed value"
2769 );
2770 assert_eq!(
2771 HoprBalance::zero(),
2772 stats.neglected_value,
2773 "there should not be any neglected value"
2774 );
2775
2776 let event_type = db
2777 .begin_transaction()
2778 .await?
2779 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2780 .await?;
2781
2782 let channel = db
2783 .get_channel_by_id(None, &channel.get_id())
2784 .await?
2785 .context("a value should be present")?;
2786
2787 assert!(
2788 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2789 "must return the updated channel entry and the redeemed ticket"
2790 );
2791
2792 assert_eq!(
2793 channel.ticket_index, next_ticket_index,
2794 "channel entry must contain next ticket index"
2795 );
2796
2797 let outgoing_ticket_index_after = db
2798 .get_outgoing_ticket_index(channel.get_id())
2799 .await?
2800 .load(Ordering::Relaxed);
2801
2802 assert_eq!(
2803 outgoing_ticket_index_before, outgoing_ticket_index_after,
2804 "outgoing ticket index must not change"
2805 );
2806
2807 let tickets = db.get_tickets((&channel).into()).await?;
2808
2809 assert!(tickets.is_empty(), "there should not be any tickets left");
2810
2811 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2812 assert_eq!(
2813 ticket_value, stats.redeemed_value,
2814 "there should be redeemed value worth 1 ticket"
2815 );
2816 assert_eq!(
2817 HoprBalance::zero(),
2818 stats.neglected_value,
2819 "there should not be any neglected ticket"
2820 );
2821 Ok(())
2822 }
2823
2824 #[tokio::test]
2825 async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2826 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2827 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2828 .await?;
2829 let rpc_operations = MockIndexerRpcOperations::new();
2830 let clonable_rpc_operations = ClonableMockOperations {
2832 inner: Arc::new(rpc_operations),
2834 };
2835 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2836
2837 let channel = ChannelEntry::new(
2838 *COUNTERPARTY_CHAIN_ADDRESS,
2839 *SELF_CHAIN_ADDRESS,
2840 primitive_types::U256::from((1u128 << 96) - 1).into(),
2841 primitive_types::U256::zero(),
2842 ChannelStatus::Open,
2843 primitive_types::U256::one(),
2844 );
2845
2846 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2847 let next_ticket_index = ticket_index + 1;
2848
2849 let mut ticket =
2850 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2851 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2852
2853 let ticket_value = ticket.verified_ticket().amount;
2854
2855 db.upsert_channel(None, channel).await?;
2856 db.upsert_ticket(None, ticket.clone()).await?;
2857
2858 let old_ticket =
2859 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2860 db.upsert_ticket(None, old_ticket.clone()).await?;
2861
2862 let ticket_redeemed_log = SerializableLog {
2863 address: handlers.addresses.channels,
2864 topics: vec![
2865 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2866 H256::from_slice(channel.get_id().as_ref()).into(),
2868 ],
2869 data: Vec::from(next_ticket_index.to_be_bytes()),
2870 ..test_log()
2871 };
2872
2873 let outgoing_ticket_index_before = db
2874 .get_outgoing_ticket_index(channel.get_id())
2875 .await?
2876 .load(Ordering::Relaxed);
2877
2878 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2879 assert_eq!(
2880 HoprBalance::zero(),
2881 stats.redeemed_value,
2882 "there should not be any redeemed value"
2883 );
2884 assert_eq!(
2885 HoprBalance::zero(),
2886 stats.neglected_value,
2887 "there should not be any neglected value"
2888 );
2889
2890 let event_type = db
2891 .begin_transaction()
2892 .await?
2893 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2894 .await?;
2895
2896 let channel = db
2897 .get_channel_by_id(None, &channel.get_id())
2898 .await?
2899 .context("a value should be present")?;
2900
2901 assert!(
2902 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2903 "must return the updated channel entry and the redeemed ticket"
2904 );
2905
2906 assert_eq!(
2907 channel.ticket_index, next_ticket_index,
2908 "channel entry must contain next ticket index"
2909 );
2910
2911 let outgoing_ticket_index_after = db
2912 .get_outgoing_ticket_index(channel.get_id())
2913 .await?
2914 .load(Ordering::Relaxed);
2915
2916 assert_eq!(
2917 outgoing_ticket_index_before, outgoing_ticket_index_after,
2918 "outgoing ticket index must not change"
2919 );
2920
2921 let tickets = db.get_tickets((&channel).into()).await?;
2922 assert!(tickets.is_empty(), "there should not be any tickets left");
2923
2924 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2925 assert_eq!(
2926 ticket_value, stats.redeemed_value,
2927 "there should be redeemed value worth 1 ticket"
2928 );
2929 assert_eq!(
2930 ticket_value, stats.neglected_value,
2931 "there should neglected value worth 1 ticket"
2932 );
2933 Ok(())
2934 }
2935
2936 #[tokio::test]
2937 async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2938 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2939 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2940 .await?;
2941 let rpc_operations = MockIndexerRpcOperations::new();
2942 let clonable_rpc_operations = ClonableMockOperations {
2944 inner: Arc::new(rpc_operations),
2946 };
2947 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2948
2949 let channel = ChannelEntry::new(
2950 *SELF_CHAIN_ADDRESS,
2951 *COUNTERPARTY_CHAIN_ADDRESS,
2952 primitive_types::U256::from((1u128 << 96) - 1).into(),
2953 primitive_types::U256::zero(),
2954 ChannelStatus::Open,
2955 primitive_types::U256::one(),
2956 );
2957
2958 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2959 let next_ticket_index = ticket_index + 1;
2960
2961 db.upsert_channel(None, channel).await?;
2962
2963 let ticket_redeemed_log = SerializableLog {
2964 address: handlers.addresses.channels,
2965 topics: vec![
2966 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2967 H256::from_slice(channel.get_id().as_ref()).into(),
2969 ],
2970 data: Vec::from(next_ticket_index.to_be_bytes()),
2971 ..test_log()
2972 };
2973
2974 let event_type = db
2975 .begin_transaction()
2976 .await?
2977 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2978 .await?;
2979
2980 let channel = db
2981 .get_channel_by_id(None, &channel.get_id())
2982 .await?
2983 .context("a value should be present")?;
2984
2985 assert!(
2986 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2987 "must return update channel entry and no ticket"
2988 );
2989
2990 assert_eq!(
2991 channel.ticket_index, next_ticket_index,
2992 "channel entry must contain next ticket index"
2993 );
2994
2995 let outgoing_ticket_index = db
2996 .get_outgoing_ticket_index(channel.get_id())
2997 .await?
2998 .load(Ordering::Relaxed);
2999
3000 assert!(
3001 outgoing_ticket_index >= ticket_index.as_u64(),
3002 "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
3003 );
3004 assert_eq!(
3005 outgoing_ticket_index,
3006 next_ticket_index.as_u64(),
3007 "outgoing ticket index must be equal to next ticket index"
3008 );
3009 Ok(())
3010 }
3011
3012 #[tokio::test]
3013 async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
3014 {
3015 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3016 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
3017 .await?;
3018 let rpc_operations = MockIndexerRpcOperations::new();
3019 let clonable_rpc_operations = ClonableMockOperations {
3021 inner: Arc::new(rpc_operations),
3023 };
3024 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3025
3026 let channel = ChannelEntry::new(
3027 *COUNTERPARTY_CHAIN_ADDRESS,
3028 *SELF_CHAIN_ADDRESS,
3029 primitive_types::U256::from((1u128 << 96) - 1).into(),
3030 primitive_types::U256::zero(),
3031 ChannelStatus::Open,
3032 primitive_types::U256::one(),
3033 );
3034
3035 db.upsert_channel(None, channel).await?;
3036
3037 let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3038
3039 let ticket_redeemed_log = SerializableLog {
3040 address: handlers.addresses.channels,
3041 topics: vec![
3042 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3043 H256::from_slice(channel.get_id().as_ref()).into(),
3045 ],
3046 data: Vec::from(next_ticket_index.to_be_bytes()),
3047 ..test_log()
3048 };
3049
3050 let event_type = db
3051 .begin_transaction()
3052 .await?
3053 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3054 .await?;
3055
3056 let channel = db
3057 .get_channel_by_id(None, &channel.get_id())
3058 .await?
3059 .context("a value should be present")?;
3060
3061 assert!(
3062 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3063 "must return updated channel entry and no ticket"
3064 );
3065
3066 assert_eq!(
3067 channel.ticket_index, next_ticket_index,
3068 "channel entry must contain next ticket index"
3069 );
3070 Ok(())
3071 }
3072
3073 #[tokio::test]
3074 async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
3075 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3076 let rpc_operations = MockIndexerRpcOperations::new();
3077 let clonable_rpc_operations = ClonableMockOperations {
3079 inner: Arc::new(rpc_operations),
3081 };
3082 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3083
3084 let channel = ChannelEntry::new(
3085 Address::from(hopr_crypto_random::random_bytes()),
3086 Address::from(hopr_crypto_random::random_bytes()),
3087 primitive_types::U256::from((1u128 << 96) - 1).into(),
3088 primitive_types::U256::zero(),
3089 ChannelStatus::Open,
3090 primitive_types::U256::one(),
3091 );
3092
3093 db.upsert_channel(None, channel).await?;
3094
3095 let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3096
3097 let ticket_redeemed_log = SerializableLog {
3098 address: handlers.addresses.channels,
3099 topics: vec![
3100 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3101 H256::from_slice(channel.get_id().as_ref()).into(),
3103 ],
3104 data: Vec::from(next_ticket_index.to_be_bytes()),
3105 ..test_log()
3106 };
3107
3108 let event_type = db
3109 .begin_transaction()
3110 .await?
3111 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3112 .await?;
3113
3114 let channel = db
3115 .get_channel_by_id(None, &channel.get_id())
3116 .await?
3117 .context("a value should be present")?;
3118
3119 assert!(
3120 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3121 "must return updated channel entry and no ticket"
3122 );
3123
3124 assert_eq!(
3125 channel.ticket_index, next_ticket_index,
3126 "channel entry must contain next ticket index"
3127 );
3128 Ok(())
3129 }
3130
3131 #[tokio::test]
3132 async fn on_channel_closure_initiated() -> anyhow::Result<()> {
3133 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3134 let rpc_operations = MockIndexerRpcOperations::new();
3135 let clonable_rpc_operations = ClonableMockOperations {
3137 inner: Arc::new(rpc_operations),
3139 };
3140 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3141
3142 let channel = ChannelEntry::new(
3143 *SELF_CHAIN_ADDRESS,
3144 *COUNTERPARTY_CHAIN_ADDRESS,
3145 primitive_types::U256::from((1u128 << 96) - 1).into(),
3146 primitive_types::U256::zero(),
3147 ChannelStatus::Open,
3148 primitive_types::U256::one(),
3149 );
3150
3151 db.upsert_channel(None, channel).await?;
3152
3153 let timestamp = SystemTime::now();
3154
3155 let encoded_data = (U256::from(timestamp.as_unix_timestamp().as_secs())).abi_encode();
3156
3157 let closure_initiated_log = SerializableLog {
3158 address: handlers.addresses.channels,
3159 topics: vec![
3160 hopr_bindings::hoprchannels::HoprChannels::OutgoingChannelClosureInitiated::SIGNATURE_HASH.into(),
3161 H256::from_slice(channel.get_id().as_ref()).into(),
3163 ],
3164 data: encoded_data,
3165 ..test_log()
3167 };
3168
3169 let event_type = db
3170 .begin_transaction()
3171 .await?
3172 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log, true).await }))
3173 .await?;
3174
3175 let channel = db
3176 .get_channel_by_id(None, &channel.get_id())
3177 .await?
3178 .context("a value should be present")?;
3179
3180 assert!(
3181 matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
3182 "must return updated channel entry"
3183 );
3184
3185 assert_eq!(
3186 channel.status,
3187 ChannelStatus::PendingToClose(timestamp),
3188 "channel status must match"
3189 );
3190 Ok(())
3191 }
3192
3193 #[tokio::test]
3194 async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
3195 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3196 let rpc_operations = MockIndexerRpcOperations::new();
3197 let clonable_rpc_operations = ClonableMockOperations {
3199 inner: Arc::new(rpc_operations),
3201 };
3202 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3203
3204 let encoded_data = ().abi_encode();
3205
3206 let safe_registered_log = SerializableLog {
3207 address: handlers.addresses.safe_registry,
3208 topics: vec![
3209 hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::RegisteredNodeSafe::SIGNATURE_HASH.into(),
3210 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3212 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3213 ],
3214 data: encoded_data,
3215 ..test_log()
3216 };
3217
3218 let event_type = db
3219 .begin_transaction()
3220 .await?
3221 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3222 .await?;
3223
3224 assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
3225
3226 Ok(())
3228 }
3229
3230 #[tokio::test]
3231 async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
3232 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3233 let rpc_operations = MockIndexerRpcOperations::new();
3234 let clonable_rpc_operations = ClonableMockOperations {
3236 inner: Arc::new(rpc_operations),
3238 };
3239 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3240
3241 let encoded_data = ().abi_encode();
3244
3245 let safe_registered_log = SerializableLog {
3246 address: handlers.addresses.safe_registry,
3247 topics: vec![
3248 hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::DergisteredNodeSafe::SIGNATURE_HASH.into(),
3249 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3251 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3252 ],
3253 data: encoded_data,
3254 ..test_log()
3255 };
3256
3257 let event_type = db
3258 .begin_transaction()
3259 .await?
3260 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3261 .await?;
3262
3263 assert!(
3264 event_type.is_none(),
3265 "there's no associated chain event type with safe deregistration"
3266 );
3267
3268 Ok(())
3270 }
3271
3272 #[tokio::test]
3273 async fn ticket_price_update() -> anyhow::Result<()> {
3274 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3275 let rpc_operations = MockIndexerRpcOperations::new();
3276 let clonable_rpc_operations = ClonableMockOperations {
3278 inner: Arc::new(rpc_operations),
3280 };
3281 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3282
3283 let encoded_data = (U256::from(1u64), U256::from(123u64)).abi_encode();
3284
3285 let price_change_log = SerializableLog {
3286 address: handlers.addresses.price_oracle,
3287 topics: vec![
3288 hopr_bindings::hoprticketpriceoracle::HoprTicketPriceOracle::TicketPriceUpdated::SIGNATURE_HASH.into(),
3289 ],
3291 data: encoded_data,
3292 ..test_log()
3294 };
3295
3296 assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
3297
3298 let event_type = db
3299 .begin_transaction()
3300 .await?
3301 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log, true).await }))
3302 .await?;
3303
3304 assert!(
3305 event_type.is_none(),
3306 "there's no associated chain event type with price oracle"
3307 );
3308
3309 assert_eq!(
3310 db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
3311 Some(primitive_types::U256::from(123u64))
3312 );
3313 Ok(())
3314 }
3315
3316 #[tokio::test]
3317 async fn minimum_win_prob_update() -> anyhow::Result<()> {
3318 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3319 let rpc_operations = MockIndexerRpcOperations::new();
3320 let clonable_rpc_operations = ClonableMockOperations {
3322 inner: Arc::new(rpc_operations),
3324 };
3325 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3326
3327 let encoded_data = (
3328 U256::from_be_slice(WinningProbability::ALWAYS.as_ref()),
3329 U256::from_be_slice(WinningProbability::try_from_f64(0.5)?.as_ref()),
3330 )
3331 .abi_encode();
3332
3333 let win_prob_change_log = SerializableLog {
3334 address: handlers.addresses.win_prob_oracle,
3335 topics: vec![
3336 hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into()],
3337 data: encoded_data,
3338 ..test_log()
3339 };
3340
3341 assert_eq!(
3342 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3343 1.0
3344 );
3345
3346 let event_type = db
3347 .begin_transaction()
3348 .await?
3349 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3350 .await?;
3351
3352 assert!(
3353 event_type.is_none(),
3354 "there's no associated chain event type with winning probability change"
3355 );
3356
3357 assert_eq!(
3358 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3359 0.5
3360 );
3361 Ok(())
3362 }
3363
3364 #[tokio::test]
3365 async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
3366 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3367 db.set_minimum_incoming_ticket_win_prob(None, 0.1.try_into()?).await?;
3368
3369 let new_minimum = 0.5;
3370 let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
3371
3372 let channel_1 = ChannelEntry::new(
3373 *COUNTERPARTY_CHAIN_ADDRESS,
3374 *SELF_CHAIN_ADDRESS,
3375 primitive_types::U256::from((1u128 << 96) - 1).into(),
3376 3_u32.into(),
3377 ChannelStatus::Open,
3378 primitive_types::U256::one(),
3379 );
3380
3381 db.upsert_channel(None, channel_1).await?;
3382
3383 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
3384 db.upsert_ticket(None, ticket).await?;
3385
3386 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
3387 db.upsert_ticket(None, ticket).await?;
3388
3389 let tickets = db.get_tickets((&channel_1).into()).await?;
3390 assert_eq!(tickets.len(), 2);
3391
3392 let other_counterparty = ChainKeypair::random();
3395 let channel_2 = ChannelEntry::new(
3396 other_counterparty.public().to_address(),
3397 *SELF_CHAIN_ADDRESS,
3398 primitive_types::U256::from((1u128 << 96) - 1).into(),
3399 3_u32.into(),
3400 ChannelStatus::Open,
3401 primitive_types::U256::one(),
3402 );
3403
3404 db.upsert_channel(None, channel_2).await?;
3405
3406 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
3407 db.upsert_ticket(None, ticket).await?;
3408
3409 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
3410 db.upsert_ticket(None, ticket).await?;
3411
3412 let tickets = db.get_tickets((&channel_2).into()).await?;
3413 assert_eq!(tickets.len(), 2);
3414
3415 let stats = db.get_ticket_statistics(None).await?;
3416 assert_eq!(HoprBalance::zero(), stats.rejected_value);
3417
3418 let rpc_operations = MockIndexerRpcOperations::new();
3419 let clonable_rpc_operations = ClonableMockOperations {
3421 inner: Arc::new(rpc_operations),
3423 };
3424 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3425
3426 let encoded_data = (
3427 U256::from_be_slice(WinningProbability::try_from(0.1)?.as_ref()),
3428 U256::from_be_slice(WinningProbability::try_from(new_minimum)?.as_ref()),
3429 )
3430 .abi_encode();
3431
3432 let win_prob_change_log = SerializableLog {
3433 address: handlers.addresses.win_prob_oracle,
3434 topics: vec![
3435 hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into(),
3436 ],
3437 data: encoded_data,
3438 ..test_log()
3439 };
3440
3441 let event_type = db
3442 .begin_transaction()
3443 .await?
3444 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3445 .await?;
3446
3447 assert!(
3448 event_type.is_none(),
3449 "there's no associated chain event type with winning probability change"
3450 );
3451
3452 assert_eq!(
3453 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3454 new_minimum
3455 );
3456
3457 let tickets = db.get_tickets((&channel_1).into()).await?;
3458 assert_eq!(tickets.len(), 1);
3459
3460 let tickets = db.get_tickets((&channel_2).into()).await?;
3461 assert_eq!(tickets.len(), 0);
3462
3463 let stats = db.get_ticket_statistics(None).await?;
3464 let rejected_value: primitive_types::U256 = ticket_win_probs
3465 .iter()
3466 .filter(|p| **p < new_minimum)
3467 .map(|p| {
3468 primitive_types::U256::from(PRICE_PER_PACKET)
3469 .div_f64(*p)
3470 .expect("must divide")
3471 })
3472 .reduce(|a, b| a + b)
3473 .ok_or(anyhow!("must sum"))?;
3474
3475 assert_eq!(HoprBalance::from(rejected_value), stats.rejected_value);
3476
3477 Ok(())
3478 }
3479}