hopr_db_sql/
ticket_manager.rs

1use futures::{future::BoxFuture, pin_mut, Sink, SinkExt, StreamExt, TryStreamExt};
2use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait};
3use std::sync::{Arc, OnceLock};
4use tracing::{debug, error};
5
6use hopr_async_runtime::prelude::spawn;
7use hopr_db_api::tickets::TicketSelector;
8use hopr_db_entity::ticket;
9use hopr_internal_types::prelude::AcknowledgedTicketStatus;
10use hopr_internal_types::tickets::AcknowledgedTicket;
11use hopr_primitive_types::prelude::ToHex;
12use hopr_primitive_types::primitives::{Balance, BalanceType};
13
14use crate::cache::HoprDbCaches;
15use crate::prelude::DbSqlError;
16use crate::tickets::WrappedTicketSelector;
17use crate::{errors::Result, OpenTransaction};
18
19/// Functionality related to locking and structural improvements to the underlying SQLite database
20///
21/// With SQLite, it is only possible to have a single write lock per database, meaning that
22/// high-frequency database access to tickets needed to be split from the rest of the database
23/// operations.
24///
25/// High frequency of locking originating from the ticket processing pipeline could starve the DB
26/// and lock with other concurrent processes. Therefore, a single mutex for write operations exists,
27/// which allows bottle-necking the database write access on the mutex, as well as allowing arbitrary
28/// numbers of concurrent read operations.
29///
30/// The queue-based mechanism also splits the storage of the ticket inside the database from the processing,
31/// effectively allowing the processing pipelines to be independent of a database write access.
32#[derive(Debug, Clone)]
33pub(crate) struct TicketManager {
34    pub(crate) tickets_db: sea_orm::DatabaseConnection,
35    pub(crate) mutex: Arc<async_lock::Mutex<()>>,
36    incoming_ack_tickets_tx: Arc<OnceLock<futures::channel::mpsc::Sender<TicketOperation>>>,
37    caches: Arc<HoprDbCaches>,
38}
39
40enum TicketOperation {
41    /// Inserts a new ticket
42    Insert(AcknowledgedTicket),
43    /// Replaces multiple tickets (in BeingAggregated state) with the given aggregated ticket.
44    Replace(AcknowledgedTicket),
45}
46
47impl TicketOperation {
48    fn ticket(&self) -> &AcknowledgedTicket {
49        match self {
50            TicketOperation::Insert(ticket) => ticket,
51            TicketOperation::Replace(ticket) => ticket,
52        }
53    }
54}
55
56impl TicketManager {
57    pub fn new(tickets_db: sea_orm::DatabaseConnection, caches: Arc<HoprDbCaches>) -> Self {
58        Self {
59            tickets_db,
60            mutex: Arc::new(async_lock::Mutex::new(())),
61            incoming_ack_tickets_tx: Arc::new(OnceLock::new()),
62            caches,
63        }
64    }
65
66    /// Must be called to start processing tickets into the DB.
67    pub fn start_ticket_processing<S, E>(&self, ticket_notifier: S) -> Result<()>
68    where
69        S: Sink<AcknowledgedTicket, Error = E> + Send + 'static,
70        E: std::error::Error,
71    {
72        let (tx, mut rx) = futures::channel::mpsc::channel::<TicketOperation>(100_000);
73
74        self.incoming_ack_tickets_tx
75            .set(tx)
76            .map_err(|_| DbSqlError::LogicalError("ticket processing already started".into()))?;
77
78        // Creates a process to desynchronize storing of the ticket into the database
79        // and the processing calls triggering such an operation.
80        let db_clone = self.tickets_db.clone();
81        let mutex_clone = self.mutex.clone();
82
83        // NOTE: This spawned task does not need to be explicitly canceled, since it will
84        // be automatically dropped when the event sender object is dropped.
85        spawn(async move {
86            pin_mut!(ticket_notifier);
87            while let Some(ticket_op) = rx.next().await {
88                let ticket_to_insert = ticket_op.ticket().clone();
89                let ticket_inserted = match db_clone
90                    .begin_with_config(None, None)
91                    .await
92                    .map_err(DbSqlError::BackendError)
93                {
94                    Ok(transaction) => {
95                        let transaction = OpenTransaction(transaction, crate::TargetDb::Tickets);
96
97                        let _quard = mutex_clone.lock().await;
98
99                        if let Err(error) = transaction
100                            .perform(|tx| {
101                                Box::pin(async move {
102                                    match ticket_op {
103                                        // Insertion of a new acknowledged ticket
104                                        TicketOperation::Insert(ack_ticket) => {
105                                            let channel_id = ack_ticket.verified_ticket().channel_id.to_hex();
106
107                                            hopr_db_entity::ticket::ActiveModel::from(ack_ticket)
108                                                .insert(tx.as_ref())
109                                                .await?;
110
111                                            // Update the ticket winning count in the statistics
112                                            if let Some(model) = hopr_db_entity::ticket_statistics::Entity::find()
113                                                .filter(
114                                                    hopr_db_entity::ticket_statistics::Column::ChannelId.eq(channel_id.clone()),
115                                                )
116                                                .one(tx.as_ref())
117                                                .await?
118                                            {
119                                                let winning_tickets = model.winning_tickets + 1;
120                                                let mut active_model = model.into_active_model();
121                                                active_model.winning_tickets = sea_orm::Set(winning_tickets);
122                                                active_model
123                                            } else {
124                                                hopr_db_entity::ticket_statistics::ActiveModel {
125                                                    channel_id: sea_orm::Set(channel_id),
126                                                    winning_tickets: sea_orm::Set(1),
127                                                    ..Default::default()
128                                                }
129                                            }
130                                            .save(tx.as_ref())
131                                            .await?;
132                                        }
133                                        TicketOperation::Replace(ack_ticket) => {
134                                            // Replacement range on the aggregated ticket
135                                            let start_idx = ack_ticket.verified_ticket().index;
136                                            let offset = ack_ticket.verified_ticket().index_offset as u64;
137
138                                            // Replace all BeingAggregated tickets with aggregated index range in this channel
139                                            let selector = TicketSelector::new(ack_ticket.verified_ticket().channel_id, ack_ticket.verified_ticket().channel_epoch)
140                                                .with_index_range(start_idx..start_idx + offset)
141                                                .with_state(AcknowledgedTicketStatus::BeingAggregated);
142
143                                            let deleted = ticket::Entity::delete_many()
144                                                .filter(WrappedTicketSelector::from(selector))
145                                                .exec(tx.as_ref())
146                                                .await?;
147
148                                            if deleted.rows_affected > offset {
149                                                return Err(DbSqlError::LogicalError(format!(
150                                                    "deleted ticket count ({}) must not be more than the ticket index offset {offset}",
151                                                    deleted.rows_affected,
152                                                )));
153                                            }
154
155                                            ticket::Entity::insert::<ticket::ActiveModel>(ack_ticket.into())
156                                                .exec(tx.as_ref())
157                                                .await?;
158                                        }
159                                    }
160                                    Ok::<_, DbSqlError>(())
161                                })
162                            })
163                            .await
164                        {
165                            error!(%error, "failed to insert the winning ticket and update the ticket stats");
166                            false
167                        } else {
168                            debug!(acknowledged_ticket = %ticket_to_insert, "ticket persisted into the ticket db");
169                            true
170                        }
171                    }
172                    Err(error) => {
173                        error!(%error, "failed to create a transaction for ticket insertion");
174                        false
175                    }
176                };
177
178                // Notify about the ticket once successfully inserted into the Tickets DB
179                if ticket_inserted {
180                    if let Err(error) = ticket_notifier.send(ticket_to_insert).await {
181                        error!(%error, "failed to notify the ticket notifier about the winning ticket");
182                    }
183                }
184            }
185        });
186
187        Ok(())
188    }
189
190    /// Sends a new acknowledged ticket into the FIFO queue.
191    ///
192    /// The [`start_ticket_processing`](TicketManager::start_ticket_processing) method
193    /// must be called before calling this method, or it will fail.
194    pub async fn insert_ticket(&self, ticket: AcknowledgedTicket) -> Result<()> {
195        let channel = ticket.verified_ticket().channel_id;
196        let value = ticket.verified_ticket().amount;
197        let epoch = ticket.verified_ticket().channel_epoch;
198
199        self.incoming_ack_tickets_tx
200            .get()
201            .ok_or(DbSqlError::LogicalError("ticket processing not started".into()))?
202            .clone()
203            .try_send(TicketOperation::Insert(ticket))
204            .map_err(|e| {
205                DbSqlError::LogicalError(format!(
206                    "failed to enqueue acknowledged ticket processing into the DB: {e}"
207                ))
208            })?;
209
210        let unrealized_value = self.unrealized_value(TicketSelector::new(channel, epoch)).await?;
211
212        #[cfg(all(feature = "prometheus", not(test)))]
213        {
214            crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
215                &[&channel.to_string(), "unredeemed"],
216                (unrealized_value + value).amount().as_u128() as f64,
217            );
218        }
219
220        self.caches
221            .unrealized_value
222            .insert((channel, epoch.into()), unrealized_value + value)
223            .await;
224
225        Ok(())
226    }
227
228    /// Sends aggregated replacement ticket into the FIFO queue.
229    ///
230    /// The [`start_ticket_processing`](TicketManager::start_ticket_processing) method
231    /// must be called before calling this method, or it will fail.
232    pub async fn replace_tickets(&self, ticket: AcknowledgedTicket) -> Result<()> {
233        self.incoming_ack_tickets_tx
234            .get()
235            .ok_or(DbSqlError::LogicalError("ticket processing not started".into()))?
236            .clone()
237            .try_send(TicketOperation::Replace(ticket))
238            .map_err(|e| {
239                DbSqlError::LogicalError(format!(
240                    "failed to enqueue acknowledged ticket processing into the DB: {e}"
241                ))
242            })
243    }
244
245    /// Get unrealized value for a channel
246    pub async fn unrealized_value(&self, selector: TicketSelector) -> Result<Balance> {
247        if !selector.is_single_channel() {
248            return Err(crate::DbSqlError::LogicalError(
249                "selector must represent a single channel".into(),
250            ));
251        }
252
253        let channel_id = selector.channel_identifiers[0].0;
254        let channel_epoch = selector.channel_identifiers[0].1;
255        let selector: WrappedTicketSelector = selector.into();
256
257        let transaction = OpenTransaction(
258            self.tickets_db
259                .begin_with_config(None, None)
260                .await
261                .map_err(crate::errors::DbSqlError::BackendError)?,
262            crate::TargetDb::Tickets,
263        );
264
265        let selector_clone = selector.clone();
266        Ok(self
267            .caches
268            .unrealized_value
269            .try_get_with_by_ref(&(channel_id, channel_epoch), async move {
270                transaction
271                    .perform(|tx| {
272                        Box::pin(async move {
273                            ticket::Entity::find()
274                                .filter(selector_clone)
275                                .stream(tx.as_ref())
276                                .await
277                                .map_err(crate::errors::DbSqlError::from)?
278                                .map_err(crate::errors::DbSqlError::from)
279                                .try_fold(BalanceType::HOPR.zero(), |value, t| async move {
280                                    Ok(value + BalanceType::HOPR.balance_bytes(t.amount))
281                                })
282                                .await
283                        })
284                    })
285                    .await
286            })
287            .await?)
288    }
289
290    /// Acquires write lock to the Ticket DB and starts a new transaction.
291    pub async fn with_write_locked_db<'a, F, T, E>(&'a self, f: F) -> std::result::Result<T, E>
292    where
293        F: for<'c> FnOnce(&'c OpenTransaction) -> BoxFuture<'c, std::result::Result<T, E>> + Send,
294        T: Send,
295        E: std::error::Error + From<crate::errors::DbSqlError>,
296    {
297        let mutex = self.mutex.clone();
298        let _guard = mutex.lock().await;
299
300        let transaction = OpenTransaction(
301            self.tickets_db
302                .begin_with_config(None, None)
303                .await
304                .map_err(crate::errors::DbSqlError::BackendError)?,
305            crate::TargetDb::Tickets,
306        );
307
308        transaction.perform(f).await
309    }
310}
311
312#[cfg(test)]
313mod tests {
314    use futures::StreamExt;
315    use hex_literal::hex;
316    use hopr_crypto_types::prelude::*;
317    use hopr_db_api::info::DomainSeparator;
318    use hopr_internal_types::prelude::*;
319    use hopr_primitive_types::prelude::*;
320
321    use crate::accounts::HoprDbAccountOperations;
322    use crate::channels::HoprDbChannelOperations;
323    use crate::db::HoprDb;
324    use crate::info::HoprDbInfoOperations;
325
326    lazy_static::lazy_static! {
327        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
328        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
329    }
330
331    lazy_static::lazy_static! {
332        static ref ALICE_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
333        static ref BOB_OFFCHAIN: OffchainKeypair = OffchainKeypair::random();
334    }
335
336    const TICKET_VALUE: u64 = 100_000;
337
338    async fn add_peer_mappings(db: &HoprDb, peers: Vec<(OffchainKeypair, ChainKeypair)>) -> crate::errors::Result<()> {
339        for (peer_offchain, peer_onchain) in peers.into_iter() {
340            db.insert_account(
341                None,
342                AccountEntry {
343                    public_key: *peer_offchain.public(),
344                    chain_addr: peer_onchain.public().to_address(),
345                    entry_type: AccountType::NotAnnounced,
346                },
347            )
348            .await?
349        }
350
351        Ok(())
352    }
353
354    fn generate_random_ack_ticket(index: u32) -> anyhow::Result<AcknowledgedTicket> {
355        let hk1 = HalfKey::random();
356        let hk2 = HalfKey::random();
357
358        let cp1: CurvePoint = hk1.to_challenge().try_into()?;
359        let cp2: CurvePoint = hk2.to_challenge().try_into()?;
360        let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
361
362        let ticket = TicketBuilder::default()
363            .direction(&BOB.public().to_address(), &ALICE.public().to_address())
364            .amount(TICKET_VALUE)
365            .index(index as u64)
366            .channel_epoch(4)
367            .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
368            .build_signed(&BOB, &Hash::default())?;
369
370        Ok(ticket.into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
371    }
372
373    #[async_std::test]
374    async fn test_insert_ticket_properly_resolves_the_cached_value() -> anyhow::Result<()> {
375        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
376        db.set_domain_separator(None, DomainSeparator::Channel, Hash::default())
377            .await?;
378        add_peer_mappings(
379            &db,
380            vec![
381                (ALICE_OFFCHAIN.clone(), ALICE.clone()),
382                (BOB_OFFCHAIN.clone(), BOB.clone()),
383            ],
384        )
385        .await?;
386
387        let channel = ChannelEntry::new(
388            BOB.public().to_address(),
389            ALICE.public().to_address(),
390            BalanceType::HOPR.balance(u32::MAX),
391            1.into(),
392            ChannelStatus::Open,
393            4_u32.into(),
394        );
395
396        db.upsert_channel(None, channel).await?;
397
398        assert_eq!(
399            Balance::zero(BalanceType::HOPR),
400            db.ticket_manager.unrealized_value((&channel).into()).await?
401        );
402
403        let ticket = generate_random_ack_ticket(1)?;
404        let ticket_value = ticket.verified_ticket().amount;
405
406        let (tx, mut rx) = futures::channel::mpsc::unbounded();
407
408        db.ticket_manager.start_ticket_processing(tx)?;
409
410        db.ticket_manager.insert_ticket(ticket.clone()).await?;
411
412        assert_eq!(
413            ticket_value,
414            db.ticket_manager.unrealized_value((&channel).into()).await?
415        );
416
417        let recv_ticket = rx.next().await.ok_or(anyhow::anyhow!("no ticket received"))?;
418        assert_eq!(recv_ticket, ticket);
419
420        Ok(())
421    }
422}