hopr_transport_network/
network.rs

1use std::{
2    collections::hash_set::HashSet,
3    time::{Duration, SystemTime},
4};
5
6use futures::StreamExt;
7pub use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
8use hopr_platform::time::current_time;
9use libp2p_identity::PeerId;
10use multiaddr::Multiaddr;
11use tracing::debug;
12#[cfg(all(feature = "prometheus", not(test)))]
13use {
14    hopr_metrics::metrics::{MultiGauge, SimpleGauge},
15    hopr_primitive_types::prelude::*,
16};
17
18use crate::{config::NetworkConfig, errors::Result};
19
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Current network health, exported as the numeric discriminant of `Health`.
    static ref METRIC_NETWORK_HEALTH: SimpleGauge =
        SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
    // Peer counts labelled by reachability (`type`: "public" / "nonPublic")
    // and by quality (`quality`: "high" / "low").
    static ref METRIC_PEERS_BY_QUALITY: MultiGauge =
        MultiGauge::new("hopr_peers_by_quality", "Number of different peer types by quality",
            &["type", "quality"],
        ).unwrap();
    // Total number of peers known to the network.
    static ref METRIC_PEER_COUNT: SimpleGauge =
        SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
    // Seconds elapsed between the creation of the `Network` and the first GREEN health.
    static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: SimpleGauge = SimpleGauge::new(
        "hopr_time_to_green_sec",
        "Time it takes for a node to transition to the GREEN network state"
    ).unwrap();
}
35
36/// Network health represented with colors, where green is the best and red
37/// is the worst possible observed nework quality.
38#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
39pub enum Health {
40    /// Unknown health, on application startup
41    Unknown = 0,
42    /// No connection, default
43    Red = 1,
44    /// Low quality connection to at least 1 public relay
45    Orange = 2,
46    /// High quality connection to at least 1 public relay
47    Yellow = 3,
48    /// High quality connection to at least 1 public relay and 1 NAT node
49    Green = 4,
50}
51
52/// Calculate the health factor for network from the available stats
53fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
54    let mut health = Health::Red;
55
56    if stats.bad_quality_public > 0 {
57        health = Health::Orange;
58    }
59
60    if stats.good_quality_public > 0 {
61        health = if is_public || stats.good_quality_non_public > 0 {
62            Health::Green
63        } else {
64            Health::Yellow
65        };
66    }
67
68    health
69}
70
/// Reason why a heartbeat ping of a peer did not succeed.
///
/// Used as the error side of the ping result passed to `Network::update`,
/// which penalizes each variant differently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateFailure {
    /// Check timed out
    Timeout,
    /// Dial failure
    DialFailure,
}
78
79/// The network object storing information about the running observed state of the network,
80/// including peers, connection qualities and updates for other parts of the system.
81#[derive(Debug)]
82pub struct Network<T>
83where
84    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
85{
86    me: PeerId,
87    me_addresses: Vec<Multiaddr>,
88    am_i_public: bool,
89    cfg: NetworkConfig,
90    db: T,
91    #[cfg(all(feature = "prometheus", not(test)))]
92    started_at: Duration,
93}
94
95impl<T> Network<T>
96where
97    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
98{
99    pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
100        #[cfg(all(feature = "prometheus", not(test)))]
101        {
102            METRIC_NETWORK_HEALTH.set(0.0);
103            METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
104            METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
105            METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
106            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
107            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
108        }
109
110        Self {
111            me: my_peer_id,
112            me_addresses: my_multiaddresses,
113            am_i_public: true,
114            cfg,
115            db,
116            #[cfg(all(feature = "prometheus", not(test)))]
117            started_at: current_time().as_unix_timestamp(),
118        }
119    }
120
121    /// Check whether the PeerId is present in the network.
122    #[tracing::instrument(level = "debug", skip(self), ret(Display))]
123    pub async fn has(&self, peer: &PeerId) -> bool {
124        peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
125    }
126
127    /// Add a new peer into the network.
128    ///
129    /// Each peer must have an origin specification.
130    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
131    pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> Result<()> {
132        if peer == &self.me {
133            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
134        }
135
136        if let Some(mut peer_status) = self.db.get_network_peer(peer).await? {
137            debug!(%peer, %origin, multiaddresses = ?addrs, "Updating existing peer in the store");
138
139            if !peer_status.is_ignored() || matches!(origin, PeerOrigin::IncomingConnection) {
140                peer_status.ignored_until = None;
141            }
142
143            peer_status.multiaddresses.append(&mut addrs);
144            peer_status.multiaddresses = peer_status
145                .multiaddresses
146                .into_iter()
147                .collect::<HashSet<_>>()
148                .into_iter()
149                .collect::<Vec<_>>();
150            self.db.update_network_peer(peer_status).await?;
151        } else {
152            debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
153
154            self.db
155                .add_network_peer(
156                    peer,
157                    origin,
158                    addrs,
159                    self.cfg.backoff_exponent,
160                    self.cfg.quality_avg_window_size,
161                )
162                .await?;
163        }
164
165        #[cfg(all(feature = "prometheus", not(test)))]
166        {
167            let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
168            self.refresh_metrics(&stats)
169        }
170
171        Ok(())
172    }
173
174    /// Get peer information and status.
175    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
176    pub async fn get(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
177        if peer == &self.me {
178            Ok(Some({
179                let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
180                ps.multiaddresses.clone_from(&self.me_addresses);
181                ps
182            }))
183        } else {
184            Ok(self.db.get_network_peer(peer).await?)
185        }
186    }
187
188    /// Remove peer from the network
189    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
190    pub async fn remove(&self, peer: &PeerId) -> Result<()> {
191        if peer == &self.me {
192            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
193        }
194
195        self.db.remove_network_peer(peer).await?;
196
197        #[cfg(all(feature = "prometheus", not(test)))]
198        {
199            let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
200            self.refresh_metrics(&stats);
201            tracing::trace!(
202                health = %health_from_stats(&stats, self.am_i_public),
203                trigger = "peer removal",
204                "Network health updated"
205            );
206        }
207
208        Ok(())
209    }
210
211    /// Updates a peer's record with the result of a heartbeat ping.
212    ///
213    /// Adjusts the peer's quality, backoff, and ignore status based on the ping outcome. If the peer's quality drops
214    /// below configured thresholds, may trigger a connection close or quality update event. Returns an error if called
215    /// on the local peer.
216    ///
217    /// # Returns
218    /// - `Ok(Some(NetworkTriggeredEvent))` if the peer's status changed and an event should be triggered.
219    /// - `Ok(None)` if the peer is unknown.
220    /// - `Err(NetworkingError)` if the operation is disallowed or a database error occurs.
221    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
222    pub async fn update(&self, peer: &PeerId, ping_result: std::result::Result<Duration, UpdateFailure>) -> Result<()> {
223        if peer == &self.me {
224            return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
225        }
226
227        if let Some(mut entry) = self.db.get_network_peer(peer).await? {
228            entry.heartbeats_sent += 1;
229
230            match ping_result {
231                Ok(latency) => {
232                    if !entry.is_ignored() {
233                        entry.ignored_until = None;
234                    }
235                    entry.last_seen = current_time();
236                    entry.last_seen_latency = latency;
237                    entry.heartbeats_succeeded += 1;
238                    // reset backoff in case of a successful ping
239                    entry.backoff = self.cfg.backoff_min;
240                    entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
241                }
242                Err(error) => match error {
243                    UpdateFailure::Timeout => {
244                        tracing::trace!("Update failed with timeout");
245                        // increase backoff in case of a failed ping, but cap it at the max backoff to
246                        // prevent entries from being shut out
247                        entry.backoff = self.cfg.backoff_max.min(entry.backoff.powf(self.cfg.backoff_exponent));
248                        entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
249
250                        let q = entry.get_quality();
251
252                        if q < self.cfg.quality_bad_threshold {
253                            entry.ignored_until = Some(current_time() + self.cfg.ignore_timeframe);
254                        }
255                    }
256                    UpdateFailure::DialFailure => {
257                        tracing::trace!("Update failed with dial failure");
258                        entry.update_quality(0.0_f64);
259                        entry.ignored_until = Some(
260                            current_time()
261                                + crate::config::DEFAULT_CANNOT_DIAL_PENALTY
262                                + std::time::Duration::from_secs(hopr_crypto_random::random_integer(0, Some(600))),
263                        );
264                    }
265                },
266            }
267
268            tracing::trace!(%peer, quality = entry.quality, quality_avg = hopr_primitive_types::sma::SMA::average(&entry.quality_avg), "Updating peer status in the store");
269            self.db.update_network_peer(entry).await?;
270
271            #[cfg(all(feature = "prometheus", not(test)))]
272            {
273                let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
274                self.refresh_metrics(&stats);
275                tracing::trace!(
276                    health = %health_from_stats(&stats, self.am_i_public),
277                    trigger = "peer update",
278                    "Network health updated"
279                );
280            }
281
282            Ok(())
283        } else {
284            debug!(%peer, "Ignoring update request for unknown peer");
285            Ok(())
286        }
287    }
288
289    /// Returns the quality of the network as a network health indicator.
290    pub async fn health(&self) -> Health {
291        self.db
292            .network_peer_stats(self.cfg.quality_bad_threshold)
293            .await
294            .map(|stats| health_from_stats(&stats, self.am_i_public))
295            .unwrap_or(Health::Unknown)
296    }
297
298    /// Update the internally perceived network status that is processed to the network health
299    #[cfg(all(feature = "prometheus", not(test)))]
300    fn refresh_metrics(&self, stats: &Stats) {
301        let health = health_from_stats(stats, self.am_i_public);
302
303        if METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
304            if let Some(ts) = current_time().checked_sub(self.started_at) {
305                METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
306            }
307        }
308        METRIC_PEER_COUNT.set(stats.all_count() as f64);
309        METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
310        METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
311        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
312        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
313        METRIC_NETWORK_HEALTH.set((health as i32).into());
314    }
315
316    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
317        let minimum_quality = self.cfg.quality_offline_threshold;
318        self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
319            .await
320    }
321
322    // ======
323    pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> Result<Vec<V>>
324    where
325        F: FnMut(PeerStatus) -> Fut,
326        Fut: std::future::Future<Output = Option<V>>,
327    {
328        Ok(self
329            .db
330            .get_network_peers(Default::default(), false)
331            .await?
332            .filter_map(filter)
333            .collect()
334            .await)
335    }
336
337    /// Returns a list of peer IDs eligible for pinging based on last-seen time, ignore status, and backoff delay.
338    ///
339    /// Peers are filtered to exclude self, those currently within their ignore timeframe, and those whose
340    /// backoff-adjusted delay has not yet elapsed. The resulting peers are sorted by last seen time in ascending order.
341    ///
342    /// # Parameters
343    /// - `threshold`: The cutoff `SystemTime`; only peers whose next ping is due before this time are considered.
344    ///
345    /// # Returns
346    /// A vector of peer IDs that should be pinged.
347    #[tracing::instrument(level = "debug", skip(self, threshold), ret(level = "trace"), err, fields(since = ?threshold))]
348    pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> Result<Vec<PeerId>> {
349        Ok(self
350            .db
351            .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), true)
352            .await?
353            .filter_map(|v| async move {
354                if v.id.1 == self.me {
355                    return None;
356                }
357
358                if let Some(ignore_start) = v.ignored_until {
359                    let should_be_ignored = ignore_start
360                        .checked_add(self.cfg.ignore_timeframe)
361                        .is_some_and(|v| v > threshold);
362
363                    if should_be_ignored {
364                        return None;
365                    }
366                }
367
368                let backoff = v.backoff.powf(self.cfg.backoff_exponent);
369                let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
370
371                if (v.last_seen + delay) < threshold {
372                    Some(v.id.1)
373                } else {
374                    None
375                }
376            })
377            .collect()
378            .await)
379    }
380}
381
#[cfg(test)]
mod tests {
    use std::{ops::Add, time::Duration};

