1use std::{
2 collections::{HashMap, hash_map::Entry},
3 fmt::{Debug, Formatter},
4 time::Duration,
5};
6
7use hopr_internal_types::prelude::*;
8use hopr_primitive_types::{prelude::SMA, primitives::Address, sma::SingleSumSMA};
9use petgraph::{
10 Direction,
11 algo::has_path_connecting,
12 dot::Dot,
13 prelude::StableDiGraph,
14 stable_graph::NodeIndex,
15 visit::{EdgeFiltered, EdgeRef, NodeFiltered},
16};
17use tracing::{debug, warn};
18#[cfg(all(feature = "prometheus", not(test)))]
19use {
20 hopr_internal_types::channels::ChannelDirection, hopr_metrics::metrics::MultiGauge,
21 hopr_primitive_types::traits::ToHex,
22};
23
24#[cfg(all(feature = "prometheus", not(test)))]
25lazy_static::lazy_static! {
26 static ref METRIC_NUMBER_OF_CHANNELS: MultiGauge = MultiGauge::new(
27 "hopr_channels_count",
28 "Number of channels per direction",
29 &["direction"]
30 ).unwrap();
31 static ref METRIC_CHANNEL_BALANCES: MultiGauge = MultiGauge::new(
32 "hopr_channel_balances",
33 "Balances on channels per counterparty",
34 &["counterparty", "direction"]
35 ).unwrap();
36}
37
38#[derive(Clone, Copy, Debug, PartialEq)]
41#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
42pub struct ChannelEdge {
43 pub channel: ChannelEntry,
45 pub edge_score: Option<f64>,
47}
48
49impl std::fmt::Display for ChannelEdge {
50 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51 write!(
52 f,
53 "{}; stake={}; score={:?}; status={};",
54 self.channel,
55 self.channel.balance,
56 self.edge_score,
57 self.channel.status.to_string().to_lowercase()
58 )
59 }
60}
61
62#[derive(Clone, Debug, PartialEq)]
66#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
67pub struct Node {
68 pub address: Address,
70 pub node_score: f64,
72 pub latency: SingleSumSMA<std::time::Duration, u32>,
74}
75
76impl Node {
77 pub fn new(address: Address, latency_window_length: usize) -> Self {
78 Self {
79 address,
80 node_score: 0.0,
81 latency: SingleSumSMA::new(latency_window_length),
82 }
83 }
84
85 pub fn update_score(&mut self, score_update: NodeScoreUpdate, cfg: ChannelGraphConfig) -> f64 {
92 match score_update {
93 NodeScoreUpdate::Reachable(latency) => {
94 self.node_score = 1.0_f64.min(self.node_score + cfg.node_score_step_up);
95 self.latency.push(latency);
96 }
97 NodeScoreUpdate::Unreachable => {
98 self.node_score /= cfg.node_score_decay;
99 self.latency.clear();
100 if self.node_score < cfg.offline_node_score_threshold {
101 self.node_score = 0.0;
102 }
103 }
104 NodeScoreUpdate::Initialize(latency, node_score) => {
105 self.latency.clear();
106 self.latency.push(latency);
107 self.node_score = node_score.clamp(0.0, 1.0);
108 }
109 }
110 self.node_score
111 }
112}
113
114impl std::fmt::Display for Node {
115 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116 write!(f, "{}; score={}", self.address, self.node_score)
117 }
118}
119
120#[derive(Clone, Copy, Debug, PartialEq, smart_default::SmartDefault)]
122#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
123pub struct ChannelGraphConfig {
124 #[default(20)]
126 pub latency_sma_window_length: usize,
127 #[default(0.1)]
129 pub node_score_step_up: f64,
130 #[default(4.0)]
132 pub node_score_decay: f64,
133 #[default(0.1)]
137 pub offline_node_score_threshold: f64,
138}
139
140#[derive(Clone, Copy, Debug, PartialEq)]
142#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
143pub enum NodeScoreUpdate {
144 Reachable(Duration),
146 Unreachable,
148 Initialize(Duration, f64),
151}
152
153impl<T> From<Result<Duration, T>> for NodeScoreUpdate {
154 fn from(result: Result<Duration, T>) -> Self {
155 match result {
156 Ok(duration) => NodeScoreUpdate::Reachable(duration),
157 Err(_) => NodeScoreUpdate::Unreachable,
158 }
159 }
160}
161
162#[cfg_attr(feature = "serde", cfg_eval::cfg_eval, serde_with::serde_as)]
179#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
180#[derive(Clone, Debug)]
181pub struct ChannelGraph {
182 me: Address,
183 #[cfg_attr(feature = "serde", serde_as(as = "Vec<(_, _)>"))]
184 indices: HashMap<Address, u32>,
185 graph: StableDiGraph<Node, ChannelEdge>,
186 cfg: ChannelGraphConfig,
187}
188
189impl ChannelGraph {
190 pub const INTERMEDIATE_HOPS: usize = 3;
192
193 pub fn new(me: Address, cfg: ChannelGraphConfig) -> Self {
195 #[cfg(all(feature = "prometheus", not(test)))]
196 {
197 lazy_static::initialize(&METRIC_NUMBER_OF_CHANNELS);
198 }
199
200 let mut ret = Self {
201 me,
202 cfg,
203 indices: HashMap::new(),
204 graph: StableDiGraph::default(),
205 };
206 ret.indices.insert(
207 me,
208 ret.graph
209 .add_node(Node {
210 address: me,
211 node_score: 1.0,
212 latency: SingleSumSMA::new_with_samples(
213 cfg.latency_sma_window_length,
214 vec![Duration::ZERO; cfg.latency_sma_window_length],
215 ),
216 })
217 .index() as u32,
218 );
219 ret
220 }
221
222 pub fn count_channels(&self) -> usize {
224 self.graph.edge_count()
225 }
226
227 pub fn count_nodes(&self) -> usize {
229 self.graph.node_count()
230 }
231
232 pub fn is_own_channel(&self, channel: &ChannelEntry) -> bool {
234 channel.destination == self.me || channel.source == self.me
235 }
236
237 pub fn my_address(&self) -> Address {
239 self.me
240 }
241
242 fn get_edge(&self, src: &Address, dst: &Address) -> Option<petgraph::stable_graph::EdgeReference<ChannelEdge>> {
243 let (src_idx, dst_idx) = self
244 .indices
245 .get(src)
246 .and_then(|src| self.indices.get(dst).map(|dst| (*src, *dst)))?;
247 self.graph.edges_connecting(src_idx.into(), dst_idx.into()).next()
248 }
249
250 pub fn get_channel(&self, source: &Address, destination: &Address) -> Option<&ChannelEntry> {
253 self.get_edge(source, destination).map(|e| &e.weight().channel)
254 }
255
256 pub fn get_node(&self, node: &Address) -> Option<&Node> {
259 self.indices
260 .get(node)
261 .and_then(|index| self.graph.node_weight((*index).into()))
262 }
263
264 pub fn open_channels_from(&self, source: Address) -> impl Iterator<Item = (&Node, &ChannelEdge)> {
266 let idx = self
268 .indices
269 .get(&source)
270 .cloned()
271 .unwrap_or(self.graph.node_count() as u32);
272 self.graph
273 .edges_directed(idx.into(), Direction::Outgoing)
274 .filter(|c| c.weight().channel.status == ChannelStatus::Open)
275 .map(|e| (&self.graph[e.target()], e.weight()))
276 }
277
278 pub fn has_path(&self, source: &Address, destination: &Address) -> bool {
281 let only_open_graph = EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
282 if let Some((src_idx, dst_idx)) = self
283 .indices
284 .get(source)
285 .and_then(|src| self.indices.get(destination).map(|dst| (*src, *dst)))
286 {
287 has_path_connecting(&only_open_graph, src_idx.into(), dst_idx.into(), None)
288 } else {
289 false
290 }
291 }
292
293 pub fn update_channel(&mut self, channel: ChannelEntry) -> Option<Vec<ChannelChange>> {
297 #[cfg(all(feature = "prometheus", not(test)))]
298 {
299 if let Some(direction) = channel.direction(&self.me) {
300 match direction {
301 ChannelDirection::Outgoing => match channel.status {
302 ChannelStatus::Closed => {
303 METRIC_NUMBER_OF_CHANNELS.decrement(&["out"], 1.0);
304 METRIC_CHANNEL_BALANCES.set(&[channel.destination.to_hex().as_str(), "out"], 0.0);
305 }
306 ChannelStatus::Open => {
307 METRIC_NUMBER_OF_CHANNELS.increment(&["out"], 1.0);
308 METRIC_CHANNEL_BALANCES.set(
309 &[channel.destination.to_hex().as_str(), "out"],
310 channel
311 .balance
312 .amount_in_base_units()
313 .parse::<f64>()
314 .unwrap_or(f64::INFINITY),
315 );
316 }
317 ChannelStatus::PendingToClose(_) => {}
318 },
319 ChannelDirection::Incoming => match channel.status {
320 ChannelStatus::Closed => {
321 METRIC_NUMBER_OF_CHANNELS.decrement(&["in"], 1.0);
322 METRIC_CHANNEL_BALANCES.set(&[channel.source.to_hex().as_str(), "in"], 0.0);
323 }
324 ChannelStatus::Open => {
325 METRIC_NUMBER_OF_CHANNELS.increment(&["in"], 1.0);
326 METRIC_CHANNEL_BALANCES.set(
327 &[channel.source.to_hex().as_str(), "in"],
328 channel
329 .balance
330 .amount_in_base_units()
331 .parse::<f64>()
332 .unwrap_or(f64::INFINITY),
333 );
334 }
335 ChannelStatus::PendingToClose(_) => {}
336 },
337 }
338 }
339 }
340
341 let maybe_edge_id = self.get_edge(&channel.source, &channel.destination).map(|e| e.id());
342
343 if channel.status == ChannelStatus::Closed {
345 return maybe_edge_id
346 .and_then(|id| self.graph.remove_edge(id))
347 .inspect(|c| debug!("removed {}", c.channel))
348 .map(|old_value| ChannelChange::diff_channels(&old_value.channel, &channel));
349 }
350
351 if let Some(old_value) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
353 let old_channel = old_value.channel;
354 old_value.channel = channel;
355
356 let ret = ChannelChange::diff_channels(&old_channel, &channel);
357 debug!(
358 "updated {channel}: {}",
359 ret.iter().map(ChannelChange::to_string).collect::<Vec<_>>().join(",")
360 );
361 Some(ret)
362 } else {
363 let src = *self.indices.entry(channel.source).or_insert_with(|| {
365 self.graph
366 .add_node(Node::new(channel.source, self.cfg.latency_sma_window_length))
367 .index() as u32
368 });
369
370 let dst = *self.indices.entry(channel.destination).or_insert_with(|| {
371 self.graph
372 .add_node(Node::new(channel.destination, self.cfg.latency_sma_window_length))
373 .index() as u32
374 });
375
376 let weighted = ChannelEdge {
377 channel,
378 edge_score: None,
379 };
380
381 self.graph.add_edge(src.into(), dst.into(), weighted);
382 debug!("new {channel}");
383
384 None
385 }
386 }
387
388 pub fn update_node_score(&mut self, address: &Address, score_update: NodeScoreUpdate) {
391 if !self.me.eq(address) {
392 match self.indices.entry(*address) {
393 Entry::Occupied(existing) => {
395 let existing_idx: NodeIndex = (*existing.get()).into();
396 if score_update != NodeScoreUpdate::Unreachable
400 || self.graph.neighbors_undirected(existing_idx).count() > 0
401 {
402 if let Some(node) = self.graph.node_weight_mut(existing_idx) {
404 let updated_quality = node.update_score(score_update, self.cfg);
405 debug!(%address, updated_quality, "updated node quality");
406 } else {
407 warn!(%address, "removed dangling node index from channel graph");
409 existing.remove();
410 }
411 } else {
412 if self
415 .graph
416 .node_weight_mut(existing_idx)
417 .map(|node| node.update_score(score_update, self.cfg))
418 .is_some_and(|updated_quality| updated_quality < self.cfg.offline_node_score_threshold)
419 {
420 self.graph.remove_node(existing.remove().into());
421 debug!(%address, "removed offline node with no channels");
422 }
423 }
424 }
425 Entry::Vacant(new_node) if score_update != NodeScoreUpdate::Unreachable => {
427 let mut inserted_node = Node::new(*address, self.cfg.latency_sma_window_length);
428 let updated_quality = inserted_node.update_score(score_update, self.cfg);
429 new_node.insert(self.graph.add_node(inserted_node).index() as u32);
430 debug!(%address, updated_quality, "added new node");
431 }
432 Entry::Vacant(_) => {}
434 }
435 }
436 }
437
438 pub fn update_channel_score(&mut self, source: &Address, destination: &Address, score: f64) {
441 assert!(score >= 0_f64, "score must be non-negative");
442 let maybe_edge_id = self.get_edge(source, destination).map(|e| e.id());
443 if let Some(channel) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
444 if score != channel.edge_score.unwrap_or(-1_f64) {
445 channel.edge_score = Some(score);
446 debug!("updated score of {} to {score}", channel.channel);
447 }
448 }
449 }
450
451 pub fn get_channel_score(&self, source: &Address, destination: &Address) -> Option<f64> {
454 self.get_edge(source, destination)
455 .and_then(|e| self.graph.edge_weight(e.id()))
456 .and_then(|e| e.edge_score)
457 }
458
459 pub fn contains_channel(&self, channel: &ChannelEntry) -> bool {
461 self.get_channel(&channel.source, &channel.destination).is_some()
462 }
463
464 pub fn contains_node(&self, address: &Address) -> bool {
466 self.get_node(address).is_some()
467 }
468
469 pub fn as_dot(&self, cfg: GraphExportConfig) -> String {
471 if cfg.ignore_disconnected_components {
472 let only_open_graph =
473 EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
474
475 let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
476
477 Dot::new(&NodeFiltered::from_fn(&self.graph, |n| {
478 self.graph.node_weight(n).is_some_and(|n| n.node_score > 0_f64)
481 && has_path_connecting(&only_open_graph, me_idx, n, None)
482 }))
483 .to_string()
484 } else if cfg.ignore_non_opened_channels {
485 Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
487 self.graph
488 .edges_directed(a, Direction::Outgoing)
489 .any(|e| e.weight().channel.status == ChannelStatus::Open)
490 || self
491 .graph
492 .edges_directed(a, Direction::Incoming)
493 .any(|e| e.weight().channel.status == ChannelStatus::Open)
494 }))
495 .to_string()
496 } else if cfg.only_3_hop_accessible_nodes {
497 let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
499 let distances = petgraph::algo::dijkstra(&self.graph, me_idx, None, |e| {
500 if e.weight().channel.status == ChannelStatus::Open {
501 1
502 } else {
503 100
504 }
505 });
506
507 Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
508 distances.get(&a).map(|d| *d <= 3).unwrap_or(false)
509 }))
510 .to_string()
511 } else {
512 Dot::new(&self.graph).to_string()
513 }
514 }
515}
516
517#[derive(Clone, Debug, Default, Eq, PartialEq)]
521pub struct GraphExportConfig {
522 pub ignore_disconnected_components: bool,
526 pub ignore_non_opened_channels: bool,
528 pub only_3_hop_accessible_nodes: bool,
530}
531
532#[cfg(test)]
533mod tests {
534 use std::{
535 ops::Add,
536 time::{Duration, SystemTime},
537 };
538
539 use anyhow::{Context, anyhow};
540 use hopr_internal_types::channels::{ChannelChange, ChannelStatus};
541 use hopr_primitive_types::prelude::*;
542
543 use super::*;
544 use crate::{
545 channel_graph::ChannelGraph,
546 tests::{ADDRESSES, dummy_channel},
547 };
548
549 #[test]
550 fn channel_graph_self_addr() {
551 let cg = ChannelGraph::new(ADDRESSES[0], Default::default());
552 assert_eq!(ADDRESSES[0], cg.my_address(), "must produce correct self address");
553
554 assert!(cg.contains_node(&ADDRESSES[0]), "must contain self address");
555
556 assert_eq!(
557 cg.get_node(&ADDRESSES[0]).cloned(),
558 Some(Node {
559 address: ADDRESSES[0],
560 node_score: 1.0,
561 latency: SingleSumSMA::new_with_samples(
562 cg.cfg.latency_sma_window_length,
563 vec![Duration::ZERO; cg.cfg.latency_sma_window_length]
564 )
565 }),
566 "must contain self node with quality 1"
567 );
568 }
569
570 #[test]
571 fn channel_graph_has_path() {
572 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
573
574 let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
575 cg.update_channel(c);
576
577 assert!(cg.contains_channel(&c), "must contain channel");
578
579 assert!(cg.contains_node(&ADDRESSES[0]), "must contain channel source");
580
581 assert!(cg.contains_node(&ADDRESSES[1]), "must contain channel destination");
582
583 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]), "must have simple path");
584
585 assert!(
586 !cg.has_path(&ADDRESSES[0], &ADDRESSES[2]),
587 "must not have non existent path"
588 );
589 }
590
591 #[test]
592 fn channel_graph_update_node_quality() -> anyhow::Result<()> {
593 let mut cg = ChannelGraph::new(
594 ADDRESSES[0],
595 ChannelGraphConfig {
596 node_score_step_up: 0.1,
597 node_score_decay: 4.0,
598 offline_node_score_threshold: 0.1,
599 ..Default::default()
600 },
601 );
602
603 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(100)));
604 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
605 assert_eq!(node.node_score, 0.1);
606 assert_eq!(node.latency.average(), Some(Duration::from_millis(100)));
607
608 assert!(!cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
609
610 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
611 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
612 assert_eq!(node.node_score, 0.2);
613 assert_eq!(node.latency.average(), Some(Duration::from_millis(75)));
614
615 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(30)));
616 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(20)));
617
618 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
619 assert_eq!(node.node_score, 0.4);
620 assert_eq!(node.latency.average(), Some(Duration::from_millis(50)));
621
622 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
623 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
624 assert_eq!(node.node_score, 0.1);
625 assert!(node.latency.average().is_none());
626
627 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
628
629 assert_eq!(cg.get_node(&ADDRESSES[1]), None);
631
632 Ok(())
633 }
634
635 #[test]
636 fn channel_graph_update_node_quality_should_not_remove_nodes_with_zero_quality_and_path() -> anyhow::Result<()> {
637 let mut cg = ChannelGraph::new(
638 ADDRESSES[0],
639 ChannelGraphConfig {
640 node_score_step_up: 0.1,
641 node_score_decay: 4.0,
642 offline_node_score_threshold: 0.1,
643 ..Default::default()
644 },
645 );
646
647 let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
648 cg.update_channel(c);
649
650 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
651 assert_eq!(node.address, ADDRESSES[1]);
652 assert_eq!(node.node_score, 0.0);
653 assert!(node.latency.is_empty());
654
655 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
656
657 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
658 assert_eq!(node.address, ADDRESSES[1]);
659 assert_eq!(node.node_score, 0.1);
660
661 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
662 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
663 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
664
665 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
666 assert_eq!(node.address, ADDRESSES[1]);
667 assert_eq!(node.node_score, 0.4);
668
669 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
670
671 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
672
673 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
674 assert_eq!(node.address, ADDRESSES[1]);
675 assert_eq!(node.node_score, 0.1);
676
677 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
678
679 cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
680
681 let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
682 assert_eq!(node.address, ADDRESSES[1]);
683 assert_eq!(node.node_score, 0.0);
684
685 assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
686
687 Ok(())
688 }
689
690 #[test]
691 fn channel_graph_update_channel_score() -> anyhow::Result<()> {
692 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
693
694 let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
695 cg.update_channel(c);
696
697 assert!(cg.contains_channel(&c), "must contain channel");
698 assert!(
699 cg.get_channel_score(&ADDRESSES[0], &ADDRESSES[1]).is_none(),
700 "must start with no quality info"
701 );
702
703 cg.update_channel_score(&ADDRESSES[0], &ADDRESSES[1], 0.5_f64);
704
705 let q = cg
706 .get_channel_score(&ADDRESSES[0], &ADDRESSES[1])
707 .context("must have quality when set")?;
708 assert_eq!(0.5_f64, q, "quality must be equal");
709
710 Ok(())
711 }
712
713 #[test]
714 fn channel_graph_is_own_channel() {
715 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
716
717 let c1 = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
718 let c2 = dummy_channel(ADDRESSES[1], ADDRESSES[2], ChannelStatus::Open);
719 cg.update_channel(c1);
720 cg.update_channel(c2);
721
722 assert!(cg.is_own_channel(&c1), "must detect as own channel");
723 assert!(!cg.is_own_channel(&c2), "must not detect as own channel");
724 }
725
726 #[test]
727 fn channel_graph_update_changes() -> anyhow::Result<()> {
728 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
729
730 let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
731
732 let changes = cg.update_channel(c);
733 assert!(changes.is_none(), "must not produce changes for a new channel");
734
735 let cr = cg
736 .get_channel(&ADDRESSES[0], &ADDRESSES[1])
737 .context("must contain channel")?;
738 assert!(c.eq(cr), "channels must be equal");
739
740 let ts = SystemTime::now().add(Duration::from_secs(10));
741 c.balance = 0.into();
742 c.status = ChannelStatus::PendingToClose(ts);
743 let changes = cg.update_channel(c).context("should contain channel changes")?;
744 assert_eq!(2, changes.len(), "must contain 2 changes");
745
746 for change in changes {
747 match change {
748 ChannelChange::Status { left, right } => {
749 assert_eq!(ChannelStatus::Open, left, "previous status does not match");
750 assert_eq!(ChannelStatus::PendingToClose(ts), right, "new status does not match");
751 }
752 ChannelChange::CurrentBalance { left, right } => {
753 assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
754 assert_eq!(HoprBalance::zero(), right, "new balance does not match");
755 }
756 _ => panic!("unexpected change"),
757 }
758 }
759
760 let cr = cg
761 .get_channel(&ADDRESSES[0], &ADDRESSES[1])
762 .context("must contain channel")?;
763 assert!(c.eq(cr), "channels must be equal");
764
765 Ok(())
766 }
767
768 #[test]
769 fn channel_graph_update_changes_on_close() -> anyhow::Result<()> {
770 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
771
772 let ts = SystemTime::now().add(Duration::from_secs(10));
773 let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::PendingToClose(ts));
774
775 let changes = cg.update_channel(c);
776 assert!(changes.is_none(), "must not produce changes for a new channel");
777
778 let cr = cg
779 .get_channel(&ADDRESSES[0], &ADDRESSES[1])
780 .context("must contain channel")?;
781 assert!(c.eq(cr), "channels must be equal");
782
783 c.balance = 0.into();
784 c.status = ChannelStatus::Closed;
785 let changes = cg.update_channel(c).context("must contain changes")?;
786 assert_eq!(2, changes.len(), "must contain 2 changes");
787
788 for change in changes {
789 match change {
790 ChannelChange::Status { left, right } => {
791 assert_eq!(
792 ChannelStatus::PendingToClose(ts),
793 left,
794 "previous status does not match"
795 );
796 assert_eq!(ChannelStatus::Closed, right, "new status does not match");
797 }
798 ChannelChange::CurrentBalance { left, right } => {
799 assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
800 assert_eq!(HoprBalance::zero(), right, "new balance does not match");
801 }
802 _ => panic!("unexpected change"),
803 }
804 }
805
806 let cr = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
807 assert!(cr.is_none(), "must not contain channel after closing");
808
809 Ok(())
810 }
811
812 #[test]
813 fn channel_graph_update_should_not_allow_closed_channels() {
814 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
815 let changes = cg.update_channel(dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Closed));
816 assert!(changes.is_none(), "must not produce changes for a closed channel");
817
818 let c = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
819 assert!(c.is_none(), "must not allow adding closed channels");
820 }
821
822 #[test]
823 fn channel_graph_update_should_allow_pending_to_close_channels() -> anyhow::Result<()> {
824 let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
825 let ts = SystemTime::now().add(Duration::from_secs(10));
826 let changes = cg.update_channel(dummy_channel(
827 ADDRESSES[0],
828 ADDRESSES[1],
829 ChannelStatus::PendingToClose(ts),
830 ));
831 assert!(changes.is_none(), "must not produce changes for a closed channel");
832
833 cg.get_channel(&ADDRESSES[0], &ADDRESSES[1])
834 .context("must allow PendingToClose channels")?;
835
836 Ok(())
837 }
838}