hopr_path/
channel_graph.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    fmt::{Debug, Formatter},
4    time::Duration,
5};
6
7use hopr_internal_types::prelude::*;
8use hopr_primitive_types::{prelude::SMA, primitives::Address, sma::SingleSumSMA};
9use petgraph::{
10    Direction,
11    algo::has_path_connecting,
12    dot::Dot,
13    prelude::StableDiGraph,
14    stable_graph::NodeIndex,
15    visit::{EdgeFiltered, EdgeRef, NodeFiltered},
16};
17use tracing::{debug, warn};
18#[cfg(all(feature = "prometheus", not(test)))]
19use {
20    hopr_internal_types::channels::ChannelDirection, hopr_metrics::metrics::MultiGauge,
21    hopr_primitive_types::traits::ToHex,
22};
23
// The Prometheus metrics below are compiled in only when the `prometheus` feature
// is enabled and the crate is not being built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge with one series per `direction` label value; `ChannelGraph::update_channel`
    // uses the label values "in" and "out".
    static ref METRIC_NUMBER_OF_CHANNELS: MultiGauge = MultiGauge::new(
        "hopr_channels_count",
        "Number of channels per direction",
        &["direction"]
    ).unwrap();
    // Gauge with one series per (`counterparty`, `direction`) label pair; the counterparty
    // label is the hex representation of the peer's address.
    static ref METRIC_CHANNEL_BALANCES: MultiGauge = MultiGauge::new(
        "hopr_channel_balances",
        "Balances on channels per counterparty",
        &["counterparty", "direction"]
    ).unwrap();
}
37
/// Structure that adds additional data to a `ChannelEntry`, which
/// can be used to compute edge weights and traverse the `ChannelGraph`.
///
/// This is the edge weight type stored inside the [`ChannelGraph`].
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ChannelEdge {
    /// Underlying channel
    pub channel: ChannelEntry,
    /// Optional scoring of the edge that might be used for path planning.
    ///
    /// Stays `None` for a freshly inserted channel until a score is assigned
    /// via [`ChannelGraph::update_channel_score`].
    pub edge_score: Option<f64>,
}
48
49impl std::fmt::Display for ChannelEdge {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        write!(
52            f,
53            "{}; stake={}; score={:?}; status={};",
54            self.channel,
55            self.channel.balance,
56            self.edge_score,
57            self.channel.status.to_string().to_lowercase()
58        )
59    }
60}
61
/// Represents a node in the Channel Graph.
/// This is typically represented by an on-chain address and ping quality, which
/// represents some kind of node's liveness as perceived by us.
///
/// This is the node weight type stored inside the [`ChannelGraph`].
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Node {
    /// Node's on-chain address.
    pub address: Address,
    /// Liveness of the node.
    ///
    /// Starts at `0.0` for nodes created via [`Node::new`]; [`Node::update_score`]
    /// never raises it above `1.0`.
    pub node_score: f64,
    /// Average node latency
    ///
    /// Simple moving average over the most recent latency samples; the samples are
    /// cleared whenever the node is reported as unreachable.
    pub latency: SingleSumSMA<std::time::Duration, u32>,
}
75
76impl Node {
77    pub fn new(address: Address, latency_window_length: usize) -> Self {
78        Self {
79            address,
80            node_score: 0.0,
81            latency: SingleSumSMA::new(latency_window_length),
82        }
83    }
84
85    /// Update the score using the [`NodeScoreUpdate`] and [`ChannelGraphConfig`].
86    ///
87    /// The function will ensure additive (slow) ramp-up, but exponential (fast)
88    /// ramp-down of the node's score, depending on whether it was reachable or not.
89    /// The ramp-down has a cut-off at `offline_node_score_threshold`, below which
90    /// is the score set to zero.
91    pub fn update_score(&mut self, score_update: NodeScoreUpdate, cfg: ChannelGraphConfig) -> f64 {
92        match score_update {
93            NodeScoreUpdate::Reachable(latency) => {
94                self.node_score = 1.0_f64.min(self.node_score + cfg.node_score_step_up);
95                self.latency.push(latency);
96            }
97            NodeScoreUpdate::Unreachable => {
98                self.node_score /= cfg.node_score_decay;
99                self.latency.clear();
100                if self.node_score < cfg.offline_node_score_threshold {
101                    self.node_score = 0.0;
102                }
103            }
104            NodeScoreUpdate::Initialize(latency, node_score) => {
105                self.latency.clear();
106                self.latency.push(latency);
107                self.node_score = node_score.clamp(0.0, 1.0);
108            }
109        }
110        self.node_score
111    }
112}
113
114impl std::fmt::Display for Node {
115    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116        write!(f, "{}; score={}", self.address, self.node_score)
117    }
118}
119
/// Configuration for the [`ChannelGraph`].
///
/// The values are consumed by [`Node::update_score`] and by the [`ChannelGraph`]
/// when it creates nodes and decides whether an offline node can be dropped.
#[derive(Clone, Copy, Debug, PartialEq, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ChannelGraphConfig {
    /// Length of the Simple Moving Average window for node latencies.
    #[default(20)]
    pub latency_sma_window_length: usize,
    /// Additive node score modifier when the node is reachable.
    ///
    /// The resulting score is capped at `1.0`.
    #[default(0.1)]
    pub node_score_step_up: f64,
    /// Node score divisor when the node is unreachable.
    ///
    /// NOTE(review): the value is used as a plain divisor without validation, so it
    /// presumably must be greater than `1.0` for the score to actually decay — confirm.
    #[default(4.0)]
    pub node_score_decay: f64,
    /// If a node is unreachable and because of that it reaches a score
    /// lower than this threshold, it is considered offline (and in some situations
    /// can be removed from the graph).
    #[default(0.1)]
    pub offline_node_score_threshold: f64,
}
139
/// Describes an update of the [`Node`]'s score.
///
/// Can be built from a `Result<Duration, _>`: `Ok(latency)` maps to
/// [`NodeScoreUpdate::Reachable`] and any `Err` maps to [`NodeScoreUpdate::Unreachable`].
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum NodeScoreUpdate {
    /// Node is reachable with the given latency.
    Reachable(Duration),
    /// Node is unreachable.
    Unreachable,
    /// Initializes a node's score to the given latency and quality.
    /// This is useful during loading data from the persistent storage.
    ///
    /// The quality is clamped to at most `1.0` and at least `0.0` when applied.
    Initialize(Duration, f64),
}
152
153impl<T> From<Result<Duration, T>> for NodeScoreUpdate {
154    fn from(result: Result<Duration, T>) -> Self {
155        match result {
156            Ok(duration) => NodeScoreUpdate::Reachable(duration),
157            Err(_) => NodeScoreUpdate::Unreachable,
158        }
159    }
160}
161
/// Implements a HOPR payment channel graph (directed) cached in-memory.
///
/// This structure is useful for tracking channel state changes and
/// packet path finding.
///
/// The edges are updated only from the Indexer, and therefore the graph contains only
/// the channels *seen* on-chain.
/// The nodes and their qualities are updated as they are observed on the network.
///
/// Using this structure is much faster than querying the DB and therefore
/// is preferred for per-packet path-finding computations.
/// Per default, the graph does not track channels in the `Closed` state and therefore
/// cannot detect channel re-openings.
///
/// When a node reaches zero [quality](Node) and there are no edges (channels) containing this node,
/// it is removed from the graph entirely.
#[cfg_attr(feature = "serde", cfg_eval::cfg_eval, serde_with::serde_as)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug)]
pub struct ChannelGraph {
    // Address of this node; its graph node is inserted on construction.
    me: Address,
    // Maps an address to the index of its node in `graph`, stored as `u32`.
    // Serialized as a list of pairs when the `serde` feature is enabled.
    #[cfg_attr(feature = "serde", serde_as(as = "Vec<(_, _)>"))]
    indices: HashMap<Address, u32>,
    // Directed graph with `Node` weights on nodes and `ChannelEdge` weights on edges.
    graph: StableDiGraph<Node, ChannelEdge>,
    // Configuration applied to node creation and node score updates.
    cfg: ChannelGraphConfig,
}
188
189impl ChannelGraph {
    /// The maximum number of intermediate hops the automatic path-finding algorithm will look for.
    ///
    /// NOTE(review): not referenced in the visible part of this file; presumably consumed by
    /// the path-finding code elsewhere — confirm against callers.
    pub const INTERMEDIATE_HOPS: usize = 3;
192
193    /// Creates a new instance with the given self `Address`.
194    pub fn new(me: Address, cfg: ChannelGraphConfig) -> Self {
195        #[cfg(all(feature = "prometheus", not(test)))]
196        {
197            lazy_static::initialize(&METRIC_NUMBER_OF_CHANNELS);
198        }
199
200        let mut ret = Self {
201            me,
202            cfg,
203            indices: HashMap::new(),
204            graph: StableDiGraph::default(),
205        };
206        ret.indices.insert(
207            me,
208            ret.graph
209                .add_node(Node {
210                    address: me,
211                    node_score: 1.0,
212                    latency: SingleSumSMA::new_with_samples(
213                        cfg.latency_sma_window_length,
214                        vec![Duration::ZERO; cfg.latency_sma_window_length],
215                    ),
216                })
217                .index() as u32,
218        );
219        ret
220    }
221
    /// Number of channels (edges) in the graph.
    ///
    /// Channels reported as `Closed` are removed by `update_channel`, so they are not counted.
    pub fn count_channels(&self) -> usize {
        self.graph.edge_count()
    }
226
    /// Number of nodes in the graph.
    ///
    /// The count includes this node itself, which is inserted when the graph is created.
    pub fn count_nodes(&self) -> usize {
        self.graph.node_count()
    }
231
232    /// Checks if the channel is incoming to or outgoing from this node
233    pub fn is_own_channel(&self, channel: &ChannelEntry) -> bool {
234        channel.destination == self.me || channel.source == self.me
235    }
236
    /// Convenience method to get this node's own address
    /// (the address the graph was constructed with).
    pub fn my_address(&self) -> Address {
        self.me
    }
241
242    fn get_edge(&self, src: &Address, dst: &Address) -> Option<petgraph::stable_graph::EdgeReference<ChannelEdge>> {
243        let (src_idx, dst_idx) = self
244            .indices
245            .get(src)
246            .and_then(|src| self.indices.get(dst).map(|dst| (*src, *dst)))?;
247        self.graph.edges_connecting(src_idx.into(), dst_idx.into()).next()
248    }
249
250    /// Looks up an `Open` or `PendingToClose` channel given the source and destination.
251    /// Returns `None` if no such edge exists in the graph.
252    pub fn get_channel(&self, source: &Address, destination: &Address) -> Option<&ChannelEntry> {
253        self.get_edge(source, destination).map(|e| &e.weight().channel)
254    }
255
256    /// Gets the node information.
257    /// Returns `None` if no such node exists in the graph.
258    pub fn get_node(&self, node: &Address) -> Option<&Node> {
259        self.indices
260            .get(node)
261            .and_then(|index| self.graph.node_weight((*index).into()))
262    }
263
264    /// Gets all `Open` outgoing channels going from the given [source](Address).
265    pub fn open_channels_from(&self, source: Address) -> impl Iterator<Item = (&Node, &ChannelEdge)> {
266        // If the source does not exist, select an impossible index to result in empty iterator.
267        let idx = self
268            .indices
269            .get(&source)
270            .cloned()
271            .unwrap_or(self.graph.node_count() as u32);
272        self.graph
273            .edges_directed(idx.into(), Direction::Outgoing)
274            .filter(|c| c.weight().channel.status == ChannelStatus::Open)
275            .map(|e| (&self.graph[e.target()], e.weight()))
276    }
277
278    /// Checks whether there is any path via Open channels that connects `source` and `destination`
279    /// This does not need to be necessarily a multi-hop path.
280    pub fn has_path(&self, source: &Address, destination: &Address) -> bool {
281        let only_open_graph = EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
282        if let Some((src_idx, dst_idx)) = self
283            .indices
284            .get(source)
285            .and_then(|src| self.indices.get(destination).map(|dst| (*src, *dst)))
286        {
287            has_path_connecting(&only_open_graph, src_idx.into(), dst_idx.into(), None)
288        } else {
289            false
290        }
291    }
292
293    /// Inserts or updates the given channel in the channel graph.
294    /// Returns a set of changes if the channel was already present in the graphs or
295    /// None if the channel was not previously present in the channel graph.
296    pub fn update_channel(&mut self, channel: ChannelEntry) -> Option<Vec<ChannelChange>> {
297        #[cfg(all(feature = "prometheus", not(test)))]
298        {
299            if let Some(direction) = channel.direction(&self.me) {
300                match direction {
301                    ChannelDirection::Outgoing => match channel.status {
302                        ChannelStatus::Closed => {
303                            METRIC_NUMBER_OF_CHANNELS.decrement(&["out"], 1.0);
304                            METRIC_CHANNEL_BALANCES.set(&[channel.destination.to_hex().as_str(), "out"], 0.0);
305                        }
306                        ChannelStatus::Open => {
307                            METRIC_NUMBER_OF_CHANNELS.increment(&["out"], 1.0);
308                            METRIC_CHANNEL_BALANCES.set(
309                                &[channel.destination.to_hex().as_str(), "out"],
310                                channel
311                                    .balance
312                                    .amount_in_base_units()
313                                    .parse::<f64>()
314                                    .unwrap_or(f64::INFINITY),
315                            );
316                        }
317                        ChannelStatus::PendingToClose(_) => {}
318                    },
319                    ChannelDirection::Incoming => match channel.status {
320                        ChannelStatus::Closed => {
321                            METRIC_NUMBER_OF_CHANNELS.decrement(&["in"], 1.0);
322                            METRIC_CHANNEL_BALANCES.set(&[channel.source.to_hex().as_str(), "in"], 0.0);
323                        }
324                        ChannelStatus::Open => {
325                            METRIC_NUMBER_OF_CHANNELS.increment(&["in"], 1.0);
326                            METRIC_CHANNEL_BALANCES.set(
327                                &[channel.source.to_hex().as_str(), "in"],
328                                channel
329                                    .balance
330                                    .amount_in_base_units()
331                                    .parse::<f64>()
332                                    .unwrap_or(f64::INFINITY),
333                            );
334                        }
335                        ChannelStatus::PendingToClose(_) => {}
336                    },
337                }
338            }
339        }
340
341        let maybe_edge_id = self.get_edge(&channel.source, &channel.destination).map(|e| e.id());
342
343        // Remove the edge since we don't allow Closed channels
344        if channel.status == ChannelStatus::Closed {
345            return maybe_edge_id
346                .and_then(|id| self.graph.remove_edge(id))
347                .inspect(|c| debug!("removed {}", c.channel))
348                .map(|old_value| ChannelChange::diff_channels(&old_value.channel, &channel));
349        }
350
351        // If an edge already exists, update it and compute ChannelDiff
352        if let Some(old_value) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
353            let old_channel = old_value.channel;
354            old_value.channel = channel;
355
356            let ret = ChannelChange::diff_channels(&old_channel, &channel);
357            debug!(
358                "updated {channel}: {}",
359                ret.iter().map(ChannelChange::to_string).collect::<Vec<_>>().join(",")
360            );
361            Some(ret)
362        } else {
363            // Otherwise, create a new edge and add the nodes with 0 quality if they don't yet exist
364            let src = *self.indices.entry(channel.source).or_insert_with(|| {
365                self.graph
366                    .add_node(Node::new(channel.source, self.cfg.latency_sma_window_length))
367                    .index() as u32
368            });
369
370            let dst = *self.indices.entry(channel.destination).or_insert_with(|| {
371                self.graph
372                    .add_node(Node::new(channel.destination, self.cfg.latency_sma_window_length))
373                    .index() as u32
374            });
375
376            let weighted = ChannelEdge {
377                channel,
378                edge_score: None,
379            };
380
381            self.graph.add_edge(src.into(), dst.into(), weighted);
382            debug!("new {channel}");
383
384            None
385        }
386    }
387
388    /// Updates the quality of a node (inserting it into the graph if it does not exist yet),
389    /// based on the given [`NodeScoreUpdate`].
390    pub fn update_node_score(&mut self, address: &Address, score_update: NodeScoreUpdate) {
391        if !self.me.eq(address) {
392            match self.indices.entry(*address) {
393                // The node exists
394                Entry::Occupied(existing) => {
395                    let existing_idx: NodeIndex = (*existing.get()).into();
396                    // NOTE: we cannot remove offline nodes that still have edges,
397                    // as we would lose the ability to track changes on those edges if they
398                    // were removed early.
399                    if score_update != NodeScoreUpdate::Unreachable
400                        || self.graph.neighbors_undirected(existing_idx).count() > 0
401                    {
402                        // We are for sure updating to a greater-than-zero score
403                        if let Some(node) = self.graph.node_weight_mut(existing_idx) {
404                            let updated_quality = node.update_score(score_update, self.cfg);
405                            debug!(%address, updated_quality, "updated node quality");
406                        } else {
407                            // This should not happen
408                            warn!(%address, "removed dangling node index from channel graph");
409                            existing.remove();
410                        }
411                    } else {
412                        // If the node has no neighbors, is unreachable, and reached very low
413                        // score, just remove it from the graph
414                        if self
415                            .graph
416                            .node_weight_mut(existing_idx)
417                            .map(|node| node.update_score(score_update, self.cfg))
418                            .is_some_and(|updated_quality| updated_quality < self.cfg.offline_node_score_threshold)
419                        {
420                            self.graph.remove_node(existing.remove().into());
421                            debug!(%address, "removed offline node with no channels");
422                        }
423                    }
424                }
425                // The node does not exist, and we are updating to greater-than-zero quality
426                Entry::Vacant(new_node) if score_update != NodeScoreUpdate::Unreachable => {
427                    let mut inserted_node = Node::new(*address, self.cfg.latency_sma_window_length);
428                    let updated_quality = inserted_node.update_score(score_update, self.cfg);
429                    new_node.insert(self.graph.add_node(inserted_node).index() as u32);
430                    debug!(%address, updated_quality, "added new node");
431                }
432                // We do not want to add unreachable nodes to the graph, so do nothing otherwise
433                Entry::Vacant(_) => {}
434            }
435        }
436    }
437
438    /// Updates the score value of network connection between `source` and `destination`
439    /// The given score value must always be non-negative.
440    pub fn update_channel_score(&mut self, source: &Address, destination: &Address, score: f64) {
441        assert!(score >= 0_f64, "score must be non-negative");
442        let maybe_edge_id = self.get_edge(source, destination).map(|e| e.id());
443        if let Some(channel) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
444            if score != channel.edge_score.unwrap_or(-1_f64) {
445                channel.edge_score = Some(score);
446                debug!("updated score of {} to {score}", channel.channel);
447            }
448        }
449    }
450
451    /// Gets quality of the given channel. Returns `None` if no such channel exists, or no
452    /// quality has been set for that channel.
453    pub fn get_channel_score(&self, source: &Address, destination: &Address) -> Option<f64> {
454        self.get_edge(source, destination)
455            .and_then(|e| self.graph.edge_weight(e.id()))
456            .and_then(|e| e.edge_score)
457    }
458
459    /// Checks whether the given channel is in the graph already.
460    pub fn contains_channel(&self, channel: &ChannelEntry) -> bool {
461        self.get_channel(&channel.source, &channel.destination).is_some()
462    }
463
464    /// Checks whether the given node is in the channel graph.
465    pub fn contains_node(&self, address: &Address) -> bool {
466        self.get_node(address).is_some()
467    }
468
469    /// Outputs the channel graph in the DOT (graphviz) format with the given `config`.
470    pub fn as_dot(&self, cfg: GraphExportConfig) -> String {
471        if cfg.ignore_disconnected_components {
472            let only_open_graph =
473                EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
474
475            let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
476
477            Dot::new(&NodeFiltered::from_fn(&self.graph, |n| {
478                // Include only nodes that have non-zero quality,
479                // and there is a path to them in an Open channel graph
480                self.graph.node_weight(n).is_some_and(|n| n.node_score > 0_f64)
481                    && has_path_connecting(&only_open_graph, me_idx, n, None)
482            }))
483            .to_string()
484        } else if cfg.ignore_non_opened_channels {
485            // Keep nodes that have at least one incoming or outgoing Open channel
486            Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
487                self.graph
488                    .edges_directed(a, Direction::Outgoing)
489                    .any(|e| e.weight().channel.status == ChannelStatus::Open)
490                    || self
491                        .graph
492                        .edges_directed(a, Direction::Incoming)
493                        .any(|e| e.weight().channel.status == ChannelStatus::Open)
494            }))
495            .to_string()
496        } else if cfg.only_3_hop_accessible_nodes {
497            // Keep only those nodes that are accessible from via less than 3 hop paths
498            let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
499            let distances = petgraph::algo::dijkstra(&self.graph, me_idx, None, |e| {
500                if e.weight().channel.status == ChannelStatus::Open {
501                    1
502                } else {
503                    100
504                }
505            });
506
507            Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
508                distances.get(&a).map(|d| *d <= 3).unwrap_or(false)
509            }))
510            .to_string()
511        } else {
512            Dot::new(&self.graph).to_string()
513        }
514    }
515}
516
/// Configuration for the DOT export of the [`ChannelGraph`].
///
/// See [`ChannelGraph::as_dot`].
///
/// The flags are not combined: `as_dot` applies only the first one that is set, in the
/// order the fields are declared here. With all flags unset, the whole graph is exported.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct GraphExportConfig {
    /// If set, nodes that are not connected to this node (via open channels) will not be exported.
    /// Nodes with a zero score are left out as well.
    ///
    /// When this flag is set, the remaining flags are not consulted.
    pub ignore_disconnected_components: bool,
    /// Export only nodes that have at least one incoming or outgoing channel
    /// in the [`ChannelStatus::Open`] state.
    ///
    /// NOTE(review): the filter is applied to nodes, so a non-open channel between two
    /// exported nodes presumably still shows up in the output — confirm.
    pub ignore_non_opened_channels: bool,
    /// Show only nodes that are accessible via 3-hops (via open channels) from this node.
    pub only_3_hop_accessible_nodes: bool,
}
531
532#[cfg(test)]
533mod tests {
534    use std::{
535        ops::Add,
536        time::{Duration, SystemTime},
537    };
538
539    use anyhow::{Context, anyhow};
540    use hopr_internal_types::channels::{ChannelChange, ChannelStatus};
541    use hopr_primitive_types::prelude::*;
542
543    use super::*;
544    use crate::{
545        channel_graph::ChannelGraph,
546        tests::{ADDRESSES, dummy_channel},
547    };
548
549    #[test]
550    fn channel_graph_self_addr() {
551        let cg = ChannelGraph::new(ADDRESSES[0], Default::default());
552        assert_eq!(ADDRESSES[0], cg.my_address(), "must produce correct self address");
553
554        assert!(cg.contains_node(&ADDRESSES[0]), "must contain self address");
555
556        assert_eq!(
557            cg.get_node(&ADDRESSES[0]).cloned(),
558            Some(Node {
559                address: ADDRESSES[0],
560                node_score: 1.0,
561                latency: SingleSumSMA::new_with_samples(
562                    cg.cfg.latency_sma_window_length,
563                    vec![Duration::ZERO; cg.cfg.latency_sma_window_length]
564                )
565            }),
566            "must contain self node with quality 1"
567        );
568    }
569
570    #[test]
571    fn channel_graph_has_path() {
572        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
573
574        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
575        cg.update_channel(c);
576
577        assert!(cg.contains_channel(&c), "must contain channel");
578
579        assert!(cg.contains_node(&ADDRESSES[0]), "must contain channel source");
580
581        assert!(cg.contains_node(&ADDRESSES[1]), "must contain channel destination");
582
583        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]), "must have simple path");
584
585        assert!(
586            !cg.has_path(&ADDRESSES[0], &ADDRESSES[2]),
587            "must not have non existent path"
588        );
589    }
590
591    #[test]
592    fn channel_graph_update_node_quality() -> anyhow::Result<()> {
593        let mut cg = ChannelGraph::new(
594            ADDRESSES[0],
595            ChannelGraphConfig {
596                node_score_step_up: 0.1,
597                node_score_decay: 4.0,
598                offline_node_score_threshold: 0.1,
599                ..Default::default()
600            },
601        );
602
603        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(100)));
604        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
605        assert_eq!(node.node_score, 0.1);
606        assert_eq!(node.latency.average(), Some(Duration::from_millis(100)));
607
608        assert!(!cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
609
610        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
611        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
612        assert_eq!(node.node_score, 0.2);
613        assert_eq!(node.latency.average(), Some(Duration::from_millis(75)));
614
615        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(30)));
616        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(20)));
617
618        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
619        assert_eq!(node.node_score, 0.4);
620        assert_eq!(node.latency.average(), Some(Duration::from_millis(50)));
621
622        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
623        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
624        assert_eq!(node.node_score, 0.1);
625        assert!(node.latency.average().is_none());
626
627        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
628
629        // At this point the node will be removed because there are no channels with it
630        assert_eq!(cg.get_node(&ADDRESSES[1]), None);
631
632        Ok(())
633    }
634
635    #[test]
636    fn channel_graph_update_node_quality_should_not_remove_nodes_with_zero_quality_and_path() -> anyhow::Result<()> {
637        let mut cg = ChannelGraph::new(
638            ADDRESSES[0],
639            ChannelGraphConfig {
640                node_score_step_up: 0.1,
641                node_score_decay: 4.0,
642                offline_node_score_threshold: 0.1,
643                ..Default::default()
644            },
645        );
646
647        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
648        cg.update_channel(c);
649
650        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
651        assert_eq!(node.address, ADDRESSES[1]);
652        assert_eq!(node.node_score, 0.0);
653        assert!(node.latency.is_empty());
654
655        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
656
657        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
658        assert_eq!(node.address, ADDRESSES[1]);
659        assert_eq!(node.node_score, 0.1);
660
661        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
662        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
663        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
664
665        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
666        assert_eq!(node.address, ADDRESSES[1]);
667        assert_eq!(node.node_score, 0.4);
668
669        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
670
671        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
672
673        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
674        assert_eq!(node.address, ADDRESSES[1]);
675        assert_eq!(node.node_score, 0.1);
676
677        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
678
679        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
680
681        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
682        assert_eq!(node.address, ADDRESSES[1]);
683        assert_eq!(node.node_score, 0.0);
684
685        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
686
687        Ok(())
688    }
689
690    #[test]
691    fn channel_graph_update_channel_score() -> anyhow::Result<()> {
692        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
693
694        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
695        cg.update_channel(c);
696
697        assert!(cg.contains_channel(&c), "must contain channel");
698        assert!(
699            cg.get_channel_score(&ADDRESSES[0], &ADDRESSES[1]).is_none(),
700            "must start with no quality info"
701        );
702
703        cg.update_channel_score(&ADDRESSES[0], &ADDRESSES[1], 0.5_f64);
704
705        let q = cg
706            .get_channel_score(&ADDRESSES[0], &ADDRESSES[1])
707            .context("must have quality when set")?;
708        assert_eq!(0.5_f64, q, "quality must be equal");
709
710        Ok(())
711    }
712
713    #[test]
714    fn channel_graph_is_own_channel() {
715        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
716
717        let c1 = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
718        let c2 = dummy_channel(ADDRESSES[1], ADDRESSES[2], ChannelStatus::Open);
719        cg.update_channel(c1);
720        cg.update_channel(c2);
721
722        assert!(cg.is_own_channel(&c1), "must detect as own channel");
723        assert!(!cg.is_own_channel(&c2), "must not detect as own channel");
724    }
725
726    #[test]
727    fn channel_graph_update_changes() -> anyhow::Result<()> {
728        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
729
730        let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
731
732        let changes = cg.update_channel(c);
733        assert!(changes.is_none(), "must not produce changes for a new channel");
734
735        let cr = cg
736            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
737            .context("must contain channel")?;
738        assert!(c.eq(cr), "channels must be equal");
739
740        let ts = SystemTime::now().add(Duration::from_secs(10));
741        c.balance = 0.into();
742        c.status = ChannelStatus::PendingToClose(ts);
743        let changes = cg.update_channel(c).context("should contain channel changes")?;
744        assert_eq!(2, changes.len(), "must contain 2 changes");
745
746        for change in changes {
747            match change {
748                ChannelChange::Status { left, right } => {
749                    assert_eq!(ChannelStatus::Open, left, "previous status does not match");
750                    assert_eq!(ChannelStatus::PendingToClose(ts), right, "new status does not match");
751                }
752                ChannelChange::CurrentBalance { left, right } => {
753                    assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
754                    assert_eq!(HoprBalance::zero(), right, "new balance does not match");
755                }
756                _ => panic!("unexpected change"),
757            }
758        }
759
760        let cr = cg
761            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
762            .context("must contain channel")?;
763        assert!(c.eq(cr), "channels must be equal");
764
765        Ok(())
766    }
767
768    #[test]
769    fn channel_graph_update_changes_on_close() -> anyhow::Result<()> {
770        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
771
772        let ts = SystemTime::now().add(Duration::from_secs(10));
773        let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::PendingToClose(ts));
774
775        let changes = cg.update_channel(c);
776        assert!(changes.is_none(), "must not produce changes for a new channel");
777
778        let cr = cg
779            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
780            .context("must contain channel")?;
781        assert!(c.eq(cr), "channels must be equal");
782
783        c.balance = 0.into();
784        c.status = ChannelStatus::Closed;
785        let changes = cg.update_channel(c).context("must contain changes")?;
786        assert_eq!(2, changes.len(), "must contain 2 changes");
787
788        for change in changes {
789            match change {
790                ChannelChange::Status { left, right } => {
791                    assert_eq!(
792                        ChannelStatus::PendingToClose(ts),
793                        left,
794                        "previous status does not match"
795                    );
796                    assert_eq!(ChannelStatus::Closed, right, "new status does not match");
797                }
798                ChannelChange::CurrentBalance { left, right } => {
799                    assert_eq!(HoprBalance::from(1), left, "previous balance does not match");
800                    assert_eq!(HoprBalance::zero(), right, "new balance does not match");
801                }
802                _ => panic!("unexpected change"),
803            }
804        }
805
806        let cr = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
807        assert!(cr.is_none(), "must not contain channel after closing");
808
809        Ok(())
810    }
811
812    #[test]
813    fn channel_graph_update_should_not_allow_closed_channels() {
814        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
815        let changes = cg.update_channel(dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Closed));
816        assert!(changes.is_none(), "must not produce changes for a closed channel");
817
818        let c = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
819        assert!(c.is_none(), "must not allow adding closed channels");
820    }
821
822    #[test]
823    fn channel_graph_update_should_allow_pending_to_close_channels() -> anyhow::Result<()> {
824        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
825        let ts = SystemTime::now().add(Duration::from_secs(10));
826        let changes = cg.update_channel(dummy_channel(
827            ADDRESSES[0],
828            ADDRESSES[1],
829            ChannelStatus::PendingToClose(ts),
830        ));
831        assert!(changes.is_none(), "must not produce changes for a closed channel");
832
833        cg.get_channel(&ADDRESSES[0], &ADDRESSES[1])
834            .context("must allow PendingToClose channels")?;
835
836        Ok(())
837    }
838}