1use async_trait::async_trait;
2use hopr_crypto_random::random_float;
3use hopr_internal_types::prelude::*;
4use hopr_primitive_types::prelude::*;
5use std::cmp::Ordering;
6use std::collections::BinaryHeap;
7use std::marker::PhantomData;
8use std::time::Duration;
9use tracing::trace;
10
11use crate::channel_graph::{ChannelEdge, ChannelGraph, Node};
12use crate::errors::{PathError, Result};
13use crate::path::ChannelPath;
14use crate::selectors::{EdgeWeighting, PathSelector};
15
16#[derive(Clone, Debug, PartialEq, Eq)]
18struct WeightedChannelPath {
19 path: Vec<Address>,
20 weight: U256,
21 fully_explored: bool,
22}
23
24impl WeightedChannelPath {
25 fn extend<CW: EdgeWeighting<U256>>(mut self, edge: &ChannelEdge) -> Self {
27 if !self.fully_explored {
28 self.path.push(edge.channel.destination);
29 self.weight += CW::calculate_weight(edge);
30 }
31 self
32 }
33}
34
35impl Default for WeightedChannelPath {
36 fn default() -> Self {
37 Self {
38 path: Vec::with_capacity(INTERMEDIATE_HOPS),
39 weight: U256::zero(),
40 fully_explored: false,
41 }
42 }
43}
44
45impl PartialOrd for WeightedChannelPath {
46 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47 Some(self.cmp(other))
48 }
49}
50
51impl Ord for WeightedChannelPath {
52 fn cmp(&self, other: &Self) -> Ordering {
61 if other.fully_explored == self.fully_explored {
62 match self.path.len().cmp(&other.path.len()) {
63 Ordering::Equal => self.weight.cmp(&other.weight),
64 o => o,
65 }
66 } else if other.fully_explored {
67 Ordering::Greater
68 } else {
69 Ordering::Less
70 }
71 }
72}
73
/// Edge weighting strategy that scales the channel balance by a random factor.
///
/// See the `EdgeWeighting<U256>` implementation for how the weight is computed.
#[derive(Debug, Default, Clone, Copy)]
pub struct RandomizedEdgeWeighting;
81
82impl EdgeWeighting<U256> for RandomizedEdgeWeighting {
83 fn calculate_weight(edge: &ChannelEdge) -> U256 {
90 edge.channel
91 .balance
92 .amount()
93 .mul_f64(random_float())
94 .expect("Could not multiply edge weight with float")
95 .max(1.into())
96 }
97}
98
99#[derive(Clone, Copy, Debug, PartialEq, smart_default::SmartDefault)]
100pub struct DfsPathSelectorConfig {
101 #[default(100)]
104 pub max_iterations: usize,
105 #[default(0.5)]
108 pub node_score_threshold: f64,
109 #[default(0.0)]
112 pub edge_score_threshold: f64,
113 #[default(Some(Duration::from_millis(100)))]
116 pub max_first_hop_latency: Option<Duration>,
117 #[default(false)]
122 pub allow_zero_edge_weight: bool,
123}
124
125#[derive(Clone, Debug)]
127pub struct DfsPathSelector<CW> {
128 graph: std::sync::Arc<async_lock::RwLock<ChannelGraph>>,
129 cfg: DfsPathSelectorConfig,
130 _cw: PhantomData<CW>,
131}
132
133impl<CW: EdgeWeighting<U256>> DfsPathSelector<CW> {
134 pub fn new(graph: std::sync::Arc<async_lock::RwLock<ChannelGraph>>, cfg: DfsPathSelectorConfig) -> Self {
137 Self {
138 graph,
139 cfg,
140 _cw: PhantomData,
141 }
142 }
143
144 #[tracing::instrument(level = "trace", skip(self))]
159 fn is_next_hop_usable(
160 &self,
161 next_hop: &Node,
162 edge: &ChannelEdge,
163 initial_source: &Address,
164 final_destination: &Address,
165 current_path: &[Address],
166 ) -> bool {
167 debug_assert_eq!(next_hop.address, edge.channel.destination);
168
169 if next_hop.address.eq(initial_source) {
171 trace!("source loopback not allowed");
172 return false;
173 }
174
175 if next_hop.address.eq(final_destination) {
178 trace!("destination loopback not allowed");
179 return false;
180 }
181
182 if next_hop.node_score < self.cfg.node_score_threshold {
184 trace!("node quality threshold not satisfied");
185 return false;
186 }
187
188 if edge
190 .edge_score
191 .is_some_and(|score| score < self.cfg.edge_score_threshold)
192 {
193 trace!("channel score threshold not satisfied");
194 return false;
195 }
196
197 if current_path.is_empty()
199 && self
200 .cfg
201 .max_first_hop_latency
202 .is_some_and(|limit| next_hop.latency.average().is_none_or(|avg_latency| avg_latency > limit))
203 {
204 trace!("first hop latency too high");
205 return false;
206 }
207
208 if current_path.contains(&next_hop.address) {
211 trace!("circles not allowed");
212 return false;
213 }
214
215 if !self.cfg.allow_zero_edge_weight && edge.channel.balance.is_zero() {
217 trace!(%next_hop, "zero stake channels not allowed");
218 return false;
219 }
220
221 trace!("usable node");
222 true
223 }
224}
225
226#[async_trait]
227impl<CW> PathSelector for DfsPathSelector<CW>
228where
229 CW: EdgeWeighting<U256> + Send + Sync,
230{
231 async fn select_path(
239 &self,
240 source: Address,
241 destination: Address,
242 min_hops: usize,
243 max_hops: usize,
244 ) -> Result<ChannelPath> {
245 if !(1..=INTERMEDIATE_HOPS).contains(&max_hops) || !(1..=max_hops).contains(&min_hops) {
248 return Err(GeneralError::InvalidInput.into());
249 }
250
251 let graph = self.graph.read().await;
252
253 let mut queue = graph
255 .open_channels_from(source)
256 .filter(|(node, edge)| self.is_next_hop_usable(node, edge, &source, &destination, &[]))
257 .map(|(_, edge)| WeightedChannelPath::default().extend::<CW>(edge))
258 .collect::<BinaryHeap<_>>();
259
260 trace!(last_peer = %source, queue_len = queue.len(), "got next possible steps");
261
262 let mut iters = 0;
263 while let Some(mut current) = queue.pop() {
264 let current_len = current.path.len();
265
266 trace!(
267 ?current,
268 ?queue,
269 queue_len = queue.len(),
270 iters,
271 min_hops,
272 max_hops,
273 "testing next path in queue"
274 );
275 if current_len == max_hops || current.fully_explored || iters > self.cfg.max_iterations {
276 return if current_len >= min_hops && current_len <= max_hops {
277 Ok(ChannelPath::new_valid(current.path))
278 } else {
279 trace!(current_len, min_hops, max_hops, iters, "path not found");
280 Err(PathError::PathNotFound(
281 max_hops,
282 source.to_string(),
283 destination.to_string(),
284 ))
285 };
286 }
287
288 let last_peer = *current.path.last().unwrap();
290 let mut new_channels = graph
291 .open_channels_from(last_peer)
292 .filter(|(next_hop, edge)| {
293 self.is_next_hop_usable(next_hop, edge, &source, &destination, ¤t.path)
294 })
295 .peekable();
296
297 if new_channels.peek().is_some() {
299 queue.extend(new_channels.map(|(_, edge)| current.clone().extend::<CW>(edge)));
300 trace!(%last_peer, queue_len = queue.len(), "got next possible steps");
301 } else {
302 current.fully_explored = true;
306 trace!(path = ?current, "fully explored");
307 queue.push(current);
308 }
309
310 iters += 1;
311 }
312
313 Err(PathError::PathNotFound(
314 max_hops,
315 source.to_string(),
316 destination.to_string(),
317 ))
318 }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    use crate::channel_graph::NodeScoreUpdate;
    use crate::path::Path;
    use async_lock::RwLock;
    use core::panic;
    use regex::Regex;
    use std::ops::Deref;
    use std::str::FromStr;
    use std::sync::Arc;

    lazy_static::lazy_static! {
        // Fixed set of node addresses; graph definitions refer to them by index.
        static ref ADDRESSES: [Address; 6] = [
            Address::from_str("0x0000c178cf70d966be0a798e666ce2782c7b2288").unwrap(),
            Address::from_str("0x1000d5786d9e6799b3297da1ad55605b91e2c882").unwrap(),
            Address::from_str("0x200060ddced1e33c9647a71f4fc2cf4ed33e4a9d").unwrap(),
            Address::from_str("0x30004105095c8c10f804109b4d1199a9ac40ed46").unwrap(),
            Address::from_str("0x4000a288c38fa8a0f4b79127747257af4a03a623").unwrap(),
            Address::from_str("0x50002f462ec709cf181bbe44a7e952487bd4591d").unwrap(),
        ];
    }

    /// Creates a channel entry from `src` to `dst` with the given `status` and `stake`;
    /// all remaining numeric fields are zero.
    fn create_channel(src: Address, dst: Address, status: ChannelStatus, stake: Balance) -> ChannelEntry {
        ChannelEntry::new(src, dst, stake, U256::zero(), status, U256::zero())
    }

    /// Checks that `path` can be re-created from its hops against `graph`, contains no cycle
    /// and does not contain the destination `dst`.
    fn check_path(path: &ChannelPath, graph: &ChannelGraph, dst: Address) -> anyhow::Result<()> {
        let other = ChannelPath::new(path.hops().into(), graph)?;
        assert_eq!(other, path.clone(), "valid paths must be equal");
        assert!(!path.contains_cycle(), "path must not be cyclic");
        assert!(!path.hops().contains(&dst), "path must not contain destination");

        Ok(())
    }

    /// Builds a channel graph owned by `me` from a textual definition.
    ///
    /// `def` is a comma-separated list of edges. Each edge has the form
    /// `<a> [<stake_a>] <dir> [<stake_b>] <b>`, where `<a>` and `<b>` are indices into
    /// `ADDRESSES` and `<dir>` is one of:
    ///
    /// - `->`: channel from `a` to `b`; `[<stake_a>]` is required, `[<stake_b>]` is forbidden,
    /// - `<-`: channel from `b` to `a`; `[<stake_b>]` is required, `[<stake_a>]` is forbidden,
    /// - `<->`: channels in both directions; both stakes are required.
    ///
    /// Example: `0 [1] <-> [2] 1, 1 [3] -> 2`.
    ///
    /// For every channel, both endpoints get their node score initialized with the value returned
    /// by `quality` (and a latency of 10 ms), and the channel gets the score returned by `score`.
    /// An empty `def` yields a graph without any channels.
    ///
    /// # Panics
    ///
    /// Panics on any malformed edge definition.
    fn define_graph<Q, S>(def: &str, me: Address, quality: Q, score: S) -> ChannelGraph
    where
        Q: Fn(Address) -> f64,
        S: Fn(Address, Address) -> f64,
    {
        let mut graph = ChannelGraph::new(me, Default::default());

        if def.is_empty() {
            return graph;
        }

        let re: Regex = Regex::new(r"^\s*(\d+)\s*(\[\d+\])?\s*(<?->?)\s*(\[\d+\])?\s*(\d+)\s*$").unwrap();
        let re_stake = Regex::new(r"^\[(\d+)\]$").unwrap();

        let mut match_stake_and_update_channel = |src: Address, dest: Address, stake_str: &str| {
            // `captures` returns `None` when the input does not match, so the diagnostic has to be
            // attached here; group 0 is always present once there is a match.
            let stake_caps = re_stake
                .captures(stake_str)
                .unwrap_or_else(|| panic!("no matching stake. got {stake_str}"));

            graph.update_channel(create_channel(
                src,
                dest,
                ChannelStatus::Open,
                Balance::new(
                    U256::from_str(stake_caps.get(1).unwrap().as_str())
                        .expect("failed to create U256 from given stake"),
                    BalanceType::HOPR,
                ),
            ));

            graph.update_node_score(
                &src,
                NodeScoreUpdate::Initialize(Duration::from_millis(10), quality(src)),
            );
            graph.update_node_score(
                &dest,
                NodeScoreUpdate::Initialize(Duration::from_millis(10), quality(dest)),
            );

            graph.update_channel_score(&src, &dest, score(src, dest));
        };

        for edge in def.split(',') {
            let caps = re
                .captures(edge)
                .unwrap_or_else(|| panic!("no matching edge. got `{edge}`"));

            let addr_a = ADDRESSES[usize::from_str(caps.get(1).unwrap().as_str()).unwrap()];
            let addr_b = ADDRESSES[usize::from_str(caps.get(5).unwrap().as_str()).unwrap()];

            let dir = caps.get(3).unwrap().as_str();

            match dir {
                "->" => {
                    if let Some(stake_b) = caps.get(4) {
                        panic!(
                            "Cannot assign stake for counterparty because channel is unidirectional. Got `{}`",
                            stake_b.as_str()
                        );
                    }

                    let stake_opt_a = caps.get(2).expect("missing stake for initiator");

                    match_stake_and_update_channel(addr_a, addr_b, stake_opt_a.as_str());
                }
                "<-" => {
                    if let Some(stake_a) = caps.get(2) {
                        panic!(
                            "Cannot assign stake for counterparty because channel is unidirectional. Got `{}`",
                            stake_a.as_str()
                        );
                    }

                    let stake_opt_b = caps.get(4).expect("missing stake for counterparty");

                    match_stake_and_update_channel(addr_b, addr_a, stake_opt_b.as_str());
                }
                "<->" => {
                    let stake_opt_a = caps.get(2).expect("missing stake for initiator");

                    match_stake_and_update_channel(addr_a, addr_b, stake_opt_a.as_str());

                    let stake_opt_b = caps.get(4).expect("missing stake for counterparty");

                    match_stake_and_update_channel(addr_b, addr_a, stake_opt_b.as_str());
                }
                // The regex also admits a bare `-`, which is not a valid direction.
                _ => panic!("unknown direction infix"),
            };
        }

        graph
    }

    /// Deterministic edge weighting for tests: channel balance plus one.
    #[derive(Default)]
    pub struct TestWeights;
    impl EdgeWeighting<U256> for TestWeights {
        fn calculate_weight(edge: &ChannelEdge) -> U256 {
            edge.channel.balance.amount() + 1u32
        }
    }

    /// A graph without any channels cannot provide a path.
    #[async_std::test]
    async fn test_should_not_find_path_if_isolated() {
        let graph = Arc::new(RwLock::new(define_graph("", ADDRESSES[0], |_| 1.0, |_, _| 0.0)));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        selector
            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 2)
            .await
            .expect_err("should not find a path");
    }

    /// With the default configuration, a channel with zero balance must not be used.
    #[async_std::test]
    async fn test_should_not_find_zero_weight_path() {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [0] -> 1",
            ADDRESSES[0],
            |_| 1.0,
            |_, _| 0.0,
        )));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        selector
            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
            .await
            .expect_err("should not find a path");
    }

    /// Channels that do not originate at the source do not help finding a path.
    #[async_std::test]
    async fn test_should_not_find_one_hop_path_when_unrelated_channels_are_in_the_graph() {
        let graph = Arc::new(RwLock::new(define_graph(
            "1 [1] -> 2",
            ADDRESSES[0],
            |_| 1.0,
            |_, _| 0.0,
        )));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        selector
            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
            .await
            .expect_err("should not find a path");
    }

    /// A single-hop path cannot be found in a graph without channels.
    #[async_std::test]
    async fn test_should_not_find_one_hop_path_in_empty_graph() {
        let graph = Arc::new(RwLock::new(define_graph("", ADDRESSES[0], |_| 1.0, |_, _| 0.0)));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        selector
            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
            .await
            .expect_err("should not find a path");
    }

    /// A node whose score is below the default node score threshold must not be used as a hop.
    #[async_std::test]
    async fn test_should_not_find_path_with_unreliable_node() {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [1] -> 1",
            ADDRESSES[0],
            |_| 0_f64,
            |_, _| 0.0,
        )));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        selector
            .select_path(ADDRESSES[0], ADDRESSES[5], 1, 1)
            .await
            .expect_err("should not find a path");
    }

    /// The only way to get a second hop here would lead back to the source, which is not allowed.
    #[async_std::test]
    async fn test_should_not_find_loopback_path() {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [1] <-> [1] 1",
            ADDRESSES[0],
            |_| 1_f64,
            |_, _| 0.0,
        )));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        selector
            .select_path(ADDRESSES[0], ADDRESSES[5], 2, 2)
            .await
            .expect_err("should not find a path");
    }

    /// The destination itself must never be selected as an intermediate hop.
    #[async_std::test]
    async fn test_should_not_include_destination_in_path() {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [1] -> 1",
            ADDRESSES[0],
            |_| 1_f64,
            |_, _| 0.0,
        )));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        selector
            .select_path(ADDRESSES[0], ADDRESSES[1], 1, 1)
            .await
            .expect_err("should not find a path");
    }

    /// In a star around node 0, a 2-hop path starting at node 1 exists.
    #[async_std::test]
    async fn test_should_find_path_in_reliable_star() -> anyhow::Result<()> {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [1] <-> [2] 1, 0 [1] <-> [3] 2, 0 [1] <-> [4] 3, 0 [1] <-> [5] 4",
            ADDRESSES[1],
            |_| 1_f64,
            |_, _| 0.0,
        )));

        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());
        let path = selector.select_path(ADDRESSES[1], ADDRESSES[5], 1, 2).await?;

        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
        assert_eq!(2, path.length(), "should have 2 hops");

        Ok(())
    }

    /// A 3-hop path is found when all channels carry the same stake.
    #[async_std::test]
    async fn test_should_find_path_in_reliable_arrow_with_lower_weight() -> anyhow::Result<()> {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [1] -> 1, 1 [1] -> 2, 2 [1] -> 3, 1 [1] -> 3",
            ADDRESSES[0],
            |_| 1_f64,
            |_, _| 0.0,
        )));
        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        let path = selector.select_path(ADDRESSES[0], ADDRESSES[5], 3, 3).await?;
        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
        assert_eq!(3, path.length(), "should have 3 hops");

        Ok(())
    }

    /// A 3-hop path is found when the channels carry different stakes.
    #[async_std::test]
    async fn test_should_find_path_in_reliable_arrow_with_higher_weight() -> anyhow::Result<()> {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [1] -> 1, 1 [2] -> 2, 2 [3] -> 3, 1 [2] -> 3",
            ADDRESSES[0],
            |_| 1_f64,
            |_, _| 0.0,
        )));
        let selector = DfsPathSelector::<TestWeights>::new(graph.clone(), Default::default());

        let path = selector.select_path(ADDRESSES[0], ADDRESSES[5], 3, 3).await?;
        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
        assert_eq!(3, path.length(), "should have 3 hops");

        Ok(())
    }

    /// A 3-hop path is found when edges are weighted by `RandomizedEdgeWeighting`.
    #[async_std::test]
    async fn test_should_find_path_in_reliable_arrow_with_random_weight() -> anyhow::Result<()> {
        let graph = Arc::new(RwLock::new(define_graph(
            "0 [29] -> 1, 1 [5] -> 2, 2 [31] -> 3, 1 [2] -> 3",
            ADDRESSES[0],
            |_| 1_f64,
            |_, _| 0.0,
        )));
        let selector = DfsPathSelector::<RandomizedEdgeWeighting>::new(graph.clone(), Default::default());

        let path = selector.select_path(ADDRESSES[0], ADDRESSES[5], 3, 3).await?;
        check_path(&path, graph.read().await.deref(), ADDRESSES[5])?;
        assert_eq!(3, path.length(), "should have 3 hops");

        Ok(())
    }
}