1mod builder;
4mod config;
5
6pub use builder::{PacketPipelineBuilder, Unset};
7use bytes::Bytes;
8pub use config::{AcknowledgementPipelineConfig, PacketPipelineConfig};
9use futures::{SinkExt, StreamExt, future::Either};
10use futures_time::{future::FutureExt as TimeExt, stream::StreamExt as TimeStreamExt};
11use hopr_api::{
12 PeerId,
13 node::TicketEvent,
14 types::{crypto::prelude::*, internal::prelude::*},
15};
16use hopr_crypto_packet::HoprSurb;
17use hopr_protocol_app::prelude::*;
18use hopr_protocol_hopr::prelude::*;
19use hopr_utils::{
20 network_types::timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
21 runtime::AbortableList,
22};
23use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
24use tracing::Instrument;
25
26use crate::PeerProtocolCounterRegistry;
27
/// Fallback number of incoming acknowledgement batches processed concurrently
/// when `ack_input_concurrency` is unset or zero in the configuration.
const DEFAULT_ACK_INPUT_CONCURRENCY: usize = 10;
/// Fallback number of per-destination acknowledgement groups encoded and sent
/// concurrently when `ack_output_concurrency` is unset or zero in the configuration.
const DEFAULT_ACK_OUTPUT_CONCURRENCY: usize = 10;
/// Timeout applied (via `with_timeout`) to every sink the pipeline pushes into:
/// the wire, the application, both acknowledgement queues and the ticket event sink.
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
/// Maximum time an incoming packet may spend in the blocking decode job
/// before it is dropped.
const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
/// Maximum time an outgoing packet may spend in the blocking encode job
/// before it is dropped.
const PACKET_ENCODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
37
// Telemetry counters of the packet pipeline.
//
// Only compiled with the `telemetry` feature and never in test builds. Each metric is
// constructed on first access and panics (`unwrap`) if construction fails;
// `run_packet_pipeline_inner` forces initialization of all of them up front.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Successfully processed packets; this file uses the `type` labels
    // "sent", "received" and "forwarded".
    static ref METRIC_PACKET_COUNT: hopr_types::telemetry::MultiCounter = hopr_types::telemetry::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    // Dropped incoming packets; this file uses the `reason` labels "undecodable",
    // "processing_error", "invalid_ticket", "timeout" and "unack_processing_error".
    static ref METRIC_PACKET_REJECTED_COUNT: hopr_types::telemetry::MultiCounter = hopr_types::telemetry::MultiCounter::new(
        "hopr_packet_rejected_count",
        "Number of incoming packets rejected due various reasons",
        &["reason"]
    ).unwrap();
    // Incremented whenever decoding of an incoming packet exceeds `PACKET_DECODING_TIMEOUT`.
    static ref METRIC_PACKET_DECODE_TIMEOUTS: hopr_types::telemetry::SimpleCounter = hopr_types::telemetry::SimpleCounter::new(
        "hopr_packet_decode_timeouts_total",
        "Number of incoming packets dropped due to decode timeout (sustained rate indicates Rayon pool saturation)"
    ).unwrap();
    // Ticket validation failures, labelled by the kind of the validation error.
    static ref METRIC_VALIDATION_ERRORS: hopr_types::telemetry::MultiCounter = hopr_types::telemetry::MultiCounter::new(
        "hopr_packet_ticket_validation_errors",
        "Number of different ticket validation errors encountered during packet processing",
        &["type"]
    ).unwrap();
}
64
/// Identifiers of the long-running background tasks of the packet pipeline.
///
/// `run_packet_pipeline_inner` uses these as keys of the `AbortableList` it returns.
/// The `strum::Display` strings are the human-readable task names.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum PacketPipelineProcesses {
    /// Task decoding and dispatching packets that arrive from the wire.
    #[strum(to_string = "HOPR [msg] - ingress")]
    MsgIn,
    /// Task encoding application messages and pushing them onto the wire.
    #[strum(to_string = "HOPR [msg] - egress")]
    MsgOut,
    /// Task batching, encoding and sending acknowledgements.
    #[strum(to_string = "HOPR [ack] - egress")]
    AckOut,
    /// Task consuming received acknowledgements (ticket resolution on relays, plain drain on exits).
    #[strum(to_string = "HOPR [ack] - ingress")]
    AckIn,
    /// Packet mixer task.
    // NOTE(review): not spawned by `run_packet_pipeline_inner`; presumably registered by
    // another component - confirm.
    #[strum(to_string = "HOPR [msg] - mixer")]
    Mixer,
}
78
79async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
81 app_outgoing: AppOut,
82 encoder: std::sync::Arc<E>,
83 wire_outgoing: WOut,
84 counters: super::counters::PeerProtocolCounterRegistry,
85 concurrency: usize,
86) where
87 AppOut: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
88 E: PacketEncoder + Send + Sync + 'static,
89 WOut: futures::Sink<(PeerId, Bytes), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
90 WOutErr: std::error::Error,
91{
92 let res = app_outgoing
93 .then_concurrent(
94 |(routing, data)| {
95 let encoder = encoder.clone();
96 let counters = counters.clone();
97 async move {
98 match hopr_utils::parallelize::cpu::spawn_fifo_blocking(
99 move || {
100 encoder.encode_packet(
101 data.data.to_bytes(),
102 routing,
103 data.packet_info
104 .map(|data| data.signals_to_destination)
105 .unwrap_or_default(),
106 )
107 },
108 "packet_encode",
109 )
110 .timeout(futures_time::time::Duration::from(PACKET_ENCODING_TIMEOUT))
111 .await
112 {
113 Ok(Ok(Ok(packet))) => {
114 #[cfg(all(feature = "telemetry", not(test)))]
115 METRIC_PACKET_COUNT.increment(&["sent"]);
116
117 counters.get_or_create(&packet.next_hop).record_message_sent();
118 tracing::trace!(peer = packet.next_hop.to_peerid_str(), "protocol message out");
119 Some((packet.next_hop.into(), packet.data))
120 }
121 Ok(Ok(Err(error))) => {
122 tracing::error!(%error, "outgoing packet could not be encoded");
123 None
124 }
125 Ok(Err(error)) => {
126 tracing::error!(%error, "parallel processing of the outgoing packet failed");
127 None
128 }
129 Err(error) => {
130 tracing::error!(%error, "timeout while processing the outgoing packet");
131 None
132 }
133 }
134 }
135 },
136 concurrency,
137 )
138 .filter_map(futures::future::ready)
139 .map(Ok)
140 .forward_to_timeout(wire_outgoing)
141 .in_current_span()
142 .await;
143
144 if let Err(error) = res {
145 tracing::error!(
146 task = "transport (protocol - msg egress)",
147 %error,
148 "long-running background task finished with error"
149 );
150 } else {
151 tracing::warn!(
152 task = "transport (protocol - msg egress)",
153 "long-running background task finished"
154 )
155 }
156}
157
/// Runs the message ingress pipeline until `wire_incoming` is exhausted or the
/// application sink fails.
///
/// The pipeline has three stages, the first two running up to `concurrency` items at once:
///
/// 1. **Decode** - each `(peer, data)` frame is decoded in a blocking CPU job bounded by
///    `PACKET_DECODING_TIMEOUT`. Failures are logged and the packet is dropped; for
///    processing errors and invalid tickets a `None` acknowledgement key is queued to
///    `ack_outgoing` (the ack egress pipeline turns `None` into a random acknowledgement),
///    and rejected tickets are additionally reported through `ticket_events`.
/// 2. **Dispatch** - depending on the decoded packet type:
///    - acknowledgement packets are counted in `counters` and handed to `ack_incoming`,
///    - final packets are acknowledged to the previous hop and passed on to stage 3,
///    - forwarded packets have their ticket stored in `ticket_proc`, are relayed through
///      `wire_outgoing` and then acknowledged to the previous hop.
/// 3. **Deliver** - payloads of final packets are parsed into `ApplicationData` and
///    forwarded to `app_incoming`; unparsable payloads are logged and dropped.
#[allow(clippy::too_many_arguments)]
async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
    (wire_outgoing, wire_incoming): (WOut, WIn),
    decoder: std::sync::Arc<D>,
    ticket_proc: std::sync::Arc<T>,
    ticket_events: TEvt,
    (ack_outgoing, ack_incoming): (AckOut, AckIn),
    app_incoming: AppIn,
    counters: super::counters::PeerProtocolCounterRegistry,
    concurrency: usize,
) where
    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
    WOut::Error: std::error::Error,
    D: PacketDecoder + Sync + Send + 'static,
    T: UnacknowledgedTicketProcessor + Send + 'static,
    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
    TEvt::Error: std::error::Error,
    AckIn: futures::Sink<(OffchainPublicKey, Vec<Acknowledgement>)> + Send + Unpin + Clone + 'static,
    AckIn::Error: std::error::Error,
    AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
    AckOut::Error: std::error::Error,
    AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
    AppInErr: std::error::Error,
{
    // The ack egress queue is needed by both stages: the decode stage (failure acks)
    // and the dispatch stage (success acks), so each closure gets its own handle.
    let ack_outgoing_success = ack_outgoing.clone();
    let ack_outgoing_failure = ack_outgoing;
    let ticket_proc_success = ticket_proc;

    let res = wire_incoming
        // Stage 1: decode the raw frame into an `IncomingPacket`.
        .then_concurrent(move |(peer, data)| {
            let decoder = decoder.clone();
            let mut ack_outgoing_failure = ack_outgoing_failure.clone();
            let mut ticket_events_reject = ticket_events.clone();

            tracing::trace!(%peer, "protocol message in");

            async move {
                // Result layers, outermost first: timeout, blocking-job failure, decoder failure.
                match hopr_utils::parallelize::cpu::spawn_fifo_blocking(move || decoder.decode(peer, data), "packet_decode")
                    .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
                    .await {
                    Ok(Ok(Ok(packet))) => {
                        tracing::trace!(%peer, ?packet, "successfully decoded incoming packet");
                        Some(packet)
                    },
                    Ok(Ok(Err(IncomingPacketError::Undecodable(error)))) => {
                        // Deliberately no acknowledgement: nothing is sent back for undecodable data.
                        tracing::trace!(%peer, %error, "not sending ack back on undecodable packet - possible adversarial behavior");

                        #[cfg(all(feature = "telemetry", not(test)))]
                        METRIC_PACKET_REJECTED_COUNT.increment(&["undecodable"]);

                        None
                    },
                    Ok(Ok(Err(IncomingPacketError::ProcessingError(sender, error)))) => {
                        tracing::error!(%peer, %error, "failed to process the decoded packet");
                        // The sender is known, so it still gets an acknowledgement, but without a key.
                        ack_outgoing_failure
                            .send((*sender, None))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        #[cfg(all(feature = "telemetry", not(test)))]
                        METRIC_PACKET_REJECTED_COUNT.increment(&["processing_error"]);

                        None
                    },
                    Ok(Ok(Err(IncomingPacketError::InvalidTicket(sender, error)))) => {
                        tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
                        // Report the rejected ticket first, then acknowledge without a key.
                        if let Err(error) = ticket_events_reject
                            .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
                            .await {
                            tracing::error!(%error, "failed to notify invalid ticket rejection");
                        }
                        ack_outgoing_failure
                            .send((*sender, None))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        #[cfg(all(feature = "telemetry", not(test)))]
                        {
                            METRIC_VALIDATION_ERRORS.increment(&[error.kind.as_ref()]);
                            METRIC_PACKET_REJECTED_COUNT.increment(&["invalid_ticket"]);
                        }

                        None
                    }
                    Ok(Err(error)) => {
                        // The blocking job itself failed; the packet is dropped without an acknowledgement.
                        tracing::error!(%error, "parallel processing of the incoming packet failed");
                        None
                    },
                    Err(_) => {
                        // Decoding did not finish within `PACKET_DECODING_TIMEOUT`.
                        tracing::error!(
                            %peer,
                            timeout_ms = PACKET_DECODING_TIMEOUT.as_millis() as u64,
                            "dropped incoming packet: decode timeout - check the 'hopr_rayon_queue_wait_seconds' metric for pool saturation"
                        );
                        #[cfg(all(feature = "telemetry", not(test)))]
                        {
                            METRIC_PACKET_DECODE_TIMEOUTS.increment();
                            METRIC_PACKET_REJECTED_COUNT.increment(&["timeout"]);
                        }

                        None
                    }
                }
            }.instrument(tracing::debug_span!("incoming_packet_decode", %peer))
        }, concurrency)
        // Keep only the successfully decoded packets.
        .filter_map(futures::future::ready)
        // Stage 2: dispatch by packet type. Only final packets yield `Some(..)`.
        .then_concurrent(move |packet| {
            // The three arms produce different future types, hence the nested `Either`.
            match packet {
                IncomingPacket::Acknowledgement(ack) => {
                    let mut ack_incoming = ack_incoming.clone();
                    let counters = counters.clone();
                    Either::Left(async move {
                        let IncomingAcknowledgementPacket { previous_hop, received_acks, .. } = *ack;
                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), num_acks = received_acks.len(), "incoming acknowledgements");
                        counters.get_or_create(&previous_hop).record_acks_received(received_acks.len() as u64);

                        ack_incoming
                            .send((previous_hop, received_acks))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
                            });

                        // Acknowledgements carry nothing for the application.
                        None
                    })
                }
                IncomingPacket::Final(final_packet) => {
                    let mut ack_outgoing_success = ack_outgoing_success.clone();
                    Either::Right(Either::Left(async move {
                        let IncomingFinalPacket {
                            previous_hop,
                            sender,
                            plain_text,
                            ack_key,
                            info,
                            ..
                        } = *final_packet;
                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "incoming final packet");

                        // Acknowledge receipt to the previous hop with the real acknowledgement key.
                        ack_outgoing_success
                            .send((previous_hop, Some(ack_key)))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        #[cfg(all(feature = "telemetry", not(test)))]
                        METRIC_PACKET_COUNT.increment(&["received"]);

                        Some((sender, plain_text, info))
                    }))
                }
                IncomingPacket::Forwarded(fwd_packet) => {
                    let ticket_proc = ticket_proc_success.clone();
                    let mut wire_outgoing = wire_outgoing.clone();
                    let mut ack_outgoing_success = ack_outgoing_success.clone();
                    let counters = counters.clone();
                    Either::Right(Either::Right(async move {
                        let IncomingForwardedPacket {
                            previous_hop,
                            next_hop,
                            data,
                            ack_key_prev_hop,
                            ack_challenge,
                            received_ticket,
                            ..
                        } = *fwd_packet;
                        // The ticket must be stored before relaying; if that fails the packet is
                        // neither forwarded nor acknowledged.
                        if let Err(error) = ticket_proc.insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket) {
                            tracing::error!(
                                previous_hop = previous_hop.to_peerid_str(),
                                next_hop = next_hop.to_peerid_str(),
                                %error,
                                "failed to insert unacknowledged ticket into the ticket processor"
                            );

                            #[cfg(all(feature = "telemetry", not(test)))]
                            METRIC_PACKET_REJECTED_COUNT.increment(&["unack_processing_error"]);

                            return None;
                        }

                        tracing::trace!(
                            previous_hop = previous_hop.to_peerid_str(),
                            next_hop = next_hop.to_peerid_str(),
                            "forwarding packet to the next hop"
                        );

                        // Relay first; the previous hop is acknowledged only after a successful send.
                        match wire_outgoing.send((next_hop.into(), data)).await {
                            Ok(()) => {
                                counters.get_or_create(&next_hop).record_message_sent();

                                #[cfg(all(feature = "telemetry", not(test)))]
                                METRIC_PACKET_COUNT.increment(&["forwarded"]);
                            }
                            Err(error) => {
                                tracing::error!(%error, "failed to forward a packet to the transport layer");
                                return None;
                            }
                        }

                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "acknowledging forwarded packet back");
                        ack_outgoing_success
                            .send((previous_hop, Some(ack_key_prev_hop)))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        // Forwarded packets carry nothing for the application.
                        None
                    }))
                }
            }
        }, concurrency)
        // Stage 3: parse the payload of final packets; unparsable payloads are logged and dropped.
        .filter_map(|maybe_data| futures::future::ready(
            maybe_data
                .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
                    .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
                    .ok()
                    .map(|data| (sender, ApplicationDataIn {
                        data,
                        packet_info: IncomingPacketInfo {
                            signals_from_sender: aux_info.packet_signals,
                            num_saved_surbs: aux_info.num_surbs,
                        }
                    })))
        ))
        .map(Ok)
        .forward_to_timeout(app_incoming)
        .in_current_span()
        .await;

    // Reaching this point means the wire stream ended or the application sink failed.
    if let Err(error) = res {
        tracing::error!(
            task = "transport (protocol - msg ingress)",
            %error,
            "long-running background task finished with error"
        );
    } else {
        tracing::warn!(
            task = "transport (protocol - msg ingress)",
            "long-running background task finished"
        )
    }
}
428
429async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
430 ack_outgoing: AckOut,
431 encoder: std::sync::Arc<E>,
432 cfg: AcknowledgementPipelineConfig,
433 packet_key: OffchainKeypair,
434 wire_outgoing: WOut,
435) where
436 AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
437 E: PacketEncoder + Sync + Send + 'static,
438 WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
439 WOut::Error: std::error::Error,
440{
441 ack_outgoing
442 .map(move |(destination, maybe_ack_key)| {
443 let packet_key = packet_key.clone();
444 let ack = maybe_ack_key
446 .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
447 .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key));
448 (destination, ack)
449 })
450 .buffer(futures_time::time::Duration::from(cfg.ack_buffer_interval))
451 .filter(|acks| futures::future::ready(!acks.is_empty()))
452 .scan(
460 halfbrown::HashMap::<OffchainPublicKey, Vec<VerifiedAcknowledgement>, ahash::RandomState>::with_capacity_and_hasher(
461 cfg.ack_grouping_capacity,
462 ahash::RandomState::default(),
463 ),
464 |groups, buffered_acks| {
465 for (dst, ack) in buffered_acks {
466 groups
467 .entry(dst)
468 .and_modify(|v| v.push(ack))
469 .or_insert_with(|| vec![ack]);
470 }
471 tracing::trace!(
472 num_groups = groups.len(),
473 num_acks = groups.values().map(|v| v.len()).sum::<usize>(),
474 "acknowledgements grouped"
475 );
476 let drained: Vec<_> = groups.drain().collect();
477 futures::future::ready(Some(futures::stream::iter(drained)))
478 },
479 )
480 .flatten()
481 .for_each_concurrent(
482 cfg.ack_output_concurrency.filter(|&n| n > 0).unwrap_or(DEFAULT_ACK_OUTPUT_CONCURRENCY),
483 move |(destination, acks)| {
484 let encoder = encoder.clone();
485 let mut wire_outgoing = wire_outgoing.clone();
486 async move {
487 let c = acks.chunks(MAX_ACKNOWLEDGEMENTS_BATCH_SIZE).map(|c| c.to_vec()).collect::<Vec<_>>();
490 for ack_chunk in c {
491 let encoder = encoder.clone();
492 match hopr_utils::parallelize::cpu::spawn_fifo_blocking(move || encoder.encode_acknowledgements(&ack_chunk, &destination), "ack_encode").await {
493 Ok(Ok(ack_packet)) => {
494 wire_outgoing
495 .feed((ack_packet.next_hop.into(), ack_packet.data))
496 .await
497 .unwrap_or_else(|error| {
498 tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
499 });
500 }
501 Ok(Err(error)) => tracing::error!(%error, "failed to encode acknowledgements"),
502 Err(error) => tracing::error!(%error, "parallel processing of the outgoing acknowledgements failed"),
503 }
504 }
505 if let Err(error) = wire_outgoing.flush().await {
506 tracing::error!(%error, "failed to flush acknowledgements batch to the transport layer");
507 }
508 tracing::trace!("acknowledgements out");
509 }.instrument(tracing::debug_span!("outgoing_ack_batch", peer = destination.to_peerid_str()))
510 }
511 )
512 .in_current_span()
513 .await;
514
515 tracing::warn!(
516 task = "transport (protocol - ack outgoing)",
517 "long-running background task finished"
518 );
519}
520
521async fn start_exit_incoming_ack_pipeline<AckIn>(ack_incoming: AckIn)
526where
527 AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
528{
529 ack_incoming
530 .for_each(move |(peer, acks)| {
531 tracing::trace!(%peer, num = acks.len(), "received acknowledgements (drained, not processed)");
533 futures::future::ready(())
534 })
535 .in_current_span()
536 .await;
537
538 tracing::warn!(
539 task = "transport (protocol - ticket acknowledgement drain)",
540 "long-running background task finished"
541 );
542}
543
544async fn start_relay_incoming_ack_pipeline<AckIn, T, TEvt>(
545 ack_incoming: AckIn,
546 ticket_events: TEvt,
547 ticket_proc: std::sync::Arc<T>,
548 concurrency: usize,
549) where
550 AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
551 T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
552 TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
553 TEvt::Error: std::error::Error,
554{
555 ack_incoming
556 .for_each_concurrent(concurrency, move |(peer, acks)| {
557 let ticket_proc = ticket_proc.clone();
558 let mut ticket_evt = ticket_events.clone();
559 async move {
560 tracing::trace!(num = acks.len(), "received acknowledgements");
561 match hopr_utils::parallelize::cpu::spawn_fifo_blocking(
562 move || ticket_proc.acknowledge_tickets(peer, acks),
563 "ack_decode",
564 )
565 .await
566 {
567 Ok(Ok(resolutions)) if !resolutions.is_empty() => {
568 let resolutions_iter = resolutions.into_iter().filter_map(|resolution| match resolution {
569 ResolvedAcknowledgement::RelayingWin(redeemable_ticket) => {
570 tracing::trace!("received ack for a winning ticket");
571 Some(Ok(TicketEvent::WinningTicket(redeemable_ticket)))
572 }
573 ResolvedAcknowledgement::RelayingLoss(_) => {
574 tracing::trace!("received ack for a losing ticket");
576 None
577 }
578 });
579
580 if let Err(error) = ticket_evt.send_all(&mut futures::stream::iter(resolutions_iter)).await {
582 tracing::error!(%error, "failed to notify ticket resolutions");
583 }
584 }
585 Ok(Ok(_)) => {
586 tracing::debug!("acknowledgement batch could not acknowledge any ticket");
587 }
588 Ok(Err(TicketAcknowledgementError::UnexpectedAcknowledgement)) => {
589 tracing::trace!("received unexpected acknowledgement");
592 }
593 Ok(Err(error)) => {
594 tracing::error!(%error, "failed to acknowledge ticket");
595 }
596 Err(error) => {
597 tracing::error!(%error, "parallel processing of the incoming acknowledgements failed")
598 }
599 }
600 }
601 .instrument(tracing::debug_span!("incoming_ack_batch", peer = peer.to_peerid_str()))
602 })
603 .in_current_span()
604 .await;
605
606 tracing::warn!(
607 task = "transport (protocol - ticket acknowledgement)",
608 "long-running background task finished"
609 );
610}
/// Role of the node, which decides how `run_packet_pipeline_inner` treats received
/// acknowledgements.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum NodeType {
    /// Received acknowledgements are resolved against the ticket processor and winning
    /// tickets are published as ticket events.
    Relay,
    /// No acknowledgement ingress task is spawned; the acknowledgement receiver is dropped.
    Entry,
    /// Received acknowledgements are drained and discarded without ticket processing.
    Exit,
}
625
626#[derive(Debug, Default, Copy, Clone)]
631#[doc(hidden)]
632pub struct NoopTicketProcessor;
633
634impl UnacknowledgedTicketProcessor for NoopTicketProcessor {
635 type Error = std::convert::Infallible;
636
637 #[inline]
638 fn insert_unacknowledged_ticket(
639 &self,
640 _: &OffchainPublicKey,
641 _: HalfKeyChallenge,
642 _: UnacknowledgedTicket,
643 ) -> Result<(), Self::Error> {
644 Ok(())
645 }
646
647 #[inline]
648 fn acknowledge_tickets(
649 &self,
650 _: OffchainPublicKey,
651 _: Vec<Acknowledgement>,
652 ) -> Result<Vec<ResolvedAcknowledgement>, TicketAcknowledgementError<Self::Error>> {
653 Ok(Vec::with_capacity(0))
654 }
655}
656#[allow(clippy::too_many_arguments)]
659#[tracing::instrument(skip_all, level = "trace", fields(me = packet_key.public().to_peerid_str()))]
660pub(super) fn run_packet_pipeline_inner<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
661 node_type: NodeType,
662 packet_key: OffchainKeypair,
663 wire_msg: (WOut, WIn),
664 codec: (C, D),
665 ticket_proc: T,
666 ticket_events: TEvt,
667 cfg: PacketPipelineConfig,
668 api: (AppOut, AppIn),
669 counters: PeerProtocolCounterRegistry,
670) -> AbortableList<PacketPipelineProcesses>
671where
672 WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
673 WOut::Error: std::error::Error,
674 WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
675 C: PacketEncoder + Sync + Send + 'static,
676 D: PacketDecoder + Sync + Send + 'static,
677 T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
678 TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
679 TEvt::Error: std::error::Error,
680 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
681 AppOut::Error: std::error::Error,
682 AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
683{
684 let mut processes = AbortableList::default();
685
686 #[cfg(all(feature = "telemetry", not(test)))]
687 {
688 lazy_static::initialize(&METRIC_PACKET_COUNT);
690 lazy_static::initialize(&METRIC_PACKET_DECODE_TIMEOUTS);
691 lazy_static::initialize(&METRIC_PACKET_REJECTED_COUNT);
692 lazy_static::initialize(&METRIC_VALIDATION_ERRORS);
693 }
694
695 let (outgoing_ack_tx, outgoing_ack_rx) =
696 futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(cfg.ack_config.ack_out_buffer_size);
697
698 let (incoming_ack_tx, incoming_ack_rx) = futures::channel::mpsc::channel::<(OffchainPublicKey, Vec<Acknowledgement>)>(
699 cfg.ack_config.ticket_ack_buffer_size,
700 );
701
702 let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
705 let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
706 let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
707 let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
708 let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);
709
710 let encoder = std::sync::Arc::new(codec.0);
711 let decoder = std::sync::Arc::new(codec.1);
712 let ticket_proc = std::sync::Arc::new(ticket_proc);
713
714 let avail_concurrency = std::thread::available_parallelism()
717 .ok()
718 .map(|n| n.get())
719 .unwrap_or(1)
720 .max(1)
721 * 8;
722
723 let output_concurrency = cfg.output_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);
724 let input_concurrency = cfg.input_concurrency.filter(|&n| n > 0).unwrap_or(avail_concurrency);
725
726 processes.insert(
727 PacketPipelineProcesses::MsgOut,
728 hopr_utils::spawn_as_abortable!(
729 start_outgoing_packet_pipeline(
730 app_in,
731 encoder.clone(),
732 wire_out.clone(),
733 counters.clone(),
734 output_concurrency
735 )
736 .in_current_span()
737 ),
738 );
739
740 processes.insert(
741 PacketPipelineProcesses::MsgIn,
742 hopr_utils::spawn_as_abortable!(
743 start_incoming_packet_pipeline(
744 (wire_out.clone(), wire_in),
745 decoder,
746 ticket_proc.clone(),
747 ticket_events.clone(),
748 (outgoing_ack_tx, incoming_ack_tx),
749 app_out,
750 counters.clone(),
751 input_concurrency,
752 )
753 .in_current_span()
754 ),
755 );
756
757 processes.insert(
758 PacketPipelineProcesses::AckOut,
759 hopr_utils::spawn_as_abortable!(
760 start_outgoing_ack_pipeline(outgoing_ack_rx, encoder, cfg.ack_config, packet_key.clone(), wire_out,)
761 .in_current_span()
762 ),
763 );
764
765 let ack_input_concurrency = cfg
766 .ack_config
767 .ack_input_concurrency
768 .filter(|&n| n > 0)
769 .unwrap_or(DEFAULT_ACK_INPUT_CONCURRENCY);
770
771 match node_type {
772 NodeType::Relay => {
773 processes.insert(
774 PacketPipelineProcesses::AckIn,
775 hopr_utils::spawn_as_abortable!(
776 start_relay_incoming_ack_pipeline(
777 incoming_ack_rx,
778 ticket_events,
779 ticket_proc,
780 ack_input_concurrency
781 )
782 .in_current_span()
783 ),
784 );
785 }
786 NodeType::Exit => {
787 let _ = (ticket_events, ticket_proc, ack_input_concurrency);
791 processes.insert(
792 PacketPipelineProcesses::AckIn,
793 hopr_utils::spawn_as_abortable!(start_exit_incoming_ack_pipeline(incoming_ack_rx).in_current_span()),
794 );
795 }
796 NodeType::Entry => {
797 let _ = (ticket_events, ticket_proc, ack_input_concurrency, incoming_ack_rx);
801 }
802 }
803
804 processes
805}