1use std::fmt::{Debug, Display, Formatter};
22
23use async_trait::async_trait;
24use hopr_lib::{
25 ChannelChange, ChannelDirection, ChannelEntry, VerifiedTicket,
26 api::{
27 chain::{ChainReadChannelOperations, ChainWriteChannelOperations},
28 db::TicketSelector,
29 },
30};
31use serde::{Deserialize, Serialize};
32#[cfg(all(feature = "prometheus", not(test)))]
33use strum::VariantNames;
34use tracing::{error, warn};
35use validator::{Validate, ValidationError};
36
37use crate::{
38 Strategy,
39 auto_funding::AutoFundingStrategy,
40 auto_redeeming::AutoRedeemingStrategy,
41 channel_finalizer::ClosureFinalizerStrategy,
42 errors::{Result, StrategyError},
43};
44
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Multi-gauge keyed by the `strategy` label.
    ///
    /// `MultiStrategy::new` writes `0` for every known strategy variant and then
    /// `1` for each strategy that appears in the supplied configuration.
    static ref METRIC_ENABLED_STRATEGIES: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_strategy_enabled_strategies",
        "List of enabled strategies",
        &["strategy"],
    )
    .unwrap();
}
50
51#[cfg_attr(test, mockall::automock)]
53#[async_trait]
54pub trait SingularStrategy: Display {
55 async fn on_tick(&self) -> Result<()> {
57 Ok(())
58 }
59
60 async fn on_acknowledged_winning_ticket(&self, _ack: &VerifiedTicket) -> Result<()> {
62 Ok(())
63 }
64
65 async fn on_own_channel_changed(
67 &self,
68 _channel: &ChannelEntry,
69 _direction: ChannelDirection,
70 _change: ChannelChange,
71 ) -> Result<()> {
72 Ok(())
73 }
74}
75
/// Serde default provider for boolean options that are enabled unless the
/// configuration says otherwise.
#[inline]
fn just_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
80
/// Serde default provider yielding a duration of exactly one minute.
#[inline]
fn sixty_seconds() -> std::time::Duration {
    const ONE_MINUTE_SECS: u64 = 60;
    std::time::Duration::from_secs(ONE_MINUTE_SECS)
}
85
86#[inline]
87fn empty_vector() -> Vec<Strategy> {
88 vec![]
89}
90
91fn validate_execution_interval(interval: &std::time::Duration) -> std::result::Result<(), ValidationError> {
92 if interval < &std::time::Duration::from_secs(10) {
93 Err(ValidationError::new(
94 "strategy execution interval must be at least 1 second",
95 ))
96 } else {
97 Ok(())
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
105#[serde(deny_unknown_fields)]
106pub struct MultiStrategyConfig {
107 #[default = true]
113 #[serde(default = "just_true")]
114 pub on_fail_continue: bool,
115
116 #[default = true]
120 #[serde(default = "just_true")]
121 pub allow_recursive: bool,
122
123 #[default(sixty_seconds())]
127 #[serde(default = "sixty_seconds", with = "humantime_serde")]
128 #[validate(custom(function = "validate_execution_interval"))]
129 pub execution_interval: std::time::Duration,
130
131 #[default(_code = "vec![]")]
135 #[serde(default = "empty_vector")]
136 pub strategies: Vec<Strategy>,
137}
138
139pub struct MultiStrategy {
144 strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
145 cfg: MultiStrategyConfig,
146}
147
148impl MultiStrategy {
149 pub fn new<A, R>(cfg: MultiStrategyConfig, hopr_chain_actions: A, redeem_sink: R) -> Self
153 where
154 A: ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
155 R: futures::Sink<TicketSelector> + Sync + Send + Clone + 'static,
156 StrategyError: From<R::Error>,
157 {
158 let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
159
160 #[cfg(all(feature = "prometheus", not(test)))]
161 Strategy::VARIANTS
162 .iter()
163 .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
164
165 for strategy in cfg.strategies.iter() {
166 match strategy {
167 Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
168 *sub_cfg,
169 hopr_chain_actions.clone(),
170 redeem_sink.clone(),
171 ))),
172 Strategy::AutoFunding(sub_cfg) => {
173 strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
174 }
175 Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
176 *sub_cfg,
177 hopr_chain_actions.clone(),
178 ))),
179 Strategy::Multi(sub_cfg) => {
180 if cfg.allow_recursive {
181 let mut cfg_clone = sub_cfg.clone();
182 cfg_clone.allow_recursive = false; strategies.push(Box::new(Self::new(
185 cfg_clone,
186 hopr_chain_actions.clone(),
187 redeem_sink.clone(),
188 )))
189 } else {
190 error!("recursive multi-strategy not allowed and skipped")
191 }
192 }
193
194 Strategy::Passive => strategies.push(Box::new(Self {
196 cfg: Default::default(),
197 strategies: Vec::new(),
198 })),
199 }
200
201 #[cfg(all(feature = "prometheus", not(test)))]
202 METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
203 }
204
205 Self { strategies, cfg }
206 }
207}
208
209impl Debug for MultiStrategy {
210 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
211 write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
212 }
213}
214
215impl Display for MultiStrategy {
216 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
217 write!(f, "{}", Strategy::Multi(self.cfg.clone()))
218 }
219}
220
221#[async_trait]
222impl SingularStrategy for MultiStrategy {
223 async fn on_tick(&self) -> Result<()> {
224 for strategy in self.strategies.iter() {
225 if let Err(e) = strategy.on_tick().await {
226 if !self.cfg.on_fail_continue {
227 warn!(%self, %strategy, "on_tick chain stopped at strategy");
228 return Err(e);
229 }
230 }
231 }
232 Ok(())
233 }
234
235 async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> Result<()> {
236 for strategy in self.strategies.iter() {
237 if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
238 if !self.cfg.on_fail_continue {
239 warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
240 return Err(e);
241 }
242 }
243 }
244 Ok(())
245 }
246
247 async fn on_own_channel_changed(
248 &self,
249 channel: &ChannelEntry,
250 direction: ChannelDirection,
251 change: ChannelChange,
252 ) -> Result<()> {
253 for strategy in self.strategies.iter() {
254 if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
255 if !self.cfg.on_fail_continue {
256 warn!(%self, "on_channel_state_changed chain stopped at strategy");
257 return Err(e);
258 }
259 }
260 }
261 Ok(())
262 }
263}
264
/// Test-only `Display` for the generated mock, needed because
/// [`SingularStrategy`] has `Display` as a supertrait.
#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
271
#[cfg(test)]
mod tests {
    use mockall::Sequence;

