hopr_db_sql/tickets.rs

use std::{
    cmp,
    ops::{Add, Bound},
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
};

use async_stream::stream;
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use hopr_crypto_types::prelude::*;
use hopr_db_api::{
    errors::Result,
    info::DomainSeparator,
    prelude::{TicketIndexSelector, TicketMarker},
    resolver::HoprDbResolverOperations,
    tickets::{AggregationPrerequisites, ChannelTicketStatistics, HoprDbTicketOperations, TicketSelector},
};
use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
use hopr_internal_types::prelude::*;
#[cfg(all(feature = "prometheus", not(test)))]
use hopr_metrics::metrics::MultiGauge;
use hopr_primitive_types::prelude::*;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set};
use sea_query::{Condition, Expr, IntoCondition, SimpleExpr};
use tracing::{debug, error, info, trace, warn};

use crate::{
    HoprDbGeneralModelOperations, OpenTransaction, OptTx, TargetDb,
    channels::HoprDbChannelOperations,
    db::HoprDb,
    errors::{DbSqlError, DbSqlError::LogicalError},
    info::HoprDbInfoOperations,
};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: MultiGauge = MultiGauge::new(
        "hopr_tickets_incoming_statistics",
        "Ticket statistics for channels with incoming tickets.",
        &["channel", "statistic"]
    ).unwrap();
}

/// The maximum number of tickets that can be sent for aggregation in a single request.
pub const MAX_TICKETS_TO_AGGREGATE_BATCH: u64 = 500;

/// The type is necessary solely to allow
/// implementing the [`IntoCondition`] trait for [`TicketSelector`]
/// from the `hopr_db_api` crate.
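///
/// As a minimal illustration (mirroring how the wrapper is used elsewhere in this module), the
/// newtype is what lets a [`TicketSelector`] be passed directly to a SeaORM query filter:
///
/// ```ignore
/// // Sketch only: `selector` is any `TicketSelector` built by the caller,
/// // `conn` is a database connection.
/// let selector: WrappedTicketSelector = selector.into();
/// let tickets = ticket::Entity::find()
///     .filter(selector) // uses the `IntoCondition` impl below
///     .all(conn)
///     .await?;
/// ```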
#[derive(Clone)]
pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);

impl From<TicketSelector> for WrappedTicketSelector {
    fn from(selector: TicketSelector) -> Self {
        Self(selector)
    }
}

impl AsRef<TicketSelector> for WrappedTicketSelector {
    fn as_ref(&self) -> &TicketSelector {
        &self.0
    }
}

impl IntoCondition for WrappedTicketSelector {
    fn into_condition(self) -> Condition {
        let expr = self
            .0
            .channel_identifiers
            .into_iter()
            .map(|(channel_id, epoch)| {
                ticket::Column::ChannelId
                    .eq(channel_id.to_hex())
                    .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
            })
            .reduce(SimpleExpr::or);

        // This cannot happen, but instead of panicking, return an impossible condition object
        if expr.is_none() {
            return Condition::any().not();
        }

        let mut expr = expr.unwrap();

        match self.0.index {
            TicketIndexSelector::None => {
                // This will always be the case if there were multiple channel identifiers
            }
            TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
            TicketIndexSelector::Multiple(idxs) => {
                expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
            }
            TicketIndexSelector::Range((lb, ub)) => {
                expr = match lb {
                    Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
                    Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
                    Bound::Unbounded => expr,
                };
                expr = match ub {
                    Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
                    Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
                    Bound::Unbounded => expr,
                };
            }
        }

        if let Some(state) = self.0.state {
            expr = expr.and(ticket::Column::State.eq(state as u8))
        }

        if self.0.only_aggregated {
            expr = expr.and(ticket::Column::IndexOffset.gt(1));
        }

        // Win prob lower bound
        expr = match self.0.win_prob.0 {
            Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
            Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
            Bound::Unbounded => expr,
        };

        // Win prob upper bound
        expr = match self.0.win_prob.1 {
            Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
            Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
            Bound::Unbounded => expr,
        };

        // Amount lower bound
        expr = match self.0.amount.0 {
            Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
            Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
            Bound::Unbounded => expr,
        };

        // Amount upper bound
        expr = match self.0.amount.1 {
            Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
            Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
            Bound::Unbounded => expr,
        };

        expr.into_condition()
    }
}

/// Filters the list of ticket models according to the prerequisites.
/// **NOTE:** the input list is assumed to be sorted by ticket index in ascending order.
///
/// The following is applied:
/// - the list of tickets is reduced so that the total amount on the tickets does not exceed the channel balance
/// - it is checked whether the list size is at least `min_ticket_count`
/// - it is checked whether the ratio of the total amount on the unaggregated tickets in the list to the channel
///   balance is at least `min_unaggregated_ratio`
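///
/// A hedged sketch of how the prerequisites combine (field names follow their use below; the
/// concrete values are made up for illustration, assuming the struct can be built literally like this):
///
/// ```ignore
/// // Aggregate once at least 100 eligible tickets are present, or once the unaggregated
/// // tickets hold at least 90% of the channel's stake (with at least two tickets).
/// let prerequisites = AggregationPrerequisites {
///     min_ticket_count: Some(100),
///     min_unaggregated_ratio: Some(0.9),
/// };
/// let to_aggregate = filter_satisfying_ticket_models(prerequisites, models, &channel, min_win_prob)?;
/// ```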
pub(crate) fn filter_satisfying_ticket_models(
    prerequisites: AggregationPrerequisites,
    models: Vec<ticket::Model>,
    channel_entry: &ChannelEntry,
    min_win_prob: WinningProbability,
) -> crate::errors::Result<Vec<ticket::Model>> {
    let channel_id = channel_entry.get_id();

    let mut to_be_aggregated = Vec::with_capacity(models.len());
    let mut total_balance = HoprBalance::zero();

    for m in models {
        let ticket_wp: WinningProbability = m
            .winning_probability
            .as_slice()
            .try_into()
            .map_err(|_| DbSqlError::DecodingError)?;

        if ticket_wp.approx_cmp(&min_win_prob).is_lt() {
            warn!(
                channel_id = %channel_entry.get_id(),
                %ticket_wp, %min_win_prob, "encountered ticket with winning probability lower than the minimum threshold"
            );
            continue;
        }

        let to_add = HoprBalance::from_be_bytes(&m.amount);

        // Do a balance check to be sure not to aggregate more than the current channel stake
        total_balance += to_add;
        if total_balance.gt(&channel_entry.balance) {
            // Remove the last sub-balance which led to the overflow before breaking out of the loop.
            total_balance -= to_add;
            break;
        }

        to_be_aggregated.push(m);
    }

    // If there are no criteria, just send everything for aggregation
    if prerequisites.min_ticket_count.is_none() && prerequisites.min_unaggregated_ratio.is_none() {
        info!(channel = %channel_id, "Aggregation check OK, no aggregation prerequisites were given");
        return Ok(to_be_aggregated);
    }

    let to_be_agg_count = to_be_aggregated.len();

    // Check the aggregation threshold
    if let Some(agg_threshold) = prerequisites.min_ticket_count {
        if to_be_agg_count >= agg_threshold {
            info!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check OK: ticket count reached the threshold");
            return Ok(to_be_aggregated);
        } else {
            debug!(channel = %channel_id, count = to_be_agg_count, threshold = agg_threshold, "Aggregation check FAIL: not enough tickets to aggregate");
        }
    }

    if let Some(unrealized_threshold) = prerequisites.min_unaggregated_ratio {
        let diminished_balance = channel_entry.balance.mul_f64(unrealized_threshold)?;

        // Trigger aggregation if the unrealized balance is greater than or equal to the given fraction
        // of the current channel balance and there are at least two tickets
        if total_balance.ge(&diminished_balance) {
            if to_be_agg_count > 1 {
                info!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check OK: more unrealized than diminished balance");
                return Ok(to_be_aggregated);
            } else {
                debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: more unrealized than diminished balance but only 1 ticket");
            }
        } else {
            debug!(channel = %channel_id, count = to_be_agg_count, balance = ?total_balance, ?diminished_balance, "Aggregation check FAIL: less unrealized than diminished balance");
        }
    }

    debug!(channel = %channel_id, "Aggregation check FAIL: no prerequisites were met");
    Ok(vec![])
}

pub(crate) async fn find_stats_for_channel(
    tx: &OpenTransaction,
    channel_id: &Hash,
) -> crate::errors::Result<ticket_statistics::Model> {
    if let Some(model) = ticket_statistics::Entity::find()
        .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
        .one(tx.as_ref())
        .await?
    {
        Ok(model)
    } else {
        let new_stats = ticket_statistics::ActiveModel {
            channel_id: Set(channel_id.to_hex()),
            ..Default::default()
        }
        .insert(tx.as_ref())
        .await?;

        Ok(new_stats)
    }
}

impl HoprDb {
    async fn get_tickets_value_int<'a>(
        &'a self,
        tx: OptTx<'a>,
        selector: TicketSelector,
    ) -> Result<(usize, HoprBalance)> {
        let selector: WrappedTicketSelector = selector.into();
        Ok(self
            .nest_transaction_in_db(tx, TargetDb::Tickets)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    ticket::Entity::find()
                        .filter(selector)
                        .stream(tx.as_ref())
                        .await
                        .map_err(DbSqlError::from)?
                        .map_err(DbSqlError::from)
                        .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
                            Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
                        })
                        .await
                })
            })
            .await?)
    }
}

#[async_trait]
impl HoprDbTicketOperations for HoprDb {
    async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>> {
        Ok(self
            .nest_transaction_in_db(None, TargetDb::Tickets)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    ticket::Entity::find()
                        .all(tx.as_ref())
                        .await?
                        .into_iter()
                        .map(AcknowledgedTicket::try_from)
                        .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
                        .map_err(DbSqlError::from)
                })
            })
            .await?)
    }

    async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>> {
        debug!("fetching tickets via {selector}");
        let selector: WrappedTicketSelector = selector.into();

        Ok(self
            .nest_transaction_in_db(None, TargetDb::Tickets)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    ticket::Entity::find()
                        .filter(selector)
                        .all(tx.as_ref())
                        .await?
                        .into_iter()
                        .map(AcknowledgedTicket::try_from)
                        .collect::<hopr_db_entity::errors::Result<Vec<_>>>()
                        .map_err(DbSqlError::from)
                })
            })
            .await?)
    }

    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize> {
        let myself = self.clone();
        Ok(self
            .ticket_manager
            .with_write_locked_db(|tx| {
                Box::pin(async move {
                    let mut total_marked_count = 0;
                    for (channel_id, epoch) in selector.channel_identifiers.iter() {
                        let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);

                        // Get the number of tickets and their value just for this channel
                        let (marked_count, marked_value) =
                            myself.get_tickets_value_int(Some(tx), channel_selector.clone()).await?;
                        trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");

                        if marked_count > 0 {
                            // Delete the marked tickets first
                            let deleted = ticket::Entity::delete_many()
                                .filter(WrappedTicketSelector::from(channel_selector.clone()))
                                .exec(tx.as_ref())
                                .await?;

                            // Update the stats if successful
                            if deleted.rows_affected == marked_count as u64 {
                                let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
                                let _current_value = match mark_as {
                                    TicketMarker::Redeemed => {
                                        let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
                                        new_stats.redeemed_value =
                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
                                        current_value
                                    }
                                    TicketMarker::Rejected => {
                                        let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
                                        new_stats.rejected_value =
                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
                                        current_value
                                    }
                                    TicketMarker::Neglected => {
                                        let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
                                        new_stats.neglected_value =
                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
                                        current_value
                                    }
                                };
                                new_stats.save(tx.as_ref()).await?;

                                #[cfg(all(feature = "prometheus", not(test)))]
                                {
                                    let channel = channel_id.to_string();
                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                        &[&channel, &mark_as.to_string()],
                                        (_current_value + marked_value.amount()).as_u128() as f64,
                                    );

                                    // Tickets that are counted as rejected were never counted as unredeemed,
                                    // so skip the metric subtraction in that case.
                                    if mark_as != TicketMarker::Rejected {
                                        let unredeemed_value = myself
                                            .caches
                                            .unrealized_value
                                            .get(&(*channel_id, *epoch))
                                            .await
                                            .unwrap_or_default();

                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                            &[&channel, "unredeemed"],
                                            (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
                                        );
                                    }
                                }

                                myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
                            } else {
                                return Err(DbSqlError::LogicalError(format!(
                                    "could not mark {marked_count} ticket(s) as {mark_as}"
                                )));
                            }

                            trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
                            total_marked_count += marked_count;
                        }
                    }

                    info!(
                        count = total_marked_count,
                        ?mark_as,
                        channel_count = selector.channel_identifiers.len(),
                        "removed tickets in channels",
                    );
                    Ok(total_marked_count)
                })
            })
            .await?)
    }

    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()> {
        let channel_id = ticket.channel_id;
        let amount = ticket.amount;
        Ok(self
            .ticket_manager
            .with_write_locked_db(|tx| {
                Box::pin(async move {
                    let stats = find_stats_for_channel(tx, &channel_id).await?;

                    let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());

                    let mut active_stats = stats.into_active_model();
                    active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
                    active_stats.save(tx.as_ref()).await?;

                    #[cfg(all(feature = "prometheus", not(test)))]
                    {
                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                            &[&channel_id.to_string(), "rejected"],
                            (current_rejected_value + amount.amount()).as_u128() as f64,
                        );
                    }

                    Ok::<(), DbSqlError>(())
                })
            })
            .await?)
    }

    async fn update_ticket_states_and_fetch<'a>(
        &'a self,
        selector: TicketSelector,
        new_state: AcknowledgedTicketStatus,
    ) -> Result<BoxStream<'a, AcknowledgedTicket>> {
        let selector: WrappedTicketSelector = selector.into();
        Ok(Box::pin(stream! {
            match ticket::Entity::find()
                .filter(selector)
                .stream(self.conn(TargetDb::Tickets))
                .await {
                Ok(mut stream) => {
                    while let Ok(Some(ticket)) = stream.try_next().await {
                        let active_ticket = ticket::ActiveModel {
                            id: Set(ticket.id),
                            state: Set(new_state as i8),
                            ..Default::default()
                        };

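                        // The inner scope below limits how long the ticket manager lock is held:
                        // the guard is dropped right after the DB update, before the ticket is
                        // yielded to the stream consumer.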
                        {
                            let _g = self.ticket_manager.mutex.lock();
                            if let Err(e) = active_ticket.update(self.conn(TargetDb::Tickets)).await {
                                error!(error = %e, "failed to update ticket in the db");
                            }
                        }

                        match AcknowledgedTicket::try_from(ticket) {
                            Ok(mut ticket) => {
                                // Update the state manually, since we do not want to re-fetch the model after the update
                                ticket.status = new_state;
                                yield ticket
                            },
                            Err(e) => {
                                tracing::error!(error = %e, "failed to decode ticket from the db");
                            }
                        }
                    }
                },
                Err(e) => tracing::error!(error = %e, "failed to open ticket db stream")
            }
        }))
    }

    async fn update_ticket_states(
        &self,
        selector: TicketSelector,
        new_state: AcknowledgedTicketStatus,
    ) -> Result<usize> {
        let selector: WrappedTicketSelector = selector.into();
        Ok(self
            .ticket_manager
            .with_write_locked_db(|tx| {
                Box::pin(async move {
                    let update = ticket::Entity::update_many()
                        .filter(selector)
                        .col_expr(ticket::Column::State, Expr::value(new_state as u8))
                        .exec(tx.as_ref())
                        .await?;
                    Ok::<_, DbSqlError>(update.rows_affected as usize)
                })
            })
            .await?)
    }

    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics> {
        let res = match channel_id {
            None => {
                #[cfg(all(feature = "prometheus", not(test)))]
                let mut per_channel_unredeemed = std::collections::HashMap::new();

                self.nest_transaction_in_db(None, TargetDb::Tickets)
                    .await?
                    .perform(|tx| {
                        Box::pin(async move {
                            let unredeemed_value = ticket::Entity::find()
                                .stream(tx.as_ref())
                                .await?
                                .try_fold(U256::zero(), |amount, x| {
                                    let unredeemed_value = U256::from_be_bytes(x.amount);

                                    #[cfg(all(feature = "prometheus", not(test)))]
                                    per_channel_unredeemed
                                        .entry(x.channel_id)
                                        .and_modify(|v| *v += unredeemed_value)
                                        .or_insert(unredeemed_value);

                                    futures::future::ok(amount + unredeemed_value)
                                })
                                .await?;

                            #[cfg(all(feature = "prometheus", not(test)))]
                            for (channel_id, unredeemed_value) in per_channel_unredeemed {
                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
                                    .set(&[&channel_id, "unredeemed"], unredeemed_value.as_u128() as f64);
                            }

                            let mut all_stats = ticket_statistics::Entity::find()
                                .all(tx.as_ref())
                                .await?
                                .into_iter()
                                .fold(ChannelTicketStatistics::default(), |mut acc, stats| {
                                    let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
                                    acc.neglected_value += neglected_value;
                                    let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
                                    acc.redeemed_value += redeemed_value;
                                    let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
                                    acc.rejected_value += rejected_value;
                                    acc.winning_tickets += stats.winning_tickets as u128;

                                    #[cfg(all(feature = "prometheus", not(test)))]
                                    {
                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                            &[&stats.channel_id, "neglected"],
                                            neglected_value.amount().as_u128() as f64,
                                        );

                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                            &[&stats.channel_id, "redeemed"],
                                            redeemed_value.amount().as_u128() as f64,
                                        );

                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                            &[&stats.channel_id, "rejected"],
                                            rejected_value.amount().as_u128() as f64,
                                        );
                                    }

                                    acc
                                });

                            all_stats.unredeemed_value = unredeemed_value.into();

                            Ok::<_, DbSqlError>(all_stats)
                        })
                    })
                    .await
            }
            Some(channel) => {
                // We need to make sure the channel exists to avoid creating
                // a statistics entry for a non-existing channel
                if self.get_channel_by_id(None, &channel).await?.is_none() {
                    return Err(DbSqlError::ChannelNotFound(channel).into());
                }

                self.nest_transaction_in_db(None, TargetDb::Tickets)
                    .await?
                    .perform(|tx| {
                        Box::pin(async move {
                            let stats = find_stats_for_channel(tx, &channel).await?;
                            let unredeemed_value = ticket::Entity::find()
                                .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
                                .stream(tx.as_ref())
                                .await?
                                .try_fold(U256::zero(), |amount, x| {
                                    futures::future::ok(amount + U256::from_be_bytes(x.amount))
                                })
                                .await?;

                            Ok::<_, DbSqlError>(ChannelTicketStatistics {
                                winning_tickets: stats.winning_tickets as u128,
                                neglected_value: HoprBalance::from_be_bytes(stats.neglected_value),
                                redeemed_value: HoprBalance::from_be_bytes(stats.redeemed_value),
                                unredeemed_value: unredeemed_value.into(),
                                rejected_value: HoprBalance::from_be_bytes(stats.rejected_value),
                            })
                        })
                    })
                    .await
            }
        };
        Ok(res?)
    }

    async fn reset_ticket_statistics(&self) -> Result<()> {
        let res = self
            .nest_transaction_in_db(None, TargetDb::Tickets)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    #[cfg(all(feature = "prometheus", not(test)))]
                    let rows = ticket_statistics::Entity::find().all(tx.as_ref()).await?;

                    // delete statistics for the found rows
                    let deleted = ticket_statistics::Entity::delete_many().exec(tx.as_ref()).await?;

                    #[cfg(all(feature = "prometheus", not(test)))]
                    {
                        if deleted.rows_affected > 0 {
                            for row in rows {
                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "neglected"], 0.0_f64);

                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "redeemed"], 0.0_f64);

                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&[&row.channel_id, "rejected"], 0.0_f64);
                            }
                        }
                    }

                    debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);

                    Ok::<_, DbSqlError>(())
                })
            })
            .await;

        Ok(res?)
    }

    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance)> {
        self.get_tickets_value_int(None, selector).await
    }

    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64> {
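        // `fetch_max` stores `index` only if it is greater than the current value and always
        // returns the previous value, so a caller can tell whether the index actually advanced
        // by comparing the returned value with `index`.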
        let old_value = self
            .get_outgoing_ticket_index(channel_id)
            .await?
            .fetch_max(index, Ordering::SeqCst);

        // TODO: should we hint the persisting mechanism to trigger the flush?
        // if old_value < index {}

        Ok(old_value)
    }

    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
        let old_value = self
            .get_outgoing_ticket_index(channel_id)
            .await?
            .swap(0, Ordering::SeqCst);

        // TODO: should we hint the persisting mechanism to trigger the flush?
        // if old_value > 0 { }

        Ok(old_value)
    }

    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64> {
        let old_value = self
            .get_outgoing_ticket_index(channel_id)
            .await?
            .fetch_add(1, Ordering::SeqCst);

        // TODO: should we hint the persisting mechanism to trigger the flush?

        Ok(old_value)
    }

    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>> {
        let tkt_manager = self.ticket_manager.clone();

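        // Note: `try_get_with` is assumed to coalesce concurrent lookups for the same channel
        // (moka-style cache semantics), so the fallback row below should be created at most once
        // even if several tasks ask for the index of a previously unseen channel at the same time.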
        Ok(self
            .caches
            .ticket_index
            .try_get_with(channel_id, async move {
                let maybe_index = outgoing_ticket_index::Entity::find()
                    .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
                    .one(&tkt_manager.tickets_db)
                    .await?;

                Ok(Arc::new(AtomicU64::new(match maybe_index {
                    Some(model) => U256::from_be_bytes(model.index).as_u64(),
                    None => {
                        tkt_manager
                            .with_write_locked_db(|tx| {
                                Box::pin(async move {
                                    outgoing_ticket_index::ActiveModel {
                                        channel_id: Set(channel_id.to_hex()),
                                        ..Default::default()
                                    }
                                    .insert(tx.as_ref())
                                    .await?;
                                    Ok::<_, DbSqlError>(())
                                })
                            })
                            .await?;
                        0_u64
                    }
                })))
            })
            .await
            .map_err(|e: Arc<DbSqlError>| LogicalError(format!("failed to retrieve ticket index: {e}")))?)
    }

    async fn persist_outgoing_ticket_indices(&self) -> Result<usize> {
        let outgoing_indices = outgoing_ticket_index::Entity::find()
            .all(&self.tickets_db)
            .await
            .map_err(DbSqlError::from)?;

        let mut updated = 0;
        for index_model in outgoing_indices {
            let channel_id = Hash::from_hex(&index_model.channel_id).map_err(DbSqlError::from)?;
            let db_index = U256::from_be_bytes(&index_model.index).as_u64();
            if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
                // Note that the persisted value always lags behind the cache, so it is fine
                // if the cached index changes again between this load and the moment the value
                // is stored in the DB.
                let cached_index = cached_index.load(Ordering::SeqCst);

                // Store the ticket index in a separate write transaction
                if cached_index > db_index {
                    let mut index_active_model = index_model.into_active_model();
                    index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
                    self.ticket_manager
                        .with_write_locked_db(|wtx| {
                            Box::pin(async move {
                                index_active_model.save(wtx.as_ref()).await?;
                                Ok::<_, DbSqlError>(())
                            })
                        })
                        .await?;

                    debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
                    updated += 1;
                }
            } else {
                // The value is not yet in the cache, meaning there is low traffic on this
                // channel and the value has not been fetched yet.
                trace!(?channel_id, "channel not in cache yet");
            }
        }

        Ok(Ok::<_, DbSqlError>(updated)?)
    }

    async fn prepare_aggregation_in_channel(
        &self,
        channel: &Hash,
        prerequisites: AggregationPrerequisites,
    ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>> {
        let myself = self.clone();

        let channel_id = *channel;

        let (channel_entry, peer, domain_separator, min_win_prob) = self
            .nest_transaction_in_db(None, TargetDb::Index)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    let entry = myself
                        .get_channel_by_id(Some(tx), &channel_id)
                        .await?
                        .ok_or(DbSqlError::ChannelNotFound(channel_id))?;

                    if entry.status == ChannelStatus::Closed {
                        return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
                        return Err(DbSqlError::LogicalError(format!(
                            "channel '{channel_id}' is not incoming"
                        )));
                    }

                    let pk = myself
                        .resolve_packet_key(&entry.source)
                        .await?
                        .ok_or(DbSqlError::LogicalError(format!(
                            "peer '{}' has no offchain key record",
                            entry.source
                        )))?;

                    let indexer_data = myself.get_indexer_data(Some(tx)).await?;

                    let domain_separator = indexer_data
                        .channels_dst
                        .ok_or_else(|| crate::errors::DbSqlError::LogicalError("domain separator missing".into()))?;

                    Ok((
                        entry,
                        pk,
                        domain_separator,
                        indexer_data.minimum_incoming_ticket_winning_prob,
                    ))
                })
            })
            .await?;

        let myself = self.clone();
        let tickets = self
            .ticket_manager
            .with_write_locked_db(|tx| {
                Box::pin(async move {
                    // verify that no aggregation is in progress in the channel
                    if ticket::Entity::find()
                        .filter(WrappedTicketSelector::from(
                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
                        ))
                        .one(tx.as_ref())
                        .await?
                        .is_some()
                    {
                        return Err(DbSqlError::LogicalError(format!(
                            "'{channel_entry}' is already being aggregated",
                        )));
                    }

                    // find the index of the last ticket being redeemed
                    let first_idx_to_take = ticket::Entity::find()
                        .filter(WrappedTicketSelector::from(
                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingRedeemed),
                        ))
                        .order_by_desc(ticket::Column::Index)
                        .one(tx.as_ref())
                        .await?
                        .map(|m| U256::from_be_bytes(m.index).as_u64() + 1)
                        .unwrap_or(0_u64) // start from the lowest possible index if none is found
                        .max(channel_entry.ticket_index.as_u64()); // but cannot be less than the ticket index on the Channel entry

                    // get the list of all tickets to be aggregated
                    let to_be_aggregated = ticket::Entity::find()
                        .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
                        .filter(ticket::Column::Index.gte(first_idx_to_take.to_be_bytes().to_vec()))
                        .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
                        .order_by_asc(ticket::Column::Index) // tickets must be sorted by indices in ascending order
                        .limit(MAX_TICKETS_TO_AGGREGATE_BATCH)
                        .all(tx.as_ref())
                        .await?;

                    // Filter the list of tickets according to the prerequisites
                    let mut to_be_aggregated: Vec<TransferableWinningTicket> =
                        filter_satisfying_ticket_models(prerequisites, to_be_aggregated, &channel_entry, min_win_prob)?
                            .into_iter()
                            .map(|model| {
                                AcknowledgedTicket::try_from(model)
                                    .map_err(DbSqlError::from)
                                    .and_then(|ack| {
                                        ack.into_transferable(&myself.chain_key, &domain_separator)
                                            .map_err(DbSqlError::from)
                                    })
                            })
                            .collect::<crate::errors::Result<Vec<_>>>()?;

                    let mut neglected_idxs = Vec::new();

                    if !to_be_aggregated.is_empty() {
                        // Clean up any tickets in this channel that are already inside an aggregated ticket.
                        // This situation cannot be avoided 100% as aggregation can be triggered when out-of-order
                        // tickets arrive and only some of them are necessary to satisfy the aggregation threshold.
                        // The following code *assumes* that only the first ticket with the lowest index *can be* an aggregate.
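                        // Worked example: if the first ticket has index 10 and index_offset 4, it already
                        // covers indices 10..14, so later tickets with indices 11, 12 or 13 carry value that
                        // is already part of the aggregate and are dropped (and later neglected) below.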
                        let first_ticket = to_be_aggregated[0].ticket.clone();
                        let mut i = 1;
                        while i < to_be_aggregated.len() {
                            let current_idx = to_be_aggregated[i].ticket.index;
                            if (first_ticket.index..first_ticket.index + first_ticket.index_offset as u64).contains(&current_idx) {
                                // Cleanup is the only reasonable thing to do at this point,
                                // since the aggregator will check for index range overlaps and deny
                                // the aggregation of the entire batch otherwise.
                                warn!(ticket_id = current_idx, channel = %channel_id, ?first_ticket, "ticket in channel has been already aggregated and will be removed");
                                neglected_idxs.push(current_idx);
                                to_be_aggregated.remove(i);
                            } else {
                                i += 1;
                            }
                        }

                        // The cleanup (neglecting) of these tickets is not performed here but on the next
                        // ticket redemption in this channel; see handler.rs around L402.
                        if !neglected_idxs.is_empty() {
                            warn!(count = neglected_idxs.len(), channel = %channel_id, "tickets were neglected due to duplication in an aggregated ticket!");
                        }

                        // mark all tickets with appropriate characteristics as being aggregated
                        let marked: sea_orm::UpdateResult = ticket::Entity::update_many()
                            .filter(WrappedTicketSelector::from(TicketSelector::from(&channel_entry)))
                            .filter(ticket::Column::Index.is_in(to_be_aggregated.iter().map(|t| t.ticket.index.to_be_bytes().to_vec())))
                            .filter(ticket::Column::State.ne(AcknowledgedTicketStatus::BeingAggregated as u8))
                            .col_expr(
                                ticket::Column::State,
                                Expr::value(AcknowledgedTicketStatus::BeingAggregated as i8),
                            )
                            .exec(tx.as_ref())
                            .await?;

                        if marked.rows_affected as usize != to_be_aggregated.len() {
                            return Err(DbSqlError::LogicalError(format!(
                                "expected to mark {}, but was able to mark {}",
                                to_be_aggregated.len(),
                                marked.rows_affected,
                            )));
                        }
                    }

                    debug!(
                        "prepared {} tickets to aggregate in {} ({})",
                        to_be_aggregated.len(),
                        channel_entry.get_id(),
                        channel_entry.channel_epoch,
                    );

                    Ok(to_be_aggregated)
                })
            })
            .await?;

        Ok((!tickets.is_empty()).then_some((peer, tickets, domain_separator)))
    }

    async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()> {
        let channel_entry = self
            .get_channel_by_id(None, &channel)
            .await?
            .ok_or(DbSqlError::ChannelNotFound(channel))?;

        let selector = TicketSelector::from(channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated);

        let reverted = self
            .update_ticket_states(selector, AcknowledgedTicketStatus::Untouched)
            .await?;

        info!("rolled back ticket aggregation in '{channel}', reverting {reverted} tickets");
        Ok(())
    }

    async fn process_received_aggregated_ticket(
        &self,
        aggregated_ticket: Ticket,
        chain_keypair: &ChainKeypair,
    ) -> Result<AcknowledgedTicket> {
        if chain_keypair.public().to_address() != self.me_onchain {
            return Err(DbSqlError::LogicalError(
                "chain key for ticket aggregation does not match the DB public address".into(),
            )
            .into());
        }

        let myself = self.clone();
        let channel_id = aggregated_ticket.channel_id;

        let (channel_entry, domain_separator) = self
            .nest_transaction_in_db(None, TargetDb::Index)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    let entry = myself
                        .get_channel_by_id(Some(tx), &channel_id)
                        .await?
                        .ok_or(DbSqlError::ChannelNotFound(channel_id))?;

                    if entry.status == ChannelStatus::Closed {
                        return Err(DbSqlError::LogicalError(format!("channel '{channel_id}' is closed")));
                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Incoming) {
                        return Err(DbSqlError::LogicalError(format!(
                            "channel '{channel_id}' is not incoming"
                        )));
                    }

                    let domain_separator =
                        myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
                            crate::errors::DbSqlError::LogicalError("domain separator missing".into())
                        })?;

                    Ok((entry, domain_separator))
                })
            })
            .await?;

        // Verify the ticket first
        let aggregated_ticket = aggregated_ticket
            .verify(&channel_entry.source, &domain_separator)
            .map_err(|e| {
                DbSqlError::LogicalError(format!(
                    "failed to verify received aggregated ticket in {channel_id}: {e}"
                ))
            })?;

        // Aggregated tickets always have 100% winning probability
        if !aggregated_ticket.win_prob().approx_eq(&WinningProbability::ALWAYS) {
            return Err(DbSqlError::LogicalError("Aggregated tickets must have 100% win probability".into()).into());
        }

        let acknowledged_tickets = self
            .nest_transaction_in_db(None, TargetDb::Tickets)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    ticket::Entity::find()
                        .filter(WrappedTicketSelector::from(
                            TicketSelector::from(&channel_entry).with_state(AcknowledgedTicketStatus::BeingAggregated),
                        ))
                        .all(tx.as_ref())
                        .await
                        .map_err(DbSqlError::BackendError)
                })
            })
            .await?;

        if acknowledged_tickets.is_empty() {
            debug!("Received unexpected aggregated ticket in channel {channel_id}");
            return Err(DbSqlError::LogicalError(format!(
                "failed to insert the aggregated ticket because no tickets seem to be aggregated for '{channel_id}'",
            ))
            .into());
        }

        let stored_value = acknowledged_tickets
            .iter()
            .map(|m| HoprBalance::from_be_bytes(&m.amount))
            .sum();

        // The value of a received ticket can be higher (profit for us) but not lower
        if aggregated_ticket.verified_ticket().amount.lt(&stored_value) {
            error!(channel = %channel_id, "Aggregated ticket value in channel is lower than sum of stored tickets");
            return Err(DbSqlError::LogicalError("Value of received aggregated ticket is too low".into()).into());
        }

        let acknowledged_tickets = acknowledged_tickets
            .into_iter()
            .map(AcknowledgedTicket::try_from)
            .collect::<hopr_db_entity::errors::Result<Vec<AcknowledgedTicket>>>()
            .map_err(DbSqlError::from)?;

        // This unwrap is safe because the tickets collection was checked for emptiness above
        let first_stored_ticket = acknowledged_tickets.first().unwrap();

        // calculate the new current ticket index
        #[allow(unused_variables)]
        let current_ticket_index_from_aggregated_ticket =
            U256::from(aggregated_ticket.verified_ticket().index).add(aggregated_ticket.verified_ticket().index_offset);

        let acked_aggregated_ticket = aggregated_ticket.into_acknowledged(first_stored_ticket.response.clone());

        let ticket = acked_aggregated_ticket.clone();
        self.ticket_manager.replace_tickets(ticket).await?;

        info!(%acked_aggregated_ticket, "successfully processed received aggregated ticket");
        Ok(acked_aggregated_ticket)
    }

    async fn aggregate_tickets(
        &self,
        destination: OffchainPublicKey,
        mut acked_tickets: Vec<TransferableWinningTicket>,
        me: &ChainKeypair,
    ) -> Result<VerifiedTicket> {
        if me.public().to_address() != self.me_onchain {
            return Err(DbSqlError::LogicalError(
                "chain key for ticket aggregation does not match the DB public address".into(),
            )
            .into());
        }

        let domain_separator = self
            .get_indexer_data(None)
            .await?
            .domain_separator(DomainSeparator::Channel)
            .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;

        if acked_tickets.is_empty() {
            return Err(DbSqlError::LogicalError("at least one ticket required for aggregation".to_owned()).into());
        }

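        // A single ticket needs no aggregation: just verify it, bump the outgoing ticket index
        // past it, and return it as-is.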
        if acked_tickets.len() == 1 {
            let single = acked_tickets
                .pop()
                .unwrap()
                .into_redeemable(&self.me_onchain, &domain_separator)
                .map_err(DbSqlError::from)?;

            self.compare_and_set_outgoing_ticket_index(
                single.verified_ticket().channel_id,
                single.verified_ticket().index + 1,
            )
            .await?;

            return Ok(single.ticket);
        }

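        // Sort and deduplicate the tickets so the overlap and value checks below can rely on a
        // strictly ascending index order.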
        acked_tickets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(cmp::Ordering::Equal));
        acked_tickets.dedup();

        let myself = self.clone();
        let address = myself
            .resolve_chain_key(&destination)
            .await?
            .ok_or(DbSqlError::LogicalError(format!(
                "peer '{}' has no chain key record",
                destination.to_peerid_str()
            )))?;

        let (channel_entry, destination, min_win_prob) = self
            .nest_transaction_in_db(None, TargetDb::Index)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    let entry = myself
                        .get_channel_by_parties(Some(tx), &myself.me_onchain, &address, false)
                        .await?
                        .ok_or_else(|| {
                            DbSqlError::ChannelNotFound(generate_channel_id(&myself.me_onchain, &address))
                        })?;

                    if entry.status == ChannelStatus::Closed {
                        return Err(DbSqlError::LogicalError(format!("{entry} is closed")));
                    } else if entry.direction(&myself.me_onchain) != Some(ChannelDirection::Outgoing) {
                        return Err(DbSqlError::LogicalError(format!("{entry} is not outgoing")));
                    }

                    let min_win_prob = myself
                        .get_indexer_data(Some(tx))
                        .await?
                        .minimum_incoming_ticket_winning_prob;
                    Ok((entry, address, min_win_prob))
                })
            })
            .await?;

        let channel_balance = channel_entry.balance;
        let channel_epoch = channel_entry.channel_epoch.as_u32();
        let channel_id = channel_entry.get_id();

        let mut final_value = HoprBalance::zero();

        // Validate all received tickets and turn them into RedeemableTickets
        let verified_tickets = acked_tickets
            .into_iter()
            .map(|t| t.into_redeemable(&self.me_onchain, &domain_separator))
            .collect::<hopr_internal_types::errors::Result<Vec<_>>>()
            .map_err(|e| {
                DbSqlError::LogicalError(format!("trying to aggregate an invalid or a non-winning ticket: {e}"))
            })?;

        // Perform additional consistency check on the verified tickets
        for (i, acked_ticket) in verified_tickets.iter().enumerate() {
            if channel_id != acked_ticket.verified_ticket().channel_id {
                return Err(DbSqlError::LogicalError(format!(
                    "ticket for aggregation has an invalid channel id {}",
                    acked_ticket.verified_ticket().channel_id
                ))
                .into());
            }

            if acked_ticket.verified_ticket().channel_epoch != channel_epoch {
                return Err(DbSqlError::LogicalError("channel epochs do not match".into()).into());
            }

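            // Index intervals must not overlap: e.g. a ticket at index 5 with index_offset 3
            // covers indices 5..8, so the next ticket in the sorted list must start at index 8
            // or higher.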
1189            if i + 1 < verified_tickets.len()
1190                && acked_ticket.verified_ticket().index + acked_ticket.verified_ticket().index_offset as u64
1191                    > verified_tickets[i + 1].verified_ticket().index
1192            {
1193                return Err(DbSqlError::LogicalError("tickets with overlapping index intervals".into()).into());
1194            }
1195
1196            if acked_ticket
1197                .verified_ticket()
1198                .win_prob()
1199                .approx_cmp(&min_win_prob)
1200                .is_lt()
1201            {
1202                return Err(DbSqlError::LogicalError(
1203                    "cannot aggregate ticket with lower than minimum winning probability in network".into(),
1204                )
1205                .into());
1206            }
1207
1208            final_value += acked_ticket.verified_ticket().amount;
1209            if final_value.gt(&channel_balance) {
1210                return Err(DbSqlError::LogicalError(format!(
1211                    "ticket amount to aggregate {final_value} is greater than the balance {channel_balance} of \
1212                     channel {channel_id}"
1213                ))
1214                .into());
1215            }
1216        }
1217
1218        info!(
1219            "aggregated {} tickets in channel {channel_id} with total value {final_value}",
1220            verified_tickets.len()
1221        );
1222
1223        let first_acked_ticket = verified_tickets.first().unwrap();
1224        let last_acked_ticket = verified_tickets.last().unwrap();
1225
1226        // Ensure the outgoing ticket index is at least one past the last aggregated ticket's index;
1227        // the compare-and-set keeps the larger of the stored index and this value
1228        let current_ticket_index_from_acked_tickets = last_acked_ticket.verified_ticket().index + 1;
1229        self.compare_and_set_outgoing_ticket_index(channel_id, current_ticket_index_from_acked_tickets)
1230            .await?;
1231
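        // Issue a single aggregated ticket covering the whole index range, carrying the summed value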
1232        Ok(TicketBuilder::default()
1233            .direction(&self.me_onchain, &destination)
1234            .balance(final_value)
1235            .index(first_acked_ticket.verified_ticket().index)
1236            .index_offset(
1237                (last_acked_ticket.verified_ticket().index - first_acked_ticket.verified_ticket().index + 1) as u32,
1238            )
1239            .win_prob(WinningProbability::ALWAYS) // Aggregated tickets always have a 100% winning probability
1240            .channel_epoch(channel_epoch)
1241            .challenge(first_acked_ticket.verified_ticket().challenge)
1242            .build_signed(me, &domain_separator)
1243            .map_err(DbSqlError::from)?)
1244    }
1245
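    // Moves tickets stuck in the `BeingRedeemed` state at each incoming channel's current ticket
    // index back to `Untouched`, making them available for redemption or aggregation again.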
1246    async fn fix_channels_next_ticket_state(&self) -> Result<()> {
1247        let channels = self.get_incoming_channels(None).await?;
1248
1249        for channel in channels.into_iter() {
1250            let selector = TicketSelector::from(&channel)
1251                .with_state(AcknowledgedTicketStatus::BeingRedeemed)
1252                .with_index(channel.ticket_index.as_u64());
1253
1254            let mut tickets_stream = self
1255                .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::Untouched)
1256                .await?;
1257
1258            while let Some(ticket) = tickets_stream.next().await {
1259                let channel_id = channel.get_id();
1260                let ticket_index = ticket.verified_ticket().index;
1261                let ticket_amount = ticket.verified_ticket().amount;
1262                info!(%channel_id, %ticket_index, %ticket_amount, "fixed next out-of-sync ticket");
1263            }
1264        }
1265
1266        Ok(())
1267    }
1268}
1269
1270impl HoprDb {
1271    /// Inserts or updates the given acknowledged ticket. Used only by non-SQLite code and tests.
1272    pub async fn upsert_ticket<'a>(&'a self, tx: OptTx<'a>, acknowledged_ticket: AcknowledgedTicket) -> Result<()> {
1273        self.nest_transaction_in_db(tx, TargetDb::Tickets)
1274            .await?
1275            .perform(|tx| {
1276                Box::pin(async move {
1277                    // For upserting, we must select only by the triplet (channel id, epoch, index)
1278                    let selector = WrappedTicketSelector::from(
1279                        TicketSelector::new(
1280                            acknowledged_ticket.verified_ticket().channel_id,
1281                            acknowledged_ticket.verified_ticket().channel_epoch,
1282                        )
1283                        .with_index(acknowledged_ticket.verified_ticket().index),
1284                    );
1285
1286                    debug!("upserting ticket {acknowledged_ticket}");
1287                    let mut model = ticket::ActiveModel::from(acknowledged_ticket);
1288
1289                    if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx.as_ref()).await? {
1290                        model.id = Set(ticket.id);
1291                    }
1292
1293                    Ok::<_, DbSqlError>(model.save(tx.as_ref()).await?)
1294                })
1295            })
1296            .await?;
1297        Ok(())
1298    }
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303    use std::{
1304        ops::Add,
1305        sync::atomic::Ordering,
1306        time::{Duration, SystemTime},
1307    };
1308
1309    use anyhow::{Context, anyhow};
1310    use futures::{StreamExt, pin_mut};
1311    use hex_literal::hex;
1312    use hopr_crypto_random::Randomizable;
1313    use hopr_crypto_types::prelude::*;
1314    use hopr_db_api::{
1315        info::DomainSeparator,
1316        prelude::{DbError, TicketMarker},
1317        tickets::ChannelTicketStatistics,
1318    };
1319    use hopr_db_entity::ticket;
1320    use hopr_internal_types::prelude::*;
1321    use hopr_primitive_types::prelude::*;
1322    use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, Set};
1323
1324    use crate::{
1325        HoprDbGeneralModelOperations, TargetDb,
1326        accounts::HoprDbAccountOperations,
1327        channels::HoprDbChannelOperations,
1328        db::HoprDb,
1329        errors::DbSqlError,
1330        info::HoprDbInfoOperations,
1331        tickets::{AggregationPrerequisites, HoprDbTicketOperations, TicketSelector, filter_satisfying_ticket_models},
1332    };
1333
1334    lazy_static::lazy_static! {
1335        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
1336        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
1337        static ref CHANNEL_ID: Hash = generate_channel_id(&BOB.public().to_address(), &ALICE.public().to_address());
1338    }
1339
1340    lazy_static::lazy_static! {
1341        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1342        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
1343    }
1344
1345    const TICKET_VALUE: u64 = 100_000;
1346
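    // Inserts account entries mapping each peer's off-chain (packet) key to its on-chain (chain) key.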
1347    async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
1348        for (peer_offchain, peer_onchain) in peers.into_iter() {
1349            db.insert_account(
1350                None,
1351                AccountEntry {
1352                    public_key: *peer_offchain.public(),
1353                    chain_addr: peer_onchain.public().to_address(),
1354                    entry_type: AccountType::NotAnnounced,
1355                    published_at: 0,
1356                },
1357            )
1358            .await?
1359        }
1360
1361        Ok(())
1362    }
1363
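    // Builds a signed ticket from `src` to `dst` with the given index, index offset and winning
    // probability, acknowledged with a freshly generated challenge/response pair.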
1364    fn generate_random_ack_ticket(
1365        src: &ChainKeypair,
1366        dst: &ChainKeypair,
1367        index: u64,
1368        index_offset: u32,
1369        win_prob: f64,
1370    ) -> anyhow::Result<AcknowledgedTicket> {
1371        let hk1 = HalfKey::random();
1372        let hk2 = HalfKey::random();
1373
1374        let cp1: CurvePoint = hk1.to_challenge().try_into()?;
1375        let cp2: CurvePoint = hk2.to_challenge().try_into()?;
1376        let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
1377
1378        Ok(TicketBuilder::default()
1379            .addresses(src, dst)
1380            .amount(TICKET_VALUE)
1381            .index(index)
1382            .index_offset(index_offset)
1383            .win_prob(win_prob.try_into()?)
1384            .channel_epoch(4)
1385            .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
1386            .build_signed(src, &Hash::default())?
1387            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
1388    }
1389
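    // Convenience wrapper around `init_db_with_tickets_and_channel` that leaves the channel ticket index at 0.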
1390    async fn init_db_with_tickets(
1391        db: &HoprDb,
1392        count_tickets: u64,
1393    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1394        init_db_with_tickets_and_channel(db, count_tickets, None).await
1395    }
1396
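    // Opens a BOB -> ALICE channel (optionally with a preset ticket index) and inserts
    // `count_tickets` consecutive acknowledged tickets issued by BOB into the tickets DB.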
1397    async fn init_db_with_tickets_and_channel(
1398        db: &HoprDb,
1399        count_tickets: u64,
1400        channel_ticket_index: Option<u32>,
1401    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
1402        let channel = ChannelEntry::new(
1403            BOB.public().to_address(),
1404            ALICE.public().to_address(),
1405            u32::MAX.into(),
1406            channel_ticket_index.unwrap_or(0u32).into(),
1407            ChannelStatus::Open,
1408            4_u32.into(),
1409        );
1410
1411        db.upsert_channel(None, channel).await?;
1412
1413        let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
1414            .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
1415            .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
1416
1417        let db_clone = db.clone();
1418        let tickets_clone = tickets.clone();
1419        db.begin_transaction_in_db(TargetDb::Tickets)
1420            .await?
1421            .perform(|tx| {
1422                Box::pin(async move {
1423                    for t in tickets_clone {
1424                        db_clone.upsert_ticket(Some(tx), t).await?;
1425                    }
1426                    Ok::<(), DbSqlError>(())
1427                })
1428            })
1429            .await?;
1430
1431        Ok((channel, tickets))
1432    }
1433
1434    #[tokio::test]
1435    async fn test_insert_get_ticket() -> anyhow::Result<()> {
1436        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1437        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
1438            .await?;
1439
1440        let (channel, mut tickets) = init_db_with_tickets(&db, 1).await?;
1441        let ack_ticket = tickets.pop().context("ticket should be present")?;
1442
1443        assert_eq!(
1444            channel.get_id(),
1445            ack_ticket.verified_ticket().channel_id,
1446            "channel ids must match"
1447        );
1448        assert_eq!(
1449            channel.channel_epoch.as_u32(),
1450            ack_ticket.verified_ticket().channel_epoch,
1451            "epochs must match"
1452        );
1453
1454        let db_ticket = db
1455            .get_tickets((&ack_ticket).into())
1456            .await?
1457            .first()
1458            .cloned()
1459            .context("ticket should exist")?;
1460
1461        assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
1462
1463        Ok(())
1464    }
1465
1466    #[tokio::test]
1467    async fn test_mark_redeemed() -> anyhow::Result<()> {
1468        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1469        const COUNT_TICKETS: u64 = 10;
1470
1471        let (_, tickets) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1472
1473        let stats = db.get_ticket_statistics(None).await?;
1474        assert_eq!(
1475            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1476            stats.unredeemed_value,
1477            "unredeemed balance must match"
1478        );
1479        assert_eq!(
1480            HoprBalance::zero(),
1481            stats.redeemed_value,
1482            "there must be 0 redeemed value"
1483        );
1484
1485        assert_eq!(
1486            stats,
1487            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1488            "per channel stats must be same"
1489        );
1490
1491        const TO_REDEEM: u64 = 2;
1492        let db_clone = db.clone();
1493        db.begin_transaction_in_db(TargetDb::Tickets)
1494            .await?
1495            .perform(|_tx| {
1496                Box::pin(async move {
1497                    for ticket in tickets.iter().take(TO_REDEEM as usize) {
1498                        let r = db_clone.mark_tickets_as(ticket.into(), TicketMarker::Redeemed).await?;
1499                        assert_eq!(1, r, "must redeem only a single ticket");
1500                    }
1501                    Ok::<(), DbSqlError>(())
1502                })
1503            })
1504            .await?;
1505
1506        let stats = db.get_ticket_statistics(None).await?;
1507        assert_eq!(
1508            HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
1509            stats.unredeemed_value,
1510            "unredeemed balance must match"
1511        );
1512        assert_eq!(
1513            HoprBalance::from(TICKET_VALUE * TO_REDEEM),
1514            stats.redeemed_value,
1515            "there must be a redeemed value"
1516        );
1517
1518        assert_eq!(
1519            stats,
1520            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1521            "per channel stats must be same"
1522        );
1523
1524        Ok(())
1525    }
1526
1527    #[tokio::test]
1528    async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
1529        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1530
1531        let ticket = init_db_with_tickets(&db, 1)
1532            .await?
1533            .1
1534            .pop()
1535            .context("should contain a ticket")?;
1536
1537        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
1538        assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
1539
1540        Ok(())
1541    }
1542
1543    #[tokio::test]
1544    async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
1545        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1546
1547        let count_tickets = 10;
1548        let channel = init_db_with_tickets(&db, count_tickets).await?.0;
1549
1550        let count_marked = db.mark_tickets_as((&channel).into(), TicketMarker::Redeemed).await?;
1551        assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
1552
1553        Ok(())
1554    }
1555
1556    #[tokio::test]
1557    async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
1558        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1559        const COUNT_TICKETS: u64 = 10;
1560
1561        let (channel, _) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
1562
1563        let stats = db.get_ticket_statistics(None).await?;
1564        assert_eq!(
1565            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1566            stats.unredeemed_value,
1567            "unredeemed balance must match"
1568        );
1569        assert_eq!(
1570            HoprBalance::zero(),
1571            stats.neglected_value,
1572            "there must be 0 neglected value"
1573        );
1574
1575        assert_eq!(
1576            stats,
1577            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1578            "per channel stats must be same"
1579        );
1580
1581        db.mark_tickets_as((&channel).into(), TicketMarker::Neglected).await?;
1582
1583        let stats = db.get_ticket_statistics(None).await?;
1584        assert_eq!(
1585            HoprBalance::zero(),
1586            stats.unredeemed_value,
1587            "unredeemed balance must be zero"
1588        );
1589        assert_eq!(
1590            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1591            stats.neglected_value,
1592            "there must be a neglected value"
1593        );
1594
1595        assert_eq!(
1596            stats,
1597            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1598            "per channel stats must be same"
1599        );
1600
1601        Ok(())
1602    }
1603
1604    #[tokio::test]
1605    async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1606        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1607
1608        let (_, mut tickets) = init_db_with_tickets(&db, 1).await?;
1609        let ticket = tickets.pop().context("ticket should be present")?.ticket;
1610
1611        let stats = db.get_ticket_statistics(None).await?;
1612        assert_eq!(HoprBalance::zero(), stats.rejected_value);
1613        assert_eq!(
1614            stats,
1615            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1616            "per channel stats must be same"
1617        );
1618
1619        db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
1620
1621        let stats = db.get_ticket_statistics(None).await?;
1622        assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
1623        assert_eq!(
1624            stats,
1625            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1626            "per channel stats must be same"
1627        );
1628
1629        Ok(())
1630    }
1631
1632    #[tokio::test]
1633    async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1634        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1635        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1636            .await?;
1637
1638        let channel = init_db_with_tickets(&db, 10).await?.0;
1639
1640        let selector = TicketSelector::from(&channel).with_index(5);
1641
1642        let v: Vec<AcknowledgedTicket> = db
1643            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1644            .await?
1645            .collect()
1646            .await;
1647
1648        assert_eq!(1, v.len(), "single ticket must be updated");
1649        assert_eq!(
1650            AcknowledgedTicketStatus::BeingRedeemed,
1651            v.first().context("should contain a ticket")?.status,
1652            "status must be set"
1653        );
1654
1655        let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1656
1657        let v: Vec<AcknowledgedTicket> = db
1658            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1659            .await?
1660            .collect()
1661            .await;
1662
1663        assert_eq!(9, v.len(), "only specific tickets must have state set");
1664        assert!(
1665            v.iter().all(|t| t.verified_ticket().index != 5),
1666            "only tickets with different state must update"
1667        );
1668        assert!(
1669            v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
1670            "tickets must have updated state"
1671        );
1672
1673        Ok(())
1674    }
1675
1676    #[tokio::test]
1677    async fn test_update_tickets_states() -> anyhow::Result<()> {
1678        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
1679        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
1680            .await?;
1681
1682        let channel = init_db_with_tickets(&db, 10).await?.0;
1683        let selector = TicketSelector::from(&channel).with_state(AcknowledgedTicketStatus::Untouched);
1684
1685        db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
1686            .await?;
1687
1688        let v: Vec<AcknowledgedTicket> = db
1689            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
1690            .await?
1691            .collect()
1692            .await;
1693
1694        assert!(v.is_empty(), "must not update if already updated");
1695
1696        Ok(())
1697    }
1698
1699    #[tokio::test]
1700    async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1701        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1702
1703        let hash = Hash::default();
1704
1705        let idx = db.get_outgoing_ticket_index(hash).await?;
1706        assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
1707
1708        let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
1709            .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
1710            .one(&db.tickets_db)
1711            .await?
1712            .context("index must exist")?;
1713
1714        assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
1715
1716        Ok(())
1717    }
1718
1719    #[tokio::test]
1720    async fn test_ticket_stats_must_fail_for_non_existing_channel() -> anyhow::Result<()> {
1721        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1722
1723        db.get_ticket_statistics(Some(*CHANNEL_ID))
1724            .await
1725            .expect_err("must fail for non-existing channel");
1726
1727        Ok(())
1728    }
1729
1730    #[tokio::test]
1731    async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1732        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1733
1734        let channel = ChannelEntry::new(
1735            BOB.public().to_address(),
1736            ALICE.public().to_address(),
1737            u32::MAX.into(),
1738            0.into(),
1739            ChannelStatus::Open,
1740            4_u32.into(),
1741        );
1742
1743        db.upsert_channel(None, channel).await?;
1744
1745        let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1746
1747        assert_eq!(
1748            ChannelTicketStatistics::default(),
1749            stats,
1750            "must be equal to default which is all zeros"
1751        );
1752
1753        assert_eq!(
1754            stats,
1755            db.get_ticket_statistics(None).await?,
1756            "per-channel stats must be the same as global stats"
1757        );
1758
1759        Ok(())
1760    }
1761
1762    #[tokio::test]
1763    async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1764        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1765
1766        let channel_1 = ChannelEntry::new(
1767            BOB.public().to_address(),
1768            ALICE.public().to_address(),
1769            u32::MAX.into(),
1770            0.into(),
1771            ChannelStatus::Open,
1772            4_u32.into(),
1773        );
1774
1775        db.upsert_channel(None, channel_1).await?;
1776
1777        let channel_2 = ChannelEntry::new(
1778            ALICE.public().to_address(),
1779            BOB.public().to_address(),
1780            u32::MAX.into(),
1781            0.into(),
1782            ChannelStatus::Open,
1783            4_u32.into(),
1784        );
1785
1786        db.upsert_channel(None, channel_2).await?;
1787
1788        let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1789        let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1790
1791        let value = t1.verified_ticket().amount;
1792
1793        db.upsert_ticket(None, t1).await?;
1794        db.upsert_ticket(None, t2).await?;
1795
1796        let stats_1 = db
1797            .get_ticket_statistics(Some(generate_channel_id(
1798                &BOB.public().to_address(),
1799                &ALICE.public().to_address(),
1800            )))
1801            .await?;
1802
1803        let stats_2 = db
1804            .get_ticket_statistics(Some(generate_channel_id(
1805                &ALICE.public().to_address(),
1806                &BOB.public().to_address(),
1807            )))
1808            .await?;
1809
1810        assert_eq!(value, stats_1.unredeemed_value);
1811        assert_eq!(value, stats_2.unredeemed_value);
1812
1813        assert_eq!(HoprBalance::zero(), stats_1.neglected_value);
1814        assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1815
1816        assert_eq!(stats_1, stats_2);
1817
1818        db.mark_tickets_as(channel_1.into(), TicketMarker::Neglected).await?;
1819
1820        let stats_1 = db
1821            .get_ticket_statistics(Some(generate_channel_id(
1822                &BOB.public().to_address(),
1823                &ALICE.public().to_address(),
1824            )))
1825            .await?;
1826
1827        let stats_2 = db
1828            .get_ticket_statistics(Some(generate_channel_id(
1829                &ALICE.public().to_address(),
1830                &BOB.public().to_address(),
1831            )))
1832            .await?;
1833
1834        assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1835        assert_eq!(value, stats_1.neglected_value);
1836
1837        assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1838
1839        Ok(())
1840    }
1841
1842    #[tokio::test]
1843    async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1844        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1845
1846        let hash = Hash::default();
1847
1848        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1849        assert_eq!(0, old_idx, "old value must be 0");
1850
1851        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1852        assert_eq!(1, new_idx, "new value must be 1");
1853
1854        let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1855        assert_eq!(1, old_idx, "old value must be 1");
1856
1857        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1858        assert_eq!(2, new_idx, "new value must be 2");
1859
1860        Ok(())
1861    }
1862
1863    #[tokio::test]
1864    async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1865        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1866
1867        let hash = Hash::default();
1868
1869        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1870        assert_eq!(0, new_idx, "value must be 0");
1871
1872        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1873
1874        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1875        assert_eq!(1, new_idx, "new value must be 1");
1876
1877        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1878        assert_eq!(1, old_idx, "old value must be 1");
1879        assert_eq!(1, db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst), "value must not decrease");
1880
1881        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1882        assert_eq!(1, old_idx, "old value must be 1");
1883        assert_eq!(1, db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst), "value must remain 1");
1884
1885        Ok(())
1886    }
1887
1888    #[tokio::test]
1889    async fn test_ticket_index_reset() -> anyhow::Result<()> {
1890        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1891
1892        let hash = Hash::default();
1893
1894        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1895        assert_eq!(0, new_idx, "value must be 0");
1896
1897        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1898
1899        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1900        assert_eq!(1, new_idx, "new value must be 1");
1901
1902        let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1903        assert_eq!(1, old_idx, "old value must be 1");
1904
1905        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1906        assert_eq!(0, new_idx, "new value must be 0");
1907        Ok(())
1908    }
1909
1910    #[tokio::test]
1911    async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1912        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1913
1914        let hash_1 = Hash::default();
1915        let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1916
1917        db.get_outgoing_ticket_index(hash_1).await?;
1918        db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1919
1920        let persisted = db.persist_outgoing_ticket_indices().await?;
1921        assert_eq!(1, persisted);
1922
1923        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1924            .all(&db.tickets_db)
1925            .await?;
1926        let idx_1 = indices
1927            .iter()
1928            .find(|idx| idx.channel_id == hash_1.to_hex())
1929            .context("must contain index 1")?;
1930        let idx_2 = indices
1931            .iter()
1932            .find(|idx| idx.channel_id == hash_2.to_hex())
1933            .context("must contain index 2")?;
1934        assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1935        assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1936
1937        db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1938        db.increment_outgoing_ticket_index(hash_2).await?;
1939
1940        let persisted = db.persist_outgoing_ticket_indices().await?;
1941        assert_eq!(2, persisted);
1942
1943        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1944            .all(&db.tickets_db)
1945            .await?;
1946        let idx_1 = indices
1947            .iter()
1948            .find(|idx| idx.channel_id == hash_1.to_hex())
1949            .context("must contain index 1")?;
1950        let idx_2 = indices
1951            .iter()
1952            .find(|idx| idx.channel_id == hash_2.to_hex())
1953            .context("must contain index 2")?;
1954        assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1955        assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1956        Ok(())
1957    }
1958
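    // Documents the assumption that cloning a `moka::future::Cache` shares the underlying storage,
    // so removals made through the original cache are observable through the clone.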
1959    #[tokio::test]
1960    async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1961        let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1962
1963        assert_eq!(cache.weighted_size(), 0);
1964
1965        cache.insert(1, 1).await;
1966        cache.insert(2, 2).await;
1967
1968        let clone = cache.clone();
1969
1970        cache.remove(&1).await;
1971        cache.remove(&2).await;
1972
1973        assert_eq!(cache.get(&1).await, None);
1974        assert_eq!(cache.get(&1).await, clone.get(&1).await);
1975        Ok(())
1976    }
1977
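    // Creates a minimal `ticket::Model` row with only the fields relevant to aggregation filtering
    // populated (index, index offset, amount and a fixed 0.0005 winning probability).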
1978    fn dummy_ticket_model(channel_id: Hash, idx: u64, idx_offset: u32, amount: u32) -> ticket::Model {
1979        ticket::Model {
1980            id: 0,
1981            channel_id: channel_id.to_string(),
1982            amount: U256::from(amount).to_be_bytes().to_vec(),
1983            index: idx.to_be_bytes().to_vec(),
1984            index_offset: idx_offset as i32,
1985            winning_probability: hex!("0020C49BA5E34F").to_vec(), // 0.0005
1986            channel_epoch: vec![],
1987            signature: vec![],
1988            response: vec![],
1989            state: 0,
1990            hash: vec![],
1991        }
1992    }
1993
1994    #[tokio::test]
1995    async fn test_aggregation_prerequisites_default_filter_no_tickets() -> anyhow::Result<()> {
1996        let prerequisites = AggregationPrerequisites::default();
1997        assert_eq!(None, prerequisites.min_unaggregated_ratio);
1998        assert_eq!(None, prerequisites.min_ticket_count);
1999
2000        let channel = ChannelEntry::new(
2001            BOB.public().to_address(),
2002            ALICE.public().to_address(),
2003            u32::MAX.into(),
2004            2.into(),
2005            ChannelStatus::Open,
2006            4_u32.into(),
2007        );
2008
2009        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2010
2011        let filtered_tickets =
2012            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2013
2014        assert_eq!(
2015            dummy_tickets, filtered_tickets,
2016            "empty prerequisites must not filter anything"
2017        );
2018        Ok(())
2019    }
2020
2021    #[tokio::test]
2022    async fn test_aggregation_prerequisites_should_filter_out_tickets_with_lower_than_min_win_prob()
2023    -> anyhow::Result<()> {
2024        let prerequisites = AggregationPrerequisites::default();
2025        assert_eq!(None, prerequisites.min_unaggregated_ratio);
2026        assert_eq!(None, prerequisites.min_ticket_count);
2027
2028        let channel = ChannelEntry::new(
2029            BOB.public().to_address(),
2030            ALICE.public().to_address(),
2031            u32::MAX.into(),
2032            2.into(),
2033            ChannelStatus::Open,
2034            4_u32.into(),
2035        );
2036
2037        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 1, 1)];
2038
2039        let filtered_tickets =
2040            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0006.try_into()?)?;
2041
2042        assert!(
2043            filtered_tickets.is_empty(),
2044            "must filter out tickets with lower win prob"
2045        );
2046        Ok(())
2047    }
2048
2049    #[tokio::test]
2050    async fn test_aggregation_prerequisites_must_trim_tickets_exceeding_channel_balance() -> anyhow::Result<()> {
2051        const TICKET_COUNT: usize = 110;
2052
2053        let prerequisites = AggregationPrerequisites::default();
2054        assert_eq!(None, prerequisites.min_unaggregated_ratio);
2055        assert_eq!(None, prerequisites.min_ticket_count);
2056
2057        let channel = ChannelEntry::new(
2058            BOB.public().to_address(),
2059            ALICE.public().to_address(),
2060            100.into(),
2061            (TICKET_COUNT + 1).into(),
2062            ChannelStatus::Open,
2063            4_u32.into(),
2064        );
2065
2066        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2067            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2068            .collect();
2069
2070        let filtered_tickets =
2071            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2072
2073        assert_eq!(
2074            100,
2075            filtered_tickets.len(),
2076            "must take only tickets up to channel balance"
2077        );
2078        assert!(
2079            filtered_tickets
2080                .into_iter()
2081                .map(|t| U256::from_be_bytes(t.amount).as_u64())
2082                .sum::<u64>()
2083                <= channel.balance.amount().as_u64(),
2084            "filtered tickets must not exceed channel balance"
2085        );
2086        Ok(())
2087    }
2088
2089    #[tokio::test]
2090    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_ticket_count_not_met() -> anyhow::Result<()>
2091    {
2092        const TICKET_COUNT: usize = 10;
2093
2094        let prerequisites = AggregationPrerequisites {
2095            min_ticket_count: Some(TICKET_COUNT + 1),
2096            min_unaggregated_ratio: None,
2097        };
2098
2099        let channel = ChannelEntry::new(
2100            BOB.public().to_address(),
2101            ALICE.public().to_address(),
2102            100.into(),
2103            (TICKET_COUNT + 1).into(),
2104            ChannelStatus::Open,
2105            4_u32.into(),
2106        );
2107
2108        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2109            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2110            .collect();
2111
2112        let filtered_tickets =
2113            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2114
2115        assert!(
2116            filtered_tickets.is_empty(),
2117            "must return empty when min_ticket_count is not met"
2118        );
2119
2120        Ok(())
2121    }
2122
2123    #[tokio::test]
2124    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_unaggregated_ratio_is_not_met()
2125    -> anyhow::Result<()> {
2126        const TICKET_COUNT: usize = 10;
2127
2128        let prerequisites = AggregationPrerequisites {
2129            min_ticket_count: None,
2130            min_unaggregated_ratio: Some(0.9),
2131        };
2132
2133        let channel = ChannelEntry::new(
2134            BOB.public().to_address(),
2135            ALICE.public().to_address(),
2136            100.into(),
2137            (TICKET_COUNT + 1).into(),
2138            ChannelStatus::Open,
2139            4_u32.into(),
2140        );
2141
2142        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2143            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2144            .collect();
2145
2146        let filtered_tickets =
2147            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2148
2149        assert!(
2150            filtered_tickets.is_empty(),
2151            "must return empty when min_unaggregated_ratio is not met"
2152        );
2153
2154        Ok(())
2155    }
2156
2157    #[tokio::test]
2158    async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met() -> anyhow::Result<()> {
2159        const TICKET_COUNT: usize = 10;
2160
2161        let prerequisites = AggregationPrerequisites {
2162            min_ticket_count: Some(TICKET_COUNT),
2163            min_unaggregated_ratio: None,
2164        };
2165
2166        let channel = ChannelEntry::new(
2167            BOB.public().to_address(),
2168            ALICE.public().to_address(),
2169            100.into(),
2170            (TICKET_COUNT + 1).into(),
2171            ChannelStatus::Open,
2172            4_u32.into(),
2173        );
2174
2175        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2176            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2177            .collect();
2178
2179        let filtered_tickets =
2180            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2181
2182        assert!(!filtered_tickets.is_empty(), "must not return empty");
2183        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2184        Ok(())
2185    }
2186
2187    #[tokio::test]
2188    async fn test_aggregation_prerequisites_must_return_all_when_minimum_ticket_count_is_met_regardless_ratio()
2189    -> anyhow::Result<()> {
2190        const TICKET_COUNT: usize = 10;
2191
2192        let prerequisites = AggregationPrerequisites {
2193            min_ticket_count: Some(TICKET_COUNT),
2194            min_unaggregated_ratio: Some(0.9),
2195        };
2196
2197        let channel = ChannelEntry::new(
2198            BOB.public().to_address(),
2199            ALICE.public().to_address(),
2200            100.into(),
2201            (TICKET_COUNT + 1).into(),
2202            ChannelStatus::Open,
2203            4_u32.into(),
2204        );
2205
2206        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2207            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2208            .collect();
2209
2210        let filtered_tickets =
2211            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2212
2213        assert!(!filtered_tickets.is_empty(), "must not return empty");
2214        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2215        Ok(())
2216    }
2217
2218    #[tokio::test]
2219    async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met()
2220    -> anyhow::Result<()> {
2221        const TICKET_COUNT: usize = 90;
2222
2223        let prerequisites = AggregationPrerequisites {
2224            min_ticket_count: None,
2225            min_unaggregated_ratio: Some(0.9),
2226        };
2227
2228        let channel = ChannelEntry::new(
2229            BOB.public().to_address(),
2230            ALICE.public().to_address(),
2231            100.into(),
2232            (TICKET_COUNT + 1).into(),
2233            ChannelStatus::Open,
2234            4_u32.into(),
2235        );
2236
2237        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2238            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2239            .collect();
2240
2241        let filtered_tickets =
2242            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2243
2244        assert!(!filtered_tickets.is_empty(), "must not return empty");
2245        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2246        Ok(())
2247    }
2248
2249    #[tokio::test]
2250    async fn test_aggregation_prerequisites_must_return_all_when_minimum_unaggregated_ratio_is_met_regardless_count()
2251    -> anyhow::Result<()> {
2252        const TICKET_COUNT: usize = 90;
2253
2254        let prerequisites = AggregationPrerequisites {
2255            min_ticket_count: Some(TICKET_COUNT + 1),
2256            min_unaggregated_ratio: Some(0.9),
2257        };
2258
2259        let channel = ChannelEntry::new(
2260            BOB.public().to_address(),
2261            ALICE.public().to_address(),
2262            100.into(),
2263            (TICKET_COUNT + 1).into(),
2264            ChannelStatus::Open,
2265            4_u32.into(),
2266        );
2267
2268        let dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2269            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2270            .collect();
2271
2272        let filtered_tickets =
2273            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2274
2275        assert!(!filtered_tickets.is_empty(), "must not return empty");
2276        assert_eq!(dummy_tickets, filtered_tickets, "return all tickets");
2277        Ok(())
2278    }
2279
2280    #[tokio::test]
2281    async fn test_aggregation_prerequisites_must_return_tickets_when_minimum_incl_aggregated_ratio_is_met()
2282    -> anyhow::Result<()> {
2283        const TICKET_COUNT: usize = 90;
2284
2285        let prerequisites = AggregationPrerequisites {
2286            min_ticket_count: None,
2287            min_unaggregated_ratio: Some(0.9),
2288        };
2289
2290        let channel = ChannelEntry::new(
2291            BOB.public().to_address(),
2292            ALICE.public().to_address(),
2293            100.into(),
2294            (TICKET_COUNT + 1).into(),
2295            ChannelStatus::Open,
2296            4_u32.into(),
2297        );
2298
2299        let mut dummy_tickets: Vec<ticket::Model> = (0..TICKET_COUNT)
2300            .map(|i| dummy_ticket_model(channel.get_id(), i as u64, 1, 1))
2301            .collect();
2302        dummy_tickets[0].index_offset = 2; // Make this ticket aggregated
2303
2304        let filtered_tickets =
2305            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2306
2307        assert_eq!(filtered_tickets.len(), TICKET_COUNT);
2308        Ok(())
2309    }
2310
2311    #[tokio::test]
2312    async fn test_aggregation_prerequisites_must_return_empty_when_minimum_only_unaggregated_ratio_is_met_in_single_ticket_only()
2313    -> anyhow::Result<()> {
2314        let prerequisites = AggregationPrerequisites {
2315            min_ticket_count: None,
2316            min_unaggregated_ratio: Some(0.9),
2317        };
2318
2319        let channel = ChannelEntry::new(
2320            BOB.public().to_address(),
2321            ALICE.public().to_address(),
2322            100.into(),
2323            2.into(),
2324            ChannelStatus::Open,
2325            4_u32.into(),
2326        );
2327
2328        // Single aggregated ticket exceeding the min_unaggregated_ratio
2329        let dummy_tickets = vec![dummy_ticket_model(channel.get_id(), 1, 2, 110)];
2330
2331        let filtered_tickets =
2332            filter_satisfying_ticket_models(prerequisites, dummy_tickets.clone(), &channel, 0.0005.try_into()?)?;
2333
2334        assert!(filtered_tickets.is_empty(), "must return empty");
2335        Ok(())
2336    }
2337
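    // Sets up ALICE's in-memory DB with the ALICE/BOB key mappings and a BOB -> ALICE channel
    // holding `ticket_count` acknowledged tickets.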
2338    async fn create_alice_db_with_tickets_from_bob(
2339        ticket_count: usize,
2340    ) -> anyhow::Result<(HoprDb, ChannelEntry, Vec<AcknowledgedTicket>)> {
2341        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
2342
2343        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2344            .await?;
2345
2346        add_peer_mappings(
2347            &db,
2348            vec![
2349                (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2350                (BOB_OFFCHAIN.clone(), BOB.clone()),
2351            ],
2352        )
2353        .await?;
2354
2355        let (channel, tickets) = init_db_with_tickets(&db, ticket_count as u64).await?;
2356
2357        Ok((db, channel, tickets))
2358    }
2359
2360    #[tokio::test]
2361    async fn test_ticket_aggregation_should_fail_if_any_ticket_is_being_aggregated_in_that_channel()
2362    -> anyhow::Result<()> {
2363        const COUNT_TICKETS: usize = 5;
2364
2365        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2366
2367        assert_eq!(tickets.len(), COUNT_TICKETS);
2368
2369        let existing_channel_with_multiple_tickets = channel.get_id();
2370
2371        // mark the first ticket as being aggregated
2372        let mut ticket = hopr_db_entity::ticket::Entity::find()
2373            .one(&db.tickets_db)
2374            .await?
2375            .context("should have an active model")?
2376            .into_active_model();
2377        ticket.state = Set(AcknowledgedTicketStatus::BeingAggregated as i8);
2378        ticket.save(&db.tickets_db).await?;
2379
2380        assert!(
2381            db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2382                .await
2383                .is_err()
2384        );
2385
2386        Ok(())
2387    }
2388
2389    #[tokio::test]
2390    async fn test_ticket_aggregation_should_not_offer_tickets_with_lower_than_min_win_prob() -> anyhow::Result<()> {
2391        const COUNT_TICKETS: usize = 5;
2392
2393        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2394
2395        assert_eq!(tickets.len(), COUNT_TICKETS);
2396
2397        let existing_channel_with_multiple_tickets = channel.get_id();
2398
2399        // Decrease the win prob of one ticket
2400        let mut ticket = hopr_db_entity::ticket::Entity::find()
2401            .one(&db.tickets_db)
2402            .await?
2403            .context("should have an active model")?
2404            .into_active_model();
2405        ticket.winning_probability = Set(WinningProbability::try_from_f64(0.5)?.as_ref().to_vec());
2406        ticket.save(&db.tickets_db).await?;
2407
2408        let prepared_tickets = db
2409            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2410            .await?
2411            .ok_or(anyhow!("should contain tickets"))?
2412            .1;
2413
2414        assert_eq!(COUNT_TICKETS - 1, prepared_tickets.len());
2415
2416        Ok(())
2417    }
2418
2419    #[tokio::test]
2420    async fn test_ticket_aggregation_prepare_request_with_0_tickets_should_return_empty_result() -> anyhow::Result<()> {
2421        const COUNT_TICKETS: usize = 0;
2422
2423        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2424
2425        assert_eq!(tickets.len(), COUNT_TICKETS);
2426
2427        let existing_channel_with_0_tickets = channel.get_id();
2428
2429        let actual = db
2430            .prepare_aggregation_in_channel(&existing_channel_with_0_tickets, Default::default())
2431            .await?;
2432
2433        assert_eq!(actual, None);
2434
2435        Ok(())
2436    }
2437
2438    #[tokio::test]
2439    async fn test_ticket_aggregation_prepare_request_with_multiple_tickets_should_return_that_ticket()
2440    -> anyhow::Result<()> {
2441        const COUNT_TICKETS: usize = 2;
2442
2443        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2444        let tickets: Vec<TransferableWinningTicket> = tickets
2445            .into_iter()
2446            .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2447            .collect::<hopr_internal_types::errors::Result<Vec<TransferableWinningTicket>>>()?;
2448
2449        assert_eq!(tickets.len(), COUNT_TICKETS);
2450
2451        let existing_channel_with_multiple_tickets = channel.get_id();
2452
2453        let actual = db
2454            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2455            .await?;
2456
2457        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2458
2459        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2460            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2461            .count(&db.tickets_db)
2462            .await? as usize;
2463
2464        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2465
2466        Ok(())
2467    }
2468
2469    #[tokio::test]
2470    async fn test_ticket_aggregation_prepare_request_with_duplicate_tickets_should_return_dedup_aggregated_ticket()
2471    -> anyhow::Result<()> {
2472        let (db, channel, _) = create_alice_db_with_tickets_from_bob(0).await?;
2473        let tickets = vec![
2474            generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0),
2475            generate_random_ack_ticket(&BOB, &ALICE, 0, 2, 1.0),
2476            generate_random_ack_ticket(&BOB, &ALICE, 2, 1, 1.0),
2477            generate_random_ack_ticket(&BOB, &ALICE, 3, 1, 1.0),
2478        ]
2479        .into_iter()
2480        .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
2481
2482        let tickets_clone = tickets.clone();
2483        let db_clone = db.clone();
2484        db.nest_transaction_in_db(None, TargetDb::Tickets)
2485            .await?
2486            .perform(|tx| {
2487                Box::pin(async move {
2488                    for ticket in tickets_clone {
2489                        db_clone.upsert_ticket(tx.into(), ticket).await?;
2490                    }
2491                    Ok::<_, DbError>(())
2492                })
2493            })
2494            .await?;
2495
2496        let existing_channel_with_multiple_tickets = channel.get_id();
2497        let stats = db.get_ticket_statistics(Some(channel.get_id())).await?;
2498        assert_eq!(stats.neglected_value, HoprBalance::zero());
2499
2500        let actual = db
2501            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2502            .await?;
2503
2504        let mut tickets = tickets
2505            .into_iter()
2506            .map(|t| t.into_transferable(&ALICE, &Hash::default()))
2507            .collect::<hopr_internal_types::errors::Result<Vec<_>>>()?;
2508
2509        // We expect the first ticket to be removed
2510        tickets.remove(0);
2511        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2512
2513        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2514            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2515            .count(&db.tickets_db)
2516            .await? as usize;
2517
2518        assert_eq!(actual_being_aggregated_count, 3);
2519
2520        Ok(())
2521    }
2522
2523    #[tokio::test]
2524    async fn test_ticket_aggregation_prepare_request_with_a_being_redeemed_ticket_should_aggregate_only_the_tickets_following_it()
2525    -> anyhow::Result<()> {
2526        const COUNT_TICKETS: usize = 5;
2527
2528        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2529        let mut tickets = tickets
2530            .into_iter()
2531            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2532            .collect::<Vec<_>>();
2533
2534        assert_eq!(tickets.len(), COUNT_TICKETS);
2535
2536        let existing_channel_with_multiple_tickets = channel.get_id();
2537
2538        // mark the first ticket as being redeemed
2539        let mut ticket = hopr_db_entity::ticket::Entity::find()
2540            .one(&db.tickets_db)
2541            .await?
2542            .context("should have 1 active model")?
2543            .into_active_model();
2544        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2545        ticket.save(&db.tickets_db).await?;
2546
2547        let actual = db
2548            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2549            .await?;
2550
2551        tickets.remove(0);
2552        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2553
2554        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2555            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2556            .count(&db.tickets_db)
2557            .await? as usize;
2558
2559        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2560
2561        Ok(())
2562    }
2563
2564    #[tokio::test]
2565    async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_return_when_ticket_threshold_is_met()
2566    -> anyhow::Result<()> {
2567        const COUNT_TICKETS: usize = 5;
2568
2569        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2570        let tickets = tickets
2571            .into_iter()
2572            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2573            .collect::<Vec<_>>();
2574
2575        assert_eq!(tickets.len(), COUNT_TICKETS);
2576
2577        let existing_channel_with_multiple_tickets = channel.get_id();
2578
2579        let constraints = AggregationPrerequisites {
2580            min_ticket_count: Some(COUNT_TICKETS - 1),
2581            min_unaggregated_ratio: None,
2582        };
2583        let actual = db
2584            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2585            .await?;
2586
2587        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2588
2589        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2590            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2591            .count(&db.tickets_db)
2592            .await? as usize;
2593
2594        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS);
2595
2596        Ok(())
2597    }
2598
2599    #[tokio::test]
2600    async fn test_ticket_aggregation_prepare_request_with_some_requirements_should_not_return_when_ticket_threshold_is_not_met()
2601    -> anyhow::Result<()> {
2602        const COUNT_TICKETS: usize = 2;
2603
2604        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2605
2606        assert_eq!(tickets.len(), COUNT_TICKETS);
2607
2608        let existing_channel_with_multiple_tickets = channel.get_id();
2609
2610        let constraints = AggregationPrerequisites {
2611            min_ticket_count: Some(COUNT_TICKETS + 1),
2612            min_unaggregated_ratio: None,
2613        };
2614        let actual = db
2615            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, constraints)
2616            .await?;
2617
2618        assert_eq!(actual, None);
2619
2620        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2621            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2622            .count(&db.tickets_db)
2623            .await? as usize;
2624
2625        assert_eq!(actual_being_aggregated_count, 0);
2626
2627        Ok(())
2628    }
2629
2630    #[tokio::test]
2631    async fn test_ticket_aggregation_prepare_request_with_no_aggregatable_tickets_should_return_nothing()
2632    -> anyhow::Result<()> {
2633        const COUNT_TICKETS: usize = 3;
2634
2635        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2636
2637        assert_eq!(tickets.len(), COUNT_TICKETS);
2638
2639        let existing_channel_with_multiple_tickets = channel.get_id();
2640
2641        // Mark all tickets as being redeemed
2642        for ticket in hopr_db_entity::ticket::Entity::find()
2643            .all(&db.tickets_db)
2644            .await?
2645            .into_iter()
2646        {
2647            let mut ticket = ticket.into_active_model();
2648            ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2649            ticket.save(&db.tickets_db).await?;
2650        }
2651
2652        let actual = db
2653            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2654            .await?;
2655
2656        assert_eq!(actual, None);
2657
2658        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2659            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2660            .count(&db.tickets_db)
2661            .await? as usize;
2662
2663        assert_eq!(actual_being_aggregated_count, 0);
2664
2665        Ok(())
2666    }
2667
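    // Rolling back an aggregation must return every BeingAggregated ticket to an aggregatable
    // state while leaving tickets in other states (here BeingRedeemed) untouched.
    //
    // Illustrative caller-side sketch (not part of this test suite); `channel_id`,
    // `prerequisites` and `send_to_aggregator` are placeholders:
    //
    //     if let Some((_source, to_aggregate, _prereqs)) =
    //         db.prepare_aggregation_in_channel(&channel_id, prerequisites).await?
    //     {
    //         if send_to_aggregator(to_aggregate).await.is_err() {
    //             db.rollback_aggregation_in_channel(channel_id).await?;
    //         }
    //     }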
2668    #[tokio::test]
2669    async fn test_ticket_aggregation_rollback_should_rollback_all_the_being_aggregated_tickets_but_nothing_else()
2670    -> anyhow::Result<()> {
2671        const COUNT_TICKETS: usize = 5;
2672
2673        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2674
2675        assert_eq!(tickets.len(), COUNT_TICKETS);
2676
2677        let existing_channel_with_multiple_tickets = channel.get_id();
2678
2679        // Mark the first ticket as being redeemed
2680        let mut ticket = hopr_db_entity::ticket::Entity::find()
2681            .one(&db.tickets_db)
2682            .await?
2683            .context("should have one active model")?
2684            .into_active_model();
2685        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
2686        ticket.save(&db.tickets_db).await?;
2687
2688        assert!(
2689            db.prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2690                .await
2691                .is_ok()
2692        );
2693
2694        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2695            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2696            .count(&db.tickets_db)
2697            .await? as usize;
2698
2699        assert_eq!(actual_being_aggregated_count, COUNT_TICKETS - 1);
2700
2701        assert!(
2702            db.rollback_aggregation_in_channel(existing_channel_with_multiple_tickets)
2703                .await
2704                .is_ok()
2705        );
2706
2707        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2708            .filter(hopr_db_entity::ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2709            .count(&db.tickets_db)
2710            .await? as usize;
2711
2712        assert_eq!(actual_being_aggregated_count, 0);
2713
2714        Ok(())
2715    }
2716
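    // A correctly built aggregated ticket (summed amount, index offset spanning all replaced
    // tickets, winning probability 1) must replace the stored tickets and be pushed to the
    // ticket notifier.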
2717    #[tokio::test]
2718    async fn test_ticket_aggregation_should_replace_the_tickets_with_a_correctly_aggregated_ticket()
2719    -> anyhow::Result<()> {
2720        const COUNT_TICKETS: usize = 5;
2721
2722        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2723
2724        let (notifier_tx, notifier_rx) = futures::channel::mpsc::unbounded();
2725        db.start_ticket_processing(Some(notifier_tx))?;
2726
2727        let tickets = tickets
2728            .into_iter()
2729            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2730            .collect::<Vec<_>>();
2731
2732        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2733        let aggregated_ticket = TicketBuilder::default()
2734            .addresses(&*BOB, &*ALICE)
2735            .amount(
2736                tickets
2737                    .iter()
2738                    .fold(U256::zero(), |acc, v| acc + v.ticket.amount.amount()),
2739            )
2740            .index(first_ticket.index)
2741            .index_offset(
2742                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2743            )
2744            .win_prob(1.0.try_into()?)
2745            .channel_epoch(first_ticket.channel_epoch)
2746            .challenge(first_ticket.challenge)
2747            .build_signed(&BOB, &Hash::default())?;
2748
2749        assert_eq!(tickets.len(), COUNT_TICKETS);
2750
2751        let existing_channel_with_multiple_tickets = channel.get_id();
2752
2753        let actual = db
2754            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2755            .await?;
2756
2757        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2758
2759        let agg_ticket = aggregated_ticket.clone();
2760
2761        let _ = db
2762            .process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2763            .await?;
2764
2765        pin_mut!(notifier_rx);
2766        let notified_ticket = notifier_rx.next().await.ok_or(anyhow!("must have ticket"))?;
2767
2768        assert_eq!(notified_ticket.verified_ticket(), agg_ticket.verified_ticket());
2769
2770        let actual_being_aggregated_count = hopr_db_entity::ticket::Entity::find()
2771            .filter(ticket::Column::State.eq(AcknowledgedTicketStatus::BeingAggregated as u8))
2772            .count(&db.tickets_db)
2773            .await? as usize;
2774
2775        assert_eq!(actual_being_aggregated_count, 0);
2776
2777        Ok(())
2778    }
2779
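    // An aggregated ticket whose value is lower than the total value of the tickets it
    // replaces must be rejected.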
2780    #[tokio::test]
2781    async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_value_is_lower_than_the_stored_one()
2782    -> anyhow::Result<()> {
2783        const COUNT_TICKETS: usize = 5;
2784
2785        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2786        let tickets = tickets
2787            .into_iter()
2788            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2789            .collect::<Vec<_>>();
2790
2791        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2792        let aggregated_ticket = TicketBuilder::default()
2793            .addresses(&*BOB, &*ALICE)
2794            .amount(0)
2795            .index(first_ticket.index)
2796            .index_offset(
2797                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2798            )
2799            .win_prob(1.0.try_into()?)
2800            .channel_epoch(first_ticket.channel_epoch)
2801            .challenge(first_ticket.challenge)
2802            .build_signed(&BOB, &Hash::default())?;
2803
2804        assert_eq!(tickets.len(), COUNT_TICKETS);
2805
2806        let existing_channel_with_multiple_tickets = channel.get_id();
2807
2808        let actual = db
2809            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2810            .await?;
2811
2812        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2813
2814        assert!(
2815            db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2816                .await
2817                .is_err()
2818        );
2819
2820        Ok(())
2821    }
2822
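    // An aggregated ticket must carry a winning probability of exactly 1; anything lower
    // must be rejected.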
2823    #[tokio::test]
2824    async fn test_ticket_aggregation_should_fail_if_the_aggregated_ticket_win_probability_is_not_equal_to_1()
2825    -> anyhow::Result<()> {
2826        const COUNT_TICKETS: usize = 5;
2827
2828        let (db, channel, tickets) = create_alice_db_with_tickets_from_bob(COUNT_TICKETS).await?;
2829        let tickets = tickets
2830            .into_iter()
2831            .map(|t| t.into_transferable(&ALICE, &Hash::default()).unwrap())
2832            .collect::<Vec<_>>();
2833
2834        let first_ticket = tickets.first().context("should contain tickets")?.ticket.clone();
2835        let aggregated_ticket = TicketBuilder::default()
2836            .addresses(&*BOB, &*ALICE)
2837            .amount(0)
2838            .index(first_ticket.index)
2839            .index_offset(
2840                tickets.last().context("should contain tickets")?.ticket.index as u32 - first_ticket.index as u32 + 1,
2841            )
2842            .win_prob(0.5.try_into()?) // 50% winning prob
2843            .channel_epoch(first_ticket.channel_epoch)
2844            .challenge(first_ticket.challenge)
2845            .build_signed(&BOB, &Hash::default())?;
2846
2847        assert_eq!(tickets.len(), COUNT_TICKETS);
2848
2849        let existing_channel_with_multiple_tickets = channel.get_id();
2850
2851        let actual = db
2852            .prepare_aggregation_in_channel(&existing_channel_with_multiple_tickets, Default::default())
2853            .await?;
2854
2855        assert_eq!(actual, Some((*BOB_OFFCHAIN.public(), tickets, Default::default())));
2856
2857        assert!(
2858            db.process_received_aggregated_ticket(aggregated_ticket.leak(), &ALICE)
2859                .await
2860                .is_err()
2861        );
2862
2863        Ok(())
2864    }
2865
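    // Creates Bob's in-memory DB with the channel domain separator set, the Alice/Bob key
    // mappings added, and the given channel upserted.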
2866    async fn init_db_with_channel(channel: ChannelEntry) -> anyhow::Result<HoprDb> {
2867        let db = HoprDb::new_in_memory(BOB.clone()).await?;
2868
2869        db.set_domain_separator(None, DomainSeparator::Channel, Default::default())
2870            .await?;
2871
2872        add_peer_mappings(
2873            &db,
2874            vec![
2875                (ALICE_OFFCHAIN.clone(), ALICE.clone()),
2876                (BOB_OFFCHAIN.clone(), BOB.clone()),
2877            ],
2878        )
2879        .await?;
2880
2881        db.upsert_channel(None, channel).await?;
2882
2883        Ok(db)
2884    }
2885
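    // Issuer-side aggregation: the produced ticket must be signed by Bob, span the full index
    // range, carry the summed amount with winning probability 1, and bump the outgoing ticket
    // index past the highest aggregated index.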
2886    #[tokio::test]
2887    async fn test_aggregate_ticket_should_aggregate() -> anyhow::Result<()> {
2888        const COUNT_TICKETS: usize = 5;
2889
2890        let channel = ChannelEntry::new(
2891            BOB.public().to_address(),
2892            ALICE.public().to_address(),
2893            u32::MAX.into(),
2894            (COUNT_TICKETS + 1).into(),
2895            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2896            4_u32.into(),
2897        );
2898
2899        let db = init_db_with_channel(channel).await?;
2900
2901        let tickets = (0..COUNT_TICKETS)
2902            .map(|i| {
2903                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
2904                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2905            })
2906            .collect::<anyhow::Result<Vec<_>>>()?;
2907
2908        let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
2909        let min_idx = tickets
2910            .iter()
2911            .map(|t| t.ticket.index)
2912            .min()
2913            .context("min index should be present")?;
2914        let max_idx = tickets
2915            .iter()
2916            .map(|t| t.ticket.index)
2917            .max()
2918            .context("max index should be present")?;
2919
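        // Tickets carry indices 0..COUNT_TICKETS with an index offset of 1 each, so the aggregate
        // is expected to start at min_idx and cover max_idx - min_idx + 1 = COUNT_TICKETS slots.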
2920        let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
2921
2922        assert_eq!(
2923            &BOB.public().to_address(),
2924            aggregated.verified_issuer(),
2925            "must have correct signer"
2926        );
2927
2928        assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
2929
2930        assert_eq!(
2931            COUNT_TICKETS,
2932            aggregated.verified_ticket().index_offset as usize,
2933            "aggregated ticket must have correct offset"
2934        );
2935        assert_eq!(
2936            sum_value,
2937            aggregated.verified_ticket().amount,
2938            "aggregated ticket token amount must be sum of individual tickets"
2939        );
2940        assert_eq!(
2941            1.0,
2942            aggregated.win_prob(),
2943            "aggregated ticket must have winning probability 1"
2944        );
2945        assert_eq!(
2946            min_idx,
2947            aggregated.verified_ticket().index,
2948            "aggregated ticket must have correct index"
2949        );
2950        assert_eq!(
2951            channel.get_id(),
2952            aggregated.verified_ticket().channel_id,
2953            "aggregated ticket must have correct channel id"
2954        );
2955        assert_eq!(
2956            channel.channel_epoch.as_u32(),
2957            aggregated.verified_ticket().channel_epoch,
2958            "aggregated ticket must have correct channel epoch"
2959        );
2960
2961        assert_eq!(
2962            max_idx + 1,
2963            db.get_outgoing_ticket_index(channel.get_id())
2964                .await?
2965                .load(Ordering::SeqCst)
2966        );
2967
2968        Ok(())
2969    }
2970
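    // Aggregation must also accept an input set that already contains an aggregated ticket
    // (index offset > 1); the resulting offset then spans the union of all covered indices.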
2971    #[tokio::test]
2972    async fn test_aggregate_ticket_should_aggregate_including_aggregated() -> anyhow::Result<()> {
2973        const COUNT_TICKETS: usize = 5;
2974
2975        let channel = ChannelEntry::new(
2976            BOB.public().to_address(),
2977            ALICE.public().to_address(),
2978            u32::MAX.into(),
2979            (COUNT_TICKETS + 1).into(),
2980            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
2981            4_u32.into(),
2982        );
2983
2984        let db = init_db_with_channel(channel).await?;
2985
2986        let offset = 10_usize;
2987
2988        let mut tickets = (1..COUNT_TICKETS)
2989            .map(|i| {
2990                generate_random_ack_ticket(&BOB, &ALICE, (i + offset) as u64, 1, 1.0)
2991                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
2992            })
2993            .collect::<anyhow::Result<Vec<_>>>()?;
2994
2995        // Add an aggregated ticket to the set too
2996        tickets.push(
2997            generate_random_ack_ticket(&BOB, &ALICE, 0, offset as u32, 1.0)
2998                .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?,
2999        );
3000
3001        let sum_value = tickets.iter().fold(HoprBalance::zero(), |acc, x| acc + x.ticket.amount);
3002        let min_idx = tickets
3003            .iter()
3004            .map(|t| t.ticket.index)
3005            .min()
3006            .context("min index should be present")?;
3007        let max_idx = tickets
3008            .iter()
3009            .map(|t| t.ticket.index)
3010            .max()
3011            .context("max index should be present")?;
3012
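        // The single tickets occupy indices offset + 1 ..= offset + COUNT_TICKETS - 1 and the
        // aggregated one covers 0 ..= offset - 1, so the expected index offset of the result is
        // max_idx - min_idx + 1 = COUNT_TICKETS + offset.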
3013        let aggregated = db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets, &BOB).await?;
3014
3015        assert_eq!(
3016            &BOB.public().to_address(),
3017            aggregated.verified_issuer(),
3018            "must have correct signer"
3019        );
3020
3021        assert!(aggregated.verified_ticket().is_aggregated(), "must be aggregated");
3022
3023        assert_eq!(
3024            COUNT_TICKETS + offset,
3025            aggregated.verified_ticket().index_offset as usize,
3026            "aggregated ticket must have correct offset"
3027        );
3028        assert_eq!(
3029            sum_value,
3030            aggregated.verified_ticket().amount,
3031            "aggregated ticket token amount must be sum of individual tickets"
3032        );
3033        assert_eq!(
3034            1.0,
3035            aggregated.win_prob(),
3036            "aggregated ticket must have winning probability 1"
3037        );
3038        assert_eq!(
3039            min_idx,
3040            aggregated.verified_ticket().index,
3041            "aggregated ticket must have correct index"
3042        );
3043        assert_eq!(
3044            channel.get_id(),
3045            aggregated.verified_ticket().channel_id,
3046            "aggregated ticket must have correct channel id"
3047        );
3048        assert_eq!(
3049            channel.channel_epoch.as_u32(),
3050            aggregated.verified_ticket().channel_epoch,
3051            "aggregated ticket must have correct channel epoch"
3052        );
3053
3054        assert_eq!(
3055            max_idx + 1,
3056            db.get_outgoing_ticket_index(channel.get_id())
3057                .await?
3058                .load(Ordering::SeqCst)
3059        );
3060
3061        Ok(())
3062    }
3063
3064    #[tokio::test]
3065    async fn test_aggregate_ticket_should_not_aggregate_zero_tickets() -> anyhow::Result<()> {
3066        let db = HoprDb::new_in_memory(BOB.clone()).await?;
3067
3068        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), vec![], &BOB)
3069            .await
3070            .expect_err("should not aggregate empty ticket list");
3071
3072        Ok(())
3073    }
3074
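    // Aggregating a single ticket is effectively a no-op: the resulting ticket equals the input.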
3075    #[tokio::test]
3076    async fn test_aggregate_ticket_should_aggregate_single_ticket_to_itself() -> anyhow::Result<()> {
3077        const COUNT_TICKETS: usize = 1;
3078
3079        let channel = ChannelEntry::new(
3080            BOB.public().to_address(),
3081            ALICE.public().to_address(),
3082            u32::MAX.into(),
3083            (COUNT_TICKETS + 1).into(),
3084            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(120))),
3085            4_u32.into(),
3086        );
3087
3088        let db = init_db_with_channel(channel).await?;
3089
3090        let mut tickets = (0..COUNT_TICKETS)
3091            .map(|i| {
3092                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3093                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3094            })
3095            .collect::<anyhow::Result<Vec<_>>>()?;
3096
3097        let aggregated = db
3098            .aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3099            .await?;
3100
3101        assert_eq!(
3102            &tickets.pop().context("ticket should be present")?.ticket,
3103            aggregated.verified_ticket()
3104        );
3105
3106        Ok(())
3107    }
3108
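    // Aggregation must be refused on a channel that is already closed.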
3109    #[tokio::test]
3110    async fn test_aggregate_ticket_should_not_aggregate_on_closed_channel() -> anyhow::Result<()> {
3111        const COUNT_TICKETS: usize = 3;
3112
3113        let channel = ChannelEntry::new(
3114            BOB.public().to_address(),
3115            ALICE.public().to_address(),
3116            u32::MAX.into(),
3117            (COUNT_TICKETS + 1).into(),
3118            ChannelStatus::Closed,
3119            4_u32.into(),
3120        );
3121
3122        let db = init_db_with_channel(channel).await?;
3123
3124        let tickets = (0..COUNT_TICKETS)
3125            .map(|i| {
3126                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3127                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3128            })
3129            .collect::<anyhow::Result<Vec<_>>>()?;
3130
3131        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3132            .await
3133            .expect_err("should not aggregate on closed channel");
3134
3135        Ok(())
3136    }
3137
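    // Aggregation is performed by the ticket issuer on its outgoing channel; a channel that is
    // incoming from the aggregating node's perspective must be refused.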
3138    #[tokio::test]
3139    async fn test_aggregate_ticket_should_not_aggregate_on_incoming_channel() -> anyhow::Result<()> {
3140        const COUNT_TICKETS: usize = 3;
3141
3142        let channel = ChannelEntry::new(
3143            ALICE.public().to_address(),
3144            BOB.public().to_address(),
3145            u32::MAX.into(),
3146            (COUNT_TICKETS + 1).into(),
3147            ChannelStatus::Open,
3148            4_u32.into(),
3149        );
3150
3151        let db = init_db_with_channel(channel).await?;
3152
3153        let tickets = (0..COUNT_TICKETS)
3154            .map(|i| {
3155                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3156                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3157            })
3158            .collect::<anyhow::Result<Vec<_>>>()?;
3159
3160        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3161            .await
3162            .expect_err("should not aggregate on incoming channel");
3163
3164        Ok(())
3165    }
3166
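    // All tickets submitted for aggregation must belong to the same channel; a ticket from a
    // different channel must abort the whole aggregation.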
3167    #[tokio::test]
3168    async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_ids() -> anyhow::Result<()> {
3169        const COUNT_TICKETS: usize = 3;
3170
3171        let channel = ChannelEntry::new(
3172            ALICE.public().to_address(),
3173            BOB.public().to_address(),
3174            u32::MAX.into(),
3175            (COUNT_TICKETS + 1).into(),
3176            ChannelStatus::Open,
3177            4_u32.into(),
3178        );
3179
3180        let db = init_db_with_channel(channel).await?;
3181
3182        let mut tickets = (0..COUNT_TICKETS)
3183            .map(|i| {
3184                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3185                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3186            })
3187            .collect::<anyhow::Result<Vec<_>>>()?;
3188
3189        tickets[2] = generate_random_ack_ticket(&BOB, &ChainKeypair::random(), 2, 1, 1.0)
3190            .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3191
3192        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3193            .await
3194            .expect_err("should not aggregate on mismatching channel ids");
3195
3196        Ok(())
3197    }
3198
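    // Tickets whose channel epoch does not match the stored channel entry must abort the
    // aggregation.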
3199    #[tokio::test]
3200    async fn test_aggregate_ticket_should_not_aggregate_if_mismatching_channel_epoch() -> anyhow::Result<()> {
3201        const COUNT_TICKETS: usize = 3;
3202
3203        let channel = ChannelEntry::new(
3204            ALICE.public().to_address(),
3205            BOB.public().to_address(),
3206            100.into(),
3207            (COUNT_TICKETS + 1).into(),
3208            ChannelStatus::Open,
3209            3_u32.into(),
3210        );
3211
3212        let db = init_db_with_channel(channel).await?;
3213
3214        let tickets = (0..COUNT_TICKETS)
3215            .map(|i| {
3216                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3217                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3218            })
3219            .collect::<anyhow::Result<Vec<_>>>()?;
3220
3221        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3222            .await
3223            .expect_err("should not aggregate on mismatching channel epoch");
3224
3225        Ok(())
3226    }
3227
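    // Tickets whose index ranges overlap must abort the aggregation.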
3228    #[tokio::test]
3229    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_indices_overlap() -> anyhow::Result<()> {
3230        const COUNT_TICKETS: usize = 3;
3231
3232        let channel = ChannelEntry::new(
3233            ALICE.public().to_address(),
3234            BOB.public().to_address(),
3235            u32::MAX.into(),
3236            (COUNT_TICKETS + 1).into(),
3237            ChannelStatus::Open,
3238            3_u32.into(),
3239        );
3240
3241        let db = init_db_with_channel(channel).await?;
3242
3243        let mut tickets = (0..COUNT_TICKETS)
3244            .map(|i| {
3245                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3246                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3247            })
3248            .collect::<anyhow::Result<Vec<_>>>()?;
3249
3250        tickets[1] = generate_random_ack_ticket(&BOB, &ALICE, 1, 2, 1.0)
3251            .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))?;
3252
3253        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3254            .await
3255            .expect_err("should not aggregate on overlapping ticket indices");
3256        Ok(())
3257    }
3258
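    // A ticket that fails validation (here: amount modified without re-signing) must abort
    // the aggregation.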
3259    #[tokio::test]
3260    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_valid() -> anyhow::Result<()> {
3261        const COUNT_TICKETS: usize = 3;
3262
3263        let channel = ChannelEntry::new(
3264            ALICE.public().to_address(),
3265            BOB.public().to_address(),
3266            u32::MAX.into(),
3267            (COUNT_TICKETS + 1).into(),
3268            ChannelStatus::Open,
3269            3_u32.into(),
3270        );
3271
3272        let db = init_db_with_channel(channel).await?;
3273
3274        let mut tickets = (0..COUNT_TICKETS)
3275            .map(|i| {
3276                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3277                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3278            })
3279            .collect::<anyhow::Result<Vec<_>>>()?;
3280
3281        // Modify the ticket and do not sign it
3282        tickets[1].ticket.amount = (TICKET_VALUE - 10).into();
3283
3284        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3285            .await
3286            .expect_err("should not aggregate on invalid tickets");
3287
3288        Ok(())
3289    }
3290
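    // A ticket whose winning probability is below the required minimum must abort the
    // aggregation.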
3291    #[tokio::test]
3292    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_has_lower_than_min_win_prob() -> anyhow::Result<()> {
3293        const COUNT_TICKETS: usize = 3;
3294
3295        let channel = ChannelEntry::new(
3296            ALICE.public().to_address(),
3297            BOB.public().to_address(),
3298            u32::MAX.into(),
3299            (COUNT_TICKETS + 1).into(),
3300            ChannelStatus::Open,
3301            3_u32.into(),
3302        );
3303
3304        let db = init_db_with_channel(channel).await?;
3305
3306        let tickets = (0..COUNT_TICKETS)
3307            .map(|i| {
3308                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, if i > 0 { 1.0 } else { 0.9 })
3309                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3310            })
3311            .collect::<anyhow::Result<Vec<_>>>()?;
3312
3313        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3314            .await
3315            .expect_err("should not aggregate tickets with less than minimum win prob");
3316
3317        Ok(())
3318    }
3319
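    // A ticket that cannot be a win (winning probability 0) must abort the aggregation.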
3320    #[tokio::test]
3321    async fn test_aggregate_ticket_should_not_aggregate_if_ticket_is_not_winning() -> anyhow::Result<()> {
3322        const COUNT_TICKETS: usize = 3;
3323
3324        let channel = ChannelEntry::new(
3325            ALICE.public().to_address(),
3326            BOB.public().to_address(),
3327            u32::MAX.into(),
3328            (COUNT_TICKETS + 1).into(),
3329            ChannelStatus::Open,
3330            3_u32.into(),
3331        );
3332
3333        let db = init_db_with_channel(channel).await?;
3334
3335        let mut tickets = (0..COUNT_TICKETS)
3336            .map(|i| {
3337                generate_random_ack_ticket(&BOB, &ALICE, i as u64, 1, 1.0)
3338                    .and_then(|v| Ok(v.into_transferable(&ALICE, &Hash::default())?))
3339            })
3340            .collect::<anyhow::Result<Vec<_>>>()?;
3341
3342        // Set the winning probability to zero and sign the ticket again
3343        let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
3344        tickets[1] = TicketBuilder::from(tickets[1].ticket.clone())
3345            .win_prob(0.0.try_into()?)
3346            .challenge(resp.to_challenge().into())
3347            .build_signed(&BOB, &Hash::default())?
3348            .into_acknowledged(resp)
3349            .into_transferable(&ALICE, &Hash::default())?;
3350
3351        db.aggregate_tickets(*ALICE_OFFCHAIN.public(), tickets.clone(), &BOB)
3352            .await
3353            .expect_err("should not aggregate non-winning tickets");
3354
3355        Ok(())
3356    }
3357
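    // Marking a ticket as redeemed bumps the redeemed value in the ticket statistics;
    // resetting the statistics must bring it back to zero.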
3358    #[tokio::test]
3359    async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
3360        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3361
3362        let ticket = init_db_with_tickets(&db, 1).await?.1.pop().context("should contain a ticket")?;
3363
3364        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
3365            .await
3366            .expect("must not fail");
3367
3368        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3369        assert_ne!(stats.redeemed_value, HoprBalance::zero());
3370
3371        db.reset_ticket_statistics().await.expect("must not fail");
3372
3373        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
3374        assert_eq!(stats.redeemed_value, HoprBalance::zero());
3375
3376        Ok(())
3377    }
3378
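    // A ticket at the channel's next ticket index that is stuck in the BeingRedeemed state
    // must be reset to Untouched by fix_channels_next_ticket_state.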
3379    #[tokio::test]
3380    async fn test_fix_channels_ticket_state() -> anyhow::Result<()> {
3381        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3382        const COUNT_TICKETS: u64 = 1;
3383
3384        let (..) = init_db_with_tickets(&db, COUNT_TICKETS).await?;
3385
3386        // Mark the first ticket as being redeemed
3387        let mut ticket = hopr_db_entity::ticket::Entity::find()
3388            .one(&db.tickets_db)
3389            .await?
3390            .context("should have one active model")?
3391            .into_active_model();
3392        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3393        ticket.save(&db.tickets_db).await?;
3394
3395        assert_eq!(
3396            hopr_db_entity::ticket::Entity::find()
3397                .one(&db.tickets_db)
3398                .await?
3399                .context("should have one active model")?
3400                .state,
3401            AcknowledgedTicketStatus::BeingRedeemed as i8
3402        );
3403
3404        db.fix_channels_next_ticket_state().await.expect("must not fail");
3405
3406        assert_eq!(
3407            hopr_db_entity::ticket::Entity::find()
3408                .one(&db.tickets_db)
3409                .await?
3410                .context("should have one active model")?
3411                .state,
3412            AcknowledgedTicketStatus::Untouched as i8
3413        );
3414
3415        Ok(())
3416    }
3417
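    // A BeingRedeemed ticket that lies below the channel's current ticket index must not be
    // reset by the fix.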
3418    #[tokio::test]
3419    async fn test_dont_fix_correct_channels_ticket_state() -> anyhow::Result<()> {
3420        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
3421        const COUNT_TICKETS: u64 = 2;
3422
3423        // Set up the channel with ticket index 1 and ensure that the fix does not trigger for this ticket
3424        let (..) = init_db_with_tickets_and_channel(&db, COUNT_TICKETS, Some(1u32)).await?;
3425
3426        // Mark the first ticket as being redeemed
3427        let mut ticket = hopr_db_entity::ticket::Entity::find()
3428            .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3429            .one(&db.tickets_db)
3430            .await?
3431            .context("should have one active model")?
3432            .into_active_model();
3433        ticket.state = Set(AcknowledgedTicketStatus::BeingRedeemed as i8);
3434        ticket.save(&db.tickets_db).await?;
3435
3436        assert_eq!(
3437            hopr_db_entity::ticket::Entity::find()
3438                .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3439                .one(&db.tickets_db)
3440                .await?
3441                .context("should have one active model")?
3442                .state,
3443            AcknowledgedTicketStatus::BeingRedeemed as i8
3444        );
3445
3446        db.fix_channels_next_ticket_state().await.expect("must not fail");
3447
3448        // first ticket should still be in BeingRedeemed state
3449        let ticket0 = hopr_db_entity::ticket::Entity::find()
3450            .filter(ticket::Column::Index.eq(0u64.to_be_bytes().to_vec()))
3451            .one(&db.tickets_db)
3452            .await?
3453            .context("should have one active model")?;
3454        assert_eq!(ticket0.state, AcknowledgedTicketStatus::BeingRedeemed as i8);
3455
3456        // second ticket should be in Untouched state
3457        let ticket1 = hopr_db_entity::ticket::Entity::find()
3458            .filter(ticket::Column::Index.eq(1u64.to_be_bytes().to_vec()))
3459            .one(&db.tickets_db)
3460            .await?
3461            .context("should have one active model")?;
3462        assert_eq!(ticket1.state, AcknowledgedTicketStatus::Untouched as i8);
3463
3464        Ok(())
3465    }
3466}