1use std::{
22 fmt::{Debug, Display, Formatter},
23 sync::Arc,
24};
25
26use async_trait::async_trait;
27use hopr_chain_actions::ChainActions;
28use hopr_db_sql::HoprDbAllOperations;
29use hopr_internal_types::prelude::*;
30use hopr_transport_ticket_aggregation::TicketAggregatorTrait;
31use serde::{Deserialize, Serialize};
32use tracing::{error, warn};
33use validator::Validate;
34#[cfg(all(feature = "prometheus", not(test)))]
35use {hopr_metrics::metrics::MultiGauge, strum::VariantNames};
36
37use crate::{
38 Strategy, aggregating::AggregatingStrategy, auto_funding::AutoFundingStrategy,
39 auto_redeeming::AutoRedeemingStrategy, channel_finalizer::ClosureFinalizerStrategy, errors::Result,
40 promiscuous::PromiscuousStrategy,
41};
42
43#[cfg(all(feature = "prometheus", not(test)))]
44lazy_static::lazy_static! {
45 static ref METRIC_ENABLED_STRATEGIES: MultiGauge =
46 MultiGauge::new("hopr_strategy_enabled_strategies", "List of enabled strategies", &["strategy"]).unwrap();
47}
48
49#[cfg_attr(test, mockall::automock)]
51#[async_trait]
52pub trait SingularStrategy: Display {
53 async fn on_tick(&self) -> Result<()> {
55 Ok(())
56 }
57
58 async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
60 Ok(())
61 }
62
63 async fn on_own_channel_changed(
65 &self,
66 _channel: &ChannelEntry,
67 _direction: ChannelDirection,
68 _change: ChannelChange,
69 ) -> Result<()> {
70 Ok(())
71 }
72}
73
74#[inline]
75fn just_true() -> bool {
76 true
77}
78
79#[inline]
80fn sixty() -> u64 {
81 60
82}
83
84#[inline]
85fn empty_vector() -> Vec<Strategy> {
86 vec![]
87}
88
89#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
93#[serde(deny_unknown_fields)]
94pub struct MultiStrategyConfig {
95 #[default = true]
101 #[serde(default = "just_true")]
102 pub on_fail_continue: bool,
103
104 #[default = true]
108 #[serde(default = "just_true")]
109 pub allow_recursive: bool,
110
111 #[default = 60]
115 #[serde(default = "sixty")]
116 #[validate(range(min = 1))]
117 pub execution_interval: u64,
118
119 #[default(_code = "vec![]")]
123 #[serde(default = "empty_vector")]
124 pub strategies: Vec<Strategy>,
125}
126
127pub struct MultiStrategy {
132 strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
133 cfg: MultiStrategyConfig,
134}
135
136impl MultiStrategy {
137 pub fn new<Db>(
140 cfg: MultiStrategyConfig,
141 db: Db,
142 hopr_chain_actions: ChainActions<Db>,
143 ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
144 ) -> Self
145 where
146 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
147 {
148 let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
149
150 #[cfg(all(feature = "prometheus", not(test)))]
151 Strategy::VARIANTS
152 .iter()
153 .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
154
155 for strategy in cfg.strategies.iter() {
156 match strategy {
157 Strategy::Promiscuous(sub_cfg) => strategies.push(Box::new(PromiscuousStrategy::new(
158 sub_cfg.clone(),
159 db.clone(),
160 hopr_chain_actions.clone(),
161 ))),
162 Strategy::Aggregating(sub_cfg) => strategies.push(Box::new(AggregatingStrategy::new(
163 *sub_cfg,
164 db.clone(),
165 ticket_aggregator.clone(),
166 ))),
167 Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
168 *sub_cfg,
169 db.clone(),
170 hopr_chain_actions.clone(),
171 ))),
172 Strategy::AutoFunding(sub_cfg) => {
173 strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
174 }
175 Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
176 *sub_cfg,
177 db.clone(),
178 hopr_chain_actions.clone(),
179 ))),
180 Strategy::Multi(sub_cfg) => {
181 if cfg.allow_recursive {
182 let mut cfg_clone = sub_cfg.clone();
183 cfg_clone.allow_recursive = false; strategies.push(Box::new(Self::new(
186 cfg_clone,
187 db.clone(),
188 hopr_chain_actions.clone(),
189 ticket_aggregator.clone(),
190 )))
191 } else {
192 error!("recursive multi-strategy not allowed and skipped")
193 }
194 }
195
196 Strategy::Passive => strategies.push(Box::new(Self {
198 cfg: Default::default(),
199 strategies: Vec::new(),
200 })),
201 }
202
203 #[cfg(all(feature = "prometheus", not(test)))]
204 METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
205 }
206
207 Self { strategies, cfg }
208 }
209}
210
211impl Debug for MultiStrategy {
212 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
213 write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
214 }
215}
216
217impl Display for MultiStrategy {
218 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
219 write!(f, "{}", Strategy::Multi(self.cfg.clone()))
220 }
221}
222
223#[async_trait]
224impl SingularStrategy for MultiStrategy {
225 async fn on_tick(&self) -> Result<()> {
226 for strategy in self.strategies.iter() {
227 if let Err(e) = strategy.on_tick().await {
228 if !self.cfg.on_fail_continue {
229 warn!(%self, %strategy, "on_tick chain stopped at strategy");
230 return Err(e);
231 }
232 }
233 }
234 Ok(())
235 }
236
237 async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
238 for strategy in self.strategies.iter() {
239 if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
240 if !self.cfg.on_fail_continue {
241 warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
242 return Err(e);
243 }
244 }
245 }
246 Ok(())
247 }
248
249 async fn on_own_channel_changed(
250 &self,
251 channel: &ChannelEntry,
252 direction: ChannelDirection,
253 change: ChannelChange,
254 ) -> Result<()> {
255 for strategy in self.strategies.iter() {
256 if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
257 if !self.cfg.on_fail_continue {
258 warn!(%self, "on_channel_state_changed chain stopped at strategy");
259 return Err(e);
260 }
261 }
262 }
263 Ok(())
264 }
265}
266
267#[cfg(test)]
268impl Display for MockSingularStrategy {
269 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
270 write!(f, "mock")
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use mockall::Sequence;
277
278 use crate::{
279 errors::StrategyError::Other,
280 strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy},
281 };
282
283 #[tokio::test]
284 async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
285 let mut seq = Sequence::new();
286
287 let mut s1 = MockSingularStrategy::new();
288 s1.expect_on_tick()
289 .times(1)
290 .in_sequence(&mut seq)
291 .returning(|| Err(Other("error".into())));
292
293 let mut s2 = MockSingularStrategy::new();
294 s2.expect_on_tick().times(1).in_sequence(&mut seq).returning(|| Ok(()));
295
296 let cfg = MultiStrategyConfig {
297 on_fail_continue: true,
298 allow_recursive: true,
299 execution_interval: 1,
300 strategies: Vec::new(),
301 };
302
303 let ms = MultiStrategy {
304 strategies: vec![Box::new(s1), Box::new(s2)],
305 cfg,
306 };
307 ms.on_tick().await?;
308
309 Ok(())
310 }
311
312 #[tokio::test]
313 async fn test_multi_strategy_logical_and_flow() {
314 let mut seq = Sequence::new();
315
316 let mut s1 = MockSingularStrategy::new();
317 s1.expect_on_tick()
318 .times(1)
319 .in_sequence(&mut seq)
320 .returning(|| Err(Other("error".into())));
321
322 let mut s2 = MockSingularStrategy::new();
323 s2.expect_on_tick().never().in_sequence(&mut seq).returning(|| Ok(()));
324
325 let cfg = MultiStrategyConfig {
326 on_fail_continue: false,
327 allow_recursive: true,
328 execution_interval: 1,
329 strategies: Vec::new(),
330 };
331
332 let ms = MultiStrategy {
333 strategies: vec![Box::new(s1), Box::new(s2)],
334 cfg,
335 };
336 ms.on_tick().await.expect_err("on_tick should fail");
337 }
338}