    use anyhow::Context;
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
    use hopr_platform::time::native::current_time;
    use hopr_primitive_types::prelude::AsUnixTimestamp;
    use libp2p_identity::PeerId;
    use more_asserts::*;

    use super::*;
    use crate::network::{Health, Network, NetworkConfig, PeerOrigin};

    impl<T> Network<T>
    where
        T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
    {
        /// Checks if the peer is present in the network, but it is being currently ignored.
        ///
        /// Always `false` for the local node, for unknown peers and on a lookup error.
        async fn is_ignored(&self, peer: &PeerId) -> bool {
            peer != &self.me && self.get(peer).await.is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored()))
        }
    }

    #[test]
    fn test_network_health_should_serialize_to_a_proper_string() {
        assert_eq!(format!("{}", Health::Orange), "Orange".to_owned())
    }

    #[test]
    fn test_network_health_should_deserialize_from_proper_string() -> anyhow::Result<()> {
        let parsed: Health = "Orange".parse()?;
        assert_eq!(parsed, Health::Orange);

        Ok(())
    }

    /// Builds a network for `my_id` backed by an in-memory database.
    async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<hopr_db_sql::db::HoprDb>> {
        let cfg = NetworkConfig {
            quality_offline_threshold: 0.6,
            ..Default::default()
        };
        Ok(Network::new(
            *my_id,
            vec![],
            cfg,
            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
        ))
    }

    #[test]
    fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
        assert_eq!(Health::Unknown as i32, 0);
        assert_eq!(Health::Red as i32, 1);
        assert_eq!(Health::Orange as i32, 2);
        assert_eq!(Health::Yellow as i32, 3);
        assert_eq!(Health::Green as i32, 4);
    }

    #[tokio::test]
    async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
        let me = PeerId::random();

        let peers = basic_network(&me).await?;

        assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());

        assert_eq!(
            0,
            peers
                .peer_filter(|peer| async move { Some(peer.id) })
                .await
                .unwrap_or(vec![])
                .len()
        );
        assert!(peers.has(&me).await);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
        let expected: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;

        assert_eq!(
            1,
            peers
                .peer_filter(|peer| async move { Some(peer.id) })
                .await
                .unwrap_or(vec![])
                .len()
        );
        assert!(peers.has(&expected).await);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        peers.remove(&peer).await?;

        assert_eq!(
            0,
            peers
                .peer_filter(|peer| async move { Some(peer.id) })
                .await
                .unwrap_or(vec![])
                .len()
        );
        assert!(!peers.has(&peer).await);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;

        assert_eq!(
            0,
            peers
                .peer_filter(|peer| async move { Some(peer.id) })
                .await
                .unwrap_or(vec![])
                .len()
        );
        assert!(!peers.has(&peer).await);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        let latency = 123u64;

        peers
            .update(&peer, Ok(std::time::Duration::from_millis(latency)))
            .await?;

        let actual = peers.get(&peer).await?.expect("peer record should be present");

        assert_eq!(actual.heartbeats_sent, 1);
        assert_eq!(actual.heartbeats_succeeded, 1);
        assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));

        Ok(())
    }

    #[tokio::test]
    async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        let ts = Duration::from_millis(100);

        {
            peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
            peers.update(&peer, Ok(ts)).await?;

            let status = peers.get(&peer).await?.context("peer should be present")?;

            assert_eq!(status.last_seen_latency, ts);
        }

        let ts = Duration::from_millis(200);

        {
            peers.update(&peer, Ok(ts)).await?;

            let status = peers.get(&peer).await?.context("peer should be present")?;

            assert_eq!(status.last_seen_latency, ts);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_network_add_should_merge_multiaddresses_without_duplicates() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        let first: multiaddr::Multiaddr = "/ip4/127.0.0.1/tcp/9091".parse()?;
        let second: multiaddr::Multiaddr = "/ip4/127.0.0.1/tcp/9092".parse()?;

        peers
            .add(&peer, PeerOrigin::IncomingConnection, vec![first.clone()])
            .await?;
        // re-adding an already known address must not duplicate it
        peers
            .add(&peer, PeerOrigin::IncomingConnection, vec![first.clone(), second.clone()])
            .await?;

        let status = peers.get(&peer).await?.context("peer should be present")?;

        assert_eq!(status.multiaddresses.len(), 2);
        assert!(status.multiaddresses.contains(&first));
        assert!(status.multiaddresses.contains(&second));

        Ok(())
    }

    #[tokio::test]
    async fn network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time()
    -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;

        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;

        // each timeout lowers the quality; with the default config the second one is expected
        // to push it below `quality_bad_threshold`, which puts the peer on the ignore list
        peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
        peers
            .update(&peer, Err(UpdateFailure::Timeout))
            .await
            .expect("no error should occur");

        assert!(peers.is_ignored(&peer).await);

        // re-adding the peer with a non-incoming origin must not lift the ignore
        peers.add(&peer, PeerOrigin::ManualPing, vec![]).await?;

        assert!(peers.is_ignored(&peer).await);

        Ok(())
    }

    #[tokio::test]
    async fn network_should_stop_ignoring_a_peer_that_has_reached_lower_thresholds_but_connected_back()
    -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;

        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;

        // two timeouts are expected to drop the peer onto the ignore list (default config)
        peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
        peers
            .update(&peer, Err(UpdateFailure::Timeout))
            .await
            .expect("no error should occur");

        assert!(peers.is_ignored(&peer).await);

        // an incoming connection from the peer lifts the ignore
        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        assert!(!peers.is_ignored(&peer).await);

        Ok(())
    }

    #[tokio::test]
    async fn network_should_ignore_a_peer_that_could_not_be_dialed() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;

        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
        // a dial failure ignores the peer immediately
        peers.update(&peer, Err(UpdateFailure::DialFailure)).await?;

        assert!(peers.is_ignored(&peer).await);

        // an incoming connection from the peer lifts the ignore
        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        assert!(!peers.is_ignored(&peer).await);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        // three successful pings reset the backoff to `backoff_min`, so the single
        // timeout below raises it exactly once
        peers
            .update(&peer, Ok(std::time::Duration::from_millis(123_u64)))
            .await?;
        peers
            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
            .await?;
        peers
            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
            .await?;

        peers.update(&peer, Err(UpdateFailure::Timeout)).await?;

        let actual = peers.get(&peer).await?.expect("the peer record should be present");

        assert_eq!(actual.heartbeats_succeeded, 3);
        assert_lt!(actual.backoff, 3f64);
        assert_gt!(actual.backoff, 2f64);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_not_overflow_max_backoff() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        for latency in [123_u64, 200_u64, 200_u64] {
            peers
                .update(&peer, Ok(std::time::Duration::from_millis(latency)))
                .await?;
        }

        // iterate until max backoff is reached
        loop {
            let updated_peer = peers.get(&peer).await?.expect("the peer record should be present");
            if updated_peer.backoff == peers.cfg.backoff_max {
                break;
            }

            peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
        }

        // perform one more failing heartbeat update and ensure max backoff is not exceeded
        peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
        let actual = peers.get(&peer).await?.expect("the peer record should be present");

        assert_eq!(actual.backoff, peers.cfg.backoff_max);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference()
    -> anyhow::Result<()> {
        let first: PeerId = OffchainKeypair::random().public().into();
        let second: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
        peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;

        let latency = 77_u64;

        let mut expected = vec![first, second];
        expected.sort();

        peers
            .update(&first, Ok(std::time::Duration::from_millis(latency)))
            .await?;
        peers
            .update(&second, Ok(std::time::Duration::from_millis(latency)))
            .await?;

        let mut actual = peers
            .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
            .await?;
        actual.sort();

        assert_eq!(actual, expected);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        assert_eq!(peers.health().await, Health::Red);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        // the freshly added peer counts as a low-quality public peer
        assert_eq!(peers.health().await, Health::Orange);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let peers = basic_network(&me).await?;

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
        let _ = peers.health().await;
        peers.remove(&peer).await?;

        assert_eq!(peers.health().await, Health::Red);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let cfg = NetworkConfig {
            quality_offline_threshold: 0.6,
            ..Default::default()
        };

        let peers = Network::new(
            me,
            vec![],
            cfg,
            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
        );

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;

        assert_eq!(peers.health().await, Health::Orange);

        Ok(())
    }

    #[tokio::test]
    async fn network_should_allow_the_quality_to_go_to_0() -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let me: PeerId = OffchainKeypair::random().public().into();

        let cfg = NetworkConfig {
            quality_offline_threshold: 0.6,
            ..Default::default()
        };

        let peers = Network::new(
            me,
            vec![],
            cfg,
            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
        );

        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;

        assert!(peers.update(&peer, Err(UpdateFailure::Timeout)).await.is_ok());

        assert!(peers.is_ignored(&peer).await);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public()
    -> anyhow::Result<()> {
        let me: PeerId = OffchainKeypair::random().public().into();
        let peer: PeerId = OffchainKeypair::random().public().into();

        let cfg = NetworkConfig {
            quality_offline_threshold: 0.3,
            ..Default::default()
        };

        let peers = Network::new(
            me,
            vec![],
            cfg,
            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
        );

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

        for _ in 0..3 {
            peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
        }

        assert_eq!(peers.health().await, Health::Green);

        Ok(())
    }

    #[tokio::test]
    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public()
    -> anyhow::Result<()> {
        let peer: PeerId = OffchainKeypair::random().public().into();
        let peer2: PeerId = OffchainKeypair::random().public().into();

        let cfg = NetworkConfig {
            quality_offline_threshold: 0.3,
            ..Default::default()
        };

        let peers = Network::new(
            OffchainKeypair::random().public().into(),
            vec![],
            cfg,
            hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
        );

        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
        peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;

        for _ in 0..3 {
            peers.update(&peer2, Ok(current_time().as_unix_timestamp())).await?;
            peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
        }

        assert_eq!(peers.health().await, Health::Green);

        Ok(())
    }
}