hopr_transport_network/
network.rs

1use std::collections::hash_set::HashSet;
2use std::time::{Duration, SystemTime};
3
4use futures::StreamExt;
5use libp2p_identity::PeerId;
6
7use multiaddr::Multiaddr;
8use tracing::debug;
9
10pub use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
11use hopr_platform::time::native::current_time;
12
13use crate::config::NetworkConfig;
14
15#[cfg(all(feature = "prometheus", not(test)))]
16use {
17    hopr_metrics::metrics::{MultiGauge, SimpleGauge},
18    hopr_primitive_types::prelude::*,
19};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23    static ref METRIC_NETWORK_HEALTH: SimpleGauge =
24        SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
25    static ref METRIC_PEERS_BY_QUALITY: MultiGauge =
26        MultiGauge::new("hopr_peers_by_quality", "Number different peer types by quality",
27            &["type", "quality"],
28        ).unwrap();
29    static ref METRIC_PEER_COUNT: SimpleGauge =
30        SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
31    static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: SimpleGauge = SimpleGauge::new(
32        "hopr_time_to_green_sec",
33        "Time it takes for a node to transition to the GREEN network state"
34    ).unwrap();
35}
36
37/// Network health represented with colors, where green is the best and red
38/// is the worst possible observed nework quality.
39#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
40pub enum Health {
41    /// Unknown health, on application startup
42    Unknown = 0,
43    /// No connection, default
44    Red = 1,
45    /// Low quality connection to at least 1 public relay
46    Orange = 2,
47    /// High quality connection to at least 1 public relay
48    Yellow = 3,
49    /// High quality connection to at least 1 public relay and 1 NAT node
50    Green = 4,
51}
52
53/// Events generated by the [Network] object allowing it
54/// to physically interact with external systems,
55/// including the transport mechanism.
56#[derive(Debug, Clone, PartialEq, strum::Display)]
57pub enum NetworkTriggeredEvent {
58    CloseConnection(PeerId),
59    UpdateQuality(PeerId, f64),
60}
61
62/// Calculate the health factor for network from the available stats
63fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
64    let mut health = Health::Red;
65
66    if stats.bad_quality_public > 0 {
67        health = Health::Orange;
68    }
69
70    if stats.good_quality_public > 0 {
71        health = if is_public || stats.good_quality_non_public > 0 {
72            Health::Green
73        } else {
74            Health::Yellow
75        };
76    }
77
78    health
79}
80
81/// The network object storing information about the running observed state of the network,
82/// including peers, connection qualities and updates for other parts of the system.
83#[derive(Debug)]
84pub struct Network<T>
85where
86    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
87{
88    me: PeerId,
89    me_addresses: Vec<Multiaddr>,
90    am_i_public: bool,
91    cfg: NetworkConfig,
92    db: T,
93    #[cfg(all(feature = "prometheus", not(test)))]
94    started_at: Duration,
95}
96
97impl<T> Network<T>
98where
99    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
100{
101    pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
102        #[cfg(all(feature = "prometheus", not(test)))]
103        {
104            METRIC_NETWORK_HEALTH.set(0.0);
105            METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
106            METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
107            METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
108            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
109            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
110        }
111
112        Self {
113            me: my_peer_id,
114            me_addresses: my_multiaddresses,
115            am_i_public: true,
116            cfg: cfg.clone(),
117            db,
118            #[cfg(all(feature = "prometheus", not(test)))]
119            started_at: current_time().as_unix_timestamp(),
120        }
121    }
122
123    /// Check whether the PeerId is present in the network
124    pub async fn has(&self, peer: &PeerId) -> bool {
125        peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
126    }
127
128    /// Checks if the peer is present in the network, but it is being currently ignored.
129    pub async fn is_ignored(&self, peer: &PeerId) -> bool {
130        peer != &self.me
131            && self
132                .get(peer)
133                .await
134                .is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored(current_time(), self.cfg.ignore_timeframe)))
135    }
136
137    /// Add a new peer into the network
138    ///
139    /// Each peer must have an origin specification.
140    pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> crate::errors::Result<()> {
141        if peer == &self.me {
142            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
143        }
144
145        if let Some(mut peer_status) = self.db.get_network_peer(peer).await? {
146            if !peer_status.is_ignored(current_time(), self.cfg.ignore_timeframe) {
147                peer_status.ignored = None;
148                peer_status.multiaddresses.append(&mut addrs);
149                peer_status.multiaddresses = peer_status
150                    .multiaddresses
151                    .into_iter()
152                    .collect::<HashSet<_>>()
153                    .into_iter()
154                    .collect::<Vec<_>>();
155                self.db.update_network_peer(peer_status).await?;
156            }
157        } else {
158            debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
159
160            self.db
161                .add_network_peer(
162                    peer,
163                    origin,
164                    addrs,
165                    self.cfg.backoff_exponent,
166                    self.cfg.quality_avg_window_size,
167                )
168                .await?;
169        }
170
171        #[cfg(all(feature = "prometheus", not(test)))]
172        {
173            let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
174            self.refresh_metrics(&stats)
175        }
176
177        Ok(())
178    }
179
180    pub async fn get(&self, peer: &PeerId) -> crate::errors::Result<Option<PeerStatus>> {
181        if peer == &self.me {
182            Ok(Some({
183                let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
184                ps.multiaddresses.clone_from(&self.me_addresses);
185                ps
186            }))
187        } else {
188            Ok(self.db.get_network_peer(peer).await?)
189        }
190    }
191
192    /// Remove peer from the network
193    pub async fn remove(&self, peer: &PeerId) -> crate::errors::Result<()> {
194        if peer == &self.me {
195            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
196        }
197
198        self.db.remove_network_peer(peer).await?;
199
200        #[cfg(all(feature = "prometheus", not(test)))]
201        {
202            let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
203            self.refresh_metrics(&stats)
204        }
205
206        Ok(())
207    }
208
209    /// Update the peer record with the observation
210    pub async fn update(
211        &self,
212        peer: &PeerId,
213        ping_result: std::result::Result<Duration, ()>,
214        version: Option<String>,
215    ) -> crate::errors::Result<Option<NetworkTriggeredEvent>> {
216        if peer == &self.me {
217            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
218        }
219
220        if let Some(mut entry) = self.db.get_network_peer(peer).await? {
221            if !entry.is_ignored(current_time(), self.cfg.ignore_timeframe) {
222                entry.ignored = None;
223            }
224
225            entry.heartbeats_sent += 1;
226            entry.peer_version = version;
227
228            if let Ok(latency) = ping_result {
229                entry.last_seen = current_time();
230                entry.last_seen_latency = latency;
231                entry.heartbeats_succeeded += 1;
232                entry.backoff = self.cfg.backoff_min;
233                entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
234            } else {
235                entry.backoff = self.cfg.backoff_max.max(entry.backoff.powf(self.cfg.backoff_exponent));
236                entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
237
238                let q = entry.get_quality();
239
240                if q < self.cfg.quality_bad_threshold {
241                    entry.ignored = Some(current_time());
242                }
243            }
244
245            let (peer_id, quality) = (entry.id.1, entry.get_quality());
246            self.db.update_network_peer(entry).await?;
247
248            #[cfg(all(feature = "prometheus", not(test)))]
249            {
250                let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
251                self.refresh_metrics(&stats)
252            }
253
254            if quality <= self.cfg.quality_offline_threshold {
255                Ok(Some(NetworkTriggeredEvent::CloseConnection(peer_id)))
256            } else {
257                Ok(Some(NetworkTriggeredEvent::UpdateQuality(peer_id, quality)))
258            }
259        } else {
260            debug!(%peer, "Ignoring update request for unknown peer");
261            Ok(None)
262        }
263    }
264
265    /// Returns the quality of the network as a network health indicator.
266    pub async fn health(&self) -> Health {
267        self.db
268            .network_peer_stats(self.cfg.quality_bad_threshold)
269            .await
270            .map(|stats| health_from_stats(&stats, self.am_i_public))
271            .unwrap_or(Health::Unknown)
272    }
273
274    /// Update the internally perceived network status that is processed to the network health
275    #[cfg(all(feature = "prometheus", not(test)))]
276    fn refresh_metrics(&self, stats: &Stats) {
277        let health = health_from_stats(stats, self.am_i_public);
278
279        if METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
280            if let Some(ts) = current_time().checked_sub(self.started_at) {
281                METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
282            }
283        }
284        METRIC_PEER_COUNT.set(stats.all_count() as f64);
285        METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
286        METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
287        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
288        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
289        METRIC_NETWORK_HEALTH.set((health as i32).into());
290    }
291
292    pub async fn connected_peers(&self) -> crate::errors::Result<Vec<PeerId>> {
293        let minimum_quality = self.cfg.quality_offline_threshold;
294        self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
295            .await
296    }
297
298    // ======
299    pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> crate::errors::Result<Vec<V>>
300    where
301        F: FnMut(PeerStatus) -> Fut,
302        Fut: std::future::Future<Output = Option<V>>,
303    {
304        let stream = self.db.get_network_peers(Default::default(), false).await?;
305        futures::pin_mut!(stream);
306        Ok(stream.filter_map(filter).collect().await)
307    }
308
309    pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> crate::errors::Result<Vec<PeerId>> {
310        let stream = self
311            .db
312            .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), false)
313            .await?;
314        futures::pin_mut!(stream);
315        let mut data: Vec<PeerStatus> = stream
316            .filter_map(|v| async move {
317                if v.id.1 == self.me {
318                    return None;
319                }
320
321                if let Some(ignore_start) = v.ignored {
322                    let should_be_ignored = ignore_start
323                        .checked_add(self.cfg.ignore_timeframe)
324                        .is_some_and(|v| v > threshold);
325
326                    if should_be_ignored {
327                        return None;
328                    }
329                }
330
331                let backoff = v.backoff.powf(self.cfg.backoff_exponent);
332                let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
333
334                if (v.last_seen + delay) < threshold {
335                    Some(v)
336                } else {
337                    None
338                }
339            })
340            .collect()
341            .await;
342
343        data.sort_by(|a, b| {
344            if a.last_seen < b.last_seen {
345                std::cmp::Ordering::Less
346            } else {
347                std::cmp::Ordering::Greater
348            }
349        });
350
351        Ok(data.into_iter().map(|peer| peer.id.1).collect())
352    }
353}
354
355#[cfg(test)]
356mod tests {
357    use crate::network::{Health, Network, NetworkConfig, NetworkTriggeredEvent, PeerOrigin};
358    use anyhow::Context;
359    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
360    use hopr_platform::time::native::current_time;
361    use hopr_primitive_types::prelude::AsUnixTimestamp;
362    use libp2p_identity::PeerId;
363    use std::ops::Add;
364    use std::time::Duration;
365
366    #[test]
367    fn test_network_health_should_serialize_to_a_proper_string() {
368        assert_eq!(format!("{}", Health::Orange), "Orange".to_owned())
369    }
370
371    #[test]
372    fn test_network_health_should_deserialize_from_proper_string() -> Result<(), Box<dyn std::error::Error>> {
373        let parsed: Health = "Orange".parse()?;
374        Ok(assert_eq!(parsed, Health::Orange))
375    }
376
377    async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<hopr_db_sql::db::HoprDb>> {
378        let mut cfg = NetworkConfig::default();
379        cfg.quality_offline_threshold = 0.6;
380        Ok(Network::new(
381            *my_id,
382            vec![],
383            cfg,
384            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
385        ))
386    }
387
388    #[test]
389    fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
390        assert_eq!(Health::Unknown as i32, 0);
391        assert_eq!(Health::Red as i32, 1);
392        assert_eq!(Health::Orange as i32, 2);
393        assert_eq!(Health::Yellow as i32, 3);
394        assert_eq!(Health::Green as i32, 4);
395    }
396
397    #[async_std::test]
398    async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
399        let me = PeerId::random();
400
401        let peers = basic_network(&me).await?;
402
403        assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());
404
405        assert_eq!(
406            0,
407            peers
408                .peer_filter(|peer| async move { Some(peer.id) })
409                .await
410                .unwrap_or(vec![])
411                .len()
412        );
413        assert!(peers.has(&me).await);
414
415        Ok(())
416    }
417
418    #[async_std::test]
419    async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
420        let expected: PeerId = OffchainKeypair::random().public().into();
421        let me: PeerId = OffchainKeypair::random().public().into();
422
423        let peers = basic_network(&me).await?;
424
425        peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;
426
427        assert_eq!(
428            1,
429            peers
430                .peer_filter(|peer| async move { Some(peer.id) })
431                .await
432                .unwrap_or(vec![])
433                .len()
434        );
435        assert!(peers.has(&expected).await);
436
437        Ok(())
438    }
439
440    #[async_std::test]
441    async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
442        let peer: PeerId = OffchainKeypair::random().public().into();
443        let me: PeerId = OffchainKeypair::random().public().into();
444
445        let peers = basic_network(&me).await?;
446
447        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
448
449        peers.remove(&peer).await?;
450
451        assert_eq!(
452            0,
453            peers
454                .peer_filter(|peer| async move { Some(peer.id) })
455                .await
456                .unwrap_or(vec![])
457                .len()
458        );
459        assert!(!peers.has(&peer).await);
460
461        Ok(())
462    }
463
464    #[async_std::test]
465    async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
466        let peer: PeerId = OffchainKeypair::random().public().into();
467        let me: PeerId = OffchainKeypair::random().public().into();
468
469        let peers = basic_network(&me).await?;
470
471        peers
472            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
473            .await?;
474
475        assert_eq!(
476            0,
477            peers
478                .peer_filter(|peer| async move { Some(peer.id) })
479                .await
480                .unwrap_or(vec![])
481                .len()
482        );
483        assert!(!peers.has(&peer).await);
484
485        Ok(())
486    }
487
488    #[async_std::test]
489    async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
490        let peer: PeerId = OffchainKeypair::random().public().into();
491        let me: PeerId = OffchainKeypair::random().public().into();
492
493        let peers = basic_network(&me).await?;
494
495        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
496
497        let latency = 123u64;
498
499        peers
500            .update(&peer, Ok(std::time::Duration::from_millis(latency)), None)
501            .await?;
502
503        let actual = peers.get(&peer).await?.expect("peer record should be present");
504
505        assert_eq!(actual.heartbeats_sent, 1);
506        assert_eq!(actual.heartbeats_succeeded, 1);
507        assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
508
509        Ok(())
510    }
511
512    #[async_std::test]
513    async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
514        let peer: PeerId = OffchainKeypair::random().public().into();
515        let me: PeerId = OffchainKeypair::random().public().into();
516
517        let peers = basic_network(&me).await?;
518
519        let expected_version = Some("1.2.4".to_string());
520
521        {
522            peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
523            peers
524                .update(&peer, Ok(current_time().as_unix_timestamp()), expected_version.clone())
525                .await?;
526
527            let status = peers.get(&peer).await?.context("peer should be present")?;
528
529            assert_eq!(status.peer_version, expected_version);
530        }
531
532        let ts = current_time().as_unix_timestamp();
533
534        {
535            let expected_version = Some("2.0.0".to_string());
536
537            peers.update(&peer, Ok(ts), expected_version.clone()).await?;
538
539            let status = peers.get(&peer).await?.context("peer should be present")?;
540
541            assert_eq!(status.peer_version, expected_version);
542        }
543
544        Ok(())
545    }
546
547    #[async_std::test]
548    async fn test_network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time(
549    ) -> anyhow::Result<()> {
550        let peer: PeerId = OffchainKeypair::random().public().into();
551        let me: PeerId = OffchainKeypair::random().public().into();
552
553        let peers = basic_network(&me).await?;
554
555        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
556
557        peers
558            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
559            .await?;
560        peers
561            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
562            .await?;
563        peers.update(&peer, Err(()), None).await?; // should drop to ignored
564
565        peers.update(&peer, Err(()), None).await.expect("no error should occur"); // should drop from network
566
567        assert!(peers.is_ignored(&peer).await);
568
569        // peer should remain ignored and not be added
570        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
571
572        assert!(peers.is_ignored(&peer).await);
573
574        Ok(())
575    }
576
577    #[async_std::test]
578    async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
579        let peer: PeerId = OffchainKeypair::random().public().into();
580        let me: PeerId = OffchainKeypair::random().public().into();
581
582        let peers = basic_network(&me).await?;
583
584        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
585
586        // Needs to do 3 pings, so we get over the ignore threshold limit
587        // when doing the 4th failed ping
588        peers
589            .update(&peer, Ok(std::time::Duration::from_millis(123_u64)), None)
590            .await?;
591        peers
592            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
593            .await?;
594        peers
595            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
596            .await?;
597
598        peers.update(&peer, Err(()), None).await?;
599
600        let actual = peers.get(&peer).await?.expect("the peer record should be present");
601
602        assert_eq!(actual.heartbeats_succeeded, 3);
603        assert_eq!(actual.backoff, 300f64);
604
605        Ok(())
606    }
607
608    #[async_std::test]
609    async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference(
610    ) -> anyhow::Result<()> {
611        let first: PeerId = OffchainKeypair::random().public().into();
612        let second: PeerId = OffchainKeypair::random().public().into();
613        let me: PeerId = OffchainKeypair::random().public().into();
614
615        let peers = basic_network(&me).await?;
616
617        peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
618        peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;
619
620        let latency = 77_u64;
621
622        let mut expected = vec![first, second];
623        expected.sort();
624
625        peers
626            .update(&first, Ok(std::time::Duration::from_millis(latency)), None)
627            .await?;
628        peers
629            .update(&second, Ok(std::time::Duration::from_millis(latency)), None)
630            .await?;
631
632        // assert_eq!(
633        //     format!(
634        //         "{:?}",
635        //         peers.should_still_be_ignored(&peers.get(&first).await.unwrap().unwrap())
636        //     ),
637        //     ""
638        // );
639        // assert_eq!(format!("{:?}", peers.get(&first).await), "");
640
641        let mut actual = peers
642            .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
643            .await?;
644        actual.sort();
645
646        assert_eq!(actual, expected);
647
648        Ok(())
649    }
650
651    #[async_std::test]
652    async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
653        let me: PeerId = OffchainKeypair::random().public().into();
654
655        let peers = basic_network(&me).await?;
656
657        assert_eq!(peers.health().await, Health::Red);
658
659        Ok(())
660    }
661
662    #[async_std::test]
663    async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
664        let peer: PeerId = OffchainKeypair::random().public().into();
665        let me: PeerId = OffchainKeypair::random().public().into();
666
667        let peers = basic_network(&me).await?;
668
669        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
670
671        // all peers are public
672        assert_eq!(peers.health().await, Health::Orange);
673
674        Ok(())
675    }
676
677    #[async_std::test]
678    async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
679        let peer: PeerId = OffchainKeypair::random().public().into();
680        let me: PeerId = OffchainKeypair::random().public().into();
681
682        let peers = basic_network(&me).await?;
683
684        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
685        let _ = peers.health();
686        peers.remove(&peer).await?;
687
688        assert_eq!(peers.health().await, Health::Red);
689
690        Ok(())
691    }
692
693    #[async_std::test]
694    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
695        let peer: PeerId = OffchainKeypair::random().public().into();
696        let me: PeerId = OffchainKeypair::random().public().into();
697
698        let mut cfg = NetworkConfig::default();
699        cfg.quality_offline_threshold = 0.6;
700
701        let peers = Network::new(
702            me,
703            vec![],
704            cfg,
705            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
706        );
707
708        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
709
710        peers
711            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
712            .await?;
713
714        assert_eq!(peers.health().await, Health::Orange);
715
716        Ok(())
717    }
718
719    #[async_std::test]
720    async fn test_network_should_close_connection_to_peer_once_it_reaches_the_lowest_possible_quality(
721    ) -> anyhow::Result<()> {
722        let peer: PeerId = OffchainKeypair::random().public().into();
723        let public = peer;
724        let me: PeerId = OffchainKeypair::random().public().into();
725
726        let mut cfg = NetworkConfig::default();
727        cfg.quality_offline_threshold = 0.6;
728
729        let peers = Network::new(
730            me,
731            vec![],
732            cfg,
733            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
734        );
735
736        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
737
738        assert_eq!(
739            peers.update(&peer, Err(()), None).await?,
740            Some(NetworkTriggeredEvent::CloseConnection(peer))
741        );
742
743        assert!(peers.is_ignored(&public).await);
744
745        Ok(())
746    }
747
748    #[async_std::test]
749    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public(
750    ) -> anyhow::Result<()> {
751        let me: PeerId = OffchainKeypair::random().public().into();
752        let peer: PeerId = OffchainKeypair::random().public().into();
753
754        let mut cfg = NetworkConfig::default();
755        cfg.quality_offline_threshold = 0.3;
756
757        let peers = Network::new(
758            me,
759            vec![],
760            cfg,
761            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
762        );
763
764        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
765
766        for _ in 0..3 {
767            peers
768                .update(&peer, Ok(current_time().as_unix_timestamp()), None)
769                .await?;
770        }
771
772        assert_eq!(peers.health().await, Health::Green);
773
774        Ok(())
775    }
776
777    #[async_std::test]
778    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public(
779    ) -> anyhow::Result<()> {
780        let peer: PeerId = OffchainKeypair::random().public().into();
781        let peer2: PeerId = OffchainKeypair::random().public().into();
782
783        let mut cfg = NetworkConfig::default();
784        cfg.quality_offline_threshold = 0.3;
785
786        let peers = Network::new(
787            OffchainKeypair::random().public().into(),
788            vec![],
789            cfg,
790            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
791        );
792
793        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
794        peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;
795
796        for _ in 0..3 {
797            peers
798                .update(&peer2, Ok(current_time().as_unix_timestamp()), None)
799                .await?;
800            peers
801                .update(&peer, Ok(current_time().as_unix_timestamp()), None)
802                .await?;
803        }
804
805        assert_eq!(peers.health().await, Health::Green);
806
807        Ok(())
808    }
809}