hopr_transport_network/heartbeat.rs

1use async_trait::async_trait;
2use futures::{
3    future::{select, Either, FutureExt},
4    pin_mut, StreamExt,
5};
6use hopr_db_api::peers::HoprDbPeersOperations;
7use hopr_primitive_types::traits::SaturatingSub;
8use libp2p_identity::PeerId;
9use rand::seq::SliceRandom;
10use serde::{Deserialize, Serialize};
11use serde_with::{serde_as, DurationSeconds};
12use validator::Validate;
13
14use tracing::{debug, info};
15
16#[cfg(all(feature = "prometheus", not(test)))]
17use hopr_metrics::{histogram_start_measure, metrics::SimpleHistogram};
18
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram (in seconds) of how long a heartbeat round spends probing peers.
    static ref METRIC_TIME_TO_HEARTBEAT: SimpleHistogram = SimpleHistogram::new(
        "hopr_heartbeat_round_time_sec",
        "Measures total time in seconds it takes to probe all other nodes",
        vec![0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0, 60.0],
    )
    .unwrap();
}
28
29use hopr_platform::time::native::current_time;
30
31use crate::constants::{
32    DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_INTERVAL_VARIANCE, DEFAULT_HEARTBEAT_THRESHOLD,
33    DEFAULT_MAX_PARALLEL_PINGS,
34};
35use crate::network::Network;
36use crate::ping::Pinging;
37
38/// Configuration for the Heartbeat mechanism
39#[serde_as]
40#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
41#[serde(deny_unknown_fields)]
42pub struct HeartbeatConfig {
43    /// Maximum number of parallel probes performed by the heartbeat mechanism
44    #[validate(range(min = 0))]
45    #[default(default_max_parallel_pings())]
46    #[serde(default = "default_max_parallel_pings")]
47    pub max_parallel_probes: usize,
48
49    /// Round-to-round variance to complicate network sync in seconds
50    #[serde_as(as = "DurationSeconds<u64>")]
51    #[serde(default = "default_heartbeat_variance")]
52    #[default(default_heartbeat_variance())]
53    pub variance: std::time::Duration,
54    /// Interval in which the heartbeat is triggered in seconds
55    #[serde_as(as = "DurationSeconds<u64>")]
56    #[serde(default = "default_heartbeat_interval")]
57    #[default(default_heartbeat_interval())]
58    pub interval: std::time::Duration,
59    /// The time interval for which to consider peer heartbeat renewal in seconds
60    #[serde_as(as = "DurationSeconds<u64>")]
61    #[serde(default = "default_heartbeat_threshold")]
62    #[default(default_heartbeat_threshold())]
63    pub threshold: std::time::Duration,
64}
65
66#[inline]
67fn default_max_parallel_pings() -> usize {
68    DEFAULT_MAX_PARALLEL_PINGS
69}
70
71#[inline]
72fn default_heartbeat_interval() -> std::time::Duration {
73    DEFAULT_HEARTBEAT_INTERVAL
74}
75
76#[inline]
77fn default_heartbeat_threshold() -> std::time::Duration {
78    DEFAULT_HEARTBEAT_THRESHOLD
79}
80
81#[inline]
82fn default_heartbeat_variance() -> std::time::Duration {
83    DEFAULT_HEARTBEAT_INTERVAL_VARIANCE
84}
85
86use std::sync::Arc;
87
88use tracing::error;
89
/// Source of the peers that a heartbeat round should probe.
///
/// Keeping this behind a trait lets the heartbeat be driven by the real network
/// (`HeartbeatExternalInteractions`) or, in tests, by the `mockall`-generated
/// `MockHeartbeatExternalApi`.
// NOTE: `automock` is intentionally listed before `async_trait`; keep this order.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait HeartbeatExternalApi {
    /// Returns the peers that should be pinged in the current heartbeat round.
    ///
    /// `from_timestamp` is the cut-off the heartbeat derives from its configured threshold
    /// (it passes `now - threshold`). Presumably peers not pinged since that instant are
    /// returned — confirm against the implementor.
    async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<PeerId>;
}
98
99/// Implementor of the heartbeat external API.
100///
101/// Heartbeat requires functionality from external components in order to obtain
102/// the triggers for its functionality. This class implements the basic API by
103/// aggregating all necessary heartbeat resources without leaking them into the
104/// `Heartbeat` object and keeping both the adaptor and the heartbeat object
105/// OCP and SRP compliant.
106#[derive(Debug, Clone)]
107pub struct HeartbeatExternalInteractions<T>
108where
109    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
110{
111    network: Arc<Network<T>>,
112}
113
114impl<T> HeartbeatExternalInteractions<T>
115where
116    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
117{
118    pub fn new(network: Arc<Network<T>>) -> Self {
119        Self { network }
120    }
121}
122
123#[async_trait]
124impl<T> HeartbeatExternalApi for HeartbeatExternalInteractions<T>
125where
126    T: HoprDbPeersOperations + Sync + Send + std::fmt::Debug,
127{
128    /// Get all peers considered by the `Network` to be pingable.
129    ///
130    /// After a duration of non-pinging based specified by the configurable threshold.
131    async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
132        self.network
133            .find_peers_to_ping(from_timestamp)
134            .await
135            .unwrap_or_else(|e| {
136                error!(error = %e, "Failed to generate peers for the heartbeat procedure");
137                vec![]
138            })
139    }
140}
141
/// Injected, sendable factory of sleep futures.
///
/// Called with a [`std::time::Duration`], it is expected to return a pinned future that
/// completes after (roughly) that delay. Injecting it keeps [`Heartbeat`] independent of any
/// particular async runtime's timer.
pub type AsyncSleepFn = Box<
    dyn Fn(std::time::Duration) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send,
