hopr_chain_connector/
reader.rs1use std::time::Duration;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient};
4use futures::{StreamExt, TryStreamExt};
5use futures_time::future::FutureExt as FuturesTimeExt;
6use hopr_api::chain::{ChainInfo, DeployedSafe, DomainSeparators, SafeSelector};
7use hopr_internal_types::prelude::*;
8use hopr_primitive_types::prelude::*;
9
10use crate::{
11 errors::ConnectorError,
12 utils::{model_to_chain_info, model_to_deployed_safe},
13};
14
/// Cheaply cloneable handle around a shared chain client of type `C`.
///
/// The client lives behind an [`Arc`](std::sync::Arc), so cloning the reader
/// never requires `C: Clone`; every clone refers to the very same client
/// instance.
pub struct HoprBlockchainReader<C>(pub(crate) std::sync::Arc<C>);

impl<C> HoprBlockchainReader<C> {
    /// Creates a reader that takes ownership of `client` and shares it between
    /// all clones of the returned handle.
    pub fn new(client: C) -> Self {
        let shared = std::sync::Arc::new(client);
        Self(shared)
    }
}

impl<C> Clone for HoprBlockchainReader<C> {
    /// Returns another handle to the same underlying client; only the
    /// reference count is bumped.
    fn clone(&self) -> Self {
        let Self(shared) = self;
        Self(std::sync::Arc::clone(shared))
    }
}
42
43#[async_trait::async_trait]
44impl<C> hopr_api::chain::ChainValues for HoprBlockchainReader<C>
45where
46 C: BlokliQueryClient + Send + Sync,
47{
48 type Error = ConnectorError;
49
50 async fn balance<Cy: Currency, A: Into<Address> + Send>(&self, address: A) -> Result<Balance<Cy>, Self::Error> {
51 let address = address.into();
52 if Cy::is::<WxHOPR>() {
53 Ok(self.0.query_token_balance(&address.into()).await?.balance.0.parse()?)
54 } else if Cy::is::<XDai>() {
55 Ok(self.0.query_native_balance(&address.into()).await?.balance.0.parse()?)
56 } else {
57 Err(ConnectorError::InvalidState("unsupported currency"))
58 }
59 }
60
61 async fn domain_separators(&self) -> Result<DomainSeparators, Self::Error> {
62 let chain_info = self.0.query_chain_info().await?;
63 Ok(model_to_chain_info(chain_info)?.domain_separators)
64 }
65
66 async fn minimum_incoming_ticket_win_prob(&self) -> Result<WinningProbability, Self::Error> {
67 let chain_info = self.0.query_chain_info().await?;
68 Ok(model_to_chain_info(chain_info)?.ticket_win_prob)
69 }
70
71 async fn minimum_ticket_price(&self) -> Result<HoprBalance, Self::Error> {
72 let chain_info = self.0.query_chain_info().await?;
73 Ok(model_to_chain_info(chain_info)?.ticket_price)
74 }
75
76 async fn key_binding_fee(&self) -> Result<HoprBalance, Self::Error> {
77 let chain_info = self.0.query_chain_info().await?;
78 Ok(model_to_chain_info(chain_info)?.key_binding_fee)
79 }
80
81 async fn channel_closure_notice_period(&self) -> Result<Duration, Self::Error> {
82 let chain_info = self.0.query_chain_info().await?;
83 Ok(model_to_chain_info(chain_info)?.channel_closure_grace_period)
84 }
85
86 async fn chain_info(&self) -> Result<ChainInfo, Self::Error> {
87 let chain_info = self.0.query_chain_info().await?;
88 Ok(model_to_chain_info(chain_info)?.info)
89 }
90}
91
92#[async_trait::async_trait]
93impl<C> hopr_api::chain::ChainReadSafeOperations for HoprBlockchainReader<C>
94where
95 C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync,
96{
97 type Error = ConnectorError;
98
99 async fn safe_allowance<Cy: Currency, A: Into<Address> + Send>(
100 &self,
101 safe_address: A,
102 ) -> Result<Balance<Cy>, Self::Error> {
103 let address = safe_address.into();
104 if Cy::is::<WxHOPR>() {
105 Ok(self
106 .0
107 .query_safe_allowance(&address.into())
108 .await?
109 .allowance
110 .0
111 .parse()?)
112 } else if Cy::is::<XDai>() {
113 Err(ConnectorError::InvalidState("cannot query allowance on xDai"))
114 } else {
115 Err(ConnectorError::InvalidState("unsupported currency"))
116 }
117 }
118
119 async fn safe_info(&self, selector: SafeSelector) -> Result<Option<DeployedSafe>, Self::Error> {
120 let selector = match selector {
121 SafeSelector::Owner(owner_address) => blokli_client::api::SafeSelector::ChainKey(owner_address.into()),
122 SafeSelector::Address(safe_address) => blokli_client::api::SafeSelector::SafeAddress(safe_address.into()),
123 };
124
125 if let Some(safe) = self.0.query_safe(selector).await? {
126 Ok(Some(model_to_deployed_safe(safe)?))
127 } else {
128 Ok(None)
129 }
130 }
131
132 async fn await_safe_deployment(
133 &self,
134 selector: SafeSelector,
135 timeout: Duration,
136 ) -> Result<DeployedSafe, Self::Error> {
137 if let Some(safe) = self.safe_info(selector).await? {
138 return Ok(safe);
139 }
140
141 let res = self
142 .0
143 .subscribe_safe_deployments()?
144 .map_err(ConnectorError::from)
145 .and_then(|safe| futures::future::ready(model_to_deployed_safe(safe)))
146 .try_skip_while(|deployed_safe| futures::future::ok(!selector.satisfies(deployed_safe)))
147 .take(1)
148 .try_collect::<Vec<_>>()
149 .timeout(futures_time::time::Duration::from(timeout.max(Duration::from_secs(1))))
150 .await??;
151
152 res.into_iter()
153 .next()
154 .ok_or(ConnectorError::InvalidState("safe deployment stream closed"))
155 }
156
157 async fn predict_module_address(
158 &self,
159 nonce: u64,
160 owner: &Address,
161 safe_address: &Address,
162 ) -> Result<Address, Self::Error> {
163 Ok(self
164 .0
165 .query_module_address_prediction(blokli_client::api::ModulePredictionInput {
166 nonce,
167 owner: (*owner).into(),
168 safe_address: (*safe_address).into(),
169 })
170 .await?
171 .into())
172 }
173}