use async_trait::async_trait;
use futures::StreamExt;
use hopr_chain_types::actions::Action;
use hopr_crypto_types::types::Hash;
use hopr_db_sql::api::info::DomainSeparator;
use hopr_db_sql::api::tickets::{HoprDbTicketOperations, TicketSelector};
use hopr_db_sql::channels::HoprDbChannelOperations;
use hopr_db_sql::prelude::HoprDbInfoOperations;
use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;
use tracing::{debug, error, info, warn};
use crate::action_queue::PendingAction;
use crate::errors::ChainActionsError::{ChannelDoesNotExist, InvalidState, OldTicket};
use crate::errors::{ChainActionsError::WrongTicketState, Result};
use crate::ChainActions;
lazy_static::lazy_static! {
static ref EMPTY_TX_HASH: Hash = Hash::default();
}
#[async_trait]
pub trait TicketRedeemActions {
async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>>;
async fn redeem_tickets_with_counterparty(
&self,
counterparty: &Address,
only_aggregated: bool,
) -> Result<Vec<PendingAction>>;
async fn redeem_tickets_in_channel(
&self,
channel: &ChannelEntry,
only_aggregated: bool,
) -> Result<Vec<PendingAction>>;
async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;
async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
}
#[async_trait]
impl<Db> TicketRedeemActions for ChainActions<Db>
where
Db: HoprDbChannelOperations + HoprDbTicketOperations + HoprDbInfoOperations + Clone + Send + Sync + std::fmt::Debug,
{
#[tracing::instrument(level = "debug", skip(self))]
async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>> {
let incoming_channels = self
.db
.get_channels_via(None, ChannelDirection::Incoming, &self.self_address())
.await?;
debug!(
channel_count = incoming_channels.len(),
"starting to redeem all tickets in channels to self"
);
let mut receivers: Vec<PendingAction> = vec![];
for incoming_channel in incoming_channels {
match self.redeem_tickets_in_channel(&incoming_channel, only_aggregated).await {
Ok(mut successful_txs) => {
receivers.append(&mut successful_txs);
}
Err(e) => {
warn!(
channel = %generate_channel_id(&incoming_channel.source, &incoming_channel.destination),
error = %e,
"Failed to redeem tickets in channel",
);
}
}
}
Ok(receivers)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn redeem_tickets_with_counterparty(
&self,
counterparty: &Address,
only_aggregated: bool,
) -> Result<Vec<PendingAction>> {
let maybe_channel = self
.db
.get_channel_by_parties(None, counterparty, &self.self_address(), false)
.await?;
if let Some(channel) = maybe_channel {
self.redeem_tickets_in_channel(&channel, only_aggregated).await
} else {
Err(ChannelDoesNotExist)
}
}
#[tracing::instrument(level = "debug", skip(self))]
async fn redeem_tickets_in_channel(
&self,
channel: &ChannelEntry,
only_aggregated: bool,
) -> Result<Vec<PendingAction>> {
self.redeem_tickets(
TicketSelector::from(channel)
.with_aggregated_only(only_aggregated)
.with_index_range(channel.ticket_index.as_u64()..)
.with_state(AcknowledgedTicketStatus::Untouched),
)
.await
}
#[tracing::instrument(level = "debug", skip(self))]
async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
let (count_redeemable_tickets, _) = self.db.get_tickets_value(selector.clone()).await?;
info!(
count_redeemable_tickets, %selector,
"acknowledged tickets in channel that can be redeemed"
);
if count_redeemable_tickets == 0 {
return Ok(vec![]);
}
let channel_dst = self
.db
.get_indexer_data(None)
.await?
.domain_separator(DomainSeparator::Channel)
.ok_or(InvalidState("missing channel dst".into()))?;
let selector_id = selector.to_string();
let redeem_stream = self
.db
.update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
.await?
.collect::<Vec<_>>()
.await;
let mut receivers: Vec<PendingAction> = vec![];
for ack_ticket in redeem_stream {
let ticket_id = ack_ticket.to_string();
if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
match action {
Ok(successful_tx) => {
receivers.push(successful_tx);
}
Err(e) => {
error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
}
}
} else {
error!("failed to extract redeemable ticket");
}
}
info!(
count = receivers.len(),
selector = selector_id,
"acknowledged tickets were submitted to redeem in channel",
);
Ok(receivers)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
if let Some(channel) = self
.db
.get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
.await?
{
if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
return Err(OldTicket);
}
debug!(%ack_ticket, %channel, "redeeming single ticket");
let selector = TicketSelector::from(&channel)
.with_index(ack_ticket.verified_ticket().index)
.with_state(AcknowledgedTicketStatus::Untouched);
let maybe_ticket = self
.db
.update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
.await?
.next()
.await;
if let Some(ticket) = maybe_ticket {
let channel_dst = self
.db
.get_indexer_data(None)
.await?
.domain_separator(DomainSeparator::Channel)
.ok_or(InvalidState("missing channel dst".into()))?;
let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
debug!(%ack_ticket, "ticket is redeemable");
Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
} else {
Err(WrongTicketState(ack_ticket.to_string()))
}
} else {
Err(ChannelDoesNotExist)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::FutureExt;
use hex_literal::hex;
use hopr_chain_types::chain_events::ChainEventType::TicketRedeemed;
use hopr_chain_types::chain_events::SignificantChainEvent;
use hopr_crypto_random::random_bytes;
use hopr_crypto_types::prelude::*;
use hopr_db_sql::api::info::DomainSeparator;
use hopr_db_sql::db::HoprDb;
use hopr_db_sql::errors::DbSqlError;
use hopr_db_sql::info::HoprDbInfoOperations;
use hopr_db_sql::{HoprDbGeneralModelOperations, TargetDb};
use crate::action_queue::{ActionQueue, MockTransactionExecutor};
use crate::action_state::MockActionState;
lazy_static::lazy_static! {
static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
}
fn generate_random_ack_ticket(
idx: u64,
counterparty: &ChainKeypair,
channel_epoch: u32,
) -> anyhow::Result<AcknowledgedTicket> {
let hk1 = HalfKey::random();
let hk2 = HalfKey::random();
let cp1: CurvePoint = hk1.to_challenge().try_into()?;
let cp2: CurvePoint = hk2.to_challenge().try_into()?;
let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
let price_per_packet: U256 = 10000000000000000u128.into(); Ok(TicketBuilder::default()
.addresses(counterparty, &*ALICE)
.amount(price_per_packet.div_f64(1.0f64)? * 5u32)
.index(idx)
.index_offset(1)
.win_prob(1.0)
.channel_epoch(channel_epoch)
.challenge(Challenge::from(cp_sum).to_ethereum_challenge())
.build_signed(counterparty, &Hash::default())?
.into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
}
async fn create_channel_with_ack_tickets(
db: HoprDb,
ticket_count: usize,
counterparty: &ChainKeypair,
channel_epoch: u32,
) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
let ckp = counterparty.clone();
let db_clone = db.clone();
let channel = db
.begin_transaction()
.await?
.perform(|tx| {
Box::pin(async move {
db_clone
.set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
.await?;
let channel = ChannelEntry::new(
ckp.public().to_address(),
ALICE.public().to_address(),
Balance::zero(BalanceType::HOPR),
U256::zero(),
ChannelStatus::Open,
channel_epoch.into(),
);
db_clone.upsert_channel(Some(tx), channel).await?;
Ok::<_, DbSqlError>(channel)
})
})
.await?;
let ckp = counterparty.clone();
let input_tickets = db
.begin_transaction_in_db(TargetDb::Tickets)
.await?
.perform(|tx| {
Box::pin(async move {
let mut input_tickets = Vec::new();
for i in 0..ticket_count {
let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
.map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
db.upsert_ticket(Some(tx), ack_ticket.clone()).await?;
input_tickets.push(ack_ticket);
}
Ok::<_, DbSqlError>(input_tickets)
})
})
.await?;
Ok((channel, input_tickets))
}
#[async_std::test]
async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
let ticket_count = 5;
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let (channel_from_bob, bob_tickets) =
create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
let (channel_from_charlie, charlie_tickets) =
create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
let mut indexer_action_tracker = MockActionState::new();
let mut seq2 = mockall::Sequence::new();
for tkt in bob_tickets.iter().cloned() {
indexer_action_tracker
.expect_register_expectation()
.once()
.in_sequence(&mut seq2)
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
})
.boxed())
});
}
for tkt in charlie_tickets.iter().cloned() {
indexer_action_tracker
.expect_register_expectation()
.once()
.in_sequence(&mut seq2)
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
})
.boxed())
});
}
let mut tx_exec = MockTransactionExecutor::new();
let mut seq = mockall::Sequence::new();
tx_exec
.expect_redeem_ticket()
.times(ticket_count)
.in_sequence(&mut seq)
.withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
.returning(move |_| Ok(random_hash));
tx_exec
.expect_redeem_ticket()
.times(ticket_count)
.in_sequence(&mut seq)
.withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
.returning(move |_| Ok(random_hash));
let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
let tx_sender = tx_queue.new_sender();
async_std::task::spawn(async move {
tx_queue.start().await;
});
let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
let confirmations = futures::future::try_join_all(actions.redeem_all_tickets(false).await?.into_iter()).await?;
assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
assert!(
confirmations.into_iter().all(|c| c.tx_hash == random_hash),
"tx hashes must be equal"
);
let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
assert!(
db_acks_bob
.into_iter()
.all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
"all bob's tickets must be in BeingRedeemed state"
);
assert!(
db_acks_charlie
.into_iter()
.all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
"all charlie's tickets must be in BeingRedeemed state"
);
Ok(())
}
#[async_std::test]
async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
let ticket_count = 5;
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let (mut channel_from_bob, bob_tickets) =
create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
let (channel_from_charlie, _) =
create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
channel_from_bob.ticket_index = 1_u32.into();
db.upsert_channel(None, channel_from_bob.clone()).await?;
let mut indexer_action_tracker = MockActionState::new();
let mut seq2 = mockall::Sequence::new();
for tkt in bob_tickets.iter().cloned() {
indexer_action_tracker
.expect_register_expectation()
.once()
.in_sequence(&mut seq2)
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
})
.boxed())
});
}
let mut tx_exec = MockTransactionExecutor::new();
tx_exec
.expect_redeem_ticket()
.times(ticket_count - 1)
.withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
.returning(move |_| Ok(random_hash));
let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
let tx_sender = tx_queue.new_sender();
async_std::task::spawn(async move {
tx_queue.start().await;
});
let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
let confirmations = futures::future::try_join_all(
actions
.redeem_tickets_with_counterparty(&BOB.public().to_address(), false)
.await?
.into_iter(),
)
.await?;
assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
assert!(
confirmations.into_iter().all(|c| c.tx_hash == random_hash),
"tx hashes must be equal"
);
let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
assert!(
db_acks_bob
.into_iter()
.take_while(|tkt| tkt.verified_ticket().index != 0)
.all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
"all bob's tickets must be in BeingRedeemed state"
);
assert!(
db_acks_charlie
.into_iter()
.all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
"all charlie's tickets must be in Untouched state"
);
Ok(())
}
#[async_std::test]
async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
let ticket_count = 3;
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let (channel_from_bob, mut tickets) =
create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
let selector = TicketSelector::from(&tickets[0]).with_no_state();
db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
.await?;
tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
let selector = TicketSelector::from(&tickets[1]).with_no_state();
db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
.await?;
let tickets_clone = tickets.clone();
let mut tx_exec = MockTransactionExecutor::new();
tx_exec
.expect_redeem_ticket()
.times(ticket_count - 2)
.withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
.returning(move |_| Ok(random_hash));
let mut indexer_action_tracker = MockActionState::new();
for tkt in tickets.iter().skip(2).cloned() {
indexer_action_tracker
.expect_register_expectation()
.once()
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
})
.boxed())
});
}
let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
let tx_sender = tx_queue.new_sender();
async_std::task::spawn(async move {
tx_queue.start().await;
});
let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
let confirmations = futures::future::try_join_all(
actions
.redeem_tickets_in_channel(&channel_from_bob, false)
.await?
.into_iter(),
)
.await?;
assert_eq!(
ticket_count - 2,
confirmations.len(),
"must redeem only redeemable tickets in channel"
);
assert!(
actions.redeem_ticket(tickets[0].clone()).await.is_err(),
"cannot redeem a ticket that's being aggregated"
);
assert!(
actions.redeem_ticket(tickets[1].clone()).await.is_err(),
"cannot redeem a ticket that's being redeemed"
);
Ok(())
}
#[async_std::test]
async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed(
) -> anyhow::Result<()> {
let ticket_count = 3;
let ticket_from_previous_epoch_count = 2;
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
db.upsert_ticket(None, ticket.clone()).await?;
tickets.insert(0, ticket);
let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
db.upsert_ticket(None, ticket.clone()).await?;
tickets.insert(1, ticket);
let tickets_clone = tickets.clone();
let mut tx_exec = MockTransactionExecutor::new();
tx_exec
.expect_redeem_ticket()
.times(ticket_count - ticket_from_previous_epoch_count)
.withf(move |t| {
tickets_clone[ticket_from_previous_epoch_count..]
.iter()
.any(|tk| tk.ticket.eq(&t.ticket))
})
.returning(move |_| Ok(random_hash));
let mut indexer_action_tracker = MockActionState::new();
for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
indexer_action_tracker
.expect_register_expectation()
.once()
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
})
.boxed())
});
}
let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
let tx_sender = tx_queue.new_sender();
async_std::task::spawn(async move {
tx_queue.start().await;
});
let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
futures::future::join_all(
actions
.redeem_tickets_in_channel(&channel_from_bob, false)
.await?
.into_iter(),
)
.await;
assert!(
actions.redeem_ticket(tickets[0].clone()).await.is_err(),
"cannot redeem a ticket that's from the previous epoch"
);
Ok(())
}
#[async_std::test]
async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
let ticket_count = 4;
let ticket_from_next_epoch_count = 2;
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
db.upsert_ticket(None, ticket.clone()).await?;
tickets.insert(0, ticket);
let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
db.upsert_ticket(None, ticket.clone()).await?;
tickets.insert(1, ticket);
let tickets_clone = tickets.clone();
let mut tx_exec = MockTransactionExecutor::new();
tx_exec
.expect_redeem_ticket()
.times(ticket_count - ticket_from_next_epoch_count)
.withf(move |t| {
tickets_clone[ticket_from_next_epoch_count..]
.iter()
.any(|tk| tk.ticket.eq(&t.ticket))
})
.returning(move |_| Ok(random_hash));
let mut indexer_action_tracker = MockActionState::new();
for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
indexer_action_tracker
.expect_register_expectation()
.once()
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
})
.boxed())
});
}
let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
let tx_sender = tx_queue.new_sender();
async_std::task::spawn(async move {
tx_queue.start().await;
});
let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
futures::future::join_all(
actions
.redeem_tickets_in_channel(&channel_from_bob, false)
.await?
.into_iter(),
)
.await;
for unredeemable_index in 0..ticket_from_next_epoch_count {
assert!(
actions
.redeem_ticket(tickets[unredeemable_index].clone())
.await
.is_err(),
"cannot redeem a ticket that's from the next epoch"
);
}
Ok(())
}
#[async_std::test]
async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
let (channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
let ticket = tickets.into_iter().next().unwrap();
let mut tx_exec = MockTransactionExecutor::new();
let ticket_clone = ticket.clone();
tx_exec
.expect_redeem_ticket()
.once()
.withf(move |t| ticket_clone.ticket.eq(&t.ticket))
.returning(move |_| Ok(random_hash));
let mut indexer_action_tracker = MockActionState::new();
let ticket_clone = ticket.clone();
indexer_action_tracker
.expect_register_expectation()
.once()
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
})
.boxed())
});
let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
let tx_sender = tx_queue.new_sender();
async_std::task::spawn(async move {
tx_queue.start().await;
});
let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
let confirmation = actions.redeem_ticket(ticket).await?.await?;
assert_eq!(confirmation.tx_hash, random_hash);
assert!(
db.get_tickets((&channel_from_bob).into())
.await?
.into_iter()
.all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
"all bob's tickets must be in BeingRedeemed state"
);
Ok(())
}
#[async_std::test]
async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
let db = HoprDb::new_in_memory(ALICE.clone()).await?;
let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
let (mut channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
channel_from_bob.ticket_index = 2_u32.into();
db.upsert_channel(None, channel_from_bob.clone()).await?;
let ticket = tickets.into_iter().next().unwrap();
let mut tx_exec = MockTransactionExecutor::new();
let ticket_clone = ticket.clone();
tx_exec
.expect_redeem_ticket()
.never()
.withf(move |t| ticket_clone.ticket.eq(&t.ticket))
.returning(move |_| Ok(random_hash));
let mut indexer_action_tracker = MockActionState::new();
let ticket_clone = ticket.clone();
indexer_action_tracker
.expect_register_expectation()
.never()
.return_once(move |_| {
Ok(futures::future::ok(SignificantChainEvent {
tx_hash: random_hash,
event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
})
.boxed())
});
let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
let tx_sender = tx_queue.new_sender();
async_std::task::spawn(async move {
tx_queue.start().await;
});
let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
Ok(())
}
}