hopr_ct_telemetry/
lib.rs

1use std::sync::Arc;
2
3use futures::StreamExt;
4use hopr_api::{
5    PeerId,
6    ct::{DestinationRouting, NetworkGraphView, TrafficGeneration},
7    network::{NetworkObservations, NetworkView},
8};
9use hopr_crypto_random::Randomizable;
10use hopr_crypto_types::types::OffchainPublicKey;
11use hopr_internal_types::protocol::HoprPseudonym;
12use hopr_network_types::types::RoutingOptions;
13use rand::seq::SliceRandom;
14#[cfg(feature = "serde")]
15use serde::{Deserialize, Serialize};
16use validator::Validate;
17
18/// Configuration for the probing mechanism
19#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
20#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(deny_unknown_fields))]
21pub struct ProberConfig {
22    /// The delay between individual probing rounds for neighbor discovery
23    #[cfg_attr(
24        feature = "serde",
25        serde(default = "default_probing_interval", with = "humantime_serde")
26    )]
27    #[default(default_probing_interval())]
28    pub interval: std::time::Duration,
29
30    /// The time threshold after which it is reasonable to recheck the nearest neighbor
31    #[cfg_attr(
32        feature = "serde",
33        serde(default = "default_recheck_threshold", with = "humantime_serde")
34    )]
35    #[default(default_recheck_threshold())]
36    pub recheck_threshold: std::time::Duration,
37}
38
39/// Delay before repeating probing rounds, must include enough time to traverse NATs
40const DEFAULT_REPEATED_PROBING_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
41
42/// Time after which the availability of a node gets rechecked
43const DEFAULT_PROBE_RECHECK_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60);
44
45#[inline]
46const fn default_probing_interval() -> std::time::Duration {
47    DEFAULT_REPEATED_PROBING_DELAY
48}
49
50#[inline]
51const fn default_recheck_threshold() -> std::time::Duration {
52    DEFAULT_PROBE_RECHECK_THRESHOLD
53}
54
55pub struct ImmediateNeighborProber {
56    cfg: ProberConfig,
57}
58
59impl ImmediateNeighborProber {
60    pub fn new(cfg: ProberConfig) -> Self {
61        Self { cfg }
62    }
63}
64
65impl TrafficGeneration for ImmediateNeighborProber {
66    fn build<U>(self, network_graph: U) -> impl futures::Stream<Item = DestinationRouting> + Send
67    where
68        U: NetworkGraphView + Send + Sync + 'static,
69    {
70        // For each probe target a cached version of transport routing is stored
71        let cache_peer_routing: moka::future::Cache<PeerId, DestinationRouting> = moka::future::Cache::builder()
72            .time_to_live(std::time::Duration::from_secs(600))
73            .max_capacity(100_000)
74            .build();
75
76        let cfg = self.cfg;
77
78        futures::stream::repeat(())
79            .filter_map(move |_| {
80                let nodes = network_graph.nodes();
81
82                async move {
83                    hopr_async_runtime::prelude::sleep(cfg.interval).await;
84                    Some(nodes)
85                }
86            })
87            .flatten()
88            .filter_map(move |peer| {
89                let cache_peer_routing = cache_peer_routing.clone();
90
91                async move {
92                    cache_peer_routing
93                        .try_get_with(peer, async move {
94                            Ok::<DestinationRouting, anyhow::Error>(DestinationRouting::Forward {
95                                destination: Box::new(OffchainPublicKey::from_peerid(&peer)?.into()),
96                                pseudonym: Some(HoprPseudonym::random()),
97                                forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
98                                return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
99                            })
100                        })
101                        .await
102                        .ok()
103                }
104            })
105    }
106}
107
108#[derive(Clone)]
109pub struct ImmediateNeighborChannelGraph<T> {
110    network: Arc<T>,
111    recheck_threshold: std::time::Duration,
112}
113
114impl<T> ImmediateNeighborChannelGraph<T> {
115    pub fn new(network: T, recheck_threshold: std::time::Duration) -> Self {
116        Self {
117            network: Arc::new(network),
118            recheck_threshold,
119        }
120    }
121}
122
123#[async_trait::async_trait]
124impl<T> hopr_api::ct::NetworkGraphUpdate for ImmediateNeighborChannelGraph<T>
125where
126    T: NetworkObservations + Send + Sync,
127{
128    async fn record<N, P>(
129        &self,
130        telemetry: std::result::Result<hopr_api::ct::Telemetry<N, P>, hopr_api::ct::TrafficGenerationError<P>>,
131    ) where
132        N: hopr_api::ct::MeasurableNeighbor + Send + Clone,
133        P: hopr_api::ct::MeasurablePath + Send + Clone,
134    {
135        match telemetry {
136            Ok(hopr_api::ct::Telemetry::Neighbor(telemetry)) => {
137                tracing::trace!(
138                    peer = %telemetry.peer(),
139                    latency_ms = telemetry.rtt().as_millis(),
140                    "neighbor probe successful"
141                );
142                hopr_api::network::NetworkObservations::update(
143                    self.network.as_ref(),
144                    telemetry.peer(),
145                    Ok(telemetry.rtt() / 2),
146                );
147            }
148            Ok(hopr_api::ct::Telemetry::Loopback(_)) => {
149                tracing::warn!(
150                    reason = "feature not implemented",
151                    "loopback path telemetry not supported yet"
152                );
153            }
154            Err(hopr_api::ct::TrafficGenerationError::ProbeNeighborTimeout(peer)) => {
155                hopr_api::network::NetworkObservations::update(self.network.as_ref(), &peer, Err(()));
156            }
157            Err(hopr_api::ct::TrafficGenerationError::ProbeLoopbackTimeout(_)) => {
158                tracing::warn!(
159                    reason = "feature not implemented",
160                    "loopback path telemetry not supported yet"
161                );
162            }
163        }
164    }
165}
166
167#[async_trait::async_trait]
168impl<T> hopr_api::ct::NetworkGraphView for ImmediateNeighborChannelGraph<T>
169where
170    T: NetworkView + Send + Sync + Clone + 'static,
171{
172    fn nodes(&self) -> futures::stream::BoxStream<'static, PeerId> {
173        let fetcher = self.network.clone();
174        let _recheck_threshold = self.recheck_threshold; // TODO: currently being ignored
175        let mut rng = hopr_crypto_random::rng();
176
177        Box::pin(async_stream::stream! {
178            let mut peers: Vec<PeerId> = fetcher.discovered_peers().into_iter().collect();
179            peers.shuffle(&mut rng);    // shuffle peers to randomize order between rounds
180
181            for peer in peers {
182                yield peer;
183            }
184        })
185    }
186
187    async fn find_routes(&self, _destination: &PeerId, _length: usize) -> Vec<DestinationRouting> {
188        vec![]
189    }
190}
191
192#[cfg(test)]
193mod tests {
194    use std::collections::HashSet;
195
196    use futures::{StreamExt, pin_mut};
197    use hopr_api::{
198        Multiaddr,
199        network::{Health, Observable},
200    };
201    use hopr_internal_types::NodeId;
202    use tokio::time::timeout;
203
204    use super::*;
205
206    const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);
207
208    mockall::mock! {
209        Observed {}
210
211        impl Observable for Observed {
212            fn record_probe(&mut self, latency: std::result::Result<std::time::Duration, ()>);
213
214            fn last_update(&self) -> std::time::Duration;
215
216            fn average_latency(&self) -> Option<std::time::Duration>;
217
218            fn average_probe_rate(&self) -> f64;
219
220            fn score(&self) -> f64;
221        }
222    }
223
224    mockall::mock! {
225        ScanInteraction {}
226
227        #[async_trait::async_trait]
228        impl hopr_api::network::NetworkObservations for ScanInteraction {
229            fn update(&self, peer: &PeerId, result: std::result::Result<std::time::Duration, ()>);
230        }
231
232        #[async_trait::async_trait]
233        impl hopr_api::network::NetworkView for ScanInteraction {
234            fn listening_as(&self) -> HashSet<Multiaddr>;
235
236            fn multiaddress_of(&self, peer: &PeerId) -> Option<HashSet<Multiaddr>>;
237
238            fn discovered_peers(&self) -> std::collections::HashSet<PeerId> ;
239
240            fn connected_peers(&self) -> HashSet<PeerId>;
241
242            #[allow(refining_impl_trait)]
243            fn observations_for(&self, peer: &PeerId) -> Option<MockObserved>;
244
245            fn health(&self) -> Health;
246        }
247
248        impl Clone for ScanInteraction {
249            fn clone(&self) -> Self;
250        }
251    }
252
253    #[tokio::test]
254    async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
255        let mut fetcher = MockScanInteraction::new();
256        fetcher.expect_discovered_peers().returning(|| HashSet::new());
257
258        let channel_graph = ImmediateNeighborChannelGraph {
259            network: Arc::new(fetcher),
260            recheck_threshold: ProberConfig::default().recheck_threshold,
261        };
262
263        let prober = ImmediateNeighborProber::new(Default::default());
264        let stream = prober.build(channel_graph);
265        pin_mut!(stream);
266
267        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
268
269        Ok(())
270    }
271
272    lazy_static::lazy_static! {
273        static ref RANDOM_PEERS: HashSet<PeerId> = (1..10).map(|_| {
274            let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>()).unwrap().into();
275            peer
276        }).collect::<HashSet<_>>();
277    }
278
279    #[tokio::test]
280    async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
281        let mut fetcher = MockScanInteraction::new();
282        fetcher.expect_discovered_peers().returning(|| RANDOM_PEERS.clone());
283
284        let channel_graph = ImmediateNeighborChannelGraph {
285            network: Arc::new(fetcher),
286            recheck_threshold: ProberConfig::default().recheck_threshold,
287        };
288
289        let prober = ImmediateNeighborProber::new(ProberConfig {
290            interval: std::time::Duration::from_millis(1),
291            ..Default::default()
292        });
293        let stream = prober.build(channel_graph);
294        pin_mut!(stream);
295
296        let actual = timeout(
297            TINY_TIMEOUT * 20,
298            stream
299                .take(RANDOM_PEERS.len())
300                .map(|routing| match routing {
301                    DestinationRouting::Forward { destination, .. } => {
302                        if let NodeId::Offchain(peer_key) = destination.as_ref() {
303                            PeerId::from(peer_key)
304                        } else {
305                            panic!("expected offchain destination, got chain address");
306                        }
307                    }
308                    _ => panic!("expected Forward routing"),
309                })
310                .collect::<Vec<_>>(),
311        )
312        .await?;
313
314        assert_eq!(actual.len(), RANDOM_PEERS.len());
315        assert!(!actual.iter().zip(RANDOM_PEERS.iter()).all(|(a, b)| a == b));
316
317        Ok(())
318    }
319
320    #[tokio::test]
321    async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
322        let cfg = ProberConfig {
323            interval: std::time::Duration::from_millis(1),
324            recheck_threshold: std::time::Duration::from_millis(1000),
325            ..Default::default()
326        };
327
328        let mut fetcher = MockScanInteraction::new();
329        fetcher.expect_discovered_peers().times(2).returning(|| {
330            let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>())
331                .unwrap()
332                .into();
333            HashSet::from([peer])
334        });
335        fetcher.expect_discovered_peers().returning(|| HashSet::new());
336
337        let channel_graph = ImmediateNeighborChannelGraph {
338            network: Arc::new(fetcher),
339            recheck_threshold: cfg.recheck_threshold,
340        };
341
342        let prober = ImmediateNeighborProber::new(cfg);
343        let stream = prober.build(channel_graph);
344        pin_mut!(stream);
345
346        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
347        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
348        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
349
350        Ok(())
351    }
352}