1use async_trait::async_trait;
21use serde::{Deserialize, Serialize};
22use std::{
23 fmt::{Debug, Display, Formatter},
24 sync::Arc,
25};
26use tracing::{error, warn};
27use validator::Validate;
28
29use hopr_chain_actions::ChainActions;
30use hopr_internal_types::prelude::*;
31use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregatorTrait;
32
33use crate::aggregating::AggregatingStrategy;
34use crate::auto_funding::AutoFundingStrategy;
35use crate::auto_redeeming::AutoRedeemingStrategy;
36use crate::channel_finalizer::ClosureFinalizerStrategy;
37use crate::errors::Result;
38use crate::promiscuous::PromiscuousStrategy;
39use crate::Strategy;
40
41use hopr_db_sql::HoprDbAllOperations;
42#[cfg(all(feature = "prometheus", not(test)))]
43use {hopr_metrics::metrics::MultiGauge, strum::VariantNames};
44
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge with one `strategy` label per strategy name.
    ///
    /// Within this file it is set to `0` for every `Strategy` variant name and to `1`
    /// for each strategy that gets instantiated by `MultiStrategy::new`.
    static ref METRIC_ENABLED_STRATEGIES: MultiGauge = MultiGauge::new(
        "hopr_strategy_enabled_strategies",
        "List of enabled strategies",
        &["strategy"],
    )
    .unwrap();
}
50
51#[cfg_attr(test, mockall::automock)]
53#[async_trait]
54pub trait SingularStrategy: Display {
55 async fn on_tick(&self) -> Result<()> {
57 Ok(())
58 }
59
60 async fn on_acknowledged_winning_ticket(&self, _ack: &AcknowledgedTicket) -> Result<()> {
62 Ok(())
63 }
64
65 async fn on_own_channel_changed(
67 &self,
68 _channel: &ChannelEntry,
69 _direction: ChannelDirection,
70 _change: ChannelChange,
71 ) -> Result<()> {
72 Ok(())
73 }
74}
75
/// Serde default provider for boolean fields that default to `true`.
#[inline]
fn just_true() -> bool { true }
80
/// Serde default provider returning the constant `60`.
#[inline]
fn sixty() -> u64 { 60_u64 }
85
86#[inline]
87fn empty_vector() -> Vec<Strategy> {
88 vec![]
89}
90
91#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
95#[serde(deny_unknown_fields)]
96pub struct MultiStrategyConfig {
97 #[default = true]
103 #[serde(default = "just_true")]
104 pub on_fail_continue: bool,
105
106 #[default = true]
110 #[serde(default = "just_true")]
111 pub allow_recursive: bool,
112
113 #[default = 60]
117 #[serde(default = "sixty")]
118 #[validate(range(min = 1))]
119 pub execution_interval: u64,
120
121 #[default(_code = "vec![]")]
125 #[serde(default = "empty_vector")]
126 pub strategies: Vec<Strategy>,
127}
128
129pub struct MultiStrategy {
134 strategies: Vec<Box<dyn SingularStrategy + Send + Sync>>,
135 cfg: MultiStrategyConfig,
136}
137
138impl MultiStrategy {
139 pub fn new<Db>(
142 cfg: MultiStrategyConfig,
143 db: Db,
144 hopr_chain_actions: ChainActions<Db>,
145 ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
146 ) -> Self
147 where
148 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
149 {
150 let mut strategies = Vec::<Box<dyn SingularStrategy + Send + Sync>>::new();
151
152 #[cfg(all(feature = "prometheus", not(test)))]
153 Strategy::VARIANTS
154 .iter()
155 .for_each(|s| METRIC_ENABLED_STRATEGIES.set(&[*s], 0_f64));
156
157 for strategy in cfg.strategies.iter() {
158 match strategy {
159 Strategy::Promiscuous(sub_cfg) => strategies.push(Box::new(PromiscuousStrategy::new(
160 sub_cfg.clone(),
161 db.clone(),
162 hopr_chain_actions.clone(),
163 ))),
164 Strategy::Aggregating(sub_cfg) => strategies.push(Box::new(AggregatingStrategy::new(
165 *sub_cfg,
166 db.clone(),
167 ticket_aggregator.clone(),
168 ))),
169 Strategy::AutoRedeeming(sub_cfg) => strategies.push(Box::new(AutoRedeemingStrategy::new(
170 *sub_cfg,
171 db.clone(),
172 hopr_chain_actions.clone(),
173 ))),
174 Strategy::AutoFunding(sub_cfg) => {
175 strategies.push(Box::new(AutoFundingStrategy::new(*sub_cfg, hopr_chain_actions.clone())))
176 }
177 Strategy::ClosureFinalizer(sub_cfg) => strategies.push(Box::new(ClosureFinalizerStrategy::new(
178 *sub_cfg,
179 db.clone(),
180 hopr_chain_actions.clone(),
181 ))),
182 Strategy::Multi(sub_cfg) => {
183 if cfg.allow_recursive {
184 let mut cfg_clone = sub_cfg.clone();
185 cfg_clone.allow_recursive = false; strategies.push(Box::new(Self::new(
188 cfg_clone,
189 db.clone(),
190 hopr_chain_actions.clone(),
191 ticket_aggregator.clone(),
192 )))
193 } else {
194 error!("recursive multi-strategy not allowed and skipped")
195 }
196 }
197
198 Strategy::Passive => strategies.push(Box::new(Self {
200 cfg: Default::default(),
201 strategies: Vec::new(),
202 })),
203 }
204
205 #[cfg(all(feature = "prometheus", not(test)))]
206 METRIC_ENABLED_STRATEGIES.set(&[&strategy.to_string()], 1_f64);
207 }
208
209 Self { strategies, cfg }
210 }
211}
212
213impl Debug for MultiStrategy {
214 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
215 write!(f, "{:?}", Strategy::Multi(self.cfg.clone()))
216 }
217}
218
219impl Display for MultiStrategy {
220 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
221 write!(f, "{}", Strategy::Multi(self.cfg.clone()))
222 }
223}
224
225#[async_trait]
226impl SingularStrategy for MultiStrategy {
227 async fn on_tick(&self) -> Result<()> {
228 for strategy in self.strategies.iter() {
229 if let Err(e) = strategy.on_tick().await {
230 if !self.cfg.on_fail_continue {
231 warn!(%self, %strategy, "on_tick chain stopped at strategy");
232 return Err(e);
233 }
234 }
235 }
236 Ok(())
237 }
238
239 async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> Result<()> {
240 for strategy in self.strategies.iter() {
241 if let Err(e) = strategy.on_acknowledged_winning_ticket(ack).await {
242 if !self.cfg.on_fail_continue {
243 warn!(%self, %strategy, "on_acknowledged_ticket chain stopped at strategy");
244 return Err(e);
245 }
246 }
247 }
248 Ok(())
249 }
250
251 async fn on_own_channel_changed(
252 &self,
253 channel: &ChannelEntry,
254 direction: ChannelDirection,
255 change: ChannelChange,
256 ) -> Result<()> {
257 for strategy in self.strategies.iter() {
258 if let Err(e) = strategy.on_own_channel_changed(channel, direction, change).await {
259 if !self.cfg.on_fail_continue {
260 warn!(%self, "on_channel_state_changed chain stopped at strategy");
261 return Err(e);
262 }
263 }
264 }
265 Ok(())
266 }
267}
268
/// Test-only `Display` for the generated mock; it always renders as `mock`.
#[cfg(test)]
impl Display for MockSingularStrategy {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
275
#[cfg(test)]
mod tests {
    use crate::errors::StrategyError::Other;
    use crate::strategy::{MockSingularStrategy, MultiStrategy, MultiStrategyConfig, SingularStrategy};
    use mockall::Sequence;

