hopr_chain_connector/connector/
accounts.rs

1use std::str::FromStr;
2
3use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, future::BoxFuture, stream::BoxStream};
5use hopr_api::chain::{AccountSelector, AnnouncementError, ChainReceipt, Multiaddr, SafeRegistrationError};
6use hopr_chain_types::prelude::*;
7use hopr_crypto_types::prelude::*;
8use hopr_internal_types::{
9    account::AccountEntry,
10    prelude::{AnnouncementData, KeyBinding},
11};
12use hopr_primitive_types::prelude::*;
13
14use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
15
16impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
17where
18    B: Backend + Send + Sync + 'static,
19{
20    pub(crate) fn build_account_stream(
21        &self,
22        selector: AccountSelector,
23    ) -> Result<impl futures::Stream<Item = AccountEntry> + Send + 'static, ConnectorError> {
24        let accounts = self.graph.read().nodes().collect::<Vec<_>>();
25        let backend = self.backend.clone();
26        Ok(futures::stream::iter(accounts).filter_map(move |account_id| {
27            let backend = backend.clone();
28            let selector = selector.clone();
29            // This avoids the cache on purpose so it does not get spammed
30            async move {
31                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_account_by_id(&account_id)).await
32                {
33                    Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
34                    Ok(Err(error)) => {
35                        tracing::error!(%error, %account_id, "backend error when looking up account");
36                        None
37                    }
38                    Err(error) => {
39                        tracing::error!(%error, %account_id, "join error when looking up account");
40                        None
41                    }
42                }
43            }
44        }))
45    }
46}
47
48#[async_trait::async_trait]
49impl<B, C, P, R> hopr_api::chain::ChainReadAccountOperations for HoprBlockchainConnector<C, B, P, R>
50where
51    B: Backend + Send + Sync + 'static,
52    C: BlokliQueryClient + Send + Sync + 'static,
53    P: Send + Sync + 'static,
54    R: Send + Sync,
55{
56    type Error = ConnectorError;
57
58    async fn get_balance<Cy: Currency, A: Into<Address> + Send>(&self, address: A) -> Result<Balance<Cy>, Self::Error> {
59        self.check_connection_state()?;
60
61        let address = address.into();
62        if Cy::is::<WxHOPR>() {
63            Ok(self
64                .client
65                .query_token_balance(&address.into())
66                .await?
67                .balance
68                .0
69                .parse()?)
70        } else if Cy::is::<XDai>() {
71            Ok(self
72                .client
73                .query_native_balance(&address.into())
74                .await?
75                .balance
76                .0
77                .parse()?)
78        } else {
79            Err(ConnectorError::InvalidState("unsupported currency"))
80        }
81    }
82
83    async fn safe_allowance<Cy: Currency, A: Into<Address> + Send>(
84        &self,
85        address: A,
86    ) -> Result<Balance<Cy>, Self::Error> {
87        self.check_connection_state()?;
88
89        let address = address.into();
90        if Cy::is::<WxHOPR>() {
91            Ok(self
92                .client
93                .query_safe_allowance(&address.into())
94                .await?
95                .allowance
96                .0
97                .parse()?)
98        } else if Cy::is::<XDai>() {
99            Err(ConnectorError::InvalidState("cannot query allowance on xDai"))
100        } else {
101            Err(ConnectorError::InvalidState("unsupported currency"))
102        }
103    }
104
105    async fn stream_accounts<'a>(
106        &'a self,
107        selector: AccountSelector,
108    ) -> Result<BoxStream<'a, AccountEntry>, Self::Error> {
109        self.check_connection_state()?;
110
111        Ok(self.build_account_stream(selector)?.boxed())
112    }
113
114    async fn count_accounts(&self, selector: AccountSelector) -> Result<usize, Self::Error> {
115        self.check_connection_state()?;
116
117        Ok(self.stream_accounts(selector).await?.count().await)
118    }
119}
120
121#[async_trait::async_trait]
122impl<B, C, P> hopr_api::chain::ChainWriteAccountOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
123where
124    B: Send + Sync,
125    C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
126    P: PayloadGenerator + Send + Sync + 'static,
127    P::TxRequest: Send + Sync + 'static,
128{
129    type Error = ConnectorError;
130
131    async fn announce(
132        &self,
133        multiaddrs: &[Multiaddr],
134        key: &OffchainKeypair,
135    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, AnnouncementError<Self::Error>> {
136        self.check_connection_state()
137            .map_err(AnnouncementError::ProcessingError)?;
138
139        let new_announced_addrs = ahash::HashSet::from_iter(multiaddrs.iter().map(|a| a.to_string()));
140
141        let existing_account = self
142            .client
143            .query_accounts(blokli_client::api::v1::AccountSelector::Address(
144                self.chain_key.public().to_address().into(),
145            ))
146            .await
147            .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::from(e)))?
148            .into_iter()
149            .find(|account| OffchainPublicKey::from_str(&account.packet_key).is_ok_and(|k| &k == key.public()));
150
151        if let Some(account) = &existing_account {
152            let old_announced_addrs = ahash::HashSet::from_iter(account.multi_addresses.iter().cloned());
153            if old_announced_addrs == new_announced_addrs || old_announced_addrs.is_superset(&new_announced_addrs) {
154                return Err(AnnouncementError::AlreadyAnnounced);
155            }
156        }
157
158        let key_binding = KeyBinding::new(self.chain_key.public().to_address(), key);
159
160        let tx_req = self
161            .payload_generator
162            .announce(
163                AnnouncementData::new(key_binding, multiaddrs.first().cloned())
164                    .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::OtherError(e.into())))?,
165                existing_account
166                    .map(|_| HoprBalance::zero())
167                    .unwrap_or(self.cfg.new_key_binding_fee),
168            )
169            .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::from(e)))?;
170
171        Ok(self
172            .send_tx(tx_req)
173            .map_err(AnnouncementError::ProcessingError)
174            .await?
175            .boxed())
176    }
177
178    async fn withdraw<Cy: Currency + Send>(
179        &self,
180        balance: Balance<Cy>,
181        recipient: &Address,
182    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, Self::Error> {
183        self.check_connection_state()?;
184
185        let tx_req = self.payload_generator.transfer(*recipient, balance)?;
186
187        Ok(self.send_tx(tx_req).await?.boxed())
188    }
189
190    async fn register_safe(
191        &self,
192        safe_address: &Address,
193    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, SafeRegistrationError<Self::Error>> {
194        self.check_connection_state()
195            .map_err(SafeRegistrationError::ProcessingError)?;
196
197        if let Some(safe) = self
198            .client
199            .query_accounts(blokli_client::api::v1::AccountSelector::Address(
200                self.chain_key.public().to_address().into(),
201            ))
202            .await
203            .map_err(|e| SafeRegistrationError::ProcessingError(ConnectorError::from(e)))?
204            .iter()
205            .find_map(|account| account.safe_address.clone())
206        {
207            return Err(SafeRegistrationError::AlreadyRegistered(safe.parse().unwrap_or_else(
208                |e| {
209                    tracing::error!("failed to parse safe {safe} address: {e}");
210                    Address::default()
211                },
212            )));
213        }
214
215        let tx_req = self
216            .payload_generator
217            .register_safe_by_node(*safe_address)
218            .map_err(|e| SafeRegistrationError::ProcessingError(ConnectorError::from(e)))?;
219
220        Ok(self
221            .send_tx(tx_req)
222            .map_err(SafeRegistrationError::ProcessingError)
223            .await?
224            .boxed())
225    }
226}