1use std::{sync::Arc, time::Duration};
2
3use futures::{StreamExt as _, TryStreamExt, stream::FuturesUnordered};
4use hopr_api::chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations};
5use hopr_crypto_packet::prelude::*;
6use hopr_protocol_hopr::{FoundSurb, SurbStore};
7#[cfg(all(feature = "telemetry", not(test)))]
8use hopr_types::internal::path::Path;
9use hopr_types::{
10 crypto::{crypto_traits::Randomizable, types::OffchainPublicKey},
11 internal::{errors::PathError, prelude::*},
12 primitive::traits::ToHex,
13};
14use tracing::trace;
15
16use crate::{
17 errors::{PathPlannerError, Result},
18 traits::{BackgroundPathCacheRefreshable, PathSelector},
19};
20
/// Histogram of path lengths observed by the planner.
///
/// Only compiled when the `telemetry` feature is enabled outside of tests.
/// The value recorded by `resolve_path` is `num_hops() - 1` of every resolved path.
/// The histogram is created lazily on first access.
#[cfg(all(feature = "telemetry", not(test)))]
static METRIC_PATH_LENGTH: std::sync::LazyLock<hopr_metrics::SimpleHistogram> = std::sync::LazyLock::new(|| {
    hopr_metrics::SimpleHistogram::new(
        "hopr_path_length",
        "Distribution of number of hops of sent messages",
        vec![0.0, 1.0, 2.0, 3.0, 4.0],
    )
    // Failing to create a statically described metric is a programming error, not a runtime condition.
    .expect("failed to create the hopr_path_length histogram")
});
29
30#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault)]
32pub struct PathPlannerConfig {
33 #[default = 10_000]
35 pub max_cache_capacity: u64,
36 #[default(Duration::from_secs(60))]
39 pub cache_ttl: Duration,
40 #[default(Duration::from_secs(30))]
42 pub refresh_period: Duration,
43 #[default = 8]
46 pub max_cached_paths: usize,
47}
48
49type PlannerCacheKey = (NodeId, NodeId, u32);
54type PlannerCacheValue = Arc<hopr_statistics::WeightedCollection<ValidatedPath>>;
55
56#[derive(Clone)]
70pub struct PathPlanner<Surb, R, S> {
71 me: OffchainPublicKey,
72 pub surb_store: Surb,
73 resolver: Arc<R>,
74 selector: Arc<S>,
75 cache: moka::future::Cache<PlannerCacheKey, PlannerCacheValue>,
76 refresh_period: Duration,
77}
78
79impl<Surb, R, S> PathPlanner<Surb, R, S>
80where
81 Surb: SurbStore + Send + Sync + 'static,
82 R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
83 S: PathSelector + Send + Sync + 'static,
84{
85 pub fn new(me: OffchainPublicKey, surb_store: Surb, resolver: R, selector: S, config: PathPlannerConfig) -> Self {
89 let cache = moka::future::Cache::builder()
90 .max_capacity(config.max_cache_capacity)
91 .time_to_live(config.cache_ttl)
92 .build();
93
94 Self {
95 me,
96 surb_store,
97 resolver: Arc::new(resolver),
98 selector: Arc::new(selector),
99 cache,
100 refresh_period: config.refresh_period,
101 }
102 }
103
104 async fn resolve_node_id_to_offchain_key(&self, node_id: &NodeId) -> Result<OffchainPublicKey> {
106 match node_id {
107 NodeId::Offchain(key) => Ok(*key),
108 NodeId::Chain(addr) => {
109 let resolver = ChainPathResolver::from(&*self.resolver);
110 resolver
111 .resolve_transport_address(addr)
112 .await
113 .map_err(|e| PathPlannerError::Other(anyhow::anyhow!("{e}")))?
114 .ok_or_else(|| {
115 PathPlannerError::Other(anyhow::anyhow!("no offchain key found for chain address {addr}"))
116 })
117 }
118 }
119 }
120
121 #[tracing::instrument(level = "trace", skip(self))]
122 async fn resolve_path(
123 &self,
124 source: NodeId,
125 destination: NodeId,
126 options: RoutingOptions,
127 ) -> Result<ValidatedPath> {
128 let path = match options {
129 RoutingOptions::IntermediatePath(explicit_path) => {
130 trace!(?explicit_path, "resolving an explicit intermediate path");
131 let resolver = ChainPathResolver::from(&*self.resolver);
132 ValidatedPath::new(
133 source,
134 explicit_path
135 .into_iter()
136 .chain(std::iter::once(destination))
137 .collect::<Vec<_>>(),
138 &resolver,
139 )
140 .await?
141 }
142
143 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
144 trace!(hops = 0, "resolving zero-hop direct path");
145 let resolver = ChainPathResolver::from(&*self.resolver);
146 ValidatedPath::new(source, vec![destination], &resolver).await?
147 }
148
149 RoutingOptions::Hops(hops) => {
150 let hops_usize: usize = hops.into();
151 trace!(hops = hops_usize, "resolving path via planner cache");
152
153 let src_key = self.resolve_node_id_to_offchain_key(&source).await?;
154 let dest_key = self.resolve_node_id_to_offchain_key(&destination).await?;
155
156 let cache_key: PlannerCacheKey = (source, destination, u32::from(hops));
157
158 let resolver = self.resolver.clone();
159 let selector = self.selector.clone();
160
161 let paths = self
162 .cache
163 .try_get_with(cache_key, async move {
164 trace!(hops = hops_usize, "path cache miss, querying selector");
165 let candidates = selector.select_path(src_key, dest_key, hops_usize)?;
166
167 let chain_resolver = ChainPathResolver::from(&*resolver);
168 let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
169 for pwc in candidates {
170 let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
171 match ValidatedPath::new(source, node_ids, &chain_resolver).await {
172 Ok(vp) => valid_paths.push((vp, pwc.cost)),
173 Err(e) => trace!(error = %e, "path candidate failed validation"),
174 }
175 }
176
177 if valid_paths.is_empty() {
178 return Err(PathPlannerError::Path(PathError::PathNotFound(
179 hops_usize,
180 src_key.to_hex(),
181 dest_key.to_hex(),
182 )));
183 }
184
185 Ok(Arc::new(hopr_statistics::WeightedCollection::new(valid_paths)))
186 })
187 .await
188 .map_err(PathPlannerError::CacheError)?;
189
190 paths.pick_one().ok_or_else(|| {
191 PathPlannerError::Path(PathError::PathNotFound(hops_usize, src_key.to_hex(), dest_key.to_hex()))
192 })?
193 }
194 };
195
196 #[cfg(all(feature = "telemetry", not(test)))]
197 {
198 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
199 }
200
201 trace!(%path, "validated resolved path");
202 Ok(path)
203 }
204
205 #[tracing::instrument(level = "trace", skip(self))]
209 pub async fn resolve_routing(
210 &self,
211 size_hint: usize,
212 max_surbs: usize,
213 routing: DestinationRouting,
214 ) -> Result<(ResolvedTransportRouting<HoprSurb>, Option<usize>)> {
215 match routing {
216 DestinationRouting::Forward {
217 destination,
218 pseudonym,
219 forward_options,
220 return_options,
221 } => {
222 let forward_path = self
223 .resolve_path(NodeId::Offchain(self.me), *destination, forward_options)
224 .await?;
225
226 let return_paths = if let Some(return_options) = return_options {
227 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
228 trace!(
229 %destination,
230 %num_possible_surbs,
231 data_len = size_hint,
232 max_surbs,
233 "resolving packet return paths"
234 );
235
236 (0..num_possible_surbs)
237 .map(|_| self.resolve_path(*destination, NodeId::Offchain(self.me), return_options.clone()))
238 .collect::<FuturesUnordered<_>>()
239 .try_collect::<Vec<ValidatedPath>>()
240 .await?
241 } else {
242 vec![]
243 };
244
245 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
246
247 Ok((
248 ResolvedTransportRouting::Forward {
249 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
250 forward_path,
251 return_paths,
252 },
253 None,
254 ))
255 }
256
257 DestinationRouting::Return(matcher) => {
258 let FoundSurb {
259 sender_id,
260 surb,
261 remaining,
262 } =
263 self.surb_store.find_surb(matcher).await.ok_or_else(|| {
264 PathPlannerError::Surb(format!("no surb for pseudonym {}", matcher.pseudonym()))
265 })?;
266 Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
267 }
268 }
269 }
270}
271
272impl<Surb, R, S> BackgroundPathCacheRefreshable for PathPlanner<Surb, R, S>
273where
274 Surb: SurbStore + Send + Sync + 'static,
275 R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
276 S: PathSelector + Send + Sync + 'static,
277{
278 fn run_background_refresh(&self) -> impl std::future::Future<Output = ()> + Send + 'static {
284 let cache = self.cache.clone();
286 let resolver = self.resolver.clone();
287 let selector = self.selector.clone();
288 let refresh_period = self.refresh_period;
289
290 futures_time::stream::interval(futures_time::time::Duration::from_millis(
292 refresh_period.as_millis() as u64 + 1u64,
293 ))
294 .for_each(move |_| {
295 let cache = cache.clone();
296 let resolver = resolver.clone();
297 let selector = selector.clone();
298
299 async move {
300 for (key, _) in cache.iter() {
301 let (src, dest, hops_u32) = {
302 let k = key.as_ref();
303 (k.0, k.1, k.2)
304 };
305
306 if hops_u32 == 0 {
307 continue;
308 }
309 let hops_usize = hops_u32 as usize;
310
311 let resolve_key = |node: NodeId| {
312 let resolver = resolver.clone();
313
314 async move {
315 match node {
316 NodeId::Offchain(k) => Some(k),
317 NodeId::Chain(addr) => ChainPathResolver::from(&*resolver)
318 .resolve_transport_address(&addr)
319 .await
320 .ok()
321 .flatten(),
322 }
323 }
324 };
325
326 if let (Some(src_key), Some(dest_key)) = (resolve_key(src).await, resolve_key(dest).await)
327 && let Ok(candidates) = selector.select_path(src_key, dest_key, hops_usize)
328 {
329 let chain_resolver = ChainPathResolver::from(&*resolver);
330 let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
331 for pwc in candidates {
332 let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
333 match ValidatedPath::new(src, node_ids, &chain_resolver).await {
334 Ok(vp) => valid_paths.push((vp, pwc.cost)),
335 Err(e) => trace!(error = %e, "background refresh: path candidate failed validation"),
336 }
337 }
338
339 if !valid_paths.is_empty() {
340 cache
341 .insert(
342 (src, dest, hops_u32),
343 Arc::new(hopr_statistics::WeightedCollection::new(valid_paths)),
344 )
345 .await;
346 }
347 }
348 }
349 }
350 })
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use std::str::FromStr;
357
358 use async_trait::async_trait;
359 use bimap::BiMap;
360 use futures::stream::{self, BoxStream};
361 use hex_literal::hex;
362 use hopr_api::{
363 chain::{ChainKeyOperations, ChainReadChannelOperations, ChannelSelector, HoprKeyIdent},
364 graph::{
365 NetworkGraphWrite,
366 traits::{EdgeObservableWrite, EdgeWeightType},
367 },
368 };
369 use hopr_network_graph::ChannelGraph;
370 use hopr_types::{
371 crypto::prelude::{Keypair, OffchainKeypair},
372 internal::channels::{ChannelEntry, ChannelStatus, generate_channel_id},
373 primitive::prelude::*,
374 };
375
376 use super::*;
377 use crate::selector::HoprGraphPathSelector;
378
/// Minimal string-backed error type used by the test chain API.
#[derive(Debug)]
struct TestError(String);

impl std::fmt::Display for TestError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

impl std::error::Error for TestError {}
389
390 const SECRET_ME: [u8; 32] = hex!("60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d");
391 const SECRET_A: [u8; 32] = hex!("71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a");
392 const SECRET_DEST: [u8; 32] = hex!("c24bd833704dd2abdae3933fcc9962c2ac404f84132224c474147382d4db2299");
393
394 fn pubkey(secret: &[u8; 32]) -> OffchainPublicKey {
395 *OffchainKeypair::from_secret(secret).expect("valid secret").public()
396 }
397
398 #[derive(Clone)]
399 struct Mapper {
400 map: Arc<BiMap<OffchainPublicKey, HoprKeyIdent>>,
401 }
402
403 impl KeyIdMapping<HoprKeyIdent, OffchainPublicKey> for Mapper {
404 fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<HoprKeyIdent> {
405 self.map.get_by_left(key).copied()
406 }
407
408 fn map_id_to_public(&self, id: &HoprKeyIdent) -> Option<OffchainPublicKey> {
409 self.map.get_by_right(id).copied()
410 }
411
412 fn map_keys_to_ids(&self, keys: &[OffchainPublicKey]) -> Vec<Option<HoprKeyIdent>> {
413 keys.iter().map(|key| self.map_key_to_id(key)).collect()
414 }
415
416 fn map_ids_to_keys(&self, ids: &[HoprKeyIdent]) -> Vec<Option<OffchainPublicKey>> {
417 ids.iter().map(|id| self.map_id_to_public(id)).collect()
418 }
419 }
420
421 struct TestChainApi {
422 me: Address,
423 key_addr_map: BiMap<OffchainPublicKey, Address>,
424 channels: Vec<ChannelEntry>,
425 id_mapper: Mapper,
426 }
427
428 impl TestChainApi {
429 fn new(me_key: OffchainPublicKey, me_addr: Address, peers: Vec<(OffchainPublicKey, Address)>) -> Self {
430 let mut key_addr_map = BiMap::new();
431 let mut key_id_map: BiMap<OffchainPublicKey, HoprKeyIdent> = BiMap::new();
432 key_addr_map.insert(me_key, me_addr);
433 key_id_map.insert(me_key, 0u32.into());
434 for (i, (k, a)) in peers.iter().enumerate() {
435 key_addr_map.insert(*k, *a);
436 key_id_map.insert(*k, ((i + 1) as u32).into());
437 }
438 Self {
439 me: me_addr,
440 key_addr_map,
441 channels: vec![],
442 id_mapper: Mapper {
443 map: Arc::new(key_id_map),
444 },
445 }
446 }
447
448 fn with_open_channel(mut self, src: Address, dst: Address) -> Self {
449 self.channels.push(
450 ChannelEntry::builder()
451 .between(src, dst)
452 .amount(100)
453 .ticket_index(1)
454 .status(ChannelStatus::Open)
455 .epoch(1)
456 .build()
457 .unwrap(),
458 );
459 self
460 }
461 }
462
463 #[async_trait]
464 impl ChainKeyOperations for TestChainApi {
465 type Error = TestError;
466 type Mapper = Mapper;
467
468 async fn chain_key_to_packet_key(
469 &self,
470 chain: &Address,
471 ) -> std::result::Result<Option<OffchainPublicKey>, TestError> {
472 Ok(self.key_addr_map.get_by_right(chain).copied())
473 }
474
475 async fn packet_key_to_chain_key(
476 &self,
477 packet: &OffchainPublicKey,
478 ) -> std::result::Result<Option<Address>, TestError> {
479 Ok(self.key_addr_map.get_by_left(packet).copied())
480 }
481
482 fn key_id_mapper_ref(&self) -> &Self::Mapper {
483 &self.id_mapper
484 }
485 }
486
487 #[async_trait]
488 impl ChainReadChannelOperations for TestChainApi {
489 type Error = TestError;
490
491 fn me(&self) -> &Address {
492 &self.me
493 }
494
495 async fn channel_by_id(&self, channel_id: &ChannelId) -> std::result::Result<Option<ChannelEntry>, TestError> {
496 Ok(self
497 .channels
498 .iter()
499 .find(|c| generate_channel_id(&c.source, &c.destination) == *channel_id)
500 .cloned())
501 }
502
503 async fn stream_channels<'a>(
504 &'a self,
505 _selector: ChannelSelector,
506 ) -> std::result::Result<BoxStream<'a, ChannelEntry>, TestError> {
507 Ok(Box::pin(stream::iter(self.channels.clone())))
508 }
509 }
510
511 fn me_addr() -> Address {
513 Address::from_str("0x1000d5786d9e6799b3297da1ad55605b91e2c882").expect("valid addr")
514 }
515 fn a_addr() -> Address {
516 Address::from_str("0x200060ddced1e33c9647a71f4fc2cf4ed33e4a9d").expect("valid addr")
517 }
518 fn dest_addr() -> Address {
519 Address::from_str("0x30004105095c8c10f804109b4d1199a9ac40ed46").expect("valid addr")
520 }
521
522 fn mark_edge_full(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
524 use hopr_api::graph::traits::EdgeWeightType;
525 graph.upsert_edge(src, dst, |obs| {
526 obs.record(EdgeWeightType::Connected(true));
527 obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
528 obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
529 obs.record(EdgeWeightType::Capacity(Some(1000)));
530 });
531 }
532
533 fn mark_edge_last(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
534 graph.upsert_edge(src, dst, |obs| {
535 obs.record(EdgeWeightType::Connected(true));
536 obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
537 obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
538 obs.record(EdgeWeightType::Capacity(Some(1000)));
539 });
540 }
541
542 fn small_config() -> PathPlannerConfig {
543 PathPlannerConfig {
544 max_cache_capacity: 100,
545 cache_ttl: std::time::Duration::from_secs(60),
546 refresh_period: std::time::Duration::from_secs(60),
547 max_cached_paths: 2,
548 }
549 }
550
551 #[tokio::test]
554 async fn zero_hop_path_should_bypass_selector() {
555 let me = pubkey(&SECRET_ME);
556 let dest = pubkey(&SECRET_DEST);
557
558 let graph = ChannelGraph::new(me);
560 let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
561
562 let chain_api = TestChainApi::new(me, me_addr(), vec![(dest, dest_addr())]);
563 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
564
565 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
566
567 let routing = DestinationRouting::Forward {
568 destination: Box::new(NodeId::Offchain(dest)),
569 pseudonym: None,
570 forward_options: RoutingOptions::Hops(0.try_into().expect("valid 0")),
571 return_options: None,
572 };
573
574 let result = planner.resolve_routing(100, 0, routing).await;
575 assert!(result.is_ok(), "zero-hop should succeed: {:?}", result.err());
576
577 let (resolved, rem) = result.unwrap();
578 assert!(rem.is_none());
579 if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
580 assert_eq!(
581 forward_path.num_hops(),
582 1,
583 "zero-hop = 1 node in path (just destination)"
584 );
585 } else {
586 panic!("expected Forward routing");
587 }
588 }
589
590 #[tokio::test]
593 async fn one_hop_path_should_use_selector() {
594 let me = pubkey(&SECRET_ME);
595 let a = pubkey(&SECRET_A);
596 let dest = pubkey(&SECRET_DEST);
597
598 let graph = ChannelGraph::new(me);
599 graph.add_node(a);
600 graph.add_node(dest);
601 graph.add_edge(&me, &a).unwrap();
602 graph.add_edge(&a, &dest).unwrap();
603 mark_edge_full(&graph, &me, &a);
604 mark_edge_last(&graph, &a, &dest);
605
606 let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
607
608 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
609 .with_open_channel(me_addr(), a_addr())
610 .with_open_channel(a_addr(), dest_addr());
611
612 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
613 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
614
615 let routing = DestinationRouting::Forward {
616 destination: Box::new(NodeId::Offchain(dest)),
617 pseudonym: None,
618 forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
619 return_options: None,
620 };
621
622 let result = planner.resolve_routing(100, 0, routing).await;
623 assert!(result.is_ok(), "1-hop routing should succeed: {:?}", result.err());
624
625 let (resolved, _) = result.unwrap();
626 if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
627 assert_eq!(
628 forward_path.num_hops(),
629 2,
630 "1 intermediate hop means path has 2 nodes [a, dest]"
631 );
632 } else {
633 panic!("expected Forward routing");
634 }
635 }
636
637 #[tokio::test]
638 async fn explicit_intermediate_path_should_bypass_selector() {
639 let me = pubkey(&SECRET_ME);
640 let a = pubkey(&SECRET_A);
641 let dest = pubkey(&SECRET_DEST);
642
643 let graph = ChannelGraph::new(me);
645 let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
646
647 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
648 .with_open_channel(me_addr(), a_addr())
649 .with_open_channel(a_addr(), dest_addr());
650
651 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
652 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
653
654 use hopr_types::primitive::prelude::BoundedVec;
655 let intermediate_path = BoundedVec::try_from(vec![NodeId::Offchain(a)]).expect("valid");
656
657 let routing = DestinationRouting::Forward {
658 destination: Box::new(NodeId::Offchain(dest)),
659 pseudonym: None,
660 forward_options: RoutingOptions::IntermediatePath(intermediate_path),
661 return_options: None,
662 };
663
664 let result = planner.resolve_routing(100, 0, routing).await;
665 assert!(result.is_ok(), "explicit path should succeed: {:?}", result.err());
666
667 let (resolved, _) = result.unwrap();
668 if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
669 assert_eq!(forward_path.num_hops(), 2, "one intermediate + destination = 2 hops");
670 } else {
671 panic!("expected Forward routing");
672 }
673 }
674
675 #[tokio::test]
676 async fn return_routing_without_surb_should_return_error() {
677 let me = pubkey(&SECRET_ME);
678 let graph = ChannelGraph::new(me);
679 let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
680 let chain_api = TestChainApi::new(me, me_addr(), vec![]);
681 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
682
683 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
684
685 use hopr_types::internal::routing::SurbMatcher;
686 let matcher = SurbMatcher::Pseudonym(HoprPseudonym::random());
687 let routing = DestinationRouting::Return(matcher);
688
689 let result = planner.resolve_routing(0, 0, routing).await;
690 assert!(result.is_err(), "should fail when no SURB exists");
691 assert!(
692 matches!(result.unwrap_err(), PathPlannerError::Surb(_)),
693 "error should be Surb variant"
694 );
695 }
696
697 #[tokio::test]
700 async fn planner_cache_miss_should_populate_cache() {
701 let me = pubkey(&SECRET_ME);
702 let a = pubkey(&SECRET_A);
703 let dest = pubkey(&SECRET_DEST);
704
705 let graph = ChannelGraph::new(me);
706 graph.add_node(a);
707 graph.add_node(dest);
708 graph.add_edge(&me, &a).unwrap();
709 graph.add_edge(&a, &dest).unwrap();
710 mark_edge_full(&graph, &me, &a);
711 mark_edge_last(&graph, &a, &dest);
712
713 let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
714 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
715 .with_open_channel(me_addr(), a_addr())
716 .with_open_channel(a_addr(), dest_addr());
717 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
718 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
719
720 let cache_key: PlannerCacheKey = (NodeId::Offchain(me), NodeId::Offchain(dest), 1);
721
722 assert!(
723 planner.cache.get(&cache_key).await.is_none(),
724 "cache should be empty before first call"
725 );
726
727 let routing = DestinationRouting::Forward {
728 destination: Box::new(NodeId::Offchain(dest)),
729 pseudonym: None,
730 forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
731 return_options: None,
732 };
733 planner.resolve_routing(100, 0, routing).await.expect("should succeed");
734
735 let cached = planner.cache.get(&cache_key).await;
736 assert!(cached.is_some(), "cache should be populated after first call");
737 let paths = cached.unwrap();
738 assert!(!paths.is_empty(), "cached paths must be non-empty");
739 let (first_path, first_cost) = paths.iter().next().expect("at least one cached path");
740 assert_eq!(first_path.num_hops(), 2, "path should have 2 hops [a, dest]");
741 assert!(*first_cost > 0.0, "cost should be positive");
742 }
743
744 #[tokio::test]
745 async fn planner_cache_hit_should_return_valid_path() {
746 let me = pubkey(&SECRET_ME);
747 let a = pubkey(&SECRET_A);
748 let dest = pubkey(&SECRET_DEST);
749
750 let graph = ChannelGraph::new(me);
751 graph.add_node(a);
752 graph.add_node(dest);
753 graph.add_edge(&me, &a).unwrap();
754 graph.add_edge(&a, &dest).unwrap();
755 mark_edge_full(&graph, &me, &a);
756 mark_edge_last(&graph, &a, &dest);
757
758 let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
759 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
760 .with_open_channel(me_addr(), a_addr())
761 .with_open_channel(a_addr(), dest_addr());
762 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
763 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
764
765 let make_routing = || DestinationRouting::Forward {
766 destination: Box::new(NodeId::Offchain(dest)),
767 pseudonym: None,
768 forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
769 return_options: None,
770 };
771
772 let (r1, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 1");
773 let (r2, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 2");
774
775 let hops1 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r1 {
776 forward_path.num_hops()
777 } else {
778 panic!("expected Forward");
779 };
780 let hops2 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r2 {
781 forward_path.num_hops()
782 } else {
783 panic!("expected Forward");
784 };
785 assert_eq!(hops1, 2);
786 assert_eq!(hops2, 2);
787 }
788
789 #[tokio::test]
790 async fn background_refresh_should_produce_a_future() {
791 let me = pubkey(&SECRET_ME);
792 let graph = ChannelGraph::new(me);
793 let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
794 let chain_api = TestChainApi::new(me, me_addr(), vec![]);
795 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
796
797 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
798 let _future = planner.run_background_refresh();
800 }
801}