1use std::fmt::{Debug, Display, Formatter};
22
23use async_trait::async_trait;
24use hopr_api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChainWriteTicketOperations};
25use hopr_internal_types::prelude::*;
26use serde::{Deserialize, Serialize};
27#[cfg(all(feature = "prometheus", not(test)))]
28use strum::VariantNames;
29use tracing::{error, warn};
30use validator::Validate;
31
32use crate::{
33 Strategy, auto_funding::AutoFundingStrategy, auto_redeeming::AutoRedeemingStrategy,
34 channel_finalizer::ClosureFinalizerStrategy, errors::Result,
35};
36
// Gauge family labelled by `strategy`. `MultiStrategy::new` resets the series of every
// `Strategy` variant name to 0 and sets the series of the strategies it processes to 1.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
// The `unwrap()` panics on first access of the static if the gauge cannot be created.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ENABLED_STRATEGIES: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
}
42
/// Interface of a single strategy, expressed as a set of asynchronous event hooks.
///
/// Every hook has a default implementation that does nothing and returns `Ok(())`,
/// so an implementor only overrides the hooks it cares about.
/// Implementors must also implement [`Display`] (supertrait); the multi-strategy
/// uses it to name a strategy in its log output.
///
/// In test builds a `MockSingularStrategy` is generated by `mockall::automock`.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait SingularStrategy: Display {
    /// Hook for a tick event.
    ///
    /// NOTE(review): the caller and the tick period are not visible in this file;
    /// presumably driven by `MultiStrategyConfig::execution_interval` — confirm.
    async fn on_tick(&self) -> Result<()> {
        Ok(())
    }

    /// Hook receiving an acknowledged ticket.
    ///
    /// NOTE(review): going by the name, `_ack` is expected to be a winning ticket — confirm
    /// against the caller.
    async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
        Ok(())
    }

    /// Hook receiving a channel entry together with its direction and the change that occurred.
    ///
    /// NOTE(review): going by the name, only the node's own channels are reported here — confirm
    /// against the caller.
    async fn on_own_channel_changed(
        &self,
        _channel: &ChannelEntry,
        _direction: ChannelDirection,
        _change: ChannelChange,
    ) -> Result<()> {
        Ok(())
    }
}
67
/// Serde default provider for boolean fields whose default is `true`.
#[inline]
fn just_true() -> bool { true }
72
/// Serde default provider yielding the constant `60`.
#[inline]
fn sixty() -> u64 { 60_u64 }
77
78#[inline]
79fn empty_vector() -> Vec<Strategy> {
80 vec![]
81}
82
/// Configuration of [`MultiStrategy`].
///
/// Unknown fields are rejected on deserialization (`deny_unknown_fields`); every field
/// has a default, so an empty configuration is valid.
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MultiStrategyConfig {
    /// Decides what happens when a sub-strategy returns an error from one of its hooks.
    ///
    /// When `false`, the chain stops at that sub-strategy and its error is returned.
    /// When `true`, the error is not propagated and the remaining sub-strategies still run.
    ///
    /// Default is `true`.
    #[default = true]
    #[serde(default = "just_true")]
    pub on_fail_continue: bool,

    /// Whether `Strategy::Multi` entries inside `strategies` are instantiated.
    ///
    /// When `false`, such entries are skipped and an error is logged. A nested
    /// multi-strategy is always built with this flag forced to `false`, so nesting
    /// is at most one level deep.
    ///
    /// Default is `true`.
    #[default = true]
    #[serde(default = "just_true")]
    pub allow_recursive: bool,

    /// Execution interval; validated to be at least 1.
    ///
    /// NOTE(review): the unit is not visible in this file — presumably seconds; confirm
    /// against the code that consumes this value.
    ///
    /// Default is `60`.
    #[default = 60]
    #[serde(default = "sixty")]
    #[validate(range(min = 1))]
    pub execution_interval: u64,

    /// Configurations of the sub-strategies; `MultiStrategy::new` instantiates them in this order.
    ///
    /// Default is an empty list.
    #[default(_code = "vec![]")]
    #[serde(default = "empty_vector")]
    pub strategies: Vec<Strategy>,
}
120
/// Strategy composed of an ordered list of sub-strategies.
///
/// Its [`SingularStrategy`] implementation forwards every hook to the sub-strategies
/// in order; `cfg.on_fail_continue` decides whether a failing sub-strategy stops the chain.
pub struct MultiStrategy {
    // Instantiated sub-strategies, in execution order.
    strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
    // Configuration this instance was built from (also used for `Debug`/`Display` output).
    cfg: MultiStrategyConfig,
}
129
130impl MultiStrategy {
131 pub fn new<A>(cfg: MultiStrategyConfig, hopr_chain_actions: A) -> Self
134 where
135 A: ChainReadChannelOperations
136 + ChainWriteChannelOperations
137 + ChainWriteTicketOperations
138 + Clone
139 + Send
140 + Sync
141 + 'static,
142 {
143 let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
144
145 #[cfg(all(feature = "prometheus", not(test)))]
146 Strategy::VARIANTS
147 .iter()
148 .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
149
150 for strategy in cfg.strategies.iter() {
151 match strategy {
152 Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
153 *sub_cfg,
154 hopr_chain_actions.clone(),
155 ))),
156 Strategy::AutoFunding(sub_cfg) => {
157 strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
158 }
159 Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
160 *sub_cfg,
161 hopr_chain_actions.clone(),
162 ))),
163 Strategy::Multi(sub_cfg) => {
164 if cfg.allow_recursive {
165 let mut cfg_clone = sub_cfg.clone();
166 cfg_clone.allow_recursive = false; strategies.push(Box::new(Self::new(cfg_clone, hopr_chain_actions.clone())))
169 } else {
170 error!("recursive multi-strategy not allowed and skipped")
171 }
172 }
173
174 Strategy::Passive => strategies.push(Box::new(Self {
176 cfg: Default::default(),
177 strategies: Vec::new(),
178 })),
179 }
180
181 #[cfg(all(feature = "prometheus", not(test)))]
182 METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
183 }
184
185 Self { strategies, cfg }
186 }
187}
188
189impl Debug for MultiStrategy {
190 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191 write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
192 }
193}
194
195impl Display for MultiStrategy {
196 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
197 write!(f, "{}", Strategy::Multi(self.cfg.clone()))
198 }
199}
200
201#[async_trait]
202impl SingularStrategy for MultiStrategy {
203 async fn on_tick(&self) -> Result<()> {
204 for strategy in self.strategies.iter() {
205 if let Err(e) = strategy.on_tick().await {
206 if !self.cfg.on_fail_continue {
207 warn!(%self, %strategy, "on_tick chain stopped at strategy");
208 return Err(e);
209 }
210 }
211 }
212 Ok(())
213 }
214
215 async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
216 for strategy in self.strategies.iter() {
217 if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
218 if !self.cfg.on_fail_continue {
219 warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
220 return Err(e);
221 }
222 }
223 }
224 Ok(())
225 }
226
227 async fn on_own_channel_changed(
228 &self,
229 channel: &ChannelEntry,
230 direction: ChannelDirection,
231 change: ChannelChange,
232 ) -> Result<()> {
233 for strategy in self.strategies.iter() {
234 if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
235 if !self.cfg.on_fail_continue {
236 warn!(%self, "on_channel_state_changed chain stopped at strategy");
237 return Err(e);
238 }
239 }
240 }
241 Ok(())
242 }
243}
244
/// Test-only `Display` for the generated mock (required by the `SingularStrategy` supertrait);
/// always prints `mock`.
#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
251
#[cfg(test)]
mod tests {
    use mockall::Sequence;

