Skip to main content

hopr_strategy/
strategy.rs

1//! ## Multi Strategy
2//!
//! This strategy can stack multiple strategies (called sub-strategies in this context) into one.
4//! Once a strategy event is triggered, it is executed sequentially on the sub-strategies one by one.
5//! The strategy can be configured to not call the next sub-strategy event if the sub-strategy currently being executed
6//! failed, which is done by setting the `on_fail_continue` flag.
7//!
8//! Hence, the sub-strategy chain then can behave as a logical AND (`on_fail_continue` = `false`) execution chain
9//! or logical OR (`on_fail_continue` = `true`) execution chain.
10//!
//! A Multi Strategy can also contain another Multi Strategy as a sub-strategy if the `allow_recursive` flag is set.
//! However, the nesting is limited to 2 levels: a nested Multi Strategy cannot itself contain another Multi Strategy.
13//! Along with the `on_fail_continue` value, the recursive feature allows constructing more complex logical strategy
14//! chains.
15//!
16//! The MultiStrategy can also observe channels being `PendingToClose` and running out of a closure grace period,
17//! and if this happens, it will issue automatically the final close transaction, which transitions the state to
18//! `Closed`. This can be controlled by the `finalize_channel_closure` parameter.
19//!
20//! For details on default parameters see [`MultiStrategyConfig`].
21use std::fmt::{Debug, Display, Formatter};
22
23use async_trait::async_trait;
24use hopr_lib::{
25    ChannelChange, ChannelDirection, ChannelEntry, VerifiedTicket,
26    api::{
27        chain::{ChainReadChannelOperations, ChainReadSafeOperations, ChainValues, ChainWriteChannelOperations},
28        db::TicketSelector,
29    },
30};
31use serde::{Deserialize, Serialize};
32#[cfg(all(feature = "prometheus", not(test)))]
33use strum::VariantNames;
34use tracing::{error, warn};
35use validator::{Validate, ValidationError};
36
37use crate::{
38    Strategy,
39    auto_funding::AutoFundingStrategy,
40    auto_redeeming::AutoRedeemingStrategy,
41    channel_finalizer::ClosureFinalizerStrategy,
42    errors::{Result, StrategyError},
43};
44
// Gauge with one series per strategy name (label `strategy`).
// `MultiStrategy::new` sets the series of every `Strategy` variant to 0 and then sets the series
// of each strategy found in the configuration to 1.
// Compiled only with the `prometheus` feature enabled and outside of test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ENABLED_STRATEGIES: hopr_metrics::MultiGauge =
        // The `unwrap` panics if the gauge cannot be created.
        hopr_metrics::MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
}
50
/// Basic single strategy.
///
/// Every event handler has a default implementation that does nothing and returns `Ok(())`,
/// so an implementor only needs to override the events it reacts to.
/// In test builds, `mockall` generates a mock of this trait.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SingularStrategy: Display {
    /// Strategy event raised at periodic intervals (typically every minute).
    async fn on_tick(&self) -> Result<()> {
        Ok(())
    }

    /// Strategy event raised when a new **winning** acknowledged ticket is received in a channel.
    async fn on_acknowledged_winning_ticket(&self, _ack: &VerifiedTicket) -> Result<()> {
        Ok(())
    }

    /// Strategy event raised whenever the Indexer registers a change on the node's own channel.
    ///
    /// The handler receives the affected channel, its direction and the registered change.
    async fn on_own_channel_changed(
        &self,
        _channel: &ChannelEntry,
        _direction: ChannelDirection,
        _change: ChannelChange,
    ) -> Result<()> {
        Ok(())
    }
}
75
/// Serde default provider for boolean options that are enabled unless configured otherwise
/// (used by `on_fail_continue` and `allow_recursive`).
#[inline]
fn just_true() -> bool {
    true
}
80
/// Default provider for the strategy execution interval: one minute.
#[inline]
fn sixty_seconds() -> std::time::Duration {
    // Whole seconds only, no sub-second part.
    const DEFAULT_INTERVAL_SECS: u64 = 60;
    std::time::Duration::new(DEFAULT_INTERVAL_SECS, 0)
}
85
86#[inline]
87fn empty_vector() -> Vec<Strategy> {
88    vec![]
89}
90
91fn validate_execution_interval(interval: &std::time::Duration) -> std::result::Result<(), ValidationError> {
92    if interval < &std::time::Duration::from_secs(10) {
93        Err(ValidationError::new(
94            "strategy execution interval must be at least 1 second",
95        ))
96    } else {
97        Ok(())
98    }
99}
100
/// Configuration options for the `MultiStrategy` chain.
///
/// If `on_fail_continue` is set, the `MultiStrategy` sequence behaves as a logical OR chain
/// (a failing sub-strategy does not prevent the next ones from being executed),
/// otherwise it behaves like a logical AND chain (the chain stops at the first failing sub-strategy).
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MultiStrategyConfig {
    /// Determines if the strategy should continue executing the next strategy if the current one failed.
    /// If set to `true`, the strategy behaves like a logical OR chain of `SingularStrategies`.
    /// Otherwise, it behaves like a logical AND chain of `SingularStrategies`.
    ///
    /// Default is true.
    #[default = true]
    #[serde(default = "just_true")]
    pub on_fail_continue: bool,

    /// Indicate whether the `MultiStrategy` can contain another `MultiStrategy`.
    ///
    /// If set to `false`, nested `MultiStrategy` entries are skipped (an error is logged).
    /// A nested `MultiStrategy` is always constructed with this flag forced to `false`,
    /// so it cannot contain yet another `MultiStrategy`.
    ///
    /// Default is true.
    #[default = true]
    #[serde(default = "just_true")]
    pub allow_recursive: bool,

    /// Execution interval of the configured strategies,
    /// (de)serialized as a human-readable duration via `humantime_serde`.
    ///
    /// Default is 60 seconds, minimum is 10 seconds.
    #[default(sixty_seconds())]
    #[serde(default = "sixty_seconds", with = "humantime_serde")]
    #[validate(custom(function = "validate_execution_interval"))]
    pub execution_interval: std::time::Duration,

    /// Configuration of individual sub-strategies.
    ///
    /// Default is empty, which makes the `MultiStrategy` behave as passive.
    #[default(_code = "vec![]")]
    #[serde(default = "empty_vector")]
    pub strategies: Vec<Strategy>,
}
138
/// Defines an execution chain of `SingularStrategies`.
/// The `MultiStrategy` itself also implements the `SingularStrategy` trait,
/// which makes it possible (along with different `on_fail_continue` policies) to construct
/// various logical strategy chains.
pub struct MultiStrategy {
    /// Sub-strategies of the chain; every event is passed to them sequentially in this order.
    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
    /// Configuration of this chain; its `on_fail_continue` flag is consulted whenever a sub-strategy fails.
    cfg: MultiStrategyConfig,
}
147
148impl MultiStrategy {
149    /// Constructs new `MultiStrategy`.
150    ///
151    /// The strategy can contain another `MultiStrategy` if `allow_recursive` is set.
152    pub fn new<A, R>(cfg: MultiStrategyConfig, hopr_chain_actions: A, redeem_sink: R) -> Self
153    where
154        A: ChainReadChannelOperations
155            + ChainReadSafeOperations
156            + ChainValues
157            + ChainWriteChannelOperations
158            + Clone
159            + Send
160            + Sync
161            + 'static,
162        R: futures::Sink<TicketSelector> + Sync + Send + Clone + 'static,
163        StrategyError: From<R::Error>,
164    {
165        let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
166
167        #[cfg(all(feature = "prometheus", not(test)))]
168        Strategy::VARIANTS
169            .iter()
170            .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
171
172        for strategy in cfg.strategies.iter() {
173            match strategy {
174                Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
175                    *sub_cfg,
176                    hopr_chain_actions.clone(),
177                    redeem_sink.clone(),
178                ))),
179                Strategy::AutoFunding(sub_cfg) => {
180                    strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
181                }
182                Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
183                    *sub_cfg,
184                    hopr_chain_actions.clone(),
185                ))),
186                Strategy::Multi(sub_cfg) => {
187                    if cfg.allow_recursive {
188                        let mut cfg_clone = sub_cfg.clone();
189                        cfg_clone.allow_recursive = false; // Do not allow more levels of recursion
190
191                        strategies.push(Box::new(Self::new(
192                            cfg_clone,
193                            hopr_chain_actions.clone(),
194                            redeem_sink.clone(),
195                        )))
196                    } else {
197                        error!("recursive multi-strategy not allowed and skipped")
198                    }
199                }
200
201                // Passive strategy = empty MultiStrategy
202                Strategy::Passive => strategies.push(Box::new(Self {
203                    cfg: Default::default(),
204                    strategies: Vec::new(),
205                })),
206            }
207
208            #[cfg(all(feature = "prometheus", not(test)))]
209            METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
210        }
211
212        Self { strategies, cfg }
213    }
214}
215
216impl Debug for MultiStrategy {
217    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
218        write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
219    }
220}
221
222impl Display for MultiStrategy {
223    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224        write!(f, "{}", Strategy::Multi(self.cfg.clone()))
225    }
226}
227
228#[async_trait]
229impl SingularStrategy for MultiStrategy {
230    async fn on_tick(&self) -> Result<()> {
231        for strategy in self.strategies.iter() {
232            if let Err(e) = strategy.on_tick().await
233                && !self.cfg.on_fail_continue
234            {
235                warn!(%self, %strategy, "on_tick chain stopped at strategy");
236                return Err(e);
237            }
238        }
239        Ok(())
240    }
241
242    async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> Result<()> {
243        for strategy in self.strategies.iter() {
244            if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await
245                && !self.cfg.on_fail_continue
246            {
247                warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
248                return Err(e);
249            }
250        }
251        Ok(())
252    }
253
254    async fn on_own_channel_changed(
255        &self,
256        channel: &ChannelEntry,
257        direction: ChannelDirection,
258        change: ChannelChange,
259    ) -> Result<()> {
260        for strategy in self.strategies.iter() {
261            if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await
262                && !self.cfg.on_fail_continue
263            {
264                warn!(%self, "on_channel_state_changed chain stopped at strategy");
265                return Err(e);
266            }
267        }
268        Ok(())
269    }
270}
271
#[cfg(test)]
impl Display for MockSingularStrategy {
    /// The mocked strategy is always displayed as the fixed text `mock`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
278
#[cfg(test)]
mod tests {
    use mockall::Sequence;

