hopr_transport_protocol/
pipeline.rs

1use futures::{SinkExt, StreamExt};
2use futures_time::{future::FutureExt as TimeExt, stream::StreamExt as TimeStreamExt};
3use hopr_async_runtime::{AbortableList, spawn_as_abortable};
4use hopr_crypto_types::prelude::*;
5use hopr_internal_types::prelude::*;
6use hopr_network_types::{
7    prelude::*,
8    timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
9};
10use hopr_primitive_types::prelude::Address;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
14use tracing::Instrument;
15use validator::{Validate, ValidationError, ValidationErrors};
16
17use crate::PeerId;
18
19const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
20const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
21const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
22const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
23const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
24const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
25
26#[cfg(all(feature = "prometheus", not(test)))]
27lazy_static::lazy_static! {
28    static ref METRIC_PACKET_COUNT:  hopr_metrics::MultiCounter =  hopr_metrics::MultiCounter::new(
29        "hopr_packets_count",
30        "Number of processed packets of different types (sent, received, forwarded)",
31        &["type"]
32    ).unwrap();
33}
34
35#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
36pub enum PacketPipelineProcesses {
37    #[strum(to_string = "HOPR [msg] - ingress")]
38    MsgIn,
39    #[strum(to_string = "HOPR [msg] - egress")]
40    MsgOut,
41    #[strum(to_string = "HOPR [ack] - egress")]
42    AckOut,
43    #[strum(to_string = "HOPR [ack] - ingress")]
44    AckIn,
45    #[strum(to_string = "HOPR [msg] - mixer")]
46    Mixer,
47}
48
49/// Ticket events emitted from the packet processing pipeline.
50#[derive(Debug, Clone, strum::EnumIs, strum::EnumTryAs)]
51pub enum TicketEvent {
52    /// A winning ticket was received.
53    WinningTicket(Box<RedeemableTicket>),
54    /// A ticket has been rejected.
55    RejectedTicket(Box<Ticket>, Option<Address>),
56}
57
58/// Performs encoding of outgoing Application protocol packets into HOPR protocol outgoing packets.
59async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
60    app_outgoing: AppOut,
61    encoder: std::sync::Arc<E>,
62    wire_outgoing: WOut,
63) where
64    AppOut: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
65    E: PacketEncoder + Send + 'static,
66    WOut: futures::Sink<(PeerId, Box<[u8]>), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
67    WOutErr: std::error::Error,
68{
69    let res = app_outgoing
70        .then_concurrent(|(routing, data)| {
71            let encoder = encoder.clone();
72            async move {
73                match encoder
74                    .encode_packet(
75                        data.data.to_bytes(),
76                        routing,
77                        data.packet_info
78                            .map(|data| data.signals_to_destination)
79                            .unwrap_or_default(),
80                    )
81                    .await
82                {
83                    Ok(packet) => {
84                        #[cfg(all(feature = "prometheus", not(test)))]
85                        METRIC_PACKET_COUNT.increment(&["sent"]);
86
87                        tracing::trace!(peer = packet.next_hop.to_peerid_str(), "protocol message out");
88                        Some((packet.next_hop.into(), packet.data))
89                    }
90                    Err(error) => {
91                        tracing::error!(%error, "packet could not be wrapped for sending");
92                        None
93                    }
94                }
95            }
96        })
97        .filter_map(futures::future::ready)
98        .map(Ok)
99        .forward_to_timeout(wire_outgoing)
100        .in_current_span()
101        .await;
102
103    if let Err(error) = res {
104        tracing::error!(
105            task = "transport (protocol - msg egress)",
106            %error,
107            "long-running background task finished with error"
108        );
109    } else {
110        tracing::warn!(
111            task = "transport (protocol - msg egress)",
112            "long-running background task finished"
113        )
114    }
115}
116
117/// Performs HOPR protocol decoding of incoming packets into Application protocol packets.
118///
119/// `wire_incoming` --> `decoder` --> `ack_outgoing` (final + forwarded)
120///                             | --> `wire_outgoing` (forwarded)
121///                             | --> `ack_incoming` (forwarded)
122///                             | --> `app_incoming` (final)
123async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
124    (wire_outgoing, wire_incoming): (WOut, WIn),
125    decoder: std::sync::Arc<D>,
126    ticket_proc: std::sync::Arc<T>,
127    ticket_events: TEvt,
128    (ack_outgoing, ack_incoming): (AckOut, AckIn),
129    app_incoming: AppIn,
130) where
131    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
132    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
133    WOut::Error: std::error::Error,
134    D: PacketDecoder + Send + 'static,
135    T: UnacknowledgedTicketProcessor + Send + 'static,
136    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
137    TEvt::Error: std::error::Error,
138    AckIn: futures::Sink<(OffchainPublicKey, Vec<Acknowledgement>)> + Send + Unpin + Clone + 'static,
139    AckIn::Error: std::error::Error,
140    AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
141    AckOut::Error: std::error::Error,
142    AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
143    AppInErr: std::error::Error,
144{
145    let ack_outgoing_success = ack_outgoing.clone();
146    let ack_outgoing_failure = ack_outgoing;
147    let ticket_proc_success = ticket_proc;
148
149    let res = wire_incoming
150        .then_concurrent(move |(peer, data)| {
151            let decoder = decoder.clone();
152            let mut ack_outgoing_failure = ack_outgoing_failure.clone();
153            let mut ticket_events_reject = ticket_events.clone();
154
155            tracing::trace!("protocol message in");
156
157            async move {
158                match decoder.decode(peer, data)
159                    .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
160                    .await {
161                    Ok(Ok(packet)) => {
162                        tracing::trace!(?packet, "successfully decoded incoming packet");
163                        Some(packet)
164                    },
165                    Ok(Err(IncomingPacketError::Undecodable(error))) => {
166                        // Do not send an ack back if the packet could not be decoded at all
167                        //
168                        // Potentially adversarial behavior
169                        tracing::trace!(%error, "not sending ack back on undecodable packet - possible adversarial behavior");
170                        None
171                    },
172                    Ok(Err(IncomingPacketError::ProcessingError(sender, error))) => {
173                        tracing::error!(%error, "failed to process the decoded packet");
174                        // On this failure, we send back a random acknowledgement
175                        ack_outgoing_failure
176                            .send((sender, None))
177                            .await
178                            .unwrap_or_else(|error| {
179                                tracing::error!(%error, "failed to send ack to the egress queue");
180                            });
181                        None
182                    },
183                    Ok(Err(IncomingPacketError::InvalidTicket(sender, error))) => {
184                        tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
185                        if let Err(error) = ticket_events_reject
186                            .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
187                            .await {
188                            tracing::error!(%error, "failed to notify invalid ticket rejection");
189                        }
190                        // On this failure, we send back a random acknowledgement
191                        ack_outgoing_failure
192                            .send((sender, None))
193                            .await
194                            .unwrap_or_else(|error| {
195                                tracing::error!(%error, "failed to send ack to the egress queue");
196                            });
197                        None
198                    }
199                    Err(_) => {
200                        // If we cannot decode the packet within the time limit, just drop it
201                        tracing::error!("dropped incoming packet: failed to decode packet within {:?}", PACKET_DECODING_TIMEOUT);
202                        None
203                    }
204                }
205            }.instrument(tracing::debug_span!("incoming_packet_decode", %peer))
206        })
207        .filter_map(futures::future::ready)
208        .then_concurrent(move |packet| {
209            let ticket_proc = ticket_proc_success.clone();
210            let mut wire_outgoing = wire_outgoing.clone();
211            let mut ack_incoming = ack_incoming.clone();
212            let mut ack_outgoing_success = ack_outgoing_success.clone();
213            async move {
214                match packet {
215                    IncomingPacket::Acknowledgement(ack) => {
216                        let IncomingAcknowledgementPacket { previous_hop, received_acks, .. } = *ack;
217                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), num_acks = received_acks.len(), "incoming acknowledgements");
218                        ack_incoming
219                            .send((previous_hop, received_acks))
220                            .await
221                            .unwrap_or_else(|error| {
222                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
223                            });
224
225                        // We do not acknowledge back acknowledgements.
226                        None
227                    },
228                    IncomingPacket::Final(final_packet) => {
229                        let IncomingFinalPacket {
230                            previous_hop,
231                            sender,
232                            plain_text,
233                            ack_key,
234                            info,
235                            ..
236                        } = *final_packet;
237                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "incoming final packet");
238
239                        // Send acknowledgement back
240                        ack_outgoing_success
241                            .send((previous_hop, Some(ack_key)))
242                            .await
243                            .unwrap_or_else(|error| {
244                                tracing::error!(%error, "failed to send ack to the egress queue");
245                            });
246
247                        #[cfg(all(feature = "prometheus", not(test)))]
248                        METRIC_PACKET_COUNT.increment(&["received"]);
249
250                        Some((sender, plain_text, info))
251                    }
252                    IncomingPacket::Forwarded(fwd_packet) => {
253                        let IncomingForwardedPacket {
254                            previous_hop,
255                            next_hop,
256                            data,
257                            ack_key_prev_hop,
258                            ack_challenge,
259                            received_ticket,
260                            ..
261                        } = *fwd_packet;
262                        if let Err(error) = ticket_proc.insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket).await {
263                            tracing::error!(
264                                previous_hop = previous_hop.to_peerid_str(),
265                                next_hop = next_hop.to_peerid_str(),
266                                %error,
267                                "failed to insert unacknowledged ticket into the ticket processor"
268                            );
269                            return None;
270                        }
271
272                        // First, relay the packet to the next hop
273                        tracing::trace!(
274                            previous_hop = previous_hop.to_peerid_str(),
275                            next_hop = next_hop.to_peerid_str(),
276                            "forwarding packet to the next hop"
277                        );
278
279                        wire_outgoing
280                            .send((next_hop.into(), data))
281                            .await
282                            .unwrap_or_else(|error| {
283                                tracing::error!(%error, "failed to forward a packet to the transport layer");
284                            });
285
286                        #[cfg(all(feature = "prometheus", not(test)))]
287                        METRIC_PACKET_COUNT.increment(&["forwarded"]);
288
289                        // Send acknowledgement back
290                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "acknowledging forwarded packet back");
291                        ack_outgoing_success
292                            .send((previous_hop, Some(ack_key_prev_hop)))
293                            .await
294                            .unwrap_or_else(|error| {
295                                tracing::error!(%error, "failed to send ack to the egress queue");
296                            });
297
298                        None
299                    }
300                }
301            }})
302        .filter_map(|maybe_data| futures::future::ready(
303            // Create the ApplicationDataIn data structure for incoming data
304            maybe_data
305                .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
306                    .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
307                    .ok()
308                    .map(|data| (sender, ApplicationDataIn {
309                        data,
310                        packet_info: IncomingPacketInfo {
311                            signals_from_sender: aux_info.packet_signals,
312                            num_saved_surbs: aux_info.num_surbs,
313                        }
314                    })))
315        ))
316        .map(Ok)
317        .forward_to_timeout(app_incoming)
318        .in_current_span()
319        .await;
320
321    if let Err(error) = res {
322        tracing::error!(
323            task = "transport (protocol - msg ingress)",
324            %error,
325            "long-running background task finished with error"
326        );
327    } else {
328        tracing::warn!(
329            task = "transport (protocol - msg ingress)",
330            "long-running background task finished"
331        )
332    }
333}
334
335async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
336    ack_outgoing: AckOut,
337    encoder: std::sync::Arc<E>,
338    cfg: AcknowledgementPipelineConfig,
339    packet_key: OffchainKeypair,
340    wire_outgoing: WOut,
341) where
342    AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
343    E: PacketEncoder + Send + 'static,
344    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
345    WOut::Error: std::error::Error,
346{
347    ack_outgoing
348        .then(move |(destination, maybe_ack_key)|{
349            let packet_key = packet_key.clone();
350            async move {
351                 // Sign acknowledgement with the given half-key or generate a signed random one
352                 let ack = hopr_parallelize::cpu::spawn_blocking(move || {
353                     maybe_ack_key
354                         .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
355                         .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key))
356                 })
357                 .await;
358                (destination, ack)
359            }
360        })
361        .buffer(futures_time::time::Duration::from(cfg.ack_buffer_interval))
362        .filter(|acks| futures::future::ready(!acks.is_empty()))
363        .flat_map(|buffered_acks| {
364            // Split the acknowledgements into groups based on the sender
365            // The halfbrown hash map will use Vec for a lower number of distinct senders, and possibly transition to
366            // the hashbrown hash map when the number of distinct senders exceeds 32.
367            let mut groups = halfbrown::HashMap::<OffchainPublicKey, Vec<VerifiedAcknowledgement>, ahash::RandomState>::with_capacity_and_hasher(
368                cfg.ack_grouping_capacity,
369                ahash::RandomState::default()
370            );
371            for (dst, ack) in buffered_acks {
372                groups
373                    .entry(dst)
374                    .and_modify(|v| v.push(ack))
375                    .or_insert_with(|| vec![ack]);
376            }
377            tracing::trace!(
378                num_groups = groups.len(),
379                num_acks = groups.values().map(|v| v.len()).sum::<usize>(),
380                "acknowledgements grouped"
381            );
382            futures::stream::iter(groups)
383        })
384        .for_each_concurrent(
385            NUM_CONCURRENT_ACK_OUT_PROCESSING,
386            move |(destination, acks)| {
387                let encoder = encoder.clone();
388                let mut wire_outgoing = wire_outgoing.clone();
389                async move {
390                    // Make sure that the acknowledgements are sent in batches of at most MAX_ACKNOWLEDGEMENTS_BATCH_SIZE
391                    for ack_chunk in acks.chunks(MAX_ACKNOWLEDGEMENTS_BATCH_SIZE) {
392                        match encoder.encode_acknowledgements(ack_chunk, &destination).await {
393                            Ok(ack_packet) => {
394                                wire_outgoing
395                                    .feed((ack_packet.next_hop.into(), ack_packet.data))
396                                    .await
397                                    .unwrap_or_else(|error| {
398                                        tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
399                                    });
400                            }
401                            Err(error) => tracing::error!(%error, "failed to create ack packet"),
402                        }
403                    }
404                    if let Err(error) = wire_outgoing.flush().await {
405                        tracing::error!(%error, "failed to flush acknowledgements batch to the transport layer");
406                    }
407                    tracing::trace!("acknowledgements out");
408                }.instrument(tracing::debug_span!("outgoing_ack_batch", peer = destination.to_peerid_str()))
409            }
410        )
411        .in_current_span()
412        .await;
413
414    tracing::warn!(
415        task = "transport (protocol - ack outgoing)",
416        "long-running background task finished"
417    );
418}
419
420async fn start_incoming_ack_pipeline<AckIn, T, TEvt>(
421    ack_incoming: AckIn,
422    ticket_events: TEvt,
423    ticket_proc: std::sync::Arc<T>,
424) where
425    AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
426    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
427    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
428    TEvt::Error: std::error::Error,
429{
430    ack_incoming
431        .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(peer, acks)| {
432            let ticket_proc = ticket_proc.clone();
433            let mut ticket_evt = ticket_events.clone();
434            async move {
435                tracing::trace!(num = acks.len(), "received acknowledgements");
436                match ticket_proc.acknowledge_tickets(peer, acks).await {
437                    Ok(resolutions) if !resolutions.is_empty() => {
438                        let resolutions_iter = resolutions.into_iter().filter_map(|resolution| match resolution {
439                            ResolvedAcknowledgement::RelayingWin(redeemable_ticket) => {
440                                tracing::trace!("received ack for a winning ticket");
441                                Some(Ok(TicketEvent::WinningTicket(redeemable_ticket)))
442                            }
443                            ResolvedAcknowledgement::RelayingLoss(_) => {
444                                // Losing tickets are not getting accounted for anywhere.
445                                tracing::trace!("received ack for a losing ticket");
446                                None
447                            }
448                        });
449
450                        // All acknowledgements that resulted in winning tickets go upstream
451                        if let Err(error) = ticket_evt.send_all(&mut futures::stream::iter(resolutions_iter)).await {
452                            tracing::error!(%error, "failed to notify ticket resolutions");
453                        }
454                    }
455                    Ok(_) => {
456                        tracing::debug!("acknowledgement batch could not acknowledge any ticket");
457                    }
458                    Err(TicketAcknowledgementError::UnexpectedAcknowledgement) => {
459                        // Unexpected acknowledgements naturally happen
460                        // as acknowledgements of 0-hop packets
461                        tracing::trace!("received unexpected acknowledgement");
462                    }
463                    Err(error) => {
464                        tracing::error!(%error, "failed to acknowledge ticket");
465                    }
466                }
467            }
468            .instrument(tracing::debug_span!("incoming_ack_batch", peer = peer.to_peerid_str()))
469        })
470        .in_current_span()
471        .await;
472
473    tracing::warn!(
474        task = "transport (protocol - ticket acknowledgement)",
475        "long-running background task finished"
476    );
477}
478
479fn default_ack_buffer_interval() -> std::time::Duration {
480    std::time::Duration::from_millis(200)
481}
482
483fn default_ack_grouping_capacity() -> usize {
484    5
485}
486
487/// Configuration for the acknowledgement processing pipeline.
488#[derive(Debug, Copy, Clone, smart_default::SmartDefault, Eq, PartialEq)]
489#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
490pub struct AcknowledgementPipelineConfig {
491    /// Interval for which to wait to buffer acknowledgements before sending them out.
492    ///
493    /// Default is 200 ms.
494    #[default(default_ack_buffer_interval())]
495    #[cfg_attr(
496        feature = "serde",
497        serde(default = "default_ack_buffer_interval", with = "humantime_serde")
498    )]
499    pub ack_buffer_interval: std::time::Duration,
500    /// Initial capacity when grouping outgoing acknowledgements.
501    ///
502    /// If set too low, it causes additional reallocations in the outgoing acknowledgement processing pipeline.
503    /// The value should grow if `ack_buffer_interval` grows.
504    ///
505    /// Default is 5.
506    #[default(default_ack_grouping_capacity())]
507    #[cfg_attr(feature = "serde", serde(default = "default_ack_grouping_capacity"))]
508    pub ack_grouping_capacity: usize,
509}
510
511// Requires manual implementation due to https://github.com/Keats/validator/issues/285
512impl Validate for AcknowledgementPipelineConfig {
513    fn validate(&self) -> Result<(), ValidationErrors> {
514        let mut errors = ValidationErrors::new();
515        if self.ack_grouping_capacity == 0 {
516            errors.add("ack_grouping_capacity", ValidationError::new("must be greater than 0"));
517        }
518        if self.ack_buffer_interval < std::time::Duration::from_millis(10) {
519            errors.add("ack_buffer_interval", ValidationError::new("must be at least 10 ms"));
520        }
521        if errors.is_empty() { Ok(()) } else { Err(errors) }
522    }
523}
524
525/// Run all processes responsible for handling the msg and acknowledgment protocols.
526///
527/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
528/// overlay on top of the `wire_msg` Stream or Sink.
529#[tracing::instrument(skip_all, level = "trace", fields(me = packet_key.public().to_peerid_str()))]
530pub fn run_packet_pipeline<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
531    packet_key: OffchainKeypair,
532    wire_msg: (WOut, WIn),
533    codec: (C, D),
534    ticket_proc: T,
535    ticket_events: TEvt,
536    ack_config: AcknowledgementPipelineConfig,
537    api: (AppOut, AppIn),
538) -> AbortableList<PacketPipelineProcesses>
539where
540    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
541    WOut::Error: std::error::Error,
542    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
543    C: PacketEncoder + Sync + Send + 'static,
544    D: PacketDecoder + Sync + Send + 'static,
545    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
546    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
547    TEvt::Error: std::error::Error,
548    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
549    AppOut::Error: std::error::Error,
550    AppIn: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
551{
552    let mut processes = AbortableList::default();
553
554    #[cfg(all(feature = "prometheus", not(test)))]
555    {
556        // Initialize the lazy statics here
557        lazy_static::initialize(&METRIC_PACKET_COUNT);
558    }
559
560    let (outgoing_ack_tx, outgoing_ack_rx) =
561        futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(ACK_OUT_BUFFER_SIZE);
562
563    let (incoming_ack_tx, incoming_ack_rx) =
564        futures::channel::mpsc::channel::<(OffchainPublicKey, Vec<Acknowledgement>)>(TICKET_ACK_BUFFER_SIZE);
565
566    // Attach timeouts to all Sinks so that the pipelines are not blocked when
567    // some channel is not being timely processed
568    let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
569    let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
570    let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
571    let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
572    let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);
573
574    let encoder = std::sync::Arc::new(codec.0);
575    let decoder = std::sync::Arc::new(codec.1);
576    let ticket_proc = std::sync::Arc::new(ticket_proc);
577
578    processes.insert(
579        PacketPipelineProcesses::MsgOut,
580        spawn_as_abortable!(
581            start_outgoing_packet_pipeline(app_in, encoder.clone(), wire_out.clone(),).in_current_span()
582        ),
583    );
584
585    processes.insert(
586        PacketPipelineProcesses::MsgIn,
587        spawn_as_abortable!(
588            start_incoming_packet_pipeline(
589                (wire_out.clone(), wire_in),
590                decoder,
591                ticket_proc.clone(),
592                ticket_events.clone(),
593                (outgoing_ack_tx, incoming_ack_tx),
594                app_out,
595            )
596            .in_current_span()
597        ),
598    );
599
600    processes.insert(
601        PacketPipelineProcesses::AckOut,
602        spawn_as_abortable!(
603            start_outgoing_ack_pipeline(outgoing_ack_rx, encoder, ack_config, packet_key.clone(), wire_out,)
604                .in_current_span()
605        ),
606    );
607
608    processes.insert(
609        PacketPipelineProcesses::AckIn,
610        spawn_as_abortable!(start_incoming_ack_pipeline(incoming_ack_rx, ticket_events, ticket_proc).in_current_span()),
611    );
612
613    processes
614}