hopr_strategy/
strategy.rs

1//! ## Multi Strategy
2//!
3//! This strategy can stack multiple above strategies (called sub-strategies in this context) into one.
4//! Once a strategy event is triggered, it is executed sequentially on the sub-strategies one by one.
5//! The strategy can be configured to not call the next sub-strategy event if the sub-strategy currently being executed
6//! failed, which is done by setting the `on_fail_continue` flag.
7//!
8//! Hence, the sub-strategy chain then can behave as a logical AND (`on_fail_continue` = `false`) execution chain
9//! or logical OR (`on_fail_continue` = `true`) execution chain.
10//!
11//! A Multi Strategy can also contain another Multi Strategy as a sub-strategy if `allow_recursive` flag is set.
12//! However, this recursion is always allowed up to 2 levels only.
13//! Along with the `on_fail_continue` value, the recursive feature allows constructing more complex logical strategy
14//! chains.
15//!
//! The MultiStrategy can also observe channels that are `PendingToClose` and have run out of their closure grace
//! period; when this happens, it automatically issues the final close transaction, which transitions the channel
//! state to `Closed`. This can be controlled by the `finalize_channel_closure` parameter.
19//!
20//! For details on default parameters see [MultiStrategyConfig].
21use std::{
22    fmt::{Debug, Display, Formatter},
23    sync::Arc,
24};
25
26use async_trait::async_trait;
27use hopr_chain_actions::ChainActions;
28use hopr_db_sql::HoprDbAllOperations;
29use hopr_internal_types::prelude::*;
30use hopr_transport_ticket_aggregation::TicketAggregatorTrait;
31use serde::{Deserialize, Serialize};
32use tracing::{error, warn};
33use validator::Validate;
34#[cfg(all(feature = "prometheus", not(test)))]
35use {hopr_metrics::metrics::MultiGauge, strum::VariantNames};
36
37use crate::{
38    Strategy, aggregating::AggregatingStrategy, auto_funding::AutoFundingStrategy,
39    auto_redeeming::AutoRedeemingStrategy, channel_finalizer::ClosureFinalizerStrategy, errors::Result,
40    promiscuous::PromiscuousStrategy,
41};
42
43#[cfg(all(feature = "prometheus", not(test)))]
44lazy_static::lazy_static! {
45    static ref METRIC_ENABLED_STRATEGIES: MultiGauge =
46        MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
47}
48
/// Basic single strategy.
///
/// Every event handler has a default implementation that does nothing and returns `Ok(())`,
/// so an implementor only has to override the events it actually wants to react to.
/// The `Display` supertrait allows a strategy to be named in log output.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SingularStrategy: Display {
    /// Strategy event raised at periodic intervals (typically every minute).
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    async fn on_tick(&self) -> Result<()> {
        Ok(())
    }

    /// Strategy event raised when a new **winning** acknowledged ticket is received in a channel.
    ///
    /// The default implementation ignores the ticket and returns `Ok(())`.
    async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
        Ok(())
    }

    /// Strategy event raised whenever the Indexer registers a change on node's own channel.
    ///
    /// The handler receives the affected channel, its direction, and the change that was registered.
    /// The default implementation ignores all of them and returns `Ok(())`.
    async fn on_own_channel_changed(
        &self,
        _channel: &ChannelEntry,
        _direction: ChannelDirection,
        _change: ChannelChange,
    ) -> Result<()> {
        Ok(())
    }
}
73
/// Returns `true`.
///
/// Referenced by name from `#[serde(default = "just_true")]` attributes to supply the
/// deserialization default of boolean configuration fields.
#[inline]
fn just_true() -> bool {
    true
}
78
/// Returns `60`.
///
/// Referenced by name from a `#[serde(default = "sixty")]` attribute to supply the
/// deserialization default of a numeric configuration field.
#[inline]
fn sixty() -> u64 {
    const SIXTY: u64 = 60;
    SIXTY
}
83
84#[inline]
85fn empty_vector() -> Vec<Strategy> {
86    vec![]
87}
88
/// Configuration options for the `MultiStrategy` chain.
/// If `on_fail_continue` is set, the `MultiStrategy` sequence behaves as a logical OR chain,
/// otherwise it behaves like a logical AND chain.
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MultiStrategyConfig {
    /// Determines if the strategy should continue executing the next strategy if the current one failed.
    /// If set to `true`, a failing sub-strategy does not stop the chain, so the strategy behaves like
    /// a logical OR chain of `SingularStrategies`.
    /// Otherwise, the chain stops at the first failing sub-strategy and returns its error, so it behaves
    /// like a logical AND chain of `SingularStrategies`.
    ///
    /// Default is true.
    #[default = true]
    #[serde(default = "just_true")]
    pub on_fail_continue: bool,

    /// Indicate whether the `MultiStrategy` can contain another `MultiStrategy`.
    /// When set to `false`, nested `MultiStrategy` entries are skipped during construction
    /// and an error is logged.
    ///
    /// Default is true.
    #[default = true]
    #[serde(default = "just_true")]
    pub allow_recursive: bool,

    /// Execution interval of the configured strategies in seconds.
    ///
    /// Default is 60, minimum is 1.
    #[default = 60]
    #[serde(default = "sixty")]
    #[validate(range(min = 1))]
    pub execution_interval: u64,

    /// Configuration of individual sub-strategies.
    /// The sub-strategies are instantiated and later executed in the order given here.
    ///
    /// Default is empty, which makes the `MultiStrategy` behave as passive.
    #[default(_code = "vec![]")]
    #[serde(default = "empty_vector")]
    pub strategies: Vec<Strategy>,
}
126
/// Defines an execution chain of `SingularStrategies`.
/// The `MultiStrategy` itself also implements the `SingularStrategy` trait,
/// which makes it possible (along with different `on_fail_continue` policies) to construct
/// various logical strategy chains.
pub struct MultiStrategy {
    /// Instantiated sub-strategies; events are forwarded to them sequentially in this order.
    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
    /// Configuration this chain was built from; `on_fail_continue` is consulted on every event,
    /// and the whole configuration is used to render the `Debug` and `Display` output.
    cfg: MultiStrategyConfig,
}
135
136impl MultiStrategy {
137    /// Constructs new `MultiStrategy`.
138    /// The strategy can contain another `MultiStrategy` if `allow_recursive` is set.
139    pub fn new<Db>(
140        cfg: MultiStrategyConfig,
141        db: Db,
142        hopr_chain_actions: ChainActions<Db>,
143        ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
144    ) -> Self
145    where
146        Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
147    {
148        let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
149
150        #[cfg(all(feature = "prometheus", not(test)))]
151        Strategy::VARIANTS
152            .iter()
153            .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
154
155        for strategy in cfg.strategies.iter() {
156            match strategy {
157                Strategy::Promiscuous(sub_cfg) => strategies.push(Box::new(PromiscuousStrategy::new(
158                    sub_cfg.clone(),
159                    db.clone(),
160                    hopr_chain_actions.clone(),
161                ))),
162                Strategy::Aggregating(sub_cfg) => strategies.push(Box::new(AggregatingStrategy::new(
163                    *sub_cfg,
164                    db.clone(),
165                    ticket_aggregator.clone(),
166                ))),
167                Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
168                    *sub_cfg,
169                    db.clone(),
170                    hopr_chain_actions.clone(),
171                ))),
172                Strategy::AutoFunding(sub_cfg) => {
173                    strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
174                }
175                Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
176                    *sub_cfg,
177                    db.clone(),
178                    hopr_chain_actions.clone(),
179                ))),
180                Strategy::Multi(sub_cfg) => {
181                    if cfg.allow_recursive {
182                        let mut cfg_clone = sub_cfg.clone();
183                        cfg_clone.allow_recursive = false; // Do not allow more levels of recursion
184
185                        strategies.push(Box::new(Self::new(
186                            cfg_clone,
187                            db.clone(),
188                            hopr_chain_actions.clone(),
189                            ticket_aggregator.clone(),
190                        )))
191                    } else {
192                        error!("recursive multi-strategy not allowed and skipped")
193                    }
194                }
195
196                // Passive strategy = empty MultiStrategy
197                Strategy::Passive => strategies.push(Box::new(Self {
198                    cfg: Default::default(),
199                    strategies: Vec::new(),
200                })),
201            }
202
203            #[cfg(all(feature = "prometheus", not(test)))]
204            METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
205        }
206
207        Self { strategies, cfg }
208    }
209}
210
211impl Debug for MultiStrategy {
212    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
213        write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
214    }
215}
216
217impl Display for MultiStrategy {
218    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
219        write!(f, "{}", Strategy::Multi(self.cfg.clone()))
220    }
221}
222
223#[async_trait]
224impl SingularStrategy for MultiStrategy {
225    async fn on_tick(&self) -> Result<()> {
226        for strategy in self.strategies.iter() {
227            if let Err(e) = strategy.on_tick().await {
228                if !self.cfg.on_fail_continue {
229                    warn!(%self, %strategy, "on_tick chain stopped at strategy");
230                    return Err(e);
231                }
232            }
233        }
234        Ok(())
235    }
236
237    async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
238        for strategy in self.strategies.iter() {
239            if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
240                if !self.cfg.on_fail_continue {
241                    warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
242                    return Err(e);
243                }
244            }
245        }
246        Ok(())
247    }
248
249    async fn on_own_channel_changed(
250        &self,
251        channel: &ChannelEntry,
252        direction: ChannelDirection,
253        change: ChannelChange,
254    ) -> Result<()> {
255        for strategy in self.strategies.iter() {
256            if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
257                if !self.cfg.on_fail_continue {
258                    warn!(%self, "on_channel_state_changed chain stopped at strategy");
259                    return Err(e);
260                }
261            }
262        }
263        Ok(())
264    }
265}
266
/// The mocked strategy is always displayed as `mock`; the `SingularStrategy` trait requires
/// `Display`, so the generated mock needs this implementation.
#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
273
#[cfg(test)]
mod tests {
    use mockall::Sequence;

