1use std::fmt::{Debug, Display, Formatter};
22
23use async_trait::async_trait;
24use hopr_lib::{
25 ChannelChange, ChannelDirection, ChannelEntry, VerifiedTicket,
26 exports::api::{
27 chain::{ChainReadChannelOperations, ChainWriteChannelOperations},
28 db::TicketSelector,
29 },
30};
31use serde::{Deserialize, Serialize};
32use serde_with::serde_as;
33#[cfg(all(feature = "prometheus", not(test)))]
34use strum::VariantNames;
35use tracing::{error, warn};
36use validator::{Validate, ValidationError};
37
38use crate::{
39 Strategy,
40 auto_funding::AutoFundingStrategy,
41 auto_redeeming::AutoRedeemingStrategy,
42 channel_finalizer::ClosureFinalizerStrategy,
43 errors::{Result, StrategyError},
44};
45
// Gauge with one series per strategy name (label `strategy`).
// `MultiStrategy::new` first sets every known `Strategy` variant to 0 and then
// sets each configured strategy to 1.
// Only compiled with the `prometheus` feature and outside of test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ENABLED_STRATEGIES: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
}
51
/// Interface of a single strategy: a set of asynchronous event hooks.
///
/// Every hook has a default implementation that does nothing and returns `Ok(())`,
/// so an implementor only overrides the events it cares about.
/// Implementors must also be [`Display`]; the name is used in log output.
///
/// In test builds a `MockSingularStrategy` is generated by `mockall::automock`.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SingularStrategy: Display {
    /// Tick hook.
    ///
    /// NOTE(review): presumably invoked periodically by the owner of the strategy
    /// (see `MultiStrategyConfig::execution_interval`) — confirm against the caller.
    async fn on_tick(&self) -> Result<()> {
        Ok(())
    }

    /// Hook receiving a verified ticket.
    ///
    /// NOTE(review): going by the name, called when a winning ticket has been
    /// acknowledged — confirm against the caller.
    async fn on_acknowledged_winning_ticket(&self, _ack: &VerifiedTicket) -> Result<()> {
        Ok(())
    }

    /// Hook receiving a channel, its direction and the change that happened to it.
    ///
    /// NOTE(review): going by the name, only called for channels this node is a
    /// party of — confirm against the caller.
    async fn on_own_channel_changed(
        &self,
        _channel: &ChannelEntry,
        _direction: ChannelDirection,
        _change: ChannelChange,
    ) -> Result<()> {
        Ok(())
    }
}
76
/// Serde default provider for boolean configuration flags that are on by default
/// (`on_fail_continue` and `allow_recursive` of `MultiStrategyConfig`).
#[inline]
fn just_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
81
/// Default provider for `MultiStrategyConfig::execution_interval`: one minute.
#[inline]
fn sixty_seconds() -> std::time::Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 60;
    std::time::Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
