hopr_path/
channel_graph.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    fmt::{Debug, Formatter},
4    time::Duration,
5};
6
7use hopr_internal_types::prelude::*;
8use hopr_primitive_types::{prelude::SMA, primitives::Address, sma::SingleSumSMA};
9use petgraph::{
10    Direction,
11    algo::has_path_connecting,
12    dot::Dot,
13    prelude::StableDiGraph,
14    stable_graph::NodeIndex,
15    visit::{EdgeFiltered, EdgeRef, NodeFiltered},
16};
17use tracing::{debug, warn};
18#[cfg(all(feature = "prometheus", not(test)))]
19use {hopr_internal_types::channels::ChannelDirection, hopr_metrics::MultiGauge};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23    static ref METRIC_NUMBER_OF_CHANNELS:  MultiGauge =  MultiGauge::new(
24        "hopr_channels_count",
25        "Number of channels per direction",
26        &["direction"]
27    ).unwrap();
28}
29
30/// Structure that adds additional data to a `ChannelEntry`, which
31/// can be used to compute edge weights and traverse the `ChannelGraph`.
32#[derive(Clone, Copy, Debug, PartialEq)]
33#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
34pub struct ChannelEdge {
35    /// Underlying channel
36    pub channel: ChannelEntry,
37    /// Optional scoring of the edge that might be used for path planning.
38    pub edge_score: Option<f64>,
39}
40
41impl std::fmt::Display for ChannelEdge {
42    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43        write!(
44            f,
45            "{}; stake={}; score={:?}; status={};",
46            self.channel,
47            self.channel.balance,
48            self.edge_score,
49            self.channel.status.to_string().to_lowercase()
50        )
51    }
52}
53
54/// Represents a node in the Channel Graph.
55/// This is typically represented by an on-chain address and ping quality, which
56/// represents some kind of node's liveness as perceived by us.
57#[derive(Clone, Debug, PartialEq)]
58#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
59pub struct Node {
60    /// Node's on-chain address.
61    pub address: Address,
62    /// Liveness of the node.
63    pub node_score: f64,
64    /// Average node latency
65    pub latency: SingleSumSMA<std::time::Duration, u32>,
66}
67
68impl Node {
69    pub fn new(address: Address, latency_window_length: usize) -> Self {
70        Self {
71            address,
72            node_score: 0.0,
73            latency: SingleSumSMA::new(latency_window_length),
74        }
75    }
76
77    /// Update the score using the [`NodeScoreUpdate`] and [`ChannelGraphConfig`].
78    ///
79    /// The function will ensure additive (slow) ramp-up, but exponential (fast)
80    /// ramp-down of the node's score, depending on whether it was reachable or not.
81    /// The ramp-down has a cut-off at `offline_node_score_threshold`, below which
82    /// is the score set to zero.
83    pub fn update_score(&mut self, score_update: NodeScoreUpdate, cfg: ChannelGraphConfig) -> f64 {
84        match score_update {
85            NodeScoreUpdate::Reachable(latency) => {
86                self.node_score = 1.0_f64.min(self.node_score + cfg.node_score_step_up);
87                self.latency.push(latency);
88            }
89            NodeScoreUpdate::Unreachable => {
90                self.node_score /= cfg.node_score_decay;
91                self.latency.clear();
92                if self.node_score < cfg.offline_node_score_threshold {
93                    self.node_score = 0.0;
94                }
95            }
96            NodeScoreUpdate::Initialize(latency, node_score) => {
97                self.latency.clear();
98                self.latency.push(latency);
99                self.node_score = node_score.clamp(0.0, 1.0);
100            }
101        }
102        self.node_score
103    }
104}
105
106impl std::fmt::Display for Node {
107    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
108        write!(f, "{}; score={}", self.address, self.node_score)
109    }
110}
111
112/// Configuration for the [`ChannelGraph`].
113#[derive(Clone, Copy, Debug, PartialEq, smart_default::SmartDefault)]
114#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
115pub struct ChannelGraphConfig {
116    /// Length of the Simple Moving Average window for node latencies.
117    #[default(20)]
118    pub latency_sma_window_length: usize,
119    /// Additive node score modifier when the node is reachable.
120    #[default(0.1)]
121    pub node_score_step_up: f64,
122    /// Node score divisor when the node is unreachable.
123    #[default(4.0)]
124    pub node_score_decay: f64,
125    /// If a node is unreachable and because of that it reaches a score
126    /// lower than this threshold, it is considered offline (and in some situations
127    /// can be removed from the graph).
128    #[default(0.1)]
129    pub offline_node_score_threshold: f64,
130}
131
132/// Describes an update of the [`Node`]'s score.
133#[derive(Clone, Copy, Debug, PartialEq)]
134#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
135pub enum NodeScoreUpdate {
136    /// Node is reachable with the given latency.
137    Reachable(Duration),
138    /// Node is unreachable.
139    Unreachable,
140    /// Initializes a node's score to the given latency and quality.
141    /// This is useful during loading data from the persistent storage.
142    Initialize(Duration, f64),
143}
144
145impl<T> From<Result<Duration, T>> for NodeScoreUpdate {
146    fn from(result: Result<Duration, T>) -> Self {
147        match result {
148            Ok(duration) => NodeScoreUpdate::Reachable(duration),
149            Err(_) => NodeScoreUpdate::Unreachable,
150        }
151    }
152}
153
154/// Implements a HOPR payment channel graph (directed) cached in-memory.
155///
156/// This structure is useful for tracking channel state changes and
157/// packet path finding.
158///
159/// The edges are updated only from the Indexer, and therefore the graph contains only
160/// the channels *seen* on-chain.
161/// The nodes and their qualities are updated as they are observed on the network.
162///
163/// Using this structure is much faster than querying the DB and therefore
164/// is preferred for per-packet path-finding computations.
165/// Per default, the graph does not track channels in the `Closed` state and therefore
166/// cannot detect channel re-openings.
167///
168/// When a node reaches zero [quality](Node) and there are no edges (channels) containing this node,
169/// it is removed from the graph entirely.
170#[cfg_attr(feature = "serde", cfg_eval::cfg_eval, serde_with::serde_as)]
171#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
172#[derive(Clone, Debug)]
173pub struct ChannelGraph {
174    me: Address,
175    #[cfg_attr(feature = "serde", serde_as(as = "Vec<(_, _)>"))]
176    indices: HashMap<Address, u32>,
177    graph: StableDiGraph<Node, ChannelEdge>,
178    cfg: ChannelGraphConfig,
179}
180
181impl ChannelGraph {
182    /// The maximum number of intermediate hops the automatic path-finding algorithm will look for.
183    pub const INTERMEDIATE_HOPS: usize = 3;
184
185    /// Creates a new instance with the given self `Address`.
186    pub fn new(me: Address, cfg: ChannelGraphConfig) -> Self {
187        #[cfg(all(feature = "prometheus", not(test)))]
188        {
189            lazy_static::initialize(&METRIC_NUMBER_OF_CHANNELS);
190        }
191
192        let mut ret = Self {
193            me,
194            cfg,
195            indices: HashMap::new(),
196            graph: StableDiGraph::default(),
197        };
198        ret.indices.insert(
199            me,
200            ret.graph
201                .add_node(Node {
202                    address: me,
203                    node_score: 1.0,
204                    latency: SingleSumSMA::new_with_samples(
205                        cfg.latency_sma_window_length,
206                        vec![Duration::ZERO; cfg.latency_sma_window_length],
207                    ),
208                })
209                .index() as u32,
210        );
211        ret
212    }
213
214    /// Number of channels (edges) in the graph.
215    pub fn count_channels(&self) -> usize {
216        self.graph.edge_count()
217    }
218
219    /// Number of nodes in the graph.
220    pub fn count_nodes(&self) -> usize {
221        self.graph.node_count()
222    }
223
224    /// Checks if the channel is incoming to or outgoing from this node
225    pub fn is_own_channel(&self, channel: &ChannelEntry) -> bool {
226        channel.destination == self.me || channel.source == self.me
227    }
228
229    /// Convenience method to get this node's own address
230    pub fn my_address(&self) -> Address {
231        self.me
232    }
233
234    fn get_edge(&self, src: &Address, dst: &Address) -> Option<petgraph::stable_graph::EdgeReference<'_, ChannelEdge>> {
235        let (src_idx, dst_idx) = self
236            .indices
237            .get(src)
238            .and_then(|src| self.indices.get(dst).map(|dst| (*src, *dst)))?;
239        self.graph.edges_connecting(src_idx.into(), dst_idx.into()).next()
240    }
241
242    /// Looks up an `Open` or `PendingToClose` channel given the source and destination.
243    /// Returns `None` if no such edge exists in the graph.
244    pub fn get_channel(&self, source: &Address, destination: &Address) -> Option<&ChannelEntry> {
245        self.get_edge(source, destination).map(|e| &e.weight().channel)
246    }
247
248    /// Gets the node information.
249    /// Returns `None` if no such node exists in the graph.
250    pub fn get_node(&self, node: &Address) -> Option<&Node> {
251        self.indices
252            .get(node)
253            .and_then(|index| self.graph.node_weight((*index).into()))
254    }
255
256    /// Gets all `Open` outgoing channels going from the given [source](Address).
257    pub fn open_channels_from(&self, source: Address) -> impl Iterator<Item = (&Node, &ChannelEdge)> {
258        // If the source does not exist, select an impossible index to result in empty iterator.
259        let idx = self
260            .indices
261            .get(&source)
262            .cloned()
263            .unwrap_or(self.graph.node_count() as u32);
264        self.graph
265            .edges_directed(idx.into(), Direction::Outgoing)
266            .filter(|c| c.weight().channel.status == ChannelStatus::Open)
267            .map(|e| (&self.graph[e.target()], e.weight()))
268    }
269
270    /// Checks whether there is any path via Open channels that connects `source` and `destination`
271    /// This does not need to be necessarily a multi-hop path.
272    pub fn has_path(&self, source: &Address, destination: &Address) -> bool {
273        let only_open_graph = EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
274        if let Some((src_idx, dst_idx)) = self
275            .indices
276            .get(source)
277            .and_then(|src| self.indices.get(destination).map(|dst| (*src, *dst)))
278        {
279            has_path_connecting(&only_open_graph, src_idx.into(), dst_idx.into(), None)
280        } else {
281            false
282        }
283    }
284
285    /// Inserts or updates the given channel in the channel graph.
286    /// Returns a set of changes if the channel was already present in the graphs or
287    /// None if the channel was not previously present in the channel graph.
288    pub fn update_channel(&mut self, channel: ChannelEntry) -> Option<Vec<ChannelChange>> {
289        #[cfg(all(feature = "prometheus", not(test)))]
290        {
291            if let Some(direction) = channel.direction(&self.me) {
292                match direction {
293                    ChannelDirection::Outgoing => match channel.status {
294                        ChannelStatus::Closed => {
295                            METRIC_NUMBER_OF_CHANNELS.decrement(&["out"], 1.0);
296                        }
297                        ChannelStatus::Open => {
298                            METRIC_NUMBER_OF_CHANNELS.increment(&["out"], 1.0);
299                        }
300                        ChannelStatus::PendingToClose(_) => {}
301                    },
302                    ChannelDirection::Incoming => match channel.status {
303                        ChannelStatus::Closed => {
304                            METRIC_NUMBER_OF_CHANNELS.decrement(&["in"], 1.0);
305                        }
306                        ChannelStatus::Open => {
307                            METRIC_NUMBER_OF_CHANNELS.increment(&["in"], 1.0);
308                        }
309                        ChannelStatus::PendingToClose(_) => {}
310                    },
311                }
312            }
313        }
314
315        let maybe_edge_id = self.get_edge(&channel.source, &channel.destination).map(|e| e.id());
316
317        // Remove the edge since we don't allow Closed channels
318        if channel.status == ChannelStatus::Closed {
319            return maybe_edge_id
320                .and_then(|id| self.graph.remove_edge(id))
321                .inspect(|c| debug!("removed {}", c.channel))
322                .map(|old_value| ChannelChange::diff_channels(&old_value.channel, &channel));
323        }
324
325        // If an edge already exists, update it and compute ChannelDiff
326        if let Some(old_value) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
327            let old_channel = old_value.channel;
328            old_value.channel = channel;
329
330            let ret = ChannelChange::diff_channels(&old_channel, &channel);
331            debug!(
332                "updated {channel}: {}",
333                ret.iter().map(ChannelChange::to_string).collect::<Vec<_>>().join(",")
334            );
335            Some(ret)
336        } else {
337            // Otherwise, create a new edge and add the nodes with 0 quality if they don't yet exist
338            let src = *self.indices.entry(channel.source).or_insert_with(|| {
339                self.graph
340                    .add_node(Node::new(channel.source, self.cfg.latency_sma_window_length))
341                    .index() as u32
342            });
343
344            let dst = *self.indices.entry(channel.destination).or_insert_with(|| {
345                self.graph
346                    .add_node(Node::new(channel.destination, self.cfg.latency_sma_window_length))
347                    .index() as u32
348            });
349
350            let weighted = ChannelEdge {
351                channel,
352                edge_score: None,
353            };
354
355            self.graph.add_edge(src.into(), dst.into(), weighted);
356            debug!("new {channel}");
357
358            None
359        }
360    }
361
362    /// Updates the quality of a node (inserting it into the graph if it does not exist yet),
363    /// based on the given [`NodeScoreUpdate`].
364    pub fn update_node_score(&mut self, address: &Address, score_update: NodeScoreUpdate) {
365        if !self.me.eq(address) {
366            match self.indices.entry(*address) {
367                // The node exists
368                Entry::Occupied(existing) => {
369                    let existing_idx: NodeIndex = (*existing.get()).into();
370                    // NOTE: we cannot remove offline nodes that still have edges,
371                    // as we would lose the ability to track changes on those edges if they
372                    // were removed early.
373                    if score_update != NodeScoreUpdate::Unreachable
374                        || self.graph.neighbors_undirected(existing_idx).count() > 0
375                    {
376                        // We are for sure updating to a greater-than-zero score
377                        if let Some(node) = self.graph.node_weight_mut(existing_idx) {
378                            let updated_quality = node.update_score(score_update, self.cfg);
379                            debug!(%address, updated_quality, "updated node quality");
380                        } else {
381                            // This should not happen
382                            warn!(%address, "removed dangling node index from channel graph");
383                            existing.remove();
384                        }
385                    } else {
386                        // If the node has no neighbors, is unreachable, and reached very low
387                        // score, just remove it from the graph
388                        if self
389                            .graph
390                            .node_weight_mut(existing_idx)
391                            .map(|node| node.update_score(score_update, self.cfg))
392                            .is_some_and(|updated_quality| updated_quality < self.cfg.offline_node_score_threshold)
393                        {
394                            self.graph.remove_node(existing.remove().into());
395                            debug!(%address, "removed offline node with no channels");
396                        }
397                    }
398                }
399                // The node does not exist, and we are updating to greater-than-zero quality
400                Entry::Vacant(new_node) if score_update != NodeScoreUpdate::Unreachable => {
401                    let mut inserted_node = Node::new(*address, self.cfg.latency_sma_window_length);
402                    let updated_quality = inserted_node.update_score(score_update, self.cfg);
403                    new_node.insert(self.graph.add_node(inserted_node).index() as u32);
404                    debug!(%address, updated_quality, "added new node");
405                }
406                // We do not want to add unreachable nodes to the graph, so do nothing otherwise
407                Entry::Vacant(_) => {}
408            }
409        }
410    }
411
412    /// Updates the score value of network connection between `source` and `destination`
413    /// The given score value must always be non-negative.
414    pub fn update_channel_score(&mut self, source: &Address, destination: &Address, score: f64) {
415        assert!(score >= 0_f64, "score must be non-negative");
416        let maybe_edge_id = self.get_edge(source, destination).map(|e| e.id());
417        if let Some(channel) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
418            if score != channel.edge_score.unwrap_or(-1_f64) {
419                channel.edge_score = Some(score);
420                debug!("updated score of {} to {score}", channel.channel);
421            }
422        }
423    }
424
425    /// Gets quality of the given channel. Returns `None` if no such channel exists, or no
426    /// quality has been set for that channel.
427    pub fn get_channel_score(&self, source: &Address, destination: &Address) -> Option<f64> {
428        self.get_edge(source, destination)
429            .and_then(|e| self.graph.edge_weight(e.id()))
430            .and_then(|e| e.edge_score)
431    }
432
433    /// Checks whether the given channel is in the graph already.
434    pub fn contains_channel(&self, channel: &ChannelEntry) -> bool {
435        self.get_channel(&channel.source, &channel.destination).is_some()
436    }
437
438    /// Checks whether the given node is in the channel graph.
439    pub fn contains_node(&self, address: &Address) -> bool {
440        self.get_node(address).is_some()
441    }
442
443    /// Outputs the channel graph in the DOT (graphviz) format with the given `config`.
444    pub fn as_dot(&self, cfg: GraphExportConfig) -> String {
445        if cfg.ignore_disconnected_components {
446            let only_open_graph =
447                EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
448
449            let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
450
451            Dot::new(&NodeFiltered::from_fn(&self.graph, |n| {
452                // Include only nodes that have non-zero quality,
453                // and there is a path to them in an Open channel graph
454                self.graph.node_weight(n).is_some_and(|n| n.node_score > 0_f64)
455                    && has_path_connecting(&only_open_graph, me_idx, n, None)
456            }))
457            .to_string()
458        } else if cfg.ignore_non_opened_channels {
459            // Keep nodes that have at least one incoming or outgoing Open channel
460            Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
461                self.graph
462                    .edges_directed(a, Direction::Outgoing)
463                    .any(|e| e.weight().channel.status == ChannelStatus::Open)
464                    || self
465                        .graph
466                        .edges_directed(a, Direction::Incoming)
467                        .any(|e| e.weight().channel.status == ChannelStatus::Open)
468            }))
469            .to_string()
470        } else if cfg.only_3_hop_accessible_nodes {
471            // Keep only those nodes that are accessible from via less than 3 hop paths
472            let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
473            let distances = petgraph::algo::dijkstra(&self.graph, me_idx, None, |e| {
474                if e.weight().channel.status == ChannelStatus::Open {
475                    1
476                } else {
477                    100
478                }
479            });
480
481            Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
482                distances.get(&a).map(|d| *d <= 3).unwrap_or(false)
483            }))
484            .to_string()
485        } else {
486            Dot::new(&self.graph).to_string()
487        }
488    }
489}
490
491/// Configuration for the DOT export of the [`ChannelGraph`].
492///
493/// See [`ChannelGraph::as_dot`].
494#[derive(Clone, Debug, Default, Eq, PartialEq)]
495pub struct GraphExportConfig {
496    /// If set, nodes that are not connected to this node (via open channels) will not be exported.
497    ///
498    /// This setting automatically implies `ignore_non_opened_channels`.
499    pub ignore_disconnected_components: bool,
500    /// Do not export channels that are not in the [`ChannelStatus::Open`] state.
501    pub ignore_non_opened_channels: bool,
502    /// Show only nodes that are accessible via 3-hops (via open channels) from this node.
503    pub only_3_hop_accessible_nodes: bool,
504}
505
506#[cfg(test)]
507mod tests {
508    use std::{
509        ops::Add,
510        time::{Duration, SystemTime},
511    };
512
513    use anyhow::{Context, anyhow};
514    use hopr_internal_types::channels::{ChannelChange, ChannelStatus};
515    use hopr_primitive_types::prelude::*;
516
517    use super::*;
518    use crate::{
519        channel_graph::ChannelGraph,
520        tests::{ADDRESSES, dummy_channel},
521    };
522
523    #[test]
524    fn channel_graph_self_addr() {
525        let cg = ChannelGraph::new(ADDRESSES[0], Default::default());
526        assert_eq!(ADDRESSES[0], cg.my_address(), "must produce correct self address");
527
528        assert!(cg.contains_node(&ADDRESSES[0]), "must contain self address");
529
530        assert_eq!(
531            cg.get_node(&ADDRESSES[0]).cloned(),
532            Some(Node {
533                address: ADDRESSES[0],
534                node_score: 1.0,
535                latency: SingleSumSMA::new_with_samples(
536                    cg.cfg.latency_sma_window_length,
537                    vec![Duration::ZERO; cg.cfg.latency_sma_window_length]
538                )
539            }),
540            "must contain self node with quality 1"
541        );
542    }
543
544    #[test]
545    fn channel_graph_has_path() {
546        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
547
548        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
549        cg.update_channel(c);
550
551        assert!(cg.contains_channel(&c), "must contain channel");
552
553        assert!(cg.contains_node(&ADDRESSES[0]), "must contain channel source");
554
555        assert!(cg.contains_node(&ADDRESSES[1]), "must contain channel destination");
556
557        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]), "must have simple path");
558
559        assert!(
560            !cg.has_path(&ADDRESSES[0], &ADDRESSES[2]),
561            "must not have non existent path"
562        );
563    }
564
565    #[test]
566    fn channel_graph_update_node_quality() -> anyhow::Result<()> {
567        let mut cg = ChannelGraph::new(
568            ADDRESSES[0],
569            ChannelGraphConfig {
570                node_score_step_up: 0.1,
571                node_score_decay: 4.0,
572                offline_node_score_threshold: 0.1,
573                ..Default::default()
574            },
575        );
576
577        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(100)));
578        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
579        assert_eq!(node.node_score, 0.1);
580        assert_eq!(node.latency.average(), Some(Duration::from_millis(100)));
581
582        assert!(!cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
583
584        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
585        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
586        assert_eq!(node.node_score, 0.2);
587        assert_eq!(node.latency.average(), Some(Duration::from_millis(75)));
588
589        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(30)));
590        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(20)));
591
592        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
593        assert_eq!(node.node_score, 0.4);
594        assert_eq!(node.latency.average(), Some(Duration::from_millis(50)));
595
596        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
597        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
598        assert_eq!(node.node_score, 0.1);
599        assert!(node.latency.average().is_none());
600
601        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
602
603        // At this point the node will be removed because there are no channels with it
604        assert_eq!(cg.get_node(&ADDRESSES[1]), None);
605
606        Ok(())
607    }
608
609    #[test]
610    fn channel_graph_update_node_quality_should_not_remove_nodes_with_zero_quality_and_path() -> anyhow::Result<()> {
611        let mut cg = ChannelGraph::new(
612            ADDRESSES[0],
613            ChannelGraphConfig {
614                node_score_step_up: 0.1,
615                node_score_decay: 4.0,
616                offline_node_score_threshold: 0.1,
617                ..Default::default()
618            },
619        );
620
621        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
622        cg.update_channel(c);
623
624        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
625        assert_eq!(node.address, ADDRESSES[1]);
626        assert_eq!(node.node_score, 0.0);
627        assert!(node.latency.is_empty());
628
629        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
630
631        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
632        assert_eq!(node.address, ADDRESSES[1]);
633        assert_eq!(node.node_score, 0.1);
634
635        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
636        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
637        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
638
639        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
640        assert_eq!(node.address, ADDRESSES[1]);
641        assert_eq!(node.node_score, 0.4);
642
643        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
644
645        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
646
647        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
648        assert_eq!(node.address, ADDRESSES[1]);
649        assert_eq!(node.node_score, 0.1);
650
651        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
652
653        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
654
655        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
656        assert_eq!(node.address, ADDRESSES[1]);
657        assert_eq!(node.node_score, 0.0);
658
659        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
660
661        Ok(())
662    }
663
664    #[test]
665    fn channel_graph_update_channel_score() -> anyhow::Result<()> {
666        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
667
668        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
669        cg.update_channel(c);
670
671        assert!(cg.contains_channel(&c), "must contain channel");
672        assert!(
673            cg.get_channel_score(&ADDRESSES[0], &ADDRESSES[1]).is_none(),
674            "must start with no quality info"
675        );
676
677        cg.update_channel_score(&ADDRESSES[0], &ADDRESSES[1], 0.5_f64);
678
679        let q = cg
680            .get_channel_score(&ADDRESSES[0], &ADDRESSES[1])
681            .context("must have quality when set")?;
682        assert_eq!(0.5_f64, q, "quality must be equal");
683
684        Ok(())
685    }
686
687    #[test]
688    fn channel_graph_is_own_channel() {
689        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
690
691        let c1 = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
692        let c2 = dummy_channel(ADDRESSES[1], ADDRESSES[2], ChannelStatus::Open);
693        cg.update_channel(c1);
694        cg.update_channel(c2);
695
696        assert!(cg.is_own_channel(&c1), "must detect as own channel");
697        assert!(!cg.is_own_channel(&c2), "must not detect as own channel");
698    }
699
700    #[test]
701    fn channel_graph_update_changes() -> anyhow::Result<()> {
702        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
703
704        let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
705
706        let changes = cg.update_channel(c);
707        assert!(changes.is_none(), "must not produce changes for a new channel");
708
709        let cr = cg
710            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
711            .context("must contain channel")?;
712        assert!(c.eq(cr), "channels must be equal");
713
714        let ts = SystemTime::now().add(Duration::from_secs(10));
715        c.balance = 0.into();
716        c.status = ChannelStatus::PendingToClose(ts);
717        let changes = cg.update_channel(c).context("should contain channel changes")?;
718        assert_eq!(2, changes.len(), "must contain 2 changes");
719
720        for change in changes {
721            match change {
722                ChannelChange::Status { left, right } => {
723                    assert_eq!(ChannelStatus::Open, left, "previous status does not match");
724                    assert_eq!(ChannelStatus::PendingToClose(ts), right, "new status does not match");
725                }
726                ChannelChange::CurrentBalance { left, right } => {
727                    assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
728                    assert_eq!(HoprBalance::zero(), right, "new balance does not match");
729                }
730                _ => panic!("unexpected change"),
731            }
732        }
733
734        let cr = cg
735            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
736            .context("must contain channel")?;
737        assert!(c.eq(cr), "channels must be equal");
738
739        Ok(())
740    }
741
742    #[test]
743    fn channel_graph_update_changes_on_close() -> anyhow::Result<()> {
744        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
745
746        let ts = SystemTime::now().add(Duration::from_secs(10));
747        let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::PendingToClose(ts));
748
749        let changes = cg.update_channel(c);
750        assert!(changes.is_none(), "must not produce changes for a new channel");
751
752        let cr = cg
753            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
754            .context("must contain channel")?;
755        assert!(c.eq(cr), "channels must be equal");
756
757        c.balance = 0.into();
758        c.status = ChannelStatus::Closed;
759        let changes = cg.update_channel(c).context("must contain changes")?;
760        assert_eq!(2, changes.len(), "must contain 2 changes");
761
762        for change in changes {
763            match change {
764                ChannelChange::Status { left, right } => {
765                    assert_eq!(
766                        ChannelStatus::PendingToClose(ts),
767                        left,
768                        "previous status does not match"
769                    );
770                    assert_eq!(ChannelStatus::Closed, right, "new status does not match");
771                }
772                ChannelChange::CurrentBalance { left, right } => {
773                    assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
774                    assert_eq!(HoprBalance::zero(), right, "new balance does not match");
775                }
776                _ => panic!("unexpected change"),
777            }
778        }
779
780        let cr = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
781        assert!(cr.is_none(), "must not contain channel after closing");
782
783        Ok(())
784    }
785
786    #[test]
787    fn channel_graph_update_should_not_allow_closed_channels() {
788        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
789        let changes = cg.update_channel(dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Closed));
790        assert!(changes.is_none(), "must not produce changes for a closed channel");
791
792        let c = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
793        assert!(c.is_none(), "must not allow adding closed channels");
794    }
795
796    #[test]
797    fn channel_graph_update_should_allow_pending_to_close_channels() -> anyhow::Result<()> {
798        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
799        let ts = SystemTime::now().add(Duration::from_secs(10));
800        let changes = cg.update_channel(dummy_channel(
801            ADDRESSES[0],
802            ADDRESSES[1],
803            ChannelStatus::PendingToClose(ts),
804        ));
805        assert!(changes.is_none(), "must not produce changes for a closed channel");
806
807        cg.get_channel(&ADDRESSES[0], &ADDRESSES[1])
808            .context("must allow PendingToClose channels")?;
809
810        Ok(())
811    }
812}