1use std::{
2 ops::Bound,
3 sync::{
4 Arc,
5 atomic::{AtomicU64, Ordering},
6 },
7};
8
9use async_stream::stream;
10use async_trait::async_trait;
11use futures::{StreamExt, TryStreamExt, stream::BoxStream};
12use hopr_api::db::*;
13use hopr_crypto_types::prelude::*;
14use hopr_db_entity::{outgoing_ticket_index, ticket, ticket_statistics};
15use hopr_internal_types::prelude::*;
16use hopr_primitive_types::prelude::*;
17use sea_orm::{
18 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QueryOrder, Set, TransactionTrait,
19};
20use sea_query::{Condition, Expr, IntoCondition, Order, SimpleExpr};
21use tracing::{debug, error, info, trace};
22
23use crate::{db::HoprNodeDb, errors::NodeDbError};
24
// Gauge family for incoming-ticket statistics, compiled only with the `prometheus` feature
// and outside of tests. It has a single label, `statistic`; this file sets the label values
// "unredeemed", "neglected", "redeemed", "rejected" and "winning_tickets", plus the string
// form of a `TicketMarker`.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    pub static ref METRIC_HOPR_TICKETS_INCOMING_STATISTICS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_tickets_incoming_statistics",
        "Ticket statistics for channels with incoming tickets.",
        &["statistic"]
    ).unwrap();
}
33
/// Crate-internal newtype around [`TicketSelector`].
///
/// This file implements `IntoCondition` for the wrapper, which allows a selector
/// to be passed directly to a query's `.filter(...)`.
#[derive(Clone)]
pub(crate) struct WrappedTicketSelector(pub(crate) TicketSelector);
39
40impl From<TicketSelector> for WrappedTicketSelector {
41 fn from(selector: TicketSelector) -> Self {
42 Self(selector)
43 }
44}
45
46impl AsRef<TicketSelector> for WrappedTicketSelector {
47 fn as_ref(&self) -> &TicketSelector {
48 &self.0
49 }
50}
51
52impl IntoCondition for WrappedTicketSelector {
53 fn into_condition(self) -> Condition {
54 let expr = self
55 .0
56 .channel_identifiers
57 .into_iter()
58 .map(|(channel_id, epoch)| {
59 ticket::Column::ChannelId
60 .eq(channel_id.to_hex())
61 .and(ticket::Column::ChannelEpoch.eq(epoch.to_be_bytes().to_vec()))
62 })
63 .reduce(SimpleExpr::or);
64
65 if expr.is_none() {
67 return Condition::any().not();
68 }
69
70 let mut expr = expr.unwrap();
71
72 match self.0.index {
73 TicketIndexSelector::None => {
74 }
76 TicketIndexSelector::Single(idx) => expr = expr.and(ticket::Column::Index.eq(idx.to_be_bytes().to_vec())),
77 TicketIndexSelector::Multiple(idxs) => {
78 expr = expr.and(ticket::Column::Index.is_in(idxs.into_iter().map(|i| i.to_be_bytes().to_vec())));
79 }
80 TicketIndexSelector::Range((lb, ub)) => {
81 expr = match lb {
82 Bound::Included(gte) => expr.and(ticket::Column::Index.gte(gte.to_be_bytes().to_vec())),
83 Bound::Excluded(gt) => expr.and(ticket::Column::Index.gt(gt.to_be_bytes().to_vec())),
84 Bound::Unbounded => expr,
85 };
86 expr = match ub {
87 Bound::Included(lte) => expr.and(ticket::Column::Index.lte(lte.to_be_bytes().to_vec())),
88 Bound::Excluded(lt) => expr.and(ticket::Column::Index.lt(lt.to_be_bytes().to_vec())),
89 Bound::Unbounded => expr,
90 };
91 }
92 }
93
94 if let Some(state) = self.0.state {
95 expr = expr.and(ticket::Column::State.eq(state as u8))
96 }
97
98 if self.0.only_aggregated {
99 expr = expr.and(ticket::Column::IndexOffset.gt(1));
100 }
101
102 expr = match self.0.win_prob.0 {
104 Bound::Included(gte) => expr.and(ticket::Column::WinningProbability.gte(gte.as_encoded().to_vec())),
105 Bound::Excluded(gt) => expr.and(ticket::Column::WinningProbability.gt(gt.as_encoded().to_vec())),
106 Bound::Unbounded => expr,
107 };
108
109 expr = match self.0.win_prob.1 {
111 Bound::Included(lte) => expr.and(ticket::Column::WinningProbability.lte(lte.as_encoded().to_vec())),
112 Bound::Excluded(lt) => expr.and(ticket::Column::WinningProbability.lt(lt.as_encoded().to_vec())),
113 Bound::Unbounded => expr,
114 };
115
116 expr = match self.0.amount.0 {
118 Bound::Included(gte) => expr.and(ticket::Column::Amount.gte(gte.amount().to_be_bytes().to_vec())),
119 Bound::Excluded(gt) => expr.and(ticket::Column::Amount.gt(gt.amount().to_be_bytes().to_vec())),
120 Bound::Unbounded => expr,
121 };
122
123 expr = match self.0.amount.1 {
125 Bound::Included(lte) => expr.and(ticket::Column::Amount.lte(lte.amount().to_be_bytes().to_vec())),
126 Bound::Excluded(lt) => expr.and(ticket::Column::Amount.lt(lt.amount().to_be_bytes().to_vec())),
127 Bound::Unbounded => expr,
128 };
129
130 expr.into_condition()
131 }
132}
133
134pub(crate) async fn find_stats_for_channel(
135 tx: &sea_orm::DatabaseTransaction,
136 channel_id: &Hash,
137) -> Result<ticket_statistics::Model, NodeDbError> {
138 if let Some(model) = ticket_statistics::Entity::find()
139 .filter(ticket_statistics::Column::ChannelId.eq(channel_id.to_hex()))
140 .one(tx)
141 .await?
142 {
143 Ok(model)
144 } else {
145 let new_stats = ticket_statistics::ActiveModel {
146 channel_id: Set(channel_id.to_hex()),
147 ..Default::default()
148 }
149 .insert(tx)
150 .await?;
151
152 Ok(new_stats)
153 }
154}
155
156#[async_trait]
157impl HoprDbTicketOperations for HoprNodeDb {
158 type Error = NodeDbError;
159
160 async fn stream_tickets<'c>(
161 &'c self,
162 selector: Option<TicketSelector>,
163 ) -> Result<BoxStream<'c, AcknowledgedTicket>, Self::Error> {
164 let qry = if let Some(selector) = selector.map(WrappedTicketSelector::from) {
165 ticket::Entity::find().filter(selector)
166 } else {
167 ticket::Entity::find()
168 };
169
170 Ok(qry
171 .order_by(ticket::Column::ChannelId, Order::Asc)
172 .order_by(ticket::Column::ChannelEpoch, Order::Asc)
173 .order_by(ticket::Column::Index, Order::Asc)
174 .stream(&self.tickets_db)
175 .await?
176 .and_then(|model| {
177 futures::future::ready(
178 AcknowledgedTicket::try_from(model).map_err(|e| sea_orm::DbErr::Custom(e.to_string())),
179 )
180 })
181 .filter_map(|ticket| {
182 futures::future::ready(ticket.inspect_err(|error| error!(%error, "invalid ticket in db")).ok())
183 })
184 .boxed())
185 }
186
    /// Removes all tickets matching `selector` from the database and adds their total value
    /// to the per-channel statistic chosen by `mark_as` (redeemed, rejected or neglected).
    ///
    /// All channels of the selector are processed inside a single write transaction.
    /// Returns the total number of removed tickets across all channels.
    ///
    /// # Errors
    /// Fails with `NodeDbError::LogicalError` if the number of deleted rows in a channel differs
    /// from the number of tickets counted just before the deletion, or with the underlying
    /// error if any database operation fails.
    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize, NodeDbError> {
        let myself = self.clone();
        Ok(self
            .ticket_manager
            .write_transaction(|tx| {
                Box::pin(async move {
                    let mut total_marked_count = 0;
                    for (channel_id, epoch) in selector.channel_identifiers.iter() {
                        // Narrow the selector down to this single channel and epoch.
                        let channel_selector = selector.clone().just_on_channel(*channel_id, epoch);

                        // Count the matching tickets and sum up their value before deleting them.
                        let (marked_count, marked_value) =
                            myself.get_tickets_value_int(tx, channel_selector.clone()).await?;
                        trace!(marked_count, ?marked_value, ?mark_as, "ticket marking");

                        if marked_count > 0 {
                            // Delete exactly the tickets that were just counted.
                            let deleted = ticket::Entity::delete_many()
                                .filter(WrappedTicketSelector::from(channel_selector.clone()))
                                .exec(tx)
                                .await?;

                            // Statistics are only updated when the deletion removed as many rows as were counted.
                            if deleted.rows_affected == marked_count as u64 {
                                let mut new_stats = find_stats_for_channel(tx, channel_id).await?.into_active_model();
                                // The previous value is only read by the cfg-gated metrics code below,
                                // hence the leading underscore.
                                let _current_value = match mark_as {
                                    TicketMarker::Redeemed => {
                                        let current_value = U256::from_be_bytes(new_stats.redeemed_value.as_ref());
                                        new_stats.redeemed_value =
                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
                                        current_value
                                    }
                                    TicketMarker::Rejected => {
                                        let current_value = U256::from_be_bytes(new_stats.rejected_value.as_ref());
                                        new_stats.rejected_value =
                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
                                        current_value
                                    }
                                    TicketMarker::Neglected => {
                                        let current_value = U256::from_be_bytes(new_stats.neglected_value.as_ref());
                                        new_stats.neglected_value =
                                            Set((current_value + marked_value.amount()).to_be_bytes().into());
                                        current_value
                                    }
                                };
                                new_stats.save(tx).await?;

                                #[cfg(all(feature = "prometheus", not(test)))]
                                {
                                    // Publish the new total of the statistic that was just increased.
                                    METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                        &[&mark_as.to_string()],
                                        (_current_value + marked_value.amount()).as_u128() as f64,
                                    );

                                    // The "unredeemed" gauge is left untouched for `Rejected`
                                    // (presumably such tickets were never counted as unredeemed — TODO confirm).
                                    if mark_as != TicketMarker::Rejected {
                                        // A cache miss falls back to the default value.
                                        let unredeemed_value = myself
                                            .caches
                                            .unrealized_value
                                            .get(&(*channel_id, *epoch))
                                            .await
                                            .unwrap_or_default();

                                        METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
                                            &["unredeemed"],
                                            (unredeemed_value - marked_value.amount()).amount().as_u128() as f64,
                                        );
                                    }
                                }

                                // Drop the cached unrealized value of this channel and epoch.
                                myself.caches.unrealized_value.invalidate(&(*channel_id, *epoch)).await;
                            } else {
                                // Returning an error here exits the transaction closure with a failure.
                                return Err(NodeDbError::LogicalError(format!(
                                    "could not mark {marked_count} ticket as {mark_as}"
                                )));
                            }

                            trace!(marked_count, ?channel_id, ?mark_as, "removed tickets in channel");
                            total_marked_count += marked_count;
                        }
                    }

                    info!(
                        count = total_marked_count,
                        ?mark_as,
                        channel_count = selector.channel_identifiers.len(),
                        "removed tickets in channels",
                    );
                    Ok(total_marked_count)
                })
            })
            .await?)
    }
