1use std::{
7 fmt::{Debug, Display, Formatter},
8 str::FromStr,
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use futures::{StreamExt, TryStreamExt};
14use hopr_async_runtime::{AbortableList, spawn_as_abortable};
15use hopr_lib::{
16 ChannelChange, ChannelDirection, ChannelEntry, ChannelId, ChannelStatus, HoprBalance, VerifiedTicket,
17 api::{
18 chain::{ChainReadChannelOperations, ChainWriteTicketOperations, ChannelSelector},
19 tickets::TicketManagement,
20 },
21};
22use parking_lot::lock_api::RwLockUpgradableReadGuard;
23use serde::{Deserialize, Serialize};
24use serde_with::{DisplayFromStr, serde_as};
25use validator::Validate;
26
27use crate::{
28 Strategy,
29 errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
30 strategy::SingularStrategy,
31};
32
33#[cfg(all(feature = "telemetry", not(test)))]
34lazy_static::lazy_static! {
35 static ref METRIC_COUNT_AUTO_REDEEMS: hopr_metrics::SimpleCounter =
36 hopr_metrics::SimpleCounter::new("hopr_strategy_auto_redeem_redeem_count", "Count of initiated automatic redemptions").unwrap();
37}
38
39fn just_true() -> bool {
40 true
41}
42
43fn just_false() -> bool {
44 false
45}
46
47fn min_redeem_hopr() -> HoprBalance {
48 HoprBalance::from_str("1 wxHOPR").unwrap()
49}
50
51#[serde_as]
53#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
54pub struct AutoRedeemingStrategyConfig {
55 #[serde(default = "just_true")]
60 #[default = true]
61 pub redeem_all_on_close: bool,
62
63 #[serde(default = "min_redeem_hopr")]
68 #[serde_as(as = "DisplayFromStr")]
69 #[default(min_redeem_hopr())]
70 pub minimum_redeem_ticket_value: HoprBalance,
71
72 #[serde(default = "just_false")]
82 #[default = false]
83 pub redeem_on_winning: bool,
84}
85
86pub struct AutoRedeemingStrategy<A, T> {
90 cfg: AutoRedeemingStrategyConfig,
91 hopr_chain_actions: A,
92 ticket_manager: T,
93 running_redemptions: std::sync::Arc<parking_lot::RwLock<AbortableList<ChannelId>>>,
95}
96
97impl<A, T> Debug for AutoRedeemingStrategy<A, T> {
98 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99 write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
100 }
101}
102
103impl<A, T> Display for AutoRedeemingStrategy<A, T> {
104 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105 write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
106 }
107}
108
109impl<A, T> AutoRedeemingStrategy<A, T>
110where
111 A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
112 T: TicketManagement + Clone + Sync + Send + 'static,
113{
114 pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A, ticket_manager: T) -> Self {
115 Self {
116 cfg,
117 hopr_chain_actions,
118 ticket_manager,
119 running_redemptions: std::sync::Arc::new(parking_lot::RwLock::new(AbortableList::default())),
120 }
121 }
122
123 fn enqueue_redemption(&self, channel_id: &ChannelId) -> Result<(), StrategyError> {
124 let redemptions = self.running_redemptions.upgradable_read();
125 if !redemptions.contains(channel_id) {
126 tracing::debug!(%channel_id, "attempting to start redemption in channel");
127
128 let tmgr = self.ticket_manager.clone();
129 let client = self.hopr_chain_actions.clone();
130 let min_value = self.cfg.minimum_redeem_ticket_value;
131 let channel_id = *channel_id;
132 let redemptions_clone = self.running_redemptions.clone();
133
134 RwLockUpgradableReadGuard::upgrade(redemptions).insert(
135 channel_id,
136 spawn_as_abortable!(async move {
137 let redeem_result = match tmgr
138 .redeem_stream(client.clone(), channel_id, min_value.into())
139 .map_err(StrategyError::other)
140 {
141 Ok(stream) => {
142 stream
143 .map_err(StrategyError::other)
144 .try_for_each(|res| {
145 tracing::debug!(?res, %channel_id, "ticket redemption completed");
146 futures::future::ok(())
147 })
148 .await
149 }
150 err => {
151 hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;
154 err.map(|_| ())
155 }
156 };
157
158 tracing::debug!(?redeem_result, %channel_id, "redemption in channel complete");
159 redemptions_clone.write().abort_one(&channel_id);
160 }),
161 );
162 Ok(())
163 } else {
164 tracing::debug!(%channel_id, "existing on-going redemption");
165 Err(StrategyError::InProgress)
166 }
167 }
168}
169
170#[async_trait]
171impl<A, T> SingularStrategy for AutoRedeemingStrategy<A, T>
172where
173 A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
174 T: TicketManagement + Clone + Sync + Send + 'static,
175{
176 async fn on_tick(&self) -> crate::errors::Result<()> {
177 if !self.cfg.redeem_on_winning {
178 tracing::debug!("trying to redeem all tickets in all channels");
179
180 self.hopr_chain_actions
181 .stream_channels(
182 ChannelSelector::default()
183 .with_destination(*self.hopr_chain_actions.me())
184 .with_redeemable_channels(Duration::from_secs(30).into()),
185 )
186 .map_err(StrategyError::other)?
187 .for_each(|channel| {
188 if let Err(error) = self.enqueue_redemption(channel.get_id()) {
189 tracing::error!(
190 %error,
191 channel_id = %channel.get_id(),
192 "cannot start redemption in channel"
193 );
194 }
195 futures::future::ready(())
196 })
197 .await;
198
199 Ok(())
200 } else {
201 Err(CriteriaNotSatisfied)
202 }
203 }
204
205 async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> crate::errors::Result<()> {
206 if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
207 let chain_api = self.hopr_chain_actions.clone();
208 let channel_id = *ack.channel_id();
209 let maybe_channel = hopr_async_runtime::prelude::spawn_blocking(move || {
210 chain_api.channel_by_id(&channel_id).map_err(StrategyError::other)
211 })
212 .await
213 .map_err(StrategyError::other)??;
214
215 if let Some(channel) = maybe_channel {
216 tracing::info!(%ack, "redeeming");
217
218 if ack.verified_ticket().index < channel.ticket_index {
219 tracing::error!(%ack, "acknowledged ticket is older than channel ticket index");
220 return Err(CriteriaNotSatisfied);
221 }
222
223 self.enqueue_redemption(channel.get_id())?;
225
226 Ok(())
227 } else {
228 Err(CriteriaNotSatisfied)
229 }
230 } else {
231 Err(CriteriaNotSatisfied)
232 }
233 }
234
235 async fn on_own_channel_changed(
236 &self,
237 channel: &ChannelEntry,
238 direction: ChannelDirection,
239 change: ChannelChange,
240 ) -> crate::errors::Result<()> {
241 if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
242 return Ok(());
243 }
244
245 if let ChannelChange::Status { left: old, right: new } = change {
246 if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
247 tracing::debug!(?channel, "ignoring channel state change that's not in PendingToClose");
248 return Ok(());
249 }
250 tracing::info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
251
252 self.enqueue_redemption(channel.get_id())?;
254
255 Ok(())
256 } else {
257 Err(CriteriaNotSatisfied)
258 }
259 }
260}
261
262#[cfg(test)]
263mod tests {
264 use std::{
265 ops::Add,
266 sync::Arc,
267 time::{Duration, SystemTime},
268 };
269
270 use futures::stream::BoxStream;
271 use futures_time::future::FutureExt as TimeExt;
272 use hex_literal::hex;
273 use hopr_api::{
274 tickets::{ChannelStats, RedemptionResult},
275 types::crypto_random::Randomizable,
276 };
277 use hopr_chain_connector::{HoprBlockchainSafeConnector, create_trustful_hopr_blokli_connector, testing::*};
278 use hopr_lib::{
279 Address, BytesRepresentable, ChainKeypair, HalfKey, Hash, Keypair, RedeemableTicket, Response, TicketBuilder,
280 UnitaryFloatOps, WinningProbability, XDaiBalance,
281 };
282
283 use super::*;
284
285 mockall::mock! {
286 pub TicketMgmt {}
287 #[allow(refining_impl_trait)]
288 impl TicketManagement for TicketMgmt {
289 type Error = std::io::Error;
290 fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
291 &self,
292 client: C,
293 channel_id: ChannelId,
294 min_amount: Option<HoprBalance>,
295 ) -> Result<BoxStream<'static, Result<RedemptionResult, std::io::Error>>, std::io::Error>;
296
297 fn neglect_tickets(
298 &self,
299 channel_id: &ChannelId,
300 max_ticket_index: Option<u64>,
301 ) -> Result<Vec<VerifiedTicket>, std::io::Error>;
302
303 fn ticket_stats<'a>(&self, channel_id: Option<&'a ChannelId>) -> Result<ChannelStats, std::io::Error>;
304 }
305 }
306
307 lazy_static::lazy_static! {
308 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
309 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
310 static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
311 static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into(); static ref CHANNEL_1: ChannelEntry = ChannelEntry::builder()
314 .between(&*ALICE, &*BOB)
315 .balance(*PRICE_PER_PACKET * 10)
316 .ticket_index(0)
317 .status(ChannelStatus::Open)
318 .epoch(4)
319 .build()
320 .unwrap();
321
322 static ref CHANNEL_2: ChannelEntry = ChannelEntry::builder()
323 .between(&*CHARLIE, &*BOB)
324 .balance(*PRICE_PER_PACKET * 11)
325 .ticket_index(1)
326 .status(ChannelStatus::Open)
327 .epoch(4)
328 .build()
329 .unwrap();
330
331 static ref CHAIN_CLIENT: BlokliTestClient<StaticState> = BlokliTestStateBuilder::default()
332 .with_generated_accounts(&[ALICE.public().as_ref(), BOB.public().as_ref(), CHARLIE.public().as_ref()], false, XDaiBalance::new_base(1), HoprBalance::new_base(1000))
333 .with_channels([*CHANNEL_1, *CHANNEL_2])
334 .build_static_client();
335 }
336
337 fn generate_random_ack_ticket(index: u64, worth_packets: u32) -> anyhow::Result<RedeemableTicket> {
338 let hk1 = HalfKey::random();
339 let hk2 = HalfKey::random();
340
341 let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
342
343 Ok(TicketBuilder::default()
344 .counterparty(&*BOB)
345 .amount(PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets)
346 .index(index)
347 .win_prob(WinningProbability::ALWAYS)
348 .channel_epoch(4)
349 .challenge(challenge)
350 .build_signed(&ALICE, &Hash::default())?
351 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
352 .into_redeemable(&BOB, &Hash::default())?)
353 }
354
355 type TestConnector = Arc<HoprBlockchainSafeConnector<BlokliTestClient<StaticState>>>;
356
357 async fn await_redemption_queue_empty(redeems: Arc<parking_lot::RwLock<AbortableList<ChannelId>>>) {
358 loop {
359 hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;
360
361 if redeems.read().is_empty() {
362 break;
363 }
364 }
365 }
366
367 #[test_log::test(tokio::test)]
368 async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
369 let ack_ticket = generate_random_ack_ticket(0, 5)?;
370
371 let mut connector = create_trustful_hopr_blokli_connector(
372 &BOB,
373 Default::default(),
374 CHAIN_CLIENT.clone(),
375 [1u8; Address::SIZE].into(),
376 )
377 .await?;
378 connector.connect().await?;
379
380 let cfg = AutoRedeemingStrategyConfig {
381 minimum_redeem_ticket_value: 0.into(),
382 redeem_on_winning: true,
383 ..Default::default()
384 };
385
386 let mut mock_tmgr = MockTicketMgmt::new();
387 mock_tmgr
388 .expect_redeem_stream()
389 .once()
390 .with(
391 mockall::predicate::always(),
392 mockall::predicate::eq(*CHANNEL_1.get_id()),
393 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
394 )
395 .return_once(move |_: TestConnector, _, _| {
396 Ok(futures::stream::once(futures::future::ok(RedemptionResult::Redeemed(ack_ticket.ticket))).boxed())
397 });
398
399 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
400
401 ars.on_acknowledged_winning_ticket(&ack_ticket.ticket).await?;
402 assert!(ars.on_tick().await.is_err());
403
404 await_redemption_queue_empty(ars.running_redemptions.clone())
405 .timeout(futures_time::time::Duration::from_secs(5))
406 .await?;
407
408 Ok(())
409 }
410
411 #[test_log::test(tokio::test)]
412 async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
413 let mut connector = create_trustful_hopr_blokli_connector(
414 &BOB,
415 Default::default(),
416 CHAIN_CLIENT.clone(),
417 [1u8; Address::SIZE].into(),
418 )
419 .await?;
420 connector.connect().await?;
421
422 let cfg = AutoRedeemingStrategyConfig {
423 minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
424 redeem_on_winning: false,
425 ..Default::default()
426 };
427
428 let mut mock_tmgr = MockTicketMgmt::new();
429 mock_tmgr
430 .expect_redeem_stream()
431 .once()
432 .with(
433 mockall::predicate::always(),
434 mockall::predicate::eq(*CHANNEL_1.get_id()),
435 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
436 )
437 .return_once(|_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
438
439 mock_tmgr
440 .expect_redeem_stream()
441 .once()
442 .with(
443 mockall::predicate::always(),
444 mockall::predicate::eq(*CHANNEL_2.get_id()),
445 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
446 )
447 .return_once(|_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
448
449 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
450 ars.on_tick().await?;
451
452 await_redemption_queue_empty(ars.running_redemptions.clone())
453 .timeout(futures_time::time::Duration::from_secs(5))
454 .await?;
455
456 Ok(())
457 }
458
459 #[tokio::test]
460 async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
461 let mut channel = *CHANNEL_1;
462 channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
463
464 let mut connector = create_trustful_hopr_blokli_connector(
465 &BOB,
466 Default::default(),
467 CHAIN_CLIENT.clone(),
468 [1u8; Address::SIZE].into(),
469 )
470 .await?;
471 connector.connect().await?;
472
473 let cfg = AutoRedeemingStrategyConfig {
474 redeem_all_on_close: true,
475 minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
476 ..Default::default()
477 };
478
479 let mut mock_tmgr = MockTicketMgmt::new();
480 mock_tmgr
481 .expect_redeem_stream()
482 .once()
483 .with(
484 mockall::predicate::always(),
485 mockall::predicate::eq(*CHANNEL_1.get_id()),
486 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
487 )
488 .return_once(move |_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
489
490 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
491 ars.on_own_channel_changed(
492 &channel,
493 ChannelDirection::Incoming,
494 ChannelChange::Status {
495 left: ChannelStatus::Open,
496 right: channel.status,
497 },
498 )
499 .await?;
500
501 await_redemption_queue_empty(ars.running_redemptions.clone())
502 .timeout(futures_time::time::Duration::from_secs(5))
503 .await?;
504
505 Ok(())
506 }
507
508 #[tokio::test]
509 async fn test_auto_redeeming_strategy_should_not_redeem_multiple_times_in_same_channel() -> anyhow::Result<()> {
510 let ack_ticket_1 = generate_random_ack_ticket(0, 5)?;
511
512 let mut connector = create_trustful_hopr_blokli_connector(
513 &BOB,
514 Default::default(),
515 CHAIN_CLIENT.clone(),
516 [1u8; Address::SIZE].into(),
517 )
518 .await?;
519 connector.connect().await?;
520
521 let cfg = AutoRedeemingStrategyConfig {
522 minimum_redeem_ticket_value: 0.into(),
523 redeem_on_winning: true,
524 ..Default::default()
525 };
526
527 let mut mock_tmgr = MockTicketMgmt::new();
528 mock_tmgr
529 .expect_redeem_stream()
530 .once()
531 .with(
532 mockall::predicate::always(),
533 mockall::predicate::eq(*CHANNEL_1.get_id()),
534 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
535 )
536 .return_once(move |_: TestConnector, _, _| {
537 Ok(futures::stream::once(
538 futures::future::ok(RedemptionResult::Redeemed(ack_ticket_1.ticket))
539 .delay(futures_time::time::Duration::from_millis(500)),
540 )
541 .boxed())
542 });
543
544 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
545 ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await?;
546 assert!(matches!(
547 ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await,
548 Err(StrategyError::InProgress)
549 ));
550
551 let mut channel = *CHANNEL_1;
552 channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
553
554 assert!(matches!(
555 ars.on_own_channel_changed(
556 &channel,
557 ChannelDirection::Incoming,
558 ChannelChange::Status {
559 left: ChannelStatus::Open,
560 right: channel.status,
561 }
562 )
563 .await,
564 Err(StrategyError::InProgress)
565 ));
566 assert!(ars.on_tick().await.is_err());
567
568 await_redemption_queue_empty(ars.running_redemptions.clone())
569 .timeout(futures_time::time::Duration::from_secs(5))
570 .await?;
571
572 Ok(())
573 }
574}