hopr_chain_api/
executors.rs

1use std::{marker::PhantomData, time::Duration};
2
3use alloy::{providers::PendingTransaction, rpc::types::TransactionRequest};
4use async_trait::async_trait;
5use futures::{FutureExt, future::Either, pin_mut};
6use hopr_async_runtime::prelude::sleep;
7use hopr_chain_actions::{action_queue::TransactionExecutor, payload::PayloadGenerator};
8use hopr_chain_rpc::{HoprRpcOperations, errors::RpcError};
9use hopr_crypto_types::types::Hash;
10use hopr_internal_types::prelude::*;
11use hopr_primitive_types::prelude::*;
12use serde::{Deserialize, Serialize};
13
14/// Represents an abstract client that is capable of submitting
15/// an Ethereum transaction-like object to the blockchain.
16#[async_trait]
17pub trait EthereumClient<T: Into<TransactionRequest>> {
18    /// Sends transaction to the blockchain and returns its hash.
19    ///
20    /// Does not poll for transaction completion.
21    async fn post_transaction(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
22
23    /// Sends transaction to the blockchain and awaits the required number
24    /// of confirmations by polling the underlying provider periodically.
25    ///
26    /// Returns the TX hash.
27    async fn post_transaction_and_await_confirmation(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
28}
29
30#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
31pub struct RpcEthereumClientConfig {
32    /// Maximum time to wait for the TX to get submitted.
33    ///
34    /// This should be strictly greater than any timeouts in the underlying `HoprRpcOperations`
35    ///
36    /// Defaults to 5 seconds.
37    #[default(Duration::from_secs(5))]
38    pub max_tx_submission_wait: Duration,
39}
40
41/// Instantiation of `EthereumClient` using `HoprRpcOperations`.
42#[derive(Debug, Clone)]
43pub struct RpcEthereumClient<Rpc: HoprRpcOperations> {
44    rpc: Rpc,
45    cfg: RpcEthereumClientConfig,
46}
47
48impl<Rpc: HoprRpcOperations> RpcEthereumClient<Rpc> {
49    pub fn new(rpc: Rpc, cfg: RpcEthereumClientConfig) -> Self {
50        Self { rpc, cfg }
51    }
52
53    /// Post a transaction with a specified timeout.
54    ///
55    /// If the transaction yields a result before the timeout, the result value is returned.
56    /// Otherwise, an [RpcError::Timeout] is returned and the transaction sending is aborted.
57    async fn post_tx_with_timeout(&self, tx: TransactionRequest) -> hopr_chain_rpc::errors::Result<PendingTransaction> {
58        let submit_tx = self.rpc.send_transaction(tx).fuse();
59        let timeout = sleep(self.cfg.max_tx_submission_wait).fuse();
60        pin_mut!(submit_tx, timeout);
61
62        match futures::future::select(submit_tx, timeout).await {
63            Either::Left((res, _)) => res,
64            Either::Right(_) => Err(RpcError::Timeout),
65        }
66    }
67}
68
69#[async_trait]
70impl<Rpc: HoprRpcOperations + Send + Sync> EthereumClient<TransactionRequest> for RpcEthereumClient<Rpc> {
71    async fn post_transaction(&self, tx: TransactionRequest) -> hopr_chain_rpc::errors::Result<Hash> {
72        self.post_tx_with_timeout(tx).await.map(|t| t.tx_hash().0.into())
73    }
74
75    /// Post a transaction and wait for its completion.
76    ///
77    /// The mechanism uses an internal timeout and retry mechanism (set to `3`)
78    async fn post_transaction_and_await_confirmation(
79        &self,
80        tx: TransactionRequest,
81    ) -> hopr_chain_rpc::errors::Result<Hash> {
82        Ok(self.post_tx_with_timeout(tx).await?.await?.0.into())
83    }
84}
85
86/// Implementation of [`TransactionExecutor`] using the given [`EthereumClient`] and corresponding
87/// [`PayloadGenerator`].
88#[derive(Clone, Debug)]
89pub struct EthereumTransactionExecutor<T, C, PGen>
90where
91    T: Into<TransactionRequest>,
92    C: EthereumClient<T> + Clone,
93    PGen: PayloadGenerator<T> + Clone,
94{
95    client: C,
96    payload_generator: PGen,
97    _data: PhantomData<T>,
98}
99
100impl<T, C, PGen> EthereumTransactionExecutor<T, C, PGen>
101where
102    T: Into<TransactionRequest>,
103    C: EthereumClient<T> + Clone,
104    PGen: PayloadGenerator<T> + Clone,
105{
106    pub fn new(client: C, payload_generator: PGen) -> Self {
107        Self {
108            client,
109            payload_generator,
110            _data: PhantomData,
111        }
112    }
113}
114
115#[async_trait]
116impl<T, C, PGen> TransactionExecutor for EthereumTransactionExecutor<T, C, PGen>
117where
118    T: Into<TransactionRequest> + Sync + Send,
119    C: EthereumClient<T> + Clone + Sync + Send,
120    PGen: PayloadGenerator<T> + Clone + Sync + Send,
121{
122    async fn redeem_ticket(&self, acked_ticket: RedeemableTicket) -> hopr_chain_actions::errors::Result<Hash> {
123        let payload = self.payload_generator.redeem_ticket(acked_ticket)?;
124        Ok(self.client.post_transaction(payload).await?)
125    }
126
127    async fn fund_channel(
128        &self,
129        destination: Address,
130        balance: HoprBalance,
131    ) -> hopr_chain_actions::errors::Result<Hash> {
132        let payload = self.payload_generator.fund_channel(destination, balance)?;
133        Ok(self.client.post_transaction(payload).await?)
134    }
135
136    async fn initiate_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
137        let payload = self.payload_generator.initiate_outgoing_channel_closure(dst)?;
138        Ok(self.client.post_transaction(payload).await?)
139    }
140
141    async fn finalize_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
142        let payload = self.payload_generator.finalize_outgoing_channel_closure(dst)?;
143        Ok(self.client.post_transaction(payload).await?)
144    }
145
146    async fn close_incoming_channel(&self, src: Address) -> hopr_chain_actions::errors::Result<Hash> {
147        let payload = self.payload_generator.close_incoming_channel(src)?;
148        Ok(self.client.post_transaction(payload).await?)
149    }
150
151    async fn withdraw<Cr: Currency + Send>(
152        &self,
153        recipient: Address,
154        amount: Balance<Cr>,
155    ) -> hopr_chain_actions::errors::Result<Hash> {
156        let payload = self.payload_generator.transfer(recipient, amount)?;
157
158        // Withdraw transaction is out-of-band from Indexer, so its confirmation
159        // is awaited via polling.
160        Ok(self.client.post_transaction_and_await_confirmation(payload).await?)
161    }
162
163    async fn announce(&self, data: AnnouncementData) -> hopr_chain_actions::errors::Result<Hash> {
164        let payload = self.payload_generator.announce(data)?;
165        Ok(self.client.post_transaction(payload).await?)
166    }
167
168    async fn register_safe(&self, safe_address: Address) -> hopr_chain_actions::errors::Result<Hash> {
169        let payload = self.payload_generator.register_safe_by_node(safe_address)?;
170        Ok(self.client.post_transaction(payload).await?)
171    }
172}