86
87#[inline]
88fn empty_vector() -> Vec<Strategy> {
89 vec![]
90}
91
92fn validate_execution_interval(interval: &std::time::Duration) -> std::result::Result<(), ValidationError> {
93 if interval < &std::time::Duration::from_secs(10) {
94 Err(ValidationError::new(
95 "strategy execution interval must be at least 1 second",
96 ))
97 } else {
98 Ok(())
99 }
100}
101
/// Configuration of a [`MultiStrategy`] chain.
///
/// Unknown fields are rejected on deserialization (`deny_unknown_fields`);
/// every field has a default, so all of them may be omitted.
#[serde_as]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MultiStrategyConfig {
    /// Controls what happens when a sub-strategy hook returns an error.
    ///
    /// When `false`, [`MultiStrategy`] stops at the first failing sub-strategy and
    /// returns its error. When `true`, the remaining sub-strategies still run and
    /// the hook returns `Ok(())`.
    ///
    /// Default is `true`.
    #[default = true]
    #[serde(default = "just_true")]
    pub on_fail_continue: bool,

    /// Whether `Strategy::Multi` entries found in `strategies` are instantiated.
    ///
    /// When `false`, such entries are skipped with an error log. A nested
    /// multi-strategy is always built with this flag forced to `false`.
    ///
    /// Default is `true`.
    #[default = true]
    #[serde(default = "just_true")]
    pub allow_recursive: bool,

    /// Execution interval, (de)serialized as a whole number of seconds.
    ///
    /// Validation rejects values below 10 seconds. Default is 60 seconds.
    ///
    /// NOTE(review): not read in the visible part of this file — presumably the
    /// period at which the owner calls `on_tick`; confirm against the caller.
    #[default(sixty_seconds())]
    #[serde(default = "sixty_seconds")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[validate(custom(function = "validate_execution_interval"))]
    pub execution_interval: std::time::Duration,

    /// Configurations of the sub-strategies; `MultiStrategy::new` instantiates
    /// them in this order.
    ///
    /// Default is an empty list.
    #[default(_code = "vec![]")]
    #[serde(default = "empty_vector")]
    pub strategies: Vec<Strategy>,
}
141
/// A strategy made of an ordered list of sub-strategies.
///
/// Each [`SingularStrategy`] hook is forwarded to the sub-strategies in order;
/// `cfg.on_fail_continue` decides whether a failing sub-strategy stops the chain.
pub struct MultiStrategy {
    // Instantiated sub-strategies, in execution order.
    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
    // Configuration this chain was built from (also used for Debug/Display output).
    cfg: MultiStrategyConfig,
}
150
151impl MultiStrategy {
152 pub fn new<A, R>(cfg: MultiStrategyConfig, hopr_chain_actions: A, redeem_sink: R) -> Self
156 where
157 A: ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
158 R: futures::Sink<TicketSelector> + Sync + Send + Clone + 'static,
159 StrategyError: From<R::Error>,
160 {
161 let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
162
163 #[cfg(all(feature = "prometheus", not(test)))]
164 Strategy::VARIANTS
165 .iter()
166 .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
167
168 for strategy in cfg.strategies.iter() {
169 match strategy {
170 Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
171 *sub_cfg,
172 hopr_chain_actions.clone(),
173 redeem_sink.clone(),
174 ))),
175 Strategy::AutoFunding(sub_cfg) => {
176 strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
177 }
178 Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
179 *sub_cfg,
180 hopr_chain_actions.clone(),
181 ))),
182 Strategy::Multi(sub_cfg) => {
183 if cfg.allow_recursive {
184 let mut cfg_clone = sub_cfg.clone();
185 cfg_clone.allow_recursive = false; strategies.push(Box::new(Self::new(
188 cfg_clone,
189 hopr_chain_actions.clone(),
190 redeem_sink.clone(),
191 )))
192 } else {
193 error!("recursive multi-strategy not allowed and skipped")
194 }
195 }
196
197 Strategy::Passive => strategies.push(Box::new(Self {
199 cfg: Default::default(),
200 strategies: Vec::new(),
201 })),
202 }
203
204 #[cfg(all(feature = "prometheus", not(test)))]
205 METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
206 }
207
208 Self { strategies, cfg }
209 }
210}
211
212impl Debug for MultiStrategy {
213 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
214 write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
215 }
216}
217
218impl Display for MultiStrategy {
219 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
220 write!(f, "{}", Strategy::Multi(self.cfg.clone()))
221 }
222}
223
224#[async_trait]
225impl SingularStrategy for MultiStrategy {
226 async fn on_tick(&self) -> Result<()> {
227 for strategy in self.strategies.iter() {
228 if let Err(e) = strategy.on_tick().await {
229 if !self.cfg.on_fail_continue {
230 warn!(%self, %strategy, "on_tick chain stopped at strategy");
231 return Err(e);
232 }
233 }
234 }
235 Ok(())
236 }
237
238 async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> Result<()> {
239 for strategy in self.strategies.iter() {
240 if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
241 if !self.cfg.on_fail_continue {
242 warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
243 return Err(e);
244 }
245 }
246 }
247 Ok(())
248 }
249
250 async fn on_own_channel_changed(
251 &self,
252 channel: &ChannelEntry,
253 direction: ChannelDirection,
254 change: ChannelChange,
255 ) -> Result<()> {
256 for strategy in self.strategies.iter() {
257 if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
258 if !self.cfg.on_fail_continue {
259 warn!(%self, "on_channel_state_changed chain stopped at strategy");
260 return Err(e);
261 }
262 }
263 }
264 Ok(())
265 }
266}
267
#[cfg(test)]
impl Display for MockSingularStrategy {
    /// Mocks are always rendered as the fixed label `mock`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
274
#[cfg(test)]
mod tests {
    use mockall::Sequence;

    use crate::{
        errors::StrategyError::Other,
        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
    };

    /// Chain configuration used by the tests; only `on_fail_continue` varies.
    fn chain_config(on_fail_continue: bool) -> MultiStrategyConfig {
        MultiStrategyConfig {
            on_fail_continue,
            allow_recursive: true,
            execution_interval: std::time::Duration::from_secs(1),
            strategies: Vec::new(),
        }
    }

    /// Mock whose `on_tick` is expected exactly once, in sequence, and fails.
    fn failing_once(seq: &mut Sequence) -> MockSingularStrategy {
        let mut mock = MockSingularStrategy::new();
        mock.expect_on_tick()
            .times(1)
            .in_sequence(seq)
            .returning(|| Err(Other("error".into())));
        mock
    }

    /// With `on_fail_continue` set, a failing first strategy does not prevent
    /// the second one from running, and the chain as a whole succeeds.
    #[tokio::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut seq = Sequence::new();

        let first = failing_once(&mut seq);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: chain_config(true),
        };
        ms.on_tick().await?;

        Ok(())
    }

    /// With `on_fail_continue` unset, the chain stops at the failing first
    /// strategy: the second one is never called and the error is returned.
    #[tokio::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut seq = Sequence::new();

        let first = failing_once(&mut seq);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .never()
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: chain_config(false),
        };
        ms.on_tick().await.expect_err("on_tick should fail");
    }
}