hopr_db_api/
peers.rs

1use std::time::{Duration, SystemTime};
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use hopr_crypto_types::prelude::OffchainPublicKey;
6use hopr_primitive_types::prelude::*;
7use libp2p_identity::PeerId;
8use multiaddr::Multiaddr;
9use tracing::warn;
10
11use crate::errors::Result;
12
/// Origin of a peer.
///
/// Records the first occurrence of the peer in the network mechanism.
///
/// The enum is `#[repr(u8)]` with explicit discriminants and derives `num_enum`'s
/// `IntoPrimitive`/`TryFromPrimitive`, so every variant corresponds to the `u8` value
/// written next to it. The derived `strum::Display` renders each variant as the
/// human-readable string given in its `#[strum(to_string = ...)]` attribute.
#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display, num_enum::IntoPrimitive, num_enum::TryFromPrimitive)]
#[repr(u8)]
pub enum PeerOrigin {
    #[strum(to_string = "node initialization")]
    Initialization = 0,
    #[strum(to_string = "network registry")]
    NetworkRegistry = 1,
    #[strum(to_string = "incoming connection")]
    IncomingConnection = 2,
    #[strum(to_string = "outgoing connection attempt")]
    OutgoingConnection = 3,
    #[strum(to_string = "strategy monitors existing channel")]
    StrategyExistingChannel = 4,
    #[strum(to_string = "strategy considers opening a channel")]
    StrategyConsideringChannel = 5,
    #[strum(to_string = "strategy decided to open new channel")]
    StrategyNewChannel = 6,
    #[strum(to_string = "manual ping")]
    ManualPing = 7,
    #[strum(to_string = "testing")]
    Testing = 8,
}
38
/// Statistics on all peer entries stored by the network component.
///
/// Peers are counted in four disjoint buckets, split by quality (good/bad)
/// and by whether the node is public.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Stats {
    /// Number of good-quality public nodes.
    pub good_quality_public: u32,
    /// Number of bad-quality public nodes.
    pub bad_quality_public: u32,
    /// Number of good-quality non-public nodes.
    pub good_quality_non_public: u32,
    /// Number of bad-quality non-public nodes.
    pub bad_quality_non_public: u32,
}

// The feature gate below is commented out, so this impl is compiled unconditionally.
// #[cfg(all(feature = "prometheus", not(test)))]
impl Stats {
    /// Returns the total number of peers, i.e. the sum of all four buckets.
    ///
    /// Each counter is widened to `usize` before being added.
    pub fn all_count(&self) -> usize {
        let buckets = [
            self.good_quality_public,
            self.bad_quality_public,
            self.good_quality_non_public,
            self.bad_quality_non_public,
        ];

        buckets.iter().map(|&count| count as usize).sum()
    }
}
63
/// Filter criteria used to select peers.
///
/// Every bound is optional; [`Default`] yields a selector with no bounds set.
/// The `with_*` methods consume the selector and return an updated copy, so they can be chained.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
pub struct PeerSelector {
    /// Lower and upper bounds (both inclusive) on last seen timestamp.
    pub last_seen: (Option<SystemTime>, Option<SystemTime>),
    /// Lower and upper bounds (both inclusive) on peer quality.
    pub quality: (Option<f64>, Option<f64>),
}

impl PeerSelector {
    /// Sets the inclusive lower bound on the last seen timestamp; all other bounds are kept.
    pub fn with_last_seen_gte(self, lower_bound: SystemTime) -> Self {
        Self {
            last_seen: (Some(lower_bound), self.last_seen.1),
            ..self
        }
    }

    /// Sets the inclusive upper bound on the last seen timestamp; all other bounds are kept.
    pub fn with_last_seen_lte(self, upper_bound: SystemTime) -> Self {
        Self {
            last_seen: (self.last_seen.0, Some(upper_bound)),
            ..self
        }
    }

    /// Sets the inclusive lower bound on the peer quality; all other bounds are kept.
    pub fn with_quality_gte(self, lower_bound: f64) -> Self {
        Self {
            quality: (Some(lower_bound), self.quality.1),
            ..self
        }
    }

