1use std::{
2 collections::hash_set::HashSet,
3 time::{Duration, SystemTime},
4};
5
6use futures::StreamExt;
7pub use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
8use hopr_platform::time::current_time;
9use libp2p_identity::PeerId;
10use multiaddr::Multiaddr;
11use tracing::debug;
12#[cfg(all(feature = "prometheus", not(test)))]
13use {
14 hopr_metrics::metrics::{MultiGauge, SimpleGauge},
15 hopr_primitive_types::prelude::*,
16};
17
18use crate::{config::NetworkConfig, errors::Result};
19
20#[cfg(all(feature = "prometheus", not(test)))]
21lazy_static::lazy_static! {
22 static ref METRIC_NETWORK_HEALTH: SimpleGauge =
23 SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
24 static ref METRIC_PEERS_BY_QUALITY: MultiGauge =
25 MultiGauge::new("hopr_peers_by_quality", "Number different peer types by quality",
26 &["type", "quality"],
27 ).unwrap();
28 static ref METRIC_PEER_COUNT: SimpleGauge =
29 SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
30 static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: SimpleGauge = SimpleGauge::new(
31 "hopr_time_to_green_sec",
32 "Time it takes for a node to transition to the GREEN network state"
33 ).unwrap();
34}
35
36#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
39pub enum Health {
40 Unknown = 0,
42 Red = 1,
44 Orange = 2,
46 Yellow = 3,
48 Green = 4,
50}
51
52fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
54 let mut health = Health::Red;
55
56 if stats.bad_quality_public > 0 {
57 health = Health::Orange;
58 }
59
60 if stats.good_quality_public > 0 {
61 health = if is_public || stats.good_quality_non_public > 0 {
62 Health::Green
63 } else {
64 Health::Yellow
65 };
66 }
67
68 health
69}
70
71#[derive(Debug, Clone, Copy)]
72pub enum UpdateFailure {
73 Timeout,
75 DialFailure,
77}
78
79#[derive(Debug)]
82pub struct Network<T>
83where
84 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
85{
86 me: PeerId,
87 me_addresses: Vec<Multiaddr>,
88 am_i_public: bool,
89 cfg: NetworkConfig,
90 db: T,
91 #[cfg(all(feature = "prometheus", not(test)))]
92 started_at: Duration,
93}
94
95impl<T> Network<T>
96where
97 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
98{
99 pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
100 #[cfg(all(feature = "prometheus", not(test)))]
101 {
102 METRIC_NETWORK_HEALTH.set(0.0);
103 METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
104 METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
105 METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
106 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
107 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
108 }
109
110 Self {
111 me: my_peer_id,
112 me_addresses: my_multiaddresses,
113 am_i_public: true,
114 cfg,
115 db,
116 #[cfg(all(feature = "prometheus", not(test)))]
117 started_at: current_time().as_unix_timestamp(),
118 }
119 }
120
121 #[tracing::instrument(level = "debug", skip(self), ret(Display))]
123 pub async fn has(&self, peer: &PeerId) -> bool {
124 peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
125 }
126
127 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
131 pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> Result<()> {
132 if peer == &self.me {
133 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
134 }
135
136 if let Some(mut peer_status) = self.db.get_network_peer(peer).await? {
137 debug!(%peer, %origin, multiaddresses = ?addrs, "Updating existing peer in the store");
138
139 if !peer_status.is_ignored() || matches!(origin, PeerOrigin::IncomingConnection) {
140 peer_status.ignored_until = None;
141 }
142
143 peer_status.multiaddresses.append(&mut addrs);
144 peer_status.multiaddresses = peer_status
145 .multiaddresses
146 .into_iter()
147 .collect::<HashSet<_>>()
148 .into_iter()
149 .collect::<Vec<_>>();
150 self.db.update_network_peer(peer_status).await?;
151 } else {
152 debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
153
154 self.db
155 .add_network_peer(
156 peer,
157 origin,
158 addrs,
159 self.cfg.backoff_exponent,
160 self.cfg.quality_avg_window_size,
161 )
162 .await?;
163 }
164
165 #[cfg(all(feature = "prometheus", not(test)))]
166 {
167 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
168 self.refresh_metrics(&stats)
169 }
170
171 Ok(())
172 }
173
174 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
176 pub async fn get(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
177 if peer == &self.me {
178 Ok(Some({
179 let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
180 ps.multiaddresses.clone_from(&self.me_addresses);
181 ps
182 }))
183 } else {
184 Ok(self.db.get_network_peer(peer).await?)
185 }
186 }
187
188 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
190 pub async fn remove(&self, peer: &PeerId) -> Result<()> {
191 if peer == &self.me {
192 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
193 }
194
195 self.db.remove_network_peer(peer).await?;
196
197 #[cfg(all(feature = "prometheus", not(test)))]
198 {
199 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
200 self.refresh_metrics(&stats);
201 tracing::trace!(
202 health = %health_from_stats(&stats, self.am_i_public),
203 trigger = "peer removal",
204 "Network health updated"
205 );
206 }
207
208 Ok(())
209 }
210
211 #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
222 pub async fn update(&self, peer: &PeerId, ping_result: std::result::Result<Duration, UpdateFailure>) -> Result<()> {
223 if peer == &self.me {
224 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
225 }
226
227 if let Some(mut entry) = self.db.get_network_peer(peer).await? {
228 entry.heartbeats_sent += 1;
229
230 match ping_result {
231 Ok(latency) => {
232 if !entry.is_ignored() {
233 entry.ignored_until = None;
234 }
235 entry.last_seen = current_time();
236 entry.last_seen_latency = latency;
237 entry.heartbeats_succeeded += 1;
238 entry.backoff = self.cfg.backoff_min;
240 entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
241 }
242 Err(error) => match error {
243 UpdateFailure::Timeout => {
244 tracing::trace!("Update failed with timeout");
245 entry.backoff = self.cfg.backoff_max.min(entry.backoff.powf(self.cfg.backoff_exponent));
248 entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
249
250 let q = entry.get_quality();
251
252 if q < self.cfg.quality_bad_threshold {
253 entry.ignored_until = Some(current_time() + self.cfg.ignore_timeframe);
254 }
255 }
256 UpdateFailure::DialFailure => {
257 tracing::trace!("Update failed with dial failure");
258 entry.update_quality(0.0_f64);
259 entry.ignored_until = Some(
260 current_time()
261 + crate::config::DEFAULT_CANNOT_DIAL_PENALTY
262 + std::time::Duration::from_secs(hopr_crypto_random::random_integer(0, Some(600))),
263 );
264 }
265 },
266 }
267
268 tracing::trace!(%peer, quality = entry.quality, quality_avg = hopr_primitive_types::sma::SMA::average(&entry.quality_avg), "Updating peer status in the store");
269 self.db.update_network_peer(entry).await?;
270
271 #[cfg(all(feature = "prometheus", not(test)))]
272 {
273 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
274 self.refresh_metrics(&stats);
275 tracing::trace!(
276 health = %health_from_stats(&stats, self.am_i_public),
277 trigger = "peer update",
278 "Network health updated"
279 );
280 }
281
282 Ok(())
283 } else {
284 debug!(%peer, "Ignoring update request for unknown peer");
285 Ok(())
286 }
287 }
288
289 pub async fn health(&self) -> Health {
291 self.db
292 .network_peer_stats(self.cfg.quality_bad_threshold)
293 .await
294 .map(|stats| health_from_stats(&stats, self.am_i_public))
295 .unwrap_or(Health::Unknown)
296 }
297
298 #[cfg(all(feature = "prometheus", not(test)))]
300 fn refresh_metrics(&self, stats: &Stats) {
301 let health = health_from_stats(stats, self.am_i_public);
302
303 if METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
304 if let Some(ts) = current_time().checked_sub(self.started_at) {
305 METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
306 }
307 }
308 METRIC_PEER_COUNT.set(stats.all_count() as f64);
309 METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
310 METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
311 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
312 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
313 METRIC_NETWORK_HEALTH.set((health as i32).into());
314 }
315
316 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
317 let minimum_quality = self.cfg.quality_offline_threshold;
318 self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
319 .await
320 }
321
322 pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> Result<Vec<V>>
324 where
325 F: FnMut(PeerStatus) -> Fut,
326 Fut: std::future::Future<Output = Option<V>>,
327 {
328 Ok(self
329 .db
330 .get_network_peers(Default::default(), false)
331 .await?
332 .filter_map(filter)
333 .collect()
334 .await)
335 }
336
337 #[tracing::instrument(level = "debug", skip(self, threshold), ret(level = "trace"), err, fields(since = ?threshold))]
348 pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> Result<Vec<PeerId>> {
349 Ok(self
350 .db
351 .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), true)
352 .await?
353 .filter_map(|v| async move {
354 if v.id.1 == self.me {
355 return None;
356 }
357
358 if let Some(ignore_start) = v.ignored_until {
359 let should_be_ignored = ignore_start
360 .checked_add(self.cfg.ignore_timeframe)
361 .is_some_and(|v| v > threshold);
362
363 if should_be_ignored {
364 return None;
365 }
366 }
367
368 let backoff = v.backoff.powf(self.cfg.backoff_exponent);
369 let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
370
371 if (v.last_seen + delay) < threshold {
372 Some(v.id.1)
373 } else {
374 None
375 }
376 })
377 .collect()
378 .await)
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use std::{ops::Add, time::Duration};
385
386 use anyhow::Context;
387 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
388 use hopr_platform::time::native::current_time;
389 use hopr_primitive_types::prelude::AsUnixTimestamp;
390 use libp2p_identity::PeerId;
391 use more_asserts::*;
392
393 use super::*;
394 use crate::network::{Health, Network, NetworkConfig, PeerOrigin};
395
396 impl<T> Network<T>
397 where
398 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
399 {
400 async fn is_ignored(&self, peer: &PeerId) -> bool {
402 peer != &self.me && self.get(peer).await.is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored()))
403 }
404 }
405
406 #[test]
407 fn test_network_health_should_serialize_to_a_proper_string() {
408 assert_eq!(format!("{}", Health::Orange), "Orange".to_owned())
409 }
410
411 #[test]
412 fn test_network_health_should_deserialize_from_proper_string() -> anyhow::Result<()> {
413 let parsed: Health = "Orange".parse()?;
414 assert_eq!(parsed, Health::Orange);
415
416 Ok(())
417 }
418
419 async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<hopr_db_sql::db::HoprDb>> {
420 let cfg = NetworkConfig {
421 quality_offline_threshold: 0.6,
422 ..Default::default()
423 };
424 Ok(Network::new(
425 *my_id,
426 vec![],
427 cfg,
428 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
429 ))
430 }
431
432 #[test]
433 fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
434 assert_eq!(Health::Unknown as i32, 0);
435 assert_eq!(Health::Red as i32, 1);
436 assert_eq!(Health::Orange as i32, 2);
437 assert_eq!(Health::Yellow as i32, 3);
438 assert_eq!(Health::Green as i32, 4);
439 }
440
441 #[tokio::test]
442 async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
443 let me = PeerId::random();
444
445 let peers = basic_network(&me).await?;
446
447 assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());
448
449 assert_eq!(
450 0,
451 peers
452 .peer_filter(|peer| async move { Some(peer.id) })
453 .await
454 .unwrap_or(vec![])
455 .len()
456 );
457 assert!(peers.has(&me).await);
458
459 Ok(())
460 }
461
462 #[tokio::test]
463 async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
464 let expected: PeerId = OffchainKeypair::random().public().into();
465 let me: PeerId = OffchainKeypair::random().public().into();
466
467 let peers = basic_network(&me).await?;
468
469 peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;
470
471 assert_eq!(
472 1,
473 peers
474 .peer_filter(|peer| async move { Some(peer.id) })
475 .await
476 .unwrap_or(vec![])
477 .len()
478 );
479 assert!(peers.has(&expected).await);
480
481 Ok(())
482 }
483
484 #[tokio::test]
485 async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
486 let peer: PeerId = OffchainKeypair::random().public().into();
487 let me: PeerId = OffchainKeypair::random().public().into();
488
489 let peers = basic_network(&me).await?;
490
491 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
492
493 peers.remove(&peer).await?;
494
495 assert_eq!(
496 0,
497 peers
498 .peer_filter(|peer| async move { Some(peer.id) })
499 .await
500 .unwrap_or(vec![])
501 .len()
502 );
503 assert!(!peers.has(&peer).await);
504
505 Ok(())
506 }
507
508 #[tokio::test]
509 async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
510 let peer: PeerId = OffchainKeypair::random().public().into();
511 let me: PeerId = OffchainKeypair::random().public().into();
512
513 let peers = basic_network(&me).await?;
514
515 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
516
517 assert_eq!(
518 0,
519 peers
520 .peer_filter(|peer| async move { Some(peer.id) })
521 .await
522 .unwrap_or(vec![])
523 .len()
524 );
525 assert!(!peers.has(&peer).await);
526
527 Ok(())
528 }
529
530 #[tokio::test]
531 async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
532 let peer: PeerId = OffchainKeypair::random().public().into();
533 let me: PeerId = OffchainKeypair::random().public().into();
534
535 let peers = basic_network(&me).await?;
536
537 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
538
539 let latency = 123u64;
540
541 peers
542 .update(&peer, Ok(std::time::Duration::from_millis(latency)))
543 .await?;
544
545 let actual = peers.get(&peer).await?.expect("peer record should be present");
546
547 assert_eq!(actual.heartbeats_sent, 1);
548 assert_eq!(actual.heartbeats_succeeded, 1);
549 assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
550
551 Ok(())
552 }
553
554 #[tokio::test]
555 async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
556 let peer: PeerId = OffchainKeypair::random().public().into();
557 let me: PeerId = OffchainKeypair::random().public().into();
558
559 let peers = basic_network(&me).await?;
560
561 let ts = Duration::from_millis(100);
562
563 {
564 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
565 peers.update(&peer, Ok(ts)).await?;
566
567 let status = peers.get(&peer).await?.context("peer should be present")?;
568
569 assert_eq!(status.last_seen_latency, ts);
570 }
571
572 let ts = Duration::from_millis(200);
573
574 {
575 peers.update(&peer, Ok(ts)).await?;
576
577 let status = peers.get(&peer).await?.context("peer should be present")?;
578
579 assert_eq!(status.last_seen_latency, ts);
580 }
581
582 Ok(())
583 }
584
585 #[tokio::test]
586 async fn network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time()
587 -> anyhow::Result<()> {
588 let peer: PeerId = OffchainKeypair::random().public().into();
589 let me: PeerId = OffchainKeypair::random().public().into();
590
591 let peers = basic_network(&me).await?;
592
593 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
594
595 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
596 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
597 peers.update(&peer, Err(UpdateFailure::Timeout)).await?; peers
600 .update(&peer, Err(UpdateFailure::Timeout))
601 .await
602 .expect("no error should occur"); assert!(peers.is_ignored(&peer).await);
605
606 peers.add(&peer, PeerOrigin::ManualPing, vec![]).await?;
608
609 assert!(peers.is_ignored(&peer).await);
610
611 Ok(())
612 }
613
614 #[tokio::test]
615 async fn network_should_stop_ignoring_a_peer_that_has_reached_lower_thresholds_but_connected_back()
616 -> anyhow::Result<()> {
617 let peer: PeerId = OffchainKeypair::random().public().into();
618 let me: PeerId = OffchainKeypair::random().public().into();
619
620 let peers = basic_network(&me).await?;
621
622 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
623
624 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
625 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
626 peers.update(&peer, Err(UpdateFailure::Timeout)).await?; peers
629 .update(&peer, Err(UpdateFailure::Timeout))
630 .await
631 .expect("no error should occur"); assert!(peers.is_ignored(&peer).await);
634
635 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
637
638 assert!(!peers.is_ignored(&peer).await);
639
640 Ok(())
641 }
642
643 #[tokio::test]
644 async fn network_should_ignore_a_peer_that_could_not_be_dialed() -> anyhow::Result<()> {
645 let peer: PeerId = OffchainKeypair::random().public().into();
646 let me: PeerId = OffchainKeypair::random().public().into();
647
648 let peers = basic_network(&me).await?;
649
650 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
651
652 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
653 peers.update(&peer, Err(UpdateFailure::DialFailure)).await?; assert!(peers.is_ignored(&peer).await);
656
657 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
658
659 assert!(!peers.is_ignored(&peer).await);
660
661 Ok(())
662 }
663
664 #[tokio::test]
665 async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
666 let peer: PeerId = OffchainKeypair::random().public().into();
667 let me: PeerId = OffchainKeypair::random().public().into();
668
669 let peers = basic_network(&me).await?;
670
671 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
672
673 peers
676 .update(&peer, Ok(std::time::Duration::from_millis(123_u64)))
677 .await?;
678 peers
679 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
680 .await?;
681 peers
682 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
683 .await?;
684
685 peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
686
687 let actual = peers.get(&peer).await?.expect("the peer record should be present");
688
689 assert_eq!(actual.heartbeats_succeeded, 3);
690 assert_lt!(actual.backoff, 3f64);
691 assert_gt!(actual.backoff, 2f64);
692
693 Ok(())
694 }
695
696 #[tokio::test]
697 async fn test_network_should_not_overflow_max_backoff() -> anyhow::Result<()> {
698 let peer: PeerId = OffchainKeypair::random().public().into();
699 let me: PeerId = OffchainKeypair::random().public().into();
700
701 let peers = basic_network(&me).await?;
702
703 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
704
705 for latency in [123_u64, 200_u64, 200_u64] {
706 peers
707 .update(&peer, Ok(std::time::Duration::from_millis(latency)))
708 .await?;
709 }
710
711 loop {
713 let updated_peer = peers.get(&peer).await?.expect("the peer record should be present");
714 if updated_peer.backoff == peers.cfg.backoff_max {
715 break;
716 }
717
718 peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
719 }
720
721 peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
723 let actual = peers.get(&peer).await?.expect("the peer record should be present");
724
725 assert_eq!(actual.backoff, peers.cfg.backoff_max);
726
727 Ok(())
728 }
729
730 #[tokio::test]
731 async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference()
732 -> anyhow::Result<()> {
733 let first: PeerId = OffchainKeypair::random().public().into();
734 let second: PeerId = OffchainKeypair::random().public().into();
735 let me: PeerId = OffchainKeypair::random().public().into();
736
737 let peers = basic_network(&me).await?;
738
739 peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
740 peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;
741
742 let latency = 77_u64;
743
744 let mut expected = vec![first, second];
745 expected.sort();
746
747 peers
748 .update(&first, Ok(std::time::Duration::from_millis(latency)))
749 .await?;
750 peers
751 .update(&second, Ok(std::time::Duration::from_millis(latency)))
752 .await?;
753
754 let mut actual = peers
764 .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
765 .await?;
766 actual.sort();
767
768 assert_eq!(actual, expected);
769
770 Ok(())
771 }
772
773 #[tokio::test]
774 async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
775 let me: PeerId = OffchainKeypair::random().public().into();
776
777 let peers = basic_network(&me).await?;
778
779 assert_eq!(peers.health().await, Health::Red);
780
781 Ok(())
782 }
783
784 #[tokio::test]
785 async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
786 let peer: PeerId = OffchainKeypair::random().public().into();
787 let me: PeerId = OffchainKeypair::random().public().into();
788
789 let peers = basic_network(&me).await?;
790
791 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
792
793 assert_eq!(peers.health().await, Health::Orange);
795
796 Ok(())
797 }
798
799 #[tokio::test]
800 async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
801 let peer: PeerId = OffchainKeypair::random().public().into();
802 let me: PeerId = OffchainKeypair::random().public().into();
803
804 let peers = basic_network(&me).await?;
805
806 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
807 let _ = peers.health().await;
808 peers.remove(&peer).await?;
809
810 assert_eq!(peers.health().await, Health::Red);
811
812 Ok(())
813 }
814
815 #[tokio::test]
816 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
817 let peer: PeerId = OffchainKeypair::random().public().into();
818 let me: PeerId = OffchainKeypair::random().public().into();
819
820 let cfg = NetworkConfig {
821 quality_offline_threshold: 0.6,
822 ..Default::default()
823 };
824
825 let peers = Network::new(
826 me,
827 vec![],
828 cfg,
829 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
830 );
831
832 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
833
834 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
835
836 assert_eq!(peers.health().await, Health::Orange);
837
838 Ok(())
839 }
840
841 #[tokio::test]
842 async fn network_should_allow_the_quality_to_go_to_0() -> anyhow::Result<()> {
843 let peer: PeerId = OffchainKeypair::random().public().into();
844 let public = peer;
845 let me: PeerId = OffchainKeypair::random().public().into();
846
847 let cfg = NetworkConfig {
848 quality_offline_threshold: 0.6,
849 ..Default::default()
850 };
851
852 let peers = Network::new(
853 me,
854 vec![],
855 cfg,
856 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
857 );
858
859 peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
860
861 assert!(peers.update(&peer, Err(UpdateFailure::Timeout)).await.is_ok());
862
863 assert!(peers.is_ignored(&public).await);
864
865 Ok(())
866 }
867
868 #[tokio::test]
869 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public()
870 -> anyhow::Result<()> {
871 let me: PeerId = OffchainKeypair::random().public().into();
872 let peer: PeerId = OffchainKeypair::random().public().into();
873
874 let cfg = NetworkConfig {
875 quality_offline_threshold: 0.3,
876 ..Default::default()
877 };
878
879 let peers = Network::new(
880 me,
881 vec![],
882 cfg,
883 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
884 );
885
886 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
887
888 for _ in 0..3 {
889 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
890 }
891
892 assert_eq!(peers.health().await, Health::Green);
893
894 Ok(())
895 }
896
897 #[tokio::test]
898 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public()
899 -> anyhow::Result<()> {
900 let peer: PeerId = OffchainKeypair::random().public().into();
901 let peer2: PeerId = OffchainKeypair::random().public().into();
902
903 let cfg = NetworkConfig {
904 quality_offline_threshold: 0.3,
905 ..Default::default()
906 };
907
908 let peers = Network::new(
909 OffchainKeypair::random().public().into(),
910 vec![],
911 cfg,
912 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
913 );
914
915 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
916 peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;
917
918 for _ in 0..3 {
919 peers.update(&peer2, Ok(current_time().as_unix_timestamp())).await?;
920 peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
921 }
922
923 assert_eq!(peers.health().await, Health::Green);
924
925 Ok(())
926 }
927}