1use async_trait::async_trait;
2use futures::{
3 future::{select, Either, FutureExt},
4 pin_mut, StreamExt,
5};
6use hopr_db_api::peers::HoprDbPeersOperations;
7use hopr_primitive_types::traits::SaturatingSub;
8use libp2p_identity::PeerId;
9use rand::seq::SliceRandom;
10use serde::{Deserialize, Serialize};
11use serde_with::{serde_as, DurationSeconds};
12use validator::Validate;
13
14use tracing::{debug, info};
15
16#[cfg(all(feature = "prometheus", not(test)))]
17use hopr_metrics::{histogram_start_measure, metrics::SimpleHistogram};
18
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram exported as `hopr_heartbeat_round_time_sec` (round durations, in seconds).
    ///
    /// Only compiled with the `prometheus` feature and outside of tests.
    static ref METRIC_TIME_TO_HEARTBEAT: SimpleHistogram = {
        // Bucket upper bounds, in seconds.
        let buckets = vec![0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0, 60.0];
        SimpleHistogram::new(
            "hopr_heartbeat_round_time_sec",
            "Measures total time in seconds it takes to probe all other nodes",
            buckets,
        )
        .unwrap()
    };
}
28
29use hopr_platform::time::native::current_time;
30
31use crate::constants::{
32 DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_INTERVAL_VARIANCE, DEFAULT_HEARTBEAT_THRESHOLD,
33 DEFAULT_MAX_PARALLEL_PINGS,
34};
35use crate::network::Network;
36use crate::ping::Pinging;
37
38#[serde_as]
40#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
41#[serde(deny_unknown_fields)]
42pub struct HeartbeatConfig {
43 #[validate(range(min = 0))]
45 #[default(default_max_parallel_pings())]
46 #[serde(default = "default_max_parallel_pings")]
47 pub max_parallel_probes: usize,
48
49 #[serde_as(as = "DurationSeconds<u64>")]
51 #[serde(default = "default_heartbeat_variance")]
52 #[default(default_heartbeat_variance())]
53 pub variance: std::time::Duration,
54 #[serde_as(as = "DurationSeconds<u64>")]
56 #[serde(default = "default_heartbeat_interval")]
57 #[default(default_heartbeat_interval())]
58 pub interval: std::time::Duration,
59 #[serde_as(as = "DurationSeconds<u64>")]
61 #[serde(default = "default_heartbeat_threshold")]
62 #[default(default_heartbeat_threshold())]
63 pub threshold: std::time::Duration,
64}
65
66#[inline]
67fn default_max_parallel_pings() -> usize {
68 DEFAULT_MAX_PARALLEL_PINGS
69}
70
71#[inline]
72fn default_heartbeat_interval() -> std::time::Duration {
73 DEFAULT_HEARTBEAT_INTERVAL
74}
75
76#[inline]
77fn default_heartbeat_threshold() -> std::time::Duration {
78 DEFAULT_HEARTBEAT_THRESHOLD
79}
80
81#[inline]
82fn default_heartbeat_variance() -> std::time::Duration {
83 DEFAULT_HEARTBEAT_INTERVAL_VARIANCE
84}
85
86use std::sync::Arc;
87
88use tracing::error;
89
/// Source of the peers that a heartbeat round should probe.
///
/// Under `cfg(test)`, `mockall::automock` generates `MockHeartbeatExternalApi`
/// from this trait. Keep `automock` above `#[async_trait]`: mockall documents
/// that ordering requirement.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait HeartbeatExternalApi {
    /// Returns the peers to probe in the current round.
    ///
    /// `from_timestamp` is the cut-off the heartbeat computes as
    /// `now - threshold`. How it is used to filter peers is up to the implementor.
    async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<PeerId>;
}
98
99#[derive(Debug, Clone)]
107pub struct HeartbeatExternalInteractions<T>
108where
109 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
110{
111 network: Arc<Network<T>>,
112}
113
114impl<T> HeartbeatExternalInteractions<T>
115where
116 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
117{
118 pub fn new(network: Arc<Network<T>>) -> Self {
119 Self { network }
120 }
121}
122
123#[async_trait]
124impl<T> HeartbeatExternalApi for HeartbeatExternalInteractions<T>
125where
126 T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
127{
128 async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
132 self.network
133 .find_peers_to_ping(from_timestamp)
134 .await
135 .unwrap_or_else(|e| {
136 error!(error = %e, "Failed to generate peers for the heartbeat procedure");
137 vec![]
138 })
139 }
140}
141
/// Boxed, `Send` async-sleep function: maps a duration to a pinned, `Send` future with no output.
///
/// Used both as the round timeout and as the wait between rounds. Callers are
/// expected to return a future that resolves after (roughly) the given
/// duration, e.g. `Box::new(|d| Box::pin(sleep(d)))`.
pub type AsyncSleepFn = Box<
    dyn Fn(std::time::Duration) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send,
