use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;
use std::collections::HashMap;
use tracing::{debug, error, info, warn};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::StreamExt;
use hopr_chain_actions::channels::ChannelActions;
use hopr_db_sql::api::peers::PeerSelector;
use hopr_db_sql::errors::DbSqlError;
use hopr_db_sql::HoprDbAllOperations;
use rand::seq::SliceRandom;
use semver::Version;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Sub;
use std::str::FromStr;
use validator::Validate;
use crate::errors::Result;
use crate::errors::StrategyError::CriteriaNotSatisfied;
use crate::strategy::SingularStrategy;
use crate::Strategy;
#[cfg(all(feature = "prometheus", not(test)))]
use hopr_metrics::metrics::{SimpleCounter, SimpleGauge};
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
static ref METRIC_COUNT_OPENS: SimpleCounter =
SimpleCounter::new("hopr_strategy_promiscuous_opened_channels_count", "Count of open channel decisions").unwrap();
static ref METRIC_COUNT_CLOSURES: SimpleCounter =
SimpleCounter::new("hopr_strategy_promiscuous_closed_channels_count", "Count of close channel decisions").unwrap();
static ref METRIC_MAX_AUTO_CHANNELS: SimpleGauge =
SimpleGauge::new("hopr_strategy_promiscuous_max_auto_channels", "Count of maximum number of channels managed by the strategy").unwrap();
}
#[derive(Clone, Debug, PartialEq, Default)]
struct ChannelDecision {
to_close: Vec<ChannelEntry>,
to_open: Vec<(Address, Balance)>,
}
impl ChannelDecision {
pub fn will_channel_be_closed(&self, counter_party: &Address) -> bool {
self.to_close.iter().any(|c| &c.destination == counter_party)
}
pub fn will_address_be_opened(&self, address: &Address) -> bool {
self.to_open.iter().any(|(addr, _)| addr == address)
}
pub fn add_to_close(&mut self, entry: ChannelEntry) {
self.to_close.push(entry);
}
pub fn add_to_open(&mut self, address: Address, balance: Balance) {
self.to_open.push((address, balance));
}
pub fn get_to_close(&self) -> &Vec<ChannelEntry> {
&self.to_close
}
pub fn get_to_open(&self) -> &Vec<(Address, Balance)> {
&self.to_open
}
}
impl Display for ChannelDecision {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"channel decision: opening ({}), closing({})",
self.to_open.len(),
self.to_close.len()
)
}
}
#[serde_as]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct PromiscuousStrategyConfig {
#[validate(range(min = 0_f64, max = 1.0_f64))]
#[default = 0.5]
pub network_quality_threshold: f64,
#[validate(range(min = 1_u32))]
#[default = 10]
pub min_network_size_samples: u32,
#[serde_as(as = "DisplayFromStr")]
#[default(Balance::new_from_str("10000000000000000000", BalanceType::HOPR))]
pub new_channel_stake: Balance,
#[serde_as(as = "DisplayFromStr")]
#[default(Balance::new_from_str("10000000000000000000", BalanceType::HOPR))]
pub minimum_node_balance: Balance,
#[validate(range(min = 1))]
pub max_channels: Option<usize>,
#[default = true]
pub enforce_max_channels: bool,
#[serde_as(as = "DisplayFromStr")]
#[default(">=2.0.0".parse().expect("should be valid default version"))]
pub minimum_peer_version: semver::VersionReq,
}
pub struct PromiscuousStrategy<Db, A>
where
Db: HoprDbAllOperations + Clone,
A: ChannelActions,
{
db: Db,
hopr_chain_actions: A,
cfg: PromiscuousStrategyConfig,
sma: RwLock<SingleSumSMA<u32>>,
}
impl<Db, A> PromiscuousStrategy<Db, A>
where
Db: HoprDbAllOperations + Clone,
A: ChannelActions,
{
pub fn new(cfg: PromiscuousStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
Self {
db,
hopr_chain_actions,
sma: RwLock::new(SingleSumSMA::new(cfg.min_network_size_samples as usize)),
cfg,
}
}
async fn sample_size_and_evaluate_avg(&self, sample: u32) -> Option<u32> {
self.sma.write().await.push(sample);
info!(peers = ?sample, "evaluated qualities of peers seen in the network");
let sma = self.sma.read().await;
if sma.len() >= sma.window_size() {
sma.average()
} else {
info!(
count = sma.len(), window_size = %sma.window_size(),
"not yet enough samples of network size to perform a strategy tick, skipping",
);
None
}
}
async fn get_peers_with_quality(&self) -> Result<HashMap<Address, f64>> {
Ok(self
.db
.get_network_peers(PeerSelector::default(), false)
.await?
.filter_map(|status| async move {
if let Some(version) = status.peer_version.clone().and_then(|v| {
semver::Version::from_str(&v)
.ok() .map(|v| Version::new(v.major, v.major, v.patch))
}) {
if self.cfg.minimum_peer_version.matches(&version) {
if let Ok(addr) = self
.db
.resolve_chain_key(&status.id.0)
.await
.and_then(|addr| addr.ok_or(DbSqlError::MissingAccount.into()))
{
Some((addr, status.get_average_quality()))
} else {
error!(address = %status.id.1, "could not find on-chain address");
None
}
} else {
debug!(peer = %status.id.1, ?version, "version of peer does not match the expectation");
None
}
} else {
error!(peer = %status.id.1, "cannot get version");
None
}
})
.collect()
.await)
}
async fn collect_tick_decision(&self) -> Result<ChannelDecision> {
let mut tick_decision = ChannelDecision::default();
let mut new_channel_candidates: Vec<(Address, f64)> = Vec::new();
let outgoing_open_channels = self
.db
.get_outgoing_channels(None)
.await
.map_err(hopr_db_sql::api::errors::DbError::from)?
.into_iter()
.filter(|channel| channel.status == ChannelStatus::Open)
.collect::<Vec<_>>();
debug!("tracking {} open outgoing channels", outgoing_open_channels.len());
let peers_with_quality = self.get_peers_with_quality().await?;
let current_average_network_size =
match self.sample_size_and_evaluate_avg(peers_with_quality.len() as u32).await {
Some(avg) => avg,
None => return Err(CriteriaNotSatisfied), };
for (address, quality) in peers_with_quality.iter() {
let channel_with_peer = outgoing_open_channels.iter().find(|c| c.destination.eq(address));
if let Some(channel) = channel_with_peer {
if *quality <= self.cfg.network_quality_threshold {
debug!(
"closure of channel to {}: {quality} <= {}",
channel.destination, self.cfg.network_quality_threshold
);
tick_decision.add_to_close(*channel);
}
} else if *quality >= self.cfg.network_quality_threshold {
new_channel_candidates.push((*address, *quality));
}
}
debug!(
"proposed closures: {}, proposed new candidates: {}",
tick_decision.get_to_close().len(),
new_channel_candidates.len()
);
let max_auto_channels = self
.cfg
.max_channels
.unwrap_or((current_average_network_size as f64).sqrt().ceil() as usize);
debug!("current upper bound for maximum number of auto-channels is {max_auto_channels}");
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_MAX_AUTO_CHANNELS.set(max_auto_channels as f64);
let occupied = outgoing_open_channels
.len()
.saturating_sub(tick_decision.get_to_close().len());
if occupied > max_auto_channels && self.cfg.enforce_max_channels {
warn!(
count = occupied,
max_auto_channels, "the strategy allows only less occupied channels"
);
let mut sorted_channels = outgoing_open_channels
.iter()
.filter(|c| !tick_decision.will_channel_be_closed(&c.destination))
.collect::<Vec<_>>();
sorted_channels.sort_unstable_by(|p1, p2| {
let q1 = match peers_with_quality.get(&p1.destination) {
Some(q) => *q,
None => {
error!(channel = ?p1, "could not determine peer quality");
0_f64
}
};
let q2 = match peers_with_quality.get(&p2.destination) {
Some(q) => *q,
None => {
error!(peer = %p2, "could not determine peer quality");
0_f64
}
};
q1.partial_cmp(&q2).expect("invalid comparison")
});
sorted_channels
.into_iter()
.take(occupied - max_auto_channels)
.for_each(|channel| {
debug!("enforcing channel closure of {channel}");
tick_decision.add_to_close(*channel);
});
} else if max_auto_channels > occupied {
new_channel_candidates.shuffle(&mut hopr_crypto_random::rng());
new_channel_candidates
.sort_unstable_by(|(_, q1), (_, q2)| q1.partial_cmp(q2).expect("should be comparable").reverse());
new_channel_candidates.truncate(max_auto_channels - occupied);
debug!("got {} new channel candidates", new_channel_candidates.len());
let mut remaining_balance = self
.db
.get_safe_hopr_balance(None)
.await
.map_err(hopr_db_sql::api::errors::DbError::from)?;
for (address, _) in new_channel_candidates {
if remaining_balance.le(&self.cfg.minimum_node_balance) {
warn!(%remaining_balance, "node ran out of allowed node balance");
break;
}
if !tick_decision.will_address_be_opened(&address) {
tick_decision.add_to_open(address, self.cfg.new_channel_stake);
remaining_balance = remaining_balance.sub(&self.cfg.new_channel_stake);
debug!(%address, "promoted for channel opening");
}
}
} else {
info!(
count = occupied,
"not going to allocate new channels, maximum number of effective channels is reached"
)
}
Ok(tick_decision)
}
}
impl<Db, A> Debug for PromiscuousStrategy<Db, A>
where
Db: HoprDbAllOperations + Clone,
A: ChannelActions,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", Strategy::Promiscuous(self.cfg.clone()))
}
}
impl<Db, A> Display for PromiscuousStrategy<Db, A>
where
Db: HoprDbAllOperations + Clone,
A: ChannelActions,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", Strategy::Promiscuous(self.cfg.clone()))
}
}
#[async_trait]
impl<Db, A> SingularStrategy for PromiscuousStrategy<Db, A>
where
Db: HoprDbAllOperations + Clone + Send + Sync,
A: ChannelActions + Send + Sync,
{
async fn on_tick(&self) -> Result<()> {
let tick_decision = self.collect_tick_decision().await?;
debug!("on tick executing {tick_decision}");
for channel_to_close in tick_decision.get_to_close() {
match self
.hopr_chain_actions
.close_channel(
channel_to_close.destination,
ChannelDirection::Outgoing,
false, )
.await
{
Ok(_) => {
debug!("issued channel closing tx: {}", channel_to_close);
}
Err(e) => {
error!(error = %e, "error while closing channel");
}
}
}
for channel_to_open in tick_decision.get_to_open() {
match self
.hopr_chain_actions
.open_channel(channel_to_open.0, channel_to_open.1)
.await
{
Ok(_) => {
debug!("issued channel opening tx: {}", channel_to_open.0);
}
Err(e) => {
error!(error = %e, channel = %channel_to_open.0, "error while issuing channel opening");
}
}
}
#[cfg(all(feature = "prometheus", not(test)))]
{
METRIC_COUNT_OPENS.increment_by(tick_decision.get_to_open().len() as u64);
METRIC_COUNT_CLOSURES.increment_by(tick_decision.get_to_close().len() as u64);
}
info!(%tick_decision, "on tick executed");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;
use futures::{future::ok, FutureExt};
use hex_literal::hex;
use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
use hopr_chain_types::actions::Action;
use hopr_chain_types::chain_events::ChainEventType;
use hopr_crypto_random::random_bytes;
use hopr_crypto_types::prelude::*;
use hopr_db_sql::accounts::HoprDbAccountOperations;
use hopr_db_sql::api::peers::HoprDbPeersOperations;
use hopr_db_sql::channels::HoprDbChannelOperations;
use hopr_db_sql::db::HoprDb;
use hopr_db_sql::info::HoprDbInfoOperations;
use hopr_db_sql::HoprDbGeneralModelOperations;
use hopr_transport_network::{network::PeerOrigin, PeerId};
use lazy_static::lazy_static;
use mockall::mock;
lazy_static! {
static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!(
"492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
))
.expect("lazy static keypair should be valid");
static ref PEERS: [(Address, PeerId); 10] = [
(
ALICE.public().to_address().into(),
hex!("e03640d3184c8aa6f9d4ccd533281c51974a170c0c4d0fe1da9296a081ab1fd9")
),
(
hex!("5f98dc63889681eb4306f0e3b5ee2e04b13af7c8"),
hex!("82a3cec1660697d8f3eb798f82ae281fc885c3e5370ef700c95c17397846c1e7")
),
(
hex!("6e0bed94a8d2da952ad4468ff81157b6137a5566"),
hex!("2b93fcca9db2c5c12d1add5c07dd81d20c68eb713e99aa5c488210179c7505e3")
),
(
hex!("8275b9ce8a3d2fe14029111f85b72ab05aa0f5d3"),
hex!("5cfd16dc160fd43396bfaff06e7c2e62cd087317671c159ce7cbc31c34fc32b6")
),
(
hex!("3231673fd10c9ebeb9330745f1709c91db9cf40f"),
hex!("7f5b421cc58cf8449f5565756697261723fb96bba5f0aa2ba83c4973e0e994bf")
),
(
hex!("585f4ca77b07ac7a3bf37de3069b641ba97bf76f"),
hex!("848af931ce57f54fbf96d7250eda8b0f36e3d1988ec8048c892e8d8ff0798f2f")
),
(
hex!("ba413645edb6ddbd46d5911466264b119087dfea"),
hex!("d79258fc521dba8ded208066fe98fd8a857cf2e8f42f1b71c8f6e29b8f47e406")
),
(
hex!("9ea8c0f3766022f84c41abd524c942971bd22d23"),
hex!("cd7a06caebcb90f95690c72472127cae8732b415440a1783c6ff9f9cb0bacf1e")
),
(
hex!("9790b6cf8afe6a7d80102570fac18a322e26ef83"),
hex!("2dc3ff226be59333127ebfd3c79517eac8f81e0333abaa45189aae309880e55a")
),
(
hex!("f6ab491cd4e2eccbe60a7f87aeaacfc408dabde8"),
hex!("5826ed44f52b3a26c472621812165bb2d3e60a9929e06db8b8df4e4d23068eba")
),
]
.map(|(addr, privkey)| (
addr.into(),
OffchainKeypair::from_secret(&privkey)
.expect("lazy static keypair should be valid")
.public()
.into()
));
}
mock! {
ChannelAct { }
#[async_trait]
impl ChannelActions for ChannelAct {
async fn open_channel(&self, destination: Address, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
async fn close_channel(
&self,
counterparty: Address,
direction: ChannelDirection,
redeem_before_close: bool,
) -> hopr_chain_actions::errors::Result<PendingAction>;
}
}
async fn mock_channel(db: HoprDb, dst: Address, balance: Balance) -> anyhow::Result<ChannelEntry> {
let channel = ChannelEntry::new(
PEERS[0].0,
dst,
balance,
U256::zero(),
ChannelStatus::Open,
U256::zero(),
);
db.upsert_channel(None, channel).await?;
Ok(channel)
}
async fn prepare_network(db: HoprDb, qualities: Vec<f64>) -> anyhow::Result<()> {
assert_eq!(qualities.len(), PEERS.len() - 1, "invalid network setup");
for (i, quality) in qualities.into_iter().enumerate() {
let peer = &PEERS[i + 1].1;
db.add_network_peer(peer, PeerOrigin::Initialization, vec![], 0.0, 10)
.await?;
let mut status = db.get_network_peer(peer).await?.expect("should be present");
status.peer_version = Some("2.0.0".into());
while status.get_average_quality() < quality {
status.update_quality(quality);
}
db.update_network_peer(status).await?;
}
Ok(())
}
async fn init_db(db: HoprDb, node_balance: Balance) -> anyhow::Result<()> {
db.begin_transaction()
.await?
.perform(|tx| {
Box::pin(async move {
db.set_safe_hopr_balance(Some(tx), node_balance).await?;
db.set_safe_hopr_allowance(Some(tx), node_balance).await?;
for (chain_key, peer_id) in PEERS.iter() {
db.insert_account(
Some(tx),
AccountEntry::new(
OffchainPublicKey::try_from(*peer_id).expect("should be valid PeerId"),
*chain_key,
AccountType::NotAnnounced,
),
)
.await?;
}
Ok::<_, DbSqlError>(())
})
})
.await?;
Ok(())
}
fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
ActionConfirmation {
tx_hash: random_hash,
event: Some(ChainEventType::ChannelClosureInitiated(channel)),
action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
}
}
fn mock_action_confirmation_opening(address: Address, balance: Balance) -> ActionConfirmation {
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
ActionConfirmation {
tx_hash: random_hash,
event: Some(ChainEventType::ChannelOpened(ChannelEntry::new(
PEERS[0].0,
address,
balance,
U256::zero(),
ChannelStatus::Open,
U256::zero(),
))),
action: Action::OpenChannel(address, balance),
}
}
#[test]
fn test_semver() -> anyhow::Result<()> {
let ver: semver::Version = "2.1.0-rc.3+commit.f75bc6c8".parse()?;
let stripped = semver::Version::new(ver.major, ver.minor, ver.patch);
let req = semver::VersionReq::from_str(">=2.0.0")?;
assert!(req.matches(&stripped), "constraint must match");
Ok(())
}
#[async_std::test]
async fn test_promiscuous_strategy_tick_decisions() -> anyhow::Result<()> {
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let qualities_that_alice_sees = vec![0.7, 0.9, 0.8, 0.98, 0.1, 0.3, 0.1, 0.2, 1.0];
let balance = Balance::new_from_str("100000000000000000000", BalanceType::HOPR);
init_db(db.clone(), balance).await?;
prepare_network(db.clone(), qualities_that_alice_sees).await?;
mock_channel(db.clone(), PEERS[1].0, balance).await?;
mock_channel(db.clone(), PEERS[2].0, balance).await?;
let for_closing = mock_channel(db.clone(), PEERS[5].0, balance).await?;
let mut status_3 = db
.get_network_peer(&PEERS[3].1)
.await?
.context("peer should be present")?;
status_3.peer_version = Some("2.1.0-rc.3+commit.f75bc6c8".into());
db.update_network_peer(status_3).await?;
let mut status_10 = db
.get_network_peer(&PEERS[9].1)
.await?
.context("peer should be present")?;
status_10.peer_version = Some("1.92.0".into());
db.update_network_peer(status_10).await?;
let mut strat_cfg = PromiscuousStrategyConfig::default();
strat_cfg.max_channels = Some(3); strat_cfg.network_quality_threshold = 0.5;
let mut actions = MockChannelAct::new();
actions
.expect_close_channel()
.times(1)
.withf(|dst, dir, _| PEERS[5].0.eq(dst) && ChannelDirection::Outgoing.eq(dir))
.return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(for_closing)).boxed()));
let new_stake = strat_cfg.new_channel_stake;
actions
.expect_open_channel()
.times(1)
.withf(move |dst, b| PEERS[4].0.eq(dst) && new_stake.eq(b))
.return_once(move |_, _| Ok(ok(mock_action_confirmation_opening(PEERS[4].0, new_stake)).boxed()));
let strat = PromiscuousStrategy::new(strat_cfg.clone(), db, actions);
for _ in 0..strat_cfg.min_network_size_samples - 1 {
strat
.on_tick()
.await
.expect_err("on tick should fail when criteria are not met");
}
strat.on_tick().await?;
Ok(())
}
}