1use hopr_internal_types::prelude::*;
2use hopr_primitive_types::primitives::Address;
3use petgraph::algo::has_path_connecting;
4use petgraph::dot::Dot;
5use petgraph::prelude::StableDiGraph;
6use petgraph::stable_graph::NodeIndex;
7use petgraph::visit::{EdgeFiltered, EdgeRef, NodeFiltered};
8use petgraph::Direction;
9use serde::{Deserialize, Serialize};
10use serde_with::serde_as;
11use std::collections::hash_map::Entry;
12use std::collections::HashMap;
13use std::fmt::{Debug, Formatter};
14use std::time::Duration;
15use tracing::{debug, warn};
16
17use hopr_primitive_types::prelude::SMA;
18use hopr_primitive_types::sma::SingleSumSMA;
19#[cfg(all(feature = "prometheus", not(test)))]
20use {
21 hopr_internal_types::channels::ChannelDirection, hopr_metrics::metrics::MultiGauge,
22 hopr_primitive_types::traits::ToHex,
23};
24
25#[cfg(all(feature = "prometheus", not(test)))]
26lazy_static::lazy_static! {
27 static ref METRIC_NUMBER_OF_CHANNELS: MultiGauge = MultiGauge::new(
28 "hopr_channels_count",
29 "Number of channels per direction",
30 &["direction"]
31 ).unwrap();
32 static ref METRIC_CHANNEL_BALANCES: MultiGauge = MultiGauge::new(
33 "hopr_channel_balances",
34 "Balances on channels per counterparty",
35 &["counterparty", "direction"]
36 ).unwrap();
37}
38
39#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
42pub struct ChannelEdge {
43 pub channel: ChannelEntry,
45 pub edge_score: Option<f64>,
47}
48
49impl std::fmt::Display for ChannelEdge {
50 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51 write!(
52 f,
53 "{}; stake={}; score={:?}; status={};",
54 self.channel,
55 self.channel.balance,
56 self.edge_score,
57 self.channel.status.to_string().to_lowercase()
58 )
59 }
60}
61
62#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
66pub struct Node {
67 pub address: Address,
69 pub node_score: f64,
71 pub latency: SingleSumSMA<std::time::Duration, u32>,
73}
74
75impl Node {
76 pub fn new(address: Address, latency_window_length: usize) -> Self {
77 Self {
78 address,
79 node_score: 0.0,
80 latency: SingleSumSMA::new(latency_window_length),
81 }
82 }
83
84 pub fn update_score(&mut self, score_update: NodeScoreUpdate, cfg: ChannelGraphConfig) -> f64 {
91 match score_update {
92 NodeScoreUpdate::Reachable(latency) => {
93 self.node_score = 1.0_f64.min(self.node_score + cfg.node_score_step_up);
94 self.latency.push(latency);
95 }
96 NodeScoreUpdate::Unreachable => {
97 self.node_score /= cfg.node_score_decay;
98 self.latency.clear();
99 if self.node_score < cfg.offline_node_score_threshold {
100 self.node_score = 0.0;
101 }
102 }
103 NodeScoreUpdate::Initialize(latency, node_score) => {
104 self.latency.clear();
105 self.latency.push(latency);
106 self.node_score = node_score.clamp(0.0, 1.0);
107 }
108 }
109 self.node_score
110 }
111}
112
113impl std::fmt::Display for Node {
114 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115 write!(f, "{}; score={}", self.address, self.node_score)
116 }
117}
118
119#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, smart_default::SmartDefault)]
121pub struct ChannelGraphConfig {
122 #[default(20)]
124 pub latency_sma_window_length: usize,
125 #[default(0.1)]
127 pub node_score_step_up: f64,
128 #[default(4.0)]
130 pub node_score_decay: f64,
131 #[default(0.1)]
135 pub offline_node_score_threshold: f64,
136}
137
138#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
140pub enum NodeScoreUpdate {
141 Reachable(Duration),
143 Unreachable,
145 Initialize(Duration, f64),
148}
149
150impl<T> From<Result<Duration, T>> for NodeScoreUpdate {
151 fn from(result: Result<Duration, T>) -> Self {
152 match result {
153 Ok(duration) => NodeScoreUpdate::Reachable(duration),
154 Err(_) => NodeScoreUpdate::Unreachable,
155 }
156 }
157}
158
159#[serde_as]
176#[derive(Clone, Debug, Serialize, Deserialize)]
177pub struct ChannelGraph {
178 me: Address,
179 #[serde_as(as = "Vec<(_, _)>")]
180 indices: HashMap<Address, u32>,
181 graph: StableDiGraph<Node, ChannelEdge>,
182 cfg: ChannelGraphConfig,
183}
184
185impl ChannelGraph {
186 pub const INTERMEDIATE_HOPS: usize = 3;
188
189 pub fn new(me: Address, cfg: ChannelGraphConfig) -> Self {
191 #[cfg(all(feature = "prometheus", not(test)))]
192 {
193 lazy_static::initialize(&METRIC_NUMBER_OF_CHANNELS);
194 }
195
196 let mut ret = Self {
197 me,
198 cfg,
199 indices: HashMap::new(),
200 graph: StableDiGraph::default(),
201 };
202 ret.indices.insert(
203 me,
204 ret.graph
205 .add_node(Node {
206 address: me,
207 node_score: 1.0,
208 latency: SingleSumSMA::new_with_samples(
209 cfg.latency_sma_window_length,
210 vec![Duration::ZERO; cfg.latency_sma_window_length],
211 ),
212 })
213 .index() as u32,
214 );
215 ret
216 }
217
218 pub fn count_channels(&self) -> usize {
220 self.graph.edge_count()
221 }
222
223 pub fn count_nodes(&self) -> usize {
225 self.graph.node_count()
226 }
227
228 pub fn is_own_channel(&self, channel: &ChannelEntry) -> bool {
230 channel.destination == self.me || channel.source == self.me
231 }
232
233 pub fn my_address(&self) -> Address {
235 self.me
236 }
237
238 fn get_edge(&self, src: &Address, dst: &Address) -> Option<petgraph::stable_graph::EdgeReference<ChannelEdge>> {
239 let (src_idx, dst_idx) = self
240 .indices
241 .get(src)
242 .and_then(|src| self.indices.get(dst).map(|dst| (*src, *dst)))?;
243 self.graph.edges_connecting(src_idx.into(), dst_idx.into()).next()
244 }
245
246 pub fn get_channel(&self, source: &Address, destination: &Address) -> Option<&ChannelEntry> {
249 self.get_edge(source, destination).map(|e| &e.weight().channel)
250 }
251
252 pub fn get_node(&self, node: &Address) -> Option<&Node> {
255 self.indices
256 .get(node)
257 .and_then(|index| self.graph.node_weight((*index).into()))
258 }
259
260 pub fn open_channels_from(&self, source: Address) -> impl Iterator<Item = (&Node, &ChannelEdge)> {
262 let idx = self
264 .indices
265 .get(&source)
266 .cloned()
267 .unwrap_or(self.graph.node_count() as u32);
268 self.graph
269 .edges_directed(idx.into(), Direction::Outgoing)
270 .filter(|c| c.weight().channel.status == ChannelStatus::Open)
271 .map(|e| (&self.graph[e.target()], e.weight()))
272 }
273
274 pub fn has_path(&self, source: &Address, destination: &Address) -> bool {
277 let only_open_graph = EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
278 if let Some((src_idx, dst_idx)) = self
279 .indices
280 .get(source)
281 .and_then(|src| self.indices.get(destination).map(|dst| (*src, *dst)))
282 {
283 has_path_connecting(&only_open_graph, src_idx.into(), dst_idx.into(), None)
284 } else {
285 false
286 }
287 }
288
289 pub fn update_channel(&mut self, channel: ChannelEntry) -> Option<Vec<ChannelChange>> {
293 #[cfg(all(feature = "prometheus", not(test)))]
294 {
295 if let Some(direction) = channel.direction(&self.me) {
296 match direction {
297 ChannelDirection::Outgoing => match channel.status {
298 ChannelStatus::Closed => {
299 METRIC_NUMBER_OF_CHANNELS.decrement(&["out"], 1.0);
300 METRIC_CHANNEL_BALANCES.set(&[channel.destination.to_hex().as_str(), "out"], 0.0);
301 }
302 ChannelStatus::Open => {
303 METRIC_NUMBER_OF_CHANNELS.increment(&["out"], 1.0);
304 METRIC_CHANNEL_BALANCES.set(
305 &[channel.destination.to_hex().as_str(), "out"],
306 channel
307 .balance
308 .amount_base_units()
309 .parse::<f64>()
310 .unwrap_or(f64::INFINITY),
311 );
312 }
313 ChannelStatus::PendingToClose(_) => {}
314 },
315 ChannelDirection::Incoming => match channel.status {
316 ChannelStatus::Closed => {
317 METRIC_NUMBER_OF_CHANNELS.decrement(&["in"], 1.0);
318 METRIC_CHANNEL_BALANCES.set(&[channel.source.to_hex().as_str(), "in"], 0.0);
319 }
320 ChannelStatus::Open => {
321 METRIC_NUMBER_OF_CHANNELS.increment(&["in"], 1.0);
322 METRIC_CHANNEL_BALANCES.set(
323 &[channel.source.to_hex().as_str(), "in"],
324 channel
325 .balance
326 .amount_base_units()
327 .parse::<f64>()
328 .unwrap_or(f64::INFINITY),
329 );
330 }
331 ChannelStatus::PendingToClose(_) => {}
332 },
333 }
334 }
335 }
336
337 let maybe_edge_id = self.get_edge(&channel.source, &channel.destination).map(|e| e.id());
338
339 if channel.status == ChannelStatus::Closed {
341 return maybe_edge_id
342 .and_then(|id| self.graph.remove_edge(id))
343 .inspect(|c| debug!("removed {}", c.channel))
344 .map(|old_value| ChannelChange::diff_channels(&old_value.channel, &channel));
345 }
346
347 if let Some(old_value) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
349 let old_channel = old_value.channel;
350 old_value.channel = channel;
351
352 let ret = ChannelChange::diff_channels(&old_channel, &channel);
353 debug!(
354 "updated {channel}: {}",
355 ret.iter().map(ChannelChange::to_string).collect::<Vec<_>>().join(",")
356 );
357 Some(ret)
358 } else {
359 let src = *self.indices.entry(channel.source).or_insert_with(|| {
361 self.graph
362 .add_node(Node::new(channel.source, self.cfg.latency_sma_window_length))
363 .index() as u32
364 });
365
366 let dst = *self.indices.entry(channel.destination).or_insert_with(|| {
367 self.graph
368 .add_node(Node::new(channel.destination, self.cfg.latency_sma_window_length))
369 .index() as u32
370 });
371
372 let weighted = ChannelEdge {
373 channel,
374 edge_score: None,
375 };
376
377 self.graph.add_edge(src.into(), dst.into(), weighted);
378 debug!("new {channel}");
379
380 None
381 }
382 }
383
384 pub fn update_node_score(&mut self, address: &Address, score_update: NodeScoreUpdate) {
387 if !self.me.eq(address) {
388 match self.indices.entry(*address) {
389 Entry::Occupied(existing) => {
391 let existing_idx: NodeIndex = (*existing.get()).into();
392 if score_update != NodeScoreUpdate::Unreachable
396 || self.graph.neighbors_undirected(existing_idx).count() > 0
397 {
398 if let Some(node) = self.graph.node_weight_mut(existing_idx) {
400 let updated_quality = node.update_score(score_update, self.cfg);
401 debug!(%address, updated_quality, "updated node quality");
402 } else {
403 warn!(%address, "removed dangling node index from channel graph");
405 existing.remove();
406 }
407 } else {
408 if self
411 .graph
412 .node_weight_mut(existing_idx)
413 .map(|node| node.update_score(score_update, self.cfg))
414 .is_some_and(|updated_quality| updated_quality < self.cfg.offline_node_score_threshold)
415 {
416 self.graph.remove_node(existing.remove().into());
417 debug!(%address, "removed offline node with no channels");
418 }
419 }
420 }
421 Entry::Vacant(new_node) if score_update != NodeScoreUpdate::Unreachable => {
423 let mut inserted_node = Node::new(*address, self.cfg.latency_sma_window_length);
424 let updated_quality = inserted_node.update_score(score_update, self.cfg);
425 new_node.insert(self.graph.add_node(inserted_node).index() as u32);
426 debug!(%address, updated_quality, "added new node");
427 }
428 Entry::Vacant(_) => {}
430 }
431 }
432 }
433
434 pub fn update_channel_score(&mut self, source: &Address, destination: &Address, score: f64) {
437 assert!(score >= 0_f64, "score must be non-negative");
438 let maybe_edge_id = self.get_edge(source, destination).map(|e| e.id());
439 if let Some(channel) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
440 if score != channel.edge_score.unwrap_or(-1_f64) {
441 channel.edge_score = Some(score);
442 debug!("updated score of {} to {score}", channel.channel);
443 }
444 }
445 }
446
447 pub fn get_channel_score(&self, source: &Address, destination: &Address) -> Option<f64> {
450 self.get_edge(source, destination)
451 .and_then(|e| self.graph.edge_weight(e.id()))
452 .and_then(|e| e.edge_score)
453 }
454
455 pub fn contains_channel(&self, channel: &ChannelEntry) -> bool {
457 self.get_channel(&channel.source, &channel.destination).is_some()
458 }
459
460 pub fn contains_node(&self, address: &Address) -> bool {
462 self.get_node(address).is_some()
463 }
464
465 pub fn as_dot(&self, cfg: GraphExportConfig) -> String {
467 if cfg.ignore_disconnected_components {
468 let only_open_graph =
469 EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
470
471 let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
472
473 Dot::new(&NodeFiltered::from_fn(&self.graph, |n| {
474 self.graph.node_weight(n).is_some_and(|n| n.node_score > 0_f64)
477 && has_path_connecting(&only_open_graph, me_idx, n, None)
478 }))
479 .to_string()
480 } else if cfg.ignore_non_opened_channels {
481 Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
483 self.graph
484 .edges_directed(a, Direction::Outgoing)
485 .any(|e| e.weight().channel.status == ChannelStatus::Open)
486 || self
487 .graph
488 .edges_directed(a, Direction::Incoming)
489 .any(|e| e.weight().channel.status == ChannelStatus::Open)
490 }))
491 .to_string()
492 } else if cfg.only_3_hop_accessible_nodes {
493 let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
495 let distances = petgraph::algo::dijkstra(&self.graph, me_idx, None, |e| {
496 if e.weight().channel.status == ChannelStatus::Open {
497 1
498 } else {
499 100
500 }
501 });
502
503 Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
504 distances.get(&a).map(|d| *d <= 3).unwrap_or(false)
505 }))
506 .to_string()
507 } else {
508 Dot::new(&self.graph).to_string()
509 }
510 }
511}
512
513#[derive(Clone, Debug, Default, Eq, PartialEq)]
517pub struct GraphExportConfig {
518 pub ignore_disconnected_components: bool,
522 pub ignore_non_opened_channels: bool,
524 pub only_3_hop_accessible_nodes: bool,
526}
527
528#[cfg(test)]
529mod tests {
530 use super::*;
531
532 use crate::channel_graph::ChannelGraph;
533 use anyhow::{anyhow, Context};
534 use hopr_internal_types::channels::{ChannelChange, ChannelEntry, ChannelStatus};
535 use hopr_primitive_types::prelude::*;
536 use lazy_static::lazy_static;
537 use std::ops::Add;
538 use std::str::FromStr;
539 use std::time::{Duration, SystemTime};
540
541 lazy_static! {
542 static ref ADDRESSES: [Address; 6] = [
543 Address::from_str("0xafe8c178cf70d966be0a798e666ce2782c7b2288")
544 .expect("lazy static address should be valid"),
545 Address::from_str("0x1223d5786d9e6799b3297da1ad55605b91e2c882")
546 .expect("lazy static address should be valid"),
547 Address::from_str("0x0e3e60ddced1e33c9647a71f4fc2cf4ed33e4a9d")
548 .expect("lazy static address should be valid"),
549 Address::from_str("0x27644105095c8c10f804109b4d1199a9ac40ed46")
550 .expect("lazy static address should be valid"),
551 Address::from_str("0x4701a288c38fa8a0f4b79127747257af4a03a623")
552 .expect("lazy static address should be valid"),
553 Address::from_str("0xfddd2f462ec709cf181bbe44a7e952487bd4591d")
554 .expect("lazy static address should be valid"),
555 ];
556 }
557
558 fn dummy_channel(src: Address, dst: Address, status: ChannelStatus) -> ChannelEntry {
559 ChannelEntry::new(
560 src,
561 dst,
562 Balance::new_from_str("1", BalanceType::HOPR),
563 1u32.into(),
564 status,
565 1u32.into(),
566 )
567 }
568
569 #[test]
570 fn channel_graph_self_addr() {
571 let cg = ChannelGraph::new(ADDRESSES[0], Default::default());
572 assert_eq!(ADDRESSES[0], cg.my_address(), "must produce correct self address");
573
574 assert!(cg.contains_node(&ADDRESSES[0]), "must contain self address");
575
576 assert_eq!(
577 cg.get_node(&ADDRESSES[0]).cloned(),
578 Some(Node {
579 address: ADDRESSES[0],
580 node_score: 1.0,
581 latency: SingleSumSMA::new_with_samples(
582 cg.cfg.latency_sma_window_length,
583 vec![Duration::ZERO; cg.cfg.latency_sma_window_length]
584 )
585 }),
586 "must contain self node with quality 1"
587 );
588 }
589
590 #[test]
591 fn channel_graph_has_path() {
592 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
593
594 let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
595 cg.update_channel(c);
596
597 assert!(cg.contains_channel(&c), "must contain channel");
598
599 assert!(cg.contains_node(&ADDRESSES[0]), "must contain channel source");
600
601 assert!(cg.contains_node(&ADDRESSES[1]), "must contain channel destination");
602
603 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]), "must have simple path");
604
605 assert!(
606 !cg.has_path(&ADDRESSES[0], &ADDRESSES[2]),
607 "must not have non existent path"
608 );
609 }
610
611 #[test]
612 fn channel_graph_update_node_quality() -> anyhow::Result<()> {
613 let mut cg = ChannelGraph::new(
614 ADDRESSES[0],
615 ChannelGraphConfig {
616 node_score_step_up: 0.1,
617 node_score_decay: 4.0,
618 offline_node_score_threshold: 0.1,
619 ..Default::default()
620 },
621 );
622
623 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(100)));
624 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
625 assert_eq!(node.node_score, 0.1);
626 assert_eq!(node.latency.average(), Some(Duration::from_millis(100)));
627
628 assert!(!cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
629
630 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
631 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
632 assert_eq!(node.node_score, 0.2);
633 assert_eq!(node.latency.average(), Some(Duration::from_millis(75)));
634
635 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(30)));
636 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(20)));
637
638 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
639 assert_eq!(node.node_score, 0.4);
640 assert_eq!(node.latency.average(), Some(Duration::from_millis(50)));
641
642 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
643 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
644 assert_eq!(node.node_score, 0.1);
645 assert!(node.latency.average().is_none());
646
647 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
648
649 assert_eq!(cg.get_node(&ADDRESSES[1]), None);
651
652 Ok(())
653 }
654
655 #[test]
656 fn channel_graph_update_node_quality_should_not_remove_nodes_with_zero_quality_and_path() -> anyhow::Result<()> {
657 let mut cg = ChannelGraph::new(
658 ADDRESSES[0],
659 ChannelGraphConfig {
660 node_score_step_up: 0.1,
661 node_score_decay: 4.0,
662 offline_node_score_threshold: 0.1,
663 ..Default::default()
664 },
665 );
666
667 let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
668 cg.update_channel(c);
669
670 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
671 assert_eq!(node.address, ADDRESSES[1]);
672 assert_eq!(node.node_score, 0.0);
673 assert!(node.latency.is_empty());
674
675 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
676
677 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
678 assert_eq!(node.address, ADDRESSES[1]);
679 assert_eq!(node.node_score, 0.1);
680
681 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
682 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
683 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
684
685 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
686 assert_eq!(node.address, ADDRESSES[1]);
687 assert_eq!(node.node_score, 0.4);
688
689 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
690
691 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
692
693 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
694 assert_eq!(node.address, ADDRESSES[1]);
695 assert_eq!(node.node_score, 0.1);
696
697 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
698
699 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
700
701 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
702 assert_eq!(node.address, ADDRESSES[1]);
703 assert_eq!(node.node_score, 0.0);
704
705 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
706
707 Ok(())
708 }
709
710 #[test]
711 fn channel_graph_update_channel_score() -> anyhow::Result<()> {
712 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
713
714 let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
715 cg.update_channel(c);
716
717 assert!(cg.contains_channel(&c), "must contain channel");
718 assert!(
719 cg.get_channel_score(&ADDRESSES[0], &ADDRESSES[1]).is_none(),
720 "must start with no quality info"
721 );
722
723 cg.update_channel_score(&ADDRESSES[0], &ADDRESSES[1], 0.5_f64);
724
725 let q = cg
726 .get_channel_score(&ADDRESSES[0], &ADDRESSES[1])
727 .context("must have quality when set")?;
728 assert_eq!(0.5_f64, q, "quality must be equal");
729
730 Ok(())
731 }
732
733 #[test]
734 fn channel_graph_is_own_channel() {
735 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
736
737 let c1 = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
738 let c2 = dummy_channel(ADDRESSES[1], ADDRESSES[2], ChannelStatus::Open);
739 cg.update_channel(c1);
740 cg.update_channel(c2);
741
742 assert!(cg.is_own_channel(&c1), "must detect as own channel");
743 assert!(!cg.is_own_channel(&c2), "must not detect as own channel");
744 }
745
746 #[test]
747 fn channel_graph_update_changes() -> anyhow::Result<()> {
748 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
749
750 let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
751
752 let changes = cg.update_channel(c);
753 assert!(changes.is_none(), "must not produce changes for a new channel");
754
755 let cr = cg
756 .get_channel(&ADDRESSES[0], &ADDRESSES[1])
757 .context("must contain channel")?;
758 assert!(c.eq(cr), "channels must be equal");
759
760 let ts = SystemTime::now().add(Duration::from_secs(10));
761 c.balance = Balance::zero(BalanceType::HOPR);
762 c.status = ChannelStatus::PendingToClose(ts);
763 let changes = cg.update_channel(c).context("should contain channel changes")?;
764 assert_eq!(2, changes.len(), "must contain 2 changes");
765
766 for change in changes {
767 match change {
768 ChannelChange::Status { left, right } => {
769 assert_eq!(ChannelStatus::Open, left, "previous status does not match");
770 assert_eq!(ChannelStatus::PendingToClose(ts), right, "new status does not match");
771 }
772 ChannelChange::CurrentBalance { left, right } => {
773 assert_eq!(
774 Balance::new(1_u32, BalanceType::HOPR),
775 left,
776 "previous balance does not match"
777 );
778 assert_eq!(Balance::zero(BalanceType::HOPR), right, "new balance does not match");
779 }
780 _ => panic!("unexpected change"),
781 }
782 }
783
784 let cr = cg
785 .get_channel(&ADDRESSES[0], &ADDRESSES[1])
786 .context("must contain channel")?;
787 assert!(c.eq(cr), "channels must be equal");
788
789 Ok(())
790 }
791
792 #[test]
793 fn channel_graph_update_changes_on_close() -> anyhow::Result<()> {
794 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
795
796 let ts = SystemTime::now().add(Duration::from_secs(10));
797 let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::PendingToClose(ts));
798
799 let changes = cg.update_channel(c);
800 assert!(changes.is_none(), "must not produce changes for a new channel");
801
802 let cr = cg
803 .get_channel(&ADDRESSES[0], &ADDRESSES[1])
804 .context("must contain channel")?;
805 assert!(c.eq(cr), "channels must be equal");
806
807 c.balance = Balance::zero(BalanceType::HOPR);
808 c.status = ChannelStatus::Closed;
809 let changes = cg.update_channel(c).context("must contain changes")?;
810 assert_eq!(2, changes.len(), "must contain 2 changes");
811
812 for change in changes {
813 match change {
814 ChannelChange::Status { left, right } => {
815 assert_eq!(
816 ChannelStatus::PendingToClose(ts),
817 left,
818 "previous status does not match"
819 );
820 assert_eq!(ChannelStatus::Closed, right, "new status does not match");
821 }
822 ChannelChange::CurrentBalance { left, right } => {
823 assert_eq!(
824 Balance::new(1_u32, BalanceType::HOPR),
825 left,
826 "previous balance does not match"
827 );
828 assert_eq!(Balance::zero(BalanceType::HOPR), right, "new balance does not match");
829 }
830 _ => panic!("unexpected change"),
831 }
832 }
833
834 let cr = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
835 assert!(cr.is_none(), "must not contain channel after closing");
836
837 Ok(())
838 }
839
840 #[test]
841 fn channel_graph_update_should_not_allow_closed_channels() {
842 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
843 let changes = cg.update_channel(dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Closed));
844 assert!(changes.is_none(), "must not produce changes for a closed channel");
845
846 let c = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
847 assert!(c.is_none(), "must not allow adding closed channels");
848 }
849
850 #[test]
851 fn channel_graph_update_should_allow_pending_to_close_channels() -> anyhow::Result<()> {
852 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
853 let ts = SystemTime::now().add(Duration::from_secs(10));
854 let changes = cg.update_channel(dummy_channel(
855 ADDRESSES[0],
856 ADDRESSES[1],
857 ChannelStatus::PendingToClose(ts),
858 ));
859 assert!(changes.is_none(), "must not produce changes for a closed channel");
860
861 cg.get_channel(&ADDRESSES[0], &ADDRESSES[1])
862 .context("must allow PendingToClose channels")?;
863
864 Ok(())
865 }
866}