hopr_chain_api/
executors.rs1use async_trait::async_trait;
2use futures::future::Either;
3use futures::{pin_mut, FutureExt};
4use serde::{Deserialize, Serialize};
5use std::marker::PhantomData;
6use std::time::Duration;
7
8use hopr_async_runtime::prelude::sleep;
9use hopr_chain_actions::action_queue::TransactionExecutor;
10use hopr_chain_actions::payload::PayloadGenerator;
11use hopr_chain_rpc::errors::RpcError;
12use hopr_chain_rpc::{HoprRpcOperations, PendingTransaction};
13use hopr_chain_types::TypedTransaction;
14use hopr_crypto_types::types::Hash;
15use hopr_internal_types::prelude::*;
16use hopr_primitive_types::prelude::*;
17
18#[async_trait]
21pub trait EthereumClient<T: Into<TypedTransaction>> {
22 async fn post_transaction(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
26
27 async fn post_transaction_and_await_confirmation(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
32}
33
34#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
35pub struct RpcEthereumClientConfig {
36 #[default(Duration::from_secs(30))]
42 pub max_tx_submission_wait: Duration,
43}
44
45#[derive(Debug, Clone)]
47pub struct RpcEthereumClient<Rpc: HoprRpcOperations> {
48 rpc: Rpc,
49 cfg: RpcEthereumClientConfig,
50}
51
52impl<Rpc: HoprRpcOperations> RpcEthereumClient<Rpc> {
53 pub fn new(rpc: Rpc, cfg: RpcEthereumClientConfig) -> Self {
54 Self { rpc, cfg }
55 }
56
57 async fn post_tx_with_timeout(&self, tx: TypedTransaction) -> hopr_chain_rpc::errors::Result<PendingTransaction> {
62 let submit_tx = self.rpc.send_transaction(tx).fuse();
63 let timeout = sleep(self.cfg.max_tx_submission_wait).fuse();
64 pin_mut!(submit_tx, timeout);
65
66 match futures::future::select(submit_tx, timeout).await {
67 Either::Left((res, _)) => res,
68 Either::Right(_) => Err(RpcError::Timeout),
69 }
70 }
71}
72
73#[async_trait]
74impl<Rpc: HoprRpcOperations + Send + Sync> EthereumClient<TypedTransaction> for RpcEthereumClient<Rpc> {
75 async fn post_transaction(&self, tx: TypedTransaction) -> hopr_chain_rpc::errors::Result<Hash> {
76 self.post_tx_with_timeout(tx).await.map(|t| t.tx_hash())
77 }
78
79 async fn post_transaction_and_await_confirmation(
83 &self,
84 tx: TypedTransaction,
85 ) -> hopr_chain_rpc::errors::Result<Hash> {
86 Ok(self.post_tx_with_timeout(tx).await?.await?.tx_hash)
87 }
88}
89
90#[derive(Clone, Debug)]
93pub struct EthereumTransactionExecutor<T, C, PGen>
94where
95 T: Into<TypedTransaction>,
96 C: EthereumClient<T> + Clone,
97 PGen: PayloadGenerator<T> + Clone,
98{
99 client: C,
100 payload_generator: PGen,
101 _data: PhantomData<T>,
102}
103
104impl<T, C, PGen> EthereumTransactionExecutor<T, C, PGen>
105where
106 T: Into<TypedTransaction>,
107 C: EthereumClient<T> + Clone,
108 PGen: PayloadGenerator<T> + Clone,
109{
110 pub fn new(client: C, payload_generator: PGen) -> Self {
111 Self {
112 client,
113 payload_generator,
114 _data: PhantomData,
115 }
116 }
117}
118
119#[async_trait]
120impl<T, C, PGen> TransactionExecutor for EthereumTransactionExecutor<T, C, PGen>
121where
122 T: Into<TypedTransaction> + Sync + Send,
123 C: EthereumClient<T> + Clone + Sync + Send,
124 PGen: PayloadGenerator<T> + Clone + Sync + Send,
125{
126 async fn redeem_ticket(&self, acked_ticket: RedeemableTicket) -> hopr_chain_actions::errors::Result<Hash> {
127 let payload = self.payload_generator.redeem_ticket(acked_ticket)?;
128 Ok(self.client.post_transaction(payload).await?)
129 }
130
131 async fn fund_channel(&self, destination: Address, balance: Balance) -> hopr_chain_actions::errors::Result<Hash> {
132 let payload = self.payload_generator.fund_channel(destination, balance)?;
133 Ok(self.client.post_transaction(payload).await?)
134 }
135
136 async fn initiate_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
137 let payload = self.payload_generator.initiate_outgoing_channel_closure(dst)?;
138 Ok(self.client.post_transaction(payload).await?)
139 }
140
141 async fn finalize_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
142 let payload = self.payload_generator.finalize_outgoing_channel_closure(dst)?;
143 Ok(self.client.post_transaction(payload).await?)
144 }
145
146 async fn close_incoming_channel(&self, src: Address) -> hopr_chain_actions::errors::Result<Hash> {
147 let payload = self.payload_generator.close_incoming_channel(src)?;
148 Ok(self.client.post_transaction(payload).await?)
149 }
150
151 async fn withdraw(&self, recipient: Address, amount: Balance) -> hopr_chain_actions::errors::Result<Hash> {
152 let payload = self.payload_generator.transfer(recipient, amount)?;
153
154 Ok(self.client.post_transaction_and_await_confirmation(payload).await?)
157 }
158
159 async fn announce(&self, data: AnnouncementData) -> hopr_chain_actions::errors::Result<Hash> {
160 let payload = self.payload_generator.announce(data)?;
161 Ok(self.client.post_transaction(payload).await?)
162 }
163
164 async fn register_safe(&self, safe_address: Address) -> hopr_chain_actions::errors::Result<Hash> {
165 let payload = self.payload_generator.register_safe_by_node(safe_address)?;
166 Ok(self.client.post_transaction(payload).await?)
167 }
168}