Skip to main content

hopr_transport_path/
planner.rs

1use std::{sync::Arc, time::Duration};
2
3use futures::{StreamExt as _, TryStreamExt, stream::FuturesUnordered};
4use hopr_api::chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations};
5use hopr_crypto_packet::prelude::*;
6use hopr_protocol_hopr::{FoundSurb, SurbStore};
7#[cfg(all(feature = "telemetry", not(test)))]
8use hopr_types::internal::path::Path;
9use hopr_types::{
10    crypto::{crypto_traits::Randomizable, types::OffchainPublicKey},
11    internal::{errors::PathError, prelude::*},
12    primitive::traits::ToHex,
13};
14use tracing::trace;
15
16use crate::{
17    errors::{PathPlannerError, Result},
18    traits::{BackgroundPathCacheRefreshable, PathSelector},
19};
20
// Histogram metric for the length of resolved paths. It is compiled only when the
// `telemetry` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Bucket boundaries are listed explicitly (0.0 through 4.0). Construction failure
    // panics on first access because of the `unwrap()`.
    static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
        "hopr_path_length",
        "Distribution of number of hops of sent messages",
        vec![0.0, 1.0, 2.0, 3.0, 4.0]
    ).unwrap();
}
29
30/// Configuration for [`PathPlanner`]'s internal path cache.
31#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault)]
32pub struct PathPlannerConfig {
33    /// Maximum number of `(source, destination, options)` entries in the path cache.
34    #[default = 10_000]
35    pub max_cache_capacity: u64,
36    /// Time-to-live for a cached path list.  When an entry expires the next
37    /// [`PathPlanner::resolve_routing`] call transparently recomputes it (lazy refresh).
38    #[default(Duration::from_secs(60))]
39    pub cache_ttl: Duration,
40    /// Period between proactive background cache-refresh sweeps.
41    #[default(Duration::from_secs(30))]
42    pub refresh_period: Duration,
43    /// Maximum number of candidate paths the selector may return per query.
44    /// All returned candidates are validated and cached.
45    #[default = 8]
46    pub max_cached_paths: usize,
47}
48
/// Cache key for the path planner: `(source, destination, hops)`.
///
/// Only the `Hops` variant of [`RoutingOptions`] is cached (explicit intermediate
/// paths bypass the cache), so the key stores the hop count as a plain `u32`.
type PlannerCacheKey = (NodeId, NodeId, u32);
/// Cache value for the path planner: a shared weighted collection of validated
/// candidate paths, from which one path is picked per resolution.
type PlannerCacheValue = Arc<hopr_statistics::WeightedCollection<ValidatedPath>>;
55
/// Path planner that resolves [`DestinationRouting`] to [`ResolvedTransportRouting`].
///
/// The planner delegates path *discovery* to any [`PathSelector`] implementation and
/// owns the `moka` cache of fully-validated [`ValidatedPath`] objects paired with
/// their traversal cost, keyed by `(source: NodeId, destination: NodeId, hops: u32)`.
///
/// On a cache miss the planner calls the selector, validates every candidate against
/// the chain resolver, and stores an `Arc<WeightedCollection<ValidatedPath>>` in the
/// cache. On a cache hit a candidate is picked via weighted random selection (higher
/// cost = higher quality = higher probability).
///
/// A background sweep (`run_background_refresh`, provided through the
/// [`BackgroundPathCacheRefreshable`] implementation) can be spawned to
/// proactively re-warm the cache for all previously-seen keys.
#[derive(Clone)]
pub struct PathPlanner<Surb, R, S> {
    /// This node's offchain key; used as the source of forward paths and the
    /// destination of return paths.
    me: OffchainPublicKey,
    /// Store that is queried for a SURB when resolving `Return` routing.
    pub surb_store: Surb,
    /// Chain access used to resolve addresses and validate candidate paths.
    resolver: Arc<R>,
    /// Strategy that proposes candidate paths on a cache miss or refresh.
    selector: Arc<S>,
    /// Cache of validated, weighted candidate paths per `(source, destination, hops)`.
    cache: moka::future::Cache<PlannerCacheKey, PlannerCacheValue>,
    /// Base period of the background refresh sweep.
    refresh_period: Duration,
}
78
79impl<Surb, R, S> PathPlanner<Surb, R, S>
80where
81    Surb: SurbStore + Send + Sync + 'static,
82    R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
83    S: PathSelector + Send + Sync + 'static,
84{
85    /// Create a new path planner.
86    ///
87    /// `me` is this node's [`OffchainPublicKey`]; it is used as the source in path queries.
88    pub fn new(me: OffchainPublicKey, surb_store: Surb, resolver: R, selector: S, config: PathPlannerConfig) -> Self {
89        let cache = moka::future::Cache::builder()
90            .max_capacity(config.max_cache_capacity)
91            .time_to_live(config.cache_ttl)
92            .build();
93
94        Self {
95            me,
96            surb_store,
97            resolver: Arc::new(resolver),
98            selector: Arc::new(selector),
99            cache,
100            refresh_period: config.refresh_period,
101        }
102    }
103
104    /// Resolve a [`NodeId`] to an [`OffchainPublicKey`].
105    async fn resolve_node_id_to_offchain_key(&self, node_id: &NodeId) -> Result<OffchainPublicKey> {
106        match node_id {
107            NodeId::Offchain(key) => Ok(*key),
108            NodeId::Chain(addr) => {
109                let resolver = ChainPathResolver::from(&*self.resolver);
110                resolver
111                    .resolve_transport_address(addr)
112                    .await
113                    .map_err(|e| PathPlannerError::Other(anyhow::anyhow!("{e}")))?
114                    .ok_or_else(|| {
115                        PathPlannerError::Other(anyhow::anyhow!("no offchain key found for chain address {addr}"))
116                    })
117            }
118        }
119    }
120
121    #[tracing::instrument(level = "trace", skip(self))]
122    async fn resolve_path(
123        &self,
124        source: NodeId,
125        destination: NodeId,
126        options: RoutingOptions,
127    ) -> Result<ValidatedPath> {
128        let path = match options {
129            RoutingOptions::IntermediatePath(explicit_path) => {
130                trace!(?explicit_path, "resolving an explicit intermediate path");
131                let resolver = ChainPathResolver::from(&*self.resolver);
132                ValidatedPath::new(
133                    source,
134                    explicit_path
135                        .into_iter()
136                        .chain(std::iter::once(destination))
137                        .collect::<Vec<_>>(),
138                    &resolver,
139                )
140                .await?
141            }
142
143            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
144                trace!(hops = 0, "resolving zero-hop direct path");
145                let resolver = ChainPathResolver::from(&*self.resolver);
146                ValidatedPath::new(source, vec![destination], &resolver).await?
147            }
148
149            RoutingOptions::Hops(hops) => {
150                let hops_usize: usize = hops.into();
151                trace!(hops = hops_usize, "resolving path via planner cache");
152
153                let src_key = self.resolve_node_id_to_offchain_key(&source).await?;
154                let dest_key = self.resolve_node_id_to_offchain_key(&destination).await?;
155
156                let cache_key: PlannerCacheKey = (source, destination, u32::from(hops));
157
158                let resolver = self.resolver.clone();
159                let selector = self.selector.clone();
160
161                let paths = self
162                    .cache
163                    .try_get_with(cache_key, async move {
164                        trace!(hops = hops_usize, "path cache miss, querying selector");
165                        let candidates = selector.select_path(src_key, dest_key, hops_usize)?;
166
167                        let chain_resolver = ChainPathResolver::from(&*resolver);
168                        let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
169                        for pwc in candidates {
170                            let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
171                            match ValidatedPath::new(source, node_ids, &chain_resolver).await {
172                                Ok(vp) => valid_paths.push((vp, pwc.cost)),
173                                Err(e) => trace!(error = %e, "path candidate failed validation"),
174                            }
175                        }
176
177                        if valid_paths.is_empty() {
178                            return Err(PathPlannerError::Path(PathError::PathNotFound(
179                                hops_usize,
180                                src_key.to_hex(),
181                                dest_key.to_hex(),
182                            )));
183                        }
184
185                        Ok(Arc::new(hopr_statistics::WeightedCollection::new(valid_paths)))
186                    })
187                    .await
188                    .map_err(PathPlannerError::CacheError)?;
189
190                paths.pick_one().ok_or_else(|| {
191                    PathPlannerError::Path(PathError::PathNotFound(hops_usize, src_key.to_hex(), dest_key.to_hex()))
192                })?
193            }
194        };
195
196        #[cfg(all(feature = "telemetry", not(test)))]
197        {
198            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
199        }
200
201        trace!(%path, "validated resolved path");
202        Ok(path)
203    }
204
205    /// Resolve a [`DestinationRouting`] to a [`ResolvedTransportRouting`].
206    ///
207    /// Returns the resolved routing and, for `Return` variants, the number of remaining SURBs.
208    #[tracing::instrument(level = "trace", skip(self))]
209    pub async fn resolve_routing(
210        &self,
211        size_hint: usize,
212        max_surbs: usize,
213        routing: DestinationRouting,
214    ) -> Result<(ResolvedTransportRouting<HoprSurb>, Option<usize>)> {
215        match routing {
216            DestinationRouting::Forward {
217                destination,
218                pseudonym,
219                forward_options,
220                return_options,
221            } => {
222                let forward_path = self
223                    .resolve_path(NodeId::Offchain(self.me), *destination, forward_options)
224                    .await?;
225
226                let return_paths = if let Some(return_options) = return_options {
227                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
228                    trace!(
229                        %destination,
230                        %num_possible_surbs,
231                        data_len = size_hint,
232                        max_surbs,
233                        "resolving packet return paths"
234                    );
235
236                    (0..num_possible_surbs)
237                        .map(|_| self.resolve_path(*destination, NodeId::Offchain(self.me), return_options.clone()))
238                        .collect::<FuturesUnordered<_>>()
239                        .try_collect::<Vec<ValidatedPath>>()
240                        .await?
241                } else {
242                    vec![]
243                };
244
245                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
246
247                Ok((
248                    ResolvedTransportRouting::Forward {
249                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
250                        forward_path,
251                        return_paths,
252                    },
253                    None,
254                ))
255            }
256
257            DestinationRouting::Return(matcher) => {
258                let FoundSurb {
259                    sender_id,
260                    surb,
261                    remaining,
262                } =
263                    self.surb_store.find_surb(matcher).await.ok_or_else(|| {
264                        PathPlannerError::Surb(format!("no surb for pseudonym {}", matcher.pseudonym()))
265                    })?;
266                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
267            }
268        }
269    }
270}
271
272impl<Surb, R, S> BackgroundPathCacheRefreshable for PathPlanner<Surb, R, S>
273where
274    Surb: SurbStore + Send + Sync + 'static,
275    R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
276    S: PathSelector + Send + Sync + 'static,
277{
278    /// Returns a future that runs the background path-cache refresh loop.
279    ///
280    /// The returned future iterates over all keys currently in the planner's cache
281    /// and recomputes their paths on a configurable schedule, so that steady-state
282    /// traffic is always served from cache.
283    fn run_background_refresh(&self) -> impl std::future::Future<Output = ()> + Send + 'static {
284        // Clone only the fields we need — avoids requiring R: Clone + S: Clone.
285        let cache = self.cache.clone();
286        let resolver = self.resolver.clone();
287        let selector = self.selector.clone();
288        let refresh_period = self.refresh_period;
289
290        // run at a non-zero interval
291        futures_time::stream::interval(futures_time::time::Duration::from_millis(
292            refresh_period.as_millis() as u64 + 1u64,
293        ))
294        .for_each(move |_| {
295            let cache = cache.clone();
296            let resolver = resolver.clone();
297            let selector = selector.clone();
298
299            async move {
300                for (key, _) in cache.iter() {
301                    let (src, dest, hops_u32) = {
302                        let k = key.as_ref();
303                        (k.0, k.1, k.2)
304                    };
305
306                    if hops_u32 == 0 {
307                        continue;
308                    }
309                    let hops_usize = hops_u32 as usize;
310
311                    let resolve_key = |node: NodeId| {
312                        let resolver = resolver.clone();
313
314                        async move {
315                            match node {
316                                NodeId::Offchain(k) => Some(k),
317                                NodeId::Chain(addr) => ChainPathResolver::from(&*resolver)
318                                    .resolve_transport_address(&addr)
319                                    .await
320                                    .ok()
321                                    .flatten(),
322                            }
323                        }
324                    };
325
326                    if let (Some(src_key), Some(dest_key)) = (resolve_key(src).await, resolve_key(dest).await)
327                        && let Ok(candidates) = selector.select_path(src_key, dest_key, hops_usize)
328                    {
329                        let chain_resolver = ChainPathResolver::from(&*resolver);
330                        let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
331                        for pwc in candidates {
332                            let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
333                            match ValidatedPath::new(src, node_ids, &chain_resolver).await {
334                                Ok(vp) => valid_paths.push((vp, pwc.cost)),
335                                Err(e) => trace!(error = %e, "background refresh: path candidate failed validation"),
336                            }
337                        }
338
339                        if !valid_paths.is_empty() {
340                            cache
341                                .insert(
342                                    (src, dest, hops_u32),
343                                    Arc::new(hopr_statistics::WeightedCollection::new(valid_paths)),
344                                )
345                                .await;
346                        }
347                    }
348                }
349            }
350        })
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use std::str::FromStr;
357
358    use async_trait::async_trait;
359    use bimap::BiMap;
360    use futures::stream::{self, BoxStream};
361    use hex_literal::hex;
362    use hopr_api::{
363        chain::{ChainKeyOperations, ChainReadChannelOperations, ChannelSelector, HoprKeyIdent},
364        graph::{
365            NetworkGraphWrite,
366            traits::{EdgeObservableWrite, EdgeWeightType},
367        },
368    };
369    use hopr_network_graph::ChannelGraph;
370    use hopr_types::{
371        crypto::prelude::{Keypair, OffchainKeypair},
372        internal::channels::{ChannelEntry, ChannelStatus, generate_channel_id},
373        primitive::prelude::*,
374    };
375
376    use super::*;
377    use crate::selector::HoprGraphPathSelector;
378
379    #[derive(Debug)]
380    struct TestError(String);
381
382    impl std::fmt::Display for TestError {
383        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384            f.write_str(&self.0)
385        }
386    }
387
388    impl std::error::Error for TestError {}
389
    // Fixed 32-byte secrets from which the offchain keypairs of the test nodes
    // (this node, one intermediate node and the destination) are derived via `pubkey`.
    const SECRET_ME: [u8; 32] = hex!("60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d");
    const SECRET_A: [u8; 32] = hex!("71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a");
    const SECRET_DEST: [u8; 32] = hex!("c24bd833704dd2abdae3933fcc9962c2ac404f84132224c474147382d4db2299");
