use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::{
fmt::{Debug, Display, Formatter},
sync::Arc,
};
use tracing::{error, warn};
use validator::Validate;
use hopr_chain_actions::ChainActions;
use hopr_internal_types::prelude::*;
use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregatorTrait;
use crate::aggregating::AggregatingStrategy;
use crate::auto_funding::AutoFundingStrategy;
use crate::auto_redeeming::AutoRedeemingStrategy;
use crate::channel_finalizer::ClosureFinalizerStrategy;
use crate::errors::Result;
use crate::promiscuous::PromiscuousStrategy;
use crate::Strategy;
use hopr_db_sql::HoprDbAllOperations;
#[cfg(all(feature = "prometheus", not(test)))]
use {hopr_metrics::metrics::MultiGauge, strum::VariantNames};
// Gauge labelled by strategy name. `MultiStrategy::new` sets every known strategy label to 0
// and then sets the label of each configured strategy to 1.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
static ref METRIC_ENABLED_STRATEGIES: MultiGauge =
MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
}
/// Set of event hooks a single strategy can react to.
///
/// Every hook has a default implementation that does nothing and returns `Ok(())`,
/// so an implementor only needs to override the hooks it is interested in.
/// Implementors must also implement [`Display`]; it is used to identify the strategy in logs.
///
/// A mock (`MockSingularStrategy`) is generated for test builds.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SingularStrategy: Display {
/// Hook for a periodic tick.
///
/// NOTE(review): the caller and the tick cadence are not visible in this file — confirm.
async fn on_tick(&self) -> Result<()> {
Ok(())
}
/// Hook receiving an acknowledged ticket.
///
/// NOTE(review): judging by the name, this is invoked for winning tickets only — confirm with the caller.
async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
Ok(())
}
/// Hook receiving a channel entry together with its direction and the change that occurred.
///
/// NOTE(review): judging by the name, only channels owned by this node are reported — confirm with the caller.
async fn on_own_channel_changed(
&self,
_channel: &ChannelEntry,
_direction: ChannelDirection,
_change: ChannelChange,
) -> Result<()> {
Ok(())
}
}
/// Serde default provider for boolean configuration flags that are enabled by default.
#[inline]
fn just_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
/// Serde default provider for `MultiStrategyConfig::execution_interval`.
#[inline]
fn sixty() -> u64 {
    const DEFAULT_EXECUTION_INTERVAL: u64 = 60;
    DEFAULT_EXECUTION_INTERVAL
}
/// Serde default provider for `MultiStrategyConfig::strategies`: no sub-strategies.
#[inline]
fn empty_vector() -> Vec<Strategy> {
    Vec::new()
}
/// Configuration of [`MultiStrategy`].
///
/// Deserialization rejects unknown fields; every field falls back to its default when omitted.
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MultiStrategyConfig {
/// Decides what happens when a sub-strategy returns an error from one of its hooks:
/// when `true`, the error is dropped and the remaining sub-strategies are still executed;
/// when `false`, execution stops and the error is returned to the caller.
///
/// Default is `true`.
#[default = true]
#[serde(default = "just_true")]
pub on_fail_continue: bool,
/// Whether a `Strategy::Multi` entry in `strategies` is instantiated as a nested multi-strategy.
/// When `false`, such entries are logged as an error and skipped.
/// A nested multi-strategy always has this flag forced to `false`,
/// so at most one level of nesting is ever created.
///
/// Default is `true`.
#[default = true]
#[serde(default = "just_true")]
pub allow_recursive: bool,
/// Execution interval; validation requires it to be at least 1.
///
/// NOTE(review): this value is not read anywhere in this file — presumably it is the period
/// between `on_tick` invocations; confirm the unit with the consumer.
///
/// Default is 60.
#[default = 60]
#[serde(default = "sixty")]
#[validate(range(min = 1))]
pub execution_interval: u64,
/// Configurations of the sub-strategies, in the order in which they are instantiated and executed.
///
/// Default is an empty list.
#[default(_code = "vec![]")]
#[serde(default = "empty_vector")]
pub strategies: Vec<Strategy>,
}
/// A strategy composed of an ordered chain of sub-strategies.
///
/// Each [`SingularStrategy`] hook is forwarded to the sub-strategies one after another;
/// `MultiStrategyConfig::on_fail_continue` decides whether a failing sub-strategy stops the chain.
pub struct MultiStrategy {
// Instantiated sub-strategies, executed in this order.
strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
// Configuration this instance was created from.
cfg: MultiStrategyConfig,
}
impl MultiStrategy {
pub fn new<Db>(
cfg: MultiStrategyConfig,
db: Db,
hopr_chain_actions: ChainActions<Db>,
ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
) -> Self
where
Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
{
let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
#[cfg(all(feature = "prometheus", not(test)))]
Strategy::VARIANTS
.iter()
.for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
for strategy in cfg.strategies.iter() {
match strategy {
Strategy::Promiscuous(sub_cfg) => strategies.push(Box::new(PromiscuousStrategy::new(
sub_cfg.clone(),
db.clone(),
hopr_chain_actions.clone(),
))),
Strategy::Aggregating(sub_cfg) => strategies.push(Box::new(AggregatingStrategy::new(
*sub_cfg,
db.clone(),
ticket_aggregator.clone(),
))),
Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
*sub_cfg,
db.clone(),
hopr_chain_actions.clone(),
))),
Strategy::AutoFunding(sub_cfg) => {
strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
}
Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
*sub_cfg,
db.clone(),
hopr_chain_actions.clone(),
))),
Strategy::Multi(sub_cfg) => {
if cfg.allow_recursive {
let mut cfg_clone = sub_cfg.clone();
cfg_clone.allow_recursive = false; strategies.push(Box::new(Self::new(
cfg_clone,
db.clone(),
hopr_chain_actions.clone(),
ticket_aggregator.clone(),
)))
} else {
error!("recursive multi-strategy not allowed and skipped")
}
}
Strategy::Passive => strategies.push(Box::new(Self {
cfg: Default::default(),
strategies: Vec::new(),
})),
}
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
}
Self { strategies, cfg }
}
}
impl Debug for MultiStrategy {
    /// Debug-formats this instance as the `Strategy::Multi` variant wrapping a copy of its configuration.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let as_variant = Strategy::Multi(self.cfg.clone());
        f.write_fmt(format_args!("{:?}", as_variant))
    }
}
impl Display for MultiStrategy {
    /// Displays this instance the same way as the `Strategy::Multi` variant wrapping a copy of its configuration.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let as_variant = Strategy::Multi(self.cfg.clone());
        f.write_fmt(format_args!("{}", as_variant))
    }
}
/// Forwards every hook to the sub-strategies, one after another, in their configured order.
///
/// When a sub-strategy fails and `on_fail_continue` is not set, a warning naming the failing
/// sub-strategy is logged, the remaining sub-strategies are not executed and the error is returned.
/// When `on_fail_continue` is set, the error is dropped and the chain continues.
#[async_trait]
impl SingularStrategy for MultiStrategy {
    /// Runs `on_tick` of every sub-strategy.
    async fn on_tick(&self) -> Result<()> {
        for strategy in self.strategies.iter() {
            if let Err(e) = strategy.on_tick().await {
                if !self.cfg.on_fail_continue {
                    warn!(%self, %strategy, "on_tick chain stopped at strategy");
                    return Err(e);
                }
            }
        }
        Ok(())
    }

