Skip to main content

hopr_protocol_hopr/
ticket_processing.rs

1use std::sync::{Arc, atomic::AtomicU64};
2
3use futures::StreamExt;
4use hopr_api::{
5    chain::ChainReadChannelOperations,
6    db::{HoprDbTicketOperations, TicketSelector},
7};
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::prelude::*;
10#[cfg(feature = "rayon")]
11use hopr_parallelize::cpu::rayon::prelude::*;
12use hopr_primitive_types::balance::HoprBalance;
13use validator::ValidationError;
14
15use crate::{
16    HoprProtocolError, ResolvedAcknowledgement, TicketAcknowledgementError, TicketTracker,
17    UnacknowledgedTicketProcessor,
18};
19
/// Diagnostic metrics for the unacknowledged ticket cache.
///
/// The gauges and counters defined here track cache insertions, lookups, lookup misses
/// and evictions, which helps when investigating "unknown ticket" acknowledgement failures.
///
/// Per-peer metrics are off by default due to Prometheus cardinality concerns.
/// Set `HOPR_METRICS_UNACK_PER_PEER` to `1` or `true` (case-insensitive) to enable them
/// for debugging.
///
/// Two interchangeable backends exist: `real` (compiled with the `prometheus` feature outside
/// of tests) and `noop` (everywhere else). Both expose the same set of functions.
mod metrics {
    #[cfg(all(feature = "prometheus", not(test)))]
    pub use real::*;
    #[cfg(any(not(feature = "prometheus"), test))]
    pub use noop::*;

    /// Backend that records into `hopr_metrics` gauges and counters.
    #[cfg(all(feature = "prometheus", not(test)))]
    mod real {
        use hopr_metrics::{MultiGauge, SimpleCounter, SimpleGauge};

