use hopr_internal_types::prelude::*;
use hopr_primitive_types::primitives::Address;
use petgraph::algo::has_path_connecting;
use petgraph::dot::Dot;
use petgraph::prelude::StableDiGraph;
use petgraph::stable_graph::NodeIndex;
use petgraph::visit::{EdgeFiltered, EdgeRef, NodeFiltered};
use petgraph::Direction;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use tracing::{debug, warn};
#[cfg(all(feature = "prometheus", not(test)))]
use {
hopr_internal_types::channels::ChannelDirection, hopr_metrics::metrics::MultiGauge,
hopr_primitive_types::traits::ToHex,
};
// Prometheus gauges describing this node's own channels.
// Compiled only when the `prometheus` feature is enabled and never in test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
// Channel count, labelled by direction (`ChannelGraph::update_channel` uses "in" / "out").
static ref METRIC_NUMBER_OF_CHANNELS: MultiGauge = MultiGauge::new(
"hopr_channels_count",
"Number of channels per direction",
&["direction"]
).unwrap();
// Channel balance, labelled by the counterparty address (hex) and the direction.
static ref METRIC_CHANNEL_BALANCES: MultiGauge = MultiGauge::new(
"hopr_channel_balances",
"Balances on channels per counterparty",
&["counterparty", "direction"]
).unwrap();
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
/// Weight stored on every edge of the [`ChannelGraph`].
pub struct ChannelEdge {
/// The channel this edge represents.
pub channel: ChannelEntry,
/// Optional score of the channel; `None` until `ChannelGraph::update_channel_score` sets it.
pub score: Option<f64>,
}
/// Renders the edge as `<channel>; stake=<balance>; score=<score>`,
/// where the score uses its `Debug` representation.
impl std::fmt::Display for ChannelEdge {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { channel, score } = self;
        write!(f, "{}; stake={}; score={:?}", channel, channel.balance, score)
    }
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
/// Weight stored on every node of the [`ChannelGraph`].
pub struct Node {
/// Address identifying the node.
pub address: Address,
/// Quality of the node. `ChannelGraph::new` assigns 1.0 to the own node, nodes created
/// implicitly by `ChannelGraph::update_channel` start at 0.0.
pub quality: f64,
}
/// Renders the node as `<address>; q=<quality>`.
impl std::fmt::Display for Node {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { address, quality } = self;
        write!(f, "{}; q={}", address, quality)
    }
}
#[serde_as]
#[derive(Clone, Debug, Serialize, Deserialize)]
/// Directed graph of channels between nodes, kept from the point of view of one node (`me`).
///
/// Nodes carry a [`Node`] weight, edges carry a [`ChannelEdge`] weight.
pub struct ChannelGraph {
/// Address of the node owning this graph.
me: Address,
/// Maps an address to the index of its node in `graph`.
/// Serialized as a list of `(key, value)` pairs rather than as a map.
#[serde_as(as = "Vec<(_, _)>")]
indices: HashMap<Address, u32>,
/// The underlying graph; node indices stay stable across removals.
graph: StableDiGraph<Node, ChannelEdge>,
}
impl ChannelGraph {
/// Number of intermediate hops; not referenced in this file.
// NOTE(review): presumably the maximum number of intermediate hops on a routing path - confirm with callers.
pub const INTERMEDIATE_HOPS: usize = 3;
pub fn new(me: Address) -> Self {
#[cfg(all(feature = "prometheus", not(test)))]
{
lazy_static::initialize(&METRIC_NUMBER_OF_CHANNELS);
}
let mut ret = Self {
me,
indices: HashMap::new(),
graph: StableDiGraph::default(),
};
ret.indices.insert(
me,
ret.graph
.add_node(Node {
address: me,
quality: 1.0,
})
.index() as u32,
);
ret
}
/// Returns the number of channels (edges) currently stored in the graph.
pub fn count_channels(&self) -> usize {
self.graph.edge_count()
}
/// Returns the number of nodes currently stored in the graph (the own node included).
pub fn count_nodes(&self) -> usize {
self.graph.node_count()
}
/// Returns `true` when this node is either the source or the destination of `channel`.
///
/// Only the addresses are compared; the channel does not have to be present in the graph.
pub fn is_own_channel(&self, channel: &ChannelEntry) -> bool {
    let me = self.me;
    channel.source == me || channel.destination == me
}
/// Returns the address of the node owning this graph.
pub fn my_address(&self) -> Address {
self.me
}
fn get_edge(&self, src: &Address, dst: &Address) -> Option<petgraph::stable_graph::EdgeReference<ChannelEdge>> {
let (src_idx, dst_idx) = self
.indices
.get(src)
.and_then(|src| self.indices.get(dst).map(|dst| (*src, *dst)))?;
self.graph.edges_connecting(src_idx.into(), dst_idx.into()).next()
}
/// Returns the channel going from `source` to `destination`, if the graph contains it.
pub fn get_channel(&self, source: &Address, destination: &Address) -> Option<&ChannelEntry> {
    let edge = self.get_edge(source, destination)?;
    Some(&edge.weight().channel)
}
/// Returns the node registered under the address `node`, if any.
pub fn get_node(&self, node: &Address) -> Option<&Node> {
    let index = *self.indices.get(node)?;
    self.graph.node_weight(index.into())
}
/// Iterates over all `Open` channels going out of `source`.
///
/// Each item is the destination node together with the edge leading to it.
/// The iterator is empty when `source` is not part of the graph.
pub fn open_channels_from(&self, source: Address) -> impl Iterator<Item = (&Node, &ChannelEdge)> {
    // An unknown source must yield nothing. `node_count()` cannot serve as an
    // "impossible" index here: the graph keeps indices stable, so once any node has
    // been removed, live nodes may sit at indices >= `node_count()` and a fallback
    // to that value could return the channels of an unrelated node.
    self.indices
        .get(&source)
        .copied()
        .into_iter()
        .flat_map(move |idx| self.graph.edges_directed(idx.into(), Direction::Outgoing))
        .filter(|edge| edge.weight().channel.status == ChannelStatus::Open)
        .map(move |edge| (&self.graph[edge.target()], edge.weight()))
}
/// Checks whether `destination` can be reached from `source` using only `Open` channels.
///
/// Returns `false` when either address is not part of the graph.
pub fn has_path(&self, source: &Address, destination: &Address) -> bool {
    let (Some(&from), Some(&to)) = (self.indices.get(source), self.indices.get(destination)) else {
        return false;
    };

    // View of the graph in which every non-open channel is hidden.
    let open_only = EdgeFiltered::from_fn(&self.graph, |edge| edge.weight().channel.status == ChannelStatus::Open);
    has_path_connecting(&open_only, from.into(), to.into(), None)
}
/// Inserts, updates or removes the given channel in the graph.
///
/// * A `Closed` channel is removed from the graph. Returns the differences to the
///   previously stored channel, or `None` if the channel was not in the graph.
/// * A channel already in the graph is overwritten. Returns the differences to the
///   previously stored channel.
/// * An unknown, non-closed channel is added (creating missing nodes with quality 0.0).
///   Returns `None`.
///
/// With the `prometheus` feature enabled (outside of tests), the metrics of own channels
/// are updated as well.
pub fn update_channel(&mut self, channel: ChannelEntry) -> Option<Vec<ChannelChange>> {
    let maybe_edge_id = self.get_edge(&channel.source, &channel.destination).map(|e| e.id());

    #[cfg(all(feature = "prometheus", not(test)))]
    {
        if let Some(direction) = channel.direction(&self.me) {
            let (label, counterparty) = match direction {
                ChannelDirection::Outgoing => ("out", channel.destination),
                ChannelDirection::Incoming => ("in", channel.source),
            };
            // The count gauge mirrors the number of own edges in the graph: it only moves
            // when an edge is actually added or removed. Updates of an already tracked
            // channel (e.g. a balance change while `Open`) must not count it again, and
            // closing a channel that was never tracked must not decrement the gauge.
            let already_tracked = maybe_edge_id.is_some();
            match channel.status {
                ChannelStatus::Closed => {
                    if already_tracked {
                        METRIC_NUMBER_OF_CHANNELS.decrement(&[label], 1.0);
                    }
                    METRIC_CHANNEL_BALANCES.set(&[counterparty.to_hex().as_str(), label], 0.0);
                }
                ChannelStatus::Open => {
                    if !already_tracked {
                        METRIC_NUMBER_OF_CHANNELS.increment(&[label], 1.0);
                    }
                    METRIC_CHANNEL_BALANCES.set(
                        &[counterparty.to_hex().as_str(), label],
                        channel
                            .balance
                            .amount_base_units()
                            .parse::<f64>()
                            .unwrap_or(f64::INFINITY),
                    );
                }
                ChannelStatus::PendingToClose(_) => {
                    // A channel first seen while pending to close is added to the graph
                    // below, so it is counted too (its later closure decrements again).
                    if !already_tracked {
                        METRIC_NUMBER_OF_CHANNELS.increment(&[label], 1.0);
                    }
                }
            }
        }
    }

    // Closed channels are never kept in the graph.
    if channel.status == ChannelStatus::Closed {
        return maybe_edge_id
            .and_then(|id| self.graph.remove_edge(id))
            .inspect(|c| debug!("removed {}", c.channel))
            .map(|old_value| ChannelChange::diff_channels(&old_value.channel, &channel));
    }

    if let Some(old_value) = maybe_edge_id.and_then(|id| self.graph.edge_weight_mut(id)) {
        // Known channel: overwrite it and report what has changed. The score is kept.
        let old_channel = old_value.channel;
        old_value.channel = channel;
        let ret = ChannelChange::diff_channels(&old_channel, &channel);
        debug!(
            "updated {channel}: {}",
            ret.iter().map(ChannelChange::to_string).collect::<Vec<_>>().join(",")
        );
        Some(ret)
    } else {
        // New channel: make sure both endpoints exist, then connect them.
        let src = *self.indices.entry(channel.source).or_insert_with(|| {
            self.graph
                .add_node(Node {
                    address: channel.source,
                    quality: 0.0,
                })
                .index() as u32
        });
        let dst = *self.indices.entry(channel.destination).or_insert_with(|| {
            self.graph
                .add_node(Node {
                    address: channel.destination,
                    quality: 0.0,
                })
                .index() as u32
        });
        let weighted = ChannelEdge { channel, score: None };
        self.graph.add_edge(src.into(), dst.into(), weighted);
        debug!("new {channel}");
        None
    }
}
/// Sets the quality of the node with the given address.
///
/// * The own node is never modified.
/// * An unknown node is added only if `quality` is positive.
/// * A known node whose quality drops to zero and which has no channels
///   (in either direction) is removed from the graph.
///
/// # Panics
///
/// Panics if `quality` is not greater than or equal to zero.
pub fn update_node_quality(&mut self, address: &Address, quality: f64) {
    assert!(quality >= 0_f64, "quality must be non-negative");
    if self.me.eq(address) {
        return;
    }

    match self.indices.entry(*address) {
        Entry::Occupied(slot) => {
            let node_idx: NodeIndex = (*slot.get()).into();
            let keep = quality > 0.0 || self.graph.neighbors_undirected(node_idx).next().is_some();
            if !keep {
                self.graph.remove_node(slot.remove().into());
                debug!("removed solitary node {address} with zero quality");
            } else if let Some(node) = self.graph.node_weight_mut(node_idx) {
                node.quality = quality;
                debug!("updated quality of {address} to {quality}");
            } else {
                // The index map points to a node the graph no longer has: drop the stale entry.
                warn!("removed dangling node {address} in channel graph");
                slot.remove();
            }
        }
        Entry::Vacant(slot) => {
            // Unknown nodes with zero quality are not worth tracking.
            if quality > 0_f64 {
                slot.insert(
                    self.graph
                        .add_node(Node {
                            address: *address,
                            quality,
                        })
                        .index() as u32,
                );
                debug!("added node {address} with {quality}");
            }
        }
    }
}
/// Sets the score of the channel going from `source` to `destination`.
///
/// Does nothing when the graph has no such channel or when the stored score
/// already equals `score`.
///
/// # Panics
///
/// Panics if `score` is not greater than or equal to zero.
pub fn update_channel_score(&mut self, source: &Address, destination: &Address, score: f64) {
    assert!(score >= 0_f64, "score must be non-negative");
    let Some(edge_id) = self.get_edge(source, destination).map(|e| e.id()) else {
        return;
    };
    if let Some(edge) = self.graph.edge_weight_mut(edge_id) {
        // Compare against the optional score directly; an unset score always differs.
        if edge.score != Some(score) {
            edge.score = Some(score);
            debug!("updated score of {} to {score}", edge.channel);
        }
    }
}
/// Returns the score of the channel going from `source` to `destination`.
///
/// `None` is returned when the channel is not in the graph or has no score yet.
pub fn get_channel_score(&self, source: &Address, destination: &Address) -> Option<f64> {
    let edge = self.get_edge(source, destination)?;
    edge.weight().score
}
pub fn contains_channel(&self, channel: &ChannelEntry) -> bool {
self.get_channel(&channel.source, &channel.destination).is_some()
}
/// Returns `true` when a node with the given address is part of the graph.
pub fn contains_node(&self, address: &Address) -> bool {
    self.indices
        .get(address)
        .is_some_and(|index| self.graph.node_weight((*index).into()).is_some())
}
pub fn as_dot(&self, cfg: GraphExportConfig) -> String {
if cfg.ignore_disconnected_components {
let only_open_graph =
EdgeFiltered::from_fn(&self.graph, |e| e.weight().channel.status == ChannelStatus::Open);
let me_idx: NodeIndex = (*self.indices.get(&self.me).expect("graph must contain self")).into();
Dot::new(&NodeFiltered::from_fn(&self.graph, |n| {
self.graph.node_weight(n).is_some_and(|n| n.quality > 0_f64)
&& has_path_connecting(&only_open_graph, me_idx, n, None)
}))
.to_string()
} else if cfg.ignore_non_opened_channels {
Dot::new(&NodeFiltered::from_fn(&self.graph, |a| {
self.graph
.edges_directed(a, Direction::Outgoing)
.any(|e| e.weight().channel.status == ChannelStatus::Open)
|| self
.graph
.edges_directed(a, Direction::Incoming)
.any(|e| e.weight().channel.status == ChannelStatus::Open)
}))
.to_string()
} else {
Dot::new(&self.graph).to_string()
}
}
}
#[derive(Clone, Debug, Default, Eq, PartialEq)]
/// Options controlling which nodes `ChannelGraph::as_dot` exports.
/// Both flags default to `false`, which exports the entire graph.
pub struct GraphExportConfig {
/// Export only nodes with a positive quality that are reachable from the own node
/// over open channels. Takes precedence over `ignore_non_opened_channels`.
pub ignore_disconnected_components: bool,
/// Export only nodes that have at least one incoming or outgoing open channel.
pub ignore_non_opened_channels: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::channel_graph::ChannelGraph;
use anyhow::Context;
use hopr_internal_types::channels::{ChannelChange, ChannelEntry, ChannelStatus};
use hopr_primitive_types::prelude::*;
use lazy_static::lazy_static;
use std::ops::Add;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
// Fixed set of addresses shared by the tests; index 0 is used as the own address of the graph.
lazy_static! {
static ref ADDRESSES: [Address; 6] = [
Address::from_str("0xafe8c178cf70d966be0a798e666ce2782c7b2288")
.expect("lazy static address should be valid"),
Address::from_str("0x1223d5786d9e6799b3297da1ad55605b91e2c882")
.expect("lazy static address should be valid"),
Address::from_str("0x0e3e60ddced1e33c9647a71f4fc2cf4ed33e4a9d")
.expect("lazy static address should be valid"),
Address::from_str("0x27644105095c8c10f804109b4d1199a9ac40ed46")
.expect("lazy static address should be valid"),
Address::from_str("0x4701a288c38fa8a0f4b79127747257af4a03a623")
.expect("lazy static address should be valid"),
Address::from_str("0xfddd2f462ec709cf181bbe44a7e952487bd4591d")
.expect("lazy static address should be valid"),
];
}
/// Builds a test channel from `src` to `dst` with the given status and a balance of "1" HOPR.
fn dummy_channel(src: Address, dst: Address, status: ChannelStatus) -> ChannelEntry {
    let balance = Balance::new_from_str("1", BalanceType::HOPR);
    ChannelEntry::new(src, dst, balance, 1u32.into(), status, 1u32.into())
}
// A fresh graph knows its own address and contains the own node with quality 1.0.
#[test]
fn channel_graph_self_addr() {
let cg = ChannelGraph::new(ADDRESSES[0]);
assert_eq!(ADDRESSES[0], cg.my_address(), "must produce correct self address");
assert!(cg.contains_node(&ADDRESSES[0]), "must contain self address");
assert_eq!(
cg.get_node(&ADDRESSES[0]).cloned(),
Some(Node {
address: ADDRESSES[0],
quality: 1.0
}),
"must contain self node with quality 1"
);
}
// Adding an open channel creates both endpoints and a path between them;
// an address unknown to the graph is not reachable.
#[test]
fn channel_graph_has_path() {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
cg.update_channel(c);
assert!(cg.contains_channel(&c), "must contain channel");
assert!(cg.contains_node(&ADDRESSES[0]), "must contain channel source");
assert!(cg.contains_node(&ADDRESSES[1]), "must contain channel destination");
assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]), "must have simple path");
assert!(
!cg.has_path(&ADDRESSES[0], &ADDRESSES[2]),
"must not have non existent path"
);
}
// A node without channels is added on a positive quality, updated afterwards,
// and removed again once its quality drops to zero.
#[test]
fn channel_graph_update_node_quality() {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
cg.update_node_quality(&ADDRESSES[1], 0.5);
assert_eq!(
cg.get_node(&ADDRESSES[1]).cloned(),
Some(Node {
address: ADDRESSES[1],
quality: 0.5
})
);
cg.update_node_quality(&ADDRESSES[1], 0.3);
assert_eq!(
cg.get_node(&ADDRESSES[1]).cloned(),
Some(Node {
address: ADDRESSES[1],
quality: 0.3
})
);
assert!(!cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
// Zero quality and no channels: the node must disappear.
cg.update_node_quality(&ADDRESSES[1], 0.0);
assert_eq!(cg.get_node(&ADDRESSES[1]), None);
}
// A node that still has a channel must stay in the graph even when its quality drops to zero.
#[test]
fn channel_graph_update_node_quality_should_not_remove_nodes_with_zero_quality_and_path() {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
cg.update_channel(c);
// Nodes created implicitly by a channel start with quality 0.0.
assert_eq!(
cg.get_node(&ADDRESSES[1]).cloned(),
Some(Node {
address: ADDRESSES[1],
quality: 0.0
})
);
cg.update_node_quality(&ADDRESSES[1], 0.5);
assert_eq!(
cg.get_node(&ADDRESSES[1]).cloned(),
Some(Node {
address: ADDRESSES[1],
quality: 0.5
})
);
cg.update_node_quality(&ADDRESSES[1], 0.3);
assert_eq!(
cg.get_node(&ADDRESSES[1]).cloned(),
Some(Node {
address: ADDRESSES[1],
quality: 0.3
})
);
assert!(cg.has_path(&ADDRESSES[0], &ADDRESSES[1]));
cg.update_node_quality(&ADDRESSES[1], 0.0);
assert_eq!(
cg.get_node(&ADDRESSES[1]).cloned(),
Some(Node {
address: ADDRESSES[1],
quality: 0.0
})
);
}
// A new channel has no score; after setting one, the same value is read back.
#[test]
fn channel_graph_update_channel_score() -> anyhow::Result<()> {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
let c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
cg.update_channel(c);
assert!(cg.contains_channel(&c), "must contain channel");
assert!(
cg.get_channel_score(&ADDRESSES[0], &ADDRESSES[1]).is_none(),
"must start with no quality info"
);
cg.update_channel_score(&ADDRESSES[0], &ADDRESSES[1], 0.5_f64);
let q = cg
.get_channel_score(&ADDRESSES[0], &ADDRESSES[1])
.context("must have quality when set")?;
assert_eq!(0.5_f64, q, "quality must be equal");
Ok(())
}
// Only channels that have the own address as source or destination count as own channels.
#[test]
fn channel_graph_is_own_channel() {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
let c1 = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
let c2 = dummy_channel(ADDRESSES[1], ADDRESSES[2], ChannelStatus::Open);
cg.update_channel(c1);
cg.update_channel(c2);
assert!(cg.is_own_channel(&c1), "must detect as own channel");
assert!(!cg.is_own_channel(&c2), "must not detect as own channel");
}
// Updating a known channel reports exactly the fields that changed (status and balance here)
// and stores the new channel data.
#[test]
fn channel_graph_update_changes() -> anyhow::Result<()> {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Open);
let changes = cg.update_channel(c);
assert!(changes.is_none(), "must not produce changes for a new channel");
let cr = cg
.get_channel(&ADDRESSES[0], &ADDRESSES[1])
.context("must contain channel")?;
assert!(c.eq(cr), "channels must be equal");
// Change both the balance and the status, then update the same channel again.
let ts = SystemTime::now().add(Duration::from_secs(10));
c.balance = Balance::zero(BalanceType::HOPR);
c.status = ChannelStatus::PendingToClose(ts);
let changes = cg.update_channel(c).context("should contain channel changes")?;
assert_eq!(2, changes.len(), "must contain 2 changes");
for change in changes {
match change {
ChannelChange::Status { left, right } => {
assert_eq!(ChannelStatus::Open, left, "previous status does not match");
assert_eq!(ChannelStatus::PendingToClose(ts), right, "new status does not match");
}
ChannelChange::CurrentBalance { left, right } => {
assert_eq!(
Balance::new(1_u32, BalanceType::HOPR),
left,
"previous balance does not match"
);
assert_eq!(Balance::zero(BalanceType::HOPR), right, "new balance does not match");
}
_ => panic!("unexpected change"),
}
}
let cr = cg
.get_channel(&ADDRESSES[0], &ADDRESSES[1])
.context("must contain channel")?;
assert!(c.eq(cr), "channels must be equal");
Ok(())
}
// Closing a known channel reports the changes against the stored channel
// and removes the channel from the graph.
#[test]
fn channel_graph_update_changes_on_close() -> anyhow::Result<()> {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
let ts = SystemTime::now().add(Duration::from_secs(10));
let mut c = dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::PendingToClose(ts));
let changes = cg.update_channel(c);
assert!(changes.is_none(), "must not produce changes for a new channel");
let cr = cg
.get_channel(&ADDRESSES[0], &ADDRESSES[1])
.context("must contain channel")?;
assert!(c.eq(cr), "channels must be equal");
// Close the channel with a zero balance.
c.balance = Balance::zero(BalanceType::HOPR);
c.status = ChannelStatus::Closed;
let changes = cg.update_channel(c).context("must contain changes")?;
assert_eq!(2, changes.len(), "must contain 2 changes");
for change in changes {
match change {
ChannelChange::Status { left, right } => {
assert_eq!(
ChannelStatus::PendingToClose(ts),
left,
"previous status does not match"
);
assert_eq!(ChannelStatus::Closed, right, "new status does not match");
}
ChannelChange::CurrentBalance { left, right } => {
assert_eq!(
Balance::new(1_u32, BalanceType::HOPR),
left,
"previous balance does not match"
);
assert_eq!(Balance::zero(BalanceType::HOPR), right, "new balance does not match");
}
_ => panic!("unexpected change"),
}
}
let cr = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
assert!(cr.is_none(), "must not contain channel after closing");
Ok(())
}
// A channel that is already closed when first seen is never added to the graph.
#[test]
fn channel_graph_update_should_not_allow_closed_channels() {
let mut cg = ChannelGraph::new(ADDRESSES[0]);
let changes = cg.update_channel(dummy_channel(ADDRESSES[0], ADDRESSES[1], ChannelStatus::Closed));
assert!(changes.is_none(), "must not produce changes for a closed channel");
let c = cg.get_channel(&ADDRESSES[0], &ADDRESSES[1]);
assert!(c.is_none(), "must not allow adding closed channels");
}
// A channel that is pending to close when first seen is still added to the graph.
#[test]
fn channel_graph_update_should_allow_pending_to_close_channels() -> anyhow::Result<()> {
    let mut cg = ChannelGraph::new(ADDRESSES[0]);
    let ts = SystemTime::now().add(Duration::from_secs(10));
    let changes = cg.update_channel(dummy_channel(
        ADDRESSES[0],
        ADDRESSES[1],
        ChannelStatus::PendingToClose(ts),
    ));
    // The channel is new to the graph (not closed), so no diff is expected.
    assert!(changes.is_none(), "must not produce changes for a new channel");
    cg.get_channel(&ADDRESSES[0], &ADDRESSES[1])
        .context("must allow PendingToClose channels")?;
    Ok(())
}
}