    use crate::{
        errors::StrategyError::Other,
        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
    };

    /// Mock whose `on_tick` must be called exactly once, in `seq` order, and fails.
    fn failing_once(seq: &mut Sequence) -> MockSingularStrategy {
        let mut mock = MockSingularStrategy::new();
        mock.expect_on_tick()
            .times(1)
            .in_sequence(seq)
            .returning(|| Err(Other("error".into())));
        mock
    }

    /// Two-element chain with no configured sub-strategies of its own.
    fn chain_of(first: MockSingularStrategy, second: MockSingularStrategy, on_fail_continue: bool) -> MultiStrategy {
        MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: MultiStrategyConfig {
                on_fail_continue,
                allow_recursive: true,
                execution_interval: std::time::Duration::from_secs(1),
                strategies: Vec::new(),
            },
        }
    }

    /// With `on_fail_continue` set, a failing first strategy must not prevent the
    /// second one from running, and the overall tick succeeds.
    #[tokio::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut seq = Sequence::new();

        let failing = failing_once(&mut seq);

        let mut succeeding = MockSingularStrategy::new();
        succeeding
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = chain_of(failing, succeeding, true);
        ms.on_tick().await?;

        Ok(())
    }

    /// With `on_fail_continue` unset, a failing first strategy aborts the chain:
    /// the second strategy is never called and the tick fails.
    #[tokio::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut seq = Sequence::new();

        let failing = failing_once(&mut seq);

        let mut untouched = MockSingularStrategy::new();
        untouched
            .expect_on_tick()
            .never()
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = chain_of(failing, untouched, false);
        ms.on_tick().await.expect_err("on_tick should fail");
    }
}