        lazy_static::lazy_static! {
            /// Whether per-peer metrics are enabled (disabled by default to avoid cardinality explosion).
            static ref PER_PEER_ENABLED: bool = std::env::var("HOPR_METRICS_UNACK_PER_PEER")
                .is_ok_and(|flag| flag == "1" || flag.eq_ignore_ascii_case("true"));

            /// Gauge: peers that currently own an inner ticket cache.
            static ref UNACK_PEERS: SimpleGauge = SimpleGauge::new(
                "hopr_tickets_unack_peers_total",
                "Number of peers with unacknowledged tickets in cache",
            )
            .unwrap();
            /// Gauge: tickets currently held across all inner caches.
            static ref UNACK_TICKETS: SimpleGauge = SimpleGauge::new(
                "hopr_tickets_unack_tickets_total",
                "Total number of unacknowledged tickets across all peer caches",
            )
            .unwrap();
            /// Counter: ticket insertions.
            static ref UNACK_INSERTIONS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_insertions_total",
                "Total number of unacknowledged tickets inserted into cache",
            )
            .unwrap();
            /// Counter: acknowledgement lookups.
            static ref UNACK_LOOKUPS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_lookups_total",
                "Total number of ticket acknowledgement lookups",
            )
            .unwrap();
            /// Counter: lookups that found no matching ticket.
            static ref UNACK_LOOKUP_MISSES: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_lookup_misses_total",
                "Total number of ticket lookup failures (unknown ticket)",
            )
            .unwrap();
            /// Counter: tickets evicted from an inner cache.
            static ref UNACK_EVICTIONS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_evictions_total",
                "Total number of unacknowledged tickets evicted from cache",
            )
            .unwrap();
            /// Counter: whole peer caches evicted from the outer cache.
            static ref UNACK_PEER_EVICTIONS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_peer_evictions_total",
                "Total number of peer caches evicted from the outer unacknowledged ticket cache",
            )
            .unwrap();
            /// Gauge labelled by peer; only touched when per-peer metrics are enabled.
            static ref UNACK_TICKETS_PER_PEER: MultiGauge = MultiGauge::new(
                "hopr_tickets_unack_tickets_per_peer",
                "Number of unacknowledged tickets per peer in cache (enable with HOPR_METRICS_UNACK_PER_PEER=1)",
                &["peer"],
            )
            .unwrap();
        }

        /// Returns the per-peer gauge when per-peer metrics are enabled, `None` otherwise.
        #[inline]
        fn per_peer_gauge() -> Option<&'static MultiGauge> {
            PER_PEER_ENABLED.then(|| &*UNACK_TICKETS_PER_PEER)
        }

        /// Forces creation of all metrics so they exist before the first update.
        ///
        /// The per-peer gauge is created only when per-peer metrics are enabled.
        pub fn initialize() {
            lazy_static::initialize(&PER_PEER_ENABLED);
            lazy_static::initialize(&UNACK_PEERS);
            lazy_static::initialize(&UNACK_TICKETS);
            lazy_static::initialize(&UNACK_INSERTIONS);
            lazy_static::initialize(&UNACK_LOOKUPS);
            lazy_static::initialize(&UNACK_LOOKUP_MISSES);
            lazy_static::initialize(&UNACK_EVICTIONS);
            lazy_static::initialize(&UNACK_PEER_EVICTIONS);
            if per_peer_enabled() {
                lazy_static::initialize(&UNACK_TICKETS_PER_PEER);
            }
        }

        /// Tells whether per-peer metrics were enabled via the environment.
        #[inline]
        #[allow(dead_code)]
        pub fn per_peer_enabled() -> bool {
            *PER_PEER_ENABLED
        }

        /// Adds one to the gauge of peers with cached tickets.
        #[inline]
        pub fn inc_unack_peers() {
            UNACK_PEERS.increment(1.0);
        }

        /// Subtracts one from the gauge of peers with cached tickets.
        #[inline]
        pub fn dec_unack_peers() {
            UNACK_PEERS.decrement(1.0);
        }

        /// Adds one to the gauge of cached tickets.
        #[inline]
        pub fn inc_unack_tickets() {
            UNACK_TICKETS.increment(1.0);
        }

        /// Subtracts one from the gauge of cached tickets.
        #[inline]
        pub fn dec_unack_tickets() {
            UNACK_TICKETS.decrement(1.0);
        }

        /// Counts one ticket insertion.
        #[inline]
        pub fn inc_insertions() {
            UNACK_INSERTIONS.increment();
        }

        /// Counts one acknowledgement lookup.
        #[inline]
        pub fn inc_lookups() {
            UNACK_LOOKUPS.increment();
        }

        /// Counts one lookup that found no ticket.
        #[inline]
        pub fn inc_lookup_misses() {
            UNACK_LOOKUP_MISSES.increment();
        }

        /// Counts one ticket eviction.
        #[inline]
        pub fn inc_evictions() {
            UNACK_EVICTIONS.increment();
        }

        /// Counts one eviction of a whole peer cache.
        #[inline]
        pub fn inc_peer_evictions() {
            UNACK_PEER_EVICTIONS.increment();
        }

        /// Adds one to the per-peer ticket gauge of `peer` (no-op unless per-peer metrics are enabled).
        #[inline]
        pub fn inc_unack_tickets_for_peer(peer: &str) {
            if let Some(gauge) = per_peer_gauge() {
                gauge.increment(&[peer], 1.0);
            }
        }

        /// Subtracts one from the per-peer ticket gauge of `peer` (no-op unless per-peer metrics are enabled).
        #[inline]
        pub fn dec_unack_tickets_for_peer(peer: &str) {
            if let Some(gauge) = per_peer_gauge() {
                gauge.decrement(&[peer], 1.0);
            }
        }

        /// Sets the per-peer ticket gauge of `peer` to zero (no-op unless per-peer metrics are enabled).
        #[inline]
        #[allow(dead_code)]
        pub fn reset_unack_tickets_for_peer(peer: &str) {
            if let Some(gauge) = per_peer_gauge() {
                gauge.set(&[peer], 0.0);
            }
        }
    }

    /// Backend that records nothing; mirrors the function set of `real`.
    #[cfg(any(not(feature = "prometheus"), test))]
    mod noop {
        /// Nothing to set up.
        #[inline]
        pub fn initialize() {}

        /// Per-peer metrics are never enabled in this backend.
        #[inline]
        #[allow(dead_code)]
        pub fn per_peer_enabled() -> bool {
            false
        }

        // Gauges.
        #[inline]
        pub fn inc_unack_peers() {}
        #[inline]
        pub fn dec_unack_peers() {}
        #[inline]
        pub fn inc_unack_tickets() {}
        #[inline]
        pub fn dec_unack_tickets() {}

        // Counters.
        #[inline]
        pub fn inc_insertions() {}
        #[inline]
        pub fn inc_lookups() {}
        #[inline]
        pub fn inc_lookup_misses() {}
        #[inline]
        pub fn inc_evictions() {}
        #[inline]
        pub fn inc_peer_evictions() {}

        // Per-peer gauge.
        #[inline]
        pub fn inc_unack_tickets_for_peer(_peer: &str) {}
        #[inline]
        pub fn dec_unack_tickets_for_peer(_peer: &str) {}
        #[inline]
        #[allow(dead_code)]
        pub fn reset_unack_tickets_for_peer(_peer: &str) {}
    }
}
208
209const MIN_UNACK_TICKET_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
210fn validate_unack_ticket_timeout(timeout: &std::time::Duration) -> Result<(), ValidationError> {
211    if timeout < &MIN_UNACK_TICKET_TIMEOUT {
212        Err(ValidationError::new("unack_ticket_timeout too low"))
213    } else {
214        Ok(())
215    }
216}
217
218const MIN_OUTGOING_INDEX_RETENTION: std::time::Duration = std::time::Duration::from_secs(10);
219
220fn validate_outgoing_index_retention(retention: &std::time::Duration) -> Result<(), ValidationError> {
221    if retention < &MIN_OUTGOING_INDEX_RETENTION {
222        Err(ValidationError::new("outgoing_index_cache_retention too low"))
223    } else {
224        Ok(())
225    }
226}
227
/// Default value of `outgoing_index_cache_retention`: 10 seconds.
fn default_outgoing_index_retention() -> std::time::Duration {
    const DEFAULT_RETENTION_SECS: u64 = 10;
    std::time::Duration::from_secs(DEFAULT_RETENTION_SECS)
}
231
/// Default value of `unack_ticket_timeout`: 30 seconds.
fn default_unack_ticket_timeout() -> std::time::Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;
    std::time::Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
