hopr_transport_network/
ping.rs

1use async_stream::stream;
2use async_trait::async_trait;
3use futures::channel::mpsc::UnboundedSender;
4use futures::{Stream, StreamExt};
5use hopr_primitive_types::traits::SaturatingSub;
6use libp2p_identity::PeerId;
7use std::ops::Div;
8
9use tracing::{debug, warn};
10
11use hopr_async_runtime::prelude::timeout_fut;
12use hopr_platform::time::native::current_time;
13
14use crate::errors::{NetworkingError, Result};
15use crate::messaging::ControlMessage;
16
17#[cfg(all(feature = "prometheus", not(test)))]
18use hopr_metrics::metrics::{MultiCounter, SimpleHistogram};
19use hopr_primitive_types::prelude::AsUnixTimestamp;
20
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Histogram of per-ping durations in seconds, observed by `Pinger` for every successful ping.
    // NOTE(review): the observed value is the latency carried by the ping result, which
    // `PingQueryReplier::notify` computes as half of the round trip, while the help text below
    // speaks of the "total time" — confirm which one is intended.
    // `.unwrap()` panics on first use if the metric cannot be created.
    static ref METRIC_TIME_TO_PING: SimpleHistogram =
        SimpleHistogram::new(
            "hopr_ping_time_sec",
            "Measures total time it takes to ping a single node (seconds)",
            vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0],
        ).unwrap();
    // Counter of finished pings, labelled by `success` (incremented with "true" or "false").
    static ref METRIC_PING_COUNT: MultiCounter = MultiCounter::new(
            "hopr_heartbeat_pings_count",
            "Total number of pings by result",
            &["success"]
        ).unwrap();
}
35
/// Trait for the ping operation itself.
pub trait Pinging {
    /// Pings the given `peers` and returns a stream of per-peer results.
    ///
    /// Each item is either the measured latency of one ping or the error that ended it.
    /// NOTE(review): the trait itself does not pin down item ordering or whether every peer
    /// produces an item — check the implementation for the actual guarantees.
    fn ping(&self, peers: Vec<PeerId>) -> impl Stream<Item = Result<std::time::Duration>>;
}
40
/// External behavior that will be triggered once a ping operation result is available
/// per each pinged peer.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait PingExternalAPI {
    /// Called once a ping to `peer` has finished.
    ///
    /// # Arguments
    ///
    /// * `peer` - the peer that was pinged
    /// * `result` - the measured latency on success, or the error that ended the ping
    /// * `version` - the version string delivered with the pong; when invoked by `Pinger`,
    ///   this is `"unknown"` for failed pings
    async fn on_finished_ping(&self, peer: &PeerId, result: &Result<std::time::Duration>, version: String);
}
48
/// Heartbeat send ping TX type
///
/// Each message carries the peer to ping together with the [`PingQueryReplier`] through which
/// the transport side reports the received pong back.
///
/// NOTE: `UnboundedSender` and `UnboundedReceiver` are bounded only by available memory;
/// if input arrives faster than it is consumed, memory might run out.
///
/// The unboundedness relies on the fact that a back pressure mechanism exists on a
/// higher level of the business logic making sure that only a fixed maximum count
/// of pings ever enter the queues at any given time.
pub type HeartbeatSendPingTx = UnboundedSender<(PeerId, PingQueryReplier)>;
58
59/// Configuration for the [`Pinger`] mechanism
60#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
61pub struct PingConfig {
62    /// The maximum total allowed concurrent heartbeat ping count
63    #[default = 25]
64    pub max_parallel_pings: usize,
65    /// The timeout duration for an indiviual ping
66    #[default(std::time::Duration::from_secs(30))]
67    pub timeout: std::time::Duration, // `Duration` -> should be in millis,
68}
69
70/// Ping query result type holding data about the ping duration and the string
71/// containg an optional version information of the pinged peer, if provided.
72pub type PingQueryResult = Result<(std::time::Duration, String)>;
73
/// Helper object allowing to send a ping query as a wrapped channel combination
/// that can be filled up on the transport part and awaited locally by the `Pinger`.
#[derive(Debug, Clone)]
pub struct PingQueryReplier {
    /// Channel through which the outcome of the ping is delivered to the waiting side.
    notifier: UnboundedSender<PingQueryResult>,
    /// Unix timestamp in milliseconds taken at creation, paired with the ping request
    /// that the remote peer is expected to answer.
    challenge: Box<(u64, ControlMessage)>,
}
81
82impl PingQueryReplier {
83    pub fn new(notifier: UnboundedSender<PingQueryResult>) -> Self {
84        Self {
85            notifier,
86            challenge: Box::new((
87                current_time().as_unix_timestamp().as_millis() as u64,
88                ControlMessage::generate_ping_request(),
89            )),
90        }
91    }
92
93    /// Return a copy of the challenge for which the reply is expected
94    pub fn challenge(&self) -> ControlMessage {
95        self.challenge.1.clone()
96    }
97
98    /// Mechanism to finalize the ping operation by providing a [`ControlMessage`] received by the
99    /// transport layer.
100    ///
101    /// The resulting timing information about the RTT is halved to provide a unidirectional latency.
102    pub fn notify(self, pong: ControlMessage, version: String) {
103        let timed_result = if ControlMessage::validate_pong_response(&self.challenge.1, &pong).is_ok() {
104            let unidirectional_latency = current_time()
105                .as_unix_timestamp()
106                .saturating_sub(std::time::Duration::from_millis(self.challenge.0))
107                .div(2u32);
108            Ok((unidirectional_latency, version))
109        } else {
110            Err(NetworkingError::DecodingError)
111        };
112
113        if self.notifier.unbounded_send(timed_result).is_err() {
114            warn!("Failed to notify the ping query result due to upper layer ping timeout");
115        }
116    }
117}
118
119/// Timeout-based future that will resolve to the result of the ping operation.
120#[tracing::instrument(level = "trace", skip(sender, timeout))]
121pub fn to_active_ping(
122    peer: PeerId,
123    sender: HeartbeatSendPingTx,
124    timeout: std::time::Duration,
125) -> impl std::future::Future<Output = (PeerId, Result<std::time::Duration>, String)> {
126    let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();
127    let replier = PingQueryReplier::new(tx);
128
129    if let Err(e) = sender.unbounded_send((peer, replier)) {
130        warn!(%peer, error = %e, "Failed to initiate a ping request");
131    }
132
133    async move {
134        match timeout_fut(timeout, rx.next()).await {
135            Ok(Some(Ok((latency, version)))) => {
136                debug!(latency = latency.as_millis(), %peer, %version, "Ping succeeded",);
137                (peer, Ok(latency), version)
138            }
139            Ok(Some(Err(e))) => {
140                let error = if let NetworkingError::DecodingError = e {
141                    NetworkingError::PingerError(peer, "incorrect pong response".into())
142                } else {
143                    e
144                };
145
146                debug!(%peer, %error, "Ping failed internally",);
147                (peer, Err(error), "unknown".into())
148            }
149            Ok(None) => {
150                debug!(%peer, "Ping canceled");
151                (
152                    peer,
153                    Err(NetworkingError::PingerError(peer, "canceled".into())),
154                    "unknown".into(),
155                )
156            }
157            Err(_) => {
158                debug!(%peer, "Ping failed due to timeout");
159                (peer, Err(NetworkingError::Timeout(timeout.as_secs())), "unknown".into())
160            }
161        }
162    }
163}
164
165/// Implementation of the ping mechanism
166#[derive(Debug, Clone)]
167pub struct Pinger<T>
168where
169    T: PingExternalAPI + Send + Sync,
170{
171    config: PingConfig,
172    send_ping: HeartbeatSendPingTx,
173    recorder: T,
174}
175
176impl<T> Pinger<T>
177where
178    T: PingExternalAPI + Send + Sync,
179{
180    pub fn new(config: PingConfig, send_ping: HeartbeatSendPingTx, recorder: T) -> Self {
181        let config = PingConfig {
182            max_parallel_pings: config.max_parallel_pings,
183            ..config
184        };
185
186        Pinger {
187            config,
188            send_ping,
189            recorder,
190        }
191    }
192
193    pub fn config(&self) -> &PingConfig {
194        &self.config
195    }
196}
197
198impl<T> Pinging for Pinger<T>
199where
200    T: PingExternalAPI + Send + Sync,
201{
202    /// Performs multiple concurrent async pings to the specified peers.
203    ///
204    /// A sliding window mechanism is used to select at most a fixed number of concurrently processed
205    /// peers in order to stabilize the pinging mechanism. Pings that do not fit into that window must
206    /// wait until they can be further processed.
207    ///
208    /// # Arguments
209    ///
210    /// * `peers` - A vector of PeerId objects referencing the peers to be pinged
211    #[tracing::instrument(level = "info", skip(self, peers), fields(peers.count = peers.len()))]
212    fn ping(&self, mut peers: Vec<PeerId>) -> impl Stream<Item = Result<std::time::Duration>> {
213        let start_all_peers = current_time();
214
215        stream! {
216            if !peers.is_empty() {
217                let remainder = peers.split_off(self.config.max_parallel_pings.min(peers.len()));
218                let mut active_pings = peers
219                    .into_iter()
220                    .map(|peer| to_active_ping(peer, self.send_ping.clone(), self.config.timeout))
221                    .collect::<futures::stream::FuturesUnordered<_>>();
222
223                let mut waiting = std::collections::VecDeque::from(remainder);
224
225                while let Some((peer, result, version)) = active_pings.next().await {
226                    self.recorder.on_finished_ping(&peer, &result, version).await;
227
228                    #[cfg(all(feature = "prometheus", not(test)))]
229                    match &result {
230                        Ok(duration) => {
231                            METRIC_TIME_TO_PING.observe((duration.as_millis() as f64) / 1000.0); // precision for seconds
232                            METRIC_PING_COUNT.increment(&["true"]);
233                        }
234                        Err(_) => {
235                            METRIC_PING_COUNT.increment(&["false"]);
236                        }
237                    }
238
239                    if current_time().saturating_sub(start_all_peers) < self.config.timeout {
240                        if let Some(peer) = waiting.pop_front() {
241                            active_pings.push(to_active_ping(peer, self.send_ping.clone(), self.config.timeout));
242                        }
243                    }
244
245                    yield result;
246
247                    if active_pings.is_empty() && waiting.is_empty() {
248                        break;
249                    }
250                }
251            } else {
252                debug!("Received an empty peer list, not pinging any peers");
253            }
254        }
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::messaging::ControlMessage;
    use crate::ping::Pinger;
    use futures::TryStreamExt;
    use hopr_primitive_types::traits::SaturatingSub;
    use mockall::*;
    use more_asserts::*;

    /// Small window and short timeout to keep the tests fast.
    fn simple_ping_config() -> PingConfig {
        PingConfig {
            max_parallel_pings: 2,
            timeout: std::time::Duration::from_millis(150),
        }
    }

    #[async_std::test]
    async fn ping_query_replier_should_return_ok_result_when_the_pong_is_correct_for_the_challenge(
    ) -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();

        let replier = PingQueryReplier::new(tx);
        let challenge = replier.challenge.clone();

        replier.notify(
            ControlMessage::generate_pong_response(&challenge.1)?,
            "version".to_owned(),
        );

        assert!(rx.next().await.is_some_and(|r| r.is_ok()));

        Ok(())
    }

    #[async_std::test]
    async fn ping_query_replier_should_return_err_result_when_the_pong_is_incorrect_for_the_challenge(
    ) -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();

        let replier = PingQueryReplier::new(tx);

        replier.notify(
            ControlMessage::generate_pong_response(&ControlMessage::generate_ping_request())?,
            "version".to_owned(),
        );

        assert!(rx.next().await.is_some_and(|r| r.is_err()));

        Ok(())
    }

    #[async_std::test]
    async fn ping_query_replier_should_return_the_unidirectional_latency() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();

        let delay = std::time::Duration::from_millis(10);

        // Bracket the whole exchange so the upper bound does not depend on how long the sleep
        // actually took (it can overshoot considerably on a loaded machine).
        let before = current_time();
        let replier = PingQueryReplier::new(tx);
        let challenge = replier.challenge.clone();

        async_std::task::sleep(delay).await;
        replier.notify(
            ControlMessage::generate_pong_response(&challenge.1)?,
            "version".to_owned(),
        );
        let elapsed = current_time().saturating_sub(before);

        let actual_latency = rx
            .next()
            .await
            .ok_or_else(|| anyhow::anyhow!("should contain a result value"))?
            .map_err(|_e| anyhow::anyhow!("should contain a result value"))?
            .0;

        // The round trip spans at least the sleep, so its half is at least `delay / 2`.
        assert_ge!(actual_latency, delay / 2);
        // The challenge timestamp is truncated to whole milliseconds, so the measured round trip
        // exceeds `elapsed` by at most 1 ms; a halved value must therefore stay within this bound.
        assert_le!(actual_latency, (elapsed + std::time::Duration::from_millis(1)) / 2);

        Ok(())
    }

    #[async_std::test]
    async fn ping_empty_vector_of_peers_should_not_do_any_api_calls() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();

        let ideal_channel = async_std::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                let challenge = replier.challenge.1.clone();

                replier.notify(
                    ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
                    "version".to_owned(),
                );
            }
        });

        let mut mock = MockPingExternalAPI::new();
        mock.expect_on_finished_ping().times(0);

        let pinger = Pinger::new(simple_ping_config(), tx, mock);

        assert!(pinger.ping(vec![]).try_collect::<Vec<_>>().await?.is_empty());

        ideal_channel.cancel().await;

        Ok(())
    }

    #[async_std::test]
    async fn test_ping_peers_with_happy_path_should_trigger_the_desired_external_api_calls() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();

        let ideal_channel = async_std::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                let challenge = replier.challenge.1.clone();

                replier.notify(
                    ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
                    "version".to_owned(),
                );
            }
        });

        let peer = PeerId::random();

        let mut mock = MockPingExternalAPI::new();
        mock.expect_on_finished_ping()
            .times(1)
            .with(
                predicate::eq(peer),
                predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
                predicate::eq("version".to_owned()),
            )
            .return_const(());

        let pinger = Pinger::new(simple_ping_config(), tx, mock);
        pinger.ping(vec![peer]).try_collect::<Vec<_>>().await?;

        ideal_channel.cancel().await;

        Ok(())
    }

    #[async_std::test]
    async fn test_ping_should_invoke_a_failed_ping_reply_for_an_incorrect_reply() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();

        let failing_channel = async_std::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                replier.notify(
                    ControlMessage::generate_pong_response(&ControlMessage::generate_ping_request())
                        .expect("valid challenge reply"),
                    "version".to_owned(),
                );
            }
        });

        let peer = PeerId::random();

        let mut mock = MockPingExternalAPI::new();
        mock.expect_on_finished_ping()
            .times(1)
            .with(
                predicate::eq(peer),
                predicate::function(|x: &Result<std::time::Duration>| x.is_err()),
                predicate::eq("unknown".to_owned()),
            )
            .return_const(());

        let pinger = Pinger::new(simple_ping_config(), tx, mock);
        assert!(pinger.ping(vec![peer]).try_collect::<Vec<_>>().await.is_err());

        failing_channel.cancel().await;

        Ok(())
    }

    /// The pong is delayed by `delay` while the timeout is zero, so the ping must end with an
    /// error and an `"unknown"` version.
    #[async_std::test]
    async fn test_ping_peer_should_fail_when_the_pong_arrives_after_the_timeout() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();

        let delay = std::time::Duration::from_millis(10);
        let delaying_channel = async_std::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                let challenge = replier.challenge.1.clone();

                async_std::task::sleep(delay).await;
                replier.notify(
                    ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
                    "version".to_owned(),
                );
            }
        });

        let peer = PeerId::random();
        let ping_config = PingConfig {
            timeout: std::time::Duration::from_millis(0),
            ..simple_ping_config()
        };

        let mut mock = MockPingExternalAPI::new();
        mock.expect_on_finished_ping()
            .times(1)
            .with(
                predicate::eq(peer),
                predicate::function(|x: &Result<std::time::Duration>| x.is_err()),
                predicate::eq("unknown".to_owned()),
            )
            .return_const(());

        let pinger = Pinger::new(ping_config, tx, mock);
        assert!(pinger.ping(vec![peer]).try_collect::<Vec<_>>().await.is_err());

        delaying_channel.cancel().await;

        Ok(())
    }

    #[async_std::test]
    async fn test_ping_peers_multiple_peers_are_pinged_in_parallel() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();

        let ideal_channel = async_std::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                let challenge = replier.challenge.1.clone();

                replier.notify(
                    ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
                    "version".to_owned(),
                );
            }
        });

        let peers = vec![PeerId::random(), PeerId::random()];

        let mut mock = MockPingExternalAPI::new();
        mock.expect_on_finished_ping()
            .times(1)
            .with(
                predicate::eq(peers[0]),
                predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
                predicate::eq("version".to_owned()),
            )
            .return_const(());
        mock.expect_on_finished_ping()
            .times(1)
            .with(
                predicate::eq(peers[1]),
                predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
                predicate::eq("version".to_owned()),
            )
            .return_const(());

        let pinger = Pinger::new(simple_ping_config(), tx, mock);
        pinger.ping(peers).try_collect::<Vec<_>>().await?;

        ideal_channel.cancel().await;

        Ok(())
    }

    #[async_std::test]
    async fn test_ping_peers_should_ping_parallel_only_a_limited_number_of_peers() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();

        let delay = 10u64;

        // NOTE(review): this channel answers requests one at a time, so the elapsed-time bound
        // asserted below holds regardless of the window size; it does not by itself prove that at
        // most one ping was in flight.
        let ideal_delaying_channel = async_std::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                let challenge = replier.challenge.1.clone();

                async_std::task::sleep(std::time::Duration::from_millis(delay)).await;
                replier.notify(
                    ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
                    "version".to_owned(),
                );
            }
        });

        let peers = vec![PeerId::random(), PeerId::random()];

        let mut mock = MockPingExternalAPI::new();
        mock.expect_on_finished_ping()
            .times(1)
            .with(
                predicate::eq(peers[0]),
                predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
                predicate::eq("version".to_owned()),
            )
            .return_const(());
        mock.expect_on_finished_ping()
            .times(1)
            .with(
                predicate::eq(peers[1]),
                predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
                predicate::eq("version".to_owned()),
            )
            .return_const(());

        let pinger = Pinger::new(
            PingConfig {
                max_parallel_pings: 1,
                ..simple_ping_config()
            },
            tx,
            mock,
        );

        let start = current_time();
        pinger.ping(peers).try_collect::<Vec<_>>().await?;
        let end = current_time();

        assert_ge!(end.saturating_sub(start), std::time::Duration::from_millis(delay));

        ideal_delaying_channel.cancel().await;

        Ok(())
    }
}