hopr_transport_protocol/
lib.rs

//! Collection of objects and functionality for building p2p or stream protocols used by the higher business logic
//! layers.
3//!
4//! ## Contents
5//!
6//! Supported protocol configurations:
7//!
8//! - `mix`
9//! - `ack`
10//! - `heartbeat`
11//! - `ticket_aggregation`
12//!
13//! Supported protocol processors:
14//!
15//! - `ticket_aggregation`
16//!
17//! ### `ticket_aggregation`
18//!
19//! Ticket aggregation processing mechanism is responsible for ingesting the ticket aggregation related requests:
20//!
21//! - `Receive(PeerId, U)`,
22//! - `Reply(PeerId, std::result::Result<Ticket, String>, T)`,
23//! - `Send(PeerId, Vec<AcknowledgedTicket>, TicketAggregationFinalizer)`,
24//!
25//! where `U` is the type of an aggregated ticket extractable (`ResponseChannel<Result<Ticket, String>>`) and `T`
26//! represents a network negotiated identifier (`RequestId`).
27//!
28//! In broader context the protocol flow is as follows:
29//!
30//! 1. requesting ticket aggregation
31//!
32//!    - the peer A desires to aggregate tickets, collects the tickets into a data collection and sends a request
33//!      containing the collection to aggregate `Vec<AcknowledgedTicket>` to peer B using the `Send` mechanism
34//!
35//! 2. responding to ticket aggregation
36//!
37//!    - peer B obtains the request from peer A, performs the ticket aggregation and returns a result of that operation
38//!      in the form of `std::result::Result<Ticket, String>` using the `Reply` mechanism
39//!
40//! 3. accepting the aggregated ticket
41//!    - peer A receives the aggregated ticket using the `Receive` mechanism
42//!
43//! Furthermore, apart from the basic positive case scenario, standard mechanics of protocol communication apply:
44//!
45//! - the requesting side can time out, if the responding side takes too long to provide an aggregated ticket, in which
46//!   case the ticket is not considered aggregated, even if eventually an aggregated ticket is delivered
47//! - the responder can fail to aggregate tickets in which case it replies with an error string describing the failure
48//!   reason and it is the requester's responsibility to handle the negative case as well
49//!   - in the absence of response, the requester will time out
50
51/// Coder and decoder for the transport binary protocol layer
52mod codec;
53
54/// Configuration of the protocol components.
55pub mod config;
56/// Errors produced by the crate.
57pub mod errors;
58
59// protocols
60/// `heartbeat` p2p protocol
61pub mod heartbeat;
62/// processor for the protocol
63pub mod processor;
64
65/// Stream processing utilities
66pub mod stream;
67
68pub mod timer;
69
70/// Allows capturing dissected HOPR packets before they are processed by the transport.
71///
72/// Requires the `capture` feature to be enabled.
73#[cfg(feature = "capture")]
74mod capture;
75
76use std::{collections::HashMap, time::Duration};
77
78use futures::{SinkExt, StreamExt};
79use hopr_async_runtime::spawn_as_abortable;
80use hopr_crypto_types::types::{HalfKey, OffchainPublicKey};
81use hopr_db_api::protocol::{HoprDbProtocolOperations, IncomingPacket};
82use hopr_internal_types::{
83    prelude::{Acknowledgement, HoprPseudonym},
84    protocol::VerifiedAcknowledgement,
85};
86use hopr_network_types::prelude::ResolvedTransportRouting;
87use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, IncomingPacketInfo};
88use hopr_transport_bloom::TagBloomFilter;
89use hopr_transport_identity::{Multiaddr, PeerId};
90use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
91use tracing::{Instrument, error, trace, warn};
92
93use crate::processor::{PacketSendFinalizer, PacketUnwrapping, PacketWrapping};
94pub use crate::{processor::DEFAULT_PRICE_PER_PACKET, timer::execute_on_tick};
95
96const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
97const SLOW_OP_MS: u128 = 150;
98
99pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
100pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";
101
102pub const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
103pub const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
104
105pub const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
106pub const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
107
108#[cfg(all(feature = "prometheus", not(test)))]
109use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
110
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Packets processed by the pipeline, labelled by packet type.
    static ref METRIC_PACKET_COUNT: MultiCounter = MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();

    // Packets processed per peer, labelled by peer and by direction.
    static ref METRIC_PACKET_COUNT_PER_PEER: MultiCounter = MultiCounter::new(
        "hopr_packets_per_peer_count",
        "Number of processed packets to/from distinct peers",
        &["peer", "direction"]
    ).unwrap();

    // Packets dropped because their tag was already seen.
    static ref METRIC_REPLAYED_PACKET_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    ).unwrap();

    // Incoming packets that failed with a ticket validation error.
    static ref METRIC_REJECTED_TICKETS_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_rejected_tickets_count",
        "Number of rejected tickets",
    ).unwrap();
}
131
132#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
133pub enum ProtocolProcesses {
134    #[strum(to_string = "HOPR [msg] - ingress")]
135    MsgIn,
136    #[strum(to_string = "HOPR [msg] - egress")]
137    MsgOut,
138    #[strum(to_string = "HOPR [ack] - egress")]
139    AckOut,
140    #[strum(to_string = "HOPR [ack] - ingress - ticket acknowledgement")]
141    TicketAck,
142    #[strum(to_string = "HOPR [msg] - mixer")]
143    Mixer,
144    #[cfg(feature = "capture")]
145    #[strum(to_string = "packet capture")]
146    Capture,
147}
148/// Processed indexer generated events.
149#[derive(Debug, Clone)]
150pub enum PeerDiscovery {
151    Allow(PeerId),
152    Ban(PeerId),
153    Announce(PeerId, Vec<Multiaddr>),
154}
155
/// Returns the trailing `Ticket::SIZE` bytes of `raw_packet`.
///
/// If the packet is shorter than `Ticket::SIZE`, an empty slice is returned instead.
#[cfg(feature = "capture")]
fn inspect_ticket_data_in_packet(raw_packet: &[u8]) -> &[u8] {
    use hopr_primitive_types::traits::BytesEncodable;

    let ticket_size = hopr_internal_types::tickets::Ticket::SIZE;

    // `checked_sub` yields `None` exactly when the packet is too short to contain a ticket.
    match raw_packet.len().checked_sub(ticket_size) {
        Some(ticket_start) => &raw_packet[ticket_start..],
        None => &[],
    }
}
165
166/// Run all processes responsible for handling the msg and acknowledgment protocols.
167///
168/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
169/// overlay on top of the `wire_msg` Stream or Sink.
170#[allow(clippy::too_many_arguments)]
171pub async fn run_msg_ack_protocol<Db>(
172    packet_cfg: processor::PacketInteractionConfig,
173    db: Db,
174    wire_msg: (
175        impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + Sync + 'static,
176        impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + Sync + 'static,
177    ),
178    api: (
179        impl futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
180        impl futures::Stream<Item = (ApplicationDataOut, ResolvedTransportRouting, PacketSendFinalizer)>
181        + Send
182        + Sync
183        + 'static,
184    ),
185) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
186where
187    Db: HoprDbProtocolOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
188{
189    let me = packet_cfg.packet_keypair.clone();
190
191    #[cfg(feature = "capture")]
192    let me_pub = *hopr_crypto_types::keypairs::Keypair::public(&me);
193
194    let mut processes = HashMap::new();
195
196    #[cfg(all(feature = "prometheus", not(test)))]
197    {
198        // Initialize the lazy statics here
199        lazy_static::initialize(&METRIC_PACKET_COUNT);
200        lazy_static::initialize(&METRIC_PACKET_COUNT_PER_PEER);
201        lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
202        lazy_static::initialize(&METRIC_REJECTED_TICKETS_COUNT);
203    }
204
205    #[cfg(feature = "capture")]
206    let capture = {
207        use std::str::FromStr;
208        let writer: Box<dyn capture::PacketWriter + Send + 'static> =
209            if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
210                if let Ok(udp_writer) = std::net::SocketAddr::from_str(&desc)
211                    .map_err(std::io::Error::other)
212                    .and_then(capture::UdpPacketDump::new)
213                {
214                    warn!("udp packet capture initialized to {desc}");
215                    Box::new(udp_writer)
216                } else if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
217                    warn!("pcap file packet capture initialized to {desc}");
218                    Box::new(pcap_writer)
219                } else {
220                    error!(desc, "failed to create packet capture: invalid socket address or file");
221                    Box::new(capture::NullWriter)
222                }
223            } else {
224                warn!("no packet capture specified");
225                Box::new(capture::NullWriter)
226            };
227        let (capture, ah) = capture::packet_capture_channel(writer);
228        processes.insert(ProtocolProcesses::Capture, ah);
229        capture
230    };
231
232    let tbf = std::sync::Arc::new(parking_lot::Mutex::new(TagBloomFilter::default()));
233
234    let (ticket_ack_tx, ticket_ack_rx) =
235        futures::channel::mpsc::channel::<(Acknowledgement, OffchainPublicKey)>(TICKET_ACK_BUFFER_SIZE);
236
237    let db_clone = db.clone();
238    processes.insert(
239        ProtocolProcesses::TicketAck,
240        spawn_as_abortable!(ticket_ack_rx
241            .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(ack, sender)| {
242                let db = db_clone.clone();
243                async move {
244                    if let Ok(verified) = hopr_parallelize::cpu::spawn_blocking(move || ack.verify(&sender)).await {
245                        trace!(%sender, "received a valid acknowledgement");
246                            match db.handle_acknowledgement(verified).await {
247                                Ok(_) => trace!(%sender, "successfully processed a known acknowledgement"),
248                                // Eventually, we do not care here if the acknowledgement does not belong to any
249                                // unacknowledged packets.
250                                Err(error) => trace!(%sender, %error, "valid acknowledgement is unknown or error occurred while processing it"),
251                            }
252                    } else {
253                        error!(%sender, "failed to verify signature on acknowledgement");
254                    }
255                }
256            }))
257    );
258
259    let (ack_out_tx, ack_out_rx) =
260        futures::channel::mpsc::channel::<(Option<HalfKey>, OffchainPublicKey)>(ACK_OUT_BUFFER_SIZE);
261
262    #[cfg(feature = "capture")]
263    let capture_clone = capture.clone();
264
265    let db_clone = db.clone();
266    let me_clone = me.clone();
267    let msg_to_send_tx = wire_msg.0.clone();
268    processes.insert(
269        ProtocolProcesses::AckOut,
270        spawn_as_abortable!(ack_out_rx.for_each_concurrent(
271            NUM_CONCURRENT_ACK_OUT_PROCESSING,
272            move |(maybe_ack_key, destination)| {
273                let db = db_clone.clone();
274                let me = me_clone.clone();
275                let mut msg_to_send_tx_clone = msg_to_send_tx.clone();
276
277                #[cfg(feature = "capture")]
278                let mut capture = capture_clone.clone();
279                async move {
280                    #[cfg(feature = "capture")]
281                    let (is_random, me_pub) = (
282                        maybe_ack_key.is_none(),
283                        *hopr_crypto_types::keypairs::Keypair::public(&me),
284                    );
285
286                    // Sign acknowledgement with the given half-key or generate a signed random one
287                    let ack = hopr_parallelize::cpu::spawn_blocking(move || {
288                        maybe_ack_key
289                            .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &me))
290                            .unwrap_or_else(|| VerifiedAcknowledgement::random(&me))
291                    })
292                    .await;
293
294                    #[cfg(feature = "capture")]
295                    let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingAck {
296                        me: me_pub,
297                        ack,
298                        is_random,
299                        next_hop: destination,
300                    }
301                    .into();
302
303                    match db.to_send_no_ack(ack.leak().as_ref().into(), destination).await {
304                        Ok(ack_packet) => {
305                            let now = std::time::Instant::now();
306                            if msg_to_send_tx_clone
307                                .send((ack_packet.next_hop.into(), ack_packet.data))
308                                .await
309                                .is_err()
310                            {
311                                error!("failed to forward an acknowledgement to the transport layer");
312                            }
313                            let elapsed = now.elapsed();
314                            if elapsed.as_millis() > SLOW_OP_MS {
315                                warn!(?elapsed, " msg_to_send_tx.send on ack took too long");
316                            }
317
318                            #[cfg(feature = "capture")]
319                            let _ = capture.try_send(captured_packet);
320                        }
321                        Err(error) => tracing::error!(%error, "failed to create ack packet"),
322                    }
323                }
324            }
325        )),
326    );
327
328    let msg_processor_read = processor::PacketProcessor::new(db.clone(), packet_cfg);
329    let msg_processor_write = msg_processor_read.clone();
330
331    #[cfg(feature = "capture")]
332    let capture_clone = capture.clone();
333
334    let msg_to_send_tx = wire_msg.0.clone();
335    processes.insert(
336        ProtocolProcesses::MsgOut,
337        spawn_as_abortable!(async move {
338            let _neverending = api
339                .1
340                .then_concurrent(|(data, routing, finalizer)| {
341                    let msg_processor = msg_processor_write.clone();
342
343                    #[cfg(feature = "capture")]
344                    let (mut capture_clone, data_clone, num_surbs) =
345                        (capture_clone.clone(), data.clone(), routing.count_return_paths() as u8);
346
347                    async move {
348                        match PacketWrapping::send(&msg_processor, data, routing).await {
349                            Ok(v) => {
350                                #[cfg(all(feature = "prometheus", not(test)))]
351                                {
352                                    METRIC_PACKET_COUNT_PER_PEER.increment(&[&v.next_hop.to_string(), "out"]);
353                                    METRIC_PACKET_COUNT.increment(&["sent"]);
354                                }
355                                finalizer.finalize(Ok(()));
356
357                                #[cfg(feature = "capture")]
358                                let _ = capture_clone.try_send(
359                                    capture::PacketBeforeTransit::OutgoingPacket {
360                                        me: me_pub,
361                                        next_hop: v.next_hop,
362                                        num_surbs,
363                                        is_forwarded: false,
364                                        data: data_clone.data.to_bytes().into_vec().into(),
365                                        ack_challenge: v.ack_challenge.as_ref().into(),
366                                        signals: data_clone.packet_info.unwrap_or_default().signals_to_destination,
367                                        ticket: inspect_ticket_data_in_packet(&v.data).into(),
368                                    }
369                                    .into(),
370                                );
371
372                                Some((v.next_hop.into(), v.data))
373                            }
374                            Err(e) => {
375                                finalizer.finalize(Err(e));
376                                None
377                            }
378                        }
379                    }
380                })
381                .filter_map(futures::future::ready)
382                .inspect(|(peer, _)| trace!(%peer, "protocol message out"))
383                .map(Ok)
384                .forward(msg_to_send_tx)
385                .instrument(tracing::trace_span!("msg protocol processing - outgoing"))
386                .await;
387        }),
388    );
389
390    let ack_out_tx_clone_1 = ack_out_tx.clone();
391    let ack_out_tx_clone_2 = ack_out_tx.clone();
392
393    #[cfg(feature = "capture")]
394    let capture_clone = capture.clone();
395
396    // Create a cache for a CPU-intensive conversion PeerId -> OffchainPublicKey
397    let peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey> = moka::future::Cache::builder()
398        .time_to_idle(Duration::from_secs(600))
399        .max_capacity(100_000)
400        .build();
401
402    processes.insert(
403        ProtocolProcesses::MsgIn,
404        spawn_as_abortable!(async move {
405            let _neverending = wire_msg
406                .1
407                .then_concurrent(move |(peer, data)| {
408                    let msg_processor = msg_processor_read.clone();
409                    let mut ack_out_tx = ack_out_tx_clone_1.clone();
410                    let peer_id_key_cache = peer_id_cache.clone();
411
412                    trace!(%peer, "protocol message in");
413
414                    #[cfg(feature = "capture")]
415                    let (mut capture_clone, ticket_data_clone) = (
416                        capture.clone(),
417                        inspect_ticket_data_in_packet(&data).to_vec()
418                    );
419
420                    async move {
421                        // Try to retrieve the peer's public key from the cache or compute it if it does not exist yet
422                        let peer_key = match peer_id_key_cache
423                                .try_get_with_by_ref(&peer, hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&peer)))
424                                .await {
425                            Ok(peer) => peer,
426                            Err(error) => {
427                                // There absolutely nothing we can do when the peer id is unparseable (e.g., non-ed25519 based)
428                                error!(%peer, %error, "dropping packet - cannot convert peer id");
429                                return None;
430                            }
431                        };
432
433                        let now = std::time::Instant::now();
434                        let res = msg_processor.recv(peer_key, data).await.map_err(move |e| (peer, e));
435                        let elapsed = now.elapsed();
436                        if elapsed.as_millis() > SLOW_OP_MS {
437                            warn!(%peer, ?elapsed, "msg_processor.recv took too long");
438                        }
439
440                        // If there was an error caused by interpretation of the packet data,
441                        // we must send a random acknowledgement back.
442                        if let Err((peer, error)) = &res {
443                            #[cfg(all(feature = "prometheus", not(test)))]
444                            if let hopr_crypto_packet::errors::PacketError::TicketValidation(_) = error {
445                                METRIC_REJECTED_TICKETS_COUNT.increment();
446                            }
447
448                            error!(%peer, %error, "failed to process the received message");
449
450                            // Send random signed acknowledgement to give feedback to the sender
451                            if let hopr_crypto_packet::errors::PacketError::PacketDecodingError(_) | hopr_crypto_packet::errors::PacketError::SphinxError(_) = error {
452                                // Do not send an ack back if the packet could not be decoded at all
453                                // 
454                                // Potentially adversarial behavior
455                                tracing::trace!(%peer, "not sending ack back on undecodable packet - possible adversarial behavior");
456                            } else {
457                                let now = std::time::Instant::now();
458
459                                if let Err(error) = ack_out_tx.send((None, peer_key)).await {
460                                    tracing::error!(%error, "failed to send ack to the egress queue");
461                                }
462                                let elapsed = now.elapsed();
463                                if elapsed.as_millis() > SLOW_OP_MS {
464                                    warn!(%peer, ?elapsed, "ack_out.send on failed packet took too long");
465                                }
466                            }
467                        }
468
469                        #[cfg(feature = "capture")]
470                        if let Ok(packet) = &res {
471                            let _ = capture_clone.try_send(capture::PacketBeforeTransit::IncomingPacket {
472                                    me: me_pub,
473                                    packet,
474                                    ticket: ticket_data_clone.into(),
475                                }.into()
476                            );
477                        }
478
479                        res.ok()
480                    }
481                })
482                .filter_map(move |maybe_packet| {
483                    let tbf = tbf.clone();
484
485                    futures::future::ready(
486                        if let Some(packet) = maybe_packet {
487                            match packet {
488                                IncomingPacket::Acknowledgement { packet_tag, previous_hop, .. } |
489                                IncomingPacket::Final { packet_tag, previous_hop,.. } |
490                                IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
491                                    // This operation has run-time of ~10 nanoseconds,
492                                    // and therefore does not need to be invoked via spawn_blocking
493                                    if tbf.lock().check_and_set(&packet_tag) {
494                                        warn!(%previous_hop, "replayed packet received");
495
496                                        #[cfg(all(feature = "prometheus", not(test)))]
497                                        METRIC_REPLAYED_PACKET_COUNT.increment();
498
499                                        None
500                                    } else {
501                                        Some(packet)
502                                    }
503                                }
504                            }
505                        } else {
506                            trace!("received empty packet");
507                            None
508                        }
509                    )
510                })
511                .then_concurrent(move |packet| {
512                    let mut msg_to_send_tx = wire_msg.0.clone();
513
514                    #[cfg(feature = "capture")]
515                    let mut capture_clone = capture_clone.clone();
516
517                    let mut ticket_ack_tx_clone = ticket_ack_tx.clone();
518                    let mut ack_out_tx = ack_out_tx_clone_2.clone();
519                    async move {
520
521                    match packet {
522                        IncomingPacket::Acknowledgement {
523                            previous_hop,
524                            ack,
525                            ..
526                        } => {
527                            trace!(%previous_hop, "acknowledging ticket using received ack");
528                            let now = std::time::Instant::now();
529                            if let Err(error) = ticket_ack_tx_clone.send((ack, previous_hop)).await {
530                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
531                            }
532                            let elapsed = now.elapsed();
533                            if elapsed.as_millis() > SLOW_OP_MS {
534                                warn!(?elapsed," ack_tx.send took too long");
535                            }
536                            let elapsed = now.elapsed();
537                            if elapsed.as_millis() > SLOW_OP_MS {
538                                warn!(%previous_hop, ?elapsed, "ack_processor.handle_acknowledgement took too long");
539                            }
540
541                            // We do not acknowledge back acknowledgements.
542                            None
543                        },
544                        IncomingPacket::Final {
545                            previous_hop,
546                            sender,
547                            plain_text,
548                            ack_key,
549                            info,
550                            ..
551                        } => {
552                            // Send acknowledgement back
553                            trace!(%previous_hop, "acknowledging final packet back");
554                            let now = std::time::Instant::now();
555                            if let Err(error) = ack_out_tx.send((Some(ack_key), previous_hop)).await {
556                                tracing::error!(%error, "failed to send ack to the egress queue");
557                            }
558                            let elapsed = now.elapsed();
559                            if elapsed.as_millis() > SLOW_OP_MS {
560                                warn!(%previous_hop, ?elapsed, "ack_out.send on final packet took too long");
561                            }
562
563                            #[cfg(all(feature = "prometheus", not(test)))]
564                            {
565                                METRIC_PACKET_COUNT_PER_PEER.increment(&[&previous_hop.to_string(), "in"]);
566                                METRIC_PACKET_COUNT.increment(&["received"]);
567                            }
568
569                            Some((sender, plain_text, info))
570                        }
571                        IncomingPacket::Forwarded {
572                            previous_hop,
573                            next_hop,
574                            data,
575                            ack_key,
576                            ..
577                        } => {
578                            // First, relay the packet to the next hop
579                            trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
580
581                            #[cfg(feature = "capture")]
582                            let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingPacket {
583                                me: me_pub,
584                                next_hop,
585                                num_surbs: 0,
586                                is_forwarded: true,
587                                data: data.as_ref().into(),
588                                ack_challenge: Default::default(),
589                                signals: None.into(),
590                                ticket: inspect_ticket_data_in_packet(data.as_ref()).into()
591                            }.into();
592
593                            msg_to_send_tx
594                                .send((next_hop.into(), data))
595                                .await
596                                .unwrap_or_else(|_| {
597                                    error!("failed to forward a packet to the transport layer");
598                                });
599
600                            #[cfg(all(feature = "prometheus", not(test)))]
601                            {
602                                METRIC_PACKET_COUNT.increment(&["forwarded"]);
603                            }
604
605                            #[cfg(feature = "capture")]
606                            let _ = capture_clone.try_send(captured_packet);
607
608                             // Send acknowledgement back
609                            trace!(%previous_hop, "acknowledging forwarded packet back");
610                            let now = std::time::Instant::now();
611                            if let Err(error) = ack_out_tx.send((Some(ack_key), previous_hop)).await {
612                                tracing::error!(%error, "failed to send ack to the egress queue");
613                            }
614                            let elapsed = now.elapsed();
615                            if elapsed.as_millis() > SLOW_OP_MS {
616                                warn!(%previous_hop, ?elapsed, "ack_out.send on forwarded packet took too long");
617                            }
618
619                            None
620                        }
621                    }
622                }})
623                .filter_map(|maybe_data| futures::future::ready(
624                    // Create the ApplicationDataIn data structure for incoming data
625                    maybe_data
626                        .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
627                            .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
628                            .ok()
629                            .map(|data| (sender, ApplicationDataIn {
630                                data,
631                                packet_info: IncomingPacketInfo {
632                                    signals_from_sender: aux_info.packet_signals,
633                                    num_saved_surbs: aux_info.num_surbs,
634                                }
635                            })))
636                ))
637                .map(Ok)
638                .forward(api.0)
639                .instrument(tracing::trace_span!("msg protocol processing - incoming"))
640                .await;
641        }),
642    );
643
644    processes
645}