// hopr_db_api/peers.rs

use std::time::{Duration, SystemTime};

use async_trait::async_trait;
use futures::stream::BoxStream;
use libp2p_identity::PeerId;
use multiaddr::Multiaddr;
use tracing::warn;

use hopr_crypto_types::prelude::OffchainPublicKey;
use hopr_primitive_types::prelude::*;

use crate::errors::Result;

14/// Actual origin.
15///
16/// First occurence of the peer in the network mechanism.
17#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display, num_enum::IntoPrimitive, num_enum::TryFromPrimitive)]
18#[repr(u8)]
19pub enum PeerOrigin {
20    #[strum(to_string = "node initialization")]
21    Initialization = 0,
22    #[strum(to_string = "network registry")]
23    NetworkRegistry = 1,
24    #[strum(to_string = "incoming connection")]
25    IncomingConnection = 2,
26    #[strum(to_string = "outgoing connection attempt")]
27    OutgoingConnection = 3,
28    #[strum(to_string = "strategy monitors existing channel")]
29    StrategyExistingChannel = 4,
30    #[strum(to_string = "strategy considers opening a channel")]
31    StrategyConsideringChannel = 5,
32    #[strum(to_string = "strategy decided to open new channel")]
33    StrategyNewChannel = 6,
34    #[strum(to_string = "manual ping")]
35    ManualPing = 7,
36    #[strum(to_string = "testing")]
37    Testing = 8,
38}
39
/// Statistical observation related to peers in the network.
///
/// Holds counts over all peer entries stored by the network component,
/// bucketed by quality (good / bad) and by whether the node is public.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Stats {
    /// Number of good-quality public nodes.
    pub good_quality_public: u32,
    /// Number of bad-quality public nodes.
    pub bad_quality_public: u32,
    /// Number of good-quality non-public nodes.
    pub good_quality_non_public: u32,
    /// Number of bad-quality non-public nodes.
    pub bad_quality_non_public: u32,
}

// #[cfg(all(feature = "prometheus", not(test)))]
impl Stats {
    /// Returns count of all peers, i.e. the sum of all four buckets.
    ///
    /// The sum is accumulated in `u64`, where four `u32` values can never
    /// overflow, and is then clamped to `usize::MAX`. On 64-bit targets the
    /// clamp is a no-op; on targets with a narrower `usize` it prevents the
    /// overflow (debug panic / release wrap-around) a plain `usize` addition
    /// could hit.
    pub fn all_count(&self) -> usize {
        let total: u64 = [
            self.good_quality_public,
            self.bad_quality_public,
            self.good_quality_non_public,
            self.bad_quality_non_public,
        ]
        .iter()
        .map(|&bucket| u64::from(bucket))
        .sum();

        // `usize` is at most 64 bits wide on supported targets, so widening
        // `usize::MAX` to `u64` is lossless and the final narrowing cast is
        // guaranteed to fit after the clamp.
        total.min(usize::MAX as u64) as usize
    }
}

/// Filter describing which stored peers to select.
///
/// Each criterion is a `(lower, upper)` pair of optional bounds; a bound that
/// is `None` is not set. The `Default` value has no bound set at all.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
pub struct PeerSelector {
    /// Lower and upper bounds (both inclusive) on last seen timestamp.
    pub last_seen: (Option<SystemTime>, Option<SystemTime>),
    /// Lower and upper bounds (both inclusive) on peer quality.
    pub quality: (Option<f64>, Option<f64>),
}

impl PeerSelector {
    /// Sets the inclusive lower bound on the last seen timestamp.
    ///
    /// Consumes the selector and returns the updated copy, so calls can be chained.
    #[must_use]
    pub fn with_last_seen_gte(mut self, lower_bound: SystemTime) -> Self {
        self.last_seen.0 = Some(lower_bound);
        self
    }

    /// Sets the inclusive upper bound on the last seen timestamp.
    ///
    /// Consumes the selector and returns the updated copy, so calls can be chained.
    #[must_use]
    pub fn with_last_seen_lte(mut self, upper_bound: SystemTime) -> Self {
        self.last_seen.1 = Some(upper_bound);
        self
    }

    /// Sets the inclusive lower bound on the peer quality.
    ///
    /// Consumes the selector and returns the updated copy, so calls can be chained.
    #[must_use]
    pub fn with_quality_gte(mut self, lower_bound: f64) -> Self {
        self.quality.0 = Some(lower_bound);
        self
    }

