Skip to main content

hopr_transport_probe/
ping.rs

1use std::ops::Div;
2
3use futures::{
4    StreamExt,
5    channel::mpsc::{Sender, channel},
6};
7use hopr_api::{OffchainPublicKey, graph::NetworkGraphError};
8use hopr_utils::runtime::prelude::timeout_fut;
9use tracing::{debug, warn};
10
11use crate::errors::{ProbeError, Result};
12
/// Heartbeat send ping TX type
///
/// Sending half of the channel on which ping requests are submitted. Each item pairs the
/// [`OffchainPublicKey`] of the peer to ping with the [`PingQueryReplier`] through which
/// the outcome of that ping is reported back.
pub type HeartbeatSendPingTx = Sender<(OffchainPublicKey, PingQueryReplier)>;
15
/// Configuration for the `Ping` mechanism
#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
pub struct PingConfig {
    /// The timeout duration for an individual ping (defaults to 30 seconds)
    #[default(std::time::Duration::from_secs(30))]
    pub timeout: std::time::Duration,
}
23
/// Ping query result type.
///
/// `Ok` holds a [`std::time::Duration`] measured for the ping, `Err(())` signals a failed
/// ping without any further detail.
///
/// [`PingQueryReplier::notify`] accepts the round-trip time in this form and forwards
/// half of it to the receiving side.
pub type PingQueryResult = std::result::Result<std::time::Duration, ()>;
27
/// Helper object allowing to send a ping query as a wrapped channel combination
/// that can be filled up on the transport part and awaited locally by the `Pinger`.
///
/// It wraps only the sending half of the channel; [`Pinger::ping`] creates it together
/// with the matching receiver and keeps the receiver to await the reply.
#[derive(Debug, Clone)]
pub struct PingQueryReplier {
    /// Back channel for notifications, is [`Clone`] to allow caching
    notifier: Sender<PingQueryResult>,
}
35
36impl PingQueryReplier {
37    pub fn new(notifier: Sender<PingQueryResult>) -> Self {
38        Self { notifier }
39    }
40
41    /// Mechanism to finalize the ping operation by providing a `ControlMessage` received by the
42    /// transport layer.
43    ///
44    /// The resulting timing information about the RTT is halved to provide a unidirectional latency.
45    pub fn notify(mut self, result: PingQueryResult) {
46        let result = result.map(|rtt| rtt.div(2u32));
47
48        if self.notifier.try_send(result).is_err() {
49            warn!("Failed to notify the ping query result due to upper layer ping timeout");
50        }
51    }
52}
53
/// Implementation of the ping mechanism
///
/// Holds the ping configuration and the channel through which ping requests are submitted.
#[derive(Debug, Clone)]
pub struct Pinger {
    /// Settings applied to every ping, see [`PingConfig`]
    config: PingConfig,
    /// Channel on which `(peer, replier)` ping requests are submitted
    send_ping: HeartbeatSendPingTx,
}
60
61impl Pinger {
62    pub fn new(config: PingConfig, send_ping: HeartbeatSendPingTx) -> Self {
63        Self { config, send_ping }
64    }
65
66    /// Performs a ping to a single peer.
67    #[tracing::instrument(level = "info", skip(self))]
68    pub async fn ping(&self, peer: &OffchainPublicKey) -> Result<std::time::Duration> {
69        let (tx, mut rx) = channel::<PingQueryResult>(1);
70        let replier = PingQueryReplier::new(tx);
71
72        if let Err(error) = self.send_ping.clone().try_send((*peer, replier)) {
73            warn!(%peer, %error, "Failed to initiate a ping request");
74        }
75
76        match timeout_fut(self.config.timeout, rx.next()).await {
77            Ok(Some(Ok(latency))) => {
78                debug!(?latency, %peer, "Ping succeeded",);
79                Ok(latency)
80            }
81            Ok(Some(Err(_))) => {
82                debug!(%peer, "Ping failed internally",);
83                Err(ProbeError::PingerError(format!("could not successfully ping: {peer}")))
84            }
85            Ok(None) => {
86                debug!(%peer, "Ping canceled");
87                Err(ProbeError::PingerError("canceled".into()))
88            }
89            Err(_) => {
90                debug!(%peer, "ping failed due to timeout");
91                Err(ProbeError::TrafficError(NetworkGraphError::ProbeNeighborTimeout(
92                    Box::new(*peer),
93                )))
94            }
95        }
96    }
97}
98
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::anyhow;
    use hex_literal::hex;

    use super::*;
    use crate::ping::Pinger;

    const SECRET_0: [u8; 32] = hex!("60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d");

    /// A failure handed to `notify` must reach the receiver as an error.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_failed_probe() -> anyhow::Result<()> {
        let (notifier, mut outcomes) = futures::channel::mpsc::channel::<PingQueryResult>(256);

        PingQueryReplier::new(notifier).notify(Err(()));

        let delivered = outcomes.next().await;
        assert!(matches!(delivered, Some(Err(_))));

        Ok(())
    }

    /// A round-trip time handed to `notify` must reach the receiver halved.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_successful_probe_as_unidirectional_latency() -> anyhow::Result<()> {
        const RTT: Duration = Duration::from_millis(100);

        let (notifier, mut outcomes) = futures::channel::mpsc::channel::<PingQueryResult>(256);

        PingQueryReplier::new(notifier).notify(Ok(RTT));

        let Some(outcome) = outcomes.next().await else {
            return Err(anyhow!("should contain a value"));
        };

        assert!(outcome.is_ok());
        let latency = outcome.map_err(|_| anyhow!("should succeed"))?;
        assert_eq!(latency, RTT.div(2));

        Ok(())
    }

    /// A reply that takes longer than the configured timeout must surface as an error.
    #[tokio::test]
    async fn pinger_should_return_an_error_if_the_latency_is_longer_than_the_configure_timeout() -> anyhow::Result<()> {
        let (requests_tx, mut requests_rx) =
            futures::channel::mpsc::channel::<(OffchainPublicKey, PingQueryReplier)>(256);

        // Answers every request, but only after `reply_delay` has elapsed.
        let reply_delay = Duration::from_millis(10);
        let slow_responder = tokio::task::spawn(async move {
            while let Some((_peer, replier)) = requests_rx.next().await {
                tokio::time::sleep(reply_delay).await;

                replier.notify(Ok(reply_delay));
            }
        });

        let config = PingConfig {
            timeout: Duration::ZERO,
        };
        let pinger = Pinger::new(config, requests_tx);

        let peer = OffchainPublicKey::from_privkey(&SECRET_0)?;
        let outcome = pinger.ping(&peer).await;
        assert!(outcome.is_err());

        slow_responder.abort();

        Ok(())
    }
}