1use std::collections::hash_set::HashSet;
2use std::time::{Duration, SystemTime};
3
4use futures::StreamExt;
5use libp2p_identity::PeerId;
6
7use multiaddr::Multiaddr;
8use tracing::debug;
9
10pub use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
11use hopr_platform::time::native::current_time;
12
13use crate::config::NetworkConfig;
14
15#[cfg(all(feature = "prometheus", not(test)))]
16use {
17 hopr_metrics::metrics::{MultiGauge, SimpleGauge},
18 hopr_primitive_types::prelude::*,
19};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23 static ref METRIC_NETWORK_HEALTH: SimpleGauge =
24 SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
25 static ref METRIC_PEERS_BY_QUALITY: MultiGauge =
26 MultiGauge::new("hopr_peers_by_quality", "Number different peer types by quality",
27 &["type", "quality"],
28 ).unwrap();
29 static ref METRIC_PEER_COUNT: SimpleGauge =
30 SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
31 static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: SimpleGauge = SimpleGauge::new(
32 "hopr_time_to_green_sec",
33 "Time it takes for a node to transition to the GREEN network state"
34 ).unwrap();
35}
36
37#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
40pub enum Health {
41 Unknown = 0,
43 Red = 1,
45 Orange = 2,
47 Yellow = 3,
49 Green = 4,
51}
52
53#[derive(Debug, Clone, PartialEq, strum::Display)]
57pub enum NetworkTriggeredEvent {
58 CloseConnection(PeerId),
59 UpdateQuality(PeerId, f64),
60}
61
62fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
64 let mut health = Health::Red;
65
66 if stats.bad_quality_public > 0 {
67 health = Health::Orange;
68 }
69
70 if stats.good_quality_public > 0 {
71 health = if is_public || stats.good_quality_non_public > 0 {
72 Health::Green
73 } else {
74 Health::Yellow
75 };
76 }
77
78 health
79}
80
81#[derive(Debug)]
84pub struct Network<T>
85where
86 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
87{
88 me: PeerId,
89 me_addresses: Vec<Multiaddr>,
90 am_i_public: bool,
91 cfg: NetworkConfig,
92 db: T,
93 #[cfg(all(feature = "prometheus", not(test)))]
94 started_at: Duration,
95}
96
97impl<T> Network<T>
98where
99 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
100{
101 pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
102 #[cfg(all(feature = "prometheus", not(test)))]
103 {
104 METRIC_NETWORK_HEALTH.set(0.0);
105 METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
106 METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
107 METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
108 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
109 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
110 }
111
112 Self {
113 me: my_peer_id,
114 me_addresses: my_multiaddresses,
115 am_i_public: true,
116 cfg: cfg.clone(),
117 db,
118 #[cfg(all(feature = "prometheus", not(test)))]
119 started_at: current_time().as_unix_timestamp(),
120 }
121 }
122
123 pub async fn has(&self, peer: &PeerId) -> bool {
125 peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
126 }
127
128 pub async fn is_ignored(&self, peer: &PeerId) -> bool {
130 peer != &self.me
131 && self
132 .get(peer)
133 .await
134 .is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored(current_time(), self.cfg.ignore_timeframe)))
135 }
136
137 pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> crate::errors::Result<()> {
141 if peer == &self.me {
142 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
143 }
144
145 if let Some(mut peer_status) = self.db.get_network_peer(peer).await? {
146 if !peer_status.is_ignored(current_time(), self.cfg.ignore_timeframe) {
147 peer_status.ignored = None;
148 peer_status.multiaddresses.append(&mut addrs);
149 peer_status.multiaddresses = peer_status
150 .multiaddresses
151 .into_iter()
152 .collect::<HashSet<_>>()
153 .into_iter()
154 .collect::<Vec<_>>();
155 self.db.update_network_peer(peer_status).await?;
156 }
157 } else {
158 debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
159
160 self.db
161 .add_network_peer(
162 peer,
163 origin,
164 addrs,
165 self.cfg.backoff_exponent,
166 self.cfg.quality_avg_window_size,
167 )
168 .await?;
169 }
170
171 #[cfg(all(feature = "prometheus", not(test)))]
172 {
173 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
174 self.refresh_metrics(&stats)
175 }
176
177 Ok(())
178 }
179
180 pub async fn get(&self, peer: &PeerId) -> crate::errors::Result<Option<PeerStatus>> {
181 if peer == &self.me {
182 Ok(Some({
183 let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
184 ps.multiaddresses.clone_from(&self.me_addresses);
185 ps
186 }))
187 } else {
188 Ok(self.db.get_network_peer(peer).await?)
189 }
190 }
191
192 pub async fn remove(&self, peer: &PeerId) -> crate::errors::Result<()> {
194 if peer == &self.me {
195 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
196 }
197
198 self.db.remove_network_peer(peer).await?;
199
200 #[cfg(all(feature = "prometheus", not(test)))]
201 {
202 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
203 self.refresh_metrics(&stats)
204 }
205
206 Ok(())
207 }
208
209 pub async fn update(
211 &self,
212 peer: &PeerId,
213 ping_result: std::result::Result<Duration, ()>,
214 version: Option<String>,
215 ) -> crate::errors::Result<Option<NetworkTriggeredEvent>> {
216 if peer == &self.me {
217 return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
218 }
219
220 if let Some(mut entry) = self.db.get_network_peer(peer).await? {
221 if !entry.is_ignored(current_time(), self.cfg.ignore_timeframe) {
222 entry.ignored = None;
223 }
224
225 entry.heartbeats_sent += 1;
226 entry.peer_version = version;
227
228 if let Ok(latency) = ping_result {
229 entry.last_seen = current_time();
230 entry.last_seen_latency = latency;
231 entry.heartbeats_succeeded += 1;
232 entry.backoff = self.cfg.backoff_min;
233 entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
234 } else {
235 entry.backoff = self.cfg.backoff_max.max(entry.backoff.powf(self.cfg.backoff_exponent));
236 entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
237
238 let q = entry.get_quality();
239
240 if q < self.cfg.quality_bad_threshold {
241 entry.ignored = Some(current_time());
242 }
243 }
244
245 let (peer_id, quality) = (entry.id.1, entry.get_quality());
246 self.db.update_network_peer(entry).await?;
247
248 #[cfg(all(feature = "prometheus", not(test)))]
249 {
250 let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
251 self.refresh_metrics(&stats)
252 }
253
254 if quality <= self.cfg.quality_offline_threshold {
255 Ok(Some(NetworkTriggeredEvent::CloseConnection(peer_id)))
256 } else {
257 Ok(Some(NetworkTriggeredEvent::UpdateQuality(peer_id, quality)))
258 }
259 } else {
260 debug!(%peer, "Ignoring update request for unknown peer");
261 Ok(None)
262 }
263 }
264
265 pub async fn health(&self) -> Health {
267 self.db
268 .network_peer_stats(self.cfg.quality_bad_threshold)
269 .await
270 .map(|stats| health_from_stats(&stats, self.am_i_public))
271 .unwrap_or(Health::Unknown)
272 }
273
274 #[cfg(all(feature = "prometheus", not(test)))]
276 fn refresh_metrics(&self, stats: &Stats) {
277 let health = health_from_stats(stats, self.am_i_public);
278
279 if METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
280 if let Some(ts) = current_time().checked_sub(self.started_at) {
281 METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
282 }
283 }
284 METRIC_PEER_COUNT.set(stats.all_count() as f64);
285 METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
286 METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
287 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
288 METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
289 METRIC_NETWORK_HEALTH.set((health as i32).into());
290 }
291
292 pub async fn connected_peers(&self) -> crate::errors::Result<Vec<PeerId>> {
293 let minimum_quality = self.cfg.quality_offline_threshold;
294 self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
295 .await
296 }
297
298 pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> crate::errors::Result<Vec<V>>
300 where
301 F: FnMut(PeerStatus) -> Fut,
302 Fut: std::future::Future<Output = Option<V>>,
303 {
304 let stream = self.db.get_network_peers(Default::default(), false).await?;
305 futures::pin_mut!(stream);
306 Ok(stream.filter_map(filter).collect().await)
307 }
308
309 pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> crate::errors::Result<Vec<PeerId>> {
310 let stream = self
311 .db
312 .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), false)
313 .await?;
314 futures::pin_mut!(stream);
315 let mut data: Vec<PeerStatus> = stream
316 .filter_map(|v| async move {
317 if v.id.1 == self.me {
318 return None;
319 }
320
321 if let Some(ignore_start) = v.ignored {
322 let should_be_ignored = ignore_start
323 .checked_add(self.cfg.ignore_timeframe)
324 .is_some_and(|v| v > threshold);
325
326 if should_be_ignored {
327 return None;
328 }
329 }
330
331 let backoff = v.backoff.powf(self.cfg.backoff_exponent);
332 let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
333
334 if (v.last_seen + delay) < threshold {
335 Some(v)
336 } else {
337 None
338 }
339 })
340 .collect()
341 .await;
342
343 data.sort_by(|a, b| {
344 if a.last_seen < b.last_seen {
345 std::cmp::Ordering::Less
346 } else {
347 std::cmp::Ordering::Greater
348 }
349 });
350
351 Ok(data.into_iter().map(|peer| peer.id.1).collect())
352 }
353}
354
355#[cfg(test)]
356mod tests {
357 use crate::network::{Health, Network, NetworkConfig, NetworkTriggeredEvent, PeerOrigin};
358 use anyhow::Context;
359 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
360 use hopr_platform::time::native::current_time;
361 use hopr_primitive_types::prelude::AsUnixTimestamp;
362 use libp2p_identity::PeerId;
363 use std::ops::Add;
364 use std::time::Duration;
365
366 #[test]
367 fn test_network_health_should_serialize_to_a_proper_string() {
368 assert_eq!(format!("{}", Health::Orange), "Orange".to_owned())
369 }
370
371 #[test]
372 fn test_network_health_should_deserialize_from_proper_string() -> Result<(), Box<dyn std::error::Error>> {
373 let parsed: Health = "Orange".parse()?;
374 Ok(assert_eq!(parsed, Health::Orange))
375 }
376
377 async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<hopr_db_sql::db::HoprDb>> {
378 let mut cfg = NetworkConfig::default();
379 cfg.quality_offline_threshold = 0.6;
380 Ok(Network::new(
381 *my_id,
382 vec![],
383 cfg,
384 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
385 ))
386 }
387
388 #[test]
389 fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
390 assert_eq!(Health::Unknown as i32, 0);
391 assert_eq!(Health::Red as i32, 1);
392 assert_eq!(Health::Orange as i32, 2);
393 assert_eq!(Health::Yellow as i32, 3);
394 assert_eq!(Health::Green as i32, 4);
395 }
396
397 #[async_std::test]
398 async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
399 let me = PeerId::random();
400
401 let peers = basic_network(&me).await?;
402
403 assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());
404
405 assert_eq!(
406 0,
407 peers
408 .peer_filter(|peer| async move { Some(peer.id) })
409 .await
410 .unwrap_or(vec![])
411 .len()
412 );
413 assert!(peers.has(&me).await);
414
415 Ok(())
416 }
417
418 #[async_std::test]
419 async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
420 let expected: PeerId = OffchainKeypair::random().public().into();
421 let me: PeerId = OffchainKeypair::random().public().into();
422
423 let peers = basic_network(&me).await?;
424
425 peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;
426
427 assert_eq!(
428 1,
429 peers
430 .peer_filter(|peer| async move { Some(peer.id) })
431 .await
432 .unwrap_or(vec![])
433 .len()
434 );
435 assert!(peers.has(&expected).await);
436
437 Ok(())
438 }
439
440 #[async_std::test]
441 async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
442 let peer: PeerId = OffchainKeypair::random().public().into();
443 let me: PeerId = OffchainKeypair::random().public().into();
444
445 let peers = basic_network(&me).await?;
446
447 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
448
449 peers.remove(&peer).await?;
450
451 assert_eq!(
452 0,
453 peers
454 .peer_filter(|peer| async move { Some(peer.id) })
455 .await
456 .unwrap_or(vec![])
457 .len()
458 );
459 assert!(!peers.has(&peer).await);
460
461 Ok(())
462 }
463
464 #[async_std::test]
465 async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
466 let peer: PeerId = OffchainKeypair::random().public().into();
467 let me: PeerId = OffchainKeypair::random().public().into();
468
469 let peers = basic_network(&me).await?;
470
471 peers
472 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
473 .await?;
474
475 assert_eq!(
476 0,
477 peers
478 .peer_filter(|peer| async move { Some(peer.id) })
479 .await
480 .unwrap_or(vec![])
481 .len()
482 );
483 assert!(!peers.has(&peer).await);
484
485 Ok(())
486 }
487
488 #[async_std::test]
489 async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
490 let peer: PeerId = OffchainKeypair::random().public().into();
491 let me: PeerId = OffchainKeypair::random().public().into();
492
493 let peers = basic_network(&me).await?;
494
495 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
496
497 let latency = 123u64;
498
499 peers
500 .update(&peer, Ok(std::time::Duration::from_millis(latency)), None)
501 .await?;
502
503 let actual = peers.get(&peer).await?.expect("peer record should be present");
504
505 assert_eq!(actual.heartbeats_sent, 1);
506 assert_eq!(actual.heartbeats_succeeded, 1);
507 assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
508
509 Ok(())
510 }
511
512 #[async_std::test]
513 async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
514 let peer: PeerId = OffchainKeypair::random().public().into();
515 let me: PeerId = OffchainKeypair::random().public().into();
516
517 let peers = basic_network(&me).await?;
518
519 let expected_version = Some("1.2.4".to_string());
520
521 {
522 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
523 peers
524 .update(&peer, Ok(current_time().as_unix_timestamp()), expected_version.clone())
525 .await?;
526
527 let status = peers.get(&peer).await?.context("peer should be present")?;
528
529 assert_eq!(status.peer_version, expected_version);
530 }
531
532 let ts = current_time().as_unix_timestamp();
533
534 {
535 let expected_version = Some("2.0.0".to_string());
536
537 peers.update(&peer, Ok(ts), expected_version.clone()).await?;
538
539 let status = peers.get(&peer).await?.context("peer should be present")?;
540
541 assert_eq!(status.peer_version, expected_version);
542 }
543
544 Ok(())
545 }
546
547 #[async_std::test]
548 async fn test_network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time(
549 ) -> anyhow::Result<()> {
550 let peer: PeerId = OffchainKeypair::random().public().into();
551 let me: PeerId = OffchainKeypair::random().public().into();
552
553 let peers = basic_network(&me).await?;
554
555 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
556
557 peers
558 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
559 .await?;
560 peers
561 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
562 .await?;
563 peers.update(&peer, Err(()), None).await?; peers.update(&peer, Err(()), None).await.expect("no error should occur"); assert!(peers.is_ignored(&peer).await);
568
569 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
571
572 assert!(peers.is_ignored(&peer).await);
573
574 Ok(())
575 }
576
577 #[async_std::test]
578 async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
579 let peer: PeerId = OffchainKeypair::random().public().into();
580 let me: PeerId = OffchainKeypair::random().public().into();
581
582 let peers = basic_network(&me).await?;
583
584 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
585
586 peers
589 .update(&peer, Ok(std::time::Duration::from_millis(123_u64)), None)
590 .await?;
591 peers
592 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
593 .await?;
594 peers
595 .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
596 .await?;
597
598 peers.update(&peer, Err(()), None).await?;
599
600 let actual = peers.get(&peer).await?.expect("the peer record should be present");
601
602 assert_eq!(actual.heartbeats_succeeded, 3);
603 assert_eq!(actual.backoff, 300f64);
604
605 Ok(())
606 }
607
608 #[async_std::test]
609 async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference(
610 ) -> anyhow::Result<()> {
611 let first: PeerId = OffchainKeypair::random().public().into();
612 let second: PeerId = OffchainKeypair::random().public().into();
613 let me: PeerId = OffchainKeypair::random().public().into();
614
615 let peers = basic_network(&me).await?;
616
617 peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
618 peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;
619
620 let latency = 77_u64;
621
622 let mut expected = vec![first, second];
623 expected.sort();
624
625 peers
626 .update(&first, Ok(std::time::Duration::from_millis(latency)), None)
627 .await?;
628 peers
629 .update(&second, Ok(std::time::Duration::from_millis(latency)), None)
630 .await?;
631
632 let mut actual = peers
642 .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
643 .await?;
644 actual.sort();
645
646 assert_eq!(actual, expected);
647
648 Ok(())
649 }
650
651 #[async_std::test]
652 async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
653 let me: PeerId = OffchainKeypair::random().public().into();
654
655 let peers = basic_network(&me).await?;
656
657 assert_eq!(peers.health().await, Health::Red);
658
659 Ok(())
660 }
661
662 #[async_std::test]
663 async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
664 let peer: PeerId = OffchainKeypair::random().public().into();
665 let me: PeerId = OffchainKeypair::random().public().into();
666
667 let peers = basic_network(&me).await?;
668
669 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
670
671 assert_eq!(peers.health().await, Health::Orange);
673
674 Ok(())
675 }
676
677 #[async_std::test]
678 async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
679 let peer: PeerId = OffchainKeypair::random().public().into();
680 let me: PeerId = OffchainKeypair::random().public().into();
681
682 let peers = basic_network(&me).await?;
683
684 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
685 let _ = peers.health();
686 peers.remove(&peer).await?;
687
688 assert_eq!(peers.health().await, Health::Red);
689
690 Ok(())
691 }
692
693 #[async_std::test]
694 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
695 let peer: PeerId = OffchainKeypair::random().public().into();
696 let me: PeerId = OffchainKeypair::random().public().into();
697
698 let mut cfg = NetworkConfig::default();
699 cfg.quality_offline_threshold = 0.6;
700
701 let peers = Network::new(
702 me,
703 vec![],
704 cfg,
705 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
706 );
707
708 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
709
710 peers
711 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
712 .await?;
713
714 assert_eq!(peers.health().await, Health::Orange);
715
716 Ok(())
717 }
718
719 #[async_std::test]
720 async fn test_network_should_close_connection_to_peer_once_it_reaches_the_lowest_possible_quality(
721 ) -> anyhow::Result<()> {
722 let peer: PeerId = OffchainKeypair::random().public().into();
723 let public = peer;
724 let me: PeerId = OffchainKeypair::random().public().into();
725
726 let mut cfg = NetworkConfig::default();
727 cfg.quality_offline_threshold = 0.6;
728
729 let peers = Network::new(
730 me,
731 vec![],
732 cfg,
733 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
734 );
735
736 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
737
738 assert_eq!(
739 peers.update(&peer, Err(()), None).await?,
740 Some(NetworkTriggeredEvent::CloseConnection(peer))
741 );
742
743 assert!(peers.is_ignored(&public).await);
744
745 Ok(())
746 }
747
748 #[async_std::test]
749 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public(
750 ) -> anyhow::Result<()> {
751 let me: PeerId = OffchainKeypair::random().public().into();
752 let peer: PeerId = OffchainKeypair::random().public().into();
753
754 let mut cfg = NetworkConfig::default();
755 cfg.quality_offline_threshold = 0.3;
756
757 let peers = Network::new(
758 me,
759 vec![],
760 cfg,
761 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
762 );
763
764 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
765
766 for _ in 0..3 {
767 peers
768 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
769 .await?;
770 }
771
772 assert_eq!(peers.health().await, Health::Green);
773
774 Ok(())
775 }
776
777 #[async_std::test]
778 async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public(
779 ) -> anyhow::Result<()> {
780 let peer: PeerId = OffchainKeypair::random().public().into();
781 let peer2: PeerId = OffchainKeypair::random().public().into();
782
783 let mut cfg = NetworkConfig::default();
784 cfg.quality_offline_threshold = 0.3;
785
786 let peers = Network::new(
787 OffchainKeypair::random().public().into(),
788 vec![],
789 cfg,
790 hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
791 );
792
793 peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
794 peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;
795
796 for _ in 0..3 {
797 peers
798 .update(&peer2, Ok(current_time().as_unix_timestamp()), None)
799 .await?;
800 peers
801 .update(&peer, Ok(current_time().as_unix_timestamp()), None)
802 .await?;
803 }
804
805 assert_eq!(peers.health().await, Health::Green);
806
807 Ok(())
808 }
809}