hopr_transport_network/
network.rs

1use std::{
2    collections::hash_set::HashSet,
3    time::{Duration, SystemTime},
4};
5
6use futures::StreamExt;
7pub use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
8use hopr_platform::time::native::current_time;
9use libp2p_identity::PeerId;
10use multiaddr::Multiaddr;
11use tracing::debug;
12#[cfg(all(feature = "prometheus", not(test)))]
13use {
14    hopr_metrics::metrics::{MultiGauge, SimpleGauge},
15    hopr_primitive_types::prelude::*,
16};
17
18use crate::{config::NetworkConfig, errors::Result};
19
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Numeric value of the current network `Health` (see the enum discriminants).
    static ref METRIC_NETWORK_HEALTH: SimpleGauge = SimpleGauge::new(
        "hopr_network_health",
        "Connectivity health indicator"
    ).unwrap();

    // Peer counts, labelled by peer type ("public"/"nonPublic") and quality ("high"/"low").
    static ref METRIC_PEERS_BY_QUALITY: MultiGauge = MultiGauge::new(
        "hopr_peers_by_quality",
        "Number different peer types by quality",
        &["type", "quality"],
    ).unwrap();

    // Total number of peers reported by the peer statistics.
    static ref METRIC_PEER_COUNT: SimpleGauge = SimpleGauge::new(
        "hopr_peer_count",
        "Number of all peers"
    ).unwrap();

    // Seconds elapsed between the creation of the `Network` object and the recorded measurement.
    static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: SimpleGauge = SimpleGauge::new(
        "hopr_time_to_green_sec",
        "Time it takes for a node to transition to the GREEN network state"
    ).unwrap();
}
35
/// Network health represented with colors, where green is the best and red
/// is the worst possible observed network quality.
///
/// The explicit discriminants are the numeric values exported through the
/// network health metric, so they must stay stable.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
pub enum Health {
    /// Unknown health, e.g. on application startup or when the peer statistics cannot be read
    Unknown = 0,
    /// No connection to any public peer
    Red = 1,
    /// Low quality connection to at least 1 public relay
    Orange = 2,
    /// High quality connection to at least 1 public relay
    Yellow = 3,
    /// High quality connection to at least 1 public relay and either this node is public
    /// itself or it also has a high quality connection to at least 1 NAT (non-public) node
    Green = 4,
}
51
52/// Calculate the health factor for network from the available stats
53fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
54    let mut health = Health::Red;
55
56    if stats.bad_quality_public > 0 {
57        health = Health::Orange;
58    }
59
60    if stats.good_quality_public > 0 {
61        health = if is_public || stats.good_quality_non_public > 0 {
62            Health::Green
63        } else {
64            Health::Yellow
65        };
66    }
67
68    health
69}
70
/// The network object storing information about the running observed state of the network,
/// including peers, connection qualities and updates for other parts of the system.
///
/// Peer records themselves live in the database backend `T`; this object holds only the
/// local node's identity and the configuration used to interpret those records.
#[derive(Debug)]
pub struct Network<T>
where
    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
{
    // Peer ID of the local node; mutating operations on this ID are rejected.
    me: PeerId,
    // Multiaddresses reported for the local node when its own status is requested.
    me_addresses: Vec<Multiaddr>,
    // Whether the local node is considered public; feeds into the health computation.
    am_i_public: bool,
    // Thresholds, backoff and timing parameters used when evaluating peers.
    cfg: NetworkConfig,
    // Storage backend holding the peer records and their statistics.
    db: T,
    // Timestamp taken at construction, used for the time-to-green metric.
    #[cfg(all(feature = "prometheus", not(test)))]
    started_at: Duration,
}
86
87impl<T> Network<T>
88where
89    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
90{
91    pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
92        #[cfg(all(feature = "prometheus", not(test)))]
93        {
94            METRIC_NETWORK_HEALTH.set(0.0);
95            METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
96            METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
97            METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
98            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
99            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
100        }
101
102        Self {
103            me: my_peer_id,
104            me_addresses: my_multiaddresses,
105            am_i_public: true,
106            cfg: cfg.clone(),
107            db,
108            #[cfg(all(feature = "prometheus", not(test)))]
109            started_at: current_time().as_unix_timestamp(),
110        }
111    }
112
113    /// Check whether the PeerId is present in the network.
114    pub async fn has(&self, peer: &PeerId) -> bool {
115        peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
116    }
117
118    /// Add a new peer into the network.
119    ///
120    /// Each peer must have an origin specification.
121    pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> Result<()> {
122        if peer == &self.me {
123            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
124        }
125
126        if let Some(mut peer_status) = self.db.get_network_peer(peer).await? {
127            if !peer_status.is_ignored(current_time(), self.cfg.ignore_timeframe) {
128                peer_status.ignored = None;
129                peer_status.multiaddresses.append(&mut addrs);
130                peer_status.multiaddresses = peer_status
131                    .multiaddresses
132                    .into_iter()
133                    .collect::<HashSet<_>>()
134                    .into_iter()
135                    .collect::<Vec<_>>();
136                self.db.update_network_peer(peer_status).await?;
137            }
138        } else {
139            debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
140
141            self.db
142                .add_network_peer(
143                    peer,
144                    origin,
145                    addrs,
146                    self.cfg.backoff_exponent,
147                    self.cfg.quality_avg_window_size,
148                )
149                .await?;
150        }
151
152        #[cfg(all(feature = "prometheus", not(test)))]
153        {
154            let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
155            self.refresh_metrics(&stats)
156        }
157
158        Ok(())
159    }
160
161    /// Get peer information and status.
162    pub async fn get(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
163        if peer == &self.me {
164            Ok(Some({
165                let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
166                ps.multiaddresses.clone_from(&self.me_addresses);
167                ps
168            }))
169        } else {
170            Ok(self.db.get_network_peer(peer).await?)
171        }
172    }
173
174    /// Remove peer from the network
175    pub async fn remove(&self, peer: &PeerId) -> Result<()> {
176        if peer == &self.me {
177            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
178        }
179
180        self.db.remove_network_peer(peer).await?;
181
182        #[cfg(all(feature = "prometheus", not(test)))]
183        {
184            let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
185            self.refresh_metrics(&stats)
186        }
187
188        Ok(())
189    }
190
191    /// Updates a peer's record with the result of a heartbeat ping.
192    ///
193    /// Adjusts the peer's quality, backoff, and ignore status based on the ping outcome. If the peer's quality drops
194    /// below configured thresholds, may trigger a connection close or quality update event. Returns an error if called
195    /// on the local peer.
196    ///
197    /// # Returns
198    /// - `Ok(Some(NetworkTriggeredEvent))` if the peer's status changed and an event should be triggered.
199    /// - `Ok(None)` if the peer is unknown.
200    /// - `Err(NetworkingError)` if the operation is disallowed or a database error occurs.
201    pub async fn update(
202        &self,
203        peer: &PeerId,
204        ping_result: std::result::Result<Duration, ()>,
205        version: Option<String>,
206    ) -> Result<()> {
207        if peer == &self.me {
208            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
209        }
210
211        if let Some(mut entry) = self.db.get_network_peer(peer).await? {
212            if !entry.is_ignored(current_time(), self.cfg.ignore_timeframe) {
213                entry.ignored = None;
214            }
215
216            entry.heartbeats_sent += 1;
217            entry.peer_version = version;
218
219            if let Ok(latency) = ping_result {
220                entry.last_seen = current_time();
221                entry.last_seen_latency = latency;
222                entry.heartbeats_succeeded += 1;
223                // reset backoff in case of a successful ping
224                entry.backoff = self.cfg.backoff_min;
225                entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
226            } else {
227                // increase backoff in case of a failed ping, but cap it at the max backoff to
228                // prevent entries from being shut out
229                entry.backoff = self.cfg.backoff_max.min(entry.backoff.powf(self.cfg.backoff_exponent));
230                entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
231
232                let q = entry.get_quality();
233
234                if q < self.cfg.quality_bad_threshold {
235                    entry.ignored = Some(current_time());
236                }
237            }
238
239            self.db.update_network_peer(entry).await?;
240
241            #[cfg(all(feature = "prometheus", not(test)))]
242            {
243                let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
244                self.refresh_metrics(&stats)
245            }
246
247            Ok(())
248        } else {
249            debug!(%peer, "Ignoring update request for unknown peer");
250            Ok(())
251        }
252    }
253
254    /// Returns the quality of the network as a network health indicator.
255    pub async fn health(&self) -> Health {
256        self.db
257            .network_peer_stats(self.cfg.quality_bad_threshold)
258            .await
259            .map(|stats| health_from_stats(&stats, self.am_i_public))
260            .unwrap_or(Health::Unknown)
261    }
262
263    /// Update the internally perceived network status that is processed to the network health
264    #[cfg(all(feature = "prometheus", not(test)))]
265    fn refresh_metrics(&self, stats: &Stats) {
266        let health = health_from_stats(stats, self.am_i_public);
267
268        if METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
269            if let Some(ts) = current_time().checked_sub(self.started_at) {
270                METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
271            }
272        }
273        METRIC_PEER_COUNT.set(stats.all_count() as f64);
274        METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
275        METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
276        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
277        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
278        METRIC_NETWORK_HEALTH.set((health as i32).into());
279    }
280
281    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
282        let minimum_quality = self.cfg.quality_offline_threshold;
283        self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
284            .await
285    }
286
287    // ======
288    pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> Result<Vec<V>>
289    where
290        F: FnMut(PeerStatus) -> Fut,
291        Fut: std::future::Future<Output = Option<V>>,
292    {
293        let stream = self.db.get_network_peers(Default::default(), false).await?;
294        futures::pin_mut!(stream);
295        Ok(stream.filter_map(filter).collect().await)
296    }
297
298    /// Returns a list of peer IDs eligible for pinging based on last seen time, ignore status, and backoff delay.
299    ///
300    /// Peers are filtered to exclude self, those currently within their ignore timeframe, and those whose
301    /// backoff-adjusted delay has not yet elapsed. The resulting peers are sorted by last seen time in ascending order.
302    ///
303    /// # Parameters
304    /// - `threshold`: The cutoff `SystemTime`; only peers whose next ping is due before this time are considered.
305    ///
306    /// # Returns
307    /// A vector of peer IDs that should be pinged.
308    pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> Result<Vec<PeerId>> {
309        let stream = self
310            .db
311            .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), false)
312            .await?;
313        futures::pin_mut!(stream);
314        let mut data: Vec<PeerStatus> = stream
315            .filter_map(|v| async move {
316                if v.id.1 == self.me {
317                    return None;
318                }
319
320                if let Some(ignore_start) = v.ignored {
321                    let should_be_ignored = ignore_start
322                        .checked_add(self.cfg.ignore_timeframe)
323                        .is_some_and(|v| v > threshold);
324
325                    if should_be_ignored {
326                        return None;
327                    }
328                }
329
330                let backoff = v.backoff.powf(self.cfg.backoff_exponent);
331                let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
332
333                if (v.last_seen + delay) < threshold {
334                    Some(v)
335                } else {
336                    None
337                }
338            })
339            .collect()
340            .await;
341
342        data.sort_by(|a, b| {
343            if a.last_seen < b.last_seen {
344                std::cmp::Ordering::Less
345            } else {
346                std::cmp::Ordering::Greater
347            }
348        });
349
350        Ok(data.into_iter().map(|peer| peer.id.1).collect())
351    }
352}
353
354impl<T> Network<T>
355where
356    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
357{
358    #[cfg(test)]
359    /// Checks if the peer is present in the network, but it is being currently ignored.
360    async fn is_ignored(&self, peer: &PeerId) -> bool {
361        peer != &self.me
362            && self
363                .get(peer)
364                .await
365                .is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored(current_time(), self.cfg.ignore_timeframe)))
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use std::{ops::Add, time::Duration};
372
373    use anyhow::Context;
374    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
375    use hopr_platform::time::native::current_time;
376    use hopr_primitive_types::prelude::AsUnixTimestamp;
377    use libp2p_identity::PeerId;
378    use more_asserts::*;
379
380    use super::*;
381    use crate::network::{Health, Network, NetworkConfig, PeerOrigin};
382
383    #[test]
384    fn test_network_health_should_serialize_to_a_proper_string() {
385        assert_eq!(format!("{}", Health::Orange), "Orange".to_owned())
386    }
387
388    #[test]
389    fn test_network_health_should_deserialize_from_proper_string() -> anyhow::Result<()> {
390        let parsed: Health = "Orange".parse()?;
391        assert_eq!(parsed, Health::Orange);
392
393        Ok(())
394    }
395
396    async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<hopr_db_sql::db::HoprDb>> {
397        let cfg = NetworkConfig {
398            quality_offline_threshold: 0.6,
399            ..Default::default()
400        };
401        Ok(Network::new(
402            *my_id,
403            vec![],
404            cfg,
405            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
406        ))
407    }
408
409    #[test]
410    fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
411        assert_eq!(Health::Unknown as i32, 0);
412        assert_eq!(Health::Red as i32, 1);
413        assert_eq!(Health::Orange as i32, 2);
414        assert_eq!(Health::Yellow as i32, 3);
415        assert_eq!(Health::Green as i32, 4);
416    }
417
418    #[tokio::test]
419    async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
420        let me = PeerId::random();
421
422        let peers = basic_network(&me).await?;
423
424        assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());
425
426        assert_eq!(
427            0,
428            peers
429                .peer_filter(|peer| async move { Some(peer.id) })
430                .await
431                .unwrap_or(vec![])
432                .len()
433        );
434        assert!(peers.has(&me).await);
435
436        Ok(())
437    }
438
439    #[tokio::test]
440    async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
441        let expected: PeerId = OffchainKeypair::random().public().into();
442        let me: PeerId = OffchainKeypair::random().public().into();
443
444        let peers = basic_network(&me).await?;
445
446        peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;
447
448        assert_eq!(
449            1,
450            peers
451                .peer_filter(|peer| async move { Some(peer.id) })
452                .await
453                .unwrap_or(vec![])
454                .len()
455        );
456        assert!(peers.has(&expected).await);
457
458        Ok(())
459    }
460
461    #[tokio::test]
462    async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
463        let peer: PeerId = OffchainKeypair::random().public().into();
464        let me: PeerId = OffchainKeypair::random().public().into();
465
466        let peers = basic_network(&me).await?;
467
468        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
469
470        peers.remove(&peer).await?;
471
472        assert_eq!(
473            0,
474            peers
475                .peer_filter(|peer| async move { Some(peer.id) })
476                .await
477                .unwrap_or(vec![])
478                .len()
479        );
480        assert!(!peers.has(&peer).await);
481
482        Ok(())
483    }
484
485    #[tokio::test]
486    async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
487        let peer: PeerId = OffchainKeypair::random().public().into();
488        let me: PeerId = OffchainKeypair::random().public().into();
489
490        let peers = basic_network(&me).await?;
491
492        peers
493            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
494            .await?;
495
496        assert_eq!(
497            0,
498            peers
499                .peer_filter(|peer| async move { Some(peer.id) })
500                .await
501                .unwrap_or(vec![])
502                .len()
503        );
504        assert!(!peers.has(&peer).await);
505
506        Ok(())
507    }
508
509    #[tokio::test]
510    async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
511        let peer: PeerId = OffchainKeypair::random().public().into();
512        let me: PeerId = OffchainKeypair::random().public().into();
513
514        let peers = basic_network(&me).await?;
515
516        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
517
518        let latency = 123u64;
519
520        peers
521            .update(&peer, Ok(std::time::Duration::from_millis(latency)), None)
522            .await?;
523
524        let actual = peers.get(&peer).await?.expect("peer record should be present");
525
526        assert_eq!(actual.heartbeats_sent, 1);
527        assert_eq!(actual.heartbeats_succeeded, 1);
528        assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
529
530        Ok(())
531    }
532
533    #[tokio::test]
534    async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
535        let peer: PeerId = OffchainKeypair::random().public().into();
536        let me: PeerId = OffchainKeypair::random().public().into();
537
538        let peers = basic_network(&me).await?;
539
540        let expected_version = Some("1.2.4".to_string());
541
542        {
543            peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
544            peers
545                .update(&peer, Ok(current_time().as_unix_timestamp()), expected_version.clone())
546                .await?;
547
548            let status = peers.get(&peer).await?.context("peer should be present")?;
549
550            assert_eq!(status.peer_version, expected_version);
551        }
552
553        let ts = current_time().as_unix_timestamp();
554
555        {
556            let expected_version = Some("2.0.0".to_string());
557
558            peers.update(&peer, Ok(ts), expected_version.clone()).await?;
559
560            let status = peers.get(&peer).await?.context("peer should be present")?;
561
562            assert_eq!(status.peer_version, expected_version);
563        }
564
565        Ok(())
566    }
567
568    #[tokio::test]
569    async fn test_network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time()
570    -> anyhow::Result<()> {
571        let peer: PeerId = OffchainKeypair::random().public().into();
572        let me: PeerId = OffchainKeypair::random().public().into();
573
574        let peers = basic_network(&me).await?;
575
576        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
577
578        peers
579            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
580            .await?;
581        peers
582            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
583            .await?;
584        peers.update(&peer, Err(()), None).await?; // should drop to ignored
585
586        peers.update(&peer, Err(()), None).await.expect("no error should occur"); // should drop from network
587
588        assert!(peers.is_ignored(&peer).await);
589
590        // peer should remain ignored and not be added
591        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
592
593        assert!(peers.is_ignored(&peer).await);
594
595        Ok(())
596    }
597
598    #[tokio::test]
599    async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
600        let peer: PeerId = OffchainKeypair::random().public().into();
601        let me: PeerId = OffchainKeypair::random().public().into();
602
603        let peers = basic_network(&me).await?;
604
605        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
606
607        // Needs to do 3 pings, so we get over the ignore threshold limit
608        // when doing the 4th failed ping
609        peers
610            .update(&peer, Ok(std::time::Duration::from_millis(123_u64)), None)
611            .await?;
612        peers
613            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
614            .await?;
615        peers
616            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
617            .await?;
618
619        peers.update(&peer, Err(()), None).await?;
620
621        let actual = peers.get(&peer).await?.expect("the peer record should be present");
622
623        assert_eq!(actual.heartbeats_succeeded, 3);
624        assert_lt!(actual.backoff, 3f64);
625        assert_gt!(actual.backoff, 2f64);
626
627        Ok(())
628    }
629
630    #[tokio::test]
631    async fn test_network_should_not_overflow_max_backoff() -> anyhow::Result<()> {
632        let peer: PeerId = OffchainKeypair::random().public().into();
633        let me: PeerId = OffchainKeypair::random().public().into();
634
635        let peers = basic_network(&me).await?;
636
637        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
638
639        for latency in [123_u64, 200_u64, 200_u64] {
640            peers
641                .update(&peer, Ok(std::time::Duration::from_millis(latency)), None)
642                .await?;
643        }
644
645        // iterate until max backoff is reached
646        loop {
647            let updated_peer = peers.get(&peer).await?.expect("the peer record should be present");
648            if updated_peer.backoff == peers.cfg.backoff_max {
649                break;
650            }
651
652            peers.update(&peer, Err(()), None).await?;
653        }
654
655        // perform one more failing heartbeat update and ensure max backoff is not exceeded
656        peers.update(&peer, Err(()), None).await?;
657        let actual = peers.get(&peer).await?.expect("the peer record should be present");
658
659        assert_eq!(actual.backoff, peers.cfg.backoff_max);
660
661        Ok(())
662    }
663
664    #[tokio::test]
665    async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference()
666    -> anyhow::Result<()> {
667        let first: PeerId = OffchainKeypair::random().public().into();
668        let second: PeerId = OffchainKeypair::random().public().into();
669        let me: PeerId = OffchainKeypair::random().public().into();
670
671        let peers = basic_network(&me).await?;
672
673        peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
674        peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;
675
676        let latency = 77_u64;
677
678        let mut expected = vec![first, second];
679        expected.sort();
680
681        peers
682            .update(&first, Ok(std::time::Duration::from_millis(latency)), None)
683            .await?;
684        peers
685            .update(&second, Ok(std::time::Duration::from_millis(latency)), None)
686            .await?;
687
688        // assert_eq!(
689        //     format!(
690        //         "{:?}",
691        //         peers.should_still_be_ignored(&peers.get(&first).await.unwrap().unwrap())
692        //     ),
693        //     ""
694        // );
695        // assert_eq!(format!("{:?}", peers.get(&first).await), "");
696
697        let mut actual = peers
698            .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
699            .await?;
700        actual.sort();
701
702        assert_eq!(actual, expected);
703
704        Ok(())
705    }
706
707    #[tokio::test]
708    async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
709        let me: PeerId = OffchainKeypair::random().public().into();
710
711        let peers = basic_network(&me).await?;
712
713        assert_eq!(peers.health().await, Health::Red);
714
715        Ok(())
716    }
717
718    #[tokio::test]
719    async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
720        let peer: PeerId = OffchainKeypair::random().public().into();
721        let me: PeerId = OffchainKeypair::random().public().into();
722
723        let peers = basic_network(&me).await?;
724
725        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
726
727        // all peers are public
728        assert_eq!(peers.health().await, Health::Orange);
729
730        Ok(())
731    }
732
733    #[tokio::test]
734    async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
735        let peer: PeerId = OffchainKeypair::random().public().into();
736        let me: PeerId = OffchainKeypair::random().public().into();
737
738        let peers = basic_network(&me).await?;
739
740        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
741        let _ = peers.health().await;
742        peers.remove(&peer).await?;
743
744        assert_eq!(peers.health().await, Health::Red);
745
746        Ok(())
747    }
748
749    #[tokio::test]
750    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
751        let peer: PeerId = OffchainKeypair::random().public().into();
752        let me: PeerId = OffchainKeypair::random().public().into();
753
754        let cfg = NetworkConfig {
755            quality_offline_threshold: 0.6,
756            ..Default::default()
757        };
758
759        let peers = Network::new(
760            me,
761            vec![],
762            cfg,
763            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
764        );
765
766        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
767
768        peers
769            .update(&peer, Ok(current_time().as_unix_timestamp()), None)
770            .await?;
771
772        assert_eq!(peers.health().await, Health::Orange);
773
774        Ok(())
775    }
776
777    #[tokio::test]
778    async fn test_network_should_allow_the_quality_to_go_to_0() -> anyhow::Result<()> {
779        let peer: PeerId = OffchainKeypair::random().public().into();
780        let public = peer;
781        let me: PeerId = OffchainKeypair::random().public().into();
782
783        let cfg = NetworkConfig {
784            quality_offline_threshold: 0.6,
785            ..Default::default()
786        };
787
788        let peers = Network::new(
789            me,
790            vec![],
791            cfg,
792            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
793        );
794
795        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
796
797        assert!(peers.update(&peer, Err(()), None).await.is_ok());
798
799        assert!(peers.is_ignored(&public).await);
800
801        Ok(())
802    }
803
804    #[tokio::test]
805    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public()
806    -> anyhow::Result<()> {
807        let me: PeerId = OffchainKeypair::random().public().into();
808        let peer: PeerId = OffchainKeypair::random().public().into();
809
810        let cfg = NetworkConfig {
811            quality_offline_threshold: 0.3,
812            ..Default::default()
813        };
814
815        let peers = Network::new(
816            me,
817            vec![],
818            cfg,
819            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
820        );
821
822        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
823
824        for _ in 0..3 {
825            peers
826                .update(&peer, Ok(current_time().as_unix_timestamp()), None)
827                .await?;
828        }
829
830        assert_eq!(peers.health().await, Health::Green);
831
832        Ok(())
833    }
834
835    #[tokio::test]
836    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public()
837    -> anyhow::Result<()> {
838        let peer: PeerId = OffchainKeypair::random().public().into();
839        let peer2: PeerId = OffchainKeypair::random().public().into();
840
841        let cfg = NetworkConfig {
842            quality_offline_threshold: 0.3,
843            ..Default::default()
844        };
845
846        let peers = Network::new(
847            OffchainKeypair::random().public().into(),
848            vec![],
849            cfg,
850            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
851        );
852
853        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
854        peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;
855
856        for _ in 0..3 {
857            peers
858                .update(&peer2, Ok(current_time().as_unix_timestamp()), None)
859                .await?;
860            peers
861                .update(&peer, Ok(current_time().as_unix_timestamp()), None)
862                .await?;
863        }
864
865        assert_eq!(peers.health().await, Health::Green);
866
867        Ok(())
868    }
869}