1use std::{
2 cmp,
3 ops::{Add, Bound},
4 sync::{
5 Arc,
6 atomic::{AtomicU64, Ordering},
7 },
8};
9
10use async_stream::stream;
11use async_trait::async_trait;
12use futures::{StreamExt, TryStreamExt, stream::BoxStream};
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::{
15 errors::Result,
16 info::DomainSeparator,
17 prelude::{TicketIndexSelector, TicketMarker},
18 resolver::HoprDbResolverOperations,
19 tickets::{AggregationPrerequisites, ChannelTicketStatistics, HoprDbTicketOperations, TicketSelector},
20};
21use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
22use hopr_internal_types::prelude::*;
23#[cfg(all(feature = "prometheus", not(test)))]
24use hopr_metrics::metrics::MultiGauge;
25use hopr_primitive_types::prelude::*;
26use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set};
27use sea_query::{Condition, Expr, IntoCondition, SimpleExpr};
28use tracing::{debug, error, info, trace, warn};
29
30use crate::{
31 HoprDbGeneralModelOperations, OpenTransaction, OptTx, TargetDb,
32 channels::HoprDbChannelOperations,
33 db::HoprDb,
34 errors::{DbSqlError, DbSqlError::LogicalError},
35 info::HoprDbInfoOperations,
36};
37
38#[cfg(all(feature = "prometheus", not(test)))]
39lazy_static::lazy_static! {
40 pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: MultiGauge = MultiGauge::new(
41 "hopr_tickets_incoming_statistics",
42 "Ticket statistics for channels with incoming tickets.",
43 &["channel", "statistic"]
44 ).unwrap();
45}
46
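/// Maximum number of tickets fetched from the DB in one aggregation batch
/// (used as the query limit in `prepare_aggregation_in_channel`).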
47pub const MAX_TICKETS_TO_AGGREGATE_BATCH: u64 = 500;
49
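/// Newtype around the API-level [`TicketSelector`] so it can be converted into a
/// `sea_query` [`Condition`] via [`IntoCondition`] (both are foreign types, so a
/// local wrapper is needed).
///
/// A minimal usage sketch; `channel_id`, `channel_epoch` and the open `tx` are
/// hypothetical stand-ins for caller-provided values:
/// ```ignore
/// let selector: WrappedTicketSelector = TicketSelector::new(channel_id, channel_epoch).into();
/// let tickets = ticket::Entity::find().filter(selector).all(tx.as_ref()).await?;
/// ```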
50#[derive(Clone)]
54pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
55
56impl From<TicketSelector> for WrappedTicketSelector {
57 fn from(selector: TicketSelector) -> Self {
58 Self(selector)
59 }
60}
61
62impl AsRef<TicketSelector> for WrappedTicketSelector {
63 fn as_ref(&self) -> &TicketSelector {
64 &self.0
65 }
66}
67
68impl IntoCondition for WrappedTicketSelector {
69 fn into_condition(self) -> Condition {
70 let expr = self
71 .0
72 .channel_identifiers
73 .into_iter()
74 .map(|(channel_id, epoch)| {
75 ticket::Column::ChannelId
76 .eq(channel_id.to_hex())
77 .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
78 })
79 .reduce(SimpleExpr::or);
80
81 if expr.is_none() {
83 return Condition::any().not();
84 }
85
86 let mut expr = expr.unwrap();
87
88 match self.0.index {
89 TicketIndexSelector::None => {
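// nothing to add: no constraint is placed on the ticket index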
90 }
92 TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
93 TicketIndexSelector::Multiple(idxs) => {
94 expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
95 }
96 TicketIndexSelector::Range((lb, ub)) => {
97 expr = match lb {
98 Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
99 Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
100 Bound::Unbounded => expr,
101 };
102 expr = match ub {
103 Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
104 Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
105 Bound::Unbounded => expr,
106 };
107 }
108 }
109
110 if let Some(state) = self.0.state {
111 expr = expr.and(ticket::Column::State.eq(state as u8))
112 }
113
114 if self.0.only_aggregated {
115 expr = expr.and(ticket::Column::IndexOffset.gt(1));
116 }
117
118 expr = match self.0.win_prob.0 {
120 Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
121 Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
122 Bound::Unbounded => expr,
123 };
124
125 expr = match self.0.win_prob.1 {
127 Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
128 Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
129 Bound::Unbounded => expr,
130 };
131
132 expr = match self.0.amount.0 {
134 Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
135 Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
136 Bound::Unbounded => expr,
137 };
138
139 expr = match self.0.amount.1 {
141 Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
142 Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
143 Bound::Unbounded => expr,
144 };
145
146 expr.into_condition()
147 }
148}
149
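/// Filters the given ticket models down to those suitable for aggregation in `channel_entry`.
///
/// Tickets with a winning probability below `min_win_prob` are skipped, and tickets are only
/// taken while their running total stays within the channel balance. The aggregation
/// `prerequisites` (minimum ticket count and/or minimum unaggregated ratio) are then checked;
/// if prerequisites were given but none is satisfied, an empty vector is returned.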
150pub(crate) fn filter_satisfying_ticket_models(
159 prerequisites: AggregationPrerequisites,
160 models: Vec<ticket::Model>,
161 channel_entry: &ChannelEntry,
162 min_win_prob: WinningProbability,
163) -> crate::errors::Result<Vec<ticket::Model>> {
164 let channel_id = channel_entry.get_id();
165
166 let mut to_be_aggregated = Vec::with_capacity(models.len());
167 let mut total_balance = HoprBalance::zero();
168
169 for m in models {
170 let ticket_wp: WinningProbability = m
171 .winning_probability
172 .as_slice()
173 .try_into()
174 .map_err(|_| DbSqlError::DecodingError)?;
175
176 if ticket_wp.approx_cmp(&min_win_prob).is_lt() {
177 warn!(
178 channel_id = %channel_entry.get_id(),
179 %ticket_wp, %min_win_prob, "encountered ticket with winning probability lower than the minimum threshold"
180 );
181 continue;
182 }
183
184 let to_add = HoprBalance::from_be_bytes(&m.amount);
185
186 total_balance += to_add;
188 if total_balance.gt(&channel_entry.balance) {
189 total_balance -= to_add;
191 break;
192 }
193
194 to_be_aggregated.push(m);
195 }
196
197 if prerequisites.min_ticket_count.is_none() && prerequisites.min_unaggregated_ratio.is_none() {
199 info!(channel = %channel_id, "Aggregation check OK, no aggregation prerequisites were given");
200 return Ok(to_be_aggregated);
201 }
202
203 let to_be_agg_count = to_be_aggregated.len();
204
205 if let Some(agg_threshold) = prerequisites.min_ticket_count {
207 if to_be_agg_count >= agg_threshold {
208 info!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check OK: ticket count reached the aggregation threshold");
209 return Ok(to_be_aggregated);
210 } else {
211 debug!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check FAIL: not enough tickets to aggregate");
212 }
213 }
214
215 if let Some(unrealized_threshold) = prerequisites.min_unaggregated_ratio {
216 let diminished_balance = channel_entry.balance.mul_f64(unrealized_threshold)?;
217
218 if total_balance.ge(&diminished_balance) {
221 if to_be_agg_count > 1 {
222 info!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check OK: more unrealized than diminished balance");
223 return Ok(to_be_aggregated);
224 } else {
225 debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: more unrealized than diminished balance but only 1 ticket");
226 }
227 } else {
228 debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: less unrealized than diminished balance");
229 }
230 }
231
232 debug!(channel = %channel_id, "Aggregation check FAIL: no prerequisites were met");
233 Ok(vec![])
234}
235
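/// Fetches the ticket statistics row for the given channel, inserting a fresh default
/// (all-zero) row first if the channel has no statistics yet.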
236pub(crate) async fn find_stats_for_channel(
237 tx: &OpenTransaction,
238 channel_id: &Hash,
239) -> crate::errors::Result<ticket_statistics::Model> {
240 if let Some(model) = ticket_statistics::Entity::find()
241 .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
242 .one(tx.as_ref())
243 .await?
244 {
245 Ok(model)
246 } else {
247 let new_stats = ticket_statistics::ActiveModel {
248 channel_id: Set(channel_id.to_hex()),
249 ..Default::default()
250 }
251 .insert(tx.as_ref())
252 .await?;
253
254 Ok(new_stats)
255 }
256}
257
258impl HoprDb {
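/// Counts the tickets matching the given selector and sums their values,
/// optionally inside an already open transaction.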
259 async fn get_tickets_value_int<'a>(
260 &'a self,
261 tx: OptTx<'a>,
262 selector: TicketSelector,
263 ) -> Result<(usize, HoprBalance)> {
264 let selector: WrappedTicketSelector = selector.into();
265 Ok(self
266 .nest_transaction_in_db(tx, TargetDb::Tickets)
267 .await?
268 .perform(|tx| {
269 Box::pin(async move {
270 ticket::Entity::find()
271 .filter(selector)
272 .stream(tx.as_ref())
273 .await
274 .map_err(DbSqlError::from)?
275 .map_err(DbSqlError::from)
276 .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
277 Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
278 })
279 .await
280 })
281 })
282 .await?)
283 }
284}
285
286#[async_trait]
287impl HoprDbTicketOperations for HoprDb {
288 async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>> {
289 Ok(self
290 .nest_transaction_in_db(None, TargetDb::Tickets)
291 .await?
292 .perform(|tx| {
293 Box::pin(async move {
294 ticket::Entity::find()
295 .all(tx.as_ref())
296 .await?
297 .into_iter()
298 .map(AcknowledgedTicket::try_from)
299 .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
300 .map_err(DbSqlError::from)
301 })
302 })
303 .await?)
304 }
305
306 async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>> {
307 debug!("fetching tickets via {selector}");
308 let selector: WrappedTicketSelector = selector.into();
309
310 Ok(self
311 .nest_transaction_in_db(None, TargetDb::Tickets)
312 .await?
313 .perform(|tx| {
314 Box::pin(async move {
315 ticket::Entity::find()
316 .filter(selector)
317 .all(tx.as_ref())
318 .await?
319 .into_iter()
320 .map(AcknowledgedTicket::try_from)
321 .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
322 .map_err(DbSqlError::from)
323 })
324 })
325 .await?)
326 }
327
328 async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize> {
329 let myself = self.clone();
330 Ok(self
331 .ticket_manager
332 .with_write_locked_db(|tx| {
333 Box::pin(async move {
334 let mut total_marked_count = 0;
335 for (channel_id, epoch) in selector.channel_identifiers.iter() {
336 let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);
337
338 let (marked_count, marked_value) =
340 myself.get_tickets_value_int(Some(tx), channel_selector.clone()).await?;
341 trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");
342
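// Marked tickets are deleted from the tickets DB; only their aggregated value is
// retained in the per-channel statistics updated below.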
343 if marked_count > 0 {
344 let deleted = ticket::Entity::delete_many()
346 .filter(WrappedTicketSelector::from(channel_selector.clone()))
347 .exec(tx.as_ref())
348 .await?;
349
350 if deleted.rows_affected == marked_count as u64 {
352 let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
353 let _current_value = match mark_as {
354 TicketMarker::Redeemed => {
355 let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
356 new_stats.redeemed_value =
357 Set((current_value + marked_value.amount()).to_be_bytes().into());
358 current_value
359 }
360 TicketMarker::Rejected => {
361 let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
362 new_stats.rejected_value =
363 Set((current_value + marked_value.amount()).to_be_bytes().into());
364 current_value
365 }
366 TicketMarker::Neglected => {
367 let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
368 new_stats.neglected_value =
369 Set((current_value + marked_value.amount()).to_be_bytes().into());
370 current_value
371 }
372 };
373 new_stats.save(tx.as_ref()).await?;
374
375 #[cfg(all(feature = "prometheus", not(test)))]
376 {
377 let channel = channel_id.to_string();
378 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
379 &[&channel, &mark_as.to_string()],
380 (_current_value + marked_value.amount()).as_u128() as f64,
381 );
382
383 if mark_as != TicketMarker::Rejected {
386 let unredeemed_value = myself
387 .caches
388 .unrealized_value
389 .get(&(*channel_id, *epoch))
390 .await
391 .unwrap_or_default();
392
393 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
394 &[&channel, "unredeemed"],
395 (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
396 );
397 }
398 }
399
400 myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
401 } else {
402 return Err(DbSqlError::LogicalError(format!(
403 "could not mark {marked_count} tickets as {mark_as}"
404 )));
405 }
406
407 trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
408 total_marked_count += marked_count;
409 }
410 }
411
412 info!(
413 count = total_marked_count,
414 ?mark_as,
415 channel_count = selector.channel_identifiers.len(),
416 "removed tickets in channels",
417 );
418 Ok(total_marked_count)
419 })
420 })
421 .await?)
422 }
423
424 async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()> {
425 let channel_id = ticket.channel_id;
426 let amount = ticket.amount;
427 Ok(self
428 .ticket_manager
429 .with_write_locked_db(|tx| {
430 Box::pin(async move {
431 let stats = find_stats_for_channel(tx, &channel_id).await?;
432
433 let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
434
435 let mut active_stats = stats.into_active_model();
436 active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
437 active_stats.save(tx.as_ref()).await?;
438
439 #[cfg(all(feature = "prometheus", not(test)))]
440 {
441 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
442 &[&channel_id.to_string(), "rejected"],
443 (current_rejected_value + amount.amount()).as_u128() as f64,
444 );
445 }
446
447 Ok::<(), DbSqlError>(())
448 })
449 })
450 .await?)
451 }
452
453 async fn update_ticket_states_and_fetch<'a>(
454 &'a self,
455 selector: TicketSelector,
456 new_state: AcknowledgedTicketStatus,
457 ) -> Result<BoxStream<'a, AcknowledgedTicket>> {
458 let selector: WrappedTicketSelector = selector.into();
459 Ok(Box::pin(stream! {
460 match ticket::Entity::find()
461 .filter(selector)
462 .stream(self.conn(TargetDb::Tickets))
463 .await {
464 Ok(mut stream) => {
465 while let Ok(Some(ticket)) = stream.try_next().await {
466 let active_ticket = ticket::ActiveModel {
467 id: Set(ticket.id),
468 state: Set(new_state as i8),
469 ..Default::default()
470 };
471
472 {
473 let _g = self.ticket_manager.mutex.lock();
474 if let Err(e) = active_ticket.update(self.conn(TargetDb::Tickets)).await {
475 error!(error = %e,"failed to update ticket in the db");
476 }
477 }
478
479 match AcknowledgedTicket::try_from(ticket) {
480 Ok(mut ticket) => {
481 ticket.status = new_state;
483 yield ticket
484 },
485 Err(e) => {
486 tracing::error!(error = %e, "failed to decode ticket from the db");
487 }
488 }
489 }
490 },
491 Err(e) => tracing::error!(error = %e, "failed to open ticket db stream")
492 }
493 }))
494 }
495
496 async fn update_ticket_states(
497 &self,
498 selector: TicketSelector,
499 new_state: AcknowledgedTicketStatus,
500 ) -> Result<usize> {
501 let selector: WrappedTicketSelector = selector.into();
502 Ok(self
503 .ticket_manager
504 .with_write_locked_db(|tx| {
505 Box::pin(async move {
506 let update = ticket::Entity::update_many()
507 .filter(selector)
508 .col_expr(ticket::Column::State, Expr::value(new_state as u8))
509 .exec(tx.as_ref())
510 .await?;
511 Ok::<_, DbSqlError>(update.rows_affected as usize)
512 })
513 })
514 .await?)
515 }
516
517 async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics> {
518 let res = match channel_id {
519 None => {
520 #[cfg(all(feature = "prometheus", not(test)))]
521 let mut per_channel_unredeemed = std::collections::HashMap::new();
522
523 self.nest_transaction_in_db(None, TargetDb::Tickets)
524 .await?
525 .perform(|tx| {
526 Box::pin(async move {
527 let unredeemed_value = ticket::Entity::find()
528 .stream(tx.as_ref())
529 .await?
530 .try_fold(U256::zero(), |amount, x| {
531 let unredeemed_value = U256::from_be_bytes(x.amount);
532
533 #[cfg(all(feature = "prometheus", not(test)))]
534 per_channel_unredeemed
535 .entry(x.channel_id)
536 .and_modify(|v| *v += unredeemed_value)
537 .or_insert(unredeemed_value);
538
539 futures::future::ok(amount + unredeemed_value)
540 })
541 .await?;
542
543 #[cfg(all(feature = "prometheus", not(test)))]
544 for (channel_id, unredeemed_value) in per_channel_unredeemed {
545 METRIC_HOPR_TICKETS_INCOMING_STATISTICS
546 .set(&[&channel_id, "unredeemed"], unredeemed_value.as_u128() as f64);
547 }
548
549 let mut all_stats = ticket_statistics::Entity::find()
550 .all(tx.as_ref())
551 .await?
552 .into_iter()
553 .fold(ChannelTicketStatistics::default(), |mut acc, stats| {
554 let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
555 acc.neglected_value += neglected_value;
556 let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
557 acc.redeemed_value += redeemed_value;
558 let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
559 acc.rejected_value += rejected_value;
560 acc.winning_tickets += stats.winning_tickets as u128;
561
562 #[cfg(all(feature = "prometheus", not(test)))]
563 {
564 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
565 &[&stats.channel_id, "neglected"],
566 neglected_value.amount().as_u128() as f64,
567 );
568
569 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
570 &[&stats.channel_id, "redeemed"],
571 redeemed_value.amount().as_u128() as f64,
572 );
573
574 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
575 &[&stats.channel_id, "rejected"],
576 rejected_value.amount().as_u128() as f64,
577 );
578 }
579
580 acc
581 });
582
583 all_stats.unredeemed_value = unredeemed_value.into();
584
585 Ok::<_, DbSqlError>(all_stats)
586 })
587 })
588 .await
589 }
590 Some(channel) => {
591 if self.get_channel_by_id(None, &channel).await?.is_none() {
594 return Err(DbSqlError::ChannelNotFound(channel).into());
595 }
596
597 self.nest_transaction_in_db(None, TargetDb::Tickets)
598 .await?
599 .perform(|tx| {
600 Box::pin(async move {
601 let stats = find_stats_for_channel(tx, &channel).await?;
602 let unredeemed_value = ticket::Entity::find()
603 .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
604 .stream(tx.as_ref())
605 .await?
606 .try_fold(U256::zero(), |amount, x| {
607 futures::future::ok(amount + U256::from_be_bytes(x.amount))
608 })
609 .await?;
610
611 Ok::<_, DbSqlError>(ChannelTicketStatistics {
612 winning_tickets: stats.winning_tickets as u128,
613 neglected_value: HoprBalance::from_be_bytes(stats.neglected_value),
614 redeemed_value: HoprBalance::from_be_bytes(stats.redeemed_value),
615 unredeemed_value: unredeemed_value.into(),
616 rejected_value: HoprBalance::from_be_bytes(stats.rejected_value),
617 })
618 })
619 })
620 .await
621 }
622 };
623 Ok(res?)
624 }
625
626 async fn reset_ticket_statistics(&self) -> Result<()> {
627 let res = self
628 .nest_transaction_in_db(None, TargetDb::Tickets)
629 .await?
630 .perform(|tx| {
631 Box::pin(async move {
632 #[cfg(all(feature = "prometheus", not(test)))]
633 let rows = ticket_statistics::Entity::find().all(tx.as_ref()).await?;
634
635 let deleted = ticket_statistics::Entity::delete_many().exec(tx.as_ref()).await?;
637
638 #[cfg(all(feature = "prometheus", not(test)))]
639 {
640 if deleted.rows_affected > 0 {
641 for row in rows {
642 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "neglected"], 0.0_f64);
643
644 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "redeemed"], 0.0_f64);
645
646 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "rejected"], 0.0_f64);
647 }
648 }
649 }
650
651 debug!("reset ticket statistics for {} channel(s)", deleted.rows_affected);
652
653 Ok::<_, DbSqlError>(())
654 })
655 })
656 .await;
657
658 Ok(res?)
659 }
660
661 async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance)> {
662 self.get_tickets_value_int(None, selector).await
663 }
664
665 async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64> {
666 let old_value = self
667 .get_outgoing_ticket_index(channel_id)
668 .await?
669 .fetch_max(index, Ordering::SeqCst);
670
671 Ok(old_value)
675 }
676
677 async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
678 let old_value = self
679 .get_outgoing_ticket_index(channel_id)
680 .await?
681 .swap(0, Ordering::SeqCst);
682
683 Ok(old_value)
687 }
688
689 async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
690 let old_value = self
691 .get_outgoing_ticket_index(channel_id)
692 .await?
693 .fetch_add(1, Ordering::SeqCst);
694
695 Ok(old_value)
698 }
699
700 async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>> {
701 let tkt_manager = self.ticket_manager.clone();
702
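// The outgoing index is cached per channel as an `Arc<AtomicU64>`; on a cache miss it is
// loaded from the tickets DB, or initialized to zero (and inserted) if no row exists yet.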
703 Ok(self
704 .caches
705 .ticket_index
706 .try_get_with(channel_id, async move {
707 let maybe_index = outgoing_ticket_index::Entity::find()
708 .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
709 .one(&tkt_manager.tickets_db)
710 .await?;
711
712 Ok(Arc::new(AtomicU64::new(match maybe_index {
713 Some(model) => U256::from_be_bytes(model.index).as_u64(),
714 None => {
715 tkt_manager
716 .with_write_locked_db(|tx| {
717 Box::pin(async move {
718 outgoing_ticket_index::ActiveModel {
719 channel_id: Set(channel_id.to_hex()),
720 ..Default::default()
721 }
722 .insert(tx.as_ref())
723 .await?;
724 Ok::<_, DbSqlError>(())
725 })
726 })
727 .await?;
728 0_u64
729 }
730 })))
731 })
732 .await
733 .map_err(|e: Arc<DbSqlError>| LogicalError(format!("failed to retrieve ticket index: {e}")))?)
734 }
735
736 async fn persist_outgoing_ticket_indices(&self) -> Result<usize> {
737 let outgoing_indices = outgoing_ticket_index::Entity::find()
738 .all(&self.tickets_db)
739 .await
740 .map_err(DbSqlError::from)?;
741
742 let mut updated = 0;
743 for index_model in outgoing_indices {
744 let channel_id = Hash::from_hex(&index_model.channel_id).map_err(DbSqlError::from)?;
745 let db_index = U256::from_be_bytes(&index_model.index).as_u64();
746 if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
747 let cached_index = cached_index.load(Ordering::SeqCst);
751
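// Only write the index back when the cached value has advanced past the persisted one.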
752 if cached_index > db_index {
754 let mut index_active_model = index_model.into_active_model();
755 index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
756 self.ticket_manager
757 .with_write_locked_db(|wtx| {
758 Box::pin(async move {
759 index_active_model.save(wtx.as_ref()).await?;
760 Ok::<_, DbSqlError>(())
761 })
762 })
763 .await?;
764
765 debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
766 updated += 1;
767 }
768 } else {
769 trace!(?channel_id, "channel not in cache yet");
772 }
773 }
774
775 Ok(Ok::<_, DbSqlError>(updated)?)
776 }
777
778 async fn prepare_aggregation_in_channel(
779 &self,
780 channel: &Hash,
781 prerequisites: AggregationPrerequisites,
782 ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>> {
783 let myself = self.clone();
784
785 let channel_id = *channel;
786
787 let (channel_entry, peer, domain_separator, min_win_prob) = self
788 .nest_transaction_in_db(None, TargetDb::Index)
789 .await?
790 .perform(|tx| {
791 Box::pin(async move {
792 let entry = myself
793 .get_channel_by_id(Some(tx), &channel_id)
794 .await?
795 .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
796
797 if entry.status == ChannelStatus::Closed {
798 return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
799 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
800 return Err(DbSqlError::LogicalError(format!(
801 "channel '{channel_id}' is not incoming"
802 )));
803 }
804
805 let pk = myself
806 .resolve_packet_key(&entry.source)
807 .await?
808 .ok_or(DbSqlError::LogicalError(format!(
809 "peer '{}' has no offchain key record",
810 entry.source
811 )))?;
812
813 let indexer_data = myself.get_indexer_data(Some(tx)).await?;
814
815 let domain_separator = indexer_data
816 .channels_dst
817 .ok_or_else(|| crate::errors::DbSqlError::LogicalError("domain separator missing".into()))?;
818
819 Ok((
820 entry,
821 pk,
822 domain_separator,
823 indexer_data.minimum_incoming_ticket_winning_prob,
824 ))
825 })
826 })
827 .await?;
828
829 let myself = self.clone();
830 let tickets = self
831 .ticket_manager
832 .with_write_locked_db(|tx| {
833 Box::pin(async move {
834 if ticket::Entity::find()
836 .filter(WrappedTicketSelector::from(
837 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
838 ))
839 .one(tx.as_ref())
840 .await?
841 .is_some()
842 {
843 return Err(DbSqlError::LogicalError(format!(
844 "'{channel_entry}' is already being aggregated",
845 )));
846 }
847
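// Aggregation must start above the highest ticket index currently being redeemed
// and never below the channel's on-chain ticket index.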
848 let first_idx_to_take = ticket::Entity::find()
850 .filter(WrappedTicketSelector::from(
851 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingRedeemed),
852 ))
853 .order_by_desc(ticket::Column::Index)
854 .one(tx.as_ref())
855 .await?
856 .map(|m| U256::from_be_bytes(m.index).as_u64() + 1)
857 .unwrap_or(0_u64).max(channel_entry.ticket_index.as_u64());

let to_be_aggregated = ticket::Entity::find()
862 .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
863 .filter(ticket::Column::Index.gte(first_idx_to_take.to_be_bytes().to_vec()))
864 .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
865 .order_by_asc(ticket::Column::Index).limit(MAX_TICKETS_TO_AGGREGATE_BATCH)
867 .all(tx.as_ref())
868 .await?;
869
870 let mut to_be_aggregated: Vec<TransferableWinningTicket> =
872 filter_satisfying_ticket_models(prerequisites, to_be_aggregated, &channel_entry, min_win_prob)?
873 .into_iter()
874 .map(|model| {
875 AcknowledgedTicket::try_from(model)
876 .map_err(DbSqlError::from)
877 .and_then(|ack| {
878 ack.into_transferable(&myself.chain_key, &domain_separator)
879 .map_err(DbSqlError::from)
880 })
881 })
882 .collect::<crate::errors::Result<Vec<_>>>()?;
883
884 let mut neglected_idxs = Vec::new();
885
886 if !to_be_aggregated.is_empty() {
887 let first_ticket = to_be_aggregated[0].ticket.clone();
892 let mut i = 1;
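// Drop any ticket whose index falls within the index range already covered by the
// first (aggregated) ticket, since its value has effectively been aggregated before.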
893 while i < to_be_aggregated.len() {
894 let current_idx = to_be_aggregated[i].ticket.index;
895 if (first_ticket.index..first_ticket.index + first_ticket.index_offset as u64).contains(&current_idx) {
896 warn!(ticket_id = current_idx, channel = %channel_id, ?first_ticket, "ticket in channel has been already aggregated and will be removed");
900 neglected_idxs.push(current_idx);
901 to_be_aggregated.remove(i);
902 } else {
903 i += 1;
904 }
905 }
906
907 if !neglected_idxs.is_empty() {
910 warn!(count = neglected_idxs.len(), channel = %channel_id, "tickets were neglected due to duplication in an aggregated ticket!");
911 }
912
913 let marked: sea_orm::UpdateResult = ticket::Entity::update_many()
915 .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
916 .filter(ticket::Column::Index.is_in(to_be_aggregated.iter().map(|t| t.ticket.index.to_be_bytes().to_vec())))
917 .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
918 .col_expr(
919 ticket::Column::State,
920 Expr::value(AcknowledgedTicketStatus::BeingAggregated as i8),
921 )
922 .exec(tx.as_ref())
923 .await?;
924
925 if marked.rows_affected as usize != to_be_aggregated.len() {
926 return Err(DbSqlError::LogicalError(format!(
927 "expected to mark {} tickets, but only marked {}",
928 to_be_aggregated.len(),
929 marked.rows_affected,
930 )));
931 }
932 }
933
934 debug!(
935 "prepared {} tickets to aggregate in {} ({})",
936 to_be_aggregated.len(),
937 channel_entry.get_id(),
938 channel_entry.channel_epoch,
939 );
940
941 Ok(to_be_aggregated)
942 })
943 })
944 .await?;
945
946 Ok((!tickets.is_empty()).then_some((peer, tickets, domain_separator)))
947 }
948
949 async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()> {
950 let channel_entry = self
951 .get_channel_by_id(None, &channel)
952 .await?
953 .ok_or(DbSqlError::ChannelNotFound(channel))?;
954
955 let selector = TicketSelector::from(channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated);
956
957 let reverted = self
958 .update_ticket_states(selector, AcknowledgedTicketStatus::Untouched)
959 .await?;
960
961 info!(
962 "rolled back ticket aggregation in '{channel}': {reverted} tickets were reverted",
963 );
964 Ok(())
965 }
966
967 async fn process_received_aggregated_ticket(
968 &self,
969 aggregated_ticket: Ticket,
970 chain_keypair: &ChainKeypair,
971 ) -> Result<AcknowledgedTicket> {
972 if chain_keypair.public().to_address() != self.me_onchain {
973 return Err(DbSqlError::LogicalError(
974 "chain key for ticket aggregation does not match the DB public address".into(),
975 )
976 .into());
977 }
978
979 let myself = self.clone();
980 let channel_id = aggregated_ticket.channel_id;
981
982 let (channel_entry, domain_separator) = self
983 .nest_transaction_in_db(None, TargetDb::Index)
984 .await?
985 .perform(|tx| {
986 Box::pin(async move {
987 let entry = myself
988 .get_channel_by_id(Some(tx), &channel_id)
989 .await?
990 .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
991
992 if entry.status == ChannelStatus::Closed {
993 return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
994 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
995 return Err(DbSqlError::LogicalError(format!(
996 "channel '{channel_id}' is not incoming"
997 )));
998 }
999
1000 let domain_separator =
1001 myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
1002 crate::errors::DbSqlError::LogicalError("domain separator missing".into())
1003 })?;
1004
1005 Ok((entry, domain_separator))
1006 })
1007 })
1008 .await?;
1009
1010 let aggregated_ticket = aggregated_ticket
1012 .verify(&channel_entry.source, &domain_separator)
1013 .map_err(|e| {
1014 DbSqlError::LogicalError(format!(
1015 "failed to verify received aggregated ticket in {channel_id}: {e}"
1016 ))
1017 })?;
1018
1019 if !aggregated_ticket.win_prob().approx_eq(&WinningProbability::ALWAYS) {
1021 return Err(DbSqlError::LogicalError("Aggregated tickets must have 100% win probability".into()).into());
1022 }
1023
1024 let acknowledged_tickets = self
1025 .nest_transaction_in_db(None, TargetDb::Tickets)
1026 .await?
1027 .perform(|tx| {
1028 Box::pin(async move {
1029 ticket::Entity::find()
1030 .filter(WrappedTicketSelector::from(
1031 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
1032 ))
1033 .all(tx.as_ref())
1034 .await
1035 .map_err(DbSqlError::BackendError)
1036 })
1037 })
1038 .await?;
1039
1040 if acknowledged_tickets.is_empty() {
1041 debug!("Received unexpected aggregated ticket in channel {channel_id}");
1042 return Err(DbSqlError::LogicalError(format!(
1043 "failed to insert aggregated ticket: no tickets are currently being aggregated for '{channel_id}'",
1044 ))
1045 .into());
1046 }
1047
1048 let stored_value = acknowledged_tickets
1049 .iter()
1050 .map(|m| HoprBalance::from_be_bytes(&m.amount))
1051 .sum();
1052
1053 if aggregated_ticket.verified_ticket().amount.lt(&stored_value) {
1055 error!(channel = %channel_id, "Aggregated ticket value in channel is lower than sum of stored tickets");
1056 return Err(DbSqlError::LogicalError("Value of received aggregated ticket is too low".into()).into());
1057 }
1058
1059 let acknowledged_tickets = acknowledged_tickets
1060 .into_iter()
1061 .map(AcknowledgedTicket::try_from)
1062 .collect::<hopr_db_entity::errors::Result<Vec<AcknowledgedTicket>>>()
1063 .map_err(DbSqlError::from)?;
1064
1065 let first_stored_ticket = acknowledged_tickets.first().unwrap();
1067
1068 #[allow(unused_variables)]
1070 let current_ticket_index_from_aggregated_ticket =
1071 U256::from(aggregated_ticket.verified_ticket().index).add(aggregated_ticket.verified_ticket().index_offset);
1072
1073 let acked_aggregated_ticket = aggregated_ticket.into_acknowledged(first_stored_ticket.response.clone());
1074
1075 let ticket = acked_aggregated_ticket.clone();
1076 self.ticket_manager.replace_tickets(ticket).await?;
1077
1078 info!(%acked_aggregated_ticket, "successfully processed received aggregated ticket");
1079 Ok(acked_aggregated_ticket)
1080 }
1081
1082 async fn aggregate_tickets(
1083 &self,
1084 destination: OffchainPublicKey,
1085 mut acked_tickets: Vec<TransferableWinningTicket>,
1086 me: &ChainKeypair,
1087 ) -> Result<VerifiedTicket> {
1088 if me.public().to_address() != self.me_onchain {
1089 return Err(DbSqlError::LogicalError(
1090 "chain key for ticket aggregation does not match the DB public address".into(),
1091 )
1092 .into());
1093 }
1094
1095 let domain_separator = self
1096 .get_indexer_data(None)
1097 .await?
1098 .domain_separator(DomainSeparator::Channel)
1099 .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
1100
1101 if acked_tickets.is_empty() {
1102 return Err(DbSqlError::LogicalError("at least one ticket required for aggregation".to_owned()).into());
1103 }
1104
1105 if acked_tickets.len() == 1 {
1106 let single = acked_tickets
1107 .pop()
1108 .unwrap()
1109 .into_redeemable(&self.me_onchain, &domain_separator)
1110 .map_err(DbSqlError::from)?;
1111
1112 self.compare_and_set_outgoing_ticket_index(
1113 single.verified_ticket().channel_id,
1114 single.verified_ticket().index + 1,
1115 )
1116 .await?;
1117
1118 return Ok(single.ticket);
1119 }
1120
1121 acked_tickets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(cmp::Ordering::Equal));
1122 acked_tickets.dedup();
1123
1124 let myself = self.clone();
1125 let address = myself
1126 .resolve_chain_key(&destination)
1127 .await?
1128 .ok_or(DbSqlError::LogicalError(format!(
1129 "peer '{}' has no chain key record",
1130 destination.to_peerid_str()
1131 )))?;
1132
1133 let (channel_entry, destination, min_win_prob) = self
1134 .nest_transaction_in_db(None, TargetDb::Index)
1135 .await?
1136 .perform(|tx| {
1137 Box::pin(async move {
1138 let entry = myself
1139 .get_channel_by_parties(Some(tx), &myself.me_onchain, &address, false)
1140 .await?
1141 .ok_or_else(|| {
1142 DbSqlError::ChannelNotFound(generate_channel_id(&myself.me_onchain, &address))
1143 })?;
1144
1145 if entry.status == ChannelStatus::Closed {
1146 return Err(DbSqlError::LogicalError(format!("{entry} is closed")));
1147 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Outgoing) {
1148 return Err(DbSqlError::LogicalError(format!("{entry} is not outgoing")));
1149 }
1150
1151 let min_win_prob = myself
1152 .get_indexer_data(Some(tx))
1153 .await?
1154 .minimum_incoming_ticket_winning_prob;
1155 Ok((entry, address, min_win_prob))
1156 })
1157 })
1158 .await?;
1159
1160 let channel_balance = channel_entry.balance;
1161 let channel_epoch = channel_entry.channel_epoch.as_u32();
1162 let channel_id = channel_entry.get_id();
1163
1164 let mut final_value = HoprBalance::zero();
1165
1166 let verified_tickets = acked_tickets
1168 .into_iter()
1169 .map(|t| t.into_redeemable(&self.me_onchain, &domain_separator))
1170 .collect::<hopr_internal_types::errors::Result<Vec<_>>>()
1171 .map_err(|e| {
1172 DbSqlError::LogicalError(format!("trying to aggregate an invalid or a non-winning ticket: {e}"))
1173 })?;
1174
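// Validate every ticket: it must belong to this channel and epoch, index ranges must not
// overlap, its winning probability must not be below the network minimum, and the running
// total must not exceed the channel balance.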
1175 for (i, acked_ticket) in verified_tickets.iter().enumerate() {
1177 if channel_id != acked_ticket.verified_ticket().channel_id {
1178 return Err(DbSqlError::LogicalError(format!(
1179 "ticket for aggregation has an invalid channel id {}",
1180 acked_ticket.verified_ticket().channel_id
1181 ))
1182 .into());
1183 }
1184
1185 if acked_ticket.verified_ticket().channel_epoch != channel_epoch {
1186 return Err(DbSqlError::LogicalError("channel epochs do not match".into()).into());
1187 }
1188
1189 if i + 1 < verified_tickets.len()
1190 && acked_ticket.verified_ticket().index + acked_ticket.verified_ticket().index_offset as u64
1191 > verified_tickets[i + 1].verified_ticket().index
1192 {
1193 return Err(DbSqlError::LogicalError("tickets with overlapping index intervals".into()).into());
1194 }
1195
1196 if acked_ticket
1197 .verified_ticket()
1198 .win_prob()
1199 .approx_cmp(&min_win_prob)
1200 .is_lt()
1201 {
1202 return Err(DbSqlError::LogicalError(
1203 "cannot aggregate ticket with lower than minimum winning probability in network".into(),
1204 )
1205 .into());
1206 }
1207
1208 final_value += acked_ticket.verified_ticket().amount;
1209 if final_value.gt(&channel_balance) {
1210 return Err(DbSqlError::LogicalError(format!(
1211 "ticket amount to aggregate {final_value} is greater than the balance {channel_balance} of \
1212 channel {channel_id}"
1213 ))
1214 .into());
1215 }
1216 }
1217
1218 info!(
1219 "aggregated {} tickets in channel {channel_id} with total value {final_value}",
1220 verified_tickets.len()
1221 );
1222
1223 let first_acked_ticket = verified_tickets.first().unwrap();
1224 let last_acked_ticket = verified_tickets.last().unwrap();
1225
1226 let current_ticket_index_from_acked_tickets = last_acked_ticket.verified_ticket().index + 1;
1229 self.compare_and_set_outgoing_ticket_index(channel_id, current_ticket_index_from_acked_tickets)
1230 .await?;
1231
1232 Ok(TicketBuilder::default()
1233 .direction(&self.me_onchain, &destination)
1234 .balance(final_value)
1235 .index(first_acked_ticket.verified_ticket().index)
1236 .index_offset(
1237 (last_acked_ticket.verified_ticket().index - first_acked_ticket.verified_ticket().index + 1) as u32,
1238 )
1239 .win_prob(WinningProbability::ALWAYS)
.channel_epoch(channel_epoch)
1241 .eth_challenge(first_acked_ticket.verified_ticket().challenge)
1242 .build_signed(me, &domain_separator)
1243 .map_err(DbSqlError::from)?)
1244 }
1245
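// Reverts tickets that are stuck in `BeingRedeemed` at each incoming channel's current
// ticket index back to `Untouched`.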
1246 async fn fix_channels_next_ticket_state(&self) -> Result<()> {
1247 let channels = self.get_incoming_channels(None).await?;
1248
1249 for channel in channels.into_iter() {
1250 let selector = TicketSelector::from(&channel)
1251 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
1252 .with_index(channel.ticket_index.as_u64());
1253
1254 let mut tickets_stream = self
1255 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::Untouched)
1256 .await?;
1257
1258 while let Some(ticket) = tickets_stream.next().await {
1259 let channel_id = channel.get_id();
1260 let ticket_index = ticket.verified_ticket().index;
1261 let ticket_amount = ticket.verified_ticket().amount;
1262 info!(%channel_id, %ticket_index, %ticket_amount, "fixed next out-of-sync ticket");
1263 }
1264 }
1265
1266 Ok(())
1267 }
1268}
1269
1270impl HoprDb {
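/// Inserts the acknowledged ticket, or updates the stored row if a ticket with the same
/// channel id, channel epoch and index already exists.
///
/// Minimal usage sketch (as in the tests below; `ack_ticket` is assumed to be a valid
/// `AcknowledgedTicket`):
/// ```ignore
/// db.upsert_ticket(None, ack_ticket).await?;
/// ```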
1271 pub async fn upsert_ticket<'a>(&'a self, tx: OptTx<'a>, acknowledged_ticket: AcknowledgedTicket) -> Result<()> {
1273 self.nest_transaction_in_db(tx, TargetDb::Tickets)
1274 .await?
1275 .perform(|tx| {
1276 Box::pin(async move {
1277 let selector = WrappedTicketSelector::from(
1279 TicketSelector::new(
1280 acknowledged_ticket.verified_ticket().channel_id,
1281 acknowledged_ticket.verified_ticket().channel_epoch,
1282 )
1283 .with_index(acknowledged_ticket.verified_ticket().index),
1284 );
1285
1286 debug!("upserting ticket {acknowledged_ticket}");
1287 let mut model = ticket::ActiveModel::from(acknowledged_ticket);
1288
1289 if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx.as_ref()).await? {
1290 model.id = Set(ticket.id);
1291 }
1292
1293 Ok::<_, DbSqlError>(model.save(tx.as_ref()).await?)
1294 })
1295 })
1296 .await?;
1297 Ok(())
1298 }
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303 use std::{
1304 ops::Add,
1305 sync::atomic::Ordering,
1306 time::{Duration, SystemTime},
1307 };
1308
1309 use anyhow::{Context, anyhow};
1310 use futures::{StreamExt, pin_mut};
1311 use hex_literal::hex;
1312 use hopr_crypto_random::Randomizable;
1313 use hopr_crypto_types::prelude::*;
1314 use hopr_db_api::{
1315 info::DomainSeparator,
1316 prelude::{DbError, TicketMarker},
1317 tickets::ChannelTicketStatistics,
1318 };
1319 use hopr_db_entity::ticket;
1320 use hopr_internal_types::prelude::*;
1321 use hopr_primitive_types::prelude::*;
1322 use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, Set};
1323
1324 use crate::{
1325 HoprDbGeneralModelOperations, TargetDb,
1326 accounts::HoprDbAccountOperations,
1327 channels::HoprDbChannelOperations,
1328 db::HoprDb,
1329 errors::DbSqlError,
1330 info::HoprDbInfoOperations,
1331 tickets::{AggregationPrerequisites, HoprDbTicketOperations, TicketSelector, filter_satisfying_ticket_models},
1332 };
1333
1334 lazy_static::lazy_static! {
1335 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
1336 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
1337 static ref CHANNEL_ID: Hash = generate_channel_id(&BOB.public().to_address(), &ALICE.public().to_address());
1338 }
1339
1340 lazy_static::lazy_static! {
1341 static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1342 static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1343 }
1344
1345 const TICKET_VALUE: u64 = 100_000;
1346
1347 async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
1348 for (peer_offchain, peer_onchain) in peers.into_iter() {
1349 db.insert_account(
1350 None,
1351 AccountEntry {
1352 public_key: *peer_offchain.public(),
1353 chain_addr: peer_onchain.public().to_address(),
1354 entry_type: AccountType::NotAnnounced,
1355 published_at: 0,
1356 },
1357 )
1358 .await?
1359 }
1360
1361 Ok(())
1362 }
1363
1364 fn generate_random_ack_ticket(
1365 src: &ChainKeypair,
1366 dst: &ChainKeypair,
1367 index: u64,
1368 index_offset: u32,
1369 win_prob: f64,
1370 ) -> anyhow::Result<AcknowledgedTicket> {
1371 let hk1 = HalfKey::random();
1372 let hk2 = HalfKey::random();
1373 let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
1374
1375 Ok(TicketBuilder::default()
1376 .addresses(src, dst)
1377 .amount(TICKET_VALUE)
1378 .index(index)
1379 .index_offset(index_offset)
1380 .win_prob(win_prob.try_into()?)
1381 .channel_epoch(4)
1382 .challenge(challenge)
1383 .build_signed(src, &Hash::default())?
1384 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
1385 }
1386
1387 async fn init_db_with_tickets(
1388 db: &HoprDb,
1389 count_tickets: u64,
1390 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1391 init_db_with_tickets_and_channel(db, count_tickets, None).await
1392 }
1393
1394 async fn init_db_with_tickets_and_channel(
1395 db: &HoprDb,
1396 count_tickets: u64,
1397 channel_ticket_index: Option<u32>,
1398 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1399 let channel = ChannelEntry::new(
1400 BOB.public().to_address(),
1401 ALICE.public().to_address(),
1402 u32::MAX.into(),
1403 channel_ticket_index.unwrap_or(0u32).into(),
1404 ChannelStatus::Open,
1405 4_u32.into(),
1406 );
1407
1408 db.upsert_channel(None, channel).await?;
1409
1410 let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
1411 .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
1412 .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
1413
1414 let db_clone = db.clone();
1415 let tickets_clone = tickets.clone();
1416 db.begin_transaction_in_db(TargetDb::Tickets)
1417 .await?
1418 .perform(|tx| {
1419 Box::pin(async move {
1420 for t in tickets_clone {
1421 db_clone.upsert_ticket(Some(tx), t).await?;
1422 }
1423 Ok::<(), DbSqlError>(())
1424 })
1425 })
1426 .await?;
1427
1428 Ok((channel, tickets))
1429 }
1430
1431 #[tokio::test]
1432 async fn test_insert_get_ticket() -> anyhow::Result<()> {
1433 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1434 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
1435 .await?;
1436
1437 let (channel, mut tickets) = init_db_with_tickets(&db, 1).await?;
1438 let ack_ticket = tickets.pop().context("ticket should be present")?;
1439
1440 assert_eq!(
1441 channel.get_id(),
1442 ack_ticket.verified_ticket().channel_id,
1443 "channel ids must match"
1444 );
1445 assert_eq!(
1446 channel.channel_epoch.as_u32(),
1447 ack_ticket.verified_ticket().channel_epoch,
1448 "epochs must match"
1449 );
1450
1451 let db_ticket = db
1452 .get_tickets((&ack_ticket).into())
1453 .await?
1454 .first()
1455 .cloned()
1456 .context("ticket should exist")?;
1457
1458 assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
1459
1460 Ok(())
1461 }
1462
1463 #[tokio::test]
1464 async fn test_mark_redeemed() -> anyhow::Result<()> {
1465 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1466 const COUNT_TICKETS: u64 = 10;
1467
1468 let (_, tickets) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1469
1470 let stats = db.get_ticket_statistics(None).await?;
1471 assert_eq!(
1472 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1473 stats.unredeemed_value,
1474 "unredeemed balance must match"
1475 );
1476 assert_eq!(
1477 HoprBalance::zero(),
1478 stats.redeemed_value,
1479 "there must be 0 redeemed value"
1480 );
1481
1482 assert_eq!(
1483 stats,
1484 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1485 "per channel stats must be same"
1486 );
1487
1488 const TO_REDEEM: u64 = 2;
1489 let db_clone = db.clone();
1490 db.begin_transaction_in_db(TargetDb::Tickets)
1491 .await?
1492 .perform(|_tx| {
1493 Box::pin(async move {
1494 for ticket in tickets.iter().take(TO_REDEEM as usize) {
1495 let r = db_clone.mark_tickets_as(ticket.into(), TicketMarker::Redeemed).await?;
1496 assert_eq!(1, r, "must redeem only a single ticket");
1497 }
1498 Ok::<(), DbSqlError>(())
1499 })
1500 })
1501 .await?;
1502
1503 let stats = db.get_ticket_statistics(None).await?;
1504 assert_eq!(
1505 HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
1506 stats.unredeemed_value,
1507 "unredeemed balance must match"
1508 );
1509 assert_eq!(
1510 HoprBalance::from(TICKET_VALUE * TO_REDEEM),
1511 stats.redeemed_value,
1512 "there must be a redeemed value"
1513 );
1514
1515 assert_eq!(
1516 stats,
1517 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1518 "per channel stats must be same"
1519 );
1520
1521 Ok(())
1522 }
1523
1524 #[tokio::test]
1525 async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
1526 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1527
1528 let ticket = init_db_with_tickets(&db, 1)
1529 .await?
1530 .1
1531 .pop()
1532 .context("should contain a ticket")?;
1533
1534 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
1535 assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
1536
1537 Ok(())
1538 }
1539
1540 #[tokio::test]
1541 async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
1542 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1543
1544 let count_tickets = 10;
1545 let channel = init_db_with_tickets(&db, count_tickets).await?.0;
1546
1547 let count_marked = db.mark_tickets_as((&channel).into(), TicketMarker::Redeemed).await?;
1548 assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
1549
1550 Ok(())
1551 }
1552
1553 #[tokio::test]
1554 async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
1555 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1556 const COUNT_TICKETS: u64 = 10;
1557
1558 let (channel, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1559
1560 let stats = db.get_ticket_statistics(None).await?;
1561 assert_eq!(
1562 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1563 stats.unredeemed_value,
1564 "unredeemed balance must match"
1565 );
1566 assert_eq!(
1567 HoprBalance::zero(),
1568 stats.neglected_value,
1569 "there must be 0 redeemed value"
1570 );
1571
1572 assert_eq!(
1573 stats,
1574 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1575 "per channel stats must be same"
1576 );
1577
1578 db.mark_tickets_as((&channel).into(), TicketMarker::Neglected).await?;
1579
1580 let stats = db.get_ticket_statistics(None).await?;
1581 assert_eq!(
1582 HoprBalance::zero(),
1583 stats.unredeemed_value,
1584 "unredeemed balance must be zero"
1585 );
1586 assert_eq!(
1587 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1588 stats.neglected_value,
1589 "there must be a neglected value"
1590 );
1591
1592 assert_eq!(
1593 stats,
1594 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1595 "per channel stats must be same"
1596 );
1597
1598 Ok(())
1599 }
1600
1601 #[tokio::test]
1602 async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1603 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1604
1605 let (_, mut ticket) = init_db_with_tickets(&db, 1).await?;
1606 let ticket = ticket.pop().context("ticket should be present")?.ticket;
1607
1608 let stats = db.get_ticket_statistics(None).await?;
1609 assert_eq!(HoprBalance::zero(), stats.rejected_value);
1610 assert_eq!(
1611 stats,
1612 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1613 "per channel stats must be same"
1614 );
1615
1616 db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
1617
1618 let stats = db.get_ticket_statistics(None).await?;
1619 assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
1620 assert_eq!(
1621 stats,
1622 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1623 "per channel stats must be same"
1624 );
1625
1626 Ok(())
1627 }
1628
1629 #[tokio::test]
1630 async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1631 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1632 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1633 .await?;
1634
1635 let channel = init_db_with_tickets(&db, 10).await?.0;
1636
1637 let selector = TicketSelector::from(&channel).with_index(5);
1638
1639 let v: Vec<AcknowledgedTicket> = db
1640 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1641 .await?
1642 .collect()
1643 .await;
1644
1645 assert_eq!(1, v.len(), "single ticket must be updated");
1646 assert_eq!(
1647 AcknowledgedTicketStatus::BeingRedeemed,
1648 v.first().context("should contain a ticket")?.status,
1649 "status must be set"
1650 );
1651
1652 let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1653
1654 let v: Vec<AcknowledgedTicket> = db
1655 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1656 .await?
1657 .collect()
1658 .await;
1659
1660 assert_eq!(9, v.len(), "only specific tickets must have state set");
1661 assert!(
1662 v.iter().all(|t| t.verified_ticket().index != 5),
1663 "only tickets with different state must update"
1664 );
1665 assert!(
1666 v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
1667 "tickets must have updated state"
1668 );
1669
1670 Ok(())
1671 }
1672
1673 #[tokio::test]
1674 async fn test_update_tickets_states() -> anyhow::Result<()> {
1675 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1676 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1677 .await?;
1678
1679 let channel = init_db_with_tickets(&db, 10).await?.0;
1680 let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1681
1682 db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
1683 .await?;
1684
1685 let v: Vec<AcknowledgedTicket> = db
1686 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1687 .await?
1688 .collect()
1689 .await;
1690
1691 assert!(v.is_empty(), "must not update if already updated");
1692
1693 Ok(())
1694 }
1695
1696 #[tokio::test]
1697 async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1698 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1699
1700 let hash = Hash::default();
1701
1702 let idx = db.get_outgoing_ticket_index(hash).await?;
1703 assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
1704
1705 let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
1706 .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
1707 .one(&db.tickets_db)
1708 .await?
1709 .context("index must exist")?;
1710
1711 assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
1712
1713 Ok(())
1714 }
1715
1716 #[tokio::test]
1717 async fn test_ticket_stats_must_fail_for_non_existing_channel() -> anyhow::Result<()> {
1718 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1719
1720 db.get_ticket_statistics(Some(*CHANNEL_ID))
1721 .await
1722 .expect_err("must fail for non-existing channel");
1723
1724 Ok(())
1725 }
1726
1727 #[tokio::test]
1728 async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1729 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1730
1731 let channel = ChannelEntry::new(
1732 BOB.public().to_address(),
1733 ALICE.public().to_address(),
1734 u32::MAX.into(),
1735 0.into(),
1736 ChannelStatus::Open,
1737 4_u32.into(),
1738 );
1739
1740 db.upsert_channel(None, channel).await?;
1741
1742 let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1743
1744 assert_eq!(
1745 ChannelTicketStatistics::default(),
1746 stats,
1747 "must be equal to default which is all zeros"
1748 );
1749
1750 assert_eq!(
1751 stats,
1752 db.get_ticket_statistics(None).await?,
1753 "per-channel stats must be the same as global stats"
1754 );
1755
1756 Ok(())
1757 }
1758
1759 #[tokio::test]
1760 async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1761 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1762
1763 let channel_1 = ChannelEntry::new(
1764 BOB.public().to_address(),
1765 ALICE.public().to_address(),
1766 u32::MAX.into(),
1767 0.into(),
1768 ChannelStatus::Open,
1769 4_u32.into(),
1770 );
1771
1772 db.upsert_channel(None, channel_1).await?;
1773
1774 let channel_2 = ChannelEntry::new(
1775 ALICE.public().to_address(),
1776 BOB.public().to_address(),
1777 u32::MAX.into(),
1778 0.into(),
1779 ChannelStatus::Open,
1780 4_u32.into(),
1781 );
1782
1783 db.upsert_channel(None, channel_2).await?;
1784
1785 let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1786 let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1787
1788 let value = t1.verified_ticket().amount;
1789
1790 db.upsert_ticket(None, t1).await?;
1791 db.upsert_ticket(None, t2).await?;
1792
1793 let stats_1 = db
1794 .get_ticket_statistics(Some(generate_channel_id(
1795 &BOB.public().to_address(),
1796 &ALICE.public().to_address(),
1797 )))
1798 .await?;
1799
1800 let stats_2 = db
1801 .get_ticket_statistics(Some(generate_channel_id(
1802 &ALICE.public().to_address(),
1803 &BOB.public().to_address(),
1804 )))
1805 .await?;
1806
1807 assert_eq!(value, stats_1.unredeemed_value);
1808 assert_eq!(value, stats_2.unredeemed_value);
1809
1810 assert_eq!(HoprBalance::zero(), stats_1.neglected_value);
1811 assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1812
1813 assert_eq!(stats_1, stats_2);
1814
1815 db.mark_tickets_as(channel_1.into(), TicketMarker::Neglected).await?;
1816
1817 let stats_1 = db
1818 .get_ticket_statistics(Some(generate_channel_id(
1819 &BOB.public().to_address(),
1820 &ALICE.public().to_address(),
1821 )))
1822 .await?;
1823
1824 let stats_2 = db
1825 .get_ticket_statistics(Some(generate_channel_id(
1826 &ALICE.public().to_address(),
1827 &BOB.public().to_address(),
1828 )))
1829 .await?;
1830
1831 assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1832 assert_eq!(value, stats_1.neglected_value);
1833
1834 assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1835
1836 Ok(())
1837 }
1838
1839 #[tokio::test]
1840 async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1841 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1842
1843 let hash = Hash::default();
1844
1845 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1846 assert_eq!(0, old_idx, "old value must be 0");
1847
1848 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1849 assert_eq!(1, new_idx, "new value must be 1");
1850
1851 let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1852 assert_eq!(1, old_idx, "old value must be 1");
1853
1854 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1855 assert_eq!(2, new_idx, "new value must be 2");
1856
1857 Ok(())
1858 }
1859
1860 #[tokio::test]
1861 async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1862 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1863
1864 let hash = Hash::default();
1865
1866 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1867 assert_eq!(0, new_idx, "value must be 0");
1868
1869 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1870
1871 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1872 assert_eq!(1, new_idx, "new value must be 1");
1873
1874 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1875 assert_eq!(1, old_idx, "old value must be 1");
1876 assert_eq!(1, db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst), "value must not decrease");
1877
1878 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1879 assert_eq!(1, old_idx, "old value must be 1");
1880 assert_eq!(1, db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst), "value must remain 1");
1881
1882 Ok(())
1883 }
1884
1885 #[tokio::test]
1886 async fn test_ticket_index_reset() -> anyhow::Result<()> {
1887 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1888
1889 let hash = Hash::default();
1890
1891 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1892 assert_eq!(0, new_idx, "value must be 0");
1893
1894 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1895
1896 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1897 assert_eq!(1, new_idx, "new value must be 1");
1898
1899 let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1900 assert_eq!(1, old_idx, "old value must be 1");
1901
1902 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1903 assert_eq!(0, new_idx, "new value must be 0");
1904 Ok(())
1905 }
1906
1907 #[tokio::test]
1908 async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1909 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1910
1911 let hash_1 = Hash::default();
1912 let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1913
1914 db.get_outgoing_ticket_index(hash_1).await?;
1915 db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1916
1917 let persisted = db.persist_outgoing_ticket_indices().await?;
1918 assert_eq!(1, persisted);
1919
1920 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1921 .all(&db.tickets_db)
1922 .await?;
1923 let idx_1 = indices
1924 .iter()
1925 .find(|idx| idx.channel_id == hash_1.to_hex())
1926 .context("must contain index 1")?;
1927 let idx_2 = indices
1928 .iter()
1929 .find(|idx| idx.channel_id == hash_2.to_hex())
1930 .context("must contain index 2")?;
1931 assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1932 assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1933
1934 db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1935 db.increment_outgoing_ticket_index(hash_2).await?;
1936
1937 let persisted = db.persist_outgoing_ticket_indices().await?;
1938 assert_eq!(2, persisted);
1939
1940 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1941 .all(&db.tickets_db)
1942 .await?;
1943 let idx_1 = indices
1944 .iter()
1945 .find(|idx| idx.channel_id == hash_1.to_hex())
1946 .context("must contain index 1")?;
1947 let idx_2 = indices
1948 .iter()
1949 .find(|idx| idx.channel_id == hash_2.to_hex())
1950 .context("must contain index 2")?;
1951 assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1952 assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1953 Ok(())
1954 }
1955
1956 #[tokio::test]
1957 async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1958 let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1959
1960 assert_eq!(cache.weighted_size(), 0);
1961
1962 cache.insert(1, 1).await;
1963 cache.insert(2, 2).await;
1964
1965 let clone = cache.clone();
1966
1967 cache.remove(&1).await;
1968 cache.remove(&2).await;
1969
1970 assert_eq!(cache.get(&1).await, None);
1971 assert_eq!(cache.get(&1).await, clone.get(&1).await);
1972 Ok(())
1973 }
1974
1975 fn dummy_ticket_model(channel_id: Hash, idx: u64, idx_offset: u32, amount: u32) -> ticket::Model {
1976 ticket::Model {
1977 id: 0,
1978 channel_id: channel_id.to_string(),
1979 amount: U256::from(amount).to_be_bytes().to_vec(),
1980 index: idx.to_be_bytes().to_vec(),
1981 index_offset: idx_offset as i32,
1982 winning_probability: hex!("0020C49BA5E34F").to_vec(), // encodes a winning probability of 0.0005
1983 channel_epoch: vec![],
1984 signature: vec![],
1985 response: vec![],
1986 state: 0,
1987 hash: vec![],
1988 }
1989 }
1990
1991 #[tokio::test]
1992 async fn test_aggregation_prerequisites_default_filter_no_tickets() -> anyhow::Result<()> {
1993 let prerequisites = AggregationPrerequisites::default();
1994 assert_eq!(None, prerequisites.min_unaggregated_ratio);
1995 assert_eq!(None, prerequisites.min_ticket_count);
1996
1997 let channel = ChannelEntry::new(
1998 BOB.public().to_address(),
1999 ALICE.public().to_address(),
2000 u32::MAX.into(),
2001 2.into(),
2002 ChannelStatus::Open,
2003 4_u32.into(),
2004 );
2005
2006 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2007
2008 let filtered_tickets =
2009 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2010
2011 assert_eq!(
2012 dummy_tickets, filtered_tickets,
2013 "empty prerequisites must not filter anything"
2014 );
2015 Ok(())
2016 }
2017
2018 #[tokio::test]
2019 async fn test_aggregation_prerequisites_should_filter_out_tickets_with_lower_than_min_win_prob()
2020 -> anyhow::Result<()> {
2021 let prerequisites = AggregationPrerequisites::default();
2022 assert_eq!(None, prerequisites.min_unaggregated_ratio);
2023 assert_eq!(None, prerequisites.min_ticket_count);
2024
2025 let channel = ChannelEntry::new(
2026 BOB.public().to_address(),
2027 ALICE.public().to_address(),
2028 u32::MAX.into(),
2029 2.into(),
2030 ChannelStatus::Open,
2031 4_u32.into(),
2032 );
2033
2034 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2035
2036 let filtered_tickets =
2037 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0006.try_into()?)?;
2038
2039 assert!(
2040 filtered_tickets.is_empty(),
2041 "must filter out tickets with lower win prob"
2042 );
2043 Ok(())
2044 }
2045
2046 #[tokio::test]
2047 async fn test_aggregation_prerequisites_must_trim_tickets_exceeding_channel_balance() -> anyhow::Result<()> {
2048 const TICKET_COUNT: usize = 110;
2049
2050 let prerequisites = AggregationPrerequisites::default();
2051 assert_eq!(None, prerequisites.min_unaggregated_ratio);
2052 assert_eq!(None, prerequisites.min_ticket_count);
2053
2054 let channel = ChannelEntry::new(
2055 BOB.public().to_address(),
2056 ALICE.public().to_address(),
2057 100.into(),
2058 (TICKET_COUNT + 1).into(),
2059 ChannelStatus::Open,
2060 4_u32.into(),
2061 );
2062
2063 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2064 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2065 .collect();
2066
2067 let filtered_tickets =
2068 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2069
2070 assert_eq!(
2071 100,
2072 filtered_tickets.len(),
2073 "must take only tickets up to channel balance"
2074 );
2075 assert!(
2076 filtered_tickets
2077 .into_iter()
2078 .map(|t| U256::from_be_bytes(t.amount).as_u64())
2079 .sum::<u64>()
2080 <= channel.balance.amount().as_u64(),
2081 "filtered tickets must not exceed channel balance"
2082 );
2083 Ok(())
2084 }
2085
2086 #[tokio::test]
2087 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_ticket_count_not_met() -> anyhow::Result<()>
2088 {
2089 const TICKET_COUNT: usize = 10;
2090
2091 let prerequisites = AggregationPrerequisites {
2092 min_ticket_count: Some(TICKET_COUNT + 1),
2093 min_unaggregated_ratio: None,
2094 };
2095
2096 let channel = ChannelEntry::new(
2097 BOB.public().to_address(),
2098 ALICE.public().to_address(),
2099 100.into(),
2100 (TICKET_COUNT + 1).into(),
2101 ChannelStatus::Open,
2102 4_u32.into(),
2103 );
2104
2105 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2106 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2107 .collect();
2108
2109 let filtered_tickets =
2110 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2111
2112 assert!(
2113 filtered_tickets.is_empty(),
2114 "must return empty when min_ticket_count is not met"
2115 );
2116
2117 Ok(())
2118 }
2119
2120 #[tokio::test]
2121 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_unaggregated_ratio_is_not_met()
2122 -> anyhow::Result<()> {
2123 const TICKET_COUNT: usize = 10;
2124
2125 let prerequisites = AggregationPrerequisites {
2126 min_ticket_count: None,
2127 min_unaggregated_ratio: Some(0.9),
2128 };
2129
2130 let channel = ChannelEntry::new(
2131 BOB.public().to_address(),
2132 ALICE.public().to_address(),
2133 100.into(),
2134 (TICKET_COUNT + 1).into(),
2135 ChannelStatus::Open,
2136 4_u32.into(),
2137 );
2138
2139 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2140 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2141 .collect();
2142
2143 let filtered_tickets =
2144 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2145
2146 assert!(
2147 filtered_tickets.is_empty(),
2148 "must return empty when min_unaggregated_ratio is not met"
2149 );
2150
2151 Ok(())
2152 }
2153
2154 #[tokio::test]
2155 async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met() -> anyhow::Result<()> {
2156 const TICKET_COUNT: usize = 10;
2157
2158 let prerequisites = AggregationPrerequisites {
2159 min_ticket_count: Some(TICKET_COUNT),
2160 min_unaggregated_ratio: None,
2161 };
2162
2163 let channel = ChannelEntry::new(
2164 BOB.public().to_address(),
2165 ALICE.public().to_address(),
2166 100.into(),
2167 (TICKET_COUNT + 1).into(),
2168 ChannelStatus::Open,
2169 4_u32.into(),
2170 );
2171
2172 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2173 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2174 .collect();
2175
2176 let filtered_tickets =
2177 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2178
2179 assert!(!filtered_tickets.is_empty(), "must not return empty");
2180 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2181 Ok(())
2182 }
2183
2184 #[tokio::test]
2185 async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met_regardless_ratio()
2186 -> anyhow::Result<()> {
2187 const TICKET_COUNT: usize = 10;
2188
2189 let prerequisites = AggregationPrerequisites {
2190 min_ticket_count: Some(TICKET_COUNT),
2191 min_unaggregated_ratio: Some(0.9),
2192 };
2193
2194 let channel = ChannelEntry::new(
2195 BOB.public().to_address(),
2196 ALICE.public().to_address(),
2197 100.into(),
2198 (TICKET_COUNT + 1).into(),
2199 ChannelStatus::Open,
2200 4_u32.into(),
2201 );
2202
2203 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2204 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2205 .collect();
2206
2207 let filtered_tickets =
2208 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2209
2210 assert!(!filtered_tickets.is_empty(), "must not return empty");
2211 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2212 Ok(())
2213 }
2214
2215 #[tokio::test]
2216 async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met()
2217 -> anyhow::Result<()> {
2218 const TICKET_COUNT: usize = 90;
2219
2220 let prerequisites = AggregationPrerequisites {
2221 min_ticket_count: None,
2222 min_unaggregated_ratio: Some(0.9),
2223 };
2224
2225 let channel = ChannelEntry::new(
2226 BOB.public().to_address(),
2227 ALICE.public().to_address(),
2228 100.into(),
2229 (TICKET_COUNT + 1).into(),
2230 ChannelStatus::Open,
2231 4_u32.into(),
2232 );
2233
2234 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2235 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2236 .collect();
2237
2238 let filtered_tickets =
2239 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2240
2241 assert!(!filtered_tickets.is_empty(), "must not return empty");
2242 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2243 Ok(())
2244 }
2245
2246 #[tokio::test]
2247 async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met_regardless_count()
2248 -> anyhow::Result<()> {
2249 const TICKET_COUNT: usize = 90;
2250
2251 let prerequisites = AggregationPrerequisites {
2252 min_ticket_count: Some(TICKET_COUNT + 1),
2253 min_unaggregated_ratio: Some(0.9),
2254 };
2255
2256 let channel = ChannelEntry::new(
2257 BOB.public().to_address(),
2258 ALICE.public().to_address(),
2259 100.into(),
2260 (TICKET_COUNT + 1).into(),
2261 ChannelStatus::Open,
2262 4_u32.into(),
2263 );
2264
2265 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2266 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2267 .collect();
2268
2269 let filtered_tickets =
2270 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2271
2272 assert!(!filtered_tickets.is_empty(), "must not return empty");
2273 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2274 Ok(())
2275 }
2276
2277 #[tokio::test]
2278 async fn test_aggregation_prerequisites_must_return_tickets_when_minimum_incl_aggregated_ratio_is_met()
2279 -> anyhow::Result<()> {
2280 const TICKET_COUNT: usize = 90;
2281
2282 let prerequisites = AggregationPrerequisites {
2283 min_ticket_count: None,
2284 min_unaggregated_ratio: Some(0.9),
2285 };
2286
2287 let channel = ChannelEntry::new(
2288 BOB.public().to_address(),
2289 ALICE.public().to_address(),
2290 100.into(),
2291 (TICKET_COUNT + 1).into(),
2292 ChannelStatus::Open,
2293 4_u32.into(),
2294 );
2295
2296 let mut dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2297 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2298 .collect();
2299 dummy_tickets[0].index_offset = 2;
2300
2301 let filtered_tickets =
2302 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2303
2304 assert_eq!(filtered_tickets.len(), TICKET_COUNT);
2305 Ok(())
2306 }
2307
2308 #[tokio::test]
2309 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_only_unaggregated_ratio_is_met_in_single_ticket_only()
2310 -> anyhow::Result<()> {
2311 let prerequisites = AggregationPrerequisites {
2312 min_ticket_count: None,
2313 min_unaggregated_ratio: Some(0.9),
2314 };
2315
2316 let channel = ChannelEntry::new(
2317 BOB.public().to_address(),
2318 ALICE.public().to_address(),
2319 100.into(),
2320 2.into(),
2321 ChannelStatus::Open,
2322 4_u32.into(),
2323 );
2324
2325 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 2, 110)];
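2326 // the single ticket above is already aggregated (index_offset = 2), so it does not count towards the unaggregated ratio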
2327
2328 let filtered_tickets =
2329 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2330
2331 assert!(filtered_tickets.is_empty(), "must return empty");
2332 Ok(())
2333 }
2334
2335 async fn create_alice_db_with_tickets_from_bob(
2336 ticket_count: usize,
2337 ) -> anyhow::Result<(HoprDb, ChannelEntry, Vec<AcknowledgedTicket>)> {
2338 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
2339
2340 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2341 .await?;
2342
2343 add_peer_mappings(
2344 &db,
2345 vec![
2346 (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2347 (BOB_OFFCHAIN.clone(), BOB.clone()),
2348 ],
2349 )
2350 .await?;
2351
2352 let (channel, tickets) = init_db_with_tickets(&db, ticket_count as u64).await?;
2353
2354 Ok((db, channel, tickets))
2355 }
2356
2357 #[tokio::test]
2358 async fn test_ticket_aggregation_should_fail_if_any_ticket_is_being_aggregated_in_that_channel()
2359 -> anyhow::Result<()> {
2360 const COUNT_TICKETS: usize = 5;
2361
2362 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2363
2364 assert_eq!(tickets.len(), COUNT_TICKETS);
2365
2366 let existing_channel_with_multiple_tickets = channel.get_id();
2367
2368 let mut ticket = hopr_db_entity::ticket::Entity::find()
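2369 // put the first stored ticket into the BeingAggregated state so the next aggregation attempt must fail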
2370 .one(&db.tickets_db)
2371 .await?
2372 .context("should have an active model")?
2373 .into_active_model();
2374 ticket.state = Set(AcknowledgedTicketStatus::BeingAggregated as i8);
2375 ticket.save(&db.tickets_db).await?;
2376
2377 assert!(
2378 db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2379 .await
2380 .is_err()
2381 );
2382
2383 Ok(())
2384 }
2385
2386 #[tokio::test]
2387 async fn test_ticket_aggregation_should_not_offer_tickets_with_lower_than_min_win_prob() -> anyhow::Result<()> {
2388 const COUNT_TICKETS: usize = 5;
2389
2390 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2391
2392 assert_eq!(tickets.len(), COUNT_TICKETS);
2393
2394 let existing_channel_with_multiple_tickets = channel.get_id();
2395
2396 let mut ticket = hopr_db_entity::ticket::Entity::find()
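2397 // lower one ticket's winning probability to 0.5 so it falls below the required minimum and is not offered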
2398 .one(&db.tickets_db)
2399 .await?
2400 .context("should have an active model")?
2401 .into_active_model();
2402 ticket.winning_probability = Set(WinningProbability::try_from_f64(0.5)?.as_ref().to_vec());
2403 ticket.save(&db.tickets_db).await?;
2404
2405 let prepared_tickets = db
2406 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2407 .await?
2408 .ok_or(anyhow!("should contain tickets"))?
2409 .1;
2410
2411 assert_eq!(COUNT_TICKETS - 1, prepared_tickets.len());
2412
2413 Ok(())
2414 }
2415
2416 #[tokio::test]
2417 async fn test_ticket_aggregation_prepare_request_with_0_tickets_should_return_empty_result() -> anyhow::Result<()> {
2418 const COUNT_TICKETS: usize = 0;
2419
2420 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2421
2422 assert_eq!(tickets.len(), COUNT_TICKETS);
2423
2424 let existing_channel_with_0_tickets = channel.get_id();
2425
2426 let actual = db
2427 .prepare_aggregation_in_channel(&existing_channel_with_0_tickets, Default::default())
2428 .await?;
2429
2430 assert_eq!(actual, None);
2431
2432 Ok(())
2433 }
2434
2435 #[tokio::test]
2436 async fn test_ticket_aggregation_prepare_request_with_multiple_tickets_should_return_that_ticket()
2437 -> anyhow::Result<()> {
2438 const COUNT_TICKETS: usize = 2;
2439
2440 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2441 let tickets: Vec<TransferableWinningTicket> = tickets
2442 .into_iter()
2443 .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2444 .collect::<hopr_internal_types::errors::Result<Vec<TransferableWinningTicket>>>()?;
2445
2446 assert_eq!(tickets.len(), COUNT_TICKETS);
2447
2448 let existing_channel_with_multiple_tickets = channel.get_id();
2449
2450 let actual = db
2451 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2452 .await?;
2453
2454 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2455
2456 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2457 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2458 .count(&db.tickets_db)
2459 .await? as usize;
2460
2461 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2462
2463 Ok(())
2464 }
2465
2466 #[tokio::test]
2467 async fn test_ticket_aggregation_prepare_request_with_duplicate_tickets_should_return_dedup_aggregated_ticket()
2468 -> anyhow::Result<()> {
2469 let (db, channel, _) = create_alice_db_with_tickets_from_bob(0).await?;
2470 let tickets = vec![
2471 generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0),
2472 generate_random_ack_ticket(&BOB, &ALICE, 0, 2, 1.0),
2473 generate_random_ack_ticket(&BOB, &ALICE, 2, 1, 1.0),
2474 generate_random_ack_ticket(&BOB, &ALICE, 3, 1, 1.0),
2475 ]
2476 .into_iter()
2477 .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
2478
2479 let tickets_clone = tickets.clone();
2480 let db_clone = db.clone();
2481 db.nest_transaction_in_db(None, TargetDb::Tickets)
2482 .await?
2483 .perform(|tx| {
2484 Box::pin(async move {
2485 for ticket in tickets_clone {
2486 db_clone.upsert_ticket(tx.into(), ticket).await?;
2487 }
2488 Ok::<_, DbError>(())
2489 })
2490 })
2491 .await?;
2492
2493 let existing_channel_with_multiple_tickets = channel.get_id();
2494 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2495 assert_eq!(stats.neglected_value, HoprBalance::zero());
2496
2497 let actual = db
2498 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2499 .await?;
2500
2501 let mut tickets = tickets
2502 .into_iter()
2503 .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2504 .collect::<hopr_internal_types::errors::Result<Vec<_>>>()?;
2505
2506 tickets.remove(0);
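2507 // the first ticket (index 1, offset 1) was dropped above: it is already covered by the aggregated ticket (index 0, offset 2)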
2508 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2509
2510 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2511 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2512 .count(&db.tickets_db)
2513 .await? as usize;
2514
2515 assert_eq!(actual_being_aggregated_count, 3);
2516
2517 Ok(())
2518 }
2519
2520 #[tokio::test]
2521 async fn test_ticket_aggregation_prepare_request_with_a_being_redeemed_ticket_should_aggregate_only_the_tickets_following_it()
2522 -> anyhow::Result<()> {
2523 const COUNT_TICKETS: usize = 5;
2524
2525 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2526 let mut tickets = tickets
2527 .into_iter()
2528 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2529 .collect::<Vec<_>>();
2530
2531 assert_eq!(tickets.len(), COUNT_TICKETS);
2532
2533 let existing_channel_with_multiple_tickets = channel.get_id();
2534
2535 let mut ticket = hopr_db_entity::ticket::Entity::find()
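2536 // mark the first ticket as BeingRedeemed; only the tickets following it may be aggregated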
2537 .one(&db.tickets_db)
2538 .await?
2539 .context("should have 1 active model")?
2540 .into_active_model();
2541 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2542 ticket.save(&db.tickets_db).await?;
2543
2544 let actual = db
2545 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2546 .await?;
2547
2548 tickets.remove(0);
2549 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2550
2551 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2552 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2553 .count(&db.tickets_db)
2554 .await? as usize;
2555
2556 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2557
2558 Ok(())
2559 }
2560
2561 #[tokio::test]
2562 async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_return_when_ticket_threshold_is_met()
2563 -> anyhow::Result<()> {
2564 const COUNT_TICKETS: usize = 5;
2565
2566 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2567 let tickets = tickets
2568 .into_iter()
2569 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2570 .collect::<Vec<_>>();
2571
2572 assert_eq!(tickets.len(), COUNT_TICKETS);
2573
2574 let existing_channel_with_multiple_tickets = channel.get_id();
2575
2576 let constraints = AggregationPrerequisites {
2577 min_ticket_count: Some(COUNT_TICKETS - 1),
2578 min_unaggregated_ratio: None,
2579 };
2580 let actual = db
2581 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2582 .await?;
2583
2584 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2585
2586 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2587 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2588 .count(&db.tickets_db)
2589 .await? as usize;
2590
2591 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2592
2593 Ok(())
2594 }
2595
2596 #[tokio::test]
2597 async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_not_return_when_ticket_threshold_is_not_met()
2598 -> anyhow::Result<()> {
2599 const COUNT_TICKETS: usize = 2;
2600
2601 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2602
2603 assert_eq!(tickets.len(), COUNT_TICKETS);
2604
2605 let existing_channel_with_multiple_tickets = channel.get_id();
2606
2607 let constraints = AggregationPrerequisites {
2608 min_ticket_count: Some(COUNT_TICKETS + 1),
2609 min_unaggregated_ratio: None,
2610 };
2611 let actual = db
2612 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2613 .await?;
2614
2615 assert_eq!(actual, None);
2616
2617 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2618 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2619 .count(&db.tickets_db)
2620 .await? as usize;
2621
2622 assert_eq!(actual_being_aggregated_count, 0);
2623
2624 Ok(())
2625 }
2626
2627 #[tokio::test]
2628 async fn test_ticket_aggregation_prepare_request_with_no_aggregatable_tickets_should_return_nothing()
2629 -> anyhow::Result<()> {
2630 const COUNT_TICKETS: usize = 3;
2631
2632 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2633
2634 assert_eq!(tickets.len(), COUNT_TICKETS);
2635
2636 let existing_channel_with_multiple_tickets = channel.get_id();
2637
2638 for ticket in hopr_db_entity::ticket::Entity::find()
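2639 // mark every stored ticket as BeingRedeemed so that none of them can be aggregated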
2640 .all(&db.tickets_db)
2641 .await?
2642 .into_iter()
2643 {
2644 let mut ticket = ticket.into_active_model();
2645 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2646 ticket.save(&db.tickets_db).await?;
2647 }
2648
2649 let actual = db
2650 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2651 .await?;
2652
2653 assert_eq!(actual, None);
2654
2655 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2656 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2657 .count(&db.tickets_db)
2658 .await? as usize;
2659
2660 assert_eq!(actual_being_aggregated_count, 0);
2661
2662 Ok(())
2663 }
2664
2665 #[tokio::test]
2666 async fn test_ticket_aggregation_rollback_should_rollback_all_the_being_aggregated_tickets_but_nothing_else()
2667 -> anyhow::Result<()> {
2668 const COUNT_TICKETS: usize = 5;
2669
2670 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2671
2672 assert_eq!(tickets.len(), COUNT_TICKETS);
2673
2674 let existing_channel_with_multiple_tickets = channel.get_id();
2675
2676 let mut ticket = hopr_db_entity::ticket::Entity::find()
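2677 // put one ticket into the BeingRedeemed state; the rollback must leave it untouched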
2678 .one(&db.tickets_db)
2679 .await?
2680 .context("should have one active model")?
2681 .into_active_model();
2682 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2683 ticket.save(&db.tickets_db).await?;
2684
2685 assert!(
2686 db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2687 .await
2688 .is_ok()
2689 );
2690
2691 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2692 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2693 .count(&db.tickets_db)
2694 .await? as usize;
2695
2696 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2697
2698 assert!(
2699 db.rollback_aggregation_in_channel(existing_channel_with_multiple_tickets)
2700 .await
2701 .is_ok()
2702 );
2703
2704 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2705 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2706 .count(&db.tickets_db)
2707 .await? as usize;
2708
2709 assert_eq!(actual_being_aggregated_count, 0);
2710
2711 Ok(())
2712 }
2713
2714 #[tokio::test]
2715 async fn test_ticket_aggregation_should_replace_the_tickets_with_a_correctly_aggregated_ticket()
2716 -> anyhow::Result<()> {
2717 const COUNT_TICKETS: usize = 5;
2718
2719 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2720
2721 let (notifier_tx, notifier_rx) = futures::channel::mpsc::unbounded();
2722 db.start_ticket_processing(Some(notifier_tx))?;
2723
2724 let tickets = tickets
2725 .into_iter()
2726 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2727 .collect::<Vec<_>>();
2728
2729 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2730 let aggregated_ticket = TicketBuilder::default()
2731 .addresses(&*BOB, &*ALICE)
2732 .amount(
2733 tickets
2734 .iter()
2735 .fold(U256::zero(), |acc, v| acc + v.ticket.amount.amount()),
2736 )
2737 .index(first_ticket.index)
2738 .index_offset(
2739 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2740 )
2741 .win_prob(1.0.try_into()?)
2742 .channel_epoch(first_ticket.channel_epoch)
2743 .eth_challenge(first_ticket.challenge)
2744 .build_signed(&BOB, &Hash::default())?;
2745
2746 assert_eq!(tickets.len(), COUNT_TICKETS);
2747
2748 let existing_channel_with_multiple_tickets = channel.get_id();
2749
2750 let actual = db
2751 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2752 .await?;
2753
2754 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2755
2756 let agg_ticket = aggregated_ticket.clone();
2757
2758 let _ = db
2759 .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2760 .await?;
2761
2762 pin_mut!(notifier_rx);
2763 let notified_ticket = notifier_rx.next().await.ok_or(anyhow!("must have ticket"))?;
2764
2765 assert_eq!(notified_ticket.verified_ticket(), agg_ticket.verified_ticket());
2766
2767 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2768 .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2769 .count(&db.tickets_db)
2770 .await? as usize;
2771
2772 assert_eq!(actual_being_aggregated_count, 0);
2773
2774 Ok(())
2775 }
2776
2777 #[tokio::test]
2778 async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_value_is_lower_than_the_stored_one()
2779 -> anyhow::Result<()> {
2780 const COUNT_TICKETS: usize = 5;
2781
2782 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2783 let tickets = tickets
2784 .into_iter()
2785 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2786 .collect::<Vec<_>>();
2787
2788 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2789 let aggregated_ticket = TicketBuilder::default()
2790 .addresses(&*BOB, &*ALICE)
2791 .amount(0)
2792 .index(first_ticket.index)
2793 .index_offset(
2794 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2795 )
2796 .win_prob(1.0.try_into()?)
2797 .channel_epoch(first_ticket.channel_epoch)
2798 .eth_challenge(first_ticket.challenge)
2799 .build_signed(&BOB, &Hash::default())?;
2800
2801 assert_eq!(tickets.len(), COUNT_TICKETS);
2802
2803 let existing_channel_with_multiple_tickets = channel.get_id();
2804
2805 let actual = db
2806 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2807 .await?;
2808
2809 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2810
2811 assert!(
2812 db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2813 .await
2814 .is_err()
2815 );
2816
2817 Ok(())
2818 }
2819
2820 #[tokio::test]
2821 async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_win_probability_is_not_equal_to_1()
2822 -> anyhow::Result<()> {
2823 const COUNT_TICKETS: usize = 5;
2824
2825 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2826 let tickets = tickets
2827 .into_iter()
2828 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2829 .collect::<Vec<_>>();
2830
2831 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2832 let aggregated_ticket = TicketBuilder::default()
2833 .addresses(&*BOB, &*ALICE)
2834 .amount(0)
2835 .index(first_ticket.index)
2836 .index_offset(
2837 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2838 )
2839 .win_prob(0.5.try_into()?) // aggregated tickets must have winning probability 1, so this must be rejected
2840 .channel_epoch(first_ticket.channel_epoch)
2841 .eth_challenge(first_ticket.challenge)
2842 .build_signed(&BOB, &Hash::default())?;
2843
2844 assert_eq!(tickets.len(), COUNT_TICKETS);
2845
2846 let existing_channel_with_multiple_tickets = channel.get_id();
2847
2848 let actual = db
2849 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2850 .await?;
2851
2852 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2853
2854 assert!(
2855 db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2856 .await
2857 .is_err()
2858 );
2859
2860 Ok(())
2861 }
2862
2863 async fn init_db_with_channel(channel: ChannelEntry) -> anyhow::Result<HoprDb> {
2864 let db = HoprDb::new_in_memory(BOB.clone()).await?;
2865
2866 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2867 .await?;
2868
2869 add_peer_mappings(
2870 &db,
2871 vec![
2872 (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2873 (BOB_OFFCHAIN.clone(), BOB.clone()),
2874 ],
2875 )
2876 .await?;
2877
2878 db.upsert_channel(None, channel).await?;
2879
2880 Ok(db)
2881 }
2882
2883 #[tokio::test]
2884 async fn test_aggregate_ticket_should_aggregate() -> anyhow::Result<()> {
2885 const COUNT_TICKETS: usize = 5;
2886
2887 let channel = ChannelEntry::new(
2888 BOB.public().to_address(),
2889 ALICE.public().to_address(),
2890 u32::MAX.into(),
2891 (COUNT_TICKETS + 1).into(),
2892 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2893 4_u32.into(),
2894 );
2895
2896 let db = init_db_with_channel(channel).await?;
2897
2898 let tickets = (0..COUNT_TICKETS)
2899 .map(|i| {
2900 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
2901 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2902 })
2903 .collect::<anyhow::Result<Vec<_>>>()?;
2904
2905 let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
2906 let min_idx = tickets
2907 .iter()
2908 .map(|t| t.ticket.index)
2909 .min()
2910 .context("min index should be present")?;
2911 let max_idx = tickets
2912 .iter()
2913 .map(|t| t.ticket.index)
2914 .max()
2915 .context("max index should be present")?;
2916
2917 let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2918
2919 assert_eq!(
2920 &BOB.public().to_address(),
2921 aggregated.verified_issuer(),
2922 "must have correct signer"
2923 );
2924
2925 assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2926
2927 assert_eq!(
2928 COUNT_TICKETS,
2929 aggregated.verified_ticket().index_offset as usize,
2930 "aggregated ticket must have correct offset"
2931 );
2932 assert_eq!(
2933 sum_value,
2934 aggregated.verified_ticket().amount,
2935 "aggregated ticket token amount must be sum of individual tickets"
2936 );
2937 assert_eq!(
2938 1.0,
2939 aggregated.win_prob(),
2940 "aggregated ticket must have winning probability 1"
2941 );
2942 assert_eq!(
2943 min_idx,
2944 aggregated.verified_ticket().index,
2945 "aggregated ticket must have correct index"
2946 );
2947 assert_eq!(
2948 channel.get_id(),
2949 aggregated.verified_ticket().channel_id,
2950 "aggregated ticket must have correct channel id"
2951 );
2952 assert_eq!(
2953 channel.channel_epoch.as_u32(),
2954 aggregated.verified_ticket().channel_epoch,
2955 "aggregated ticket must have correct channel epoch"
2956 );
2957
2958 assert_eq!(
2959 max_idx + 1,
2960 db.get_outgoing_ticket_index(channel.get_id())
2961 .await?
2962 .load(Ordering::SeqCst)
2963 );
2964
2965 Ok(())
2966 }
2967
2968 #[tokio::test]
2969 async fn test_aggregate_ticket_should_aggregate_including_aggregated() -> anyhow::Result<()> {
2970 const COUNT_TICKETS: usize = 5;
2971
2972 let channel = ChannelEntry::new(
2973 BOB.public().to_address(),
2974 ALICE.public().to_address(),
2975 u32::MAX.into(),
2976 (COUNT_TICKETS + 1).into(),
2977 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2978 4_u32.into(),
2979 );
2980
2981 let db = init_db_with_channel(channel).await?;
2982
2983 let offset = 10_usize;
2984
2985 let mut tickets = (1..COUNT_TICKETS)
2986 .map(|i| {
2987 generate_random_ack_ticket(&BOB, &ALICE, (i + offset) as u64, 1, 1.0)
2988 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2989 })
2990 .collect::<anyhow::Result<Vec<_>>>()?;
2991
2992 tickets.push(
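2993 // add an already aggregated ticket (index 0, index_offset = offset) covering the indices before the others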
2994 generate_random_ack_ticket(&BOB, &ALICE, 0, offset as u32, 1.0)
2995 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?,
2996 );
2997
2998 let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
2999 let min_idx = tickets
3000 .iter()
3001 .map(|t| t.ticket.index)
3002 .min()
3003 .context("min index should be present")?;
3004 let max_idx = tickets
3005 .iter()
3006 .map(|t| t.ticket.index)
3007 .max()
3008 .context("max index should be present")?;
3009
3010 let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
3011
3012 assert_eq!(
3013 &BOB.public().to_address(),
3014 aggregated.verified_issuer(),
3015 "must have correct signer"
3016 );
3017
3018 assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
3019
3020 assert_eq!(
3021 COUNT_TICKETS + offset,
3022 aggregated.verified_ticket().index_offset as usize,
3023 "aggregated ticket must have correct offset"
3024 );
3025 assert_eq!(
3026 sum_value,
3027 aggregated.verified_ticket().amount,
3028 "aggregated ticket token amount must be sum of individual tickets"
3029 );
3030 assert_eq!(
3031 1.0,
3032 aggregated.win_prob(),
3033 "aggregated ticket must have winning probability 1"
3034 );
3035 assert_eq!(
3036 min_idx,
3037 aggregated.verified_ticket().index,
3038 "aggregated ticket must have correct index"
3039 );
3040 assert_eq!(
3041 channel.get_id(),
3042 aggregated.verified_ticket().channel_id,
3043 "aggregated ticket must have correct channel id"
3044 );
3045 assert_eq!(
3046 channel.channel_epoch.as_u32(),
3047 aggregated.verified_ticket().channel_epoch,
3048 "aggregated ticket must have correct channel epoch"
3049 );
3050
3051 assert_eq!(
3052 max_idx + 1,
3053 db.get_outgoing_ticket_index(channel.get_id())
3054 .await?
3055 .load(Ordering::SeqCst)
3056 );
3057
3058 Ok(())
3059 }
3060
3061 #[tokio::test]
3062 async fn test_aggregate_ticket_should_not_aggregate_zero_tickets() -> anyhow::Result<()> {
3063 let db = HoprDb::new_in_memory(BOB.clone()).await?;
3064
3065 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), vec![], &BOB)
3066 .await
3067 .expect_err("should not aggregate empty ticket list");
3068
3069 Ok(())
3070 }
3071
3072 #[tokio::test]
3073 async fn test_aggregate_ticket_should_aggregate_single_ticket_to_itself() -> anyhow::Result<()> {
3074 const COUNT_TICKETS: usize = 1;
3075
3076 let channel = ChannelEntry::new(
3077 BOB.public().to_address(),
3078 ALICE.public().to_address(),
3079 u32::MAX.into(),
3080 (COUNT_TICKETS + 1).into(),
3081 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
3082 4_u32.into(),
3083 );
3084
3085 let db = init_db_with_channel(channel).await?;
3086
3087 let mut tickets = (0..COUNT_TICKETS)
3088 .map(|i| {
3089 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3090 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3091 })
3092 .collect::<anyhow::Result<Vec<_>>>()?;
3093
3094 let aggregated = db
3095 .aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3096 .await?;
3097
3098 assert_eq!(
3099 &tickets.pop().context("ticket should be present")?.ticket,
3100 aggregated.verified_ticket()
3101 );
3102
3103 Ok(())
3104 }
3105
3106 #[tokio::test]
3107 async fn test_aggregate_ticket_should_not_aggregate_on_closed_channel() -> anyhow::Result<()> {
3108 const COUNT_TICKETS: usize = 3;
3109
3110 let channel = ChannelEntry::new(
3111 BOB.public().to_address(),
3112 ALICE.public().to_address(),
3113 u32::MAX.into(),
3114 (COUNT_TICKETS + 1).into(),
3115 ChannelStatus::Closed,
3116 4_u32.into(),
3117 );
3118
3119 let db = init_db_with_channel(channel).await?;
3120
3121 let tickets = (0..COUNT_TICKETS)
3122 .map(|i| {
3123 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3124 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3125 })
3126 .collect::<anyhow::Result<Vec<_>>>()?;
3127
3128 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3129 .await
3130 .expect_err("should not aggregate on closed channel");
3131
3132 Ok(())
3133 }
3134
3135 #[tokio::test]
3136 async fn test_aggregate_ticket_should_not_aggregate_on_incoming_channel() -> anyhow::Result<()> {
3137 const COUNT_TICKETS: usize = 3;
3138
3139 let channel = ChannelEntry::new(
3140 ALICE.public().to_address(),
3141 BOB.public().to_address(),
3142 u32::MAX.into(),
3143 (COUNT_TICKETS + 1).into(),
3144 ChannelStatus::Open,
3145 4_u32.into(),
3146 );
3147
3148 let db = init_db_with_channel(channel).await?;
3149
3150 let tickets = (0..COUNT_TICKETS)
3151 .map(|i| {
3152 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3153 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3154 })
3155 .collect::<anyhow::Result<Vec<_>>>()?;
3156
3157 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3158 .await
3159 .expect_err("should not aggregate on incoming channel");
3160
3161 Ok(())
3162 }
3163
3164 #[tokio::test]
3165 async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_ids() -> anyhow::Result<()> {
3166 const COUNT_TICKETS: usize = 3;
3167
3168 let channel = ChannelEntry::new(
3169 ALICE.public().to_address(),
3170 BOB.public().to_address(),
3171 u32::MAX.into(),
3172 (COUNT_TICKETS + 1).into(),
3173 ChannelStatus::Open,
3174 4_u32.into(),
3175 );
3176
3177 let db = init_db_with_channel(channel).await?;
3178
3179 let mut tickets = (0..COUNT_TICKETS)
3180 .map(|i| {
3181 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3182 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3183 })
3184 .collect::<anyhow::Result<Vec<_>>>()?;
3185
3186 tickets[2] = generate_random_ack_ticket(&BOB, &ChainKeypair::random(), 2, 1, 1.0)
3187 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3188
3189 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3190 .await
3191 .expect_err("should not aggregate on mismatching channel ids");
3192
3193 Ok(())
3194 }
3195
3196 #[tokio::test]
3197 async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_epoch() -> anyhow::Result<()> {
3198 const COUNT_TICKETS: usize = 3;
3199
3200 let channel = ChannelEntry::new(
3201 ALICE.public().to_address(),
3202 BOB.public().to_address(),
3203 100.into(),
3204 (COUNT_TICKETS + 1).into(),
3205 ChannelStatus::Open,
3206 3_u32.into(),
3207 );
3208
3209 let db = init_db_with_channel(channel).await?;
3210
3211 let tickets = (0..COUNT_TICKETS)
3212 .map(|i| {
3213 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3214 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3215 })
3216 .collect::<anyhow::Result<Vec<_>>>()?;
3217
3218 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3219 .await
3220 .expect_err("should not aggregate on mismatching channel epoch");
3221
3222 Ok(())
3223 }
3224
3225 #[tokio::test]
3226 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_indices_overlap() -> anyhow::Result<()> {
3227 const COUNT_TICKETS: usize = 3;
3228
3229 let channel = ChannelEntry::new(
3230 ALICE.public().to_address(),
3231 BOB.public().to_address(),
3232 u32::MAX.into(),
3233 (COUNT_TICKETS + 1).into(),
3234 ChannelStatus::Open,
3235 3_u32.into(),
3236 );
3237
3238 let db = init_db_with_channel(channel).await?;
3239
3240 let mut tickets = (0..COUNT_TICKETS)
3241 .map(|i| {
3242 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3243 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3244 })
3245 .collect::<anyhow::Result<Vec<_>>>()?;
3246
3247 tickets[1] = generate_random_ack_ticket(&BOB, &ALICE, 1, 2, 1.0)
3248 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3249
3250 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3251 .await
3252 .expect_err("should not aggregate on overlapping ticket indices");
3253 Ok(())
3254 }
3255
3256 #[tokio::test]
3257 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_valid() -> anyhow::Result<()> {
3258 const COUNT_TICKETS: usize = 3;
3259
3260 let channel = ChannelEntry::new(
3261 ALICE.public().to_address(),
3262 BOB.public().to_address(),
3263 u32::MAX.into(),
3264 (COUNT_TICKETS + 1).into(),
3265 ChannelStatus::Open,
3266 3_u32.into(),
3267 );
3268
3269 let db = init_db_with_channel(channel).await?;
3270
3271 let mut tickets = (0..COUNT_TICKETS)
3272 .map(|i| {
3273 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3274 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3275 })
3276 .collect::<anyhow::Result<Vec<_>>>()?;
3277
3278 tickets[1].ticket.amount = (TICKET_VALUE - 10).into();
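3279 // the amount above no longer matches the signed ticket, making it invalid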
3280
3281 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3282 .await
3283 .expect_err("should not aggregate on invalid tickets");
3284
3285 Ok(())
3286 }
3287
3288 #[tokio::test]
3289 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_has_lower_than_min_win_prob() -> anyhow::Result<()> {
3290 const COUNT_TICKETS: usize = 3;
3291
3292 let channel = ChannelEntry::new(
3293 ALICE.public().to_address(),
3294 BOB.public().to_address(),
3295 u32::MAX.into(),
3296 (COUNT_TICKETS + 1).into(),
3297 ChannelStatus::Open,
3298 3_u32.into(),
3299 );
3300
3301 let db = init_db_with_channel(channel).await?;
3302
3303 let tickets = (0..COUNT_TICKETS)
3304 .map(|i| {
3305 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, if i > 0 { 1.0 } else { 0.9 })
3306 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3307 })
3308 .collect::<anyhow::Result<Vec<_>>>()?;
3309
3310 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3311 .await
3312 .expect_err("should not aggregate tickets with less than minimum win prob");
3313
3314 Ok(())
3315 }
3316
3317 #[tokio::test]
3318 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_winning() -> anyhow::Result<()> {
3319 const COUNT_TICKETS: usize = 3;
3320
3321 let channel = ChannelEntry::new(
3322 ALICE.public().to_address(),
3323 BOB.public().to_address(),
3324 u32::MAX.into(),
3325 (COUNT_TICKETS + 1).into(),
3326 ChannelStatus::Open,
3327 3_u32.into(),
3328 );
3329
3330 let db = init_db_with_channel(channel).await?;
3331
3332 let mut tickets = (0..COUNT_TICKETS)
3333 .map(|i| {
3334 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3335 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3336 })
3337 .collect::<anyhow::Result<Vec<_>>>()?;
3338
3339 let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
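3340 // rebuild one ticket with a zero winning probability so it cannot be a winning ticket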
3341 tickets[1] = TicketBuilder::from(tickets[1].ticket.clone())
3342 .win_prob(0.0.try_into()?)
3343 .challenge(resp.to_challenge()?)
3344 .build_signed(&BOB, &Hash::default())?
3345 .into_acknowledged(resp)
3346 .into_transferable(&ALICE, &Hash::default())?;
3347
3348 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3349 .await
3350 .expect_err("should not aggregate non-winning tickets");
3351
3352 Ok(())
3353 }
3354
3355 #[tokio::test]
3356 async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
3357 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3358
3359 let ticket = init_db_with_tickets(&db, 1).await?.1.pop().unwrap();
3360
3361 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
3362 .await
3363 .expect("must not fail");
3364
3365 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3366 assert_ne!(stats.redeemed_value, HoprBalance::zero());
3367
3368 db.reset_ticket_statistics().await.expect("must not fail");
3369
3370 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3371 assert_eq!(stats.redeemed_value, HoprBalance::zero());
3372
3373 Ok(())
3374 }
3375
3376 #[tokio::test]
3377 async fn test_fix_channels_ticket_state() -> anyhow::Result<()> {
3378 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3379 const COUNT_TICKETS: u64 = 1;
3380
3381 let (..) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
3382
3383 let mut ticket = hopr_db_entity::ticket::Entity::find()
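3384 // force the only ticket into the BeingRedeemed state before running the fix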
3385 .one(&db.tickets_db)
3386 .await?
3387 .context("should have one active model")?
3388 .into_active_model();
3389 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3390 ticket.save(&db.tickets_db).await?;
3391
3392 assert!(
3393 hopr_db_entity::ticket::Entity::find()
3394 .one(&db.tickets_db)
3395 .await?
3396 .context("should have one active model")?
3397 .state
3398 == AcknowledgedTicketStatus::BeingRedeemed as i8,
3399 );
3400
3401 db.fix_channels_next_ticket_state().await.expect("must not fail");
3402
3403 assert!(
3404 hopr_db_entity::ticket::Entity::find()
3405 .one(&db.tickets_db)
3406 .await?
3407 .context("should have one active model")?
3408 .state
3409 == AcknowledgedTicketStatus::Untouched as i8,
3410 );
3411
3412 Ok(())
3413 }
3414
3415 #[tokio::test]
3416 async fn test_dont_fix_correct_channels_ticket_state() -> anyhow::Result<()> {
3417 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3418 const COUNT_TICKETS: u64 = 2;
3419
3420 let (..) = init_db_with_tickets_and_channel(&db, COUNT_TICKETS, Some(1u32)).await?;
3422
3423 let mut ticket = hopr_db_entity::ticket::Entity::find()
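3424 // mark the ticket with index 0 as BeingRedeemed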
3425 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3426 .one(&db.tickets_db)
3427 .await?
3428 .context("should have one active model")?
3429 .into_active_model();
3430 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3431 ticket.save(&db.tickets_db).await?;
3432
3433 assert!(
3434 hopr_db_entity::ticket::Entity::find()
3435 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3436 .one(&db.tickets_db)
3437 .await?
3438 .context("should have one active model")?
3439 .state
3440 == AcknowledgedTicketStatus::BeingRedeemed as i8,
3441 );
3442
3443 db.fix_channels_next_ticket_state().await.expect("must not fail");
3444
3445 let ticket0 = hopr_db_entity::ticket::Entity::find()
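3446 // the ticket at index 0 must remain BeingRedeemed after the fix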
3447 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3448 .one(&db.tickets_db)
3449 .await?
3450 .context("should have one active model")?;
3451 assert_eq!(ticket0.state, AcknowledgedTicketStatus::BeingRedeemed as i8);
3452
3453 let ticket1 = hopr_db_entity::ticket::Entity::find()
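3454 // the ticket at index 1 must remain Untouched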
3455 .filter(ticket::Column::Index.eq(1u64.to_be_bytes().to_vec()))
3456 .one(&db.tickets_db)
3457 .await?
3458 .context("should have one active model")?;
3459 assert_eq!(ticket1.state, AcknowledgedTicketStatus::Untouched as i8);
3460
3461 Ok(())
3462 }
3463}