    /// Sets the inclusive upper bound on the peer quality; all other bounds are kept.
    pub fn with_quality_lte(self, upper_bound: f64) -> Self {
        Self {
            quality: (self.quality.0, Some(upper_bound)),
            ..self
        }
    }
}
93
94/// Status of the peer as recorded by a network component.
95#[derive(Debug, Clone, PartialEq)]
96pub struct PeerStatus {
97    pub id: (OffchainPublicKey, PeerId),
98    pub origin: PeerOrigin,
99    pub last_seen: SystemTime,
100    pub last_seen_latency: Duration,
101    pub heartbeats_sent: u64,
102    pub heartbeats_succeeded: u64,
103    pub backoff: f64,
104    /// Until when the peer should be ignored.
105    pub ignored_until: Option<SystemTime>,
106    pub multiaddresses: Vec<Multiaddr>,
107    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
108    pub quality: f64,
109    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
110    pub quality_avg: SingleSumSMA<f64>,
111}
112
113impl PeerStatus {
114    pub fn new(id: PeerId, origin: PeerOrigin, backoff: f64, quality_window: u32) -> PeerStatus {
115        PeerStatus {
116            id: (OffchainPublicKey::from_peerid(&id).expect("invalid peer id given"), id),
117            origin,
118            heartbeats_sent: 0,
119            heartbeats_succeeded: 0,
120            last_seen: SystemTime::UNIX_EPOCH,
121            last_seen_latency: Duration::default(),
122            ignored_until: None,
123            backoff,
124            quality: 0.0,
125            quality_avg: SingleSumSMA::new(quality_window as usize),
126            multiaddresses: vec![],
127        }
128    }
129
130    // Update both the immediate last quality and the average windowed quality
131    pub fn update_quality(&mut self, new_value: f64) {
132        if (0.0f64..=1.0f64).contains(&new_value) {
133            self.quality = new_value;
134            self.quality_avg.push(new_value);
135        } else {
136            warn!("Quality failed to update with value outside the [0,1] range")
137        }
138    }
139
140    /// Gets the average quality of this peer
141    pub fn get_average_quality(&self) -> f64 {
142        self.quality_avg.average().unwrap_or_default()
143    }
144
145    /// Gets the immediate node quality
146    pub fn get_quality(&self) -> f64 {
147        self.quality
148    }
149
150    /// Determines whether the peer is ignored due to quality concerns, given the current time.
151    #[inline]
152    pub fn is_ignored(&self) -> bool {
153        let now = hopr_platform::time::current_time();
154        self.ignored_until
155            .is_some_and(|t| now.saturating_sub(t) == std::time::Duration::from_secs(0))
156    }
157}
158
159impl std::fmt::Display for PeerStatus {
160    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
161        write!(
162            f,
163            "Entry: [id={}, origin={}, last seen on={:?}, quality={}, heartbeats sent={}, heartbeats succeeded={}, \
164             backoff={}]",
165            self.id.1,
166            self.origin,
167            self.last_seen,
168            self.quality,
169            self.heartbeats_sent,
170            self.heartbeats_succeeded,
171            self.backoff
172        )
173    }
174}
175
/// Storage operations for peers tracked by the network component.
///
/// All operations are asynchronous and report failures through the crate's [`Result`] type.
#[async_trait]
pub trait HoprDbPeersOperations {
    /// Adds a peer to the backend.
    ///
    /// Should fail if the given peer id already exists in the store.
    ///
    /// * `peer` - identifier of the peer to add.
    /// * `origin` - how the peer was first encountered.
    /// * `mas` - multiaddresses of the peer.
    /// * `backoff`, `quality_window` - presumably the initial values handed to
    ///   [`PeerStatus::new`]; confirm against the implementation.
    async fn add_network_peer(
        &self,
        peer: &PeerId,
        origin: PeerOrigin,
        mas: Vec<Multiaddr>,
        backoff: f64,
        quality_window: u32,
    ) -> Result<()>;

    /// Removes the peer from the backend.
    ///
    /// Should fail if the given peer id does not exist.
    async fn remove_network_peer(&self, peer: &PeerId) -> Result<()>;

    /// Updates stored information about the peer.
    ///
    /// Should fail if the peer does not exist in the store.
    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()>;

    /// Gets stored information about the peer.
    ///
    /// Should return `None` if such peer does not exist in the store.
    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>>;

    /// Returns a stream of all stored peers, optionally matching the given [`PeerSelector`] filter.
    ///
    /// The `sort_last_seen_asc` indicates whether the results should be sorted in ascending
    /// or descending order of the `last_seen` field.
    ///
    /// The returned stream borrows `self` for the lifetime `'a`.
    async fn get_network_peers<'a>(
        &'a self,
        selector: PeerSelector,
        sort_last_seen_asc: bool,
    ) -> Result<BoxStream<'a, PeerStatus>>;

    /// Returns the [statistics](Stats) on the stored peers.
    ///
    /// NOTE(review): `quality_threshold` presumably separates good-quality from
    /// bad-quality peers in the returned [`Stats`]; confirm against the implementation.
    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats>;
}