Skip to main content

hopr_chain_connector/
reader.rs

1use std::time::Duration;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient};
4use futures::{StreamExt, TryStreamExt};
5use futures_time::future::FutureExt as FuturesTimeExt;
6use hopr_api::{
7    chain::{ChainInfo, DeployedSafe, DomainSeparators, SafeSelector},
8    types::{internal::prelude::*, primitive::prelude::*},
9};
10
11use crate::{
12    errors::ConnectorError,
13    utils::{model_to_chain_info, model_to_deployed_safe},
14};
15
/// Read-only counterpart of the [`HoprBlockchainConnector`](crate::HoprBlockchainConnector).
///
/// It covers only a part of the [HOPR Chain API](hopr_api::chain), namely the following traits:
///
/// - [`ChainValues`](hopr_api::chain::ChainValues)
/// - [`ChainReadSafeOperations`](hopr_api::chain::ChainReadSafeOperations)
///
/// The implementation is currently backed by the Blokli client: the reader wraps a client
/// (such as [`blokli_client::BlokliClient`]) in an [`Arc`](std::sync::Arc), so all clones
/// of a reader share the very same client instance.
///
/// The typical use is bootstrapping, i.e. the work that usually has to be done before the
/// [full connector](crate::HoprBlockchainConnector) can be constructed.
pub struct HoprBlockchainReader<C>(pub(crate) std::sync::Arc<C>);

impl<C> HoprBlockchainReader<C> {
    /// Wraps the given `client` into a new reader instance.
    pub fn new(client: C) -> Self {
        let shared_client = std::sync::Arc::new(client);
        Self(shared_client)
    }
}

// Implemented by hand so that cloning the reader does not require `C: Clone`:
// only the reference-counted pointer to the client is duplicated.
impl<C> Clone for HoprBlockchainReader<C> {
    fn clone(&self) -> Self {
        let Self(shared_client) = self;
        Self(std::sync::Arc::clone(shared_client))
    }
}
43
44#[async_trait::async_trait]
45impl<C> hopr_api::chain::ChainValues for HoprBlockchainReader<C>
46where
47    C: BlokliQueryClient + Send + Sync,
48{
49    type Error = ConnectorError;
50
51    async fn balance<Cy: Currency, A: Into<Address> + Send>(&self, address: A) -> Result<Balance<Cy>, Self::Error> {
52        let address = address.into();
53        if Cy::is::<WxHOPR>() {
54            Ok(self.0.query_token_balance(&address.into()).await?.balance.0.parse()?)
55        } else if Cy::is::<XDai>() {
56            Ok(self.0.query_native_balance(&address.into()).await?.balance.0.parse()?)
57        } else {
58            Err(ConnectorError::InvalidState("unsupported currency"))
59        }
60    }
61
62    async fn domain_separators(&self) -> Result<DomainSeparators, Self::Error> {
63        let chain_info = self.0.query_chain_info().await?;
64        Ok(model_to_chain_info(chain_info)?.domain_separators)
65    }
66
67    async fn minimum_incoming_ticket_win_prob(&self) -> Result<WinningProbability, Self::Error> {
68        let chain_info = self.0.query_chain_info().await?;
69        Ok(model_to_chain_info(chain_info)?.ticket_win_prob)
70    }
71
72    async fn minimum_ticket_price(&self) -> Result<HoprBalance, Self::Error> {
73        let chain_info = self.0.query_chain_info().await?;
74        Ok(model_to_chain_info(chain_info)?.ticket_price)
75    }
76
77    async fn key_binding_fee(&self) -> Result<HoprBalance, Self::Error> {
78        let chain_info = self.0.query_chain_info().await?;
79        Ok(model_to_chain_info(chain_info)?.key_binding_fee)
80    }
81
82    async fn channel_closure_notice_period(&self) -> Result<Duration, Self::Error> {
83        let chain_info = self.0.query_chain_info().await?;
84        Ok(model_to_chain_info(chain_info)?.channel_closure_grace_period)
85    }
86
87    async fn chain_info(&self) -> Result<ChainInfo, Self::Error> {
88        let chain_info = self.0.query_chain_info().await?;
89        Ok(model_to_chain_info(chain_info)?.info)
90    }
91}
92
93#[async_trait::async_trait]
94impl<C> hopr_api::chain::ChainReadSafeOperations for HoprBlockchainReader<C>
95where
96    C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync,
97{
98    type Error = ConnectorError;
99
100    async fn safe_allowance<Cy: Currency, A: Into<Address> + Send>(
101        &self,
102        safe_address: A,
103    ) -> Result<Balance<Cy>, Self::Error> {
104        let address = safe_address.into();
105        if Cy::is::<WxHOPR>() {
106            Ok(self
107                .0
108                .query_safe_allowance(&address.into())
109                .await?
110                .allowance
111                .0
112                .parse()?)
113        } else if Cy::is::<XDai>() {
114            Err(ConnectorError::InvalidState("cannot query allowance on xDai"))
115        } else {
116            Err(ConnectorError::InvalidState("unsupported currency"))
117        }
118    }
119
120    async fn safe_info(&self, selector: SafeSelector) -> Result<Option<DeployedSafe>, Self::Error> {
121        let selector = match selector {
122            SafeSelector::Owner(owner_address) => blokli_client::api::SafeSelector::ChainKey(owner_address.into()),
123            SafeSelector::Address(safe_address) => blokli_client::api::SafeSelector::SafeAddress(safe_address.into()),
124        };
125
126        if let Some(safe) = self.0.query_safe(selector).await? {
127            Ok(Some(model_to_deployed_safe(safe)?))
128        } else {
129            Ok(None)
130        }
131    }
132
133    async fn await_safe_deployment(
134        &self,
135        selector: SafeSelector,
136        timeout: Duration,
137    ) -> Result<DeployedSafe, Self::Error> {
138        if let Some(safe) = self.safe_info(selector).await? {
139            return Ok(safe);
140        }
141
142        let res = self
143            .0
144            .subscribe_safe_deployments()?
145            .map_err(ConnectorError::from)
146            .and_then(|safe| futures::future::ready(model_to_deployed_safe(safe)))
147            .try_skip_while(|deployed_safe| futures::future::ok(!selector.satisfies(deployed_safe)))
148            .take(1)
149            .try_collect::<Vec<_>>()
150            .timeout(futures_time::time::Duration::from(timeout.max(Duration::from_secs(1))))
151            .await
152            .map_err(|_| ConnectorError::other(anyhow::anyhow!("timeout while waiting for safe deployment")))??;
153
154        res.into_iter()
155            .next()
156            .ok_or(ConnectorError::InvalidState("safe deployment stream closed"))
157    }
158
159    async fn predict_module_address(
160        &self,
161        nonce: u64,
162        owner: &Address,
163        safe_address: &Address,
164    ) -> Result<Address, Self::Error> {
165        Ok(self
166            .0
167            .query_module_address_prediction(blokli_client::api::ModulePredictionInput {
168                nonce,
169                owner: (*owner).into(),
170                safe_address: (*safe_address).into(),
171            })
172            .await?
173            .into())
174    }
175}