1use std::fmt::{Debug, Display, Formatter};
22
23use async_trait::async_trait;
24use hopr_lib::{
25 ChannelChange, ChannelDirection, ChannelEntry, VerifiedTicket,
26 api::{
27 chain::{ChainReadChannelOperations, ChainReadSafeOperations, ChainValues, ChainWriteChannelOperations},
28 db::TicketSelector,
29 },
30};
31use serde::{Deserialize, Serialize};
32#[cfg(all(feature = "prometheus", not(test)))]
33use strum::VariantNames;
34use tracing::{error, warn};
35use validator::{Validate, ValidationError};
36
37use crate::{
38 Strategy,
39 auto_funding::AutoFundingStrategy,
40 auto_redeeming::AutoRedeemingStrategy,
41 channel_finalizer::ClosureFinalizerStrategy,
42 errors::{Result, StrategyError},
43};
44
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge with one series per `strategy` label value. `MultiStrategy::new` writes 0 for
    // every known strategy variant and then 1 for each strategy it processes.
    static ref METRIC_ENABLED_STRATEGIES: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_strategy_enabled_strategies",
        "List of enabled strategies",
        &["strategy"],
    )
    .unwrap();
}
50
51#[cfg_attr(test, mockall::automock)]
53#[async_trait]
54pub trait SingularStrategy: Display {
55 async fn on_tick(&self) -> Result<()> {
57 Ok(())
58 }
59
60 async fn on_acknowledged_winning_ticket(&self, _ack: &VerifiedTicket) -> Result<()> {
62 Ok(())
63 }
64
65 async fn on_own_channel_changed(
67 &self,
68 _channel: &ChannelEntry,
69 _direction: ChannelDirection,
70 _change: ChannelChange,
71 ) -> Result<()> {
72 Ok(())
73 }
74}
75
/// Serde default provider for boolean configuration flags that default to `true`.
#[inline]
fn just_true() -> bool {
    const DEFAULT_FLAG: bool = true;
    DEFAULT_FLAG
}
80
/// Serde default provider returning a duration of exactly 60 seconds.
#[inline]
fn sixty_seconds() -> std::time::Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 60;
    std::time::Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
