hopr_transport_protocol/
lib.rs

//! Collection of objects and functionality allowing building of p2p or stream protocols for the higher business logic
//! layers.
//!
//! ## Contents
//!
//! Supported protocol configurations:
//!
//! - `mix`
//! - `ack`
//! - `heartbeat`
//! - `ticket_aggregation`
//!
//! Supported protocol processors:
//!
//! - `ticket_aggregation`
//!
//! ### `ticket_aggregation`
//!
//! The ticket aggregation processing mechanism is responsible for ingesting the ticket aggregation related requests:
//!
//! - `Receive(PeerId, U)`,
//! - `Reply(PeerId, std::result::Result<Ticket, String>, T)`,
//! - `Send(PeerId, Vec<AcknowledgedTicket>, TicketAggregationFinalizer)`,
//!
//! where `U` is the type of an aggregated ticket extractable (`ResponseChannel<Result<Ticket, String>>`) and `T`
//! represents a network negotiated identifier (`RequestId`).
//!
//! In a broader context, the protocol flow is as follows:
//!
//! 1. requesting ticket aggregation
//!
//!    - peer A desires to aggregate tickets, collects the tickets into a data collection and sends a request
//!      containing the collection to aggregate `Vec<AcknowledgedTicket>` to peer B using the `Send` mechanism
//!
//! 2. responding to ticket aggregation
//!
//!    - peer B obtains the request from peer A, performs the ticket aggregation and returns a result of that operation
//!      in the form of `std::result::Result<Ticket, String>` using the `Reply` mechanism
//!
//! 3. accepting the aggregated ticket
//!
//!    - peer A receives the aggregated ticket using the `Receive` mechanism
//!
//! Furthermore, apart from the basic positive case scenario, standard mechanics of protocol communication apply:
//!
//! - the requesting side can time out if the responding side takes too long to provide an aggregated ticket, in which
//!   case the ticket is not considered aggregated, even if an aggregated ticket is eventually delivered
//! - the responder can fail to aggregate tickets, in which case it replies with an error string describing the failure
//!   reason, and it is the requester's responsibility to handle the negative case as well
//!   - in the absence of a response, the requester will time out

51/// Coder and decoder for the transport binary protocol layer
52mod codec;
53
54/// Configuration of the protocol components.
55pub mod config;
56/// Errors produced by the crate.
57pub mod errors;
58
59// protocols
60/// `heartbeat` p2p protocol
61pub mod heartbeat;
62/// processor for the protocol
63pub mod processor;
64
65/// Stream processing utilities
66pub mod stream;
67
68/// Allows capturing dissected HOPR packets before they are processed by the transport.
69///
70/// Requires the `capture` feature to be enabled.
71#[cfg(feature = "capture")]
72mod capture;
73
use std::{collections::HashMap, time::Duration};

use futures::{FutureExt, SinkExt, StreamExt};
use futures_time::future::FutureExt as FuturesTimeExt;
use hopr_api::{
    chain::{ChainKeyOperations, ChainReadChannelOperations, ChainValues},
    db::{HoprDbProtocolOperations, IncomingPacket},
};
use hopr_async_runtime::spawn_as_abortable;
use hopr_crypto_types::types::{HalfKey, OffchainPublicKey};
use hopr_internal_types::{
    prelude::{Acknowledgement, HoprPseudonym},
    protocol::VerifiedAcknowledgement,
};
use hopr_network_types::prelude::ResolvedTransportRouting;
use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, IncomingPacketInfo};
use hopr_transport_bloom::TagBloomFilter;
use hopr_transport_identity::{Multiaddr, PeerId};
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use tracing::Instrument;

use crate::processor::{PacketUnwrapping, PacketWrapping};

97const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
98const SLOW_OP: std::time::Duration = std::time::Duration::from_millis(150);
99const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
100
101pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
102pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";
103
104pub const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
105pub const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
106
107pub const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
108pub const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
109
// Prometheus metrics of the packet pipeline; compiled only with the `prometheus` feature and outside of tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Counter of processed packets, labelled by `type` (sent, received, forwarded).
    static ref METRIC_PACKET_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    /// Counter of packets rejected by the replay check in the ingress pipeline.
    static ref METRIC_REPLAYED_PACKET_COUNT: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    ).unwrap();
}

