// hopr_protocol_hopr/ticket_processing.rs

1use std::sync::{Arc, atomic::AtomicU64};
2
3use futures::StreamExt;
4use hopr_api::{chain::ChainReadChannelOperations, db::HoprDbTicketOperations};
5use hopr_crypto_types::prelude::*;
6use hopr_internal_types::prelude::*;
7#[cfg(feature = "rayon")]
8use hopr_parallelize::cpu::rayon::prelude::*;
9use hopr_primitive_types::balance::HoprBalance;
10use validator::ValidationError;
11
12use crate::{
13    HoprProtocolError, ResolvedAcknowledgement, TicketAcknowledgementError, TicketTracker,
14    UnacknowledgedTicketProcessor,
15};
16
17const MIN_UNACK_TICKET_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
18fn validate_unack_ticket_timeout(timeout: &std::time::Duration) -> Result<(), ValidationError> {
19    if timeout < &MIN_UNACK_TICKET_TIMEOUT {
20        Err(ValidationError::new("unack_ticket_timeout too low"))
21    } else {
22        Ok(())
23    }
24}
25
26const MIN_OUTGOING_INDEX_RETENTION: std::time::Duration = std::time::Duration::from_secs(10);
27
28fn validate_outgoing_index_retention(retention: &std::time::Duration) -> Result<(), ValidationError> {
29    if retention < &MIN_OUTGOING_INDEX_RETENTION {
30        Err(ValidationError::new("outgoing_index_cache_retention too low"))
31    } else {
32        Ok(())
33    }
34}
35
/// Default for [`HoprTicketProcessorConfig::outgoing_index_cache_retention`]: 10 seconds.
fn default_outgoing_index_retention() -> std::time::Duration {
    const DEFAULT_RETENTION_SECS: u64 = 10;

    std::time::Duration::from_secs(DEFAULT_RETENTION_SECS)
}
39
/// Default for [`HoprTicketProcessorConfig::unack_ticket_timeout`]: 30 seconds.
fn default_unack_ticket_timeout() -> std::time::Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;

    std::time::Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
