hopr_strategy/
strategy.rs

1//! ## Multi Strategy
2//!
3//! This strategy can stack multiple above strategies (called sub-strategies in this context) into one.
4//! Once a strategy event is triggered, it is executed sequentially on the sub-strategies one by one.
5//! The strategy can be configured to not call the next sub-strategy event if the sub-strategy currently being executed
6//! failed, which is done by setting the `on_fail_continue` flag.
7//!
8//! Hence, the sub-strategy chain then can behave as a logical AND (`on_fail_continue` = `false`) execution chain
9//! or logical OR (`on_fail_continue` = `true`) execution chain.
10//!
11//! A Multi Strategy can also contain another Multi Strategy as a sub-strategy if `allow_recursive` flag is set.
12//! However, this recursion is always allowed up to 2 levels only.
13//! Along with the `on_fail_continue` value, the recursive feature allows constructing more complex logical strategy
14//! chains.
15//!
16//! The MultiStrategy can also observe channels being `PendingToClose` and running out of closure grace period,
17//! and if this happens, it will issue automatically the final close transaction, which transitions the state to
18//! `Closed`. This can be controlled by the `finalize_channel_closure` parameter.
19//!
20//! For details on default parameters see [MultiStrategyConfig].
21use std::{
22    fmt::{Debug, Display, Formatter},
23    sync::Arc,
24};
25
26use async_trait::async_trait;
27use hopr_chain_actions::ChainActions;
28use hopr_db_sql::HoprDbAllOperations;
29use hopr_internal_types::prelude::*;
30use hopr_transport_ticket_aggregation::TicketAggregatorTrait;
31use serde::{Deserialize, Serialize};
32use tracing::{error, warn};
33use validator::Validate;
34#[cfg(all(feature = "prometheus", not(test)))]
35use {hopr_metrics::metrics::MultiGauge, strum::VariantNames};
36
37use crate::{
38    Strategy, aggregating::AggregatingStrategy, auto_funding::AutoFundingStrategy,
39    auto_redeeming::AutoRedeemingStrategy, channel_finalizer::ClosureFinalizerStrategy, errors::Result,
40    promiscuous::PromiscuousStrategy,
41};
42
43#[cfg(all(feature = "prometheus", not(test)))]
44lazy_static::lazy_static! {
45    static ref METRIC_ENABLED_STRATEGIES: MultiGauge =
46        MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
47}
48
49/// Basic single strategy.
50#[cfg_attr(test, mockall::automock)]
51#[async_trait]
52pub trait SingularStrategy: Display {
53    /// Strategy event raised at period intervals (typically each 1 minute).
54    async fn on_tick(&self) -> Result<()> {
55        Ok(())
56    }
57
58    /// Strategy event raised when a new **winning** acknowledged ticket is received in a channel
59    async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
60        Ok(())
61    }
62
63    /// Strategy event raised whenever the Indexer registers a change on node's own channel.
64    async fn on_own_channel_changed(
65        &self,
66        _channel: &ChannelEntry,
67        _direction: ChannelDirection,
68        _change: ChannelChange,
69    ) -> Result<()> {
70        Ok(())
71    }
72}
73
74#[inline]
75fn just_true() -> bool {
76    true
77}
78
79#[inline]
80fn sixty() -> u64 {
81    60
82}
83
84#[inline]
85fn empty_vector() -> Vec<Strategy> {
86    vec![]
87}
88
89/// Configuration options for the `MultiStrategy` chain.
90/// If `fail_on_continue` is set, the `MultiStrategy` sequence behaves as logical AND chain,
91/// otherwise it behaves like a logical OR chain.
92#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
93#[serde(deny_unknown_fields)]
94pub struct MultiStrategyConfig {
95    /// Determines if the strategy should continue executing the next strategy if the current one failed.
96    /// If set to `true`, the strategy behaves like a logical AND chain of `SingularStrategies`
97    /// Otherwise, it behaves like a logical OR chain of `SingularStrategies`.
98    ///
99    /// Default is true.
100    #[default = true]
101    #[serde(default = "just_true")]
102    pub on_fail_continue: bool,
103
104    /// Indicate whether the `MultiStrategy` can contain another `MultiStrategy`.
105    ///
106    /// Default is true.
107    #[default = true]
108    #[serde(default = "just_true")]
109    pub allow_recursive: bool,
110
111    /// Execution interval of the configured strategies in seconds.
112    ///
113    /// Default is 60, minimum is 1.
114    #[default = 60]
115    #[serde(default = "sixty")]
116    #[validate(range(min = 1))]
117    pub execution_interval: u64,
118
119    /// Configuration of individual sub-strategies.
120    ///
121    /// Default is empty, which makes the `MultiStrategy` behave as passive.
122    #[default(_code = "vec![]")]
123    #[serde(default = "empty_vector")]
124    pub strategies: Vec<Strategy>,
125}
126
127/// Defines an execution chain of `SingularStrategies`.
128/// The `MultiStrategy` itself also implements the `SingularStrategy` trait,
129/// which makes it possible (along with different `on_fail_continue` policies) to construct
130/// various logical strategy chains.
131pub struct MultiStrategy {
132    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
133    cfg: MultiStrategyConfig,
134}
135
136impl MultiStrategy {
137    /// Constructs new `MultiStrategy`.
138    /// The strategy can contain another `MultiStrategy` if `allow_recursive` is set.
139    pub fn new<Db>(
140        cfg: MultiStrategyConfig,
141        db: Db,
142        hopr_chain_actions: ChainActions<Db>,
143        ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
144    ) -> Self
145    where
146        Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
147    {
148        let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
149
150        #[cfg(all(feature = "prometheus", not(test)))]
151        Strategy::VARIANTS
152            .iter()
153            .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
154
155        for strategy in cfg.strategies.iter() {
156            match strategy {
157                Strategy::Promiscuous(sub_cfg) => strategies.push(Box::new(PromiscuousStrategy::new(
158                    sub_cfg.clone(),
159                    db.clone(),
160                    hopr_chain_actions.clone(),
161                ))),
162                Strategy::Aggregating(sub_cfg) => strategies.push(Box::new(AggregatingStrategy::new(
163                    *sub_cfg,
164                    db.clone(),
165                    ticket_aggregator.clone(),
166                ))),
167                Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
168                    *sub_cfg,
169                    hopr_chain_actions.clone(),
170                ))),
171                Strategy::AutoFunding(sub_cfg) => {
172                    strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
173                }
174                Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
175                    *sub_cfg,
176                    db.clone(),
177                    hopr_chain_actions.clone(),
178                ))),
179                Strategy::Multi(sub_cfg) => {
180                    if cfg.allow_recursive {
181                        let mut cfg_clone = sub_cfg.clone();
182                        cfg_clone.allow_recursive = false; // Do not allow more levels of recursion
183
184                        strategies.push(Box::new(Self::new(
185                            cfg_clone,
186                            db.clone(),
187                            hopr_chain_actions.clone(),
188                            ticket_aggregator.clone(),
189                        )))
190                    } else {
191                        error!("recursive multi-strategy not allowed and skipped")
192                    }
193                }
194
195                // Passive strategy = empty MultiStrategy
196                Strategy::Passive => strategies.push(Box::new(Self {
197                    cfg: Default::default(),
198                    strategies: Vec::new(),
199                })),
200            }
201
202            #[cfg(all(feature = "prometheus", not(test)))]
203            METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
204        }
205
206        Self { strategies, cfg }
207    }
208}
209
210impl Debug for MultiStrategy {
211    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
212        write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
213    }
214}
215
216impl Display for MultiStrategy {
217    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
218        write!(f, "{}", Strategy::Multi(self.cfg.clone()))
219    }
220}
221
222#[async_trait]
223impl SingularStrategy for MultiStrategy {
224    async fn on_tick(&self) -> Result<()> {
225        for strategy in self.strategies.iter() {
226            if let Err(e) = strategy.on_tick().await {
227                if !self.cfg.on_fail_continue {
228                    warn!(%self, %strategy, "on_tick chain stopped at strategy");
229                    return Err(e);
230                }
231            }
232        }
233        Ok(())
234    }
235
236    async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
237        for strategy in self.strategies.iter() {
238            if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
239                if !self.cfg.on_fail_continue {
240                    warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
241                    return Err(e);
242                }
243            }
244        }
245        Ok(())
246    }
247
248    async fn on_own_channel_changed(
249        &self,
250        channel: &ChannelEntry,
251        direction: ChannelDirection,
252        change: ChannelChange,
253    ) -> Result<()> {
254        for strategy in self.strategies.iter() {
255            if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
256                if !self.cfg.on_fail_continue {
257                    warn!(%self, "on_channel_state_changed chain stopped at strategy");
258                    return Err(e);
259                }
260            }
261        }
262        Ok(())
263    }
264}
265
266#[cfg(test)]
267impl Display for MockSingularStrategy {
268    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
269        write!(f, "mock")
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use mockall::Sequence;
276
277    use crate::{
278        errors::StrategyError::Other,
279        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
280    };
281
282    #[tokio::test]
283    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
284        let mut seq = Sequence::new();
285
286        let mut s1 = MockSingularStrategy::new();
287        s1.expect_on_tick()
288            .times(1)
289            .in_sequence(&mut seq)
290            .returning(|| Err(Other("error".into())));
291
292        let mut s2 = MockSingularStrategy::new();
293        s2.expect_on_tick().times(1).in_sequence(&mut seq).returning(|| Ok(()));
294
295        let cfg = MultiStrategyConfig {
296            on_fail_continue: true,
297            allow_recursive: true,
298            execution_interval: 1,
299            strategies: Vec::new(),
300        };
301
302        let ms = MultiStrategy {
303            strategies: vec![Box::new(s1), Box::new(s2)],
304            cfg,
305        };
306        ms.on_tick().await?;
307
308        Ok(())
309    }
310
311    #[tokio::test]
312    async fn test_multi_strategy_logical_and_flow() {
313        let mut seq = Sequence::new();
314
315        let mut s1 = MockSingularStrategy::new();
316        s1.expect_on_tick()
317            .times(1)
318            .in_sequence(&mut seq)
319            .returning(|| Err(Other("error".into())));
320
321        let mut s2 = MockSingularStrategy::new();
322        s2.expect_on_tick().never().in_sequence(&mut seq).returning(|| Ok(()));
323
324        let cfg = MultiStrategyConfig {
325            on_fail_continue: false,
326            allow_recursive: true,
327            execution_interval: 1,
328            strategies: Vec::new(),
329        };
330
331        let ms = MultiStrategy {
332            strategies: vec![Box::new(s1), Box::new(s2)],
333            cfg,
334        };
335        ms.on_tick().await.expect_err("on_tick should fail");
336    }
337}