1use std::{collections::HashMap, ops::Bound};
2
3use async_stream::stream;
4use async_trait::async_trait;
5use futures::{StreamExt, TryStreamExt, stream::BoxStream};
6use hopr_api::db::*;
7use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
8use hopr_internal_types::prelude::*;
9use hopr_primitive_types::prelude::*;
10use sea_orm::{
11 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, QuerySelect, Set,
12 TransactionTrait,
13};
14use sea_query::{Condition, Expr, ExprTrait, IntoCondition, Order};
15use tracing::{debug, error, info, trace};
16
17use crate::{db::HoprNodeDb, errors::NodeDbError};
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21 pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
22 "hopr_tickets_incoming_statistics",
23 "Ticket statistics for channels with incoming tickets.",
24 &["statistic"]
25 ).unwrap();
26}
27
28#[derive(Clone)]
32pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
33
34impl From<TicketSelector> for WrappedTicketSelector {
35 fn from(selector: TicketSelector) -> Self {
36 Self(selector)
37 }
38}
39
40impl AsRef<TicketSelector> for WrappedTicketSelector {
41 fn as_ref(&self) -> &TicketSelector {
42 &self.0
43 }
44}
45
46impl IntoCondition for WrappedTicketSelector {
47 fn into_condition(self) -> Condition {
48 let (channel_id, epoch) = self.0.channel_identifier;
49
50 let mut expr = ticket::Column::ChannelId
51 .eq(hex::encode(channel_id))
52 .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()));
53
54 match self.0.index {
55 TicketIndexSelector::None => {
56 }
58 TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
59 TicketIndexSelector::Multiple(idxs) => {
60 expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
61 }
62 TicketIndexSelector::Range((lb, ub)) => {
63 expr = match lb {
64 Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
65 Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
66 Bound::Unbounded => expr,
67 };
68 expr = match ub {
69 Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
70 Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
71 Bound::Unbounded => expr,
72 };
73 }
74 }
75
76 if let Some(state) = self.0.state {
77 expr = expr.and(ticket::Column::State.eq(state as u8))
78 }
79
80 expr = match self.0.win_prob.0 {
82 Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
83 Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
84 Bound::Unbounded => expr,
85 };
86
87 expr = match self.0.win_prob.1 {
89 Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
90 Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
91 Bound::Unbounded => expr,
92 };
93
94 expr = match self.0.amount.0 {
96 Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
97 Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
98 Bound::Unbounded => expr,
99 };
100
101 expr = match self.0.amount.1 {
103 Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
104 Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
105 Bound::Unbounded => expr,
106 };
107
108 expr.into_condition()
109 }
110}
111
112pub(crate) fn any_selector<I: IntoIterator<Item = S>, S: Into<TicketSelector>>(selectors: I) -> Condition {
114 selectors
115 .into_iter()
116 .map(|s| WrappedTicketSelector(s.into()).into_condition())
117 .reduce(|a, b| a.or(b).into_condition())
118 .unwrap_or(Condition::all())
119}
120
121pub(crate) async fn find_stats_for_channel(
122 tx: &sea_orm::DatabaseTransaction,
123 channel_id: &ChannelId,
124) -> Result<ticket_statistics::Model, NodeDbError> {
125 if let Some(model) = ticket_statistics::Entity::find()
126 .filter(ticket_statistics::Column::ChannelId.eq(hex::encode(channel_id)))
127 .one(tx)
128 .await?
129 {
130 Ok(model)
131 } else {
132 let new_stats = ticket_statistics::ActiveModel {
133 channel_id: Set(hex::encode(channel_id)),
134 ..Default::default()
135 }
136 .insert(tx)
137 .await?;
138
139 Ok(new_stats)
140 }
141}
142
143pub(crate) async fn get_tickets_value_int(
144 tx: &impl TransactionTrait,
145 selector: TicketSelector,
146) -> Result<(usize, HoprBalance), NodeDbError> {
147 let selector: WrappedTicketSelector = selector.into();
148 Ok(tx
149 .transaction(|tx| {
150 Box::pin(async move {
151 ticket::Entity::find()
152 .filter(selector)
153 .stream(tx)
154 .await?
155 .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
156 Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
157 })
158 .await
159 })
160 })
161 .await?)
162}
163
164#[async_trait]
165impl HoprDbTicketOperations for HoprNodeDb {
166 type Error = NodeDbError;
167
168 async fn stream_tickets<'c, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
169 &'c self,
170 selectors: I,
171 ) -> Result<BoxStream<'c, RedeemableTicket>, Self::Error> {
172 let qry = ticket::Entity::find().filter(any_selector(selectors));
173
174 Ok(qry
175 .order_by(ticket::Column::ChannelId, Order::Asc)
176 .order_by(ticket::Column::ChannelEpoch, Order::Asc)
177 .order_by(ticket::Column::Index, Order::Asc)
178 .stream(&self.tickets_db)
179 .await?
180 .and_then(|model| {
181 futures::future::ready(
182 RedeemableTicket::try_from(model).map_err(|e| sea_orm::DbErr::Custom(e.to_string())),
183 )
184 })
185 .filter_map(|ticket| {
186 futures::future::ready(ticket.inspect_err(|error| error!(%error, "invalid ticket in db")).ok())
187 })
188 .boxed())
189 }
190
191 async fn insert_ticket(&self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
192 let _lock = self.tickets_write_lock.lock().await;
193 let unrealized_value = self.unrealized_value.clone();
194 self.tickets_db
195 .transaction(|tx| {
196 Box::pin(async move {
197 ticket::ActiveModel::from(ticket).insert(tx).await?;
199
200 let model = find_stats_for_channel(tx, ticket.ticket.channel_id()).await?;
202
203 let winning_tickets = model.winning_tickets + 1;
204 let mut active_model = model.into_active_model();
205 active_model.winning_tickets = sea_orm::Set(winning_tickets);
206 active_model.save(tx).await?;
207
208 #[cfg(all(feature = "prometheus", not(test)))]
209 {
210 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
211 &["unredeemed"],
212 get_tickets_value_int(tx, TicketSelector::from(&ticket).only_channel())
213 .await?
214 .1
215 .amount()
216 .as_u128() as f64,
217 );
218 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.increment(&["winning_count"], 1.0f64);
219 }
220
221 unrealized_value
223 .entry((*ticket.ticket.channel_id(), ticket.verified_ticket().channel_epoch))
224 .and_compute_with(|value| match value {
225 Some(value) => futures::future::ready(moka::ops::compute::Op::Put(
226 value.into_value() + ticket.verified_ticket().amount,
227 )),
228 None => {
229 futures::future::ready(moka::ops::compute::Op::Put(ticket.verified_ticket().amount))
230 }
231 })
232 .await;
233
234 Ok::<_, NodeDbError>(())
235 })
236 })
237 .await?;
238
239 Ok(())
240 }
241
242 async fn mark_tickets_as<S: Into<TicketSelector> + Send, I: IntoIterator<Item = S> + Send>(
243 &self,
244 selectors: I,
245 mark_as: TicketMarker,
246 ) -> Result<usize, Self::Error> {
247 let selectors = selectors.into_iter().map(|s| s.into()).collect::<Vec<_>>();
248 let unrealized_value = self.unrealized_value.clone();
249 let _lock = self.tickets_write_lock.lock().await;
250
251 let (total_count, marked_values) = self
252 .tickets_db
253 .transaction(|tx| {
254 Box::pin(async move {
255 let mut total_marked_count = 0;
256 let mut marked_values = Vec::new();
257 for channel_selector in selectors {
258 let (marked_count, marked_value) =
260 get_tickets_value_int(tx, channel_selector.clone()).await?;
261 trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");
262
263 if marked_count > 0 {
264 let deleted = ticket::Entity::delete_many()
266 .filter(WrappedTicketSelector::from(channel_selector.clone()))
267 .exec(tx)
268 .await?;
269
270 if deleted.rows_affected == marked_count as u64 {
272 let mut new_stats = find_stats_for_channel(tx, &channel_selector.channel_identifier.0)
273 .await?
274 .into_active_model();
275
276 let _current_value = match mark_as {
277 TicketMarker::Redeemed => {
278 let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
279 new_stats.redeemed_value =
280 Set((current_value + marked_value.amount()).to_be_bytes().into());
281 current_value
282 }
283 TicketMarker::Rejected => {
284 let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
285 new_stats.rejected_value =
286 Set((current_value + marked_value.amount()).to_be_bytes().into());
287 current_value
288 }
289 TicketMarker::Neglected => {
290 let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
291 new_stats.neglected_value =
292 Set((current_value + marked_value.amount()).to_be_bytes().into());
293 current_value
294 }
295 };
296 new_stats.save(tx).await?;
297
298 #[cfg(all(feature = "prometheus", not(test)))]
299 {
300 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
301 &[&mark_as.to_string()],
302 (_current_value + marked_value.amount()).as_u128() as f64,
303 );
304
305 if mark_as != TicketMarker::Rejected {
308 let unredeemed_value = get_tickets_value_int(tx, channel_selector.clone())
309 .await
310 .map(|(_, value)| value)
311 .unwrap_or_default();
312
313 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
314 &["unredeemed"],
315 (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
316 );
317 }
318 }
319 } else {
320 return Err(NodeDbError::LogicalError(format!(
321 "could not mark {marked_count} ticket as {mark_as}"
322 )));
323 }
324
325 trace!(marked_count, channel_id = ?channel_selector.channel_identifier.0, ?mark_as, "removed tickets in channel");
326 total_marked_count += marked_count;
327 marked_values.push((channel_selector.channel_identifier.0, channel_selector.channel_identifier.1, marked_value));
328 }
329 }
330
331 info!(count = total_marked_count, ?mark_as, "removed tickets in channels",);
332 Ok((total_marked_count, marked_values))
333 })
334 })
335 .await?;
336
337 for (channel_id, epoch, removed_amount) in marked_values {
340 unrealized_value
341 .entry((channel_id, epoch))
342 .and_compute_with(|value| match value {
343 Some(value) => {
344 futures::future::ready(moka::ops::compute::Op::Put(value.into_value() - removed_amount))
345 }
346 None => futures::future::ready(moka::ops::compute::Op::Nop),
347 })
348 .await;
349 }
350
351 Ok(total_count)
352 }
353
354 async fn mark_unsaved_ticket_rejected(&self, issuer: &Address, ticket: &Ticket) -> Result<(), NodeDbError> {
355 let channel_id = generate_channel_id(issuer, &ticket.counterparty);
356 let amount = ticket.amount;
357 let _lock = self.tickets_write_lock.lock().await;
358
359 Ok(self
360 .tickets_db
361 .transaction(|tx| {
362 Box::pin(async move {
363 let stats = find_stats_for_channel(tx, &channel_id).await?;
364
365 let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
366
367 let mut active_stats = stats.into_active_model();
368 active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
369 active_stats.save(tx).await?;
370
371 #[cfg(all(feature = "prometheus", not(test)))]
372 {
373 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
374 &["rejected"],
375 (current_rejected_value + amount.amount()).as_u128() as f64,
376 );
377 }
378
379 Ok::<(), NodeDbError>(())
380 })
381 })
382 .await?)
383 }
384
385 async fn update_ticket_states_and_fetch<'a, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
386 &'a self,
387 selectors: I,
388 new_state: AcknowledgedTicketStatus,
389 ) -> Result<BoxStream<'a, RedeemableTicket>, Self::Error> {
390 let selector = any_selector(selectors);
391 Ok(Box::pin(stream! {
392 let _lock = self.tickets_write_lock.lock().await;
394
395 match ticket::Entity::find()
396 .filter(selector)
397 .order_by(ticket::Column::ChannelId, Order::Asc)
398 .order_by(ticket::Column::ChannelEpoch, Order::Asc)
399 .order_by(ticket::Column::Index, Order::Asc)
400 .stream(&self.tickets_db)
401 .await {
402 Ok(mut stream) => {
403 while let Ok(Some(ticket)) = stream.try_next().await {
404 let active_ticket = ticket::ActiveModel {
405 id: Set(ticket.id),
406 state: Set(new_state as i8),
407 ..Default::default()
408 };
409
410 {
411 if let Err(error) = active_ticket.update(&self.tickets_db).await {
412 error!(%error, "failed to update ticket in the db");
413 }
414 }
415
416 match RedeemableTicket::try_from(ticket) {
417 Ok(ticket) => {
418 yield ticket
419 },
420 Err(error) => {
421 tracing::error!(%error, "failed to decode ticket from the db");
422 }
423 }
424 }
425 },
426 Err(error) => tracing::error!(%error, "failed open ticket db stream")
427 }
428 }))
429 }
430
431 async fn update_ticket_states<S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
432 &self,
433 selectors: I,
434 new_state: AcknowledgedTicketStatus,
435 ) -> Result<usize, Self::Error> {
436 let selector = any_selector(selectors);
437 let _lock = self.tickets_write_lock.lock().await;
438 Ok(self
439 .tickets_db
440 .transaction(|tx| {
441 Box::pin(async move {
442 ticket::Entity::update_many()
443 .filter(selector)
444 .col_expr(ticket::Column::State, Expr::value(new_state as i8))
445 .exec(tx)
446 .await
447 .map(|update| update.rows_affected as usize)
448 })
449 })
450 .await?)
451 }
452
453 async fn get_ticket_statistics(
454 &self,
455 channel_id: Option<ChannelId>,
456 ) -> Result<ChannelTicketStatistics, NodeDbError> {
457 let res = match channel_id {
458 None => {
459 self.tickets_db
460 .transaction(|tx| {
461 Box::pin(async move {
462 let unredeemed_value = ticket::Entity::find()
463 .stream(tx)
464 .await?
465 .try_fold(U256::zero(), |amount, x| {
466 let unredeemed_value = U256::from_be_bytes(x.amount);
467 futures::future::ok(amount + unredeemed_value)
468 })
469 .await?;
470
471 #[cfg(all(feature = "prometheus", not(test)))]
472 METRIC_HOPR_TICKETS_INCOMING_STATISTICS
473 .set(&["unredeemed"], unredeemed_value.as_u128() as f64);
474
475 let mut all_stats = ticket_statistics::Entity::find().all(tx).await?.into_iter().fold(
476 ChannelTicketStatistics::default(),
477 |mut acc, stats| {
478 let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
479 acc.finalized_values
480 .entry(TicketMarker::Neglected)
481 .and_modify(|b| *b += neglected_value)
482 .or_insert(neglected_value);
483 let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
484 acc.finalized_values
485 .entry(TicketMarker::Redeemed)
486 .and_modify(|b| *b += redeemed_value)
487 .or_insert(redeemed_value);
488 let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
489 acc.finalized_values
490 .entry(TicketMarker::Rejected)
491 .and_modify(|b| *b += rejected_value)
492 .or_insert(rejected_value);
493 acc.winning_tickets += stats.winning_tickets as u128;
494 acc
495 },
496 );
497
498 all_stats.unredeemed_value = unredeemed_value.into();
499
500 #[cfg(all(feature = "prometheus", not(test)))]
501 {
502 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
503 &["neglected"],
504 all_stats
505 .finalized_values
506 .get(&TicketMarker::Neglected)
507 .copied()
508 .unwrap_or_default()
509 .amount()
510 .as_u128() as f64,
511 );
512 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
513 &["redeemed"],
514 all_stats
515 .finalized_values
516 .get(&TicketMarker::Redeemed)
517 .copied()
518 .unwrap_or_default()
519 .amount()
520 .as_u128() as f64,
521 );
522 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
523 &["rejected"],
524 all_stats
525 .finalized_values
526 .get(&TicketMarker::Rejected)
527 .copied()
528 .unwrap_or_default()
529 .amount()
530 .as_u128() as f64,
531 );
532 METRIC_HOPR_TICKETS_INCOMING_STATISTICS
533 .set(&["winning_tickets"], all_stats.winning_tickets as f64);
534 }
535 Ok::<_, NodeDbError>(all_stats)
536 })
537 })
538 .await
539 }
540 Some(channel) => {
541 let _lock = self.tickets_write_lock.lock().await;
542
543 self.tickets_db
544 .transaction(|tx| {
545 Box::pin(async move {
546 let stats = find_stats_for_channel(tx, &channel).await?;
547 let unredeemed_value = ticket::Entity::find()
548 .filter(ticket::Column::ChannelId.eq(hex::encode(channel)))
549 .stream(tx)
550 .await?
551 .try_fold(U256::zero(), |amount, x| {
552 futures::future::ok(amount + U256::from_be_bytes(x.amount))
553 })
554 .await?;
555
556 Ok::<_, NodeDbError>(ChannelTicketStatistics {
557 winning_tickets: stats.winning_tickets as u128,
558 unredeemed_value: unredeemed_value.into(),
559 finalized_values: HashMap::from_iter([
560 (
561 TicketMarker::Neglected,
562 HoprBalance::from_be_bytes(stats.neglected_value),
563 ),
564 (TicketMarker::Redeemed, HoprBalance::from_be_bytes(stats.redeemed_value)),
565 (TicketMarker::Rejected, HoprBalance::from_be_bytes(stats.rejected_value)),
566 ]),
567 })
568 })
569 })
570 .await
571 }
572 };
573 debug!(stats = ?res, "retrieved ticket statistics");
574 Ok(res?)
575 }
576
577 async fn reset_ticket_statistics(&self) -> Result<(), NodeDbError> {
578 let _lock = self.tickets_write_lock.lock().await;
579
580 Ok(self
581 .tickets_db
582 .transaction(|tx| {
583 Box::pin(async move {
584 let deleted = ticket_statistics::Entity::delete_many().exec(tx).await?;
586
587 #[cfg(all(feature = "prometheus", not(test)))]
588 {
589 if deleted.rows_affected > 0 {
590 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["neglected"], 0.0_f64);
591 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["redeemed"], 0.0_f64);
592 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["rejected"], 0.0_f64);
593 }
594 }
595
596 debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
597
598 Ok::<_, sea_orm::DbErr>(())
599 })
600 })
601 .await?)
602 }
603
604 async fn get_tickets_value(&self, id: &ChannelId, epoch: u32) -> Result<HoprBalance, NodeDbError> {
605 Ok(self
606 .unrealized_value
607 .try_get_with((*id, epoch), async {
608 get_tickets_value_int(&self.tickets_db, TicketSelector::new(*id, epoch))
609 .await
610 .map(|(_, value)| value)
611 })
612 .await?)
613 }
614
615 async fn get_or_create_outgoing_ticket_index(
616 &self,
617 channel_id: &ChannelId,
618 epoch: u32,
619 ) -> Result<Option<u64>, Self::Error> {
620 let _lock = self.tickets_write_lock.lock().await;
621 let channel_id = hex::encode(channel_id);
622
623 Ok(self
624 .tickets_db
625 .transaction(|tx| {
626 Box::pin(async move {
627 outgoing_ticket_index::Entity::delete_many()
629 .filter(
630 outgoing_ticket_index::Column::ChannelId
631 .eq(&channel_id)
632 .and(outgoing_ticket_index::Column::Epoch.lt(epoch)),
633 )
634 .exec(tx)
635 .await?;
636
637 let newer_epoch = outgoing_ticket_index::Entity::find()
639 .filter(
640 outgoing_ticket_index::Column::ChannelId
641 .eq(&channel_id)
642 .and(outgoing_ticket_index::Column::Epoch.gt(epoch)),
643 )
644 .limit(1)
645 .one(tx)
646 .await?;
647
648 if let Some(newer_epoch) = newer_epoch {
649 return Err(NodeDbError::LogicalError(format!(
650 "attempted to get or insert outgoing index for older epoch {epoch} < {} in channel \
651 {channel_id}",
652 newer_epoch.epoch
653 )));
654 }
655
656 let maybe_index = outgoing_ticket_index::Entity::find()
658 .filter(
659 outgoing_ticket_index::Column::ChannelId
660 .eq(&channel_id)
661 .and(outgoing_ticket_index::Column::Epoch.eq(epoch)),
662 )
663 .order_by_asc(outgoing_ticket_index::Column::Epoch)
664 .one(tx)
665 .await?;
666
667 Ok(match maybe_index {
668 Some(model) => Some(u64::from_be_bytes(model.index.try_into().map_err(|_| {
669 NodeDbError::LogicalError(format!(
670 "could not convert outgoing ticket index to u64 for channel {channel_id}"
671 ))
672 })?)),
673 None => {
674 outgoing_ticket_index::ActiveModel {
675 channel_id: Set(channel_id),
676 epoch: Set(epoch as i32),
677 ..Default::default()
678 }
679 .insert(tx)
680 .await?;
681
682 None
683 }
684 })
685 })
686 })
687 .await?)
688 }
689
690 async fn update_outgoing_ticket_index(
691 &self,
692 channel_id: &ChannelId,
693 epoch: u32,
694 index: u64,
695 ) -> Result<(), Self::Error> {
696 let _lock = self.tickets_write_lock.lock().await;
697 let channel_id = hex::encode(channel_id);
698 Ok(self
699 .tickets_db
700 .transaction(|tx| {
701 Box::pin(async move {
702 let maybe_index = outgoing_ticket_index::Entity::find()
703 .filter(
704 outgoing_ticket_index::Column::ChannelId
705 .eq(&channel_id)
706 .and(outgoing_ticket_index::Column::Epoch.eq(epoch)),
707 )
708 .one(tx)
709 .await?;
710
711 if let Some(model) = maybe_index {
712 let current_index = U256::from_be_bytes(model.index.as_slice()).as_u64();
713 if current_index > index {
714 return Err(NodeDbError::LogicalError(format!(
715 "cannot set outgoing ticket index to {index} for channel {channel_id} - index is \
716 already {current_index}"
717 )));
718 }
719
720 if current_index != index {
722 let mut active_model = model.into_active_model();
723 active_model.index = Set(index.to_be_bytes().to_vec());
724 active_model.save(tx).await?;
725
726 tracing::debug!(%channel_id, %epoch, %index, "updated outgoing ticket index");
727 }
728 } else {
729 tracing::debug!("ignoring attempt to update outgoing ticket index for non-existing entry");
730 }
731
732 Ok::<_, NodeDbError>(())
733 })
734 })
735 .await?)
736 }
737
738 async fn remove_outgoing_ticket_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error> {
739 let _lock = self.tickets_write_lock.lock().await;
740 let res = outgoing_ticket_index::Entity::delete_many()
741 .filter(
742 outgoing_ticket_index::Column::ChannelId
743 .eq(hex::encode(channel_id))
744 .and(outgoing_ticket_index::Column::Epoch.eq(epoch)),
745 )
746 .exec(&self.tickets_db)
747 .await?;
748
749 if res.rows_affected > 0 {
750 tracing::debug!(%channel_id, %epoch, "removed outgoing ticket index");
751 } else {
752 tracing::warn!(%channel_id, %epoch, "outgoing ticket index not found");
753 }
754
755 Ok(())
756 }
757}
758
759#[cfg(test)]
760impl HoprNodeDb {
761 pub(crate) async fn upsert_ticket(&self, acknowledged_ticket: RedeemableTicket) -> Result<(), NodeDbError> {
762 let _lock = self.tickets_write_lock.lock().await;
763
764 self.tickets_db
765 .transaction(|tx| {
766 Box::pin(async move {
767 let selector = WrappedTicketSelector::from(TicketSelector::from(&acknowledged_ticket));
769
770 debug!(%acknowledged_ticket, "upserting ticket");
771 let mut model = ticket::ActiveModel::from(acknowledged_ticket);
772
773 if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx).await? {
774 model.id = Set(ticket.id);
775 }
776
777 model.save(tx).await
778 })
779 })
780 .await?;
781 Ok(())
782 }
783}
784
785#[cfg(test)]
786mod tests {
787 use anyhow::Context;
788 use futures::StreamExt;
789 use hex_literal::hex;
790 use hopr_api::db::{ChannelTicketStatistics, TicketMarker};
791 use hopr_crypto_random::Randomizable;
792 use hopr_crypto_types::prelude::*;
793 use hopr_internal_types::prelude::*;
794 use hopr_primitive_types::prelude::*;
795
796 use crate::{
797 db::HoprNodeDb,
798 tickets::{HoprDbTicketOperations, TicketSelector},
799 };
800
801 lazy_static::lazy_static! {
802 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
803 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
804 static ref CHANNEL_ID: Hash = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
805 }
806
807 lazy_static::lazy_static! {
808 static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
809 static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
810 }
811
812 const TICKET_VALUE: u64 = 100_000;
813 const CHANNEL_EPOCH: u32 = 4;
814
815 fn generate_random_ack_ticket(
816 src: &ChainKeypair,
817 dst: &ChainKeypair,
818 index: u64,
819 _index_offset: u32,
820 win_prob: f64,
821 ) -> anyhow::Result<RedeemableTicket> {
822 let hk1 = HalfKey::random();
823 let hk2 = HalfKey::random();
824 let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
825
826 Ok(TicketBuilder::default()
827 .counterparty(dst)
828 .amount(TICKET_VALUE)
829 .index(index)
830 .win_prob(win_prob.try_into()?)
831 .channel_epoch(CHANNEL_EPOCH)
832 .challenge(challenge)
833 .build_signed(src, &Hash::default())?
834 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
835 .into_redeemable(dst, &Hash::default())?)
836 }
837
838 async fn init_db_with_tickets(db: &HoprNodeDb, count_tickets: u64) -> anyhow::Result<Vec<RedeemableTicket>> {
839 let tickets: Vec<RedeemableTicket> = (0..count_tickets)
840 .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
841 .collect::<anyhow::Result<Vec<RedeemableTicket>>>()?;
842
843 for t in &tickets {
844 db.upsert_ticket(t.clone()).await?;
845 }
846
847 Ok(tickets)
848 }
849
850 #[test_log::test(tokio::test)]
851 async fn test_insert_get_ticket() -> anyhow::Result<()> {
852 let db = HoprNodeDb::new_in_memory().await?;
853
854 let mut tickets = init_db_with_tickets(&db, 1).await?;
855 let ack_ticket = tickets.pop().context("ticket should be present")?;
856
857 assert_eq!(*CHANNEL_ID, *ack_ticket.ticket.channel_id(), "channel ids must match");
858 assert_eq!(
859 CHANNEL_EPOCH,
860 ack_ticket.verified_ticket().channel_epoch,
861 "epochs must match"
862 );
863
864 let db_ticket = db
865 .stream_tickets([&ack_ticket])
866 .await?
867 .collect::<Vec<_>>()
868 .await
869 .first()
870 .cloned()
871 .context("ticket should exist 1")?;
872
873 assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
874
875 let db_ticket = db
876 .stream_tickets(None::<TicketSelector>)
877 .await?
878 .collect::<Vec<_>>()
879 .await
880 .first()
881 .cloned()
882 .context("ticket should exist 2")?;
883
884 assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
885
886 Ok(())
887 }
888
889 #[tokio::test]
890 async fn test_mark_redeemed() -> anyhow::Result<()> {
891 let db = HoprNodeDb::new_in_memory().await?;
892 const COUNT_TICKETS: u64 = 10;
893
894 let tickets = init_db_with_tickets(&db, COUNT_TICKETS).await?;
895
896 let stats = db.get_ticket_statistics(None).await?;
897 assert_eq!(
898 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
899 stats.unredeemed_value,
900 "unredeemed balance must match"
901 );
902 assert_eq!(
903 HoprBalance::zero(),
904 stats.redeemed_value(),
905 "there must be 0 redeemed value"
906 );
907
908 assert_eq!(
909 stats,
910 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
911 "per channel stats must be same"
912 );
913
914 const TO_REDEEM: u64 = 2;
915 for ticket in tickets.iter().take(TO_REDEEM as usize) {
916 let r = db.mark_tickets_as([ticket], TicketMarker::Redeemed).await?;
917 assert_eq!(1, r, "must redeem only a single ticket");
918 }
919
920 let stats = db.get_ticket_statistics(None).await?;
921 assert_eq!(
922 HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
923 stats.unredeemed_value,
924 "unredeemed balance must match"
925 );
926 assert_eq!(
927 HoprBalance::from(TICKET_VALUE * TO_REDEEM),
928 stats.redeemed_value(),
929 "there must be a redeemed value"
930 );
931
932 assert_eq!(
933 stats,
934 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
935 "per channel stats must be same"
936 );
937
938 Ok(())
939 }
940
941 #[tokio::test]
942 async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
943 let db = HoprNodeDb::new_in_memory().await?;
944
945 let ticket = init_db_with_tickets(&db, 1)
946 .await?
947 .pop()
948 .context("should contain a ticket")?;
949
950 db.mark_tickets_as([&ticket], TicketMarker::Redeemed).await?;
951 assert_eq!(0, db.mark_tickets_as([&ticket], TicketMarker::Redeemed).await?);
952
953 Ok(())
954 }
955
956 #[tokio::test]
957 async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
958 let db = HoprNodeDb::new_in_memory().await?;
959
960 let count_tickets = 10;
961 init_db_with_tickets(&db, count_tickets).await?;
962
963 let count_marked = db
964 .mark_tickets_as(
965 [TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH)],
966 TicketMarker::Redeemed,
967 )
968 .await?;
969 assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
970
971 Ok(())
972 }
973
974 #[tokio::test]
975 async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
976 let db = HoprNodeDb::new_in_memory().await?;
977 const COUNT_TICKETS: u64 = 10;
978
979 init_db_with_tickets(&db, COUNT_TICKETS).await?;
980
981 let stats = db.get_ticket_statistics(None).await?;
982 assert_eq!(
983 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
984 stats.unredeemed_value,
985 "unredeemed balance must match"
986 );
987 assert_eq!(
988 HoprBalance::zero(),
989 stats.neglected_value(),
990 "there must be 0 redeemed value"
991 );
992
993 assert_eq!(
994 stats,
995 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
996 "per channel stats must be same"
997 );
998
999 db.mark_tickets_as(
1000 [TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH)],
1001 TicketMarker::Neglected,
1002 )
1003 .await?;
1004
1005 let stats = db.get_ticket_statistics(None).await?;
1006 assert_eq!(
1007 HoprBalance::zero(),
1008 stats.unredeemed_value,
1009 "unredeemed balance must be zero"
1010 );
1011 assert_eq!(
1012 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
1013 stats.neglected_value(),
1014 "there must be a neglected value"
1015 );
1016
1017 assert_eq!(
1018 stats,
1019 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1020 "per channel stats must be same"
1021 );
1022
1023 Ok(())
1024 }
1025
1026 #[test_log::test(tokio::test)]
1027 async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
1028 let db = HoprNodeDb::new_in_memory().await?;
1029
1030 let mut ticket = init_db_with_tickets(&db, 1).await?;
1031 let ticket = ticket.pop().context("ticket should be present")?.ticket;
1032
1033 let stats = db.get_ticket_statistics(None).await?;
1034 assert_eq!(HoprBalance::zero(), stats.rejected_value());
1035 assert_eq!(
1036 stats,
1037 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1038 "per channel stats must be same"
1039 );
1040
1041 tracing::debug!("marking ticket as rejected");
1042 db.mark_unsaved_ticket_rejected(BOB.public().as_ref(), ticket.verified_ticket())
1043 .await?;
1044
1045 let stats = db.get_ticket_statistics(None).await?;
1046 assert_eq!(ticket.verified_ticket().amount, stats.rejected_value());
1047 assert_eq!(
1048 stats,
1049 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
1050 "per channel stats must be same"
1051 );
1052
1053 Ok(())
1054 }
1055
1056 #[tokio::test]
1057 async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
1058 let db = HoprNodeDb::new_in_memory().await?;
1059
1060 init_db_with_tickets(&db, 10).await?;
1061
1062 let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_index(5);
1063
1064 let v: Vec<RedeemableTicket> = db
1065 .update_ticket_states_and_fetch([selector], AcknowledgedTicketStatus::BeingRedeemed)
1066 .await?
1067 .collect()
1068 .await;
1069
1070 assert_eq!(1, v.len(), "single ticket must be updated");
1071
1072 let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
1073
1074 let v: Vec<RedeemableTicket> = db
1075 .update_ticket_states_and_fetch([selector], AcknowledgedTicketStatus::BeingRedeemed)
1076 .await?
1077 .collect()
1078 .await;
1079
1080 assert_eq!(9, v.len(), "only specific tickets must have state set");
1081 assert!(
1082 v.iter().all(|t| t.verified_ticket().index != 5),
1083 "only tickets with different state must update"
1084 );
1085
1086 Ok(())
1087 }
1088
1089 #[tokio::test]
1090 async fn test_update_tickets_states() -> anyhow::Result<()> {
1091 let db = HoprNodeDb::new_in_memory().await?;
1092
1093 init_db_with_tickets(&db, 10).await?;
1094 let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
1095
1096 db.update_ticket_states([selector.clone()], AcknowledgedTicketStatus::BeingRedeemed)
1097 .await?;
1098
1099 let v: Vec<RedeemableTicket> = db
1100 .update_ticket_states_and_fetch([selector], AcknowledgedTicketStatus::BeingRedeemed)
1101 .await?
1102 .collect()
1103 .await;
1104
1105 assert!(v.is_empty(), "must not update if already updated");
1106
1107 Ok(())
1108 }
1109
1110 #[tokio::test]
1111 async fn test_outgoing_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
1112 let db = HoprNodeDb::new_in_memory().await?;
1113
1114 let hash = Hash::default();
1115
1116 let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1117 assert_eq!(None, idx, "initial index must be None");
1118
1119 let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1120 assert_eq!(Some(0), idx, "index must be zero");
1121
1122 Ok(())
1123 }
1124
1125 #[test_log::test(tokio::test)]
1126 async fn test_outgoing_ticket_index_should_not_allow_old_epochs() -> anyhow::Result<()> {
1127 let db = HoprNodeDb::new_in_memory().await?;
1128
1129 let hash = Hash::default();
1130
1131 db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1132 db.get_or_create_outgoing_ticket_index(&hash, 2).await?;
1133
1134 assert!(db.get_or_create_outgoing_ticket_index(&hash, 1).await.is_err());
1135
1136 Ok(())
1137 }
1138
1139 #[tokio::test]
1140 async fn test_outgoing_ticket_index_should_be_updated_if_already_present() -> anyhow::Result<()> {
1141 let db = HoprNodeDb::new_in_memory().await?;
1142
1143 let hash = Hash::default();
1144
1145 db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1146 db.update_outgoing_ticket_index(&hash, 1, 2).await?;
1147
1148 assert_eq!(Some(2), db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1149
1150 Ok(())
1151 }
1152
1153 #[tokio::test]
1154 async fn test_outgoing_ticket_index_should_not_be_updated_if_not_present() -> anyhow::Result<()> {
1155 let db = HoprNodeDb::new_in_memory().await?;
1156
1157 let hash = Hash::default();
1158
1159 db.update_outgoing_ticket_index(&hash, 1, 2).await?;
1160
1161 assert_eq!(None, db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1162
1163 Ok(())
1164 }
1165
1166 #[tokio::test]
1167 async fn test_outgoing_ticket_index_should_not_be_updated_if_lower() -> anyhow::Result<()> {
1168 let db = HoprNodeDb::new_in_memory().await?;
1169
1170 let hash = Hash::default();
1171
1172 db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1173 db.update_outgoing_ticket_index(&hash, 1, 2).await?;
1174
1175 assert_eq!(Some(2), db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1176
1177 assert!(db.update_outgoing_ticket_index(&hash, 1, 1).await.is_err());
1178
1179 assert_eq!(Some(2), db.get_or_create_outgoing_ticket_index(&hash, 1).await?);
1180
1181 Ok(())
1182 }
1183
1184 #[tokio::test]
1185 async fn test_outgoing_ticket_index_should_be_removed() -> anyhow::Result<()> {
1186 let db = HoprNodeDb::new_in_memory().await?;
1187
1188 let hash = Hash::default();
1189
1190 let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1191 assert!(idx.is_none());
1192
1193 let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1194 assert_eq!(Some(0), idx);
1195
1196 db.remove_outgoing_ticket_index(&hash, 1).await?;
1197
1198 let idx = db.get_or_create_outgoing_ticket_index(&hash, 1).await?;
1199 assert!(idx.is_none());
1200
1201 Ok(())
1202 }
1203
1204 #[test]
1205 fn test_ticket_stats_default_must_be_zero() -> anyhow::Result<()> {
1206 let stats = ChannelTicketStatistics::default();
1207 assert_eq!(stats.unredeemed_value, HoprBalance::zero());
1208 assert_eq!(stats.redeemed_value(), HoprBalance::zero());
1209 assert_eq!(stats.neglected_value(), HoprBalance::zero());
1210 assert_eq!(stats.rejected_value(), HoprBalance::zero());
1211 assert_eq!(stats.winning_tickets, 0);
1212
1213 Ok(())
1214 }
1215
1216 #[tokio::test]
1217 async fn test_ticket_stats_must_be_zero_for_non_existing_channel() -> anyhow::Result<()> {
1218 let db = HoprNodeDb::new_in_memory().await?;
1219
1220 let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1221
1222 assert_eq!(stats, ChannelTicketStatistics::default());
1223
1224 Ok(())
1225 }
1226
1227 #[tokio::test]
1228 async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1229 let db = HoprNodeDb::new_in_memory().await?;
1230
1231 let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1232
1233 assert_eq!(
1234 ChannelTicketStatistics::default(),
1235 stats,
1236 "must be equal to default which is all zeros"
1237 );
1238
1239 assert_eq!(
1240 stats,
1241 db.get_ticket_statistics(None).await?,
1242 "per-channel stats must be the same as global stats"
1243 );
1244
1245 Ok(())
1246 }
1247
1248 #[tokio::test]
1249 async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1250 let db = HoprNodeDb::new_in_memory().await?;
1251
1252 let channel_1 = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
1253 let channel_2 = generate_channel_id(ALICE.public().as_ref(), BOB.public().as_ref());
1254
1255 let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1256 let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1257
1258 let value = t1.verified_ticket().amount;
1259
1260 db.upsert_ticket(t1).await?;
1261 db.upsert_ticket(t2).await?;
1262
1263 let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1264
1265 let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1266
1267 assert_eq!(value, stats_1.unredeemed_value);
1268 assert_eq!(value, stats_2.unredeemed_value);
1269
1270 assert_eq!(HoprBalance::zero(), stats_1.neglected_value());
1271 assert_eq!(HoprBalance::zero(), stats_2.neglected_value());
1272
1273 assert_eq!(stats_1, stats_2);
1274
1275 db.mark_tickets_as([TicketSelector::new(channel_1, CHANNEL_EPOCH)], TicketMarker::Neglected)
1276 .await?;
1277
1278 let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1279
1280 let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1281
1282 assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1283 assert_eq!(value, stats_1.neglected_value());
1284
1285 assert_eq!(HoprBalance::zero(), stats_2.neglected_value());
1286
1287 Ok(())
1288 }
1289
1290 #[tokio::test]
1291 async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
1292 let db = HoprNodeDb::new_in_memory().await?;
1293
1294 let ticket = init_db_with_tickets(&db, 1).await?.pop().unwrap();
1295
1296 db.mark_tickets_as([&ticket], TicketMarker::Redeemed)
1297 .await
1298 .expect("must not fail");
1299
1300 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1301 assert_ne!(stats.redeemed_value(), HoprBalance::zero());
1302
1303 db.reset_ticket_statistics().await.expect("must not fail");
1304
1305 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1306 assert_eq!(stats.redeemed_value(), HoprBalance::zero());
1307
1308 Ok(())
1309 }
1310}