hopr_db_node/
ticket_manager.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    sync::{Arc, OnceLock},
5};
6
7use futures::{Sink, SinkExt, StreamExt, TryStreamExt, pin_mut};
8use hopr_api::db::TicketSelector;
9use hopr_async_runtime::prelude::spawn;
10use hopr_db_entity::ticket;
11use hopr_internal_types::tickets::AcknowledgedTicket;
12use hopr_primitive_types::prelude::{HoprBalance, IntoEndian, ToHex};
13use sea_orm::{
14    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
15};
16use tracing::{debug, error};
17
18use crate::{cache::NodeDbCaches, db::HoprNodeDb, errors::NodeDbError, tickets::WrappedTicketSelector};
19
20/// Functionality related to locking and structural improvements to the underlying SQLite database
21///
22/// With SQLite, it is only possible to have a single write lock per database, meaning that
23/// high-frequency database access to tickets needed to be split from the rest of the database
24/// operations.
25///
26/// High frequency of locking originating from the ticket processing pipeline could starve the DB
27/// and lock with other concurrent processes. Therefore, a single mutex for write operations exists,
28/// which allows bottle-necking the database write access on the mutex, as well as allowing arbitrary
29/// numbers of concurrent read operations.
30///
31/// The queue-based mechanism also splits the storage of the ticket inside the database from the processing,
32/// effectively allowing the processing pipelines to be independent of a database write access.
33#[derive(Debug, Clone)]
34pub(crate) struct TicketManager {
35    pub(crate) tickets_db: sea_orm::DatabaseConnection,
36    pub(crate) mutex: Arc<async_lock::Mutex<()>>,
37    incoming_ack_tickets_tx: Arc<OnceLock<futures::channel::mpsc::Sender<AcknowledgedTicket>>>,
38    caches: Arc<NodeDbCaches>,
39}
40
41impl TicketManager {
42    pub fn new(tickets_db: sea_orm::DatabaseConnection, caches: Arc<NodeDbCaches>) -> Self {
43        Self {
44            tickets_db,
45            mutex: Arc::new(async_lock::Mutex::new(())),
46            incoming_ack_tickets_tx: Arc::new(OnceLock::new()),
47            caches,
48        }
49    }
50
51    /// Must be called to start processing tickets into the DB.
52    pub fn start_ticket_processing<S, E>(&self, ticket_notifier: S) -> Result<(), NodeDbError>
53    where
54        S: Sink<AcknowledgedTicket, Error = E> + Send + 'static,
55        E: std::error::Error,
56    {
57        let (tx, mut rx) = futures::channel::mpsc::channel::<AcknowledgedTicket>(100_000);
58
59        self.incoming_ack_tickets_tx
60            .set(tx)
61            .map_err(|_| NodeDbError::LogicalError("ticket processing already started".into()))?;
62
63        // Creates a process to desynchronize storing of the ticket into the database
64        // and the processing calls triggering such an operation.
65        let db_clone = self.tickets_db.clone();
66        let mutex_clone = self.mutex.clone();
67
68        // NOTE: This spawned task does not need to be explicitly canceled, since it will
69        // be automatically dropped when the event sender object is dropped.
70        spawn(async move {
71            pin_mut!(ticket_notifier);
72            while let Some(ticket_to_insert) = rx.next().await {
73                let ticket_to_insert_clone = ticket_to_insert.clone();
74                let tx_result = {
75                    let _quard = mutex_clone.lock().await;
76                    db_clone
77                        .transaction(|tx| {
78                            Box::pin(async move {
79                                // Insertion of a new acknowledged ticket
80                                let channel_id = ticket_to_insert_clone.verified_ticket().channel_id.to_hex();
81
82                                hopr_db_entity::ticket::ActiveModel::from(ticket_to_insert_clone)
83                                    .insert(tx)
84                                    .await?;
85
86                                // Update the ticket winning count in the statistics
87                                let model = if let Some(model) = hopr_db_entity::ticket_statistics::Entity::find()
88                                    .filter(hopr_db_entity::ticket_statistics::Column::ChannelId.eq(channel_id.clone()))
89                                    .one(tx)
90                                    .await?
91                                {
92                                    let winning_tickets = model.winning_tickets + 1;
93                                    let mut active_model = model.into_active_model();
94                                    active_model.winning_tickets = sea_orm::Set(winning_tickets);
95                                    active_model
96                                } else {
97                                    hopr_db_entity::ticket_statistics::ActiveModel {
98                                        channel_id: sea_orm::Set(channel_id),
99                                        winning_tickets: sea_orm::Set(1),
100                                        ..Default::default()
101                                    }
102                                };
103
104                                model.save(tx).await?;
105                                Ok::<_, sea_orm::DbErr>(())
106                            })
107                        })
108                        .await
109                };
110
111                match tx_result {
112                    Ok(_) => {
113                        debug!(acknowledged_ticket = %ticket_to_insert, "ticket persisted into the ticket db");
114                        // Notify about the ticket once successfully inserted into the Tickets DB
115                        if let Err(error) = ticket_notifier.send(ticket_to_insert).await {
116                            error!(%error, "failed to notify the ticket notifier about the winning ticket");
117                        }
118                    }
119                    Err(error) => {
120                        error!(%error, "failed to insert the winning ticket and update the ticket stats");
121                    }
122                };
123            }
124
125            tracing::info!(task = "ticket processing", "long-running background task finished")
126        });
127
128        Ok(())
129    }
130
131    /// Sends a new acknowledged ticket into the FIFO queue.
132    ///
133    /// The [`start_ticket_processing`](TicketManager::start_ticket_processing) method
134    /// must be called before calling this method, or it will fail.
135    pub async fn insert_ticket(&self, ticket: AcknowledgedTicket) -> Result<(), NodeDbError> {
136        let channel = ticket.verified_ticket().channel_id;
137        let value = ticket.verified_ticket().amount;
138        let epoch = ticket.verified_ticket().channel_epoch;
139
140        self.incoming_ack_tickets_tx
141            .get()
142            .ok_or(NodeDbError::LogicalError("ticket processing not started".into()))?
143            .clone()
144            .try_send(ticket)
145            .map_err(|e| {
146                NodeDbError::LogicalError(format!(
147                    "failed to enqueue acknowledged ticket processing into the DB: {e}"
148                ))
149            })?;
150
151        let unrealized_value = self.unrealized_value(TicketSelector::new(channel, epoch)).await?;
152
153        #[cfg(all(feature = "prometheus", not(test)))]
154        {
155            crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
156                .increment(&["unredeemed"], (value).amount().as_u128() as f64);
157        }
158
159        self.caches
160            .unrealized_value
161            .insert((channel, epoch.into()), unrealized_value + value)
162            .await;
163
164        Ok(())
165    }
166
167    /// Get unrealized value for a channel
168    pub async fn unrealized_value(&self, selector: TicketSelector) -> Result<HoprBalance, NodeDbError> {
169        if !selector.is_single_channel() {
170            return Err(NodeDbError::LogicalError(
171                "selector must represent a single channel".into(),
172            ));
173        }
174
175        let channel_id = selector.channel_identifiers[0].0;
176        let channel_epoch = selector.channel_identifiers[0].1;
177        let selector: WrappedTicketSelector = selector.into();
178
179        let tickets_db = self.tickets_db.clone();
180        let selector_clone = selector.clone();
181        Ok(self
182            .caches
183            .unrealized_value
184            .try_get_with_by_ref(&(channel_id, channel_epoch), async move {
185                tracing::warn!(%channel_id, %channel_epoch, "cache miss on unrealized value");
186                tickets_db
187                    .transaction(|tx| {
188                        Box::pin(async move {
189                            ticket::Entity::find()
190                                .filter(selector_clone)
191                                .stream(tx)
192                                .await?
193                                .try_fold(HoprBalance::zero(), |value, t| async move {
194                                    Ok(value + HoprBalance::from_be_bytes(t.amount))
195                                })
196                                .await
197                        })
198                    })
199                    .await
200            })
201            .await?)
202    }
203
204    pub async fn write_transaction<'a, F, T, E>(&self, action: F) -> Result<T, sea_orm::TransactionError<E>>
205    where
206        F: for<'c> FnOnce(&'c DatabaseTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'c>> + Send,
207        T: Send,
208        E: std::error::Error + Send,
209    {
210        let _guard = self.mutex.lock().await;
211        self.tickets_db.transaction(action).await
212    }
213}
214
215impl HoprNodeDb {
216    /// Starts ticket processing by the `TicketManager` with an optional new ticket notifier.
217    /// Without calling this method, tickets will not be persisted into the DB.
218    ///
219    /// If the notifier is given, it will receive notifications once a new ticket has been
220    /// persisted into the Tickets DB.
221    pub fn start_ticket_processing<S>(&self, ticket_notifier: Option<S>) -> Result<(), NodeDbError>
222    where
223        S: futures::Sink<AcknowledgedTicket> + Send + 'static,
224        S::Error: std::fmt::Display + std::error::Error,
225    {
226        if let Some(notifier) = ticket_notifier {
227            self.ticket_manager.start_ticket_processing(notifier)
228        } else {
229            self.ticket_manager.start_ticket_processing(futures::sink::drain())
230        }
231    }
232}
233
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hex_literal::hex;
    use hopr_crypto_random::Randomizable;
    use hopr_crypto_types::prelude::*;
    use hopr_internal_types::prelude::*;

    use super::*;
    use crate::db::HoprNodeDb;

    lazy_static::lazy_static! {
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
    }

    /// Amount carried by every ticket generated in these tests.
    const TICKET_VALUE: u64 = 100_000;

    /// Builds an acknowledged ticket from BOB to ALICE with the given index,
    /// channel epoch 4 and a challenge derived from two random half keys.
    fn generate_random_ack_ticket(index: u32) -> anyhow::Result<AcknowledgedTicket> {
        let first_half = HalfKey::random();
        let second_half = HalfKey::random();

        let challenge = Response::from_half_keys(&first_half, &second_half)?.to_challenge()?;

        let signed_ticket = TicketBuilder::default()
            .direction(BOB.public().as_ref(), ALICE.public().as_ref())
            .amount(TICKET_VALUE)
            .index(u64::from(index))
            .channel_epoch(4)
            .challenge(challenge)
            .build_signed(&BOB, &Hash::default())?;

        let response = Response::from_half_keys(&first_half, &second_half)?;
        Ok(signed_ticket.into_acknowledged(response))
    }

    /// Inserting a ticket must raise the cached unrealized value of its channel by the
    /// ticket amount, and the notifier must receive the very same ticket.
    #[tokio::test]
    async fn test_insert_ticket_properly_resolves_the_cached_value() -> anyhow::Result<()> {
        let db = HoprNodeDb::new_in_memory(BOB.clone()).await?;

        let channel = ChannelEntry::new(
            BOB.public().to_address(),
            ALICE.public().to_address(),
            u32::MAX.into(),
            1.into(),
            ChannelStatus::Open,
            4_u32.into(),
        );

        // Nothing has been inserted yet, so the channel has no unrealized value.
        let value_before = db.ticket_manager.unrealized_value((&channel).into()).await?;
        assert_eq!(HoprBalance::zero(), value_before);

        let ticket = generate_random_ack_ticket(1)?;
        let ticket_value = ticket.verified_ticket().amount;

        let (notifier_tx, mut notifier_rx) = futures::channel::mpsc::unbounded();

        db.ticket_manager.start_ticket_processing(notifier_tx)?;

        db.ticket_manager.insert_ticket(ticket.clone()).await?;

        let value_after = db.ticket_manager.unrealized_value((&channel).into()).await?;
        assert_eq!(ticket_value, value_after);

        let Some(recv_ticket) = notifier_rx.next().await else {
            anyhow::bail!("no ticket received");
        };
        assert_eq!(recv_ticket, ticket);

        Ok(())
    }
}