hopr_chain_api/
executors.rs1use std::{marker::PhantomData, time::Duration};
2
3use alloy::{providers::PendingTransaction, rpc::types::TransactionRequest};
4use async_trait::async_trait;
5use futures::{FutureExt, future::Either, pin_mut};
6use hopr_async_runtime::prelude::sleep;
7use hopr_chain_actions::{action_queue::TransactionExecutor, payload::PayloadGenerator};
8use hopr_chain_rpc::{HoprRpcOperations, errors::RpcError};
9use hopr_crypto_types::types::Hash;
10use hopr_internal_types::prelude::*;
11use hopr_primitive_types::prelude::*;
12use serde::{Deserialize, Serialize};
13
14#[async_trait]
17pub trait EthereumClient<T: Into<TransactionRequest>> {
18 async fn post_transaction(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
22
23 async fn post_transaction_and_await_confirmation(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
28}
29
30#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
31pub struct RpcEthereumClientConfig {
32 #[default(Duration::from_secs(5))]
38 pub max_tx_submission_wait: Duration,
39}
40
41#[derive(Debug, Clone)]
43pub struct RpcEthereumClient<Rpc: HoprRpcOperations> {
44 rpc: Rpc,
45 cfg: RpcEthereumClientConfig,
46}
47
48impl<Rpc: HoprRpcOperations> RpcEthereumClient<Rpc> {
49 pub fn new(rpc: Rpc, cfg: RpcEthereumClientConfig) -> Self {
50 Self { rpc, cfg }
51 }
52
53 async fn post_tx_with_timeout(&self, tx: TransactionRequest) -> hopr_chain_rpc::errors::Result<PendingTransaction> {
58 let submit_tx = self.rpc.send_transaction(tx).fuse();
59 let timeout = sleep(self.cfg.max_tx_submission_wait).fuse();
60 pin_mut!(submit_tx, timeout);
61
62 match futures::future::select(submit_tx, timeout).await {
63 Either::Left((res, _)) => res,
64 Either::Right(_) => Err(RpcError::Timeout),
65 }
66 }
67}
68
69#[async_trait]
70impl<Rpc: HoprRpcOperations + Send + Sync> EthereumClient<TransactionRequest> for RpcEthereumClient<Rpc> {
71 async fn post_transaction(&self, tx: TransactionRequest) -> hopr_chain_rpc::errors::Result<Hash> {
72 self.post_tx_with_timeout(tx).await.map(|t| t.tx_hash().0.into())
73 }
74
75 async fn post_transaction_and_await_confirmation(
79 &self,
80 tx: TransactionRequest,
81 ) -> hopr_chain_rpc::errors::Result<Hash> {
82 Ok(self.post_tx_with_timeout(tx).await?.await?.0.into())
83 }
84}
85
86#[derive(Clone, Debug)]
89pub struct EthereumTransactionExecutor<T, C, PGen>
90where
91 T: Into<TransactionRequest>,
92 C: EthereumClient<T> + Clone,
93 PGen: PayloadGenerator<T> + Clone,
94{
95 client: C,
96 payload_generator: PGen,
97 _data: PhantomData<T>,
98}
99
100impl<T, C, PGen> EthereumTransactionExecutor<T, C, PGen>
101where
102 T: Into<TransactionRequest>,
103 C: EthereumClient<T> + Clone,
104 PGen: PayloadGenerator<T> + Clone,
105{
106 pub fn new(client: C, payload_generator: PGen) -> Self {
107 Self {
108 client,
109 payload_generator,
110 _data: PhantomData,
111 }
112 }
113}
114
115#[async_trait]
116impl<T, C, PGen> TransactionExecutor for EthereumTransactionExecutor<T, C, PGen>
117where
118 T: Into<TransactionRequest> + Sync + Send,
119 C: EthereumClient<T> + Clone + Sync + Send,
120 PGen: PayloadGenerator<T> + Clone + Sync + Send,
121{
122 async fn redeem_ticket(&self, acked_ticket: RedeemableTicket) -> hopr_chain_actions::errors::Result<Hash> {
123 let payload = self.payload_generator.redeem_ticket(acked_ticket)?;
124 Ok(self.client.post_transaction(payload).await?)
125 }
126
127 async fn fund_channel(
128 &self,
129 destination: Address,
130 balance: HoprBalance,
131 ) -> hopr_chain_actions::errors::Result<Hash> {
132 let payload = self.payload_generator.fund_channel(destination, balance)?;
133 Ok(self.client.post_transaction(payload).await?)
134 }
135
136 async fn initiate_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
137 let payload = self.payload_generator.initiate_outgoing_channel_closure(dst)?;
138 Ok(self.client.post_transaction(payload).await?)
139 }
140
141 async fn finalize_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
142 let payload = self.payload_generator.finalize_outgoing_channel_closure(dst)?;
143 Ok(self.client.post_transaction(payload).await?)
144 }
145
146 async fn close_incoming_channel(&self, src: Address) -> hopr_chain_actions::errors::Result<Hash> {
147 let payload = self.payload_generator.close_incoming_channel(src)?;
148 Ok(self.client.post_transaction(payload).await?)
149 }
150
151 async fn withdraw<Cr: Currency + Send>(
152 &self,
153 recipient: Address,
154 amount: Balance<Cr>,
155 ) -> hopr_chain_actions::errors::Result<Hash> {
156 let payload = self.payload_generator.transfer(recipient, amount)?;
157
158 Ok(self.client.post_transaction_and_await_confirmation(payload).await?)
161 }
162
163 async fn announce(&self, data: AnnouncementData) -> hopr_chain_actions::errors::Result<Hash> {
164 let payload = self.payload_generator.announce(data)?;
165 Ok(self.client.post_transaction(payload).await?)
166 }
167
168 async fn register_safe(&self, safe_address: Address) -> hopr_chain_actions::errors::Result<Hash> {
169 let payload = self.payload_generator.register_safe_by_node(safe_address)?;
170 Ok(self.client.post_transaction(payload).await?)
171 }
172}