hopr_strategy/
lib.rs

1//! This crate contains all the Strategies for HOPRd.
2//! Strategies are vital for (partial) automation of ticket and HOPR channel operations
3//! during node runtime.
4//!
//! - [passive strategy](crate::Strategy::Passive)
//! - [auto funding strategy](crate::auto_funding)
//! - [auto redeeming strategy](crate::auto_redeeming)
//! - [closure finalizer strategy](crate::channel_finalizer)
//! - [multiple strategy chains](crate::strategy)
9//!
10//! HOPRd can be configured to use any of the above strategies.
11//!
12//! ## Configuring strategies in HOPRd
13//!
14//! There are two ways of configuring strategies in HOPRd: via CLI and via a YAML config file.
15//!
//! The configuration through the CLI allows only a fairly primitive single-strategy setting, through the
//! `defaultStrategy` parameter. It can be set to any of the above strategies; however, the strategy parameters are
//! not further configurable via the CLI and will always have their default values.
//! In addition, if the `disableTicketAutoRedeem` CLI argument is `false`, the default Auto Redeem strategy is added
//! to the strategy configured via the `defaultStrategy` argument (they execute together as a Multi strategy).
21//!
//! For more complex strategy configurations, the YAML configuration method is recommended via the `strategy` YAML
//! section. In this case, the top-most strategy is always assumed to be a Multi strategy:
24//!
25//! ```yaml
26//! strategy:
27//!   on_fail_continue: true
28//!   allow_recursive: true
29//!   execution_interval: 60
30//!   strategies:
31//!     - !AutoFunding
32//!       funding_amount: 20
33//! ```
34
35use std::{ops::Sub, str::FromStr, time::Duration};
36
37use futures::{StreamExt, pin_mut};
38use futures_concurrency::stream::Merge;
39use hopr_lib::{Address, ChannelChange, ChannelStatus, HoprBalance, VerifiedTicket, exports::api::chain::ChainEvent};
40use serde::{Deserialize, Serialize};
41use strum::{Display, EnumString, VariantNames};
42
43use crate::{
44    Strategy::AutoRedeeming,
45    auto_funding::AutoFundingStrategyConfig,
46    auto_redeeming::AutoRedeemingStrategyConfig,
47    channel_finalizer::ClosureFinalizerStrategyConfig,
48    strategy::{MultiStrategyConfig, SingularStrategy},
49};
50
51pub mod auto_funding;
52pub mod auto_redeeming;
53pub mod channel_finalizer;
54pub mod errors;
55pub mod strategy;
56
57/// Lists all possible strategies with their respective configurations.
58#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Display, EnumString, VariantNames)]
59#[strum(serialize_all = "snake_case")]
60pub enum Strategy {
61    AutoRedeeming(AutoRedeemingStrategyConfig),
62    AutoFunding(AutoFundingStrategyConfig),
63    ClosureFinalizer(ClosureFinalizerStrategyConfig),
64    Multi(MultiStrategyConfig),
65    Passive,
66}
67
68/// Default HOPR node strategies (in order).
69///
70/// ## Auto-redeem Strategy
71/// - redeem single tickets on channel close if worth at least 1 wxHOPR
72pub fn hopr_default_strategies() -> MultiStrategyConfig {
73    MultiStrategyConfig {
74        on_fail_continue: true,
75        allow_recursive: false,
76        execution_interval: Duration::from_secs(60),
77        strategies: vec![
78            // AutoFunding(AutoFundingStrategyConfig {
79            // min_stake_threshold: Balance::new_from_str("1000000000000000000", BalanceType::HOPR),
80            // funding_amount: Balance::new_from_str("10000000000000000000", BalanceType::HOPR),
81            // }),
82            AutoRedeeming(AutoRedeemingStrategyConfig {
83                redeem_all_on_close: true,
84                minimum_redeem_ticket_value: HoprBalance::from_str("1 wxHOPR").unwrap(),
85                redeem_on_winning: true,
86            }),
87        ],
88    }
89}
90
91enum StrategyEvent {
92    Tick,
93    ChainEvent(ChainEvent),
94    Ticket(VerifiedTicket),
95}
96
97/// Streams [`ChainEvents`](ChainEvent), [`VerifiedTickets`](VerifiedTicket) and `tick` at regular time
98/// intervals as events into the given `strategy`.
99pub fn stream_events_to_strategy_with_tick<C, T, S>(
100    strategy: std::sync::Arc<S>,
101    chain_events: C,
102    ticket_events: T,
103    tick: Duration,
104    me: Address,
105) -> hopr_async_runtime::AbortHandle
106where
107    C: futures::stream::Stream<Item = ChainEvent> + Send + 'static,
108    T: futures::stream::Stream<Item = VerifiedTicket> + Send + 'static,
109    S: SingularStrategy + Send + Sync + 'static,
110{
111    let tick_stream = futures_time::stream::interval(tick.into()).map(|_| StrategyEvent::Tick);
112    let chain_stream = chain_events.map(StrategyEvent::ChainEvent).fuse();
113    let ticket_stream = ticket_events.map(StrategyEvent::Ticket).fuse();
114
115    let (stream, abort_handle) = futures::stream::abortable((tick_stream, chain_stream, ticket_stream).merge());
116    hopr_async_runtime::prelude::spawn(async move {
117        pin_mut!(stream);
118        while let Some(event) = stream.next().await {
119            match event {
120                StrategyEvent::Tick => {
121                    if let Err(error) = strategy.on_tick().await {
122                        tracing::error!(%error, "error while notifying tick to strategy");
123                    }
124                }
125                StrategyEvent::ChainEvent(chain_event) => {
126                    // TODO: rework strategies so that they can react directly to `ChainEvent`s and avoid the following
127                    // conversion to `ChannelChange`
128                    match chain_event {
129                        ChainEvent::ChannelOpened(channel) => {
130                            if let Some(dir) = channel.direction(&me) {
131                                let _ = strategy
132                                    .on_own_channel_changed(
133                                        &channel,
134                                        dir,
135                                        ChannelChange::Status {
136                                            left: ChannelStatus::Closed,
137                                            right: ChannelStatus::Open,
138                                        },
139                                    )
140                                    .await;
141                            }
142                        }
143                        ChainEvent::ChannelClosureInitiated(channel) => {
144                            if let Some(dir) = channel.direction(&me) {
145                                let _ = strategy
146                                    .on_own_channel_changed(
147                                        &channel,
148                                        dir,
149                                        ChannelChange::Status {
150                                            left: ChannelStatus::Open,
151                                            right: channel.status,
152                                        },
153                                    )
154                                    .await;
155                            }
156                        }
157                        ChainEvent::ChannelClosed(channel) => {
158                            if let Some(dir) = channel.direction(&me) {
159                                let _ = strategy
160                                    .on_own_channel_changed(
161                                        &channel,
162                                        dir,
163                                        ChannelChange::Status {
164                                            left: ChannelStatus::PendingToClose(
165                                                std::time::SystemTime::now().sub(Duration::from_secs(30)),
166                                            ),
167                                            right: ChannelStatus::Closed,
168                                        },
169                                    )
170                                    .await;
171                            }
172                        }
173                        ChainEvent::ChannelBalanceIncreased(channel, diff) => {
174                            if let Some(dir) = channel.direction(&me) {
175                                let _ = strategy
176                                    .on_own_channel_changed(
177                                        &channel,
178                                        dir,
179                                        ChannelChange::Balance {
180                                            left: channel.balance - diff,
181                                            right: channel.balance,
182                                        },
183                                    )
184                                    .await;
185                            }
186                        }
187                        ChainEvent::ChannelBalanceDecreased(channel, diff) => {
188                            if let Some(dir) = channel.direction(&me) {
189                                let _ = strategy
190                                    .on_own_channel_changed(
191                                        &channel,
192                                        dir,
193                                        ChannelChange::Balance {
194                                            left: channel.balance + diff,
195                                            right: channel.balance,
196                                        },
197                                    )
198                                    .await;
199                            }
200                        }
201                        ChainEvent::TicketRedeemed(channel, _) => {
202                            if let Some(dir) = channel.direction(&me) {
203                                let _ = strategy
204                                    .on_own_channel_changed(
205                                        &channel,
206                                        dir,
207                                        ChannelChange::TicketIndex {
208                                            left: channel.ticket_index - 1,
209                                            right: channel.ticket_index,
210                                        },
211                                    )
212                                    .await;
213                            }
214                        }
215                        _ => {}
216                    }
217                }
218                StrategyEvent::Ticket(ack_ticket) => {
219                    if let Err(error) = strategy.on_acknowledged_winning_ticket(&ack_ticket).await {
220                        tracing::error!(%error, "error while notifying new winning ticket to strategy");
221                    }
222                }
223            }
224        }
225    });
226
227    abort_handle
228}
229
230impl Default for Strategy {
231    fn default() -> Self {
232        Self::Multi(hopr_default_strategies())
233    }
234}
235
236/// An alias for the strategy configuration type.
237pub type StrategyConfig = MultiStrategyConfig;