hopr_chain_indexer/
handlers.rs

1use async_trait::async_trait;
2use ethers::contract::EthLogDecode;
3use ethers::types::H256;
4use std::cmp::Ordering;
5use std::fmt::Formatter;
6use std::ops::{Add, Sub};
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9use tracing::{debug, error, info, trace, warn};
10
11use hopr_bindings::{
12    hopr_announcements::HoprAnnouncementsEvents, hopr_channels::HoprChannelsEvents,
13    hopr_network_registry::HoprNetworkRegistryEvents, hopr_node_management_module::HoprNodeManagementModuleEvents,
14    hopr_node_safe_registry::HoprNodeSafeRegistryEvents, hopr_ticket_price_oracle::HoprTicketPriceOracleEvents,
15    hopr_token::HoprTokenEvents, hopr_winning_probability_oracle::HoprWinningProbabilityOracleEvents,
16};
17use hopr_chain_rpc::{BlockWithLogs, Log};
18use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent};
19use hopr_chain_types::ContractAddresses;
20use hopr_crypto_types::keypairs::ChainKeypair;
21use hopr_crypto_types::prelude::{Hash, Keypair};
22use hopr_crypto_types::types::OffchainSignature;
23use hopr_db_sql::api::info::DomainSeparator;
24use hopr_db_sql::api::tickets::TicketSelector;
25use hopr_db_sql::errors::DbSqlError;
26use hopr_db_sql::prelude::TicketMarker;
27use hopr_db_sql::{HoprDbAllOperations, OpenTransaction};
28use hopr_internal_types::prelude::*;
29use hopr_primitive_types::prelude::*;
30
31use crate::errors::{CoreEthereumIndexerError, Result};
32
// Metrics counter of contract logs processed by the indexer, labelled by "contract".
// Only compiled when the `prometheus` feature is enabled and the build is not a test build.
// The `unwrap` panics if `MultiCounter::new` returns an error.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter =
        hopr_metrics::MultiCounter::new(
            "hopr_indexer_contract_log_count",
            "Counts of different HOPR contract logs processed by the Indexer",
            &["contract"]
    ).unwrap();
}
42
/// Handler of on-chain events emitted by the HOPR smart contracts.
///
/// Once an on-chain operation is recorded by the [crate::block::Indexer], it is pre-processed
/// and passed on to this object that handles event-specific actions for each on-chain operation.
#[derive(Clone)]
pub struct ContractEventHandlers<Db: Clone> {
    /// channels, announcements, network_registry, token: contract addresses
    /// whose events we process
    addresses: Arc<ContractAddresses>,
    /// Safe on-chain address which we are monitoring
    safe_address: Address,
    /// own chain keypair; the handlers derive this node's own address from its public key
    chain_key: ChainKeypair, // TODO: store only address here once Ticket have TryFrom DB
    /// database handle the event handlers read from and write to
    db: Db,
}
60
61impl<Db: Clone> std::fmt::Debug for ContractEventHandlers<Db> {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("ContractEventHandler")
64            .field("addresses", &self.addresses)
65            .field("safe_address", &self.safe_address)
66            .field("chain_key", &self.chain_key)
67            .finish_non_exhaustive()
68    }
69}
70
71impl<Db> ContractEventHandlers<Db>
72where
73    Db: HoprDbAllOperations + Clone,
74{
75    pub fn new(addresses: ContractAddresses, safe_address: Address, chain_key: ChainKeypair, db: Db) -> Self {
76        Self {
77            addresses: Arc::new(addresses),
78            safe_address,
79            chain_key,
80            db,
81        }
82    }
83
84    async fn on_announcement_event(
85        &self,
86        tx: &OpenTransaction,
87        event: HoprAnnouncementsEvents,
88        block_number: u32,
89    ) -> Result<Option<ChainEventType>> {
90        #[cfg(all(feature = "prometheus", not(test)))]
91        METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
92
93        match event {
94            HoprAnnouncementsEvents::AddressAnnouncementFilter(address_announcement) => {
95                trace!(
96                    multiaddress = &address_announcement.base_multiaddr,
97                    address = &address_announcement.node.to_string(),
98                    "on_announcement_event",
99                );
100                // safeguard against empty multiaddrs, skip
101                if address_announcement.base_multiaddr.is_empty() {
102                    warn!(
103                        address = ?address_announcement.node,
104                        "encountered empty multiaddress announcement",
105                    );
106                    return Ok(None);
107                }
108                let node_address: Address = address_announcement.node.into();
109
110                return match self
111                    .db
112                    .insert_announcement(
113                        Some(tx),
114                        node_address,
115                        address_announcement.base_multiaddr.parse()?,
116                        block_number,
117                    )
118                    .await
119                {
120                    Ok(account) => Ok(Some(ChainEventType::Announcement {
121                        peer: account.public_key.into(),
122                        address: account.chain_addr,
123                        multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
124                    })),
125                    Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
126                    Err(e) => Err(e.into()),
127                };
128            }
129            HoprAnnouncementsEvents::KeyBindingFilter(key_binding) => {
130                match KeyBinding::from_parts(
131                    key_binding.chain_key.into(),
132                    key_binding.ed_25519_pub_key.try_into()?,
133                    OffchainSignature::try_from((key_binding.ed_25519_sig_0, key_binding.ed_25519_sig_1))?,
134                ) {
135                    Ok(binding) => {
136                        self.db
137                            .insert_account(
138                                Some(tx),
139                                AccountEntry::new(binding.packet_key, binding.chain_key, AccountType::NotAnnounced),
140                            )
141                            .await?;
142                    }
143                    Err(e) => {
144                        warn!(
145                            address = ?key_binding.chain_key,
146                            error = %e,
147                            "Filtering announcement with invalid signature",
148
149                        )
150                    }
151                }
152            }
153            HoprAnnouncementsEvents::RevokeAnnouncementFilter(revocation) => {
154                let node_address: Address = revocation.node.into();
155                match self.db.delete_all_announcements(Some(tx), node_address).await {
156                    Err(DbSqlError::MissingAccount) => {
157                        return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding)
158                    }
159                    Err(e) => return Err(e.into()),
160                    _ => {}
161                }
162            }
163        };
164
165        Ok(None)
166    }
167
    /// Processes a single event of the channels contract within the transaction `tx`.
    ///
    /// Balance changes, closure initiation, closure, (re-)opening and ticket redemption update the
    /// corresponding channel entry in the database and yield the matching [`ChainEventType`].
    /// Domain separator updates are stored and yield `Ok(None)`.
    ///
    /// # Errors
    ///
    /// * `ChannelDoesNotExist` when an event (other than `ChannelOpened`) refers to a channel that
    ///   is not in the database.
    /// * `ProcessError` when a channel is re-opened although it is not closed, or when more than one
    ///   ticket being redeemed matches a `TicketRedeemed` event.
    /// * Any database error is propagated.
    async fn on_channel_event(
        &self,
        tx: &OpenTransaction,
        event: HoprChannelsEvents,
    ) -> Result<Option<ChainEventType>> {
        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);

        match event {
            HoprChannelsEvents::ChannelBalanceDecreasedFilter(balance_decreased) => {
                let maybe_channel = self
                    .db
                    .begin_channel_update(tx.into(), &balance_decreased.channel_id.into())
                    .await?;

                if let Some(channel_edits) = maybe_channel {
                    let new_balance = Balance::new(balance_decreased.new_balance, BalanceType::HOPR);
                    // The event carries the new absolute balance; the difference is old - new
                    let diff = channel_edits.entry().balance.sub(&new_balance);

                    let updated_channel = self
                        .db
                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
                        .await?;

                    Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
                } else {
                    error!(channel_id = %Hash::from(balance_decreased.channel_id), "observed balance decreased event for a channel that does not exist");
                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
                }
            }
            HoprChannelsEvents::ChannelBalanceIncreasedFilter(balance_increased) => {
                let maybe_channel = self
                    .db
                    .begin_channel_update(tx.into(), &balance_increased.channel_id.into())
                    .await?;

                if let Some(channel_edits) = maybe_channel {
                    let new_balance = Balance::new(balance_increased.new_balance, BalanceType::HOPR);
                    // The event carries the new absolute balance; the difference is new - old
                    let diff = new_balance.sub(&channel_edits.entry().balance);

                    let updated_channel = self
                        .db
                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
                        .await?;

                    Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
                } else {
                    error!(channel_id = %Hash::from(balance_increased.channel_id), "observed balance increased event for a channel that does not exist");
                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
                }
            }
            HoprChannelsEvents::ChannelClosedFilter(channel_closed) => {
                let maybe_channel = self
                    .db
                    .begin_channel_update(tx.into(), &channel_closed.channel_id.into())
                    .await?;

                trace!(
                    "on_channel_closed_event - channel_id: {:?} - channel known: {:?}",
                    channel_closed.channel_id,
                    maybe_channel.is_some()
                );

                if let Some(channel_edits) = maybe_channel {
                    let channel_id = channel_edits.entry().get_id();
                    let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());

                    // If the channel is our own (incoming or outgoing) reset its fields
                    // and change its state to Closed.
                    let updated_channel = if let Some((direction, _)) = orientation {
                        // Set all channel fields like we do on-chain on close
                        let channel_edits = channel_edits
                            .change_status(ChannelStatus::Closed)
                            .change_balance(BalanceType::HOPR.zero())
                            .change_ticket_index(0);

                        let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?;

                        // Perform additional tasks based on the channel's direction
                        // NOTE(review): the ticket operations below are not passed `tx`; presumably they
                        // run outside of the open transaction — confirm
                        match direction {
                            ChannelDirection::Incoming => {
                                // On incoming channel, mark all unredeemed tickets as neglected
                                self.db
                                    .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
                                    .await?;
                            }
                            ChannelDirection::Outgoing => {
                                // On outgoing channels, reset the current_ticket_index to zero
                                self.db.reset_outgoing_ticket_index(channel_id).await?;
                            }
                        }
                        updated_channel
                    } else {
                        // Closed channels that are not our own can be safely removed
                        // from the database
                        let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
                        debug!(channel_id = %channel_id, "foreign closed closed channel was deleted");
                        updated_channel
                    };

                    Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
                } else {
                    error!(channel_id = %Hash::from(channel_closed.channel_id), "observed closure finalization event for a channel that does not exist");
                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
                }
            }
            HoprChannelsEvents::ChannelOpenedFilter(channel_opened) => {
                let source: Address = channel_opened.source.into();
                let destination: Address = channel_opened.destination.into();
                // The event does not carry the channel id; it is derived from both parties
                let channel_id = generate_channel_id(&source, &destination);

                let maybe_channel = self.db.begin_channel_update(tx.into(), &channel_id).await?;

                let channel = if let Some(channel_edits) = maybe_channel {
                    // Check that we're not receiving the Open event without the channel being Closed prior
                    if channel_edits.entry().status != ChannelStatus::Closed {
                        return Err(CoreEthereumIndexerError::ProcessError(format!(
                            "trying to re-open channel {} which is not closed, but {}",
                            channel_edits.entry().get_id(),
                            channel_edits.entry().status,
                        )));
                    }

                    trace!(%source, %destination, %channel_id, "on_channel_reopened_event");

                    let current_epoch = channel_edits.entry().channel_epoch;

                    // cleanup tickets from previous epochs on channel re-opening
                    if source == self.chain_key.public().to_address()
                        || destination == self.chain_key.public().to_address()
                    {
                        self.db
                            .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
                            .await?;

                        self.db.reset_outgoing_ticket_index(channel_id).await?;
                    }

                    // on re-open: reset the ticket index, move to the next epoch and mark the channel as Open
                    self.db
                        .finish_channel_update(
                            tx.into(),
                            channel_edits
                                .change_ticket_index(0_u32)
                                .change_epoch(current_epoch.add(1))
                                .change_status(ChannelStatus::Open),
                        )
                        .await?
                } else {
                    trace!(%source, %destination, %channel_id, "on_channel_opened_event");

                    // A channel seen for the first time starts with zero balance, ticket index 0 and epoch 1
                    let new_channel = ChannelEntry::new(
                        source,
                        destination,
                        BalanceType::HOPR.zero(),
                        0_u32.into(),
                        ChannelStatus::Open,
                        1_u32.into(),
                    );

                    self.db.upsert_channel(tx.into(), new_channel).await?;
                    new_channel
                };

                Ok(Some(ChainEventType::ChannelOpened(channel)))
            }
            HoprChannelsEvents::TicketRedeemedFilter(ticket_redeemed) => {
                let maybe_channel = self
                    .db
                    .begin_channel_update(tx.into(), &ticket_redeemed.channel_id.into())
                    .await?;

                if let Some(channel_edits) = maybe_channel {
                    let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
                        // For channels where destination is us, it means that our ticket
                        // has been redeemed, so mark it in the DB as redeemed
                        Some(ChannelDirection::Incoming) => {
                            // Filter all BeingRedeemed tickets in this channel and its current epoch
                            let mut matching_tickets = self
                                .db
                                .get_tickets(
                                    TicketSelector::from(channel_edits.entry())
                                        .with_state(AcknowledgedTicketStatus::BeingRedeemed),
                                )
                                .await?
                                .into_iter()
                                .filter(|ticket| {
                                    // The ticket that has been redeemed at this point has: index + index_offset - 1 == new_ticket_index - 1
                                    // Since unaggregated tickets have index_offset = 1, for the unagg case this leads to: index == new_ticket_index - 1
                                    let ticket_idx = ticket.verified_ticket().index;
                                    let ticket_off = ticket.verified_ticket().index_offset as u64;

                                    ticket_idx + ticket_off == ticket_redeemed.new_ticket_index
                                })
                                .collect::<Vec<_>>();

                            // Exactly one matching ticket is expected
                            // NOTE(review): `new_ticket_index - 1` in the log fields below would underflow for
                            // an index of 0; presumably such an event cannot occur — confirm
                            match matching_tickets.len().cmp(&1) {
                                Ordering::Equal => {
                                    let ack_ticket = matching_tickets.pop().unwrap();

                                    self.db
                                        .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
                                        .await?;
                                    info!(%ack_ticket, "ticket marked as redeemed");
                                    Some(ack_ticket)
                                }
                                Ordering::Less => {
                                    error!(
                                        idx = %ticket_redeemed.new_ticket_index - 1,
                                        entry = %channel_edits.entry(),
                                        "could not find acknowledged 'BeingRedeemed' ticket",
                                    );
                                    // This is not an error, because the ticket might've become neglected before
                                    // the ticket redemption could finish
                                    None
                                }
                                Ordering::Greater => {
                                    error!(
                                        count = matching_tickets.len(),
                                        index = %ticket_redeemed.new_ticket_index - 1,
                                        entry = %channel_edits.entry(),
                                        "found tickets matching 'BeingRedeemed'",
                                    );
                                    return Err(CoreEthereumIndexerError::ProcessError(format!(
                                        "multiple tickets matching idx {} found in {}",
                                        ticket_redeemed.new_ticket_index - 1,
                                        channel_edits.entry()
                                    )));
                                }
                            }
                        }
                        // For the channel where the source is us, it means a ticket that we
                        // issue has been redeemed.
                        // So we just need to be sure our outgoing ticket
                        // index value in the cache is at least the index of the redeemed ticket
                        Some(ChannelDirection::Outgoing) => {
                            // We need to ensure the outgoing ticket index is at least this new value
                            debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
                            self.db
                                .compare_and_set_outgoing_ticket_index(
                                    channel_edits.entry().get_id(),
                                    ticket_redeemed.new_ticket_index,
                                )
                                .await?;
                            None
                        }
                        // For a channel where neither source nor destination is us, we don't care
                        None => {
                            // Not our redeem event
                            debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
                            None
                        }
                    };

                    // Update the ticket index on the Channel entry and get the updated model
                    let channel = self
                        .db
                        .finish_channel_update(
                            tx.into(),
                            channel_edits.change_ticket_index(ticket_redeemed.new_ticket_index),
                        )
                        .await?;

                    // Neglect all the tickets in this channel
                    // which have a lower ticket index than `ticket_redeemed.new_ticket_index`
                    self.db
                        .mark_tickets_as(
                            TicketSelector::from(&channel).with_index_range(..ticket_redeemed.new_ticket_index),
                            TicketMarker::Neglected,
                        )
                        .await?;

                    Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
                } else {
                    error!(channel_id = %Hash::from(ticket_redeemed.channel_id), "observed ticket redeem on a channel that we don't have in the DB");
                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
                }
            }
            HoprChannelsEvents::OutgoingChannelClosureInitiatedFilter(closure_initiated) => {
                let maybe_channel = self
                    .db
                    .begin_channel_update(tx.into(), &closure_initiated.channel_id.into())
                    .await?;

                if let Some(channel_edits) = maybe_channel {
                    // `closure_time` is interpreted as seconds since the UNIX epoch
                    let new_status = ChannelStatus::PendingToClose(
                        SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_initiated.closure_time as u64)),
                    );

                    let channel = self
                        .db
                        .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
                        .await?;
                    Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
                } else {
                    error!(channel_id = %Hash::from(closure_initiated.channel_id), "observed channel closure initiation on a channel that we don't have in the DB");
                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
                }
            }
            HoprChannelsEvents::DomainSeparatorUpdatedFilter(domain_separator_updated) => {
                self.db
                    .set_domain_separator(
                        Some(tx),
                        DomainSeparator::Channel,
                        domain_separator_updated.domain_separator.into(),
                    )
                    .await?;

                Ok(None)
            }
            HoprChannelsEvents::LedgerDomainSeparatorUpdatedFilter(ledger_domain_separator_updated) => {
                self.db
                    .set_domain_separator(
                        Some(tx),
                        DomainSeparator::Ledger,
                        ledger_domain_separator_updated.ledger_domain_separator.into(),
                    )
                    .await?;

                Ok(None)
            }
        }
    }
