hopr_chain_indexer/
handlers.rs

1use std::{
2    cmp::Ordering,
3    fmt::{Debug, Formatter},
4    ops::Add,
5    sync::Arc,
6    time::{Duration, SystemTime},
7};
8
9use alloy::{primitives::B256, sol_types::SolEventInterface};
10use async_trait::async_trait;
11use hopr_bindings::{
12    hoprannouncements::HoprAnnouncements::HoprAnnouncementsEvents, hoprchannels::HoprChannels::HoprChannelsEvents,
13    hoprnetworkregistry::HoprNetworkRegistry::HoprNetworkRegistryEvents,
14    hoprnodemanagementmodule::HoprNodeManagementModule::HoprNodeManagementModuleEvents,
15    hoprnodesaferegistry::HoprNodeSafeRegistry::HoprNodeSafeRegistryEvents,
16    hoprticketpriceoracle::HoprTicketPriceOracle::HoprTicketPriceOracleEvents, hoprtoken::HoprToken::HoprTokenEvents,
17    hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::HoprWinningProbabilityOracleEvents,
18};
19use hopr_chain_rpc::{HoprIndexerRpcOperations, Log};
20use hopr_chain_types::{
21    ContractAddresses,
22    chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent},
23};
24use hopr_crypto_types::{
25    keypairs::ChainKeypair,
26    prelude::{Hash, Keypair},
27    types::OffchainSignature,
28};
29use hopr_db_sql::{
30    HoprDbAllOperations, OpenTransaction,
31    api::{info::DomainSeparator, tickets::TicketSelector},
32    errors::DbSqlError,
33    prelude::TicketMarker,
34};
35use hopr_internal_types::prelude::*;
36use hopr_primitive_types::prelude::*;
37use tracing::{debug, error, info, trace, warn};
38
39use crate::errors::{CoreEthereumIndexerError, Result};
40
// Counter of contract logs processed by the indexer, partitioned by the single `contract` label.
// The handlers in this file increment it with label values such as "announcements", "channels"
// and "token".
//
// The metric only exists when the `prometheus` feature is enabled and the crate is not built
// for tests; every use site is gated by the same `cfg` condition.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter =
        hopr_metrics::MultiCounter::new(
            "hopr_indexer_contract_log_count",
            "Counts of different HOPR contract logs processed by the Indexer",
            &["contract"]
    // `unwrap` panics if `MultiCounter::new` returns an error.
    ).unwrap();
}
50
/// Event-handling object for on-chain operations.
///
/// Once an on-chain operation is recorded by the [crate::block::Indexer], it is pre-processed
/// and passed on to this object that handles event-specific actions for each on-chain operation.
///
/// `T` supplies the RPC operations and `Db` the database operations. The struct itself carries
/// no trait bounds; they are declared on the `impl` blocks that need them.
#[derive(Clone)]
pub struct ContractEventHandlers<T, Db> {
    /// Addresses of the contracts (channels, announcements, network registry, token, ...)
    /// whose events we process.
    addresses: Arc<ContractAddresses>,
    /// Safe on-chain address which we are monitoring.
    safe_address: Address,
    /// Own chain keypair; the address derived from it is our own address, aka message sender.
    chain_key: ChainKeypair, // TODO: store only address here once Ticket have TryFrom DB
    /// Database handle through which the handlers persist the effects of processed events.
    db: Db,
    /// RPC operations to interact with the chain (e.g. to read the Safe balance and allowance).
    rpc_operations: T,
}
69
70impl<T, Db> Debug for ContractEventHandlers<T, Db> {
71    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("ContractEventHandler")
73            .field("addresses", &self.addresses)
74            .field("safe_address", &self.safe_address)
75            .field("chain_key", &self.chain_key)
76            .finish_non_exhaustive()
77    }
78}
79
80impl<T, Db> ContractEventHandlers<T, Db>
81where
82    T: HoprIndexerRpcOperations + Clone + Send + 'static,
83    Db: HoprDbAllOperations + Clone,
84{
85    /// Creates a new instance of contract event handlers with RPC operations support.
86    ///
87    /// This constructor initializes the event handlers with all necessary dependencies
88    /// for processing blockchain events and making direct RPC calls for fresh state data.
89    ///
90    /// # Type Parameters
91    /// * `T` - Type implementing `HoprIndexerRpcOperations` for blockchain queries
92    ///
93    /// # Arguments
94    /// * `addresses` - Contract addresses configuration
95    /// * `safe_address` - The node's safe address for filtering relevant events
96    /// * `chain_key` - Cryptographic key for chain operations
97    /// * `db` - Database connection for persistent storage
98    /// * `rpc_operations` - RPC interface for direct blockchain queries
99    ///
100    /// # Returns
101    /// * `Self` - New instance of `ContractEventHandlers`
102    pub fn new(
103        addresses: ContractAddresses,
104        safe_address: Address,
105        chain_key: ChainKeypair,
106        db: Db,
107        rpc_operations: T,
108    ) -> Self {
109        Self {
110            addresses: Arc::new(addresses),
111            safe_address,
112            chain_key,
113            db,
114            rpc_operations,
115        }
116    }
117
118    async fn on_announcement_event(
119        &self,
120        tx: &OpenTransaction,
121        event: HoprAnnouncementsEvents,
122        block_number: u32,
123        _is_synced: bool,
124    ) -> Result<Option<ChainEventType>> {
125        #[cfg(all(feature = "prometheus", not(test)))]
126        METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
127
128        match event {
129            HoprAnnouncementsEvents::AddressAnnouncement(address_announcement) => {
130                debug!(
131                    multiaddress = &address_announcement.baseMultiaddr,
132                    address = &address_announcement.node.to_string(),
133                    "on_announcement_event: AddressAnnouncement",
134                );
135                // safeguard against empty multiaddrs, skip
136                if address_announcement.baseMultiaddr.is_empty() {
137                    warn!(
138                        address = %address_announcement.node,
139                        "encountered empty multiaddress announcement",
140                    );
141                    return Ok(None);
142                }
143                let node_address: Address = address_announcement.node.into();
144
145                return match self
146                    .db
147                    .insert_announcement(
148                        Some(tx),
149                        node_address,
150                        address_announcement.baseMultiaddr.parse()?,
151                        block_number,
152                    )
153                    .await
154                {
155                    Ok(account) => Ok(Some(ChainEventType::Announcement {
156                        peer: account.public_key.into(),
157                        address: account.chain_addr,
158                        multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
159                    })),
160                    Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
161                    Err(e) => Err(e.into()),
162                };
163            }
164            HoprAnnouncementsEvents::KeyBinding(key_binding) => {
165                debug!(
166                    address = %key_binding.chain_key,
167                    public_key = %key_binding.ed25519_pub_key,
168                    "on_announcement_event: KeyBinding",
169                );
170                match KeyBinding::from_parts(
171                    key_binding.chain_key.into(),
172                    key_binding.ed25519_pub_key.0.try_into()?,
173                    OffchainSignature::try_from((key_binding.ed25519_sig_0.0, key_binding.ed25519_sig_1.0))?,
174                ) {
175                    Ok(binding) => {
176                        match self
177                            .db
178                            .insert_account(
179                                Some(tx),
180                                AccountEntry {
181                                    public_key: binding.packet_key,
182                                    chain_addr: binding.chain_key,
183                                    entry_type: AccountType::NotAnnounced,
184                                    published_at: block_number,
185                                },
186                            )
187                            .await
188                        {
189                            Ok(_) => (),
190                            Err(err) => {
191                                // We handle these errors gracefully and don't want the indexer to crash,
192                                // because anybody could write faulty entries into the announcement contract.
193                                error!(%err, "failed to store announcement key binding")
194                            }
195                        }
196                    }
197                    Err(e) => {
198                        warn!(
199                            address = ?key_binding.chain_key,
200                            error = %e,
201                            "Filtering announcement with invalid signature",
202
203                        )
204                    }
205                }
206            }
207            HoprAnnouncementsEvents::RevokeAnnouncement(revocation) => {
208                let node_address: Address = revocation.node.into();
209                match self.db.delete_all_announcements(Some(tx), node_address).await {
210                    Err(DbSqlError::MissingAccount) => {
211                        return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding);
212                    }
213                    Err(e) => return Err(e.into()),
214                    _ => {}
215                }
216            }
217        };
218
219        Ok(None)
220    }
221
222    async fn on_channel_event(
223        &self,
224        tx: &OpenTransaction,
225        event: HoprChannelsEvents,
226        is_synced: bool,
227    ) -> Result<Option<ChainEventType>> {
228        #[cfg(all(feature = "prometheus", not(test)))]
229        METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);
230
231        match event {
232            HoprChannelsEvents::ChannelBalanceDecreased(balance_decreased) => {
233                let maybe_channel = self
234                    .db
235                    .begin_channel_update(tx.into(), &balance_decreased.channelId.0.into())
236                    .await?;
237
238                trace!(
239                    channel_id = %Hash::from(balance_decreased.channelId.0),
240                    is_channel = maybe_channel.is_some(),
241                    "on_channel_balance_decreased_event",
242                );
243
244                if let Some(channel_edits) = maybe_channel {
245                    let new_balance = HoprBalance::from_be_bytes(balance_decreased.newBalance.to_be_bytes::<12>());
246                    let diff = channel_edits.entry().balance - new_balance;
247
248                    let updated_channel = self
249                        .db
250                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
251                        .await?;
252
253                    if is_synced
254                        && (updated_channel.source == self.chain_key.public().to_address()
255                            || updated_channel.destination == self.chain_key.public().to_address())
256                    {
257                        info!("updating safe balance from chain after channel balance decreased event");
258                        match self.rpc_operations.get_hopr_balance(self.safe_address).await {
259                            Ok(balance) => {
260                                self.db.set_safe_hopr_balance(Some(tx), balance).await?;
261                            }
262                            Err(error) => {
263                                error!(%error, "error getting safe balance from chain after channel balance decreased event");
264                            }
265                        }
266                    }
267
268                    Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
269                } else {
270                    error!(channel_id = %Hash::from(balance_decreased.channelId.0), "observed balance decreased event for a channel that does not exist");
271                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
272                }
273            }
274            HoprChannelsEvents::ChannelBalanceIncreased(balance_increased) => {
275                let maybe_channel = self
276                    .db
277                    .begin_channel_update(tx.into(), &balance_increased.channelId.0.into())
278                    .await?;
279
280                trace!(
281                    channel_id = %Hash::from(balance_increased.channelId.0),
282                    is_channel = maybe_channel.is_some(),
283                    "on_channel_balance_increased_event",
284                );
285
286                if let Some(channel_edits) = maybe_channel {
287                    let new_balance = HoprBalance::from_be_bytes(balance_increased.newBalance.to_be_bytes::<12>());
288                    let diff = new_balance - channel_edits.entry().balance;
289
290                    let updated_channel = self
291                        .db
292                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
293                        .await?;
294
295                    if updated_channel.source == self.chain_key.public().to_address() && is_synced {
296                        info!("updating safe balance from chain after channel balance increased event");
297                        match self.rpc_operations.get_hopr_balance(self.safe_address).await {
298                            Ok(balance) => {
299                                self.db.set_safe_hopr_balance(Some(tx), balance).await?;
300                            }
301                            Err(error) => {
302                                error!(%error, "error getting safe balance from chain after channel balance increased event");
303                            }
304                        }
305                        info!("updating safe allowance from chain after channel balance increased event");
306                        match self
307                            .rpc_operations
308                            .get_hopr_allowance(self.safe_address, self.addresses.channels)
309                            .await
310                        {
311                            Ok(allowance) => {
312                                self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
313                            }
314                            Err(error) => {
315                                error!(%error, "error getting safe allowance from chain after channel balance increased event");
316                            }
317                        }
318                    }
319
320                    Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
321                } else {
322                    error!(channel_id = %Hash::from(balance_increased.channelId.0), "observed balance increased event for a channel that does not exist");
323                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
324                }
325            }
326            HoprChannelsEvents::ChannelClosed(channel_closed) => {
327                let maybe_channel = self
328                    .db
329                    .begin_channel_update(tx.into(), &channel_closed.channelId.0.into())
330                    .await?;
331
332                trace!(
333                    channel_id = ?channel_closed.channelId.0,
334                    is_channel = maybe_channel.is_some(),
335                    "on_channel_closed_event",
336                );
337
338                if let Some(channel_edits) = maybe_channel {
339                    let channel_id = channel_edits.entry().get_id();
340                    let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());
341
342                    // If the channel is our own (incoming or outgoing) reset its fields
343                    // and change its state to Closed.
344                    let updated_channel = if let Some((direction, _)) = orientation {
345                        // Set all channel fields like we do on-chain on close
346                        let channel_edits = channel_edits
347                            .change_status(ChannelStatus::Closed)
348                            .change_balance(HoprBalance::zero())
349                            .change_ticket_index(0);
350
351                        let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?;
352
353                        // Perform additional tasks based on the channel's direction
354                        match direction {
355                            ChannelDirection::Incoming => {
356                                // On incoming channel, mark all unredeemed tickets as neglected
357                                self.db
358                                    .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
359                                    .await?;
360                            }
361                            ChannelDirection::Outgoing => {
362                                // On outgoing channels, reset the current_ticket_index to zero
363                                self.db.reset_outgoing_ticket_index(channel_id).await?;
364                            }
365                        }
366                        updated_channel
367                    } else {
368                        // Closed channels that are not our own, we be safely removed
369                        // from the database
370                        let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
371                        debug!(channel_id = %channel_id, "foreign closed closed channel was deleted");
372                        updated_channel
373                    };
374
375                    Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
376                } else {
377                    error!(channel_id = %Hash::from(channel_closed.channelId.0), "observed closure finalization event for a channel that does not exist");
378                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
379                }
380            }
381            HoprChannelsEvents::ChannelOpened(channel_opened) => {
382                let source: Address = channel_opened.source.into();
383                let destination: Address = channel_opened.destination.into();
384                let channel_id = generate_channel_id(&source, &destination);
385
386                let maybe_channel = self.db.begin_channel_update(tx.into(), &channel_id).await?;
387
388                let channel = if let Some(channel_edits) = maybe_channel {
389                    // Check that we're not receiving the Open event without the channel being Close prior
390                    if channel_edits.entry().status != ChannelStatus::Closed {
391                        return Err(CoreEthereumIndexerError::ProcessError(format!(
392                            "trying to re-open channel {} which is not closed, but {}",
393                            channel_edits.entry().get_id(),
394                            channel_edits.entry().status,
395                        )));
396                    }
397
398                    trace!(%source, %destination, %channel_id, "on_channel_reopened_event");
399
400                    let current_epoch = channel_edits.entry().channel_epoch;
401
402                    // cleanup tickets from previous epochs on channel re-opening
403                    if source == self.chain_key.public().to_address()
404                        || destination == self.chain_key.public().to_address()
405                    {
406                        self.db
407                            .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
408                            .await?;
409
410                        self.db.reset_outgoing_ticket_index(channel_id).await?;
411                    }
412
413                    // set all channel fields like we do on-chain on close
414                    self.db
415                        .finish_channel_update(
416                            tx.into(),
417                            channel_edits
418                                .change_ticket_index(0_u32)
419                                .change_epoch(current_epoch.add(1))
420                                .change_status(ChannelStatus::Open),
421                        )
422                        .await?
423                } else {
424                    trace!(%source, %destination, %channel_id, "on_channel_opened_event");
425
426                    let new_channel = ChannelEntry::new(
427                        source,
428                        destination,
429                        0_u32.into(),
430                        0_u32.into(),
431                        ChannelStatus::Open,
432                        1_u32.into(),
433                    );
434
435                    self.db.upsert_channel(tx.into(), new_channel).await?;
436                    new_channel
437                };
438
439                Ok(Some(ChainEventType::ChannelOpened(channel)))
440            }
441            HoprChannelsEvents::TicketRedeemed(ticket_redeemed) => {
442                let maybe_channel = self
443                    .db
444                    .begin_channel_update(tx.into(), &ticket_redeemed.channelId.0.into())
445                    .await?;
446
447                if let Some(channel_edits) = maybe_channel {
448                    let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
449                        // For channels where destination is us, it means that our ticket
450                        // has been redeemed, so mark it in the DB as redeemed
451                        Some(ChannelDirection::Incoming) => {
452                            // Filter all BeingRedeemed tickets in this channel and its current epoch
453                            let mut matching_tickets = self
454                                .db
455                                .get_tickets(
456                                    TicketSelector::from(channel_edits.entry())
457                                        .with_state(AcknowledgedTicketStatus::BeingRedeemed),
458                                )
459                                .await?
460                                .into_iter()
461                                .filter(|ticket| {
462                                    // The ticket that has been redeemed at this point has: index + index_offset - 1 ==
463                                    // new_ticket_index - 1 Since unaggregated
464                                    // tickets have index_offset = 1, for the unagg case this leads to: index ==
465                                    // new_ticket_index - 1
466                                    let ticket_idx = ticket.verified_ticket().index;
467                                    let ticket_off = ticket.verified_ticket().index_offset as u64;
468
469                                    ticket_idx + ticket_off == ticket_redeemed.newTicketIndex.to::<u64>()
470                                })
471                                .collect::<Vec<_>>();
472
473                            match matching_tickets.len().cmp(&1) {
474                                Ordering::Equal => {
475                                    let ack_ticket = matching_tickets.pop().unwrap();
476
477                                    self.db
478                                        .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
479                                        .await?;
480                                    info!(%ack_ticket, "ticket marked as redeemed");
481                                    Some(ack_ticket)
482                                }
483                                Ordering::Less => {
484                                    error!(
485                                        idx = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
486                                        entry = %channel_edits.entry(),
487                                        "could not find acknowledged 'BeingRedeemed' ticket",
488                                    );
489                                    // This is not an error, because the ticket might've become neglected before
490                                    // the ticket redemption could finish
491                                    None
492                                }
493                                Ordering::Greater => {
494                                    error!(
495                                        count = matching_tickets.len(),
496                                        index = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
497                                        entry = %channel_edits.entry(),
498                                        "found tickets matching 'BeingRedeemed'",
499                                    );
500                                    return Err(CoreEthereumIndexerError::ProcessError(format!(
501                                        "multiple tickets matching idx {} found in {}",
502                                        ticket_redeemed.newTicketIndex.to::<u64>() - 1,
503                                        channel_edits.entry()
504                                    )));
505                                }
506                            }
507                        }
508                        // For the channel where the source is us, it means a ticket that we
509                        // issue has been redeemed.
510                        // So we just need to be sure our outgoing ticket
511                        // index value in the cache is at least the index of the redeemed ticket
512                        Some(ChannelDirection::Outgoing) => {
513                            // We need to ensure the outgoing ticket index is at least this new value
514                            debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
515                            self.db
516                                .compare_and_set_outgoing_ticket_index(
517                                    channel_edits.entry().get_id(),
518                                    ticket_redeemed.newTicketIndex.to::<u64>(),
519                                )
520                                .await?;
521                            None
522                        }
523                        // For a channel where neither source nor destination is us, we don't care
524                        None => {
525                            // Not our redeem event
526                            debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
527                            None
528                        }
529                    };
530
531                    // Update the ticket index on the Channel entry and get the updated model
532                    let channel = self
533                        .db
534                        .finish_channel_update(
535                            tx.into(),
536                            channel_edits.change_ticket_index(U256::from_be_bytes(
537                                ticket_redeemed.newTicketIndex.to_be_bytes::<6>(),
538                            )),
539                        )
540                        .await?;
541                    // Neglect all the tickets in this channel
542                    // which have a lower ticket index than `ticket_redeemed.new_ticket_index`
543                    self.db
544                        .mark_tickets_as(
545                            TicketSelector::from(&channel)
546                                .with_index_range(..ticket_redeemed.newTicketIndex.to::<u64>()),
547                            TicketMarker::Neglected,
548                        )
549                        .await?;
550
551                    Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
552                } else {
553                    error!(channel_id = %Hash::from(ticket_redeemed.channelId.0), "observed ticket redeem on a channel that we don't have in the DB");
554                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
555                }
556            }
557            HoprChannelsEvents::OutgoingChannelClosureInitiated(closure_initiated) => {
558                let maybe_channel = self
559                    .db
560                    .begin_channel_update(tx.into(), &closure_initiated.channelId.0.into())
561                    .await?;
562
563                let closure_time: u32 = closure_initiated.closureTime;
564                if let Some(channel_edits) = maybe_channel {
565                    let new_status = ChannelStatus::PendingToClose(
566                        SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_time.into())),
567                    );
568
569                    let channel = self
570                        .db
571                        .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
572                        .await?;
573                    Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
574                } else {
575                    error!(channel_id = %Hash::from(closure_initiated.channelId.0), "observed channel closure initiation on a channel that we don't have in the DB");
576                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
577                }
578            }
579            HoprChannelsEvents::DomainSeparatorUpdated(domain_separator_updated) => {
580                self.db
581                    .set_domain_separator(
582                        Some(tx),
583                        DomainSeparator::Channel,
584                        domain_separator_updated.domainSeparator.0.into(),
585                    )
586                    .await?;
587
588                Ok(None)
589            }
590            HoprChannelsEvents::LedgerDomainSeparatorUpdated(ledger_domain_separator_updated) => {
591                self.db
592                    .set_domain_separator(
593                        Some(tx),
594                        DomainSeparator::Ledger,
595                        ledger_domain_separator_updated.ledgerDomainSeparator.0.into(),
596                    )
597                    .await?;
598
599                Ok(None)
600            }
601        }
602    }
603
604    async fn on_token_event(
605        &self,
606        tx: &OpenTransaction,
607        event: HoprTokenEvents,
608        is_synced: bool,
609    ) -> Result<Option<ChainEventType>> {
610        #[cfg(all(feature = "prometheus", not(test)))]
611        METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
612
613        match event {
614            HoprTokenEvents::Transfer(transferred) => {
615                let from: Address = transferred.from.into();
616                let to: Address = transferred.to.into();
617
618                trace!(
619                    safe_address = %&self.safe_address, %from, %to,
620                    "on_token_transfer_event"
621                );
622
623                if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
624                    error!(
625                    safe_address = %&self.safe_address, %from, %to,
626                        "filter misconfiguration: transfer event not involving the safe");
627                    return Ok(None);
628                }
629
630                if is_synced {
631                    info!("updating safe balance from chain after transfer event");
632                    match self.rpc_operations.get_hopr_balance(self.safe_address).await {
633                        Ok(balance) => {
634                            self.db.set_safe_hopr_balance(Some(tx), balance).await?;
635                        }
636                        Err(error) => {
637                            error!(%error, "error getting safe balance from chain after transfer event");
638                        }
639                    }
640                    info!("updating safe allowance from chain after transfer event");
641                    match self
642                        .rpc_operations
643                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
644                        .await
645                    {
646                        Ok(allowance) => {
647                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
648                        }
649                        Err(error) => {
650                            error!(%error, "error getting safe allowance from chain after transfer event");
651                        }
652                    }
653                }
654            }
655            HoprTokenEvents::Approval(approved) => {
656                let owner: Address = approved.owner.into();
657                let spender: Address = approved.spender.into();
658
659                trace!(
660                    address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
661                    "on_token_approval_event",
662
663                );
664
665                if owner.ne(&self.safe_address) || spender.ne(&self.addresses.channels) {
666                    error!(
667                        address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
668                        "filter misconfiguration: approval event not involving the safe and channels contract");
669                    return Ok(None);
670                }
671
672                // if approval is for tokens on Safe contract to be spent by HoprChannels
673                if is_synced {
674                    info!("updating safe allowance from chain after approval event");
675                    match self
676                        .rpc_operations
677                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
678                        .await
679                    {
680                        Ok(allowance) => {
681                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
682                        }
683                        Err(error) => {
684                            error!(%error, "error getting safe allowance from chain after approval event");
685                        }
686                    }
687                }
688            }
689            _ => error!("Implement all the other filters for HoprTokenEvents"),
690        }
691
692        Ok(None)
693    }
694
695    async fn on_network_registry_event(
696        &self,
697        tx: &OpenTransaction,
698        event: HoprNetworkRegistryEvents,
699        _is_synced: bool,
700    ) -> Result<Option<ChainEventType>> {
701        #[cfg(all(feature = "prometheus", not(test)))]
702        METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
703
704        match event {
705            HoprNetworkRegistryEvents::DeregisteredByManager(deregistered) => {
706                debug!(
707                    node_address = %deregistered.nodeAddress,
708                    "on_network_registry_deregistered_by_manager_event",
709                );
710                let node_address: Address = deregistered.nodeAddress.into();
711                self.db
712                    .set_access_in_network_registry(Some(tx), node_address, false)
713                    .await?;
714
715                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
716                    node_address,
717                    NetworkRegistryStatus::Denied,
718                )));
719            }
720            HoprNetworkRegistryEvents::Deregistered(deregistered) => {
721                debug!(
722                    node_address = %deregistered.nodeAddress,
723                    "on_network_registry_deregistered_event",
724                );
725                let node_address: Address = deregistered.nodeAddress.into();
726                self.db
727                    .set_access_in_network_registry(Some(tx), node_address, false)
728                    .await?;
729
730                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
731                    node_address,
732                    NetworkRegistryStatus::Denied,
733                )));
734            }
735            HoprNetworkRegistryEvents::RegisteredByManager(registered) => {
736                let node_address: Address = registered.nodeAddress.into();
737                debug!(
738                    %node_address,
739                    "Node registered by manager, adding to the network registry",
740                );
741                self.db
742                    .set_access_in_network_registry(Some(tx), node_address, true)
743                    .await?;
744
745                if node_address == self.chain_key.public().to_address() {
746                    info!(
747                        "This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/."
748                    );
749                }
750
751                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
752                    node_address,
753                    NetworkRegistryStatus::Allowed,
754                )));
755            }
756            HoprNetworkRegistryEvents::Registered(registered) => {
757                let node_address: Address = registered.nodeAddress.into();
758                debug!(
759                    %node_address,
760                    "Node registered, adding to the network registry",
761                );
762                self.db
763                    .set_access_in_network_registry(Some(tx), node_address, true)
764                    .await?;
765
766                if node_address == self.chain_key.public().to_address() {
767                    info!(
768                        "This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/."
769                    );
770                }
771
772                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
773                    node_address,
774                    NetworkRegistryStatus::Allowed,
775                )));
776            }
777            HoprNetworkRegistryEvents::EligibilityUpdated(eligibility_updated) => {
778                debug!(
779                    staking_account = %eligibility_updated.stakingAccount,
780                    eligibility = %eligibility_updated.eligibility,
781                    "Node eligibility updated, updating the safe eligibility",
782                );
783                let account: Address = eligibility_updated.stakingAccount.into();
784                self.db
785                    .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
786                    .await?;
787            }
788            HoprNetworkRegistryEvents::NetworkRegistryStatusUpdated(enabled) => {
789                debug!(enabled = enabled.isEnabled, "on_network_registry_status_updated_event",);
790                self.db
791                    .set_network_registry_enabled(Some(tx), enabled.isEnabled)
792                    .await?;
793            }
794            _ => {
795                debug!("on_network_registry_event - not implemented for event: {:?}", event);
796            } // Not important to at the moment
797        };
798
799        Ok(None)
800    }
801
802    async fn on_node_safe_registry_event(
803        &self,
804        tx: &OpenTransaction,
805        event: HoprNodeSafeRegistryEvents,
806        _is_synced: bool,
807    ) -> Result<Option<ChainEventType>> {
808        #[cfg(all(feature = "prometheus", not(test)))]
809        METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
810
811        match event {
812            HoprNodeSafeRegistryEvents::RegisteredNodeSafe(registered) => {
813                if self.chain_key.public().to_address() == registered.nodeAddress.into() {
814                    info!(safe_address = %registered.safeAddress, "Node safe registered", );
815                    // NOTE: we don't store this state in the DB
816                    return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safeAddress.into())));
817                }
818            }
819            HoprNodeSafeRegistryEvents::DergisteredNodeSafe(deregistered) => {
820                if self.chain_key.public().to_address() == deregistered.nodeAddress.into() {
821                    info!("Node safe unregistered");
822                    // NOTE: we don't store this state in the DB
823                }
824            }
825            HoprNodeSafeRegistryEvents::DomainSeparatorUpdated(domain_separator_updated) => {
826                self.db
827                    .set_domain_separator(
828                        Some(tx),
829                        DomainSeparator::SafeRegistry,
830                        domain_separator_updated.domainSeparator.0.into(),
831                    )
832                    .await?;
833            }
834        }
835
836        Ok(None)
837    }
838
839    async fn on_node_management_module_event(
840        &self,
841        _db: &OpenTransaction,
842        _event: HoprNodeManagementModuleEvents,
843        _is_synced: bool,
844    ) -> Result<Option<ChainEventType>> {
845        #[cfg(all(feature = "prometheus", not(test)))]
846        METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);
847
848        // Don't care at the moment
849        Ok(None)
850    }
851
852    async fn on_ticket_winning_probability_oracle_event(
853        &self,
854        tx: &OpenTransaction,
855        event: HoprWinningProbabilityOracleEvents,
856        _is_synced: bool,
857    ) -> Result<Option<ChainEventType>> {
858        #[cfg(all(feature = "prometheus", not(test)))]
859        METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
860
861        match event {
862            HoprWinningProbabilityOracleEvents::WinProbUpdated(update) => {
863                let old_minimum_win_prob: WinningProbability = update.oldWinProb.to_be_bytes().into();
864                let new_minimum_win_prob: WinningProbability = update.newWinProb.to_be_bytes().into();
865
866                trace!(
867                    %old_minimum_win_prob,
868                    %new_minimum_win_prob,
869                    "on_ticket_minimum_win_prob_updated",
870                );
871
872                self.db
873                    .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
874                    .await?;
875
876                info!(
877                    %old_minimum_win_prob,
878                    %new_minimum_win_prob,
879                    "minimum ticket winning probability updated"
880                );
881
882                // If the old minimum was less strict, we need to mark of all the
883                // tickets below the new higher minimum as rejected
884                if old_minimum_win_prob.approx_cmp(&new_minimum_win_prob).is_lt() {
885                    let mut selector: Option<TicketSelector> = None;
886                    for channel in self.db.get_incoming_channels(tx.into()).await? {
887                        selector = selector
888                            .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
889                            .or_else(|| Some(TicketSelector::from(channel)));
890                    }
891                    // Reject unredeemed tickets on all the channels with win prob lower than the new one
892                    if let Some(selector) = selector {
893                        let num_rejected = self
894                            .db
895                            .mark_tickets_as(
896                                selector.with_winning_probability(..new_minimum_win_prob),
897                                TicketMarker::Rejected,
898                            )
899                            .await?;
900                        info!(
901                            count = num_rejected,
902                            "unredeemed tickets were rejected, because the minimum winning probability has been \
903                             increased"
904                        );
905                    }
906                }
907            }
908            _ => {
909                // Ignore other events
910            }
911        }
912        Ok(None)
913    }
914
915    async fn on_ticket_price_oracle_event(
916        &self,
917        tx: &OpenTransaction,
918        event: HoprTicketPriceOracleEvents,
919        _is_synced: bool,
920    ) -> Result<Option<ChainEventType>> {
921        #[cfg(all(feature = "prometheus", not(test)))]
922        METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
923
924        match event {
925            HoprTicketPriceOracleEvents::TicketPriceUpdated(update) => {
926                trace!(
927                    old = update._0.to_string(),
928                    new = update._1.to_string(),
929                    "on_ticket_price_updated",
930                );
931
932                self.db
933                    .update_ticket_price(Some(tx), HoprBalance::from_be_bytes(update._1.to_be_bytes::<32>()))
934                    .await?;
935
936                info!(price = %update._1, "ticket price updated");
937            }
938            HoprTicketPriceOracleEvents::OwnershipTransferred(_event) => {
939                // ignore ownership transfer event
940            }
941        }
942        Ok(None)
943    }
944
945    #[tracing::instrument(level = "debug", skip(self, slog), fields(log=%slog))]
946    async fn process_log_event(
947        &self,
948        tx: &OpenTransaction,
949        slog: SerializableLog,
950        is_synced: bool,
951    ) -> Result<Option<ChainEventType>> {
952        trace!(log = %slog, "log content");
953
954        let log = Log::from(slog.clone());
955
956        let primitive_log = alloy::primitives::Log::new(
957            slog.address.into(),
958            slog.topics.iter().map(|h| B256::from_slice(h.as_ref())).collect(),
959            slog.data.clone().into(),
960        )
961        .ok_or_else(|| {
962            CoreEthereumIndexerError::ProcessError(format!("failed to convert log to primitive log: {slog:?}"))
963        })?;
964
965        if log.address.eq(&self.addresses.announcements) {
966            let bn = log.block_number as u32;
967            let event = HoprAnnouncementsEvents::decode_log(&primitive_log)?;
968            self.on_announcement_event(tx, event.data, bn, is_synced).await
969        } else if log.address.eq(&self.addresses.channels) {
970            let event = HoprChannelsEvents::decode_log(&primitive_log)?;
971            self.on_channel_event(tx, event.data, is_synced).await
972        } else if log.address.eq(&self.addresses.network_registry) {
973            let event = HoprNetworkRegistryEvents::decode_log(&primitive_log)?;
974            self.on_network_registry_event(tx, event.data, is_synced).await
975        } else if log.address.eq(&self.addresses.token) {
976            let event = HoprTokenEvents::decode_log(&primitive_log)?;
977            self.on_token_event(tx, event.data, is_synced).await
978        } else if log.address.eq(&self.addresses.safe_registry) {
979            let event = HoprNodeSafeRegistryEvents::decode_log(&primitive_log)?;
980            self.on_node_safe_registry_event(tx, event.data, is_synced).await
981        } else if log.address.eq(&self.addresses.module_implementation) {
982            let event = HoprNodeManagementModuleEvents::decode_log(&primitive_log)?;
983            self.on_node_management_module_event(tx, event.data, is_synced).await
984        } else if log.address.eq(&self.addresses.price_oracle) {
985            let event = HoprTicketPriceOracleEvents::decode_log(&primitive_log)?;
986            self.on_ticket_price_oracle_event(tx, event.data, is_synced).await
987        } else if log.address.eq(&self.addresses.win_prob_oracle) {
988            let event = HoprWinningProbabilityOracleEvents::decode_log(&primitive_log)?;
989            self.on_ticket_winning_probability_oracle_event(tx, event.data, is_synced)
990                .await
991        } else {
992            #[cfg(all(feature = "prometheus", not(test)))]
993            METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
994
995            error!(
996                address = %log.address, log = ?log,
997                "on_event error - unknown contract address, received log"
998            );
999            return Err(CoreEthereumIndexerError::UnknownContract(log.address));
1000        }
1001    }
1002}
1003
1004#[async_trait]
1005impl<T, Db> crate::traits::ChainLogHandler for ContractEventHandlers<T, Db>
1006where
1007    T: HoprIndexerRpcOperations + Clone + Send + Sync + 'static,
1008    Db: HoprDbAllOperations + Clone + Debug + Send + Sync + 'static,
1009{
1010    fn contract_addresses(&self) -> Vec<Address> {
1011        vec![
1012            self.addresses.announcements,
1013            self.addresses.channels,
1014            self.addresses.module_implementation,
1015            self.addresses.network_registry,
1016            self.addresses.price_oracle,
1017            self.addresses.win_prob_oracle,
1018            self.addresses.safe_registry,
1019            self.addresses.token,
1020        ]
1021    }
1022
1023    fn contract_addresses_map(&self) -> Arc<ContractAddresses> {
1024        self.addresses.clone()
1025    }
1026
1027    fn safe_address(&self) -> Address {
1028        self.safe_address
1029    }
1030
1031    fn contract_address_topics(&self, contract: Address) -> Vec<B256> {
1032        if contract.eq(&self.addresses.announcements) {
1033            crate::constants::topics::announcement()
1034        } else if contract.eq(&self.addresses.channels) {
1035            crate::constants::topics::channel()
1036        } else if contract.eq(&self.addresses.module_implementation) {
1037            crate::constants::topics::module_implementation()
1038        } else if contract.eq(&self.addresses.network_registry) {
1039            crate::constants::topics::network_registry()
1040        } else if contract.eq(&self.addresses.price_oracle) {
1041            crate::constants::topics::ticket_price_oracle()
1042        } else if contract.eq(&self.addresses.win_prob_oracle) {
1043            crate::constants::topics::winning_prob_oracle()
1044        } else if contract.eq(&self.addresses.safe_registry) {
1045            crate::constants::topics::node_safe_registry()
1046        } else {
1047            panic!("use of unsupported contract address: {contract}");
1048        }
1049    }
1050
1051    async fn collect_log_event(&self, slog: SerializableLog, is_synced: bool) -> Result<Option<SignificantChainEvent>> {
1052        let myself = self.clone();
1053        self.db
1054            .begin_transaction()
1055            .await?
1056            .perform(move |tx| {
1057                let log = slog.clone();
1058                let tx_hash = Hash::from(log.tx_hash);
1059                let log_id = log.log_index;
1060                let block_id = log.block_number;
1061
1062                Box::pin(async move {
1063                    match myself.process_log_event(tx, log, is_synced).await {
1064                        // If a significant chain event can be extracted from the log
1065                        Ok(Some(event_type)) => {
1066                            let significant_event = SignificantChainEvent { tx_hash, event_type };
1067                            debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
1068                            Ok(Some(significant_event))
1069                        }
1070                        Ok(None) => {
1071                            debug!(block_id, %tx_hash, log_id, "no significant event in log");
1072                            Ok(None)
1073                        }
1074                        Err(error) => {
1075                            error!(block_id, %tx_hash, log_id, %error, "error processing log in tx");
1076                            Err(error)
1077                        }
1078                    }
1079                })
1080            })
1081            .await
1082    }
1083}
1084
1085#[cfg(test)]
1086mod tests {
1087    use std::{
1088        sync::{Arc, atomic::Ordering},
1089        time::SystemTime,
1090    };
1091
1092    use alloy::{
1093        dyn_abi::DynSolValue,
1094        primitives::{Address as AlloyAddress, U256},
1095        sol_types::{SolEvent, SolValue},
1096    };
1097    use anyhow::{Context, anyhow};
1098    use hex_literal::hex;
1099    use hopr_chain_rpc::HoprIndexerRpcOperations;
1100    use hopr_chain_types::{
1101        ContractAddresses,
1102        chain_events::{ChainEventType, NetworkRegistryStatus},
1103    };
1104    use hopr_crypto_types::prelude::*;
1105    use hopr_db_sql::{
1106        HoprDbAllOperations, HoprDbGeneralModelOperations,
1107        accounts::{ChainOrPacketKey, HoprDbAccountOperations},
1108        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
1109        channels::HoprDbChannelOperations,
1110        db::HoprDb,
1111        info::HoprDbInfoOperations,
1112        prelude::HoprDbResolverOperations,
1113        registry::HoprDbRegistryOperations,
1114    };
1115    use hopr_internal_types::prelude::*;
1116    use hopr_primitive_types::prelude::*;
1117    use multiaddr::Multiaddr;
1118    use primitive_types::H256;
1119
1120    use super::ContractEventHandlers;
1121
    // Shared fixtures for the tests in this module: key pairs, the addresses derived
    // from them, and fixed placeholder addresses standing in for the deployed contracts.
    lazy_static::lazy_static! {
        // Off-chain (packet) key pair of the node under test, built from a fixed secret.
        static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        // The chain key pairs are random, so the addresses derived from them differ between test runs.
        static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
        static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
        static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
        static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
        // Used as the safe address of the handlers created by `init_handlers`.
        static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
        static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
        static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible"); // just a dummy
    }
1139
    // Declares a mock of `HoprIndexerRpcOperations`; the tests below instantiate it as
    // `MockIndexerRpcOperations`.
    mockall::mock! {
        pub(crate) IndexerRpcOperations {}

        #[async_trait::async_trait]
        impl HoprIndexerRpcOperations for IndexerRpcOperations {
            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;

        async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

        async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;

        async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

        fn try_stream_logs<'a>(
            &'a self,
            start_block_number: u64,
            filters: hopr_chain_rpc::FilterSet,
            is_synced: bool,
        ) -> hopr_chain_rpc::errors::Result<std::pin::Pin<Box<dyn futures::Stream<Item=hopr_chain_rpc::BlockWithLogs> + Send + 'a> > >;
        }
    }