    use crate::{
        errors::StrategyError::Other,
        strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
    };

    /// Config without configured sub-strategies; only `on_fail_continue` varies between tests.
    fn config_with(on_fail_continue: bool) -> MultiStrategyConfig {
        MultiStrategyConfig {
            on_fail_continue,
            allow_recursive: true,
            execution_interval: 1,
            strategies: Vec::new(),
        }
    }

    /// Mock whose `on_tick` must be called exactly once (at its place in `order`) and fails.
    fn failing_once(order: &mut Sequence) -> MockSingularStrategy {
        let mut mock = MockSingularStrategy::new();
        mock.expect_on_tick()
            .times(1)
            .in_sequence(order)
            .returning(|| Err(Other("error".into())));
        mock
    }

    /// With `on_fail_continue = true`, a failing first strategy must not prevent the second
    /// one from running, and the overall result is `Ok`.
    #[tokio::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut order = Sequence::new();

        let first = failing_once(&mut order);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut order)
            .returning(|| Ok(()));

        let ms = MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: config_with(true),
        };
        ms.on_tick().await?;

        Ok(())
    }

    /// With `on_fail_continue = false`, the chain stops at the failing first strategy:
    /// the second one is never called and the error is returned.
    #[tokio::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut order = Sequence::new();

        let first = failing_once(&mut order);

        let mut second = MockSingularStrategy::new();
        second
            .expect_on_tick()
            .never()
            .in_sequence(&mut order)
            .returning(|| Ok(()));

        let ms = MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: config_with(false),
        };
        ms.on_tick().await.expect_err("on_tick should fail");
    }
}