hopr_transport_probe/
neighbors.rs

1use std::sync::Arc;
2
3use async_stream::stream;
4use hopr_crypto_random::Randomizable;
5use hopr_crypto_types::types::OffchainPublicKey;
6use hopr_internal_types::protocol::HoprPseudonym;
7use hopr_network_types::types::{DestinationRouting, RoutingOptions};
8use libp2p_identity::PeerId;
9use rand::seq::SliceRandom;
10
11use crate::{
12    config::ProbeConfig,
13    traits::{PeerDiscoveryFetch, ProbeStatusUpdate, TrafficGeneration},
14};
15
/// Sink-side handle onto a shared probe status receiver.
///
/// The wrapped value lives behind an [`Arc`], so every clone of the sink
/// refers to the very same receiver instance.
struct TelemetrySink<T> {
    /// Shared receiver that gets notified about finished probes.
    prober: Arc<T>,
}

// Written by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, while duplicating the `Arc` handle needs no such bound.
impl<T> Clone for TelemetrySink<T> {
    /// Produces another handle pointing at the same shared receiver.
    fn clone(&self) -> Self {
        let prober = Arc::clone(&self.prober);
        Self { prober }
    }
}
27
28impl<T> futures::Sink<crate::errors::Result<crate::types::Telemetry>> for TelemetrySink<T>
29where
30    T: ProbeStatusUpdate + Send + Sync + 'static,
31{
32    type Error = std::convert::Infallible;
33
34    fn poll_ready(
35        self: std::pin::Pin<&mut Self>,
36        _cx: &mut std::task::Context<'_>,
37    ) -> std::task::Poll<Result<(), Self::Error>> {
38        std::task::Poll::Ready(Ok(()))
39    }
40
41    fn start_send(
42        self: std::pin::Pin<&mut Self>,
43        item: crate::errors::Result<crate::types::Telemetry>,
44    ) -> Result<(), Self::Error> {
45        let prober = self.prober.clone();
46        hopr_async_runtime::prelude::spawn(async move {
47            match item {
48                Ok(telemetry) => match telemetry {
49                    crate::types::Telemetry::Loopback(_path_telemetry) => {
50                        tracing::warn!(
51                            reason = "feature not implemented",
52                            "loopback path telemetry not supported yet"
53                        );
54                    }
55                    crate::types::Telemetry::Neighbor(neighbor_telemetry) => {
56                        tracing::trace!(
57                            peer = %neighbor_telemetry.peer,
58                            latency_ms = neighbor_telemetry.rtt.as_millis(),
59                            "neighbor probe successful"
60                        );
61                        prober
62                            .on_finished(&neighbor_telemetry.peer, &Ok(neighbor_telemetry.rtt))
63                            .await;
64                    }
65                },
66                Err(error) => match error {
67                    crate::errors::ProbeError::ProbeNeighborTimeout(peer) => {
68                        tracing::trace!(
69                            %peer,
70                            "neighbor probe timed out"
71                        );
72                        prober
73                            .on_finished(&peer, &Err(crate::errors::ProbeError::ProbeNeighborTimeout(peer)))
74                            .await;
75                    }
76                    crate::errors::ProbeError::ProbeLoopbackTimeout(_telemetry) => {
77                        tracing::warn!(
78                            reason = "feature not implemented",
79                            "loopback path telemetry not supported yet"
80                        );
81                    }
82                    _ => tracing::error!(%error, "unknown error on probe telemetry result evaluation"),
83                },
84            }
85        });
86        Ok(())
87    }
88
89    fn poll_flush(
90        self: std::pin::Pin<&mut Self>,
91        _cx: &mut std::task::Context<'_>,
92    ) -> std::task::Poll<Result<(), Self::Error>> {
93        std::task::Poll::Ready(Ok(()))
94    }
95
96    fn poll_close(
97        self: std::pin::Pin<&mut Self>,
98        _cx: &mut std::task::Context<'_>,
99    ) -> std::task::Poll<Result<(), Self::Error>> {
100        std::task::Poll::Ready(Ok(()))
101    }
102}
103
104pub struct ImmediateNeighborProber<T> {
105    cfg: ProbeConfig,
106    prober: Arc<T>,
107}
108
109impl<T> ImmediateNeighborProber<T> {
110    pub fn new(cfg: ProbeConfig, prober: T) -> Self {
111        Self {
112            cfg,
113            prober: Arc::new(prober),
114        }
115    }
116}
117
118impl<T> TrafficGeneration for ImmediateNeighborProber<T>
119where
120    T: PeerDiscoveryFetch + ProbeStatusUpdate + Send + Sync + 'static,
121{
122    fn build(
123        self,
124    ) -> (
125        impl futures::Stream<Item = DestinationRouting> + Send,
126        impl futures::Sink<crate::errors::Result<crate::types::Telemetry>, Error = impl std::error::Error>
127        + Send
128        + Sync
129        + Clone
130        + 'static,
131    ) {
132        // For each probe target a cached version of transport routing is stored
133        let cache_peer_routing: moka::future::Cache<PeerId, DestinationRouting> = moka::future::Cache::builder()
134            .time_to_live(std::time::Duration::from_secs(600))
135            .max_capacity(100_000)
136            .build();
137
138        let prober = self.prober.clone();
139
140        let route_stream = stream! {
141            let mut rng = hopr_crypto_random::rng();
142            loop {
143                let now = std::time::SystemTime::now();
144
145                let mut peers = prober.get_peers(now.checked_sub(self.cfg.recheck_threshold).unwrap_or(now)).await;
146                peers.shuffle(&mut rng);    // shuffle peers to randomize order between rounds
147
148                for peer in peers {
149                    if let Ok(routing) = cache_peer_routing
150                        .try_get_with(peer, async move {
151                            Ok::<DestinationRouting, anyhow::Error>(DestinationRouting::Forward {
152                                destination: Box::new(OffchainPublicKey::from_peerid(&peer)?.into()),
153                                pseudonym: Some(HoprPseudonym::random()),
154                                forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
155                                return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
156                            })
157                        })
158                        .await {
159                            yield routing;
160                        }
161                }
162
163                hopr_async_runtime::prelude::sleep(self.cfg.interval).await;
164            }
165        };
166
167        let result_sink = TelemetrySink { prober: self.prober };
168
169        (route_stream, result_sink)
170    }
171}
172
#[cfg(test)]
mod tests {
    use futures::{StreamExt, pin_mut};
    use hopr_internal_types::NodeId;
    use tokio::time::timeout;

