hopr_transport_protocol/
lib.rs

1//! Collection of objects and functionality allowing building of p2p or stream protocols for the higher business logic
2//! layers.
3//!
4//! ## Contents
5//!
6//! Supported protocol configurations:
7//!
8//! - `mix`
9//! - `ack`
10//! - `heartbeat`
11//! - `ticket_aggregation`
12//!
13//! Supported protocol processors:
14//!
15//! - `ticket_aggregation`
16//!
17//! ### `ticket_aggregation`
18//!
19//! Ticket aggregation processing mechanism is responsible for ingesting the ticket aggregation related requests:
20//!
21//! - `Receive(PeerId, U)`,
22//! - `Reply(PeerId, std::result::Result<Ticket, String>, T)`,
23//! - `Send(PeerId, Vec<AcknowledgedTicket>, TicketAggregationFinalizer)`,
24//!
//! where `U` is the type from which the aggregated ticket can be extracted
//! (`ResponseChannel<Result<Ticket, String>>`) and `T` represents a network-negotiated identifier (`RequestId`).
27//!
28//! In broader context the protocol flow is as follows:
29//!
30//! 1. requesting ticket aggregation
31//!
32//!    - the peer A desires to aggregate tickets, collects the tickets into a data collection and sends a request
33//!      containing the collection to aggregate `Vec<AcknowledgedTicket>` to peer B using the `Send` mechanism
34//!
35//! 2. responding to ticket aggregation
36//!
37//!    - peer B obtains the request from peer A, performs the ticket aggregation and returns a result of that operation
38//!      in the form of `std::result::Result<Ticket, String>` using the `Reply` mechanism
39//!
//! 3. accepting the aggregated ticket
//!
//!    - peer A receives the aggregated ticket using the `Receive` mechanism
42//!
43//! Furthermore, apart from the basic positive case scenario, standard mechanics of protocol communication apply:
44//!
45//! - the requesting side can time out, if the responding side takes too long to provide an aggregated ticket, in which
46//!   case the ticket is not considered aggregated, even if eventually an aggregated ticket is delivered
47//! - the responder can fail to aggregate tickets in which case it replies with an error string describing the failure
48//!   reason and it is the requester's responsibility to handle the negative case as well
49//!   - in the absence of response, the requester will time out
50
/// Coder and decoder for the transport binary protocol layer
mod codec;

/// Configuration of the protocol components.
pub mod config;
/// Errors produced by the crate.
pub mod errors;

// protocols
/// `heartbeat` p2p protocol
pub mod heartbeat;
/// Processor for the protocol; provides the packet processing types used by `run_msg_ack_protocol`.
pub mod processor;

/// Stream processing utilities
pub mod stream;

/// Timer utilities; `execute_on_tick` from this module is re-exported at the crate root.
pub mod timer;
69use std::collections::HashMap;
70
71use futures::{SinkExt, StreamExt};
72use hopr_async_runtime::spawn_as_abortable;
73use hopr_crypto_types::types::OffchainPublicKey;
74use hopr_db_api::protocol::{HoprDbProtocolOperations, IncomingPacket};
75use hopr_internal_types::{prelude::HoprPseudonym, protocol::Acknowledgement};
76use hopr_network_types::prelude::ResolvedTransportRouting;
77use hopr_transport_bloom::persistent::WrappedTagBloomFilter;
78use hopr_transport_identity::{Multiaddr, PeerId};
79use hopr_transport_packet::prelude::ApplicationData;
80use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
81use tracing::{error, trace, warn};
82
83use crate::processor::{PacketSendFinalizer, PacketUnwrapping, PacketWrapping};
84pub use crate::{processor::DEFAULT_PRICE_PER_PACKET, timer::execute_on_tick};
85
/// Size of a single HOPR packet, taken from `HoprPacket::SIZE`.
const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
/// Duration threshold in milliseconds; operations in the ingress pipeline that take longer
/// than this are reported with a warning log.
const SLOW_OP_MS: u128 = 150;

