hopr_transport_probe/
ping.rs1use std::ops::Div;
2
3use futures::{
4 StreamExt,
5 channel::mpsc::{UnboundedSender, unbounded},
6};
7use hopr_async_runtime::prelude::timeout_fut;
8use libp2p_identity::PeerId;
9use tracing::{debug, warn};
10
11use crate::errors::{ProbeError, Result};
12
13pub type HeartbeatSendPingTx = UnboundedSender<(PeerId, PingQueryReplier)>;
22
23#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
25pub struct PingConfig {
26 #[default(std::time::Duration::from_secs(30))]
28 pub timeout: std::time::Duration,
29}
30
31pub type PingQueryResult = Result<std::time::Duration>;
34
35#[derive(Debug, Clone)]
38pub struct PingQueryReplier {
39 notifier: UnboundedSender<PingQueryResult>,
41}
42
43impl PingQueryReplier {
44 pub fn new(notifier: UnboundedSender<PingQueryResult>) -> Self {
45 Self { notifier }
46 }
47
48 pub fn notify(self, result: PingQueryResult) {
53 let result = result.map(|rtt| rtt.div(2u32));
54
55 if self.notifier.unbounded_send(result).is_err() {
56 warn!("Failed to notify the ping query result due to upper layer ping timeout");
57 }
58 }
59}
60
61#[derive(Debug, Clone)]
63pub struct Pinger {
64 config: PingConfig,
65 send_ping: HeartbeatSendPingTx,
66}
67
68impl Pinger {
69 pub fn new(config: PingConfig, send_ping: HeartbeatSendPingTx) -> Self {
70 Self { config, send_ping }
71 }
72
73 #[tracing::instrument(level = "info", skip(self))]
75 pub async fn ping(&self, peer: PeerId) -> Result<std::time::Duration> {
76 let (tx, mut rx) = unbounded::<PingQueryResult>();
77 let replier = PingQueryReplier::new(tx);
78
79 if let Err(error) = self.send_ping.clone().unbounded_send((peer, replier)) {
80 warn!(%peer, %error, "Failed to initiate a ping request");
81 }
82
83 match timeout_fut(self.config.timeout, rx.next()).await {
84 Ok(Some(Ok(latency))) => {
85 debug!(latency = latency.as_millis(), %peer, "Ping succeeded",);
86 Ok(latency)
87 }
88 Ok(Some(Err(e))) => {
89 let error = if let ProbeError::DecodingError = e {
90 ProbeError::PingerError(peer, "incorrect pong response".into())
91 } else {
92 e
93 };
94
95 debug!(%peer, %error, "Ping failed internally",);
96 Err(error)
97 }
98 Ok(None) => {
99 debug!(%peer, "Ping canceled");
100 Err(ProbeError::PingerError(peer, "canceled".into()))
101 }
102 Err(_) => {
103 debug!(%peer, "Ping failed due to timeout");
104 Err(ProbeError::Timeout(self.config.timeout.as_secs()))
105 }
106 }
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use std::time::Duration;
113
114 use anyhow::anyhow;
115
116 use super::*;
117 use crate::ping::Pinger;
118
119 #[tokio::test]
120 async fn ping_query_replier_should_yield_a_failed_probe() -> anyhow::Result<()> {
121 let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();
122
123 let replier = PingQueryReplier::new(tx);
124
125 replier.notify(Err(ProbeError::Timeout(4u64)));
126
127 assert!(rx.next().await.is_some_and(|r| r.is_err()));
128
129 Ok(())
130 }
131
132 #[tokio::test]
133 async fn ping_query_replier_should_yield_a_successful_probe_as_unidirectional_latency() -> anyhow::Result<()> {
134 const RTT: Duration = Duration::from_millis(100);
135
136 let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();
137
138 let replier = PingQueryReplier::new(tx);
139
140 replier.notify(Ok(RTT));
141
142 let result = rx.next().await.ok_or(anyhow!("should contain a value"))?;
143
144 assert!(result.is_ok());
145 let result = result?;
146 assert_eq!(result, RTT.div(2));
147
148 Ok(())
149 }
150
151 #[tokio::test]
152 async fn pinger_should_return_an_error_if_the_latency_is_longer_than_the_configure_timeout() -> anyhow::Result<()> {
153 let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();
154
155 let delay = Duration::from_millis(10);
156 let delaying_channel = tokio::task::spawn(async move {
157 while let Some((_peer, replier)) = rx.next().await {
158 tokio::time::sleep(delay).await;
159
160 replier.notify(Ok(delay));
161 }
162 });
163
164 let pinger = Pinger::new(
165 PingConfig {
166 timeout: Duration::from_millis(0),
167 },
168 tx,
169 );
170 assert!(pinger.ping(PeerId::random()).await.is_err());
171
172 delaying_channel.abort();
173
174 Ok(())
175 }
176}