124#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
125pub enum ProtocolProcesses {
126    #[strum(to_string = "HOPR [msg] - ingress")]
127    MsgIn,
128    #[strum(to_string = "HOPR [msg] - egress")]
129    MsgOut,
130    #[strum(to_string = "HOPR [ack] - egress")]
131    AckOut,
132    #[strum(to_string = "HOPR [ack] - ingress - ticket acknowledgement")]
133    TicketAck,
134    #[strum(to_string = "HOPR [msg] - mixer")]
135    Mixer,
136    #[cfg(feature = "capture")]
137    #[strum(to_string = "packet capture")]
138    Capture,
139}
140/// Processed indexer generated events.
141#[derive(Debug, Clone)]
142pub enum PeerDiscovery {
143    Allow(PeerId),
144    Ban(PeerId),
145    Announce(PeerId, Vec<Multiaddr>),
146}
147
/// Returns the trailing `Ticket::SIZE` bytes of `raw_packet`.
///
/// If the packet is shorter than `Ticket::SIZE`, an empty slice is returned instead.
/// The returned slice borrows from `raw_packet`; no data is copied.
#[cfg(feature = "capture")]
fn inspect_ticket_data_in_packet(raw_packet: &[u8]) -> &[u8] {
    use hopr_primitive_types::traits::BytesEncodable;

    // `checked_sub` yields `None` exactly when the packet is too short to hold a ticket.
    match raw_packet.len().checked_sub(hopr_internal_types::tickets::Ticket::SIZE) {
        Some(ticket_start) => &raw_packet[ticket_start..],
        None => &[],
    }
}

