// hopr_chain_connector/reader.rs
use std::time::Duration;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient};
4use futures::{StreamExt, TryStreamExt};
5use futures_time::future::FutureExt as FuturesTimeExt;
6use hopr_api::{
7 chain::{ChainInfo, DeployedSafe, DomainSeparators, SafeSelector},
8 types::{internal::prelude::*, primitive::prelude::*},
9};
10
11use crate::{
12 errors::ConnectorError,
13 utils::{model_to_chain_info, model_to_deployed_safe},
14};
15
/// Wrapper around a client of type `C` that is held behind an [`std::sync::Arc`].
///
/// The trait implementations in this file forward their calls to the wrapped client.
pub struct HoprBlockchainReader<C>(
    /// Reference-counted handle to the wrapped client.
    pub(crate) std::sync::Arc<C>,
);
30
31impl<C> HoprBlockchainReader<C> {
32 pub fn new(client: C) -> Self {
34 Self(std::sync::Arc::new(client))
35 }
36}
37
38impl<C> Clone for HoprBlockchainReader<C> {
39 fn clone(&self) -> Self {
40 Self(self.0.clone())
41 }
42}
43
44#[async_trait::async_trait]
45impl<C> hopr_api::chain::ChainValues for HoprBlockchainReader<C>
46where
47 C: BlokliQueryClient + Send + Sync,
48{
49 type Error = ConnectorError;
50
51 async fn balance<Cy: Currency, A: Into<Address> + Send>(&self, address: A) -> Result<Balance<Cy>, Self::Error> {
52 let address = address.into();
53 if Cy::is::<WxHOPR>() {
54 Ok(self.0.query_token_balance(&address.into()).await?.balance.0.parse()?)
55 } else if Cy::is::<XDai>() {
56 Ok(self.0.query_native_balance(&address.into()).await?.balance.0.parse()?)
57 } else {
58 Err(ConnectorError::InvalidState("unsupported currency"))
59 }
60 }
61
62 async fn domain_separators(&self) -> Result<DomainSeparators, Self::Error> {
63 let chain_info = self.0.query_chain_info().await?;
64 Ok(model_to_chain_info(chain_info)?.domain_separators)
65 }
66
67 async fn minimum_incoming_ticket_win_prob(&self) -> Result<WinningProbability, Self::Error> {
68 let chain_info = self.0.query_chain_info().await?;
69 Ok(model_to_chain_info(chain_info)?.ticket_win_prob)
70 }
71
72 async fn minimum_ticket_price(&self) -> Result<HoprBalance, Self::Error> {
73 let chain_info = self.0.query_chain_info().await?;
74 Ok(model_to_chain_info(chain_info)?.ticket_price)
75 }
76
77 async fn key_binding_fee(&self) -> Result<HoprBalance, Self::Error> {
78 let chain_info = self.0.query_chain_info().await?;
79 Ok(model_to_chain_info(chain_info)?.key_binding_fee)
80 }
81
82 async fn channel_closure_notice_period(&self) -> Result<Duration, Self::Error> {
83 let chain_info = self.0.query_chain_info().await?;
84 Ok(model_to_chain_info(chain_info)?.channel_closure_grace_period)
85 }
86
87 async fn chain_info(&self) -> Result<ChainInfo, Self::Error> {
88 let chain_info = self.0.query_chain_info().await?;
89 Ok(model_to_chain_info(chain_info)?.info)
90 }
91}
92
93#[async_trait::async_trait]
94impl<C> hopr_api::chain::ChainReadSafeOperations for HoprBlockchainReader<C>
95where
96 C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync,
97{
98 type Error = ConnectorError;
99
100 async fn safe_allowance<Cy: Currency, A: Into<Address> + Send>(
101 &self,
102 safe_address: A,
103 ) -> Result<Balance<Cy>, Self::Error> {
104 let address = safe_address.into();
105 if Cy::is::<WxHOPR>() {
106 Ok(self
107 .0
108 .query_safe_allowance(&address.into())
109 .await?
110 .allowance
111 .0
112 .parse()?)
113 } else if Cy::is::<XDai>() {
114 Err(ConnectorError::InvalidState("cannot query allowance on xDai"))
115 } else {
116 Err(ConnectorError::InvalidState("unsupported currency"))
117 }
118 }
119
120 async fn safe_info(&self, selector: SafeSelector) -> Result<Option<DeployedSafe>, Self::Error> {
121 let selector = match selector {
122 SafeSelector::Owner(owner_address) => blokli_client::api::SafeSelector::ChainKey(owner_address.into()),
123 SafeSelector::Address(safe_address) => blokli_client::api::SafeSelector::SafeAddress(safe_address.into()),
124 };
125
126 if let Some(safe) = self.0.query_safe(selector).await? {
127 Ok(Some(model_to_deployed_safe(safe)?))
128 } else {
129 Ok(None)
130 }
131 }
132
133 async fn await_safe_deployment(
134 &self,
135 selector: SafeSelector,
136 timeout: Duration,
137 ) -> Result<DeployedSafe, Self::Error> {
138 if let Some(safe) = self.safe_info(selector).await? {
139 return Ok(safe);
140 }
141
142 let res = self
143 .0
144 .subscribe_safe_deployments()?
145 .map_err(ConnectorError::from)
146 .and_then(|safe| futures::future::ready(model_to_deployed_safe(safe)))
147 .try_skip_while(|deployed_safe| futures::future::ok(!selector.satisfies(deployed_safe)))
148 .take(1)
149 .try_collect::<Vec<_>>()
150 .timeout(futures_time::time::Duration::from(timeout.max(Duration::from_secs(1))))
151 .await
152 .map_err(|_| ConnectorError::other(anyhow::anyhow!("timeout while waiting for safe deployment")))??;
153
154 res.into_iter()
155 .next()
156 .ok_or(ConnectorError::InvalidState("safe deployment stream closed"))
157 }
158
159 async fn predict_module_address(
160 &self,
161 nonce: u64,
162 owner: &Address,
163 safe_address: &Address,
164 ) -> Result<Address, Self::Error> {
165 Ok(self
166 .0
167 .query_module_address_prediction(blokli_client::api::ModulePredictionInput {
168 nonce,
169 owner: (*owner).into(),
170 safe_address: (*safe_address).into(),
171 })
172 .await?
173 .into())
174 }
175}