1mod backend;
6mod errors;
7mod traits;
8mod utils;
9
10use std::{convert::identity, sync::atomic::AtomicBool};
11
12use futures::{Stream, TryFutureExt};
13use hopr_api::{
14 chain::{ChainWriteTicketOperations, TicketRedeemError},
15 tickets::{ChannelStats, RedemptionResult},
16 types::{internal::prelude::*, primitive::prelude::*},
17};
18
19#[cfg(feature = "redb")]
20pub use crate::backend::{RedbStore, RedbTicketQueue};
21use crate::{
22 backend::ValueCachedQueue,
23 utils::{ChannelTicketQueue, OutgoingIndexCache},
24};
25pub use crate::{
26 backend::{MemoryStore, MemoryTicketQueue},
27 errors::TicketManagerError,
28 traits::{OutgoingIndexStore, TicketQueue, TicketQueueStore},
29};
30
/// Manages outgoing ticket indices and per-channel queues of incoming tickets.
///
/// `S` is the backing store type and `Q` is the ticket queue type held for each channel.
#[derive(Debug)]
pub struct HoprTicketManager<S, Q> {
    /// In-memory tracker of the next outgoing ticket index, addressed by channel ID and epoch.
    out_idx_tracker: OutgoingIndexCache,
    /// Incoming ticket queues keyed by channel ID.
    channel_tickets: dashmap::DashMap<ChannelId, ChannelTicketQueue<ValueCachedQueue<Q>>>,
    /// Shared, lock-guarded handle to the backing store.
    store: std::sync::Arc<parking_lot::RwLock<S>>,
}
134
135impl<S, Q> HoprTicketManager<S, Q>
136where
137 S: OutgoingIndexStore + Send + Sync + 'static,
138{
139 pub fn new(store: S) -> Result<Self, TicketManagerError> {
148 let store = std::sync::Arc::new(parking_lot::RwLock::new(store));
149 Ok(Self {
150 out_idx_tracker: OutgoingIndexCache::default(),
151 channel_tickets: dashmap::DashMap::new(),
152 store,
153 })
154 }
155
156 fn next_outgoing_ticket_index(&self, channel: &ChannelEntry) -> u64 {
162 let mut next_index = self.out_idx_tracker.next(channel.get_id(), channel.channel_epoch);
163 tracing::trace!(%channel, next_index, "next outgoing ticket index");
164
165 let epoch = channel.channel_epoch;
166
167 if next_index < channel.ticket_index {
168 self.out_idx_tracker
171 .upsert(channel.get_id(), epoch, channel.ticket_index + 1);
172 next_index = channel.ticket_index; }
174
175 if next_index == 0 && epoch > 1 && self.out_idx_tracker.remove(channel.get_id(), epoch - 1) {
179 tracing::trace!(%channel, prev_epoch = epoch - 1, "removing previous epoch from outgoing index cache");
180 }
181
182 next_index
183 }
184
185 pub fn save_outgoing_indices(&self) -> Result<(), TicketManagerError> {
190 self.out_idx_tracker
191 .save(self.store.clone())
192 .map_err(TicketManagerError::store)?;
193 Ok(())
194 }
195
196 pub fn sync_from_outgoing_channels(&self, outgoing_channels: &[ChannelEntry]) -> Result<(), TicketManagerError> {
209 let outgoing_channels: std::collections::HashSet<_, std::hash::RandomState> =
210 outgoing_channels.iter().collect();
211
212 let mut store_read = self.store.upgradable_read();
214 let stored_indices = store_read
215 .iter_outgoing_indices()
216 .map_err(TicketManagerError::store)?
217 .collect::<Vec<_>>();
218 for (channel_id, epoch) in stored_indices {
219 if !outgoing_channels.iter().any(|channel| {
222 channel.status == ChannelStatus::Open
223 && channel.get_id() == &channel_id
224 && channel.channel_epoch == epoch
225 }) {
226 let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
227 store_write
228 .delete_outgoing_index(&channel_id, epoch)
229 .map_err(TicketManagerError::store)?;
230 store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
231 tracing::debug!(%channel_id, epoch, "purging outdated outgoing index")
232 }
233 }
234
235 for channel in outgoing_channels
236 .iter()
237 .filter(|channel| channel.status == ChannelStatus::Open)
238 {
239 let id = channel.get_id();
240
241 let epoch = channel.channel_epoch;
244 let index = match store_read.load_outgoing_index(id, epoch) {
245 Ok(Some(out_index)) => out_index,
246 Ok(None) => 0,
247 Err(error) => {
248 tracing::error!(%error, %id, "failed to load outgoing index for channel, falling back to channel ticket index");
249 0
250 }
251 };
252
253 let out_index = index.max(channel.ticket_index);
255 self.out_idx_tracker.upsert(id, epoch, out_index);
256 tracing::debug!(%id, epoch, out_index, "loaded outgoing ticket index for channel");
257 }
258
259 tracing::debug!(
260 num_channels = outgoing_channels.len(),
261 "synchronized with outgoing channels"
262 );
263 Ok(())
264 }
265
266 pub fn next_multihop_ticket(
280 &self,
281 channel: &ChannelEntry,
282 current_path_pos: u8,
283 winning_prob: WinningProbability,
284 ticket_price: HoprBalance,
285 ) -> Result<TicketBuilder, TicketManagerError> {
286 if current_path_pos <= 1 {
287 return Err(TicketManagerError::Other(anyhow::anyhow!(
288 "current path position for multihop ticket must be greater than 1"
289 )));
290 }
291
292 if channel.status != ChannelStatus::Open {
293 return Err(TicketManagerError::Other(anyhow::anyhow!(
294 "channel must be open to create a multihop ticket"
295 )));
296 }
297
298 let amount = HoprBalance::from(
300 ticket_price
301 .amount()
302 .saturating_mul(U256::from(current_path_pos - 1))
303 .div_f64(winning_prob.into())
304 .expect("winning probability is always less than or equal to 1"),
305 );
306
307 if channel.balance.lt(&amount) {
308 return Err(TicketManagerError::OutOfFunds(*channel.get_id(), amount));
309 }
310
311 let ticket_builder = TicketBuilder::default()
312 .counterparty(channel.destination)
313 .balance(amount)
314 .index(self.next_outgoing_ticket_index(channel))
315 .win_prob(winning_prob)
316 .channel_epoch(channel.channel_epoch);
317
318 Ok(ticket_builder)
319 }
320}
321
/// State threaded through the redemption stream of a single channel.
///
/// Dropping the state resets `lock` to `false` (see the `Drop` implementation).
struct RedeemState<C, Q> {
    /// Flag marking an ongoing redemption in the channel; set before this state is created.
    lock: std::sync::Arc<AtomicBool>,
    /// Shared handle to the channel's ticket queue.
    queue: std::sync::Arc<parking_lot::RwLock<Q>>,
    /// Chain client used to redeem the tickets.
    chain: C,
    /// Tickets with an amount below this value are not sent for redemption.
    min_redeem_value: HoprBalance,
    /// ID of the channel whose tickets are being redeemed.
    channel_id: ChannelId,
}
329
330impl<C, Q> Drop for RedeemState<C, Q> {
331 fn drop(&mut self) {
332 self.lock.store(false, std::sync::atomic::Ordering::Release);
333 }
334}
335
336impl<S> HoprTicketManager<S, S::Queue>
337where
338 S: TicketQueueStore + Send + Sync + 'static,
339 S::Queue: Send + Sync + 'static,
340{
341 pub fn sync_from_incoming_channels(
356 &self,
357 incoming_channels: &[ChannelEntry],
358 ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
359 let incoming_channels: std::collections::HashSet<_, std::hash::RandomState> =
360 incoming_channels.iter().collect();
361
362 let mut store_read = self.store.upgradable_read();
364 let stored_queues = store_read
365 .iter_queues()
366 .map_err(TicketManagerError::store)?
367 .collect::<Vec<_>>();
368 let mut neglected = Vec::new();
369 let now = hopr_platform::time::current_time();
370 for channel_id in stored_queues {
371 if !incoming_channels
375 .iter()
376 .any(|channel| !channel.closure_time_passed(now) && channel.get_id() == &channel_id)
377 {
378 let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
379 neglected.extend(
380 store_write
381 .delete_queue(&channel_id)
382 .map_err(TicketManagerError::store)?,
383 );
384 tracing::debug!(%channel_id, "purged outdated incoming tickets queue");
385 store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
386
387 self.channel_tickets.remove(&channel_id);
389 }
390 }
391 for channel in incoming_channels
393 .iter()
394 .filter(|channel| !channel.closure_time_passed(now))
395 {
396 let id = channel.get_id();
397
398 let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
400 let queue = store_write
401 .open_or_create_queue(id)
402 .map_err(TicketManagerError::store)?;
403 store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
404
405 let queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
407
408 tracing::debug!(%id, num_tickets = queue.len().map_err(TicketManagerError::store)?, "loaded redeemable ticket queue for channel");
409 self.channel_tickets.insert(*id, queue.into());
410 }
411
412 tracing::debug!(
413 num_channels = incoming_channels.len(),
414 num_neglected = neglected.len(),
415 "synchronized with incoming channels"
416 );
417 Ok(neglected)
418 }
419
420 pub fn insert_incoming_ticket(&self, ticket: RedeemableTicket) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
428 let mut neglected_tickets = Vec::with_capacity(0);
430
431 let ticket_id = ticket.ticket_id();
432 match self.channel_tickets.entry(ticket_id.id) {
433 dashmap::Entry::Occupied(e) => {
434 let mut queue = e.get().queue.write();
439
440 if let Some(last_ticket) = queue.0.peek().map_err(TicketManagerError::store)? {
445 if last_ticket.verified_ticket().channel_epoch < ticket.verified_ticket().channel_epoch {
446 let mut neg = queue.0.drain().map_err(TicketManagerError::store)?;
448 queue.1.neglected_value += neg.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>();
449
450 neglected_tickets.append(&mut neg);
452 tracing::warn!(%ticket_id, num_neglected = neglected_tickets.len(), "winning ticket has neglected unredeemed tickets from previous epochs");
453 } else if last_ticket.verified_ticket().channel_epoch > ticket.verified_ticket().channel_epoch {
454 tracing::warn!(%ticket_id, "tried to insert incoming ticket from an older epoch");
455
456 queue.1.winning_tickets += 1; queue.1.neglected_value += ticket.verified_ticket().amount;
458 neglected_tickets.push(ticket.ticket);
459 return Ok(neglected_tickets);
460 }
461 }
462 queue.0.push(ticket).map_err(TicketManagerError::store)?;
463 queue.1.winning_tickets += 1;
464
465 tracing::debug!(%ticket_id, "winning ticket on channel");
466 }
467 dashmap::Entry::Vacant(v) => {
468 let mut store = self.store.write();
472
473 let queue = store
474 .open_or_create_queue(&ticket.ticket_id().id)
475 .map_err(TicketManagerError::store)?;
476
477 let mut queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
479
480 if !queue.is_empty().map_err(TicketManagerError::store)? {
482 return Err(TicketManagerError::Other(anyhow::anyhow!(
483 "fatal error: queue not empty"
484 )));
485 }
486
487 queue.push(ticket).map_err(TicketManagerError::store)?;
488 v.insert(queue.into()); tracing::debug!(%ticket_id, "first winning ticket on channel");
490 }
491 }
492
493 Ok(neglected_tickets)
494 }
495
496 pub fn unrealized_value(
501 &self,
502 channel_id: &ChannelId,
503 min_index: Option<u64>,
504 ) -> Result<Option<HoprBalance>, TicketManagerError> {
505 if let Some(ticket_queue) = self.channel_tickets.get(channel_id) {
506 let queue = ticket_queue.queue.read();
509
510 if let Some(epoch) = queue
514 .0
515 .peek()
516 .map_err(TicketManagerError::store)?
517 .map(|t| t.verified_ticket().channel_epoch)
518 {
519 Ok(Some(
520 queue
521 .0
522 .total_value(epoch, min_index)
523 .map_err(TicketManagerError::store)?,
524 ))
525 } else {
526 Ok(Some(HoprBalance::zero()))
527 }
528 } else {
529 Ok(None)
530 }
531 }
532}
533impl<S> hopr_api::tickets::TicketManagement for HoprTicketManager<S, S::Queue>
534where
535 S: TicketQueueStore + Send + Sync + 'static,
536 S::Queue: Send + Sync + 'static,
537{
538 type Error = TicketManagerError;
539
/// Creates a stream that redeems the queued tickets of the given channel one by one.
///
/// Each stream item reports what happened to the ticket at the head of the queue:
/// redeemed, rejected on-chain, or skipped because its amount is below `min_amount`
/// (a missing `min_amount` defaults to `HoprBalance::default()`).
/// In all three cases the ticket is removed from the queue and the channel statistics are
/// updated. The stream ends when the queue has no more tickets.
///
/// # Errors
/// Fails with `AlreadyRedeeming` if the channel's redeem flag is already set, and with
/// `ChannelQueueNotFound` if the channel has no registered queue.
/// The stream itself yields an error when the queue cannot be accessed or when the
/// redemption fails with a processing error; the ticket is not removed in the latter case.
fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
    &self,
    chain: C,
    channel_id: ChannelId,
    min_amount: Option<HoprBalance>,
) -> Result<impl Stream<Item = Result<RedemptionResult, Self::Error>> + Send, Self::Error> {
    let initial_state = match self.channel_tickets.get(&channel_id) {
        Some(ticket_queue) => {
            // Only one redemption per channel at a time: flip the flag from `false` to `true`,
            // or bail out if it is already set.
            ticket_queue
                .redeem_lock
                .compare_exchange(
                    false,
                    true,
                    std::sync::atomic::Ordering::Acquire,
                    std::sync::atomic::Ordering::Relaxed,
                )
                .map_err(|_| TicketManagerError::AlreadyRedeeming)?;

            // The flag is reset when this state is dropped (see `Drop for RedeemState`).
            RedeemState {
                lock: ticket_queue.redeem_lock.clone(),
                queue: ticket_queue.queue.clone(),
                min_redeem_value: min_amount.unwrap_or_default(), chain,
                channel_id,
            }
        }
        None => return Err(TicketManagerError::ChannelQueueNotFound),
    };

    Ok(futures::stream::try_unfold(initial_state, |state| {
        // Peek outside the async block: the read guard is a temporary of this statement,
        // so it is not held while the redemption is awaited.
        let next_ticket = state.queue.read().0.peek();
        async move {
            match next_ticket.map_err(TicketManagerError::store)? {
                Some(ticket_to_redeem) => {
                    let redeem_attempt_result =
                        if ticket_to_redeem.verified_ticket().amount >= state.min_redeem_value {
                            // NOTE(review): `and_then(identity)` presumably flattens a nested
                            // fallible future returned by `redeem_ticket` - confirm.
                            match state.chain.redeem_ticket(ticket_to_redeem).and_then(identity).await {
                                Ok((redeemed_ticket, _)) => Ok(Some(RedemptionResult::Redeemed(redeemed_ticket))),
                                Err(TicketRedeemError::Rejected(ticket, reason)) => {
                                    Ok(Some(RedemptionResult::RejectedOnChain(ticket, reason)))
                                }
                                Err(TicketRedeemError::ProcessingError(_, err)) => {
                                    Err(TicketManagerError::redeem(err))
                                }
                            }
                        } else {
                            // Below the minimum value: not sent for redemption at all.
                            Ok(Some(RedemptionResult::ValueTooLow(ticket_to_redeem.ticket)))
                        };

                    // Only a finished attempt (any of the three outcomes) removes the ticket;
                    // on a processing error the ticket stays in the queue.
                    if let Ok(Some(redeem_complete_result)) = &redeem_attempt_result {
                        // The write guard is taken after the await above and is dropped at the
                        // end of this block.
                        let mut queue_write = state.queue.write();
                        let pop_res = queue_write.0.pop().map_err(TicketManagerError::store)?;

                        match redeem_complete_result {
                            RedemptionResult::Redeemed(ticket) => {
                                queue_write.1.redeemed_value += ticket.verified_ticket().amount;
                                tracing::info!(%ticket, "ticket has been redeemed");
                            },
                            RedemptionResult::ValueTooLow(ticket) => {
                                queue_write.1.neglected_value += ticket.verified_ticket().amount;
                                tracing::warn!(%ticket, "ticket has been neglected");
                            },
                            RedemptionResult::RejectedOnChain(ticket, reason) => {
                                queue_write.1.rejected_value += ticket.verified_ticket().amount;
                                tracing::warn!(%ticket, reason, "ticket has been rejected on-chain");
                            },
                        }

                        // Nothing was popped: the queue was emptied while the ticket was being
                        // processed.
                        // NOTE(review): the subtraction presumably compensates for the amount
                        // having been counted as neglected by whoever removed the ticket - confirm.
                        if pop_res.is_none() {
                            let ticket = redeem_complete_result.as_ref();
                            tracing::warn!(%ticket, "ticket has been neglected from the queue while it actually completed the redemption process");
                            queue_write.1.neglected_value -= ticket.verified_ticket().amount;
                        }
                    }

                    redeem_attempt_result
                }
                None => {
                    tracing::debug!(channel_id = %state.channel_id, "no more tickets to redeem in channel");
                    Ok(None)
                }
            }
            // Hand the state back to `try_unfold` together with the produced item.
            .map(|s| s.map(|v| (v, state)))
        }
    }))
}
654
655 fn neglect_tickets(
663 &self,
664 channel_id: &ChannelId,
665 up_to_index: Option<u64>,
666 ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
667 let queue = self
668 .channel_tickets
669 .get(channel_id)
670 .map(|q| {
671 if q.redeem_lock.load(std::sync::atomic::Ordering::Relaxed) {
672 tracing::warn!(%channel_id, "neglecting tickets in channel while redeeming is ongoing");
673 }
674 q.queue.clone()
675 })
676 .ok_or(TicketManagerError::ChannelQueueNotFound)?;
677
678 let mut neglected_tickets = Vec::new();
679 let mut queue_read = queue.upgradable_read();
680 let max_index = up_to_index.unwrap_or(TicketBuilder::MAX_TICKET_INDEX);
681
682 while queue_read
683 .0
684 .peek()
685 .map_err(TicketManagerError::store)?
686 .filter(|ticket| ticket.verified_ticket().index <= max_index)
687 .is_some()
688 {
689 let mut queue_write = parking_lot::RwLockUpgradableReadGuard::upgrade(queue_read);
691 let maybe_ticket = queue_write.0.pop().map_err(TicketManagerError::store)?;
692 queue_write.1.neglected_value += maybe_ticket.map(|t| t.verified_ticket().amount).unwrap_or_default();
693 queue_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(queue_write);
694
695 neglected_tickets.extend(maybe_ticket.map(|t| t.ticket));
696 tracing::debug!(%channel_id, ?maybe_ticket, "neglected ticket in channel");
697 }
698
699 tracing::debug!(%channel_id, num_tickets = neglected_tickets.len(), "ticket neglection done in channel");
701 Ok(neglected_tickets)
702 }
703
704 #[allow(deprecated)] fn ticket_stats(&self, channel: Option<&ChannelId>) -> Result<ChannelStats, TicketManagerError> {
711 self.channel_tickets
712 .iter()
713 .filter(|e| channel.is_none_or(|c| c == e.key()))
714 .try_fold(ChannelStats::default(), |stats, v| {
715 let queue = v.queue.read();
716 Ok::<_, TicketManagerError>(ChannelStats {
717 winning_tickets: queue.1.winning_tickets + stats.winning_tickets,
718 unredeemed_value: queue
719 .0
720 .peek()
721 .map_err(TicketManagerError::store)?
722 .map(|t| queue.0.total_value(t.verified_ticket().channel_epoch, None))
723 .transpose()
724 .map_err(TicketManagerError::store)?
725 .unwrap_or_default()
726 + stats.unredeemed_value,
727 rejected_value: queue.1.rejected_value + stats.rejected_value,
728 redeemed_value: queue.1.redeemed_value + stats.redeemed_value,
729 neglected_value: queue.1.neglected_value + stats.neglected_value,
730 })
731 })
732 }
733}
734
735#[allow(deprecated)] #[cfg(test)]
737mod tests {
738 use std::ops::Sub;
739
740 use futures::{TryStreamExt, pin_mut};
741 use hopr_api::{
742 OffchainKeypair,
743 tickets::TicketManagement,
744 types::crypto::prelude::{ChainKeypair, Keypair},
745 };
746 use hopr_chain_connector::{
747 BlockchainConnectorConfig, HoprBlockchainConnector, InMemoryBackend, PayloadGenerator, SafePayloadGenerator,
748 reexports::chain::contract_addresses_for_network,
749 testing::{BlokliTestClient, BlokliTestStateBuilder, FullStateEmulator},
750 };
751 use rand::prelude::SliceRandom;
752
753 use super::*;
754 use crate::traits::tests::{generate_owned_tickets, generate_tickets};
755
756 fn create_mgr() -> anyhow::Result<HoprTicketManager<MemoryStore, MemoryTicketQueue>> {
757 Ok(HoprTicketManager::new(MemoryStore::default())?)
758 }
759
// A manager without any queues reports default statistics, overall and per channel.
#[test]
fn ticket_manager_non_existing_channel_should_return_empty_stats() -> anyhow::Result<()> {
    let mgr = create_mgr()?;
    let unknown_channel = ChannelId::default();

    let overall = mgr.ticket_stats(None)?;
    assert_eq!(ChannelStats::default(), overall);

    let per_channel = mgr.ticket_stats(Some(&unknown_channel))?;
    assert_eq!(ChannelStats::default(), per_channel);

    Ok(())
}
768
// Two consecutive tickets on a synced channel get the channel's ticket index and the one after.
#[test]
fn ticket_manager_should_create_multihop_tickets() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let src = ChainKeypair::random();
    let dst = ChainKeypair::random();

    let channel = ChannelEntry::builder()
        .between(&src, &dst)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    mgr.sync_from_outgoing_channels(&[channel])?;

    for offset in 0..2 {
        let ticket = mgr
            .next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?
            .eth_challenge(Default::default())
            .build_signed(&src, &Default::default())?;

        assert_eq!(ticket.channel_id(), channel.get_id());
        assert_eq!(channel.ticket_index + offset, ticket.verified_ticket().index);
        assert_eq!(channel.channel_epoch, ticket.verified_ticket().channel_epoch);
    }

    Ok(())
}
807
// Each inserted winning ticket bumps the ticket count and the unredeemed value of its channel.
#[test]
fn ticket_manager_should_update_state_when_winning_tickets_are_inserted() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let src = ChainKeypair::random();
    let dst = ChainKeypair::random();

    let channel = ChannelEntry::builder()
        .between(&src, &dst)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    let tickets = generate_owned_tickets(&src, &dst, 2, 1..=1)?;

    let mut expected_unredeemed = HoprBalance::zero();
    for position in 0..2usize {
        mgr.insert_incoming_ticket(tickets[position])?;
        expected_unredeemed += tickets[position].verified_ticket().amount;

        let expected_stats = ChannelStats {
            winning_tickets: position as u128 + 1,
            unredeemed_value: expected_unredeemed,
            rejected_value: HoprBalance::zero(),
            redeemed_value: HoprBalance::zero(),
            neglected_value: HoprBalance::zero(),
        };
        assert_eq!(expected_stats, mgr.ticket_stats(Some(channel.get_id()))?);
    }

    Ok(())
}
853
// Ticket creation is refused for non-open channels, insufficient balance and path position 1.
#[test]
fn ticket_manager_create_multihop_ticket_should_fail_on_wrong_input() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let src = ChainKeypair::random();
    let dst = ChainKeypair::random();

    let mut channel = ChannelEntry::builder()
        .between(&src, &dst)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::Closed)
        .epoch(1)
        .build()?;

    // Closed channel
    let on_closed = mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 1.into());
    assert!(on_closed.is_err());

    // Channel pending to close
    channel.status =
        ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_secs(10));
    let on_pending = mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 1.into());
    assert!(on_pending.is_err());

    channel.status = ChannelStatus::Open;

    // Ticket price above the channel balance
    let too_expensive = mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 11.into());
    assert!(too_expensive.is_err());

    // Path position that is not greater than 1
    let bad_position = mgr.next_multihop_ticket(&channel, 1, WinningProbability::ALWAYS, 1.into());
    assert!(bad_position.is_err());

    Ok(())
}
896
// The outgoing index follows the channel's ticket index upwards, never goes back, and starts
// over in a new epoch while the entry of the previous epoch is dropped.
#[test]
fn ticket_manager_test_next_outgoing_ticket_index() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let src = ChainKeypair::random();
    let dst = ChainKeypair::random();

    let mut channel = ChannelEntry::builder()
        .between(&src, &dst)
        .amount(10)
        .ticket_index(0)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    assert_eq!(0, mgr.next_outgoing_ticket_index(&channel));

    // (ticket index set on the channel, indices expected from the next two calls)
    let steps = [(10, [10, 11]), (100, [100, 101]), (50, [102, 103])];
    for (on_chain_index, expected_indices) in steps {
        channel.ticket_index = on_chain_index;
        for expected_index in expected_indices {
            assert_eq!(expected_index, mgr.next_outgoing_ticket_index(&channel));
        }
    }

    mgr.save_outgoing_indices()?;
    assert_eq!(Some(104), mgr.store.read().load_outgoing_index(channel.get_id(), 1)?);

    // Move the channel to the next epoch
    channel.ticket_index = 0;
    channel.channel_epoch = 2;

    assert_eq!(0, mgr.next_outgoing_ticket_index(&channel));
    mgr.save_outgoing_indices()?;

    let previous_epoch_index = mgr.store.read().load_outgoing_index(channel.get_id(), 1)?;
    assert_eq!(None, previous_epoch_index);
    let current_epoch_index = mgr.store.read().load_outgoing_index(channel.get_id(), 2)?;
    assert_eq!(Some(1), current_epoch_index);

    assert_eq!(1, mgr.next_outgoing_ticket_index(&channel));

    Ok(())
}
942
// Outgoing indices reach the store only when saving is requested explicitly.
#[test]
fn ticket_manager_should_save_out_indices_to_the_store_on_demand() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let src = ChainKeypair::random();
    let dst = ChainKeypair::random();

    let channel = ChannelEntry::builder()
        .between(&src, &dst)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    // Reads the index currently persisted for the channel in epoch 1.
    let load_saved_index = || mgr.store.read().load_outgoing_index(channel.get_id(), 1);

    mgr.sync_from_outgoing_channels(&[channel])?;

    mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?;
    assert_eq!(None, load_saved_index()?);

    mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?;
    mgr.save_outgoing_indices()?;
    assert_eq!(Some(3), load_saved_index()?);

    // Another ticket does not change the stored value until the next save
    mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?;
    assert_eq!(Some(3), load_saved_index()?);

    mgr.save_outgoing_indices()?;
    assert_eq!(Some(4), load_saved_index()?);

    Ok(())
}
984
// After a sync and a save, the store holds the ticket index of the channel.
#[test]
fn ticket_manager_should_sync_out_indices_from_chain_state() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let src = ChainKeypair::random();
    let dst = ChainKeypair::random();

    let open_channel = ChannelEntry::builder()
        .between(&src, &dst)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    mgr.sync_from_outgoing_channels(&[open_channel])?;
    mgr.save_outgoing_indices()?;

    assert_eq!(
        Some(1),
        mgr.store.read().load_outgoing_index(open_channel.get_id(), 1)?
    );

    Ok(())
}
1008
1009 #[test_log::test]
1010 fn ticket_manager_should_sync_out_indices_should_remove_indices_for_non_opened_outgoing_channels()
1011 -> anyhow::Result<()> {
1012 let mgr = create_mgr()?;
1013
1014 let src = ChainKeypair::random();
1015 let dst = ChainKeypair::random();
1016
1017 let mut channel_1 = ChannelEntry::builder()
1018 .between(&src, &dst)
1019 .amount(10)
1020 .ticket_index(0)
1021 .status(ChannelStatus::Open)
1022 .epoch(1)
1023 .build()?;
1024
1025 let mut channel_2 = ChannelEntry::builder()
1026 .between(&dst, &src)
1027 .amount(10)
1028 .ticket_index(0)
1029 .status(ChannelStatus::Open)
1030 .epoch(1)
1031 .build()?;
1032
1033 let ticket_1 = mgr
1034 .next_multihop_ticket(&channel_1, 2, WinningProbability::ALWAYS, 10.into())?
1035 .eth_challenge(Default::default())
1036 .build()?;
1037 let ticket_2 = mgr
1038 .next_multihop_ticket(&channel_2, 2, WinningProbability::ALWAYS, 10.into())?
1039 .eth_challenge(Default::default())
1040 .build()?;
1041 assert_eq!(0, ticket_1.index);
1042 assert_eq!(0, ticket_2.index);
1043
1044 mgr.save_outgoing_indices()?;
1045
1046 assert_eq!(Some(1), mgr.store.read().load_outgoing_index(channel_1.get_id(), 1)?);
1047 assert_eq!(Some(1), mgr.store.read().load_outgoing_index(channel_2.get_id(), 1)?);
1048
1049 channel_1.status = ChannelStatus::Closed;
1050 channel_2.status =
1051 ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_mins(10));
1052
1053 mgr.sync_from_outgoing_channels(&[channel_1, channel_2])?;
1054
1055 assert_eq!(None, mgr.store.read().load_outgoing_index(channel_1.get_id(), 1)?);
1056 assert_eq!(None, mgr.store.read().load_outgoing_index(channel_2.get_id(), 1)?);
1057
1058 Ok(())
1059 }
1060
// Syncing with an open incoming channel creates its queue in the store and neglects nothing.
#[test]
fn ticket_manager_should_sync_incoming_channels_from_chain_state() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let src = ChainKeypair::random();
    let dst = ChainKeypair::random();

    let open_channel = ChannelEntry::builder()
        .between(&src, &dst)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    assert!(mgr.sync_from_incoming_channels(&[open_channel])?.is_empty());

    let stored_queues = mgr.store.read().iter_queues()?.collect::<Vec<_>>();
    assert_eq!(vec![*open_channel.get_id()], stored_queues);

    Ok(())
}
1084
// A queued ticket is neglected when its channel shows up as closed during the sync.
#[test]
fn ticket_manager_should_neglect_tickets_from_closed_channels_on_sync() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let tickets = generate_tickets()?;
    let first = tickets[0];
    assert!(mgr.insert_incoming_ticket(first)?.is_empty());

    let closed_channel = ChannelEntry::builder()
        .between(*first.ticket.verified_issuer(), first.verified_ticket().counterparty)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::Closed)
        .epoch(1)
        .build()?;

    let neglected = mgr.sync_from_incoming_channels(&[closed_channel])?;
    assert_eq!(1, neglected.len());
    assert_eq!(first.ticket, neglected[0]);

    Ok(())
}
1110
// A queued ticket is neglected when the closure time of its channel has already passed.
#[test]
fn ticket_manager_should_neglect_tickets_from_effectively_closed_channels_on_sync() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let tickets = generate_tickets()?;
    let first = tickets[0];
    assert!(mgr.insert_incoming_ticket(first)?.is_empty());

    let closure_time = std::time::SystemTime::now().sub(std::time::Duration::from_mins(10));
    let closing_channel = ChannelEntry::builder()
        .between(*first.ticket.verified_issuer(), first.verified_ticket().counterparty)
        .amount(10)
        .ticket_index(1)
        .status(ChannelStatus::PendingToClose(closure_time))
        .epoch(1)
        .build()?;

    let neglected = mgr.sync_from_incoming_channels(&[closing_channel])?;
    assert_eq!(1, neglected.len());
    assert_eq!(first.ticket, neglected[0]);

    Ok(())
}
1138
// A queued ticket is neglected when its channel is absent from the synced channel list.
#[test]
fn ticket_manager_should_neglect_tickets_from_non_existent_channels_on_sync() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let tickets = generate_tickets()?;
    let first = tickets[0];

    assert!(mgr.insert_incoming_ticket(first)?.is_empty());

    let neglected = mgr.sync_from_incoming_channels(&[])?;
    assert_eq!(1, neglected.len());
    assert_eq!(first.ticket, neglected[0]);

    Ok(())
}
1154
// Neglecting without an index limit removes every queued ticket and accounts its value.
#[test]
fn ticket_manager_should_neglect_tickets_on_demand() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    // Keep only the tickets from the epoch of the first generated ticket
    let all_tickets = generate_tickets()?;
    let epoch = all_tickets[0].ticket_id().epoch;
    let tickets = all_tickets
        .into_iter()
        .filter(|t| t.verified_ticket().channel_epoch == epoch)
        .collect::<Vec<_>>();

    let channel = ChannelEntry::builder()
        .between(
            *tickets[0].ticket.verified_issuer(),
            tickets[0].verified_ticket().counterparty,
        )
        .amount(10)
        .ticket_index(tickets.len() as u64)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    for ticket in tickets.iter() {
        assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
    }

    assert!(mgr.sync_from_incoming_channels(&[channel])?.is_empty());

    let total_ticket_value = tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>();
    let unrealized_value = mgr
        .unrealized_value(channel.get_id(), None)?
        .ok_or(anyhow::anyhow!("must have unrealized value"))?;
    assert_eq!(total_ticket_value, unrealized_value);

    let neglected = mgr.neglect_tickets(channel.get_id(), None)?;
    let expected_neglected = tickets.iter().map(|t| t.ticket).collect::<Vec<_>>();
    assert_eq!(expected_neglected, neglected);

    let neglected_value = neglected
        .iter()
        .map(|t| t.verified_ticket().amount)
        .sum::<HoprBalance>();

    let unrealized_value_after = mgr
        .unrealized_value(channel.get_id(), None)?
        .ok_or(anyhow::anyhow!("must have unrealized value"))?;
    assert_eq!(unrealized_value_after, unrealized_value - neglected_value);

    let expected_stats = ChannelStats {
        winning_tickets: tickets.len() as u128,
        unredeemed_value: unrealized_value_after,
        rejected_value: HoprBalance::zero(),
        redeemed_value: HoprBalance::zero(),
        neglected_value,
    };
    assert_eq!(expected_stats, mgr.ticket_stats(Some(channel.get_id()))?);

    Ok(())
}
1222
// Neglecting with an index limit removes only the tickets up to and including that index.
#[test]
fn ticket_manager_should_neglect_tickets_on_demand_with_upper_limit_on_index() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    // Keep only the tickets from the epoch of the first generated ticket
    let all_tickets = generate_tickets()?;
    let epoch = all_tickets[0].ticket_id().epoch;
    let tickets = all_tickets
        .into_iter()
        .filter(|t| t.verified_ticket().channel_epoch == epoch)
        .collect::<Vec<_>>();

    let channel = ChannelEntry::builder()
        .between(
            *tickets[0].ticket.verified_issuer(),
            tickets[0].verified_ticket().counterparty,
        )
        .amount(10)
        .ticket_index(tickets.len() as u64)
        .status(ChannelStatus::Open)
        .epoch(1)
        .build()?;

    for ticket in tickets.iter() {
        assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
    }

    assert!(mgr.sync_from_incoming_channels(&[channel])?.is_empty());

    let total_ticket_value = tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>();
    let unrealized_value = mgr
        .unrealized_value(channel.get_id(), None)?
        .ok_or(anyhow::anyhow!("must have unrealized value"))?;
    assert_eq!(total_ticket_value, unrealized_value);

    let neglected = mgr.neglect_tickets(channel.get_id(), Some(3))?;
    let expected_neglected = tickets
        .iter()
        .filter(|t| t.verified_ticket().index <= 3)
        .map(|t| t.ticket)
        .collect::<Vec<_>>();
    assert_eq!(expected_neglected, neglected);

    let neglected_value = neglected
        .iter()
        .map(|t| t.verified_ticket().amount)
        .sum::<HoprBalance>();

    let unrealized_value_after = mgr
        .unrealized_value(channel.get_id(), None)?
        .ok_or(anyhow::anyhow!("must have unrealized value"))?;
    assert_eq!(unrealized_value_after, unrealized_value - neglected_value);

    let expected_stats = ChannelStats {
        winning_tickets: tickets.len() as u128,
        unredeemed_value: unrealized_value_after,
        rejected_value: HoprBalance::zero(),
        redeemed_value: HoprBalance::zero(),
        neglected_value,
    };
    assert_eq!(expected_stats, mgr.ticket_stats(Some(channel.get_id()))?);

    Ok(())
}
1297
/// Every inserted ticket is expected to raise the channel's unrealized value by exactly the
/// ticket's amount; before any insertion the manager reports no unrealized value at all.
#[test]
fn ticket_manager_unrealized_value_should_increase_when_tickets_are_added() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let mut tickets = generate_tickets()?;
    let channel_id = tickets[0].ticket_id().id;
    let epoch = tickets[0].ticket_id().epoch;

    // Restrict the test to the epoch of the first generated ticket.
    tickets.retain(|ticket| ticket.verified_ticket().channel_epoch == epoch);
    assert!(!tickets.is_empty());

    // Nothing has been inserted yet, so the channel is unknown to the manager.
    assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));

    let mut running_value = HoprBalance::zero();
    assert_eq!(HoprBalance::zero(), running_value);

    for ticket in &tickets {
        assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());

        let current_value = mgr
            .unrealized_value(&channel_id, None)?
            .ok_or_else(|| anyhow::anyhow!("must have unrealized value"))?;

        // The delta against the previous reading must equal the value of the new ticket.
        assert_eq!(current_value - running_value, ticket.verified_ticket().amount);

        running_value = current_value;
    }

    let expected_unrealized_value: HoprBalance = tickets.iter().map(|ticket| ticket.verified_ticket().amount).sum();
    assert_eq!(expected_unrealized_value, running_value);

    let expected_stats = ChannelStats {
        winning_tickets: tickets.len() as u128,
        unredeemed_value: expected_unrealized_value,
        rejected_value: HoprBalance::zero(),
        redeemed_value: HoprBalance::zero(),
        neglected_value: HoprBalance::zero(),
    };
    assert_eq!(
        expected_stats,
        mgr.ticket_stats(Some(&tickets[0].ticket.channel_id()))?
    );

    Ok(())
}
1345
/// Once tickets from epoch 2 are stored, inserting a ticket from epoch 1 is expected to
/// return that very ticket as neglected. All tickets still count as winning, but only the
/// epoch 2 tickets contribute to the unredeemed value.
#[test]
fn ticket_manager_inserted_ticket_with_older_epoch_should_be_neglected() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let tickets = generate_tickets()?;
    assert!(!tickets.is_empty());
    let channel_id = tickets[0].ticket_id().id;

    // Split the generated tickets by the channel epoch they were issued in.
    let tickets_from_epoch_1: Vec<_> = tickets
        .iter()
        .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
        .copied()
        .collect();
    assert!(!tickets_from_epoch_1.is_empty());

    let tickets_from_epoch_2: Vec<_> = tickets
        .iter()
        .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
        .copied()
        .collect();
    assert!(!tickets_from_epoch_2.is_empty());

    // Newer epoch first: these insertions must not neglect anything.
    for new_ticket in &tickets_from_epoch_2 {
        assert!(mgr.insert_incoming_ticket(*new_ticket)?.is_empty());
    }

    // Older epoch afterwards: each ticket is handed straight back as neglected.
    for old_ticket in &tickets_from_epoch_1 {
        let neglected = mgr.insert_incoming_ticket(*old_ticket)?;
        assert_eq!(vec![old_ticket.ticket], neglected);
    }

    let stats = mgr.ticket_stats(Some(&channel_id))?;

    let total_count = (tickets_from_epoch_1.len() + tickets_from_epoch_2.len()) as u128;
    assert_eq!(total_count, stats.winning_tickets);

    let epoch_2_value: HoprBalance = tickets_from_epoch_2
        .iter()
        .map(|t| t.verified_ticket().amount)
        .sum();
    assert_eq!(epoch_2_value, stats.unredeemed_value);

    assert_eq!(HoprBalance::zero(), stats.rejected_value);
    assert_eq!(HoprBalance::zero(), stats.redeemed_value);

    Ok(())
}
1396
/// Inserting the first ticket of epoch 2 is expected to neglect every ticket previously
/// stored for epoch 1: the insertion returns all of them, the unrealized value drops to the
/// value of the single new ticket, and the queue in the store holds only that ticket.
#[test]
fn ticket_manager_ticket_insertion_should_neglect_tickets_from_previous_epochs() -> anyhow::Result<()> {
    let mgr = create_mgr()?;

    let tickets = generate_tickets()?;
    assert!(!tickets.is_empty());
    let channel_id = tickets[0].ticket_id().id;

    // The ticket type is `Copy` (sibling tests pass `*ticket` by value), so `copied()` is used
    // here rather than `cloned()`.
    let tickets_from_epoch_1 = tickets
        .iter()
        .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
        .copied()
        .collect::<Vec<_>>();
    assert!(!tickets_from_epoch_1.is_empty());

    let tickets_from_epoch_2 = tickets
        .iter()
        .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
        .copied()
        .collect::<Vec<_>>();
    assert!(!tickets_from_epoch_2.is_empty());

    // Nothing has been inserted yet, so the channel is unknown to the manager.
    assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));

    for ticket in tickets_from_epoch_1.iter() {
        let neglected = mgr.insert_incoming_ticket(*ticket)?;
        assert!(neglected.is_empty());
    }

    let epoch_1_value: HoprBalance = tickets_from_epoch_1
        .iter()
        .map(|ticket| ticket.verified_ticket().amount)
        .sum();
    let new_unrealized_value = mgr
        .unrealized_value(&channel_id, None)?
        .ok_or_else(|| anyhow::anyhow!("must have unrealized value"))?;
    assert_eq!(new_unrealized_value, epoch_1_value);

    // A single ticket from the next epoch pushes out all tickets of the previous one.
    let neglected = mgr.insert_incoming_ticket(tickets_from_epoch_2[0])?;
    assert_eq!(
        tickets_from_epoch_1.iter().map(|t| t.ticket).collect::<Vec<_>>(),
        neglected
    );

    let new_unrealized_value = mgr
        .unrealized_value(&channel_id, None)?
        .ok_or_else(|| anyhow::anyhow!("must have unrealized value"))?;
    assert_eq!(tickets_from_epoch_2[0].verified_ticket().amount, new_unrealized_value);

    assert_eq!(
        ChannelStats {
            winning_tickets: tickets_from_epoch_1.len() as u128 + 1,
            unredeemed_value: new_unrealized_value,
            rejected_value: HoprBalance::zero(),
            redeemed_value: HoprBalance::zero(),
            neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
        },
        mgr.ticket_stats(Some(&channel_id))?
    );

    // Inspect the queue in the store directly. The whole access stays in one expression so the
    // write guard is released at the end of this statement.
    let queue_tickets = mgr
        .store
        .write()
        .open_or_create_queue(&channel_id)?
        .iter_unordered()?
        .collect::<Result<Vec<_>, _>>()?;
    assert_eq!(1, queue_tickets.len());
    assert_eq!(
        tickets_from_epoch_2[0].verified_ticket(),
        queue_tickets[0].verified_ticket()
    );

    Ok(())
}
1474
/// Connector type used by the redemption tests: a [`HoprBlockchainConnector`] parameterized
/// with a [`BlokliTestClient`] over a [`FullStateEmulator`], an [`InMemoryBackend`] and the
/// [`SafePayloadGenerator`] together with its associated transaction request type.
pub type TestConnector = HoprBlockchainConnector<
    BlokliTestClient<FullStateEmulator>,
    InMemoryBackend,
    SafePayloadGenerator,
    <SafePayloadGenerator as PayloadGenerator>::TxRequest,