281
282 async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<(), NodeDbError> {
283 let channel_id = ticket.channel_id;
284 let amount = ticket.amount;
285 Ok(self
286 .ticket_manager
287 .write_transaction(|tx| {
288 Box::pin(async move {
289 let stats = find_stats_for_channel(tx, &channel_id).await?;
290
291 let current_rejected_value = U256::from_be_bytes(stats.rejected_value.clone());
292
293 let mut active_stats = stats.into_active_model();
294 active_stats.rejected_value = Set((current_rejected_value + amount.amount()).to_be_bytes().into());
295 active_stats.save(tx).await?;
296
297 #[cfg(all(feature = "prometheus", not(test)))]
298 {
299 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
300 &["rejected"],
301 (current_rejected_value + amount.amount()).as_u128() as f64,
302 );
303 }
304
305 Ok::<(), NodeDbError>(())
306 })
307 })
308 .await?)
309 }
310
311 async fn update_ticket_states_and_fetch<'a>(
312 &'a self,
313 selector: TicketSelector,
314 new_state: AcknowledgedTicketStatus,
315 ) -> Result<BoxStream<'a, AcknowledgedTicket>, NodeDbError> {
316 let selector: WrappedTicketSelector = selector.into();
317 Ok(Box::pin(stream! {
318 match ticket::Entity::find()
319 .filter(selector)
320 .order_by(ticket::Column::ChannelId, Order::Asc)
321 .order_by(ticket::Column::ChannelEpoch, Order::Asc)
322 .order_by(ticket::Column::Index, Order::Asc)
323 .stream(&self.tickets_db)
324 .await {
325 Ok(mut stream) => {
326 while let Ok(Some(ticket)) = stream.try_next().await {
327 let active_ticket = ticket::ActiveModel {
328 id: Set(ticket.id),
329 state: Set(new_state as i8),
330 ..Default::default()
331 };
332
333 {
334 let _g = self.ticket_manager.mutex.lock().await;
335 if let Err(error) = active_ticket.update(&self.tickets_db).await {
336 error!(%error, "failed to update ticket in the db");
337 }
338 }
339
340 match AcknowledgedTicket::try_from(ticket) {
341 Ok(mut ticket) => {
342 ticket.status = new_state;
344 yield ticket
345 },
346 Err(error) => {
347 tracing::error!(%error, "failed to decode ticket from the db");
348 }
349 }
350 }
351 },
352 Err(error) => tracing::error!(%error, "failed open ticket db stream")
353 }
354 }))
355 }
356
357 async fn update_ticket_states(
358 &self,
359 selector: TicketSelector,
360 new_state: AcknowledgedTicketStatus,
361 ) -> Result<usize, NodeDbError> {
362 let selector: WrappedTicketSelector = selector.into();
363 Ok(self
364 .ticket_manager
365 .write_transaction(|tx| {
366 Box::pin(async move {
367 ticket::Entity::update_many()
368 .filter(selector)
369 .col_expr(ticket::Column::State, Expr::value(new_state as i8))
370 .exec(tx)
371 .await
372 .map(|update| update.rows_affected as usize)
373 })
374 })
375 .await?)
376 }
377
    /// Retrieves ticket statistics either for a single channel (`Some(channel_id)`) or
    /// aggregated over all channels (`None`).
    ///
    /// The unredeemed value is computed by summing the amounts of the tickets currently stored;
    /// the neglected, redeemed and rejected values and the winning ticket count are read from
    /// the `ticket_statistics` table.
    ///
    /// For a single channel, the statistics row is created if it does not exist yet
    /// (see `find_stats_for_channel`).
    ///
    /// # Errors
    /// Returns an error if any of the database operations fail.
    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics, NodeDbError> {
        let res = match channel_id {
            None => {
                self.tickets_db
                    .transaction(|tx| {
                        Box::pin(async move {
                            // Sum of the amounts of all stored tickets.
                            let unredeemed_value = ticket::Entity::find()
                                .stream(tx)
                                .await?
                                .try_fold(U256::zero(), |amount, x| {
                                    let unredeemed_value = U256::from_be_bytes(x.amount);
                                    futures::future::ok(amount + unredeemed_value)
                                })
                                .await?;

                            #[cfg(all(feature = "prometheus", not(test)))]
                            METRIC_HOPR_TICKETS_INCOMING_STATISTICS
                                .set(&["unredeemed"], unredeemed_value.as_u128() as f64);

                            // Accumulate the per-channel statistics rows into a single total.
                            let mut all_stats = ticket_statistics::Entity::find().all(tx).await?.into_iter().fold(
                                ChannelTicketStatistics::default(),
                                |mut acc, stats| {
                                    let neglected_value = HoprBalance::from_be_bytes(stats.neglected_value);
                                    acc.neglected_value += neglected_value;
                                    let redeemed_value = HoprBalance::from_be_bytes(stats.redeemed_value);
                                    acc.redeemed_value += redeemed_value;
                                    let rejected_value = HoprBalance::from_be_bytes(stats.rejected_value);
                                    acc.rejected_value += rejected_value;
                                    acc.winning_tickets += stats.winning_tickets as u128;
                                    acc
                                },
                            );

                            all_stats.unredeemed_value = unredeemed_value.into();

                            // The gauges are refreshed only in this aggregated (all-channels) branch.
                            #[cfg(all(feature = "prometheus", not(test)))]
                            {
                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
                                    .set(&["neglected"], all_stats.neglected_value.amount().as_u128() as f64);
                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
                                    .set(&["redeemed"], all_stats.redeemed_value.amount().as_u128() as f64);
                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
                                    .set(&["rejected"], all_stats.rejected_value.amount().as_u128() as f64);
                                METRIC_HOPR_TICKETS_INCOMING_STATISTICS
                                    .set(&["winning_tickets"], all_stats.winning_tickets as f64);
                            }
                            Ok::<_, NodeDbError>(all_stats)
                        })
                    })
                    .await
            }
            Some(channel) => {
                self.tickets_db
                    .transaction(|tx| {
                        Box::pin(async move {
                            let stats = find_stats_for_channel(tx, &channel).await?;
                            // Sum of the amounts of the tickets stored for this channel id (any epoch).
                            let unredeemed_value = ticket::Entity::find()
                                .filter(ticket::Column::ChannelId.eq(channel.to_hex()))
                                .stream(tx)
                                .await?
                                .try_fold(U256::zero(), |amount, x| {
                                    futures::future::ok(amount + U256::from_be_bytes(x.amount))
                                })
                                .await?;

                            Ok::<_, NodeDbError>(ChannelTicketStatistics {
                                winning_tickets: stats.winning_tickets as u128,
                                neglected_value: HoprBalance::from_be_bytes(stats.neglected_value),
                                redeemed_value: HoprBalance::from_be_bytes(stats.redeemed_value),
                                unredeemed_value: unredeemed_value.into(),
                                rejected_value: HoprBalance::from_be_bytes(stats.rejected_value),
                            })
                        })
                    })
                    .await
            }
        };
        // The whole `Result` is logged, so a failure shows up here as well.
        debug!(stats = ?res, "retrieved ticket statistics");
        Ok(res?)
    }
