hopr_transport_probe/
ping.rs

1use std::ops::Div;
2
3use futures::{
4    StreamExt,
5    channel::mpsc::{Sender, channel},
6};
7use hopr_api::ct::types::TrafficGenerationError;
8use hopr_async_runtime::prelude::timeout_fut;
9use libp2p_identity::PeerId;
10use tracing::{debug, warn};
11
12use crate::errors::{ProbeError, Result};
13
14/// Heartbeat send ping TX type
15pub type HeartbeatSendPingTx = Sender<(PeerId, PingQueryReplier)>;
16
17/// Configuration for the [`Ping`] mechanism
18#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
19pub struct PingConfig {
20    /// The timeout duration for an indiviual ping
21    #[default(std::time::Duration::from_secs(30))]
22    pub timeout: std::time::Duration,
23}
24
25/// Ping query result type holding data about the ping duration and the string
26/// containg an optional version information of the pinged peer, if provided.
27pub type PingQueryResult = Result<std::time::Duration>;
28
29/// Helper object allowing to send a ping query as a wrapped channel combination
30/// that can be filled up on the transport part and awaited locally by the `Pinger`.
31#[derive(Debug, Clone)]
32pub struct PingQueryReplier {
33    /// Back channel for notifications, is [`Clone`] to allow caching
34    notifier: Sender<PingQueryResult>,
35}
36
37impl PingQueryReplier {
38    pub fn new(notifier: Sender<PingQueryResult>) -> Self {
39        Self { notifier }
40    }
41
42    /// Mechanism to finalize the ping operation by providing a [`ControlMessage`] received by the
43    /// transport layer.
44    ///
45    /// The resulting timing information about the RTT is halved to provide a unidirectional latency.
46    pub fn notify(mut self, result: PingQueryResult) {
47        let result = result.map(|rtt| rtt.div(2u32));
48
49        if self.notifier.try_send(result).is_err() {
50            warn!("Failed to notify the ping query result due to upper layer ping timeout");
51        }
52    }
53}
54
55/// Implementation of the ping mechanism
56#[derive(Debug, Clone)]
57pub struct Pinger {
58    config: PingConfig,
59    send_ping: HeartbeatSendPingTx,
60}
61
62impl Pinger {
63    pub fn new(config: PingConfig, send_ping: HeartbeatSendPingTx) -> Self {
64        Self { config, send_ping }
65    }
66
67    /// Performs a ping to a single peer.
68    #[tracing::instrument(level = "info", skip(self))]
69    pub async fn ping(&self, peer: PeerId) -> Result<std::time::Duration> {
70        let (tx, mut rx) = channel::<PingQueryResult>(1);
71        let replier = PingQueryReplier::new(tx);
72
73        if let Err(error) = self.send_ping.clone().try_send((peer, replier)) {
74            warn!(%peer, %error, "Failed to initiate a ping request");
75        }
76
77        match timeout_fut(self.config.timeout, rx.next()).await {
78            Ok(Some(Ok(latency))) => {
79                debug!(latency = latency.as_millis(), %peer, "Ping succeeded",);
80                Ok(latency)
81            }
82            Ok(Some(Err(e))) => {
83                let error = if let ProbeError::DecodingError = e {
84                    ProbeError::PingerError(peer, "incorrect pong response".into())
85                } else {
86                    e
87                };
88
89                debug!(%peer, %error, "Ping failed internally",);
90                Err(error)
91            }
92            Ok(None) => {
93                debug!(%peer, "Ping canceled");
94                Err(ProbeError::PingerError(peer, "canceled".into()))
95            }
96            Err(_) => {
97                debug!(%peer, "Ping failed due to timeout");
98                Err(ProbeError::TrafficError(TrafficGenerationError::ProbeNeighborTimeout(
99                    peer,
100                )))
101            }
102        }
103    }
104}
105
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::anyhow;

    use super::*;
    use crate::ping::Pinger;

    /// An error handed to the replier must arrive on the receiving side as an error.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_failed_probe() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::channel::<PingQueryResult>(256);

        let failure = ProbeError::TrafficError(TrafficGenerationError::ProbeNeighborTimeout(PeerId::random()));
        PingQueryReplier::new(tx).notify(Err(failure));

        let received = rx.next().await;
        assert!(matches!(received, Some(Err(_))));

        Ok(())
    }

    /// A successful round-trip time must arrive halved, i.e. as a unidirectional latency.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_successful_probe_as_unidirectional_latency() -> anyhow::Result<()> {
        const RTT: Duration = Duration::from_millis(100);

        let (tx, mut rx) = futures::channel::mpsc::channel::<PingQueryResult>(256);

        PingQueryReplier::new(tx).notify(Ok(RTT));

        let received = rx
            .next()
            .await
            .ok_or_else(|| anyhow!("should contain a value"))?;

        assert!(received.is_ok());
        let latency = received?;
        assert_eq!(latency, RTT / 2);

        Ok(())
    }

    /// A reply that takes longer than the configured timeout must surface as an error.
    #[tokio::test]
    async fn pinger_should_return_an_error_if_the_latency_is_longer_than_the_configure_timeout() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(256);

        // Background task that answers every request only after `delay` has passed.
        let delay = Duration::from_millis(10);
        let delaying_channel = tokio::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                tokio::time::sleep(delay).await;

                replier.notify(Ok(delay));
            }
        });

        // A zero timeout guarantees that the delayed reply arrives too late.
        let config = PingConfig {
            timeout: Duration::ZERO,
        };
        let pinger = Pinger::new(config, tx);

        let outcome = pinger.ping(PeerId::random()).await;
        assert!(outcome.is_err());

        delaying_channel.abort();

        Ok(())
    }
}