>;
144
145pub struct Heartbeat<T: Pinging, API: HeartbeatExternalApi> {
150 config: HeartbeatConfig,
151 pinger: T,
152 external_api: API,
153 sleep_fn: AsyncSleepFn,
154}
155
156impl<T: Pinging, API: HeartbeatExternalApi> std::fmt::Debug for Heartbeat<T, API> {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("Heartbeat").field("config", &self.config).finish()
159 }
160}
161
162impl<T: Pinging, API: HeartbeatExternalApi> Heartbeat<T, API> {
163 pub fn new(config: HeartbeatConfig, pinger: T, external_api: API, sleep_fn: AsyncSleepFn) -> Self {
164 Self {
165 config,
166 pinger,
167 external_api,
168 sleep_fn,
169 }
170 }
171
172 #[tracing::instrument(level = "info", skip(self), fields(from_timestamp = tracing::field::debug(current_time())))]
173 async fn perform_heartbeat_round(&mut self) {
174 let start = current_time();
175 let from_timestamp = start.checked_sub(self.config.threshold).unwrap_or(start);
176
177 let mut peers = self.external_api.get_peers(from_timestamp).await;
178
179 let mut rng = hopr_crypto_random::rng();
181 peers.shuffle(&mut rng);
182
183 #[cfg(all(feature = "prometheus", not(test)))]
184 let heartbeat_round_timer = histogram_start_measure!(METRIC_TIME_TO_HEARTBEAT);
185
186 let this_round_planned_duration = std::time::Duration::from_millis({
188 let interval_ms = self.config.interval.as_millis() as u64;
189 let variance_ms = self.config.variance.as_millis() as u64;
190
191 hopr_crypto_random::random_integer(
192 interval_ms,
193 Some(interval_ms.checked_add(variance_ms.max(1u64)).unwrap_or(u64::MAX)),
194 )
195 });
196
197 let peers_contacted = peers.len();
198 debug!(peers = tracing::field::debug(&peers), "Heartbeat round start");
199 let timeout = (self.sleep_fn)(this_round_planned_duration).fuse();
200 let ping_stream = self.pinger.ping(peers);
201
202 pin_mut!(timeout, ping_stream);
203
204 match select(timeout, ping_stream.collect::<Vec<_>>().fuse()).await {
205 Either::Left(_) => debug!("Heartbeat round interrupted by timeout"),
206 Either::Right((v, _)) => {
207 let this_round_actual_duration = current_time().saturating_sub(start);
209 let time_to_wait_for_next_round =
210 this_round_planned_duration.saturating_sub(this_round_actual_duration);
211
212 let ping_ok = v.iter().filter(|v| v.is_ok()).count();
213 info!(
214 round_duration_ms = tracing::field::debug(this_round_actual_duration.as_millis()),
215 time_til_next_round_ms = tracing::field::debug(time_to_wait_for_next_round.as_millis()),
216 peers_contacted,
217 ping_ok,
218 ping_fail = peers_contacted - ping_ok,
219 "Heartbeat round finished"
220 );
221
222 (self.sleep_fn)(time_to_wait_for_next_round).await
223 }
224 };
225
226 #[cfg(all(feature = "prometheus", not(test)))]
227 METRIC_TIME_TO_HEARTBEAT.record_measure(heartbeat_round_timer);
228 }
229
230 pub async fn heartbeat_loop(&mut self) {
238 loop {
239 self.perform_heartbeat_round().await
240 }
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use async_std::task::sleep;
    use futures::Stream;
    use std::time::Duration;

    /// Baseline test config: 5 ms interval, no variance, no threshold.
    fn simple_heartbeat_config() -> HeartbeatConfig {
        HeartbeatConfig {
            max_parallel_probes: 14,
            interval: Duration::from_millis(5u64),
            variance: Duration::from_millis(0u64),
            threshold: Duration::from_millis(0u64),
        }
    }

    /// Pinger that ignores the peer list and yields one successful result after `delay`.
    pub struct DelayingPinger {
        pub delay: std::time::Duration,
    }

    impl Pinging for DelayingPinger {
        fn ping(&self, _peers: Vec<PeerId>) -> impl Stream<Item = crate::errors::Result<Duration>> {
            let wait_for = self.delay;
            futures::stream::once(async move {
                sleep(wait_for).await;
                Ok(Duration::from_millis(1))
            })
        }
    }

    #[async_std::test]
    async fn test_heartbeat_should_loop_multiple_times() {
        let config = simple_heartbeat_config();
        let expected_rounds: u32 = 2;

        let mut api_mock = MockHeartbeatExternalApi::new();
        api_mock
            .expect_get_peers()
            .times(expected_rounds as usize..)
            .return_const(vec![PeerId::random(), PeerId::random()]);

        // Pings take half an interval, so every round completes before its timeout.
        let pinger = DelayingPinger {
            delay: config.interval / 2,
        };
        let mut heartbeat = Heartbeat::new(config, pinger, api_mock, Box::new(|dur| Box::pin(sleep(dur))));

        futures::select!(
            _ = heartbeat.heartbeat_loop().fuse() => {},
            _ = sleep(config.interval * expected_rounds).fuse() => {},
        );
    }

    #[async_std::test]
    async fn test_heartbeat_should_interrupt_long_running_heartbeats() {
        let config = HeartbeatConfig {
            interval: std::time::Duration::from_millis(5u64),
            ..simple_heartbeat_config()
        };
        let expected_rounds: usize = 2;

        let mut api_mock = MockHeartbeatExternalApi::new();
        api_mock
            .expect_get_peers()
            .times(expected_rounds..)
            .return_const(vec![PeerId::random(), PeerId::random()]);

        // Pings take 2.5 intervals, so every round is cut short by its timeout.
        let pinger = DelayingPinger {
            delay: 2 * config.interval + config.interval / 2,
        };
        let mut heartbeat = Heartbeat::new(config, pinger, api_mock, Box::new(|dur| Box::pin(sleep(dur))));

        let slack = Duration::from_millis(2);
        futures::select!(
            _ = heartbeat.heartbeat_loop().fuse() => {},
            _ = sleep(config.interval * (expected_rounds as u32) + slack).fuse() => {},
        );
    }
}