hopr_db_node/
tickets.rs

1use std::{
2    ops::Bound,
3    sync::{
4        Arc,
5        atomic::{AtomicU64, Ordering},
6    },
7};
8
9use async_stream::stream;
10use async_trait::async_trait;
11use futures::{StreamExt, TryStreamExt, stream::BoxStream};
12use hopr_api::db::*;
13use hopr_crypto_types::prelude::*;
14use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
15use hopr_internal_types::prelude::*;
16use hopr_primitive_types::prelude::*;
17use sea_orm::{
18    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, Set, TransactionTrait,
19};
20use sea_query::{Condition, Expr, IntoCondition, Order, SimpleExpr};
21use tracing::{debug, error, info, trace};
22
23use crate::{db::HoprNodeDb, errors::NodeDbError};
24
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Multi-gauge carrying the incoming ticket statistics, keyed by the `statistic` label.
    ///
    /// Compiled only when the `prometheus` feature is enabled and the crate is not built for tests.
    pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: hopr_metrics::MultiGauge = {
        let labels = ["statistic"];
        hopr_metrics::MultiGauge::new(
            "hopr_tickets_incoming_statistics",
            "Ticket statistics for channels with incoming tickets.",
            &labels,
        )
        .unwrap()
    };
}
33
34/// The type is necessary solely to allow
35/// implementing the [`IntoCondition`] trait for [`TicketSelector`]
36/// from the `hopr_db_api` crate.
37#[derive(Clone)]
38pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
39
40impl From<TicketSelector> for WrappedTicketSelector {
41    fn from(selector: TicketSelector) -> Self {
42        Self(selector)
43    }
44}
45
46impl AsRef<TicketSelector> for WrappedTicketSelector {
47    fn as_ref(&self) -> &TicketSelector {
48        &self.0
49    }
50}
51
52impl IntoCondition for WrappedTicketSelector {
53    fn into_condition(self) -> Condition {
54        let expr = self
55            .0
56            .channel_identifiers
57            .into_iter()
58            .map(|(channel_id, epoch)| {
59                ticket::Column::ChannelId
60                    .eq(channel_id.to_hex())
61                    .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
62            })
63            .reduce(SimpleExpr::or);
64
65        // This cannot happen, but instead of panicking, return an impossible condition object
66        if expr.is_none() {
67            return Condition::any().not();
68        }
69
70        let mut expr = expr.unwrap();
71
72        match self.0.index {
73            TicketIndexSelector::None => {
74                // This will always be the case if there were multiple channel identifiers
75            }
76            TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
77            TicketIndexSelector::Multiple(idxs) => {
78                expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
79            }
80            TicketIndexSelector::Range((lb, ub)) => {
81                expr = match lb {
82                    Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
83                    Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
84                    Bound::Unbounded => expr,
85                };
86                expr = match ub {
87                    Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
88                    Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
89                    Bound::Unbounded => expr,
90                };
91            }
92        }
93
94        if let Some(state) = self.0.state {
95            expr = expr.and(ticket::Column::State.eq(state as u8))
96        }
97
98        if self.0.only_aggregated {
99            expr = expr.and(ticket::Column::IndexOffset.gt(1));
100        }
101
102        // Win prob lower bound
103        expr = match self.0.win_prob.0 {
104            Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
105            Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
106            Bound::Unbounded => expr,
107        };
108
109        // Win prob upper bound
110        expr = match self.0.win_prob.1 {
111            Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
112            Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
113            Bound::Unbounded => expr,
114        };
115
116        // Amount lower bound
117        expr = match self.0.amount.0 {
118            Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
119            Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
120            Bound::Unbounded => expr,
121        };
122
123        // Amount upper bound
124        expr = match self.0.amount.1 {
125            Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
126            Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
127            Bound::Unbounded => expr,
128        };
129
130        expr.into_condition()
131    }
132}
133
134pub(crate) async fn find_stats_for_channel(
135    tx: &sea_orm::DatabaseTransaction,
136    channel_id: &Hash,
137) -> Result<ticket_statistics::Model, NodeDbError> {
138    if let Some(model) = ticket_statistics::Entity::find()
139        .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
140        .one(tx)
141        .await?
142    {
143        Ok(model)
144    } else {
145        let new_stats = ticket_statistics::ActiveModel {
146            channel_id: Set(channel_id.to_hex()),
147            ..Default::default()
148        }
149        .insert(tx)
150        .await?;
151
152        Ok(new_stats)
153    }
154}
155
156#[async_trait]
157impl HoprDbTicketOperations for HoprNodeDb {
158    type Error = NodeDbError;
159
160    async fn stream_tickets<'c>(
161        &'c self,
162        selector: Option<TicketSelector>,
163    ) -> Result<BoxStream<'c, AcknowledgedTicket>, Self::Error> {
164        let qry = if let Some(selector) = selector.map(WrappedTicketSelector::from) {
165            ticket::Entity::find().filter(selector)
166        } else {
167            ticket::Entity::find()
168        };
169
170        Ok(qry
171            .order_by(ticket::Column::ChannelId, Order::Asc)
172            .order_by(ticket::Column::ChannelEpoch, Order::Asc)
173            .order_by(ticket::Column::Index, Order::Asc)
174            .stream(&self.tickets_db)
175            .await?
176            .and_then(|model| {
177                futures::future::ready(
178                    AcknowledgedTicket::try_from(model).map_err(|e| sea_orm::DbErr::Custom(e.to_string())),
179                )
180            })
181            .filter_map(|ticket| {
182                futures::future::ready(ticket.inspect_err(|error| error!(%error, "invalid ticket in db")).ok())
183            })
184            .boxed())
185    }
186
187    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize, NodeDbError> {
188        let myself = self.clone();
189        Ok(self
190            .ticket_manager
191            .write_transaction(|tx| {
192                Box::pin(async move {
193                    let mut total_marked_count = 0;
194                    for (channel_id, epoch) in selector.channel_identifiers.iter() {
195                        let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);
196
197                        // Get the number of tickets and their value just for this channel
198                        let (marked_count, marked_value) =
199                            myself.get_tickets_value_int(tx, channel_selector.clone()).await?;
200                        trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");
201
202                        if marked_count > 0 {
203                            // Delete the redeemed tickets first
204                            let deleted = ticket::Entity::delete_many()
205                                .filter(WrappedTicketSelector::from(channel_selector.clone()))
206                                .exec(tx)
207                                .await?;
208
209                            // Update the stats if successful
210                            if deleted.rows_affected == marked_count as u64 {
211                                let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
212                                let _current_value = match mark_as {
213                                    TicketMarker::Redeemed => {
214                                        let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
215                                        new_stats.redeemed_value =
216                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
217                                        current_value
218                                    }
219                                    TicketMarker::Rejected => {
220                                        let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
221                                        new_stats.rejected_value =
222                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
223                                        current_value
224                                    }
225                                    TicketMarker::Neglected => {
226                                        let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
227                                        new_stats.neglected_value =
228                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
229                                        current_value
230                                    }
231                                };
232                                new_stats.save(tx).await?;
233
234                                #[cfg(all(feature = "prometheus", not(test)))]
235                                {
236                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
237                                        &[&mark_as.to_string()],
238                                        (_current_value + marked_value.amount()).as_u128() as f64,
239                                    );
240
241                                    // Tickets that are counted as rejected were never counted as unredeemed,
242                                    // so skip the metric subtraction in that case.
243                                    if mark_as != TicketMarker::Rejected {
244                                        let unredeemed_value = myself
245                                            .caches
246                                            .unrealized_value
247                                            .get(&(*channel_id, *epoch))
248                                            .await
249                                            .unwrap_or_default();
250
251                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
252                                            &["unredeemed"],
253                                            (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
254                                        );
255                                    }
256                                }
257
258                                myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
259                            } else {
260                                return Err(NodeDbError::LogicalError(format!(
261                                    "could not mark {marked_count} ticket as {mark_as}"
262                                )));
263                            }
264
265                            trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
266                            total_marked_count += marked_count;
267                        }
268                    }
269
270                    info!(
271                        count = total_marked_count,
272                        ?mark_as,
273                        channel_count = selector.channel_identifiers.len(),
274                        "removed tickets in channels",
275                    );
276                    Ok(total_marked_count)
277                })
278            })
279            .await?)
280    }
281
282    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<(), NodeDbError> {
283        let channel_id = ticket.channel_id;
284        let amount = ticket.amount;
285        Ok(self
286            .ticket_manager
287            .write_transaction(|tx| {
288                Box::pin(async move {
289                    let stats = find_stats_for_channel(tx, &channel_id).await?;
290
291                    let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
292
293                    let mut active_stats = stats.into_active_model();
294                    active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
295                    active_stats.save(tx).await?;
296
297                    #[cfg(all(feature = "prometheus", not(test)))]
298                    {
299                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
300                            &["rejected"],
301                            (current_rejected_value + amount.amount()).as_u128() as f64,
302                        );
303                    }
304
305                    Ok::<(), NodeDbError>(())
306                })
307            })
308            .await?)
309    }
310
311    async fn update_ticket_states_and_fetch<'a>(
312        &'a self,
313        selector: TicketSelector,
314        new_state: AcknowledgedTicketStatus,
315    ) -> Result<BoxStream<'a, AcknowledgedTicket>, NodeDbError> {
316        let selector: WrappedTicketSelector = selector.into();
317        Ok(Box::pin(stream! {
318            match ticket::Entity::find()
319                .filter(selector)
320                .order_by(ticket::Column::ChannelId, Order::Asc)
321                .order_by(ticket::Column::ChannelEpoch, Order::Asc)
322                .order_by(ticket::Column::Index, Order::Asc)
323                .stream(&self.tickets_db)
324                .await {
325                Ok(mut stream) => {
326                    while let Ok(Some(ticket)) = stream.try_next().await {
327                        let active_ticket = ticket::ActiveModel {
328                            id: Set(ticket.id),
329                            state: Set(new_state as i8),
330                            ..Default::default()
331                        };
332
333                        {
334                            let _g = self.ticket_manager.mutex.lock().await;
335                            if let Err(error) = active_ticket.update(&self.tickets_db).await {
336                                error!(%error, "failed to update ticket in the db");
337                            }
338                        }
339
340                        match AcknowledgedTicket::try_from(ticket) {
341                            Ok(mut ticket) => {
342                                // Update the state manually, since we do not want to re-fetch the model after the update
343                                ticket.status = new_state;
344                                yield ticket
345                            },
346                            Err(error) => {
347                                tracing::error!(%error, "failed to decode ticket from the db");
348                            }
349                        }
350                    }
351                },
352                Err(error) => tracing::error!(%error, "failed open ticket db stream")
353            }
354        }))
355    }
356
357    async fn update_ticket_states(
358        &self,
359        selector: TicketSelector,
360        new_state: AcknowledgedTicketStatus,
361    ) -> Result<usize, NodeDbError> {
362        let selector: WrappedTicketSelector = selector.into();
363        Ok(self
364            .ticket_manager
365            .write_transaction(|tx| {
366                Box::pin(async move {
367                    ticket::Entity::update_many()
368                        .filter(selector)
369                        .col_expr(ticket::Column::State, Expr::value(new_state as i8))
370                        .exec(tx)
371                        .await
372                        .map(|update| update.rows_affected as usize)
373                })
374            })
375            .await?)
376    }
377
378    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics, NodeDbError> {
379        let res = match channel_id {
380            None => {
381                self.tickets_db
382                    .transaction(|tx| {
383                        Box::pin(async move {
384                            let unredeemed_value = ticket::Entity::find()
385                                .stream(tx)
386                                .await?
387                                .try_fold(U256::zero(), |amount, x| {
388                                    let unredeemed_value = U256::from_be_bytes(x.amount);
389                                    futures::future::ok(amount + unredeemed_value)
390                                })
391                                .await?;
392
393                            #[cfg(all(feature = "prometheus", not(test)))]
394                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS
395                                .set(&["unredeemed"], unredeemed_value.as_u128() as f64);
396
397                            let mut all_stats = ticket_statistics::Entity::find().all(tx).await?.into_iter().fold(
398                                ChannelTicketStatistics::default(),
399                                |mut acc, stats| {
400                                    let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
401                                    acc.neglected_value += neglected_value;
402                                    let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
403                                    acc.redeemed_value += redeemed_value;
404                                    let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
405                                    acc.rejected_value += rejected_value;
406                                    acc.winning_tickets += stats.winning_tickets as u128;
407                                    acc
408                                },
409                            );
410
411                            all_stats.unredeemed_value = unredeemed_value.into();
412
413                            #[cfg(all(feature = "prometheus", not(test)))]
414                            {
415                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
416                                    .set(&["neglected"], all_stats.neglected_value.amount().as_u128() as f64);
417                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
418                                    .set(&["redeemed"], all_stats.redeemed_value.amount().as_u128() as f64);
419                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
420                                    .set(&["rejected"], all_stats.rejected_value.amount().as_u128() as f64);
421                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
422                                    .set(&["winning_tickets"], all_stats.winning_tickets as f64);
423                            }
424                            Ok::<_, NodeDbError>(all_stats)
425                        })
426                    })
427                    .await
428            }
429            Some(channel) => {
430                self.tickets_db
431                    .transaction(|tx| {
432                        Box::pin(async move {
433                            let stats = find_stats_for_channel(tx, &channel).await?;
434                            let unredeemed_value = ticket::Entity::find()
435                                .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
436                                .stream(tx)
437                                .await?
438                                .try_fold(U256::zero(), |amount, x| {
439                                    futures::future::ok(amount + U256::from_be_bytes(x.amount))
440                                })
441                                .await?;
442
443                            Ok::<_, NodeDbError>(ChannelTicketStatistics {
444                                winning_tickets: stats.winning_tickets as u128,
445                                neglected_value: HoprBalance::from_be_bytes(stats.neglected_value),
446                                redeemed_value: HoprBalance::from_be_bytes(stats.redeemed_value),
447                                unredeemed_value: unredeemed_value.into(),
448                                rejected_value: HoprBalance::from_be_bytes(stats.rejected_value),
449                            })
450                        })
451                    })
452                    .await
453            }
454        };
455        debug!(stats = ?res, "retrieved ticket statistics");
456        Ok(res?)
457    }
458
459    async fn reset_ticket_statistics(&self) -> Result<(), NodeDbError> {
460        Ok(self
461            .tickets_db
462            .transaction(|tx| {
463                Box::pin(async move {
464                    // delete statistics for the found rows
465                    let deleted = ticket_statistics::Entity::delete_many().exec(tx).await?;
466
467                    #[cfg(all(feature = "prometheus", not(test)))]
468                    {
469                        if deleted.rows_affected > 0 {
470                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["neglected"], 0.0_f64);
471                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["redeemed"], 0.0_f64);
472                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["rejected"], 0.0_f64);
473                        }
474                    }
475
476                    debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
477
478                    Ok::<_, sea_orm::DbErr>(())
479                })
480            })
481            .await?)
482    }
483
484    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance), NodeDbError> {
485        self.get_tickets_value_int(&self.tickets_db, selector).await
486    }
487
488    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64, NodeDbError> {
489        let old_value = self
490            .get_outgoing_ticket_index(channel_id)
491            .await?
492            .fetch_max(index, Ordering::SeqCst);
493
494        // TODO: should we hint the persisting mechanism to trigger the flush?
495        // if old_value < index {}
496
497        Ok(old_value)
498    }
499
500    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, NodeDbError> {
501        let old_value = self
502            .get_outgoing_ticket_index(channel_id)
503            .await?
504            .swap(0, Ordering::SeqCst);
505
506        // TODO: should we hint the persisting mechanism to trigger the flush?
507        // if old_value > 0 { }
508
509        Ok(old_value)
510    }
511
512    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, NodeDbError> {
513        let old_value = self
514            .get_outgoing_ticket_index(channel_id)
515            .await?
516            .fetch_add(1, Ordering::SeqCst);
517
518        // TODO: should we hint the persisting mechanism to trigger the flush?
519
520        Ok(old_value)
521    }
522
523    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>, NodeDbError> {
524        let tkt_manager = self.ticket_manager.clone();
525
526        Ok(self
527            .caches
528            .ticket_index
529            .try_get_with(channel_id, async move {
530                let maybe_index = outgoing_ticket_index::Entity::find()
531                    .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
532                    .one(&tkt_manager.tickets_db)
533                    .await?;
534
535                Ok(Arc::new(AtomicU64::new(match maybe_index {
536                    Some(model) => U256::from_be_bytes(model.index).as_u64(),
537                    None => {
538                        tkt_manager
539                            .write_transaction(|tx| {
540                                Box::pin(async move {
541                                    outgoing_ticket_index::ActiveModel {
542                                        channel_id: Set(channel_id.to_hex()),
543                                        ..Default::default()
544                                    }
545                                    .insert(tx)
546                                    .await?;
547                                    Ok::<_, sea_orm::DbErr>(())
548                                })
549                            })
550                            .await?;
551                        0_u64
552                    }
553                })))
554            })
555            .await
556            .map_err(|e: Arc<NodeDbError>| {
557                NodeDbError::LogicalError(format!("failed to retrieve ticket index: {e}"))
558            })?)
559    }
560
561    async fn persist_outgoing_ticket_indices(&self) -> Result<usize, NodeDbError> {
562        let outgoing_indices = outgoing_ticket_index::Entity::find().all(&self.tickets_db).await?;
563
564        let mut updated = 0;
565        for index_model in outgoing_indices {
566            let channel_id = Hash::from_hex(&index_model.channel_id).map_err(NodeDbError::from)?;
567            let db_index = U256::from_be_bytes(&index_model.index).as_u64();
568            if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
569                // Note that the persisted value is always lagging behind the cache,
570                // so the fact that the cached index can change between this load
571                // storing it in the DB is allowed.
572                let cached_index = cached_index.load(Ordering::SeqCst);
573
574                // Store the ticket index in a separate write transaction
575                if cached_index > db_index {
576                    let mut index_active_model = index_model.into_active_model();
577                    index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
578                    self.ticket_manager
579                        .write_transaction(|tx| {
580                            Box::pin(async move {
581                                index_active_model.save(tx).await?;
582                                Ok::<_, sea_orm::DbErr>(())
583                            })
584                        })
585                        .await?;
586
587                    debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
588                    updated += 1;
589                }
590            } else {
591                // The value is not yet in the cache, meaning there's low traffic on this
592                // channel, so the value has not been yet fetched.
593                trace!(?channel_id, "channel not in cache yet");
594            }
595        }
596
597        Ok(Ok::<_, NodeDbError>(updated)?)
598    }
599}
600
601impl HoprNodeDb {
602    /// Used only by non-SQLite code and tests.
603    pub async fn upsert_ticket(&self, acknowledged_ticket: AcknowledgedTicket) -> Result<(), NodeDbError> {
604        self.tickets_db
605            .transaction(|tx| {
606                Box::pin(async move {
607                    // For upserting, we must select only by the triplet (channel id, epoch, index)
608                    let selector = WrappedTicketSelector::from(
609                        TicketSelector::new(
610                            acknowledged_ticket.verified_ticket().channel_id,
611                            acknowledged_ticket.verified_ticket().channel_epoch,
612                        )
613                        .with_index(acknowledged_ticket.verified_ticket().index),
614                    );
615
616                    debug!("upserting ticket {acknowledged_ticket}");
617                    let mut model = ticket::ActiveModel::from(acknowledged_ticket);
618
619                    if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx).await? {
620                        model.id = Set(ticket.id);
621                    }
622
623                    model.save(tx).await
624                })
625            })
626            .await?;
627        Ok(())
628    }
629
630    async fn get_tickets_value_int(
631        &self,
632        tx: &impl TransactionTrait,
633        selector: TicketSelector,
634    ) -> Result<(usize, HoprBalance), NodeDbError> {
635        let selector: WrappedTicketSelector = selector.into();
636        Ok(tx
637            .transaction(|tx| {
638                Box::pin(async move {
639                    ticket::Entity::find()
640                        .filter(selector)
641                        .stream(tx)
642                        .await?
643                        .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
644                            Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
645                        })
646                        .await
647                })
648            })
649            .await?)
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use std::sync::atomic::Ordering;
656
657    use anyhow::Context;
658    use futures::StreamExt;
659    use hex_literal::hex;
660    use hopr_api::db::{ChannelTicketStatistics, TicketMarker};
661    use hopr_crypto_random::Randomizable;
662    use hopr_crypto_types::prelude::*;
663    use hopr_internal_types::prelude::*;
664    use hopr_primitive_types::prelude::*;
665    use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
666
667    use crate::{
668        db::HoprNodeDb,
669        tickets::{HoprDbTicketOperations, TicketSelector},
670    };
671
672    lazy_static::lazy_static! {
673        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
674        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
675        static ref CHANNEL_ID: Hash = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
676    }
677
678    lazy_static::lazy_static! {
679        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
680        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
681    }
682
683    const TICKET_VALUE: u64 = 100_000;
684    const CHANNEL_EPOCH: u32 = 4;
685
686    fn generate_random_ack_ticket(
687        src: &ChainKeypair,
688        dst: &ChainKeypair,
689        index: u64,
690        index_offset: u32,
691        win_prob: f64,
692    ) -> anyhow::Result<AcknowledgedTicket> {
693        let hk1 = HalfKey::random();
694        let hk2 = HalfKey::random();
695        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
696
697        Ok(TicketBuilder::default()
698            .addresses(src, dst)
699            .amount(TICKET_VALUE)
700            .index(index)
701            .index_offset(index_offset)
702            .win_prob(win_prob.try_into()?)
703            .channel_epoch(CHANNEL_EPOCH)
704            .challenge(challenge)
705            .build_signed(src, &Hash::default())?
706            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
707    }
708
709    async fn init_db_with_tickets(db: &HoprNodeDb, count_tickets: u64) -> anyhow::Result<Vec<AcknowledgedTicket>> {
710        let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
711            .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
712            .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
713
714        for t in &tickets {
715            db.upsert_ticket(t.clone()).await?;
716        }
717
718        Ok(tickets)
719    }
720
721    #[tokio::test]
722    async fn test_insert_get_ticket() -> anyhow::Result<()> {
723        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
724
725        let mut tickets = init_db_with_tickets(&db, 1).await?;
726        let ack_ticket = tickets.pop().context("ticket should be present")?;
727
728        assert_eq!(
729            *CHANNEL_ID,
730            ack_ticket.verified_ticket().channel_id,
731            "channel ids must match"
732        );
733        assert_eq!(
734            CHANNEL_EPOCH,
735            ack_ticket.verified_ticket().channel_epoch,
736            "epochs must match"
737        );
738
739        let db_ticket = db
740            .stream_tickets(Some((&ack_ticket).into()))
741            .await?
742            .collect::<Vec<_>>()
743            .await
744            .first()
745            .cloned()
746            .context("ticket should exist")?;
747
748        assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
749
750        Ok(())
751    }
752
753    #[tokio::test]
754    async fn test_mark_redeemed() -> anyhow::Result<()> {
755        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
756        const COUNT_TICKETS: u64 = 10;
757
758        let tickets = init_db_with_tickets(&db, COUNT_TICKETS).await?;
759
760        let stats = db.get_ticket_statistics(None).await?;
761        assert_eq!(
762            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
763            stats.unredeemed_value,
764            "unredeemed balance must match"
765        );
766        assert_eq!(
767            HoprBalance::zero(),
768            stats.redeemed_value,
769            "there must be 0 redeemed value"
770        );
771
772        assert_eq!(
773            stats,
774            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
775            "per channel stats must be same"
776        );
777
778        const TO_REDEEM: u64 = 2;
779        for ticket in tickets.iter().take(TO_REDEEM as usize) {
780            let r = db.mark_tickets_as(ticket.into(), TicketMarker::Redeemed).await?;
781            assert_eq!(1, r, "must redeem only a single ticket");
782        }
783
784        let stats = db.get_ticket_statistics(None).await?;
785        assert_eq!(
786            HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
787            stats.unredeemed_value,
788            "unredeemed balance must match"
789        );
790        assert_eq!(
791            HoprBalance::from(TICKET_VALUE * TO_REDEEM),
792            stats.redeemed_value,
793            "there must be a redeemed value"
794        );
795
796        assert_eq!(
797            stats,
798            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
799            "per channel stats must be same"
800        );
801
802        Ok(())
803    }
804
805    #[tokio::test]
806    async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
807        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
808
809        let ticket = init_db_with_tickets(&db, 1)
810            .await?
811            .pop()
812            .context("should contain a ticket")?;
813
814        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
815        assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
816
817        Ok(())
818    }
819
820    #[tokio::test]
821    async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
822        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
823
824        let count_tickets = 10;
825        init_db_with_tickets(&db, count_tickets).await?;
826
827        let count_marked = db
828            .mark_tickets_as(TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH), TicketMarker::Redeemed)
829            .await?;
830        assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
831
832        Ok(())
833    }
834
835    #[tokio::test]
836    async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
837        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
838        const COUNT_TICKETS: u64 = 10;
839
840        init_db_with_tickets(&db, COUNT_TICKETS).await?;
841
842        let stats = db.get_ticket_statistics(None).await?;
843        assert_eq!(
844            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
845            stats.unredeemed_value,
846            "unredeemed balance must match"
847        );
848        assert_eq!(
849            HoprBalance::zero(),
850            stats.neglected_value,
851            "there must be 0 redeemed value"
852        );
853
854        assert_eq!(
855            stats,
856            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
857            "per channel stats must be same"
858        );
859
860        db.mark_tickets_as(TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH), TicketMarker::Neglected)
861            .await?;
862
863        let stats = db.get_ticket_statistics(None).await?;
864        assert_eq!(
865            HoprBalance::zero(),
866            stats.unredeemed_value,
867            "unredeemed balance must be zero"
868        );
869        assert_eq!(
870            HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
871            stats.neglected_value,
872            "there must be a neglected value"
873        );
874
875        assert_eq!(
876            stats,
877            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
878            "per channel stats must be same"
879        );
880
881        Ok(())
882    }
883
884    #[tokio::test]
885    async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
886        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
887
888        let mut ticket = init_db_with_tickets(&db, 1).await?;
889        let ticket = ticket.pop().context("ticket should be present")?.ticket;
890
891        let stats = db.get_ticket_statistics(None).await?;
892        assert_eq!(HoprBalance::zero(), stats.rejected_value);
893        assert_eq!(
894            stats,
895            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
896            "per channel stats must be same"
897        );
898
899        db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
900
901        let stats = db.get_ticket_statistics(None).await?;
902        assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
903        assert_eq!(
904            stats,
905            db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
906            "per channel stats must be same"
907        );
908
909        Ok(())
910    }
911
912    #[tokio::test]
913    async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
914        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
915
916        init_db_with_tickets(&db, 10).await?;
917
918        let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_index(5);
919
920        let v: Vec<AcknowledgedTicket> = db
921            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
922            .await?
923            .collect()
924            .await;
925
926        assert_eq!(1, v.len(), "single ticket must be updated");
927        assert_eq!(
928            AcknowledgedTicketStatus::BeingRedeemed,
929            v.first().context("should contain a ticket")?.status,
930            "status must be set"
931        );
932
933        let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
934
935        let v: Vec<AcknowledgedTicket> = db
936            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
937            .await?
938            .collect()
939            .await;
940
941        assert_eq!(9, v.len(), "only specific tickets must have state set");
942        assert!(
943            v.iter().all(|t| t.verified_ticket().index != 5),
944            "only tickets with different state must update"
945        );
946        assert!(
947            v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
948            "tickets must have updated state"
949        );
950
951        Ok(())
952    }
953
954    #[tokio::test]
955    async fn test_update_tickets_states() -> anyhow::Result<()> {
956        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
957
958        init_db_with_tickets(&db, 10).await?;
959        let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
960
961        db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
962            .await?;
963
964        let v: Vec<AcknowledgedTicket> = db
965            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
966            .await?
967            .collect()
968            .await;
969
970        assert!(v.is_empty(), "must not update if already updated");
971
972        Ok(())
973    }
974
975    #[tokio::test]
976    async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
977        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
978
979        let hash = Hash::default();
980
981        let idx = db.get_outgoing_ticket_index(hash).await?;
982        assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
983
984        let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
985            .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
986            .one(&db.tickets_db)
987            .await?
988            .context("index must exist")?;
989
990        assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
991
992        Ok(())
993    }
994
995    #[test]
996    fn test_ticket_stats_default_must_be_zero() -> anyhow::Result<()> {
997        let stats = ChannelTicketStatistics::default();
998        assert_eq!(stats.unredeemed_value, HoprBalance::zero());
999        assert_eq!(stats.redeemed_value, HoprBalance::zero());
1000        assert_eq!(stats.neglected_value, HoprBalance::zero());
1001        assert_eq!(stats.rejected_value, HoprBalance::zero());
1002        assert_eq!(stats.winning_tickets, 0);
1003
1004        Ok(())
1005    }
1006
1007    #[tokio::test]
1008    async fn test_ticket_stats_must_be_zero_for_non_existing_channel() -> anyhow::Result<()> {
1009        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1010
1011        let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1012
1013        assert_eq!(stats, ChannelTicketStatistics::default());
1014
1015        Ok(())
1016    }
1017
1018    #[tokio::test]
1019    async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1020        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1021
1022        let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1023
1024        assert_eq!(
1025            ChannelTicketStatistics::default(),
1026            stats,
1027            "must be equal to default which is all zeros"
1028        );
1029
1030        assert_eq!(
1031            stats,
1032            db.get_ticket_statistics(None).await?,
1033            "per-channel stats must be the same as global stats"
1034        );
1035
1036        Ok(())
1037    }
1038
1039    #[tokio::test]
1040    async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1041        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1042
1043        let channel_1 = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
1044        let channel_2 = generate_channel_id(ALICE.public().as_ref(), BOB.public().as_ref());
1045
1046        let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1047        let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1048
1049        let value = t1.verified_ticket().amount;
1050
1051        db.upsert_ticket(t1).await?;
1052        db.upsert_ticket(t2).await?;
1053
1054        let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1055
1056        let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1057
1058        assert_eq!(value, stats_1.unredeemed_value);
1059        assert_eq!(value, stats_2.unredeemed_value);
1060
1061        assert_eq!(HoprBalance::zero(), stats_1.neglected_value);
1062        assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1063
1064        assert_eq!(stats_1, stats_2);
1065
1066        db.mark_tickets_as(TicketSelector::new(channel_1, CHANNEL_EPOCH), TicketMarker::Neglected)
1067            .await?;
1068
1069        let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1070
1071        let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1072
1073        assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1074        assert_eq!(value, stats_1.neglected_value);
1075
1076        assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1077
1078        Ok(())
1079    }
1080
1081    #[tokio::test]
1082    async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1083        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1084
1085        let hash = Hash::default();
1086
1087        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1088        assert_eq!(0, old_idx, "old value must be 0");
1089
1090        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1091        assert_eq!(1, new_idx, "new value must be 1");
1092
1093        let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1094        assert_eq!(1, old_idx, "old value must be 1");
1095
1096        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1097        assert_eq!(2, new_idx, "new value must be 2");
1098
1099        Ok(())
1100    }
1101
1102    #[tokio::test]
1103    async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1104        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1105
1106        let hash = Hash::default();
1107
1108        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1109        assert_eq!(0, new_idx, "value must be 0");
1110
1111        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1112
1113        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1114        assert_eq!(1, new_idx, "new value must be 1");
1115
1116        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1117        assert_eq!(1, old_idx, "old value must be 1");
1118        assert_eq!(1, new_idx, "new value must be 1");
1119
1120        let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1121        assert_eq!(1, old_idx, "old value must be 1");
1122        assert_eq!(1, new_idx, "new value must be 1");
1123
1124        Ok(())
1125    }
1126
1127    #[tokio::test]
1128    async fn test_ticket_index_reset() -> anyhow::Result<()> {
1129        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1130
1131        let hash = Hash::default();
1132
1133        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1134        assert_eq!(0, new_idx, "value must be 0");
1135
1136        db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1137
1138        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1139        assert_eq!(1, new_idx, "new value must be 1");
1140
1141        let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1142        assert_eq!(1, old_idx, "old value must be 1");
1143
1144        let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1145        assert_eq!(0, new_idx, "new value must be 0");
1146        Ok(())
1147    }
1148
1149    #[tokio::test]
1150    async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1151        let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1152
1153        let hash_1 = Hash::default();
1154        let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1155
1156        db.get_outgoing_ticket_index(hash_1).await?;
1157        db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1158
1159        let persisted = db.persist_outgoing_ticket_indices().await?;
1160        assert_eq!(1, persisted);
1161
1162        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1163            .all(&db.tickets_db)
1164            .await?;
1165        let idx_1 = indices
1166            .iter()
1167            .find(|idx| idx.channel_id == hash_1.to_hex())
1168            .context("must contain index 1")?;
1169        let idx_2 = indices
1170            .iter()
1171            .find(|idx| idx.channel_id == hash_2.to_hex())
1172            .context("must contain index 2")?;
1173        assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1174        assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1175
1176        db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1177        db.increment_outgoing_ticket_index(hash_2).await?;
1178
1179        let persisted = db.persist_outgoing_ticket_indices().await?;
1180        assert_eq!(2, persisted);
1181
1182        let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1183            .all(&db.tickets_db)
1184            .await?;
1185        let idx_1 = indices
1186            .iter()
1187            .find(|idx| idx.channel_id == hash_1.to_hex())
1188            .context("must contain index 1")?;
1189        let idx_2 = indices
1190            .iter()
1191            .find(|idx| idx.channel_id == hash_2.to_hex())
1192            .context("must contain index 2")?;
1193        assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1194        assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1195        Ok(())
1196    }
1197
1198    #[tokio::test]
1199    async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1200        let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1201
1202        assert_eq!(cache.weighted_size(), 0);
1203
1204        cache.insert(1, 1).await;
1205        cache.insert(2, 2).await;
1206
1207        let clone = cache.clone();
1208
1209        cache.remove(&1).await;
1210        cache.remove(&2).await;
1211
1212        assert_eq!(cache.get(&1).await, None);
1213        assert_eq!(cache.get(&1).await, clone.get(&1).await);
1214        Ok(())
1215    }
1216
1217    #[tokio::test]
1218    async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
1219        let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
1220
1221        let ticket = init_db_with_tickets(&db, 1).await?.pop().unwrap();
1222
1223        db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
1224            .await
1225            .expect("must not fail");
1226
1227        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1228        assert_ne!(stats.redeemed_value, HoprBalance::zero());
1229
1230        db.reset_ticket_statistics().await.expect("must not fail");
1231
1232        let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1233        assert_eq!(stats.redeemed_value, HoprBalance::zero());
1234
1235        Ok(())
1236    }
1237}