1use std::{convert::identity, sync::atomic::AtomicBool};
2
3use futures::{Stream, TryFutureExt};
4use hopr_api::{
5 chain::{ChainWriteTicketOperations, TicketRedeemError},
6 tickets::{ChannelStats, RedemptionResult},
7 types::{internal::prelude::*, primitive::prelude::*},
8};
9
10use crate::{
11 backend::ValueCachedQueue,
12 errors::TicketManagerError,
13 factory::HoprTicketFactory,
14 traits::{OutgoingIndexStore, TicketQueue, TicketQueueStore},
15 utils::{CachedQueueMap, UnrealizedValue},
16};
17
18#[derive(Debug)]
81pub struct HoprTicketManager<S, Q> {
82 channel_tickets: std::sync::Arc<CachedQueueMap<Q>>,
83 store: std::sync::Arc<parking_lot::RwLock<S>>,
84}
85
86impl<S> HoprTicketManager<S, S::Queue>
87where
88 S: OutgoingIndexStore + TicketQueueStore + 'static,
89 S::Queue: Send + Sync + 'static,
90{
91 pub fn new_with_factory(store: S) -> (Self, HoprTicketFactory<S>) {
95 let store = std::sync::Arc::new(parking_lot::RwLock::new(store));
96 let channel_tickets = std::sync::Arc::new(CachedQueueMap::<S::Queue>::default());
97 let factory = HoprTicketFactory::new_shared(store.clone(), std::sync::Arc::downgrade(&channel_tickets));
98
99 (HoprTicketManager { store, channel_tickets }, factory)
100 }
101}
102
103struct RedeemState<C, Q> {
104 lock: std::sync::Arc<AtomicBool>,
105 queue: std::sync::Arc<parking_lot::RwLock<Q>>,
106 chain: C,
107 min_redeem_value: HoprBalance,
108 channel_id: ChannelId,
109}
110
111impl<C, Q> Drop for RedeemState<C, Q> {
112 fn drop(&mut self) {
113 self.lock.store(false, std::sync::atomic::Ordering::Release);
114 }
115}
116
117impl<S> HoprTicketManager<S, S::Queue>
118where
119 S: TicketQueueStore + Send + Sync + 'static,
120 S::Queue: Send + Sync + 'static,
121{
122 pub fn sync_from_incoming_channels(
140 &self,
141 incoming_channels: &[ChannelEntry],
142 ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
143 let incoming_channels: std::collections::HashSet<_, std::hash::RandomState> =
144 incoming_channels.iter().collect();
145
146 let mut store_read = self.store.upgradable_read();
148 let stored_queues = store_read
149 .iter_queues()
150 .map_err(TicketManagerError::store)?
151 .collect::<Vec<_>>();
152 let mut neglected = Vec::new();
153 let now = hopr_platform::time::current_time();
154 for channel_id in stored_queues {
155 if !incoming_channels
159 .iter()
160 .any(|channel| !channel.closure_time_passed(now) && channel.get_id() == &channel_id)
161 {
162 let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
163 neglected.extend(
164 store_write
165 .delete_queue(&channel_id)
166 .map_err(TicketManagerError::store)?,
167 );
168 tracing::debug!(%channel_id, "purged outdated incoming tickets queue");
169 store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
170
171 self.channel_tickets.0.remove(&channel_id);
173 }
174 }
175 for channel in incoming_channels
177 .iter()
178 .filter(|channel| !channel.closure_time_passed(now))
179 {
180 let id = channel.get_id();
181
182 let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
184 let mut queue = store_write
185 .open_or_create_queue(id)
186 .map_err(TicketManagerError::store)?;
187
188 while queue
190 .peek()
191 .map_err(TicketManagerError::store)?
192 .filter(|ticket| {
193 ticket.verified_ticket().channel_epoch < channel.channel_epoch
194 || ticket.verified_ticket().index < channel.ticket_index
195 })
196 .is_some()
197 {
198 neglected.extend(queue.pop().map_err(TicketManagerError::store)?.map(|t| t.ticket));
199 }
200
201 store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
202
203 let queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
205
206 tracing::debug!(%id, num_tickets = queue.len().map_err(TicketManagerError::store)?, "loaded redeemable ticket queue for channel");
207 self.channel_tickets.0.insert(*id, queue.into());
208 }
209
210 tracing::debug!(
211 num_channels = incoming_channels.len(),
212 num_neglected = neglected.len(),
213 "synchronized with incoming channels"
214 );
215 Ok(neglected)
216 }
217
218 pub fn unrealized_value(
230 &self,
231 channel_id: &ChannelId,
232 min_index: Option<u64>,
233 ) -> Result<Option<HoprBalance>, TicketManagerError> {
234 self.channel_tickets.unrealized_value(channel_id, min_index)
235 }
236}
237impl<S> hopr_api::tickets::TicketManagement for HoprTicketManager<S, S::Queue>
238where
239 S: TicketQueueStore + Send + Sync + 'static,
240 S::Queue: Send + Sync + 'static,
241{
242 type Error = TicketManagerError;
243
244 fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
256 &self,
257 chain: C,
258 channel_id: ChannelId,
259 min_amount: Option<HoprBalance>,
260 ) -> Result<impl Stream<Item = Result<RedemptionResult, Self::Error>> + Send, Self::Error> {
261 let initial_state = match self.channel_tickets.0.get(&channel_id) {
262 Some(ticket_queue) => {
263 ticket_queue
264 .redeem_lock
265 .compare_exchange(
266 false,
267 true,
268 std::sync::atomic::Ordering::Acquire,
269 std::sync::atomic::Ordering::Relaxed,
270 )
271 .map_err(|_| TicketManagerError::AlreadyRedeeming)?;
272
273 RedeemState {
274 lock: ticket_queue.redeem_lock.clone(),
275 queue: ticket_queue.queue.clone(),
276 min_redeem_value: min_amount.unwrap_or_default(), chain,
278 channel_id,
279 }
280 }
281 None => return Err(TicketManagerError::ChannelQueueNotFound),
282 };
283
284 Ok(futures::stream::try_unfold(initial_state, |state| {
285 let next_ticket = state.queue.read().0.peek();
287 async move {
288 match next_ticket.map_err(TicketManagerError::store)? {
289 Some(ticket_to_redeem) => {
290 let redeem_attempt_result =
292 if ticket_to_redeem.verified_ticket().amount >= state.min_redeem_value {
293 match state.chain.redeem_ticket(ticket_to_redeem).and_then(identity).await {
294 Ok((redeemed_ticket, _)) => Ok(Some(RedemptionResult::Redeemed(redeemed_ticket))),
295 Err(TicketRedeemError::Rejected(ticket, reason)) => {
296 Ok(Some(RedemptionResult::RejectedOnChain(ticket, reason)))
297 }
298 Err(TicketRedeemError::ProcessingError(_, err)) => {
299 Err(TicketManagerError::redeem(err))
300 }
301 }
302 } else {
303 Ok(Some(RedemptionResult::ValueTooLow(ticket_to_redeem.ticket)))
305 };
306
307 if let Ok(Some(redeem_complete_result)) = &redeem_attempt_result {
310 let mut queue_write = state.queue.write();
314
315 let pop_res = queue_write.0
318 .peek()
319 .map_err(TicketManagerError::store)?
320 .filter(|ticket_to_pop| ticket_to_pop == &ticket_to_redeem)
321 .and_then(|_| queue_write.0.pop().map_err(TicketManagerError::store).transpose())
322 .transpose()?;
323
324 match redeem_complete_result {
326 RedemptionResult::Redeemed(ticket) => {
327 queue_write.1.redeemed_value += ticket.verified_ticket().amount;
328 tracing::info!(%ticket, "ticket has been redeemed");
329 },
330 RedemptionResult::ValueTooLow(ticket) => {
331 queue_write.1.neglected_value += ticket.verified_ticket().amount;
332 tracing::warn!(%ticket, "ticket has been neglected");
333 },
334 RedemptionResult::RejectedOnChain(ticket, reason) => {
335 queue_write.1.rejected_value += ticket.verified_ticket().amount;
336 tracing::warn!(%ticket, reason, "ticket has been rejected on-chain");
337 },
338 }
339
340 if pop_res.is_none() {
346 let ticket = redeem_complete_result.as_ref();
347 tracing::warn!(%ticket, "ticket has been neglected from the queue while it actually completed the redemption process");
348 queue_write.1.neglected_value -= ticket.verified_ticket().amount;
349 }
350 }
351
352 redeem_attempt_result
353 }
354 None => {
355 tracing::debug!(channel_id = %state.channel_id, "no more tickets to redeem in channel");
358 Ok(None)
359 }
360 }
361 .map(|s| s.map(|v| (v, state)))
362 }
363 }))
364 }
365
366 fn neglect_tickets(
374 &self,
375 channel_id: &ChannelId,
376 up_to_index: Option<u64>,
377 ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
378 let queue = self
379 .channel_tickets
380 .0
381 .get(channel_id)
382 .map(|q| {
383 if q.redeem_lock.load(std::sync::atomic::Ordering::Relaxed) {
384 tracing::warn!(%channel_id, "neglecting tickets in channel while redeeming is ongoing");
385 }
386 q.queue.clone()
387 })
388 .ok_or(TicketManagerError::ChannelQueueNotFound)?;
389
390 let mut neglected_tickets = Vec::new();
391 let mut queue_read = queue.upgradable_read();
392 let max_index = up_to_index.unwrap_or(TicketBuilder::MAX_TICKET_INDEX);
393
394 while queue_read
395 .0
396 .peek()
397 .map_err(TicketManagerError::store)?
398 .filter(|ticket| ticket.verified_ticket().index <= max_index)
399 .is_some()
400 {
401 let mut queue_write = parking_lot::RwLockUpgradableReadGuard::upgrade(queue_read);
403 let maybe_ticket = queue_write.0.pop().map_err(TicketManagerError::store)?;
404 queue_write.1.neglected_value += maybe_ticket.map(|t| t.verified_ticket().amount).unwrap_or_default();
405 queue_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(queue_write);
406
407 neglected_tickets.extend(maybe_ticket.map(|t| t.ticket));
408 tracing::debug!(%channel_id, ?maybe_ticket, "neglected ticket in channel");
409 }
410
411 tracing::debug!(%channel_id, num_tickets = neglected_tickets.len(), "ticket neglection done in channel");
413 Ok(neglected_tickets)
414 }
415
416 fn ticket_stats(&self, channel: Option<&ChannelId>) -> Result<ChannelStats, TicketManagerError> {
422 self.channel_tickets
423 .0
424 .iter()
425 .filter(|e| channel.is_none_or(|c| c == e.key()))
426 .try_fold(ChannelStats::default(), |stats, v| {
427 let queue = v.queue.read();
428 Ok::<_, TicketManagerError>(ChannelStats {
429 winning_tickets: queue.1.winning_tickets + stats.winning_tickets,
430 unredeemed_value: queue
431 .0
432 .peek()
433 .map_err(TicketManagerError::store)?
434 .map(|t| queue.0.total_value(t.verified_ticket().channel_epoch, None))
435 .transpose()
436 .map_err(TicketManagerError::store)?
437 .unwrap_or_default()
438 + stats.unredeemed_value,
439 rejected_value: queue.1.rejected_value + stats.rejected_value,
440 neglected_value: queue.1.neglected_value + stats.neglected_value,
441 })
442 })
443 }
444
445 fn insert_incoming_ticket(&self, ticket: RedeemableTicket) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
446 let mut neglected_tickets = Vec::with_capacity(0);
448
449 let ticket_id = ticket.ticket_id();
450 match self.channel_tickets.0.entry(ticket_id.id) {
451 dashmap::Entry::Occupied(e) => {
452 let mut queue = e.get().queue.write();
457
458 if let Some(last_ticket) = queue.0.peek().map_err(TicketManagerError::store)? {
463 if last_ticket.verified_ticket().channel_epoch < ticket.verified_ticket().channel_epoch {
464 let mut neg = queue.0.drain().map_err(TicketManagerError::store)?;
466 queue.1.neglected_value += neg.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>();
467
468 neglected_tickets.append(&mut neg);
470 tracing::warn!(%ticket_id, num_neglected = neglected_tickets.len(), "winning ticket has neglected unredeemed tickets from previous epochs");
471 } else if last_ticket.verified_ticket().channel_epoch > ticket.verified_ticket().channel_epoch {
472 tracing::warn!(%ticket_id, "tried to insert incoming ticket from an older epoch");
473
474 queue.1.winning_tickets += 1; queue.1.neglected_value += ticket.verified_ticket().amount;
476 neglected_tickets.push(ticket.ticket);
477 return Ok(neglected_tickets);
478 }
479 }
480 queue.0.push(ticket).map_err(TicketManagerError::store)?;
481 queue.1.winning_tickets += 1;
482
483 tracing::debug!(%ticket_id, "winning ticket on channel");
484 }
485 dashmap::Entry::Vacant(v) => {
486 let mut store = self.store.write();
490
491 let queue = store
492 .open_or_create_queue(&ticket.ticket_id().id)
493 .map_err(TicketManagerError::store)?;
494
495 let mut queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
497
498 if !queue.is_empty().map_err(TicketManagerError::store)? {
500 return Err(TicketManagerError::Other(anyhow::anyhow!(
501 "fatal error: queue not empty"
502 )));
503 }
504
505 queue.push(ticket).map_err(TicketManagerError::store)?;
506 v.insert(queue.into()); tracing::debug!(%ticket_id, "first winning ticket on channel");
508 }
509 }
510
511 Ok(neglected_tickets)
512 }
513}
514
515#[cfg(test)]
516mod tests {
517 use std::ops::Sub;
518
519 use futures::{TryStreamExt, pin_mut};
520 use hopr_api::{
521 OffchainKeypair,
522 tickets::TicketManagement,
523 types::crypto::prelude::{ChainKeypair, Keypair},
524 };
525 use hopr_chain_connector::{
526 BlockchainConnectorConfig, HoprBlockchainConnector, InMemoryBackend, PayloadGenerator, SafePayloadGenerator,
527 reexports::chain::contract_addresses_for_network,
528 testing::{BlokliTestClient, BlokliTestStateBuilder, FullStateEmulator},
529 };
530 use rand::prelude::SliceRandom;
531
532 use super::*;
533 use crate::{
534 MemoryStore, MemoryTicketQueue,
535 traits::tests::{generate_owned_tickets, generate_tickets},
536 };
537
538 fn create_mgr() -> anyhow::Result<HoprTicketManager<MemoryStore, MemoryTicketQueue>> {
539 Ok(HoprTicketManager::new_with_factory(MemoryStore::default()).0)
540 }
541
542 #[test]
543 fn ticket_manager_non_existing_channel_should_return_empty_stats() -> anyhow::Result<()> {
544 let mgr = create_mgr()?;
545
546 assert_eq!(ChannelStats::default(), mgr.ticket_stats(None)?);
547 assert_eq!(ChannelStats::default(), mgr.ticket_stats(Some(&ChannelId::default()))?);
548 Ok(())
549 }
550
551 #[test]
552 fn ticket_manager_should_update_state_when_winning_tickets_are_inserted() -> anyhow::Result<()> {
553 let mgr = create_mgr()?;
554
555 let src = ChainKeypair::random();
556 let dst = ChainKeypair::random();
557
558 let channel = ChannelEntry::builder()
559 .between(&src, &dst)
560 .amount(10)
561 .ticket_index(1)
562 .status(ChannelStatus::Open)
563 .epoch(1)
564 .build()?;
565
566 let tickets = generate_owned_tickets(&src, &dst, 2, 1..=1)?;
567
568 mgr.insert_incoming_ticket(tickets[0])?;
569
570 assert_eq!(
571 ChannelStats {
572 winning_tickets: 1,
573 unredeemed_value: tickets[0].verified_ticket().amount,
574 rejected_value: HoprBalance::zero(),
575 neglected_value: HoprBalance::zero(),
576 },
577 mgr.ticket_stats(Some(&channel.get_id()))?
578 );
579
580 mgr.insert_incoming_ticket(tickets[1])?;
581
582 assert_eq!(
583 ChannelStats {
584 winning_tickets: 2,
585 unredeemed_value: tickets[0].verified_ticket().amount + tickets[1].verified_ticket().amount,
586 rejected_value: HoprBalance::zero(),
587 neglected_value: HoprBalance::zero(),
588 },
589 mgr.ticket_stats(Some(&channel.get_id()))?
590 );
591
592 Ok(())
593 }
594
595 #[test]
596 fn ticket_manager_should_sync_incoming_channels_from_chain_state() -> anyhow::Result<()> {
597 let mgr = create_mgr()?;
598
599 let src = ChainKeypair::random();
600 let dst = ChainKeypair::random();
601
602 let channel = ChannelEntry::builder()
603 .between(&src, &dst)
604 .amount(10)
605 .ticket_index(1)
606 .status(ChannelStatus::Open)
607 .epoch(1)
608 .build()?;
609
610 let neglected = mgr.sync_from_incoming_channels(&[channel])?;
611 assert!(neglected.is_empty());
612
613 let queues = mgr.store.read().iter_queues()?.collect::<Vec<_>>();
614 assert_eq!(vec![*channel.get_id()], queues);
615
616 Ok(())
617 }
618
619 #[test]
620 fn ticket_manager_should_neglect_tickets_from_closed_channels_on_sync() -> anyhow::Result<()> {
621 let mgr = create_mgr()?;
622
623 let tickets = generate_tickets()?;
624 let neglected = mgr.insert_incoming_ticket(tickets[0])?;
625 assert!(neglected.is_empty());
626
627 let channel = ChannelEntry::builder()
628 .between(
629 *tickets[0].ticket.verified_issuer(),
630 tickets[0].verified_ticket().counterparty,
631 )
632 .amount(10)
633 .ticket_index(tickets[0].verified_ticket().index)
634 .status(ChannelStatus::Closed)
635 .epoch(tickets[0].verified_ticket().channel_epoch)
636 .build()?;
637
638 let neglected = mgr.sync_from_incoming_channels(&[channel])?;
639 assert_eq!(1, neglected.len());
640 assert_eq!(tickets[0].ticket, neglected[0]);
641
642 Ok(())
643 }
644
645 #[test]
646 fn ticket_manager_should_neglect_tickets_from_older_epoch_channels_on_sync() -> anyhow::Result<()> {
647 let mgr = create_mgr()?;
648
649 let tickets = generate_tickets()?;
650 let neglected = mgr.insert_incoming_ticket(tickets[0])?;
651 assert!(neglected.is_empty());
652
653 let channel = ChannelEntry::builder()
654 .between(
655 *tickets[0].ticket.verified_issuer(),
656 tickets[0].verified_ticket().counterparty,
657 )
658 .amount(10)
659 .ticket_index(1)
660 .status(ChannelStatus::Open)
661 .epoch(tickets[0].verified_ticket().channel_epoch + 1)
662 .build()?;
663
664 let neglected = mgr.sync_from_incoming_channels(&[channel])?;
665 assert_eq!(1, neglected.len());
666 assert_eq!(tickets[0].ticket, neglected[0]);
667
668 Ok(())
669 }
670
671 #[test]
672 fn ticket_manager_should_neglect_tickets_with_older_index_channels_on_sync() -> anyhow::Result<()> {
673 let mgr = create_mgr()?;
674
675 let tickets = generate_tickets()?;
676 let neglected = mgr.insert_incoming_ticket(tickets[0])?;
677 assert!(neglected.is_empty());
678
679 let channel = ChannelEntry::builder()
680 .between(
681 *tickets[0].ticket.verified_issuer(),
682 tickets[0].verified_ticket().counterparty,
683 )
684 .amount(10)
685 .ticket_index(tickets[0].verified_ticket().index + 1)
686 .status(ChannelStatus::Open)
687 .epoch(tickets[0].verified_ticket().channel_epoch)
688 .build()?;
689
690 let neglected = mgr.sync_from_incoming_channels(&[channel])?;
691 assert_eq!(1, neglected.len());
692 assert_eq!(tickets[0].ticket, neglected[0]);
693
694 Ok(())
695 }
696
697 #[test]
698 fn ticket_manager_should_neglect_tickets_from_effectively_closed_channels_on_sync() -> anyhow::Result<()> {
699 let mgr = create_mgr()?;
700
701 let tickets = generate_tickets()?;
702 let neglected = mgr.insert_incoming_ticket(tickets[0])?;
703 assert!(neglected.is_empty());
704
705 let channel = ChannelEntry::builder()
706 .between(
707 *tickets[0].ticket.verified_issuer(),
708 tickets[0].verified_ticket().counterparty,
709 )
710 .amount(10)
711 .ticket_index(1)
712 .status(ChannelStatus::PendingToClose(
713 std::time::SystemTime::now().sub(std::time::Duration::from_mins(10)),
714 ))
715 .epoch(1)
716 .build()?;
717
718 let neglected = mgr.sync_from_incoming_channels(&[channel])?;
719 assert_eq!(1, neglected.len());
720 assert_eq!(tickets[0].ticket, neglected[0]);
721
722 Ok(())
723 }
724
725 #[test]
726 fn ticket_manager_should_neglect_tickets_from_non_existent_channels_on_sync() -> anyhow::Result<()> {
727 let mgr = create_mgr()?;
728
729 let tickets = generate_tickets()?;
730
731 let neglected = mgr.insert_incoming_ticket(tickets[0])?;
732 assert!(neglected.is_empty());
733
734 let neglected = mgr.sync_from_incoming_channels(&[])?;
735 assert_eq!(1, neglected.len());
736 assert_eq!(tickets[0].ticket, neglected[0]);
737
738 Ok(())
739 }
740
741 #[test]
742 fn ticket_manager_should_neglect_tickets_on_demand() -> anyhow::Result<()> {
743 let mgr = create_mgr()?;
744
745 let tickets = generate_tickets()?;
746 let epoch = tickets[0].ticket_id().epoch;
747
748 let tickets = tickets
749 .into_iter()
750 .filter(|t| t.verified_ticket().channel_epoch == epoch)
751 .collect::<Vec<_>>();
752
753 let channel = ChannelEntry::builder()
754 .between(
755 *tickets[0].ticket.verified_issuer(),
756 tickets[0].verified_ticket().counterparty,
757 )
758 .amount(10)
759 .ticket_index(0)
760 .status(ChannelStatus::Open)
761 .epoch(1)
762 .build()?;
763
764 for ticket in tickets.iter() {
765 let neglected = mgr.insert_incoming_ticket(*ticket)?;
766 assert!(neglected.is_empty());
767 }
768
769 let neglected = mgr.sync_from_incoming_channels(&[channel])?;
770 assert!(neglected.is_empty());
771
772 let unrealized_value = mgr
773 .unrealized_value(channel.get_id(), None)?
774 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
775 assert_eq!(
776 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
777 unrealized_value
778 );
779
780 let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
781 assert_eq!(tickets.iter().map(|t| t.ticket).collect::<Vec<_>>(), neglected);
782
783 let unrealized_value_after = mgr
784 .unrealized_value(channel.get_id(), None)?
785 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
786 assert_eq!(
787 unrealized_value_after,
788 unrealized_value
789 - neglected
790 .iter()
791 .map(|t| t.verified_ticket().amount)
792 .sum::<HoprBalance>()
793 );
794
795 assert_eq!(
796 ChannelStats {
797 winning_tickets: tickets.len() as u128,
798 unredeemed_value: unrealized_value_after,
799 rejected_value: HoprBalance::zero(),
800 neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
801 },
802 mgr.ticket_stats(Some(&channel.get_id()))?
803 );
804
805 Ok(())
806 }
807
808 #[test]
809 fn ticket_manager_should_neglect_tickets_on_demand_with_upper_limit_on_index() -> anyhow::Result<()> {
810 let mgr = create_mgr()?;
811
812 let tickets = generate_tickets()?;
813 let epoch = tickets[0].ticket_id().epoch;
814
815 let tickets = tickets
816 .into_iter()
817 .filter(|t| t.verified_ticket().channel_epoch == epoch)
818 .collect::<Vec<_>>();
819
820 let channel = ChannelEntry::builder()
821 .between(
822 *tickets[0].ticket.verified_issuer(),
823 tickets[0].verified_ticket().counterparty,
824 )
825 .amount(10)
826 .ticket_index(0)
827 .status(ChannelStatus::Open)
828 .epoch(1)
829 .build()?;
830
831 for ticket in tickets.iter() {
832 let neglected = mgr.insert_incoming_ticket(*ticket)?;
833 assert!(neglected.is_empty());
834 }
835
836 let neglected = mgr.sync_from_incoming_channels(&[channel])?;
837 assert!(neglected.is_empty());
838
839 let unrealized_value = mgr
840 .unrealized_value(channel.get_id(), None)?
841 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
842 assert_eq!(
843 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
844 unrealized_value
845 );
846
847 let neglected = mgr.neglect_tickets(&channel.get_id(), Some(3))?;
848 assert_eq!(
849 tickets
850 .iter()
851 .filter(|t| t.verified_ticket().index <= 3)
852 .map(|t| t.ticket)
853 .collect::<Vec<_>>(),
854 neglected
855 );
856
857 let unrealized_value_after = mgr
858 .unrealized_value(channel.get_id(), None)?
859 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
860 assert_eq!(
861 unrealized_value_after,
862 unrealized_value
863 - neglected
864 .iter()
865 .map(|t| t.verified_ticket().amount)
866 .sum::<HoprBalance>()
867 );
868
869 assert_eq!(
870 ChannelStats {
871 winning_tickets: tickets.len() as u128,
872 unredeemed_value: unrealized_value_after,
873 rejected_value: HoprBalance::zero(),
874 neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
875 },
876 mgr.ticket_stats(Some(&channel.get_id()))?
877 );
878
879 Ok(())
880 }
881
882 #[test]
883 fn ticket_manager_unrealized_value_should_increase_when_tickets_are_added() -> anyhow::Result<()> {
884 let mgr = create_mgr()?;
885
886 let mut tickets = generate_tickets()?;
887 let channel_id = tickets[0].ticket_id().id;
888 let epoch = tickets[0].ticket_id().epoch;
889 tickets.retain(|ticket| ticket.verified_ticket().channel_epoch == epoch);
890
891 assert!(!tickets.is_empty());
892
893 assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));
894
895 let mut last_unrealized_value = HoprBalance::zero();
896 assert_eq!(HoprBalance::zero(), last_unrealized_value);
897
898 for ticket in tickets.iter() {
899 let neglected = mgr.insert_incoming_ticket(*ticket)?;
900 assert!(neglected.is_empty());
901
902 let new_unrealized_value = mgr
903 .unrealized_value(&channel_id, None)?
904 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
905 assert_eq!(
906 new_unrealized_value - last_unrealized_value,
907 ticket.verified_ticket().amount
908 );
909
910 last_unrealized_value = new_unrealized_value;
911 }
912
913 let expected_unrealized_value: HoprBalance = tickets.iter().map(|ticket| ticket.verified_ticket().amount).sum();
914 assert_eq!(expected_unrealized_value, last_unrealized_value);
915
916 assert_eq!(
917 ChannelStats {
918 winning_tickets: tickets.len() as u128,
919 unredeemed_value: expected_unrealized_value,
920 rejected_value: HoprBalance::zero(),
921 neglected_value: HoprBalance::zero(),
922 },
923 mgr.ticket_stats(Some(&tickets[0].ticket.channel_id()))?
924 );
925
926 Ok(())
927 }
928
929 #[test]
930 fn ticket_manager_inserted_ticket_with_older_epoch_should_be_neglected() -> anyhow::Result<()> {
931 let mgr = create_mgr()?;
932
933 let tickets = generate_tickets()?;
934 assert!(!tickets.is_empty());
935 let channel_id = tickets[0].ticket_id().id;
936
937 let tickets_from_epoch_1 = tickets
938 .iter()
939 .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
940 .cloned()
941 .collect::<Vec<_>>();
942 assert!(!tickets_from_epoch_1.is_empty());
943
944 let tickets_from_epoch_2 = tickets
945 .iter()
946 .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
947 .cloned()
948 .collect::<Vec<_>>();
949 assert!(!tickets_from_epoch_2.is_empty());
950
951 for new_ticket in &tickets_from_epoch_2 {
952 let neglected = mgr.insert_incoming_ticket(*new_ticket)?;
953 assert!(neglected.is_empty());
954 }
955
956 for old_ticket in &tickets_from_epoch_1 {
957 let neglected = mgr.insert_incoming_ticket(*old_ticket)?;
958 assert_eq!(vec![old_ticket.ticket], neglected);
959 }
960
961 let stats = mgr.ticket_stats(Some(&channel_id))?;
962
963 assert_eq!(
964 (tickets_from_epoch_1.len() + tickets_from_epoch_2.len()) as u128,
965 stats.winning_tickets
966 );
967 assert_eq!(
968 tickets_from_epoch_2
969 .iter()
970 .map(|t| t.verified_ticket().amount)
971 .sum::<HoprBalance>(),
972 stats.unredeemed_value
973 );
974 assert_eq!(HoprBalance::zero(), stats.rejected_value);
975
976 Ok(())
977 }
978
979 #[test]
980 fn ticket_manager_ticket_insertion_should_neglect_tickets_from_previous_epochs() -> anyhow::Result<()> {
981 let mgr = create_mgr()?;
982
983 let tickets = generate_tickets()?;
984 assert!(!tickets.is_empty());
985 let channel_id = tickets[0].ticket_id().id;
986
987 let tickets_from_epoch_1 = tickets
988 .iter()
989 .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
990 .cloned()
991 .collect::<Vec<_>>();
992 assert!(!tickets_from_epoch_1.is_empty());
993
994 let tickets_from_epoch_2 = tickets
995 .iter()
996 .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
997 .cloned()
998 .collect::<Vec<_>>();
999 assert!(!tickets_from_epoch_2.is_empty());
1000
1001 assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));
1002
1003 for ticket in tickets_from_epoch_1.iter() {
1004 let neglected = mgr.insert_incoming_ticket(*ticket)?;
1005 assert!(neglected.is_empty());
1006 }
1007
1008 let new_unrealized_value = mgr
1009 .unrealized_value(&channel_id, None)?
1010 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1011 assert_eq!(
1012 new_unrealized_value,
1013 tickets_from_epoch_1
1014 .iter()
1015 .map(|ticket| ticket.verified_ticket().amount)
1016 .sum()
1017 );
1018
1019 let neglected = mgr.insert_incoming_ticket(tickets_from_epoch_2[0].clone())?;
1020 assert_eq!(
1021 tickets_from_epoch_1.iter().map(|t| t.ticket).collect::<Vec<_>>(),
1022 neglected
1023 );
1024
1025 let new_unrealized_value = mgr
1027 .unrealized_value(&channel_id, None)?
1028 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1029 assert_eq!(tickets_from_epoch_2[0].verified_ticket().amount, new_unrealized_value);
1030
1031 assert_eq!(
1032 ChannelStats {
1033 winning_tickets: tickets_from_epoch_1.len() as u128 + 1,
1034 unredeemed_value: new_unrealized_value,
1035 rejected_value: HoprBalance::zero(),
1036 neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
1037 },
1038 mgr.ticket_stats(Some(&channel_id))?
1039 );
1040
1041 let queue_tickets = mgr
1042 .store
1043 .write()
1044 .open_or_create_queue(&channel_id)?
1045 .iter_unordered()?
1046 .collect::<Result<Vec<_>, _>>()?;
1047 assert_eq!(1, queue_tickets.len());
1048 assert_eq!(
1049 tickets_from_epoch_2[0].verified_ticket(),
1050 queue_tickets[0].verified_ticket()
1051 );
1052
1053 Ok(())
1054 }
1055
1056 pub type TestConnector = HoprBlockchainConnector<
1057 BlokliTestClient<FullStateEmulator>,
1058 InMemoryBackend,
1059 SafePayloadGenerator,
1060 <SafePayloadGenerator as PayloadGenerator>::TxRequest,
1061 >;
1062
1063 async fn create_test_connector(
1064 private_key: &ChainKeypair,
1065 channel: &ChannelEntry,
1066 tx_sim_delay: Option<std::time::Duration>,
1067 ) -> anyhow::Result<TestConnector> {
1068 let module_addr: [u8; 20] = [1; 20];
1069 assert_eq!(private_key.public().to_address(), channel.destination);
1071
1072 let blokli_client = BlokliTestStateBuilder::default()
1073 .with_balances([(private_key.public().to_address(), XDaiBalance::new_base(1))])
1074 .with_accounts([
1075 (
1076 AccountEntry {
1077 public_key: *OffchainKeypair::random().public(),
1078 chain_addr: private_key.public().to_address(),
1079 entry_type: AccountType::NotAnnounced,
1080 safe_address: None,
1081 key_id: 1.into(),
1082 },
1083 HoprBalance::new_base(1000),
1084 XDaiBalance::new_base(1),
1085 ),
1086 (
1087 AccountEntry {
1088 public_key: *OffchainKeypair::random().public(),
1089 chain_addr: channel.source,
1090 entry_type: AccountType::NotAnnounced,
1091 safe_address: None,
1092 key_id: 2.into(),
1093 },
1094 HoprBalance::new_base(1000),
1095 XDaiBalance::new_base(1),
1096 ),
1097 ])
1098 .with_channels([*channel])
1099 .with_hopr_network_chain_info("rotsee")
1100 .build_dynamic_client(module_addr.into())
1101 .with_tx_simulation_delay(tx_sim_delay.unwrap_or(std::time::Duration::from_millis(500)));
1102
1103 let mut connector = TestConnector::new(
1104 private_key.clone(),
1105 BlockchainConnectorConfig::default(),
1106 blokli_client,
1107 InMemoryBackend::default(),
1108 SafePayloadGenerator::new(
1109 &private_key,
1110 contract_addresses_for_network("rotsee").unwrap().1,
1111 module_addr.into(),
1112 ),
1113 );
1114 connector.connect().await?;
1115
1116 Ok(connector)
1117 }
1118
1119 #[test_log::test(tokio::test)]
1120 async fn ticket_manager_should_redeem_tickets_on_demand() -> anyhow::Result<()> {
1121 let mgr = create_mgr()?;
1122
1123 let src = ChainKeypair::random();
1124 let dst = ChainKeypair::random();
1125
1126 let channel = ChannelEntry::builder()
1127 .between(&src, &dst)
1128 .amount(10_000_000_000_u64)
1129 .ticket_index(0)
1130 .status(ChannelStatus::Open)
1131 .epoch(1)
1132 .build()?;
1133
1134 let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1135 tickets.shuffle(&mut rand::rng());
1136
1137 for ticket in tickets.iter() {
1138 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1139 }
1140
1141 tickets.sort();
1142
1143 let mut unrealized_value = mgr
1144 .unrealized_value(channel.get_id(), None)?
1145 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1146 assert_eq!(
1147 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1148 unrealized_value
1149 );
1150
1151 let connector = create_test_connector(&dst, &channel, None).await?;
1152
1153 let stream = mgr.redeem_stream(connector, *channel.get_id(), None)?;
1154
1155 pin_mut!(stream);
1156
1157 assert_eq!(
1158 Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1159 stream.try_next().await?
1160 );
1161 assert_eq!(
1162 mgr.unrealized_value(channel.get_id(), None)?
1163 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1164 unrealized_value - tickets[0].verified_ticket().amount
1165 );
1166 unrealized_value = mgr
1167 .unrealized_value(channel.get_id(), None)?
1168 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1169
1170 assert_eq!(
1171 Some(RedemptionResult::Redeemed(tickets[1].ticket)),
1172 stream.try_next().await?
1173 );
1174 assert_eq!(
1175 mgr.unrealized_value(channel.get_id(), None)?
1176 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1177 unrealized_value - tickets[1].verified_ticket().amount
1178 );
1179 unrealized_value = mgr
1180 .unrealized_value(channel.get_id(), None)?
1181 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1182
1183 assert_eq!(
1184 Some(RedemptionResult::Redeemed(tickets[2].ticket)),
1185 stream.try_next().await?
1186 );
1187 assert_eq!(
1188 mgr.unrealized_value(channel.get_id(), None)?
1189 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1190 unrealized_value - tickets[2].verified_ticket().amount
1191 );
1192
1193 assert_eq!(None, stream.try_next().await?);
1194
1195 Ok(())
1196 }
1197
1198 #[tokio::test]
1199 async fn ticket_manager_should_not_allow_concurrent_redemptions_on_the_same_channel() -> anyhow::Result<()> {
1200 let mgr = create_mgr()?;
1201
1202 let src = ChainKeypair::random();
1203 let dst = ChainKeypair::random();
1204
1205 let channel = ChannelEntry::builder()
1206 .between(&src, &dst)
1207 .amount(10_000_000_000_u64)
1208 .ticket_index(0)
1209 .status(ChannelStatus::Open)
1210 .epoch(1)
1211 .build()?;
1212
1213 let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1214 tickets.shuffle(&mut rand::rng());
1215
1216 for ticket in tickets.iter() {
1217 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1218 }
1219
1220 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1221
1222 let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1223
1224 assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_err());
1225
1226 drop(stream);
1227
1228 assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_ok());
1229
1230 Ok(())
1231 }
1232
1233 #[tokio::test]
1234 async fn ticket_manager_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
1235 let mgr = create_mgr()?;
1236
1237 let src = ChainKeypair::random();
1238 let dst = ChainKeypair::random();
1239
1240 let channel = ChannelEntry::builder()
1241 .between(&src, &dst)
1242 .amount(10_000_000_000_u64)
1243 .ticket_index(0)
1244 .status(ChannelStatus::Open)
1245 .epoch(1)
1246 .build()?;
1247
1248 let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1249 tickets.shuffle(&mut rand::rng());
1250
1251 for ticket in tickets.iter() {
1252 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1253 }
1254
1255 tickets.sort();
1256
1257 let unrealized_value = mgr
1258 .unrealized_value(channel.get_id(), None)?
1259 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1260 assert_eq!(
1261 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1262 unrealized_value
1263 );
1264
1265 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1266
1267 let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1268 pin_mut!(stream);
1269
1270 assert_eq!(
1271 Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1272 stream.try_next().await?
1273 );
1274 assert_eq!(
1275 mgr.unrealized_value(channel.get_id(), None)?
1276 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1277 unrealized_value - tickets[0].verified_ticket().amount
1278 );
1279
1280 let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
1281 assert_eq!(
1282 tickets.into_iter().skip(1).map(|t| t.ticket).collect::<Vec<_>>(),
1283 neglected
1284 );
1285 assert_eq!(
1286 HoprBalance::zero(),
1287 mgr.unrealized_value(channel.get_id(), None)?
1288 .ok_or(anyhow::anyhow!("must have unrealized value"))?
1289 );
1290
1291 assert_eq!(None, stream.try_next().await?);
1292
1293 Ok(())
1294 }
1295
1296 #[tokio::test]
1297 async fn ticket_manager_partial_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
1298 let mgr = create_mgr()?;
1299
1300 let src = ChainKeypair::random();
1301 let dst = ChainKeypair::random();
1302
1303 let channel = ChannelEntry::builder()
1304 .between(&src, &dst)
1305 .amount(10_000_000_000_u64)
1306 .ticket_index(0)
1307 .status(ChannelStatus::Open)
1308 .epoch(1)
1309 .build()?;
1310
1311 let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1312 tickets.shuffle(&mut rand::rng());
1313
1314 for ticket in tickets.iter() {
1315 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1316 }
1317
1318 tickets.sort();
1319
1320 let mut unrealized_value = mgr
1321 .unrealized_value(channel.get_id(), None)?
1322 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1323 assert_eq!(
1324 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1325 unrealized_value
1326 );
1327
1328 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1329
1330 let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1331 pin_mut!(stream);
1332
1333 assert_eq!(
1335 Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1336 stream.try_next().await?
1337 );
1338 assert_eq!(
1339 mgr.unrealized_value(channel.get_id(), None)?
1340 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1341 unrealized_value - tickets[0].verified_ticket().amount
1342 );
1343 unrealized_value = mgr
1344 .unrealized_value(channel.get_id(), None)?
1345 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1346
1347 let neglected = mgr.neglect_tickets(&channel.get_id(), Some(tickets[3].verified_ticket().index))?;
1349 assert_eq!(
1350 tickets.iter().skip(1).take(3).map(|t| t.ticket).collect::<Vec<_>>(),
1351 neglected
1352 );
1353 assert_eq!(
1354 unrealized_value
1355 - neglected
1356 .into_iter()
1357 .map(|t| t.verified_ticket().amount)
1358 .sum::<HoprBalance>(),
1359 mgr.unrealized_value(channel.get_id(), None)?
1360 .ok_or(anyhow::anyhow!("must have unrealized value"))?
1361 );
1362
1363 assert_eq!(
1365 Some(RedemptionResult::Redeemed(tickets[4].ticket)),
1366 stream.try_next().await?
1367 );
1368
1369 assert_eq!(
1370 HoprBalance::zero(),
1371 mgr.unrealized_value(channel.get_id(), None)?
1372 .ok_or(anyhow::anyhow!("must have unrealized value"))?
1373 );
1374
1375 assert_eq!(None, stream.try_next().await?);
1376
1377 Ok(())
1378 }
1379
1380 #[tokio::test]
1381 async fn ticket_manager_ticket_neglection_during_on_chain_redemption_should_be_detected() -> anyhow::Result<()> {
1382 let mgr = std::sync::Arc::new(create_mgr()?);
1383
1384 let src = ChainKeypair::random();
1385 let dst = ChainKeypair::random();
1386
1387 let channel = ChannelEntry::builder()
1388 .between(&src, &dst)
1389 .amount(10_000_000_000_u64)
1390 .ticket_index(0)
1391 .status(ChannelStatus::Open)
1392 .epoch(1)
1393 .build()?;
1394
1395 let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1396 tickets.shuffle(&mut rand::rng());
1397
1398 for ticket in tickets.iter() {
1399 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1400 }
1401
1402 tickets.sort();
1403
1404 let connector =
1405 std::sync::Arc::new(create_test_connector(&dst, &channel, Some(std::time::Duration::from_secs(2))).await?);
1406
1407 let mgr_clone = mgr.clone();
1408 let jh = tokio::task::spawn(async move {
1409 let stream = mgr_clone.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1410 pin_mut!(stream);
1411 stream.try_next().await
1412 });
1413 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1414
1415 let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
1417 assert_eq!(neglected, tickets.iter().map(|t| t.ticket).collect::<Vec<_>>());
1418
1419 assert_eq!(
1420 ChannelStats {
1421 winning_tickets: tickets.len() as u128,
1422 unredeemed_value: HoprBalance::zero(),
1423 rejected_value: HoprBalance::zero(),
1424 neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
1425 },
1426 mgr.ticket_stats(Some(&channel.get_id()))?
1427 );
1428
1429 let res = jh.await??;
1431 assert_eq!(Some(RedemptionResult::Redeemed(tickets[0].ticket)), res);
1432
1433 assert_eq!(
1434 ChannelStats {
1435 winning_tickets: tickets.len() as u128,
1436 unredeemed_value: HoprBalance::zero(),
1437 rejected_value: HoprBalance::zero(),
1438 neglected_value: neglected
1439 .iter()
1440 .map(|t| t.verified_ticket().amount)
1441 .sum::<HoprBalance>()
1442 - tickets[0].verified_ticket().amount,
1443 },
1444 mgr.ticket_stats(Some(&channel.get_id()))?
1445 );
1446
1447 Ok(())
1448 }
1449
1450 #[tokio::test]
1451 async fn ticket_manager_ticket_redemption_should_skip_low_value_tickets() -> anyhow::Result<()> {
1452 let mgr = create_mgr()?;
1453
1454 let src = ChainKeypair::random();
1455 let dst = ChainKeypair::random();
1456
1457 let channel = ChannelEntry::builder()
1458 .between(&src, &dst)
1459 .amount(10_000_000_000_u64)
1460 .ticket_index(0)
1461 .status(ChannelStatus::Open)
1462 .epoch(1)
1463 .build()?;
1464
1465 let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1466 tickets.shuffle(&mut rand::rng());
1467
1468 for ticket in tickets.iter() {
1469 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1470 }
1471
1472 tickets.sort();
1473
1474 let unrealized_value = mgr
1475 .unrealized_value(channel.get_id(), None)?
1476 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1477 assert_eq!(
1478 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1479 unrealized_value
1480 );
1481
1482 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1483
1484 let results = mgr
1485 .redeem_stream(
1486 connector.clone(),
1487 *channel.get_id(),
1488 Some(tickets[0].verified_ticket().amount + 1),
1489 )?
1490 .try_collect::<Vec<_>>()
1491 .await?;
1492
1493 assert_eq!(
1494 results,
1495 tickets
1496 .into_iter()
1497 .map(|t| RedemptionResult::ValueTooLow(t.ticket))
1498 .collect::<Vec<_>>()
1499 );
1500
1501 Ok(())
1502 }
1503}