    /// Sets the inclusive upper bound on the peer quality.
    ///
    /// Consumes the selector and returns the updated copy, so calls can be chained.
    #[must_use]
    pub fn with_quality_lte(mut self, upper_bound: f64) -> Self {
        self.quality.1 = Some(upper_bound);
        self
    }
}

95/// Status of the peer as recorded by a network component.
96#[derive(Debug, Clone, PartialEq)]
97pub struct PeerStatus {
98    pub id: (OffchainPublicKey, PeerId),
99    pub origin: PeerOrigin,
100    pub is_public: bool,
101    pub last_seen: SystemTime,
102    pub last_seen_latency: Duration,
103    pub heartbeats_sent: u64,
104    pub heartbeats_succeeded: u64,
105    pub backoff: f64,
106    pub ignored: Option<SystemTime>,
107    pub peer_version: Option<String>,
108    pub multiaddresses: Vec<Multiaddr>,
109    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
110    pub quality: f64,
111    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
112    pub quality_avg: SingleSumSMA<f64>,
113}
114
115impl PeerStatus {
116    pub fn new(id: PeerId, origin: PeerOrigin, backoff: f64, quality_window: u32) -> PeerStatus {
117        PeerStatus {
118            id: (OffchainPublicKey::try_from(&id).expect("invalid peer id given"), id),
119            origin,
120            is_public: true,
121            heartbeats_sent: 0,
122            heartbeats_succeeded: 0,
123            last_seen: SystemTime::UNIX_EPOCH,
124            last_seen_latency: Duration::default(),
125            ignored: None,
126            backoff,
127            quality: 0.0,
128            peer_version: None,
129            quality_avg: SingleSumSMA::new(quality_window as usize),
130            multiaddresses: vec![],
131        }
132    }
133
134    // Update both the immediate last quality and the average windowed quality
135    pub fn update_quality(&mut self, new_value: f64) {
136        if (0.0f64..=1.0f64).contains(&new_value) {
137            self.quality = new_value;
138            self.quality_avg.push(new_value);
139        } else {
140            warn!("Quality failed to update with value outside the [0,1] range")
141        }
142    }
143
144    /// Gets the average quality of this peer
145    pub fn get_average_quality(&self) -> f64 {
146        self.quality_avg.average().unwrap_or_default()
147    }
148
149    /// Gets the immediate node quality
150    pub fn get_quality(&self) -> f64 {
151        self.quality
152    }
153
154    /// Determines whether the peer is ignored due to quality concerns, given the current time
155    /// and maximum peer ignore period.
156    #[inline]
157    pub fn is_ignored(&self, now: SystemTime, max_ignore: Duration) -> bool {
158        self.ignored.is_some_and(|t| now.saturating_sub(t) <= max_ignore)
159    }
160}
161
162impl std::fmt::Display for PeerStatus {
163    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
164        write!(f, "Entry: [id={}, origin={}, last seen on={:?}, quality={}, heartbeats sent={}, heartbeats succeeded={}, backoff={}]",
165            self.id.1, self.origin, self.last_seen, self.quality, self.heartbeats_sent, self.heartbeats_succeeded, self.backoff)
166    }
167}
168
169#[async_trait]
170pub trait HoprDbPeersOperations {
171    /// Adds a peer to the backend.
172    ///
173    /// Should fail if the given peer id already exists in the store.
174    async fn add_network_peer(
175        &self,
176        peer: &PeerId,
177        origin: PeerOrigin,
178        mas: Vec<Multiaddr>,
179        backoff: f64,
180        quality_window: u32,
181    ) -> Result<()>;
182
183    /// Removes the peer from the backend.
184    ///
185    /// Should fail if the given peer id does not exist.
186    async fn remove_network_peer(&self, peer: &PeerId) -> Result<()>;
187
188    /// Updates stored information about the peer.
189    /// Should fail if the peer does not exist in the store.
190    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()>;
191
192    /// Gets stored information about the peer.
193    ///
194    /// Should return `None` if such peer does not exist in the store.
195    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>>;
196
197    /// Returns a stream of all stored peers, optionally matching the given [`PeerSelector`] filter.
198    ///
199    /// The `sort_last_seen_asc` indicates whether the results should be sorted in ascending
200    /// or descending order of the `last_seen` field.
201    async fn get_network_peers<'a>(
202        &'a self,
203        selector: PeerSelector,
204        sort_last_seen_asc: bool,
205    ) -> Result<BoxStream<'a, PeerStatus>>;
206
207    /// Returns the [statistics](Stats) on the stored peers.
208    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats>;
209}