hopr_db_api/
peers.rs

1use std::time::{Duration, SystemTime};
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use hopr_crypto_types::prelude::OffchainPublicKey;
6use hopr_primitive_types::prelude::*;
7use libp2p_identity::PeerId;
8use multiaddr::Multiaddr;
9use tracing::warn;
10
11use crate::errors::Result;
12
13/// Actual origin.
14///
15/// First occurence of the peer in the network mechanism.
16#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display, num_enum::IntoPrimitive, num_enum::TryFromPrimitive)]
17#[repr(u8)]
18pub enum PeerOrigin {
19    #[strum(to_string = "node initialization")]
20    Initialization = 0,
21    #[strum(to_string = "network registry")]
22    NetworkRegistry = 1,
23    #[strum(to_string = "incoming connection")]
24    IncomingConnection = 2,
25    #[strum(to_string = "outgoing connection attempt")]
26    OutgoingConnection = 3,
27    #[strum(to_string = "strategy monitors existing channel")]
28    StrategyExistingChannel = 4,
29    #[strum(to_string = "strategy considers opening a channel")]
30    StrategyConsideringChannel = 5,
31    #[strum(to_string = "strategy decided to open new channel")]
32    StrategyNewChannel = 6,
33    #[strum(to_string = "manual ping")]
34    ManualPing = 7,
35    #[strum(to_string = "testing")]
36    Testing = 8,
37}
38
39/// Statistical observation related to peers in the network. Statistics on all peer entries stored
40/// by the network component.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
42pub struct Stats {
43    /// Number of good-quality public nodes.
44    pub good_quality_public: u32,
45    /// Number of bad-quality public nodes.
46    pub bad_quality_public: u32,
47    /// Number of good-quality non-public nodes.
48    pub good_quality_non_public: u32,
49    /// Number of bad-quality non-public nodes.
50    pub bad_quality_non_public: u32,
51}
52
53// #[cfg(all(feature = "prometheus", not(test)))]
54impl Stats {
55    /// Returns count of all peers.
56    pub fn all_count(&self) -> usize {
57        self.good_quality_public as usize
58            + self.bad_quality_public as usize
59            + self.good_quality_non_public as usize
60            + self.bad_quality_non_public as usize
61    }
62}
63
64#[derive(Copy, Clone, Debug, Default, PartialEq)]
65pub struct PeerSelector {
66    /// Lower and upper bounds (both inclusive) on last seen timestamp.
67    pub last_seen: (Option<SystemTime>, Option<SystemTime>),
68    /// Lower and upper bounds (both inclusive) on peer quality.
69    pub quality: (Option<f64>, Option<f64>),
70}
71
72impl PeerSelector {
73    pub fn with_last_seen_gte(mut self, lower_bound: SystemTime) -> Self {
74        self.last_seen.0 = Some(lower_bound);
75        self
76    }
77
78    pub fn with_last_seen_lte(mut self, upper_bound: SystemTime) -> Self {
79        self.last_seen.1 = Some(upper_bound);
80        self
81    }
82
83    pub fn with_quality_gte(mut self, lower_bound: f64) -> Self {
84        self.quality.0 = Some(lower_bound);
85        self
86    }
87
88    pub fn with_quality_lte(mut self, upper_bound: f64) -> Self {
89        self.quality.1 = Some(upper_bound);
90        self
91    }
92}
93
94/// Status of the peer as recorded by a network component.
95#[derive(Debug, Clone, PartialEq)]
96pub struct PeerStatus {
97    pub id: (OffchainPublicKey, PeerId),
98    pub origin: PeerOrigin,
99    pub is_public: bool,
100    pub last_seen: SystemTime,
101    pub last_seen_latency: Duration,
102    pub heartbeats_sent: u64,
103    pub heartbeats_succeeded: u64,
104    pub backoff: f64,
105    pub ignored: Option<SystemTime>,
106    pub peer_version: Option<String>,
107    pub multiaddresses: Vec<Multiaddr>,
108    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
109    pub quality: f64,
110    // Should be public(crate) but the separation through traits does not allow direct SQL ORM serde
111    pub quality_avg: SingleSumSMA<f64>,
112}
113
114impl PeerStatus {
115    pub fn new(id: PeerId, origin: PeerOrigin, backoff: f64, quality_window: u32) -> PeerStatus {
116        PeerStatus {
117            id: (OffchainPublicKey::try_from(&id).expect("invalid peer id given"), id),
118            origin,
119            is_public: true,
120            heartbeats_sent: 0,
121            heartbeats_succeeded: 0,
122            last_seen: SystemTime::UNIX_EPOCH,
123            last_seen_latency: Duration::default(),
124            ignored: None,
125            backoff,
126            quality: 0.0,
127            peer_version: None,
128            quality_avg: SingleSumSMA::new(quality_window as usize),
129            multiaddresses: vec![],
130        }
131    }
132
133    // Update both the immediate last quality and the average windowed quality
134    pub fn update_quality(&mut self, new_value: f64) {
135        if (0.0f64..=1.0f64).contains(&new_value) {
136            self.quality = new_value;
137            self.quality_avg.push(new_value);
138        } else {
139            warn!("Quality failed to update with value outside the [0,1] range")
140        }
141    }
142
143    /// Gets the average quality of this peer
144    pub fn get_average_quality(&self) -> f64 {
145        self.quality_avg.average().unwrap_or_default()
146    }
147
148    /// Gets the immediate node quality
149    pub fn get_quality(&self) -> f64 {
150        self.quality
151    }
152
153    /// Determines whether the peer is ignored due to quality concerns, given the current time
154    /// and maximum peer ignore period.
155    #[inline]
156    pub fn is_ignored(&self, now: SystemTime, max_ignore: Duration) -> bool {
157        self.ignored.is_some_and(|t| now.saturating_sub(t) <= max_ignore)
158    }
159}
160
161impl std::fmt::Display for PeerStatus {
162    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
163        write!(
164            f,
165            "Entry: [id={}, origin={}, last seen on={:?}, quality={}, heartbeats sent={}, heartbeats succeeded={}, \
166             backoff={}]",
167            self.id.1,
168            self.origin,
169            self.last_seen,
170            self.quality,
171            self.heartbeats_sent,
172            self.heartbeats_succeeded,
173            self.backoff
174        )
175    }
176}
177
178#[async_trait]
179pub trait HoprDbPeersOperations {
180    /// Adds a peer to the backend.
181    ///
182    /// Should fail if the given peer id already exists in the store.
183    async fn add_network_peer(
184        &self,
185        peer: &PeerId,
186        origin: PeerOrigin,
187        mas: Vec<Multiaddr>,
188        backoff: f64,
189        quality_window: u32,
190    ) -> Result<()>;
191
192    /// Removes the peer from the backend.
193    ///
194    /// Should fail if the given peer id does not exist.
195    async fn remove_network_peer(&self, peer: &PeerId) -> Result<()>;
196
197    /// Updates stored information about the peer.
198    /// Should fail if the peer does not exist in the store.
199    async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()>;
200
201    /// Gets stored information about the peer.
202    ///
203    /// Should return `None` if such peer does not exist in the store.
204    async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>>;
205
206    /// Returns a stream of all stored peers, optionally matching the given [`PeerSelector`] filter.
207    ///
208    /// The `sort_last_seen_asc` indicates whether the results should be sorted in ascending
209    /// or descending order of the `last_seen` field.
210    async fn get_network_peers<'a>(
211        &'a self,
212        selector: PeerSelector,
213        sort_last_seen_asc: bool,
214    ) -> Result<BoxStream<'a, PeerStatus>>;
215
216    /// Returns the [statistics](Stats) on the stored peers.
217    async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats>;
218}