// hopr_transport_protocol/lib.rs

1//! Collection of objects and functionality allowing building of p2p or stream protocols for the higher business logic layers.
2//!
3//! ## Contents
4//!
5//! Supported protocol configurations:
6//!
7//! - `msg`
8//! - `ack`
9//! - `heartbeat`
10//! - `ticket_aggregation`
11//!
12//! Supported protocol processors:
13//!
14//! - `ticket_aggregation`
15//!
16//! ### `ticket_aggregation`
17//!
18//! Ticket aggregation processing mechanism is responsible for ingesting the ticket aggregation related requests:
19//!
20//! - `Receive(PeerId, U)`,
21//! - `Reply(PeerId, std::result::Result<Ticket, String>, T)`,
22//! - `Send(PeerId, Vec<AcknowledgedTicket>, TicketAggregationFinalizer)`,
23//!
24//! where `U` is the type of an aggregated ticket extractable (`ResponseChannel<Result<Ticket, String>>`) and `T` represents a network negotiated identifier (`RequestId`).
25//!
26//! In broader context the protocol flow is as follows:
27//!
28//! 1. requesting ticket aggregation
29//!
30//!    - the peer A desires to aggregate tickets, collects the tickets into a data collection and sends a request containing the collection to aggregate `Vec<AcknowledgedTicket>` to peer B using the `Send` mechanism
31//!
32//! 2. responding to ticket aggregation
33//!
34//!    - peer B obtains the request from peer A, performs the ticket aggregation and returns a result of that operation in the form of `std::result::Result<Ticket, String>` using the `Reply` mechanism
35//!
36//! 3. accepting the aggregated ticket
37//!    - peer A receives the aggregated ticket using the `Receive` mechanism
38//!
39//! Furthermore, apart from the basic positive case scenario, standard mechanics of protocol communication apply:
40//!
41//! - the requesting side can time out, if the responding side takes too long to provide an aggregated ticket, in which case the ticket is not considered aggregated, even if eventually an aggregated ticket is delivered
42//! - the responder can fail to aggregate tickets in which case it replies with an error string describing the failure reason and it is the requester's responsibility to handle the negative case as well
43//!   - in the absence of response, the requester will time out
44//!
45
46/// Configuration of the protocol components.
47pub mod config;
48/// Errors produced by the crate.
49pub mod errors;
50
51/// Bloom filter for the transport layer.
52pub mod bloom;
53// protocols
54/// `ack` p2p protocol
55pub mod ack;
56/// `heartbeat` p2p protocol
57pub mod heartbeat;
58/// `msg` p2p protocol
59pub mod msg;
60/// `ticket_aggregation` p2p protocol
61pub mod ticket_aggregation;
62
63/// Stream processing utilities
64pub mod stream;
65
66pub mod timer;
67use hopr_transport_identity::Multiaddr;
68pub use timer::execute_on_tick;
69
70use futures::{SinkExt, StreamExt};
71use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
72use std::collections::HashMap;
73use tracing::error;
74
75use hopr_async_runtime::prelude::spawn;
76use hopr_db_api::protocol::HoprDbProtocolOperations;
77use hopr_internal_types::protocol::{Acknowledgement, ApplicationData};
78use hopr_path::path::TransportPath;
79use hopr_transport_identity::PeerId;
80
81pub use msg::processor::DEFAULT_PRICE_PER_PACKET;
82use msg::processor::{PacketSendFinalizer, PacketUnwrapping, PacketWrapping};
83
84#[cfg(all(feature = "prometheus", not(test)))]
85use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
86
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // acknowledgement
    // Received acknowledgements, partitioned by validity ("valid" = "true"/"false").
    static ref METRIC_RECEIVED_ACKS: MultiCounter = MultiCounter::new(
        "hopr_received_ack_count",
        "Number of received acknowledgements",
        &["valid"]
    )
    .unwrap();
    // Acknowledgements queued for sending by the ack egress process.
    static ref METRIC_SENT_ACKS: SimpleCounter =
        SimpleCounter::new("hopr_sent_acks_count", "Number of sent message acknowledgements").unwrap();
    // Tickets observed during ack processing; the ack ingress labels them "winning"/"losing".
    static ref METRIC_TICKETS_COUNT: MultiCounter =
        MultiCounter::new("hopr_tickets_count", "Number of winning tickets", &["type"]).unwrap();
    // packet
    // Processed packets by pipeline role: "sent", "received", "forwarded".
    static ref METRIC_PACKET_COUNT: MultiCounter = MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    // Per-peer packet counts, labeled with the peer id and direction ("in"/"out").
    static ref METRIC_PACKET_COUNT_PER_PEER: MultiCounter = MultiCounter::new(
        "hopr_packets_per_peer_count",
        "Number of processed packets to/from distinct peers",
        &["peer", "direction"]
    ).unwrap();
    // Packets rejected as replays (tag Bloom filter hit) by the msg ingress.
    static ref METRIC_REPLAYED_PACKET_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    ).unwrap();
    // Tickets that failed validation during packet processing.
    static ref METRIC_REJECTED_TICKETS_COUNT: SimpleCounter =
        SimpleCounter::new("hopr_rejected_tickets_count", "Number of rejected tickets").unwrap();
}
118
/// Identifiers of the background processes spawned for the transport protocols
/// (see [`run_msg_ack_protocol`], which keys its returned task map by this enum).
///
/// The `strum::Display` strings are the human-readable task names.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum ProtocolProcesses {
    /// Ingress side of the `ack` protocol (processing received acknowledgements).
    #[strum(to_string = "HOPR [ack] - ingress")]
    AckIn,
    /// Egress side of the `ack` protocol (sending acknowledgements out).
    #[strum(to_string = "HOPR [ack] - egress")]
    AckOut,
    /// Ingress side of the `msg` protocol (unwrapping received packets).
    #[strum(to_string = "HOPR [msg] - ingress")]
    MsgIn,
    /// Egress side of the `msg` protocol (wrapping and sending packets).
    #[strum(to_string = "HOPR [msg] - egress")]
    MsgOut,
    /// Packet mixer; the mixing itself is injected externally on top of the wire
    /// streams (not spawned by [`run_msg_ack_protocol`]).
    #[strum(to_string = "HOPR [msg] - mixer")]
    Mixer,
    /// Periodic persistence of the tag Bloom filter to disk.
    #[strum(to_string = "bloom filter persistence (periodic)")]
    BloomPersist,
}
/// Processed indexer-generated peer discovery events.
#[derive(Debug, Clone)]
pub enum PeerDiscovery {
    /// Allow connectivity with the given peer.
    Allow(PeerId),
    /// Ban the given peer.
    Ban(PeerId),
    /// The given peer announced itself with the listed multiaddresses.
    Announce(PeerId, Vec<Multiaddr>),
}
141
142/// Run all processes responsible for handling the msg and acknowledgment protocols.
143///
144/// The pipeline does not handle the mixing itself, that needs to be injected as a separate process
145/// overlayed on top of the `wire_msg` Stream or Sink.
146#[allow(clippy::too_many_arguments)]
147pub async fn run_msg_ack_protocol<Db>(
148    packet_cfg: msg::processor::PacketInteractionConfig,
149    db: Db,
150    bloom_filter_persistent_path: Option<String>,
151    wire_ack: (
152        impl futures::Sink<(PeerId, Acknowledgement)> + Send + Sync + 'static,
153        impl futures::Stream<Item = (PeerId, Acknowledgement)> + Send + Sync + 'static,
154    ),
155    wire_msg: (
156        impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + Sync + 'static,
157        impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + Sync + 'static,
158    ),
159    api: (
160        impl futures::Sink<ApplicationData> + Send + Sync + 'static,
161        impl futures::Stream<Item = (ApplicationData, TransportPath, PacketSendFinalizer)> + Send + Sync + 'static,
162    ),
163) -> HashMap<ProtocolProcesses, hopr_async_runtime::prelude::JoinHandle<()>>
164where
165    Db: HoprDbProtocolOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
166{
167    let me = packet_cfg.packet_keypair.clone();
168    let me_onchain = &packet_cfg.chain_keypair.clone();
169
170    let mut processes = HashMap::new();
171
172    #[cfg(all(feature = "prometheus", not(test)))]
173    {
174        // Initialize the lazy statics here
175        lazy_static::initialize(&METRIC_RECEIVED_ACKS);
176        lazy_static::initialize(&METRIC_SENT_ACKS);
177        lazy_static::initialize(&METRIC_TICKETS_COUNT);
178        lazy_static::initialize(&METRIC_PACKET_COUNT);
179        lazy_static::initialize(&METRIC_PACKET_COUNT_PER_PEER);
180        lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
181        lazy_static::initialize(&METRIC_REJECTED_TICKETS_COUNT);
182    }
183
184    let tbf = if let Some(bloom_filter_persistent_path) = bloom_filter_persistent_path {
185        let tbf = bloom::WrappedTagBloomFilter::new(bloom_filter_persistent_path);
186        let tbf_2 = tbf.clone();
187        processes.insert(
188            ProtocolProcesses::BloomPersist,
189            spawn(Box::pin(execute_on_tick(
190                std::time::Duration::from_secs(90),
191                move || {
192                    let tbf_clone = tbf_2.clone();
193
194                    async move { tbf_clone.save().await }
195                },
196                "persisting the bloom filter to disk".into(),
197            ))),
198        );
199        tbf
200    } else {
201        bloom::WrappedTagBloomFilter::new("no_tbf".into())
202    };
203
204    let ack_processor_read = ack::processor::AcknowledgementProcessor::new(db.clone(), me_onchain);
205    let ack_processor_write = ack_processor_read.clone();
206    let msg_processor_read = msg::processor::PacketProcessor::new(db.clone(), tbf, packet_cfg);
207    let msg_processor_write = msg_processor_read.clone();
208
209    processes.insert(
210        ProtocolProcesses::AckIn,
211        spawn(async move {
212            let _neverending = wire_ack
213                .1
214                .for_each_concurrent(None, move |(peer, ack)| {
215                    let ack_processor = ack_processor_read.clone();
216
217                    async move {
218                        let _ack_result = ack_processor.recv(&peer, ack).await;
219                        #[cfg(all(feature = "prometheus", not(test)))]
220                        match &_ack_result {
221                            Ok(hopr_db_api::prelude::AckResult::Sender(_)) => {
222                                METRIC_RECEIVED_ACKS.increment(&["true"]);
223                            }
224                            Ok(hopr_db_api::prelude::AckResult::RelayerWinning(_)) => {
225                                METRIC_RECEIVED_ACKS.increment(&["true"]);
226                                METRIC_TICKETS_COUNT.increment(&["winning"]);
227                            }
228                            Ok(hopr_db_api::prelude::AckResult::RelayerLosing) => {
229                                METRIC_RECEIVED_ACKS.increment(&["true"]);
230                                METRIC_TICKETS_COUNT.increment(&["losing"]);
231                            }
232                            Err(_) => {
233                                METRIC_RECEIVED_ACKS.increment(&["false"]);
234                            }
235                        }
236                    }
237                })
238                .await;
239        }),
240    );
241
242    let (internal_ack_send, internal_ack_rx) = futures::channel::mpsc::unbounded::<(PeerId, Acknowledgement)>();
243
244    processes.insert(
245        ProtocolProcesses::AckOut,
246        spawn(async move {
247            let _neverending = internal_ack_rx
248                .then_concurrent(move |(peer, ack)| {
249                    let ack_processor = ack_processor_write.clone();
250
251                    #[cfg(all(feature = "prometheus", not(test)))]
252                    METRIC_SENT_ACKS.increment();
253
254                    async move { (peer, ack_processor.send(&peer, ack).await) }
255                })
256                .map(Ok)
257                .forward(wire_ack.0)
258                .await;
259        }),
260    );
261
262    let msg_to_send_tx = wire_msg.0.clone();
263    processes.insert(
264        ProtocolProcesses::MsgOut,
265        spawn(async move {
266            let _neverending = api
267                .1
268                .then_concurrent(|(data, path, finalizer)| {
269                    let msg_processor = msg_processor_write.clone();
270
271                    async move {
272                        match PacketWrapping::send(&msg_processor, data, path).await {
273                            Ok(v) => {
274                                #[cfg(all(feature = "prometheus", not(test)))]
275                                {
276                                    METRIC_PACKET_COUNT_PER_PEER.increment(&["out", &v.0.to_string()]);
277                                    METRIC_PACKET_COUNT.increment(&["sent"]);
278                                }
279                                finalizer.finalize(Ok(()));
280                                Some(v)
281                            }
282                            Err(e) => {
283                                finalizer.finalize(Err(e));
284                                None
285                            }
286                        }
287                    }
288                })
289                .filter_map(|v| async move { v })
290                .map(Ok)
291                .forward(msg_to_send_tx)
292                .await;
293        }),
294    );
295
296    let me = me.clone();
297    processes.insert(
298        ProtocolProcesses::MsgIn,
299        spawn(async move {
300            let _neverending = wire_msg
301                .1
302                .then_concurrent(move |(peer, data)| {
303                    let msg_processor = msg_processor_read.clone();
304
305                    async move { msg_processor.recv(&peer, data).await.map_err(|e| (peer, e)) }
306                })
307                .filter_map(move |v| {
308                    let mut internal_ack_send = internal_ack_send.clone();
309                    let mut msg_to_send_tx = wire_msg.0.clone();
310                    let me = me.clone();
311
312                    async move {
313                        match v {
314                            Ok(v) => match v {
315                                msg::processor::RecvOperation::Receive { data, ack } => {
316                                    #[cfg(all(feature = "prometheus", not(test)))]
317                                    {
318                                        METRIC_PACKET_COUNT_PER_PEER.increment(&["in", &ack.peer.to_string()]);
319                                        METRIC_PACKET_COUNT.increment(&["received"]);
320                                    }
321                                    internal_ack_send.send((ack.peer, ack.ack)).await.unwrap_or_else(|e| {
322                                        error!(error = %e, "Failed to forward an acknowledgement to the transport layer");
323                                    });
324                                    Some(data)
325                                }
326                                msg::processor::RecvOperation::Forward { msg, ack } => {
327                                    #[cfg(all(feature = "prometheus", not(test)))]
328                                    {
329                                        METRIC_PACKET_COUNT_PER_PEER.increment(&["in", &ack.peer.to_string()]);
330                                        METRIC_PACKET_COUNT_PER_PEER.increment(&["out", &msg.peer.to_string()]);
331                                        METRIC_PACKET_COUNT.increment(&["forwarded"]);
332                                    }
333
334                                    msg_to_send_tx.send((msg.peer, msg.data)).await.unwrap_or_else(|_e| {
335                                        error!("Failed to forward a message to the transport layer");
336                                    });
337                                    internal_ack_send.send((ack.peer, ack.ack)).await.unwrap_or_else(|e| {
338                                        error!(error = %e, "Failed to forward an acknowledgement to the transport layer");
339                                    });
340                                    None
341                                }
342                            },
343                            Err((peer, e)) => {
344                                #[cfg(all(feature = "prometheus", not(test)))]
345                                match e {
346                                    hopr_crypto_packet::errors::PacketError::TagReplay => {
347                                        METRIC_REPLAYED_PACKET_COUNT.increment();
348                                    },
349                                    hopr_crypto_packet::errors::PacketError::TicketValidation(_) => {
350                                        METRIC_REJECTED_TICKETS_COUNT.increment();
351                                    },
352                                    _ => {}
353                                }
354
355                                error!(peer = %peer, error = %e, "Failed to process the received message");
356                                // send random signed acknowledgement to give feedback to the sender
357                                internal_ack_send
358                                    .send((
359                                        peer,
360                                        Acknowledgement::random(&me),
361                                    ))
362                                    .await
363                                    .unwrap_or_else(|e| {
364                                        error!(error = %e, "Failed to forward an acknowledgement for a failed packet recv to the transport layer");
365                                    });
366
367                                None
368                            }
369                        }
370                    }
371                })
372                .map(Ok)
373                .forward(api.0)
374                .await;
375        }),
376    );
377
378    processes
379}