// Source file: hopr_transport/protocol/pipeline/mod.rs

1//! HOPR packet processing pipeline.
2
3mod builder;
4mod config;
5
6pub use builder::{PacketPipelineBuilder, Unset};
7use bytes::Bytes;
8pub use config::{AcknowledgementPipelineConfig, PacketPipelineConfig};
9use futures::{SinkExt, StreamExt, future::Either};
10use futures_time::{future::FutureExt as TimeExt, stream::StreamExt as TimeStreamExt};
11use hopr_api::{
12    PeerId,
13    node::TicketEvent,
14    types::{crypto::prelude::*, internal::prelude::*},
15};
16use hopr_crypto_packet::HoprSurb;
17use hopr_protocol_app::prelude::*;
18use hopr_protocol_hopr::prelude::*;
19use hopr_utils::{
20    network_types::timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
21    runtime::AbortableList,
22};
23use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
24use tracing::Instrument;
25
26use crate::PeerProtocolCounterRegistry;
27
/// Default concurrency for the incoming acknowledgement processing pipeline when not overridden
/// via [`AcknowledgementPipelineConfig::ack_input_concurrency`].
const DEFAULT_ACK_INPUT_CONCURRENCY: usize = 10;
/// Default concurrency for the outgoing acknowledgement processing pipeline when not overridden
/// via [`AcknowledgementPipelineConfig::ack_output_concurrency`].
const DEFAULT_ACK_OUTPUT_CONCURRENCY: usize = 10;
/// Time limit (50 ms) for a send into a queue.
///
/// NOTE(review): presumably bounds sends into the pipeline's internal queues; the use site is
/// not in this part of the file — confirm.
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
/// Time limit (150 ms) for decoding a single incoming packet. Packets that take longer are
/// dropped by the incoming packet pipeline.
const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
/// Time limit (150 ms) for encoding a single outgoing packet. Packets that take longer are
/// dropped by the outgoing packet pipeline.
const PACKET_ENCODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
37
// Telemetry counters of the packet pipeline. They only exist when the `telemetry` feature is
// enabled and the crate is not built for tests; every use site carries the same `cfg` guard.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Processed packets, labelled by `type`.
    static ref METRIC_PACKET_COUNT: hopr_types::telemetry::MultiCounter = hopr_types::telemetry::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    // Rejected incoming packets, labelled by the rejection `reason`.
    static ref METRIC_PACKET_REJECTED_COUNT: hopr_types::telemetry::MultiCounter = hopr_types::telemetry::MultiCounter::new(
        "hopr_packet_rejected_count",
        "Number of incoming packets rejected due various reasons",
        &["reason"]
    ).unwrap();
    // Tracks how often the Rayon-backed packet decode path exceeds PACKET_DECODING_TIMEOUT.
    // A sustained non-zero rate here indicates the Rayon pool is saturated—correlate with
    // `hopr_rayon_tasks_cancelled_total` and `hopr_rayon_queue_wait_seconds` to diagnose whether
    // the bottleneck is queue depth, individual task duration, or both.
    static ref METRIC_PACKET_DECODE_TIMEOUTS: hopr_types::telemetry::SimpleCounter = hopr_types::telemetry::SimpleCounter::new(
        "hopr_packet_decode_timeouts_total",
        "Number of incoming packets dropped due to decode timeout (sustained rate indicates Rayon pool saturation)"
    ).unwrap();
    // Ticket validation failures seen while processing incoming packets, labelled by error `type`.
    static ref METRIC_VALIDATION_ERRORS: hopr_types::telemetry::MultiCounter = hopr_types::telemetry::MultiCounter::new(
        "hopr_packet_ticket_validation_errors",
        "Number of different ticket validation errors encountered during packet processing",
        &["type"]
    ).unwrap();
}
64
65#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
66pub enum PacketPipelineProcesses {
67    #[strum(to_string = "HOPR [msg] - ingress")]
68    MsgIn,
69    #[strum(to_string = "HOPR [msg] - egress")]
70    MsgOut,
71    #[strum(to_string = "HOPR [ack] - egress")]
72    AckOut,
73    #[strum(to_string = "HOPR [ack] - ingress")]
74    AckIn,
75    #[strum(to_string = "HOPR [msg] - mixer")]
76    Mixer,
77}
78
79/// Performs encoding of outgoing Application protocol packets into HOPR protocol outgoing packets.
80async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
81    app_outgoing: AppOut,
82    encoder: std::sync::Arc<E>,
83    wire_outgoing: WOut,
84    counters: super::counters::PeerProtocolCounterRegistry,
85    concurrency: usize,
86) where
87    AppOut: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
88    E: PacketEncoder + Send + Sync + 'static,
89    WOut: futures::Sink<(PeerId, Bytes), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
90    WOutErr: std::error::Error,
91{
92    let res = app_outgoing
93        .then_concurrent(
94            |(routing, data)| {
95                let encoder = encoder.clone();
96                let counters = counters.clone();
97                async move {
98                    match hopr_utils::parallelize::cpu::spawn_fifo_blocking(
99                        move || {
100                            encoder.encode_packet(
101                                data.data.to_bytes(),
102                                routing,
103                                data.packet_info
104                                    .map(|data| data.signals_to_destination)
105                                    .unwrap_or_default(),
106                            )
107                        },
108                        "packet_encode",
109                    )
110                    .timeout(futures_time::time::Duration::from(PACKET_ENCODING_TIMEOUT))
111                    .await
112                    {
113                        Ok(Ok(Ok(packet))) => {
114                            #[cfg(all(feature = "telemetry", not(test)))]
115                            METRIC_PACKET_COUNT.increment(&["sent"]);
116
117                            counters.get_or_create(&packet.next_hop).record_message_sent();
118                            tracing::trace!(peer = packet.next_hop.to_peerid_str(), "protocol message out");
119                            Some((packet.next_hop.into(), packet.data))
120                        }
121                        Ok(Ok(Err(error))) => {
122                            tracing::error!(%error, "outgoing packet could not be encoded");
123                            None
124                        }
125                        Ok(Err(error)) => {
126                            tracing::error!(%error, "parallel processing of the outgoing packet failed");
127                            None
128                        }
129                        Err(error) => {
130                            tracing::error!(%error, "timeout while processing the outgoing packet");
131                            None
132                        }
133                    }
134                }
135            },
136            concurrency,
137        )
138        .filter_map(futures::future::ready)
139        .map(Ok)
140        .forward_to_timeout(wire_outgoing)
141        .in_current_span()
142        .await;
143
144    if let Err(error) = res {
145        tracing::error!(
146            task = "transport (protocol - msg egress)",
147            %error,
148            "long-running background task finished with error"
149        );
150    } else {
151        tracing::warn!(
152            task = "transport (protocol - msg egress)",
153            "long-running background task finished"
154        )
155    }
156}
157
158/// Performs HOPR protocol decoding of incoming packets into Application protocol packets.
159///
160/// `wire_incoming` --> `decoder` --> `ack_outgoing` (final + forwarded)
161///                             | --> `wire_outgoing` (forwarded)
162///                             | --> `ack_incoming` (forwarded)
163///                             | --> `app_incoming` (final)
164#[allow(clippy::too_many_arguments)]
165async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
166    (wire_outgoing, wire_incoming): (WOut, WIn),
167    decoder: std::sync::Arc<D>,
168    ticket_proc: std::sync::Arc<T>,
169    ticket_events: TEvt,
170    (ack_outgoing, ack_incoming): (AckOut, AckIn),
171    app_incoming: AppIn,
172    counters: super::counters::PeerProtocolCounterRegistry,
173    concurrency: usize,
174) where
175    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
176    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
177    WOut::Error: std::error::Error,
178    D: PacketDecoder + Sync + Send + 'static,
179    T: UnacknowledgedTicketProcessor + Send + 'static,
180    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
181    TEvt::Error: std::error::Error,
182    AckIn: futures::Sink<(OffchainPublicKey, Vec<Acknowledgement>)> + Send + Unpin + Clone + 'static,
183    AckIn::Error: std::error::Error,
184    AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
185    AckOut::Error: std::error::Error,
186    AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
187    AppInErr: std::error::Error,
188{
189    let ack_outgoing_success = ack_outgoing.clone();
190    let ack_outgoing_failure = ack_outgoing;
191    let ticket_proc_success = ticket_proc;
192
193    let res = wire_incoming
194        .then_concurrent(move |(peer, data)| {
195            let decoder = decoder.clone();
196            let mut ack_outgoing_failure = ack_outgoing_failure.clone();
197            let mut ticket_events_reject = ticket_events.clone();
198
199            tracing::trace!(%peer, "protocol message in");
200
201            async move {
202                match hopr_utils::parallelize::cpu::spawn_fifo_blocking(move || decoder.decode(peer, data), "packet_decode")
203                    .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
204                    .await {
205                    Ok(Ok(Ok(packet))) => {
206                        tracing::trace!(%peer, ?packet, "successfully decoded incoming packet");
207                        Some(packet)
208                    },
209                    Ok(Ok(Err(IncomingPacketError::Undecodable(error)))) => {
210                        // Do not send an ack back if the packet could not be decoded at all
211                        //
212                        // Potentially adversarial behavior
213                        tracing::trace!(%peer, %error, "not sending ack back on undecodable packet - possible adversarial behavior");
214
215                        #[cfg(all(feature = "telemetry", not(test)))]
216                        METRIC_PACKET_REJECTED_COUNT.increment(&["undecodable"]);
217
218                        None
219                    },
220                    Ok(Ok(Err(IncomingPacketError::ProcessingError(sender, error)))) => {
221                        tracing::error!(%peer, %error, "failed to process the decoded packet");
222                        // On this failure, we send back a random acknowledgement
223                        ack_outgoing_failure
224                            .send((*sender, None))
225                            .await
226                            .unwrap_or_else(|error| {
227                                tracing::error!(%error, "failed to send ack to the egress queue");
228                            });
229
230                        #[cfg(all(feature = "telemetry", not(test)))]
231                        METRIC_PACKET_REJECTED_COUNT.increment(&["processing_error"]);
232
233                        None
234                    },
235                    Ok(Ok(Err(IncomingPacketError::InvalidTicket(sender, error)))) => {
236                        tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
237                        if let Err(error) = ticket_events_reject
238                            .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
239                            .await {
240                            tracing::error!(%error, "failed to notify invalid ticket rejection");
241                        }
242                        // On this failure, we send back a random acknowledgement
243                        ack_outgoing_failure
244                            .send((*sender, None))
245                            .await
246                            .unwrap_or_else(|error| {
247                                tracing::error!(%error, "failed to send ack to the egress queue");
248                            });
249
250                        #[cfg(all(feature = "telemetry", not(test)))]
251                        {
252                            METRIC_VALIDATION_ERRORS.increment(&[error.kind.as_ref()]);
253                            METRIC_PACKET_REJECTED_COUNT.increment(&["invalid_ticket"]);
254                        }
255
256                        None
257                    }
258                    Ok(Err(error)) => {
259                        tracing::error!(%error, "parallel processing of the incoming packet failed");
260                        None
261                    },
262                    Err(_) => {
263                        // If we cannot decode the packet within the time limit, just drop it
264                        tracing::error!(
265                            %peer,
266                            timeout_ms = PACKET_DECODING_TIMEOUT.as_millis() as u64,
267                            "dropped incoming packet: decode timeout - check the 'hopr_rayon_queue_wait_seconds' metric for pool saturation"
268                        );
269                        #[cfg(all(feature = "telemetry", not(test)))]
270                        {
271                            METRIC_PACKET_DECODE_TIMEOUTS.increment();
272                            METRIC_PACKET_REJECTED_COUNT.increment(&["timeout"]);
273                        }
274
275                        None
276                    }
277                }
278            }.instrument(tracing::debug_span!("incoming_packet_decode", %peer))
279        }, concurrency)
280        .filter_map(futures::future::ready)
281        // Branch on the packet type BEFORE building the async future so each arm only clones
282        // the handles it actually needs. `futures::future::Either` lets us return three
283        // distinct async blocks from one closure without boxing.
284        .then_concurrent(move |packet| {
285            match packet {
286                IncomingPacket::Acknowledgement(ack) => {
287                    let mut ack_incoming = ack_incoming.clone();
288                    let counters = counters.clone();
289                    Either::Left(async move {
290                        let IncomingAcknowledgementPacket { previous_hop, received_acks, .. } = *ack;
291                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), num_acks = received_acks.len(), "incoming acknowledgements");
292                        counters.get_or_create(&previous_hop).record_acks_received(received_acks.len() as u64);
293
294                        ack_incoming
295                            .send((previous_hop, received_acks))
296                            .await
297                            .unwrap_or_else(|error| {
298                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
299                            });
300
301                        // We do not acknowledge back acknowledgements.
302                        None
303                    })
304                }
305                IncomingPacket::Final(final_packet) => {
306                    let mut ack_outgoing_success = ack_outgoing_success.clone();
307                    Either::Right(Either::Left(async move {
308                        let IncomingFinalPacket {
309                            previous_hop,
310                            sender,
311                            plain_text,
312                            ack_key,
313                            info,
314                            ..
315                        } = *final_packet;
316                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "incoming final packet");
317
318                        // Send acknowledgement back
319                        ack_outgoing_success
320                            .send((previous_hop, Some(ack_key)))
321                            .await
322                            .unwrap_or_else(|error| {
323                                tracing::error!(%error, "failed to send ack to the egress queue");
324                            });
325
326                        #[cfg(all(feature = "telemetry", not(test)))]
327                        METRIC_PACKET_COUNT.increment(&["received"]);
328
329                        Some((sender, plain_text, info))
330                    }))
331                }
332                IncomingPacket::Forwarded(fwd_packet) => {
333                    let ticket_proc = ticket_proc_success.clone();
334                    let mut wire_outgoing = wire_outgoing.clone();
335                    let mut ack_outgoing_success = ack_outgoing_success.clone();
336                    let counters = counters.clone();
337                    Either::Right(Either::Right(async move {
338                        let IncomingForwardedPacket {
339                            previous_hop,
340                            next_hop,
341                            data,
342                            ack_key_prev_hop,
343                            ack_challenge,
344                            received_ticket,
345                            ..
346                        } = *fwd_packet;
347                        // Per requirements, this call is not blocking
348                        if let Err(error) = ticket_proc.insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket) {
349                            tracing::error!(
350                                previous_hop = previous_hop.to_peerid_str(),
351                                next_hop = next_hop.to_peerid_str(),
352                                %error,
353                                "failed to insert unacknowledged ticket into the ticket processor"
354                            );
355
356                            #[cfg(all(feature = "telemetry", not(test)))]
357                            METRIC_PACKET_REJECTED_COUNT.increment(&["unack_processing_error"]);
358
359                            return None;
360                        }
361
362                        // First, relay the packet to the next hop
363                        tracing::trace!(
364                            previous_hop = previous_hop.to_peerid_str(),
365                            next_hop = next_hop.to_peerid_str(),
366                            "forwarding packet to the next hop"
367                        );
368
369                        match wire_outgoing.send((next_hop.into(), data)).await {
370                            Ok(()) => {
371                                counters.get_or_create(&next_hop).record_message_sent();
372
373                                #[cfg(all(feature = "telemetry", not(test)))]
374                                METRIC_PACKET_COUNT.increment(&["forwarded"]);
375                            }
376                            Err(error) => {
377                                tracing::error!(%error, "failed to forward a packet to the transport layer");
378                                return None;
379                            }
380                        }
381
382                        // Send acknowledgement back
383                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "acknowledging forwarded packet back");
384                        ack_outgoing_success
385                            .send((previous_hop, Some(ack_key_prev_hop)))
386                            .await
387                            .unwrap_or_else(|error| {
388                                tracing::error!(%error, "failed to send ack to the egress queue");
389                            });
390
391                        None
392                    }))
393                }
394            }
395        }, concurrency)
396        .filter_map(|maybe_data| futures::future::ready(
397            // Create the ApplicationDataIn data structure for incoming data
398            maybe_data
399                .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
400                    .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
401                    .ok()
402                    .map(|data| (sender, ApplicationDataIn {
403                        data,
404                        packet_info: IncomingPacketInfo {
405                            signals_from_sender: aux_info.packet_signals,
406                            num_saved_surbs: aux_info.num_surbs,
407                        }
408                    })))
409        ))
410        .map(Ok)
411        .forward_to_timeout(app_incoming)
412        .in_current_span()
413        .await;
414
415    if let Err(error) = res {
416        tracing::error!(
417            task = "transport (protocol - msg ingress)",
418            %error,
419            "long-running background task finished with error"
420        );
421    } else {
422        tracing::warn!(
423            task = "transport (protocol - msg ingress)",
424            "long-running background task finished"
425        )
426    }
427}
428
429async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
430    ack_outgoing: AckOut,
431    encoder: std::sync::Arc<E>,
432    cfg: AcknowledgementPipelineConfig,
433    packet_key: OffchainKeypair,
434    wire_outgoing: WOut,
435) where
436    AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
437    E: PacketEncoder + Sync + Send + 'static,
438    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
439    WOut::Error: std::error::Error,
440{
441    ack_outgoing
442        .map(move |(destination, maybe_ack_key)| {
443            let packet_key = packet_key.clone();
444            // Sign acknowledgement with the given half-key or generate a signed random one
445            let ack = maybe_ack_key
446                .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
447                .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key));
448            (destination, ack)
449        })
450        .buffer(futures_time::time::Duration::from(cfg.ack_buffer_interval))
451        .filter(|acks| futures::future::ready(!acks.is_empty()))
452        // Group by sender, reusing the same HashMap across buffer cycles so we don't
453        // re-allocate its bucket storage every `cfg.ack_buffer_interval` (default 200ms).
454        //
455        // The halfbrown map uses a Vec backing for a small number of distinct senders
456        // (<32) and transitions to hashbrown otherwise — calling `drain()` keeps the
457        // underlying allocation, leaving us with only the per-group Vec<Ack> to allocate
458        // (which downstream consumes as owned values).
459        .scan(
460            halfbrown::HashMap::<OffchainPublicKey, Vec<VerifiedAcknowledgement>, ahash::RandomState>::with_capacity_and_hasher(
461                cfg.ack_grouping_capacity,
462                ahash::RandomState::default(),
463            ),
464            |groups, buffered_acks| {
465                for (dst, ack) in buffered_acks {
466                    groups
467                        .entry(dst)
468                        .and_modify(|v| v.push(ack))
469                        .or_insert_with(|| vec![ack]);
470                }
471                tracing::trace!(
472                    num_groups = groups.len(),
473                    num_acks = groups.values().map(|v| v.len()).sum::<usize>(),
474                    "acknowledgements grouped"
475                );
476                let drained: Vec<_> = groups.drain().collect();
477                futures::future::ready(Some(futures::stream::iter(drained)))
478            },
479        )
480        .flatten()
481        .for_each_concurrent(
482            cfg.ack_output_concurrency.filter(|&n| n > 0).unwrap_or(DEFAULT_ACK_OUTPUT_CONCURRENCY),
483            move |(destination, acks)| {
484                let encoder = encoder.clone();
485                let mut wire_outgoing = wire_outgoing.clone();
486                async move {
487                    // Make sure that the acknowledgements are sent in batches of at most MAX_ACKNOWLEDGEMENTS_BATCH_SIZE
488                    // TODO: find better strategy to avoid reallocations
489                    let c = acks.chunks(MAX_ACKNOWLEDGEMENTS_BATCH_SIZE).map(|c| c.to_vec()).collect::<Vec<_>>();
490                    for ack_chunk in c {
491                        let encoder = encoder.clone();
492                        match hopr_utils::parallelize::cpu::spawn_fifo_blocking(move || encoder.encode_acknowledgements(&ack_chunk, &destination), "ack_encode").await {
493                            Ok(Ok(ack_packet)) => {
494                                wire_outgoing
495                                    .feed((ack_packet.next_hop.into(), ack_packet.data))
496                                    .await
497                                    .unwrap_or_else(|error| {
498                                        tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
499                                    });
500                            }
501                            Ok(Err(error)) => tracing::error!(%error, "failed to encode acknowledgements"),
502                            Err(error) => tracing::error!(%error, "parallel processing of the outgoing acknowledgements failed"),
503                        }
504                    }
505                    if let Err(error) = wire_outgoing.flush().await {
506                        tracing::error!(%error, "failed to flush acknowledgements batch to the transport layer");
507                    }
508                    tracing::trace!("acknowledgements out");
509                }.instrument(tracing::debug_span!("outgoing_ack_batch", peer = destination.to_peerid_str()))
510            }
511        )
512        .in_current_span()
513        .await;
514
515    tracing::warn!(
516        task = "transport (protocol - ack outgoing)",
517        "long-running background task finished"
518    );
519}
520
521/// Drains incoming acknowledgements without forwarding them to an [`UnacknowledgedTicketProcessor`].
522///
523/// Used by Exit nodes: they keep the incoming acknowledgement pipeline running (for future
524/// development), but since they never receive tickets, they have nothing to acknowledge.
525async fn start_exit_incoming_ack_pipeline<AckIn>(ack_incoming: AckIn)
526where
527    AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
528{
529    ack_incoming
530        .for_each(move |(peer, acks)| {
531            // TODO: PIX will make use of acknowledgements at Exits
532            tracing::trace!(%peer, num = acks.len(), "received acknowledgements (drained, not processed)");
533            futures::future::ready(())
534        })
535        .in_current_span()
536        .await;
537
538    tracing::warn!(
539        task = "transport (protocol - ticket acknowledgement drain)",
540        "long-running background task finished"
541    );
542}
543
544async fn start_relay_incoming_ack_pipeline<AckIn, T, TEvt>(
545    ack_incoming: AckIn,
546    ticket_events: TEvt,
547    ticket_proc: std::sync::Arc<T>,
548    concurrency: usize,
549) where
550    AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
551    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
552    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
553    TEvt::Error: std::error::Error,
554{
555    ack_incoming
556        .for_each_concurrent(concurrency, move |(peer, acks)| {
557            let ticket_proc = ticket_proc.clone();
558            let mut ticket_evt = ticket_events.clone();
559            async move {
560                tracing::trace!(num = acks.len(), "received acknowledgements");
561                match hopr_utils::parallelize::cpu::spawn_fifo_blocking(
562                    move || ticket_proc.acknowledge_tickets(peer, acks),
563                    "ack_decode",
564                )
565                .await
566                {
567                    Ok(Ok(resolutions)) if !resolutions.is_empty() => {
568                        let resolutions_iter = resolutions.into_iter().filter_map(|resolution| match resolution {
569                            ResolvedAcknowledgement::RelayingWin(redeemable_ticket) => {
570                                tracing::trace!("received ack for a winning ticket");
571                                Some(Ok(TicketEvent::WinningTicket(redeemable_ticket)))
572                            }
573                            ResolvedAcknowledgement::RelayingLoss(_) => {
574                                // Losing tickets are not getting accounted for anywhere.
575                                tracing::trace!("received ack for a losing ticket");
576                                None
577                            }
578                        });
579
580                        // All acknowledgements that resulted in winning tickets go upstream
581                        if let Err(error) = ticket_evt.send_all(&mut futures::stream::iter(resolutions_iter)).await {
582                            tracing::error!(%error, "failed to notify ticket resolutions");
583                        }
584                    }
585                    Ok(Ok(_)) => {
586                        tracing::debug!("acknowledgement batch could not acknowledge any ticket");
587                    }
588                    Ok(Err(TicketAcknowledgementError::UnexpectedAcknowledgement)) => {
589                        // Unexpected acknowledgements naturally happen
590                        // as acknowledgements of 0-hop packets
591                        tracing::trace!("received unexpected acknowledgement");
592                    }
593                    Ok(Err(error)) => {
594                        tracing::error!(%error, "failed to acknowledge ticket");
595                    }
596                    Err(error) => {
597                        tracing::error!(%error, "parallel processing of the incoming acknowledgements failed")
598                    }
599                }
600            }
601            .instrument(tracing::debug_span!("incoming_ack_batch", peer = peer.to_peerid_str()))
602        })
603        .in_current_span()
604        .await;
605
606    tracing::warn!(
607        task = "transport (protocol - ticket acknowledgement)",
608        "long-running background task finished"
609    );
610}
/// Node type for which the packet processing pipeline is being constructed.
///
/// The three HOPR node types differ in how they treat tickets and incoming acknowledgements:
/// * [`Relay`](NodeType::Relay) — full pipeline, processes tickets and incoming acknowledgements.
/// * [`Entry`](NodeType::Entry) — does not process tickets and does not even start the incoming acknowledgement
///   pipeline.
/// * [`Exit`](NodeType::Exit) — does not process tickets, but still runs the incoming acknowledgement pipeline (which
///   only drains the stream) for future use.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum NodeType {
    /// Full pipeline: processes tickets and incoming acknowledgements.
    Relay,
    /// Does not process tickets; the incoming acknowledgement pipeline is not started.
    Entry,
    /// Does not process tickets; incoming acknowledgements are only drained.
    Exit,
}
625
626/// No-op [`UnacknowledgedTicketProcessor`] used by node types that do not process tickets
627/// (Entry and Exit). All methods are unreachable because the inner pipeline never invokes
628/// them on those node types (Entry skips the ack pipeline entirely, Exit uses the drain
629/// variant, and the forwarded packet branch never fires on a terminal/source node).
630#[derive(Debug, Default, Copy, Clone)]
631#[doc(hidden)]
632pub struct NoopTicketProcessor;
633
634impl UnacknowledgedTicketProcessor for NoopTicketProcessor {
635    type Error = std::convert::Infallible;
636
637    #[inline]
638    fn insert_unacknowledged_ticket(
639        &self,
640        _: &OffchainPublicKey,
641        _: HalfKeyChallenge,
642        _: UnacknowledgedTicket,
643    ) -> Result<(), Self::Error> {
644        Ok(())
645    }
646
647    #[inline]
648    fn acknowledge_tickets(
649        &self,
650        _: OffchainPublicKey,
651        _: Vec<Acknowledgement>,
652    ) -> Result<Vec<ResolvedAcknowledgement>, TicketAcknowledgementError<Self::Error>> {
653        Ok(Vec::with_capacity(0))
654    }
655}
656/// Shared implementation of the packet pipeline used by [`PacketPipelineBuilder`]'s
657/// terminal `build_for_*` methods.
658#[allow(clippy::too_many_arguments)]
659#[tracing::instrument(skip_all, level = "trace", fields(me = packet_key.public().to_peerid_str()))]
660pub(super) fn run_packet_pipeline_inner<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
661    node_type: NodeType,
662    packet_key: OffchainKeypair,
663    wire_msg: (WOut, WIn),
664    codec: (C, D),
665    ticket_proc: T,
666    ticket_events: TEvt,
667    cfg: PacketPipelineConfig,
668    api: (AppOut, AppIn),
669    counters: PeerProtocolCounterRegistry,
670) -> AbortableList<PacketPipelineProcesses>
671where
672    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
673    WOut::Error: std::error::Error,
674    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
675    C: PacketEncoder + Sync + Send + 'static,
676    D: PacketDecoder + Sync + Send + 'static,
677    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
678    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
679    TEvt::Error: std::error::Error,
680    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
681    AppOut::Error: std::error::Error,
682    AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
683{
684    let mut processes = AbortableList::default();
685
686    #[cfg(all(feature = "telemetry", not(test)))]
687    {
688        // Initialize the lazy statics here
689        lazy_static::initialize(&METRIC_PACKET_COUNT);
690        lazy_static::initialize(&METRIC_PACKET_DECODE_TIMEOUTS);
691        lazy_static::initialize(&METRIC_PACKET_REJECTED_COUNT);
692        lazy_static::initialize(&METRIC_VALIDATION_ERRORS);
693    }
694
695    let (outgoing_ack_tx, outgoing_ack_rx) =
696        futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(cfg.ack_config.ack_out_buffer_size);
697
698    let (incoming_ack_tx, incoming_ack_rx) = futures::channel::mpsc::channel::<(OffchainPublicKey, Vec<Acknowledgement>)>(
699        cfg.ack_config.ticket_ack_buffer_size,
700    );
701
702    // Attach timeouts to all Sinks so that the pipelines are not blocked when
703    // some channel is not being timely processed
704    let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
705    let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
706    let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
707    let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
708    let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);
709
710    let encoder = std::sync::Arc::new(codec.0);
711    let decoder = std::sync::Arc::new(codec.1);
712    let ticket_proc = std::sync::Arc::new(ticket_proc);
713
714    // The default maximum concurrency (if not set or zero) is 8 times the number of available cores.
715    // Zero is normalized to the default to prevent deadlock (0 concurrent tasks = no work).
716    let avail_concurrency = std::thread::available_parallelism()
717        .ok()
718        .map(|n| n.get())
719        .unwrap_or(1)
720        .max(1)
721        * 8;
722
723    let output_concurrency = cfg.output_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);
724    let input_concurrency = cfg.input_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);
725
726    processes.insert(
727        PacketPipelineProcesses::MsgOut,
728        hopr_utils::spawn_as_abortable!(
729            start_outgoing_packet_pipeline(
730                app_in,
731                encoder.clone(),
732                wire_out.clone(),
733                counters.clone(),
734                output_concurrency
735            )
736            .in_current_span()
737        ),
738    );
739
740    processes.insert(
741        PacketPipelineProcesses::MsgIn,
742        hopr_utils::spawn_as_abortable!(
743            start_incoming_packet_pipeline(
744                (wire_out.clone(), wire_in),
745                decoder,
746                ticket_proc.clone(),
747                ticket_events.clone(),
748                (outgoing_ack_tx, incoming_ack_tx),
749                app_out,
750                counters.clone(),
751                input_concurrency,
752            )
753            .in_current_span()
754        ),
755    );
756
757    processes.insert(
758        PacketPipelineProcesses::AckOut,
759        hopr_utils::spawn_as_abortable!(
760            start_outgoing_ack_pipeline(outgoing_ack_rx, encoder, cfg.ack_config, packet_key.clone(), wire_out,)
761                .in_current_span()
762        ),
763    );
764
765    let ack_input_concurrency = cfg
766        .ack_config
767        .ack_input_concurrency
768        .filter(|&n| n > 0)
769        .unwrap_or(DEFAULT_ACK_INPUT_CONCURRENCY);
770
771    match node_type {
772        NodeType::Relay => {
773            processes.insert(
774                PacketPipelineProcesses::AckIn,
775                hopr_utils::spawn_as_abortable!(
776                    start_relay_incoming_ack_pipeline(
777                        incoming_ack_rx,
778                        ticket_events,
779                        ticket_proc,
780                        ack_input_concurrency
781                    )
782                    .in_current_span()
783                ),
784            );
785        }
786        NodeType::Exit => {
787            // Exit nodes still run the incoming acknowledgement pipeline (for future use),
788            // but only drain the stream — incoming acknowledgements are NOT forwarded to the
789            // UnacknowledgedTicketProcessor because Exit nodes do not process tickets.
790            let _ = (ticket_events, ticket_proc, ack_input_concurrency);
791            processes.insert(
792                PacketPipelineProcesses::AckIn,
793                hopr_utils::spawn_as_abortable!(start_exit_incoming_ack_pipeline(incoming_ack_rx).in_current_span()),
794            );
795        }
796        NodeType::Entry => {
797            // Entry nodes do not process tickets at all and do not even start the incoming
798            // acknowledgement pipeline. The receiver is dropped here, which propagates back
799            // pressure to senders of `incoming_ack_tx`.
800            let _ = (ticket_events, ticket_proc, ack_input_concurrency, incoming_ack_rx);
801        }
802    }
803
804    processes
805}