hopr_chain_connector/connector/
accounts.rs1use std::str::FromStr;
2
3use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, future::BoxFuture, stream::BoxStream};
5use hopr_api::chain::{AccountSelector, AnnouncementError, ChainReceipt, Multiaddr, SafeRegistrationError};
6use hopr_chain_types::prelude::*;
7use hopr_crypto_types::prelude::*;
8use hopr_internal_types::{
9 account::AccountEntry,
10 prelude::{AnnouncementData, KeyBinding},
11};
12use hopr_primitive_types::prelude::*;
13
14use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
15
16impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
17where
18 B: Backend + Send + Sync + 'static,
19{
20 pub(crate) fn build_account_stream(
21 &self,
22 selector: AccountSelector,
23 ) -> Result<impl futures::Stream<Item = AccountEntry> + Send + 'static, ConnectorError> {
24 let accounts = self.graph.read().nodes().collect::<Vec<_>>();
25 let backend = self.backend.clone();
26 Ok(futures::stream::iter(accounts).filter_map(move |account_id| {
27 let backend = backend.clone();
28 let selector = selector.clone();
29 async move {
31 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_account_by_id(&account_id)).await
32 {
33 Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
34 Ok(Err(error)) => {
35 tracing::error!(%error, %account_id, "backend error when looking up account");
36 None
37 }
38 Err(error) => {
39 tracing::error!(%error, %account_id, "join error when looking up account");
40 None
41 }
42 }
43 }
44 }))
45 }
46}
47
48#[async_trait::async_trait]
49impl<B, C, P, R> hopr_api::chain::ChainReadAccountOperations for HoprBlockchainConnector<C, B, P, R>
50where
51 B: Backend + Send + Sync + 'static,
52 C: BlokliQueryClient + Send + Sync + 'static,
53 P: Send + Sync + 'static,
54 R: Send + Sync,
55{
56 type Error = ConnectorError;
57
58 async fn get_balance<Cy: Currency, A: Into<Address> + Send>(&self, address: A) -> Result<Balance<Cy>, Self::Error> {
59 self.check_connection_state()?;
60
61 let address = address.into();
62 if Cy::is::<WxHOPR>() {
63 Ok(self
64 .client
65 .query_token_balance(&address.into())
66 .await?
67 .balance
68 .0
69 .parse()?)
70 } else if Cy::is::<XDai>() {
71 Ok(self
72 .client
73 .query_native_balance(&address.into())
74 .await?
75 .balance
76 .0
77 .parse()?)
78 } else {
79 Err(ConnectorError::InvalidState("unsupported currency"))
80 }
81 }
82
83 async fn safe_allowance<Cy: Currency, A: Into<Address> + Send>(
84 &self,
85 address: A,
86 ) -> Result<Balance<Cy>, Self::Error> {
87 self.check_connection_state()?;
88
89 let address = address.into();
90 if Cy::is::<WxHOPR>() {
91 Ok(self
92 .client
93 .query_safe_allowance(&address.into())
94 .await?
95 .allowance
96 .0
97 .parse()?)
98 } else if Cy::is::<XDai>() {
99 Err(ConnectorError::InvalidState("cannot query allowance on xDai"))
100 } else {
101 Err(ConnectorError::InvalidState("unsupported currency"))
102 }
103 }
104
105 async fn stream_accounts<'a>(
106 &'a self,
107 selector: AccountSelector,
108 ) -> Result<BoxStream<'a, AccountEntry>, Self::Error> {
109 self.check_connection_state()?;
110
111 Ok(self.build_account_stream(selector)?.boxed())
112 }
113
114 async fn count_accounts(&self, selector: AccountSelector) -> Result<usize, Self::Error> {
115 self.check_connection_state()?;
116
117 Ok(self.stream_accounts(selector).await?.count().await)
118 }
119}
120
121#[async_trait::async_trait]
122impl<B, C, P> hopr_api::chain::ChainWriteAccountOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
123where
124 B: Send + Sync,
125 C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
126 P: PayloadGenerator + Send + Sync + 'static,
127 P::TxRequest: Send + Sync + 'static,
128{
129 type Error = ConnectorError;
130
131 async fn announce(
132 &self,
133 multiaddrs: &[Multiaddr],
134 key: &OffchainKeypair,
135 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, AnnouncementError<Self::Error>> {
136 self.check_connection_state()
137 .map_err(AnnouncementError::ProcessingError)?;
138
139 let new_announced_addrs = ahash::HashSet::from_iter(multiaddrs.iter().map(|a| a.to_string()));
140
141 let existing_account = self
142 .client
143 .query_accounts(blokli_client::api::v1::AccountSelector::Address(
144 self.chain_key.public().to_address().into(),
145 ))
146 .await
147 .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::from(e)))?
148 .into_iter()
149 .find(|account| OffchainPublicKey::from_str(&account.packet_key).is_ok_and(|k| &k == key.public()));
150
151 if let Some(account) = &existing_account {
152 let old_announced_addrs = ahash::HashSet::from_iter(account.multi_addresses.iter().cloned());
153 if old_announced_addrs == new_announced_addrs || old_announced_addrs.is_superset(&new_announced_addrs) {
154 return Err(AnnouncementError::AlreadyAnnounced);
155 }
156 }
157
158 let key_binding = KeyBinding::new(self.chain_key.public().to_address(), key);
159
160 let tx_req = self
161 .payload_generator
162 .announce(
163 AnnouncementData::new(key_binding, multiaddrs.first().cloned())
164 .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::OtherError(e.into())))?,
165 existing_account
166 .map(|_| HoprBalance::zero())
167 .unwrap_or(self.cfg.new_key_binding_fee),
168 )
169 .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::from(e)))?;
170
171 Ok(self
172 .send_tx(tx_req)
173 .map_err(AnnouncementError::ProcessingError)
174 .await?
175 .boxed())
176 }
177
178 async fn withdraw<Cy: Currency + Send>(
179 &self,
180 balance: Balance<Cy>,
181 recipient: &Address,
182 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, Self::Error> {
183 self.check_connection_state()?;
184
185 let tx_req = self.payload_generator.transfer(*recipient, balance)?;
186
187 Ok(self.send_tx(tx_req).await?.boxed())
188 }
189
190 async fn register_safe(
191 &self,
192 safe_address: &Address,
193 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, SafeRegistrationError<Self::Error>> {
194 self.check_connection_state()
195 .map_err(SafeRegistrationError::ProcessingError)?;
196
197 if let Some(safe) = self
198 .client
199 .query_accounts(blokli_client::api::v1::AccountSelector::Address(
200 self.chain_key.public().to_address().into(),
201 ))
202 .await
203 .map_err(|e| SafeRegistrationError::ProcessingError(ConnectorError::from(e)))?
204 .iter()
205 .find_map(|account| account.safe_address.clone())
206 {
207 return Err(SafeRegistrationError::AlreadyRegistered(safe.parse().unwrap_or_else(
208 |e| {
209 tracing::error!("failed to parse safe {safe} address: {e}");
210 Address::default()
211 },
212 )));
213 }
214
215 let tx_req = self
216 .payload_generator
217 .register_safe_by_node(*safe_address)
218 .map_err(|e| SafeRegistrationError::ProcessingError(ConnectorError::from(e)))?;
219
220 Ok(self
221 .send_tx(tx_req)
222 .map_err(SafeRegistrationError::ProcessingError)
223 .await?
224 .boxed())
225 }
226}