hopr_path/selectors/
dfs.rs

1use async_trait::async_trait;
2use hopr_crypto_random::random_float;
3use hopr_internal_types::prelude::*;
4use hopr_primitive_types::prelude::*;
5use std::cmp::Ordering;
6use std::collections::BinaryHeap;
7use std::marker::PhantomData;
8use std::time::Duration;
9use tracing::trace;
10
11use crate::channel_graph::{ChannelEdge, ChannelGraph, Node};
12use crate::errors::{PathError, Result};
13use crate::path::ChannelPath;
14use crate::selectors::{EdgeWeighting, PathSelector};
15
/// Holds a weighted channel path and auxiliary information for the graph traversal.
///
/// Instances are the work items of the path search: they are kept in a
/// [`BinaryHeap`] and ordered by the `Ord` implementation of this type.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WeightedChannelPath {
    /// Destinations of the channels traversed so far, in traversal order.
    path: Vec<Address>,
    /// Sum of the weights of all edges this path has been extended with.
    weight: U256,
    /// Set once the search found no usable continuation for this path;
    /// a fully explored path is never extended again.
    fully_explored: bool,
}
23
24impl WeightedChannelPath {
25    /// Extend this path with another [`ChannelEdge`] and return a new [`WeightedChannelPath`].
26    fn extend<CW: EdgeWeighting<U256>>(mut self, edge: &ChannelEdge) -> Self {
27        if !self.fully_explored {
28            self.path.push(edge.channel.destination);
29            self.weight += CW::calculate_weight(edge);
30        }
31        self
32    }
33}
34
35impl Default for WeightedChannelPath {
36    fn default() -> Self {
37        Self {
38            path: Vec::with_capacity(INTERMEDIATE_HOPS),
39            weight: U256::zero(),
40            fully_explored: false,
41        }
42    }
43}
44
impl PartialOrd for WeightedChannelPath {
    /// Delegates to the total order defined by the `Ord` implementation,
    /// so this never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
50
51impl Ord for WeightedChannelPath {
52    /// Favors unexplored paths over fully explored paths even
53    /// when a better alternative exists.
54    ///
55    /// Favors longer paths over shorter paths, longer path
56    /// means more privacy.
57    ///
58    /// If both parts are of the same length, favors paths
59    /// with higher weights.
60    fn cmp(&self, other: &Self) -> Ordering {
61        if other.fully_explored == self.fully_explored {
62            match self.path.len().cmp(&other.path.len()) {
63                Ordering::Equal => self.weight.cmp(&other.weight),
64                o => o,
65            }
66        } else if other.fully_explored {
67            Ordering::Greater
68        } else {
69            Ordering::Less
70        }
71    }
72}
73
/// Assigns each channel a randomized weight.
///
/// The weight is randomized so that the same nodes are not always selected,
/// which is necessary to achieve privacy.
/// At the same time, it favors channels with a higher stake.
#[derive(Clone, Copy, Debug, Default)]
pub struct RandomizedEdgeWeighting;
81
82impl EdgeWeighting<U256> for RandomizedEdgeWeighting {
83    /// Multiply channel stake with a random float in the interval [0,1).
84    /// Given that the floats are uniformly distributed, nodes with higher
85    /// stake have a higher probability of reaching a higher value.
86    ///
87    /// Sorting the list of weights thus moves nodes with higher stakes more
88    /// often to the front.
89    fn calculate_weight(edge: &ChannelEdge) -> U256 {
90        edge.channel
91            .balance
92            .amount()
93            .mul_f64(random_float())
94            .expect("Could not multiply edge weight with float")
95            .max(1.into())
96    }
97}
98
/// Configuration of the [`DfsPathSelector`].
#[derive(Clone, Copy, Debug, PartialEq, smart_default::SmartDefault)]
pub struct DfsPathSelectorConfig {
    /// Upper bound on the number of search iterations of a single path selection.
    /// Once the bound is hit, the search stops with the candidate it currently holds.
    /// Default is 100
    #[default(100)]
    pub max_iterations: usize,
    /// Node score threshold: nodes with a score below this value are not
    /// taken into account.
    /// Default is 0.5
    #[default(0.5)]
    pub node_score_threshold: f64,
    /// Channel score threshold: channels that have a score and whose score is
    /// below this value are not taken into account.
    /// Default is 0
    #[default(0.0)]
    pub edge_score_threshold: f64,
    /// The maximum average latency of the first hop. Nodes without a known
    /// average latency are rejected as a first hop as well.
    /// `None` disables the check.
    /// Default is 100 ms
    #[default(Some(Duration::from_millis(100)))]
    pub max_first_hop_latency: Option<Duration>,
    /// If true, paths may include payment channels that hold no funds.
    /// By default, this option is set to `false` to
    /// prevent tickets from being dropped immediately.
    /// Defaults to false.
    #[default(false)]
    pub allow_zero_edge_weight: bool,
}
124
/// Path selector using depth-first search of the channel graph.
///
/// The type parameter `CW` selects the edge weighting used to rank
/// candidate paths; it is never instantiated, only used at the type level.
#[derive(Clone, Debug)]
pub struct DfsPathSelector<CW> {
    /// Shared channel graph; a read lock is taken for each path selection.
    graph: std::sync::Arc<async_lock::RwLock<ChannelGraph>>,
    /// Thresholds and limits applied during the search.
    cfg: DfsPathSelectorConfig,
    /// Marker tying the selector to its edge weighting `CW`.
    _cw: PhantomData<CW>,
}
132
133impl<CW: EdgeWeighting<U256>> DfsPathSelector<CW> {
134    /// Creates a new path selector with the given [config](DfsPathSelectorConfig) and
135    /// [`ChannelGraph`].
136    pub fn new(graph: std::sync::Arc<async_lock::RwLock<ChannelGraph>>, cfg: DfsPathSelectorConfig) -> Self {
137        Self {
138            graph,
139            cfg,
140            _cw: PhantomData,
141        }
142    }
143
144    /// Determines whether a `next_hop` node is considered "interesting".
145    ///
146    /// To achieve privacy, we need at least one honest node along
147    /// the path. Each additional node decreases the probability of
148    /// having only malicious nodes, so we can sort out many nodes.
149    /// Nodes that have shown to be unreliable are of no use, so
150    /// drop them.
151    ///
152    /// ## Arguments
153    /// * `next_hop`: node in the channel graph that we're trying to add to the path
154    /// * `edge`: the information about the corresponding graph edge ([`ChannelEntry`] and `score`).
155    /// * `initial_source`: the initial node on the path
156    /// * `final_destination`: the desired destination node (will not be part of the path)
157    /// * `current_path`: currently selected relayers
158    #[tracing::instrument(level = "trace", skip(self))]
159    fn is_next_hop_usable(
160        &self,
161        next_hop: &Node,
162        edge: &ChannelEdge,
163        initial_source: &Address,
164        final_destination: &Address,
165        current_path: &[Address],
166    ) -> bool {
167        debug_assert_eq!(next_hop.address, edge.channel.destination);
168
169        // Looping back to self does not give any privacy
170        if next_hop.address.eq(initial_source) {
171            trace!("source loopback not allowed");
172            return false;
173        }
174
175        // We cannot use `final_destination` as the last intermediate hop as
176        // this would be a loopback that does not give any privacy
177        if next_hop.address.eq(final_destination) {
178            trace!("destination loopback not allowed");
179            return false;
180        }
181
182        // Only use nodes that have shown to be somewhat reliable
183        if next_hop.node_score < self.cfg.node_score_threshold {
184            trace!("node quality threshold not satisfied");
185            return false;
186        }
187
188        // Edges which have score and is below the threshold will not be considered
189        if edge
190            .edge_score
191            .is_some_and(|score| score < self.cfg.edge_score_threshold)
192        {
193            trace!("channel score threshold not satisfied");
194            return false;
195        }
196
197        // If this is the first hop, check if the latency is not too high
198        if current_path.is_empty()
199            && self
200                .cfg
201                .max_first_hop_latency
202                .is_some_and(|limit| next_hop.latency.average().is_none_or(|avg_latency| avg_latency > limit))
203        {
204            trace!("first hop latency too high");
205            return false;
206        }
207
208        // At the moment, we do not allow circles because they
209        // do not give additional privacy
210        if current_path.contains(&next_hop.address) {
211            trace!("circles not allowed");
212            return false;
213        }
214
215        // We cannot use channels with zero stake, if configure so
216        if !self.cfg.allow_zero_edge_weight && edge.channel.balance.is_zero() {
217            trace!(%next_hop, "zero stake channels not allowed");
218            return false;
219        }
220
221        trace!("usable node");
222        true
223    }
224}
225
226#[async_trait]
227impl<CW> PathSelector for DfsPathSelector<CW>
228where
229    CW: EdgeWeighting<U256> + Send + Sync,
230{
231    /// Attempts to find a path with at least `min_hops` hops and at most `max_hops` hops
232    /// that goes from `source` to `destination`. There does not need to be
233    /// a payment channel to `destination`, so the path only includes intermediate hops.
234    ///
235    /// The function implements a randomized best-first search through the path space. The graph
236    /// traversal is bounded by `self.max_iterations` to prevent from long-running path
237    /// selection runs.
238    async fn select_path(
239        &self,
240        source: Address,
241        destination: Address,
242        min_hops: usize,
243        max_hops: usize,
244    ) -> Result<ChannelPath> {
245        // The protocol does not support >3 hop paths and will presumably never do,
246        // so we can exclude it here.
247        if !(1..=INTERMEDIATE_HOPS).contains(&max_hops) || !(1..=max_hops).contains(&min_hops) {
248            return Err(GeneralError::InvalidInput.into());
249        }
250
251        let graph = self.graph.read().await;
252
253        // Populate the queue with possible initial path offsprings
254        let mut queue = graph
255            .open_channels_from(source)
256            .filter(|(node, edge)| self.is_next_hop_usable(node, edge, &source, &destination, &[]))
257            .map(|(_, edge)| WeightedChannelPath::default().extend::<CW>(edge))
258            .collect::<BinaryHeap<_>>();
259
260        trace!(last_peer = %source, queue_len = queue.len(), "got next possible steps");
261
262        let mut iters = 0;
263        while let Some(mut current) = queue.pop() {
264            let current_len = current.path.len();
265
266            trace!(
267                ?current,
268                ?queue,
269                queue_len = queue.len(),
270                iters,
271                min_hops,
272                max_hops,
273                "testing next path in queue"
274            );
275            if current_len == max_hops || current.fully_explored || iters > self.cfg.max_iterations {
276                return if current_len >= min_hops && current_len <= max_hops {
277                    Ok(ChannelPath::new_valid(current.path))
278                } else {
279                    trace!(current_len, min_hops, max_hops, iters, "path not found");
280                    Err(PathError::PathNotFound(
281                        max_hops,
282                        source.to_string(),
283                        destination.to_string(),
284                    ))
285                };
286            }
287
288            // Check for any acceptable path continuations
289            let last_peer = *current.path.last().unwrap();
290            let mut new_channels = graph
291                .open_channels_from(last_peer)
292                .filter(|(next_hop, edge)| {
293                    self.is_next_hop_usable(next_hop, edge, &source, &destination, &current.path)
294                })
295                .peekable();
296
297            // If there are more usable path continuations, add them all to the queue
298            if new_channels.peek().is_some() {
299                queue.extend(new_channels.map(|(_, edge)| current.clone().extend::<CW>(edge)));
300                trace!(%last_peer, queue_len = queue.len(), "got next possible steps");
301            } else {
302                // No more possible continuations, mark this path as fully explored,
303                // but push it into the queue
304                // if we don't manage to find anything better
305                current.fully_explored = true;
306                trace!(path = ?current, "fully explored");
307                queue.push(current);
308            }
309
310            iters += 1;
311        }
312
313        Err(PathError::PathNotFound(
314            max_hops,
315            source.to_string(),
316            destination.to_string(),
317        ))
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use super::*;
324
325    use crate::channel_graph::NodeScoreUpdate;
326    use crate::path::Path;
327    use async_lock::RwLock;
328    use core::panic;
329    use regex::Regex;
330    use std::ops::Deref;
331    use std::str::FromStr;
332    use std::sync::Arc;
333
    // Fixed set of node addresses used by the tests. Graph definitions passed to
    // `define_graph` refer to nodes by their index in this array; the leading hex
    // digit of each address equals that index.
    lazy_static::lazy_static! {
        static ref ADDRESSES: [Address; 6] = [
            Address::from_str("0x0000c178cf70d966be0a798e666ce2782c7b2288").unwrap(),
            Address::from_str("0x1000d5786d9e6799b3297da1ad55605b91e2c882").unwrap(),
            Address::from_str("0x200060ddced1e33c9647a71f4fc2cf4ed33e4a9d").unwrap(),
            Address::from_str("0x30004105095c8c10f804109b4d1199a9ac40ed46").unwrap(),
            Address::from_str("0x4000a288c38fa8a0f4b79127747257af4a03a623").unwrap(),
            Address::from_str("0x50002f462ec709cf181bbe44a7e952487bd4591d").unwrap(),
        ];
    }
344
345    fn create_channel(src: Address, dst: Address, status: ChannelStatus, stake: Balance) -> ChannelEntry {
346        ChannelEntry::new(src, dst, stake, U256::zero(), status, U256::zero())
347    }
348
349    fn check_path(path: &ChannelPath, graph: &ChannelGraph, dst: Address) -> anyhow::Result<()> {
350        let other = ChannelPath::new(path.hops().into(), graph)?;
351        assert_eq!(other, path.clone(), "valid paths must be equal");
352        assert!(!path.contains_cycle(), "path must not be cyclic");
353        assert!(!path.hops().contains(&dst), "path must not contain destination");
354
355        Ok(())
356    }
357
358    /// Quickly define a graph with edge weights (channel stakes).
359    /// Additional closures allow defining node qualities and edge scores.
360    ///
361    /// Syntax:
362    /// `0 [1] -> 1` => edge from `0` to `1` with edge weight `1`
363    /// `0 <- [1] 1` => edge from `1` to `0` with edge weight `1`
364    /// `0 [1] <-> [2] 1` => edge from `0` to `1` with edge weight `1` and edge from `1` to `0` with edge weight `2`
365    ///
366    /// ```rust
367    /// let star = define_graph(
368    ///     "0 [1] <-> [2] 1, 0 [1] <-> [3] 2, 0 [1] <-> [4] 3, 0 [1] <-> [5] 4",
369    ///     "0x1223d5786d9e6799b3297da1ad55605b91e2c882".parse().unwrap(),
370    ///     |_| 1_f64,
371    ///     None,
372    /// );
373    /// ```
374    fn define_graph<Q, S>(def: &str, me: Address, quality: Q, score: S) -> ChannelGraph
375    where
376        Q: Fn(Address) -> f64,
377        S: Fn(Address, Address) -> f64,
378    {
379        let mut graph = ChannelGraph::new(me, Default::default());
380
381        if def.is_empty() {
382            return graph;
383        }
384
385        let re: Regex = Regex::new(r"^\s*(\d+)\s*(\[\d+\])?\s*(<?->?)\s*(\[\d+\])?\s*(\d+)\s*$").unwrap();
386        let re_stake = Regex::new(r"^\[(\d+)\]$").unwrap();
387
388        let mut match_stake_and_update_channel = |src: Address, dest: Address, stake_str: &str| {
389            let stake_caps = re_stake.captures(stake_str).unwrap();
390
391            if stake_caps.get(0).is_none() {
392                panic!("no matching stake. got {}", stake_str);
393            }
394            graph.update_channel(create_channel(
395                src,
396                dest,
397                ChannelStatus::Open,
398                Balance::new(
399                    U256::from_str(stake_caps.get(1).unwrap().as_str())
400                        .expect("failed to create U256 from given stake"),
401                    BalanceType::HOPR,
402                ),
403            ));
404
405            graph.update_node_score(
406                &src,
407                NodeScoreUpdate::Initialize(Duration::from_millis(10), quality(src)),
408            );
409            graph.update_node_score(
410                &dest,
411                NodeScoreUpdate::Initialize(Duration::from_millis(10), quality(dest)),
412            );
413
414            graph.update_channel_score(&src, &dest, score(src, dest));
415        };
416
417        for edge in def.split(",") {
418            let caps = re.captures(edge).unwrap();
419
420            if caps.get(0).is_none() {
421                panic!("no matching edge. got `{edge}`");
422            }
423
424            let addr_a = ADDRESSES[usize::from_str(caps.get(1).unwrap().as_str()).unwrap()];
425            let addr_b = ADDRESSES[usize::from_str(caps.get(5).unwrap().as_str()).unwrap()];
426
427            let dir = caps.get(3).unwrap().as_str();
428
429            match dir {
430                "->" => {
431                    if let Some(stake_b) = caps.get(4) {
432                        panic!(
433                            "Cannot assign stake for counterparty because channel is unidirectional. Got `{}`",
434                            stake_b.as_str()
435                        );
436                    }
437
438                    let stake_opt_a = caps.get(2).ok_or("missing stake for initiator").unwrap();
439
440                    match_stake_and_update_channel(addr_a, addr_b, stake_opt_a.as_str());
441                }
442                "<-" => {
443                    if let Some(stake_a) = caps.get(2) {
444                        panic!(
445                            "Cannot assign stake for counterparty because channel is unidirectional. Got `{}`",
446                            stake_a.as_str()
447                        );
448                    }
449
450                    let stake_opt_b = caps.get(4).ok_or("missing stake for counterparty").unwrap();
451
452                    match_stake_and_update_channel(addr_b, addr_a, stake_opt_b.as_str());
453                }
454                "<->" => {
455                    let stake_opt_a = caps.get(2).ok_or("missing stake for initiator").unwrap();
456
457                    match_stake_and_update_channel(addr_a, addr_b, stake_opt_a.as_str());
458
459                    let stake_opt_b = caps.get(4).ok_or("missing stake for counterparty").unwrap();
460
461                    match_stake_and_update_channel(addr_b, addr_a, stake_opt_b.as_str());
462                }
463                _ => panic!("unknown direction infix"),
464            };
465        }
466
467        graph
468    }
469
470    #[derive(Default)]
471    pub struct TestWeights;
472    impl EdgeWeighting<U256> for TestWeights {
473        fn calculate_weight(edge: &ChannelEdge) -> U256 {
474            edge.channel.balance.amount() + 1u32
475        }
476    }
477
478    #[async_std::test]
479    async fn test_should_not_find_path_if_isolated() {
480        let graph = Arc::new(RwLock::new(define_graph("", ADDRESSES[0], |_| 1.0, |_, _| 0.0)));
481
482        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
483
484        selector
485            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 2)
486            .await
487            .expect_err("should not find a path");
488    }
489
490    #[async_std::test]
491    async fn test_should_not_find_zero_weight_path() {
492        let graph = Arc::new(RwLock::new(define_graph(
493            "0 [0] -> 1",
494            ADDRESSES[0],
495            |_| 1.0,
496            |_, _| 0.0,
497        )));
498
499        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
500
501        selector
502            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
503            .await
504            .expect_err("should not find a path");
505    }
506
507    #[async_std::test]
508    async fn test_should_not_find_one_hop_path_when_unrelated_channels_are_in_the_graph() {
509        let graph = Arc::new(RwLock::new(define_graph(
510            "1 [1] -> 2",
511            ADDRESSES[0],
512            |_| 1.0,
513            |_, _| 0.0,
514        )));
515
516        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
517
518        selector
519            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
520            .await
521            .expect_err("should not find a path");
522    }
523
524    #[async_std::test]
525    async fn test_should_not_find_one_hop_path_in_empty_graph() {
526        let graph = Arc::new(RwLock::new(define_graph("", ADDRESSES[0], |_| 1.0, |_, _| 0.0)));
527
528        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
529
530        selector
531            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
532            .await
533            .expect_err("should not find a path");
534    }
535
536    #[async_std::test]
537    async fn test_should_not_find_path_with_unreliable_node() {
538        let graph = Arc::new(RwLock::new(define_graph(
539            "0 [1] -> 1",
540            ADDRESSES[0],
541            |_| 0_f64,
542            |_, _| 0.0,
543        )));
544
545        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
546
547        selector
548            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
549            .await
550            .expect_err("should not find a path");
551    }
552
553    #[async_std::test]
554    async fn test_should_not_find_loopback_path() {
555        let graph = Arc::new(RwLock::new(define_graph(
556            "0 [1] <-> [1] 1",
557            ADDRESSES[0],
558            |_| 1_f64,
559            |_, _| 0.0,
560        )));
561
562        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
563
564        selector
565            .select_path(ADDRESSES[0], ADDRESSES[5], 2, 2)
566            .await
567            .expect_err("should not find a path");
568    }
569
570    #[async_std::test]
571    async fn test_should_not_include_destination_in_path() {
572        let graph = Arc::new(RwLock::new(define_graph(
573            "0 [1] -> 1",
574            ADDRESSES[0],
575            |_| 1_f64,
576            |_, _| 0.0,
577        )));
578
579        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
580
581        selector
582            .select_path(ADDRESSES[0], ADDRESSES[1], 1, 1)
583            .await
584            .expect_err("should not find a path");
585    }
586
587    #[async_std::test]
588    async fn test_should_find_path_in_reliable_star() -> anyhow::Result<()> {
589        let graph = Arc::new(RwLock::new(define_graph(
590            "0 [1] <-> [2] 1, 0 [1] <-> [3] 2, 0 [1] <-> [4] 3, 0 [1] <-> [5] 4",
591            ADDRESSES[1],
592            |_| 1_f64,
593            |_, _| 0.0,
594        )));
595
596        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
597        let path = selector.select_path(ADDRESSES[1], ADDRESSES[5], 1, 2).await?;
598
599        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
600        assert_eq!(2, path.length(), "should have 2 hops");
601
602        Ok(())
603    }
604
605    #[async_std::test]
606    async fn test_should_find_path_in_reliable_arrow_with_lower_weight() -> anyhow::Result<()> {
607        let graph = Arc::new(RwLock::new(define_graph(
608            "0 [1] -> 1, 1 [1] -> 2, 2 [1] -> 3, 1 [1] -> 3",
609            ADDRESSES[0],
610            |_| 1_f64,
611            |_, _| 0.0,
612        )));
613        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
614
615        let path = selector.select_path(ADDRESSES[0], ADDRESSES[5], 3, 3).await?;
616        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
617        assert_eq!(3, path.length(), "should have 3 hops");
618
619        Ok(())
620    }
621
622    #[async_std::test]
623    async fn test_should_find_path_in_reliable_arrow_with_higher_weight() -> anyhow::Result<()> {
624        let graph = Arc::new(RwLock::new(define_graph(
625            "0 [1] -> 1, 1 [2] -> 2, 2 [3] -> 3, 1 [2] -> 3",
626            ADDRESSES[0],
627            |_| 1_f64,
628            |_, _| 0.0,
629        )));
630        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
631
632        let path = selector.select_path(ADDRESSES[0], ADDRESSES[5], 3, 3).await?;
633        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
634        assert_eq!(3, path.length(), "should have 3 hops");
635
636        Ok(())
637    }
638
639    #[async_std::test]
640    async fn test_should_find_path_in_reliable_arrow_with_random_weight() -> anyhow::Result<()> {
641        let graph = Arc::new(RwLock::new(define_graph(
642            "0 [29] -> 1, 1 [5] -> 2, 2 [31] -> 3, 1 [2] -> 3",
643            ADDRESSES[0],
644            |_| 1_f64,
645            |_, _| 0.0,
646        )));
647        let selector = DfsPathSelector::<RandomizedEdgeWeighting>::new(graph.clone(), Default::default());
648
649        let path = selector.select_path(ADDRESSES[0], ADDRESSES[5], 3, 3).await?;
650        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
651        assert_eq!(3, path.length(), "should have 3 hops");
652
653        Ok(())
654    }
655}