1use std::{
2 cmp::Ordering,
3 fmt::{Debug, Formatter},
4 ops::Add,
5 sync::Arc,
6 time::{Duration, SystemTime},
7};
8
9use alloy::{primitives::B256, sol_types::SolEventInterface};
10use async_trait::async_trait;
11use hopr_bindings::{
12 hoprannouncements::HoprAnnouncements::HoprAnnouncementsEvents, hoprchannels::HoprChannels::HoprChannelsEvents,
13 hoprnetworkregistry::HoprNetworkRegistry::HoprNetworkRegistryEvents,
14 hoprnodemanagementmodule::HoprNodeManagementModule::HoprNodeManagementModuleEvents,
15 hoprnodesaferegistry::HoprNodeSafeRegistry::HoprNodeSafeRegistryEvents,
16 hoprticketpriceoracle::HoprTicketPriceOracle::HoprTicketPriceOracleEvents, hoprtoken::HoprToken::HoprTokenEvents,
17 hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::HoprWinningProbabilityOracleEvents,
18};
19use hopr_chain_rpc::{HoprIndexerRpcOperations, Log};
20use hopr_chain_types::{
21 ContractAddresses,
22 chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent},
23};
24use hopr_crypto_types::{
25 keypairs::ChainKeypair,
26 prelude::{Hash, Keypair},
27 types::OffchainSignature,
28};
29use hopr_db_sql::{
30 HoprDbAllOperations, OpenTransaction,
31 api::{info::DomainSeparator, tickets::TicketSelector},
32 errors::DbSqlError,
33 prelude::TicketMarker,
34};
35use hopr_internal_types::prelude::*;
36use hopr_primitive_types::prelude::*;
37use tracing::{debug, error, info, trace, warn};
38
39use crate::errors::{CoreEthereumIndexerError, Result};
40
// Multi-counter of contract logs processed by the indexer, labelled by the
// `contract` the log originated from (e.g. "channels", "token", "unknown").
// It is only compiled when the `prometheus` feature is enabled and the crate
// is not built for tests; every use site carries the same `cfg` guard.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter =
        hopr_metrics::MultiCounter::new(
            "hopr_indexer_contract_log_count",
            "Counts of different HOPR contract logs processed by the Indexer",
            &["contract"]
        ).unwrap();
}
50
/// Handlers that translate on-chain contract logs into database updates and
/// chain events.
///
/// `T` is the RPC client type and `Db` is the database type. Their trait
/// bounds are declared on the `impl` blocks, not on the struct itself.
#[derive(Clone)]
pub struct ContractEventHandlers<T, Db> {
    /// Addresses of the contracts whose logs are dispatched to the handlers.
    addresses: Arc<ContractAddresses>,
    /// Safe address whose HOPR balance and allowance are re-read from the chain.
    safe_address: Address,
    /// Chain key pair of this node; its address is compared against the
    /// participants of the processed events.
    chain_key: ChainKeypair,
    /// Database that all handlers write their updates to.
    db: Db,
    /// RPC client used to query the safe balance and allowance.
    rpc_operations: T,
}
69
70impl<T, Db> Debug for ContractEventHandlers<T, Db> {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("ContractEventHandler")
73 .field("addresses", &self.addresses)
74 .field("safe_address", &self.safe_address)
75 .field("chain_key", &self.chain_key)
76 .finish_non_exhaustive()
77 }
78}
79
80impl<T, Db> ContractEventHandlers<T, Db>
81where
82 T: HoprIndexerRpcOperations + Clone + Send + 'static,
83 Db: HoprDbAllOperations + Clone,
84{
85 pub fn new(
103 addresses: ContractAddresses,
104 safe_address: Address,
105 chain_key: ChainKeypair,
106 db: Db,
107 rpc_operations: T,
108 ) -> Self {
109 Self {
110 addresses: Arc::new(addresses),
111 safe_address,
112 chain_key,
113 db,
114 rpc_operations,
115 }
116 }
117
118 async fn on_announcement_event(
119 &self,
120 tx: &OpenTransaction,
121 event: HoprAnnouncementsEvents,
122 block_number: u32,
123 _is_synced: bool,
124 ) -> Result<Option<ChainEventType>> {
125 #[cfg(all(feature = "prometheus", not(test)))]
126 METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
127
128 match event {
129 HoprAnnouncementsEvents::AddressAnnouncement(address_announcement) => {
130 debug!(
131 multiaddress = &address_announcement.baseMultiaddr,
132 address = &address_announcement.node.to_string(),
133 "on_announcement_event: AddressAnnouncement",
134 );
135 if address_announcement.baseMultiaddr.is_empty() {
137 warn!(
138 address = %address_announcement.node,
139 "encountered empty multiaddress announcement",
140 );
141 return Ok(None);
142 }
143 let node_address: Address = address_announcement.node.into();
144
145 return match self
146 .db
147 .insert_announcement(
148 Some(tx),
149 node_address,
150 address_announcement.baseMultiaddr.parse()?,
151 block_number,
152 )
153 .await
154 {
155 Ok(account) => Ok(Some(ChainEventType::Announcement {
156 peer: account.public_key.into(),
157 address: account.chain_addr,
158 multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
159 })),
160 Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
161 Err(e) => Err(e.into()),
162 };
163 }
164 HoprAnnouncementsEvents::KeyBinding(key_binding) => {
165 debug!(
166 address = %key_binding.chain_key,
167 public_key = %key_binding.ed25519_pub_key,
168 "on_announcement_event: KeyBinding",
169 );
170 match KeyBinding::from_parts(
171 key_binding.chain_key.into(),
172 key_binding.ed25519_pub_key.0.try_into()?,
173 OffchainSignature::try_from((key_binding.ed25519_sig_0.0, key_binding.ed25519_sig_1.0))?,
174 ) {
175 Ok(binding) => {
176 match self
177 .db
178 .insert_account(
179 Some(tx),
180 AccountEntry {
181 public_key: binding.packet_key,
182 chain_addr: binding.chain_key,
183 entry_type: AccountType::NotAnnounced,
184 published_at: block_number,
185 },
186 )
187 .await
188 {
189 Ok(_) => (),
190 Err(err) => {
191 error!(%err, "failed to store announcement key binding")
194 }
195 }
196 }
197 Err(e) => {
198 warn!(
199 address = ?key_binding.chain_key,
200 error = %e,
201 "Filtering announcement with invalid signature",
202
203 )
204 }
205 }
206 }
207 HoprAnnouncementsEvents::RevokeAnnouncement(revocation) => {
208 let node_address: Address = revocation.node.into();
209 match self.db.delete_all_announcements(Some(tx), node_address).await {
210 Err(DbSqlError::MissingAccount) => {
211 return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding);
212 }
213 Err(e) => return Err(e.into()),
214 _ => {}
215 }
216 }
217 };
218
219 Ok(None)
220 }
221
/// Handles events emitted by the channels contract.
///
/// Balance, closure, closure-initiation and ticket-redemption events require
/// the channel to already exist in the database; otherwise
/// [`CoreEthereumIndexerError::ChannelDoesNotExist`] is returned.
///
/// The safe balance/allowance is re-read from the chain only when `is_synced`
/// is set. RPC failures while doing so are logged and do not fail the handler,
/// whereas database failures are propagated.
///
/// Returns the chain event describing the change, or `Ok(None)` for the
/// domain-separator updates.
async fn on_channel_event(
    &self,
    tx: &OpenTransaction,
    event: HoprChannelsEvents,
    is_synced: bool,
) -> Result<Option<ChainEventType>> {
    #[cfg(all(feature = "prometheus", not(test)))]
    METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);

    match event {
        HoprChannelsEvents::ChannelBalanceDecreased(balance_decreased) => {
            let maybe_channel = self
                .db
                .begin_channel_update(tx.into(), &balance_decreased.channelId.0.into())
                .await?;

            trace!(
                channel_id = %Hash::from(balance_decreased.channelId.0),
                is_channel = maybe_channel.is_some(),
                "on_channel_balance_decreased_event",
            );

            if let Some(channel_edits) = maybe_channel {
                // The new balance is read from the event as 12 big-endian bytes.
                let new_balance = HoprBalance::from_be_bytes(balance_decreased.newBalance.to_be_bytes::<12>());
                // Amount by which the balance dropped: previous balance minus the new one.
                let diff = channel_edits.entry().balance - new_balance;

                let updated_channel = self
                    .db
                    .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
                    .await?;

                // Refresh the safe balance only once synced and only if this node is
                // the source or the destination of the channel.
                if is_synced
                    && (updated_channel.source == self.chain_key.public().to_address()
                        || updated_channel.destination == self.chain_key.public().to_address())
                {
                    info!("updating safe balance from chain after channel balance decreased event");
                    match self.rpc_operations.get_hopr_balance(self.safe_address).await {
                        Ok(balance) => {
                            self.db.set_safe_hopr_balance(Some(tx), balance).await?;
                        }
                        // RPC failure is logged only; the event is still processed.
                        Err(error) => {
                            error!(%error, "error getting safe balance from chain after channel balance decreased event");
                        }
                    }
                }

                Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
            } else {
                error!(channel_id = %Hash::from(balance_decreased.channelId.0), "observed balance decreased event for a channel that does not exist");
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::ChannelBalanceIncreased(balance_increased) => {
            let maybe_channel = self
                .db
                .begin_channel_update(tx.into(), &balance_increased.channelId.0.into())
                .await?;

            trace!(
                channel_id = %Hash::from(balance_increased.channelId.0),
                is_channel = maybe_channel.is_some(),
                "on_channel_balance_increased_event",
            );

            if let Some(channel_edits) = maybe_channel {
                // The new balance is read from the event as 12 big-endian bytes.
                let new_balance = HoprBalance::from_be_bytes(balance_increased.newBalance.to_be_bytes::<12>());
                // Amount by which the balance grew: new balance minus the previous one.
                let diff = new_balance - channel_edits.entry().balance;

                let updated_channel = self
                    .db
                    .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
                    .await?;

                // Unlike the decrease case, the refresh happens only when this node is
                // the channel source; both balance and allowance are re-read.
                if updated_channel.source == self.chain_key.public().to_address() && is_synced {
                    info!("updating safe balance from chain after channel balance increased event");
                    match self.rpc_operations.get_hopr_balance(self.safe_address).await {
                        Ok(balance) => {
                            self.db.set_safe_hopr_balance(Some(tx), balance).await?;
                        }
                        Err(error) => {
                            error!(%error, "error getting safe balance from chain after channel balance increased event");
                        }
                    }
                    info!("updating safe allowance from chain after channel balance increased event");
                    match self
                        .rpc_operations
                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
                        .await
                    {
                        Ok(allowance) => {
                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
                        }
                        Err(error) => {
                            error!(%error, "error getting safe allowance from chain after channel balance increased event");
                        }
                    }
                }

                Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
            } else {
                error!(channel_id = %Hash::from(balance_increased.channelId.0), "observed balance increased event for a channel that does not exist");
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::ChannelClosed(channel_closed) => {
            let maybe_channel = self
                .db
                .begin_channel_update(tx.into(), &channel_closed.channelId.0.into())
                .await?;

            trace!(
                channel_id = ?channel_closed.channelId.0,
                is_channel = maybe_channel.is_some(),
                "on_channel_closed_event",
            );

            if let Some(channel_edits) = maybe_channel {
                let channel_id = channel_edits.entry().get_id();
                // `Some` when this node is a party of the channel, `None` for a foreign channel.
                let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());

                let updated_channel = if let Some((direction, _)) = orientation {
                    // Own channel: it is kept in the DB with status `Closed`,
                    // zero balance and ticket index 0.
                    let channel_edits = channel_edits
                        .change_status(ChannelStatus::Closed)
                        .change_balance(HoprBalance::zero())
                        .change_ticket_index(0);

                    let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?;

                    match direction {
                        // Incoming channel: the tickets selected by the channel are marked as neglected.
                        ChannelDirection::Incoming => {
                            self.db
                                .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
                                .await?;
                        }
                        // Outgoing channel: the outgoing ticket index is reset.
                        ChannelDirection::Outgoing => {
                            self.db.reset_outgoing_ticket_index(channel_id).await?;
                        }
                    }
                    updated_channel
                } else {
                    // Foreign channel: the entry is deleted instead of being kept as closed.
                    let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
                    debug!(channel_id = %channel_id, "foreign closed closed channel was deleted");
                    updated_channel
                };

                Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
            } else {
                error!(channel_id = %Hash::from(channel_closed.channelId.0), "observed closure finalization event for a channel that does not exist");
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::ChannelOpened(channel_opened) => {
            let source: Address = channel_opened.source.into();
            let destination: Address = channel_opened.destination.into();
            // The event carries no channel id; it is derived from the two parties.
            let channel_id = generate_channel_id(&source, &destination);

            let maybe_channel = self.db.begin_channel_update(tx.into(), &channel_id).await?;

            let channel = if let Some(channel_edits) = maybe_channel {
                // The channel is already known, so this is a re-opening, which is
                // only accepted when the stored channel is `Closed`.
                if channel_edits.entry().status != ChannelStatus::Closed {
                    return Err(CoreEthereumIndexerError::ProcessError(format!(
                        "trying to re-open channel {} which is not closed, but {}",
                        channel_edits.entry().get_id(),
                        channel_edits.entry().status,
                    )));
                }

                trace!(%source, %destination, %channel_id, "on_channel_reopened_event");

                let current_epoch = channel_edits.entry().channel_epoch;

                // For own channels, tickets of the current (soon previous) epoch are
                // marked as neglected and the outgoing ticket index is reset.
                if source == self.chain_key.public().to_address()
                    || destination == self.chain_key.public().to_address()
                {
                    self.db
                        .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
                        .await?;

                    self.db.reset_outgoing_ticket_index(channel_id).await?;
                }

                // Re-opened channel: ticket index 0, epoch incremented by one, status `Open`.
                self.db
                    .finish_channel_update(
                        tx.into(),
                        channel_edits
                            .change_ticket_index(0_u32)
                            .change_epoch(current_epoch.add(1))
                            .change_status(ChannelStatus::Open),
                    )
                    .await?
            } else {
                trace!(%source, %destination, %channel_id, "on_channel_opened_event");

                // NOTE(review): the positional arguments are presumably
                // (source, destination, balance, ticket index, status, epoch) —
                // confirm against `ChannelEntry::new`.
                let new_channel = ChannelEntry::new(
                    source,
                    destination,
                    0_u32.into(),
                    0_u32.into(),
                    ChannelStatus::Open,
                    1_u32.into(),
                );

                self.db.upsert_channel(tx.into(), new_channel).await?;
                new_channel
            };

            Ok(Some(ChainEventType::ChannelOpened(channel)))
        }
        HoprChannelsEvents::TicketRedeemed(ticket_redeemed) => {
            let maybe_channel = self
                .db
                .begin_channel_update(tx.into(), &ticket_redeemed.channelId.0.into())
                .await?;

            if let Some(channel_edits) = maybe_channel {
                // The redeemed ticket is only looked up for incoming channels.
                let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
                    Some(ChannelDirection::Incoming) => {
                        // Candidates are the tickets of this channel in the `BeingRedeemed`
                        // state whose `index + index_offset` equals the new ticket index.
                        let mut matching_tickets = self
                            .db
                            .get_tickets(
                                TicketSelector::from(channel_edits.entry())
                                    .with_state(AcknowledgedTicketStatus::BeingRedeemed),
                            )
                            .await?
                            .into_iter()
                            .filter(|ticket| {
                                let ticket_idx = ticket.verified_ticket().index;
                                let ticket_off = ticket.verified_ticket().index_offset as u64;

                                ticket_idx + ticket_off == ticket_redeemed.newTicketIndex.to::<u64>()
                            })
                            .collect::<Vec<_>>();

                        // Exactly one match is expected.
                        match matching_tickets.len().cmp(&1) {
                            Ordering::Equal => {
                                let ack_ticket = matching_tickets.pop().unwrap();

                                self.db
                                    .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
                                    .await?;
                                info!(%ack_ticket, "ticket marked as redeemed");
                                Some(ack_ticket)
                            }
                            // No match: logged, but the channel update below still takes place.
                            Ordering::Less => {
                                error!(
                                    idx = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
                                    entry = %channel_edits.entry(),
                                    "could not find acknowledged 'BeingRedeemed' ticket",
                                );
                                None
                            }
                            // Several matches: processing of the event is aborted.
                            Ordering::Greater => {
                                error!(
                                    count = matching_tickets.len(),
                                    index = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
                                    entry = %channel_edits.entry(),
                                    "found tickets matching 'BeingRedeemed'",
                                );
                                return Err(CoreEthereumIndexerError::ProcessError(format!(
                                    "multiple tickets matching idx {} found in {}",
                                    ticket_redeemed.newTicketIndex.to::<u64>() - 1,
                                    channel_edits.entry()
                                )));
                            }
                        }
                    }
                    Some(ChannelDirection::Outgoing) => {
                        debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
                        // NOTE(review): presumably this only raises the stored outgoing
                        // index when the on-chain value is higher — confirm in the DB layer.
                        self.db
                            .compare_and_set_outgoing_ticket_index(
                                channel_edits.entry().get_id(),
                                ticket_redeemed.newTicketIndex.to::<u64>(),
                            )
                            .await?;
                        None
                    }
                    None => {
                        debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
                        None
                    }
                };

                // In every case the channel's ticket index is set to the new ticket index
                // from the event (read as 6 big-endian bytes).
                let channel = self
                    .db
                    .finish_channel_update(
                        tx.into(),
                        channel_edits.change_ticket_index(U256::from_be_bytes(
                            ticket_redeemed.newTicketIndex.to_be_bytes::<6>(),
                        )),
                    )
                    .await?;
                // Tickets of this channel with an index below the new ticket index are
                // marked as neglected.
                self.db
                    .mark_tickets_as(
                        TicketSelector::from(&channel)
                            .with_index_range(..ticket_redeemed.newTicketIndex.to::<u64>()),
                        TicketMarker::Neglected,
                    )
                    .await?;

                Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
            } else {
                error!(channel_id = %Hash::from(ticket_redeemed.channelId.0), "observed ticket redeem on a channel that we don't have in the DB");
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        HoprChannelsEvents::OutgoingChannelClosureInitiated(closure_initiated) => {
            let maybe_channel = self
                .db
                .begin_channel_update(tx.into(), &closure_initiated.channelId.0.into())
                .await?;

            let closure_time: u32 = closure_initiated.closureTime;
            if let Some(channel_edits) = maybe_channel {
                // The closure time is interpreted as seconds since the UNIX epoch.
                let new_status = ChannelStatus::PendingToClose(
                    SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_time.into())),
                );

                let channel = self
                    .db
                    .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
                    .await?;
                Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
            } else {
                error!(channel_id = %Hash::from(closure_initiated.channelId.0), "observed channel closure initiation on a channel that we don't have in the DB");
                Err(CoreEthereumIndexerError::ChannelDoesNotExist)
            }
        }
        // Both domain-separator updates are stored in the DB and produce no chain event.
        HoprChannelsEvents::DomainSeparatorUpdated(domain_separator_updated) => {
            self.db
                .set_domain_separator(
                    Some(tx),
                    DomainSeparator::Channel,
                    domain_separator_updated.domainSeparator.0.into(),
                )
                .await?;

            Ok(None)
        }
        HoprChannelsEvents::LedgerDomainSeparatorUpdated(ledger_domain_separator_updated) => {
            self.db
                .set_domain_separator(
                    Some(tx),
                    DomainSeparator::Ledger,
                    ledger_domain_separator_updated.ledgerDomainSeparator.0.into(),
                )
                .await?;

            Ok(None)
        }
    }
}
603
604 async fn on_token_event(
605 &self,
606 tx: &OpenTransaction,
607 event: HoprTokenEvents,
608 is_synced: bool,
609 ) -> Result<Option<ChainEventType>> {
610 #[cfg(all(feature = "prometheus", not(test)))]
611 METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
612
613 match event {
614 HoprTokenEvents::Transfer(transferred) => {
615 let from: Address = transferred.from.into();
616 let to: Address = transferred.to.into();
617
618 trace!(
619 safe_address = %&self.safe_address, %from, %to,
620 "on_token_transfer_event"
621 );
622
623 if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
624 error!(
625 safe_address = %&self.safe_address, %from, %to,
626 "filter misconfiguration: transfer event not involving the safe");
627 return Ok(None);
628 }
629
630 if is_synced {
631 info!("updating safe balance from chain after transfer event");
632 match self.rpc_operations.get_hopr_balance(self.safe_address).await {
633 Ok(balance) => {
634 self.db.set_safe_hopr_balance(Some(tx), balance).await?;
635 }
636 Err(error) => {
637 error!(%error, "error getting safe balance from chain after transfer event");
638 }
639 }
640 info!("updating safe allowance from chain after transfer event");
641 match self
642 .rpc_operations
643 .get_hopr_allowance(self.safe_address, self.addresses.channels)
644 .await
645 {
646 Ok(allowance) => {
647 self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
648 }
649 Err(error) => {
650 error!(%error, "error getting safe allowance from chain after transfer event");
651 }
652 }
653 }
654 }
655 HoprTokenEvents::Approval(approved) => {
656 let owner: Address = approved.owner.into();
657 let spender: Address = approved.spender.into();
658
659 trace!(
660 address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
661 "on_token_approval_event",
662
663 );
664
665 if owner.ne(&self.safe_address) || spender.ne(&self.addresses.channels) {
666 error!(
667 address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
668 "filter misconfiguration: approval event not involving the safe and channels contract");
669 return Ok(None);
670 }
671
672 if is_synced {
674 info!("updating safe allowance from chain after approval event");
675 match self
676 .rpc_operations
677 .get_hopr_allowance(self.safe_address, self.addresses.channels)
678 .await
679 {
680 Ok(allowance) => {
681 self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
682 }
683 Err(error) => {
684 error!(%error, "error getting safe allowance from chain after approval event");
685 }
686 }
687 }
688 }
689 _ => error!("Implement all the other filters for HoprTokenEvents"),
690 }
691
692 Ok(None)
693 }
694
695 async fn on_network_registry_event(
696 &self,
697 tx: &OpenTransaction,
698 event: HoprNetworkRegistryEvents,
699 _is_synced: bool,
700 ) -> Result<Option<ChainEventType>> {
701 #[cfg(all(feature = "prometheus", not(test)))]
702 METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
703
704 match event {
705 HoprNetworkRegistryEvents::DeregisteredByManager(deregistered) => {
706 debug!(
707 node_address = %deregistered.nodeAddress,
708 "on_network_registry_deregistered_by_manager_event",
709 );
710 let node_address: Address = deregistered.nodeAddress.into();
711 self.db
712 .set_access_in_network_registry(Some(tx), node_address, false)
713 .await?;
714
715 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
716 node_address,
717 NetworkRegistryStatus::Denied,
718 )));
719 }
720 HoprNetworkRegistryEvents::Deregistered(deregistered) => {
721 debug!(
722 node_address = %deregistered.nodeAddress,
723 "on_network_registry_deregistered_event",
724 );
725 let node_address: Address = deregistered.nodeAddress.into();
726 self.db
727 .set_access_in_network_registry(Some(tx), node_address, false)
728 .await?;
729
730 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
731 node_address,
732 NetworkRegistryStatus::Denied,
733 )));
734 }
735 HoprNetworkRegistryEvents::RegisteredByManager(registered) => {
736 let node_address: Address = registered.nodeAddress.into();
737 debug!(
738 %node_address,
739 "Node registered by manager, adding to the network registry",
740 );
741 self.db
742 .set_access_in_network_registry(Some(tx), node_address, true)
743 .await?;
744
745 if node_address == self.chain_key.public().to_address() {
746 info!(
747 "This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/."
748 );
749 }
750
751 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
752 node_address,
753 NetworkRegistryStatus::Allowed,
754 )));
755 }
756 HoprNetworkRegistryEvents::Registered(registered) => {
757 let node_address: Address = registered.nodeAddress.into();
758 debug!(
759 %node_address,
760 "Node registered, adding to the network registry",
761 );
762 self.db
763 .set_access_in_network_registry(Some(tx), node_address, true)
764 .await?;
765
766 if node_address == self.chain_key.public().to_address() {
767 info!(
768 "This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/."
769 );
770 }
771
772 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
773 node_address,
774 NetworkRegistryStatus::Allowed,
775 )));
776 }
777 HoprNetworkRegistryEvents::EligibilityUpdated(eligibility_updated) => {
778 debug!(
779 staking_account = %eligibility_updated.stakingAccount,
780 eligibility = %eligibility_updated.eligibility,
781 "Node eligibility updated, updating the safe eligibility",
782 );
783 let account: Address = eligibility_updated.stakingAccount.into();
784 self.db
785 .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
786 .await?;
787 }
788 HoprNetworkRegistryEvents::NetworkRegistryStatusUpdated(enabled) => {
789 debug!(enabled = enabled.isEnabled, "on_network_registry_status_updated_event",);
790 self.db
791 .set_network_registry_enabled(Some(tx), enabled.isEnabled)
792 .await?;
793 }
794 _ => {
795 debug!("on_network_registry_event - not implemented for event: {:?}", event);
796 } };
798
799 Ok(None)
800 }
801
802 async fn on_node_safe_registry_event(
803 &self,
804 tx: &OpenTransaction,
805 event: HoprNodeSafeRegistryEvents,
806 _is_synced: bool,
807 ) -> Result<Option<ChainEventType>> {
808 #[cfg(all(feature = "prometheus", not(test)))]
809 METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
810
811 match event {
812 HoprNodeSafeRegistryEvents::RegisteredNodeSafe(registered) => {
813 if self.chain_key.public().to_address() == registered.nodeAddress.into() {
814 info!(safe_address = %registered.safeAddress, "Node safe registered", );
815 return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safeAddress.into())));
817 }
818 }
819 HoprNodeSafeRegistryEvents::DergisteredNodeSafe(deregistered) => {
820 if self.chain_key.public().to_address() == deregistered.nodeAddress.into() {
821 info!("Node safe unregistered");
822 }
824 }
825 HoprNodeSafeRegistryEvents::DomainSeparatorUpdated(domain_separator_updated) => {
826 self.db
827 .set_domain_separator(
828 Some(tx),
829 DomainSeparator::SafeRegistry,
830 domain_separator_updated.domainSeparator.0.into(),
831 )
832 .await?;
833 }
834 }
835
836 Ok(None)
837 }
838
/// Handles events emitted by the node management module contract.
///
/// The events are currently not acted upon: apart from incrementing the
/// processed-log metric (when metrics are compiled in), the handler always
/// returns `Ok(None)`.
async fn on_node_management_module_event(
    &self,
    _db: &OpenTransaction,
    _event: HoprNodeManagementModuleEvents,
    _is_synced: bool,
) -> Result<Option<ChainEventType>> {
    #[cfg(all(feature = "prometheus", not(test)))]
    METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);

    // No module event changes the indexed state.
    Ok(None)
}
851
852 async fn on_ticket_winning_probability_oracle_event(
853 &self,
854 tx: &OpenTransaction,
855 event: HoprWinningProbabilityOracleEvents,
856 _is_synced: bool,
857 ) -> Result<Option<ChainEventType>> {
858 #[cfg(all(feature = "prometheus", not(test)))]
859 METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
860
861 match event {
862 HoprWinningProbabilityOracleEvents::WinProbUpdated(update) => {
863 let old_minimum_win_prob: WinningProbability = update.oldWinProb.to_be_bytes().into();
864 let new_minimum_win_prob: WinningProbability = update.newWinProb.to_be_bytes().into();
865
866 trace!(
867 %old_minimum_win_prob,
868 %new_minimum_win_prob,
869 "on_ticket_minimum_win_prob_updated",
870 );
871
872 self.db
873 .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
874 .await?;
875
876 info!(
877 %old_minimum_win_prob,
878 %new_minimum_win_prob,
879 "minimum ticket winning probability updated"
880 );
881
882 if old_minimum_win_prob.approx_cmp(&new_minimum_win_prob).is_lt() {
885 let mut selector: Option<TicketSelector> = None;
886 for channel in self.db.get_incoming_channels(tx.into()).await? {
887 selector = selector
888 .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
889 .or_else(|| Some(TicketSelector::from(channel)));
890 }
891 if let Some(selector) = selector {
893 let num_rejected = self
894 .db
895 .mark_tickets_as(
896 selector.with_winning_probability(..new_minimum_win_prob),
897 TicketMarker::Rejected,
898 )
899 .await?;
900 info!(
901 count = num_rejected,
902 "unredeemed tickets were rejected, because the minimum winning probability has been \
903 increased"
904 );
905 }
906 }
907 }
908 _ => {
909 }
911 }
912 Ok(None)
913 }
914
915 async fn on_ticket_price_oracle_event(
916 &self,
917 tx: &OpenTransaction,
918 event: HoprTicketPriceOracleEvents,
919 _is_synced: bool,
920 ) -> Result<Option<ChainEventType>> {
921 #[cfg(all(feature = "prometheus", not(test)))]
922 METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
923
924 match event {
925 HoprTicketPriceOracleEvents::TicketPriceUpdated(update) => {
926 trace!(
927 old = update._0.to_string(),
928 new = update._1.to_string(),
929 "on_ticket_price_updated",
930 );
931
932 self.db
933 .update_ticket_price(Some(tx), HoprBalance::from_be_bytes(update._1.to_be_bytes::<32>()))
934 .await?;
935
936 info!(price = %update._1, "ticket price updated");
937 }
938 HoprTicketPriceOracleEvents::OwnershipTransferred(_event) => {
939 }
941 }
942 Ok(None)
943 }
944
945 #[tracing::instrument(level = "debug", skip(self, slog), fields(log=%slog))]
946 async fn process_log_event(
947 &self,
948 tx: &OpenTransaction,
949 slog: SerializableLog,
950 is_synced: bool,
951 ) -> Result<Option<ChainEventType>> {
952 trace!(log = %slog, "log content");
953
954 let log = Log::from(slog.clone());
955
956 let primitive_log = alloy::primitives::Log::new(
957 slog.address.into(),
958 slog.topics.iter().map(|h| B256::from_slice(h.as_ref())).collect(),
959 slog.data.clone().into(),
960 )
961 .ok_or_else(|| {
962 CoreEthereumIndexerError::ProcessError(format!("failed to convert log to primitive log: {slog:?}"))
963 })?;
964
965 if log.address.eq(&self.addresses.announcements) {
966 let bn = log.block_number as u32;
967 let event = HoprAnnouncementsEvents::decode_log(&primitive_log)?;
968 self.on_announcement_event(tx, event.data, bn, is_synced).await
969 } else if log.address.eq(&self.addresses.channels) {
970 let event = HoprChannelsEvents::decode_log(&primitive_log)?;
971 self.on_channel_event(tx, event.data, is_synced).await
972 } else if log.address.eq(&self.addresses.network_registry) {
973 let event = HoprNetworkRegistryEvents::decode_log(&primitive_log)?;
974 self.on_network_registry_event(tx, event.data, is_synced).await
975 } else if log.address.eq(&self.addresses.token) {
976 let event = HoprTokenEvents::decode_log(&primitive_log)?;
977 self.on_token_event(tx, event.data, is_synced).await
978 } else if log.address.eq(&self.addresses.safe_registry) {
979 let event = HoprNodeSafeRegistryEvents::decode_log(&primitive_log)?;
980 self.on_node_safe_registry_event(tx, event.data, is_synced).await
981 } else if log.address.eq(&self.addresses.module_implementation) {
982 let event = HoprNodeManagementModuleEvents::decode_log(&primitive_log)?;
983 self.on_node_management_module_event(tx, event.data, is_synced).await
984 } else if log.address.eq(&self.addresses.price_oracle) {
985 let event = HoprTicketPriceOracleEvents::decode_log(&primitive_log)?;
986 self.on_ticket_price_oracle_event(tx, event.data, is_synced).await
987 } else if log.address.eq(&self.addresses.win_prob_oracle) {
988 let event = HoprWinningProbabilityOracleEvents::decode_log(&primitive_log)?;
989 self.on_ticket_winning_probability_oracle_event(tx, event.data, is_synced)
990 .await
991 } else {
992 #[cfg(all(feature = "prometheus", not(test)))]
993 METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
994
995 error!(
996 address = %log.address, log = ?log,
997 "on_event error - unknown contract address, received log"
998 );
999 return Err(CoreEthereumIndexerError::UnknownContract(log.address));
1000 }
1001 }
1002}
1003
1004#[async_trait]
1005impl<T, Db> crate::traits::ChainLogHandler for ContractEventHandlers<T, Db>
1006where
1007 T: HoprIndexerRpcOperations + Clone + Send + Sync + 'static,
1008 Db: HoprDbAllOperations + Clone + Debug + Send + Sync + 'static,
1009{
1010 fn contract_addresses(&self) -> Vec<Address> {
1011 vec![
1012 self.addresses.announcements,
1013 self.addresses.channels,
1014 self.addresses.module_implementation,
1015 self.addresses.network_registry,
1016 self.addresses.price_oracle,
1017 self.addresses.win_prob_oracle,
1018 self.addresses.safe_registry,
1019 self.addresses.token,
1020 ]
1021 }
1022
1023 fn contract_addresses_map(&self) -> Arc<ContractAddresses> {
1024 self.addresses.clone()
1025 }
1026
/// Returns the safe address these handlers were created with.
fn safe_address(&self) -> Address {
    self.safe_address
}
1030
1031 fn contract_address_topics(&self, contract: Address) -> Vec<B256> {
1032 if contract.eq(&self.addresses.announcements) {
1033 crate::constants::topics::announcement()
1034 } else if contract.eq(&self.addresses.channels) {
1035 crate::constants::topics::channel()
1036 } else if contract.eq(&self.addresses.module_implementation) {
1037 crate::constants::topics::module_implementation()
1038 } else if contract.eq(&self.addresses.network_registry) {
1039 crate::constants::topics::network_registry()
1040 } else if contract.eq(&self.addresses.price_oracle) {
1041 crate::constants::topics::ticket_price_oracle()
1042 } else if contract.eq(&self.addresses.win_prob_oracle) {
1043 crate::constants::topics::winning_prob_oracle()
1044 } else if contract.eq(&self.addresses.safe_registry) {
1045 crate::constants::topics::node_safe_registry()
1046 } else {
1047 panic!("use of unsupported contract address: {contract}");
1048 }
1049 }
1050
/// Processes a single log inside a newly begun database transaction.
///
/// Returns the resulting [`SignificantChainEvent`], tagged with the hash of
/// the transaction the log belongs to, or `Ok(None)` when the log produced no
/// chain event. Processing errors are logged and returned.
///
/// NOTE(review): presumably `perform` rolls the transaction back when the
/// closure returns an error — confirm in the DB layer.
async fn collect_log_event(&self, slog: SerializableLog, is_synced: bool) -> Result<Option<SignificantChainEvent>> {
    // An owned clone of the handlers is moved into the closure below.
    let myself = self.clone();
    self.db
        .begin_transaction()
        .await?
        .perform(move |tx| {
            let log = slog.clone();
            // Captured up front for logging, because `log` is moved into `process_log_event`.
            let tx_hash = Hash::from(log.tx_hash);
            let log_id = log.log_index;
            let block_id = log.block_number;

            Box::pin(async move {
                match myself.process_log_event(tx, log, is_synced).await {
                    Ok(Some(event_type)) => {
                        let significant_event = SignificantChainEvent { tx_hash, event_type };
                        debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
                        Ok(Some(significant_event))
                    }
                    Ok(None) => {
                        debug!(block_id, %tx_hash, log_id, "no significant event in log");
                        Ok(None)
                    }
                    Err(error) => {
                        error!(block_id, %tx_hash, log_id, %error, "error processing log in tx");
                        Err(error)
                    }
                }
            })
        })
        .await
}
1083}
1084
1085#[cfg(test)]
1086mod tests {
1087 use std::{
1088 sync::{Arc, atomic::Ordering},
1089 time::SystemTime,
1090 };
1091
1092 use alloy::{
1093 dyn_abi::DynSolValue,
1094 primitives::{Address as AlloyAddress, U256},
1095 sol_types::{SolEvent, SolValue},
1096 };
1097 use anyhow::{Context, anyhow};
1098 use hex_literal::hex;
1099 use hopr_chain_rpc::HoprIndexerRpcOperations;
1100 use hopr_chain_types::{
1101 ContractAddresses,
1102 chain_events::{ChainEventType, NetworkRegistryStatus},
1103 };
1104 use hopr_crypto_types::prelude::*;
1105 use hopr_db_sql::{
1106 HoprDbAllOperations, HoprDbGeneralModelOperations,
1107 accounts::{ChainOrPacketKey, HoprDbAccountOperations},
1108 api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
1109 channels::HoprDbChannelOperations,
1110 db::HoprDb,
1111 info::HoprDbInfoOperations,
1112 prelude::HoprDbResolverOperations,
1113 registry::HoprDbRegistryOperations,
1114 };
1115 use hopr_internal_types::prelude::*;
1116 use hopr_primitive_types::prelude::*;
1117 use multiaddr::Multiaddr;
1118 use primitive_types::H256;
1119
1120 use super::ContractEventHandlers;
1121
1122 lazy_static::lazy_static! {
1123 static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
1124 static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1125 static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
1126 static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1127 static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
1128 static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
1129 static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible"); static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible"); static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible"); static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible"); static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible"); static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible"); static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible"); }
1139
1140 mockall::mock! {
1141 pub(crate) IndexerRpcOperations {}
1142
1143 #[async_trait::async_trait]
1144 impl HoprIndexerRpcOperations for IndexerRpcOperations {
1145 async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
1146
1147 async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
1148
1149 async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
1150
1151 async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
1152
1153 fn try_stream_logs<'a>(
1154 &'a self,
1155 start_block_number: u64,
1156 filters: hopr_chain_rpc::FilterSet,
1157 is_synced: bool,
1158 ) -> hopr_chain_rpc::errors::Result<std::pin::Pin<Box<dyn futures::Stream<Item=hopr_chain_rpc::BlockWithLogs> + Send + 'a> > >;
1159 }
1160 }
1161
1162 #[derive(Clone)]
1163 pub struct ClonableMockOperations {
1164 pub inner: Arc<MockIndexerRpcOperations>,
1165 }
1166
1167 #[async_trait::async_trait]
1168 impl HoprIndexerRpcOperations for ClonableMockOperations {
1169 async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64> {
1170 self.inner.block_number().await
1171 }
1172
1173 async fn get_hopr_allowance(
1174 &self,
1175 owner: Address,
1176 spender: Address,
1177 ) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1178 self.inner.get_hopr_allowance(owner, spender).await
1179 }
1180
1181 async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance> {
1182 self.inner.get_xdai_balance(address).await
1183 }
1184
1185 async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1186 self.inner.get_hopr_balance(address).await
1187 }
1188
1189 fn try_stream_logs<'a>(
1190 &'a self,
1191 start_block_number: u64,
1192 filters: hopr_chain_rpc::FilterSet,
1193 is_synced: bool,
1194 ) -> hopr_chain_rpc::errors::Result<
1195 std::pin::Pin<Box<dyn futures::Stream<Item = hopr_chain_rpc::BlockWithLogs> + Send + 'a>>,
1196 > {
1197 self.inner.try_stream_logs(start_block_number, filters, is_synced)
1198 }
1199 }
1200
1201 fn init_handlers<T: Clone, Db: HoprDbAllOperations + Clone>(
1202 rpc_operations: T,
1203 db: Db,
1204 ) -> ContractEventHandlers<T, Db> {
1205 ContractEventHandlers {
1206 addresses: Arc::new(ContractAddresses {
1207 channels: *CHANNELS_ADDR,
1208 token: *TOKEN_ADDR,
1209 network_registry: *NETWORK_REGISTRY_ADDR,
1210 network_registry_proxy: Default::default(),
1211 safe_registry: *NODE_SAFE_REGISTRY_ADDR,
1212 announcements: *ANNOUNCEMENTS_ADDR,
1213 module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
1214 price_oracle: *TICKET_PRICE_ORACLE_ADDR,
1215 win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
1216 stake_factory: Default::default(),
1217 }),
1218 chain_key: SELF_CHAIN_KEY.clone(),
1219 safe_address: STAKE_ADDRESS.clone(),
1220 db,
1221 rpc_operations,
1222 }
1223 }
1224
1225 fn test_log() -> SerializableLog {
1226 SerializableLog { ..Default::default() }
1227 }
1228
1229 #[tokio::test]
1230 async fn announce_keybinding() -> anyhow::Result<()> {
1231 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1232
1233 let rpc_operations = MockIndexerRpcOperations::new();
1234 let clonable_rpc_operations = ClonableMockOperations {
1236 inner: Arc::new(rpc_operations),
1237 };
1238
1239 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1240
1241 let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
1242
1243 let keybinding_log = SerializableLog {
1244 address: handlers.addresses.announcements,
1245 topics: vec![
1246 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::KeyBinding::SIGNATURE_HASH.into(),
1247 ],
1248 data: DynSolValue::Tuple(vec![
1249 DynSolValue::Bytes(keybinding.signature.as_ref().to_vec()),
1250 DynSolValue::Bytes(keybinding.packet_key.as_ref().to_vec()),
1251 DynSolValue::FixedBytes(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()).into_word(), 32),
1252 ])
1253 .abi_encode_packed(),
1254 ..test_log()
1255 };
1256
1257 let account_entry = AccountEntry {
1258 public_key: *SELF_PRIV_KEY.public(),
1259 chain_addr: *SELF_CHAIN_ADDRESS,
1260 entry_type: AccountType::NotAnnounced,
1261 published_at: 0,
1262 };
1263
1264 let event_type = db
1265 .begin_transaction()
1266 .await?
1267 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log, true).await }))
1268 .await?;
1269
1270 assert!(event_type.is_none(), "keybinding does not have a chain event type");
1271
1272 assert_eq!(
1273 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1274 .await?
1275 .context("a value should be present")?,
1276 account_entry
1277 );
1278 Ok(())
1279 }
1280
1281 #[tokio::test]
1282 async fn announce_address_announcement() -> anyhow::Result<()> {
1283 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1284 let rpc_operations = MockIndexerRpcOperations::new();
1285 let clonable_rpc_operations = ClonableMockOperations {
1287 inner: Arc::new(rpc_operations),
1289 };
1290 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1291
1292 let account_entry = AccountEntry {
1294 public_key: *SELF_PRIV_KEY.public(),
1295 chain_addr: *SELF_CHAIN_ADDRESS,
1296 entry_type: AccountType::NotAnnounced,
1297 published_at: 1,
1298 };
1299 db.insert_account(None, account_entry.clone()).await?;
1300
1301 let test_multiaddr_empty: Multiaddr = "".parse()?;
1302
1303 let address_announcement_empty_log_encoded_data = DynSolValue::Tuple(vec![
1304 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1305 DynSolValue::String(test_multiaddr_empty.to_string()),
1306 ])
1307 .abi_encode();
1308
1309 let address_announcement_empty_log = SerializableLog {
1310 address: handlers.addresses.announcements,
1311 topics: vec![
1312 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1313 .into(),
1314 ],
1315 data: address_announcement_empty_log_encoded_data[32..].into(),
1316 ..test_log()
1317 };
1318
1319 let handlers_clone = handlers.clone();
1320 let event_type = db
1321 .begin_transaction()
1322 .await?
1323 .perform(|tx| {
1324 Box::pin(async move {
1325 handlers_clone
1326 .process_log_event(tx, address_announcement_empty_log, true)
1327 .await
1328 })
1329 })
1330 .await?;
1331
1332 assert!(
1333 event_type.is_none(),
1334 "announcement of empty multiaddresses must pass through"
1335 );
1336
1337 assert_eq!(
1338 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1339 .await?
1340 .context("a value should be present")?,
1341 account_entry
1342 );
1343
1344 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1345
1346 let address_announcement_log_encoded_data = DynSolValue::Tuple(vec![
1347 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1348 DynSolValue::String(test_multiaddr.to_string()),
1349 ])
1350 .abi_encode();
1351
1352 let address_announcement_log = SerializableLog {
1353 address: handlers.addresses.announcements,
1354 block_number: 1,
1355 topics: vec![
1356 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1357 .into(),
1358 ],
1359 data: address_announcement_log_encoded_data[32..].into(),
1360 ..test_log()
1361 };
1362
1363 let announced_account_entry = AccountEntry {
1364 public_key: *SELF_PRIV_KEY.public(),
1365 chain_addr: *SELF_CHAIN_ADDRESS,
1366 entry_type: AccountType::Announced {
1367 multiaddr: test_multiaddr.clone(),
1368 updated_block: 1,
1369 },
1370 published_at: 1,
1371 };
1372
1373 let handlers_clone = handlers.clone();
1374 let event_type = db
1375 .begin_transaction()
1376 .await?
1377 .perform(|tx| {
1378 Box::pin(async move {
1379 handlers_clone
1380 .process_log_event(tx, address_announcement_log, true)
1381 .await
1382 })
1383 })
1384 .await?;
1385
1386 assert!(
1387 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1388 "must return the latest announce multiaddress"
1389 );
1390
1391 assert_eq!(
1392 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1393 .await?
1394 .context("a value should be present")?,
1395 announced_account_entry
1396 );
1397
1398 assert_eq!(
1399 Some(*SELF_CHAIN_ADDRESS),
1400 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1401 "must resolve correct chain key"
1402 );
1403
1404 assert_eq!(
1405 Some(*SELF_PRIV_KEY.public()),
1406 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1407 "must resolve correct packet key"
1408 );
1409
1410 let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1411
1412 let address_announcement_dns_log_encoded_data = DynSolValue::Tuple(vec![
1413 DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1414 DynSolValue::String(test_multiaddr_dns.to_string()),
1415 ])
1416 .abi_encode();
1417
1418 let address_announcement_dns_log = SerializableLog {
1419 address: handlers.addresses.announcements,
1420 block_number: 2,
1421 topics: vec![
1422 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1423 .into(),
1424 ],
1425 data: address_announcement_dns_log_encoded_data[32..].into(),
1426 ..test_log()
1427 };
1428
1429 let announced_dns_account_entry = AccountEntry {
1430 public_key: *SELF_PRIV_KEY.public(),
1431 chain_addr: *SELF_CHAIN_ADDRESS,
1432 entry_type: AccountType::Announced {
1433 multiaddr: test_multiaddr_dns.clone(),
1434 updated_block: 2,
1435 },
1436 published_at: 1,
1437 };
1438
1439 let event_type = db
1440 .begin_transaction()
1441 .await?
1442 .perform(|tx| {
1443 Box::pin(async move { handlers.process_log_event(tx, address_announcement_dns_log, true).await })
1444 })
1445 .await?;
1446
1447 assert!(
1448 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1449 "must return the latest announce multiaddress"
1450 );
1451
1452 assert_eq!(
1453 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1454 .await?
1455 .context("a value should be present")?,
1456 announced_dns_account_entry
1457 );
1458
1459 assert_eq!(
1460 Some(*SELF_CHAIN_ADDRESS),
1461 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1462 "must resolve correct chain key"
1463 );
1464
1465 assert_eq!(
1466 Some(*SELF_PRIV_KEY.public()),
1467 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1468 "must resolve correct packet key"
1469 );
1470 Ok(())
1471 }
1472
1473 #[tokio::test]
1474 async fn announce_revoke() -> anyhow::Result<()> {
1475 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1476 let rpc_operations = MockIndexerRpcOperations::new();
1477 let clonable_rpc_operations = ClonableMockOperations {
1479 inner: Arc::new(rpc_operations),
1481 };
1482 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1483
1484 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1485
1486 let announced_account_entry = AccountEntry {
1488 public_key: *SELF_PRIV_KEY.public(),
1489 chain_addr: *SELF_CHAIN_ADDRESS,
1490 entry_type: AccountType::Announced {
1491 multiaddr: test_multiaddr,
1492 updated_block: 0,
1493 },
1494 published_at: 1,
1495 };
1496
1497 db.insert_account(None, announced_account_entry).await?;
1498
1499 let encoded_data = (AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()),).abi_encode();
1500
1501 let revoke_announcement_log = SerializableLog {
1502 address: handlers.addresses.announcements,
1503 topics: vec![
1504 hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::RevokeAnnouncement::SIGNATURE_HASH
1505 .into(),
1506 ],
1507 data: encoded_data,
1508 ..test_log()
1509 };
1510
1511 let account_entry = AccountEntry {
1512 public_key: *SELF_PRIV_KEY.public(),
1513 chain_addr: *SELF_CHAIN_ADDRESS,
1514 entry_type: AccountType::NotAnnounced,
1515 published_at: 1,
1516 };
1517
1518 let event_type = db
1519 .begin_transaction()
1520 .await?
1521 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log, true).await }))
1522 .await?;
1523
1524 assert!(
1525 event_type.is_none(),
1526 "revoke announcement does not have chain event type"
1527 );
1528
1529 assert_eq!(
1530 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1531 .await?
1532 .context("a value should be present")?,
1533 account_entry
1534 );
1535 Ok(())
1536 }
1537
1538 #[tokio::test]
1539 async fn on_token_transfer_to() -> anyhow::Result<()> {
1540 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1541
1542 let value = U256::MAX;
1543 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
1544 value.to_be_bytes_vec().as_slice(),
1545 ));
1546
1547 let mut rpc_operations = MockIndexerRpcOperations::new();
1548 rpc_operations
1549 .expect_get_hopr_balance()
1550 .times(1)
1551 .return_once(move |_| Ok(target_hopr_balance.clone()));
1552 rpc_operations
1553 .expect_get_hopr_allowance()
1554 .times(1)
1555 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1556 let clonable_rpc_operations = ClonableMockOperations {
1557 inner: Arc::new(rpc_operations),
1558 };
1559
1560 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1561
1562 let encoded_data = (value).abi_encode();
1563
1564 let transferred_log = SerializableLog {
1565 address: handlers.addresses.token,
1566 topics: vec![
1567 hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1568 H256::from_slice(&Address::default().to_bytes32()).into(),
1569 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1570 ],
1571 data: encoded_data,
1572 ..test_log()
1573 };
1574
1575 let event_type = db
1576 .begin_transaction()
1577 .await?
1578 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1579 .await?;
1580
1581 assert!(event_type.is_none(), "token transfer does not have chain event type");
1582
1583 assert_eq!(db.get_safe_hopr_balance(None).await?, target_hopr_balance);
1584
1585 Ok(())
1586 }
1587
1588 #[test_log::test(tokio::test)]
1589 async fn on_token_transfer_from() -> anyhow::Result<()> {
1590 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1591
1592 let mut rpc_operations = MockIndexerRpcOperations::new();
1593 rpc_operations
1594 .expect_get_hopr_balance()
1595 .times(1)
1596 .return_once(|_| Ok(HoprBalance::zero()));
1597 rpc_operations
1598 .expect_get_hopr_allowance()
1599 .times(1)
1600 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1601 let clonable_rpc_operations = ClonableMockOperations {
1602 inner: Arc::new(rpc_operations),
1603 };
1604
1605 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1606
1607 let value = U256::MAX;
1608
1609 let encoded_data = (value).abi_encode();
1610
1611 db.set_safe_hopr_balance(
1612 None,
1613 HoprBalance::from(primitive_types::U256::from_big_endian(
1614 value.to_be_bytes_vec().as_slice(),
1615 )),
1616 )
1617 .await?;
1618
1619 let transferred_log = SerializableLog {
1620 address: handlers.addresses.token,
1621 topics: vec![
1622 hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1623 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1624 H256::from_slice(&Address::default().to_bytes32()).into(),
1625 ],
1626 data: encoded_data,
1627 ..test_log()
1628 };
1629
1630 let event_type = db
1631 .begin_transaction()
1632 .await?
1633 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1634 .await?;
1635
1636 assert!(event_type.is_none(), "token transfer does not have chain event type");
1637
1638 assert_eq!(db.get_safe_hopr_balance(None).await?, HoprBalance::zero());
1639
1640 Ok(())
1641 }
1642
1643 #[tokio::test]
1644 async fn on_token_approval_correct() -> anyhow::Result<()> {
1645 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1646
1647 let target_allowance = HoprBalance::from(primitive_types::U256::from(1000u64));
1648 let mut rpc_operations = MockIndexerRpcOperations::new();
1649 rpc_operations
1650 .expect_get_hopr_allowance()
1651 .times(2)
1652 .returning(move |_, _| Ok(target_allowance.clone()));
1653 let clonable_rpc_operations = ClonableMockOperations {
1654 inner: Arc::new(rpc_operations),
1655 };
1656
1657 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1658
1659 let encoded_data = (U256::from(1000u64)).abi_encode();
1660
1661 let approval_log = SerializableLog {
1662 address: handlers.addresses.token,
1663 topics: vec![
1664 hopr_bindings::hoprtoken::HoprToken::Approval::SIGNATURE_HASH.into(),
1665 H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1666 H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1667 ],
1668 data: encoded_data,
1669 ..test_log()
1670 };
1671
1672 assert_eq!(db.get_safe_hopr_allowance(None).await?, HoprBalance::zero());
1674
1675 let approval_log_clone = approval_log.clone();
1676 let handlers_clone = handlers.clone();
1677 let event_type = db
1678 .begin_transaction()
1679 .await?
1680 .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone, true).await }))
1681 .await?;
1682
1683 assert!(event_type.is_none(), "token approval does not have chain event type");
1684
1685 assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance.clone());
1687
1688 let _ = db
1690 .set_safe_hopr_allowance(None, HoprBalance::from(primitive_types::U256::from(10u64)))
1691 .await;
1692 assert_eq!(
1693 db.get_safe_hopr_allowance(None).await?,
1694 HoprBalance::from(primitive_types::U256::from(10u64))
1695 );
1696
1697 let handlers_clone = handlers.clone();
1698 let event_type = db
1699 .begin_transaction()
1700 .await?
1701 .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log, true).await }))
1702 .await?;
1703
1704 assert!(event_type.is_none(), "token approval does not have chain event type");
1705
1706 assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance);
1707 Ok(())
1708 }
1709
1710 #[tokio::test]
1711 async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1712 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1713 let rpc_operations = MockIndexerRpcOperations::new();
1714 let clonable_rpc_operations = ClonableMockOperations {
1716 inner: Arc::new(rpc_operations),
1718 };
1719 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1720
1721 let encoded_data = ().abi_encode();
1722
1723 let registered_log = SerializableLog {
1724 address: handlers.addresses.network_registry,
1725 topics: vec![
1726 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Registered::SIGNATURE_HASH.into(),
1727 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1728 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1729 ],
1730 data: encoded_data,
1731 ..test_log()
1733 };
1734
1735 assert!(
1736 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1737 .await?
1738 );
1739
1740 let event_type = db
1741 .begin_transaction()
1742 .await?
1743 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1744 .await?;
1745
1746 assert!(
1747 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1748 "must return correct NR update"
1749 );
1750
1751 assert!(
1752 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1753 .await?,
1754 "must be allowed in NR"
1755 );
1756 Ok(())
1757 }
1758
1759 #[tokio::test]
1760 async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1761 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1762 let rpc_operations = MockIndexerRpcOperations::new();
1763 let clonable_rpc_operations = ClonableMockOperations {
1765 inner: Arc::new(rpc_operations),
1767 };
1768 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1769
1770 let registered_log = SerializableLog {
1771 address: handlers.addresses.network_registry,
1772 topics: vec![
1773 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::RegisteredByManager::SIGNATURE_HASH.into(),
1774 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1776 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1777 ],
1778 data: ().abi_encode(),
1779 ..test_log()
1781 };
1782
1783 assert!(
1784 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1785 .await?
1786 );
1787
1788 let event_type = db
1789 .begin_transaction()
1790 .await?
1791 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1792 .await?;
1793
1794 assert!(
1795 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1796 "must return correct NR update"
1797 );
1798
1799 assert!(
1800 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1801 .await?,
1802 "must be allowed in NR"
1803 );
1804 Ok(())
1805 }
1806
1807 #[tokio::test]
1808 async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1809 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1810 let rpc_operations = MockIndexerRpcOperations::new();
1811 let clonable_rpc_operations = ClonableMockOperations {
1813 inner: Arc::new(rpc_operations),
1815 };
1816 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1817
1818 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1819 .await?;
1820
1821 let encoded_data = ().abi_encode();
1822
1823 let registered_log = SerializableLog {
1824 address: handlers.addresses.network_registry,
1825 topics: vec![
1826 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Deregistered::SIGNATURE_HASH.into(),
1827 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1828 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1829 ],
1830 data: encoded_data,
1831 ..test_log()
1832 };
1833
1834 assert!(
1835 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1836 .await?
1837 );
1838
1839 let event_type = db
1840 .begin_transaction()
1841 .await?
1842 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1843 .await?;
1844
1845 assert!(
1846 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1847 "must return correct NR update"
1848 );
1849
1850 assert!(
1851 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1852 .await?,
1853 "must not be allowed in NR"
1854 );
1855 Ok(())
1856 }
1857
1858 #[tokio::test]
1859 async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1860 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1861 let rpc_operations = MockIndexerRpcOperations::new();
1862 let clonable_rpc_operations = ClonableMockOperations {
1864 inner: Arc::new(rpc_operations),
1866 };
1867 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1868
1869 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1870 .await?;
1871
1872 let encoded_data = ().abi_encode();
1873
1874 let registered_log = SerializableLog {
1875 address: handlers.addresses.network_registry,
1876 topics: vec![
1877 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::DeregisteredByManager::SIGNATURE_HASH.into(),
1878 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1880 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1881 ],
1882 data: encoded_data,
1883 ..test_log()
1884 };
1885
1886 assert!(
1887 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1888 .await?
1889 );
1890
1891 let event_type = db
1892 .begin_transaction()
1893 .await?
1894 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1895 .await?;
1896
1897 assert!(
1898 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1899 "must return correct NR update"
1900 );
1901
1902 assert!(
1903 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1904 .await?,
1905 "must not be allowed in NR"
1906 );
1907 Ok(())
1908 }
1909
1910 #[tokio::test]
1911 async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
1912 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1913 let rpc_operations = MockIndexerRpcOperations::new();
1914 let clonable_rpc_operations = ClonableMockOperations {
1916 inner: Arc::new(rpc_operations),
1918 };
1919 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1920
1921 let encoded_data = ().abi_encode();
1922
1923 let nr_enabled = SerializableLog {
1924 address: handlers.addresses.network_registry,
1925 topics: vec![
1926 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
1927 .into(),
1928 H256::from_low_u64_be(1).into(),
1930 ],
1931 data: encoded_data,
1932 ..test_log()
1933 };
1934
1935 let event_type = db
1936 .begin_transaction()
1937 .await?
1938 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled, false).await }))
1939 .await?;
1940
1941 assert!(event_type.is_none(), "there's no chain event type for nr disable");
1942
1943 assert!(db.get_indexer_data(None).await?.nr_enabled);
1944 Ok(())
1945 }
1946
1947 #[tokio::test]
1948 async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
1949 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1950 let rpc_operations = MockIndexerRpcOperations::new();
1951 let clonable_rpc_operations = ClonableMockOperations {
1953 inner: Arc::new(rpc_operations),
1955 };
1956 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1957
1958 db.set_network_registry_enabled(None, true).await?;
1959
1960 let encoded_data = ().abi_encode();
1961
1962 let nr_disabled = SerializableLog {
1963 address: handlers.addresses.network_registry,
1964 topics: vec![
1965 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
1966 .into(),
1967 H256::from_low_u64_be(0).into(),
1969 ],
1970 data: encoded_data,
1971 ..test_log()
1972 };
1973
1974 let event_type = db
1975 .begin_transaction()
1976 .await?
1977 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled, false).await }))
1978 .await?;
1979
1980 assert!(event_type.is_none(), "there's no chain event type for nr enable");
1981
1982 assert!(!db.get_indexer_data(None).await?.nr_enabled);
1983 Ok(())
1984 }
1985
1986 #[tokio::test]
1987 async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
1988 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1989 let rpc_operations = MockIndexerRpcOperations::new();
1990 let clonable_rpc_operations = ClonableMockOperations {
1992 inner: Arc::new(rpc_operations),
1994 };
1995 let handlers = init_handlers(clonable_rpc_operations, db.clone());
1996
1997 let encoded_data = ().abi_encode();
1998
1999 let set_eligible = SerializableLog {
2000 address: handlers.addresses.network_registry,
2001 topics: vec![
2002 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2003 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2005 H256::from_low_u64_be(1).into(),
2006 ],
2007 data: encoded_data,
2008 ..test_log()
2009 };
2010
2011 let event_type = db
2012 .begin_transaction()
2013 .await?
2014 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, false).await }))
2015 .await?;
2016
2017 assert!(
2018 event_type.is_none(),
2019 "there's no chain event type for setting nr eligibility"
2020 );
2021
2022 assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2023
2024 Ok(())
2025 }
2026
2027 #[tokio::test]
2028 async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
2029 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2030 let rpc_operations = MockIndexerRpcOperations::new();
2031 let clonable_rpc_operations = ClonableMockOperations {
2033 inner: Arc::new(rpc_operations),
2035 };
2036 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2037
2038 db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
2039
2040 let encoded_data = ().abi_encode();
2041
2042 let set_eligible = SerializableLog {
2043 address: handlers.addresses.network_registry,
2044 topics: vec![
2045 hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2046 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2048 H256::from_low_u64_be(0).into(),
2049 ],
2050 data: encoded_data,
2051 ..test_log()
2052 };
2053
2054 let event_type = db
2055 .begin_transaction()
2056 .await?
2057 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, true).await }))
2058 .await?;
2059
2060 assert!(
2061 event_type.is_none(),
2062 "there's no chain event type for unsetting nr eligibility"
2063 );
2064
2065 assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2066
2067 Ok(())
2068 }
2069
2070 #[tokio::test]
2071 async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
2072 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2073
2074 let value = U256::MAX;
2075 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2076 value.to_be_bytes_vec().as_slice(),
2077 ));
2078
2079 let mut rpc_operations = MockIndexerRpcOperations::new();
2080 rpc_operations
2081 .expect_get_hopr_balance()
2082 .times(1)
2083 .return_once(move |_| Ok(target_hopr_balance.clone()));
2084 rpc_operations
2085 .expect_get_hopr_allowance()
2086 .times(1)
2087 .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
2088 let clonable_rpc_operations = ClonableMockOperations {
2089 inner: Arc::new(rpc_operations),
2090 };
2091 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2092
2093 let channel = ChannelEntry::new(
2094 *SELF_CHAIN_ADDRESS,
2095 *COUNTERPARTY_CHAIN_ADDRESS,
2096 0.into(),
2097 primitive_types::U256::zero(),
2098 ChannelStatus::Open,
2099 primitive_types::U256::one(),
2100 );
2101
2102 db.upsert_channel(None, channel).await?;
2103
2104 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2105 let diff = solidity_balance - channel.balance;
2106
2107 let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2108
2109 let balance_increased_log = SerializableLog {
2110 address: handlers.addresses.channels,
2111 topics: vec![
2112 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2113 H256::from_slice(channel.get_id().as_ref()).into(),
2115 ],
2116 data: encoded_data,
2117 ..test_log()
2118 };
2119
2120 let event_type = db
2121 .begin_transaction()
2122 .await?
2123 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2124 .await?;
2125
2126 let channel = db
2127 .get_channel_by_id(None, &channel.get_id())
2128 .await?
2129 .context("a value should be present")?;
2130
2131 assert!(
2132 matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
2133 "must return updated channel entry and balance diff"
2134 );
2135
2136 assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2137 Ok(())
2138 }
2139
2140 #[tokio::test]
2141 async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
2142 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2143 let rpc_operations = MockIndexerRpcOperations::new();
2144 let clonable_rpc_operations = ClonableMockOperations {
2146 inner: Arc::new(rpc_operations),
2148 };
2149 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2150
2151 let separator = Hash::from(hopr_crypto_random::random_bytes());
2152
2153 let encoded_data = ().abi_encode();
2154
2155 let channels_dst_updated = SerializableLog {
2156 address: handlers.addresses.channels,
2157 topics: vec![
2158 hopr_bindings::hoprchannels::HoprChannels::DomainSeparatorUpdated::SIGNATURE_HASH.into(),
2159 H256::from_slice(separator.as_ref()).into(),
2161 ],
2162 data: encoded_data,
2163 ..test_log()
2164 };
2165
2166 assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
2167
2168 let event_type = db
2169 .begin_transaction()
2170 .await?
2171 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated, true).await }))
2172 .await?;
2173
2174 assert!(
2175 event_type.is_none(),
2176 "there's no chain event type for channel dst update"
2177 );
2178
2179 assert_eq!(
2180 separator,
2181 db.get_indexer_data(None)
2182 .await?
2183 .channels_dst
2184 .context("a value should be present")?,
2185 "separator must be updated"
2186 );
2187 Ok(())
2188 }
2189
2190 #[tokio::test]
2191 async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
2192 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2193
2194 let value = U256::MAX;
2195 let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2196 value.to_be_bytes_vec().as_slice(),
2197 ));
2198
2199 let mut rpc_operations = MockIndexerRpcOperations::new();
2200 rpc_operations
2201 .expect_get_hopr_balance()
2202 .times(1)
2203 .return_once(move |_| Ok(target_hopr_balance.clone()));
2204 let clonable_rpc_operations = ClonableMockOperations {
2205 inner: Arc::new(rpc_operations),
2206 };
2207 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2208
2209 let channel = ChannelEntry::new(
2210 *SELF_CHAIN_ADDRESS,
2211 *COUNTERPARTY_CHAIN_ADDRESS,
2212 HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2213 primitive_types::U256::zero(),
2214 ChannelStatus::Open,
2215 primitive_types::U256::one(),
2216 );
2217
2218 db.upsert_channel(None, channel).await?;
2219
2220 let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 2).into();
2221 let diff = channel.balance - solidity_balance;
2222
2223 let encoded_data = DynSolValue::Tuple(vec![DynSolValue::Uint(
2225 U256::from_be_slice(&solidity_balance.amount().to_be_bytes()),
2226 256,
2227 )])
2228 .abi_encode();
2229
2230 let balance_decreased_log = SerializableLog {
2231 address: handlers.addresses.channels,
2232 topics: vec![
2233 hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceDecreased::SIGNATURE_HASH.into(),
2234 H256::from_slice(channel.get_id().as_ref()).into(),
2236 ],
2237 data: encoded_data,
2238 ..test_log()
2239 };
2240
2241 let event_type = db
2242 .begin_transaction()
2243 .await?
2244 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log, true).await }))
2245 .await?;
2246
2247 let channel = db
2248 .get_channel_by_id(None, &channel.get_id())
2249 .await?
2250 .context("a value should be present")?;
2251
2252 assert!(
2253 matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
2254 "must return updated channel entry and balance diff"
2255 );
2256
2257 assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2258 Ok(())
2259 }
2260
2261 #[tokio::test]
2262 async fn on_channel_closed() -> anyhow::Result<()> {
2263 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2264 let rpc_operations = MockIndexerRpcOperations::new();
2265 let clonable_rpc_operations = ClonableMockOperations {
2267 inner: Arc::new(rpc_operations),
2269 };
2270 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2271
2272 let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2273
2274 let channel = ChannelEntry::new(
2275 *SELF_CHAIN_ADDRESS,
2276 *COUNTERPARTY_CHAIN_ADDRESS,
2277 starting_balance,
2278 primitive_types::U256::zero(),
2279 ChannelStatus::Open,
2280 primitive_types::U256::one(),
2281 );
2282
2283 db.upsert_channel(None, channel).await?;
2284
2285 let encoded_data = ().abi_encode();
2286
2287 let channel_closed_log = SerializableLog {
2288 address: handlers.addresses.channels,
2289 topics: vec![
2290 hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2291 H256::from_slice(channel.get_id().as_ref()).into(),
2293 ],
2294 data: encoded_data,
2295 ..test_log()
2296 };
2297
2298 let event_type = db
2299 .begin_transaction()
2300 .await?
2301 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2302 .await?;
2303
2304 let closed_channel = db
2305 .get_channel_by_id(None, &channel.get_id())
2306 .await?
2307 .context("a value should be present")?;
2308
2309 assert!(
2310 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
2311 "must return the updated channel entry"
2312 );
2313
2314 assert_eq!(closed_channel.status, ChannelStatus::Closed);
2315 assert_eq!(closed_channel.ticket_index, 0u64.into());
2316 assert_eq!(
2317 0,
2318 db.get_outgoing_ticket_index(closed_channel.get_id())
2319 .await?
2320 .load(Ordering::Relaxed)
2321 );
2322
2323 assert!(closed_channel.balance.amount().eq(&primitive_types::U256::zero()));
2324 Ok(())
2325 }
2326
2327 #[tokio::test]
2328 async fn on_foreign_channel_closed() -> anyhow::Result<()> {
2329 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2330 let rpc_operations = MockIndexerRpcOperations::new();
2331 let clonable_rpc_operations = ClonableMockOperations {
2333 inner: Arc::new(rpc_operations),
2335 };
2336 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2337
2338 let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2339
2340 let channel = ChannelEntry::new(
2341 Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
2342 Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
2343 starting_balance,
2344 primitive_types::U256::zero(),
2345 ChannelStatus::Open,
2346 primitive_types::U256::one(),
2347 );
2348
2349 db.upsert_channel(None, channel).await?;
2350
2351 let encoded_data = ().abi_encode();
2352
2353 let channel_closed_log = SerializableLog {
2354 address: handlers.addresses.channels,
2355 topics: vec![
2356 hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2357 H256::from_slice(channel.get_id().as_ref()).into(),
2359 ],
2360 data: encoded_data,
2361 ..test_log()
2362 };
2363
2364 let event_type = db
2365 .begin_transaction()
2366 .await?
2367 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2368 .await?;
2369
2370 let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
2371
2372 assert_eq!(None, closed_channel, "foreign channel must be deleted");
2373
2374 assert!(
2375 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
2376 "must return the closed channel entry"
2377 );
2378
2379 Ok(())
2380 }
2381
2382 #[tokio::test]
2383 async fn on_channel_opened() -> anyhow::Result<()> {
2384 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2385 let rpc_operations = MockIndexerRpcOperations::new();
2386 let clonable_rpc_operations = ClonableMockOperations {
2388 inner: Arc::new(rpc_operations),
2390 };
2391 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2392
2393 let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2394
2395 let encoded_data = ().abi_encode();
2396
2397 let channel_opened_log = SerializableLog {
2398 address: handlers.addresses.channels,
2399 topics: vec![
2400 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2401 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2403 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2404 ],
2405 data: encoded_data,
2406 ..test_log()
2407 };
2408
2409 let event_type = db
2410 .begin_transaction()
2411 .await?
2412 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2413 .await?;
2414
2415 let channel = db
2416 .get_channel_by_id(None, &channel_id)
2417 .await?
2418 .context("a value should be present")?;
2419
2420 assert!(
2421 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2422 "must return the updated channel entry"
2423 );
2424
2425 assert_eq!(channel.status, ChannelStatus::Open);
2426 assert_eq!(channel.channel_epoch, 1u64.into());
2427 assert_eq!(channel.ticket_index, 0u64.into());
2428 assert_eq!(
2429 0,
2430 db.get_outgoing_ticket_index(channel.get_id())
2431 .await?
2432 .load(Ordering::Relaxed)
2433 );
2434 Ok(())
2435 }
2436
2437 #[tokio::test]
2438 async fn on_channel_reopened() -> anyhow::Result<()> {
2439 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2440 let rpc_operations = MockIndexerRpcOperations::new();
2441 let clonable_rpc_operations = ClonableMockOperations {
2443 inner: Arc::new(rpc_operations),
2445 };
2446 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2447
2448 let channel = ChannelEntry::new(
2449 *SELF_CHAIN_ADDRESS,
2450 *COUNTERPARTY_CHAIN_ADDRESS,
2451 HoprBalance::zero(),
2452 primitive_types::U256::zero(),
2453 ChannelStatus::Closed,
2454 3.into(),
2455 );
2456
2457 db.upsert_channel(None, channel).await?;
2458
2459 let encoded_data = ().abi_encode();
2460
2461 let channel_opened_log = SerializableLog {
2462 address: handlers.addresses.channels,
2463 topics: vec![
2464 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2465 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2467 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2468 ],
2469 data: encoded_data,
2470 ..test_log()
2471 };
2472
2473 let event_type = db
2474 .begin_transaction()
2475 .await?
2476 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2477 .await?;
2478
2479 let channel = db
2480 .get_channel_by_id(None, &channel.get_id())
2481 .await?
2482 .context("a value should be present")?;
2483
2484 assert!(
2485 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2486 "must return the updated channel entry"
2487 );
2488
2489 assert_eq!(channel.status, ChannelStatus::Open);
2490 assert_eq!(channel.channel_epoch, 4u64.into());
2491 assert_eq!(channel.ticket_index, 0u64.into());
2492
2493 assert_eq!(
2494 0,
2495 db.get_outgoing_ticket_index(channel.get_id())
2496 .await?
2497 .load(Ordering::Relaxed)
2498 );
2499 Ok(())
2500 }
2501
2502 #[tokio::test]
2503 async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2504 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2505 let rpc_operations = MockIndexerRpcOperations::new();
2506 let clonable_rpc_operations = ClonableMockOperations {
2508 inner: Arc::new(rpc_operations),
2510 };
2511 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2512
2513 let channel = ChannelEntry::new(
2514 *SELF_CHAIN_ADDRESS,
2515 *COUNTERPARTY_CHAIN_ADDRESS,
2516 0.into(),
2517 primitive_types::U256::zero(),
2518 ChannelStatus::Open,
2519 3.into(),
2520 );
2521
2522 db.upsert_channel(None, channel).await?;
2523
2524 let encoded_data = ().abi_encode();
2525
2526 let channel_opened_log = SerializableLog {
2527 address: handlers.addresses.channels,
2528 topics: vec![
2529 hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2530 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2532 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2533 ],
2534 data: encoded_data,
2535 ..test_log()
2536 };
2537
2538 db.begin_transaction()
2539 .await?
2540 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2541 .await
2542 .expect_err("should not re-open channel that is not Closed");
2543 Ok(())
2544 }
2545
2546 const PRICE_PER_PACKET: u32 = 20_u32;
2547
2548 fn mock_acknowledged_ticket(
2549 signer: &ChainKeypair,
2550 destination: &ChainKeypair,
2551 index: u64,
2552 win_prob: f64,
2553 ) -> anyhow::Result<AcknowledgedTicket> {
2554 let channel_id = generate_channel_id(&signer.into(), &destination.into());
2555
2556 let channel_epoch = 1u64;
2557 let domain_separator = Hash::default();
2558
2559 let response = Response::try_from(
2560 Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2561 )?;
2562
2563 Ok(TicketBuilder::default()
2564 .direction(&signer.into(), &destination.into())
2565 .amount(primitive_types::U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2566 .index(index)
2567 .index_offset(1)
2568 .win_prob(win_prob.try_into()?)
2569 .channel_epoch(1)
2570 .challenge(response.to_challenge().into())
2571 .build_signed(signer, &domain_separator)?
2572 .into_acknowledged(response))
2573 }
2574
2575 #[tokio::test]
2576 async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2577 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2578 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2579 .await?;
2580 let rpc_operations = MockIndexerRpcOperations::new();
2581 let clonable_rpc_operations = ClonableMockOperations {
2583 inner: Arc::new(rpc_operations),
2585 };
2586 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2587
2588 let channel = ChannelEntry::new(
2589 *COUNTERPARTY_CHAIN_ADDRESS,
2590 *SELF_CHAIN_ADDRESS,
2591 HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2592 primitive_types::U256::zero(),
2593 ChannelStatus::Open,
2594 primitive_types::U256::one(),
2595 );
2596
2597 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2598 let next_ticket_index = ticket_index + 1;
2599
2600 let mut ticket =
2601 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2602 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2603
2604 let ticket_value = ticket.verified_ticket().amount;
2605
2606 db.upsert_channel(None, channel).await?;
2607 db.upsert_ticket(None, ticket.clone()).await?;
2608
2609 let ticket_redeemed_log = SerializableLog {
2610 address: handlers.addresses.channels,
2611 topics: vec![
2612 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2613 H256::from_slice(channel.get_id().as_ref()).into(),
2615 ],
2616 data: DynSolValue::Tuple(vec![DynSolValue::Uint(
2617 U256::from_be_bytes(next_ticket_index.to_be_bytes()),
2618 48,
2619 )])
2620 .abi_encode(),
2621 ..test_log()
2622 };
2623
2624 let outgoing_ticket_index_before = db
2625 .get_outgoing_ticket_index(channel.get_id())
2626 .await?
2627 .load(Ordering::Relaxed);
2628
2629 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2630 assert_eq!(
2631 HoprBalance::zero(),
2632 stats.redeemed_value,
2633 "there should not be any redeemed value"
2634 );
2635 assert_eq!(
2636 HoprBalance::zero(),
2637 stats.neglected_value,
2638 "there should not be any neglected value"
2639 );
2640
2641 let event_type = db
2642 .begin_transaction()
2643 .await?
2644 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2645 .await?;
2646
2647 let channel = db
2648 .get_channel_by_id(None, &channel.get_id())
2649 .await?
2650 .context("a value should be present")?;
2651
2652 assert!(
2653 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2654 "must return the updated channel entry and the redeemed ticket"
2655 );
2656
2657 assert_eq!(
2658 channel.ticket_index, next_ticket_index,
2659 "channel entry must contain next ticket index"
2660 );
2661
2662 let outgoing_ticket_index_after = db
2663 .get_outgoing_ticket_index(channel.get_id())
2664 .await?
2665 .load(Ordering::Relaxed);
2666
2667 assert_eq!(
2668 outgoing_ticket_index_before, outgoing_ticket_index_after,
2669 "outgoing ticket index must not change"
2670 );
2671
2672 let tickets = db.get_tickets((&channel).into()).await?;
2673
2674 assert!(tickets.is_empty(), "there should not be any tickets left");
2675
2676 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2677 assert_eq!(
2678 ticket_value, stats.redeemed_value,
2679 "there should be redeemed value worth 1 ticket"
2680 );
2681 assert_eq!(
2682 HoprBalance::zero(),
2683 stats.neglected_value,
2684 "there should not be any neglected ticket"
2685 );
2686 Ok(())
2687 }
2688
2689 #[tokio::test]
2690 async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2691 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2692 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2693 .await?;
2694 let rpc_operations = MockIndexerRpcOperations::new();
2695 let clonable_rpc_operations = ClonableMockOperations {
2697 inner: Arc::new(rpc_operations),
2699 };
2700 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2701
2702 let channel = ChannelEntry::new(
2703 *COUNTERPARTY_CHAIN_ADDRESS,
2704 *SELF_CHAIN_ADDRESS,
2705 primitive_types::U256::from((1u128 << 96) - 1).into(),
2706 primitive_types::U256::zero(),
2707 ChannelStatus::Open,
2708 primitive_types::U256::one(),
2709 );
2710
2711 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2712 let next_ticket_index = ticket_index + 1;
2713
2714 let mut ticket =
2715 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2716 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2717
2718 let ticket_value = ticket.verified_ticket().amount;
2719
2720 db.upsert_channel(None, channel).await?;
2721 db.upsert_ticket(None, ticket.clone()).await?;
2722
2723 let old_ticket =
2724 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2725 db.upsert_ticket(None, old_ticket.clone()).await?;
2726
2727 let ticket_redeemed_log = SerializableLog {
2728 address: handlers.addresses.channels,
2729 topics: vec![
2730 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2731 H256::from_slice(channel.get_id().as_ref()).into(),
2733 ],
2734 data: Vec::from(next_ticket_index.to_be_bytes()),
2735 ..test_log()
2736 };
2737
2738 let outgoing_ticket_index_before = db
2739 .get_outgoing_ticket_index(channel.get_id())
2740 .await?
2741 .load(Ordering::Relaxed);
2742
2743 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2744 assert_eq!(
2745 HoprBalance::zero(),
2746 stats.redeemed_value,
2747 "there should not be any redeemed value"
2748 );
2749 assert_eq!(
2750 HoprBalance::zero(),
2751 stats.neglected_value,
2752 "there should not be any neglected value"
2753 );
2754
2755 let event_type = db
2756 .begin_transaction()
2757 .await?
2758 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2759 .await?;
2760
2761 let channel = db
2762 .get_channel_by_id(None, &channel.get_id())
2763 .await?
2764 .context("a value should be present")?;
2765
2766 assert!(
2767 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2768 "must return the updated channel entry and the redeemed ticket"
2769 );
2770
2771 assert_eq!(
2772 channel.ticket_index, next_ticket_index,
2773 "channel entry must contain next ticket index"
2774 );
2775
2776 let outgoing_ticket_index_after = db
2777 .get_outgoing_ticket_index(channel.get_id())
2778 .await?
2779 .load(Ordering::Relaxed);
2780
2781 assert_eq!(
2782 outgoing_ticket_index_before, outgoing_ticket_index_after,
2783 "outgoing ticket index must not change"
2784 );
2785
2786 let tickets = db.get_tickets((&channel).into()).await?;
2787 assert!(tickets.is_empty(), "there should not be any tickets left");
2788
2789 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2790 assert_eq!(
2791 ticket_value, stats.redeemed_value,
2792 "there should be redeemed value worth 1 ticket"
2793 );
2794 assert_eq!(
2795 ticket_value, stats.neglected_value,
2796 "there should neglected value worth 1 ticket"
2797 );
2798 Ok(())
2799 }
2800
2801 #[tokio::test]
2802 async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2803 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2804 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2805 .await?;
2806 let rpc_operations = MockIndexerRpcOperations::new();
2807 let clonable_rpc_operations = ClonableMockOperations {
2809 inner: Arc::new(rpc_operations),
2811 };
2812 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2813
2814 let channel = ChannelEntry::new(
2815 *SELF_CHAIN_ADDRESS,
2816 *COUNTERPARTY_CHAIN_ADDRESS,
2817 primitive_types::U256::from((1u128 << 96) - 1).into(),
2818 primitive_types::U256::zero(),
2819 ChannelStatus::Open,
2820 primitive_types::U256::one(),
2821 );
2822
2823 let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2824 let next_ticket_index = ticket_index + 1;
2825
2826 db.upsert_channel(None, channel).await?;
2827
2828 let ticket_redeemed_log = SerializableLog {
2829 address: handlers.addresses.channels,
2830 topics: vec![
2831 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2832 H256::from_slice(channel.get_id().as_ref()).into(),
2834 ],
2835 data: Vec::from(next_ticket_index.to_be_bytes()),
2836 ..test_log()
2837 };
2838
2839 let event_type = db
2840 .begin_transaction()
2841 .await?
2842 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2843 .await?;
2844
2845 let channel = db
2846 .get_channel_by_id(None, &channel.get_id())
2847 .await?
2848 .context("a value should be present")?;
2849
2850 assert!(
2851 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2852 "must return update channel entry and no ticket"
2853 );
2854
2855 assert_eq!(
2856 channel.ticket_index, next_ticket_index,
2857 "channel entry must contain next ticket index"
2858 );
2859
2860 let outgoing_ticket_index = db
2861 .get_outgoing_ticket_index(channel.get_id())
2862 .await?
2863 .load(Ordering::Relaxed);
2864
2865 assert!(
2866 outgoing_ticket_index >= ticket_index.as_u64(),
2867 "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
2868 );
2869 assert_eq!(
2870 outgoing_ticket_index,
2871 next_ticket_index.as_u64(),
2872 "outgoing ticket index must be equal to next ticket index"
2873 );
2874 Ok(())
2875 }
2876
2877 #[tokio::test]
2878 async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
2879 {
2880 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2881 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2882 .await?;
2883 let rpc_operations = MockIndexerRpcOperations::new();
2884 let clonable_rpc_operations = ClonableMockOperations {
2886 inner: Arc::new(rpc_operations),
2888 };
2889 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2890
2891 let channel = ChannelEntry::new(
2892 *COUNTERPARTY_CHAIN_ADDRESS,
2893 *SELF_CHAIN_ADDRESS,
2894 primitive_types::U256::from((1u128 << 96) - 1).into(),
2895 primitive_types::U256::zero(),
2896 ChannelStatus::Open,
2897 primitive_types::U256::one(),
2898 );
2899
2900 db.upsert_channel(None, channel).await?;
2901
2902 let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
2903
2904 let ticket_redeemed_log = SerializableLog {
2905 address: handlers.addresses.channels,
2906 topics: vec![
2907 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2908 H256::from_slice(channel.get_id().as_ref()).into(),
2910 ],
2911 data: Vec::from(next_ticket_index.to_be_bytes()),
2912 ..test_log()
2913 };
2914
2915 let event_type = db
2916 .begin_transaction()
2917 .await?
2918 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2919 .await?;
2920
2921 let channel = db
2922 .get_channel_by_id(None, &channel.get_id())
2923 .await?
2924 .context("a value should be present")?;
2925
2926 assert!(
2927 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2928 "must return updated channel entry and no ticket"
2929 );
2930
2931 assert_eq!(
2932 channel.ticket_index, next_ticket_index,
2933 "channel entry must contain next ticket index"
2934 );
2935 Ok(())
2936 }
2937
2938 #[tokio::test]
2939 async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
2940 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2941 let rpc_operations = MockIndexerRpcOperations::new();
2942 let clonable_rpc_operations = ClonableMockOperations {
2944 inner: Arc::new(rpc_operations),
2946 };
2947 let handlers = init_handlers(clonable_rpc_operations, db.clone());
2948
2949 let channel = ChannelEntry::new(
2950 Address::from(hopr_crypto_random::random_bytes()),
2951 Address::from(hopr_crypto_random::random_bytes()),
2952 primitive_types::U256::from((1u128 << 96) - 1).into(),
2953 primitive_types::U256::zero(),
2954 ChannelStatus::Open,
2955 primitive_types::U256::one(),
2956 );
2957
2958 db.upsert_channel(None, channel).await?;
2959
2960 let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
2961
2962 let ticket_redeemed_log = SerializableLog {
2963 address: handlers.addresses.channels,
2964 topics: vec![
2965 hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2966 H256::from_slice(channel.get_id().as_ref()).into(),
2968 ],
2969 data: Vec::from(next_ticket_index.to_be_bytes()),
2970 ..test_log()
2971 };
2972
2973 let event_type = db
2974 .begin_transaction()
2975 .await?
2976 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2977 .await?;
2978
2979 let channel = db
2980 .get_channel_by_id(None, &channel.get_id())
2981 .await?
2982 .context("a value should be present")?;
2983
2984 assert!(
2985 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2986 "must return updated channel entry and no ticket"
2987 );
2988
2989 assert_eq!(
2990 channel.ticket_index, next_ticket_index,
2991 "channel entry must contain next ticket index"
2992 );
2993 Ok(())
2994 }
2995
2996 #[tokio::test]
2997 async fn on_channel_closure_initiated() -> anyhow::Result<()> {
2998 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2999 let rpc_operations = MockIndexerRpcOperations::new();
3000 let clonable_rpc_operations = ClonableMockOperations {
3002 inner: Arc::new(rpc_operations),
3004 };
3005 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3006
3007 let channel = ChannelEntry::new(
3008 *SELF_CHAIN_ADDRESS,
3009 *COUNTERPARTY_CHAIN_ADDRESS,
3010 primitive_types::U256::from((1u128 << 96) - 1).into(),
3011 primitive_types::U256::zero(),
3012 ChannelStatus::Open,
3013 primitive_types::U256::one(),
3014 );
3015
3016 db.upsert_channel(None, channel).await?;
3017
3018 let timestamp = SystemTime::now();
3019
3020 let encoded_data = (U256::from(timestamp.as_unix_timestamp().as_secs())).abi_encode();
3021
3022 let closure_initiated_log = SerializableLog {
3023 address: handlers.addresses.channels,
3024 topics: vec![
3025 hopr_bindings::hoprchannels::HoprChannels::OutgoingChannelClosureInitiated::SIGNATURE_HASH.into(),
3026 H256::from_slice(channel.get_id().as_ref()).into(),
3028 ],
3029 data: encoded_data,
3030 ..test_log()
3032 };
3033
3034 let event_type = db
3035 .begin_transaction()
3036 .await?
3037 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log, true).await }))
3038 .await?;
3039
3040 let channel = db
3041 .get_channel_by_id(None, &channel.get_id())
3042 .await?
3043 .context("a value should be present")?;
3044
3045 assert!(
3046 matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
3047 "must return updated channel entry"
3048 );
3049
3050 assert_eq!(
3051 channel.status,
3052 ChannelStatus::PendingToClose(timestamp),
3053 "channel status must match"
3054 );
3055 Ok(())
3056 }
3057
3058 #[tokio::test]
3059 async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
3060 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3061 let rpc_operations = MockIndexerRpcOperations::new();
3062 let clonable_rpc_operations = ClonableMockOperations {
3064 inner: Arc::new(rpc_operations),
3066 };
3067 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3068
3069 let encoded_data = ().abi_encode();
3070
3071 let safe_registered_log = SerializableLog {
3072 address: handlers.addresses.safe_registry,
3073 topics: vec![
3074 hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::RegisteredNodeSafe::SIGNATURE_HASH.into(),
3075 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3077 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3078 ],
3079 data: encoded_data,
3080 ..test_log()
3081 };
3082
3083 let event_type = db
3084 .begin_transaction()
3085 .await?
3086 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3087 .await?;
3088
3089 assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
3090
3091 Ok(())
3093 }
3094
3095 #[tokio::test]
3096 async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
3097 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3098 let rpc_operations = MockIndexerRpcOperations::new();
3099 let clonable_rpc_operations = ClonableMockOperations {
3101 inner: Arc::new(rpc_operations),
3103 };
3104 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3105
3106 let encoded_data = ().abi_encode();
3109
3110 let safe_registered_log = SerializableLog {
3111 address: handlers.addresses.safe_registry,
3112 topics: vec![
3113 hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::DergisteredNodeSafe::SIGNATURE_HASH.into(),
3114 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3116 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3117 ],
3118 data: encoded_data,
3119 ..test_log()
3120 };
3121
3122 let event_type = db
3123 .begin_transaction()
3124 .await?
3125 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3126 .await?;
3127
3128 assert!(
3129 event_type.is_none(),
3130 "there's no associated chain event type with safe deregistration"
3131 );
3132
3133 Ok(())
3135 }
3136
3137 #[tokio::test]
3138 async fn ticket_price_update() -> anyhow::Result<()> {
3139 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3140 let rpc_operations = MockIndexerRpcOperations::new();
3141 let clonable_rpc_operations = ClonableMockOperations {
3143 inner: Arc::new(rpc_operations),
3145 };
3146 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3147
3148 let encoded_data = (U256::from(1u64), U256::from(123u64)).abi_encode();
3149
3150 let price_change_log = SerializableLog {
3151 address: handlers.addresses.price_oracle,
3152 topics: vec![
3153 hopr_bindings::hoprticketpriceoracle::HoprTicketPriceOracle::TicketPriceUpdated::SIGNATURE_HASH.into(),
3154 ],
3156 data: encoded_data,
3157 ..test_log()
3159 };
3160
3161 assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
3162
3163 let event_type = db
3164 .begin_transaction()
3165 .await?
3166 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log, true).await }))
3167 .await?;
3168
3169 assert!(
3170 event_type.is_none(),
3171 "there's no associated chain event type with price oracle"
3172 );
3173
3174 assert_eq!(
3175 db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
3176 Some(primitive_types::U256::from(123u64))
3177 );
3178 Ok(())
3179 }
3180
3181 #[tokio::test]
3182 async fn minimum_win_prob_update() -> anyhow::Result<()> {
3183 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3184 let rpc_operations = MockIndexerRpcOperations::new();
3185 let clonable_rpc_operations = ClonableMockOperations {
3187 inner: Arc::new(rpc_operations),
3189 };
3190 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3191
3192 let encoded_data = (
3193 U256::from_be_slice(WinningProbability::ALWAYS.as_ref()),
3194 U256::from_be_slice(WinningProbability::try_from_f64(0.5)?.as_ref()),
3195 )
3196 .abi_encode();
3197
3198 let win_prob_change_log = SerializableLog {
3199 address: handlers.addresses.win_prob_oracle,
3200 topics: vec![
3201 hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into()],
3202 data: encoded_data,
3203 ..test_log()
3204 };
3205
3206 assert_eq!(
3207 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3208 1.0
3209 );
3210
3211 let event_type = db
3212 .begin_transaction()
3213 .await?
3214 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3215 .await?;
3216
3217 assert!(
3218 event_type.is_none(),
3219 "there's no associated chain event type with winning probability change"
3220 );
3221
3222 assert_eq!(
3223 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3224 0.5
3225 );
3226 Ok(())
3227 }
3228
3229 #[tokio::test]
3230 async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
3231 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3232 db.set_minimum_incoming_ticket_win_prob(None, 0.1.try_into()?).await?;
3233
3234 let new_minimum = 0.5;
3235 let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
3236
3237 let channel_1 = ChannelEntry::new(
3238 *COUNTERPARTY_CHAIN_ADDRESS,
3239 *SELF_CHAIN_ADDRESS,
3240 primitive_types::U256::from((1u128 << 96) - 1).into(),
3241 3_u32.into(),
3242 ChannelStatus::Open,
3243 primitive_types::U256::one(),
3244 );
3245
3246 db.upsert_channel(None, channel_1).await?;
3247
3248 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
3249 db.upsert_ticket(None, ticket).await?;
3250
3251 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
3252 db.upsert_ticket(None, ticket).await?;
3253
3254 let tickets = db.get_tickets((&channel_1).into()).await?;
3255 assert_eq!(tickets.len(), 2);
3256
3257 let other_counterparty = ChainKeypair::random();
3260 let channel_2 = ChannelEntry::new(
3261 other_counterparty.public().to_address(),
3262 *SELF_CHAIN_ADDRESS,
3263 primitive_types::U256::from((1u128 << 96) - 1).into(),
3264 3_u32.into(),
3265 ChannelStatus::Open,
3266 primitive_types::U256::one(),
3267 );
3268
3269 db.upsert_channel(None, channel_2).await?;
3270
3271 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
3272 db.upsert_ticket(None, ticket).await?;
3273
3274 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
3275 db.upsert_ticket(None, ticket).await?;
3276
3277 let tickets = db.get_tickets((&channel_2).into()).await?;
3278 assert_eq!(tickets.len(), 2);
3279
3280 let stats = db.get_ticket_statistics(None).await?;
3281 assert_eq!(HoprBalance::zero(), stats.rejected_value);
3282
3283 let rpc_operations = MockIndexerRpcOperations::new();
3284 let clonable_rpc_operations = ClonableMockOperations {
3286 inner: Arc::new(rpc_operations),
3288 };
3289 let handlers = init_handlers(clonable_rpc_operations, db.clone());
3290
3291 let encoded_data = (
3292 U256::from_be_slice(WinningProbability::try_from(0.1)?.as_ref()),
3293 U256::from_be_slice(WinningProbability::try_from(new_minimum)?.as_ref()),
3294 )
3295 .abi_encode();
3296
3297 let win_prob_change_log = SerializableLog {
3298 address: handlers.addresses.win_prob_oracle,
3299 topics: vec![
3300 hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into(),
3301 ],
3302 data: encoded_data,
3303 ..test_log()
3304 };
3305
3306 let event_type = db
3307 .begin_transaction()
3308 .await?
3309 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3310 .await?;
3311
3312 assert!(
3313 event_type.is_none(),
3314 "there's no associated chain event type with winning probability change"
3315 );
3316
3317 assert_eq!(
3318 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3319 new_minimum
3320 );
3321
3322 let tickets = db.get_tickets((&channel_1).into()).await?;
3323 assert_eq!(tickets.len(), 1);
3324
3325 let tickets = db.get_tickets((&channel_2).into()).await?;
3326 assert_eq!(tickets.len(), 0);
3327
3328 let stats = db.get_ticket_statistics(None).await?;
3329 let rejected_value: primitive_types::U256 = ticket_win_probs
3330 .iter()
3331 .filter(|p| **p < new_minimum)
3332 .map(|p| {
3333 primitive_types::U256::from(PRICE_PER_PACKET)
3334 .div_f64(*p)
3335 .expect("must divide")
3336 })
3337 .reduce(|a, b| a + b)
3338 .ok_or(anyhow!("must sum"))?;
3339
3340 assert_eq!(HoprBalance::from(rejected_value), stats.rejected_value);
3341
3342 Ok(())
3343 }
3344}