>;
144
145/// Heartbeat mechanism providing the regular trigger and processing for the heartbeat protocol.
146///
147/// This object provides a single public method that can be polled. Once triggered, it will never
148/// return and will only terminate with an unresolvable error or a panic.
149pub struct Heartbeat<T: Pinging, API: HeartbeatExternalApi> {
150    config: HeartbeatConfig,
151    pinger: T,
152    external_api: API,
153    sleep_fn: AsyncSleepFn,
154}
155
156impl<T: Pinging, API: HeartbeatExternalApi> std::fmt::Debug for Heartbeat<T, API> {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("Heartbeat").field("config", &self.config).finish()
159    }
160}
161
162impl<T: Pinging, API: HeartbeatExternalApi> Heartbeat<T, API> {
163    pub fn new(config: HeartbeatConfig, pinger: T, external_api: API, sleep_fn: AsyncSleepFn) -> Self {
164        Self {
165            config,
166            pinger,
167            external_api,
168            sleep_fn,
169        }
170    }
171
172    #[tracing::instrument(level = "info", skip(self), fields(from_timestamp = tracing::field::debug(current_time())))]
173    async fn perform_heartbeat_round(&mut self) {
174        let start = current_time();
175        let from_timestamp = start.checked_sub(self.config.threshold).unwrap_or(start);
176
177        let mut peers = self.external_api.get_peers(from_timestamp).await;
178
179        // shuffle the peers to make sure that the order is different each heartbeat round
180        let mut rng = hopr_crypto_random::rng();
181        peers.shuffle(&mut rng);
182
183        #[cfg(all(feature = "prometheus", not(test)))]
184        let heartbeat_round_timer = histogram_start_measure!(METRIC_TIME_TO_HEARTBEAT);
185
186        // random timeout to avoid network sync:
187        let this_round_planned_duration = std::time::Duration::from_millis({
188            let interval_ms = self.config.interval.as_millis() as u64;
189            let variance_ms = self.config.variance.as_millis() as u64;
190
191            hopr_crypto_random::random_integer(
192                interval_ms,
193                Some(interval_ms.checked_add(variance_ms.max(1u64)).unwrap_or(u64::MAX)),
194            )
195        });
196
197        let peers_contacted = peers.len();
198        debug!(peers = tracing::field::debug(&peers), "Heartbeat round start");
199        let timeout = (self.sleep_fn)(this_round_planned_duration).fuse();
200        let ping_stream = self.pinger.ping(peers);
201
202        pin_mut!(timeout, ping_stream);
203
204        match select(timeout, ping_stream.collect::<Vec<_>>().fuse()).await {
205            Either::Left(_) => debug!("Heartbeat round interrupted by timeout"),
206            Either::Right((v, _)) => {
207                // We intentionally ignore any ping errors here
208                let this_round_actual_duration = current_time().saturating_sub(start);
209                let time_to_wait_for_next_round =
210                    this_round_planned_duration.saturating_sub(this_round_actual_duration);
211
212                let ping_ok = v.iter().filter(|v| v.is_ok()).count();
213                info!(
214                    round_duration_ms = tracing::field::debug(this_round_actual_duration.as_millis()),
215                    time_til_next_round_ms = tracing::field::debug(time_to_wait_for_next_round.as_millis()),
216                    peers_contacted,
217                    ping_ok,
218                    ping_fail = peers_contacted - ping_ok,
219                    "Heartbeat round finished"
220                );
221
222                (self.sleep_fn)(time_to_wait_for_next_round).await
223            }
224        };
225
226        #[cfg(all(feature = "prometheus", not(test)))]
227        METRIC_TIME_TO_HEARTBEAT.record_measure(heartbeat_round_timer);
228    }
229
230    /// Heartbeat loop responsible for periodically requesting peers to ping around from the
231    /// external API interface.
232    ///
233    /// The loop runs indefinitely, until the program is explicitly terminated.
234    ///
235    /// This feature should be joined with other internal loops and awaited after all
236    /// components have been initialized.
237    pub async fn heartbeat_loop(&mut self) {
238        loop {
239            self.perform_heartbeat_round().await
240        }
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use async_std::task::sleep;
    use futures::Stream;
    use std::time::Duration;

    /// Minimal config for fast tests: 5 ms rounds, no variance, zero threshold.
    fn simple_heartbeat_config() -> HeartbeatConfig {
        HeartbeatConfig {
            variance: std::time::Duration::from_millis(0u64),
            interval: std::time::Duration::from_millis(5u64),
            threshold: std::time::Duration::from_millis(0u64),
            max_parallel_probes: 14,
        }
    }

    /// Test pinger that waits `delay` and then yields a single successful result,
    /// regardless of how many peers it was asked to ping.
    pub struct DelayingPinger {
        pub delay: std::time::Duration,
    }

    impl Pinging for DelayingPinger {
        fn ping(&self, _peers: Vec<PeerId>) -> impl Stream<Item = crate::errors::Result<Duration>> {
            futures::stream::once(async {
                sleep(self.delay).await;
                Ok(Duration::from_millis(1))
            })
        }
    }

    /// Pings finish within half a round, so each round completes normally and then waits
    /// out the remainder. Within `interval * expected_loop_count` at least
    /// `expected_loop_count` rounds (i.e. `get_peers` calls) are expected.
    #[async_std::test]
    async fn test_heartbeat_should_loop_multiple_times() {
        let config = simple_heartbeat_config();

        let ping_delay = config.interval / 2;
        let expected_loop_count = 2u32;

        // presumably the `times(..)` range is verified when the mock is dropped
        // together with `heartbeat` at the end of the test
        let mut mock = MockHeartbeatExternalApi::new();
        mock.expect_get_peers()
            .times(expected_loop_count as usize..)
            .return_const(vec![PeerId::random(), PeerId::random()]);

        let mut heartbeat = Heartbeat::new(
            config,
            DelayingPinger { delay: ping_delay },
            mock,
            Box::new(|dur| Box::pin(sleep(dur))),
        );

        // The loop never returns, so the sleep arm bounds the test duration.
        futures::select!(
            _ = heartbeat.heartbeat_loop().fuse() => {},
            _ = sleep(config.interval * expected_loop_count).fuse() => {},
        );
    }

    /// Pings take 2.5 rounds, so every round is cut short by the round deadline. The loop
    /// must still start new rounds rather than waiting for the slow pings.
    #[async_std::test]
    async fn test_heartbeat_should_interrupt_long_running_heartbeats() {
        let config = HeartbeatConfig {
            interval: std::time::Duration::from_millis(5u64),
            ..simple_heartbeat_config()
        };

        let ping_delay = 2 * config.interval + config.interval / 2;
        let expected_loop_count = 2;

        let mut mock = MockHeartbeatExternalApi::new();
        mock.expect_get_peers()
            .times(expected_loop_count..)
            .return_const(vec![PeerId::random(), PeerId::random()]);

        let mut heartbeat = Heartbeat::new(
            config,
            DelayingPinger { delay: ping_delay },
            mock,
            Box::new(|dur| Box::pin(sleep(dur))),
        );

        // Small slack on top of the expected rounds to absorb scheduler jitter.
        let tolerance = std::time::Duration::from_millis(2);
        futures::select!(
            _ = heartbeat.heartbeat_loop().fuse() => {},
            _ = sleep(config.interval * (expected_loop_count as u32) + tolerance).fuse() => {},
        );
    }
}