Skip to main content

hopr_ct_immediate/
lib.rs

1use futures::{StreamExt, stream::BoxStream};
2use hopr_api::{
3    ct::{CoverTrafficGeneration, ProbeRouting, ProbingTrafficGeneration},
4    graph::{NetworkGraphTraverse, NetworkGraphView},
5    types::{
6        crypto::types::OffchainPublicKey,
7        crypto_random::Randomizable,
8        internal::{
9            protocol::HoprPseudonym,
10            routing::{DestinationRouting, RoutingOptions},
11        },
12    },
13};
14#[cfg(feature = "serde")]
15use serde::{Deserialize, Serialize};
16use validator::Validate;
17
18/// Configuration for the probing mechanism
19#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
20#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(deny_unknown_fields))]
21pub struct ProberConfig {
22    /// The delay between individual probing rounds for neighbor discovery
23    #[cfg_attr(
24        feature = "serde",
25        serde(default = "default_probing_interval", with = "humantime_serde")
26    )]
27    #[default(default_probing_interval())]
28    pub interval: std::time::Duration,
29
30    /// The time threshold after which it is reasonable to recheck the nearest neighbor
31    #[cfg_attr(
32        feature = "serde",
33        serde(default = "default_recheck_threshold", with = "humantime_serde")
34    )]
35    #[default(default_recheck_threshold())]
36    pub recheck_threshold: std::time::Duration,
37}
38
/// Default pause before a probing round is repeated; it must leave enough time to traverse NATs.
const DEFAULT_REPEATED_PROBING_DELAY: std::time::Duration = std::time::Duration::from_secs(5);

/// Returns the default probing interval ([`DEFAULT_REPEATED_PROBING_DELAY`]).
///
/// Exists as a function because both the `serde(default = ...)` and the `#[default(...)]`
/// attributes of the configuration refer to it by name.
#[inline]
const fn default_probing_interval() -> std::time::Duration {
    DEFAULT_REPEATED_PROBING_DELAY
}

/// Default age after which the availability of a node gets rechecked.
const DEFAULT_PROBE_RECHECK_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60);