491
492    async fn on_token_event(&self, tx: &OpenTransaction, event: HoprTokenEvents) -> Result<Option<ChainEventType>> {
493        #[cfg(all(feature = "prometheus", not(test)))]
494        METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
495
496        match event {
497            HoprTokenEvents::TransferFilter(transferred) => {
498                let from: Address = transferred.from.into();
499                let to: Address = transferred.to.into();
500
501                trace!(
502                    safe_address = %&self.safe_address, %from, %to,
503                    "on_token_transfer_event"
504                );
505
506                let mut current_balance = self.db.get_safe_hopr_balance(Some(tx)).await?;
507                let transferred_value = transferred.value;
508
509                if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
510                    return Ok(None);
511                } else if to.eq(&self.safe_address) {
512                    // This + is internally defined as saturating add
513                    info!(?current_balance, added_value = %transferred_value, "Safe balance increased ");
514                    current_balance = current_balance + transferred_value;
515                } else if from.eq(&self.safe_address) {
516                    // This - is internally defined as saturating sub
517                    info!(?current_balance, removed_value = %transferred_value, "Safe balance decreased");
518                    current_balance = current_balance - transferred_value;
519                }
520
521                self.db.set_safe_hopr_balance(Some(tx), current_balance).await?;
522            }
523            HoprTokenEvents::ApprovalFilter(approved) => {
524                let owner: Address = approved.owner.into();
525                let spender: Address = approved.spender.into();
526
527                trace!(
528                    address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
529                    "on_token_approval_event",
530
531                );
532
533                // if approval is for tokens on Safe contract to be spent by HoprChannels
534                if owner.eq(&self.safe_address) && spender.eq(&self.addresses.channels) {
535                    self.db
536                        .set_safe_hopr_allowance(Some(tx), BalanceType::HOPR.balance(approved.value))
537                        .await?;
538                } else {
539                    return Ok(None);
540                }
541            }
542            _ => error!("Implement all the other filters for HoprTokenEvents"),
543        }
544
545        Ok(None)
546    }
547
548    async fn on_network_registry_event(
549        &self,
550        tx: &OpenTransaction,
551        event: HoprNetworkRegistryEvents,
552    ) -> Result<Option<ChainEventType>> {
553        #[cfg(all(feature = "prometheus", not(test)))]
554        METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
555
556        match event {
557            HoprNetworkRegistryEvents::DeregisteredByManagerFilter(deregistered) => {
558                let node_address: Address = deregistered.node_address.into();
559                self.db
560                    .set_access_in_network_registry(Some(tx), node_address, false)
561                    .await?;
562
563                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
564                    node_address,
565                    NetworkRegistryStatus::Denied,
566                )));
567            }
568            HoprNetworkRegistryEvents::DeregisteredFilter(deregistered) => {
569                let node_address: Address = deregistered.node_address.into();
570                self.db
571                    .set_access_in_network_registry(Some(tx), node_address, false)
572                    .await?;
573
574                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
575                    node_address,
576                    NetworkRegistryStatus::Denied,
577                )));
578            }
579            HoprNetworkRegistryEvents::RegisteredByManagerFilter(registered) => {
580                let node_address: Address = registered.node_address.into();
581                self.db
582                    .set_access_in_network_registry(Some(tx), node_address, true)
583                    .await?;
584
585                if node_address == self.chain_key.public().to_address() {
586                    info!("This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/.");
587                }
588
589                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
590                    node_address,
591                    NetworkRegistryStatus::Allowed,
592                )));
593            }
594            HoprNetworkRegistryEvents::RegisteredFilter(registered) => {
595                let node_address: Address = registered.node_address.into();
596                self.db
597                    .set_access_in_network_registry(Some(tx), node_address, true)
598                    .await?;
599
600                if node_address == self.chain_key.public().to_address() {
601                    info!("This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/.");
602                }
603
604                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
605                    node_address,
606                    NetworkRegistryStatus::Allowed,
607                )));
608            }
609            HoprNetworkRegistryEvents::EligibilityUpdatedFilter(eligibility_updated) => {
610                let account: Address = eligibility_updated.staking_account.into();
611                self.db
612                    .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
613                    .await?;
614            }
615            HoprNetworkRegistryEvents::NetworkRegistryStatusUpdatedFilter(enabled) => {
616                self.db
617                    .set_network_registry_enabled(Some(tx), enabled.is_enabled)
618                    .await?;
619            }
620            _ => {} // Not important to at the moment
621        };
622
623        Ok(None)
624    }
625
626    async fn on_node_safe_registry_event(
627        &self,
628        tx: &OpenTransaction,
629        event: HoprNodeSafeRegistryEvents,
630    ) -> Result<Option<ChainEventType>> {
631        #[cfg(all(feature = "prometheus", not(test)))]
632        METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
633
634        match event {
635            HoprNodeSafeRegistryEvents::RegisteredNodeSafeFilter(registered) => {
636                if self.chain_key.public().to_address() == registered.node_address.into() {
637                    info!(safe_address = %registered.safe_address, "Node safe registered", );
638                    // NOTE: we don't store this state in the DB
639                    return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safe_address.into())));
640                }
641            }
642            HoprNodeSafeRegistryEvents::DergisteredNodeSafeFilter(deregistered) => {
643                if self.chain_key.public().to_address() == deregistered.node_address.into() {
644                    info!("Node safe unregistered");
645                    // NOTE: we don't store this state in the DB
646                }
647            }
648            HoprNodeSafeRegistryEvents::DomainSeparatorUpdatedFilter(domain_separator_updated) => {
649                self.db
650                    .set_domain_separator(
651                        Some(tx),
652                        DomainSeparator::SafeRegistry,
653                        domain_separator_updated.domain_separator.into(),
654                    )
655                    .await?;
656            }
657        }
658
659        Ok(None)
660    }
661
662    async fn on_node_management_module_event(
663        &self,
664        _db: &OpenTransaction,
665        _event: HoprNodeManagementModuleEvents,
666    ) -> Result<Option<ChainEventType>> {
667        #[cfg(all(feature = "prometheus", not(test)))]
668        METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);
669
670        // Don't care at the moment
671        Ok(None)
672    }
673
674    async fn on_ticket_winning_probability_oracle_event(
675        &self,
676        tx: &OpenTransaction,
677        event: HoprWinningProbabilityOracleEvents,
678    ) -> Result<Option<ChainEventType>> {
679        #[cfg(all(feature = "prometheus", not(test)))]
680        METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
681
682        match event {
683            HoprWinningProbabilityOracleEvents::WinProbUpdatedFilter(update) => {
684                let mut encoded_old: EncodedWinProb = Default::default();
685                encoded_old.copy_from_slice(&update.old_win_prob.to_be_bytes()[1..]);
686                let old_minimum_win_prob = win_prob_to_f64(&encoded_old);
687
688                let mut encoded_new: EncodedWinProb = Default::default();
689                encoded_new.copy_from_slice(&update.new_win_prob.to_be_bytes()[1..]);
690                let new_minimum_win_prob = win_prob_to_f64(&encoded_new);
691
692                trace!(
693                    old = old_minimum_win_prob,
694                    new = new_minimum_win_prob,
695                    "on_ticket_minimum_win_prob_updated",
696                );
697
698                self.db
699                    .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
700                    .await?;
701
702                info!(
703                    old = old_minimum_win_prob,
704                    new = new_minimum_win_prob,
705                    "minimum ticket winning probability updated"
706                );
707
708                // If the old minimum was less strict, we need to mark of all the
709                // tickets below the new higher minimum as rejected
710                if old_minimum_win_prob < new_minimum_win_prob {
711                    let mut selector: Option<TicketSelector> = None;
712                    for channel in self.db.get_incoming_channels(tx.into()).await? {
713                        selector = selector
714                            .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
715                            .or_else(|| Some(TicketSelector::from(channel)));
716                    }
717                    // Reject unredeemed tickets on all the channels with win prob lower than the new one
718                    if let Some(selector) = selector {
719                        let num_rejected = self
720                            .db
721                            .mark_tickets_as(selector.with_winning_probability(..encoded_new), TicketMarker::Rejected)
722                            .await?;
723                        info!(count = num_rejected, "unredeemed tickets were rejected, because the minimum winning probability has been increased");
724                    }
725                }
726            }
727            _ => {
728                // Ignore other events
729            }
730        }
731        Ok(None)
732    }
733
734    async fn on_ticket_price_oracle_event(
735        &self,
736        tx: &OpenTransaction,
737        event: HoprTicketPriceOracleEvents,
738    ) -> Result<Option<ChainEventType>> {
739        #[cfg(all(feature = "prometheus", not(test)))]
740        METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
741
742        match event {
743            HoprTicketPriceOracleEvents::TicketPriceUpdatedFilter(update) => {
744                trace!(
745                    old = update.0.to_string(),
746                    new = update.1.to_string(),
747                    "on_ticket_price_updated",
748                );
749
750                self.db
751                    .update_ticket_price(Some(tx), BalanceType::HOPR.balance(update.1))
752                    .await?;
753
754                info!(price = %update.1, "ticket price updated");
755            }
756            HoprTicketPriceOracleEvents::OwnershipTransferredFilter(_event) => {
757                // ignore ownership transfer event
758            }
759        }
760        Ok(None)
761    }
762
763    #[tracing::instrument(level = "debug", skip(self))]
764    async fn process_log_event(&self, tx: &OpenTransaction, slog: SerializableLog) -> Result<Option<ChainEventType>> {
765        trace!(log = %slog, "log content");
766        let log = Log::from(slog);
767
768        if log.address.eq(&self.addresses.announcements) {
769            let bn = log.block_number as u32;
770            let event = HoprAnnouncementsEvents::decode_log(&log.into())?;
771            self.on_announcement_event(tx, event, bn).await
772        } else if log.address.eq(&self.addresses.channels) {
773            let event = HoprChannelsEvents::decode_log(&log.into())?;
774            self.on_channel_event(tx, event).await
775        } else if log.address.eq(&self.addresses.network_registry) {
776            let event = HoprNetworkRegistryEvents::decode_log(&log.into())?;
777            self.on_network_registry_event(tx, event).await
778        } else if log.address.eq(&self.addresses.token) {
779            let event = HoprTokenEvents::decode_log(&log.into())?;
780            self.on_token_event(tx, event).await
781        } else if log.address.eq(&self.addresses.safe_registry) {
782            let event = HoprNodeSafeRegistryEvents::decode_log(&log.into())?;
783            self.on_node_safe_registry_event(tx, event).await
784        } else if log.address.eq(&self.addresses.module_implementation) {
785            let event = HoprNodeManagementModuleEvents::decode_log(&log.into())?;
786            self.on_node_management_module_event(tx, event).await
787        } else if log.address.eq(&self.addresses.price_oracle) {
788            let event = HoprTicketPriceOracleEvents::decode_log(&log.into())?;
789            self.on_ticket_price_oracle_event(tx, event).await
790        } else if log.address.eq(&self.addresses.win_prob_oracle) {
791            let event = HoprWinningProbabilityOracleEvents::decode_log(&log.into())?;
792            self.on_ticket_winning_probability_oracle_event(tx, event).await
793        } else {
794            #[cfg(all(feature = "prometheus", not(test)))]
795            METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
796
797            error!(
798                address = %log.address, log = ?log,
799                "on_event error - unknown contract address, received log"
800            );
801            return Err(CoreEthereumIndexerError::UnknownContract(log.address));
802        }
803    }
804}
805
806#[async_trait]
807impl<Db> crate::traits::ChainLogHandler for ContractEventHandlers<Db>
808where
809    Db: HoprDbAllOperations + Clone + Send + Sync + 'static,
810{
811    fn contract_addresses(&self) -> Vec<Address> {
812        vec![
813            self.addresses.announcements,
814            self.addresses.channels,
815            self.addresses.module_implementation,
816            self.addresses.network_registry,
817            self.addresses.price_oracle,
818            self.addresses.win_prob_oracle,
819            self.addresses.safe_registry,
820            self.addresses.token,
821        ]
822    }
823
824    fn contract_address_topics(&self, contract: Address) -> Vec<H256> {
825        if contract.eq(&self.addresses.announcements) {
826            crate::constants::topics::announcement()
827        } else if contract.eq(&self.addresses.channels) {
828            crate::constants::topics::channel()
829        } else if contract.eq(&self.addresses.module_implementation) {
830            crate::constants::topics::module_implementation()
831        } else if contract.eq(&self.addresses.network_registry) {
832            crate::constants::topics::network_registry()
833        } else if contract.eq(&self.addresses.price_oracle) {
834            crate::constants::topics::ticket_price_oracle()
835        } else if contract.eq(&self.addresses.win_prob_oracle) {
836            crate::constants::topics::winning_prob_oracle()
837        } else if contract.eq(&self.addresses.safe_registry) {
838            crate::constants::topics::node_safe_registry()
839        } else if contract.eq(&self.addresses.token) {
840            crate::constants::topics::token()
841        } else {
842            vec![]
843        }
844    }
845
846    async fn collect_block_events(&self, block_with_logs: BlockWithLogs) -> Result<Vec<SignificantChainEvent>> {
847        let myself = self.clone();
848        self.db
849            .begin_transaction()
850            .await?
851            .perform(|tx| {
852                Box::pin(async move {
853                    // In the worst case, each log contains a single event
854                    let mut ret = Vec::with_capacity(block_with_logs.logs.len());
855
856                    // Process all logs in the block
857                    for log in block_with_logs.logs {
858                        let tx_hash = Hash::from(log.tx_hash);
859                        let log_id = log.log_index;
860                        let block_id = log.block_number;
861
862                        match myself.process_log_event(tx, log).await {
863                            // If a significant chain event can be extracted from the log, push it
864                            Ok(Some(event_type)) => {
865                                let significant_event = SignificantChainEvent { tx_hash, event_type };
866                                debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
867                                ret.push(significant_event);
868                            }
869                            Ok(None) => debug!(block_id, %tx_hash, log_id, "no significant event in log"),
870                            Err(error) => error!(block_id, %tx_hash, log_id, %error, "error processing log in tx"),
871                        }
872                    }
873
874                    Ok(ret)
875                })
876            })
877            .await
878    }
879}
880
881#[cfg(test)]
882mod tests {
883    use super::ContractEventHandlers;
884
885    use anyhow::{anyhow, Context};
886    use ethers::contract::EthEvent;
887    use ethers::{
888        abi::{encode, Address as EthereumAddress, Token},
889        types::U256 as EthU256,
890    };
891    use hex_literal::hex;
892    use multiaddr::Multiaddr;
893    use primitive_types::H256;
894    use std::sync::atomic::Ordering;
895    use std::sync::Arc;
896    use std::time::SystemTime;
897
898    use hopr_bindings::hopr_winning_probability_oracle_events::WinProbUpdatedFilter;
899    use hopr_bindings::{
900        hopr_announcements::{AddressAnnouncementFilter, KeyBindingFilter, RevokeAnnouncementFilter},
901        hopr_channels::{
902            ChannelBalanceDecreasedFilter, ChannelBalanceIncreasedFilter, ChannelClosedFilter, ChannelOpenedFilter,
903            DomainSeparatorUpdatedFilter, OutgoingChannelClosureInitiatedFilter, TicketRedeemedFilter,
904        },
905        hopr_network_registry::{
906            DeregisteredByManagerFilter, DeregisteredFilter, EligibilityUpdatedFilter,
907            NetworkRegistryStatusUpdatedFilter, RegisteredByManagerFilter, RegisteredFilter,
908        },
909        hopr_node_safe_registry::{DergisteredNodeSafeFilter, RegisteredNodeSafeFilter},
910        hopr_ticket_price_oracle::TicketPriceUpdatedFilter,
911        hopr_token::{ApprovalFilter, TransferFilter},
912    };
913    use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus};
914    use hopr_chain_types::ContractAddresses;
915    use hopr_crypto_types::prelude::*;
916    use hopr_db_sql::accounts::{ChainOrPacketKey, HoprDbAccountOperations};
917    use hopr_db_sql::api::{info::DomainSeparator, tickets::HoprDbTicketOperations};
918    use hopr_db_sql::channels::HoprDbChannelOperations;
919    use hopr_db_sql::db::HoprDb;
920    use hopr_db_sql::info::HoprDbInfoOperations;
921    use hopr_db_sql::prelude::HoprDbResolverOperations;
922    use hopr_db_sql::registry::HoprDbRegistryOperations;
923    use hopr_db_sql::{HoprDbAllOperations, HoprDbGeneralModelOperations};
924    use hopr_internal_types::prelude::*;
925    use hopr_primitive_types::prelude::*;
926
927    lazy_static::lazy_static! {
928        static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
929        static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
930        static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
931        static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
932        static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
933        static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
934        static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible"); // just a dummy
935        static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible"); // just a dummy
936        static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible"); // just a dummy
937        static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible"); // just a dummy
938        static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
939        static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible"); // just a dummy
940        static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible"); // just a dummy
941        static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
942        static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible"); // just a dummy
943    }
944
945    fn init_handlers<Db: HoprDbAllOperations + Clone>(db: Db) -> ContractEventHandlers<Db> {
946        ContractEventHandlers {
947            addresses: Arc::new(ContractAddresses {
948                channels: *CHANNELS_ADDR,
949                token: *TOKEN_ADDR,
950                network_registry: *NETWORK_REGISTRY_ADDR,
951                network_registry_proxy: Default::default(),
952                safe_registry: *NODE_SAFE_REGISTRY_ADDR,
953                announcements: *ANNOUNCEMENTS_ADDR,
954                module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
955                price_oracle: *TICKET_PRICE_ORACLE_ADDR,
956                win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
957                stake_factory: Default::default(),
958            }),
959            chain_key: SELF_CHAIN_KEY.clone(),
960            safe_address: SELF_CHAIN_KEY.public().to_address(),
961            db,
962        }
963    }
964
965    fn test_log() -> SerializableLog {
966        SerializableLog { ..Default::default() }
967    }
968
969    #[async_std::test]
970    async fn announce_keybinding() -> anyhow::Result<()> {
971        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
972
973        let handlers = init_handlers(db.clone());
974
975        let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
976
977        let keybinding_log = SerializableLog {
978            address: handlers.addresses.announcements.into(),
979            topics: vec![KeyBindingFilter::signature().into()],
980            data: encode(&[
981                Token::FixedBytes(keybinding.signature.as_ref().to_vec()),
982                Token::FixedBytes(keybinding.packet_key.as_ref().to_vec()),
983                Token::Address(EthereumAddress::from_slice(
984                    &SELF_CHAIN_KEY.public().to_address().as_ref(),
985                )),
986            ])
987            .into(),
988            ..test_log()
989        };
990
991        let account_entry = AccountEntry::new(*SELF_PRIV_KEY.public(), *SELF_CHAIN_ADDRESS, AccountType::NotAnnounced);
992
993        let event_type = db
994            .begin_transaction()
995            .await?
996            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log.into()).await }))
997            .await?;
998
999        assert!(event_type.is_none(), "keybinding does not have a chain event type");
1000
1001        assert_eq!(
1002            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1003                .await?
1004                .context("a value should be present")?,
1005            account_entry
1006        );
1007        Ok(())
1008    }
1009
1010    #[async_std::test]
1011    async fn announce_address_announcement() -> anyhow::Result<()> {
1012        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1013
1014        let handlers = init_handlers(db.clone());
1015
1016        // Assume that there is a keybinding
1017        let account_entry = AccountEntry::new(*SELF_PRIV_KEY.public(), *SELF_CHAIN_ADDRESS, AccountType::NotAnnounced);
1018        db.insert_account(None, account_entry.clone()).await?;
1019
1020        let test_multiaddr_empty: Multiaddr = "".parse()?;
1021
1022        let address_announcement_empty_log = SerializableLog {
1023            address: handlers.addresses.announcements.into(),
1024            topics: vec![AddressAnnouncementFilter::signature().into()],
1025            data: encode(&[
1026                Token::Address(EthereumAddress::from_slice(&SELF_CHAIN_ADDRESS.as_ref())),
1027                Token::String(test_multiaddr_empty.to_string()),
1028            ])
1029            .into(),
1030            ..test_log()
1031        };
1032
1033        let handlers_clone = handlers.clone();
1034        let event_type = db
1035            .begin_transaction()
1036            .await?
1037            .perform(|tx| {
1038                Box::pin(async move {
1039                    handlers_clone
1040                        .process_log_event(tx, address_announcement_empty_log.into())
1041                        .await
1042                })
1043            })
1044            .await?;
1045
1046        assert!(
1047            event_type.is_none(),
1048            "announcement of empty multiaddresses must pass through"
1049        );
1050
1051        assert_eq!(
1052            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1053                .await?
1054                .context("a value should be present")?,
1055            account_entry
1056        );
1057
1058        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1059
1060        let address_announcement_log = SerializableLog {
1061            address: handlers.addresses.announcements.into(),
1062            block_number: 1,
1063            topics: vec![AddressAnnouncementFilter::signature().into()],
1064            data: encode(&[
1065                Token::Address(EthereumAddress::from_slice(&SELF_CHAIN_ADDRESS.as_ref())),
1066                Token::String(test_multiaddr.to_string()),
1067            ])
1068            .into(),
1069            ..test_log()
1070        };
1071
1072        let announced_account_entry = AccountEntry::new(
1073            *SELF_PRIV_KEY.public(),
1074            *SELF_CHAIN_ADDRESS,
1075            AccountType::Announced {
1076                multiaddr: test_multiaddr.clone(),
1077                updated_block: 1,
1078            },
1079        );
1080
1081        let handlers_clone = handlers.clone();
1082        let event_type = db
1083            .begin_transaction()
1084            .await?
1085            .perform(|tx| {
1086                Box::pin(async move {
1087                    handlers_clone
1088                        .process_log_event(tx, address_announcement_log.into())
1089                        .await
1090                })
1091            })
1092            .await?;
1093
1094        assert!(
1095            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1096            "must return the latest announce multiaddress"
1097        );
1098
1099        assert_eq!(
1100            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1101                .await?
1102                .context("a value should be present")?,
1103            announced_account_entry
1104        );
1105
1106        assert_eq!(
1107            Some(*SELF_CHAIN_ADDRESS),
1108            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1109            "must resolve correct chain key"
1110        );
1111
1112        assert_eq!(
1113            Some(*SELF_PRIV_KEY.public()),
1114            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1115            "must resolve correct packet key"
1116        );
1117
1118        let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1119
1120        let address_announcement_dns_log = SerializableLog {
1121            address: handlers.addresses.announcements.into(),
1122            block_number: 2,
1123            topics: vec![AddressAnnouncementFilter::signature().into()],
1124            data: encode(&[
1125                Token::Address(EthereumAddress::from_slice(&SELF_CHAIN_ADDRESS.as_ref())),
1126                Token::String(test_multiaddr_dns.to_string()),
1127            ])
1128            .into(),
1129            ..test_log()
1130        };
1131
1132        let announced_dns_account_entry = AccountEntry::new(
1133            *SELF_PRIV_KEY.public(),
1134            *SELF_CHAIN_ADDRESS,
1135            AccountType::Announced {
1136                multiaddr: test_multiaddr_dns.clone(),
1137                updated_block: 2,
1138            },
1139        );
1140
1141        let event_type = db
1142            .begin_transaction()
1143            .await?
1144            .perform(|tx| {
1145                Box::pin(async move {
1146                    handlers
1147                        .process_log_event(tx, address_announcement_dns_log.into())
1148                        .await
1149                })
1150            })
1151            .await?;
1152
1153        assert!(
1154            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1155            "must return the latest announce multiaddress"
1156        );
1157
1158        assert_eq!(
1159            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1160                .await?
1161                .context("a value should be present")?,
1162            announced_dns_account_entry
1163        );
1164
1165        assert_eq!(
1166            Some(*SELF_CHAIN_ADDRESS),
1167            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1168            "must resolve correct chain key"
1169        );
1170
1171        assert_eq!(
1172            Some(*SELF_PRIV_KEY.public()),
1173            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1174            "must resolve correct packet key"
1175        );
1176        Ok(())
1177    }
1178
1179    #[async_std::test]
1180    async fn announce_revoke() -> anyhow::Result<()> {
1181        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1182        let handlers = init_handlers(db.clone());
1183
1184        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1185
1186        // Assume that there is a keybinding and an address announcement
1187        let announced_account_entry = AccountEntry::new(
1188            *SELF_PRIV_KEY.public(),
1189            *SELF_CHAIN_ADDRESS,
1190            AccountType::Announced {
1191                multiaddr: test_multiaddr,
1192                updated_block: 0,
1193            },
1194        );
1195        db.insert_account(None, announced_account_entry).await?;
1196
1197        let revoke_announcement_log = SerializableLog {
1198            address: handlers.addresses.announcements.into(),
1199            topics: vec![RevokeAnnouncementFilter::signature().into()],
1200            data: encode(&[Token::Address(EthereumAddress::from_slice(
1201                &SELF_CHAIN_ADDRESS.as_ref(),
1202            ))])
1203            .into(),
1204            ..test_log()
1205        };
1206
1207        let account_entry = AccountEntry::new(*SELF_PRIV_KEY.public(), *SELF_CHAIN_ADDRESS, AccountType::NotAnnounced);
1208
1209        let event_type = db
1210            .begin_transaction()
1211            .await?
1212            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log.into()).await }))
1213            .await?;
1214
1215        assert!(
1216            event_type.is_none(),
1217            "revoke announcement does not have chain event type"
1218        );
1219
1220        assert_eq!(
1221            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1222                .await?
1223                .context("a value should be present")?,
1224            account_entry
1225        );
1226        Ok(())
1227    }
1228
1229    #[async_std::test]
1230    async fn on_token_transfer_to() -> anyhow::Result<()> {
1231        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1232
1233        let handlers = init_handlers(db.clone());
1234
1235        let value = U256::max_value();
1236
1237        let transferred_log = SerializableLog {
1238            address: handlers.addresses.token.into(),
1239            topics: vec![
1240                TransferFilter::signature().into(),
1241                H256::from_slice(&Address::default().to_bytes32()).into(),
1242                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1243            ],
1244            data: encode(&[Token::Uint(value)]).into(),
1245            ..test_log()
1246        };
1247
1248        let event_type = db
1249            .begin_transaction()
1250            .await?
1251            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log.into()).await }))
1252            .await?;
1253
1254        assert!(event_type.is_none(), "token transfer does not have chain event type");
1255
1256        assert_eq!(
1257            db.get_safe_hopr_balance(None).await?,
1258            Balance::new(value, BalanceType::HOPR)
1259        );
1260
1261        Ok(())
1262    }
1263
1264    #[async_std::test]
1265    async fn on_token_transfer_from() -> anyhow::Result<()> {
1266        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1267
1268        let handlers = init_handlers(db.clone());
1269
1270        let value = U256::max_value();
1271
1272        db.set_safe_hopr_balance(None, BalanceType::HOPR.balance(value)).await?;
1273
1274        let transferred_log = SerializableLog {
1275            address: handlers.addresses.token.into(),
1276            topics: vec![
1277                TransferFilter::signature().into(),
1278                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1279                H256::from_slice(&Address::default().to_bytes32()).into(),
1280            ],
1281            data: encode(&[Token::Uint(value)]).into(),
1282            ..test_log()
1283        };
1284
1285        let event_type = db
1286            .begin_transaction()
1287            .await?
1288            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log.into()).await }))
1289            .await?;
1290
1291        assert!(event_type.is_none(), "token transfer does not have chain event type");
1292
1293        assert_eq!(db.get_safe_hopr_balance(None).await?, BalanceType::HOPR.zero());
1294
1295        Ok(())
1296    }
1297
1298    #[async_std::test]
1299    async fn on_token_approval_correct() -> anyhow::Result<()> {
1300        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1301
1302        let handlers = init_handlers(db.clone());
1303
1304        let approval_log = SerializableLog {
1305            address: handlers.addresses.token.into(),
1306            topics: vec![
1307                ApprovalFilter::signature().into(),
1308                H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1309                H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1310            ],
1311            data: encode(&[Token::Uint(EthU256::from(1000u64))]).into(),
1312            ..test_log()
1313        };
1314
1315        assert_eq!(
1316            db.get_safe_hopr_allowance(None).await?,
1317            Balance::new(U256::from(0u64), BalanceType::HOPR)
1318        );
1319
1320        let approval_log_clone = approval_log.clone();
1321        let handlers_clone = handlers.clone();
1322        let event_type = db
1323            .begin_transaction()
1324            .await?
1325            .perform(|tx| {
1326                Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone.into()).await })
1327            })
1328            .await?;
1329
1330        assert!(event_type.is_none(), "token approval does not have chain event type");
1331
1332        assert_eq!(
1333            db.get_safe_hopr_allowance(None).await?,
1334            Balance::new(U256::from(1000u64), BalanceType::HOPR)
1335        );
1336
1337        // reduce allowance manually to verify a second time
1338        let _ = db
1339            .set_safe_hopr_allowance(None, Balance::new(U256::from(10u64), BalanceType::HOPR))
1340            .await;
1341        assert_eq!(
1342            db.get_safe_hopr_allowance(None).await?,
1343            Balance::new(U256::from(10u64), BalanceType::HOPR)
1344        );
1345
1346        let handlers_clone = handlers.clone();
1347        let event_type = db
1348            .begin_transaction()
1349            .await?
1350            .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log.into()).await }))
1351            .await?;
1352
1353        assert!(event_type.is_none(), "token approval does not have chain event type");
1354
1355        assert_eq!(
1356            db.get_safe_hopr_allowance(None).await?,
1357            Balance::new(U256::from(1000u64), BalanceType::HOPR)
1358        );
1359        Ok(())
1360    }
1361
1362    #[async_std::test]
1363    async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1364        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1365
1366        let handlers = init_handlers(db.clone());
1367
1368        let registered_log = SerializableLog {
1369            address: handlers.addresses.network_registry.into(),
1370            topics: vec![
1371                RegisteredFilter::signature().into(),
1372                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1373                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1374            ],
1375            data: encode(&[]).into(),
1376            ..test_log()
1377        };
1378
1379        assert!(
1380            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1381                .await?
1382        );
1383
1384        let event_type = db
1385            .begin_transaction()
1386            .await?
1387            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1388            .await?;
1389
1390        assert!(
1391            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1392            "must return correct NR update"
1393        );
1394
1395        assert!(
1396            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1397                .await?,
1398            "must be allowed in NR"
1399        );
1400        Ok(())
1401    }
1402
1403    #[async_std::test]
1404    async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1405        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1406
1407        let handlers = init_handlers(db.clone());
1408
1409        let registered_log = SerializableLog {
1410            address: handlers.addresses.network_registry.into(),
1411            topics: vec![
1412                RegisteredByManagerFilter::signature().into(),
1413                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1414                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1415            ],
1416            data: encode(&[]).into(),
1417            ..test_log()
1418        };
1419
1420        assert!(
1421            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1422                .await?
1423        );
1424
1425        let event_type = db
1426            .begin_transaction()
1427            .await?
1428            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1429            .await?;
1430
1431        assert!(
1432            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1433            "must return correct NR update"
1434        );
1435
1436        assert!(
1437            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1438                .await?,
1439            "must be allowed in NR"
1440        );
1441        Ok(())
1442    }
1443
1444    #[async_std::test]
1445    async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1446        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1447
1448        let handlers = init_handlers(db.clone());
1449
1450        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1451            .await?;
1452
1453        let registered_log = SerializableLog {
1454            address: handlers.addresses.network_registry.into(),
1455            topics: vec![
1456                DeregisteredFilter::signature().into(),
1457                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1458                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1459            ],
1460            data: encode(&[]).into(),
1461            ..test_log()
1462        };
1463
1464        assert!(
1465            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1466                .await?
1467        );
1468
1469        let event_type = db
1470            .begin_transaction()
1471            .await?
1472            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1473            .await?;
1474
1475        assert!(
1476            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1477            "must return correct NR update"
1478        );
1479
1480        assert!(
1481            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1482                .await?,
1483            "must not be allowed in NR"
1484        );
1485        Ok(())
1486    }
1487
1488    #[async_std::test]
1489    async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1490        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1491
1492        let handlers = init_handlers(db.clone());
1493
1494        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1495            .await?;
1496
1497        let registered_log = SerializableLog {
1498            address: handlers.addresses.network_registry.into(),
1499            topics: vec![
1500                DeregisteredByManagerFilter::signature().into(),
1501                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1502                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1503            ],
1504            data: encode(&[]).into(),
1505            ..test_log()
1506        };
1507
1508        assert!(
1509            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1510                .await?
1511        );
1512
1513        let event_type = db
1514            .begin_transaction()
1515            .await?
1516            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log.into()).await }))
1517            .await?;
1518
1519        assert!(
1520            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1521            "must return correct NR update"
1522        );
1523
1524        assert!(
1525            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1526                .await?,
1527            "must not be allowed in NR"
1528        );
1529        Ok(())
1530    }
1531
1532    #[async_std::test]
1533    async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
1534        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1535
1536        let handlers = init_handlers(db.clone());
1537
1538        let nr_enabled = SerializableLog {
1539            address: handlers.addresses.network_registry.into(),
1540            topics: vec![
1541                NetworkRegistryStatusUpdatedFilter::signature().into(),
1542                H256::from_low_u64_be(1).into(),
1543            ],
1544            data: encode(&[]).into(),
1545            ..test_log()
1546        };
1547
1548        let event_type = db
1549            .begin_transaction()
1550            .await?
1551            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled.into()).await }))
1552            .await?;
1553
1554        assert!(event_type.is_none(), "there's no chain event type for nr disable");
1555
1556        assert!(db.get_indexer_data(None).await?.nr_enabled);
1557        Ok(())
1558    }
1559
1560    #[async_std::test]
1561    async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
1562        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1563
1564        let handlers = init_handlers(db.clone());
1565
1566        db.set_network_registry_enabled(None, true).await?;
1567
1568        let nr_disabled = SerializableLog {
1569            address: handlers.addresses.network_registry.into(),
1570            topics: vec![
1571                NetworkRegistryStatusUpdatedFilter::signature().into(),
1572                H256::from_low_u64_be(0).into(),
1573            ],
1574            data: encode(&[]).into(),
1575            ..test_log()
1576        };
1577
1578        let event_type = db
1579            .begin_transaction()
1580            .await?
1581            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled.into()).await }))
1582            .await?;
1583
1584        assert!(event_type.is_none(), "there's no chain event type for nr enable");
1585
1586        assert!(!db.get_indexer_data(None).await?.nr_enabled);
1587        Ok(())
1588    }
1589
1590    #[async_std::test]
1591    async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
1592        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1593
1594        let handlers = init_handlers(db.clone());
1595
1596        let set_eligible = SerializableLog {
1597            address: handlers.addresses.network_registry.into(),
1598            topics: vec![
1599                EligibilityUpdatedFilter::signature().into(),
1600                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1601                H256::from_low_u64_be(1).into(),
1602            ],
1603            data: encode(&[]).into(),
1604            ..test_log()
1605        };
1606
1607        let event_type = db
1608            .begin_transaction()
1609            .await?
1610            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible.into()).await }))
1611            .await?;
1612
1613        assert!(
1614            event_type.is_none(),
1615            "there's no chain event type for setting nr eligibility"
1616        );
1617
1618        assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
1619
1620        Ok(())
1621    }
1622
1623    #[async_std::test]
1624    async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
1625        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1626
1627        let handlers = init_handlers(db.clone());
1628
1629        db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
1630
1631        let set_eligible = SerializableLog {
1632            address: handlers.addresses.network_registry.into(),
1633            topics: vec![
1634                EligibilityUpdatedFilter::signature().into(),
1635                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1636                H256::from_low_u64_be(0).into(),
1637            ],
1638            data: encode(&[]).into(),
1639            ..test_log()
1640        };
1641
1642        let event_type = db
1643            .begin_transaction()
1644            .await?
1645            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible.into()).await }))
1646            .await?;
1647
1648        assert!(
1649            event_type.is_none(),
1650            "there's no chain event type for unsetting nr eligibility"
1651        );
1652
1653        assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
1654
1655        Ok(())
1656    }
1657
1658    #[async_std::test]
1659    async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
1660        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1661
1662        let handlers = init_handlers(db.clone());
1663
1664        let channel = ChannelEntry::new(
1665            *SELF_CHAIN_ADDRESS,
1666            *COUNTERPARTY_CHAIN_ADDRESS,
1667            Balance::new(U256::zero(), BalanceType::HOPR),
1668            U256::zero(),
1669            ChannelStatus::Open,
1670            U256::one(),
1671        );
1672
1673        db.upsert_channel(None, channel).await?;
1674
1675        let solidity_balance = BalanceType::HOPR.balance(U256::from((1u128 << 96) - 1));
1676        let diff = solidity_balance - channel.balance;
1677
1678        let balance_increased_log = SerializableLog {
1679            address: handlers.addresses.channels.into(),
1680            topics: vec![
1681                ChannelBalanceIncreasedFilter::signature().into(),
1682                H256::from_slice(channel.get_id().as_ref()).into(),
1683            ],
1684            data: Vec::from(solidity_balance.amount().to_be_bytes()).into(),
1685            ..test_log()
1686        };
1687
1688        let event_type = db
1689            .begin_transaction()
1690            .await?
1691            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log.into()).await }))
1692            .await?;
1693
1694        let channel = db
1695            .get_channel_by_id(None, &channel.get_id())
1696            .await?
1697            .context("a value should be present")?;
1698
1699        assert!(
1700            matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
1701            "must return updated channel entry and balance diff"
1702        );
1703
1704        assert_eq!(solidity_balance, channel.balance, "balance must be updated");
1705        Ok(())
1706    }
1707
1708    #[async_std::test]
1709    async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
1710        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1711
1712        let handlers = init_handlers(db.clone());
1713
1714        let separator = Hash::from(hopr_crypto_random::random_bytes());
1715
1716        let channels_dst_updated = SerializableLog {
1717            address: handlers.addresses.channels.into(),
1718            topics: vec![
1719                DomainSeparatorUpdatedFilter::signature().into(),
1720                H256::from_slice(separator.as_ref()).into(),
1721            ],
1722            data: encode(&[]).into(),
1723            ..test_log()
1724        };
1725
1726        assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
1727
1728        let event_type = db
1729            .begin_transaction()
1730            .await?
1731            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated.into()).await }))
1732            .await?;
1733
1734        assert!(
1735            event_type.is_none(),
1736            "there's no chain event type for channel dst update"
1737        );
1738
1739        assert_eq!(
1740            separator,
1741            db.get_indexer_data(None)
1742                .await?
1743                .channels_dst
1744                .context("a value should be present")?,
1745            "separator must be updated"
1746        );
1747        Ok(())
1748    }
1749
1750    #[async_std::test]
1751    async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
1752        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1753
1754        let handlers = init_handlers(db.clone());
1755
1756        let channel = ChannelEntry::new(
1757            *SELF_CHAIN_ADDRESS,
1758            *COUNTERPARTY_CHAIN_ADDRESS,
1759            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
1760            U256::zero(),
1761            ChannelStatus::Open,
1762            U256::one(),
1763        );
1764
1765        db.upsert_channel(None, channel).await?;
1766
1767        let solidity_balance = U256::from((1u128 << 96) - 2);
1768        let diff = channel.balance - solidity_balance;
1769
1770        let balance_decreased_log = SerializableLog {
1771            address: handlers.addresses.channels.into(),
1772            topics: vec![
1773                ChannelBalanceDecreasedFilter::signature().into(),
1774                H256::from_slice(channel.get_id().as_ref()).into(),
1775            ],
1776            data: Vec::from(solidity_balance.to_be_bytes()).into(),
1777            ..test_log()
1778        };
1779
1780        let event_type = db
1781            .begin_transaction()
1782            .await?
1783            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log.into()).await }))
1784            .await?;
1785
1786        let channel = db
1787            .get_channel_by_id(None, &channel.get_id())
1788            .await?
1789            .context("a value should be present")?;
1790
1791        assert!(
1792            matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
1793            "must return updated channel entry and balance diff"
1794        );
1795
1796        assert_eq!(solidity_balance, channel.balance.amount(), "balance must be updated");
1797        Ok(())
1798    }
1799
1800    #[async_std::test]
1801    async fn on_channel_closed() -> anyhow::Result<()> {
1802        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1803
1804        let handlers = init_handlers(db.clone());
1805
1806        let starting_balance = Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR);
1807
1808        let channel = ChannelEntry::new(
1809            *SELF_CHAIN_ADDRESS,
1810            *COUNTERPARTY_CHAIN_ADDRESS,
1811            starting_balance,
1812            U256::zero(),
1813            ChannelStatus::Open,
1814            U256::one(),
1815        );
1816
1817        db.upsert_channel(None, channel).await?;
1818
1819        let channel_closed_log = SerializableLog {
1820            address: handlers.addresses.channels.into(),
1821            topics: vec![
1822                ChannelClosedFilter::signature().into(),
1823                H256::from_slice(channel.get_id().as_ref()).into(),
1824            ],
1825            data: encode(&[]).into(),
1826            ..test_log()
1827        };
1828
1829        let event_type = db
1830            .begin_transaction()
1831            .await?
1832            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log.into()).await }))
1833            .await?;
1834
1835        let closed_channel = db
1836            .get_channel_by_id(None, &channel.get_id())
1837            .await?
1838            .context("a value should be present")?;
1839
1840        assert!(
1841            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
1842            "must return the updated channel entry"
1843        );
1844
1845        assert_eq!(closed_channel.status, ChannelStatus::Closed);
1846        assert_eq!(closed_channel.ticket_index, 0u64.into());
1847        assert_eq!(
1848            0,
1849            db.get_outgoing_ticket_index(closed_channel.get_id())
1850                .await?
1851                .load(Ordering::Relaxed)
1852        );
1853
1854        assert!(closed_channel.balance.amount().eq(&U256::zero()));
1855        Ok(())
1856    }
1857
1858    #[async_std::test]
1859    async fn on_foreign_channel_closed() -> anyhow::Result<()> {
1860        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1861
1862        let handlers = init_handlers(db.clone());
1863
1864        let starting_balance = Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR);
1865
1866        let channel = ChannelEntry::new(
1867            Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
1868            Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
1869            starting_balance,
1870            U256::zero(),
1871            ChannelStatus::Open,
1872            U256::one(),
1873        );
1874
1875        db.upsert_channel(None, channel).await?;
1876
1877        let channel_closed_log = SerializableLog {
1878            address: handlers.addresses.channels.into(),
1879            topics: vec![
1880                ChannelClosedFilter::signature().into(),
1881                H256::from_slice(channel.get_id().as_ref()).into(),
1882            ],
1883            data: encode(&[]).into(),
1884            ..test_log()
1885        };
1886
1887        let event_type = db
1888            .begin_transaction()
1889            .await?
1890            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log.into()).await }))
1891            .await?;
1892
1893        let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
1894
1895        assert_eq!(None, closed_channel, "foreign channel must be deleted");
1896
1897        assert!(
1898            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
1899            "must return the closed channel entry"
1900        );
1901
1902        Ok(())
1903    }
1904
1905    #[async_std::test]
1906    async fn on_channel_opened() -> anyhow::Result<()> {
1907        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1908
1909        let handlers = init_handlers(db.clone());
1910
1911        let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
1912
1913        let channel_opened_log = SerializableLog {
1914            address: handlers.addresses.channels.into(),
1915            topics: vec![
1916                ChannelOpenedFilter::signature().into(),
1917                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1918                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
1919            ],
1920            data: encode(&[]).into(),
1921            ..test_log()
1922        };
1923
1924        let event_type = db
1925            .begin_transaction()
1926            .await?
1927            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log.into()).await }))
1928            .await?;
1929
1930        let channel = db
1931            .get_channel_by_id(None, &channel_id)
1932            .await?
1933            .context("a value should be present")?;
1934
1935        assert!(
1936            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
1937            "must return the updated channel entry"
1938        );
1939
1940        assert_eq!(channel.status, ChannelStatus::Open);
1941        assert_eq!(channel.channel_epoch, 1u64.into());
1942        assert_eq!(channel.ticket_index, 0u64.into());
1943        assert_eq!(
1944            0,
1945            db.get_outgoing_ticket_index(channel.get_id())
1946                .await?
1947                .load(Ordering::Relaxed)
1948        );
1949        Ok(())
1950    }
1951
1952    #[async_std::test]
1953    async fn on_channel_reopened() -> anyhow::Result<()> {
1954        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1955
1956        let handlers = init_handlers(db.clone());
1957
1958        let channel = ChannelEntry::new(
1959            *SELF_CHAIN_ADDRESS,
1960            *COUNTERPARTY_CHAIN_ADDRESS,
1961            Balance::zero(BalanceType::HOPR),
1962            U256::zero(),
1963            ChannelStatus::Closed,
1964            3.into(),
1965        );
1966
1967        db.upsert_channel(None, channel).await?;
1968
1969        let channel_opened_log = SerializableLog {
1970            address: handlers.addresses.channels.into(),
1971            topics: vec![
1972                ChannelOpenedFilter::signature().into(),
1973                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1974                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
1975            ],
1976            data: encode(&[]).into(),
1977            ..test_log()
1978        };
1979
1980        let event_type = db
1981            .begin_transaction()
1982            .await?
1983            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log.into()).await }))
1984            .await?;
1985
1986        let channel = db
1987            .get_channel_by_id(None, &channel.get_id())
1988            .await?
1989            .context("a value should be present")?;
1990
1991        assert!(
1992            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
1993            "must return the updated channel entry"
1994        );
1995
1996        assert_eq!(channel.status, ChannelStatus::Open);
1997        assert_eq!(channel.channel_epoch, 4u64.into());
1998        assert_eq!(channel.ticket_index, 0u64.into());
1999
2000        assert_eq!(
2001            0,
2002            db.get_outgoing_ticket_index(channel.get_id())
2003                .await?
2004                .load(Ordering::Relaxed)
2005        );
2006        Ok(())
2007    }
2008
2009    #[async_std::test]
2010    async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2011        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2012
2013        let handlers = init_handlers(db.clone());
2014
2015        let channel = ChannelEntry::new(
2016            *SELF_CHAIN_ADDRESS,
2017            *COUNTERPARTY_CHAIN_ADDRESS,
2018            Balance::zero(BalanceType::HOPR),
2019            U256::zero(),
2020            ChannelStatus::Open,
2021            3.into(),
2022        );
2023
2024        db.upsert_channel(None, channel).await?;
2025
2026        let channel_opened_log = SerializableLog {
2027            address: handlers.addresses.channels.into(),
2028            topics: vec![
2029                ChannelOpenedFilter::signature().into(),
2030                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2031                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2032            ],
2033            data: encode(&[]).into(),
2034            ..test_log()
2035        };
2036
2037        db.begin_transaction()
2038            .await?
2039            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log.into()).await }))
2040            .await
2041            .expect_err("should not re-open channel that is not Closed");
2042        Ok(())
2043    }
2044
2045    const PRICE_PER_PACKET: u32 = 20_u32;
2046
2047    fn mock_acknowledged_ticket(
2048        signer: &ChainKeypair,
2049        destination: &ChainKeypair,
2050        index: u64,
2051        win_prob: f64,
2052    ) -> anyhow::Result<AcknowledgedTicket> {
2053        let channel_id = generate_channel_id(&signer.into(), &destination.into());
2054
2055        let channel_epoch = 1u64;
2056        let domain_separator = Hash::default();
2057
2058        let response = Response::try_from(
2059            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2060        )?;
2061
2062        Ok(TicketBuilder::default()
2063            .direction(&signer.into(), &destination.into())
2064            .amount(U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2065            .index(index)
2066            .index_offset(1)
2067            .win_prob(win_prob)
2068            .channel_epoch(1)
2069            .challenge(response.to_challenge().into())
2070            .build_signed(signer, &domain_separator)?
2071            .into_acknowledged(response))
2072    }
2073
2074    #[async_std::test]
2075    async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2076        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2077        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2078            .await?;
2079
2080        let handlers = init_handlers(db.clone());
2081
2082        let channel = ChannelEntry::new(
2083            *COUNTERPARTY_CHAIN_ADDRESS,
2084            *SELF_CHAIN_ADDRESS,
2085            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2086            U256::zero(),
2087            ChannelStatus::Open,
2088            U256::one(),
2089        );
2090
2091        let ticket_index = U256::from((1u128 << 48) - 1);
2092        let next_ticket_index = ticket_index + 1;
2093
2094        let mut ticket =
2095            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2096        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2097
2098        let ticket_value = ticket.verified_ticket().amount;
2099
2100        db.upsert_channel(None, channel).await?;
2101        db.upsert_ticket(None, ticket.clone()).await?;
2102
2103        let ticket_redeemed_log = SerializableLog {
2104            address: handlers.addresses.channels.into(),
2105            topics: vec![
2106                TicketRedeemedFilter::signature().into(),
2107                H256::from_slice(channel.get_id().as_ref()).into(),
2108            ],
2109            data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2110            ..test_log()
2111        };
2112
2113        let outgoing_ticket_index_before = db
2114            .get_outgoing_ticket_index(channel.get_id())
2115            .await?
2116            .load(Ordering::Relaxed);
2117
2118        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2119        assert_eq!(
2120            BalanceType::HOPR.zero(),
2121            stats.redeemed_value,
2122            "there should not be any redeemed value"
2123        );
2124        assert_eq!(
2125            BalanceType::HOPR.zero(),
2126            stats.neglected_value,
2127            "there should not be any neglected value"
2128        );
2129
2130        let event_type = db
2131            .begin_transaction()
2132            .await?
2133            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2134            .await?;
2135
2136        let channel = db
2137            .get_channel_by_id(None, &channel.get_id())
2138            .await?
2139            .context("a value should be present")?;
2140
2141        assert!(
2142            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2143            "must return the updated channel entry and the redeemed ticket"
2144        );
2145
2146        assert_eq!(
2147            channel.ticket_index, next_ticket_index,
2148            "channel entry must contain next ticket index"
2149        );
2150
2151        let outgoing_ticket_index_after = db
2152            .get_outgoing_ticket_index(channel.get_id())
2153            .await?
2154            .load(Ordering::Relaxed);
2155
2156        assert_eq!(
2157            outgoing_ticket_index_before, outgoing_ticket_index_after,
2158            "outgoing ticket index must not change"
2159        );
2160
2161        let tickets = db.get_tickets((&channel).into()).await?;
2162        assert!(tickets.is_empty(), "there should not be any tickets left");
2163
2164        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2165        assert_eq!(
2166            ticket_value, stats.redeemed_value,
2167            "there should be redeemed value worth 1 ticket"
2168        );
2169        assert_eq!(
2170            BalanceType::HOPR.zero(),
2171            stats.neglected_value,
2172            "there should not be any neglected ticket"
2173        );
2174        Ok(())
2175    }
2176
2177    #[async_std::test]
2178    async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2179        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2180        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2181            .await?;
2182
2183        let handlers = init_handlers(db.clone());
2184
2185        let channel = ChannelEntry::new(
2186            *COUNTERPARTY_CHAIN_ADDRESS,
2187            *SELF_CHAIN_ADDRESS,
2188            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2189            U256::zero(),
2190            ChannelStatus::Open,
2191            U256::one(),
2192        );
2193
2194        let ticket_index = U256::from((1u128 << 48) - 1);
2195        let next_ticket_index = ticket_index + 1;
2196
2197        let mut ticket =
2198            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2199        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2200
2201        let ticket_value = ticket.verified_ticket().amount;
2202
2203        db.upsert_channel(None, channel).await?;
2204        db.upsert_ticket(None, ticket.clone()).await?;
2205
2206        let old_ticket =
2207            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2208        db.upsert_ticket(None, old_ticket.clone()).await?;
2209
2210        let ticket_redeemed_log = SerializableLog {
2211            address: handlers.addresses.channels.into(),
2212            topics: vec![
2213                TicketRedeemedFilter::signature().into(),
2214                H256::from_slice(&channel.get_id().as_ref()).into(),
2215            ],
2216            data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2217            ..test_log()
2218        };
2219
2220        let outgoing_ticket_index_before = db
2221            .get_outgoing_ticket_index(channel.get_id())
2222            .await?
2223            .load(Ordering::Relaxed);
2224
2225        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2226        assert_eq!(
2227            BalanceType::HOPR.zero(),
2228            stats.redeemed_value,
2229            "there should not be any redeemed value"
2230        );
2231        assert_eq!(
2232            BalanceType::HOPR.zero(),
2233            stats.neglected_value,
2234            "there should not be any neglected value"
2235        );
2236
2237        let event_type = db
2238            .begin_transaction()
2239            .await?
2240            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2241            .await?;
2242
2243        let channel = db
2244            .get_channel_by_id(None, &channel.get_id())
2245            .await?
2246            .context("a value should be present")?;
2247
2248        assert!(
2249            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2250            "must return the updated channel entry and the redeemed ticket"
2251        );
2252
2253        assert_eq!(
2254            channel.ticket_index, next_ticket_index,
2255            "channel entry must contain next ticket index"
2256        );
2257
2258        let outgoing_ticket_index_after = db
2259            .get_outgoing_ticket_index(channel.get_id())
2260            .await?
2261            .load(Ordering::Relaxed);
2262
2263        assert_eq!(
2264            outgoing_ticket_index_before, outgoing_ticket_index_after,
2265            "outgoing ticket index must not change"
2266        );
2267
2268        let tickets = db.get_tickets((&channel).into()).await?;
2269        assert!(tickets.is_empty(), "there should not be any tickets left");
2270
2271        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2272        assert_eq!(
2273            ticket_value, stats.redeemed_value,
2274            "there should be redeemed value worth 1 ticket"
2275        );
2276        assert_eq!(
2277            ticket_value, stats.neglected_value,
2278            "there should neglected value worth 1 ticket"
2279        );
2280        Ok(())
2281    }
2282
2283    #[async_std::test]
2284    async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2285        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2286        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2287            .await?;
2288
2289        let handlers = init_handlers(db.clone());
2290
2291        let channel = ChannelEntry::new(
2292            *SELF_CHAIN_ADDRESS,
2293            *COUNTERPARTY_CHAIN_ADDRESS,
2294            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2295            U256::zero(),
2296            ChannelStatus::Open,
2297            U256::one(),
2298        );
2299
2300        let ticket_index = U256::from((1u128 << 48) - 1);
2301        let next_ticket_index = ticket_index + 1;
2302
2303        db.upsert_channel(None, channel).await?;
2304
2305        let ticket_redeemed_log = SerializableLog {
2306            address: handlers.addresses.channels.into(),
2307            topics: vec![
2308                TicketRedeemedFilter::signature().into(),
2309                H256::from_slice(channel.get_id().as_ref()).into(),
2310            ],
2311            data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2312            ..test_log()
2313        };
2314
2315        let event_type = db
2316            .begin_transaction()
2317            .await?
2318            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2319            .await?;
2320
2321        let channel = db
2322            .get_channel_by_id(None, &channel.get_id())
2323            .await?
2324            .context("a value should be present")?;
2325
2326        assert!(
2327            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2328            "must return update channel entry and no ticket"
2329        );
2330
2331        assert_eq!(
2332            channel.ticket_index, next_ticket_index,
2333            "channel entry must contain next ticket index"
2334        );
2335
2336        let outgoing_ticket_index = db
2337            .get_outgoing_ticket_index(channel.get_id())
2338            .await?
2339            .load(Ordering::Relaxed);
2340
2341        assert!(
2342            outgoing_ticket_index >= ticket_index.as_u64(),
2343            "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
2344        );
2345        assert_eq!(
2346            outgoing_ticket_index,
2347            next_ticket_index.as_u64(),
2348            "outgoing ticket index must be equal to next ticket index"
2349        );
2350        Ok(())
2351    }
2352
2353    #[async_std::test]
2354    async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
2355    {
2356        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2357        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2358            .await?;
2359
2360        let handlers = init_handlers(db.clone());
2361
2362        let channel = ChannelEntry::new(
2363            *COUNTERPARTY_CHAIN_ADDRESS,
2364            *SELF_CHAIN_ADDRESS,
2365            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2366            U256::zero(),
2367            ChannelStatus::Open,
2368            U256::one(),
2369        );
2370
2371        db.upsert_channel(None, channel).await?;
2372
2373        let next_ticket_index = U256::from((1u128 << 48) - 1);
2374
2375        let ticket_redeemed_log = SerializableLog {
2376            address: handlers.addresses.channels.into(),
2377            topics: vec![
2378                TicketRedeemedFilter::signature().into(),
2379                H256::from_slice(channel.get_id().as_ref()).into(),
2380            ],
2381            data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2382            ..test_log()
2383        };
2384
2385        let event_type = db
2386            .begin_transaction()
2387            .await?
2388            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2389            .await?;
2390
2391        let channel = db
2392            .get_channel_by_id(None, &channel.get_id())
2393            .await?
2394            .context("a value should be present")?;
2395
2396        assert!(
2397            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2398            "must return updated channel entry and no ticket"
2399        );
2400
2401        assert_eq!(
2402            channel.ticket_index, next_ticket_index,
2403            "channel entry must contain next ticket index"
2404        );
2405        Ok(())
2406    }
2407
2408    #[async_std::test]
2409    async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
2410        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2411
2412        let handlers = init_handlers(db.clone());
2413
2414        let channel = ChannelEntry::new(
2415            Address::from(hopr_crypto_random::random_bytes()),
2416            Address::from(hopr_crypto_random::random_bytes()),
2417            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2418            U256::zero(),
2419            ChannelStatus::Open,
2420            U256::one(),
2421        );
2422
2423        db.upsert_channel(None, channel).await?;
2424
2425        let next_ticket_index = U256::from((1u128 << 48) - 1);
2426
2427        let ticket_redeemed_log = SerializableLog {
2428            address: handlers.addresses.channels.into(),
2429            topics: vec![
2430                TicketRedeemedFilter::signature().into(),
2431                H256::from_slice(channel.get_id().as_ref()).into(),
2432            ],
2433            data: Vec::from(next_ticket_index.to_be_bytes()).into(),
2434            ..test_log()
2435        };
2436
2437        let event_type = db
2438            .begin_transaction()
2439            .await?
2440            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log.into()).await }))
2441            .await?;
2442
2443        let channel = db
2444            .get_channel_by_id(None, &channel.get_id())
2445            .await?
2446            .context("a value should be present")?;
2447
2448        assert!(
2449            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2450            "must return updated channel entry and no ticket"
2451        );
2452
2453        assert_eq!(
2454            channel.ticket_index, next_ticket_index,
2455            "channel entry must contain next ticket index"
2456        );
2457        Ok(())
2458    }
2459
2460    #[async_std::test]
2461    async fn on_channel_closure_initiated() -> anyhow::Result<()> {
2462        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2463
2464        let handlers = init_handlers(db.clone());
2465
2466        let channel = ChannelEntry::new(
2467            *SELF_CHAIN_ADDRESS,
2468            *COUNTERPARTY_CHAIN_ADDRESS,
2469            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2470            U256::zero(),
2471            ChannelStatus::Open,
2472            U256::one(),
2473        );
2474
2475        db.upsert_channel(None, channel).await?;
2476
2477        let timestamp = SystemTime::now();
2478
2479        let closure_initiated_log = SerializableLog {
2480            address: handlers.addresses.channels.into(),
2481            topics: vec![
2482                OutgoingChannelClosureInitiatedFilter::signature().into(),
2483                H256::from_slice(channel.get_id().as_ref()).into(),
2484            ],
2485            data: Vec::from(U256::from(timestamp.as_unix_timestamp().as_secs()).to_be_bytes()).into(),
2486            ..test_log()
2487        };
2488
2489        let event_type = db
2490            .begin_transaction()
2491            .await?
2492            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log.into()).await }))
2493            .await?;
2494
2495        let channel = db
2496            .get_channel_by_id(None, &channel.get_id())
2497            .await?
2498            .context("a value should be present")?;
2499
2500        assert!(
2501            matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
2502            "must return updated channel entry"
2503        );
2504
2505        assert_eq!(
2506            channel.status,
2507            ChannelStatus::PendingToClose(timestamp),
2508            "channel status must match"
2509        );
2510        Ok(())
2511    }
2512
2513    #[async_std::test]
2514    async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
2515        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2516
2517        let handlers = init_handlers(db.clone());
2518
2519        let safe_registered_log = SerializableLog {
2520            address: handlers.addresses.safe_registry.into(),
2521            topics: vec![
2522                RegisteredNodeSafeFilter::signature().into(),
2523                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
2524                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2525            ],
2526            data: encode(&[]).into(),
2527            ..test_log()
2528        };
2529
2530        let event_type = db
2531            .begin_transaction()
2532            .await?
2533            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log.into()).await }))
2534            .await?;
2535
2536        assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
2537
2538        // Nothing to check in the DB here, since we do not track this
2539        Ok(())
2540    }
2541
2542    #[async_std::test]
2543    async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
2544        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2545
2546        let handlers = init_handlers(db.clone());
2547
2548        // Nothing to write to the DB here, since we do not track this
2549
2550        let safe_registered_log = SerializableLog {
2551            address: handlers.addresses.safe_registry.into(),
2552            topics: vec![
2553                DergisteredNodeSafeFilter::signature().into(),
2554                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
2555                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2556            ],
2557            data: encode(&[]).into(),
2558            ..test_log()
2559        };
2560
2561        let event_type = db
2562            .begin_transaction()
2563            .await?
2564            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log.into()).await }))
2565            .await?;
2566
2567        assert!(
2568            event_type.is_none(),
2569            "there's no associated chain event type with safe deregistration"
2570        );
2571
2572        // Nothing to check in the DB here, since we do not track this
2573        Ok(())
2574    }
2575
2576    #[async_std::test]
2577    async fn ticket_price_update() -> anyhow::Result<()> {
2578        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2579
2580        let handlers = init_handlers(db.clone());
2581
2582        let price_change_log = SerializableLog {
2583            address: handlers.addresses.price_oracle.into(),
2584            topics: vec![TicketPriceUpdatedFilter::signature().into()],
2585            data: encode(&[Token::Uint(EthU256::from(1u64)), Token::Uint(EthU256::from(123u64))]).into(),
2586            ..test_log()
2587        };
2588
2589        assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
2590
2591        let event_type = db
2592            .begin_transaction()
2593            .await?
2594            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log.into()).await }))
2595            .await?;
2596
2597        assert!(
2598            event_type.is_none(),
2599            "there's no associated chain event type with price oracle"
2600        );
2601
2602        assert_eq!(
2603            db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
2604            Some(U256::from(123u64))
2605        );
2606        Ok(())
2607    }
2608
2609    #[async_std::test]
2610    async fn minimum_win_prob_update() -> anyhow::Result<()> {
2611        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2612
2613        let handlers = init_handlers(db.clone());
2614
2615        let win_prob_change_log = SerializableLog {
2616            address: handlers.addresses.win_prob_oracle.into(),
2617            topics: vec![WinProbUpdatedFilter::signature().into()],
2618            data: encode(&[
2619                Token::Uint(EthU256::from(f64_to_win_prob(1.0)?.as_ref())),
2620                Token::Uint(EthU256::from(f64_to_win_prob(0.5)?.as_ref())),
2621            ])
2622            .into(),
2623            ..test_log()
2624        };
2625
2626        assert_eq!(
2627            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
2628            1.0
2629        );
2630
2631        let event_type = db
2632            .begin_transaction()
2633            .await?
2634            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log.into()).await }))
2635            .await?;
2636
2637        assert!(
2638            event_type.is_none(),
2639            "there's no associated chain event type with winning probability change"
2640        );
2641
2642        assert_eq!(
2643            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
2644            0.5
2645        );
2646        Ok(())
2647    }
2648
2649    #[async_std::test]
2650    async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
2651        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2652        db.set_minimum_incoming_ticket_win_prob(None, 0.1).await?;
2653
2654        let new_minimum = 0.5;
2655        let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
2656
2657        let channel_1 = ChannelEntry::new(
2658            *COUNTERPARTY_CHAIN_ADDRESS,
2659            *SELF_CHAIN_ADDRESS,
2660            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2661            3_u32.into(),
2662            ChannelStatus::Open,
2663            U256::one(),
2664        );
2665
2666        db.upsert_channel(None, channel_1.clone()).await?;
2667
2668        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
2669        db.upsert_ticket(None, ticket).await?;
2670
2671        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
2672        db.upsert_ticket(None, ticket).await?;
2673
2674        let tickets = db.get_tickets((&channel_1).into()).await?;
2675        assert_eq!(tickets.len(), 2);
2676
2677        // ---
2678
2679        let other_counterparty = ChainKeypair::random();
2680        let channel_2 = ChannelEntry::new(
2681            other_counterparty.public().to_address(),
2682            *SELF_CHAIN_ADDRESS,
2683            Balance::new(U256::from((1u128 << 96) - 1), BalanceType::HOPR),
2684            3_u32.into(),
2685            ChannelStatus::Open,
2686            U256::one(),
2687        );
2688
2689        db.upsert_channel(None, channel_2.clone()).await?;
2690
2691        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
2692        db.upsert_ticket(None, ticket).await?;
2693
2694        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
2695        db.upsert_ticket(None, ticket).await?;
2696
2697        let tickets = db.get_tickets((&channel_2).into()).await?;
2698        assert_eq!(tickets.len(), 2);
2699
2700        let stats = db.get_ticket_statistics(None).await?;
2701        assert_eq!(BalanceType::HOPR.zero(), stats.rejected_value);
2702
2703        let handlers = init_handlers(db.clone());
2704
2705        let win_prob_change_log = SerializableLog {
2706            address: handlers.addresses.win_prob_oracle.into(),
2707            topics: vec![WinProbUpdatedFilter::signature().into()],
2708            data: encode(&[
2709                Token::Uint(EthU256::from(f64_to_win_prob(0.1)?.as_ref())),
2710                Token::Uint(EthU256::from(f64_to_win_prob(new_minimum)?.as_ref())),
2711            ])
2712            .into(),
2713            ..test_log()
2714        };
2715
2716        let event_type = db
2717            .begin_transaction()
2718            .await?
2719            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log.into()).await }))
2720            .await?;
2721
2722        assert!(
2723            event_type.is_none(),
2724            "there's no associated chain event type with winning probability change"
2725        );
2726
2727        assert_eq!(
2728            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
2729            new_minimum
2730        );
2731
2732        let tickets = db.get_tickets((&channel_1).into()).await?;
2733        assert_eq!(tickets.len(), 1);
2734
2735        let tickets = db.get_tickets((&channel_2).into()).await?;
2736        assert_eq!(tickets.len(), 0);
2737
2738        let stats = db.get_ticket_statistics(None).await?;
2739        let rejected_value: U256 = ticket_win_probs
2740            .iter()
2741            .filter(|p| **p < new_minimum)
2742            .map(|p| U256::from(PRICE_PER_PACKET).div_f64(*p).expect("must divide"))
2743            .reduce(|a, b| a + b)
2744            .ok_or(anyhow!("must sum"))?;
2745
2746        assert_eq!(BalanceType::HOPR.balance(rejected_value), stats.rejected_value);
2747
2748        Ok(())
2749    }
2750}