1use std::{
2 collections::hash_set::HashSet,
3 time::{Duration, SystemTime},
4};
5
6use futures::StreamExt;
7pub use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
8use hopr_platform::time::native::current_time;
9use libp2p_identity::PeerId;
10use multiaddr::Multiaddr;
11use tracing::debug;
12#[cfg(all(feature = "prometheus", not(test)))]
13use {
14 hopr_metrics::metrics::{MultiGauge, SimpleGauge},
15 hopr_primitive_types::prelude::*,
16};
17
18use crate::{config::NetworkConfig, errors::Result};
19
20#[cfg(all(feature = "prometheus", not(test)))]
21lazy_static::lazy_static! {
22 static ref METRIC_NETWORK_HEALTH: SimpleGauge =
23 SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
24 static ref METRIC_PEERS_BY_QUALITY: MultiGauge =
25 MultiGauge::new("hopr_peers_by_quality", "Number different peer types by quality",
26 &["type", "quality"],
27 ).unwrap();
28 static ref METRIC_PEER_COUNT: SimpleGauge =
29 SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
30 static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: SimpleGauge = SimpleGauge::new(
31 "hopr_time_to_green_sec",
32 "Time it takes for a node to transition to the GREEN network state"
33 ).unwrap();
34}
35
36#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
39pub enum Health {
40 Unknown = 0,
42 Red = 1,
44 Orange = 2,
46 Yellow = 3,
48 Green = 4,
50}
51
52fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
54 let mut health = Health::Red;
55
56 if stats.bad_quality_public > 0 {
57 health = Health::Orange;
58 }
59
60 if stats.good_quality_public > 0 {
61 health = if is_public || stats.good_quality_non_public > 0 {
62 Health::Green
63 } else {
64 Health::Yellow
65 };
66 }
67
68 health
69}
70
71#[derive(Debug)]
74pub struct Network<T>
75where
76 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
77{
78 me: PeerId,
79 me_addresses: Vec<Multiaddr>,
80 am_i_public: bool,
81 cfg: NetworkConfig,
82 db: T,
83 #[cfg(all(feature = "prometheus", not(test)))]
84 started_at: Duration,
85}
86
87impl<T> Network<T>
88where
89 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
90{
91 pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
92 #[cfg(all(feature = "prometheus", not(test)))]
93 {
94 METRIC_NETWORK_HEALTH.set(0.0);
95 METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
96 METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
97 METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
98 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
99 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
100 }
101
102 Self {
103 me: my_peer_id,
104 me_addresses: my_multiaddresses,
105 am_i_public: true,
106 cfg: cfg.clone(),
107 db,
108 #[cfg(all(feature = "prometheus", not(test)))]
109 started_at: current_time().as_unix_timestamp(),
110 }
111 }
112
113 pub async fn has(&self, peer: &PeerId) -> bool {
115 peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
116 }
117
118 pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> Result<()> {
122 if peer == &self.me {
123 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
124 }
125
126 if let Some(mut peer_status) = self.db.get_network_peer(peer).await? {
127 if !peer_status.is_ignored(current_time(), self.cfg.ignore_timeframe) {
128 peer_status.ignored = None;
129 peer_status.multiaddresses.append(&mut addrs);
130 peer_status.multiaddresses = peer_status
131 .multiaddresses
132 .into_iter()
133 .collect::<HashSet<_>>()
134 .into_iter()
135 .collect::<Vec<_>>();
136 self.db.update_network_peer(peer_status).await?;
137 }
138 } else {
139 debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
140
141 self.db
142 .add_network_peer(
143 peer,
144 origin,
145 addrs,
146 self.cfg.backoff_exponent,
147 self.cfg.quality_avg_window_size,
148 )
149 .await?;
150 }
151
152 #[cfg(all(feature = "prometheus", not(test)))]
153 {
154 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
155 self.refresh_metrics(&stats)
156 }
157
158 Ok(())
159 }
160
161 pub async fn get(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
163 if peer == &self.me {
164 Ok(Some({
165 let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
166 ps.multiaddresses.clone_from(&self.me_addresses);
167 ps
168 }))
169 } else {
170 Ok(self.db.get_network_peer(peer).await?)
171 }
172 }
173
174 pub async fn remove(&self, peer: &PeerId) -> Result<()> {
176 if peer == &self.me {
177 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
178 }
179
180 self.db.remove_network_peer(peer).await?;
181
182 #[cfg(all(feature = "prometheus", not(test)))]
183 {
184 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
185 self.refresh_metrics(&stats)
186 }
187
188 Ok(())
189 }
190
191 pub async fn update(
202 &self,
203 peer: &PeerId,
204 ping_result: std::result::Result<Duration, ()>,
205 version: Option<String>,
206 ) -> Result<()> {
207 if peer == &self.me {
208 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
209 }
210
211 if let Some(mut entry) = self.db.get_network_peer(peer).await? {
212 if !entry.is_ignored(current_time(), self.cfg.ignore_timeframe) {
213 entry.ignored = None;
214 }
215
216 entry.heartbeats_sent += 1;
217 entry.peer_version = version;
218
219 if let Ok(latency) = ping_result {
220 entry.last_seen = current_time();
221 entry.last_seen_latency = latency;
222 entry.heartbeats_succeeded += 1;
223 entry.backoff = self.cfg.backoff_min;
225 entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
226 } else {
227 entry.backoff = self.cfg.backoff_max.min(entry.backoff.powf(self.cfg.backoff_exponent));
230 entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
231
232 let q = entry.get_quality();
233
234 if q < self.cfg.quality_bad_threshold {
235 entry.ignored = Some(current_time());
236 }
237 }
238
239 self.db.update_network_peer(entry).await?;
240
241 #[cfg(all(feature = "prometheus", not(test)))]
242 {
243 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
244 self.refresh_metrics(&stats)
245 }
246
247 Ok(())
248 } else {
249 debug!(%peer, "Ignoring update request for unknown peer");
250 Ok(())
251 }
252 }
253
254 pub async fn health(&self) -> Health {
256 self.db
257 .network_peer_stats(self.cfg.quality_bad_threshold)
258 .await
259 .map(|stats| health_from_stats(&stats, self.am_i_public))
260 .unwrap_or(Health::Unknown)
261 }
262
263 #[cfg(all(feature = "prometheus", not(test)))]
265 fn refresh_metrics(&self, stats: &Stats) {
266 let health = health_from_stats(stats, self.am_i_public);
267
268 if METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
269 if let Some(ts) = current_time().checked_sub(self.started_at) {
270 METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
271 }
272 }
273 METRIC_PEER_COUNT.set(stats.all_count() as f64);
274 METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
275 METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
276 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
277 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
278 METRIC_NETWORK_HEALTH.set((health as i32).into());
279 }
280
281 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
282 let minimum_quality = self.cfg.quality_offline_threshold;
283 self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
284 .await
285 }
286
287 pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> Result<Vec<V>>
289 where
290 F: FnMut(PeerStatus) -> Fut,
291 Fut: std::future::Future<Output = Option<V>>,
292 {
293 let stream = self.db.get_network_peers(Default::default(), false).await?;
294 futures::pin_mut!(stream);
295 Ok(stream.filter_map(filter).collect().await)
296 }
297
298 pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> Result<Vec<PeerId>> {
309 let stream = self
310 .db
311 .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), false)
312 .await?;
313 futures::pin_mut!(stream);
314 let mut data: Vec<PeerStatus> = stream
315 .filter_map(|v| async move {
316 if v.id.1 == self.me {
317 return None;
318 }
319
320 if let Some(ignore_start) = v.ignored {
321 let should_be_ignored = ignore_start
322 .checked_add(self.cfg.ignore_timeframe)
323 .is_some_and(|v| v > threshold);
324
325 if should_be_ignored {
326 return None;
327 }
328 }
329
330 let backoff = v.backoff.powf(self.cfg.backoff_exponent);
331 let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
332
333 if (v.last_seen + delay) < threshold {
334 Some(v)
335 } else {
336 None
337 }
338 })
339 .collect()
340 .await;
341
342 data.sort_by(|a, b| {
343 if a.last_seen < b.last_seen {
344 std::cmp::Ordering::Less
345 } else {
346 std::cmp::Ordering::Greater
347 }
348 });
349
350 Ok(data.into_iter().map(|peer| peer.id.1).collect())
351 }
352}
353
354impl<T> Network<T>
355where
356 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
357{
358 #[cfg(test)]
359 async fn is_ignored(&self, peer: &PeerId) -> bool {
361 peer != &self.me
362 && self
363 .get(peer)
364 .await
365 .is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored(current_time(), self.cfg.ignore_timeframe)))
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use std::{ops::Add, time::Duration};
372
373 use anyhow::Context;
374 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
375 use hopr_platform::time::native::current_time;
376 use hopr_primitive_types::prelude::AsUnixTimestamp;
377 use libp2p_identity::PeerId;
378 use more_asserts::*;
379
380 use super::*;
381 use crate::network::{Health, Network, NetworkConfig, PeerOrigin};
382
383 #[test]
384 fn test_network_health_should_serialize_to_a_proper_string() {
385 assert_eq!(format!("{}", Health::Orange), "Orange".to_owned())
386 }
387
388 #[test]
389 fn test_network_health_should_deserialize_from_proper_string() -> anyhow::Result<()> {
390 let parsed: Health = "Orange".parse()?;
391 assert_eq!(parsed, Health::Orange);
392
393 Ok(())
394 }
395
396 async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<hopr_db_sql::db::HoprDb>> {
397 let cfg = NetworkConfig {
398 quality_offline_threshold: 0.6,
399 ..Default::default()
400 };
401 Ok(Network::new(
402 *my_id,
403 vec![],
404 cfg,
405 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
406 ))
407 }
408
409 #[test]
410 fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
411 assert_eq!(Health::Unknown as i32, 0);
412 assert_eq!(Health::Red as i32, 1);
413 assert_eq!(Health::Orange as i32, 2);
414 assert_eq!(Health::Yellow as i32, 3);
415 assert_eq!(Health::Green as i32, 4);
416 }
417
418 #[tokio::test]
419 async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
420 let me = PeerId::random();
421
422 let peers = basic_network(&me).await?;
423
424 assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());
425
426 assert_eq!(
427 0,
428 peers
429 .peer_filter(|peer| async move { Some(peer.id) })
430 .await
431 .unwrap_or(vec![])
432 .len()
433 );
434 assert!(peers.has(&me).await);
435
436 Ok(())
437 }
438
439 #[tokio::test]
440 async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
441 let expected: PeerId = OffchainKeypair::random().public().into();
442 let me: PeerId = OffchainKeypair::random().public().into();
443
444 let peers = basic_network(&me).await?;
445
446 peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;
447
448 assert_eq!(
449 1,
450 peers
451 .peer_filter(|peer| async move { Some(peer.id) })
452 .await
453 .unwrap_or(vec![])
454 .len()
455 );
456 assert!(peers.has(&expected).await);
457
458 Ok(())
459 }
460
461 #[tokio::test]
462 async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
463 let peer: PeerId = OffchainKeypair::random().public().into();
464 let me: PeerId = OffchainKeypair::random().public().into();
465
466 let peers = basic_network(&me).await?;
467
468 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
469
470 peers.remove(&peer).await?;
471
472 assert_eq!(
473 0,
474 peers
475 .peer_filter(|peer| async move { Some(peer.id) })
476 .await
477 .unwrap_or(vec![])
478 .len()
479 );
480 assert!(!peers.has(&peer).await);
481
482 Ok(())
483 }
484
485 #[tokio::test]
486 async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
487 let peer: PeerId = OffchainKeypair::random().public().into();
488 let me: PeerId = OffchainKeypair::random().public().into();
489
490 let peers = basic_network(&me).await?;
491
492 peers
493 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
494 .await?;
495
496 assert_eq!(
497 0,
498 peers
499 .peer_filter(|peer| async move { Some(peer.id) })
500 .await
501 .unwrap_or(vec![])
502 .len()
503 );
504 assert!(!peers.has(&peer).await);
505
506 Ok(())
507 }
508
509 #[tokio::test]
510 async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
511 let peer: PeerId = OffchainKeypair::random().public().into();
512 let me: PeerId = OffchainKeypair::random().public().into();
513
514 let peers = basic_network(&me).await?;
515
516 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
517
518 let latency = 123u64;
519
520 peers
521 .update(&peer, Ok(std::time::Duration::from_millis(latency)), None)
522 .await?;
523
524 let actual = peers.get(&peer).await?.expect("peer record should be present");
525
526 assert_eq!(actual.heartbeats_sent, 1);
527 assert_eq!(actual.heartbeats_succeeded, 1);
528 assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
529
530 Ok(())
531 }
532
533 #[tokio::test]
534 async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
535 let peer: PeerId = OffchainKeypair::random().public().into();
536 let me: PeerId = OffchainKeypair::random().public().into();
537
538 let peers = basic_network(&me).await?;
539
540 let expected_version = Some("1.2.4".to_string());
541
542 {
543 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
544 peers
545 .update(&peer, Ok(current_time().as_unix_timestamp()), expected_version.clone())
546 .await?;
547
548 let status = peers.get(&peer).await?.context("peer should be present")?;
549
550 assert_eq!(status.peer_version, expected_version);
551 }
552
553 let ts = current_time().as_unix_timestamp();
554
555 {
556 let expected_version = Some("2.0.0".to_string());
557
558 peers.update(&peer, Ok(ts), expected_version.clone()).await?;
559
560 let status = peers.get(&peer).await?.context("peer should be present")?;
561
562 assert_eq!(status.peer_version, expected_version);
563 }
564
565 Ok(())
566 }
567
568 #[tokio::test]
569 async fn test_network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time()
570 -> anyhow::Result<()> {
571 let peer: PeerId = OffchainKeypair::random().public().into();
572 let me: PeerId = OffchainKeypair::random().public().into();
573
574 let peers = basic_network(&me).await?;
575
576 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
577
578 peers
579 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
580 .await?;
581 peers
582 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
583 .await?;
584 peers.update(&peer, Err(()), None).await?; peers.update(&peer, Err(()), None).await.expect("no error should occur"); assert!(peers.is_ignored(&peer).await);
589
590 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
592
593 assert!(peers.is_ignored(&peer).await);
594
595 Ok(())
596 }
597
598 #[tokio::test]
599 async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
600 let peer: PeerId = OffchainKeypair::random().public().into();
601 let me: PeerId = OffchainKeypair::random().public().into();
602
603 let peers = basic_network(&me).await?;
604
605 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
606
607 peers
610 .update(&peer, Ok(std::time::Duration::from_millis(123_u64)), None)
611 .await?;
612 peers
613 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
614 .await?;
615 peers
616 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
617 .await?;
618
619 peers.update(&peer, Err(()), None).await?;
620
621 let actual = peers.get(&peer).await?.expect("the peer record should be present");
622
623 assert_eq!(actual.heartbeats_succeeded, 3);
624 assert_lt!(actual.backoff, 3f64);
625 assert_gt!(actual.backoff, 2f64);
626
627 Ok(())
628 }
629
630 #[tokio::test]
631 async fn test_network_should_not_overflow_max_backoff() -> anyhow::Result<()> {
632 let peer: PeerId = OffchainKeypair::random().public().into();
633 let me: PeerId = OffchainKeypair::random().public().into();
634
635 let peers = basic_network(&me).await?;
636
637 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
638
639 for latency in [123_u64, 200_u64, 200_u64] {
640 peers
641 .update(&peer, Ok(std::time::Duration::from_millis(latency)), None)
642 .await?;
643 }
644
645 loop {
647 let updated_peer = peers.get(&peer).await?.expect("the peer record should be present");
648 if updated_peer.backoff == peers.cfg.backoff_max {
649 break;
650 }
651
652 peers.update(&peer, Err(()), None).await?;
653 }
654
655 peers.update(&peer, Err(()), None).await?;
657 let actual = peers.get(&peer).await?.expect("the peer record should be present");
658
659 assert_eq!(actual.backoff, peers.cfg.backoff_max);
660
661 Ok(())
662 }
663
664 #[tokio::test]
665 async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference()
666 -> anyhow::Result<()> {
667 let first: PeerId = OffchainKeypair::random().public().into();
668 let second: PeerId = OffchainKeypair::random().public().into();
669 let me: PeerId = OffchainKeypair::random().public().into();
670
671 let peers = basic_network(&me).await?;
672
673 peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
674 peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;
675
676 let latency = 77_u64;
677
678 let mut expected = vec![first, second];
679 expected.sort();
680
681 peers
682 .update(&first, Ok(std::time::Duration::from_millis(latency)), None)
683 .await?;
684 peers
685 .update(&second, Ok(std::time::Duration::from_millis(latency)), None)
686 .await?;
687
688 let mut actual = peers
698 .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
699 .await?;
700 actual.sort();
701
702 assert_eq!(actual, expected);
703
704 Ok(())
705 }
706
707 #[tokio::test]
708 async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
709 let me: PeerId = OffchainKeypair::random().public().into();
710
711 let peers = basic_network(&me).await?;
712
713 assert_eq!(peers.health().await, Health::Red);
714
715 Ok(())
716 }
717
718 #[tokio::test]
719 async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
720 let peer: PeerId = OffchainKeypair::random().public().into();
721 let me: PeerId = OffchainKeypair::random().public().into();
722
723 let peers = basic_network(&me).await?;
724
725 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
726
727 assert_eq!(peers.health().await, Health::Orange);
729
730 Ok(())
731 }
732
733 #[tokio::test]
734 async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
735 let peer: PeerId = OffchainKeypair::random().public().into();
736 let me: PeerId = OffchainKeypair::random().public().into();
737
738 let peers = basic_network(&me).await?;
739
740 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
741 let _ = peers.health().await;
742 peers.remove(&peer).await?;
743
744 assert_eq!(peers.health().await, Health::Red);
745
746 Ok(())
747 }
748
749 #[tokio::test]
750 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
751 let peer: PeerId = OffchainKeypair::random().public().into();
752 let me: PeerId = OffchainKeypair::random().public().into();
753
754 let cfg = NetworkConfig {
755 quality_offline_threshold: 0.6,
756 ..Default::default()
757 };
758
759 let peers = Network::new(
760 me,
761 vec![],
762 cfg,
763 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
764 );
765
766 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
767
768 peers
769 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
770 .await?;
771
772 assert_eq!(peers.health().await, Health::Orange);
773
774 Ok(())
775 }
776
777 #[tokio::test]
778 async fn test_network_should_allow_the_quality_to_go_to_0() -> anyhow::Result<()> {
779 let peer: PeerId = OffchainKeypair::random().public().into();
780 let public = peer;
781 let me: PeerId = OffchainKeypair::random().public().into();
782
783 let cfg = NetworkConfig {
784 quality_offline_threshold: 0.6,
785 ..Default::default()
786 };
787
788 let peers = Network::new(
789 me,
790 vec![],
791 cfg,
792 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
793 );
794
795 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
796
797 assert!(peers.update(&peer, Err(()), None).await.is_ok());
798
799 assert!(peers.is_ignored(&public).await);
800
801 Ok(())
802 }
803
804 #[tokio::test]
805 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public()
806 -> anyhow::Result<()> {
807 let me: PeerId = OffchainKeypair::random().public().into();
808 let peer: PeerId = OffchainKeypair::random().public().into();
809
810 let cfg = NetworkConfig {
811 quality_offline_threshold: 0.3,
812 ..Default::default()
813 };
814
815 let peers = Network::new(
816 me,
817 vec![],
818 cfg,
819 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
820 );
821
822 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
823
824 for _ in 0..3 {
825 peers
826 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
827 .await?;
828 }
829
830 assert_eq!(peers.health().await, Health::Green);
831
832 Ok(())
833 }
834
835 #[tokio::test]
836 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public()
837 -> anyhow::Result<()> {
838 let peer: PeerId = OffchainKeypair::random().public().into();
839 let peer2: PeerId = OffchainKeypair::random().public().into();
840
841 let cfg = NetworkConfig {
842 quality_offline_threshold: 0.3,
843 ..Default::default()
844 };
845
846 let peers = Network::new(
847 OffchainKeypair::random().public().into(),
848 vec![],
849 cfg,
850 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
851 );
852
853 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
854 peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;
855
856 for _ in 0..3 {
857 peers
858 .update(&peer2, Ok(current_time().as_unix_timestamp()), None)
859 .await?;
860 peers
861 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
862 .await?;
863 }
864
865 assert_eq!(peers.health().await, Health::Green);
866
867 Ok(())
868 }
869}