hopr_transport_protocol/
pipeline.rs

1use futures::{SinkExt, StreamExt};
2use futures_time::future::FutureExt as TimeExt;
3use hopr_async_runtime::{AbortableList, spawn_as_abortable};
4use hopr_crypto_types::prelude::*;
5use hopr_internal_types::prelude::*;
6use hopr_network_types::{
7    prelude::*,
8    timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
9};
10use hopr_primitive_types::prelude::Address;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
14use tracing::Instrument;
15
16use crate::PeerId;
17
/// Buffer size of the internal channel carrying received acknowledgements
/// from the packet ingress pipeline to the ticket acknowledgement pipeline.
const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of received acknowledgements that the ticket acknowledgement
/// pipeline processes concurrently.
const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
/// Buffer size of the internal channel queueing acknowledgements that are
/// waiting to be encoded and sent out.
const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of outgoing acknowledgements that are signed, encoded and
/// sent concurrently.
const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
/// Timeout attached to the Sinks used by the pipelines (see `run_packet_pipeline`),
/// so that a pipeline is not blocked by a queue that is not being timely processed.
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
/// Maximum time allowed for decoding a single incoming packet.
/// Packets that take longer to decode are dropped.
const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);
24
// Packet counter metric, compiled in only with the `prometheus` feature and outside of tests.
// The pipelines in this file increment it with the `type` label values
// "sent", "received" and "forwarded".
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_PACKET_COUNT:  hopr_metrics::MultiCounter =  hopr_metrics::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
}
33
/// Identifies the individual long-running processes of the packet pipeline.
///
/// Serves as the key of the `AbortableList` returned by `run_packet_pipeline`.
/// The `strum::Display` strings provide a human-readable name for each process.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum PacketPipelineProcesses {
    /// Decoding and dispatching of packets incoming from the wire.
    #[strum(to_string = "HOPR [msg] - ingress")]
    MsgIn,
    /// Encoding of outgoing application packets and sending them to the wire.
    #[strum(to_string = "HOPR [msg] - egress")]
    MsgOut,
    /// Signing, encoding and sending of outgoing acknowledgements.
    #[strum(to_string = "HOPR [ack] - egress")]
    AckOut,
    /// Processing of received acknowledgements against unacknowledged tickets.
    #[strum(to_string = "HOPR [ack] - ingress")]
    AckIn,
    /// Packet mixing.
    ///
    /// NOTE(review): `run_packet_pipeline` does not register a process under this key;
    /// presumably it is used by the caller that injects the mixer — confirm.
    #[strum(to_string = "HOPR [msg] - mixer")]
    Mixer,
}
47
/// Ticket events emitted from the packet processing pipeline.
#[derive(Debug, Clone, strum::EnumIs, strum::EnumTryAs)]
pub enum TicketEvent {
    /// A winning ticket was received.
    ///
    /// Emitted by the incoming acknowledgement pipeline when a received acknowledgement
    /// resolves a relayed ticket as a win; carries the resulting redeemable ticket.
    WinningTicket(Box<RedeemableTicket>),
    /// A ticket has been rejected.
    ///
    /// Emitted by the packet ingress pipeline when the ticket on a received packet fails
    /// validation; carries the rejected ticket and the issuer reported by the validation
    /// error, if any.
    RejectedTicket(Box<Ticket>, Option<Address>),
}
56
57/// Performs encoding of outgoing Application protocol packets into HOPR protocol outgoing packets.
58async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
59    app_outgoing: AppOut,
60    encoder: std::sync::Arc<E>,
61    wire_outgoing: WOut,
62) where
63    AppOut: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
64    E: PacketEncoder + Send + 'static,
65    WOut: futures::Sink<(PeerId, Box<[u8]>), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
66    WOutErr: std::error::Error,
67{
68    let res = app_outgoing
69        .then_concurrent(|(routing, data)| {
70            let encoder = encoder.clone();
71            async move {
72                match encoder
73                    .encode_packet(
74                        data.data.to_bytes(),
75                        routing,
76                        data.packet_info
77                            .map(|data| data.signals_to_destination)
78                            .unwrap_or_default(),
79                    )
80                    .await
81                {
82                    Ok(packet) => {
83                        #[cfg(all(feature = "prometheus", not(test)))]
84                        METRIC_PACKET_COUNT.increment(&["sent"]);
85
86                        tracing::trace!(peer = %packet.next_hop, "protocol message out");
87                        Some((packet.next_hop.into(), packet.data))
88                    }
89                    Err(error) => {
90                        tracing::error!(%error, "packet could not be wrapped for sending");
91                        None
92                    }
93                }
94            }
95        })
96        .filter_map(futures::future::ready)
97        .map(Ok)
98        .forward_to_timeout(wire_outgoing)
99        .instrument(tracing::trace_span!("msg protocol processing - egress"))
100        .await;
101
102    if let Err(error) = res {
103        tracing::error!(
104            task = "transport (protocol - msg egress)",
105            %error,
106            "long-running background task finished with error"
107        );
108    } else {
109        tracing::warn!(
110            task = "transport (protocol - msg egress)",
111            "long-running background task finished"
112        )
113    }
114}
115
116/// Performs HOPR protocol decoding of incoming packets into Application protocol packets.
117///
118/// `wire_incoming` --> `decoder` --> `ack_outgoing` (final + forwarded)
119///                             | --> `wire_outgoing` (forwarded)
120///                             | --> `ack_incoming` (forwarded)
121///                             | --> `app_incoming` (final)
122#[allow(clippy::too_many_arguments)] // Allowed on internal functions
123async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
124    wire_incoming: WIn,
125    decoder: std::sync::Arc<D>,
126    ticket_proc: std::sync::Arc<T>,
127    ticket_events: TEvt,
128    ack_outgoing: AckOut,
129    wire_outgoing: WOut,
130    ack_incoming: AckIn,
131    app_incoming: AppIn,
132) where
133    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
134    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
135    WOut::Error: std::error::Error,
136    D: PacketDecoder + Send + 'static,
137    T: UnacknowledgedTicketProcessor + Send + 'static,
138    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
139    TEvt::Error: std::error::Error,
140    AckIn: futures::Sink<(OffchainPublicKey, Acknowledgement)> + Send + Unpin + Clone + 'static,
141    AckIn::Error: std::error::Error,
142    AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
143    AckOut::Error: std::error::Error,
144    AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
145    AppInErr: std::error::Error,
146{
147    let ack_outgoing_success = ack_outgoing.clone();
148    let ack_outgoing_failure = ack_outgoing;
149    let ticket_proc_success = ticket_proc;
150
151    let res = wire_incoming
152        .then_concurrent(move |(peer, data)| {
153            let decoder = decoder.clone();
154            let mut ack_outgoing_failure = ack_outgoing_failure.clone();
155            let mut ticket_events_reject = ticket_events.clone();
156
157            tracing::trace!(%peer, "protocol message in");
158
159            async move {
160                match decoder.decode(peer, data)
161                    .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
162                    .await {
163                    Ok(Ok(packet)) => {
164                        tracing::trace!(%peer, ?packet, "successfully decoded incoming packet");
165                        Some(packet)
166                    },
167                    Ok(Err(IncomingPacketError::Undecodable(error))) => {
168                        // Do not send an ack back if the packet could not be decoded at all
169                        //
170                        // Potentially adversarial behavior
171                        tracing::trace!(%peer, %error, "not sending ack back on undecodable packet - possible adversarial behavior");
172                        None
173                    },
174                    Ok(Err(IncomingPacketError::ProcessingError(sender, error))) => {
175                        tracing::error!(%peer, %error, "failed to process the decoded packet");
176                        // On this failure, we send back a random acknowledgement
177                        ack_outgoing_failure
178                            .send((sender, None))
179                            .await
180                            .unwrap_or_else(|error| {
181                                tracing::error!(%error, "failed to send ack to the egress queue");
182                            });
183                        None
184                    },
185                    Ok(Err(IncomingPacketError::InvalidTicket(sender, error))) => {
186                        tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
187                        if let Err(error) = ticket_events_reject
188                            .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
189                            .await {
190                            tracing::error!(%error, "failed to notify invalid ticket rejection");
191                        }
192                        // On this failure, we send back a random acknowledgement
193                        ack_outgoing_failure
194                            .send((sender, None))
195                            .await
196                            .unwrap_or_else(|error| {
197                                tracing::error!(%error, "failed to send ack to the egress queue");
198                            });
199                        None
200                    }
201                    Err(_) => {
202                        // If we cannot decode the packet within the time limit, just drop it
203                        tracing::error!(%peer, "dropped incoming packet: failed to decode packet within {:?}", PACKET_DECODING_TIMEOUT);
204                        None
205                    }
206                }
207            }
208        })
209        .filter_map(futures::future::ready)
210        .then_concurrent(move |packet| {
211            let ticket_proc = ticket_proc_success.clone();
212            let mut wire_outgoing = wire_outgoing.clone();
213            let mut ack_incoming = ack_incoming.clone();
214            let mut ack_outgoing_success = ack_outgoing_success.clone();
215            async move {
216                match packet {
217                    IncomingPacket::Acknowledgement(ack) => {
218                        let IncomingAcknowledgementPacket { previous_hop, received_ack: ack, .. } = *ack;
219                        tracing::trace!(%previous_hop , "acknowledging ticket using received ack");
220                        ack_incoming
221                            .send((previous_hop, ack))
222                            .await
223                            .unwrap_or_else(|error| {
224                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
225                            });
226
227                        // We do not acknowledge back acknowledgements.
228                        None
229                    },
230                    IncomingPacket::Final(final_packet) => {
231                        let IncomingFinalPacket {
232                            previous_hop,
233                            sender,
234                            plain_text,
235                            ack_key,
236                            info,
237                            ..
238                        } = *final_packet;
239                        tracing::trace!(%previous_hop, "incoming final packet");
240
241                        // Send acknowledgement back
242                        ack_outgoing_success
243                            .send((previous_hop, Some(ack_key)))
244                            .await
245                            .unwrap_or_else(|error| {
246                                tracing::error!(%error, "failed to send ack to the egress queue");
247                            });
248
249                        #[cfg(all(feature = "prometheus", not(test)))]
250                        METRIC_PACKET_COUNT.increment(&["received"]);
251
252                        Some((sender, plain_text, info))
253                    }
254                    IncomingPacket::Forwarded(fwd_packet) => {
255                        let IncomingForwardedPacket {
256                            previous_hop,
257                            next_hop,
258                            data,
259                            ack_key_prev_hop,
260                            ack_challenge,
261                            received_ticket,
262                            ..
263                        } = *fwd_packet;
264                        if let Err(error) = ticket_proc.insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket).await {
265                            tracing::error!(%previous_hop, %next_hop, %error, "failed to insert unacknowledged ticket into the ticket processor");
266                            return None;
267                        }
268
269                        // First, relay the packet to the next hop
270                        tracing::trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
271
272                        wire_outgoing
273                            .send((next_hop.into(), data))
274                            .await
275                            .unwrap_or_else(|error| {
276                                tracing::error!(%error, "failed to forward a packet to the transport layer");
277                            });
278
279                        #[cfg(all(feature = "prometheus", not(test)))]
280                        METRIC_PACKET_COUNT.increment(&["forwarded"]);
281
282                        // Send acknowledgement back
283                        tracing::trace!(%previous_hop, "acknowledging forwarded packet back");
284                        ack_outgoing_success
285                            .send((previous_hop, Some(ack_key_prev_hop)))
286                            .await
287                            .unwrap_or_else(|error| {
288                                tracing::error!(%error, "failed to send ack to the egress queue");
289                            });
290
291                        None
292                    }
293                }
294            }})
295        .filter_map(|maybe_data| futures::future::ready(
296            // Create the ApplicationDataIn data structure for incoming data
297            maybe_data
298                .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
299                    .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
300                    .ok()
301                    .map(|data| (sender, ApplicationDataIn {
302                        data,
303                        packet_info: IncomingPacketInfo {
304                            signals_from_sender: aux_info.packet_signals,
305                            num_saved_surbs: aux_info.num_surbs,
306                        }
307                    })))
308        ))
309        .map(Ok)
310        .inspect(|data| tracing::trace!(?data, "application data in"))
311        .forward_to_timeout(app_incoming)
312        .instrument(tracing::trace_span!("msg protocol processing - ingress"))
313        .await;
314
315    if let Err(error) = res {
316        tracing::error!(
317            task = "transport (protocol - msg ingress)",
318            %error,
319            "long-running background task finished with error"
320        );
321    } else {
322        tracing::warn!(
323            task = "transport (protocol - msg ingress)",
324            "long-running background task finished"
325        )
326    }
327}
328
329async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
330    ack_outgoing: AckOut,
331    encoder: std::sync::Arc<E>,
332    packet_key: OffchainKeypair,
333    wire_outgoing: WOut,
334) where
335    AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
336    E: PacketEncoder + Send + 'static,
337    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
338    WOut::Error: std::error::Error,
339{
340    ack_outgoing
341        .for_each_concurrent(
342            NUM_CONCURRENT_ACK_OUT_PROCESSING,
343            move |(destination, maybe_ack_key)| {
344                let packet_key = packet_key.clone();
345                let encoder = encoder.clone();
346                let mut wire_outgoing = wire_outgoing.clone();
347
348                async move {
349                    // Sign acknowledgement with the given half-key or generate a signed random one
350                    let ack = hopr_parallelize::cpu::spawn_blocking(move || {
351                        maybe_ack_key
352                            .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
353                            .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key))
354                    })
355                        .await;
356
357                    match encoder.encode_acknowledgement(ack, &destination).await {
358                        Ok(ack_packet) => {
359                            tracing::trace!(%destination, "acknowledgement out");
360                            wire_outgoing
361                                .send((ack_packet.next_hop.into(), ack_packet.data))
362                                .await
363                                .unwrap_or_else(|error| {
364                                    tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
365                                });
366                        }
367                        Err(error) => tracing::error!(%error, "failed to create ack packet"),
368                    }
369                }
370            }
371        ).await;
372
373    tracing::warn!(
374        task = "transport (protocol - ack outgoing)",
375        "long-running background task finished"
376    );
377}
378
379async fn start_incoming_ack_pipeline<AckIn, T, TEvt>(
380    ack_incoming: AckIn,
381    ticket_events: TEvt,
382    ticket_proc: std::sync::Arc<T>,
383) where
384    AckIn: futures::Stream<Item = (OffchainPublicKey, Acknowledgement)> + Send + 'static,
385    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
386    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
387    TEvt::Error: std::error::Error,
388{
389    ack_incoming
390        .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(peer, ack)| {
391            let ticket_proc = ticket_proc.clone();
392            let mut ticket_evt = ticket_events.clone();
393            async move {
394                tracing::trace!(%peer, "received acknowledgement");
395                match ticket_proc.acknowledge_ticket(peer, ack).await {
396                    Ok(Some(ResolvedAcknowledgement::RelayingWin(redeemable_ticket))) => {
397                        tracing::trace!(%peer, "received ack for a winning ticket");
398                        ticket_evt
399                            .send(TicketEvent::WinningTicket(redeemable_ticket))
400                            .await
401                            .unwrap_or_else(|error| {
402                                tracing::error!(%error, "failed to notify winning ticket");
403                            });
404                    }
405                    Ok(Some(ResolvedAcknowledgement::RelayingLoss(_))) => {
406                        // Losing tickets are not getting accounted for anywhere.
407                        tracing::trace!(%peer, "received ack for a losing ticket");
408                    }
409                    Ok(None) => {
410                        // Unexpected acknowledgements naturally happen
411                        // as acknowledgements of 0-hop packets
412                        tracing::trace!(%peer, "received unexpected acknowledgement");
413                    }
414                    Err(error) => {
415                        tracing::error!(%error, "failed to acknowledge ticket");
416                    }
417                }
418            }
419        })
420        .await;
421
422    tracing::warn!(
423        task = "transport (protocol - ticket acknowledgement)",
424        "long-running background task finished"
425    );
426}
427
428/// Run all processes responsible for handling the msg and acknowledgment protocols.
429///
430/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
431/// overlay on top of the `wire_msg` Stream or Sink.
432pub fn run_packet_pipeline<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
433    packet_key: OffchainKeypair,
434    wire_msg: (WOut, WIn),
435    codec: (C, D),
436    ticket_proc: T,
437    ticket_events: TEvt,
438    api: (AppOut, AppIn),
439) -> AbortableList<PacketPipelineProcesses>
440where
441    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
442    WOut::Error: std::error::Error,
443    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
444    C: PacketEncoder + Sync + Send + 'static,
445    D: PacketDecoder + Sync + Send + 'static,
446    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
447    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
448    TEvt::Error: std::error::Error,
449    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
450    AppOut::Error: std::error::Error,
451    AppIn: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
452{
453    let mut processes = AbortableList::default();
454
455    #[cfg(all(feature = "prometheus", not(test)))]
456    {
457        // Initialize the lazy statics here
458        lazy_static::initialize(&METRIC_PACKET_COUNT);
459    }
460
461    let (outgoing_ack_tx, outgoing_ack_rx) =
462        futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(ACK_OUT_BUFFER_SIZE);
463
464    let (incoming_ack_tx, incoming_ack_rx) =
465        futures::channel::mpsc::channel::<(OffchainPublicKey, Acknowledgement)>(TICKET_ACK_BUFFER_SIZE);
466
467    // Attach timeouts to all Sinks so that the pipelines are not blocked when
468    // some channel is not being timely processed
469    let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
470    let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
471    let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
472    let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
473    let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);
474
475    let encoder = std::sync::Arc::new(codec.0);
476    let decoder = std::sync::Arc::new(codec.1);
477    let ticket_proc = std::sync::Arc::new(ticket_proc);
478
479    processes.insert(
480        PacketPipelineProcesses::MsgOut,
481        spawn_as_abortable!(start_outgoing_packet_pipeline(
482            app_in,
483            encoder.clone(),
484            wire_out.clone(),
485        )),
486    );
487
488    processes.insert(
489        PacketPipelineProcesses::MsgIn,
490        spawn_as_abortable!(start_incoming_packet_pipeline(
491            wire_in,
492            decoder,
493            ticket_proc.clone(),
494            ticket_events.clone(),
495            outgoing_ack_tx,
496            wire_out.clone(),
497            incoming_ack_tx,
498            app_out,
499        )),
500    );
501
502    processes.insert(
503        PacketPipelineProcesses::AckOut,
504        spawn_as_abortable!(start_outgoing_ack_pipeline(
505            outgoing_ack_rx,
506            encoder,
507            packet_key.clone(),
508            wire_out,
509        )),
510    );
511
512    processes.insert(
513        PacketPipelineProcesses::AckIn,
514        spawn_as_abortable!(start_incoming_ack_pipeline(incoming_ack_rx, ticket_events, ticket_proc)),
515    );
516
517    processes
518}