    use super::*;

    /// Short deadline used to decide that the stream produced nothing (more).
    const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);

    // Single mock playing both roles required by the prober:
    // the peer source and the probe result receiver.
    mockall::mock! {
        ScanInteraction {}

        #[async_trait::async_trait]
        impl ProbeStatusUpdate for ScanInteraction {
            async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<std::time::Duration>);
        }

        #[async_trait::async_trait]
        impl PeerDiscoveryFetch for ScanInteraction {
            async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<PeerId>;
        }
    }

    /// Creates a peer id from a freshly generated random private key.
    fn random_peer() -> PeerId {
        OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>())
            .unwrap()
            .into()
    }

    /// Extracts the destination peer from a forward routing; panics on anything else.
    fn destination_peer(routing: DestinationRouting) -> PeerId {
        let DestinationRouting::Forward { destination, .. } = routing else {
            panic!("expected Forward routing");
        };

        match destination.as_ref() {
            NodeId::Offchain(peer_key) => PeerId::from(peer_key),
            _ => panic!("expected offchain destination, got chain address"),
        }
    }

    #[tokio::test]
    async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
        let mut fetcher = MockScanInteraction::new();
        fetcher.expect_get_peers().returning(|_| vec![]);

        let (stream, _sink) = ImmediateNeighborProber::new(Default::default(), fetcher).build();
        pin_mut!(stream);

        // With no peers available the stream must stay pending until the deadline.
        let first = timeout(TINY_TIMEOUT, stream.next()).await;
        assert!(first.is_err());

        Ok(())
    }

    lazy_static::lazy_static! {
        /// Fixed set of nine random peers returned by the mocked peer source.
        static ref RANDOM_PEERS: Vec<PeerId> = (1..10).map(|_| random_peer()).collect::<Vec<_>>();
    }

    #[tokio::test]
    async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
        let mut fetcher = MockScanInteraction::new();
        fetcher.expect_get_peers().returning(|_| RANDOM_PEERS.clone());

        let (stream, _sink) = ImmediateNeighborProber::new(Default::default(), fetcher).build();
        pin_mut!(stream);

        let expected_count = RANDOM_PEERS.len();
        let first_round = stream
            .take(expected_count)
            .map(destination_peer)
            .collect::<Vec<_>>();
        let actual = timeout(TINY_TIMEOUT * 20, first_round).await?;

        assert_eq!(actual.len(), RANDOM_PEERS.len());
        // At least one position must differ from the order the peers were supplied in.
        assert!(actual.iter().zip(RANDOM_PEERS.iter()).any(|(a, b)| a != b));

        Ok(())
    }

    #[tokio::test]
    async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
        let cfg = ProbeConfig {
            interval: std::time::Duration::from_millis(1),
            recheck_threshold: std::time::Duration::from_millis(1000),
            ..Default::default()
        };

        let mut fetcher = MockScanInteraction::new();
        // The first two rounds offer a single fresh peer each, every later round is empty.
        fetcher.expect_get_peers().times(2).returning(|_| vec![random_peer()]);
        fetcher.expect_get_peers().returning(|_| vec![]);

        let (stream, _sink) = ImmediateNeighborProber::new(cfg, fetcher).build();
        pin_mut!(stream);

        let first = timeout(TINY_TIMEOUT, stream.next()).await?;
        assert!(first.is_some());

        let second = timeout(TINY_TIMEOUT, stream.next()).await?;
        assert!(second.is_some());

        let third = timeout(TINY_TIMEOUT, stream.next()).await;
        assert!(third.is_err());

        Ok(())
    }
}