1use std::{
7 fmt::{Debug, Display, Formatter},
8 str::FromStr,
9};
10
11use async_trait::async_trait;
12use futures::StreamExt;
13use hopr_api::{
14 chain::{ChainReadChannelOperations, ChainWriteTicketOperations, ChannelSelector},
15 db::TicketSelector,
16};
17use hopr_internal_types::{prelude::*, tickets::AcknowledgedTicket};
18use hopr_primitive_types::prelude::*;
19use serde::{Deserialize, Serialize};
20use serde_with::{DisplayFromStr, serde_as};
21use tracing::{debug, error, info};
22use validator::Validate;
23
24use crate::{
25 Strategy,
26 errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
27 strategy::SingularStrategy,
28};
29
// Counter of ticket redemptions issued by this strategy.
// It is only compiled when the `prometheus` feature is enabled and the crate is not built for tests,
// so every use site carries the same `cfg` guard.
// Construction failure panics on first access because of the `unwrap()`.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_AUTO_REDEEMS: hopr_metrics::SimpleCounter =
        hopr_metrics::SimpleCounter::new("hopr_strategy_auto_redeem_redeem_count", "Count of initiated automatic redemptions").unwrap();
}
35
/// Constant provider of `true`.
///
/// Referenced by name from `#[serde(default = "just_true")]`, which needs a function path
/// rather than a literal.
fn just_true() -> bool {
    true
}
39
/// Constant provider of `false`.
///
/// Referenced by name from `#[serde(default = "just_false")]`, which needs a function path
/// rather than a literal.
fn just_false() -> bool {
    false
}
43
44fn min_redeem_hopr() -> HoprBalance {
45 HoprBalance::from_str("1 wxHOPR").unwrap()
46}
47
/// Configuration of the automatic ticket redemption strategy.
///
/// Every field carries a serde default, so any of them may be omitted when deserializing.
/// The `SmartDefault` derive produces the same values for `Default::default()`.
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct AutoRedeemingStrategyConfig {
    /// If set, tickets in an incoming channel are sent for redemption when that channel
    /// transitions from `Open` to `PendingToClose`.
    ///
    /// Default is `true`.
    #[serde(default = "just_true")]
    #[default = true]
    pub redeem_all_on_close: bool,

    /// Lower bound on the ticket amount used in every redemption selector issued by the strategy.
    /// An acknowledged winning ticket below this value does not trigger a redemption.
    ///
    /// (De)serialized through its string representation (`DisplayFromStr`).
    ///
    /// Default is `1 wxHOPR`.
    #[serde(default = "min_redeem_hopr")]
    #[serde_as(as = "DisplayFromStr")]
    #[default(min_redeem_hopr())]
    pub minimum_redeem_ticket_value: HoprBalance,

    /// Selects which event drives redemption:
    /// - `true`: redemption is issued when a winning ticket is acknowledged; the periodic tick
    ///   reports `CriteriaNotSatisfied`.
    /// - `false`: redemption is issued on the periodic tick for all matching incoming channels;
    ///   acknowledged winning tickets report `CriteriaNotSatisfied`.
    ///
    /// Default is `false`.
    #[serde(default = "just_false")]
    #[default = false]
    pub redeem_on_winning: bool,
}
82
/// Strategy that automatically issues redemptions of acknowledged tickets.
///
/// `A` is the handle used to read channels from, and submit ticket redemptions to, the chain
/// layer; the concrete trait bounds are placed on the `SingularStrategy` implementation.
pub struct AutoRedeemingStrategy<A> {
    // Chain access handle used to look up channels and to request ticket redemptions.
    hopr_chain_actions: A,
    // Strategy configuration; it is `Copy`, so it is also used by value for `Debug`/`Display`.
    cfg: AutoRedeemingStrategyConfig,
}
90
/// Concurrency limit passed to `for_each_concurrent` in `on_tick`, i.e. the maximum number of
/// channels whose redemption requests are in flight at the same time.
const REDEEMING_CONCURRENCY: usize = 20;
93
94impl<A> Debug for AutoRedeemingStrategy<A> {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
97 }
98}
99
100impl<A> Display for AutoRedeemingStrategy<A> {
101 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102 write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
103 }
104}
105
106impl<A> AutoRedeemingStrategy<A> {
107 pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A) -> Self {
108 Self {
109 cfg,
110 hopr_chain_actions,
111 }
112 }
113}
114
115#[async_trait]
116impl<A> SingularStrategy for AutoRedeemingStrategy<A>
117where
118 A: ChainWriteTicketOperations + ChainReadChannelOperations + Clone + Send + Sync,
119{
120 async fn on_tick(&self) -> crate::errors::Result<()> {
121 if !self.cfg.redeem_on_winning {
122 debug!("trying to redeem all tickets in all channels");
123
124 let chain_actions = self.hopr_chain_actions.clone();
125 self.hopr_chain_actions
126 .stream_channels(
127 ChannelSelector::default()
128 .with_destination(*self.hopr_chain_actions.me())
129 .with_allowed_states(&[
130 ChannelStatusDiscriminants::Open,
131 ChannelStatusDiscriminants::PendingToClose,
132 ])
133 .with_closure_time_range(Utc::now()..),
134 )
135 .await
136 .map_err(|e| StrategyError::Other(e.into()))?
137 .for_each_concurrent(REDEEMING_CONCURRENCY, |channel| {
138 let chain_actions_clone = chain_actions.clone();
139 async move {
140 let selector = TicketSelector::from(&channel)
141 .with_amount(self.cfg.minimum_redeem_ticket_value..)
142 .with_index_range(channel.ticket_index.as_u64()..)
143 .with_state(AcknowledgedTicketStatus::Untouched);
144
145 match chain_actions_clone
146 .redeem_tickets_via_selector(selector)
147 .await
148 .map_err(|e| StrategyError::Other(e.into()))
149 {
150 Ok(awaiter) => {
151 let count = awaiter.len();
153 if count > 0 {
154 #[cfg(all(feature = "prometheus", not(test)))]
155 METRIC_COUNT_AUTO_REDEEMS.increment_by(count as u64);
156
157 info!(count, %channel, "strategy issued ticket redemptions");
158 } else {
159 debug!(count, %channel, "strategy issued no ticket redemptions");
160 }
161 }
162 Err(error) => error!(%error, %channel, "strategy failed to issue ticket redemptions"),
163 }
164 }
165 })
166 .await;
167
168 Ok(())
169 } else {
170 Err(CriteriaNotSatisfied)
171 }
172 }
173
174 async fn on_acknowledged_winning_ticket(&self, ack: &AcknowledgedTicket) -> crate::errors::Result<()> {
175 if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
176 if let Some(channel) = self
177 .hopr_chain_actions
178 .channel_by_id(&ack.verified_ticket().channel_id)
179 .await
180 .map_err(|e| StrategyError::Other(e.into()))?
181 {
182 info!(%ack, "redeeming");
183
184 if ack.ticket.verified_ticket().index < channel.ticket_index.as_u64() {
185 error!(%ack, "acknowledged ticket is older than channel ticket index");
186 return Err(CriteriaNotSatisfied);
187 }
188
189 #[cfg(all(feature = "prometheus", not(test)))]
190 METRIC_COUNT_AUTO_REDEEMS.increment();
191
192 let rxs = self
193 .hopr_chain_actions
194 .redeem_tickets_via_selector(
195 TicketSelector::from(channel)
196 .with_amount(self.cfg.minimum_redeem_ticket_value..)
197 .with_index_range(channel.ticket_index.as_u64()..=ack.ticket.verified_ticket().index)
198 .with_state(AcknowledgedTicketStatus::Untouched),
199 )
200 .await
201 .map_err(|e| StrategyError::Other(e.into()))?;
202
203 std::mem::drop(rxs); Ok(())
206 } else {
207 Err(CriteriaNotSatisfied)
208 }
209 } else {
210 Err(CriteriaNotSatisfied)
211 }
212 }
213
214 async fn on_own_channel_changed(
215 &self,
216 channel: &ChannelEntry,
217 direction: ChannelDirection,
218 change: ChannelChange,
219 ) -> crate::errors::Result<()> {
220 if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
221 return Ok(());
222 }
223
224 if let ChannelChange::Status { left: old, right: new } = change {
225 if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
226 debug!(?channel, "ignoring channel state change that's not in PendingToClose");
227 return Ok(());
228 }
229 info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
230
231 let count = self
232 .hopr_chain_actions
233 .redeem_tickets_via_selector(
234 TicketSelector::from(channel)
235 .with_amount(self.cfg.minimum_redeem_ticket_value..)
236 .with_index_range(channel.ticket_index.as_u64()..)
237 .with_state(AcknowledgedTicketStatus::Untouched),
238 )
239 .await
240 .map_err(|e| StrategyError::Other(e.into()))?
241 .len();
242
243 #[cfg(all(feature = "prometheus", not(test)))]
244 METRIC_COUNT_AUTO_REDEEMS.increment_by(count as u64);
245
246 if count > 0 {
247 info!(count, %channel, "tickets in channel being closed sent for redemption");
248 Ok(())
249 } else {
250 info!(%channel, "no redeemable tickets with minimum amount in channel being closed");
251 Err(CriteriaNotSatisfied)
252 }
253 } else {
254 Err(CriteriaNotSatisfied)
255 }
256 }
257}
258
// Unit tests of the auto-redeeming strategy hooks, run against a mocked chain-actions handle.
#[cfg(test)]
mod tests {
    use std::{
        ops::Add,
        time::{Duration, SystemTime},
    };

