hopr_transport_probe/
neighbors.rs1use std::sync::Arc;
2
3use async_stream::stream;
4use hopr_crypto_random::Randomizable;
5use hopr_crypto_types::types::OffchainPublicKey;
6use hopr_internal_types::protocol::HoprPseudonym;
7use hopr_network_types::types::{DestinationRouting, RoutingOptions};
8use libp2p_identity::PeerId;
9use rand::seq::SliceRandom;
10
11use crate::{
12 config::ProbeConfig,
13 traits::{PeerDiscoveryFetch, ProbeStatusUpdate, TrafficGeneration},
14};
15
16struct TelemetrySink<T> {
17 prober: Arc<T>,
18}
19
20impl<T> Clone for TelemetrySink<T> {
21 fn clone(&self) -> Self {
22 Self {
23 prober: self.prober.clone(),
24 }
25 }
26}
27
28impl<T> futures::Sink<crate::errors::Result<crate::types::Telemetry>> for TelemetrySink<T>
29where
30 T: ProbeStatusUpdate + Send + Sync + 'static,
31{
32 type Error = std::convert::Infallible;
33
34 fn poll_ready(
35 self: std::pin::Pin<&mut Self>,
36 _cx: &mut std::task::Context<'_>,
37 ) -> std::task::Poll<Result<(), Self::Error>> {
38 std::task::Poll::Ready(Ok(()))
39 }
40
41 fn start_send(
42 self: std::pin::Pin<&mut Self>,
43 item: crate::errors::Result<crate::types::Telemetry>,
44 ) -> Result<(), Self::Error> {
45 let prober = self.prober.clone();
46 hopr_async_runtime::prelude::spawn(async move {
47 match item {
48 Ok(telemetry) => match telemetry {
49 crate::types::Telemetry::Loopback(_path_telemetry) => {
50 tracing::warn!(
51 reason = "feature not implemented",
52 "loopback path telemetry not supported yet"
53 );
54 }
55 crate::types::Telemetry::Neighbor(neighbor_telemetry) => {
56 tracing::trace!(
57 peer = %neighbor_telemetry.peer,
58 latency_ms = neighbor_telemetry.rtt.as_millis(),
59 "neighbor probe successful"
60 );
61 prober
62 .on_finished(&neighbor_telemetry.peer, &Ok(neighbor_telemetry.rtt))
63 .await;
64 }
65 },
66 Err(error) => match error {
67 crate::errors::ProbeError::ProbeNeighborTimeout(peer) => {
68 tracing::trace!(
69 %peer,
70 "neighbor probe timed out"
71 );
72 prober
73 .on_finished(&peer, &Err(crate::errors::ProbeError::ProbeNeighborTimeout(peer)))
74 .await;
75 }
76 crate::errors::ProbeError::ProbeLoopbackTimeout(_telemetry) => {
77 tracing::warn!(
78 reason = "feature not implemented",
79 "loopback path telemetry not supported yet"
80 );
81 }
82 _ => tracing::error!(%error, "unknown error on probe telemetry result evaluation"),
83 },
84 }
85 });
86 Ok(())
87 }
88
89 fn poll_flush(
90 self: std::pin::Pin<&mut Self>,
91 _cx: &mut std::task::Context<'_>,
92 ) -> std::task::Poll<Result<(), Self::Error>> {
93 std::task::Poll::Ready(Ok(()))
94 }
95
96 fn poll_close(
97 self: std::pin::Pin<&mut Self>,
98 _cx: &mut std::task::Context<'_>,
99 ) -> std::task::Poll<Result<(), Self::Error>> {
100 std::task::Poll::Ready(Ok(()))
101 }
102}
103
104pub struct ImmediateNeighborProber<T> {
105 cfg: ProbeConfig,
106 prober: Arc<T>,
107}
108
109impl<T> ImmediateNeighborProber<T> {
110 pub fn new(cfg: ProbeConfig, prober: T) -> Self {
111 Self {
112 cfg,
113 prober: Arc::new(prober),
114 }
115 }
116}
117
118impl<T> TrafficGeneration for ImmediateNeighborProber<T>
119where
120 T: PeerDiscoveryFetch + ProbeStatusUpdate + Send + Sync + 'static,
121{
122 fn build(
123 self,
124 ) -> (
125 impl futures::Stream<Item = DestinationRouting> + Send,
126 impl futures::Sink<crate::errors::Result<crate::types::Telemetry>, Error = impl std::error::Error>
127 + Send
128 + Sync
129 + Clone
130 + 'static,
131 ) {
132 let cache_peer_routing: moka::future::Cache<PeerId, DestinationRouting> = moka::future::Cache::builder()
134 .time_to_live(std::time::Duration::from_secs(600))
135 .max_capacity(100_000)
136 .build();
137
138 let prober = self.prober.clone();
139
140 let route_stream = stream! {
141 let mut rng = hopr_crypto_random::rng();
142 loop {
143 let now = std::time::SystemTime::now();
144
145 let mut peers = prober.get_peers(now.checked_sub(self.cfg.recheck_threshold).unwrap_or(now)).await;
146 peers.shuffle(&mut rng); for peer in peers {
149 if let Ok(routing) = cache_peer_routing
150 .try_get_with(peer, async move {
151 Ok::<DestinationRouting, anyhow::Error>(DestinationRouting::Forward {
152 destination: Box::new(OffchainPublicKey::from_peerid(&peer)?.into()),
153 pseudonym: Some(HoprPseudonym::random()),
154 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
155 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
156 })
157 })
158 .await {
159 yield routing;
160 }
161 }
162
163 hopr_async_runtime::prelude::sleep(self.cfg.interval).await;
164 }
165 };
166
167 let result_sink = TelemetrySink { prober: self.prober };
168
169 (route_stream, result_sink)
170 }
171}
172
173#[cfg(test)]
174mod tests {
175 use futures::{StreamExt, pin_mut};
176 use hopr_internal_types::NodeId;
177 use tokio::time::timeout;
178
179 use super::*;
180
181 const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);
182
183 mockall::mock! {
184 ScanInteraction {}
185
186 #[async_trait::async_trait]
187 impl ProbeStatusUpdate for ScanInteraction {
188 async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<std::time::Duration>);
189 }
190
191 #[async_trait::async_trait]
192 impl PeerDiscoveryFetch for ScanInteraction {
193 async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<PeerId>;
194 }
195 }
196
197 #[tokio::test]
198 async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
199 let mut fetcher = MockScanInteraction::new();
200 fetcher.expect_get_peers().returning(|_| vec![]);
201
202 let prober = ImmediateNeighborProber::new(Default::default(), fetcher);
203 let (stream, _sink) = prober.build();
204 pin_mut!(stream);
205
206 assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
207
208 Ok(())
209 }
210
211 lazy_static::lazy_static! {
212 static ref RANDOM_PEERS: Vec<PeerId> = (1..10).map(|_| {
213 let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>()).unwrap().into();
214 peer
215 }).collect::<Vec<_>>();
216 }
217
218 #[tokio::test]
219 async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
220 let mut fetcher = MockScanInteraction::new();
221 fetcher.expect_get_peers().returning(|_| RANDOM_PEERS.clone());
222
223 let prober = ImmediateNeighborProber::new(Default::default(), fetcher);
224 let (stream, _sink) = prober.build();
225 pin_mut!(stream);
226
227 let actual = timeout(
228 TINY_TIMEOUT * 20,
229 stream
230 .take(RANDOM_PEERS.len())
231 .map(|routing| match routing {
232 DestinationRouting::Forward { destination, .. } => {
233 if let NodeId::Offchain(peer_key) = destination.as_ref() {
234 PeerId::from(peer_key)
235 } else {
236 panic!("expected offchain destination, got chain address");
237 }
238 }
239 _ => panic!("expected Forward routing"),
240 })
241 .collect::<Vec<_>>(),
242 )
243 .await?;
244
245 assert_eq!(actual.len(), RANDOM_PEERS.len());
246 assert!(!actual.iter().zip(RANDOM_PEERS.iter()).all(|(a, b)| a == b));
247
248 Ok(())
249 }
250
251 #[tokio::test]
252 async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
253 let cfg = ProbeConfig {
254 interval: std::time::Duration::from_millis(1),
255 recheck_threshold: std::time::Duration::from_millis(1000),
256 ..Default::default()
257 };
258
259 let mut fetcher = MockScanInteraction::new();
260 fetcher.expect_get_peers().times(2).returning(|_| {
261 let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>())
262 .unwrap()
263 .into();
264 vec![peer]
265 });
266 fetcher.expect_get_peers().returning(|_| vec![]);
267
268 let prober = ImmediateNeighborProber::new(cfg, fetcher);
269 let (stream, _sink) = prober.build();
270 pin_mut!(stream);
271
272 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
273 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
274 assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
275
276 Ok(())
277 }
278}