hopr_chain_indexer/
handlers.rs

1use std::{
2    cmp::Ordering,
3    fmt::{Debug, Formatter},
4    ops::Add,
5    sync::Arc,
6    time::{Duration, SystemTime},
7};
8
9use alloy::{primitives::B256, sol_types::SolEventInterface};
10use async_trait::async_trait;
11use hopr_bindings::{
12    hoprannouncements::HoprAnnouncements::HoprAnnouncementsEvents, hoprchannels::HoprChannels::HoprChannelsEvents,
13    hoprnetworkregistry::HoprNetworkRegistry::HoprNetworkRegistryEvents,
14    hoprnodemanagementmodule::HoprNodeManagementModule::HoprNodeManagementModuleEvents,
15    hoprnodesaferegistry::HoprNodeSafeRegistry::HoprNodeSafeRegistryEvents,
16    hoprticketpriceoracle::HoprTicketPriceOracle::HoprTicketPriceOracleEvents, hoprtoken::HoprToken::HoprTokenEvents,
17    hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::HoprWinningProbabilityOracleEvents,
18};
19use hopr_chain_rpc::{HoprIndexerRpcOperations, Log};
20use hopr_chain_types::{
21    ContractAddresses,
22    chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent},
23};
24use hopr_crypto_types::prelude::*;
25use hopr_db_sql::{
26    HoprDbAllOperations, OpenTransaction,
27    api::{info::DomainSeparator, tickets::TicketSelector},
28    errors::DbSqlError,
29    prelude::TicketMarker,
30};
31use hopr_internal_types::prelude::*;
32use hopr_primitive_types::prelude::*;
33use tracing::{debug, error, info, trace, warn};
34
35use crate::errors::{CoreEthereumIndexerError, Result};
36
37#[cfg(all(feature = "prometheus", not(test)))]
38lazy_static::lazy_static! {
39    static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter =
40        hopr_metrics::MultiCounter::new(
41            "hopr_indexer_contract_log_count",
42            "Counts of different HOPR contract logs processed by the Indexer",
43            &["contract"]
44    ).unwrap();
45}
46
47/// Event handling an object for on-chain operations
48///
49/// Once an on-chain operation is recorded by the [crate::block::Indexer], it is pre-processed
50/// and passed on to this object that handles event-specific actions for each on-chain operation.
51#[derive(Clone)]
52pub struct ContractEventHandlers<T, Db> {
53    /// channels, announcements, network_registry, token: contract addresses
54    /// whose event we process
55    addresses: Arc<ContractAddresses>,
56    /// Safe on-chain address which we are monitoring
57    safe_address: Address,
58    /// own address, aka message sender
59    chain_key: ChainKeypair, // TODO: store only address here once Ticket have TryFrom DB
60    /// callbacks to inform other modules
61    db: Db,
62    /// rpc operations to interact with the chain
63    rpc_operations: T,
64}
65
66impl<T, Db> Debug for ContractEventHandlers<T, Db> {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("ContractEventHandler")
69            .field("addresses", &self.addresses)
70            .field("safe_address", &self.safe_address)
71            .field("chain_key", &self.chain_key)
72            .finish_non_exhaustive()
73    }
74}
75
76impl<T, Db> ContractEventHandlers<T, Db>
77where
78    T: HoprIndexerRpcOperations + Clone + Send + 'static,
79    Db: HoprDbAllOperations + Clone,
80{
81    /// Creates a new instance of contract event handlers with RPC operations support.
82    ///
83    /// This constructor initializes the event handlers with all necessary dependencies
84    /// for processing blockchain events and making direct RPC calls for fresh state data.
85    ///
86    /// # Type Parameters
87    /// * `T` - Type implementing `HoprIndexerRpcOperations` for blockchain queries
88    ///
89    /// # Arguments
90    /// * `addresses` - Contract addresses configuration
91    /// * `safe_address` - The node's safe address for filtering relevant events
92    /// * `chain_key` - Cryptographic key for chain operations
93    /// * `db` - Database connection for persistent storage
94    /// * `rpc_operations` - RPC interface for direct blockchain queries
95    ///
96    /// # Returns
97    /// * `Self` - New instance of `ContractEventHandlers`
98    pub fn new(
99        addresses: ContractAddresses,
100        safe_address: Address,
101        chain_key: ChainKeypair,
102        db: Db,
103        rpc_operations: T,
104    ) -> Self {
105        Self {
106            addresses: Arc::new(addresses),
107            safe_address,
108            chain_key,
109            db,
110            rpc_operations,
111        }
112    }
113
114    async fn on_announcement_event(
115        &self,
116        tx: &OpenTransaction,
117        event: HoprAnnouncementsEvents,
118        block_number: u32,
119        _is_synced: bool,
120    ) -> Result<Option<ChainEventType>> {
121        #[cfg(all(feature = "prometheus", not(test)))]
122        METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
123
124        match event {
125            HoprAnnouncementsEvents::AddressAnnouncement(address_announcement) => {
126                debug!(
127                    multiaddress = &address_announcement.baseMultiaddr,
128                    address = &address_announcement.node.to_string(),
129                    "on_announcement_event: AddressAnnouncement",
130                );
131                // safeguard against empty multiaddrs, skip
132                if address_announcement.baseMultiaddr.is_empty() {
133                    warn!(
134                        address = %address_announcement.node,
135                        "encountered empty multiaddress announcement",
136                    );
137                    return Ok(None);
138                }
139                let node_address: Address = address_announcement.node.into();
140
141                return match self
142                    .db
143                    .insert_announcement(
144                        Some(tx),
145                        node_address,
146                        address_announcement.baseMultiaddr.parse()?,
147                        block_number,
148                    )
149                    .await
150                {
151                    Ok(account) => Ok(Some(ChainEventType::Announcement {
152                        peer: account.public_key.into(),
153                        address: account.chain_addr,
154                        multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
155                    })),
156                    Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
157                    Err(e) => Err(e.into()),
158                };
159            }
160            HoprAnnouncementsEvents::KeyBinding(key_binding) => {
161                debug!(
162                    address = %key_binding.chain_key,
163                    public_key = %key_binding.ed25519_pub_key,
164                    "on_announcement_event: KeyBinding",
165                );
166                match KeyBinding::from_parts(
167                    key_binding.chain_key.into(),
168                    key_binding.ed25519_pub_key.0.try_into()?,
169                    OffchainSignature::try_from((key_binding.ed25519_sig_0.0, key_binding.ed25519_sig_1.0))?,
170                ) {
171                    Ok(binding) => {
172                        match self
173                            .db
174                            .insert_account(
175                                Some(tx),
176                                AccountEntry {
177                                    public_key: binding.packet_key,
178                                    chain_addr: binding.chain_key,
179                                    entry_type: AccountType::NotAnnounced,
180                                    published_at: block_number,
181                                },
182                            )
183                            .await
184                        {
185                            Ok(_) => (),
186                            Err(err) => {
187                                // We handle these errors gracefully and don't want the indexer to crash,
188                                // because anybody could write faulty entries into the announcement contract.
189                                error!(%err, "failed to store announcement key binding")
190                            }
191                        }
192                    }
193                    Err(e) => {
194                        warn!(
195                            address = ?key_binding.chain_key,
196                            error = %e,
197                            "Filtering announcement with invalid signature",
198
199                        )
200                    }
201                }
202            }
203            HoprAnnouncementsEvents::RevokeAnnouncement(revocation) => {
204                let node_address: Address = revocation.node.into();
205                match self.db.delete_all_announcements(Some(tx), node_address).await {
206                    Err(DbSqlError::MissingAccount) => {
207                        return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding);
208                    }
209                    Err(e) => return Err(e.into()),
210                    _ => {}
211                }
212            }
213        };
214
215        Ok(None)
216    }
217
218    async fn on_channel_event(
219        &self,
220        tx: &OpenTransaction,
221        event: HoprChannelsEvents,
222        is_synced: bool,
223    ) -> Result<Option<ChainEventType>> {
224        #[cfg(all(feature = "prometheus", not(test)))]
225        METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);
226
227        match event {
228            HoprChannelsEvents::ChannelBalanceDecreased(balance_decreased) => {
229                let channel_id = balance_decreased.channelId.0.into();
230
231                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
232                    Ok(channel) => channel,
233                    Err(e) => {
234                        error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_decreased_event");
235                        return Err(e.into());
236                    }
237                };
238
239                trace!(
240                    %channel_id,
241                    is_channel = maybe_channel.is_some(),
242                    "on_channel_balance_decreased_event",
243                );
244
245                if let Some(channel_edits) = maybe_channel {
246                    let new_balance = HoprBalance::from_be_bytes(balance_decreased.newBalance.to_be_bytes::<12>());
247                    let diff = channel_edits.entry().balance - new_balance;
248
249                    let updated_channel = self
250                        .db
251                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
252                        .await?
253                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
254                            "channel balance decreased event for channel {channel_id} did not return an updated \
255                             channel"
256                        )))?;
257
258                    if is_synced
259                        && (updated_channel.source == self.chain_key.public().to_address()
260                            || updated_channel.destination == self.chain_key.public().to_address())
261                    {
262                        info!("updating safe balance from chain after channel balance decreased event");
263                        match self.rpc_operations.get_hopr_balance(self.safe_address).await {
264                            Ok(balance) => {
265                                self.db.set_safe_hopr_balance(Some(tx), balance).await?;
266                            }
267                            Err(error) => {
268                                error!(%error, "error getting safe balance from chain after channel balance decreased event");
269                            }
270                        }
271                    }
272
273                    Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
274                } else {
275                    error!(%channel_id, "observed balance decreased event for a channel that does not exist");
276                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
277                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
278                }
279            }
280            HoprChannelsEvents::ChannelBalanceIncreased(balance_increased) => {
281                let channel_id = balance_increased.channelId.0.into();
282
283                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
284                    Ok(channel) => channel,
285                    Err(e) => {
286                        error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_increased_event");
287                        return Err(e.into());
288                    }
289                };
290
291                trace!(
292                    %channel_id,
293                    is_channel = maybe_channel.is_some(),
294                    "on_channel_balance_increased_event",
295                );
296
297                if let Some(channel_edits) = maybe_channel {
298                    let new_balance = HoprBalance::from_be_bytes(balance_increased.newBalance.to_be_bytes::<12>());
299                    let diff = new_balance - channel_edits.entry().balance;
300
301                    let updated_channel = self
302                        .db
303                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
304                        .await?
305                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
306                            "channel balance increased event for channel {channel_id} did not return an updated \
307                             channel"
308                        )))?;
309
310                    if updated_channel.source == self.chain_key.public().to_address() && is_synced {
311                        info!("updating safe balance from chain after channel balance increased event");
312                        match self.rpc_operations.get_hopr_balance(self.safe_address).await {
313                            Ok(balance) => {
314                                self.db.set_safe_hopr_balance(Some(tx), balance).await?;
315                            }
316                            Err(error) => {
317                                error!(%error, "error getting safe balance from chain after channel balance increased event");
318                            }
319                        }
320                        info!("updating safe allowance from chain after channel balance increased event");
321                        match self
322                            .rpc_operations
323                            .get_hopr_allowance(self.safe_address, self.addresses.channels)
324                            .await
325                        {
326                            Ok(allowance) => {
327                                self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
328                            }
329                            Err(error) => {
330                                error!(%error, "error getting safe allowance from chain after channel balance increased event");
331                            }
332                        }
333                    }
334
335                    Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
336                } else {
337                    error!(%channel_id, "observed balance increased event for a channel that does not exist");
338                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
339                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
340                }
341            }
342            HoprChannelsEvents::ChannelClosed(channel_closed) => {
343                let channel_id = channel_closed.channelId.0.into();
344
345                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
346                    Ok(channel) => channel,
347                    Err(e) => {
348                        error!(%channel_id, %e, "failed to begin channel update on on_channel_closed_event");
349                        return Err(e.into());
350                    }
351                };
352
353                trace!(
354                    %channel_id,
355                    is_channel = maybe_channel.is_some(),
356                    "on_channel_closed_event",
357                );
358
359                if let Some(channel_edits) = maybe_channel {
360                    let channel_id = channel_edits.entry().get_id();
361                    let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());
362
363                    // If the channel is our own (incoming or outgoing) reset its fields
364                    // and change its state to Closed.
365                    let updated_channel = if let Some((direction, _)) = orientation {
366                        // Set all channel fields like we do on-chain on close
367                        let channel_edits = channel_edits
368                            .change_status(ChannelStatus::Closed)
369                            .change_balance(HoprBalance::zero())
370                            .change_ticket_index(0);
371
372                        let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?.ok_or(
373                            CoreEthereumIndexerError::ProcessError(format!(
374                                "channel closed event for channel {channel_id} did not return an updated channel",
375                            )),
376                        )?;
377
378                        // Perform additional tasks based on the channel's direction
379                        match direction {
380                            ChannelDirection::Incoming => {
381                                // On incoming channel, mark all unredeemed tickets as neglected
382                                self.db
383                                    .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
384                                    .await?;
385                            }
386                            ChannelDirection::Outgoing => {
387                                // On outgoing channels, reset the current_ticket_index to zero
388                                self.db.reset_outgoing_ticket_index(channel_id).await?;
389                            }
390                        }
391                        updated_channel
392                    } else {
393                        // Closed channels that are not our own can be safely removed from the database
394                        let updated_channel = self
395                            .db
396                            .finish_channel_update(tx.into(), channel_edits.delete())
397                            .await?
398                            .ok_or(CoreEthereumIndexerError::ProcessError(format!(
399                                "channel closed event for channel {channel_id} did not return an updated channel",
400                            )))?;
401
402                        debug!(%channel_id, "foreign closed closed channel was deleted");
403                        updated_channel
404                    };
405
406                    Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
407                } else {
408                    error!(%channel_id, "observed closure finalization event for a channel that does not exist.");
409                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
410                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
411                }
412            }
413            HoprChannelsEvents::ChannelOpened(channel_opened) => {
414                let source: Address = channel_opened.source.into();
415                let destination: Address = channel_opened.destination.into();
416                let channel_id = generate_channel_id(&source, &destination);
417
418                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
419                    Ok(channel) => channel,
420                    Err(e) => {
421                        error!(%source, %destination, %channel_id, %e, "failed to begin channel update on on_channel_opened_event");
422                        return Err(e.into());
423                    }
424                };
425
426                let channel = if let Some(channel_edits) = maybe_channel {
427                    // Check that we're not receiving the Open event without the channel being Close prior, or that the
428                    // channel is not yet corrupted
429
430                    if channel_edits.entry().status != ChannelStatus::Closed {
431                        warn!(%source, %destination, %channel_id, "received Open event for a channel that is not Closed, marking it as corrupted");
432
433                        self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
434                        self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
435
436                        return Ok(None);
437                    }
438
439                    trace!(%source, %destination, %channel_id, "on_channel_reopened_event");
440
441                    let current_epoch = channel_edits.entry().channel_epoch;
442
443                    // cleanup tickets from previous epochs on channel re-opening
444                    if source == self.chain_key.public().to_address()
445                        || destination == self.chain_key.public().to_address()
446                    {
447                        self.db
448                            .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
449                            .await?;
450
451                        self.db.reset_outgoing_ticket_index(channel_id).await?;
452                    }
453
454                    // set all channel fields like we do on-chain on close
455                    self.db
456                        .finish_channel_update(
457                            tx.into(),
458                            channel_edits
459                                .change_ticket_index(0_u32)
460                                .change_epoch(current_epoch.add(1))
461                                .change_status(ChannelStatus::Open),
462                        )
463                        .await?
464                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
465                            "channel opened event for channel {channel_id} did not return an updated channel",
466                        )))?
467                } else {
468                    trace!(%source, %destination, %channel_id, "on_channel_opened_event");
469
470                    let new_channel = ChannelEntry::new(
471                        source,
472                        destination,
473                        0_u32.into(),
474                        0_u32.into(),
475                        ChannelStatus::Open,
476                        1_u32.into(),
477                    );
478
479                    self.db.upsert_channel(tx.into(), new_channel).await?;
480                    new_channel
481                };
482
483                Ok(Some(ChainEventType::ChannelOpened(channel)))
484            }
485            HoprChannelsEvents::TicketRedeemed(ticket_redeemed) => {
486                let channel_id = ticket_redeemed.channelId.0.into();
487
488                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
489                    Ok(channel) => channel,
490                    Err(e) => {
491                        error!(%channel_id, %e, "failed to begin channel update on on_ticket_redeemed_event");
492                        return Err(e.into());
493                    }
494                };
495
496                if let Some(channel_edits) = maybe_channel {
497                    let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
498                        // For channels where destination is us, it means that our ticket
499                        // has been redeemed, so mark it in the DB as redeemed
500                        Some(ChannelDirection::Incoming) => {
501                            // Filter all BeingRedeemed tickets in this channel and its current epoch
502                            let mut matching_tickets = self
503                                .db
504                                .get_tickets(
505                                    TicketSelector::from(channel_edits.entry())
506                                        .with_state(AcknowledgedTicketStatus::BeingRedeemed),
507                                )
508                                .await?
509                                .into_iter()
510                                .filter(|ticket| {
511                                    // The ticket that has been redeemed at this point has: index + index_offset - 1 ==
512                                    // new_ticket_index - 1 Since unaggregated
513                                    // tickets have index_offset = 1, for the unagg case this leads to: index ==
514                                    // new_ticket_index - 1
515                                    let ticket_idx = ticket.verified_ticket().index;
516                                    let ticket_off = ticket.verified_ticket().index_offset as u64;
517
518                                    ticket_idx + ticket_off == ticket_redeemed.newTicketIndex.to::<u64>()
519                                })
520                                .collect::<Vec<_>>();
521
522                            match matching_tickets.len().cmp(&1) {
523                                Ordering::Equal => {
524                                    let ack_ticket = matching_tickets.pop().unwrap();
525
526                                    self.db
527                                        .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
528                                        .await?;
529                                    info!(%ack_ticket, "ticket marked as redeemed");
530                                    Some(ack_ticket)
531                                }
532                                Ordering::Less => {
533                                    error!(
534                                        idx = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
535                                        entry = %channel_edits.entry(),
536                                        "could not find acknowledged 'BeingRedeemed' ticket",
537                                    );
538                                    // This is not an error, because the ticket might've become neglected before
539                                    // the ticket redemption could finish
540                                    None
541                                }
542                                Ordering::Greater => {
543                                    error!(
544                                        count = matching_tickets.len(),
545                                        index = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
546                                        entry = %channel_edits.entry(),
547                                        "found tickets matching 'BeingRedeemed'",
548                                    );
549
550                                    let entry_str = channel_edits.entry().to_string();
551
552                                    self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
553                                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
554
555                                    return Err(CoreEthereumIndexerError::ProcessError(format!(
556                                        "multiple tickets matching idx {} found in {}",
557                                        ticket_redeemed.newTicketIndex.to::<u64>() - 1,
558                                        entry_str
559                                    )));
560                                }
561                            }
562                        }
563                        // For the channel where the source is us, it means a ticket that we
564                        // issue has been redeemed.
565                        // So we just need to be sure our outgoing ticket
566                        // index value in the cache is at least the index of the redeemed ticket
567                        Some(ChannelDirection::Outgoing) => {
568                            // We need to ensure the outgoing ticket index is at least this new value
569                            debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
570                            self.db
571                                .compare_and_set_outgoing_ticket_index(
572                                    channel_edits.entry().get_id(),
573                                    ticket_redeemed.newTicketIndex.to::<u64>(),
574                                )
575                                .await?;
576                            None
577                        }
578                        // For a channel where neither source nor destination is us, we don't care
579                        None => {
580                            // Not our redeem event
581                            debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
582                            None
583                        }
584                    };
585
586                    // Update the ticket index on the Channel entry and get the updated model
587                    let channel = self
588                        .db
589                        .finish_channel_update(
590                            tx.into(),
591                            channel_edits.change_ticket_index(U256::from_be_bytes(
592                                ticket_redeemed.newTicketIndex.to_be_bytes::<6>(),
593                            )),
594                        )
595                        .await?
596                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
597                            "ticket redeemed event for channel {channel_id} did not return an updated channel",
598                        )))?;
599
600                    // Neglect all the tickets in this channel
601                    // which have a lower ticket index than `ticket_redeemed.new_ticket_index`
602                    self.db
603                        .mark_tickets_as(
604                            TicketSelector::from(&channel)
605                                .with_index_range(..ticket_redeemed.newTicketIndex.to::<u64>()),
606                            TicketMarker::Neglected,
607                        )
608                        .await?;
609
610                    Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
611                } else {
612                    error!(%channel_id, "observed ticket redeem on a channel that we don't have in the DB");
613                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
614                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
615                }
616            }
617            HoprChannelsEvents::OutgoingChannelClosureInitiated(closure_initiated) => {
618                let channel_id = closure_initiated.channelId.0.into();
619
620                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
621                    Ok(channel) => channel,
622                    Err(e) => {
623                        error!(%channel_id, %e, "failed to begin channel update on on_outgoing_channel_closure_initiated_event");
624                        return Err(e.into());
625                    }
626                };
627
628                let closure_time: u32 = closure_initiated.closureTime;
629                if let Some(channel_edits) = maybe_channel {
630                    let new_status = ChannelStatus::PendingToClose(
631                        SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_time.into())),
632                    );
633
634                    let channel = self
635                        .db
636                        .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
637                        .await?
638                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
639                            "channel closure initiation event for channel {channel_id} did not return an updated \
640                             channel",
641                        )))?;
642
643                    Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
644                } else {
645                    error!(%channel_id, "observed channel closure initiation on a channel that we don't have in the DB");
646                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
647                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
648                }
649            }
650            HoprChannelsEvents::DomainSeparatorUpdated(domain_separator_updated) => {
651                self.db
652                    .set_domain_separator(
653                        Some(tx),
654                        DomainSeparator::Channel,
655                        domain_separator_updated.domainSeparator.0.into(),
656                    )
657                    .await?;
658
659                Ok(None)
660            }
661            HoprChannelsEvents::LedgerDomainSeparatorUpdated(ledger_domain_separator_updated) => {
662                self.db
663                    .set_domain_separator(
664                        Some(tx),
665                        DomainSeparator::Ledger,
666                        ledger_domain_separator_updated.ledgerDomainSeparator.0.into(),
667                    )
668                    .await?;
669
670                Ok(None)
671            }
672        }
673    }
674
675    async fn on_token_event(
676        &self,
677        tx: &OpenTransaction,
678        event: HoprTokenEvents,
679        is_synced: bool,
680    ) -> Result<Option<ChainEventType>> {
681        #[cfg(all(feature = "prometheus", not(test)))]
682        METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
683
684        match event {
685            HoprTokenEvents::Transfer(transferred) => {
686                let from: Address = transferred.from.into();
687                let to: Address = transferred.to.into();
688
689                trace!(
690                    safe_address = %&self.safe_address, %from, %to,
691                    "on_token_transfer_event"
692                );
693
694                if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
695                    error!(
696                    safe_address = %&self.safe_address, %from, %to,
697                        "filter misconfiguration: transfer event not involving the safe");
698                    return Ok(None);
699                }
700
701                if is_synced {
702                    info!("updating safe balance from chain after transfer event");
703                    match self.rpc_operations.get_hopr_balance(self.safe_address).await {
704                        Ok(balance) => {
705                            self.db.set_safe_hopr_balance(Some(tx), balance).await?;
706                        }
707                        Err(error) => {
708                            error!(%error, "error getting safe balance from chain after transfer event");
709                        }
710                    }
711                    info!("updating safe allowance from chain after transfer event");
712                    match self
713                        .rpc_operations
714                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
715                        .await
716                    {
717                        Ok(allowance) => {
718                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
719                        }
720                        Err(error) => {
721                            error!(%error, "error getting safe allowance from chain after transfer event");
722                        }
723                    }
724                }
725            }
726            HoprTokenEvents::Approval(approved) => {
727                let owner: Address = approved.owner.into();
728                let spender: Address = approved.spender.into();
729
730                trace!(
731                    address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
732                    "on_token_approval_event",
733
734                );
735
736                if owner.ne(&self.safe_address) || spender.ne(&self.addresses.channels) {
737                    error!(
738                        address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
739                        "filter misconfiguration: approval event not involving the safe and channels contract");
740                    return Ok(None);
741                }
742
743                // if approval is for tokens on Safe contract to be spent by HoprChannels
744                if is_synced {
745                    info!("updating safe allowance from chain after approval event");
746                    match self
747                        .rpc_operations
748                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
749                        .await
750                    {
751                        Ok(allowance) => {
752                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
753                        }
754                        Err(error) => {
755                            error!(%error, "error getting safe allowance from chain after approval event");
756                        }
757                    }
758                }
759            }
760            _ => error!("Implement all the other filters for HoprTokenEvents"),
761        }
762
763        Ok(None)
764    }
765
766    async fn on_network_registry_event(
767        &self,
768        tx: &OpenTransaction,
769        event: HoprNetworkRegistryEvents,
770        _is_synced: bool,
771    ) -> Result<Option<ChainEventType>> {
772        #[cfg(all(feature = "prometheus", not(test)))]
773        METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
774
775        match event {
776            HoprNetworkRegistryEvents::DeregisteredByManager(deregistered) => {
777                debug!(
778                    node_address = %deregistered.nodeAddress,
779                    "on_network_registry_deregistered_by_manager_event",
780                );
781                let node_address: Address = deregistered.nodeAddress.into();
782                self.db
783                    .set_access_in_network_registry(Some(tx), node_address, false)
784                    .await?;
785
786                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
787                    node_address,
788                    NetworkRegistryStatus::Denied,
789                )));
790            }
791            HoprNetworkRegistryEvents::Deregistered(deregistered) => {
792                debug!(
793                    node_address = %deregistered.nodeAddress,
794                    "on_network_registry_deregistered_event",
795                );
796                let node_address: Address = deregistered.nodeAddress.into();
797                self.db
798                    .set_access_in_network_registry(Some(tx), node_address, false)
799                    .await?;
800
801                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
802                    node_address,
803                    NetworkRegistryStatus::Denied,
804                )));
805            }
806            HoprNetworkRegistryEvents::RegisteredByManager(registered) => {
807                let node_address: Address = registered.nodeAddress.into();
808                debug!(
809                    %node_address,
810                    "Node registered by manager, adding to the network registry",
811                );
812                self.db
813                    .set_access_in_network_registry(Some(tx), node_address, true)
814                    .await?;
815
816                if node_address == self.chain_key.public().to_address() {
817                    info!(
818                        "This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/."
819                    );
820                }
821
822                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
823                    node_address,
824                    NetworkRegistryStatus::Allowed,
825                )));
826            }
827            HoprNetworkRegistryEvents::Registered(registered) => {
828                let node_address: Address = registered.nodeAddress.into();
829                debug!(
830                    %node_address,
831                    "Node registered, adding to the network registry",
832                );
833                self.db
834                    .set_access_in_network_registry(Some(tx), node_address, true)
835                    .await?;
836
837                if node_address == self.chain_key.public().to_address() {
838                    info!(
839                        "This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/."
840                    );
841                }
842
843                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
844                    node_address,
845                    NetworkRegistryStatus::Allowed,
846                )));
847            }
848            HoprNetworkRegistryEvents::EligibilityUpdated(eligibility_updated) => {
849                debug!(
850                    staking_account = %eligibility_updated.stakingAccount,
851                    eligibility = %eligibility_updated.eligibility,
852                    "Node eligibility updated, updating the safe eligibility",
853                );
854                let account: Address = eligibility_updated.stakingAccount.into();
855                self.db
856                    .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
857                    .await?;
858            }
859            HoprNetworkRegistryEvents::NetworkRegistryStatusUpdated(enabled) => {
860                debug!(enabled = enabled.isEnabled, "on_network_registry_status_updated_event",);
861                self.db
862                    .set_network_registry_enabled(Some(tx), enabled.isEnabled)
863                    .await?;
864            }
865            _ => {
866                debug!("on_network_registry_event - not implemented for event: {:?}", event);
867            } // Not important to at the moment
868        };
869
870        Ok(None)
871    }
872
873    async fn on_node_safe_registry_event(
874        &self,
875        tx: &OpenTransaction,
876        event: HoprNodeSafeRegistryEvents,
877        _is_synced: bool,
878    ) -> Result<Option<ChainEventType>> {
879        #[cfg(all(feature = "prometheus", not(test)))]
880        METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
881
882        match event {
883            HoprNodeSafeRegistryEvents::RegisteredNodeSafe(registered) => {
884                if self.chain_key.public().to_address() == registered.nodeAddress.into() {
885                    info!(safe_address = %registered.safeAddress, "Node safe registered", );
886                    // NOTE: we don't store this state in the DB
887                    return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safeAddress.into())));
888                }
889            }
890            HoprNodeSafeRegistryEvents::DergisteredNodeSafe(deregistered) => {
891                if self.chain_key.public().to_address() == deregistered.nodeAddress.into() {
892                    info!("Node safe unregistered");
893                    // NOTE: we don't store this state in the DB
894                }
895            }
896            HoprNodeSafeRegistryEvents::DomainSeparatorUpdated(domain_separator_updated) => {
897                self.db
898                    .set_domain_separator(
899                        Some(tx),
900                        DomainSeparator::SafeRegistry,
901                        domain_separator_updated.domainSeparator.0.into(),
902                    )
903                    .await?;
904            }
905        }
906
907        Ok(None)
908    }
909
910    async fn on_node_management_module_event(
911        &self,
912        _db: &OpenTransaction,
913        _event: HoprNodeManagementModuleEvents,
914        _is_synced: bool,
915    ) -> Result<Option<ChainEventType>> {
916        #[cfg(all(feature = "prometheus", not(test)))]
917        METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);
918
919        // Don't care at the moment
920        Ok(None)
921    }
922
923    async fn on_ticket_winning_probability_oracle_event(
924        &self,
925        tx: &OpenTransaction,
926        event: HoprWinningProbabilityOracleEvents,
927        _is_synced: bool,
928    ) -> Result<Option<ChainEventType>> {
929        #[cfg(all(feature = "prometheus", not(test)))]
930        METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
931
932        match event {
933            HoprWinningProbabilityOracleEvents::WinProbUpdated(update) => {
934                let old_minimum_win_prob: WinningProbability = update.oldWinProb.to_be_bytes().into();
935                let new_minimum_win_prob: WinningProbability = update.newWinProb.to_be_bytes().into();
936
937                trace!(
938                    %old_minimum_win_prob,
939                    %new_minimum_win_prob,
940                    "on_ticket_minimum_win_prob_updated",
941                );
942
943                self.db
944                    .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
945                    .await?;
946
947                info!(
948                    %old_minimum_win_prob,
949                    %new_minimum_win_prob,
950                    "minimum ticket winning probability updated"
951                );
952
953                // If the old minimum was less strict, we need to mark of all the
954                // tickets below the new higher minimum as rejected
955                if old_minimum_win_prob.approx_cmp(&new_minimum_win_prob).is_lt() {
956                    let mut selector: Option<TicketSelector> = None;
957                    for channel in self.db.get_incoming_channels(tx.into()).await? {
958                        selector = selector
959                            .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
960                            .or_else(|| Some(TicketSelector::from(channel)));
961                    }
962                    // Reject unredeemed tickets on all the channels with win prob lower than the new one
963                    if let Some(selector) = selector {
964                        let num_rejected = self
965                            .db
966                            .mark_tickets_as(
967                                selector.with_winning_probability(..new_minimum_win_prob),
968                                TicketMarker::Rejected,
969                            )
970                            .await?;
971                        info!(
972                            count = num_rejected,
973                            "unredeemed tickets were rejected, because the minimum winning probability has been \
974                             increased"
975                        );
976                    }
977                }
978            }
979            _ => {
980                // Ignore other events
981            }
982        }
983        Ok(None)
984    }
985
986    async fn on_ticket_price_oracle_event(
987        &self,
988        tx: &OpenTransaction,
989        event: HoprTicketPriceOracleEvents,
990        _is_synced: bool,
991    ) -> Result<Option<ChainEventType>> {
992        #[cfg(all(feature = "prometheus", not(test)))]
993        METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
994
995        match event {
996            HoprTicketPriceOracleEvents::TicketPriceUpdated(update) => {
997                trace!(
998                    old = update._0.to_string(),
999                    new = update._1.to_string(),
1000                    "on_ticket_price_updated",
1001                );
1002
1003                self.db
1004                    .update_ticket_price(Some(tx), HoprBalance::from_be_bytes(update._1.to_be_bytes::<32>()))
1005                    .await?;
1006
1007                info!(price = %update._1, "ticket price updated");
1008            }
1009            HoprTicketPriceOracleEvents::OwnershipTransferred(_event) => {
1010                // ignore ownership transfer event
1011            }
1012        }
1013        Ok(None)
1014    }
1015
1016    #[tracing::instrument(level = "debug", skip(self, slog), fields(log=%slog))]
1017    async fn process_log_event(
1018        &self,
1019        tx: &OpenTransaction,
1020        slog: SerializableLog,
1021        is_synced: bool,
1022    ) -> Result<Option<ChainEventType>> {
1023        trace!(log = %slog, "log content");
1024
1025        let log = Log::from(slog.clone());
1026
1027        let primitive_log = alloy::primitives::Log::new(
1028            slog.address.into(),
1029            slog.topics.iter().map(|h| B256::from_slice(h.as_ref())).collect(),
1030            slog.data.clone().into(),
1031        )
1032        .ok_or_else(|| {
1033            CoreEthereumIndexerError::ProcessError(format!("failed to convert log to primitive log: {slog:?}"))
1034        })?;
1035
1036        if log.address.eq(&self.addresses.announcements) {
1037            let bn = log.block_number as u32;
1038            let event = HoprAnnouncementsEvents::decode_log(&primitive_log)?;
1039            self.on_announcement_event(tx, event.data, bn, is_synced).await
1040        } else if log.address.eq(&self.addresses.channels) {
1041            let event = HoprChannelsEvents::decode_log(&primitive_log)?;
1042            match self.on_channel_event(tx, event.data, is_synced).await {
1043                Ok(res) => Ok(res),
1044                Err(CoreEthereumIndexerError::ChannelDoesNotExist) => {
1045                    // This is not an error, just a log that we don't have the channel in the DB
1046                    debug!(
1047                        ?log,
1048                        "channel didn't exist in the db. Created a corrupted channel entry and ignored event"
1049                    );
1050                    Ok(None)
1051                }
1052                Err(e) => Err(e),
1053            }
1054        } else if log.address.eq(&self.addresses.network_registry) {
1055            let event = HoprNetworkRegistryEvents::decode_log(&primitive_log)?;
1056            self.on_network_registry_event(tx, event.data, is_synced).await
1057        } else if log.address.eq(&self.addresses.token) {
1058            let event = HoprTokenEvents::decode_log(&primitive_log)?;
1059            self.on_token_event(tx, event.data, is_synced).await
1060        } else if log.address.eq(&self.addresses.safe_registry) {
1061            let event = HoprNodeSafeRegistryEvents::decode_log(&primitive_log)?;
1062            self.on_node_safe_registry_event(tx, event.data, is_synced).await
1063        } else if log.address.eq(&self.addresses.module_implementation) {
1064            let event = HoprNodeManagementModuleEvents::decode_log(&primitive_log)?;
1065            self.on_node_management_module_event(tx, event.data, is_synced).await
1066        } else if log.address.eq(&self.addresses.price_oracle) {
1067            let event = HoprTicketPriceOracleEvents::decode_log(&primitive_log)?;
1068            self.on_ticket_price_oracle_event(tx, event.data, is_synced).await
1069        } else if log.address.eq(&self.addresses.win_prob_oracle) {
1070            let event = HoprWinningProbabilityOracleEvents::decode_log(&primitive_log)?;
1071            self.on_ticket_winning_probability_oracle_event(tx, event.data, is_synced)
1072                .await
1073        } else {
1074            #[cfg(all(feature = "prometheus", not(test)))]
1075            METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
1076
1077            error!(
1078                address = %log.address, log = ?log,
1079                "on_event error - unknown contract address, received log"
1080            );
1081            return Err(CoreEthereumIndexerError::UnknownContract(log.address));
1082        }
1083    }
1084}
1085
1086#[async_trait]
1087impl<T, Db> crate::traits::ChainLogHandler for ContractEventHandlers<T, Db>
1088where
1089    T: HoprIndexerRpcOperations + Clone + Send + Sync + 'static,
1090    Db: HoprDbAllOperations + Clone + Debug + Send + Sync + 'static,
1091{
1092    fn contract_addresses(&self) -> Vec<Address> {
1093        vec![
1094            self.addresses.announcements,
1095            self.addresses.channels,
1096            self.addresses.module_implementation,
1097            self.addresses.network_registry,
1098            self.addresses.price_oracle,
1099            self.addresses.win_prob_oracle,
1100            self.addresses.safe_registry,
1101            self.addresses.token,
1102        ]
1103    }
1104
1105    fn contract_addresses_map(&self) -> Arc<ContractAddresses> {
1106        self.addresses.clone()
1107    }
1108
1109    fn safe_address(&self) -> Address {
1110        self.safe_address
1111    }
1112
1113    fn contract_address_topics(&self, contract: Address) -> Vec<B256> {
1114        if contract.eq(&self.addresses.announcements) {
1115            crate::constants::topics::announcement()
1116        } else if contract.eq(&self.addresses.channels) {
1117            crate::constants::topics::channel()
1118        } else if contract.eq(&self.addresses.module_implementation) {
1119            crate::constants::topics::module_implementation()
1120        } else if contract.eq(&self.addresses.network_registry) {
1121            crate::constants::topics::network_registry()
1122        } else if contract.eq(&self.addresses.price_oracle) {
1123            crate::constants::topics::ticket_price_oracle()
1124        } else if contract.eq(&self.addresses.win_prob_oracle) {
1125            crate::constants::topics::winning_prob_oracle()
1126        } else if contract.eq(&self.addresses.safe_registry) {
1127            crate::constants::topics::node_safe_registry()
1128        } else {
1129            panic!("use of unsupported contract address: {contract}");
1130        }
1131    }
1132
1133    async fn collect_log_event(&self, slog: SerializableLog, is_synced: bool) -> Result<Option<SignificantChainEvent>> {
1134        let myself = self.clone();
1135        self.db
1136            .begin_transaction()
1137            .await?
1138            .perform(move |tx| {
1139                let log = slog.clone();
1140                let tx_hash = Hash::from(log.tx_hash);
1141                let log_id = log.log_index;
1142                let block_id = log.block_number;
1143
1144                Box::pin(async move {
1145                    match myself.process_log_event(tx, log, is_synced).await {
1146                        // If a significant chain event can be extracted from the log
1147                        Ok(Some(event_type)) => {
1148                            let significant_event = SignificantChainEvent { tx_hash, event_type };
1149                            debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
1150                            Ok(Some(significant_event))
1151                        }
1152                        Ok(None) => {
1153                            debug!(block_id, %tx_hash, log_id, "no significant event in log");
1154                            Ok(None)
1155                        }
1156                        Err(error) => {
1157                            error!(block_id, %tx_hash, log_id, %error, "error processing log in tx");
1158                            Err(error)
1159                        }
1160                    }
1161                })
1162            })
1163            .await
1164    }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169    use std::{
1170        sync::{Arc, atomic::Ordering},
1171        time::SystemTime,
1172    };
1173
1174    use alloy::{
1175        dyn_abi::DynSolValue,
1176        primitives::{Address as AlloyAddress, U256},
1177        sol_types::{SolEvent, SolValue},
1178    };
1179    use anyhow::{Context, anyhow};
1180    use hex_literal::hex;
1181    use hopr_chain_rpc::HoprIndexerRpcOperations;
1182    use hopr_chain_types::{
1183        ContractAddresses,
1184        chain_events::{ChainEventType, NetworkRegistryStatus},
1185    };
1186    use hopr_crypto_types::prelude::*;
1187    use hopr_db_sql::{
1188        HoprDbAllOperations, HoprDbGeneralModelOperations,
1189        accounts::{ChainOrPacketKey, HoprDbAccountOperations},
1190        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
1191        channels::HoprDbChannelOperations,
1192        corrupted_channels::HoprDbCorruptedChannelOperations,
1193        db::HoprDb,
1194        info::HoprDbInfoOperations,
1195        prelude::HoprDbResolverOperations,
1196        registry::HoprDbRegistryOperations,
1197    };
1198    use hopr_internal_types::prelude::*;
1199    use hopr_primitive_types::prelude::*;
1200    use multiaddr::Multiaddr;
1201    use primitive_types::H256;
1202
1203    use super::ContractEventHandlers;
1204
1205    lazy_static::lazy_static! {
1206        static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
1207        static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1208        static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
1209        static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1210        static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
1211        static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
1212        static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible"); // just a dummy
1213        static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible"); // just a dummy
1214        static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible"); // just a dummy
1215        static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible"); // just a dummy
1216        static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
1217        static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible"); // just a dummy
1218        static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible"); // just a dummy
1219        static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
1220        static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible"); // just a dummy
1221    }
1222
1223    mockall::mock! {
1224        pub(crate) IndexerRpcOperations {}
1225
1226        #[async_trait::async_trait]
1227        impl HoprIndexerRpcOperations for IndexerRpcOperations {
1228            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
1229
1230        async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
1231
1232        async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
1233
1234        async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
1235
1236        fn try_stream_logs<'a>(
1237            &'a self,
1238            start_block_number: u64,
1239            filters: hopr_chain_rpc::FilterSet,
1240            is_synced: bool,
1241        ) -> hopr_chain_rpc::errors::Result<std::pin::Pin<Box<dyn futures::Stream<Item=hopr_chain_rpc::BlockWithLogs> + Send + 'a> > >;
1242        }
1243    }
1244
1245    #[derive(Clone)]
1246    pub struct ClonableMockOperations {
1247        pub inner: Arc<MockIndexerRpcOperations>,
1248    }
1249
1250    #[async_trait::async_trait]
1251    impl HoprIndexerRpcOperations for ClonableMockOperations {
1252        async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64> {
1253            self.inner.block_number().await
1254        }
1255
1256        async fn get_hopr_allowance(
1257            &self,
1258            owner: Address,
1259            spender: Address,
1260        ) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1261            self.inner.get_hopr_allowance(owner, spender).await
1262        }
1263
1264        async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance> {
1265            self.inner.get_xdai_balance(address).await
1266        }
1267
1268        async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1269            self.inner.get_hopr_balance(address).await
1270        }
1271
1272        fn try_stream_logs<'a>(
1273            &'a self,
1274            start_block_number: u64,
1275            filters: hopr_chain_rpc::FilterSet,
1276            is_synced: bool,
1277        ) -> hopr_chain_rpc::errors::Result<
1278            std::pin::Pin<Box<dyn futures::Stream<Item = hopr_chain_rpc::BlockWithLogs> + Send + 'a>>,
1279        > {
1280            self.inner.try_stream_logs(start_block_number, filters, is_synced)
1281        }
1282    }
1283
1284    fn init_handlers<T: Clone, Db: HoprDbAllOperations + Clone>(
1285        rpc_operations: T,
1286        db: Db,
1287    ) -> ContractEventHandlers<T, Db> {
1288        ContractEventHandlers {
1289            addresses: Arc::new(ContractAddresses {
1290                channels: *CHANNELS_ADDR,
1291                token: *TOKEN_ADDR,
1292                network_registry: *NETWORK_REGISTRY_ADDR,
1293                network_registry_proxy: Default::default(),
1294                safe_registry: *NODE_SAFE_REGISTRY_ADDR,
1295                announcements: *ANNOUNCEMENTS_ADDR,
1296                module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
1297                price_oracle: *TICKET_PRICE_ORACLE_ADDR,
1298                win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
1299                stake_factory: Default::default(),
1300            }),
1301            chain_key: SELF_CHAIN_KEY.clone(),
1302            safe_address: *STAKE_ADDRESS,
1303            db,
1304            rpc_operations,
1305        }
1306    }
1307
1308    fn test_log() -> SerializableLog {
1309        SerializableLog { ..Default::default() }
1310    }
1311
1312    #[tokio::test]
1313    async fn announce_keybinding() -> anyhow::Result<()> {
1314        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1315
1316        let rpc_operations = MockIndexerRpcOperations::new();
1317        // ==> set mock expectations here
1318        let clonable_rpc_operations = ClonableMockOperations {
1319            inner: Arc::new(rpc_operations),
1320        };
1321
1322        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1323
1324        let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
1325
1326        let keybinding_log = SerializableLog {
1327            address: handlers.addresses.announcements,
1328            topics: vec![
1329                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::KeyBinding::SIGNATURE_HASH.into(),
1330            ],
1331            data: DynSolValue::Tuple(vec![
1332                DynSolValue::Bytes(keybinding.signature.as_ref().to_vec()),
1333                DynSolValue::Bytes(keybinding.packet_key.as_ref().to_vec()),
1334                DynSolValue::FixedBytes(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()).into_word(), 32),
1335            ])
1336            .abi_encode_packed(),
1337            ..test_log()
1338        };
1339
1340        let account_entry = AccountEntry {
1341            public_key: *SELF_PRIV_KEY.public(),
1342            chain_addr: *SELF_CHAIN_ADDRESS,
1343            entry_type: AccountType::NotAnnounced,
1344            published_at: 0,
1345        };
1346
1347        let event_type = db
1348            .begin_transaction()
1349            .await?
1350            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log, true).await }))
1351            .await?;
1352
1353        assert!(event_type.is_none(), "keybinding does not have a chain event type");
1354
1355        assert_eq!(
1356            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1357                .await?
1358                .context("a value should be present")?,
1359            account_entry
1360        );
1361        Ok(())
1362    }
1363
1364    #[tokio::test]
1365    async fn announce_address_announcement() -> anyhow::Result<()> {
1366        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1367        let rpc_operations = MockIndexerRpcOperations::new();
1368        // ==> set mock expectations here
1369        let clonable_rpc_operations = ClonableMockOperations {
1370            //
1371            inner: Arc::new(rpc_operations),
1372        };
1373        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1374
1375        // Assume that there is a keybinding
1376        let account_entry = AccountEntry {
1377            public_key: *SELF_PRIV_KEY.public(),
1378            chain_addr: *SELF_CHAIN_ADDRESS,
1379            entry_type: AccountType::NotAnnounced,
1380            published_at: 1,
1381        };
1382        db.insert_account(None, account_entry.clone()).await?;
1383
1384        let test_multiaddr_empty: Multiaddr = "".parse()?;
1385
1386        let address_announcement_empty_log_encoded_data = DynSolValue::Tuple(vec![
1387            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1388            DynSolValue::String(test_multiaddr_empty.to_string()),
1389        ])
1390        .abi_encode();
1391
1392        let address_announcement_empty_log = SerializableLog {
1393            address: handlers.addresses.announcements,
1394            topics: vec![
1395                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1396                    .into(),
1397            ],
1398            data: address_announcement_empty_log_encoded_data[32..].into(),
1399            ..test_log()
1400        };
1401
1402        let handlers_clone = handlers.clone();
1403        let event_type = db
1404            .begin_transaction()
1405            .await?
1406            .perform(|tx| {
1407                Box::pin(async move {
1408                    handlers_clone
1409                        .process_log_event(tx, address_announcement_empty_log, true)
1410                        .await
1411                })
1412            })
1413            .await?;
1414
1415        assert!(
1416            event_type.is_none(),
1417            "announcement of empty multiaddresses must pass through"
1418        );
1419
1420        assert_eq!(
1421            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1422                .await?
1423                .context("a value should be present")?,
1424            account_entry
1425        );
1426
1427        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1428
1429        let address_announcement_log_encoded_data = DynSolValue::Tuple(vec![
1430            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1431            DynSolValue::String(test_multiaddr.to_string()),
1432        ])
1433        .abi_encode();
1434
1435        let address_announcement_log = SerializableLog {
1436            address: handlers.addresses.announcements,
1437            block_number: 1,
1438            topics: vec![
1439                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1440                    .into(),
1441            ],
1442            data: address_announcement_log_encoded_data[32..].into(),
1443            ..test_log()
1444        };
1445
1446        let announced_account_entry = AccountEntry {
1447            public_key: *SELF_PRIV_KEY.public(),
1448            chain_addr: *SELF_CHAIN_ADDRESS,
1449            entry_type: AccountType::Announced {
1450                multiaddr: test_multiaddr.clone(),
1451                updated_block: 1,
1452            },
1453            published_at: 1,
1454        };
1455
1456        let handlers_clone = handlers.clone();
1457        let event_type = db
1458            .begin_transaction()
1459            .await?
1460            .perform(|tx| {
1461                Box::pin(async move {
1462                    handlers_clone
1463                        .process_log_event(tx, address_announcement_log, true)
1464                        .await
1465                })
1466            })
1467            .await?;
1468
1469        assert!(
1470            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1471            "must return the latest announce multiaddress"
1472        );
1473
1474        assert_eq!(
1475            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1476                .await?
1477                .context("a value should be present")?,
1478            announced_account_entry
1479        );
1480
1481        assert_eq!(
1482            Some(*SELF_CHAIN_ADDRESS),
1483            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1484            "must resolve correct chain key"
1485        );
1486
1487        assert_eq!(
1488            Some(*SELF_PRIV_KEY.public()),
1489            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1490            "must resolve correct packet key"
1491        );
1492
1493        let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1494
1495        let address_announcement_dns_log_encoded_data = DynSolValue::Tuple(vec![
1496            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1497            DynSolValue::String(test_multiaddr_dns.to_string()),
1498        ])
1499        .abi_encode();
1500
1501        let address_announcement_dns_log = SerializableLog {
1502            address: handlers.addresses.announcements,
1503            block_number: 2,
1504            topics: vec![
1505                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1506                    .into(),
1507            ],
1508            data: address_announcement_dns_log_encoded_data[32..].into(),
1509            ..test_log()
1510        };
1511
1512        let announced_dns_account_entry = AccountEntry {
1513            public_key: *SELF_PRIV_KEY.public(),
1514            chain_addr: *SELF_CHAIN_ADDRESS,
1515            entry_type: AccountType::Announced {
1516                multiaddr: test_multiaddr_dns.clone(),
1517                updated_block: 2,
1518            },
1519            published_at: 1,
1520        };
1521
1522        let event_type = db
1523            .begin_transaction()
1524            .await?
1525            .perform(|tx| {
1526                Box::pin(async move { handlers.process_log_event(tx, address_announcement_dns_log, true).await })
1527            })
1528            .await?;
1529
1530        assert!(
1531            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1532            "must return the latest announce multiaddress"
1533        );
1534
1535        assert_eq!(
1536            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1537                .await?
1538                .context("a value should be present")?,
1539            announced_dns_account_entry
1540        );
1541
1542        assert_eq!(
1543            Some(*SELF_CHAIN_ADDRESS),
1544            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1545            "must resolve correct chain key"
1546        );
1547
1548        assert_eq!(
1549            Some(*SELF_PRIV_KEY.public()),
1550            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1551            "must resolve correct packet key"
1552        );
1553        Ok(())
1554    }
1555
1556    #[tokio::test]
1557    async fn announce_revoke() -> anyhow::Result<()> {
1558        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1559        let rpc_operations = MockIndexerRpcOperations::new();
1560        // ==> set mock expectations here
1561        let clonable_rpc_operations = ClonableMockOperations {
1562            //
1563            inner: Arc::new(rpc_operations),
1564        };
1565        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1566
1567        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1568
1569        // Assume that there is a keybinding and an address announcement
1570        let announced_account_entry = AccountEntry {
1571            public_key: *SELF_PRIV_KEY.public(),
1572            chain_addr: *SELF_CHAIN_ADDRESS,
1573            entry_type: AccountType::Announced {
1574                multiaddr: test_multiaddr,
1575                updated_block: 0,
1576            },
1577            published_at: 1,
1578        };
1579
1580        db.insert_account(None, announced_account_entry).await?;
1581
1582        let encoded_data = (AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()),).abi_encode();
1583
1584        let revoke_announcement_log = SerializableLog {
1585            address: handlers.addresses.announcements,
1586            topics: vec![
1587                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::RevokeAnnouncement::SIGNATURE_HASH
1588                    .into(),
1589            ],
1590            data: encoded_data,
1591            ..test_log()
1592        };
1593
1594        let account_entry = AccountEntry {
1595            public_key: *SELF_PRIV_KEY.public(),
1596            chain_addr: *SELF_CHAIN_ADDRESS,
1597            entry_type: AccountType::NotAnnounced,
1598            published_at: 1,
1599        };
1600
1601        let event_type = db
1602            .begin_transaction()
1603            .await?
1604            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log, true).await }))
1605            .await?;
1606
1607        assert!(
1608            event_type.is_none(),
1609            "revoke announcement does not have chain event type"
1610        );
1611
1612        assert_eq!(
1613            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1614                .await?
1615                .context("a value should be present")?,
1616            account_entry
1617        );
1618        Ok(())
1619    }
1620
1621    #[tokio::test]
1622    async fn on_token_transfer_to() -> anyhow::Result<()> {
1623        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1624
1625        let value = U256::MAX;
1626        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
1627            value.to_be_bytes_vec().as_slice(),
1628        ));
1629
1630        let mut rpc_operations = MockIndexerRpcOperations::new();
1631        rpc_operations
1632            .expect_get_hopr_balance()
1633            .times(1)
1634            .return_once(move |_| Ok(target_hopr_balance));
1635        rpc_operations
1636            .expect_get_hopr_allowance()
1637            .times(1)
1638            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1639        let clonable_rpc_operations = ClonableMockOperations {
1640            inner: Arc::new(rpc_operations),
1641        };
1642
1643        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1644
1645        let encoded_data = (value).abi_encode();
1646
1647        let transferred_log = SerializableLog {
1648            address: handlers.addresses.token,
1649            topics: vec![
1650                hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1651                H256::from_slice(&Address::default().to_bytes32()).into(),
1652                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1653            ],
1654            data: encoded_data,
1655            ..test_log()
1656        };
1657
1658        let event_type = db
1659            .begin_transaction()
1660            .await?
1661            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1662            .await?;
1663
1664        assert!(event_type.is_none(), "token transfer does not have chain event type");
1665
1666        assert_eq!(db.get_safe_hopr_balance(None).await?, target_hopr_balance);
1667
1668        Ok(())
1669    }
1670
1671    #[test_log::test(tokio::test)]
1672    async fn on_token_transfer_from() -> anyhow::Result<()> {
1673        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1674
1675        let mut rpc_operations = MockIndexerRpcOperations::new();
1676        rpc_operations
1677            .expect_get_hopr_balance()
1678            .times(1)
1679            .return_once(|_| Ok(HoprBalance::zero()));
1680        rpc_operations
1681            .expect_get_hopr_allowance()
1682            .times(1)
1683            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1684        let clonable_rpc_operations = ClonableMockOperations {
1685            inner: Arc::new(rpc_operations),
1686        };
1687
1688        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1689
1690        let value = U256::MAX;
1691
1692        let encoded_data = (value).abi_encode();
1693
1694        db.set_safe_hopr_balance(
1695            None,
1696            HoprBalance::from(primitive_types::U256::from_big_endian(
1697                value.to_be_bytes_vec().as_slice(),
1698            )),
1699        )
1700        .await?;
1701
1702        let transferred_log = SerializableLog {
1703            address: handlers.addresses.token,
1704            topics: vec![
1705                hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1706                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1707                H256::from_slice(&Address::default().to_bytes32()).into(),
1708            ],
1709            data: encoded_data,
1710            ..test_log()
1711        };
1712
1713        let event_type = db
1714            .begin_transaction()
1715            .await?
1716            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1717            .await?;
1718
1719        assert!(event_type.is_none(), "token transfer does not have chain event type");
1720
1721        assert_eq!(db.get_safe_hopr_balance(None).await?, HoprBalance::zero());
1722
1723        Ok(())
1724    }
1725
1726    #[tokio::test]
1727    async fn on_token_approval_correct() -> anyhow::Result<()> {
1728        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1729
1730        let target_allowance = HoprBalance::from(primitive_types::U256::from(1000u64));
1731        let mut rpc_operations = MockIndexerRpcOperations::new();
1732        rpc_operations
1733            .expect_get_hopr_allowance()
1734            .times(2)
1735            .returning(move |_, _| Ok(target_allowance));
1736        let clonable_rpc_operations = ClonableMockOperations {
1737            inner: Arc::new(rpc_operations),
1738        };
1739
1740        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1741
1742        let encoded_data = (U256::from(1000u64)).abi_encode();
1743
1744        let approval_log = SerializableLog {
1745            address: handlers.addresses.token,
1746            topics: vec![
1747                hopr_bindings::hoprtoken::HoprToken::Approval::SIGNATURE_HASH.into(),
1748                H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1749                H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1750            ],
1751            data: encoded_data,
1752            ..test_log()
1753        };
1754
1755        // before any operation the allowance should be 0
1756        assert_eq!(db.get_safe_hopr_allowance(None).await?, HoprBalance::zero());
1757
1758        let approval_log_clone = approval_log.clone();
1759        let handlers_clone = handlers.clone();
1760        let event_type = db
1761            .begin_transaction()
1762            .await?
1763            .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone, true).await }))
1764            .await?;
1765
1766        assert!(event_type.is_none(), "token approval does not have chain event type");
1767
1768        // after processing the allowance should be 0
1769        assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance.clone());
1770
1771        // reduce allowance manually to verify a second time
1772        let _ = db
1773            .set_safe_hopr_allowance(None, HoprBalance::from(primitive_types::U256::from(10u64)))
1774            .await;
1775        assert_eq!(
1776            db.get_safe_hopr_allowance(None).await?,
1777            HoprBalance::from(primitive_types::U256::from(10u64))
1778        );
1779
1780        let handlers_clone = handlers.clone();
1781        let event_type = db
1782            .begin_transaction()
1783            .await?
1784            .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log, true).await }))
1785            .await?;
1786
1787        assert!(event_type.is_none(), "token approval does not have chain event type");
1788
1789        assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance);
1790        Ok(())
1791    }
1792
1793    #[tokio::test]
1794    async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1795        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1796        let rpc_operations = MockIndexerRpcOperations::new();
1797        // ==> set mock expectations here
1798        let clonable_rpc_operations = ClonableMockOperations {
1799            //
1800            inner: Arc::new(rpc_operations),
1801        };
1802        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1803
1804        db.set_network_registry_enabled(None, true).await?;
1805
1806        let encoded_data = ().abi_encode();
1807
1808        let registered_log = SerializableLog {
1809            address: handlers.addresses.network_registry,
1810            topics: vec![
1811                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Registered::SIGNATURE_HASH.into(),
1812                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1813                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1814            ],
1815            data: encoded_data,
1816            // data: encode(&[]).into(),
1817            ..test_log()
1818        };
1819
1820        assert!(
1821            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1822                .await?
1823        );
1824
1825        let event_type = db
1826            .begin_transaction()
1827            .await?
1828            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1829            .await?;
1830
1831        assert!(
1832            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1833            "must return correct NR update"
1834        );
1835
1836        assert!(
1837            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1838                .await?,
1839            "must be allowed in NR"
1840        );
1841        Ok(())
1842    }
1843
1844    #[tokio::test]
1845    async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1846        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1847        let rpc_operations = MockIndexerRpcOperations::new();
1848        // ==> set mock expectations here
1849        let clonable_rpc_operations = ClonableMockOperations {
1850            //
1851            inner: Arc::new(rpc_operations),
1852        };
1853        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1854
1855        db.set_network_registry_enabled(None, true).await?;
1856
1857        let registered_log = SerializableLog {
1858            address: handlers.addresses.network_registry,
1859            topics: vec![
1860                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::RegisteredByManager::SIGNATURE_HASH.into(),
1861                // RegisteredByManagerFilter::signature().into(),
1862                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1863                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1864            ],
1865            data: ().abi_encode(),
1866            // data: encode(&[]).into(),
1867            ..test_log()
1868        };
1869
1870        assert!(
1871            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1872                .await?
1873        );
1874
1875        let event_type = db
1876            .begin_transaction()
1877            .await?
1878            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1879            .await?;
1880
1881        assert!(
1882            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1883            "must return correct NR update"
1884        );
1885
1886        assert!(
1887            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1888                .await?,
1889            "must be allowed in NR"
1890        );
1891        Ok(())
1892    }
1893
1894    #[tokio::test]
1895    async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1896        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1897        let rpc_operations = MockIndexerRpcOperations::new();
1898        // ==> set mock expectations here
1899        let clonable_rpc_operations = ClonableMockOperations {
1900            //
1901            inner: Arc::new(rpc_operations),
1902        };
1903        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1904
1905        db.set_network_registry_enabled(None, true).await?;
1906
1907        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1908            .await?;
1909
1910        let encoded_data = ().abi_encode();
1911
1912        let registered_log = SerializableLog {
1913            address: handlers.addresses.network_registry,
1914            topics: vec![
1915                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Deregistered::SIGNATURE_HASH.into(),
1916                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1917                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1918            ],
1919            data: encoded_data,
1920            ..test_log()
1921        };
1922
1923        assert!(
1924            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1925                .await?
1926        );
1927
1928        let event_type = db
1929            .begin_transaction()
1930            .await?
1931            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1932            .await?;
1933
1934        assert!(
1935            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1936            "must return correct NR update"
1937        );
1938
1939        assert!(
1940            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1941                .await?,
1942            "must not be allowed in NR"
1943        );
1944        Ok(())
1945    }
1946
1947    #[tokio::test]
1948    async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1949        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1950        let rpc_operations = MockIndexerRpcOperations::new();
1951        // ==> set mock expectations here
1952        let clonable_rpc_operations = ClonableMockOperations {
1953            //
1954            inner: Arc::new(rpc_operations),
1955        };
1956        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1957
1958        db.set_network_registry_enabled(None, true).await?;
1959
1960        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1961            .await?;
1962
1963        let encoded_data = ().abi_encode();
1964
1965        let registered_log = SerializableLog {
1966            address: handlers.addresses.network_registry,
1967            topics: vec![
1968                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::DeregisteredByManager::SIGNATURE_HASH.into(),
1969                // DeregisteredByManagerFilter::signature().into(),
1970                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1971                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1972            ],
1973            data: encoded_data,
1974            ..test_log()
1975        };
1976
1977        assert!(
1978            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1979                .await?
1980        );
1981
1982        let event_type = db
1983            .begin_transaction()
1984            .await?
1985            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1986            .await?;
1987
1988        assert!(
1989            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1990            "must return correct NR update"
1991        );
1992
1993        assert!(
1994            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1995                .await?,
1996            "must not be allowed in NR"
1997        );
1998        Ok(())
1999    }
2000
2001    #[tokio::test]
2002    async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
2003        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2004        let rpc_operations = MockIndexerRpcOperations::new();
2005        // ==> set mock expectations here
2006        let clonable_rpc_operations = ClonableMockOperations {
2007            //
2008            inner: Arc::new(rpc_operations),
2009        };
2010        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2011
2012        let encoded_data = ().abi_encode();
2013
2014        let nr_enabled = SerializableLog {
2015            address: handlers.addresses.network_registry,
2016            topics: vec![
2017                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2018                    .into(),
2019                // NetworkRegistryStatusUpdatedFilter::signature().into(),
2020                H256::from_low_u64_be(1).into(),
2021            ],
2022            data: encoded_data,
2023            ..test_log()
2024        };
2025
2026        let event_type = db
2027            .begin_transaction()
2028            .await?
2029            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled, false).await }))
2030            .await?;
2031
2032        assert!(event_type.is_none(), "there's no chain event type for nr disable");
2033
2034        assert!(db.get_indexer_data(None).await?.nr_enabled);
2035        Ok(())
2036    }
2037
2038    #[tokio::test]
2039    async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
2040        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2041        let rpc_operations = MockIndexerRpcOperations::new();
2042        // ==> set mock expectations here
2043        let clonable_rpc_operations = ClonableMockOperations {
2044            //
2045            inner: Arc::new(rpc_operations),
2046        };
2047        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2048
2049        db.set_network_registry_enabled(None, true).await?;
2050
2051        let encoded_data = ().abi_encode();
2052
2053        let nr_disabled = SerializableLog {
2054            address: handlers.addresses.network_registry,
2055            topics: vec![
2056                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2057                    .into(),
2058                // NetworkRegistryStatusUpdatedFilter::signature().into(),
2059                H256::from_low_u64_be(0).into(),
2060            ],
2061            data: encoded_data,
2062            ..test_log()
2063        };
2064
2065        let event_type = db
2066            .begin_transaction()
2067            .await?
2068            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled, false).await }))
2069            .await?;
2070
2071        assert!(event_type.is_none(), "there's no chain event type for nr enable");
2072
2073        assert!(!db.get_indexer_data(None).await?.nr_enabled);
2074        Ok(())
2075    }
2076
2077    #[tokio::test]
2078    async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
2079        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2080        let rpc_operations = MockIndexerRpcOperations::new();
2081        // ==> set mock expectations here
2082        let clonable_rpc_operations = ClonableMockOperations {
2083            //
2084            inner: Arc::new(rpc_operations),
2085        };
2086        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2087
2088        let encoded_data = ().abi_encode();
2089
2090        let set_eligible = SerializableLog {
2091            address: handlers.addresses.network_registry,
2092            topics: vec![
2093                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2094                // EligibilityUpdatedFilter::signature().into(),
2095                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2096                H256::from_low_u64_be(1).into(),
2097            ],
2098            data: encoded_data,
2099            ..test_log()
2100        };
2101
2102        let event_type = db
2103            .begin_transaction()
2104            .await?
2105            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, false).await }))
2106            .await?;
2107
2108        assert!(
2109            event_type.is_none(),
2110            "there's no chain event type for setting nr eligibility"
2111        );
2112
2113        assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2114
2115        Ok(())
2116    }
2117
2118    #[tokio::test]
2119    async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
2120        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2121        let rpc_operations = MockIndexerRpcOperations::new();
2122        // ==> set mock expectations here
2123        let clonable_rpc_operations = ClonableMockOperations {
2124            //
2125            inner: Arc::new(rpc_operations),
2126        };
2127        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2128
2129        db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
2130
2131        let encoded_data = ().abi_encode();
2132
2133        let set_eligible = SerializableLog {
2134            address: handlers.addresses.network_registry,
2135            topics: vec![
2136                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2137                // EligibilityUpdatedFilter::signature().into(),
2138                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2139                H256::from_low_u64_be(0).into(),
2140            ],
2141            data: encoded_data,
2142            ..test_log()
2143        };
2144
2145        let event_type = db
2146            .begin_transaction()
2147            .await?
2148            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, true).await }))
2149            .await?;
2150
2151        assert!(
2152            event_type.is_none(),
2153            "there's no chain event type for unsetting nr eligibility"
2154        );
2155
2156        assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2157
2158        Ok(())
2159    }
2160
2161    #[tokio::test]
2162    async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
2163        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2164
2165        let value = U256::MAX;
2166        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2167            value.to_be_bytes_vec().as_slice(),
2168        ));
2169
2170        let mut rpc_operations = MockIndexerRpcOperations::new();
2171        rpc_operations
2172            .expect_get_hopr_balance()
2173            .times(1)
2174            .return_once(move |_| Ok(target_hopr_balance));
2175        rpc_operations
2176            .expect_get_hopr_allowance()
2177            .times(1)
2178            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
2179        let clonable_rpc_operations = ClonableMockOperations {
2180            inner: Arc::new(rpc_operations),
2181        };
2182        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2183
2184        let channel = ChannelEntry::new(
2185            *SELF_CHAIN_ADDRESS,
2186            *COUNTERPARTY_CHAIN_ADDRESS,
2187            0.into(),
2188            primitive_types::U256::zero(),
2189            ChannelStatus::Open,
2190            primitive_types::U256::one(),
2191        );
2192
2193        db.upsert_channel(None, channel).await?;
2194
2195        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2196        let diff = solidity_balance - channel.balance;
2197
2198        let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2199
2200        let balance_increased_log = SerializableLog {
2201            address: handlers.addresses.channels,
2202            topics: vec![
2203                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2204                // ChannelBalanceIncreasedFilter::signature().into(),
2205                H256::from_slice(channel.get_id().as_ref()).into(),
2206            ],
2207            data: encoded_data,
2208            ..test_log()
2209        };
2210
2211        let event_type = db
2212            .begin_transaction()
2213            .await?
2214            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2215            .await?;
2216
2217        let channel = db
2218            .get_channel_by_id(None, &channel.get_id())
2219            .await?
2220            .context("a value should be present")?;
2221
2222        assert!(
2223            matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
2224            "must return updated channel entry and balance diff"
2225        );
2226
2227        assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2228        Ok(())
2229    }
2230
2231    #[tokio::test]
2232    async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
2233        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2234        let rpc_operations = MockIndexerRpcOperations::new();
2235        // ==> set mock expectations here
2236        let clonable_rpc_operations = ClonableMockOperations {
2237            //
2238            inner: Arc::new(rpc_operations),
2239        };
2240        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2241
2242        let separator = Hash::from(hopr_crypto_random::random_bytes());
2243
2244        let encoded_data = ().abi_encode();
2245
2246        let channels_dst_updated = SerializableLog {
2247            address: handlers.addresses.channels,
2248            topics: vec![
2249                hopr_bindings::hoprchannels::HoprChannels::DomainSeparatorUpdated::SIGNATURE_HASH.into(),
2250                // DomainSeparatorUpdatedFilter::signature().into(),
2251                H256::from_slice(separator.as_ref()).into(),
2252            ],
2253            data: encoded_data,
2254            ..test_log()
2255        };
2256
2257        assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
2258
2259        let event_type = db
2260            .begin_transaction()
2261            .await?
2262            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated, true).await }))
2263            .await?;
2264
2265        assert!(
2266            event_type.is_none(),
2267            "there's no chain event type for channel dst update"
2268        );
2269
2270        assert_eq!(
2271            separator,
2272            db.get_indexer_data(None)
2273                .await?
2274                .channels_dst
2275                .context("a value should be present")?,
2276            "separator must be updated"
2277        );
2278        Ok(())
2279    }
2280
2281    #[tokio::test]
2282    async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
2283        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2284
2285        let value = U256::MAX;
2286        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2287            value.to_be_bytes_vec().as_slice(),
2288        ));
2289
2290        let mut rpc_operations = MockIndexerRpcOperations::new();
2291        rpc_operations
2292            .expect_get_hopr_balance()
2293            .times(1)
2294            .return_once(move |_| Ok(target_hopr_balance));
2295        let clonable_rpc_operations = ClonableMockOperations {
2296            inner: Arc::new(rpc_operations),
2297        };
2298        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2299
2300        let channel = ChannelEntry::new(
2301            *SELF_CHAIN_ADDRESS,
2302            *COUNTERPARTY_CHAIN_ADDRESS,
2303            HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2304            primitive_types::U256::zero(),
2305            ChannelStatus::Open,
2306            primitive_types::U256::one(),
2307        );
2308
2309        db.upsert_channel(None, channel).await?;
2310
2311        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 2).into();
2312        let diff = channel.balance - solidity_balance;
2313
2314        // let encoded_data = (solidity_balance).abi_encode();
2315        let encoded_data = DynSolValue::Tuple(vec![DynSolValue::Uint(
2316            U256::from_be_slice(&solidity_balance.amount().to_be_bytes()),
2317            256,
2318        )])
2319        .abi_encode();
2320
2321        let balance_decreased_log = SerializableLog {
2322            address: handlers.addresses.channels,
2323            topics: vec![
2324                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceDecreased::SIGNATURE_HASH.into(),
2325                // ChannelBalanceDecreasedFilter::signature().into(),
2326                H256::from_slice(channel.get_id().as_ref()).into(),
2327            ],
2328            data: encoded_data,
2329            ..test_log()
2330        };
2331
2332        let event_type = db
2333            .begin_transaction()
2334            .await?
2335            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log, true).await }))
2336            .await?;
2337
2338        let channel = db
2339            .get_channel_by_id(None, &channel.get_id())
2340            .await?
2341            .context("a value should be present")?;
2342
2343        assert!(
2344            matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
2345            "must return updated channel entry and balance diff"
2346        );
2347
2348        assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2349        Ok(())
2350    }
2351
2352    #[tokio::test]
2353    async fn on_channel_closed() -> anyhow::Result<()> {
2354        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2355        let rpc_operations = MockIndexerRpcOperations::new();
2356        // ==> set mock expectations here
2357        let clonable_rpc_operations = ClonableMockOperations {
2358            //
2359            inner: Arc::new(rpc_operations),
2360        };
2361        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2362
2363        let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2364
2365        let channel = ChannelEntry::new(
2366            *SELF_CHAIN_ADDRESS,
2367            *COUNTERPARTY_CHAIN_ADDRESS,
2368            starting_balance,
2369            primitive_types::U256::zero(),
2370            ChannelStatus::Open,
2371            primitive_types::U256::one(),
2372        );
2373
2374        db.upsert_channel(None, channel).await?;
2375
2376        let encoded_data = ().abi_encode();
2377
2378        let channel_closed_log = SerializableLog {
2379            address: handlers.addresses.channels,
2380            topics: vec![
2381                hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2382                // ChannelClosedFilter::signature().into(),
2383                H256::from_slice(channel.get_id().as_ref()).into(),
2384            ],
2385            data: encoded_data,
2386            ..test_log()
2387        };
2388
2389        let event_type = db
2390            .begin_transaction()
2391            .await?
2392            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2393            .await?;
2394
2395        let closed_channel = db
2396            .get_channel_by_id(None, &channel.get_id())
2397            .await?
2398            .context("a value should be present")?;
2399
2400        assert!(
2401            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
2402            "must return the updated channel entry"
2403        );
2404
2405        assert_eq!(closed_channel.status, ChannelStatus::Closed);
2406        assert_eq!(closed_channel.ticket_index, 0u64.into());
2407        assert_eq!(
2408            0,
2409            db.get_outgoing_ticket_index(closed_channel.get_id())
2410                .await?
2411                .load(Ordering::Relaxed)
2412        );
2413
2414        assert!(closed_channel.balance.amount().eq(&primitive_types::U256::zero()));
2415        Ok(())
2416    }
2417
2418    #[tokio::test]
2419    async fn on_foreign_channel_closed() -> anyhow::Result<()> {
2420        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2421        let rpc_operations = MockIndexerRpcOperations::new();
2422        // ==> set mock expectations here
2423        let clonable_rpc_operations = ClonableMockOperations {
2424            //
2425            inner: Arc::new(rpc_operations),
2426        };
2427        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2428
2429        let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2430
2431        let channel = ChannelEntry::new(
2432            Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
2433            Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
2434            starting_balance,
2435            primitive_types::U256::zero(),
2436            ChannelStatus::Open,
2437            primitive_types::U256::one(),
2438        );
2439
2440        db.upsert_channel(None, channel).await?;
2441
2442        let encoded_data = ().abi_encode();
2443
2444        let channel_closed_log = SerializableLog {
2445            address: handlers.addresses.channels,
2446            topics: vec![
2447                hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2448                // ChannelClosedFilter::signature().into(),
2449                H256::from_slice(channel.get_id().as_ref()).into(),
2450            ],
2451            data: encoded_data,
2452            ..test_log()
2453        };
2454
2455        let event_type = db
2456            .begin_transaction()
2457            .await?
2458            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2459            .await?;
2460
2461        let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
2462
2463        assert_eq!(None, closed_channel, "foreign channel must be deleted");
2464
2465        assert!(
2466            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
2467            "must return the closed channel entry"
2468        );
2469
2470        Ok(())
2471    }
2472
2473    #[tokio::test]
2474    async fn on_channel_opened() -> anyhow::Result<()> {
2475        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2476        let rpc_operations = MockIndexerRpcOperations::new();
2477        // ==> set mock expectations here
2478        let clonable_rpc_operations = ClonableMockOperations {
2479            //
2480            inner: Arc::new(rpc_operations),
2481        };
2482        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2483
2484        let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2485
2486        let encoded_data = ().abi_encode();
2487
2488        let channel_opened_log = SerializableLog {
2489            address: handlers.addresses.channels,
2490            topics: vec![
2491                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2492                // ChannelOpenedFilter::signature().into(),
2493                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2494                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2495            ],
2496            data: encoded_data,
2497            ..test_log()
2498        };
2499
2500        let event_type = db
2501            .begin_transaction()
2502            .await?
2503            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2504            .await?;
2505
2506        let channel = db
2507            .get_channel_by_id(None, &channel_id)
2508            .await?
2509            .context("a value should be present")?;
2510
2511        assert!(
2512            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2513            "must return the updated channel entry"
2514        );
2515
2516        assert_eq!(channel.status, ChannelStatus::Open);
2517        assert_eq!(channel.channel_epoch, 1u64.into());
2518        assert_eq!(channel.ticket_index, 0u64.into());
2519        assert_eq!(
2520            0,
2521            db.get_outgoing_ticket_index(channel.get_id())
2522                .await?
2523                .load(Ordering::Relaxed)
2524        );
2525        Ok(())
2526    }
2527
2528    #[tokio::test]
2529    async fn on_channel_reopened() -> anyhow::Result<()> {
2530        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2531        let rpc_operations = MockIndexerRpcOperations::new();
2532        // ==> set mock expectations here
2533        let clonable_rpc_operations = ClonableMockOperations {
2534            //
2535            inner: Arc::new(rpc_operations),
2536        };
2537        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2538
2539        let channel = ChannelEntry::new(
2540            *SELF_CHAIN_ADDRESS,
2541            *COUNTERPARTY_CHAIN_ADDRESS,
2542            HoprBalance::zero(),
2543            primitive_types::U256::zero(),
2544            ChannelStatus::Closed,
2545            3.into(),
2546        );
2547
2548        db.upsert_channel(None, channel).await?;
2549
2550        let encoded_data = ().abi_encode();
2551
2552        let channel_opened_log = SerializableLog {
2553            address: handlers.addresses.channels,
2554            topics: vec![
2555                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2556                // ChannelOpenedFilter::signature().into(),
2557                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2558                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2559            ],
2560            data: encoded_data,
2561            ..test_log()
2562        };
2563
2564        let event_type = db
2565            .begin_transaction()
2566            .await?
2567            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2568            .await?;
2569
2570        let channel = db
2571            .get_channel_by_id(None, &channel.get_id())
2572            .await?
2573            .context("a value should be present")?;
2574
2575        assert!(
2576            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2577            "must return the updated channel entry"
2578        );
2579
2580        assert_eq!(channel.status, ChannelStatus::Open);
2581        assert_eq!(channel.channel_epoch, 4u64.into());
2582        assert_eq!(channel.ticket_index, 0u64.into());
2583
2584        assert_eq!(
2585            0,
2586            db.get_outgoing_ticket_index(channel.get_id())
2587                .await?
2588                .load(Ordering::Relaxed)
2589        );
2590        Ok(())
2591    }
2592
2593    #[tokio::test]
2594    async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2595        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2596        let rpc_operations = MockIndexerRpcOperations::new();
2597        // ==> set mock expectations here
2598        let clonable_rpc_operations = ClonableMockOperations {
2599            //
2600            inner: Arc::new(rpc_operations),
2601        };
2602        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2603
2604        let channel = ChannelEntry::new(
2605            *SELF_CHAIN_ADDRESS,
2606            *COUNTERPARTY_CHAIN_ADDRESS,
2607            0.into(),
2608            primitive_types::U256::zero(),
2609            ChannelStatus::Open,
2610            3.into(),
2611        );
2612
2613        db.upsert_channel(None, channel).await?;
2614
2615        let encoded_data = ().abi_encode();
2616
2617        let channel_opened_log = SerializableLog {
2618            address: handlers.addresses.channels,
2619            topics: vec![
2620                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2621                // ChannelOpenedFilter::signature().into(),
2622                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2623                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2624            ],
2625            data: encoded_data,
2626            ..test_log()
2627        };
2628
2629        db.begin_transaction()
2630            .await?
2631            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2632            .await
2633            .context("Channel should stay open, with corrupted flag set")?;
2634
2635        assert!(
2636            db.get_channel_by_id(None, &channel.get_id()).await?.is_none(),
2637            "channel should not be returned as marked as corrupted",
2638        );
2639
2640        db.get_corrupted_channel_by_id(None, &channel.get_id())
2641            .await?
2642            .context("a value should be present")?;
2643
2644        Ok(())
2645    }
2646
2647    #[tokio::test]
2648    async fn event_for_non_existing_channel_should_create_corrupted_channel() -> anyhow::Result<()> {
2649        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2650        let rpc_operations = MockIndexerRpcOperations::new();
2651        // ==> set mock expectations here
2652        let clonable_rpc_operations = ClonableMockOperations {
2653            //
2654            inner: Arc::new(rpc_operations),
2655        };
2656        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2657
2658        let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2659
2660        // Attempt to increase balance
2661        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2662
2663        let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2664
2665        let balance_increased_log = SerializableLog {
2666            address: handlers.addresses.channels,
2667            topics: vec![
2668                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2669                // ChannelBalanceIncreasedFilter::signature().into(),
2670                H256::from_slice(channel_id.as_ref()).into(),
2671            ],
2672            data: encoded_data,
2673            ..test_log()
2674        };
2675
2676        db.begin_transaction()
2677            .await?
2678            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2679            .await?;
2680
2681        // Check that the corrupted channel was created
2682        db.get_corrupted_channel_by_id(None, &channel_id)
2683            .await?
2684            .context("channel should be set a corrupted")?;
2685
2686        Ok(())
2687    }
2688
2689    const PRICE_PER_PACKET: u32 = 20_u32;
2690
2691    fn mock_acknowledged_ticket(
2692        signer: &ChainKeypair,
2693        destination: &ChainKeypair,
2694        index: u64,
2695        win_prob: f64,
2696    ) -> anyhow::Result<AcknowledgedTicket> {
2697        let channel_id = generate_channel_id(&signer.into(), &destination.into());
2698
2699        let channel_epoch = 1u64;
2700        let domain_separator = Hash::default();
2701
2702        let response = Response::try_from(
2703            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2704        )?;
2705
2706        Ok(TicketBuilder::default()
2707            .direction(&signer.into(), &destination.into())
2708            .amount(primitive_types::U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2709            .index(index)
2710            .index_offset(1)
2711            .win_prob(win_prob.try_into()?)
2712            .channel_epoch(1)
2713            .challenge(response.to_challenge()?)
2714            .build_signed(signer, &domain_separator)?
2715            .into_acknowledged(response))
2716    }
2717
2718    #[tokio::test]
2719    async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2720        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2721        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2722            .await?;
2723        let rpc_operations = MockIndexerRpcOperations::new();
2724        // ==> set mock expectations here
2725        let clonable_rpc_operations = ClonableMockOperations {
2726            //
2727            inner: Arc::new(rpc_operations),
2728        };
2729        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2730
2731        let channel = ChannelEntry::new(
2732            *COUNTERPARTY_CHAIN_ADDRESS,
2733            *SELF_CHAIN_ADDRESS,
2734            HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2735            primitive_types::U256::zero(),
2736            ChannelStatus::Open,
2737            primitive_types::U256::one(),
2738        );
2739
2740        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2741        let next_ticket_index = ticket_index + 1;
2742
2743        let mut ticket =
2744            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2745        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2746
2747        let ticket_value = ticket.verified_ticket().amount;
2748
2749        db.upsert_channel(None, channel).await?;
2750        db.upsert_ticket(None, ticket.clone()).await?;
2751
2752        let ticket_redeemed_log = SerializableLog {
2753            address: handlers.addresses.channels,
2754            topics: vec![
2755                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2756                // TicketRedeemedFilter::signature().into(),
2757                H256::from_slice(channel.get_id().as_ref()).into(),
2758            ],
2759            data: DynSolValue::Tuple(vec![DynSolValue::Uint(
2760                U256::from_be_bytes(next_ticket_index.to_be_bytes()),
2761                48,
2762            )])
2763            .abi_encode(),
2764            ..test_log()
2765        };
2766
2767        let outgoing_ticket_index_before = db
2768            .get_outgoing_ticket_index(channel.get_id())
2769            .await?
2770            .load(Ordering::Relaxed);
2771
2772        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2773        assert_eq!(
2774            HoprBalance::zero(),
2775            stats.redeemed_value,
2776            "there should not be any redeemed value"
2777        );
2778        assert_eq!(
2779            HoprBalance::zero(),
2780            stats.neglected_value,
2781            "there should not be any neglected value"
2782        );
2783
2784        let event_type = db
2785            .begin_transaction()
2786            .await?
2787            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2788            .await?;
2789
2790        let channel = db
2791            .get_channel_by_id(None, &channel.get_id())
2792            .await?
2793            .context("a value should be present")?;
2794
2795        assert!(
2796            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2797            "must return the updated channel entry and the redeemed ticket"
2798        );
2799
2800        assert_eq!(
2801            channel.ticket_index, next_ticket_index,
2802            "channel entry must contain next ticket index"
2803        );
2804
2805        let outgoing_ticket_index_after = db
2806            .get_outgoing_ticket_index(channel.get_id())
2807            .await?
2808            .load(Ordering::Relaxed);
2809
2810        assert_eq!(
2811            outgoing_ticket_index_before, outgoing_ticket_index_after,
2812            "outgoing ticket index must not change"
2813        );
2814
2815        let tickets = db.get_tickets((&channel).into()).await?;
2816
2817        assert!(tickets.is_empty(), "there should not be any tickets left");
2818
2819        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2820        assert_eq!(
2821            ticket_value, stats.redeemed_value,
2822            "there should be redeemed value worth 1 ticket"
2823        );
2824        assert_eq!(
2825            HoprBalance::zero(),
2826            stats.neglected_value,
2827            "there should not be any neglected ticket"
2828        );
2829        Ok(())
2830    }
2831
2832    #[tokio::test]
2833    async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2834        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2835        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2836            .await?;
2837        let rpc_operations = MockIndexerRpcOperations::new();
2838        // ==> set mock expectations here
2839        let clonable_rpc_operations = ClonableMockOperations {
2840            //
2841            inner: Arc::new(rpc_operations),
2842        };
2843        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2844
2845        let channel = ChannelEntry::new(
2846            *COUNTERPARTY_CHAIN_ADDRESS,
2847            *SELF_CHAIN_ADDRESS,
2848            primitive_types::U256::from((1u128 << 96) - 1).into(),
2849            primitive_types::U256::zero(),
2850            ChannelStatus::Open,
2851            primitive_types::U256::one(),
2852        );
2853
2854        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2855        let next_ticket_index = ticket_index + 1;
2856
2857        let mut ticket =
2858            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2859        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2860
2861        let ticket_value = ticket.verified_ticket().amount;
2862
2863        db.upsert_channel(None, channel).await?;
2864        db.upsert_ticket(None, ticket.clone()).await?;
2865
2866        let old_ticket =
2867            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2868        db.upsert_ticket(None, old_ticket.clone()).await?;
2869
2870        let ticket_redeemed_log = SerializableLog {
2871            address: handlers.addresses.channels,
2872            topics: vec![
2873                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2874                // TicketRedeemedFilter::signature().into(),
2875                H256::from_slice(channel.get_id().as_ref()).into(),
2876            ],
2877            data: Vec::from(next_ticket_index.to_be_bytes()),
2878            ..test_log()
2879        };
2880
2881        let outgoing_ticket_index_before = db
2882            .get_outgoing_ticket_index(channel.get_id())
2883            .await?
2884            .load(Ordering::Relaxed);
2885
2886        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2887        assert_eq!(
2888            HoprBalance::zero(),
2889            stats.redeemed_value,
2890            "there should not be any redeemed value"
2891        );
2892        assert_eq!(
2893            HoprBalance::zero(),
2894            stats.neglected_value,
2895            "there should not be any neglected value"
2896        );
2897
2898        let event_type = db
2899            .begin_transaction()
2900            .await?
2901            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2902            .await?;
2903
2904        let channel = db
2905            .get_channel_by_id(None, &channel.get_id())
2906            .await?
2907            .context("a value should be present")?;
2908
2909        assert!(
2910            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2911            "must return the updated channel entry and the redeemed ticket"
2912        );
2913
2914        assert_eq!(
2915            channel.ticket_index, next_ticket_index,
2916            "channel entry must contain next ticket index"
2917        );
2918
2919        let outgoing_ticket_index_after = db
2920            .get_outgoing_ticket_index(channel.get_id())
2921            .await?
2922            .load(Ordering::Relaxed);
2923
2924        assert_eq!(
2925            outgoing_ticket_index_before, outgoing_ticket_index_after,
2926            "outgoing ticket index must not change"
2927        );
2928
2929        let tickets = db.get_tickets((&channel).into()).await?;
2930        assert!(tickets.is_empty(), "there should not be any tickets left");
2931
2932        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2933        assert_eq!(
2934            ticket_value, stats.redeemed_value,
2935            "there should be redeemed value worth 1 ticket"
2936        );
2937        assert_eq!(
2938            ticket_value, stats.neglected_value,
2939            "there should neglected value worth 1 ticket"
2940        );
2941        Ok(())
2942    }
2943
2944    #[tokio::test]
2945    async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2946        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2947        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2948            .await?;
2949        let rpc_operations = MockIndexerRpcOperations::new();
2950        // ==> set mock expectations here
2951        let clonable_rpc_operations = ClonableMockOperations {
2952            //
2953            inner: Arc::new(rpc_operations),
2954        };
2955        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2956
2957        let channel = ChannelEntry::new(
2958            *SELF_CHAIN_ADDRESS,
2959            *COUNTERPARTY_CHAIN_ADDRESS,
2960            primitive_types::U256::from((1u128 << 96) - 1).into(),
2961            primitive_types::U256::zero(),
2962            ChannelStatus::Open,
2963            primitive_types::U256::one(),
2964        );
2965
2966        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2967        let next_ticket_index = ticket_index + 1;
2968
2969        db.upsert_channel(None, channel).await?;
2970
2971        let ticket_redeemed_log = SerializableLog {
2972            address: handlers.addresses.channels,
2973            topics: vec![
2974                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2975                // TicketRedeemedFilter::signature().into(),
2976                H256::from_slice(channel.get_id().as_ref()).into(),
2977            ],
2978            data: Vec::from(next_ticket_index.to_be_bytes()),
2979            ..test_log()
2980        };
2981
2982        let event_type = db
2983            .begin_transaction()
2984            .await?
2985            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2986            .await?;
2987
2988        let channel = db
2989            .get_channel_by_id(None, &channel.get_id())
2990            .await?
2991            .context("a value should be present")?;
2992
2993        assert!(
2994            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2995            "must return update channel entry and no ticket"
2996        );
2997
2998        assert_eq!(
2999            channel.ticket_index, next_ticket_index,
3000            "channel entry must contain next ticket index"
3001        );
3002
3003        let outgoing_ticket_index = db
3004            .get_outgoing_ticket_index(channel.get_id())
3005            .await?
3006            .load(Ordering::Relaxed);
3007
3008        assert!(
3009            outgoing_ticket_index >= ticket_index.as_u64(),
3010            "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
3011        );
3012        assert_eq!(
3013            outgoing_ticket_index,
3014            next_ticket_index.as_u64(),
3015            "outgoing ticket index must be equal to next ticket index"
3016        );
3017        Ok(())
3018    }
3019
3020    #[tokio::test]
3021    async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
3022    {
3023        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3024        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
3025            .await?;
3026        let rpc_operations = MockIndexerRpcOperations::new();
3027        // ==> set mock expectations here
3028        let clonable_rpc_operations = ClonableMockOperations {
3029            //
3030            inner: Arc::new(rpc_operations),
3031        };
3032        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3033
3034        let channel = ChannelEntry::new(
3035            *COUNTERPARTY_CHAIN_ADDRESS,
3036            *SELF_CHAIN_ADDRESS,
3037            primitive_types::U256::from((1u128 << 96) - 1).into(),
3038            primitive_types::U256::zero(),
3039            ChannelStatus::Open,
3040            primitive_types::U256::one(),
3041        );
3042
3043        db.upsert_channel(None, channel).await?;
3044
3045        let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3046
3047        let ticket_redeemed_log = SerializableLog {
3048            address: handlers.addresses.channels,
3049            topics: vec![
3050                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3051                // TicketRedeemedFilter::signature().into(),
3052                H256::from_slice(channel.get_id().as_ref()).into(),
3053            ],
3054            data: Vec::from(next_ticket_index.to_be_bytes()),
3055            ..test_log()
3056        };
3057
3058        let event_type = db
3059            .begin_transaction()
3060            .await?
3061            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3062            .await?;
3063
3064        let channel = db
3065            .get_channel_by_id(None, &channel.get_id())
3066            .await?
3067            .context("a value should be present")?;
3068
3069        assert!(
3070            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3071            "must return updated channel entry and no ticket"
3072        );
3073
3074        assert_eq!(
3075            channel.ticket_index, next_ticket_index,
3076            "channel entry must contain next ticket index"
3077        );
3078        Ok(())
3079    }
3080
3081    #[tokio::test]
3082    async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
3083        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3084        let rpc_operations = MockIndexerRpcOperations::new();
3085        // ==> set mock expectations here
3086        let clonable_rpc_operations = ClonableMockOperations {
3087            //
3088            inner: Arc::new(rpc_operations),
3089        };
3090        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3091
3092        let channel = ChannelEntry::new(
3093            Address::from(hopr_crypto_random::random_bytes()),
3094            Address::from(hopr_crypto_random::random_bytes()),
3095            primitive_types::U256::from((1u128 << 96) - 1).into(),
3096            primitive_types::U256::zero(),
3097            ChannelStatus::Open,
3098            primitive_types::U256::one(),
3099        );
3100
3101        db.upsert_channel(None, channel).await?;
3102
3103        let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3104
3105        let ticket_redeemed_log = SerializableLog {
3106            address: handlers.addresses.channels,
3107            topics: vec![
3108                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3109                // TicketRedeemedFilter::signature().into(),
3110                H256::from_slice(channel.get_id().as_ref()).into(),
3111            ],
3112            data: Vec::from(next_ticket_index.to_be_bytes()),
3113            ..test_log()
3114        };
3115
3116        let event_type = db
3117            .begin_transaction()
3118            .await?
3119            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3120            .await?;
3121
3122        let channel = db
3123            .get_channel_by_id(None, &channel.get_id())
3124            .await?
3125            .context("a value should be present")?;
3126
3127        assert!(
3128            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3129            "must return updated channel entry and no ticket"
3130        );
3131
3132        assert_eq!(
3133            channel.ticket_index, next_ticket_index,
3134            "channel entry must contain next ticket index"
3135        );
3136        Ok(())
3137    }
3138
3139    #[tokio::test]
3140    async fn on_channel_closure_initiated() -> anyhow::Result<()> {
3141        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3142        let rpc_operations = MockIndexerRpcOperations::new();
3143        // ==> set mock expectations here
3144        let clonable_rpc_operations = ClonableMockOperations {
3145            //
3146            inner: Arc::new(rpc_operations),
3147        };
3148        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3149
3150        let channel = ChannelEntry::new(
3151            *SELF_CHAIN_ADDRESS,
3152            *COUNTERPARTY_CHAIN_ADDRESS,
3153            primitive_types::U256::from((1u128 << 96) - 1).into(),
3154            primitive_types::U256::zero(),
3155            ChannelStatus::Open,
3156            primitive_types::U256::one(),
3157        );
3158
3159        db.upsert_channel(None, channel).await?;
3160
3161        let timestamp = SystemTime::now();
3162
3163        let encoded_data = (U256::from(timestamp.as_unix_timestamp().as_secs())).abi_encode();
3164
3165        let closure_initiated_log = SerializableLog {
3166            address: handlers.addresses.channels,
3167            topics: vec![
3168                hopr_bindings::hoprchannels::HoprChannels::OutgoingChannelClosureInitiated::SIGNATURE_HASH.into(),
3169                // OutgoingChannelClosureInitiatedFilter::signature().into(),
3170                H256::from_slice(channel.get_id().as_ref()).into(),
3171            ],
3172            data: encoded_data,
3173            // data: Vec::from(U256::from(timestamp.as_unix_timestamp().as_secs()).to_be_bytes()).into(),
3174            ..test_log()
3175        };
3176
3177        let event_type = db
3178            .begin_transaction()
3179            .await?
3180            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log, true).await }))
3181            .await?;
3182
3183        let channel = db
3184            .get_channel_by_id(None, &channel.get_id())
3185            .await?
3186            .context("a value should be present")?;
3187
3188        assert!(
3189            matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
3190            "must return updated channel entry"
3191        );
3192
3193        assert_eq!(
3194            channel.status,
3195            ChannelStatus::PendingToClose(timestamp),
3196            "channel status must match"
3197        );
3198        Ok(())
3199    }
3200
3201    #[tokio::test]
3202    async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
3203        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3204        let rpc_operations = MockIndexerRpcOperations::new();
3205        // ==> set mock expectations here
3206        let clonable_rpc_operations = ClonableMockOperations {
3207            //
3208            inner: Arc::new(rpc_operations),
3209        };
3210        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3211
3212        let encoded_data = ().abi_encode();
3213
3214        let safe_registered_log = SerializableLog {
3215            address: handlers.addresses.safe_registry,
3216            topics: vec![
3217                hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::RegisteredNodeSafe::SIGNATURE_HASH.into(),
3218                // RegisteredNodeSafeFilter::signature().into(),
3219                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3220                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3221            ],
3222            data: encoded_data,
3223            ..test_log()
3224        };
3225
3226        let event_type = db
3227            .begin_transaction()
3228            .await?
3229            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3230            .await?;
3231
3232        assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
3233
3234        // Nothing to check in the DB here, since we do not track this
3235        Ok(())
3236    }
3237
3238    #[tokio::test]
3239    async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
3240        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3241        let rpc_operations = MockIndexerRpcOperations::new();
3242        // ==> set mock expectations here
3243        let clonable_rpc_operations = ClonableMockOperations {
3244            //
3245            inner: Arc::new(rpc_operations),
3246        };
3247        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3248
3249        // Nothing to write to the DB here, since we do not track this
3250
3251        let encoded_data = ().abi_encode();
3252
3253        let safe_registered_log = SerializableLog {
3254            address: handlers.addresses.safe_registry,
3255            topics: vec![
3256                hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::DergisteredNodeSafe::SIGNATURE_HASH.into(),
3257                // DergisteredNodeSafeFilter::signature().into(),
3258                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3259                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3260            ],
3261            data: encoded_data,
3262            ..test_log()
3263        };
3264
3265        let event_type = db
3266            .begin_transaction()
3267            .await?
3268            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3269            .await?;
3270
3271        assert!(
3272            event_type.is_none(),
3273            "there's no associated chain event type with safe deregistration"
3274        );
3275
3276        // Nothing to check in the DB here, since we do not track this
3277        Ok(())
3278    }
3279
3280    #[tokio::test]
3281    async fn ticket_price_update() -> anyhow::Result<()> {
3282        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3283        let rpc_operations = MockIndexerRpcOperations::new();
3284        // ==> set mock expectations here
3285        let clonable_rpc_operations = ClonableMockOperations {
3286            //
3287            inner: Arc::new(rpc_operations),
3288        };
3289        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3290
3291        let encoded_data = (U256::from(1u64), U256::from(123u64)).abi_encode();
3292
3293        let price_change_log = SerializableLog {
3294            address: handlers.addresses.price_oracle,
3295            topics: vec![
3296                hopr_bindings::hoprticketpriceoracle::HoprTicketPriceOracle::TicketPriceUpdated::SIGNATURE_HASH.into(),
3297                // TicketPriceUpdatedFilter::signature().into()
3298            ],
3299            data: encoded_data,
3300            // data: encode(&[Token::Uint(EthU256::from(1u64)), Token::Uint(EthU256::from(123u64))]).into(),
3301            ..test_log()
3302        };
3303
3304        assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
3305
3306        let event_type = db
3307            .begin_transaction()
3308            .await?
3309            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log, true).await }))
3310            .await?;
3311
3312        assert!(
3313            event_type.is_none(),
3314            "there's no associated chain event type with price oracle"
3315        );
3316
3317        assert_eq!(
3318            db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
3319            Some(primitive_types::U256::from(123u64))
3320        );
3321        Ok(())
3322    }
3323
3324    #[tokio::test]
3325    async fn minimum_win_prob_update() -> anyhow::Result<()> {
3326        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3327        let rpc_operations = MockIndexerRpcOperations::new();
3328        // ==> set mock expectations here
3329        let clonable_rpc_operations = ClonableMockOperations {
3330            //
3331            inner: Arc::new(rpc_operations),
3332        };
3333        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3334
3335        let encoded_data = (
3336            U256::from_be_slice(WinningProbability::ALWAYS.as_ref()),
3337            U256::from_be_slice(WinningProbability::try_from_f64(0.5)?.as_ref()),
3338        )
3339            .abi_encode();
3340
3341        let win_prob_change_log = SerializableLog {
3342            address: handlers.addresses.win_prob_oracle,
3343            topics: vec![
3344                hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into()],
3345            data: encoded_data,
3346            ..test_log()
3347        };
3348
3349        assert_eq!(
3350            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3351            1.0
3352        );
3353
3354        let event_type = db
3355            .begin_transaction()
3356            .await?
3357            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3358            .await?;
3359
3360        assert!(
3361            event_type.is_none(),
3362            "there's no associated chain event type with winning probability change"
3363        );
3364
3365        assert_eq!(
3366            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3367            0.5
3368        );
3369        Ok(())
3370    }
3371
3372    #[tokio::test]
3373    async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
3374        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3375        db.set_minimum_incoming_ticket_win_prob(None, 0.1.try_into()?).await?;
3376
3377        let new_minimum = 0.5;
3378        let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
3379
3380        let channel_1 = ChannelEntry::new(
3381            *COUNTERPARTY_CHAIN_ADDRESS,
3382            *SELF_CHAIN_ADDRESS,
3383            primitive_types::U256::from((1u128 << 96) - 1).into(),
3384            3_u32.into(),
3385            ChannelStatus::Open,
3386            primitive_types::U256::one(),
3387        );
3388
3389        db.upsert_channel(None, channel_1).await?;
3390
3391        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
3392        db.upsert_ticket(None, ticket).await?;
3393
3394        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
3395        db.upsert_ticket(None, ticket).await?;
3396
3397        let tickets = db.get_tickets((&channel_1).into()).await?;
3398        assert_eq!(tickets.len(), 2);
3399
3400        // ---
3401
3402        let other_counterparty = ChainKeypair::random();
3403        let channel_2 = ChannelEntry::new(
3404            other_counterparty.public().to_address(),
3405            *SELF_CHAIN_ADDRESS,
3406            primitive_types::U256::from((1u128 << 96) - 1).into(),
3407            3_u32.into(),
3408            ChannelStatus::Open,
3409            primitive_types::U256::one(),
3410        );
3411
3412        db.upsert_channel(None, channel_2).await?;
3413
3414        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
3415        db.upsert_ticket(None, ticket).await?;
3416
3417        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
3418        db.upsert_ticket(None, ticket).await?;
3419
3420        let tickets = db.get_tickets((&channel_2).into()).await?;
3421        assert_eq!(tickets.len(), 2);
3422
3423        let stats = db.get_ticket_statistics(None).await?;
3424        assert_eq!(HoprBalance::zero(), stats.rejected_value);
3425
3426        let rpc_operations = MockIndexerRpcOperations::new();
3427        // ==> set mock expectations here
3428        let clonable_rpc_operations = ClonableMockOperations {
3429            //
3430            inner: Arc::new(rpc_operations),
3431        };
3432        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3433
3434        let encoded_data = (
3435            U256::from_be_slice(WinningProbability::try_from(0.1)?.as_ref()),
3436            U256::from_be_slice(WinningProbability::try_from(new_minimum)?.as_ref()),
3437        )
3438            .abi_encode();
3439
3440        let win_prob_change_log = SerializableLog {
3441            address: handlers.addresses.win_prob_oracle,
3442            topics: vec![
3443                hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into(),
3444            ],
3445            data: encoded_data,
3446            ..test_log()
3447        };
3448
3449        let event_type = db
3450            .begin_transaction()
3451            .await?
3452            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3453            .await?;
3454
3455        assert!(
3456            event_type.is_none(),
3457            "there's no associated chain event type with winning probability change"
3458        );
3459
3460        assert_eq!(
3461            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3462            new_minimum
3463        );
3464
3465        let tickets = db.get_tickets((&channel_1).into()).await?;
3466        assert_eq!(tickets.len(), 1);
3467
3468        let tickets = db.get_tickets((&channel_2).into()).await?;
3469        assert_eq!(tickets.len(), 0);
3470
3471        let stats = db.get_ticket_statistics(None).await?;
3472        let rejected_value: primitive_types::U256 = ticket_win_probs
3473            .iter()
3474            .filter(|p| **p < new_minimum)
3475            .map(|p| {
3476                primitive_types::U256::from(PRICE_PER_PACKET)
3477                    .div_f64(*p)
3478                    .expect("must divide")
3479            })
3480            .reduce(|a, b| a + b)
3481            .ok_or(anyhow!("must sum"))?;
3482
3483        assert_eq!(HoprBalance::from(rejected_value), stats.rejected_value);
3484
3485        Ok(())
3486    }
3487}