393
394    fn pubkey(secret: &[u8; 32]) -> OffchainPublicKey {
395        *OffchainKeypair::from_secret(secret).expect("valid secret").public()
396    }
397
    /// Test key-ID mapper backed by a shared bidirectional map between offchain
    /// public keys and key identifiers.
    #[derive(Clone)]
    struct Mapper {
        /// Two-way lookup table: public key <-> [`HoprKeyIdent`].
        map: Arc<BiMap<OffchainPublicKey, HoprKeyIdent>>,
    }
402
403    impl KeyIdMapping<HoprKeyIdent, OffchainPublicKey> for Mapper {
404        fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<HoprKeyIdent> {
405            self.map.get_by_left(key).copied()
406        }
407
408        fn map_id_to_public(&self, id: &HoprKeyIdent) -> Option<OffchainPublicKey> {
409            self.map.get_by_right(id).copied()
410        }
411
412        fn map_keys_to_ids(&self, keys: &[OffchainPublicKey]) -> Vec<Option<HoprKeyIdent>> {
413            keys.iter().map(|key| self.map_key_to_id(key)).collect()
414        }
415
416        fn map_ids_to_keys(&self, ids: &[HoprKeyIdent]) -> Vec<Option<OffchainPublicKey>> {
417            ids.iter().map(|id| self.map_id_to_public(id)).collect()
418        }
419    }
420
    /// In-memory stand-in for the chain API used by the planner tests.
    struct TestChainApi {
        /// Chain address reported by `me()`.
        me: Address,
        /// Two-way mapping between offchain public keys and chain addresses.
        key_addr_map: BiMap<OffchainPublicKey, Address>,
        /// Channels known to this fake chain; searched by `channel_by_id` and streamed by `stream_channels`.
        channels: Vec<ChannelEntry>,
        /// Key-ID mapper handed out by `key_id_mapper_ref`.
        id_mapper: Mapper,
    }
