hopr_strategy/
strategy.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
//! ## Multi Strategy
//!
//! This strategy can stack multiple above strategies (called sub-strategies in this context) into one.
//! Once a strategy event is triggered, it is executed sequentially on the sub-strategies one by one.
//! The strategy can be configured to not call the next sub-strategy event if the sub-strategy currently being executed failed,
//! which is done by setting the `on_fail_continue` flag.
//!
//! Hence, the sub-strategy chain then can behave as a logical AND (`on_fail_continue` = `false`) execution chain
//! or logical OR (`on_fail_continue` = `true`) execution chain.
//!
//! A Multi Strategy can also contain another Multi Strategy as a sub-strategy if `allow_recursive` flag is set.
//! However, this recursion is always allowed up to 2 levels only.
//! Along with the `on_fail_continue` value, the recursive feature allows constructing more complex logical strategy chains.
//!
//! The MultiStrategy can also observe channels being `PendingToClose` and running out of closure grace period,
//! and if this happens, it will issue automatically the final close transaction, which transitions the state to `Closed`.
//! This can be controlled by the `finalize_channel_closure` parameter.
//!
//! For details on default parameters see [MultiStrategyConfig].
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::{
    fmt::{Debug, Display, Formatter},
    sync::Arc,
};
use tracing::{error, warn};
use validator::Validate;

use hopr_chain_actions::ChainActions;
use hopr_internal_types::prelude::*;
use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregatorTrait;

use crate::aggregating::AggregatingStrategy;
use crate::auto_funding::AutoFundingStrategy;
use crate::auto_redeeming::AutoRedeemingStrategy;
use crate::channel_finalizer::ClosureFinalizerStrategy;
use crate::errors::Result;
use crate::promiscuous::PromiscuousStrategy;
use crate::Strategy;

use hopr_db_sql::HoprDbAllOperations;
#[cfg(all(feature = "prometheus", not(test)))]
use {hopr_metrics::metrics::MultiGauge, strum::VariantNames};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ENABLED_STRATEGIES: MultiGauge =
        MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
}

/// Basic single strategy.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SingularStrategy: Display {
    /// Strategy event raised at period intervals (typically each 1 minute).
    async fn on_tick(&self) -> Result<()> {
        Ok(())
    }

    /// Strategy event raised when a new **winning** acknowledged ticket is received in a channel
    async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
        Ok(())
    }

    /// Strategy event raised whenever the Indexer registers a change on node's own channel.
    async fn on_own_channel_changed(
        &self,
        _channel: &ChannelEntry,
        _direction: ChannelDirection,
        _change: ChannelChange,
    ) -> Result<()> {
        Ok(())
    }
}

#[inline]
fn just_true() -> bool {
    true
}

#[inline]
fn sixty() -> u64 {
    60
}

#[inline]
fn empty_vector() -> Vec<Strategy> {
    vec![]
}

/// Configuration options for the `MultiStrategy` chain.
/// If `fail_on_continue` is set, the `MultiStrategy` sequence behaves as logical AND chain,
/// otherwise it behaves like a logical OR chain.
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MultiStrategyConfig {
    /// Determines if the strategy should continue executing the next strategy if the current one failed.
    /// If set to `true`, the strategy behaves like a logical AND chain of `SingularStrategies`
    /// Otherwise, it behaves like a logical OR chain of `SingularStrategies`.
    ///
    /// Default is true.
    #[default = true]
    #[serde(default = "just_true")]
    pub on_fail_continue: bool,

    /// Indicate whether the `MultiStrategy` can contain another `MultiStrategy`.
    ///
    /// Default is true.
    #[default = true]
    #[serde(default = "just_true")]
    pub allow_recursive: bool,

    /// Execution interval of the configured strategies in seconds.
    ///
    /// Default is 60, minimum is 1.
    #[default = 60]
    #[serde(default = "sixty")]
    #[validate(range(min = 1))]
    pub execution_interval: u64,

    /// Configuration of individual sub-strategies.
    ///
    /// Default is empty, which makes the `MultiStrategy` behave as passive.
    #[default(_code = "vec![]")]
    #[serde(default = "empty_vector")]
    pub strategies: Vec<Strategy>,
}

/// Defines an execution chain of `SingularStrategies`.
/// The `MultiStrategy` itself also implements the `SingularStrategy` trait,
/// which makes it possible (along with different `on_fail_continue` policies) to construct
/// various logical strategy chains.
pub struct MultiStrategy {
    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
    cfg: MultiStrategyConfig,
}