235
/// Default value of `max_unack_tickets`: ten million entries.
fn default_max_unack_tickets() -> usize {
    const DEFAULT_MAX_UNACK_TICKETS: usize = 10_000_000;
    DEFAULT_MAX_UNACK_TICKETS
}
239
/// Default provider for boolean options that are switched on unless configured otherwise.
fn just_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
243
/// Configuration for the HOPR ticket processor within the packet pipeline.
///
/// Every field has a default, so the configuration can be obtained via `Default`.
/// With the `serde` feature enabled, it can also be (de)serialized; missing fields fall back
/// to their defaults and unknown fields are rejected.
#[derive(Debug, Clone, Copy, smart_default::SmartDefault, PartialEq, validator::Validate)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Deserialize, serde::Serialize),
    serde(deny_unknown_fields)
)]
pub struct HoprTicketProcessorConfig {
    /// Time after which an unacknowledged ticket is considered stale and is removed from the cache.
    ///
    /// If the counterparty does not send an acknowledgement within this period, the ticket is lost forever.
    ///
    /// The value is used as the time-to-live of each ticket inside a per-peer cache and as the
    /// time-to-idle of the per-peer cache itself. Validation requires at least 10 seconds.
    ///
    /// Default is 30 seconds.
    #[default(default_unack_ticket_timeout())]
    #[validate(custom(function = "validate_unack_ticket_timeout"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_unack_ticket_timeout", with = "humantime_serde")
    )]
    pub unack_ticket_timeout: std::time::Duration,
    /// Maximum number of unacknowledged tickets that can be stored at any given time in the
    /// cache of a single next-hop peer (the limit is applied to each per-peer cache separately).
    ///
    /// When the limit is exceeded, tickets are evicted from that cache and lost forever.
    ///
    /// Validation requires at least 100.
    ///
    /// Default is 10 000 000.
    #[default(default_max_unack_tickets())]
    #[validate(range(min = 100))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_unack_tickets"))]
    pub max_unack_tickets: usize,
    /// Period for which the outgoing ticket index is cached in memory for each channel.
    ///
    /// The value is used as the time-to-idle of the index cache; the periodic task that syncs
    /// the indices to the database runs at half of this period. Validation requires at least
    /// 10 seconds.
    ///
    /// Default is 10 seconds.
    #[default(default_outgoing_index_retention())]
    #[validate(custom(function = "validate_outgoing_index_retention"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_outgoing_index_retention", with = "humantime_serde")
    )]
    pub outgoing_index_cache_retention: std::time::Duration,
    /// Indicates whether to use batch verification algorithm for acknowledgements.
    ///
    /// This has a positive performance impact on higher workloads.
    ///
    /// Default is true.
    #[default(just_true())]
    #[cfg_attr(feature = "serde", serde(default = "just_true"))]
    pub use_batch_verification: bool,
}
293
/// HOPR-specific implementation of [`UnacknowledgedTicketProcessor`] and [`TicketTracker`].
///
/// Unacknowledged tickets and outgoing ticket indices are kept in in-memory caches; the
/// database and chain API handles are used for persistence and channel lookups.
#[derive(Clone)]
pub struct HoprTicketProcessor<Chain, Db> {
    /// Two-level cache of tickets awaiting acknowledgement: the outer cache is keyed by the
    /// next hop's offchain public key, the inner (per-peer) cache by the challenge of the ticket.
    unacknowledged_tickets:
        moka::future::Cache<OffchainPublicKey, moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>>,
    /// Next outgoing ticket index for each `(channel id, channel epoch)` pair.
    out_ticket_index: moka::future::Cache<(ChannelId, u32), Arc<AtomicU64>>,
    /// Database handle used to load and persist outgoing ticket indices and to query ticket values.
    db: Db,
    /// Chain API handle used to resolve the channel a ticket belongs to.
    chain_api: Chain,
    /// Chain key of this node; used for channel lookups and when turning tickets into redeemable ones.
    chain_key: ChainKeypair,
    /// Domain separator passed along when turning acknowledged tickets into redeemable ones.
    channels_dst: Hash,
    /// Configuration this processor was created with.
    cfg: HoprTicketProcessorConfig,
}
306
307impl<Chain, Db> HoprTicketProcessor<Chain, Db> {
308    /// Creates a new instance of the HOPR ticket processor.
309    pub fn new(
310        chain_api: Chain,
311        db: Db,
312        chain_key: ChainKeypair,
313        channels_dst: Hash,
314        cfg: HoprTicketProcessorConfig,
315    ) -> Self {
316        metrics::initialize();
317
318        Self {
319            out_ticket_index: moka::future::Cache::builder()
320                .time_to_idle(cfg.outgoing_index_cache_retention)
321                .max_capacity(10_000)
322                .build(),
323            unacknowledged_tickets: moka::future::Cache::builder()
324                .time_to_idle(cfg.unack_ticket_timeout)
325                .max_capacity(100_000)
326                .async_eviction_listener(
327                    |_key,
328                     value: moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>,
329                     cause|
330                     -> moka::notification::ListenerFuture {
331                        Box::pin(async move {
332                            if !matches!(cause, moka::notification::RemovalCause::Replaced) {
333                                metrics::dec_unack_peers();
334                                if matches!(
335                                    cause,
336                                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size
337                                ) {
338                                    metrics::inc_peer_evictions();
339                                }
340                            }
341                            // Explicitly invalidate all inner cache entries so their eviction
342                            // listeners fire (decrementing UNACK_TICKETS and per-peer metrics).
343                            // Without this, dropping the inner cache silently leaks those metrics.
344                            value.invalidate_all();
345                            value.run_pending_tasks().await;
346                        })
347                    },
348                )
349                .build(),
350            chain_api,
351            db,
352            chain_key,
353            channels_dst,
354            cfg,
355        }
356    }
357}
358
359impl<Chain, Db> HoprTicketProcessor<Chain, Db>
360where
361    Db: HoprDbTicketOperations + Clone + Send + 'static,
362{
363    /// Task that performs periodic synchronization of the outgoing ticket index cache
364    /// to the underlying database.
365    ///
366    /// If this task is not started, the outgoing ticket indices will not survive a node
367    /// restart and will result in invalid tickets received by the counterparty.
368    pub fn outgoing_index_sync_task(
369        &self,
370        reg: futures::future::AbortRegistration,
371    ) -> impl Future<Output = ()> + use<Db, Chain> {
372        let index_save_stream = futures::stream::Abortable::new(
373            futures_time::stream::interval(futures_time::time::Duration::from(
374                self.cfg.outgoing_index_cache_retention.div_f32(2.0),
375            )),
376            reg,
377        );
378
379        let db = self.db.clone();
380        let out_ticket_index = self.out_ticket_index.clone();
381
382        index_save_stream
383            .for_each(move |_| {
384                let db = db.clone();
385                let out_ticket_index = out_ticket_index.clone();
386                async move {
387                    // This iteration does not alter the popularity estimator of the cache
388                    // and therefore still allows the unused entries to expire
389                    for (channel_key, out_idx) in out_ticket_index.iter() {
390                        if let Err(error) = db
391                            .update_outgoing_ticket_index(
392                                &channel_key.0,
393                                channel_key.1,
394                                out_idx.load(std::sync::atomic::Ordering::SeqCst),
395                            )
396                            .await
397                        {
398                            tracing::error!(%error, channel_id = %channel_key.0, epoch = channel_key.1, "failed to sync outgoing ticket index to db");
399                        }
400                    }
401                    tracing::trace!("synced outgoing ticket indices to db");
402                }
403            })
404    }
405}
406
407#[async_trait::async_trait]
408impl<Chain, Db> UnacknowledgedTicketProcessor for HoprTicketProcessor<Chain, Db>
409where
410    Chain: ChainReadChannelOperations + Send + Sync,
411    Db: Send + Sync,
412{
413    type Error = HoprProtocolError;
414
415    #[tracing::instrument(skip(self, next_hop, challenge, ticket), level = "trace", fields(next_hop = next_hop.to_peerid_str()))]
416    async fn insert_unacknowledged_ticket(
417        &self,
418        next_hop: &OffchainPublicKey,
419        challenge: HalfKeyChallenge,
420        ticket: UnacknowledgedTicket,
421    ) -> Result<(), Self::Error> {
422        tracing::trace!(%ticket, "received unacknowledged ticket");
423
424        let peer_id = next_hop.to_peerid_str();
425        let inner_cache = self
426            .unacknowledged_tickets
427            .get_with_by_ref(next_hop, async {
428                let peer_id_for_eviction = peer_id.clone();
429                metrics::inc_unack_peers();
430                moka::future::Cache::builder()
431                    .time_to_live(self.cfg.unack_ticket_timeout)
432                    .max_capacity(self.cfg.max_unack_tickets as u64)
433                    .eviction_listener(move |_key, _value, cause| {
434                        metrics::dec_unack_tickets();
435                        metrics::dec_unack_tickets_for_peer(&peer_id_for_eviction);
436
437                        // Only count Expired/Size removals as evictions (not Explicit or Replaced)
438                        if matches!(
439                            cause,
440                            moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size
441                        ) {
442                            metrics::inc_evictions();
443                        }
444                    })
445                    .build()
446            })
447            .await;
448
449        inner_cache.insert(challenge, ticket).await;
450
451        metrics::inc_insertions();
452        metrics::inc_unack_tickets();
453        metrics::inc_unack_tickets_for_peer(&peer_id);
454
455        Ok(())
456    }
457
    /// Resolves the acknowledgements `acks` received from `peer` against the tickets that
    /// are awaiting acknowledgement from that peer.
    ///
    /// The processing happens in three stages:
    /// 1. the acknowledgements are verified (optionally in batch) and the challenge of each
    ///    half-key is computed; acknowledgements failing this step are logged and dropped,
    /// 2. the matching unacknowledged ticket is removed from the peer's cache for each
    ///    challenge, and the channel of the ticket issuer is resolved via the chain API,
    /// 3. each ticket is acknowledged with its half-key and checked for being a winning one.
    ///
    /// Stages 1 and 3 run on the blocking CPU pool.
    ///
    /// # Returns
    /// One [`ResolvedAcknowledgement`] per successfully processed ticket: `RelayingWin` for
    /// a winning ticket and `RelayingLoss` otherwise. The result can therefore be shorter
    /// than `acks`.
    ///
    /// # Errors
    /// - [`TicketAcknowledgementError::UnexpectedAcknowledgement`] if no ticket awaits
    ///   acknowledgement from `peer`,
    /// - [`TicketAcknowledgementError::Inner`] if one of the blocking tasks fails.
    #[tracing::instrument(skip_all, level = "trace", fields(peer = peer.to_peerid_str()))]
    async fn acknowledge_tickets(
        &self,
        peer: OffchainPublicKey,
        acks: Vec<Acknowledgement>,
    ) -> Result<Vec<ResolvedAcknowledgement>, TicketAcknowledgementError<Self::Error>> {
        // Check if we're even expecting an acknowledgement from this peer:
        // We need to first do a check that does not update the popularity estimator of `peer` in this cache,
        // so we actually allow the entry to time out eventually. However, this comes at the cost
        // double-lookup.
        if !self.unacknowledged_tickets.contains_key(&peer) {
            tracing::trace!("not awaiting any acknowledgement from peer");
            return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
        }
        // The entry may have disappeared between the two lookups, hence the second check.
        let Some(awaiting_ack_from_peer) = self.unacknowledged_tickets.get(&peer).await else {
            tracing::trace!("not awaiting any acknowledgement from peer");
            return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
        };

        // Verify all the acknowledgements and compute challenges from half-keys
        let use_batch_verify = self.cfg.use_batch_verification;
        let half_keys_challenges = hopr_parallelize::cpu::spawn_fifo_blocking(
            move || {
                if use_batch_verify {
                    // Uses regular verifications for small batches but switches to a more effective
                    // batch verification algorithm for larger ones.
                    let acks = Acknowledgement::verify_batch(acks.into_iter().map(|ack| (peer, ack)));

                    #[cfg(feature = "rayon")]
                    let iter = acks.into_par_iter();

                    #[cfg(not(feature = "rayon"))]
                    let iter = acks.into_iter();

                    // Pair each verified half-key with its challenge; failures are logged and skipped.
                    iter.map(|verified| {
                        verified.and_then(|verified| {
                            Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?))
                        })
                    })
                    .filter_map(|res| {
                        res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
                            .ok()
                    })
                    .collect::<Vec<_>>()
                } else {
                    #[cfg(feature = "rayon")]
                    let iter = acks.into_par_iter();

                    #[cfg(not(feature = "rayon"))]
                    let iter = acks.into_iter();

                    // Same as above, but each acknowledgement is verified individually.
                    iter.map(|ack| {
                        ack.verify(&peer).and_then(|verified| {
                            Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?))
                        })
                    })
                    .filter_map(|res| {
                        res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
                            .ok()
                    })
                    .collect::<Vec<_>>()
                }
            },
            "ack_verify",
        )
        .await
        .map_err(|e| TicketAcknowledgementError::Inner(HoprProtocolError::from(e)))?;

        // Find all the tickets that we're awaiting acknowledgement for
        let mut unack_tickets = Vec::with_capacity(half_keys_challenges.len());
        for (half_key, challenge) in half_keys_challenges {
            metrics::inc_lookups();

            // The ticket is taken out of the cache here, so if any of the checks below fails,
            // the ticket is dropped and cannot be acknowledged again.
            let Some(unack_ticket) = awaiting_ack_from_peer.remove(&challenge).await else {
                tracing::trace!(%challenge, "received acknowledgement for unknown ticket");
                metrics::inc_lookup_misses();
                continue;
            };

            // Resolve the channel from the ticket issuer to this node; the ticket must belong
            // to the current epoch of that channel.
            let issuer_channel = match self
                .chain_api
                .channel_by_parties(unack_ticket.ticket.verified_issuer(), self.chain_key.as_ref())
                .await
            {
                Ok(Some(channel)) => {
                    if channel.channel_epoch != unack_ticket.verified_ticket().channel_epoch {
                        tracing::error!(%unack_ticket, "received acknowledgement for ticket issued in a different epoch");
                        continue;
                    }
                    channel
                }
                Ok(None) => {
                    tracing::error!(%unack_ticket, "received acknowledgement for ticket issued for unknown channel");
                    continue;
                }
                Err(error) => {
                    tracing::error!(%error, %unack_ticket, "failed to resolve channel for unacknowledged ticket");
                    continue;
                }
            };

            unack_tickets.push((issuer_channel, half_key, unack_ticket));
        }

        // Values moved into the blocking closure below.
        let domain_separator = self.channels_dst;
        let chain_key = self.chain_key.clone();
        Ok(hopr_parallelize::cpu::spawn_fifo_blocking(
            move || {
                #[cfg(feature = "rayon")]
                let iter = unack_tickets.into_par_iter();

                #[cfg(not(feature = "rayon"))]
                let iter = unack_tickets.into_iter();

                iter.filter_map(|(channel, half_key, unack_ticket)| {
                    // This explicitly checks whether the acknowledgement
                    // solves the challenge on the ticket.
                    // It must be done before we check that the ticket is winning,
                    // which is a lengthy operation and should not be done for
                    // bogus unacknowledged tickets
                    let Ok(ack_ticket) = unack_ticket.acknowledge(&half_key) else {
                        tracing::error!(%unack_ticket, "failed to acknowledge ticket");
                        return None;
                    };

                    // This operation checks if the ticket is winning, and if it is, it
                    // turns it into a redeemable ticket.
                    match ack_ticket.into_redeemable(&chain_key, &domain_separator) {
                        Ok(redeemable) => {
                            tracing::trace!(%channel, "found winning ticket");
                            Some(ResolvedAcknowledgement::RelayingWin(Box::new(redeemable)))
                        }
                        Err(CoreTypesError::TicketNotWinning) => {
                            tracing::trace!(%channel, "found losing ticket");
                            Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
                        }
                        // Any other failure is logged and reported as a losing ticket as well.
                        Err(error) => {
                            tracing::error!(%error, %channel, "error when acknowledging ticket");
                            Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
                        }
                    }
                })
                .collect::<Vec<_>>()
            },
            "ticket_into_redeemable",
        )
        .await
        .map_err(|e| TicketAcknowledgementError::Inner(HoprProtocolError::from(e)))?)
    }
