//! Ping mechanism of the transport probe (`hopr_transport_probe/ping.rs`).

1use std::ops::Div;
2
3use futures::{
4    StreamExt,
5    channel::mpsc::{UnboundedSender, unbounded},
6};
7use hopr_async_runtime::prelude::timeout_fut;
8use libp2p_identity::PeerId;
9use tracing::{debug, warn};
10
11use crate::errors::{ProbeError, Result};
12
13/// Heartbeat send ping TX type
14///
15/// NOTE: UnboundedSender and UnboundedReceiver are bound only by available memory
16/// in case of faster input than output the memory might run out.
17///
18/// The unboundedness relies on the fact that a back pressure mechanism exists on a
19/// higher level of the business logic making sure that only a fixed maximum count
20/// of pings ever enter the queues at any given time.
21pub type HeartbeatSendPingTx = UnboundedSender<(PeerId, PingQueryReplier)>;
22
23/// Configuration for the [`Ping`] mechanism
24#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
25pub struct PingConfig {
26    /// The timeout duration for an indiviual ping
27    #[default(std::time::Duration::from_secs(30))]
28    pub timeout: std::time::Duration,
29}
30
31/// Ping query result type holding data about the ping duration and the string
32/// containg an optional version information of the pinged peer, if provided.
33pub type PingQueryResult = Result<std::time::Duration>;
34
35/// Helper object allowing to send a ping query as a wrapped channel combination
36/// that can be filled up on the transport part and awaited locally by the `Pinger`.
37#[derive(Debug, Clone)]
38pub struct PingQueryReplier {
39    /// Back channel for notifications, is [`Clone`] to allow caching
40    notifier: UnboundedSender<PingQueryResult>,
41}
42
43impl PingQueryReplier {
44    pub fn new(notifier: UnboundedSender<PingQueryResult>) -> Self {
45        Self { notifier }
46    }
47
48    /// Mechanism to finalize the ping operation by providing a [`ControlMessage`] received by the
49    /// transport layer.
50    ///
51    /// The resulting timing information about the RTT is halved to provide a unidirectional latency.
52    pub fn notify(self, result: PingQueryResult) {
53        let result = result.map(|rtt| rtt.div(2u32));
54
55        if self.notifier.unbounded_send(result).is_err() {
56            warn!("Failed to notify the ping query result due to upper layer ping timeout");
57        }
58    }
59}
60
61/// Implementation of the ping mechanism
62#[derive(Debug, Clone)]
63pub struct Pinger {
64    config: PingConfig,
65    send_ping: HeartbeatSendPingTx,
66}
67
68impl Pinger {
69    pub fn new(config: PingConfig, send_ping: HeartbeatSendPingTx) -> Self {
70        Self { config, send_ping }
71    }
72
73    /// Performs a ping to a single peer.
74    #[tracing::instrument(level = "info", skip(self))]
75    pub async fn ping(&self, peer: PeerId) -> Result<std::time::Duration> {
76        let (tx, mut rx) = unbounded::<PingQueryResult>();
77        let replier = PingQueryReplier::new(tx);
78
79        if let Err(error) = self.send_ping.clone().unbounded_send((peer, replier)) {
80            warn!(%peer, %error, "Failed to initiate a ping request");
81        }
82
83        match timeout_fut(self.config.timeout, rx.next()).await {
84            Ok(Some(Ok(latency))) => {
85                debug!(latency = latency.as_millis(), %peer, "Ping succeeded",);
86                Ok(latency)
87            }
88            Ok(Some(Err(e))) => {
89                let error = if let ProbeError::DecodingError = e {
90                    ProbeError::PingerError(peer, "incorrect pong response".into())
91                } else {
92                    e
93                };
94
95                debug!(%peer, %error, "Ping failed internally",);
96                Err(error)
97            }
98            Ok(None) => {
99                debug!(%peer, "Ping canceled");
100                Err(ProbeError::PingerError(peer, "canceled".into()))
101            }
102            Err(_) => {
103                debug!(%peer, "Ping failed due to timeout");
104                Err(ProbeError::Timeout(self.config.timeout.as_secs()))
105            }
106        }
107    }
108}
109
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::anyhow;

    // Brings `Pinger`, `PingQueryReplier`, `PeerId`, `StreamExt`, `Div`, etc. into scope.
    use super::*;

    /// An error handed to the replier must reach the receiving side as an error.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_failed_probe() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();

        let replier = PingQueryReplier::new(tx);

        replier.notify(Err(ProbeError::Timeout(4u64)));

        assert!(rx.next().await.is_some_and(|r| r.is_err()));

        Ok(())
    }

    /// A successful RTT handed to the replier must arrive halved.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_successful_probe_as_unidirectional_latency() -> anyhow::Result<()> {
        const RTT: Duration = Duration::from_millis(100);

        let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();

        let replier = PingQueryReplier::new(tx);

        replier.notify(Ok(RTT));

        // Build the error lazily; `?` on the inner result already fails the test on `Err`.
        let received = rx.next().await.ok_or_else(|| anyhow!("should contain a value"))?;
        let latency = received?;

        assert_eq!(latency, RTT.div(2));

        Ok(())
    }

    /// A reply that arrives later than the configured timeout must make the ping fail.
    #[tokio::test]
    async fn pinger_should_return_an_error_if_the_latency_is_longer_than_the_configure_timeout() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();

        // Answers every ping request, but only after `delay` has elapsed.
        let delay = Duration::from_millis(10);
        let delaying_channel = tokio::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                tokio::time::sleep(delay).await;

                replier.notify(Ok(delay));
            }
        });

        // A zero timeout is always shorter than the 10 ms reply delay.
        let pinger = Pinger::new(
            PingConfig {
                timeout: Duration::from_millis(0),
            },
            tx,
        );
        assert!(pinger.ping(PeerId::random()).await.is_err());

        delaying_channel.abort();

        Ok(())
    }
}