>;
1481
1482 async fn create_test_connector(
1483 private_key: &ChainKeypair,
1484 channel: &ChannelEntry,
1485 tx_sim_delay: Option<std::time::Duration>,
1486 ) -> anyhow::Result<TestConnector> {
1487 let module_addr: [u8; 20] = [1; 20];
1488 assert_eq!(private_key.public().to_address(), channel.destination);
1490
1491 let blokli_client = BlokliTestStateBuilder::default()
1492 .with_balances([(private_key.public().to_address(), XDaiBalance::new_base(1))])
1493 .with_accounts([
1494 (
1495 AccountEntry {
1496 public_key: *OffchainKeypair::random().public(),
1497 chain_addr: private_key.public().to_address(),
1498 entry_type: AccountType::NotAnnounced,
1499 safe_address: None,
1500 key_id: 1.into(),
1501 },
1502 HoprBalance::new_base(1000),
1503 XDaiBalance::new_base(1),
1504 ),
1505 (
1506 AccountEntry {
1507 public_key: *OffchainKeypair::random().public(),
1508 chain_addr: channel.source,
1509 entry_type: AccountType::NotAnnounced,
1510 safe_address: None,
1511 key_id: 2.into(),
1512 },
1513 HoprBalance::new_base(1000),
1514 XDaiBalance::new_base(1),
1515 ),
1516 ])
1517 .with_channels([*channel])
1518 .with_hopr_network_chain_info("rotsee")
1519 .build_dynamic_client(module_addr.into())
1520 .with_tx_simulation_delay(tx_sim_delay.unwrap_or(std::time::Duration::from_millis(500)));
1521
1522 let mut connector = TestConnector::new(
1523 private_key.clone(),
1524 BlockchainConnectorConfig::default(),
1525 blokli_client,
1526 InMemoryBackend::default(),
1527 SafePayloadGenerator::new(
1528 &private_key,
1529 contract_addresses_for_network("rotsee").unwrap().1,
1530 module_addr.into(),
1531 ),
1532 );
1533 connector.connect().await?;
1534
1535 Ok(connector)
1536 }
1537
1538 #[test_log::test(tokio::test)]
1539 async fn ticket_manager_should_redeem_tickets_on_demand() -> anyhow::Result<()> {
1540 let mgr = create_mgr()?;
1541
1542 let src = ChainKeypair::random();
1543 let dst = ChainKeypair::random();
1544
1545 let channel = ChannelEntry::builder()
1546 .between(&src, &dst)
1547 .amount(10_000_000_000_u64)
1548 .ticket_index(0)
1549 .status(ChannelStatus::Open)
1550 .epoch(1)
1551 .build()?;
1552
1553 let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1554 tickets.shuffle(&mut rand::rng());
1555
1556 for ticket in tickets.iter() {
1557 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1558 }
1559
1560 tickets.sort();
1561
1562 let mut unrealized_value = mgr
1563 .unrealized_value(channel.get_id(), None)?
1564 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1565 assert_eq!(
1566 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1567 unrealized_value
1568 );
1569
1570 let connector = create_test_connector(&dst, &channel, None).await?;
1571
1572 let stream = mgr.redeem_stream(connector, *channel.get_id(), None)?;
1573
1574 pin_mut!(stream);
1575
1576 assert_eq!(
1577 Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1578 stream.try_next().await?
1579 );
1580 assert_eq!(
1581 mgr.unrealized_value(channel.get_id(), None)?
1582 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1583 unrealized_value - tickets[0].verified_ticket().amount
1584 );
1585 unrealized_value = mgr
1586 .unrealized_value(channel.get_id(), None)?
1587 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1588
1589 assert_eq!(
1590 Some(RedemptionResult::Redeemed(tickets[1].ticket)),
1591 stream.try_next().await?
1592 );
1593 assert_eq!(
1594 mgr.unrealized_value(channel.get_id(), None)?
1595 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1596 unrealized_value - tickets[1].verified_ticket().amount
1597 );
1598 unrealized_value = mgr
1599 .unrealized_value(channel.get_id(), None)?
1600 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1601
1602 assert_eq!(
1603 Some(RedemptionResult::Redeemed(tickets[2].ticket)),
1604 stream.try_next().await?
1605 );
1606 assert_eq!(
1607 mgr.unrealized_value(channel.get_id(), None)?
1608 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1609 unrealized_value - tickets[2].verified_ticket().amount
1610 );
1611
1612 assert_eq!(None, stream.try_next().await?);
1613
1614 Ok(())
1615 }
1616
1617 #[tokio::test]
1618 async fn ticket_manager_should_not_allow_concurrent_redemptions_on_the_same_channel() -> anyhow::Result<()> {
1619 let mgr = create_mgr()?;
1620
1621 let src = ChainKeypair::random();
1622 let dst = ChainKeypair::random();
1623
1624 let channel = ChannelEntry::builder()
1625 .between(&src, &dst)
1626 .amount(10_000_000_000_u64)
1627 .ticket_index(0)
1628 .status(ChannelStatus::Open)
1629 .epoch(1)
1630 .build()?;
1631
1632 let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1633 tickets.shuffle(&mut rand::rng());
1634
1635 for ticket in tickets.iter() {
1636 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1637 }
1638
1639 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1640
1641 let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1642
1643 assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_err());
1644
1645 drop(stream);
1646
1647 assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_ok());
1648
1649 Ok(())
1650 }
1651
1652 #[tokio::test]
1653 async fn ticket_manager_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
1654 let mgr = create_mgr()?;
1655
1656 let src = ChainKeypair::random();
1657 let dst = ChainKeypair::random();
1658
1659 let channel = ChannelEntry::builder()
1660 .between(&src, &dst)
1661 .amount(10_000_000_000_u64)
1662 .ticket_index(0)
1663 .status(ChannelStatus::Open)
1664 .epoch(1)
1665 .build()?;
1666
1667 let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1668 tickets.shuffle(&mut rand::rng());
1669
1670 for ticket in tickets.iter() {
1671 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1672 }
1673
1674 tickets.sort();
1675
1676 let unrealized_value = mgr
1677 .unrealized_value(channel.get_id(), None)?
1678 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1679 assert_eq!(
1680 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1681 unrealized_value
1682 );
1683
1684 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1685
1686 let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1687 pin_mut!(stream);
1688
1689 assert_eq!(
1690 Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1691 stream.try_next().await?
1692 );
1693 assert_eq!(
1694 mgr.unrealized_value(channel.get_id(), None)?
1695 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1696 unrealized_value - tickets[0].verified_ticket().amount
1697 );
1698
1699 let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
1700 assert_eq!(
1701 tickets.into_iter().skip(1).map(|t| t.ticket).collect::<Vec<_>>(),
1702 neglected
1703 );
1704 assert_eq!(
1705 HoprBalance::zero(),
1706 mgr.unrealized_value(channel.get_id(), None)?
1707 .ok_or(anyhow::anyhow!("must have unrealized value"))?
1708 );
1709
1710 assert_eq!(None, stream.try_next().await?);
1711
1712 Ok(())
1713 }
1714
1715 #[tokio::test]
1716 async fn ticket_manager_partial_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
1717 let mgr = create_mgr()?;
1718
1719 let src = ChainKeypair::random();
1720 let dst = ChainKeypair::random();
1721
1722 let channel = ChannelEntry::builder()
1723 .between(&src, &dst)
1724 .amount(10_000_000_000_u64)
1725 .ticket_index(0)
1726 .status(ChannelStatus::Open)
1727 .epoch(1)
1728 .build()?;
1729
1730 let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1731 tickets.shuffle(&mut rand::rng());
1732
1733 for ticket in tickets.iter() {
1734 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1735 }
1736
1737 tickets.sort();
1738
1739 let mut unrealized_value = mgr
1740 .unrealized_value(channel.get_id(), None)?
1741 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1742 assert_eq!(
1743 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1744 unrealized_value
1745 );
1746
1747 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1748
1749 let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1750 pin_mut!(stream);
1751
1752 assert_eq!(
1754 Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1755 stream.try_next().await?
1756 );
1757 assert_eq!(
1758 mgr.unrealized_value(channel.get_id(), None)?
1759 .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1760 unrealized_value - tickets[0].verified_ticket().amount
1761 );
1762 unrealized_value = mgr
1763 .unrealized_value(channel.get_id(), None)?
1764 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1765
1766 let neglected = mgr.neglect_tickets(&channel.get_id(), Some(tickets[3].verified_ticket().index))?;
1768 assert_eq!(
1769 tickets.iter().skip(1).take(3).map(|t| t.ticket).collect::<Vec<_>>(),
1770 neglected
1771 );
1772 assert_eq!(
1773 unrealized_value
1774 - neglected
1775 .into_iter()
1776 .map(|t| t.verified_ticket().amount)
1777 .sum::<HoprBalance>(),
1778 mgr.unrealized_value(channel.get_id(), None)?
1779 .ok_or(anyhow::anyhow!("must have unrealized value"))?
1780 );
1781
1782 assert_eq!(
1784 Some(RedemptionResult::Redeemed(tickets[4].ticket)),
1785 stream.try_next().await?
1786 );
1787
1788 assert_eq!(
1789 HoprBalance::zero(),
1790 mgr.unrealized_value(channel.get_id(), None)?
1791 .ok_or(anyhow::anyhow!("must have unrealized value"))?
1792 );
1793
1794 assert_eq!(None, stream.try_next().await?);
1795
1796 Ok(())
1797 }
1798
1799 #[tokio::test]
1800 async fn ticket_manager_ticket_neglection_during_on_chain_redemption_should_be_detected() -> anyhow::Result<()> {
1801 let mgr = std::sync::Arc::new(create_mgr()?);
1802
1803 let src = ChainKeypair::random();
1804 let dst = ChainKeypair::random();
1805
1806 let channel = ChannelEntry::builder()
1807 .between(&src, &dst)
1808 .amount(10_000_000_000_u64)
1809 .ticket_index(0)
1810 .status(ChannelStatus::Open)
1811 .epoch(1)
1812 .build()?;
1813
1814 let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1815 tickets.shuffle(&mut rand::rng());
1816
1817 for ticket in tickets.iter() {
1818 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1819 }
1820
1821 tickets.sort();
1822
1823 let connector =
1824 std::sync::Arc::new(create_test_connector(&dst, &channel, Some(std::time::Duration::from_secs(2))).await?);
1825
1826 let mgr_clone = mgr.clone();
1827 let jh = tokio::task::spawn(async move {
1828 let stream = mgr_clone.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1829 pin_mut!(stream);
1830 stream.try_next().await
1831 });
1832 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1833
1834 let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
1836 assert_eq!(neglected, tickets.iter().map(|t| t.ticket).collect::<Vec<_>>());
1837
1838 assert_eq!(
1839 ChannelStats {
1840 winning_tickets: tickets.len() as u128,
1841 unredeemed_value: HoprBalance::zero(),
1842 rejected_value: HoprBalance::zero(),
1843 redeemed_value: HoprBalance::zero(),
1844 neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
1845 },
1846 mgr.ticket_stats(Some(&channel.get_id()))?
1847 );
1848
1849 let res = jh.await??;
1851 assert_eq!(Some(RedemptionResult::Redeemed(tickets[0].ticket)), res);
1852
1853 assert_eq!(
1854 ChannelStats {
1855 winning_tickets: tickets.len() as u128,
1856 unredeemed_value: HoprBalance::zero(),
1857 rejected_value: HoprBalance::zero(),
1858 redeemed_value: tickets[0].verified_ticket().amount,
1859 neglected_value: neglected
1860 .iter()
1861 .map(|t| t.verified_ticket().amount)
1862 .sum::<HoprBalance>()
1863 - tickets[0].verified_ticket().amount,
1864 },
1865 mgr.ticket_stats(Some(&channel.get_id()))?
1866 );
1867
1868 Ok(())
1869 }
1870
1871 #[tokio::test]
1872 async fn ticket_manager_ticket_redemption_should_skip_low_value_tickets() -> anyhow::Result<()> {
1873 let mgr = create_mgr()?;
1874
1875 let src = ChainKeypair::random();
1876 let dst = ChainKeypair::random();
1877
1878 let channel = ChannelEntry::builder()
1879 .between(&src, &dst)
1880 .amount(10_000_000_000_u64)
1881 .ticket_index(0)
1882 .status(ChannelStatus::Open)
1883 .epoch(1)
1884 .build()?;
1885
1886 let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1887 tickets.shuffle(&mut rand::rng());
1888
1889 for ticket in tickets.iter() {
1890 assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1891 }
1892
1893 tickets.sort();
1894
1895 let unrealized_value = mgr
1896 .unrealized_value(channel.get_id(), None)?
1897 .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1898 assert_eq!(
1899 tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1900 unrealized_value
1901 );
1902
1903 let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1904
1905 let results = mgr
1906 .redeem_stream(
1907 connector.clone(),
1908 *channel.get_id(),
1909 Some(tickets[0].verified_ticket().amount + 1),
1910 )?
1911 .try_collect::<Vec<_>>()
1912 .await?;
1913
1914 assert_eq!(
1915 results,
1916 tickets
1917 .into_iter()
1918 .map(|t| RedemptionResult::ValueTooLow(t.ticket))
1919 .collect::<Vec<_>>()
1920 );
1921
1922 Ok(())
1923 }
1924}