hopr_strategy/
auto_redeeming.rs

1//! ## Auto Redeeming Strategy
2//! This strategy listens for newly added acknowledged tickets and automatically issues a redeem transaction on that
3//! ticket.
4//!
5//! For details on default parameters, see [AutoRedeemingStrategyConfig].
6use std::{
7    fmt::{Debug, Display, Formatter},
8    str::FromStr,
9};
10
11use async_trait::async_trait;
12use futures::StreamExt;
13use hopr_api::{
14    chain::{ChainReadChannelOperations, ChainWriteTicketOperations, ChannelSelector},
15    db::TicketSelector,
16};
17use hopr_internal_types::{prelude::*, tickets::AcknowledgedTicket};
18use hopr_primitive_types::prelude::*;
19use serde::{Deserialize, Serialize};
20use serde_with::{DisplayFromStr, serde_as};
21use tracing::{debug, error, info};
22use validator::Validate;
23
24use crate::{
25    Strategy,
26    errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
27    strategy::SingularStrategy,
28};
29
30#[cfg(all(feature = "prometheus", not(test)))]
31lazy_static::lazy_static! {
32    static ref METRIC_COUNT_AUTO_REDEEMS:  hopr_metrics::SimpleCounter =
33         hopr_metrics::SimpleCounter::new("hopr_strategy_auto_redeem_redeem_count", "Count of initiated automatic redemptions").unwrap();
34}
35
36fn just_true() -> bool {
37    true
38}
39
40fn just_false() -> bool {
41    false
42}
43
44fn min_redeem_hopr() -> HoprBalance {
45    HoprBalance::from_str("1 wxHOPR").unwrap()
46}
47
48/// Configuration object for the `AutoRedeemingStrategy`
49#[serde_as]
50#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
51pub struct AutoRedeemingStrategyConfig {
52    /// If set to true, will redeem all tickets in the channel (which are over the
53    /// `minimum_redeem_ticket_value` threshold) once it transitions to `PendingToClose`.
54    ///
55    /// Default is `true`.
56    #[serde(default = "just_true")]
57    #[default = true]
58    pub redeem_all_on_close: bool,
59
60    /// The strategy will only redeem an acknowledged winning ticket if it has at least this value of HOPR.
61    /// If 0 is given, the strategy will redeem tickets regardless of their value.
62    ///
63    /// Default is `1 wxHOPR`.
64    #[serde(default = "min_redeem_hopr")]
65    #[serde_as(as = "DisplayFromStr")]
66    #[default(min_redeem_hopr())]
67    pub minimum_redeem_ticket_value: HoprBalance,
68
69    /// If set, the strategy will redeem each incoming winning ticket.
70    /// Otherwise, it will try to redeem tickets in all channels periodically.
71    ///
72    /// Set this to `true` when winning tickets are not happening too often (e.g., when winning probability
73    /// is below 1%).
74    /// Set this to `false` when winning tickets are happening very often (e.g., when winning probability
75    /// is above 1%).
76    ///
77    /// Default is `true`
78    #[serde(default = "just_false")]
79    #[default = false]
80    pub redeem_on_winning: bool,
81}
82
83/// The `AutoRedeemingStrategy` automatically sends an acknowledged ticket
84/// for redemption once encountered.
85/// The strategy does not await the result of the redemption.
86pub struct AutoRedeemingStrategy<A> {
87    hopr_chain_actions: A,
88    cfg: AutoRedeemingStrategyConfig,
89}
90
91/// Number of concurrent channel redemption tasks.
92const REDEEMING_CONCURRENCY: usize = 20;
93
94impl<A> Debug for AutoRedeemingStrategy<A> {
95    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96        write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
97    }
98}
99
100impl<A> Display for AutoRedeemingStrategy<A> {
101    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102        write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
103    }
104}
105
106impl<A> AutoRedeemingStrategy<A> {
107    pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A) -> Self {
108        Self {
109            cfg,
110            hopr_chain_actions,
111        }
112    }
113}
114
115#[async_trait]
116impl<A> SingularStrategy for AutoRedeemingStrategy<A>
117where
118    A: ChainWriteTicketOperations + ChainReadChannelOperations + Clone + Send + Sync,
119{
120    async fn on_tick(&self) -> crate::errors::Result<()> {
121        if !self.cfg.redeem_on_winning {
122            debug!("trying to redeem all tickets in all channels");
123
124            let chain_actions = self.hopr_chain_actions.clone();
125            self.hopr_chain_actions
126                .stream_channels(
127                    ChannelSelector::default()
128                        .with_destination(*self.hopr_chain_actions.me())
129                        .with_allowed_states(&[
130                            ChannelStatusDiscriminants::Open,
131                            ChannelStatusDiscriminants::PendingToClose,
132                        ])
133                        .with_closure_time_range(Utc::now()..),
134                )
135                .await
136                .map_err(|e| StrategyError::Other(e.into()))?
137                .for_each_concurrent(REDEEMING_CONCURRENCY, |channel| {
138                    let chain_actions_clone = chain_actions.clone();
139                    async move {
140                        let selector = TicketSelector::from(&channel)
141                            .with_amount(self.cfg.minimum_redeem_ticket_value..)
142                            .with_index_range(channel.ticket_index.as_u64()..)
143                            .with_state(AcknowledgedTicketStatus::Untouched);
144
145                        match chain_actions_clone
146                            .redeem_tickets_via_selector(selector)
147                            .await
148                            .map_err(|e| StrategyError::Other(e.into()))
149                        {
150                            Ok(awaiter) => {
151                                // We don't await the result of the redemption here
152                                let count = awaiter.len();
153                                if count > 0 {
154                                    #[cfg(all(feature = "prometheus", not(test)))]
155                                    METRIC_COUNT_AUTO_REDEEMS.increment_by(count as u64);
156
157                                    info!(count, %channel, "strategy issued ticket redemptions");
158                                } else {
159                                    debug!(count, %channel, "strategy issued no ticket redemptions");
160                                }
161                            }
162                            Err(error) => error!(%error, %channel, "strategy failed to issue ticket redemptions"),
163                        }
164                    }
165                })
166                .await;
167
168            Ok(())
169        } else {
170            Err(CriteriaNotSatisfied)
171        }
172    }
173
174    async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> crate::errors::Result<()> {
175        if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
176            if let Some(channel) = self
177                .hopr_chain_actions
178                .channel_by_id(&ack.verified_ticket().channel_id)
179                .await
180                .map_err(|e| StrategyError::Other(e.into()))?
181            {
182                info!(%ack, "redeeming");
183
184                if ack.ticket.verified_ticket().index < channel.ticket_index.as_u64() {
185                    error!(%ack, "acknowledged ticket is older than channel ticket index");
186                    return Err(CriteriaNotSatisfied);
187                }
188
189                #[cfg(all(feature = "prometheus", not(test)))]
190                METRIC_COUNT_AUTO_REDEEMS.increment();
191
192                let rxs = self
193                    .hopr_chain_actions
194                    .redeem_tickets_via_selector(
195                        TicketSelector::from(channel)
196                            .with_amount(self.cfg.minimum_redeem_ticket_value..)
197                            .with_index_range(channel.ticket_index.as_u64()..=ack.ticket.verified_ticket().index)
198                            .with_state(AcknowledgedTicketStatus::Untouched),
199                    )
200                    .await
201                    .map_err(|e| StrategyError::Other(e.into()))?;
202
203                std::mem::drop(rxs); // The Receiver is not intentionally awaited here and the oneshot Sender can fail safely
204
205                Ok(())
206            } else {
207                Err(CriteriaNotSatisfied)
208            }
209        } else {
210            Err(CriteriaNotSatisfied)
211        }
212    }
213
214    async fn on_own_channel_changed(
215        &self,
216        channel: &ChannelEntry,
217        direction: ChannelDirection,
218        change: ChannelChange,
219    ) -> crate::errors::Result<()> {
220        if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
221            return Ok(());
222        }
223
224        if let ChannelChange::Status { left: old, right: new } = change {
225            if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
226                debug!(?channel, "ignoring channel state change that's not in PendingToClose");
227                return Ok(());
228            }
229            info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
230
231            let count = self
232                .hopr_chain_actions
233                .redeem_tickets_via_selector(
234                    TicketSelector::from(channel)
235                        .with_amount(self.cfg.minimum_redeem_ticket_value..)
236                        .with_index_range(channel.ticket_index.as_u64()..)
237                        .with_state(AcknowledgedTicketStatus::Untouched),
238                )
239                .await
240                .map_err(|e| StrategyError::Other(e.into()))?
241                .len();
242
243            #[cfg(all(feature = "prometheus", not(test)))]
244            METRIC_COUNT_AUTO_REDEEMS.increment_by(count as u64);
245
246            if count > 0 {
247                info!(count, %channel, "tickets in channel being closed sent for redemption");
248                Ok(())
249            } else {
250                info!(%channel, "no redeemable tickets with minimum amount in channel being closed");
251                Err(CriteriaNotSatisfied)
252            }
253        } else {
254            Err(CriteriaNotSatisfied)
255        }
256    }
257}
258
259#[cfg(test)]
260mod tests {
261    use std::{
262        ops::Add,
263        time::{Duration, SystemTime},
264    };
265
266    use hex_literal::hex;
267    use hopr_api::chain::ChainReceipt;
268    use hopr_crypto_random::Randomizable;
269    use hopr_crypto_types::prelude::*;
270
271    use super::*;
272    use crate::tests::{MockChainActions, MockTestActions};
273
274    lazy_static::lazy_static! {
275        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
276        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
277        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
278        static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into(); // 0.01 HOPR
279
280        static ref CHANNEL_1: ChannelEntry = ChannelEntry::new(
281            BOB.public().to_address(),
282            ALICE.public().to_address(),
283            *PRICE_PER_PACKET * 10,
284            0.into(),
285            ChannelStatus::Open,
286            4.into()
287        );
288        static ref CHANNEL_2: ChannelEntry = ChannelEntry::new(
289            BOB.public().to_address(),
290            CHARLIE.public().to_address(),
291            *PRICE_PER_PACKET * 11,
292            1.into(),
293            ChannelStatus::Open,
294            4.into()
295        );
296    }
297
298    fn generate_random_ack_ticket(
299        index: u64,
300        idx_offset: u32,
301        worth_packets: u32,
302    ) -> anyhow::Result<AcknowledgedTicket> {
303        let hk1 = HalfKey::random();
304        let hk2 = HalfKey::random();
305
306        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
307
308        Ok(TicketBuilder::default()
309            .addresses(&*BOB, &*ALICE)
310            .amount(PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets)
311            .index(index)
312            .index_offset(idx_offset)
313            .win_prob(WinningProbability::ALWAYS)
314            .channel_epoch(4)
315            .challenge(challenge)
316            .build_signed(&BOB, &Hash::default())?
317            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
318    }
319
320    #[tokio::test]
321    async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
322        let ack_ticket = generate_random_ack_ticket(0, 1, 5)?;
323
324        let mut mock = MockTestActions::new();
325        mock.expect_channel_by_id()
326            .once()
327            .with(mockall::predicate::eq(CHANNEL_1.get_id()))
328            .return_once(|_| Some(CHANNEL_1.clone()));
329
330        mock.expect_redeem_with_selector()
331            .once()
332            .with(mockall::predicate::eq(
333                TicketSelector::from(CHANNEL_1.clone())
334                    .with_amount(HoprBalance::zero()..)
335                    .with_index_range(
336                        ack_ticket.ticket.verified_ticket().index..=ack_ticket.ticket.verified_ticket().index,
337                    )
338                    .with_state(AcknowledgedTicketStatus::Untouched),
339            ))
340            .return_once(|_| vec![ChainReceipt::default()]);
341
342        let cfg = AutoRedeemingStrategyConfig {
343            minimum_redeem_ticket_value: 0.into(),
344            redeem_on_winning: true,
345            ..Default::default()
346        };
347
348        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
349        ars.on_acknowledged_winning_ticket(&ack_ticket).await?;
350        assert!(ars.on_tick().await.is_err());
351
352        Ok(())
353    }
354
355    #[tokio::test]
356    async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
357        let ack_ticket = generate_random_ack_ticket(0, 1, 5)?;
358
359        let mut mock = MockTestActions::new();
360        mock.expect_me().return_const(BOB.public().to_address());
361
362        mock.expect_stream_channels()
363            .once()
364            .withf(move |selector| {
365                selector.source.is_none()
366                    && selector.destination == Some(BOB.public().to_address())
367                    && selector.allowed_states
368                        == &[
369                            ChannelStatusDiscriminants::Open,
370                            ChannelStatusDiscriminants::PendingToClose,
371                        ]
372            })
373            .return_once(|_| futures::stream::iter([CHANNEL_1.clone(), CHANNEL_2.clone()]).boxed());
374
375        mock.expect_redeem_with_selector()
376            .once()
377            .with(mockall::predicate::eq(
378                TicketSelector::from(CHANNEL_1.clone())
379                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
380                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..)
381                    .with_state(AcknowledgedTicketStatus::Untouched),
382            ))
383            .returning(|_| vec![ChainReceipt::default()]);
384
385        mock.expect_redeem_with_selector()
386            .once()
387            .with(mockall::predicate::eq(
388                TicketSelector::from(CHANNEL_2.clone())
389                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
390                    .with_index_range(CHANNEL_2.ticket_index.as_u64()..)
391                    .with_state(AcknowledgedTicketStatus::Untouched),
392            ))
393            .returning(|_| vec![ChainReceipt::default()]);
394
395        let cfg = AutoRedeemingStrategyConfig {
396            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
397            redeem_on_winning: false,
398            ..Default::default()
399        };
400
401        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
402        ars.on_tick().await?;
403        assert!(ars.on_acknowledged_winning_ticket(&ack_ticket).await.is_err());
404
405        Ok(())
406    }
407
408    #[tokio::test]
409    async fn test_auto_redeeming_strategy_redeem_minimum_ticket_amount() -> anyhow::Result<()> {
410        let ack_ticket_below = generate_random_ack_ticket(1, 1, 4)?;
411        let ack_ticket_at = generate_random_ack_ticket(1, 1, 5)?;
412
413        let mut mock = MockTestActions::new();
414        mock.expect_channel_by_id()
415            .once()
416            .with(mockall::predicate::eq(CHANNEL_1.get_id()))
417            .return_once(|_| Some(CHANNEL_1.clone()));
418
419        mock.expect_redeem_with_selector()
420            .once()
421            .with(mockall::predicate::eq(
422                TicketSelector::from(CHANNEL_1.clone())
423                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
424                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..=ack_ticket_at.ticket.verified_ticket().index)
425                    .with_state(AcknowledgedTicketStatus::Untouched),
426            ))
427            .return_once(|_| vec![ChainReceipt::default()]);
428
429        let cfg = AutoRedeemingStrategyConfig {
430            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
431            redeem_on_winning: true,
432            ..Default::default()
433        };
434
435        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
436        ars.on_acknowledged_winning_ticket(&ack_ticket_below)
437            .await
438            .expect_err("ticket below threshold should not satisfy");
439        ars.on_acknowledged_winning_ticket(&ack_ticket_at).await?;
440
441        Ok(())
442    }
443
444    #[tokio::test]
445    async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
446        let channel = ChannelEntry::new(
447            BOB.public().to_address(),
448            ALICE.public().to_address(),
449            10.into(),
450            0.into(),
451            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
452            4.into(),
453        );
454
455        let mut mock = MockTestActions::new();
456        mock.expect_redeem_with_selector()
457            .once()
458            .with(mockall::predicate::eq(
459                TicketSelector::from(CHANNEL_1.clone())
460                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
461                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..)
462                    .with_state(AcknowledgedTicketStatus::Untouched),
463            ))
464            .return_once(|_| vec![ChainReceipt::default()]);
465
466        let cfg = AutoRedeemingStrategyConfig {
467            redeem_all_on_close: true,
468            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
469            ..Default::default()
470        };
471
472        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
473        ars.on_own_channel_changed(
474            &channel,
475            ChannelDirection::Incoming,
476            ChannelChange::Status {
477                left: ChannelStatus::Open,
478                right: channel.status,
479            },
480        )
481        .await?;
482
483        Ok(())
484    }
485
486    #[tokio::test]
487    async fn test_auto_redeeming_strategy_redeem_multiple_tickets_in_channel() -> anyhow::Result<()> {
488        let ack_ticket_1 = generate_random_ack_ticket(0, 2, 5)?;
489
490        let mut mock = MockTestActions::new();
491        mock.expect_channel_by_id()
492            .once()
493            .with(mockall::predicate::eq(CHANNEL_1.get_id()))
494            .return_once(|_| Some(CHANNEL_1.clone()));
495
496        mock.expect_redeem_with_selector()
497            .once()
498            .with(mockall::predicate::eq(
499                TicketSelector::from(CHANNEL_1.clone())
500                    .with_amount(HoprBalance::zero()..)
501                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..=ack_ticket_1.ticket.verified_ticket().index)
502                    .with_state(AcknowledgedTicketStatus::Untouched),
503            ))
504            .return_once(|_| vec![ChainReceipt::default()]);
505
506        let cfg = AutoRedeemingStrategyConfig {
507            minimum_redeem_ticket_value: 0.into(),
508            redeem_on_winning: true,
509            ..Default::default()
510        };
511
512        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
513        ars.on_acknowledged_winning_ticket(&ack_ticket_1).await?;
514        assert!(ars.on_tick().await.is_err());
515
516        Ok(())
517    }
518}