hopr_path/
channel_graph.rs

1use hopr_internal_types::prelude::*;
2use hopr_primitive_types::primitives::Address;
3use petgraph::algo::has_path_connecting;
4use petgraph::dot::Dot;
5use petgraph::prelude::StableDiGraph;
6use petgraph::stable_graph::NodeIndex;
7use petgraph::visit::{EdgeFiltered, EdgeRef, NodeFiltered};
8use petgraph::Direction;
9use serde::{Deserialize, Serialize};
10use serde_with::serde_as;
11use std::collections::hash_map::Entry;
12use std::collections::HashMap;
13use std::fmt::{Debug, Formatter};
14use std::time::Duration;
15use tracing::{debug, warn};
16
17use hopr_primitive_types::prelude::SMA;
18use hopr_primitive_types::sma::SingleSumSMA;
19#[cfg(all(feature = "prometheus", not(test)))]
20use {
21    hopr_internal_types::channels::ChannelDirection, hopr_metrics::metrics::MultiGauge,
22    hopr_primitive_types::traits::ToHex,
23};
24
// Prometheus gauges maintained by `ChannelGraph`. They are compiled in only when
// the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge with a single `direction` label; `ChannelGraph::update_channel`
    // uses the label values "in" and "out".
    static ref METRIC_NUMBER_OF_CHANNELS: MultiGauge = MultiGauge::new(
        "hopr_channels_count",
        "Number of channels per direction",
        &["direction"]
    ).unwrap();
    // Gauge labelled by `counterparty` (hex-encoded address of the other
    // channel party) and `direction` ("in" or "out").
    static ref METRIC_CHANNEL_BALANCES: MultiGauge = MultiGauge::new(
        "hopr_channel_balances",
        "Balances on channels per counterparty",
        &["counterparty", "direction"]
    ).unwrap();
}
38
39/// Structure that adds additional data to a `ChannelEntry`, which
40/// can be used to compute edge weights and traverse the `ChannelGraph`.
41#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
42pub struct ChannelEdge {
43    /// Underlying channel
44    pub channel: ChannelEntry,
45    /// Optional scoring of the edge that might be used for path planning.
46    pub edge_score: Option<f64>,
47}
48
49impl std::fmt::Display for ChannelEdge {
50    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51        write!(
52            f,
53            "{}; stake={}; score={:?}; status={};",
54            self.channel,
55            self.channel.balance,
56            self.edge_score,
57            self.channel.status.to_string().to_lowercase()
58        )
59    }
60}
61
62/// Represents a node in the Channel Graph.
63/// This is typically represented by an on-chain address and ping quality, which
64/// represents some kind of node's liveness as perceived by us.
65#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
66pub struct Node {
67    /// Node's on-chain address.
68    pub address: Address,
69    /// Liveness of the node.
70    pub node_score: f64,
71    /// Average node latency
72    pub latency: SingleSumSMA<std::time::Duration, u32>,
73}
74
75impl Node {
76    pub fn new(address: Address, latency_window_length: usize) -> Self {
77        Self {
78            address,
79            node_score: 0.0,
80            latency: SingleSumSMA::new(latency_window_length),
81        }
82    }
83
84    /// Update the score using the [`NodeScoreUpdate`] and [`ChannelGraphConfig`].
85    ///
86    /// The function will ensure additive (slow) ramp-up, but exponential (fast)
87    /// ramp-down of the node's score, depending on whether it was reachable or not.
88    /// The ramp-down has a cut-off at `offline_node_score_threshold`, below which
89    /// is the score set to zero.
90    pub fn update_score(&mut self, score_update: NodeScoreUpdate, cfg: ChannelGraphConfig) -> f64 {
91        match score_update {
92            NodeScoreUpdate::Reachable(latency) => {
93                self.node_score = 1.0_f64.min(self.node_score + cfg.node_score_step_up);
94                self.latency.push(latency);
95            }
96            NodeScoreUpdate::Unreachable => {
97                self.node_score /= cfg.node_score_decay;
98                self.latency.clear();
99                if self.node_score < cfg.offline_node_score_threshold {
100                    self.node_score = 0.0;
101                }
102            }
103            NodeScoreUpdate::Initialize(latency, node_score) => {
104                self.latency.clear();
105                self.latency.push(latency);
106                self.node_score = node_score.clamp(0.0, 1.0);
107            }
108        }
109        self.node_score
110    }
111}
112
113impl std::fmt::Display for Node {
114    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115        write!(f, "{}; score={}", self.address, self.node_score)
116    }
117}
118
119/// Configuration for the [`ChannelGraph`].
120#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, smart_default::SmartDefault)]
121pub struct ChannelGraphConfig {
122    /// Length of the Simple Moving Average window for node latencies.
123    #[default(20)]
124    pub latency_sma_window_length: usize,
125    /// Additive node score modifier when the node is reachable.
126    #[default(0.1)]
127    pub node_score_step_up: f64,
128    /// Node score divisor when the node is unreachable.
129    #[default(4.0)]
130    pub node_score_decay: f64,
131    /// If a node is unreachable and because of that it reaches a score
132    /// lower than this threshold, it is considered offline (and in some situations
133    /// can be removed from the graph).
134    #[default(0.1)]
135    pub offline_node_score_threshold: f64,
136}
137
138/// Describes an update of the [`Node`]'s score.
139#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
140pub enum NodeScoreUpdate {
141    /// Node is reachable with the given latency.
142    Reachable(Duration),
143    /// Node is unreachable.
144    Unreachable,
145    /// Initializes a node's score to the given latency and quality.
146    /// This is useful during loading data from the persistent storage.
147    Initialize(Duration, f64),
148}
149
150impl<T> From<Result<Duration, T>> for NodeScoreUpdate {
151    fn from(result: Result<Duration, T>) -> Self {
152        match result {
153            Ok(duration) => NodeScoreUpdate::Reachable(duration),
154            Err(_) => NodeScoreUpdate::Unreachable,
155        }
156    }
157}
158
159/// Implements a HOPR payment channel graph (directed) cached in-memory.
160///
161/// This structure is useful for tracking channel state changes and
162/// packet path finding.
163///
164/// The edges are updated only from the Indexer, and therefore the graph contains only
165/// the channels *seen* on-chain.
166/// The nodes and their qualities are updated as they are observed on the network.
167///
168/// Using this structure is much faster than querying the DB and therefore
169/// is preferred for per-packet path-finding computations.
170/// Per default, the graph does not track channels in the `Closed` state and therefore
171/// cannot detect channel re-openings.
172///
173/// When a node reaches zero [quality](Node) and there are no edges (channels) containing this node,
174/// it is removed from the graph entirely.
175#[serde_as]
176#[derive(Clone, Debug, Serialize, Deserialize)]
177pub struct ChannelGraph {
178    me: Address,
179    #[serde_as(as = "Vec<(_, _)>")]
180    indices: HashMap<Address, u32>,
181    graph: StableDiGraph<Node, ChannelEdge>,
182    cfg: ChannelGraphConfig,
183}
184
185impl ChannelGraph {
186    /// The maximum number of intermediate hops the automatic path-finding algorithm will look for.
187    pub const INTERMEDIATE_HOPS: usize = 3;
188
189    /// Creates a new instance with the given self `Address`.
190    pub fn new(me: Address, cfg: ChannelGraphConfig) -> Self {
191        #[cfg(all(feature = "prometheus", not(test)))]
192        {
193            lazy_static::initialize(&METRIC_NUMBER_OF_CHANNELS);
194        }
195
196        let mut ret = Self {
197            me,
198            cfg,
199            indices: HashMap::new(),
200            graph: StableDiGraph::default(),
201        };
202        ret.indices.insert(
203            me,
204            ret.graph
205                .add_node(Node {
206                    address: me,
207                    node_score: 1.0,
208                    latency: SingleSumSMA::new_with_samples(
209                        cfg.latency_sma_window_length,
210                        vec![Duration::ZERO; cfg.latency_sma_window_length],
211                    ),
212                })
213                .index() as u32,
214        );
215        ret
216    }
217
    /// Returns the number of channels (edges) currently stored in the graph.
    pub fn count_channels(&self) -> usize {
        self.graph.edge_count()
    }
222
    /// Returns the number of nodes currently stored in the graph.
    /// This node itself is added by [`ChannelGraph::new`], so it is included in the count.
    pub fn count_nodes(&self) -> usize {
        self.graph.node_count()
    }
227
228    /// Checks if the channel is incoming to or outgoing from this node
229    pub fn is_own_channel(&self, channel: &ChannelEntry) -> bool {
230        channel.destination == self.me || channel.source == self.me
231    }
232
    /// Convenience method returning this node's own address,
    /// i.e. the address the graph was created with.
    pub fn my_address(&self) -> Address {
        self.me
    }
237
238    fn get_edge(&self, src: &Address, dst: &Address) -> Option<petgraph::stable_graph::EdgeReference<ChannelEdge>> {
239        let (src_idx, dst_idx) = self
240            .indices
241            .get(src)
242            .and_then(|src| self.indices.get(dst).map(|dst| (*src, *dst)))?;
243        self.graph.edges_connecting(src_idx.into(), dst_idx.into()).next()
244    }
245
246    /// Looks up an `Open` or `PendingToClose` channel given the source and destination.
247    /// Returns `None` if no such edge exists in the graph.
248    pub fn get_channel(&self, source: &Address, destination: &Address) -> Option<&ChannelEntry> {
249        self.get_edge(source, destination).map(|e| &e.weight().channel)
250    }
251
252    /// Gets the node information.
253    /// Returns `None` if no such node exists in the graph.
254    pub fn get_node(&self, node: &Address) -> Option<&Node> {
255        self.indices
256            .get(node)
257            .and_then(|index| self.graph.node_weight((*index).into()))
258    }
259
260    /// Gets all `Open` outgoing channels going from the given [source](Address).
261    pub fn open_channels_from(&self, source: Address) -> impl Iterator<Item = (&Node, &ChannelEdge)> {
262        // If the source does not exist, select an impossible index to result in empty iterator.
263        let idx = self
264            .indices
265            .get(&source)
266            .cloned()
267            .unwrap_or(self.graph.node_count() as u32);
268        self.graph
269            .edges_directed(idx.into(), Direction::Outgoing)
270            .filter(|c| c.weight().channel.status == ChannelStatus::Open)
271            .map(|e| (&self.graph[e.target()], e.weight()))
272    }
273
274    /// Checks whether there is any path via Open channels that connects `source` and `destination`
275    /// This does not need to be necessarily a multi-hop path.
276    pub fn has_path(&self, source: &Address, destination: &Address) -> bool {
277        let only_open_graph = EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
278        if let Some((src_idx, dst_idx)) = self
279            .indices
280            .get(source)
281            .and_then(|src| self.indices.get(destination).map(|dst| (*src, *dst)))
282        {
283            has_path_connecting(&only_open_graph, src_idx.into(), dst_idx.into(), None)
284        } else {
285            false
286        }
287    }
288
289    /// Inserts or updates the given channel in the channel graph.
290    /// Returns a set of changes if the channel was already present in the graphs or
291    /// None if the channel was not previously present in the channel graph.
292    pub fn update_channel(&mut self, channel: ChannelEntry) -> Option<Vec<ChannelChange>> {
293        #[cfg(all(feature = "prometheus", not(test)))]
294        {
295            if let Some(direction) = channel.direction(&self.me) {
296                match direction {
297                    ChannelDirection::Outgoing => match channel.status {
298                        ChannelStatus::Closed => {
299                            METRIC_NUMBER_OF_CHANNELS.decrement(&["out"], 1.0);
300                            METRIC_CHANNEL_BALANCES.set(&[channel.destination.to_hex().as_str(), "out"], 0.0);
301                        }
302                        ChannelStatus::Open => {
303                            METRIC_NUMBER_OF_CHANNELS.increment(&["out"], 1.0);
304                            METRIC_CHANNEL_BALANCES.set(
305                                &[channel.destination.to_hex().as_str(), "out"],
306                                channel
307                                    .balance
308                                    .amount_base_units()
309                                    .parse::<f64>()
310                                    .unwrap_or(f64::INFINITY),
311                            );
312                        }
313                        ChannelStatus::PendingToClose(_) => {}
314                    },
315                    ChannelDirection::Incoming => match channel.status {
316                        ChannelStatus::Closed => {
317                            METRIC_NUMBER_OF_CHANNELS.decrement(&["in"], 1.0);
318                            METRIC_CHANNEL_BALANCES.set(&[channel.source.to_hex().as_str(), "in"], 0.0);
319                        }
320                        ChannelStatus::Open => {
321                            METRIC_NUMBER_OF_CHANNELS.increment(&["in"], 1.0);
322                            METRIC_CHANNEL_BALANCES.set(
323                                &[channel.source.to_hex().as_str(), "in"],
324                                channel
325                                    .balance
326                                    .amount_base_units()
327                                    .parse::<f64>()
328                                    .unwrap_or(f64::INFINITY),
329                            );
330                        }
331                        ChannelStatus::PendingToClose(_) => {}
332                    },
333                }
334            }
335        }
336
337        let maybe_edge_id = self.get_edge(&channel.source, &channel.destination).map(|e| e.id());
338
339        // Remove the edge since we don't allow Closed channels
340        if channel.status == ChannelStatus::Closed {
341            return maybe_edge_id
342                .and_then(|id| self.graph.remove_edge(id))
343                .inspect(|c| debug!("removed {}", c.channel))
344                .map(|old_value| ChannelChange::diff_channels(&old_value.channel, &channel));
345        }
346
347        // If an edge already exists, update it and compute ChannelDiff
348        if let Some(old_value) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
349            let old_channel = old_value.channel;
350            old_value.channel = channel;
351
352            let ret = ChannelChange::diff_channels(&old_channel, &channel);
353            debug!(
354                "updated {channel}: {}",
355                ret.iter().map(ChannelChange::to_string).collect::<Vec<_>>().join(",")
356            );
357            Some(ret)
358        } else {
359            // Otherwise, create a new edge and add the nodes with 0 quality if they don't yet exist
360            let src = *self.indices.entry(channel.source).or_insert_with(|| {
361                self.graph
362                    .add_node(Node::new(channel.source, self.cfg.latency_sma_window_length))
363                    .index() as u32
364            });
365
366            let dst = *self.indices.entry(channel.destination).or_insert_with(|| {
367                self.graph
368                    .add_node(Node::new(channel.destination, self.cfg.latency_sma_window_length))
369                    .index() as u32
370            });
371
372            let weighted = ChannelEdge {
373                channel,
374                edge_score: None,
375            };
376
377            self.graph.add_edge(src.into(), dst.into(), weighted);
378            debug!("new {channel}");
379
380            None
381        }
382    }
383
384    /// Updates the quality of a node (inserting it into the graph if it does not exist yet),
385    /// based on the given [`NodeScoreUpdate`].
386    pub fn update_node_score(&mut self, address: &Address, score_update: NodeScoreUpdate) {
387        if !self.me.eq(address) {
388            match self.indices.entry(*address) {
389                // The node exists
390                Entry::Occupied(existing) => {
391                    let existing_idx: NodeIndex = (*existing.get()).into();
392                    // NOTE: we cannot remove offline nodes that still have edges,
393                    // as we would lose the ability to track changes on those edges if they
394                    // were removed early.
395                    if score_update != NodeScoreUpdate::Unreachable
396                        || self.graph.neighbors_undirected(existing_idx).count() > 0
397                    {
398                        // We are for sure updating to a greater-than-zero score
399                        if let Some(node) = self.graph.node_weight_mut(existing_idx) {
400                            let updated_quality = node.update_score(score_update, self.cfg);
401                            debug!(%address, updated_quality, "updated node quality");
402                        } else {
403                            // This should not happen
404                            warn!(%address, "removed dangling node index from channel graph");
405                            existing.remove();
406                        }
407                    } else {
408                        // If the node has no neighbors, is unreachable, and reached very low
409                        // score, just remove it from the graph
410                        if self
411                            .graph
412                            .node_weight_mut(existing_idx)
413                            .map(|node| node.update_score(score_update, self.cfg))
414                            .is_some_and(|updated_quality| updated_quality < self.cfg.offline_node_score_threshold)
415                        {
416                            self.graph.remove_node(existing.remove().into());
417                            debug!(%address, "removed offline node with no channels");
418                        }
419                    }
420                }
421                // The node does not exist, and we are updating to greater-than-zero quality
422                Entry::Vacant(new_node) if score_update != NodeScoreUpdate::Unreachable => {
423                    let mut inserted_node = Node::new(*address, self.cfg.latency_sma_window_length);
424                    let updated_quality = inserted_node.update_score(score_update, self.cfg);
425                    new_node.insert(self.graph.add_node(inserted_node).index() as u32);
426                    debug!(%address, updated_quality, "added new node");
427                }
428                // We do not want to add unreachable nodes to the graph, so do nothing otherwise
429                Entry::Vacant(_) => {}
430            }
431        }
432    }
433
434    /// Updates the score value of network connection between `source` and `destination`
435    /// The given score value must always be non-negative.
436    pub fn update_channel_score(&mut self, source: &Address, destination: &Address, score: f64) {
437        assert!(score >= 0_f64, "score must be non-negative");
438        let maybe_edge_id = self.get_edge(source, destination).map(|e| e.id());
439        if let Some(channel) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
440            if score != channel.edge_score.unwrap_or(-1_f64) {
441                channel.edge_score = Some(score);
442                debug!("updated score of {} to {score}", channel.channel);
443            }
444        }
445    }
446
447    /// Gets quality of the given channel. Returns `None` if no such channel exists, or no
448    /// quality has been set for that channel.
449    pub fn get_channel_score(&self, source: &Address, destination: &Address) -> Option<f64> {
450        self.get_edge(source, destination)
451            .and_then(|e| self.graph.edge_weight(e.id()))
452            .and_then(|e| e.edge_score)
453    }
454
455    /// Checks whether the given channel is in the graph already.
456    pub fn contains_channel(&self, channel: &ChannelEntry) -> bool {
457        self.get_channel(&channel.source, &channel.destination).is_some()
458    }
459
460    /// Checks whether the given node is in the channel graph.
461    pub fn contains_node(&self, address: &Address) -> bool {
462        self.get_node(address).is_some()
463    }
464
465    /// Outputs the channel graph in the DOT (graphviz) format with the given `config`.
466    pub fn as_dot(&self, cfg: GraphExportConfig) -> String {
467        if cfg.ignore_disconnected_components {
468            let only_open_graph =
469                EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
470
471            let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
472
473            Dot::new(&NodeFiltered::from_fn(&self.graph, |n| {
474                // Include only nodes that have non-zero quality,
475                // and there is a path to them in an Open channel graph
476                self.graph.node_weight(n).is_some_and(|n| n.node_score > 0_f64)
477                    && has_path_connecting(&only_open_graph, me_idx, n, None)
478            }))
479            .to_string()
480        } else if cfg.ignore_non_opened_channels {
481            // Keep nodes that have at least one incoming or outgoing Open channel
482            Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
483                self.graph
484                    .edges_directed(a, Direction::Outgoing)
485                    .any(|e| e.weight().channel.status == ChannelStatus::Open)
486                    || self
487                        .graph
488                        .edges_directed(a, Direction::Incoming)
489                        .any(|e| e.weight().channel.status == ChannelStatus::Open)
490            }))
491            .to_string()
492        } else if cfg.only_3_hop_accessible_nodes {
493            // Keep only those nodes that are accessible from via less than 3 hop paths
494            let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
495            let distances = petgraph::algo::dijkstra(&self.graph, me_idx, None, |e| {
496                if e.weight().channel.status == ChannelStatus::Open {
497                    1
498                } else {
499                    100
500                }
501            });
502
503            Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
504                distances.get(&a).map(|d| *d <= 3).unwrap_or(false)
505            }))
506            .to_string()
507        } else {
508            Dot::new(&self.graph).to_string()
509        }
510    }
511}
512
/// Options controlling which part of the [`ChannelGraph`] gets exported to DOT.
///
/// See [`ChannelGraph::as_dot`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphExportConfig {
    /// When set, nodes not connected to this node (via open channels) are left out of the export.
    ///
    /// This setting automatically implies `ignore_non_opened_channels`.
    pub ignore_disconnected_components: bool,
    /// When set, channels that are not in the [`ChannelStatus::Open`] state are not exported.
    pub ignore_non_opened_channels: bool,
    /// When set, only nodes accessible from this node via 3-hops (via open channels) are shown.
    pub only_3_hop_accessible_nodes: bool,
}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531
532    use crate::channel_graph::ChannelGraph;
533    use anyhow::{anyhow, Context};
534    use hopr_internal_types::channels::{ChannelChange, ChannelEntry, ChannelStatus};
535    use hopr_primitive_types::prelude::*;
536    use lazy_static::lazy_static;
537    use std::ops::Add;
538    use std::str::FromStr;
539    use std::time::{Duration, SystemTime};
540
541    lazy_static! {
542        static ref ADDRESSES: [Address; 6] = [
543            Address::from_str("0xafe8c178cf70d966be0a798e666ce2782c7b2288")
544                .expect("lazy static address should be valid"),
545            Address::from_str("0x1223d5786d9e6799b3297da1ad55605b91e2c882")
546                .expect("lazy static address should be valid"),
547            Address::from_str("0x0e3e60ddced1e33c9647a71f4fc2cf4ed33e4a9d")
548                .expect("lazy static address should be valid"),
549            Address::from_str("0x27644105095c8c10f804109b4d1199a9ac40ed46")
550                .expect("lazy static address should be valid"),
551            Address::from_str("0x4701a288c38fa8a0f4b79127747257af4a03a623")
552                .expect("lazy static address should be valid"),
553            Address::from_str("0xfddd2f462ec709cf181bbe44a7e952487bd4591d")
554                .expect("lazy static address should be valid"),
555        ];
556    }
557
558    fn dummy_channel(src: Address, dst: Address, status: ChannelStatus) -> ChannelEntry {
559        ChannelEntry::new(
560            src,
561            dst,
562            Balance::new_from_str("1", BalanceType::HOPR),
563            1u32.into(),
564            status,
565            1u32.into(),
566        )
567    }
568
569    #[test]
570    fn channel_graph_self_addr() {
571        let cg = ChannelGraph::new(ADDRESSES[0], Default::default());
572        assert_eq!(ADDRESSES[0], cg.my_address(), "must produce correct self address");
573
574        assert!(cg.contains_node(&ADDRESSES[0]), "must contain self address");
575
576        assert_eq!(
577            cg.get_node(&ADDRESSES[0]).cloned(),
578            Some(Node {
579                address: ADDRESSES[0],
580                node_score: 1.0,
581                latency: SingleSumSMA::new_with_samples(
582                    cg.cfg.latency_sma_window_length,
583                    vec![Duration::ZERO; cg.cfg.latency_sma_window_length]
584                )
585            }),
586            "must contain self node with quality 1"
587        );
588    }
589
590    #[test]
591    fn channel_graph_has_path() {
592        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
593
594        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
595        cg.update_channel(c);
596
597        assert!(cg.contains_channel(&c), "must contain channel");
598
599        assert!(cg.contains_node(&ADDRESSES[0]), "must contain channel source");
600
601        assert!(cg.contains_node(&ADDRESSES[1]), "must contain channel destination");
602
603        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]), "must have simple path");
604
605        assert!(
606            !cg.has_path(&ADDRESSES[0], &ADDRESSES[2]),
607            "must not have non existent path"
608        );
609    }
610
611    #[test]
612    fn channel_graph_update_node_quality() -> anyhow::Result<()> {
613        let mut cg = ChannelGraph::new(
614            ADDRESSES[0],
615            ChannelGraphConfig {
616                node_score_step_up: 0.1,
617                node_score_decay: 4.0,
618                offline_node_score_threshold: 0.1,
619                ..Default::default()
620            },
621        );
622
623        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(100)));
624        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
625        assert_eq!(node.node_score, 0.1);
626        assert_eq!(node.latency.average(), Some(Duration::from_millis(100)));
627
628        assert!(!cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
629
630        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
631        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
632        assert_eq!(node.node_score, 0.2);
633        assert_eq!(node.latency.average(), Some(Duration::from_millis(75)));
634
635        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(30)));
636        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(20)));
637
638        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
639        assert_eq!(node.node_score, 0.4);
640        assert_eq!(node.latency.average(), Some(Duration::from_millis(50)));
641
642        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
643        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
644        assert_eq!(node.node_score, 0.1);
645        assert!(node.latency.average().is_none());
646
647        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
648
649        // At this point the node will be removed because there are no channels with it
650        assert_eq!(cg.get_node(&ADDRESSES[1]), None);
651
652        Ok(())
653    }
654
655    #[test]
656    fn channel_graph_update_node_quality_should_not_remove_nodes_with_zero_quality_and_path() -> anyhow::Result<()> {
657        let mut cg = ChannelGraph::new(
658            ADDRESSES[0],
659            ChannelGraphConfig {
660                node_score_step_up: 0.1,
661                node_score_decay: 4.0,
662                offline_node_score_threshold: 0.1,
663                ..Default::default()
664            },
665        );
666
667        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
668        cg.update_channel(c);
669
670        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
671        assert_eq!(node.address, ADDRESSES[1]);
672        assert_eq!(node.node_score, 0.0);
673        assert!(node.latency.is_empty());
674
675        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
676
677        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
678        assert_eq!(node.address, ADDRESSES[1]);
679        assert_eq!(node.node_score, 0.1);
680
681        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
682        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
683        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Reachable(Duration::from_millis(50)));
684
685        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
686        assert_eq!(node.address, ADDRESSES[1]);
687        assert_eq!(node.node_score, 0.4);
688
689        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
690
691        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
692
693        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
694        assert_eq!(node.address, ADDRESSES[1]);
695        assert_eq!(node.node_score, 0.1);
696
697        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
698
699        cg.update_node_score(&ADDRESSES[1], NodeScoreUpdate::Unreachable);
700
701        let node = cg.get_node(&ADDRESSES[1]).cloned().ok_or(anyhow!("node must exist"))?;
702        assert_eq!(node.address, ADDRESSES[1]);
703        assert_eq!(node.node_score, 0.0);
704
705        assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
706
707        Ok(())
708    }
709
710    #[test]
711    fn channel_graph_update_channel_score() -> anyhow::Result<()> {
712        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
713
714        let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
715        cg.update_channel(c);
716
717        assert!(cg.contains_channel(&c), "must contain channel");
718        assert!(
719            cg.get_channel_score(&ADDRESSES[0], &ADDRESSES[1]).is_none(),
720            "must start with no quality info"
721        );
722
723        cg.update_channel_score(&ADDRESSES[0], &ADDRESSES[1], 0.5_f64);
724
725        let q = cg
726            .get_channel_score(&ADDRESSES[0], &ADDRESSES[1])
727            .context("must have quality when set")?;
728        assert_eq!(0.5_f64, q, "quality must be equal");
729
730        Ok(())
731    }
732
733    #[test]
734    fn channel_graph_is_own_channel() {
735        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
736
737        let c1 = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
738        let c2 = dummy_channel(ADDRESSES[1], ADDRESSES[2], ChannelStatus::Open);
739        cg.update_channel(c1);
740        cg.update_channel(c2);
741
742        assert!(cg.is_own_channel(&c1), "must detect as own channel");
743        assert!(!cg.is_own_channel(&c2), "must not detect as own channel");
744    }
745
746    #[test]
747    fn channel_graph_update_changes() -> anyhow::Result<()> {
748        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
749
750        let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
751
752        let changes = cg.update_channel(c);
753        assert!(changes.is_none(), "must not produce changes for a new channel");
754
755        let cr = cg
756            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
757            .context("must contain channel")?;
758        assert!(c.eq(cr), "channels must be equal");
759
760        let ts = SystemTime::now().add(Duration::from_secs(10));
761        c.balance = Balance::zero(BalanceType::HOPR);
762        c.status = ChannelStatus::PendingToClose(ts);
763        let changes = cg.update_channel(c).context("should contain channel changes")?;
764        assert_eq!(2, changes.len(), "must contain 2 changes");
765
766        for change in changes {
767            match change {
768                ChannelChange::Status { left, right } => {
769                    assert_eq!(ChannelStatus::Open, left, "previous status does not match");
770                    assert_eq!(ChannelStatus::PendingToClose(ts), right, "new status does not match");
771                }
772                ChannelChange::CurrentBalance { left, right } => {
773                    assert_eq!(
774                        Balance::new(1_u32, BalanceType::HOPR),
775                        left,
776                        "previous balance does not match"
777                    );
778                    assert_eq!(Balance::zero(BalanceType::HOPR), right, "new balance does not match");
779                }
780                _ => panic!("unexpected change"),
781            }
782        }
783
784        let cr = cg
785            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
786            .context("must contain channel")?;
787        assert!(c.eq(cr), "channels must be equal");
788
789        Ok(())
790    }
791
792    #[test]
793    fn channel_graph_update_changes_on_close() -> anyhow::Result<()> {
794        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
795
796        let ts = SystemTime::now().add(Duration::from_secs(10));
797        let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::PendingToClose(ts));
798
799        let changes = cg.update_channel(c);
800        assert!(changes.is_none(), "must not produce changes for a new channel");
801
802        let cr = cg
803            .get_channel(&ADDRESSES[0], &ADDRESSES[1])
804            .context("must contain channel")?;
805        assert!(c.eq(cr), "channels must be equal");
806
807        c.balance = Balance::zero(BalanceType::HOPR);
808        c.status = ChannelStatus::Closed;
809        let changes = cg.update_channel(c).context("must contain changes")?;
810        assert_eq!(2, changes.len(), "must contain 2 changes");
811
812        for change in changes {
813            match change {
814                ChannelChange::Status { left, right } => {
815                    assert_eq!(
816                        ChannelStatus::PendingToClose(ts),
817                        left,
818                        "previous status does not match"
819                    );
820                    assert_eq!(ChannelStatus::Closed, right, "new status does not match");
821                }
822                ChannelChange::CurrentBalance { left, right } => {
823                    assert_eq!(
824                        Balance::new(1_u32, BalanceType::HOPR),
825                        left,
826                        "previous balance does not match"
827                    );
828                    assert_eq!(Balance::zero(BalanceType::HOPR), right, "new balance does not match");
829                }
830                _ => panic!("unexpected change"),
831            }
832        }
833
834        let cr = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
835        assert!(cr.is_none(), "must not contain channel after closing");
836
837        Ok(())
838    }
839
840    #[test]
841    fn channel_graph_update_should_not_allow_closed_channels() {
842        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
843        let changes = cg.update_channel(dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Closed));
844        assert!(changes.is_none(), "must not produce changes for a closed channel");
845
846        let c = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
847        assert!(c.is_none(), "must not allow adding closed channels");
848    }
849
850    #[test]
851    fn channel_graph_update_should_allow_pending_to_close_channels() -> anyhow::Result<()> {
852        let mut cg = ChannelGraph::new(ADDRESSES[0], Default::default());
853        let ts = SystemTime::now().add(Duration::from_secs(10));
854        let changes = cg.update_channel(dummy_channel(
855            ADDRESSES[0],
856            ADDRESSES[1],
857            ChannelStatus::PendingToClose(ts),
858        ));
859        assert!(changes.is_none(), "must not produce changes for a closed channel");
860
861        cg.get_channel(&ADDRESSES[0], &ADDRESSES[1])
862            .context("must allow PendingToClose channels")?;
863
864        Ok(())
865    }
866}