// hopr_ct_telemetry/lib.rs

use std::sync::Arc;

use futures::StreamExt;
use hopr_api::{
    PeerId,
    ct::{DestinationRouting, NetworkGraphView, TrafficGeneration},
    network::{NetworkObservations, NetworkView},
};
use hopr_crypto_random::Randomizable;
use hopr_crypto_types::types::OffchainPublicKey;
use hopr_internal_types::protocol::HoprPseudonym;
use hopr_network_types::types::RoutingOptions;
use rand::seq::SliceRandom;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use validator::Validate;
17
18/// Configuration for the probing mechanism
19#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
20#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(deny_unknown_fields))]
21pub struct ProberConfig {
22    /// The delay between individual probing rounds for neighbor discovery
23    #[cfg_attr(
24        feature = "serde",
25        serde(default = "default_probing_interval", with = "humantime_serde")
26    )]
27    #[default(default_probing_interval())]
28    pub interval: std::time::Duration,
29
30    /// The time threshold after which it is reasonable to recheck the nearest neighbor
31    #[cfg_attr(
32        feature = "serde",
33        serde(default = "default_recheck_threshold", with = "humantime_serde")
34    )]
35    #[default(default_recheck_threshold())]
36    pub recheck_threshold: std::time::Duration,
37}
38
/// Delay before repeating probing rounds, must include enough time to traverse NATs
const DEFAULT_REPEATED_PROBING_DELAY: std::time::Duration = std::time::Duration::from_secs(5);

/// Time after which the availability of a node gets rechecked
const DEFAULT_PROBE_RECHECK_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60);

/// Returns [`DEFAULT_REPEATED_PROBING_DELAY`].
///
/// Exists as a function because the `serde(default = "...")` and `#[default(...)]`
/// attributes on `ProberConfig::interval` refer to it by name.
#[inline]
const fn default_probing_interval() -> std::time::Duration {
    DEFAULT_REPEATED_PROBING_DELAY
}

