1use std::{
2 future::Future,
3 pin::Pin,
4 sync::{Arc, OnceLock},
5};
6
7use futures::{Sink, SinkExt, StreamExt, TryStreamExt, pin_mut};
8use hopr_api::db::TicketSelector;
9use hopr_async_runtime::prelude::spawn;
10use hopr_db_entity::ticket;
11use hopr_internal_types::tickets::AcknowledgedTicket;
12use hopr_primitive_types::prelude::{HoprBalance, IntoEndian, ToHex};
13use sea_orm::{
14 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
15};
16use tracing::{debug, error};
17
18use crate::{cache::NodeDbCaches, db::HoprNodeDb, errors::NodeDbError, tickets::WrappedTicketSelector};
19
20#[derive(Debug, Clone)]
34pub(crate) struct TicketManager {
35 pub(crate) tickets_db: sea_orm::DatabaseConnection,
36 pub(crate) mutex: Arc<async_lock::Mutex<()>>,
37 incoming_ack_tickets_tx: Arc<OnceLock<futures::channel::mpsc::Sender<AcknowledgedTicket>>>,
38 caches: Arc<NodeDbCaches>,
39}
40
41impl TicketManager {
42 pub fn new(tickets_db: sea_orm::DatabaseConnection, caches: Arc<NodeDbCaches>) -> Self {
43 Self {
44 tickets_db,
45 mutex: Arc::new(async_lock::Mutex::new(())),
46 incoming_ack_tickets_tx: Arc::new(OnceLock::new()),
47 caches,
48 }
49 }
50
51 pub fn start_ticket_processing<S, E>(&self, ticket_notifier: S) -> Result<(), NodeDbError>
53 where
54 S: Sink<AcknowledgedTicket, Error = E> + Send + 'static,
55 E: std::error::Error,
56 {
57 let (tx, mut rx) = futures::channel::mpsc::channel::<AcknowledgedTicket>(100_000);
58
59 self.incoming_ack_tickets_tx
60 .set(tx)
61 .map_err(|_| NodeDbError::LogicalError("ticket processing already started".into()))?;
62
63 let db_clone = self.tickets_db.clone();
66 let mutex_clone = self.mutex.clone();
67
68 spawn(async move {
71 pin_mut!(ticket_notifier);
72 while let Some(ticket_to_insert) = rx.next().await {
73 let ticket_to_insert_clone = ticket_to_insert.clone();
74 let tx_result = {
75 let _quard = mutex_clone.lock().await;
76 db_clone
77 .transaction(|tx| {
78 Box::pin(async move {
79 let channel_id = ticket_to_insert_clone.verified_ticket().channel_id.to_hex();
81
82 hopr_db_entity::ticket::ActiveModel::from(ticket_to_insert_clone)
83 .insert(tx)
84 .await?;
85
86 let model = if let Some(model) = hopr_db_entity::ticket_statistics::Entity::find()
88 .filter(hopr_db_entity::ticket_statistics::Column::ChannelId.eq(channel_id.clone()))
89 .one(tx)
90 .await?
91 {
92 let winning_tickets = model.winning_tickets + 1;
93 let mut active_model = model.into_active_model();
94 active_model.winning_tickets = sea_orm::Set(winning_tickets);
95 active_model
96 } else {
97 hopr_db_entity::ticket_statistics::ActiveModel {
98 channel_id: sea_orm::Set(channel_id),
99 winning_tickets: sea_orm::Set(1),
100 ..Default::default()
101 }
102 };
103
104 model.save(tx).await?;
105 Ok::<_, sea_orm::DbErr>(())
106 })
107 })
108 .await
109 };
110
111 match tx_result {
112 Ok(_) => {
113 debug!(acknowledged_ticket = %ticket_to_insert, "ticket persisted into the ticket db");
114 if let Err(error) = ticket_notifier.send(ticket_to_insert).await {
116 error!(%error, "failed to notify the ticket notifier about the winning ticket");
117 }
118 }
119 Err(error) => {
120 error!(%error, "failed to insert the winning ticket and update the ticket stats");
121 }
122 };
123 }
124
125 tracing::info!(task = "ticket processing", "long-running background task finished")
126 });
127
128 Ok(())
129 }
130
131 pub async fn insert_ticket(&self, ticket: AcknowledgedTicket) -> Result<(), NodeDbError> {
136 let channel = ticket.verified_ticket().channel_id;
137 let value = ticket.verified_ticket().amount;
138 let epoch = ticket.verified_ticket().channel_epoch;
139
140 self.incoming_ack_tickets_tx
141 .get()
142 .ok_or(NodeDbError::LogicalError("ticket processing not started".into()))?
143 .clone()
144 .try_send(ticket)
145 .map_err(|e| {
146 NodeDbError::LogicalError(format!(
147 "failed to enqueue acknowledged ticket processing into the DB: {e}"
148 ))
149 })?;
150
151 let unrealized_value = self.unrealized_value(TicketSelector::new(channel, epoch)).await?;
152
153 #[cfg(all(feature = "prometheus", not(test)))]
154 {
155 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
156 .increment(&["unredeemed"], (value).amount().as_u128() as f64);
157 }
158
159 self.caches
160 .unrealized_value
161 .insert((channel, epoch.into()), unrealized_value + value)
162 .await;
163
164 Ok(())
165 }
166
167 pub async fn unrealized_value(&self, selector: TicketSelector) -> Result<HoprBalance, NodeDbError> {
169 if !selector.is_single_channel() {
170 return Err(NodeDbError::LogicalError(
171 "selector must represent a single channel".into(),
172 ));
173 }
174
175 let channel_id = selector.channel_identifiers[0].0;
176 let channel_epoch = selector.channel_identifiers[0].1;
177 let selector: WrappedTicketSelector = selector.into();
178
179 let tickets_db = self.tickets_db.clone();
180 let selector_clone = selector.clone();
181 Ok(self
182 .caches
183 .unrealized_value
184 .try_get_with_by_ref(&(channel_id, channel_epoch), async move {
185 tracing::warn!(%channel_id, %channel_epoch, "cache miss on unrealized value");
186 tickets_db
187 .transaction(|tx| {
188 Box::pin(async move {
189 ticket::Entity::find()
190 .filter(selector_clone)
191 .stream(tx)
192 .await?
193 .try_fold(HoprBalance::zero(), |value, t| async move {
194 Ok(value + HoprBalance::from_be_bytes(t.amount))
195 })
196 .await
197 })
198 })
199 .await
200 })
201 .await?)
202 }
203
204 pub async fn write_transaction<'a, F, T, E>(&self, action: F) -> Result<T, sea_orm::TransactionError<E>>
205 where
206 F: for<'c> FnOnce(&'c DatabaseTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>> + Send,
207 T: Send,
208 E: std::error::Error + Send,
209 {
210 let _guard = self.mutex.lock().await;
211 self.tickets_db.transaction(action).await
212 }
213}
214
215impl HoprNodeDb {
216 pub fn start_ticket_processing<S>(&self, ticket_notifier: Option<S>) -> Result<(), NodeDbError>
222 where
223 S: futures::Sink<AcknowledgedTicket> + Send + 'static,
224 S::Error: std::fmt::Display + std::error::Error,
225 {
226 if let Some(notifier) = ticket_notifier {
227 self.ticket_manager.start_ticket_processing(notifier)
228 } else {
229 self.ticket_manager.start_ticket_processing(futures::sink::drain())
230 }
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use futures::StreamExt;
237 use hex_literal::hex;
238 use hopr_crypto_random::Randomizable;
239 use hopr_crypto_types::prelude::*;
240 use hopr_internal_types::prelude::*;
241
242 use super::*;
243 use crate::db::HoprNodeDb;
244
245 lazy_static::lazy_static! {
246 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
247 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
248 static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
249 static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
250 }
251
252 const TICKET_VALUE: u64 = 100_000;
253
254 fn generate_random_ack_ticket(index: u32) -> anyhow::Result<AcknowledgedTicket> {
255 let hk1 = HalfKey::random();
256 let hk2 = HalfKey::random();
257
258 let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
259
260 let ticket = TicketBuilder::default()
261 .direction(BOB.public().as_ref(), ALICE.public().as_ref())
262 .amount(TICKET_VALUE)
263 .index(index as u64)
264 .channel_epoch(4)
265 .challenge(challenge)
266 .build_signed(&BOB, &Hash::default())?;
267
268 Ok(ticket.into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
269 }
270
271 #[tokio::test]
272 async fn test_insert_ticket_properly_resolves_the_cached_value() -> anyhow::Result<()> {
273 let db = HoprNodeDb::new_in_memory(BOB.clone()).await?;
274
275 let channel = ChannelEntry::new(
276 BOB.public().to_address(),
277 ALICE.public().to_address(),
278 u32::MAX.into(),
279 1.into(),
280 ChannelStatus::Open,
281 4_u32.into(),
282 );
283
284 assert_eq!(
285 HoprBalance::zero(),
286 db.ticket_manager.unrealized_value((&channel).into()).await?
287 );
288
289 let ticket = generate_random_ack_ticket(1)?;
290 let ticket_value = ticket.verified_ticket().amount;
291
292 let (tx, mut rx) = futures::channel::mpsc::unbounded();
293
294 db.ticket_manager.start_ticket_processing(tx)?;
295
296 db.ticket_manager.insert_ticket(ticket.clone()).await?;
297
298 assert_eq!(
299 ticket_value,
300 db.ticket_manager.unrealized_value((&channel).into()).await?
301 );
302
303 let recv_ticket = rx.next().await.ok_or(anyhow::anyhow!("no ticket received"))?;
304 assert_eq!(recv_ticket, ticket);
305
306 Ok(())
307 }
308}