hopr_db_sql/
tickets.rs

1use std::{
2    cmp,
3    ops::{Add, Bound},
4    sync::{
5        Arc,
6        atomic::{AtomicU64, Ordering},
7    },
8};
9
10use async_stream::stream;
11use async_trait::async_trait;
12use futures::{StreamExt, TryStreamExt, stream::BoxStream};
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::{
15    errors::Result,
16    info::DomainSeparator,
17    prelude::{TicketIndexSelector, TicketMarker},
18    resolver::HoprDbResolverOperations,
19    tickets::{AggregationPrerequisites, ChannelTicketStatistics, HoprDbTicketOperations, TicketSelector},
20};
21use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
22use hopr_internal_types::prelude::*;
23#[cfg(all(feature = "prometheus", not(test)))]
24use hopr_metrics::metrics::MultiGauge;
25use hopr_primitive_types::prelude::*;
26use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set};
27use sea_query::{Condition, Expr, IntoCondition, SimpleExpr};
28use tracing::{debug, error, info, trace, warn};
29
30use crate::{
31    HoprDbGeneralModelOperations, OpenTransaction, OptTx, TargetDb,
32    channels::HoprDbChannelOperations,
33    db::HoprDb,
34    errors::{DbSqlError, DbSqlError::LogicalError},
35    info::HoprDbInfoOperations,
36};
37
38#[cfg(all(feature = "prometheus", not(test)))]
39lazy_static::lazy_static! {
40    pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: MultiGauge = MultiGauge::new(
41        "hopr_tickets_incoming_statistics",
42        "Ticket statistics for channels with incoming tickets.",
43        &["channel", "statistic"]
44    ).unwrap();
45}
46
47/// The maximum number of tickets that can sent for aggregation in a single request.
48pub const MAX_TICKETS_TO_AGGREGATE_BATCH: u64 = 500;
49
50/// The type is necessary solely to allow
51/// implementing the [`IntoCondition`] trait for [`TicketSelector`]
52/// from the `hopr_db_api` crate.
53#[derive(Clone)]
54pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
55
56impl From<TicketSelector> for WrappedTicketSelector {
57    fn from(selector: TicketSelector) -> Self {
58        Self(selector)
59    }
60}
61
62impl AsRef<TicketSelector> for WrappedTicketSelector {
63    fn as_ref(&self) -> &TicketSelector {
64        &self.0
65    }
66}
67
68impl IntoCondition for WrappedTicketSelector {
69    fn into_condition(self) -> Condition {
70        let expr = self
71            .0
72            .channel_identifiers
73            .into_iter()
74            .map(|(channel_id, epoch)| {
75                ticket::Column::ChannelId
76                    .eq(channel_id.to_hex())
77                    .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
78            })
79            .reduce(SimpleExpr::or);
80
81        // This cannot happen, but instead of panicking, return an impossible condition object
82        if expr.is_none() {
83            return Condition::any().not();
84        }
85
86        let mut expr = expr.unwrap();
87
88        match self.0.index {
89            TicketIndexSelector::None => {
90                // This will always be the case if there were multiple channel identifiers
91            }
92            TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
93            TicketIndexSelector::Multiple(idxs) => {
94                expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
95            }
96            TicketIndexSelector::Range((lb, ub)) => {
97                expr = match lb {
98                    Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
99                    Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
100                    Bound::Unbounded => expr,
101                };
102                expr = match ub {
103                    Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
104                    Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
105                    Bound::Unbounded => expr,
106                };
107            }
108        }
109
110        if let Some(state) = self.0.state {
111            expr = expr.and(ticket::Column::State.eq(state as u8))
112        }
113
114        if self.0.only_aggregated {
115            expr = expr.and(ticket::Column::IndexOffset.gt(1));
116        }
117
118        // Win prob lower bound
119        expr = match self.0.win_prob.0 {
120            Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
121            Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
122            Bound::Unbounded => expr,
123        };
124
125        // Win prob upper bound
126        expr = match self.0.win_prob.1 {
127            Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
128            Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
129            Bound::Unbounded => expr,
130        };
131
132        // Amount lower bound
133        expr = match self.0.amount.0 {
134            Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
135            Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
136            Bound::Unbounded => expr,
137        };
138
139        // Amount upper bound
140        expr = match self.0.amount.1 {
141            Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
142            Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
143            Bound::Unbounded => expr,
144        };
145
146        expr.into_condition()
147    }
148}
149
150/// Filters the list of ticket models according to the prerequisites.
151/// **NOTE:** the input list is assumed to be sorted by ticket index in ascending order.
152///
153/// The following is applied:
154/// - the list of tickets is reduced so that the total amount on the tickets does not exceed the channel balance
155/// - it is checked whether the list size is greater than `min_unaggregated_ratio`
156/// - it is checked whether the ratio of total amount on the unaggregated tickets on the list and the channel balance
157///   ratio is greater than `min_unaggregated_ratio`
158pub(crate) fn filter_satisfying_ticket_models(
159    prerequisites: AggregationPrerequisites,
160    models: Vec<ticket::Model>,
161    channel_entry: &ChannelEntry,
162    min_win_prob: WinningProbability,
163) -> crate::errors::Result<Vec<ticket::Model>> {
164    let channel_id = channel_entry.get_id();
165
166    let mut to_be_aggregated = Vec::with_capacity(models.len());
167    let mut total_balance = HoprBalance::zero();
168
169    for m in models {
170        let ticket_wp: WinningProbability = m
171            .winning_probability
172            .as_slice()
173            .try_into()
174            .map_err(|_| DbSqlError::DecodingError)?;
175
176        if ticket_wp.approx_cmp(&min_win_prob).is_lt() {
177            warn!(
178                channel_id = %channel_entry.get_id(),
179                %ticket_wp, %min_win_prob, "encountered ticket with winning probability lower than the minimum threshold"
180            );
181            continue;
182        }
183
184        let to_add = HoprBalance::from_be_bytes(&m.amount);
185
186        // Do a balance check to be sure not to aggregate more than the current channel stake
187        total_balance += to_add;
188        if total_balance.gt(&channel_entry.balance) {
189            // Remove the last sub-balance which led to the overflow before breaking out of the loop.
190            total_balance -= to_add;
191            break;
192        }
193
194        to_be_aggregated.push(m);
195    }
196
197    // If there are no criteria, just send everything for aggregation
198    if prerequisites.min_ticket_count.is_none() && prerequisites.min_unaggregated_ratio.is_none() {
199        info!(channel = %channel_id, "Aggregation check OK, no aggregation prerequisites were given");
200        return Ok(to_be_aggregated);
201    }
202
203    let to_be_agg_count = to_be_aggregated.len();
204
205    // Check the aggregation threshold
206    if let Some(agg_threshold) = prerequisites.min_ticket_count {
207        if to_be_agg_count >= agg_threshold {
208            info!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check OK aggregated value greater than threshold");
209            return Ok(to_be_aggregated);
210        } else {
211            debug!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold,"Aggregation check FAIL not enough resources to aggregate");
212        }
213    }
214
215    if let Some(unrealized_threshold) = prerequisites.min_unaggregated_ratio {
216        let diminished_balance = channel_entry.balance.mul_f64(unrealized_threshold)?;
217
218        // Trigger aggregation if unrealized balance greater or equal to X percentage of the current balance
219        // and there are at least two tickets
220        if total_balance.ge(&diminished_balance) {
221            if to_be_agg_count > 1 {
222                info!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check OK: more unrealized than diminished balance");
223                return Ok(to_be_aggregated);
224            } else {
225                debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: more unrealized than diminished balance but only 1 ticket");
226            }
227        } else {
228            debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: less unrealized than diminished balance");
229        }
230    }
231
232    debug!(channel = %channel_id,"Aggregation check FAIL: no prerequisites were met");
233    Ok(vec![])
234}
235
236pub(crate) async fn find_stats_for_channel(
237    tx: &OpenTransaction,
238    channel_id: &Hash,
239) -> crate::errors::Result<ticket_statistics::Model> {
240    if let Some(model) = ticket_statistics::Entity::find()
241        .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
242        .one(tx.as_ref())
243        .await?
244    {
245        Ok(model)
246    } else {
247        let new_stats = ticket_statistics::ActiveModel {
248            channel_id: Set(channel_id.to_hex()),
249            ..Default::default()
250        }
251        .insert(tx.as_ref())
252        .await?;
253
254        Ok(new_stats)
255    }
256}
257
258impl HoprDb {
259    async fn get_tickets_value_int<'a>(
260        &'a self,
261        tx: OptTx<'a>,
262        selector: TicketSelector,
263    ) -> Result<(usize, HoprBalance)> {
264        let selector: WrappedTicketSelector = selector.into();
265        Ok(self
266            .nest_transaction_in_db(tx, TargetDb::Tickets)
267            .await?
268            .perform(|tx| {
269                Box::pin(async move {
270                    ticket::Entity::find()
271                        .filter(selector)
272                        .stream(tx.as_ref())
273                        .await
274                        .map_err(DbSqlError::from)?
275                        .map_err(DbSqlError::from)
276                        .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
277                            Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
278                        })
279                        .await
280                })
281            })
282            .await?)
283    }
284}
285
286#[async_trait]
287impl HoprDbTicketOperations for HoprDb {
288    async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>> {
289        Ok(self
290            .nest_transaction_in_db(None, TargetDb::Tickets)
291            .await?
292            .perform(|tx| {
293                Box::pin(async move {
294                    ticket::Entity::find()
295                        .all(tx.as_ref())
296                        .await?
297                        .into_iter()
298                        .map(AcknowledgedTicket::try_from)
299                        .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
300                        .map_err(DbSqlError::from)
301                })
302            })
303            .await?)
304    }
305
306    async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>> {
307        debug!("fetching tickets via {selector}");
308        let selector: WrappedTicketSelector = selector.into();
309
310        Ok(self
311            .nest_transaction_in_db(None, TargetDb::Tickets)
312            .await?
313            .perform(|tx| {
314                Box::pin(async move {
315                    ticket::Entity::find()
316                        .filter(selector)
317                        .all(tx.as_ref())
318                        .await?
319                        .into_iter()
320                        .map(AcknowledgedTicket::try_from)
321                        .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
322                        .map_err(DbSqlError::from)
323                })
324            })
325            .await?)
326    }
327
328    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize> {
329        let myself = self.clone();
330        Ok(self
331            .ticket_manager
332            .with_write_locked_db(|tx| {
333                Box::pin(async move {
334                    let mut total_marked_count = 0;
335                    for (channel_id, epoch) in selector.channel_identifiers.iter() {
336                        let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);
337
338                        // Get the number of tickets and their value just for this channel
339                        let (marked_count, marked_value) =
340                            myself.get_tickets_value_int(Some(tx), channel_selector.clone()).await?;
341                        trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");
342
343                        if marked_count > 0 {
344                            // Delete the redeemed tickets first
345                            let deleted = ticket::Entity::delete_many()
346                                .filter(WrappedTicketSelector::from(channel_selector.clone()))
347                                .exec(tx.as_ref())
348                                .await?;
349
350                            // Update the stats if successful
351                            if deleted.rows_affected == marked_count as u64 {
352                                let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
353                                let _current_value = match mark_as {
354                                    TicketMarker::Redeemed => {
355                                        let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
356                                        new_stats.redeemed_value =
357                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
358                                        current_value
359                                    }
360                                    TicketMarker::Rejected => {
361                                        let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
362                                        new_stats.rejected_value =
363                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
364                                        current_value
365                                    }
366                                    TicketMarker::Neglected => {
367                                        let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
368                                        new_stats.neglected_value =
369                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
370                                        current_value
371                                    }
372                                };
373                                new_stats.save(tx.as_ref()).await?;
374
375                                #[cfg(all(feature = "prometheus", not(test)))]
376                                {
377                                    let channel = channel_id.to_string();
378                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
379                                        &[&channel, &mark_as.to_string()],
380                                        (_current_value + marked_value.amount()).as_u128() as f64,
381                                    );
382
383                                    // Tickets that are counted as rejected were never counted as unredeemed,
384                                    // so skip the metric subtraction in that case.
385                                    if mark_as != TicketMarker::Rejected {
386                                        let unredeemed_value = myself
387                                            .caches
388                                            .unrealized_value
389                                            .get(&(*channel_id, *epoch))
390                                            .await
391                                            .unwrap_or_default();
392
393                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
394                                            &[&channel, "unredeemed"],
395                                            (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
396                                        );
397                                    }
398                                }
399
400                                myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
401                            } else {
402                                return Err(DbSqlError::LogicalError(format!(
403                                    "could not mark {marked_count} ticket as {mark_as}"
404                                )));
405                            }
406
407                            trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
408                            total_marked_count += marked_count;
409                        }
410                    }
411
412                    info!(
413                        count = total_marked_count,
414                        ?mark_as,
415                        channel_count = selector.channel_identifiers.len(),
416                        "removed tickets in channels",
417                    );
418                    Ok(total_marked_count)
419                })
420            })
421            .await?)
422    }
423
424    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()> {
425        let channel_id = ticket.channel_id;
426        let amount = ticket.amount;
427        Ok(self
428            .ticket_manager
429            .with_write_locked_db(|tx| {
430                Box::pin(async move {
431                    let stats = find_stats_for_channel(tx, &channel_id).await?;
432
433                    let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
434
435                    let mut active_stats = stats.into_active_model();
436                    active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
437                    active_stats.save(tx.as_ref()).await?;
438
439                    #[cfg(all(feature = "prometheus", not(test)))]
440                    {
441                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
442                            &[&channel_id.to_string(), "rejected"],
443                            (current_rejected_value + amount.amount()).as_u128() as f64,
444                        );
445                    }
446
447                    Ok::<(), DbSqlError>(())
448                })
449            })
450            .await?)
451    }
452
453    async fn update_ticket_states_and_fetch<'a>(
454        &'a self,
455        selector: TicketSelector,
456        new_state: AcknowledgedTicketStatus,
457    ) -> Result<BoxStream<'a, AcknowledgedTicket>> {
458        let selector: WrappedTicketSelector = selector.into();
459        Ok(Box::pin(stream! {
460            match ticket::Entity::find()
461                .filter(selector)
462                .stream(self.conn(TargetDb::Tickets))
463                .await {
464                Ok(mut stream) => {
465                    while let Ok(Some(ticket)) = stream.try_next().await {
466                        let active_ticket = ticket::ActiveModel {
467                            id: Set(ticket.id),
468                            state: Set(new_state as i8),
469                            ..Default::default()
470                        };
471
472                        {
473                            let _g = self.ticket_manager.mutex.lock();
474                            if let Err(e) = active_ticket.update(self.conn(TargetDb::Tickets)).await {
475                                error!(error = %e,"failed to update ticket in the db");
476                            }
477                        }
478
479                        match AcknowledgedTicket::try_from(ticket) {
480                            Ok(mut ticket) => {
481                                // Update the state manually, since we do not want to re-fetch the model after the update
482                                ticket.status = new_state;
483                                yield ticket
484                            },
485                            Err(e) => {
486                                tracing::error!(error = %e, "failed to decode ticket from the db");
487                            }
488                        }
489                    }
490                },
491                Err(e) => tracing::error!(error = %e, "failed open ticket db stream")
492            }
493        }))
494    }
495
496    async fn update_ticket_states(
497        &self,
498        selector: TicketSelector,
499        new_state: AcknowledgedTicketStatus,
500    ) -> Result<usize> {
501        let selector: WrappedTicketSelector = selector.into();
502        Ok(self
503            .ticket_manager
504            .with_write_locked_db(|tx| {
505                Box::pin(async move {
506                    let update = ticket::Entity::update_many()
507                        .filter(selector)
508                        .col_expr(ticket::Column::State, Expr::value(new_state as u8))
509                        .exec(tx.as_ref())
510                        .await?;
511                    Ok::<_, DbSqlError>(update.rows_affected as usize)
512                })
513            })
514            .await?)
515    }
516
517    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics> {
518        let res = match channel_id {
519            None => {
520                #[cfg(all(feature = "prometheus", not(test)))]
521                let mut per_channel_unredeemed = std::collections::HashMap::new();
522
523                self.nest_transaction_in_db(None, TargetDb::Tickets)
524                    .await?
525                    .perform(|tx| {
526                        Box::pin(async move {
527                            let unredeemed_value = ticket::Entity::find()
528                                .stream(tx.as_ref())
529                                .await?
530                                .try_fold(U256::zero(), |amount, x| {
531                                    let unredeemed_value = U256::from_be_bytes(x.amount);
532
533                                    #[cfg(all(feature = "prometheus", not(test)))]
534                                    per_channel_unredeemed
535                                        .entry(x.channel_id)
536                                        .and_modify(|v| *v += unredeemed_value)
537                                        .or_insert(unredeemed_value);
538
539                                    futures::future::ok(amount + unredeemed_value)
540                                })
541                                .await?;
542
543                            #[cfg(all(feature = "prometheus", not(test)))]
544                            for (channel_id, unredeemed_value) in per_channel_unredeemed {
545                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
546                                    .set(&[&channel_id, "unredeemed"], unredeemed_value.as_u128() as f64);
547                            }
548
549                            let mut all_stats = ticket_statistics::Entity::find()
550                                .all(tx.as_ref())
551                                .await?
552                                .into_iter()
553                                .fold(ChannelTicketStatistics::default(), |mut acc, stats| {
554                                    let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
555                                    acc.neglected_value += neglected_value;
556                                    let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
557                                    acc.redeemed_value += redeemed_value;
558                                    let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
559                                    acc.rejected_value += rejected_value;
560                                    acc.winning_tickets += stats.winning_tickets as u128;
561
562                                    #[cfg(all(feature = "prometheus", not(test)))]
563                                    {
564                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
565                                            &[&stats.channel_id, "neglected"],
566                                            neglected_value.amount().as_u128() as f64,
567                                        );
568
569                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
570                                            &[&stats.channel_id, "redeemed"],
571                                            redeemed_value.amount().as_u128() as f64,
572                                        );
573
574                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
575                                            &[&stats.channel_id, "rejected"],
576                                            rejected_value.amount().as_u128() as f64,
577                                        );
578                                    }
579
580                                    acc
581                                });
582
583                            all_stats.unredeemed_value = unredeemed_value.into();
584
585                            Ok::<_, DbSqlError>(all_stats)
586                        })
587                    })
588                    .await
589            }
590            Some(channel) => {
591                // We need to make sure the channel exists to avoid creating
592                // statistic entry for a non-existing channel
593                if self.get_channel_by_id(None, &channel).await?.is_none() {
594                    return Err(DbSqlError::ChannelNotFound(channel).into());
595                }
596
597                self.nest_transaction_in_db(None, TargetDb::Tickets)
598                    .await?
599                    .perform(|tx| {
600                        Box::pin(async move {
601                            let stats = find_stats_for_channel(tx, &channel).await?;
602                            let unredeemed_value = ticket::Entity::find()
603                                .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
604                                .stream(tx.as_ref())
605                                .await?
606                                .try_fold(U256::zero(), |amount, x| {
607                                    futures::future::ok(amount + U256::from_be_bytes(x.amount))
608                                })
609                                .await?;
610
611                            Ok::<_, DbSqlError>(ChannelTicketStatistics {
612                                winning_tickets: stats.winning_tickets as u128,
613                                neglected_value: HoprBalance::from_be_bytes(stats.neglected_value),
614                                redeemed_value: HoprBalance::from_be_bytes(stats.redeemed_value),
615                                unredeemed_value: unredeemed_value.into(),
616                                rejected_value: HoprBalance::from_be_bytes(stats.rejected_value),
617                            })
618                        })
619                    })
620                    .await
621            }
622        };
623        Ok(res?)
624    }
625
626    async fn reset_ticket_statistics(&self) -> Result<()> {
627        let res = self
628            .nest_transaction_in_db(None, TargetDb::Tickets)
629            .await?
630            .perform(|tx| {
631                Box::pin(async move {
632                    #[cfg(all(feature = "prometheus", not(test)))]
633                    let rows = ticket_statistics::Entity::find().all(tx.as_ref()).await?;
634
635                    // delete statistics for the found rows
636                    let deleted = ticket_statistics::Entity::delete_many().exec(tx.as_ref()).await?;
637
638                    #[cfg(all(feature = "prometheus", not(test)))]
639                    {
640                        if deleted.rows_affected > 0 {
641                            for row in rows {
642                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "neglected"], 0.0_f64);
643
644                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "redeemed"], 0.0_f64);
645
646                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "rejected"], 0.0_f64);
647                            }
648                        }
649                    }
650
651                    debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
652
653                    Ok::<_, DbSqlError>(())
654                })
655            })
656            .await;
657
658        Ok(res?)
659    }
660
661    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance)> {
662        self.get_tickets_value_int(None, selector).await
663    }
664
665    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64> {
666        let old_value = self
667            .get_outgoing_ticket_index(channel_id)
668            .await?
669            .fetch_max(index, Ordering::SeqCst);
670
671        // TODO: should we hint the persisting mechanism to trigger the flush?
672        // if old_value < index {}
673
674        Ok(old_value)
675    }
676
677    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
678        let old_value = self
679            .get_outgoing_ticket_index(channel_id)
680            .await?
681            .swap(0, Ordering::SeqCst);
682
683        // TODO: should we hint the persisting mechanism to trigger the flush?
684        // if old_value > 0 { }
685
686        Ok(old_value)
687    }
688
689    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
690        let old_value = self
691            .get_outgoing_ticket_index(channel_id)
692            .await?
693            .fetch_add(1, Ordering::SeqCst);
694
695        // TODO: should we hint the persisting mechanism to trigger the flush?
696
697        Ok(old_value)
698    }
699
700    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>> {
701        let tkt_manager = self.ticket_manager.clone();
702
703        Ok(self
704            .caches
705            .ticket_index
706            .try_get_with(channel_id, async move {
707                let maybe_index = outgoing_ticket_index::Entity::find()
708                    .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
709                    .one(&tkt_manager.tickets_db)
710                    .await?;
711
712                Ok(Arc::new(AtomicU64::new(match maybe_index {
713                    Some(model) => U256::from_be_bytes(model.index).as_u64(),
714                    None => {
715                        tkt_manager
716                            .with_write_locked_db(|tx| {
717                                Box::pin(async move {
718                                    outgoing_ticket_index::ActiveModel {
719                                        channel_id: Set(channel_id.to_hex()),
720                                        ..Default::default()
721                                    }
722                                    .insert(tx.as_ref())
723                                    .await?;
724                                    Ok::<_, DbSqlError>(())
725                                })
726                            })
727                            .await?;
728                        0_u64
729                    }
730                })))
731            })
732            .await
733            .map_err(|e: Arc<DbSqlError>| LogicalError(format!("failed to retrieve ticket index: {e}")))?)
734    }
735
736    async fn persist_outgoing_ticket_indices(&self) -> Result<usize> {
737        let outgoing_indices = outgoing_ticket_index::Entity::find()
738            .all(&self.tickets_db)
739            .await
740            .map_err(DbSqlError::from)?;
741
742        let mut updated = 0;
743        for index_model in outgoing_indices {
744            let channel_id = Hash::from_hex(&index_model.channel_id).map_err(DbSqlError::from)?;
745            let db_index = U256::from_be_bytes(&index_model.index).as_u64();
746            if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
747                // Note that the persisted value is always lagging behind the cache,
748                // so the fact that the cached index can change between this load
749                // storing it in the DB is allowed.
750                let cached_index = cached_index.load(Ordering::SeqCst);
751
752                // Store the ticket index in a separate write transaction
753                if cached_index > db_index {
754                    let mut index_active_model = index_model.into_active_model();
755                    index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
756                    self.ticket_manager
757                        .with_write_locked_db(|wtx| {
758                            Box::pin(async move {
759                                index_active_model.save(wtx.as_ref()).await?;
760                                Ok::<_, DbSqlError>(())
761                            })
762                        })
763                        .await?;
764
765                    debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
766                    updated += 1;
767                }
768            } else {
769                // The value is not yet in the cache, meaning there's low traffic on this
770                // channel, so the value has not been yet fetched.
771                trace!(?channel_id, "channel not in cache yet");
772            }
773        }
774
775        Ok(Ok::<_, DbSqlError>(updated)?)
776    }
777
778    async fn prepare_aggregation_in_channel(
779        &self,
780        channel: &Hash,
781        prerequisites: AggregationPrerequisites,
782    ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>> {
783        let myself = self.clone();
784
785        let channel_id = *channel;
786
787        let (channel_entry, peer, domain_separator, min_win_prob) = self
788            .nest_transaction_in_db(None, TargetDb::Index)
789            .await?
790            .perform(|tx| {
791                Box::pin(async move {
792                    let entry = myself
793                        .get_channel_by_id(Some(tx), &channel_id)
794                        .await?
795                        .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
796
797                    if entry.status == ChannelStatus::Closed {
798                        return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
799                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
800                        return Err(DbSqlError::LogicalError(format!(
801                            "channel '{channel_id}' is not incoming"
802                        )));
803                    }
804
805                    let pk = myself
806                        .resolve_packet_key(&entry.source)
807                        .await?
808                        .ok_or(DbSqlError::LogicalError(format!(
809                            "peer '{}' has no offchain key record",
810                            entry.source
811                        )))?;
812
813                    let indexer_data = myself.get_indexer_data(Some(tx)).await?;
814
815                    let domain_separator = indexer_data
816                        .channels_dst
817                        .ok_or_else(|| crate::errors::DbSqlError::LogicalError("domain separator missing".into()))?;
818
819                    Ok((
820                        entry,
821                        pk,
822                        domain_separator,
823                        indexer_data.minimum_incoming_ticket_winning_prob,
824                    ))
825                })
826            })
827            .await?;
828
829        let myself = self.clone();
830        let tickets = self
831            .ticket_manager
832            .with_write_locked_db(|tx| {
833                Box::pin(async move {
834                    // verify that no aggregation is in progress in the channel
835                    if ticket::Entity::find()
836                        .filter(WrappedTicketSelector::from(
837                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
838                        ))
839                        .one(tx.as_ref())
840                        .await?
841                        .is_some()
842                    {
843                        return Err(DbSqlError::LogicalError(format!(
844                            "'{channel_entry}' is already being aggregated",
845                        )));
846                    }
847
848                    // find the index of the last ticket being redeemed
849                    let first_idx_to_take = ticket::Entity::find()
850                        .filter(WrappedTicketSelector::from(
851                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingRedeemed),
852                        ))
853                        .order_by_desc(ticket::Column::Index)
854                        .one(tx.as_ref())
855                        .await?
856                        .map(|m| U256::from_be_bytes(m.index).as_u64() + 1)
857                        .unwrap_or(0_u64) // go from the lowest possible index of none is found
858                        .max(channel_entry.ticket_index.as_u64()); // but cannot be less than the ticket index on the Channel entry
859
860                    // get the list of all tickets to be aggregated
861                    let to_be_aggregated = ticket::Entity::find()
862                        .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
863                        .filter(ticket::Column::Index.gte(first_idx_to_take.to_be_bytes().to_vec()))
864                        .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
865                        .order_by_asc(ticket::Column::Index)// tickets must be sorted by indices in ascending order
866                        .limit(MAX_TICKETS_TO_AGGREGATE_BATCH)
867                        .all(tx.as_ref())
868                        .await?;
869
870                    // Filter the list of tickets according to the prerequisites
871                    let mut to_be_aggregated: Vec<TransferableWinningTicket> =
872                        filter_satisfying_ticket_models(prerequisites, to_be_aggregated, &channel_entry, min_win_prob)?
873                            .into_iter()
874                            .map(|model| {
875                                AcknowledgedTicket::try_from(model)
876                                    .map_err(DbSqlError::from)
877                                    .and_then(|ack| {
878                                        ack.into_transferable(&myself.chain_key, &domain_separator)
879                                            .map_err(DbSqlError::from)
880                                    })
881                            })
882                            .collect::<crate::errors::Result<Vec<_>>>()?;
883
884                    let mut neglected_idxs = Vec::new();
885
886                    if !to_be_aggregated.is_empty() {
887                        // Clean up any tickets in this channel that are already inside an aggregated ticket.
888                        // This situation cannot be avoided 100% as aggregation can be triggered when out-of-order
889                        // tickets arrive and only some of them are necessary to satisfy the aggregation threshold.
890                        // The following code *assumes* that only the first ticket with the lowest index *can be* an aggregate.
891                        let first_ticket = to_be_aggregated[0].ticket.clone();
892                        let mut i = 1;
893                        while i < to_be_aggregated.len() {
894                            let current_idx = to_be_aggregated[i].ticket.index;
895                            if (first_ticket.index..first_ticket.index + first_ticket.index_offset as u64).contains(&current_idx) {
896                                // Cleanup is the only reasonable thing to do at this point,
897                                // since the aggregator will check for index range overlaps and deny
898                                // the aggregation of the entire batch otherwise.
899                                warn!(ticket_id = current_idx, channel = %channel_id, ?first_ticket, "ticket in channel has been already aggregated and will be removed");
900                                neglected_idxs.push(current_idx);
901                                to_be_aggregated.remove(i);
902                            } else {
903                                i += 1;
904                            }
905                        }
906
907                        // The cleanup (neglecting of tickets) is not made directly here but on the next ticket redemption in this channel
908                        // See handler.rs around L402
909                        if !neglected_idxs.is_empty() {
910                            warn!(count = neglected_idxs.len(), channel = %channel_id, "tickets were neglected due to duplication in an aggregated ticket!");
911                        }
912
913                        // mark all tickets with appropriate characteristics as being aggregated
914                        let marked: sea_orm::UpdateResult = ticket::Entity::update_many()
915                            .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
916                            .filter(ticket::Column::Index.is_in(to_be_aggregated.iter().map(|t| t.ticket.index.to_be_bytes().to_vec())))
917                            .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
918                            .col_expr(
919                                ticket::Column::State,
920                                Expr::value(AcknowledgedTicketStatus::BeingAggregated as i8),
921                            )
922                            .exec(tx.as_ref())
923                            .await?;
924
925                        if marked.rows_affected as usize != to_be_aggregated.len() {
926                            return Err(DbSqlError::LogicalError(format!(
927                                "expected to mark {}, but was able to mark {}",
928                                to_be_aggregated.len(),
929                                marked.rows_affected,
930                            )));
931                        }
932                    }
933
934                    debug!(
935                        "prepared {} tickets to aggregate in {} ({})",
936                        to_be_aggregated.len(),
937                        channel_entry.get_id(),
938                        channel_entry.channel_epoch,
939                    );
940
941                    Ok(to_be_aggregated)
942                })
943            })
944            .await?;
945
946        Ok((!tickets.is_empty()).then_some((peer, tickets, domain_separator)))
947    }
948
949    async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()> {
950        let channel_entry = self
951            .get_channel_by_id(None, &channel)
952            .await?
953            .ok_or(DbSqlError::ChannelNotFound(channel))?;
954
955        let selector = TicketSelector::from(channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated);
956
957        let reverted = self
958            .update_ticket_states(selector, AcknowledgedTicketStatus::Untouched)
959            .await?;
960
961        info!(
962            "rollback happened for ticket aggregation in '{channel}' with {reverted} tickets rolled back as a result",
963        );
964        Ok(())
965    }
966
967    async fn process_received_aggregated_ticket(
968        &self,
969        aggregated_ticket: Ticket,
970        chain_keypair: &ChainKeypair,
971    ) -> Result<AcknowledgedTicket> {
972        if chain_keypair.public().to_address() != self.me_onchain {
973            return Err(DbSqlError::LogicalError(
974                "chain key for ticket aggregation does not match the DB public address".into(),
975            )
976            .into());
977        }
978
979        let myself = self.clone();
980        let channel_id = aggregated_ticket.channel_id;
981
982        let (channel_entry, domain_separator) = self
983            .nest_transaction_in_db(None, TargetDb::Index)
984            .await?
985            .perform(|tx| {
986                Box::pin(async move {
987                    let entry = myself
988                        .get_channel_by_id(Some(tx), &channel_id)
989                        .await?
990                        .ok_or(DbSqlError::ChannelNotFound(channel_id))?;
991
992                    if entry.status == ChannelStatus::Closed {
993                        return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
994                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
995                        return Err(DbSqlError::LogicalError(format!(
996                            "channel '{channel_id}' is not incoming"
997                        )));
998                    }
999
1000                    let domain_separator =
1001                        myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
1002                            crate::errors::DbSqlError::LogicalError("domain separator missing".into())
1003                        })?;
1004
1005                    Ok((entry, domain_separator))
1006                })
1007            })
1008            .await?;
1009
1010        // Verify the ticket first
1011        let aggregated_ticket = aggregated_ticket
1012            .verify(&channel_entry.source, &domain_separator)
1013            .map_err(|e| {
1014                DbSqlError::LogicalError(format!(
1015                    "failed to verify received aggregated ticket in {channel_id}: {e}"
1016                ))
1017            })?;
1018
1019        // Aggregated tickets always have 100% winning probability
1020        if !aggregated_ticket.win_prob().approx_eq(&WinningProbability::ALWAYS) {
1021            return Err(DbSqlError::LogicalError("Aggregated tickets must have 100% win probability".into()).into());
1022        }
1023
1024        let acknowledged_tickets = self
1025            .nest_transaction_in_db(None, TargetDb::Tickets)
1026            .await?
1027            .perform(|tx| {
1028                Box::pin(async move {
1029                    ticket::Entity::find()
1030                        .filter(WrappedTicketSelector::from(
1031                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
1032                        ))
1033                        .all(tx.as_ref())
1034                        .await
1035                        .map_err(DbSqlError::BackendError)
1036                })
1037            })
1038            .await?;
1039
1040        if acknowledged_tickets.is_empty() {
1041            debug!("Received unexpected aggregated ticket in channel {channel_id}");
1042            return Err(DbSqlError::LogicalError(format!(
1043                "failed insert aggregated ticket, because no tickets seem to be aggregated for '{channel_id}'",
1044            ))
1045            .into());
1046        }
1047
1048        let stored_value = acknowledged_tickets
1049            .iter()
1050            .map(|m| HoprBalance::from_be_bytes(&m.amount))
1051            .sum();
1052
1053        // The value of a received ticket can be higher (profit for us) but not lower
1054        if aggregated_ticket.verified_ticket().amount.lt(&stored_value) {
1055            error!(channel = %channel_id, "Aggregated ticket value in channel is lower than sum of stored tickets");
1056            return Err(DbSqlError::LogicalError("Value of received aggregated ticket is too low".into()).into());
1057        }
1058
1059        let acknowledged_tickets = acknowledged_tickets
1060            .into_iter()
1061            .map(AcknowledgedTicket::try_from)
1062            .collect::<hopr_db_entity::errors::Result<Vec<AcknowledgedTicket>>>()
1063            .map_err(DbSqlError::from)?;
1064
1065        // can be done, because the tickets collection is tested for emptiness before
1066        let first_stored_ticket = acknowledged_tickets.first().unwrap();
1067
1068        // calculate the new current ticket index
1069        #[allow(unused_variables)]
1070        let current_ticket_index_from_aggregated_ticket =
1071            U256::from(aggregated_ticket.verified_ticket().index).add(aggregated_ticket.verified_ticket().index_offset);
1072
1073        let acked_aggregated_ticket = aggregated_ticket.into_acknowledged(first_stored_ticket.response.clone());
1074
1075        let ticket = acked_aggregated_ticket.clone();
1076        self.ticket_manager.replace_tickets(ticket).await?;
1077
1078        info!(%acked_aggregated_ticket, "successfully processed received aggregated ticket");
1079        Ok(acked_aggregated_ticket)
1080    }
1081
1082    async fn aggregate_tickets(
1083        &self,
1084        destination: OffchainPublicKey,
1085        mut acked_tickets: Vec<TransferableWinningTicket>,
1086        me: &ChainKeypair,
1087    ) -> Result<VerifiedTicket> {
1088        if me.public().to_address() != self.me_onchain {
1089            return Err(DbSqlError::LogicalError(
1090                "chain key for ticket aggregation does not match the DB public address".into(),
1091            )
1092            .into());
1093        }
1094
1095        let domain_separator = self
1096            .get_indexer_data(None)
1097            .await?
1098            .domain_separator(DomainSeparator::Channel)
1099            .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
1100
1101        if acked_tickets.is_empty() {
1102            return Err(DbSqlError::LogicalError("at least one ticket required for aggregation".to_owned()).into());
1103        }
1104
1105        if acked_tickets.len() == 1 {
1106            let single = acked_tickets
1107                .pop()
1108                .unwrap()
1109                .into_redeemable(&self.me_onchain, &domain_separator)
1110                .map_err(DbSqlError::from)?;
1111
1112            self.compare_and_set_outgoing_ticket_index(
1113                single.verified_ticket().channel_id,
1114                single.verified_ticket().index + 1,
1115            )
1116            .await?;
1117
1118            return Ok(single.ticket);
1119        }
1120
1121        acked_tickets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(cmp::Ordering::Equal));
1122        acked_tickets.dedup();
1123
1124        let myself = self.clone();
1125        let address = myself
1126            .resolve_chain_key(&destination)
1127            .await?
1128            .ok_or(DbSqlError::LogicalError(format!(
1129                "peer '{}' has no chain key record",
1130                destination.to_peerid_str()
1131            )))?;
1132
1133        let (channel_entry, destination, min_win_prob) = self
1134            .nest_transaction_in_db(None, TargetDb::Index)
1135            .await?
1136            .perform(|tx| {
1137                Box::pin(async move {
1138                    let entry = myself
1139                        .get_channel_by_parties(Some(tx), &myself.me_onchain, &address, false)
1140                        .await?
1141                        .ok_or_else(|| {
1142                            DbSqlError::ChannelNotFound(generate_channel_id(&myself.me_onchain, &address))
1143                        })?;
1144
1145                    if entry.status == ChannelStatus::Closed {
1146                        return Err(DbSqlError::LogicalError(format!("{entry} is closed")));
1147                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Outgoing) {
1148                        return Err(DbSqlError::LogicalError(format!("{entry} is not outgoing")));
1149                    }
1150
1151                    let min_win_prob = myself
1152                        .get_indexer_data(Some(tx))
1153                        .await?
1154                        .minimum_incoming_ticket_winning_prob;
1155                    Ok((entry, address, min_win_prob))
1156                })
1157            })
1158            .await?;
1159
1160        let channel_balance = channel_entry.balance;
1161        let channel_epoch = channel_entry.channel_epoch.as_u32();
1162        let channel_id = channel_entry.get_id();
1163
1164        let mut final_value = HoprBalance::zero();
1165
1166        // Validate all received tickets and turn them into RedeemableTickets
1167        let verified_tickets = acked_tickets
1168            .into_iter()
1169            .map(|t| t.into_redeemable(&self.me_onchain, &domain_separator))
1170            .collect::<hopr_internal_types::errors::Result<Vec<_>>>()
1171            .map_err(|e| {
1172                DbSqlError::LogicalError(format!("trying to aggregate an invalid or a non-winning ticket: {e}"))
1173            })?;
1174
1175        // Perform additional consistency check on the verified tickets
1176        for (i, acked_ticket) in verified_tickets.iter().enumerate() {
1177            if channel_id != acked_ticket.verified_ticket().channel_id {
1178                return Err(DbSqlError::LogicalError(format!(
1179                    "ticket for aggregation has an invalid channel id {}",
1180                    acked_ticket.verified_ticket().channel_id
1181                ))
1182                .into());
1183            }
1184
1185            if acked_ticket.verified_ticket().channel_epoch != channel_epoch {
1186                return Err(DbSqlError::LogicalError("channel epochs do not match".into()).into());
1187            }
1188
1189            if i + 1 < verified_tickets.len()
1190                && acked_ticket.verified_ticket().index + acked_ticket.verified_ticket().index_offset as u64
1191                    > verified_tickets[i + 1].verified_ticket().index
1192            {
1193                return Err(DbSqlError::LogicalError("tickets with overlapping index intervals".into()).into());
1194            }
1195
1196            if acked_ticket
1197                .verified_ticket()
1198                .win_prob()
1199                .approx_cmp(&min_win_prob)
1200                .is_lt()
1201            {
1202                return Err(DbSqlError::LogicalError(
1203                    "cannot aggregate ticket with lower than minimum winning probability in network".into(),
1204                )
1205                .into());
1206            }
1207
1208            final_value += acked_ticket.verified_ticket().amount;
1209            if final_value.gt(&channel_balance) {
1210                return Err(DbSqlError::LogicalError(format!(
1211                    "ticket amount to aggregate {final_value} is greater than the balance {channel_balance} of \
1212                     channel {channel_id}"
1213                ))
1214                .into());
1215            }
1216        }
1217
1218        info!(
1219            "aggregated {} tickets in channel {channel_id} with total value {final_value}",
1220            verified_tickets.len()
1221        );
1222
1223        let first_acked_ticket = verified_tickets.first().unwrap();
1224        let last_acked_ticket = verified_tickets.last().unwrap();
1225
1226        // calculate the minimum current ticket index as the larger value from the acked ticket index and on-chain
1227        // ticket_index from channel_entry
1228        let current_ticket_index_from_acked_tickets = last_acked_ticket.verified_ticket().index + 1;
1229        self.compare_and_set_outgoing_ticket_index(channel_id, current_ticket_index_from_acked_tickets)
1230            .await?;
1231
1232        Ok(TicketBuilder::default()
1233            .direction(&self.me_onchain, &destination)
1234            .balance(final_value)
1235            .index(first_acked_ticket.verified_ticket().index)
1236            .index_offset(
1237                (last_acked_ticket.verified_ticket().index - first_acked_ticket.verified_ticket().index + 1) as u32,
1238            )
1239            .win_prob(WinningProbability::ALWAYS) // Aggregated tickets have always 100% winning probability
1240            .channel_epoch(channel_epoch)
1241            .eth_challenge(first_acked_ticket.verified_ticket().challenge)
1242            .build_signed(me, &domain_separator)
1243            .map_err(DbSqlError::from)?)
1244    }
1245
1246    async fn fix_channels_next_ticket_state(&self) -> Result<()> {
1247        let channels = self.get_incoming_channels(None).await?;
1248
1249        for channel in channels.into_iter() {
1250            let selector = TicketSelector::from(&channel)
1251                .with_state(AcknowledgedTicketStatus::BeingRedeemed)
1252                .with_index(channel.ticket_index.as_u64());
1253
1254            let mut tickets_stream = self
1255                .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::Untouched)
1256                .await?;
1257
1258            while let Some(ticket) = tickets_stream.next().await {
1259                let channel_id = channel.get_id();
1260                let ticket_index = ticket.verified_ticket().index;
1261                let ticket_amount = ticket.verified_ticket().amount;
1262                info!(%channel_id, %ticket_index, %ticket_amount, "fixed next out-of-sync ticket");
1263            }
1264        }
1265
1266        Ok(())
1267    }
1268}
1269
1270impl HoprDb {
1271    /// Used only by non-SQLite code and tests.
1272    pub async fn upsert_ticket<'a>(&'a self, tx: OptTx<'a>, acknowledged_ticket: AcknowledgedTicket) -> Result<()> {
1273        self.nest_transaction_in_db(tx, TargetDb::Tickets)
1274            .await?
1275            .perform(|tx| {
1276                Box::pin(async move {
1277                    // For upserting, we must select only by the triplet (channel id, epoch, index)
1278                    let selector = WrappedTicketSelector::from(
1279                        TicketSelector::new(
1280                            acknowledged_ticket.verified_ticket().channel_id,
1281                            acknowledged_ticket.verified_ticket().channel_epoch,
1282                        )
1283                        .with_index(acknowledged_ticket.verified_ticket().index),
1284                    );
1285
1286                    debug!("upserting ticket {acknowledged_ticket}");
1287                    let mut model = ticket::ActiveModel::from(acknowledged_ticket);
1288
1289                    if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx.as_ref()).await? {
1290                        model.id = Set(ticket.id);
1291                    }
1292
1293                    Ok::<_, DbSqlError>(model.save(tx.as_ref()).await?)
1294                })
1295            })
1296            .await?;
1297        Ok(())
1298    }
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303    use std::{
1304        ops::Add,
1305        sync::atomic::Ordering,
1306        time::{Duration, SystemTime},
1307    };
1308
1309    use anyhow::{Context, anyhow};
1310    use futures::{StreamExt, pin_mut};
1311    use hex_literal::hex;
1312    use hopr_crypto_random::Randomizable;
1313    use hopr_crypto_types::prelude::*;
1314    use hopr_db_api::{
1315        info::DomainSeparator,
1316        prelude::{DbError, TicketMarker},
1317        tickets::ChannelTicketStatistics,
1318    };
1319    use hopr_db_entity::ticket;
1320    use hopr_internal_types::prelude::*;
1321    use hopr_primitive_types::prelude::*;
1322    use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, Set};
1323
1324    use crate::{
1325        HoprDbGeneralModelOperations, TargetDb,
1326        accounts::HoprDbAccountOperations,
1327        channels::HoprDbChannelOperations,
1328        db::HoprDb,
1329        errors::DbSqlError,
1330        info::HoprDbInfoOperations,
1331        tickets::{AggregationPrerequisites, HoprDbTicketOperations, TicketSelector, filter_satisfying_ticket_models},
1332    };
1333
1334    lazy_static::lazy_static! {
1335        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
1336        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
1337        static ref CHANNEL_ID: Hash = generate_channel_id(&BOB.public().to_address(), &ALICE.public().to_address());
1338    }
1339
1340    lazy_static::lazy_static! {
1341        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1342        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1343    }
1344
1345    const TICKET_VALUE: u64 = 100_000;
1346
1347    async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
1348        for (peer_offchain, peer_onchain) in peers.into_iter() {
1349            db.insert_account(
1350                None,
1351                AccountEntry {
1352                    public_key: *peer_offchain.public(),
1353                    chain_addr: peer_onchain.public().to_address(),
1354                    entry_type: AccountType::NotAnnounced,
1355                    published_at: 0,
1356                },
1357            )
1358            .await?
1359        }
1360
1361        Ok(())
1362    }
1363
1364    fn generate_random_ack_ticket(
1365        src: &ChainKeypair,
1366        dst: &ChainKeypair,
1367        index: u64,
1368        index_offset: u32,
1369        win_prob: f64,
1370    ) -> anyhow::Result<AcknowledgedTicket> {
1371        let hk1 = HalfKey::random();
1372        let hk2 = HalfKey::random();
1373        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
1374
1375        Ok(TicketBuilder::default()
1376            .addresses(src, dst)
1377            .amount(TICKET_VALUE)
1378            .index(index)
1379            .index_offset(index_offset)
1380            .win_prob(win_prob.try_into()?)
1381            .channel_epoch(4)
1382            .challenge(challenge)
1383            .build_signed(src, &Hash::default())?
1384            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
1385    }
1386
1387    async fn init_db_with_tickets(
1388        db: &HoprDb,
1389        count_tickets: u64,
1390    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1391        init_db_with_tickets_and_channel(db, count_tickets, None).await
1392    }
1393
1394    async fn init_db_with_tickets_and_channel(
1395        db: &HoprDb,
1396        count_tickets: u64,
1397        channel_ticket_index: Option<u32>,
1398    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1399        let channel = ChannelEntry::new(
1400            BOB.public().to_address(),
1401            ALICE.public().to_address(),
1402            u32::MAX.into(),
1403            channel_ticket_index.unwrap_or(0u32).into(),
1404            ChannelStatus::Open,
1405            4_u32.into(),
1406        );
1407
1408        db.upsert_channel(None, channel).await?;
1409
1410        let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
1411            .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
1412            .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
1413
1414        let db_clone = db.clone();
1415        let tickets_clone = tickets.clone();
1416        db.begin_transaction_in_db(TargetDb::Tickets)
1417            .await?
1418            .perform(|tx| {
1419                Box::pin(async move {
1420                    for t in tickets_clone {
1421                        db_clone.upsert_ticket(Some(tx), t).await?;
1422                    }
1423                    Ok::<(), DbSqlError>(())
1424                })
1425            })
1426            .await?;
1427
1428        Ok((channel, tickets))
1429    }
1430
1431    #[tokio::test]
1432    async fn test_insert_get_ticket() -> anyhow::Result<()> {
1433        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1434        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
1435            .await?;
1436
1437        let (channel, mut tickets) = init_db_with_tickets(&db, 1).await?;
1438        let ack_ticket = tickets.pop().context("ticket should be present")?;
1439
1440        assert_eq!(
1441            channel.get_id(),
1442            ack_ticket.verified_ticket().channel_id,
1443            "channel ids must match"
1444        );
1445        assert_eq!(
1446            channel.channel_epoch.as_u32(),
1447            ack_ticket.verified_ticket().channel_epoch,
1448            "epochs must match"
1449        );
1450
1451        let db_ticket = db
1452            .get_tickets((&ack_ticket).into())
1453            .await?
1454            .first()
1455            .cloned()
1456            .context("ticket should exist")?;
1457
1458        assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
1459
1460        Ok(())
1461    }
1462
1463    #[tokio::test]
1464    async fn test_mark_redeemed() -> anyhow::Result<()> {
1465        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1466        const COUNT_TICKETS: u64 = 10;
1467
1468        let (_, tickets) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1469
1470        let stats = db.get_ticket_statistics(None).await?;
1471        assert_eq!(
1472            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1473            stats.unredeemed_value,
1474            "unredeemed balance must match"
1475        );
1476        assert_eq!(
1477            HoprBalance::zero(),
1478            stats.redeemed_value,
1479            "there must be 0 redeemed value"
1480        );
1481
1482        assert_eq!(
1483            stats,
1484            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1485            "per channel stats must be same"
1486        );
1487
1488        const TO_REDEEM: u64 = 2;
1489        let db_clone = db.clone();
1490        db.begin_transaction_in_db(TargetDb::Tickets)
1491            .await?
1492            .perform(|_tx| {
1493                Box::pin(async move {
1494                    for ticket in tickets.iter().take(TO_REDEEM as usize) {
1495                        let r = db_clone.mark_tickets_as(ticket.into(), TicketMarker::Redeemed).await?;
1496                        assert_eq!(1, r, "must redeem only a single ticket");
1497                    }
1498                    Ok::<(), DbSqlError>(())
1499                })
1500            })
1501            .await?;
1502
1503        let stats = db.get_ticket_statistics(None).await?;
1504        assert_eq!(
1505            HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
1506            stats.unredeemed_value,
1507            "unredeemed balance must match"
1508        );
1509        assert_eq!(
1510            HoprBalance::from(TICKET_VALUE * TO_REDEEM),
1511            stats.redeemed_value,
1512            "there must be a redeemed value"
1513        );
1514
1515        assert_eq!(
1516            stats,
1517            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1518            "per channel stats must be same"
1519        );
1520
1521        Ok(())
1522    }
1523
1524    #[tokio::test]
1525    async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
1526        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1527
1528        let ticket = init_db_with_tickets(&db, 1)
1529            .await?
1530            .1
1531            .pop()
1532            .context("should contain a ticket")?;
1533
1534        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
1535        assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
1536
1537        Ok(())
1538    }
1539
1540    #[tokio::test]
1541    async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
1542        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1543
1544        let count_tickets = 10;
1545        let channel = init_db_with_tickets(&db, count_tickets).await?.0;
1546
1547        let count_marked = db.mark_tickets_as((&channel).into(), TicketMarker::Redeemed).await?;
1548        assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
1549
1550        Ok(())
1551    }
1552
1553    #[tokio::test]
1554    async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
1555        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1556        const COUNT_TICKETS: u64 = 10;
1557
1558        let (channel, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1559
1560        let stats = db.get_ticket_statistics(None).await?;
1561        assert_eq!(
1562            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1563            stats.unredeemed_value,
1564            "unredeemed balance must match"
1565        );
1566        assert_eq!(
1567            HoprBalance::zero(),
1568            stats.neglected_value,
1569            "there must be 0 redeemed value"
1570        );
1571
1572        assert_eq!(
1573            stats,
1574            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1575            "per channel stats must be same"
1576        );
1577
1578        db.mark_tickets_as((&channel).into(), TicketMarker::Neglected).await?;
1579
1580        let stats = db.get_ticket_statistics(None).await?;
1581        assert_eq!(
1582            HoprBalance::zero(),
1583            stats.unredeemed_value,
1584            "unredeemed balance must be zero"
1585        );
1586        assert_eq!(
1587            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1588            stats.neglected_value,
1589            "there must be a neglected value"
1590        );
1591
1592        assert_eq!(
1593            stats,
1594            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1595            "per channel stats must be same"
1596        );
1597
1598        Ok(())
1599    }
1600
1601    #[tokio::test]
1602    async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1603        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1604
1605        let (_, mut ticket) = init_db_with_tickets(&db, 1).await?;
1606        let ticket = ticket.pop().context("ticket should be present")?.ticket;
1607
1608        let stats = db.get_ticket_statistics(None).await?;
1609        assert_eq!(HoprBalance::zero(), stats.rejected_value);
1610        assert_eq!(
1611            stats,
1612            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1613            "per channel stats must be same"
1614        );
1615
1616        db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
1617
1618        let stats = db.get_ticket_statistics(None).await?;
1619        assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
1620        assert_eq!(
1621            stats,
1622            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1623            "per channel stats must be same"
1624        );
1625
1626        Ok(())
1627    }
1628
1629    #[tokio::test]
1630    async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1631        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1632        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1633            .await?;
1634
1635        let channel = init_db_with_tickets(&db, 10).await?.0;
1636
1637        let selector = TicketSelector::from(&channel).with_index(5);
1638
1639        let v: Vec<AcknowledgedTicket> = db
1640            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1641            .await?
1642            .collect()
1643            .await;
1644
1645        assert_eq!(1, v.len(), "single ticket must be updated");
1646        assert_eq!(
1647            AcknowledgedTicketStatus::BeingRedeemed,
1648            v.first().context("should contain a ticket")?.status,
1649            "status must be set"
1650        );
1651
1652        let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1653
1654        let v: Vec<AcknowledgedTicket> = db
1655            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1656            .await?
1657            .collect()
1658            .await;
1659
1660        assert_eq!(9, v.len(), "only specific tickets must have state set");
1661        assert!(
1662            v.iter().all(|t| t.verified_ticket().index != 5),
1663            "only tickets with different state must update"
1664        );
1665        assert!(
1666            v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
1667            "tickets must have updated state"
1668        );
1669
1670        Ok(())
1671    }
1672
1673    #[tokio::test]
1674    async fn test_update_tickets_states() -> anyhow::Result<()> {
1675        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1676        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1677            .await?;
1678
1679        let channel = init_db_with_tickets(&db, 10).await?.0;
1680        let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1681
1682        db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
1683            .await?;
1684
1685        let v: Vec<AcknowledgedTicket> = db
1686            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1687            .await?
1688            .collect()
1689            .await;
1690
1691        assert!(v.is_empty(), "must not update if already updated");
1692
1693        Ok(())
1694    }
1695
1696    #[tokio::test]
1697    async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1698        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1699
1700        let hash = Hash::default();
1701
1702        let idx = db.get_outgoing_ticket_index(hash).await?;
1703        assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
1704
1705        let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
1706            .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
1707            .one(&db.tickets_db)
1708            .await?
1709            .context("index must exist")?;
1710
1711        assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
1712
1713        Ok(())
1714    }
1715
1716    #[tokio::test]
1717    async fn test_ticket_stats_must_fail_for_non_existing_channel() -> anyhow::Result<()> {
1718        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1719
1720        db.get_ticket_statistics(Some(*CHANNEL_ID))
1721            .await
1722            .expect_err("must fail for non-existing channel");
1723
1724        Ok(())
1725    }
1726
1727    #[tokio::test]
1728    async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1729        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1730
1731        let channel = ChannelEntry::new(
1732            BOB.public().to_address(),
1733            ALICE.public().to_address(),
1734            u32::MAX.into(),
1735            0.into(),
1736            ChannelStatus::Open,
1737            4_u32.into(),
1738        );
1739
1740        db.upsert_channel(None, channel).await?;
1741
1742        let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1743
1744        assert_eq!(
1745            ChannelTicketStatistics::default(),
1746            stats,
1747            "must be equal to default which is all zeros"
1748        );
1749
1750        assert_eq!(
1751            stats,
1752            db.get_ticket_statistics(None).await?,
1753            "per-channel stats must be the same as global stats"
1754        );
1755
1756        Ok(())
1757    }
1758
1759    #[tokio::test]
1760    async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1761        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1762
1763        let channel_1 = ChannelEntry::new(
1764            BOB.public().to_address(),
1765            ALICE.public().to_address(),
1766            u32::MAX.into(),
1767            0.into(),
1768            ChannelStatus::Open,
1769            4_u32.into(),
1770        );
1771
1772        db.upsert_channel(None, channel_1).await?;
1773
1774        let channel_2 = ChannelEntry::new(
1775            ALICE.public().to_address(),
1776            BOB.public().to_address(),
1777            u32::MAX.into(),
1778            0.into(),
1779            ChannelStatus::Open,
1780            4_u32.into(),
1781        );
1782
1783        db.upsert_channel(None, channel_2).await?;
1784
1785        let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1786        let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1787
1788        let value = t1.verified_ticket().amount;
1789
1790        db.upsert_ticket(None, t1).await?;
1791        db.upsert_ticket(None, t2).await?;
1792
1793        let stats_1 = db
1794            .get_ticket_statistics(Some(generate_channel_id(
1795                &BOB.public().to_address(),
1796                &ALICE.public().to_address(),
1797            )))
1798            .await?;
1799
1800        let stats_2 = db
1801            .get_ticket_statistics(Some(generate_channel_id(
1802                &ALICE.public().to_address(),
1803                &BOB.public().to_address(),
1804            )))
1805            .await?;
1806
1807        assert_eq!(value, stats_1.unredeemed_value);
1808        assert_eq!(value, stats_2.unredeemed_value);
1809
1810        assert_eq!(HoprBalance::zero(), stats_1.neglected_value);
1811        assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1812
1813        assert_eq!(stats_1, stats_2);
1814
1815        db.mark_tickets_as(channel_1.into(), TicketMarker::Neglected).await?;
1816
1817        let stats_1 = db
1818            .get_ticket_statistics(Some(generate_channel_id(
1819                &BOB.public().to_address(),
1820                &ALICE.public().to_address(),
1821            )))
1822            .await?;
1823
1824        let stats_2 = db
1825            .get_ticket_statistics(Some(generate_channel_id(
1826                &ALICE.public().to_address(),
1827                &BOB.public().to_address(),
1828            )))
1829            .await?;
1830
1831        assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1832        assert_eq!(value, stats_1.neglected_value);
1833
1834        assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1835
1836        Ok(())
1837    }
1838
1839    #[tokio::test]
1840    async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1841        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1842
1843        let hash = Hash::default();
1844
1845        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1846        assert_eq!(0, old_idx, "old value must be 0");
1847
1848        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1849        assert_eq!(1, new_idx, "new value must be 1");
1850
1851        let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1852        assert_eq!(1, old_idx, "old value must be 1");
1853
1854        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1855        assert_eq!(2, new_idx, "new value must be 2");
1856
1857        Ok(())
1858    }
1859
1860    #[tokio::test]
1861    async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1862        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1863
1864        let hash = Hash::default();
1865
1866        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1867        assert_eq!(0, new_idx, "value must be 0");
1868
1869        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1870
1871        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1872        assert_eq!(1, new_idx, "new value must be 1");
1873
1874        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1875        assert_eq!(1, old_idx, "old value must be 1");
1876        assert_eq!(1, new_idx, "new value must be 1");
1877
1878        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1879        assert_eq!(1, old_idx, "old value must be 1");
1880        assert_eq!(1, new_idx, "new value must be 1");
1881
1882        Ok(())
1883    }
1884
1885    #[tokio::test]
1886    async fn test_ticket_index_reset() -> anyhow::Result<()> {
1887        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1888
1889        let hash = Hash::default();
1890
1891        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1892        assert_eq!(0, new_idx, "value must be 0");
1893
1894        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1895
1896        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1897        assert_eq!(1, new_idx, "new value must be 1");
1898
1899        let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1900        assert_eq!(1, old_idx, "old value must be 1");
1901
1902        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1903        assert_eq!(0, new_idx, "new value must be 0");
1904        Ok(())
1905    }
1906
1907    #[tokio::test]
1908    async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1909        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1910
1911        let hash_1 = Hash::default();
1912        let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1913
1914        db.get_outgoing_ticket_index(hash_1).await?;
1915        db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1916
1917        let persisted = db.persist_outgoing_ticket_indices().await?;
1918        assert_eq!(1, persisted);
1919
1920        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1921            .all(&db.tickets_db)
1922            .await?;
1923        let idx_1 = indices
1924            .iter()
1925            .find(|idx| idx.channel_id == hash_1.to_hex())
1926            .context("must contain index 1")?;
1927        let idx_2 = indices
1928            .iter()
1929            .find(|idx| idx.channel_id == hash_2.to_hex())
1930            .context("must contain index 2")?;
1931        assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1932        assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1933
1934        db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1935        db.increment_outgoing_ticket_index(hash_2).await?;
1936
1937        let persisted = db.persist_outgoing_ticket_indices().await?;
1938        assert_eq!(2, persisted);
1939
1940        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1941            .all(&db.tickets_db)
1942            .await?;
1943        let idx_1 = indices
1944            .iter()
1945            .find(|idx| idx.channel_id == hash_1.to_hex())
1946            .context("must contain index 1")?;
1947        let idx_2 = indices
1948            .iter()
1949            .find(|idx| idx.channel_id == hash_2.to_hex())
1950            .context("must contain index 2")?;
1951        assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1952        assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1953        Ok(())
1954    }
1955
1956    #[tokio::test]
1957    async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1958        let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1959
1960        assert_eq!(cache.weighted_size(), 0);
1961
1962        cache.insert(1, 1).await;
1963        cache.insert(2, 2).await;
1964
1965        let clone = cache.clone();
1966
1967        cache.remove(&1).await;
1968        cache.remove(&2).await;
1969
1970        assert_eq!(cache.get(&1).await, None);
1971        assert_eq!(cache.get(&1).await, clone.get(&1).await);
1972        Ok(())
1973    }
1974
1975    fn dummy_ticket_model(channel_id: Hash, idx: u64, idx_offset: u32, amount: u32) -> ticket::Model {
1976        ticket::Model {
1977            id: 0,
1978            channel_id: channel_id.to_string(),
1979            amount: U256::from(amount).to_be_bytes().to_vec(),
1980            index: idx.to_be_bytes().to_vec(),
1981            index_offset: idx_offset as i32,
1982            winning_probability: hex!("0020C49BA5E34F").to_vec(), // 0.0005
1983            channel_epoch: vec![],
1984            signature: vec![],
1985            response: vec![],
1986            state: 0,
1987            hash: vec![],
1988        }
1989    }
1990
1991    #[tokio::test]
1992    async fn test_aggregation_prerequisites_default_filter_no_tickets() -> anyhow::Result<()> {
1993        let prerequisites = AggregationPrerequisites::default();
1994        assert_eq!(None, prerequisites.min_unaggregated_ratio);
1995        assert_eq!(None, prerequisites.min_ticket_count);
1996
1997        let channel = ChannelEntry::new(
1998            BOB.public().to_address(),
1999            ALICE.public().to_address(),
2000            u32::MAX.into(),
2001            2.into(),
2002            ChannelStatus::Open,
2003            4_u32.into(),
2004        );
2005
2006        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2007
2008        let filtered_tickets =
2009            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2010
2011        assert_eq!(
2012            dummy_tickets, filtered_tickets,
2013            "empty prerequisites must not filter anything"
2014        );
2015        Ok(())
2016    }
2017
2018    #[tokio::test]
2019    async fn test_aggregation_prerequisites_should_filter_out_tickets_with_lower_than_min_win_prob()
2020    -> anyhow::Result<()> {
2021        let prerequisites = AggregationPrerequisites::default();
2022        assert_eq!(None, prerequisites.min_unaggregated_ratio);
2023        assert_eq!(None, prerequisites.min_ticket_count);
2024
2025        let channel = ChannelEntry::new(
2026            BOB.public().to_address(),
2027            ALICE.public().to_address(),
2028            u32::MAX.into(),
2029            2.into(),
2030            ChannelStatus::Open,
2031            4_u32.into(),
2032        );
2033
2034        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2035
2036        let filtered_tickets =
2037            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0006.try_into()?)?;
2038
2039        assert!(
2040            filtered_tickets.is_empty(),
2041            "must filter out tickets with lower win prob"
2042        );
2043        Ok(())
2044    }
2045
2046    #[tokio::test]
2047    async fn test_aggregation_prerequisites_must_trim_tickets_exceeding_channel_balance() -> anyhow::Result<()> {
2048        const TICKET_COUNT: usize = 110;
2049
2050        let prerequisites = AggregationPrerequisites::default();
2051        assert_eq!(None, prerequisites.min_unaggregated_ratio);
2052        assert_eq!(None, prerequisites.min_ticket_count);
2053
2054        let channel = ChannelEntry::new(
2055            BOB.public().to_address(),
2056            ALICE.public().to_address(),
2057            100.into(),
2058            (TICKET_COUNT + 1).into(),
2059            ChannelStatus::Open,
2060            4_u32.into(),
2061        );
2062
2063        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2064            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2065            .collect();
2066
2067        let filtered_tickets =
2068            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2069
2070        assert_eq!(
2071            100,
2072            filtered_tickets.len(),
2073            "must take only tickets up to channel balance"
2074        );
2075        assert!(
2076            filtered_tickets
2077                .into_iter()
2078                .map(|t| U256::from_be_bytes(t.amount).as_u64())
2079                .sum::<u64>()
2080                <= channel.balance.amount().as_u64(),
2081            "filtered tickets must not exceed channel balance"
2082        );
2083        Ok(())
2084    }
2085
2086    #[tokio::test]
2087    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_ticket_count_not_met() -> anyhow::Result<()>
2088    {
2089        const TICKET_COUNT: usize = 10;
2090
2091        let prerequisites = AggregationPrerequisites {
2092            min_ticket_count: Some(TICKET_COUNT + 1),
2093            min_unaggregated_ratio: None,
2094        };
2095
2096        let channel = ChannelEntry::new(
2097            BOB.public().to_address(),
2098            ALICE.public().to_address(),
2099            100.into(),
2100            (TICKET_COUNT + 1).into(),
2101            ChannelStatus::Open,
2102            4_u32.into(),
2103        );
2104
2105        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2106            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2107            .collect();
2108
2109        let filtered_tickets =
2110            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2111
2112        assert!(
2113            filtered_tickets.is_empty(),
2114            "must return empty when min_ticket_count is not met"
2115        );
2116
2117        Ok(())
2118    }
2119
2120    #[tokio::test]
2121    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_unaggregated_ratio_is_not_met()
2122    -> anyhow::Result<()> {
2123        const TICKET_COUNT: usize = 10;
2124
2125        let prerequisites = AggregationPrerequisites {
2126            min_ticket_count: None,
2127            min_unaggregated_ratio: Some(0.9),
2128        };
2129
2130        let channel = ChannelEntry::new(
2131            BOB.public().to_address(),
2132            ALICE.public().to_address(),
2133            100.into(),
2134            (TICKET_COUNT + 1).into(),
2135            ChannelStatus::Open,
2136            4_u32.into(),
2137        );
2138
2139        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2140            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2141            .collect();
2142
2143        let filtered_tickets =
2144            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2145
2146        assert!(
2147            filtered_tickets.is_empty(),
2148            "must return empty when min_unaggregated_ratio is not met"
2149        );
2150
2151        Ok(())
2152    }
2153
2154    #[tokio::test]
2155    async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met() -> anyhow::Result<()> {
2156        const TICKET_COUNT: usize = 10;
2157
2158        let prerequisites = AggregationPrerequisites {
2159            min_ticket_count: Some(TICKET_COUNT),
2160            min_unaggregated_ratio: None,
2161        };
2162
2163        let channel = ChannelEntry::new(
2164            BOB.public().to_address(),
2165            ALICE.public().to_address(),
2166            100.into(),
2167            (TICKET_COUNT + 1).into(),
2168            ChannelStatus::Open,
2169            4_u32.into(),
2170        );
2171
2172        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2173            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2174            .collect();
2175
2176        let filtered_tickets =
2177            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2178
2179        assert!(!filtered_tickets.is_empty(), "must not return empty");
2180        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2181        Ok(())
2182    }
2183
2184    #[tokio::test]
2185    async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met_regardless_ratio()
2186    -> anyhow::Result<()> {
2187        const TICKET_COUNT: usize = 10;
2188
2189        let prerequisites = AggregationPrerequisites {
2190            min_ticket_count: Some(TICKET_COUNT),
2191            min_unaggregated_ratio: Some(0.9),
2192        };
2193
2194        let channel = ChannelEntry::new(
2195            BOB.public().to_address(),
2196            ALICE.public().to_address(),
2197            100.into(),
2198            (TICKET_COUNT + 1).into(),
2199            ChannelStatus::Open,
2200            4_u32.into(),
2201        );
2202
2203        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2204            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2205            .collect();
2206
2207        let filtered_tickets =
2208            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2209
2210        assert!(!filtered_tickets.is_empty(), "must not return empty");
2211        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2212        Ok(())
2213    }
2214
2215    #[tokio::test]
2216    async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met()
2217    -> anyhow::Result<()> {
2218        const TICKET_COUNT: usize = 90;
2219
2220        let prerequisites = AggregationPrerequisites {
2221            min_ticket_count: None,
2222            min_unaggregated_ratio: Some(0.9),
2223        };
2224
2225        let channel = ChannelEntry::new(
2226            BOB.public().to_address(),
2227            ALICE.public().to_address(),
2228            100.into(),
2229            (TICKET_COUNT + 1).into(),
2230            ChannelStatus::Open,
2231            4_u32.into(),
2232        );
2233
2234        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2235            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2236            .collect();
2237
2238        let filtered_tickets =
2239            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2240
2241        assert!(!filtered_tickets.is_empty(), "must not return empty");
2242        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2243        Ok(())
2244    }
2245
2246    #[tokio::test]
2247    async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met_regardless_count()
2248    -> anyhow::Result<()> {
2249        const TICKET_COUNT: usize = 90;
2250
2251        let prerequisites = AggregationPrerequisites {
2252            min_ticket_count: Some(TICKET_COUNT + 1),
2253            min_unaggregated_ratio: Some(0.9),
2254        };
2255
2256        let channel = ChannelEntry::new(
2257            BOB.public().to_address(),
2258            ALICE.public().to_address(),
2259            100.into(),
2260            (TICKET_COUNT + 1).into(),
2261            ChannelStatus::Open,
2262            4_u32.into(),
2263        );
2264
2265        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2266            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2267            .collect();
2268
2269        let filtered_tickets =
2270            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2271
2272        assert!(!filtered_tickets.is_empty(), "must not return empty");
2273        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2274        Ok(())
2275    }
2276
2277    #[tokio::test]
2278    async fn test_aggregation_prerequisites_must_return_tickets_when_minimum_incl_aggregated_ratio_is_met()
2279    -> anyhow::Result<()> {
2280        const TICKET_COUNT: usize = 90;
2281
2282        let prerequisites = AggregationPrerequisites {
2283            min_ticket_count: None,
2284            min_unaggregated_ratio: Some(0.9),
2285        };
2286
2287        let channel = ChannelEntry::new(
2288            BOB.public().to_address(),
2289            ALICE.public().to_address(),
2290            100.into(),
2291            (TICKET_COUNT + 1).into(),
2292            ChannelStatus::Open,
2293            4_u32.into(),
2294        );
2295
2296        let mut dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2297            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2298            .collect();
2299        dummy_tickets[0].index_offset = 2; // Make this ticket aggregated
2300
2301        let filtered_tickets =
2302            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2303
2304        assert_eq!(filtered_tickets.len(), TICKET_COUNT);
2305        Ok(())
2306    }
2307
2308    #[tokio::test]
2309    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_only_unaggregated_ratio_is_met_in_single_ticket_only()
2310    -> anyhow::Result<()> {
2311        let prerequisites = AggregationPrerequisites {
2312            min_ticket_count: None,
2313            min_unaggregated_ratio: Some(0.9),
2314        };
2315
2316        let channel = ChannelEntry::new(
2317            BOB.public().to_address(),
2318            ALICE.public().to_address(),
2319            100.into(),
2320            2.into(),
2321            ChannelStatus::Open,
2322            4_u32.into(),
2323        );
2324
2325        // Single aggregated ticket exceeding the min_unaggregated_ratio
2326        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 2, 110)];
2327
2328        let filtered_tickets =
2329            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2330
2331        assert!(filtered_tickets.is_empty(), "must return empty");
2332        Ok(())
2333    }
2334
2335    async fn create_alice_db_with_tickets_from_bob(
2336        ticket_count: usize,
2337    ) -> anyhow::Result<(HoprDb, ChannelEntry, Vec<AcknowledgedTicket>)> {
2338        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
2339
2340        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2341            .await?;
2342
2343        add_peer_mappings(
2344            &db,
2345            vec![
2346                (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2347                (BOB_OFFCHAIN.clone(), BOB.clone()),
2348            ],
2349        )
2350        .await?;
2351
2352        let (channel, tickets) = init_db_with_tickets(&db, ticket_count as u64).await?;
2353
2354        Ok((db, channel, tickets))
2355    }
2356
2357    #[tokio::test]
2358    async fn test_ticket_aggregation_should_fail_if_any_ticket_is_being_aggregated_in_that_channel()
2359    -> anyhow::Result<()> {
2360        const COUNT_TICKETS: usize = 5;
2361
2362        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2363
2364        assert_eq!(tickets.len(), COUNT_TICKETS);
2365
2366        let existing_channel_with_multiple_tickets = channel.get_id();
2367
2368        // mark the first ticket as being aggregated
2369        let mut ticket = hopr_db_entity::ticket::Entity::find()
2370            .one(&db.tickets_db)
2371            .await?
2372            .context("should have an active model")?
2373            .into_active_model();
2374        ticket.state = Set(AcknowledgedTicketStatus::BeingAggregated as i8);
2375        ticket.save(&db.tickets_db).await?;
2376
2377        assert!(
2378            db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2379                .await
2380                .is_err()
2381        );
2382
2383        Ok(())
2384    }
2385
2386    #[tokio::test]
2387    async fn test_ticket_aggregation_should_not_offer_tickets_with_lower_than_min_win_prob() -> anyhow::Result<()> {
2388        const COUNT_TICKETS: usize = 5;
2389
2390        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2391
2392        assert_eq!(tickets.len(), COUNT_TICKETS);
2393
2394        let existing_channel_with_multiple_tickets = channel.get_id();
2395
2396        // Decrease the win prob of one ticket
2397        let mut ticket = hopr_db_entity::ticket::Entity::find()
2398            .one(&db.tickets_db)
2399            .await?
2400            .context("should have an active model")?
2401            .into_active_model();
2402        ticket.winning_probability = Set(WinningProbability::try_from_f64(0.5)?.as_ref().to_vec());
2403        ticket.save(&db.tickets_db).await?;
2404
2405        let prepared_tickets = db
2406            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2407            .await?
2408            .ok_or(anyhow!("should contain tickets"))?
2409            .1;
2410
2411        assert_eq!(COUNT_TICKETS - 1, prepared_tickets.len());
2412
2413        Ok(())
2414    }
2415
2416    #[tokio::test]
2417    async fn test_ticket_aggregation_prepare_request_with_0_tickets_should_return_empty_result() -> anyhow::Result<()> {
2418        const COUNT_TICKETS: usize = 0;
2419
2420        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2421
2422        assert_eq!(tickets.len(), COUNT_TICKETS);
2423
2424        let existing_channel_with_0_tickets = channel.get_id();
2425
2426        let actual = db
2427            .prepare_aggregation_in_channel(&existing_channel_with_0_tickets, Default::default())
2428            .await?;
2429
2430        assert_eq!(actual, None);
2431
2432        Ok(())
2433    }
2434
2435    #[tokio::test]
2436    async fn test_ticket_aggregation_prepare_request_with_multiple_tickets_should_return_that_ticket()
2437    -> anyhow::Result<()> {
2438        const COUNT_TICKETS: usize = 2;
2439
2440        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2441        let tickets: Vec<TransferableWinningTicket> = tickets
2442            .into_iter()
2443            .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2444            .collect::<hopr_internal_types::errors::Result<Vec<TransferableWinningTicket>>>()?;
2445
2446        assert_eq!(tickets.len(), COUNT_TICKETS);
2447
2448        let existing_channel_with_multiple_tickets = channel.get_id();
2449
2450        let actual = db
2451            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2452            .await?;
2453
2454        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2455
2456        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2457            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2458            .count(&db.tickets_db)
2459            .await? as usize;
2460
2461        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2462
2463        Ok(())
2464    }
2465
2466    #[tokio::test]
2467    async fn test_ticket_aggregation_prepare_request_with_duplicate_tickets_should_return_dedup_aggregated_ticket()
2468    -> anyhow::Result<()> {
2469        let (db, channel, _) = create_alice_db_with_tickets_from_bob(0).await?;
2470        let tickets = vec![
2471            generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0),
2472            generate_random_ack_ticket(&BOB, &ALICE, 0, 2, 1.0),
2473            generate_random_ack_ticket(&BOB, &ALICE, 2, 1, 1.0),
2474            generate_random_ack_ticket(&BOB, &ALICE, 3, 1, 1.0),
2475        ]
2476        .into_iter()
2477        .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
2478
2479        let tickets_clone = tickets.clone();
2480        let db_clone = db.clone();
2481        db.nest_transaction_in_db(None, TargetDb::Tickets)
2482            .await?
2483            .perform(|tx| {
2484                Box::pin(async move {
2485                    for ticket in tickets_clone {
2486                        db_clone.upsert_ticket(tx.into(), ticket).await?;
2487                    }
2488                    Ok::<_, DbError>(())
2489                })
2490            })
2491            .await?;
2492
2493        let existing_channel_with_multiple_tickets = channel.get_id();
2494        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2495        assert_eq!(stats.neglected_value, HoprBalance::zero());
2496
2497        let actual = db
2498            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2499            .await?;
2500
2501        let mut tickets = tickets
2502            .into_iter()
2503            .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2504            .collect::<hopr_internal_types::errors::Result<Vec<_>>>()?;
2505
2506        // We expect the first ticket to be removed
2507        tickets.remove(0);
2508        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2509
2510        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2511            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2512            .count(&db.tickets_db)
2513            .await? as usize;
2514
2515        assert_eq!(actual_being_aggregated_count, 3);
2516
2517        Ok(())
2518    }
2519
2520    #[tokio::test]
2521    async fn test_ticket_aggregation_prepare_request_with_a_being_redeemed_ticket_should_aggregate_only_the_tickets_following_it()
2522    -> anyhow::Result<()> {
2523        const COUNT_TICKETS: usize = 5;
2524
2525        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2526        let mut tickets = tickets
2527            .into_iter()
2528            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2529            .collect::<Vec<_>>();
2530
2531        assert_eq!(tickets.len(), COUNT_TICKETS);
2532
2533        let existing_channel_with_multiple_tickets = channel.get_id();
2534
2535        // mark the first ticket as being redeemed
2536        let mut ticket = hopr_db_entity::ticket::Entity::find()
2537            .one(&db.tickets_db)
2538            .await?
2539            .context("should have 1 active model")?
2540            .into_active_model();
2541        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2542        ticket.save(&db.tickets_db).await?;
2543
2544        let actual = db
2545            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2546            .await?;
2547
2548        tickets.remove(0);
2549        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2550
2551        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2552            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2553            .count(&db.tickets_db)
2554            .await? as usize;
2555
2556        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2557
2558        Ok(())
2559    }
2560
2561    #[tokio::test]
2562    async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_return_when_ticket_threshold_is_met()
2563    -> anyhow::Result<()> {
2564        const COUNT_TICKETS: usize = 5;
2565
2566        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2567        let tickets = tickets
2568            .into_iter()
2569            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2570            .collect::<Vec<_>>();
2571
2572        assert_eq!(tickets.len(), COUNT_TICKETS);
2573
2574        let existing_channel_with_multiple_tickets = channel.get_id();
2575
2576        let constraints = AggregationPrerequisites {
2577            min_ticket_count: Some(COUNT_TICKETS - 1),
2578            min_unaggregated_ratio: None,
2579        };
2580        let actual = db
2581            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2582            .await?;
2583
2584        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2585
2586        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2587            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2588            .count(&db.tickets_db)
2589            .await? as usize;
2590
2591        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2592
2593        Ok(())
2594    }
2595
2596    #[tokio::test]
2597    async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_not_return_when_ticket_threshold_is_not_met()
2598    -> anyhow::Result<()> {
2599        const COUNT_TICKETS: usize = 2;
2600
2601        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2602
2603        assert_eq!(tickets.len(), COUNT_TICKETS);
2604
2605        let existing_channel_with_multiple_tickets = channel.get_id();
2606
2607        let constraints = AggregationPrerequisites {
2608            min_ticket_count: Some(COUNT_TICKETS + 1),
2609            min_unaggregated_ratio: None,
2610        };
2611        let actual = db
2612            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2613            .await?;
2614
2615        assert_eq!(actual, None);
2616
2617        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2618            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2619            .count(&db.tickets_db)
2620            .await? as usize;
2621
2622        assert_eq!(actual_being_aggregated_count, 0);
2623
2624        Ok(())
2625    }
2626
2627    #[tokio::test]
2628    async fn test_ticket_aggregation_prepare_request_with_no_aggregatable_tickets_should_return_nothing()
2629    -> anyhow::Result<()> {
2630        const COUNT_TICKETS: usize = 3;
2631
2632        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2633
2634        assert_eq!(tickets.len(), { COUNT_TICKETS });
2635
2636        let existing_channel_with_multiple_tickets = channel.get_id();
2637
2638        // mark all tickets as being redeemed
2639        for ticket in hopr_db_entity::ticket::Entity::find()
2640            .all(&db.tickets_db)
2641            .await?
2642            .into_iter()
2643        {
2644            let mut ticket = ticket.into_active_model();
2645            ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2646            ticket.save(&db.tickets_db).await?;
2647        }
2648
2649        let actual = db
2650            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2651            .await?;
2652
2653        assert_eq!(actual, None);
2654
2655        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2656            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2657            .count(&db.tickets_db)
2658            .await? as usize;
2659
2660        assert_eq!(actual_being_aggregated_count, 0);
2661
2662        Ok(())
2663    }
2664
2665    #[tokio::test]
2666    async fn test_ticket_aggregation_rollback_should_rollback_all_the_being_aggregated_tickets_but_nothing_else()
2667    -> anyhow::Result<()> {
2668        const COUNT_TICKETS: usize = 5;
2669
2670        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2671
2672        assert_eq!(tickets.len(), COUNT_TICKETS);
2673
2674        let existing_channel_with_multiple_tickets = channel.get_id();
2675
2676        // mark the first ticket as being redeemed
2677        let mut ticket = hopr_db_entity::ticket::Entity::find()
2678            .one(&db.tickets_db)
2679            .await?
2680            .context("should have one active model")?
2681            .into_active_model();
2682        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2683        ticket.save(&db.tickets_db).await?;
2684
2685        assert!(
2686            db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2687                .await
2688                .is_ok()
2689        );
2690
2691        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2692            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2693            .count(&db.tickets_db)
2694            .await? as usize;
2695
2696        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2697
2698        assert!(
2699            db.rollback_aggregation_in_channel(existing_channel_with_multiple_tickets)
2700                .await
2701                .is_ok()
2702        );
2703
2704        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2705            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2706            .count(&db.tickets_db)
2707            .await? as usize;
2708
2709        assert_eq!(actual_being_aggregated_count, 0);
2710
2711        Ok(())
2712    }
2713
2714    #[tokio::test]
2715    async fn test_ticket_aggregation_should_replace_the_tickets_with_a_correctly_aggregated_ticket()
2716    -> anyhow::Result<()> {
2717        const COUNT_TICKETS: usize = 5;
2718
2719        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2720
2721        let (notifier_tx, notifier_rx) = futures::channel::mpsc::unbounded();
2722        db.start_ticket_processing(Some(notifier_tx))?;
2723
2724        let tickets = tickets
2725            .into_iter()
2726            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2727            .collect::<Vec<_>>();
2728
2729        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2730        let aggregated_ticket = TicketBuilder::default()
2731            .addresses(&*BOB, &*ALICE)
2732            .amount(
2733                tickets
2734                    .iter()
2735                    .fold(U256::zero(), |acc, v| acc + v.ticket.amount.amount()),
2736            )
2737            .index(first_ticket.index)
2738            .index_offset(
2739                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2740            )
2741            .win_prob(1.0.try_into()?)
2742            .channel_epoch(first_ticket.channel_epoch)
2743            .eth_challenge(first_ticket.challenge)
2744            .build_signed(&BOB, &Hash::default())?;
2745
2746        assert_eq!(tickets.len(), COUNT_TICKETS);
2747
2748        let existing_channel_with_multiple_tickets = channel.get_id();
2749
2750        let actual = db
2751            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2752            .await?;
2753
2754        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2755
2756        let agg_ticket = aggregated_ticket.clone();
2757
2758        let _ = db
2759            .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2760            .await?;
2761
2762        pin_mut!(notifier_rx);
2763        let notified_ticket = notifier_rx.next().await.ok_or(anyhow!("must have ticket"))?;
2764
2765        assert_eq!(notified_ticket.verified_ticket(), agg_ticket.verified_ticket());
2766
2767        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2768            .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2769            .count(&db.tickets_db)
2770            .await? as usize;
2771
2772        assert_eq!(actual_being_aggregated_count, 0);
2773
2774        Ok(())
2775    }
2776
2777    #[tokio::test]
2778    async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_value_is_lower_than_the_stored_one()
2779    -> anyhow::Result<()> {
2780        const COUNT_TICKETS: usize = 5;
2781
2782        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2783        let tickets = tickets
2784            .into_iter()
2785            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2786            .collect::<Vec<_>>();
2787
2788        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2789        let aggregated_ticket = TicketBuilder::default()
2790            .addresses(&*BOB, &*ALICE)
2791            .amount(0)
2792            .index(first_ticket.index)
2793            .index_offset(
2794                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2795            )
2796            .win_prob(1.0.try_into()?)
2797            .channel_epoch(first_ticket.channel_epoch)
2798            .eth_challenge(first_ticket.challenge)
2799            .build_signed(&BOB, &Hash::default())?;
2800
2801        assert_eq!(tickets.len(), COUNT_TICKETS);
2802
2803        let existing_channel_with_multiple_tickets = channel.get_id();
2804
2805        let actual = db
2806            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2807            .await?;
2808
2809        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2810
2811        assert!(
2812            db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2813                .await
2814                .is_err()
2815        );
2816
2817        Ok(())
2818    }
2819
2820    #[tokio::test]
2821    async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_win_probability_is_not_equal_to_1()
2822    -> anyhow::Result<()> {
2823        const COUNT_TICKETS: usize = 5;
2824
2825        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2826        let tickets = tickets
2827            .into_iter()
2828            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2829            .collect::<Vec<_>>();
2830
2831        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2832        let aggregated_ticket = TicketBuilder::default()
2833            .addresses(&*BOB, &*ALICE)
2834            .amount(0)
2835            .index(first_ticket.index)
2836            .index_offset(
2837                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2838            )
2839            .win_prob(0.5.try_into()?) // 50% winning prob
2840            .channel_epoch(first_ticket.channel_epoch)
2841            .eth_challenge(first_ticket.challenge)
2842            .build_signed(&BOB, &Hash::default())?;
2843
2844        assert_eq!(tickets.len(), COUNT_TICKETS);
2845
2846        let existing_channel_with_multiple_tickets = channel.get_id();
2847
2848        let actual = db
2849            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2850            .await?;
2851
2852        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2853
2854        assert!(
2855            db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2856                .await
2857                .is_err()
2858        );
2859
2860        Ok(())
2861    }
2862
2863    async fn init_db_with_channel(channel: ChannelEntry) -> anyhow::Result<HoprDb> {
2864        let db = HoprDb::new_in_memory(BOB.clone()).await?;
2865
2866        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2867            .await?;
2868
2869        add_peer_mappings(
2870            &db,
2871            vec![
2872                (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2873                (BOB_OFFCHAIN.clone(), BOB.clone()),
2874            ],
2875        )
2876        .await?;
2877
2878        db.upsert_channel(None, channel).await?;
2879
2880        Ok(db)
2881    }
2882
2883    #[tokio::test]
2884    async fn test_aggregate_ticket_should_aggregate() -> anyhow::Result<()> {
2885        const COUNT_TICKETS: usize = 5;
2886
2887        let channel = ChannelEntry::new(
2888            BOB.public().to_address(),
2889            ALICE.public().to_address(),
2890            u32::MAX.into(),
2891            (COUNT_TICKETS + 1).into(),
2892            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2893            4_u32.into(),
2894        );
2895
2896        let db = init_db_with_channel(channel).await?;
2897
2898        let tickets = (0..COUNT_TICKETS)
2899            .map(|i| {
2900                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
2901                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2902            })
2903            .collect::<anyhow::Result<Vec<_>>>()?;
2904
2905        let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
2906        let min_idx = tickets
2907            .iter()
2908            .map(|t| t.ticket.index)
2909            .min()
2910            .context("min index should be present")?;
2911        let max_idx = tickets
2912            .iter()
2913            .map(|t| t.ticket.index)
2914            .max()
2915            .context("max index should be present")?;
2916
2917        let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2918
2919        assert_eq!(
2920            &BOB.public().to_address(),
2921            aggregated.verified_issuer(),
2922            "must have correct signer"
2923        );
2924
2925        assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2926
2927        assert_eq!(
2928            COUNT_TICKETS,
2929            aggregated.verified_ticket().index_offset as usize,
2930            "aggregated ticket must have correct offset"
2931        );
2932        assert_eq!(
2933            sum_value,
2934            aggregated.verified_ticket().amount,
2935            "aggregated ticket token amount must be sum of individual tickets"
2936        );
2937        assert_eq!(
2938            1.0,
2939            aggregated.win_prob(),
2940            "aggregated ticket must have winning probability 1"
2941        );
2942        assert_eq!(
2943            min_idx,
2944            aggregated.verified_ticket().index,
2945            "aggregated ticket must have correct index"
2946        );
2947        assert_eq!(
2948            channel.get_id(),
2949            aggregated.verified_ticket().channel_id,
2950            "aggregated ticket must have correct channel id"
2951        );
2952        assert_eq!(
2953            channel.channel_epoch.as_u32(),
2954            aggregated.verified_ticket().channel_epoch,
2955            "aggregated ticket must have correct channel epoch"
2956        );
2957
2958        assert_eq!(
2959            max_idx + 1,
2960            db.get_outgoing_ticket_index(channel.get_id())
2961                .await?
2962                .load(Ordering::SeqCst)
2963        );
2964
2965        Ok(())
2966    }
2967
2968    #[tokio::test]
2969    async fn test_aggregate_ticket_should_aggregate_including_aggregated() -> anyhow::Result<()> {
2970        const COUNT_TICKETS: usize = 5;
2971
2972        let channel = ChannelEntry::new(
2973            BOB.public().to_address(),
2974            ALICE.public().to_address(),
2975            u32::MAX.into(),
2976            (COUNT_TICKETS + 1).into(),
2977            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2978            4_u32.into(),
2979        );
2980
2981        let db = init_db_with_channel(channel).await?;
2982
2983        let offset = 10_usize;
2984
2985        let mut tickets = (1..COUNT_TICKETS)
2986            .map(|i| {
2987                generate_random_ack_ticket(&BOB, &ALICE, (i + offset) as u64, 1, 1.0)
2988                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2989            })
2990            .collect::<anyhow::Result<Vec<_>>>()?;
2991
2992        // Add an aggregated ticket to the set too
2993        tickets.push(
2994            generate_random_ack_ticket(&BOB, &ALICE, 0, offset as u32, 1.0)
2995                .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?,
2996        );
2997
2998        let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
2999        let min_idx = tickets
3000            .iter()
3001            .map(|t| t.ticket.index)
3002            .min()
3003            .context("min index should be present")?;
3004        let max_idx = tickets
3005            .iter()
3006            .map(|t| t.ticket.index)
3007            .max()
3008            .context("max index should be present")?;
3009
3010        let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
3011
3012        assert_eq!(
3013            &BOB.public().to_address(),
3014            aggregated.verified_issuer(),
3015            "must have correct signer"
3016        );
3017
3018        assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
3019
3020        assert_eq!(
3021            COUNT_TICKETS + offset,
3022            aggregated.verified_ticket().index_offset as usize,
3023            "aggregated ticket must have correct offset"
3024        );
3025        assert_eq!(
3026            sum_value,
3027            aggregated.verified_ticket().amount,
3028            "aggregated ticket token amount must be sum of individual tickets"
3029        );
3030        assert_eq!(
3031            1.0,
3032            aggregated.win_prob(),
3033            "aggregated ticket must have winning probability 1"
3034        );
3035        assert_eq!(
3036            min_idx,
3037            aggregated.verified_ticket().index,
3038            "aggregated ticket must have correct index"
3039        );
3040        assert_eq!(
3041            channel.get_id(),
3042            aggregated.verified_ticket().channel_id,
3043            "aggregated ticket must have correct channel id"
3044        );
3045        assert_eq!(
3046            channel.channel_epoch.as_u32(),
3047            aggregated.verified_ticket().channel_epoch,
3048            "aggregated ticket must have correct channel epoch"
3049        );
3050
3051        assert_eq!(
3052            max_idx + 1,
3053            db.get_outgoing_ticket_index(channel.get_id())
3054                .await?
3055                .load(Ordering::SeqCst)
3056        );
3057
3058        Ok(())
3059    }
3060
3061    #[tokio::test]
3062    async fn test_aggregate_ticket_should_not_aggregate_zero_tickets() -> anyhow::Result<()> {
3063        let db = HoprDb::new_in_memory(BOB.clone()).await?;
3064
3065        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), vec![], &BOB)
3066            .await
3067            .expect_err("should not aggregate empty ticket list");
3068
3069        Ok(())
3070    }
3071
3072    #[tokio::test]
3073    async fn test_aggregate_ticket_should_aggregate_single_ticket_to_itself() -> anyhow::Result<()> {
3074        const COUNT_TICKETS: usize = 1;
3075
3076        let channel = ChannelEntry::new(
3077            BOB.public().to_address(),
3078            ALICE.public().to_address(),
3079            u32::MAX.into(),
3080            (COUNT_TICKETS + 1).into(),
3081            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
3082            4_u32.into(),
3083        );
3084
3085        let db = init_db_with_channel(channel).await?;
3086
3087        let mut tickets = (0..COUNT_TICKETS)
3088            .map(|i| {
3089                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3090                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3091            })
3092            .collect::<anyhow::Result<Vec<_>>>()?;
3093
3094        let aggregated = db
3095            .aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3096            .await?;
3097
3098        assert_eq!(
3099            &tickets.pop().context("ticket should be present")?.ticket,
3100            aggregated.verified_ticket()
3101        );
3102
3103        Ok(())
3104    }
3105
3106    #[tokio::test]
3107    async fn test_aggregate_ticket_should_not_aggregate_on_closed_channel() -> anyhow::Result<()> {
3108        const COUNT_TICKETS: usize = 3;
3109
3110        let channel = ChannelEntry::new(
3111            BOB.public().to_address(),
3112            ALICE.public().to_address(),
3113            u32::MAX.into(),
3114            (COUNT_TICKETS + 1).into(),
3115            ChannelStatus::Closed,
3116            4_u32.into(),
3117        );
3118
3119        let db = init_db_with_channel(channel).await?;
3120
3121        let tickets = (0..COUNT_TICKETS)
3122            .map(|i| {
3123                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3124                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3125            })
3126            .collect::<anyhow::Result<Vec<_>>>()?;
3127
3128        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3129            .await
3130            .expect_err("should not aggregate on closed channel");
3131
3132        Ok(())
3133    }
3134
3135    #[tokio::test]
3136    async fn test_aggregate_ticket_should_not_aggregate_on_incoming_channel() -> anyhow::Result<()> {
3137        const COUNT_TICKETS: usize = 3;
3138
3139        let channel = ChannelEntry::new(
3140            ALICE.public().to_address(),
3141            BOB.public().to_address(),
3142            u32::MAX.into(),
3143            (COUNT_TICKETS + 1).into(),
3144            ChannelStatus::Open,
3145            4_u32.into(),
3146        );
3147
3148        let db = init_db_with_channel(channel).await?;
3149
3150        let tickets = (0..COUNT_TICKETS)
3151            .map(|i| {
3152                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3153                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3154            })
3155            .collect::<anyhow::Result<Vec<_>>>()?;
3156
3157        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3158            .await
3159            .expect_err("should not aggregate on incoming channel");
3160
3161        Ok(())
3162    }
3163
3164    #[tokio::test]
3165    async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_ids() -> anyhow::Result<()> {
3166        const COUNT_TICKETS: usize = 3;
3167
3168        let channel = ChannelEntry::new(
3169            ALICE.public().to_address(),
3170            BOB.public().to_address(),
3171            u32::MAX.into(),
3172            (COUNT_TICKETS + 1).into(),
3173            ChannelStatus::Open,
3174            4_u32.into(),
3175        );
3176
3177        let db = init_db_with_channel(channel).await?;
3178
3179        let mut tickets = (0..COUNT_TICKETS)
3180            .map(|i| {
3181                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3182                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3183            })
3184            .collect::<anyhow::Result<Vec<_>>>()?;
3185
3186        tickets[2] = generate_random_ack_ticket(&BOB, &ChainKeypair::random(), 2, 1, 1.0)
3187            .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3188
3189        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3190            .await
3191            .expect_err("should not aggregate on mismatching channel ids");
3192
3193        Ok(())
3194    }
3195
3196    #[tokio::test]
3197    async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_epoch() -> anyhow::Result<()> {
3198        const COUNT_TICKETS: usize = 3;
3199
3200        let channel = ChannelEntry::new(
3201            ALICE.public().to_address(),
3202            BOB.public().to_address(),
3203            100.into(),
3204            (COUNT_TICKETS + 1).into(),
3205            ChannelStatus::Open,
3206            3_u32.into(),
3207        );
3208
3209        let db = init_db_with_channel(channel).await?;
3210
3211        let tickets = (0..COUNT_TICKETS)
3212            .map(|i| {
3213                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3214                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3215            })
3216            .collect::<anyhow::Result<Vec<_>>>()?;
3217
3218        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3219            .await
3220            .expect_err("should not aggregate on mismatching channel epoch");
3221
3222        Ok(())
3223    }
3224
3225    #[tokio::test]
3226    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_indices_overlap() -> anyhow::Result<()> {
3227        const COUNT_TICKETS: usize = 3;
3228
3229        let channel = ChannelEntry::new(
3230            ALICE.public().to_address(),
3231            BOB.public().to_address(),
3232            u32::MAX.into(),
3233            (COUNT_TICKETS + 1).into(),
3234            ChannelStatus::Open,
3235            3_u32.into(),
3236        );
3237
3238        let db = init_db_with_channel(channel).await?;
3239
3240        let mut tickets = (0..COUNT_TICKETS)
3241            .map(|i| {
3242                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3243                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3244            })
3245            .collect::<anyhow::Result<Vec<_>>>()?;
3246
3247        tickets[1] = generate_random_ack_ticket(&BOB, &ALICE, 1, 2, 1.0)
3248            .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3249
3250        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3251            .await
3252            .expect_err("should not aggregate on overlapping ticket indices");
3253        Ok(())
3254    }
3255
3256    #[tokio::test]
3257    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_valid() -> anyhow::Result<()> {
3258        const COUNT_TICKETS: usize = 3;
3259
3260        let channel = ChannelEntry::new(
3261            ALICE.public().to_address(),
3262            BOB.public().to_address(),
3263            u32::MAX.into(),
3264            (COUNT_TICKETS + 1).into(),
3265            ChannelStatus::Open,
3266            3_u32.into(),
3267        );
3268
3269        let db = init_db_with_channel(channel).await?;
3270
3271        let mut tickets = (0..COUNT_TICKETS)
3272            .map(|i| {
3273                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3274                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3275            })
3276            .collect::<anyhow::Result<Vec<_>>>()?;
3277
3278        // Modify the ticket and do not sign it
3279        tickets[1].ticket.amount = (TICKET_VALUE - 10).into();
3280
3281        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3282            .await
3283            .expect_err("should not aggregate on invalid tickets");
3284
3285        Ok(())
3286    }
3287
3288    #[tokio::test]
3289    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_has_lower_than_min_win_prob() -> anyhow::Result<()> {
3290        const COUNT_TICKETS: usize = 3;
3291
3292        let channel = ChannelEntry::new(
3293            ALICE.public().to_address(),
3294            BOB.public().to_address(),
3295            u32::MAX.into(),
3296            (COUNT_TICKETS + 1).into(),
3297            ChannelStatus::Open,
3298            3_u32.into(),
3299        );
3300
3301        let db = init_db_with_channel(channel).await?;
3302
3303        let tickets = (0..COUNT_TICKETS)
3304            .map(|i| {
3305                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, if i > 0 { 1.0 } else { 0.9 })
3306                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3307            })
3308            .collect::<anyhow::Result<Vec<_>>>()?;
3309
3310        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3311            .await
3312            .expect_err("should not aggregate tickets with less than minimum win prob");
3313
3314        Ok(())
3315    }
3316
3317    #[tokio::test]
3318    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_winning() -> anyhow::Result<()> {
3319        const COUNT_TICKETS: usize = 3;
3320
3321        let channel = ChannelEntry::new(
3322            ALICE.public().to_address(),
3323            BOB.public().to_address(),
3324            u32::MAX.into(),
3325            (COUNT_TICKETS + 1).into(),
3326            ChannelStatus::Open,
3327            3_u32.into(),
3328        );
3329
3330        let db = init_db_with_channel(channel).await?;
3331
3332        let mut tickets = (0..COUNT_TICKETS)
3333            .map(|i| {
3334                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3335                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3336            })
3337            .collect::<anyhow::Result<Vec<_>>>()?;
3338
3339        // Set the winning probability to zero and sign the ticket again
3340        let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
3341        tickets[1] = TicketBuilder::from(tickets[1].ticket.clone())
3342            .win_prob(0.0.try_into()?)
3343            .challenge(resp.to_challenge()?)
3344            .build_signed(&BOB, &Hash::default())?
3345            .into_acknowledged(resp)
3346            .into_transferable(&ALICE, &Hash::default())?;
3347
3348        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3349            .await
3350            .expect_err("should not aggregate non-winning tickets");
3351
3352        Ok(())
3353    }
3354
3355    #[tokio::test]
3356    async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
3357        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3358
3359        let ticket = init_db_with_tickets(&db, 1).await?.1.pop().unwrap();
3360
3361        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
3362            .await
3363            .expect("must not fail");
3364
3365        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3366        assert_ne!(stats.redeemed_value, HoprBalance::zero());
3367
3368        db.reset_ticket_statistics().await.expect("must not fail");
3369
3370        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3371        assert_eq!(stats.redeemed_value, HoprBalance::zero());
3372
3373        Ok(())
3374    }
3375
3376    #[tokio::test]
3377    async fn test_fix_channels_ticket_state() -> anyhow::Result<()> {
3378        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3379        const COUNT_TICKETS: u64 = 1;
3380
3381        let (..) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
3382
3383        // mark the first ticket as being redeemed
3384        let mut ticket = hopr_db_entity::ticket::Entity::find()
3385            .one(&db.tickets_db)
3386            .await?
3387            .context("should have one active model")?
3388            .into_active_model();
3389        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3390        ticket.save(&db.tickets_db).await?;
3391
3392        assert!(
3393            hopr_db_entity::ticket::Entity::find()
3394                .one(&db.tickets_db)
3395                .await?
3396                .context("should have one active model")?
3397                .state
3398                == AcknowledgedTicketStatus::BeingRedeemed as i8,
3399        );
3400
3401        db.fix_channels_next_ticket_state().await.expect("must not fail");
3402
3403        assert!(
3404            hopr_db_entity::ticket::Entity::find()
3405                .one(&db.tickets_db)
3406                .await?
3407                .context("should have one active model")?
3408                .state
3409                == AcknowledgedTicketStatus::Untouched as i8,
3410        );
3411
3412        Ok(())
3413    }
3414
3415    #[tokio::test]
3416    async fn test_dont_fix_correct_channels_ticket_state() -> anyhow::Result<()> {
3417        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3418        const COUNT_TICKETS: u64 = 2;
3419
3420        // we set up the channel to have ticket index 1, and ensure that fix does not trigger
3421        let (..) = init_db_with_tickets_and_channel(&db, COUNT_TICKETS, Some(1u32)).await?;
3422
3423        // mark the first ticket as being redeemed
3424        let mut ticket = hopr_db_entity::ticket::Entity::find()
3425            .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3426            .one(&db.tickets_db)
3427            .await?
3428            .context("should have one active model")?
3429            .into_active_model();
3430        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3431        ticket.save(&db.tickets_db).await?;
3432
3433        assert!(
3434            hopr_db_entity::ticket::Entity::find()
3435                .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3436                .one(&db.tickets_db)
3437                .await?
3438                .context("should have one active model")?
3439                .state
3440                == AcknowledgedTicketStatus::BeingRedeemed as i8,
3441        );
3442
3443        db.fix_channels_next_ticket_state().await.expect("must not fail");
3444
3445        // first ticket should still be in BeingRedeemed state
3446        let ticket0 = hopr_db_entity::ticket::Entity::find()
3447            .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3448            .one(&db.tickets_db)
3449            .await?
3450            .context("should have one active model")?;
3451        assert_eq!(ticket0.state, AcknowledgedTicketStatus::BeingRedeemed as i8);
3452
3453        // second ticket should be in Untouched state
3454        let ticket1 = hopr_db_entity::ticket::Entity::find()
3455            .filter(ticket::Column::Index.eq(1u64.to_be_bytes().to_vec()))
3456            .one(&db.tickets_db)
3457            .await?
3458            .context("should have one active model")?;
3459        assert_eq!(ticket1.state, AcknowledgedTicketStatus::Untouched as i8);
3460
3461        Ok(())
3462    }
3463}