hopr_transport_network/
network.rs

1use std::{
2    collections::hash_set::HashSet,
3    time::{Duration, SystemTime},
4};
5
6use futures::StreamExt;
7use hopr_api::db::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
8use hopr_network_types::addr::is_public_address;
9use hopr_platform::time::current_time;
10#[cfg(all(feature = "prometheus", not(test)))]
11use hopr_primitive_types::prelude::*;
12use libp2p_identity::PeerId;
13use multiaddr::Multiaddr;
14use tracing::debug;
15
16use crate::{
17    config::NetworkConfig,
18    errors::{NetworkingError, Result},
19};
20
// Prometheus gauges describing the observed network state. They are only compiled when the
// `prometheus` feature is enabled and never in test builds.
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
    // Network health; `Network::refresh_metrics` stores the numeric `Health` discriminant here.
23    static ref METRIC_NETWORK_HEALTH:  hopr_metrics::SimpleGauge =
24         hopr_metrics::SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
    // Peer counts labelled by ("public" | "nonPublic") and ("high" | "low").
25    static ref METRIC_PEERS_BY_QUALITY:  hopr_metrics::MultiGauge =
26         hopr_metrics::MultiGauge::new("hopr_peers_by_quality", "Number different peer types by quality",
27            &["type", "quality"],
28        ).unwrap();
    // Total number of peers, taken from `Stats::all_count()`.
29    static ref METRIC_PEER_COUNT:  hopr_metrics::SimpleGauge =
30         hopr_metrics::SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
    // Seconds between `Network::new` and the moment recorded by `Network::refresh_metrics`.
