hopr_chain_api/
executors.rsuse async_trait::async_trait;
use futures::future::Either;
use futures::{pin_mut, FutureExt};
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use std::time::Duration;
use hopr_async_runtime::prelude::sleep;
use hopr_chain_actions::action_queue::TransactionExecutor;
use hopr_chain_actions::payload::PayloadGenerator;
use hopr_chain_rpc::errors::RpcError;
use hopr_chain_rpc::{HoprRpcOperations, PendingTransaction};
use hopr_chain_types::TypedTransaction;
use hopr_crypto_types::types::Hash;
use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;
#[async_trait]
pub trait EthereumClient<T: Into<TypedTransaction>> {
async fn post_transaction(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
async fn post_transaction_and_await_confirmation(&self, tx: T) -> hopr_chain_rpc::errors::Result<Hash>;
}
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
pub struct RpcEthereumClientConfig {
#[default(Duration::from_secs(30))]
pub max_tx_submission_wait: Duration,
}
#[derive(Debug, Clone)]
pub struct RpcEthereumClient<Rpc: HoprRpcOperations> {
rpc: Rpc,
cfg: RpcEthereumClientConfig,
}
impl<Rpc: HoprRpcOperations> RpcEthereumClient<Rpc> {
pub fn new(rpc: Rpc, cfg: RpcEthereumClientConfig) -> Self {
Self { rpc, cfg }
}
async fn post_tx_with_timeout(&self, tx: TypedTransaction) -> hopr_chain_rpc::errors::Result<PendingTransaction> {
let submit_tx = self.rpc.send_transaction(tx).fuse();
let timeout = sleep(self.cfg.max_tx_submission_wait).fuse();
pin_mut!(submit_tx, timeout);
match futures::future::select(submit_tx, timeout).await {
Either::Left((res, _)) => res,
Either::Right(_) => Err(RpcError::Timeout),
}
}
}
#[async_trait]
impl<Rpc: HoprRpcOperations + Send + Sync> EthereumClient<TypedTransaction> for RpcEthereumClient<Rpc> {
async fn post_transaction(&self, tx: TypedTransaction) -> hopr_chain_rpc::errors::Result<Hash> {
self.post_tx_with_timeout(tx).await.map(|t| t.tx_hash())
}
async fn post_transaction_and_await_confirmation(
&self,
tx: TypedTransaction,
) -> hopr_chain_rpc::errors::Result<Hash> {
Ok(self.post_tx_with_timeout(tx).await?.await?.tx_hash)
}
}
#[derive(Clone, Debug)]
pub struct EthereumTransactionExecutor<T, C, PGen>
where
T: Into<TypedTransaction>,
C: EthereumClient<T> + Clone,
PGen: PayloadGenerator<T> + Clone,
{
client: C,
payload_generator: PGen,
_data: PhantomData<T>,
}
impl<T, C, PGen> EthereumTransactionExecutor<T, C, PGen>
where
T: Into<TypedTransaction>,
C: EthereumClient<T> + Clone,
PGen: PayloadGenerator<T> + Clone,
{
pub fn new(client: C, payload_generator: PGen) -> Self {
Self {
client,
payload_generator,
_data: PhantomData,
}
}
}
#[async_trait]
impl<T, C, PGen> TransactionExecutor for EthereumTransactionExecutor<T, C, PGen>
where
T: Into<TypedTransaction> + Sync + Send,
C: EthereumClient<T> + Clone + Sync + Send,
PGen: PayloadGenerator<T> + Clone + Sync + Send,
{
async fn redeem_ticket(&self, acked_ticket: RedeemableTicket) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.redeem_ticket(acked_ticket)?;
Ok(self.client.post_transaction(payload).await?)
}
async fn fund_channel(&self, destination: Address, balance: Balance) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.fund_channel(destination, balance)?;
Ok(self.client.post_transaction(payload).await?)
}
async fn initiate_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.initiate_outgoing_channel_closure(dst)?;
Ok(self.client.post_transaction(payload).await?)
}
async fn finalize_outgoing_channel_closure(&self, dst: Address) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.finalize_outgoing_channel_closure(dst)?;
Ok(self.client.post_transaction(payload).await?)
}
async fn close_incoming_channel(&self, src: Address) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.close_incoming_channel(src)?;
Ok(self.client.post_transaction(payload).await?)
}
async fn withdraw(&self, recipient: Address, amount: Balance) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.transfer(recipient, amount)?;
Ok(self.client.post_transaction_and_await_confirmation(payload).await?)
}
async fn announce(&self, data: AnnouncementData) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.announce(data)?;
Ok(self.client.post_transaction(payload).await?)
}
async fn register_safe(&self, safe_address: Address) -> hopr_chain_actions::errors::Result<Hash> {
let payload = self.payload_generator.register_safe_by_node(safe_address)?;
Ok(self.client.post_transaction(payload).await?)
}
}