hopr_chain_api/
executors.rs1use std::{marker::PhantomData, time::Duration};
2
3use alloy::{providers::PendingTransaction, rpc::types::TransactionRequest};
4use async_trait::async_trait;
5use futures::{FutureExt, future::Either, pin_mut};
6use hopr_async_runtime::prelude::sleep;
7use hopr_chain_actions::{action_queue::TransactionExecutor, payload::PayloadGenerator};
8use hopr_chain_rpc::{HoprRpcOperations, errors::RpcError};
9use hopr_crypto_types::types::Hash;
10use hopr_internal_types::prelude::*;
11use hopr_primitive_types::prelude::*;
12use serde::{Deserialize, Serialize};
13
14#[async_trait]
17pub trait EthereumClient<T: Into<TransactionRequest>> {
18 async fn post_transaction(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
22
23 async fn post_transaction_and_await_confirmation(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
28}
29
30#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
31pub struct RpcEthereumClientConfig {
32 #[default(Duration::from_secs(5))]
38 pub max_tx_submission_wait: Duration,
39}
40
41#[derive(Debug, Clone)]
43pub struct RpcEthereumClient<Rpc: HoprRpcOperations> {
44 rpc: Rpc,
45 cfg: RpcEthereumClientConfig,
46}
47
48impl<Rpc: HoprRpcOperations> RpcEthereumClient<Rpc> {
49 pub fn new(rpc: Rpc, cfg: RpcEthereumClientConfig) -> Self {
50 Self { rpc, cfg }
51 }
52
53 async fn post_tx_with_timeout(&self, tx: TransactionRequest) -> hopr_chain_rpc::errors::Result<PendingTransaction> {
58 let submit_tx = self.rpc.send_transaction(tx).fuse();
59 let timeout = sleep(self.cfg.max_tx_submission_wait).fuse();
60 pin_mut!(submit_tx, timeout);
61
62 match futures::future::select(submit_tx, timeout).await {
63 Either::Left((res, _)) => res,
64 Either::Right(_) => Err(RpcError::Timeout),
65 }
66 }
67
68 async fn post_tx_with_timeout_and_confirm(&self, tx: TransactionRequest) -> hopr_chain_rpc::errors::Result<Hash> {
73 let submit_tx = self.rpc.send_transaction_with_confirm(tx).fuse();
74 let timeout = sleep(self.cfg.max_tx_submission_wait).fuse();
75 pin_mut!(submit_tx, timeout);
76
77 match futures::future::select(submit_tx, timeout).await {
78 Either::Left((res, _)) => res,
79 Either::Right(_) => Err(RpcError::Timeout),
80 }
81 }
82}
83
84#[async_trait]
85impl<Rpc: HoprRpcOperations + Send + Sync> EthereumClient<TransactionRequest> for RpcEthereumClient<Rpc> {
86 async fn post_transaction(&self, tx: TransactionRequest) -> hopr_chain_rpc::errors::Result<Hash> {
87 self.post_tx_with_timeout(tx).await.map(|t| t.tx_hash().0.into())
88 }
89
90 async fn post_transaction_and_await_confirmation(
94 &self,
95 tx: TransactionRequest,
96 ) -> hopr_chain_rpc::errors::Result<Hash> {
97 self.post_tx_with_timeout_and_confirm(tx).await
98 }
99}
100
101#[derive(Clone, Debug)]
104pub struct EthereumTransactionExecutor<T, C, PGen>
105where
106 T: Into<TransactionRequest>,
107 C: EthereumClient<T> + Clone,
108 PGen: PayloadGenerator<T> + Clone,
109{
110 client: C,
111 payload_generator: PGen,
112 _data: PhantomData<T>,
113}
114
115impl<T, C, PGen> EthereumTransactionExecutor<T, C, PGen>
116where
117 T: Into<TransactionRequest>,
118 C: EthereumClient<T> + Clone,
119 PGen: PayloadGenerator<T> + Clone,
120{
121 pub fn new(client: C, payload_generator: PGen) -> Self {
122 Self {
123 client,
124 payload_generator,
125 _data: PhantomData,
126 }
127 }
128}
129
130#[async_trait]
131impl<T, C, PGen> TransactionExecutor for EthereumTransactionExecutor<T, C, PGen>
132where
133 T: Into<TransactionRequest> + Sync + Send,
134 C: EthereumClient<T> + Clone + Sync + Send,
135 PGen: PayloadGenerator<T> + Clone + Sync + Send,
136{
137 async fn redeem_ticket(&self, acked_ticket: RedeemableTicket) -> hopr_chain_actions::errors::Result<Hash> {
138 let payload = self.payload_generator.redeem_ticket(acked_ticket)?;
139 Ok(self.client.post_transaction(payload).await?)
140 }
141
142 async fn fund_channel(
143 &self,
144 destination: Address,
145 balance: HoprBalance,
146 ) -> hopr_chain_actions::errors::Result<Hash> {
147 let payload = self.payload_generator.fund_channel(destination, balance)?;
148 Ok(self.client.post_transaction(payload).await?)
149 }
150
151 async fn initiate_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
152 let payload = self.payload_generator.initiate_outgoing_channel_closure(dst)?;
153 Ok(self.client.post_transaction(payload).await?)
154 }
155
156 async fn finalize_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
157 let payload = self.payload_generator.finalize_outgoing_channel_closure(dst)?;
158 Ok(self.client.post_transaction(payload).await?)
159 }
160
161 async fn close_incoming_channel(&self, src: Address) -> hopr_chain_actions::errors::Result<Hash> {
162 let payload = self.payload_generator.close_incoming_channel(src)?;
163 Ok(self.client.post_transaction(payload).await?)
164 }
165
166 async fn withdraw<Cr: Currency + Send>(
167 &self,
168 recipient: Address,
169 amount: Balance<Cr>,
170 ) -> hopr_chain_actions::errors::Result<Hash> {
171 let payload = self.payload_generator.transfer(recipient, amount)?;
172
173 Ok(self.client.post_transaction_and_await_confirmation(payload).await?)
176 }
177
178 async fn announce(&self, data: AnnouncementData) -> hopr_chain_actions::errors::Result<Hash> {
179 let payload = self.payload_generator.announce(data)?;
180 Ok(self.client.post_transaction(payload).await?)
181 }
182
183 async fn register_safe(&self, safe_address: Address) -> hopr_chain_actions::errors::Result<Hash> {
184 let payload = self.payload_generator.register_safe_by_node(safe_address)?;
185 Ok(self.client.post_transaction(payload).await?)
186 }
187}