1use std::num::NonZeroU8;
2
3use hopr_api::{
4 HoprBalance,
5 chain::{ChannelEntry, WinningProbability},
6 tickets::TicketBuilder,
7 types::{
8 internal::channels::ChannelStatus,
9 primitive::prelude::{U256, UnitaryFloatOps},
10 },
11};
12
13use crate::{
14 OutgoingIndexStore, TicketManagerError,
15 utils::{OutgoingIndexCache, UnrealizedValue},
16};
17
18pub struct HoprTicketFactory<S> {
89 out_idx_tracker: OutgoingIndexCache,
90 queue_map: std::sync::Weak<dyn UnrealizedValue + Send + Sync + 'static>,
91 store: std::sync::Arc<parking_lot::RwLock<S>>,
92}
93
94impl<S: OutgoingIndexStore + 'static> HoprTicketFactory<S> {
95 pub fn new(store: S) -> Self {
99 Self {
100 out_idx_tracker: Default::default(),
101 queue_map: std::sync::Weak::<()>::new(),
102 store: std::sync::Arc::new(parking_lot::RwLock::new(store)),
103 }
104 }
105
106 pub(crate) fn new_shared<Q: UnrealizedValue + Send + Sync + 'static>(
107 store: std::sync::Arc<parking_lot::RwLock<S>>,
108 queue_map: std::sync::Weak<Q>,
109 ) -> Self {
110 Self {
111 out_idx_tracker: Default::default(),
112 queue_map,
113 store,
114 }
115 }
116}
117
118impl<S> HoprTicketFactory<S>
119where
120 S: OutgoingIndexStore + Send + Sync + 'static,
121{
122 pub fn next_outgoing_ticket_index(&self, channel: &ChannelEntry) -> u64 {
128 let mut next_index = self.out_idx_tracker.next(channel.get_id(), channel.channel_epoch);
129 tracing::trace!(%channel, next_index, "next outgoing ticket index");
130
131 let epoch = channel.channel_epoch;
132
133 if next_index < channel.ticket_index {
134 self.out_idx_tracker
137 .upsert(channel.get_id(), epoch, channel.ticket_index + 1);
138 next_index = channel.ticket_index; }
140
141 if next_index == 0 && epoch > 1 && self.out_idx_tracker.remove(channel.get_id(), epoch - 1) {
145 tracing::trace!(%channel, prev_epoch = epoch - 1, "removing previous epoch from outgoing index cache");
146 }
147
148 next_index
149 }
150
151 pub fn save_outgoing_indices(&self) -> Result<(), TicketManagerError> {
157 self.out_idx_tracker
158 .save(self.store.clone())
159 .map_err(TicketManagerError::store)?;
160 Ok(())
161 }
162
163 pub fn sync_from_outgoing_channels(&self, outgoing_channels: &[ChannelEntry]) -> Result<(), TicketManagerError> {
176 let outgoing_channels: std::collections::HashSet<_, std::hash::RandomState> =
177 outgoing_channels.iter().collect();
178
179 let mut store_read = self.store.upgradable_read();
181 let stored_indices = store_read
182 .iter_outgoing_indices()
183 .map_err(TicketManagerError::store)?
184 .collect::<Vec<_>>();
185 for (channel_id, epoch) in stored_indices {
186 if !outgoing_channels.iter().any(|channel| {
189 channel.status == ChannelStatus::Open
190 && channel.get_id() == &channel_id
191 && channel.channel_epoch == epoch
192 }) {
193 let mut store_write = parking_lot::RwLockUpgradableReadGuard::upgrade(store_read);
194 store_write
195 .delete_outgoing_index(&channel_id, epoch)
196 .map_err(TicketManagerError::store)?;
197 store_read = parking_lot::RwLockWriteGuard::downgrade_to_upgradable(store_write);
198 tracing::debug!(%channel_id, epoch, "purging outdated outgoing index")
199 }
200 }
201
202 for channel in outgoing_channels
203 .iter()
204 .filter(|channel| channel.status == ChannelStatus::Open)
205 {
206 let id = channel.get_id();
207
208 let epoch = channel.channel_epoch;
211 let index = match store_read.load_outgoing_index(id, epoch) {
212 Ok(Some(out_index)) => out_index,
213 Ok(None) => 0,
214 Err(error) => {
215 tracing::error!(%error, %id, "failed to load outgoing index for channel");
216 return Err(TicketManagerError::store(error));
217 }
218 };
219
220 let out_index = index.max(channel.ticket_index);
222 self.out_idx_tracker.upsert(id, epoch, out_index);
223 tracing::debug!(%id, epoch, out_index, "loaded outgoing ticket index for channel");
224 }
225
226 tracing::debug!(
227 num_channels = outgoing_channels.len(),
228 "synchronized with outgoing channels"
229 );
230 Ok(())
231 }
232}
233
234impl<S> hopr_api::tickets::TicketFactory for HoprTicketFactory<S>
235where
236 S: OutgoingIndexStore + Send + Sync + 'static,
237{
238 type Error = TicketManagerError;
239
240 fn new_multihop_ticket(
256 &self,
257 channel: &ChannelEntry,
258 path_position: NonZeroU8,
259 winning_probability: WinningProbability,
260 price_per_hop: HoprBalance,
261 ) -> Result<TicketBuilder, Self::Error> {
262 let current_path_pos = path_position.get();
263 if current_path_pos == 1 {
264 return Err(TicketManagerError::Other(anyhow::anyhow!(
265 "current path position for multihop ticket must be greater than 1"
266 )));
267 }
268
269 if channel.status != ChannelStatus::Open {
270 return Err(TicketManagerError::Other(anyhow::anyhow!(
271 "channel must be open to create a multihop ticket"
272 )));
273 }
274
275 let amount = HoprBalance::from(
278 price_per_hop
279 .amount()
280 .saturating_mul(U256::from(current_path_pos - 1))
281 .div_f64(winning_probability.into())
282 .map_err(|_| {
283 TicketManagerError::Other(anyhow::anyhow!(
284 "invalid winning probability for outgoing ticket: {winning_probability}"
285 ))
286 })?,
287 );
288
289 if channel.balance.lt(&amount) {
290 return Err(TicketManagerError::OutOfFunds(*channel.get_id(), amount));
291 }
292
293 let ticket_builder = TicketBuilder::default()
294 .counterparty(channel.destination)
295 .balance(amount)
296 .index(self.next_outgoing_ticket_index(channel))
297 .win_prob(winning_probability)
298 .channel_epoch(channel.channel_epoch);
299
300 Ok(ticket_builder)
301 }
302
303 fn remaining_incoming_channel_stake(&self, channel: &ChannelEntry) -> Result<HoprBalance, Self::Error> {
316 if let Some(queue_map) = self.queue_map.upgrade() {
317 let unrealized_value = queue_map.unrealized_value(channel.get_id(), None)?;
322
323 Ok(channel.balance - unrealized_value.unwrap_or_default())
325 } else {
326 tracing::warn!("cannot get remaining stake for channel without ticket manager");
329 Ok(channel.balance)
330 }
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use hopr_api::{
337 tickets::{TicketFactory, TicketManagement},
338 types::crypto::prelude::Keypair,
339 };
340 use hopr_chain_connector::ChainKeypair;
341
342 use super::*;
343 use crate::{MemoryStore, traits::tests::generate_owned_tickets};
344
345 fn create_factory() -> anyhow::Result<HoprTicketFactory<MemoryStore>> {
346 Ok(HoprTicketFactory::new(MemoryStore::default()))
347 }
348
349 #[test]
350 fn ticket_factory_remaining_incoming_channel_stake_should_behave_as_identity_without_manager() -> anyhow::Result<()>
351 {
352 let factory = create_factory()?;
353 let channel = ChannelEntry::builder()
354 .between(&ChainKeypair::random(), &ChainKeypair::random())
355 .amount(10)
356 .ticket_index(0)
357 .status(ChannelStatus::Open)
358 .epoch(1)
359 .build()?;
360
361 assert_eq!(channel.balance, factory.remaining_incoming_channel_stake(&channel)?);
362 Ok(())
363 }
364
365 #[test]
366 fn ticket_factory_remaining_incoming_channel_stake_should_be_reduced_by_unrealized_value() -> anyhow::Result<()> {
367 let (manager, factory) = crate::HoprTicketManager::new_with_factory(MemoryStore::default());
368
369 let src = ChainKeypair::random();
370 let dst = ChainKeypair::random();
371
372 let tickets = generate_owned_tickets(&src, &dst, 2, 1..=1)?;
373
374 let channel = ChannelEntry::builder()
375 .between(&src, &dst)
376 .balance(tickets[0].verified_ticket().amount * 10)
377 .ticket_index(0)
378 .status(ChannelStatus::Open)
379 .epoch(1)
380 .build()?;
381
382 assert_eq!(channel.balance, factory.remaining_incoming_channel_stake(&channel)?);
383
384 manager.insert_incoming_ticket(tickets[0])?;
385
386 assert_eq!(
387 channel.balance - tickets[0].verified_ticket().amount,
388 factory.remaining_incoming_channel_stake(&channel)?
389 );
390
391 manager.insert_incoming_ticket(tickets[1])?;
392
393 assert_eq!(
394 channel.balance - tickets[0].verified_ticket().amount - tickets[1].verified_ticket().amount,
395 factory.remaining_incoming_channel_stake(&channel)?
396 );
397
398 drop(manager);
399
400 assert_eq!(channel.balance, factory.remaining_incoming_channel_stake(&channel)?);
401
402 Ok(())
403 }
404
405 #[test]
406 fn ticket_factory_should_not_create_tickets_with_zero_winning_probability() -> anyhow::Result<()> {
407 let factory = create_factory()?;
408
409 let src = ChainKeypair::random();
410 let dst = ChainKeypair::random();
411
412 let channel = ChannelEntry::builder()
413 .between(&src, &dst)
414 .amount(10)
415 .ticket_index(1)
416 .status(ChannelStatus::Open)
417 .epoch(1)
418 .build()?;
419
420 assert!(
421 factory
422 .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::NEVER, 10.into())
423 .is_err()
424 );
425
426 Ok(())
427 }
428
429 #[test]
430 fn ticket_factory_should_create_multihop_tickets() -> anyhow::Result<()> {
431 let factory = create_factory()?;
432
433 let src = ChainKeypair::random();
434 let dst = ChainKeypair::random();
435
436 let channel = ChannelEntry::builder()
437 .between(&src, &dst)
438 .amount(10)
439 .ticket_index(1)
440 .status(ChannelStatus::Open)
441 .epoch(1)
442 .build()?;
443
444 factory.sync_from_outgoing_channels(&[channel])?;
446
447 let ticket_1 = factory
448 .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
449 .eth_challenge(Default::default())
450 .build_signed(&src, &Default::default())?;
451
452 assert_eq!(ticket_1.channel_id(), channel.get_id());
453 assert_eq!(channel.ticket_index, ticket_1.verified_ticket().index);
454 assert_eq!(channel.channel_epoch, ticket_1.verified_ticket().channel_epoch);
455
456 let ticket_2 = factory
457 .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
458 .eth_challenge(Default::default())
459 .build_signed(&src, &Default::default())?;
460
461 assert_eq!(ticket_2.channel_id(), channel.get_id());
462 assert_eq!(channel.ticket_index + 1, ticket_2.verified_ticket().index);
463 assert_eq!(channel.channel_epoch, ticket_2.verified_ticket().channel_epoch);
464
465 assert!(
467 factory
468 .new_multihop_ticket(&channel, 1.try_into()?, WinningProbability::ALWAYS, 10.into())
469 .is_err()
470 );
471
472 Ok(())
473 }
474
475 #[test]
476 fn ticket_manager_create_multihop_ticket_should_fail_on_wrong_input() -> anyhow::Result<()> {
477 let factory = create_factory()?;
478
479 let src = ChainKeypair::random();
480 let dst = ChainKeypair::random();
481
482 let mut channel = ChannelEntry::builder()
483 .between(&src, &dst)
484 .amount(10)
485 .ticket_index(1)
486 .status(ChannelStatus::Closed)
487 .epoch(1)
488 .build()?;
489
490 assert!(
491 factory
492 .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 1.into())
493 .is_err()
494 );
495
496 channel.status =
497 ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_secs(10));
498
499 assert!(
500 factory
501 .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 1.into())
502 .is_err()
503 );
504
505 channel.status = ChannelStatus::Open;
506
507 assert!(
508 factory
509 .new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 11.into())
510 .is_err()
511 );
512
513 assert!(
514 factory
515 .new_multihop_ticket(&channel, 1.try_into()?, WinningProbability::ALWAYS, 1.into())
516 .is_err()
517 );
518
519 Ok(())
520 }
521
522 #[test]
523 fn ticket_manager_test_next_outgoing_ticket_index() -> anyhow::Result<()> {
524 let factory = create_factory()?;
525
526 let src = ChainKeypair::random();
527 let dst = ChainKeypair::random();
528
529 let mut channel = ChannelEntry::builder()
530 .between(&src, &dst)
531 .amount(10)
532 .ticket_index(0)
533 .status(ChannelStatus::Open)
534 .epoch(1)
535 .build()?;
536
537 assert_eq!(0, factory.next_outgoing_ticket_index(&channel));
538
539 channel.ticket_index = 10;
540 assert_eq!(10, factory.next_outgoing_ticket_index(&channel));
541 assert_eq!(11, factory.next_outgoing_ticket_index(&channel));
542
543 channel.ticket_index = 100;
544 assert_eq!(100, factory.next_outgoing_ticket_index(&channel));
545 assert_eq!(101, factory.next_outgoing_ticket_index(&channel));
546
547 channel.ticket_index = 50;
548 assert_eq!(102, factory.next_outgoing_ticket_index(&channel));
549 assert_eq!(103, factory.next_outgoing_ticket_index(&channel));
550
551 factory.save_outgoing_indices()?;
552 assert_eq!(
553 Some(104),
554 factory.store.read().load_outgoing_index(channel.get_id(), 1)?
555 );
556
557 channel.ticket_index = 0;
558 channel.channel_epoch = 2;
559
560 assert_eq!(0, factory.next_outgoing_ticket_index(&channel));
561 factory.save_outgoing_indices()?;
562
563 assert_eq!(None, factory.store.read().load_outgoing_index(channel.get_id(), 1)?);
564 assert_eq!(Some(1), factory.store.read().load_outgoing_index(channel.get_id(), 2)?);
565
566 assert_eq!(1, factory.next_outgoing_ticket_index(&channel));
567
568 Ok(())
569 }
570
571 #[test]
572 fn ticket_manager_should_save_out_indices_to_the_store_on_demand() -> anyhow::Result<()> {
573 let factory = create_factory()?;
574
575 let src = ChainKeypair::random();
576 let dst = ChainKeypair::random();
577
578 let channel = ChannelEntry::builder()
579 .between(&src, &dst)
580 .amount(10)
581 .ticket_index(1)
582 .status(ChannelStatus::Open)
583 .epoch(1)
584 .build()?;
585
586 factory.sync_from_outgoing_channels(&[channel])?;
588
589 factory.new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?;
590
591 let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
593 assert_eq!(None, saved_index);
594
595 factory.new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?;
596
597 factory.save_outgoing_indices()?;
598 let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
599 assert_eq!(Some(3), saved_index);
600
601 factory.new_multihop_ticket(&channel, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?;
602
603 let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
604 assert_eq!(Some(3), saved_index);
605
606 factory.save_outgoing_indices()?;
607 let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
608 assert_eq!(Some(4), saved_index);
609
610 Ok(())
611 }
612
613 #[test]
614 fn ticket_manager_should_sync_out_indices_from_chain_state() -> anyhow::Result<()> {
615 let factory = create_factory()?;
616
617 let src = ChainKeypair::random();
618 let dst = ChainKeypair::random();
619
620 let channel = ChannelEntry::builder()
621 .between(&src, &dst)
622 .amount(10)
623 .ticket_index(1)
624 .status(ChannelStatus::Open)
625 .epoch(1)
626 .build()?;
627
628 factory.sync_from_outgoing_channels(&[channel])?;
629 factory.save_outgoing_indices()?;
630
631 let saved_index = factory.store.read().load_outgoing_index(channel.get_id(), 1)?;
632 assert_eq!(Some(1), saved_index);
633
634 Ok(())
635 }
636
637 #[test_log::test]
638 fn ticket_manager_should_sync_out_indices_should_remove_indices_for_non_opened_outgoing_channels()
639 -> anyhow::Result<()> {
640 let factory = create_factory()?;
641
642 let src = ChainKeypair::random();
643 let dst = ChainKeypair::random();
644
645 let mut channel_1 = ChannelEntry::builder()
646 .between(&src, &dst)
647 .amount(10)
648 .ticket_index(0)
649 .status(ChannelStatus::Open)
650 .epoch(1)
651 .build()?;
652
653 let mut channel_2 = ChannelEntry::builder()
654 .between(&dst, &src)
655 .amount(10)
656 .ticket_index(0)
657 .status(ChannelStatus::Open)
658 .epoch(1)
659 .build()?;
660
661 let ticket_1 = factory
662 .new_multihop_ticket(&channel_1, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
663 .eth_challenge(Default::default())
664 .build()?;
665 let ticket_2 = factory
666 .new_multihop_ticket(&channel_2, 2.try_into()?, WinningProbability::ALWAYS, 10.into())?
667 .eth_challenge(Default::default())
668 .build()?;
669 assert_eq!(0, ticket_1.index);
670 assert_eq!(0, ticket_2.index);
671
672 factory.save_outgoing_indices()?;
673
674 assert_eq!(
675 Some(1),
676 factory.store.read().load_outgoing_index(channel_1.get_id(), 1)?
677 );
678 assert_eq!(
679 Some(1),
680 factory.store.read().load_outgoing_index(channel_2.get_id(), 1)?
681 );
682
683 channel_1.status = ChannelStatus::Closed;
684 channel_2.status =
685 ChannelStatus::PendingToClose(std::time::SystemTime::now() - std::time::Duration::from_mins(10));
686
687 factory.sync_from_outgoing_channels(&[channel_1, channel_2])?;
688
689 assert_eq!(None, factory.store.read().load_outgoing_index(channel_1.get_id(), 1)?);
690 assert_eq!(None, factory.store.read().load_outgoing_index(channel_2.get_id(), 1)?);
691
692 Ok(())
693 }
694}