458
459 async fn reset_ticket_statistics(&self) -> Result<(), NodeDbError> {
460 Ok(self
461 .tickets_db
462 .transaction(|tx| {
463 Box::pin(async move {
464 let deleted = ticket_statistics::Entity::delete_many().exec(tx).await?;
466
467 #[cfg(all(feature = "prometheus", not(test)))]
468 {
469 if deleted.rows_affected > 0 {
470 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["neglected"], 0.0_f64);
471 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["redeemed"], 0.0_f64);
472 METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(&["rejected"], 0.0_f64);
473 }
474 }
475
476 debug!("reset ticket statistics for {:} channel(s)", deleted.rows_affected);
477
478 Ok::<_, sea_orm::DbErr>(())
479 })
480 })
481 .await?)
482 }
483
484 async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance), NodeDbError> {
485 self.get_tickets_value_int(&self.tickets_db, selector).await
486 }
487
488 async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64, NodeDbError> {
489 let old_value = self
490 .get_outgoing_ticket_index(channel_id)
491 .await?
492 .fetch_max(index, Ordering::SeqCst);
493
494 Ok(old_value)
498 }
499
500 async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, NodeDbError> {
501 let old_value = self
502 .get_outgoing_ticket_index(channel_id)
503 .await?
504 .swap(0, Ordering::SeqCst);
505
506 Ok(old_value)
510 }
511
512 async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, NodeDbError> {
513 let old_value = self
514 .get_outgoing_ticket_index(channel_id)
515 .await?
516 .fetch_add(1, Ordering::SeqCst);
517
518 Ok(old_value)
521 }
522
523 async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>, NodeDbError> {
524 let tkt_manager = self.ticket_manager.clone();
525
526 Ok(self
527 .caches
528 .ticket_index
529 .try_get_with(channel_id, async move {
530 let maybe_index = outgoing_ticket_index::Entity::find()
531 .filter(outgoing_ticket_index::Column::ChannelId.eq(channel_id.to_hex()))
532 .one(&tkt_manager.tickets_db)
533 .await?;
534
535 Ok(Arc::new(AtomicU64::new(match maybe_index {
536 Some(model) => U256::from_be_bytes(model.index).as_u64(),
537 None => {
538 tkt_manager
539 .write_transaction(|tx| {
540 Box::pin(async move {
541 outgoing_ticket_index::ActiveModel {
542 channel_id: Set(channel_id.to_hex()),
543 ..Default::default()
544 }
545 .insert(tx)
546 .await?;
547 Ok::<_, sea_orm::DbErr>(())
548 })
549 })
550 .await?;
551 0_u64
552 }
553 })))
554 })
555 .await
556 .map_err(|e: Arc<NodeDbError>| {
557 NodeDbError::LogicalError(format!("failed to retrieve ticket index: {e}"))
558 })?)
559 }
560
561 async fn persist_outgoing_ticket_indices(&self) -> Result<usize, NodeDbError> {
562 let outgoing_indices = outgoing_ticket_index::Entity::find().all(&self.tickets_db).await?;
563
564 let mut updated = 0;
565 for index_model in outgoing_indices {
566 let channel_id = Hash::from_hex(&index_model.channel_id).map_err(NodeDbError::from)?;
567 let db_index = U256::from_be_bytes(&index_model.index).as_u64();
568 if let Some(cached_index) = self.caches.ticket_index.get(&channel_id).await {
569 let cached_index = cached_index.load(Ordering::SeqCst);
573
574 if cached_index > db_index {
576 let mut index_active_model = index_model.into_active_model();
577 index_active_model.index = Set(cached_index.to_be_bytes().to_vec());
578 self.ticket_manager
579 .write_transaction(|tx| {
580 Box::pin(async move {
581 index_active_model.save(tx).await?;
582 Ok::<_, sea_orm::DbErr>(())
583 })
584 })
585 .await?;
586
587 debug!("updated ticket index in channel {channel_id} from {db_index} to {cached_index}");
588 updated += 1;
589 }
590 } else {
591 trace!(?channel_id, "channel not in cache yet");
594 }
595 }
596
597 Ok(Ok::<_, NodeDbError>(updated)?)
598 }
599}
600
601impl HoprNodeDb {
602 pub async fn upsert_ticket(&self, acknowledged_ticket: AcknowledgedTicket) -> Result<(), NodeDbError> {
604 self.tickets_db
605 .transaction(|tx| {
606 Box::pin(async move {
607 let selector = WrappedTicketSelector::from(
609 TicketSelector::new(
610 acknowledged_ticket.verified_ticket().channel_id,
611 acknowledged_ticket.verified_ticket().channel_epoch,
612 )
613 .with_index(acknowledged_ticket.verified_ticket().index),
614 );
615
616 debug!("upserting ticket {acknowledged_ticket}");
617 let mut model = ticket::ActiveModel::from(acknowledged_ticket);
618
619 if let Some(ticket) = ticket::Entity::find().filter(selector).one(tx).await? {
620 model.id = Set(ticket.id);
621 }
622
623 model.save(tx).await
624 })
625 })
626 .await?;
627 Ok(())
628 }
629
630 async fn get_tickets_value_int(
631 &self,
632 tx: &impl TransactionTrait,
633 selector: TicketSelector,
634 ) -> Result<(usize, HoprBalance), NodeDbError> {
635 let selector: WrappedTicketSelector = selector.into();
636 Ok(tx
637 .transaction(|tx| {
638 Box::pin(async move {
639 ticket::Entity::find()
640 .filter(selector)
641 .stream(tx)
642 .await?
643 .try_fold((0_usize, HoprBalance::zero()), |(count, value), t| async move {
644 Ok((count + 1, value + HoprBalance::from_be_bytes(t.amount)))
645 })
646 .await
647 })
648 })
649 .await?)
650 }
651}
652
653#[cfg(test)]
654mod tests {
655 use std::sync::atomic::Ordering;
656
657 use anyhow::Context;
658 use futures::StreamExt;
659 use hex_literal::hex;
660 use hopr_api::db::{ChannelTicketStatistics, TicketMarker};
661 use hopr_crypto_random::Randomizable;
662 use hopr_crypto_types::prelude::*;
663 use hopr_internal_types::prelude::*;
664 use hopr_primitive_types::prelude::*;
665 use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
666
667 use crate::{
668 db::HoprNodeDb,
669 tickets::{HoprDbTicketOperations, TicketSelector},
670 };
671
    // Fixed chain keypairs, so that the channel id below is the same in every test run.
    lazy_static::lazy_static! {
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
        // Id of the channel generated from BOB's and ALICE's addresses (in this order).
        static ref CHANNEL_ID: Hash = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
    }

    // Randomly generated off-chain keypairs.
    lazy_static::lazy_static! {
        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
    }

    // Amount of every ticket created by `generate_random_ack_ticket`.
    const TICKET_VALUE: u64 = 100_000;
    // Channel epoch of every ticket created by `generate_random_ack_ticket`.
    const CHANNEL_EPOCH: u32 = 4;
685
686 fn generate_random_ack_ticket(
687 src: &ChainKeypair,
688 dst: &ChainKeypair,
689 index: u64,
690 index_offset: u32,
691 win_prob: f64,
692 ) -> anyhow::Result<AcknowledgedTicket> {
693 let hk1 = HalfKey::random();
694 let hk2 = HalfKey::random();
695 let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
696
697 Ok(TicketBuilder::default()
698 .addresses(src, dst)
699 .amount(TICKET_VALUE)
700 .index(index)
701 .index_offset(index_offset)
702 .win_prob(win_prob.try_into()?)
703 .channel_epoch(CHANNEL_EPOCH)
704 .challenge(challenge)
705 .build_signed(src, &Hash::default())?
706 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
707 }
708
709 async fn init_db_with_tickets(db: &HoprNodeDb, count_tickets: u64) -> anyhow::Result<Vec<AcknowledgedTicket>> {
710 let tickets: Vec<AcknowledgedTicket> = (0..count_tickets)
711 .map(|i| generate_random_ack_ticket(&BOB, &ALICE, i, 1, 1.0))
712 .collect::<anyhow::Result<Vec<AcknowledgedTicket>>>()?;
713
714 for t in &tickets {
715 db.upsert_ticket(t.clone()).await?;
716 }
717
718 Ok(tickets)
719 }
720
721 #[tokio::test]
722 async fn test_insert_get_ticket() -> anyhow::Result<()> {
723 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
724
725 let mut tickets = init_db_with_tickets(&db, 1).await?;
726 let ack_ticket = tickets.pop().context("ticket should be present")?;
727
728 assert_eq!(
729 *CHANNEL_ID,
730 ack_ticket.verified_ticket().channel_id,
731 "channel ids must match"
732 );
733 assert_eq!(
734 CHANNEL_EPOCH,
735 ack_ticket.verified_ticket().channel_epoch,
736 "epochs must match"
737 );
738
739 let db_ticket = db
740 .stream_tickets(Some((&ack_ticket).into()))
741 .await?
742 .collect::<Vec<_>>()
743 .await
744 .first()
745 .cloned()
746 .context("ticket should exist")?;
747
748 assert_eq!(ack_ticket, db_ticket, "tickets must be equal");
749
750 Ok(())
751 }
752
753 #[tokio::test]
754 async fn test_mark_redeemed() -> anyhow::Result<()> {
755 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
756 const COUNT_TICKETS: u64 = 10;
757
758 let tickets = init_db_with_tickets(&db, COUNT_TICKETS).await?;
759
760 let stats = db.get_ticket_statistics(None).await?;
761 assert_eq!(
762 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
763 stats.unredeemed_value,
764 "unredeemed balance must match"
765 );
766 assert_eq!(
767 HoprBalance::zero(),
768 stats.redeemed_value,
769 "there must be 0 redeemed value"
770 );
771
772 assert_eq!(
773 stats,
774 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
775 "per channel stats must be same"
776 );
777
778 const TO_REDEEM: u64 = 2;
779 for ticket in tickets.iter().take(TO_REDEEM as usize) {
780 let r = db.mark_tickets_as(ticket.into(), TicketMarker::Redeemed).await?;
781 assert_eq!(1, r, "must redeem only a single ticket");
782 }
783
784 let stats = db.get_ticket_statistics(None).await?;
785 assert_eq!(
786 HoprBalance::from(TICKET_VALUE * (COUNT_TICKETS - TO_REDEEM)),
787 stats.unredeemed_value,
788 "unredeemed balance must match"
789 );
790 assert_eq!(
791 HoprBalance::from(TICKET_VALUE * TO_REDEEM),
792 stats.redeemed_value,
793 "there must be a redeemed value"
794 );
795
796 assert_eq!(
797 stats,
798 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
799 "per channel stats must be same"
800 );
801
802 Ok(())
803 }
804
805 #[tokio::test]
806 async fn test_mark_redeem_should_not_mark_redeem_twice() -> anyhow::Result<()> {
807 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
808
809 let ticket = init_db_with_tickets(&db, 1)
810 .await?
811 .pop()
812 .context("should contain a ticket")?;
813
814 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?;
815 assert_eq!(0, db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed).await?);
816
817 Ok(())
818 }
819
820 #[tokio::test]
821 async fn test_mark_redeem_should_redeem_all_tickets() -> anyhow::Result<()> {
822 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
823
824 let count_tickets = 10;
825 init_db_with_tickets(&db, count_tickets).await?;
826
827 let count_marked = db
828 .mark_tickets_as(TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH), TicketMarker::Redeemed)
829 .await?;
830 assert_eq!(count_tickets, count_marked as u64, "must mark all tickets in channel");
831
832 Ok(())
833 }
834
835 #[tokio::test]
836 async fn test_mark_tickets_neglected() -> anyhow::Result<()> {
837 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
838 const COUNT_TICKETS: u64 = 10;
839
840 init_db_with_tickets(&db, COUNT_TICKETS).await?;
841
842 let stats = db.get_ticket_statistics(None).await?;
843 assert_eq!(
844 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
845 stats.unredeemed_value,
846 "unredeemed balance must match"
847 );
848 assert_eq!(
849 HoprBalance::zero(),
850 stats.neglected_value,
851 "there must be 0 redeemed value"
852 );
853
854 assert_eq!(
855 stats,
856 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
857 "per channel stats must be same"
858 );
859
860 db.mark_tickets_as(TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH), TicketMarker::Neglected)
861 .await?;
862
863 let stats = db.get_ticket_statistics(None).await?;
864 assert_eq!(
865 HoprBalance::zero(),
866 stats.unredeemed_value,
867 "unredeemed balance must be zero"
868 );
869 assert_eq!(
870 HoprBalance::from(TICKET_VALUE * COUNT_TICKETS),
871 stats.neglected_value,
872 "there must be a neglected value"
873 );
874
875 assert_eq!(
876 stats,
877 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
878 "per channel stats must be same"
879 );
880
881 Ok(())
882 }
883
884 #[tokio::test]
885 async fn test_mark_unsaved_ticket_rejected() -> anyhow::Result<()> {
886 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
887
888 let mut ticket = init_db_with_tickets(&db, 1).await?;
889 let ticket = ticket.pop().context("ticket should be present")?.ticket;
890
891 let stats = db.get_ticket_statistics(None).await?;
892 assert_eq!(HoprBalance::zero(), stats.rejected_value);
893 assert_eq!(
894 stats,
895 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
896 "per channel stats must be same"
897 );
898
899 db.mark_unsaved_ticket_rejected(ticket.verified_ticket()).await?;
900
901 let stats = db.get_ticket_statistics(None).await?;
902 assert_eq!(ticket.verified_ticket().amount, stats.rejected_value);
903 assert_eq!(
904 stats,
905 db.get_ticket_statistics(Some(*CHANNEL_ID)).await?,
906 "per channel stats must be same"
907 );
908
909 Ok(())
910 }
911
912 #[tokio::test]
913 async fn test_update_tickets_states_and_fetch() -> anyhow::Result<()> {
914 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
915
916 init_db_with_tickets(&db, 10).await?;
917
918 let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_index(5);
919
920 let v: Vec<AcknowledgedTicket> = db
921 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
922 .await?
923 .collect()
924 .await;
925
926 assert_eq!(1, v.len(), "single ticket must be updated");
927 assert_eq!(
928 AcknowledgedTicketStatus::BeingRedeemed,
929 v.first().context("should contain a ticket")?.status,
930 "status must be set"
931 );
932
933 let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
934
935 let v: Vec<AcknowledgedTicket> = db
936 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
937 .await?
938 .collect()
939 .await;
940
941 assert_eq!(9, v.len(), "only specific tickets must have state set");
942 assert!(
943 v.iter().all(|t| t.verified_ticket().index != 5),
944 "only tickets with different state must update"
945 );
946 assert!(
947 v.iter().all(|t| t.status == AcknowledgedTicketStatus::BeingRedeemed),
948 "tickets must have updated state"
949 );
950
951 Ok(())
952 }
953
954 #[tokio::test]
955 async fn test_update_tickets_states() -> anyhow::Result<()> {
956 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
957
958 init_db_with_tickets(&db, 10).await?;
959 let selector = TicketSelector::new(*CHANNEL_ID, CHANNEL_EPOCH).with_state(AcknowledgedTicketStatus::Untouched);
960
961 db.update_ticket_states(selector.clone(), AcknowledgedTicketStatus::BeingRedeemed)
962 .await?;
963
964 let v: Vec<AcknowledgedTicket> = db
965 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
966 .await?
967 .collect()
968 .await;
969
970 assert!(v.is_empty(), "must not update if already updated");
971
972 Ok(())
973 }
974
975 #[tokio::test]
976 async fn test_ticket_index_should_be_zero_if_not_yet_present() -> anyhow::Result<()> {
977 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
978
979 let hash = Hash::default();
980
981 let idx = db.get_outgoing_ticket_index(hash).await?;
982 assert_eq!(0, idx.load(Ordering::SeqCst), "initial index must be zero");
983
984 let r = hopr_db_entity::outgoing_ticket_index::Entity::find()
985 .filter(hopr_db_entity::outgoing_ticket_index::Column::ChannelId.eq(hash.to_hex()))
986 .one(&db.tickets_db)
987 .await?
988 .context("index must exist")?;
989
990 assert_eq!(0, U256::from_be_bytes(r.index).as_u64(), "index must be zero");
991
992 Ok(())
993 }
994
995 #[test]
996 fn test_ticket_stats_default_must_be_zero() -> anyhow::Result<()> {
997 let stats = ChannelTicketStatistics::default();
998 assert_eq!(stats.unredeemed_value, HoprBalance::zero());
999 assert_eq!(stats.redeemed_value, HoprBalance::zero());
1000 assert_eq!(stats.neglected_value, HoprBalance::zero());
1001 assert_eq!(stats.rejected_value, HoprBalance::zero());
1002 assert_eq!(stats.winning_tickets, 0);
1003
1004 Ok(())
1005 }
1006
1007 #[tokio::test]
1008 async fn test_ticket_stats_must_be_zero_for_non_existing_channel() -> anyhow::Result<()> {
1009 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1010
1011 let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1012
1013 assert_eq!(stats, ChannelTicketStatistics::default());
1014
1015 Ok(())
1016 }
1017
1018 #[tokio::test]
1019 async fn test_ticket_stats_must_be_zero_when_no_tickets() -> anyhow::Result<()> {
1020 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1021
1022 let stats = db.get_ticket_statistics(Some(*CHANNEL_ID)).await?;
1023
1024 assert_eq!(
1025 ChannelTicketStatistics::default(),
1026 stats,
1027 "must be equal to default which is all zeros"
1028 );
1029
1030 assert_eq!(
1031 stats,
1032 db.get_ticket_statistics(None).await?,
1033 "per-channel stats must be the same as global stats"
1034 );
1035
1036 Ok(())
1037 }
1038
1039 #[tokio::test]
1040 async fn test_ticket_stats_must_be_different_per_channel() -> anyhow::Result<()> {
1041 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1042
1043 let channel_1 = generate_channel_id(BOB.public().as_ref(), ALICE.public().as_ref());
1044 let channel_2 = generate_channel_id(ALICE.public().as_ref(), BOB.public().as_ref());
1045
1046 let t1 = generate_random_ack_ticket(&BOB, &ALICE, 1, 1, 1.0)?;
1047 let t2 = generate_random_ack_ticket(&ALICE, &BOB, 1, 1, 1.0)?;
1048
1049 let value = t1.verified_ticket().amount;
1050
1051 db.upsert_ticket(t1).await?;
1052 db.upsert_ticket(t2).await?;
1053
1054 let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1055
1056 let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1057
1058 assert_eq!(value, stats_1.unredeemed_value);
1059 assert_eq!(value, stats_2.unredeemed_value);
1060
1061 assert_eq!(HoprBalance::zero(), stats_1.neglected_value);
1062 assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1063
1064 assert_eq!(stats_1, stats_2);
1065
1066 db.mark_tickets_as(TicketSelector::new(channel_1, CHANNEL_EPOCH), TicketMarker::Neglected)
1067 .await?;
1068
1069 let stats_1 = db.get_ticket_statistics(Some(channel_1)).await?;
1070
1071 let stats_2 = db.get_ticket_statistics(Some(channel_2)).await?;
1072
1073 assert_eq!(HoprBalance::zero(), stats_1.unredeemed_value);
1074 assert_eq!(value, stats_1.neglected_value);
1075
1076 assert_eq!(HoprBalance::zero(), stats_2.neglected_value);
1077
1078 Ok(())
1079 }
1080
1081 #[tokio::test]
1082 async fn test_ticket_index_compare_and_set_and_increment() -> anyhow::Result<()> {
1083 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1084
1085 let hash = Hash::default();
1086
1087 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1088 assert_eq!(0, old_idx, "old value must be 0");
1089
1090 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1091 assert_eq!(1, new_idx, "new value must be 1");
1092
1093 let old_idx = db.increment_outgoing_ticket_index(hash).await?;
1094 assert_eq!(1, old_idx, "old value must be 1");
1095
1096 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1097 assert_eq!(2, new_idx, "new value must be 2");
1098
1099 Ok(())
1100 }
1101
1102 #[tokio::test]
1103 async fn test_ticket_index_compare_and_set_must_not_decrease() -> anyhow::Result<()> {
1104 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1105
1106 let hash = Hash::default();
1107
1108 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1109 assert_eq!(0, new_idx, "value must be 0");
1110
1111 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1112
1113 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1114 assert_eq!(1, new_idx, "new value must be 1");
1115
1116 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 0).await?;
1117 assert_eq!(1, old_idx, "old value must be 1");
1118 assert_eq!(1, new_idx, "new value must be 1");
1119
1120 let old_idx = db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1121 assert_eq!(1, old_idx, "old value must be 1");
1122 assert_eq!(1, new_idx, "new value must be 1");
1123
1124 Ok(())
1125 }
1126
1127 #[tokio::test]
1128 async fn test_ticket_index_reset() -> anyhow::Result<()> {
1129 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1130
1131 let hash = Hash::default();
1132
1133 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1134 assert_eq!(0, new_idx, "value must be 0");
1135
1136 db.compare_and_set_outgoing_ticket_index(hash, 1).await?;
1137
1138 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1139 assert_eq!(1, new_idx, "new value must be 1");
1140
1141 let old_idx = db.reset_outgoing_ticket_index(hash).await?;
1142 assert_eq!(1, old_idx, "old value must be 1");
1143
1144 let new_idx = db.get_outgoing_ticket_index(hash).await?.load(Ordering::SeqCst);
1145 assert_eq!(0, new_idx, "new value must be 0");
1146 Ok(())
1147 }
1148
1149 #[tokio::test]
1150 async fn test_persist_ticket_indices() -> anyhow::Result<()> {
1151 let db = HoprNodeDb::new_in_memory(ChainKeypair::random()).await?;
1152
1153 let hash_1 = Hash::default();
1154 let hash_2 = Hash::from(hopr_crypto_random::random_bytes());
1155
1156 db.get_outgoing_ticket_index(hash_1).await?;
1157 db.compare_and_set_outgoing_ticket_index(hash_2, 10).await?;
1158
1159 let persisted = db.persist_outgoing_ticket_indices().await?;
1160 assert_eq!(1, persisted);
1161
1162 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1163 .all(&db.tickets_db)
1164 .await?;
1165 let idx_1 = indices
1166 .iter()
1167 .find(|idx| idx.channel_id == hash_1.to_hex())
1168 .context("must contain index 1")?;
1169 let idx_2 = indices
1170 .iter()
1171 .find(|idx| idx.channel_id == hash_2.to_hex())
1172 .context("must contain index 2")?;
1173 assert_eq!(0, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 0");
1174 assert_eq!(10, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 10");
1175
1176 db.compare_and_set_outgoing_ticket_index(hash_1, 3).await?;
1177 db.increment_outgoing_ticket_index(hash_2).await?;
1178
1179 let persisted = db.persist_outgoing_ticket_indices().await?;
1180 assert_eq!(2, persisted);
1181
1182 let indices = hopr_db_entity::outgoing_ticket_index::Entity::find()
1183 .all(&db.tickets_db)
1184 .await?;
1185 let idx_1 = indices
1186 .iter()
1187 .find(|idx| idx.channel_id == hash_1.to_hex())
1188 .context("must contain index 1")?;
1189 let idx_2 = indices
1190 .iter()
1191 .find(|idx| idx.channel_id == hash_2.to_hex())
1192 .context("must contain index 2")?;
1193 assert_eq!(3, U256::from_be_bytes(&idx_1.index).as_u64(), "index must be 3");
1194 assert_eq!(11, U256::from_be_bytes(&idx_2.index).as_u64(), "index must be 11");
1195 Ok(())
1196 }
1197
1198 #[tokio::test]
1199 async fn test_cache_can_be_cloned_but_referencing_the_original_cache_storage() -> anyhow::Result<()> {
1200 let cache: moka::future::Cache<i64, i64> = moka::future::Cache::new(5);
1201
1202 assert_eq!(cache.weighted_size(), 0);
1203
1204 cache.insert(1, 1).await;
1205 cache.insert(2, 2).await;
1206
1207 let clone = cache.clone();
1208
1209 cache.remove(&1).await;
1210 cache.remove(&2).await;
1211
1212 assert_eq!(cache.get(&1).await, None);
1213 assert_eq!(cache.get(&1).await, clone.get(&1).await);
1214 Ok(())
1215 }
1216
1217 #[tokio::test]
1218 async fn test_set_ticket_statistics_when_tickets_are_in_db() -> anyhow::Result<()> {
1219 let db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
1220
1221 let ticket = init_db_with_tickets(&db, 1).await?.pop().unwrap();
1222
1223 db.mark_tickets_as((&ticket).into(), TicketMarker::Redeemed)
1224 .await
1225 .expect("must not fail");
1226
1227 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1228 assert_ne!(stats.redeemed_value, HoprBalance::zero());
1229
1230 db.reset_ticket_statistics().await.expect("must not fail");
1231
1232 let stats = db.get_ticket_statistics(None).await.expect("must not fail");
1233 assert_eq!(stats.redeemed_value, HoprBalance::zero());
1234
1235 Ok(())
1236 }
1237}