1use async_trait::async_trait;
2use ethers::contract::EthLogDecode;
3use ethers::types::H256;
4use std::cmp::Ordering;
5use std::fmt::Formatter;
6use std::ops::{Add, Sub};
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9use tracing::{debug, error, info, trace, warn};
10
11use hopr_bindings::{
12 hopr_announcements::HoprAnnouncementsEvents, hopr_channels::HoprChannelsEvents,
13 hopr_network_registry::HoprNetworkRegistryEvents, hopr_node_management_module::HoprNodeManagementModuleEvents,
14 hopr_node_safe_registry::HoprNodeSafeRegistryEvents, hopr_ticket_price_oracle::HoprTicketPriceOracleEvents,
15 hopr_token::HoprTokenEvents, hopr_winning_probability_oracle::HoprWinningProbabilityOracleEvents,
16};
17use hopr_chain_rpc::{BlockWithLogs, Log};
18use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent};
19use hopr_chain_types::ContractAddresses;
20use hopr_crypto_types::keypairs::ChainKeypair;
21use hopr_crypto_types::prelude::{Hash, Keypair};
22use hopr_crypto_types::types::OffchainSignature;
23use hopr_db_sql::api::info::DomainSeparator;
24use hopr_db_sql::api::tickets::TicketSelector;
25use hopr_db_sql::errors::DbSqlError;
26use hopr_db_sql::prelude::TicketMarker;
27use hopr_db_sql::{HoprDbAllOperations, OpenTransaction};
28use hopr_internal_types::prelude::*;
29use hopr_primitive_types::prelude::*;
30
31use crate::errors::{CoreEthereumIndexerError, Result};
32
33#[cfg(all(feature = "prometheus", not(test)))]
34lazy_static::lazy_static! {
35 static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter =
36 hopr_metrics::MultiCounter::new(
37 "hopr_indexer_contract_log_count",
38 "Counts of different HOPR contract logs processed by the Indexer",
39 &["contract"]
40 ).unwrap();
41}
42
43#[derive(Clone)]
49pub struct ContractEventHandlers<Db: Clone> {
50 addresses: Arc<ContractAddresses>,
53 safe_address: Address,
55 chain_key: ChainKeypair, db: Db,
59}
60
61impl<Db: Clone> std::fmt::Debug for ContractEventHandlers<Db> {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("ContractEventHandler")
64 .field("addresses", &self.addresses)
65 .field("safe_address", &self.safe_address)
66 .field("chain_key", &self.chain_key)
67 .finish_non_exhaustive()
68 }
69}
70
71impl<Db> ContractEventHandlers<Db>
72where
73 Db: HoprDbAllOperations + Clone,
74{
75 pub fn new(addresses: ContractAddresses, safe_address: Address, chain_key: ChainKeypair, db: Db) -> Self {
76 Self {
77 addresses: Arc::new(addresses),
78 safe_address,
79 chain_key,
80 db,
81 }
82 }
83
84 async fn on_announcement_event(
85 &self,
86 tx: &OpenTransaction,
87 event: HoprAnnouncementsEvents,
88 block_number: u32,
89 ) -> Result<Option<ChainEventType>> {
90 #[cfg(all(feature = "prometheus", not(test)))]
91 METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
92
93 match event {
94 HoprAnnouncementsEvents::AddressAnnouncementFilter(address_announcement) => {
95 trace!(
96 multiaddress = &address_announcement.base_multiaddr,
97 address = &address_announcement.node.to_string(),
98 "on_announcement_event",
99 );
100 if address_announcement.base_multiaddr.is_empty() {
102 warn!(
103 address = ?address_announcement.node,
104 "encountered empty multiaddress announcement",
105 );
106 return Ok(None);
107 }
108 let node_address: Address = address_announcement.node.into();
109
110 return match self
111 .db
112 .insert_announcement(
113 Some(tx),
114 node_address,
115 address_announcement.base_multiaddr.parse()?,
116 block_number,
117 )
118 .await
119 {
120 Ok(account) => Ok(Some(ChainEventType::Announcement {
121 peer: account.public_key.into(),
122 address: account.chain_addr,
123 multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
124 })),
125 Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
126 Err(e) => Err(e.into()),
127 };
128 }
129 HoprAnnouncementsEvents::KeyBindingFilter(key_binding) => {
130 match KeyBinding::from_parts(
131 key_binding.chain_key.into(),
132 key_binding.ed_25519_pub_key.try_into()?,
133 OffchainSignature::try_from((key_binding.ed_25519_sig_0, key_binding.ed_25519_sig_1))?,
134 ) {
135 Ok(binding) => {
136 self.db
137 .insert_account(
138 Some(tx),
139 AccountEntry::new(binding.packet_key, binding.chain_key, AccountType::NotAnnounced),
140 )
141 .await?;
142 }
143 Err(e) => {
144 warn!(
145 address = ?key_binding.chain_key,
146 error = %e,
147 "Filtering announcement with invalid signature",
148
149 )
150 }
151 }
152 }
153 HoprAnnouncementsEvents::RevokeAnnouncementFilter(revocation) => {
154 let node_address: Address = revocation.node.into();
155 match self.db.delete_all_announcements(Some(tx), node_address).await {
156 Err(DbSqlError::MissingAccount) => {
157 return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding)
158 }
159 Err(e) => return Err(e.into()),
160 _ => {}
161 }
162 }
163 };
164
165 Ok(None)
166 }
167
168 async fn on_channel_event(
169 &self,
170 tx: &OpenTransaction,
171 event: HoprChannelsEvents,
172 ) -> Result<Option<ChainEventType>> {
173 #[cfg(all(feature = "prometheus", not(test)))]
174 METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);
175
176 match event {
177 HoprChannelsEvents::ChannelBalanceDecreasedFilter(balance_decreased) => {
178 let maybe_channel = self
179 .db
180 .begin_channel_update(tx.into(), &balance_decreased.channel_id.into())
181 .await?;
182
183 if let Some(channel_edits) = maybe_channel {
184 let new_balance = Balance::new(balance_decreased.new_balance, BalanceType::HOPR);
185 let diff = channel_edits.entry().balance.sub(&new_balance);
186
187 let updated_channel = self
188 .db
189 .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
190 .await?;
191
192 Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
193 } else {
194 error!(channel_id = %Hash::from(balance_decreased.channel_id), "observed balance decreased event for a channel that does not exist");
195 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
196 }
197 }
198 HoprChannelsEvents::ChannelBalanceIncreasedFilter(balance_increased) => {
199 let maybe_channel = self
200 .db
201 .begin_channel_update(tx.into(), &balance_increased.channel_id.into())
202 .await?;
203
204 if let Some(channel_edits) = maybe_channel {
205 let new_balance = Balance::new(balance_increased.new_balance, BalanceType::HOPR);
206 let diff = new_balance.sub(&channel_edits.entry().balance);
207
208 let updated_channel = self
209 .db
210 .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
211 .await?;
212
213 Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
214 } else {
215 error!(channel_id = %Hash::from(balance_increased.channel_id), "observed balance increased event for a channel that does not exist");
216 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
217 }
218 }
219 HoprChannelsEvents::ChannelClosedFilter(channel_closed) => {
220 let maybe_channel = self
221 .db
222 .begin_channel_update(tx.into(), &channel_closed.channel_id.into())
223 .await?;
224
225 trace!(
226 "on_channel_closed_event - channel_id: {:?} - channel known: {:?}",
227 channel_closed.channel_id,
228 maybe_channel.is_some()
229 );
230
231 if let Some(channel_edits) = maybe_channel {
232 let channel_id = channel_edits.entry().get_id();
233 let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());
234
235 let updated_channel = if let Some((direction, _)) = orientation {
238 let channel_edits = channel_edits
240 .change_status(ChannelStatus::Closed)
241 .change_balance(BalanceType::HOPR.zero())
242 .change_ticket_index(0);
243
244 let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?;
245
246 match direction {
248 ChannelDirection::Incoming => {
249 self.db
251 .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
252 .await?;
253 }
254 ChannelDirection::Outgoing => {
255 self.db.reset_outgoing_ticket_index(channel_id).await?;
257 }
258 }
259 updated_channel
260 } else {
261 let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
264 debug!(channel_id = %channel_id, "foreign closed closed channel was deleted");
265 updated_channel
266 };
267
268 Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
269 } else {
270 error!(channel_id = %Hash::from(channel_closed.channel_id), "observed closure finalization event for a channel that does not exist");
271 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
272 }
273 }
274 HoprChannelsEvents::ChannelOpenedFilter(channel_opened) => {
275 let source: Address = channel_opened.source.into();
276 let destination: Address = channel_opened.destination.into();
277 let channel_id = generate_channel_id(&source, &destination);
278
279 let maybe_channel = self.db.begin_channel_update(tx.into(), &channel_id).await?;
280
281 let channel = if let Some(channel_edits) = maybe_channel {
282 if channel_edits.entry().status != ChannelStatus::Closed {
284 return Err(CoreEthereumIndexerError::ProcessError(format!(
285 "trying to re-open channel {} which is not closed, but {}",
286 channel_edits.entry().get_id(),
287 channel_edits.entry().status,
288 )));
289 }
290
291 trace!(%source, %destination, %channel_id, "on_channel_reopened_event");
292
293 let current_epoch = channel_edits.entry().channel_epoch;
294
295 if source == self.chain_key.public().to_address()
297 || destination == self.chain_key.public().to_address()
298 {
299 self.db
300 .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
301 .await?;
302
303 self.db.reset_outgoing_ticket_index(channel_id).await?;
304 }
305
306 self.db
308 .finish_channel_update(
309 tx.into(),
310 channel_edits
311 .change_ticket_index(0_u32)
312 .change_epoch(current_epoch.add(1))
313 .change_status(ChannelStatus::Open),
314 )
315 .await?
316 } else {
317 trace!(%source, %destination, %channel_id, "on_channel_opened_event");
318
319 let new_channel = ChannelEntry::new(
320 source,
321 destination,
322 BalanceType::HOPR.zero(),
323 0_u32.into(),
324 ChannelStatus::Open,
325 1_u32.into(),
326 );
327
328 self.db.upsert_channel(tx.into(), new_channel).await?;
329 new_channel
330 };
331
332 Ok(Some(ChainEventType::ChannelOpened(channel)))
333 }
334 HoprChannelsEvents::TicketRedeemedFilter(ticket_redeemed) => {
335 let maybe_channel = self
336 .db
337 .begin_channel_update(tx.into(), &ticket_redeemed.channel_id.into())
338 .await?;
339
340 if let Some(channel_edits) = maybe_channel {
341 let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
342 Some(ChannelDirection::Incoming) => {
345 let mut matching_tickets = self
347 .db
348 .get_tickets(
349 TicketSelector::from(channel_edits.entry())
350 .with_state(AcknowledgedTicketStatus::BeingRedeemed),
351 )
352 .await?
353 .into_iter()
354 .filter(|ticket| {
355 let ticket_idx = ticket.verified_ticket().index;
358 let ticket_off = ticket.verified_ticket().index_offset as u64;
359
360 ticket_idx + ticket_off == ticket_redeemed.new_ticket_index
361 })
362 .collect::<Vec<_>>();
363
364 match matching_tickets.len().cmp(&1) {
365 Ordering::Equal => {
366 let ack_ticket = matching_tickets.pop().unwrap();
367
368 self.db
369 .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
370 .await?;
371 info!(%ack_ticket, "ticket marked as redeemed");
372 Some(ack_ticket)
373 }
374 Ordering::Less => {
375 error!(
376 idx = %ticket_redeemed.new_ticket_index - 1,
377 entry = %channel_edits.entry(),
378 "could not find acknowledged 'BeingRedeemed' ticket",
379 );
380 None
383 }
384 Ordering::Greater => {
385 error!(
386 count = matching_tickets.len(),
387 index = %ticket_redeemed.new_ticket_index - 1,
388 entry = %channel_edits.entry(),
389 "found tickets matching 'BeingRedeemed'",
390 );
391 return Err(CoreEthereumIndexerError::ProcessError(format!(
392 "multiple tickets matching idx {} found in {}",
393 ticket_redeemed.new_ticket_index - 1,
394 channel_edits.entry()
395 )));
396 }
397 }
398 }
399 Some(ChannelDirection::Outgoing) => {
404 debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
406 self.db
407 .compare_and_set_outgoing_ticket_index(
408 channel_edits.entry().get_id(),
409 ticket_redeemed.new_ticket_index,
410 )
411 .await?;
412 None
413 }
414 None => {
416 debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
418 None
419 }
420 };
421
422 let channel = self
424 .db
425 .finish_channel_update(
426 tx.into(),
427 channel_edits.change_ticket_index(ticket_redeemed.new_ticket_index),
428 )
429 .await?;
430
431 self.db
434 .mark_tickets_as(
435 TicketSelector::from(&channel).with_index_range(..ticket_redeemed.new_ticket_index),
436 TicketMarker::Neglected,
437 )
438 .await?;
439
440 Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
441 } else {
442 error!(channel_id = %Hash::from(ticket_redeemed.channel_id), "observed ticket redeem on a channel that we don't have in the DB");
443 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
444 }
445 }
446 HoprChannelsEvents::OutgoingChannelClosureInitiatedFilter(closure_initiated) => {
447 let maybe_channel = self
448 .db
449 .begin_channel_update(tx.into(), &closure_initiated.channel_id.into())
450 .await?;
451
452 if let Some(channel_edits) = maybe_channel {
453 let new_status = ChannelStatus::PendingToClose(
454 SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_initiated.closure_time as u64)),
455 );
456
457 let channel = self
458 .db
459 .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
460 .await?;
461 Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
462 } else {
463 error!(channel_id = %Hash::from(closure_initiated.channel_id), "observed channel closure initiation on a channel that we don't have in the DB");
464 Err(CoreEthereumIndexerError::ChannelDoesNotExist)
465 }
466 }
467 HoprChannelsEvents::DomainSeparatorUpdatedFilter(domain_separator_updated) => {
468 self.db
469 .set_domain_separator(
470 Some(tx),
471 DomainSeparator::Channel,
472 domain_separator_updated.domain_separator.into(),
473 )
474 .await?;
475
476 Ok(None)
477 }
478 HoprChannelsEvents::LedgerDomainSeparatorUpdatedFilter(ledger_domain_separator_updated) => {
479 self.db
480 .set_domain_separator(
481 Some(tx),
482 DomainSeparator::Ledger,
483 ledger_domain_separator_updated.ledger_domain_separator.into(),
484 )
485 .await?;
486
487 Ok(None)
488 }
489 }
490 }
491
492 async fn on_token_event(&self, tx: &OpenTransaction, event: HoprTokenEvents) -> Result<Option<ChainEventType>> {
493 #[cfg(all(feature = "prometheus", not(test)))]
494 METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
495
496 match event {
497 HoprTokenEvents::TransferFilter(transferred) => {
498 let from: Address = transferred.from.into();
499 let to: Address = transferred.to.into();
500
501 trace!(
502 safe_address = %&self.safe_address, %from, %to,
503 "on_token_transfer_event"
504 );
505
506 let mut current_balance = self.db.get_safe_hopr_balance(Some(tx)).await?;
507 let transferred_value = transferred.value;
508
509 if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
510 return Ok(None);
511 } else if to.eq(&self.safe_address) {
512 info!(?current_balance, added_value = %transferred_value, "Safe balance increased ");
514 current_balance = current_balance + transferred_value;
515 } else if from.eq(&self.safe_address) {
516 info!(?current_balance, removed_value = %transferred_value, "Safe balance decreased");
518 current_balance = current_balance - transferred_value;
519 }
520
521 self.db.set_safe_hopr_balance(Some(tx), current_balance).await?;
522 }
523 HoprTokenEvents::ApprovalFilter(approved) => {
524 let owner: Address = approved.owner.into();
525 let spender: Address = approved.spender.into();
526
527 trace!(
528 address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
529 "on_token_approval_event",
530
531 );
532
533 if owner.eq(&self.safe_address) && spender.eq(&self.addresses.channels) {
535 self.db
536 .set_safe_hopr_allowance(Some(tx), BalanceType::HOPR.balance(approved.value))
537 .await?;
538 } else {
539 return Ok(None);
540 }
541 }
542 _ => error!("Implement all the other filters for HoprTokenEvents"),
543 }
544
545 Ok(None)
546 }
547
548 async fn on_network_registry_event(
549 &self,
550 tx: &OpenTransaction,
551 event: HoprNetworkRegistryEvents,
552 ) -> Result<Option<ChainEventType>> {
553 #[cfg(all(feature = "prometheus", not(test)))]
554 METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
555
556 match event {
557 HoprNetworkRegistryEvents::DeregisteredByManagerFilter(deregistered) => {
558 let node_address: Address = deregistered.node_address.into();
559 self.db
560 .set_access_in_network_registry(Some(tx), node_address, false)
561 .await?;
562
563 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
564 node_address,
565 NetworkRegistryStatus::Denied,
566 )));
567 }
568 HoprNetworkRegistryEvents::DeregisteredFilter(deregistered) => {
569 let node_address: Address = deregistered.node_address.into();
570 self.db
571 .set_access_in_network_registry(Some(tx), node_address, false)
572 .await?;
573
574 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
575 node_address,
576 NetworkRegistryStatus::Denied,
577 )));
578 }
579 HoprNetworkRegistryEvents::RegisteredByManagerFilter(registered) => {
580 let node_address: Address = registered.node_address.into();
581 self.db
582 .set_access_in_network_registry(Some(tx), node_address, true)
583 .await?;
584
585 if node_address == self.chain_key.public().to_address() {
586 info!("This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/.");
587 }
588
589 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
590 node_address,
591 NetworkRegistryStatus::Allowed,
592 )));
593 }
594 HoprNetworkRegistryEvents::RegisteredFilter(registered) => {
595 let node_address: Address = registered.node_address.into();
596 self.db
597 .set_access_in_network_registry(Some(tx), node_address, true)
598 .await?;
599
600 if node_address == self.chain_key.public().to_address() {
601 info!("This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/.");
602 }
603
604 return Ok(Some(ChainEventType::NetworkRegistryUpdate(
605 node_address,
606 NetworkRegistryStatus::Allowed,
607 )));
608 }
609 HoprNetworkRegistryEvents::EligibilityUpdatedFilter(eligibility_updated) => {
610 let account: Address = eligibility_updated.staking_account.into();
611 self.db
612 .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
613 .await?;
614 }
615 HoprNetworkRegistryEvents::NetworkRegistryStatusUpdatedFilter(enabled) => {
616 self.db
617 .set_network_registry_enabled(Some(tx), enabled.is_enabled)
618 .await?;
619 }
620 _ => {} };
622
623 Ok(None)
624 }
625
626 async fn on_node_safe_registry_event(
627 &self,
628 tx: &OpenTransaction,
629 event: HoprNodeSafeRegistryEvents,
630 ) -> Result<Option<ChainEventType>> {
631 #[cfg(all(feature = "prometheus", not(test)))]
632 METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
633
634 match event {
635 HoprNodeSafeRegistryEvents::RegisteredNodeSafeFilter(registered) => {
636 if self.chain_key.public().to_address() == registered.node_address.into() {
637 info!(safe_address = %registered.safe_address, "Node safe registered", );
638 return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safe_address.into())));
640 }
641 }
642 HoprNodeSafeRegistryEvents::DergisteredNodeSafeFilter(deregistered) => {
643 if self.chain_key.public().to_address() == deregistered.node_address.into() {
644 info!("Node safe unregistered");
645 }
647 }
648 HoprNodeSafeRegistryEvents::DomainSeparatorUpdatedFilter(domain_separator_updated) => {
649 self.db
650 .set_domain_separator(
651 Some(tx),
652 DomainSeparator::SafeRegistry,
653 domain_separator_updated.domain_separator.into(),
654 )
655 .await?;
656 }
657 }
658
659 Ok(None)
660 }
661
662 async fn on_node_management_module_event(
663 &self,
664 _db: &OpenTransaction,
665 _event: HoprNodeManagementModuleEvents,
666 ) -> Result<Option<ChainEventType>> {
667 #[cfg(all(feature = "prometheus", not(test)))]
668 METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);
669
670 Ok(None)
672 }
673
674 async fn on_ticket_winning_probability_oracle_event(
675 &self,
676 tx: &OpenTransaction,
677 event: HoprWinningProbabilityOracleEvents,
678 ) -> Result<Option<ChainEventType>> {
679 #[cfg(all(feature = "prometheus", not(test)))]
680 METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
681
682 match event {
683 HoprWinningProbabilityOracleEvents::WinProbUpdatedFilter(update) => {
684 let mut encoded_old: EncodedWinProb = Default::default();
685 encoded_old.copy_from_slice(&update.old_win_prob.to_be_bytes()[1..]);
686 let old_minimum_win_prob = win_prob_to_f64(&encoded_old);
687
688 let mut encoded_new: EncodedWinProb = Default::default();
689 encoded_new.copy_from_slice(&update.new_win_prob.to_be_bytes()[1..]);
690 let new_minimum_win_prob = win_prob_to_f64(&encoded_new);
691
692 trace!(
693 old = old_minimum_win_prob,
694 new = new_minimum_win_prob,
695 "on_ticket_minimum_win_prob_updated",
696 );
697
698 self.db
699 .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
700 .await?;
701
702 info!(
703 old = old_minimum_win_prob,
704 new = new_minimum_win_prob,
705 "minimum ticket winning probability updated"
706 );
707
708 if old_minimum_win_prob < new_minimum_win_prob {
711 let mut selector: Option<TicketSelector> = None;
712 for channel in self.db.get_incoming_channels(tx.into()).await? {
713 selector = selector
714 .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
715 .or_else(|| Some(TicketSelector::from(channel)));
716 }
717 if let Some(selector) = selector {
719 let num_rejected = self
720 .db
721 .mark_tickets_as(selector.with_winning_probability(..encoded_new), TicketMarker::Rejected)
722 .await?;
723 info!(count = num_rejected, "unredeemed tickets were rejected, because the minimum winning probability has been increased");
724 }
725 }
726 }
727 _ => {
728 }
730 }
731 Ok(None)
732 }
733
734 async fn on_ticket_price_oracle_event(
735 &self,
736 tx: &OpenTransaction,
737 event: HoprTicketPriceOracleEvents,
738 ) -> Result<Option<ChainEventType>> {
739 #[cfg(all(feature = "prometheus", not(test)))]
740 METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
741
742 match event {
743 HoprTicketPriceOracleEvents::TicketPriceUpdatedFilter(update) => {
744 trace!(
745 old = update.0.to_string(),
746 new = update.1.to_string(),
747 "on_ticket_price_updated",
748 );
749
750 self.db
751 .update_ticket_price(Some(tx), BalanceType::HOPR.balance(update.1))
752 .await?;
753
754 info!(price = %update.1, "ticket price updated");
755 }
756 HoprTicketPriceOracleEvents::OwnershipTransferredFilter(_event) => {
757 }
759 }
760 Ok(None)
761 }
762
763 #[tracing::instrument(level = "debug", skip(self))]
764 async fn process_log_event(&self, tx: &OpenTransaction, slog: SerializableLog) -> Result<Option<ChainEventType>> {
765 trace!(log = %slog, "log content");
766 let log = Log::from(slog);
767
768 if log.address.eq(&self.addresses.announcements) {
769 let bn = log.block_number as u32;
770 let event = HoprAnnouncementsEvents::decode_log(&log.into())?;
771 self.on_announcement_event(tx, event, bn).await
772 } else if log.address.eq(&self.addresses.channels) {
773 let event = HoprChannelsEvents::decode_log(&log.into())?;
774 self.on_channel_event(tx, event).await
775 } else if log.address.eq(&self.addresses.network_registry) {
776 let event = HoprNetworkRegistryEvents::decode_log(&log.into())?;
777 self.on_network_registry_event(tx, event).await
778 } else if log.address.eq(&self.addresses.token) {
779 let event = HoprTokenEvents::decode_log(&log.into())?;
780 self.on_token_event(tx, event).await
781 } else if log.address.eq(&self.addresses.safe_registry) {
782 let event = HoprNodeSafeRegistryEvents::decode_log(&log.into())?;
783 self.on_node_safe_registry_event(tx, event).await
784 } else if log.address.eq(&self.addresses.module_implementation) {
785 let event = HoprNodeManagementModuleEvents::decode_log(&log.into())?;
786 self.on_node_management_module_event(tx, event).await
787 } else if log.address.eq(&self.addresses.price_oracle) {
788 let event = HoprTicketPriceOracleEvents::decode_log(&log.into())?;
789 self.on_ticket_price_oracle_event(tx, event).await
790 } else if log.address.eq(&self.addresses.win_prob_oracle) {
791 let event = HoprWinningProbabilityOracleEvents::decode_log(&log.into())?;
792 self.on_ticket_winning_probability_oracle_event(tx, event).await
793 } else {
794 #[cfg(all(feature = "prometheus", not(test)))]
795 METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
796
797 error!(
798 address = %log.address, log = ?log,
799 "on_event error - unknown contract address, received log"
800 );
801 return Err(CoreEthereumIndexerError::UnknownContract(log.address));
802 }
803 }
804}
805
806#[async_trait]
807impl<Db> crate::traits::ChainLogHandler for ContractEventHandlers<Db>
808where
809 Db: HoprDbAllOperations + Clone + Send + Sync + 'static,
810{
811 fn contract_addresses(&self) -> Vec<Address> {
812 vec![
813 self.addresses.announcements,
814 self.addresses.channels,
815 self.addresses.module_implementation,
816 self.addresses.network_registry,
817 self.addresses.price_oracle,
818 self.addresses.win_prob_oracle,
819 self.addresses.safe_registry,
820 self.addresses.token,
821 ]
822 }
823
824 fn contract_address_topics(&self, contract: Address) -> Vec<H256> {
825 if contract.eq(&self.addresses.announcements) {
826 crate::constants::topics::announcement()
827 } else if contract.eq(&self.addresses.channels) {
828 crate::constants::topics::channel()
829 } else if contract.eq(&self.addresses.module_implementation) {
830 crate::constants::topics::module_implementation()
831 } else if contract.eq(&self.addresses.network_registry) {
832 crate::constants::topics::network_registry()
833 } else if contract.eq(&self.addresses.price_oracle) {
834 crate::constants::topics::ticket_price_oracle()
835 } else if contract.eq(&self.addresses.win_prob_oracle) {
836 crate::constants::topics::winning_prob_oracle()
837 } else if contract.eq(&self.addresses.safe_registry) {
838 crate::constants::topics::node_safe_registry()
839 } else if contract.eq(&self.addresses.token) {
840 crate::constants::topics::token()
841 } else {
842 vec![]
843 }
844 }
845
846 async fn collect_block_events(&self, block_with_logs: BlockWithLogs) -> Result<Vec<SignificantChainEvent>> {
847 let myself = self.clone();
848 self.db
849 .begin_transaction()
850 .await?
851 .perform(|tx| {
852 Box::pin(async move {
853 let mut ret = Vec::with_capacity(block_with_logs.logs.len());
855
856 for log in block_with_logs.logs {
858 let tx_hash = Hash::from(log.tx_hash);
859 let log_id = log.log_index;
860 let block_id = log.block_number;
861
862 match myself.process_log_event(tx, log).await {
863 Ok(Some(event_type)) => {
865 let significant_event = SignificantChainEvent { tx_hash, event_type };
866 debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
867 ret.push(significant_event);
868 }
869 Ok(None) => debug!(block_id, %tx_hash, log_id, "no significant event in log"),
870 Err(error) => error!(block_id, %tx_hash, log_id, %error, "error processing log in tx"),
871 }
872 }
873
874 Ok(ret)
875 })
876 })
877 .await
878 }
879}
880
881#[cfg(test)]
882mod tests {
883 use super::ContractEventHandlers;
884
885 use anyhow::{anyhow, Context};
886 use ethers::contract::EthEvent;
887 use ethers::{
888 abi::{encode, Address as EthereumAddress, Token},
889 types::U256 as EthU256,
890 };
891 use hex_literal::hex;
892 use multiaddr::Multiaddr;
893 use primitive_types::H256;
894 use std::sync::atomic::Ordering;
895 use std::sync::Arc;
896 use std::time::SystemTime;
897
898 use hopr_bindings::hopr_winning_probability_oracle_events::WinProbUpdatedFilter;
899 use hopr_bindings::{
900 hopr_announcements::{AddressAnnouncementFilter, KeyBindingFilter, RevokeAnnouncementFilter},
901 hopr_channels::{
902 ChannelBalanceDecreasedFilter, ChannelBalanceIncreasedFilter, ChannelClosedFilter, ChannelOpenedFilter,
903 DomainSeparatorUpdatedFilter, OutgoingChannelClosureInitiatedFilter, TicketRedeemedFilter,
904 },
905 hopr_network_registry::{
906 DeregisteredByManagerFilter, DeregisteredFilter, EligibilityUpdatedFilter,
907 NetworkRegistryStatusUpdatedFilter, RegisteredByManagerFilter, RegisteredFilter,
908 },
909 hopr_node_safe_registry::{DergisteredNodeSafeFilter, RegisteredNodeSafeFilter},
910 hopr_ticket_price_oracle::TicketPriceUpdatedFilter,
911 hopr_token::{ApprovalFilter, TransferFilter},
912 };
913 use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus};
914 use hopr_chain_types::ContractAddresses;
915 use hopr_crypto_types::prelude::*;
916 use hopr_db_sql::accounts::{ChainOrPacketKey, HoprDbAccountOperations};
917 use hopr_db_sql::api::{info::DomainSeparator, tickets::HoprDbTicketOperations};
918 use hopr_db_sql::channels::HoprDbChannelOperations;
919 use hopr_db_sql::db::HoprDb;
920 use hopr_db_sql::info::HoprDbInfoOperations;
921 use hopr_db_sql::prelude::HoprDbResolverOperations;
922 use hopr_db_sql::registry::HoprDbRegistryOperations;
923 use hopr_db_sql::{HoprDbAllOperations, HoprDbGeneralModelOperations};
924 use hopr_internal_types::prelude::*;
925 use hopr_primitive_types::prelude::*;
926
927 lazy_static::lazy_static! {
928 static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
929 static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
930 static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
931 static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
932 static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
933 static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
934 static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible"); static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible"); static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible"); static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible"); static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible"); static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible"); static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible"); }
944
945 fn init_handlers<Db: HoprDbAllOperations + Clone>(db: Db) -> ContractEventHandlers<Db> {
946 ContractEventHandlers {
947 addresses: Arc::new(ContractAddresses {
948 channels: *CHANNELS_ADDR,
949 token: *TOKEN_ADDR,
950 network_registry: *NETWORK_REGISTRY_ADDR,
951 network_registry_proxy: Default::default(),
952 safe_registry: *NODE_SAFE_REGISTRY_ADDR,
953 announcements: *ANNOUNCEMENTS_ADDR,
954 module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
955 price_oracle: *TICKET_PRICE_ORACLE_ADDR,
956 win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
957 stake_factory: Default::default(),
958 }),
959 chain_key: SELF_CHAIN_KEY.clone(),
960 safe_address: SELF_CHAIN_KEY.public().to_address(),
961 db,
962 }
963 }
964
965 fn test_log() -> SerializableLog {
966 SerializableLog { ..Default::default() }
967 }
968
969 #[async_std::test]
970 async fn announce_keybinding() -> anyhow::Result<()> {
971 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
972
973 let handlers = init_handlers(db.clone());
974
975 let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
976
977 let keybinding_log = SerializableLog {
978 address: handlers.addresses.announcements.into(),
979 topics: vec![KeyBindingFilter::signature().into()],
980 data: encode(&[
981 Token::FixedBytes(keybinding.signature.as_ref().to_vec()),
982 Token::FixedBytes(keybinding.packet_key.as_ref().to_vec()),
983 Token::Address(EthereumAddress::from_slice(
984 &SELF_CHAIN_KEY.public().to_address().as_ref(),
985 )),
986 ])
987 .into(),
988 ..test_log()
989 };
990
991 let account_entry = AccountEntry::new(*SELF_PRIV_KEY.public(), *SELF_CHAIN_ADDRESS, AccountType::NotAnnounced);
992
993 let event_type = db
994 .begin_transaction()
995 .await?
996 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log.into()).await }))
997 .await?;
998
999 assert!(event_type.is_none(), "keybinding does not have a chain event type");
1000
1001 assert_eq!(
1002 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1003 .await?
1004 .context("a value should be present")?,
1005 account_entry
1006 );
1007 Ok(())
1008 }
1009
1010 #[async_std::test]
1011 async fn announce_address_announcement() -> anyhow::Result<()> {
1012 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1013
1014 let handlers = init_handlers(db.clone());
1015
1016 let account_entry = AccountEntry::new(*SELF_PRIV_KEY.public(), *SELF_CHAIN_ADDRESS, AccountType::NotAnnounced);
1018 db.insert_account(None, account_entry.clone()).await?;
1019
1020 let test_multiaddr_empty: Multiaddr = "".parse()?;
1021
1022 let address_announcement_empty_log = SerializableLog {
1023 address: handlers.addresses.announcements.into(),
1024 topics: vec![AddressAnnouncementFilter::signature().into()],
1025 data: encode(&[
1026 Token::Address(EthereumAddress::from_slice(&SELF_CHAIN_ADDRESS.as_ref())),
1027 Token::String(test_multiaddr_empty.to_string()),
1028 ])
1029 .into(),
1030 ..test_log()
1031 };
1032
1033 let handlers_clone = handlers.clone();
1034 let event_type = db
1035 .begin_transaction()
1036 .await?
1037 .perform(|tx| {
1038 Box::pin(async move {
1039 handlers_clone
1040 .process_log_event(tx, address_announcement_empty_log.into())
1041 .await
1042 })
1043 })
1044 .await?;
1045
1046 assert!(
1047 event_type.is_none(),
1048 "announcement of empty multiaddresses must pass through"
1049 );
1050
1051 assert_eq!(
1052 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1053 .await?
1054 .context("a value should be present")?,
1055 account_entry
1056 );
1057
1058 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1059
1060 let address_announcement_log = SerializableLog {
1061 address: handlers.addresses.announcements.into(),
1062 block_number: 1,
1063 topics: vec![AddressAnnouncementFilter::signature().into()],
1064 data: encode(&[
1065 Token::Address(EthereumAddress::from_slice(&SELF_CHAIN_ADDRESS.as_ref())),
1066 Token::String(test_multiaddr.to_string()),
1067 ])
1068 .into(),
1069 ..test_log()
1070 };
1071
1072 let announced_account_entry = AccountEntry::new(
1073 *SELF_PRIV_KEY.public(),
1074 *SELF_CHAIN_ADDRESS,
1075 AccountType::Announced {
1076 multiaddr: test_multiaddr.clone(),
1077 updated_block: 1,
1078 },
1079 );
1080
1081 let handlers_clone = handlers.clone();
1082 let event_type = db
1083 .begin_transaction()
1084 .await?
1085 .perform(|tx| {
1086 Box::pin(async move {
1087 handlers_clone
1088 .process_log_event(tx, address_announcement_log.into())
1089 .await
1090 })
1091 })
1092 .await?;
1093
1094 assert!(
1095 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1096 "must return the latest announce multiaddress"
1097 );
1098
1099 assert_eq!(
1100 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1101 .await?
1102 .context("a value should be present")?,
1103 announced_account_entry
1104 );
1105
1106 assert_eq!(
1107 Some(*SELF_CHAIN_ADDRESS),
1108 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1109 "must resolve correct chain key"
1110 );
1111
1112 assert_eq!(
1113 Some(*SELF_PRIV_KEY.public()),
1114 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1115 "must resolve correct packet key"
1116 );
1117
1118 let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1119
1120 let address_announcement_dns_log = SerializableLog {
1121 address: handlers.addresses.announcements.into(),
1122 block_number: 2,
1123 topics: vec![AddressAnnouncementFilter::signature().into()],
1124 data: encode(&[
1125 Token::Address(EthereumAddress::from_slice(&SELF_CHAIN_ADDRESS.as_ref())),
1126 Token::String(test_multiaddr_dns.to_string()),
1127 ])
1128 .into(),
1129 ..test_log()
1130 };
1131
1132 let announced_dns_account_entry = AccountEntry::new(
1133 *SELF_PRIV_KEY.public(),
1134 *SELF_CHAIN_ADDRESS,
1135 AccountType::Announced {
1136 multiaddr: test_multiaddr_dns.clone(),
1137 updated_block: 2,
1138 },
1139 );
1140
1141 let event_type = db
1142 .begin_transaction()
1143 .await?
1144 .perform(|tx| {
1145 Box::pin(async move {
1146 handlers
1147 .process_log_event(tx, address_announcement_dns_log.into())
1148 .await
1149 })
1150 })
1151 .await?;
1152
1153 assert!(
1154 matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1155 "must return the latest announce multiaddress"
1156 );
1157
1158 assert_eq!(
1159 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1160 .await?
1161 .context("a value should be present")?,
1162 announced_dns_account_entry
1163 );
1164
1165 assert_eq!(
1166 Some(*SELF_CHAIN_ADDRESS),
1167 db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1168 "must resolve correct chain key"
1169 );
1170
1171 assert_eq!(
1172 Some(*SELF_PRIV_KEY.public()),
1173 db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1174 "must resolve correct packet key"
1175 );
1176 Ok(())
1177 }
1178
1179 #[async_std::test]
1180 async fn announce_revoke() -> anyhow::Result<()> {
1181 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1182 let handlers = init_handlers(db.clone());
1183
1184 let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1185
1186 let announced_account_entry = AccountEntry::new(
1188 *SELF_PRIV_KEY.public(),
1189 *SELF_CHAIN_ADDRESS,
1190 AccountType::Announced {
1191 multiaddr: test_multiaddr,
1192 updated_block: 0,
1193 },
1194 );
1195 db.insert_account(None, announced_account_entry).await?;
1196
1197 let revoke_announcement_log = SerializableLog {
1198 address: handlers.addresses.announcements.into(),
1199 topics: vec![RevokeAnnouncementFilter::signature().into()],
1200 data: encode(&[Token::Address(EthereumAddress::from_slice(
1201 &SELF_CHAIN_ADDRESS.as_ref(),
1202 ))])
1203 .into(),
1204 ..test_log()
1205 };
1206
1207 let account_entry = AccountEntry::new(*SELF_PRIV_KEY.public(), *SELF_CHAIN_ADDRESS, AccountType::NotAnnounced);
1208
1209 let event_type = db
1210 .begin_transaction()
1211 .await?
1212 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log.into()).await }))
1213 .await?;
1214
1215 assert!(
1216 event_type.is_none(),
1217 "revoke announcement does not have chain event type"
1218 );
1219
1220 assert_eq!(
1221 db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1222 .await?
1223 .context("a value should be present")?,
1224 account_entry
1225 );
1226 Ok(())
1227 }
1228
1229 #[async_std::test]
1230 async fn on_token_transfer_to() -> anyhow::Result<()> {
1231 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1232
1233 let handlers = init_handlers(db.clone());
1234
1235 let value = U256::max_value();
1236
1237 let transferred_log = SerializableLog {
1238 address: handlers.addresses.token.into(),
1239 topics: vec![
1240 TransferFilter::signature().into(),
1241 H256::from_slice(&Address::default().to_bytes32()).into(),
1242 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1243 ],
1244 data: encode(&[Token::Uint(value)]).into(),
1245 ..test_log()
1246 };
1247
1248 let event_type = db
1249 .begin_transaction()
1250 .await?
1251 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log.into()).await }))
1252 .await?;
1253
1254 assert!(event_type.is_none(), "token transfer does not have chain event type");
1255
1256 assert_eq!(
1257 db.get_safe_hopr_balance(None).await?,
1258 Balance::new(value, BalanceType::HOPR)
1259 );
1260
1261 Ok(())
1262 }
1263
1264 #[async_std::test]
1265 async fn on_token_transfer_from() -> anyhow::Result<()> {
1266 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1267
1268 let handlers = init_handlers(db.clone());
1269
1270 let value = U256::max_value();
1271
1272 db.set_safe_hopr_balance(None, BalanceType::HOPR.balance(value)).await?;
1273
1274 let transferred_log = SerializableLog {
1275 address: handlers.addresses.token.into(),
1276 topics: vec![
1277 TransferFilter::signature().into(),
1278 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1279 H256::from_slice(&Address::default().to_bytes32()).into(),
1280 ],
1281 data: encode(&[Token::Uint(value)]).into(),
1282 ..test_log()
1283 };
1284
1285 let event_type = db
1286 .begin_transaction()
1287 .await?
1288 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log.into()).await }))
1289 .await?;
1290
1291 assert!(event_type.is_none(), "token transfer does not have chain event type");
1292
1293 assert_eq!(db.get_safe_hopr_balance(None).await?, BalanceType::HOPR.zero());
1294
1295 Ok(())
1296 }
1297
1298 #[async_std::test]
1299 async fn on_token_approval_correct() -> anyhow::Result<()> {
1300 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1301
1302 let handlers = init_handlers(db.clone());
1303
1304 let approval_log = SerializableLog {
1305 address: handlers.addresses.token.into(),
1306 topics: vec![
1307 ApprovalFilter::signature().into(),
1308 H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1309 H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1310 ],
1311 data: encode(&[Token::Uint(EthU256::from(1000u64))]).into(),
1312 ..test_log()
1313 };
1314
1315 assert_eq!(
1316 db.get_safe_hopr_allowance(None).await?,
1317 Balance::new(U256::from(0u64), BalanceType::HOPR)
1318 );
1319
1320 let approval_log_clone = approval_log.clone();
1321 let handlers_clone = handlers.clone();
1322 let event_type = db
1323 .begin_transaction()
1324 .await?
1325 .perform(|tx| {
1326 Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone.into()).await })
1327 })
1328 .await?;
1329
1330 assert!(event_type.is_none(), "token approval does not have chain event type");
1331
1332 assert_eq!(
1333 db.get_safe_hopr_allowance(None).await?,
1334 Balance::new(U256::from(1000u64), BalanceType::HOPR)
1335 );
1336
1337 let _ = db
1339 .set_safe_hopr_allowance(None, Balance::new(U256::from(10u64), BalanceType::HOPR))
1340 .await;
1341 assert_eq!(
1342 db.get_safe_hopr_allowance(None).await?,
1343 Balance::new(U256::from(10u64), BalanceType::HOPR)
1344 );
1345
1346 let handlers_clone = handlers.clone();
1347 let event_type = db
1348 .begin_transaction()
1349 .await?
1350 .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log.into()).await }))
1351 .await?;
1352
1353 assert!(event_type.is_none(), "token approval does not have chain event type");
1354
1355 assert_eq!(
1356 db.get_safe_hopr_allowance(None).await?,
1357 Balance::new(U256::from(1000u64), BalanceType::HOPR)
1358 );
1359 Ok(())
1360 }
1361
1362 #[async_std::test]
1363 async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1364 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1365
1366 let handlers = init_handlers(db.clone());
1367
1368 let registered_log = SerializableLog {
1369 address: handlers.addresses.network_registry.into(),
1370 topics: vec![
1371 RegisteredFilter::signature().into(),
1372 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1373 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1374 ],
1375 data: encode(&[]).into(),
1376 ..test_log()
1377 };
1378
1379 assert!(
1380 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1381 .await?
1382 );
1383
1384 let event_type = db
1385 .begin_transaction()
1386 .await?
1387 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1388 .await?;
1389
1390 assert!(
1391 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1392 "must return correct NR update"
1393 );
1394
1395 assert!(
1396 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1397 .await?,
1398 "must be allowed in NR"
1399 );
1400 Ok(())
1401 }
1402
1403 #[async_std::test]
1404 async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1405 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1406
1407 let handlers = init_handlers(db.clone());
1408
1409 let registered_log = SerializableLog {
1410 address: handlers.addresses.network_registry.into(),
1411 topics: vec![
1412 RegisteredByManagerFilter::signature().into(),
1413 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1414 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1415 ],
1416 data: encode(&[]).into(),
1417 ..test_log()
1418 };
1419
1420 assert!(
1421 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1422 .await?
1423 );
1424
1425 let event_type = db
1426 .begin_transaction()
1427 .await?
1428 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1429 .await?;
1430
1431 assert!(
1432 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1433 "must return correct NR update"
1434 );
1435
1436 assert!(
1437 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1438 .await?,
1439 "must be allowed in NR"
1440 );
1441 Ok(())
1442 }
1443
1444 #[async_std::test]
1445 async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1446 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1447
1448 let handlers = init_handlers(db.clone());
1449
1450 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1451 .await?;
1452
1453 let registered_log = SerializableLog {
1454 address: handlers.addresses.network_registry.into(),
1455 topics: vec![
1456 DeregisteredFilter::signature().into(),
1457 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1458 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1459 ],
1460 data: encode(&[]).into(),
1461 ..test_log()
1462 };
1463
1464 assert!(
1465 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1466 .await?
1467 );
1468
1469 let event_type = db
1470 .begin_transaction()
1471 .await?
1472 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1473 .await?;
1474
1475 assert!(
1476 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1477 "must return correct NR update"
1478 );
1479
1480 assert!(
1481 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1482 .await?,
1483 "must not be allowed in NR"
1484 );
1485 Ok(())
1486 }
1487
1488 #[async_std::test]
1489 async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1490 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1491
1492 let handlers = init_handlers(db.clone());
1493
1494 db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1495 .await?;
1496
1497 let registered_log = SerializableLog {
1498 address: handlers.addresses.network_registry.into(),
1499 topics: vec![
1500 DeregisteredByManagerFilter::signature().into(),
1501 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1502 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1503 ],
1504 data: encode(&[]).into(),
1505 ..test_log()
1506 };
1507
1508 assert!(
1509 db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1510 .await?
1511 );
1512
1513 let event_type = db
1514 .begin_transaction()
1515 .await?
1516 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1517 .await?;
1518
1519 assert!(
1520 matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1521 "must return correct NR update"
1522 );
1523
1524 assert!(
1525 !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1526 .await?,
1527 "must not be allowed in NR"
1528 );
1529 Ok(())
1530 }
1531
1532 #[async_std::test]
1533 async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
1534 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1535
1536 let handlers = init_handlers(db.clone());
1537
1538 let nr_enabled = SerializableLog {
1539 address: handlers.addresses.network_registry.into(),
1540 topics: vec![
1541 NetworkRegistryStatusUpdatedFilter::signature().into(),
1542 H256::from_low_u64_be(1).into(),
1543 ],
1544 data: encode(&[]).into(),
1545 ..test_log()
1546 };
1547
1548 let event_type = db
1549 .begin_transaction()
1550 .await?
1551 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled.into()).await }))
1552 .await?;
1553
1554 assert!(event_type.is_none(), "there's no chain event type for nr disable");
1555
1556 assert!(db.get_indexer_data(None).await?.nr_enabled);
1557 Ok(())
1558 }
1559
1560 #[async_std::test]
1561 async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
1562 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1563
1564 let handlers = init_handlers(db.clone());
1565
1566 db.set_network_registry_enabled(None, true).await?;
1567
1568 let nr_disabled = SerializableLog {
1569 address: handlers.addresses.network_registry.into(),
1570 topics: vec![
1571 NetworkRegistryStatusUpdatedFilter::signature().into(),
1572 H256::from_low_u64_be(0).into(),
1573 ],
1574 data: encode(&[]).into(),
1575 ..test_log()
1576 };
1577
1578 let event_type = db
1579 .begin_transaction()
1580 .await?
1581 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled.into()).await }))
1582 .await?;
1583
1584 assert!(event_type.is_none(), "there's no chain event type for nr enable");
1585
1586 assert!(!db.get_indexer_data(None).await?.nr_enabled);
1587 Ok(())
1588 }
1589
1590 #[async_std::test]
1591 async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
1592 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1593
1594 let handlers = init_handlers(db.clone());
1595
1596 let set_eligible = SerializableLog {
1597 address: handlers.addresses.network_registry.into(),
1598 topics: vec![
1599 EligibilityUpdatedFilter::signature().into(),
1600 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1601 H256::from_low_u64_be(1).into(),
1602 ],
1603 data: encode(&[]).into(),
1604 ..test_log()
1605 };
1606
1607 let event_type = db
1608 .begin_transaction()
1609 .await?
1610 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible.into()).await }))
1611 .await?;
1612
1613 assert!(
1614 event_type.is_none(),
1615 "there's no chain event type for setting nr eligibility"
1616 );
1617
1618 assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
1619
1620 Ok(())
1621 }
1622
1623 #[async_std::test]
1624 async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
1625 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1626
1627 let handlers = init_handlers(db.clone());
1628
1629 db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
1630
1631 let set_eligible = SerializableLog {
1632 address: handlers.addresses.network_registry.into(),
1633 topics: vec![
1634 EligibilityUpdatedFilter::signature().into(),
1635 H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1636 H256::from_low_u64_be(0).into(),
1637 ],
1638 data: encode(&[]).into(),
1639 ..test_log()
1640 };
1641
1642 let event_type = db
1643 .begin_transaction()
1644 .await?
1645 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible.into()).await }))
1646 .await?;
1647
1648 assert!(
1649 event_type.is_none(),
1650 "there's no chain event type for unsetting nr eligibility"
1651 );
1652
1653 assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
1654
1655 Ok(())
1656 }
1657
1658 #[async_std::test]
1659 async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
1660 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1661
1662 let handlers = init_handlers(db.clone());
1663
1664 let channel = ChannelEntry::new(
1665 *SELF_CHAIN_ADDRESS,
1666 *COUNTERPARTY_CHAIN_ADDRESS,
1667 Balance::new(U256::zero(), BalanceType::HOPR),
1668 U256::zero(),
1669 ChannelStatus::Open,
1670 U256::one(),
1671 );
1672
1673 db.upsert_channel(None, channel).await?;
1674
1675 let solidity_balance = BalanceType::HOPR.balance(U256::from((1u128 << 96) - 1));
1676 let diff = solidity_balance - channel.balance;
1677
1678 let balance_increased_log = SerializableLog {
1679 address: handlers.addresses.channels.into(),
1680 topics: vec![
1681 ChannelBalanceIncreasedFilter::signature().into(),
1682 H256::from_slice(channel.get_id().as_ref()).into(),
1683 ],
1684 data: Vec::from(solidity_balance.amount().to_be_bytes()).into(),
1685 ..test_log()
1686 };
1687
1688 let event_type = db
1689 .begin_transaction()
1690 .await?
1691 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log.into()).await }))
1692 .await?;
1693
1694 let channel = db
1695 .get_channel_by_id(None, &channel.get_id())
1696 .await?
1697 .context("a value should be present")?;
1698
1699 assert!(
1700 matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
1701 "must return updated channel entry and balance diff"
1702 );
1703
1704 assert_eq!(solidity_balance, channel.balance, "balance must be updated");
1705 Ok(())
1706 }
1707
1708 #[async_std::test]
1709 async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
1710 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1711
1712 let handlers = init_handlers(db.clone());
1713
1714 let separator = Hash::from(hopr_crypto_random::random_bytes());
1715
1716 let channels_dst_updated = SerializableLog {
1717 address: handlers.addresses.channels.into(),
1718 topics: vec![
1719 DomainSeparatorUpdatedFilter::signature().into(),
1720 H256::from_slice(separator.as_ref()).into(),
1721 ],
1722 data: encode(&[]).into(),
1723 ..test_log()
1724 };
1725
1726 assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
1727
1728 let event_type = db
1729 .begin_transaction()
1730 .await?
1731 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated.into()).await }))
1732 .await?;
1733
1734 assert!(
1735 event_type.is_none(),
1736 "there's no chain event type for channel dst update"
1737 );
1738
1739 assert_eq!(
1740 separator,
1741 db.get_indexer_data(None)
1742 .await?
1743 .channels_dst
1744 .context("a value should be present")?,
1745 "separator must be updated"
1746 );
1747 Ok(())
1748 }
1749
1750 #[async_std::test]
1751 async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
1752 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1753
1754 let handlers = init_handlers(db.clone());
1755
1756 let channel = ChannelEntry::new(
1757 *SELF_CHAIN_ADDRESS,
1758 *COUNTERPARTY_CHAIN_ADDRESS,
1759 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
1760 U256::zero(),
1761 ChannelStatus::Open,
1762 U256::one(),
1763 );
1764
1765 db.upsert_channel(None, channel).await?;
1766
1767 let solidity_balance = U256::from((1u128 << 96) - 2);
1768 let diff = channel.balance - solidity_balance;
1769
1770 let balance_decreased_log = SerializableLog {
1771 address: handlers.addresses.channels.into(),
1772 topics: vec![
1773 ChannelBalanceDecreasedFilter::signature().into(),
1774 H256::from_slice(channel.get_id().as_ref()).into(),
1775 ],
1776 data: Vec::from(solidity_balance.to_be_bytes()).into(),
1777 ..test_log()
1778 };
1779
1780 let event_type = db
1781 .begin_transaction()
1782 .await?
1783 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log.into()).await }))
1784 .await?;
1785
1786 let channel = db
1787 .get_channel_by_id(None, &channel.get_id())
1788 .await?
1789 .context("a value should be present")?;
1790
1791 assert!(
1792 matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
1793 "must return updated channel entry and balance diff"
1794 );
1795
1796 assert_eq!(solidity_balance, channel.balance.amount(), "balance must be updated");
1797 Ok(())
1798 }
1799
1800 #[async_std::test]
1801 async fn on_channel_closed() -> anyhow::Result<()> {
1802 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1803
1804 let handlers = init_handlers(db.clone());
1805
1806 let starting_balance = Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR);
1807
1808 let channel = ChannelEntry::new(
1809 *SELF_CHAIN_ADDRESS,
1810 *COUNTERPARTY_CHAIN_ADDRESS,
1811 starting_balance,
1812 U256::zero(),
1813 ChannelStatus::Open,
1814 U256::one(),
1815 );
1816
1817 db.upsert_channel(None, channel).await?;
1818
1819 let channel_closed_log = SerializableLog {
1820 address: handlers.addresses.channels.into(),
1821 topics: vec![
1822 ChannelClosedFilter::signature().into(),
1823 H256::from_slice(channel.get_id().as_ref()).into(),
1824 ],
1825 data: encode(&[]).into(),
1826 ..test_log()
1827 };
1828
1829 let event_type = db
1830 .begin_transaction()
1831 .await?
1832 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log.into()).await }))
1833 .await?;
1834
1835 let closed_channel = db
1836 .get_channel_by_id(None, &channel.get_id())
1837 .await?
1838 .context("a value should be present")?;
1839
1840 assert!(
1841 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
1842 "must return the updated channel entry"
1843 );
1844
1845 assert_eq!(closed_channel.status, ChannelStatus::Closed);
1846 assert_eq!(closed_channel.ticket_index, 0u64.into());
1847 assert_eq!(
1848 0,
1849 db.get_outgoing_ticket_index(closed_channel.get_id())
1850 .await?
1851 .load(Ordering::Relaxed)
1852 );
1853
1854 assert!(closed_channel.balance.amount().eq(&U256::zero()));
1855 Ok(())
1856 }
1857
1858 #[async_std::test]
1859 async fn on_foreign_channel_closed() -> anyhow::Result<()> {
1860 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1861
1862 let handlers = init_handlers(db.clone());
1863
1864 let starting_balance = Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR);
1865
1866 let channel = ChannelEntry::new(
1867 Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
1868 Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
1869 starting_balance,
1870 U256::zero(),
1871 ChannelStatus::Open,
1872 U256::one(),
1873 );
1874
1875 db.upsert_channel(None, channel).await?;
1876
1877 let channel_closed_log = SerializableLog {
1878 address: handlers.addresses.channels.into(),
1879 topics: vec![
1880 ChannelClosedFilter::signature().into(),
1881 H256::from_slice(channel.get_id().as_ref()).into(),
1882 ],
1883 data: encode(&[]).into(),
1884 ..test_log()
1885 };
1886
1887 let event_type = db
1888 .begin_transaction()
1889 .await?
1890 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log.into()).await }))
1891 .await?;
1892
1893 let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
1894
1895 assert_eq!(None, closed_channel, "foreign channel must be deleted");
1896
1897 assert!(
1898 matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
1899 "must return the closed channel entry"
1900 );
1901
1902 Ok(())
1903 }
1904
1905 #[async_std::test]
1906 async fn on_channel_opened() -> anyhow::Result<()> {
1907 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1908
1909 let handlers = init_handlers(db.clone());
1910
1911 let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
1912
1913 let channel_opened_log = SerializableLog {
1914 address: handlers.addresses.channels.into(),
1915 topics: vec![
1916 ChannelOpenedFilter::signature().into(),
1917 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1918 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
1919 ],
1920 data: encode(&[]).into(),
1921 ..test_log()
1922 };
1923
1924 let event_type = db
1925 .begin_transaction()
1926 .await?
1927 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log.into()).await }))
1928 .await?;
1929
1930 let channel = db
1931 .get_channel_by_id(None, &channel_id)
1932 .await?
1933 .context("a value should be present")?;
1934
1935 assert!(
1936 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
1937 "must return the updated channel entry"
1938 );
1939
1940 assert_eq!(channel.status, ChannelStatus::Open);
1941 assert_eq!(channel.channel_epoch, 1u64.into());
1942 assert_eq!(channel.ticket_index, 0u64.into());
1943 assert_eq!(
1944 0,
1945 db.get_outgoing_ticket_index(channel.get_id())
1946 .await?
1947 .load(Ordering::Relaxed)
1948 );
1949 Ok(())
1950 }
1951
1952 #[async_std::test]
1953 async fn on_channel_reopened() -> anyhow::Result<()> {
1954 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1955
1956 let handlers = init_handlers(db.clone());
1957
1958 let channel = ChannelEntry::new(
1959 *SELF_CHAIN_ADDRESS,
1960 *COUNTERPARTY_CHAIN_ADDRESS,
1961 Balance::zero(BalanceType::HOPR),
1962 U256::zero(),
1963 ChannelStatus::Closed,
1964 3.into(),
1965 );
1966
1967 db.upsert_channel(None, channel).await?;
1968
1969 let channel_opened_log = SerializableLog {
1970 address: handlers.addresses.channels.into(),
1971 topics: vec![
1972 ChannelOpenedFilter::signature().into(),
1973 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1974 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
1975 ],
1976 data: encode(&[]).into(),
1977 ..test_log()
1978 };
1979
1980 let event_type = db
1981 .begin_transaction()
1982 .await?
1983 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log.into()).await }))
1984 .await?;
1985
1986 let channel = db
1987 .get_channel_by_id(None, &channel.get_id())
1988 .await?
1989 .context("a value should be present")?;
1990
1991 assert!(
1992 matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
1993 "must return the updated channel entry"
1994 );
1995
1996 assert_eq!(channel.status, ChannelStatus::Open);
1997 assert_eq!(channel.channel_epoch, 4u64.into());
1998 assert_eq!(channel.ticket_index, 0u64.into());
1999
2000 assert_eq!(
2001 0,
2002 db.get_outgoing_ticket_index(channel.get_id())
2003 .await?
2004 .load(Ordering::Relaxed)
2005 );
2006 Ok(())
2007 }
2008
2009 #[async_std::test]
2010 async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2011 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2012
2013 let handlers = init_handlers(db.clone());
2014
2015 let channel = ChannelEntry::new(
2016 *SELF_CHAIN_ADDRESS,
2017 *COUNTERPARTY_CHAIN_ADDRESS,
2018 Balance::zero(BalanceType::HOPR),
2019 U256::zero(),
2020 ChannelStatus::Open,
2021 3.into(),
2022 );
2023
2024 db.upsert_channel(None, channel).await?;
2025
2026 let channel_opened_log = SerializableLog {
2027 address: handlers.addresses.channels.into(),
2028 topics: vec![
2029 ChannelOpenedFilter::signature().into(),
2030 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2031 H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2032 ],
2033 data: encode(&[]).into(),
2034 ..test_log()
2035 };
2036
2037 db.begin_transaction()
2038 .await?
2039 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log.into()).await }))
2040 .await
2041 .expect_err("should not re-open channel that is not Closed");
2042 Ok(())
2043 }
2044
2045 const PRICE_PER_PACKET: u32 = 20_u32;
2046
2047 fn mock_acknowledged_ticket(
2048 signer: &ChainKeypair,
2049 destination: &ChainKeypair,
2050 index: u64,
2051 win_prob: f64,
2052 ) -> anyhow::Result<AcknowledgedTicket> {
2053 let channel_id = generate_channel_id(&signer.into(), &destination.into());
2054
2055 let channel_epoch = 1u64;
2056 let domain_separator = Hash::default();
2057
2058 let response = Response::try_from(
2059 Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2060 )?;
2061
2062 Ok(TicketBuilder::default()
2063 .direction(&signer.into(), &destination.into())
2064 .amount(U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2065 .index(index)
2066 .index_offset(1)
2067 .win_prob(win_prob)
2068 .channel_epoch(1)
2069 .challenge(response.to_challenge().into())
2070 .build_signed(signer, &domain_separator)?
2071 .into_acknowledged(response))
2072 }
2073
2074 #[async_std::test]
2075 async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2076 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2077 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2078 .await?;
2079
2080 let handlers = init_handlers(db.clone());
2081
2082 let channel = ChannelEntry::new(
2083 *COUNTERPARTY_CHAIN_ADDRESS,
2084 *SELF_CHAIN_ADDRESS,
2085 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2086 U256::zero(),
2087 ChannelStatus::Open,
2088 U256::one(),
2089 );
2090
2091 let ticket_index = U256::from((1u128 << 48) - 1);
2092 let next_ticket_index = ticket_index + 1;
2093
2094 let mut ticket =
2095 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2096 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2097
2098 let ticket_value = ticket.verified_ticket().amount;
2099
2100 db.upsert_channel(None, channel).await?;
2101 db.upsert_ticket(None, ticket.clone()).await?;
2102
2103 let ticket_redeemed_log = SerializableLog {
2104 address: handlers.addresses.channels.into(),
2105 topics: vec![
2106 TicketRedeemedFilter::signature().into(),
2107 H256::from_slice(channel.get_id().as_ref()).into(),
2108 ],
2109 data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2110 ..test_log()
2111 };
2112
2113 let outgoing_ticket_index_before = db
2114 .get_outgoing_ticket_index(channel.get_id())
2115 .await?
2116 .load(Ordering::Relaxed);
2117
2118 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2119 assert_eq!(
2120 BalanceType::HOPR.zero(),
2121 stats.redeemed_value,
2122 "there should not be any redeemed value"
2123 );
2124 assert_eq!(
2125 BalanceType::HOPR.zero(),
2126 stats.neglected_value,
2127 "there should not be any neglected value"
2128 );
2129
2130 let event_type = db
2131 .begin_transaction()
2132 .await?
2133 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2134 .await?;
2135
2136 let channel = db
2137 .get_channel_by_id(None, &channel.get_id())
2138 .await?
2139 .context("a value should be present")?;
2140
2141 assert!(
2142 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2143 "must return the updated channel entry and the redeemed ticket"
2144 );
2145
2146 assert_eq!(
2147 channel.ticket_index, next_ticket_index,
2148 "channel entry must contain next ticket index"
2149 );
2150
2151 let outgoing_ticket_index_after = db
2152 .get_outgoing_ticket_index(channel.get_id())
2153 .await?
2154 .load(Ordering::Relaxed);
2155
2156 assert_eq!(
2157 outgoing_ticket_index_before, outgoing_ticket_index_after,
2158 "outgoing ticket index must not change"
2159 );
2160
2161 let tickets = db.get_tickets((&channel).into()).await?;
2162 assert!(tickets.is_empty(), "there should not be any tickets left");
2163
2164 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2165 assert_eq!(
2166 ticket_value, stats.redeemed_value,
2167 "there should be redeemed value worth 1 ticket"
2168 );
2169 assert_eq!(
2170 BalanceType::HOPR.zero(),
2171 stats.neglected_value,
2172 "there should not be any neglected ticket"
2173 );
2174 Ok(())
2175 }
2176
2177 #[async_std::test]
2178 async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2179 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2180 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2181 .await?;
2182
2183 let handlers = init_handlers(db.clone());
2184
2185 let channel = ChannelEntry::new(
2186 *COUNTERPARTY_CHAIN_ADDRESS,
2187 *SELF_CHAIN_ADDRESS,
2188 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2189 U256::zero(),
2190 ChannelStatus::Open,
2191 U256::one(),
2192 );
2193
2194 let ticket_index = U256::from((1u128 << 48) - 1);
2195 let next_ticket_index = ticket_index + 1;
2196
2197 let mut ticket =
2198 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2199 ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2200
2201 let ticket_value = ticket.verified_ticket().amount;
2202
2203 db.upsert_channel(None, channel).await?;
2204 db.upsert_ticket(None, ticket.clone()).await?;
2205
2206 let old_ticket =
2207 mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2208 db.upsert_ticket(None, old_ticket.clone()).await?;
2209
2210 let ticket_redeemed_log = SerializableLog {
2211 address: handlers.addresses.channels.into(),
2212 topics: vec![
2213 TicketRedeemedFilter::signature().into(),
2214 H256::from_slice(&channel.get_id().as_ref()).into(),
2215 ],
2216 data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2217 ..test_log()
2218 };
2219
2220 let outgoing_ticket_index_before = db
2221 .get_outgoing_ticket_index(channel.get_id())
2222 .await?
2223 .load(Ordering::Relaxed);
2224
2225 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2226 assert_eq!(
2227 BalanceType::HOPR.zero(),
2228 stats.redeemed_value,
2229 "there should not be any redeemed value"
2230 );
2231 assert_eq!(
2232 BalanceType::HOPR.zero(),
2233 stats.neglected_value,
2234 "there should not be any neglected value"
2235 );
2236
2237 let event_type = db
2238 .begin_transaction()
2239 .await?
2240 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2241 .await?;
2242
2243 let channel = db
2244 .get_channel_by_id(None, &channel.get_id())
2245 .await?
2246 .context("a value should be present")?;
2247
2248 assert!(
2249 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2250 "must return the updated channel entry and the redeemed ticket"
2251 );
2252
2253 assert_eq!(
2254 channel.ticket_index, next_ticket_index,
2255 "channel entry must contain next ticket index"
2256 );
2257
2258 let outgoing_ticket_index_after = db
2259 .get_outgoing_ticket_index(channel.get_id())
2260 .await?
2261 .load(Ordering::Relaxed);
2262
2263 assert_eq!(
2264 outgoing_ticket_index_before, outgoing_ticket_index_after,
2265 "outgoing ticket index must not change"
2266 );
2267
2268 let tickets = db.get_tickets((&channel).into()).await?;
2269 assert!(tickets.is_empty(), "there should not be any tickets left");
2270
2271 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2272 assert_eq!(
2273 ticket_value, stats.redeemed_value,
2274 "there should be redeemed value worth 1 ticket"
2275 );
2276 assert_eq!(
2277 ticket_value, stats.neglected_value,
2278 "there should neglected value worth 1 ticket"
2279 );
2280 Ok(())
2281 }
2282
2283 #[async_std::test]
2284 async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2285 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2286 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2287 .await?;
2288
2289 let handlers = init_handlers(db.clone());
2290
2291 let channel = ChannelEntry::new(
2292 *SELF_CHAIN_ADDRESS,
2293 *COUNTERPARTY_CHAIN_ADDRESS,
2294 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2295 U256::zero(),
2296 ChannelStatus::Open,
2297 U256::one(),
2298 );
2299
2300 let ticket_index = U256::from((1u128 << 48) - 1);
2301 let next_ticket_index = ticket_index + 1;
2302
2303 db.upsert_channel(None, channel).await?;
2304
2305 let ticket_redeemed_log = SerializableLog {
2306 address: handlers.addresses.channels.into(),
2307 topics: vec![
2308 TicketRedeemedFilter::signature().into(),
2309 H256::from_slice(channel.get_id().as_ref()).into(),
2310 ],
2311 data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2312 ..test_log()
2313 };
2314
2315 let event_type = db
2316 .begin_transaction()
2317 .await?
2318 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2319 .await?;
2320
2321 let channel = db
2322 .get_channel_by_id(None, &channel.get_id())
2323 .await?
2324 .context("a value should be present")?;
2325
2326 assert!(
2327 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2328 "must return update channel entry and no ticket"
2329 );
2330
2331 assert_eq!(
2332 channel.ticket_index, next_ticket_index,
2333 "channel entry must contain next ticket index"
2334 );
2335
2336 let outgoing_ticket_index = db
2337 .get_outgoing_ticket_index(channel.get_id())
2338 .await?
2339 .load(Ordering::Relaxed);
2340
2341 assert!(
2342 outgoing_ticket_index >= ticket_index.as_u64(),
2343 "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
2344 );
2345 assert_eq!(
2346 outgoing_ticket_index,
2347 next_ticket_index.as_u64(),
2348 "outgoing ticket index must be equal to next ticket index"
2349 );
2350 Ok(())
2351 }
2352
2353 #[async_std::test]
2354 async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
2355 {
2356 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2357 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2358 .await?;
2359
2360 let handlers = init_handlers(db.clone());
2361
2362 let channel = ChannelEntry::new(
2363 *COUNTERPARTY_CHAIN_ADDRESS,
2364 *SELF_CHAIN_ADDRESS,
2365 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2366 U256::zero(),
2367 ChannelStatus::Open,
2368 U256::one(),
2369 );
2370
2371 db.upsert_channel(None, channel).await?;
2372
2373 let next_ticket_index = U256::from((1u128 << 48) - 1);
2374
2375 let ticket_redeemed_log = SerializableLog {
2376 address: handlers.addresses.channels.into(),
2377 topics: vec![
2378 TicketRedeemedFilter::signature().into(),
2379 H256::from_slice(channel.get_id().as_ref()).into(),
2380 ],
2381 data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2382 ..test_log()
2383 };
2384
2385 let event_type = db
2386 .begin_transaction()
2387 .await?
2388 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2389 .await?;
2390
2391 let channel = db
2392 .get_channel_by_id(None, &channel.get_id())
2393 .await?
2394 .context("a value should be present")?;
2395
2396 assert!(
2397 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2398 "must return updated channel entry and no ticket"
2399 );
2400
2401 assert_eq!(
2402 channel.ticket_index, next_ticket_index,
2403 "channel entry must contain next ticket index"
2404 );
2405 Ok(())
2406 }
2407
2408 #[async_std::test]
2409 async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
2410 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2411
2412 let handlers = init_handlers(db.clone());
2413
2414 let channel = ChannelEntry::new(
2415 Address::from(hopr_crypto_random::random_bytes()),
2416 Address::from(hopr_crypto_random::random_bytes()),
2417 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2418 U256::zero(),
2419 ChannelStatus::Open,
2420 U256::one(),
2421 );
2422
2423 db.upsert_channel(None, channel).await?;
2424
2425 let next_ticket_index = U256::from((1u128 << 48) - 1);
2426
2427 let ticket_redeemed_log = SerializableLog {
2428 address: handlers.addresses.channels.into(),
2429 topics: vec![
2430 TicketRedeemedFilter::signature().into(),
2431 H256::from_slice(channel.get_id().as_ref()).into(),
2432 ],
2433 data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2434 ..test_log()
2435 };
2436
2437 let event_type = db
2438 .begin_transaction()
2439 .await?
2440 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2441 .await?;
2442
2443 let channel = db
2444 .get_channel_by_id(None, &channel.get_id())
2445 .await?
2446 .context("a value should be present")?;
2447
2448 assert!(
2449 matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2450 "must return updated channel entry and no ticket"
2451 );
2452
2453 assert_eq!(
2454 channel.ticket_index, next_ticket_index,
2455 "channel entry must contain next ticket index"
2456 );
2457 Ok(())
2458 }
2459
2460 #[async_std::test]
2461 async fn on_channel_closure_initiated() -> anyhow::Result<()> {
2462 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2463
2464 let handlers = init_handlers(db.clone());
2465
2466 let channel = ChannelEntry::new(
2467 *SELF_CHAIN_ADDRESS,
2468 *COUNTERPARTY_CHAIN_ADDRESS,
2469 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2470 U256::zero(),
2471 ChannelStatus::Open,
2472 U256::one(),
2473 );
2474
2475 db.upsert_channel(None, channel).await?;
2476
2477 let timestamp = SystemTime::now();
2478
2479 let closure_initiated_log = SerializableLog {
2480 address: handlers.addresses.channels.into(),
2481 topics: vec![
2482 OutgoingChannelClosureInitiatedFilter::signature().into(),
2483 H256::from_slice(channel.get_id().as_ref()).into(),
2484 ],
2485 data: Vec::from(U256::from(timestamp.as_unix_timestamp().as_secs()).to_be_bytes()).into(),
2486 ..test_log()
2487 };
2488
2489 let event_type = db
2490 .begin_transaction()
2491 .await?
2492 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log.into()).await }))
2493 .await?;
2494
2495 let channel = db
2496 .get_channel_by_id(None, &channel.get_id())
2497 .await?
2498 .context("a value should be present")?;
2499
2500 assert!(
2501 matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
2502 "must return updated channel entry"
2503 );
2504
2505 assert_eq!(
2506 channel.status,
2507 ChannelStatus::PendingToClose(timestamp),
2508 "channel status must match"
2509 );
2510 Ok(())
2511 }
2512
2513 #[async_std::test]
2514 async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
2515 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2516
2517 let handlers = init_handlers(db.clone());
2518
2519 let safe_registered_log = SerializableLog {
2520 address: handlers.addresses.safe_registry.into(),
2521 topics: vec![
2522 RegisteredNodeSafeFilter::signature().into(),
2523 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
2524 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2525 ],
2526 data: encode(&[]).into(),
2527 ..test_log()
2528 };
2529
2530 let event_type = db
2531 .begin_transaction()
2532 .await?
2533 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log.into()).await }))
2534 .await?;
2535
2536 assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
2537
2538 Ok(())
2540 }
2541
2542 #[async_std::test]
2543 async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
2544 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2545
2546 let handlers = init_handlers(db.clone());
2547
2548 let safe_registered_log = SerializableLog {
2551 address: handlers.addresses.safe_registry.into(),
2552 topics: vec![
2553 DergisteredNodeSafeFilter::signature().into(),
2554 H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
2555 H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2556 ],
2557 data: encode(&[]).into(),
2558 ..test_log()
2559 };
2560
2561 let event_type = db
2562 .begin_transaction()
2563 .await?
2564 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log.into()).await }))
2565 .await?;
2566
2567 assert!(
2568 event_type.is_none(),
2569 "there's no associated chain event type with safe deregistration"
2570 );
2571
2572 Ok(())
2574 }
2575
2576 #[async_std::test]
2577 async fn ticket_price_update() -> anyhow::Result<()> {
2578 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2579
2580 let handlers = init_handlers(db.clone());
2581
2582 let price_change_log = SerializableLog {
2583 address: handlers.addresses.price_oracle.into(),
2584 topics: vec![TicketPriceUpdatedFilter::signature().into()],
2585 data: encode(&[Token::Uint(EthU256::from(1u64)), Token::Uint(EthU256::from(123u64))]).into(),
2586 ..test_log()
2587 };
2588
2589 assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
2590
2591 let event_type = db
2592 .begin_transaction()
2593 .await?
2594 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log.into()).await }))
2595 .await?;
2596
2597 assert!(
2598 event_type.is_none(),
2599 "there's no associated chain event type with price oracle"
2600 );
2601
2602 assert_eq!(
2603 db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
2604 Some(U256::from(123u64))
2605 );
2606 Ok(())
2607 }
2608
2609 #[async_std::test]
2610 async fn minimum_win_prob_update() -> anyhow::Result<()> {
2611 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2612
2613 let handlers = init_handlers(db.clone());
2614
2615 let win_prob_change_log = SerializableLog {
2616 address: handlers.addresses.win_prob_oracle.into(),
2617 topics: vec![WinProbUpdatedFilter::signature().into()],
2618 data: encode(&[
2619 Token::Uint(EthU256::from(f64_to_win_prob(1.0)?.as_ref())),
2620 Token::Uint(EthU256::from(f64_to_win_prob(0.5)?.as_ref())),
2621 ])
2622 .into(),
2623 ..test_log()
2624 };
2625
2626 assert_eq!(
2627 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
2628 1.0
2629 );
2630
2631 let event_type = db
2632 .begin_transaction()
2633 .await?
2634 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log.into()).await }))
2635 .await?;
2636
2637 assert!(
2638 event_type.is_none(),
2639 "there's no associated chain event type with winning probability change"
2640 );
2641
2642 assert_eq!(
2643 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
2644 0.5
2645 );
2646 Ok(())
2647 }
2648
2649 #[async_std::test]
2650 async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
2651 let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2652 db.set_minimum_incoming_ticket_win_prob(None, 0.1).await?;
2653
2654 let new_minimum = 0.5;
2655 let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
2656
2657 let channel_1 = ChannelEntry::new(
2658 *COUNTERPARTY_CHAIN_ADDRESS,
2659 *SELF_CHAIN_ADDRESS,
2660 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2661 3_u32.into(),
2662 ChannelStatus::Open,
2663 U256::one(),
2664 );
2665
2666 db.upsert_channel(None, channel_1.clone()).await?;
2667
2668 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
2669 db.upsert_ticket(None, ticket).await?;
2670
2671 let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
2672 db.upsert_ticket(None, ticket).await?;
2673
2674 let tickets = db.get_tickets((&channel_1).into()).await?;
2675 assert_eq!(tickets.len(), 2);
2676
2677 let other_counterparty = ChainKeypair::random();
2680 let channel_2 = ChannelEntry::new(
2681 other_counterparty.public().to_address(),
2682 *SELF_CHAIN_ADDRESS,
2683 Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2684 3_u32.into(),
2685 ChannelStatus::Open,
2686 U256::one(),
2687 );
2688
2689 db.upsert_channel(None, channel_2.clone()).await?;
2690
2691 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
2692 db.upsert_ticket(None, ticket).await?;
2693
2694 let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
2695 db.upsert_ticket(None, ticket).await?;
2696
2697 let tickets = db.get_tickets((&channel_2).into()).await?;
2698 assert_eq!(tickets.len(), 2);
2699
2700 let stats = db.get_ticket_statistics(None).await?;
2701 assert_eq!(BalanceType::HOPR.zero(), stats.rejected_value);
2702
2703 let handlers = init_handlers(db.clone());
2704
2705 let win_prob_change_log = SerializableLog {
2706 address: handlers.addresses.win_prob_oracle.into(),
2707 topics: vec![WinProbUpdatedFilter::signature().into()],
2708 data: encode(&[
2709 Token::Uint(EthU256::from(f64_to_win_prob(0.1)?.as_ref())),
2710 Token::Uint(EthU256::from(f64_to_win_prob(new_minimum)?.as_ref())),
2711 ])
2712 .into(),
2713 ..test_log()
2714 };
2715
2716 let event_type = db
2717 .begin_transaction()
2718 .await?
2719 .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log.into()).await }))
2720 .await?;
2721
2722 assert!(
2723 event_type.is_none(),
2724 "there's no associated chain event type with winning probability change"
2725 );
2726
2727 assert_eq!(
2728 db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
2729 new_minimum
2730 );
2731
2732 let tickets = db.get_tickets((&channel_1).into()).await?;
2733 assert_eq!(tickets.len(), 1);
2734
2735 let tickets = db.get_tickets((&channel_2).into()).await?;
2736 assert_eq!(tickets.len(), 0);
2737
2738 let stats = db.get_ticket_statistics(None).await?;
2739 let rejected_value: U256 = ticket_win_probs
2740 .iter()
2741 .filter(|p| **p < new_minimum)
2742 .map(|p| U256::from(PRICE_PER_PACKET).div_f64(*p).expect("must divide"))
2743 .reduce(|a, b| a + b)
2744 .ok_or(anyhow!("must sum"))?;
2745
2746 assert_eq!(BalanceType::HOPR.balance(rejected_value), stats.rejected_value);
2747
2748 Ok(())
2749 }
2750}