    /// Mock whose `on_tick` must be called exactly once (in sequence) and fails.
    fn failing_once(seq: &mut Sequence) -> MockSingularStrategy {
        let mut mock = MockSingularStrategy::new();
        mock.expect_on_tick()
            .times(1)
            .in_sequence(seq)
            .returning(|| Err(Other("error".into())));
        mock
    }

    /// Two-element chain with the given failure policy.
    fn chain_of(first: MockSingularStrategy, second: MockSingularStrategy, on_fail_continue: bool) -> MultiStrategy {
        MultiStrategy {
            strategies: vec![Box::new(first), Box::new(second)],
            cfg: MultiStrategyConfig {
                on_fail_continue,
                allow_recursive: true,
                execution_interval: 1,
                strategies: Vec::new(),
            },
        }
    }

    /// With `on_fail_continue = true` the second strategy still runs after the first
    /// one fails, and the chain as a whole succeeds.
    #[async_std::test]
    async fn test_multi_strategy_logical_or_flow() -> anyhow::Result<()> {
        let mut seq = Sequence::new();

        let failing = failing_once(&mut seq);

        let mut succeeding = MockSingularStrategy::new();
        succeeding
            .expect_on_tick()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = chain_of(failing, succeeding, true);
        ms.on_tick().await?;

        Ok(())
    }

    /// With `on_fail_continue = false` the chain stops at the first failure: the
    /// second strategy is never called and the error is returned.
    #[async_std::test]
    async fn test_multi_strategy_logical_and_flow() {
        let mut seq = Sequence::new();

        let failing = failing_once(&mut seq);

        let mut untouched = MockSingularStrategy::new();
        untouched
            .expect_on_tick()
            .never()
            .in_sequence(&mut seq)
            .returning(|| Ok(()));

        let ms = chain_of(failing, untouched, false);
        ms.on_tick().await.expect_err("on_tick should fail");
    }
}