43
/// Default for [`HoprTicketProcessorConfig::max_unack_tickets`]: ten million tickets.
fn default_max_unack_tickets() -> usize {
    const DEFAULT_MAX_UNACK_TICKETS: usize = 10_000_000;

    DEFAULT_MAX_UNACK_TICKETS
}
47
/// Always yields `true`.
///
/// Exists as a named function so it can be referenced as the default of
/// [`HoprTicketProcessorConfig::use_batch_verification`].
fn just_true() -> bool {
    const ENABLED: bool = true;

    ENABLED
}
51
52/// Configuration for the HOPR ticket processor within the packet pipeline.
53
54#[derive(Debug, Clone, Copy, smart_default::SmartDefault, PartialEq, validator::Validate)]
55#[cfg_attr(
56    feature = "serde",
57    derive(serde::Deserialize, serde::Serialize),
58    serde(deny_unknown_fields)
59)]
60pub struct HoprTicketProcessorConfig {
61    /// Time after which an unacknowledged ticket is considered stale and is removed from the cache.
62    ///
63    /// If the counterparty does not send an acknowledgement within this period, the ticket is lost forever.
64    ///
65    /// Default is 30 seconds.
66    #[default(default_unack_ticket_timeout())]
67    #[validate(custom(function = "validate_unack_ticket_timeout"))]
68    #[cfg_attr(
69        feature = "serde",
70        serde(default = "default_unack_ticket_timeout", with = "humantime_serde")
71    )]
72    pub unack_ticket_timeout: std::time::Duration,
73    /// Maximum number of unacknowledged tickets that can be stored in the cache at any given time.
74    ///
75    /// When more tickets are received, the oldest ones are discarded and lost forever.
76    ///
77    /// Default is 10 000 000.
78    #[default(default_max_unack_tickets())]
79    #[validate(range(min = 100))]
80    #[cfg_attr(feature = "serde", serde(default = "default_max_unack_tickets"))]
81    pub max_unack_tickets: usize,
82    /// Period for which the outgoing ticket index is cached in memory for each channel.
83    ///
84    /// Default is 10 seconds.
85    #[default(default_outgoing_index_retention())]
86    #[validate(custom(function = "validate_outgoing_index_retention"))]
87    #[cfg_attr(
88        feature = "serde",
89        serde(default = "default_outgoing_index_retention", with = "humantime_serde")
90    )]
91    pub outgoing_index_cache_retention: std::time::Duration,
92    /// Indicates whether to use batch verification algorithm for acknowledgements.
93    ///
94    /// This has a positive performance impact on higher workloads.
95    ///
96    /// Default is true.
97    #[default(just_true())]
98    #[cfg_attr(feature = "serde", serde(default = "just_true"))]
99    pub use_batch_verification: bool,
100}
101
102/// HOPR-specific implementation of [`UnacknowledgedTicketProcessor`] and [`TicketTracker`].
103#[derive(Clone)]
104pub struct HoprTicketProcessor<Chain, Db> {
105    unacknowledged_tickets:
106        moka::future::Cache<OffchainPublicKey, moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>>,
107    out_ticket_index: moka::future::Cache<(ChannelId, u32), Arc<AtomicU64>>,
108    db: Db,
109    chain_api: Chain,
110    chain_key: ChainKeypair,
111    channels_dst: Hash,
112    cfg: HoprTicketProcessorConfig,
113}
114
115impl<Chain, Db> HoprTicketProcessor<Chain, Db> {
116    /// Creates a new instance of the HOPR ticket processor.
117    pub fn new(
118        chain_api: Chain,
119        db: Db,
120        chain_key: ChainKeypair,
121        channels_dst: Hash,
122        cfg: HoprTicketProcessorConfig,
123    ) -> Self {
124        Self {
125            out_ticket_index: moka::future::Cache::builder()
126                .time_to_idle(cfg.outgoing_index_cache_retention)
127                .max_capacity(10_000)
128                .build(),
129            unacknowledged_tickets: moka::future::Cache::builder()
130                .time_to_idle(cfg.unack_ticket_timeout)
131                .max_capacity(100_000)
132                .build(),
133            chain_api,
134            db,
135            chain_key,
136            channels_dst,
137            cfg,
138        }
139    }
140}
141
142impl<Chain, Db> HoprTicketProcessor<Chain, Db>
143where
144    Db: HoprDbTicketOperations + Clone + Send + 'static,
145{
146    /// Task that performs periodic synchronization of the outgoing ticket index cache
147    /// to the underlying database.
148    ///
149    /// If this task is not started, the outgoing ticket indices will not survive a node
150    /// restart and will result in invalid tickets received by the counterparty.
151    pub fn outgoing_index_sync_task(
152        &self,
153        reg: futures::future::AbortRegistration,
154    ) -> impl Future<Output = ()> + use<Db, Chain> {
155        let index_save_stream = futures::stream::Abortable::new(
156            futures_time::stream::interval(futures_time::time::Duration::from(
157                self.cfg.outgoing_index_cache_retention.div_f32(2.0),
158            )),
159            reg,
160        );
161
162        let db = self.db.clone();
163        let out_ticket_index = self.out_ticket_index.clone();
164
165        index_save_stream
166            .for_each(move |_| {
167                let db = db.clone();
168                let out_ticket_index = out_ticket_index.clone();
169                async move {
170                    // This iteration does not alter the popularity estimator of the cache
171                    // and therefore still allows the unused entries to expire
172                    for (channel_key, out_idx) in out_ticket_index.iter() {
173                        if let Err(error) = db
174                            .update_outgoing_ticket_index(
175                                &channel_key.0,
176                                channel_key.1,
177                                out_idx.load(std::sync::atomic::Ordering::SeqCst),
178                            )
179                            .await
180                        {
181                            tracing::error!(%error, channel_id = %channel_key.0, epoch = channel_key.1, "failed to sync outgoing ticket index to db");
182                        }
183                    }
184                    tracing::trace!("synced outgoing ticket indices to db");
185                }
186            })
187    }
188}
189
190#[async_trait::async_trait]
191impl<Chain, Db> UnacknowledgedTicketProcessor for HoprTicketProcessor<Chain, Db>
192where
193    Chain: ChainReadChannelOperations + Send + Sync,
194    Db: Send + Sync,
195{
196    type Error = HoprProtocolError;
197
198    #[tracing::instrument(skip(self, next_hop, challenge, ticket), level = "trace", fields(next_hop = next_hop.to_peerid_str()))]
199    async fn insert_unacknowledged_ticket(
200        &self,
201        next_hop: &OffchainPublicKey,
202        challenge: HalfKeyChallenge,
203        ticket: UnacknowledgedTicket,
204    ) -> Result<(), Self::Error> {
205        tracing::trace!(%ticket, "received unacknowledged ticket");
206        self.unacknowledged_tickets
207            .get_with_by_ref(next_hop, async {
208                moka::future::Cache::builder()
209                    .time_to_live(self.cfg.unack_ticket_timeout)
210                    .max_capacity(self.cfg.max_unack_tickets as u64)
211                    .build()
212            })
213            .await
214            .insert(challenge, ticket)
215            .await;
216
217        Ok(())
218    }
219
220    #[tracing::instrument(skip_all, level = "trace", fields(peer = peer.to_peerid_str()))]
221    async fn acknowledge_tickets(
222        &self,
223        peer: OffchainPublicKey,
224        acks: Vec<Acknowledgement>,
225    ) -> Result<Vec<ResolvedAcknowledgement>, TicketAcknowledgementError<Self::Error>> {
226        // Check if we're even expecting an acknowledgement from this peer:
227        // We need to first do a check that does not update the popularity estimator of `peer` in this cache,
228        // so we actually allow the entry to time out eventually. However, this comes at the cost
229        // double-lookup.
230        if !self.unacknowledged_tickets.contains_key(&peer) {
231            tracing::trace!("not awaiting any acknowledgement from peer");
232            return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
233        }
234        let Some(awaiting_ack_from_peer) = self.unacknowledged_tickets.get(&peer).await else {
235            tracing::trace!("not awaiting any acknowledgement from peer");
236            return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
237        };
238
239        // Verify all the acknowledgements and compute challenges from half-keys
240        let use_batch_verify = self.cfg.use_batch_verification;
241        let half_keys_challenges = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
242            if use_batch_verify {
243                // Uses regular verifications for small batches but switches to a more effective
244                // batch verification algorithm for larger ones.
245                let acks = Acknowledgement::verify_batch(acks.into_iter().map(|ack| (peer, ack)));
246
247                #[cfg(feature = "rayon")]
248                let iter = acks.into_par_iter();
249
250                #[cfg(not(feature = "rayon"))]
251                let iter = acks.into_iter();
252
253                iter.map(|verified| {
254                    verified
255                        .and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
256                })
257                .filter_map(|res| {
258                    res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
259                        .ok()
260                })
261                .collect::<Vec<_>>()
262            } else {
263                #[cfg(feature = "rayon")]
264                let iter = acks.into_par_iter();
265
266                #[cfg(not(feature = "rayon"))]
267                let iter = acks.into_iter();
268
269                iter.map(|ack| {
270                    ack.verify(&peer)
271                        .and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
272                })
273                .filter_map(|res| {
274                    res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
275                        .ok()
276                })
277                .collect::<Vec<_>>()
278            }
279        })
280        .await;
281
282        // Find all the tickets that we're awaiting acknowledgement for
283        let mut unack_tickets = Vec::with_capacity(half_keys_challenges.len());
284        for (half_key, challenge) in half_keys_challenges {
285            let Some(unack_ticket) = awaiting_ack_from_peer.remove(&challenge).await else {
286                tracing::debug!(%challenge, "received acknowledgement for unknown ticket");
287                continue;
288            };
289
290            let issuer_channel = match self
291                .chain_api
292                .channel_by_parties(unack_ticket.ticket.verified_issuer(), self.chain_key.as_ref())
293                .await
294            {
295                Ok(Some(channel)) => {
296                    if channel.channel_epoch != unack_ticket.verified_ticket().channel_epoch {
297                        tracing::error!(%unack_ticket, "received acknowledgement for ticket issued in a different epoch");
298                        continue;
299                    }
300                    channel
301                }
302                Ok(None) => {
303                    tracing::error!(%unack_ticket, "received acknowledgement for ticket issued for unknown channel");
304                    continue;
305                }
306                Err(error) => {
307                    tracing::error!(%error, %unack_ticket, "failed to resolve channel for unacknowledged ticket");
308                    continue;
309                }
310            };
311
312            unack_tickets.push((issuer_channel, half_key, unack_ticket));
313        }
314
315        let domain_separator = self.channels_dst;
316        let chain_key = self.chain_key.clone();
317        Ok(hopr_parallelize::cpu::spawn_fifo_blocking(move || {
318            #[cfg(feature = "rayon")]
319            let iter = unack_tickets.into_par_iter();
320
321            #[cfg(not(feature = "rayon"))]
322            let iter = unack_tickets.into_iter();
323
324            iter.filter_map(|(channel, half_key, unack_ticket)| {
325                // This explicitly checks whether the acknowledgement
326                // solves the challenge on the ticket.
327                // It must be done before we check that the ticket is winning,
328                // which is a lengthy operation and should not be done for
329                // bogus unacknowledged tickets
330                let Ok(ack_ticket) = unack_ticket.acknowledge(&half_key) else {
331                    tracing::error!(%unack_ticket, "failed to acknowledge ticket");
332                    return None;
333                };
334
335                // This operation checks if the ticket is winning, and if it is, it
336                // turns it into a redeemable ticket.
337                match ack_ticket.into_redeemable(&chain_key, &domain_separator) {
338                    Ok(redeemable) => {
339                        tracing::debug!(%channel, "found winning ticket");
340                        Some(ResolvedAcknowledgement::RelayingWin(Box::new(redeemable)))
341                    }
342                    Err(CoreTypesError::TicketNotWinning) => {
343                        tracing::trace!(%channel, "found losing ticket");
344                        Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
345                    }
346                    Err(error) => {
347                        tracing::error!(%error, %channel, "error when acknowledging ticket");
348                        Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
349                    }
350                }
351            })
352            .collect::<Vec<_>>()
353        })
354        .await)
355    }
356}
357
358#[async_trait::async_trait]
359impl<Chain, Db> TicketTracker for HoprTicketProcessor<Chain, Db>
360where
361    Chain: Send + Sync,
362    Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
363{
364    type Error = Arc<Db::Error>;
365
366    async fn next_outgoing_ticket_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<u64, Self::Error> {
367        let channel_id = *channel_id;
368        self.out_ticket_index
369            .try_get_with((channel_id, epoch), async {
370                self.db
371                    .get_or_create_outgoing_ticket_index(&channel_id, epoch)
372                    .await
373                    .map(|maybe_idx| Arc::new(AtomicU64::new(maybe_idx.unwrap_or_default())))
374            })
375            .await
376            .map(|idx| idx.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
377    }
378
379    async fn incoming_channel_unrealized_balance(
380        &self,
381        channel_id: &ChannelId,
382        epoch: u32,
383    ) -> Result<HoprBalance, Self::Error> {
384        // This value cannot be cached here and must be cached in the DB
385        // because the cache invalidation logic can be only done from within the DB.
386        self.db.get_tickets_value(channel_id, epoch).await.map_err(Into::into)
387    }
388}
389
#[cfg(test)]
mod tests {
    use hopr_crypto_random::Randomizable;

    use super::*;
    use crate::utils::*;

    /// Inserts several unacknowledged tickets awaiting an acknowledgement from `PEERS[2]`,
    /// then acknowledges all of them at once and expects every one to resolve as a win.
    #[tokio::test]
    async fn ticket_processor_should_acknowledge_previously_inserted_tickets() -> anyhow::Result<()> {
        let blokli_client = create_blokli_client()?;

        let node = create_node(1, &blokli_client).await?;

        let ticket_processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        const NUM_TICKETS: usize = 5;

        // Sized from the constant so the capacity follows the number of tickets
        let mut acks = Vec::with_capacity(NUM_TICKETS);
        for index in 0..NUM_TICKETS {
            let own_share = HalfKey::random();
            let ack_share = HalfKey::random();
            let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;

            let unack_ticket = TicketBuilder::default()
                .counterparty(&PEERS[1].0)
                .index(index as u64)
                .channel_epoch(1)
                .amount(10_u32)
                .challenge(challenge)
                .build_signed(&PEERS[0].0, &Hash::default())?
                .into_unacknowledged(own_share);

            ticket_processor
                .insert_unacknowledged_ticket(PEERS[2].1.public(), ack_share.to_challenge()?, unack_ticket)
                .await?;

            acks.push(VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak());
        }

        let resolutions = ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), acks).await?;
        assert_eq!(NUM_TICKETS, resolutions.len());
        assert!(
            resolutions
                .iter()
                .all(|res| matches!(res, ResolvedAcknowledgement::RelayingWin(_)))
        );

        Ok(())
    }

    /// Sends acknowledgements from a peer for which no ticket was ever inserted and
    /// expects them to be rejected as unexpected.
    #[tokio::test]
    async fn ticket_processor_should_reject_acknowledgements_from_unexpected_sender() -> anyhow::Result<()> {
        let blokli_client = create_blokli_client()?;

        let node = create_node(1, &blokli_client).await?;

        let ticket_processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        const NUM_ACKS: usize = 5;

        // Sized from the constant so the capacity follows the number of acknowledgements
        let mut acks = Vec::with_capacity(NUM_ACKS);
        for _ in 0..NUM_ACKS {
            let ack_share = HalfKey::random();
            acks.push(VerifiedAcknowledgement::new(ack_share, &PEERS[2].1).leak());
        }

        assert!(matches!(
            ticket_processor.acknowledge_tickets(*PEERS[2].1.public(), acks).await,
            Err(TicketAcknowledgementError::UnexpectedAcknowledgement)
        ));

        Ok(())
    }
}