/// Fixed-length codec parameterised with the HOPR packet size.
pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
/// Identifier string of the current HOPR `msg` (mix) protocol version.
pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";
91
92#[cfg(all(feature = "prometheus", not(test)))]
93use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
94
// Metrics are only compiled in when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // packet
    // Counter of processed packets, labelled by a single `type` label.
    static ref METRIC_PACKET_COUNT: MultiCounter = MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    // Counter of packets per peer. The label order is `peer` first, `direction` second;
    // label values passed to `increment` have to be given in the same order.
    static ref METRIC_PACKET_COUNT_PER_PEER: MultiCounter = MultiCounter::new(
        "hopr_packets_per_peer_count",
        "Number of processed packets to/from distinct peers",
        &["peer", "direction"]
    ).unwrap();
    // Counter incremented whenever the tag bloom filter reports a replayed packet.
    static ref METRIC_REPLAYED_PACKET_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    ).unwrap();
    // Counter incremented when receiving a packet fails with a ticket validation error.
    static ref METRIC_REJECTED_TICKETS_COUNT: SimpleCounter =
        SimpleCounter::new("hopr_rejected_tickets_count", "Number of rejected tickets").unwrap();
}
115
/// Identifiers of the long-running processes belonging to the protocol pipeline.
///
/// The values are used as keys of the abort-handle map returned by `run_msg_ack_protocol`.
/// The `Display` text of each variant is given by its `strum` attribute.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum ProtocolProcesses {
    /// Process handling messages arriving from the wire.
    #[strum(to_string = "HOPR [msg] - ingress")]
    MsgIn,
    /// Process handling messages being sent to the wire.
    #[strum(to_string = "HOPR [msg] - egress")]
    MsgOut,
    /// Mixer process. It is not spawned by `run_msg_ack_protocol`; the mixing has to be injected separately.
    #[strum(to_string = "HOPR [msg] - mixer")]
    Mixer,
    /// Periodic process persisting the tag bloom filter.
    #[strum(to_string = "bloom filter persistence (periodic)")]
    BloomPersist,
}
/// Processed indexer generated events.
#[derive(Debug, Clone)]
pub enum PeerDiscovery {
    /// NOTE(review): presumably marks the peer as allowed to communicate with this node - confirm against the
    /// consumer of these events.
    Allow(PeerId),
    /// NOTE(review): presumably marks the peer as banned from communicating with this node - confirm against the
    /// consumer of these events.
    Ban(PeerId),
    /// A peer together with a list of multiaddresses.
    /// NOTE(review): presumably the addresses the peer announced - confirm against the producer of these events.
    Announce(PeerId, Vec<Multiaddr>),
}
134
135/// Run all processes responsible for handling the msg and acknowledgment protocols.
136///
137/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
138/// overlayed on top of the `wire_msg` Stream or Sink.
139#[allow(clippy::too_many_arguments)]
140pub async fn run_msg_ack_protocol<Db>(
141    packet_cfg: processor::PacketInteractionConfig,
142    db: Db,
143    bloom_filter_persistent_path: Option<String>,
144    wire_msg: (
145        impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + Sync + 'static,
146        impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + Sync + 'static,
147    ),
148    api: (
149        impl futures::Sink<(HoprPseudonym, ApplicationData)> + Send + Sync + 'static,
150        impl futures::Stream<Item = (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>
151        + Send
152        + Sync
153        + 'static,
154    ),
155) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
156where
157    Db: HoprDbProtocolOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
158{
159    let me = packet_cfg.packet_keypair.clone();
160
161    let mut processes = HashMap::new();
162
163    #[cfg(all(feature = "prometheus", not(test)))]
164    {
165        // Initialize the lazy statics here
166        // lazy_static::initialize(&METRIC_RECEIVED_ACKS);
167        // lazy_static::initialize(&METRIC_SENT_ACKS);
168        // lazy_static::initialize(&METRIC_TICKETS_COUNT);
169        lazy_static::initialize(&METRIC_PACKET_COUNT);
170        lazy_static::initialize(&METRIC_PACKET_COUNT_PER_PEER);
171        lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
172        lazy_static::initialize(&METRIC_REJECTED_TICKETS_COUNT);
173    }
174
175    let tbf = if let Some(bloom_filter_persistent_path) = bloom_filter_persistent_path {
176        let tbf = WrappedTagBloomFilter::new(bloom_filter_persistent_path);
177        let tbf_2 = tbf.clone();
178        processes.insert(
179            ProtocolProcesses::BloomPersist,
180            spawn_as_abortable(Box::pin(execute_on_tick(
181                std::time::Duration::from_secs(90),
182                move || {
183                    let tbf_clone = tbf_2.clone();
184
185                    async move { tbf_clone.save().await }
186                },
187                "persisting the bloom filter to disk".into(),
188            ))),
189        );
190        tbf
191    } else {
192        WrappedTagBloomFilter::new("no_tbf".into())
193    };
194
195    let msg_processor_read = processor::PacketProcessor::new(db.clone(), packet_cfg);
196    let msg_processor_write = msg_processor_read.clone();
197
198    let msg_to_send_tx = wire_msg.0.clone();
199    processes.insert(
200        ProtocolProcesses::MsgOut,
201        spawn_as_abortable(async move {
202            let _neverending = api
203                .1
204                .then_concurrent(|(data, routing, finalizer)| {
205                    let msg_processor = msg_processor_write.clone();
206
207                    async move {
208                        match PacketWrapping::send(&msg_processor, data, routing).await {
209                            Ok(v) => {
210                                let v: (PeerId, Box<[u8]>) = (v.next_hop.into(), v.data);
211                                #[cfg(all(feature = "prometheus", not(test)))]
212                                {
213                                    METRIC_PACKET_COUNT_PER_PEER.increment(&["out", &v.0.to_string()]);
214                                    METRIC_PACKET_COUNT.increment(&["sent"]);
215                                }
216                                finalizer.finalize(Ok(()));
217                                Some(v)
218                            }
219                            Err(e) => {
220                                finalizer.finalize(Err(e));
221                                None
222                            }
223                        }
224                    }
225                })
226                .filter_map(|v| async move { v })
227                .map(Ok)
228                .forward(msg_to_send_tx)
229                .await;
230        }),
231    );
232
233    let msg_to_send_tx = wire_msg.0.clone();
234    let db_for_recv = db.clone();
235    let me_for_recv = me.clone();
236    processes.insert(
237        ProtocolProcesses::MsgIn,
238        spawn_as_abortable(async move {
239            let _neverending = wire_msg
240                .1
241                .then_concurrent(move |(peer, data)| {
242                    let msg_processor = msg_processor_read.clone();
243                    let db = db_for_recv.clone();
244                    let mut msg_to_send_tx = msg_to_send_tx.clone();
245                    let me = me.clone();
246
247                    async move {
248                        let now = std::time::Instant::now();
249                        let res = msg_processor.recv(&peer, data).await.map_err(move |e| (peer, e));
250                        let elapsed = now.elapsed();
251                        if elapsed.as_millis() > SLOW_OP_MS {
252                            tracing::warn!("msg_processor.recv took {}ms", elapsed.as_millis());
253                        }
254                        if let Err((peer, e)) = &res {
255                            #[cfg(all(feature = "prometheus", not(test)))]
256                            if let hopr_crypto_packet::errors::PacketError::TicketValidation(_) = e {
257                                METRIC_REJECTED_TICKETS_COUNT.increment();
258                            }
259
260                            error!(peer = %peer, error = %e, "Failed to process the received message");
261
262                            let peer: OffchainPublicKey = match peer.try_into() {
263                                Ok(p) => p,
264                                Err(error) => {
265                                    tracing::warn!(%peer, %error, "Dropping packet – cannot convert peer id");
266                                    return None;
267                                }
268                            };
269
270                            // send random signed acknowledgement to give feedback to the sender
271                            let ack = Acknowledgement::random(&me);
272
273                            match db
274                                .to_send_no_ack(ack.as_ref().to_vec().into_boxed_slice(), peer)
275                                .await {
276                                    Ok(ack_packet) => {
277                                        let now = std::time::Instant::now();
278                                        msg_to_send_tx
279                                            .send((
280                                                ack_packet.next_hop.into(),
281                                                ack_packet.data,
282                                            ))
283                                            .await
284                                            .unwrap_or_else(|_e| {
285                                                error!("Failed to forward an acknowledgement for a failed packet recv to the transport layer");
286                                            });
287                                        let elapsed = now.elapsed();
288                                        if elapsed.as_millis() > SLOW_OP_MS {
289                                            tracing::warn!("msg_to_send_tx.send took {}ms", elapsed.as_millis());
290                                        }
291                                    },
292                                    Err(error) => tracing::error!(%error, "Failed to create random ack packet for a failed receive"),
293                                }
294                        }
295
296                        res.ok().flatten()
297                    }
298                })
299                .filter_map(move |maybe_packet| {
300                    let tbf = tbf.clone();
301
302                    async move {
303                    if let Some(packet) = maybe_packet {
304                        match packet {
305                            IncomingPacket::Final { packet_tag, previous_hop,.. }
306                            | IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
307                                if tbf.is_tag_replay(&packet_tag).await {
308                                    warn!("replayed packet received from {previous_hop}");
309                                    #[cfg(all(feature = "prometheus", not(test)))]
310                                    METRIC_REPLAYED_PACKET_COUNT.increment();
311
312                                    None
313                                } else {
314                                    Some(packet)
315                                }
316                            }
317                        }
318                    } else {
319                        trace!("received empty packet");
320                        None
321                    }
322                }
323                })
324                .then_concurrent(move |packet| {
325                    let mut msg_to_send_tx = wire_msg.0.clone();
326                    let db = db.clone();
327                    let me = me_for_recv.clone();
328
329                    async move {
330
331                    match packet {
332                        IncomingPacket::Final {
333                            previous_hop,
334                            sender,
335                            plain_text,
336                            ack_key,
337                            ..
338                        } => {
339                            trace!("acknowledging final packet to {previous_hop}");
340                            let ack = Acknowledgement::new(ack_key, &me);
341                            if let Ok(ack_packet) = db
342                                .to_send_no_ack(ack.as_ref().to_vec().into_boxed_slice(), previous_hop)
343                                .await
344                                .inspect_err(|error| tracing::error!(error = %error, "Failed to create ack packet for a received message"))
345                                {
346                                    msg_to_send_tx
347                                        .send((
348                                            ack_packet.next_hop.into(),
349                                            ack_packet.data,
350                                        ))
351                                        .await
352                                        .unwrap_or_else(|_e| {
353                                            error!("Failed to send an acknowledgement for a received packet to the transport layer");
354                                        });
355                                }
356
357                                Some((sender, plain_text))
358                        }
359                        IncomingPacket::Forwarded {
360                            previous_hop,
361                            next_hop,
362                            data,
363                            ack,
364                            ..
365                        } => {
366                            trace!("acknowledging forwarded packet {previous_hop}->{next_hop}");
367                            msg_to_send_tx
368                                .send((
369                                    next_hop.into(),
370                                    data,
371                                ))
372                                .await
373                                .unwrap_or_else(|_e| {
374                                    error!("Failed to forward a packet to the transport layer");
375                                });
376
377                            if let Ok(ack_packet) = db
378                                .to_send_no_ack(ack.as_ref().to_vec().into_boxed_slice(), previous_hop)
379                                .await
380                                .inspect_err(|error| tracing::error!(error = %error, "Failed to create ack packet for a relayed message"))
381                            {
382                                msg_to_send_tx
383                                    .send((
384                                        ack_packet.next_hop.into(),
385                                        ack_packet.data,
386                                    ))
387                                    .await
388                                    .unwrap_or_else(|_e| {
389                                        error!("Failed to send an acknowledgement for a relayed packet to the transport layer");
390                                    });
391                            }
392                            None
393                        }
394                    }
395                }})
396                .filter_map(|maybe_data| async move {
397                    if let Some((sender, data)) = maybe_data {
398                        ApplicationData::from_bytes(data.as_ref())
399                            .inspect_err(|error| tracing::error!(error = %error, "Failed to decode application data"))
400                            .ok()
401                            .map(|data| (sender, data))
402                    } else {
403                        None
404                    }
405                })
406                .map(Ok)
407                .forward(api.0)
408                .await;
409        }),
410    );
411
412    processes
413}