1use std::{
2 collections::hash_set::HashSet,
3 time::{Duration, SystemTime},
4};
5
6use futures::StreamExt;
7use hopr_api::db::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
8use hopr_network_types::addr::is_public_address;
9use hopr_platform::time::current_time;
10#[cfg(all(feature = "prometheus", not(test)))]
11use hopr_primitive_types::prelude::*;
12use libp2p_identity::PeerId;
13use multiaddr::Multiaddr;
14use tracing::debug;
15
16use crate::{
17 config::NetworkConfig,
18 errors::{NetworkingError, Result},
19};
20
// Prometheus metrics maintained by this module.
// Compiled only when the `prometheus` feature is enabled and never in test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Numeric value of the current network `Health` (written by `refresh_metrics`).
    static ref METRIC_NETWORK_HEALTH: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
    // Peer counts split by the `type` ("public"/"nonPublic") and `quality` ("high"/"low") labels.
    static ref METRIC_PEERS_BY_QUALITY: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new("hopr_peers_by_quality", "Number different peer types by quality",
            &["type", "quality"],
        ).unwrap();
    // Total number of peers reported by the peer statistics.
    static ref METRIC_PEER_COUNT: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
    // Seconds elapsed since `Network::new`; initialised to 0 and written by `refresh_metrics`.
    static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_time_to_green_sec",
        "Time it takes for a node to transition to the GREEN network state"
    ).unwrap();
}
36
/// Connectivity health of the node as derived from the peer statistics.
///
/// The discriminants are ordered from worst to best and are exported verbatim
/// as the value of the `hopr_network_health` metric.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
pub enum Health {
    /// The health could not be determined (e.g. the peer statistics could not be read).
    Unknown = 0,
    /// No public peer is known at all.
    Red = 1,
    /// Only low-quality public peers are known.
    Orange = 2,
    /// A good-quality public peer is known, but this node is not public and
    /// no good-quality non-public peer is known.
    Yellow = 3,
    /// A good-quality public peer is known and this node is either public or
    /// also knows a good-quality non-public peer.
    Green = 4,
}
52
53fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
55 let mut health = Health::Red;
56
57 if stats.bad_quality_public > 0 {
58 health = Health::Orange;
59 }
60
61 if stats.good_quality_public > 0 {
62 health = if is_public || stats.good_quality_non_public > 0 {
63 Health::Green
64 } else {
65 Health::Yellow
66 };
67 }
68
69 health
70}
71
/// Reason why a peer update (see `Network::update`) reports a failed ping.
#[derive(Debug, Clone, Copy)]
pub enum UpdateFailure {
    /// The peer did not respond in time: `Network::update` raises its backoff, lowers its quality by
    /// one step and starts ignoring it once the quality falls below the bad-quality threshold.
    Timeout,
    /// The peer could not be dialed: `Network::update` zeroes its quality and ignores it
    /// for the dial penalty plus a random extra delay.
    DialFailure,
}
79
/// Peer store front-end that tracks the peers known to this node on top of a
/// database backend `T`.
#[derive(Debug)]
pub struct Network<T> {
    /// Peer id of this node; mutating operations on it are rejected.
    me: PeerId,
    /// Multiaddresses of this node, reported by `get` for the own peer id.
    me_addresses: Vec<Multiaddr>,
    /// Whether this node is considered publicly reachable (feeds the health computation).
    am_i_public: bool,
    /// Network behaviour configuration (thresholds, backoff and delay settings).
    cfg: NetworkConfig,
    /// Database backend holding the peer records.
    db: T,
    /// Unix timestamp taken at construction; used for the time-to-green metric.
    #[cfg(all(feature = "prometheus", not(test)))]
    started_at: Duration,
}
92
93impl<T> Network<T>
94where
95 T: HoprDbPeersOperations + Sync + Send,
96{
97 pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
98 #[cfg(all(feature = "prometheus", not(test)))]
99 {
100 METRIC_NETWORK_HEALTH.set(0.0);
101 METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
102 METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
103 METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
104 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
105 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
106 }
107
108 Self {
109 me: my_peer_id,
110 me_addresses: my_multiaddresses,
111 am_i_public: true,
112 cfg,
113 db,
114 #[cfg(all(feature = "prometheus", not(test)))]
115 started_at: current_time().as_unix_timestamp(),
116 }
117 }
118
119 #[tracing::instrument(level = "debug", skip(self), ret(Display))]
121 pub async fn has(&self, peer: &PeerId) -> bool {
122 peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
123 }
124
125 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
133 pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> Result<()> {
134 if peer == &self.me {
135 return Err(NetworkingError::DisallowedOperationOnOwnPeerIdError);
136 }
137
138 addrs.retain(|a| self.cfg.allow_private_addresses_in_store || is_public_address(a));
140
141 debug!(%peer, %origin, multiaddresses = ?addrs, "Filtered addresses, proceeding with public addresses only");
142
143 if let Some(mut peer_status) = self
144 .db
145 .get_network_peer(peer)
146 .await
147 .map_err(|e| NetworkingError::DbChainError(e.into()))?
148 {
149 debug!(%peer, %origin, multiaddresses = ?addrs, "Updating existing peer in the store");
150
151 if !peer_status.is_ignored() || matches!(origin, PeerOrigin::IncomingConnection) {
152 peer_status.ignored_until = None;
153 }
154
155 peer_status.multiaddresses.append(&mut addrs);
156 peer_status.multiaddresses = peer_status
157 .multiaddresses
158 .into_iter()
159 .collect::<HashSet<_>>()
160 .into_iter()
161 .collect::<Vec<_>>();
162 self.db
163 .update_network_peer(peer_status)
164 .await
165 .map_err(|e| NetworkingError::DbChainError(e.into()))?;
166 } else {
167 debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
168
169 self.db
170 .add_network_peer(
171 peer,
172 origin,
173 addrs,
174 self.cfg.backoff_exponent,
175 self.cfg.quality_avg_window_size,
176 )
177 .await
178 .map_err(|e| NetworkingError::DbChainError(e.into()))?;
179 }
180
181 #[cfg(all(feature = "prometheus", not(test)))]
182 {
183 let stats = self
184 .db
185 .network_peer_stats(self.cfg.quality_bad_threshold)
186 .await
187 .map_err(|e| NetworkingError::DbChainError(e.into()))?;
188 self.refresh_metrics(&stats)
189 }
190
191 Ok(())
192 }
193
194 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
200 pub async fn get(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
201 if peer == &self.me {
202 Ok(Some({
203 let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
204 ps.multiaddresses = self
206 .me_addresses
207 .iter()
208 .filter(|addr| self.cfg.allow_private_addresses_in_store || is_public_address(addr))
209 .cloned()
210 .collect();
211 ps
212 }))
213 } else {
214 match self
216 .db
217 .get_network_peer(peer)
218 .await
219 .map_err(|e| NetworkingError::DbChainError(e.into()))?
220 {
221 Some(mut peer_status) => {
222 peer_status.multiaddresses = peer_status
224 .multiaddresses
225 .iter()
226 .filter(|addr| self.cfg.allow_private_addresses_in_store || is_public_address(addr))
227 .cloned()
228 .collect();
229 Ok(Some(peer_status))
230 }
231 None => Ok(None),
232 }
233 }
234 }
235
236 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
238 pub async fn remove(&self, peer: &PeerId) -> Result<()> {
239 if peer == &self.me {
240 return Err(NetworkingError::DisallowedOperationOnOwnPeerIdError);
241 }
242
243 self.db
244 .remove_network_peer(peer)
245 .await
246 .map_err(|e| NetworkingError::DbChainError(e.into()))?;
247
248 #[cfg(all(feature = "prometheus", not(test)))]
249 {
250 let stats = self
251 .db
252 .network_peer_stats(self.cfg.quality_bad_threshold)
253 .await
254 .map_err(|e| NetworkingError::DbChainError(e.into()))?;
255 self.refresh_metrics(&stats);
256 tracing::trace!(
257 health = %health_from_stats(&stats, self.am_i_public),
258 trigger = "peer removal",
259 "Network health updated"
260 );
261 }
262
263 Ok(())
264 }
265
266 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
277 pub async fn update(&self, peer: &PeerId, ping_result: std::result::Result<Duration, UpdateFailure>) -> Result<()> {
278 if peer == &self.me {
279 return Err(NetworkingError::DisallowedOperationOnOwnPeerIdError);
280 }
281
282 if let Some(mut entry) = self
283 .db
284 .get_network_peer(peer)
285 .await
286 .map_err(|e| NetworkingError::DbChainError(e.into()))?
287 {
288 entry.heartbeats_sent += 1;
289
290 match ping_result {
291 Ok(latency) => {
292 if !entry.is_ignored() {
293 entry.ignored_until = None;
294 }
295 entry.last_seen = current_time();
296 entry.last_seen_latency = latency;
297 entry.heartbeats_succeeded += 1;
298 entry.backoff = self.cfg.backoff_min;
300 entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
301 }
302 Err(error) => match error {
303 UpdateFailure::Timeout => {
304 tracing::trace!("Update failed with timeout");
305 entry.backoff = self.cfg.backoff_max.min(entry.backoff.powf(self.cfg.backoff_exponent));
308 entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
309
310 let q = entry.get_quality();
311
312 if q < self.cfg.quality_bad_threshold {
313 entry.ignored_until = Some(current_time() + self.cfg.ignore_timeframe);
314 }
315 }
316 UpdateFailure::DialFailure => {
317 tracing::trace!("Update failed with dial failure");
318 entry.update_quality(0.0_f64);
319 entry.ignored_until = Some(
320 current_time()
321 + crate::config::DEFAULT_CANNOT_DIAL_PENALTY
322 + std::time::Duration::from_secs(hopr_crypto_random::random_integer(0, Some(600))),
323 );
324 }
325 },
326 }
327
328 tracing::trace!(%peer, quality = entry.quality, quality_avg = hopr_primitive_types::sma::SMA::average(&entry.quality_avg), "Updating peer status in the store");
329 self.db
330 .update_network_peer(entry)
331 .await
332 .map_err(|e| NetworkingError::DbChainError(e.into()))?;
333
334 #[cfg(all(feature = "prometheus", not(test)))]
335 {
336 let stats = self
337 .db
338 .network_peer_stats(self.cfg.quality_bad_threshold)
339 .await
340 .map_err(|e| NetworkingError::DbChainError(e.into()))?;
341 self.refresh_metrics(&stats);
342 tracing::trace!(
343 health = %health_from_stats(&stats, self.am_i_public),
344 trigger = "peer update",
345 "Network health updated"
346 );
347 }
348
349 Ok(())
350 } else {
351 debug!(%peer, "Ignoring update request for unknown peer");
352 Ok(())
353 }
354 }
355
356 pub async fn health(&self) -> Health {
358 self.db
359 .network_peer_stats(self.cfg.quality_bad_threshold)
360 .await
361 .map(|stats| health_from_stats(&stats, self.am_i_public))
362 .unwrap_or(Health::Unknown)
363 }
364
/// Publishes the peer statistics and the derived network health to the Prometheus gauges.
///
/// The time-to-green gauge is written only once: the first time the health is
/// [`Health::Green`] while the gauge still holds its initial value of zero.
#[cfg(all(feature = "prometheus", not(test)))]
fn refresh_metrics(&self, stats: &Stats) {
    let health = health_from_stats(stats, self.am_i_public);

    // Record the elapsed time only when the node actually reached GREEN and the
    // gauge has not been set yet (it is initialised to 0.0 in `new`).
    if health == Health::Green && METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
        if let Some(ts) = current_time().checked_sub(self.started_at) {
            METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
        }
    }

    METRIC_PEER_COUNT.set(stats.all_count() as f64);
    METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
    METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
    METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
    METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
    // The enum discriminant doubles as the exported health value.
    METRIC_NETWORK_HEALTH.set((health as i32).into());
}
382
383 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
384 let minimum_quality = self.cfg.quality_offline_threshold;
385 self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
386 .await
387 }
388
389 pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> Result<Vec<V>>
391 where
392 F: FnMut(PeerStatus) -> Fut,
393 Fut: std::future::Future<Output = Option<V>>,
394 {
395 Ok(self
396 .db
397 .get_network_peers(Default::default(), false)
398 .await
399 .map_err(|e| NetworkingError::DbChainError(e.into()))?
400 .filter_map(filter)
401 .collect()
402 .await)
403 }
404
405 #[tracing::instrument(level = "debug", skip(self, threshold), ret(level = "trace"), err, fields(since = ?threshold))]
416 pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> Result<Vec<PeerId>> {
417 Ok(self
418 .db
419 .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), true)
420 .await
421 .map_err(|e| NetworkingError::DbChainError(e.into()))?
422 .filter_map(|v| async move {
423 if v.id.1 == self.me {
424 return None;
425 }
426
427 if let Some(ignore_start) = v.ignored_until {
428 let should_be_ignored = ignore_start
429 .checked_add(self.cfg.ignore_timeframe)
430 .is_some_and(|v| v > threshold);
431
432 if should_be_ignored {
433 return None;
434 }
435 }
436
437 let backoff = v.backoff.powf(self.cfg.backoff_exponent);
438 let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
439
440 if (v.last_seen + delay) < threshold {
441 Some(v.id.1)
442 } else {
443 None
444 }
445 })
446 .collect()
447 .await)
448 }
449}
450
451#[cfg(test)]
452mod tests {
453 use std::{ops::Add, time::Duration};
454
455 use anyhow::Context;
456 use hopr_crypto_types::keypairs::{Keypair, OffchainKeypair};
457 use hopr_db_node::HoprNodeDb;
458 use hopr_platform::time::native::current_time;
459 use hopr_primitive_types::prelude::AsUnixTimestamp;
460 use libp2p_identity::PeerId;
461 use more_asserts::*;
462
463 use super::*;
464 use crate::network::{Health, Network, NetworkConfig, PeerOrigin};
465
466 impl<T> Network<T>
467 where
468 T: HoprDbPeersOperations + Sync + Send,
469 {
470 async fn is_ignored(&self, peer: &PeerId) -> bool {
472 peer != &self.me && self.get(peer).await.is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored()))
473 }
474 }
475
/// `Health` is displayed as its variant name.
#[test]
fn test_network_health_should_serialize_to_a_proper_string() {
    let rendered = Health::Orange.to_string();
    assert_eq!(rendered, "Orange".to_owned())
}
480
/// `Health` can be parsed back from its variant name.
#[test]
fn test_network_health_should_deserialize_from_proper_string() -> anyhow::Result<()> {
    assert_eq!("Orange".parse::<Health>()?, Health::Orange);

    Ok(())
}
488
489 async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<HoprNodeDb>> {
490 let cfg = NetworkConfig {
491 quality_offline_threshold: 0.6,
492 ..Default::default()
493 };
494 Ok(Network::new(*my_id, vec![], cfg, HoprNodeDb::new_in_memory().await?))
495 }
496
/// The numeric discriminants are part of the metrics contract and must stay stable.
#[test]
fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
    let expected_values = [
        (Health::Unknown, 0),
        (Health::Red, 1),
        (Health::Orange, 2),
        (Health::Yellow, 3),
        (Health::Green, 4),
    ];

    for (health, value) in expected_values {
        assert_eq!(health as i32, value);
    }
}
505
506 #[tokio::test]
507 async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
508 let me = PeerId::random();
509
510 let peers = basic_network(&me).await?;
511
512 assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());
513
514 assert_eq!(
515 0,
516 peers
517 .peer_filter(|peer| async move { Some(peer.id) })
518 .await
519 .unwrap_or(vec![])
520 .len()
521 );
522 assert!(peers.has(&me).await);
523
524 Ok(())
525 }
526
527 #[tokio::test]
528 async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
529 let expected: PeerId = OffchainKeypair::random().public().into();
530 let me: PeerId = OffchainKeypair::random().public().into();
531
532 let peers = basic_network(&me).await?;
533
534 peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;
535
536 assert_eq!(
537 1,
538 peers
539 .peer_filter(|peer| async move { Some(peer.id) })
540 .await
541 .unwrap_or(vec![])
542 .len()
543 );
544 assert!(peers.has(&expected).await);
545
546 Ok(())
547 }
548
549 #[tokio::test]
550 async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
551 let peer: PeerId = OffchainKeypair::random().public().into();
552 let me: PeerId = OffchainKeypair::random().public().into();
553
554 let peers = basic_network(&me).await?;
555
556 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
557
558 peers.remove(&peer).await?;
559
560 assert_eq!(
561 0,
562 peers
563 .peer_filter(|peer| async move { Some(peer.id) })
564 .await
565 .unwrap_or(vec![])
566 .len()
567 );
568 assert!(!peers.has(&peer).await);
569
570 Ok(())
571 }
572
573 #[tokio::test]
574 async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
575 let peer: PeerId = OffchainKeypair::random().public().into();
576 let me: PeerId = OffchainKeypair::random().public().into();
577
578 let peers = basic_network(&me).await?;
579
580 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
581
582 assert_eq!(
583 0,
584 peers
585 .peer_filter(|peer| async move { Some(peer.id) })
586 .await
587 .unwrap_or(vec![])
588 .len()
589 );
590 assert!(!peers.has(&peer).await);
591
592 Ok(())
593 }
594
595 #[tokio::test]
596 async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
597 let peer: PeerId = OffchainKeypair::random().public().into();
598 let me: PeerId = OffchainKeypair::random().public().into();
599
600 let peers = basic_network(&me).await?;
601
602 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
603
604 let latency = 123u64;
605
606 peers
607 .update(&peer, Ok(std::time::Duration::from_millis(latency)))
608 .await?;
609
610 let actual = peers.get(&peer).await?.expect("peer record should be present");
611
612 assert_eq!(actual.heartbeats_sent, 1);
613 assert_eq!(actual.heartbeats_succeeded, 1);
614 assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
615
616 Ok(())
617 }
618
619 #[tokio::test]
620 async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
621 let peer: PeerId = OffchainKeypair::random().public().into();
622 let me: PeerId = OffchainKeypair::random().public().into();
623
624 let peers = basic_network(&me).await?;
625
626 let ts = Duration::from_millis(100);
627
628 {
629 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
630 peers.update(&peer, Ok(ts)).await?;
631
632 let status = peers.get(&peer).await?.context("peer should be present")?;
633
634 assert_eq!(status.last_seen_latency, ts);
635 }
636
637 let ts = Duration::from_millis(200);
638
639 {
640 peers.update(&peer, Ok(ts)).await?;
641
642 let status = peers.get(&peer).await?.context("peer should be present")?;
643
644 assert_eq!(status.last_seen_latency, ts);
645 }
646
647 Ok(())
648 }
649
650 #[tokio::test]
651 async fn network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time()
652 -> anyhow::Result<()> {
653 let peer: PeerId = OffchainKeypair::random().public().into();
654 let me: PeerId = OffchainKeypair::random().public().into();
655
656 let peers = basic_network(&me).await?;
657
658 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
659
660 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
661 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
662 peers.update(&peer, Err(UpdateFailure::Timeout)).await?; peers
665 .update(&peer, Err(UpdateFailure::Timeout))
666 .await
667 .expect("no error should occur"); assert!(peers.is_ignored(&peer).await);
670
671 peers.add(&peer, PeerOrigin::ManualPing, vec![]).await?;
673
674 assert!(peers.is_ignored(&peer).await);
675
676 Ok(())
677 }
678
679 #[tokio::test]
680 async fn network_should_stop_ignoring_a_peer_that_has_reached_lower_thresholds_but_connected_back()
681 -> anyhow::Result<()> {
682 let peer: PeerId = OffchainKeypair::random().public().into();
683 let me: PeerId = OffchainKeypair::random().public().into();
684
685 let peers = basic_network(&me).await?;
686
687 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
688
689 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
690 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
691 peers.update(&peer, Err(UpdateFailure::Timeout)).await?; peers
694 .update(&peer, Err(UpdateFailure::Timeout))
695 .await
696 .expect("no error should occur"); assert!(peers.is_ignored(&peer).await);
699
700 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
702
703 assert!(!peers.is_ignored(&peer).await);
704
705 Ok(())
706 }
707
708 #[tokio::test]
709 async fn network_should_ignore_a_peer_that_could_not_be_dialed() -> anyhow::Result<()> {
710 let peer: PeerId = OffchainKeypair::random().public().into();
711 let me: PeerId = OffchainKeypair::random().public().into();
712
713 let peers = basic_network(&me).await?;
714
715 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
716
717 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
718 peers.update(&peer, Err(UpdateFailure::DialFailure)).await?; assert!(peers.is_ignored(&peer).await);
721
722 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
723
724 assert!(!peers.is_ignored(&peer).await);
725
726 Ok(())
727 }
728
729 #[tokio::test]
730 async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
731 let peer: PeerId = OffchainKeypair::random().public().into();
732 let me: PeerId = OffchainKeypair::random().public().into();
733
734 let peers = basic_network(&me).await?;
735
736 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
737
738 peers
741 .update(&peer, Ok(std::time::Duration::from_millis(123_u64)))
742 .await?;
743 peers
744 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
745 .await?;
746 peers
747 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
748 .await?;
749
750 peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
751
752 let actual = peers.get(&peer).await?.expect("the peer record should be present");
753
754 assert_eq!(actual.heartbeats_succeeded, 3);
755 assert_lt!(actual.backoff, 3f64);
756 assert_gt!(actual.backoff, 2f64);
757
758 Ok(())
759 }
760
761 #[tokio::test]
762 async fn test_network_should_not_overflow_max_backoff() -> anyhow::Result<()> {
763 let peer: PeerId = OffchainKeypair::random().public().into();
764 let me: PeerId = OffchainKeypair::random().public().into();
765
766 let peers = basic_network(&me).await?;
767
768 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
769
770 for latency in [123_u64, 200_u64, 200_u64] {
771 peers
772 .update(&peer, Ok(std::time::Duration::from_millis(latency)))
773 .await?;
774 }
775
776 loop {
778 let updated_peer = peers.get(&peer).await?.expect("the peer record should be present");
779 if updated_peer.backoff == peers.cfg.backoff_max {
780 break;
781 }
782
783 peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
784 }
785
786 peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
788 let actual = peers.get(&peer).await?.expect("the peer record should be present");
789
790 assert_eq!(actual.backoff, peers.cfg.backoff_max);
791
792 Ok(())
793 }
794
795 #[tokio::test]
796 async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference()
797 -> anyhow::Result<()> {
798 let first: PeerId = OffchainKeypair::random().public().into();
799 let second: PeerId = OffchainKeypair::random().public().into();
800 let me: PeerId = OffchainKeypair::random().public().into();
801
802 let peers = basic_network(&me).await?;
803
804 peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
805 peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;
806
807 let latency = 77_u64;
808
809 let mut expected = vec![first, second];
810 expected.sort();
811
812 peers
813 .update(&first, Ok(std::time::Duration::from_millis(latency)))
814 .await?;
815 peers
816 .update(&second, Ok(std::time::Duration::from_millis(latency)))
817 .await?;
818
819 let mut actual = peers
829 .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
830 .await?;
831 actual.sort();
832
833 assert_eq!(actual, expected);
834
835 Ok(())
836 }
837
838 #[tokio::test]
839 async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
840 let me: PeerId = OffchainKeypair::random().public().into();
841
842 let peers = basic_network(&me).await?;
843
844 assert_eq!(peers.health().await, Health::Red);
845
846 Ok(())
847 }
848
849 #[tokio::test]
850 async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
851 let peer: PeerId = OffchainKeypair::random().public().into();
852 let me: PeerId = OffchainKeypair::random().public().into();
853
854 let peers = basic_network(&me).await?;
855
856 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
857
858 assert_eq!(peers.health().await, Health::Orange);
860
861 Ok(())
862 }
863
864 #[tokio::test]
865 async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
866 let peer: PeerId = OffchainKeypair::random().public().into();
867 let me: PeerId = OffchainKeypair::random().public().into();
868
869 let peers = basic_network(&me).await?;
870
871 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
872 let _ = peers.health().await;
873 peers.remove(&peer).await?;
874
875 assert_eq!(peers.health().await, Health::Red);
876
877 Ok(())
878 }
879
880 #[tokio::test]
881 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
882 let peer: PeerId = OffchainKeypair::random().public().into();
883 let me: PeerId = OffchainKeypair::random().public().into();
884
885 let cfg = NetworkConfig {
886 quality_offline_threshold: 0.6,
887 ..Default::default()
888 };
889
890 let peers = Network::new(me, vec![], cfg, HoprNodeDb::new_in_memory().await?);
891
892 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
893
894 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
895
896 assert_eq!(peers.health().await, Health::Orange);
897
898 Ok(())
899 }
900
901 #[tokio::test]
902 async fn network_should_allow_the_quality_to_go_to_0() -> anyhow::Result<()> {
903 let peer: PeerId = OffchainKeypair::random().public().into();
904 let public = peer;
905 let me: PeerId = OffchainKeypair::random().public().into();
906
907 let cfg = NetworkConfig {
908 quality_offline_threshold: 0.6,
909 ..Default::default()
910 };
911
912 let peers = Network::new(me, vec![], cfg, HoprNodeDb::new_in_memory().await?);
913
914 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
915
916 assert!(peers.update(&peer, Err(UpdateFailure::Timeout)).await.is_ok());
917
918 assert!(peers.is_ignored(&public).await);
919
920 Ok(())
921 }
922
923 #[tokio::test]
924 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public()
925 -> anyhow::Result<()> {
926 let me: PeerId = OffchainKeypair::random().public().into();
927 let peer: PeerId = OffchainKeypair::random().public().into();
928
929 let cfg = NetworkConfig {
930 quality_offline_threshold: 0.3,
931 ..Default::default()
932 };
933
934 let peers = Network::new(me, vec![], cfg, HoprNodeDb::new_in_memory().await?);
935
936 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
937
938 for _ in 0..3 {
939 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
940 }
941
942 assert_eq!(peers.health().await, Health::Green);
943
944 Ok(())
945 }
946
947 #[tokio::test]
948 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public()
949 -> anyhow::Result<()> {
950 let peer: PeerId = OffchainKeypair::random().public().into();
951 let peer2: PeerId = OffchainKeypair::random().public().into();
952
953 let cfg = NetworkConfig {
954 quality_offline_threshold: 0.3,
955 ..Default::default()
956 };
957
958 let peers = Network::new(
959 OffchainKeypair::random().public().into(),
960 vec![],
961 cfg,
962 HoprNodeDb::new_in_memory().await?,
963 );
964
965 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
966 peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;
967
968 for _ in 0..3 {
969 peers.update(&peer2, Ok(current_time().as_unix_timestamp())).await?;
970 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
971 }
972
973 assert_eq!(peers.health().await, Health::Green);
974
975 Ok(())
976 }
977
978 #[tokio::test]
979 #[ignore] async fn network_add_should_filter_private_multiaddresses() -> anyhow::Result<()> {
981 use multiaddr::Multiaddr;
982
983 let peer: PeerId = OffchainKeypair::random().public().into();
984 let me: PeerId = OffchainKeypair::random().public().into();
985
986 let private_addr1: Multiaddr = "/ip4/192.168.1.100/tcp/9091".parse()?;
988 let private_addr2: Multiaddr = "/ip4/10.0.0.1/tcp/9092".parse()?;
989 let private_addr3: Multiaddr = "/ip4/127.0.0.1/tcp/9093".parse()?;
990 let public_addr: Multiaddr = "/ip4/8.8.8.8/tcp/9094".parse()?;
991
992 let mixed_addresses = vec![
993 private_addr1.clone(),
994 public_addr.clone(),
995 private_addr2.clone(),
996 private_addr3.clone(),
997 ];
998
999 let peers = Network::new(
1000 me,
1001 vec![public_addr.clone()], Default::default(),
1003 HoprNodeDb::new_in_memory().await?,
1004 );
1005
1006 peers.add(&peer, PeerOrigin::NetworkRegistry, mixed_addresses).await?;
1008
1009 let peer_status = peers.get(&peer).await?.context("peer should be present")?;
1011
1012 assert_eq!(
1014 peer_status.multiaddresses.len(),
1015 1,
1016 "Should only have 1 public address stored"
1017 );
1018 assert_eq!(
1019 peer_status.multiaddresses[0], public_addr,
1020 "Should only contain the public address"
1021 );
1022
1023 let self_status = peers.get(&me).await?.context("self peer should be present")?;
1025
1026 assert_eq!(
1028 self_status.multiaddresses.len(),
1029 1,
1030 "Self should only have 1 public address"
1031 );
1032 assert_eq!(
1033 self_status.multiaddresses[0], public_addr,
1034 "Self should only contain the public address"
1035 );
1036
1037 Ok(())
1038 }
1039
1040 #[tokio::test]
1041 #[ignore] async fn network_get_should_filter_private_multiaddresses_as_defensive_measure() -> anyhow::Result<()> {
1043 use multiaddr::Multiaddr;
1044
1045 let me: PeerId = OffchainKeypair::random().public().into();
1046
1047 let private_addr1: Multiaddr = "/ip4/192.168.1.100/tcp/9091".parse()?;
1049 let public_addr: Multiaddr = "/ip4/8.8.8.8/tcp/9094".parse()?;
1050
1051 let mixed_self_addresses = vec![private_addr1.clone(), public_addr.clone()];
1052
1053 let peers = Network::new(
1055 me,
1056 mixed_self_addresses,
1057 Default::default(),
1058 HoprNodeDb::new_in_memory().await?,
1059 );
1060
1061 let self_status = peers.get(&me).await?.context("self peer should be present")?;
1063
1064 assert_eq!(
1066 self_status.multiaddresses.len(),
1067 1,
1068 "Self should only have 1 public address"
1069 );
1070 assert_eq!(
1071 self_status.multiaddresses[0], public_addr,
1072 "Self should only contain the public address"
1073 );
1074
1075 Ok(())
1076 }
1077}