hopr_transport_protocol/
lib.rs

1//! Collection of objects and functionality allowing building of p2p or stream protocols for the higher business logic
2//! layers.
3//!
4//! ## Contents
5//!
6//! Supported protocol configurations:
7//!
8//! - `mix`
9//! - `ack`
10//! - `heartbeat`
11//! - `ticket_aggregation`
12//!
13//! Supported protocol processors:
14//!
15//! - `ticket_aggregation`
16//!
17//! ### `ticket_aggregation`
18//!
//! The ticket aggregation processing mechanism is responsible for ingesting ticket-aggregation-related requests:
20//!
21//! - `Receive(PeerId, U)`,
22//! - `Reply(PeerId, std::result::Result<Ticket, String>, T)`,
23//! - `Send(PeerId, Vec<AcknowledgedTicket>, TicketAggregationFinalizer)`,
24//!
25//! where `U` is the type of an aggregated ticket extractable (`ResponseChannel<Result<Ticket, String>>`) and `T`
26//! represents a network negotiated identifier (`RequestId`).
27//!
28//! In broader context the protocol flow is as follows:
29//!
30//! 1. requesting ticket aggregation
31//!
32//!    - the peer A desires to aggregate tickets, collects the tickets into a data collection and sends a request
33//!      containing the collection to aggregate `Vec<AcknowledgedTicket>` to peer B using the `Send` mechanism
34//!
35//! 2. responding to ticket aggregation
36//!
37//!    - peer B obtains the request from peer A, performs the ticket aggregation and returns a result of that operation
38//!      in the form of `std::result::Result<Ticket, String>` using the `Reply` mechanism
39//!
//! 3. accepting the aggregated ticket
//!
//!    - peer A receives the aggregated ticket using the `Receive` mechanism
42//!
43//! Furthermore, apart from the basic positive case scenario, standard mechanics of protocol communication apply:
44//!
45//! - the requesting side can time out, if the responding side takes too long to provide an aggregated ticket, in which
46//!   case the ticket is not considered aggregated, even if eventually an aggregated ticket is delivered
47//! - the responder can fail to aggregate tickets in which case it replies with an error string describing the failure
48//!   reason and it is the requester's responsibility to handle the negative case as well
49//!   - in the absence of response, the requester will time out
50
51/// Coder and decoder for the transport binary protocol layer
52mod codec;
53
54/// Configuration of the protocol components.
55pub mod config;
56/// Errors produced by the crate.
57pub mod errors;
58
59// protocols
60/// `heartbeat` p2p protocol
61pub mod heartbeat;
62/// processor for the protocol
63pub mod processor;
64
65/// Stream processing utilities
66pub mod stream;
67
68pub mod timer;
69
70/// Allows capturing dissected HOPR packets before they are processed by the transport.
71///
72/// Requires the `capture` feature to be enabled.
73#[cfg(feature = "capture")]
74mod capture;
75
76use std::{collections::HashMap, time::Duration};
77
78use futures::{FutureExt, SinkExt, StreamExt};
79use futures_time::future::FutureExt as FuturesTimeExt;
80use hopr_api::{
81    chain::{ChainKeyOperations, ChainReadChannelOperations, ChainValues},
82    db::{HoprDbProtocolOperations, IncomingPacket},
83};
84use hopr_async_runtime::spawn_as_abortable;
85use hopr_crypto_types::types::{HalfKey, OffchainPublicKey};
86use hopr_internal_types::{
87    prelude::{Acknowledgement, HoprPseudonym},
88    protocol::VerifiedAcknowledgement,
89};
90use hopr_network_types::prelude::ResolvedTransportRouting;
91use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, IncomingPacketInfo};
92use hopr_transport_bloom::TagBloomFilter;
93use hopr_transport_identity::{Multiaddr, PeerId};
94use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
95use tracing::Instrument;
96
97use crate::processor::{PacketUnwrapping, PacketWrapping};
98pub use crate::timer::execute_on_tick;
99
100const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
101const SLOW_OP: std::time::Duration = std::time::Duration::from_millis(150);
102const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
103
104pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
105pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";
106
107pub const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
108pub const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
109
110pub const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
111pub const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
112
// Prometheus metrics of the packet pipeline; compiled only with the `prometheus` feature
// enabled and outside of tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Processed packets, partitioned by the `type` label (`sent`, `received`, `forwarded`).
    static ref METRIC_PACKET_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"],
    )
    .unwrap();

    /// Packets rejected by the replay check in the ingress pipeline.
    static ref METRIC_REPLAYED_PACKET_COUNT: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    )
    .unwrap();
}
126
127#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
128pub enum ProtocolProcesses {
129    #[strum(to_string = "HOPR [msg] - ingress")]
130    MsgIn,
131    #[strum(to_string = "HOPR [msg] - egress")]
132    MsgOut,
133    #[strum(to_string = "HOPR [ack] - egress")]
134    AckOut,
135    #[strum(to_string = "HOPR [ack] - ingress - ticket acknowledgement")]
136    TicketAck,
137    #[strum(to_string = "HOPR [msg] - mixer")]
138    Mixer,
139    #[cfg(feature = "capture")]
140    #[strum(to_string = "packet capture")]
141    Capture,
142}
143/// Processed indexer generated events.
144#[derive(Debug, Clone)]
145pub enum PeerDiscovery {
146    Allow(PeerId),
147    Ban(PeerId),
148    Announce(PeerId, Vec<Multiaddr>),
149}
150
/// Returns the trailing `Ticket::SIZE` bytes of `raw_packet`.
///
/// An empty slice is returned when `raw_packet` is shorter than `Ticket::SIZE`.
#[cfg(feature = "capture")]
fn inspect_ticket_data_in_packet(raw_packet: &[u8]) -> &[u8] {
    use hopr_primitive_types::traits::BytesEncodable;

    let ticket_len = hopr_internal_types::tickets::Ticket::SIZE;

    // `checked_sub` yields `None` exactly when the packet is shorter than a ticket
    match raw_packet.len().checked_sub(ticket_len) {
        Some(ticket_start) => &raw_packet[ticket_start..],
        None => &[],
    }
}
160
161/// Run all processes responsible for handling the msg and acknowledgment protocols.
162///
163/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
164/// overlay on top of the `wire_msg` Stream or Sink.
165#[allow(clippy::too_many_arguments)]
166pub async fn run_msg_ack_protocol<Db, R>(
167    packet_cfg: processor::PacketInteractionConfig,
168    db: Db,
169    resolver: R,
170    wire_msg: (
171        impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
172        impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
173    ),
174    api: (
175        impl futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
176        impl futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
177    ),
178) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
179where
180    Db: HoprDbProtocolOperations + Clone + Send + Sync + 'static,
181    R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Clone + Send + Sync + 'static,
182{
183    let me = packet_cfg.packet_keypair.clone();
184
185    #[cfg(feature = "capture")]
186    let me_pub = *hopr_crypto_types::keypairs::Keypair::public(&me);
187
188    let mut processes = HashMap::new();
189
190    #[cfg(all(feature = "prometheus", not(test)))]
191    {
192        // Initialize the lazy statics here
193        lazy_static::initialize(&METRIC_PACKET_COUNT);
194        lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
195    }
196
197    #[cfg(feature = "capture")]
198    let capture = {
199        use std::{path::PathBuf, str::FromStr};
200        let writer: Box<dyn capture::PacketWriter + Send + 'static> =
201            if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
202                if let Ok(udp_writer) = std::net::SocketAddr::from_str(&desc)
203                    .map_err(std::io::Error::other)
204                    .and_then(capture::UdpPacketDump::new)
205                {
206                    tracing::warn!("udp packet capture initialized to {desc}");
207                    Box::new(udp_writer)
208                } else if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
209                    tracing::info!("pcap file packet capture initialized to {desc}");
210                    Box::new(pcap_writer)
211                } else {
212                    tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
213                    Box::new(capture::NullWriter)
214                }
215            } else {
216                tracing::warn!("no packet capture specified");
217                Box::new(capture::NullWriter)
218            };
219        let start_capturing_path: Option<PathBuf> = std::env::var("HOPR_CAPTURE_PATH_TRIGGER").ok().map(PathBuf::from);
220        if let Some(ref path) = start_capturing_path {
221            tracing::info!(
222                "To start capturing packets, create it by running 'touch {}'",
223                path.display()
224            );
225        } else {
226            tracing::warn!("The env var 'HOPR_CAPTURE_PATH_TRIGGER' is not set, packet capture won't start");
227        };
228        let (capture, ah) = capture::packet_capture_channel(writer, start_capturing_path);
229        processes.insert(ProtocolProcesses::Capture, ah);
230        capture
231    };
232
233    let tbf = std::sync::Arc::new(parking_lot::Mutex::new(TagBloomFilter::default()));
234
235    let (ticket_ack_tx, ticket_ack_rx) =
236        futures::channel::mpsc::channel::<(Acknowledgement, OffchainPublicKey)>(TICKET_ACK_BUFFER_SIZE);
237
238    let db_clone = db.clone();
239    let resolver_clone = resolver.clone();
240    processes.insert(
241        ProtocolProcesses::TicketAck,
242        spawn_as_abortable!(ticket_ack_rx
243            .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(ack, sender)| {
244                let db = db_clone.clone();
245                let resolver = resolver_clone.clone();
246                async move {
247                    if let Ok(verified) = hopr_parallelize::cpu::spawn_blocking(move || ack.verify(&sender)).await {
248                        tracing::trace!(%sender, "received a valid acknowledgement");
249                            match db.handle_acknowledgement(verified, &resolver).await {
250                                Ok(_) => tracing::trace!(%sender, "successfully processed a known acknowledgement"),
251                                // Eventually, we do not care here if the acknowledgement does not belong to any
252                                // unacknowledged packets.
253                                Err(error) => tracing::trace!(%sender, %error, "valid acknowledgement is unknown or error occurred while processing it"),
254                            }
255                    } else {
256                        tracing::error!(%sender, "failed to verify signature on acknowledgement");
257                    }
258                }
259            })
260            .inspect(|_| tracing::warn!(task = "transport (protocol - ticket acknowledgement)", "long-running background task finished")))
261    );
262
263    let (ack_out_tx, ack_out_rx) =
264        futures::channel::mpsc::channel::<(Option<HalfKey>, OffchainPublicKey)>(ACK_OUT_BUFFER_SIZE);
265
266    #[cfg(feature = "capture")]
267    let capture_clone = capture.clone();
268
269    let db_clone = db.clone();
270    let resolver_clone = resolver.clone();
271    let me_clone = me.clone();
272    let msg_to_send_tx = wire_msg.0.clone();
273    processes.insert(
274        ProtocolProcesses::AckOut,
275        spawn_as_abortable!(
276            ack_out_rx
277                .for_each_concurrent(
278                    NUM_CONCURRENT_ACK_OUT_PROCESSING,
279                    move |(maybe_ack_key, destination)| {
280                        let db = db_clone.clone();
281                        let resolver = resolver_clone.clone();
282                        let me = me_clone.clone();
283                        let mut msg_to_send_tx_clone = msg_to_send_tx.clone();
284
285                        #[cfg(feature = "capture")]
286                        let mut capture = capture_clone.clone();
287                        async move {
288                            #[cfg(feature = "capture")]
289                            let (is_random, me_pub) = (
290                                maybe_ack_key.is_none(),
291                                *hopr_crypto_types::keypairs::Keypair::public(&me),
292                            );
293
294                            // Sign acknowledgement with the given half-key or generate a signed random one
295                            let ack = hopr_parallelize::cpu::spawn_blocking(move || {
296                                maybe_ack_key
297                                    .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &me))
298                                    .unwrap_or_else(|| VerifiedAcknowledgement::random(&me))
299                            })
300                            .await;
301
302                            #[cfg(feature = "capture")]
303                            let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingAck {
304                                me: me_pub,
305                                ack,
306                                is_random,
307                                next_hop: destination,
308                            }
309                            .into();
310
311                            match db
312                                .to_send_no_ack(ack.leak().as_ref().into(), destination, &resolver)
313                                .await
314                            {
315                                Ok(ack_packet) => {
316                                    msg_to_send_tx_clone
317                                        .send((ack_packet.next_hop.into(), ack_packet.data))
318                                        .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
319                                        .await
320                                        .unwrap_or_else(|_| {
321                                            tracing::error!(
322                                                "failed to forward an acknowledgement to the transport layer: timeout"
323                                            );
324                                            Ok(())
325                                        })
326                                        .unwrap_or_else(|_| {
327                                            tracing::error!(
328                                                "failed to forward an acknowledgement to the transport layer"
329                                            );
330                                        });
331
332                                    #[cfg(feature = "capture")]
333                                    let _ = capture.try_send(captured_packet);
334                                }
335                                Err(error) => tracing::error!(%error, "failed to create ack packet"),
336                            }
337                        }
338                    }
339                )
340                .inspect(|_| tracing::warn!(
341                    task = "transport (protocol - ack outgoing)",
342                    "long-running background task finished"
343                ))
344        ),
345    );
346
347    let msg_processor_read = processor::PacketProcessor::new(db.clone(), resolver.clone(), packet_cfg);
348    let msg_processor_write = msg_processor_read.clone();
349
350    #[cfg(feature = "capture")]
351    let capture_clone = capture.clone();
352
353    let msg_to_send_tx = wire_msg.0.clone();
354    processes.insert(
355        ProtocolProcesses::MsgOut,
356        spawn_as_abortable!(async move {
357            let _neverending = api
358                .1
359                .then_concurrent(|(routing, data)| {
360                    let msg_processor = msg_processor_write.clone();
361
362                    #[cfg(feature = "capture")]
363                    let (mut capture_clone, data_clone, num_surbs) =
364                        (capture_clone.clone(), data.clone(), routing.count_return_paths() as u8);
365
366                    async move {
367                        match PacketWrapping::send(&msg_processor, data, routing).await {
368                            Ok(v) => {
369                                #[cfg(all(feature = "prometheus", not(test)))]
370                                {
371                                    METRIC_PACKET_COUNT.increment(&["sent"]);
372                                }
373
374                                #[cfg(feature = "capture")]
375                                let _ = capture_clone.try_send(
376                                    capture::PacketBeforeTransit::OutgoingPacket {
377                                        me: me_pub,
378                                        next_hop: v.next_hop,
379                                        num_surbs,
380                                        is_forwarded: false,
381                                        data: data_clone.data.to_bytes().into_vec().into(),
382                                        ack_challenge: v.ack_challenge.as_ref().into(),
383                                        signals: data_clone.packet_info.unwrap_or_default().signals_to_destination,
384                                        ticket: inspect_ticket_data_in_packet(&v.data).into(),
385                                    }
386                                    .into(),
387                                );
388
389                                Some((v.next_hop.into(), v.data))
390                            }
391                            Err(error) => {
392                                tracing::error!(%error, "packet could not be wrapped for sending");
393                                None
394                            }
395                        }
396                    }
397                })
398                .filter_map(futures::future::ready)
399                .inspect(|(peer, _)| tracing::trace!(%peer, "protocol message out"))
400                .map(Ok)
401                .forward(msg_to_send_tx)
402                .instrument(tracing::trace_span!("msg protocol processing - egress"))
403                .inspect(|_| {
404                    tracing::warn!(
405                        task = "transport (protocol - msg egress)",
406                        "long-running background task finished"
407                    )
408                })
409                .await;
410        }),
411    );
412
413    let ack_out_tx_clone_1 = ack_out_tx.clone();
414    let ack_out_tx_clone_2 = ack_out_tx.clone();
415
416    #[cfg(feature = "capture")]
417    let capture_clone = capture.clone();
418
419    // Create a cache for a CPU-intensive conversion PeerId -> OffchainPublicKey
420    let peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey> = moka::future::Cache::builder()
421        .time_to_idle(Duration::from_secs(600))
422        .max_capacity(100_000)
423        .build();
424
425    processes.insert(
426        ProtocolProcesses::MsgIn,
427        spawn_as_abortable!(async move {
428            let _neverending = wire_msg
429                .1
430                .then_concurrent(move |(peer, data)| {
431                    let msg_processor = msg_processor_read.clone();
432                    let mut ack_out_tx = ack_out_tx_clone_1.clone();
433                    let peer_id_key_cache = peer_id_cache.clone();
434
435                    tracing::trace!(%peer, "protocol message in");
436
437                    #[cfg(feature = "capture")]
438                    let (mut capture_clone, ticket_data_clone) = (
439                        capture.clone(),
440                        inspect_ticket_data_in_packet(&data).to_vec()
441                    );
442
443                    async move {
444                        // Try to retrieve the peer's public key from the cache or compute it if it does not exist yet
445                        let peer_key = match peer_id_key_cache
446                                .try_get_with_by_ref(&peer, hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&peer)))
447                                .await {
448                            Ok(peer) => peer,
449                            Err(error) => {
450                                // There absolutely nothing we can do when the peer id is unparseable (e.g., non-ed25519 based)
451                                tracing::error!(%peer, %error, "dropping packet - cannot convert peer id");
452                                return None;
453                            }
454                        };
455
456                        let now = std::time::Instant::now();
457                        let res = msg_processor.recv(peer_key, data).await;
458                        let elapsed = now.elapsed();
459                        if elapsed > SLOW_OP {
460                            tracing::warn!(%peer, ?elapsed, "msg_processor.recv took too long");
461                        }
462
463                        // If there was an error caused by interpretation of the packet data,
464                        // we must send a random acknowledgement back.
465                        if let Err(error) = &res {
466                            tracing::error!(%peer, %error, "failed to process the received packet");
467
468                            // Send random signed acknowledgement to give feedback to the sender
469                            if error.is_undecodable() {
470                                // Do not send an ack back if the packet could not be decoded at all
471                                // 
472                                // Potentially adversarial behavior
473                                tracing::trace!(%peer, "not sending ack back on undecodable packet - possible adversarial behavior");
474                            } else {
475                                ack_out_tx
476                                    .send((None, peer_key))
477                                    .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
478                                    .await
479                                    .unwrap_or_else(|_| {
480                                        tracing::error!("failed to send ack to the egress queue: timeout");
481                                        Ok(())
482                                    })
483                                    .unwrap_or_else(|_| {
484                                        tracing::error!("failed to send ack to the egress queue");
485                                    });
486                            }
487                        }
488
489                        #[cfg(feature = "capture")]
490                        if let Ok(packet) = &res {
491                            let _ = capture_clone.try_send(capture::PacketBeforeTransit::IncomingPacket {
492                                    me: me_pub,
493                                    packet,
494                                    ticket: ticket_data_clone.into(),
495                                }.into()
496                            );
497                        }
498
499                        res.ok()
500                    }
501                })
502                .filter_map(move |maybe_packet| {
503                    let tbf = tbf.clone();
504
505                    futures::future::ready(
506                        if let Some(packet) = maybe_packet {
507                            match packet {
508                                IncomingPacket::Acknowledgement { packet_tag, previous_hop, .. } |
509                                IncomingPacket::Final { packet_tag, previous_hop,.. } |
510                                IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
511                                    // This operation has run-time of ~10 nanoseconds,
512                                    // and therefore does not need to be invoked via spawn_blocking
513                                    if tbf.lock().check_and_set(&packet_tag) {
514                                        tracing::warn!(%previous_hop, "replayed packet received");
515
516                                        #[cfg(all(feature = "prometheus", not(test)))]
517                                        METRIC_REPLAYED_PACKET_COUNT.increment();
518
519                                        None
520                                    } else {
521                                        Some(packet)
522                                    }
523                                }
524                            }
525                        } else {
526                            tracing::trace!("received empty packet");
527                            None
528                        }
529                    )
530                })
531                .then_concurrent(move |packet| {
532                    let mut msg_to_send_tx = wire_msg.0.clone();
533
534                    #[cfg(feature = "capture")]
535                    let mut capture_clone = capture_clone.clone();
536
537                    let mut ticket_ack_tx_clone = ticket_ack_tx.clone();
538                    let mut ack_out_tx = ack_out_tx_clone_2.clone();
539                    async move {
540
541                    match packet {
542                        IncomingPacket::Acknowledgement {
543                            previous_hop,
544                            ack,
545                            ..
546                        } => {
547                            tracing::trace!(%previous_hop, "acknowledging ticket using received ack");
548                            ticket_ack_tx_clone
549                                .send((ack, previous_hop))
550                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
551                                .await
552                                .unwrap_or_else(|_| {
553                                    tracing::error!("failed dispatching received acknowledgement to the ticket ack queue: timeout");
554                                    Ok(())
555                                })
556                                .unwrap_or_else(|_| {
557                                    tracing::error!("failed dispatching received acknowledgement to the ticket ack queue");
558                                });
559
560                            // We do not acknowledge back acknowledgements.
561                            None
562                        },
563                        IncomingPacket::Final {
564                            previous_hop,
565                            sender,
566                            plain_text,
567                            ack_key,
568                            info,
569                            ..
570                        } => {
571                            // Send acknowledgement back
572                            ack_out_tx
573                                .send((Some(ack_key), previous_hop))
574                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
575                                .await
576                                .unwrap_or_else(|_| {
577                                    tracing::error!("failed to send ack to the egress queue: timeout");
578                                    Ok(())
579                                })
580                                .unwrap_or_else(|_| {
581                                    tracing::error!("failed to send ack to the egress queue");
582                                });
583
584                            #[cfg(all(feature = "prometheus", not(test)))]
585                            {
586                                METRIC_PACKET_COUNT.increment(&["received"]);
587                            }
588
589                            Some((sender, plain_text, info))
590                        }
591                        IncomingPacket::Forwarded {
592                            previous_hop,
593                            next_hop,
594                            data,
595                            ack_key,
596                            ..
597                        } => {
598                            // First, relay the packet to the next hop
599                            tracing::trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
600
601                            #[cfg(feature = "capture")]
602                            let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingPacket {
603                                me: me_pub,
604                                next_hop,
605                                num_surbs: 0,
606                                is_forwarded: true,
607                                data: data.as_ref().into(),
608                                ack_challenge: Default::default(),
609                                signals: None.into(),
610                                ticket: inspect_ticket_data_in_packet(data.as_ref()).into()
611                            }.into();
612
613                            msg_to_send_tx
614                                .send((next_hop.into(), data))
615                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
616                                .await
617                                .unwrap_or_else(|_| {
618                                    tracing::error!("failed to forward a packet to the transport layer: timeout");
619                                    Ok(())
620                                })
621                                .unwrap_or_else(|_| {
622                                    tracing::error!("failed to forward a packet to the transport layer");
623                                });
624
625                            #[cfg(all(feature = "prometheus", not(test)))]
626                            {
627                                METRIC_PACKET_COUNT.increment(&["forwarded"]);
628                            }
629
630                            #[cfg(feature = "capture")]
631                            let _ = capture_clone.try_send(captured_packet);
632
633                             // Send acknowledgement back
634                            tracing::trace!(%previous_hop, "acknowledging forwarded packet back");
635                            ack_out_tx
636                                .send((Some(ack_key), previous_hop))
637                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
638                                .await
639                                .unwrap_or_else(|_| {
640                                    tracing::error!("failed to send ack to the egress queue: timeout");
641                                    Ok(())
642                                })
643                                .unwrap_or_else(|_| {
644                                    tracing::error!("failed to send ack to the egress queue");
645                                });
646
647                            None
648                        }
649                    }
650                }})
651                .filter_map(|maybe_data| futures::future::ready(
652                    // Create the ApplicationDataIn data structure for incoming data
653                    maybe_data
654                        .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
655                            .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
656                            .ok()
657                            .map(|data| (sender, ApplicationDataIn {
658                                data,
659                                packet_info: IncomingPacketInfo {
660                                    signals_from_sender: aux_info.packet_signals,
661                                    num_saved_surbs: aux_info.num_surbs,
662                                }
663                            })))
664                ))
665                .map(Ok)
666                .forward(api.0)
667                .instrument(tracing::trace_span!("msg protocol processing - ingress"))
668                .inspect(|_| tracing::warn!(task = "transport (protocol - msg ingress)", "long-running background task finished"))
669                .await;
670        }),
671    );
672
673    processes
674}