Skip to main content

hopr_ticket_manager/
factory.rs

1use std::num::NonZeroU8;
2
3use hopr_api::{
4    HoprBalance,
5    chain::{ChannelEntry, WinningProbability},
6    tickets::TicketBuilder,
7    types::{
8        internal::channels::ChannelStatus,
9        primitive::prelude::{U256, UnitaryFloatOps},
10    },
11};
12
13use crate::{
14    OutgoingIndexStore, TicketManagerError,
15    utils::{OutgoingIndexCache, UnrealizedValue},
16};
17
18/// Keeps track of outgoing ticket indices and provides an interface for creating multihop tickets.
19///
20/// It is therefore always used on all noded types (Entry/Relay/Exit) in the outgoing packet pipeline, as
21/// it handles the outgoing ticket indices for new tickets.
22///
23/// For Entry/Exit nodes, the `HoprTicketFactory` is typically created as standalone (via [`HoprTicketFactory::new`]).
24///
25/// To synchronize the on-chain state with the store, it is advised to call
26/// [`sync_outgoing_channels`](HoprTicketFactory::sync_from_outgoing_channels) early
27/// after the construction of the factory, to make sure outgoing indices are up to date. This is typically done only
28/// once after construction and not needed to be done during the life-time of the factory.
29///
30/// The factory is safe to be shared via an `Arc`.
31///
32/// ### Usage in the outgoing packet pipeline
33/// The outgoing packet pipeline usually just calls
34/// [`new_multihop_ticket`](hopr_api::tickets::TicketFactory::new_multihop_ticket) to create a ticket for the next hop
35/// on a multi-hop path. To create zero/last-hop tickets, the factory is not needed as these tickets essentially contain
36/// bogus data and there's no channel required.
37///
38/// The outgoing indices are **not** automatically synchronized back to the underlying store for performance reasons.
39/// The user is responsible for calling [`save_outgoing_indices`](HoprTicketFactory::save_outgoing_indices) to save
40/// the outgoing indices to the store.
41///
42/// This usage is typical for all kinds of nodes (Entry/Relay/Exit).
43///
44/// ### Usage in the incoming packet pipeline in Relay nodes
45/// There is additional usage of the `HoprTicketFactory` in the incoming pipeline in Relay nodes.
46/// The Relay nodes typically need to validate incoming tickets in their incoming packet pipeline **before**
47/// they forward the packet to the outgoing packet pipeline (out to the next hop).
48///
49/// The `HoprTicketFactory` in this case maintains a weak reference to [`HoprTicketManager`](crate::HoprTicketManager)
50/// if they were created together via
51/// [`HoprTicketManager::new_with_factory`](crate::HoprTicketManager::new_with_factory).
52///
53/// By using this weak reference, it can get the [remaining channel
54/// stake](hopr_api::tickets::TicketFactory::remaining_incoming_channel_stake) on the given channel by subtracting the
55/// value of unredeemed tickets in the matching channel queue of the associated
56/// [`HoprTicketManager`](crate::HoprTicketManager).
57///
58/// This is useful for Relay nodes that need to validate incoming tickets before forwarding them to the outgoing packet
59/// pipeline.
60///
61/// NOTE: if the `HoprTicketFactor` is not created with a `HoprTicketManager`, it cannot evaluate the remaining stake on
62/// the given channel and will always return the channel balance.
63///
64/// ## Locking and lock-contention
65///
66/// ### Outgoing ticket creation
67/// The [`new_multihop_ticket`](hopr_api::tickets::TicketFactory::new_multihop_ticket) method is designed to be
68/// high-performance and to be called per each outgoing packet. It is using only atomics to track the outgoing
69/// ticket index for a channel. The synchronization to the underlying storage is done on-demand by calling
70/// `save_outgoing_indices`, making quick snapshots of the current state of outgoing indices.
71/// No significant contention is expected unless `save_outgoing_indices` is called very frequently.
72///
73/// ### Remaining channel stake calculation
74/// The [`remaining_incoming_channel_stake`](hopr_api::tickets::TicketFactory::remaining_incoming_channel_stake) method
75/// is designed to be high-performance and to be called per each incoming packet **before** it is forwarded to a next
76/// hop.
77///
78/// This operation acquires the read-part of an RW lock in `HoprTicketManager` (per incoming channel). This may block
79/// the hot-path only if one of the following (write) operations is performed at the same moment:
80///     1. A new incoming winning ticket is inserted into the same incoming channel queue of the `HoprTicketManager`.
81///     2. Ticket redemption has just finished in that particular channel, and the redeemed ticket is dropped from the
82///     same incoming channel queue of the `HoprTicketManager`.
83///     3. Ticket neglection has just finished in that particular channel, and the neglected ticket is dropped from the
84///     same incoming channel queue of the `HoprTicketManager`.
85///
86/// All 3 of these operations are not expected to happen very often on a single channel; therefore, high contention
87/// on the RW lock is not expected.
88pub struct HoprTicketFactory<S> {
89    out_idx_tracker: OutgoingIndexCache,
90    queue_map: std::sync::Weak<dyn UnrealizedValue + Send + Sync + 'static>,
91    store: std::sync::Arc<parking_lot::RwLock<S>>,
92}
93
94impl<S: OutgoingIndexStore + 'static> HoprTicketFactory<S> {
95    /// Creates a new independent ticket factory instance backed by the given `store`.
96    ///
97    /// The `store` must be an [`OutgoingIndexStore`].
98    pub fn new(store: S) -> Self {
99        Self {
100            out_idx_tracker: Default::default(),
101            queue_map: std::sync::Weak::<()>::new(),
102            store: std::sync::Arc::new(parking_lot::RwLock::new(store)),
103        }
104    }
105
106    pub(crate) fn new_shared<Q: UnrealizedValue + Send + Sync + 'static>(
107        store: std::sync::Arc<parking_lot::RwLock<S>>,
108        queue_map: std::sync::Weak<Q>,
109    ) -> Self {
110        Self {
111            out_idx_tracker: Default::default(),
112            queue_map,
113            store,
114        }
115    }
116}
117
118impl<S> HoprTicketFactory<S>
119where
120    S: OutgoingIndexStore + Send + Sync + 'static,
121{
122    /// Gets the next usable ticket index for an outgoing ticket in the given channel and epoch.
123    ///
124    /// This operation is fast and does not immediately put the index into the [`OutgoingIndexStore`].
125    ///
126    /// The returned value is always guaranteed to be greater or equal to the ticket index on the given `channel`.
127    pub fn next_outgoing_ticket_index(&self, channel: &ChannelEntry) -> u64 {
128        let mut next_index = self.out_idx_tracker.next(channel.get_id(), channel.channel_epoch);
129        tracing::trace!(%channel, next_index, "next outgoing ticket index");
130
131        let epoch = channel.channel_epoch;
132
133        if next_index < channel.ticket_index {
134            // Correct the value in the cache if it was lower than the ticket index on the channel.
135            // This sets the value in the cache to the next index after the ticket index on the channel.
136            self.out_idx_tracker
137                .upsert(channel.get_id(), epoch, channel.ticket_index + 1);
138            next_index = channel.ticket_index; // Still, use the channel's ticket index as the next index.
139        }
140
141        // If this is the first index in this epoch,
142        // remove the previous epoch from the map if any.
143        // Epochs always start at 1, ticket indices at 0.
144        if next_index == 0 && epoch > 1 && self.out_idx_tracker.remove(channel.get_id(), epoch - 1) {
145            tracing::trace!(%channel, prev_epoch = epoch - 1, "removing previous epoch from outgoing index cache");
146        }
147
148        next_index
149    }
150
151    /// Saves updated outgoing ticket indices back to the store.
152    ///
153    /// The operation does nothing if there were no [new tickets
154    /// created](hopr_api::tickets::TicketFactory::new_multihop_ticket) on any tracked channel, or the indices were
155    /// not updated.
156    pub fn save_outgoing_indices(&self) -> Result<(), TicketManagerError> {
157        self.out_idx_tracker
158            .save(self.store.clone())
159            .map_err(TicketManagerError::store)?;
160        Ok(())
161    }
162
163    /// Synchronizes the outgoing index counters based on the current on-chain channel
164    /// state given by `outgoing_channels`.
165    ///
166    /// Outgoing indices for channels that either are not present in `outgoing_channels` or
167    /// not present as opened channels will be removed from the store.
168    ///
169    /// Outgoing indices for existing open channels in `outgoing_channels` will be either:
170    /// - added to the store with their current index and epoch (if not present in the store), or
171    /// - updated to the maximum of the two index values (if present in the store)
172    ///
173    /// It is advised to call this function early after the construction of the `HoprTicketFactory`
174    /// to ensure pruning of dangling or out-of-date values.
175    pub fn sync_from_outgoing_channels(&self, outgoing_channels: &[ChannelEntry]) -> Result<(), TicketManagerError> {
176        let outgoing_channels: std::collections::HashSet<_, std::hash::RandomState> =
177            outgoing_channels.iter().collect();
178
179        // Purge outdated outgoing indices
180        let mut store_read = self.store.upgradable_read();
181        let stored_indices = store_read
182            .iter_outgoing_indices()
183            .map_err(TicketManagerError::store)?
184            .collect::<Vec<_>>();
185        for (channel_id, epoch) in stored_indices {
186            // If any stored outgoing index does not match any currently existing opened channel,
187            // remove it from the store
188            if !outgoing_channels.iter().any(|channel| {
189                channel.status == ChannelStatus::Open
190                    && channel.get_id() == &channel_id
191                    && channel.channel_epoch == epoch
192            }) {
193                let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
194                store_write
195                    .delete_outgoing_index(&channel_id, epoch)
196                    .map_err(TicketManagerError::store)?;
197                store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
198                tracing::debug!(%channel_id, epoch, "purging outdated outgoing index")
199            }
200        }
201
202        for channel in outgoing_channels
203            .iter()
204            .filter(|channel| channel.status == ChannelStatus::Open)
205        {
206            let id = channel.get_id();
207
208            // Either load a previously stored outgoing index or use the channel's ticket index as a
209            // fallback
210            let epoch = channel.channel_epoch;
211            let index = match store_read.load_outgoing_index(id, epoch) {
212                Ok(Some(out_index)) => out_index,
213                Ok(None) => 0,
214                Err(error) => {
215                    tracing::error!(%error, %id, "failed to load outgoing index for channel");
216                    return Err(TicketManagerError::store(error));
217                }
218            };
219
220            // Always use the maximum from the stored value and the current ticket index on the channel
221            let out_index = index.max(channel.ticket_index);
222            self.out_idx_tracker.upsert(id, epoch, out_index);
223            tracing::debug!(%id, epoch, out_index, "loaded outgoing ticket index for channel");
224        }
225
226        tracing::debug!(
227            num_channels = outgoing_channels.len(),
228            "synchronized with outgoing channels"
229        );
230        Ok(())
231    }
232}
233
234impl<S> hopr_api::tickets::TicketFactory for HoprTicketFactory<S>
235where
236    S: OutgoingIndexStore + Send + Sync + 'static,
237{
238    type Error = TicketManagerError;
239
240    /// Method fulfills the implementation of
241    /// [`TicketFactory::new_multihop_ticket`](hopr_api::tickets::TicketFactory::new_multihop_ticket).
242    ///
243    /// ### Implementation details
244    /// The `current_path_pos` indicates the position of the current hop in the multi-hop path.
245    /// It is used to determine the value of the ticket: `price * (current_path_pos - 1) / winning_prob`.
246    /// The function does not make sense for `current_path_pos <= 1` and returns an error if such an argument is
247    /// provided.
248    ///
249    /// For last-hop tickets (`current_path_pos` equal to 1), a [zero hop ticket](TicketBuilder::zero_hop) should be
250    /// created instead.
251    ///
252    /// The function will fail for channels that are not opened or do not have enough funds to cover the ticket value.
253    /// The ticket index of the returned ticket is guaranteed to be greater or equal to the ticket index on the
254    /// given `channel` argument.
255    fn new_multihop_ticket(
256        &self,
257        channel: &ChannelEntry,
258        path_position: NonZeroU8,
259        winning_probability: WinningProbability,
260        price_per_hop: HoprBalance,
261    ) -> Result<TicketBuilder, Self::Error> {
262        let current_path_pos = path_position.get();
263        if current_path_pos == 1 {
264            return Err(TicketManagerError::Other(anyhow::anyhow!(
265                "current path position for multihop ticket must be greater than 1"
266            )));
267        }
268
269        if channel.status != ChannelStatus::Open {
270            return Err(TicketManagerError::Other(anyhow::anyhow!(
271                "channel must be open to create a multihop ticket"
272            )));
273        }
274
275        // The next ticket is worth: price * remaining hop count / winning probability\
276        // The check will also not allow creation of tickets with 0 winning probability.
277        let amount = HoprBalance::from(
278            price_per_hop
279                .amount()
280                .saturating_mul(U256::from(current_path_pos - 1))
281                .div_f64(winning_probability.into())
282                .map_err(|_| {
283                    TicketManagerError::Other(anyhow::anyhow!(
284                        "invalid winning probability for outgoing ticket: {winning_probability}"
285                    ))
286                })?,
287        );
288
289        if channel.balance.lt(&amount) {
290            return Err(TicketManagerError::OutOfFunds(*channel.get_id(), amount));
291        }
292
293        let ticket_builder = TicketBuilder::default()
294            .counterparty(channel.destination)
295            .balance(amount)
296            .index(self.next_outgoing_ticket_index(channel))
297            .win_prob(winning_probability)
298            .channel_epoch(channel.channel_epoch);
299
300        Ok(ticket_builder)
301    }
302
303    /// Method fulfills the implementation of
304    /// [`TicketFactory::remaining_incoming_channel_stake`](hopr_api::tickets::TicketFactory::remaining_incoming_channel_stake).
305    ///
306    /// ## Implementation details
307    ///
308    /// If this instance is created as standalone (via [`HoprTicketFactory::new`]), or the
309    /// [`HoprTicketManager`](crate::HoprTicketManager) that has been initially
310    /// [created](crate::HoprTicketManager::new_with_factory) with this instance is dropped, this method
311    /// returns `Ok(channel.balance)`.
312    ///
313    /// Otherwise, as per requirements, this method returns the balance of the `channel` diminished by the total value
314    /// of unredeemed tickets tracked by the associated [`HoprTicketManager`](crate::HoprTicketManager).
315    fn remaining_incoming_channel_stake(&self, channel: &ChannelEntry) -> Result<HoprBalance, Self::Error> {
316        if let Some(queue_map) = self.queue_map.upgrade() {
317            // Here we do not use the current channel ticket index as the minimum index we should start
318            // computing the unrealized value from, because we assume the tickets get neglected as soon as
319            // the index on the channel increases. This is typically done by the ticket manager after
320            // successful ticket redemption.
321            let unrealized_value = queue_map.unrealized_value(channel.get_id(), None)?;
322
323            // Subtraction on HoprBalance type naturally saturating at 0
324            Ok(channel.balance - unrealized_value.unwrap_or_default())
325        } else {
326            // We intentionally do not return an error here because the factory should work
327            // without ticket manager.
328            tracing::warn!("cannot get remaining stake for channel without ticket manager");
329            Ok(channel.balance)
330        }
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use hopr_api::{
337        tickets::{TicketFactory, TicketManagement},
338        types::crypto::prelude::Keypair,
339    };
340    use hopr_chain_connector::ChainKeypair;
341
342    use super::*;
343    use crate::{MemoryStore, traits::tests::generate_owned_tickets};
344
345    fn create_factory() -> anyhow::Result<HoprTicketFactory<MemoryStore>> {
346        Ok(HoprTicketFactory::new(MemoryStore::default()))
347    }
348
349    #[test]
350    fn ticket_factory_remaining_incoming_channel_stake_should_behave_as_identity_without_manager() -> anyhow::Result<()>
351    {
352        let factory = create_factory()?;
353        let channel = ChannelEntry::builder()
354            .between(&ChainKeypair::random(), &ChainKeypair::random())
355            .amount(10)
356            .ticket_index(0)
357            .status(ChannelStatus::Open)
358            .epoch(1)
359            .build()?;
360
361        assert_eq!(channel.balance, factory.remaining_incoming_channel_stake(&channel)?);
362        Ok(())
363    }
364
365    #[test]
366    fn ticket_factory_remaining_incoming_channel_stake_should_be_reduced_by_unrealized_value() -> anyhow::Result<()> {
367        let (manager, factory) = crate::HoprTicketManager::new_with_factory(MemoryStore::default());
368
369        let src = ChainKeypair::random();
370        let dst = ChainKeypair::random();
371
372        let tickets = generate_owned_tickets(&src, &dst, 2, 1..=1)?;
373
374        let channel = ChannelEntry::builder()
375            .between(&src, &dst)
376            .balance(tickets[0].verified_ticket().amount * 10)
377            .ticket_index(0)
378            .status(ChannelStatus::Open)
379            .epoch(1)
380            .build()?;
381
382        assert_eq!(channel.balance, factory.remaining_incoming_channel_stake(&channel)?);
383
384        manager.insert_incoming_ticket(tickets[0])?;
385
386        assert_eq!(
387            channel.balance - tickets[0].verified_ticket().amount,
388            factory.remaining_incoming_channel_stake(&channel)?
389        );
390
391        manager.insert_incoming_ticket(tickets[1])?;
392
393        assert_eq!(
394            channel.balance - tickets[0].verified_ticket().amount - tickets[1].verified_ticket().amount,
395            factory.remaining_incoming_channel_stake(&channel)?
396        );
397
398        drop(manager);
399
400        assert_eq!(channel.balance, factory.remaining_incoming_channel_stake(&channel)?);
401
402        Ok(())
403    }
404
405    #[test]
406    fn ticket_factory_should_not_create_tickets_with_zero_winning_probability() -> anyhow::Result<()> {
407        let factory = create_factory()?;
408
409        let src = ChainKeypair::random();
410        let dst = ChainKeypair::random();
411
412        let channel = ChannelEntry::builder()
413            .between(&src, &dst)
414            .amount(10)
415            .ticket_index(1)
416            .status(ChannelStatus::Open)
417            .epoch(1)
418            .build()?;
419
420        assert!(
421            factory
422                .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::NEVER, 10.into())
423                .is_err()
424        );
425
426        Ok(())
427    }
428
429    #[test]
430    fn ticket_factory_should_create_multihop_tickets() -> anyhow::Result<()> {
431        let factory = create_factory()?;
432
433        let src = ChainKeypair::random();
434        let dst = ChainKeypair::random();
435
436        let channel = ChannelEntry::builder()
437            .between(&src, &dst)
438            .amount(10)
439            .ticket_index(1)
440            .status(ChannelStatus::Open)
441            .epoch(1)
442            .build()?;
443
444        // Loads index 1 which is the next index for a ticket on this channel
445        factory.sync_from_outgoing_channels(&[channel])?;
446
447        let ticket_1 = factory
448            .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
449            .eth_challenge(Default::default())
450            .build_signed(&src, &Default::default())?;
451
452        assert_eq!(ticket_1.channel_id(), channel.get_id());
453        assert_eq!(channel.ticket_index, ticket_1.verified_ticket().index);
454        assert_eq!(channel.channel_epoch, ticket_1.verified_ticket().channel_epoch);
455
456        let ticket_2 = factory
457            .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
458            .eth_challenge(Default::default())
459            .build_signed(&src, &Default::default())?;
460
461        assert_eq!(ticket_2.channel_id(), channel.get_id());
462        assert_eq!(channel.ticket_index + 1, ticket_2.verified_ticket().index);
463        assert_eq!(channel.channel_epoch, ticket_2.verified_ticket().channel_epoch);
464
465        // Should not accept path positions less than 2
466        assert!(
467            factory
468                .new_multihop_ticket(&channel, 1.try_into()?, WinningProbability::ALWAYS, 10.into())
469                .is_err()
470        );
471
472        Ok(())
473    }
474
475    #[test]
476    fn ticket_manager_create_multihop_ticket_should_fail_on_wrong_input() -> anyhow::Result<()> {
477        let factory = create_factory()?;
478
479        let src = ChainKeypair::random();
480        let dst = ChainKeypair::random();
481
482        let mut channel = ChannelEntry::builder()
483            .between(&src, &dst)
484            .amount(10)
485            .ticket_index(1)
486            .status(ChannelStatus::Closed)
487            .epoch(1)
488            .build()?;
489
490        assert!(
491            factory
492                .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 1.into())
493                .is_err()
494        );
495
496        channel.status =
497            ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_secs(10));
498
499        assert!(
500            factory
501                .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 1.into())
502                .is_err()
503        );
504
505        channel.status = ChannelStatus::Open;
506
507        assert!(
508            factory
509                .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 11.into())
510                .is_err()
511        );
512
513        assert!(
514            factory
515                .new_multihop_ticket(&channel, 1.try_into()?, WinningProbability::ALWAYS, 1.into())
516                .is_err()
517        );
518
519        Ok(())
520    }
521
522    #[test]
523    fn ticket_manager_test_next_outgoing_ticket_index() -> anyhow::Result<()> {
524        let factory = create_factory()?;
525
526        let src = ChainKeypair::random();
527        let dst = ChainKeypair::random();
528
529        let mut channel = ChannelEntry::builder()
530            .between(&src, &dst)
531            .amount(10)
532            .ticket_index(0)
533            .status(ChannelStatus::Open)
534            .epoch(1)
535            .build()?;
536
537        assert_eq!(0, factory.next_outgoing_ticket_index(&channel));
538
539        channel.ticket_index = 10;
540        assert_eq!(10, factory.next_outgoing_ticket_index(&channel));
541        assert_eq!(11, factory.next_outgoing_ticket_index(&channel));
542
543        channel.ticket_index = 100;
544        assert_eq!(100, factory.next_outgoing_ticket_index(&channel));
545        assert_eq!(101, factory.next_outgoing_ticket_index(&channel));
546
547        channel.ticket_index = 50;
548        assert_eq!(102, factory.next_outgoing_ticket_index(&channel));
549        assert_eq!(103, factory.next_outgoing_ticket_index(&channel));
550
551        factory.save_outgoing_indices()?;
552        assert_eq!(
553            Some(104),
554            factory.store.read().load_outgoing_index(channel.get_id(), 1)?
555        );
556
557        channel.ticket_index = 0;
558        channel.channel_epoch = 2;
559
560        assert_eq!(0, factory.next_outgoing_ticket_index(&channel));
561        factory.save_outgoing_indices()?;
562
563        assert_eq!(None, factory.store.read().load_outgoing_index(channel.get_id(), 1)?);
564        assert_eq!(Some(1), factory.store.read().load_outgoing_index(channel.get_id(), 2)?);
565
566        assert_eq!(1, factory.next_outgoing_ticket_index(&channel));
567
568        Ok(())
569    }
570
571    #[test]
572    fn ticket_manager_should_save_out_indices_to_the_store_on_demand() -> anyhow::Result<()> {
573        let factory = create_factory()?;
574
575        let src = ChainKeypair::random();
576        let dst = ChainKeypair::random();
577
578        let channel = ChannelEntry::builder()
579            .between(&src, &dst)
580            .amount(10)
581            .ticket_index(1)
582            .status(ChannelStatus::Open)
583            .epoch(1)
584            .build()?;
585
586        // Loads index 1 which is the next index for a ticket on this channel
587        factory.sync_from_outgoing_channels(&[channel])?;
588
589        factory.new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?;
590
591        // Without saving, the store index should not be present in store
592        let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
593        assert_eq!(None, saved_index);
594
595        factory.new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?;
596
597        factory.save_outgoing_indices()?;
598        let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
599        assert_eq!(Some(3), saved_index);
600
601        factory.new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?;
602
603        let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
604        assert_eq!(Some(3), saved_index);
605
606        factory.save_outgoing_indices()?;
607        let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
608        assert_eq!(Some(4), saved_index);
609
610        Ok(())
611    }
612
613    #[test]
614    fn ticket_manager_should_sync_out_indices_from_chain_state() -> anyhow::Result<()> {
615        let factory = create_factory()?;
616
617        let src = ChainKeypair::random();
618        let dst = ChainKeypair::random();
619
620        let channel = ChannelEntry::builder()
621            .between(&src, &dst)
622            .amount(10)
623            .ticket_index(1)
624            .status(ChannelStatus::Open)
625            .epoch(1)
626            .build()?;
627
628        factory.sync_from_outgoing_channels(&[channel])?;
629        factory.save_outgoing_indices()?;
630
631        let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
632        assert_eq!(Some(1), saved_index);
633
634        Ok(())
635    }
636
637    #[test_log::test]
638    fn ticket_manager_should_sync_out_indices_should_remove_indices_for_non_opened_outgoing_channels()
639    -> anyhow::Result<()> {
640        let factory = create_factory()?;
641
642        let src = ChainKeypair::random();
643        let dst = ChainKeypair::random();
644
645        let mut channel_1 = ChannelEntry::builder()
646            .between(&src, &dst)
647            .amount(10)
648            .ticket_index(0)
649            .status(ChannelStatus::Open)
650            .epoch(1)
651            .build()?;
652
653        let mut channel_2 = ChannelEntry::builder()
654            .between(&dst, &src)
655            .amount(10)
656            .ticket_index(0)
657            .status(ChannelStatus::Open)
658            .epoch(1)
659            .build()?;
660
661        let ticket_1 = factory
662            .new_multihop_ticket(&channel_1, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
663            .eth_challenge(Default::default())
664            .build()?;
665        let ticket_2 = factory
666            .new_multihop_ticket(&channel_2, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
667            .eth_challenge(Default::default())
668            .build()?;
669        assert_eq!(0, ticket_1.index);
670        assert_eq!(0, ticket_2.index);
671
672        factory.save_outgoing_indices()?;
673
674        assert_eq!(
675            Some(1),
676            factory.store.read().load_outgoing_index(channel_1.get_id(), 1)?
677        );
678        assert_eq!(
679            Some(1),
680            factory.store.read().load_outgoing_index(channel_2.get_id(), 1)?
681        );
682
683        channel_1.status = ChannelStatus::Closed;
684        channel_2.status =
685            ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_mins(10));
686
687        factory.sync_from_outgoing_channels(&[channel_1, channel_2])?;
688
689        assert_eq!(None, factory.store.read().load_outgoing_index(channel_1.get_id(), 1)?);
690        assert_eq!(None, factory.store.read().load_outgoing_index(channel_2.get_id(), 1)?);
691
692        Ok(())
693    }
694}