Skip to main content

hopr_chain_connector/connector/
sequencer.rs

1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt};
3use hopr_api::types::{
4    chain::prelude::{GasEstimation, SignableTransaction},
5    crypto::prelude::*,
6};
7
8use crate::{
9    errors::{self, ConnectorError},
10    utils::model_to_chain_info,
11};
12
13type TxRequest<T> = (
14    T,
15    futures::channel::oneshot::Sender<errors::Result<blokli_client::api::TxId>>,
16);
17
18/// Takes care of sequencing transactions, nonce management, and submitting the transactions to the blockchain in-order.
19///
20/// This object is meant to be a singleton and cannot be cloned.
21pub struct TransactionSequencer<C, R> {
22    sender: futures::channel::mpsc::Sender<TxRequest<R>>,
23    client: std::sync::Arc<C>,
24}
25
/// Buffer size handed to the bounded channel that queues pending transaction requests.
const TX_QUEUE_CAPACITY: usize = 2_048;
27
28impl<C, R> TransactionSequencer<C, R>
29where
30    C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
31    R: SignableTransaction + Send + Sync + 'static,
32{
33    pub fn new(signer: ChainKeypair, client: std::sync::Arc<C>) -> Self {
34        tracing::debug!(signer = %signer.public().to_address(), "starting transaction sequencer");
35
36        let client_clone = client.clone();
37        let (sender, receiver) = futures::channel::mpsc::channel::<TxRequest<R>>(TX_QUEUE_CAPACITY);
38        let current_nonce = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
39        let current_nonce_clone = current_nonce.clone();
40        hopr_async_runtime::prelude::spawn(
41            receiver
42                .then(move |(tx, notifier): (R, _)| {
43                    let client = client_clone.clone();
44                    let signer = signer.clone();
45                    let current_nonce = current_nonce.clone();
46                    async move {
47                        let chain_info = match client.query_chain_info().map_err(ConnectorError::from).await {
48                            Ok(chain_info) => {
49                                tracing::debug!(chain_id = chain_info.chain_id, "chain info retrieved for tx");
50                                chain_info
51                            }
52                            Err(e) => return (Err(e), notifier),
53                        };
54
55                        let parsed_chain_info = match model_to_chain_info(chain_info) {
56                            Ok(parsed_chain_info) => parsed_chain_info,
57                            Err(error) => return (Err(error), notifier),
58                        };
59                        let chain_id = parsed_chain_info.info.chain_id;
60                        let gas_estimation = GasEstimation::from(parsed_chain_info);
61                        tracing::debug!(?gas_estimation, "gas estimation used for tx");
62
63                        // We always query the transaction count for the signer and use
64                        // the maximum between this value and the local counter
65                        match client
66                            .query_transaction_count(&signer.public().to_address().into())
67                            .map_err(ConnectorError::from)
68                            .await
69                        {
70                            Ok(tx_count) => {
71                                let prev_nonce =
72                                    current_nonce.fetch_max(tx_count, std::sync::atomic::Ordering::Relaxed);
73                                tracing::debug!(prev_nonce, tx_count, "transaction count retrieved");
74                            }
75                            Err(e) => return (Err(e), notifier),
76                        }
77
78                        let nonce = current_nonce.load(std::sync::atomic::Ordering::Relaxed);
79                        tracing::debug!(nonce, "nonce used for the tx");
80                        tx.sign_and_encode_to_eip2718(nonce, chain_id, gas_estimation.into(), &signer)
81                            .map_err(ConnectorError::from)
82                            .and_then(move |tx| {
83                                tracing::debug!(nonce, "submitting transaction");
84                                let client = client.clone();
85                                async move {
86                                    client
87                                        .submit_and_track_transaction(&tx)
88                                        .map_err(ConnectorError::from)
89                                        .await
90                                }
91                            })
92                            .map(|res| (res, notifier))
93                            .await
94                    }
95                })
96                .for_each(move |(res, notifier)| {
97                    // The nonce is incremented when the transaction succeeded or failed due to on-chain
98                    // rejection.
99                    if res.is_ok()
100                        || res
101                            .as_ref()
102                            .is_err_and(|error| error.as_transaction_rejection_error().is_some())
103                    {
104                        let prev_nonce = current_nonce_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
105                        tracing::debug!(prev_nonce, ?res, "nonce incremented due to tx success or rejection");
106                    } else {
107                        tracing::warn!(?res, "nonce not incremented due to tx failure other than rejection");
108                    }
109
110                    if notifier.send(res).is_err() {
111                        tracing::debug!(
112                            "failed to notify transaction result - the caller may not want to await the result \
113                             anymore."
114                        );
115                    }
116                    futures::future::ready(())
117                })
118                .inspect(|_| tracing::warn!("transaction sequencer queue stopped")),
119        );
120
121        Self { sender, client }
122    }
123}
124
125impl<C, R> TransactionSequencer<C, R>
126where
127    C: BlokliTransactionClient + Send + Sync + 'static,
128{
129    /// Adds the transaction to the [`TransactionSequencer`] queue.
130    ///
131    /// The `timeout_until_finalized` is a total time until the TX is submitted and either confirmed or rejected.
132    pub async fn enqueue_transaction(
133        &self,
134        transaction: R,
135        timeout_until_finalized: std::time::Duration,
136    ) -> errors::Result<impl Future<Output = errors::Result<blokli_client::api::types::Transaction>>> {
137        let (notifier_tx, notifier_rx) = futures::channel::oneshot::channel();
138
139        self.sender
140            .clone()
141            .send((transaction, notifier_tx))
142            .await
143            .map_err(|_| ConnectorError::InvalidState("transaction queue dropped"))?;
144
145        Ok(notifier_rx
146            .inspect_ok(|res| tracing::debug!(?res, "transaction tracking id received"))
147            .map(move |result| {
148                result
149                    .map_err(|_| ConnectorError::InvalidState("transaction notifier dropped"))
150                    .and_then(|tx_res| tx_res.map(|id| (id, timeout_until_finalized)))
151            })
152            .and_then(|(tx_id, timeout)| {
153                self.client
154                    .track_transaction(tx_id, timeout)
155                    .map_err(ConnectorError::from)
156                    .inspect(|res| tracing::debug!(?res, "transaction tracking done"))
157            }))
158    }
159}
160
161impl<C, R> Drop for TransactionSequencer<C, R> {
162    fn drop(&mut self) {
163        // Causes the internally spawned task that drives the queue to terminate
164        self.sender.close_channel();
165    }
166}