1use std::time::{Duration, SystemTime};
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use hopr_crypto_types::prelude::OffchainPublicKey;
6use hopr_primitive_types::prelude::*;
7use libp2p_identity::PeerId;
8use multiaddr::Multiaddr;
9use tracing::warn;
10
11use crate::errors::Result;
12
13#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display, num_enum::IntoPrimitive, num_enum::TryFromPrimitive)]
17#[repr(u8)]
18pub enum PeerOrigin {
19 #[strum(to_string = "node initialization")]
20 Initialization = 0,
21 #[strum(to_string = "network registry")]
22 NetworkRegistry = 1,
23 #[strum(to_string = "incoming connection")]
24 IncomingConnection = 2,
25 #[strum(to_string = "outgoing connection attempt")]
26 OutgoingConnection = 3,
27 #[strum(to_string = "strategy monitors existing channel")]
28 StrategyExistingChannel = 4,
29 #[strum(to_string = "strategy considers opening a channel")]
30 StrategyConsideringChannel = 5,
31 #[strum(to_string = "strategy decided to open new channel")]
32 StrategyNewChannel = 6,
33 #[strum(to_string = "manual ping")]
34 ManualPing = 7,
35 #[strum(to_string = "testing")]
36 Testing = 8,
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
42pub struct Stats {
43 pub good_quality_public: u32,
45 pub bad_quality_public: u32,
47 pub good_quality_non_public: u32,
49 pub bad_quality_non_public: u32,
51}
52
53impl Stats {
55 pub fn all_count(&self) -> usize {
57 self.good_quality_public as usize
58 + self.bad_quality_public as usize
59 + self.good_quality_non_public as usize
60 + self.bad_quality_non_public as usize
61 }
62}
63
64#[derive(Copy, Clone, Debug, Default, PartialEq)]
65pub struct PeerSelector {
66 pub last_seen: (Option<SystemTime>, Option<SystemTime>),
68 pub quality: (Option<f64>, Option<f64>),
70}
71
72impl PeerSelector {
73 pub fn with_last_seen_gte(mut self, lower_bound: SystemTime) -> Self {
74 self.last_seen.0 = Some(lower_bound);
75 self
76 }
77
78 pub fn with_last_seen_lte(mut self, upper_bound: SystemTime) -> Self {
79 self.last_seen.1 = Some(upper_bound);
80 self
81 }
82
83 pub fn with_quality_gte(mut self, lower_bound: f64) -> Self {
84 self.quality.0 = Some(lower_bound);
85 self
86 }
87
88 pub fn with_quality_lte(mut self, upper_bound: f64) -> Self {
89 self.quality.1 = Some(upper_bound);
90 self
91 }
92}
93
94#[derive(Debug, Clone, PartialEq)]
96pub struct PeerStatus {
97 pub id: (OffchainPublicKey, PeerId),
98 pub origin: PeerOrigin,
99 pub is_public: bool,
100 pub last_seen: SystemTime,
101 pub last_seen_latency: Duration,
102 pub heartbeats_sent: u64,
103 pub heartbeats_succeeded: u64,
104 pub backoff: f64,
105 pub ignored: Option<SystemTime>,
106 pub peer_version: Option<String>,
107 pub multiaddresses: Vec<Multiaddr>,
108 pub quality: f64,
110 pub quality_avg: SingleSumSMA<f64>,
112}
113
114impl PeerStatus {
115 pub fn new(id: PeerId, origin: PeerOrigin, backoff: f64, quality_window: u32) -> PeerStatus {
116 PeerStatus {
117 id: (OffchainPublicKey::try_from(&id).expect("invalid peer id given"), id),
118 origin,
119 is_public: true,
120 heartbeats_sent: 0,
121 heartbeats_succeeded: 0,
122 last_seen: SystemTime::UNIX_EPOCH,
123 last_seen_latency: Duration::default(),
124 ignored: None,
125 backoff,
126 quality: 0.0,
127 peer_version: None,
128 quality_avg: SingleSumSMA::new(quality_window as usize),
129 multiaddresses: vec![],
130 }
131 }
132
133 pub fn update_quality(&mut self, new_value: f64) {
135 if (0.0f64..=1.0f64).contains(&new_value) {
136 self.quality = new_value;
137 self.quality_avg.push(new_value);
138 } else {
139 warn!("Quality failed to update with value outside the [0,1] range")
140 }
141 }
142
143 pub fn get_average_quality(&self) -> f64 {
145 self.quality_avg.average().unwrap_or_default()
146 }
147
148 pub fn get_quality(&self) -> f64 {
150 self.quality
151 }
152
153 #[inline]
156 pub fn is_ignored(&self, now: SystemTime, max_ignore: Duration) -> bool {
157 self.ignored.is_some_and(|t| now.saturating_sub(t) <= max_ignore)
158 }
159}
160
161impl std::fmt::Display for PeerStatus {
162 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
163 write!(
164 f,
165 "Entry: [id={}, origin={}, last seen on={:?}, quality={}, heartbeats sent={}, heartbeats succeeded={}, \
166 backoff={}]",
167 self.id.1,
168 self.origin,
169 self.last_seen,
170 self.quality,
171 self.heartbeats_sent,
172 self.heartbeats_succeeded,
173 self.backoff
174 )
175 }
176}
177
178#[async_trait]
179pub trait HoprDbPeersOperations {
180 async fn add_network_peer(
184 &self,
185 peer: &PeerId,
186 origin: PeerOrigin,
187 mas: Vec<Multiaddr>,
188 backoff: f64,
189 quality_window: u32,
190 ) -> Result<()>;
191
192 async fn remove_network_peer(&self, peer: &PeerId) -> Result<()>;
196
197 async fn update_network_peer(&self, new_status: PeerStatus) -> Result<()>;
200
201 async fn get_network_peer(&self, peer: &PeerId) -> Result<Option<PeerStatus>>;
205
206 async fn get_network_peers<'a>(
211 &'a self,
212 selector: PeerSelector,
213 sort_last_seen_asc: bool,
214 ) -> Result<BoxStream<'a, PeerStatus>>;
215
216 async fn network_peer_stats(&self, quality_threshold: f64) -> Result<Stats>;
218}