1use futures::{SinkExt, StreamExt};
2use futures_time::{future::FutureExt as TimeExt, stream::StreamExt as TimeStreamExt};
3use hopr_async_runtime::{AbortableList, spawn_as_abortable};
4use hopr_crypto_types::prelude::*;
5use hopr_internal_types::prelude::*;
6use hopr_network_types::{
7 prelude::*,
8 timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
9};
10use hopr_primitive_types::prelude::Address;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
14use tracing::Instrument;
15use validator::{Validate, ValidationError, ValidationErrors};
16
17use crate::PeerId;
18
/// Capacity of the internal channel carrying batches of received acknowledgements
/// from the packet ingress pipeline towards ticket acknowledgement processing.
const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of acknowledgement batches processed concurrently in the ticket-ack pipeline.
const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
/// Capacity of the internal channel carrying outgoing acknowledgements awaiting encoding and send-out.
const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of per-destination acknowledgement groups encoded and sent concurrently.
const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
/// Timeout applied to sends into internal queues and the wire/application sinks
/// (via `with_timeout`), so a congested consumer cannot stall a pipeline forever.
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
/// Maximum time allowed for decoding a single incoming packet before it is dropped.
const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
25
// Prometheus metrics; compiled in only with the "prometheus" feature and outside of tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Counts processed packets, labeled by direction: "sent", "received" or "forwarded".
    static ref METRIC_PACKET_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    // Counts incoming packets dropped because decoding exceeded PACKET_DECODING_TIMEOUT.
    static ref METRIC_PACKET_DECODE_TIMEOUTS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_packet_decode_timeouts_total",
        "Number of incoming packets dropped due to decode timeout"
    ).unwrap();
    // Counts ticket validation failures, labeled by the validation error kind.
    static ref METRIC_VALIDATION_ERRORS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_packet_ticket_validation_errors",
        "Number of different ticket validation errors encountered during packet processing",
        &["type"]
    ).unwrap();
}
47
/// Identifiers of the long-running background tasks spawned by [`run_packet_pipeline`],
/// used as keys in the returned `AbortableList`. The `strum` display strings are the
/// human-readable task names used in logs.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum PacketPipelineProcesses {
    /// Incoming message pipeline (wire -> application).
    #[strum(to_string = "HOPR [msg] - ingress")]
    MsgIn,
    /// Outgoing message pipeline (application -> wire).
    #[strum(to_string = "HOPR [msg] - egress")]
    MsgOut,
    /// Outgoing acknowledgement pipeline.
    #[strum(to_string = "HOPR [ack] - egress")]
    AckOut,
    /// Incoming acknowledgement / ticket acknowledgement pipeline.
    #[strum(to_string = "HOPR [ack] - ingress")]
    AckIn,
    /// Packet mixer task. NOTE(review): no mixer task is spawned in this file —
    /// presumably inserted elsewhere; confirm against callers.
    #[strum(to_string = "HOPR [msg] - mixer")]
    Mixer,
}
61
/// Ticket-related events emitted by the packet pipelines into the caller-provided
/// `ticket_events` sink.
#[derive(Debug, Clone, strum::EnumIs, strum::EnumTryAs)]
pub enum TicketEvent {
    /// An acknowledged ticket turned out to be a win and can be redeemed.
    WinningTicket(Box<RedeemableTicket>),
    /// A ticket on an incoming packet failed validation; carries the rejected ticket
    /// and its issuer address, when known.
    RejectedTicket(Box<Ticket>, Option<Address>),
}
70
/// Drives the message egress pipeline: consumes application-level messages from
/// `app_outgoing`, encodes each into a wire-ready HOPR packet (up to `concurrency`
/// encodings in flight) and forwards the results into the timeout-wrapped transport sink.
///
/// Messages that fail to encode are dropped with an error log. The future completes
/// when `app_outgoing` ends or the sink errors; the outcome is only logged, so this is
/// intended to be spawned as a background task.
async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
    app_outgoing: AppOut,
    encoder: std::sync::Arc<E>,
    wire_outgoing: WOut,
    concurrency: usize,
) where
    AppOut: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
    E: PacketEncoder + Send + 'static,
    WOut: futures::Sink<(PeerId, Box<[u8]>), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
    WOutErr: std::error::Error,
{
    let res = app_outgoing
        .then_concurrent(
            |(routing, data)| {
                let encoder = encoder.clone();
                async move {
                    match encoder
                        .encode_packet(
                            data.data.to_bytes(),
                            routing,
                            // Signals for the destination are optional; absence means none.
                            data.packet_info
                                .map(|data| data.signals_to_destination)
                                .unwrap_or_default(),
                        )
                        .await
                    {
                        Ok(packet) => {
                            #[cfg(all(feature = "prometheus", not(test)))]
                            METRIC_PACKET_COUNT.increment(&["sent"]);

                            tracing::trace!(peer = packet.next_hop.to_peerid_str(), "protocol message out");
                            Some((packet.next_hop.into(), packet.data))
                        }
                        Err(error) => {
                            // Best-effort: a failed encoding drops the message, nothing is retried.
                            tracing::error!(%error, "packet could not be wrapped for sending");
                            None
                        }
                    }
                }
            },
            concurrency,
        )
        // Filter out the messages dropped due to encoding failure.
        .filter_map(futures::future::ready)
        .map(Ok)
        .forward_to_timeout(wire_outgoing)
        .in_current_span()
        .await;

    if let Err(error) = res {
        tracing::error!(
            task = "transport (protocol - msg egress)",
            %error,
            "long-running background task finished with error"
        );
    } else {
        tracing::warn!(
            task = "transport (protocol - msg egress)",
            "long-running background task finished"
        )
    }
}
133
/// Drives the message ingress pipeline. For each packet arriving on `wire_incoming` it:
///
/// 1. Decodes the packet concurrently (bounded by `concurrency`) with a hard
///    [`PACKET_DECODING_TIMEOUT`]; timeouts and undecodable packets are dropped.
///    Processing and ticket-validation failures additionally push a random ("failure")
///    acknowledgement to the sender, and ticket rejections are reported via `ticket_events`.
/// 2. Dispatches the decoded packet:
///    - `Acknowledgement` → forwarded into `ack_incoming` for ticket acknowledgement,
///    - `Final` (we are the destination) → acks the previous hop and yields the payload,
///    - `Forwarded` (relay) → stores the unacknowledged ticket, relays the packet via
///      `wire_outgoing` and acks the previous hop.
/// 3. Parses final payloads into `ApplicationData` and forwards them, with the sender
///    pseudonym and packet info, into the timeout-wrapped `app_incoming` sink.
///
/// All failures are logged and never abort the pipeline; the future ends only when
/// `wire_incoming` terminates or `app_incoming` errors.
async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
    (wire_outgoing, wire_incoming): (WOut, WIn),
    decoder: std::sync::Arc<D>,
    ticket_proc: std::sync::Arc<T>,
    ticket_events: TEvt,
    (ack_outgoing, ack_incoming): (AckOut, AckIn),
    app_incoming: AppIn,
    concurrency: usize,
) where
    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
    WOut::Error: std::error::Error,
    D: PacketDecoder + Send + 'static,
    T: UnacknowledgedTicketProcessor + Send + 'static,
    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
    TEvt::Error: std::error::Error,
    AckIn: futures::Sink<(OffchainPublicKey, Vec<Acknowledgement>)> + Send + Unpin + Clone + 'static,
    AckIn::Error: std::error::Error,
    AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
    AckOut::Error: std::error::Error,
    AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
    AppInErr: std::error::Error,
{
    // Separate clones for the two pipeline stages that send outgoing acks.
    let ack_outgoing_success = ack_outgoing.clone();
    let ack_outgoing_failure = ack_outgoing;
    let ticket_proc_success = ticket_proc;

    let res = wire_incoming
        // Stage 1: concurrent, time-bounded packet decoding.
        .then_concurrent(move |(peer, data)| {
            let decoder = decoder.clone();
            let mut ack_outgoing_failure = ack_outgoing_failure.clone();
            let mut ticket_events_reject = ticket_events.clone();

            tracing::trace!(%peer, "protocol message in");

            async move {
                match decoder.decode(peer, data)
                    .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
                    .await {
                    Ok(Ok(packet)) => {
                        tracing::trace!(%peer, ?packet, "successfully decoded incoming packet");
                        Some(packet)
                    },
                    Ok(Err(IncomingPacketError::Overloaded(error))) => {
                        // Load shedding: drop without acking.
                        tracing::warn!(%peer, %error, "dropping packet due to local CPU overload");
                        None
                    },
                    Ok(Err(IncomingPacketError::Undecodable(error))) => {
                        // Deliberately no ack here: acking undecodable data could aid an adversary.
                        tracing::trace!(%peer, %error, "not sending ack back on undecodable packet - possible adversarial behavior");
                        None
                    },
                    Ok(Err(IncomingPacketError::ProcessingError(sender, error))) => {
                        tracing::error!(%peer, %error, "failed to process the decoded packet");
                        // `None` ack key => a random (failure) acknowledgement is generated downstream.
                        ack_outgoing_failure
                            .send((sender, None))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });
                        None
                    },
                    Ok(Err(IncomingPacketError::InvalidTicket(sender, error))) => {
                        tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
                        // Notify listeners that the ticket was rejected.
                        if let Err(error) = ticket_events_reject
                            .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
                            .await {
                            tracing::error!(%error, "failed to notify invalid ticket rejection");
                        }
                        ack_outgoing_failure
                            .send((sender, None))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_VALIDATION_ERRORS.increment(&[error.kind.as_ref()]);

                        None
                    }
                    Err(_) => {
                        // Decode did not finish within PACKET_DECODING_TIMEOUT.
                        tracing::error!(
                            %peer,
                            timeout_ms = PACKET_DECODING_TIMEOUT.as_millis() as u64,
                            "dropped incoming packet: decode timeout - check the 'hopr_rayon_queue_wait_seconds' metric for pool saturation"
                        );
                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_PACKET_DECODE_TIMEOUTS.increment();

                        None
                    }
                }
            }.instrument(tracing::debug_span!("incoming_packet_decode", %peer))
        }, concurrency)
        .filter_map(futures::future::ready)
        // Stage 2: concurrent dispatch per decoded packet type.
        .then_concurrent(move |packet| {
            let ticket_proc = ticket_proc_success.clone();
            let mut wire_outgoing = wire_outgoing.clone();
            let mut ack_incoming = ack_incoming.clone();
            let mut ack_outgoing_success = ack_outgoing_success.clone();
            async move {
                match packet {
                    IncomingPacket::Acknowledgement(ack) => {
                        let IncomingAcknowledgementPacket { previous_hop, received_acks, .. } = *ack;
                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), num_acks = received_acks.len(), "incoming acknowledgements");
                        // Hand received acks over to the ticket acknowledgement pipeline.
                        ack_incoming
                            .send((previous_hop, received_acks))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
                            });

                        None
                    },
                    IncomingPacket::Final(final_packet) => {
                        // Packet destined for this node: ack the previous hop and pass
                        // the payload on to the application stage.
                        let IncomingFinalPacket {
                            previous_hop,
                            sender,
                            plain_text,
                            ack_key,
                            info,
                            ..
                        } = *final_packet;
                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "incoming final packet");

                        ack_outgoing_success
                            .send((previous_hop, Some(ack_key)))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_PACKET_COUNT.increment(&["received"]);

                        Some((sender, plain_text, info))
                    }
                    IncomingPacket::Forwarded(fwd_packet) => {
                        // Relay case: record the unacknowledged ticket, forward the packet,
                        // then ack the previous hop.
                        let IncomingForwardedPacket {
                            previous_hop,
                            next_hop,
                            data,
                            ack_key_prev_hop,
                            ack_challenge,
                            received_ticket,
                            ..
                        } = *fwd_packet;
                        if let Err(error) = ticket_proc.insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket).await {
                            tracing::error!(
                                previous_hop = previous_hop.to_peerid_str(),
                                next_hop = next_hop.to_peerid_str(),
                                %error,
                                "failed to insert unacknowledged ticket into the ticket processor"
                            );
                            // Without the stored ticket, neither forward nor ack the packet.
                            return None;
                        }

                        tracing::trace!(
                            previous_hop = previous_hop.to_peerid_str(),
                            next_hop = next_hop.to_peerid_str(),
                            "forwarding packet to the next hop"
                        );

                        wire_outgoing
                            .send((next_hop.into(), data))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to forward a packet to the transport layer");
                            });

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_PACKET_COUNT.increment(&["forwarded"]);

                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "acknowledging forwarded packet back");
                        ack_outgoing_success
                            .send((previous_hop, Some(ack_key_prev_hop)))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        None
                    }
                }
            }}, concurrency)
        // Stage 3: only Final packets yield data; parse it into ApplicationData.
        .filter_map(|maybe_data| futures::future::ready(
            maybe_data
                .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
                    .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
                    .ok()
                    .map(|data| (sender, ApplicationDataIn {
                        data,
                        packet_info: IncomingPacketInfo {
                            signals_from_sender: aux_info.packet_signals,
                            num_saved_surbs: aux_info.num_surbs,
                        }
                    })))
        ))
        .map(Ok)
        .forward_to_timeout(app_incoming)
        .in_current_span()
        .await;

    if let Err(error) = res {
        tracing::error!(
            task = "transport (protocol - msg ingress)",
            %error,
            "long-running background task finished with error"
        );
    } else {
        tracing::warn!(
            task = "transport (protocol - msg ingress)",
            "long-running background task finished"
        )
    }
}
367
/// Drives the acknowledgement egress pipeline: turns (destination, optional ack key) pairs
/// into signed acknowledgements, buffers them for `cfg.ack_buffer_interval`, groups the
/// buffered acks by destination, then encodes and sends them in batches of at most
/// `MAX_ACKNOWLEDGEMENTS_BATCH_SIZE` per packet, with up to
/// [`NUM_CONCURRENT_ACK_OUT_PROCESSING`] destinations handled concurrently.
///
/// A `None` ack key produces a random acknowledgement (used to ack unprocessable packets
/// without revealing the failure). Runs until `ack_outgoing` terminates; errors are logged
/// and never abort the pipeline.
async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
    ack_outgoing: AckOut,
    encoder: std::sync::Arc<E>,
    cfg: AcknowledgementPipelineConfig,
    packet_key: OffchainKeypair,
    wire_outgoing: WOut,
) where
    AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
    E: PacketEncoder + Send + 'static,
    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
    WOut::Error: std::error::Error,
{
    ack_outgoing
        .map(move |(destination, maybe_ack_key)| {
            let packet_key = packet_key.clone();
            // Real ack when a key is present, random ack otherwise.
            let ack = maybe_ack_key
                .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
                .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key));
            (destination, ack)
        })
        // Time-based batching of outgoing acks.
        .buffer(futures_time::time::Duration::from(cfg.ack_buffer_interval))
        .filter(|acks| futures::future::ready(!acks.is_empty()))
        .flat_map(|buffered_acks| {
            // Group the buffered acks by destination so each peer gets batched packets.
            let mut groups = halfbrown::HashMap::<OffchainPublicKey, Vec<VerifiedAcknowledgement>, ahash::RandomState>::with_capacity_and_hasher(
                cfg.ack_grouping_capacity,
                ahash::RandomState::default()
            );
            for (dst, ack) in buffered_acks {
                // NOTE(review): `ack` is used in both closures — this relies on
                // VerifiedAcknowledgement being Copy (or halfbrown's entry semantics);
                // confirm, otherwise `entry(dst).or_default().push(ack)` is the usual form.
                groups
                    .entry(dst)
                    .and_modify(|v| v.push(ack))
                    .or_insert_with(|| vec![ack]);
            }
            tracing::trace!(
                num_groups = groups.len(),
                num_acks = groups.values().map(|v| v.len()).sum::<usize>(),
                "acknowledgements grouped"
            );
            futures::stream::iter(groups)
        })
        .for_each_concurrent(
            NUM_CONCURRENT_ACK_OUT_PROCESSING,
            move |(destination, acks)| {
                let encoder = encoder.clone();
                let mut wire_outgoing = wire_outgoing.clone();
                async move {
                    // Split each destination group into protocol-sized chunks; `feed` queues
                    // them and a single `flush` pushes the whole batch to the transport.
                    for ack_chunk in acks.chunks(MAX_ACKNOWLEDGEMENTS_BATCH_SIZE) {
                        match encoder.encode_acknowledgements(ack_chunk, &destination).await {
                            Ok(ack_packet) => {
                                wire_outgoing
                                    .feed((ack_packet.next_hop.into(), ack_packet.data))
                                    .await
                                    .unwrap_or_else(|error| {
                                        tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
                                    });
                            }
                            Err(error) => tracing::error!(%error, "failed to create ack packet"),
                        }
                    }
                    if let Err(error) = wire_outgoing.flush().await {
                        tracing::error!(%error, "failed to flush acknowledgements batch to the transport layer");
                    }
                    tracing::trace!("acknowledgements out");
                }.instrument(tracing::debug_span!("outgoing_ack_batch", peer = destination.to_peerid_str()))
            }
        )
        .in_current_span()
        .await;

    tracing::warn!(
        task = "transport (protocol - ack outgoing)",
        "long-running background task finished"
    );
}
447
/// Drives the acknowledgement ingress pipeline: consumes (peer, acks) batches produced by
/// the packet ingress pipeline, resolves them against stored unacknowledged tickets via
/// `ticket_proc`, and reports winning tickets through `ticket_events`. Losing tickets and
/// unexpected acknowledgements are only traced; up to
/// [`NUM_CONCURRENT_TICKET_ACK_PROCESSING`] batches are processed concurrently.
///
/// Runs until `ack_incoming` terminates; all errors are logged and never abort the pipeline.
async fn start_incoming_ack_pipeline<AckIn, T, TEvt>(
    ack_incoming: AckIn,
    ticket_events: TEvt,
    ticket_proc: std::sync::Arc<T>,
) where
    AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
    TEvt::Error: std::error::Error,
{
    ack_incoming
        .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(peer, acks)| {
            let ticket_proc = ticket_proc.clone();
            let mut ticket_evt = ticket_events.clone();
            async move {
                tracing::trace!(num = acks.len(), "received acknowledgements");
                match ticket_proc.acknowledge_tickets(peer, acks).await {
                    Ok(resolutions) if !resolutions.is_empty() => {
                        // Only winning tickets produce events; losses are merely traced.
                        let resolutions_iter = resolutions.into_iter().filter_map(|resolution| match resolution {
                            ResolvedAcknowledgement::RelayingWin(redeemable_ticket) => {
                                tracing::trace!("received ack for a winning ticket");
                                Some(Ok(TicketEvent::WinningTicket(redeemable_ticket)))
                            }
                            ResolvedAcknowledgement::RelayingLoss(_) => {
                                tracing::trace!("received ack for a losing ticket");
                                None
                            }
                        });

                        if let Err(error) = ticket_evt.send_all(&mut futures::stream::iter(resolutions_iter)).await {
                            tracing::error!(%error, "failed to notify ticket resolutions");
                        }
                    }
                    Ok(_) => {
                        tracing::debug!("acknowledgement batch could not acknowledge any ticket");
                    }
                    Err(TicketAcknowledgementError::UnexpectedAcknowledgement) => {
                        // An ack with no matching pending ticket; benign, so only traced.
                        tracing::trace!("received unexpected acknowledgement");
                    }
                    Err(error) => {
                        tracing::error!(%error, "failed to acknowledge ticket");
                    }
                }
            }
            .instrument(tracing::debug_span!("incoming_ack_batch", peer = peer.to_peerid_str()))
        })
        .in_current_span()
        .await;

    tracing::warn!(
        task = "transport (protocol - ticket acknowledgement)",
        "long-running background task finished"
    );
}
506
/// Default buffering interval for outgoing acknowledgements (200 ms).
fn default_ack_buffer_interval() -> std::time::Duration {
    // 200 ms, expressed in microseconds; identical to Duration::from_millis(200).
    std::time::Duration::from_micros(200_000)
}
510
/// Default pre-allocated capacity of the per-destination acknowledgement grouping map.
fn default_ack_grouping_capacity() -> usize {
    const DEFAULT_CAPACITY: usize = 5;
    DEFAULT_CAPACITY
}
514
/// Configuration of the outgoing acknowledgement pipeline
/// (see `start_outgoing_ack_pipeline`).
#[derive(Debug, Copy, Clone, smart_default::SmartDefault, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct AcknowledgementPipelineConfig {
    /// Time window over which outgoing acknowledgements are buffered before being
    /// grouped and sent out in batches. Default: 200 ms; validated to be >= 10 ms.
    #[default(default_ack_buffer_interval())]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_ack_buffer_interval", with = "humantime_serde")
    )]
    pub ack_buffer_interval: std::time::Duration,
    /// Initial capacity of the per-destination grouping map used when batching
    /// acknowledgements. Default: 5; validated to be greater than 0.
    #[default(default_ack_grouping_capacity())]
    #[cfg_attr(feature = "serde", serde(default = "default_ack_grouping_capacity"))]
    pub ack_grouping_capacity: usize,
}
538
539impl Validate for AcknowledgementPipelineConfig {
541 fn validate(&self) -> Result<(), ValidationErrors> {
542 let mut errors = ValidationErrors::new();
543 if self.ack_grouping_capacity == 0 {
544 errors.add("ack_grouping_capacity", ValidationError::new("must be greater than 0"));
545 }
546 if self.ack_buffer_interval < std::time::Duration::from_millis(10) {
547 errors.add("ack_buffer_interval", ValidationError::new("must be at least 10 ms"));
548 }
549 if errors.is_empty() { Ok(()) } else { Err(errors) }
550 }
551}
552
/// Top-level configuration for the packet pipelines started by [`run_packet_pipeline`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Validate)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(deny_unknown_fields)
)]
pub struct PacketPipelineConfig {
    /// Concurrency of the message egress pipeline. `None` or `Some(0)` falls back to
    /// an automatic value derived from available parallelism (8x the CPU count).
    pub output_concurrency: Option<usize>,
    /// Concurrency of the message ingress pipeline. Same fallback behavior as
    /// `output_concurrency`.
    pub input_concurrency: Option<usize>,
    /// Configuration of the outgoing acknowledgement pipeline.
    #[validate(nested)]
    pub ack_config: AcknowledgementPipelineConfig,
}
573
/// Wires up and spawns all four packet-pipeline background tasks (message ingress/egress,
/// acknowledgement ingress/egress) and returns them as an abortable task list keyed by
/// [`PacketPipelineProcesses`].
///
/// Internal MPSC channels connect the packet ingress pipeline to the two ack pipelines;
/// all producer-side sinks are wrapped with [`QUEUE_SEND_TIMEOUT`] so a stalled consumer
/// surfaces as a send error rather than blocking a pipeline indefinitely.
#[tracing::instrument(skip_all, level = "trace", fields(me = packet_key.public().to_peerid_str()))]
pub fn run_packet_pipeline<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
    packet_key: OffchainKeypair,
    wire_msg: (WOut, WIn),
    codec: (C, D),
    ticket_proc: T,
    ticket_events: TEvt,
    cfg: PacketPipelineConfig,
    api: (AppOut, AppIn),
) -> AbortableList<PacketPipelineProcesses>
where
    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
    WOut::Error: std::error::Error,
    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
    C: PacketEncoder + Sync + Send + 'static,
    D: PacketDecoder + Sync + Send + 'static,
    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
    TEvt::Error: std::error::Error,
    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
    AppOut::Error: std::error::Error,
    AppIn: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
{
    let mut processes = AbortableList::default();

    // Force metric registration up front so they appear even before first use.
    #[cfg(all(feature = "prometheus", not(test)))]
    {
        lazy_static::initialize(&METRIC_PACKET_COUNT);
        lazy_static::initialize(&METRIC_PACKET_DECODE_TIMEOUTS);
        lazy_static::initialize(&METRIC_VALIDATION_ERRORS);
    }

    // Channel: packet ingress -> outgoing ack pipeline.
    let (outgoing_ack_tx, outgoing_ack_rx) =
        futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(ACK_OUT_BUFFER_SIZE);

    // Channel: packet ingress -> incoming (ticket) ack pipeline.
    let (incoming_ack_tx, incoming_ack_rx) =
        futures::channel::mpsc::channel::<(OffchainPublicKey, Vec<Acknowledgement>)>(TICKET_ACK_BUFFER_SIZE);

    // Wrap all producer-side sinks with the queue-send timeout.
    let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
    let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
    let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
    let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
    let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);

    let encoder = std::sync::Arc::new(codec.0);
    let decoder = std::sync::Arc::new(codec.1);
    let ticket_proc = std::sync::Arc::new(ticket_proc);

    // Automatic concurrency: 8x the detected CPU parallelism, at least 8.
    let avail_concurrency = std::thread::available_parallelism()
        .ok()
        .map(|n| n.get())
        .unwrap_or(1)
        .max(1)
        * 8;

    // Explicit zero or absent values fall back to the automatic concurrency.
    let output_concurrency = cfg.output_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);
    let input_concurrency = cfg.input_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);

    processes.insert(
        PacketPipelineProcesses::MsgOut,
        spawn_as_abortable!(
            start_outgoing_packet_pipeline(app_in, encoder.clone(), wire_out.clone(), output_concurrency)
                .in_current_span()
        ),
    );

    processes.insert(
        PacketPipelineProcesses::MsgIn,
        spawn_as_abortable!(
            start_incoming_packet_pipeline(
                (wire_out.clone(), wire_in),
                decoder,
                ticket_proc.clone(),
                ticket_events.clone(),
                (outgoing_ack_tx, incoming_ack_tx),
                app_out,
                input_concurrency,
            )
            .in_current_span()
        ),
    );

    processes.insert(
        PacketPipelineProcesses::AckOut,
        spawn_as_abortable!(
            start_outgoing_ack_pipeline(outgoing_ack_rx, encoder, cfg.ack_config, packet_key.clone(), wire_out,)
                .in_current_span()
        ),
    );

    processes.insert(
        PacketPipelineProcesses::AckIn,
        spawn_as_abortable!(start_incoming_ack_pipeline(incoming_ack_rx, ticket_events, ticket_proc).in_current_span()),
    );

    processes
}