/// Returns [`DEFAULT_PROBE_RECHECK_THRESHOLD`].
///
/// Exists as a function because the `serde(default = "...")` and `#[default(...)]`
/// attributes on `ProberConfig::recheck_threshold` refer to it by name.
#[inline]
const fn default_recheck_threshold() -> std::time::Duration {
    DEFAULT_PROBE_RECHECK_THRESHOLD
}
54
55pub struct ImmediateNeighborProber {
56    cfg: ProberConfig,
57}
58
59impl ImmediateNeighborProber {
60    pub fn new(cfg: ProberConfig) -> Self {
61        Self { cfg }
62    }
63}
64
65impl TrafficGeneration for ImmediateNeighborProber {
66    fn build<U>(self, network_graph: U) -> impl futures::Stream<Item = DestinationRouting> + Send
67    where
68        U: NetworkGraphView + Send + Sync + 'static,
69    {
70        // For each probe target a cached version of transport routing is stored
71        let cache_peer_routing: moka::future::Cache<PeerId, DestinationRouting> = moka::future::Cache::builder()
72            .time_to_live(std::time::Duration::from_secs(600))
73            .max_capacity(100_000)
74            .build();
75
76        let cfg = self.cfg;
77
78        futures::stream::repeat(())
79            .filter_map(move |_| {
80                let nodes = network_graph.nodes();
81
82                async move {
83                    hopr_async_runtime::prelude::sleep(cfg.interval).await;
84                    Some(nodes)
85                }
86            })
87            .flatten()
88            .filter_map(move |peer| {
89                let cache_peer_routing = cache_peer_routing.clone();
90
91                async move {
92                    cache_peer_routing
93                        .try_get_with(peer, async move {
94                            Ok::<DestinationRouting, anyhow::Error>(DestinationRouting::Forward {
95                                destination: Box::new(OffchainPublicKey::from_peerid(&peer)?.into()),
96                                pseudonym: Some(HoprPseudonym::random()),
97                                forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
98                                return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
99                            })
100                        })
101                        .await
102                        .ok()
103                }
104            })
105    }
106}
107
/// Network graph backed by a shared network handle of type `T`.
///
/// The handle is kept behind an [`Arc`], so clones of the graph share the same `T`.
pub struct ImmediateNeighborChannelGraph<T> {
    /// Shared handle to the underlying network object.
    network: Arc<T>,
    /// Threshold handed over at construction time.
    recheck_threshold: std::time::Duration,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a `T: Clone`
// bound although only the `Arc` is cloned and `T` itself is never duplicated.
impl<T> Clone for ImmediateNeighborChannelGraph<T> {
    fn clone(&self) -> Self {
        Self {
            network: Arc::clone(&self.network),
            recheck_threshold: self.recheck_threshold,
        }
    }
}

impl<T> ImmediateNeighborChannelGraph<T> {
    /// Wraps `network` in an [`Arc`] and stores it together with `recheck_threshold`.
    pub fn new(network: T, recheck_threshold: std::time::Duration) -> Self {
        Self {
            network: Arc::new(network),
            recheck_threshold,
        }
    }
}
122
123#[async_trait::async_trait]
124impl<T> hopr_api::ct::NetworkGraphUpdate for ImmediateNeighborChannelGraph<T>
125where
126    T: NetworkObservations + Send + Sync,
127{
128    async fn record<N, P>(
129        &self,
130        telemetry: std::result::Result<hopr_api::ct::Telemetry<N, P>, hopr_api::ct::TrafficGenerationError<P>>,
131    ) where
132        N: hopr_api::ct::MeasurableNeighbor + Send + Clone,
133        P: hopr_api::ct::MeasurablePath + Send + Clone,
134    {
135        match telemetry {
136            Ok(hopr_api::ct::Telemetry::Neighbor(telemetry)) => {
137                tracing::trace!(
138                    peer = %telemetry.peer(),
139                    latency_ms = telemetry.rtt().as_millis(),
140                    "neighbor probe successful"
141                );
142                hopr_api::network::NetworkObservations::update(
143                    self.network.as_ref(),
144                    telemetry.peer(),
145                    Ok(telemetry.rtt() / 2),
146                );
147            }
148            Ok(hopr_api::ct::Telemetry::Loopback(_)) => {
149                tracing::warn!(
150                    reason = "feature not implemented",
151                    "loopback path telemetry not supported yet"
152                );
153            }
154            Err(hopr_api::ct::TrafficGenerationError::ProbeNeighborTimeout(peer)) => {
155                hopr_api::network::NetworkObservations::update(self.network.as_ref(), &peer, Err(()));
156            }
157            Err(hopr_api::ct::TrafficGenerationError::ProbeLoopbackTimeout(_)) => {
158                tracing::warn!(
159                    reason = "feature not implemented",
160                    "loopback path telemetry not supported yet"
161                );
162            }
163        }
164    }
165}
166
167#[async_trait::async_trait]
168impl<T> hopr_api::ct::NetworkGraphView for ImmediateNeighborChannelGraph<T>
169where
170    T: NetworkView + Send + Sync + Clone + 'static,
171{
172    fn nodes(&self) -> futures::stream::BoxStream<'static, PeerId> {
173        let fetcher = self.network.clone();
174        let _recheck_threshold = self.recheck_threshold; // TODO: currently being ignored
175
176        Box::pin(async_stream::stream! {
177            let mut peers: Vec<PeerId> = fetcher.discovered_peers().into_iter().collect();
178            peers.shuffle(&mut hopr_crypto_random::rng());    // shuffle peers to randomize order between rounds
179
180            for peer in peers {
181                yield peer;
182            }
183        })
184    }
185
186    async fn find_routes(&self, _destination: &PeerId, _length: usize) -> Vec<DestinationRouting> {
187        vec![]
188    }
189}
190
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::{StreamExt, pin_mut};
    use hopr_api::{
        Multiaddr,
        network::{Health, Observable},
    };
    use hopr_internal_types::NodeId;
    use tokio::time::timeout;

    use super::*;

    /// Upper bound on how long a test waits for a single stream item.
    const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);

    // Mock of `Observable`; only needed as the return type of `observations_for` below.
    mockall::mock! {
        Observed {}

        impl Observable for Observed {
            fn record_probe(&mut self, latency: std::result::Result<std::time::Duration, ()>);

            fn last_update(&self) -> std::time::Duration;

            fn average_latency(&self) -> Option<std::time::Duration>;

            fn average_probe_rate(&self) -> f64;

            fn score(&self) -> f64;
        }
    }

    // Mock of the network object handed to `ImmediateNeighborChannelGraph`.
    mockall::mock! {
        ScanInteraction {}

        #[async_trait::async_trait]
        impl hopr_api::network::NetworkObservations for ScanInteraction {
            fn update(&self, peer: &PeerId, result: std::result::Result<std::time::Duration, ()>);
        }

        #[async_trait::async_trait]
        impl hopr_api::network::NetworkView for ScanInteraction {
            fn listening_as(&self) -> HashSet<Multiaddr>;

            fn multiaddress_of(&self, peer: &PeerId) -> Option<HashSet<Multiaddr>>;

            fn discovered_peers(&self) -> std::collections::HashSet<PeerId>;

            fn connected_peers(&self) -> HashSet<PeerId>;

            #[allow(refining_impl_trait)]
            fn observations_for(&self, peer: &PeerId) -> Option<MockObserved>;

            fn health(&self) -> Health;
        }

        impl Clone for ScanInteraction {
            fn clone(&self) -> Self;
        }
    }

    /// With no discovered peers the stream must not produce an item within the timeout.
    #[tokio::test]
    async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
        let mut fetcher = MockScanInteraction::new();
        fetcher.expect_discovered_peers().returning(HashSet::new);

        let channel_graph = ImmediateNeighborChannelGraph {
            network: Arc::new(fetcher),
            recheck_threshold: ProberConfig::default().recheck_threshold,
        };

        let prober = ImmediateNeighborProber::new(Default::default());
        let stream = prober.build(channel_graph);
        pin_mut!(stream);

        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());

        Ok(())
    }

    lazy_static::lazy_static! {
        // Fixed set of 9 random peers shared by the randomization test.
        static ref RANDOM_PEERS: HashSet<PeerId> = (1..10).map(|_| {
            let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>()).unwrap().into();
            peer
        }).collect::<HashSet<_>>();
    }

    /// One probing round must emit exactly the discovered peers, in an order that differs
    /// from the iteration order of the source set.
    #[tokio::test]
    async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
        let mut fetcher = MockScanInteraction::new();
        fetcher.expect_discovered_peers().returning(|| RANDOM_PEERS.clone());

        let channel_graph = ImmediateNeighborChannelGraph {
            network: Arc::new(fetcher),
            recheck_threshold: ProberConfig::default().recheck_threshold,
        };

        let prober = ImmediateNeighborProber::new(ProberConfig {
            interval: std::time::Duration::from_millis(1),
            ..Default::default()
        });
        let stream = prober.build(channel_graph);
        pin_mut!(stream);

        let actual = timeout(
            TINY_TIMEOUT * 20,
            stream
                .take(RANDOM_PEERS.len())
                .map(|routing| match routing {
                    DestinationRouting::Forward { destination, .. } => {
                        if let NodeId::Offchain(peer_key) = destination.as_ref() {
                            PeerId::from(peer_key)
                        } else {
                            panic!("expected offchain destination, got chain address");
                        }
                    }
                    _ => panic!("expected Forward routing"),
                })
                .collect::<Vec<_>>(),
        )
        .await?;

        assert_eq!(actual.len(), RANDOM_PEERS.len());
        // The order check alone would also pass for unrelated peers, so pin the membership too.
        assert_eq!(actual.iter().copied().collect::<HashSet<_>>(), *RANDOM_PEERS);
        assert!(!actual.iter().zip(RANDOM_PEERS.iter()).all(|(a, b)| a == b));

        Ok(())
    }

    /// Two rounds with one peer each yield two items; once the network reports no peers
    /// the stream stays pending.
    #[tokio::test]
    async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
        let cfg = ProberConfig {
            interval: std::time::Duration::from_millis(1),
            recheck_threshold: std::time::Duration::from_millis(1000),
        };

        let mut fetcher = MockScanInteraction::new();
        fetcher.expect_discovered_peers().times(2).returning(|| {
            let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>())
                .unwrap()
                .into();
            HashSet::from([peer])
        });
        fetcher.expect_discovered_peers().returning(HashSet::new);

        let channel_graph = ImmediateNeighborChannelGraph {
            network: Arc::new(fetcher),
            recheck_threshold: cfg.recheck_threshold,
        };

        let prober = ImmediateNeighborProber::new(cfg);
        let stream = prober.build(channel_graph);
        pin_mut!(stream);

        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());

        Ok(())
    }
}
350}