    use crate::{
        errors::StrategyError::Other,
        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
    };

    /// Assembles a two-element chain directly from mocks, bypassing `MultiStrategy::new`,
    /// with the given failure policy.
    fn chain_of(on_fail_continue: bool, first: MockSingularStrategy, second: MockSingularStrategy) -> MultiStrategy {
        let cfg = MultiStrategyConfig {
            on_fail_continue,
            allow_recursive: true,
            execution_interval: 1,
            strategies: vec![],
        };

        MultiStrategy {
            cfg,
            strategies: vec![Box::new(first), Box::new(second)],
        }
    }

    /// With `on_fail_continue = true` the second strategy still runs after the first one failed,
    /// and the chain as a whole succeeds.
    #[tokio::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut call_order = Sequence::new();

        let mut failing = MockSingularStrategy::new();
        failing
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut call_order)
            .returning(|| Err(Other("error".into())));

        let mut succeeding = MockSingularStrategy::new();
        succeeding
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut call_order)
            .returning(|| Ok(()));

        let chain = chain_of(true, failing, succeeding);
        chain.on_tick().await?;

        Ok(())
    }

    /// With `on_fail_continue = false` the chain stops at the first failure: the second strategy
    /// is never invoked and the chain reports an error.
    #[tokio::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut call_order = Sequence::new();

        let mut failing = MockSingularStrategy::new();
        failing
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut call_order)
            .returning(|| Err(Other("error".into())));

        let mut unreachable = MockSingularStrategy::new();
        unreachable
            .expect_on_tick()
            .never()
            .in_sequence(&mut call_order)
            .returning(|| Ok(()));

        let chain = chain_of(false, failing, unreachable);
        let outcome = chain.on_tick().await;
        outcome.expect_err("on_tick should fail");
    }
}