Skip to main content

hopr_transport/path/
planner.rs

1use std::{sync::Arc, time::Duration};
2
3use futures::{StreamExt as _, TryStreamExt, stream::FuturesUnordered};
4use hopr_api::chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations};
5use hopr_crypto_packet::prelude::*;
6use hopr_protocol_hopr::{FoundSurb, SurbStore};
7#[cfg(all(feature = "telemetry", not(test)))]
8use hopr_types::internal::path::Path;
9use hopr_types::{
10    crypto::{crypto_traits::Randomizable, types::OffchainPublicKey},
11    internal::{errors::PathError, prelude::*},
12    primitive::traits::ToHex,
13};
14use tracing::trace;
15use validator::{Validate, ValidationError};
16
17use super::{
18    errors::{PathPlannerError, Result},
19    traits::{BackgroundPathCacheRefreshable, PathSelector},
20};
21
// Histogram fed by `resolve_path` with `num_hops() - 1` for every resolved path.
// Only compiled when the `telemetry` feature is enabled and the crate is not
// built for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // NOTE(review): the first two arguments look like the metric name and its help
    // text, the third like the bucket boundaries — confirm against `SimpleHistogram::new`.
    // The `unwrap` panics on first access if the histogram cannot be constructed.
    static ref METRIC_PATH_LENGTH: hopr_types::telemetry::SimpleHistogram = hopr_types::telemetry::SimpleHistogram::new(
        "hopr_path_length",
        "Distribution of number of hops of sent messages",
        vec![0.0, 1.0, 2.0, 3.0, 4.0]
    ).unwrap();
}
30
/// Configuration for [`PathPlanner`]'s internal path cache.
///
/// [`PathPlanner::new`] reads only `max_cache_capacity`, `cache_ttl` and
/// `refresh_period`. The remaining fields are not consumed by the planner itself;
/// the tests in this file hand them to the path selector's constructor.
#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
pub struct PathPlannerConfig {
    /// Maximum number of `(source, destination, hops)` entries in the path cache.
    #[default = 10_000]
    pub max_cache_capacity: u64,
    /// Time-to-live for a cached path list.  When an entry expires the next
    /// [`PathPlanner::resolve_routing`] call transparently recomputes it (lazy refresh).
    #[default(Duration::from_secs(60))]
    pub cache_ttl: Duration,
    /// Period between proactive background cache-refresh sweeps.
    /// The refresh loop adds one millisecond to this value so the interval is never zero.
    #[default(Duration::from_secs(30))]
    pub refresh_period: Duration,
    /// Maximum number of candidate paths the selector may return per query.
    /// All returned candidates are validated and cached.
    #[default = 8]
    pub max_cached_paths: usize,
    /// Penalty multiplier for edges lacking probe-based quality observations.
    /// Applied during path cost evaluation to down-weight unprobed edges.
    /// Must be finite and in `0.0..=1.0`.
    #[default = 0.5]
    #[validate(custom(function = "validate_unit_interval"))]
    pub edge_penalty: f64,
    /// Minimum acceptable message acknowledgment rate for path selection.
    /// Edges with an ack rate below this threshold are excluded from candidate paths.
    /// Must be finite and in `0.0..=1.0`.
    #[default = 0.1]
    #[validate(custom(function = "validate_unit_interval"))]
    pub min_ack_rate: f64,
}
61
62fn validate_unit_interval(value: f64) -> std::result::Result<(), ValidationError> {
63    if value.is_finite() && (0.0..=1.0).contains(&value) {
64        Ok(())
65    } else {
66        Err(ValidationError::new("value must be finite and in 0.0..=1.0"))
67    }
68}
69
/// Cache key for the path planner: `(source, destination, hops)`.
///
/// Only the `Hops` variant of [`RoutingOptions`] is cached (explicit intermediate
/// paths bypass the cache), so the key stores the hop count as a plain `u32`.
/// Zero-hop requests are resolved directly and never produce a key either.
type PlannerCacheKey = (NodeId, NodeId, u32);
/// Cached value: the validated candidate paths for one key, each stored together
/// with the cost reported by the selector. Shared behind an `Arc` so a cache hit
/// does not copy the collection.
type PlannerCacheValue = Arc<hopr_utils::statistics::WeightedCollection<ValidatedPath>>;
76
/// Path planner that resolves [`DestinationRouting`] to [`ResolvedTransportRouting`].
///
/// The planner delegates path *discovery* to any [`PathSelector`] implementation and
/// owns the `moka` cache of fully-validated [`ValidatedPath`] objects paired with
/// their traversal cost, keyed by `(source: NodeId, destination: NodeId, hops: u32)`.
///
/// On a cache miss the planner calls the selector, validates every candidate against
/// the chain resolver, and stores an `Arc<WeightedCollection<ValidatedPath>>` in the
/// cache. On a cache hit a candidate is picked via weighted random selection (higher
/// cost = higher quality = higher probability).
///
/// A background sweep (`run_background_refresh`) can be spawned to
/// proactively re-warm the cache for all keys currently held in it.
#[derive(Clone)]
pub struct PathPlanner<Surb, R, S> {
    /// This node's offchain key: the source of forward paths and the destination
    /// of return paths.
    me: OffchainPublicKey,
    /// Store queried for a SURB when resolving `Return` routing.
    pub surb_store: Surb,
    /// Chain API used to resolve node identities and to validate candidate paths.
    resolver: Arc<R>,
    /// Supplies candidate paths on a cache miss and during background refresh.
    selector: Arc<S>,
    /// Validated candidate paths per `(source, destination, hops)` key.
    cache: moka::future::Cache<PlannerCacheKey, PlannerCacheValue>,
    /// Base period of the background refresh loop.
    refresh_period: Duration,
}
99
100impl<Surb, R, S> PathPlanner<Surb, R, S>
101where
102    Surb: SurbStore + Send + Sync + 'static,
103    R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
104    S: PathSelector + Send + Sync + 'static,
105{
106    /// Create a new path planner.
107    ///
108    /// `me` is this node's [`OffchainPublicKey`]; it is used as the source in path queries.
109    pub fn new(me: OffchainPublicKey, surb_store: Surb, resolver: R, selector: S, config: PathPlannerConfig) -> Self {
110        let cache = moka::future::Cache::builder()
111            .max_capacity(config.max_cache_capacity)
112            .time_to_live(config.cache_ttl)
113            .build();
114
115        Self {
116            me,
117            surb_store,
118            resolver: Arc::new(resolver),
119            selector: Arc::new(selector),
120            cache,
121            refresh_period: config.refresh_period,
122        }
123    }
124
125    /// Resolve a [`NodeId`] to an [`OffchainPublicKey`].
126    async fn resolve_node_id_to_offchain_key(&self, node_id: &NodeId) -> Result<OffchainPublicKey> {
127        match node_id {
128            NodeId::Offchain(key) => Ok(*key),
129            NodeId::Chain(addr) => {
130                let resolver = ChainPathResolver::from(&*self.resolver);
131                resolver
132                    .resolve_transport_address(addr)
133                    .await
134                    .map_err(|e| PathPlannerError::Other(anyhow::anyhow!("{e}")))?
135                    .ok_or_else(|| {
136                        PathPlannerError::Other(anyhow::anyhow!("no offchain key found for chain address {addr}"))
137                    })
138            }
139        }
140    }
141
142    #[tracing::instrument(level = "trace", skip(self))]
143    async fn resolve_path(
144        &self,
145        source: NodeId,
146        destination: NodeId,
147        options: RoutingOptions,
148    ) -> Result<ValidatedPath> {
149        let path = match options {
150            RoutingOptions::IntermediatePath(explicit_path) => {
151                tracing::debug!(
152                    direction = "loopback",
153                    ?source,
154                    ?destination,
155                    ?explicit_path,
156                    "resolving intermediate path"
157                );
158                let resolver = ChainPathResolver::from(&*self.resolver);
159                ValidatedPath::new(
160                    source,
161                    explicit_path
162                        .into_iter()
163                        .chain(std::iter::once(destination))
164                        .collect::<Vec<_>>(),
165                    &resolver,
166                )
167                .await?
168            }
169
170            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
171                trace!(hops = 0, "resolving zero-hop direct path");
172                let resolver = ChainPathResolver::from(&*self.resolver);
173                ValidatedPath::new(source, vec![destination], &resolver).await?
174            }
175
176            RoutingOptions::Hops(hops) => {
177                let hops_usize: usize = hops.into();
178                trace!(hops = hops_usize, "resolving path via planner cache");
179
180                let src_key = self.resolve_node_id_to_offchain_key(&source).await?;
181                let dest_key = self.resolve_node_id_to_offchain_key(&destination).await?;
182
183                let cache_key: PlannerCacheKey = (source, destination, u32::from(hops));
184
185                let resolver = self.resolver.clone();
186                let selector = self.selector.clone();
187
188                let paths = self
189                    .cache
190                    .try_get_with(cache_key, async move {
191                        trace!(hops = hops_usize, "path cache miss, querying selector");
192                        let candidates = selector.select_path(src_key, dest_key, hops_usize)?;
193
194                        let chain_resolver = ChainPathResolver::from(&*resolver);
195                        let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
196                        for pwc in candidates {
197                            let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
198                            match ValidatedPath::new(source, node_ids, &chain_resolver).await {
199                                Ok(vp) => {
200                                    // Post-resolution: catch non-adjacent chain-address duplicates
201                                    // (ValidatedPath::new only checks consecutive collisions).
202                                    if vp.chain_path().iter().enumerate().any(|(i, a)| vp.chain_path()[..i].contains(a)) {
203                                        tracing::debug!(path = %vp, "skipping path candidate with repeated chain addresses");
204                                        continue;
205                                    }
206                                    valid_paths.push((vp, pwc.cost))
207                                }
208                                Err(e) => tracing::warn!(error = %e, "path candidate failed validation"),
209                            }
210                        }
211
212                        if valid_paths.is_empty() {
213                            return Err(PathPlannerError::Path(PathError::PathNotFound(
214                                hops_usize,
215                                src_key.to_hex(),
216                                dest_key.to_hex(),
217                            )));
218                        }
219
220                        Ok(Arc::new(hopr_utils::statistics::WeightedCollection::new(valid_paths)))
221                    })
222                    .await
223                    .map_err(PathPlannerError::CacheError)?;
224
225                paths.pick_one().ok_or_else(|| {
226                    PathPlannerError::Path(PathError::PathNotFound(hops_usize, src_key.to_hex(), dest_key.to_hex()))
227                })?
228            }
229        };
230
231        #[cfg(all(feature = "telemetry", not(test)))]
232        {
233            hopr_types::telemetry::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
234        }
235
236        trace!(%path, "validated resolved path");
237        Ok(path)
238    }
239
240    /// Resolve a [`DestinationRouting`] to a [`ResolvedTransportRouting`].
241    ///
242    /// Returns the resolved routing and, for `Return` variants, the number of remaining SURBs.
243    #[tracing::instrument(level = "trace", skip(self))]
244    pub async fn resolve_routing(
245        &self,
246        size_hint: usize,
247        max_surbs: usize,
248        routing: DestinationRouting,
249    ) -> Result<(ResolvedTransportRouting<HoprSurb>, Option<usize>)> {
250        match routing {
251            DestinationRouting::Forward {
252                destination,
253                pseudonym,
254                forward_options,
255                return_options,
256            } => {
257                tracing::debug!(direction = "forward", %destination, "resolving forward path");
258
259                let forward_path = self
260                    .resolve_path(NodeId::Offchain(self.me), *destination, forward_options)
261                    .await?;
262                tracing::debug!(direction = "forward", %destination, path = %forward_path, "resolved path");
263
264                let return_paths = if let Some(return_options) = return_options {
265                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
266                    trace!(
267                        %destination,
268                        %num_possible_surbs,
269                        data_len = size_hint,
270                        max_surbs,
271                        "resolving packet return paths"
272                    );
273
274                    (0..num_possible_surbs)
275                        .map(|_| self.resolve_path(*destination, NodeId::Offchain(self.me), return_options.clone()))
276                        .collect::<FuturesUnordered<_>>()
277                        .try_collect::<Vec<ValidatedPath>>()
278                        .await?
279                        .into_iter()
280                        .enumerate()
281                        .inspect(|(i, rp)| {
282                            tracing::debug!(direction = "return", %destination, index = i, path = %rp, "resolved return path");
283                        })
284                        .map(|(_, rp)| rp)
285                        .collect()
286                } else {
287                    vec![]
288                };
289
290                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
291
292                Ok((
293                    ResolvedTransportRouting::Forward {
294                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
295                        forward_path,
296                        return_paths,
297                    },
298                    None,
299                ))
300            }
301
302            DestinationRouting::Return(matcher) => {
303                let FoundSurb {
304                    sender_id,
305                    surb,
306                    remaining,
307                } = self
308                    .surb_store
309                    .find_surb(matcher)
310                    .ok_or_else(|| PathPlannerError::Surb(format!("no surb for pseudonym {}", matcher.pseudonym())))?;
311                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
312            }
313        }
314    }
315}
316
317impl<Surb, R, S> BackgroundPathCacheRefreshable for PathPlanner<Surb, R, S>
318where
319    Surb: SurbStore + Send + Sync + 'static,
320    R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
321    S: PathSelector + Send + Sync + 'static,
322{
323    /// Returns a future that runs the background path-cache refresh loop.
324    ///
325    /// The returned future iterates over all keys currently in the planner's cache
326    /// and recomputes their paths on a configurable schedule, so that steady-state
327    /// traffic is always served from cache.
328    fn run_background_refresh(&self) -> impl std::future::Future<Output = ()> + Send + 'static {
329        // Clone only the fields we need — avoids requiring R: Clone + S: Clone.
330        let cache = self.cache.clone();
331        let resolver = self.resolver.clone();
332        let selector = self.selector.clone();
333        let refresh_period = self.refresh_period;
334
335        // run at a non-zero interval
336        futures_time::stream::interval(futures_time::time::Duration::from_millis(
337            refresh_period.as_millis() as u64 + 1u64,
338        ))
339        .for_each(move |_| {
340            let cache = cache.clone();
341            let resolver = resolver.clone();
342            let selector = selector.clone();
343
344            async move {
345                for (key, _) in cache.iter() {
346                    let (src, dest, hops_u32) = {
347                        let k = key.as_ref();
348                        (k.0, k.1, k.2)
349                    };
350
351                    if hops_u32 == 0 {
352                        continue;
353                    }
354                    let hops_usize = hops_u32 as usize;
355
356                    let resolve_key = |node: NodeId| {
357                        let resolver = resolver.clone();
358
359                        async move {
360                            match node {
361                                NodeId::Offchain(k) => Some(k),
362                                NodeId::Chain(addr) => ChainPathResolver::from(&*resolver)
363                                    .resolve_transport_address(&addr)
364                                    .await
365                                    .ok()
366                                    .flatten(),
367                            }
368                        }
369                    };
370
371                    if let (Some(src_key), Some(dest_key)) = (resolve_key(src).await, resolve_key(dest).await)
372                        && let Ok(candidates) = selector.select_path(src_key, dest_key, hops_usize)
373                    {
374                        let chain_resolver = ChainPathResolver::from(&*resolver);
375                        let mut valid_paths: Vec<(ValidatedPath, f64)> = Vec::new();
376                        for pwc in candidates {
377                            let node_ids: Vec<NodeId> = pwc.path.into_iter().map(NodeId::Offchain).collect::<Vec<_>>();
378                            match ValidatedPath::new(src, node_ids, &chain_resolver).await {
379                                Ok(vp) => {
380                                    if vp.chain_path().iter().enumerate().any(|(i, a)| vp.chain_path()[..i].contains(a)) {
381                                        tracing::debug!(path = %vp, "background refresh: skipping candidate with repeated chain addresses");
382                                        continue;
383                                    }
384                                    valid_paths.push((vp, pwc.cost))
385                                }
386                                Err(e) => tracing::warn!(error = %e, "background refresh: path candidate failed validation"),
387                            }
388                        }
389
390                        if !valid_paths.is_empty() {
391                            cache
392                                .insert(
393                                    (src, dest, hops_u32),
394                                    Arc::new(hopr_utils::statistics::WeightedCollection::new(valid_paths)),
395                                )
396                                .await;
397                        }
398                    }
399                }
400            }
401        })
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use std::str::FromStr;
408
409    use bimap::BiMap;
410    use futures::stream::{self, BoxStream};
411    use hex_literal::hex;
412    use hopr_api::{
413        chain::{ChainKeyOperations, ChainReadChannelOperations, ChannelSelector, HoprKeyIdent},
414        graph::{
415            NetworkGraphWrite,
416            traits::{EdgeObservableWrite, EdgeWeightType},
417        },
418    };
419    use hopr_network_graph::ChannelGraph;
420    use hopr_types::{
421        crypto::prelude::{Keypair, OffchainKeypair},
422        internal::channels::{ChannelEntry, ChannelStatus, generate_channel_id},
423        primitive::prelude::*,
424    };
425
426    use super::*;
427    use crate::path::selector::HoprGraphPathSelector;
428
429    #[derive(Debug)]
430    struct TestError(String);
431
432    impl std::fmt::Display for TestError {
433        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434            f.write_str(&self.0)
435        }
436    }
437
438    impl std::error::Error for TestError {}
439
    // Fixed 32-byte secrets from which the tests derive offchain public keys via
    // `pubkey`: this node (`ME`), the relay used as an intermediate hop (`A`) and
    // the destination (`DEST`).
    const SECRET_ME: [u8; 32] = hex!("60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d");
    const SECRET_A: [u8; 32] = hex!("71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a");
    const SECRET_DEST: [u8; 32] = hex!("c24bd833704dd2abdae3933fcc9962c2ac404f84132224c474147382d4db2299");
