1use std::{
2 cmp,
3 ops::{Add, Bound},
4 sync::{
5 Arc,
6 atomic::{AtomicU64, Ordering},
7 },
8};
9
10use async_stream::stream;
11use async_trait::async_trait;
12use futures::{StreamExt, TryStreamExt, stream::BoxStream};
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::{
15 errors::Result,
16 info::DomainSeparator,
17 prelude::{TicketIndexSelector, TicketMarker},
18 resolver::HoprDbResolverOperations,
19 tickets::{AggregationPrerequisites, ChannelTicketStatistics, HoprDbTicketOperations, TicketSelector},
20};
21use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
22use hopr_internal_types::prelude::*;
23#[cfg(all(feature = "prometheus", not(test)))]
24use hopr_metrics::metrics::MultiGauge;
25use hopr_primitive_types::prelude::*;
26use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set};
27use sea_query::{Condition, Expr, IntoCondition, SimpleExpr};
28use tracing::{debug, error, info, trace, warn};
29
30use crate::{
31 HoprDbGeneralModelOperations, OpenTransaction, OptTx, TargetDb,
32 channels::HoprDbChannelOperations,
33 db::HoprDb,
34 errors::{DbSqlError, DbSqlError::LogicalError},
35 info::HoprDbInfoOperations,
36};
37
38#[cfg(all(feature = "prometheus", not(test)))]
39lazy_static::lazy_static! {
40 pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: MultiGauge = MultiGauge::new(
41 "hopr_tickets_incoming_statistics",
42 "Ticket statistics for channels with incoming tickets.",
43 &["channel", "statistic"]
44 ).unwrap();
45}
46
47pub const MAX_TICKETS_TO_AGGREGATE_BATCH: u64 = 500;
49
50#[derive(Clone)]
54pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
55
56impl From<TicketSelector> for WrappedTicketSelector {
57 fn from(selector: TicketSelector) -> Self {
58 Self(selector)
59 }
60}
61
62impl AsRef<TicketSelector> for WrappedTicketSelector {
63 fn as_ref(&self) -> &TicketSelector {
64 &self.0
65 }
66}
67
68impl IntoCondition for WrappedTicketSelector {
69 fn into_condition(self) -> Condition {
70 let expr = self
71 .0
72 .channel_identifiers
73 .into_iter()
74 .map(|(channel_id, epoch)| {
75 ticket::Column::ChannelId
76 .eq(channel_id.to_hex())
77 .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
78 })
79 .reduce(SimpleExpr::or);
80
81 if expr.is_none() {
83 return Condition::any().not();
84 }
85
86 let mut expr = expr.unwrap();
87
88 match self.0.index {
89 TicketIndexSelector::None => {
90 }
92 TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
93 TicketIndexSelector::Multiple(idxs) => {
94 expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
95 }
96 TicketIndexSelector::Range((lb, ub)) => {
97 expr = match lb {
98 Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
99 Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
100 Bound::Unbounded => expr,
101 };
102 expr = match ub {
103 Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
104 Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
105 Bound::Unbounded => expr,
106 };
107 }
108 }
109
110 if let Some(state) = self.0.state {
111 expr = expr.and(ticket::Column::State.eq(state as u8))
112 }
113
114 if self.0.only_aggregated {
115 expr = expr.and(ticket::Column::IndexOffset.gt(1));
116 }
117
118 expr = match self.0.win_prob.0 {
120 Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
121 Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
122 Bound::Unbounded => expr,
123 };
124
125 expr = match self.0.win_prob.1 {
127 Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
128 Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
129 Bound::Unbounded => expr,
130 };
131
132 expr = match self.0.amount.0 {
134 Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
135 Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
136 Bound::Unbounded => expr,
137 };
138
139 expr = match self.0.amount.1 {
141 Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
142 Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
143 Bound::Unbounded => expr,
144 };
145
146 expr.into_condition()
147 }
148}
149
150pub(crate) fn filter_satisfying_ticket_models(
159 prerequisites: AggregationPrerequisites,
160 models: Vec<ticket::Model>,
161 channel_entry: &ChannelEntry,
162 min_win_prob: WinningProbability,
163) -> crate::errors::Result<Vec<ticket::Model>> {
164 let channel_id = channel_entry.get_id();
165
166 let mut to_be_aggregated = Vec::with_capacity(models.len());
167 let mut total_balance = HoprBalance::zero();
168
169 for m in models {
170 let ticket_wp: WinningProbability = m
171 .winning_probability
172 .as_slice()
173 .try_into()
174 .map_err(|_| DbSqlError::DecodingError)?;
175
176 if ticket_wp.approx_cmp(&min_win_prob).is_lt() {
177 warn!(
178 channel_id = %channel_entry.get_id(),
179 %ticket_wp, %min_win_prob, "encountered ticket with winning probability lower than the minimum threshold"
180 );
181 continue;
182 }
183
184 let to_add = HoprBalance::from_be_bytes(&m.amount);
185
186 total_balance += to_add;
188 if total_balance.gt(&channel_entry.balance) {
189 total_balance -= to_add;
191 break;
192 }
193
194 to_be_aggregated.push(m);
195 }
196
197 if prerequisites.min_ticket_count.is_none() && prerequisites.min_unaggregated_ratio.is_none() {
199 info!(channel = %channel_id, "Aggregation check OK, no aggregation prerequisites were given");
200 return Ok(to_be_aggregated);
201 }
202
203 let to_be_agg_count = to_be_aggregated.len();
204
205 if let Some(agg_threshold) = prerequisites.min_ticket_count {
207 if to_be_agg_count >= agg_threshold {
208 info!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check OK aggregated value greater than threshold");
209 return Ok(to_be_aggregated);
210 } else {
211 debug!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold,"Aggregation check FAIL not enough resources to aggregate");
212 }
213 }
214
215 if let Some(unrealized_threshold) = prerequisites.min_unaggregated_ratio {
216 let diminished_balance = channel_entry.balance.mul_f64(unrealized_threshold)?;
217
218 if total_balance.ge(&diminished_balance) {
221 if to_be_agg_count > 1 {
222 info!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check OK: more unrealized than diminished balance");
223 return Ok(to_be_aggregated);
224 } else {
225 debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: more unrealized than diminished balance but only 1 ticket");
226 }
227 } else {
228 debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: less unrealized than diminished balance");
229 }
230 }
231
232 debug!(channel = %channel_id,"Aggregation check FAIL: no prerequisites were met");
233 Ok(vec![])
234}
235
236pub(crate) async fn find_stats_for_channel(
237 tx: &OpenTransaction,
238 channel_id: &Hash,
239) -> crate::errors::Result<ticket_statistics::Model> {
240 if let Some(model) = ticket_statistics::Entity::find()
241 .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
242 .one(tx.as_ref())
243 .await?
244 {
245 Ok(model)
246 } else {
247 let new_stats = ticket_statistics::ActiveModel {
248 channel_id: Set(channel_id.to_hex()),
249 ..Default::default()
250 }
251 .insert(tx.as_ref())
252 .await?;
253
254 Ok(new_stats)
255 }
256}
257
258impl HoprDb {
259 async fn get_tickets_value_int<'a>(
260 &'a self,
261 tx: OptTx<'a>,
262 selector: TicketSelector,
263 ) -> Result<(usize, HoprBalance)> {
264 let selector: WrappedTicketSelector = selector.into();
265 Ok(self
266 .nest_transaction_in_db(tx, TargetDb::Tickets)
267 .await?
268 .perform(|tx| {
269 Box::pin(async move {
270 ticket::Entity::find()
271 .filter(selector)
272 .stream(tx.as_ref())
273 .await
274 .map_err(DbSqlError::from)?
275 .map_err(DbSqlError::from)
276 .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
277 Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
278 })
279 .await
280 })
281 })
282 .await?)
283 }
284}
285
286#[async_trait]
287impl HoprDbTicketOperations for HoprDb {
288 async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>> {
289 Ok(self
290 .nest_transaction_in_db(None, TargetDb::Tickets)
291 .await?
292 .perform(|tx| {
293 Box::pin(async move {
294 ticket::Entity::find()
295 .all(tx.as_ref())
296 .await?
297 .into_iter()
298 .map(AcknowledgedTicket::try_from)
299 .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
300 .map_err(DbSqlError::from)
301 })
302 })
303 .await?)
304 }
305
306 async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>> {
307 debug!("fetching tickets via {selector}");
308 let selector: WrappedTicketSelector = selector.into();
309
310 Ok(self
311 .nest_transaction_in_db(None, TargetDb::Tickets)
312 .await?
313 .perform(|tx| {
314 Box::pin(async move {
315 ticket::Entity::find()
316 .filter(selector)
317 .all(tx.as_ref())
318 .await?
319 .into_iter()
320 .map(AcknowledgedTicket::try_from)
321 .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
322 .map_err(DbSqlError::from)
323 })
324 })
325 .await?)
326 }
327
328 async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize> {
329 let myself = self.clone();
330 Ok(self
331 .ticket_manager
332 .with_write_locked_db(|tx| {
333 Box::pin(async move {
334 let mut total_marked_count = 0;
335 for (channel_id, epoch) in selector.channel_identifiers.iter() {
336 let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);
337
338 let (marked_count, marked_value) =
340 myself.get_tickets_value_int(Some(tx), channel_selector.clone()).await?;
341 trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");
342
343 if marked_count > 0 {
344 let deleted = ticket::Entity::delete_many()
346 .filter(WrappedTicketSelector::from(channel_selector.clone()))
347 .exec(tx.as_ref())
348 .await?;
349
350 if deleted.rows_affected == marked_count as u64 {
352 let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
353 let _current_value = match mark_as {
354 TicketMarker::Redeemed => {
355 let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
356 new_stats.redeemed_value =
357 Set((current_value + marked_value.amount()).to_be_bytes().into());
358 current_value
359 }
360 TicketMarker::Rejected => {
361 let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
362 new_stats.rejected_value =
363 Set((current_value + marked_value.amount()).to_be_bytes().into());
364 current_value
365 }
366 TicketMarker::Neglected => {
367 let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
368 new_stats.neglected_value =
369 Set((current_value + marked_value.amount()).to_be_bytes().into());
370 current_value
371 }
372 };
373 new_stats.save(tx.as_ref()).await?;
374
375 #[cfg(all(feature = "prometheus", not(test)))]
376 {
377 let channel = channel_id.to_string();
378 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
379 &[&channel, &mark_as.to_string()],
380 (_current_value + marked_value.amount()).as_u128() as f64,
381 );
382
383 if mark_as != TicketMarker::Rejected {
386 let unredeemed_value = myself
387 .caches
388 .unrealized_value
389 .get(&(*channel_id, *epoch))
390 .await
391 .unwrap_or_default();
392
393 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
394 &[&channel, "unredeemed"],
395 (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
396 );
397 }
398 }
399
400 myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
401 } else {
402 return Err(DbSqlError::LogicalError(format!(
403 "could not mark {marked_count} ticket as {mark_as}"
404 )));
405 }
406
407 trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
408 total_marked_count += marked_count;
409 }
410 }
411
412 info!(
413 count = total_marked_count,
414 ?mark_as,
415 channel_count = selector.channel_identifiers.len(),
416 "removed tickets in channels",
417 );
418 Ok(total_marked_count)
419 })
420 })
421 .await?)
422 }
423
424 async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()> {
425 let channel_id = ticket.channel_id;
426 let amount = ticket.amount;
427 Ok(self
428 .ticket_manager
429 .with_write_locked_db(|tx| {
430 Box::pin(async move {
431 let stats = find_stats_for_channel(tx, &channel_id).await?;
432
433 let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
434
435 let mut active_stats = stats.into_active_model();
436 active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
437 active_stats.save(tx.as_ref()).await?;
438
439 #[cfg(all(feature = "prometheus", not(test)))]
440 {
441 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
442 &[&channel_id.to_string(), "rejected"],
443 (current_rejected_value + amount.amount()).as_u128() as f64,
444 );
445 }
446
447 Ok::<(), DbSqlError>(())
448 })
449 })
450 .await?)
451 }
452
453 async fn update_ticket_states_and_fetch<'a>(
454 &'a self,
455 selector: TicketSelector,
456 new_state: AcknowledgedTicketStatus,
457 ) -> Result<BoxStream<'a, AcknowledgedTicket>> {
458 let selector: WrappedTicketSelector = selector.into();
459 Ok(Box::pin(stream! {
460 match ticket::Entity::find()
461 .filter(selector)
462 .stream(self.conn(TargetDb::Tickets))
463 .await {
464 Ok(mut stream) => {
465 while let Ok(Some(ticket)) = stream.try_next().await {
466 let active_ticket = ticket::ActiveModel {
467 id: Set(ticket.id),
468 state: Set(new_state as i8),
469 ..Default::default()
470 };
471
472 {
473 let _g = self.ticket_manager.mutex.lock();
474 if let Err(e) = active_ticket.update(self.conn(TargetDb::Tickets)).await {
475 error!(error = %e,"failed to update ticket in the db");
476 }
477 }
478
479 match AcknowledgedTicket::try_from(ticket) {
480 Ok(mut ticket) => {
481 ticket.status = new_state;
483 yield ticket
484 },
485 Err(e) => {
486 tracing::error!(error = %e, "failed to decode ticket from the db");
487 }
488 }
489 }
490 },
491 Err(e) => tracing::error!(error = %e, "failed open ticket db stream")
492 }
493 }))
494 }
495
496 async fn update_ticket_states(
497 &self,
498 selector: TicketSelector,
499 new_state: AcknowledgedTicketStatus,
500 ) -> Result<usize> {
501 let selector: WrappedTicketSelector = selector.into();
502 Ok(self
503 .ticket_manager
504 .with_write_locked_db(|tx| {
505 Box::pin(async move {
506 let update = ticket::Entity::update_many()
507 .filter(selector)
508 .col_expr(ticket::Column::State, Expr::value(new_state as u8))
509 .exec(tx.as_ref())
510 .await?;
511 Ok::<_, DbSqlError>(update.rows_affected as usize)
512 })
513 })
514 .await?)
515 }
516
517 async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics> {
518 let res = match channel_id {
519 None => {
520 #[cfg(all(feature = "prometheus", not(test)))]
521 let mut per_channel_unredeemed = std::collections::HashMap::new();
522
523 self.nest_transaction_in_db(None, TargetDb::Tickets)
524 .await?
525 .perform(|tx| {
526 Box::pin(async move {
527 let unredeemed_value = ticket::Entity::find()
528 .stream(tx.as_ref())
529 .await?
530 .try_fold(U256::zero(), |amount, x| {
531 let unredeemed_value = U256::from_be_bytes(x.amount);
532
533 #[cfg(all(feature = "prometheus", not(test)))]
534 per_channel_unredeemed
535 .entry(x.channel_id)
536 .and_modify(|v| *v += unredeemed_value)
537 .or_insert(unredeemed_value);
538
539 futures::future::ok(amount + unredeemed_value)
540 })
541 .await?;
542
543 #[cfg(all(feature = "prometheus", not(test)))]
544 for (channel_id, unredeemed_value) in per_channel_unredeemed {
545 METRIC_HOPR_TICKETS_INCOMING_STATISTICS
546 .set(&[&channel_id, "unredeemed"], unredeemed_value.as_u128() as f64);
547 }
548
549 let mut all_stats = ticket_statistics::Entity::find()
550 .all(tx.as_ref())
551 .await?
552 .into_iter()
553 .fold(ChannelTicketStatistics::default(), |mut acc, stats| {
554 let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
555 acc.neglected_value += neglected_value;
556 let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
557 acc.redeemed_value += redeemed_value;
558 let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
559 acc.rejected_value += rejected_value;
560 acc.winning_tickets += stats.winning_tickets as u128;
561
562 #[cfg(all(feature = "prometheus", not(test)))]
563 {
564 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
565 &[&stats.channel_id, "neglected"],
566 neglected_value.amount().as_u128() as f64,
567 );
568
569 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
570 &[&stats.channel_id, "redeemed"],
571 redeemed_value.amount().as_u128() as f64,
572 );
573
574 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
575 &[&stats.channel_id, "rejected"],
576 rejected_value.amount().as_u128() as f64,
577 );
578 }
579
580 acc
581 });
582
583 all_stats.unredeemed_value = unredeemed_value.into();
584
585 Ok::<_, DbSqlError>(all_stats)
586 })
587 })
588 .await
589 }
590 Some(channel) => {
591 if self.get_channel_by_id(None, &channel).await?.is_none() {
594 return Err(DbSqlError::ChannelNotFound(channel).into());
595 }
596
597 self.nest_transaction_in_db(None, TargetDb::Tickets)
598 .await?
599 .perform(|tx| {
600 Box::pin(async move {
601 let stats = find_stats_for_channel(tx, &channel).await?;
602 let unredeemed_value = ticket::Entity::find()
603 .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
604 .stream(tx.as_ref())
605 .await?
606 .try_fold(U256::zero(), |amount, x| {
607 futures::future::ok(amount + U256::from_be_bytes(x.amount))
608 })
609 .await?;
610
611 Ok::<_, DbSqlError>(ChannelTicketStatistics {
612 winning_tickets: stats.winning_tickets as u128,
613 neglected_value: HoprBalance::from_be_bytes(stats.neglected_value),
614 redeemed_value: HoprBalance::from_be_bytes(stats.redeemed_value),
615 unredeemed_value: unredeemed_value.into(),
616 rejected_value: HoprBalance::from_be_bytes(stats.rejected_value),
617 })
618 })
619 })
620 .await
621 }
622 };
623 Ok(res?)
624 }
625
626 async fn reset_ticket_statistics(&self) -> Result<()> {
627 let res = self
628 .nest_transaction_in_db(None, TargetDb::Tickets)
629 .await?
630 .perform(|tx| {
631 Box::pin(async move {
632 #[cfg(all(feature = "prometheus", not(test)))]
633 let rows = ticket_statistics::Entity::find().all(tx.as_ref()).await?;
634
635 let deleted = ticket_statistics::Entity::delete_many().exec(tx.as_ref()).await?;
637
638 #[cfg(all(feature = "prometheus", not(test)))]
639 {
640 if deleted.rows_affected > 0 {
641 for row in rows {
642 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "neglected"], 0.0_f64);
643
644 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "redeemed"], 0.0_f64);
645
646 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "rejected"], 0.0_f64);
647 }
648 }
649 }
650
651 debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
652
653 Ok::<_, DbSqlError>(())
654 })
655 })
656 .await;
657
658 Ok(res?)
659 }
660
661 async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance)> {
662 self.get_tickets_value_int(None, selector).await
663 }
664
665 async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64> {
666 let old_value = self
667 .get_outgoing_ticket_index(channel_id)
668 .await?
669 .fetch_max(index, Ordering::SeqCst);
670
671 Ok(old_value)
675 }
676
677 async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
678 let old_value = self
679 .get_outgoing_ticket_index(channel_id)
680 .await?
681 .swap(0, Ordering::SeqCst);
682
683 Ok(old_value)
687 }
688
689 async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
690 let old_value = self
691 .get_outgoing_ticket_index(channel_id)
692 .await?
693 .fetch_add(1, Ordering::SeqCst);
694
695 Ok(old_value)
698 }
699
700 async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>> {
701 let tkt_manager = self.ticket_manager.clone();
702
703 Ok(self
704 .caches
705 .ticket_index
706 .try_get_with(channel_id, async move {
707 let maybe_index = outgoing_ticket_index::Entity::find()
708 .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
709 .one(&tkt_manager.tickets_db)
710 .await?;
711
712 Ok(Arc::new(AtomicU64::new(match maybe_index {
713 Some(model) => U256::from_be_bytes(model.index).as_u64(),
714 None => {
715 tkt_manager
716 .with_write_locked_db(|tx| {
717 Box::pin(async move {
718 outgoing_ticket_index::ActiveModel {
719 channel_id: Set(channel_id.to_hex()),
720 ..Default::default()
721 }
722 .insert(tx.as_ref())
723 .await?;
724 Ok::<_, DbSqlError>(())
725 })
726 })
727 .await?;
728 0_u64
729 }
730 })))
731 })
732 .await
733 .map_err(|e: Arc<DbSqlError>| LogicalError(format!("failed to retrieve ticket index: {e}")))?)
734 }
735
736 async fn persist_outgoing_ticket_indices(&self) -> Result<usize> {
737 let outgoing_indices = outgoing_ticket_index::Entity::find()
738 .all(&self.tickets_db)
739 .await
740 .map_err(DbSqlError::from)?;
741
742 let mut updated = 0;
743 for index_model in outgoing_indices {
744 let channel_id = Hash::from_hex(&index_model.channel_id).map_err(DbSqlError::from)?;
745 let db_index = U256::from_be_bytes(&index_model.index).as_u64();
746 if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
747 let cached_index = cached_index.load(Ordering::SeqCst);
751
752 if cached_index > db_index {
754 let mut index_active_model = index_model.into_active_model();
755 index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
756 self.ticket_manager
757 .with_write_locked_db(|wtx| {
758 Box::pin(async move {
759 index_active_model.save(wtx.as_ref()).await?;
760 Ok::<_, DbSqlError>(())
761 })
762 })
763 .await?;
764
765 debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
766 updated += 1;
767 }
768 } else {
769 trace!(?channel_id, "channel not in cache yet");
772 }
773 }
774
775 Ok(Ok::<_, DbSqlError>(updated)?)
776 }
777
778 async fn prepare_aggregation_in_channel(
779 &self,
780 channel: &Hash,
781 prerequisites: AggregationPrerequisites,
782 ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>> {
783 let myself = self.clone();
784
785 let channel_id = *channel;
786
787 let (channel_entry, peer, domain_separator, min_win_prob) = self
788 .nest_transaction_in_db(None, TargetDb::Index)
789 .await?
790 .perform(|tx| {
791 Box::pin(async move {
792 let entry = myself
793 .get_channel_by_id(Some(tx), &channel_id)
794 .await?
795 .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
796
797 if entry.status == ChannelStatus::Closed {
798 return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
799 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
800 return Err(DbSqlError::LogicalError(format!(
801 "channel '{channel_id}' is not incoming"
802 )));
803 }
804
805 let pk = myself
806 .resolve_packet_key(&entry.source)
807 .await?
808 .ok_or(DbSqlError::LogicalError(format!(
809 "peer '{}' has no offchain key record",
810 entry.source
811 )))?;
812
813 let indexer_data = myself.get_indexer_data(Some(tx)).await?;
814
815 let domain_separator = indexer_data
816 .channels_dst
817 .ok_or_else(|| crate::errors::DbSqlError::LogicalError("domain separator missing".into()))?;
818
819 Ok((
820 entry,
821 pk,
822 domain_separator,
823 indexer_data.minimum_incoming_ticket_winning_prob,
824 ))
825 })
826 })
827 .await?;
828
829 let myself = self.clone();
830 let tickets = self
831 .ticket_manager
832 .with_write_locked_db(|tx| {
833 Box::pin(async move {
834 if ticket::Entity::find()
836 .filter(WrappedTicketSelector::from(
837 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
838 ))
839 .one(tx.as_ref())
840 .await?
841 .is_some()
842 {
843 return Err(DbSqlError::LogicalError(format!(
844 "'{channel_entry}' is already being aggregated",
845 )));
846 }
847
848 let first_idx_to_take = ticket::Entity::find()
850 .filter(WrappedTicketSelector::from(
851 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingRedeemed),
852 ))
853 .order_by_desc(ticket::Column::Index)
854 .one(tx.as_ref())
855 .await?
856 .map(|m| U256::from_be_bytes(m.index).as_u64() + 1)
857 .unwrap_or(0_u64) .max(channel_entry.ticket_index.as_u64()); let to_be_aggregated = ticket::Entity::find()
862 .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
863 .filter(ticket::Column::Index.gte(first_idx_to_take.to_be_bytes().to_vec()))
864 .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
865 .order_by_asc(ticket::Column::Index).limit(MAX_TICKETS_TO_AGGREGATE_BATCH)
867 .all(tx.as_ref())
868 .await?;
869
870 let mut to_be_aggregated: Vec<TransferableWinningTicket> =
872 filter_satisfying_ticket_models(prerequisites, to_be_aggregated, &channel_entry, min_win_prob)?
873 .into_iter()
874 .map(|model| {
875 AcknowledgedTicket::try_from(model)
876 .map_err(DbSqlError::from)
877 .and_then(|ack| {
878 ack.into_transferable(&myself.chain_key, &domain_separator)
879 .map_err(DbSqlError::from)
880 })
881 })
882 .collect::<crate::errors::Result<Vec<_>>>()?;
883
884 let mut neglected_idxs = Vec::new();
885
886 if !to_be_aggregated.is_empty() {
887 let first_ticket = to_be_aggregated[0].ticket.clone();
892 let mut i = 1;
893 while i < to_be_aggregated.len() {
894 let current_idx = to_be_aggregated[i].ticket.index;
895 if (first_ticket.index..first_ticket.index + first_ticket.index_offset as u64).contains(¤t_idx) {
896 warn!(ticket_id = current_idx, channel = %channel_id, ?first_ticket, "ticket in channel has been already aggregated and will be removed");
900 neglected_idxs.push(current_idx);
901 to_be_aggregated.remove(i);
902 } else {
903 i += 1;
904 }
905 }
906
907 if !neglected_idxs.is_empty() {
910 warn!(count = neglected_idxs.len(), channel = %channel_id, "tickets were neglected due to duplication in an aggregated ticket!");
911 }
912
913 let marked: sea_orm::UpdateResult = ticket::Entity::update_many()
915 .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
916 .filter(ticket::Column::Index.is_in(to_be_aggregated.iter().map(|t| t.ticket.index.to_be_bytes().to_vec())))
917 .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
918 .col_expr(
919 ticket::Column::State,
920 Expr::value(AcknowledgedTicketStatus::BeingAggregated as i8),
921 )
922 .exec(tx.as_ref())
923 .await?;
924
925 if marked.rows_affected as usize != to_be_aggregated.len() {
926 return Err(DbSqlError::LogicalError(format!(
927 "expected to mark {}, but was able to mark {}",
928 to_be_aggregated.len(),
929 marked.rows_affected,
930 )));
931 }
932 }
933
934 debug!(
935 "prepared {} tickets to aggregate in {} ({})",
936 to_be_aggregated.len(),
937 channel_entry.get_id(),
938 channel_entry.channel_epoch,
939 );
940
941 Ok(to_be_aggregated)
942 })
943 })
944 .await?;
945
946 Ok((!tickets.is_empty()).then_some((peer, tickets, domain_separator)))
947 }
948
949 async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()> {
950 let channel_entry = self
951 .get_channel_by_id(None, &channel)
952 .await?
953 .ok_or(DbSqlError::ChannelNotFound(channel))?;
954
955 let selector = TicketSelector::from(channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated);
956
957 let reverted = self
958 .update_ticket_states(selector, AcknowledgedTicketStatus::Untouched)
959 .await?;
960
961 info!(
962 "rollback happened for ticket aggregation in '{channel}' with {reverted} tickets rolled back as a result",
963 );
964 Ok(())
965 }
966
967 async fn process_received_aggregated_ticket(
968 &self,
969 aggregated_ticket: Ticket,
970 chain_keypair: &ChainKeypair,
971 ) -> Result<AcknowledgedTicket> {
972 if chain_keypair.public().to_address() != self.me_onchain {
973 return Err(DbSqlError::LogicalError(
974 "chain key for ticket aggregation does not match the DB public address".into(),
975 )
976 .into());
977 }
978
979 let myself = self.clone();
980 let channel_id = aggregated_ticket.channel_id;
981
982 let (channel_entry, domain_separator) = self
983 .nest_transaction_in_db(None, TargetDb::Index)
984 .await?
985 .perform(|tx| {
986 Box::pin(async move {
987 let entry = myself
988 .get_channel_by_id(Some(tx), &channel_id)
989 .await?
990 .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
991
992 if entry.status == ChannelStatus::Closed {
993 return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
994 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
995 return Err(DbSqlError::LogicalError(format!(
996 "channel '{channel_id}' is not incoming"
997 )));
998 }
999
1000 let domain_separator =
1001 myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
1002 crate::errors::DbSqlError::LogicalError("domain separator missing".into())
1003 })?;
1004
1005 Ok((entry, domain_separator))
1006 })
1007 })
1008 .await?;
1009
1010 let aggregated_ticket = aggregated_ticket
1012 .verify(&channel_entry.source, &domain_separator)
1013 .map_err(|e| {
1014 DbSqlError::LogicalError(format!(
1015 "failed to verify received aggregated ticket in {channel_id}: {e}"
1016 ))
1017 })?;
1018
1019 if !aggregated_ticket.win_prob().approx_eq(&WinningProbability::ALWAYS) {
1021 return Err(DbSqlError::LogicalError("Aggregated tickets must have 100% win probability".into()).into());
1022 }
1023
1024 let acknowledged_tickets = self
1025 .nest_transaction_in_db(None, TargetDb::Tickets)
1026 .await?
1027 .perform(|tx| {
1028 Box::pin(async move {
1029 ticket::Entity::find()
1030 .filter(WrappedTicketSelector::from(
1031 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
1032 ))
1033 .all(tx.as_ref())
1034 .await
1035 .map_err(DbSqlError::BackendError)
1036 })
1037 })
1038 .await?;
1039
1040 if acknowledged_tickets.is_empty() {
1041 debug!("Received unexpected aggregated ticket in channel {channel_id}");
1042 return Err(DbSqlError::LogicalError(format!(
1043 "failed insert aggregated ticket, because no tickets seem to be aggregated for '{channel_id}'",
1044 ))
1045 .into());
1046 }
1047
1048 let stored_value = acknowledged_tickets
1049 .iter()
1050 .map(|m| HoprBalance::from_be_bytes(&m.amount))
1051 .sum();
1052
1053 if aggregated_ticket.verified_ticket().amount.lt(&stored_value) {
1055 error!(channel = %channel_id, "Aggregated ticket value in channel is lower than sum of stored tickets");
1056 return Err(DbSqlError::LogicalError("Value of received aggregated ticket is too low".into()).into());
1057 }
1058
1059 let acknowledged_tickets = acknowledged_tickets
1060 .into_iter()
1061 .map(AcknowledgedTicket::try_from)
1062 .collect::<hopr_db_entity::errors::Result<Vec<AcknowledgedTicket>>>()
1063 .map_err(DbSqlError::from)?;
1064
1065 let first_stored_ticket = acknowledged_tickets.first().unwrap();
1067
1068 #[allow(unused_variables)]
1070 let current_ticket_index_from_aggregated_ticket =
1071 U256::from(aggregated_ticket.verified_ticket().index).add(aggregated_ticket.verified_ticket().index_offset);
1072
1073 let acked_aggregated_ticket = aggregated_ticket.into_acknowledged(first_stored_ticket.response.clone());
1074
1075 let ticket = acked_aggregated_ticket.clone();
1076 self.ticket_manager.replace_tickets(ticket).await?;
1077
1078 info!(%acked_aggregated_ticket, "successfully processed received aggregated ticket");
1079 Ok(acked_aggregated_ticket)
1080 }
1081
1082 async fn aggregate_tickets(
1083 &self,
1084 destination: OffchainPublicKey,
1085 mut acked_tickets: Vec<TransferableWinningTicket>,
1086 me: &ChainKeypair,
1087 ) -> Result<VerifiedTicket> {
1088 if me.public().to_address() != self.me_onchain {
1089 return Err(DbSqlError::LogicalError(
1090 "chain key for ticket aggregation does not match the DB public address".into(),
1091 )
1092 .into());
1093 }
1094
1095 let domain_separator = self
1096 .get_indexer_data(None)
1097 .await?
1098 .domain_separator(DomainSeparator::Channel)
1099 .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
1100
1101 if acked_tickets.is_empty() {
1102 return Err(DbSqlError::LogicalError("at least one ticket required for aggregation".to_owned()).into());
1103 }
1104
1105 if acked_tickets.len() == 1 {
1106 let single = acked_tickets
1107 .pop()
1108 .unwrap()
1109 .into_redeemable(&self.me_onchain, &domain_separator)
1110 .map_err(DbSqlError::from)?;
1111
1112 self.compare_and_set_outgoing_ticket_index(
1113 single.verified_ticket().channel_id,
1114 single.verified_ticket().index + 1,
1115 )
1116 .await?;
1117
1118 return Ok(single.ticket);
1119 }
1120
1121 acked_tickets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(cmp::Ordering::Equal));
1122 acked_tickets.dedup();
1123
1124 let myself = self.clone();
1125 let address = myself
1126 .resolve_chain_key(&destination)
1127 .await?
1128 .ok_or(DbSqlError::LogicalError(format!(
1129 "peer '{}' has no chain key record",
1130 destination.to_peerid_str()
1131 )))?;
1132
1133 let (channel_entry, destination, min_win_prob) = self
1134 .nest_transaction_in_db(None, TargetDb::Index)
1135 .await?
1136 .perform(|tx| {
1137 Box::pin(async move {
1138 let entry = myself
1139 .get_channel_by_parties(Some(tx), &myself.me_onchain, &address, false)
1140 .await?
1141 .ok_or_else(|| {
1142 DbSqlError::ChannelNotFound(generate_channel_id(&myself.me_onchain, &address))
1143 })?;
1144
1145 if entry.status == ChannelStatus::Closed {
1146 return Err(DbSqlError::LogicalError(format!("{entry} is closed")));
1147 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Outgoing) {
1148 return Err(DbSqlError::LogicalError(format!("{entry} is not outgoing")));
1149 }
1150
1151 let min_win_prob = myself
1152 .get_indexer_data(Some(tx))
1153 .await?
1154 .minimum_incoming_ticket_winning_prob;
1155 Ok((entry, address, min_win_prob))
1156 })
1157 })
1158 .await?;
1159
1160 let channel_balance = channel_entry.balance;
1161 let channel_epoch = channel_entry.channel_epoch.as_u32();
1162 let channel_id = channel_entry.get_id();
1163
1164 let mut final_value = HoprBalance::zero();
1165
1166 let verified_tickets = acked_tickets
1168 .into_iter()
1169 .map(|t| t.into_redeemable(&self.me_onchain, &domain_separator))
1170 .collect::<hopr_internal_types::errors::Result<Vec<_>>>()
1171 .map_err(|e| {
1172 DbSqlError::LogicalError(format!("trying to aggregate an invalid or a non-winning ticket: {e}"))
1173 })?;
1174
1175 for (i, acked_ticket) in verified_tickets.iter().enumerate() {
1177 if channel_id != acked_ticket.verified_ticket().channel_id {
1178 return Err(DbSqlError::LogicalError(format!(
1179 "ticket for aggregation has an invalid channel id {}",
1180 acked_ticket.verified_ticket().channel_id
1181 ))
1182 .into());
1183 }
1184
1185 if acked_ticket.verified_ticket().channel_epoch != channel_epoch {
1186 return Err(DbSqlError::LogicalError("channel epochs do not match".into()).into());
1187 }
1188
1189 if i + 1 < verified_tickets.len()
1190 && acked_ticket.verified_ticket().index + acked_ticket.verified_ticket().index_offset as u64
1191 > verified_tickets[i + 1].verified_ticket().index
1192 {
1193 return Err(DbSqlError::LogicalError("tickets with overlapping index intervals".into()).into());
1194 }
1195
1196 if acked_ticket
1197 .verified_ticket()
1198 .win_prob()
1199 .approx_cmp(&min_win_prob)
1200 .is_lt()
1201 {
1202 return Err(DbSqlError::LogicalError(
1203 "cannot aggregate ticket with lower than minimum winning probability in network".into(),
1204 )
1205 .into());
1206 }
1207
1208 final_value += acked_ticket.verified_ticket().amount;
1209 if final_value.gt(&channel_balance) {
1210 return Err(DbSqlError::LogicalError(format!(
1211 "ticket amount to aggregate {final_value} is greater than the balance {channel_balance} of \
1212 channel {channel_id}"
1213 ))
1214 .into());
1215 }
1216 }
1217
1218 info!(
1219 "aggregated {} tickets in channel {channel_id} with total value {final_value}",
1220 verified_tickets.len()
1221 );
1222
1223 let first_acked_ticket = verified_tickets.first().unwrap();
1224 let last_acked_ticket = verified_tickets.last().unwrap();
1225
1226 let current_ticket_index_from_acked_tickets = last_acked_ticket.verified_ticket().index + 1;
1229 self.compare_and_set_outgoing_ticket_index(channel_id, current_ticket_index_from_acked_tickets)
1230 .await?;
1231
1232 Ok(TicketBuilder::default()
1233 .direction(&self.me_onchain, &destination)
1234 .balance(final_value)
1235 .index(first_acked_ticket.verified_ticket().index)
1236 .index_offset(
1237 (last_acked_ticket.verified_ticket().index - first_acked_ticket.verified_ticket().index + 1) as u32,
1238 )
1239 .win_prob(WinningProbability::ALWAYS) .channel_epoch(channel_epoch)
1241 .challenge(first_acked_ticket.verified_ticket().challenge)
1242 .build_signed(me, &domain_separator)
1243 .map_err(DbSqlError::from)?)
1244 }
1245
1246 async fn fix_channels_next_ticket_state(&self) -> Result<()> {
1247 let channels = self.get_incoming_channels(None).await?;
1248
1249 for channel in channels.into_iter() {
1250 let selector = TicketSelector::from(&channel)
1251 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
1252 .with_index(channel.ticket_index.as_u64());
1253
1254 let mut tickets_stream = self
1255 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::Untouched)
1256 .await?;
1257
1258 while let Some(ticket) = tickets_stream.next().await {
1259 let channel_id = channel.get_id();
1260 let ticket_index = ticket.verified_ticket().index;
1261 let ticket_amount = ticket.verified_ticket().amount;
1262 info!(%channel_id, %ticket_index, %ticket_amount, "fixed next out-of-sync ticket");
1263 }
1264 }
1265
1266 Ok(())
1267 }
1268}
1269
1270impl HoprDb {
1271 pub async fn upsert_ticket<'a>(&'a self, tx: OptTx<'a>, acknowledged_ticket: AcknowledgedTicket) -> Result<()> {
1273 self.nest_transaction_in_db(tx, TargetDb::Tickets)
1274 .await?
1275 .perform(|tx| {
1276 Box::pin(async move {
1277 let selector = WrappedTicketSelector::from(
1279 TicketSelector::new(
1280 acknowledged_ticket.verified_ticket().channel_id,
1281 acknowledged_ticket.verified_ticket().channel_epoch,
1282 )
1283 .with_index(acknowledged_ticket.verified_ticket().index),
1284 );
1285
1286 debug!("upserting ticket {acknowledged_ticket}");
1287 let mut model = ticket::ActiveModel::from(acknowledged_ticket);
1288
1289 if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx.as_ref()).await? {
1290 model.id = Set(ticket.id);
1291 }
1292
1293 Ok::<_, DbSqlError>(model.save(tx.as_ref()).await?)
1294 })
1295 })
1296 .await?;
1297 Ok(())
1298 }
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303 use std::{
1304 ops::Add,
1305 sync::atomic::Ordering,
1306 time::{Duration, SystemTime},
1307 };
1308
1309 use anyhow::{Context, anyhow};
1310 use futures::{StreamExt, pin_mut};
1311 use hex_literal::hex;
1312 use hopr_crypto_random::Randomizable;
1313 use hopr_crypto_types::prelude::*;
1314 use hopr_db_api::{
1315 info::DomainSeparator,
1316 prelude::{DbError, TicketMarker},
1317 tickets::ChannelTicketStatistics,
1318 };
1319 use hopr_db_entity::ticket;
1320 use hopr_internal_types::prelude::*;
1321 use hopr_primitive_types::prelude::*;
1322 use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, Set};
1323
1324 use crate::{
1325 HoprDbGeneralModelOperations, TargetDb,
1326 accounts::HoprDbAccountOperations,
1327 channels::HoprDbChannelOperations,
1328 db::HoprDb,
1329 errors::DbSqlError,
1330 info::HoprDbInfoOperations,
1331 tickets::{AggregationPrerequisites, HoprDbTicketOperations, TicketSelector, filter_satisfying_ticket_models},
1332 };
1333
1334 lazy_static::lazy_static! {
1335 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
1336 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
1337 static ref CHANNEL_ID: Hash = generate_channel_id(&BOB.public().to_address(), &ALICE.public().to_address());
1338 }
1339
1340 lazy_static::lazy_static! {
1341 static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1342 static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1343 }
1344
1345 const TICKET_VALUE: u64 = 100_000;
1346
1347 async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
1348 for (peer_offchain, peer_onchain) in peers.into_iter() {
1349 db.insert_account(
1350 None,
1351 AccountEntry {
1352 public_key: *peer_offchain.public(),
1353 chain_addr: peer_onchain.public().to_address(),
1354 entry_type: AccountType::NotAnnounced,
1355 published_at: 0,
1356 },
1357 )
1358 .await?
1359 }
1360
1361 Ok(())
1362 }
1363
1364 fn generate_random_ack_ticket(
1365 src: &ChainKeypair,
1366 dst: &ChainKeypair,
1367 index: u64,
1368 index_offset: u32,
1369 win_prob: f64,
1370 ) -> anyhow::Result<AcknowledgedTicket> {
1371 let hk1 = HalfKey::random();
1372 let hk2 = HalfKey::random();
1373
1374 let cp1: CurvePoint = hk1.to_challenge().try_into()?;
1375 let cp2: CurvePoint = hk2.to_challenge().try_into()?;
1376 let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
1377
1378 Ok(TicketBuilder::default()
1379 .addresses(src, dst)
1380 .amount(TICKET_VALUE)
1381 .index(index)
1382 .index_offset(index_offset)
1383 .win_prob(win_prob.try_into()?)
1384 .channel_epoch(4)
1385 .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
1386 .build_signed(src, &Hash::default())?
1387 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
1388 }
1389
1390 async fn init_db_with_tickets(
1391 db: &HoprDb,
1392 count_tickets: u64,
1393 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1394 init_db_with_tickets_and_channel(db, count_tickets, None).await
1395 }
1396
1397 async fn init_db_with_tickets_and_channel(
1398 db: &HoprDb,
1399 count_tickets: u64,
1400 channel_ticket_index: Option<u32>,
1401 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1402 let channel = ChannelEntry::new(
1403 BOB.public().to_address(),
1404 ALICE.public().to_address(),
1405 u32::MAX.into(),
1406 channel_ticket_index.unwrap_or(0u32).into(),
1407 ChannelStatus::Open,
1408 4_u32.into(),
1409 );
1410
1411 db.upsert_channel(None, channel).await?;
1412
1413 let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
1414 .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
1415 .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
1416
1417 let db_clone = db.clone();
1418 let tickets_clone = tickets.clone();
1419 db.begin_transaction_in_db(TargetDb::Tickets)
1420 .await?
1421 .perform(|tx| {
1422 Box::pin(async move {
1423 for t in tickets_clone {
1424 db_clone.upsert_ticket(Some(tx), t).await?;
1425 }
1426 Ok::<(), DbSqlError>(())
1427 })
1428 })
1429 .await?;
1430
1431 Ok((channel, tickets))
1432 }
1433
1434 #[tokio::test]
1435 async fn test_insert_get_ticket() -> anyhow::Result<()> {
1436 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1437 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
1438 .await?;
1439
1440 let (channel, mut tickets) = init_db_with_tickets(&db, 1).await?;
1441 let ack_ticket = tickets.pop().context("ticket should be present")?;
1442
1443 assert_eq!(
1444 channel.get_id(),
1445 ack_ticket.verified_ticket().channel_id,
1446 "channel ids must match"
1447 );
1448 assert_eq!(
1449 channel.channel_epoch.as_u32(),
1450 ack_ticket.verified_ticket().channel_epoch,
1451 "epochs must match"
1452 );
1453
1454 let db_ticket = db
1455 .get_tickets((&ack_ticket).into())
1456 .await?
1457 .first()
1458 .cloned()
1459 .context("ticket should exist")?;
1460
1461 assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
1462
1463 Ok(())
1464 }
1465
1466 #[tokio::test]
1467 async fn test_mark_redeemed() -> anyhow::Result<()> {
1468 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1469 const COUNT_TICKETS: u64 = 10;
1470
1471 let (_, tickets) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1472
1473 let stats = db.get_ticket_statistics(None).await?;
1474 assert_eq!(
1475 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1476 stats.unredeemed_value,
1477 "unredeemed balance must match"
1478 );
1479 assert_eq!(
1480 HoprBalance::zero(),
1481 stats.redeemed_value,
1482 "there must be 0 redeemed value"
1483 );
1484
1485 assert_eq!(
1486 stats,
1487 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1488 "per channel stats must be same"
1489 );
1490
1491 const TO_REDEEM: u64 = 2;
1492 let db_clone = db.clone();
1493 db.begin_transaction_in_db(TargetDb::Tickets)
1494 .await?
1495 .perform(|_tx| {
1496 Box::pin(async move {
1497 for ticket in tickets.iter().take(TO_REDEEM as usize) {
1498 let r = db_clone.mark_tickets_as(ticket.into(), TicketMarker::Redeemed).await?;
1499 assert_eq!(1, r, "must redeem only a single ticket");
1500 }
1501 Ok::<(), DbSqlError>(())
1502 })
1503 })
1504 .await?;
1505
1506 let stats = db.get_ticket_statistics(None).await?;
1507 assert_eq!(
1508 HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
1509 stats.unredeemed_value,
1510 "unredeemed balance must match"
1511 );
1512 assert_eq!(
1513 HoprBalance::from(TICKET_VALUE * TO_REDEEM),
1514 stats.redeemed_value,
1515 "there must be a redeemed value"
1516 );
1517
1518 assert_eq!(
1519 stats,
1520 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1521 "per channel stats must be same"
1522 );
1523
1524 Ok(())
1525 }
1526
1527 #[tokio::test]
1528 async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
1529 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1530
1531 let ticket = init_db_with_tickets(&db, 1)
1532 .await?
1533 .1
1534 .pop()
1535 .context("should contain a ticket")?;
1536
1537 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
1538 assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
1539
1540 Ok(())
1541 }
1542
1543 #[tokio::test]
1544 async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
1545 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1546
1547 let count_tickets = 10;
1548 let channel = init_db_with_tickets(&db, count_tickets).await?.0;
1549
1550 let count_marked = db.mark_tickets_as((&channel).into(), TicketMarker::Redeemed).await?;
1551 assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
1552
1553 Ok(())
1554 }
1555
1556 #[tokio::test]
1557 async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
1558 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1559 const COUNT_TICKETS: u64 = 10;
1560
1561 let (channel, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1562
1563 let stats = db.get_ticket_statistics(None).await?;
1564 assert_eq!(
1565 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1566 stats.unredeemed_value,
1567 "unredeemed balance must match"
1568 );
1569 assert_eq!(
1570 HoprBalance::zero(),
1571 stats.neglected_value,
1572 "there must be 0 redeemed value"
1573 );
1574
1575 assert_eq!(
1576 stats,
1577 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1578 "per channel stats must be same"
1579 );
1580
1581 db.mark_tickets_as((&channel).into(), TicketMarker::Neglected).await?;
1582
1583 let stats = db.get_ticket_statistics(None).await?;
1584 assert_eq!(
1585 HoprBalance::zero(),
1586 stats.unredeemed_value,
1587 "unredeemed balance must be zero"
1588 );
1589 assert_eq!(
1590 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1591 stats.neglected_value,
1592 "there must be a neglected value"
1593 );
1594
1595 assert_eq!(
1596 stats,
1597 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1598 "per channel stats must be same"
1599 );
1600
1601 Ok(())
1602 }
1603
1604 #[tokio::test]
1605 async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1606 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1607
1608 let (_, mut ticket) = init_db_with_tickets(&db, 1).await?;
1609 let ticket = ticket.pop().context("ticket should be present")?.ticket;
1610
1611 let stats = db.get_ticket_statistics(None).await?;
1612 assert_eq!(HoprBalance::zero(), stats.rejected_value);
1613 assert_eq!(
1614 stats,
1615 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1616 "per channel stats must be same"
1617 );
1618
1619 db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
1620
1621 let stats = db.get_ticket_statistics(None).await?;
1622 assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
1623 assert_eq!(
1624 stats,
1625 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1626 "per channel stats must be same"
1627 );
1628
1629 Ok(())
1630 }
1631
1632 #[tokio::test]
1633 async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1634 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1635 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1636 .await?;
1637
1638 let channel = init_db_with_tickets(&db, 10).await?.0;
1639
1640 let selector = TicketSelector::from(&channel).with_index(5);
1641
1642 let v: Vec<AcknowledgedTicket> = db
1643 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1644 .await?
1645 .collect()
1646 .await;
1647
1648 assert_eq!(1, v.len(), "single ticket must be updated");
1649 assert_eq!(
1650 AcknowledgedTicketStatus::BeingRedeemed,
1651 v.first().context("should contain a ticket")?.status,
1652 "status must be set"
1653 );
1654
1655 let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1656
1657 let v: Vec<AcknowledgedTicket> = db
1658 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1659 .await?
1660 .collect()
1661 .await;
1662
1663 assert_eq!(9, v.len(), "only specific tickets must have state set");
1664 assert!(
1665 v.iter().all(|t| t.verified_ticket().index != 5),
1666 "only tickets with different state must update"
1667 );
1668 assert!(
1669 v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
1670 "tickets must have updated state"
1671 );
1672
1673 Ok(())
1674 }
1675
1676 #[tokio::test]
1677 async fn test_update_tickets_states() -> anyhow::Result<()> {
1678 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1679 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1680 .await?;
1681
1682 let channel = init_db_with_tickets(&db, 10).await?.0;
1683 let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1684
1685 db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
1686 .await?;
1687
1688 let v: Vec<AcknowledgedTicket> = db
1689 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1690 .await?
1691 .collect()
1692 .await;
1693
1694 assert!(v.is_empty(), "must not update if already updated");
1695
1696 Ok(())
1697 }
1698
1699 #[tokio::test]
1700 async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1701 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1702
1703 let hash = Hash::default();
1704
1705 let idx = db.get_outgoing_ticket_index(hash).await?;
1706 assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
1707
1708 let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
1709 .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
1710 .one(&db.tickets_db)
1711 .await?
1712 .context("index must exist")?;
1713
1714 assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
1715
1716 Ok(())
1717 }
1718
1719 #[tokio::test]
1720 async fn test_ticket_stats_must_fail_for_non_existing_channel() -> anyhow::Result<()> {
1721 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1722
1723 db.get_ticket_statistics(Some(*CHANNEL_ID))
1724 .await
1725 .expect_err("must fail for non-existing channel");
1726
1727 Ok(())
1728 }
1729
1730 #[tokio::test]
1731 async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1732 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1733
1734 let channel = ChannelEntry::new(
1735 BOB.public().to_address(),
1736 ALICE.public().to_address(),
1737 u32::MAX.into(),
1738 0.into(),
1739 ChannelStatus::Open,
1740 4_u32.into(),
1741 );
1742
1743 db.upsert_channel(None, channel).await?;
1744
1745 let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1746
1747 assert_eq!(
1748 ChannelTicketStatistics::default(),
1749 stats,
1750 "must be equal to default which is all zeros"
1751 );
1752
1753 assert_eq!(
1754 stats,
1755 db.get_ticket_statistics(None).await?,
1756 "per-channel stats must be the same as global stats"
1757 );
1758
1759 Ok(())
1760 }
1761
1762 #[tokio::test]
1763 async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1764 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1765
1766 let channel_1 = ChannelEntry::new(
1767 BOB.public().to_address(),
1768 ALICE.public().to_address(),
1769 u32::MAX.into(),
1770 0.into(),
1771 ChannelStatus::Open,
1772 4_u32.into(),
1773 );
1774
1775 db.upsert_channel(None, channel_1).await?;
1776
1777 let channel_2 = ChannelEntry::new(
1778 ALICE.public().to_address(),
1779 BOB.public().to_address(),
1780 u32::MAX.into(),
1781 0.into(),
1782 ChannelStatus::Open,
1783 4_u32.into(),
1784 );
1785
1786 db.upsert_channel(None, channel_2).await?;
1787
1788 let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1789 let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1790
1791 let value = t1.verified_ticket().amount;
1792
1793 db.upsert_ticket(None, t1).await?;
1794 db.upsert_ticket(None, t2).await?;
1795
1796 let stats_1 = db
1797 .get_ticket_statistics(Some(generate_channel_id(
1798 &BOB.public().to_address(),
1799 &ALICE.public().to_address(),
1800 )))
1801 .await?;
1802
1803 let stats_2 = db
1804 .get_ticket_statistics(Some(generate_channel_id(
1805 &ALICE.public().to_address(),
1806 &BOB.public().to_address(),
1807 )))
1808 .await?;
1809
1810 assert_eq!(value, stats_1.unredeemed_value);
1811 assert_eq!(value, stats_2.unredeemed_value);
1812
1813 assert_eq!(HoprBalance::zero(), stats_1.neglected_value);
1814 assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1815
1816 assert_eq!(stats_1, stats_2);
1817
1818 db.mark_tickets_as(channel_1.into(), TicketMarker::Neglected).await?;
1819
1820 let stats_1 = db
1821 .get_ticket_statistics(Some(generate_channel_id(
1822 &BOB.public().to_address(),
1823 &ALICE.public().to_address(),
1824 )))
1825 .await?;
1826
1827 let stats_2 = db
1828 .get_ticket_statistics(Some(generate_channel_id(
1829 &ALICE.public().to_address(),
1830 &BOB.public().to_address(),
1831 )))
1832 .await?;
1833
1834 assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1835 assert_eq!(value, stats_1.neglected_value);
1836
1837 assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1838
1839 Ok(())
1840 }
1841
1842 #[tokio::test]
1843 async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1844 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1845
1846 let hash = Hash::default();
1847
1848 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1849 assert_eq!(0, old_idx, "old value must be 0");
1850
1851 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1852 assert_eq!(1, new_idx, "new value must be 1");
1853
1854 let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1855 assert_eq!(1, old_idx, "old value must be 1");
1856
1857 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1858 assert_eq!(2, new_idx, "new value must be 2");
1859
1860 Ok(())
1861 }
1862
1863 #[tokio::test]
1864 async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1865 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1866
1867 let hash = Hash::default();
1868
1869 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1870 assert_eq!(0, new_idx, "value must be 0");
1871
1872 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1873
1874 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1875 assert_eq!(1, new_idx, "new value must be 1");
1876
1877 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1878 assert_eq!(1, old_idx, "old value must be 1");
1879 assert_eq!(1, new_idx, "new value must be 1");
1880
1881 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1882 assert_eq!(1, old_idx, "old value must be 1");
1883 assert_eq!(1, new_idx, "new value must be 1");
1884
1885 Ok(())
1886 }
1887
1888 #[tokio::test]
1889 async fn test_ticket_index_reset() -> anyhow::Result<()> {
1890 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1891
1892 let hash = Hash::default();
1893
1894 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1895 assert_eq!(0, new_idx, "value must be 0");
1896
1897 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1898
1899 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1900 assert_eq!(1, new_idx, "new value must be 1");
1901
1902 let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1903 assert_eq!(1, old_idx, "old value must be 1");
1904
1905 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1906 assert_eq!(0, new_idx, "new value must be 0");
1907 Ok(())
1908 }
1909
1910 #[tokio::test]
1911 async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1912 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1913
1914 let hash_1 = Hash::default();
1915 let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1916
1917 db.get_outgoing_ticket_index(hash_1).await?;
1918 db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1919
1920 let persisted = db.persist_outgoing_ticket_indices().await?;
1921 assert_eq!(1, persisted);
1922
1923 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1924 .all(&db.tickets_db)
1925 .await?;
1926 let idx_1 = indices
1927 .iter()
1928 .find(|idx| idx.channel_id == hash_1.to_hex())
1929 .context("must contain index 1")?;
1930 let idx_2 = indices
1931 .iter()
1932 .find(|idx| idx.channel_id == hash_2.to_hex())
1933 .context("must contain index 2")?;
1934 assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1935 assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1936
1937 db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1938 db.increment_outgoing_ticket_index(hash_2).await?;
1939
1940 let persisted = db.persist_outgoing_ticket_indices().await?;
1941 assert_eq!(2, persisted);
1942
1943 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1944 .all(&db.tickets_db)
1945 .await?;
1946 let idx_1 = indices
1947 .iter()
1948 .find(|idx| idx.channel_id == hash_1.to_hex())
1949 .context("must contain index 1")?;
1950 let idx_2 = indices
1951 .iter()
1952 .find(|idx| idx.channel_id == hash_2.to_hex())
1953 .context("must contain index 2")?;
1954 assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1955 assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1956 Ok(())
1957 }
1958
1959 #[tokio::test]
1960 async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1961 let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1962
1963 assert_eq!(cache.weighted_size(), 0);
1964
1965 cache.insert(1, 1).await;
1966 cache.insert(2, 2).await;
1967
1968 let clone = cache.clone();
1969
1970 cache.remove(&1).await;
1971 cache.remove(&2).await;
1972
1973 assert_eq!(cache.get(&1).await, None);
1974 assert_eq!(cache.get(&1).await, clone.get(&1).await);
1975 Ok(())
1976 }
1977
1978 fn dummy_ticket_model(channel_id: Hash, idx: u64, idx_offset: u32, amount: u32) -> ticket::Model {
1979 ticket::Model {
1980 id: 0,
1981 channel_id: channel_id.to_string(),
1982 amount: U256::from(amount).to_be_bytes().to_vec(),
1983 index: idx.to_be_bytes().to_vec(),
1984 index_offset: idx_offset as i32,
1985 winning_probability: hex!("0020C49BA5E34F").to_vec(), channel_epoch: vec![],
1987 signature: vec![],
1988 response: vec![],
1989 state: 0,
1990 hash: vec![],
1991 }
1992 }
1993
1994 #[tokio::test]
1995 async fn test_aggregation_prerequisites_default_filter_no_tickets() -> anyhow::Result<()> {
1996 let prerequisites = AggregationPrerequisites::default();
1997 assert_eq!(None, prerequisites.min_unaggregated_ratio);
1998 assert_eq!(None, prerequisites.min_ticket_count);
1999
2000 let channel = ChannelEntry::new(
2001 BOB.public().to_address(),
2002 ALICE.public().to_address(),
2003 u32::MAX.into(),
2004 2.into(),
2005 ChannelStatus::Open,
2006 4_u32.into(),
2007 );
2008
2009 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2010
2011 let filtered_tickets =
2012 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2013
2014 assert_eq!(
2015 dummy_tickets, filtered_tickets,
2016 "empty prerequisites must not filter anything"
2017 );
2018 Ok(())
2019 }
2020
2021 #[tokio::test]
2022 async fn test_aggregation_prerequisites_should_filter_out_tickets_with_lower_than_min_win_prob()
2023 -> anyhow::Result<()> {
2024 let prerequisites = AggregationPrerequisites::default();
2025 assert_eq!(None, prerequisites.min_unaggregated_ratio);
2026 assert_eq!(None, prerequisites.min_ticket_count);
2027
2028 let channel = ChannelEntry::new(
2029 BOB.public().to_address(),
2030 ALICE.public().to_address(),
2031 u32::MAX.into(),
2032 2.into(),
2033 ChannelStatus::Open,
2034 4_u32.into(),
2035 );
2036
2037 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2038
2039 let filtered_tickets =
2040 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0006.try_into()?)?;
2041
2042 assert!(
2043 filtered_tickets.is_empty(),
2044 "must filter out tickets with lower win prob"
2045 );
2046 Ok(())
2047 }
2048
2049 #[tokio::test]
2050 async fn test_aggregation_prerequisites_must_trim_tickets_exceeding_channel_balance() -> anyhow::Result<()> {
2051 const TICKET_COUNT: usize = 110;
2052
2053 let prerequisites = AggregationPrerequisites::default();
2054 assert_eq!(None, prerequisites.min_unaggregated_ratio);
2055 assert_eq!(None, prerequisites.min_ticket_count);
2056
2057 let channel = ChannelEntry::new(
2058 BOB.public().to_address(),
2059 ALICE.public().to_address(),
2060 100.into(),
2061 (TICKET_COUNT + 1).into(),
2062 ChannelStatus::Open,
2063 4_u32.into(),
2064 );
2065
2066 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2067 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2068 .collect();
2069
2070 let filtered_tickets =
2071 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2072
2073 assert_eq!(
2074 100,
2075 filtered_tickets.len(),
2076 "must take only tickets up to channel balance"
2077 );
2078 assert!(
2079 filtered_tickets
2080 .into_iter()
2081 .map(|t| U256::from_be_bytes(t.amount).as_u64())
2082 .sum::<u64>()
2083 <= channel.balance.amount().as_u64(),
2084 "filtered tickets must not exceed channel balance"
2085 );
2086 Ok(())
2087 }
2088
2089 #[tokio::test]
2090 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_ticket_count_not_met() -> anyhow::Result<()>
2091 {
2092 const TICKET_COUNT: usize = 10;
2093
2094 let prerequisites = AggregationPrerequisites {
2095 min_ticket_count: Some(TICKET_COUNT + 1),
2096 min_unaggregated_ratio: None,
2097 };
2098
2099 let channel = ChannelEntry::new(
2100 BOB.public().to_address(),
2101 ALICE.public().to_address(),
2102 100.into(),
2103 (TICKET_COUNT + 1).into(),
2104 ChannelStatus::Open,
2105 4_u32.into(),
2106 );
2107
2108 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2109 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2110 .collect();
2111
2112 let filtered_tickets =
2113 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2114
2115 assert!(
2116 filtered_tickets.is_empty(),
2117 "must return empty when min_ticket_count is not met"
2118 );
2119
2120 Ok(())
2121 }
2122
2123 #[tokio::test]
2124 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_unaggregated_ratio_is_not_met()
2125 -> anyhow::Result<()> {
2126 const TICKET_COUNT: usize = 10;
2127
2128 let prerequisites = AggregationPrerequisites {
2129 min_ticket_count: None,
2130 min_unaggregated_ratio: Some(0.9),
2131 };
2132
2133 let channel = ChannelEntry::new(
2134 BOB.public().to_address(),
2135 ALICE.public().to_address(),
2136 100.into(),
2137 (TICKET_COUNT + 1).into(),
2138 ChannelStatus::Open,
2139 4_u32.into(),
2140 );
2141
2142 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2143 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2144 .collect();
2145
2146 let filtered_tickets =
2147 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2148
2149 assert!(
2150 filtered_tickets.is_empty(),
2151 "must return empty when min_unaggregated_ratio is not met"
2152 );
2153
2154 Ok(())
2155 }
2156
2157 #[tokio::test]
2158 async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met() -> anyhow::Result<()> {
2159 const TICKET_COUNT: usize = 10;
2160
2161 let prerequisites = AggregationPrerequisites {
2162 min_ticket_count: Some(TICKET_COUNT),
2163 min_unaggregated_ratio: None,
2164 };
2165
2166 let channel = ChannelEntry::new(
2167 BOB.public().to_address(),
2168 ALICE.public().to_address(),
2169 100.into(),
2170 (TICKET_COUNT + 1).into(),
2171 ChannelStatus::Open,
2172 4_u32.into(),
2173 );
2174
2175 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2176 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2177 .collect();
2178
2179 let filtered_tickets =
2180 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2181
2182 assert!(!filtered_tickets.is_empty(), "must not return empty");
2183 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2184 Ok(())
2185 }
2186
2187 #[tokio::test]
2188 async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met_regardless_ratio()
2189 -> anyhow::Result<()> {
2190 const TICKET_COUNT: usize = 10;
2191
2192 let prerequisites = AggregationPrerequisites {
2193 min_ticket_count: Some(TICKET_COUNT),
2194 min_unaggregated_ratio: Some(0.9),
2195 };
2196
2197 let channel = ChannelEntry::new(
2198 BOB.public().to_address(),
2199 ALICE.public().to_address(),
2200 100.into(),
2201 (TICKET_COUNT + 1).into(),
2202 ChannelStatus::Open,
2203 4_u32.into(),
2204 );
2205
2206 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2207 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2208 .collect();
2209
2210 let filtered_tickets =
2211 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2212
2213 assert!(!filtered_tickets.is_empty(), "must not return empty");
2214 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2215 Ok(())
2216 }
2217
2218 #[tokio::test]
2219 async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met()
2220 -> anyhow::Result<()> {
2221 const TICKET_COUNT: usize = 90;
2222
2223 let prerequisites = AggregationPrerequisites {
2224 min_ticket_count: None,
2225 min_unaggregated_ratio: Some(0.9),
2226 };
2227
2228 let channel = ChannelEntry::new(
2229 BOB.public().to_address(),
2230 ALICE.public().to_address(),
2231 100.into(),
2232 (TICKET_COUNT + 1).into(),
2233 ChannelStatus::Open,
2234 4_u32.into(),
2235 );
2236
2237 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2238 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2239 .collect();
2240
2241 let filtered_tickets =
2242 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2243
2244 assert!(!filtered_tickets.is_empty(), "must not return empty");
2245 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2246 Ok(())
2247 }
2248
2249 #[tokio::test]
2250 async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met_regardless_count()
2251 -> anyhow::Result<()> {
2252 const TICKET_COUNT: usize = 90;
2253
2254 let prerequisites = AggregationPrerequisites {
2255 min_ticket_count: Some(TICKET_COUNT + 1),
2256 min_unaggregated_ratio: Some(0.9),
2257 };
2258
2259 let channel = ChannelEntry::new(
2260 BOB.public().to_address(),
2261 ALICE.public().to_address(),
2262 100.into(),
2263 (TICKET_COUNT + 1).into(),
2264 ChannelStatus::Open,
2265 4_u32.into(),
2266 );
2267
2268 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2269 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2270 .collect();
2271
2272 let filtered_tickets =
2273 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2274
2275 assert!(!filtered_tickets.is_empty(), "must not return empty");
2276 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2277 Ok(())
2278 }
2279
2280 #[tokio::test]
2281 async fn test_aggregation_prerequisites_must_return_tickets_when_minimum_incl_aggregated_ratio_is_met()
2282 -> anyhow::Result<()> {
2283 const TICKET_COUNT: usize = 90;
2284
2285 let prerequisites = AggregationPrerequisites {
2286 min_ticket_count: None,
2287 min_unaggregated_ratio: Some(0.9),
2288 };
2289
2290 let channel = ChannelEntry::new(
2291 BOB.public().to_address(),
2292 ALICE.public().to_address(),
2293 100.into(),
2294 (TICKET_COUNT + 1).into(),
2295 ChannelStatus::Open,
2296 4_u32.into(),
2297 );
2298
2299 let mut dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2300 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2301 .collect();
2302 dummy_tickets[0].index_offset = 2; let filtered_tickets =
2305 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2306
2307 assert_eq!(filtered_tickets.len(), TICKET_COUNT);
2308 Ok(())
2309 }
2310
2311 #[tokio::test]
2312 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_only_unaggregated_ratio_is_met_in_single_ticket_only()
2313 -> anyhow::Result<()> {
2314 let prerequisites = AggregationPrerequisites {
2315 min_ticket_count: None,
2316 min_unaggregated_ratio: Some(0.9),
2317 };
2318
2319 let channel = ChannelEntry::new(
2320 BOB.public().to_address(),
2321 ALICE.public().to_address(),
2322 100.into(),
2323 2.into(),
2324 ChannelStatus::Open,
2325 4_u32.into(),
2326 );
2327
2328 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 2, 110)];
2330
2331 let filtered_tickets =
2332 filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2333
2334 assert!(filtered_tickets.is_empty(), "must return empty");
2335 Ok(())
2336 }
2337
2338 async fn create_alice_db_with_tickets_from_bob(
2339 ticket_count: usize,
2340 ) -> anyhow::Result<(HoprDb, ChannelEntry, Vec<AcknowledgedTicket>)> {
2341 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
2342
2343 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2344 .await?;
2345
2346 add_peer_mappings(
2347 &db,
2348 vec![
2349 (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2350 (BOB_OFFCHAIN.clone(), BOB.clone()),
2351 ],
2352 )
2353 .await?;
2354
2355 let (channel, tickets) = init_db_with_tickets(&db, ticket_count as u64).await?;
2356
2357 Ok((db, channel, tickets))
2358 }
2359
2360 #[tokio::test]
2361 async fn test_ticket_aggregation_should_fail_if_any_ticket_is_being_aggregated_in_that_channel()
2362 -> anyhow::Result<()> {
2363 const COUNT_TICKETS: usize = 5;
2364
2365 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2366
2367 assert_eq!(tickets.len(), COUNT_TICKETS);
2368
2369 let existing_channel_with_multiple_tickets = channel.get_id();
2370
2371 let mut ticket = hopr_db_entity::ticket::Entity::find()
2373 .one(&db.tickets_db)
2374 .await?
2375 .context("should have an active model")?
2376 .into_active_model();
2377 ticket.state = Set(AcknowledgedTicketStatus::BeingAggregated as i8);
2378 ticket.save(&db.tickets_db).await?;
2379
2380 assert!(
2381 db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2382 .await
2383 .is_err()
2384 );
2385
2386 Ok(())
2387 }
2388
2389 #[tokio::test]
2390 async fn test_ticket_aggregation_should_not_offer_tickets_with_lower_than_min_win_prob() -> anyhow::Result<()> {
2391 const COUNT_TICKETS: usize = 5;
2392
2393 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2394
2395 assert_eq!(tickets.len(), COUNT_TICKETS);
2396
2397 let existing_channel_with_multiple_tickets = channel.get_id();
2398
2399 let mut ticket = hopr_db_entity::ticket::Entity::find()
2401 .one(&db.tickets_db)
2402 .await?
2403 .context("should have an active model")?
2404 .into_active_model();
2405 ticket.winning_probability = Set(WinningProbability::try_from_f64(0.5)?.as_ref().to_vec());
2406 ticket.save(&db.tickets_db).await?;
2407
2408 let prepared_tickets = db
2409 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2410 .await?
2411 .ok_or(anyhow!("should contain tickets"))?
2412 .1;
2413
2414 assert_eq!(COUNT_TICKETS - 1, prepared_tickets.len());
2415
2416 Ok(())
2417 }
2418
2419 #[tokio::test]
2420 async fn test_ticket_aggregation_prepare_request_with_0_tickets_should_return_empty_result() -> anyhow::Result<()> {
2421 const COUNT_TICKETS: usize = 0;
2422
2423 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2424
2425 assert_eq!(tickets.len(), COUNT_TICKETS);
2426
2427 let existing_channel_with_0_tickets = channel.get_id();
2428
2429 let actual = db
2430 .prepare_aggregation_in_channel(&existing_channel_with_0_tickets, Default::default())
2431 .await?;
2432
2433 assert_eq!(actual, None);
2434
2435 Ok(())
2436 }
2437
2438 #[tokio::test]
2439 async fn test_ticket_aggregation_prepare_request_with_multiple_tickets_should_return_that_ticket()
2440 -> anyhow::Result<()> {
2441 const COUNT_TICKETS: usize = 2;
2442
2443 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2444 let tickets: Vec<TransferableWinningTicket> = tickets
2445 .into_iter()
2446 .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2447 .collect::<hopr_internal_types::errors::Result<Vec<TransferableWinningTicket>>>()?;
2448
2449 assert_eq!(tickets.len(), COUNT_TICKETS);
2450
2451 let existing_channel_with_multiple_tickets = channel.get_id();
2452
2453 let actual = db
2454 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2455 .await?;
2456
2457 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2458
2459 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2460 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2461 .count(&db.tickets_db)
2462 .await? as usize;
2463
2464 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2465
2466 Ok(())
2467 }
2468
2469 #[tokio::test]
2470 async fn test_ticket_aggregation_prepare_request_with_duplicate_tickets_should_return_dedup_aggregated_ticket()
2471 -> anyhow::Result<()> {
2472 let (db, channel, _) = create_alice_db_with_tickets_from_bob(0).await?;
2473 let tickets = vec![
2474 generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0),
2475 generate_random_ack_ticket(&BOB, &ALICE, 0, 2, 1.0),
2476 generate_random_ack_ticket(&BOB, &ALICE, 2, 1, 1.0),
2477 generate_random_ack_ticket(&BOB, &ALICE, 3, 1, 1.0),
2478 ]
2479 .into_iter()
2480 .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
2481
2482 let tickets_clone = tickets.clone();
2483 let db_clone = db.clone();
2484 db.nest_transaction_in_db(None, TargetDb::Tickets)
2485 .await?
2486 .perform(|tx| {
2487 Box::pin(async move {
2488 for ticket in tickets_clone {
2489 db_clone.upsert_ticket(tx.into(), ticket).await?;
2490 }
2491 Ok::<_, DbError>(())
2492 })
2493 })
2494 .await?;
2495
2496 let existing_channel_with_multiple_tickets = channel.get_id();
2497 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2498 assert_eq!(stats.neglected_value, HoprBalance::zero());
2499
2500 let actual = db
2501 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2502 .await?;
2503
2504 let mut tickets = tickets
2505 .into_iter()
2506 .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2507 .collect::<hopr_internal_types::errors::Result<Vec<_>>>()?;
2508
2509 tickets.remove(0);
2511 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2512
2513 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2514 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2515 .count(&db.tickets_db)
2516 .await? as usize;
2517
2518 assert_eq!(actual_being_aggregated_count, 3);
2519
2520 Ok(())
2521 }
2522
2523 #[tokio::test]
2524 async fn test_ticket_aggregation_prepare_request_with_a_being_redeemed_ticket_should_aggregate_only_the_tickets_following_it()
2525 -> anyhow::Result<()> {
2526 const COUNT_TICKETS: usize = 5;
2527
2528 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2529 let mut tickets = tickets
2530 .into_iter()
2531 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2532 .collect::<Vec<_>>();
2533
2534 assert_eq!(tickets.len(), COUNT_TICKETS);
2535
2536 let existing_channel_with_multiple_tickets = channel.get_id();
2537
2538 let mut ticket = hopr_db_entity::ticket::Entity::find()
2540 .one(&db.tickets_db)
2541 .await?
2542 .context("should have 1 active model")?
2543 .into_active_model();
2544 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2545 ticket.save(&db.tickets_db).await?;
2546
2547 let actual = db
2548 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2549 .await?;
2550
2551 tickets.remove(0);
2552 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2553
2554 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2555 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2556 .count(&db.tickets_db)
2557 .await? as usize;
2558
2559 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2560
2561 Ok(())
2562 }
2563
2564 #[tokio::test]
2565 async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_return_when_ticket_threshold_is_met()
2566 -> anyhow::Result<()> {
2567 const COUNT_TICKETS: usize = 5;
2568
2569 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2570 let tickets = tickets
2571 .into_iter()
2572 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2573 .collect::<Vec<_>>();
2574
2575 assert_eq!(tickets.len(), COUNT_TICKETS);
2576
2577 let existing_channel_with_multiple_tickets = channel.get_id();
2578
2579 let constraints = AggregationPrerequisites {
2580 min_ticket_count: Some(COUNT_TICKETS - 1),
2581 min_unaggregated_ratio: None,
2582 };
2583 let actual = db
2584 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2585 .await?;
2586
2587 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2588
2589 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2590 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2591 .count(&db.tickets_db)
2592 .await? as usize;
2593
2594 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2595
2596 Ok(())
2597 }
2598
2599 #[tokio::test]
2600 async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_not_return_when_ticket_threshold_is_not_met()
2601 -> anyhow::Result<()> {
2602 const COUNT_TICKETS: usize = 2;
2603
2604 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2605
2606 assert_eq!(tickets.len(), COUNT_TICKETS);
2607
2608 let existing_channel_with_multiple_tickets = channel.get_id();
2609
2610 let constraints = AggregationPrerequisites {
2611 min_ticket_count: Some(COUNT_TICKETS + 1),
2612 min_unaggregated_ratio: None,
2613 };
2614 let actual = db
2615 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2616 .await?;
2617
2618 assert_eq!(actual, None);
2619
2620 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2621 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2622 .count(&db.tickets_db)
2623 .await? as usize;
2624
2625 assert_eq!(actual_being_aggregated_count, 0);
2626
2627 Ok(())
2628 }
2629
2630 #[tokio::test]
2631 async fn test_ticket_aggregation_prepare_request_with_no_aggregatable_tickets_should_return_nothing()
2632 -> anyhow::Result<()> {
2633 const COUNT_TICKETS: usize = 3;
2634
2635 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2636
2637 assert_eq!(tickets.len(), { COUNT_TICKETS });
2638
2639 let existing_channel_with_multiple_tickets = channel.get_id();
2640
2641 for ticket in hopr_db_entity::ticket::Entity::find()
2643 .all(&db.tickets_db)
2644 .await?
2645 .into_iter()
2646 {
2647 let mut ticket = ticket.into_active_model();
2648 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2649 ticket.save(&db.tickets_db).await?;
2650 }
2651
2652 let actual = db
2653 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2654 .await?;
2655
2656 assert_eq!(actual, None);
2657
2658 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2659 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2660 .count(&db.tickets_db)
2661 .await? as usize;
2662
2663 assert_eq!(actual_being_aggregated_count, 0);
2664
2665 Ok(())
2666 }
2667
2668 #[tokio::test]
2669 async fn test_ticket_aggregation_rollback_should_rollback_all_the_being_aggregated_tickets_but_nothing_else()
2670 -> anyhow::Result<()> {
2671 const COUNT_TICKETS: usize = 5;
2672
2673 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2674
2675 assert_eq!(tickets.len(), COUNT_TICKETS);
2676
2677 let existing_channel_with_multiple_tickets = channel.get_id();
2678
2679 let mut ticket = hopr_db_entity::ticket::Entity::find()
2681 .one(&db.tickets_db)
2682 .await?
2683 .context("should have one active model")?
2684 .into_active_model();
2685 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2686 ticket.save(&db.tickets_db).await?;
2687
2688 assert!(
2689 db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2690 .await
2691 .is_ok()
2692 );
2693
2694 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2695 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2696 .count(&db.tickets_db)
2697 .await? as usize;
2698
2699 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2700
2701 assert!(
2702 db.rollback_aggregation_in_channel(existing_channel_with_multiple_tickets)
2703 .await
2704 .is_ok()
2705 );
2706
2707 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2708 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2709 .count(&db.tickets_db)
2710 .await? as usize;
2711
2712 assert_eq!(actual_being_aggregated_count, 0);
2713
2714 Ok(())
2715 }
2716
2717 #[tokio::test]
2718 async fn test_ticket_aggregation_should_replace_the_tickets_with_a_correctly_aggregated_ticket()
2719 -> anyhow::Result<()> {
2720 const COUNT_TICKETS: usize = 5;
2721
2722 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2723
2724 let (notifier_tx, notifier_rx) = futures::channel::mpsc::unbounded();
2725 db.start_ticket_processing(Some(notifier_tx))?;
2726
2727 let tickets = tickets
2728 .into_iter()
2729 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2730 .collect::<Vec<_>>();
2731
2732 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2733 let aggregated_ticket = TicketBuilder::default()
2734 .addresses(&*BOB, &*ALICE)
2735 .amount(
2736 tickets
2737 .iter()
2738 .fold(U256::zero(), |acc, v| acc + v.ticket.amount.amount()),
2739 )
2740 .index(first_ticket.index)
2741 .index_offset(
2742 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2743 )
2744 .win_prob(1.0.try_into()?)
2745 .channel_epoch(first_ticket.channel_epoch)
2746 .challenge(first_ticket.challenge)
2747 .build_signed(&BOB, &Hash::default())?;
2748
2749 assert_eq!(tickets.len(), COUNT_TICKETS);
2750
2751 let existing_channel_with_multiple_tickets = channel.get_id();
2752
2753 let actual = db
2754 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2755 .await?;
2756
2757 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2758
2759 let agg_ticket = aggregated_ticket.clone();
2760
2761 let _ = db
2762 .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2763 .await?;
2764
2765 pin_mut!(notifier_rx);
2766 let notified_ticket = notifier_rx.next().await.ok_or(anyhow!("must have ticket"))?;
2767
2768 assert_eq!(notified_ticket.verified_ticket(), agg_ticket.verified_ticket());
2769
2770 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2771 .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2772 .count(&db.tickets_db)
2773 .await? as usize;
2774
2775 assert_eq!(actual_being_aggregated_count, 0);
2776
2777 Ok(())
2778 }
2779
2780 #[tokio::test]
2781 async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_value_is_lower_than_the_stored_one()
2782 -> anyhow::Result<()> {
2783 const COUNT_TICKETS: usize = 5;
2784
2785 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2786 let tickets = tickets
2787 .into_iter()
2788 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2789 .collect::<Vec<_>>();
2790
2791 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2792 let aggregated_ticket = TicketBuilder::default()
2793 .addresses(&*BOB, &*ALICE)
2794 .amount(0)
2795 .index(first_ticket.index)
2796 .index_offset(
2797 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2798 )
2799 .win_prob(1.0.try_into()?)
2800 .channel_epoch(first_ticket.channel_epoch)
2801 .challenge(first_ticket.challenge)
2802 .build_signed(&BOB, &Hash::default())?;
2803
2804 assert_eq!(tickets.len(), COUNT_TICKETS);
2805
2806 let existing_channel_with_multiple_tickets = channel.get_id();
2807
2808 let actual = db
2809 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2810 .await?;
2811
2812 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2813
2814 assert!(
2815 db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2816 .await
2817 .is_err()
2818 );
2819
2820 Ok(())
2821 }
2822
2823 #[tokio::test]
2824 async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_win_probability_is_not_equal_to_1()
2825 -> anyhow::Result<()> {
2826 const COUNT_TICKETS: usize = 5;
2827
2828 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2829 let tickets = tickets
2830 .into_iter()
2831 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2832 .collect::<Vec<_>>();
2833
2834 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2835 let aggregated_ticket = TicketBuilder::default()
2836 .addresses(&*BOB, &*ALICE)
2837 .amount(0)
2838 .index(first_ticket.index)
2839 .index_offset(
2840 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2841 )
2842 .win_prob(0.5.try_into()?) .channel_epoch(first_ticket.channel_epoch)
2844 .challenge(first_ticket.challenge)
2845 .build_signed(&BOB, &Hash::default())?;
2846
2847 assert_eq!(tickets.len(), COUNT_TICKETS);
2848
2849 let existing_channel_with_multiple_tickets = channel.get_id();
2850
2851 let actual = db
2852 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2853 .await?;
2854
2855 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2856
2857 assert!(
2858 db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2859 .await
2860 .is_err()
2861 );
2862
2863 Ok(())
2864 }
2865
2866 async fn init_db_with_channel(channel: ChannelEntry) -> anyhow::Result<HoprDb> {
2867 let db = HoprDb::new_in_memory(BOB.clone()).await?;
2868
2869 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2870 .await?;
2871
2872 add_peer_mappings(
2873 &db,
2874 vec![
2875 (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2876 (BOB_OFFCHAIN.clone(), BOB.clone()),
2877 ],
2878 )
2879 .await?;
2880
2881 db.upsert_channel(None, channel).await?;
2882
2883 Ok(db)
2884 }
2885
2886 #[tokio::test]
2887 async fn test_aggregate_ticket_should_aggregate() -> anyhow::Result<()> {
2888 const COUNT_TICKETS: usize = 5;
2889
2890 let channel = ChannelEntry::new(
2891 BOB.public().to_address(),
2892 ALICE.public().to_address(),
2893 u32::MAX.into(),
2894 (COUNT_TICKETS + 1).into(),
2895 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2896 4_u32.into(),
2897 );
2898
2899 let db = init_db_with_channel(channel).await?;
2900
2901 let tickets = (0..COUNT_TICKETS)
2902 .map(|i| {
2903 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
2904 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2905 })
2906 .collect::<anyhow::Result<Vec<_>>>()?;
2907
2908 let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
2909 let min_idx = tickets
2910 .iter()
2911 .map(|t| t.ticket.index)
2912 .min()
2913 .context("min index should be present")?;
2914 let max_idx = tickets
2915 .iter()
2916 .map(|t| t.ticket.index)
2917 .max()
2918 .context("max index should be present")?;
2919
2920 let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2921
2922 assert_eq!(
2923 &BOB.public().to_address(),
2924 aggregated.verified_issuer(),
2925 "must have correct signer"
2926 );
2927
2928 assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2929
2930 assert_eq!(
2931 COUNT_TICKETS,
2932 aggregated.verified_ticket().index_offset as usize,
2933 "aggregated ticket must have correct offset"
2934 );
2935 assert_eq!(
2936 sum_value,
2937 aggregated.verified_ticket().amount,
2938 "aggregated ticket token amount must be sum of individual tickets"
2939 );
2940 assert_eq!(
2941 1.0,
2942 aggregated.win_prob(),
2943 "aggregated ticket must have winning probability 1"
2944 );
2945 assert_eq!(
2946 min_idx,
2947 aggregated.verified_ticket().index,
2948 "aggregated ticket must have correct index"
2949 );
2950 assert_eq!(
2951 channel.get_id(),
2952 aggregated.verified_ticket().channel_id,
2953 "aggregated ticket must have correct channel id"
2954 );
2955 assert_eq!(
2956 channel.channel_epoch.as_u32(),
2957 aggregated.verified_ticket().channel_epoch,
2958 "aggregated ticket must have correct channel epoch"
2959 );
2960
2961 assert_eq!(
2962 max_idx + 1,
2963 db.get_outgoing_ticket_index(channel.get_id())
2964 .await?
2965 .load(Ordering::SeqCst)
2966 );
2967
2968 Ok(())
2969 }
2970
2971 #[tokio::test]
2972 async fn test_aggregate_ticket_should_aggregate_including_aggregated() -> anyhow::Result<()> {
2973 const COUNT_TICKETS: usize = 5;
2974
2975 let channel = ChannelEntry::new(
2976 BOB.public().to_address(),
2977 ALICE.public().to_address(),
2978 u32::MAX.into(),
2979 (COUNT_TICKETS + 1).into(),
2980 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2981 4_u32.into(),
2982 );
2983
2984 let db = init_db_with_channel(channel).await?;
2985
2986 let offset = 10_usize;
2987
2988 let mut tickets = (1..COUNT_TICKETS)
2989 .map(|i| {
2990 generate_random_ack_ticket(&BOB, &ALICE, (i + offset) as u64, 1, 1.0)
2991 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2992 })
2993 .collect::<anyhow::Result<Vec<_>>>()?;
2994
2995 tickets.push(
2997 generate_random_ack_ticket(&BOB, &ALICE, 0, offset as u32, 1.0)
2998 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?,
2999 );
3000
3001 let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
3002 let min_idx = tickets
3003 .iter()
3004 .map(|t| t.ticket.index)
3005 .min()
3006 .context("min index should be present")?;
3007 let max_idx = tickets
3008 .iter()
3009 .map(|t| t.ticket.index)
3010 .max()
3011 .context("max index should be present")?;
3012
3013 let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
3014
3015 assert_eq!(
3016 &BOB.public().to_address(),
3017 aggregated.verified_issuer(),
3018 "must have correct signer"
3019 );
3020
3021 assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
3022
3023 assert_eq!(
3024 COUNT_TICKETS + offset,
3025 aggregated.verified_ticket().index_offset as usize,
3026 "aggregated ticket must have correct offset"
3027 );
3028 assert_eq!(
3029 sum_value,
3030 aggregated.verified_ticket().amount,
3031 "aggregated ticket token amount must be sum of individual tickets"
3032 );
3033 assert_eq!(
3034 1.0,
3035 aggregated.win_prob(),
3036 "aggregated ticket must have winning probability 1"
3037 );
3038 assert_eq!(
3039 min_idx,
3040 aggregated.verified_ticket().index,
3041 "aggregated ticket must have correct index"
3042 );
3043 assert_eq!(
3044 channel.get_id(),
3045 aggregated.verified_ticket().channel_id,
3046 "aggregated ticket must have correct channel id"
3047 );
3048 assert_eq!(
3049 channel.channel_epoch.as_u32(),
3050 aggregated.verified_ticket().channel_epoch,
3051 "aggregated ticket must have correct channel epoch"
3052 );
3053
3054 assert_eq!(
3055 max_idx + 1,
3056 db.get_outgoing_ticket_index(channel.get_id())
3057 .await?
3058 .load(Ordering::SeqCst)
3059 );
3060
3061 Ok(())
3062 }
3063
3064 #[tokio::test]
3065 async fn test_aggregate_ticket_should_not_aggregate_zero_tickets() -> anyhow::Result<()> {
3066 let db = HoprDb::new_in_memory(BOB.clone()).await?;
3067
3068 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), vec![], &BOB)
3069 .await
3070 .expect_err("should not aggregate empty ticket list");
3071
3072 Ok(())
3073 }
3074
3075 #[tokio::test]
3076 async fn test_aggregate_ticket_should_aggregate_single_ticket_to_itself() -> anyhow::Result<()> {
3077 const COUNT_TICKETS: usize = 1;
3078
3079 let channel = ChannelEntry::new(
3080 BOB.public().to_address(),
3081 ALICE.public().to_address(),
3082 u32::MAX.into(),
3083 (COUNT_TICKETS + 1).into(),
3084 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
3085 4_u32.into(),
3086 );
3087
3088 let db = init_db_with_channel(channel).await?;
3089
3090 let mut tickets = (0..COUNT_TICKETS)
3091 .map(|i| {
3092 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3093 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3094 })
3095 .collect::<anyhow::Result<Vec<_>>>()?;
3096
3097 let aggregated = db
3098 .aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3099 .await?;
3100
3101 assert_eq!(
3102 &tickets.pop().context("ticket should be present")?.ticket,
3103 aggregated.verified_ticket()
3104 );
3105
3106 Ok(())
3107 }
3108
3109 #[tokio::test]
3110 async fn test_aggregate_ticket_should_not_aggregate_on_closed_channel() -> anyhow::Result<()> {
3111 const COUNT_TICKETS: usize = 3;
3112
3113 let channel = ChannelEntry::new(
3114 BOB.public().to_address(),
3115 ALICE.public().to_address(),
3116 u32::MAX.into(),
3117 (COUNT_TICKETS + 1).into(),
3118 ChannelStatus::Closed,
3119 4_u32.into(),
3120 );
3121
3122 let db = init_db_with_channel(channel).await?;
3123
3124 let tickets = (0..COUNT_TICKETS)
3125 .map(|i| {
3126 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3127 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3128 })
3129 .collect::<anyhow::Result<Vec<_>>>()?;
3130
3131 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3132 .await
3133 .expect_err("should not aggregate on closed channel");
3134
3135 Ok(())
3136 }
3137
3138 #[tokio::test]
3139 async fn test_aggregate_ticket_should_not_aggregate_on_incoming_channel() -> anyhow::Result<()> {
3140 const COUNT_TICKETS: usize = 3;
3141
3142 let channel = ChannelEntry::new(
3143 ALICE.public().to_address(),
3144 BOB.public().to_address(),
3145 u32::MAX.into(),
3146 (COUNT_TICKETS + 1).into(),
3147 ChannelStatus::Open,
3148 4_u32.into(),
3149 );
3150
3151 let db = init_db_with_channel(channel).await?;
3152
3153 let tickets = (0..COUNT_TICKETS)
3154 .map(|i| {
3155 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3156 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3157 })
3158 .collect::<anyhow::Result<Vec<_>>>()?;
3159
3160 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3161 .await
3162 .expect_err("should not aggregate on incoming channel");
3163
3164 Ok(())
3165 }
3166
3167 #[tokio::test]
3168 async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_ids() -> anyhow::Result<()> {
3169 const COUNT_TICKETS: usize = 3;
3170
3171 let channel = ChannelEntry::new(
3172 ALICE.public().to_address(),
3173 BOB.public().to_address(),
3174 u32::MAX.into(),
3175 (COUNT_TICKETS + 1).into(),
3176 ChannelStatus::Open,
3177 4_u32.into(),
3178 );
3179
3180 let db = init_db_with_channel(channel).await?;
3181
3182 let mut tickets = (0..COUNT_TICKETS)
3183 .map(|i| {
3184 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3185 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3186 })
3187 .collect::<anyhow::Result<Vec<_>>>()?;
3188
3189 tickets[2] = generate_random_ack_ticket(&BOB, &ChainKeypair::random(), 2, 1, 1.0)
3190 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3191
3192 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3193 .await
3194 .expect_err("should not aggregate on mismatching channel ids");
3195
3196 Ok(())
3197 }
3198
3199 #[tokio::test]
3200 async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_epoch() -> anyhow::Result<()> {
3201 const COUNT_TICKETS: usize = 3;
3202
3203 let channel = ChannelEntry::new(
3204 ALICE.public().to_address(),
3205 BOB.public().to_address(),
3206 100.into(),
3207 (COUNT_TICKETS + 1).into(),
3208 ChannelStatus::Open,
3209 3_u32.into(),
3210 );
3211
3212 let db = init_db_with_channel(channel).await?;
3213
3214 let tickets = (0..COUNT_TICKETS)
3215 .map(|i| {
3216 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3217 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3218 })
3219 .collect::<anyhow::Result<Vec<_>>>()?;
3220
3221 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3222 .await
3223 .expect_err("should not aggregate on mismatching channel epoch");
3224
3225 Ok(())
3226 }
3227
3228 #[tokio::test]
3229 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_indices_overlap() -> anyhow::Result<()> {
3230 const COUNT_TICKETS: usize = 3;
3231
3232 let channel = ChannelEntry::new(
3233 ALICE.public().to_address(),
3234 BOB.public().to_address(),
3235 u32::MAX.into(),
3236 (COUNT_TICKETS + 1).into(),
3237 ChannelStatus::Open,
3238 3_u32.into(),
3239 );
3240
3241 let db = init_db_with_channel(channel).await?;
3242
3243 let mut tickets = (0..COUNT_TICKETS)
3244 .map(|i| {
3245 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3246 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3247 })
3248 .collect::<anyhow::Result<Vec<_>>>()?;
3249
3250 tickets[1] = generate_random_ack_ticket(&BOB, &ALICE, 1, 2, 1.0)
3251 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3252
3253 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3254 .await
3255 .expect_err("should not aggregate on overlapping ticket indices");
3256 Ok(())
3257 }
3258
3259 #[tokio::test]
3260 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_valid() -> anyhow::Result<()> {
3261 const COUNT_TICKETS: usize = 3;
3262
3263 let channel = ChannelEntry::new(
3264 ALICE.public().to_address(),
3265 BOB.public().to_address(),
3266 u32::MAX.into(),
3267 (COUNT_TICKETS + 1).into(),
3268 ChannelStatus::Open,
3269 3_u32.into(),
3270 );
3271
3272 let db = init_db_with_channel(channel).await?;
3273
3274 let mut tickets = (0..COUNT_TICKETS)
3275 .map(|i| {
3276 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3277 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3278 })
3279 .collect::<anyhow::Result<Vec<_>>>()?;
3280
3281 tickets[1].ticket.amount = (TICKET_VALUE - 10).into();
3283
3284 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3285 .await
3286 .expect_err("should not aggregate on invalid tickets");
3287
3288 Ok(())
3289 }
3290
3291 #[tokio::test]
3292 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_has_lower_than_min_win_prob() -> anyhow::Result<()> {
3293 const COUNT_TICKETS: usize = 3;
3294
3295 let channel = ChannelEntry::new(
3296 ALICE.public().to_address(),
3297 BOB.public().to_address(),
3298 u32::MAX.into(),
3299 (COUNT_TICKETS + 1).into(),
3300 ChannelStatus::Open,
3301 3_u32.into(),
3302 );
3303
3304 let db = init_db_with_channel(channel).await?;
3305
3306 let tickets = (0..COUNT_TICKETS)
3307 .map(|i| {
3308 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, if i > 0 { 1.0 } else { 0.9 })
3309 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3310 })
3311 .collect::<anyhow::Result<Vec<_>>>()?;
3312
3313 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3314 .await
3315 .expect_err("should not aggregate tickets with less than minimum win prob");
3316
3317 Ok(())
3318 }
3319
3320 #[tokio::test]
3321 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_winning() -> anyhow::Result<()> {
3322 const COUNT_TICKETS: usize = 3;
3323
3324 let channel = ChannelEntry::new(
3325 ALICE.public().to_address(),
3326 BOB.public().to_address(),
3327 u32::MAX.into(),
3328 (COUNT_TICKETS + 1).into(),
3329 ChannelStatus::Open,
3330 3_u32.into(),
3331 );
3332
3333 let db = init_db_with_channel(channel).await?;
3334
3335 let mut tickets = (0..COUNT_TICKETS)
3336 .map(|i| {
3337 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3338 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3339 })
3340 .collect::<anyhow::Result<Vec<_>>>()?;
3341
3342 let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
3344 tickets[1] = TicketBuilder::from(tickets[1].ticket.clone())
3345 .win_prob(0.0.try_into()?)
3346 .challenge(resp.to_challenge().into())
3347 .build_signed(&BOB, &Hash::default())?
3348 .into_acknowledged(resp)
3349 .into_transferable(&ALICE, &Hash::default())?;
3350
3351 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3352 .await
3353 .expect_err("should not aggregate non-winning tickets");
3354
3355 Ok(())
3356 }
3357
3358 #[tokio::test]
3359 async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
3360 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3361
3362 let ticket = init_db_with_tickets(&db, 1).await?.1.pop().unwrap();
3363
3364 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
3365 .await
3366 .expect("must not fail");
3367
3368 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3369 assert_ne!(stats.redeemed_value, HoprBalance::zero());
3370
3371 db.reset_ticket_statistics().await.expect("must not fail");
3372
3373 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3374 assert_eq!(stats.redeemed_value, HoprBalance::zero());
3375
3376 Ok(())
3377 }
3378
3379 #[tokio::test]
3380 async fn test_fix_channels_ticket_state() -> anyhow::Result<()> {
3381 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3382 const COUNT_TICKETS: u64 = 1;
3383
3384 let (..) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
3385
3386 let mut ticket = hopr_db_entity::ticket::Entity::find()
3388 .one(&db.tickets_db)
3389 .await?
3390 .context("should have one active model")?
3391 .into_active_model();
3392 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3393 ticket.save(&db.tickets_db).await?;
3394
3395 assert!(
3396 hopr_db_entity::ticket::Entity::find()
3397 .one(&db.tickets_db)
3398 .await?
3399 .context("should have one active model")?
3400 .state
3401 == AcknowledgedTicketStatus::BeingRedeemed as i8,
3402 );
3403
3404 db.fix_channels_next_ticket_state().await.expect("must not fail");
3405
3406 assert!(
3407 hopr_db_entity::ticket::Entity::find()
3408 .one(&db.tickets_db)
3409 .await?
3410 .context("should have one active model")?
3411 .state
3412 == AcknowledgedTicketStatus::Untouched as i8,
3413 );
3414
3415 Ok(())
3416 }
3417
3418 #[tokio::test]
3419 async fn test_dont_fix_correct_channels_ticket_state() -> anyhow::Result<()> {
3420 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3421 const COUNT_TICKETS: u64 = 2;
3422
3423 let (..) = init_db_with_tickets_and_channel(&db, COUNT_TICKETS, Some(1u32)).await?;
3425
3426 let mut ticket = hopr_db_entity::ticket::Entity::find()
3428 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3429 .one(&db.tickets_db)
3430 .await?
3431 .context("should have one active model")?
3432 .into_active_model();
3433 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3434 ticket.save(&db.tickets_db).await?;
3435
3436 assert!(
3437 hopr_db_entity::ticket::Entity::find()
3438 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3439 .one(&db.tickets_db)
3440 .await?
3441 .context("should have one active model")?
3442 .state
3443 == AcknowledgedTicketStatus::BeingRedeemed as i8,
3444 );
3445
3446 db.fix_channels_next_ticket_state().await.expect("must not fail");
3447
3448 let ticket0 = hopr_db_entity::ticket::Entity::find()
3450 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3451 .one(&db.tickets_db)
3452 .await?
3453 .context("should have one active model")?;
3454 assert_eq!(ticket0.state, AcknowledgedTicketStatus::BeingRedeemed as i8);
3455
3456 let ticket1 = hopr_db_entity::ticket::Entity::find()
3458 .filter(ticket::Column::Index.eq(1u64.to_be_bytes().to_vec()))
3459 .one(&db.tickets_db)
3460 .await?
3461 .context("should have one active model")?;
3462 assert_eq!(ticket1.state, AcknowledgedTicketStatus::Untouched as i8);
3463
3464 Ok(())
3465 }
3466}