hopr_api/db/
peers.rs

1use std::time::{Duration, SystemTime};
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use hopr_crypto_types::prelude::{OffchainPublicKey, PeerId};
6use hopr_primitive_types::prelude::*;
7use multiaddr::Multiaddr;
8
9/// Actual origin.
10///
11/// First occurence of the peer in the network mechanism.
12#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display, strum::FromRepr)]
13#[repr(u8)]
14pub enum PeerOrigin {
15    #[strum(to_string = "node initialization")]
16    Initialization = 0,
17    #[strum(to_string = "network registry")]
18    NetworkRegistry = 1,
19    #[strum(to_string = "incoming connection")]
20    IncomingConnection = 2,
21    #[strum(to_string = "outgoing connection attempt")]
22    OutgoingConnection = 3,
23    #[strum(to_string = "strategy monitors existing channel")]
24    StrategyExistingChannel = 4,
25    #[strum(to_string = "strategy considers opening a channel")]
26    StrategyConsideringChannel = 5,
27    #[strum(to_string = "strategy decided to open new channel")]
28    StrategyNewChannel = 6,
29    #[strum(to_string = "manual ping")]
30    ManualPing = 7,
31    #[strum(to_string = "testing")]
32    Testing = 8,
33}
34
35/// Statistical observation related to peers in the network. Statistics on all peer entries stored
36/// by the network component.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
38pub struct Stats {
39    /// Number of good-quality public nodes.
40    pub good_quality_public: u32,
41    /// Number of bad-quality public nodes.
42    pub bad_quality_public: u32,
43    /// Number of good-quality non-public nodes.
44    pub good_quality_non_public: u32,
45    /// Number of bad-quality non-public nodes.
46    pub bad_quality_non_public: u32,
47}
48
49// #[cfg(all(feature = "prometheus", not(test)))]
50impl Stats {
51    /// Returns count of all peers.
52    pub fn all_count(&self) -> usize {
53        self.good_quality_public as usize
54            + self.bad_quality_public as usize
55            + self.good_quality_non_public as usize
56            + self.bad_quality_non_public as usize
57    }
58}
59
60#[derive(Copy, Clone, Debug, Default, PartialEq)]
61pub struct PeerSelector {
62    /// Lower and upper bounds (both inclusive) on last seen timestamp.
63    pub last_seen: (Option<SystemTime>, Option<SystemTime>),
64    /// Lower and upper bounds (both inclusive) on peer quality.
65    pub quality: (Option<f64>, Option<f64>),
66}
67
68impl PeerSelector {
69    pub fn with_last_seen_gte(mut self, lower_bound: SystemTime) -> Self {
70        self.last_seen.0 = Some(lower_bound);
71        self
72    }
73
74    pub fn with_last_seen_lte(mut self, upper_bound: SystemTime) -> Self {
75        self.last_seen.1 = Some(upper_bound);
76        self
77    }
78
79    pub fn with_quality_gte(mut self, lower_bound: f64) -> Self {
80        self.quality.0 = Some(lower_bound);
81        self
82    }
83
84    pub fn with_quality_lte(mut self, upper_bound: f64) -> Self {
85        self.quality.1 = Some(upper_bound);
86        self
87    }
88}
89
90/// Status of the peer as recorded by a network component.
91#[derive(Debug, Clone, PartialEq)]
92pub struct PeerStatus {
93    pub id: (OffchainPublicKey, PeerId),
94    pub origin: PeerOrigin,
95    pub last_seen: SystemTime,
96    pub last_seen_latency: Duration,
97    pub heartbeats_sent: u64,
98    pub heartbeats_succeeded: u64,
99    pub backoff: f64,
100    /// Until when the peer should be ignored.
101    pub ignored_until: Option<SystemTime>,
102    pub multiaddresses: Vec<Multiaddr>,
103    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
104    pub quality: f64,
105    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
106    pub quality_avg: SingleSumSMA<f64>,
107}
108
109impl PeerStatus {
110    pub fn new(id: PeerId, origin: PeerOrigin, backoff: f64, quality_window: u32) -> PeerStatus {
111        PeerStatus {
112            id: (OffchainPublicKey::from_peerid(&id).expect("invalid peer id given"), id),
113            origin,
114            heartbeats_sent: 0,
115            heartbeats_succeeded: 0,
116            last_seen: SystemTime::UNIX_EPOCH,
117            last_seen_latency: Duration::default(),
118            ignored_until: None,
119            backoff,
120            quality: 0.0,
121            quality_avg: SingleSumSMA::new(quality_window as usize),
122            multiaddresses: vec![],
123        }
124    }
125
126    // Update both the immediate last quality and the average windowed quality
127    pub fn update_quality(&mut self, new_value: f64) {
128        debug_assert!(
129            (0.0f64..=1.0f64).contains(&new_value),
130            "quality failed to update with value outside the [0,1] range"
131        );
132
133        if (0.0f64..=1.0f64).contains(&new_value) {
134            self.quality = new_value;
135            self.quality_avg.push(new_value);
136        }
137    }
138
139    /// Gets the average quality of this peer
140    pub fn get_average_quality(&self) -> f64 {
141        self.quality_avg.average().unwrap_or_default()
142    }
143
144    /// Gets the immediate node quality
145    pub fn get_quality(&self) -> f64 {
146        self.quality
147    }
148
149    /// Determines whether the peer is ignored due to quality concerns, given the current time.
150    #[inline]
151    pub fn is_ignored(&self) -> bool {
152        let now = hopr_platform::time::current_time();
153        self.ignored_until.is_some_and(|t| now <= t)
154    }
155}
156
157impl std::fmt::Display for PeerStatus {
158    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
159        write!(
160            f,
161            "Entry: [id={}, origin={}, last seen on={:?}, quality={}, heartbeats sent={}, heartbeats succeeded={}, \
162             backoff={}]",
163            self.id.1,
164            self.origin,
165            self.last_seen,
166            self.quality,
167            self.heartbeats_sent,
168            self.heartbeats_succeeded,
169            self.backoff
170        )
171    }
172}
173
174#[async_trait]
175pub trait HoprDbPeersOperations {
176    type Error: std::error::Error + Send + Sync + 'static;
177    /// Adds a peer to the backend.
178    ///
179    /// Should fail if the given peer id already exists in the store.
180    async fn add_network_peer(
181        &self,
182        peer: &PeerId,
183        origin: PeerOrigin,
184        mas: Vec<Multiaddr>,
185        backoff: f64,
186        quality_window: u32,
187    ) -> Result<(), Self::Error>;
188
189    /// Removes the peer from the backend.
190    ///
191    /// Should fail if the given peer id does not exist.
192    async fn remove_network_peer(&self, peer: &PeerId) -> Result<(), Self::Error>;
193
194    /// Updates stored information about the peer.
195    /// Should fail if the peer does not exist in the store.
196    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<(), Self::Error>;
197
198    /// Gets stored information about the peer.
199    ///
200    /// Should return `None` if such peer does not exist in the store.
201    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>, Self::Error>;
202
203    /// Returns a stream of all stored peers, optionally matching the given [`PeerSelector`] filter.
204    ///
205    /// The `sort_last_seen_asc` indicates whether the results should be sorted in ascending
206    /// or descending order of the `last_seen` field.
207    async fn get_network_peers<'a>(
208        &'a self,
209        selector: PeerSelector,
210        sort_last_seen_asc: bool,
211    ) -> Result<BoxStream<'a, PeerStatus>, Self::Error>;
212
213    /// Returns the [statistics](Stats) on the stored peers.
214    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats, Self::Error>;
215}