427
428    impl TestChainApi {
429        fn new(me_key: OffchainPublicKey, me_addr: Address, peers: Vec<(OffchainPublicKey, Address)>) -> Self {
430            let mut key_addr_map = BiMap::new();
431            let mut key_id_map: BiMap<OffchainPublicKey, HoprKeyIdent> = BiMap::new();
432            key_addr_map.insert(me_key, me_addr);
433            key_id_map.insert(me_key, 0u32.into());
434            for (i, (k, a)) in peers.iter().enumerate() {
435                key_addr_map.insert(*k, *a);
436                key_id_map.insert(*k, ((i + 1) as u32).into());
437            }
438            Self {
439                me: me_addr,
440                key_addr_map,
441                channels: vec![],
442                id_mapper: Mapper {
443                    map: Arc::new(key_id_map),
444                },
445            }
446        }
447
448        fn with_open_channel(mut self, src: Address, dst: Address) -> Self {
449            self.channels.push(
450                ChannelEntry::builder()
451                    .between(src, dst)
452                    .amount(100)
453                    .ticket_index(1)
454                    .status(ChannelStatus::Open)
455                    .epoch(1)
456                    .build()
457                    .unwrap(),
458            );
459            self
460        }
461    }
462
463    #[async_trait]
464    impl ChainKeyOperations for TestChainApi {
465        type Error = TestError;
466        type Mapper = Mapper;
467
468        async fn chain_key_to_packet_key(
469            &self,
470            chain: &Address,
471        ) -> std::result::Result<Option<OffchainPublicKey>, TestError> {
472            Ok(self.key_addr_map.get_by_right(chain).copied())
473        }
474
475        async fn packet_key_to_chain_key(
476            &self,
477            packet: &OffchainPublicKey,
478        ) -> std::result::Result<Option<Address>, TestError> {
479            Ok(self.key_addr_map.get_by_left(packet).copied())
480        }
481
482        fn key_id_mapper_ref(&self) -> &Self::Mapper {
483            &self.id_mapper
484        }
485    }
486
487    #[async_trait]
488    impl ChainReadChannelOperations for TestChainApi {
489        type Error = TestError;
490
491        fn me(&self) -> &Address {
492            &self.me
493        }
494
495        async fn channel_by_id(&self, channel_id: &ChannelId) -> std::result::Result<Option<ChannelEntry>, TestError> {
496            Ok(self
497                .channels
498                .iter()
499                .find(|c| generate_channel_id(&c.source, &c.destination) == *channel_id)
500                .cloned())
501        }
502
503        async fn stream_channels<'a>(
504            &'a self,
505            _selector: ChannelSelector,
506        ) -> std::result::Result<BoxStream<'a, ChannelEntry>, TestError> {
507            Ok(Box::pin(stream::iter(self.channels.clone())))
508        }
509    }
510
511    // ── address fixtures ──────────────────────────────────────────────────────
512    fn me_addr() -> Address {
513        Address::from_str("0x1000d5786d9e6799b3297da1ad55605b91e2c882").expect("valid addr")
514    }
515    fn a_addr() -> Address {
516        Address::from_str("0x200060ddced1e33c9647a71f4fc2cf4ed33e4a9d").expect("valid addr")
517    }
518    fn dest_addr() -> Address {
519        Address::from_str("0x30004105095c8c10f804109b4d1199a9ac40ed46").expect("valid addr")
520    }
521
522    // ── graph helpers ──────────────────────────────────────────────────────────
523    fn mark_edge_full(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
524        use hopr_api::graph::traits::EdgeWeightType;
525        graph.upsert_edge(src, dst, |obs| {
526            obs.record(EdgeWeightType::Connected(true));
527            obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
528            obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
529            obs.record(EdgeWeightType::Capacity(Some(1000)));
530        });
531    }
532
533    fn mark_edge_last(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
534        graph.upsert_edge(src, dst, |obs| {
535            obs.record(EdgeWeightType::Connected(true));
536            obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
537            obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
538            obs.record(EdgeWeightType::Capacity(Some(1000)));
539        });
540    }
541
542    fn small_config() -> PathPlannerConfig {
543        PathPlannerConfig {
544            max_cache_capacity: 100,
545            cache_ttl: std::time::Duration::from_secs(60),
546            refresh_period: std::time::Duration::from_secs(60),
547            max_cached_paths: 2,
548        }
549    }
550
551    // ── test: zero-hop path ───────────────────────────────────────────────────
552
553    #[tokio::test]
554    async fn zero_hop_path_should_bypass_selector() {
555        let me = pubkey(&SECRET_ME);
556        let dest = pubkey(&SECRET_DEST);
557
558        // Build empty graph (no edges) — selector would fail if called.
559        let graph = ChannelGraph::new(me);
560        let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
561
562        let chain_api = TestChainApi::new(me, me_addr(), vec![(dest, dest_addr())]);
563        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
564
565        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
566
567        let routing = DestinationRouting::Forward {
568            destination: Box::new(NodeId::Offchain(dest)),
569            pseudonym: None,
570            forward_options: RoutingOptions::Hops(0.try_into().expect("valid 0")),
571            return_options: None,
572        };
573
574        let result = planner.resolve_routing(100, 0, routing).await;
575        assert!(result.is_ok(), "zero-hop should succeed: {:?}", result.err());
576
577        let (resolved, rem) = result.unwrap();
578        assert!(rem.is_none());
579        if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
580            assert_eq!(
581                forward_path.num_hops(),
582                1,
583                "zero-hop = 1 node in path (just destination)"
584            );
585        } else {
586            panic!("expected Forward routing");
587        }
588    }
589
590    // ── test: one-hop path via graph selector ─────────────────────────────────
591
592    #[tokio::test]
593    async fn one_hop_path_should_use_selector() {
594        let me = pubkey(&SECRET_ME);
595        let a = pubkey(&SECRET_A);
596        let dest = pubkey(&SECRET_DEST);
597
598        let graph = ChannelGraph::new(me);
599        graph.add_node(a);
600        graph.add_node(dest);
601        graph.add_edge(&me, &a).unwrap();
602        graph.add_edge(&a, &dest).unwrap();
603        mark_edge_full(&graph, &me, &a);
604        mark_edge_last(&graph, &a, &dest);
605
606        let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
607
608        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
609            .with_open_channel(me_addr(), a_addr())
610            .with_open_channel(a_addr(), dest_addr());
611
612        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
613        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
614
615        let routing = DestinationRouting::Forward {
616            destination: Box::new(NodeId::Offchain(dest)),
617            pseudonym: None,
618            forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
619            return_options: None,
620        };
621
622        let result = planner.resolve_routing(100, 0, routing).await;
623        assert!(result.is_ok(), "1-hop routing should succeed: {:?}", result.err());
624
625        let (resolved, _) = result.unwrap();
626        if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
627            assert_eq!(
628                forward_path.num_hops(),
629                2,
630                "1 intermediate hop means path has 2 nodes [a, dest]"
631            );
632        } else {
633            panic!("expected Forward routing");
634        }
635    }
636
637    #[tokio::test]
638    async fn explicit_intermediate_path_should_bypass_selector() {
639        let me = pubkey(&SECRET_ME);
640        let a = pubkey(&SECRET_A);
641        let dest = pubkey(&SECRET_DEST);
642
643        // Empty graph — selector would fail; explicit path should not use it.
644        let graph = ChannelGraph::new(me);
645        let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
646
647        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
648            .with_open_channel(me_addr(), a_addr())
649            .with_open_channel(a_addr(), dest_addr());
650
651        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
652        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
653
654        use hopr_types::primitive::prelude::BoundedVec;
655        let intermediate_path = BoundedVec::try_from(vec![NodeId::Offchain(a)]).expect("valid");
656
657        let routing = DestinationRouting::Forward {
658            destination: Box::new(NodeId::Offchain(dest)),
659            pseudonym: None,
660            forward_options: RoutingOptions::IntermediatePath(intermediate_path),
661            return_options: None,
662        };
663
664        let result = planner.resolve_routing(100, 0, routing).await;
665        assert!(result.is_ok(), "explicit path should succeed: {:?}", result.err());
666
667        let (resolved, _) = result.unwrap();
668        if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
669            assert_eq!(forward_path.num_hops(), 2, "one intermediate + destination = 2 hops");
670        } else {
671            panic!("expected Forward routing");
672        }
673    }
674
675    #[tokio::test]
676    async fn return_routing_without_surb_should_return_error() {
677        let me = pubkey(&SECRET_ME);
678        let graph = ChannelGraph::new(me);
679        let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
680        let chain_api = TestChainApi::new(me, me_addr(), vec![]);
681        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
682
683        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
684
685        use hopr_types::internal::routing::SurbMatcher;
686        let matcher = SurbMatcher::Pseudonym(HoprPseudonym::random());
687        let routing = DestinationRouting::Return(matcher);
688
689        let result = planner.resolve_routing(0, 0, routing).await;
690        assert!(result.is_err(), "should fail when no SURB exists");
691        assert!(
692            matches!(result.unwrap_err(), PathPlannerError::Surb(_)),
693            "error should be Surb variant"
694        );
695    }
696
697    // ── test: cache integration ───────────────────────────────────────────────
698
699    #[tokio::test]
700    async fn planner_cache_miss_should_populate_cache() {
701        let me = pubkey(&SECRET_ME);
702        let a = pubkey(&SECRET_A);
703        let dest = pubkey(&SECRET_DEST);
704
705        let graph = ChannelGraph::new(me);
706        graph.add_node(a);
707        graph.add_node(dest);
708        graph.add_edge(&me, &a).unwrap();
709        graph.add_edge(&a, &dest).unwrap();
710        mark_edge_full(&graph, &me, &a);
711        mark_edge_last(&graph, &a, &dest);
712
713        let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
714        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
715            .with_open_channel(me_addr(), a_addr())
716            .with_open_channel(a_addr(), dest_addr());
717        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
718        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
719
720        let cache_key: PlannerCacheKey = (NodeId::Offchain(me), NodeId::Offchain(dest), 1);
721
722        assert!(
723            planner.cache.get(&cache_key).await.is_none(),
724            "cache should be empty before first call"
725        );
726
727        let routing = DestinationRouting::Forward {
728            destination: Box::new(NodeId::Offchain(dest)),
729            pseudonym: None,
730            forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
731            return_options: None,
732        };
733        planner.resolve_routing(100, 0, routing).await.expect("should succeed");
734
735        let cached = planner.cache.get(&cache_key).await;
736        assert!(cached.is_some(), "cache should be populated after first call");
737        let paths = cached.unwrap();
738        assert!(!paths.is_empty(), "cached paths must be non-empty");
739        let (first_path, first_cost) = paths.iter().next().expect("at least one cached path");
740        assert_eq!(first_path.num_hops(), 2, "path should have 2 hops [a, dest]");
741        assert!(*first_cost > 0.0, "cost should be positive");
742    }
743
744    #[tokio::test]
745    async fn planner_cache_hit_should_return_valid_path() {
746        let me = pubkey(&SECRET_ME);
747        let a = pubkey(&SECRET_A);
748        let dest = pubkey(&SECRET_DEST);
749
750        let graph = ChannelGraph::new(me);
751        graph.add_node(a);
752        graph.add_node(dest);
753        graph.add_edge(&me, &a).unwrap();
754        graph.add_edge(&a, &dest).unwrap();
755        mark_edge_full(&graph, &me, &a);
756        mark_edge_last(&graph, &a, &dest);
757
758        let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
759        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
760            .with_open_channel(me_addr(), a_addr())
761            .with_open_channel(a_addr(), dest_addr());
762        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
763        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
764
765        let make_routing = || DestinationRouting::Forward {
766            destination: Box::new(NodeId::Offchain(dest)),
767            pseudonym: None,
768            forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
769            return_options: None,
770        };
771
772        let (r1, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 1");
773        let (r2, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 2");
774
775        let hops1 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r1 {
776            forward_path.num_hops()
777        } else {
778            panic!("expected Forward");
779        };
780        let hops2 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r2 {
781            forward_path.num_hops()
782        } else {
783            panic!("expected Forward");
784        };
785        assert_eq!(hops1, 2);
786        assert_eq!(hops2, 2);
787    }
788
789    #[tokio::test]
790    async fn background_refresh_should_produce_a_future() {
791        let me = pubkey(&SECRET_ME);
792        let graph = ChannelGraph::new(me);
793        let selector = HoprGraphPathSelector::new(me, graph, small_config().max_cached_paths);
794        let chain_api = TestChainApi::new(me, me_addr(), vec![]);
795        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
796
797        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
798        // Just ensure it compiles and produces a future.
799        let _future = planner.run_background_refresh();
800    }
801}