85
86#[inline]
87fn empty_vector() -> Vec<Strategy> {
88 vec![]
89}
90
91fn validate_execution_interval(interval: &std::time::Duration) -> std::result::Result<(), ValidationError> {
92 if interval < &std::time::Duration::from_secs(10) {
93 Err(ValidationError::new(
94 "strategy execution interval must be at least 1 second",
95 ))
96 } else {
97 Ok(())
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
105#[serde(deny_unknown_fields)]
106pub struct MultiStrategyConfig {
107 #[default = true]
113 #[serde(default = "just_true")]
114 pub on_fail_continue: bool,
115
116 #[default = true]
120 #[serde(default = "just_true")]
121 pub allow_recursive: bool,
122
123 #[default(sixty_seconds())]
127 #[serde(default = "sixty_seconds", with = "humantime_serde")]
128 #[validate(custom(function = "validate_execution_interval"))]
129 pub execution_interval: std::time::Duration,
130
131 #[default(_code = "vec![]")]
135 #[serde(default = "empty_vector")]
136 pub strategies: Vec<Strategy>,
137}
138
139pub struct MultiStrategy {
144 strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
145 cfg: MultiStrategyConfig,
146}
147
148impl MultiStrategy {
149 pub fn new<A, R>(cfg: MultiStrategyConfig, hopr_chain_actions: A, redeem_sink: R) -> Self
153 where
154 A: ChainReadChannelOperations
155 + ChainReadSafeOperations
156 + ChainValues
157 + ChainWriteChannelOperations
158 + Clone
159 + Send
160 + Sync
161 + 'static,
162 R: futures::Sink<TicketSelector> + Sync + Send + Clone + 'static,
163 StrategyError: From<R::Error>,
164 {
165 let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
166
167 #[cfg(all(feature = "prometheus", not(test)))]
168 Strategy::VARIANTS
169 .iter()
170 .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
171
172 for strategy in cfg.strategies.iter() {
173 match strategy {
174 Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
175 *sub_cfg,
176 hopr_chain_actions.clone(),
177 redeem_sink.clone(),
178 ))),
179 Strategy::AutoFunding(sub_cfg) => {
180 strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
181 }
182 Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
183 *sub_cfg,
184 hopr_chain_actions.clone(),
185 ))),
186 Strategy::Multi(sub_cfg) => {
187 if cfg.allow_recursive {
188 let mut cfg_clone = sub_cfg.clone();
189 cfg_clone.allow_recursive = false; strategies.push(Box::new(Self::new(
192 cfg_clone,
193 hopr_chain_actions.clone(),
194 redeem_sink.clone(),
195 )))
196 } else {
197 error!("recursive multi-strategy not allowed and skipped")
198 }
199 }
200
201 Strategy::Passive => strategies.push(Box::new(Self {
203 cfg: Default::default(),
204 strategies: Vec::new(),
205 })),
206 }
207
208 #[cfg(all(feature = "prometheus", not(test)))]
209 METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
210 }
211
212 Self { strategies, cfg }
213 }
214}
215
216impl Debug for MultiStrategy {
217 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
218 write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
219 }
220}
221
222impl Display for MultiStrategy {
223 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224 write!(f, "{}", Strategy::Multi(self.cfg.clone()))
225 }
226}
227
228#[async_trait]
229impl SingularStrategy for MultiStrategy {
230 async fn on_tick(&self) -> Result<()> {
231 for strategy in self.strategies.iter() {
232 if let Err(e) = strategy.on_tick().await
233 && !self.cfg.on_fail_continue
234 {
235 warn!(%self, %strategy, "on_tick chain stopped at strategy");
236 return Err(e);
237 }
238 }
239 Ok(())
240 }
241
242 async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> Result<()> {
243 for strategy in self.strategies.iter() {
244 if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await
245 && !self.cfg.on_fail_continue
246 {
247 warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
248 return Err(e);
249 }
250 }
251 Ok(())
252 }
253
254 async fn on_own_channel_changed(
255 &self,
256 channel: &ChannelEntry,
257 direction: ChannelDirection,
258 change: ChannelChange,
259 ) -> Result<()> {
260 for strategy in self.strategies.iter() {
261 if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await
262 && !self.cfg.on_fail_continue
263 {
264 warn!(%self, "on_channel_state_changed chain stopped at strategy");
265 return Err(e);
266 }
267 }
268 Ok(())
269 }
270}
271
/// The mocked strategy always displays as the fixed text `mock`.
#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
278
#[cfg(test)]
mod tests {
    use mockall::Sequence;

    use crate::{
        errors::StrategyError::Other,
        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
    };

    /// Configuration without sub-strategy entries; only `on_fail_continue` varies between tests.
    fn config(on_fail_continue: bool) -> MultiStrategyConfig {
        MultiStrategyConfig {
            on_fail_continue,
            allow_recursive: true,
            execution_interval: std::time::Duration::from_secs(1),
            strategies: Vec::new(),
        }
    }

    /// Mock whose `on_tick` must be called exactly once, at its position in `seq`, and fails.
    fn failing_on_tick(seq: &mut Sequence) -> MockSingularStrategy {
        let mut mock = MockSingularStrategy::new();
        mock.expect_on_tick()
            .times(1)
            .in_sequence(seq)
            .returning(|| Err(Other("error".into())));
        mock
    }

    /// With `on_fail_continue` set, a failing strategy does not stop the following one
    /// and the overall tick succeeds.
    #[tokio::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut seq = Sequence::new();

        let first = failing_on_tick(&mut seq);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: config(true),
        };
        ms.on_tick().await?;

        Ok(())
    }

    /// With `on_fail_continue` unset, the first failure aborts the chain: the following
    /// strategy is never called and the error is returned.
    #[tokio::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut seq = Sequence::new();

        let first = failing_on_tick(&mut seq);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .never()
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: config(false),
        };
        ms.on_tick().await.expect_err("on_tick should fail");
    }
}