hopr_db_sql/
tickets.rs

1use async_stream::stream;
2use async_trait::async_trait;
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set};
6use sea_query::{Condition, Expr, IntoCondition, SimpleExpr};
7use std::cmp;
8use std::ops::{Add, Bound};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use tracing::{debug, error, info, trace, warn};
12
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::prelude::{TicketIndexSelector, TicketMarker};
15use hopr_db_api::resolver::HoprDbResolverOperations;
16use hopr_db_api::tickets::AggregationPrerequisites;
17use hopr_db_api::{
18    errors::Result,
19    info::DomainSeparator,
20    tickets::{ChannelTicketStatistics, HoprDbTicketOperations, TicketSelector},
21};
22use hopr_db_entity::ticket_statistics;
23use hopr_db_entity::{outgoing_ticket_index, ticket};
24use hopr_internal_types::prelude::*;
25use hopr_primitive_types::prelude::*;
26
27use crate::channels::HoprDbChannelOperations;
28use crate::db::HoprDb;
29use crate::errors::DbSqlError;
30use crate::errors::DbSqlError::LogicalError;
31use crate::info::HoprDbInfoOperations;
32use crate::{HoprDbGeneralModelOperations, OpenTransaction, OptTx, TargetDb};
33
34#[cfg(all(feature = "prometheus", not(test)))]
35use hopr_metrics::metrics::MultiGauge;
36
37#[cfg(all(feature = "prometheus", not(test)))]
38lazy_static::lazy_static! {
39    pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: MultiGauge = MultiGauge::new(
40        "hopr_tickets_incoming_statistics",
41        "Ticket statistics for channels with incoming tickets.",
42        &["channel", "statistic"]
43    ).unwrap();
44}
45
46/// The maximum number of tickets that can sent for aggregation in a single request.
47pub const MAX_TICKETS_TO_AGGREGATE_BATCH: u64 = 500;
48
49/// The type is necessary solely to allow
50/// implementing the [`IntoCondition`] trait for [`TicketSelector`]
51/// from the `hopr_db_api` crate.
52#[derive(Clone)]
53pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
54
55impl From<TicketSelector> for WrappedTicketSelector {
56    fn from(selector: TicketSelector) -> Self {
57        Self(selector)
58    }
59}
60
61impl AsRef<TicketSelector> for WrappedTicketSelector {
62    fn as_ref(&self) -> &TicketSelector {
63        &self.0
64    }
65}
66
67impl IntoCondition for WrappedTicketSelector {
68    fn into_condition(self) -> Condition {
69        let expr = self
70            .0
71            .channel_identifiers
72            .into_iter()
73            .map(|(channel_id, epoch)| {
74                ticket::Column::ChannelId
75                    .eq(channel_id.to_hex())
76                    .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
77            })
78            .reduce(SimpleExpr::or);
79
80        // This cannot happen, but instead of panicking, return an impossible condition object
81        if expr.is_none() {
82            return Condition::any().not();
83        }
84
85        let mut expr = expr.unwrap();
86
87        match self.0.index {
88            TicketIndexSelector::None => {
89                // This will always be the case if there were multiple channel identifiers
90            }
91            TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
92            TicketIndexSelector::Multiple(idxs) => {
93                expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
94            }
95            TicketIndexSelector::Range((lb, ub)) => {
96                expr = match lb {
97                    Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
98                    Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
99                    Bound::Unbounded => expr,
100                };
101                expr = match ub {
102                    Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
103                    Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
104                    Bound::Unbounded => expr,
105                };
106            }
107        }
108
109        if let Some(state) = self.0.state {
110            expr = expr.and(ticket::Column::State.eq(state as u8))
111        }
112
113        if self.0.only_aggregated {
114            expr = expr.and(ticket::Column::IndexOffset.gt(1));
115        }
116
117        // Win prob lower bound
118        expr = match self.0.win_prob.0 {
119            Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.to_vec())),
120            Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.to_vec())),
121            Bound::Unbounded => expr,
122        };
123
124        // Win prob upper bound
125        expr = match self.0.win_prob.1 {
126            Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.to_vec())),
127            Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.to_vec())),
128            Bound::Unbounded => expr,
129        };
130
131        // Amount lower bound
132        expr = match self.0.amount.0 {
133            Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
134            Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
135            Bound::Unbounded => expr,
136        };
137
138        // Amount upper bound
139        expr = match self.0.amount.1 {
140            Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
141            Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
142            Bound::Unbounded => expr,
143        };
144
145        expr.into_condition()
146    }
147}
148
149/// Filters the list of ticket models according to the prerequisites.
150/// **NOTE:** the input list is assumed to be sorted by ticket index in ascending order.
151///
152/// The following is applied:
153/// - the list of tickets is reduced so that the total amount on the tickets does not exceed the channel balance
154/// - it is checked whether the list size is greater than `min_unaggregated_ratio`
155/// - it is checked whether the ratio of total amount on the unaggregated tickets on the list and the channel balance ratio is greater than `min_unaggregated_ratio`
156pub(crate) fn filter_satisfying_ticket_models(
157    prerequisites: AggregationPrerequisites,
158    models: Vec<ticket::Model>,
159    channel_entry: &ChannelEntry,
160    min_win_prob: f64,
161) -> crate::errors::Result<Vec<ticket::Model>> {
162    let channel_id = channel_entry.get_id();
163
164    let mut to_be_aggregated = Vec::with_capacity(models.len());
165    let mut total_balance = BalanceType::HOPR.zero();
166
167    for m in models {
168        let ticket_wp = win_prob_to_f64(
169            m.winning_probability
170                .as_slice()
171                .try_into()
172                .map_err(|_| DbSqlError::DecodingError)?,
173        );
174        if !f64_approx_eq(ticket_wp, min_win_prob, LOWEST_POSSIBLE_WINNING_PROB) && ticket_wp < min_win_prob {
175            warn!(
176                channel_id = %channel_entry.get_id(),
177                ticket_wp, min_win_prob, "encountered ticket with winning probability lower than the minimum threshold"
178            );
179            continue;
180        }
181
182        let to_add = BalanceType::HOPR.balance_bytes(&m.amount);
183
184        // Do a balance check to be sure not to aggregate more than the current channel stake
185        total_balance = total_balance + to_add;
186        if total_balance.gt(&channel_entry.balance) {
187            // Remove last sub-balance which led to the overflow before breaking out of the loop.
188            total_balance = total_balance - to_add;
189            break;
190        }
191
192        to_be_aggregated.push(m);
193    }
194
195    // If there are no criteria, just send everything for aggregation
196    if prerequisites.min_ticket_count.is_none() && prerequisites.min_unaggregated_ratio.is_none() {
197        info!(channel = %channel_id, "Aggregation check OK, no aggregation prerequisites were given");
198        return Ok(to_be_aggregated);
199    }
200
201    let to_be_agg_count = to_be_aggregated.len();
202
203    // Check the aggregation threshold
204    if let Some(agg_threshold) = prerequisites.min_ticket_count {
205        if to_be_agg_count >= agg_threshold {
206            info!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check OK aggregated value greater than threshold");
207            return Ok(to_be_aggregated);
208        } else {
209            debug!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold,"Aggregation check FAIL not enough resources to aggregate");
210        }
211    }
212
213    if let Some(unrealized_threshold) = prerequisites.min_unaggregated_ratio {
214        let diminished_balance = channel_entry.balance.mul_f64(unrealized_threshold)?;
215
216        // Trigger aggregation if unrealized balance greater or equal to X percentage of the current balance
217        // and there are at least two tickets
218        if total_balance.ge(&diminished_balance) {
219            if to_be_agg_count > 1 {
220                info!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check OK: more unrealized than diminished balance");
221                return Ok(to_be_aggregated);
222            } else {
223                debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: more unrealized than diminished balance but only 1 ticket");
224            }
225        } else {
226            debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: less unrealized than diminished balance");
227        }
228    }
229
230    debug!(channel = %channel_id,"Aggregation check FAIL: no prerequisites were met");
231    Ok(vec![])
232}
233
234pub(crate) async fn find_stats_for_channel(
235    tx: &OpenTransaction,
236    channel_id: &Hash,
237) -> crate::errors::Result<ticket_statistics::Model> {
238    if let Some(model) = ticket_statistics::Entity::find()
239        .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
240        .one(tx.as_ref())
241        .await?
242    {
243        Ok(model)
244    } else {
245        let new_stats = ticket_statistics::ActiveModel {
246            channel_id: Set(channel_id.to_hex()),
247            ..Default::default()
248        }
249        .insert(tx.as_ref())
250        .await?;
251
252        Ok(new_stats)
253    }
254}
255
256impl HoprDb {
257    async fn get_tickets_value_int<'a>(&'a self, tx: OptTx<'a>, selector: TicketSelector) -> Result<(usize, Balance)> {
258        let selector: WrappedTicketSelector = selector.into();
259        Ok(self
260            .nest_transaction_in_db(tx, TargetDb::Tickets)
261            .await?
262            .perform(|tx| {
263                Box::pin(async move {
264                    ticket::Entity::find()
265                        .filter(selector)
266                        .stream(tx.as_ref())
267                        .await
268                        .map_err(DbSqlError::from)?
269                        .map_err(DbSqlError::from)
270                        .try_fold((0_usize, BalanceType::HOPR.zero()), |(count, value), t| async move {
271                            Ok((count + 1, value + BalanceType::HOPR.balance_bytes(t.amount)))
272                        })
273                        .await
274                })
275            })
276            .await?)
277    }
278}
279
280#[async_trait]
281impl HoprDbTicketOperations for HoprDb {
282    async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>> {
283        Ok(self
284            .nest_transaction_in_db(None, TargetDb::Tickets)
285            .await?
286            .perform(|tx| {
287                Box::pin(async move {
288                    ticket::Entity::find()
289                        .all(tx.as_ref())
290                        .await?
291                        .into_iter()
292                        .map(AcknowledgedTicket::try_from)
293                        .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
294                        .map_err(DbSqlError::from)
295                })
296            })
297            .await?)
298    }
299
300    async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>> {
301        debug!("fetching tickets via {selector}");
302        let selector: WrappedTicketSelector = selector.into();
303
304        Ok(self
305            .nest_transaction_in_db(None, TargetDb::Tickets)
306            .await?
307            .perform(|tx| {
308                Box::pin(async move {
309                    ticket::Entity::find()
310                        .filter(selector)
311                        .all(tx.as_ref())
312                        .await?
313                        .into_iter()
314                        .map(AcknowledgedTicket::try_from)
315                        .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
316                        .map_err(DbSqlError::from)
317                })
318            })
319            .await?)
320    }
321
322    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize> {
323        let myself = self.clone();
324        Ok(self
325            .ticket_manager
326            .with_write_locked_db(|tx| {
327                Box::pin(async move {
328                    let mut total_marked_count = 0;
329                    for (channel_id, epoch) in selector.channel_identifiers.iter() {
330                        let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);
331
332                        // Get the number of tickets and their value just for this channel
333                        let (marked_count, marked_value) =
334                            myself.get_tickets_value_int(Some(tx), channel_selector.clone()).await?;
335                        trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");
336
337                        if marked_count > 0 {
338                            // Delete the redeemed tickets first
339                            let deleted = ticket::Entity::delete_many()
340                                .filter(WrappedTicketSelector::from(channel_selector.clone()))
341                                .exec(tx.as_ref())
342                                .await?;
343
344                            // Update the stats if successful
345                            if deleted.rows_affected == marked_count as u64 {
346                                let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
347                                let _current_value = match mark_as {
348                                    TicketMarker::Redeemed => {
349                                        let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
350                                        new_stats.redeemed_value =
351                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
352                                        current_value
353                                    }
354                                    TicketMarker::Rejected => {
355                                        let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
356                                        new_stats.rejected_value =
357                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
358                                        current_value
359                                    }
360                                    TicketMarker::Neglected => {
361                                        let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
362                                        new_stats.neglected_value =
363                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
364                                        current_value
365                                    }
366                                };
367                                new_stats.save(tx.as_ref()).await?;
368
369                                #[cfg(all(feature = "prometheus", not(test)))]
370                                {
371                                    let channel = channel_id.to_string();
372                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
373                                        &[&channel, &mark_as.to_string()],
374                                        (_current_value + marked_value.amount()).as_u128() as f64,
375                                    );
376
377                                    // Tickets that are counted as rejected were never counted as unredeemed,
378                                    // so skip the metric subtraction in that case.
379                                    if mark_as != TicketMarker::Rejected {
380                                        let unredeemed_value = myself
381                                            .caches
382                                            .unrealized_value
383                                            .get(&(*channel_id, *epoch))
384                                            .await
385                                            .unwrap_or(Balance::zero(BalanceType::HOPR));
386
387                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
388                                            &[&channel, "unredeemed"],
389                                            (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
390                                        );
391                                    }
392                                }
393
394                                myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
395                            } else {
396                                return Err(DbSqlError::LogicalError(format!(
397                                    "could not mark {marked_count} ticket as {mark_as}"
398                                )));
399                            }
400
401                            trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
402                            total_marked_count += marked_count;
403                        }
404                    }
405
406                    info!(
407                        count = total_marked_count,
408                        ?mark_as,
409                        channel_count = selector.channel_identifiers.len(),
410                        "removed tickets in channels",
411                    );
412                    Ok(total_marked_count)
413                })
414            })
415            .await?)
416    }
417
418    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()> {
419        let channel_id = ticket.channel_id;
420        let amount = ticket.amount;
421        Ok(self
422            .ticket_manager
423            .with_write_locked_db(|tx| {
424                Box::pin(async move {
425                    let stats = find_stats_for_channel(tx, &channel_id).await?;
426
427                    let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
428
429                    let mut active_stats = stats.into_active_model();
430                    active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
431                    active_stats.save(tx.as_ref()).await?;
432
433                    #[cfg(all(feature = "prometheus", not(test)))]
434                    {
435                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
436                            &[&channel_id.to_string(), "rejected"],
437                            (current_rejected_value + amount.amount()).as_u128() as f64,
438                        );
439                    }
440
441                    Ok::<(), DbSqlError>(())
442                })
443            })
444            .await?)
445    }
446
447    async fn update_ticket_states_and_fetch<'a>(
448        &'a self,
449        selector: TicketSelector,
450        new_state: AcknowledgedTicketStatus,
451    ) -> Result<BoxStream<'a, AcknowledgedTicket>> {
452        let selector: WrappedTicketSelector = selector.into();
453        Ok(Box::pin(stream! {
454            match ticket::Entity::find()
455                .filter(selector)
456                .stream(self.conn(TargetDb::Tickets))
457                .await {
458                Ok(mut stream) => {
459                    while let Ok(Some(ticket)) = stream.try_next().await {
460                        let active_ticket = ticket::ActiveModel {
461                            id: Set(ticket.id),
462                            state: Set(new_state as i8),
463                            ..Default::default()
464                        };
465
466                        {
467                            let _g = self.ticket_manager.mutex.lock();
468                            if let Err(e) = active_ticket.update(self.conn(TargetDb::Tickets)).await {
469                                error!(error = %e,"failed to update ticket in the db");
470                            }
471                        }
472
473                        match AcknowledgedTicket::try_from(ticket) {
474                            Ok(mut ticket) => {
475                                // Update the state manually, since we do not want to re-fetch the model after the update
476                                ticket.status = new_state;
477                                yield ticket
478                            },
479                            Err(e) => {
480                                tracing::error!(error = %e, "failed to decode ticket from the db");
481                            }
482                        }
483                    }
484                },
485                Err(e) => tracing::error!(error = %e, "failed open ticket db stream")
486            }
487        }))
488    }
489
490    async fn update_ticket_states(
491        &self,
492        selector: TicketSelector,
493        new_state: AcknowledgedTicketStatus,
494    ) -> Result<usize> {
495        let selector: WrappedTicketSelector = selector.into();
496        Ok(self
497            .ticket_manager
498            .with_write_locked_db(|tx| {
499                Box::pin(async move {
500                    let update = ticket::Entity::update_many()
501                        .filter(selector)
502                        .col_expr(ticket::Column::State, Expr::value(new_state as u8))
503                        .exec(tx.as_ref())
504                        .await?;
505                    Ok::<_, DbSqlError>(update.rows_affected as usize)
506                })
507            })
508            .await?)
509    }
510
511    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics> {
512        let res = match channel_id {
513            None => {
514                #[cfg(all(feature = "prometheus", not(test)))]
515                let mut per_channel_unredeemed = std::collections::HashMap::new();
516
517                self.nest_transaction_in_db(None, TargetDb::Tickets)
518                    .await?
519                    .perform(|tx| {
520                        Box::pin(async move {
521                            let unredeemed_value = ticket::Entity::find()
522                                .stream(tx.as_ref())
523                                .await?
524                                .try_fold(U256::zero(), |amount, x| {
525                                    let unredeemed_value = U256::from_be_bytes(x.amount);
526
527                                    #[cfg(all(feature = "prometheus", not(test)))]
528                                    per_channel_unredeemed
529                                        .entry(x.channel_id)
530                                        .and_modify(|v| *v += unredeemed_value)
531                                        .or_insert(unredeemed_value);
532
533                                    futures::future::ok(amount + unredeemed_value)
534                                })
535                                .await?;
536
537                            #[cfg(all(feature = "prometheus", not(test)))]
538                            for (channel_id, unredeemed_value) in per_channel_unredeemed {
539                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
540                                    .set(&[&channel_id, "unredeemed"], unredeemed_value.as_u128() as f64);
541                            }
542
543                            let mut all_stats = ticket_statistics::Entity::find()
544                                .all(tx.as_ref())
545                                .await?
546                                .into_iter()
547                                .fold(ChannelTicketStatistics::default(), |mut acc, stats| {
548                                    let neglected_value = BalanceType::HOPR.balance_bytes(stats.neglected_value);
549                                    acc.neglected_value = acc.neglected_value + neglected_value;
550                                    let redeemed_value = BalanceType::HOPR.balance_bytes(stats.redeemed_value);
551                                    acc.redeemed_value = acc.redeemed_value + redeemed_value;
552                                    let rejected_value = BalanceType::HOPR.balance_bytes(stats.rejected_value);
553                                    acc.rejected_value = acc.rejected_value + rejected_value;
554                                    acc.winning_tickets += stats.winning_tickets as u128;
555
556                                    #[cfg(all(feature = "prometheus", not(test)))]
557                                    {
558                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
559                                            &[&stats.channel_id, "neglected"],
560                                            neglected_value.amount().as_u128() as f64,
561                                        );
562
563                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
564                                            &[&stats.channel_id, "redeemed"],
565                                            redeemed_value.amount().as_u128() as f64,
566                                        );
567
568                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
569                                            &[&stats.channel_id, "rejected"],
570                                            rejected_value.amount().as_u128() as f64,
571                                        );
572                                    }
573
574                                    acc
575                                });
576
577                            all_stats.unredeemed_value = BalanceType::HOPR.balance(unredeemed_value);
578
579                            Ok::<_, DbSqlError>(all_stats)
580                        })
581                    })
582                    .await
583            }
584            Some(channel) => {
585                // We need to make sure the channel exists to avoid creating
586                // statistic entry for a non-existing channel
587                if self.get_channel_by_id(None, &channel).await?.is_none() {
588                    return Err(DbSqlError::ChannelNotFound(channel).into());
589                }
590
591                self.nest_transaction_in_db(None, TargetDb::Tickets)
592                    .await?
593                    .perform(|tx| {
594                        Box::pin(async move {
595                            let stats = find_stats_for_channel(tx, &channel).await?;
596                            let unredeemed_value = ticket::Entity::find()
597                                .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
598                                .stream(tx.as_ref())
599                                .await?
600                                .try_fold(U256::zero(), |amount, x| {
601                                    futures::future::ok(amount + U256::from_be_bytes(x.amount))
602                                })
603                                .await?;
604
605                            Ok::<_, DbSqlError>(ChannelTicketStatistics {
606                                winning_tickets: stats.winning_tickets as u128,
607                                neglected_value: BalanceType::HOPR.balance_bytes(stats.neglected_value),
608                                redeemed_value: BalanceType::HOPR.balance_bytes(stats.redeemed_value),
609                                unredeemed_value: BalanceType::HOPR.balance(unredeemed_value),
610                                rejected_value: BalanceType::HOPR.balance_bytes(stats.rejected_value),
611                            })
612                        })
613                    })
614                    .await
615            }
616        };
617        Ok(res?)
618    }
619
620    async fn reset_ticket_statistics(&self) -> Result<()> {
621        let res = self
622            .nest_transaction_in_db(None, TargetDb::Tickets)
623            .await?
624            .perform(|tx| {
625                Box::pin(async move {
626                    #[cfg(all(feature = "prometheus", not(test)))]
627                    let rows = ticket_statistics::Entity::find().all(tx.as_ref()).await?;
628
629                    // delete statistics for the found rows
630                    let deleted = ticket_statistics::Entity::delete_many().exec(tx.as_ref()).await?;
631
632                    #[cfg(all(feature = "prometheus", not(test)))]
633                    {
634                        if deleted.rows_affected > 0 {
635                            for row in rows {
636                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "neglected"], 0.0_f64);
637
638                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "redeemed"], 0.0_f64);
639
640                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "rejected"], 0.0_f64);
641                            }
642                        }
643                    }
644
645                    debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
646
647                    Ok::<_, DbSqlError>(())
648                })
649            })
650            .await;
651
652        Ok(res?)
653    }
654
655    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, Balance)> {
656        self.get_tickets_value_int(None, selector).await
657    }
658
659    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64> {
660        let old_value = self
661            .get_outgoing_ticket_index(channel_id)
662            .await?
663            .fetch_max(index, Ordering::SeqCst);
664
665        // TODO: should we hint the persisting mechanism to trigger the flush?
666        // if old_value < index {}
667
668        Ok(old_value)
669    }
670
671    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
672        let old_value = self
673            .get_outgoing_ticket_index(channel_id)
674            .await?
675            .swap(0, Ordering::SeqCst);
676
677        // TODO: should we hint the persisting mechanism to trigger the flush?
678        // if old_value > 0 { }
679
680        Ok(old_value)
681    }
682
683    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
684        let old_value = self
685            .get_outgoing_ticket_index(channel_id)
686            .await?
687            .fetch_add(1, Ordering::SeqCst);
688
689        // TODO: should we hint the persisting mechanism to trigger the flush?
690
691        Ok(old_value)
692    }
693
694    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>> {
695        let tkt_manager = self.ticket_manager.clone();
696
697        Ok(self
698            .caches
699            .ticket_index
700            .try_get_with(channel_id, async move {
701                let maybe_index = outgoing_ticket_index::Entity::find()
702                    .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
703                    .one(&tkt_manager.tickets_db)
704                    .await?;
705
706                Ok(Arc::new(AtomicU64::new(match maybe_index {
707                    Some(model) => U256::from_be_bytes(model.index).as_u64(),
708                    None => {
709                        tkt_manager
710                            .with_write_locked_db(|tx| {
711                                Box::pin(async move {
712                                    outgoing_ticket_index::ActiveModel {
713                                        channel_id: Set(channel_id.to_hex()),
714                                        ..Default::default()
715                                    }
716                                    .insert(tx.as_ref())
717                                    .await?;
718                                    Ok::<_, DbSqlError>(())
719                                })
720                            })
721                            .await?;
722                        0_u64
723                    }
724                })))
725            })
726            .await
727            .map_err(|e: Arc<DbSqlError>| LogicalError(format!("failed to retrieve ticket index: {e}")))?)
728    }
729
730    async fn persist_outgoing_ticket_indices(&self) -> Result<usize> {
731        let outgoing_indices = outgoing_ticket_index::Entity::find()
732            .all(&self.tickets_db)
733            .await
734            .map_err(DbSqlError::from)?;
735
736        let mut updated = 0;
737        for index_model in outgoing_indices {
738            let channel_id = Hash::from_hex(&index_model.channel_id).map_err(DbSqlError::from)?;
739            let db_index = U256::from_be_bytes(&index_model.index).as_u64();
740            if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
741                // Note that the persisted value is always lagging behind the cache,
742                // so the fact that the cached index can change between this load
743                // storing it in the DB is allowed.
744                let cached_index = cached_index.load(Ordering::SeqCst);
745
746                // Store the ticket index in a separate write transaction
747                if cached_index > db_index {
748                    let mut index_active_model = index_model.into_active_model();
749                    index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
750                    self.ticket_manager
751                        .with_write_locked_db(|wtx| {
752                            Box::pin(async move {
753                                index_active_model.save(wtx.as_ref()).await?;
754                                Ok::<_, DbSqlError>(())
755                            })
756                        })
757                        .await?;
758
759                    debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
760                    updated += 1;
761                }
762            } else {
763                // The value is not yet in the cache, meaning there's low traffic on this
764                // channel, so the value has not been yet fetched.
765                trace!(?channel_id, "channel not in cache yet");
766            }
767        }
768
769        Ok(Ok::<_, DbSqlError>(updated)?)
770    }
771
772    async fn prepare_aggregation_in_channel(
773        &self,
774        channel: &Hash,
775        prerequisites: AggregationPrerequisites,
776    ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>> {
777        let myself = self.clone();
778
779        let channel_id = *channel;
780
781        let (channel_entry, peer, domain_separator, min_win_prob) = self
782            .nest_transaction_in_db(None, TargetDb::Index)
783            .await?
784            .perform(|tx| {
785                Box::pin(async move {
786                    let entry = myself
787                        .get_channel_by_id(Some(tx), &channel_id)
788                        .await?
789                        .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
790
791                    if entry.status == ChannelStatus::Closed {
792                        return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
793                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
794                        return Err(DbSqlError::LogicalError(format!(
795                            "channel '{channel_id}' is not incoming"
796                        )));
797                    }
798
799                    let pk = myself
800                        .resolve_packet_key(&entry.source)
801                        .await?
802                        .ok_or(DbSqlError::LogicalError(format!(
803                            "peer '{}' has no offchain key record",
804                            entry.source
805                        )))?;
806
807                    let indexer_data = myself.get_indexer_data(Some(tx)).await?;
808
809                    let domain_separator = indexer_data
810                        .channels_dst
811                        .ok_or_else(|| crate::errors::DbSqlError::LogicalError("domain separator missing".into()))?;
812
813                    Ok((
814                        entry,
815                        pk,
816                        domain_separator,
817                        indexer_data.minimum_incoming_ticket_winning_prob,
818                    ))
819                })
820            })
821            .await?;
822
823        let myself = self.clone();
824        let tickets = self
825            .ticket_manager
826            .with_write_locked_db(|tx| {
827                Box::pin(async move {
828                    // verify that no aggregation is in progress in the channel
829                    if ticket::Entity::find()
830                        .filter(WrappedTicketSelector::from(
831                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
832                        ))
833                        .one(tx.as_ref())
834                        .await?
835                        .is_some()
836                    {
837                        return Err(DbSqlError::LogicalError(format!(
838                            "'{channel_entry}' is already being aggregated",
839                        )));
840                    }
841
842                    // find the index of the last ticket being redeemed
843                    let first_idx_to_take = ticket::Entity::find()
844                        .filter(WrappedTicketSelector::from(
845                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingRedeemed),
846                        ))
847                        .order_by_desc(ticket::Column::Index)
848                        .one(tx.as_ref())
849                        .await?
850                        .map(|m| U256::from_be_bytes(m.index).as_u64() + 1)
851                        .unwrap_or(0_u64) // go from the lowest possible index of none is found
852                        .max(channel_entry.ticket_index.as_u64()); // but cannot be less than the ticket index on the Channel entry
853
854                    // get the list of all tickets to be aggregated
855                    let to_be_aggregated = ticket::Entity::find()
856                        .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
857                        .filter(ticket::Column::Index.gte(first_idx_to_take.to_be_bytes().to_vec()))
858                        .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
859                        .order_by_asc(ticket::Column::Index)// tickets must be sorted by indices in ascending order
860                        .limit(MAX_TICKETS_TO_AGGREGATE_BATCH)
861                        .all(tx.as_ref())
862                        .await?;
863
864                    // Filter the list of tickets according to the prerequisites
865                    let mut to_be_aggregated: Vec<TransferableWinningTicket> =
866                        filter_satisfying_ticket_models(prerequisites, to_be_aggregated, &channel_entry, min_win_prob)?
867                            .into_iter()
868                            .map(|model| {
869                                AcknowledgedTicket::try_from(model)
870                                    .map_err(DbSqlError::from)
871                                    .and_then(|ack| {
872                                        ack.into_transferable(&myself.chain_key, &domain_separator)
873                                            .map_err(DbSqlError::from)
874                                    })
875                            })
876                            .collect::<crate::errors::Result<Vec<_>>>()?;
877
878                    let mut neglected_idxs = Vec::new();
879
880                    if !to_be_aggregated.is_empty() {
881                        // Clean up any tickets in this channel that are already inside an aggregated ticket.
882                        // This situation cannot be avoided 100% as aggregation can be triggered when out-of-order
883                        // tickets arrive and only some of them are necessary to satisfy the aggregation threshold.
884                        // The following code *assumes* that only the first ticket with the lowest index *can be* an aggregate.
885                        let first_ticket = to_be_aggregated[0].ticket.clone();
886                        let mut i = 1;
887                        while i < to_be_aggregated.len() {
888                            let current_idx = to_be_aggregated[i].ticket.index;
889                            if (first_ticket.index..first_ticket.index + first_ticket.index_offset as u64).contains(&current_idx) {
890                                // Cleanup is the only reasonable thing to do at this point,
891                                // since the aggregator will check for index range overlaps and deny
892                                // the aggregation of the entire batch otherwise.
893                                warn!(ticket_id = current_idx, channel = %channel_id, ?first_ticket, "ticket in channel has been already aggregated and will be removed");
894                                neglected_idxs.push(current_idx);
895                                to_be_aggregated.remove(i);
896                            } else {
897                                i += 1;
898                            }
899                        }
900
901                        // The cleanup (neglecting of tickets) is not made directly here but on the next ticket redemption in this channel
902                        // See handler.rs around L402
903                        if !neglected_idxs.is_empty() {
904                            warn!(count = neglected_idxs.len(), channel = %channel_id, "tickets were neglected due to duplication in an aggregated ticket!");
905                        }
906
907                        // mark all tickets with appropriate characteristics as being aggregated
908                        let marked: sea_orm::UpdateResult = ticket::Entity::update_many()
909                            .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
910                            .filter(ticket::Column::Index.is_in(to_be_aggregated.iter().map(|t| t.ticket.index.to_be_bytes().to_vec())))
911                            .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
912                            .col_expr(
913                                ticket::Column::State,
914                                Expr::value(AcknowledgedTicketStatus::BeingAggregated as i8),
915                            )
916                            .exec(tx.as_ref())
917                            .await?;
918
919                        if marked.rows_affected as usize != to_be_aggregated.len() {
920                            return Err(DbSqlError::LogicalError(format!(
921                                "expected to mark {}, but was able to mark {}",
922                                to_be_aggregated.len(),
923                                marked.rows_affected,
924                            )));
925                        }
926                    }
927
928                    debug!(
929                        "prepared {} tickets to aggregate in {} ({})",
930                        to_be_aggregated.len(),
931                        channel_entry.get_id(),
932                        channel_entry.channel_epoch,
933                    );
934
935                    Ok(to_be_aggregated)
936                })
937            })
938            .await?;
939
940        Ok((!tickets.is_empty()).then_some((peer, tickets, domain_separator)))
941    }
942
943    async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()> {
944        let channel_entry = self
945            .get_channel_by_id(None, &channel)
946            .await?
947            .ok_or(DbSqlError::ChannelNotFound(channel))?;
948
949        let selector = TicketSelector::from(channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated);
950
951        let reverted = self
952            .update_ticket_states(selector, AcknowledgedTicketStatus::Untouched)
953            .await?;
954
955        info!(
956            "rollback happened for ticket aggregation in '{channel}' with {reverted} tickets rolled back as a result",
957        );
958        Ok(())
959    }
960
961    async fn process_received_aggregated_ticket(
962        &self,
963        aggregated_ticket: Ticket,
964        chain_keypair: &ChainKeypair,
965    ) -> Result<AcknowledgedTicket> {
966        if chain_keypair.public().to_address() != self.me_onchain {
967            return Err(DbSqlError::LogicalError(
968                "chain key for ticket aggregation does not match the DB public address".into(),
969            )
970            .into());
971        }
972
973        let myself = self.clone();
974        let channel_id = aggregated_ticket.channel_id;
975
976        let (channel_entry, domain_separator) = self
977            .nest_transaction_in_db(None, TargetDb::Index)
978            .await?
979            .perform(|tx| {
980                Box::pin(async move {
981                    let entry = myself
982                        .get_channel_by_id(Some(tx), &channel_id)
983                        .await?
984                        .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
985
986                    if entry.status == ChannelStatus::Closed {
987                        return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
988                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
989                        return Err(DbSqlError::LogicalError(format!(
990                            "channel '{channel_id}' is not incoming"
991                        )));
992                    }
993
994                    let domain_separator =
995                        myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
996                            crate::errors::DbSqlError::LogicalError("domain separator missing".into())
997                        })?;
998
999                    Ok((entry, domain_separator))
1000                })
1001            })
1002            .await?;
1003
1004        // Verify the ticket first
1005        let aggregated_ticket = aggregated_ticket
1006            .verify(&channel_entry.source, &domain_separator)
1007            .map_err(|e| {
1008                DbSqlError::LogicalError(format!(
1009                    "failed to verify received aggregated ticket in {channel_id}: {e}"
1010                ))
1011            })?;
1012
1013        // Aggregated tickets always have 100% winning probability
1014        if !f64_approx_eq(aggregated_ticket.win_prob(), 1.0, LOWEST_POSSIBLE_WINNING_PROB) {
1015            return Err(DbSqlError::LogicalError("Aggregated tickets must have 100% win probability".into()).into());
1016        }
1017
1018        let acknowledged_tickets = self
1019            .nest_transaction_in_db(None, TargetDb::Tickets)
1020            .await?
1021            .perform(|tx| {
1022                Box::pin(async move {
1023                    ticket::Entity::find()
1024                        .filter(WrappedTicketSelector::from(
1025                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
1026                        ))
1027                        .all(tx.as_ref())
1028                        .await
1029                        .map_err(DbSqlError::BackendError)
1030                })
1031            })
1032            .await?;
1033
1034        if acknowledged_tickets.is_empty() {
1035            debug!("Received unexpected aggregated ticket in channel {channel_id}");
1036            return Err(DbSqlError::LogicalError(format!(
1037                "failed insert aggregated ticket, because no tickets seem to be aggregated for '{channel_id}'",
1038            ))
1039            .into());
1040        }
1041
1042        let stored_value = acknowledged_tickets
1043            .iter()
1044            .map(|m| BalanceType::HOPR.balance_bytes(&m.amount))
1045            .fold(Balance::zero(BalanceType::HOPR), |acc, amount| acc.add(amount));
1046
1047        // The value of received ticket can be higher (profit for us) but not lower
1048        if aggregated_ticket.verified_ticket().amount.lt(&stored_value) {
1049            error!(channel = %channel_id, "Aggregated ticket value in channel is lower than sum of stored tickets");
1050            return Err(DbSqlError::LogicalError("Value of received aggregated ticket is too low".into()).into());
1051        }
1052
1053        let acknowledged_tickets = acknowledged_tickets
1054            .into_iter()
1055            .map(AcknowledgedTicket::try_from)
1056            .collect::<hopr_db_entity::errors::Result<Vec<AcknowledgedTicket>>>()
1057            .map_err(DbSqlError::from)?;
1058
1059        // can be done, because the tickets collection is tested for emptiness before
1060        let first_stored_ticket = acknowledged_tickets.first().unwrap();
1061
1062        // calculate the new current ticket index
1063        #[allow(unused_variables)]
1064        let current_ticket_index_from_aggregated_ticket =
1065            U256::from(aggregated_ticket.verified_ticket().index).add(aggregated_ticket.verified_ticket().index_offset);
1066
1067        let acked_aggregated_ticket = aggregated_ticket.into_acknowledged(first_stored_ticket.response.clone());
1068
1069        let ticket = acked_aggregated_ticket.clone();
1070        self.ticket_manager.replace_tickets(ticket).await?;
1071
1072        info!(%acked_aggregated_ticket, "successfully processed received aggregated ticket");
1073        Ok(acked_aggregated_ticket)
1074    }
1075
1076    async fn aggregate_tickets(
1077        &self,
1078        destination: OffchainPublicKey,
1079        mut acked_tickets: Vec<TransferableWinningTicket>,
1080        me: &ChainKeypair,
1081    ) -> Result<VerifiedTicket> {
1082        if me.public().to_address() != self.me_onchain {
1083            return Err(DbSqlError::LogicalError(
1084                "chain key for ticket aggregation does not match the DB public address".into(),
1085            )
1086            .into());
1087        }
1088
1089        let domain_separator = self
1090            .get_indexer_data(None)
1091            .await?
1092            .domain_separator(DomainSeparator::Channel)
1093            .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
1094
1095        if acked_tickets.is_empty() {
1096            return Err(DbSqlError::LogicalError("at least one ticket required for aggregation".to_owned()).into());
1097        }
1098
1099        if acked_tickets.len() == 1 {
1100            let single = acked_tickets
1101                .pop()
1102                .unwrap()
1103                .into_redeemable(&self.me_onchain, &domain_separator)
1104                .map_err(DbSqlError::from)?;
1105
1106            self.compare_and_set_outgoing_ticket_index(
1107                single.verified_ticket().channel_id,
1108                single.verified_ticket().index + 1,
1109            )
1110            .await?;
1111
1112            return Ok(single.ticket);
1113        }
1114
1115        acked_tickets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(cmp::Ordering::Equal));
1116        acked_tickets.dedup();
1117
1118        let myself = self.clone();
1119        let address = myself
1120            .resolve_chain_key(&destination)
1121            .await?
1122            .ok_or(DbSqlError::LogicalError(format!(
1123                "peer '{}' has no chain key record",
1124                destination.to_peerid_str()
1125            )))?;
1126
1127        let (channel_entry, destination, min_win_prob) = self
1128            .nest_transaction_in_db(None, TargetDb::Index)
1129            .await?
1130            .perform(|tx| {
1131                Box::pin(async move {
1132                    let entry = myself
1133                        .get_channel_by_parties(Some(tx), &myself.me_onchain, &address, false)
1134                        .await?
1135                        .ok_or_else(|| {
1136                            DbSqlError::ChannelNotFound(generate_channel_id(&myself.me_onchain, &address))
1137                        })?;
1138
1139                    if entry.status == ChannelStatus::Closed {
1140                        return Err(DbSqlError::LogicalError(format!("{entry} is closed")));
1141                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Outgoing) {
1142                        return Err(DbSqlError::LogicalError(format!("{entry} is not outgoing")));
1143                    }
1144
1145                    let min_win_prob = myself
1146                        .get_indexer_data(Some(tx))
1147                        .await?
1148                        .minimum_incoming_ticket_winning_prob;
1149                    Ok((entry, address, min_win_prob))
1150                })
1151            })
1152            .await?;
1153
1154        let channel_balance = channel_entry.balance;
1155        let channel_epoch = channel_entry.channel_epoch.as_u32();
1156        let channel_id = channel_entry.get_id();
1157
1158        let mut final_value = Balance::zero(BalanceType::HOPR);
1159
1160        // Validate all received tickets and turn them into RedeemableTickets
1161        let verified_tickets = acked_tickets
1162            .into_iter()
1163            .map(|t| t.into_redeemable(&self.me_onchain, &domain_separator))
1164            .collect::<hopr_internal_types::errors::Result<Vec<_>>>()
1165            .map_err(|e| {
1166                DbSqlError::LogicalError(format!("trying to aggregate an invalid or a non-winning ticket: {e}"))
1167            })?;
1168
1169        // Perform additional consistency check on the verified tickets
1170        for (i, acked_ticket) in verified_tickets.iter().enumerate() {
1171            if channel_id != acked_ticket.verified_ticket().channel_id {
1172                return Err(DbSqlError::LogicalError(format!(
1173                    "ticket for aggregation has an invalid channel id {}",
1174                    acked_ticket.verified_ticket().channel_id
1175                ))
1176                .into());
1177            }
1178
1179            if acked_ticket.verified_ticket().channel_epoch != channel_epoch {
1180                return Err(DbSqlError::LogicalError("channel epochs do not match".into()).into());
1181            }
1182
1183            if i + 1 < verified_tickets.len()
1184                && acked_ticket.verified_ticket().index + acked_ticket.verified_ticket().index_offset as u64
1185                    > verified_tickets[i + 1].verified_ticket().index
1186            {
1187                return Err(DbSqlError::LogicalError("tickets with overlapping index intervals".into()).into());
1188            }
1189
1190            if !f64_approx_eq(
1191                acked_ticket.verified_ticket().win_prob(),
1192                min_win_prob,
1193                LOWEST_POSSIBLE_WINNING_PROB,
1194            ) && acked_ticket.verified_ticket().win_prob() < min_win_prob
1195            {
1196                return Err(DbSqlError::LogicalError(
1197                    "cannot aggregate ticket with lower than minimum winning probability in network".into(),
1198                )
1199                .into());
1200            }
1201
1202            final_value = final_value.add(&acked_ticket.verified_ticket().amount);
1203            if final_value.gt(&channel_balance) {
1204                return Err(DbSqlError::LogicalError(format!("ticket amount to aggregate {final_value} is greater than the balance {channel_balance} of channel {channel_id}")).into());
1205            }
1206        }
1207
1208        info!(
1209            "aggregated {} tickets in channel {channel_id} with total value {final_value}",
1210            verified_tickets.len()
1211        );
1212
1213        let first_acked_ticket = verified_tickets.first().unwrap();
1214        let last_acked_ticket = verified_tickets.last().unwrap();
1215
1216        // calculate the minimum current ticket index as the larger value from the acked ticket index and on-chain ticket_index from channel_entry
1217        let current_ticket_index_from_acked_tickets = last_acked_ticket.verified_ticket().index + 1;
1218        self.compare_and_set_outgoing_ticket_index(channel_id, current_ticket_index_from_acked_tickets)
1219            .await?;
1220
1221        Ok(TicketBuilder::default()
1222            .direction(&self.me_onchain, &destination)
1223            .balance(final_value)
1224            .index(first_acked_ticket.verified_ticket().index)
1225            .index_offset(
1226                (last_acked_ticket.verified_ticket().index - first_acked_ticket.verified_ticket().index + 1) as u32,
1227            )
1228            .win_prob(1.0) // Aggregated tickets have always 100% winning probability
1229            .channel_epoch(channel_epoch)
1230            .challenge(first_acked_ticket.verified_ticket().challenge)
1231            .build_signed(me, &domain_separator)
1232            .map_err(DbSqlError::from)?)
1233    }
1234
1235    async fn fix_channels_next_ticket_state(&self) -> Result<()> {
1236        let channels = self.get_incoming_channels(None).await?;
1237
1238        for channel in channels.into_iter() {
1239            let selector = TicketSelector::from(&channel)
1240                .with_state(AcknowledgedTicketStatus::BeingRedeemed)
1241                .with_index(channel.ticket_index.as_u64());
1242
1243            let mut tickets_stream = self
1244                .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::Untouched)
1245                .await?;
1246
1247            while let Some(ticket) = tickets_stream.next().await {
1248                let channel_id = channel.get_id();
1249                let ticket_index = ticket.verified_ticket().index;
1250                let ticket_amount = ticket.verified_ticket().amount;
1251                info!(%channel_id, %ticket_index, %ticket_amount, "fixed next out-of-sync ticket");
1252            }
1253        }
1254
1255        Ok(())
1256    }
1257}
1258
1259impl HoprDb {
1260    /// Used only by non-SQLite code and tests.
1261    pub async fn upsert_ticket<'a>(&'a self, tx: OptTx<'a>, acknowledged_ticket: AcknowledgedTicket) -> Result<()> {
1262        self.nest_transaction_in_db(tx, TargetDb::Tickets)
1263            .await?
1264            .perform(|tx| {
1265                Box::pin(async move {
1266                    // For upserting, we must select only by the triplet (channel id, epoch, index)
1267                    let selector = WrappedTicketSelector::from(
1268                        TicketSelector::new(
1269                            acknowledged_ticket.verified_ticket().channel_id,
1270                            acknowledged_ticket.verified_ticket().channel_epoch,
1271                        )
1272                        .with_index(acknowledged_ticket.verified_ticket().index),
1273                    );
1274
1275                    debug!("upserting ticket {acknowledged_ticket}");
1276                    let mut model = ticket::ActiveModel::from(acknowledged_ticket);
1277
1278                    if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx.as_ref()).await? {
1279                        model.id = Set(ticket.id);
1280                    }
1281
1282                    Ok::<_, DbSqlError>(model.save(tx.as_ref()).await?)
1283                })
1284            })
1285            .await?;
1286        Ok(())
1287    }
1288}
1289
1290#[cfg(test)]
1291mod tests {
1292    use anyhow::{anyhow, Context};
1293    use futures::{pin_mut, StreamExt};
1294    use hex_literal::hex;
1295    use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, Set};
1296    use std::ops::Add;
1297    use std::sync::atomic::Ordering;
1298    use std::time::{Duration, SystemTime};
1299
1300    use hopr_crypto_types::prelude::*;
1301    use hopr_db_api::prelude::{DbError, TicketMarker};
1302    use hopr_db_api::{info::DomainSeparator, tickets::ChannelTicketStatistics};
1303    use hopr_db_entity::ticket;
1304    use hopr_internal_types::prelude::*;
1305    use hopr_primitive_types::prelude::*;
1306
1307    use crate::accounts::HoprDbAccountOperations;
1308    use crate::channels::HoprDbChannelOperations;
1309    use crate::db::HoprDb;
1310    use crate::errors::DbSqlError;
1311    use crate::info::HoprDbInfoOperations;
1312    use crate::tickets::{
1313        filter_satisfying_ticket_models, AggregationPrerequisites, HoprDbTicketOperations, TicketSelector,
1314    };
1315    use crate::{HoprDbGeneralModelOperations, TargetDb};
1316
1317    lazy_static::lazy_static! {
1318        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
1319        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
1320        static ref CHANNEL_ID: Hash = generate_channel_id(&BOB.public().to_address(), &ALICE.public().to_address());
1321    }
1322
1323    lazy_static::lazy_static! {
1324        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1325        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1326    }
1327
1328    const TICKET_VALUE: u64 = 100_000;
1329
1330    async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
1331        for (peer_offchain, peer_onchain) in peers.into_iter() {
1332            db.insert_account(
1333                None,
1334                AccountEntry {
1335                    public_key: *peer_offchain.public(),
1336                    chain_addr: peer_onchain.public().to_address(),
1337                    entry_type: AccountType::NotAnnounced,
1338                },
1339            )
1340            .await?
1341        }
1342
1343        Ok(())
1344    }
1345
1346    fn generate_random_ack_ticket(
1347        src: &ChainKeypair,
1348        dst: &ChainKeypair,
1349        index: u64,
1350        index_offset: u32,
1351        win_prob: f64,
1352    ) -> anyhow::Result<AcknowledgedTicket> {
1353        let hk1 = HalfKey::random();
1354        let hk2 = HalfKey::random();
1355
1356        let cp1: CurvePoint = hk1.to_challenge().try_into()?;
1357        let cp2: CurvePoint = hk2.to_challenge().try_into()?;
1358        let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
1359
1360        Ok(TicketBuilder::default()
1361            .addresses(src, dst)
1362            .amount(TICKET_VALUE)
1363            .index(index)
1364            .index_offset(index_offset)
1365            .win_prob(win_prob)
1366            .channel_epoch(4)
1367            .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
1368            .build_signed(src, &Hash::default())?
1369            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
1370    }
1371
1372    async fn init_db_with_tickets(
1373        db: &HoprDb,
1374        count_tickets: u64,
1375    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1376        init_db_with_tickets_and_channel(db, count_tickets, None).await
1377    }
1378
1379    async fn init_db_with_tickets_and_channel(
1380        db: &HoprDb,
1381        count_tickets: u64,
1382        channel_ticket_index: Option<u32>,
1383    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1384        let channel = ChannelEntry::new(
1385            BOB.public().to_address(),
1386            ALICE.public().to_address(),
1387            BalanceType::HOPR.balance(u32::MAX),
1388            channel_ticket_index.unwrap_or(0u32).into(),
1389            ChannelStatus::Open,
1390            4_u32.into(),
1391        );
1392
1393        db.upsert_channel(None, channel).await?;
1394
1395        let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
1396            .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
1397            .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
1398
1399        let db_clone = db.clone();
1400        let tickets_clone = tickets.clone();
1401        db.begin_transaction_in_db(TargetDb::Tickets)
1402            .await?
1403            .perform(|tx| {
1404                Box::pin(async move {
1405                    for t in tickets_clone {
1406                        db_clone.upsert_ticket(Some(tx), t).await?;
1407                    }
1408                    Ok::<(), DbSqlError>(())
1409                })
1410            })
1411            .await?;
1412
1413        Ok((channel, tickets))
1414    }
1415
1416    #[async_std::test]
1417    async fn test_insert_get_ticket() -> anyhow::Result<()> {
1418        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1419        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
1420            .await?;
1421
1422        let (channel, mut tickets) = init_db_with_tickets(&db, 1).await?;
1423        let ack_ticket = tickets.pop().context("ticket should be present")?;
1424
1425        assert_eq!(
1426            channel.get_id(),
1427            ack_ticket.verified_ticket().channel_id,
1428            "channel ids must match"
1429        );
1430        assert_eq!(
1431            channel.channel_epoch.as_u32(),
1432            ack_ticket.verified_ticket().channel_epoch,
1433            "epochs must match"
1434        );
1435
1436        let db_ticket = db
1437            .get_tickets((&ack_ticket).into())
1438            .await?
1439            .first()
1440            .cloned()
1441            .context("ticket should exist")?;
1442
1443        assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
1444
1445        Ok(())
1446    }
1447
1448    #[async_std::test]
1449    async fn test_mark_redeemed() -> anyhow::Result<()> {
1450        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1451        const COUNT_TICKETS: u64 = 10;
1452
1453        let (_, tickets) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1454
1455        let stats = db.get_ticket_statistics(None).await?;
1456        assert_eq!(
1457            BalanceType::HOPR.balance(TICKET_VALUE * COUNT_TICKETS),
1458            stats.unredeemed_value,
1459            "unredeemed balance must match"
1460        );
1461        assert_eq!(
1462            BalanceType::HOPR.zero(),
1463            stats.redeemed_value,
1464            "there must be 0 redeemed value"
1465        );
1466
1467        assert_eq!(
1468            stats,
1469            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1470            "per channel stats must be same"
1471        );
1472
1473        const TO_REDEEM: u64 = 2;
1474        let db_clone = db.clone();
1475        db.begin_transaction_in_db(TargetDb::Tickets)
1476            .await?
1477            .perform(|_tx| {
1478                Box::pin(async move {
1479                    for i in 0..TO_REDEEM as usize {
1480                        let r = db_clone
1481                            .mark_tickets_as((&tickets[i]).into(), TicketMarker::Redeemed)
1482                            .await?;
1483                        assert_eq!(1, r, "must redeem only a single ticket");
1484                    }
1485                    Ok::<(), DbSqlError>(())
1486                })
1487            })
1488            .await?;
1489
1490        let stats = db.get_ticket_statistics(None).await?;
1491        assert_eq!(
1492            BalanceType::HOPR.balance(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
1493            stats.unredeemed_value,
1494            "unredeemed balance must match"
1495        );
1496        assert_eq!(
1497            BalanceType::HOPR.balance(TICKET_VALUE * TO_REDEEM),
1498            stats.redeemed_value,
1499            "there must be a redeemed value"
1500        );
1501
1502        assert_eq!(
1503            stats,
1504            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1505            "per channel stats must be same"
1506        );
1507
1508        Ok(())
1509    }
1510
1511    #[async_std::test]
1512    async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
1513        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1514
1515        let ticket = init_db_with_tickets(&db, 1)
1516            .await?
1517            .1
1518            .pop()
1519            .context("should contain a ticket")?;
1520
1521        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
1522        assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
1523
1524        Ok(())
1525    }
1526
1527    #[async_std::test]
1528    async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
1529        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1530
1531        let count_tickets = 10;
1532        let channel = init_db_with_tickets(&db, count_tickets).await?.0;
1533
1534        let count_marked = db.mark_tickets_as((&channel).into(), TicketMarker::Redeemed).await?;
1535        assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
1536
1537        Ok(())
1538    }
1539
1540    #[async_std::test]
1541    async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
1542        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1543        const COUNT_TICKETS: u64 = 10;
1544
1545        let (channel, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1546
1547        let stats = db.get_ticket_statistics(None).await?;
1548        assert_eq!(
1549            BalanceType::HOPR.balance(TICKET_VALUE * COUNT_TICKETS),
1550            stats.unredeemed_value,
1551            "unredeemed balance must match"
1552        );
1553        assert_eq!(
1554            BalanceType::HOPR.zero(),
1555            stats.neglected_value,
1556            "there must be 0 redeemed value"
1557        );
1558
1559        assert_eq!(
1560            stats,
1561            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1562            "per channel stats must be same"
1563        );
1564
1565        db.mark_tickets_as((&channel).into(), TicketMarker::Neglected).await?;
1566
1567        let stats = db.get_ticket_statistics(None).await?;
1568        assert_eq!(
1569            BalanceType::HOPR.zero(),
1570            stats.unredeemed_value,
1571            "unredeemed balance must be zero"
1572        );
1573        assert_eq!(
1574            BalanceType::HOPR.balance(TICKET_VALUE * COUNT_TICKETS),
1575            stats.neglected_value,
1576            "there must be a neglected value"
1577        );
1578
1579        assert_eq!(
1580            stats,
1581            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1582            "per channel stats must be same"
1583        );
1584
1585        Ok(())
1586    }
1587
1588    #[async_std::test]
1589    async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1590        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1591
1592        let (_, mut ticket) = init_db_with_tickets(&db, 1).await?;
1593        let ticket = ticket.pop().context("ticket should be present")?.ticket;
1594
1595        let stats = db.get_ticket_statistics(None).await?;
1596        assert_eq!(BalanceType::HOPR.zero(), stats.rejected_value);
1597        assert_eq!(
1598            stats,
1599            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1600            "per channel stats must be same"
1601        );
1602
1603        db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
1604
1605        let stats = db.get_ticket_statistics(None).await?;
1606        assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
1607        assert_eq!(
1608            stats,
1609            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1610            "per channel stats must be same"
1611        );
1612
1613        Ok(())
1614    }
1615
1616    #[async_std::test]
1617    async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1618        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1619        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1620            .await?;
1621
1622        let channel = init_db_with_tickets(&db, 10).await?.0;
1623
1624        let selector = TicketSelector::from(&channel).with_index(5);
1625
1626        let v: Vec<AcknowledgedTicket> = db
1627            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1628            .await?
1629            .collect()
1630            .await;
1631
1632        assert_eq!(1, v.len(), "single ticket must be updated");
1633        assert_eq!(
1634            AcknowledgedTicketStatus::BeingRedeemed,
1635            v.first().context("should contain a ticket")?.status,
1636            "status must be set"
1637        );
1638
1639        let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1640
1641        let v: Vec<AcknowledgedTicket> = db
1642            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1643            .await?
1644            .collect()
1645            .await;
1646
1647        assert_eq!(9, v.len(), "only specific tickets must have state set");
1648        assert!(
1649            v.iter().all(|t| t.verified_ticket().index != 5),
1650            "only tickets with different state must update"
1651        );
1652        assert!(
1653            v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
1654            "tickets must have updated state"
1655        );
1656
1657        Ok(())
1658    }
1659
1660    #[async_std::test]
1661    async fn test_update_tickets_states() -> anyhow::Result<()> {
1662        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1663        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1664            .await?;
1665
1666        let channel = init_db_with_tickets(&db, 10).await?.0;
1667        let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1668
1669        db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
1670            .await?;
1671
1672        let v: Vec<AcknowledgedTicket> = db
1673            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1674            .await?
1675            .collect()
1676            .await;
1677
1678        assert!(v.is_empty(), "must not update if already updated");
1679
1680        Ok(())
1681    }
1682
1683    #[async_std::test]
1684    async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1685        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1686
1687        let hash = Hash::default();
1688
1689        let idx = db.get_outgoing_ticket_index(hash).await?;
1690        assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
1691
1692        let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
1693            .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
1694            .one(&db.tickets_db)
1695            .await?
1696            .context("index must exist")?;
1697
1698        assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
1699
1700        Ok(())
1701    }
1702
1703    #[async_std::test]
1704    async fn test_ticket_stats_must_fail_for_non_existing_channel() -> anyhow::Result<()> {
1705        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1706
1707        db.get_ticket_statistics(Some(*CHANNEL_ID))
1708            .await
1709            .expect_err("must fail for non-existing channel");
1710
1711        Ok(())
1712    }
1713
1714    #[async_std::test]
1715    async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1716        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1717
1718        let channel = ChannelEntry::new(
1719            BOB.public().to_address(),
1720            ALICE.public().to_address(),
1721            BalanceType::HOPR.balance(u32::MAX),
1722            0.into(),
1723            ChannelStatus::Open,
1724            4_u32.into(),
1725        );
1726
1727        db.upsert_channel(None, channel).await?;
1728
1729        let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1730
1731        assert_eq!(
1732            ChannelTicketStatistics::default(),
1733            stats,
1734            "must be equal to default which is all zeros"
1735        );
1736
1737        assert_eq!(
1738            stats,
1739            db.get_ticket_statistics(None).await?,
1740            "per-channel stats must be the same as global stats"
1741        );
1742
1743        Ok(())
1744    }
1745
1746    #[async_std::test]
1747    async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1748        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1749
1750        let channel_1 = ChannelEntry::new(
1751            BOB.public().to_address(),
1752            ALICE.public().to_address(),
1753            BalanceType::HOPR.balance(u32::MAX),
1754            0.into(),
1755            ChannelStatus::Open,
1756            4_u32.into(),
1757        );
1758
1759        db.upsert_channel(None, channel_1).await?;
1760
1761        let channel_2 = ChannelEntry::new(
1762            ALICE.public().to_address(),
1763            BOB.public().to_address(),
1764            BalanceType::HOPR.balance(u32::MAX),
1765            0.into(),
1766            ChannelStatus::Open,
1767            4_u32.into(),
1768        );
1769
1770        db.upsert_channel(None, channel_2).await?;
1771
1772        let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1773        let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1774
1775        let value = t1.verified_ticket().amount;
1776
1777        db.upsert_ticket(None, t1).await?;
1778        db.upsert_ticket(None, t2).await?;
1779
1780        let stats_1 = db
1781            .get_ticket_statistics(Some(generate_channel_id(
1782                &BOB.public().to_address(),
1783                &ALICE.public().to_address(),
1784            )))
1785            .await?;
1786
1787        let stats_2 = db
1788            .get_ticket_statistics(Some(generate_channel_id(
1789                &ALICE.public().to_address(),
1790                &BOB.public().to_address(),
1791            )))
1792            .await?;
1793
1794        assert_eq!(value, stats_1.unredeemed_value);
1795        assert_eq!(value, stats_2.unredeemed_value);
1796
1797        assert_eq!(BalanceType::HOPR.zero(), stats_1.neglected_value);
1798        assert_eq!(BalanceType::HOPR.zero(), stats_2.neglected_value);
1799
1800        assert_eq!(stats_1, stats_2);
1801
1802        db.mark_tickets_as(channel_1.into(), TicketMarker::Neglected).await?;
1803
1804        let stats_1 = db
1805            .get_ticket_statistics(Some(generate_channel_id(
1806                &BOB.public().to_address(),
1807                &ALICE.public().to_address(),
1808            )))
1809            .await?;
1810
1811        let stats_2 = db
1812            .get_ticket_statistics(Some(generate_channel_id(
1813                &ALICE.public().to_address(),
1814                &BOB.public().to_address(),
1815            )))
1816            .await?;
1817
1818        assert_eq!(BalanceType::HOPR.zero(), stats_1.unredeemed_value);
1819        assert_eq!(value, stats_1.neglected_value);
1820
1821        assert_eq!(BalanceType::HOPR.zero(), stats_2.neglected_value);
1822
1823        Ok(())
1824    }
1825
1826    #[async_std::test]
1827    async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1828        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1829
1830        let hash = Hash::default();
1831
1832        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1833        assert_eq!(0, old_idx, "old value must be 0");
1834
1835        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1836        assert_eq!(1, new_idx, "new value must be 1");
1837
1838        let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1839        assert_eq!(1, old_idx, "old value must be 1");
1840
1841        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1842        assert_eq!(2, new_idx, "new value must be 2");
1843
1844        Ok(())
1845    }
1846
1847    #[async_std::test]
1848    async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1849        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1850
1851        let hash = Hash::default();
1852
1853        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1854        assert_eq!(0, new_idx, "value must be 0");
1855
1856        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1857
1858        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1859        assert_eq!(1, new_idx, "new value must be 1");
1860
1861        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1862        assert_eq!(1, old_idx, "old value must be 1");
1863        assert_eq!(1, new_idx, "new value must be 1");
1864
1865        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1866        assert_eq!(1, old_idx, "old value must be 1");
1867        assert_eq!(1, new_idx, "new value must be 1");
1868
1869        Ok(())
1870    }
1871
1872    #[async_std::test]
1873    async fn test_ticket_index_reset() -> anyhow::Result<()> {
1874        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1875
1876        let hash = Hash::default();
1877
1878        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1879        assert_eq!(0, new_idx, "value must be 0");
1880
1881        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1882
1883        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1884        assert_eq!(1, new_idx, "new value must be 1");
1885
1886        let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1887        assert_eq!(1, old_idx, "old value must be 1");
1888
1889        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1890        assert_eq!(0, new_idx, "new value must be 0");
1891        Ok(())
1892    }
1893
1894    #[async_std::test]
1895    async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1896        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1897
1898        let hash_1 = Hash::default();
1899        let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1900
1901        db.get_outgoing_ticket_index(hash_1).await?;
1902        db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1903
1904        let persisted = db.persist_outgoing_ticket_indices().await?;
1905        assert_eq!(1, persisted);
1906
1907        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1908            .all(&db.tickets_db)
1909            .await?;
1910        let idx_1 = indices
1911            .iter()
1912            .find(|idx| idx.channel_id == hash_1.to_hex())
1913            .context("must contain index 1")?;
1914        let idx_2 = indices
1915            .iter()
1916            .find(|idx| idx.channel_id == hash_2.to_hex())
1917            .context("must contain index 2")?;
1918        assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1919        assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1920
1921        db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1922        db.increment_outgoing_ticket_index(hash_2).await?;
1923
1924        let persisted = db.persist_outgoing_ticket_indices().await?;
1925        assert_eq!(2, persisted);
1926
1927        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1928            .all(&db.tickets_db)
1929            .await?;
1930        let idx_1 = indices
1931            .iter()
1932            .find(|idx| idx.channel_id == hash_1.to_hex())
1933            .context("must contain index 1")?;
1934        let idx_2 = indices
1935            .iter()
1936            .find(|idx| idx.channel_id == hash_2.to_hex())
1937            .context("must contain index 2")?;
1938        assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1939        assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1940        Ok(())
1941    }
1942
1943    #[async_std::test]
1944    async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1945        let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1946
1947        assert_eq!(cache.weighted_size(), 0);
1948
1949        cache.insert(1, 1).await;
1950        cache.insert(2, 2).await;
1951
1952        let clone = cache.clone();
1953
1954        cache.remove(&1).await;
1955        cache.remove(&2).await;
1956
1957        assert_eq!(cache.get(&1).await, None);
1958        assert_eq!(cache.get(&1).await, clone.get(&1).await);
1959        Ok(())
1960    }
1961
1962    fn dummy_ticket_model(channel_id: Hash, idx: u64, idx_offset: u32, amount: u32) -> ticket::Model {
1963        ticket::Model {
1964            id: 0,
1965            channel_id: channel_id.to_string(),
1966            amount: U256::from(amount).to_be_bytes().to_vec(),
1967            index: idx.to_be_bytes().to_vec(),
1968            index_offset: idx_offset as i32,
1969            winning_probability: hex!("0020C49BA5E34F").to_vec(), // 0.0005
1970            channel_epoch: vec![],
1971            signature: vec![],
1972            response: vec![],
1973            state: 0,
1974            hash: vec![],
1975        }
1976    }
1977
1978    #[async_std::test]
1979    async fn test_aggregation_prerequisites_default_filter_no_tickets() -> anyhow::Result<()> {
1980        let prerequisites = AggregationPrerequisites::default();
1981        assert_eq!(None, prerequisites.min_unaggregated_ratio);
1982        assert_eq!(None, prerequisites.min_ticket_count);
1983
1984        let channel = ChannelEntry::new(
1985            BOB.public().to_address(),
1986            ALICE.public().to_address(),
1987            BalanceType::HOPR.balance(u32::MAX),
1988            2.into(),
1989            ChannelStatus::Open,
1990            4_u32.into(),
1991        );
1992
1993        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
1994
1995        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
1996
1997        assert_eq!(
1998            dummy_tickets, filtered_tickets,
1999            "empty prerequisites must not filter anything"
2000        );
2001        Ok(())
2002    }
2003
2004    #[async_std::test]
2005    async fn test_aggregation_prerequisites_should_filter_out_tickets_with_lower_than_min_win_prob(
2006    ) -> anyhow::Result<()> {
2007        let prerequisites = AggregationPrerequisites::default();
2008        assert_eq!(None, prerequisites.min_unaggregated_ratio);
2009        assert_eq!(None, prerequisites.min_ticket_count);
2010
2011        let channel = ChannelEntry::new(
2012            BOB.public().to_address(),
2013            ALICE.public().to_address(),
2014            BalanceType::HOPR.balance(u32::MAX),
2015            2.into(),
2016            ChannelStatus::Open,
2017            4_u32.into(),
2018        );
2019
2020        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2021
2022        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0006)?;
2023
2024        assert!(
2025            filtered_tickets.is_empty(),
2026            "must filter out tickets with lower win prob"
2027        );
2028        Ok(())
2029    }
2030
2031    #[async_std::test]
2032    async fn test_aggregation_prerequisites_must_trim_tickets_exceeding_channel_balance() -> anyhow::Result<()> {
2033        const TICKET_COUNT: usize = 110;
2034
2035        let prerequisites = AggregationPrerequisites::default();
2036        assert_eq!(None, prerequisites.min_unaggregated_ratio);
2037        assert_eq!(None, prerequisites.min_ticket_count);
2038
2039        let channel = ChannelEntry::new(
2040            BOB.public().to_address(),
2041            ALICE.public().to_address(),
2042            BalanceType::HOPR.balance(100),
2043            (TICKET_COUNT + 1).into(),
2044            ChannelStatus::Open,
2045            4_u32.into(),
2046        );
2047
2048        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2049            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2050            .collect();
2051
2052        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2053
2054        assert_eq!(
2055            100,
2056            filtered_tickets.len(),
2057            "must take only tickets up to channel balance"
2058        );
2059        assert!(
2060            filtered_tickets
2061                .into_iter()
2062                .map(|t| U256::from_be_bytes(t.amount).as_u64())
2063                .sum::<u64>()
2064                <= channel.balance.amount().as_u64(),
2065            "filtered tickets must not exceed channel balance"
2066        );
2067        Ok(())
2068    }
2069
2070    #[async_std::test]
2071    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_ticket_count_not_met() -> anyhow::Result<()>
2072    {
2073        const TICKET_COUNT: usize = 10;
2074
2075        let prerequisites = AggregationPrerequisites {
2076            min_ticket_count: Some(TICKET_COUNT + 1),
2077            min_unaggregated_ratio: None,
2078        };
2079
2080        let channel = ChannelEntry::new(
2081            BOB.public().to_address(),
2082            ALICE.public().to_address(),
2083            BalanceType::HOPR.balance(100),
2084            (TICKET_COUNT + 1).into(),
2085            ChannelStatus::Open,
2086            4_u32.into(),
2087        );
2088
2089        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2090            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2091            .collect();
2092
2093        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2094
2095        assert!(
2096            filtered_tickets.is_empty(),
2097            "must return empty when min_ticket_count is not met"
2098        );
2099
2100        Ok(())
2101    }
2102
2103    #[async_std::test]
2104    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_unaggregated_ratio_is_not_met(
2105    ) -> anyhow::Result<()> {
2106        const TICKET_COUNT: usize = 10;
2107
2108        let prerequisites = AggregationPrerequisites {
2109            min_ticket_count: None,
2110            min_unaggregated_ratio: Some(0.9),
2111        };
2112
2113        let channel = ChannelEntry::new(
2114            BOB.public().to_address(),
2115            ALICE.public().to_address(),
2116            BalanceType::HOPR.balance(100),
2117            (TICKET_COUNT + 1).into(),
2118            ChannelStatus::Open,
2119            4_u32.into(),
2120        );
2121
2122        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2123            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2124            .collect();
2125
2126        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2127
2128        assert!(
2129            filtered_tickets.is_empty(),
2130            "must return empty when min_unaggregated_ratio is not met"
2131        );
2132
2133        Ok(())
2134    }
2135
2136    #[async_std::test]
2137    async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met() -> anyhow::Result<()> {
2138        const TICKET_COUNT: usize = 10;
2139
2140        let prerequisites = AggregationPrerequisites {
2141            min_ticket_count: Some(TICKET_COUNT),
2142            min_unaggregated_ratio: None,
2143        };
2144
2145        let channel = ChannelEntry::new(
2146            BOB.public().to_address(),
2147            ALICE.public().to_address(),
2148            BalanceType::HOPR.balance(100),
2149            (TICKET_COUNT + 1).into(),
2150            ChannelStatus::Open,
2151            4_u32.into(),
2152        );
2153
2154        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2155            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2156            .collect();
2157
2158        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2159
2160        assert!(!filtered_tickets.is_empty(), "must not return empty");
2161        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2162        Ok(())
2163    }
2164
2165    #[async_std::test]
2166    async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met_regardless_ratio(
2167    ) -> anyhow::Result<()> {
2168        const TICKET_COUNT: usize = 10;
2169
2170        let prerequisites = AggregationPrerequisites {
2171            min_ticket_count: Some(TICKET_COUNT),
2172            min_unaggregated_ratio: Some(0.9),
2173        };
2174
2175        let channel = ChannelEntry::new(
2176            BOB.public().to_address(),
2177            ALICE.public().to_address(),
2178            BalanceType::HOPR.balance(100),
2179            (TICKET_COUNT + 1).into(),
2180            ChannelStatus::Open,
2181            4_u32.into(),
2182        );
2183
2184        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2185            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2186            .collect();
2187
2188        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2189
2190        assert!(!filtered_tickets.is_empty(), "must not return empty");
2191        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2192        Ok(())
2193    }
2194
2195    #[async_std::test]
2196    async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met(
2197    ) -> anyhow::Result<()> {
2198        const TICKET_COUNT: usize = 90;
2199
2200        let prerequisites = AggregationPrerequisites {
2201            min_ticket_count: None,
2202            min_unaggregated_ratio: Some(0.9),
2203        };
2204
2205        let channel = ChannelEntry::new(
2206            BOB.public().to_address(),
2207            ALICE.public().to_address(),
2208            BalanceType::HOPR.balance(100),
2209            (TICKET_COUNT + 1).into(),
2210            ChannelStatus::Open,
2211            4_u32.into(),
2212        );
2213
2214        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2215            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2216            .collect();
2217
2218        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2219
2220        assert!(!filtered_tickets.is_empty(), "must not return empty");
2221        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2222        Ok(())
2223    }
2224
2225    #[async_std::test]
2226    async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met_regardless_count(
2227    ) -> anyhow::Result<()> {
2228        const TICKET_COUNT: usize = 90;
2229
2230        let prerequisites = AggregationPrerequisites {
2231            min_ticket_count: Some(TICKET_COUNT + 1),
2232            min_unaggregated_ratio: Some(0.9),
2233        };
2234
2235        let channel = ChannelEntry::new(
2236            BOB.public().to_address(),
2237            ALICE.public().to_address(),
2238            BalanceType::HOPR.balance(100),
2239            (TICKET_COUNT + 1).into(),
2240            ChannelStatus::Open,
2241            4_u32.into(),
2242        );
2243
2244        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2245            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2246            .collect();
2247
2248        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2249
2250        assert!(!filtered_tickets.is_empty(), "must not return empty");
2251        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2252        Ok(())
2253    }
2254
2255    #[async_std::test]
2256    async fn test_aggregation_prerequisites_must_return_tickets_when_minimum_incl_aggregated_ratio_is_met(
2257    ) -> anyhow::Result<()> {
2258        const TICKET_COUNT: usize = 90;
2259
2260        let prerequisites = AggregationPrerequisites {
2261            min_ticket_count: None,
2262            min_unaggregated_ratio: Some(0.9),
2263        };
2264
2265        let channel = ChannelEntry::new(
2266            BOB.public().to_address(),
2267            ALICE.public().to_address(),
2268            BalanceType::HOPR.balance(100),
2269            (TICKET_COUNT + 1).into(),
2270            ChannelStatus::Open,
2271            4_u32.into(),
2272        );
2273
2274        let mut dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2275            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2276            .collect();
2277        dummy_tickets[0].index_offset = 2; // Make this ticket aggregated
2278
2279        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2280
2281        assert_eq!(filtered_tickets.len(), TICKET_COUNT);
2282        Ok(())
2283    }
2284
2285    #[async_std::test]
2286    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_only_unaggregated_ratio_is_met_in_single_ticket_only(
2287    ) -> anyhow::Result<()> {
2288        let prerequisites = AggregationPrerequisites {
2289            min_ticket_count: None,
2290            min_unaggregated_ratio: Some(0.9),
2291        };
2292
2293        let channel = ChannelEntry::new(
2294            BOB.public().to_address(),
2295            ALICE.public().to_address(),
2296            BalanceType::HOPR.balance(100),
2297            2.into(),
2298            ChannelStatus::Open,
2299            4_u32.into(),
2300        );
2301
2302        // Single aggregated ticket exceeding the min_unaggregated_ratio
2303        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 2, 110)];
2304
2305        let filtered_tickets = filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005)?;
2306
2307        assert!(filtered_tickets.is_empty(), "must return empty");
2308        Ok(())
2309    }
2310
2311    async fn create_alice_db_with_tickets_from_bob(
2312        ticket_count: usize,
2313    ) -> anyhow::Result<(HoprDb, ChannelEntry, Vec<AcknowledgedTicket>)> {
2314        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
2315
2316        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2317            .await?;
2318
2319        add_peer_mappings(
2320            &db,
2321            vec![
2322                (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2323                (BOB_OFFCHAIN.clone(), BOB.clone()),
2324            ],
2325        )
2326        .await?;
2327
2328        let (channel, tickets) = init_db_with_tickets(&db, ticket_count as u64).await?;
2329
2330        Ok((db, channel, tickets))
2331    }
2332
2333    #[async_std::test]
2334    async fn test_ticket_aggregation_should_fail_if_any_ticket_is_being_aggregated_in_that_channel(
2335    ) -> anyhow::Result<()> {
2336        const COUNT_TICKETS: usize = 5;
2337
2338        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2339
2340        assert_eq!(tickets.len(), COUNT_TICKETS);
2341
2342        let existing_channel_with_multiple_tickets = channel.get_id();
2343
2344        // mark the first ticket as being aggregated
2345        let mut ticket = hopr_db_entity::ticket::Entity::find()
2346            .one(&db.tickets_db)
2347            .await?
2348            .context("should have an active model")?
2349            .into_active_model();
2350        ticket.state = Set(AcknowledgedTicketStatus::BeingAggregated as i8);
2351        ticket.save(&db.tickets_db).await?;
2352
2353        assert!(db
2354            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2355            .await
2356            .is_err());
2357
2358        Ok(())
2359    }
2360
2361    #[async_std::test]
2362    async fn test_ticket_aggregation_should_not_offer_tickets_with_lower_than_min_win_prob() -> anyhow::Result<()> {
2363        const COUNT_TICKETS: usize = 5;
2364
2365        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2366
2367        assert_eq!(tickets.len(), COUNT_TICKETS);
2368
2369        let existing_channel_with_multiple_tickets = channel.get_id();
2370
2371        // Decrease the win prob of one ticket
2372        let mut ticket = hopr_db_entity::ticket::Entity::find()
2373            .one(&db.tickets_db)
2374            .await?
2375            .context("should have an active model")?
2376            .into_active_model();
2377        ticket.winning_probability = Set(f64_to_win_prob(0.5)?.to_vec());
2378        ticket.save(&db.tickets_db).await?;
2379
2380        let prepared_tickets = db
2381            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2382            .await?
2383            .ok_or(anyhow!("should contain tickets"))?
2384            .1;
2385
2386        assert_eq!(COUNT_TICKETS - 1, prepared_tickets.len());
2387
2388        Ok(())
2389    }
2390
2391    #[async_std::test]
2392    async fn test_ticket_aggregation_prepare_request_with_0_tickets_should_return_empty_result() -> anyhow::Result<()> {
2393        const COUNT_TICKETS: usize = 0;
2394
2395        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2396
2397        assert_eq!(tickets.len(), COUNT_TICKETS);
2398
2399        let existing_channel_with_0_tickets = channel.get_id();
2400
2401        let actual = db
2402            .prepare_aggregation_in_channel(&existing_channel_with_0_tickets, Default::default())
2403            .await?;
2404
2405        assert_eq!(actual, None);
2406
2407        Ok(())
2408    }
2409
2410    #[async_std::test]
2411    async fn test_ticket_aggregation_prepare_request_with_multiple_tickets_should_return_that_ticket(
2412    ) -> anyhow::Result<()> {
2413        const COUNT_TICKETS: usize = 2;
2414
2415        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2416        let tickets: Vec<TransferableWinningTicket> = tickets
2417            .into_iter()
2418            .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2419            .collect::<hopr_internal_types::errors::Result<Vec<TransferableWinningTicket>>>()?;
2420
2421        assert_eq!(tickets.len(), COUNT_TICKETS);
2422
2423        let existing_channel_with_multiple_tickets = channel.get_id();
2424
2425        let actual = db
2426            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2427            .await?;
2428
2429        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2430
2431        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2432            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2433            .count(&db.tickets_db)
2434            .await? as usize;
2435
2436        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2437
2438        Ok(())
2439    }
2440
2441    #[async_std::test]
2442    async fn test_ticket_aggregation_prepare_request_with_duplicate_tickets_should_return_dedup_aggregated_ticket(
2443    ) -> anyhow::Result<()> {
2444        let (db, channel, _) = create_alice_db_with_tickets_from_bob(0).await?;
2445        let tickets = vec![
2446            generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0),
2447            generate_random_ack_ticket(&BOB, &ALICE, 0, 2, 1.0),
2448            generate_random_ack_ticket(&BOB, &ALICE, 2, 1, 1.0),
2449            generate_random_ack_ticket(&BOB, &ALICE, 3, 1, 1.0),
2450        ]
2451        .into_iter()
2452        .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
2453
2454        let tickets_clone = tickets.clone();
2455        let db_clone = db.clone();
2456        db.nest_transaction_in_db(None, TargetDb::Tickets)
2457            .await?
2458            .perform(|tx| {
2459                Box::pin(async move {
2460                    for ticket in tickets_clone {
2461                        db_clone.upsert_ticket(tx.into(), ticket).await?;
2462                    }
2463                    Ok::<_, DbError>(())
2464                })
2465            })
2466            .await?;
2467
2468        let existing_channel_with_multiple_tickets = channel.get_id();
2469        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2470        assert_eq!(stats.neglected_value, BalanceType::HOPR.zero());
2471
2472        let actual = db
2473            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2474            .await?;
2475
2476        let mut tickets = tickets
2477            .into_iter()
2478            .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2479            .collect::<hopr_internal_types::errors::Result<Vec<_>>>()?;
2480
2481        // We expect the first ticket to be removed
2482        tickets.remove(0);
2483        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2484
2485        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2486            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2487            .count(&db.tickets_db)
2488            .await? as usize;
2489
2490        assert_eq!(actual_being_aggregated_count, 3);
2491
2492        Ok(())
2493    }
2494
2495    #[async_std::test]
2496    async fn test_ticket_aggregation_prepare_request_with_a_being_redeemed_ticket_should_aggregate_only_the_tickets_following_it(
2497    ) -> anyhow::Result<()> {
2498        const COUNT_TICKETS: usize = 5;
2499
2500        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2501        let mut tickets = tickets
2502            .into_iter()
2503            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2504            .collect::<Vec<_>>();
2505
2506        assert_eq!(tickets.len(), COUNT_TICKETS);
2507
2508        let existing_channel_with_multiple_tickets = channel.get_id();
2509
2510        // mark the first ticket as being redeemed
2511        let mut ticket = hopr_db_entity::ticket::Entity::find()
2512            .one(&db.tickets_db)
2513            .await?
2514            .context("should have 1 active model")?
2515            .into_active_model();
2516        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2517        ticket.save(&db.tickets_db).await?;
2518
2519        let actual = db
2520            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2521            .await?;
2522
2523        tickets.remove(0);
2524        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2525
2526        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2527            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2528            .count(&db.tickets_db)
2529            .await? as usize;
2530
2531        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2532
2533        Ok(())
2534    }
2535
2536    #[async_std::test]
2537    async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_return_when_ticket_threshold_is_met(
2538    ) -> anyhow::Result<()> {
2539        const COUNT_TICKETS: usize = 5;
2540
2541        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2542        let tickets = tickets
2543            .into_iter()
2544            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2545            .collect::<Vec<_>>();
2546
2547        assert_eq!(tickets.len(), COUNT_TICKETS);
2548
2549        let existing_channel_with_multiple_tickets = channel.get_id();
2550
2551        let constraints = AggregationPrerequisites {
2552            min_ticket_count: Some(COUNT_TICKETS - 1),
2553            min_unaggregated_ratio: None,
2554        };
2555        let actual = db
2556            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2557            .await?;
2558
2559        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2560
2561        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2562            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2563            .count(&db.tickets_db)
2564            .await? as usize;
2565
2566        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2567
2568        Ok(())
2569    }
2570
2571    #[async_std::test]
2572    async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_not_return_when_ticket_threshold_is_not_met(
2573    ) -> anyhow::Result<()> {
2574        const COUNT_TICKETS: usize = 2;
2575
2576        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2577
2578        assert_eq!(tickets.len(), COUNT_TICKETS);
2579
2580        let existing_channel_with_multiple_tickets = channel.get_id();
2581
2582        let constraints = AggregationPrerequisites {
2583            min_ticket_count: Some(COUNT_TICKETS + 1),
2584            min_unaggregated_ratio: None,
2585        };
2586        let actual = db
2587            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2588            .await?;
2589
2590        assert_eq!(actual, None);
2591
2592        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2593            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2594            .count(&db.tickets_db)
2595            .await? as usize;
2596
2597        assert_eq!(actual_being_aggregated_count, 0);
2598
2599        Ok(())
2600    }
2601
2602    #[async_std::test]
2603    async fn test_ticket_aggregation_prepare_request_with_no_aggregatable_tickets_should_return_nothing(
2604    ) -> anyhow::Result<()> {
2605        const COUNT_TICKETS: usize = 3;
2606
2607        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2608
2609        assert_eq!(tickets.len(), { COUNT_TICKETS });
2610
2611        let existing_channel_with_multiple_tickets = channel.get_id();
2612
2613        // mark all tickets as being redeemed
2614        for ticket in hopr_db_entity::ticket::Entity::find()
2615            .all(&db.tickets_db)
2616            .await?
2617            .into_iter()
2618        {
2619            let mut ticket = ticket.into_active_model();
2620            ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2621            ticket.save(&db.tickets_db).await?;
2622        }
2623
2624        let actual = db
2625            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2626            .await?;
2627
2628        assert_eq!(actual, None);
2629
2630        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2631            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2632            .count(&db.tickets_db)
2633            .await? as usize;
2634
2635        assert_eq!(actual_being_aggregated_count, 0);
2636
2637        Ok(())
2638    }
2639
2640    #[async_std::test]
2641    async fn test_ticket_aggregation_rollback_should_rollback_all_the_being_aggregated_tickets_but_nothing_else(
2642    ) -> anyhow::Result<()> {
2643        const COUNT_TICKETS: usize = 5;
2644
2645        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2646
2647        assert_eq!(tickets.len(), COUNT_TICKETS);
2648
2649        let existing_channel_with_multiple_tickets = channel.get_id();
2650
2651        // mark the first ticket as being redeemed
2652        let mut ticket = hopr_db_entity::ticket::Entity::find()
2653            .one(&db.tickets_db)
2654            .await?
2655            .context("should have one active model")?
2656            .into_active_model();
2657        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2658        ticket.save(&db.tickets_db).await?;
2659
2660        assert!(db
2661            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2662            .await
2663            .is_ok());
2664
2665        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2666            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2667            .count(&db.tickets_db)
2668            .await? as usize;
2669
2670        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2671
2672        assert!(db
2673            .rollback_aggregation_in_channel(existing_channel_with_multiple_tickets)
2674            .await
2675            .is_ok());
2676
2677        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2678            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2679            .count(&db.tickets_db)
2680            .await? as usize;
2681
2682        assert_eq!(actual_being_aggregated_count, 0);
2683
2684        Ok(())
2685    }
2686
2687    #[async_std::test]
2688    async fn test_ticket_aggregation_should_replace_the_tickets_with_a_correctly_aggregated_ticket(
2689    ) -> anyhow::Result<()> {
2690        const COUNT_TICKETS: usize = 5;
2691
2692        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2693
2694        let (notifier_tx, notifier_rx) = futures::channel::mpsc::unbounded();
2695        db.start_ticket_processing(Some(notifier_tx))?;
2696
2697        let tickets = tickets
2698            .into_iter()
2699            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2700            .collect::<Vec<_>>();
2701
2702        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2703        let aggregated_ticket = TicketBuilder::default()
2704            .addresses(&*BOB, &*ALICE)
2705            .amount(
2706                tickets
2707                    .iter()
2708                    .fold(U256::zero(), |acc, v| acc + v.ticket.amount.amount()),
2709            )
2710            .index(first_ticket.index)
2711            .index_offset(
2712                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2713            )
2714            .win_prob(1.0)
2715            .channel_epoch(first_ticket.channel_epoch)
2716            .challenge(first_ticket.challenge)
2717            .build_signed(&BOB, &Hash::default())?;
2718
2719        assert_eq!(tickets.len(), COUNT_TICKETS);
2720
2721        let existing_channel_with_multiple_tickets = channel.get_id();
2722
2723        let actual = db
2724            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2725            .await?;
2726
2727        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2728
2729        let agg_ticket = aggregated_ticket.clone();
2730
2731        let _ = db
2732            .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2733            .await?;
2734
2735        pin_mut!(notifier_rx);
2736        let notified_ticket = notifier_rx.next().await.ok_or(anyhow!("must have ticket"))?;
2737
2738        assert_eq!(notified_ticket.verified_ticket(), agg_ticket.verified_ticket());
2739
2740        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2741            .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2742            .count(&db.tickets_db)
2743            .await? as usize;
2744
2745        assert_eq!(actual_being_aggregated_count, 0);
2746
2747        Ok(())
2748    }
2749
2750    #[async_std::test]
2751    async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_value_is_lower_than_the_stored_one(
2752    ) -> anyhow::Result<()> {
2753        const COUNT_TICKETS: usize = 5;
2754
2755        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2756        let tickets = tickets
2757            .into_iter()
2758            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2759            .collect::<Vec<_>>();
2760
2761        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2762        let aggregated_ticket = TicketBuilder::default()
2763            .addresses(&*BOB, &*ALICE)
2764            .amount(0)
2765            .index(first_ticket.index)
2766            .index_offset(
2767                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2768            )
2769            .win_prob(1.0)
2770            .channel_epoch(first_ticket.channel_epoch)
2771            .challenge(first_ticket.challenge)
2772            .build_signed(&BOB, &Hash::default())?;
2773
2774        assert_eq!(tickets.len(), COUNT_TICKETS);
2775
2776        let existing_channel_with_multiple_tickets = channel.get_id();
2777
2778        let actual = db
2779            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2780            .await?;
2781
2782        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2783
2784        assert!(db
2785            .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2786            .await
2787            .is_err());
2788
2789        Ok(())
2790    }
2791
2792    #[async_std::test]
2793    async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_win_probability_is_not_equal_to_1(
2794    ) -> anyhow::Result<()> {
2795        const COUNT_TICKETS: usize = 5;
2796
2797        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2798        let tickets = tickets
2799            .into_iter()
2800            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2801            .collect::<Vec<_>>();
2802
2803        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2804        let aggregated_ticket = TicketBuilder::default()
2805            .addresses(&*BOB, &*ALICE)
2806            .amount(0)
2807            .index(first_ticket.index)
2808            .index_offset(
2809                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2810            )
2811            .win_prob(0.5) // 50% winning prob
2812            .channel_epoch(first_ticket.channel_epoch)
2813            .challenge(first_ticket.challenge)
2814            .build_signed(&BOB, &Hash::default())?;
2815
2816        assert_eq!(tickets.len(), COUNT_TICKETS);
2817
2818        let existing_channel_with_multiple_tickets = channel.get_id();
2819
2820        let actual = db
2821            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2822            .await?;
2823
2824        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2825
2826        assert!(db
2827            .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2828            .await
2829            .is_err());
2830
2831        Ok(())
2832    }
2833
2834    async fn init_db_with_channel(channel: ChannelEntry) -> anyhow::Result<HoprDb> {
2835        let db = HoprDb::new_in_memory(BOB.clone()).await?;
2836
2837        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2838            .await?;
2839
2840        add_peer_mappings(
2841            &db,
2842            vec![
2843                (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2844                (BOB_OFFCHAIN.clone(), BOB.clone()),
2845            ],
2846        )
2847        .await?;
2848
2849        db.upsert_channel(None, channel).await?;
2850
2851        Ok(db)
2852    }
2853
2854    #[async_std::test]
2855    async fn test_aggregate_ticket_should_aggregate() -> anyhow::Result<()> {
2856        const COUNT_TICKETS: usize = 5;
2857
2858        let channel = ChannelEntry::new(
2859            BOB.public().to_address(),
2860            ALICE.public().to_address(),
2861            BalanceType::HOPR.balance(u32::MAX),
2862            (COUNT_TICKETS + 1).into(),
2863            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2864            4_u32.into(),
2865        );
2866
2867        let db = init_db_with_channel(channel).await?;
2868
2869        let tickets = (0..COUNT_TICKETS)
2870            .map(|i| {
2871                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
2872                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2873            })
2874            .collect::<anyhow::Result<Vec<_>>>()?;
2875
2876        let sum_value = tickets
2877            .iter()
2878            .fold(BalanceType::HOPR.zero(), |acc, x| acc + x.ticket.amount);
2879        let min_idx = tickets
2880            .iter()
2881            .map(|t| t.ticket.index)
2882            .min()
2883            .context("min index should be present")?;
2884        let max_idx = tickets
2885            .iter()
2886            .map(|t| t.ticket.index)
2887            .max()
2888            .context("max index should be present")?;
2889
2890        let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2891
2892        assert_eq!(
2893            &BOB.public().to_address(),
2894            aggregated.verified_issuer(),
2895            "must have correct signer"
2896        );
2897
2898        assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2899
2900        assert_eq!(
2901            COUNT_TICKETS,
2902            aggregated.verified_ticket().index_offset as usize,
2903            "aggregated ticket must have correct offset"
2904        );
2905        assert_eq!(
2906            sum_value,
2907            aggregated.verified_ticket().amount,
2908            "aggregated ticket token amount must be sum of individual tickets"
2909        );
2910        assert_eq!(
2911            1.0,
2912            aggregated.win_prob(),
2913            "aggregated ticket must have winning probability 1"
2914        );
2915        assert_eq!(
2916            min_idx,
2917            aggregated.verified_ticket().index,
2918            "aggregated ticket must have correct index"
2919        );
2920        assert_eq!(
2921            channel.get_id(),
2922            aggregated.verified_ticket().channel_id,
2923            "aggregated ticket must have correct channel id"
2924        );
2925        assert_eq!(
2926            channel.channel_epoch.as_u32(),
2927            aggregated.verified_ticket().channel_epoch,
2928            "aggregated ticket must have correct channel epoch"
2929        );
2930
2931        assert_eq!(
2932            max_idx + 1,
2933            db.get_outgoing_ticket_index(channel.get_id())
2934                .await?
2935                .load(Ordering::SeqCst)
2936        );
2937
2938        Ok(())
2939    }
2940
2941    #[async_std::test]
2942    async fn test_aggregate_ticket_should_aggregate_including_aggregated() -> anyhow::Result<()> {
2943        const COUNT_TICKETS: usize = 5;
2944
2945        let channel = ChannelEntry::new(
2946            BOB.public().to_address(),
2947            ALICE.public().to_address(),
2948            BalanceType::HOPR.balance(u32::MAX),
2949            (COUNT_TICKETS + 1).into(),
2950            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2951            4_u32.into(),
2952        );
2953
2954        let db = init_db_with_channel(channel).await?;
2955
2956        let offset = 10_usize;
2957
2958        let mut tickets = (1..COUNT_TICKETS)
2959            .map(|i| {
2960                generate_random_ack_ticket(&BOB, &ALICE, (i + offset) as u64, 1, 1.0)
2961                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2962            })
2963            .collect::<anyhow::Result<Vec<_>>>()?;
2964
2965        // Add an aggregated ticket to the set too
2966        tickets.push(
2967            generate_random_ack_ticket(&BOB, &ALICE, 0, offset as u32, 1.0)
2968                .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?,
2969        );
2970
2971        let sum_value = tickets
2972            .iter()
2973            .fold(BalanceType::HOPR.zero(), |acc, x| acc + x.ticket.amount);
2974        let min_idx = tickets
2975            .iter()
2976            .map(|t| t.ticket.index)
2977            .min()
2978            .context("min index should be present")?;
2979        let max_idx = tickets
2980            .iter()
2981            .map(|t| t.ticket.index)
2982            .max()
2983            .context("max index should be present")?;
2984
2985        let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2986
2987        assert_eq!(
2988            &BOB.public().to_address(),
2989            aggregated.verified_issuer(),
2990            "must have correct signer"
2991        );
2992
2993        assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2994
2995        assert_eq!(
2996            COUNT_TICKETS + offset,
2997            aggregated.verified_ticket().index_offset as usize,
2998            "aggregated ticket must have correct offset"
2999        );
3000        assert_eq!(
3001            sum_value,
3002            aggregated.verified_ticket().amount,
3003            "aggregated ticket token amount must be sum of individual tickets"
3004        );
3005        assert_eq!(
3006            1.0,
3007            aggregated.win_prob(),
3008            "aggregated ticket must have winning probability 1"
3009        );
3010        assert_eq!(
3011            min_idx,
3012            aggregated.verified_ticket().index,
3013            "aggregated ticket must have correct index"
3014        );
3015        assert_eq!(
3016            channel.get_id(),
3017            aggregated.verified_ticket().channel_id,
3018            "aggregated ticket must have correct channel id"
3019        );
3020        assert_eq!(
3021            channel.channel_epoch.as_u32(),
3022            aggregated.verified_ticket().channel_epoch,
3023            "aggregated ticket must have correct channel epoch"
3024        );
3025
3026        assert_eq!(
3027            max_idx + 1,
3028            db.get_outgoing_ticket_index(channel.get_id())
3029                .await?
3030                .load(Ordering::SeqCst)
3031        );
3032
3033        Ok(())
3034    }
3035
3036    #[async_std::test]
3037    async fn test_aggregate_ticket_should_not_aggregate_zero_tickets() -> anyhow::Result<()> {
3038        let db = HoprDb::new_in_memory(BOB.clone()).await?;
3039
3040        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), vec![], &BOB)
3041            .await
3042            .expect_err("should not aggregate empty ticket list");
3043
3044        Ok(())
3045    }
3046
3047    #[async_std::test]
3048    async fn test_aggregate_ticket_should_aggregate_single_ticket_to_itself() -> anyhow::Result<()> {
3049        const COUNT_TICKETS: usize = 1;
3050
3051        let channel = ChannelEntry::new(
3052            BOB.public().to_address(),
3053            ALICE.public().to_address(),
3054            BalanceType::HOPR.balance(u32::MAX),
3055            (COUNT_TICKETS + 1).into(),
3056            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
3057            4_u32.into(),
3058        );
3059
3060        let db = init_db_with_channel(channel).await?;
3061
3062        let mut tickets = (0..COUNT_TICKETS)
3063            .map(|i| {
3064                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3065                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3066            })
3067            .collect::<anyhow::Result<Vec<_>>>()?;
3068
3069        let aggregated = db
3070            .aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3071            .await?;
3072
3073        assert_eq!(
3074            &tickets.pop().context("ticket should be present")?.ticket,
3075            aggregated.verified_ticket()
3076        );
3077
3078        Ok(())
3079    }
3080
3081    #[async_std::test]
3082    async fn test_aggregate_ticket_should_not_aggregate_on_closed_channel() -> anyhow::Result<()> {
3083        const COUNT_TICKETS: usize = 3;
3084
3085        let channel = ChannelEntry::new(
3086            BOB.public().to_address(),
3087            ALICE.public().to_address(),
3088            BalanceType::HOPR.balance(u32::MAX),
3089            (COUNT_TICKETS + 1).into(),
3090            ChannelStatus::Closed,
3091            4_u32.into(),
3092        );
3093
3094        let db = init_db_with_channel(channel).await?;
3095
3096        let tickets = (0..COUNT_TICKETS)
3097            .map(|i| {
3098                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3099                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3100            })
3101            .collect::<anyhow::Result<Vec<_>>>()?;
3102
3103        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3104            .await
3105            .expect_err("should not aggregate on closed channel");
3106
3107        Ok(())
3108    }
3109
3110    #[async_std::test]
3111    async fn test_aggregate_ticket_should_not_aggregate_on_incoming_channel() -> anyhow::Result<()> {
3112        const COUNT_TICKETS: usize = 3;
3113
3114        let channel = ChannelEntry::new(
3115            ALICE.public().to_address(),
3116            BOB.public().to_address(),
3117            BalanceType::HOPR.balance(u32::MAX),
3118            (COUNT_TICKETS + 1).into(),
3119            ChannelStatus::Open,
3120            4_u32.into(),
3121        );
3122
3123        let db = init_db_with_channel(channel).await?;
3124
3125        let tickets = (0..COUNT_TICKETS)
3126            .map(|i| {
3127                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3128                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3129            })
3130            .collect::<anyhow::Result<Vec<_>>>()?;
3131
3132        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3133            .await
3134            .expect_err("should not aggregate on incoming channel");
3135
3136        Ok(())
3137    }
3138
3139    #[async_std::test]
3140    async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_ids() -> anyhow::Result<()> {
3141        const COUNT_TICKETS: usize = 3;
3142
3143        let channel = ChannelEntry::new(
3144            ALICE.public().to_address(),
3145            BOB.public().to_address(),
3146            BalanceType::HOPR.balance(u32::MAX),
3147            (COUNT_TICKETS + 1).into(),
3148            ChannelStatus::Open,
3149            4_u32.into(),
3150        );
3151
3152        let db = init_db_with_channel(channel).await?;
3153
3154        let mut tickets = (0..COUNT_TICKETS)
3155            .map(|i| {
3156                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3157                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3158            })
3159            .collect::<anyhow::Result<Vec<_>>>()?;
3160
3161        tickets[2] = generate_random_ack_ticket(&BOB, &ChainKeypair::random(), 2, 1, 1.0)
3162            .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3163
3164        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3165            .await
3166            .expect_err("should not aggregate on mismatching channel ids");
3167
3168        Ok(())
3169    }
3170
3171    #[async_std::test]
3172    async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_epoch() -> anyhow::Result<()> {
3173        const COUNT_TICKETS: usize = 3;
3174
3175        let channel = ChannelEntry::new(
3176            ALICE.public().to_address(),
3177            BOB.public().to_address(),
3178            BalanceType::HOPR.balance(u32::MAX),
3179            (COUNT_TICKETS + 1).into(),
3180            ChannelStatus::Open,
3181            3_u32.into(),
3182        );
3183
3184        let db = init_db_with_channel(channel).await?;
3185
3186        let tickets = (0..COUNT_TICKETS)
3187            .map(|i| {
3188                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3189                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3190            })
3191            .collect::<anyhow::Result<Vec<_>>>()?;
3192
3193        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3194            .await
3195            .expect_err("should not aggregate on mismatching channel epoch");
3196
3197        Ok(())
3198    }
3199
3200    #[async_std::test]
3201    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_indices_overlap() -> anyhow::Result<()> {
3202        const COUNT_TICKETS: usize = 3;
3203
3204        let channel = ChannelEntry::new(
3205            ALICE.public().to_address(),
3206            BOB.public().to_address(),
3207            BalanceType::HOPR.balance(u32::MAX),
3208            (COUNT_TICKETS + 1).into(),
3209            ChannelStatus::Open,
3210            3_u32.into(),
3211        );
3212
3213        let db = init_db_with_channel(channel).await?;
3214
3215        let mut tickets = (0..COUNT_TICKETS)
3216            .map(|i| {
3217                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3218                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3219            })
3220            .collect::<anyhow::Result<Vec<_>>>()?;
3221
3222        tickets[1] = generate_random_ack_ticket(&BOB, &ALICE, 1, 2, 1.0)
3223            .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3224
3225        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3226            .await
3227            .expect_err("should not aggregate on overlapping ticket indices");
3228        Ok(())
3229    }
3230
3231    #[async_std::test]
3232    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_valid() -> anyhow::Result<()> {
3233        const COUNT_TICKETS: usize = 3;
3234
3235        let channel = ChannelEntry::new(
3236            ALICE.public().to_address(),
3237            BOB.public().to_address(),
3238            BalanceType::HOPR.balance(u32::MAX),
3239            (COUNT_TICKETS + 1).into(),
3240            ChannelStatus::Open,
3241            3_u32.into(),
3242        );
3243
3244        let db = init_db_with_channel(channel).await?;
3245
3246        let mut tickets = (0..COUNT_TICKETS)
3247            .map(|i| {
3248                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3249                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3250            })
3251            .collect::<anyhow::Result<Vec<_>>>()?;
3252
3253        // Modify the ticket and do not sign it
3254        tickets[1].ticket.amount = Balance::new(TICKET_VALUE - 10, BalanceType::HOPR);
3255
3256        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3257            .await
3258            .expect_err("should not aggregate on invalid tickets");
3259
3260        Ok(())
3261    }
3262
3263    #[async_std::test]
3264    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_has_lower_than_min_win_prob() -> anyhow::Result<()> {
3265        const COUNT_TICKETS: usize = 3;
3266
3267        let channel = ChannelEntry::new(
3268            ALICE.public().to_address(),
3269            BOB.public().to_address(),
3270            BalanceType::HOPR.balance(u32::MAX),
3271            (COUNT_TICKETS + 1).into(),
3272            ChannelStatus::Open,
3273            3_u32.into(),
3274        );
3275
3276        let db = init_db_with_channel(channel).await?;
3277
3278        let tickets = (0..COUNT_TICKETS)
3279            .map(|i| {
3280                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, if i > 0 { 1.0 } else { 0.9 })
3281                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3282            })
3283            .collect::<anyhow::Result<Vec<_>>>()?;
3284
3285        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3286            .await
3287            .expect_err("should not aggregate tickets with less than minimum win prob");
3288
3289        Ok(())
3290    }
3291
3292    #[async_std::test]
3293    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_winning() -> anyhow::Result<()> {
3294        const COUNT_TICKETS: usize = 3;
3295
3296        let channel = ChannelEntry::new(
3297            ALICE.public().to_address(),
3298            BOB.public().to_address(),
3299            BalanceType::HOPR.balance(u32::MAX),
3300            (COUNT_TICKETS + 1).into(),
3301            ChannelStatus::Open,
3302            3_u32.into(),
3303        );
3304
3305        let db = init_db_with_channel(channel).await?;
3306
3307        let mut tickets = (0..COUNT_TICKETS)
3308            .map(|i| {
3309                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3310                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3311            })
3312            .collect::<anyhow::Result<Vec<_>>>()?;
3313
3314        // Set winning probability to zero and sign the ticket again
3315        let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
3316        tickets[1] = TicketBuilder::from(tickets[1].ticket.clone())
3317            .win_prob(0.0)
3318            .challenge(resp.to_challenge().into())
3319            .build_signed(&BOB, &Hash::default())?
3320            .into_acknowledged(resp)
3321            .into_transferable(&ALICE, &Hash::default())?;
3322
3323        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3324            .await
3325            .expect_err("should not aggregate non-winning tickets");
3326
3327        Ok(())
3328    }
3329
3330    #[async_std::test]
3331    async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
3332        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3333
3334        let ticket = init_db_with_tickets(&db, 1).await?.1.pop().unwrap();
3335
3336        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
3337            .await
3338            .expect("must not fail");
3339
3340        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3341        assert_ne!(stats.redeemed_value, BalanceType::HOPR.zero());
3342
3343        db.reset_ticket_statistics().await.expect("must not fail");
3344
3345        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3346        assert_eq!(stats.redeemed_value, BalanceType::HOPR.zero());
3347
3348        Ok(())
3349    }
3350
3351    #[async_std::test]
3352    async fn test_fix_channels_ticket_state() -> anyhow::Result<()> {
3353        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3354        const COUNT_TICKETS: u64 = 1;
3355
3356        let (_, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
3357
3358        // mark the first ticket as being redeemed
3359        let mut ticket = hopr_db_entity::ticket::Entity::find()
3360            .one(&db.tickets_db)
3361            .await?
3362            .context("should have one active model")?
3363            .into_active_model();
3364        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3365        ticket.save(&db.tickets_db).await?;
3366
3367        assert!(
3368            hopr_db_entity::ticket::Entity::find()
3369                .one(&db.tickets_db)
3370                .await?
3371                .context("should have one active model")?
3372                .state
3373                == AcknowledgedTicketStatus::BeingRedeemed as i8,
3374        );
3375
3376        db.fix_channels_next_ticket_state().await.expect("must not fail");
3377
3378        assert!(
3379            hopr_db_entity::ticket::Entity::find()
3380                .one(&db.tickets_db)
3381                .await?
3382                .context("should have one active model")?
3383                .state
3384                == AcknowledgedTicketStatus::Untouched as i8,
3385        );
3386
3387        Ok(())
3388    }
3389
3390    #[async_std::test]
3391    async fn test_dont_fix_correct_channels_ticket_state() -> anyhow::Result<()> {
3392        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3393        const COUNT_TICKETS: u64 = 2;
3394
3395        // we set up the channel to have ticket index 1, and ensure that fix does not trigger
3396        let (_, _) = init_db_with_tickets_and_channel(&db, COUNT_TICKETS, Some(1u32)).await?;
3397
3398        // mark the first ticket as being redeemed
3399        let mut ticket = hopr_db_entity::ticket::Entity::find()
3400            .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3401            .one(&db.tickets_db)
3402            .await?
3403            .context("should have one active model")?
3404            .into_active_model();
3405        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3406        ticket.save(&db.tickets_db).await?;
3407
3408        assert!(
3409            hopr_db_entity::ticket::Entity::find()
3410                .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3411                .one(&db.tickets_db)
3412                .await?
3413                .context("should have one active model")?
3414                .state
3415                == AcknowledgedTicketStatus::BeingRedeemed as i8,
3416        );
3417
3418        db.fix_channels_next_ticket_state().await.expect("must not fail");
3419
3420        // first ticket should still be in BeingRedeemed state
3421        let ticket0 = hopr_db_entity::ticket::Entity::find()
3422            .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3423            .one(&db.tickets_db)
3424            .await?
3425            .context("should have one active model")?;
3426        assert_eq!(ticket0.state, AcknowledgedTicketStatus::BeingRedeemed as i8);
3427
3428        // second ticket should be in Untouched state
3429        let ticket1 = hopr_db_entity::ticket::Entity::find()
3430            .filter(ticket::Column::Index.eq(1u64.to_be_bytes().to_vec()))
3431            .one(&db.tickets_db)
3432            .await?
3433            .context("should have one active model")?;
3434        assert_eq!(ticket1.state, AcknowledgedTicketStatus::Untouched as i8);
3435
3436        Ok(())
3437    }
3438}