    /// Runs `on_acknowledged_winning_ticket` of every sub-strategy with the given ticket.
    async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
        for strategy in self.strategies.iter() {
            if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
                if !self.cfg.on_fail_continue {
                    warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
                    return Err(e);
                }
            }
        }
        Ok(())
    }

    /// Runs `on_own_channel_changed` of every sub-strategy with the given channel, direction and change.
    async fn on_own_channel_changed(
        &self,
        channel: &ChannelEntry,
        direction: ChannelDirection,
        change: ChannelChange,
    ) -> Result<()> {
        for strategy in self.strategies.iter() {
            if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
                if !self.cfg.on_fail_continue {
                    // Record the failing sub-strategy, as the other two hooks do
                    warn!(%self, %strategy, "on_channel_state_changed chain stopped at strategy");
                    return Err(e);
                }
            }
        }
        Ok(())
    }
}
/// The generated mock needs a `Display` implementation to satisfy the `SingularStrategy` supertrait bound.
#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
#[cfg(test)]
mod tests {
    use crate::errors::StrategyError::Other;
    use crate::strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy};
    use mockall::Sequence;

    /// Configuration without any declared sub-strategies; the tests inject mocks directly.
    fn config(on_fail_continue: bool) -> MultiStrategyConfig {
        MultiStrategyConfig {
            on_fail_continue,
            allow_recursive: true,
            execution_interval: 1,
            strategies: Vec::new(),
        }
    }

    /// With `on_fail_continue` set, a failing first strategy must not prevent the second from running.
    #[async_std::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut order = Sequence::new();

        let mut failing = MockSingularStrategy::new();
        failing
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut order)
            .returning(|| Err(Other("error".into())));

        let mut succeeding = MockSingularStrategy::new();
        succeeding
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut order)
            .returning(|| Ok(()));

        let multi = MultiStrategy {
            strategies: vec![Box::new(failing), Box::new(succeeding)],
            cfg: config(true),
        };

        multi.on_tick().await?;
        Ok(())
    }

    /// Without `on_fail_continue`, a failing first strategy must stop the chain and surface the error.
    #[async_std::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut order = Sequence::new();

        let mut failing = MockSingularStrategy::new();
        failing
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut order)
            .returning(|| Err(Other("error".into())));

        let mut never_reached = MockSingularStrategy::new();
        never_reached
            .expect_on_tick()
            .never()
            .in_sequence(&mut order)
            .returning(|| Ok(()));

        let multi = MultiStrategy {
            strategies: vec![Box::new(failing), Box::new(never_reached)],
            cfg: config(false),
        };

        multi.on_tick().await.expect_err("on_tick should fail");
    }
}