hopr_transport_probe/
ping.rs1use std::ops::Div;
2
3use futures::{
4 StreamExt,
5 channel::mpsc::{Sender, channel},
6};
7use hopr_async_runtime::prelude::timeout_fut;
8use libp2p_identity::PeerId;
9use tracing::{debug, warn};
10
11use crate::errors::{ProbeError, Result};
12
13pub type HeartbeatSendPingTx = Sender<(PeerId, PingQueryReplier)>;
15
16#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
18pub struct PingConfig {
19 #[default(std::time::Duration::from_secs(30))]
21 pub timeout: std::time::Duration,
22}
23
24pub type PingQueryResult = Result<std::time::Duration>;
27
28#[derive(Debug, Clone)]
31pub struct PingQueryReplier {
32 notifier: Sender<PingQueryResult>,
34}
35
36impl PingQueryReplier {
37 pub fn new(notifier: Sender<PingQueryResult>) -> Self {
38 Self { notifier }
39 }
40
41 pub fn notify(mut self, result: PingQueryResult) {
46 let result = result.map(|rtt| rtt.div(2u32));
47
48 if self.notifier.try_send(result).is_err() {
49 warn!("Failed to notify the ping query result due to upper layer ping timeout");
50 }
51 }
52}
53
54#[derive(Debug, Clone)]
56pub struct Pinger {
57 config: PingConfig,
58 send_ping: HeartbeatSendPingTx,
59}
60
61impl Pinger {
62 pub fn new(config: PingConfig, send_ping: HeartbeatSendPingTx) -> Self {
63 Self { config, send_ping }
64 }
65
66 #[tracing::instrument(level = "info", skip(self))]
68 pub async fn ping(&self, peer: PeerId) -> Result<std::time::Duration> {
69 let (tx, mut rx) = channel::<PingQueryResult>(1);
70 let replier = PingQueryReplier::new(tx);
71
72 if let Err(error) = self.send_ping.clone().try_send((peer, replier)) {
73 warn!(%peer, %error, "Failed to initiate a ping request");
74 }
75
76 match timeout_fut(self.config.timeout, rx.next()).await {
77 Ok(Some(Ok(latency))) => {
78 debug!(latency = latency.as_millis(), %peer, "Ping succeeded",);
79 Ok(latency)
80 }
81 Ok(Some(Err(e))) => {
82 let error = if let ProbeError::DecodingError = e {
83 ProbeError::PingerError(peer, "incorrect pong response".into())
84 } else {
85 e
86 };
87
88 debug!(%peer, %error, "Ping failed internally",);
89 Err(error)
90 }
91 Ok(None) => {
92 debug!(%peer, "Ping canceled");
93 Err(ProbeError::PingerError(peer, "canceled".into()))
94 }
95 Err(_) => {
96 debug!(%peer, "Ping failed due to timeout");
97 Err(ProbeError::Timeout(self.config.timeout.as_secs()))
98 }
99 }
100 }
101}
102
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::anyhow;

    use super::*;

    /// An error handed to the replier must reach the receiver as an error.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_failed_probe() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::channel::<PingQueryResult>(256);

        let replier = PingQueryReplier::new(tx);

        replier.notify(Err(ProbeError::Timeout(4u64)));

        assert!(rx.next().await.is_some_and(|r| r.is_err()));

        Ok(())
    }

    /// A successful round trip must be delivered as half of its duration.
    #[tokio::test]
    async fn ping_query_replier_should_yield_a_successful_probe_as_unidirectional_latency() -> anyhow::Result<()> {
        const RTT: Duration = Duration::from_millis(100);

        let (tx, mut rx) = futures::channel::mpsc::channel::<PingQueryResult>(256);

        let replier = PingQueryReplier::new(tx);

        replier.notify(Ok(RTT));

        // Build the error lazily; `?` fails the test on a missing value as well
        // as on an error result, so no separate `is_ok` assertion is needed.
        let latency = rx
            .next()
            .await
            .ok_or_else(|| anyhow!("should contain a value"))??;

        assert_eq!(latency, RTT / 2u32);

        Ok(())
    }

    /// A reply that takes longer than the configured timeout must yield an error.
    #[tokio::test]
    async fn pinger_should_return_an_error_if_the_latency_is_longer_than_the_configure_timeout() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(256);

        // Answers every request only after `delay`, which exceeds the zero
        // timeout configured below.
        let delay = Duration::from_millis(10);
        let delaying_channel = tokio::task::spawn(async move {
            while let Some((_peer, replier)) = rx.next().await {
                tokio::time::sleep(delay).await;

                replier.notify(Ok(delay));
            }
        });

        let pinger = Pinger::new(
            PingConfig {
                timeout: Duration::from_millis(0),
            },
            tx,
        );
        assert!(pinger.ping(PeerId::random()).await.is_err());

        delaying_channel.abort();

        Ok(())
    }
}