    use crate::{
        errors::StrategyError::Other,
        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
    };

    /// Creates a mock whose `on_tick` must be called exactly once, as the next call in `seq`, and fails.
    fn failing_tick_mock(seq: &mut Sequence) -> MockSingularStrategy {
        let mut mock = MockSingularStrategy::new();
        mock.expect_on_tick()
            .times(1)
            .in_sequence(seq)
            .returning(|| Err(Other("error".into())));
        mock
    }

    /// Chains the two mocks (in the given order) under the given `on_fail_continue` policy.
    fn chain_of(on_fail_continue: bool, first: MockSingularStrategy, second: MockSingularStrategy) -> MultiStrategy {
        MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: MultiStrategyConfig {
                on_fail_continue,
                allow_recursive: true,
                execution_interval: std::time::Duration::from_secs(1),
                strategies: Vec::new(),
            },
        }
    }

    /// With `on_fail_continue` set, the second strategy runs although the first one failed.
    #[tokio::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut seq = Sequence::new();

        let failing = failing_tick_mock(&mut seq);

        let mut succeeding = MockSingularStrategy::new();
        succeeding
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        chain_of(true, failing, succeeding).on_tick().await?;

        Ok(())
    }

    /// Without `on_fail_continue`, the chain stops at the failing strategy and reports the failure.
    #[tokio::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut seq = Sequence::new();

        let failing = failing_tick_mock(&mut seq);

        let mut never_called = MockSingularStrategy::new();
        never_called
            .expect_on_tick()
            .never()
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        chain_of(false, failing, never_called)
            .on_tick()
            .await
            .expect_err("on_tick should fail");
    }
}
343}