hopr_chain_connector/connector/
sequencer.rs1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt};
3use hopr_api::types::{
4 chain::prelude::{GasEstimation, SignableTransaction},
5 crypto::prelude::*,
6};
7
8use crate::{
9 errors::{self, ConnectorError},
10 utils::model_to_chain_info,
11};
12
13type TxRequest<T> = (
14 T,
15 futures::channel::oneshot::Sender<errors::Result<blokli_client::api::TxId>>,
16);
17
18pub struct TransactionSequencer<C, R> {
22 sender: futures::channel::mpsc::Sender<TxRequest<R>>,
23 client: std::sync::Arc<C>,
24}
25
/// Buffer size handed to the bounded channel that backs the transaction queue.
const TX_QUEUE_CAPACITY: usize = 2_048;
27
28impl<C, R> TransactionSequencer<C, R>
29where
30 C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
31 R: SignableTransaction + Send + Sync + 'static,
32{
33 pub fn new(signer: ChainKeypair, client: std::sync::Arc<C>) -> Self {
34 tracing::debug!(signer = %signer.public().to_address(), "starting transaction sequencer");
35
36 let client_clone = client.clone();
37 let (sender, receiver) = futures::channel::mpsc::channel::<TxRequest<R>>(TX_QUEUE_CAPACITY);
38 let current_nonce = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
39 let current_nonce_clone = current_nonce.clone();
40 hopr_async_runtime::prelude::spawn(
41 receiver
42 .then(move |(tx, notifier): (R, _)| {
43 let client = client_clone.clone();
44 let signer = signer.clone();
45 let current_nonce = current_nonce.clone();
46 async move {
47 let chain_info = match client.query_chain_info().map_err(ConnectorError::from).await {
48 Ok(chain_info) => {
49 tracing::debug!(chain_id = chain_info.chain_id, "chain info retrieved for tx");
50 chain_info
51 }
52 Err(e) => return (Err(e), notifier),
53 };
54
55 let parsed_chain_info = match model_to_chain_info(chain_info) {
56 Ok(parsed_chain_info) => parsed_chain_info,
57 Err(error) => return (Err(error), notifier),
58 };
59 let chain_id = parsed_chain_info.info.chain_id;
60 let gas_estimation = GasEstimation::from(parsed_chain_info);
61 tracing::debug!(?gas_estimation, "gas estimation used for tx");
62
63 match client
66 .query_transaction_count(&signer.public().to_address().into())
67 .map_err(ConnectorError::from)
68 .await
69 {
70 Ok(tx_count) => {
71 let prev_nonce =
72 current_nonce.fetch_max(tx_count, std::sync::atomic::Ordering::Relaxed);
73 tracing::debug!(prev_nonce, tx_count, "transaction count retrieved");
74 }
75 Err(e) => return (Err(e), notifier),
76 }
77
78 let nonce = current_nonce.load(std::sync::atomic::Ordering::Relaxed);
79 tracing::debug!(nonce, "nonce used for the tx");
80 tx.sign_and_encode_to_eip2718(nonce, chain_id, gas_estimation.into(), &signer)
81 .map_err(ConnectorError::from)
82 .and_then(move |tx| {
83 tracing::debug!(nonce, "submitting transaction");
84 let client = client.clone();
85 async move {
86 client
87 .submit_and_track_transaction(&tx)
88 .map_err(ConnectorError::from)
89 .await
90 }
91 })
92 .map(|res| (res, notifier))
93 .await
94 }
95 })
96 .for_each(move |(res, notifier)| {
97 if res.is_ok()
100 || res
101 .as_ref()
102 .is_err_and(|error| error.as_transaction_rejection_error().is_some())
103 {
104 let prev_nonce = current_nonce_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
105 tracing::debug!(prev_nonce, ?res, "nonce incremented due to tx success or rejection");
106 } else {
107 tracing::warn!(?res, "nonce not incremented due to tx failure other than rejection");
108 }
109
110 if notifier.send(res).is_err() {
111 tracing::debug!(
112 "failed to notify transaction result - the caller may not want to await the result \
113 anymore."
114 );
115 }
116 futures::future::ready(())
117 })
118 .inspect(|_| tracing::warn!("transaction sequencer queue stopped")),
119 );
120
121 Self { sender, client }
122 }
123}
124
125impl<C, R> TransactionSequencer<C, R>
126where
127 C: BlokliTransactionClient + Send + Sync + 'static,
128{
129 pub async fn enqueue_transaction(
133 &self,
134 transaction: R,
135 timeout_until_finalized: std::time::Duration,
136 ) -> errors::Result<impl Future<Output = errors::Result<blokli_client::api::types::Transaction>>> {
137 let (notifier_tx, notifier_rx) = futures::channel::oneshot::channel();
138
139 self.sender
140 .clone()
141 .send((transaction, notifier_tx))
142 .await
143 .map_err(|_| ConnectorError::InvalidState("transaction queue dropped"))?;
144
145 Ok(notifier_rx
146 .inspect_ok(|res| tracing::debug!(?res, "transaction tracking id received"))
147 .map(move |result| {
148 result
149 .map_err(|_| ConnectorError::InvalidState("transaction notifier dropped"))
150 .and_then(|tx_res| tx_res.map(|id| (id, timeout_until_finalized)))
151 })
152 .and_then(|(tx_id, timeout)| {
153 self.client
154 .track_transaction(tx_id, timeout)
155 .map_err(ConnectorError::from)
156 .inspect(|res| tracing::debug!(?res, "transaction tracking done"))
157 }))
158 }
159}
160
161impl<C, R> Drop for TransactionSequencer<C, R> {
162 fn drop(&mut self) {
163 self.sender.close_channel();
165 }
166}