31    static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
32        "hopr_time_to_green_sec",
33        "Time it takes for a node to transition to the GREEN network state"
34    ).unwrap();
35}
36
37/// Network health represented with colors, where green is the best and red
38/// is the worst possible observed network quality.
///
/// The explicit discriminants are significant: the value is cast to an integer when it is
/// published through the network health gauge, and the derived ordering follows them.
39#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
40pub enum Health {
41    /// Unknown health, e.g. on application startup or when the peer statistics cannot be read
42    Unknown = 0,
43    /// No usable connection to any public peer; the baseline when nothing better is observed
44    Red = 1,
45    /// Only low-quality connections to public peers (at least 1)
46    Orange = 2,
47    /// High-quality connection to at least 1 public peer, but nothing more
48    Yellow = 3,
49    /// High-quality connection to at least 1 public peer and either this node is itself public
    /// or there is a high-quality connection to at least 1 non-public (NAT) peer
50    Green = 4,
51}
52
53/// Calculate the health factor for network from the available stats
54fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
55    let mut health = Health::Red;
56
57    if stats.bad_quality_public > 0 {
58        health = Health::Orange;
59    }
60
61    if stats.good_quality_public > 0 {
62        health = if is_public || stats.good_quality_non_public > 0 {
63            Health::Green
64        } else {
65            Health::Yellow
66        };
67    }
68
69    health
70}
71
/// Reason why a peer check reported to `Network::update` did not succeed.
72#[derive(Debug, Clone, Copy)]
73pub enum UpdateFailure {
74    /// The check timed out; `Network::update` lowers the peer quality by one step and grows the backoff
75    Timeout,
76    /// The peer could not be dialed; `Network::update` zeroes the quality and ignores the peer for a penalty period
77    DialFailure,
78}
79
80/// The network object storing information about the running observed state of the network,
81/// including peers, connection qualities, and updates for other parts of the system.
///
/// Peer records themselves live in the database handle `T`; this type layers the
/// quality, backoff and ignore bookkeeping on top of it.
82#[derive(Debug)]
83pub struct Network<T> {
    // Identity of the local node; most operations refuse to act on it.
84    me: PeerId,
    // Addresses of the local node, returned by `get` when asked about `me`.
85    me_addresses: Vec<Multiaddr>,
    // Passed to the health computation; initialised to `true` in `new`.
86    am_i_public: bool,
    // Thresholds, backoff and delay parameters.
87    cfg: NetworkConfig,
    // Backing peer store.
88    db: T,
    // Creation time as a unix timestamp, used for the time-to-green metric.
89    #[cfg(all(feature = "prometheus", not(test)))]
90    started_at: Duration,
91}
92
93impl<T> Network<T>
94where
95    T: HoprDbPeersOperations + Sync + Send,
96{
97    pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
98        #[cfg(all(feature = "prometheus", not(test)))]
99        {
100            METRIC_NETWORK_HEALTH.set(0.0);
101            METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
102            METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
103            METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
104            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
105            METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
106        }
107
108        Self {
109            me: my_peer_id,
110            me_addresses: my_multiaddresses,
111            am_i_public: true,
112            cfg,
113            db,
114            #[cfg(all(feature = "prometheus", not(test)))]
115            started_at: current_time().as_unix_timestamp(),
116        }
117    }
118
119    /// Check whether the PeerId is present in the network.
120    #[tracing::instrument(level = "debug", skip(self), ret(Display))]
121    pub async fn has(&self, peer: &PeerId) -> bool {
122        peer == &self.me || self.db.get_network_peer(peer).await.is_ok_and(|p| p.is_some())
123    }
124
125    /// Add a new peer into the network.
126    ///
127    /// Each peer must have an origin specification.
128    ///
129    /// Private multiaddresses (RFC1918, loopback, link-local) are filtered out before storing
130    /// to prevent private addresses from entering the peer store,
131    /// unless [`NetworkConfig::allow_private_addresses_in_store`] is set.
132    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
133    pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> Result<()> {
134        if peer == &self.me {
135            return Err(NetworkingError::DisallowedOperationOnOwnPeerIdError);
136        }
137
138        // Filter out private addresses before storing
139        addrs.retain(|a| self.cfg.allow_private_addresses_in_store || is_public_address(a));
140
141        debug!(%peer, %origin, multiaddresses = ?addrs, "Filtered addresses, proceeding with public addresses only");
142
143        if let Some(mut peer_status) = self
144            .db
145            .get_network_peer(peer)
146            .await
147            .map_err(|e| NetworkingError::DbChainError(e.into()))?
148        {
149            debug!(%peer, %origin, multiaddresses = ?addrs, "Updating existing peer in the store");
150
151            if !peer_status.is_ignored() || matches!(origin, PeerOrigin::IncomingConnection) {
152                peer_status.ignored_until = None;
153            }
154
155            peer_status.multiaddresses.append(&mut addrs);
156            peer_status.multiaddresses = peer_status
157                .multiaddresses
158                .into_iter()
159                .collect::<HashSet<_>>()
160                .into_iter()
161                .collect::<Vec<_>>();
162            self.db
163                .update_network_peer(peer_status)
164                .await
165                .map_err(|e| NetworkingError::DbChainError(e.into()))?;
166        } else {
167            debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
168
169            self.db
170                .add_network_peer(
171                    peer,
172                    origin,
173                    addrs,
174                    self.cfg.backoff_exponent,
175                    self.cfg.quality_avg_window_size,
176                )
177                .await
178                .map_err(|e| NetworkingError::DbChainError(e.into()))?;
179        }
180
181        #[cfg(all(feature = "prometheus", not(test)))]
182        {
183            let stats = self
184                .db
185                .network_peer_stats(self.cfg.quality_bad_threshold)
186                .await
187                .map_err(|e| NetworkingError::DbChainError(e.into()))?;
188            self.refresh_metrics(&stats)
189        }
190
191        Ok(())
192    }
193
194    /// Get peer information and status.
195    ///
196    /// Private multiaddresses (RFC1918, loopback, link-local) are filtered out from the returned
197    /// PeerStatus to prevent exposure through public APIs,
198    /// unless [`NetworkConfig::allow_private_addresses_in_store`] is set.
199    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
200    pub async fn get(&self, peer: &PeerId) -> Result<Option<PeerStatus>> {
201        if peer == &self.me {
202            Ok(Some({
203                let mut ps = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
204                // Filter private addresses from self addresses before returning
205                ps.multiaddresses = self
206                    .me_addresses
207                    .iter()
208                    .filter(|addr| self.cfg.allow_private_addresses_in_store || is_public_address(addr))
209                    .cloned()
210                    .collect();
211                ps
212            }))
213        } else {
214            // Get peer info from database and filter private addresses
215            match self
216                .db
217                .get_network_peer(peer)
218                .await
219                .map_err(|e| NetworkingError::DbChainError(e.into()))?
220            {
221                Some(mut peer_status) => {
222                    // Filter out private addresses from multiaddresses before returning
223                    peer_status.multiaddresses = peer_status
224                        .multiaddresses
225                        .iter()
226                        .filter(|addr| self.cfg.allow_private_addresses_in_store || is_public_address(addr))
227                        .cloned()
228                        .collect();
229                    Ok(Some(peer_status))
230                }
231                None => Ok(None),
232            }
233        }
234    }
235
236    /// Remove peer from the network
237    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
238    pub async fn remove(&self, peer: &PeerId) -> Result<()> {
239        if peer == &self.me {
240            return Err(NetworkingError::DisallowedOperationOnOwnPeerIdError);
241        }
242
243        self.db
244            .remove_network_peer(peer)
245            .await
246            .map_err(|e| NetworkingError::DbChainError(e.into()))?;
247
248        #[cfg(all(feature = "prometheus", not(test)))]
249        {
250            let stats = self
251                .db
252                .network_peer_stats(self.cfg.quality_bad_threshold)
253                .await
254                .map_err(|e| NetworkingError::DbChainError(e.into()))?;
255            self.refresh_metrics(&stats);
256            tracing::trace!(
257                health = %health_from_stats(&stats, self.am_i_public),
258                trigger = "peer removal",
259                "Network health updated"
260            );
261        }
262
263        Ok(())
264    }
265
266    /// Updates a peer's record with the result of a heartbeat ping.
267    ///
268    /// Adjusts the peer's quality, backoff, and ignore status based on the ping outcome. If the peer's quality drops
269    /// below configured thresholds, may trigger a connection close or quality update event. Returns an error if called
270    /// on the local peer.
271    ///
272    /// # Returns
273    /// - `Ok(Some(NetworkTriggeredEvent))` if the peer's status changed and an event should be triggered.
274    /// - `Ok(None)` if the peer is unknown.
275    /// - `Err(NetworkingError)` if the operation is disallowed or a database error occurs.
276    #[tracing::instrument(level = "debug", skip(self), ret(level = "trace"), err)]
277    pub async fn update(&self, peer: &PeerId, ping_result: std::result::Result<Duration, UpdateFailure>) -> Result<()> {
278        if peer == &self.me {
279            return Err(NetworkingError::DisallowedOperationOnOwnPeerIdError);
280        }
281
282        if let Some(mut entry) = self
283            .db
284            .get_network_peer(peer)
285            .await
286            .map_err(|e| NetworkingError::DbChainError(e.into()))?
287        {
288            entry.heartbeats_sent += 1;
289
290            match ping_result {
291                Ok(latency) => {
292                    if !entry.is_ignored() {
293                        entry.ignored_until = None;
294                    }
295                    entry.last_seen = current_time();
296                    entry.last_seen_latency = latency;
297                    entry.heartbeats_succeeded += 1;
298                    // reset backoff in case of a successful ping
299                    entry.backoff = self.cfg.backoff_min;
300                    entry.update_quality(1.0_f64.min(entry.get_quality() + self.cfg.quality_step));
301                }
302                Err(error) => match error {
303                    UpdateFailure::Timeout => {
304                        tracing::trace!("Update failed with timeout");
305                        // increase backoff in case of a failed ping, but cap it at the max backoff to
306                        // prevent entries from being shut out
307                        entry.backoff = self.cfg.backoff_max.min(entry.backoff.powf(self.cfg.backoff_exponent));
308                        entry.update_quality(0.0_f64.max(entry.get_quality() - self.cfg.quality_step));
309
310                        let q = entry.get_quality();
311
312                        if q < self.cfg.quality_bad_threshold {
313                            entry.ignored_until = Some(current_time() + self.cfg.ignore_timeframe);
314                        }
315                    }
316                    UpdateFailure::DialFailure => {
317                        tracing::trace!("Update failed with dial failure");
318                        entry.update_quality(0.0_f64);
319                        entry.ignored_until = Some(
320                            current_time()
321                                + crate::config::DEFAULT_CANNOT_DIAL_PENALTY
322                                + std::time::Duration::from_secs(hopr_crypto_random::random_integer(0, Some(600))),
323                        );
324                    }
325                },
326            }
327
328            tracing::trace!(%peer, quality = entry.quality, quality_avg = hopr_primitive_types::sma::SMA::average(&entry.quality_avg), "Updating peer status in the store");
329            self.db
330                .update_network_peer(entry)
331                .await
332                .map_err(|e| NetworkingError::DbChainError(e.into()))?;
333
334            #[cfg(all(feature = "prometheus", not(test)))]
335            {
336                let stats = self
337                    .db
338                    .network_peer_stats(self.cfg.quality_bad_threshold)
339                    .await
340                    .map_err(|e| NetworkingError::DbChainError(e.into()))?;
341                self.refresh_metrics(&stats);
342                tracing::trace!(
343                    health = %health_from_stats(&stats, self.am_i_public),
344                    trigger = "peer update",
345                    "Network health updated"
346                );
347            }
348
349            Ok(())
350        } else {
351            debug!(%peer, "Ignoring update request for unknown peer");
352            Ok(())
353        }
354    }
355
356    /// Returns the quality of the network as a network health indicator.
357    pub async fn health(&self) -> Health {
358        self.db
359            .network_peer_stats(self.cfg.quality_bad_threshold)
360            .await
361            .map(|stats| health_from_stats(&stats, self.am_i_public))
362            .unwrap_or(Health::Unknown)
363    }
364
365    /// Update the internally perceived network status that is processed to the network health
366    #[cfg(all(feature = "prometheus", not(test)))]
367    fn refresh_metrics(&self, stats: &Stats) {
368        let health = health_from_stats(stats, self.am_i_public);
369
370        if METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
371            if let Some(ts) = current_time().checked_sub(self.started_at) {
372                METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
373            }
374        }
375        METRIC_PEER_COUNT.set(stats.all_count() as f64);
376        METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
377        METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
378        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
379        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
380        METRIC_NETWORK_HEALTH.set((health as i32).into());
381    }
382
383    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
384        let minimum_quality = self.cfg.quality_offline_threshold;
385        self.peer_filter(|peer| async move { (peer.get_quality() > minimum_quality).then_some(peer.id.1) })
386            .await
387    }
388
389    // ======
390    pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> Result<Vec<V>>
391    where
392        F: FnMut(PeerStatus) -> Fut,
393        Fut: std::future::Future<Output = Option<V>>,
394    {
395        Ok(self
396            .db
397            .get_network_peers(Default::default(), false)
398            .await
399            .map_err(|e| NetworkingError::DbChainError(e.into()))?
400            .filter_map(filter)
401            .collect()
402            .await)
403    }
404
405    /// Returns a list of peer IDs eligible for pinging based on last-seen time, ignore status, and backoff delay.
406    ///
407    /// Peers are filtered to exclude self, those currently within their ignore timeframe, and those whose
408    /// backoff-adjusted delay has not yet elapsed. The resulting peers are sorted by last seen time in ascending order.
409    ///
410    /// # Parameters
411    /// - `threshold`: The cutoff `SystemTime`; only peers whose next ping is due before this time are considered.
412    ///
413    /// # Returns
414    /// A vector of peer IDs that should be pinged.
415    #[tracing::instrument(level = "debug", skip(self, threshold), ret(level = "trace"), err, fields(since = ?threshold))]
416    pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> Result<Vec<PeerId>> {
417        Ok(self
418            .db
419            .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), true)
420            .await
421            .map_err(|e| NetworkingError::DbChainError(e.into()))?
422            .filter_map(|v| async move {
423                if v.id.1 == self.me {
424                    return None;
425                }
426
427                if let Some(ignore_start) = v.ignored_until {
428                    let should_be_ignored = ignore_start
429                        .checked_add(self.cfg.ignore_timeframe)
430                        .is_some_and(|v| v > threshold);
431
432                    if should_be_ignored {
433                        return None;
434                    }
435                }
436
437                let backoff = v.backoff.powf(self.cfg.backoff_exponent);
438                let delay = std::cmp::min(self.cfg.min_delay * (backoff as u32), self.cfg.max_delay);
439
440                if (v.last_seen + delay) < threshold {
441                    Some(v.id.1)
442                } else {
443                    None
444                }
445            })
446            .collect()
447            .await)
448    }
449}
450
451#[cfg(test)]
452mod tests {
453    use std::{ops::Add, time::Duration};
454
455    use anyhow::Context;
456    use hopr_crypto_types::keypairs::{Keypair, OffchainKeypair};
457    use hopr_db_node::HoprNodeDb;
458    use hopr_platform::time::native::current_time;
459    use hopr_primitive_types::prelude::AsUnixTimestamp;
460    use libp2p_identity::PeerId;
461    use more_asserts::*;
462
463    use super::*;
464    use crate::network::{Health, Network, NetworkConfig, PeerOrigin};
465
466    impl<T> Network<T>
467    where
468        T: HoprDbPeersOperations + Sync + Send,
469    {
470        /// Checks if the peer is present in the network, but it is being currently ignored.
471        async fn is_ignored(&self, peer: &PeerId) -> bool {
472            peer != &self.me && self.get(peer).await.is_ok_and(|ps| ps.is_some_and(|p| p.is_ignored()))
473        }
474    }
475
476    #[test]
477    fn test_network_health_should_serialize_to_a_proper_string() {
478        assert_eq!(format!("{}", Health::Orange), "Orange".to_owned())
479    }
480
481    #[test]
482    fn test_network_health_should_deserialize_from_proper_string() -> anyhow::Result<()> {
483        let parsed: Health = "Orange".parse()?;
484        assert_eq!(parsed, Health::Orange);
485
486        Ok(())
487    }
488
489    async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<HoprNodeDb>> {
490        let cfg = NetworkConfig {
491            quality_offline_threshold: 0.6,
492            ..Default::default()
493        };
494        Ok(Network::new(*my_id, vec![], cfg, HoprNodeDb::new_in_memory().await?))
495    }
496
497    #[test]
498    fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
499        assert_eq!(Health::Unknown as i32, 0);
500        assert_eq!(Health::Red as i32, 1);
501        assert_eq!(Health::Orange as i32, 2);
502        assert_eq!(Health::Yellow as i32, 3);
503        assert_eq!(Health::Green as i32, 4);
504    }
505
506    #[tokio::test]
507    async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
508        let me = PeerId::random();
509
510        let peers = basic_network(&me).await?;
511
512        assert!(peers.add(&me, PeerOrigin::IncomingConnection, vec![]).await.is_err());
513
514        assert_eq!(
515            0,
516            peers
517                .peer_filter(|peer| async move { Some(peer.id) })
518                .await
519                .unwrap_or(vec![])
520                .len()
521        );
522        assert!(peers.has(&me).await);
523
524        Ok(())
525    }
526
527    #[tokio::test]
528    async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
529        let expected: PeerId = OffchainKeypair::random().public().into();
530        let me: PeerId = OffchainKeypair::random().public().into();
531
532        let peers = basic_network(&me).await?;
533
534        peers.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;
535
536        assert_eq!(
537            1,
538            peers
539                .peer_filter(|peer| async move { Some(peer.id) })
540                .await
541                .unwrap_or(vec![])
542                .len()
543        );
544        assert!(peers.has(&expected).await);
545
546        Ok(())
547    }
548
549    #[tokio::test]
550    async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
551        let peer: PeerId = OffchainKeypair::random().public().into();
552        let me: PeerId = OffchainKeypair::random().public().into();
553
554        let peers = basic_network(&me).await?;
555
556        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
557
558        peers.remove(&peer).await?;
559
560        assert_eq!(
561            0,
562            peers
563                .peer_filter(|peer| async move { Some(peer.id) })
564                .await
565                .unwrap_or(vec![])
566                .len()
567        );
568        assert!(!peers.has(&peer).await);
569
570        Ok(())
571    }
572
573    #[tokio::test]
574    async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
575        let peer: PeerId = OffchainKeypair::random().public().into();
576        let me: PeerId = OffchainKeypair::random().public().into();
577
578        let peers = basic_network(&me).await?;
579
580        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
581
582        assert_eq!(
583            0,
584            peers
585                .peer_filter(|peer| async move { Some(peer.id) })
586                .await
587                .unwrap_or(vec![])
588                .len()
589        );
590        assert!(!peers.has(&peer).await);
591
592        Ok(())
593    }
594
595    #[tokio::test]
596    async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
597        let peer: PeerId = OffchainKeypair::random().public().into();
598        let me: PeerId = OffchainKeypair::random().public().into();
599
600        let peers = basic_network(&me).await?;
601
602        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
603
604        let latency = 123u64;
605
606        peers
607            .update(&peer, Ok(std::time::Duration::from_millis(latency)))
608            .await?;
609
610        let actual = peers.get(&peer).await?.expect("peer record should be present");
611
612        assert_eq!(actual.heartbeats_sent, 1);
613        assert_eq!(actual.heartbeats_succeeded, 1);
614        assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
615
616        Ok(())
617    }
618
619    #[tokio::test]
620    async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
621        let peer: PeerId = OffchainKeypair::random().public().into();
622        let me: PeerId = OffchainKeypair::random().public().into();
623
624        let peers = basic_network(&me).await?;
625
626        let ts = Duration::from_millis(100);
627
628        {
629            peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
630            peers.update(&peer, Ok(ts)).await?;
631
632            let status = peers.get(&peer).await?.context("peer should be present")?;
633
634            assert_eq!(status.last_seen_latency, ts);
635        }
636
637        let ts = Duration::from_millis(200);
638
639        {
640            peers.update(&peer, Ok(ts)).await?;
641
642            let status = peers.get(&peer).await?.context("peer should be present")?;
643
644            assert_eq!(status.last_seen_latency, ts);
645        }
646
647        Ok(())
648    }
649
650    #[tokio::test]
651    async fn network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time()
652    -> anyhow::Result<()> {
653        let peer: PeerId = OffchainKeypair::random().public().into();
654        let me: PeerId = OffchainKeypair::random().public().into();
655
656        let peers = basic_network(&me).await?;
657
658        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
659
660        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
661        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
662        peers.update(&peer, Err(UpdateFailure::Timeout)).await?; // should drop to ignored
663
664        peers
665            .update(&peer, Err(UpdateFailure::Timeout))
666            .await
667            .expect("no error should occur"); // should drop from network
668
669        assert!(peers.is_ignored(&peer).await);
670
671        // peer should remain ignored and not be added
672        peers.add(&peer, PeerOrigin::ManualPing, vec![]).await?;
673
674        assert!(peers.is_ignored(&peer).await);
675
676        Ok(())
677    }
678
679    #[tokio::test]
680    async fn network_should_stop_ignoring_a_peer_that_has_reached_lower_thresholds_but_connected_back()
681    -> anyhow::Result<()> {
682        let peer: PeerId = OffchainKeypair::random().public().into();
683        let me: PeerId = OffchainKeypair::random().public().into();
684
685        let peers = basic_network(&me).await?;
686
687        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
688
689        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
690        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
691        peers.update(&peer, Err(UpdateFailure::Timeout)).await?; // should drop to ignored
692
693        peers
694            .update(&peer, Err(UpdateFailure::Timeout))
695            .await
696            .expect("no error should occur"); // should drop from network
697
698        assert!(peers.is_ignored(&peer).await);
699
700        // peer should remain ignored and not be added
701        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
702
703        assert!(!peers.is_ignored(&peer).await);
704
705        Ok(())
706    }
707
708    #[tokio::test]
709    async fn network_should_ignore_a_peer_that_could_not_be_dialed() -> anyhow::Result<()> {
710        let peer: PeerId = OffchainKeypair::random().public().into();
711        let me: PeerId = OffchainKeypair::random().public().into();
712
713        let peers = basic_network(&me).await?;
714
715        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
716
717        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
718        peers.update(&peer, Err(UpdateFailure::DialFailure)).await?; // should drop to ignored
719
720        assert!(peers.is_ignored(&peer).await);
721
722        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
723
724        assert!(!peers.is_ignored(&peer).await);
725
726        Ok(())
727    }
728
729    #[tokio::test]
730    async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
731        let peer: PeerId = OffchainKeypair::random().public().into();
732        let me: PeerId = OffchainKeypair::random().public().into();
733
734        let peers = basic_network(&me).await?;
735
736        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
737
738        // Needs to do 3 pings, so we get over the ignore threshold limit
739        // when doing the 4th failed ping
740        peers
741            .update(&peer, Ok(std::time::Duration::from_millis(123_u64)))
742            .await?;
743        peers
744            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
745            .await?;
746        peers
747            .update(&peer, Ok(std::time::Duration::from_millis(200_u64)))
748            .await?;
749
750        peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
751
752        let actual = peers.get(&peer).await?.expect("the peer record should be present");
753
754        assert_eq!(actual.heartbeats_succeeded, 3);
755        assert_lt!(actual.backoff, 3f64);
756        assert_gt!(actual.backoff, 2f64);
757
758        Ok(())
759    }
760
761    #[tokio::test]
762    async fn test_network_should_not_overflow_max_backoff() -> anyhow::Result<()> {
763        let peer: PeerId = OffchainKeypair::random().public().into();
764        let me: PeerId = OffchainKeypair::random().public().into();
765
766        let peers = basic_network(&me).await?;
767
768        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
769
770        for latency in [123_u64, 200_u64, 200_u64] {
771            peers
772                .update(&peer, Ok(std::time::Duration::from_millis(latency)))
773                .await?;
774        }
775
776        // iterate until max backoff is reached
777        loop {
778            let updated_peer = peers.get(&peer).await?.expect("the peer record should be present");
779            if updated_peer.backoff == peers.cfg.backoff_max {
780                break;
781            }
782
783            peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
784        }
785
786        // perform one more failing heartbeat update and ensure max backoff is not exceeded
787        peers.update(&peer, Err(UpdateFailure::Timeout)).await?;
788        let actual = peers.get(&peer).await?.expect("the peer record should be present");
789
790        assert_eq!(actual.backoff, peers.cfg.backoff_max);
791
792        Ok(())
793    }
794
795    #[tokio::test]
796    async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference()
797    -> anyhow::Result<()> {
798        let first: PeerId = OffchainKeypair::random().public().into();
799        let second: PeerId = OffchainKeypair::random().public().into();
800        let me: PeerId = OffchainKeypair::random().public().into();
801
802        let peers = basic_network(&me).await?;
803
804        peers.add(&first, PeerOrigin::IncomingConnection, vec![]).await?;
805        peers.add(&second, PeerOrigin::IncomingConnection, vec![]).await?;
806
807        let latency = 77_u64;
808
809        let mut expected = vec![first, second];
810        expected.sort();
811
812        peers
813            .update(&first, Ok(std::time::Duration::from_millis(latency)))
814            .await?;
815        peers
816            .update(&second, Ok(std::time::Duration::from_millis(latency)))
817            .await?;
818
819        // assert_eq!(
820        //     format!(
821        //         "{:?}",
822        //         peers.should_still_be_ignored(&peers.get(&first).await.unwrap().unwrap())
823        //     ),
824        //     ""
825        // );
826        // assert_eq!(format!("{:?}", peers.get(&first).await), "");
827
828        let mut actual = peers
829            .find_peers_to_ping(current_time().add(Duration::from_secs(2u64)))
830            .await?;
831        actual.sort();
832
833        assert_eq!(actual, expected);
834
835        Ok(())
836    }
837
838    #[tokio::test]
839    async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
840        let me: PeerId = OffchainKeypair::random().public().into();
841
842        let peers = basic_network(&me).await?;
843
844        assert_eq!(peers.health().await, Health::Red);
845
846        Ok(())
847    }
848
849    #[tokio::test]
850    async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
851        let peer: PeerId = OffchainKeypair::random().public().into();
852        let me: PeerId = OffchainKeypair::random().public().into();
853
854        let peers = basic_network(&me).await?;
855
856        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
857
858        // all peers are public
859        assert_eq!(peers.health().await, Health::Orange);
860
861        Ok(())
862    }
863
864    #[tokio::test]
865    async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
866        let peer: PeerId = OffchainKeypair::random().public().into();
867        let me: PeerId = OffchainKeypair::random().public().into();
868
869        let peers = basic_network(&me).await?;
870
871        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
872        let _ = peers.health().await;
873        peers.remove(&peer).await?;
874
875        assert_eq!(peers.health().await, Health::Red);
876
877        Ok(())
878    }
879
880    #[tokio::test]
881    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
882        let peer: PeerId = OffchainKeypair::random().public().into();
883        let me: PeerId = OffchainKeypair::random().public().into();
884
885        let cfg = NetworkConfig {
886            quality_offline_threshold: 0.6,
887            ..Default::default()
888        };
889
890        let peers = Network::new(me, vec![], cfg, HoprNodeDb::new_in_memory().await?);
891
892        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
893
894        peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
895
896        assert_eq!(peers.health().await, Health::Orange);
897
898        Ok(())
899    }
900
901    #[tokio::test]
902    async fn network_should_allow_the_quality_to_go_to_0() -> anyhow::Result<()> {
903        let peer: PeerId = OffchainKeypair::random().public().into();
904        let public = peer;
905        let me: PeerId = OffchainKeypair::random().public().into();
906
907        let cfg = NetworkConfig {
908            quality_offline_threshold: 0.6,
909            ..Default::default()
910        };
911
912        let peers = Network::new(me, vec![], cfg, HoprNodeDb::new_in_memory().await?);
913
914        peers.add(&peer, PeerOrigin::NetworkRegistry, vec![]).await?;
915
916        assert!(peers.update(&peer, Err(UpdateFailure::Timeout)).await.is_ok());
917
918        assert!(peers.is_ignored(&public).await);
919
920        Ok(())
921    }
922
923    #[tokio::test]
924    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public()
925    -> anyhow::Result<()> {
926        let me: PeerId = OffchainKeypair::random().public().into();
927        let peer: PeerId = OffchainKeypair::random().public().into();
928
929        let cfg = NetworkConfig {
930            quality_offline_threshold: 0.3,
931            ..Default::default()
932        };
933
934        let peers = Network::new(me, vec![], cfg, HoprNodeDb::new_in_memory().await?);
935
936        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
937
938        for _ in 0..3 {
939            peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
940        }
941
942        assert_eq!(peers.health().await, Health::Green);
943
944        Ok(())
945    }
946
947    #[tokio::test]
948    async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public()
949    -> anyhow::Result<()> {
950        let peer: PeerId = OffchainKeypair::random().public().into();
951        let peer2: PeerId = OffchainKeypair::random().public().into();
952
953        let cfg = NetworkConfig {
954            quality_offline_threshold: 0.3,
955            ..Default::default()
956        };
957
958        let peers = Network::new(
959            OffchainKeypair::random().public().into(),
960            vec![],
961            cfg,
962            HoprNodeDb::new_in_memory().await?,
963        );
964
965        peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
966        peers.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;
967
968        for _ in 0..3 {
969            peers.update(&peer2, Ok(current_time().as_unix_timestamp())).await?;
970            peers.update(&peer, Ok(current_time().as_unix_timestamp())).await?;
971        }
972
973        assert_eq!(peers.health().await, Health::Green);
974
975        Ok(())
976    }
977
978    #[tokio::test]
979    #[ignore] // untestable without a feature flag
980    async fn network_add_should_filter_private_multiaddresses() -> anyhow::Result<()> {
981        use multiaddr::Multiaddr;
982
983        let peer: PeerId = OffchainKeypair::random().public().into();
984        let me: PeerId = OffchainKeypair::random().public().into();
985
986        // Create multiaddresses with both public and private addresses
987        let private_addr1: Multiaddr = "/ip4/192.168.1.100/tcp/9091".parse()?;
988        let private_addr2: Multiaddr = "/ip4/10.0.0.1/tcp/9092".parse()?;
989        let private_addr3: Multiaddr = "/ip4/127.0.0.1/tcp/9093".parse()?;
990        let public_addr: Multiaddr = "/ip4/8.8.8.8/tcp/9094".parse()?;
991
992        let mixed_addresses = vec![
993            private_addr1.clone(),
994            public_addr.clone(),
995            private_addr2.clone(),
996            private_addr3.clone(),
997        ];
998
999        let peers = Network::new(
1000            me,
1001            vec![public_addr.clone()], // Set only public address for self
1002            Default::default(),
1003            HoprNodeDb::new_in_memory().await?,
1004        );
1005
1006        // Add peer with mixed addresses - private addresses should be filtered out during add
1007        peers.add(&peer, PeerOrigin::NetworkRegistry, mixed_addresses).await?;
1008
1009        // Get the peer info - should only contain public addresses since private ones were filtered during add
1010        let peer_status = peers.get(&peer).await?.context("peer should be present")?;
1011
1012        // Verify only public addresses remain in the database
1013        assert_eq!(
1014            peer_status.multiaddresses.len(),
1015            1,
1016            "Should only have 1 public address stored"
1017        );
1018        assert_eq!(
1019            peer_status.multiaddresses[0], public_addr,
1020            "Should only contain the public address"
1021        );
1022
1023        // Test self peer filtering too (get method still filters self addresses as defensive measure)
1024        let self_status = peers.get(&me).await?.context("self peer should be present")?;
1025
1026        // Verify only public addresses remain for self
1027        assert_eq!(
1028            self_status.multiaddresses.len(),
1029            1,
1030            "Self should only have 1 public address"
1031        );
1032        assert_eq!(
1033            self_status.multiaddresses[0], public_addr,
1034            "Self should only contain the public address"
1035        );
1036
1037        Ok(())
1038    }
1039
1040    #[tokio::test]
1041    #[ignore] // untestable without a feature flag
1042    async fn network_get_should_filter_private_multiaddresses_as_defensive_measure() -> anyhow::Result<()> {
1043        use multiaddr::Multiaddr;
1044
1045        let me: PeerId = OffchainKeypair::random().public().into();
1046
1047        // Create multiaddresses with both public and private addresses for self
1048        let private_addr1: Multiaddr = "/ip4/192.168.1.100/tcp/9091".parse()?;
1049        let public_addr: Multiaddr = "/ip4/8.8.8.8/tcp/9094".parse()?;
1050
1051        let mixed_self_addresses = vec![private_addr1.clone(), public_addr.clone()];
1052
1053        // Create network with mixed addresses for self (simulating if somehow private addresses get set)
1054        let peers = Network::new(
1055            me,
1056            mixed_self_addresses,
1057            Default::default(),
1058            HoprNodeDb::new_in_memory().await?,
1059        );
1060
1061        // Test that get() method filters self addresses as a defensive measure
1062        let self_status = peers.get(&me).await?.context("self peer should be present")?;
1063
1064        // Verify only public addresses remain for self
1065        assert_eq!(
1066            self_status.multiaddresses.len(),
1067            1,
1068            "Self should only have 1 public address"
1069        );
1070        assert_eq!(
1071            self_status.multiaddresses[0], public_addr,
1072            "Self should only contain the public address"
1073        );
1074
1075        Ok(())
1076    }
1077}