impl MultiStrategy {
    /// Constructs new `MultiStrategy`.
    /// The strategy can contain another `MultiStrategy` if `allow_recursive` is set.
    pub fn new<Db>(
        cfg: MultiStrategyConfig,
        db: Db,
        hopr_chain_actions: ChainActions<Db>,
        ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
    ) -> Self
    where
        Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
    {
        let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();

        #[cfg(all(feature = "prometheus", not(test)))]
        Strategy::VARIANTS
            .iter()
            .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));

        for strategy in cfg.strategies.iter() {
            match strategy {
                Strategy::Promiscuous(sub_cfg) => strategies.push(Box::new(PromiscuousStrategy::new(
                    sub_cfg.clone(),
                    db.clone(),
                    hopr_chain_actions.clone(),
                ))),
                Strategy::Aggregating(sub_cfg) => strategies.push(Box::new(AggregatingStrategy::new(
                    *sub_cfg,
                    db.clone(),
                    ticket_aggregator.clone(),
                ))),
                Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
                    *sub_cfg,
                    db.clone(),
                    hopr_chain_actions.clone(),
                ))),
                Strategy::AutoFunding(sub_cfg) => {
                    strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
                }
                Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
                    *sub_cfg,
                    db.clone(),
                    hopr_chain_actions.clone(),
                ))),
                Strategy::Multi(sub_cfg) => {
                    if cfg.allow_recursive {
                        let mut cfg_clone = sub_cfg.clone();
                        cfg_clone.allow_recursive = false; // Do not allow more levels of recursion

                        strategies.push(Box::new(Self::new(
                            cfg_clone,
                            db.clone(),
                            hopr_chain_actions.clone(),
                            ticket_aggregator.clone(),
                        )))
                    } else {
                        error!("recursive multi-strategy not allowed and skipped")
                    }
                }

                // Passive strategy = empty MultiStrategy
                Strategy::Passive => strategies.push(Box::new(Self {
                    cfg: Default::default(),
                    strategies: Vec::new(),
                })),
            }

            #[cfg(all(feature = "prometheus", not(test)))]
            METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
        }

        Self { strategies, cfg }
    }
}

impl Debug for MultiStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
    }
}

impl Display for MultiStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", Strategy::Multi(self.cfg.clone()))
    }
}

#[async_trait]
impl SingularStrategy for MultiStrategy {
    async fn on_tick(&self) -> Result<()> {
        for strategy in self.strategies.iter() {
            if let Err(e) = strategy.on_tick().await {
                if !self.cfg.on_fail_continue {
                    warn!(%self, %strategy, "on_tick chain stopped at strategy");
                    return Err(e);
                }
            }
        }
        Ok(())
    }

    async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
        for strategy in self.strategies.iter() {
            if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
                if !self.cfg.on_fail_continue {
                    warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
                    return Err(e);
                }
            }
        }
        Ok(())
    }

    async fn on_own_channel_changed(
        &self,
        channel: &ChannelEntry,
        direction: ChannelDirection,
        change: ChannelChange,
    ) -> Result<()> {
        for strategy in self.strategies.iter() {
            if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
                if !self.cfg.on_fail_continue {
                    warn!(%self, "on_channel_state_changed chain stopped at strategy");
                    return Err(e);
                }
            }
        }
        Ok(())
    }
}

#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "mock")
    }
}

#[cfg(test)]
mod tests {
    use crate::errors::StrategyError::Other;
    use crate::strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy};
    use mockall::Sequence;

    #[async_std::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut seq = Sequence::new();

        let mut s1 = MockSingularStrategy::new();
        s1.expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Err(Other("error".into())));

        let mut s2 = MockSingularStrategy::new();
        s2.expect_on_tick().times(1).in_sequence(&mut seq).returning(|| Ok(()));

        let cfg = MultiStrategyConfig {
            on_fail_continue: true,
            allow_recursive: true,
            execution_interval: 1,
            strategies: Vec::new(),
        };

        let ms = MultiStrategy {
            strategies: vec![Box::new(s1), Box::new(s2)],
            cfg,
        };
        ms.on_tick().await?;

        Ok(())
    }

    #[async_std::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut seq = Sequence::new();

        let mut s1 = MockSingularStrategy::new();
        s1.expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Err(Other("error".into())));

        let mut s2 = MockSingularStrategy::new();
        s2.expect_on_tick().never().in_sequence(&mut seq).returning(|| Ok(()));

        let cfg = MultiStrategyConfig {
            on_fail_continue: false,
            allow_recursive: true,
            execution_interval: 1,
            strategies: Vec::new(),
        };

        let ms = MultiStrategy {
            strategies: vec![Box::new(s1), Box::new(s2)],
            cfg,
        };
        ms.on_tick().await.expect_err("on_tick should fail");
    }
}