hopr_db_node/
tickets.rs

1use std::{collections::HashMap, ops::Bound};
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{StreamExt, TryStreamExt, stream::BoxStream};
6use hopr_api::db::*;
7use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
8use hopr_internal_types::prelude::*;
9use hopr_primitive_types::prelude::*;
10use sea_orm::{
11    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set,
12    TransactionTrait,
13};
14use sea_query::{Condition, Expr, ExprTrait, IntoCondition, Order};
15use tracing::{debug, error, info, trace};
16
17use crate::{db::HoprNodeDb, errors::NodeDbError};
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21    pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
22        "hopr_tickets_incoming_statistics",
23        "Ticket statistics for channels with incoming tickets.",
24        &["statistic"]
25    ).unwrap();
26}
27
28/// The type is necessary solely to allow
29/// implementing the [`IntoCondition`] trait for the [`TicketSelector`]
30/// from the `hopr_api` crate.
31#[derive(Clone)]
32pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
33
34impl From<TicketSelector> for WrappedTicketSelector {
35    fn from(selector: TicketSelector) -> Self {
36        Self(selector)
37    }
38}
39
40impl AsRef<TicketSelector> for WrappedTicketSelector {
41    fn as_ref(&self) -> &TicketSelector {
42        &self.0
43    }
44}
45
46impl IntoCondition for WrappedTicketSelector {
47    fn into_condition(self) -> Condition {
48        let (channel_id, epoch) = self.0.channel_identifier;
49
50        let mut expr = ticket::Column::ChannelId
51            .eq(hex::encode(channel_id))
52            .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()));
53
54        match self.0.index {
55            TicketIndexSelector::None => {
56                // This will always be the case if there were multiple channel identifiers
57            }
58            TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
59            TicketIndexSelector::Multiple(idxs) => {
60                expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
61            }
62            TicketIndexSelector::Range((lb, ub)) => {
63                expr = match lb {
64                    Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
65                    Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
66                    Bound::Unbounded => expr,
67                };
68                expr = match ub {
69                    Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
70                    Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
71                    Bound::Unbounded => expr,
72                };
73            }
74        }
75
76        if let Some(state) = self.0.state {
77            expr = expr.and(ticket::Column::State.eq(state as u8))
78        }
79
80        // Win prob lower bound
81        expr = match self.0.win_prob.0 {
82            Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
83            Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
84            Bound::Unbounded => expr,
85        };
86
87        // Win prob upper bound
88        expr = match self.0.win_prob.1 {
89            Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
90            Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
91            Bound::Unbounded => expr,
92        };
93
94        // Amount lower bound
95        expr = match self.0.amount.0 {
96            Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
97            Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
98            Bound::Unbounded => expr,
99        };
100
101        // Amount upper bound
102        expr = match self.0.amount.1 {
103            Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
104            Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
105            Bound::Unbounded => expr,
106        };
107
108        expr.into_condition()
109    }
110}
111
112/// Returns a condition satisfied if any of the given selectors is satisfied.
113pub(crate) fn any_selector<I: IntoIterator<Item = S>, S: Into<TicketSelector>>(selectors: I) -> Condition {
114    selectors
115        .into_iter()
116        .map(|s| WrappedTicketSelector(s.into()).into_condition())
117        .reduce(|a, b| a.or(b).into_condition())
118        .unwrap_or(Condition::all())
119}
120
121pub(crate) async fn find_stats_for_channel(
122    tx: &sea_orm::DatabaseTransaction,
123    channel_id: &ChannelId,
124) -> Result<ticket_statistics::Model, NodeDbError> {
125    if let Some(model) = ticket_statistics::Entity::find()
126        .filter(ticket_statistics::Column::ChannelId.eq(hex::encode(channel_id)))
127        .one(tx)
128        .await?
129    {
130        Ok(model)
131    } else {
132        let new_stats = ticket_statistics::ActiveModel {
133            channel_id: Set(hex::encode(channel_id)),
134            ..Default::default()
135        }
136        .insert(tx)
137        .await?;
138
139        Ok(new_stats)
140    }
141}
142
143pub(crate) async fn get_tickets_value_int(
144    tx: &impl TransactionTrait,
145    selector: TicketSelector,
146) -> Result<(usize, HoprBalance), NodeDbError> {
147    let selector: WrappedTicketSelector = selector.into();
148    Ok(tx
149        .transaction(|tx| {
150            Box::pin(async move {
151                ticket::Entity::find()
152                    .filter(selector)
153                    .stream(tx)
154                    .await?
155                    .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
156                        Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
157                    })
158                    .await
159            })
160        })
161        .await?)
162}
163
164#[async_trait]
165impl HoprDbTicketOperations for HoprNodeDb {
166    type Error = NodeDbError;
167
168    async fn stream_tickets<'c, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
169        &'c self,
170        selectors: I,
171    ) -> Result<BoxStream<'c, RedeemableTicket>, Self::Error> {
172        let qry = ticket::Entity::find().filter(any_selector(selectors));
173
174        Ok(qry
175            .order_by(ticket::Column::ChannelId, Order::Asc)
176            .order_by(ticket::Column::ChannelEpoch, Order::Asc)
177            .order_by(ticket::Column::Index, Order::Asc)
178            .stream(&self.tickets_db)
179            .await?
180            .and_then(|model| {
181                futures::future::ready(
182                    RedeemableTicket::try_from(model).map_err(|e| sea_orm::DbErr::Custom(e.to_string())),
183                )
184            })
185            .filter_map(|ticket| {
186                futures::future::ready(ticket.inspect_err(|error| error!(%error, "invalid ticket in db")).ok())
187            })
188            .boxed())
189    }
190
191    async fn insert_ticket(&self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
192        let _lock = self.tickets_write_lock.lock().await;
193        let unrealized_value = self.unrealized_value.clone();
194        self.tickets_db
195            .transaction(|tx| {
196                Box::pin(async move {
197                    // Insertion of a new acknowledged ticket
198                    ticket::ActiveModel::from(ticket).insert(tx).await?;
199
200                    // Update the ticket winning count in the statistics
201                    let model = find_stats_for_channel(tx, ticket.ticket.channel_id()).await?;
202
203                    let winning_tickets = model.winning_tickets + 1;
204                    let mut active_model = model.into_active_model();
205                    active_model.winning_tickets = sea_orm::Set(winning_tickets);
206                    active_model.save(tx).await?;
207
208                    #[cfg(all(feature = "prometheus", not(test)))]
209                    {
210                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
211                            &["unredeemed"],
212                            get_tickets_value_int(tx, TicketSelector::from(&ticket).only_channel())
213                                .await?
214                                .1
215                                .amount()
216                                .as_u128() as f64,
217                        );
218                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.increment(&["winning_count"], 1.0f64);
219                    }
220
221                    // Increase the unrealized value in the corresponding channel
222                    unrealized_value
223                        .entry((*ticket.ticket.channel_id(), ticket.verified_ticket().channel_epoch))
224                        .and_compute_with(|value| match value {
225                            Some(value) => futures::future::ready(moka::ops::compute::Op::Put(
226                                value.into_value() + ticket.verified_ticket().amount,
227                            )),
228                            None => {
229                                futures::future::ready(moka::ops::compute::Op::Put(ticket.verified_ticket().amount))
230                            }
231                        })
232                        .await;
233
234                    Ok::<_, NodeDbError>(())
235                })
236            })
237            .await?;
238
239        Ok(())
240    }
241
242    async fn mark_tickets_as<S: Into<TicketSelector> + Send, I: IntoIterator<Item = S> + Send>(
243        &self,
244        selectors: I,
245        mark_as: TicketMarker,
246    ) -> Result<usize, Self::Error> {
247        let selectors = selectors.into_iter().map(|s| s.into()).collect::<Vec<_>>();
248        let unrealized_value = self.unrealized_value.clone();
249        let _lock = self.tickets_write_lock.lock().await;
250
251        let (total_count, marked_values) = self
252            .tickets_db
253            .transaction(|tx| {
254                Box::pin(async move {
255                    let mut total_marked_count = 0;
256                    let mut marked_values = Vec::new();
257                    for channel_selector in selectors {
258                        // Get the number of tickets and their value just for this channel
259                        let (marked_count, marked_value) =
260                            get_tickets_value_int(tx, channel_selector.clone()).await?;
261                        trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");
262
263                        if marked_count > 0 {
264                            // Delete the redeemed tickets first
265                            let deleted = ticket::Entity::delete_many()
266                                .filter(WrappedTicketSelector::from(channel_selector.clone()))
267                                .exec(tx)
268                                .await?;
269
270                            // Update the stats if successful
271                            if deleted.rows_affected == marked_count as u64 {
272                                let mut new_stats = find_stats_for_channel(tx, &channel_selector.channel_identifier.0)
273                                    .await?
274                                    .into_active_model();
275
276                                let _current_value = match mark_as {
277                                    TicketMarker::Redeemed => {
278                                        let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
279                                        new_stats.redeemed_value =
280                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
281                                        current_value
282                                    }
283                                    TicketMarker::Rejected => {
284                                        let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
285                                        new_stats.rejected_value =
286                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
287                                        current_value
288                                    }
289                                    TicketMarker::Neglected => {
290                                        let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
291                                        new_stats.neglected_value =
292                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
293                                        current_value
294                                    }
295                                };
296                                new_stats.save(tx).await?;
297
298                                #[cfg(all(feature = "prometheus", not(test)))]
299                                {
300                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
301                                        &[&mark_as.to_string()],
302                                        (_current_value + marked_value.amount()).as_u128() as f64,
303                                    );
304
305                                    // Tickets that are counted as rejected were never counted as unredeemed,
306                                    // so skip the metric subtraction in that case.
307                                    if mark_as != TicketMarker::Rejected {
308                                        let unredeemed_value = get_tickets_value_int(tx, channel_selector.clone())
309                                            .await
310                                            .map(|(_, value)| value)
311                                            .unwrap_or_default();
312
313                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
314                                            &["unredeemed"],
315                                            (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
316                                        );
317                                    }
318                                }
319                            } else {
320                                return Err(NodeDbError::LogicalError(format!(
321                                    "could not mark {marked_count} ticket as {mark_as}"
322                                )));
323                            }
324
325                            trace!(marked_count, channel_id = ?channel_selector.channel_identifier.0, ?mark_as, "removed tickets in channel");
326                            total_marked_count += marked_count;
327                            marked_values.push((channel_selector.channel_identifier.0, channel_selector.channel_identifier.1, marked_value));
328                        }
329                    }
330
331                    info!(count = total_marked_count, ?mark_as, "removed tickets in channels",);
332                    Ok((total_marked_count, marked_values))
333                })
334            })
335            .await?;
336
337        // Decrease the unrealized value in each channel.
338        // This must happen no matter if tickets have been neglected, rejected or redeemed.
339        for (channel_id, epoch, removed_amount) in marked_values {
340            unrealized_value
341                .entry((channel_id, epoch))
342                .and_compute_with(|value| match value {
343                    Some(value) => {
344                        futures::future::ready(moka::ops::compute::Op::Put(value.into_value() - removed_amount))
345                    }
346                    None => futures::future::ready(moka::ops::compute::Op::Nop),
347                })
348                .await;
349        }
350
351        Ok(total_count)
352    }
353
354    async fn mark_unsaved_ticket_rejected(&self, issuer: &Address, ticket: &Ticket) -> Result<(), NodeDbError> {
355        let channel_id = generate_channel_id(issuer, &ticket.counterparty);
356        let amount = ticket.amount;
357        let _lock = self.tickets_write_lock.lock().await;
358
359        Ok(self
360            .tickets_db
361            .transaction(|tx| {
362                Box::pin(async move {
363                    let stats = find_stats_for_channel(tx, &channel_id).await?;
364
365                    let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
366
367                    let mut active_stats = stats.into_active_model();
368                    active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
369                    active_stats.save(tx).await?;
370
371                    #[cfg(all(feature = "prometheus", not(test)))]
372                    {
373                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
374                            &["rejected"],
375                            (current_rejected_value + amount.amount()).as_u128() as f64,
376                        );
377                    }
378
379                    Ok::<(), NodeDbError>(())
380                })
381            })
382            .await?)
383    }
384
385    async fn update_ticket_states_and_fetch<'a, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
386        &'a self,
387        selectors: I,
388        new_state: AcknowledgedTicketStatus,
389    ) -> Result<BoxStream<'a, RedeemableTicket>, Self::Error> {
390        let selector = any_selector(selectors);
391        Ok(Box::pin(stream! {
392            // The stream holds the write lock until it is consumed
393            let _lock = self.tickets_write_lock.lock().await;
394
395            match ticket::Entity::find()
396                .filter(selector)
397                .order_by(ticket::Column::ChannelId, Order::Asc)
398                .order_by(ticket::Column::ChannelEpoch, Order::Asc)
399                .order_by(ticket::Column::Index, Order::Asc)
400                .stream(&self.tickets_db)
401                .await {
402                Ok(mut stream) => {
403                    while let Ok(Some(ticket)) = stream.try_next().await {
404                        let active_ticket = ticket::ActiveModel {
405                            id: Set(ticket.id),
406                            state: Set(new_state as i8),
407                            ..Default::default()
408                        };
409
410                        {
411                            if let Err(error) = active_ticket.update(&self.tickets_db).await {
412                                error!(%error, "failed to update ticket in the db");
413                            }
414                        }
415
416                        match RedeemableTicket::try_from(ticket) {
417                            Ok(ticket) => {
418                                yield ticket
419                            },
420                            Err(error) => {
421                                tracing::error!(%error, "failed to decode ticket from the db");
422                            }
423                        }
424                    }
425                },
426                Err(error) => tracing::error!(%error, "failed open ticket db stream")
427            }
428        }))
429    }
430
431    async fn update_ticket_states<S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
432        &self,
433        selectors: I,
434        new_state: AcknowledgedTicketStatus,
435    ) -> Result<usize, Self::Error> {
436        let selector = any_selector(selectors);
437        let _lock = self.tickets_write_lock.lock().await;
438        Ok(self
439            .tickets_db
440            .transaction(|tx| {
441                Box::pin(async move {
442                    ticket::Entity::update_many()
443                        .filter(selector)
444                        .col_expr(ticket::Column::State, Expr::value(new_state as i8))
445                        .exec(tx)
446                        .await
447                        .map(|update| update.rows_affected as usize)
448                })
449            })
450            .await?)
451    }
452
453    async fn get_ticket_statistics(
454        &self,
455        channel_id: Option<ChannelId>,
456    ) -> Result<ChannelTicketStatistics, NodeDbError> {
457        let res = match channel_id {
458            None => {
459                self.tickets_db
460                    .transaction(|tx| {
461                        Box::pin(async move {
462                            let unredeemed_value = ticket::Entity::find()
463                                .stream(tx)
464                                .await?
465                                .try_fold(U256::zero(), |amount, x| {
466                                    let unredeemed_value = U256::from_be_bytes(x.amount);
467                                    futures::future::ok(amount + unredeemed_value)
468                                })
469                                .await?;
470
471                            #[cfg(all(feature = "prometheus", not(test)))]
472                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS
473                                .set(&["unredeemed"], unredeemed_value.as_u128() as f64);
474
475                            let mut all_stats = ticket_statistics::Entity::find().all(tx).await?.into_iter().fold(
476                                ChannelTicketStatistics::default(),
477                                |mut acc, stats| {
478                                    let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
479                                    acc.finalized_values
480                                        .entry(TicketMarker::Neglected)
481                                        .and_modify(|b| *b += neglected_value)
482                                        .or_insert(neglected_value);
483                                    let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
484                                    acc.finalized_values
485                                        .entry(TicketMarker::Redeemed)
486                                        .and_modify(|b| *b += redeemed_value)
487                                        .or_insert(redeemed_value);
488                                    let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
489                                    acc.finalized_values
490                                        .entry(TicketMarker::Rejected)
491                                        .and_modify(|b| *b += rejected_value)
492                                        .or_insert(rejected_value);
493                                    acc.winning_tickets += stats.winning_tickets as u128;
494                                    acc
495                                },
496                            );
497
498                            all_stats.unredeemed_value = unredeemed_value.into();
499
500                            #[cfg(all(feature = "prometheus", not(test)))]
501                            {
502                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
503                                    &["neglected"],
504                                    all_stats
505                                        .finalized_values
506                                        .get(&TicketMarker::Neglected)
507                                        .copied()
508                                        .unwrap_or_default()
509                                        .amount()
510                                        .as_u128() as f64,
511                                );
512                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
513                                    &["redeemed"],
514                                    all_stats
515                                        .finalized_values
516                                        .get(&TicketMarker::Redeemed)
517                                        .copied()
518                                        .unwrap_or_default()
519                                        .amount()
520                                        .as_u128() as f64,
521                                );
522                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
523                                    &["rejected"],
524                                    all_stats
525                                        .finalized_values
526                                        .get(&TicketMarker::Rejected)
527                                        .copied()
528                                        .unwrap_or_default()
529                                        .amount()
530                                        .as_u128() as f64,
531                                );
532                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
533                                    .set(&["winning_tickets"], all_stats.winning_tickets as f64);
534                            }
535                            Ok::<_, NodeDbError>(all_stats)
536                        })
537                    })
538                    .await
539            }
540            Some(channel) => {
541                let _lock = self.tickets_write_lock.lock().await;
542
543                self.tickets_db
544                    .transaction(|tx| {
545                        Box::pin(async move {
546                            let stats = find_stats_for_channel(tx, &channel).await?;
547                            let unredeemed_value = ticket::Entity::find()
548                                .filter(ticket::Column::ChannelId.eq(hex::encode(channel)))
549                                .stream(tx)
550                                .await?
551                                .try_fold(U256::zero(), |amount, x| {
552                                    futures::future::ok(amount + U256::from_be_bytes(x.amount))
553                                })
554                                .await?;
555
556                            Ok::<_, NodeDbError>(ChannelTicketStatistics {
557                                winning_tickets: stats.winning_tickets as u128,
558                                unredeemed_value: unredeemed_value.into(),
559                                finalized_values: HashMap::from_iter([
560                                    (
561                                        TicketMarker::Neglected,
562                                        HoprBalance::from_be_bytes(stats.neglected_value),
563                                    ),
564                                    (TicketMarker::Redeemed, HoprBalance::from_be_bytes(stats.redeemed_value)),
565                                    (TicketMarker::Rejected, HoprBalance::from_be_bytes(stats.rejected_value)),
566                                ]),
567                            })
568                        })
569                    })
570                    .await
571            }
572        };
573        debug!(stats = ?res, "retrieved ticket statistics");
574        Ok(res?)
575    }
576
577    async fn reset_ticket_statistics(&self) -> Result<(), NodeDbError> {
578        let _lock = self.tickets_write_lock.lock().await;
579
580        Ok(self
581            .tickets_db
582            .transaction(|tx| {
583                Box::pin(async move {
584                    // delete statistics for the found rows
585                    let deleted = ticket_statistics::Entity::delete_many().exec(tx).await?;
586
587                    #[cfg(all(feature = "prometheus", not(test)))]
588                    {
589                        if deleted.rows_affected > 0 {
590                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["neglected"], 0.0_f64);
591                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["redeemed"], 0.0_f64);
592                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["rejected"], 0.0_f64);
593                        }
594                    }
595
596                    debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
597
598                    Ok::<_, sea_orm::DbErr>(())
599                })
600            })
601            .await?)
602    }
603
604    async fn get_tickets_value(&self, id: &ChannelId, epoch: u32) -> Result<HoprBalance, NodeDbError> {
605        Ok(self
606            .unrealized_value
607            .try_get_with((*id, epoch), async {
608                get_tickets_value_int(&self.tickets_db, TicketSelector::new(*id, epoch))
609                    .await
610                    .map(|(_, value)| value)
611            })
612            .await?)
613    }
614
615    async fn get_or_create_outgoing_ticket_index(
616        &self,
617        channel_id: &ChannelId,
618        epoch: u32,
619    ) -> Result<Option<u64>, Self::Error> {
620        let _lock = self.tickets_write_lock.lock().await;
621        let channel_id = hex::encode(channel_id);
622
623        Ok(self
624            .tickets_db
625            .transaction(|tx| {
626                Box::pin(async move {
627                    // Delete all older epochs
628                    outgoing_ticket_index::Entity::delete_many()
629                        .filter(
630                            outgoing_ticket_index::Column::ChannelId
631                                .eq(&channel_id)
632                                .and(outgoing_ticket_index::Column::Epoch.lt(epoch)),
633                        )
634                        .exec(tx)
635                        .await?;
636
637                    // Check if there are no newer epochs for this channel
638                    let newer_epoch = outgoing_ticket_index::Entity::find()
639                        .filter(
640                            outgoing_ticket_index::Column::ChannelId
641                                .eq(&channel_id)
642                                .and(outgoing_ticket_index::Column::Epoch.gt(epoch)),
643                        )
644                        .limit(1)
645                        .one(tx)
646                        .await?;
647
648                    if let Some(newer_epoch) = newer_epoch {
649                        return Err(NodeDbError::LogicalError(format!(
650                            "attempted to get or insert outgoing index for older epoch {epoch} < {} in channel \
651                             {channel_id}",
652                            newer_epoch.epoch
653                        )));
654                    }
655
656                    // Look if this epoch already has an existing index
657                    let maybe_index = outgoing_ticket_index::Entity::find()
658                        .filter(
659                            outgoing_ticket_index::Column::ChannelId
660                                .eq(&channel_id)
661                                .and(outgoing_ticket_index::Column::Epoch.eq(epoch)),
662                        )
663                        .order_by_asc(outgoing_ticket_index::Column::Epoch)
664                        .one(tx)
665                        .await?;
666
667                    Ok(match maybe_index {
668                        Some(model) => Some(u64::from_be_bytes(model.index.try_into().map_err(|_| {
669                            NodeDbError::LogicalError(format!(
670                                "could not convert outgoing ticket index to u64 for channel {channel_id}"
671                            ))
672                        })?)),
673                        None => {
674                            outgoing_ticket_index::ActiveModel {
675                                channel_id: Set(channel_id),
676                                epoch: Set(epoch as i32),
677                                ..Default::default()
678                            }
679                            .insert(tx)
680                            .await?;
681
682                            None
683                        }
684                    })
685                })
686            })
687            .await?)
688    }
689
690    async fn update_outgoing_ticket_index(
691        &self,
692        channel_id: &ChannelId,
693        epoch: u32,
694        index: u64,
695    ) -> Result<(), Self::Error> {
696        let _lock = self.tickets_write_lock.lock().await;
697        let channel_id = hex::encode(channel_id);
698        Ok(self
699            .tickets_db
700            .transaction(|tx| {
701                Box::pin(async move {
702                    let maybe_index = outgoing_ticket_index::Entity::find()
703                        .filter(
704                            outgoing_ticket_index::Column::ChannelId
705                                .eq(&channel_id)
706                                .and(outgoing_ticket_index::Column::Epoch.eq(epoch)),
707                        )
708                        .one(tx)
709                        .await?;
710
711                    if let Some(model) = maybe_index {
712                        let current_index = U256::from_be_bytes(model.index.as_slice()).as_u64();
713                        if current_index > index {
714                            return Err(NodeDbError::LogicalError(format!(
715                                "cannot set outgoing ticket index to {index} for channel {channel_id} - index is \
716                                 already {current_index}"
717                            )));
718                        }
719
720                        // Save us the writing, if the indices are equal.
721                        if current_index != index {
722                            let mut active_model = model.into_active_model();
723                            active_model.index = Set(index.to_be_bytes().to_vec());
724                            active_model.save(tx).await?;
725
726                            tracing::debug!(%channel_id, %epoch, %index, "updated outgoing ticket index");
727                        }
728                    } else {
729                        tracing::debug!("ignoring attempt to update outgoing ticket index for non-existing entry");
730                    }
731
732                    Ok::<_, NodeDbError>(())
733                })
734            })
735            .await?)
736    }
737
738    async fn remove_outgoing_ticket_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error> {
739        let _lock = self.tickets_write_lock.lock().await;
740        let res = outgoing_ticket_index::Entity::delete_many()
741            .filter(
742                outgoing_ticket_index::Column::ChannelId
743                    .eq(hex::encode(channel_id))
744                    .and(outgoing_ticket_index::Column::Epoch.eq(epoch)),
745            )
746            .exec(&self.tickets_db)
747            .await?;
748
749        if res.rows_affected > 0 {
750            tracing::debug!(%channel_id, %epoch, "removed outgoing ticket index");
751        } else {
752            tracing::warn!(%channel_id, %epoch, "outgoing ticket index not found");
753        }
754
755        Ok(())
756    }
757}
758
759#[cfg(test)]
760impl HoprNodeDb {
761    pub(crate) async fn upsert_ticket(&self, acknowledged_ticket: RedeemableTicket) -> Result<(), NodeDbError> {
762        let _lock = self.tickets_write_lock.lock().await;
763
764        self.tickets_db
765            .transaction(|tx| {
766                Box::pin(async move {
767                    // For upserting, we must select only by the triplet (channel id, epoch, index)
768                    let selector = WrappedTicketSelector::from(TicketSelector::from(&acknowledged_ticket));
769
770                    debug!(%acknowledged_ticket, "upserting ticket");
771                    let mut model = ticket::ActiveModel::from(acknowledged_ticket);
772
773                    if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx).await? {
774                        model.id = Set(ticket.id);
775                    }
776
777                    model.save(tx).await
778                })
779            })
780            .await?;
781        Ok(())
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use anyhow::Context;
788    use futures::StreamExt;
789    use hex_literal::hex;
790    use hopr_api::db::{ChannelTicketStatistics, TicketMarker};
791    use hopr_crypto_random::Randomizable;
792    use hopr_crypto_types::prelude::*;
793    use hopr_internal_types::prelude::*;
794    use hopr_primitive_types::prelude::*;
795
796    use crate::{
797        db::HoprNodeDb,
798        tickets::{HoprDbTicketOperations, TicketSelector},
799    };
800
801    lazy_static::lazy_static! {
802        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
803        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
804        static ref CHANNEL_ID: Hash = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
805    }
806
807    lazy_static::lazy_static! {
808        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
809        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
810    }
811
812    const TICKET_VALUE: u64 = 100_000;
813    const CHANNEL_EPOCH: u32 = 4;
814
815    fn generate_random_ack_ticket(
816        src: &ChainKeypair,
817        dst: &ChainKeypair,
818        index: u64,
819        _index_offset: u32,
820        win_prob: f64,
821    ) -> anyhow::Result<RedeemableTicket> {
822        let hk1 = HalfKey::random();
823        let hk2 = HalfKey::random();
824        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
825
826        Ok(TicketBuilder::default()
827            .counterparty(dst)
828            .amount(TICKET_VALUE)
829            .index(index)
830            .win_prob(win_prob.try_into()?)
831            .channel_epoch(CHANNEL_EPOCH)
832            .challenge(challenge)
833            .build_signed(src, &Hash::default())?
834            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
835            .into_redeemable(dst, &Hash::default())?)
836    }
837
838    async fn init_db_with_tickets(db: &HoprNodeDb, count_tickets: u64) -> anyhow::Result<Vec<RedeemableTicket>> {
839        let tickets: Vec<RedeemableTicket> = (0..count_tickets)
840            .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
841            .collect::<anyhow::Result<Vec<RedeemableTicket>>>()?;
842
843        for t in &tickets {
844            db.upsert_ticket(t.clone()).await?;
845        }
846
847        Ok(tickets)
848    }
849
850    #[test_log::test(tokio::test)]
851    async fn test_insert_get_ticket() -> anyhow::Result<()> {
852        let db = HoprNodeDb::new_in_memory().await?;
853
854        let mut tickets = init_db_with_tickets(&db, 1).await?;
855        let ack_ticket = tickets.pop().context("ticket should be present")?;
856
857        assert_eq!(*CHANNEL_ID, *ack_ticket.ticket.channel_id(), "channel ids must match");
858        assert_eq!(
859            CHANNEL_EPOCH,
860            ack_ticket.verified_ticket().channel_epoch,
861            "epochs must match"
862        );
863
864        let db_ticket = db
865            .stream_tickets([&ack_ticket])
866            .await?
867            .collect::<Vec<_>>()
868            .await
869            .first()
870            .cloned()
871            .context("ticket should exist 1")?;
872
873        assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
874
875        let db_ticket = db
876            .stream_tickets(None::<TicketSelector>)
877            .await?
878            .collect::<Vec<_>>()
879            .await
880            .first()
881            .cloned()
882            .context("ticket should exist 2")?;
883
884        assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
885
886        Ok(())
887    }
888
889    #[tokio::test]
890    async fn test_mark_redeemed() -> anyhow::Result<()> {
891        let db = HoprNodeDb::new_in_memory().await?;
892        const COUNT_TICKETS: u64 = 10;
893
894        let tickets = init_db_with_tickets(&db, COUNT_TICKETS).await?;
895
896        let stats = db.get_ticket_statistics(None).await?;
897        assert_eq!(
898            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
899            stats.unredeemed_value,
900            "unredeemed balance must match"
901        );
902        assert_eq!(
903            HoprBalance::zero(),
904            stats.redeemed_value(),
905            "there must be 0 redeemed value"
906        );
907
908        assert_eq!(
909            stats,
910            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
911            "per channel stats must be same"
912        );
913
914        const TO_REDEEM: u64 = 2;
915        for ticket in tickets.iter().take(TO_REDEEM as usize) {
916            let r = db.mark_tickets_as([ticket], TicketMarker::Redeemed).await?;
917            assert_eq!(1, r, "must redeem only a single ticket");
918        }
919
920        let stats = db.get_ticket_statistics(None).await?;
921        assert_eq!(
922            HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
923            stats.unredeemed_value,
924            "unredeemed balance must match"
925        );
926        assert_eq!(
927            HoprBalance::from(TICKET_VALUE * TO_REDEEM),
928            stats.redeemed_value(),
929            "there must be a redeemed value"
930        );
931
932        assert_eq!(
933            stats,
934            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
935            "per channel stats must be same"
936        );
937
938        Ok(())
939    }
940
941    #[tokio::test]
942    async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
943        let db = HoprNodeDb::new_in_memory().await?;
944
945        let ticket = init_db_with_tickets(&db, 1)
946            .await?
947            .pop()
948            .context("should contain a ticket")?;
949
950        db.mark_tickets_as([&ticket], TicketMarker::Redeemed).await?;
951        assert_eq!(0, db.mark_tickets_as([&ticket], TicketMarker::Redeemed).await?);
952
953        Ok(())
954    }
955
956    #[tokio::test]
957    async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
958        let db = HoprNodeDb::new_in_memory().await?;
959
960        let count_tickets = 10;
961        init_db_with_tickets(&db, count_tickets).await?;
962
963        let count_marked = db
964            .mark_tickets_as(
965                [TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH)],
966                TicketMarker::Redeemed,
967            )
968            .await?;
969        assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
970
971        Ok(())
972    }
973
974    #[tokio::test]
975    async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
976        let db = HoprNodeDb::new_in_memory().await?;
977        const COUNT_TICKETS: u64 = 10;
978
979        init_db_with_tickets(&db, COUNT_TICKETS).await?;
980
981        let stats = db.get_ticket_statistics(None).await?;
982        assert_eq!(
983            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
984            stats.unredeemed_value,
985            "unredeemed balance must match"
986        );
987        assert_eq!(
988            HoprBalance::zero(),
989            stats.neglected_value(),
990            "there must be 0 redeemed value"
991        );
992
993        assert_eq!(
994            stats,
995            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
996            "per channel stats must be same"
997        );
998
999        db.mark_tickets_as(
1000            [TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH)],
1001            TicketMarker::Neglected,
1002        )
1003        .await?;
1004
1005        let stats = db.get_ticket_statistics(None).await?;
1006        assert_eq!(
1007            HoprBalance::zero(),
1008            stats.unredeemed_value,
1009            "unredeemed balance must be zero"
1010        );
1011        assert_eq!(
1012            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1013            stats.neglected_value(),
1014            "there must be a neglected value"
1015        );
1016
1017        assert_eq!(
1018            stats,
1019            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1020            "per channel stats must be same"
1021        );
1022
1023        Ok(())
1024    }
1025
1026    #[test_log::test(tokio::test)]
1027    async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1028        let db = HoprNodeDb::new_in_memory().await?;
1029
1030        let mut ticket = init_db_with_tickets(&db, 1).await?;
1031        let ticket = ticket.pop().context("ticket should be present")?.ticket;
1032
1033        let stats = db.get_ticket_statistics(None).await?;
1034        assert_eq!(HoprBalance::zero(), stats.rejected_value());
1035        assert_eq!(
1036            stats,
1037            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1038            "per channel stats must be same"
1039        );
1040
1041        tracing::debug!("marking ticket as rejected");
1042        db.mark_unsaved_ticket_rejected(BOB.public().as_ref(), ticket.verified_ticket())
1043            .await?;
1044
1045        let stats = db.get_ticket_statistics(None).await?;
1046        assert_eq!(ticket.verified_ticket().amount, stats.rejected_value());
1047        assert_eq!(
1048            stats,
1049            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1050            "per channel stats must be same"
1051        );
1052
1053        Ok(())
1054    }
1055
1056    #[tokio::test]
1057    async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1058        let db = HoprNodeDb::new_in_memory().await?;
1059
1060        init_db_with_tickets(&db, 10).await?;
1061
1062        let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_index(5);
1063
1064        let v: Vec<RedeemableTicket> = db
1065            .update_ticket_states_and_fetch([selector], AcknowledgedTicketStatus::BeingRedeemed)
1066            .await?
1067            .collect()
1068            .await;
1069
1070        assert_eq!(1, v.len(), "single ticket must be updated");
1071
1072        let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
1073
1074        let v: Vec<RedeemableTicket> = db
1075            .update_ticket_states_and_fetch([selector], AcknowledgedTicketStatus::BeingRedeemed)
1076            .await?
1077            .collect()
1078            .await;
1079
1080        assert_eq!(9, v.len(), "only specific tickets must have state set");
1081        assert!(
1082            v.iter().all(|t| t.verified_ticket().index != 5),
1083            "only tickets with different state must update"
1084        );
1085
1086        Ok(())
1087    }
1088
1089    #[tokio::test]
1090    async fn test_update_tickets_states() -> anyhow::Result<()> {
1091        let db = HoprNodeDb::new_in_memory().await?;
1092
1093        init_db_with_tickets(&db, 10).await?;
1094        let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
1095
1096        db.update_ticket_states([selector.clone()], AcknowledgedTicketStatus::BeingRedeemed)
1097            .await?;
1098
1099        let v: Vec<RedeemableTicket> = db
1100            .update_ticket_states_and_fetch([selector], AcknowledgedTicketStatus::BeingRedeemed)
1101            .await?
1102            .collect()
1103            .await;
1104
1105        assert!(v.is_empty(), "must not update if already updated");
1106
1107        Ok(())
1108    }
1109
1110    #[tokio::test]
1111    async fn test_outgoing_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1112        let db = HoprNodeDb::new_in_memory().await?;
1113
1114        let hash = Hash::default();
1115
1116        let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1117        assert_eq!(None, idx, "initial index must be None");
1118
1119        let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1120        assert_eq!(Some(0), idx, "index must be zero");
1121
1122        Ok(())
1123    }
1124
1125    #[test_log::test(tokio::test)]
1126    async fn test_outgoing_ticket_index_should_not_allow_old_epochs() -> anyhow::Result<()> {
1127        let db = HoprNodeDb::new_in_memory().await?;
1128
1129        let hash = Hash::default();
1130
1131        db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1132        db.get_or_create_outgoing_ticket_index(&hash, 2).await?;
1133
1134        assert!(db.get_or_create_outgoing_ticket_index(&hash, 1).await.is_err());
1135
1136        Ok(())
1137    }
1138
1139    #[tokio::test]
1140    async fn test_outgoing_ticket_index_should_be_updated_if_already_present() -> anyhow::Result<()> {
1141        let db = HoprNodeDb::new_in_memory().await?;
1142
1143        let hash = Hash::default();
1144
1145        db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1146        db.update_outgoing_ticket_index(&hash, 1, 2).await?;
1147
1148        assert_eq!(Some(2), db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1149
1150        Ok(())
1151    }
1152
1153    #[tokio::test]
1154    async fn test_outgoing_ticket_index_should_not_be_updated_if_not_present() -> anyhow::Result<()> {
1155        let db = HoprNodeDb::new_in_memory().await?;
1156
1157        let hash = Hash::default();
1158
1159        db.update_outgoing_ticket_index(&hash, 1, 2).await?;
1160
1161        assert_eq!(None, db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1162
1163        Ok(())
1164    }
1165
1166    #[tokio::test]
1167    async fn test_outgoing_ticket_index_should_not_be_updated_if_lower() -> anyhow::Result<()> {
1168        let db = HoprNodeDb::new_in_memory().await?;
1169
1170        let hash = Hash::default();
1171
1172        db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1173        db.update_outgoing_ticket_index(&hash, 1, 2).await?;
1174
1175        assert_eq!(Some(2), db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1176
1177        assert!(db.update_outgoing_ticket_index(&hash, 1, 1).await.is_err());
1178
1179        assert_eq!(Some(2), db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1180
1181        Ok(())
1182    }
1183
1184    #[tokio::test]
1185    async fn test_outgoing_ticket_index_should_be_removed() -> anyhow::Result<()> {
1186        let db = HoprNodeDb::new_in_memory().await?;
1187
1188        let hash = Hash::default();
1189
1190        let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1191        assert!(idx.is_none());
1192
1193        let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1194        assert_eq!(Some(0), idx);
1195
1196        db.remove_outgoing_ticket_index(&hash, 1).await?;
1197
1198        let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1199        assert!(idx.is_none());
1200
1201        Ok(())
1202    }
1203
1204    #[test]
1205    fn test_ticket_stats_default_must_be_zero() -> anyhow::Result<()> {
1206        let stats = ChannelTicketStatistics::default();
1207        assert_eq!(stats.unredeemed_value, HoprBalance::zero());
1208        assert_eq!(stats.redeemed_value(), HoprBalance::zero());
1209        assert_eq!(stats.neglected_value(), HoprBalance::zero());
1210        assert_eq!(stats.rejected_value(), HoprBalance::zero());
1211        assert_eq!(stats.winning_tickets, 0);
1212
1213        Ok(())
1214    }
1215
1216    #[tokio::test]
1217    async fn test_ticket_stats_must_be_zero_for_non_existing_channel() -> anyhow::Result<()> {
1218        let db = HoprNodeDb::new_in_memory().await?;
1219
1220        let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1221
1222        assert_eq!(stats, ChannelTicketStatistics::default());
1223
1224        Ok(())
1225    }
1226
1227    #[tokio::test]
1228    async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1229        let db = HoprNodeDb::new_in_memory().await?;
1230
1231        let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1232
1233        assert_eq!(
1234            ChannelTicketStatistics::default(),
1235            stats,
1236            "must be equal to default which is all zeros"
1237        );
1238
1239        assert_eq!(
1240            stats,
1241            db.get_ticket_statistics(None).await?,
1242            "per-channel stats must be the same as global stats"
1243        );
1244
1245        Ok(())
1246    }
1247
1248    #[tokio::test]
1249    async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1250        let db = HoprNodeDb::new_in_memory().await?;
1251
1252        let channel_1 = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
1253        let channel_2 = generate_channel_id(ALICE.public().as_ref(), BOB.public().as_ref());
1254
1255        let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1256        let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1257
1258        let value = t1.verified_ticket().amount;
1259
1260        db.upsert_ticket(t1).await?;
1261        db.upsert_ticket(t2).await?;
1262
1263        let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1264
1265        let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1266
1267        assert_eq!(value, stats_1.unredeemed_value);
1268        assert_eq!(value, stats_2.unredeemed_value);
1269
1270        assert_eq!(HoprBalance::zero(), stats_1.neglected_value());
1271        assert_eq!(HoprBalance::zero(), stats_2.neglected_value());
1272
1273        assert_eq!(stats_1, stats_2);
1274
1275        db.mark_tickets_as([TicketSelector::new(channel_1, CHANNEL_EPOCH)], TicketMarker::Neglected)
1276            .await?;
1277
1278        let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1279
1280        let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1281
1282        assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1283        assert_eq!(value, stats_1.neglected_value());
1284
1285        assert_eq!(HoprBalance::zero(), stats_2.neglected_value());
1286
1287        Ok(())
1288    }
1289
1290    #[tokio::test]
1291    async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
1292        let db = HoprNodeDb::new_in_memory().await?;
1293
1294        let ticket = init_db_with_tickets(&db, 1).await?.pop().unwrap();
1295
1296        db.mark_tickets_as([&ticket], TicketMarker::Redeemed)
1297            .await
1298            .expect("must not fail");
1299
1300        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1301        assert_ne!(stats.redeemed_value(), HoprBalance::zero());
1302
1303        db.reset_ticket_statistics().await.expect("must not fail");
1304
1305        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1306        assert_eq!(stats.redeemed_value(), HoprBalance::zero());
1307
1308        Ok(())
1309    }
1310}