hopr_chain_api/
executors.rs

1use async_trait::async_trait;
2use futures::future::Either;
3use futures::{pin_mut, FutureExt};
4use serde::{Deserialize, Serialize};
5use std::marker::PhantomData;
6use std::time::Duration;
7
8use hopr_async_runtime::prelude::sleep;
9use hopr_chain_actions::action_queue::TransactionExecutor;
10use hopr_chain_actions::payload::PayloadGenerator;
11use hopr_chain_rpc::errors::RpcError;
12use hopr_chain_rpc::{HoprRpcOperations, PendingTransaction};
13use hopr_chain_types::TypedTransaction;
14use hopr_crypto_types::types::Hash;
15use hopr_internal_types::prelude::*;
16use hopr_primitive_types::prelude::*;
17
18/// Represents an abstract client that is capable of submitting
19/// an Ethereum transaction-like object to the blockchain.
20#[async_trait]
21pub trait EthereumClient<T: Into<TypedTransaction>> {
22    /// Sends transaction to the blockchain and returns its hash.
23    ///
24    /// Does not poll for transaction completion.
25    async fn post_transaction(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
26
27    /// Sends transaction to the blockchain and awaits the required number
28    /// of confirmations by polling the underlying provider periodically.
29    ///
30    /// Returns the TX hash.
31    async fn post_transaction_and_await_confirmation(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
32}
33
34#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
35pub struct RpcEthereumClientConfig {
36    /// Maximum time to wait for the TX to get submitted.
37    ///
38    /// This must be strictly greater than any timeouts in the underlying `HoprRpcOperations`
39    ///
40    /// Defaults to 30 seconds.
41    #[default(Duration::from_secs(30))]
42    pub max_tx_submission_wait: Duration,
43}
44
45/// Instantiation of `EthereumClient` using `HoprRpcOperations`.
46#[derive(Debug, Clone)]
47pub struct RpcEthereumClient<Rpc: HoprRpcOperations> {
48    rpc: Rpc,
49    cfg: RpcEthereumClientConfig,
50}
51
52impl<Rpc: HoprRpcOperations> RpcEthereumClient<Rpc> {
53    pub fn new(rpc: Rpc, cfg: RpcEthereumClientConfig) -> Self {
54        Self { rpc, cfg }
55    }
56
57    /// Post a transaction with a specified timeout.
58    ///
59    /// If the transaction yields a result before the timeout, the result value is returned.
60    /// Otherwise, an [RpcError::Timeout] is returned and the transaction sending is aborted.
61    async fn post_tx_with_timeout(&self, tx: TypedTransaction) -> hopr_chain_rpc::errors::Result<PendingTransaction> {
62        let submit_tx = self.rpc.send_transaction(tx).fuse();
63        let timeout = sleep(self.cfg.max_tx_submission_wait).fuse();
64        pin_mut!(submit_tx, timeout);
65
66        match futures::future::select(submit_tx, timeout).await {
67            Either::Left((res, _)) => res,
68            Either::Right(_) => Err(RpcError::Timeout),
69        }
70    }
71}
72
73#[async_trait]
74impl<Rpc: HoprRpcOperations + Send + Sync> EthereumClient<TypedTransaction> for RpcEthereumClient<Rpc> {
75    async fn post_transaction(&self, tx: TypedTransaction) -> hopr_chain_rpc::errors::Result<Hash> {
76        self.post_tx_with_timeout(tx).await.map(|t| t.tx_hash())
77    }
78
79    /// Post a transaction and wait for its completion.
80    ///
81    /// The mechanism uses an internal timeout and retry mechanism (set to `3`)
82    async fn post_transaction_and_await_confirmation(
83        &self,
84        tx: TypedTransaction,
85    ) -> hopr_chain_rpc::errors::Result<Hash> {
86        Ok(self.post_tx_with_timeout(tx).await?.await?.tx_hash)
87    }
88}
89
90/// Implementation of [`TransactionExecutor`] using the given [`EthereumClient`] and corresponding
91/// [`PayloadGenerator`].
92#[derive(Clone, Debug)]
93pub struct EthereumTransactionExecutor<T, C, PGen>
94where
95    T: Into<TypedTransaction>,
96    C: EthereumClient<T> + Clone,
97    PGen: PayloadGenerator<T> + Clone,
98{
99    client: C,
100    payload_generator: PGen,
101    _data: PhantomData<T>,
102}
103
104impl<T, C, PGen> EthereumTransactionExecutor<T, C, PGen>
105where
106    T: Into<TypedTransaction>,
107    C: EthereumClient<T> + Clone,
108    PGen: PayloadGenerator<T> + Clone,
109{
110    pub fn new(client: C, payload_generator: PGen) -> Self {
111        Self {
112            client,
113            payload_generator,
114            _data: PhantomData,
115        }
116    }
117}
118
119#[async_trait]
120impl<T, C, PGen> TransactionExecutor for EthereumTransactionExecutor<T, C, PGen>
121where
122    T: Into<TypedTransaction> + Sync + Send,
123    C: EthereumClient<T> + Clone + Sync + Send,
124    PGen: PayloadGenerator<T> + Clone + Sync + Send,
125{
126    async fn redeem_ticket(&self, acked_ticket: RedeemableTicket) -> hopr_chain_actions::errors::Result<Hash> {
127        let payload = self.payload_generator.redeem_ticket(acked_ticket)?;
128        Ok(self.client.post_transaction(payload).await?)
129    }
130
131    async fn fund_channel(&self, destination: Address, balance: Balance) -> hopr_chain_actions::errors::Result<Hash> {
132        let payload = self.payload_generator.fund_channel(destination, balance)?;
133        Ok(self.client.post_transaction(payload).await?)
134    }
135
136    async fn initiate_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
137        let payload = self.payload_generator.initiate_outgoing_channel_closure(dst)?;
138        Ok(self.client.post_transaction(payload).await?)
139    }
140
141    async fn finalize_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
142        let payload = self.payload_generator.finalize_outgoing_channel_closure(dst)?;
143        Ok(self.client.post_transaction(payload).await?)
144    }
145
146    async fn close_incoming_channel(&self, src: Address) -> hopr_chain_actions::errors::Result<Hash> {
147        let payload = self.payload_generator.close_incoming_channel(src)?;
148        Ok(self.client.post_transaction(payload).await?)
149    }
150
151    async fn withdraw(&self, recipient: Address, amount: Balance) -> hopr_chain_actions::errors::Result<Hash> {
152        let payload = self.payload_generator.transfer(recipient, amount)?;
153
154        // Withdraw transaction is out-of-band from Indexer, so its confirmation
155        // is awaited via polling.
156        Ok(self.client.post_transaction_and_await_confirmation(payload).await?)
157    }
158
159    async fn announce(&self, data: AnnouncementData) -> hopr_chain_actions::errors::Result<Hash> {
160        let payload = self.payload_generator.announce(data)?;
161        Ok(self.client.post_transaction(payload).await?)
162    }
163
164    async fn register_safe(&self, safe_address: Address) -> hopr_chain_actions::errors::Result<Hash> {
165        let payload = self.payload_generator.register_safe_by_node(safe_address)?;
166        Ok(self.client.post_transaction(payload).await?)
167    }
168}