hopr_chain_connector/
reader.rs1use std::time::Duration;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, RedeemedStatsSelector};
4use futures::{StreamExt, TryStreamExt};
5use futures_time::future::FutureExt as FuturesTimeExt;
6use hopr_api::{
7 chain::{ChainInfo, DeployedSafe, DomainSeparators, RedemptionStats, SafeSelector},
8 types::{internal::prelude::*, primitive::prelude::*},
9};
10
11use crate::{
12 errors::ConnectorError,
13 utils::{model_to_chain_info, model_to_deployed_safe, model_to_redeemed_stats},
14};
15
16pub struct HoprBlockchainReader<C>(pub(crate) std::sync::Arc<C>);
30
31impl<C> HoprBlockchainReader<C> {
32 pub fn new(client: C) -> Self {
34 Self(std::sync::Arc::new(client))
35 }
36}
37
38impl<C> Clone for HoprBlockchainReader<C> {
39 fn clone(&self) -> Self {
40 Self(self.0.clone())
41 }
42}
43
44#[async_trait::async_trait]
45impl<C> hopr_api::chain::ChainValues for HoprBlockchainReader<C>
46where
47 C: BlokliQueryClient + Send + Sync,
48{
49 type Error = ConnectorError;
50
51 async fn balance<Cy: Currency, A: Into<Address> + Send>(&self, address: A) -> Result<Balance<Cy>, Self::Error> {
52 let address = address.into();
53 if Cy::is::<WxHOPR>() {
54 Ok(self.0.query_token_balance(&address.into()).await?.balance.0.parse()?)
55 } else if Cy::is::<XDai>() {
56 Ok(self.0.query_native_balance(&address.into()).await?.balance.0.parse()?)
57 } else {
58 Err(ConnectorError::InvalidState("unsupported currency"))
59 }
60 }
61
62 async fn domain_separators(&self) -> Result<DomainSeparators, Self::Error> {
63 let chain_info = self.0.query_chain_info().await?;
64 Ok(model_to_chain_info(chain_info)?.domain_separators)
65 }
66
67 async fn minimum_incoming_ticket_win_prob(&self) -> Result<WinningProbability, Self::Error> {
68 let chain_info = self.0.query_chain_info().await?;
69 Ok(model_to_chain_info(chain_info)?.ticket_win_prob)
70 }
71
72 async fn minimum_ticket_price(&self) -> Result<HoprBalance, Self::Error> {
73 let chain_info = self.0.query_chain_info().await?;
74 Ok(model_to_chain_info(chain_info)?.ticket_price)
75 }
76
77 async fn key_binding_fee(&self) -> Result<HoprBalance, Self::Error> {
78 let chain_info = self.0.query_chain_info().await?;
79 Ok(model_to_chain_info(chain_info)?.key_binding_fee)
80 }
81
82 async fn channel_closure_notice_period(&self) -> Result<Duration, Self::Error> {
83 let chain_info = self.0.query_chain_info().await?;
84 Ok(model_to_chain_info(chain_info)?.channel_closure_grace_period)
85 }
86
87 async fn chain_info(&self) -> Result<ChainInfo, Self::Error> {
88 let chain_info = self.0.query_chain_info().await?;
89 Ok(model_to_chain_info(chain_info)?.info)
90 }
91
92 async fn redemption_stats<A: Into<Address> + Send>(&self, safe_addr: A) -> Result<RedemptionStats, Self::Error> {
93 let safe_addr = safe_addr.into();
94 model_to_redeemed_stats(
95 self.0
96 .query_redeemed_stats(RedeemedStatsSelector::SafeAddress(safe_addr.into()))
97 .await?,
98 )
99 }
100
101 async fn typical_resolution_time(&self) -> Result<Duration, Self::Error> {
102 let chain_info = self.0.query_chain_info().await?;
103 let info = model_to_chain_info(chain_info)?;
104 Ok(info.expected_block_time * info.finality)
105 }
106}
107
108#[async_trait::async_trait]
109impl<C> hopr_api::chain::ChainReadSafeOperations for HoprBlockchainReader<C>
110where
111 C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync,
112{
113 type Error = ConnectorError;
114
115 async fn safe_allowance<Cy: Currency, A: Into<Address> + Send>(
116 &self,
117 safe_address: A,
118 ) -> Result<Balance<Cy>, Self::Error> {
119 let address = safe_address.into();
120 if Cy::is::<WxHOPR>() {
121 Ok(self
122 .0
123 .query_safe_allowance(&address.into())
124 .await?
125 .allowance
126 .0
127 .parse()?)
128 } else if Cy::is::<XDai>() {
129 Err(ConnectorError::InvalidState("cannot query allowance on xDai"))
130 } else {
131 Err(ConnectorError::InvalidState("unsupported currency"))
132 }
133 }
134
135 async fn safe_info(&self, selector: SafeSelector) -> Result<Option<DeployedSafe>, Self::Error> {
136 let selector = match selector {
137 SafeSelector::Address(safe_address) => blokli_client::api::SafeSelector::SafeAddress(safe_address.into()),
138 SafeSelector::Deployer(deployer_address) => {
139 blokli_client::api::SafeSelector::ChainKey(deployer_address.into())
140 }
141 SafeSelector::NodeAddress(node_address) => {
142 blokli_client::api::SafeSelector::RegisteredNode(node_address.into())
143 }
144 SafeSelector::Owner(owner_address) => blokli_client::api::SafeSelector::Owner(owner_address.into()),
145 };
146
147 if let Some(safe) = self.0.query_safe(selector).await?.first() {
148 Ok(Some(model_to_deployed_safe(safe.clone())?))
149 } else {
150 Ok(None)
151 }
152 }
153
154 async fn await_safe_deployment(
155 &self,
156 selector: SafeSelector,
157 timeout: Duration,
158 ) -> Result<DeployedSafe, Self::Error> {
159 if let Some(safe) = self.safe_info(selector).await? {
160 return Ok(safe);
161 }
162
163 let res = self
164 .0
165 .subscribe_safe_deployments()?
166 .map_err(ConnectorError::from)
167 .and_then(|safe| futures::future::ready(model_to_deployed_safe(safe)))
168 .try_skip_while(|deployed_safe| futures::future::ok(!selector.satisfies(deployed_safe)))
169 .take(1)
170 .try_collect::<Vec<_>>()
171 .timeout(futures_time::time::Duration::from(timeout.max(Duration::from_secs(1))))
172 .await
173 .map_err(|_| ConnectorError::other(anyhow::anyhow!("timeout while waiting for safe deployment")))??;
174
175 res.into_iter()
176 .next()
177 .ok_or(ConnectorError::InvalidState("safe deployment stream closed"))
178 }
179
180 async fn predict_module_address(
181 &self,
182 nonce: u64,
183 owner: &Address,
184 safe_address: &Address,
185 ) -> Result<Address, Self::Error> {
186 Ok(self
187 .0
188 .query_module_address_prediction(blokli_client::api::ModulePredictionInput {
189 nonce,
190 owner: (*owner).into(),
191 safe_address: (*safe_address).into(),
192 })
193 .await?
194 .into())
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use hopr_api::chain::ChainValues;
201
202 use super::*;
203 use crate::testing::BlokliTestStateBuilder;
204
205 #[tokio::test]
206 async fn redeemed_stats() -> anyhow::Result<()> {
207 let blokli_client = BlokliTestStateBuilder::default()
208 .with_deployed_safes([DeployedSafe {
209 address: [1u8; Address::SIZE].into(),
210 owners: vec![[2u8; Address::SIZE].into()],
211 module: [3u8; Address::SIZE].into(),
212 registered_nodes: vec![],
213 deployer: [2u8; Address::SIZE].into(),
214 }])
215 .with_hopr_network_chain_info("rotsee")
216 .build_static_client();
217
218 let reader = HoprBlockchainReader::new(blokli_client);
219
220 let stats = reader.redemption_stats([1u8; Address::SIZE]).await?;
221 assert_eq!(0, stats.redeemed_count);
222 assert_eq!(HoprBalance::zero(), stats.redeemed_value);
223
224 Ok(())
225 }
226}