hopr_protocol_hopr/
ticket_processing.rs

1use std::sync::{Arc, atomic::AtomicU64};
2
3use futures::StreamExt;
4use hopr_api::{chain::ChainReadChannelOperations, db::HoprDbTicketOperations};
5use hopr_crypto_types::prelude::*;
6use hopr_internal_types::prelude::*;
7use hopr_primitive_types::balance::HoprBalance;
8use validator::ValidationError;
9
10use crate::{HoprProtocolError, ResolvedAcknowledgement, TicketTracker, UnacknowledgedTicketProcessor};
11
12const MIN_UNACK_TICKET_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
13fn validate_unack_ticket_timeout(timeout: &std::time::Duration) -> Result<(), ValidationError> {
14    if timeout < &MIN_UNACK_TICKET_TIMEOUT {
15        Err(ValidationError::new("unack_ticket_timeout too low"))
16    } else {
17        Ok(())
18    }
19}
20
21const MIN_OUTGOING_INDEX_RETENTION: std::time::Duration = std::time::Duration::from_secs(10);
22
23fn validate_outgoing_index_retention(retention: &std::time::Duration) -> Result<(), ValidationError> {
24    if retention < &MIN_OUTGOING_INDEX_RETENTION {
25        Err(ValidationError::new("outgoing_index_cache_retention too low"))
26    } else {
27        Ok(())
28    }
29}
30
/// Default value of `outgoing_index_cache_retention`: 10 seconds.
fn default_outgoing_index_retention() -> std::time::Duration {
    const DEFAULT_RETENTION: std::time::Duration = std::time::Duration::from_secs(10);

    DEFAULT_RETENTION
}
34
/// Default value of `unack_ticket_timeout`: 30 seconds.
fn default_unack_ticket_timeout() -> std::time::Duration {
    const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

    DEFAULT_TIMEOUT
}
38
/// Default value of `max_unack_tickets`: ten million tickets.
fn default_max_unack_tickets() -> usize {
    const DEFAULT_MAX_UNACK_TICKETS: usize = 10_000_000;

    DEFAULT_MAX_UNACK_TICKETS
}
42
43/// Configuration for the HOPR ticket processor within the packet pipeline.
44
45#[derive(Debug, Clone, Copy, smart_default::SmartDefault, PartialEq, validator::Validate)]
46#[cfg_attr(
47    feature = "serde",
48    derive(serde::Deserialize, serde::Serialize),
49    serde(deny_unknown_fields)
50)]
51pub struct HoprTicketProcessorConfig {
52    /// Time after which an unacknowledged ticket is considered stale and is removed from the cache.
53    ///
54    /// If the counterparty does not send an acknowledgement within this period, the ticket is lost forever.
55    ///
56    /// Default is 30 seconds.
57    #[default(default_unack_ticket_timeout())]
58    #[validate(custom(function = "validate_unack_ticket_timeout"))]
59    #[cfg_attr(
60        feature = "serde",
61        serde(default = "default_unack_ticket_timeout", with = "humantime_serde")
62    )]
63    pub unack_ticket_timeout: std::time::Duration,
64    /// Maximum number of unacknowledged tickets that can be stored in the cache at any given time.
65    ///
66    /// When more tickets are received, the oldest ones are discarded and lost forever.
67    ///
68    /// Default is 10 000 000.
69    #[default(default_max_unack_tickets())]
70    #[validate(range(min = 100))]
71    #[cfg_attr(feature = "serde", serde(default = "default_max_unack_tickets"))]
72    pub max_unack_tickets: usize,
73    /// Period for which the outgoing ticket index is cached in memory for each channel.
74    ///
75    /// Default is 10 seconds.
76    #[default(default_outgoing_index_retention())]
77    #[validate(custom(function = "validate_outgoing_index_retention"))]
78    #[cfg_attr(
79        feature = "serde",
80        serde(default = "default_outgoing_index_retention", with = "humantime_serde")
81    )]
82    pub outgoing_index_cache_retention: std::time::Duration,
83}
84
85/// HOPR-specific implementation of [`UnacknowledgedTicketProcessor`] and [`TicketTracker`].
86#[derive(Clone)]
87pub struct HoprTicketProcessor<Chain, Db> {
88    unacknowledged_tickets:
89        moka::future::Cache<OffchainPublicKey, moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>>,
90    out_ticket_index: moka::future::Cache<(ChannelId, u32), Arc<AtomicU64>>,
91    db: Db,
92    chain_api: Chain,
93    chain_key: ChainKeypair,
94    channels_dst: Hash,
95    cfg: HoprTicketProcessorConfig,
96}
97
98impl<Chain, Db> HoprTicketProcessor<Chain, Db> {
99    /// Creates a new instance of the HOPR ticket processor.
100    pub fn new(
101        chain_api: Chain,
102        db: Db,
103        chain_key: ChainKeypair,
104        channels_dst: Hash,
105        cfg: HoprTicketProcessorConfig,
106    ) -> Self {
107        Self {
108            out_ticket_index: moka::future::Cache::builder()
109                .time_to_idle(cfg.outgoing_index_cache_retention)
110                .max_capacity(10_000)
111                .build(),
112            unacknowledged_tickets: moka::future::Cache::builder()
113                .time_to_idle(cfg.unack_ticket_timeout * 2)
114                .max_capacity(100_000)
115                .build(),
116            chain_api,
117            db,
118            chain_key,
119            channels_dst,
120            cfg,
121        }
122    }
123}
124
125impl<Chain, Db> HoprTicketProcessor<Chain, Db>
126where
127    Db: HoprDbTicketOperations + Clone + Send + 'static,
128{
129    /// Task that performs periodic synchronization of the outgoing ticket index cache
130    /// to the underlying database.
131    ///
132    /// If this task is not started, the outgoing ticket indices will not survive a node
133    /// restart and will result in invalid tickets received by the counterparty.
134    pub fn outgoing_index_sync_task(
135        &self,
136        reg: futures::future::AbortRegistration,
137    ) -> impl Future<Output = ()> + use<Db, Chain> {
138        let index_save_stream = futures::stream::Abortable::new(
139            futures_time::stream::interval(futures_time::time::Duration::from(
140                self.cfg.outgoing_index_cache_retention.div_f32(2.0),
141            )),
142            reg,
143        );
144
145        let db = self.db.clone();
146        let out_ticket_index = self.out_ticket_index.clone();
147
148        index_save_stream
149            .for_each(move |_| {
150                let db = db.clone();
151                let out_ticket_index = out_ticket_index.clone();
152                async move {
153                    // This iteration does not alter the popularity estimator of the cache
154                    // and therefore still allows the unused entries to expire
155                    for (channel_key, out_idx) in out_ticket_index.iter() {
156                        if let Err(error) = db
157                            .update_outgoing_ticket_index(
158                                &channel_key.0,
159                                channel_key.1,
160                                out_idx.load(std::sync::atomic::Ordering::SeqCst),
161                            )
162                            .await
163                        {
164                            tracing::error!(%error, channel_id = %channel_key.0, epoch = channel_key.1, "failed to sync outgoing ticket index to db");
165                        }
166                    }
167                    tracing::trace!("synced outgoing ticket indices to db");
168                }
169            })
170    }
171}
172
173#[async_trait::async_trait]
174impl<Chain, Db> UnacknowledgedTicketProcessor for HoprTicketProcessor<Chain, Db>
175where
176    Chain: ChainReadChannelOperations + Send + Sync,
177    Db: Send + Sync,
178{
179    type Error = HoprProtocolError;
180
181    async fn insert_unacknowledged_ticket(
182        &self,
183        next_hop: &OffchainPublicKey,
184        challenge: HalfKeyChallenge,
185        ticket: UnacknowledgedTicket,
186    ) -> Result<(), Self::Error> {
187        tracing::trace!(%ticket, "received unacknowledged ticket");
188        self.unacknowledged_tickets
189            .get_with_by_ref(next_hop, async {
190                moka::future::Cache::builder()
191                    .time_to_live(self.cfg.unack_ticket_timeout)
192                    .max_capacity(self.cfg.max_unack_tickets as u64)
193                    .build()
194            })
195            .await
196            .insert(challenge, ticket)
197            .await;
198
199        Ok(())
200    }
201
202    async fn acknowledge_ticket(
203        &self,
204        peer: OffchainPublicKey,
205        ack: Acknowledgement,
206    ) -> Result<Option<ResolvedAcknowledgement>, Self::Error> {
207        // Check if we're even expecting an acknowledgement from this peer
208        let Some(awaiting_ack_from_peer) = self.unacknowledged_tickets.get(&peer).await else {
209            tracing::trace!(%peer, "not awaiting any acknowledgement from peer");
210            return Ok(None);
211        };
212
213        // If we do, verify the acknowledgement signature and extract the challenge
214        let (half_key, challenge) = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
215            ack.verify(&peer)
216                .and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
217        })
218        .await?;
219
220        // Check if we have a ticket to be acknowledged by this peer
221        let unacknowledged = awaiting_ack_from_peer
222            .remove(&challenge)
223            .await
224            .ok_or_else(|| HoprProtocolError::UnacknowledgedTicketNotFound(challenge))?;
225
226        // Issuer's channel must have an epoch matching with the unacknowledged ticket
227        let issuer_channel = self
228            .chain_api
229            .channel_by_parties(unacknowledged.ticket.verified_issuer(), self.chain_key.as_ref())
230            .await
231            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
232            .filter(|c| c.channel_epoch == unacknowledged.verified_ticket().channel_epoch)
233            .ok_or(HoprProtocolError::ChannelNotFound(
234                *unacknowledged.ticket.verified_issuer(),
235                *self.chain_key.as_ref(),
236            ))?;
237
238        let domain_separator = self.channels_dst;
239        let chain_key = self.chain_key.clone();
240        let channel_id = *issuer_channel.get_id();
241        hopr_parallelize::cpu::spawn_fifo_blocking(move || {
242            // This explicitly checks whether the acknowledgement
243            // solves the challenge on the ticket. It must be done before we
244            // check that the ticket is winning, which is a lengthy operation
245            // and should not be done for bogus unacknowledged tickets
246            let ack_ticket = unacknowledged.acknowledge(&half_key)?;
247
248            // This operation checks if the ticket is winning, and if it is, it
249            // turns it into a redeemable ticket.
250            match ack_ticket.into_redeemable(&chain_key, &domain_separator) {
251                Ok(redeemable) => {
252                    tracing::debug!(%issuer_channel, "found winning ticket");
253                    Ok(Some(ResolvedAcknowledgement::RelayingWin(Box::new(redeemable))))
254                }
255                Err(CoreTypesError::TicketNotWinning) => {
256                    tracing::trace!(%issuer_channel, "found losing ticket");
257                    Ok(Some(ResolvedAcknowledgement::RelayingLoss(channel_id)))
258                }
259                Err(error) => {
260                    tracing::error!(%error, %issuer_channel, "error when acknowledging ticket");
261                    Ok(Some(ResolvedAcknowledgement::RelayingLoss(channel_id)))
262                }
263            }
264        })
265        .await
266    }
267}
268
269#[async_trait::async_trait]
270impl<Chain, Db> TicketTracker for HoprTicketProcessor<Chain, Db>
271where
272    Chain: Send + Sync,
273    Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
274{
275    type Error = Arc<Db::Error>;
276
277    async fn next_outgoing_ticket_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<u64, Self::Error> {
278        let channel_id = *channel_id;
279        self.out_ticket_index
280            .try_get_with((channel_id, epoch), async {
281                self.db
282                    .get_or_create_outgoing_ticket_index(&channel_id, epoch)
283                    .await
284                    .map(|maybe_idx| Arc::new(AtomicU64::new(maybe_idx.unwrap_or_default())))
285            })
286            .await
287            .map(|idx| idx.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
288    }
289
290    async fn incoming_channel_unrealized_balance(
291        &self,
292        channel_id: &ChannelId,
293        epoch: u32,
294    ) -> Result<HoprBalance, Self::Error> {
295        // This value cannot be cached here and must be cached in the DB
296        // because the cache invalidation logic can be only done from within the DB.
297        self.db.get_tickets_value(channel_id, epoch).await.map_err(Into::into)
298    }
299}