Skip to main content

hopr_transport_protocol/
pipeline.rs

1use futures::{SinkExt, StreamExt};
2use futures_time::{future::FutureExt as TimeExt, stream::StreamExt as TimeStreamExt};
3use hopr_async_runtime::{AbortableList, spawn_as_abortable};
4use hopr_crypto_types::prelude::*;
5use hopr_internal_types::prelude::*;
6use hopr_network_types::{
7    prelude::*,
8    timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
9};
10use hopr_primitive_types::prelude::Address;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
14use tracing::Instrument;
15use validator::{Validate, ValidationError, ValidationErrors};
16
17use crate::PeerId;
18
19const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
20const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
21const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
22const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
23const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
24const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
25
26#[cfg(all(feature = "prometheus", not(test)))]
27lazy_static::lazy_static! {
28    static ref METRIC_PACKET_COUNT:  hopr_metrics::MultiCounter =  hopr_metrics::MultiCounter::new(
29        "hopr_packets_count",
30        "Number of processed packets of different types (sent, received, forwarded)",
31        &["type"]
32    ).unwrap();
33    // Tracks how often the Rayon-backed packet decode path exceeds PACKET_DECODING_TIMEOUT.
34    // A sustained non-zero rate here indicates the Rayon pool is saturated—correlate with
35    // hopr_rayon_tasks_cancelled_total and hopr_rayon_queue_wait_seconds to diagnose whether
36    // the bottleneck is queue depth, individual task duration, or both.
37    static ref METRIC_PACKET_DECODE_TIMEOUTS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
38        "hopr_packet_decode_timeouts_total",
39        "Number of incoming packets dropped due to decode timeout"
40    ).unwrap();
41    static ref METRIC_VALIDATION_ERRORS: hopr_metrics::MultiCounter =  hopr_metrics::MultiCounter::new(
42        "hopr_packet_ticket_validation_errors",
43        "Number of different ticket validation errors encountered during packet processing",
44        &["type"]
45    ).unwrap();
46}
47
48#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
49pub enum PacketPipelineProcesses {
50    #[strum(to_string = "HOPR [msg] - ingress")]
51    MsgIn,
52    #[strum(to_string = "HOPR [msg] - egress")]
53    MsgOut,
54    #[strum(to_string = "HOPR [ack] - egress")]
55    AckOut,
56    #[strum(to_string = "HOPR [ack] - ingress")]
57    AckIn,
58    #[strum(to_string = "HOPR [msg] - mixer")]
59    Mixer,
60}
61
62/// Ticket events emitted from the packet processing pipeline.
63#[derive(Debug, Clone, strum::EnumIs, strum::EnumTryAs)]
64pub enum TicketEvent {
65    /// A winning ticket was received.
66    WinningTicket(Box<RedeemableTicket>),
67    /// A ticket has been rejected.
68    RejectedTicket(Box<Ticket>, Option<Address>),
69}
70
71/// Performs encoding of outgoing Application protocol packets into HOPR protocol outgoing packets.
72async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
73    app_outgoing: AppOut,
74    encoder: std::sync::Arc<E>,
75    wire_outgoing: WOut,
76    concurrency: usize,
77) where
78    AppOut: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
79    E: PacketEncoder + Send + 'static,
80    WOut: futures::Sink<(PeerId, Box<[u8]>), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
81    WOutErr: std::error::Error,
82{
83    let res = app_outgoing
84        .then_concurrent(
85            |(routing, data)| {
86                let encoder = encoder.clone();
87                async move {
88                    match encoder
89                        .encode_packet(
90                            data.data.to_bytes(),
91                            routing,
92                            data.packet_info
93                                .map(|data| data.signals_to_destination)
94                                .unwrap_or_default(),
95                        )
96                        .await
97                    {
98                        Ok(packet) => {
99                            #[cfg(all(feature = "prometheus", not(test)))]
100                            METRIC_PACKET_COUNT.increment(&["sent"]);
101
102                            tracing::trace!(peer = packet.next_hop.to_peerid_str(), "protocol message out");
103                            Some((packet.next_hop.into(), packet.data))
104                        }
105                        Err(error) => {
106                            tracing::error!(%error, "packet could not be wrapped for sending");
107                            None
108                        }
109                    }
110                }
111            },
112            concurrency,
113        )
114        .filter_map(futures::future::ready)
115        .map(Ok)
116        .forward_to_timeout(wire_outgoing)
117        .in_current_span()
118        .await;
119
120    if let Err(error) = res {
121        tracing::error!(
122            task = "transport (protocol - msg egress)",
123            %error,
124            "long-running background task finished with error"
125        );
126    } else {
127        tracing::warn!(
128            task = "transport (protocol - msg egress)",
129            "long-running background task finished"
130        )
131    }
132}
133
134/// Performs HOPR protocol decoding of incoming packets into Application protocol packets.
135///
136/// `wire_incoming` --> `decoder` --> `ack_outgoing` (final + forwarded)
137///                             | --> `wire_outgoing` (forwarded)
138///                             | --> `ack_incoming` (forwarded)
139///                             | --> `app_incoming` (final)
140async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
141    (wire_outgoing, wire_incoming): (WOut, WIn),
142    decoder: std::sync::Arc<D>,
143    ticket_proc: std::sync::Arc<T>,
144    ticket_events: TEvt,
145    (ack_outgoing, ack_incoming): (AckOut, AckIn),
146    app_incoming: AppIn,
147    concurrency: usize,
148) where
149    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
150    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
151    WOut::Error: std::error::Error,
152    D: PacketDecoder + Send + 'static,
153    T: UnacknowledgedTicketProcessor + Send + 'static,
154    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
155    TEvt::Error: std::error::Error,
156    AckIn: futures::Sink<(OffchainPublicKey, Vec<Acknowledgement>)> + Send + Unpin + Clone + 'static,
157    AckIn::Error: std::error::Error,
158    AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
159    AckOut::Error: std::error::Error,
160    AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
161    AppInErr: std::error::Error,
162{
163    let ack_outgoing_success = ack_outgoing.clone();
164    let ack_outgoing_failure = ack_outgoing;
165    let ticket_proc_success = ticket_proc;
166
167    let res = wire_incoming
168        .then_concurrent(move |(peer, data)| {
169            let decoder = decoder.clone();
170            let mut ack_outgoing_failure = ack_outgoing_failure.clone();
171            let mut ticket_events_reject = ticket_events.clone();
172
173            tracing::trace!(%peer, "protocol message in");
174
175            async move {
176                match decoder.decode(peer, data)
177                    .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
178                    .await {
179                    Ok(Ok(packet)) => {
180                        tracing::trace!(%peer, ?packet, "successfully decoded incoming packet");
181                        Some(packet)
182                    },
183                    Ok(Err(IncomingPacketError::Overloaded(error))) => {
184                        tracing::warn!(%peer, %error, "dropping packet due to local CPU overload");
185                        None
186                    },
187                    Ok(Err(IncomingPacketError::Undecodable(error))) => {
188                        // Do not send an ack back if the packet could not be decoded at all
189                        //
190                        // Potentially adversarial behavior
191                        tracing::trace!(%peer, %error, "not sending ack back on undecodable packet - possible adversarial behavior");
192                        None
193                    },
194                    Ok(Err(IncomingPacketError::ProcessingError(sender, error))) => {
195                        tracing::error!(%peer, %error, "failed to process the decoded packet");
196                        // On this failure, we send back a random acknowledgement
197                        ack_outgoing_failure
198                            .send((sender, None))
199                            .await
200                            .unwrap_or_else(|error| {
201                                tracing::error!(%error, "failed to send ack to the egress queue");
202                            });
203                        None
204                    },
205                    Ok(Err(IncomingPacketError::InvalidTicket(sender, error))) => {
206                        tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
207                        if let Err(error) = ticket_events_reject
208                            .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
209                            .await {
210                            tracing::error!(%error, "failed to notify invalid ticket rejection");
211                        }
212                        // On this failure, we send back a random acknowledgement
213                        ack_outgoing_failure
214                            .send((sender, None))
215                            .await
216                            .unwrap_or_else(|error| {
217                                tracing::error!(%error, "failed to send ack to the egress queue");
218                            });
219
220                        #[cfg(all(feature = "prometheus", not(test)))]
221                        METRIC_VALIDATION_ERRORS.increment(&[error.kind.as_ref()]);
222
223                        None
224                    }
225                    Err(_) => {
226                        // If we cannot decode the packet within the time limit, just drop it
227                        tracing::error!(
228                            %peer,
229                            timeout_ms = PACKET_DECODING_TIMEOUT.as_millis() as u64,
230                            "dropped incoming packet: decode timeout - check the 'hopr_rayon_queue_wait_seconds' metric for pool saturation"
231                        );
232                        #[cfg(all(feature = "prometheus", not(test)))]
233                        METRIC_PACKET_DECODE_TIMEOUTS.increment();
234
235                        None
236                    }
237                }
238            }.instrument(tracing::debug_span!("incoming_packet_decode", %peer))
239        }, concurrency)
240        .filter_map(futures::future::ready)
241        .then_concurrent(move |packet| {
242            let ticket_proc = ticket_proc_success.clone();
243            let mut wire_outgoing = wire_outgoing.clone();
244            let mut ack_incoming = ack_incoming.clone();
245            let mut ack_outgoing_success = ack_outgoing_success.clone();
246            async move {
247                match packet {
248                    IncomingPacket::Acknowledgement(ack) => {
249                        let IncomingAcknowledgementPacket { previous_hop, received_acks, .. } = *ack;
250                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), num_acks = received_acks.len(), "incoming acknowledgements");
251                        ack_incoming
252                            .send((previous_hop, received_acks))
253                            .await
254                            .unwrap_or_else(|error| {
255                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
256                            });
257
258                        // We do not acknowledge back acknowledgements.
259                        None
260                    },
261                    IncomingPacket::Final(final_packet) => {
262                        let IncomingFinalPacket {
263                            previous_hop,
264                            sender,
265                            plain_text,
266                            ack_key,
267                            info,
268                            ..
269                        } = *final_packet;
270                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "incoming final packet");
271
272                        // Send acknowledgement back
273                        ack_outgoing_success
274                            .send((previous_hop, Some(ack_key)))
275                            .await
276                            .unwrap_or_else(|error| {
277                                tracing::error!(%error, "failed to send ack to the egress queue");
278                            });
279
280                        #[cfg(all(feature = "prometheus", not(test)))]
281                        METRIC_PACKET_COUNT.increment(&["received"]);
282
283                        Some((sender, plain_text, info))
284                    }
285                    IncomingPacket::Forwarded(fwd_packet) => {
286                        let IncomingForwardedPacket {
287                            previous_hop,
288                            next_hop,
289                            data,
290                            ack_key_prev_hop,
291                            ack_challenge,
292                            received_ticket,
293                            ..
294                        } = *fwd_packet;
295                        if let Err(error) = ticket_proc.insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket).await {
296                            tracing::error!(
297                                previous_hop = previous_hop.to_peerid_str(),
298                                next_hop = next_hop.to_peerid_str(),
299                                %error,
300                                "failed to insert unacknowledged ticket into the ticket processor"
301                            );
302                            return None;
303                        }
304
305                        // First, relay the packet to the next hop
306                        tracing::trace!(
307                            previous_hop = previous_hop.to_peerid_str(),
308                            next_hop = next_hop.to_peerid_str(),
309                            "forwarding packet to the next hop"
310                        );
311
312                        wire_outgoing
313                            .send((next_hop.into(), data))
314                            .await
315                            .unwrap_or_else(|error| {
316                                tracing::error!(%error, "failed to forward a packet to the transport layer");
317                            });
318
319                        #[cfg(all(feature = "prometheus", not(test)))]
320                        METRIC_PACKET_COUNT.increment(&["forwarded"]);
321
322                        // Send acknowledgement back
323                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "acknowledging forwarded packet back");
324                        ack_outgoing_success
325                            .send((previous_hop, Some(ack_key_prev_hop)))
326                            .await
327                            .unwrap_or_else(|error| {
328                                tracing::error!(%error, "failed to send ack to the egress queue");
329                            });
330
331                        None
332                    }
333                }
334            }}, concurrency)
335        .filter_map(|maybe_data| futures::future::ready(
336            // Create the ApplicationDataIn data structure for incoming data
337            maybe_data
338                .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
339                    .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
340                    .ok()
341                    .map(|data| (sender, ApplicationDataIn {
342                        data,
343                        packet_info: IncomingPacketInfo {
344                            signals_from_sender: aux_info.packet_signals,
345                            num_saved_surbs: aux_info.num_surbs,
346                        }
347                    })))
348        ))
349        .map(Ok)
350        .forward_to_timeout(app_incoming)
351        .in_current_span()
352        .await;
353
354    if let Err(error) = res {
355        tracing::error!(
356            task = "transport (protocol - msg ingress)",
357            %error,
358            "long-running background task finished with error"
359        );
360    } else {
361        tracing::warn!(
362            task = "transport (protocol - msg ingress)",
363            "long-running background task finished"
364        )
365    }
366}
367
368async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
369    ack_outgoing: AckOut,
370    encoder: std::sync::Arc<E>,
371    cfg: AcknowledgementPipelineConfig,
372    packet_key: OffchainKeypair,
373    wire_outgoing: WOut,
374) where
375    AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
376    E: PacketEncoder + Send + 'static,
377    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
378    WOut::Error: std::error::Error,
379{
380    ack_outgoing
381        .map(move |(destination, maybe_ack_key)| {
382            let packet_key = packet_key.clone();
383            // Sign acknowledgement with the given half-key or generate a signed random one
384            let ack = maybe_ack_key
385                .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
386                .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key));
387            (destination, ack)
388        })
389        .buffer(futures_time::time::Duration::from(cfg.ack_buffer_interval))
390        .filter(|acks| futures::future::ready(!acks.is_empty()))
391        .flat_map(|buffered_acks| {
392            // Split the acknowledgements into groups based on the sender
393            // The halfbrown hash map will use Vec for a lower number of distinct senders, and possibly transition to
394            // the hashbrown hash map when the number of distinct senders exceeds 32.
395            let mut groups = halfbrown::HashMap::<OffchainPublicKey, Vec<VerifiedAcknowledgement>, ahash::RandomState>::with_capacity_and_hasher(
396                cfg.ack_grouping_capacity,
397                ahash::RandomState::default()
398            );
399            for (dst, ack) in buffered_acks {
400                groups
401                    .entry(dst)
402                    .and_modify(|v| v.push(ack))
403                    .or_insert_with(|| vec![ack]);
404            }
405            tracing::trace!(
406                num_groups = groups.len(),
407                num_acks = groups.values().map(|v| v.len()).sum::<usize>(),
408                "acknowledgements grouped"
409            );
410            futures::stream::iter(groups)
411        })
412        .for_each_concurrent(
413            NUM_CONCURRENT_ACK_OUT_PROCESSING,
414            move |(destination, acks)| {
415                let encoder = encoder.clone();
416                let mut wire_outgoing = wire_outgoing.clone();
417                async move {
418                    // Make sure that the acknowledgements are sent in batches of at most MAX_ACKNOWLEDGEMENTS_BATCH_SIZE
419                    for ack_chunk in acks.chunks(MAX_ACKNOWLEDGEMENTS_BATCH_SIZE) {
420                        match encoder.encode_acknowledgements(ack_chunk, &destination).await {
421                            Ok(ack_packet) => {
422                                wire_outgoing
423                                    .feed((ack_packet.next_hop.into(), ack_packet.data))
424                                    .await
425                                    .unwrap_or_else(|error| {
426                                        tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
427                                    });
428                            }
429                            Err(error) => tracing::error!(%error, "failed to create ack packet"),
430                        }
431                    }
432                    if let Err(error) = wire_outgoing.flush().await {
433                        tracing::error!(%error, "failed to flush acknowledgements batch to the transport layer");
434                    }
435                    tracing::trace!("acknowledgements out");
436                }.instrument(tracing::debug_span!("outgoing_ack_batch", peer = destination.to_peerid_str()))
437            }
438        )
439        .in_current_span()
440        .await;
441
442    tracing::warn!(
443        task = "transport (protocol - ack outgoing)",
444        "long-running background task finished"
445    );
446}
447
448async fn start_incoming_ack_pipeline<AckIn, T, TEvt>(
449    ack_incoming: AckIn,
450    ticket_events: TEvt,
451    ticket_proc: std::sync::Arc<T>,
452) where
453    AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
454    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
455    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
456    TEvt::Error: std::error::Error,
457{
458    ack_incoming
459        .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(peer, acks)| {
460            let ticket_proc = ticket_proc.clone();
461            let mut ticket_evt = ticket_events.clone();
462            async move {
463                tracing::trace!(num = acks.len(), "received acknowledgements");
464                match ticket_proc.acknowledge_tickets(peer, acks).await {
465                    Ok(resolutions) if !resolutions.is_empty() => {
466                        let resolutions_iter = resolutions.into_iter().filter_map(|resolution| match resolution {
467                            ResolvedAcknowledgement::RelayingWin(redeemable_ticket) => {
468                                tracing::trace!("received ack for a winning ticket");
469                                Some(Ok(TicketEvent::WinningTicket(redeemable_ticket)))
470                            }
471                            ResolvedAcknowledgement::RelayingLoss(_) => {
472                                // Losing tickets are not getting accounted for anywhere.
473                                tracing::trace!("received ack for a losing ticket");
474                                None
475                            }
476                        });
477
478                        // All acknowledgements that resulted in winning tickets go upstream
479                        if let Err(error) = ticket_evt.send_all(&mut futures::stream::iter(resolutions_iter)).await {
480                            tracing::error!(%error, "failed to notify ticket resolutions");
481                        }
482                    }
483                    Ok(_) => {
484                        tracing::debug!("acknowledgement batch could not acknowledge any ticket");
485                    }
486                    Err(TicketAcknowledgementError::UnexpectedAcknowledgement) => {
487                        // Unexpected acknowledgements naturally happen
488                        // as acknowledgements of 0-hop packets
489                        tracing::trace!("received unexpected acknowledgement");
490                    }
491                    Err(error) => {
492                        tracing::error!(%error, "failed to acknowledge ticket");
493                    }
494                }
495            }
496            .instrument(tracing::debug_span!("incoming_ack_batch", peer = peer.to_peerid_str()))
497        })
498        .in_current_span()
499        .await;
500
501    tracing::warn!(
502        task = "transport (protocol - ticket acknowledgement)",
503        "long-running background task finished"
504    );
505}
506
507fn default_ack_buffer_interval() -> std::time::Duration {
508    std::time::Duration::from_millis(200)
509}
510
511fn default_ack_grouping_capacity() -> usize {
512    5
513}
514
515/// Configuration for the acknowledgement processing pipeline.
516#[derive(Debug, Copy, Clone, smart_default::SmartDefault, Eq, PartialEq)]
517#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
518pub struct AcknowledgementPipelineConfig {
519    /// Interval for which to wait to buffer acknowledgements before sending them out.
520    ///
521    /// Default is 200 ms.
522    #[default(default_ack_buffer_interval())]
523    #[cfg_attr(
524        feature = "serde",
525        serde(default = "default_ack_buffer_interval", with = "humantime_serde")
526    )]
527    pub ack_buffer_interval: std::time::Duration,
528    /// Initial capacity when grouping outgoing acknowledgements.
529    ///
530    /// If set too low, it causes additional reallocations in the outgoing acknowledgement processing pipeline.
531    /// The value should grow if `ack_buffer_interval` grows.
532    ///
533    /// Default is 5.
534    #[default(default_ack_grouping_capacity())]
535    #[cfg_attr(feature = "serde", serde(default = "default_ack_grouping_capacity"))]
536    pub ack_grouping_capacity: usize,
537}
538
539// Requires manual implementation due to https://github.com/Keats/validator/issues/285
540impl Validate for AcknowledgementPipelineConfig {
541    fn validate(&self) -> Result<(), ValidationErrors> {
542        let mut errors = ValidationErrors::new();
543        if self.ack_grouping_capacity == 0 {
544            errors.add("ack_grouping_capacity", ValidationError::new("must be greater than 0"));
545        }
546        if self.ack_buffer_interval < std::time::Duration::from_millis(10) {
547            errors.add("ack_buffer_interval", ValidationError::new("must be at least 10 ms"));
548        }
549        if errors.is_empty() { Ok(()) } else { Err(errors) }
550    }
551}
552
553/// Overall configuration of the input/output packet processing pipeline.
554#[derive(Clone, Copy, Debug, Default, PartialEq, Validate)]
555#[cfg_attr(
556    feature = "serde",
557    derive(serde::Serialize, serde::Deserialize),
558    serde(deny_unknown_fields)
559)]
560pub struct PacketPipelineConfig {
561    /// Maximum concurrency when processing outgoing packets.
562    ///
563    /// `None` or `Some(0)` both fall back to the default (available parallelism * 8).
564    pub output_concurrency: Option<usize>,
565    /// Maximum concurrency when processing incoming packets.
566    ///
567    /// `None` or `Some(0)` both fall back to the default (available parallelism * 8).
568    pub input_concurrency: Option<usize>,
569    /// Configuration of the packet acknowledgement processing
570    #[validate(nested)]
571    pub ack_config: AcknowledgementPipelineConfig,
572}
573
574/// Run all processes responsible for handling the msg and acknowledgment protocols.
575///
576/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
577/// overlay on top of the `wire_msg` Stream or Sink.
578#[tracing::instrument(skip_all, level = "trace", fields(me = packet_key.public().to_peerid_str()))]
579pub fn run_packet_pipeline<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
580    packet_key: OffchainKeypair,
581    wire_msg: (WOut, WIn),
582    codec: (C, D),
583    ticket_proc: T,
584    ticket_events: TEvt,
585    cfg: PacketPipelineConfig,
586    api: (AppOut, AppIn),
587) -> AbortableList<PacketPipelineProcesses>
588where
589    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
590    WOut::Error: std::error::Error,
591    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
592    C: PacketEncoder + Sync + Send + 'static,
593    D: PacketDecoder + Sync + Send + 'static,
594    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
595    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
596    TEvt::Error: std::error::Error,
597    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
598    AppOut::Error: std::error::Error,
599    AppIn: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
600{
601    let mut processes = AbortableList::default();
602
603    #[cfg(all(feature = "prometheus", not(test)))]
604    {
605        // Initialize the lazy statics here
606        lazy_static::initialize(&METRIC_PACKET_COUNT);
607        lazy_static::initialize(&METRIC_PACKET_DECODE_TIMEOUTS);
608        lazy_static::initialize(&METRIC_VALIDATION_ERRORS);
609    }
610
611    let (outgoing_ack_tx, outgoing_ack_rx) =
612        futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(ACK_OUT_BUFFER_SIZE);
613
614    let (incoming_ack_tx, incoming_ack_rx) =
615        futures::channel::mpsc::channel::<(OffchainPublicKey, Vec<Acknowledgement>)>(TICKET_ACK_BUFFER_SIZE);
616
617    // Attach timeouts to all Sinks so that the pipelines are not blocked when
618    // some channel is not being timely processed
619    let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
620    let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
621    let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
622    let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
623    let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);
624
625    let encoder = std::sync::Arc::new(codec.0);
626    let decoder = std::sync::Arc::new(codec.1);
627    let ticket_proc = std::sync::Arc::new(ticket_proc);
628
629    // Default maximum concurrency (if not set or zero) is 8 times the number of available cores.
630    // Zero is normalized to the default to prevent deadlock (0 concurrent tasks = no work).
631    let avail_concurrency = std::thread::available_parallelism()
632        .ok()
633        .map(|n| n.get())
634        .unwrap_or(1)
635        .max(1)
636        * 8;
637
638    let output_concurrency = cfg.output_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);
639    let input_concurrency = cfg.input_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);
640
641    processes.insert(
642        PacketPipelineProcesses::MsgOut,
643        spawn_as_abortable!(
644            start_outgoing_packet_pipeline(app_in, encoder.clone(), wire_out.clone(), output_concurrency)
645                .in_current_span()
646        ),
647    );
648
649    processes.insert(
650        PacketPipelineProcesses::MsgIn,
651        spawn_as_abortable!(
652            start_incoming_packet_pipeline(
653                (wire_out.clone(), wire_in),
654                decoder,
655                ticket_proc.clone(),
656                ticket_events.clone(),
657                (outgoing_ack_tx, incoming_ack_tx),
658                app_out,
659                input_concurrency,
660            )
661            .in_current_span()
662        ),
663    );
664
665    processes.insert(
666        PacketPipelineProcesses::AckOut,
667        spawn_as_abortable!(
668            start_outgoing_ack_pipeline(outgoing_ack_rx, encoder, cfg.ack_config, packet_key.clone(), wire_out,)
669                .in_current_span()
670        ),
671    );
672
673    processes.insert(
674        PacketPipelineProcesses::AckIn,
675        spawn_as_abortable!(start_incoming_ack_pipeline(incoming_ack_rx, ticket_events, ticket_proc).in_current_span()),
676    );
677
678    processes
679}