1use futures::{StreamExt, stream::BoxStream};
2use hopr_api::{
3 ct::{CoverTrafficGeneration, ProbeRouting, ProbingTrafficGeneration},
4 graph::{NetworkGraphTraverse, NetworkGraphView},
5 types::{
6 crypto::types::OffchainPublicKey,
7 crypto_random::Randomizable,
8 internal::{
9 protocol::HoprPseudonym,
10 routing::{DestinationRouting, RoutingOptions},
11 },
12 },
13};
14#[cfg(feature = "serde")]
15use serde::{Deserialize, Serialize};
16use validator::Validate;
17
18#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
20#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(deny_unknown_fields))]
21pub struct ProberConfig {
22 #[cfg_attr(
24 feature = "serde",
25 serde(default = "default_probing_interval", with = "humantime_serde")
26 )]
27 #[default(default_probing_interval())]
28 pub interval: std::time::Duration,
29
30 #[cfg_attr(
32 feature = "serde",
33 serde(default = "default_recheck_threshold", with = "humantime_serde")
34 )]
35 #[default(default_recheck_threshold())]
36 pub recheck_threshold: std::time::Duration,
37}
38
39const DEFAULT_REPEATED_PROBING_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
41
42const DEFAULT_PROBE_RECHECK_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60);
44
45#[inline]
46const fn default_probing_interval() -> std::time::Duration {
47 DEFAULT_REPEATED_PROBING_DELAY
48}
49
50#[inline]
51const fn default_recheck_threshold() -> std::time::Duration {
52 DEFAULT_PROBE_RECHECK_THRESHOLD
53}
54
55pub struct ImmediateNeighborProber<U> {
56 cfg: ProberConfig,
57 graph: U,
58}
59
60impl<U> ImmediateNeighborProber<U> {
61 pub fn new(cfg: ProberConfig, graph: U) -> Self {
62 Self { cfg, graph }
63 }
64}
65
66impl<U> CoverTrafficGeneration for ImmediateNeighborProber<U>
67where
68 U: NetworkGraphTraverse<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
69{
70 fn build(&self) -> BoxStream<'static, DestinationRouting> {
71 Box::pin(futures::stream::empty())
73 }
74}
75
76impl<U> ProbingTrafficGeneration for ImmediateNeighborProber<U>
77where
78 U: NetworkGraphView<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
79{
80 fn build(&self) -> BoxStream<'static, ProbeRouting> {
81 let cache_peer_routing: moka::future::Cache<OffchainPublicKey, ProbeRouting> = moka::future::Cache::builder()
83 .time_to_live(std::time::Duration::from_secs(600))
84 .max_capacity(100_000)
85 .build();
86
87 let cfg = self.cfg;
88 let graph = self.graph.clone();
89
90 futures::stream::repeat(())
91 .filter_map(move |_| {
92 let nodes = graph.nodes();
93
94 async move {
95 hopr_async_runtime::prelude::sleep(cfg.interval).await;
96 Some(nodes)
97 }
98 })
99 .flatten()
100 .filter_map(move |peer| {
101 let cache_peer_routing = cache_peer_routing.clone();
102
103 async move {
104 cache_peer_routing
105 .try_get_with(peer, async move {
106 Ok::<ProbeRouting, anyhow::Error>(ProbeRouting::Neighbor(DestinationRouting::Forward {
107 destination: Box::new(peer.into()),
108 pseudonym: Some(HoprPseudonym::random()),
109 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
110 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
111 }))
112 })
113 .await
114 .ok()
115 }
116 })
117 .boxed()
118 }
119}
120
121#[cfg(test)]
122mod tests {
123 use std::{collections::HashSet, sync::Arc};
124
125 use futures::{StreamExt, pin_mut};
126 use hopr_api::{
127 OffchainKeypair,
128 graph::NetworkGraphUpdate,
129 types::{crypto::keypairs::Keypair, internal::NodeId},
130 };
131 use hopr_network_graph::ChannelGraph;
132 use tokio::time::timeout;
133
134 use super::*;
135
136 const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);
137
138 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
139 pub struct Node {
140 pub id: OffchainPublicKey,
141 }
142
143 impl Into<OffchainPublicKey> for Node {
144 fn into(self) -> OffchainPublicKey {
145 self.id
146 }
147 }
148
149 lazy_static::lazy_static! {
150 static ref RANDOM_PEERS: HashSet<Node> = (1..10).map(|_| {
151 Node {
152 id: OffchainPublicKey::from_privkey(&hopr_api::types::crypto_random::random_bytes::<32>()).unwrap(),
153 }
154 }).collect::<HashSet<_>>();
155 }
156
157 #[tokio::test]
158 async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
159 let channel_graph = Arc::new(ChannelGraph::new(OffchainKeypair::random().public().clone()));
160
161 let prober = ImmediateNeighborProber::new(Default::default(), channel_graph);
162 let stream = ProbingTrafficGeneration::build(&prober);
163 pin_mut!(stream);
164
165 assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
166
167 Ok(())
168 }
169
170 #[tokio::test]
171 async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
172 let channel_graph = Arc::new(ChannelGraph::new(OffchainKeypair::random().public().clone()));
173
174 for node in RANDOM_PEERS.iter() {
175 channel_graph.record_node(node.clone());
176 }
177
178 let prober = ImmediateNeighborProber::new(
179 ProberConfig {
180 interval: std::time::Duration::from_millis(1),
181 ..Default::default()
182 },
183 channel_graph,
184 );
185
186 let stream = ProbingTrafficGeneration::build(&prober);
187 pin_mut!(stream);
188
189 let actual = timeout(
190 TINY_TIMEOUT * 20,
191 stream
192 .take(RANDOM_PEERS.len())
193 .map(|routing| match routing {
194 ProbeRouting::Neighbor(DestinationRouting::Forward { destination, .. }) => {
195 if let NodeId::Offchain(peer_key) = destination.as_ref() {
196 *peer_key
197 } else {
198 panic!("expected offchain destination, got chain address");
199 }
200 }
201 _ => panic!("expected Neighbor Forward routing"),
202 })
203 .collect::<Vec<_>>(),
204 )
205 .await?;
206
207 assert_eq!(actual.len(), RANDOM_PEERS.len());
208 assert!(!actual.iter().zip(RANDOM_PEERS.iter()).all(|(a, b)| a == &b.id));
209
210 Ok(())
211 }
212
213 #[tokio::test]
214 async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
215 let cfg = ProberConfig {
216 interval: std::time::Duration::from_millis(1),
217 recheck_threshold: std::time::Duration::from_millis(1000),
218 };
219
220 let channel_graph = Arc::new(ChannelGraph::new(OffchainKeypair::random().public().clone()));
221 channel_graph.record_node(RANDOM_PEERS.iter().next().unwrap().clone());
222
223 let prober = ImmediateNeighborProber::new(cfg, channel_graph);
224 let stream = ProbingTrafficGeneration::build(&prober);
225 pin_mut!(stream);
226
227 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
228 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
229
230 Ok(())
231 }
232}