1use std::{sync::Arc, time::Duration};
2
3use futures::{StreamExt as _, TryStreamExt, stream::FuturesUnordered};
4use hopr_api::chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations};
5use hopr_crypto_packet::prelude::*;
6use hopr_protocol_hopr::{FoundSurb, SurbStore};
7#[cfg(all(feature = "telemetry", not(test)))]
8use hopr_types::internal::path::Path;
9use hopr_types::{
10 crypto::{crypto_traits::Randomizable, types::OffchainPublicKey},
11 internal::{errors::PathError, prelude::*},
12 primitive::traits::ToHex,
13};
14use tracing::trace;
15use validator::{Validate, ValidationError};
16
17use super::{
18 errors::{PathPlannerError, Result},
19 traits::{BackgroundPathCacheRefreshable, PathSelector},
20};
21
22#[cfg(all(feature = "telemetry", not(test)))]
23lazy_static::lazy_static! {
24 static ref METRIC_PATH_LENGTH: hopr_types::telemetry::SimpleHistogram = hopr_types::telemetry::SimpleHistogram::new(
25 "hopr_path_length",
26 "Distribution of number of hops of sent messages",
27 vec![0.0, 1.0, 2.0, 3.0, 4.0]
28 ).unwrap();
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
33pub struct PathPlannerConfig {
34 #[default = 10_000]
36 pub max_cache_capacity: u64,
37 #[default(Duration::from_secs(60))]
40 pub cache_ttl: Duration,
41 #[default(Duration::from_secs(30))]
43 pub refresh_period: Duration,
44 #[default = 8]
47 pub max_cached_paths: usize,
48 #[default = 0.5]
52 #[validate(custom(function = "validate_unit_interval"))]
53 pub edge_penalty: f64,
54 #[default = 0.1]
58 #[validate(custom(function = "validate_unit_interval"))]
59 pub min_ack_rate: f64,
60}
61
62fn validate_unit_interval(value: f64) -> std::result::Result<(), ValidationError> {
63 if value.is_finite() && (0.0..=1.0).contains(&value) {
64 Ok(())
65 } else {
66 Err(ValidationError::new("value must be finite and in 0.0..=1.0"))
67 }
68}
69
70type PlannerCacheKey = (NodeId, NodeId, u32);
75type PlannerCacheValue = Arc<hopr_utils::statistics::WeightedCollection<ValidatedPath>>;
76
77#[derive(Clone)]
91pub struct PathPlanner<Surb, R, S> {
92 me: OffchainPublicKey,
93 pub surb_store: Surb,
94 resolver: Arc<R>,
95 selector: Arc<S>,
96 cache: moka::future::Cache<PlannerCacheKey, PlannerCacheValue>,
97 refresh_period: Duration,
98}
99
100impl<Surb, R, S> PathPlanner<Surb, R, S>
101where
102 Surb: SurbStore + Send + Sync + 'static,
103 R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
104 S: PathSelector + Send + Sync + 'static,
105{
106 pub fn new(me: OffchainPublicKey, surb_store: Surb, resolver: R, selector: S, config: PathPlannerConfig) -> Self {
110 let cache = moka::future::Cache::builder()
111 .max_capacity(config.max_cache_capacity)
112 .time_to_live(config.cache_ttl)
113 .build();
114
115 Self {
116 me,
117 surb_store,
118 resolver: Arc::new(resolver),
119 selector: Arc::new(selector),
120 cache,
121 refresh_period: config.refresh_period,
122 }
123 }
124
125 async fn resolve_node_id_to_offchain_key(&self, node_id: &NodeId) -> Result<OffchainPublicKey> {
127 match node_id {
128 NodeId::Offchain(key) => Ok(*key),
129 NodeId::Chain(addr) => {
130 let resolver = ChainPathResolver::from(&*self.resolver);
131 resolver
132 .resolve_transport_address(addr)
133 .await
134 .map_err(|e| PathPlannerError::Other(anyhow::anyhow!("{e}")))?
135 .ok_or_else(|| {
136 PathPlannerError::Other(anyhow::anyhow!("no offchain key found for chain address {addr}"))
137 })
138 }
139 }
140 }
141
142 #[tracing::instrument(level = "trace", skip(self))]
143 async fn resolve_path(
144 &self,
145 source: NodeId,
146 destination: NodeId,
147 options: RoutingOptions,
148 ) -> Result<ValidatedPath> {
149 let path = match options {
150 RoutingOptions::IntermediatePath(explicit_path) => {
151 tracing::debug!(
152 direction = "loopback",
153 ?source,
154 ?destination,
155 ?explicit_path,
156 "resolving intermediate path"
157 );
158 let resolver = ChainPathResolver::from(&*self.resolver);
159 ValidatedPath::new(
160 source,
161 explicit_path
162 .into_iter()
163 .chain(std::iter::once(destination))
164 .collect::<Vec<_>>(),
165 &resolver,
166 )
167 .await?
168 }
169
170 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
171 trace!(hops = 0, "resolving zero-hop direct path");
172 let resolver = ChainPathResolver::from(&*self.resolver);
173 ValidatedPath::new(source, vec![destination], &resolver).await?
174 }
175
176 RoutingOptions::Hops(hops) => {
177 let hops_usize: usize = hops.into();
178 trace!(hops = hops_usize, "resolving path via planner cache");
179
180 let src_key = self.resolve_node_id_to_offchain_key(&source).await?;
181 let dest_key = self.resolve_node_id_to_offchain_key(&destination).await?;
182
183 let cache_key: PlannerCacheKey = (source, destination, u32::from(hops));
184
185 let resolver = self.resolver.clone();
186 let selector = self.selector.clone();
187
188 let paths = self
189 .cache
190 .try_get_with(cache_key, async move {
191 trace!(hops = hops_usize, "path cache miss, querying selector");
192 let candidates = selector.select_path(src_key, dest_key, hops_usize)?;
193
194 let chain_resolver = ChainPathResolver::from(&*resolver);
195 let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
196 for pwc in candidates {
197 let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
198 match ValidatedPath::new(source, node_ids, &chain_resolver).await {
199 Ok(vp) => {
200 if vp.chain_path().iter().enumerate().any(|(i, a)| vp.chain_path()[..i].contains(a)) {
203 tracing::debug!(path = %vp, "skipping path candidate with repeated chain addresses");
204 continue;
205 }
206 valid_paths.push((vp, pwc.cost))
207 }
208 Err(e) => tracing::warn!(error = %e, "path candidate failed validation"),
209 }
210 }
211
212 if valid_paths.is_empty() {
213 return Err(PathPlannerError::Path(PathError::PathNotFound(
214 hops_usize,
215 src_key.to_hex(),
216 dest_key.to_hex(),
217 )));
218 }
219
220 Ok(Arc::new(hopr_utils::statistics::WeightedCollection::new(valid_paths)))
221 })
222 .await
223 .map_err(PathPlannerError::CacheError)?;
224
225 paths.pick_one().ok_or_else(|| {
226 PathPlannerError::Path(PathError::PathNotFound(hops_usize, src_key.to_hex(), dest_key.to_hex()))
227 })?
228 }
229 };
230
231 #[cfg(all(feature = "telemetry", not(test)))]
232 {
233 hopr_types::telemetry::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
234 }
235
236 trace!(%path, "validated resolved path");
237 Ok(path)
238 }
239
240 #[tracing::instrument(level = "trace", skip(self))]
244 pub async fn resolve_routing(
245 &self,
246 size_hint: usize,
247 max_surbs: usize,
248 routing: DestinationRouting,
249 ) -> Result<(ResolvedTransportRouting<HoprSurb>, Option<usize>)> {
250 match routing {
251 DestinationRouting::Forward {
252 destination,
253 pseudonym,
254 forward_options,
255 return_options,
256 } => {
257 tracing::debug!(direction = "forward", %destination, "resolving forward path");
258
259 let forward_path = self
260 .resolve_path(NodeId::Offchain(self.me), *destination, forward_options)
261 .await?;
262 tracing::debug!(direction = "forward", %destination, path = %forward_path, "resolved path");
263
264 let return_paths = if let Some(return_options) = return_options {
265 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
266 trace!(
267 %destination,
268 %num_possible_surbs,
269 data_len = size_hint,
270 max_surbs,
271 "resolving packet return paths"
272 );
273
274 (0..num_possible_surbs)
275 .map(|_| self.resolve_path(*destination, NodeId::Offchain(self.me), return_options.clone()))
276 .collect::<FuturesUnordered<_>>()
277 .try_collect::<Vec<ValidatedPath>>()
278 .await?
279 .into_iter()
280 .enumerate()
281 .inspect(|(i, rp)| {
282 tracing::debug!(direction = "return", %destination, index = i, path = %rp, "resolved return path");
283 })
284 .map(|(_, rp)| rp)
285 .collect()
286 } else {
287 vec![]
288 };
289
290 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
291
292 Ok((
293 ResolvedTransportRouting::Forward {
294 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
295 forward_path,
296 return_paths,
297 },
298 None,
299 ))
300 }
301
302 DestinationRouting::Return(matcher) => {
303 let FoundSurb {
304 sender_id,
305 surb,
306 remaining,
307 } = self
308 .surb_store
309 .find_surb(matcher)
310 .ok_or_else(|| PathPlannerError::Surb(format!("no surb for pseudonym {}", matcher.pseudonym())))?;
311 Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
312 }
313 }
314 }
315}
316
317impl<Surb, R, S> BackgroundPathCacheRefreshable for PathPlanner<Surb, R, S>
318where
319 Surb: SurbStore + Send + Sync + 'static,
320 R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
321 S: PathSelector + Send + Sync + 'static,
322{
323 fn run_background_refresh(&self) -> impl std::future::Future<Output = ()> + Send + 'static {
329 let cache = self.cache.clone();
331 let resolver = self.resolver.clone();
332 let selector = self.selector.clone();
333 let refresh_period = self.refresh_period;
334
335 futures_time::stream::interval(futures_time::time::Duration::from_millis(
337 refresh_period.as_millis() as u64 + 1u64,
338 ))
339 .for_each(move |_| {
340 let cache = cache.clone();
341 let resolver = resolver.clone();
342 let selector = selector.clone();
343
344 async move {
345 for (key, _) in cache.iter() {
346 let (src, dest, hops_u32) = {
347 let k = key.as_ref();
348 (k.0, k.1, k.2)
349 };
350
351 if hops_u32 == 0 {
352 continue;
353 }
354 let hops_usize = hops_u32 as usize;
355
356 let resolve_key = |node: NodeId| {
357 let resolver = resolver.clone();
358
359 async move {
360 match node {
361 NodeId::Offchain(k) => Some(k),
362 NodeId::Chain(addr) => ChainPathResolver::from(&*resolver)
363 .resolve_transport_address(&addr)
364 .await
365 .ok()
366 .flatten(),
367 }
368 }
369 };
370
371 if let (Some(src_key), Some(dest_key)) = (resolve_key(src).await, resolve_key(dest).await)
372 && let Ok(candidates) = selector.select_path(src_key, dest_key, hops_usize)
373 {
374 let chain_resolver = ChainPathResolver::from(&*resolver);
375 let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
376 for pwc in candidates {
377 let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
378 match ValidatedPath::new(src, node_ids, &chain_resolver).await {
379 Ok(vp) => {
380 if vp.chain_path().iter().enumerate().any(|(i, a)| vp.chain_path()[..i].contains(a)) {
381 tracing::debug!(path = %vp, "background refresh: skipping candidate with repeated chain addresses");
382 continue;
383 }
384 valid_paths.push((vp, pwc.cost))
385 }
386 Err(e) => tracing::warn!(error = %e, "background refresh: path candidate failed validation"),
387 }
388 }
389
390 if !valid_paths.is_empty() {
391 cache
392 .insert(
393 (src, dest, hops_u32),
394 Arc::new(hopr_utils::statistics::WeightedCollection::new(valid_paths)),
395 )
396 .await;
397 }
398 }
399 }
400 }
401 })
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use std::str::FromStr;
408
409 use bimap::BiMap;
410 use futures::stream::{self, BoxStream};
411 use hex_literal::hex;
412 use hopr_api::{
413 chain::{ChainKeyOperations, ChainReadChannelOperations, ChannelSelector, HoprKeyIdent},
414 graph::{
415 NetworkGraphWrite,
416 traits::{EdgeObservableWrite, EdgeWeightType},
417 },
418 };
419 use hopr_network_graph::ChannelGraph;
420 use hopr_types::{
421 crypto::prelude::{Keypair, OffchainKeypair},
422 internal::channels::{ChannelEntry, ChannelStatus, generate_channel_id},
423 primitive::prelude::*,
424 };
425
426 use super::*;
427 use crate::path::selector::HoprGraphPathSelector;
428
429 #[derive(Debug)]
430 struct TestError(String);
431
432 impl std::fmt::Display for TestError {
433 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 f.write_str(&self.0)
435 }
436 }
437
438 impl std::error::Error for TestError {}
439
440 const SECRET_ME: [u8; 32] = hex!("60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d");
441 const SECRET_A: [u8; 32] = hex!("71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a");
442 const SECRET_DEST: [u8; 32] = hex!("c24bd833704dd2abdae3933fcc9962c2ac404f84132224c474147382d4db2299");
443
444 fn pubkey(secret: &[u8; 32]) -> OffchainPublicKey {
445 *OffchainKeypair::from_secret(secret).expect("valid secret").public()
446 }
447
448 #[derive(Clone)]
449 struct Mapper {
450 map: Arc<BiMap<OffchainPublicKey, HoprKeyIdent>>,
451 }
452
453 impl KeyIdMapping<HoprKeyIdent, OffchainPublicKey> for Mapper {
454 fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<HoprKeyIdent> {
455 self.map.get_by_left(key).copied()
456 }
457
458 fn map_id_to_public(&self, id: &HoprKeyIdent) -> Option<OffchainPublicKey> {
459 self.map.get_by_right(id).copied()
460 }
461
462 fn map_keys_to_ids(&self, keys: &[OffchainPublicKey]) -> Vec<Option<HoprKeyIdent>> {
463 keys.iter().map(|key| self.map_key_to_id(key)).collect()
464 }
465
466 fn map_ids_to_keys(&self, ids: &[HoprKeyIdent]) -> Vec<Option<OffchainPublicKey>> {
467 ids.iter().map(|id| self.map_id_to_public(id)).collect()
468 }
469 }
470
471 struct TestChainApi {
472 me: Address,
473 key_addr_map: BiMap<OffchainPublicKey, Address>,
474 channels: Vec<ChannelEntry>,
475 id_mapper: Mapper,
476 }
477
478 impl TestChainApi {
479 fn new(me_key: OffchainPublicKey, me_addr: Address, peers: Vec<(OffchainPublicKey, Address)>) -> Self {
480 let mut key_addr_map = BiMap::new();
481 let mut key_id_map: BiMap<OffchainPublicKey, HoprKeyIdent> = BiMap::new();
482 key_addr_map.insert(me_key, me_addr);
483 key_id_map.insert(me_key, 0u32.into());
484 for (i, (k, a)) in peers.iter().enumerate() {
485 key_addr_map.insert(*k, *a);
486 key_id_map.insert(*k, ((i + 1) as u32).into());
487 }
488 Self {
489 me: me_addr,
490 key_addr_map,
491 channels: vec![],
492 id_mapper: Mapper {
493 map: Arc::new(key_id_map),
494 },
495 }
496 }
497
498 fn with_open_channel(mut self, src: Address, dst: Address) -> Self {
499 self.channels.push(
500 ChannelEntry::builder()
501 .between(src, dst)
502 .amount(100)
503 .ticket_index(1)
504 .status(ChannelStatus::Open)
505 .epoch(1)
506 .build()
507 .unwrap(),
508 );
509 self
510 }
511 }
512
513 impl ChainKeyOperations for TestChainApi {
514 type Error = TestError;
515 type Mapper = Mapper;
516
517 fn chain_key_to_packet_key(
518 &self,
519 chain: &Address,
520 ) -> std::result::Result<Option<OffchainPublicKey>, TestError> {
521 Ok(self.key_addr_map.get_by_right(chain).copied())
522 }
523
524 fn packet_key_to_chain_key(
525 &self,
526 packet: &OffchainPublicKey,
527 ) -> std::result::Result<Option<Address>, TestError> {
528 Ok(self.key_addr_map.get_by_left(packet).copied())
529 }
530
531 fn key_id_mapper_ref(&self) -> &Self::Mapper {
532 &self.id_mapper
533 }
534 }
535
536 impl ChainReadChannelOperations for TestChainApi {
537 type Error = TestError;
538
539 fn me(&self) -> &Address {
540 &self.me
541 }
542
543 fn channel_by_id(&self, channel_id: &ChannelId) -> std::result::Result<Option<ChannelEntry>, TestError> {
544 Ok(self
545 .channels
546 .iter()
547 .find(|c| generate_channel_id(&c.source, &c.destination) == *channel_id)
548 .cloned())
549 }
550
551 fn stream_channels<'a>(
552 &'a self,
553 _selector: ChannelSelector,
554 ) -> std::result::Result<BoxStream<'a, ChannelEntry>, TestError> {
555 Ok(Box::pin(stream::iter(self.channels.clone())))
556 }
557 }
558
559 fn me_addr() -> Address {
561 Address::from_str("0x1000d5786d9e6799b3297da1ad55605b91e2c882").expect("valid addr")
562 }
563 fn a_addr() -> Address {
564 Address::from_str("0x200060ddced1e33c9647a71f4fc2cf4ed33e4a9d").expect("valid addr")
565 }
566 fn dest_addr() -> Address {
567 Address::from_str("0x30004105095c8c10f804109b4d1199a9ac40ed46").expect("valid addr")
568 }
569
570 fn mark_edge_full(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
572 use hopr_api::graph::traits::EdgeWeightType;
573 graph.upsert_edge(src, dst, |obs| {
574 obs.record(EdgeWeightType::Connected(true));
575 obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
576 obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
577 obs.record(EdgeWeightType::Capacity(Some(1000)));
578 });
579 }
580
581 fn mark_edge_last(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
582 graph.upsert_edge(src, dst, |obs| {
583 obs.record(EdgeWeightType::Connected(true));
584 obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
585 obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
586 obs.record(EdgeWeightType::Capacity(Some(1000)));
587 });
588 }
589
590 fn small_config() -> PathPlannerConfig {
591 PathPlannerConfig {
592 max_cache_capacity: 100,
593 cache_ttl: std::time::Duration::from_secs(60),
594 refresh_period: std::time::Duration::from_secs(60),
595 max_cached_paths: 2,
596 ..PathPlannerConfig::default()
597 }
598 }
599
600 #[tokio::test]
603 async fn zero_hop_path_should_bypass_selector() {
604 let me = pubkey(&SECRET_ME);
605 let dest = pubkey(&SECRET_DEST);
606
607 let graph = ChannelGraph::new(me);
609 let cfg = small_config();
610 let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
611
612 let chain_api = TestChainApi::new(me, me_addr(), vec![(dest, dest_addr())]);
613 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
614
615 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
616
617 let routing = DestinationRouting::Forward {
618 destination: Box::new(NodeId::Offchain(dest)),
619 pseudonym: None,
620 forward_options: RoutingOptions::Hops(0.try_into().expect("valid 0")),
621 return_options: None,
622 };
623
624 let result = planner.resolve_routing(100, 0, routing).await;
625 assert!(result.is_ok(), "zero-hop should succeed: {:?}", result.err());
626
627 let (resolved, rem) = result.unwrap();
628 assert!(rem.is_none());
629 if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
630 assert_eq!(
631 forward_path.num_hops(),
632 1,
633 "zero-hop = 1 node in path (just destination)"
634 );
635 } else {
636 panic!("expected Forward routing");
637 }
638 }
639
640 #[tokio::test]
643 async fn one_hop_path_should_use_selector() {
644 let me = pubkey(&SECRET_ME);
645 let a = pubkey(&SECRET_A);
646 let dest = pubkey(&SECRET_DEST);
647
648 let graph = ChannelGraph::new(me);
649 graph.add_node(a);
650 graph.add_node(dest);
651 graph.add_edge(&me, &a).unwrap();
652 graph.add_edge(&a, &dest).unwrap();
653 mark_edge_full(&graph, &me, &a);
654 mark_edge_last(&graph, &a, &dest);
655
656 let cfg = small_config();
657 let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
658
659 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
660 .with_open_channel(me_addr(), a_addr())
661 .with_open_channel(a_addr(), dest_addr());
662
663 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
664 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
665
666 let routing = DestinationRouting::Forward {
667 destination: Box::new(NodeId::Offchain(dest)),
668 pseudonym: None,
669 forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
670 return_options: None,
671 };
672
673 let result = planner.resolve_routing(100, 0, routing).await;
674 assert!(result.is_ok(), "1-hop routing should succeed: {:?}", result.err());
675
676 let (resolved, _) = result.unwrap();
677 if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
678 assert_eq!(
679 forward_path.num_hops(),
680 2,
681 "1 intermediate hop means path has 2 nodes [a, dest]"
682 );
683 } else {
684 panic!("expected Forward routing");
685 }
686 }
687
688 #[tokio::test]
689 async fn explicit_intermediate_path_should_bypass_selector() {
690 let me = pubkey(&SECRET_ME);
691 let a = pubkey(&SECRET_A);
692 let dest = pubkey(&SECRET_DEST);
693
694 let graph = ChannelGraph::new(me);
696 let cfg = small_config();
697 let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
698
699 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
700 .with_open_channel(me_addr(), a_addr())
701 .with_open_channel(a_addr(), dest_addr());
702
703 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
704 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
705
706 use hopr_types::primitive::prelude::BoundedVec;
707 let intermediate_path = BoundedVec::try_from(vec![NodeId::Offchain(a)]).expect("valid");
708
709 let routing = DestinationRouting::Forward {
710 destination: Box::new(NodeId::Offchain(dest)),
711 pseudonym: None,
712 forward_options: RoutingOptions::IntermediatePath(intermediate_path),
713 return_options: None,
714 };
715
716 let result = planner.resolve_routing(100, 0, routing).await;
717 assert!(result.is_ok(), "explicit path should succeed: {:?}", result.err());
718
719 let (resolved, _) = result.unwrap();
720 if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
721 assert_eq!(forward_path.num_hops(), 2, "one intermediate + destination = 2 hops");
722 } else {
723 panic!("expected Forward routing");
724 }
725 }
726
727 #[tokio::test]
728 async fn return_routing_without_surb_should_return_error() {
729 let me = pubkey(&SECRET_ME);
730 let graph = ChannelGraph::new(me);
731 let cfg = small_config();
732 let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
733 let chain_api = TestChainApi::new(me, me_addr(), vec![]);
734 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
735
736 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
737
738 use hopr_types::internal::routing::SurbMatcher;
739 let matcher = SurbMatcher::Pseudonym(HoprPseudonym::random());
740 let routing = DestinationRouting::Return(matcher);
741
742 let result = planner.resolve_routing(0, 0, routing).await;
743 assert!(result.is_err(), "should fail when no SURB exists");
744 assert!(
745 matches!(result.unwrap_err(), PathPlannerError::Surb(_)),
746 "error should be Surb variant"
747 );
748 }
749
750 #[tokio::test]
753 async fn planner_cache_miss_should_populate_cache() {
754 let me = pubkey(&SECRET_ME);
755 let a = pubkey(&SECRET_A);
756 let dest = pubkey(&SECRET_DEST);
757
758 let graph = ChannelGraph::new(me);
759 graph.add_node(a);
760 graph.add_node(dest);
761 graph.add_edge(&me, &a).unwrap();
762 graph.add_edge(&a, &dest).unwrap();
763 mark_edge_full(&graph, &me, &a);
764 mark_edge_last(&graph, &a, &dest);
765
766 let cfg = small_config();
767 let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
768 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
769 .with_open_channel(me_addr(), a_addr())
770 .with_open_channel(a_addr(), dest_addr());
771 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
772 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
773
774 let cache_key: PlannerCacheKey = (NodeId::Offchain(me), NodeId::Offchain(dest), 1);
775
776 assert!(
777 planner.cache.get(&cache_key).await.is_none(),
778 "cache should be empty before first call"
779 );
780
781 let routing = DestinationRouting::Forward {
782 destination: Box::new(NodeId::Offchain(dest)),
783 pseudonym: None,
784 forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
785 return_options: None,
786 };
787 planner.resolve_routing(100, 0, routing).await.expect("should succeed");
788
789 let cached = planner.cache.get(&cache_key).await;
790 assert!(cached.is_some(), "cache should be populated after first call");
791 let paths = cached.unwrap();
792 assert!(!paths.is_empty(), "cached paths must be non-empty");
793 let (first_path, first_cost) = paths.iter().next().expect("at least one cached path");
794 assert_eq!(first_path.num_hops(), 2, "path should have 2 hops [a, dest]");
795 assert!(*first_cost > 0.0, "cost should be positive");
796 }
797
798 #[tokio::test]
799 async fn planner_cache_hit_should_return_valid_path() {
800 let me = pubkey(&SECRET_ME);
801 let a = pubkey(&SECRET_A);
802 let dest = pubkey(&SECRET_DEST);
803
804 let graph = ChannelGraph::new(me);
805 graph.add_node(a);
806 graph.add_node(dest);
807 graph.add_edge(&me, &a).unwrap();
808 graph.add_edge(&a, &dest).unwrap();
809 mark_edge_full(&graph, &me, &a);
810 mark_edge_last(&graph, &a, &dest);
811
812 let cfg = small_config();
813 let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
814 let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
815 .with_open_channel(me_addr(), a_addr())
816 .with_open_channel(a_addr(), dest_addr());
817 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
818 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
819
820 let make_routing = || DestinationRouting::Forward {
821 destination: Box::new(NodeId::Offchain(dest)),
822 pseudonym: None,
823 forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
824 return_options: None,
825 };
826
827 let (r1, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 1");
828 let (r2, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 2");
829
830 let hops1 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r1 {
831 forward_path.num_hops()
832 } else {
833 panic!("expected Forward");
834 };
835 let hops2 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r2 {
836 forward_path.num_hops()
837 } else {
838 panic!("expected Forward");
839 };
840 assert_eq!(hops1, 2);
841 assert_eq!(hops2, 2);
842 }
843
844 #[tokio::test]
845 async fn background_refresh_should_produce_a_future() {
846 let me = pubkey(&SECRET_ME);
847 let graph = ChannelGraph::new(me);
848 let cfg = small_config();
849 let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
850 let chain_api = TestChainApi::new(me, me_addr(), vec![]);
851 let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
852
853 let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
854 let _future = planner.run_background_refresh();
856 }
857}