1161
    /// Cloneable wrapper around the RPC mock.
    ///
    /// The mock is held behind an `Arc`, so the wrapper can derive `Clone`, which
    /// `init_handlers` requires of its RPC type (`T: Clone`).
    #[derive(Clone)]
    pub struct ClonableMockOperations {
        /// The shared mock that every trait call is forwarded to.
        pub inner: Arc<MockIndexerRpcOperations>,
    }
1166
    // Pure delegation: every trait method forwards its arguments unchanged to the
    // wrapped mock in `inner` and returns the mock's result.
    #[async_trait::async_trait]
    impl HoprIndexerRpcOperations for ClonableMockOperations {
        async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64> {
            self.inner.block_number().await
        }

        async fn get_hopr_allowance(
            &self,
            owner: Address,
            spender: Address,
        ) -> hopr_chain_rpc::errors::Result<HoprBalance> {
            self.inner.get_hopr_allowance(owner, spender).await
        }

        async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance> {
            self.inner.get_xdai_balance(address).await
        }

        async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance> {
            self.inner.get_hopr_balance(address).await
        }

        // Not async: the mock's result (stream or error) is passed through as is.
        fn try_stream_logs<'a>(
            &'a self,
            start_block_number: u64,
            filters: hopr_chain_rpc::FilterSet,
            is_synced: bool,
        ) -> hopr_chain_rpc::errors::Result<
            std::pin::Pin<Box<dyn futures::Stream<Item = hopr_chain_rpc::BlockWithLogs> + Send + 'a>>,
        > {
            self.inner.try_stream_logs(start_block_number, filters, is_synced)
        }
    }