158/// Run all processes responsible for handling the msg and acknowledgment protocols.
159///
160/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
161/// overlay on top of the `wire_msg` Stream or Sink.
162#[allow(clippy::too_many_arguments)]
163pub async fn run_msg_ack_protocol<Db, R>(
164    packet_cfg: processor::PacketInteractionConfig,
165    db: Db,
166    resolver: R,
167    wire_msg: (
168        impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
169        impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
170    ),
171    api: (
172        impl futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
173        impl futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
174    ),
175) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
176where
177    Db: HoprDbProtocolOperations + Clone + Send + Sync + 'static,
178    R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Clone + Send + Sync + 'static,
179{
180    let me = packet_cfg.packet_keypair.clone();
181
182    #[cfg(feature = "capture")]
183    let me_pub = *hopr_crypto_types::keypairs::Keypair::public(&me);
184
185    let mut processes = HashMap::new();
186
187    #[cfg(all(feature = "prometheus", not(test)))]
188    {
189        // Initialize the lazy statics here
190        lazy_static::initialize(&METRIC_PACKET_COUNT);
191        lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
192    }
193
194    #[cfg(feature = "capture")]
195    let capture = {
196        use std::{path::PathBuf, str::FromStr};
197        let writer: Box<dyn capture::PacketWriter + Send + 'static> =
198            if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
199                if let Ok(udp_writer) = std::net::SocketAddr::from_str(&desc)
200                    .map_err(std::io::Error::other)
201                    .and_then(capture::UdpPacketDump::new)
202                {
203                    tracing::warn!("udp packet capture initialized to {desc}");
204                    Box::new(udp_writer)
205                } else if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
206                    tracing::info!("pcap file packet capture initialized to {desc}");
207                    Box::new(pcap_writer)
208                } else {
209                    tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
210                    Box::new(capture::NullWriter)
211                }
212            } else {
213                tracing::warn!("no packet capture specified");
214                Box::new(capture::NullWriter)
215            };
216        let start_capturing_path: Option<PathBuf> = std::env::var("HOPR_CAPTURE_PATH_TRIGGER").ok().map(PathBuf::from);
217        if let Some(ref path) = start_capturing_path {
218            tracing::info!(
219                "To start capturing packets, create it by running 'touch {}'",
220                path.display()
221            );
222        } else {
223            tracing::warn!("The env var 'HOPR_CAPTURE_PATH_TRIGGER' is not set, packet capture won't start");
224        };
225        let (capture, ah) = capture::packet_capture_channel(writer, start_capturing_path);
226        processes.insert(ProtocolProcesses::Capture, ah);
227        capture
228    };
229
230    let tbf = std::sync::Arc::new(parking_lot::Mutex::new(TagBloomFilter::default()));
231
232    let (ticket_ack_tx, ticket_ack_rx) =
233        futures::channel::mpsc::channel::<(Acknowledgement, OffchainPublicKey)>(TICKET_ACK_BUFFER_SIZE);
234
235    let db_clone = db.clone();
236    let resolver_clone = resolver.clone();
237    processes.insert(
238        ProtocolProcesses::TicketAck,
239        spawn_as_abortable!(ticket_ack_rx
240            .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(ack, sender)| {
241                let db = db_clone.clone();
242                let resolver = resolver_clone.clone();
243                async move {
244                    if let Ok(verified) = hopr_parallelize::cpu::spawn_blocking(move || ack.verify(&sender)).await {
245                        tracing::trace!(%sender, "received a valid acknowledgement");
246                            match db.handle_acknowledgement(verified, &resolver).await {
247                                Ok(_) => tracing::trace!(%sender, "successfully processed a known acknowledgement"),
248                                // Eventually, we do not care here if the acknowledgement does not belong to any
249                                // unacknowledged packets.
250                                Err(error) => tracing::trace!(%sender, %error, "valid acknowledgement is unknown or error occurred while processing it"),
251                            }
252                    } else {
253                        tracing::error!(%sender, "failed to verify signature on acknowledgement");
254                    }
255                }
256            })
257            .inspect(|_| tracing::warn!(task = "transport (protocol - ticket acknowledgement)", "long-running background task finished")))
258    );
259
260    let (ack_out_tx, ack_out_rx) =
261        futures::channel::mpsc::channel::<(Option<HalfKey>, OffchainPublicKey)>(ACK_OUT_BUFFER_SIZE);
262
263    #[cfg(feature = "capture")]
264    let capture_clone = capture.clone();
265
266    let db_clone = db.clone();
267    let resolver_clone = resolver.clone();
268    let me_clone = me.clone();
269    let msg_to_send_tx = wire_msg.0.clone();
270    processes.insert(
271        ProtocolProcesses::AckOut,
272        spawn_as_abortable!(
273            ack_out_rx
274                .for_each_concurrent(
275                    NUM_CONCURRENT_ACK_OUT_PROCESSING,
276                    move |(maybe_ack_key, destination)| {
277                        let db = db_clone.clone();
278                        let resolver = resolver_clone.clone();
279                        let me = me_clone.clone();
280                        let mut msg_to_send_tx_clone = msg_to_send_tx.clone();
281
282                        #[cfg(feature = "capture")]
283                        let mut capture = capture_clone.clone();
284                        async move {
285                            #[cfg(feature = "capture")]
286                            let (is_random, me_pub) = (
287                                maybe_ack_key.is_none(),
288                                *hopr_crypto_types::keypairs::Keypair::public(&me),
289                            );
290
291                            // Sign acknowledgement with the given half-key or generate a signed random one
292                            let ack = hopr_parallelize::cpu::spawn_blocking(move || {
293                                maybe_ack_key
294                                    .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &me))
295                                    .unwrap_or_else(|| VerifiedAcknowledgement::random(&me))
296                            })
297                            .await;
298
299                            #[cfg(feature = "capture")]
300                            let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingAck {
301                                me: me_pub,
302                                ack,
303                                is_random,
304                                next_hop: destination,
305                            }
306                            .into();
307
308                            match db
309                                .to_send_no_ack(ack.leak().as_ref().into(), destination, &resolver)
310                                .await
311                            {
312                                Ok(ack_packet) => {
313                                    msg_to_send_tx_clone
314                                        .send((ack_packet.next_hop.into(), ack_packet.data))
315                                        .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
316                                        .await
317                                        .unwrap_or_else(|_| {
318                                            tracing::error!(
319                                                "failed to forward an acknowledgement to the transport layer: timeout"
320                                            );
321                                            Ok(())
322                                        })
323                                        .unwrap_or_else(|_| {
324                                            tracing::error!(
325                                                "failed to forward an acknowledgement to the transport layer"
326                                            );
327                                        });
328
329                                    #[cfg(feature = "capture")]
330                                    let _ = capture.try_send(captured_packet);
331                                }
332                                Err(error) => tracing::error!(%error, "failed to create ack packet"),
333                            }
334                        }
335                    }
336                )
337                .inspect(|_| tracing::warn!(
338                    task = "transport (protocol - ack outgoing)",
339                    "long-running background task finished"
340                ))
341        ),
342    );
343
344    let msg_processor_read = processor::PacketProcessor::new(db.clone(), resolver.clone(), packet_cfg);
345    let msg_processor_write = msg_processor_read.clone();
346
347    #[cfg(feature = "capture")]
348    let capture_clone = capture.clone();
349
350    let msg_to_send_tx = wire_msg.0.clone();
351    processes.insert(
352        ProtocolProcesses::MsgOut,
353        spawn_as_abortable!(async move {
354            let _neverending = api
355                .1
356                .then_concurrent(|(routing, data)| {
357                    let msg_processor = msg_processor_write.clone();
358
359                    #[cfg(feature = "capture")]
360                    let (mut capture_clone, data_clone, num_surbs) =
361                        (capture_clone.clone(), data.clone(), routing.count_return_paths() as u8);
362
363                    async move {
364                        match PacketWrapping::send(&msg_processor, data, routing).await {
365                            Ok(v) => {
366                                #[cfg(all(feature = "prometheus", not(test)))]
367                                {
368                                    METRIC_PACKET_COUNT.increment(&["sent"]);
369                                }
370
371                                #[cfg(feature = "capture")]
372                                let _ = capture_clone.try_send(
373                                    capture::PacketBeforeTransit::OutgoingPacket {
374                                        me: me_pub,
375                                        next_hop: v.next_hop,
376                                        num_surbs,
377                                        is_forwarded: false,
378                                        data: data_clone.data.to_bytes().into_vec().into(),
379                                        ack_challenge: v.ack_challenge.as_ref().into(),
380                                        signals: data_clone.packet_info.unwrap_or_default().signals_to_destination,
381                                        ticket: inspect_ticket_data_in_packet(&v.data).into(),
382                                    }
383                                    .into(),
384                                );
385
386                                Some((v.next_hop.into(), v.data))
387                            }
388                            Err(error) => {
389                                tracing::error!(%error, "packet could not be wrapped for sending");
390                                None
391                            }
392                        }
393                    }
394                })
395                .filter_map(futures::future::ready)
396                .inspect(|(peer, _)| tracing::trace!(%peer, "protocol message out"))
397                .map(Ok)
398                .forward(msg_to_send_tx)
399                .instrument(tracing::trace_span!("msg protocol processing - egress"))
400                .inspect(|_| {
401                    tracing::warn!(
402                        task = "transport (protocol - msg egress)",
403                        "long-running background task finished"
404                    )
405                })
406                .await;
407        }),
408    );
409
410    let ack_out_tx_clone_1 = ack_out_tx.clone();
411    let ack_out_tx_clone_2 = ack_out_tx.clone();
412
413    #[cfg(feature = "capture")]
414    let capture_clone = capture.clone();
415
416    // Create a cache for a CPU-intensive conversion PeerId -> OffchainPublicKey
417    let peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey> = moka::future::Cache::builder()
418        .time_to_idle(Duration::from_secs(600))
419        .max_capacity(100_000)
420        .build();
421
422    processes.insert(
423        ProtocolProcesses::MsgIn,
424        spawn_as_abortable!(async move {
425            let _neverending = wire_msg
426                .1
427                .then_concurrent(move |(peer, data)| {
428                    let msg_processor = msg_processor_read.clone();
429                    let mut ack_out_tx = ack_out_tx_clone_1.clone();
430                    let peer_id_key_cache = peer_id_cache.clone();
431
432                    tracing::trace!(%peer, "protocol message in");
433
434                    #[cfg(feature = "capture")]
435                    let (mut capture_clone, ticket_data_clone) = (
436                        capture.clone(),
437                        inspect_ticket_data_in_packet(&data).to_vec()
438                    );
439
440                    async move {
441                        // Try to retrieve the peer's public key from the cache or compute it if it does not exist yet
442                        let peer_key = match peer_id_key_cache
443                                .try_get_with_by_ref(&peer, hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&peer)))
444                                .await {
445                            Ok(peer) => peer,
446                            Err(error) => {
447                                // There absolutely nothing we can do when the peer id is unparseable (e.g., non-ed25519 based)
448                                tracing::error!(%peer, %error, "dropping packet - cannot convert peer id");
449                                return None;
450                            }
451                        };
452
453                        let now = std::time::Instant::now();
454                        let res = msg_processor.recv(peer_key, data).await;
455                        let elapsed = now.elapsed();
456                        if elapsed > SLOW_OP {
457                            tracing::warn!(%peer, ?elapsed, "msg_processor.recv took too long");
458                        }
459
460                        // If there was an error caused by interpretation of the packet data,
461                        // we must send a random acknowledgement back.
462                        if let Err(error) = &res {
463                            tracing::error!(%peer, %error, "failed to process the received packet");
464
465                            // Send random signed acknowledgement to give feedback to the sender
466                            if error.is_undecodable() {
467                                // Do not send an ack back if the packet could not be decoded at all
468                                // 
469                                // Potentially adversarial behavior
470                                tracing::trace!(%peer, "not sending ack back on undecodable packet - possible adversarial behavior");
471                            } else {
472                                ack_out_tx
473                                    .send((None, peer_key))
474                                    .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
475                                    .await
476                                    .unwrap_or_else(|_| {
477                                        tracing::error!("failed to send ack to the egress queue: timeout");
478                                        Ok(())
479                                    })
480                                    .unwrap_or_else(|_| {
481                                        tracing::error!("failed to send ack to the egress queue");
482                                    });
483                            }
484                        }
485
486                        #[cfg(feature = "capture")]
487                        if let Ok(packet) = &res {
488                            let _ = capture_clone.try_send(capture::PacketBeforeTransit::IncomingPacket {
489                                    me: me_pub,
490                                    packet,
491                                    ticket: ticket_data_clone.into(),
492                                }.into()
493                            );
494                        }
495
496                        res.ok()
497                    }
498                })
499                .filter_map(move |maybe_packet| {
500                    let tbf = tbf.clone();
501
502                    futures::future::ready(
503                        if let Some(packet) = maybe_packet {
504                            match packet {
505                                IncomingPacket::Acknowledgement { packet_tag, previous_hop, .. } |
506                                IncomingPacket::Final { packet_tag, previous_hop,.. } |
507                                IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
508                                    // This operation has run-time of ~10 nanoseconds,
509                                    // and therefore does not need to be invoked via spawn_blocking
510                                    if tbf.lock().check_and_set(&packet_tag) {
511                                        tracing::warn!(%previous_hop, "replayed packet received");
512
513                                        #[cfg(all(feature = "prometheus", not(test)))]
514                                        METRIC_REPLAYED_PACKET_COUNT.increment();
515
516                                        None
517                                    } else {
518                                        Some(packet)
519                                    }
520                                }
521                            }
522                        } else {
523                            tracing::trace!("received empty packet");
524                            None
525                        }
526                    )
527                })
528                .then_concurrent(move |packet| {
529                    let mut msg_to_send_tx = wire_msg.0.clone();
530
531                    #[cfg(feature = "capture")]
532                    let mut capture_clone = capture_clone.clone();
533
534                    let mut ticket_ack_tx_clone = ticket_ack_tx.clone();
535                    let mut ack_out_tx = ack_out_tx_clone_2.clone();
536                    async move {
537
538                    match packet {
539                        IncomingPacket::Acknowledgement {
540                            previous_hop,
541                            ack,
542                            ..
543                        } => {
544                            tracing::trace!(%previous_hop, "acknowledging ticket using received ack");
545                            ticket_ack_tx_clone
546                                .send((ack, previous_hop))
547                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
548                                .await
549                                .unwrap_or_else(|_| {
550                                    tracing::error!("failed dispatching received acknowledgement to the ticket ack queue: timeout");
551                                    Ok(())
552                                })
553                                .unwrap_or_else(|_| {
554                                    tracing::error!("failed dispatching received acknowledgement to the ticket ack queue");
555                                });
556
557                            // We do not acknowledge back acknowledgements.
558                            None
559                        },
560                        IncomingPacket::Final {
561                            previous_hop,
562                            sender,
563                            plain_text,
564                            ack_key,
565                            info,
566                            ..
567                        } => {
568                            // Send acknowledgement back
569                            ack_out_tx
570                                .send((Some(ack_key), previous_hop))
571                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
572                                .await
573                                .unwrap_or_else(|_| {
574                                    tracing::error!("failed to send ack to the egress queue: timeout");
575                                    Ok(())
576                                })
577                                .unwrap_or_else(|_| {
578                                    tracing::error!("failed to send ack to the egress queue");
579                                });
580
581                            #[cfg(all(feature = "prometheus", not(test)))]
582                            {
583                                METRIC_PACKET_COUNT.increment(&["received"]);
584                            }
585
586                            Some((sender, plain_text, info))
587                        }
588                        IncomingPacket::Forwarded {
589                            previous_hop,
590                            next_hop,
591                            data,
592                            ack_key,
593                            ..
594                        } => {
595                            // First, relay the packet to the next hop
596                            tracing::trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
597
598                            #[cfg(feature = "capture")]
599                            let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingPacket {
600                                me: me_pub,
601                                next_hop,
602                                num_surbs: 0,
603                                is_forwarded: true,
604                                data: data.as_ref().into(),
605                                ack_challenge: Default::default(),
606                                signals: None.into(),
607                                ticket: inspect_ticket_data_in_packet(data.as_ref()).into()
608                            }.into();
609
610                            msg_to_send_tx
611                                .send((next_hop.into(), data))
612                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
613                                .await
614                                .unwrap_or_else(|_| {
615                                    tracing::error!("failed to forward a packet to the transport layer: timeout");
616                                    Ok(())
617                                })
618                                .unwrap_or_else(|_| {
619                                    tracing::error!("failed to forward a packet to the transport layer");
620                                });
621
622                            #[cfg(all(feature = "prometheus", not(test)))]
623                            {
624                                METRIC_PACKET_COUNT.increment(&["forwarded"]);
625                            }
626
627                            #[cfg(feature = "capture")]
628                            let _ = capture_clone.try_send(captured_packet);
629
630                             // Send acknowledgement back
631                            tracing::trace!(%previous_hop, "acknowledging forwarded packet back");
632                            ack_out_tx
633                                .send((Some(ack_key), previous_hop))
634                                .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
635                                .await
636                                .unwrap_or_else(|_| {
637                                    tracing::error!("failed to send ack to the egress queue: timeout");
638                                    Ok(())
639                                })
640                                .unwrap_or_else(|_| {
641                                    tracing::error!("failed to send ack to the egress queue");
642                                });
643
644                            None
645                        }
646                    }
647                }})
648                .filter_map(|maybe_data| futures::future::ready(
649                    // Create the ApplicationDataIn data structure for incoming data
650                    maybe_data
651                        .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
652                            .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
653                            .ok()
654                            .map(|data| (sender, ApplicationDataIn {
655                                data,
656                                packet_info: IncomingPacketInfo {
657                                    signals_from_sender: aux_info.packet_signals,
658                                    num_saved_surbs: aux_info.num_surbs,
659                                }
660                            })))
661                ))
662                .map(Ok)
663                .forward(api.0)
664                .instrument(tracing::trace_span!("msg protocol processing - ingress"))
665                .inspect(|_| tracing::warn!(task = "transport (protocol - msg ingress)", "long-running background task finished"))
666                .await;
667        }),
668    );
669
670    processes
671}