443
444    fn pubkey(secret: &[u8; 32]) -> OffchainPublicKey {
445        *OffchainKeypair::from_secret(secret).expect("valid secret").public()
446    }
447
448    #[derive(Clone)]
449    struct Mapper {
450        map: Arc<BiMap<OffchainPublicKey, HoprKeyIdent>>,
451    }
452
453    impl KeyIdMapping<HoprKeyIdent, OffchainPublicKey> for Mapper {
454        fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<HoprKeyIdent> {
455            self.map.get_by_left(key).copied()
456        }
457
458        fn map_id_to_public(&self, id: &HoprKeyIdent) -> Option<OffchainPublicKey> {
459            self.map.get_by_right(id).copied()
460        }
461
462        fn map_keys_to_ids(&self, keys: &[OffchainPublicKey]) -> Vec<Option<HoprKeyIdent>> {
463            keys.iter().map(|key| self.map_key_to_id(key)).collect()
464        }
465
466        fn map_ids_to_keys(&self, ids: &[HoprKeyIdent]) -> Vec<Option<OffchainPublicKey>> {
467            ids.iter().map(|id| self.map_id_to_public(id)).collect()
468        }
469    }
470
471    struct TestChainApi {
472        me: Address,
473        key_addr_map: BiMap<OffchainPublicKey, Address>,
474        channels: Vec<ChannelEntry>,
475        id_mapper: Mapper,
476    }
477
478    impl TestChainApi {
479        fn new(me_key: OffchainPublicKey, me_addr: Address, peers: Vec<(OffchainPublicKey, Address)>) -> Self {
480            let mut key_addr_map = BiMap::new();
481            let mut key_id_map: BiMap<OffchainPublicKey, HoprKeyIdent> = BiMap::new();
482            key_addr_map.insert(me_key, me_addr);
483            key_id_map.insert(me_key, 0u32.into());
484            for (i, (k, a)) in peers.iter().enumerate() {
485                key_addr_map.insert(*k, *a);
486                key_id_map.insert(*k, ((i + 1) as u32).into());
487            }
488            Self {
489                me: me_addr,
490                key_addr_map,
491                channels: vec![],
492                id_mapper: Mapper {
493                    map: Arc::new(key_id_map),
494                },
495            }
496        }
497
498        fn with_open_channel(mut self, src: Address, dst: Address) -> Self {
499            self.channels.push(
500                ChannelEntry::builder()
501                    .between(src, dst)
502                    .amount(100)
503                    .ticket_index(1)
504                    .status(ChannelStatus::Open)
505                    .epoch(1)
506                    .build()
507                    .unwrap(),
508            );
509            self
510        }
511    }
512
513    impl ChainKeyOperations for TestChainApi {
514        type Error = TestError;
515        type Mapper = Mapper;
516
517        fn chain_key_to_packet_key(
518            &self,
519            chain: &Address,
520        ) -> std::result::Result<Option<OffchainPublicKey>, TestError> {
521            Ok(self.key_addr_map.get_by_right(chain).copied())
522        }
523
524        fn packet_key_to_chain_key(
525            &self,
526            packet: &OffchainPublicKey,
527        ) -> std::result::Result<Option<Address>, TestError> {
528            Ok(self.key_addr_map.get_by_left(packet).copied())
529        }
530
531        fn key_id_mapper_ref(&self) -> &Self::Mapper {
532            &self.id_mapper
533        }
534    }
535
536    impl ChainReadChannelOperations for TestChainApi {
537        type Error = TestError;
538
539        fn me(&self) -> &Address {
540            &self.me
541        }
542
543        fn channel_by_id(&self, channel_id: &ChannelId) -> std::result::Result<Option<ChannelEntry>, TestError> {
544            Ok(self
545                .channels
546                .iter()
547                .find(|c| generate_channel_id(&c.source, &c.destination) == *channel_id)
548                .cloned())
549        }
550
551        fn stream_channels<'a>(
552            &'a self,
553            _selector: ChannelSelector,
554        ) -> std::result::Result<BoxStream<'a, ChannelEntry>, TestError> {
555            Ok(Box::pin(stream::iter(self.channels.clone())))
556        }
557    }
558
559    // ── address fixtures ──────────────────────────────────────────────────────
560    fn me_addr() -> Address {
561        Address::from_str("0x1000d5786d9e6799b3297da1ad55605b91e2c882").expect("valid addr")
562    }
563    fn a_addr() -> Address {
564        Address::from_str("0x200060ddced1e33c9647a71f4fc2cf4ed33e4a9d").expect("valid addr")
565    }
566    fn dest_addr() -> Address {
567        Address::from_str("0x30004105095c8c10f804109b4d1199a9ac40ed46").expect("valid addr")
568    }
569
570    // ── graph helpers ──────────────────────────────────────────────────────────
571    fn mark_edge_full(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
572        use hopr_api::graph::traits::EdgeWeightType;
573        graph.upsert_edge(src, dst, |obs| {
574            obs.record(EdgeWeightType::Connected(true));
575            obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
576            obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
577            obs.record(EdgeWeightType::Capacity(Some(1000)));
578        });
579    }
580
581    fn mark_edge_last(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
582        graph.upsert_edge(src, dst, |obs| {
583            obs.record(EdgeWeightType::Connected(true));
584            obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
585            obs.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(50))));
586            obs.record(EdgeWeightType::Capacity(Some(1000)));
587        });
588    }
589
590    fn small_config() -> PathPlannerConfig {
591        PathPlannerConfig {
592            max_cache_capacity: 100,
593            cache_ttl: std::time::Duration::from_secs(60),
594            refresh_period: std::time::Duration::from_secs(60),
595            max_cached_paths: 2,
596            ..PathPlannerConfig::default()
597        }
598    }
599
600    // ── test: zero-hop path ───────────────────────────────────────────────────
601
602    #[tokio::test]
603    async fn zero_hop_path_should_bypass_selector() {
604        let me = pubkey(&SECRET_ME);
605        let dest = pubkey(&SECRET_DEST);
606
607        // Build empty graph (no edges) — selector would fail if called.
608        let graph = ChannelGraph::new(me);
609        let cfg = small_config();
610        let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
611
612        let chain_api = TestChainApi::new(me, me_addr(), vec![(dest, dest_addr())]);
613        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
614
615        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
616
617        let routing = DestinationRouting::Forward {
618            destination: Box::new(NodeId::Offchain(dest)),
619            pseudonym: None,
620            forward_options: RoutingOptions::Hops(0.try_into().expect("valid 0")),
621            return_options: None,
622        };
623
624        let result = planner.resolve_routing(100, 0, routing).await;
625        assert!(result.is_ok(), "zero-hop should succeed: {:?}", result.err());
626
627        let (resolved, rem) = result.unwrap();
628        assert!(rem.is_none());
629        if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
630            assert_eq!(
631                forward_path.num_hops(),
632                1,
633                "zero-hop = 1 node in path (just destination)"
634            );
635        } else {
636            panic!("expected Forward routing");
637        }
638    }
639
640    // ── test: one-hop path via graph selector ─────────────────────────────────
641
642    #[tokio::test]
643    async fn one_hop_path_should_use_selector() {
644        let me = pubkey(&SECRET_ME);
645        let a = pubkey(&SECRET_A);
646        let dest = pubkey(&SECRET_DEST);
647
648        let graph = ChannelGraph::new(me);
649        graph.add_node(a);
650        graph.add_node(dest);
651        graph.add_edge(&me, &a).unwrap();
652        graph.add_edge(&a, &dest).unwrap();
653        mark_edge_full(&graph, &me, &a);
654        mark_edge_last(&graph, &a, &dest);
655
656        let cfg = small_config();
657        let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
658
659        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
660            .with_open_channel(me_addr(), a_addr())
661            .with_open_channel(a_addr(), dest_addr());
662
663        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
664        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
665
666        let routing = DestinationRouting::Forward {
667            destination: Box::new(NodeId::Offchain(dest)),
668            pseudonym: None,
669            forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
670            return_options: None,
671        };
672
673        let result = planner.resolve_routing(100, 0, routing).await;
674        assert!(result.is_ok(), "1-hop routing should succeed: {:?}", result.err());
675
676        let (resolved, _) = result.unwrap();
677        if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
678            assert_eq!(
679                forward_path.num_hops(),
680                2,
681                "1 intermediate hop means path has 2 nodes [a, dest]"
682            );
683        } else {
684            panic!("expected Forward routing");
685        }
686    }
687
688    #[tokio::test]
689    async fn explicit_intermediate_path_should_bypass_selector() {
690        let me = pubkey(&SECRET_ME);
691        let a = pubkey(&SECRET_A);
692        let dest = pubkey(&SECRET_DEST);
693
694        // Empty graph — selector would fail; explicit path should not use it.
695        let graph = ChannelGraph::new(me);
696        let cfg = small_config();
697        let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
698
699        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
700            .with_open_channel(me_addr(), a_addr())
701            .with_open_channel(a_addr(), dest_addr());
702
703        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
704        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
705
706        use hopr_types::primitive::prelude::BoundedVec;
707        let intermediate_path = BoundedVec::try_from(vec![NodeId::Offchain(a)]).expect("valid");
708
709        let routing = DestinationRouting::Forward {
710            destination: Box::new(NodeId::Offchain(dest)),
711            pseudonym: None,
712            forward_options: RoutingOptions::IntermediatePath(intermediate_path),
713            return_options: None,
714        };
715
716        let result = planner.resolve_routing(100, 0, routing).await;
717        assert!(result.is_ok(), "explicit path should succeed: {:?}", result.err());
718
719        let (resolved, _) = result.unwrap();
720        if let ResolvedTransportRouting::Forward { forward_path, .. } = resolved {
721            assert_eq!(forward_path.num_hops(), 2, "one intermediate + destination = 2 hops");
722        } else {
723            panic!("expected Forward routing");
724        }
725    }
726
727    #[tokio::test]
728    async fn return_routing_without_surb_should_return_error() {
729        let me = pubkey(&SECRET_ME);
730        let graph = ChannelGraph::new(me);
731        let cfg = small_config();
732        let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
733        let chain_api = TestChainApi::new(me, me_addr(), vec![]);
734        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
735
736        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
737
738        use hopr_types::internal::routing::SurbMatcher;
739        let matcher = SurbMatcher::Pseudonym(HoprPseudonym::random());
740        let routing = DestinationRouting::Return(matcher);
741
742        let result = planner.resolve_routing(0, 0, routing).await;
743        assert!(result.is_err(), "should fail when no SURB exists");
744        assert!(
745            matches!(result.unwrap_err(), PathPlannerError::Surb(_)),
746            "error should be Surb variant"
747        );
748    }
749
750    // ── test: cache integration ───────────────────────────────────────────────
751
752    #[tokio::test]
753    async fn planner_cache_miss_should_populate_cache() {
754        let me = pubkey(&SECRET_ME);
755        let a = pubkey(&SECRET_A);
756        let dest = pubkey(&SECRET_DEST);
757
758        let graph = ChannelGraph::new(me);
759        graph.add_node(a);
760        graph.add_node(dest);
761        graph.add_edge(&me, &a).unwrap();
762        graph.add_edge(&a, &dest).unwrap();
763        mark_edge_full(&graph, &me, &a);
764        mark_edge_last(&graph, &a, &dest);
765
766        let cfg = small_config();
767        let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
768        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
769            .with_open_channel(me_addr(), a_addr())
770            .with_open_channel(a_addr(), dest_addr());
771        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
772        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
773
774        let cache_key: PlannerCacheKey = (NodeId::Offchain(me), NodeId::Offchain(dest), 1);
775
776        assert!(
777            planner.cache.get(&cache_key).await.is_none(),
778            "cache should be empty before first call"
779        );
780
781        let routing = DestinationRouting::Forward {
782            destination: Box::new(NodeId::Offchain(dest)),
783            pseudonym: None,
784            forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
785            return_options: None,
786        };
787        planner.resolve_routing(100, 0, routing).await.expect("should succeed");
788
789        let cached = planner.cache.get(&cache_key).await;
790        assert!(cached.is_some(), "cache should be populated after first call");
791        let paths = cached.unwrap();
792        assert!(!paths.is_empty(), "cached paths must be non-empty");
793        let (first_path, first_cost) = paths.iter().next().expect("at least one cached path");
794        assert_eq!(first_path.num_hops(), 2, "path should have 2 hops [a, dest]");
795        assert!(*first_cost > 0.0, "cost should be positive");
796    }
797
798    #[tokio::test]
799    async fn planner_cache_hit_should_return_valid_path() {
800        let me = pubkey(&SECRET_ME);
801        let a = pubkey(&SECRET_A);
802        let dest = pubkey(&SECRET_DEST);
803
804        let graph = ChannelGraph::new(me);
805        graph.add_node(a);
806        graph.add_node(dest);
807        graph.add_edge(&me, &a).unwrap();
808        graph.add_edge(&a, &dest).unwrap();
809        mark_edge_full(&graph, &me, &a);
810        mark_edge_last(&graph, &a, &dest);
811
812        let cfg = small_config();
813        let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
814        let chain_api = TestChainApi::new(me, me_addr(), vec![(a, a_addr()), (dest, dest_addr())])
815            .with_open_channel(me_addr(), a_addr())
816            .with_open_channel(a_addr(), dest_addr());
817        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
818        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
819
820        let make_routing = || DestinationRouting::Forward {
821            destination: Box::new(NodeId::Offchain(dest)),
822            pseudonym: None,
823            forward_options: RoutingOptions::Hops(1.try_into().expect("valid 1")),
824            return_options: None,
825        };
826
827        let (r1, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 1");
828        let (r2, _) = planner.resolve_routing(100, 0, make_routing()).await.expect("call 2");
829
830        let hops1 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r1 {
831            forward_path.num_hops()
832        } else {
833            panic!("expected Forward");
834        };
835        let hops2 = if let ResolvedTransportRouting::Forward { forward_path, .. } = r2 {
836            forward_path.num_hops()
837        } else {
838            panic!("expected Forward");
839        };
840        assert_eq!(hops1, 2);
841        assert_eq!(hops2, 2);
842    }
843
844    #[tokio::test]
845    async fn background_refresh_should_produce_a_future() {
846        let me = pubkey(&SECRET_ME);
847        let graph = ChannelGraph::new(me);
848        let cfg = small_config();
849        let selector = HoprGraphPathSelector::new(me, graph, cfg.max_cached_paths, cfg.edge_penalty, cfg.min_ack_rate);
850        let chain_api = TestChainApi::new(me, me_addr(), vec![]);
851        let surb_store = hopr_protocol_hopr::MemorySurbStore::default();
852
853        let planner = PathPlanner::new(me, surb_store, chain_api, selector, small_config());
854        // Just ensure it compiles and produces a future.
855        let _future = planner.run_background_refresh();
856    }
857}