    use hex_literal::hex;
    use hopr_api::chain::ChainReceipt;
    use hopr_crypto_random::Randomizable;
    use hopr_crypto_types::prelude::*;

    use super::*;
    use crate::tests::{MockChainActions, MockTestActions};

    // Shared fixtures: three fixed chain keypairs, the ticket price unit and two open channels
    // originating from BOB (CHANNEL_1 towards ALICE, CHANNEL_2 towards CHARLIE).
    // NOTE(review): the `ChannelEntry::new` argument order is presumably
    // (source, destination, balance, ticket index, status, epoch) - confirm against its definition.
    lazy_static::lazy_static! {
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
        static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into();
        static ref CHANNEL_1: ChannelEntry = ChannelEntry::new(
            BOB.public().to_address(),
            ALICE.public().to_address(),
            *PRICE_PER_PACKET * 10,
            0.into(),
            ChannelStatus::Open,
            4.into()
        );
        static ref CHANNEL_2: ChannelEntry = ChannelEntry::new(
            BOB.public().to_address(),
            CHARLIE.public().to_address(),
            *PRICE_PER_PACKET * 11,
            1.into(),
            ChannelStatus::Open,
            4.into()
        );
    }

    // Builds an acknowledged ticket from BOB to ALICE, signed by BOB, with the given index and
    // index offset and an amount of `worth_packets` times `PRICE_PER_PACKET`.
    // The challenge is derived from two freshly generated random half keys, and the same two
    // half keys form the response used to acknowledge the ticket.
    fn generate_random_ack_ticket(
        index: u64,
        idx_offset: u32,
        worth_packets: u32,
    ) -> anyhow::Result<AcknowledgedTicket> {
        let hk1 = HalfKey::random();
        let hk2 = HalfKey::random();

        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;

        Ok(TicketBuilder::default()
            .addresses(&*BOB, &*ALICE)
            .amount(PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets)
            .index(index)
            .index_offset(idx_offset)
            .win_prob(WinningProbability::ALWAYS)
            .channel_epoch(4)
            .challenge(challenge)
            .build_signed(&BOB, &Hash::default())?
            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
    }

    // With `redeem_on_winning` enabled and no minimum amount, an acknowledged winning ticket must
    // trigger exactly one redemption request for its channel, while `on_tick` must fail.
    #[tokio::test]
    async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
        let ack_ticket = generate_random_ack_ticket(0, 1, 5)?;

        let mut mock = MockTestActions::new();
        mock.expect_channel_by_id()
            .once()
            .with(mockall::predicate::eq(CHANNEL_1.get_id()))
            .return_once(|_| Some(CHANNEL_1.clone()));

        // The ticket index equals the channel's ticket index (both 0), so the expected index
        // range collapses to that single index.
        mock.expect_redeem_with_selector()
            .once()
            .with(mockall::predicate::eq(
                TicketSelector::from(CHANNEL_1.clone())
                    .with_amount(HoprBalance::zero()..)
                    .with_index_range(
                        ack_ticket.ticket.verified_ticket().index..=ack_ticket.ticket.verified_ticket().index,
                    )
                    .with_state(AcknowledgedTicketStatus::Untouched),
            ))
            .return_once(|_| vec![ChainReceipt::default()]);

        let cfg = AutoRedeemingStrategyConfig {
            minimum_redeem_ticket_value: 0.into(),
            redeem_on_winning: true,
            ..Default::default()
        };

        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
        ars.on_acknowledged_winning_ticket(&ack_ticket).await?;
        assert!(ars.on_tick().await.is_err());

        Ok(())
    }