1200
1201    fn init_handlers<T: Clone, Db: HoprDbAllOperations + Clone>(
1202        rpc_operations: T,
1203        db: Db,
1204    ) -> ContractEventHandlers<T, Db> {
1205        ContractEventHandlers {
1206            addresses: Arc::new(ContractAddresses {
1207                channels: *CHANNELS_ADDR,
1208                token: *TOKEN_ADDR,
1209                network_registry: *NETWORK_REGISTRY_ADDR,
1210                network_registry_proxy: Default::default(),
1211                safe_registry: *NODE_SAFE_REGISTRY_ADDR,
1212                announcements: *ANNOUNCEMENTS_ADDR,
1213                module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
1214                price_oracle: *TICKET_PRICE_ORACLE_ADDR,
1215                win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
1216                stake_factory: Default::default(),
1217            }),
1218            chain_key: SELF_CHAIN_KEY.clone(),
1219            safe_address: STAKE_ADDRESS.clone(),
1220            db,
1221            rpc_operations,
1222        }
1223    }
1224
1225    fn test_log() -> SerializableLog {
1226        SerializableLog { ..Default::default() }
1227    }
1228
1229    #[tokio::test]
1230    async fn announce_keybinding() -> anyhow::Result<()> {
1231        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1232
1233        let rpc_operations = MockIndexerRpcOperations::new();
1234        // ==> set mock expectations here
1235        let clonable_rpc_operations = ClonableMockOperations {
1236            inner: Arc::new(rpc_operations),
1237        };
1238
1239        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1240
1241        let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
1242
1243        let keybinding_log = SerializableLog {
1244            address: handlers.addresses.announcements,
1245            topics: vec![
1246                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::KeyBinding::SIGNATURE_HASH.into(),
1247            ],
1248            data: DynSolValue::Tuple(vec![
1249                DynSolValue::Bytes(keybinding.signature.as_ref().to_vec()),
1250                DynSolValue::Bytes(keybinding.packet_key.as_ref().to_vec()),
1251                DynSolValue::FixedBytes(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()).into_word(), 32),
1252            ])
1253            .abi_encode_packed(),
1254            ..test_log()
1255        };
1256
1257        let account_entry = AccountEntry {
1258            public_key: *SELF_PRIV_KEY.public(),
1259            chain_addr: *SELF_CHAIN_ADDRESS,
1260            entry_type: AccountType::NotAnnounced,
1261            published_at: 0,
1262        };
1263
1264        let event_type = db
1265            .begin_transaction()
1266            .await?
1267            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log, true).await }))
1268            .await?;
1269
1270        assert!(event_type.is_none(), "keybinding does not have a chain event type");
1271
1272        assert_eq!(
1273            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1274                .await?
1275                .context("a value should be present")?,
1276            account_entry
1277        );
1278        Ok(())
1279    }
1280
1281    #[tokio::test]
1282    async fn announce_address_announcement() -> anyhow::Result<()> {
1283        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1284        let rpc_operations = MockIndexerRpcOperations::new();
1285        // ==> set mock expectations here
1286        let clonable_rpc_operations = ClonableMockOperations {
1287            //
1288            inner: Arc::new(rpc_operations),
1289        };
1290        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1291
1292        // Assume that there is a keybinding
1293        let account_entry = AccountEntry {
1294            public_key: *SELF_PRIV_KEY.public(),
1295            chain_addr: *SELF_CHAIN_ADDRESS,
1296            entry_type: AccountType::NotAnnounced,
1297            published_at: 1,
1298        };
1299        db.insert_account(None, account_entry.clone()).await?;
1300
1301        let test_multiaddr_empty: Multiaddr = "".parse()?;
1302
1303        let address_announcement_empty_log_encoded_data = DynSolValue::Tuple(vec![
1304            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1305            DynSolValue::String(test_multiaddr_empty.to_string()),
1306        ])
1307        .abi_encode();
1308
1309        let address_announcement_empty_log = SerializableLog {
1310            address: handlers.addresses.announcements,
1311            topics: vec![
1312                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1313                    .into(),
1314            ],
1315            data: address_announcement_empty_log_encoded_data[32..].into(),
1316            ..test_log()
1317        };
1318
1319        let handlers_clone = handlers.clone();
1320        let event_type = db
1321            .begin_transaction()
1322            .await?
1323            .perform(|tx| {
1324                Box::pin(async move {
1325                    handlers_clone
1326                        .process_log_event(tx, address_announcement_empty_log, true)
1327                        .await
1328                })
1329            })
1330            .await?;
1331
1332        assert!(
1333            event_type.is_none(),
1334            "announcement of empty multiaddresses must pass through"
1335        );
1336
1337        assert_eq!(
1338            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1339                .await?
1340                .context("a value should be present")?,
1341            account_entry
1342        );
1343
1344        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1345
1346        let address_announcement_log_encoded_data = DynSolValue::Tuple(vec![
1347            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1348            DynSolValue::String(test_multiaddr.to_string()),
1349        ])
1350        .abi_encode();
1351
1352        let address_announcement_log = SerializableLog {
1353            address: handlers.addresses.announcements,
1354            block_number: 1,
1355            topics: vec![
1356                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1357                    .into(),
1358            ],
1359            data: address_announcement_log_encoded_data[32..].into(),
1360            ..test_log()
1361        };
1362
1363        let announced_account_entry = AccountEntry {
1364            public_key: *SELF_PRIV_KEY.public(),
1365            chain_addr: *SELF_CHAIN_ADDRESS,
1366            entry_type: AccountType::Announced {
1367                multiaddr: test_multiaddr.clone(),
1368                updated_block: 1,
1369            },
1370            published_at: 1,
1371        };
1372
1373        let handlers_clone = handlers.clone();
1374        let event_type = db
1375            .begin_transaction()
1376            .await?
1377            .perform(|tx| {
1378                Box::pin(async move {
1379                    handlers_clone
1380                        .process_log_event(tx, address_announcement_log, true)
1381                        .await
1382                })
1383            })
1384            .await?;
1385
1386        assert!(
1387            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1388            "must return the latest announce multiaddress"
1389        );
1390
1391        assert_eq!(
1392            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1393                .await?
1394                .context("a value should be present")?,
1395            announced_account_entry
1396        );
1397
1398        assert_eq!(
1399            Some(*SELF_CHAIN_ADDRESS),
1400            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1401            "must resolve correct chain key"
1402        );
1403
1404        assert_eq!(
1405            Some(*SELF_PRIV_KEY.public()),
1406            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1407            "must resolve correct packet key"
1408        );
1409
1410        let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1411
1412        let address_announcement_dns_log_encoded_data = DynSolValue::Tuple(vec![
1413            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1414            DynSolValue::String(test_multiaddr_dns.to_string()),
1415        ])
1416        .abi_encode();
1417
1418        let address_announcement_dns_log = SerializableLog {
1419            address: handlers.addresses.announcements,
1420            block_number: 2,
1421            topics: vec![
1422                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1423                    .into(),
1424            ],
1425            data: address_announcement_dns_log_encoded_data[32..].into(),
1426            ..test_log()
1427        };
1428
1429        let announced_dns_account_entry = AccountEntry {
1430            public_key: *SELF_PRIV_KEY.public(),
1431            chain_addr: *SELF_CHAIN_ADDRESS,
1432            entry_type: AccountType::Announced {
1433                multiaddr: test_multiaddr_dns.clone(),
1434                updated_block: 2,
1435            },
1436            published_at: 1,
1437        };
1438
1439        let event_type = db
1440            .begin_transaction()
1441            .await?
1442            .perform(|tx| {
1443                Box::pin(async move { handlers.process_log_event(tx, address_announcement_dns_log, true).await })
1444            })
1445            .await?;
1446
1447        assert!(
1448            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1449            "must return the latest announce multiaddress"
1450        );
1451
1452        assert_eq!(
1453            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1454                .await?
1455                .context("a value should be present")?,
1456            announced_dns_account_entry
1457        );
1458
1459        assert_eq!(
1460            Some(*SELF_CHAIN_ADDRESS),
1461            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1462            "must resolve correct chain key"
1463        );
1464
1465        assert_eq!(
1466            Some(*SELF_PRIV_KEY.public()),
1467            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1468            "must resolve correct packet key"
1469        );
1470        Ok(())
1471    }
1472
1473    #[tokio::test]
1474    async fn announce_revoke() -> anyhow::Result<()> {
1475        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1476        let rpc_operations = MockIndexerRpcOperations::new();
1477        // ==> set mock expectations here
1478        let clonable_rpc_operations = ClonableMockOperations {
1479            //
1480            inner: Arc::new(rpc_operations),
1481        };
1482        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1483
1484        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1485
1486        // Assume that there is a keybinding and an address announcement
1487        let announced_account_entry = AccountEntry {
1488            public_key: *SELF_PRIV_KEY.public(),
1489            chain_addr: *SELF_CHAIN_ADDRESS,
1490            entry_type: AccountType::Announced {
1491                multiaddr: test_multiaddr,
1492                updated_block: 0,
1493            },
1494            published_at: 1,
1495        };
1496
1497        db.insert_account(None, announced_account_entry).await?;
1498
1499        let encoded_data = (AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()),).abi_encode();
1500
1501        let revoke_announcement_log = SerializableLog {
1502            address: handlers.addresses.announcements,
1503            topics: vec![
1504                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::RevokeAnnouncement::SIGNATURE_HASH
1505                    .into(),
1506            ],
1507            data: encoded_data,
1508            ..test_log()
1509        };
1510
1511        let account_entry = AccountEntry {
1512            public_key: *SELF_PRIV_KEY.public(),
1513            chain_addr: *SELF_CHAIN_ADDRESS,
1514            entry_type: AccountType::NotAnnounced,
1515            published_at: 1,
1516        };
1517
1518        let event_type = db
1519            .begin_transaction()
1520            .await?
1521            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log, true).await }))
1522            .await?;
1523
1524        assert!(
1525            event_type.is_none(),
1526            "revoke announcement does not have chain event type"
1527        );
1528
1529        assert_eq!(
1530            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1531                .await?
1532                .context("a value should be present")?,
1533            account_entry
1534        );
1535        Ok(())
1536    }
1537
1538    #[tokio::test]
1539    async fn on_token_transfer_to() -> anyhow::Result<()> {
1540        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1541
1542        let value = U256::MAX;
1543        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
1544            value.to_be_bytes_vec().as_slice(),
1545        ));
1546
1547        let mut rpc_operations = MockIndexerRpcOperations::new();
1548        rpc_operations
1549            .expect_get_hopr_balance()
1550            .times(1)
1551            .return_once(move |_| Ok(target_hopr_balance.clone()));
1552        rpc_operations
1553            .expect_get_hopr_allowance()
1554            .times(1)
1555            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1556        let clonable_rpc_operations = ClonableMockOperations {
1557            inner: Arc::new(rpc_operations),
1558        };
1559
1560        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1561
1562        let encoded_data = (value).abi_encode();
1563
1564        let transferred_log = SerializableLog {
1565            address: handlers.addresses.token,
1566            topics: vec![
1567                hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1568                H256::from_slice(&Address::default().to_bytes32()).into(),
1569                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1570            ],
1571            data: encoded_data,
1572            ..test_log()
1573        };
1574
1575        let event_type = db
1576            .begin_transaction()
1577            .await?
1578            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1579            .await?;
1580
1581        assert!(event_type.is_none(), "token transfer does not have chain event type");
1582
1583        assert_eq!(db.get_safe_hopr_balance(None).await?, target_hopr_balance);
1584
1585        Ok(())
1586    }
1587
1588    #[test_log::test(tokio::test)]
1589    async fn on_token_transfer_from() -> anyhow::Result<()> {
1590        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1591
1592        let mut rpc_operations = MockIndexerRpcOperations::new();
1593        rpc_operations
1594            .expect_get_hopr_balance()
1595            .times(1)
1596            .return_once(|_| Ok(HoprBalance::zero()));
1597        rpc_operations
1598            .expect_get_hopr_allowance()
1599            .times(1)
1600            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1601        let clonable_rpc_operations = ClonableMockOperations {
1602            inner: Arc::new(rpc_operations),
1603        };
1604
1605        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1606
1607        let value = U256::MAX;
1608
1609        let encoded_data = (value).abi_encode();
1610
1611        db.set_safe_hopr_balance(
1612            None,
1613            HoprBalance::from(primitive_types::U256::from_big_endian(
1614                value.to_be_bytes_vec().as_slice(),
1615            )),
1616        )
1617        .await?;
1618
1619        let transferred_log = SerializableLog {
1620            address: handlers.addresses.token,
1621            topics: vec![
1622                hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1623                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1624                H256::from_slice(&Address::default().to_bytes32()).into(),
1625            ],
1626            data: encoded_data,
1627            ..test_log()
1628        };
1629
1630        let event_type = db
1631            .begin_transaction()
1632            .await?
1633            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1634            .await?;
1635
1636        assert!(event_type.is_none(), "token transfer does not have chain event type");
1637
1638        assert_eq!(db.get_safe_hopr_balance(None).await?, HoprBalance::zero());
1639
1640        Ok(())
1641    }
1642
1643    #[tokio::test]
1644    async fn on_token_approval_correct() -> anyhow::Result<()> {
1645        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1646
1647        let target_allowance = HoprBalance::from(primitive_types::U256::from(1000u64));
1648        let mut rpc_operations = MockIndexerRpcOperations::new();
1649        rpc_operations
1650            .expect_get_hopr_allowance()
1651            .times(2)
1652            .returning(move |_, _| Ok(target_allowance.clone()));
1653        let clonable_rpc_operations = ClonableMockOperations {
1654            inner: Arc::new(rpc_operations),
1655        };
1656
1657        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1658
1659        let encoded_data = (U256::from(1000u64)).abi_encode();
1660
1661        let approval_log = SerializableLog {
1662            address: handlers.addresses.token,
1663            topics: vec![
1664                hopr_bindings::hoprtoken::HoprToken::Approval::SIGNATURE_HASH.into(),
1665                H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1666                H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1667            ],
1668            data: encoded_data,
1669            ..test_log()
1670        };
1671
1672        // before any operation the allowance should be 0
1673        assert_eq!(db.get_safe_hopr_allowance(None).await?, HoprBalance::zero());
1674
1675        let approval_log_clone = approval_log.clone();
1676        let handlers_clone = handlers.clone();
1677        let event_type = db
1678            .begin_transaction()
1679            .await?
1680            .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone, true).await }))
1681            .await?;
1682
1683        assert!(event_type.is_none(), "token approval does not have chain event type");
1684
1685        // after processing the allowance should be 0
1686        assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance.clone());
1687
1688        // reduce allowance manually to verify a second time
1689        let _ = db
1690            .set_safe_hopr_allowance(None, HoprBalance::from(primitive_types::U256::from(10u64)))
1691            .await;
1692        assert_eq!(
1693            db.get_safe_hopr_allowance(None).await?,
1694            HoprBalance::from(primitive_types::U256::from(10u64))
1695        );
1696
1697        let handlers_clone = handlers.clone();
1698        let event_type = db
1699            .begin_transaction()
1700            .await?
1701            .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log, true).await }))
1702            .await?;
1703
1704        assert!(event_type.is_none(), "token approval does not have chain event type");
1705
1706        assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance);
1707        Ok(())
1708    }
1709
1710    #[tokio::test]
1711    async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1712        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1713        let rpc_operations = MockIndexerRpcOperations::new();
1714        // ==> set mock expectations here
1715        let clonable_rpc_operations = ClonableMockOperations {
1716            //
1717            inner: Arc::new(rpc_operations),
1718        };
1719        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1720
1721        let encoded_data = ().abi_encode();
1722
1723        let registered_log = SerializableLog {
1724            address: handlers.addresses.network_registry,
1725            topics: vec![
1726                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Registered::SIGNATURE_HASH.into(),
1727                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1728                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1729            ],
1730            data: encoded_data,
1731            // data: encode(&[]).into(),
1732            ..test_log()
1733        };
1734
1735        assert!(
1736            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1737                .await?
1738        );
1739
1740        let event_type = db
1741            .begin_transaction()
1742            .await?
1743            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1744            .await?;
1745
1746        assert!(
1747            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1748            "must return correct NR update"
1749        );
1750
1751        assert!(
1752            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1753                .await?,
1754            "must be allowed in NR"
1755        );
1756        Ok(())
1757    }
1758
1759    #[tokio::test]
1760    async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1761        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1762        let rpc_operations = MockIndexerRpcOperations::new();
1763        // ==> set mock expectations here
1764        let clonable_rpc_operations = ClonableMockOperations {
1765            //
1766            inner: Arc::new(rpc_operations),
1767        };
1768        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1769
1770        let registered_log = SerializableLog {
1771            address: handlers.addresses.network_registry,
1772            topics: vec![
1773                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::RegisteredByManager::SIGNATURE_HASH.into(),
1774                // RegisteredByManagerFilter::signature().into(),
1775                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1776                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1777            ],
1778            data: ().abi_encode(),
1779            // data: encode(&[]).into(),
1780            ..test_log()
1781        };
1782
1783        assert!(
1784            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1785                .await?
1786        );
1787
1788        let event_type = db
1789            .begin_transaction()
1790            .await?
1791            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1792            .await?;
1793
1794        assert!(
1795            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1796            "must return correct NR update"
1797        );
1798
1799        assert!(
1800            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1801                .await?,
1802            "must be allowed in NR"
1803        );
1804        Ok(())
1805    }
1806
1807    #[tokio::test]
1808    async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1809        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1810        let rpc_operations = MockIndexerRpcOperations::new();
1811        // ==> set mock expectations here
1812        let clonable_rpc_operations = ClonableMockOperations {
1813            //
1814            inner: Arc::new(rpc_operations),
1815        };
1816        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1817
1818        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1819            .await?;
1820
1821        let encoded_data = ().abi_encode();
1822
1823        let registered_log = SerializableLog {
1824            address: handlers.addresses.network_registry,
1825            topics: vec![
1826                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Deregistered::SIGNATURE_HASH.into(),
1827                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1828                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1829            ],
1830            data: encoded_data,
1831            ..test_log()
1832        };
1833
1834        assert!(
1835            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1836                .await?
1837        );
1838
1839        let event_type = db
1840            .begin_transaction()
1841            .await?
1842            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1843            .await?;
1844
1845        assert!(
1846            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1847            "must return correct NR update"
1848        );
1849
1850        assert!(
1851            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1852                .await?,
1853            "must not be allowed in NR"
1854        );
1855        Ok(())
1856    }
1857
1858    #[tokio::test]
1859    async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1860        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1861        let rpc_operations = MockIndexerRpcOperations::new();
1862        // ==> set mock expectations here
1863        let clonable_rpc_operations = ClonableMockOperations {
1864            //
1865            inner: Arc::new(rpc_operations),
1866        };
1867        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1868
1869        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1870            .await?;
1871
1872        let encoded_data = ().abi_encode();
1873
1874        let registered_log = SerializableLog {
1875            address: handlers.addresses.network_registry,
1876            topics: vec![
1877                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::DeregisteredByManager::SIGNATURE_HASH.into(),
1878                // DeregisteredByManagerFilter::signature().into(),
1879                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1880                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1881            ],
1882            data: encoded_data,
1883            ..test_log()
1884        };
1885
1886        assert!(
1887            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1888                .await?
1889        );
1890
1891        let event_type = db
1892            .begin_transaction()
1893            .await?
1894            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1895            .await?;
1896
1897        assert!(
1898            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1899            "must return correct NR update"
1900        );
1901
1902        assert!(
1903            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1904                .await?,
1905            "must not be allowed in NR"
1906        );
1907        Ok(())
1908    }
1909
1910    #[tokio::test]
1911    async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
1912        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1913        let rpc_operations = MockIndexerRpcOperations::new();
1914        // ==> set mock expectations here
1915        let clonable_rpc_operations = ClonableMockOperations {
1916            //
1917            inner: Arc::new(rpc_operations),
1918        };
1919        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1920
1921        let encoded_data = ().abi_encode();
1922
1923        let nr_enabled = SerializableLog {
1924            address: handlers.addresses.network_registry,
1925            topics: vec![
1926                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
1927                    .into(),
1928                // NetworkRegistryStatusUpdatedFilter::signature().into(),
1929                H256::from_low_u64_be(1).into(),
1930            ],
1931            data: encoded_data,
1932            ..test_log()
1933        };
1934
1935        let event_type = db
1936            .begin_transaction()
1937            .await?
1938            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled, false).await }))
1939            .await?;
1940
1941        assert!(event_type.is_none(), "there's no chain event type for nr disable");
1942
1943        assert!(db.get_indexer_data(None).await?.nr_enabled);
1944        Ok(())
1945    }
1946
1947    #[tokio::test]
1948    async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
1949        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1950        let rpc_operations = MockIndexerRpcOperations::new();
1951        // ==> set mock expectations here
1952        let clonable_rpc_operations = ClonableMockOperations {
1953            //
1954            inner: Arc::new(rpc_operations),
1955        };
1956        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1957
1958        db.set_network_registry_enabled(None, true).await?;
1959
1960        let encoded_data = ().abi_encode();
1961
1962        let nr_disabled = SerializableLog {
1963            address: handlers.addresses.network_registry,
1964            topics: vec![
1965                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
1966                    .into(),
1967                // NetworkRegistryStatusUpdatedFilter::signature().into(),
1968                H256::from_low_u64_be(0).into(),
1969            ],
1970            data: encoded_data,
1971            ..test_log()
1972        };
1973
1974        let event_type = db
1975            .begin_transaction()
1976            .await?
1977            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled, false).await }))
1978            .await?;
1979
1980        assert!(event_type.is_none(), "there's no chain event type for nr enable");
1981
1982        assert!(!db.get_indexer_data(None).await?.nr_enabled);
1983        Ok(())
1984    }
1985
1986    #[tokio::test]
1987    async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
1988        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1989        let rpc_operations = MockIndexerRpcOperations::new();
1990        // ==> set mock expectations here
1991        let clonable_rpc_operations = ClonableMockOperations {
1992            //
1993            inner: Arc::new(rpc_operations),
1994        };
1995        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1996
1997        let encoded_data = ().abi_encode();
1998
1999        let set_eligible = SerializableLog {
2000            address: handlers.addresses.network_registry,
2001            topics: vec![
2002                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2003                // EligibilityUpdatedFilter::signature().into(),
2004                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2005                H256::from_low_u64_be(1).into(),
2006            ],
2007            data: encoded_data,
2008            ..test_log()
2009        };
2010
2011        let event_type = db
2012            .begin_transaction()
2013            .await?
2014            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, false).await }))
2015            .await?;
2016
2017        assert!(
2018            event_type.is_none(),
2019            "there's no chain event type for setting nr eligibility"
2020        );
2021
2022        assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2023
2024        Ok(())
2025    }
2026
2027    #[tokio::test]
2028    async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
2029        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2030        let rpc_operations = MockIndexerRpcOperations::new();
2031        // ==> set mock expectations here
2032        let clonable_rpc_operations = ClonableMockOperations {
2033            //
2034            inner: Arc::new(rpc_operations),
2035        };
2036        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2037
2038        db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
2039
2040        let encoded_data = ().abi_encode();
2041
2042        let set_eligible = SerializableLog {
2043            address: handlers.addresses.network_registry,
2044            topics: vec![
2045                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2046                // EligibilityUpdatedFilter::signature().into(),
2047                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2048                H256::from_low_u64_be(0).into(),
2049            ],
2050            data: encoded_data,
2051            ..test_log()
2052        };
2053
2054        let event_type = db
2055            .begin_transaction()
2056            .await?
2057            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, true).await }))
2058            .await?;
2059
2060        assert!(
2061            event_type.is_none(),
2062            "there's no chain event type for unsetting nr eligibility"
2063        );
2064
2065        assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2066
2067        Ok(())
2068    }
2069
2070    #[tokio::test]
2071    async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
2072        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2073
2074        let value = U256::MAX;
2075        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2076            value.to_be_bytes_vec().as_slice(),
2077        ));
2078
2079        let mut rpc_operations = MockIndexerRpcOperations::new();
2080        rpc_operations
2081            .expect_get_hopr_balance()
2082            .times(1)
2083            .return_once(move |_| Ok(target_hopr_balance.clone()));
2084        rpc_operations
2085            .expect_get_hopr_allowance()
2086            .times(1)
2087            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
2088        let clonable_rpc_operations = ClonableMockOperations {
2089            inner: Arc::new(rpc_operations),
2090        };
2091        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2092
2093        let channel = ChannelEntry::new(
2094            *SELF_CHAIN_ADDRESS,
2095            *COUNTERPARTY_CHAIN_ADDRESS,
2096            0.into(),
2097            primitive_types::U256::zero(),
2098            ChannelStatus::Open,
2099            primitive_types::U256::one(),
2100        );
2101
2102        db.upsert_channel(None, channel).await?;
2103
2104        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2105        let diff = solidity_balance - channel.balance;
2106
2107        let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2108
2109        let balance_increased_log = SerializableLog {
2110            address: handlers.addresses.channels,
2111            topics: vec![
2112                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2113                // ChannelBalanceIncreasedFilter::signature().into(),
2114                H256::from_slice(channel.get_id().as_ref()).into(),
2115            ],
2116            data: encoded_data,
2117            ..test_log()
2118        };
2119
2120        let event_type = db
2121            .begin_transaction()
2122            .await?
2123            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2124            .await?;
2125
2126        let channel = db
2127            .get_channel_by_id(None, &channel.get_id())
2128            .await?
2129            .context("a value should be present")?;
2130
2131        assert!(
2132            matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
2133            "must return updated channel entry and balance diff"
2134        );
2135
2136        assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2137        Ok(())
2138    }
2139
2140    #[tokio::test]
2141    async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
2142        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2143        let rpc_operations = MockIndexerRpcOperations::new();
2144        // ==> set mock expectations here
2145        let clonable_rpc_operations = ClonableMockOperations {
2146            //
2147            inner: Arc::new(rpc_operations),
2148        };
2149        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2150
2151        let separator = Hash::from(hopr_crypto_random::random_bytes());
2152
2153        let encoded_data = ().abi_encode();
2154
2155        let channels_dst_updated = SerializableLog {
2156            address: handlers.addresses.channels,
2157            topics: vec![
2158                hopr_bindings::hoprchannels::HoprChannels::DomainSeparatorUpdated::SIGNATURE_HASH.into(),
2159                // DomainSeparatorUpdatedFilter::signature().into(),
2160                H256::from_slice(separator.as_ref()).into(),
2161            ],
2162            data: encoded_data,
2163            ..test_log()
2164        };
2165
2166        assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
2167
2168        let event_type = db
2169            .begin_transaction()
2170            .await?
2171            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated, true).await }))
2172            .await?;
2173
2174        assert!(
2175            event_type.is_none(),
2176            "there's no chain event type for channel dst update"
2177        );
2178
2179        assert_eq!(
2180            separator,
2181            db.get_indexer_data(None)
2182                .await?
2183                .channels_dst
2184                .context("a value should be present")?,
2185            "separator must be updated"
2186        );
2187        Ok(())
2188    }
2189
2190    #[tokio::test]
2191    async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
2192        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2193
2194        let value = U256::MAX;
2195        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2196            value.to_be_bytes_vec().as_slice(),
2197        ));
2198
2199        let mut rpc_operations = MockIndexerRpcOperations::new();
2200        rpc_operations
2201            .expect_get_hopr_balance()
2202            .times(1)
2203            .return_once(move |_| Ok(target_hopr_balance.clone()));
2204        let clonable_rpc_operations = ClonableMockOperations {
2205            inner: Arc::new(rpc_operations),
2206        };
2207        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2208
2209        let channel = ChannelEntry::new(
2210            *SELF_CHAIN_ADDRESS,
2211            *COUNTERPARTY_CHAIN_ADDRESS,
2212            HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2213            primitive_types::U256::zero(),
2214            ChannelStatus::Open,
2215            primitive_types::U256::one(),
2216        );
2217
2218        db.upsert_channel(None, channel).await?;
2219
2220        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 2).into();
2221        let diff = channel.balance - solidity_balance;
2222
2223        // let encoded_data = (solidity_balance).abi_encode();
2224        let encoded_data = DynSolValue::Tuple(vec![DynSolValue::Uint(
2225            U256::from_be_slice(&solidity_balance.amount().to_be_bytes()),
2226            256,
2227        )])
2228        .abi_encode();
2229
2230        let balance_decreased_log = SerializableLog {
2231            address: handlers.addresses.channels,
2232            topics: vec![
2233                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceDecreased::SIGNATURE_HASH.into(),
2234                // ChannelBalanceDecreasedFilter::signature().into(),
2235                H256::from_slice(channel.get_id().as_ref()).into(),
2236            ],
2237            data: encoded_data,
2238            ..test_log()
2239        };
2240
2241        let event_type = db
2242            .begin_transaction()
2243            .await?
2244            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log, true).await }))
2245            .await?;
2246
2247        let channel = db
2248            .get_channel_by_id(None, &channel.get_id())
2249            .await?
2250            .context("a value should be present")?;
2251
2252        assert!(
2253            matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
2254            "must return updated channel entry and balance diff"
2255        );
2256
2257        assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2258        Ok(())
2259    }
2260
2261    #[tokio::test]
2262    async fn on_channel_closed() -> anyhow::Result<()> {
2263        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2264        let rpc_operations = MockIndexerRpcOperations::new();
2265        // ==> set mock expectations here
2266        let clonable_rpc_operations = ClonableMockOperations {
2267            //
2268            inner: Arc::new(rpc_operations),
2269        };
2270        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2271
2272        let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2273
2274        let channel = ChannelEntry::new(
2275            *SELF_CHAIN_ADDRESS,
2276            *COUNTERPARTY_CHAIN_ADDRESS,
2277            starting_balance,
2278            primitive_types::U256::zero(),
2279            ChannelStatus::Open,
2280            primitive_types::U256::one(),
2281        );
2282
2283        db.upsert_channel(None, channel).await?;
2284
2285        let encoded_data = ().abi_encode();
2286
2287        let channel_closed_log = SerializableLog {
2288            address: handlers.addresses.channels,
2289            topics: vec![
2290                hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2291                // ChannelClosedFilter::signature().into(),
2292                H256::from_slice(channel.get_id().as_ref()).into(),
2293            ],
2294            data: encoded_data,
2295            ..test_log()
2296        };
2297
2298        let event_type = db
2299            .begin_transaction()
2300            .await?
2301            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2302            .await?;
2303
2304        let closed_channel = db
2305            .get_channel_by_id(None, &channel.get_id())
2306            .await?
2307            .context("a value should be present")?;
2308
2309        assert!(
2310            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
2311            "must return the updated channel entry"
2312        );
2313
2314        assert_eq!(closed_channel.status, ChannelStatus::Closed);
2315        assert_eq!(closed_channel.ticket_index, 0u64.into());
2316        assert_eq!(
2317            0,
2318            db.get_outgoing_ticket_index(closed_channel.get_id())
2319                .await?
2320                .load(Ordering::Relaxed)
2321        );
2322
2323        assert!(closed_channel.balance.amount().eq(&primitive_types::U256::zero()));
2324        Ok(())
2325    }
2326
2327    #[tokio::test]
2328    async fn on_foreign_channel_closed() -> anyhow::Result<()> {
2329        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2330        let rpc_operations = MockIndexerRpcOperations::new();
2331        // ==> set mock expectations here
2332        let clonable_rpc_operations = ClonableMockOperations {
2333            //
2334            inner: Arc::new(rpc_operations),
2335        };
2336        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2337
2338        let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2339
2340        let channel = ChannelEntry::new(
2341            Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
2342            Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
2343            starting_balance,
2344            primitive_types::U256::zero(),
2345            ChannelStatus::Open,
2346            primitive_types::U256::one(),
2347        );
2348
2349        db.upsert_channel(None, channel).await?;
2350
2351        let encoded_data = ().abi_encode();
2352
2353        let channel_closed_log = SerializableLog {
2354            address: handlers.addresses.channels,
2355            topics: vec![
2356                hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2357                // ChannelClosedFilter::signature().into(),
2358                H256::from_slice(channel.get_id().as_ref()).into(),
2359            ],
2360            data: encoded_data,
2361            ..test_log()
2362        };
2363
2364        let event_type = db
2365            .begin_transaction()
2366            .await?
2367            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2368            .await?;
2369
2370        let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
2371
2372        assert_eq!(None, closed_channel, "foreign channel must be deleted");
2373
2374        assert!(
2375            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
2376            "must return the closed channel entry"
2377        );
2378
2379        Ok(())
2380    }
2381
2382    #[tokio::test]
2383    async fn on_channel_opened() -> anyhow::Result<()> {
2384        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2385        let rpc_operations = MockIndexerRpcOperations::new();
2386        // ==> set mock expectations here
2387        let clonable_rpc_operations = ClonableMockOperations {
2388            //
2389            inner: Arc::new(rpc_operations),
2390        };
2391        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2392
2393        let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2394
2395        let encoded_data = ().abi_encode();
2396
2397        let channel_opened_log = SerializableLog {
2398            address: handlers.addresses.channels,
2399            topics: vec![
2400                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2401                // ChannelOpenedFilter::signature().into(),
2402                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2403                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2404            ],
2405            data: encoded_data,
2406            ..test_log()
2407        };
2408
2409        let event_type = db
2410            .begin_transaction()
2411            .await?
2412            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2413            .await?;
2414
2415        let channel = db
2416            .get_channel_by_id(None, &channel_id)
2417            .await?
2418            .context("a value should be present")?;
2419
2420        assert!(
2421            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2422            "must return the updated channel entry"
2423        );
2424
2425        assert_eq!(channel.status, ChannelStatus::Open);
2426        assert_eq!(channel.channel_epoch, 1u64.into());
2427        assert_eq!(channel.ticket_index, 0u64.into());
2428        assert_eq!(
2429            0,
2430            db.get_outgoing_ticket_index(channel.get_id())
2431                .await?
2432                .load(Ordering::Relaxed)
2433        );
2434        Ok(())
2435    }
2436
2437    #[tokio::test]
2438    async fn on_channel_reopened() -> anyhow::Result<()> {
2439        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2440        let rpc_operations = MockIndexerRpcOperations::new();
2441        // ==> set mock expectations here
2442        let clonable_rpc_operations = ClonableMockOperations {
2443            //
2444            inner: Arc::new(rpc_operations),
2445        };
2446        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2447
2448        let channel = ChannelEntry::new(
2449            *SELF_CHAIN_ADDRESS,
2450            *COUNTERPARTY_CHAIN_ADDRESS,
2451            HoprBalance::zero(),
2452            primitive_types::U256::zero(),
2453            ChannelStatus::Closed,
2454            3.into(),
2455        );
2456
2457        db.upsert_channel(None, channel).await?;
2458
2459        let encoded_data = ().abi_encode();
2460
2461        let channel_opened_log = SerializableLog {
2462            address: handlers.addresses.channels,
2463            topics: vec![
2464                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2465                // ChannelOpenedFilter::signature().into(),
2466                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2467                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2468            ],
2469            data: encoded_data,
2470            ..test_log()
2471        };
2472
2473        let event_type = db
2474            .begin_transaction()
2475            .await?
2476            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2477            .await?;
2478
2479        let channel = db
2480            .get_channel_by_id(None, &channel.get_id())
2481            .await?
2482            .context("a value should be present")?;
2483
2484        assert!(
2485            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2486            "must return the updated channel entry"
2487        );
2488
2489        assert_eq!(channel.status, ChannelStatus::Open);
2490        assert_eq!(channel.channel_epoch, 4u64.into());
2491        assert_eq!(channel.ticket_index, 0u64.into());
2492
2493        assert_eq!(
2494            0,
2495            db.get_outgoing_ticket_index(channel.get_id())
2496                .await?
2497                .load(Ordering::Relaxed)
2498        );
2499        Ok(())
2500    }
2501
2502    #[tokio::test]
2503    async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2504        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2505        let rpc_operations = MockIndexerRpcOperations::new();
2506        // ==> set mock expectations here
2507        let clonable_rpc_operations = ClonableMockOperations {
2508            //
2509            inner: Arc::new(rpc_operations),
2510        };
2511        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2512
2513        let channel = ChannelEntry::new(
2514            *SELF_CHAIN_ADDRESS,
2515            *COUNTERPARTY_CHAIN_ADDRESS,
2516            0.into(),
2517            primitive_types::U256::zero(),
2518            ChannelStatus::Open,
2519            3.into(),
2520        );
2521
2522        db.upsert_channel(None, channel).await?;
2523
2524        let encoded_data = ().abi_encode();
2525
2526        let channel_opened_log = SerializableLog {
2527            address: handlers.addresses.channels,
2528            topics: vec![
2529                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2530                // ChannelOpenedFilter::signature().into(),
2531                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2532                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2533            ],
2534            data: encoded_data,
2535            ..test_log()
2536        };
2537
2538        db.begin_transaction()
2539            .await?
2540            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2541            .await
2542            .expect_err("should not re-open channel that is not Closed");
2543        Ok(())
2544    }
2545
2546    const PRICE_PER_PACKET: u32 = 20_u32;
2547
2548    fn mock_acknowledged_ticket(
2549        signer: &ChainKeypair,
2550        destination: &ChainKeypair,
2551        index: u64,
2552        win_prob: f64,
2553    ) -> anyhow::Result<AcknowledgedTicket> {
2554        let channel_id = generate_channel_id(&signer.into(), &destination.into());
2555
2556        let channel_epoch = 1u64;
2557        let domain_separator = Hash::default();
2558
2559        let response = Response::try_from(
2560            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2561        )?;
2562
2563        Ok(TicketBuilder::default()
2564            .direction(&signer.into(), &destination.into())
2565            .amount(primitive_types::U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2566            .index(index)
2567            .index_offset(1)
2568            .win_prob(win_prob.try_into()?)
2569            .channel_epoch(1)
2570            .challenge(response.to_challenge().into())
2571            .build_signed(signer, &domain_separator)?
2572            .into_acknowledged(response))
2573    }
2574
2575    #[tokio::test]
2576    async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2577        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2578        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2579            .await?;
2580        let rpc_operations = MockIndexerRpcOperations::new();
2581        // ==> set mock expectations here
2582        let clonable_rpc_operations = ClonableMockOperations {
2583            //
2584            inner: Arc::new(rpc_operations),
2585        };
2586        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2587
2588        let channel = ChannelEntry::new(
2589            *COUNTERPARTY_CHAIN_ADDRESS,
2590            *SELF_CHAIN_ADDRESS,
2591            HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2592            primitive_types::U256::zero(),
2593            ChannelStatus::Open,
2594            primitive_types::U256::one(),
2595        );
2596
2597        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2598        let next_ticket_index = ticket_index + 1;
2599
2600        let mut ticket =
2601            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2602        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2603
2604        let ticket_value = ticket.verified_ticket().amount;
2605
2606        db.upsert_channel(None, channel).await?;
2607        db.upsert_ticket(None, ticket.clone()).await?;
2608
2609        let ticket_redeemed_log = SerializableLog {
2610            address: handlers.addresses.channels,
2611            topics: vec![
2612                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2613                // TicketRedeemedFilter::signature().into(),
2614                H256::from_slice(channel.get_id().as_ref()).into(),
2615            ],
2616            data: DynSolValue::Tuple(vec![DynSolValue::Uint(
2617                U256::from_be_bytes(next_ticket_index.to_be_bytes()),
2618                48,
2619            )])
2620            .abi_encode(),
2621            ..test_log()
2622        };
2623
2624        let outgoing_ticket_index_before = db
2625            .get_outgoing_ticket_index(channel.get_id())
2626            .await?
2627            .load(Ordering::Relaxed);
2628
2629        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2630        assert_eq!(
2631            HoprBalance::zero(),
2632            stats.redeemed_value,
2633            "there should not be any redeemed value"
2634        );
2635        assert_eq!(
2636            HoprBalance::zero(),
2637            stats.neglected_value,
2638            "there should not be any neglected value"
2639        );
2640
2641        let event_type = db
2642            .begin_transaction()
2643            .await?
2644            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2645            .await?;
2646
2647        let channel = db
2648            .get_channel_by_id(None, &channel.get_id())
2649            .await?
2650            .context("a value should be present")?;
2651
2652        assert!(
2653            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2654            "must return the updated channel entry and the redeemed ticket"
2655        );
2656
2657        assert_eq!(
2658            channel.ticket_index, next_ticket_index,
2659            "channel entry must contain next ticket index"
2660        );
2661
2662        let outgoing_ticket_index_after = db
2663            .get_outgoing_ticket_index(channel.get_id())
2664            .await?
2665            .load(Ordering::Relaxed);
2666
2667        assert_eq!(
2668            outgoing_ticket_index_before, outgoing_ticket_index_after,
2669            "outgoing ticket index must not change"
2670        );
2671
2672        let tickets = db.get_tickets((&channel).into()).await?;
2673
2674        assert!(tickets.is_empty(), "there should not be any tickets left");
2675
2676        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2677        assert_eq!(
2678            ticket_value, stats.redeemed_value,
2679            "there should be redeemed value worth 1 ticket"
2680        );
2681        assert_eq!(
2682            HoprBalance::zero(),
2683            stats.neglected_value,
2684            "there should not be any neglected ticket"
2685        );
2686        Ok(())
2687    }
2688
2689    #[tokio::test]
2690    async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2691        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2692        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2693            .await?;
2694        let rpc_operations = MockIndexerRpcOperations::new();
2695        // ==> set mock expectations here
2696        let clonable_rpc_operations = ClonableMockOperations {
2697            //
2698            inner: Arc::new(rpc_operations),
2699        };
2700        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2701
2702        let channel = ChannelEntry::new(
2703            *COUNTERPARTY_CHAIN_ADDRESS,
2704            *SELF_CHAIN_ADDRESS,
2705            primitive_types::U256::from((1u128 << 96) - 1).into(),
2706            primitive_types::U256::zero(),
2707            ChannelStatus::Open,
2708            primitive_types::U256::one(),
2709        );
2710
2711        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2712        let next_ticket_index = ticket_index + 1;
2713
2714        let mut ticket =
2715            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2716        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2717
2718        let ticket_value = ticket.verified_ticket().amount;
2719
2720        db.upsert_channel(None, channel).await?;
2721        db.upsert_ticket(None, ticket.clone()).await?;
2722
2723        let old_ticket =
2724            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2725        db.upsert_ticket(None, old_ticket.clone()).await?;
2726
2727        let ticket_redeemed_log = SerializableLog {
2728            address: handlers.addresses.channels,
2729            topics: vec![
2730                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2731                // TicketRedeemedFilter::signature().into(),
2732                H256::from_slice(channel.get_id().as_ref()).into(),
2733            ],
2734            data: Vec::from(next_ticket_index.to_be_bytes()),
2735            ..test_log()
2736        };
2737
2738        let outgoing_ticket_index_before = db
2739            .get_outgoing_ticket_index(channel.get_id())
2740            .await?
2741            .load(Ordering::Relaxed);
2742
2743        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2744        assert_eq!(
2745            HoprBalance::zero(),
2746            stats.redeemed_value,
2747            "there should not be any redeemed value"
2748        );
2749        assert_eq!(
2750            HoprBalance::zero(),
2751            stats.neglected_value,
2752            "there should not be any neglected value"
2753        );
2754
2755        let event_type = db
2756            .begin_transaction()
2757            .await?
2758            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2759            .await?;
2760
2761        let channel = db
2762            .get_channel_by_id(None, &channel.get_id())
2763            .await?
2764            .context("a value should be present")?;
2765
2766        assert!(
2767            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2768            "must return the updated channel entry and the redeemed ticket"
2769        );
2770
2771        assert_eq!(
2772            channel.ticket_index, next_ticket_index,
2773            "channel entry must contain next ticket index"
2774        );
2775
2776        let outgoing_ticket_index_after = db
2777            .get_outgoing_ticket_index(channel.get_id())
2778            .await?
2779            .load(Ordering::Relaxed);
2780
2781        assert_eq!(
2782            outgoing_ticket_index_before, outgoing_ticket_index_after,
2783            "outgoing ticket index must not change"
2784        );
2785
2786        let tickets = db.get_tickets((&channel).into()).await?;
2787        assert!(tickets.is_empty(), "there should not be any tickets left");
2788
2789        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2790        assert_eq!(
2791            ticket_value, stats.redeemed_value,
2792            "there should be redeemed value worth 1 ticket"
2793        );
2794        assert_eq!(
2795            ticket_value, stats.neglected_value,
2796            "there should neglected value worth 1 ticket"
2797        );
2798        Ok(())
2799    }
2800
2801    #[tokio::test]
2802    async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2803        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2804        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2805            .await?;
2806        let rpc_operations = MockIndexerRpcOperations::new();
2807        // ==> set mock expectations here
2808        let clonable_rpc_operations = ClonableMockOperations {
2809            //
2810            inner: Arc::new(rpc_operations),
2811        };
2812        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2813
2814        let channel = ChannelEntry::new(
2815            *SELF_CHAIN_ADDRESS,
2816            *COUNTERPARTY_CHAIN_ADDRESS,
2817            primitive_types::U256::from((1u128 << 96) - 1).into(),
2818            primitive_types::U256::zero(),
2819            ChannelStatus::Open,
2820            primitive_types::U256::one(),
2821        );
2822
2823        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2824        let next_ticket_index = ticket_index + 1;
2825
2826        db.upsert_channel(None, channel).await?;
2827
2828        let ticket_redeemed_log = SerializableLog {
2829            address: handlers.addresses.channels,
2830            topics: vec![
2831                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2832                // TicketRedeemedFilter::signature().into(),
2833                H256::from_slice(channel.get_id().as_ref()).into(),
2834            ],
2835            data: Vec::from(next_ticket_index.to_be_bytes()),
2836            ..test_log()
2837        };
2838
2839        let event_type = db
2840            .begin_transaction()
2841            .await?
2842            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2843            .await?;
2844
2845        let channel = db
2846            .get_channel_by_id(None, &channel.get_id())
2847            .await?
2848            .context("a value should be present")?;
2849
2850        assert!(
2851            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2852            "must return update channel entry and no ticket"
2853        );
2854
2855        assert_eq!(
2856            channel.ticket_index, next_ticket_index,
2857            "channel entry must contain next ticket index"
2858        );
2859
2860        let outgoing_ticket_index = db
2861            .get_outgoing_ticket_index(channel.get_id())
2862            .await?
2863            .load(Ordering::Relaxed);
2864
2865        assert!(
2866            outgoing_ticket_index >= ticket_index.as_u64(),
2867            "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
2868        );
2869        assert_eq!(
2870            outgoing_ticket_index,
2871            next_ticket_index.as_u64(),
2872            "outgoing ticket index must be equal to next ticket index"
2873        );
2874        Ok(())
2875    }
2876
2877    #[tokio::test]
2878    async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
2879    {
2880        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2881        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2882            .await?;
2883        let rpc_operations = MockIndexerRpcOperations::new();
2884        // ==> set mock expectations here
2885        let clonable_rpc_operations = ClonableMockOperations {
2886            //
2887            inner: Arc::new(rpc_operations),
2888        };
2889        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2890
2891        let channel = ChannelEntry::new(
2892            *COUNTERPARTY_CHAIN_ADDRESS,
2893            *SELF_CHAIN_ADDRESS,
2894            primitive_types::U256::from((1u128 << 96) - 1).into(),
2895            primitive_types::U256::zero(),
2896            ChannelStatus::Open,
2897            primitive_types::U256::one(),
2898        );
2899
2900        db.upsert_channel(None, channel).await?;
2901
2902        let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
2903
2904        let ticket_redeemed_log = SerializableLog {
2905            address: handlers.addresses.channels,
2906            topics: vec![
2907                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2908                // TicketRedeemedFilter::signature().into(),
2909                H256::from_slice(channel.get_id().as_ref()).into(),
2910            ],
2911            data: Vec::from(next_ticket_index.to_be_bytes()),
2912            ..test_log()
2913        };
2914
2915        let event_type = db
2916            .begin_transaction()
2917            .await?
2918            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2919            .await?;
2920
2921        let channel = db
2922            .get_channel_by_id(None, &channel.get_id())
2923            .await?
2924            .context("a value should be present")?;
2925
2926        assert!(
2927            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2928            "must return updated channel entry and no ticket"
2929        );
2930
2931        assert_eq!(
2932            channel.ticket_index, next_ticket_index,
2933            "channel entry must contain next ticket index"
2934        );
2935        Ok(())
2936    }
2937
2938    #[tokio::test]
2939    async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
2940        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2941        let rpc_operations = MockIndexerRpcOperations::new();
2942        // ==> set mock expectations here
2943        let clonable_rpc_operations = ClonableMockOperations {
2944            //
2945            inner: Arc::new(rpc_operations),
2946        };
2947        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2948
2949        let channel = ChannelEntry::new(
2950            Address::from(hopr_crypto_random::random_bytes()),
2951            Address::from(hopr_crypto_random::random_bytes()),
2952            primitive_types::U256::from((1u128 << 96) - 1).into(),
2953            primitive_types::U256::zero(),
2954            ChannelStatus::Open,
2955            primitive_types::U256::one(),
2956        );
2957
2958        db.upsert_channel(None, channel).await?;
2959
2960        let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
2961
2962        let ticket_redeemed_log = SerializableLog {
2963            address: handlers.addresses.channels,
2964            topics: vec![
2965                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2966                // TicketRedeemedFilter::signature().into(),
2967                H256::from_slice(channel.get_id().as_ref()).into(),
2968            ],
2969            data: Vec::from(next_ticket_index.to_be_bytes()),
2970            ..test_log()
2971        };
2972
2973        let event_type = db
2974            .begin_transaction()
2975            .await?
2976            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2977            .await?;
2978
2979        let channel = db
2980            .get_channel_by_id(None, &channel.get_id())
2981            .await?
2982            .context("a value should be present")?;
2983
2984        assert!(
2985            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
2986            "must return updated channel entry and no ticket"
2987        );
2988
2989        assert_eq!(
2990            channel.ticket_index, next_ticket_index,
2991            "channel entry must contain next ticket index"
2992        );
2993        Ok(())
2994    }
2995
2996    #[tokio::test]
2997    async fn on_channel_closure_initiated() -> anyhow::Result<()> {
2998        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2999        let rpc_operations = MockIndexerRpcOperations::new();
3000        // ==> set mock expectations here
3001        let clonable_rpc_operations = ClonableMockOperations {
3002            //
3003            inner: Arc::new(rpc_operations),
3004        };
3005        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3006
3007        let channel = ChannelEntry::new(
3008            *SELF_CHAIN_ADDRESS,
3009            *COUNTERPARTY_CHAIN_ADDRESS,
3010            primitive_types::U256::from((1u128 << 96) - 1).into(),
3011            primitive_types::U256::zero(),
3012            ChannelStatus::Open,
3013            primitive_types::U256::one(),
3014        );
3015
3016        db.upsert_channel(None, channel).await?;
3017
3018        let timestamp = SystemTime::now();
3019
3020        let encoded_data = (U256::from(timestamp.as_unix_timestamp().as_secs())).abi_encode();
3021
3022        let closure_initiated_log = SerializableLog {
3023            address: handlers.addresses.channels,
3024            topics: vec![
3025                hopr_bindings::hoprchannels::HoprChannels::OutgoingChannelClosureInitiated::SIGNATURE_HASH.into(),
3026                // OutgoingChannelClosureInitiatedFilter::signature().into(),
3027                H256::from_slice(channel.get_id().as_ref()).into(),
3028            ],
3029            data: encoded_data,
3030            // data: Vec::from(U256::from(timestamp.as_unix_timestamp().as_secs()).to_be_bytes()).into(),
3031            ..test_log()
3032        };
3033
3034        let event_type = db
3035            .begin_transaction()
3036            .await?
3037            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log, true).await }))
3038            .await?;
3039
3040        let channel = db
3041            .get_channel_by_id(None, &channel.get_id())
3042            .await?
3043            .context("a value should be present")?;
3044
3045        assert!(
3046            matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
3047            "must return updated channel entry"
3048        );
3049
3050        assert_eq!(
3051            channel.status,
3052            ChannelStatus::PendingToClose(timestamp),
3053            "channel status must match"
3054        );
3055        Ok(())
3056    }
3057
3058    #[tokio::test]
3059    async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
3060        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3061        let rpc_operations = MockIndexerRpcOperations::new();
3062        // ==> set mock expectations here
3063        let clonable_rpc_operations = ClonableMockOperations {
3064            //
3065            inner: Arc::new(rpc_operations),
3066        };
3067        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3068
3069        let encoded_data = ().abi_encode();
3070
3071        let safe_registered_log = SerializableLog {
3072            address: handlers.addresses.safe_registry,
3073            topics: vec![
3074                hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::RegisteredNodeSafe::SIGNATURE_HASH.into(),
3075                // RegisteredNodeSafeFilter::signature().into(),
3076                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3077                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3078            ],
3079            data: encoded_data,
3080            ..test_log()
3081        };
3082
3083        let event_type = db
3084            .begin_transaction()
3085            .await?
3086            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3087            .await?;
3088
3089        assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
3090
3091        // Nothing to check in the DB here, since we do not track this
3092        Ok(())
3093    }
3094
3095    #[tokio::test]
3096    async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
3097        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3098        let rpc_operations = MockIndexerRpcOperations::new();
3099        // ==> set mock expectations here
3100        let clonable_rpc_operations = ClonableMockOperations {
3101            //
3102            inner: Arc::new(rpc_operations),
3103        };
3104        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3105
3106        // Nothing to write to the DB here, since we do not track this
3107
3108        let encoded_data = ().abi_encode();
3109
3110        let safe_registered_log = SerializableLog {
3111            address: handlers.addresses.safe_registry,
3112            topics: vec![
3113                hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::DergisteredNodeSafe::SIGNATURE_HASH.into(),
3114                // DergisteredNodeSafeFilter::signature().into(),
3115                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3116                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3117            ],
3118            data: encoded_data,
3119            ..test_log()
3120        };
3121
3122        let event_type = db
3123            .begin_transaction()
3124            .await?
3125            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3126            .await?;
3127
3128        assert!(
3129            event_type.is_none(),
3130            "there's no associated chain event type with safe deregistration"
3131        );
3132
3133        // Nothing to check in the DB here, since we do not track this
3134        Ok(())
3135    }
3136
3137    #[tokio::test]
3138    async fn ticket_price_update() -> anyhow::Result<()> {
3139        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3140        let rpc_operations = MockIndexerRpcOperations::new();
3141        // ==> set mock expectations here
3142        let clonable_rpc_operations = ClonableMockOperations {
3143            //
3144            inner: Arc::new(rpc_operations),
3145        };
3146        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3147
3148        let encoded_data = (U256::from(1u64), U256::from(123u64)).abi_encode();
3149
3150        let price_change_log = SerializableLog {
3151            address: handlers.addresses.price_oracle,
3152            topics: vec![
3153                hopr_bindings::hoprticketpriceoracle::HoprTicketPriceOracle::TicketPriceUpdated::SIGNATURE_HASH.into(),
3154                // TicketPriceUpdatedFilter::signature().into()
3155            ],
3156            data: encoded_data,
3157            // data: encode(&[Token::Uint(EthU256::from(1u64)), Token::Uint(EthU256::from(123u64))]).into(),
3158            ..test_log()
3159        };
3160
3161        assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
3162
3163        let event_type = db
3164            .begin_transaction()
3165            .await?
3166            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log, true).await }))
3167            .await?;
3168
3169        assert!(
3170            event_type.is_none(),
3171            "there's no associated chain event type with price oracle"
3172        );
3173
3174        assert_eq!(
3175            db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
3176            Some(primitive_types::U256::from(123u64))
3177        );
3178        Ok(())
3179    }
3180
3181    #[tokio::test]
3182    async fn minimum_win_prob_update() -> anyhow::Result<()> {
3183        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3184        let rpc_operations = MockIndexerRpcOperations::new();
3185        // ==> set mock expectations here
3186        let clonable_rpc_operations = ClonableMockOperations {
3187            //
3188            inner: Arc::new(rpc_operations),
3189        };
3190        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3191
3192        let encoded_data = (
3193            U256::from_be_slice(WinningProbability::ALWAYS.as_ref()),
3194            U256::from_be_slice(WinningProbability::try_from_f64(0.5)?.as_ref()),
3195        )
3196            .abi_encode();
3197
3198        let win_prob_change_log = SerializableLog {
3199            address: handlers.addresses.win_prob_oracle,
3200            topics: vec![
3201                hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into()],
3202            data: encoded_data,
3203            ..test_log()
3204        };
3205
3206        assert_eq!(
3207            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3208            1.0
3209        );
3210
3211        let event_type = db
3212            .begin_transaction()
3213            .await?
3214            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3215            .await?;
3216
3217        assert!(
3218            event_type.is_none(),
3219            "there's no associated chain event type with winning probability change"
3220        );
3221
3222        assert_eq!(
3223            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3224            0.5
3225        );
3226        Ok(())
3227    }
3228
3229    #[tokio::test]
3230    async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
3231        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3232        db.set_minimum_incoming_ticket_win_prob(None, 0.1.try_into()?).await?;
3233
3234        let new_minimum = 0.5;
3235        let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
3236
3237        let channel_1 = ChannelEntry::new(
3238            *COUNTERPARTY_CHAIN_ADDRESS,
3239            *SELF_CHAIN_ADDRESS,
3240            primitive_types::U256::from((1u128 << 96) - 1).into(),
3241            3_u32.into(),
3242            ChannelStatus::Open,
3243            primitive_types::U256::one(),
3244        );
3245
3246        db.upsert_channel(None, channel_1).await?;
3247
3248        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
3249        db.upsert_ticket(None, ticket).await?;
3250
3251        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
3252        db.upsert_ticket(None, ticket).await?;
3253
3254        let tickets = db.get_tickets((&channel_1).into()).await?;
3255        assert_eq!(tickets.len(), 2);
3256
3257        // ---
3258
3259        let other_counterparty = ChainKeypair::random();
3260        let channel_2 = ChannelEntry::new(
3261            other_counterparty.public().to_address(),
3262            *SELF_CHAIN_ADDRESS,
3263            primitive_types::U256::from((1u128 << 96) - 1).into(),
3264            3_u32.into(),
3265            ChannelStatus::Open,
3266            primitive_types::U256::one(),
3267        );
3268
3269        db.upsert_channel(None, channel_2).await?;
3270
3271        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
3272        db.upsert_ticket(None, ticket).await?;
3273
3274        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
3275        db.upsert_ticket(None, ticket).await?;
3276
3277        let tickets = db.get_tickets((&channel_2).into()).await?;
3278        assert_eq!(tickets.len(), 2);
3279
3280        let stats = db.get_ticket_statistics(None).await?;
3281        assert_eq!(HoprBalance::zero(), stats.rejected_value);
3282
3283        let rpc_operations = MockIndexerRpcOperations::new();
3284        // ==> set mock expectations here
3285        let clonable_rpc_operations = ClonableMockOperations {
3286            //
3287            inner: Arc::new(rpc_operations),
3288        };
3289        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3290
3291        let encoded_data = (
3292            U256::from_be_slice(WinningProbability::try_from(0.1)?.as_ref()),
3293            U256::from_be_slice(WinningProbability::try_from(new_minimum)?.as_ref()),
3294        )
3295            .abi_encode();
3296
3297        let win_prob_change_log = SerializableLog {
3298            address: handlers.addresses.win_prob_oracle,
3299            topics: vec![
3300                hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into(),
3301            ],
3302            data: encoded_data,
3303            ..test_log()
3304        };
3305
3306        let event_type = db
3307            .begin_transaction()
3308            .await?
3309            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3310            .await?;
3311
3312        assert!(
3313            event_type.is_none(),
3314            "there's no associated chain event type with winning probability change"
3315        );
3316
3317        assert_eq!(
3318            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3319            new_minimum
3320        );
3321
3322        let tickets = db.get_tickets((&channel_1).into()).await?;
3323        assert_eq!(tickets.len(), 1);
3324
3325        let tickets = db.get_tickets((&channel_2).into()).await?;
3326        assert_eq!(tickets.len(), 0);
3327
3328        let stats = db.get_ticket_statistics(None).await?;
3329        let rejected_value: primitive_types::U256 = ticket_win_probs
3330            .iter()
3331            .filter(|p| **p < new_minimum)
3332            .map(|p| {
3333                primitive_types::U256::from(PRICE_PER_PACKET)
3334                    .div_f64(*p)
3335                    .expect("must divide")
3336            })
3337            .reduce(|a, b| a + b)
3338            .ok_or(anyhow!("must sum"))?;
3339
3340        assert_eq!(HoprBalance::from(rejected_value), stats.rejected_value);
3341
3342        Ok(())
3343    }
3344}