Skip to main content

hopr_strategy/
auto_redeeming.rs

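//! ## Auto Redeeming Strategy
//! This strategy automatically starts ticket redemption in incoming channels. Depending on the configuration,
//! redemption is triggered by each newly acknowledged winning ticket, on each strategy tick for all redeemable
//! incoming channels, or when an incoming channel transitions to `PendingToClose`.
//!
//! For details on default parameters, see [AutoRedeemingStrategyConfig].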
6use std::{
7    fmt::{Debug, Display, Formatter},
8    str::FromStr,
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use futures::{StreamExt, TryStreamExt};
14use hopr_async_runtime::{AbortableList, spawn_as_abortable};
15use hopr_lib::{
16    ChannelChange, ChannelDirection, ChannelEntry, ChannelId, ChannelStatus, HoprBalance, VerifiedTicket,
17    api::{
18        chain::{ChainReadChannelOperations, ChainWriteTicketOperations, ChannelSelector},
19        tickets::TicketManagement,
20    },
21};
22use parking_lot::lock_api::RwLockUpgradableReadGuard;
23use serde::{Deserialize, Serialize};
24use serde_with::{DisplayFromStr, serde_as};
25use validator::Validate;
26
27use crate::{
28    Strategy,
29    errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
30    strategy::SingularStrategy,
31};
32
// Counter of automatically initiated redemptions.
// Only compiled in when the `telemetry` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_AUTO_REDEEMS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_strategy_auto_redeem_redeem_count",
        "Count of initiated automatic redemptions",
    )
    .unwrap();
}
38
/// Serde default provider for boolean configuration fields that default to `true`.
fn just_true() -> bool {
    true
}
42
/// Serde default provider for boolean configuration fields that default to `false`.
fn just_false() -> bool {
    false
}
46
47fn min_redeem_hopr() -> HoprBalance {
48    HoprBalance::from_str("1 wxHOPR").unwrap()
49}
50
51/// Configuration object for the `AutoRedeemingStrategy`
52#[serde_as]
53#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
54pub struct AutoRedeemingStrategyConfig {
55    /// If set to true, will redeem all tickets in the channel (which are over the
56    /// `minimum_redeem_ticket_value` threshold) once it transitions to `PendingToClose`.
57    ///
58    /// Default is `true`.
59    #[serde(default = "just_true")]
60    #[default = true]
61    pub redeem_all_on_close: bool,
62
63    /// The strategy will only redeem an acknowledged winning ticket if it has at least this value of HOPR.
64    /// If 0 is given, the strategy will redeem tickets regardless of their value.
65    ///
66    /// Default is `1 wxHOPR`.
67    #[serde(default = "min_redeem_hopr")]
68    #[serde_as(as = "DisplayFromStr")]
69    #[default(min_redeem_hopr())]
70    pub minimum_redeem_ticket_value: HoprBalance,
71
72    /// If set, the strategy will redeem each incoming winning ticket.
73    /// Otherwise, it will try to redeem tickets in all channels periodically.
74    ///
75    /// Set this to `true` when winning tickets are not happening too often (e.g., when winning probability
76    /// is below 1%).
77    /// Set this to `false` when winning tickets are happening very often (e.g., when winning probability
78    /// is above 1%).
79    ///
80    /// Default is `true`
81    #[serde(default = "just_false")]
82    #[default = false]
83    pub redeem_on_winning: bool,
84}
85
86/// The `AutoRedeemingStrategy` automatically sends an acknowledged ticket
87/// for redemption once encountered.
88/// The strategy does not await the result of the redemption.
89pub struct AutoRedeemingStrategy<A, T> {
90    cfg: AutoRedeemingStrategyConfig,
91    hopr_chain_actions: A,
92    ticket_manager: T,
93    // Makes sure all ongoing ticket redemptions to be terminated once the strategy is dropped.
94    running_redemptions: std::sync::Arc<parking_lot::RwLock<AbortableList<ChannelId>>>,
95}
96
97impl<A, T> Debug for AutoRedeemingStrategy<A, T> {
98    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99        write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
100    }
101}
102
103impl<A, T> Display for AutoRedeemingStrategy<A, T> {
104    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105        write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
106    }
107}
108
109impl<A, T> AutoRedeemingStrategy<A, T>
110where
111    A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
112    T: TicketManagement + Clone + Sync + Send + 'static,
113{
114    pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A, ticket_manager: T) -> Self {
115        Self {
116            cfg,
117            hopr_chain_actions,
118            ticket_manager,
119            running_redemptions: std::sync::Arc::new(parking_lot::RwLock::new(AbortableList::default())),
120        }
121    }
122
123    fn enqueue_redemption(&self, channel_id: &ChannelId) -> Result<(), StrategyError> {
124        let redemptions = self.running_redemptions.upgradable_read();
125        if !redemptions.contains(channel_id) {
126            tracing::debug!(%channel_id, "attempting to start redemption in channel");
127
128            let tmgr = self.ticket_manager.clone();
129            let client = self.hopr_chain_actions.clone();
130            let min_value = self.cfg.minimum_redeem_ticket_value;
131            let channel_id = *channel_id;
132            let redemptions_clone = self.running_redemptions.clone();
133
134            RwLockUpgradableReadGuard::upgrade(redemptions).insert(
135                channel_id,
136                spawn_as_abortable!(async move {
137                    let redeem_result = match tmgr
138                        .redeem_stream(client.clone(), channel_id, min_value.into())
139                        .map_err(StrategyError::other)
140                    {
141                        Ok(stream) => {
142                            stream
143                                .map_err(StrategyError::other)
144                                .try_for_each(|res| {
145                                    tracing::debug!(?res, %channel_id, "ticket redemption completed");
146                                    futures::future::ok(())
147                                })
148                                .await
149                        }
150                        err => {
151                            // Add small delay to avoid the write lock acquired for insertion
152                            // still being held.
153                            hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;
154                            err.map(|_| ())
155                        }
156                    };
157
158                    tracing::debug!(?redeem_result, %channel_id, "redemption in channel complete");
159                    redemptions_clone.write().abort_one(&channel_id);
160                }),
161            );
162            Ok(())
163        } else {
164            tracing::debug!(%channel_id, "existing on-going redemption");
165            Err(StrategyError::InProgress)
166        }
167    }
168}
169
170#[async_trait]
171impl<A, T> SingularStrategy for AutoRedeemingStrategy<A, T>
172where
173    A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
174    T: TicketManagement + Clone + Sync + Send + 'static,
175{
176    async fn on_tick(&self) -> crate::errors::Result<()> {
177        if !self.cfg.redeem_on_winning {
178            tracing::debug!("trying to redeem all tickets in all channels");
179
180            self.hopr_chain_actions
181                .stream_channels(
182                    ChannelSelector::default()
183                        .with_destination(*self.hopr_chain_actions.me())
184                        .with_redeemable_channels(Duration::from_secs(30).into()),
185                )
186                .map_err(StrategyError::other)?
187                .for_each(|channel| {
188                    if let Err(error) = self.enqueue_redemption(channel.get_id()) {
189                        tracing::error!(
190                            %error,
191                            channel_id = %channel.get_id(),
192                            "cannot start redemption in channel"
193                        );
194                    }
195                    futures::future::ready(())
196                })
197                .await;
198
199            Ok(())
200        } else {
201            Err(CriteriaNotSatisfied)
202        }
203    }
204
205    async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> crate::errors::Result<()> {
206        if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
207            let chain_api = self.hopr_chain_actions.clone();
208            let channel_id = *ack.channel_id();
209            let maybe_channel = hopr_async_runtime::prelude::spawn_blocking(move || {
210                chain_api.channel_by_id(&channel_id).map_err(StrategyError::other)
211            })
212            .await
213            .map_err(StrategyError::other)??;
214
215            if let Some(channel) = maybe_channel {
216                tracing::info!(%ack, "redeeming");
217
218                if ack.verified_ticket().index < channel.ticket_index {
219                    tracing::error!(%ack, "acknowledged ticket is older than channel ticket index");
220                    return Err(CriteriaNotSatisfied);
221                }
222
223                // Raises an error if redemption in this channel is ongoing
224                self.enqueue_redemption(channel.get_id())?;
225
226                Ok(())
227            } else {
228                Err(CriteriaNotSatisfied)
229            }
230        } else {
231            Err(CriteriaNotSatisfied)
232        }
233    }
234
235    async fn on_own_channel_changed(
236        &self,
237        channel: &ChannelEntry,
238        direction: ChannelDirection,
239        change: ChannelChange,
240    ) -> crate::errors::Result<()> {
241        if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
242            return Ok(());
243        }
244
245        if let ChannelChange::Status { left: old, right: new } = change {
246            if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
247                tracing::debug!(?channel, "ignoring channel state change that's not in PendingToClose");
248                return Ok(());
249            }
250            tracing::info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
251
252            // Raises an error if redemption in this channel is ongoing
253            self.enqueue_redemption(channel.get_id())?;
254
255            Ok(())
256        } else {
257            Err(CriteriaNotSatisfied)
258        }
259    }
260}
261
#[cfg(test)]
mod tests {
    use std::{
        ops::Add,
        sync::Arc,
        time::{Duration, SystemTime},
    };

    use futures::stream::BoxStream;
    use futures_time::future::FutureExt as TimeExt;
    use hex_literal::hex;
    use hopr_api::{
        tickets::{ChannelStats, RedemptionResult},
        types::crypto_random::Randomizable,
    };
    use hopr_chain_connector::{HoprBlockchainSafeConnector, create_trustful_hopr_blokli_connector, testing::*};
    use hopr_lib::{
        Address, BytesRepresentable, ChainKeypair, HalfKey, Hash, Keypair, RedeemableTicket, Response, TicketBuilder,
        UnitaryFloatOps, WinningProbability, XDaiBalance,
    };
    use mockall::predicate::{always, eq};

    use super::*;

    // Mock of the ticket manager; only `redeem_stream` gets expectations in the tests below.
    mockall::mock! {
        pub TicketMgmt {}
         #[allow(refining_impl_trait)]
        impl TicketManagement for TicketMgmt {
            type Error = std::io::Error;
            fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
                &self,
                client: C,
                channel_id: ChannelId,
                min_amount: Option<HoprBalance>,
            ) -> Result<BoxStream<'static, Result<RedemptionResult, std::io::Error>>, std::io::Error>;

            fn neglect_tickets(
                &self,
                channel_id: &ChannelId,
                max_ticket_index: Option<u64>,
            ) -> Result<Vec<VerifiedTicket>, std::io::Error>;

            fn ticket_stats<'a>(&self, channel_id: Option<&'a ChannelId>) -> Result<ChannelStats, std::io::Error>;
        }
    }

    lazy_static::lazy_static! {
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c"
        ))
        .expect("lazy static keypair should be valid");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26"
        ))
        .expect("lazy static keypair should be constructible");
        static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into(); // 0.01 HOPR

        // Incoming channel ALICE -> BOB
        static ref CHANNEL_1: ChannelEntry = ChannelEntry::builder()
            .between(&*ALICE, &*BOB)
            .balance(*PRICE_PER_PACKET * 10)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(4)
            .build()
            .unwrap();

        // Incoming channel CHARLIE -> BOB
        static ref CHANNEL_2: ChannelEntry = ChannelEntry::builder()
            .between(&*CHARLIE, &*BOB)
            .balance(*PRICE_PER_PACKET * 11)
            .ticket_index(1)
            .status(ChannelStatus::Open)
            .epoch(4)
            .build()
            .unwrap();

        static ref CHAIN_CLIENT: BlokliTestClient<StaticState> = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[ALICE.public().as_ref(), BOB.public().as_ref(), CHARLIE.public().as_ref()],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([*CHANNEL_1, *CHANNEL_2])
            .build_static_client();
    }

    type TestConnector = Arc<HoprBlockchainSafeConnector<BlokliTestClient<StaticState>>>;

    type RedeemStream = BoxStream<'static, Result<RedemptionResult, std::io::Error>>;

    /// Builds an always-winning ticket issued by ALICE to BOB, worth `worth_packets` packets,
    /// acknowledged and made redeemable by BOB.
    fn generate_random_ack_ticket(index: u64, worth_packets: u32) -> anyhow::Result<RedeemableTicket> {
        let hk1 = HalfKey::random();
        let hk2 = HalfKey::random();

        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
        let amount = PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets;

        let signed = TicketBuilder::default()
            .counterparty(&*BOB)
            .amount(amount)
            .index(index)
            .win_prob(WinningProbability::ALWAYS)
            .channel_epoch(4)
            .challenge(challenge)
            .build_signed(&ALICE, &Hash::default())?;

        let redeemable = signed
            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
            .into_redeemable(&BOB, &Hash::default())?;

        Ok(redeemable)
    }

    /// Creates a connected connector acting as BOB on top of the shared static test client.
    async fn connect_as_bob() -> anyhow::Result<TestConnector> {
        let mut connector = create_trustful_hopr_blokli_connector(
            &BOB,
            Default::default(),
            CHAIN_CLIENT.clone(),
            [1u8; Address::SIZE].into(),
        )
        .await?;
        connector.connect().await?;

        Ok(Arc::new(connector))
    }

    /// Expects exactly one `redeem_stream` call for `channel` with the configured minimum value.
    /// The returned stream is built lazily, at the moment the mocked call happens.
    fn expect_single_redeem<F>(
        tmgr: &mut MockTicketMgmt,
        channel: &ChannelEntry,
        cfg: &AutoRedeemingStrategyConfig,
        make_stream: F,
    ) where
        F: FnOnce() -> RedeemStream + Send + 'static,
    {
        tmgr.expect_redeem_stream()
            .once()
            .with(
                always(),
                eq(*channel.get_id()),
                eq(Some(cfg.minimum_redeem_ticket_value)),
            )
            .return_once(move |_: TestConnector, _, _| Ok(make_stream()));
    }

    /// Polls the registry every 100 ms until no redemption is registered anymore.
    async fn await_redemption_queue_empty(redeems: Arc<parking_lot::RwLock<AbortableList<ChannelId>>>) {
        loop {
            hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;

            let drained = redeems.read().is_empty();
            if drained {
                return;
            }
        }
    }

    /// Waits up to 5 seconds for all redemptions of the strategy to finish.
    async fn wait_until_drained<A, T>(strategy: &AutoRedeemingStrategy<A, T>) -> anyhow::Result<()> {
        await_redemption_queue_empty(strategy.running_redemptions.clone())
            .timeout(futures_time::time::Duration::from_secs(5))
            .await?;

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
        let ack_ticket = generate_random_ack_ticket(0, 5)?;
        let connector = connect_as_bob().await?;

        let cfg = AutoRedeemingStrategyConfig {
            minimum_redeem_ticket_value: 0.into(),
            redeem_on_winning: true,
            ..Default::default()
        };

        let mut mock_tmgr = MockTicketMgmt::new();
        expect_single_redeem(&mut mock_tmgr, &CHANNEL_1, &cfg, move || {
            futures::stream::once(futures::future::ok(RedemptionResult::Redeemed(ack_ticket.ticket))).boxed()
        });

        let ars = AutoRedeemingStrategy::new(cfg, connector, Arc::new(mock_tmgr));

        ars.on_acknowledged_winning_ticket(&ack_ticket.ticket).await?;
        assert!(ars.on_tick().await.is_err());

        wait_until_drained(&ars).await
    }

    #[test_log::test(tokio::test)]
    async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
        let connector = connect_as_bob().await?;

        let cfg = AutoRedeemingStrategyConfig {
            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
            redeem_on_winning: false,
            ..Default::default()
        };

        let mut mock_tmgr = MockTicketMgmt::new();
        expect_single_redeem(&mut mock_tmgr, &CHANNEL_1, &cfg, || futures::stream::empty().boxed());
        expect_single_redeem(&mut mock_tmgr, &CHANNEL_2, &cfg, || futures::stream::empty().boxed());

        let ars = AutoRedeemingStrategy::new(cfg, connector, Arc::new(mock_tmgr));
        ars.on_tick().await?;

        wait_until_drained(&ars).await
    }

    #[tokio::test]
    async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
        let mut channel = *CHANNEL_1;
        channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));

        let connector = connect_as_bob().await?;

        let cfg = AutoRedeemingStrategyConfig {
            redeem_all_on_close: true,
            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
            ..Default::default()
        };

        let mut mock_tmgr = MockTicketMgmt::new();
        expect_single_redeem(&mut mock_tmgr, &CHANNEL_1, &cfg, || futures::stream::empty().boxed());

        let ars = AutoRedeemingStrategy::new(cfg, connector, Arc::new(mock_tmgr));
        let closing = ChannelChange::Status {
            left: ChannelStatus::Open,
            right: channel.status,
        };
        ars.on_own_channel_changed(&channel, ChannelDirection::Incoming, closing)
            .await?;

        wait_until_drained(&ars).await
    }

    #[tokio::test]
    async fn test_auto_redeeming_strategy_should_not_redeem_multiple_times_in_same_channel() -> anyhow::Result<()> {
        let ack_ticket_1 = generate_random_ack_ticket(0, 5)?;
        let connector = connect_as_bob().await?;

        let cfg = AutoRedeemingStrategyConfig {
            minimum_redeem_ticket_value: 0.into(),
            redeem_on_winning: true,
            ..Default::default()
        };

        // The single redemption takes 500 ms, so the follow-up triggers below hit a running redemption.
        let mut mock_tmgr = MockTicketMgmt::new();
        expect_single_redeem(&mut mock_tmgr, &CHANNEL_1, &cfg, move || {
            futures::stream::once(
                futures::future::ok(RedemptionResult::Redeemed(ack_ticket_1.ticket))
                    .delay(futures_time::time::Duration::from_millis(500)),
            )
            .boxed()
        });

        let ars = AutoRedeemingStrategy::new(cfg, connector, Arc::new(mock_tmgr));
        ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await?;

        let second_attempt = ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await;
        assert!(matches!(second_attempt, Err(StrategyError::InProgress)));

        let mut channel = *CHANNEL_1;
        channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));

        let on_close_attempt = ars
            .on_own_channel_changed(
                &channel,
                ChannelDirection::Incoming,
                ChannelChange::Status {
                    left: ChannelStatus::Open,
                    right: channel.status,
                },
            )
            .await;
        assert!(matches!(on_close_attempt, Err(StrategyError::InProgress)));
        assert!(ars.on_tick().await.is_err());

        wait_until_drained(&ars).await
    }
}
574}