use std::collections::hash_set::HashSet;
use std::time::{Duration, SystemTime};
use futures::StreamExt;
use libp2p_identity::PeerId;
use multiaddr::Multiaddr;
use tracing::debug;
pub use hopr_db_api::peers::{HoprDbPeersOperations, PeerOrigin, PeerSelector, PeerStatus, Stats};
use hopr_platform::time::native::current_time;
use crate::config::NetworkConfig;
#[cfg(all(feature = "prometheus", not(test)))]
use {
hopr_metrics::metrics::{MultiGauge, SimpleGauge},
hopr_primitive_types::prelude::*,
};
// Prometheus gauges published by this module.
// They are compiled only when the `prometheus` feature is enabled and never in test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
// Numeric value of the current `Health` variant, written by `Network::refresh_metrics`.
static ref METRIC_NETWORK_HEALTH: SimpleGauge =
SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
// Peer counts split by the labels `type` ("public" / "nonPublic") and `quality` ("high" / "low").
static ref METRIC_PEERS_BY_QUALITY: MultiGauge =
MultiGauge::new("hopr_peers_by_quality", "Number different peer types by quality",
&["type", "quality"],
).unwrap();
// Total number of peers as reported by `Stats::all_count`.
static ref METRIC_PEER_COUNT: SimpleGauge =
SimpleGauge::new("hopr_peer_count", "Number of all peers").unwrap();
// Seconds elapsed since the `Network` was constructed; `Network::refresh_metrics` only writes it
// while the gauge still reads below 0.5, i.e. while it has not been set to a meaningful value yet.
static ref METRIC_NETWORK_HEALTH_TIME_TO_GREEN: SimpleGauge = SimpleGauge::new(
"hopr_time_to_green_sec",
"Time it takes for a node to transition to the GREEN network state"
).unwrap();
}
/// Coarse connectivity health of the local node, derived from peer statistics.
///
/// The explicit discriminants define both the ordering (`Unknown < Red < ... < Green`)
/// and the numeric value exported through the network health gauge.
/// `strum` provides the string conversions in both directions (e.g. `"Orange"`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, strum::Display, strum::EnumString)]
pub enum Health {
/// The peer statistics could not be obtained.
Unknown = 0,
/// No public peer (neither good nor bad quality) is counted in the statistics.
Red = 1,
/// Only bad-quality public peers are known.
Orange = 2,
/// A good-quality public peer exists, but the node is not public and has no
/// good-quality non-public peer.
Yellow = 3,
/// A good-quality public peer exists and the node is either public itself or
/// also has a good-quality non-public peer.
Green = 4,
}
/// Event produced by [`Network::update`] after a heartbeat result has been recorded.
#[derive(Debug, Clone, PartialEq, strum::Display)]
pub enum NetworkTriggeredEvent {
/// The peer's quality is at or below the configured offline threshold.
CloseConnection(PeerId),
/// The peer's quality was updated to the carried value and is above the offline threshold.
UpdateQuality(PeerId, f64),
}
/// Derives the network [`Health`] from the aggregated peer statistics.
///
/// * `Green`  - a good-quality public peer exists and the node is public itself
///   (`is_public`) or additionally knows a good-quality non-public peer.
/// * `Yellow` - a good-quality public peer exists, but neither of the above holds.
/// * `Orange` - no good-quality public peer, but at least one bad-quality public peer.
/// * `Red`    - no public peers are counted at all.
fn health_from_stats(stats: &Stats, is_public: bool) -> Health {
    let has_good_public = stats.good_quality_public > 0;
    let has_bad_public = stats.bad_quality_public > 0;

    match (has_good_public, has_bad_public) {
        // A good public peer always outranks the presence of bad public peers.
        (true, _) if is_public || stats.good_quality_non_public > 0 => Health::Green,
        (true, _) => Health::Yellow,
        (false, true) => Health::Orange,
        (false, false) => Health::Red,
    }
}
/// View of the peers known to the local node, backed by a peer database `T`.
///
/// The local node itself (`me`) is never stored in the database; operations
/// targeting it are either answered synthetically or rejected.
#[derive(Debug)]
pub struct Network<T>
where
T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
{
/// Peer ID of the local node.
me: PeerId,
/// Multiaddresses reported for the local node when it is looked up via `get`.
me_addresses: Vec<Multiaddr>,
/// Whether the local node is considered public; used when computing the health.
am_i_public: bool,
/// Thresholds, backoff and timing parameters.
cfg: NetworkConfig,
/// Storage of peer records.
db: T,
/// Unix timestamp taken at construction; base for the time-to-green metric.
#[cfg(all(feature = "prometheus", not(test)))]
started_at: Duration,
}
impl<T> Network<T>
where
T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
{
/// Creates a new network view for the local node.
///
/// # Arguments
/// * `my_peer_id` - peer ID of the local node
/// * `my_multiaddresses` - addresses reported when the local node is looked up
/// * `cfg` - network configuration (thresholds, backoff, timings)
/// * `db` - peer storage backend
///
/// With the `prometheus` feature enabled (outside of tests) all gauges of this
/// module are reset to zero.
pub fn new(my_peer_id: PeerId, my_multiaddresses: Vec<Multiaddr>, cfg: NetworkConfig, db: T) -> Self {
    // Start every gauge from a known baseline.
    #[cfg(all(feature = "prometheus", not(test)))]
    {
        METRIC_NETWORK_HEALTH.set(0.0);
        METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(0.0);
        METRIC_PEERS_BY_QUALITY.set(&["public", "high"], 0.0);
        METRIC_PEERS_BY_QUALITY.set(&["public", "low"], 0.0);
        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], 0.0);
        METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], 0.0);
    }

    Self {
        me: my_peer_id,
        me_addresses: my_multiaddresses,
        // The node is assumed to be public; no code visible here changes the flag later.
        am_i_public: true,
        // `cfg` is owned by this function, so it is moved instead of cloned.
        cfg,
        db,
        #[cfg(all(feature = "prometheus", not(test)))]
        started_at: current_time().as_unix_timestamp(),
    }
}
/// Returns `true` if `peer` is the local node or has a record in the peer store.
///
/// Storage errors are treated as "not present".
pub async fn has(&self, peer: &PeerId) -> bool {
    if *peer == self.me {
        return true;
    }

    matches!(self.db.get_network_peer(peer).await, Ok(Some(_)))
}
/// Returns `true` if `peer` is known and currently within its ignore timeframe.
///
/// The local node is never ignored; unknown peers and lookup errors yield `false`.
pub async fn is_ignored(&self, peer: &PeerId) -> bool {
    if peer == &self.me {
        return false;
    }

    match self.get(peer).await {
        Ok(Some(status)) => status.is_ignored(current_time(), self.cfg.ignore_timeframe),
        _ => false,
    }
}
/// Registers `peer` in the store or merges `addrs` into its existing record.
///
/// * A known peer that is still ignored is left untouched.
/// * A known peer that is not ignored gets its `ignored` marker cleared and the
///   new addresses merged in; duplicates are removed (resulting order is unspecified).
/// * An unknown peer is inserted with the configured backoff exponent and
///   quality averaging window.
///
/// # Errors
/// Fails when `peer` is the local node or when the storage operation fails.
pub async fn add(&self, peer: &PeerId, origin: PeerOrigin, mut addrs: Vec<Multiaddr>) -> crate::errors::Result<()> {
    if *peer == self.me {
        return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
    }

    match self.db.get_network_peer(peer).await? {
        Some(mut known) => {
            let still_ignored = known.is_ignored(current_time(), self.cfg.ignore_timeframe);
            if !still_ignored {
                known.ignored = None;

                // Union of the stored and the newly supplied addresses.
                let merged: HashSet<Multiaddr> = std::mem::take(&mut known.multiaddresses)
                    .into_iter()
                    .chain(addrs.drain(..))
                    .collect();
                known.multiaddresses = merged.into_iter().collect();

                self.db.update_network_peer(known).await?;
            }
        }
        None => {
            debug!(%peer, %origin, multiaddresses = ?addrs, "Adding peer to the store");
            self.db
                .add_network_peer(
                    peer,
                    origin,
                    addrs,
                    self.cfg.backoff_exponent,
                    self.cfg.quality_avg_window_size,
                )
                .await?;
        }
    }

    #[cfg(all(feature = "prometheus", not(test)))]
    {
        let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
        self.refresh_metrics(&stats);
    }

    Ok(())
}
/// Looks up the status record of `peer`.
///
/// For the local node a synthetic record is built on the fly (origin
/// `Initialization`, carrying the node's own multiaddresses); every other peer
/// is fetched from the store.
pub async fn get(&self, peer: &PeerId) -> crate::errors::Result<Option<PeerStatus>> {
    if peer != &self.me {
        return Ok(self.db.get_network_peer(peer).await?);
    }

    let mut own_status = PeerStatus::new(*peer, PeerOrigin::Initialization, 0.0f64, 2u32);
    own_status.multiaddresses.clone_from(&self.me_addresses);
    Ok(Some(own_status))
}
/// Deletes the record of `peer` from the store.
///
/// # Errors
/// Fails when `peer` is the local node or when the storage operation fails.
pub async fn remove(&self, peer: &PeerId) -> crate::errors::Result<()> {
    let targets_self = *peer == self.me;
    if targets_self {
        return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
    }

    self.db.remove_network_peer(peer).await?;

    #[cfg(all(feature = "prometheus", not(test)))]
    {
        let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
        self.refresh_metrics(&stats);
    }

    Ok(())
}
/// Records the outcome of a heartbeat towards `peer`.
///
/// * `ping_result` - `Ok(latency)` for a successful ping, `Err(())` for a failed one
/// * `version` - version reported by the peer; always overwrites the stored one
///
/// Returns `Ok(None)` for unknown peers. Otherwise returns
/// [`NetworkTriggeredEvent::CloseConnection`] when the resulting quality is at or
/// below the offline threshold and [`NetworkTriggeredEvent::UpdateQuality`] otherwise.
///
/// # Errors
/// Fails when `peer` is the local node or when the storage operation fails.
pub async fn update(
    &self,
    peer: &PeerId,
    ping_result: std::result::Result<Duration, ()>,
    version: Option<String>,
) -> crate::errors::Result<Option<NetworkTriggeredEvent>> {
    if *peer == self.me {
        return Err(crate::errors::NetworkingError::DisallowedOperationOnOwnPeerIdError);
    }

    let Some(mut entry) = self.db.get_network_peer(peer).await? else {
        debug!(%peer, "Ignoring update request for unknown peer");
        return Ok(None);
    };

    // An expired ignore marker is dropped before the new observation is applied.
    if !entry.is_ignored(current_time(), self.cfg.ignore_timeframe) {
        entry.ignored = None;
    }

    entry.heartbeats_sent += 1;
    entry.peer_version = version;

    match ping_result {
        Ok(latency) => {
            entry.last_seen = current_time();
            entry.last_seen_latency = latency;
            entry.heartbeats_succeeded += 1;
            entry.backoff = self.cfg.backoff_min;

            let raised = 1.0_f64.min(entry.get_quality() + self.cfg.quality_step);
            entry.update_quality(raised);
        }
        Err(()) => {
            // NOTE(review): `max` makes `backoff_max` a lower bound here rather than a cap;
            // the existing tests rely on this value - confirm the intent before changing.
            let grown = entry.backoff.powf(self.cfg.backoff_exponent);
            entry.backoff = self.cfg.backoff_max.max(grown);

            let lowered = 0.0_f64.max(entry.get_quality() - self.cfg.quality_step);
            entry.update_quality(lowered);

            if entry.get_quality() < self.cfg.quality_bad_threshold {
                entry.ignored = Some(current_time());
            }
        }
    }

    // Capture what the event needs before the record is handed over to the store.
    let peer_id = entry.id.1;
    let quality = entry.get_quality();
    self.db.update_network_peer(entry).await?;

    #[cfg(all(feature = "prometheus", not(test)))]
    {
        let stats = self.db.network_peer_stats(self.cfg.quality_bad_threshold).await?;
        self.refresh_metrics(&stats);
    }

    let event = if quality <= self.cfg.quality_offline_threshold {
        NetworkTriggeredEvent::CloseConnection(peer_id)
    } else {
        NetworkTriggeredEvent::UpdateQuality(peer_id, quality)
    };
    Ok(Some(event))
}
/// Computes the current network health from the stored peer statistics.
///
/// Yields [`Health::Unknown`] when the statistics cannot be obtained.
pub async fn health(&self) -> Health {
    match self.db.network_peer_stats(self.cfg.quality_bad_threshold).await {
        Ok(stats) => health_from_stats(&stats, self.am_i_public),
        Err(_) => Health::Unknown,
    }
}
/// Publishes the given peer statistics and the derived health to the Prometheus gauges.
///
/// The time-to-green gauge is written only once: on the first refresh where the
/// health is `Green` while the gauge still holds its initial (near-zero) value.
#[cfg(all(feature = "prometheus", not(test)))]
fn refresh_metrics(&self, stats: &Stats) {
    let health = health_from_stats(stats, self.am_i_public);

    // Record the elapsed time only when the node has actually reached GREEN;
    // the `< 0.5` check keeps an already recorded value from being overwritten.
    if health == Health::Green && METRIC_NETWORK_HEALTH_TIME_TO_GREEN.get() < 0.5f64 {
        if let Some(ts) = current_time().checked_sub(self.started_at) {
            METRIC_NETWORK_HEALTH_TIME_TO_GREEN.set(ts.as_unix_timestamp().as_secs_f64());
        }
    }

    METRIC_PEER_COUNT.set(stats.all_count() as f64);
    METRIC_PEERS_BY_QUALITY.set(&["public", "high"], stats.good_quality_public as f64);
    METRIC_PEERS_BY_QUALITY.set(&["public", "low"], stats.bad_quality_public as f64);
    METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "high"], stats.good_quality_non_public as f64);
    METRIC_PEERS_BY_QUALITY.set(&["nonPublic", "low"], stats.bad_quality_non_public as f64);
    METRIC_NETWORK_HEALTH.set((health as i32).into());
}
/// Lists the IDs of all stored peers whose quality is strictly above the offline threshold.
pub async fn connected_peers(&self) -> crate::errors::Result<Vec<PeerId>> {
    let offline_threshold = self.cfg.quality_offline_threshold;

    self.peer_filter(move |status| async move {
        if status.get_quality() > offline_threshold {
            Some(status.id.1)
        } else {
            None
        }
    })
    .await
}
/// Streams all stored peers through `filter` and collects every `Some` result.
///
/// # Errors
/// Fails when the peer stream cannot be obtained from the store.
pub(crate) async fn peer_filter<Fut, V, F>(&self, filter: F) -> crate::errors::Result<Vec<V>>
where
    F: FnMut(PeerStatus) -> Fut,
    Fut: std::future::Future<Output = Option<V>>,
{
    let all_peers = self.db.get_network_peers(Default::default(), false).await?;
    futures::pin_mut!(all_peers);

    let selected: Vec<V> = all_peers.filter_map(filter).collect().await;
    Ok(selected)
}
/// Selects the peers that are due for a ping, ordered from the least to the most recently seen.
///
/// A stored peer last seen at or before `threshold` is selected when all of the following hold:
/// * it is not the local node,
/// * its ignore period (if any) has ended by `threshold`,
/// * its last-seen time plus the backoff-derived delay lies before `threshold`.
///
/// The delay is `min_delay * backoff^backoff_exponent`, capped at `max_delay`.
///
/// # Errors
/// Fails when the peer stream cannot be obtained from the store.
pub async fn find_peers_to_ping(&self, threshold: SystemTime) -> crate::errors::Result<Vec<PeerId>> {
    let stream = self
        .db
        .get_network_peers(PeerSelector::default().with_last_seen_lte(threshold), false)
        .await?;
    futures::pin_mut!(stream);

    let mut data: Vec<PeerStatus> = stream
        .filter_map(|v| async move {
            if v.id.1 == self.me {
                return None;
            }

            if let Some(ignore_start) = v.ignored {
                let should_be_ignored = ignore_start
                    .checked_add(self.cfg.ignore_timeframe)
                    .is_some_and(|ignored_until| ignored_until > threshold);
                if should_be_ignored {
                    return None;
                }
            }

            let backoff = v.backoff.powf(self.cfg.backoff_exponent);
            // Saturate instead of panicking on overflow; the result is capped by `max_delay` anyway.
            let delay = self
                .cfg
                .min_delay
                .saturating_mul(backoff as u32)
                .min(self.cfg.max_delay);

            // An unrepresentable due time lies beyond any threshold, i.e. the peer is not due.
            v.last_seen
                .checked_add(delay)
                .is_some_and(|due| due < threshold)
                .then_some(v)
        })
        .collect()
        .await;

    // Stable sort with a proper total order: equal timestamps compare as `Equal`.
    data.sort_by_key(|status| status.last_seen);

    Ok(data.into_iter().map(|peer| peer.id.1).collect())
}
}
#[cfg(test)]
mod tests {
use crate::network::{Health, Network, NetworkConfig, NetworkTriggeredEvent, PeerOrigin};
use anyhow::Context;
use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
use hopr_platform::time::native::current_time;
use hopr_primitive_types::prelude::AsUnixTimestamp;
use libp2p_identity::PeerId;
use std::ops::Add;
use std::time::Duration;
/// `Health` renders as the bare variant name.
#[test]
fn test_network_health_should_serialize_to_a_proper_string() {
    let rendered = Health::Orange.to_string();
    assert_eq!(rendered, "Orange");
}
/// `Health` parses back from the bare variant name.
#[test]
fn test_network_health_should_deserialize_from_proper_string() -> Result<(), Box<dyn std::error::Error>> {
    let parsed = "Orange".parse::<Health>()?;
    assert_eq!(parsed, Health::Orange);
    Ok(())
}
/// Builds a `Network` for `my_id` on top of a fresh in-memory database,
/// using the default configuration with the offline threshold raised to 0.6.
async fn basic_network(my_id: &PeerId) -> anyhow::Result<Network<hopr_db_sql::db::HoprDb>> {
    let cfg = {
        let mut cfg = NetworkConfig::default();
        cfg.quality_offline_threshold = 0.6;
        cfg
    };
    let db = hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?;

    Ok(Network::new(*my_id, vec![], cfg, db))
}
/// The numeric discriminants are exported as a metric, so they must stay fixed.
#[test]
fn test_network_health_should_be_ordered_numerically_for_hopr_metrics_output() {
    let expected_values = [
        (Health::Unknown, 0),
        (Health::Red, 1),
        (Health::Orange, 2),
        (Health::Yellow, 3),
        (Health::Green, 4),
    ];

    for (health, value) in expected_values {
        assert_eq!(health as i32, value);
    }
}
/// Adding the local node is rejected, yet `has` still reports it as present.
#[async_std::test]
async fn test_network_should_not_be_able_to_add_self_reference() -> anyhow::Result<()> {
    let me = PeerId::random();
    let network = basic_network(&me).await?;

    let outcome = network.add(&me, PeerOrigin::IncomingConnection, vec![]).await;
    assert!(outcome.is_err());

    let stored = network
        .peer_filter(|status| async move { Some(status.id) })
        .await
        .unwrap_or_default();
    assert_eq!(0, stored.len());

    assert!(network.has(&me).await);
    Ok(())
}
/// A freshly added peer is stored and reported by `has`.
#[async_std::test]
async fn test_network_should_contain_a_registered_peer() -> anyhow::Result<()> {
    let expected: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    network.add(&expected, PeerOrigin::IncomingConnection, vec![]).await?;

    let stored = network
        .peer_filter(|status| async move { Some(status.id) })
        .await
        .unwrap_or_default();
    assert_eq!(1, stored.len());

    assert!(network.has(&expected).await);
    Ok(())
}
/// Removing a peer deletes its record from the store.
#[async_std::test]
async fn test_network_should_remove_a_peer_on_unregistration() -> anyhow::Result<()> {
    let peer: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    network.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
    network.remove(&peer).await?;

    let stored = network
        .peer_filter(|status| async move { Some(status.id) })
        .await
        .unwrap_or_default();
    assert_eq!(0, stored.len());

    assert!(!network.has(&peer).await);
    Ok(())
}
/// A heartbeat result for an unknown peer must not create a record.
#[async_std::test]
async fn test_network_should_ignore_heartbeat_updates_for_peers_that_were_not_registered() -> anyhow::Result<()> {
    let peer: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    let latency = current_time().as_unix_timestamp();
    network.update(&peer, Ok(latency), None).await?;

    let stored = network
        .peer_filter(|status| async move { Some(status.id) })
        .await
        .unwrap_or_default();
    assert_eq!(0, stored.len());

    assert!(!network.has(&peer).await);
    Ok(())
}
#[async_std::test]
async fn test_network_should_be_able_to_register_a_succeeded_heartbeat_result() -> anyhow::Result<()> {
let peer: PeerId = OffchainKeypair::random().public().into();
let me: PeerId = OffchainKeypair::random().public().into();
let peers = basic_network(&me).await?;
peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
let latency = 123u64;
peers
.update(&peer, Ok(std::time::Duration::from_millis(latency)), None)
.await?;
let actual = peers.get(&peer).await?.expect("peer record should be present");
assert_eq!(actual.heartbeats_sent, 1);
assert_eq!(actual.heartbeats_succeeded, 1);
assert_eq!(actual.last_seen_latency, std::time::Duration::from_millis(latency));
Ok(())
}
/// Every update overwrites the stored peer version with the reported one.
#[async_std::test]
async fn test_network_update_should_merge_metadata() -> anyhow::Result<()> {
    let peer: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    network.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

    for version in ["1.2.4", "2.0.0"] {
        let expected_version = Some(version.to_string());
        let latency = current_time().as_unix_timestamp();

        network.update(&peer, Ok(latency), expected_version.clone()).await?;

        let status = network.get(&peer).await?.context("peer should be present")?;
        assert_eq!(status.peer_version, expected_version);
    }

    Ok(())
}
/// A peer whose quality dropped after failed heartbeats becomes ignored and
/// stays ignored even when it is added again.
#[async_std::test]
async fn test_network_should_ignore_a_peer_that_has_reached_lower_thresholds_a_specified_amount_of_time(
) -> anyhow::Result<()> {
    let peer: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    network.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

    // Two successful heartbeats first ...
    for _ in 0..2 {
        let latency = current_time().as_unix_timestamp();
        network.update(&peer, Ok(latency), None).await?;
    }

    // ... followed by two failures.
    network.update(&peer, Err(()), None).await?;
    network
        .update(&peer, Err(()), None)
        .await
        .expect("no error should occur");
    assert!(network.is_ignored(&peer).await);

    // Re-adding must not lift the ignore marker.
    network.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
    assert!(network.is_ignored(&peer).await);
    Ok(())
}
#[async_std::test]
async fn test_network_should_be_able_to_register_a_failed_heartbeat_result() -> anyhow::Result<()> {
let peer: PeerId = OffchainKeypair::random().public().into();
let me: PeerId = OffchainKeypair::random().public().into();
let peers = basic_network(&me).await?;
peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
peers
.update(&peer, Ok(std::time::Duration::from_millis(123_u64)), None)
.await?;
peers
.update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
.await?;
peers
.update(&peer, Ok(std::time::Duration::from_millis(200_u64)), None)
.await?;
peers.update(&peer, Err(()), None).await?;
let actual = peers.get(&peer).await?.expect("the peer record should be present");
assert_eq!(actual.heartbeats_succeeded, 3);
assert_eq!(actual.backoff, 300f64);
Ok(())
}
/// Peers seen before the reference time are returned as ping candidates.
#[async_std::test]
async fn test_network_peer_should_be_listed_for_the_ping_if_last_recorded_later_than_reference(
) -> anyhow::Result<()> {
    let first: PeerId = OffchainKeypair::random().public().into();
    let second: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    for candidate in [first, second] {
        network.add(&candidate, PeerOrigin::IncomingConnection, vec![]).await?;
    }

    let mut expected = vec![first, second];
    expected.sort();

    let latency = Duration::from_millis(77);
    for candidate in [first, second] {
        network.update(&candidate, Ok(latency), None).await?;
    }

    let reference = current_time().add(Duration::from_secs(2u64));
    let mut actual = network.find_peers_to_ping(reference).await?;
    actual.sort();

    assert_eq!(actual, expected);
    Ok(())
}
/// An empty peer store yields `Red` health.
#[async_std::test]
async fn test_network_should_have_red_health_without_any_registered_peers() -> anyhow::Result<()> {
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    let health = network.health().await;
    assert_eq!(health, Health::Red);
    Ok(())
}
/// A registered peer without any heartbeat result yields `Orange` health.
#[async_std::test]
async fn test_network_should_be_unhealthy_without_any_heartbeat_updates() -> anyhow::Result<()> {
    let peer: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let network = basic_network(&me).await?;

    network.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

    let health = network.health().await;
    assert_eq!(health, Health::Orange);
    Ok(())
}
/// After the only peer is removed, the health falls back to `Red` even though
/// it has been evaluated before.
#[async_std::test]
async fn test_network_should_be_unhealthy_without_any_peers_once_the_health_was_known() -> anyhow::Result<()> {
    let peer: PeerId = OffchainKeypair::random().public().into();
    let me: PeerId = OffchainKeypair::random().public().into();
    let peers = basic_network(&me).await?;

    peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

    // The future must be awaited: an un-polled future does nothing, so the health
    // would never actually be evaluated before the removal.
    let _ = peers.health().await;

    peers.remove(&peer).await?;
    assert_eq!(peers.health().await, Health::Red);
    Ok(())
}
#[async_std::test]
async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_low_quality() -> anyhow::Result<()> {
let peer: PeerId = OffchainKeypair::random().public().into();
let me: PeerId = OffchainKeypair::random().public().into();
let mut cfg = NetworkConfig::default();
cfg.quality_offline_threshold = 0.6;
let peers = Network::new(
me,
vec![],
cfg,
hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
);
peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
peers
.update(&peer, Ok(current_time().as_unix_timestamp()), None)
.await?;
assert_eq!(peers.health().await, Health::Orange);
Ok(())
}
#[async_std::test]
async fn test_network_should_close_connection_to_peer_once_it_reaches_the_lowest_possible_quality(
) -> anyhow::Result<()> {
let peer: PeerId = OffchainKeypair::random().public().into();
let public = peer;
let me: PeerId = OffchainKeypair::random().public().into();
let mut cfg = NetworkConfig::default();
cfg.quality_offline_threshold = 0.6;
let peers = Network::new(
me,
vec![],
cfg,
hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?,
);
peers.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
assert_eq!(
peers.update(&peer, Err(()), None).await?,
Some(NetworkTriggeredEvent::CloseConnection(peer))
);
assert!(peers.is_ignored(&public).await);
Ok(())
}
/// Three successful heartbeats towards one peer make a public node `Green`.
#[async_std::test]
async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_i_am_public(
) -> anyhow::Result<()> {
    let me: PeerId = OffchainKeypair::random().public().into();
    let peer: PeerId = OffchainKeypair::random().public().into();

    let cfg = {
        let mut cfg = NetworkConfig::default();
        cfg.quality_offline_threshold = 0.3;
        cfg
    };
    let db = hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?;
    let network = Network::new(me, vec![], cfg, db);

    network.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;

    let mut remaining = 3;
    while remaining > 0 {
        let latency = current_time().as_unix_timestamp();
        network.update(&peer, Ok(latency), None).await?;
        remaining -= 1;
    }

    assert_eq!(network.health().await, Health::Green);
    Ok(())
}
/// Two peers with three successful heartbeats each make the node `Green`.
#[async_std::test]
async fn test_network_should_be_healthy_when_a_public_peer_is_pingable_with_high_quality_and_another_high_quality_non_public(
) -> anyhow::Result<()> {
    let peer: PeerId = OffchainKeypair::random().public().into();
    let peer2: PeerId = OffchainKeypair::random().public().into();

    let cfg = {
        let mut cfg = NetworkConfig::default();
        cfg.quality_offline_threshold = 0.3;
        cfg
    };
    let me: PeerId = OffchainKeypair::random().public().into();
    let db = hopr_db_sql::db::HoprDb::new_in_memory(ChainKeypair::random()).await?;
    let network = Network::new(me, vec![], cfg, db);

    network.add(&peer, PeerOrigin::IncomingConnection, vec![]).await?;
    network.add(&peer2, PeerOrigin::IncomingConnection, vec![]).await?;

    for _ in 0..3 {
        // Same order as before: the second peer is updated first in every round.
        for target in [peer2, peer] {
            let latency = current_time().as_unix_timestamp();
            network.update(&target, Ok(latency), None).await?;
        }
    }

    assert_eq!(network.health().await, Health::Green);
    Ok(())
}
}