1use async_stream::stream;
2use async_trait::async_trait;
3use futures::channel::mpsc::UnboundedSender;
4use futures::{Stream, StreamExt};
5use hopr_primitive_types::traits::SaturatingSub;
6use libp2p_identity::PeerId;
7use std::ops::Div;
8
9use tracing::{debug, warn};
10
11use hopr_async_runtime::prelude::timeout_fut;
12use hopr_platform::time::native::current_time;
13
14use crate::errors::{NetworkingError, Result};
15use crate::messaging::ControlMessage;
16
17#[cfg(all(feature = "prometheus", not(test)))]
18use hopr_metrics::metrics::{MultiCounter, SimpleHistogram};
19use hopr_primitive_types::prelude::AsUnixTimestamp;
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23 static ref METRIC_TIME_TO_PING: SimpleHistogram =
24 SimpleHistogram::new(
25 "hopr_ping_time_sec",
26 "Measures total time it takes to ping a single node (seconds)",
27 vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0],
28 ).unwrap();
29 static ref METRIC_PING_COUNT: MultiCounter = MultiCounter::new(
30 "hopr_heartbeat_pings_count",
31 "Total number of pings by result",
32 &["success"]
33 ).unwrap();
34}
35
36pub trait Pinging {
38 fn ping(&self, peers: Vec<PeerId>) -> impl Stream<Item = Result<std::time::Duration>>;
39}
40
41#[cfg_attr(test, mockall::automock)]
44#[async_trait]
45pub trait PingExternalAPI {
46 async fn on_finished_ping(&self, peer: &PeerId, result: &Result<std::time::Duration>, version: String);
47}
48
49pub type HeartbeatSendPingTx = UnboundedSender<(PeerId, PingQueryReplier)>;
58
59#[derive(Debug, Clone, PartialEq, Eq, smart_default::SmartDefault)]
61pub struct PingConfig {
62 #[default = 25]
64 pub max_parallel_pings: usize,
65 #[default(std::time::Duration::from_secs(30))]
67 pub timeout: std::time::Duration, }
69
70pub type PingQueryResult = Result<(std::time::Duration, String)>;
73
74#[derive(Debug, Clone)]
77pub struct PingQueryReplier {
78 notifier: UnboundedSender<PingQueryResult>,
79 challenge: Box<(u64, ControlMessage)>,
80}
81
82impl PingQueryReplier {
83 pub fn new(notifier: UnboundedSender<PingQueryResult>) -> Self {
84 Self {
85 notifier,
86 challenge: Box::new((
87 current_time().as_unix_timestamp().as_millis() as u64,
88 ControlMessage::generate_ping_request(),
89 )),
90 }
91 }
92
93 pub fn challenge(&self) -> ControlMessage {
95 self.challenge.1.clone()
96 }
97
98 pub fn notify(self, pong: ControlMessage, version: String) {
103 let timed_result = if ControlMessage::validate_pong_response(&self.challenge.1, &pong).is_ok() {
104 let unidirectional_latency = current_time()
105 .as_unix_timestamp()
106 .saturating_sub(std::time::Duration::from_millis(self.challenge.0))
107 .div(2u32);
108 Ok((unidirectional_latency, version))
109 } else {
110 Err(NetworkingError::DecodingError)
111 };
112
113 if self.notifier.unbounded_send(timed_result).is_err() {
114 warn!("Failed to notify the ping query result due to upper layer ping timeout");
115 }
116 }
117}
118
119#[tracing::instrument(level = "trace", skip(sender, timeout))]
121pub fn to_active_ping(
122 peer: PeerId,
123 sender: HeartbeatSendPingTx,
124 timeout: std::time::Duration,
125) -> impl std::future::Future<Output = (PeerId, Result<std::time::Duration>, String)> {
126 let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();
127 let replier = PingQueryReplier::new(tx);
128
129 if let Err(e) = sender.unbounded_send((peer, replier)) {
130 warn!(%peer, error = %e, "Failed to initiate a ping request");
131 }
132
133 async move {
134 match timeout_fut(timeout, rx.next()).await {
135 Ok(Some(Ok((latency, version)))) => {
136 debug!(latency = latency.as_millis(), %peer, %version, "Ping succeeded",);
137 (peer, Ok(latency), version)
138 }
139 Ok(Some(Err(e))) => {
140 let error = if let NetworkingError::DecodingError = e {
141 NetworkingError::PingerError(peer, "incorrect pong response".into())
142 } else {
143 e
144 };
145
146 debug!(%peer, %error, "Ping failed internally",);
147 (peer, Err(error), "unknown".into())
148 }
149 Ok(None) => {
150 debug!(%peer, "Ping canceled");
151 (
152 peer,
153 Err(NetworkingError::PingerError(peer, "canceled".into())),
154 "unknown".into(),
155 )
156 }
157 Err(_) => {
158 debug!(%peer, "Ping failed due to timeout");
159 (peer, Err(NetworkingError::Timeout(timeout.as_secs())), "unknown".into())
160 }
161 }
162 }
163}
164
165#[derive(Debug, Clone)]
167pub struct Pinger<T>
168where
169 T: PingExternalAPI + Send + Sync,
170{
171 config: PingConfig,
172 send_ping: HeartbeatSendPingTx,
173 recorder: T,
174}
175
176impl<T> Pinger<T>
177where
178 T: PingExternalAPI + Send + Sync,
179{
180 pub fn new(config: PingConfig, send_ping: HeartbeatSendPingTx, recorder: T) -> Self {
181 let config = PingConfig {
182 max_parallel_pings: config.max_parallel_pings,
183 ..config
184 };
185
186 Pinger {
187 config,
188 send_ping,
189 recorder,
190 }
191 }
192
193 pub fn config(&self) -> &PingConfig {
194 &self.config
195 }
196}
197
198impl<T> Pinging for Pinger<T>
199where
200 T: PingExternalAPI + Send + Sync,
201{
202 #[tracing::instrument(level = "info", skip(self, peers), fields(peers.count = peers.len()))]
212 fn ping(&self, mut peers: Vec<PeerId>) -> impl Stream<Item = Result<std::time::Duration>> {
213 let start_all_peers = current_time();
214
215 stream! {
216 if !peers.is_empty() {
217 let remainder = peers.split_off(self.config.max_parallel_pings.min(peers.len()));
218 let mut active_pings = peers
219 .into_iter()
220 .map(|peer| to_active_ping(peer, self.send_ping.clone(), self.config.timeout))
221 .collect::<futures::stream::FuturesUnordered<_>>();
222
223 let mut waiting = std::collections::VecDeque::from(remainder);
224
225 while let Some((peer, result, version)) = active_pings.next().await {
226 self.recorder.on_finished_ping(&peer, &result, version).await;
227
228 #[cfg(all(feature = "prometheus", not(test)))]
229 match &result {
230 Ok(duration) => {
231 METRIC_TIME_TO_PING.observe((duration.as_millis() as f64) / 1000.0); METRIC_PING_COUNT.increment(&["true"]);
233 }
234 Err(_) => {
235 METRIC_PING_COUNT.increment(&["false"]);
236 }
237 }
238
239 if current_time().saturating_sub(start_all_peers) < self.config.timeout {
240 if let Some(peer) = waiting.pop_front() {
241 active_pings.push(to_active_ping(peer, self.send_ping.clone(), self.config.timeout));
242 }
243 }
244
245 yield result;
246
247 if active_pings.is_empty() && waiting.is_empty() {
248 break;
249 }
250 }
251 } else {
252 debug!("Received an empty peer list, not pinging any peers");
253 }
254 }
255 }
256}
257
258#[cfg(test)]
259mod tests {
260 use super::*;
261 use crate::messaging::ControlMessage;
262 use crate::ping::Pinger;
263 use futures::TryStreamExt;
264 use hopr_primitive_types::traits::SaturatingSub;
265 use mockall::*;
266 use more_asserts::*;
267
268 fn simple_ping_config() -> PingConfig {
269 PingConfig {
270 max_parallel_pings: 2,
271 timeout: std::time::Duration::from_millis(150),
272 }
273 }
274
275 #[async_std::test]
276 async fn ping_query_replier_should_return_ok_result_when_the_pong_is_correct_for_the_challenge(
277 ) -> anyhow::Result<()> {
278 let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();
279
280 let replier = PingQueryReplier::new(tx);
281 let challenge = replier.challenge.clone();
282
283 replier.notify(
284 ControlMessage::generate_pong_response(&challenge.1)?,
285 "version".to_owned(),
286 );
287
288 assert!(rx.next().await.is_some_and(|r| r.is_ok()));
289
290 Ok(())
291 }
292
293 #[async_std::test]
294 async fn ping_query_replier_should_return_err_result_when_the_pong_is_incorrect_for_the_challenge(
295 ) -> anyhow::Result<()> {
296 let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();
297
298 let replier = PingQueryReplier::new(tx);
299
300 replier.notify(
301 ControlMessage::generate_pong_response(&ControlMessage::generate_ping_request())?,
302 "version".to_owned(),
303 );
304
305 assert!(rx.next().await.is_some_and(|r| r.is_err()));
306
307 Ok(())
308 }
309
310 #[async_std::test]
311 async fn ping_query_replier_should_return_the_unidirectional_latency() -> anyhow::Result<()> {
312 let (tx, mut rx) = futures::channel::mpsc::unbounded::<PingQueryResult>();
313
314 let replier = PingQueryReplier::new(tx);
315 let challenge = replier.challenge.clone();
316
317 let delay = std::time::Duration::from_millis(10);
318
319 async_std::task::sleep(delay).await;
320 replier.notify(
321 ControlMessage::generate_pong_response(&challenge.1)?,
322 "version".to_owned(),
323 );
324
325 let actual_latency = rx
326 .next()
327 .await
328 .ok_or_else(|| anyhow::anyhow!("should contain a result value"))?
329 .map_err(|_e| anyhow::anyhow!("should contain a result value"))?
330 .0;
331 assert!(actual_latency > delay / 2);
332 assert!(actual_latency < delay);
333
334 Ok(())
335 }
336
337 #[async_std::test]
338 async fn ping_empty_vector_of_peers_should_not_do_any_api_calls() -> anyhow::Result<()> {
339 let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();
340
341 let ideal_channel = async_std::task::spawn(async move {
342 while let Some((_peer, replier)) = rx.next().await {
343 let challenge = replier.challenge.1.clone();
344
345 replier.notify(
346 ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
347 "version".to_owned(),
348 );
349 }
350 });
351
352 let mut mock = MockPingExternalAPI::new();
353 mock.expect_on_finished_ping().times(0);
354
355 let pinger = Pinger::new(simple_ping_config(), tx, mock);
356
357 assert!(pinger.ping(vec![]).try_collect::<Vec<_>>().await?.is_empty());
358
359 ideal_channel.cancel().await;
360
361 Ok(())
362 }
363
364 #[async_std::test]
365 async fn test_ping_peers_with_happy_path_should_trigger_the_desired_external_api_calls() -> anyhow::Result<()> {
366 let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();
367
368 let ideal_channel = async_std::task::spawn(async move {
369 while let Some((_peer, replier)) = rx.next().await {
370 let challenge = replier.challenge.1.clone();
371
372 replier.notify(
373 ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
374 "version".to_owned(),
375 );
376 }
377 });
378
379 let peer = PeerId::random();
380
381 let mut mock = MockPingExternalAPI::new();
382 mock.expect_on_finished_ping()
383 .times(1)
384 .with(
385 predicate::eq(peer),
386 predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
387 predicate::eq("version".to_owned()),
388 )
389 .return_const(());
390
391 let pinger = Pinger::new(simple_ping_config(), tx, mock);
392 pinger.ping(vec![peer]).try_collect::<Vec<_>>().await?;
393
394 ideal_channel.cancel().await;
395
396 Ok(())
397 }
398
399 #[async_std::test]
400 async fn test_ping_should_invoke_a_failed_ping_reply_for_an_incorrect_reply() -> anyhow::Result<()> {
401 let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();
402
403 let failing_channel = async_std::task::spawn(async move {
404 while let Some((_peer, replier)) = rx.next().await {
405 replier.notify(
406 ControlMessage::generate_pong_response(&ControlMessage::generate_ping_request())
407 .expect("valid challenge reply"),
408 "version".to_owned(),
409 );
410 }
411 });
412
413 let peer = PeerId::random();
414
415 let mut mock = MockPingExternalAPI::new();
416 mock.expect_on_finished_ping()
417 .times(1)
418 .with(
419 predicate::eq(peer),
420 predicate::function(|x: &Result<std::time::Duration>| x.is_err()),
421 predicate::eq("unknown".to_owned()),
422 )
423 .return_const(());
424
425 let pinger = Pinger::new(simple_ping_config(), tx, mock);
426 assert!(pinger.ping(vec![peer]).try_collect::<Vec<_>>().await.is_err());
427
428 failing_channel.cancel().await;
429
430 Ok(())
431 }
432
433 #[async_std::test]
434 async fn test_ping_peer_returns_error_on_the_pong() -> anyhow::Result<()> {
435 let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();
436
437 let delay = std::time::Duration::from_millis(10);
438 let delaying_channel = async_std::task::spawn(async move {
439 while let Some((_peer, replier)) = rx.next().await {
440 let challenge = replier.challenge.1.clone();
441
442 async_std::task::sleep(delay).await;
443 replier.notify(
444 ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
445 "version".to_owned(),
446 );
447 }
448 });
449
450 let peer = PeerId::random();
451 let ping_config = PingConfig {
452 timeout: std::time::Duration::from_millis(0),
453 ..simple_ping_config()
454 };
455
456 let mut mock = MockPingExternalAPI::new();
457 mock.expect_on_finished_ping()
458 .times(1)
459 .with(
460 predicate::eq(peer),
461 predicate::function(|x: &Result<std::time::Duration>| x.is_err()),
462 predicate::eq("unknown".to_owned()),
463 )
464 .return_const(());
465
466 let pinger = Pinger::new(ping_config, tx, mock);
467 assert!(pinger.ping(vec![peer]).try_collect::<Vec<_>>().await.is_err());
468
469 delaying_channel.cancel().await;
470
471 Ok(())
472 }
473
474 #[async_std::test]
475 async fn test_ping_peers_multiple_peers_are_pinged_in_parallel() -> anyhow::Result<()> {
476 let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();
477
478 let ideal_channel = async_std::task::spawn(async move {
479 while let Some((_peer, replier)) = rx.next().await {
480 let challenge = replier.challenge.1.clone();
481
482 replier.notify(
483 ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
484 "version".to_owned(),
485 );
486 }
487 });
488
489 let peers = vec![PeerId::random(), PeerId::random()];
490
491 let mut mock = MockPingExternalAPI::new();
492 mock.expect_on_finished_ping()
493 .times(1)
494 .with(
495 predicate::eq(peers[0]),
496 predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
497 predicate::eq("version".to_owned()),
498 )
499 .return_const(());
500 mock.expect_on_finished_ping()
501 .times(1)
502 .with(
503 predicate::eq(peers[1]),
504 predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
505 predicate::eq("version".to_owned()),
506 )
507 .return_const(());
508
509 let pinger = Pinger::new(simple_ping_config(), tx, mock);
510 pinger.ping(peers).try_collect::<Vec<_>>().await?;
511
512 ideal_channel.cancel().await;
513
514 Ok(())
515 }
516
517 #[async_std::test]
518 async fn test_ping_peers_should_ping_parallel_only_a_limited_number_of_peers() -> anyhow::Result<()> {
519 let (tx, mut rx) = futures::channel::mpsc::unbounded::<(PeerId, PingQueryReplier)>();
520
521 let delay = 10u64;
522
523 let ideal_delaying_channel = async_std::task::spawn(async move {
524 while let Some((_peer, replier)) = rx.next().await {
525 let challenge = replier.challenge.1.clone();
526
527 async_std::task::sleep(std::time::Duration::from_millis(delay)).await;
528 replier.notify(
529 ControlMessage::generate_pong_response(&challenge).expect("valid challenge reply"),
530 "version".to_owned(),
531 );
532 }
533 });
534
535 let peers = vec![PeerId::random(), PeerId::random()];
536
537 let mut mock = MockPingExternalAPI::new();
538 mock.expect_on_finished_ping()
539 .times(1)
540 .with(
541 predicate::eq(peers[0]),
542 predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
543 predicate::eq("version".to_owned()),
544 )
545 .return_const(());
546 mock.expect_on_finished_ping()
547 .times(1)
548 .with(
549 predicate::eq(peers[1]),
550 predicate::function(|x: &Result<std::time::Duration>| x.is_ok()),
551 predicate::eq("version".to_owned()),
552 )
553 .return_const(());
554
555 let pinger = Pinger::new(
556 PingConfig {
557 max_parallel_pings: 1,
558 ..simple_ping_config()
559 },
560 tx,
561 mock,
562 );
563
564 let start = current_time();
565 pinger.ping(peers).try_collect::<Vec<_>>().await?;
566 let end = current_time();
567
568 assert_ge!(end.saturating_sub(start), std::time::Duration::from_millis(delay));
569
570 ideal_delaying_channel.cancel().await;
571
572 Ok(())
573 }
574}