/// Returns the default recheck threshold ([`DEFAULT_PROBE_RECHECK_THRESHOLD`]).
///
/// Exists as a function because both the `serde(default = ...)` and the `#[default(...)]`
/// attributes of the configuration refer to it by name.
#[inline]
const fn default_recheck_threshold() -> std::time::Duration {
    DEFAULT_PROBE_RECHECK_THRESHOLD
}
54
55pub struct ImmediateNeighborProber<U> {
56    cfg: ProberConfig,
57    graph: U,
58}
59
60impl<U> ImmediateNeighborProber<U> {
61    pub fn new(cfg: ProberConfig, graph: U) -> Self {
62        Self { cfg, graph }
63    }
64}
65
66impl<U> CoverTrafficGeneration for ImmediateNeighborProber<U>
67where
68    U: NetworkGraphTraverse<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
69{
70    fn build(&self) -> BoxStream<'static, DestinationRouting> {
71        // Cover traffic not currently generating any data
72        Box::pin(futures::stream::empty())
73    }
74}
75
76impl<U> ProbingTrafficGeneration for ImmediateNeighborProber<U>
77where
78    U: NetworkGraphView<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
79{
80    fn build(&self) -> BoxStream<'static, ProbeRouting> {
81        // For each probe target a cached version of transport routing is stored
82        let cache_peer_routing: moka::future::Cache<OffchainPublicKey, ProbeRouting> = moka::future::Cache::builder()
83            .time_to_live(std::time::Duration::from_secs(600))
84            .max_capacity(100_000)
85            .build();
86
87        let cfg = self.cfg;
88        let graph = self.graph.clone();
89
90        futures::stream::repeat(())
91            .filter_map(move |_| {
92                let nodes = graph.nodes();
93
94                async move {
95                    hopr_async_runtime::prelude::sleep(cfg.interval).await;
96                    Some(nodes)
97                }
98            })
99            .flatten()
100            .filter_map(move |peer| {
101                let cache_peer_routing = cache_peer_routing.clone();
102
103                async move {
104                    cache_peer_routing
105                        .try_get_with(peer, async move {
106                            Ok::<ProbeRouting, anyhow::Error>(ProbeRouting::Neighbor(DestinationRouting::Forward {
107                                destination: Box::new(peer.into()),
108                                pseudonym: Some(HoprPseudonym::random()),
109                                forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
110                                return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
111                            }))
112                        })
113                        .await
114                        .ok()
115                }
116            })
117            .boxed()
118    }
119}
120
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, sync::Arc};

    use futures::{StreamExt, pin_mut};
    use hopr_api::{
        OffchainKeypair,
        graph::NetworkGraphUpdate,
        types::{crypto::keypairs::Keypair, internal::NodeId},
    };
    use hopr_network_graph::ChannelGraph;
    use tokio::time::timeout;

    use super::*;

    /// Short wait used to decide whether the probing stream yields an item "soon".
    const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);

    /// Minimal node record that can be handed to the graph in tests.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub struct Node {
        pub id: OffchainPublicKey,
    }

    // `From` is implemented instead of `Into`: the blanket impl in `core` then provides
    // `Into<OffchainPublicKey> for Node` for free, so existing `.into()` call sites keep working.
    impl From<Node> for OffchainPublicKey {
        fn from(node: Node) -> Self {
            node.id
        }
    }

    lazy_static::lazy_static! {
        /// Nine nodes with freshly generated random keys, shared by all tests.
        static ref RANDOM_PEERS: HashSet<Node> = (1..10).map(|_| {
            Node {
                id: OffchainPublicKey::from_privkey(&hopr_api::types::crypto_random::random_bytes::<32>()).unwrap(),
            }
        }).collect::<HashSet<_>>();
    }

    /// With no recorded nodes the stream must stay pending instead of yielding anything.
    #[tokio::test]
    async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
        let channel_graph = Arc::new(ChannelGraph::new(OffchainKeypair::random().public().clone()));

        let prober = ImmediateNeighborProber::new(Default::default(), channel_graph);
        let stream = ProbingTrafficGeneration::build(&prober);
        pin_mut!(stream);

        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());

        Ok(())
    }

    /// One full round must yield as many probes as there are recorded peers, in an order
    /// that differs from the iteration order of `RANDOM_PEERS`.
    // NOTE(review): this compares against `HashSet` iteration order only; nothing in the
    // prober itself shuffles the peers - confirm the graph is the intended source of ordering.
    #[tokio::test]
    async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
        let channel_graph = Arc::new(ChannelGraph::new(OffchainKeypair::random().public().clone()));

        for node in RANDOM_PEERS.iter() {
            channel_graph.record_node(node.clone());
        }

        let prober = ImmediateNeighborProber::new(
            ProberConfig {
                interval: std::time::Duration::from_millis(1),
                ..Default::default()
            },
            channel_graph,
        );

        let stream = ProbingTrafficGeneration::build(&prober);
        pin_mut!(stream);

        let actual = timeout(
            TINY_TIMEOUT * 20,
            stream
                .take(RANDOM_PEERS.len())
                .map(|routing| match routing {
                    ProbeRouting::Neighbor(DestinationRouting::Forward { destination, .. }) => {
                        if let NodeId::Offchain(peer_key) = destination.as_ref() {
                            *peer_key
                        } else {
                            panic!("expected offchain destination, got chain address");
                        }
                    }
                    _ => panic!("expected Neighbor Forward routing"),
                })
                .collect::<Vec<_>>(),
        )
        .await?;

        assert_eq!(actual.len(), RANDOM_PEERS.len());
        assert!(!actual.iter().zip(RANDOM_PEERS.iter()).all(|(a, b)| a == &b.id));

        Ok(())
    }

    /// A single recorded peer must be probed again in the following round.
    #[tokio::test]
    async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
        let cfg = ProberConfig {
            interval: std::time::Duration::from_millis(1),
            recheck_threshold: std::time::Duration::from_millis(1000),
        };

        let channel_graph = Arc::new(ChannelGraph::new(OffchainKeypair::random().public().clone()));
        channel_graph.record_node(RANDOM_PEERS.iter().next().unwrap().clone());

        let prober = ImmediateNeighborProber::new(cfg, channel_graph);
        let stream = ProbingTrafficGeneration::build(&prober);
        pin_mut!(stream);

        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());

        Ok(())
    }
}