Skip to main content

hopr_ticket_manager/
manager.rs

1use std::{convert::identity, sync::atomic::AtomicBool};
2
3use futures::{Stream, TryFutureExt};
4use hopr_api::{
5    chain::{ChainWriteTicketOperations, TicketRedeemError},
6    tickets::{ChannelStats, RedemptionResult},
7    types::{internal::prelude::*, primitive::prelude::*},
8};
9
10use crate::{
11    backend::ValueCachedQueue,
12    errors::TicketManagerError,
13    factory::HoprTicketFactory,
14    traits::{OutgoingIndexStore, TicketQueue, TicketQueueStore},
15    utils::{CachedQueueMap, UnrealizedValue},
16};
17
18/// Keeps track of incoming redeemable tickets and provides ticket redemption and neglection operations.
19///
20/// To synchronize the on-chain state with the store, it is advised to call
21/// [`sync_incoming_channels`](HoprTicketManager::sync_from_incoming_channels) early
22/// after the construction of the manager, to make sure outdated data is discarded early. This is typically done only
23/// once after construction and not needed to be done during the life-time of the manager.
24///
25/// The manager is safe to be shared via an `Arc`.
26///
27/// This usage is for Relay nodes only because other types of nodes do not need to keep track of incoming redeemable
28/// tickets.
29///
30/// ### Usage in incoming packet pipeline
31/// The incoming packet pipeline usually just calls the
32/// [`insert_incoming_ticket`](hopr_api::tickets::TicketManagement::insert_incoming_ticket) whenever a new winning,
33/// redeemable ticket is received on an incoming channel.
34///
35/// ### Redeemable ticket extraction
36/// On Relay nodes, the manager maintains FIFO queues of redeemable tickets per incoming channel.
37/// There are two ways to extract tickets from the queue on a Relay:
38///
39/// 1. redeeming them via [`redeem_stream`](hopr_api::tickets::TicketManagement::redeem_stream)
40/// 2. neglecting them via [`neglect_tickets`](hopr_api::tickets::TicketManagement::neglect_tickets)
41///
42/// Both of these operations extract the tickets in the FIFO order from the queue,
43/// making sure that they are always processed in their natural order (by epoch and index).
44///
45/// Both ticket extraction operations are mutually exclusive and cannot be performed simultaneously.
46///
47/// ## Locking and lock-contention
48/// There are several methods in the `HoprTicketManager` object that are expected to be called
49/// in the highly performance-sensitive code, on a per-packet basis.
50///
51/// ### Incoming winning ticket retrieval
52/// The [`insert_incoming_ticket`](hopr_api::tickets::TicketManagement::insert_incoming_ticket) method is designed to be
53/// high-performance and to be called per each incoming packet **after** it has been forwarded to a next hop.
54///
55/// This operation acquires the write-part of an RW lock (per incoming channel).
56/// This may block the hot-path only if one of the following (also write) operations is performed:
57///     1. Ticket redemption has just finished in that particular channel, and the redeemed ticket is dropped from the
58///     same incoming channel queue.
59///     2. Ticket neglection has just finished in that particular channel, and the neglected ticket is dropped from the
60///     same incoming channel queue.
61///
62/// Both of these operations happen rarely, and the write lock is usually held only for a short time. In addition,
63/// incoming winning tickets are not supposed to usually happen very often. Therefore, high contention on
64/// the write lock is not expected.
65///
66/// ### Incoming unacknowledged ticket verification
67/// The [`unrealized_value`](HoprTicketManager::unrealized_value) method is designed to be high-performance
68/// and to be called per each incoming packet **before** it is forwarded to a next hop.
69///
70/// This operation acquires the read-part of an RW lock (per incoming channel). This may block the hot-path only if
71/// one of the following (write) operations is performed at the same moment:
72///     1. A new incoming winning ticket is inserted into the same incoming channel queue.
73///     2. Ticket redemption has just finished in that particular channel, and the redeemed ticket is dropped from the
74///     same incoming channel queue.
75///     3. Ticket neglection has just finished in that particular channel, and the neglected ticket is dropped from the
76///     same incoming channel queue.
77///
78/// All 3 of these operations are not expected to happen very often on a single channel; therefore, high contention
79/// on the RW lock is not expected.
80#[derive(Debug)]
81pub struct HoprTicketManager<S, Q> {
82    channel_tickets: std::sync::Arc<CachedQueueMap<Q>>,
83    store: std::sync::Arc<parking_lot::RwLock<S>>,
84}
85
86impl<S> HoprTicketManager<S, S::Queue>
87where
88    S: OutgoingIndexStore + TicketQueueStore + 'static,
89    S::Queue: Send + Sync + 'static,
90{
91    /// Creates the ticket manager in a pair with [`HoprTicketFactory`], both backed by the given `store`.
92    ///
93    /// The `store` must be [`OutgoingIndexStore`] and [`TicketQueueStore`].
94    pub fn new_with_factory(store: S) -> (Self, HoprTicketFactory<S>) {
95        let store = std::sync::Arc::new(parking_lot::RwLock::new(store));
96        let channel_tickets = std::sync::Arc::new(CachedQueueMap::<S::Queue>::default());
97        let factory = HoprTicketFactory::new_shared(store.clone(), std::sync::Arc::downgrade(&channel_tickets));
98
99        (HoprTicketManager { store, channel_tickets }, factory)
100    }
101}
102
103struct RedeemState<C, Q> {
104    lock: std::sync::Arc<AtomicBool>,
105    queue: std::sync::Arc<parking_lot::RwLock<Q>>,
106    chain: C,
107    min_redeem_value: HoprBalance,
108    channel_id: ChannelId,
109}
110
111impl<C, Q> Drop for RedeemState<C, Q> {
112    fn drop(&mut self) {
113        self.lock.store(false, std::sync::atomic::Ordering::Release);
114    }
115}
116
117impl<S> HoprTicketManager<S, S::Queue>
118where
119    S: TicketQueueStore + Send + Sync + 'static,
120    S::Queue: Send + Sync + 'static,
121{
122    /// Synchronizes the existing incoming redeemable ticket queues with the state of the
123    /// current `incoming_channels`.
124    ///
125    /// Any incoming ticket queues that correspond to a channel that is no longer open or effectively open (in
126    /// `incoming_channels`) will be dropped and the tickets neglected.
127    ///
128    /// For all opened or effectively opened incoming channels inside `incoming_channels`, either an existing
129    /// ticket queue is opened or a new one is created (without any tickets in it).
130    ///
131    /// If there are any unredeemable tickets in the existing queues (with an older epoch or lower index than the
132    /// current index), they are neglected as well.
133    ///
134    /// All the neglected tickets are returned from the function to make further accounting possible,
135    /// but they are no longer redeemable.
136    ///
137    /// It is advised to call this function early after the construction of the `HoprTicketManager`
138    /// to ensure pruning of dangling or out-of-date values.
139    pub fn sync_from_incoming_channels(
140        &self,
141        incoming_channels: &[ChannelEntry],
142    ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
143        let incoming_channels: std::collections::HashSet<_, std::hash::RandomState> =
144            incoming_channels.iter().collect();
145
146        // Purge outdated incoming channel queues
147        let mut store_read = self.store.upgradable_read();
148        let stored_queues = store_read
149            .iter_queues()
150            .map_err(TicketManagerError::store)?
151            .collect::<Vec<_>>();
152        let mut neglected = Vec::new();
153        let now = hopr_platform::time::current_time();
154        for channel_id in stored_queues {
155            // If any existing redeemable ticket queue does not match any currently existing
156            // channel that's either open or its closure period did not yet elapse (i.e., the channel
157            // is not closed or not effectively closed), remove the queue from the store.
158            if !incoming_channels
159                .iter()
160                .any(|channel| !channel.closure_time_passed(now) && channel.get_id() == &channel_id)
161            {
162                let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
163                neglected.extend(
164                    store_write
165                        .delete_queue(&channel_id)
166                        .map_err(TicketManagerError::store)?,
167                );
168                tracing::debug!(%channel_id, "purged outdated incoming tickets queue");
169                store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
170
171                // We cannot account the neglected tickets, because the channel has been closed.
172                self.channel_tickets.0.remove(&channel_id);
173            }
174        }
175        // Create or open ticket queues for all incoming channels that are open or effectively open
176        for channel in incoming_channels
177            .iter()
178            .filter(|channel| !channel.closure_time_passed(now))
179        {
180            let id = channel.get_id();
181
182            // Either open an existing queue for that channel or create a new one
183            let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
184            let mut queue = store_write
185                .open_or_create_queue(id)
186                .map_err(TicketManagerError::store)?;
187
188            // Clean up the queue from tickets which are unredeemable (old epoch or lower than current index).
189            while queue
190                .peek()
191                .map_err(TicketManagerError::store)?
192                .filter(|ticket| {
193                    ticket.verified_ticket().channel_epoch < channel.channel_epoch
194                        || ticket.verified_ticket().index < channel.ticket_index
195                })
196                .is_some()
197            {
198                neglected.extend(queue.pop().map_err(TicketManagerError::store)?.map(|t| t.ticket));
199            }
200
201            store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
202
203            // Wrap the queue with a ticket value cache adapter
204            let queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
205
206            tracing::debug!(%id, num_tickets = queue.len().map_err(TicketManagerError::store)?, "loaded redeemable ticket queue for channel");
207            self.channel_tickets.0.insert(*id, queue.into());
208        }
209
210        tracing::debug!(
211            num_channels = incoming_channels.len(),
212            num_neglected = neglected.len(),
213            "synchronized with incoming channels"
214        );
215        Ok(neglected)
216    }
217
218    /// Inserts a new incoming winning redeemable ticket into the ticket manager.
219    ///
220    /// On success, the method returns all tickets that have been neglected in the ticket queue of this channel,
221    /// in case the inserted ticket has a greater channel epoch than the [next extractable](TicketQueue::peek) ticket in
222    /// the queue. This situation can happen when unredeemed tickets are left in the queue, while the corresponding
223    /// channel restarts its lifecycle and a new winning ticket is received.
224    /// Otherwise, the returned vector is empty.
225    /// Returns the total value of unredeemed tickets in the given channel and its latest epoch.
226    ///
227    /// NOTE: The function is less efficient when the `min_index` is specified, as
228    /// a full scan of the queue is required to calculate the unrealized value.
229    pub fn unrealized_value(
230        &self,
231        channel_id: &ChannelId,
232        min_index: Option<u64>,
233    ) -> Result<Option<HoprBalance>, TicketManagerError> {
234        self.channel_tickets.unrealized_value(channel_id, min_index)
235    }
236}
237impl<S> hopr_api::tickets::TicketManagement for HoprTicketManager<S, S::Queue>
238where
239    S: TicketQueueStore + Send + Sync + 'static,
240    S::Queue: Send + Sync + 'static,
241{
242    type Error = TicketManagerError;
243
244    /// Creates a stream that redeems tickets in-order one by one in the given channel,
245    /// using the given [`ChainWriteTicketOperations`] on-chain client
246    /// implementation.
247    ///
248    /// If `min_redeem_value` is given, all the tickets that are lower than the given value are neglected in the
249    /// process.
250    ///
251    /// If there's already an existing redeem stream for the channel, an error is returned without creating a new
252    /// stream.
253    ///
254    /// The stream terminates when there are no more tickets to process in the queue, or an error is encountered.
255    fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
256        &self,
257        chain: C,
258        channel_id: ChannelId,
259        min_amount: Option<HoprBalance>,
260    ) -> Result<impl Stream<Item = Result<RedemptionResult, Self::Error>> + Send, Self::Error> {
261        let initial_state = match self.channel_tickets.0.get(&channel_id) {
262            Some(ticket_queue) => {
263                ticket_queue
264                    .redeem_lock
265                    .compare_exchange(
266                        false,
267                        true,
268                        std::sync::atomic::Ordering::Acquire,
269                        std::sync::atomic::Ordering::Relaxed,
270                    )
271                    .map_err(|_| TicketManagerError::AlreadyRedeeming)?;
272
273                RedeemState {
274                    lock: ticket_queue.redeem_lock.clone(),
275                    queue: ticket_queue.queue.clone(),
276                    min_redeem_value: min_amount.unwrap_or_default(), // default min is 0 wxHOPR
277                    chain,
278                    channel_id,
279                }
280            }
281            None => return Err(TicketManagerError::ChannelQueueNotFound),
282        };
283
284        Ok(futures::stream::try_unfold(initial_state, |state| {
285            // Peek here and release the read lock to prevent holding it across an `await`
286            let next_ticket = state.queue.read().0.peek();
287            async move {
288                match next_ticket.map_err(TicketManagerError::store)? {
289                    Some(ticket_to_redeem) => {
290                        // Attempt to redeem the ticket if it is of sufficient value
291                        let redeem_attempt_result =
292                            if ticket_to_redeem.verified_ticket().amount >= state.min_redeem_value {
293                                match state.chain.redeem_ticket(ticket_to_redeem).and_then(identity).await {
294                                    Ok((redeemed_ticket, _)) => Ok(Some(RedemptionResult::Redeemed(redeemed_ticket))),
295                                    Err(TicketRedeemError::Rejected(ticket, reason)) => {
296                                        Ok(Some(RedemptionResult::RejectedOnChain(ticket, reason)))
297                                    }
298                                    Err(TicketRedeemError::ProcessingError(_, err)) => {
299                                        Err(TicketManagerError::redeem(err))
300                                    }
301                                }
302                            } else {
303                                // Tickets with low value are treated as neglected
304                                Ok(Some(RedemptionResult::ValueTooLow(ticket_to_redeem.ticket)))
305                            };
306
307                        // Once the redemption has been completed, no matter if successful or not,
308                        // check if we need to remove the ticket from the redemption queue.
309                        if let Ok(Some(redeem_complete_result)) =  &redeem_attempt_result {
310                            // In this case, no matter if the ticket has been redeemed,
311                            // neglected or rejected, we're still removing it from the queue.
312                            // Otherwise, the ticket stays in the queue due to a recoverable error
313                            let mut queue_write = state.queue.write();
314
315                            // Check if the next ticket to pop is really the one we just redeemed,
316                            // otherwise we might be popping a wrong ticket.
317                            let pop_res = queue_write.0
318                                .peek()
319                                .map_err(TicketManagerError::store)?
320                                .filter(|ticket_to_pop| ticket_to_pop == &ticket_to_redeem)
321                                .and_then(|_| queue_write.0.pop().map_err(TicketManagerError::store).transpose())
322                                .transpose()?;
323
324                            // Do accounting of the ticket into the stats
325                            match redeem_complete_result {
326                                RedemptionResult::Redeemed(ticket) => {
327                                    queue_write.1.redeemed_value += ticket.verified_ticket().amount;
328                                    tracing::info!(%ticket, "ticket has been redeemed");
329                                },
330                                RedemptionResult::ValueTooLow(ticket) => {
331                                    queue_write.1.neglected_value += ticket.verified_ticket().amount;
332                                    tracing::warn!(%ticket, "ticket has been neglected");
333                                },
334                                RedemptionResult::RejectedOnChain(ticket, reason) => {
335                                    queue_write.1.rejected_value += ticket.verified_ticket().amount;
336                                    tracing::warn!(%ticket, reason, "ticket has been rejected on-chain");
337                                },
338                            }
339
340                            // This can only happen if `neglect_tickets` has been called while redeeming,
341                            // and it has neglected the ticket during this race-condition.
342                            // In this case we only need to correct the neglected value because
343                            // the ticket has been actually redeemed/rejected or was accounted
344                            // as neglected twice.
345                            if pop_res.is_none() {
346                                let ticket = redeem_complete_result.as_ref();
347                                tracing::warn!(%ticket, "ticket has been neglected from the queue while it actually completed the redemption process");
348                                queue_write.1.neglected_value -= ticket.verified_ticket().amount;
349                            }
350                        }
351
352                        redeem_attempt_result
353                    }
354                    None => {
355                        // No more tickets to redeem in this channel
356                        // Keep the queue in even if it is empty. The cleanup is done only on startup.
357                        tracing::debug!(channel_id = %state.channel_id, "no more tickets to redeem in channel");
358                        Ok(None)
359                    }
360                }
361                    .map(|s| s.map(|v| (v, state)))
362            }
363        }))
364    }
365
366    /// Removes all the tickets in the given [`ChannelId`], optionally only up to the given ticket index (inclusive).
367    ///
368    /// If the `up_to_index` is given and lower than the lowest index of an unredeemed ticket in the queue,
369    /// the function does nothing.
370    ///
371    /// If there's ticket redemption ongoing in the same channel and the neglection intersects with the
372    /// redeemed range, the redemption will be cut short, with remaining unredeemed tickets neglected.
373    fn neglect_tickets(
374        &self,
375        channel_id: &ChannelId,
376        up_to_index: Option<u64>,
377    ) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
378        let queue = self
379            .channel_tickets
380            .0
381            .get(channel_id)
382            .map(|q| {
383                if q.redeem_lock.load(std::sync::atomic::Ordering::Relaxed) {
384                    tracing::warn!(%channel_id, "neglecting tickets in channel while redeeming is ongoing");
385                }
386                q.queue.clone()
387            })
388            .ok_or(TicketManagerError::ChannelQueueNotFound)?;
389
390        let mut neglected_tickets = Vec::new();
391        let mut queue_read = queue.upgradable_read();
392        let max_index = up_to_index.unwrap_or(TicketBuilder::MAX_TICKET_INDEX);
393
394        while queue_read
395            .0
396            .peek()
397            .map_err(TicketManagerError::store)?
398            .filter(|ticket| ticket.verified_ticket().index <= max_index)
399            .is_some()
400        {
401            // Quickly perform pop and downgrade to lock not to block any readers
402            let mut queue_write = parking_lot::RwLockUpgradableReadGuard::upgrade(queue_read);
403            let maybe_ticket = queue_write.0.pop().map_err(TicketManagerError::store)?;
404            queue_write.1.neglected_value += maybe_ticket.map(|t| t.verified_ticket().amount).unwrap_or_default();
405            queue_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(queue_write);
406
407            neglected_tickets.extend(maybe_ticket.map(|t| t.ticket));
408            tracing::debug!(%channel_id, ?maybe_ticket, "neglected ticket in channel");
409        }
410
411        // Keep the queue in even if it is empty. The cleanup is done only on startup.
412        tracing::debug!(%channel_id, num_tickets = neglected_tickets.len(), "ticket neglection done in channel");
413        Ok(neglected_tickets)
414    }
415
416    /// Computes [statistics](ChannelStats) for the given `channel` or for all channels if `None` is given.
417    ///
418    /// If the given `channel` does not exist, it returns zero statistics instead of an error.
419    ///
420    /// Apart from [`unredeemed_value`](ChannelStats), the statistics are not persistent.
421    fn ticket_stats(&self, channel: Option<&ChannelId>) -> Result<ChannelStats, TicketManagerError> {
422        self.channel_tickets
423            .0
424            .iter()
425            .filter(|e| channel.is_none_or(|c| c == e.key()))
426            .try_fold(ChannelStats::default(), |stats, v| {
427                let queue = v.queue.read();
428                Ok::<_, TicketManagerError>(ChannelStats {
429                    winning_tickets: queue.1.winning_tickets + stats.winning_tickets,
430                    unredeemed_value: queue
431                        .0
432                        .peek()
433                        .map_err(TicketManagerError::store)?
434                        .map(|t| queue.0.total_value(t.verified_ticket().channel_epoch, None))
435                        .transpose()
436                        .map_err(TicketManagerError::store)?
437                        .unwrap_or_default()
438                        + stats.unredeemed_value,
439                    rejected_value: queue.1.rejected_value + stats.rejected_value,
440                    neglected_value: queue.1.neglected_value + stats.neglected_value,
441                })
442            })
443    }
444
445    fn insert_incoming_ticket(&self, ticket: RedeemableTicket) -> Result<Vec<VerifiedTicket>, TicketManagerError> {
446        // Do not allocate, because neglecting tickets is a rare operation
447        let mut neglected_tickets = Vec::with_capacity(0);
448
449        let ticket_id = ticket.ticket_id();
450        match self.channel_tickets.0.entry(ticket_id.id) {
451            dashmap::Entry::Occupied(e) => {
452                // High contention on this write lock is possible only when massive numbers of winning tickets
453                // on the same channel are received, or if tickets on the same channel are being
454                // rapidly redeemed or neglected.
455                // Such a scenario is likely not realistic.
456                let mut queue = e.get().queue.write();
457
458                // If the next ticket ready in this queue is from a previous epoch, we must
459                // drain and neglect all the tickets from the queue. The channel has
460                // apparently restarted its lifecycle, and all the tickets from previous epochs
461                // are unredeemable already
462                if let Some(last_ticket) = queue.0.peek().map_err(TicketManagerError::store)? {
463                    if last_ticket.verified_ticket().channel_epoch < ticket.verified_ticket().channel_epoch {
464                        // Count the neglected value and add it to stats
465                        let mut neg = queue.0.drain().map_err(TicketManagerError::store)?;
466                        queue.1.neglected_value += neg.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>();
467
468                        // Ensures allocation according to the number of drained tickets
469                        neglected_tickets.append(&mut neg);
470                        tracing::warn!(%ticket_id, num_neglected = neglected_tickets.len(), "winning ticket has neglected unredeemed tickets from previous epochs");
471                    } else if last_ticket.verified_ticket().channel_epoch > ticket.verified_ticket().channel_epoch {
472                        tracing::warn!(%ticket_id, "tried to insert incoming ticket from an older epoch");
473
474                        queue.1.winning_tickets += 1; // Still count the ticket as winning
475                        queue.1.neglected_value += ticket.verified_ticket().amount;
476                        neglected_tickets.push(ticket.ticket);
477                        return Ok(neglected_tickets);
478                    }
479                }
480                queue.0.push(ticket).map_err(TicketManagerError::store)?;
481                queue.1.winning_tickets += 1;
482
483                tracing::debug!(%ticket_id, "winning ticket on channel");
484            }
485            dashmap::Entry::Vacant(v) => {
486                // A hypothetical chance of high contention on this write lock is
487                // only possible when massive numbers of winning tickets on new unique channels are received.
488                // Such a scenario is likely not realistic.
489                let mut store = self.store.write();
490
491                let queue = store
492                    .open_or_create_queue(&ticket.ticket_id().id)
493                    .map_err(TicketManagerError::store)?;
494
495                // Wrap the queue with a ticket value cache adapter
496                let mut queue = ValueCachedQueue::new(queue).map_err(TicketManagerError::store)?;
497
498                // Should not happen: it suggests the queue has been modified outside the manager
499                if !queue.is_empty().map_err(TicketManagerError::store)? {
500                    return Err(TicketManagerError::Other(anyhow::anyhow!(
501                        "fatal error: queue not empty"
502                    )));
503                }
504
505                queue.push(ticket).map_err(TicketManagerError::store)?;
506                v.insert(queue.into()); // The ticket is accounted for in the stats automatically
507                tracing::debug!(%ticket_id, "first winning ticket on channel");
508            }
509        }
510
511        Ok(neglected_tickets)
512    }
513}
514
515#[cfg(test)]
516mod tests {
517    use std::ops::Sub;
518
519    use futures::{TryStreamExt, pin_mut};
520    use hopr_api::{
521        OffchainKeypair,
522        tickets::TicketManagement,
523        types::crypto::prelude::{ChainKeypair, Keypair},
524    };
525    use hopr_chain_connector::{
526        BlockchainConnectorConfig, HoprBlockchainConnector, InMemoryBackend, PayloadGenerator, SafePayloadGenerator,
527        reexports::chain::contract_addresses_for_network,
528        testing::{BlokliTestClient, BlokliTestStateBuilder, FullStateEmulator},
529    };
530    use rand::prelude::SliceRandom;
531
532    use super::*;
533    use crate::{
534        MemoryStore, MemoryTicketQueue,
535        traits::tests::{generate_owned_tickets, generate_tickets},
536    };
537
538    fn create_mgr() -> anyhow::Result<HoprTicketManager<MemoryStore, MemoryTicketQueue>> {
539        Ok(HoprTicketManager::new_with_factory(MemoryStore::default()).0)
540    }
541
542    #[test]
543    fn ticket_manager_non_existing_channel_should_return_empty_stats() -> anyhow::Result<()> {
544        let mgr = create_mgr()?;
545
546        assert_eq!(ChannelStats::default(), mgr.ticket_stats(None)?);
547        assert_eq!(ChannelStats::default(), mgr.ticket_stats(Some(&ChannelId::default()))?);
548        Ok(())
549    }
550
551    #[test]
552    fn ticket_manager_should_update_state_when_winning_tickets_are_inserted() -> anyhow::Result<()> {
553        let mgr = create_mgr()?;
554
555        let src = ChainKeypair::random();
556        let dst = ChainKeypair::random();
557
558        let channel = ChannelEntry::builder()
559            .between(&src, &dst)
560            .amount(10)
561            .ticket_index(1)
562            .status(ChannelStatus::Open)
563            .epoch(1)
564            .build()?;
565
566        let tickets = generate_owned_tickets(&src, &dst, 2, 1..=1)?;
567
568        mgr.insert_incoming_ticket(tickets[0])?;
569
570        assert_eq!(
571            ChannelStats {
572                winning_tickets: 1,
573                unredeemed_value: tickets[0].verified_ticket().amount,
574                rejected_value: HoprBalance::zero(),
575                neglected_value: HoprBalance::zero(),
576            },
577            mgr.ticket_stats(Some(&channel.get_id()))?
578        );
579
580        mgr.insert_incoming_ticket(tickets[1])?;
581
582        assert_eq!(
583            ChannelStats {
584                winning_tickets: 2,
585                unredeemed_value: tickets[0].verified_ticket().amount + tickets[1].verified_ticket().amount,
586                rejected_value: HoprBalance::zero(),
587                neglected_value: HoprBalance::zero(),
588            },
589            mgr.ticket_stats(Some(&channel.get_id()))?
590        );
591
592        Ok(())
593    }
594
595    #[test]
596    fn ticket_manager_should_sync_incoming_channels_from_chain_state() -> anyhow::Result<()> {
597        let mgr = create_mgr()?;
598
599        let src = ChainKeypair::random();
600        let dst = ChainKeypair::random();
601
602        let channel = ChannelEntry::builder()
603            .between(&src, &dst)
604            .amount(10)
605            .ticket_index(1)
606            .status(ChannelStatus::Open)
607            .epoch(1)
608            .build()?;
609
610        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
611        assert!(neglected.is_empty());
612
613        let queues = mgr.store.read().iter_queues()?.collect::<Vec<_>>();
614        assert_eq!(vec![*channel.get_id()], queues);
615
616        Ok(())
617    }
618
619    #[test]
620    fn ticket_manager_should_neglect_tickets_from_closed_channels_on_sync() -> anyhow::Result<()> {
621        let mgr = create_mgr()?;
622
623        let tickets = generate_tickets()?;
624        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
625        assert!(neglected.is_empty());
626
627        let channel = ChannelEntry::builder()
628            .between(
629                *tickets[0].ticket.verified_issuer(),
630                tickets[0].verified_ticket().counterparty,
631            )
632            .amount(10)
633            .ticket_index(tickets[0].verified_ticket().index)
634            .status(ChannelStatus::Closed)
635            .epoch(tickets[0].verified_ticket().channel_epoch)
636            .build()?;
637
638        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
639        assert_eq!(1, neglected.len());
640        assert_eq!(tickets[0].ticket, neglected[0]);
641
642        Ok(())
643    }
644
645    #[test]
646    fn ticket_manager_should_neglect_tickets_from_older_epoch_channels_on_sync() -> anyhow::Result<()> {
647        let mgr = create_mgr()?;
648
649        let tickets = generate_tickets()?;
650        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
651        assert!(neglected.is_empty());
652
653        let channel = ChannelEntry::builder()
654            .between(
655                *tickets[0].ticket.verified_issuer(),
656                tickets[0].verified_ticket().counterparty,
657            )
658            .amount(10)
659            .ticket_index(1)
660            .status(ChannelStatus::Open)
661            .epoch(tickets[0].verified_ticket().channel_epoch + 1)
662            .build()?;
663
664        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
665        assert_eq!(1, neglected.len());
666        assert_eq!(tickets[0].ticket, neglected[0]);
667
668        Ok(())
669    }
670
671    #[test]
672    fn ticket_manager_should_neglect_tickets_with_older_index_channels_on_sync() -> anyhow::Result<()> {
673        let mgr = create_mgr()?;
674
675        let tickets = generate_tickets()?;
676        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
677        assert!(neglected.is_empty());
678
679        let channel = ChannelEntry::builder()
680            .between(
681                *tickets[0].ticket.verified_issuer(),
682                tickets[0].verified_ticket().counterparty,
683            )
684            .amount(10)
685            .ticket_index(tickets[0].verified_ticket().index + 1)
686            .status(ChannelStatus::Open)
687            .epoch(tickets[0].verified_ticket().channel_epoch)
688            .build()?;
689
690        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
691        assert_eq!(1, neglected.len());
692        assert_eq!(tickets[0].ticket, neglected[0]);
693
694        Ok(())
695    }
696
697    #[test]
698    fn ticket_manager_should_neglect_tickets_from_effectively_closed_channels_on_sync() -> anyhow::Result<()> {
699        let mgr = create_mgr()?;
700
701        let tickets = generate_tickets()?;
702        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
703        assert!(neglected.is_empty());
704
705        let channel = ChannelEntry::builder()
706            .between(
707                *tickets[0].ticket.verified_issuer(),
708                tickets[0].verified_ticket().counterparty,
709            )
710            .amount(10)
711            .ticket_index(1)
712            .status(ChannelStatus::PendingToClose(
713                std::time::SystemTime::now().sub(std::time::Duration::from_mins(10)),
714            ))
715            .epoch(1)
716            .build()?;
717
718        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
719        assert_eq!(1, neglected.len());
720        assert_eq!(tickets[0].ticket, neglected[0]);
721
722        Ok(())
723    }
724
725    #[test]
726    fn ticket_manager_should_neglect_tickets_from_non_existent_channels_on_sync() -> anyhow::Result<()> {
727        let mgr = create_mgr()?;
728
729        let tickets = generate_tickets()?;
730
731        let neglected = mgr.insert_incoming_ticket(tickets[0])?;
732        assert!(neglected.is_empty());
733
734        let neglected = mgr.sync_from_incoming_channels(&[])?;
735        assert_eq!(1, neglected.len());
736        assert_eq!(tickets[0].ticket, neglected[0]);
737
738        Ok(())
739    }
740
741    #[test]
742    fn ticket_manager_should_neglect_tickets_on_demand() -> anyhow::Result<()> {
743        let mgr = create_mgr()?;
744
745        let tickets = generate_tickets()?;
746        let epoch = tickets[0].ticket_id().epoch;
747
748        let tickets = tickets
749            .into_iter()
750            .filter(|t| t.verified_ticket().channel_epoch == epoch)
751            .collect::<Vec<_>>();
752
753        let channel = ChannelEntry::builder()
754            .between(
755                *tickets[0].ticket.verified_issuer(),
756                tickets[0].verified_ticket().counterparty,
757            )
758            .amount(10)
759            .ticket_index(0)
760            .status(ChannelStatus::Open)
761            .epoch(1)
762            .build()?;
763
764        for ticket in tickets.iter() {
765            let neglected = mgr.insert_incoming_ticket(*ticket)?;
766            assert!(neglected.is_empty());
767        }
768
769        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
770        assert!(neglected.is_empty());
771
772        let unrealized_value = mgr
773            .unrealized_value(channel.get_id(), None)?
774            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
775        assert_eq!(
776            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
777            unrealized_value
778        );
779
780        let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
781        assert_eq!(tickets.iter().map(|t| t.ticket).collect::<Vec<_>>(), neglected);
782
783        let unrealized_value_after = mgr
784            .unrealized_value(channel.get_id(), None)?
785            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
786        assert_eq!(
787            unrealized_value_after,
788            unrealized_value
789                - neglected
790                    .iter()
791                    .map(|t| t.verified_ticket().amount)
792                    .sum::<HoprBalance>()
793        );
794
795        assert_eq!(
796            ChannelStats {
797                winning_tickets: tickets.len() as u128,
798                unredeemed_value: unrealized_value_after,
799                rejected_value: HoprBalance::zero(),
800                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
801            },
802            mgr.ticket_stats(Some(&channel.get_id()))?
803        );
804
805        Ok(())
806    }
807
808    #[test]
809    fn ticket_manager_should_neglect_tickets_on_demand_with_upper_limit_on_index() -> anyhow::Result<()> {
810        let mgr = create_mgr()?;
811
812        let tickets = generate_tickets()?;
813        let epoch = tickets[0].ticket_id().epoch;
814
815        let tickets = tickets
816            .into_iter()
817            .filter(|t| t.verified_ticket().channel_epoch == epoch)
818            .collect::<Vec<_>>();
819
820        let channel = ChannelEntry::builder()
821            .between(
822                *tickets[0].ticket.verified_issuer(),
823                tickets[0].verified_ticket().counterparty,
824            )
825            .amount(10)
826            .ticket_index(0)
827            .status(ChannelStatus::Open)
828            .epoch(1)
829            .build()?;
830
831        for ticket in tickets.iter() {
832            let neglected = mgr.insert_incoming_ticket(*ticket)?;
833            assert!(neglected.is_empty());
834        }
835
836        let neglected = mgr.sync_from_incoming_channels(&[channel])?;
837        assert!(neglected.is_empty());
838
839        let unrealized_value = mgr
840            .unrealized_value(channel.get_id(), None)?
841            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
842        assert_eq!(
843            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
844            unrealized_value
845        );
846
847        let neglected = mgr.neglect_tickets(&channel.get_id(), Some(3))?;
848        assert_eq!(
849            tickets
850                .iter()
851                .filter(|t| t.verified_ticket().index <= 3)
852                .map(|t| t.ticket)
853                .collect::<Vec<_>>(),
854            neglected
855        );
856
857        let unrealized_value_after = mgr
858            .unrealized_value(channel.get_id(), None)?
859            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
860        assert_eq!(
861            unrealized_value_after,
862            unrealized_value
863                - neglected
864                    .iter()
865                    .map(|t| t.verified_ticket().amount)
866                    .sum::<HoprBalance>()
867        );
868
869        assert_eq!(
870            ChannelStats {
871                winning_tickets: tickets.len() as u128,
872                unredeemed_value: unrealized_value_after,
873                rejected_value: HoprBalance::zero(),
874                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
875            },
876            mgr.ticket_stats(Some(&channel.get_id()))?
877        );
878
879        Ok(())
880    }
881
882    #[test]
883    fn ticket_manager_unrealized_value_should_increase_when_tickets_are_added() -> anyhow::Result<()> {
884        let mgr = create_mgr()?;
885
886        let mut tickets = generate_tickets()?;
887        let channel_id = tickets[0].ticket_id().id;
888        let epoch = tickets[0].ticket_id().epoch;
889        tickets.retain(|ticket| ticket.verified_ticket().channel_epoch == epoch);
890
891        assert!(!tickets.is_empty());
892
893        assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));
894
895        let mut last_unrealized_value = HoprBalance::zero();
896        assert_eq!(HoprBalance::zero(), last_unrealized_value);
897
898        for ticket in tickets.iter() {
899            let neglected = mgr.insert_incoming_ticket(*ticket)?;
900            assert!(neglected.is_empty());
901
902            let new_unrealized_value = mgr
903                .unrealized_value(&channel_id, None)?
904                .ok_or(anyhow::anyhow!("must have unrealized value"))?;
905            assert_eq!(
906                new_unrealized_value - last_unrealized_value,
907                ticket.verified_ticket().amount
908            );
909
910            last_unrealized_value = new_unrealized_value;
911        }
912
913        let expected_unrealized_value: HoprBalance = tickets.iter().map(|ticket| ticket.verified_ticket().amount).sum();
914        assert_eq!(expected_unrealized_value, last_unrealized_value);
915
916        assert_eq!(
917            ChannelStats {
918                winning_tickets: tickets.len() as u128,
919                unredeemed_value: expected_unrealized_value,
920                rejected_value: HoprBalance::zero(),
921                neglected_value: HoprBalance::zero(),
922            },
923            mgr.ticket_stats(Some(&tickets[0].ticket.channel_id()))?
924        );
925
926        Ok(())
927    }
928
929    #[test]
930    fn ticket_manager_inserted_ticket_with_older_epoch_should_be_neglected() -> anyhow::Result<()> {
931        let mgr = create_mgr()?;
932
933        let tickets = generate_tickets()?;
934        assert!(!tickets.is_empty());
935        let channel_id = tickets[0].ticket_id().id;
936
937        let tickets_from_epoch_1 = tickets
938            .iter()
939            .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
940            .cloned()
941            .collect::<Vec<_>>();
942        assert!(!tickets_from_epoch_1.is_empty());
943
944        let tickets_from_epoch_2 = tickets
945            .iter()
946            .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
947            .cloned()
948            .collect::<Vec<_>>();
949        assert!(!tickets_from_epoch_2.is_empty());
950
951        for new_ticket in &tickets_from_epoch_2 {
952            let neglected = mgr.insert_incoming_ticket(*new_ticket)?;
953            assert!(neglected.is_empty());
954        }
955
956        for old_ticket in &tickets_from_epoch_1 {
957            let neglected = mgr.insert_incoming_ticket(*old_ticket)?;
958            assert_eq!(vec![old_ticket.ticket], neglected);
959        }
960
961        let stats = mgr.ticket_stats(Some(&channel_id))?;
962
963        assert_eq!(
964            (tickets_from_epoch_1.len() + tickets_from_epoch_2.len()) as u128,
965            stats.winning_tickets
966        );
967        assert_eq!(
968            tickets_from_epoch_2
969                .iter()
970                .map(|t| t.verified_ticket().amount)
971                .sum::<HoprBalance>(),
972            stats.unredeemed_value
973        );
974        assert_eq!(HoprBalance::zero(), stats.rejected_value);
975
976        Ok(())
977    }
978
979    #[test]
980    fn ticket_manager_ticket_insertion_should_neglect_tickets_from_previous_epochs() -> anyhow::Result<()> {
981        let mgr = create_mgr()?;
982
983        let tickets = generate_tickets()?;
984        assert!(!tickets.is_empty());
985        let channel_id = tickets[0].ticket_id().id;
986
987        let tickets_from_epoch_1 = tickets
988            .iter()
989            .filter(|ticket| ticket.verified_ticket().channel_epoch == 1)
990            .cloned()
991            .collect::<Vec<_>>();
992        assert!(!tickets_from_epoch_1.is_empty());
993
994        let tickets_from_epoch_2 = tickets
995            .iter()
996            .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
997            .cloned()
998            .collect::<Vec<_>>();
999        assert!(!tickets_from_epoch_2.is_empty());
1000
1001        assert!(matches!(mgr.unrealized_value(&channel_id, None), Ok(None)));
1002
1003        for ticket in tickets_from_epoch_1.iter() {
1004            let neglected = mgr.insert_incoming_ticket(*ticket)?;
1005            assert!(neglected.is_empty());
1006        }
1007
1008        let new_unrealized_value = mgr
1009            .unrealized_value(&channel_id, None)?
1010            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1011        assert_eq!(
1012            new_unrealized_value,
1013            tickets_from_epoch_1
1014                .iter()
1015                .map(|ticket| ticket.verified_ticket().amount)
1016                .sum()
1017        );
1018
1019        let neglected = mgr.insert_incoming_ticket(tickets_from_epoch_2[0].clone())?;
1020        assert_eq!(
1021            tickets_from_epoch_1.iter().map(|t| t.ticket).collect::<Vec<_>>(),
1022            neglected
1023        );
1024
1025        // There's now only 1 ticket from epoch 2
1026        let new_unrealized_value = mgr
1027            .unrealized_value(&channel_id, None)?
1028            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1029        assert_eq!(tickets_from_epoch_2[0].verified_ticket().amount, new_unrealized_value);
1030
1031        assert_eq!(
1032            ChannelStats {
1033                winning_tickets: tickets_from_epoch_1.len() as u128 + 1,
1034                unredeemed_value: new_unrealized_value,
1035                rejected_value: HoprBalance::zero(),
1036                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
1037            },
1038            mgr.ticket_stats(Some(&channel_id))?
1039        );
1040
1041        let queue_tickets = mgr
1042            .store
1043            .write()
1044            .open_or_create_queue(&channel_id)?
1045            .iter_unordered()?
1046            .collect::<Result<Vec<_>, _>>()?;
1047        assert_eq!(1, queue_tickets.len());
1048        assert_eq!(
1049            tickets_from_epoch_2[0].verified_ticket(),
1050            queue_tickets[0].verified_ticket()
1051        );
1052
1053        Ok(())
1054    }
1055
1056    pub type TestConnector = HoprBlockchainConnector<
1057        BlokliTestClient<FullStateEmulator>,
1058        InMemoryBackend,
1059        SafePayloadGenerator,
1060        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
1061    >;
1062
1063    async fn create_test_connector(
1064        private_key: &ChainKeypair,
1065        channel: &ChannelEntry,
1066        tx_sim_delay: Option<std::time::Duration>,
1067    ) -> anyhow::Result<TestConnector> {
1068        let module_addr: [u8; 20] = [1; 20];
1069        // We need to be channel destination because we'll be redeeming tickets
1070        assert_eq!(private_key.public().to_address(), channel.destination);
1071
1072        let blokli_client = BlokliTestStateBuilder::default()
1073            .with_balances([(private_key.public().to_address(), XDaiBalance::new_base(1))])
1074            .with_accounts([
1075                (
1076                    AccountEntry {
1077                        public_key: *OffchainKeypair::random().public(),
1078                        chain_addr: private_key.public().to_address(),
1079                        entry_type: AccountType::NotAnnounced,
1080                        safe_address: None,
1081                        key_id: 1.into(),
1082                    },
1083                    HoprBalance::new_base(1000),
1084                    XDaiBalance::new_base(1),
1085                ),
1086                (
1087                    AccountEntry {
1088                        public_key: *OffchainKeypair::random().public(),
1089                        chain_addr: channel.source,
1090                        entry_type: AccountType::NotAnnounced,
1091                        safe_address: None,
1092                        key_id: 2.into(),
1093                    },
1094                    HoprBalance::new_base(1000),
1095                    XDaiBalance::new_base(1),
1096                ),
1097            ])
1098            .with_channels([*channel])
1099            .with_hopr_network_chain_info("rotsee")
1100            .build_dynamic_client(module_addr.into())
1101            .with_tx_simulation_delay(tx_sim_delay.unwrap_or(std::time::Duration::from_millis(500)));
1102
1103        let mut connector = TestConnector::new(
1104            private_key.clone(),
1105            BlockchainConnectorConfig::default(),
1106            blokli_client,
1107            InMemoryBackend::default(),
1108            SafePayloadGenerator::new(
1109                &private_key,
1110                contract_addresses_for_network("rotsee").unwrap().1,
1111                module_addr.into(),
1112            ),
1113        );
1114        connector.connect().await?;
1115
1116        Ok(connector)
1117    }
1118
1119    #[test_log::test(tokio::test)]
1120    async fn ticket_manager_should_redeem_tickets_on_demand() -> anyhow::Result<()> {
1121        let mgr = create_mgr()?;
1122
1123        let src = ChainKeypair::random();
1124        let dst = ChainKeypair::random();
1125
1126        let channel = ChannelEntry::builder()
1127            .between(&src, &dst)
1128            .amount(10_000_000_000_u64)
1129            .ticket_index(0)
1130            .status(ChannelStatus::Open)
1131            .epoch(1)
1132            .build()?;
1133
1134        let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1135        tickets.shuffle(&mut rand::rng());
1136
1137        for ticket in tickets.iter() {
1138            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1139        }
1140
1141        tickets.sort();
1142
1143        let mut unrealized_value = mgr
1144            .unrealized_value(channel.get_id(), None)?
1145            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1146        assert_eq!(
1147            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1148            unrealized_value
1149        );
1150
1151        let connector = create_test_connector(&dst, &channel, None).await?;
1152
1153        let stream = mgr.redeem_stream(connector, *channel.get_id(), None)?;
1154
1155        pin_mut!(stream);
1156
1157        assert_eq!(
1158            Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1159            stream.try_next().await?
1160        );
1161        assert_eq!(
1162            mgr.unrealized_value(channel.get_id(), None)?
1163                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1164            unrealized_value - tickets[0].verified_ticket().amount
1165        );
1166        unrealized_value = mgr
1167            .unrealized_value(channel.get_id(), None)?
1168            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1169
1170        assert_eq!(
1171            Some(RedemptionResult::Redeemed(tickets[1].ticket)),
1172            stream.try_next().await?
1173        );
1174        assert_eq!(
1175            mgr.unrealized_value(channel.get_id(), None)?
1176                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1177            unrealized_value - tickets[1].verified_ticket().amount
1178        );
1179        unrealized_value = mgr
1180            .unrealized_value(channel.get_id(), None)?
1181            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1182
1183        assert_eq!(
1184            Some(RedemptionResult::Redeemed(tickets[2].ticket)),
1185            stream.try_next().await?
1186        );
1187        assert_eq!(
1188            mgr.unrealized_value(channel.get_id(), None)?
1189                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1190            unrealized_value - tickets[2].verified_ticket().amount
1191        );
1192
1193        assert_eq!(None, stream.try_next().await?);
1194
1195        Ok(())
1196    }
1197
1198    #[tokio::test]
1199    async fn ticket_manager_should_not_allow_concurrent_redemptions_on_the_same_channel() -> anyhow::Result<()> {
1200        let mgr = create_mgr()?;
1201
1202        let src = ChainKeypair::random();
1203        let dst = ChainKeypair::random();
1204
1205        let channel = ChannelEntry::builder()
1206            .between(&src, &dst)
1207            .amount(10_000_000_000_u64)
1208            .ticket_index(0)
1209            .status(ChannelStatus::Open)
1210            .epoch(1)
1211            .build()?;
1212
1213        let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1214        tickets.shuffle(&mut rand::rng());
1215
1216        for ticket in tickets.iter() {
1217            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1218        }
1219
1220        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1221
1222        let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1223
1224        assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_err());
1225
1226        drop(stream);
1227
1228        assert!(mgr.redeem_stream(connector.clone(), *channel.get_id(), None).is_ok());
1229
1230        Ok(())
1231    }
1232
1233    #[tokio::test]
1234    async fn ticket_manager_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
1235        let mgr = create_mgr()?;
1236
1237        let src = ChainKeypair::random();
1238        let dst = ChainKeypair::random();
1239
1240        let channel = ChannelEntry::builder()
1241            .between(&src, &dst)
1242            .amount(10_000_000_000_u64)
1243            .ticket_index(0)
1244            .status(ChannelStatus::Open)
1245            .epoch(1)
1246            .build()?;
1247
1248        let mut tickets = generate_owned_tickets(&src, &dst, 3, 1..=1)?;
1249        tickets.shuffle(&mut rand::rng());
1250
1251        for ticket in tickets.iter() {
1252            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1253        }
1254
1255        tickets.sort();
1256
1257        let unrealized_value = mgr
1258            .unrealized_value(channel.get_id(), None)?
1259            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1260        assert_eq!(
1261            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1262            unrealized_value
1263        );
1264
1265        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1266
1267        let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1268        pin_mut!(stream);
1269
1270        assert_eq!(
1271            Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1272            stream.try_next().await?
1273        );
1274        assert_eq!(
1275            mgr.unrealized_value(channel.get_id(), None)?
1276                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1277            unrealized_value - tickets[0].verified_ticket().amount
1278        );
1279
1280        let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
1281        assert_eq!(
1282            tickets.into_iter().skip(1).map(|t| t.ticket).collect::<Vec<_>>(),
1283            neglected
1284        );
1285        assert_eq!(
1286            HoprBalance::zero(),
1287            mgr.unrealized_value(channel.get_id(), None)?
1288                .ok_or(anyhow::anyhow!("must have unrealized value"))?
1289        );
1290
1291        assert_eq!(None, stream.try_next().await?);
1292
1293        Ok(())
1294    }
1295
1296    #[tokio::test]
1297    async fn ticket_manager_partial_ticket_neglection_should_cut_ongoing_redemption_short() -> anyhow::Result<()> {
1298        let mgr = create_mgr()?;
1299
1300        let src = ChainKeypair::random();
1301        let dst = ChainKeypair::random();
1302
1303        let channel = ChannelEntry::builder()
1304            .between(&src, &dst)
1305            .amount(10_000_000_000_u64)
1306            .ticket_index(0)
1307            .status(ChannelStatus::Open)
1308            .epoch(1)
1309            .build()?;
1310
1311        let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1312        tickets.shuffle(&mut rand::rng());
1313
1314        for ticket in tickets.iter() {
1315            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1316        }
1317
1318        tickets.sort();
1319
1320        let mut unrealized_value = mgr
1321            .unrealized_value(channel.get_id(), None)?
1322            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1323        assert_eq!(
1324            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1325            unrealized_value
1326        );
1327
1328        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1329
1330        let stream = mgr.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1331        pin_mut!(stream);
1332
1333        // Ticket with index 0 gets redeemed
1334        assert_eq!(
1335            Some(RedemptionResult::Redeemed(tickets[0].ticket)),
1336            stream.try_next().await?
1337        );
1338        assert_eq!(
1339            mgr.unrealized_value(channel.get_id(), None)?
1340                .ok_or(anyhow::anyhow!("must have unrealized value"))?,
1341            unrealized_value - tickets[0].verified_ticket().amount
1342        );
1343        unrealized_value = mgr
1344            .unrealized_value(channel.get_id(), None)?
1345            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1346
1347        // Tickets with index 1,2 and 3 get neglected
1348        let neglected = mgr.neglect_tickets(&channel.get_id(), Some(tickets[3].verified_ticket().index))?;
1349        assert_eq!(
1350            tickets.iter().skip(1).take(3).map(|t| t.ticket).collect::<Vec<_>>(),
1351            neglected
1352        );
1353        assert_eq!(
1354            unrealized_value
1355                - neglected
1356                    .into_iter()
1357                    .map(|t| t.verified_ticket().amount)
1358                    .sum::<HoprBalance>(),
1359            mgr.unrealized_value(channel.get_id(), None)?
1360                .ok_or(anyhow::anyhow!("must have unrealized value"))?
1361        );
1362
1363        // The last ticket with index 4 gets redeemed again
1364        assert_eq!(
1365            Some(RedemptionResult::Redeemed(tickets[4].ticket)),
1366            stream.try_next().await?
1367        );
1368
1369        assert_eq!(
1370            HoprBalance::zero(),
1371            mgr.unrealized_value(channel.get_id(), None)?
1372                .ok_or(anyhow::anyhow!("must have unrealized value"))?
1373        );
1374
1375        assert_eq!(None, stream.try_next().await?);
1376
1377        Ok(())
1378    }
1379
1380    #[tokio::test]
1381    async fn ticket_manager_ticket_neglection_during_on_chain_redemption_should_be_detected() -> anyhow::Result<()> {
1382        let mgr = std::sync::Arc::new(create_mgr()?);
1383
1384        let src = ChainKeypair::random();
1385        let dst = ChainKeypair::random();
1386
1387        let channel = ChannelEntry::builder()
1388            .between(&src, &dst)
1389            .amount(10_000_000_000_u64)
1390            .ticket_index(0)
1391            .status(ChannelStatus::Open)
1392            .epoch(1)
1393            .build()?;
1394
1395        let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1396        tickets.shuffle(&mut rand::rng());
1397
1398        for ticket in tickets.iter() {
1399            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1400        }
1401
1402        tickets.sort();
1403
1404        let connector =
1405            std::sync::Arc::new(create_test_connector(&dst, &channel, Some(std::time::Duration::from_secs(2))).await?);
1406
1407        let mgr_clone = mgr.clone();
1408        let jh = tokio::task::spawn(async move {
1409            let stream = mgr_clone.redeem_stream(connector.clone(), *channel.get_id(), None)?;
1410            pin_mut!(stream);
1411            stream.try_next().await
1412        });
1413        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1414
1415        // All the tickets will appear as neglected
1416        let neglected = mgr.neglect_tickets(&channel.get_id(), None)?;
1417        assert_eq!(neglected, tickets.iter().map(|t| t.ticket).collect::<Vec<_>>());
1418
1419        assert_eq!(
1420            ChannelStats {
1421                winning_tickets: tickets.len() as u128,
1422                unredeemed_value: HoprBalance::zero(),
1423                rejected_value: HoprBalance::zero(),
1424                neglected_value: neglected.iter().map(|t| t.verified_ticket().amount).sum(),
1425            },
1426            mgr.ticket_stats(Some(&channel.get_id()))?
1427        );
1428
1429        // Once redemption completes we should see the tickets as redeemed
1430        let res = jh.await??;
1431        assert_eq!(Some(RedemptionResult::Redeemed(tickets[0].ticket)), res);
1432
1433        assert_eq!(
1434            ChannelStats {
1435                winning_tickets: tickets.len() as u128,
1436                unredeemed_value: HoprBalance::zero(),
1437                rejected_value: HoprBalance::zero(),
1438                neglected_value: neglected
1439                    .iter()
1440                    .map(|t| t.verified_ticket().amount)
1441                    .sum::<HoprBalance>()
1442                    - tickets[0].verified_ticket().amount,
1443            },
1444            mgr.ticket_stats(Some(&channel.get_id()))?
1445        );
1446
1447        Ok(())
1448    }
1449
1450    #[tokio::test]
1451    async fn ticket_manager_ticket_redemption_should_skip_low_value_tickets() -> anyhow::Result<()> {
1452        let mgr = create_mgr()?;
1453
1454        let src = ChainKeypair::random();
1455        let dst = ChainKeypair::random();
1456
1457        let channel = ChannelEntry::builder()
1458            .between(&src, &dst)
1459            .amount(10_000_000_000_u64)
1460            .ticket_index(0)
1461            .status(ChannelStatus::Open)
1462            .epoch(1)
1463            .build()?;
1464
1465        let mut tickets = generate_owned_tickets(&src, &dst, 5, 1..=1)?;
1466        tickets.shuffle(&mut rand::rng());
1467
1468        for ticket in tickets.iter() {
1469            assert!(mgr.insert_incoming_ticket(*ticket)?.is_empty());
1470        }
1471
1472        tickets.sort();
1473
1474        let unrealized_value = mgr
1475            .unrealized_value(channel.get_id(), None)?
1476            .ok_or(anyhow::anyhow!("must have unrealized value"))?;
1477        assert_eq!(
1478            tickets.iter().map(|t| t.verified_ticket().amount).sum::<HoprBalance>(),
1479            unrealized_value
1480        );
1481
1482        let connector = std::sync::Arc::new(create_test_connector(&dst, &channel, None).await?);
1483
1484        let results = mgr
1485            .redeem_stream(
1486                connector.clone(),
1487                *channel.get_id(),
1488                Some(tickets[0].verified_ticket().amount + 1),
1489            )?
1490            .try_collect::<Vec<_>>()
1491            .await?;
1492
1493        assert_eq!(
1494            results,
1495            tickets
1496                .into_iter()
1497                .map(|t| RedemptionResult::ValueTooLow(t.ticket))
1498                .collect::<Vec<_>>()
1499        );
1500
1501        Ok(())
1502    }
1503}