    // With `redeem_on_winning` disabled, `on_tick` must request the channel stream once and issue
    // one redemption request per streamed channel, while an acknowledged winning ticket must fail.
    #[tokio::test]
    async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
        let ack_ticket = generate_random_ack_ticket(0, 1, 5)?;

        let mut mock = MockTestActions::new();
        mock.expect_me().return_const(BOB.public().to_address());

        // The channel selector must have no source, target the address returned by `me()` and
        // allow only the Open and PendingToClose states.
        mock.expect_stream_channels()
            .once()
            .withf(move |selector| {
                selector.source.is_none()
                    && selector.destination == Some(BOB.public().to_address())
                    && selector.allowed_states
                        == &[
                            ChannelStatusDiscriminants::Open,
                            ChannelStatusDiscriminants::PendingToClose,
                        ]
            })
            .return_once(|_| futures::stream::iter([CHANNEL_1.clone(), CHANNEL_2.clone()]).boxed());

        mock.expect_redeem_with_selector()
            .once()
            .with(mockall::predicate::eq(
                TicketSelector::from(CHANNEL_1.clone())
                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..)
                    .with_state(AcknowledgedTicketStatus::Untouched),
            ))
            .returning(|_| vec![ChainReceipt::default()]);

        mock.expect_redeem_with_selector()
            .once()
            .with(mockall::predicate::eq(
                TicketSelector::from(CHANNEL_2.clone())
                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
                    .with_index_range(CHANNEL_2.ticket_index.as_u64()..)
                    .with_state(AcknowledgedTicketStatus::Untouched),
            ))
            .returning(|_| vec![ChainReceipt::default()]);

        let cfg = AutoRedeemingStrategyConfig {
            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
            redeem_on_winning: false,
            ..Default::default()
        };

        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
        ars.on_tick().await?;
        assert!(ars.on_acknowledged_winning_ticket(&ack_ticket).await.is_err());

        Ok(())
    }

    // A ticket worth 4 packets is below the configured minimum of 5 packets and must be rejected
    // without touching the mock; a ticket worth exactly 5 packets must be redeemed.
    #[tokio::test]
    async fn test_auto_redeeming_strategy_redeem_minimum_ticket_amount() -> anyhow::Result<()> {
        let ack_ticket_below = generate_random_ack_ticket(1, 1, 4)?;
        let ack_ticket_at = generate_random_ack_ticket(1, 1, 5)?;

        // Both expectations are `once()`: only the ticket at the threshold may reach the chain handle.
        let mut mock = MockTestActions::new();
        mock.expect_channel_by_id()
            .once()
            .with(mockall::predicate::eq(CHANNEL_1.get_id()))
            .return_once(|_| Some(CHANNEL_1.clone()));

        mock.expect_redeem_with_selector()
            .once()
            .with(mockall::predicate::eq(
                TicketSelector::from(CHANNEL_1.clone())
                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..=ack_ticket_at.ticket.verified_ticket().index)
                    .with_state(AcknowledgedTicketStatus::Untouched),
            ))
            .return_once(|_| vec![ChainReceipt::default()]);

        let cfg = AutoRedeemingStrategyConfig {
            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
            redeem_on_winning: true,
            ..Default::default()
        };

        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
        ars.on_acknowledged_winning_ticket(&ack_ticket_below)
            .await
            .expect_err("ticket below threshold should not satisfy");
        ars.on_acknowledged_winning_ticket(&ack_ticket_at).await?;

        Ok(())
    }

    // An incoming channel changing from Open to PendingToClose must trigger one redemption
    // request when `redeem_all_on_close` is enabled.
    #[tokio::test]
    async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
        let channel = ChannelEntry::new(
            BOB.public().to_address(),
            ALICE.public().to_address(),
            10.into(),
            0.into(),
            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
            4.into(),
        );

        // NOTE(review): the expected selector is built from CHANNEL_1 rather than from `channel`;
        // this presumably matches because both share the same parties and the same 4th/6th
        // constructor arguments - confirm which fields `TicketSelector::from` uses.
        let mut mock = MockTestActions::new();
        mock.expect_redeem_with_selector()
            .once()
            .with(mockall::predicate::eq(
                TicketSelector::from(CHANNEL_1.clone())
                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..)
                    .with_state(AcknowledgedTicketStatus::Untouched),
            ))
            .return_once(|_| vec![ChainReceipt::default()]);

        let cfg = AutoRedeemingStrategyConfig {
            redeem_all_on_close: true,
            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
            ..Default::default()
        };

        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
        ars.on_own_channel_changed(
            &channel,
            ChannelDirection::Incoming,
            ChannelChange::Status {
                left: ChannelStatus::Open,
                right: channel.status,
            },
        )
        .await?;

        Ok(())
    }

    // Same flow as the single-ticket redemption test, but the ticket is generated with an index
    // offset of 2; the expected index range still runs from the channel's ticket index to the
    // ticket's index, and `on_tick` must fail because `redeem_on_winning` is enabled.
    #[tokio::test]
    async fn test_auto_redeeming_strategy_redeem_multiple_tickets_in_channel() -> anyhow::Result<()> {
        let ack_ticket_1 = generate_random_ack_ticket(0, 2, 5)?;

        let mut mock = MockTestActions::new();
        mock.expect_channel_by_id()
            .once()
            .with(mockall::predicate::eq(CHANNEL_1.get_id()))
            .return_once(|_| Some(CHANNEL_1.clone()));

        mock.expect_redeem_with_selector()
            .once()
            .with(mockall::predicate::eq(
                TicketSelector::from(CHANNEL_1.clone())
                    .with_amount(HoprBalance::zero()..)
                    .with_index_range(CHANNEL_1.ticket_index.as_u64()..=ack_ticket_1.ticket.verified_ticket().index)
                    .with_state(AcknowledgedTicketStatus::Untouched),
            ))
            .return_once(|_| vec![ChainReceipt::default()]);

        let cfg = AutoRedeemingStrategyConfig {
            minimum_redeem_ticket_value: 0.into(),
            redeem_on_winning: true,
            ..Default::default()
        };

        let ars = AutoRedeemingStrategy::new(cfg, MockChainActions(mock.into()));
        ars.on_acknowledged_winning_ticket(&ack_ticket_1).await?;
        assert!(ars.on_tick().await.is_err());

        Ok(())
    }
}