1use async_stream::stream;
2use async_trait::async_trait;
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set};
6use sea_query::{Condition, Expr, IntoCondition, SimpleExpr};
7use std::cmp;
8use std::ops::{Add, Bound};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use tracing::{debug, error, info, trace, warn};
12
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::prelude::{TicketIndexSelector, TicketMarker};
15use hopr_db_api::resolver::HoprDbResolverOperations;
16use hopr_db_api::tickets::AggregationPrerequisites;
17use hopr_db_api::{
18 errors::Result,
19 info::DomainSeparator,
20 tickets::{ChannelTicketStatistics, HoprDbTicketOperations, TicketSelector},
21};
22use hopr_db_entity::ticket_statistics;
23use hopr_db_entity::{outgoing_ticket_index, ticket};
24use hopr_internal_types::prelude::*;
25use hopr_primitive_types::prelude::*;
26
27use crate::channels::HoprDbChannelOperations;
28use crate::db::HoprDb;
29use crate::errors::DbSqlError;
30use crate::errors::DbSqlError::LogicalError;
31use crate::info::HoprDbInfoOperations;
32use crate::{HoprDbGeneralModelOperations, OpenTransaction, OptTx, TargetDb};
33
34#[cfg(all(feature = "prometheus", not(test)))]
35use hopr_metrics::metrics::MultiGauge;
36
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Multi-gauge with per-channel statistics of incoming tickets.
    ///
    /// It has two labels, `channel` and `statistic`. Within this file the `statistic`
    /// label is set to `"unredeemed"`, `"neglected"`, `"redeemed"`, `"rejected"` or to the
    /// string form of a `TicketMarker`.
    ///
    /// Only compiled when the `prometheus` feature is enabled and not under test.
    /// Construction failure panics on first access because of the `unwrap()`.
    pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: MultiGauge = MultiGauge::new(
        "hopr_tickets_incoming_statistics",
        "Ticket statistics for channels with incoming tickets.",
        &["channel", "statistic"]
    ).unwrap();
}
45
/// Upper bound on the number of ticket rows loaded for a single aggregation attempt.
///
/// It is applied as the `limit` of the ticket query in `prepare_aggregation_in_channel`.
pub const MAX_TICKETS_TO_AGGREGATE_BATCH: u64 = 500;
48
/// Crate-local newtype around [`TicketSelector`].
///
/// The wrapper implements [`IntoCondition`], so a selector can be passed directly
/// to the `filter` call of a query on the `ticket` table.
#[derive(Clone)]
pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
54
55impl From<TicketSelector> for WrappedTicketSelector {
56 fn from(selector: TicketSelector) -> Self {
57 Self(selector)
58 }
59}
60
61impl AsRef<TicketSelector> for WrappedTicketSelector {
62 fn as_ref(&self) -> &TicketSelector {
63 &self.0
64 }
65}
66
67impl IntoCondition for WrappedTicketSelector {
68 fn into_condition(self) -> Condition {
69 let expr = self
70 .0
71 .channel_identifiers
72 .into_iter()
73 .map(|(channel_id, epoch)| {
74 ticket::Column::ChannelId
75 .eq(channel_id.to_hex())
76 .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
77 })
78 .reduce(SimpleExpr::or);
79
80 if expr.is_none() {
82 return Condition::any().not();
83 }
84
85 let mut expr = expr.unwrap();
86
87 match self.0.index {
88 TicketIndexSelector::None => {
89 }
91 TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
92 TicketIndexSelector::Multiple(idxs) => {
93 expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
94 }
95 TicketIndexSelector::Range((lb, ub)) => {
96 expr = match lb {
97 Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
98 Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
99 Bound::Unbounded => expr,
100 };
101 expr = match ub {
102 Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
103 Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
104 Bound::Unbounded => expr,
105 };
106 }
107 }
108
109 if let Some(state) = self.0.state {
110 expr = expr.and(ticket::Column::State.eq(state as u8))
111 }
112
113 if self.0.only_aggregated {
114 expr = expr.and(ticket::Column::IndexOffset.gt(1));
115 }
116
117 expr = match self.0.win_prob.0 {
119 Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.to_vec())),
120 Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.to_vec())),
121 Bound::Unbounded => expr,
122 };
123
124 expr = match self.0.win_prob.1 {
126 Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.to_vec())),
127 Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.to_vec())),
128 Bound::Unbounded => expr,
129 };
130
131 expr = match self.0.amount.0 {
133 Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
134 Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
135 Bound::Unbounded => expr,
136 };
137
138 expr = match self.0.amount.1 {
140 Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
141 Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
142 Bound::Unbounded => expr,
143 };
144
145 expr.into_condition()
146 }
147}
148
149pub(crate) fn filter_satisfying_ticket_models(
157 prerequisites: AggregationPrerequisites,
158 models: Vec<ticket::Model>,
159 channel_entry: &ChannelEntry,
160 min_win_prob: f64,
161) -> crate::errors::Result<Vec<ticket::Model>> {
162 let channel_id = channel_entry.get_id();
163
164 let mut to_be_aggregated = Vec::with_capacity(models.len());
165 let mut total_balance = BalanceType::HOPR.zero();
166
167 for m in models {
168 let ticket_wp = win_prob_to_f64(
169 m.winning_probability
170 .as_slice()
171 .try_into()
172 .map_err(|_| DbSqlError::DecodingError)?,
173 );
174 if !f64_approx_eq(ticket_wp, min_win_prob, LOWEST_POSSIBLE_WINNING_PROB) && ticket_wp < min_win_prob {
175 warn!(
176 channel_id = %channel_entry.get_id(),
177 ticket_wp, min_win_prob, "encountered ticket with winning probability lower than the minimum threshold"
178 );
179 continue;
180 }
181
182 let to_add = BalanceType::HOPR.balance_bytes(&m.amount);
183
184 total_balance = total_balance + to_add;
186 if total_balance.gt(&channel_entry.balance) {
187 total_balance = total_balance - to_add;
189 break;
190 }
191
192 to_be_aggregated.push(m);
193 }
194
195 if prerequisites.min_ticket_count.is_none() && prerequisites.min_unaggregated_ratio.is_none() {
197 info!(channel = %channel_id, "Aggregation check OK, no aggregation prerequisites were given");
198 return Ok(to_be_aggregated);
199 }
200
201 let to_be_agg_count = to_be_aggregated.len();
202
203 if let Some(agg_threshold) = prerequisites.min_ticket_count {
205 if to_be_agg_count >= agg_threshold {
206 info!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check OK aggregated value greater than threshold");
207 return Ok(to_be_aggregated);
208 } else {
209 debug!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold,"Aggregation check FAIL not enough resources to aggregate");
210 }
211 }
212
213 if let Some(unrealized_threshold) = prerequisites.min_unaggregated_ratio {
214 let diminished_balance = channel_entry.balance.mul_f64(unrealized_threshold)?;
215
216 if total_balance.ge(&diminished_balance) {
219 if to_be_agg_count > 1 {
220 info!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check OK: more unrealized than diminished balance");
221 return Ok(to_be_aggregated);
222 } else {
223 debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: more unrealized than diminished balance but only 1 ticket");
224 }
225 } else {
226 debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: less unrealized than diminished balance");
227 }
228 }
229
230 debug!(channel = %channel_id,"Aggregation check FAIL: no prerequisites were met");
231 Ok(vec![])
232}
233
234pub(crate) async fn find_stats_for_channel(
235 tx: &OpenTransaction,
236 channel_id: &Hash,
237) -> crate::errors::Result<ticket_statistics::Model> {
238 if let Some(model) = ticket_statistics::Entity::find()
239 .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
240 .one(tx.as_ref())
241 .await?
242 {
243 Ok(model)
244 } else {
245 let new_stats = ticket_statistics::ActiveModel {
246 channel_id: Set(channel_id.to_hex()),
247 ..Default::default()
248 }
249 .insert(tx.as_ref())
250 .await?;
251
252 Ok(new_stats)
253 }
254}
255
256impl HoprDb {
257 async fn get_tickets_value_int<'a>(&'a self, tx: OptTx<'a>, selector: TicketSelector) -> Result<(usize, Balance)> {
258 let selector: WrappedTicketSelector = selector.into();
259 Ok(self
260 .nest_transaction_in_db(tx, TargetDb::Tickets)
261 .await?
262 .perform(|tx| {
263 Box::pin(async move {
264 ticket::Entity::find()
265 .filter(selector)
266 .stream(tx.as_ref())
267 .await
268 .map_err(DbSqlError::from)?
269 .map_err(DbSqlError::from)
270 .try_fold((0_usize, BalanceType::HOPR.zero()), |(count, value), t| async move {
271 Ok((count + 1, value + BalanceType::HOPR.balance_bytes(t.amount)))
272 })
273 .await
274 })
275 })
276 .await?)
277 }
278}
279
280#[async_trait]
281impl HoprDbTicketOperations for HoprDb {
282 async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>> {
283 Ok(self
284 .nest_transaction_in_db(None, TargetDb::Tickets)
285 .await?
286 .perform(|tx| {
287 Box::pin(async move {
288 ticket::Entity::find()
289 .all(tx.as_ref())
290 .await?
291 .into_iter()
292 .map(AcknowledgedTicket::try_from)
293 .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
294 .map_err(DbSqlError::from)
295 })
296 })
297 .await?)
298 }
299
300 async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>> {
301 debug!("fetching tickets via {selector}");
302 let selector: WrappedTicketSelector = selector.into();
303
304 Ok(self
305 .nest_transaction_in_db(None, TargetDb::Tickets)
306 .await?
307 .perform(|tx| {
308 Box::pin(async move {
309 ticket::Entity::find()
310 .filter(selector)
311 .all(tx.as_ref())
312 .await?
313 .into_iter()
314 .map(AcknowledgedTicket::try_from)
315 .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
316 .map_err(DbSqlError::from)
317 })
318 })
319 .await?)
320 }
321
/// Removes the tickets matching `selector` from the database and adds their value to the
/// per-channel statistic given by `mark_as` (redeemed, rejected or neglected).
///
/// The selector is processed channel by channel. For each `(channel_id, epoch)` pair:
/// 1. the matching tickets are counted and their value summed,
/// 2. the matching tickets are deleted,
/// 3. if the number of deleted rows equals the counted number, the channel statistics
///    are updated; otherwise the method fails with a `LogicalError`,
/// 4. the cached unrealized value of that channel and epoch is invalidated.
///
/// All of this runs inside `ticket_manager.with_write_locked_db`.
///
/// Returns the total number of tickets marked across all channels.
async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize> {
    let myself = self.clone();
    Ok(self
        .ticket_manager
        .with_write_locked_db(|tx| {
            Box::pin(async move {
                let mut total_marked_count = 0;
                for (channel_id, epoch) in selector.channel_identifiers.iter() {
                    // Narrow the selector down to this one channel and epoch.
                    let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);

                    // Count and value are read first, in the same transaction as the delete below.
                    let (marked_count, marked_value) =
                        myself.get_tickets_value_int(Some(tx), channel_selector.clone()).await?;
                    trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");

                    if marked_count > 0 {
                        // Delete the tickets before touching the statistics.
                        let deleted = ticket::Entity::delete_many()
                            .filter(WrappedTicketSelector::from(channel_selector.clone()))
                            .exec(tx.as_ref())
                            .await?;

                        // Only update the statistics when exactly the counted tickets were deleted.
                        if deleted.rows_affected == marked_count as u64 {
                            let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
                            // The value before the update; only read by the metrics code below,
                            // hence the leading underscore for builds without metrics.
                            let _current_value = match mark_as {
                                TicketMarker::Redeemed => {
                                    let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
                                    new_stats.redeemed_value =
                                        Set((current_value + marked_value.amount()).to_be_bytes().into());
                                    current_value
                                }
                                TicketMarker::Rejected => {
                                    let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
                                    new_stats.rejected_value =
                                        Set((current_value + marked_value.amount()).to_be_bytes().into());
                                    current_value
                                }
                                TicketMarker::Neglected => {
                                    let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
                                    new_stats.neglected_value =
                                        Set((current_value + marked_value.amount()).to_be_bytes().into());
                                    current_value
                                }
                            };
                            new_stats.save(tx.as_ref()).await?;

                            #[cfg(all(feature = "prometheus", not(test)))]
                            {
                                let channel = channel_id.to_string();
                                // Publish the new value of the statistic that was just updated.
                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                    &[&channel, &mark_as.to_string()],
                                    (_current_value + marked_value.amount()).as_u128() as f64,
                                );

                                // The "unredeemed" gauge is reduced for every marker except `Rejected`.
                                // NOTE(review): presumably rejected tickets were never counted as
                                // unredeemed — confirm.
                                if mark_as != TicketMarker::Rejected {
                                    let unredeemed_value = myself
                                        .caches
                                        .unrealized_value
                                        .get(&(*channel_id, *epoch))
                                        .await
                                        .unwrap_or(Balance::zero(BalanceType::HOPR));

                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                        &[&channel, "unredeemed"],
                                        (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
                                    );
                                }
                            }

                            // The cached unrealized value of this channel is stale now.
                            myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
                        } else {
                            return Err(DbSqlError::LogicalError(format!(
                                "could not mark {marked_count} ticket as {mark_as}"
                            )));
                        }

                        trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
                        total_marked_count += marked_count;
                    }
                }

                info!(
                    count = total_marked_count,
                    ?mark_as,
                    channel_count = selector.channel_identifiers.len(),
                    "removed tickets in channels",
                );
                Ok(total_marked_count)
            })
        })
        .await?)
}
417
418 async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()> {
419 let channel_id = ticket.channel_id;
420 let amount = ticket.amount;
421 Ok(self
422 .ticket_manager
423 .with_write_locked_db(|tx| {
424 Box::pin(async move {
425 let stats = find_stats_for_channel(tx, &channel_id).await?;
426
427 let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
428
429 let mut active_stats = stats.into_active_model();
430 active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
431 active_stats.save(tx.as_ref()).await?;
432
433 #[cfg(all(feature = "prometheus", not(test)))]
434 {
435 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
436 &[&channel_id.to_string(), "rejected"],
437 (current_rejected_value + amount.amount()).as_u128() as f64,
438 );
439 }
440
441 Ok::<(), DbSqlError>(())
442 })
443 })
444 .await?)
445 }
446
447 async fn update_ticket_states_and_fetch<'a>(
448 &'a self,
449 selector: TicketSelector,
450 new_state: AcknowledgedTicketStatus,
451 ) -> Result<BoxStream<'a, AcknowledgedTicket>> {
452 let selector: WrappedTicketSelector = selector.into();
453 Ok(Box::pin(stream! {
454 match ticket::Entity::find()
455 .filter(selector)
456 .stream(self.conn(TargetDb::Tickets))
457 .await {
458 Ok(mut stream) => {
459 while let Ok(Some(ticket)) = stream.try_next().await {
460 let active_ticket = ticket::ActiveModel {
461 id: Set(ticket.id),
462 state: Set(new_state as i8),
463 ..Default::default()
464 };
465
466 {
467 let _g = self.ticket_manager.mutex.lock();
468 if let Err(e) = active_ticket.update(self.conn(TargetDb::Tickets)).await {
469 error!(error = %e,"failed to update ticket in the db");
470 }
471 }
472
473 match AcknowledgedTicket::try_from(ticket) {
474 Ok(mut ticket) => {
475 ticket.status = new_state;
477 yield ticket
478 },
479 Err(e) => {
480 tracing::error!(error = %e, "failed to decode ticket from the db");
481 }
482 }
483 }
484 },
485 Err(e) => tracing::error!(error = %e, "failed open ticket db stream")
486 }
487 }))
488 }
489
490 async fn update_ticket_states(
491 &self,
492 selector: TicketSelector,
493 new_state: AcknowledgedTicketStatus,
494 ) -> Result<usize> {
495 let selector: WrappedTicketSelector = selector.into();
496 Ok(self
497 .ticket_manager
498 .with_write_locked_db(|tx| {
499 Box::pin(async move {
500 let update = ticket::Entity::update_many()
501 .filter(selector)
502 .col_expr(ticket::Column::State, Expr::value(new_state as u8))
503 .exec(tx.as_ref())
504 .await?;
505 Ok::<_, DbSqlError>(update.rows_affected as usize)
506 })
507 })
508 .await?)
509 }
510
/// Returns ticket statistics, either summed over all channels (`channel_id == None`)
/// or for a single channel.
///
/// The unredeemed value is computed by summing the amounts of the tickets currently
/// stored in the `ticket` table; the neglected, redeemed and rejected values and the
/// winning ticket count come from the `ticket_statistics` table.
///
/// When a channel is given and `get_channel_by_id` does not find it,
/// `DbSqlError::ChannelNotFound` is returned.
///
/// With the `prometheus` feature enabled (outside tests), the all-channels branch also
/// publishes the per-channel values to `METRIC_HOPR_TICKETS_INCOMING_STATISTICS`.
async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics> {
    let res = match channel_id {
        None => {
            // Per-channel sums of unredeemed value, needed only for the metrics.
            #[cfg(all(feature = "prometheus", not(test)))]
            let mut per_channel_unredeemed = std::collections::HashMap::new();

            self.nest_transaction_in_db(None, TargetDb::Tickets)
                .await?
                .perform(|tx| {
                    Box::pin(async move {
                        // Sum up the amounts of all stored tickets.
                        let unredeemed_value = ticket::Entity::find()
                            .stream(tx.as_ref())
                            .await?
                            .try_fold(U256::zero(), |amount, x| {
                                let unredeemed_value = U256::from_be_bytes(x.amount);

                                #[cfg(all(feature = "prometheus", not(test)))]
                                per_channel_unredeemed
                                    .entry(x.channel_id)
                                    .and_modify(|v| *v += unredeemed_value)
                                    .or_insert(unredeemed_value);

                                futures::future::ok(amount + unredeemed_value)
                            })
                            .await?;

                        #[cfg(all(feature = "prometheus", not(test)))]
                        for (channel_id, unredeemed_value) in per_channel_unredeemed {
                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS
                                .set(&[&channel_id, "unredeemed"], unredeemed_value.as_u128() as f64);
                        }

                        // Accumulate the persisted statistics of all channels.
                        let mut all_stats = ticket_statistics::Entity::find()
                            .all(tx.as_ref())
                            .await?
                            .into_iter()
                            .fold(ChannelTicketStatistics::default(), |mut acc, stats| {
                                let neglected_value = BalanceType::HOPR.balance_bytes(stats.neglected_value);
                                acc.neglected_value = acc.neglected_value + neglected_value;
                                let redeemed_value = BalanceType::HOPR.balance_bytes(stats.redeemed_value);
                                acc.redeemed_value = acc.redeemed_value + redeemed_value;
                                let rejected_value = BalanceType::HOPR.balance_bytes(stats.rejected_value);
                                acc.rejected_value = acc.rejected_value + rejected_value;
                                acc.winning_tickets += stats.winning_tickets as u128;

                                #[cfg(all(feature = "prometheus", not(test)))]
                                {
                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                        &[&stats.channel_id, "neglected"],
                                        neglected_value.amount().as_u128() as f64,
                                    );

                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                        &[&stats.channel_id, "redeemed"],
                                        redeemed_value.amount().as_u128() as f64,
                                    );

                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                        &[&stats.channel_id, "rejected"],
                                        rejected_value.amount().as_u128() as f64,
                                    );
                                }

                                acc
                            });

                        // The unredeemed value is not persisted; it comes from the ticket sum above.
                        all_stats.unredeemed_value = BalanceType::HOPR.balance(unredeemed_value);

                        Ok::<_, DbSqlError>(all_stats)
                    })
                })
                .await
        }
        Some(channel) => {
            // Fail early for a channel that is not known.
            if self.get_channel_by_id(None, &channel).await?.is_none() {
                return Err(DbSqlError::ChannelNotFound(channel).into());
            }

            self.nest_transaction_in_db(None, TargetDb::Tickets)
                .await?
                .perform(|tx| {
                    Box::pin(async move {
                        // Inserts a default statistics row when the channel has none yet.
                        let stats = find_stats_for_channel(tx, &channel).await?;
                        // Sum up the amounts of the tickets stored for this channel.
                        let unredeemed_value = ticket::Entity::find()
                            .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
                            .stream(tx.as_ref())
                            .await?
                            .try_fold(U256::zero(), |amount, x| {
                                futures::future::ok(amount + U256::from_be_bytes(x.amount))
                            })
                            .await?;

                        Ok::<_, DbSqlError>(ChannelTicketStatistics {
                            winning_tickets: stats.winning_tickets as u128,
                            neglected_value: BalanceType::HOPR.balance_bytes(stats.neglected_value),
                            redeemed_value: BalanceType::HOPR.balance_bytes(stats.redeemed_value),
                            unredeemed_value: BalanceType::HOPR.balance(unredeemed_value),
                            rejected_value: BalanceType::HOPR.balance_bytes(stats.rejected_value),
                        })
                    })
                })
                .await
        }
    };
    Ok(res?)
}
619
620 async fn reset_ticket_statistics(&self) -> Result<()> {
621 let res = self
622 .nest_transaction_in_db(None, TargetDb::Tickets)
623 .await?
624 .perform(|tx| {
625 Box::pin(async move {
626 #[cfg(all(feature = "prometheus", not(test)))]
627 let rows = ticket_statistics::Entity::find().all(tx.as_ref()).await?;
628
629 let deleted = ticket_statistics::Entity::delete_many().exec(tx.as_ref()).await?;
631
632 #[cfg(all(feature = "prometheus", not(test)))]
633 {
634 if deleted.rows_affected > 0 {
635 for row in rows {
636 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "neglected"], 0.0_f64);
637
638 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "redeemed"], 0.0_f64);
639
640 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "rejected"], 0.0_f64);
641 }
642 }
643 }
644
645 debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
646
647 Ok::<_, DbSqlError>(())
648 })
649 })
650 .await;
651
652 Ok(res?)
653 }
654
655 async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, Balance)> {
656 self.get_tickets_value_int(None, selector).await
657 }
658
659 async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64> {
660 let old_value = self
661 .get_outgoing_ticket_index(channel_id)
662 .await?
663 .fetch_max(index, Ordering::SeqCst);
664
665 Ok(old_value)
669 }
670
671 async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
672 let old_value = self
673 .get_outgoing_ticket_index(channel_id)
674 .await?
675 .swap(0, Ordering::SeqCst);
676
677 Ok(old_value)
681 }
682
683 async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
684 let old_value = self
685 .get_outgoing_ticket_index(channel_id)
686 .await?
687 .fetch_add(1, Ordering::SeqCst);
688
689 Ok(old_value)
692 }
693
694 async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>> {
695 let tkt_manager = self.ticket_manager.clone();
696
697 Ok(self
698 .caches
699 .ticket_index
700 .try_get_with(channel_id, async move {
701 let maybe_index = outgoing_ticket_index::Entity::find()
702 .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
703 .one(&tkt_manager.tickets_db)
704 .await?;
705
706 Ok(Arc::new(AtomicU64::new(match maybe_index {
707 Some(model) => U256::from_be_bytes(model.index).as_u64(),
708 None => {
709 tkt_manager
710 .with_write_locked_db(|tx| {
711 Box::pin(async move {
712 outgoing_ticket_index::ActiveModel {
713 channel_id: Set(channel_id.to_hex()),
714 ..Default::default()
715 }
716 .insert(tx.as_ref())
717 .await?;
718 Ok::<_, DbSqlError>(())
719 })
720 })
721 .await?;
722 0_u64
723 }
724 })))
725 })
726 .await
727 .map_err(|e: Arc<DbSqlError>| LogicalError(format!("failed to retrieve ticket index: {e}")))?)
728 }
729
730 async fn persist_outgoing_ticket_indices(&self) -> Result<usize> {
731 let outgoing_indices = outgoing_ticket_index::Entity::find()
732 .all(&self.tickets_db)
733 .await
734 .map_err(DbSqlError::from)?;
735
736 let mut updated = 0;
737 for index_model in outgoing_indices {
738 let channel_id = Hash::from_hex(&index_model.channel_id).map_err(DbSqlError::from)?;
739 let db_index = U256::from_be_bytes(&index_model.index).as_u64();
740 if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
741 let cached_index = cached_index.load(Ordering::SeqCst);
745
746 if cached_index > db_index {
748 let mut index_active_model = index_model.into_active_model();
749 index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
750 self.ticket_manager
751 .with_write_locked_db(|wtx| {
752 Box::pin(async move {
753 index_active_model.save(wtx.as_ref()).await?;
754 Ok::<_, DbSqlError>(())
755 })
756 })
757 .await?;
758
759 debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
760 updated += 1;
761 }
762 } else {
763 trace!(?channel_id, "channel not in cache yet");
766 }
767 }
768
769 Ok(Ok::<_, DbSqlError>(updated)?)
770 }
771
772 async fn prepare_aggregation_in_channel(
773 &self,
774 channel: &Hash,
775 prerequisites: AggregationPrerequisites,
776 ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>> {
777 let myself = self.clone();
778
779 let channel_id = *channel;
780
781 let (channel_entry, peer, domain_separator, min_win_prob) = self
782 .nest_transaction_in_db(None, TargetDb::Index)
783 .await?
784 .perform(|tx| {
785 Box::pin(async move {
786 let entry = myself
787 .get_channel_by_id(Some(tx), &channel_id)
788 .await?
789 .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
790
791 if entry.status == ChannelStatus::Closed {
792 return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
793 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
794 return Err(DbSqlError::LogicalError(format!(
795 "channel '{channel_id}' is not incoming"
796 )));
797 }
798
799 let pk = myself
800 .resolve_packet_key(&entry.source)
801 .await?
802 .ok_or(DbSqlError::LogicalError(format!(
803 "peer '{}' has no offchain key record",
804 entry.source
805 )))?;
806
807 let indexer_data = myself.get_indexer_data(Some(tx)).await?;
808
809 let domain_separator = indexer_data
810 .channels_dst
811 .ok_or_else(|| crate::errors::DbSqlError::LogicalError("domain separator missing".into()))?;
812
813 Ok((
814 entry,
815 pk,
816 domain_separator,
817 indexer_data.minimum_incoming_ticket_winning_prob,
818 ))
819 })
820 })
821 .await?;
822
823 let myself = self.clone();
824 let tickets = self
825 .ticket_manager
826 .with_write_locked_db(|tx| {
827 Box::pin(async move {
828 if ticket::Entity::find()
830 .filter(WrappedTicketSelector::from(
831 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
832 ))
833 .one(tx.as_ref())
834 .await?
835 .is_some()
836 {
837 return Err(DbSqlError::LogicalError(format!(
838 "'{channel_entry}' is already being aggregated",
839 )));
840 }
841
842 let first_idx_to_take = ticket::Entity::find()
844 .filter(WrappedTicketSelector::from(
845 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingRedeemed),
846 ))
847 .order_by_desc(ticket::Column::Index)
848 .one(tx.as_ref())
849 .await?
850 .map(|m| U256::from_be_bytes(m.index).as_u64() + 1)
851 .unwrap_or(0_u64) .max(channel_entry.ticket_index.as_u64()); let to_be_aggregated = ticket::Entity::find()
856 .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
857 .filter(ticket::Column::Index.gte(first_idx_to_take.to_be_bytes().to_vec()))
858 .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
859 .order_by_asc(ticket::Column::Index).limit(MAX_TICKETS_TO_AGGREGATE_BATCH)
861 .all(tx.as_ref())
862 .await?;
863
864 let mut to_be_aggregated: Vec<TransferableWinningTicket> =
866 filter_satisfying_ticket_models(prerequisites, to_be_aggregated, &channel_entry, min_win_prob)?
867 .into_iter()
868 .map(|model| {
869 AcknowledgedTicket::try_from(model)
870 .map_err(DbSqlError::from)
871 .and_then(|ack| {
872 ack.into_transferable(&myself.chain_key, &domain_separator)
873 .map_err(DbSqlError::from)
874 })
875 })
876 .collect::<crate::errors::Result<Vec<_>>>()?;
877
878 let mut neglected_idxs = Vec::new();
879
880 if !to_be_aggregated.is_empty() {
881 let first_ticket = to_be_aggregated[0].ticket.clone();
886 let mut i = 1;
887 while i < to_be_aggregated.len() {
888 let current_idx = to_be_aggregated[i].ticket.index;
889 if (first_ticket.index..first_ticket.index + first_ticket.index_offset as u64).contains(¤t_idx) {
890 warn!(ticket_id = current_idx, channel = %channel_id, ?first_ticket, "ticket in channel has been already aggregated and will be removed");
894 neglected_idxs.push(current_idx);
895 to_be_aggregated.remove(i);
896 } else {
897 i += 1;
898 }
899 }
900
901 if !neglected_idxs.is_empty() {
904 warn!(count = neglected_idxs.len(), channel = %channel_id, "tickets were neglected due to duplication in an aggregated ticket!");
905 }
906
907 let marked: sea_orm::UpdateResult = ticket::Entity::update_many()
909 .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
910 .filter(ticket::Column::Index.is_in(to_be_aggregated.iter().map(|t| t.ticket.index.to_be_bytes().to_vec())))
911 .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
912 .col_expr(
913 ticket::Column::State,
914 Expr::value(AcknowledgedTicketStatus::BeingAggregated as i8),
915 )
916 .exec(tx.as_ref())
917 .await?;
918
919 if marked.rows_affected as usize != to_be_aggregated.len() {
920 return Err(DbSqlError::LogicalError(format!(
921 "expected to mark {}, but was able to mark {}",
922 to_be_aggregated.len(),
923 marked.rows_affected,
924 )));
925 }
926 }
927
928 debug!(
929 "prepared {} tickets to aggregate in {} ({})",
930 to_be_aggregated.len(),
931 channel_entry.get_id(),
932 channel_entry.channel_epoch,
933 );
934
935 Ok(to_be_aggregated)
936 })
937 })
938 .await?;
939
940 Ok((!tickets.is_empty()).then_some((peer, tickets, domain_separator)))
941 }
942
943 async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()> {
944 let channel_entry = self
945 .get_channel_by_id(None, &channel)
946 .await?
947 .ok_or(DbSqlError::ChannelNotFound(channel))?;
948
949 let selector = TicketSelector::from(channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated);
950
951 let reverted = self
952 .update_ticket_states(selector, AcknowledgedTicketStatus::Untouched)
953 .await?;
954
955 info!(
956 "rollback happened for ticket aggregation in '{channel}' with {reverted} tickets rolled back as a result",
957 );
958 Ok(())
959 }
960
961 async fn process_received_aggregated_ticket(
962 &self,
963 aggregated_ticket: Ticket,
964 chain_keypair: &ChainKeypair,
965 ) -> Result<AcknowledgedTicket> {
966 if chain_keypair.public().to_address() != self.me_onchain {
967 return Err(DbSqlError::LogicalError(
968 "chain key for ticket aggregation does not match the DB public address".into(),
969 )
970 .into());
971 }
972
973 let myself = self.clone();
974 let channel_id = aggregated_ticket.channel_id;
975
976 let (channel_entry, domain_separator) = self
977 .nest_transaction_in_db(None, TargetDb::Index)
978 .await?
979 .perform(|tx| {
980 Box::pin(async move {
981 let entry = myself
982 .get_channel_by_id(Some(tx), &channel_id)
983 .await?
984 .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
985
986 if entry.status == ChannelStatus::Closed {
987 return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
988 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
989 return Err(DbSqlError::LogicalError(format!(
990 "channel '{channel_id}' is not incoming"
991 )));
992 }
993
994 let domain_separator =
995 myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
996 crate::errors::DbSqlError::LogicalError("domain separator missing".into())
997 })?;
998
999 Ok((entry, domain_separator))
1000 })
1001 })
1002 .await?;
1003
1004 let aggregated_ticket = aggregated_ticket
1006 .verify(&channel_entry.source, &domain_separator)
1007 .map_err(|e| {
1008 DbSqlError::LogicalError(format!(
1009 "failed to verify received aggregated ticket in {channel_id}: {e}"
1010 ))
1011 })?;
1012
1013 if !f64_approx_eq(aggregated_ticket.win_prob(), 1.0, LOWEST_POSSIBLE_WINNING_PROB) {
1015 return Err(DbSqlError::LogicalError("Aggregated tickets must have 100% win probability".into()).into());
1016 }
1017
1018 let acknowledged_tickets = self
1019 .nest_transaction_in_db(None, TargetDb::Tickets)
1020 .await?
1021 .perform(|tx| {
1022 Box::pin(async move {
1023 ticket::Entity::find()
1024 .filter(WrappedTicketSelector::from(
1025 TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
1026 ))
1027 .all(tx.as_ref())
1028 .await
1029 .map_err(DbSqlError::BackendError)
1030 })
1031 })
1032 .await?;
1033
1034 if acknowledged_tickets.is_empty() {
1035 debug!("Received unexpected aggregated ticket in channel {channel_id}");
1036 return Err(DbSqlError::LogicalError(format!(
1037 "failed insert aggregated ticket, because no tickets seem to be aggregated for '{channel_id}'",
1038 ))
1039 .into());
1040 }
1041
1042 let stored_value = acknowledged_tickets
1043 .iter()
1044 .map(|m| BalanceType::HOPR.balance_bytes(&m.amount))
1045 .fold(Balance::zero(BalanceType::HOPR), |acc, amount| acc.add(amount));
1046
1047 if aggregated_ticket.verified_ticket().amount.lt(&stored_value) {
1049 error!(channel = %channel_id, "Aggregated ticket value in channel is lower than sum of stored tickets");
1050 return Err(DbSqlError::LogicalError("Value of received aggregated ticket is too low".into()).into());
1051 }
1052
1053 let acknowledged_tickets = acknowledged_tickets
1054 .into_iter()
1055 .map(AcknowledgedTicket::try_from)
1056 .collect::<hopr_db_entity::errors::Result<Vec<AcknowledgedTicket>>>()
1057 .map_err(DbSqlError::from)?;
1058
1059 let first_stored_ticket = acknowledged_tickets.first().unwrap();
1061
1062 #[allow(unused_variables)]
1064 let current_ticket_index_from_aggregated_ticket =
1065 U256::from(aggregated_ticket.verified_ticket().index).add(aggregated_ticket.verified_ticket().index_offset);
1066
1067 let acked_aggregated_ticket = aggregated_ticket.into_acknowledged(first_stored_ticket.response.clone());
1068
1069 let ticket = acked_aggregated_ticket.clone();
1070 self.ticket_manager.replace_tickets(ticket).await?;
1071
1072 info!(%acked_aggregated_ticket, "successfully processed received aggregated ticket");
1073 Ok(acked_aggregated_ticket)
1074 }
1075
1076 async fn aggregate_tickets(
1077 &self,
1078 destination: OffchainPublicKey,
1079 mut acked_tickets: Vec<TransferableWinningTicket>,
1080 me: &ChainKeypair,
1081 ) -> Result<VerifiedTicket> {
1082 if me.public().to_address() != self.me_onchain {
1083 return Err(DbSqlError::LogicalError(
1084 "chain key for ticket aggregation does not match the DB public address".into(),
1085 )
1086 .into());
1087 }
1088
1089 let domain_separator = self
1090 .get_indexer_data(None)
1091 .await?
1092 .domain_separator(DomainSeparator::Channel)
1093 .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
1094
1095 if acked_tickets.is_empty() {
1096 return Err(DbSqlError::LogicalError("at least one ticket required for aggregation".to_owned()).into());
1097 }
1098
1099 if acked_tickets.len() == 1 {
1100 let single = acked_tickets
1101 .pop()
1102 .unwrap()
1103 .into_redeemable(&self.me_onchain, &domain_separator)
1104 .map_err(DbSqlError::from)?;
1105
1106 self.compare_and_set_outgoing_ticket_index(
1107 single.verified_ticket().channel_id,
1108 single.verified_ticket().index + 1,
1109 )
1110 .await?;
1111
1112 return Ok(single.ticket);
1113 }
1114
1115 acked_tickets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(cmp::Ordering::Equal));
1116 acked_tickets.dedup();
1117
1118 let myself = self.clone();
1119 let address = myself
1120 .resolve_chain_key(&destination)
1121 .await?
1122 .ok_or(DbSqlError::LogicalError(format!(
1123 "peer '{}' has no chain key record",
1124 destination.to_peerid_str()
1125 )))?;
1126
1127 let (channel_entry, destination, min_win_prob) = self
1128 .nest_transaction_in_db(None, TargetDb::Index)
1129 .await?
1130 .perform(|tx| {
1131 Box::pin(async move {
1132 let entry = myself
1133 .get_channel_by_parties(Some(tx), &myself.me_onchain, &address, false)
1134 .await?
1135 .ok_or_else(|| {
1136 DbSqlError::ChannelNotFound(generate_channel_id(&myself.me_onchain, &address))
1137 })?;
1138
1139 if entry.status == ChannelStatus::Closed {
1140 return Err(DbSqlError::LogicalError(format!("{entry} is closed")));
1141 } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Outgoing) {
1142 return Err(DbSqlError::LogicalError(format!("{entry} is not outgoing")));
1143 }
1144
1145 let min_win_prob = myself
1146 .get_indexer_data(Some(tx))
1147 .await?
1148 .minimum_incoming_ticket_winning_prob;
1149 Ok((entry, address, min_win_prob))
1150 })
1151 })
1152 .await?;
1153
1154 let channel_balance = channel_entry.balance;
1155 let channel_epoch = channel_entry.channel_epoch.as_u32();
1156 let channel_id = channel_entry.get_id();
1157
1158 let mut final_value = Balance::zero(BalanceType::HOPR);
1159
1160 let verified_tickets = acked_tickets
1162 .into_iter()
1163 .map(|t| t.into_redeemable(&self.me_onchain, &domain_separator))
1164 .collect::<hopr_internal_types::errors::Result<Vec<_>>>()
1165 .map_err(|e| {
1166 DbSqlError::LogicalError(format!("trying to aggregate an invalid or a non-winning ticket: {e}"))
1167 })?;
1168
1169 for (i, acked_ticket) in verified_tickets.iter().enumerate() {
1171 if channel_id != acked_ticket.verified_ticket().channel_id {
1172 return Err(DbSqlError::LogicalError(format!(
1173 "ticket for aggregation has an invalid channel id {}",
1174 acked_ticket.verified_ticket().channel_id
1175 ))
1176 .into());
1177 }
1178
1179 if acked_ticket.verified_ticket().channel_epoch != channel_epoch {
1180 return Err(DbSqlError::LogicalError("channel epochs do not match".into()).into());
1181 }
1182
1183 if i + 1 < verified_tickets.len()
1184 && acked_ticket.verified_ticket().index + acked_ticket.verified_ticket().index_offset as u64
1185 > verified_tickets[i + 1].verified_ticket().index
1186 {
1187 return Err(DbSqlError::LogicalError("tickets with overlapping index intervals".into()).into());
1188 }
1189
1190 if !f64_approx_eq(
1191 acked_ticket.verified_ticket().win_prob(),
1192 min_win_prob,
1193 LOWEST_POSSIBLE_WINNING_PROB,
1194 ) && acked_ticket.verified_ticket().win_prob() < min_win_prob
1195 {
1196 return Err(DbSqlError::LogicalError(
1197 "cannot aggregate ticket with lower than minimum winning probability in network".into(),
1198 )
1199 .into());
1200 }
1201
1202 final_value = final_value.add(&acked_ticket.verified_ticket().amount);
1203 if final_value.gt(&channel_balance) {
1204 return Err(DbSqlError::LogicalError(format!("ticket amount to aggregate {final_value} is greater than the balance {channel_balance} of channel {channel_id}")).into());
1205 }
1206 }
1207
1208 info!(
1209 "aggregated {} tickets in channel {channel_id} with total value {final_value}",
1210 verified_tickets.len()
1211 );
1212
1213 let first_acked_ticket = verified_tickets.first().unwrap();
1214 let last_acked_ticket = verified_tickets.last().unwrap();
1215
1216 let current_ticket_index_from_acked_tickets = last_acked_ticket.verified_ticket().index + 1;
1218 self.compare_and_set_outgoing_ticket_index(channel_id, current_ticket_index_from_acked_tickets)
1219 .await?;
1220
1221 Ok(TicketBuilder::default()
1222 .direction(&self.me_onchain, &destination)
1223 .balance(final_value)
1224 .index(first_acked_ticket.verified_ticket().index)
1225 .index_offset(
1226 (last_acked_ticket.verified_ticket().index - first_acked_ticket.verified_ticket().index + 1) as u32,
1227 )
1228 .win_prob(1.0) .channel_epoch(channel_epoch)
1230 .challenge(first_acked_ticket.verified_ticket().challenge)
1231 .build_signed(me, &domain_separator)
1232 .map_err(DbSqlError::from)?)
1233 }
1234
1235 async fn fix_channels_next_ticket_state(&self) -> Result<()> {
1236 let channels = self.get_incoming_channels(None).await?;
1237
1238 for channel in channels.into_iter() {
1239 let selector = TicketSelector::from(&channel)
1240 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
1241 .with_index(channel.ticket_index.as_u64());
1242
1243 let mut tickets_stream = self
1244 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::Untouched)
1245 .await?;
1246
1247 while let Some(ticket) = tickets_stream.next().await {
1248 let channel_id = channel.get_id();
1249 let ticket_index = ticket.verified_ticket().index;
1250 let ticket_amount = ticket.verified_ticket().amount;
1251 info!(%channel_id, %ticket_index, %ticket_amount, "fixed next out-of-sync ticket");
1252 }
1253 }
1254
1255 Ok(())
1256 }
1257}
1258
1259impl HoprDb {
1260 pub async fn upsert_ticket<'a>(&'a self, tx: OptTx<'a>, acknowledged_ticket: AcknowledgedTicket) -> Result<()> {
1262 self.nest_transaction_in_db(tx, TargetDb::Tickets)
1263 .await?
1264 .perform(|tx| {
1265 Box::pin(async move {
1266 let selector = WrappedTicketSelector::from(
1268 TicketSelector::new(
1269 acknowledged_ticket.verified_ticket().channel_id,
1270 acknowledged_ticket.verified_ticket().channel_epoch,
1271 )
1272 .with_index(acknowledged_ticket.verified_ticket().index),
1273 );
1274
1275 debug!("upserting ticket {acknowledged_ticket}");
1276 let mut model = ticket::ActiveModel::from(acknowledged_ticket);
1277
1278 if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx.as_ref()).await? {
1279 model.id = Set(ticket.id);
1280 }
1281
1282 Ok::<_, DbSqlError>(model.save(tx.as_ref()).await?)
1283 })
1284 })
1285 .await?;
1286 Ok(())
1287 }
1288}
1289
1290#[cfg(test)]
1291mod tests {
1292 use anyhow::{anyhow, Context};
1293 use futures::{pin_mut, StreamExt};
1294 use hex_literal::hex;
1295 use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, Set};
1296 use std::ops::Add;
1297 use std::sync::atomic::Ordering;
1298 use std::time::{Duration, SystemTime};
1299
1300 use hopr_crypto_types::prelude::*;
1301 use hopr_db_api::prelude::{DbError, TicketMarker};
1302 use hopr_db_api::{info::DomainSeparator, tickets::ChannelTicketStatistics};
1303 use hopr_db_entity::ticket;
1304 use hopr_internal_types::prelude::*;
1305 use hopr_primitive_types::prelude::*;
1306
1307 use crate::accounts::HoprDbAccountOperations;
1308 use crate::channels::HoprDbChannelOperations;
1309 use crate::db::HoprDb;
1310 use crate::errors::DbSqlError;
1311 use crate::info::HoprDbInfoOperations;
1312 use crate::tickets::{
1313 filter_satisfying_ticket_models, AggregationPrerequisites, HoprDbTicketOperations, TicketSelector,
1314 };
1315 use crate::{HoprDbGeneralModelOperations, TargetDb};
1316
1317 lazy_static::lazy_static! {
1318 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
1319 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
1320 static ref CHANNEL_ID: Hash = generate_channel_id(&BOB.public().to_address(), &ALICE.public().to_address());
1321 }
1322
1323 lazy_static::lazy_static! {
1324 static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1325 static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1326 }
1327
1328 const TICKET_VALUE: u64 = 100_000;
1329
1330 async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
1331 for (peer_offchain, peer_onchain) in peers.into_iter() {
1332 db.insert_account(
1333 None,
1334 AccountEntry {
1335 public_key: *peer_offchain.public(),
1336 chain_addr: peer_onchain.public().to_address(),
1337 entry_type: AccountType::NotAnnounced,
1338 },
1339 )
1340 .await?
1341 }
1342
1343 Ok(())
1344 }
1345
1346 fn generate_random_ack_ticket(
1347 src: &ChainKeypair,
1348 dst: &ChainKeypair,
1349 index: u64,
1350 index_offset: u32,
1351 win_prob: f64,
1352 ) -> anyhow::Result<AcknowledgedTicket> {
1353 let hk1 = HalfKey::random();
1354 let hk2 = HalfKey::random();
1355
1356 let cp1: CurvePoint = hk1.to_challenge().try_into()?;
1357 let cp2: CurvePoint = hk2.to_challenge().try_into()?;
1358 let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
1359
1360 Ok(TicketBuilder::default()
1361 .addresses(src, dst)
1362 .amount(TICKET_VALUE)
1363 .index(index)
1364 .index_offset(index_offset)
1365 .win_prob(win_prob)
1366 .channel_epoch(4)
1367 .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
1368 .build_signed(src, &Hash::default())?
1369 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
1370 }
1371
1372 async fn init_db_with_tickets(
1373 db: &HoprDb,
1374 count_tickets: u64,
1375 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1376 init_db_with_tickets_and_channel(db, count_tickets, None).await
1377 }
1378
1379 async fn init_db_with_tickets_and_channel(
1380 db: &HoprDb,
1381 count_tickets: u64,
1382 channel_ticket_index: Option<u32>,
1383 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1384 let channel = ChannelEntry::new(
1385 BOB.public().to_address(),
1386 ALICE.public().to_address(),
1387 BalanceType::HOPR.balance(u32::MAX),
1388 channel_ticket_index.unwrap_or(0u32).into(),
1389 ChannelStatus::Open,
1390 4_u32.into(),
1391 );
1392
1393 db.upsert_channel(None, channel).await?;
1394
1395 let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
1396 .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
1397 .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
1398
1399 let db_clone = db.clone();
1400 let tickets_clone = tickets.clone();
1401 db.begin_transaction_in_db(TargetDb::Tickets)
1402 .await?
1403 .perform(|tx| {
1404 Box::pin(async move {
1405 for t in tickets_clone {
1406 db_clone.upsert_ticket(Some(tx), t).await?;
1407 }
1408 Ok::<(), DbSqlError>(())
1409 })
1410 })
1411 .await?;
1412
1413 Ok((channel, tickets))
1414 }
1415
1416 #[async_std::test]
1417 async fn test_insert_get_ticket() -> anyhow::Result<()> {
1418 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1419 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
1420 .await?;
1421
1422 let (channel, mut tickets) = init_db_with_tickets(&db, 1).await?;
1423 let ack_ticket = tickets.pop().context("ticket should be present")?;
1424
1425 assert_eq!(
1426 channel.get_id(),
1427 ack_ticket.verified_ticket().channel_id,
1428 "channel ids must match"
1429 );
1430 assert_eq!(
1431 channel.channel_epoch.as_u32(),
1432 ack_ticket.verified_ticket().channel_epoch,
1433 "epochs must match"
1434 );
1435
1436 let db_ticket = db
1437 .get_tickets((&ack_ticket).into())
1438 .await?
1439 .first()
1440 .cloned()
1441 .context("ticket should exist")?;
1442
1443 assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
1444
1445 Ok(())
1446 }
1447
1448 #[async_std::test]
1449 async fn test_mark_redeemed() -> anyhow::Result<()> {
1450 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1451 const COUNT_TICKETS: u64 = 10;
1452
1453 let (_, tickets) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1454
1455 let stats = db.get_ticket_statistics(None).await?;
1456 assert_eq!(
1457 BalanceType::HOPR.balance(TICKET_VALUE * COUNT_TICKETS),
1458 stats.unredeemed_value,
1459 "unredeemed balance must match"
1460 );
1461 assert_eq!(
1462 BalanceType::HOPR.zero(),
1463 stats.redeemed_value,
1464 "there must be 0 redeemed value"
1465 );
1466
1467 assert_eq!(
1468 stats,
1469 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1470 "per channel stats must be same"
1471 );
1472
1473 const TO_REDEEM: u64 = 2;
1474 let db_clone = db.clone();
1475 db.begin_transaction_in_db(TargetDb::Tickets)
1476 .await?
1477 .perform(|_tx| {
1478 Box::pin(async move {
1479 for i in 0..TO_REDEEM as usize {
1480 let r = db_clone
1481 .mark_tickets_as((&tickets[i]).into(), TicketMarker::Redeemed)
1482 .await?;
1483 assert_eq!(1, r, "must redeem only a single ticket");
1484 }
1485 Ok::<(), DbSqlError>(())
1486 })
1487 })
1488 .await?;
1489
1490 let stats = db.get_ticket_statistics(None).await?;
1491 assert_eq!(
1492 BalanceType::HOPR.balance(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
1493 stats.unredeemed_value,
1494 "unredeemed balance must match"
1495 );
1496 assert_eq!(
1497 BalanceType::HOPR.balance(TICKET_VALUE * TO_REDEEM),
1498 stats.redeemed_value,
1499 "there must be a redeemed value"
1500 );
1501
1502 assert_eq!(
1503 stats,
1504 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1505 "per channel stats must be same"
1506 );
1507
1508 Ok(())
1509 }
1510
1511 #[async_std::test]
1512 async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
1513 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1514
1515 let ticket = init_db_with_tickets(&db, 1)
1516 .await?
1517 .1
1518 .pop()
1519 .context("should contain a ticket")?;
1520
1521 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
1522 assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
1523
1524 Ok(())
1525 }
1526
1527 #[async_std::test]
1528 async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
1529 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1530
1531 let count_tickets = 10;
1532 let channel = init_db_with_tickets(&db, count_tickets).await?.0;
1533
1534 let count_marked = db.mark_tickets_as((&channel).into(), TicketMarker::Redeemed).await?;
1535 assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
1536
1537 Ok(())
1538 }
1539
1540 #[async_std::test]
1541 async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
1542 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1543 const COUNT_TICKETS: u64 = 10;
1544
1545 let (channel, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1546
1547 let stats = db.get_ticket_statistics(None).await?;
1548 assert_eq!(
1549 BalanceType::HOPR.balance(TICKET_VALUE * COUNT_TICKETS),
1550 stats.unredeemed_value,
1551 "unredeemed balance must match"
1552 );
1553 assert_eq!(
1554 BalanceType::HOPR.zero(),
1555 stats.neglected_value,
1556 "there must be 0 redeemed value"
1557 );
1558
1559 assert_eq!(
1560 stats,
1561 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1562 "per channel stats must be same"
1563 );
1564
1565 db.mark_tickets_as((&channel).into(), TicketMarker::Neglected).await?;
1566
1567 let stats = db.get_ticket_statistics(None).await?;
1568 assert_eq!(
1569 BalanceType::HOPR.zero(),
1570 stats.unredeemed_value,
1571 "unredeemed balance must be zero"
1572 );
1573 assert_eq!(
1574 BalanceType::HOPR.balance(TICKET_VALUE * COUNT_TICKETS),
1575 stats.neglected_value,
1576 "there must be a neglected value"
1577 );
1578
1579 assert_eq!(
1580 stats,
1581 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1582 "per channel stats must be same"
1583 );
1584
1585 Ok(())
1586 }
1587
1588 #[async_std::test]
1589 async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1590 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1591
1592 let (_, mut ticket) = init_db_with_tickets(&db, 1).await?;
1593 let ticket = ticket.pop().context("ticket should be present")?.ticket;
1594
1595 let stats = db.get_ticket_statistics(None).await?;
1596 assert_eq!(BalanceType::HOPR.zero(), stats.rejected_value);
1597 assert_eq!(
1598 stats,
1599 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1600 "per channel stats must be same"
1601 );
1602
1603 db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
1604
1605 let stats = db.get_ticket_statistics(None).await?;
1606 assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
1607 assert_eq!(
1608 stats,
1609 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1610 "per channel stats must be same"
1611 );
1612
1613 Ok(())
1614 }
1615
1616 #[async_std::test]
1617 async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1618 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1619 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1620 .await?;
1621
1622 let channel = init_db_with_tickets(&db, 10).await?.0;
1623
1624 let selector = TicketSelector::from(&channel).with_index(5);
1625
1626 let v: Vec<AcknowledgedTicket> = db
1627 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1628 .await?
1629 .collect()
1630 .await;
1631
1632 assert_eq!(1, v.len(), "single ticket must be updated");
1633 assert_eq!(
1634 AcknowledgedTicketStatus::BeingRedeemed,
1635 v.first().context("should contain a ticket")?.status,
1636 "status must be set"
1637 );
1638
1639 let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1640
1641 let v: Vec<AcknowledgedTicket> = db
1642 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1643 .await?
1644 .collect()
1645 .await;
1646
1647 assert_eq!(9, v.len(), "only specific tickets must have state set");
1648 assert!(
1649 v.iter().all(|t| t.verified_ticket().index != 5),
1650 "only tickets with different state must update"
1651 );
1652 assert!(
1653 v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
1654 "tickets must have updated state"
1655 );
1656
1657 Ok(())
1658 }
1659
1660 #[async_std::test]
1661 async fn test_update_tickets_states() -> anyhow::Result<()> {
1662 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1663 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1664 .await?;
1665
1666 let channel = init_db_with_tickets(&db, 10).await?.0;
1667 let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1668
1669 db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
1670 .await?;
1671
1672 let v: Vec<AcknowledgedTicket> = db
1673 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1674 .await?
1675 .collect()
1676 .await;
1677
1678 assert!(v.is_empty(), "must not update if already updated");
1679
1680 Ok(())
1681 }
1682
1683 #[async_std::test]
1684 async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1685 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1686
1687 let hash = Hash::default();
1688
1689 let idx = db.get_outgoing_ticket_index(hash).await?;
1690 assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
1691
1692 let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
1693 .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
1694 .one(&db.tickets_db)
1695 .await?
1696 .context("index must exist")?;
1697
1698 assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
1699
1700 Ok(())
1701 }
1702
1703 #[async_std::test]
1704 async fn test_ticket_stats_must_fail_for_non_existing_channel() -> anyhow::Result<()> {
1705 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1706
1707 db.get_ticket_statistics(Some(*CHANNEL_ID))
1708 .await
1709 .expect_err("must fail for non-existing channel");
1710
1711 Ok(())
1712 }
1713
1714 #[async_std::test]
1715 async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1716 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1717
1718 let channel = ChannelEntry::new(
1719 BOB.public().to_address(),
1720 ALICE.public().to_address(),
1721 BalanceType::HOPR.balance(u32::MAX),
1722 0.into(),
1723 ChannelStatus::Open,
1724 4_u32.into(),
1725 );
1726
1727 db.upsert_channel(None, channel).await?;
1728
1729 let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1730
1731 assert_eq!(
1732 ChannelTicketStatistics::default(),
1733 stats,
1734 "must be equal to default which is all zeros"
1735 );
1736
1737 assert_eq!(
1738 stats,
1739 db.get_ticket_statistics(None).await?,
1740 "per-channel stats must be the same as global stats"
1741 );
1742
1743 Ok(())
1744 }
1745
1746 #[async_std::test]
1747 async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1748 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1749
1750 let channel_1 = ChannelEntry::new(
1751 BOB.public().to_address(),
1752 ALICE.public().to_address(),
1753 BalanceType::HOPR.balance(u32::MAX),
1754 0.into(),
1755 ChannelStatus::Open,
1756 4_u32.into(),
1757 );
1758
1759 db.upsert_channel(None, channel_1).await?;
1760
1761 let channel_2 = ChannelEntry::new(
1762 ALICE.public().to_address(),
1763 BOB.public().to_address(),
1764 BalanceType::HOPR.balance(u32::MAX),
1765 0.into(),
1766 ChannelStatus::Open,
1767 4_u32.into(),
1768 );
1769
1770 db.upsert_channel(None, channel_2).await?;
1771
1772 let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1773 let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1774
1775 let value = t1.verified_ticket().amount;
1776
1777 db.upsert_ticket(None, t1).await?;
1778 db.upsert_ticket(None, t2).await?;
1779
1780 let stats_1 = db
1781 .get_ticket_statistics(Some(generate_channel_id(
1782 &BOB.public().to_address(),
1783 &ALICE.public().to_address(),
1784 )))
1785 .await?;
1786
1787 let stats_2 = db
1788 .get_ticket_statistics(Some(generate_channel_id(
1789 &ALICE.public().to_address(),
1790 &BOB.public().to_address(),
1791 )))
1792 .await?;
1793
1794 assert_eq!(value, stats_1.unredeemed_value);
1795 assert_eq!(value, stats_2.unredeemed_value);
1796
1797 assert_eq!(BalanceType::HOPR.zero(), stats_1.neglected_value);
1798 assert_eq!(BalanceType::HOPR.zero(), stats_2.neglected_value);
1799
1800 assert_eq!(stats_1, stats_2);
1801
1802 db.mark_tickets_as(channel_1.into(), TicketMarker::Neglected).await?;
1803
1804 let stats_1 = db
1805 .get_ticket_statistics(Some(generate_channel_id(
1806 &BOB.public().to_address(),
1807 &ALICE.public().to_address(),
1808 )))
1809 .await?;
1810
1811 let stats_2 = db
1812 .get_ticket_statistics(Some(generate_channel_id(
1813 &ALICE.public().to_address(),
1814 &BOB.public().to_address(),
1815 )))
1816 .await?;
1817
1818 assert_eq!(BalanceType::HOPR.zero(), stats_1.unredeemed_value);
1819 assert_eq!(value, stats_1.neglected_value);
1820
1821 assert_eq!(BalanceType::HOPR.zero(), stats_2.neglected_value);
1822
1823 Ok(())
1824 }
1825
1826 #[async_std::test]
1827 async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1828 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1829
1830 let hash = Hash::default();
1831
1832 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1833 assert_eq!(0, old_idx, "old value must be 0");
1834
1835 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1836 assert_eq!(1, new_idx, "new value must be 1");
1837
1838 let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1839 assert_eq!(1, old_idx, "old value must be 1");
1840
1841 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1842 assert_eq!(2, new_idx, "new value must be 2");
1843
1844 Ok(())
1845 }
1846
1847 #[async_std::test]
1848 async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1849 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1850
1851 let hash = Hash::default();
1852
1853 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1854 assert_eq!(0, new_idx, "value must be 0");
1855
1856 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1857
1858 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1859 assert_eq!(1, new_idx, "new value must be 1");
1860
1861 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1862 assert_eq!(1, old_idx, "old value must be 1");
1863 assert_eq!(1, new_idx, "new value must be 1");
1864
1865 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1866 assert_eq!(1, old_idx, "old value must be 1");
1867 assert_eq!(1, new_idx, "new value must be 1");
1868
1869 Ok(())
1870 }
1871
1872 #[async_std::test]
1873 async fn test_ticket_index_reset() -> anyhow::Result<()> {
1874 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1875
1876 let hash = Hash::default();
1877
1878 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1879 assert_eq!(0, new_idx, "value must be 0");
1880
1881 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1882
1883 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1884 assert_eq!(1, new_idx, "new value must be 1");
1885
1886 let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1887 assert_eq!(1, old_idx, "old value must be 1");
1888
1889 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1890 assert_eq!(0, new_idx, "new value must be 0");
1891 Ok(())
1892 }
1893
1894 #[async_std::test]
1895 async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1896 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1897
1898 let hash_1 = Hash::default();
1899 let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1900
1901 db.get_outgoing_ticket_index(hash_1).await?;
1902 db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1903
1904 let persisted = db.persist_outgoing_ticket_indices().await?;
1905 assert_eq!(1, persisted);
1906
1907 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1908 .all(&db.tickets_db)
1909 .await?;
1910 let idx_1 = indices
1911 .iter()
1912 .find(|idx| idx.channel_id == hash_1.to_hex())
1913 .context("must contain index 1")?;
1914 let idx_2 = indices
1915 .iter()
1916 .find(|idx| idx.channel_id == hash_2.to_hex())
1917 .context("must contain index 2")?;
1918 assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1919 assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1920
1921 db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1922 db.increment_outgoing_ticket_index(hash_2).await?;
1923
1924 let persisted = db.persist_outgoing_ticket_indices().await?;
1925 assert_eq!(2, persisted);
1926
1927 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1928 .all(&db.tickets_db)
1929 .await?;
1930 let idx_1 = indices
1931 .iter()
1932 .find(|idx| idx.channel_id == hash_1.to_hex())
1933 .context("must contain index 1")?;
1934 let idx_2 = indices
1935 .iter()
1936 .find(|idx| idx.channel_id == hash_2.to_hex())
1937 .context("must contain index 2")?;
1938 assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1939 assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1940 Ok(())
1941 }
1942
1943 #[async_std::test]
1944 async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1945 let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1946
1947 assert_eq!(cache.weighted_size(), 0);
1948
1949 cache.insert(1, 1).await;
1950 cache.insert(2, 2).await;
1951
1952 let clone = cache.clone();
1953
1954 cache.remove(&1).await;
1955 cache.remove(&2).await;
1956
1957 assert_eq!(cache.get(&1).await, None);
1958 assert_eq!(cache.get(&1).await, clone.get(&1).await);
1959 Ok(())
1960 }
1961
1962 fn dummy_ticket_model(channel_id: Hash, idx: u64, idx_offset: u32, amount: u32) -> ticket::Model {
1963 ticket::Model {
1964 id: 0,
1965 channel_id: channel_id.to_string(),
1966 amount: U256::from(amount).to_be_bytes().to_vec(),
1967 index: idx.to_be_bytes().to_vec(),
1968 index_offset: idx_offset as i32,
1969 winning_probability: hex!("0020C49BA5E34F").to_vec(), channel_epoch: vec![],
1971 signature: vec![],
1972 response: vec![],
1973 state: 0,
1974 hash: vec![],
1975 }
1976 }
1977
1978 #[async_std::test]
1979 async fn test_aggregation_prerequisites_default_filter_no_tickets() -> anyhow::Result<()> {
1980 let prerequisites = AggregationPrerequisites::default();
1981 assert_eq!(None, prerequisites.min_unaggregated_ratio);
1982 assert_eq!(None, prerequisites.min_ticket_count);
1983
1984 let channel = ChannelEntry::new(
1985 BOB.public().to_address(),
1986 ALICE.public().to_address(),
1987 BalanceType::HOPR.balance(u32::MAX),
1988 2.into(),
1989 ChannelStatus::Open,
1990 4_u32.into(),
1991 );
1992
1993 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
1994
1995 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
1996
1997 assert_eq!(
1998 dummy_tickets, filtered_tickets,
1999 "empty prerequisites must not filter anything"
2000 );
2001 Ok(())
2002 }
2003
2004 #[async_std::test]
2005 async fn test_aggregation_prerequisites_should_filter_out_tickets_with_lower_than_min_win_prob(
2006 ) -> anyhow::Result<()> {
2007 let prerequisites = AggregationPrerequisites::default();
2008 assert_eq!(None, prerequisites.min_unaggregated_ratio);
2009 assert_eq!(None, prerequisites.min_ticket_count);
2010
2011 let channel = ChannelEntry::new(
2012 BOB.public().to_address(),
2013 ALICE.public().to_address(),
2014 BalanceType::HOPR.balance(u32::MAX),
2015 2.into(),
2016 ChannelStatus::Open,
2017 4_u32.into(),
2018 );
2019
2020 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2021
2022 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0006)?;
2023
2024 assert!(
2025 filtered_tickets.is_empty(),
2026 "must filter out tickets with lower win prob"
2027 );
2028 Ok(())
2029 }
2030
2031 #[async_std::test]
2032 async fn test_aggregation_prerequisites_must_trim_tickets_exceeding_channel_balance() -> anyhow::Result<()> {
2033 const TICKET_COUNT: usize = 110;
2034
2035 let prerequisites = AggregationPrerequisites::default();
2036 assert_eq!(None, prerequisites.min_unaggregated_ratio);
2037 assert_eq!(None, prerequisites.min_ticket_count);
2038
2039 let channel = ChannelEntry::new(
2040 BOB.public().to_address(),
2041 ALICE.public().to_address(),
2042 BalanceType::HOPR.balance(100),
2043 (TICKET_COUNT + 1).into(),
2044 ChannelStatus::Open,
2045 4_u32.into(),
2046 );
2047
2048 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2049 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2050 .collect();
2051
2052 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2053
2054 assert_eq!(
2055 100,
2056 filtered_tickets.len(),
2057 "must take only tickets up to channel balance"
2058 );
2059 assert!(
2060 filtered_tickets
2061 .into_iter()
2062 .map(|t| U256::from_be_bytes(t.amount).as_u64())
2063 .sum::<u64>()
2064 <= channel.balance.amount().as_u64(),
2065 "filtered tickets must not exceed channel balance"
2066 );
2067 Ok(())
2068 }
2069
2070 #[async_std::test]
2071 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_ticket_count_not_met() -> anyhow::Result<()>
2072 {
2073 const TICKET_COUNT: usize = 10;
2074
2075 let prerequisites = AggregationPrerequisites {
2076 min_ticket_count: Some(TICKET_COUNT + 1),
2077 min_unaggregated_ratio: None,
2078 };
2079
2080 let channel = ChannelEntry::new(
2081 BOB.public().to_address(),
2082 ALICE.public().to_address(),
2083 BalanceType::HOPR.balance(100),
2084 (TICKET_COUNT + 1).into(),
2085 ChannelStatus::Open,
2086 4_u32.into(),
2087 );
2088
2089 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2090 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2091 .collect();
2092
2093 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2094
2095 assert!(
2096 filtered_tickets.is_empty(),
2097 "must return empty when min_ticket_count is not met"
2098 );
2099
2100 Ok(())
2101 }
2102
2103 #[async_std::test]
2104 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_unaggregated_ratio_is_not_met(
2105 ) -> anyhow::Result<()> {
2106 const TICKET_COUNT: usize = 10;
2107
2108 let prerequisites = AggregationPrerequisites {
2109 min_ticket_count: None,
2110 min_unaggregated_ratio: Some(0.9),
2111 };
2112
2113 let channel = ChannelEntry::new(
2114 BOB.public().to_address(),
2115 ALICE.public().to_address(),
2116 BalanceType::HOPR.balance(100),
2117 (TICKET_COUNT + 1).into(),
2118 ChannelStatus::Open,
2119 4_u32.into(),
2120 );
2121
2122 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2123 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2124 .collect();
2125
2126 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2127
2128 assert!(
2129 filtered_tickets.is_empty(),
2130 "must return empty when min_unaggregated_ratio is not met"
2131 );
2132
2133 Ok(())
2134 }
2135
2136 #[async_std::test]
2137 async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met() -> anyhow::Result<()> {
2138 const TICKET_COUNT: usize = 10;
2139
2140 let prerequisites = AggregationPrerequisites {
2141 min_ticket_count: Some(TICKET_COUNT),
2142 min_unaggregated_ratio: None,
2143 };
2144
2145 let channel = ChannelEntry::new(
2146 BOB.public().to_address(),
2147 ALICE.public().to_address(),
2148 BalanceType::HOPR.balance(100),
2149 (TICKET_COUNT + 1).into(),
2150 ChannelStatus::Open,
2151 4_u32.into(),
2152 );
2153
2154 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2155 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2156 .collect();
2157
2158 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2159
2160 assert!(!filtered_tickets.is_empty(), "must not return empty");
2161 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2162 Ok(())
2163 }
2164
2165 #[async_std::test]
2166 async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met_regardless_ratio(
2167 ) -> anyhow::Result<()> {
2168 const TICKET_COUNT: usize = 10;
2169
2170 let prerequisites = AggregationPrerequisites {
2171 min_ticket_count: Some(TICKET_COUNT),
2172 min_unaggregated_ratio: Some(0.9),
2173 };
2174
2175 let channel = ChannelEntry::new(
2176 BOB.public().to_address(),
2177 ALICE.public().to_address(),
2178 BalanceType::HOPR.balance(100),
2179 (TICKET_COUNT + 1).into(),
2180 ChannelStatus::Open,
2181 4_u32.into(),
2182 );
2183
2184 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2185 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2186 .collect();
2187
2188 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2189
2190 assert!(!filtered_tickets.is_empty(), "must not return empty");
2191 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2192 Ok(())
2193 }
2194
2195 #[async_std::test]
2196 async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met(
2197 ) -> anyhow::Result<()> {
2198 const TICKET_COUNT: usize = 90;
2199
2200 let prerequisites = AggregationPrerequisites {
2201 min_ticket_count: None,
2202 min_unaggregated_ratio: Some(0.9),
2203 };
2204
2205 let channel = ChannelEntry::new(
2206 BOB.public().to_address(),
2207 ALICE.public().to_address(),
2208 BalanceType::HOPR.balance(100),
2209 (TICKET_COUNT + 1).into(),
2210 ChannelStatus::Open,
2211 4_u32.into(),
2212 );
2213
2214 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2215 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2216 .collect();
2217
2218 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2219
2220 assert!(!filtered_tickets.is_empty(), "must not return empty");
2221 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2222 Ok(())
2223 }
2224
2225 #[async_std::test]
2226 async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met_regardless_count(
2227 ) -> anyhow::Result<()> {
2228 const TICKET_COUNT: usize = 90;
2229
2230 let prerequisites = AggregationPrerequisites {
2231 min_ticket_count: Some(TICKET_COUNT + 1),
2232 min_unaggregated_ratio: Some(0.9),
2233 };
2234
2235 let channel = ChannelEntry::new(
2236 BOB.public().to_address(),
2237 ALICE.public().to_address(),
2238 BalanceType::HOPR.balance(100),
2239 (TICKET_COUNT + 1).into(),
2240 ChannelStatus::Open,
2241 4_u32.into(),
2242 );
2243
2244 let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2245 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2246 .collect();
2247
2248 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2249
2250 assert!(!filtered_tickets.is_empty(), "must not return empty");
2251 assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2252 Ok(())
2253 }
2254
2255 #[async_std::test]
2256 async fn test_aggregation_prerequisites_must_return_tickets_when_minimum_incl_aggregated_ratio_is_met(
2257 ) -> anyhow::Result<()> {
2258 const TICKET_COUNT: usize = 90;
2259
2260 let prerequisites = AggregationPrerequisites {
2261 min_ticket_count: None,
2262 min_unaggregated_ratio: Some(0.9),
2263 };
2264
2265 let channel = ChannelEntry::new(
2266 BOB.public().to_address(),
2267 ALICE.public().to_address(),
2268 BalanceType::HOPR.balance(100),
2269 (TICKET_COUNT + 1).into(),
2270 ChannelStatus::Open,
2271 4_u32.into(),
2272 );
2273
2274 let mut dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2275 .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2276 .collect();
2277 dummy_tickets[0].index_offset = 2; let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2280
2281 assert_eq!(filtered_tickets.len(), TICKET_COUNT);
2282 Ok(())
2283 }
2284
2285 #[async_std::test]
2286 async fn test_aggregation_prerequisites_must_return_empty_when_minimum_only_unaggregated_ratio_is_met_in_single_ticket_only(
2287 ) -> anyhow::Result<()> {
2288 let prerequisites = AggregationPrerequisites {
2289 min_ticket_count: None,
2290 min_unaggregated_ratio: Some(0.9),
2291 };
2292
2293 let channel = ChannelEntry::new(
2294 BOB.public().to_address(),
2295 ALICE.public().to_address(),
2296 BalanceType::HOPR.balance(100),
2297 2.into(),
2298 ChannelStatus::Open,
2299 4_u32.into(),
2300 );
2301
2302 let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 2, 110)];
2304
2305 let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2306
2307 assert!(filtered_tickets.is_empty(), "must return empty");
2308 Ok(())
2309 }
2310
2311 async fn create_alice_db_with_tickets_from_bob(
2312 ticket_count: usize,
2313 ) -> anyhow::Result<(HoprDb, ChannelEntry, Vec<AcknowledgedTicket>)> {
2314 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
2315
2316 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2317 .await?;
2318
2319 add_peer_mappings(
2320 &db,
2321 vec![
2322 (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2323 (BOB_OFFCHAIN.clone(), BOB.clone()),
2324 ],
2325 )
2326 .await?;
2327
2328 let (channel, tickets) = init_db_with_tickets(&db, ticket_count as u64).await?;
2329
2330 Ok((db, channel, tickets))
2331 }
2332
2333 #[async_std::test]
2334 async fn test_ticket_aggregation_should_fail_if_any_ticket_is_being_aggregated_in_that_channel(
2335 ) -> anyhow::Result<()> {
2336 const COUNT_TICKETS: usize = 5;
2337
2338 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2339
2340 assert_eq!(tickets.len(), COUNT_TICKETS);
2341
2342 let existing_channel_with_multiple_tickets = channel.get_id();
2343
2344 let mut ticket = hopr_db_entity::ticket::Entity::find()
2346 .one(&db.tickets_db)
2347 .await?
2348 .context("should have an active model")?
2349 .into_active_model();
2350 ticket.state = Set(AcknowledgedTicketStatus::BeingAggregated as i8);
2351 ticket.save(&db.tickets_db).await?;
2352
2353 assert!(db
2354 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2355 .await
2356 .is_err());
2357
2358 Ok(())
2359 }
2360
2361 #[async_std::test]
2362 async fn test_ticket_aggregation_should_not_offer_tickets_with_lower_than_min_win_prob() -> anyhow::Result<()> {
2363 const COUNT_TICKETS: usize = 5;
2364
2365 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2366
2367 assert_eq!(tickets.len(), COUNT_TICKETS);
2368
2369 let existing_channel_with_multiple_tickets = channel.get_id();
2370
2371 let mut ticket = hopr_db_entity::ticket::Entity::find()
2373 .one(&db.tickets_db)
2374 .await?
2375 .context("should have an active model")?
2376 .into_active_model();
2377 ticket.winning_probability = Set(f64_to_win_prob(0.5)?.to_vec());
2378 ticket.save(&db.tickets_db).await?;
2379
2380 let prepared_tickets = db
2381 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2382 .await?
2383 .ok_or(anyhow!("should contain tickets"))?
2384 .1;
2385
2386 assert_eq!(COUNT_TICKETS - 1, prepared_tickets.len());
2387
2388 Ok(())
2389 }
2390
2391 #[async_std::test]
2392 async fn test_ticket_aggregation_prepare_request_with_0_tickets_should_return_empty_result() -> anyhow::Result<()> {
2393 const COUNT_TICKETS: usize = 0;
2394
2395 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2396
2397 assert_eq!(tickets.len(), COUNT_TICKETS);
2398
2399 let existing_channel_with_0_tickets = channel.get_id();
2400
2401 let actual = db
2402 .prepare_aggregation_in_channel(&existing_channel_with_0_tickets, Default::default())
2403 .await?;
2404
2405 assert_eq!(actual, None);
2406
2407 Ok(())
2408 }
2409
2410 #[async_std::test]
2411 async fn test_ticket_aggregation_prepare_request_with_multiple_tickets_should_return_that_ticket(
2412 ) -> anyhow::Result<()> {
2413 const COUNT_TICKETS: usize = 2;
2414
2415 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2416 let tickets: Vec<TransferableWinningTicket> = tickets
2417 .into_iter()
2418 .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2419 .collect::<hopr_internal_types::errors::Result<Vec<TransferableWinningTicket>>>()?;
2420
2421 assert_eq!(tickets.len(), COUNT_TICKETS);
2422
2423 let existing_channel_with_multiple_tickets = channel.get_id();
2424
2425 let actual = db
2426 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2427 .await?;
2428
2429 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2430
2431 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2432 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2433 .count(&db.tickets_db)
2434 .await? as usize;
2435
2436 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2437
2438 Ok(())
2439 }
2440
2441 #[async_std::test]
2442 async fn test_ticket_aggregation_prepare_request_with_duplicate_tickets_should_return_dedup_aggregated_ticket(
2443 ) -> anyhow::Result<()> {
2444 let (db, channel, _) = create_alice_db_with_tickets_from_bob(0).await?;
2445 let tickets = vec![
2446 generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0),
2447 generate_random_ack_ticket(&BOB, &ALICE, 0, 2, 1.0),
2448 generate_random_ack_ticket(&BOB, &ALICE, 2, 1, 1.0),
2449 generate_random_ack_ticket(&BOB, &ALICE, 3, 1, 1.0),
2450 ]
2451 .into_iter()
2452 .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
2453
2454 let tickets_clone = tickets.clone();
2455 let db_clone = db.clone();
2456 db.nest_transaction_in_db(None, TargetDb::Tickets)
2457 .await?
2458 .perform(|tx| {
2459 Box::pin(async move {
2460 for ticket in tickets_clone {
2461 db_clone.upsert_ticket(tx.into(), ticket).await?;
2462 }
2463 Ok::<_, DbError>(())
2464 })
2465 })
2466 .await?;
2467
2468 let existing_channel_with_multiple_tickets = channel.get_id();
2469 let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2470 assert_eq!(stats.neglected_value, BalanceType::HOPR.zero());
2471
2472 let actual = db
2473 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2474 .await?;
2475
2476 let mut tickets = tickets
2477 .into_iter()
2478 .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2479 .collect::<hopr_internal_types::errors::Result<Vec<_>>>()?;
2480
2481 tickets.remove(0);
2483 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2484
2485 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2486 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2487 .count(&db.tickets_db)
2488 .await? as usize;
2489
2490 assert_eq!(actual_being_aggregated_count, 3);
2491
2492 Ok(())
2493 }
2494
2495 #[async_std::test]
2496 async fn test_ticket_aggregation_prepare_request_with_a_being_redeemed_ticket_should_aggregate_only_the_tickets_following_it(
2497 ) -> anyhow::Result<()> {
2498 const COUNT_TICKETS: usize = 5;
2499
2500 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2501 let mut tickets = tickets
2502 .into_iter()
2503 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2504 .collect::<Vec<_>>();
2505
2506 assert_eq!(tickets.len(), COUNT_TICKETS);
2507
2508 let existing_channel_with_multiple_tickets = channel.get_id();
2509
2510 let mut ticket = hopr_db_entity::ticket::Entity::find()
2512 .one(&db.tickets_db)
2513 .await?
2514 .context("should have 1 active model")?
2515 .into_active_model();
2516 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2517 ticket.save(&db.tickets_db).await?;
2518
2519 let actual = db
2520 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2521 .await?;
2522
2523 tickets.remove(0);
2524 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2525
2526 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2527 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2528 .count(&db.tickets_db)
2529 .await? as usize;
2530
2531 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2532
2533 Ok(())
2534 }
2535
2536 #[async_std::test]
2537 async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_return_when_ticket_threshold_is_met(
2538 ) -> anyhow::Result<()> {
2539 const COUNT_TICKETS: usize = 5;
2540
2541 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2542 let tickets = tickets
2543 .into_iter()
2544 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2545 .collect::<Vec<_>>();
2546
2547 assert_eq!(tickets.len(), COUNT_TICKETS);
2548
2549 let existing_channel_with_multiple_tickets = channel.get_id();
2550
2551 let constraints = AggregationPrerequisites {
2552 min_ticket_count: Some(COUNT_TICKETS - 1),
2553 min_unaggregated_ratio: None,
2554 };
2555 let actual = db
2556 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2557 .await?;
2558
2559 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2560
2561 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2562 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2563 .count(&db.tickets_db)
2564 .await? as usize;
2565
2566 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2567
2568 Ok(())
2569 }
2570
2571 #[async_std::test]
2572 async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_not_return_when_ticket_threshold_is_not_met(
2573 ) -> anyhow::Result<()> {
2574 const COUNT_TICKETS: usize = 2;
2575
2576 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2577
2578 assert_eq!(tickets.len(), COUNT_TICKETS);
2579
2580 let existing_channel_with_multiple_tickets = channel.get_id();
2581
2582 let constraints = AggregationPrerequisites {
2583 min_ticket_count: Some(COUNT_TICKETS + 1),
2584 min_unaggregated_ratio: None,
2585 };
2586 let actual = db
2587 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2588 .await?;
2589
2590 assert_eq!(actual, None);
2591
2592 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2593 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2594 .count(&db.tickets_db)
2595 .await? as usize;
2596
2597 assert_eq!(actual_being_aggregated_count, 0);
2598
2599 Ok(())
2600 }
2601
2602 #[async_std::test]
2603 async fn test_ticket_aggregation_prepare_request_with_no_aggregatable_tickets_should_return_nothing(
2604 ) -> anyhow::Result<()> {
2605 const COUNT_TICKETS: usize = 3;
2606
2607 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2608
2609 assert_eq!(tickets.len(), { COUNT_TICKETS });
2610
2611 let existing_channel_with_multiple_tickets = channel.get_id();
2612
2613 for ticket in hopr_db_entity::ticket::Entity::find()
2615 .all(&db.tickets_db)
2616 .await?
2617 .into_iter()
2618 {
2619 let mut ticket = ticket.into_active_model();
2620 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2621 ticket.save(&db.tickets_db).await?;
2622 }
2623
2624 let actual = db
2625 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2626 .await?;
2627
2628 assert_eq!(actual, None);
2629
2630 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2631 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2632 .count(&db.tickets_db)
2633 .await? as usize;
2634
2635 assert_eq!(actual_being_aggregated_count, 0);
2636
2637 Ok(())
2638 }
2639
2640 #[async_std::test]
2641 async fn test_ticket_aggregation_rollback_should_rollback_all_the_being_aggregated_tickets_but_nothing_else(
2642 ) -> anyhow::Result<()> {
2643 const COUNT_TICKETS: usize = 5;
2644
2645 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2646
2647 assert_eq!(tickets.len(), COUNT_TICKETS);
2648
2649 let existing_channel_with_multiple_tickets = channel.get_id();
2650
2651 let mut ticket = hopr_db_entity::ticket::Entity::find()
2653 .one(&db.tickets_db)
2654 .await?
2655 .context("should have one active model")?
2656 .into_active_model();
2657 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2658 ticket.save(&db.tickets_db).await?;
2659
2660 assert!(db
2661 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2662 .await
2663 .is_ok());
2664
2665 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2666 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2667 .count(&db.tickets_db)
2668 .await? as usize;
2669
2670 assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2671
2672 assert!(db
2673 .rollback_aggregation_in_channel(existing_channel_with_multiple_tickets)
2674 .await
2675 .is_ok());
2676
2677 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2678 .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2679 .count(&db.tickets_db)
2680 .await? as usize;
2681
2682 assert_eq!(actual_being_aggregated_count, 0);
2683
2684 Ok(())
2685 }
2686
2687 #[async_std::test]
2688 async fn test_ticket_aggregation_should_replace_the_tickets_with_a_correctly_aggregated_ticket(
2689 ) -> anyhow::Result<()> {
2690 const COUNT_TICKETS: usize = 5;
2691
2692 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2693
2694 let (notifier_tx, notifier_rx) = futures::channel::mpsc::unbounded();
2695 db.start_ticket_processing(Some(notifier_tx))?;
2696
2697 let tickets = tickets
2698 .into_iter()
2699 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2700 .collect::<Vec<_>>();
2701
2702 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2703 let aggregated_ticket = TicketBuilder::default()
2704 .addresses(&*BOB, &*ALICE)
2705 .amount(
2706 tickets
2707 .iter()
2708 .fold(U256::zero(), |acc, v| acc + v.ticket.amount.amount()),
2709 )
2710 .index(first_ticket.index)
2711 .index_offset(
2712 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2713 )
2714 .win_prob(1.0)
2715 .channel_epoch(first_ticket.channel_epoch)
2716 .challenge(first_ticket.challenge)
2717 .build_signed(&BOB, &Hash::default())?;
2718
2719 assert_eq!(tickets.len(), COUNT_TICKETS);
2720
2721 let existing_channel_with_multiple_tickets = channel.get_id();
2722
2723 let actual = db
2724 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2725 .await?;
2726
2727 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2728
2729 let agg_ticket = aggregated_ticket.clone();
2730
2731 let _ = db
2732 .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2733 .await?;
2734
2735 pin_mut!(notifier_rx);
2736 let notified_ticket = notifier_rx.next().await.ok_or(anyhow!("must have ticket"))?;
2737
2738 assert_eq!(notified_ticket.verified_ticket(), agg_ticket.verified_ticket());
2739
2740 let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2741 .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2742 .count(&db.tickets_db)
2743 .await? as usize;
2744
2745 assert_eq!(actual_being_aggregated_count, 0);
2746
2747 Ok(())
2748 }
2749
2750 #[async_std::test]
2751 async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_value_is_lower_than_the_stored_one(
2752 ) -> anyhow::Result<()> {
2753 const COUNT_TICKETS: usize = 5;
2754
2755 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2756 let tickets = tickets
2757 .into_iter()
2758 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2759 .collect::<Vec<_>>();
2760
2761 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2762 let aggregated_ticket = TicketBuilder::default()
2763 .addresses(&*BOB, &*ALICE)
2764 .amount(0)
2765 .index(first_ticket.index)
2766 .index_offset(
2767 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2768 )
2769 .win_prob(1.0)
2770 .channel_epoch(first_ticket.channel_epoch)
2771 .challenge(first_ticket.challenge)
2772 .build_signed(&BOB, &Hash::default())?;
2773
2774 assert_eq!(tickets.len(), COUNT_TICKETS);
2775
2776 let existing_channel_with_multiple_tickets = channel.get_id();
2777
2778 let actual = db
2779 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2780 .await?;
2781
2782 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2783
2784 assert!(db
2785 .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2786 .await
2787 .is_err());
2788
2789 Ok(())
2790 }
2791
2792 #[async_std::test]
2793 async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_win_probability_is_not_equal_to_1(
2794 ) -> anyhow::Result<()> {
2795 const COUNT_TICKETS: usize = 5;
2796
2797 let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2798 let tickets = tickets
2799 .into_iter()
2800 .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2801 .collect::<Vec<_>>();
2802
2803 let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2804 let aggregated_ticket = TicketBuilder::default()
2805 .addresses(&*BOB, &*ALICE)
2806 .amount(0)
2807 .index(first_ticket.index)
2808 .index_offset(
2809 tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2810 )
2811 .win_prob(0.5) .channel_epoch(first_ticket.channel_epoch)
2813 .challenge(first_ticket.challenge)
2814 .build_signed(&BOB, &Hash::default())?;
2815
2816 assert_eq!(tickets.len(), COUNT_TICKETS);
2817
2818 let existing_channel_with_multiple_tickets = channel.get_id();
2819
2820 let actual = db
2821 .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2822 .await?;
2823
2824 assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2825
2826 assert!(db
2827 .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2828 .await
2829 .is_err());
2830
2831 Ok(())
2832 }
2833
2834 async fn init_db_with_channel(channel: ChannelEntry) -> anyhow::Result<HoprDb> {
2835 let db = HoprDb::new_in_memory(BOB.clone()).await?;
2836
2837 db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2838 .await?;
2839
2840 add_peer_mappings(
2841 &db,
2842 vec![
2843 (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2844 (BOB_OFFCHAIN.clone(), BOB.clone()),
2845 ],
2846 )
2847 .await?;
2848
2849 db.upsert_channel(None, channel).await?;
2850
2851 Ok(db)
2852 }
2853
2854 #[async_std::test]
2855 async fn test_aggregate_ticket_should_aggregate() -> anyhow::Result<()> {
2856 const COUNT_TICKETS: usize = 5;
2857
2858 let channel = ChannelEntry::new(
2859 BOB.public().to_address(),
2860 ALICE.public().to_address(),
2861 BalanceType::HOPR.balance(u32::MAX),
2862 (COUNT_TICKETS + 1).into(),
2863 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2864 4_u32.into(),
2865 );
2866
2867 let db = init_db_with_channel(channel).await?;
2868
2869 let tickets = (0..COUNT_TICKETS)
2870 .map(|i| {
2871 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
2872 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2873 })
2874 .collect::<anyhow::Result<Vec<_>>>()?;
2875
2876 let sum_value = tickets
2877 .iter()
2878 .fold(BalanceType::HOPR.zero(), |acc, x| acc + x.ticket.amount);
2879 let min_idx = tickets
2880 .iter()
2881 .map(|t| t.ticket.index)
2882 .min()
2883 .context("min index should be present")?;
2884 let max_idx = tickets
2885 .iter()
2886 .map(|t| t.ticket.index)
2887 .max()
2888 .context("max index should be present")?;
2889
2890 let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2891
2892 assert_eq!(
2893 &BOB.public().to_address(),
2894 aggregated.verified_issuer(),
2895 "must have correct signer"
2896 );
2897
2898 assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2899
2900 assert_eq!(
2901 COUNT_TICKETS,
2902 aggregated.verified_ticket().index_offset as usize,
2903 "aggregated ticket must have correct offset"
2904 );
2905 assert_eq!(
2906 sum_value,
2907 aggregated.verified_ticket().amount,
2908 "aggregated ticket token amount must be sum of individual tickets"
2909 );
2910 assert_eq!(
2911 1.0,
2912 aggregated.win_prob(),
2913 "aggregated ticket must have winning probability 1"
2914 );
2915 assert_eq!(
2916 min_idx,
2917 aggregated.verified_ticket().index,
2918 "aggregated ticket must have correct index"
2919 );
2920 assert_eq!(
2921 channel.get_id(),
2922 aggregated.verified_ticket().channel_id,
2923 "aggregated ticket must have correct channel id"
2924 );
2925 assert_eq!(
2926 channel.channel_epoch.as_u32(),
2927 aggregated.verified_ticket().channel_epoch,
2928 "aggregated ticket must have correct channel epoch"
2929 );
2930
2931 assert_eq!(
2932 max_idx + 1,
2933 db.get_outgoing_ticket_index(channel.get_id())
2934 .await?
2935 .load(Ordering::SeqCst)
2936 );
2937
2938 Ok(())
2939 }
2940
2941 #[async_std::test]
2942 async fn test_aggregate_ticket_should_aggregate_including_aggregated() -> anyhow::Result<()> {
2943 const COUNT_TICKETS: usize = 5;
2944
2945 let channel = ChannelEntry::new(
2946 BOB.public().to_address(),
2947 ALICE.public().to_address(),
2948 BalanceType::HOPR.balance(u32::MAX),
2949 (COUNT_TICKETS + 1).into(),
2950 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2951 4_u32.into(),
2952 );
2953
2954 let db = init_db_with_channel(channel).await?;
2955
2956 let offset = 10_usize;
2957
2958 let mut tickets = (1..COUNT_TICKETS)
2959 .map(|i| {
2960 generate_random_ack_ticket(&BOB, &ALICE, (i + offset) as u64, 1, 1.0)
2961 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2962 })
2963 .collect::<anyhow::Result<Vec<_>>>()?;
2964
2965 tickets.push(
2967 generate_random_ack_ticket(&BOB, &ALICE, 0, offset as u32, 1.0)
2968 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?,
2969 );
2970
2971 let sum_value = tickets
2972 .iter()
2973 .fold(BalanceType::HOPR.zero(), |acc, x| acc + x.ticket.amount);
2974 let min_idx = tickets
2975 .iter()
2976 .map(|t| t.ticket.index)
2977 .min()
2978 .context("min index should be present")?;
2979 let max_idx = tickets
2980 .iter()
2981 .map(|t| t.ticket.index)
2982 .max()
2983 .context("max index should be present")?;
2984
2985 let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2986
2987 assert_eq!(
2988 &BOB.public().to_address(),
2989 aggregated.verified_issuer(),
2990 "must have correct signer"
2991 );
2992
2993 assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2994
2995 assert_eq!(
2996 COUNT_TICKETS + offset,
2997 aggregated.verified_ticket().index_offset as usize,
2998 "aggregated ticket must have correct offset"
2999 );
3000 assert_eq!(
3001 sum_value,
3002 aggregated.verified_ticket().amount,
3003 "aggregated ticket token amount must be sum of individual tickets"
3004 );
3005 assert_eq!(
3006 1.0,
3007 aggregated.win_prob(),
3008 "aggregated ticket must have winning probability 1"
3009 );
3010 assert_eq!(
3011 min_idx,
3012 aggregated.verified_ticket().index,
3013 "aggregated ticket must have correct index"
3014 );
3015 assert_eq!(
3016 channel.get_id(),
3017 aggregated.verified_ticket().channel_id,
3018 "aggregated ticket must have correct channel id"
3019 );
3020 assert_eq!(
3021 channel.channel_epoch.as_u32(),
3022 aggregated.verified_ticket().channel_epoch,
3023 "aggregated ticket must have correct channel epoch"
3024 );
3025
3026 assert_eq!(
3027 max_idx + 1,
3028 db.get_outgoing_ticket_index(channel.get_id())
3029 .await?
3030 .load(Ordering::SeqCst)
3031 );
3032
3033 Ok(())
3034 }
3035
3036 #[async_std::test]
3037 async fn test_aggregate_ticket_should_not_aggregate_zero_tickets() -> anyhow::Result<()> {
3038 let db = HoprDb::new_in_memory(BOB.clone()).await?;
3039
3040 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), vec![], &BOB)
3041 .await
3042 .expect_err("should not aggregate empty ticket list");
3043
3044 Ok(())
3045 }
3046
3047 #[async_std::test]
3048 async fn test_aggregate_ticket_should_aggregate_single_ticket_to_itself() -> anyhow::Result<()> {
3049 const COUNT_TICKETS: usize = 1;
3050
3051 let channel = ChannelEntry::new(
3052 BOB.public().to_address(),
3053 ALICE.public().to_address(),
3054 BalanceType::HOPR.balance(u32::MAX),
3055 (COUNT_TICKETS + 1).into(),
3056 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
3057 4_u32.into(),
3058 );
3059
3060 let db = init_db_with_channel(channel).await?;
3061
3062 let mut tickets = (0..COUNT_TICKETS)
3063 .map(|i| {
3064 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3065 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3066 })
3067 .collect::<anyhow::Result<Vec<_>>>()?;
3068
3069 let aggregated = db
3070 .aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3071 .await?;
3072
3073 assert_eq!(
3074 &tickets.pop().context("ticket should be present")?.ticket,
3075 aggregated.verified_ticket()
3076 );
3077
3078 Ok(())
3079 }
3080
3081 #[async_std::test]
3082 async fn test_aggregate_ticket_should_not_aggregate_on_closed_channel() -> anyhow::Result<()> {
3083 const COUNT_TICKETS: usize = 3;
3084
3085 let channel = ChannelEntry::new(
3086 BOB.public().to_address(),
3087 ALICE.public().to_address(),
3088 BalanceType::HOPR.balance(u32::MAX),
3089 (COUNT_TICKETS + 1).into(),
3090 ChannelStatus::Closed,
3091 4_u32.into(),
3092 );
3093
3094 let db = init_db_with_channel(channel).await?;
3095
3096 let tickets = (0..COUNT_TICKETS)
3097 .map(|i| {
3098 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3099 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3100 })
3101 .collect::<anyhow::Result<Vec<_>>>()?;
3102
3103 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3104 .await
3105 .expect_err("should not aggregate on closed channel");
3106
3107 Ok(())
3108 }
3109
3110 #[async_std::test]
3111 async fn test_aggregate_ticket_should_not_aggregate_on_incoming_channel() -> anyhow::Result<()> {
3112 const COUNT_TICKETS: usize = 3;
3113
3114 let channel = ChannelEntry::new(
3115 ALICE.public().to_address(),
3116 BOB.public().to_address(),
3117 BalanceType::HOPR.balance(u32::MAX),
3118 (COUNT_TICKETS + 1).into(),
3119 ChannelStatus::Open,
3120 4_u32.into(),
3121 );
3122
3123 let db = init_db_with_channel(channel).await?;
3124
3125 let tickets = (0..COUNT_TICKETS)
3126 .map(|i| {
3127 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3128 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3129 })
3130 .collect::<anyhow::Result<Vec<_>>>()?;
3131
3132 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3133 .await
3134 .expect_err("should not aggregate on incoming channel");
3135
3136 Ok(())
3137 }
3138
3139 #[async_std::test]
3140 async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_ids() -> anyhow::Result<()> {
3141 const COUNT_TICKETS: usize = 3;
3142
3143 let channel = ChannelEntry::new(
3144 ALICE.public().to_address(),
3145 BOB.public().to_address(),
3146 BalanceType::HOPR.balance(u32::MAX),
3147 (COUNT_TICKETS + 1).into(),
3148 ChannelStatus::Open,
3149 4_u32.into(),
3150 );
3151
3152 let db = init_db_with_channel(channel).await?;
3153
3154 let mut tickets = (0..COUNT_TICKETS)
3155 .map(|i| {
3156 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3157 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3158 })
3159 .collect::<anyhow::Result<Vec<_>>>()?;
3160
3161 tickets[2] = generate_random_ack_ticket(&BOB, &ChainKeypair::random(), 2, 1, 1.0)
3162 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3163
3164 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3165 .await
3166 .expect_err("should not aggregate on mismatching channel ids");
3167
3168 Ok(())
3169 }
3170
3171 #[async_std::test]
3172 async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_epoch() -> anyhow::Result<()> {
3173 const COUNT_TICKETS: usize = 3;
3174
3175 let channel = ChannelEntry::new(
3176 ALICE.public().to_address(),
3177 BOB.public().to_address(),
3178 BalanceType::HOPR.balance(u32::MAX),
3179 (COUNT_TICKETS + 1).into(),
3180 ChannelStatus::Open,
3181 3_u32.into(),
3182 );
3183
3184 let db = init_db_with_channel(channel).await?;
3185
3186 let tickets = (0..COUNT_TICKETS)
3187 .map(|i| {
3188 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3189 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3190 })
3191 .collect::<anyhow::Result<Vec<_>>>()?;
3192
3193 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3194 .await
3195 .expect_err("should not aggregate on mismatching channel epoch");
3196
3197 Ok(())
3198 }
3199
3200 #[async_std::test]
3201 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_indices_overlap() -> anyhow::Result<()> {
3202 const COUNT_TICKETS: usize = 3;
3203
3204 let channel = ChannelEntry::new(
3205 ALICE.public().to_address(),
3206 BOB.public().to_address(),
3207 BalanceType::HOPR.balance(u32::MAX),
3208 (COUNT_TICKETS + 1).into(),
3209 ChannelStatus::Open,
3210 3_u32.into(),
3211 );
3212
3213 let db = init_db_with_channel(channel).await?;
3214
3215 let mut tickets = (0..COUNT_TICKETS)
3216 .map(|i| {
3217 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3218 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3219 })
3220 .collect::<anyhow::Result<Vec<_>>>()?;
3221
3222 tickets[1] = generate_random_ack_ticket(&BOB, &ALICE, 1, 2, 1.0)
3223 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3224
3225 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3226 .await
3227 .expect_err("should not aggregate on overlapping ticket indices");
3228 Ok(())
3229 }
3230
3231 #[async_std::test]
3232 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_valid() -> anyhow::Result<()> {
3233 const COUNT_TICKETS: usize = 3;
3234
3235 let channel = ChannelEntry::new(
3236 ALICE.public().to_address(),
3237 BOB.public().to_address(),
3238 BalanceType::HOPR.balance(u32::MAX),
3239 (COUNT_TICKETS + 1).into(),
3240 ChannelStatus::Open,
3241 3_u32.into(),
3242 );
3243
3244 let db = init_db_with_channel(channel).await?;
3245
3246 let mut tickets = (0..COUNT_TICKETS)
3247 .map(|i| {
3248 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3249 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3250 })
3251 .collect::<anyhow::Result<Vec<_>>>()?;
3252
3253 tickets[1].ticket.amount = Balance::new(TICKET_VALUE - 10, BalanceType::HOPR);
3255
3256 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3257 .await
3258 .expect_err("should not aggregate on invalid tickets");
3259
3260 Ok(())
3261 }
3262
3263 #[async_std::test]
3264 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_has_lower_than_min_win_prob() -> anyhow::Result<()> {
3265 const COUNT_TICKETS: usize = 3;
3266
3267 let channel = ChannelEntry::new(
3268 ALICE.public().to_address(),
3269 BOB.public().to_address(),
3270 BalanceType::HOPR.balance(u32::MAX),
3271 (COUNT_TICKETS + 1).into(),
3272 ChannelStatus::Open,
3273 3_u32.into(),
3274 );
3275
3276 let db = init_db_with_channel(channel).await?;
3277
3278 let tickets = (0..COUNT_TICKETS)
3279 .map(|i| {
3280 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, if i > 0 { 1.0 } else { 0.9 })
3281 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3282 })
3283 .collect::<anyhow::Result<Vec<_>>>()?;
3284
3285 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3286 .await
3287 .expect_err("should not aggregate tickets with less than minimum win prob");
3288
3289 Ok(())
3290 }
3291
3292 #[async_std::test]
3293 async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_winning() -> anyhow::Result<()> {
3294 const COUNT_TICKETS: usize = 3;
3295
3296 let channel = ChannelEntry::new(
3297 ALICE.public().to_address(),
3298 BOB.public().to_address(),
3299 BalanceType::HOPR.balance(u32::MAX),
3300 (COUNT_TICKETS + 1).into(),
3301 ChannelStatus::Open,
3302 3_u32.into(),
3303 );
3304
3305 let db = init_db_with_channel(channel).await?;
3306
3307 let mut tickets = (0..COUNT_TICKETS)
3308 .map(|i| {
3309 generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3310 .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3311 })
3312 .collect::<anyhow::Result<Vec<_>>>()?;
3313
3314 let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
3316 tickets[1] = TicketBuilder::from(tickets[1].ticket.clone())
3317 .win_prob(0.0)
3318 .challenge(resp.to_challenge().into())
3319 .build_signed(&BOB, &Hash::default())?
3320 .into_acknowledged(resp)
3321 .into_transferable(&ALICE, &Hash::default())?;
3322
3323 db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3324 .await
3325 .expect_err("should not aggregate non-winning tickets");
3326
3327 Ok(())
3328 }
3329
3330 #[async_std::test]
3331 async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
3332 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3333
3334 let ticket = init_db_with_tickets(&db, 1).await?.1.pop().unwrap();
3335
3336 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
3337 .await
3338 .expect("must not fail");
3339
3340 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3341 assert_ne!(stats.redeemed_value, BalanceType::HOPR.zero());
3342
3343 db.reset_ticket_statistics().await.expect("must not fail");
3344
3345 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3346 assert_eq!(stats.redeemed_value, BalanceType::HOPR.zero());
3347
3348 Ok(())
3349 }
3350
3351 #[async_std::test]
3352 async fn test_fix_channels_ticket_state() -> anyhow::Result<()> {
3353 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3354 const COUNT_TICKETS: u64 = 1;
3355
3356 let (_, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
3357
3358 let mut ticket = hopr_db_entity::ticket::Entity::find()
3360 .one(&db.tickets_db)
3361 .await?
3362 .context("should have one active model")?
3363 .into_active_model();
3364 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3365 ticket.save(&db.tickets_db).await?;
3366
3367 assert!(
3368 hopr_db_entity::ticket::Entity::find()
3369 .one(&db.tickets_db)
3370 .await?
3371 .context("should have one active model")?
3372 .state
3373 == AcknowledgedTicketStatus::BeingRedeemed as i8,
3374 );
3375
3376 db.fix_channels_next_ticket_state().await.expect("must not fail");
3377
3378 assert!(
3379 hopr_db_entity::ticket::Entity::find()
3380 .one(&db.tickets_db)
3381 .await?
3382 .context("should have one active model")?
3383 .state
3384 == AcknowledgedTicketStatus::Untouched as i8,
3385 );
3386
3387 Ok(())
3388 }
3389
3390 #[async_std::test]
3391 async fn test_dont_fix_correct_channels_ticket_state() -> anyhow::Result<()> {
3392 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3393 const COUNT_TICKETS: u64 = 2;
3394
3395 let (_, _) = init_db_with_tickets_and_channel(&db, COUNT_TICKETS, Some(1u32)).await?;
3397
3398 let mut ticket = hopr_db_entity::ticket::Entity::find()
3400 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3401 .one(&db.tickets_db)
3402 .await?
3403 .context("should have one active model")?
3404 .into_active_model();
3405 ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3406 ticket.save(&db.tickets_db).await?;
3407
3408 assert!(
3409 hopr_db_entity::ticket::Entity::find()
3410 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3411 .one(&db.tickets_db)
3412 .await?
3413 .context("should have one active model")?
3414 .state
3415 == AcknowledgedTicketStatus::BeingRedeemed as i8,
3416 );
3417
3418 db.fix_channels_next_ticket_state().await.expect("must not fail");
3419
3420 let ticket0 = hopr_db_entity::ticket::Entity::find()
3422 .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3423 .one(&db.tickets_db)
3424 .await?
3425 .context("should have one active model")?;
3426 assert_eq!(ticket0.state, AcknowledgedTicketStatus::BeingRedeemed as i8);
3427
3428 let ticket1 = hopr_db_entity::ticket::Entity::find()
3430 .filter(ticket::Column::Index.eq(1u64.to_be_bytes().to_vec()))
3431 .one(&db.tickets_db)
3432 .await?
3433 .context("should have one active model")?;
3434 assert_eq!(ticket1.state, AcknowledgedTicketStatus::Untouched as i8);
3435
3436 Ok(())
3437 }
3438}