hopr_strategy/
strategy.rs

1//! ## Multi Strategy
2//!
//! This strategy can stack multiple other strategies (called sub-strategies in this context) into one.
//! Once a strategy event is triggered, it is executed on the sub-strategies sequentially, one by one.
//! The strategy can be configured not to pass the event to the next sub-strategy if the sub-strategy currently being
//! executed failed, which is done by setting the `on_fail_continue` flag to `false`.
7//!
8//! Hence, the sub-strategy chain then can behave as a logical AND (`on_fail_continue` = `false`) execution chain
9//! or logical OR (`on_fail_continue` = `true`) execution chain.
10//!
11//! A Multi Strategy can also contain another Multi Strategy as a sub-strategy if `allow_recursive` flag is set.
12//! However, this recursion is always allowed up to 2 levels only.
13//! Along with the `on_fail_continue` value, the recursive feature allows constructing more complex logical strategy chains.
14//!
15//! The MultiStrategy can also observe channels being `PendingToClose` and running out of closure grace period,
16//! and if this happens, it will issue automatically the final close transaction, which transitions the state to `Closed`.
17//! This can be controlled by the `finalize_channel_closure` parameter.
18//!
19//! For details on default parameters see [MultiStrategyConfig].
20use async_trait::async_trait;
21use serde::{Deserialize, Serialize};
22use std::{
23    fmt::{Debug, Display, Formatter},
24    sync::Arc,
25};
26use tracing::{error, warn};
27use validator::Validate;
28
29use hopr_chain_actions::ChainActions;
30use hopr_internal_types::prelude::*;
31use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregatorTrait;
32
33use crate::aggregating::AggregatingStrategy;
34use crate::auto_funding::AutoFundingStrategy;
35use crate::auto_redeeming::AutoRedeemingStrategy;
36use crate::channel_finalizer::ClosureFinalizerStrategy;
37use crate::errors::Result;
38use crate::promiscuous::PromiscuousStrategy;
39use crate::Strategy;
40
41use hopr_db_sql::HoprDbAllOperations;
42#[cfg(all(feature = "prometheus", not(test)))]
43use {hopr_metrics::metrics::MultiGauge, strum::VariantNames};
44
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge labelled by strategy name (label `strategy`).
    /// `MultiStrategy::new` first resets every known strategy label to 0
    /// and then marks the enabled strategies with 1.
    static ref METRIC_ENABLED_STRATEGIES: MultiGauge = MultiGauge::new(
        "hopr_strategy_enabled_strategies",
        "List of enabled strategies",
        &["strategy"],
    )
    .unwrap();
}
50
51/// Basic single strategy.
52#[cfg_attr(test, mockall::automock)]
53#[async_trait]
54pub trait SingularStrategy: Display {
55    /// Strategy event raised at period intervals (typically each 1 minute).
56    async fn on_tick(&self) -> Result<()> {
57        Ok(())
58    }
59
60    /// Strategy event raised when a new **winning** acknowledged ticket is received in a channel
61    async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
62        Ok(())
63    }
64
65    /// Strategy event raised whenever the Indexer registers a change on node's own channel.
66    async fn on_own_channel_changed(
67        &self,
68        _channel: &ChannelEntry,
69        _direction: ChannelDirection,
70        _change: ChannelChange,
71    ) -> Result<()> {
72        Ok(())
73    }
74}
75
/// Serde default provider for boolean configuration flags that default to `true`.
#[inline]
fn just_true() -> bool {
    true
}
80
/// Serde default provider returning `60`.
#[inline]
fn sixty() -> u64 {
    const SIXTY: u64 = 60;
    SIXTY
}
85
86#[inline]
87fn empty_vector() -> Vec<Strategy> {
88    vec![]
89}
90
91/// Configuration options for the `MultiStrategy` chain.
92/// If `fail_on_continue` is set, the `MultiStrategy` sequence behaves as logical AND chain,
93/// otherwise it behaves like a logical OR chain.
94#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
95#[serde(deny_unknown_fields)]
96pub struct MultiStrategyConfig {
97    /// Determines if the strategy should continue executing the next strategy if the current one failed.
98    /// If set to `true`, the strategy behaves like a logical AND chain of `SingularStrategies`
99    /// Otherwise, it behaves like a logical OR chain of `SingularStrategies`.
100    ///
101    /// Default is true.
102    #[default = true]
103    #[serde(default = "just_true")]
104    pub on_fail_continue: bool,
105
106    /// Indicate whether the `MultiStrategy` can contain another `MultiStrategy`.
107    ///
108    /// Default is true.
109    #[default = true]
110    #[serde(default = "just_true")]
111    pub allow_recursive: bool,
112
113    /// Execution interval of the configured strategies in seconds.
114    ///
115    /// Default is 60, minimum is 1.
116    #[default = 60]
117    #[serde(default = "sixty")]
118    #[validate(range(min = 1))]
119    pub execution_interval: u64,
120
121    /// Configuration of individual sub-strategies.
122    ///
123    /// Default is empty, which makes the `MultiStrategy` behave as passive.
124    #[default(_code = "vec![]")]
125    #[serde(default = "empty_vector")]
126    pub strategies: Vec<Strategy>,
127}
128
129/// Defines an execution chain of `SingularStrategies`.
130/// The `MultiStrategy` itself also implements the `SingularStrategy` trait,
131/// which makes it possible (along with different `on_fail_continue` policies) to construct
132/// various logical strategy chains.
133pub struct MultiStrategy {
134    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
135    cfg: MultiStrategyConfig,
136}
137
138impl MultiStrategy {
139    /// Constructs new `MultiStrategy`.
140    /// The strategy can contain another `MultiStrategy` if `allow_recursive` is set.
141    pub fn new<Db>(
142        cfg: MultiStrategyConfig,
143        db: Db,
144        hopr_chain_actions: ChainActions<Db>,
145        ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
146    ) -> Self
147    where
148        Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
149    {
150        let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
151
152        #[cfg(all(feature = "prometheus", not(test)))]
153        Strategy::VARIANTS
154            .iter()
155            .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
156
157        for strategy in cfg.strategies.iter() {
158            match strategy {
159                Strategy::Promiscuous(sub_cfg) => strategies.push(Box::new(PromiscuousStrategy::new(
160                    sub_cfg.clone(),
161                    db.clone(),
162                    hopr_chain_actions.clone(),
163                ))),
164                Strategy::Aggregating(sub_cfg) => strategies.push(Box::new(AggregatingStrategy::new(
165                    *sub_cfg,
166                    db.clone(),
167                    ticket_aggregator.clone(),
168                ))),
169                Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
170                    *sub_cfg,
171                    db.clone(),
172                    hopr_chain_actions.clone(),
173                ))),
174                Strategy::AutoFunding(sub_cfg) => {
175                    strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
176                }
177                Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
178                    *sub_cfg,
179                    db.clone(),
180                    hopr_chain_actions.clone(),
181                ))),
182                Strategy::Multi(sub_cfg) => {
183                    if cfg.allow_recursive {
184                        let mut cfg_clone = sub_cfg.clone();
185                        cfg_clone.allow_recursive = false; // Do not allow more levels of recursion
186
187                        strategies.push(Box::new(Self::new(
188                            cfg_clone,
189                            db.clone(),
190                            hopr_chain_actions.clone(),
191                            ticket_aggregator.clone(),
192                        )))
193                    } else {
194                        error!("recursive multi-strategy not allowed and skipped")
195                    }
196                }
197
198                // Passive strategy = empty MultiStrategy
199                Strategy::Passive => strategies.push(Box::new(Self {
200                    cfg: Default::default(),
201                    strategies: Vec::new(),
202                })),
203            }
204
205            #[cfg(all(feature = "prometheus", not(test)))]
206            METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
207        }
208
209        Self { strategies, cfg }
210    }
211}
212
213impl Debug for MultiStrategy {
214    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
215        write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
216    }
217}
218
219impl Display for MultiStrategy {
220    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
221        write!(f, "{}", Strategy::Multi(self.cfg.clone()))
222    }
223}
224
225#[async_trait]
226impl SingularStrategy for MultiStrategy {
227    async fn on_tick(&self) -> Result<()> {
228        for strategy in self.strategies.iter() {
229            if let Err(e) = strategy.on_tick().await {
230                if !self.cfg.on_fail_continue {
231                    warn!(%self, %strategy, "on_tick chain stopped at strategy");
232                    return Err(e);
233                }
234            }
235        }
236        Ok(())
237    }
238
239    async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
240        for strategy in self.strategies.iter() {
241            if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
242                if !self.cfg.on_fail_continue {
243                    warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
244                    return Err(e);
245                }
246            }
247        }
248        Ok(())
249    }
250
251    async fn on_own_channel_changed(
252        &self,
253        channel: &ChannelEntry,
254        direction: ChannelDirection,
255        change: ChannelChange,
256    ) -> Result<()> {
257        for strategy in self.strategies.iter() {
258            if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
259                if !self.cfg.on_fail_continue {
260                    warn!(%self, "on_channel_state_changed chain stopped at strategy");
261                    return Err(e);
262                }
263            }
264        }
265        Ok(())
266    }
267}
268
/// The mocked strategy needs a `Display` implementation, because `Display`
/// is a supertrait of `SingularStrategy`. It always prints `mock`.
#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
275
#[cfg(test)]
mod tests {
    use crate::errors::StrategyError::Other;
    use crate::strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy};
    use mockall::Sequence;

    /// Creates a mock whose `on_tick` must be called exactly once within `seq` and fails.
    fn failing_once(seq: &mut Sequence) -> MockSingularStrategy {
        let mut mock = MockSingularStrategy::new();
        mock.expect_on_tick()
            .times(1)
            .in_sequence(seq)
            .returning(|| Err(Other("error".into())));
        mock
    }

    /// Assembles a two-element `MultiStrategy` directly from the given mocks,
    /// using the given `on_fail_continue` policy.
    fn chain_of(on_fail_continue: bool, first: MockSingularStrategy, second: MockSingularStrategy) -> MultiStrategy {
        MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: MultiStrategyConfig {
                on_fail_continue,
                allow_recursive: true,
                execution_interval: 1,
                strategies: Vec::new(),
            },
        }
    }

    /// With `on_fail_continue = true`, the second strategy runs even though the first one failed,
    /// and the chain as a whole succeeds.
    #[async_std::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut seq = Sequence::new();

        let first = failing_once(&mut seq);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = chain_of(true, first, second);
        ms.on_tick().await?;

        Ok(())
    }

    /// With `on_fail_continue = false`, the failure of the first strategy stops the chain:
    /// the second strategy is never called and the chain fails.
    #[async_std::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut seq = Sequence::new();

        let first = failing_once(&mut seq);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .never()
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = chain_of(false, first, second);
        ms.on_tick().await.expect_err("on_tick should fail");
    }
}
335        ms.on_tick().await.expect_err("on_tick should fail");
336    }
337}