607}
608
609#[async_trait::async_trait]
610impl<Chain, Db> TicketTracker for HoprTicketProcessor<Chain, Db>
611where
612    Chain: Send + Sync,
613    Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
614{
615    type Error = Arc<Db::Error>;
616
617    async fn next_outgoing_ticket_index(&self, channel: &ChannelEntry) -> Result<u64, Self::Error> {
618        let channel_id = *channel.get_id();
619        let epoch = channel.channel_epoch;
620        let current_idx = channel.ticket_index;
621        self.out_ticket_index
622            .try_get_with((channel_id, epoch), async {
623                self.db
624                    .get_or_create_outgoing_ticket_index(&channel_id, epoch, current_idx)
625                    .await
626                    .map(|maybe_idx| Arc::new(AtomicU64::new(maybe_idx.unwrap_or_default())))
627            })
628            .await
629            .map(|idx| idx.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
630    }
631
632    async fn incoming_channel_unrealized_balance(
633        &self,
634        channel_id: &ChannelId,
635        epoch: u32,
636        index: u64,
637    ) -> Result<HoprBalance, Self::Error> {
638        // This value cannot be cached here and must be cached in the DB
639        // because the cache invalidation logic can be only done from within the DB.
640        // The DB caches this value based on the ChannelId and Epoch, regardless of the index,
641        // but the index is used to filter the ticket value.
642        self.db
643            .get_tickets_value(TicketSelector::new(*channel_id, epoch).with_index_range(index..))
644            .await
645            .map_err(Into::into)
646    }
647}
648
#[cfg(test)]
mod tests {
    use hopr_crypto_random::Randomizable;

    use super::*;
    use crate::utils::*;

    /// Inserts several unacknowledged tickets for one peer and checks that acknowledging
    /// all of them yields one winning resolution per ticket.
    #[tokio::test]
    async fn ticket_processor_should_acknowledge_previously_inserted_tickets() -> anyhow::Result<()> {
        let blokli_client = create_blokli_client()?;

        let node = create_node(1, &blokli_client).await?;

        let ticket_processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        const NUM_TICKETS: usize = 5;

        // Capacity follows the constant, so the two cannot drift apart.
        let mut acks = Vec::with_capacity(NUM_TICKETS);
        for index in 0..NUM_TICKETS {
            let own_share = HalfKey::random();
            let ack_share = HalfKey::random();
            let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;

            let unack_ticket = TicketBuilder::default()
                .counterparty(&PEERS[1].0)
                .index(index as u64)
                .channel_epoch(1)
                .amount(10_u32)
                .challenge(challenge)
                .build_signed(&PEERS[0].0, &Hash::default())?
                .into_unacknowledged(own_share);

            // The ticket is stored under the challenge of the acknowledgement half-key,
            // which is what the processor derives from the incoming acknowledgement.
            ticket_processor
                .insert_unacknowledged_ticket(PEERS[2].1.public(), ack_share.to_challenge()?, unack_ticket)
                .await?;

            acks.push(VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak());
        }

        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), acks).await?;
        assert_eq!(NUM_TICKETS, resolutions.len());
        assert!(
            resolutions
                .iter()
                .all(|res| matches!(res, ResolvedAcknowledgement::RelayingWin(_)))
        );

        Ok(())
    }

    /// Acknowledgements from a peer for which no ticket was inserted must be rejected
    /// as unexpected.
    #[tokio::test]
    async fn ticket_processor_should_reject_acknowledgements_from_unexpected_sender() -> anyhow::Result<()> {
        let blokli_client = create_blokli_client()?;

        let node = create_node(1, &blokli_client).await?;

        let ticket_processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        const NUM_ACKS: usize = 5;

        // Capacity follows the constant, so the two cannot drift apart.
        let mut acks = Vec::with_capacity(NUM_ACKS);
        for _ in 0..NUM_ACKS {
            let ack_share = HalfKey::random();
            acks.push(VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak());
        }

        assert!(matches!(
            ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), acks).await,
            Err(TicketAcknowledgementError::UnexpectedAcknowledgement)
        ));

        Ok(())
    }
}