1use futures::{future::BoxFuture, pin_mut, Sink, SinkExt, StreamExt, TryStreamExt};
2use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait};
3use std::sync::{Arc, OnceLock};
4use tracing::{debug, error};
5
6use hopr_async_runtime::prelude::spawn;
7use hopr_db_api::tickets::TicketSelector;
8use hopr_db_entity::ticket;
9use hopr_internal_types::prelude::AcknowledgedTicketStatus;
10use hopr_internal_types::tickets::AcknowledgedTicket;
11use hopr_primitive_types::prelude::ToHex;
12use hopr_primitive_types::primitives::{Balance, BalanceType};
13
14use crate::cache::HoprDbCaches;
15use crate::prelude::DbSqlError;
16use crate::tickets::WrappedTicketSelector;
17use crate::{errors::Result, OpenTransaction};
18
19#[derive(Debug, Clone)]
33pub(crate) struct TicketManager {
34 pub(crate) tickets_db: sea_orm::DatabaseConnection,
35 pub(crate) mutex: Arc<async_lock::Mutex<()>>,
36 incoming_ack_tickets_tx: Arc<OnceLock<futures::channel::mpsc::Sender<TicketOperation>>>,
37 caches: Arc<HoprDbCaches>,
38}
39
40enum TicketOperation {
41 Insert(AcknowledgedTicket),
43 Replace(AcknowledgedTicket),
45}
46
47impl TicketOperation {
48 fn ticket(&self) -> &AcknowledgedTicket {
49 match self {
50 TicketOperation::Insert(ticket) => ticket,
51 TicketOperation::Replace(ticket) => ticket,
52 }
53 }
54}
55
56impl TicketManager {
57 pub fn new(tickets_db: sea_orm::DatabaseConnection, caches: Arc<HoprDbCaches>) -> Self {
58 Self {
59 tickets_db,
60 mutex: Arc::new(async_lock::Mutex::new(())),
61 incoming_ack_tickets_tx: Arc::new(OnceLock::new()),
62 caches,
63 }
64 }
65
66 pub fn start_ticket_processing<S, E>(&self, ticket_notifier: S) -> Result<()>
68 where
69 S: Sink<AcknowledgedTicket, Error = E> + Send + 'static,
70 E: std::error::Error,
71 {
72 let (tx, mut rx) = futures::channel::mpsc::channel::<TicketOperation>(100_000);
73
74 self.incoming_ack_tickets_tx
75 .set(tx)
76 .map_err(|_| DbSqlError::LogicalError("ticket processing already started".into()))?;
77
78 let db_clone = self.tickets_db.clone();
81 let mutex_clone = self.mutex.clone();
82
83 spawn(async move {
86 pin_mut!(ticket_notifier);
87 while let Some(ticket_op) = rx.next().await {
88 let ticket_to_insert = ticket_op.ticket().clone();
89 let ticket_inserted = match db_clone
90 .begin_with_config(None, None)
91 .await
92 .map_err(DbSqlError::BackendError)
93 {
94 Ok(transaction) => {
95 let transaction = OpenTransaction(transaction, crate::TargetDb::Tickets);
96
97 let _quard = mutex_clone.lock().await;
98
99 if let Err(error) = transaction
100 .perform(|tx| {
101 Box::pin(async move {
102 match ticket_op {
103 TicketOperation::Insert(ack_ticket) => {
105 let channel_id = ack_ticket.verified_ticket().channel_id.to_hex();
106
107 hopr_db_entity::ticket::ActiveModel::from(ack_ticket)
108 .insert(tx.as_ref())
109 .await?;
110
111 if let Some(model) = hopr_db_entity::ticket_statistics::Entity::find()
113 .filter(
114 hopr_db_entity::ticket_statistics::Column::ChannelId.eq(channel_id.clone()),
115 )
116 .one(tx.as_ref())
117 .await?
118 {
119 let winning_tickets = model.winning_tickets + 1;
120 let mut active_model = model.into_active_model();
121 active_model.winning_tickets = sea_orm::Set(winning_tickets);
122 active_model
123 } else {
124 hopr_db_entity::ticket_statistics::ActiveModel {
125 channel_id: sea_orm::Set(channel_id),
126 winning_tickets: sea_orm::Set(1),
127 ..Default::default()
128 }
129 }
130 .save(tx.as_ref())
131 .await?;
132 }
133 TicketOperation::Replace(ack_ticket) => {
134 let start_idx = ack_ticket.verified_ticket().index;
136 let offset = ack_ticket.verified_ticket().index_offset as u64;
137
138 let selector = TicketSelector::new(ack_ticket.verified_ticket().channel_id, ack_ticket.verified_ticket().channel_epoch)
140 .with_index_range(start_idx..start_idx + offset)
141 .with_state(AcknowledgedTicketStatus::BeingAggregated);
142
143 let deleted = ticket::Entity::delete_many()
144 .filter(WrappedTicketSelector::from(selector))
145 .exec(tx.as_ref())
146 .await?;
147
148 if deleted.rows_affected > offset {
149 return Err(DbSqlError::LogicalError(format!(
150 "deleted ticket count ({}) must not be more than the ticket index offset {offset}",
151 deleted.rows_affected,
152 )));
153 }
154
155 ticket::Entity::insert::<ticket::ActiveModel>(ack_ticket.into())
156 .exec(tx.as_ref())
157 .await?;
158 }
159 }
160 Ok::<_, DbSqlError>(())
161 })
162 })
163 .await
164 {
165 error!(%error, "failed to insert the winning ticket and update the ticket stats");
166 false
167 } else {
168 debug!(acknowledged_ticket = %ticket_to_insert, "ticket persisted into the ticket db");
169 true
170 }
171 }
172 Err(error) => {
173 error!(%error, "failed to create a transaction for ticket insertion");
174 false
175 }
176 };
177
178 if ticket_inserted {
180 if let Err(error) = ticket_notifier.send(ticket_to_insert).await {
181 error!(%error, "failed to notify the ticket notifier about the winning ticket");
182 }
183 }
184 }
185 });
186
187 Ok(())
188 }
189
190 pub async fn insert_ticket(&self, ticket: AcknowledgedTicket) -> Result<()> {
195 let channel = ticket.verified_ticket().channel_id;
196 let value = ticket.verified_ticket().amount;
197 let epoch = ticket.verified_ticket().channel_epoch;
198
199 self.incoming_ack_tickets_tx
200 .get()
201 .ok_or(DbSqlError::LogicalError("ticket processing not started".into()))?
202 .clone()
203 .try_send(TicketOperation::Insert(ticket))
204 .map_err(|e| {
205 DbSqlError::LogicalError(format!(
206 "failed to enqueue acknowledged ticket processing into the DB: {e}"
207 ))
208 })?;
209
210 let unrealized_value = self.unrealized_value(TicketSelector::new(channel, epoch)).await?;
211
212 #[cfg(all(feature = "prometheus", not(test)))]
213 {
214 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
215 &[&channel.to_string(), "unredeemed"],
216 (unrealized_value + value).amount().as_u128() as f64,
217 );
218 }
219
220 self.caches
221 .unrealized_value
222 .insert((channel, epoch.into()), unrealized_value + value)
223 .await;
224
225 Ok(())
226 }
227
228 pub async fn replace_tickets(&self, ticket: AcknowledgedTicket) -> Result<()> {
233 self.incoming_ack_tickets_tx
234 .get()
235 .ok_or(DbSqlError::LogicalError("ticket processing not started".into()))?
236 .clone()
237 .try_send(TicketOperation::Replace(ticket))
238 .map_err(|e| {
239 DbSqlError::LogicalError(format!(
240 "failed to enqueue acknowledged ticket processing into the DB: {e}"
241 ))
242 })
243 }
244
245 pub async fn unrealized_value(&self, selector: TicketSelector) -> Result<Balance> {
247 if !selector.is_single_channel() {
248 return Err(crate::DbSqlError::LogicalError(
249 "selector must represent a single channel".into(),
250 ));
251 }
252
253 let channel_id = selector.channel_identifiers[0].0;
254 let channel_epoch = selector.channel_identifiers[0].1;
255 let selector: WrappedTicketSelector = selector.into();
256
257 let transaction = OpenTransaction(
258 self.tickets_db
259 .begin_with_config(None, None)
260 .await
261 .map_err(crate::errors::DbSqlError::BackendError)?,
262 crate::TargetDb::Tickets,
263 );
264
265 let selector_clone = selector.clone();
266 Ok(self
267 .caches
268 .unrealized_value
269 .try_get_with_by_ref(&(channel_id, channel_epoch), async move {
270 transaction
271 .perform(|tx| {
272 Box::pin(async move {
273 ticket::Entity::find()
274 .filter(selector_clone)
275 .stream(tx.as_ref())
276 .await
277 .map_err(crate::errors::DbSqlError::from)?
278 .map_err(crate::errors::DbSqlError::from)
279 .try_fold(BalanceType::HOPR.zero(), |value, t| async move {
280 Ok(value + BalanceType::HOPR.balance_bytes(t.amount))
281 })
282 .await
283 })
284 })
285 .await
286 })
287 .await?)
288 }
289
290 pub async fn with_write_locked_db<'a, F, T, E>(&'a self, f: F) -> std::result::Result<T, E>
292 where
293 F: for<'c> FnOnce(&'c OpenTransaction) -> BoxFuture<'c, std::result::Result<T, E>> + Send,
294 T: Send,
295 E: std::error::Error + From<crate::errors::DbSqlError>,
296 {
297 let mutex = self.mutex.clone();
298 let _guard = mutex.lock().await;
299
300 let transaction = OpenTransaction(
301 self.tickets_db
302 .begin_with_config(None, None)
303 .await
304 .map_err(crate::errors::DbSqlError::BackendError)?,
305 crate::TargetDb::Tickets,
306 );
307
308 transaction.perform(f).await
309 }
310}
311
312#[cfg(test)]
313mod tests {
314 use futures::StreamExt;
315 use hex_literal::hex;
316 use hopr_crypto_types::prelude::*;
317 use hopr_db_api::info::DomainSeparator;
318 use hopr_internal_types::prelude::*;
319 use hopr_primitive_types::prelude::*;
320
321 use crate::accounts::HoprDbAccountOperations;
322 use crate::channels::HoprDbChannelOperations;
323 use crate::db::HoprDb;
324 use crate::info::HoprDbInfoOperations;
325
326 lazy_static::lazy_static! {
327 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
328 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
329 }
330
331 lazy_static::lazy_static! {
332 static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
333 static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
334 }
335
336 const TICKET_VALUE: u64 = 100_000;
337
338 async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
339 for (peer_offchain, peer_onchain) in peers.into_iter() {
340 db.insert_account(
341 None,
342 AccountEntry {
343 public_key: *peer_offchain.public(),
344 chain_addr: peer_onchain.public().to_address(),
345 entry_type: AccountType::NotAnnounced,
346 },
347 )
348 .await?
349 }
350
351 Ok(())
352 }
353
354 fn generate_random_ack_ticket(index: u32) -> anyhow::Result<AcknowledgedTicket> {
355 let hk1 = HalfKey::random();
356 let hk2 = HalfKey::random();
357
358 let cp1: CurvePoint = hk1.to_challenge().try_into()?;
359 let cp2: CurvePoint = hk2.to_challenge().try_into()?;
360 let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
361
362 let ticket = TicketBuilder::default()
363 .direction(&BOB.public().to_address(), &ALICE.public().to_address())
364 .amount(TICKET_VALUE)
365 .index(index as u64)
366 .channel_epoch(4)
367 .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
368 .build_signed(&BOB, &Hash::default())?;
369
370 Ok(ticket.into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
371 }
372
373 #[async_std::test]
374 async fn test_insert_ticket_properly_resolves_the_cached_value() -> anyhow::Result<()> {
375 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
376 db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
377 .await?;
378 add_peer_mappings(
379 &db,
380 vec![
381 (ALICE_OFFCHAIN.clone(), ALICE.clone()),
382 (BOB_OFFCHAIN.clone(), BOB.clone()),
383 ],
384 )
385 .await?;
386
387 let channel = ChannelEntry::new(
388 BOB.public().to_address(),
389 ALICE.public().to_address(),
390 BalanceType::HOPR.balance(u32::MAX),
391 1.into(),
392 ChannelStatus::Open,
393 4_u32.into(),
394 );
395
396 db.upsert_channel(None, channel).await?;
397
398 assert_eq!(
399 Balance::zero(BalanceType::HOPR),
400 db.ticket_manager.unrealized_value((&channel).into()).await?
401 );
402
403 let ticket = generate_random_ack_ticket(1)?;
404 let ticket_value = ticket.verified_ticket().amount;
405
406 let (tx, mut rx) = futures::channel::mpsc::unbounded();
407
408 db.ticket_manager.start_ticket_processing(tx)?;
409
410 db.ticket_manager.insert_ticket(ticket.clone()).await?;
411
412 assert_eq!(
413 ticket_value,
414 db.ticket_manager.unrealized_value((&channel).into()).await?
415 );
416
417 let recv_ticket = rx.next().await.ok_or(anyhow::anyhow!("no ticket received"))?;
418 assert_eq!(recv_ticket, ticket);
419
420 Ok(())
421 }
422}