hopr_strategy/
strategy.rs

1//! ## Multi Strategy
2//!
3//! This strategy can stack the multiple strategies (called sub-strategies in this context) into one.
4//! Once a strategy event is triggered, it is executed sequentially on the sub-strategies one by one.
5//! The strategy can be configured to not call the next sub-strategy event if the sub-strategy currently being executed
6//! failed, which is done by setting the `on_fail_continue` flag.
7//!
8//! Hence, the sub-strategy chain then can behave as a logical AND (`on_fail_continue` = `false`) execution chain
9//! or logical OR (`on_fail_continue` = `true`) execution chain.
10//!
11//! A Multi Strategy can also contain another Multi Strategy as a sub-strategy if the ` allow_recursive ` flag is set.
12//! However, this recursion is always allowed up to 2 levels only.
13//! Along with the `on_fail_continue` value, the recursive feature allows constructing more complex logical strategy
14//! chains.
15//!
16//! The MultiStrategy can also observe channels being `PendingToClose` and running out of a closure grace period,
17//! and if this happens, it will issue automatically the final close transaction, which transitions the state to
18//! `Closed`. This can be controlled by the `finalize_channel_closure` parameter.
19//!
20//! For details on default parameters see [`MultiStrategyConfig`].
21use std::fmt::{Debug, Display, Formatter};
22
23use async_trait::async_trait;
24use hopr_lib::{
25    ChannelChange, ChannelDirection, ChannelEntry, VerifiedTicket,
26    exports::api::{
27        chain::{ChainReadChannelOperations, ChainWriteChannelOperations},
28        db::TicketSelector,
29    },
30};
31use serde::{Deserialize, Serialize};
32use serde_with::serde_as;
33#[cfg(all(feature = "prometheus", not(test)))]
34use strum::VariantNames;
35use tracing::{error, warn};
36use validator::{Validate, ValidationError};
37
38use crate::{
39    Strategy,
40    auto_funding::AutoFundingStrategy,
41    auto_redeeming::AutoRedeemingStrategy,
42    channel_finalizer::ClosureFinalizerStrategy,
43    errors::{Result, StrategyError},
44};
45
46#[cfg(all(feature = "prometheus", not(test)))]
47lazy_static::lazy_static! {
48    static ref METRIC_ENABLED_STRATEGIES: hopr_metrics::MultiGauge =
49        hopr_metrics::MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
50}
51
52/// Basic single strategy.
53#[cfg_attr(test, mockall::automock)]
54#[async_trait]
55pub trait SingularStrategy: Display {
56    /// Strategy event raised at period intervals (typically each 1 minute).
57    async fn on_tick(&self) -> Result<()> {
58        Ok(())
59    }
60
61    /// Strategy event raised when a new **winning** acknowledged ticket is received in a channel
62    async fn on_acknowledged_winning_ticket(&self, _ack: &VerifiedTicket) -> Result<()> {
63        Ok(())
64    }
65
66    /// Strategy event raised whenever the Indexer registers a change on node's own channel.
67    async fn on_own_channel_changed(
68        &self,
69        _channel: &ChannelEntry,
70        _direction: ChannelDirection,
71        _change: ChannelChange,
72    ) -> Result<()> {
73        Ok(())
74    }
75}
76
77#[inline]
78fn just_true() -> bool {
79    true
80}
81
82#[inline]
83fn sixty_seconds() -> std::time::Duration {
84    std::time::Duration::from_secs(60)
85}
86
87#[inline]
88fn empty_vector() -> Vec<Strategy> {
89    vec![]
90}
91
92fn validate_execution_interval(interval: &std::time::Duration) -> std::result::Result<(), ValidationError> {
93    if interval < &std::time::Duration::from_secs(10) {
94        Err(ValidationError::new(
95            "strategy execution interval must be at least 1 second",
96        ))
97    } else {
98        Ok(())
99    }
100}
101
102/// Configuration options for the `MultiStrategy` chain.
103/// If `fail_on_continue` is set, the `MultiStrategy` sequence behaves as logical AND chain,
104/// otherwise it behaves like a logical OR chain.
105#[serde_as]
106#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
107#[serde(deny_unknown_fields)]
108pub struct MultiStrategyConfig {
109    /// Determines if the strategy should continue executing the next strategy if the current one failed.
110    /// If set to `true`, the strategy behaves like a logical AND chain of `SingularStrategies`
111    /// Otherwise, it behaves like a logical OR chain of `SingularStrategies`.
112    ///
113    /// Default is true.
114    #[default = true]
115    #[serde(default = "just_true")]
116    pub on_fail_continue: bool,
117
118    /// Indicate whether the `MultiStrategy` can contain another `MultiStrategy`.
119    ///
120    /// Default is true.
121    #[default = true]
122    #[serde(default = "just_true")]
123    pub allow_recursive: bool,
124
125    /// Execution interval of the configured strategies in seconds.
126    ///
127    /// Default is 60 seconds, minimum is 10 seconds.
128    #[default(sixty_seconds())]
129    #[serde(default = "sixty_seconds")]
130    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
131    #[validate(custom(function = "validate_execution_interval"))]
132    pub execution_interval: std::time::Duration,
133
134    /// Configuration of individual sub-strategies.
135    ///
136    /// Default is empty, which makes the `MultiStrategy` behave as passive.
137    #[default(_code = "vec![]")]
138    #[serde(default = "empty_vector")]
139    pub strategies: Vec<Strategy>,
140}
141
142/// Defines an execution chain of `SingularStrategies`.
143/// The `MultiStrategy` itself also implements the `SingularStrategy` trait,
144/// which makes it possible (along with different `on_fail_continue` policies) to construct
145/// various logical strategy chains.
146pub struct MultiStrategy {
147    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
148    cfg: MultiStrategyConfig,
149}
150
151impl MultiStrategy {
152    /// Constructs new `MultiStrategy`.
153    ///
154    /// The strategy can contain another `MultiStrategy` if `allow_recursive` is set.
155    pub fn new<A, R>(cfg: MultiStrategyConfig, hopr_chain_actions: A, redeem_sink: R) -> Self
156    where
157        A: ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
158        R: futures::Sink<TicketSelector> + Sync + Send + Clone + 'static,
159        StrategyError: From<R::Error>,
160    {
161        let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
162
163        #[cfg(all(feature = "prometheus", not(test)))]
164        Strategy::VARIANTS
165            .iter()
166            .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
167
168        for strategy in cfg.strategies.iter() {
169            match strategy {
170                Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
171                    *sub_cfg,
172                    hopr_chain_actions.clone(),
173                    redeem_sink.clone(),
174                ))),
175                Strategy::AutoFunding(sub_cfg) => {
176                    strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
177                }
178                Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
179                    *sub_cfg,
180                    hopr_chain_actions.clone(),
181                ))),
182                Strategy::Multi(sub_cfg) => {
183                    if cfg.allow_recursive {
184                        let mut cfg_clone = sub_cfg.clone();
185                        cfg_clone.allow_recursive = false; // Do not allow more levels of recursion
186
187                        strategies.push(Box::new(Self::new(
188                            cfg_clone,
189                            hopr_chain_actions.clone(),
190                            redeem_sink.clone(),
191                        )))
192                    } else {
193                        error!("recursive multi-strategy not allowed and skipped")
194                    }
195                }
196
197                // Passive strategy = empty MultiStrategy
198                Strategy::Passive => strategies.push(Box::new(Self {
199                    cfg: Default::default(),
200                    strategies: Vec::new(),
201                })),
202            }
203
204            #[cfg(all(feature = "prometheus", not(test)))]
205            METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
206        }
207
208        Self { strategies, cfg }
209    }
210}
211
212impl Debug for MultiStrategy {
213    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
214        write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
215    }
216}
217
218impl Display for MultiStrategy {
219    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
220        write!(f, "{}", Strategy::Multi(self.cfg.clone()))
221    }
222}
223
224#[async_trait]
225impl SingularStrategy for MultiStrategy {
226    async fn on_tick(&self) -> Result<()> {
227        for strategy in self.strategies.iter() {
228            if let Err(e) = strategy.on_tick().await {
229                if !self.cfg.on_fail_continue {
230                    warn!(%self, %strategy, "on_tick chain stopped at strategy");
231                    return Err(e);
232                }
233            }
234        }
235        Ok(())
236    }
237
238    async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> Result<()> {
239        for strategy in self.strategies.iter() {
240            if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
241                if !self.cfg.on_fail_continue {
242                    warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
243                    return Err(e);
244                }
245            }
246        }
247        Ok(())
248    }
249
250    async fn on_own_channel_changed(
251        &self,
252        channel: &ChannelEntry,
253        direction: ChannelDirection,
254        change: ChannelChange,
255    ) -> Result<()> {
256        for strategy in self.strategies.iter() {
257            if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
258                if !self.cfg.on_fail_continue {
259                    warn!(%self, "on_channel_state_changed chain stopped at strategy");
260                    return Err(e);
261                }
262            }
263        }
264        Ok(())
265    }
266}
267
268#[cfg(test)]
269impl Display for MockSingularStrategy {
270    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
271        write!(f, "mock")
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use mockall::Sequence;
278
279    use crate::{
280        errors::StrategyError::Other,
281        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
282    };
283
284    #[tokio::test]
285    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
286        let mut seq = Sequence::new();
287
288        let mut s1 = MockSingularStrategy::new();
289        s1.expect_on_tick()
290            .times(1)
291            .in_sequence(&mut seq)
292            .returning(|| Err(Other("error".into())));
293
294        let mut s2 = MockSingularStrategy::new();
295        s2.expect_on_tick().times(1).in_sequence(&mut seq).returning(|| Ok(()));
296
297        let cfg = MultiStrategyConfig {
298            on_fail_continue: true,
299            allow_recursive: true,
300            execution_interval: std::time::Duration::from_secs(1),
301            strategies: Vec::new(),
302        };
303
304        let ms = MultiStrategy {
305            strategies: vec![Box::new(s1), Box::new(s2)],
306            cfg,
307        };
308        ms.on_tick().await?;
309
310        Ok(())
311    }
312
313    #[tokio::test]
314    async fn test_multi_strategy_logical_and_flow() {
315        let mut seq = Sequence::new();
316
317        let mut s1 = MockSingularStrategy::new();
318        s1.expect_on_tick()
319            .times(1)
320            .in_sequence(&mut seq)
321            .returning(|| Err(Other("error".into())));
322
323        let mut s2 = MockSingularStrategy::new();
324        s2.expect_on_tick().never().in_sequence(&mut seq).returning(|| Ok(()));
325
326        let cfg = MultiStrategyConfig {
327            on_fail_continue: false,
328            allow_recursive: true,
329            execution_interval: std::time::Duration::from_secs(1),
330            strategies: Vec::new(),
331        };
332
333        let ms = MultiStrategy {
334            strategies: vec![Box::new(s1), Box::new(s2)],
335            cfg,
336        };
337        ms.on_tick().await.expect_err("on_tick should fail");
338    }
339}