Skip to main content

hopr_chain_connector/
reader.rs

1use std::time::Duration;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, RedeemedStatsSelector};
4use futures::{StreamExt, TryStreamExt};
5use futures_time::future::FutureExt as FuturesTimeExt;
6use hopr_api::{
7    chain::{ChainInfo, DeployedSafe, DomainSeparators, RedemptionStats, SafeSelector},
8    types::{internal::prelude::*, primitive::prelude::*},
9};
10
11use crate::{
12    errors::ConnectorError,
13    utils::{model_to_chain_info, model_to_deployed_safe, model_to_redeemed_stats},
14};
15
16/// A simplified version of [`HoprBlockchainConnector`](crate::HoprBlockchainConnector)
17/// which only implements [HOPR Chain API](hopr_api::chain) partially, allowing for read-only operations.
18///
19/// This object specifically implements only the following traits:
20///
21/// - [`ChainValues`](hopr_api::chain::ChainValues)
22/// - [`ChainReadSafeOperations`](hopr_api::chain::ChainReadSafeOperations)
23///
24/// The implementation is currently realized using the Blokli client and acts as a partial HOPR Chain API compatible
25/// wrapper for [`blokli_client::BlokliClient`].
26///
27/// This object is useful for bootstrapping purposes that usually precede construction of the [full
28/// connector](crate::HoprBlockchainConnector).
29pub struct HoprBlockchainReader<C>(pub(crate) std::sync::Arc<C>);
30
31impl<C> HoprBlockchainReader<C> {
32    /// Creates new instance given the `client`.
33    pub fn new(client: C) -> Self {
34        Self(std::sync::Arc::new(client))
35    }
36}
37
38impl<C> Clone for HoprBlockchainReader<C> {
39    fn clone(&self) -> Self {
40        Self(self.0.clone())
41    }
42}
43
44#[async_trait::async_trait]
45impl<C> hopr_api::chain::ChainValues for HoprBlockchainReader<C>
46where
47    C: BlokliQueryClient + Send + Sync,
48{
49    type Error = ConnectorError;
50
51    async fn balance<Cy: Currency, A: Into<Address> + Send>(&self, address: A) -> Result<Balance<Cy>, Self::Error> {
52        let address = address.into();
53        if Cy::is::<WxHOPR>() {
54            Ok(self.0.query_token_balance(&address.into()).await?.balance.0.parse()?)
55        } else if Cy::is::<XDai>() {
56            Ok(self.0.query_native_balance(&address.into()).await?.balance.0.parse()?)
57        } else {
58            Err(ConnectorError::InvalidState("unsupported currency"))
59        }
60    }
61
62    async fn domain_separators(&self) -> Result<DomainSeparators, Self::Error> {
63        let chain_info = self.0.query_chain_info().await?;
64        Ok(model_to_chain_info(chain_info)?.domain_separators)
65    }
66
67    async fn minimum_incoming_ticket_win_prob(&self) -> Result<WinningProbability, Self::Error> {
68        let chain_info = self.0.query_chain_info().await?;
69        Ok(model_to_chain_info(chain_info)?.ticket_win_prob)
70    }
71
72    async fn minimum_ticket_price(&self) -> Result<HoprBalance, Self::Error> {
73        let chain_info = self.0.query_chain_info().await?;
74        Ok(model_to_chain_info(chain_info)?.ticket_price)
75    }
76
77    async fn key_binding_fee(&self) -> Result<HoprBalance, Self::Error> {
78        let chain_info = self.0.query_chain_info().await?;
79        Ok(model_to_chain_info(chain_info)?.key_binding_fee)
80    }
81
82    async fn channel_closure_notice_period(&self) -> Result<Duration, Self::Error> {
83        let chain_info = self.0.query_chain_info().await?;
84        Ok(model_to_chain_info(chain_info)?.channel_closure_grace_period)
85    }
86
87    async fn chain_info(&self) -> Result<ChainInfo, Self::Error> {
88        let chain_info = self.0.query_chain_info().await?;
89        Ok(model_to_chain_info(chain_info)?.info)
90    }
91
92    async fn redemption_stats<A: Into<Address> + Send>(&self, safe_addr: A) -> Result<RedemptionStats, Self::Error> {
93        let safe_addr = safe_addr.into();
94        model_to_redeemed_stats(
95            self.0
96                .query_redeemed_stats(RedeemedStatsSelector::SafeAddress(safe_addr.into()))
97                .await?,
98        )
99    }
100
101    async fn typical_resolution_time(&self) -> Result<Duration, Self::Error> {
102        let chain_info = self.0.query_chain_info().await?;
103        let info = model_to_chain_info(chain_info)?;
104        Ok(info.expected_block_time * info.finality)
105    }
106}
107
108#[async_trait::async_trait]
109impl<C> hopr_api::chain::ChainReadSafeOperations for HoprBlockchainReader<C>
110where
111    C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync,
112{
113    type Error = ConnectorError;
114
115    async fn safe_allowance<Cy: Currency, A: Into<Address> + Send>(
116        &self,
117        safe_address: A,
118    ) -> Result<Balance<Cy>, Self::Error> {
119        let address = safe_address.into();
120        if Cy::is::<WxHOPR>() {
121            Ok(self
122                .0
123                .query_safe_allowance(&address.into())
124                .await?
125                .allowance
126                .0
127                .parse()?)
128        } else if Cy::is::<XDai>() {
129            Err(ConnectorError::InvalidState("cannot query allowance on xDai"))
130        } else {
131            Err(ConnectorError::InvalidState("unsupported currency"))
132        }
133    }
134
135    async fn safe_info(&self, selector: SafeSelector) -> Result<Option<DeployedSafe>, Self::Error> {
136        let selector = match selector {
137            SafeSelector::Address(safe_address) => blokli_client::api::SafeSelector::SafeAddress(safe_address.into()),
138            SafeSelector::Deployer(deployer_address) => {
139                blokli_client::api::SafeSelector::ChainKey(deployer_address.into())
140            }
141            SafeSelector::NodeAddress(node_address) => {
142                blokli_client::api::SafeSelector::RegisteredNode(node_address.into())
143            }
144            SafeSelector::Owner(owner_address) => blokli_client::api::SafeSelector::Owner(owner_address.into()),
145        };
146
147        if let Some(safe) = self.0.query_safe(selector).await?.first() {
148            Ok(Some(model_to_deployed_safe(safe.clone())?))
149        } else {
150            Ok(None)
151        }
152    }
153
154    async fn await_safe_deployment(
155        &self,
156        selector: SafeSelector,
157        timeout: Duration,
158    ) -> Result<DeployedSafe, Self::Error> {
159        if let Some(safe) = self.safe_info(selector).await? {
160            return Ok(safe);
161        }
162
163        let res = self
164            .0
165            .subscribe_safe_deployments()?
166            .map_err(ConnectorError::from)
167            .and_then(|safe| futures::future::ready(model_to_deployed_safe(safe)))
168            .try_skip_while(|deployed_safe| futures::future::ok(!selector.satisfies(deployed_safe)))
169            .take(1)
170            .try_collect::<Vec<_>>()
171            .timeout(futures_time::time::Duration::from(timeout.max(Duration::from_secs(1))))
172            .await
173            .map_err(|_| ConnectorError::other(anyhow::anyhow!("timeout while waiting for safe deployment")))??;
174
175        res.into_iter()
176            .next()
177            .ok_or(ConnectorError::InvalidState("safe deployment stream closed"))
178    }
179
180    async fn predict_module_address(
181        &self,
182        nonce: u64,
183        owner: &Address,
184        safe_address: &Address,
185    ) -> Result<Address, Self::Error> {
186        Ok(self
187            .0
188            .query_module_address_prediction(blokli_client::api::ModulePredictionInput {
189                nonce,
190                owner: (*owner).into(),
191                safe_address: (*safe_address).into(),
192            })
193            .await?
194            .into())
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use hopr_api::chain::ChainValues;
201
202    use super::*;
203    use crate::testing::BlokliTestStateBuilder;
204
205    #[tokio::test]
206    async fn redeemed_stats() -> anyhow::Result<()> {
207        let blokli_client = BlokliTestStateBuilder::default()
208            .with_deployed_safes([DeployedSafe {
209                address: [1u8; Address::SIZE].into(),
210                owners: vec![[2u8; Address::SIZE].into()],
211                module: [3u8; Address::SIZE].into(),
212                registered_nodes: vec![],
213                deployer: [2u8; Address::SIZE].into(),
214            }])
215            .with_hopr_network_chain_info("rotsee")
216            .build_static_client();
217
218        let reader = HoprBlockchainReader::new(blokli_client);
219
220        let stats = reader.redemption_stats([1u8; Address::SIZE]).await?;
221        assert_eq!(0, stats.redeemed_count);
222        assert_eq!(HoprBalance::zero(), stats.redeemed_value);
223
224        Ok(())
225    }
226}