1use std::{
2 collections::{HashMap, hash_map::Entry},
3 fmt::{Debug, Formatter},
4 time::Duration,
5};
6
7use hopr_internal_types::prelude::*;
8use hopr_primitive_types::{prelude::SMA, primitives::Address, sma::SingleSumSMA};
9use petgraph::{
10 Direction,
11 algo::has_path_connecting,
12 dot::Dot,
13 prelude::StableDiGraph,
14 stable_graph::NodeIndex,
15 visit::{EdgeFiltered, EdgeRef, NodeFiltered},
16};
17use tracing::{debug, warn};
18#[cfg(all(feature = "prometheus", not(test)))]
19use {hopr_internal_types::channels::ChannelDirection, hopr_metrics::MultiGauge};
20
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge named `hopr_channels_count`, carrying a single `direction` label.
    ///
    /// Only compiled when the `prometheus` feature is enabled outside of tests.
    static ref METRIC_NUMBER_OF_CHANNELS: MultiGauge = {
        let label_names = ["direction"];
        MultiGauge::new("hopr_channels_count", "Number of channels per direction", &label_names).unwrap()
    };
}
29
30#[derive(Clone, Copy, Debug, PartialEq)]
33#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
34pub struct ChannelEdge {
35 pub channel: ChannelEntry,
37 pub edge_score: Option<f64>,
39}
40
41impl std::fmt::Display for ChannelEdge {
42 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43 write!(
44 f,
45 "{}; stake={}; score={:?}; status={};",
46 self.channel,
47 self.channel.balance,
48 self.edge_score,
49 self.channel.status.to_string().to_lowercase()
50 )
51 }
52}
53
54#[derive(Clone, Debug, PartialEq)]
58#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
59pub struct Node {
60 pub address: Address,
62 pub node_score: f64,
64 pub latency: SingleSumSMA<std::time::Duration, u32>,
66}
67
68impl Node {
69 pub fn new(address: Address, latency_window_length: usize) -> Self {
70 Self {
71 address,
72 node_score: 0.0,
73 latency: SingleSumSMA::new(latency_window_length),
74 }
75 }
76
77 pub fn update_score(&mut self, score_update: NodeScoreUpdate, cfg: ChannelGraphConfig) -> f64 {
84 match score_update {
85 NodeScoreUpdate::Reachable(latency) => {
86 self.node_score = 1.0_f64.min(self.node_score + cfg.node_score_step_up);
87 self.latency.push(latency);
88 }
89 NodeScoreUpdate::Unreachable => {
90 self.node_score /= cfg.node_score_decay;
91 self.latency.clear();
92 if self.node_score < cfg.offline_node_score_threshold {
93 self.node_score = 0.0;
94 }
95 }
96 NodeScoreUpdate::Initialize(latency, node_score) => {
97 self.latency.clear();
98 self.latency.push(latency);
99 self.node_score = node_score.clamp(0.0, 1.0);
100 }
101 }
102 self.node_score
103 }
104}
105
106impl std::fmt::Display for Node {
107 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
108 write!(f, "{}; score={}", self.address, self.node_score)
109 }
110}
111
112#[derive(Clone, Copy, Debug, PartialEq, smart_default::SmartDefault)]
114#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
115pub struct ChannelGraphConfig {
116 #[default(20)]
118 pub latency_sma_window_length: usize,
119 #[default(0.1)]
121 pub node_score_step_up: f64,
122 #[default(4.0)]
124 pub node_score_decay: f64,
125 #[default(0.1)]
129 pub offline_node_score_threshold: f64,
130}
131
/// Describes how the score of a [`Node`] should change; consumed by [`Node::update_score`].
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum NodeScoreUpdate {
    /// The node was reached; carries the latency sample to record.
    Reachable(Duration),
    /// The node could not be reached.
    Unreachable,
    /// Resets the node to the given single latency sample and score
    /// (the score is clamped into `[0.0, 1.0]` when applied).
    Initialize(Duration, f64),
}
144
145impl<T> From<Result<Duration, T>> for NodeScoreUpdate {
146 fn from(result: Result<Duration, T>) -> Self {
147 match result {
148 Ok(duration) => NodeScoreUpdate::Reachable(duration),
149 Err(_) => NodeScoreUpdate::Unreachable,
150 }
151 }
152}
153
154#[cfg_attr(feature = "serde", cfg_eval::cfg_eval, serde_with::serde_as)]
171#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
172#[derive(Clone, Debug)]
173pub struct ChannelGraph {
174 me: Address,
175 #[cfg_attr(feature = "serde", serde_as(as = "Vec<(_, _)>"))]
176 indices: HashMap<Address, u32>,
177 graph: StableDiGraph<Node, ChannelEdge>,
178 cfg: ChannelGraphConfig,
179}
180
181impl ChannelGraph {
182 pub const INTERMEDIATE_HOPS: usize = 3;
184
185 pub fn new(me: Address, cfg: ChannelGraphConfig) -> Self {
187 #[cfg(all(feature = "prometheus", not(test)))]
188 {
189 lazy_static::initialize(&METRIC_NUMBER_OF_CHANNELS);
190 }
191
192 let mut ret = Self {
193 me,
194 cfg,
195 indices: HashMap::new(),
196 graph: StableDiGraph::default(),
197 };
198 ret.indices.insert(
199 me,
200 ret.graph
201 .add_node(Node {
202 address: me,
203 node_score: 1.0,
204 latency: SingleSumSMA::new_with_samples(
205 cfg.latency_sma_window_length,
206 vec![Duration::ZERO; cfg.latency_sma_window_length],
207 ),
208 })
209 .index() as u32,
210 );
211 ret
212 }
213
214 pub fn count_channels(&self) -> usize {
216 self.graph.edge_count()
217 }
218
219 pub fn count_nodes(&self) -> usize {
221 self.graph.node_count()
222 }
223
224 pub fn is_own_channel(&self, channel: &ChannelEntry) -> bool {
226 channel.destination == self.me || channel.source == self.me
227 }
228
229 pub fn my_address(&self) -> Address {
231 self.me
232 }
233
234 fn get_edge(&self, src: &Address, dst: &Address) -> Option<petgraph::stable_graph::EdgeReference<'_, ChannelEdge>> {
235 let (src_idx, dst_idx) = self
236 .indices
237 .get(src)
238 .and_then(|src| self.indices.get(dst).map(|dst| (*src, *dst)))?;
239 self.graph.edges_connecting(src_idx.into(), dst_idx.into()).next()
240 }
241
242 pub fn get_channel(&self, source: &Address, destination: &Address) -> Option<&ChannelEntry> {
245 self.get_edge(source, destination).map(|e| &e.weight().channel)
246 }
247
248 pub fn get_node(&self, node: &Address) -> Option<&Node> {
251 self.indices
252 .get(node)
253 .and_then(|index| self.graph.node_weight((*index).into()))
254 }
255
256 pub fn open_channels_from(&self, source: Address) -> impl Iterator<Item = (&Node, &ChannelEdge)> {
258 let idx = self
260 .indices
261 .get(&source)
262 .cloned()
263 .unwrap_or(self.graph.node_count() as u32);
264 self.graph
265 .edges_directed(idx.into(), Direction::Outgoing)
266 .filter(|c| c.weight().channel.status == ChannelStatus::Open)
267 .map(|e| (&self.graph[e.target()], e.weight()))
268 }
269
270 pub fn has_path(&self, source: &Address, destination: &Address) -> bool {
273 let only_open_graph = EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
274 if let Some((src_idx, dst_idx)) = self
275 .indices
276 .get(source)
277 .and_then(|src| self.indices.get(destination).map(|dst| (*src, *dst)))
278 {
279 has_path_connecting(&only_open_graph, src_idx.into(), dst_idx.into(), None)
280 } else {
281 false
282 }
283 }
284
285 pub fn update_channel(&mut self, channel: ChannelEntry) -> Option<Vec<ChannelChange>> {
289 #[cfg(all(feature = "prometheus", not(test)))]
290 {
291 if let Some(direction) = channel.direction(&self.me) {
292 match direction {
293 ChannelDirection::Outgoing => match channel.status {
294 ChannelStatus::Closed => {
295 METRIC_NUMBER_OF_CHANNELS.decrement(&["out"], 1.0);
296 }
297 ChannelStatus::Open => {
298 METRIC_NUMBER_OF_CHANNELS.increment(&["out"], 1.0);
299 }
300 ChannelStatus::PendingToClose(_) => {}
301 },
302 ChannelDirection::Incoming => match channel.status {
303 ChannelStatus::Closed => {
304 METRIC_NUMBER_OF_CHANNELS.decrement(&["in"], 1.0);
305 }
306 ChannelStatus::Open => {
307 METRIC_NUMBER_OF_CHANNELS.increment(&["in"], 1.0);
308 }
309 ChannelStatus::PendingToClose(_) => {}
310 },
311 }
312 }
313 }
314
315 let maybe_edge_id = self.get_edge(&channel.source, &channel.destination).map(|e| e.id());
316
317 if channel.status == ChannelStatus::Closed {
319 return maybe_edge_id
320 .and_then(|id| self.graph.remove_edge(id))
321 .inspect(|c| debug!("removed {}", c.channel))
322 .map(|old_value| ChannelChange::diff_channels(&old_value.channel, &channel));
323 }
324
325 if let Some(old_value) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
327 let old_channel = old_value.channel;
328 old_value.channel = channel;
329
330 let ret = ChannelChange::diff_channels(&old_channel, &channel);
331 debug!(
332 "updated {channel}: {}",
333 ret.iter().map(ChannelChange::to_string).collect::<Vec<_>>().join(",")
334 );
335 Some(ret)
336 } else {
337 let src = *self.indices.entry(channel.source).or_insert_with(|| {
339 self.graph
340 .add_node(Node::new(channel.source, self.cfg.latency_sma_window_length))
341 .index() as u32
342 });
343
344 let dst = *self.indices.entry(channel.destination).or_insert_with(|| {
345 self.graph
346 .add_node(Node::new(channel.destination, self.cfg.latency_sma_window_length))
347 .index() as u32
348 });
349
350 let weighted = ChannelEdge {
351 channel,
352 edge_score: None,
353 };
354
355 self.graph.add_edge(src.into(), dst.into(), weighted);
356 debug!("new {channel}");
357
358 None
359 }
360 }
361
362 pub fn update_node_score(&mut self, address: &Address, score_update: NodeScoreUpdate) {
365 if !self.me.eq(address) {
366 match self.indices.entry(*address) {
367 Entry::Occupied(existing) => {
369 let existing_idx: NodeIndex = (*existing.get()).into();
370 if score_update != NodeScoreUpdate::Unreachable
374 || self.graph.neighbors_undirected(existing_idx).count() > 0
375 {
376 if let Some(node) = self.graph.node_weight_mut(existing_idx) {
378 let updated_quality = node.update_score(score_update, self.cfg);
379 debug!(%address, updated_quality, "updated node quality");
380 } else {
381 warn!(%address, "removed dangling node index from channel graph");
383 existing.remove();
384 }
385 } else {
386 if self
389 .graph
390 .node_weight_mut(existing_idx)
391 .map(|node| node.update_score(score_update, self.cfg))
392 .is_some_and(|updated_quality| updated_quality < self.cfg.offline_node_score_threshold)
393 {
394 self.graph.remove_node(existing.remove().into());
395 debug!(%address, "removed offline node with no channels");
396 }
397 }
398 }
399 Entry::Vacant(new_node) if score_update != NodeScoreUpdate::Unreachable => {
401 let mut inserted_node = Node::new(*address, self.cfg.latency_sma_window_length);
402 let updated_quality = inserted_node.update_score(score_update, self.cfg);
403 new_node.insert(self.graph.add_node(inserted_node).index() as u32);
404 debug!(%address, updated_quality, "added new node");
405 }
406 Entry::Vacant(_) => {}
408 }
409 }
410 }
411
412 pub fn update_channel_score(&mut self, source: &Address, destination: &Address, score: f64) {
415 assert!(score >= 0_f64, "score must be non-negative");
416 let maybe_edge_id = self.get_edge(source, destination).map(|e| e.id());
417 if let Some(channel) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
418 if score != channel.edge_score.unwrap_or(-1_f64) {
419 channel.edge_score = Some(score);
420 debug!("updated score of {} to {score}", channel.channel);
421 }
422 }
423 }
424
425 pub fn get_channel_score(&self, source: &Address, destination: &Address) -> Option<f64> {
428 self.get_edge(source, destination)
429 .and_then(|e| self.graph.edge_weight(e.id()))
430 .and_then(|e| e.edge_score)
431 }
432
433 pub fn contains_channel(&self, channel: &ChannelEntry) -> bool {
435 self.get_channel(&channel.source, &channel.destination).is_some()
436 }
437
438 pub fn contains_node(&self, address: &Address) -> bool {
440 self.get_node(address).is_some()
441 }
442
443 pub fn as_dot(&self, cfg: GraphExportConfig) -> String {
445 if cfg.ignore_disconnected_components {
446 let only_open_graph =
447 EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
448
449 let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
450
451 Dot::new(&NodeFiltered::from_fn(&self.graph, |n| {
452 self.graph.node_weight(n).is_some_and(|n| n.node_score > 0_f64)
455 && has_path_connecting(&only_open_graph, me_idx, n, None)
456 }))
457 .to_string()
458 } else if cfg.ignore_non_opened_channels {
459 Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
461 self.graph
462 .edges_directed(a, Direction::Outgoing)
463 .any(|e| e.weight().channel.status == ChannelStatus::Open)
464 || self
465 .graph
466 .edges_directed(a, Direction::Incoming)
467 .any(|e| e.weight().channel.status == ChannelStatus::Open)
468 }))
469 .to_string()
470 } else if cfg.only_3_hop_accessible_nodes {
471 let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
473 let distances = petgraph::algo::dijkstra(&self.graph, me_idx, None, |e| {
474 if e.weight().channel.status == ChannelStatus::Open {
475 1
476 } else {
477 100
478 }
479 });
480
481 Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
482 distances.get(&a).map(|d| *d <= 3).unwrap_or(false)
483 }))
484 .to_string()
485 } else {
486 Dot::new(&self.graph).to_string()
487 }
488 }
489}
490
/// Selects which part of the graph [`ChannelGraph::as_dot`] exports.
///
/// The flags are evaluated in declaration order and only the first one that is
/// set takes effect; with all flags unset, the entire graph is exported.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct GraphExportConfig {
    /// Export only nodes with a positive score that are reachable from the
    /// own node via `Open` channels.
    pub ignore_disconnected_components: bool,
    /// Export only nodes that have at least one incoming or outgoing `Open` channel.
    pub ignore_non_opened_channels: bool,
    /// Export only nodes whose path cost from the own node is at most 3
    /// (`Open` channels cost 1, all others cost 100).
    pub only_3_hop_accessible_nodes: bool,
}
505
506#[cfg(test)]
507mod tests {
508 use std::{
509 ops::Add,
510 time::{Duration, SystemTime},
511 };
512
513 use anyhow::{Context, anyhow};
514 use hopr_internal_types::channels::{ChannelChange, ChannelStatus};
515 use hopr_primitive_types::prelude::*;
516
517 use super::*;
518 use crate::{
519 channel_graph::ChannelGraph,
520 tests::{ADDRESSES, dummy_channel},
521 };
522
/// A fresh graph knows its own address and contains the own node with full score
/// and a latency window filled with zero samples.
#[test]
fn channel_graph_self_addr() {
    let me = ADDRESSES[0];
    let cg = ChannelGraph::new(me, Default::default());

    assert_eq!(me, cg.my_address(), "must produce correct self address");

    assert!(cg.contains_node(&me), "must contain self address");

    let window = cg.cfg.latency_sma_window_length;
    let expected_self = Node {
        address: me,
        node_score: 1.0,
        latency: SingleSumSMA::new_with_samples(window, vec![Duration::ZERO; window]),
    };

    assert_eq!(
        cg.get_node(&me).cloned(),
        Some(expected_self),
        "must contain self node with quality 1"
    );
}
543
/// A single open channel creates both endpoint nodes and a path between them,
/// but no path to an unrelated address.
#[test]
fn channel_graph_has_path() {
    let (me, peer, stranger) = (ADDRESSES[0], ADDRESSES[1], ADDRESSES[2]);
    let mut cg = ChannelGraph::new(me, Default::default());

    let channel = dummy_channel(me, peer, ChannelStatus::Open);
    cg.update_channel(channel);

    assert!(cg.contains_channel(&channel), "must contain channel");

    assert!(cg.contains_node(&me), "must contain channel source");

    assert!(cg.contains_node(&peer), "must contain channel destination");

    assert!(cg.has_path(&me, &peer), "must have simple path");

    assert!(!cg.has_path(&me, &stranger), "must not have non existent path");
}
564
/// Scores step up on reachability, decay on unreachability, and a node without
/// channels is dropped once its score falls below the offline threshold.
#[test]
fn channel_graph_update_node_quality() -> anyhow::Result<()> {
    let peer = ADDRESSES[1];
    let mut cg = ChannelGraph::new(
        ADDRESSES[0],
        ChannelGraphConfig {
            node_score_step_up: 0.1,
            node_score_decay: 4.0,
            offline_node_score_threshold: 0.1,
            ..Default::default()
        },
    );

    let reachable_in = |millis: u64| NodeScoreUpdate::Reachable(Duration::from_millis(millis));
    let fetch_peer = |graph: &ChannelGraph| graph.get_node(&peer).cloned().ok_or(anyhow!("node must exist"));

    // First observation creates the node.
    cg.update_node_score(&peer, reachable_in(100));
    let node = fetch_peer(&cg)?;
    assert_eq!(node.node_score, 0.1);
    assert_eq!(node.latency.average(), Some(Duration::from_millis(100)));

    // Knowing a node does not imply a channel path to it.
    assert!(!cg.has_path(&ADDRESSES[0], &peer));

    cg.update_node_score(&peer, reachable_in(50));
    let node = fetch_peer(&cg)?;
    assert_eq!(node.node_score, 0.2);
    assert_eq!(node.latency.average(), Some(Duration::from_millis(75)));

    for millis in [30, 20] {
        cg.update_node_score(&peer, reachable_in(millis));
    }

    let node = fetch_peer(&cg)?;
    assert_eq!(node.node_score, 0.4);
    assert_eq!(node.latency.average(), Some(Duration::from_millis(50)));

    // First failure decays the score and clears the latency samples.
    cg.update_node_score(&peer, NodeScoreUpdate::Unreachable);
    let node = fetch_peer(&cg)?;
    assert_eq!(node.node_score, 0.1);
    assert!(node.latency.average().is_none());

    // Second failure pushes the score below the threshold and the node is removed.
    cg.update_node_score(&peer, NodeScoreUpdate::Unreachable);

    assert_eq!(cg.get_node(&peer), None);

    Ok(())
}
608
/// A node that still has a channel stays in the graph even after its score
/// has dropped to zero.
#[test]
fn channel_graph_update_node_quality_should_not_remove_nodes_with_zero_quality_and_path() -> anyhow::Result<()> {
    let (me, peer) = (ADDRESSES[0], ADDRESSES[1]);
    let mut cg = ChannelGraph::new(
        me,
        ChannelGraphConfig {
            node_score_step_up: 0.1,
            node_score_decay: 4.0,
            offline_node_score_threshold: 0.1,
            ..Default::default()
        },
    );

    let fetch_peer = |graph: &ChannelGraph| graph.get_node(&peer).cloned().ok_or(anyhow!("node must exist"));
    let reachable = NodeScoreUpdate::Reachable(Duration::from_millis(50));

    let channel = dummy_channel(me, peer, ChannelStatus::Open);
    cg.update_channel(channel);

    // The channel insertion creates the peer node with no score and no latency.
    let node = fetch_peer(&cg)?;
    assert_eq!(node.address, peer);
    assert_eq!(node.node_score, 0.0);
    assert!(node.latency.is_empty());

    cg.update_node_score(&peer, reachable);

    let node = fetch_peer(&cg)?;
    assert_eq!(node.address, peer);
    assert_eq!(node.node_score, 0.1);

    for _ in 0..3 {
        cg.update_node_score(&peer, reachable);
    }

    let node = fetch_peer(&cg)?;
    assert_eq!(node.address, peer);
    assert_eq!(node.node_score, 0.4);

    assert!(cg.has_path(&me, &peer));

    cg.update_node_score(&peer, NodeScoreUpdate::Unreachable);

    let node = fetch_peer(&cg)?;
    assert_eq!(node.address, peer);
    assert_eq!(node.node_score, 0.1);

    assert!(cg.has_path(&me, &peer));

    // The score bottoms out at zero, but the node and its channel remain.
    cg.update_node_score(&peer, NodeScoreUpdate::Unreachable);

    let node = fetch_peer(&cg)?;
    assert_eq!(node.address, peer);
    assert_eq!(node.node_score, 0.0);

    assert!(cg.has_path(&me, &peer));

    Ok(())
}
663
/// A new channel has no score; once set, the score is returned unchanged.
#[test]
fn channel_graph_update_channel_score() -> anyhow::Result<()> {
    let (me, peer) = (ADDRESSES[0], ADDRESSES[1]);
    let mut cg = ChannelGraph::new(me, Default::default());

    let channel = dummy_channel(me, peer, ChannelStatus::Open);
    cg.update_channel(channel);

    assert!(cg.contains_channel(&channel), "must contain channel");
    let initial_score = cg.get_channel_score(&me, &peer);
    assert!(initial_score.is_none(), "must start with no quality info");

    cg.update_channel_score(&me, &peer, 0.5_f64);

    let stored_score = cg.get_channel_score(&me, &peer).context("must have quality when set")?;
    assert_eq!(0.5_f64, stored_score, "quality must be equal");

    Ok(())
}
686
/// Only channels that have the graph owner as an endpoint count as own channels.
#[test]
fn channel_graph_is_own_channel() {
    let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());

    let own = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
    let foreign = dummy_channel(ADDRESSES[1], ADDRESSES[2], ChannelStatus::Open);
    for channel in [own, foreign] {
        cg.update_channel(channel);
    }

    assert!(cg.is_own_channel(&own), "must detect as own channel");
    assert!(!cg.is_own_channel(&foreign), "must not detect as own channel");
}
699
/// Updating an existing channel reports exactly the fields that changed
/// and stores the new channel state.
#[test]
fn channel_graph_update_changes() -> anyhow::Result<()> {
    let (src, dst) = (ADDRESSES[0], ADDRESSES[1]);
    let mut cg = ChannelGraph::new(src, Default::default());

    let mut c = dummy_channel(src, dst, ChannelStatus::Open);

    let first_insert = cg.update_channel(c);
    assert!(first_insert.is_none(), "must not produce changes for a new channel");

    let stored = cg.get_channel(&src, &dst).context("must contain channel")?;
    assert!(c.eq(stored), "channels must be equal");

    // Change both the balance and the status in a single update.
    let ts = SystemTime::now().add(Duration::from_secs(10));
    c.balance = 0.into();
    c.status = ChannelStatus::PendingToClose(ts);
    let changes = cg.update_channel(c).context("should contain channel changes")?;
    assert_eq!(2, changes.len(), "must contain 2 changes");

    changes.into_iter().for_each(|change| match change {
        ChannelChange::Status { left, right } => {
            assert_eq!(ChannelStatus::Open, left, "previous status does not match");
            assert_eq!(ChannelStatus::PendingToClose(ts), right, "new status does not match");
        }
        ChannelChange::CurrentBalance { left, right } => {
            assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
            assert_eq!(HoprBalance::zero(), right, "new balance does not match");
        }
        _ => panic!("unexpected change"),
    });

    let stored = cg.get_channel(&src, &dst).context("must contain channel")?;
    assert!(c.eq(stored), "channels must be equal");

    Ok(())
}
741
/// Closing a stored channel reports the changes against the stored state
/// and removes the channel from the graph.
#[test]
fn channel_graph_update_changes_on_close() -> anyhow::Result<()> {
    let (src, dst) = (ADDRESSES[0], ADDRESSES[1]);
    let mut cg = ChannelGraph::new(src, Default::default());

    let ts = SystemTime::now().add(Duration::from_secs(10));
    let mut c = dummy_channel(src, dst, ChannelStatus::PendingToClose(ts));

    let first_insert = cg.update_channel(c);
    assert!(first_insert.is_none(), "must not produce changes for a new channel");

    let stored = cg.get_channel(&src, &dst).context("must contain channel")?;
    assert!(c.eq(stored), "channels must be equal");

    // Close the channel while also changing its balance.
    c.balance = 0.into();
    c.status = ChannelStatus::Closed;
    let changes = cg.update_channel(c).context("must contain changes")?;
    assert_eq!(2, changes.len(), "must contain 2 changes");

    changes.into_iter().for_each(|change| match change {
        ChannelChange::Status { left, right } => {
            assert_eq!(
                ChannelStatus::PendingToClose(ts),
                left,
                "previous status does not match"
            );
            assert_eq!(ChannelStatus::Closed, right, "new status does not match");
        }
        ChannelChange::CurrentBalance { left, right } => {
            assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
            assert_eq!(HoprBalance::zero(), right, "new balance does not match");
        }
        _ => panic!("unexpected change"),
    });

    let after_close = cg.get_channel(&src, &dst);
    assert!(after_close.is_none(), "must not contain channel after closing");

    Ok(())
}
785
/// A channel that is already closed when first seen is never added to the graph.
#[test]
fn channel_graph_update_should_not_allow_closed_channels() {
    let (src, dst) = (ADDRESSES[0], ADDRESSES[1]);
    let mut cg = ChannelGraph::new(src, Default::default());

    let closed = dummy_channel(src, dst, ChannelStatus::Closed);
    let changes = cg.update_channel(closed);
    assert!(changes.is_none(), "must not produce changes for a closed channel");

    let stored = cg.get_channel(&src, &dst);
    assert!(stored.is_none(), "must not allow adding closed channels");
}
795
/// A channel first seen in the `PendingToClose` state is still added to the graph.
#[test]
fn channel_graph_update_should_allow_pending_to_close_channels() -> anyhow::Result<()> {
    let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
    let ts = SystemTime::now().add(Duration::from_secs(10));
    let changes = cg.update_channel(dummy_channel(
        ADDRESSES[0],
        ADDRESSES[1],
        ChannelStatus::PendingToClose(ts),
    ));
    // The channel is new to the graph (not closed), so no changes are reported.
    assert!(changes.is_none(), "must not produce changes for a new channel");

    cg.get_channel(&ADDRESSES[0], &ADDRESSES[1])
        .context("must allow PendingToClose channels")?;

    Ok(())
}
812}