Skip to main content

hopr_protocol_hopr/
ticket_processing.rs

1use hopr_api::{
2    chain::ChainReadChannelOperations,
3    types::{crypto::prelude::*, internal::prelude::*},
4};
5#[cfg(feature = "rayon")]
6use hopr_utils::parallelize::cpu::rayon::prelude::*;
7use validator::ValidationError;
8
9use crate::{HoprProtocolError, ResolvedAcknowledgement, TicketAcknowledgementError, UnacknowledgedTicketProcessor};
10
11#[cfg(all(feature = "telemetry", not(test)))]
12lazy_static::lazy_static! {
13        /// Whether per-peer metrics are enabled (disabled by default to avoid cardinality explosion).
14        static ref PER_PEER_ENABLED: bool = std::env::var("HOPR_METRICS_UNACK_PER_PEER")
15            .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
16            .unwrap_or(false);
17
18        static ref UNACK_PEERS: hopr_types::telemetry::SimpleGauge = hopr_types::telemetry::SimpleGauge::new(
19            "hopr_tickets_unack_peers_total",
20            "Number of peers with unacknowledged tickets in cache",
21        )
22        .unwrap();
23        static ref UNACK_TICKETS: hopr_types::telemetry::SimpleGauge = hopr_types::telemetry::SimpleGauge::new(
24            "hopr_tickets_unack_tickets_total",
25            "Total number of unacknowledged tickets across all peer caches",
26        )
27        .unwrap();
28        static ref UNACK_INSERTIONS: hopr_types::telemetry::SimpleCounter = hopr_types::telemetry::SimpleCounter::new(
29            "hopr_tickets_unack_insertions_total",
30            "Total number of unacknowledged tickets inserted into cache",
31        )
32        .unwrap();
33        static ref UNACK_LOOKUPS: hopr_types::telemetry::SimpleCounter = hopr_types::telemetry::SimpleCounter::new(
34            "hopr_tickets_unack_lookups_total",
35            "Total number of ticket acknowledgement lookups",
36        )
37        .unwrap();
38        static ref UNACK_LOOKUP_MISSES: hopr_types::telemetry::SimpleCounter = hopr_types::telemetry::SimpleCounter::new(
39            "hopr_tickets_unack_lookup_misses_total",
40            "Total number of ticket lookup failures (unknown ticket)",
41        )
42        .unwrap();
43        static ref UNACK_EVICTIONS: hopr_types::telemetry::SimpleCounter = hopr_types::telemetry::SimpleCounter::new(
44            "hopr_tickets_unack_evictions_total",
45            "Total number of unacknowledged tickets evicted from cache due to TTL or capacity limits",
46        )
47        .unwrap();
48        static ref UNACK_PEER_EVICTIONS: hopr_types::telemetry::SimpleCounter = hopr_types::telemetry::SimpleCounter::new(
49            "hopr_tickets_unack_peer_evictions_total",
50            "Total number of peer caches evicted from the outer unacknowledged ticket cache",
51        )
52        .unwrap();
53        static ref UNACK_TICKETS_PER_PEER: hopr_types::telemetry::MultiGauge = hopr_types::telemetry::MultiGauge::new(
54            "hopr_tickets_unack_tickets_per_peer",
55            "Number of unacknowledged tickets per peer in cache (enable with HOPR_METRICS_UNACK_PER_PEER=1)",
56            &["peer"],
57        )
58        .unwrap();
59}
60
61const MIN_UNACK_TICKET_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
62fn validate_unack_ticket_timeout(timeout: &std::time::Duration) -> Result<(), ValidationError> {
63    if timeout < &MIN_UNACK_TICKET_TIMEOUT {
64        Err(ValidationError::new("unack_ticket_timeout too low"))
65    } else {
66        Ok(())
67    }
68}
69
70fn default_unack_ticket_timeout() -> std::time::Duration {
71    std::time::Duration::from_secs(30)
72}
73
74fn default_max_unack_tickets() -> usize {
75    10_000_000
76}
77
78fn just_true() -> bool {
79    true
80}
81
82/// Configuration for the [`HoprUnacknowledgedTicketProcessor`].
83
84#[derive(Debug, Clone, Copy, smart_default::SmartDefault, PartialEq, validator::Validate)]
85#[cfg_attr(
86    feature = "serde",
87    derive(serde::Deserialize, serde::Serialize),
88    serde(deny_unknown_fields)
89)]
90pub struct HoprUnacknowledgedTicketProcessorConfig {
91    /// Time after which an unacknowledged ticket is considered stale and is removed from the cache.
92    ///
93    /// If the counterparty does not send an acknowledgement within this period, the ticket is lost forever.
94    ///
95    /// Default is 30 seconds.
96    #[default(default_unack_ticket_timeout())]
97    #[validate(custom(function = "validate_unack_ticket_timeout"))]
98    #[cfg_attr(
99        feature = "serde",
100        serde(default = "default_unack_ticket_timeout", with = "humantime_serde")
101    )]
102    pub unack_ticket_timeout: std::time::Duration,
103    /// Maximum number of unacknowledged tickets that can be stored in the cache at any given time.
104    ///
105    /// When more tickets are received, the oldest ones are discarded and lost forever.
106    ///
107    /// Default is 10 000 000.
108    #[default(default_max_unack_tickets())]
109    #[validate(range(min = 100))]
110    #[cfg_attr(feature = "serde", serde(default = "default_max_unack_tickets"))]
111    pub max_unack_tickets: usize,
112    /// Indicates whether to use batch verification algorithm for acknowledgements.
113    ///
114    /// This has a positive performance impact on higher workloads.
115    ///
116    /// Default is true.
117    #[default(just_true())]
118    #[cfg_attr(feature = "serde", serde(default = "just_true"))]
119    pub use_batch_verification: bool,
120}
121
122/// HOPR-specific implementation of [`UnacknowledgedTicketProcessor`].
123#[derive(Clone)]
124pub struct HoprUnacknowledgedTicketProcessor<Chain> {
125    unacknowledged_tickets:
126        moka::sync::Cache<OffchainPublicKey, moka::sync::Cache<HalfKeyChallenge, UnacknowledgedTicket>>,
127    chain_api: Chain,
128    chain_key: ChainKeypair,
129    channels_dst: Hash,
130    cfg: HoprUnacknowledgedTicketProcessorConfig,
131}
132
133impl<Chain> HoprUnacknowledgedTicketProcessor<Chain> {
134    /// Creates a new instance of the HOPR unacknowledged ticket processor.
135    pub fn new(
136        chain_api: Chain,
137        chain_key: ChainKeypair,
138        channels_dst: Hash,
139        cfg: HoprUnacknowledgedTicketProcessorConfig,
140    ) -> Self {
141        #[cfg(all(feature = "telemetry", not(test)))]
142        {
143            lazy_static::initialize(&PER_PEER_ENABLED);
144            lazy_static::initialize(&UNACK_PEERS);
145            lazy_static::initialize(&UNACK_TICKETS);
146            lazy_static::initialize(&UNACK_INSERTIONS);
147            lazy_static::initialize(&UNACK_LOOKUPS);
148            lazy_static::initialize(&UNACK_LOOKUP_MISSES);
149            lazy_static::initialize(&UNACK_EVICTIONS);
150            lazy_static::initialize(&UNACK_PEER_EVICTIONS);
151            if *PER_PEER_ENABLED {
152                lazy_static::initialize(&UNACK_TICKETS_PER_PEER);
153            }
154        }
155
156        #[allow(unused_mut)]
157        let mut builder = moka::sync::Cache::builder()
158            .time_to_idle(cfg.unack_ticket_timeout)
159            .max_capacity(100_000);
160
161        #[cfg(all(feature = "telemetry", not(test)))]
162        {
163            builder = builder.eviction_listener(
164                |_, value: moka::sync::Cache<HalfKeyChallenge, UnacknowledgedTicket>, cause| {
165                    if !matches!(cause, moka::notification::RemovalCause::Replaced) {
166                        UNACK_PEERS.decrement(1.0);
167
168                        if matches!(
169                            cause,
170                            moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size
171                        ) {
172                            UNACK_PEER_EVICTIONS.increment();
173                        }
174                    }
175                    // Explicitly invalidate all inner cache entries so their eviction
176                    // listeners fire (decrementing UNACK_TICKETS and per-peer metrics).
177                    // Without this, dropping the inner cache silently leaks those metrics.
178                    value.invalidate_all();
179                    // TODO: Improve the metrics in the module in general, because run_pending_tasks() is lengthy
180                    // See https://github.com/hoprnet/hoprnet/issues/8014
181                    // value.run_pending_tasks();
182                },
183            );
184        }
185
186        Self {
187            unacknowledged_tickets: builder.build(),
188            chain_api,
189            chain_key,
190            channels_dst,
191            cfg,
192        }
193    }
194}
195
196#[async_trait::async_trait]
197impl<Chain> UnacknowledgedTicketProcessor for HoprUnacknowledgedTicketProcessor<Chain>
198where
199    Chain: ChainReadChannelOperations + Send + Sync,
200{
201    type Error = HoprProtocolError;
202
203    #[tracing::instrument(skip(self, next_hop, challenge, ticket), level = "trace", fields(next_hop = next_hop.to_peerid_str()))]
204    fn insert_unacknowledged_ticket(
205        &self,
206        next_hop: &OffchainPublicKey,
207        challenge: HalfKeyChallenge,
208        ticket: UnacknowledgedTicket,
209    ) -> Result<(), Self::Error> {
210        tracing::trace!(%ticket, "received unacknowledged ticket");
211
212        self.unacknowledged_tickets
213            .get_with_by_ref(next_hop, || {
214                #[allow(unused_mut)]
215                let mut builder = moka::sync::Cache::builder()
216                    .time_to_live(self.cfg.unack_ticket_timeout)
217                    .max_capacity(self.cfg.max_unack_tickets as u64);
218
219                #[cfg(all(feature = "telemetry", not(test)))]
220                {
221                    UNACK_PEERS.increment(1.0);
222
223                    let next_hop = *next_hop;
224                    builder = builder.eviction_listener(move |_, _, cause| {
225                        let peer_id_for_eviction = next_hop.to_peerid_str();
226                        UNACK_TICKETS.decrement(1.0);
227                        UNACK_TICKETS_PER_PEER.decrement(
228                            &[if *PER_PEER_ENABLED {
229                                peer_id_for_eviction.as_str()
230                            } else {
231                                "redacted"
232                            }],
233                            1.0,
234                        );
235
236                        // Only count Expired/Size removals as evictions (not Explicit or Replaced)
237                        if matches!(
238                            cause,
239                            moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size
240                        ) {
241                            UNACK_EVICTIONS.increment();
242                        }
243                    });
244                }
245
246                builder.build()
247            })
248            .insert(challenge, ticket);
249
250        #[cfg(all(feature = "telemetry", not(test)))]
251        {
252            let peer_id = next_hop.to_peerid_str();
253            UNACK_INSERTIONS.increment();
254            UNACK_TICKETS.increment(1.0);
255            UNACK_TICKETS_PER_PEER.increment(
256                &[if *PER_PEER_ENABLED {
257                    peer_id.as_str()
258                } else {
259                    "redacted"
260                }],
261                1.0,
262            );
263        }
264
265        Ok(())
266    }
267
268    #[tracing::instrument(skip_all, level = "trace", fields(peer = peer.to_peerid_str()))]
269    fn acknowledge_tickets(
270        &self,
271        peer: OffchainPublicKey,
272        acks: Vec<Acknowledgement>,
273    ) -> Result<Vec<ResolvedAcknowledgement>, TicketAcknowledgementError<Self::Error>> {
274        // Check if we're even expecting an acknowledgement from this peer:
275        // We need to first do a check that does not update the popularity estimator of `peer` in this cache,
276        // so we actually allow the entry to time out eventually. However, this comes at the cost
277        // double-lookup.
278        if !self.unacknowledged_tickets.contains_key(&peer) {
279            tracing::trace!("not awaiting any acknowledgement from peer");
280            return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
281        }
282        let Some(awaiting_ack_from_peer) = self.unacknowledged_tickets.get(&peer) else {
283            tracing::trace!("not awaiting any acknowledgement from peer");
284            return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
285        };
286
287        // Verify all the acknowledgements and compute challenges from half-keys
288        let use_batch_verify = self.cfg.use_batch_verification;
289        let half_keys_challenges = if use_batch_verify {
290            // Uses regular verifications for small batches but switches to a more effective
291            // batch verification algorithm for larger ones.
292            let acks = Acknowledgement::verify_batch(acks.into_iter().map(|ack| (peer, ack)));
293
294            #[cfg(feature = "rayon")]
295            let iter = acks.into_par_iter();
296
297            #[cfg(not(feature = "rayon"))]
298            let iter = acks.into_iter();
299
300            iter.map(|verified| {
301                verified.and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
302            })
303            .filter_map(|res| {
304                res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
305                    .ok()
306            })
307            .collect::<Vec<_>>()
308        } else {
309            #[cfg(feature = "rayon")]
310            let iter = acks.into_par_iter();
311
312            #[cfg(not(feature = "rayon"))]
313            let iter = acks.into_iter();
314
315            iter.map(|ack| {
316                ack.verify(&peer)
317                    .and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
318            })
319            .filter_map(|res| {
320                res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
321                    .ok()
322            })
323            .collect::<Vec<_>>()
324        };
325
326        // Find all the tickets that we're awaiting acknowledgement for
327        let mut unack_tickets = Vec::with_capacity(half_keys_challenges.len());
328        for (half_key, challenge) in half_keys_challenges {
329            #[cfg(all(feature = "telemetry", not(test)))]
330            UNACK_LOOKUPS.increment();
331
332            let Some(unack_ticket) = awaiting_ack_from_peer.remove(&challenge) else {
333                #[cfg(all(feature = "telemetry", not(test)))]
334                UNACK_LOOKUP_MISSES.increment();
335                tracing::trace!(%challenge, "received acknowledgement for unknown ticket");
336                continue;
337            };
338
339            let issuer_channel = match self
340                .chain_api
341                .channel_by_parties(unack_ticket.ticket.verified_issuer(), self.chain_key.as_ref())
342            {
343                Ok(Some(channel)) => {
344                    if channel.channel_epoch != unack_ticket.verified_ticket().channel_epoch {
345                        tracing::error!(%unack_ticket, "received acknowledgement for ticket issued in a different epoch");
346                        continue;
347                    }
348                    channel
349                }
350                Ok(None) => {
351                    tracing::error!(%unack_ticket, "received acknowledgement for ticket issued for unknown channel");
352                    continue;
353                }
354                Err(error) => {
355                    tracing::error!(%error, %unack_ticket, "failed to resolve channel for unacknowledged ticket");
356                    continue;
357                }
358            };
359
360            unack_tickets.push((issuer_channel, half_key, unack_ticket));
361        }
362
363        #[cfg(feature = "rayon")]
364        let iter = unack_tickets.into_par_iter();
365
366        #[cfg(not(feature = "rayon"))]
367        let iter = unack_tickets.into_iter();
368
369        Ok(iter
370            .filter_map(|(channel, half_key, unack_ticket)| {
371                // This explicitly checks whether the acknowledgement
372                // solves the challenge on the ticket.
373                // It must be done before we check that the ticket is winning,
374                // which is a lengthy operation and should not be done for
375                // bogus unacknowledged tickets
376                let Ok(ack_ticket) = unack_ticket.acknowledge(&half_key) else {
377                    tracing::error!(%unack_ticket, "failed to acknowledge ticket");
378                    return None;
379                };
380
381                // This operation checks if the ticket is winning, and if it is, it
382                // turns it into a redeemable ticket.
383                match ack_ticket.into_redeemable(&self.chain_key, &self.channels_dst) {
384                    Ok(redeemable) => {
385                        tracing::trace!(%channel, "found winning ticket");
386                        Some(ResolvedAcknowledgement::RelayingWin(Box::new(redeemable)))
387                    }
388                    Err(CoreTypesError::TicketNotWinning) => {
389                        tracing::trace!(%channel, "found losing ticket");
390                        Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
391                    }
392                    Err(error) => {
393                        tracing::error!(%error, %channel, "error when acknowledging ticket");
394                        Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
395                    }
396                }
397            })
398            .collect())
399    }
400}
401
402#[cfg(test)]
403mod tests {
404    use hopr_api::types::crypto_random::Randomizable;
405
406    use super::*;
407    use crate::utils::*;
408
409    #[parameterized::parameterized(batch = { true, false })]
410    #[parameterized_macro(tokio::test)]
411    async fn ticket_processor_should_acknowledge_previously_inserted_tickets(batch: bool) -> anyhow::Result<()> {
412        let blokli_client = create_blokli_client()?;
413
414        let node = create_node(1, &blokli_client).await?;
415
416        let ticket_processor = HoprUnacknowledgedTicketProcessor::new(
417            node.chain_api.clone(),
418            node.chain_key.clone(),
419            Hash::default(),
420            HoprUnacknowledgedTicketProcessorConfig {
421                use_batch_verification: batch,
422                ..HoprUnacknowledgedTicketProcessorConfig::default()
423            },
424        );
425
426        const NUM_TICKETS: usize = 5;
427
428        let mut acks = Vec::with_capacity(5);
429        for index in 0..NUM_TICKETS {
430            let own_share = HalfKey::random();
431            let ack_share = HalfKey::random();
432            let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;
433
434            let unack_ticket = TicketBuilder::default()
435                .counterparty(&PEERS[1].0)
436                .index(index as u64)
437                .channel_epoch(1)
438                .amount(10_u32)
439                .challenge(challenge)
440                .build_signed(&PEERS[0].0, &Hash::default())?
441                .into_unacknowledged(own_share);
442
443            ticket_processor.insert_unacknowledged_ticket(
444                PEERS[2].1.public(),
445                ack_share.to_challenge()?,
446                unack_ticket,
447            )?;
448
449            acks.push(VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak());
450        }
451
452        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), acks)?;
453        assert_eq!(NUM_TICKETS, resolutions.len());
454        assert!(
455            resolutions
456                .iter()
457                .all(|res| matches!(res, ResolvedAcknowledgement::RelayingWin(_)))
458        );
459
460        Ok(())
461    }
462
463    #[tokio::test]
464    async fn ticket_processor_should_reject_acknowledgements_from_unexpected_sender() -> anyhow::Result<()> {
465        let blokli_client = create_blokli_client()?;
466
467        let node = create_node(1, &blokli_client).await?;
468
469        let ticket_processor = HoprUnacknowledgedTicketProcessor::new(
470            node.chain_api.clone(),
471            node.chain_key.clone(),
472            Hash::default(),
473            HoprUnacknowledgedTicketProcessorConfig::default(),
474        );
475
476        const NUM_ACKS: usize = 5;
477
478        let mut acks = Vec::with_capacity(5);
479        for _ in 0..NUM_ACKS {
480            let ack_share = HalfKey::random();
481            acks.push(VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak());
482        }
483
484        assert!(matches!(
485            ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), acks),
486            Err(TicketAcknowledgementError::UnexpectedAcknowledgement)
487        ));
488
489        Ok(())
490    }
491
492    #[tokio::test]
493    async fn ticket_processor_should_ignore_bogus_acknowledgements() -> anyhow::Result<()> {
494        let blokli_client = create_blokli_client()?;
495        let node = create_node(1, &blokli_client).await?;
496        let ticket_processor = HoprUnacknowledgedTicketProcessor::new(
497            node.chain_api.clone(),
498            node.chain_key.clone(),
499            Hash::default(),
500            HoprUnacknowledgedTicketProcessorConfig::default(),
501        );
502
503        let own_share = HalfKey::random();
504        let ack_share = HalfKey::random();
505        let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;
506
507        let unack_ticket = TicketBuilder::default()
508            .counterparty(&PEERS[1].0)
509            .index(0)
510            .channel_epoch(1)
511            .amount(10_u32)
512            .challenge(challenge)
513            .build_signed(&PEERS[0].0, &Hash::default())?
514            .into_unacknowledged(own_share);
515
516        ticket_processor.insert_unacknowledged_ticket(PEERS[2].1.public(), ack_share.to_challenge()?, unack_ticket)?;
517
518        // Create an acknowledgement from a DIFFERENT peer, but we'll try to pass it as if it's from PEERS[2]
519        let bogus_ack_share = HalfKey::random();
520        let bogus_ack = VerifiedAcknowledgement::new(bogus_ack_share, &PEERS[1].1).leak();
521
522        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), vec![bogus_ack])?;
523        // Should ignore because signature is invalid (it's from PEERS[1] but we're telling the processor it's from
524        // PEERS[2])
525        assert_eq!(0, resolutions.len());
526
527        Ok(())
528    }
529
530    #[tokio::test]
531    async fn ticket_processor_should_ignore_unknown_challenges() -> anyhow::Result<()> {
532        let blokli_client = create_blokli_client()?;
533        let node = create_node(1, &blokli_client).await?;
534        let ticket_processor = HoprUnacknowledgedTicketProcessor::new(
535            node.chain_api.clone(),
536            node.chain_key.clone(),
537            Hash::default(),
538            HoprUnacknowledgedTicketProcessorConfig::default(),
539        );
540
541        // We insert ONE ticket
542        let own_share = HalfKey::random();
543        let ack_share = HalfKey::random();
544        let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;
545
546        let unack_ticket = TicketBuilder::default()
547            .counterparty(&PEERS[1].0)
548            .index(0)
549            .channel_epoch(1)
550            .amount(10_u32)
551            .challenge(challenge)
552            .build_signed(&PEERS[0].0, &Hash::default())?
553            .into_unacknowledged(own_share);
554
555        ticket_processor.insert_unacknowledged_ticket(PEERS[2].1.public(), ack_share.to_challenge()?, unack_ticket)?;
556
557        // Now we send TWO acks: one for the ticket we inserted, and one random/unknown
558        let unknown_ack_share = HalfKey::random();
559        let acks = vec![
560            VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak(),
561            VerifiedAcknowledgement::new(unknown_ack_share, &PEERS[2].1).leak(),
562        ];
563
564        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), acks)?;
565        // Should only resolve the one we knew about
566        assert_eq!(1, resolutions.len());
567
568        Ok(())
569    }
570
571    #[tokio::test]
572    async fn ticket_processor_should_ignore_epoch_mismatch() -> anyhow::Result<()> {
573        let blokli_client = create_blokli_client()?;
574        let node = create_node(1, &blokli_client).await?;
575        let ticket_processor = HoprUnacknowledgedTicketProcessor::new(
576            node.chain_api.clone(),
577            node.chain_key.clone(),
578            Hash::default(),
579            HoprUnacknowledgedTicketProcessorConfig::default(),
580        );
581
582        let own_share = HalfKey::random();
583        let ack_share = HalfKey::random();
584        let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;
585
586        // Channel in utils.rs has epoch 1. We'll create a ticket with epoch 2.
587        let unack_ticket = TicketBuilder::default()
588            .counterparty(&PEERS[1].0)
589            .index(0)
590            .channel_epoch(2) // Mismatch!
591            .amount(10_u32)
592            .challenge(challenge)
593            .build_signed(&PEERS[0].0, &Hash::default())?
594            .into_unacknowledged(own_share);
595
596        ticket_processor.insert_unacknowledged_ticket(PEERS[2].1.public(), ack_share.to_challenge()?, unack_ticket)?;
597
598        let ack = VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak();
599        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), vec![ack])?;
600        assert_eq!(0, resolutions.len());
601
602        Ok(())
603    }
604
605    #[tokio::test]
606    async fn ticket_processor_should_ignore_missing_channel() -> anyhow::Result<()> {
607        let blokli_client = create_blokli_client()?;
608        let node = create_node(1, &blokli_client).await?;
609        let ticket_processor = HoprUnacknowledgedTicketProcessor::new(
610            node.chain_api.clone(),
611            node.chain_key.clone(),
612            Hash::default(),
613            HoprUnacknowledgedTicketProcessorConfig::default(),
614        );
615
616        let own_share = HalfKey::random();
617        let ack_share = HalfKey::random();
618        let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;
619
620        let unknown_issuer = ChainKeypair::random();
621
622        let unack_ticket = TicketBuilder::default()
623            .counterparty(&node.chain_key)
624            .index(0)
625            .channel_epoch(1)
626            .amount(10_u32)
627            .challenge(challenge)
628            .build_signed(&unknown_issuer, &Hash::default())?
629            .into_unacknowledged(own_share);
630
631        ticket_processor.insert_unacknowledged_ticket(PEERS[2].1.public(), ack_share.to_challenge()?, unack_ticket)?;
632
633        let ack = VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak();
634        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), vec![ack])?;
635        assert_eq!(
636            0,
637            resolutions.len(),
638            "Expected 0 resolutions for missing channel, got: {:?}",
639            resolutions
640        );
641
642        Ok(())
643    }
644
645    #[tokio::test]
646    async fn ticket_processor_should_handle_losing_tickets() -> anyhow::Result<()> {
647        let blokli_client = create_blokli_client()?;
648        let node = create_node(1, &blokli_client).await?;
649        let ticket_processor = HoprUnacknowledgedTicketProcessor::new(
650            node.chain_api.clone(),
651            node.chain_key.clone(),
652            Hash::default(),
653            HoprUnacknowledgedTicketProcessorConfig::default(),
654        );
655
656        let own_share = HalfKey::random();
657        let ack_share = HalfKey::random();
658        let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;
659
660        let unack_ticket = TicketBuilder::default()
661            .counterparty(&PEERS[1].0)
662            .index(0)
663            .win_prob(WinningProbability::NEVER)
664            .channel_epoch(1)
665            .amount(0_u32)
666            .challenge(challenge)
667            .build_signed(&PEERS[0].0, &Hash::default())?
668            .into_unacknowledged(own_share);
669
670        ticket_processor.insert_unacknowledged_ticket(PEERS[2].1.public(), ack_share.to_challenge()?, unack_ticket)?;
671
672        let ack = VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak();
673        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), vec![ack])?;
674        assert_eq!(1, resolutions.len());
675        assert!(
676            matches!(resolutions[0], ResolvedAcknowledgement::RelayingLoss(_)),
677            "Expected RelayingLoss for 0-amount ticket, got: {:?}",
678            resolutions[0]
679        );
680
681        Ok(())
682    }
683}