hopr_ticket_manager/
lib.rs

1//! Implements the complete logic of ticket management in the HOPR protocol.
2//!
3//! See the [`HoprTicketManager`] documentation for complete details.
4
5mod backend;
6mod errors;
7mod traits;
8mod utils;
9
10use std::{convert::identity, sync::atomic::AtomicBool};
11
12use futures::{Stream, TryFutureExt};
13use hopr_api::{
14    chain::{ChainWriteTicketOperations, TicketRedeemError},
15    tickets::{ChannelStats, RedemptionResult},
16    types::{internal::prelude::*, primitive::prelude::*},
17};
18
19#[cfg(feature = "redb")]
20pub use crate::backend::{RedbStore, RedbTicketQueue};
21use crate::{
22    backend::ValueCachedQueue,
23    utils::{ChannelTicketQueue, OutgoingIndexCache},
24};
25pub use crate::{
26    backend::{MemoryStore, MemoryTicketQueue},
27    errors::TicketManagerError,
28    traits::{OutgoingIndexStore, TicketQueue, TicketQueueStore},
29};
30
31/// Keeps track of indices for outgoing tickets and (optionally) of incoming redeemable tickets.
32///
33/// The capabilities of the `HoprTicketManager` are given by the store `S`:
34/// - if the store implements [`OutgoingIndexStore`], the outgoing index tracking functions are available.
35/// - if the store implements [`TicketQueueStore`], the incoming redeemable ticket management is available.
36///
37/// It is possible to have both implementations in the same store object. Some store implementations may offer
38/// persistence, others may not. The `HoprTicketManager` takes ownership of the store object, and other
39/// processes should not attempt to change the store externally. For this reason, stores should not be cloneable.
40///
41/// The HOPR node type gives typical use-cases of the `HoprTicketManager`:
42///
43/// - Entry/Exit nodes only need to provide an `OutgoingIndexStore`, since they are dealing with outgoing tickets only.
44/// - Relay nodes need to provide a store which implements both `OutgoingIndexStore + TicketQueueStore`, because they
45///   need to deal with both outgoing tickets and incoming redeemable tickets.
46///
47/// To synchronize the on-chain state with the store, it is advised to call
48/// [`sync_from_outgoing_channels`](HoprTicketManager::sync_from_outgoing_channels) (and
49/// [`sync_from_incoming_channels`](HoprTicketManager::sync_from_incoming_channels) if applicable to the chosen store)
50/// early after the construction of the manager, to make sure outdated data is discarded early. This is typically
51/// done only once after construction and does not need to be repeated during the lifetime of the manager.
52///
53/// The manager is safe to be shared via an `Arc`. It is typically shared between the packet processing pipelines
54/// (outgoing on Entry/Exit nodes, incoming on Relay nodes) and some higher level component
55/// that performs redeemable ticket extractions (in the case of a Relay node).
56///
57/// ### Usage in outgoing packet pipeline
58/// The outgoing packet pipeline usually just calls
59/// [`next_multihop_ticket`](HoprTicketManager::next_multihop_ticket) to create a ticket for the next hop on a
60/// multi-hop path. To create zero/last-hop tickets, the ticket manager is not needed as these tickets essentially
61/// contain bogus data and there's no channel required.
62///
63/// The outgoing indices are **not** automatically synchronized back to the underlying store for performance reasons.
64/// The user is responsible for calling [`save_outgoing_indices`](HoprTicketManager::save_outgoing_indices) to save
65/// the outgoing indices to the store.
66///
67/// This usage is typical for all kinds of nodes (Entry/Relay/Exit).
68///
69/// ### Usage in incoming packet pipeline
70/// The incoming packet pipeline usually just calls
71/// [`insert_incoming_ticket`](HoprTicketManager::insert_incoming_ticket) whenever a new winning, redeemable ticket is
72/// received on an incoming channel.
73///
74/// This usage is typical for Relay nodes only.
75///
76/// ### Redeemable ticket extraction
77/// On Relay nodes, the manager maintains FIFO queues of redeemable tickets per incoming channel.
78/// There are two ways to extract tickets from the queue on a Relay:
79///
80/// 1. redeeming them via [`redeem_stream`](hopr_api::tickets::TicketManagement::redeem_stream)
81/// 2. neglecting them via [`neglect_tickets`](hopr_api::tickets::TicketManagement::neglect_tickets)
82///
83/// Both of these operations extract the tickets in the FIFO order from the queue,
84/// making sure that they are always processed in their natural order (by epoch and index).
85///
86/// Both ticket extraction operations are mutually exclusive and cannot be performed simultaneously.
87///
88/// ## Locking and lock-contention
89/// Several methods of the `HoprTicketManager` are expected to be called
90/// in highly performance-sensitive code, on a per-packet basis.
91///
92/// ### Outgoing ticket creation
93/// The [`next_multihop_ticket`](HoprTicketManager::next_multihop_ticket) method is designed to be
94/// high-performance and to be called per each outgoing packet. It uses only atomics to track the outgoing
95/// ticket index for a channel. The synchronization to the underlying storage is done on-demand by calling
96/// `save_outgoing_indices`, making quick snapshots of the current state of outgoing indices.
97/// No significant contention is expected unless `save_outgoing_indices` is called very frequently.
98///
99/// ### Incoming winning ticket retrieval
100/// The [`insert_incoming_ticket`](HoprTicketManager::insert_incoming_ticket) method is designed to be
101/// high-performance and to be called per each incoming packet **after** it has been forwarded to a next hop.
102///
103/// This operation acquires the write-part of an RW lock (per incoming channel).
104/// This may block the hot-path only if one of the following (also write) operations is performed:
105/// 1. Ticket redemption has just finished in that particular channel, and the redeemed ticket is dropped from the
106///    same incoming channel queue.
107/// 2. Ticket neglection has just finished in that particular channel, and the neglected ticket is dropped from the
108///    same incoming channel queue.
109///
110/// Both of these operations happen rarely, and the write lock is usually held only for a short time. In addition,
111/// incoming winning tickets are not expected to occur very often. Therefore, high contention on
112/// the write lock is not expected.
113///
114/// ### Incoming unacknowledged ticket verification
115/// The [`unrealized_value`](HoprTicketManager::unrealized_value) method is designed to be high-performance
116/// and to be called per each incoming packet **before** it is forwarded to a next hop.
117///
118/// This operation acquires the read-part of an RW lock (per incoming channel). This may block the hot-path only if
119/// one of the following (write) operations is performed at the same moment:
120/// 1. A new incoming winning ticket is inserted into the same incoming channel queue.
121/// 2. Ticket redemption has just finished in that particular channel, and the redeemed ticket is dropped from the
122///    same incoming channel queue.
123/// 3. Ticket neglection has just finished in that particular channel, and the neglected ticket is dropped from the
124///    same incoming channel queue.
125///
126/// None of these three operations is expected to happen very often on a single channel; therefore, high contention
127/// on the RW lock is not expected.
128#[derive(Debug)]
129pub struct HoprTicketManager<S, Q> {
130    out_idx_tracker: OutgoingIndexCache,
131    channel_tickets: dashmap::DashMap<ChannelId, ChannelTicketQueue<ValueCachedQueue<Q>>>,
132    store: std::sync::Arc<parking_lot::RwLock<S>>,
133}
134
135impl<S, Q> HoprTicketManager<S, Q>
136where
137    S: OutgoingIndexStore + Send + Sync + 'static,
138{
139    /// Creates a new ticket manager instance given the desired `store`.
140    ///
141    /// The instance is supposed to take complete ownership of the `store` object. The store
142    /// implementations should not allow cloning, so that the store cannot be changed externally.
143    ///
144    /// It is advised to call [`HoprTicketManager::sync_from_outgoing_channels`] and
145    /// [`HoprTicketManager::sync_from_incoming_channels`] at least once before the manager
146    /// is used any further.
147    pub fn new(store: S) -> Result<Self, TicketManagerError> {
148        let store = std::sync::Arc::new(parking_lot::RwLock::new(store));
149        Ok(Self {
150            out_idx_tracker: OutgoingIndexCache::default(),
151            channel_tickets: dashmap::DashMap::new(),
152            store,
153        })
154    }
155
156    /// Gets the next usable ticket index for an outgoing ticket in the given channel and epoch.
157    ///
158    /// This operation is fast and does not immediately put the index into the [`OutgoingIndexStore`].
159    ///
160    /// The returned value is always guaranteed to be greater than or equal to the ticket index on the given `channel`.
161    fn next_outgoing_ticket_index(&self, channel: &ChannelEntry) -> u64 {
162        let mut next_index = self.out_idx_tracker.next(channel.get_id(), channel.channel_epoch);
163        tracing::trace!(%channel, next_index, "next outgoing ticket index");
164
165        let epoch = channel.channel_epoch;
166
167        if next_index < channel.ticket_index {
168            // Correct the value in the cache if it was lower than the ticket index on the channel.
169            // This sets the value in the cache to the next index after the ticket index on the channel.
170            self.out_idx_tracker
171                .upsert(channel.get_id(), epoch, channel.ticket_index + 1);
172            next_index = channel.ticket_index; // Still, use the channel's ticket index as the next index.
173        }
174
175        // If this is the first index in this epoch,
176        // remove the previous epoch from the map if any.
177        // Epochs always start at 1, ticket indices at 0.
178        if next_index == 0 && epoch > 1 && self.out_idx_tracker.remove(channel.get_id(), epoch - 1) {
179            tracing::trace!(%channel, prev_epoch = epoch - 1, "removing previous epoch from outgoing index cache");
180        }
181
182        next_index
183    }
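/*
Illustration only, not part of this crate: a minimal, self-contained sketch of the index correction performed by
`next_outgoing_ticket_index` above. `IndexCounter` is a hypothetical single-channel stand-in for
`OutgoingIndexCache`. It assumes the cache hands out the current value and then advances it (post-increment),
which the code above suggests but does not show, and it uses `fetch_max` where the code above uses `upsert`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical single-channel counter (assumption: not the crate's real cache type).
pub struct IndexCounter(AtomicU64);

impl IndexCounter {
    pub fn new(start: u64) -> Self {
        Self(AtomicU64::new(start))
    }

    /// Returns the next ticket index, never lower than `on_chain_index`.
    pub fn next(&self, on_chain_index: u64) -> u64 {
        // Hand out the current value and advance the counter by one.
        let next = self.0.fetch_add(1, Ordering::Relaxed);
        if next < on_chain_index {
            // The counter lagged behind the channel: move it just past the
            // on-chain index and use the on-chain index for this ticket.
            self.0.fetch_max(on_chain_index + 1, Ordering::Relaxed);
            return on_chain_index;
        }
        next
    }
}
```

With a counter starting at 0 and an on-chain index of 0, two calls yield 0 and 1. If the on-chain index then
jumps to 10, the next two calls yield 10 and 11.
*/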
184
185    /// Saves outgoing ticket indices back to the store.
186    ///
187    /// The operation does nothing if there were no [new tickets created](HoprTicketManager::next_multihop_ticket)
188    /// on any tracked channel.
189    pub fn save_outgoing_indices(&self) -> Result<(), TicketManagerError> {
190        self.out_idx_tracker
191            .save(self.store.clone())
192            .map_err(TicketManagerError::store)?;
193        Ok(())
194    }
195
196    /// Synchronizes the outgoing index counters based on the current on-chain channel
197    /// state given by `outgoing_channels`.
198    ///
199    /// Outgoing indices for channels that are either not present in `outgoing_channels` or
200    /// not present there as open channels in the same epoch will be removed from the store.
201    ///
202    /// Outgoing indices for existing open channels in `outgoing_channels` will be either:
203    /// - added to the store with their current index and epoch (if not present in the store), or
204    /// - updated to the maximum of the two index values (if present in the store)
205    ///
206    /// It is advised to call this function early after the construction of the `HoprTicketManager`
207    /// to ensure pruning of dangling or out-of-date values.
208    pub fn sync_from_outgoing_channels(&self, outgoing_channels: &[ChannelEntry]) -> Result<(), TicketManagerError> {
209        let outgoing_channels: std::collections::HashSet<_, std::hash::RandomState> =
210            outgoing_channels.iter().collect();
211
212        // Purge outdated outgoing indices
213        let mut store_read = self.store.upgradable_read();
214        let stored_indices = store_read
215            .iter_outgoing_indices()
216            .map_err(TicketManagerError::store)?
217            .collect::<Vec<_>>();
218        for (channel_id, epoch) in stored_indices {
219            // If any stored outgoing index does not match any currently existing opened channel,
220            // remove it from the store
221            if !outgoing_channels.iter().any(|channel| {
222                channel.status == ChannelStatus::Open
223                    && channel.get_id() == &channel_id
224                    && channel.channel_epoch == epoch
225            }) {
226                let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
227                store_write
228                    .delete_outgoing_index(&channel_id, epoch)
229                    .map_err(TicketManagerError::store)?;
230                store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
231                tracing::debug!(%channel_id, epoch, "purging outdated outgoing index")
232            }
233        }
234
235        for channel in outgoing_channels
236            .iter()
237            .filter(|channel| channel.status == ChannelStatus::Open)
238        {
239            let id = channel.get_id();
240
241            // Either load a previously stored outgoing index or use the channel's ticket index as a
242            // fallback
243            let epoch = channel.channel_epoch;
244            let index = match store_read.load_outgoing_index(id, epoch) {
245                Ok(Some(out_index)) => out_index,
246                Ok(None) => 0,
247                Err(error) => {
248                    tracing::error!(%error, %id, "failed to load outgoing index for channel, falling back to channel ticket index");
249                    0
250                }
251            };
252
253            // Always use the maximum from the stored value and the current ticket index on the channel
254            let out_index = index.max(channel.ticket_index);
255            self.out_idx_tracker.upsert(id, epoch, out_index);
256            tracing::debug!(%id, epoch, out_index, "loaded outgoing ticket index for channel");
257        }
258
259        tracing::debug!(
260            num_channels = outgoing_channels.len(),
261            "synchronized with outgoing channels"
262        );
263        Ok(())
264    }
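/*
Illustration only, not part of this crate: a simplified sketch of the reconciliation rules documented for
`sync_from_outgoing_channels`. The `Channel` struct and the `HashMap` keyed by `(channel id, epoch)` are
hypothetical stand-ins. The real method keeps the purged store and the in-memory index cache separate,
while this sketch merges both into one map for brevity.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified channel record (assumption: not the crate's `ChannelEntry`).
#[derive(Clone, Copy)]
pub struct Channel {
    pub id: u32,
    pub epoch: u32,
    pub ticket_index: u64,
    pub open: bool,
}

/// Reconciles stored outgoing indices with the current channels.
pub fn sync_indices(stored: &mut HashMap<(u32, u32), u64>, channels: &[Channel]) {
    // Purge every stored index without a matching open channel in the same epoch.
    stored.retain(|&(id, epoch), _| {
        channels
            .iter()
            .any(|c| c.open && c.id == id && c.epoch == epoch)
    });

    // For each open channel, keep the larger of the stored and the on-chain index.
    for c in channels.iter().filter(|c| c.open) {
        let entry = stored.entry((c.id, c.epoch)).or_insert(0);
        *entry = (*entry).max(c.ticket_index);
    }
}
```

An entry for a closed channel or for a stale epoch disappears, an entry for an open channel is kept at the
maximum of both values, and an open channel without an entry starts at its on-chain ticket index.
*/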
265
266    /// Creates a ticket for the next hop on a multi-hop path.
267    ///
268    /// The `current_path_pos` indicates the position of the current hop in the multi-hop path.
269    /// It is used to determine the value of the ticket: `price * (current_path_pos - 1) / winning_prob`.
270    /// The function does not make sense for `current_path_pos <= 1` and returns an error if such an argument is
271    /// provided.
272    ///
273    /// For last-hop tickets (`current_path_pos` equal to 1), a [zero hop ticket](TicketBuilder::zero_hop) should be
274    /// created instead.
275    ///
276    /// The function will fail for channels that are not opened or do not have enough funds to cover the ticket value.
277    /// The ticket index of the returned ticket is guaranteed to be greater than or equal to the ticket index on the
278    /// given `channel` argument.
279    pub fn next_multihop_ticket(
280        &self,
281        channel: &ChannelEntry,
282        current_path_pos: u8,
283        winning_prob: WinningProbability,
284        ticket_price: HoprBalance,
285    ) -> Result<TicketBuilder, TicketManagerError> {
286        if current_path_pos <= 1 {
287            return Err(TicketManagerError::Other(anyhow::anyhow!(
288                "current path position for multihop ticket must be greater than 1"
289            )));
290        }
291
292        if channel.status != ChannelStatus::Open {
293            return Err(TicketManagerError::Other(anyhow::anyhow!(
294                "channel must be open to create a multihop ticket"
295            )));
296        }
297
298        // The next ticket is worth: price * remaining hop count / winning probability
299        let amount = HoprBalance::from(
300            ticket_price
301                .amount()
302                .saturating_mul(U256::from(current_path_pos - 1))
303                .div_f64(winning_prob.into())
304                .expect("winning probability is always less than or equal to 1"),
305        );
306
307        if channel.balance.lt(&amount) {
308            return Err(TicketManagerError::OutOfFunds(*channel.get_id(), amount));
309        }
310
311        let ticket_builder = TicketBuilder::default()
312            .counterparty(channel.destination)
313            .balance(amount)
314            .index(self.next_outgoing_ticket_index(channel))
315            .win_prob(winning_prob)
316            .channel_epoch(channel.channel_epoch);
317
318        Ok(ticket_builder)
319    }
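/*
Illustration only, not part of this crate: a worked sketch of the ticket value formula
`price * (current_path_pos - 1) / winning_prob` used by `next_multihop_ticket`. The function `ticket_amount`
is hypothetical. It uses plain `u128` and `f64` arithmetic instead of `HoprBalance`, `U256` and
`WinningProbability`, so it loses precision for very large amounts.

```rust
/// Returns the ticket value, or `None` when the position or probability is invalid.
pub fn ticket_amount(price: u128, current_path_pos: u8, winning_prob: f64) -> Option<u128> {
    // A multi-hop ticket needs at least one remaining hop and a probability in (0, 1].
    if current_path_pos <= 1 || !(winning_prob > 0.0 && winning_prob <= 1.0) {
        return None;
    }
    // price * remaining hop count
    let base = price.saturating_mul(u128::from(current_path_pos - 1));
    // Dividing by the winning probability scales the value up for rarer wins.
    Some((base as f64 / winning_prob) as u128)
}
```

For a price of 100, a path position of 3 and a winning probability of 0.5, the value is 100 * 2 / 0.5 = 400.
*/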
320}
321
322struct RedeemState<C, Q> {
323    lock: std::sync::Arc<AtomicBool>,
324    queue: std::sync::Arc<parking_lot::RwLock<Q>>,
325    chain: C,
326    min_redeem_value: HoprBalance,
327    channel_id: ChannelId,
328}
329
330impl<C, Q> Drop for RedeemState<C, Q> {
331    fn drop(&mut self) {
332        self.lock.store(false, std::sync::atomic::Ordering::Release);
333    }
334}
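/*
Illustration only, not part of this crate: a minimal sketch of the locking pattern that `RedeemState` relies
on. A flag is claimed with `compare_exchange` and released again in `Drop`, so the lock is freed even when the
owning value is dropped early. `RedeemGuard` and `try_acquire` are hypothetical names.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Hypothetical guard that owns the "redeeming" flag while it is alive.
pub struct RedeemGuard {
    lock: Arc<AtomicBool>,
}

impl RedeemGuard {
    /// Claims the flag, or returns `None` if another guard already holds it.
    pub fn try_acquire(lock: &Arc<AtomicBool>) -> Option<Self> {
        lock.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| Self { lock: lock.clone() })
    }
}

impl Drop for RedeemGuard {
    fn drop(&mut self) {
        // Release the flag so that a later caller can acquire it again.
        self.lock.store(false, Ordering::Release);
    }
}
```

While one guard is alive, a second `try_acquire` on the same flag returns `None`. Once the guard is dropped,
acquisition succeeds again.
*/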
335
336impl<S> HoprTicketManager<S, S::Queue>
337where
338    S: TicketQueueStore + Send + Sync + 'static,
339    S::Queue: Send + Sync + 'static,
340{
341    /// Synchronizes the existing incoming redeemable ticket queues with the state of the
342    /// current `incoming_channels`.
343    ///
344    /// Any incoming ticket queues that correspond to a channel that is no longer open or effectively open (in
345    /// `incoming_channels`) will be dropped and the tickets neglected.
346    ///
347    /// For all opened or effectively opened incoming channels inside `incoming_channels`, either an existing
348    /// ticket queue is opened or a new one is created (without any tickets in it).
349    ///
350    /// All the neglected tickets are returned from the function to make further accounting possible,
351    /// but they are no longer redeemable.
352    ///
353    /// It is advised to call this function early after the construction of the `HoprTicketManager`
354    /// to ensure pruning of dangling or out-of-date values.
355    pub fn sync_from_incoming_channels(
356        &self,
357        incoming_channels: &[ChannelEntry],
358    ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
359        let incoming_channels: std::collections::HashSet<_, std::hash::RandomState> =
360            incoming_channels.iter().collect();
361
362        // Purge outdated incoming channel queues
363        let mut store_read = self.store.upgradable_read();
364        let stored_queues = store_read
365            .iter_queues()
366            .map_err(TicketManagerError::store)?
367            .collect::<Vec<_>>();
368        let mut neglected = Vec::new();
369        let now = hopr_platform::time::current_time();
370        for channel_id in stored_queues {
371            // If any existing redeemable ticket queue does not match any currently existing
372            // channel that's either open or its closure period did not yet elapse (i.e., the channel
373            // is not closed or not effectively closed), remove the queue from the store.
374            if !incoming_channels
375                .iter()
376                .any(|channel| !channel.closure_time_passed(now) && channel.get_id() == &channel_id)
377            {
378                let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
379                neglected.extend(
380                    store_write
381                        .delete_queue(&channel_id)
382                        .map_err(TicketManagerError::store)?,
383                );
384                tracing::debug!(%channel_id, "purged outdated incoming tickets queue");
385                store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
386
387                // We cannot account the neglected tickets, because the channel has been closed.
388                self.channel_tickets.remove(&channel_id);
389            }
390        }
391        // Create or open ticket queues for all incoming channels that are open or effectively open
392        for channel in incoming_channels
393            .iter()
394            .filter(|channel| !channel.closure_time_passed(now))
395        {
396            let id = channel.get_id();
397
398            // Either open an existing queue for that channel or create a new one
399            let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
400            let queue = store_write
401                .open_or_create_queue(id)
402                .map_err(TicketManagerError::store)?;
403            store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
404
405            // Wrap the queue with a ticket value cache adapter
406            let queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
407
408            tracing::debug!(%id, num_tickets = queue.len().map_err(TicketManagerError::store)?, "loaded redeemable ticket queue for channel");
409            self.channel_tickets.insert(*id, queue.into());
410        }
411
412        tracing::debug!(
413            num_channels = incoming_channels.len(),
414            num_neglected = neglected.len(),
415            "synchronized with incoming channels"
416        );
417        Ok(neglected)
418    }
419
420    /// Inserts a new incoming winning redeemable ticket into the ticket manager.
421    ///
422    /// On success, the method returns all tickets that have been neglected in the ticket queue of this channel,
423    /// in case the inserted ticket has a greater channel epoch than the [next extractable](TicketQueue::peek) ticket in
424    /// the queue. This situation can happen when unredeemed tickets are left in the queue, while the corresponding
425    /// channel restarts its lifecycle and a new winning ticket is received.
426    /// Otherwise, the returned vector is empty.
427    pub fn insert_incoming_ticket(&self, ticket: RedeemableTicket) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
428        // Do not allocate, because neglecting tickets is a rare operation
429        let mut neglected_tickets = Vec::with_capacity(0);
430
431        let ticket_id = ticket.ticket_id();
432        match self.channel_tickets.entry(ticket_id.id) {
433            dashmap::Entry::Occupied(e) => {
434                // High contention on this write lock is possible only when massive numbers of winning tickets
435                // on the same channel are received, or if tickets on the same channel are being
436                // rapidly redeemed or neglected.
437                // Such a scenario is likely not realistic.
438                let mut queue = e.get().queue.write();
439
440                // If the next ticket ready in this queue is from a previous epoch, we must
441                // drain and neglect all the tickets from the queue. The channel has
442                // apparently restarted its lifecycle, and all the tickets from previous epochs
443                // are unredeemable already
444                if let Some(last_ticket) = queue.0.peek().map_err(TicketManagerError::store)? {
445                    if last_ticket.verified_ticket().channel_epoch < ticket.verified_ticket().channel_epoch {
446                        // Count the neglected value and add it to stats
447                        let mut neg = queue.0.drain().map_err(TicketManagerError::store)?;
448                        queue.1.neglected_value += neg.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>();
449
450                        // Ensures allocation according to the number of drained tickets
451                        neglected_tickets.append(&mut neg);
452                        tracing::warn!(%ticket_id, num_neglected = neglected_tickets.len(), "winning ticket has neglected unredeemed tickets from previous epochs");
453                    } else if last_ticket.verified_ticket().channel_epoch > ticket.verified_ticket().channel_epoch {
454                        tracing::warn!(%ticket_id, "tried to insert incoming ticket from an older epoch");
455
456                        queue.1.winning_tickets += 1; // Still count the ticket as winning
457                        queue.1.neglected_value += ticket.verified_ticket().amount;
458                        neglected_tickets.push(ticket.ticket);
459                        return Ok(neglected_tickets);
460                    }
461                }
462                queue.0.push(ticket).map_err(TicketManagerError::store)?;
463                queue.1.winning_tickets += 1;
464
465                tracing::debug!(%ticket_id, "winning ticket on channel");
466            }
467            dashmap::Entry::Vacant(v) => {
468                // A hypothetical chance of high contention on this write lock is
469                // only possible when massive numbers of winning tickets on new unique channels are received.
470                // Such a scenario is likely not realistic.
471                let mut store = self.store.write();
472
473                let queue = store
474                    .open_or_create_queue(&ticket.ticket_id().id)
475                    .map_err(TicketManagerError::store)?;
476
477                // Wrap the queue with a ticket value cache adapter
478                let mut queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
479
480                // Should not happen: it suggests the queue has been modified outside the manager
481                if !queue.is_empty().map_err(TicketManagerError::store)? {
482                    return Err(TicketManagerError::Other(anyhow::anyhow!(
483                        "fatal error: queue not empty"
484                    )));
485                }
486
487                queue.push(ticket).map_err(TicketManagerError::store)?;
488                v.insert(queue.into()); // The ticket is accounted for in the stats automatically
489                tracing::debug!(%ticket_id, "first winning ticket on channel");
490            }
491        }
492
493        Ok(neglected_tickets)
494    }
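/*
Illustration only, not part of this crate: a simplified sketch of the epoch handling in
`insert_incoming_ticket`. The `Ticket` struct and the `VecDeque` are hypothetical stand-ins for
`RedeemableTicket` and the per-channel `TicketQueue`. Statistics and store errors are left out.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified ticket (assumption: not the crate's ticket type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Ticket {
    pub epoch: u32,
    pub amount: u64,
}

/// Inserts `ticket` and returns the tickets that were neglected as a result.
pub fn insert_ticket(queue: &mut VecDeque<Ticket>, ticket: Ticket) -> Vec<Ticket> {
    match queue.front().map(|t| t.epoch) {
        // The queue holds tickets from an older epoch: drain them all,
        // then accept the new ticket.
        Some(front) if front < ticket.epoch => {
            let neglected: Vec<Ticket> = queue.drain(..).collect();
            queue.push_back(ticket);
            neglected
        }
        // The new ticket is from an older epoch: neglect it, keep the queue.
        Some(front) if front > ticket.epoch => vec![ticket],
        // Same epoch or empty queue: append the ticket.
        _ => {
            queue.push_back(ticket);
            Vec::new()
        }
    }
}
```

A ticket from a newer epoch therefore flushes the whole queue, while a ticket from an older epoch is returned
as neglected and the queue stays untouched.
*/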
495
496    /// Returns the total value of unredeemed tickets in the given channel and its latest epoch.
497    ///
498    /// NOTE: The function is less efficient when `min_index` is specified, as
499    /// a full scan of the queue is required to calculate the unrealized value.
500    pub fn unrealized_value(
501        &self,
502        channel_id: &ChannelId,
503        min_index: Option<u64>,
504    ) -> Result<Option<HoprBalance>, TicketManagerError> {
505        if let Some(ticket_queue) = self.channel_tickets.get(channel_id) {
506            // There is low contention on this read lock, because write locks are acquired only
507            // when a new winning ticket has been added, redeemed or neglected, all of which are fairly rare operations.
508            let queue = ticket_queue.queue.read();
509
510            // Get the epoch of the first extractable ticket in the queue.
511            // The ticket insertion takes care that there are no tickets
512            // with epochs other than the current epoch.
513            if let Some(epoch) = queue
514                .0
515                .peek()
516                .map_err(TicketManagerError::store)?
517                .map(|t| t.verified_ticket().channel_epoch)
518            {
519                Ok(Some(
520                    queue
521                        .0
522                        .total_value(epoch, min_index)
523                        .map_err(TicketManagerError::store)?,
524                ))
525            } else {
526                Ok(Some(HoprBalance::zero()))
527            }
528        } else {
529            Ok(None)
530        }
531    }
532}
533impl<S> hopr_api::tickets::TicketManagement for HoprTicketManager<S, S::Queue>
534where
535    S: TicketQueueStore + Send + Sync + 'static,
536    S::Queue: Send + Sync + 'static,
537{
538    type Error = TicketManagerError;
539
540    /// Creates a stream that redeems tickets in-order one by one in the given channel,
541    /// using the given [`ChainWriteTicketOperations`] on-chain client
542    /// implementation.
543    ///
544    /// If `min_amount` is given, all the tickets whose value is lower than the given amount are neglected in the
545    /// process.
546    ///
547    /// If there's already an existing redeem stream for the channel, an error is returned without creating a new
548    /// stream.
549    ///
550    /// The stream terminates when there are no more tickets to process in the queue, or an error is encountered.
551    fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
552        &self,
553        chain: C,
554        channel_id: ChannelId,
555        min_amount: Option<HoprBalance>,
556    ) -> Result<impl Stream<Item = Result<RedemptionResult, Self::Error>> + Send, Self::Error> {
557        let initial_state = match self.channel_tickets.get(&channel_id) {
558            Some(ticket_queue) => {
559                ticket_queue
560                    .redeem_lock
561                    .compare_exchange(
562                        false,
563                        true,
564                        std::sync::atomic::Ordering::Acquire,
565                        std::sync::atomic::Ordering::Relaxed,
566                    )
567                    .map_err(|_| TicketManagerError::AlreadyRedeeming)?;
568
569                RedeemState {
570                    lock: ticket_queue.redeem_lock.clone(),
571                    queue: ticket_queue.queue.clone(),
572                    min_redeem_value: min_amount.unwrap_or_default(), // default min is 0 wxHOPR
573                    chain,
574                    channel_id,
575                }
576            }
577            None => return Err(TicketManagerError::ChannelQueueNotFound),
578        };
579
580        Ok(futures::stream::try_unfold(initial_state, |state| {
581            // Peek here and release the read lock to prevent holding it across an `await`
582            let next_ticket = state.queue.read().0.peek();
583            async move {
584                match next_ticket.map_err(TicketManagerError::store)? {
585                    Some(ticket_to_redeem) => {
586                        // Attempt to redeem the ticket if it is of sufficient value
587                        let redeem_attempt_result =
588                            if ticket_to_redeem.verified_ticket().amount >= state.min_redeem_value {
589                                match state.chain.redeem_ticket(ticket_to_redeem).and_then(identity).await {
590                                    Ok((redeemed_ticket, _)) => Ok(Some(RedemptionResult::Redeemed(redeemed_ticket))),
591                                    Err(TicketRedeemError::Rejected(ticket, reason)) => {
592                                        Ok(Some(RedemptionResult::RejectedOnChain(ticket, reason)))
593                                    }
594                                    Err(TicketRedeemError::ProcessingError(_, err)) => {
595                                        Err(TicketManagerError::redeem(err))
596                                    }
597                                }
598                            } else {
599                                // Tickets with low value are treated as neglected
600                                Ok(Some(RedemptionResult::ValueTooLow(ticket_to_redeem.ticket)))
601                            };
602
603                        // Once the redemption has been completed, no matter if successful or not,
604                        // check if we need to remove the ticket from the redemption queue.
605                        if let Ok(Some(redeem_complete_result)) = &redeem_attempt_result {
606                            // In this case, no matter if the ticket has been redeemed,
607                            // neglected or rejected, we're still removing it from the queue.
608                            // Otherwise, the ticket stays in the queue due to a recoverable error
609
610                            let mut queue_write = state.queue.write();
611                            let pop_res = queue_write.0.pop().map_err(TicketManagerError::store)?;
612
613                            // Do accounting of the ticket into the stats
614                            match redeem_complete_result {
615                                RedemptionResult::Redeemed(ticket) => {
616                                    queue_write.1.redeemed_value += ticket.verified_ticket().amount;
617                                    tracing::info!(%ticket, "ticket has been redeemed");
618                                },
619                                RedemptionResult::ValueTooLow(ticket) => {
620                                    queue_write.1.neglected_value += ticket.verified_ticket().amount;
621                                    tracing::warn!(%ticket, "ticket has been neglected");
622                                },
623                                RedemptionResult::RejectedOnChain(ticket, reason) => {
624                                    queue_write.1.rejected_value += ticket.verified_ticket().amount;
625                                    tracing::warn!(%ticket, reason, "ticket has been rejected on-chain");
626                                },
627                            }
628
629                            // `pop` returns `None` only if `neglect_tickets` was called while redeeming
630                            // and has already removed this ticket from the queue during that race.
631                            // In this case, only the neglected value needs to be corrected, because
632                            // the ticket has either actually been redeemed/rejected, or it has been
633                            // accounted as neglected twice.
634                            if pop_res.is_none() {
635                                let ticket = redeem_complete_result.as_ref();
636                                tracing::warn!(%ticket, "ticket has been neglected from the queue while it actually completed the redemption process");
637                                queue_write.1.neglected_value -= ticket.verified_ticket().amount;
638                            }
639                        }
640
641                        redeem_attempt_result
642                    }
643                    None => {
644                        // No more tickets to redeem in this channel
645                        // Keep the queue in place even if it is empty. The cleanup is done only on startup.
646                        tracing::debug!(channel_id = %state.channel_id, "no more tickets to redeem in channel");
647                        Ok(None)
648                    }
649                }
650                .map(|s| s.map(|v| (v, state)))
651            }
652        }))
653    }
654
655    /// Removes all the tickets in the given [`ChannelId`], optionally only up to the given ticket index (inclusive).
656    ///
657    /// If `up_to_index` is given and is lower than the lowest index of an unredeemed ticket in the queue,
658    /// the function does nothing.
659    ///
660    /// If a ticket redemption is ongoing in the same channel and the neglected range intersects the range
661    /// being redeemed, the redemption will be cut short, with the remaining unredeemed tickets neglected.
662    fn neglect_tickets(
663        &self,
664        channel_id: &ChannelId,
665        up_to_index: Option<u64>,
666    ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
667        let queue = self
668            .channel_tickets
669            .get(channel_id)
670            .map(|q| {
671                if q.redeem_lock.load(std::sync::atomic::Ordering::Relaxed) {
672                    tracing::warn!(%channel_id, "neglecting tickets in channel while redeeming is ongoing");
673                }
674                q.queue.clone()
675            })
676            .ok_or(TicketManagerError::ChannelQueueNotFound)?;
677
678        let mut neglected_tickets = Vec::new();
679        let mut queue_read = queue.upgradable_read();
680        let max_index = up_to_index.unwrap_or(TicketBuilder::MAX_TICKET_INDEX);
681
682        while queue_read
683            .0
684            .peek()
685            .map_err(TicketManagerError::store)?
686            .filter(|ticket| ticket.verified_ticket().index <= max_index)
687            .is_some()
688        {
689            // Pop quickly, then downgrade the lock right away so that readers are not blocked
690            let mut queue_write = parking_lot::RwLockUpgradableReadGuard::upgrade(queue_read);
691            let maybe_ticket = queue_write.0.pop().map_err(TicketManagerError::store)?;
692            queue_write.1.neglected_value += maybe_ticket.map(|t| t.verified_ticket().amount).unwrap_or_default();
693            queue_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(queue_write);
694
695            neglected_tickets.extend(maybe_ticket.map(|t| t.ticket));
696            tracing::debug!(%channel_id, ?maybe_ticket, "neglected ticket in channel");
697        }
698
699        // Keep the queue in place even if it is empty. The cleanup is done only on startup.
700        tracing::debug!(%channel_id, num_tickets = neglected_tickets.len(), "ticket neglection done in channel");
701        Ok(neglected_tickets)
702    }
703
704    /// Computes [statistics](ChannelStats) for the given `channel` or for all channels if `None` is given.
705    ///
706    /// If the given `channel` does not exist, it returns zero statistics instead of an error.
707    ///
708    /// Apart from [`unredeemed_value`](ChannelStats), the statistics are not persistent.
709    #[allow(deprecated)] // TODO: remove once blokli#237 is merged
710    fn ticket_stats(&self, channel: Option<&ChannelId>) -> Result<ChannelStats, TicketManagerError> {
711        self.channel_tickets
712            .iter()
713            .filter(|e| channel.is_none_or(|c| c == e.key()))
714            .try_fold(ChannelStats::default(), |stats, v| {
715                let queue = v.queue.read();
716                Ok::<_, TicketManagerError>(ChannelStats {
717                    winning_tickets: queue.1.winning_tickets + stats.winning_tickets,
718                    unredeemed_value: queue
719                        .0
720                        .peek()
721                        .map_err(TicketManagerError::store)?
722                        .map(|t| queue.0.total_value(t.verified_ticket().channel_epoch, None))
723                        .transpose()
724                        .map_err(TicketManagerError::store)?
725                        .unwrap_or_default()
726                        + stats.unredeemed_value,
727                    rejected_value: queue.1.rejected_value + stats.rejected_value,
728                    redeemed_value: queue.1.redeemed_value + stats.redeemed_value,
729                    neglected_value: queue.1.neglected_value + stats.neglected_value,
730                })
731            })
732    }
733}
734
735#[allow(deprecated)] // TODO: remove once blokli#237 is merged
736#[cfg(test)]
737mod tests {
738    use std::ops::Sub;
739
740    use futures::{TryStreamExt, pin_mut};
741    use hopr_api::{
742        OffchainKeypair,
743        tickets::TicketManagement,
744        types::crypto::prelude::{ChainKeypair, Keypair},
745    };
746    use hopr_chain_connector::{
747        BlockchainConnectorConfig, HoprBlockchainConnector, InMemoryBackend, PayloadGenerator, SafePayloadGenerator,
748        reexports::chain::contract_addresses_for_network,
749        testing::{BlokliTestClient, BlokliTestStateBuilder, FullStateEmulator},
750    };
751    use rand::prelude::SliceRandom;
752
753    use super::*;
754    use crate::traits::tests::{generate_owned_tickets, generate_tickets};
755
756    fn create_mgr() -> anyhow::Result<HoprTicketManager<MemoryStore, MemoryTicketQueue>> {
757        Ok(HoprTicketManager::new(MemoryStore::default())?)
758    }
759
760    #[test]
761    fn ticket_manager_non_existing_channel_should_return_empty_stats() -> anyhow::Result<()> {
762        let mgr = create_mgr()?;
763
764        assert_eq!(ChannelStats::default(), mgr.ticket_stats(None)?);
765        assert_eq!(ChannelStats::default(), mgr.ticket_stats(Some(&ChannelId::default()))?);
766        Ok(())
767    }
768
769    #[test]
770    fn ticket_manager_should_create_multihop_tickets() -> anyhow::Result<()> {
771        let mgr = create_mgr()?;
772
773        let src = ChainKeypair::random();
774        let dst = ChainKeypair::random();
775
776        let channel = ChannelEntry::builder()
777            .between(&src, &dst)
778            .amount(10)
779            .ticket_index(1)
780            .status(ChannelStatus::Open)
781            .epoch(1)
782            .build()?;
783
784        // Loads index 1, which is the next index for a ticket on this channel
785        mgr.sync_from_outgoing_channels(&[channel])?;
786
787        let ticket_1 = mgr
788            .next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?
789            .eth_challenge(Default::default())
790            .build_signed(&src, &Default::default())?;
791
792        assert_eq!(ticket_1.channel_id(), channel.get_id());
793        assert_eq!(channel.ticket_index, ticket_1.verified_ticket().index);
794        assert_eq!(channel.channel_epoch, ticket_1.verified_ticket().channel_epoch);
795
796        let ticket_2 = mgr
797            .next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?
798            .eth_challenge(Default::default())
799            .build_signed(&src, &Default::default())?;
800
801        assert_eq!(ticket_2.channel_id(), channel.get_id());
802        assert_eq!(channel.ticket_index + 1, ticket_2.verified_ticket().index);
803        assert_eq!(channel.channel_epoch, ticket_2.verified_ticket().channel_epoch);
804
805        Ok(())
806    }
807
808    #[test]
809    fn ticket_manager_should_update_state_when_winning_tickets_are_inserted() -> anyhow::Result<()> {
810        let mgr = create_mgr()?;
811
812        let src = ChainKeypair::random();
813        let dst = ChainKeypair::random();
814
815        let channel = ChannelEntry::builder()
816            .between(&src, &dst)
817            .amount(10)
818            .ticket_index(1)
819            .status(ChannelStatus::Open)
820            .epoch(1)
821            .build()?;
822
823        let tickets = generate_owned_tickets(&src, &dst, 2, 1..=1)?;
824
825        mgr.insert_incoming_ticket(tickets[0])?;
826
827        assert_eq!(
828            ChannelStats {
829                winning_tickets: 1,
830                unredeemed_value: tickets[0].verified_ticket().amount,
831                rejected_value: HoprBalance::zero(),
832                redeemed_value: HoprBalance::zero(),
833                neglected_value: HoprBalance::zero(),
834            },
835            mgr.ticket_stats(Some(&channel.get_id()))?
836        );
837
838        mgr.insert_incoming_ticket(tickets[1])?;
839
840        assert_eq!(
841            ChannelStats {
842                winning_tickets: 2,
843                unredeemed_value: tickets[0].verified_ticket().amount + tickets[1].verified_ticket().amount,
844                rejected_value: HoprBalance::zero(),
845                redeemed_value: HoprBalance::zero(),
846                neglected_value: HoprBalance::zero(),
847            },
848            mgr.ticket_stats(Some(&channel.get_id()))?
849        );
850
851        Ok(())
852    }
853
854    #[test]
855    fn ticket_manager_create_multihop_ticket_should_fail_on_wrong_input() -> anyhow::Result<()> {
856        let mgr = create_mgr()?;
857
858        let src = ChainKeypair::random();
859        let dst = ChainKeypair::random();
860
861        let mut channel = ChannelEntry::builder()
862            .between(&src, &dst)
863            .amount(10)
864            .ticket_index(1)
865            .status(ChannelStatus::Closed)
866            .epoch(1)
867            .build()?;
868
869        assert!(
870            mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 1.into())
871                .is_err()
872        );
873
874        channel.status =
875            ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_secs(10));
876
877        assert!(
878            mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 1.into())
879                .is_err()
880        );
881
882        channel.status = ChannelStatus::Open;
883
884        assert!(
885            mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 11.into())
886                .is_err()
887        );
888
889        assert!(
890            mgr.next_multihop_ticket(&channel, 1, WinningProbability::ALWAYS, 1.into())
891                .is_err()
892        );
893
894        Ok(())
895    }
896
897    #[test]
898    fn ticket_manager_test_next_outgoing_ticket_index() -> anyhow::Result<()> {
899        let mgr = create_mgr()?;
900
901        let src = ChainKeypair::random();
902        let dst = ChainKeypair::random();
903
904        let mut channel = ChannelEntry::builder()
905            .between(&src, &dst)
906            .amount(10)
907            .ticket_index(0)
908            .status(ChannelStatus::Open)
909            .epoch(1)
910            .build()?;
911
912        assert_eq!(0, mgr.next_outgoing_ticket_index(&channel));
913
914        channel.ticket_index = 10;
915        assert_eq!(10, mgr.next_outgoing_ticket_index(&channel));
916        assert_eq!(11, mgr.next_outgoing_ticket_index(&channel));
917
918        channel.ticket_index = 100;
919        assert_eq!(100, mgr.next_outgoing_ticket_index(&channel));
920        assert_eq!(101, mgr.next_outgoing_ticket_index(&channel));
921
922        channel.ticket_index = 50;
923        assert_eq!(102, mgr.next_outgoing_ticket_index(&channel));
924        assert_eq!(103, mgr.next_outgoing_ticket_index(&channel));
925
926        mgr.save_outgoing_indices()?;
927        assert_eq!(Some(104), mgr.store.read().load_outgoing_index(channel.get_id(), 1)?);
928
929        channel.ticket_index = 0;
930        channel.channel_epoch = 2;
931
932        assert_eq!(0, mgr.next_outgoing_ticket_index(&channel));
933        mgr.save_outgoing_indices()?;
934
935        assert_eq!(None, mgr.store.read().load_outgoing_index(channel.get_id(), 1)?);
936        assert_eq!(Some(1), mgr.store.read().load_outgoing_index(channel.get_id(), 2)?);
937
938        assert_eq!(1, mgr.next_outgoing_ticket_index(&channel));
939
940        Ok(())
941    }
942
943    #[test]
944    fn ticket_manager_should_save_out_indices_to_the_store_on_demand() -> anyhow::Result<()> {
945        let mgr = create_mgr()?;
946
947        let src = ChainKeypair::random();
948        let dst = ChainKeypair::random();
949
950        let channel = ChannelEntry::builder()
951            .between(&src, &dst)
952            .amount(10)
953            .ticket_index(1)
954            .status(ChannelStatus::Open)
955            .epoch(1)
956            .build()?;
957
958        // Loads index 1, which is the next index for a ticket on this channel
959        mgr.sync_from_outgoing_channels(&[channel])?;
960
961        mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?;
962
963        // Without saving, the index should not be present in the store
964        let saved_index = mgr.store.read().load_outgoing_index(channel.get_id(), 1)?;
965        assert_eq!(None, saved_index);
966
967        mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?;
968
969        mgr.save_outgoing_indices()?;
970        let saved_index = mgr.store.read().load_outgoing_index(channel.get_id(), 1)?;
971        assert_eq!(Some(3), saved_index);
972
973        mgr.next_multihop_ticket(&channel, 2, WinningProbability::ALWAYS, 10.into())?;
974
975        let saved_index = mgr.store.read().load_outgoing_index(channel.get_id(), 1)?;
976        assert_eq!(Some(3), saved_index);
977
978        mgr.save_outgoing_indices()?;
979        let saved_index = mgr.store.read().load_outgoing_index(channel.get_id(), 1)?;
980        assert_eq!(Some(4), saved_index);
981
982        Ok(())
983    }
984
985    #[test]
986    fn ticket_manager_should_sync_out_indices_from_chain_state() -> anyhow::Result<()> {
987        let mgr = create_mgr()?;
988
989        let src = ChainKeypair::random();
990        let dst = ChainKeypair::random();
991
992        let channel = ChannelEntry::builder()
993            .between(&src, &dst)
994            .amount(10)
995            .ticket_index(1)
996            .status(ChannelStatus::Open)
997            .epoch(1)
998            .build()?;
999
1000        mgr.sync_from_outgoing_channels(&[channel])?;
1001        mgr.save_outgoing_indices()?;
1002
1003        let saved_index = mgr.store.read().load_outgoing_index(channel.get_id(), 1)?;
1004        assert_eq!(Some(1), saved_index);
1005
1006        Ok(())
1007    }
1008
1009    #[test_log::test]
1010    fn ticket_manager_should_sync_out_indices_should_remove_indices_for_non_opened_outgoing_channels()
1011    -> anyhow::Result<()> {
1012        let mgr = create_mgr()?;
1013
1014        let src = ChainKeypair::random();
1015        let dst = ChainKeypair::random();
1016
1017        let mut channel_1 = ChannelEntry::builder()
1018            .between(&src, &dst)
1019            .amount(10)
1020            .ticket_index(0)
1021            .status(ChannelStatus::Open)
1022            .epoch(1)
1023            .build()?;
1024
1025        let mut channel_2 = ChannelEntry::builder()
1026            .between(&dst, &src)
1027            .amount(10)
1028            .ticket_index(0)
1029            .status(ChannelStatus::Open)
1030            .epoch(1)
1031            .build()?;
1032
1033        let ticket_1 = mgr
1034            .next_multihop_ticket(&channel_1, 2, WinningProbability::ALWAYS, 10.into())?
1035            .eth_challenge(Default::default())
1036            .build()?;
1037        let ticket_2 = mgr
1038            .next_multihop_ticket(&channel_2, 2, WinningProbability::ALWAYS, 10.into())?
1039            .eth_challenge(Default::default())
1040            .build()?;
1041        assert_eq!(0, ticket_1.index);
1042        assert_eq!(0, ticket_2.index);
1043
1044        mgr.save_outgoing_indices()?;
1045
1046        assert_eq!(Some(1), mgr.store.read().load_outgoing_index(channel_1.get_id(), 1)?);
1047        assert_eq!(Some(1), mgr.store.read().load_outgoing_index(channel_2.get_id(), 1)?);
1048
1049        channel_1.status = ChannelStatus::Closed;
1050        channel_2.status =
1051            ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_mins(10));
1052
1053        mgr.sync_from_outgoing_channels(&[channel_1, channel_2])?;
1054
1055        assert_eq!(None, mgr.store.read().load_outgoing_index(channel_1.get_id(), 1)?);
1056        assert_eq!(None, mgr.store.read().load_outgoing_index(channel_2.get_id(), 1)?);
1057
1058        Ok(())
1059    }
1060
1061    #[test]
1062    fn ticket_manager_should_sync_incoming_channels_from_chain_state() -> anyhow::Result<()> {
1063        let mgr = create_mgr()?;
1064
1065        let src = ChainKeypair::random();
1066        let dst = ChainKeypair::random();
1067
1068        let channel = ChannelEntry::builder()
1069            .between(&src, &dst)
1070            .amount(10)
1071            .ticket_index(1)
1072            .status(ChannelStatus::Open)
1073            .epoch(1)
1074            .build()?;
1075
1076        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
1077        assert!(neglected.is_empty());
1078
1079        let queues = mgr.store.read().iter_queues()?.collect::<Vec<_>>();
1080        assert_eq!(vec![*channel.get_id()], queues);
1081
1082        Ok(())
1083    }
1084
1085    #[test]
1086    fn ticket_manager_should_neglect_tickets_from_closed_channels_on_sync() -> anyhow::Result<()> {
1087        let mgr = create_mgr()?;
1088
1089        let tickets = generate_tickets()?;
1090        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
1091        assert!(neglected.is_empty());
1092
1093        let channel = ChannelEntry::builder()
1094            .between(
1095                *tickets[0].ticket.verified_issuer(),
1096                tickets[0].verified_ticket().counterparty,
1097            )
1098            .amount(10)
1099            .ticket_index(1)
1100            .status(ChannelStatus::Closed)
1101            .epoch(1)
1102            .build()?;
1103
1104        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
1105        assert_eq!(1, neglected.len());
1106        assert_eq!(tickets[0].ticket, neglected[0]);
1107
1108        Ok(())
1109    }
1110
1111    #[test]
1112    fn ticket_manager_should_neglect_tickets_from_effectively_closed_channels_on_sync() -> anyhow::Result<()> {
1113        let mgr = create_mgr()?;
1114
1115        let tickets = generate_tickets()?;
1116        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
1117        assert!(neglected.is_empty());
1118
1119        let channel = ChannelEntry::builder()
1120            .between(
1121                *tickets[0].ticket.verified_issuer(),
1122                tickets[0].verified_ticket().counterparty,
1123            )
1124            .amount(10)
1125            .ticket_index(1)
1126            .status(ChannelStatus::PendingToClose(
1127                std::time::SystemTime::now().sub(std::time::Duration::from_mins(10)),
1128            ))
1129            .epoch(1)
1130            .build()?;
1131
1132        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
1133        assert_eq!(1, neglected.len());
1134        assert_eq!(tickets[0].ticket, neglected[0]);
1135
1136        Ok(())
1137    }
1138
1139    #[test]
1140    fn ticket_manager_should_neglect_tickets_from_non_existent_channels_on_sync() -> anyhow::Result<()> {
1141        let mgr = create_mgr()?;
1142
1143        let tickets = generate_tickets()?;
1144
1145        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
1146        assert!(neglected.is_empty());
1147
1148        let neglected = mgr.sync_from_incoming_channels(&[])?;
1149        assert_eq!(1, neglected.len());
1150        assert_eq!(tickets[0].ticket, neglected[0]);
1151
1152        Ok(())
1153    }
1154
1155    #[test]
1156    fn ticket_manager_should_neglect_tickets_on_demand() -> anyhow::Result<()> {
1157        let mgr = create_mgr()?;
1158
1159        let tickets = generate_tickets()?;
1160        let epoch = tickets[0].ticket_id().epoch;
1161
1162        let tickets = tickets
1163            .into_iter()
1164            .filter(|t| t.verified_ticket().channel_epoch == epoch)
1165            .collect::<Vec<_>>();
1166
1167        let channel = ChannelEntry::builder()
1168            .between(
1169                *tickets[0].ticket.verified_issuer(),
1170                tickets[0].verified_ticket().counterparty,
1171            )
1172            .amount(10)
1173            .ticket_index(tickets.len() as u64)
1174            .status(ChannelStatus::Open)
1175            .epoch(1)
1176            .build()?;
1177
1178        for ticket in tickets.iter() {
1179            let neglected = mgr.insert_incoming_ticket(*ticket)?;
1180            assert!(neglected.is_empty());
1181        }
1182
1183        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
1184        assert!(neglected.is_empty());
1185
1186        let unrealized_value = mgr
1187            .unrealized_value(channel.get_id(), None)?
1188            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1189        assert_eq!(
1190            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1191            unrealized_value
1192        );
1193
1194        let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
1195        assert_eq!(tickets.iter().map(|t| t.ticket).collect::<Vec<_>>(), neglected);
1196
1197        let unrealized_value_after = mgr
1198            .unrealized_value(channel.get_id(), None)?
1199            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1200        assert_eq!(
1201            unrealized_value_after,
1202            unrealized_value
1203                - neglected
1204                    .iter()
1205                    .map(|t| t.verified_ticket().amount)
1206                    .sum::<HoprBalance>()
1207        );
1208
1209        assert_eq!(
1210            ChannelStats {
1211                winning_tickets: tickets.len() as u128,
1212                unredeemed_value: unrealized_value_after,
1213                rejected_value: HoprBalance::zero(),
1214                redeemed_value: HoprBalance::zero(),
1215                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
1216            },
1217            mgr.ticket_stats(Some(&channel.get_id()))?
1218        );
1219
1220        Ok(())
1221    }
1222
1223    #[test]
1224    fn ticket_manager_should_neglect_tickets_on_demand_with_upper_limit_on_index() -> anyhow::Result<()> {
1225        let mgr = create_mgr()?;
1226
1227        let tickets = generate_tickets()?;
1228        let epoch = tickets[0].ticket_id().epoch;
1229
1230        let tickets = tickets
1231            .into_iter()
1232            .filter(|t| t.verified_ticket().channel_epoch == epoch)
1233            .collect::<Vec<_>>();
1234
1235        let channel = ChannelEntry::builder()
1236            .between(
1237                *tickets[0].ticket.verified_issuer(),
1238                tickets[0].verified_ticket().counterparty,
1239            )
1240            .amount(10)
1241            .ticket_index(tickets.len() as u64)
1242            .status(ChannelStatus::Open)
1243            .epoch(1)
1244            .build()?;
1245
1246        for ticket in tickets.iter() {
1247            let neglected = mgr.insert_incoming_ticket(*ticket)?;
1248            assert!(neglected.is_empty());
1249        }
1250
1251        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
1252        assert!(neglected.is_empty());
1253
1254        let unrealized_value = mgr
1255            .unrealized_value(channel.get_id(), None)?
1256            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1257        assert_eq!(
1258            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1259            unrealized_value
1260        );
1261
1262        let neglected = mgr.neglect_tickets(&channel.get_id(), Some(3))?;
1263        assert_eq!(
1264            tickets
1265                .iter()
1266                .filter(|t| t.verified_ticket().index <= 3)
1267                .map(|t| t.ticket)
1268                .collect::<Vec<_>>(),
1269            neglected
1270        );
1271
1272        let unrealized_value_after = mgr
1273            .unrealized_value(channel.get_id(), None)?
1274            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1275        assert_eq!(
1276            unrealized_value_after,
1277            unrealized_value
1278                - neglected
1279                    .iter()
1280                    .map(|t| t.verified_ticket().amount)
1281                    .sum::<HoprBalance>()
1282        );
1283
1284        assert_eq!(
1285            ChannelStats {
1286                winning_tickets: tickets.len() as u128,
1287                unredeemed_value: unrealized_value_after,
1288                rejected_value: HoprBalance::zero(),
1289                redeemed_value: HoprBalance::zero(),
1290                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
1291            },
1292            mgr.ticket_stats(Some(&channel.get_id()))?
1293        );
1294
1295        Ok(())
1296    }
1297
1298    #[test]
1299    fn ticket_manager_unrealized_value_should_increase_when_tickets_are_added() -> anyhow::Result<()> {
1300        let mgr = create_mgr()?;
1301
1302        let mut tickets = generate_tickets()?;
1303        let channel_id = tickets[0].ticket_id().id;
1304        let epoch = tickets[0].ticket_id().epoch;
1305        tickets.retain(|ticket| ticket.verified_ticket().channel_epoch == epoch);
1306
1307        assert!(!tickets.is_empty());
1308
1309        assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));
1310
1311        let mut last_unrealized_value = HoprBalance::zero();
1312        assert_eq!(HoprBalance::zero(), last_unrealized_value);
1313
1314        for ticket in tickets.iter() {
1315            let neglected = mgr.insert_incoming_ticket(*ticket)?;
1316            assert!(neglected.is_empty());
1317
1318            let new_unrealized_value = mgr
1319                .unrealized_value(&channel_id, None)?
1320                .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1321            assert_eq!(
1322                new_unrealized_value - last_unrealized_value,
1323                ticket.verified_ticket().amount
1324            );
1325
1326            last_unrealized_value = new_unrealized_value;
1327        }
1328
1329        let expected_unrealized_value: HoprBalance = tickets.iter().map(|ticket| ticket.verified_ticket().amount).sum();
1330        assert_eq!(expected_unrealized_value, last_unrealized_value);
1331
1332        assert_eq!(
1333            ChannelStats {
1334                winning_tickets: tickets.len() as u128,
1335                unredeemed_value: expected_unrealized_value,
1336                rejected_value: HoprBalance::zero(),
1337                redeemed_value: HoprBalance::zero(),
1338                neglected_value: HoprBalance::zero(),
1339            },
1340            mgr.ticket_stats(Some(&tickets[0].ticket.channel_id()))?
1341        );
1342
1343        Ok(())
1344    }
1345
1346    #[test]
1347    fn ticket_manager_inserted_ticket_with_older_epoch_should_be_neglected() -> anyhow::Result<()> {
1348        let mgr = create_mgr()?;
1349
1350        let tickets = generate_tickets()?;
1351        assert!(!tickets.is_empty());
1352        let channel_id = tickets[0].ticket_id().id;
1353
1354        let tickets_from_epoch_1 = tickets
1355            .iter()
1356            .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
1357            .cloned()
1358            .collect::<Vec<_>>();
1359        assert!(!tickets_from_epoch_1.is_empty());
1360
1361        let tickets_from_epoch_2 = tickets
1362            .iter()
1363            .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
1364            .cloned()
1365            .collect::<Vec<_>>();
1366        assert!(!tickets_from_epoch_2.is_empty());
1367
1368        for new_ticket in &tickets_from_epoch_2 {
1369            let neglected = mgr.insert_incoming_ticket(*new_ticket)?;
1370            assert!(neglected.is_empty());
1371        }
1372
1373        for old_ticket in &tickets_from_epoch_1 {
1374            let neglected = mgr.insert_incoming_ticket(*old_ticket)?;
1375            assert_eq!(vec![old_ticket.ticket], neglected);
1376        }
1377
1378        let stats = mgr.ticket_stats(Some(&channel_id))?;
1379
1380        assert_eq!(
1381            (tickets_from_epoch_1.len() + tickets_from_epoch_2.len()) as u128,
1382            stats.winning_tickets
1383        );
1384        assert_eq!(
1385            tickets_from_epoch_2
1386                .iter()
1387                .map(|t| t.verified_ticket().amount)
1388                .sum::<HoprBalance>(),
1389            stats.unredeemed_value
1390        );
1391        assert_eq!(HoprBalance::zero(), stats.rejected_value);
1392        assert_eq!(HoprBalance::zero(), stats.redeemed_value);
1393
1394        Ok(())
1395    }
1396
1397    #[test]
    fn ticket_manager_ticket_insertion_should_neglect_tickets_from_previous_epochs() -> anyhow::Result<()> {
        let mgr = create_mgr()?;

        let tickets = generate_tickets()?;
        assert!(!tickets.is_empty());
        let channel_id = tickets[0].ticket_id().id;

        let tickets_from_epoch_1 = tickets
            .iter()
            .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
            .cloned()
            .collect::<Vec<_>>();
        assert!(!tickets_from_epoch_1.is_empty());

        let tickets_from_epoch_2 = tickets
            .iter()
            .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
            .cloned()
            .collect::<Vec<_>>();
        assert!(!tickets_from_epoch_2.is_empty());

        assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));

        for ticket in tickets_from_epoch_1.iter() {
            let neglected = mgr.insert_incoming_ticket(*ticket)?;
            assert!(neglected.is_empty());
        }

        let new_unrealized_value = mgr
            .unrealized_value(&channel_id, None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
        assert_eq!(
            new_unrealized_value,
            tickets_from_epoch_1
                .iter()
                .map(|ticket| ticket.verified_ticket().amount)
                .sum()
        );

        let neglected = mgr.insert_incoming_ticket(tickets_from_epoch_2[0].clone())?;
        assert_eq!(
            tickets_from_epoch_1.iter().map(|t| t.ticket).collect::<Vec<_>>(),
            neglected
        );

        // There's now only 1 ticket from epoch 2
        let new_unrealized_value = mgr
            .unrealized_value(&channel_id, None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
        assert_eq!(tickets_from_epoch_2[0].verified_ticket().amount, new_unrealized_value);

        assert_eq!(
            ChannelStats {
                winning_tickets: tickets_from_epoch_1.len() as u128 + 1,
                unredeemed_value: new_unrealized_value,
                rejected_value: HoprBalance::zero(),
                redeemed_value: HoprBalance::zero(),
                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
            },
            mgr.ticket_stats(Some(&channel_id))?
        );

        let queue_tickets = mgr
            .store
            .write()
            .open_or_create_queue(&channel_id)?
            .iter_unordered()?
            .collect::<Result<Vec<_>, _>>()?;
        assert_eq!(1, queue_tickets.len());
        assert_eq!(
            tickets_from_epoch_2[0].verified_ticket(),
            queue_tickets[0].verified_ticket()
        );

        Ok(())
    }

    pub type TestConnector = HoprBlockchainConnector<
        BlokliTestClient<FullStateEmulator>,
        InMemoryBackend,
        SafePayloadGenerator,
        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
    >;

    async fn create_test_connector(
        private_key: &ChainKeypair,
        channel: &ChannelEntry,
        tx_sim_delay: Option<std::time::Duration>,
    ) -> anyhow::Result<TestConnector> {
        let module_addr: [u8; 20] = [1; 20];
        // We need to be channel destination because we'll be redeeming tickets
        assert_eq!(private_key.public().to_address(), channel.destination);

        let blokli_client = BlokliTestStateBuilder::default()
            .with_balances([(private_key.public().to_address(), XDaiBalance::new_base(1))])
            .with_accounts([
                (
                    AccountEntry {
                        public_key: *OffchainKeypair::random().public(),
                        chain_addr: private_key.public().to_address(),
                        entry_type: AccountType::NotAnnounced,
                        safe_address: None,
                        key_id: 1.into(),
                    },
                    HoprBalance::new_base(1000),
                    XDaiBalance::new_base(1),
                ),
                (
                    AccountEntry {
                        public_key: *OffchainKeypair::random().public(),
                        chain_addr: channel.source,
                        entry_type: AccountType::NotAnnounced,
                        safe_address: None,
                        key_id: 2.into(),
                    },
                    HoprBalance::new_base(1000),
                    XDaiBalance::new_base(1),
                ),
            ])
            .with_channels([*channel])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(module_addr.into())
            .with_tx_simulation_delay(tx_sim_delay.unwrap_or(std::time::Duration::from_millis(500)));

        let mut connector = TestConnector::new(
            private_key.clone(),
            BlockchainConnectorConfig::default(),
            blokli_client,
            InMemoryBackend::default(),
            SafePayloadGenerator::new(
                &private_key,
                contract_addresses_for_network("rotsee").unwrap().1,
                module_addr.into(),
            ),
        );
        connector.connect().await?;

        Ok(connector)
    }

    #[test_log::test(tokio::test)]
    async fn ticket_manager_should_redeem_tickets_on_demand() -> anyhow::Result<()> {
        let mgr = create_mgr()?;

        let src = ChainKeypair::random();
        let dst = ChainKeypair::random();

        let channel = ChannelEntry::builder()
            .between(&src, &dst)
            .amount(10_000_000_000_u64)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(1)
            .build()?;

        let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
        tickets.shuffle(&mut rand::rng());

        for ticket in tickets.iter() {
            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
        }

        tickets.sort();

        let mut unrealized_value = mgr
            .unrealized_value(channel.get_id(), None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
        assert_eq!(
            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
            unrealized_value
        );

        let connector = create_test_connector(&dst, &channel, None).await?;

        let stream = mgr.redeem_stream(connector, *channel.get_id(), None)?;

        pin_mut!(stream);

        assert_eq!(
            Some(RedemptionResult::Redeemed(tickets[0].ticket)),
            stream.try_next().await?
        );
        assert_eq!(
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
            unrealized_value - tickets[0].verified_ticket().amount
        );
        unrealized_value = mgr
            .unrealized_value(channel.get_id(), None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;

        assert_eq!(
            Some(RedemptionResult::Redeemed(tickets[1].ticket)),
            stream.try_next().await?
        );
        assert_eq!(
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
            unrealized_value - tickets[1].verified_ticket().amount
        );
        unrealized_value = mgr
            .unrealized_value(channel.get_id(), None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;

        assert_eq!(
            Some(RedemptionResult::Redeemed(tickets[2].ticket)),
            stream.try_next().await?
        );
        assert_eq!(
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
            unrealized_value - tickets[2].verified_ticket().amount
        );

        assert_eq!(None, stream.try_next().await?);

        Ok(())
    }

    #[tokio::test]
    async fn ticket_manager_should_not_allow_concurrent_redemptions_on_the_same_channel() -> anyhow::Result<()> {
        let mgr = create_mgr()?;

        let src = ChainKeypair::random();
        let dst = ChainKeypair::random();

        let channel = ChannelEntry::builder()
            .between(&src, &dst)
            .amount(10_000_000_000_u64)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(1)
            .build()?;

        let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
        tickets.shuffle(&mut rand::rng());

        for ticket in tickets.iter() {
            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
        }

        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);

        let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;

        assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_err());

        drop(stream);

        assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn ticket_manager_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
        let mgr = create_mgr()?;

        let src = ChainKeypair::random();
        let dst = ChainKeypair::random();

        let channel = ChannelEntry::builder()
            .between(&src, &dst)
            .amount(10_000_000_000_u64)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(1)
            .build()?;

        let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
        tickets.shuffle(&mut rand::rng());

        for ticket in tickets.iter() {
            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
        }

        tickets.sort();

        let unrealized_value = mgr
            .unrealized_value(channel.get_id(), None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
        assert_eq!(
            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
            unrealized_value
        );

        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);

        let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
        pin_mut!(stream);

        assert_eq!(
            Some(RedemptionResult::Redeemed(tickets[0].ticket)),
            stream.try_next().await?
        );
        assert_eq!(
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
            unrealized_value - tickets[0].verified_ticket().amount
        );

        let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
        assert_eq!(
            tickets.into_iter().skip(1).map(|t| t.ticket).collect::<Vec<_>>(),
            neglected
        );
        assert_eq!(
            HoprBalance::zero(),
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?
        );

        assert_eq!(None, stream.try_next().await?);

        Ok(())
    }

    #[tokio::test]
    async fn ticket_manager_partial_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
        let mgr = create_mgr()?;

        let src = ChainKeypair::random();
        let dst = ChainKeypair::random();

        let channel = ChannelEntry::builder()
            .between(&src, &dst)
            .amount(10_000_000_000_u64)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(1)
            .build()?;

        let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
        tickets.shuffle(&mut rand::rng());

        for ticket in tickets.iter() {
            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
        }

        tickets.sort();

        let mut unrealized_value = mgr
            .unrealized_value(channel.get_id(), None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
        assert_eq!(
            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
            unrealized_value
        );

        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);

        let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
        pin_mut!(stream);

        // The ticket with index 0 gets redeemed
        assert_eq!(
            Some(RedemptionResult::Redeemed(tickets[0].ticket)),
            stream.try_next().await?
        );
        assert_eq!(
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
            unrealized_value - tickets[0].verified_ticket().amount
        );
        unrealized_value = mgr
            .unrealized_value(channel.get_id(), None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;

        // The tickets with indices 1, 2 and 3 get neglected
        let neglected = mgr.neglect_tickets(&channel.get_id(), Some(tickets[3].verified_ticket().index))?;
        assert_eq!(
            tickets.iter().skip(1).take(3).map(|t| t.ticket).collect::<Vec<_>>(),
            neglected
        );
        assert_eq!(
            unrealized_value
                - neglected
                    .into_iter()
                    .map(|t| t.verified_ticket().amount)
                    .sum::<HoprBalance>(),
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?
        );

        // Redemption then resumes with the last remaining ticket (index 4)
        assert_eq!(
            Some(RedemptionResult::Redeemed(tickets[4].ticket)),
            stream.try_next().await?
        );

        assert_eq!(
            HoprBalance::zero(),
            mgr.unrealized_value(channel.get_id(), None)?
                .ok_or(anyhow::anyhow!("must have unrealized value"))?
        );

        assert_eq!(None, stream.try_next().await?);

        Ok(())
    }

    #[tokio::test]
    async fn ticket_manager_ticket_neglection_during_on_chain_redemption_should_be_detected() -> anyhow::Result<()> {
        let mgr = std::sync::Arc::new(create_mgr()?);

        let src = ChainKeypair::random();
        let dst = ChainKeypair::random();

        let channel = ChannelEntry::builder()
            .between(&src, &dst)
            .amount(10_000_000_000_u64)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(1)
            .build()?;

        let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
        tickets.shuffle(&mut rand::rng());

        for ticket in tickets.iter() {
            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
        }

        tickets.sort();

        let connector =
            std::sync::Arc::new(create_test_connector(&dst, &channel, Some(std::time::Duration::from_secs(2))).await?);

        let mgr_clone = mgr.clone();
        let jh = tokio::task::spawn(async move {
            let stream = mgr_clone.redeem_stream(connector.clone(), *channel.get_id(), None)?;
            pin_mut!(stream);
            stream.try_next().await
        });
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // All the tickets will appear as neglected, including the first one,
        // whose on-chain redemption is still in flight
        let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
        assert_eq!(neglected, tickets.iter().map(|t| t.ticket).collect::<Vec<_>>());

        assert_eq!(
            ChannelStats {
                winning_tickets: tickets.len() as u128,
                unredeemed_value: HoprBalance::zero(),
                rejected_value: HoprBalance::zero(),
                redeemed_value: HoprBalance::zero(),
                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
            },
            mgr.ticket_stats(Some(&channel.get_id()))?
        );

        // Once the redemption completes, the first ticket should be counted as redeemed
        // and no longer as neglected
        let res = jh.await??;
        assert_eq!(Some(RedemptionResult::Redeemed(tickets[0].ticket)), res);

        assert_eq!(
            ChannelStats {
                winning_tickets: tickets.len() as u128,
                unredeemed_value: HoprBalance::zero(),
                rejected_value: HoprBalance::zero(),
                redeemed_value: tickets[0].verified_ticket().amount,
                neglected_value: neglected
                    .iter()
                    .map(|t| t.verified_ticket().amount)
                    .sum::<HoprBalance>()
                    - tickets[0].verified_ticket().amount,
            },
            mgr.ticket_stats(Some(&channel.get_id()))?
        );

        Ok(())
    }

    #[tokio::test]
    async fn ticket_manager_ticket_redemption_should_skip_low_value_tickets() -> anyhow::Result<()> {
        let mgr = create_mgr()?;

        let src = ChainKeypair::random();
        let dst = ChainKeypair::random();

        let channel = ChannelEntry::builder()
            .between(&src, &dst)
            .amount(10_000_000_000_u64)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(1)
            .build()?;

        let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
        tickets.shuffle(&mut rand::rng());

        for ticket in tickets.iter() {
            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
        }

        tickets.sort();

        let unrealized_value = mgr
            .unrealized_value(channel.get_id(), None)?
            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
        assert_eq!(
            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
            unrealized_value
        );

        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);

        let results = mgr
            .redeem_stream(
                connector.clone(),
                *channel.get_id(),
                Some(tickets[0].verified_ticket().amount + 1),
            )?
            .try_collect::<Vec<_>>()
            .await?;

        assert_eq!(
            results,
            tickets
                .into_iter()
                .map(|t| RedemptionResult::ValueTooLow(t.ticket))
                .collect::<Vec<_>>()
        );

        Ok(())
    }
}