hopr_chain_indexer/
handlers.rs

1use std::{
2    cmp::Ordering,
3    fmt::{Debug, Formatter},
4    ops::Add,
5    sync::Arc,
6    time::{Duration, SystemTime},
7};
8
9use alloy::{primitives::B256, sol_types::SolEventInterface};
10use async_trait::async_trait;
11use hopr_bindings::{
12    hoprannouncements::HoprAnnouncements::HoprAnnouncementsEvents, hoprchannels::HoprChannels::HoprChannelsEvents,
13    hoprnetworkregistry::HoprNetworkRegistry::HoprNetworkRegistryEvents,
14    hoprnodemanagementmodule::HoprNodeManagementModule::HoprNodeManagementModuleEvents,
15    hoprnodesaferegistry::HoprNodeSafeRegistry::HoprNodeSafeRegistryEvents,
16    hoprticketpriceoracle::HoprTicketPriceOracle::HoprTicketPriceOracleEvents, hoprtoken::HoprToken::HoprTokenEvents,
17    hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::HoprWinningProbabilityOracleEvents,
18};
19use hopr_chain_rpc::{HoprIndexerRpcOperations, Log};
20use hopr_chain_types::{
21    ContractAddresses,
22    chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent},
23};
24use hopr_crypto_types::prelude::*;
25use hopr_db_sql::{
26    HoprDbAllOperations, OpenTransaction,
27    api::{info::DomainSeparator, tickets::TicketSelector},
28    errors::DbSqlError,
29    prelude::TicketMarker,
30};
31use hopr_internal_types::prelude::*;
32use hopr_primitive_types::prelude::*;
33use tracing::{debug, error, info, trace, warn};
34
35use crate::errors::{CoreEthereumIndexerError, Result};
36
// The metric below exists only when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Multi-counter of processed contract logs, keyed by the single `contract` label.
    // Construction failure panics on the first access of the static.
    static ref METRIC_INDEXER_LOG_COUNTERS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_indexer_contract_log_count",
        "Counts of different HOPR contract logs processed by the Indexer",
        &["contract"],
    )
    .unwrap();
}
46
47/// Event handling an object for on-chain operations
48///
49/// Once an on-chain operation is recorded by the [crate::block::Indexer], it is pre-processed
50/// and passed on to this object that handles event-specific actions for each on-chain operation.
51#[derive(Clone)]
52pub struct ContractEventHandlers<T, Db> {
53    /// channels, announcements, network_registry, token: contract addresses
54    /// whose event we process
55    addresses: Arc<ContractAddresses>,
56    /// Safe on-chain address which we are monitoring
57    safe_address: Address,
58    /// own address, aka message sender
59    chain_key: ChainKeypair, // TODO: store only address here once Ticket have TryFrom DB
60    /// callbacks to inform other modules
61    db: Db,
62    /// rpc operations to interact with the chain
63    rpc_operations: T,
64}
65
66impl<T, Db> Debug for ContractEventHandlers<T, Db> {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("ContractEventHandler")
69            .field("addresses", &self.addresses)
70            .field("safe_address", &self.safe_address)
71            .field("chain_key", &self.chain_key)
72            .finish_non_exhaustive()
73    }
74}
75
76impl<T, Db> ContractEventHandlers<T, Db>
77where
78    T: HoprIndexerRpcOperations + Clone + Send + 'static,
79    Db: HoprDbAllOperations + Clone,
80{
81    /// Creates a new instance of contract event handlers with RPC operations support.
82    ///
83    /// This constructor initializes the event handlers with all necessary dependencies
84    /// for processing blockchain events and making direct RPC calls for fresh state data.
85    ///
86    /// # Type Parameters
87    /// * `T` - Type implementing `HoprIndexerRpcOperations` for blockchain queries
88    ///
89    /// # Arguments
90    /// * `addresses` - Contract addresses configuration
91    /// * `safe_address` - The node's safe address for filtering relevant events
92    /// * `chain_key` - Cryptographic key for chain operations
93    /// * `db` - Database connection for persistent storage
94    /// * `rpc_operations` - RPC interface for direct blockchain queries
95    ///
96    /// # Returns
97    /// * `Self` - New instance of `ContractEventHandlers`
98    pub fn new(
99        addresses: ContractAddresses,
100        safe_address: Address,
101        chain_key: ChainKeypair,
102        db: Db,
103        rpc_operations: T,
104    ) -> Self {
105        Self {
106            addresses: Arc::new(addresses),
107            safe_address,
108            chain_key,
109            db,
110            rpc_operations,
111        }
112    }
113
114    async fn on_announcement_event(
115        &self,
116        tx: &OpenTransaction,
117        event: HoprAnnouncementsEvents,
118        block_number: u32,
119        _is_synced: bool,
120    ) -> Result<Option<ChainEventType>> {
121        #[cfg(all(feature = "prometheus", not(test)))]
122        METRIC_INDEXER_LOG_COUNTERS.increment(&["announcements"]);
123
124        match event {
125            HoprAnnouncementsEvents::AddressAnnouncement(address_announcement) => {
126                debug!(
127                    multiaddress = &address_announcement.baseMultiaddr,
128                    address = &address_announcement.node.to_string(),
129                    "on_announcement_event: AddressAnnouncement",
130                );
131                // safeguard against empty multiaddrs, skip
132                if address_announcement.baseMultiaddr.is_empty() {
133                    warn!(
134                        address = %address_announcement.node,
135                        "encountered empty multiaddress announcement",
136                    );
137                    return Ok(None);
138                }
139                let node_address: Address = address_announcement.node.into();
140
141                return match self
142                    .db
143                    .insert_announcement(
144                        Some(tx),
145                        node_address,
146                        address_announcement.baseMultiaddr.parse()?,
147                        block_number,
148                    )
149                    .await
150                {
151                    Ok(account) => Ok(Some(ChainEventType::Announcement {
152                        peer: account.public_key.into(),
153                        address: account.chain_addr,
154                        multiaddresses: vec![account.get_multiaddr().expect("not must contain multiaddr")],
155                    })),
156                    Err(DbSqlError::MissingAccount) => Err(CoreEthereumIndexerError::AnnounceBeforeKeyBinding),
157                    Err(e) => Err(e.into()),
158                };
159            }
160            HoprAnnouncementsEvents::KeyBinding(key_binding) => {
161                debug!(
162                    address = %key_binding.chain_key,
163                    public_key = %key_binding.ed25519_pub_key,
164                    "on_announcement_event: KeyBinding",
165                );
166                match KeyBinding::from_parts(
167                    key_binding.chain_key.into(),
168                    key_binding.ed25519_pub_key.0.try_into()?,
169                    OffchainSignature::try_from((key_binding.ed25519_sig_0.0, key_binding.ed25519_sig_1.0))?,
170                ) {
171                    Ok(binding) => {
172                        match self
173                            .db
174                            .insert_account(
175                                Some(tx),
176                                AccountEntry {
177                                    public_key: binding.packet_key,
178                                    chain_addr: binding.chain_key,
179                                    entry_type: AccountType::NotAnnounced,
180                                    published_at: block_number,
181                                },
182                            )
183                            .await
184                        {
185                            Ok(_) => (),
186                            Err(err) => {
187                                // We handle these errors gracefully and don't want the indexer to crash,
188                                // because anybody could write faulty entries into the announcement contract.
189                                error!(%err, "failed to store announcement key binding")
190                            }
191                        }
192                    }
193                    Err(e) => {
194                        warn!(
195                            address = ?key_binding.chain_key,
196                            error = %e,
197                            "Filtering announcement with invalid signature",
198
199                        )
200                    }
201                }
202            }
203            HoprAnnouncementsEvents::RevokeAnnouncement(revocation) => {
204                let node_address: Address = revocation.node.into();
205                match self.db.delete_all_announcements(Some(tx), node_address).await {
206                    Err(DbSqlError::MissingAccount) => {
207                        return Err(CoreEthereumIndexerError::RevocationBeforeKeyBinding);
208                    }
209                    Err(e) => return Err(e.into()),
210                    _ => {}
211                }
212            }
213        };
214
215        Ok(None)
216    }
217
218    async fn on_channel_event(
219        &self,
220        tx: &OpenTransaction,
221        event: HoprChannelsEvents,
222        is_synced: bool,
223    ) -> Result<Option<ChainEventType>> {
224        #[cfg(all(feature = "prometheus", not(test)))]
225        METRIC_INDEXER_LOG_COUNTERS.increment(&["channels"]);
226
227        match event {
228            HoprChannelsEvents::ChannelBalanceDecreased(balance_decreased) => {
229                let channel_id = balance_decreased.channelId.0.into();
230
231                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
232                    Ok(channel) => channel,
233                    Err(e) => {
234                        error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_decreased_event");
235                        return Err(e.into());
236                    }
237                };
238
239                trace!(
240                    %channel_id,
241                    is_channel = maybe_channel.is_some(),
242                    "on_channel_balance_decreased_event",
243                );
244
245                if let Some(channel_edits) = maybe_channel {
246                    let new_balance = HoprBalance::from_be_bytes(balance_decreased.newBalance.to_be_bytes::<12>());
247                    let diff = channel_edits.entry().balance - new_balance;
248
249                    let updated_channel = self
250                        .db
251                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
252                        .await?
253                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
254                            "channel balance decreased event for channel {channel_id} did not return an updated \
255                             channel"
256                        )))?;
257
258                    if is_synced
259                        && (updated_channel.source == self.chain_key.public().to_address()
260                            || updated_channel.destination == self.chain_key.public().to_address())
261                    {
262                        info!("updating safe balance from chain after channel balance decreased event");
263                        match self.rpc_operations.get_hopr_balance(self.safe_address).await {
264                            Ok(balance) => {
265                                self.db.set_safe_hopr_balance(Some(tx), balance).await?;
266                            }
267                            Err(error) => {
268                                error!(%error, "error getting safe balance from chain after channel balance decreased event");
269                            }
270                        }
271                    }
272
273                    Ok(Some(ChainEventType::ChannelBalanceDecreased(updated_channel, diff)))
274                } else {
275                    error!(%channel_id, "observed balance decreased event for a channel that does not exist");
276                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
277                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
278                }
279            }
280            HoprChannelsEvents::ChannelBalanceIncreased(balance_increased) => {
281                let channel_id = balance_increased.channelId.0.into();
282
283                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
284                    Ok(channel) => channel,
285                    Err(e) => {
286                        error!(%channel_id, %e, "failed to begin channel update on on_channel_balance_increased_event");
287                        return Err(e.into());
288                    }
289                };
290
291                trace!(
292                    %channel_id,
293                    is_channel = maybe_channel.is_some(),
294                    "on_channel_balance_increased_event",
295                );
296
297                if let Some(channel_edits) = maybe_channel {
298                    let new_balance = HoprBalance::from_be_bytes(balance_increased.newBalance.to_be_bytes::<12>());
299                    let diff = new_balance - channel_edits.entry().balance;
300
301                    let updated_channel = self
302                        .db
303                        .finish_channel_update(tx.into(), channel_edits.change_balance(new_balance))
304                        .await?
305                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
306                            "channel balance increased event for channel {channel_id} did not return an updated \
307                             channel"
308                        )))?;
309
310                    if updated_channel.source == self.chain_key.public().to_address() && is_synced {
311                        info!("updating safe balance from chain after channel balance increased event");
312                        match self.rpc_operations.get_hopr_balance(self.safe_address).await {
313                            Ok(balance) => {
314                                self.db.set_safe_hopr_balance(Some(tx), balance).await?;
315                            }
316                            Err(error) => {
317                                error!(%error, "error getting safe balance from chain after channel balance increased event");
318                            }
319                        }
320                        info!("updating safe allowance from chain after channel balance increased event");
321                        match self
322                            .rpc_operations
323                            .get_hopr_allowance(self.safe_address, self.addresses.channels)
324                            .await
325                        {
326                            Ok(allowance) => {
327                                self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
328                            }
329                            Err(error) => {
330                                error!(%error, "error getting safe allowance from chain after channel balance increased event");
331                            }
332                        }
333                    }
334
335                    Ok(Some(ChainEventType::ChannelBalanceIncreased(updated_channel, diff)))
336                } else {
337                    error!(%channel_id, "observed balance increased event for a channel that does not exist");
338                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
339                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
340                }
341            }
342            HoprChannelsEvents::ChannelClosed(channel_closed) => {
343                let channel_id = channel_closed.channelId.0.into();
344
345                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
346                    Ok(channel) => channel,
347                    Err(e) => {
348                        error!(%channel_id, %e, "failed to begin channel update on on_channel_closed_event");
349                        return Err(e.into());
350                    }
351                };
352
353                trace!(
354                    %channel_id,
355                    is_channel = maybe_channel.is_some(),
356                    "on_channel_closed_event",
357                );
358
359                if let Some(channel_edits) = maybe_channel {
360                    let channel_id = channel_edits.entry().get_id();
361                    let orientation = channel_edits.entry().orientation(&self.chain_key.public().to_address());
362
363                    // If the channel is our own (incoming or outgoing) reset its fields
364                    // and change its state to Closed.
365                    let updated_channel = if let Some((direction, _)) = orientation {
366                        // Set all channel fields like we do on-chain on close
367                        let channel_edits = channel_edits
368                            .change_status(ChannelStatus::Closed)
369                            .change_balance(HoprBalance::zero())
370                            .change_ticket_index(0);
371
372                        let updated_channel = self.db.finish_channel_update(tx.into(), channel_edits).await?.ok_or(
373                            CoreEthereumIndexerError::ProcessError(format!(
374                                "channel closed event for channel {channel_id} did not return an updated channel",
375                            )),
376                        )?;
377
378                        // Perform additional tasks based on the channel's direction
379                        match direction {
380                            ChannelDirection::Incoming => {
381                                // On incoming channel, mark all unredeemed tickets as neglected
382                                self.db
383                                    .mark_tickets_as(updated_channel.into(), TicketMarker::Neglected)
384                                    .await?;
385                            }
386                            ChannelDirection::Outgoing => {
387                                // On outgoing channels, reset the current_ticket_index to zero
388                                self.db.reset_outgoing_ticket_index(channel_id).await?;
389                            }
390                        }
391                        updated_channel
392                    } else {
393                        // Closed channels that are not our own can be safely removed from the database
394                        let updated_channel = self
395                            .db
396                            .finish_channel_update(tx.into(), channel_edits.delete())
397                            .await?
398                            .ok_or(CoreEthereumIndexerError::ProcessError(format!(
399                                "channel closed event for channel {channel_id} did not return an updated channel",
400                            )))?;
401
402                        debug!(%channel_id, "foreign closed closed channel was deleted");
403                        updated_channel
404                    };
405
406                    Ok(Some(ChainEventType::ChannelClosed(updated_channel)))
407                } else {
408                    error!(%channel_id, "observed closure finalization event for a channel that does not exist.");
409                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
410                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
411                }
412            }
413            HoprChannelsEvents::ChannelOpened(channel_opened) => {
414                let source: Address = channel_opened.source.into();
415                let destination: Address = channel_opened.destination.into();
416                let channel_id = generate_channel_id(&source, &destination);
417
418                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
419                    Ok(channel) => channel,
420                    Err(e) => {
421                        error!(%source, %destination, %channel_id, %e, "failed to begin channel update on on_channel_opened_event");
422                        return Err(e.into());
423                    }
424                };
425
426                let channel = if let Some(channel_edits) = maybe_channel {
427                    // Check that we're not receiving the Open event without the channel being Close prior, or that the
428                    // channel is not yet corrupted
429
430                    if channel_edits.entry().status != ChannelStatus::Closed {
431                        warn!(%source, %destination, %channel_id, "received Open event for a channel that is not Closed, marking it as corrupted");
432
433                        self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
434                        self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
435
436                        return Ok(None);
437                    }
438
439                    trace!(%source, %destination, %channel_id, "on_channel_reopened_event");
440
441                    let current_epoch = channel_edits.entry().channel_epoch;
442
443                    // cleanup tickets from previous epochs on channel re-opening
444                    if source == self.chain_key.public().to_address()
445                        || destination == self.chain_key.public().to_address()
446                    {
447                        self.db
448                            .mark_tickets_as(TicketSelector::new(channel_id, current_epoch), TicketMarker::Neglected)
449                            .await?;
450
451                        self.db.reset_outgoing_ticket_index(channel_id).await?;
452                    }
453
454                    // set all channel fields like we do on-chain on close
455                    self.db
456                        .finish_channel_update(
457                            tx.into(),
458                            channel_edits
459                                .change_ticket_index(0_u32)
460                                .change_epoch(current_epoch.add(1))
461                                .change_status(ChannelStatus::Open),
462                        )
463                        .await?
464                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
465                            "channel opened event for channel {channel_id} did not return an updated channel",
466                        )))?
467                } else {
468                    trace!(%source, %destination, %channel_id, "on_channel_opened_event");
469
470                    let new_channel = ChannelEntry::new(
471                        source,
472                        destination,
473                        0_u32.into(),
474                        0_u32.into(),
475                        ChannelStatus::Open,
476                        1_u32.into(),
477                    );
478
479                    self.db.upsert_channel(tx.into(), new_channel).await?;
480                    new_channel
481                };
482
483                Ok(Some(ChainEventType::ChannelOpened(channel)))
484            }
485            HoprChannelsEvents::TicketRedeemed(ticket_redeemed) => {
486                let channel_id = ticket_redeemed.channelId.0.into();
487
488                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
489                    Ok(channel) => channel,
490                    Err(e) => {
491                        error!(%channel_id, %e, "failed to begin channel update on on_ticket_redeemed_event");
492                        return Err(e.into());
493                    }
494                };
495
496                if let Some(channel_edits) = maybe_channel {
497                    let ack_ticket = match channel_edits.entry().direction(&self.chain_key.public().to_address()) {
498                        // For channels where destination is us, it means that our ticket
499                        // has been redeemed, so mark it in the DB as redeemed
500                        Some(ChannelDirection::Incoming) => {
501                            // Filter all BeingRedeemed tickets in this channel and its current epoch
502                            let mut matching_tickets = self
503                                .db
504                                .get_tickets(
505                                    TicketSelector::from(channel_edits.entry())
506                                        .with_state(AcknowledgedTicketStatus::BeingRedeemed),
507                                )
508                                .await?
509                                .into_iter()
510                                .filter(|ticket| {
511                                    // The ticket that has been redeemed at this point has: index + index_offset - 1 ==
512                                    // new_ticket_index - 1 Since unaggregated
513                                    // tickets have index_offset = 1, for the unagg case this leads to: index ==
514                                    // new_ticket_index - 1
515                                    let ticket_idx = ticket.verified_ticket().index;
516                                    let ticket_off = ticket.verified_ticket().index_offset as u64;
517
518                                    ticket_idx + ticket_off == ticket_redeemed.newTicketIndex.to::<u64>()
519                                })
520                                .collect::<Vec<_>>();
521
522                            match matching_tickets.len().cmp(&1) {
523                                Ordering::Equal => {
524                                    let ack_ticket = matching_tickets.pop().unwrap();
525
526                                    self.db
527                                        .mark_tickets_as((&ack_ticket).into(), TicketMarker::Redeemed)
528                                        .await?;
529                                    info!(%ack_ticket, "ticket marked as redeemed");
530                                    Some(ack_ticket)
531                                }
532                                Ordering::Less => {
533                                    error!(
534                                        idx = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
535                                        entry = %channel_edits.entry(),
536                                        "could not find acknowledged 'BeingRedeemed' ticket",
537                                    );
538                                    // This is not an error, because the ticket might've become neglected before
539                                    // the ticket redemption could finish
540                                    None
541                                }
542                                Ordering::Greater => {
543                                    error!(
544                                        count = matching_tickets.len(),
545                                        index = %ticket_redeemed.newTicketIndex.to::<u64>() - 1,
546                                        entry = %channel_edits.entry(),
547                                        "found tickets matching 'BeingRedeemed'",
548                                    );
549
550                                    let entry_str = channel_edits.entry().to_string();
551
552                                    self.db.finish_channel_update(tx.into(), channel_edits.delete()).await?;
553                                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
554
555                                    return Err(CoreEthereumIndexerError::ProcessError(format!(
556                                        "multiple tickets matching idx {} found in {}",
557                                        ticket_redeemed.newTicketIndex.to::<u64>() - 1,
558                                        entry_str
559                                    )));
560                                }
561                            }
562                        }
563                        // For the channel where the source is us, it means a ticket that we
564                        // issue has been redeemed.
565                        // So we just need to be sure our outgoing ticket
566                        // index value in the cache is at least the index of the redeemed ticket
567                        Some(ChannelDirection::Outgoing) => {
568                            // We need to ensure the outgoing ticket index is at least this new value
569                            debug!(channel = %channel_edits.entry(), "observed redeem event on an outgoing channel");
570                            self.db
571                                .compare_and_set_outgoing_ticket_index(
572                                    channel_edits.entry().get_id(),
573                                    ticket_redeemed.newTicketIndex.to::<u64>(),
574                                )
575                                .await?;
576                            None
577                        }
578                        // For a channel where neither source nor destination is us, we don't care
579                        None => {
580                            // Not our redeem event
581                            debug!(channel = %channel_edits.entry(), "observed redeem event on a foreign channel");
582                            None
583                        }
584                    };
585
586                    // Update the ticket index on the Channel entry and get the updated model
587                    let channel = self
588                        .db
589                        .finish_channel_update(
590                            tx.into(),
591                            channel_edits.change_ticket_index(U256::from_be_bytes(
592                                ticket_redeemed.newTicketIndex.to_be_bytes::<6>(),
593                            )),
594                        )
595                        .await?
596                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
597                            "ticket redeemed event for channel {channel_id} did not return an updated channel",
598                        )))?;
599
600                    // Neglect all the tickets in this channel
601                    // which have a lower ticket index than `ticket_redeemed.new_ticket_index`
602                    self.db
603                        .mark_tickets_as(
604                            TicketSelector::from(&channel)
605                                .with_index_range(..ticket_redeemed.newTicketIndex.to::<u64>()),
606                            TicketMarker::Neglected,
607                        )
608                        .await?;
609
610                    Ok(Some(ChainEventType::TicketRedeemed(channel, ack_ticket)))
611                } else {
612                    error!(%channel_id, "observed ticket redeem on a channel that we don't have in the DB");
613                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
614                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
615                }
616            }
617            HoprChannelsEvents::OutgoingChannelClosureInitiated(closure_initiated) => {
618                let channel_id = closure_initiated.channelId.0.into();
619
620                let maybe_channel = match self.db.begin_channel_update(tx.into(), &channel_id).await {
621                    Ok(channel) => channel,
622                    Err(e) => {
623                        error!(%channel_id, %e, "failed to begin channel update on on_outgoing_channel_closure_initiated_event");
624                        return Err(e.into());
625                    }
626                };
627
628                let closure_time: u32 = closure_initiated.closureTime;
629                if let Some(channel_edits) = maybe_channel {
630                    let new_status = ChannelStatus::PendingToClose(
631                        SystemTime::UNIX_EPOCH.add(Duration::from_secs(closure_time.into())),
632                    );
633
634                    let channel = self
635                        .db
636                        .finish_channel_update(tx.into(), channel_edits.change_status(new_status))
637                        .await?
638                        .ok_or(CoreEthereumIndexerError::ProcessError(format!(
639                            "channel closure initiation event for channel {channel_id} did not return an updated \
640                             channel",
641                        )))?;
642
643                    Ok(Some(ChainEventType::ChannelClosureInitiated(channel)))
644                } else {
645                    error!(%channel_id, "observed channel closure initiation on a channel that we don't have in the DB");
646                    self.db.upsert_corrupted_channel(tx.into(), channel_id).await?;
647                    Err(CoreEthereumIndexerError::ChannelDoesNotExist)
648                }
649            }
650            HoprChannelsEvents::DomainSeparatorUpdated(domain_separator_updated) => {
651                self.db
652                    .set_domain_separator(
653                        Some(tx),
654                        DomainSeparator::Channel,
655                        domain_separator_updated.domainSeparator.0.into(),
656                    )
657                    .await?;
658
659                Ok(None)
660            }
661            HoprChannelsEvents::LedgerDomainSeparatorUpdated(ledger_domain_separator_updated) => {
662                self.db
663                    .set_domain_separator(
664                        Some(tx),
665                        DomainSeparator::Ledger,
666                        ledger_domain_separator_updated.ledgerDomainSeparator.0.into(),
667                    )
668                    .await?;
669
670                Ok(None)
671            }
672        }
673    }
674
675    async fn on_token_event(
676        &self,
677        tx: &OpenTransaction,
678        event: HoprTokenEvents,
679        is_synced: bool,
680    ) -> Result<Option<ChainEventType>> {
681        #[cfg(all(feature = "prometheus", not(test)))]
682        METRIC_INDEXER_LOG_COUNTERS.increment(&["token"]);
683
684        match event {
685            HoprTokenEvents::Transfer(transferred) => {
686                let from: Address = transferred.from.into();
687                let to: Address = transferred.to.into();
688
689                trace!(
690                    safe_address = %&self.safe_address, %from, %to,
691                    "on_token_transfer_event"
692                );
693
694                if to.ne(&self.safe_address) && from.ne(&self.safe_address) {
695                    error!(
696                    safe_address = %&self.safe_address, %from, %to,
697                        "filter misconfiguration: transfer event not involving the safe");
698                    return Ok(None);
699                }
700
701                if is_synced {
702                    info!("updating safe balance from chain after transfer event");
703                    match self.rpc_operations.get_hopr_balance(self.safe_address).await {
704                        Ok(balance) => {
705                            self.db.set_safe_hopr_balance(Some(tx), balance).await?;
706                        }
707                        Err(error) => {
708                            error!(%error, "error getting safe balance from chain after transfer event");
709                        }
710                    }
711                    info!("updating safe allowance from chain after transfer event");
712                    match self
713                        .rpc_operations
714                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
715                        .await
716                    {
717                        Ok(allowance) => {
718                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
719                        }
720                        Err(error) => {
721                            error!(%error, "error getting safe allowance from chain after transfer event");
722                        }
723                    }
724                }
725            }
726            HoprTokenEvents::Approval(approved) => {
727                let owner: Address = approved.owner.into();
728                let spender: Address = approved.spender.into();
729
730                trace!(
731                    address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
732                    "on_token_approval_event",
733
734                );
735
736                if owner.ne(&self.safe_address) || spender.ne(&self.addresses.channels) {
737                    error!(
738                        address = %&self.safe_address, %owner, %spender, allowance = %approved.value,
739                        "filter misconfiguration: approval event not involving the safe and channels contract");
740                    return Ok(None);
741                }
742
743                // if approval is for tokens on Safe contract to be spent by HoprChannels
744                if is_synced {
745                    info!("updating safe allowance from chain after approval event");
746                    match self
747                        .rpc_operations
748                        .get_hopr_allowance(self.safe_address, self.addresses.channels)
749                        .await
750                    {
751                        Ok(allowance) => {
752                            self.db.set_safe_hopr_allowance(Some(tx), allowance).await?;
753                        }
754                        Err(error) => {
755                            error!(%error, "error getting safe allowance from chain after approval event");
756                        }
757                    }
758                }
759            }
760            _ => error!("Implement all the other filters for HoprTokenEvents"),
761        }
762
763        Ok(None)
764    }
765
766    async fn on_network_registry_event(
767        &self,
768        tx: &OpenTransaction,
769        event: HoprNetworkRegistryEvents,
770        _is_synced: bool,
771    ) -> Result<Option<ChainEventType>> {
772        #[cfg(all(feature = "prometheus", not(test)))]
773        METRIC_INDEXER_LOG_COUNTERS.increment(&["network_registry"]);
774
775        match event {
776            HoprNetworkRegistryEvents::DeregisteredByManager(deregistered) => {
777                debug!(
778                    node_address = %deregistered.nodeAddress,
779                    "on_network_registry_deregistered_by_manager_event",
780                );
781                let node_address: Address = deregistered.nodeAddress.into();
782                self.db
783                    .set_access_in_network_registry(Some(tx), node_address, false)
784                    .await?;
785
786                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
787                    node_address,
788                    NetworkRegistryStatus::Denied,
789                )));
790            }
791            HoprNetworkRegistryEvents::Deregistered(deregistered) => {
792                debug!(
793                    node_address = %deregistered.nodeAddress,
794                    "on_network_registry_deregistered_event",
795                );
796                let node_address: Address = deregistered.nodeAddress.into();
797                self.db
798                    .set_access_in_network_registry(Some(tx), node_address, false)
799                    .await?;
800
801                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
802                    node_address,
803                    NetworkRegistryStatus::Denied,
804                )));
805            }
806            HoprNetworkRegistryEvents::RegisteredByManager(registered) => {
807                let node_address: Address = registered.nodeAddress.into();
808                debug!(
809                    %node_address,
810                    "Node registered by manager, adding to the network registry",
811                );
812                self.db
813                    .set_access_in_network_registry(Some(tx), node_address, true)
814                    .await?;
815
816                if node_address == self.chain_key.public().to_address() {
817                    info!(
818                        "This node has been added to the registry, node activation process continues on: http://hub.hoprnet.org/."
819                    );
820                }
821
822                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
823                    node_address,
824                    NetworkRegistryStatus::Allowed,
825                )));
826            }
827            HoprNetworkRegistryEvents::Registered(registered) => {
828                let node_address: Address = registered.nodeAddress.into();
829                debug!(
830                    %node_address,
831                    "Node registered, adding to the network registry",
832                );
833                self.db
834                    .set_access_in_network_registry(Some(tx), node_address, true)
835                    .await?;
836
837                if node_address == self.chain_key.public().to_address() {
838                    info!(
839                        "This node has been added to the registry, node can now continue the node activation process on: http://hub.hoprnet.org/."
840                    );
841                }
842
843                return Ok(Some(ChainEventType::NetworkRegistryUpdate(
844                    node_address,
845                    NetworkRegistryStatus::Allowed,
846                )));
847            }
848            HoprNetworkRegistryEvents::EligibilityUpdated(eligibility_updated) => {
849                debug!(
850                    staking_account = %eligibility_updated.stakingAccount,
851                    eligibility = %eligibility_updated.eligibility,
852                    "Node eligibility updated, updating the safe eligibility",
853                );
854                let account: Address = eligibility_updated.stakingAccount.into();
855                self.db
856                    .set_safe_eligibility(Some(tx), account, eligibility_updated.eligibility)
857                    .await?;
858            }
859            HoprNetworkRegistryEvents::NetworkRegistryStatusUpdated(enabled) => {
860                debug!(enabled = enabled.isEnabled, "on_network_registry_status_updated_event",);
861                self.db
862                    .set_network_registry_enabled(Some(tx), enabled.isEnabled)
863                    .await?;
864            }
865            _ => {
866                debug!("on_network_registry_event - not implemented for event: {:?}", event);
867            } // Not important to at the moment
868        };
869
870        Ok(None)
871    }
872
873    async fn on_node_safe_registry_event(
874        &self,
875        tx: &OpenTransaction,
876        event: HoprNodeSafeRegistryEvents,
877        _is_synced: bool,
878    ) -> Result<Option<ChainEventType>> {
879        #[cfg(all(feature = "prometheus", not(test)))]
880        METRIC_INDEXER_LOG_COUNTERS.increment(&["safe_registry"]);
881
882        match event {
883            HoprNodeSafeRegistryEvents::RegisteredNodeSafe(registered) => {
884                if self.chain_key.public().to_address() == registered.nodeAddress.into() {
885                    info!(safe_address = %registered.safeAddress, "Node safe registered", );
886                    // NOTE: we don't store this state in the DB
887                    return Ok(Some(ChainEventType::NodeSafeRegistered(registered.safeAddress.into())));
888                }
889            }
890            HoprNodeSafeRegistryEvents::DergisteredNodeSafe(deregistered) => {
891                if self.chain_key.public().to_address() == deregistered.nodeAddress.into() {
892                    info!("Node safe unregistered");
893                    // NOTE: we don't store this state in the DB
894                }
895            }
896            HoprNodeSafeRegistryEvents::DomainSeparatorUpdated(domain_separator_updated) => {
897                self.db
898                    .set_domain_separator(
899                        Some(tx),
900                        DomainSeparator::SafeRegistry,
901                        domain_separator_updated.domainSeparator.0.into(),
902                    )
903                    .await?;
904            }
905        }
906
907        Ok(None)
908    }
909
910    async fn on_node_management_module_event(
911        &self,
912        _db: &OpenTransaction,
913        _event: HoprNodeManagementModuleEvents,
914        _is_synced: bool,
915    ) -> Result<Option<ChainEventType>> {
916        #[cfg(all(feature = "prometheus", not(test)))]
917        METRIC_INDEXER_LOG_COUNTERS.increment(&["node_management_module"]);
918
919        // Don't care at the moment
920        Ok(None)
921    }
922
923    async fn on_ticket_winning_probability_oracle_event(
924        &self,
925        tx: &OpenTransaction,
926        event: HoprWinningProbabilityOracleEvents,
927        _is_synced: bool,
928    ) -> Result<Option<ChainEventType>> {
929        #[cfg(all(feature = "prometheus", not(test)))]
930        METRIC_INDEXER_LOG_COUNTERS.increment(&["win_prob_oracle"]);
931
932        match event {
933            HoprWinningProbabilityOracleEvents::WinProbUpdated(update) => {
934                let old_minimum_win_prob: WinningProbability = update.oldWinProb.to_be_bytes().into();
935                let new_minimum_win_prob: WinningProbability = update.newWinProb.to_be_bytes().into();
936
937                trace!(
938                    %old_minimum_win_prob,
939                    %new_minimum_win_prob,
940                    "on_ticket_minimum_win_prob_updated",
941                );
942
943                self.db
944                    .set_minimum_incoming_ticket_win_prob(Some(tx), new_minimum_win_prob)
945                    .await?;
946
947                info!(
948                    %old_minimum_win_prob,
949                    %new_minimum_win_prob,
950                    "minimum ticket winning probability updated"
951                );
952
953                // If the old minimum was less strict, we need to mark of all the
954                // tickets below the new higher minimum as rejected
955                if old_minimum_win_prob.approx_cmp(&new_minimum_win_prob).is_lt() {
956                    let mut selector: Option<TicketSelector> = None;
957                    for channel in self.db.get_incoming_channels(tx.into()).await? {
958                        selector = selector
959                            .map(|s| s.also_on_channel(channel.get_id(), channel.channel_epoch))
960                            .or_else(|| Some(TicketSelector::from(channel)));
961                    }
962                    // Reject unredeemed tickets on all the channels with win prob lower than the new one
963                    if let Some(selector) = selector {
964                        let num_rejected = self
965                            .db
966                            .mark_tickets_as(
967                                selector.with_winning_probability(..new_minimum_win_prob),
968                                TicketMarker::Rejected,
969                            )
970                            .await?;
971                        info!(
972                            count = num_rejected,
973                            "unredeemed tickets were rejected, because the minimum winning probability has been \
974                             increased"
975                        );
976                    }
977                }
978            }
979            _ => {
980                // Ignore other events
981            }
982        }
983        Ok(None)
984    }
985
986    async fn on_ticket_price_oracle_event(
987        &self,
988        tx: &OpenTransaction,
989        event: HoprTicketPriceOracleEvents,
990        _is_synced: bool,
991    ) -> Result<Option<ChainEventType>> {
992        #[cfg(all(feature = "prometheus", not(test)))]
993        METRIC_INDEXER_LOG_COUNTERS.increment(&["price_oracle"]);
994
995        match event {
996            HoprTicketPriceOracleEvents::TicketPriceUpdated(update) => {
997                trace!(
998                    old = update._0.to_string(),
999                    new = update._1.to_string(),
1000                    "on_ticket_price_updated",
1001                );
1002
1003                self.db
1004                    .update_ticket_price(Some(tx), HoprBalance::from_be_bytes(update._1.to_be_bytes::<32>()))
1005                    .await?;
1006
1007                info!(price = %update._1, "ticket price updated");
1008            }
1009            HoprTicketPriceOracleEvents::OwnershipTransferred(_event) => {
1010                // ignore ownership transfer event
1011            }
1012        }
1013        Ok(None)
1014    }
1015
1016    #[tracing::instrument(level = "debug", skip(self, slog), fields(log=%slog))]
1017    async fn process_log_event(
1018        &self,
1019        tx: &OpenTransaction,
1020        slog: SerializableLog,
1021        is_synced: bool,
1022    ) -> Result<Option<ChainEventType>> {
1023        trace!(log = %slog, "log content");
1024
1025        let log = Log::from(slog.clone());
1026
1027        let primitive_log = alloy::primitives::Log::new(
1028            slog.address.into(),
1029            slog.topics.iter().map(|h| B256::from_slice(h.as_ref())).collect(),
1030            slog.data.clone().into(),
1031        )
1032        .ok_or_else(|| {
1033            CoreEthereumIndexerError::ProcessError(format!("failed to convert log to primitive log: {slog:?}"))
1034        })?;
1035
1036        if log.address.eq(&self.addresses.announcements) {
1037            let bn = log.block_number as u32;
1038            let event = HoprAnnouncementsEvents::decode_log(&primitive_log)?;
1039            self.on_announcement_event(tx, event.data, bn, is_synced).await
1040        } else if log.address.eq(&self.addresses.channels) {
1041            let event = HoprChannelsEvents::decode_log(&primitive_log)?;
1042            match self.on_channel_event(tx, event.data, is_synced).await {
1043                Ok(res) => Ok(res),
1044                Err(CoreEthereumIndexerError::ChannelDoesNotExist) => {
1045                    // This is not an error, just a log that we don't have the channel in the DB
1046                    debug!(
1047                        ?log,
1048                        "channel didn't exist in the db. Created a corrupted channel entry and ignored event"
1049                    );
1050                    Ok(None)
1051                }
1052                Err(e) => Err(e),
1053            }
1054        } else if log.address.eq(&self.addresses.network_registry) {
1055            let event = HoprNetworkRegistryEvents::decode_log(&primitive_log)?;
1056            self.on_network_registry_event(tx, event.data, is_synced).await
1057        } else if log.address.eq(&self.addresses.token) {
1058            let event = HoprTokenEvents::decode_log(&primitive_log)?;
1059            self.on_token_event(tx, event.data, is_synced).await
1060        } else if log.address.eq(&self.addresses.safe_registry) {
1061            let event = HoprNodeSafeRegistryEvents::decode_log(&primitive_log)?;
1062            self.on_node_safe_registry_event(tx, event.data, is_synced).await
1063        } else if log.address.eq(&self.addresses.module_implementation) {
1064            let event = HoprNodeManagementModuleEvents::decode_log(&primitive_log)?;
1065            self.on_node_management_module_event(tx, event.data, is_synced).await
1066        } else if log.address.eq(&self.addresses.price_oracle) {
1067            let event = HoprTicketPriceOracleEvents::decode_log(&primitive_log)?;
1068            self.on_ticket_price_oracle_event(tx, event.data, is_synced).await
1069        } else if log.address.eq(&self.addresses.win_prob_oracle) {
1070            let event = HoprWinningProbabilityOracleEvents::decode_log(&primitive_log)?;
1071            self.on_ticket_winning_probability_oracle_event(tx, event.data, is_synced)
1072                .await
1073        } else {
1074            #[cfg(all(feature = "prometheus", not(test)))]
1075            METRIC_INDEXER_LOG_COUNTERS.increment(&["unknown"]);
1076
1077            error!(
1078                address = %log.address, log = ?log,
1079                "on_event error - unknown contract address, received log"
1080            );
1081            return Err(CoreEthereumIndexerError::UnknownContract(log.address));
1082        }
1083    }
1084}
1085
1086#[async_trait]
1087impl<T, Db> crate::traits::ChainLogHandler for ContractEventHandlers<T, Db>
1088where
1089    T: HoprIndexerRpcOperations + Clone + Send + Sync + 'static,
1090    Db: HoprDbAllOperations + Clone + Debug + Send + Sync + 'static,
1091{
1092    fn contract_addresses(&self) -> Vec<Address> {
1093        vec![
1094            self.addresses.announcements,
1095            self.addresses.channels,
1096            self.addresses.module_implementation,
1097            self.addresses.network_registry,
1098            self.addresses.price_oracle,
1099            self.addresses.win_prob_oracle,
1100            self.addresses.safe_registry,
1101            self.addresses.token,
1102        ]
1103    }
1104
1105    fn contract_addresses_map(&self) -> Arc<ContractAddresses> {
1106        self.addresses.clone()
1107    }
1108
1109    fn safe_address(&self) -> Address {
1110        self.safe_address
1111    }
1112
1113    fn contract_address_topics(&self, contract: Address) -> Vec<B256> {
1114        if contract.eq(&self.addresses.announcements) {
1115            crate::constants::topics::announcement()
1116        } else if contract.eq(&self.addresses.channels) {
1117            crate::constants::topics::channel()
1118        } else if contract.eq(&self.addresses.module_implementation) {
1119            crate::constants::topics::module_implementation()
1120        } else if contract.eq(&self.addresses.network_registry) {
1121            crate::constants::topics::network_registry()
1122        } else if contract.eq(&self.addresses.price_oracle) {
1123            crate::constants::topics::ticket_price_oracle()
1124        } else if contract.eq(&self.addresses.win_prob_oracle) {
1125            crate::constants::topics::winning_prob_oracle()
1126        } else if contract.eq(&self.addresses.safe_registry) {
1127            crate::constants::topics::node_safe_registry()
1128        } else {
1129            panic!("use of unsupported contract address: {contract}");
1130        }
1131    }
1132
1133    async fn collect_log_event(&self, slog: SerializableLog, is_synced: bool) -> Result<Option<SignificantChainEvent>> {
1134        let myself = self.clone();
1135        self.db
1136            .begin_transaction()
1137            .await?
1138            .perform(move |tx| {
1139                let log = slog.clone();
1140                let tx_hash = Hash::from(log.tx_hash);
1141                let log_id = log.log_index;
1142                let block_id = log.block_number;
1143
1144                Box::pin(async move {
1145                    match myself.process_log_event(tx, log, is_synced).await {
1146                        // If a significant chain event can be extracted from the log
1147                        Ok(Some(event_type)) => {
1148                            let significant_event = SignificantChainEvent { tx_hash, event_type };
1149                            debug!(block_id, %tx_hash, log_id, ?significant_event, "indexer got significant_event");
1150                            Ok(Some(significant_event))
1151                        }
1152                        Ok(None) => {
1153                            debug!(block_id, %tx_hash, log_id, "no significant event in log");
1154                            Ok(None)
1155                        }
1156                        Err(error) => {
1157                            error!(block_id, %tx_hash, log_id, %error, "error processing log in tx");
1158                            Err(error)
1159                        }
1160                    }
1161                })
1162            })
1163            .await
1164    }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169    use std::{
1170        sync::{Arc, atomic::Ordering},
1171        time::SystemTime,
1172    };
1173
1174    use alloy::{
1175        dyn_abi::DynSolValue,
1176        primitives::{Address as AlloyAddress, U256},
1177        sol_types::{SolEvent, SolValue},
1178    };
1179    use anyhow::{Context, anyhow};
1180    use hex_literal::hex;
1181    use hopr_chain_rpc::HoprIndexerRpcOperations;
1182    use hopr_chain_types::{
1183        ContractAddresses,
1184        chain_events::{ChainEventType, NetworkRegistryStatus},
1185    };
1186    use hopr_crypto_types::prelude::*;
1187    use hopr_db_sql::{
1188        HoprDbAllOperations, HoprDbGeneralModelOperations,
1189        accounts::{ChainOrPacketKey, HoprDbAccountOperations},
1190        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
1191        channels::HoprDbChannelOperations,
1192        corrupted_channels::HoprDbCorruptedChannelOperations,
1193        db::HoprDb,
1194        info::HoprDbInfoOperations,
1195        prelude::HoprDbResolverOperations,
1196        registry::HoprDbRegistryOperations,
1197    };
1198    use hopr_internal_types::prelude::*;
1199    use hopr_primitive_types::prelude::*;
1200    use multiaddr::Multiaddr;
1201    use primitive_types::H256;
1202
1203    use super::ContractEventHandlers;
1204
1205    lazy_static::lazy_static! {
1206        static ref SELF_PRIV_KEY: OffchainKeypair = OffchainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
1207        static ref COUNTERPARTY_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1208        static ref COUNTERPARTY_CHAIN_ADDRESS: Address = COUNTERPARTY_CHAIN_KEY.public().to_address();
1209        static ref SELF_CHAIN_KEY: ChainKeypair = ChainKeypair::random();
1210        static ref SELF_CHAIN_ADDRESS: Address = SELF_CHAIN_KEY.public().to_address();
1211        static ref STAKE_ADDRESS: Address = "4331eaa9542b6b034c43090d9ec1c2198758dbc3".parse().expect("lazy static address should be constructible");
1212        static ref CHANNELS_ADDR: Address = "bab20aea98368220baa4e3b7f151273ee71df93b".parse().expect("lazy static address should be constructible"); // just a dummy
1213        static ref TOKEN_ADDR: Address = "47d1677e018e79dcdd8a9c554466cb1556fa5007".parse().expect("lazy static address should be constructible"); // just a dummy
1214        static ref NETWORK_REGISTRY_ADDR: Address = "a469d0225f884fb989cbad4fe289f6fd2fb98051".parse().expect("lazy static address should be constructible"); // just a dummy
1215        static ref NODE_SAFE_REGISTRY_ADDR: Address = "0dcd1bf9a1b36ce34237eeafef220932846bcd82".parse().expect("lazy static address should be constructible"); // just a dummy
1216        static ref ANNOUNCEMENTS_ADDR: Address = "11db4791bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
1217        static ref SAFE_MANAGEMENT_MODULE_ADDR: Address = "9b91245a65ad469163a86e32b2281af7a25f38ce".parse().expect("lazy static address should be constructible"); // just a dummy
1218        static ref SAFE_INSTANCE_ADDR: Address = "b93d7fdd605fb64fdcc87f21590f950170719d47".parse().expect("lazy static address should be constructible"); // just a dummy
1219        static ref TICKET_PRICE_ORACLE_ADDR: Address = "11db4391bf45ef31a10ea4a1b5cb90f46cc72c7e".parse().expect("lazy static address should be constructible"); // just a dummy
1220        static ref WIN_PROB_ORACLE_ADDR: Address = "00db4391bf45ef31a10ea4a1b5cb90f46cc64c7e".parse().expect("lazy static address should be constructible"); // just a dummy
1221    }
1222
1223    mockall::mock! {
1224        pub(crate) IndexerRpcOperations {}
1225
1226        #[async_trait::async_trait]
1227        impl HoprIndexerRpcOperations for IndexerRpcOperations {
1228            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
1229
1230        async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
1231
1232        async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
1233
1234        async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
1235
1236        fn try_stream_logs<'a>(
1237            &'a self,
1238            start_block_number: u64,
1239            filters: hopr_chain_rpc::FilterSet,
1240            is_synced: bool,
1241        ) -> hopr_chain_rpc::errors::Result<std::pin::Pin<Box<dyn futures::Stream<Item=hopr_chain_rpc::BlockWithLogs> + Send + 'a> > >;
1242        }
1243    }
1244
1245    #[derive(Clone)]
1246    pub struct ClonableMockOperations {
1247        pub inner: Arc<MockIndexerRpcOperations>,
1248    }
1249
1250    #[async_trait::async_trait]
1251    impl HoprIndexerRpcOperations for ClonableMockOperations {
1252        async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64> {
1253            self.inner.block_number().await
1254        }
1255
1256        async fn get_hopr_allowance(
1257            &self,
1258            owner: Address,
1259            spender: Address,
1260        ) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1261            self.inner.get_hopr_allowance(owner, spender).await
1262        }
1263
1264        async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance> {
1265            self.inner.get_xdai_balance(address).await
1266        }
1267
1268        async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance> {
1269            self.inner.get_hopr_balance(address).await
1270        }
1271
1272        fn try_stream_logs<'a>(
1273            &'a self,
1274            start_block_number: u64,
1275            filters: hopr_chain_rpc::FilterSet,
1276            is_synced: bool,
1277        ) -> hopr_chain_rpc::errors::Result<
1278            std::pin::Pin<Box<dyn futures::Stream<Item = hopr_chain_rpc::BlockWithLogs> + Send + 'a>>,
1279        > {
1280            self.inner.try_stream_logs(start_block_number, filters, is_synced)
1281        }
1282    }
1283
1284    fn init_handlers<T: Clone, Db: HoprDbAllOperations + Clone>(
1285        rpc_operations: T,
1286        db: Db,
1287    ) -> ContractEventHandlers<T, Db> {
1288        ContractEventHandlers {
1289            addresses: Arc::new(ContractAddresses {
1290                channels: *CHANNELS_ADDR,
1291                token: *TOKEN_ADDR,
1292                network_registry: *NETWORK_REGISTRY_ADDR,
1293                network_registry_proxy: Default::default(),
1294                safe_registry: *NODE_SAFE_REGISTRY_ADDR,
1295                announcements: *ANNOUNCEMENTS_ADDR,
1296                module_implementation: *SAFE_MANAGEMENT_MODULE_ADDR,
1297                price_oracle: *TICKET_PRICE_ORACLE_ADDR,
1298                win_prob_oracle: *WIN_PROB_ORACLE_ADDR,
1299                stake_factory: Default::default(),
1300            }),
1301            chain_key: SELF_CHAIN_KEY.clone(),
1302            safe_address: *STAKE_ADDRESS,
1303            db,
1304            rpc_operations,
1305        }
1306    }
1307
1308    fn test_log() -> SerializableLog {
1309        SerializableLog { ..Default::default() }
1310    }
1311
1312    #[tokio::test]
1313    async fn announce_keybinding() -> anyhow::Result<()> {
1314        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1315
1316        let rpc_operations = MockIndexerRpcOperations::new();
1317        // ==> set mock expectations here
1318        let clonable_rpc_operations = ClonableMockOperations {
1319            inner: Arc::new(rpc_operations),
1320        };
1321
1322        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1323
1324        let keybinding = KeyBinding::new(*SELF_CHAIN_ADDRESS, &SELF_PRIV_KEY);
1325
1326        let keybinding_log = SerializableLog {
1327            address: handlers.addresses.announcements,
1328            topics: vec![
1329                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::KeyBinding::SIGNATURE_HASH.into(),
1330            ],
1331            data: DynSolValue::Tuple(vec![
1332                DynSolValue::Bytes(keybinding.signature.as_ref().to_vec()),
1333                DynSolValue::Bytes(keybinding.packet_key.as_ref().to_vec()),
1334                DynSolValue::FixedBytes(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()).into_word(), 32),
1335            ])
1336            .abi_encode_packed(),
1337            ..test_log()
1338        };
1339
1340        let account_entry = AccountEntry {
1341            public_key: *SELF_PRIV_KEY.public(),
1342            chain_addr: *SELF_CHAIN_ADDRESS,
1343            entry_type: AccountType::NotAnnounced,
1344            published_at: 0,
1345        };
1346
1347        let event_type = db
1348            .begin_transaction()
1349            .await?
1350            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, keybinding_log, true).await }))
1351            .await?;
1352
1353        assert!(event_type.is_none(), "keybinding does not have a chain event type");
1354
1355        assert_eq!(
1356            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1357                .await?
1358                .context("a value should be present")?,
1359            account_entry
1360        );
1361        Ok(())
1362    }
1363
1364    #[tokio::test]
1365    async fn announce_address_announcement() -> anyhow::Result<()> {
1366        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1367        let rpc_operations = MockIndexerRpcOperations::new();
1368        // ==> set mock expectations here
1369        let clonable_rpc_operations = ClonableMockOperations {
1370            //
1371            inner: Arc::new(rpc_operations),
1372        };
1373        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1374
1375        // Assume that there is a keybinding
1376        let account_entry = AccountEntry {
1377            public_key: *SELF_PRIV_KEY.public(),
1378            chain_addr: *SELF_CHAIN_ADDRESS,
1379            entry_type: AccountType::NotAnnounced,
1380            published_at: 1,
1381        };
1382        db.insert_account(None, account_entry.clone()).await?;
1383
1384        let test_multiaddr_empty: Multiaddr = "".parse()?;
1385
1386        let address_announcement_empty_log_encoded_data = DynSolValue::Tuple(vec![
1387            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1388            DynSolValue::String(test_multiaddr_empty.to_string()),
1389        ])
1390        .abi_encode();
1391
1392        let address_announcement_empty_log = SerializableLog {
1393            address: handlers.addresses.announcements,
1394            topics: vec![
1395                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1396                    .into(),
1397            ],
1398            data: address_announcement_empty_log_encoded_data[32..].into(),
1399            ..test_log()
1400        };
1401
1402        let handlers_clone = handlers.clone();
1403        let event_type = db
1404            .begin_transaction()
1405            .await?
1406            .perform(|tx| {
1407                Box::pin(async move {
1408                    handlers_clone
1409                        .process_log_event(tx, address_announcement_empty_log, true)
1410                        .await
1411                })
1412            })
1413            .await?;
1414
1415        assert!(
1416            event_type.is_none(),
1417            "announcement of empty multiaddresses must pass through"
1418        );
1419
1420        assert_eq!(
1421            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1422                .await?
1423                .context("a value should be present")?,
1424            account_entry
1425        );
1426
1427        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1428
1429        let address_announcement_log_encoded_data = DynSolValue::Tuple(vec![
1430            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1431            DynSolValue::String(test_multiaddr.to_string()),
1432        ])
1433        .abi_encode();
1434
1435        let address_announcement_log = SerializableLog {
1436            address: handlers.addresses.announcements,
1437            block_number: 1,
1438            topics: vec![
1439                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1440                    .into(),
1441            ],
1442            data: address_announcement_log_encoded_data[32..].into(),
1443            ..test_log()
1444        };
1445
1446        let announced_account_entry = AccountEntry {
1447            public_key: *SELF_PRIV_KEY.public(),
1448            chain_addr: *SELF_CHAIN_ADDRESS,
1449            entry_type: AccountType::Announced {
1450                multiaddr: test_multiaddr.clone(),
1451                updated_block: 1,
1452            },
1453            published_at: 1,
1454        };
1455
1456        let handlers_clone = handlers.clone();
1457        let event_type = db
1458            .begin_transaction()
1459            .await?
1460            .perform(|tx| {
1461                Box::pin(async move {
1462                    handlers_clone
1463                        .process_log_event(tx, address_announcement_log, true)
1464                        .await
1465                })
1466            })
1467            .await?;
1468
1469        assert!(
1470            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr]),
1471            "must return the latest announce multiaddress"
1472        );
1473
1474        assert_eq!(
1475            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1476                .await?
1477                .context("a value should be present")?,
1478            announced_account_entry
1479        );
1480
1481        assert_eq!(
1482            Some(*SELF_CHAIN_ADDRESS),
1483            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1484            "must resolve correct chain key"
1485        );
1486
1487        assert_eq!(
1488            Some(*SELF_PRIV_KEY.public()),
1489            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1490            "must resolve correct packet key"
1491        );
1492
1493        let test_multiaddr_dns: Multiaddr = "/dns4/useful.domain/tcp/56".parse()?;
1494
1495        let address_announcement_dns_log_encoded_data = DynSolValue::Tuple(vec![
1496            DynSolValue::Address(AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref())),
1497            DynSolValue::String(test_multiaddr_dns.to_string()),
1498        ])
1499        .abi_encode();
1500
1501        let address_announcement_dns_log = SerializableLog {
1502            address: handlers.addresses.announcements,
1503            block_number: 2,
1504            topics: vec![
1505                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH
1506                    .into(),
1507            ],
1508            data: address_announcement_dns_log_encoded_data[32..].into(),
1509            ..test_log()
1510        };
1511
1512        let announced_dns_account_entry = AccountEntry {
1513            public_key: *SELF_PRIV_KEY.public(),
1514            chain_addr: *SELF_CHAIN_ADDRESS,
1515            entry_type: AccountType::Announced {
1516                multiaddr: test_multiaddr_dns.clone(),
1517                updated_block: 2,
1518            },
1519            published_at: 1,
1520        };
1521
1522        let event_type = db
1523            .begin_transaction()
1524            .await?
1525            .perform(|tx| {
1526                Box::pin(async move { handlers.process_log_event(tx, address_announcement_dns_log, true).await })
1527            })
1528            .await?;
1529
1530        assert!(
1531            matches!(event_type, Some(ChainEventType::Announcement { multiaddresses,.. }) if multiaddresses == vec![test_multiaddr_dns]),
1532            "must return the latest announce multiaddress"
1533        );
1534
1535        assert_eq!(
1536            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1537                .await?
1538                .context("a value should be present")?,
1539            announced_dns_account_entry
1540        );
1541
1542        assert_eq!(
1543            Some(*SELF_CHAIN_ADDRESS),
1544            db.resolve_chain_key(SELF_PRIV_KEY.public()).await?,
1545            "must resolve correct chain key"
1546        );
1547
1548        assert_eq!(
1549            Some(*SELF_PRIV_KEY.public()),
1550            db.resolve_packet_key(&SELF_CHAIN_ADDRESS).await?,
1551            "must resolve correct packet key"
1552        );
1553        Ok(())
1554    }
1555
1556    #[tokio::test]
1557    async fn announce_revoke() -> anyhow::Result<()> {
1558        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1559        let rpc_operations = MockIndexerRpcOperations::new();
1560        // ==> set mock expectations here
1561        let clonable_rpc_operations = ClonableMockOperations {
1562            //
1563            inner: Arc::new(rpc_operations),
1564        };
1565        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1566
1567        let test_multiaddr: Multiaddr = "/ip4/1.2.3.4/tcp/56".parse()?;
1568
1569        // Assume that there is a keybinding and an address announcement
1570        let announced_account_entry = AccountEntry {
1571            public_key: *SELF_PRIV_KEY.public(),
1572            chain_addr: *SELF_CHAIN_ADDRESS,
1573            entry_type: AccountType::Announced {
1574                multiaddr: test_multiaddr,
1575                updated_block: 0,
1576            },
1577            published_at: 1,
1578        };
1579
1580        db.insert_account(None, announced_account_entry).await?;
1581
1582        let encoded_data = (AlloyAddress::from_slice(SELF_CHAIN_ADDRESS.as_ref()),).abi_encode();
1583
1584        let revoke_announcement_log = SerializableLog {
1585            address: handlers.addresses.announcements,
1586            topics: vec![
1587                hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::RevokeAnnouncement::SIGNATURE_HASH
1588                    .into(),
1589            ],
1590            data: encoded_data,
1591            ..test_log()
1592        };
1593
1594        let account_entry = AccountEntry {
1595            public_key: *SELF_PRIV_KEY.public(),
1596            chain_addr: *SELF_CHAIN_ADDRESS,
1597            entry_type: AccountType::NotAnnounced,
1598            published_at: 1,
1599        };
1600
1601        let event_type = db
1602            .begin_transaction()
1603            .await?
1604            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, revoke_announcement_log, true).await }))
1605            .await?;
1606
1607        assert!(
1608            event_type.is_none(),
1609            "revoke announcement does not have chain event type"
1610        );
1611
1612        assert_eq!(
1613            db.get_account(None, ChainOrPacketKey::ChainKey(*SELF_CHAIN_ADDRESS))
1614                .await?
1615                .context("a value should be present")?,
1616            account_entry
1617        );
1618        Ok(())
1619    }
1620
1621    #[tokio::test]
1622    async fn on_token_transfer_to() -> anyhow::Result<()> {
1623        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1624
1625        let value = U256::MAX;
1626        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
1627            value.to_be_bytes_vec().as_slice(),
1628        ));
1629
1630        let mut rpc_operations = MockIndexerRpcOperations::new();
1631        rpc_operations
1632            .expect_get_hopr_balance()
1633            .times(1)
1634            .return_once(move |_| Ok(target_hopr_balance));
1635        rpc_operations
1636            .expect_get_hopr_allowance()
1637            .times(1)
1638            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1639        let clonable_rpc_operations = ClonableMockOperations {
1640            inner: Arc::new(rpc_operations),
1641        };
1642
1643        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1644
1645        let encoded_data = (value).abi_encode();
1646
1647        let transferred_log = SerializableLog {
1648            address: handlers.addresses.token,
1649            topics: vec![
1650                hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1651                H256::from_slice(&Address::default().to_bytes32()).into(),
1652                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1653            ],
1654            data: encoded_data,
1655            ..test_log()
1656        };
1657
1658        let event_type = db
1659            .begin_transaction()
1660            .await?
1661            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1662            .await?;
1663
1664        assert!(event_type.is_none(), "token transfer does not have chain event type");
1665
1666        assert_eq!(db.get_safe_hopr_balance(None).await?, target_hopr_balance);
1667
1668        Ok(())
1669    }
1670
1671    #[test_log::test(tokio::test)]
1672    async fn on_token_transfer_from() -> anyhow::Result<()> {
1673        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1674
1675        let mut rpc_operations = MockIndexerRpcOperations::new();
1676        rpc_operations
1677            .expect_get_hopr_balance()
1678            .times(1)
1679            .return_once(|_| Ok(HoprBalance::zero()));
1680        rpc_operations
1681            .expect_get_hopr_allowance()
1682            .times(1)
1683            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
1684        let clonable_rpc_operations = ClonableMockOperations {
1685            inner: Arc::new(rpc_operations),
1686        };
1687
1688        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1689
1690        let value = U256::MAX;
1691
1692        let encoded_data = (value).abi_encode();
1693
1694        db.set_safe_hopr_balance(
1695            None,
1696            HoprBalance::from(primitive_types::U256::from_big_endian(
1697                value.to_be_bytes_vec().as_slice(),
1698            )),
1699        )
1700        .await?;
1701
1702        let transferred_log = SerializableLog {
1703            address: handlers.addresses.token,
1704            topics: vec![
1705                hopr_bindings::hoprtoken::HoprToken::Transfer::SIGNATURE_HASH.into(),
1706                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1707                H256::from_slice(&Address::default().to_bytes32()).into(),
1708            ],
1709            data: encoded_data,
1710            ..test_log()
1711        };
1712
1713        let event_type = db
1714            .begin_transaction()
1715            .await?
1716            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, transferred_log, true).await }))
1717            .await?;
1718
1719        assert!(event_type.is_none(), "token transfer does not have chain event type");
1720
1721        assert_eq!(db.get_safe_hopr_balance(None).await?, HoprBalance::zero());
1722
1723        Ok(())
1724    }
1725
1726    #[tokio::test]
1727    async fn on_token_approval_correct() -> anyhow::Result<()> {
1728        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1729
1730        let target_allowance = HoprBalance::from(primitive_types::U256::from(1000u64));
1731        let mut rpc_operations = MockIndexerRpcOperations::new();
1732        rpc_operations
1733            .expect_get_hopr_allowance()
1734            .times(2)
1735            .returning(move |_, _| Ok(target_allowance));
1736        let clonable_rpc_operations = ClonableMockOperations {
1737            inner: Arc::new(rpc_operations),
1738        };
1739
1740        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1741
1742        let encoded_data = (U256::from(1000u64)).abi_encode();
1743
1744        let approval_log = SerializableLog {
1745            address: handlers.addresses.token,
1746            topics: vec![
1747                hopr_bindings::hoprtoken::HoprToken::Approval::SIGNATURE_HASH.into(),
1748                H256::from_slice(&handlers.safe_address.to_bytes32()).into(),
1749                H256::from_slice(&handlers.addresses.channels.to_bytes32()).into(),
1750            ],
1751            data: encoded_data,
1752            ..test_log()
1753        };
1754
1755        // before any operation the allowance should be 0
1756        assert_eq!(db.get_safe_hopr_allowance(None).await?, HoprBalance::zero());
1757
1758        let approval_log_clone = approval_log.clone();
1759        let handlers_clone = handlers.clone();
1760        let event_type = db
1761            .begin_transaction()
1762            .await?
1763            .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log_clone, true).await }))
1764            .await?;
1765
1766        assert!(event_type.is_none(), "token approval does not have chain event type");
1767
1768        // after processing the allowance should be 0
1769        assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance.clone());
1770
1771        // reduce allowance manually to verify a second time
1772        let _ = db
1773            .set_safe_hopr_allowance(None, HoprBalance::from(primitive_types::U256::from(10u64)))
1774            .await;
1775        assert_eq!(
1776            db.get_safe_hopr_allowance(None).await?,
1777            HoprBalance::from(primitive_types::U256::from(10u64))
1778        );
1779
1780        let handlers_clone = handlers.clone();
1781        let event_type = db
1782            .begin_transaction()
1783            .await?
1784            .perform(|tx| Box::pin(async move { handlers_clone.process_log_event(tx, approval_log, true).await }))
1785            .await?;
1786
1787        assert!(event_type.is_none(), "token approval does not have chain event type");
1788
1789        assert_eq!(db.get_safe_hopr_allowance(None).await?, target_allowance);
1790        Ok(())
1791    }
1792
1793    #[tokio::test]
1794    async fn on_network_registry_event_registered() -> anyhow::Result<()> {
1795        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1796        let rpc_operations = MockIndexerRpcOperations::new();
1797        // ==> set mock expectations here
1798        let clonable_rpc_operations = ClonableMockOperations {
1799            //
1800            inner: Arc::new(rpc_operations),
1801        };
1802        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1803
1804        let encoded_data = ().abi_encode();
1805
1806        let registered_log = SerializableLog {
1807            address: handlers.addresses.network_registry,
1808            topics: vec![
1809                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Registered::SIGNATURE_HASH.into(),
1810                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1811                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1812            ],
1813            data: encoded_data,
1814            // data: encode(&[]).into(),
1815            ..test_log()
1816        };
1817
1818        assert!(
1819            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1820                .await?
1821        );
1822
1823        let event_type = db
1824            .begin_transaction()
1825            .await?
1826            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1827            .await?;
1828
1829        assert!(
1830            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1831            "must return correct NR update"
1832        );
1833
1834        assert!(
1835            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1836                .await?,
1837            "must be allowed in NR"
1838        );
1839        Ok(())
1840    }
1841
1842    #[tokio::test]
1843    async fn on_network_registry_event_registered_by_manager() -> anyhow::Result<()> {
1844        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1845        let rpc_operations = MockIndexerRpcOperations::new();
1846        // ==> set mock expectations here
1847        let clonable_rpc_operations = ClonableMockOperations {
1848            //
1849            inner: Arc::new(rpc_operations),
1850        };
1851        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1852
1853        let registered_log = SerializableLog {
1854            address: handlers.addresses.network_registry,
1855            topics: vec![
1856                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::RegisteredByManager::SIGNATURE_HASH.into(),
1857                // RegisteredByManagerFilter::signature().into(),
1858                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1859                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1860            ],
1861            data: ().abi_encode(),
1862            // data: encode(&[]).into(),
1863            ..test_log()
1864        };
1865
1866        assert!(
1867            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1868                .await?
1869        );
1870
1871        let event_type = db
1872            .begin_transaction()
1873            .await?
1874            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1875            .await?;
1876
1877        assert!(
1878            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Allowed),
1879            "must return correct NR update"
1880        );
1881
1882        assert!(
1883            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1884                .await?,
1885            "must be allowed in NR"
1886        );
1887        Ok(())
1888    }
1889
1890    #[tokio::test]
1891    async fn on_network_registry_event_deregistered() -> anyhow::Result<()> {
1892        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1893        let rpc_operations = MockIndexerRpcOperations::new();
1894        // ==> set mock expectations here
1895        let clonable_rpc_operations = ClonableMockOperations {
1896            //
1897            inner: Arc::new(rpc_operations),
1898        };
1899        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1900
1901        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1902            .await?;
1903
1904        let encoded_data = ().abi_encode();
1905
1906        let registered_log = SerializableLog {
1907            address: handlers.addresses.network_registry,
1908            topics: vec![
1909                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::Deregistered::SIGNATURE_HASH.into(),
1910                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1911                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1912            ],
1913            data: encoded_data,
1914            ..test_log()
1915        };
1916
1917        assert!(
1918            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1919                .await?
1920        );
1921
1922        let event_type = db
1923            .begin_transaction()
1924            .await?
1925            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1926            .await?;
1927
1928        assert!(
1929            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1930            "must return correct NR update"
1931        );
1932
1933        assert!(
1934            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1935                .await?,
1936            "must not be allowed in NR"
1937        );
1938        Ok(())
1939    }
1940
1941    #[tokio::test]
1942    async fn on_network_registry_event_deregistered_by_manager() -> anyhow::Result<()> {
1943        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1944        let rpc_operations = MockIndexerRpcOperations::new();
1945        // ==> set mock expectations here
1946        let clonable_rpc_operations = ClonableMockOperations {
1947            //
1948            inner: Arc::new(rpc_operations),
1949        };
1950        let handlers = init_handlers(clonable_rpc_operations, db.clone());
1951
1952        db.set_access_in_network_registry(None, *SELF_CHAIN_ADDRESS, true)
1953            .await?;
1954
1955        let encoded_data = ().abi_encode();
1956
1957        let registered_log = SerializableLog {
1958            address: handlers.addresses.network_registry,
1959            topics: vec![
1960                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::DeregisteredByManager::SIGNATURE_HASH.into(),
1961                // DeregisteredByManagerFilter::signature().into(),
1962                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
1963                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
1964            ],
1965            data: encoded_data,
1966            ..test_log()
1967        };
1968
1969        assert!(
1970            db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1971                .await?
1972        );
1973
1974        let event_type = db
1975            .begin_transaction()
1976            .await?
1977            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, registered_log, false).await }))
1978            .await?;
1979
1980        assert!(
1981            matches!(event_type, Some(ChainEventType::NetworkRegistryUpdate(a, s)) if a == *SELF_CHAIN_ADDRESS && s == NetworkRegistryStatus::Denied),
1982            "must return correct NR update"
1983        );
1984
1985        assert!(
1986            !db.is_allowed_in_network_registry(None, &SELF_CHAIN_ADDRESS.as_ref())
1987                .await?,
1988            "must not be allowed in NR"
1989        );
1990        Ok(())
1991    }
1992
1993    #[tokio::test]
1994    async fn on_network_registry_event_enabled() -> anyhow::Result<()> {
1995        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
1996        let rpc_operations = MockIndexerRpcOperations::new();
1997        // ==> set mock expectations here
1998        let clonable_rpc_operations = ClonableMockOperations {
1999            //
2000            inner: Arc::new(rpc_operations),
2001        };
2002        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2003
2004        let encoded_data = ().abi_encode();
2005
2006        let nr_enabled = SerializableLog {
2007            address: handlers.addresses.network_registry,
2008            topics: vec![
2009                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2010                    .into(),
2011                // NetworkRegistryStatusUpdatedFilter::signature().into(),
2012                H256::from_low_u64_be(1).into(),
2013            ],
2014            data: encoded_data,
2015            ..test_log()
2016        };
2017
2018        let event_type = db
2019            .begin_transaction()
2020            .await?
2021            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_enabled, false).await }))
2022            .await?;
2023
2024        assert!(event_type.is_none(), "there's no chain event type for nr disable");
2025
2026        assert!(db.get_indexer_data(None).await?.nr_enabled);
2027        Ok(())
2028    }
2029
2030    #[tokio::test]
2031    async fn on_network_registry_event_disabled() -> anyhow::Result<()> {
2032        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2033        let rpc_operations = MockIndexerRpcOperations::new();
2034        // ==> set mock expectations here
2035        let clonable_rpc_operations = ClonableMockOperations {
2036            //
2037            inner: Arc::new(rpc_operations),
2038        };
2039        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2040
2041        db.set_network_registry_enabled(None, true).await?;
2042
2043        let encoded_data = ().abi_encode();
2044
2045        let nr_disabled = SerializableLog {
2046            address: handlers.addresses.network_registry,
2047            topics: vec![
2048                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::NetworkRegistryStatusUpdated::SIGNATURE_HASH
2049                    .into(),
2050                // NetworkRegistryStatusUpdatedFilter::signature().into(),
2051                H256::from_low_u64_be(0).into(),
2052            ],
2053            data: encoded_data,
2054            ..test_log()
2055        };
2056
2057        let event_type = db
2058            .begin_transaction()
2059            .await?
2060            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, nr_disabled, false).await }))
2061            .await?;
2062
2063        assert!(event_type.is_none(), "there's no chain event type for nr enable");
2064
2065        assert!(!db.get_indexer_data(None).await?.nr_enabled);
2066        Ok(())
2067    }
2068
2069    #[tokio::test]
2070    async fn on_network_registry_set_eligible() -> anyhow::Result<()> {
2071        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2072        let rpc_operations = MockIndexerRpcOperations::new();
2073        // ==> set mock expectations here
2074        let clonable_rpc_operations = ClonableMockOperations {
2075            //
2076            inner: Arc::new(rpc_operations),
2077        };
2078        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2079
2080        let encoded_data = ().abi_encode();
2081
2082        let set_eligible = SerializableLog {
2083            address: handlers.addresses.network_registry,
2084            topics: vec![
2085                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2086                // EligibilityUpdatedFilter::signature().into(),
2087                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2088                H256::from_low_u64_be(1).into(),
2089            ],
2090            data: encoded_data,
2091            ..test_log()
2092        };
2093
2094        let event_type = db
2095            .begin_transaction()
2096            .await?
2097            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, false).await }))
2098            .await?;
2099
2100        assert!(
2101            event_type.is_none(),
2102            "there's no chain event type for setting nr eligibility"
2103        );
2104
2105        assert!(db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2106
2107        Ok(())
2108    }
2109
2110    #[tokio::test]
2111    async fn on_network_registry_set_not_eligible() -> anyhow::Result<()> {
2112        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2113        let rpc_operations = MockIndexerRpcOperations::new();
2114        // ==> set mock expectations here
2115        let clonable_rpc_operations = ClonableMockOperations {
2116            //
2117            inner: Arc::new(rpc_operations),
2118        };
2119        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2120
2121        db.set_safe_eligibility(None, *STAKE_ADDRESS, false).await?;
2122
2123        let encoded_data = ().abi_encode();
2124
2125        let set_eligible = SerializableLog {
2126            address: handlers.addresses.network_registry,
2127            topics: vec![
2128                hopr_bindings::hoprnetworkregistry::HoprNetworkRegistry::EligibilityUpdated::SIGNATURE_HASH.into(),
2129                // EligibilityUpdatedFilter::signature().into(),
2130                H256::from_slice(&STAKE_ADDRESS.to_bytes32()).into(),
2131                H256::from_low_u64_be(0).into(),
2132            ],
2133            data: encoded_data,
2134            ..test_log()
2135        };
2136
2137        let event_type = db
2138            .begin_transaction()
2139            .await?
2140            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, set_eligible, true).await }))
2141            .await?;
2142
2143        assert!(
2144            event_type.is_none(),
2145            "there's no chain event type for unsetting nr eligibility"
2146        );
2147
2148        assert!(!db.is_safe_eligible(None, *STAKE_ADDRESS).await?);
2149
2150        Ok(())
2151    }
2152
2153    #[tokio::test]
2154    async fn on_channel_event_balance_increased() -> anyhow::Result<()> {
2155        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2156
2157        let value = U256::MAX;
2158        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2159            value.to_be_bytes_vec().as_slice(),
2160        ));
2161
2162        let mut rpc_operations = MockIndexerRpcOperations::new();
2163        rpc_operations
2164            .expect_get_hopr_balance()
2165            .times(1)
2166            .return_once(move |_| Ok(target_hopr_balance));
2167        rpc_operations
2168            .expect_get_hopr_allowance()
2169            .times(1)
2170            .returning(move |_, _| Ok(HoprBalance::from(primitive_types::U256::from(1000u64))));
2171        let clonable_rpc_operations = ClonableMockOperations {
2172            inner: Arc::new(rpc_operations),
2173        };
2174        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2175
2176        let channel = ChannelEntry::new(
2177            *SELF_CHAIN_ADDRESS,
2178            *COUNTERPARTY_CHAIN_ADDRESS,
2179            0.into(),
2180            primitive_types::U256::zero(),
2181            ChannelStatus::Open,
2182            primitive_types::U256::one(),
2183        );
2184
2185        db.upsert_channel(None, channel).await?;
2186
2187        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2188        let diff = solidity_balance - channel.balance;
2189
2190        let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2191
2192        let balance_increased_log = SerializableLog {
2193            address: handlers.addresses.channels,
2194            topics: vec![
2195                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2196                // ChannelBalanceIncreasedFilter::signature().into(),
2197                H256::from_slice(channel.get_id().as_ref()).into(),
2198            ],
2199            data: encoded_data,
2200            ..test_log()
2201        };
2202
2203        let event_type = db
2204            .begin_transaction()
2205            .await?
2206            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2207            .await?;
2208
2209        let channel = db
2210            .get_channel_by_id(None, &channel.get_id())
2211            .await?
2212            .context("a value should be present")?;
2213
2214        assert!(
2215            matches!(event_type, Some(ChainEventType::ChannelBalanceIncreased(c, b)) if c == channel && b == diff),
2216            "must return updated channel entry and balance diff"
2217        );
2218
2219        assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2220        Ok(())
2221    }
2222
2223    #[tokio::test]
2224    async fn on_channel_event_domain_separator_updated() -> anyhow::Result<()> {
2225        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2226        let rpc_operations = MockIndexerRpcOperations::new();
2227        // ==> set mock expectations here
2228        let clonable_rpc_operations = ClonableMockOperations {
2229            //
2230            inner: Arc::new(rpc_operations),
2231        };
2232        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2233
2234        let separator = Hash::from(hopr_crypto_random::random_bytes());
2235
2236        let encoded_data = ().abi_encode();
2237
2238        let channels_dst_updated = SerializableLog {
2239            address: handlers.addresses.channels,
2240            topics: vec![
2241                hopr_bindings::hoprchannels::HoprChannels::DomainSeparatorUpdated::SIGNATURE_HASH.into(),
2242                // DomainSeparatorUpdatedFilter::signature().into(),
2243                H256::from_slice(separator.as_ref()).into(),
2244            ],
2245            data: encoded_data,
2246            ..test_log()
2247        };
2248
2249        assert!(db.get_indexer_data(None).await?.channels_dst.is_none());
2250
2251        let event_type = db
2252            .begin_transaction()
2253            .await?
2254            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channels_dst_updated, true).await }))
2255            .await?;
2256
2257        assert!(
2258            event_type.is_none(),
2259            "there's no chain event type for channel dst update"
2260        );
2261
2262        assert_eq!(
2263            separator,
2264            db.get_indexer_data(None)
2265                .await?
2266                .channels_dst
2267                .context("a value should be present")?,
2268            "separator must be updated"
2269        );
2270        Ok(())
2271    }
2272
2273    #[tokio::test]
2274    async fn on_channel_event_balance_decreased() -> anyhow::Result<()> {
2275        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2276
2277        let value = U256::MAX;
2278        let target_hopr_balance = HoprBalance::from(primitive_types::U256::from_big_endian(
2279            value.to_be_bytes_vec().as_slice(),
2280        ));
2281
2282        let mut rpc_operations = MockIndexerRpcOperations::new();
2283        rpc_operations
2284            .expect_get_hopr_balance()
2285            .times(1)
2286            .return_once(move |_| Ok(target_hopr_balance));
2287        let clonable_rpc_operations = ClonableMockOperations {
2288            inner: Arc::new(rpc_operations),
2289        };
2290        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2291
2292        let channel = ChannelEntry::new(
2293            *SELF_CHAIN_ADDRESS,
2294            *COUNTERPARTY_CHAIN_ADDRESS,
2295            HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2296            primitive_types::U256::zero(),
2297            ChannelStatus::Open,
2298            primitive_types::U256::one(),
2299        );
2300
2301        db.upsert_channel(None, channel).await?;
2302
2303        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 2).into();
2304        let diff = channel.balance - solidity_balance;
2305
2306        // let encoded_data = (solidity_balance).abi_encode();
2307        let encoded_data = DynSolValue::Tuple(vec![DynSolValue::Uint(
2308            U256::from_be_slice(&solidity_balance.amount().to_be_bytes()),
2309            256,
2310        )])
2311        .abi_encode();
2312
2313        let balance_decreased_log = SerializableLog {
2314            address: handlers.addresses.channels,
2315            topics: vec![
2316                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceDecreased::SIGNATURE_HASH.into(),
2317                // ChannelBalanceDecreasedFilter::signature().into(),
2318                H256::from_slice(channel.get_id().as_ref()).into(),
2319            ],
2320            data: encoded_data,
2321            ..test_log()
2322        };
2323
2324        let event_type = db
2325            .begin_transaction()
2326            .await?
2327            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_decreased_log, true).await }))
2328            .await?;
2329
2330        let channel = db
2331            .get_channel_by_id(None, &channel.get_id())
2332            .await?
2333            .context("a value should be present")?;
2334
2335        assert!(
2336            matches!(event_type, Some(ChainEventType::ChannelBalanceDecreased(c, b)) if c == channel && b == diff),
2337            "must return updated channel entry and balance diff"
2338        );
2339
2340        assert_eq!(solidity_balance, channel.balance, "balance must be updated");
2341        Ok(())
2342    }
2343
2344    #[tokio::test]
2345    async fn on_channel_closed() -> anyhow::Result<()> {
2346        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2347        let rpc_operations = MockIndexerRpcOperations::new();
2348        // ==> set mock expectations here
2349        let clonable_rpc_operations = ClonableMockOperations {
2350            //
2351            inner: Arc::new(rpc_operations),
2352        };
2353        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2354
2355        let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2356
2357        let channel = ChannelEntry::new(
2358            *SELF_CHAIN_ADDRESS,
2359            *COUNTERPARTY_CHAIN_ADDRESS,
2360            starting_balance,
2361            primitive_types::U256::zero(),
2362            ChannelStatus::Open,
2363            primitive_types::U256::one(),
2364        );
2365
2366        db.upsert_channel(None, channel).await?;
2367
2368        let encoded_data = ().abi_encode();
2369
2370        let channel_closed_log = SerializableLog {
2371            address: handlers.addresses.channels,
2372            topics: vec![
2373                hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2374                // ChannelClosedFilter::signature().into(),
2375                H256::from_slice(channel.get_id().as_ref()).into(),
2376            ],
2377            data: encoded_data,
2378            ..test_log()
2379        };
2380
2381        let event_type = db
2382            .begin_transaction()
2383            .await?
2384            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2385            .await?;
2386
2387        let closed_channel = db
2388            .get_channel_by_id(None, &channel.get_id())
2389            .await?
2390            .context("a value should be present")?;
2391
2392        assert!(
2393            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c == closed_channel),
2394            "must return the updated channel entry"
2395        );
2396
2397        assert_eq!(closed_channel.status, ChannelStatus::Closed);
2398        assert_eq!(closed_channel.ticket_index, 0u64.into());
2399        assert_eq!(
2400            0,
2401            db.get_outgoing_ticket_index(closed_channel.get_id())
2402                .await?
2403                .load(Ordering::Relaxed)
2404        );
2405
2406        assert!(closed_channel.balance.amount().eq(&primitive_types::U256::zero()));
2407        Ok(())
2408    }
2409
2410    #[tokio::test]
2411    async fn on_foreign_channel_closed() -> anyhow::Result<()> {
2412        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2413        let rpc_operations = MockIndexerRpcOperations::new();
2414        // ==> set mock expectations here
2415        let clonable_rpc_operations = ClonableMockOperations {
2416            //
2417            inner: Arc::new(rpc_operations),
2418        };
2419        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2420
2421        let starting_balance = HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1));
2422
2423        let channel = ChannelEntry::new(
2424            Address::new(&hex!("B7397C218766eBe6A1A634df523A1a7e412e67eA")),
2425            Address::new(&hex!("D4fdec44DB9D44B8f2b6d529620f9C0C7066A2c1")),
2426            starting_balance,
2427            primitive_types::U256::zero(),
2428            ChannelStatus::Open,
2429            primitive_types::U256::one(),
2430        );
2431
2432        db.upsert_channel(None, channel).await?;
2433
2434        let encoded_data = ().abi_encode();
2435
2436        let channel_closed_log = SerializableLog {
2437            address: handlers.addresses.channels,
2438            topics: vec![
2439                hopr_bindings::hoprchannels::HoprChannels::ChannelClosed::SIGNATURE_HASH.into(),
2440                // ChannelClosedFilter::signature().into(),
2441                H256::from_slice(channel.get_id().as_ref()).into(),
2442            ],
2443            data: encoded_data,
2444            ..test_log()
2445        };
2446
2447        let event_type = db
2448            .begin_transaction()
2449            .await?
2450            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_closed_log, true).await }))
2451            .await?;
2452
2453        let closed_channel = db.get_channel_by_id(None, &channel.get_id()).await?;
2454
2455        assert_eq!(None, closed_channel, "foreign channel must be deleted");
2456
2457        assert!(
2458            matches!(event_type, Some(ChainEventType::ChannelClosed(c)) if c.get_id() == channel.get_id()),
2459            "must return the closed channel entry"
2460        );
2461
2462        Ok(())
2463    }
2464
2465    #[tokio::test]
2466    async fn on_channel_opened() -> anyhow::Result<()> {
2467        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2468        let rpc_operations = MockIndexerRpcOperations::new();
2469        // ==> set mock expectations here
2470        let clonable_rpc_operations = ClonableMockOperations {
2471            //
2472            inner: Arc::new(rpc_operations),
2473        };
2474        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2475
2476        let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2477
2478        let encoded_data = ().abi_encode();
2479
2480        let channel_opened_log = SerializableLog {
2481            address: handlers.addresses.channels,
2482            topics: vec![
2483                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2484                // ChannelOpenedFilter::signature().into(),
2485                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2486                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2487            ],
2488            data: encoded_data,
2489            ..test_log()
2490        };
2491
2492        let event_type = db
2493            .begin_transaction()
2494            .await?
2495            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2496            .await?;
2497
2498        let channel = db
2499            .get_channel_by_id(None, &channel_id)
2500            .await?
2501            .context("a value should be present")?;
2502
2503        assert!(
2504            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2505            "must return the updated channel entry"
2506        );
2507
2508        assert_eq!(channel.status, ChannelStatus::Open);
2509        assert_eq!(channel.channel_epoch, 1u64.into());
2510        assert_eq!(channel.ticket_index, 0u64.into());
2511        assert_eq!(
2512            0,
2513            db.get_outgoing_ticket_index(channel.get_id())
2514                .await?
2515                .load(Ordering::Relaxed)
2516        );
2517        Ok(())
2518    }
2519
2520    #[tokio::test]
2521    async fn on_channel_reopened() -> anyhow::Result<()> {
2522        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2523        let rpc_operations = MockIndexerRpcOperations::new();
2524        // ==> set mock expectations here
2525        let clonable_rpc_operations = ClonableMockOperations {
2526            //
2527            inner: Arc::new(rpc_operations),
2528        };
2529        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2530
2531        let channel = ChannelEntry::new(
2532            *SELF_CHAIN_ADDRESS,
2533            *COUNTERPARTY_CHAIN_ADDRESS,
2534            HoprBalance::zero(),
2535            primitive_types::U256::zero(),
2536            ChannelStatus::Closed,
2537            3.into(),
2538        );
2539
2540        db.upsert_channel(None, channel).await?;
2541
2542        let encoded_data = ().abi_encode();
2543
2544        let channel_opened_log = SerializableLog {
2545            address: handlers.addresses.channels,
2546            topics: vec![
2547                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2548                // ChannelOpenedFilter::signature().into(),
2549                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2550                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2551            ],
2552            data: encoded_data,
2553            ..test_log()
2554        };
2555
2556        let event_type = db
2557            .begin_transaction()
2558            .await?
2559            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2560            .await?;
2561
2562        let channel = db
2563            .get_channel_by_id(None, &channel.get_id())
2564            .await?
2565            .context("a value should be present")?;
2566
2567        assert!(
2568            matches!(event_type, Some(ChainEventType::ChannelOpened(c)) if c == channel),
2569            "must return the updated channel entry"
2570        );
2571
2572        assert_eq!(channel.status, ChannelStatus::Open);
2573        assert_eq!(channel.channel_epoch, 4u64.into());
2574        assert_eq!(channel.ticket_index, 0u64.into());
2575
2576        assert_eq!(
2577            0,
2578            db.get_outgoing_ticket_index(channel.get_id())
2579                .await?
2580                .load(Ordering::Relaxed)
2581        );
2582        Ok(())
2583    }
2584
2585    #[tokio::test]
2586    async fn on_channel_should_not_reopen_when_not_closed() -> anyhow::Result<()> {
2587        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2588        let rpc_operations = MockIndexerRpcOperations::new();
2589        // ==> set mock expectations here
2590        let clonable_rpc_operations = ClonableMockOperations {
2591            //
2592            inner: Arc::new(rpc_operations),
2593        };
2594        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2595
2596        let channel = ChannelEntry::new(
2597            *SELF_CHAIN_ADDRESS,
2598            *COUNTERPARTY_CHAIN_ADDRESS,
2599            0.into(),
2600            primitive_types::U256::zero(),
2601            ChannelStatus::Open,
2602            3.into(),
2603        );
2604
2605        db.upsert_channel(None, channel).await?;
2606
2607        let encoded_data = ().abi_encode();
2608
2609        let channel_opened_log = SerializableLog {
2610            address: handlers.addresses.channels,
2611            topics: vec![
2612                hopr_bindings::hoprchannels::HoprChannels::ChannelOpened::SIGNATURE_HASH.into(),
2613                // ChannelOpenedFilter::signature().into(),
2614                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
2615                H256::from_slice(&COUNTERPARTY_CHAIN_ADDRESS.to_bytes32()).into(),
2616            ],
2617            data: encoded_data,
2618            ..test_log()
2619        };
2620
2621        db.begin_transaction()
2622            .await?
2623            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, channel_opened_log, true).await }))
2624            .await
2625            .context("Channel should stay open, with corrupted flag set")?;
2626
2627        assert!(
2628            db.get_channel_by_id(None, &channel.get_id()).await?.is_none(),
2629            "channel should not be returned as marked as corrupted",
2630        );
2631
2632        db.get_corrupted_channel_by_id(None, &channel.get_id())
2633            .await?
2634            .context("a value should be present")?;
2635
2636        Ok(())
2637    }
2638
2639    #[tokio::test]
2640    async fn event_for_non_existing_channel_should_create_corrupted_channel() -> anyhow::Result<()> {
2641        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2642        let rpc_operations = MockIndexerRpcOperations::new();
2643        // ==> set mock expectations here
2644        let clonable_rpc_operations = ClonableMockOperations {
2645            //
2646            inner: Arc::new(rpc_operations),
2647        };
2648        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2649
2650        let channel_id = generate_channel_id(&SELF_CHAIN_ADDRESS, &COUNTERPARTY_CHAIN_ADDRESS);
2651
2652        // Attempt to increase balance
2653        let solidity_balance: HoprBalance = primitive_types::U256::from((1u128 << 96) - 1).into();
2654
2655        let encoded_data = (solidity_balance.amount().to_be_bytes()).abi_encode();
2656
2657        let balance_increased_log = SerializableLog {
2658            address: handlers.addresses.channels,
2659            topics: vec![
2660                hopr_bindings::hoprchannels::HoprChannels::ChannelBalanceIncreased::SIGNATURE_HASH.into(),
2661                // ChannelBalanceIncreasedFilter::signature().into(),
2662                H256::from_slice(channel_id.as_ref()).into(),
2663            ],
2664            data: encoded_data,
2665            ..test_log()
2666        };
2667
2668        db.begin_transaction()
2669            .await?
2670            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, balance_increased_log, true).await }))
2671            .await?;
2672
2673        // Check that the corrupted channel was created
2674        db.get_corrupted_channel_by_id(None, &channel_id)
2675            .await?
2676            .context("channel should be set a corrupted")?;
2677
2678        Ok(())
2679    }
2680
2681    const PRICE_PER_PACKET: u32 = 20_u32;
2682
2683    fn mock_acknowledged_ticket(
2684        signer: &ChainKeypair,
2685        destination: &ChainKeypair,
2686        index: u64,
2687        win_prob: f64,
2688    ) -> anyhow::Result<AcknowledgedTicket> {
2689        let channel_id = generate_channel_id(&signer.into(), &destination.into());
2690
2691        let channel_epoch = 1u64;
2692        let domain_separator = Hash::default();
2693
2694        let response = Response::try_from(
2695            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
2696        )?;
2697
2698        Ok(TicketBuilder::default()
2699            .direction(&signer.into(), &destination.into())
2700            .amount(primitive_types::U256::from(PRICE_PER_PACKET).div_f64(win_prob)?)
2701            .index(index)
2702            .index_offset(1)
2703            .win_prob(win_prob.try_into()?)
2704            .channel_epoch(1)
2705            .challenge(response.to_challenge()?)
2706            .build_signed(signer, &domain_separator)?
2707            .into_acknowledged(response))
2708    }
2709
2710    #[tokio::test]
2711    async fn on_channel_ticket_redeemed_incoming_channel() -> anyhow::Result<()> {
2712        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2713        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2714            .await?;
2715        let rpc_operations = MockIndexerRpcOperations::new();
2716        // ==> set mock expectations here
2717        let clonable_rpc_operations = ClonableMockOperations {
2718            //
2719            inner: Arc::new(rpc_operations),
2720        };
2721        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2722
2723        let channel = ChannelEntry::new(
2724            *COUNTERPARTY_CHAIN_ADDRESS,
2725            *SELF_CHAIN_ADDRESS,
2726            HoprBalance::from(primitive_types::U256::from((1u128 << 96) - 1)),
2727            primitive_types::U256::zero(),
2728            ChannelStatus::Open,
2729            primitive_types::U256::one(),
2730        );
2731
2732        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2733        let next_ticket_index = ticket_index + 1;
2734
2735        let mut ticket =
2736            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2737        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2738
2739        let ticket_value = ticket.verified_ticket().amount;
2740
2741        db.upsert_channel(None, channel).await?;
2742        db.upsert_ticket(None, ticket.clone()).await?;
2743
2744        let ticket_redeemed_log = SerializableLog {
2745            address: handlers.addresses.channels,
2746            topics: vec![
2747                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2748                // TicketRedeemedFilter::signature().into(),
2749                H256::from_slice(channel.get_id().as_ref()).into(),
2750            ],
2751            data: DynSolValue::Tuple(vec![DynSolValue::Uint(
2752                U256::from_be_bytes(next_ticket_index.to_be_bytes()),
2753                48,
2754            )])
2755            .abi_encode(),
2756            ..test_log()
2757        };
2758
2759        let outgoing_ticket_index_before = db
2760            .get_outgoing_ticket_index(channel.get_id())
2761            .await?
2762            .load(Ordering::Relaxed);
2763
2764        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2765        assert_eq!(
2766            HoprBalance::zero(),
2767            stats.redeemed_value,
2768            "there should not be any redeemed value"
2769        );
2770        assert_eq!(
2771            HoprBalance::zero(),
2772            stats.neglected_value,
2773            "there should not be any neglected value"
2774        );
2775
2776        let event_type = db
2777            .begin_transaction()
2778            .await?
2779            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2780            .await?;
2781
2782        let channel = db
2783            .get_channel_by_id(None, &channel.get_id())
2784            .await?
2785            .context("a value should be present")?;
2786
2787        assert!(
2788            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2789            "must return the updated channel entry and the redeemed ticket"
2790        );
2791
2792        assert_eq!(
2793            channel.ticket_index, next_ticket_index,
2794            "channel entry must contain next ticket index"
2795        );
2796
2797        let outgoing_ticket_index_after = db
2798            .get_outgoing_ticket_index(channel.get_id())
2799            .await?
2800            .load(Ordering::Relaxed);
2801
2802        assert_eq!(
2803            outgoing_ticket_index_before, outgoing_ticket_index_after,
2804            "outgoing ticket index must not change"
2805        );
2806
2807        let tickets = db.get_tickets((&channel).into()).await?;
2808
2809        assert!(tickets.is_empty(), "there should not be any tickets left");
2810
2811        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2812        assert_eq!(
2813            ticket_value, stats.redeemed_value,
2814            "there should be redeemed value worth 1 ticket"
2815        );
2816        assert_eq!(
2817            HoprBalance::zero(),
2818            stats.neglected_value,
2819            "there should not be any neglected ticket"
2820        );
2821        Ok(())
2822    }
2823
2824    #[tokio::test]
2825    async fn on_channel_ticket_redeemed_incoming_channel_neglect_left_over_tickets() -> anyhow::Result<()> {
2826        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2827        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2828            .await?;
2829        let rpc_operations = MockIndexerRpcOperations::new();
2830        // ==> set mock expectations here
2831        let clonable_rpc_operations = ClonableMockOperations {
2832            //
2833            inner: Arc::new(rpc_operations),
2834        };
2835        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2836
2837        let channel = ChannelEntry::new(
2838            *COUNTERPARTY_CHAIN_ADDRESS,
2839            *SELF_CHAIN_ADDRESS,
2840            primitive_types::U256::from((1u128 << 96) - 1).into(),
2841            primitive_types::U256::zero(),
2842            ChannelStatus::Open,
2843            primitive_types::U256::one(),
2844        );
2845
2846        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2847        let next_ticket_index = ticket_index + 1;
2848
2849        let mut ticket =
2850            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64(), 1.0)?;
2851        ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
2852
2853        let ticket_value = ticket.verified_ticket().amount;
2854
2855        db.upsert_channel(None, channel).await?;
2856        db.upsert_ticket(None, ticket.clone()).await?;
2857
2858        let old_ticket =
2859            mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, ticket_index.as_u64() - 1, 1.0)?;
2860        db.upsert_ticket(None, old_ticket.clone()).await?;
2861
2862        let ticket_redeemed_log = SerializableLog {
2863            address: handlers.addresses.channels,
2864            topics: vec![
2865                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2866                // TicketRedeemedFilter::signature().into(),
2867                H256::from_slice(channel.get_id().as_ref()).into(),
2868            ],
2869            data: Vec::from(next_ticket_index.to_be_bytes()),
2870            ..test_log()
2871        };
2872
2873        let outgoing_ticket_index_before = db
2874            .get_outgoing_ticket_index(channel.get_id())
2875            .await?
2876            .load(Ordering::Relaxed);
2877
2878        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2879        assert_eq!(
2880            HoprBalance::zero(),
2881            stats.redeemed_value,
2882            "there should not be any redeemed value"
2883        );
2884        assert_eq!(
2885            HoprBalance::zero(),
2886            stats.neglected_value,
2887            "there should not be any neglected value"
2888        );
2889
2890        let event_type = db
2891            .begin_transaction()
2892            .await?
2893            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2894            .await?;
2895
2896        let channel = db
2897            .get_channel_by_id(None, &channel.get_id())
2898            .await?
2899            .context("a value should be present")?;
2900
2901        assert!(
2902            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, t)) if channel == c && t == Some(ticket)),
2903            "must return the updated channel entry and the redeemed ticket"
2904        );
2905
2906        assert_eq!(
2907            channel.ticket_index, next_ticket_index,
2908            "channel entry must contain next ticket index"
2909        );
2910
2911        let outgoing_ticket_index_after = db
2912            .get_outgoing_ticket_index(channel.get_id())
2913            .await?
2914            .load(Ordering::Relaxed);
2915
2916        assert_eq!(
2917            outgoing_ticket_index_before, outgoing_ticket_index_after,
2918            "outgoing ticket index must not change"
2919        );
2920
2921        let tickets = db.get_tickets((&channel).into()).await?;
2922        assert!(tickets.is_empty(), "there should not be any tickets left");
2923
2924        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2925        assert_eq!(
2926            ticket_value, stats.redeemed_value,
2927            "there should be redeemed value worth 1 ticket"
2928        );
2929        assert_eq!(
2930            ticket_value, stats.neglected_value,
2931            "there should neglected value worth 1 ticket"
2932        );
2933        Ok(())
2934    }
2935
2936    #[tokio::test]
2937    async fn on_channel_ticket_redeemed_outgoing_channel() -> anyhow::Result<()> {
2938        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
2939        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
2940            .await?;
2941        let rpc_operations = MockIndexerRpcOperations::new();
2942        // ==> set mock expectations here
2943        let clonable_rpc_operations = ClonableMockOperations {
2944            //
2945            inner: Arc::new(rpc_operations),
2946        };
2947        let handlers = init_handlers(clonable_rpc_operations, db.clone());
2948
2949        let channel = ChannelEntry::new(
2950            *SELF_CHAIN_ADDRESS,
2951            *COUNTERPARTY_CHAIN_ADDRESS,
2952            primitive_types::U256::from((1u128 << 96) - 1).into(),
2953            primitive_types::U256::zero(),
2954            ChannelStatus::Open,
2955            primitive_types::U256::one(),
2956        );
2957
2958        let ticket_index = primitive_types::U256::from((1u128 << 48) - 2);
2959        let next_ticket_index = ticket_index + 1;
2960
2961        db.upsert_channel(None, channel).await?;
2962
2963        let ticket_redeemed_log = SerializableLog {
2964            address: handlers.addresses.channels,
2965            topics: vec![
2966                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
2967                // TicketRedeemedFilter::signature().into(),
2968                H256::from_slice(channel.get_id().as_ref()).into(),
2969            ],
2970            data: Vec::from(next_ticket_index.to_be_bytes()),
2971            ..test_log()
2972        };
2973
2974        let event_type = db
2975            .begin_transaction()
2976            .await?
2977            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
2978            .await?;
2979
2980        let channel = db
2981            .get_channel_by_id(None, &channel.get_id())
2982            .await?
2983            .context("a value should be present")?;
2984
2985        assert!(
2986            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if channel == c),
2987            "must return update channel entry and no ticket"
2988        );
2989
2990        assert_eq!(
2991            channel.ticket_index, next_ticket_index,
2992            "channel entry must contain next ticket index"
2993        );
2994
2995        let outgoing_ticket_index = db
2996            .get_outgoing_ticket_index(channel.get_id())
2997            .await?
2998            .load(Ordering::Relaxed);
2999
3000        assert!(
3001            outgoing_ticket_index >= ticket_index.as_u64(),
3002            "outgoing idx {outgoing_ticket_index} must be greater or equal to {ticket_index}"
3003        );
3004        assert_eq!(
3005            outgoing_ticket_index,
3006            next_ticket_index.as_u64(),
3007            "outgoing ticket index must be equal to next ticket index"
3008        );
3009        Ok(())
3010    }
3011
3012    #[tokio::test]
3013    async fn on_channel_ticket_redeemed_on_incoming_channel_with_non_existent_ticket_should_pass() -> anyhow::Result<()>
3014    {
3015        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3016        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
3017            .await?;
3018        let rpc_operations = MockIndexerRpcOperations::new();
3019        // ==> set mock expectations here
3020        let clonable_rpc_operations = ClonableMockOperations {
3021            //
3022            inner: Arc::new(rpc_operations),
3023        };
3024        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3025
3026        let channel = ChannelEntry::new(
3027            *COUNTERPARTY_CHAIN_ADDRESS,
3028            *SELF_CHAIN_ADDRESS,
3029            primitive_types::U256::from((1u128 << 96) - 1).into(),
3030            primitive_types::U256::zero(),
3031            ChannelStatus::Open,
3032            primitive_types::U256::one(),
3033        );
3034
3035        db.upsert_channel(None, channel).await?;
3036
3037        let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3038
3039        let ticket_redeemed_log = SerializableLog {
3040            address: handlers.addresses.channels,
3041            topics: vec![
3042                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3043                // TicketRedeemedFilter::signature().into(),
3044                H256::from_slice(channel.get_id().as_ref()).into(),
3045            ],
3046            data: Vec::from(next_ticket_index.to_be_bytes()),
3047            ..test_log()
3048        };
3049
3050        let event_type = db
3051            .begin_transaction()
3052            .await?
3053            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3054            .await?;
3055
3056        let channel = db
3057            .get_channel_by_id(None, &channel.get_id())
3058            .await?
3059            .context("a value should be present")?;
3060
3061        assert!(
3062            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3063            "must return updated channel entry and no ticket"
3064        );
3065
3066        assert_eq!(
3067            channel.ticket_index, next_ticket_index,
3068            "channel entry must contain next ticket index"
3069        );
3070        Ok(())
3071    }
3072
3073    #[tokio::test]
3074    async fn on_channel_ticket_redeemed_on_foreign_channel_should_pass() -> anyhow::Result<()> {
3075        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3076        let rpc_operations = MockIndexerRpcOperations::new();
3077        // ==> set mock expectations here
3078        let clonable_rpc_operations = ClonableMockOperations {
3079            //
3080            inner: Arc::new(rpc_operations),
3081        };
3082        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3083
3084        let channel = ChannelEntry::new(
3085            Address::from(hopr_crypto_random::random_bytes()),
3086            Address::from(hopr_crypto_random::random_bytes()),
3087            primitive_types::U256::from((1u128 << 96) - 1).into(),
3088            primitive_types::U256::zero(),
3089            ChannelStatus::Open,
3090            primitive_types::U256::one(),
3091        );
3092
3093        db.upsert_channel(None, channel).await?;
3094
3095        let next_ticket_index = primitive_types::U256::from((1u128 << 48) - 1);
3096
3097        let ticket_redeemed_log = SerializableLog {
3098            address: handlers.addresses.channels,
3099            topics: vec![
3100                hopr_bindings::hoprchannels::HoprChannels::TicketRedeemed::SIGNATURE_HASH.into(),
3101                // TicketRedeemedFilter::signature().into(),
3102                H256::from_slice(channel.get_id().as_ref()).into(),
3103            ],
3104            data: Vec::from(next_ticket_index.to_be_bytes()),
3105            ..test_log()
3106        };
3107
3108        let event_type = db
3109            .begin_transaction()
3110            .await?
3111            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, ticket_redeemed_log, true).await }))
3112            .await?;
3113
3114        let channel = db
3115            .get_channel_by_id(None, &channel.get_id())
3116            .await?
3117            .context("a value should be present")?;
3118
3119        assert!(
3120            matches!(event_type, Some(ChainEventType::TicketRedeemed(c, None)) if c == channel),
3121            "must return updated channel entry and no ticket"
3122        );
3123
3124        assert_eq!(
3125            channel.ticket_index, next_ticket_index,
3126            "channel entry must contain next ticket index"
3127        );
3128        Ok(())
3129    }
3130
3131    #[tokio::test]
3132    async fn on_channel_closure_initiated() -> anyhow::Result<()> {
3133        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3134        let rpc_operations = MockIndexerRpcOperations::new();
3135        // ==> set mock expectations here
3136        let clonable_rpc_operations = ClonableMockOperations {
3137            //
3138            inner: Arc::new(rpc_operations),
3139        };
3140        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3141
3142        let channel = ChannelEntry::new(
3143            *SELF_CHAIN_ADDRESS,
3144            *COUNTERPARTY_CHAIN_ADDRESS,
3145            primitive_types::U256::from((1u128 << 96) - 1).into(),
3146            primitive_types::U256::zero(),
3147            ChannelStatus::Open,
3148            primitive_types::U256::one(),
3149        );
3150
3151        db.upsert_channel(None, channel).await?;
3152
3153        let timestamp = SystemTime::now();
3154
3155        let encoded_data = (U256::from(timestamp.as_unix_timestamp().as_secs())).abi_encode();
3156
3157        let closure_initiated_log = SerializableLog {
3158            address: handlers.addresses.channels,
3159            topics: vec![
3160                hopr_bindings::hoprchannels::HoprChannels::OutgoingChannelClosureInitiated::SIGNATURE_HASH.into(),
3161                // OutgoingChannelClosureInitiatedFilter::signature().into(),
3162                H256::from_slice(channel.get_id().as_ref()).into(),
3163            ],
3164            data: encoded_data,
3165            // data: Vec::from(U256::from(timestamp.as_unix_timestamp().as_secs()).to_be_bytes()).into(),
3166            ..test_log()
3167        };
3168
3169        let event_type = db
3170            .begin_transaction()
3171            .await?
3172            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, closure_initiated_log, true).await }))
3173            .await?;
3174
3175        let channel = db
3176            .get_channel_by_id(None, &channel.get_id())
3177            .await?
3178            .context("a value should be present")?;
3179
3180        assert!(
3181            matches!(event_type, Some(ChainEventType::ChannelClosureInitiated(c)) if c == channel),
3182            "must return updated channel entry"
3183        );
3184
3185        assert_eq!(
3186            channel.status,
3187            ChannelStatus::PendingToClose(timestamp),
3188            "channel status must match"
3189        );
3190        Ok(())
3191    }
3192
3193    #[tokio::test]
3194    async fn on_node_safe_registry_registered() -> anyhow::Result<()> {
3195        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3196        let rpc_operations = MockIndexerRpcOperations::new();
3197        // ==> set mock expectations here
3198        let clonable_rpc_operations = ClonableMockOperations {
3199            //
3200            inner: Arc::new(rpc_operations),
3201        };
3202        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3203
3204        let encoded_data = ().abi_encode();
3205
3206        let safe_registered_log = SerializableLog {
3207            address: handlers.addresses.safe_registry,
3208            topics: vec![
3209                hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::RegisteredNodeSafe::SIGNATURE_HASH.into(),
3210                // RegisteredNodeSafeFilter::signature().into(),
3211                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3212                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3213            ],
3214            data: encoded_data,
3215            ..test_log()
3216        };
3217
3218        let event_type = db
3219            .begin_transaction()
3220            .await?
3221            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3222            .await?;
3223
3224        assert!(matches!(event_type, Some(ChainEventType::NodeSafeRegistered(addr)) if addr == *SAFE_INSTANCE_ADDR));
3225
3226        // Nothing to check in the DB here, since we do not track this
3227        Ok(())
3228    }
3229
3230    #[tokio::test]
3231    async fn on_node_safe_registry_deregistered() -> anyhow::Result<()> {
3232        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3233        let rpc_operations = MockIndexerRpcOperations::new();
3234        // ==> set mock expectations here
3235        let clonable_rpc_operations = ClonableMockOperations {
3236            //
3237            inner: Arc::new(rpc_operations),
3238        };
3239        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3240
3241        // Nothing to write to the DB here, since we do not track this
3242
3243        let encoded_data = ().abi_encode();
3244
3245        let safe_registered_log = SerializableLog {
3246            address: handlers.addresses.safe_registry,
3247            topics: vec![
3248                hopr_bindings::hoprnodesaferegistry::HoprNodeSafeRegistry::DergisteredNodeSafe::SIGNATURE_HASH.into(),
3249                // DergisteredNodeSafeFilter::signature().into(),
3250                H256::from_slice(&SAFE_INSTANCE_ADDR.to_bytes32()).into(),
3251                H256::from_slice(&SELF_CHAIN_ADDRESS.to_bytes32()).into(),
3252            ],
3253            data: encoded_data,
3254            ..test_log()
3255        };
3256
3257        let event_type = db
3258            .begin_transaction()
3259            .await?
3260            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, safe_registered_log, true).await }))
3261            .await?;
3262
3263        assert!(
3264            event_type.is_none(),
3265            "there's no associated chain event type with safe deregistration"
3266        );
3267
3268        // Nothing to check in the DB here, since we do not track this
3269        Ok(())
3270    }
3271
3272    #[tokio::test]
3273    async fn ticket_price_update() -> anyhow::Result<()> {
3274        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3275        let rpc_operations = MockIndexerRpcOperations::new();
3276        // ==> set mock expectations here
3277        let clonable_rpc_operations = ClonableMockOperations {
3278            //
3279            inner: Arc::new(rpc_operations),
3280        };
3281        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3282
3283        let encoded_data = (U256::from(1u64), U256::from(123u64)).abi_encode();
3284
3285        let price_change_log = SerializableLog {
3286            address: handlers.addresses.price_oracle,
3287            topics: vec![
3288                hopr_bindings::hoprticketpriceoracle::HoprTicketPriceOracle::TicketPriceUpdated::SIGNATURE_HASH.into(),
3289                // TicketPriceUpdatedFilter::signature().into()
3290            ],
3291            data: encoded_data,
3292            // data: encode(&[Token::Uint(EthU256::from(1u64)), Token::Uint(EthU256::from(123u64))]).into(),
3293            ..test_log()
3294        };
3295
3296        assert_eq!(db.get_indexer_data(None).await?.ticket_price, None);
3297
3298        let event_type = db
3299            .begin_transaction()
3300            .await?
3301            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, price_change_log, true).await }))
3302            .await?;
3303
3304        assert!(
3305            event_type.is_none(),
3306            "there's no associated chain event type with price oracle"
3307        );
3308
3309        assert_eq!(
3310            db.get_indexer_data(None).await?.ticket_price.map(|p| p.amount()),
3311            Some(primitive_types::U256::from(123u64))
3312        );
3313        Ok(())
3314    }
3315
3316    #[tokio::test]
3317    async fn minimum_win_prob_update() -> anyhow::Result<()> {
3318        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3319        let rpc_operations = MockIndexerRpcOperations::new();
3320        // ==> set mock expectations here
3321        let clonable_rpc_operations = ClonableMockOperations {
3322            //
3323            inner: Arc::new(rpc_operations),
3324        };
3325        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3326
3327        let encoded_data = (
3328            U256::from_be_slice(WinningProbability::ALWAYS.as_ref()),
3329            U256::from_be_slice(WinningProbability::try_from_f64(0.5)?.as_ref()),
3330        )
3331            .abi_encode();
3332
3333        let win_prob_change_log = SerializableLog {
3334            address: handlers.addresses.win_prob_oracle,
3335            topics: vec![
3336                hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into()],
3337            data: encoded_data,
3338            ..test_log()
3339        };
3340
3341        assert_eq!(
3342            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3343            1.0
3344        );
3345
3346        let event_type = db
3347            .begin_transaction()
3348            .await?
3349            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3350            .await?;
3351
3352        assert!(
3353            event_type.is_none(),
3354            "there's no associated chain event type with winning probability change"
3355        );
3356
3357        assert_eq!(
3358            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3359            0.5
3360        );
3361        Ok(())
3362    }
3363
3364    #[tokio::test]
3365    async fn lowering_minimum_win_prob_update_should_reject_non_satisfying_unredeemed_tickets() -> anyhow::Result<()> {
3366        let db = HoprDb::new_in_memory(SELF_CHAIN_KEY.clone()).await?;
3367        db.set_minimum_incoming_ticket_win_prob(None, 0.1.try_into()?).await?;
3368
3369        let new_minimum = 0.5;
3370        let ticket_win_probs = [0.1, 1.0, 0.3, 0.2];
3371
3372        let channel_1 = ChannelEntry::new(
3373            *COUNTERPARTY_CHAIN_ADDRESS,
3374            *SELF_CHAIN_ADDRESS,
3375            primitive_types::U256::from((1u128 << 96) - 1).into(),
3376            3_u32.into(),
3377            ChannelStatus::Open,
3378            primitive_types::U256::one(),
3379        );
3380
3381        db.upsert_channel(None, channel_1).await?;
3382
3383        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 1, ticket_win_probs[0])?;
3384        db.upsert_ticket(None, ticket).await?;
3385
3386        let ticket = mock_acknowledged_ticket(&COUNTERPARTY_CHAIN_KEY, &SELF_CHAIN_KEY, 2, ticket_win_probs[1])?;
3387        db.upsert_ticket(None, ticket).await?;
3388
3389        let tickets = db.get_tickets((&channel_1).into()).await?;
3390        assert_eq!(tickets.len(), 2);
3391
3392        // ---
3393
3394        let other_counterparty = ChainKeypair::random();
3395        let channel_2 = ChannelEntry::new(
3396            other_counterparty.public().to_address(),
3397            *SELF_CHAIN_ADDRESS,
3398            primitive_types::U256::from((1u128 << 96) - 1).into(),
3399            3_u32.into(),
3400            ChannelStatus::Open,
3401            primitive_types::U256::one(),
3402        );
3403
3404        db.upsert_channel(None, channel_2).await?;
3405
3406        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 1, ticket_win_probs[2])?;
3407        db.upsert_ticket(None, ticket).await?;
3408
3409        let ticket = mock_acknowledged_ticket(&other_counterparty, &SELF_CHAIN_KEY, 2, ticket_win_probs[3])?;
3410        db.upsert_ticket(None, ticket).await?;
3411
3412        let tickets = db.get_tickets((&channel_2).into()).await?;
3413        assert_eq!(tickets.len(), 2);
3414
3415        let stats = db.get_ticket_statistics(None).await?;
3416        assert_eq!(HoprBalance::zero(), stats.rejected_value);
3417
3418        let rpc_operations = MockIndexerRpcOperations::new();
3419        // ==> set mock expectations here
3420        let clonable_rpc_operations = ClonableMockOperations {
3421            //
3422            inner: Arc::new(rpc_operations),
3423        };
3424        let handlers = init_handlers(clonable_rpc_operations, db.clone());
3425
3426        let encoded_data = (
3427            U256::from_be_slice(WinningProbability::try_from(0.1)?.as_ref()),
3428            U256::from_be_slice(WinningProbability::try_from(new_minimum)?.as_ref()),
3429        )
3430            .abi_encode();
3431
3432        let win_prob_change_log = SerializableLog {
3433            address: handlers.addresses.win_prob_oracle,
3434            topics: vec![
3435                hopr_bindings::hoprwinningprobabilityoracle::HoprWinningProbabilityOracle::WinProbUpdated::SIGNATURE_HASH.into(),
3436            ],
3437            data: encoded_data,
3438            ..test_log()
3439        };
3440
3441        let event_type = db
3442            .begin_transaction()
3443            .await?
3444            .perform(|tx| Box::pin(async move { handlers.process_log_event(tx, win_prob_change_log, true).await }))
3445            .await?;
3446
3447        assert!(
3448            event_type.is_none(),
3449            "there's no associated chain event type with winning probability change"
3450        );
3451
3452        assert_eq!(
3453            db.get_indexer_data(None).await?.minimum_incoming_ticket_winning_prob,
3454            new_minimum
3455        );
3456
3457        let tickets = db.get_tickets((&channel_1).into()).await?;
3458        assert_eq!(tickets.len(), 1);
3459
3460        let tickets = db.get_tickets((&channel_2).into()).await?;
3461        assert_eq!(tickets.len(), 0);
3462
3463        let stats = db.get_ticket_statistics(None).await?;
3464        let rejected_value: primitive_types::U256 = ticket_win_probs
3465            .iter()
3466            .filter(|p| **p < new_minimum)
3467            .map(|p| {
3468                primitive_types::U256::from(PRICE_PER_PACKET)
3469                    .div_f64(*p)
3470                    .expect("must divide")
3471            })
3472            .reduce(|a, b| a + b)
3473            .ok_or(anyhow!("must sum"))?;
3474
3475        assert_eq!(HoprBalance::from(rejected_value), stats.rejected_value);
3476
3477        Ok(())
3478    }
3479}