1use std::str::FromStr;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::BoxFuture, pin_mut, stream::BoxStream};
5use futures_time::future::FutureExt as TimeFutureExt;
6use hopr_api::{
7 chain::{AccountSelector, AnnouncementError, ChainReceipt, Multiaddr, SafeRegistrationError},
8 types::{
9 chain::prelude::*,
10 crypto::prelude::*,
11 internal::{
12 account::AccountEntry,
13 prelude::{AnnouncementData, KeyBinding},
14 },
15 primitive::prelude::*,
16 },
17};
18
19use crate::{
20 backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError, utils::model_to_account_entry,
21};
22
23impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
24where
25 B: Backend + Send + Sync + 'static,
26{
27 pub(crate) fn build_account_stream(
28 &self,
29 selector: AccountSelector,
30 ) -> Result<impl futures::Stream<Item = AccountEntry> + Send + 'static, ConnectorError> {
31 let mut accounts = self.graph.read().nodes().collect::<Vec<_>>();
32
33 accounts.sort_unstable();
35
36 let backend = self.backend.clone();
37 Ok(futures::stream::iter(accounts).filter_map(move |account_id| {
38 let backend = backend.clone();
39 async move {
41 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_account_by_id(&account_id)).await
42 {
43 Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
44 Ok(Err(error)) => {
45 tracing::error!(%error, %account_id, "backend error when looking up account");
46 None
47 }
48 Err(error) => {
49 tracing::error!(%error, %account_id, "join error when looking up account");
50 None
51 }
52 }
53 }
54 }))
55 }
56}
57
58#[async_trait::async_trait]
59impl<B, C, P, R> hopr_api::chain::ChainReadAccountOperations for HoprBlockchainConnector<C, B, P, R>
60where
61 B: Backend + Send + Sync + 'static,
62 C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync + 'static,
63 P: Send + Sync + 'static,
64 R: Send + Sync,
65{
66 type Error = ConnectorError;
67
68 async fn stream_accounts<'a>(
69 &'a self,
70 selector: AccountSelector,
71 ) -> Result<BoxStream<'a, AccountEntry>, Self::Error> {
72 self.check_connection_state()?;
73
74 Ok(self.build_account_stream(selector)?.boxed())
75 }
76
77 async fn count_accounts(&self, selector: AccountSelector) -> Result<usize, Self::Error> {
78 self.check_connection_state()?;
79
80 Ok(self.stream_accounts(selector).await?.count().await)
81 }
82
83 async fn await_key_binding(
84 &self,
85 offchain_key: &OffchainPublicKey,
86 timeout: std::time::Duration,
87 ) -> Result<AccountEntry, Self::Error> {
88 self.check_connection_state()?;
89
90 let selector = blokli_client::api::v1::AccountSelector::PacketKey((*offchain_key).into());
91 if let Some(node) = self.client.query_accounts(selector.clone()).await?.first().cloned() {
92 return model_to_account_entry(node);
93 }
94
95 let stream = self.client.subscribe_accounts(selector)?.map_err(ConnectorError::from);
96 pin_mut!(stream);
97 if let Some(node) = stream
98 .try_next()
99 .timeout(futures_time::time::Duration::from(
100 timeout.max(std::time::Duration::from_secs(1)),
101 ))
102 .await
103 .map_err(|_| ConnectorError::other(anyhow::anyhow!("timeout while waiting for key binding")))??
104 {
105 model_to_account_entry(node)
106 } else {
107 Err(ConnectorError::AccountDoesNotExist(format!(
108 "with packet key {offchain_key}"
109 )))
110 }
111 }
112}
113
114#[async_trait::async_trait]
115impl<B, C, P> hopr_api::chain::ChainWriteAccountOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
116where
117 B: Send + Sync,
118 C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
119 P: PayloadGenerator + Send + Sync + 'static,
120 P::TxRequest: Send + Sync + 'static,
121{
122 type Error = ConnectorError;
123
124 async fn announce(
125 &self,
126 multiaddrs: &[Multiaddr],
127 key: &OffchainKeypair,
128 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, AnnouncementError<Self::Error>> {
129 self.check_connection_state().map_err(AnnouncementError::processing)?;
130
131 let new_announced_addrs = ahash::HashSet::from_iter(multiaddrs.iter().map(|a| a.to_string()));
132
133 let existing_account = self
134 .client
135 .query_accounts(blokli_client::api::v1::AccountSelector::Address(
136 self.chain_key.public().to_address().into(),
137 ))
138 .await
139 .map_err(AnnouncementError::processing)?
140 .into_iter()
141 .find(|account| OffchainPublicKey::from_str(&account.packet_key).is_ok_and(|k| &k == key.public()));
142
143 if let Some(account) = &existing_account {
144 let old_announced_addrs = ahash::HashSet::from_iter(account.multi_addresses.iter().cloned());
145 if old_announced_addrs == new_announced_addrs || old_announced_addrs.is_superset(&new_announced_addrs) {
146 return Err(AnnouncementError::AlreadyAnnounced);
147 }
148 }
149
150 let key_binding = KeyBinding::new(self.chain_key.public().to_address(), key);
152 let key_binding_fee = if existing_account.is_none() {
153 self.query_cached_chain_info()
154 .await
155 .map_err(AnnouncementError::processing)?
156 .key_binding_fee
157 } else {
158 HoprBalance::zero()
159 };
160
161 let tx_req = self
162 .payload_generator
163 .announce(
164 AnnouncementData::new(key_binding, multiaddrs.first().cloned())
165 .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::OtherError(e.into())))?,
166 key_binding_fee,
167 )
168 .map_err(AnnouncementError::processing)?;
169
170 Ok(self
171 .send_tx(tx_req, None)
172 .map_err(AnnouncementError::processing)
173 .await?
174 .boxed())
175 }
176
177 async fn withdraw<Cy: Currency + Send>(
178 &self,
179 balance: Balance<Cy>,
180 recipient: &Address,
181 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, Self::Error> {
182 self.check_connection_state()?;
183
184 let tx_req = self.payload_generator.transfer(*recipient, balance)?;
185
186 Ok(self.send_tx(tx_req, None).await?.boxed())
187 }
188
189 async fn register_safe(
190 &self,
191 safe_address: &Address,
192 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, SafeRegistrationError<Self::Error>> {
193 self.check_connection_state()
194 .map_err(SafeRegistrationError::processing)?;
195
196 let my_node_addr = self.chain_key.public().to_address();
198 if let Some(safe_with_node) = self
199 .client
200 .query_safe(blokli_client::api::v1::SafeSelector::RegisteredNode(
201 my_node_addr.into(),
202 ))
203 .await
204 .map_err(SafeRegistrationError::processing)?
205 {
206 let registered_safe_addr =
208 Address::from_hex(&safe_with_node.address).map_err(SafeRegistrationError::processing)?;
209 return Err(SafeRegistrationError::AlreadyRegistered(registered_safe_addr));
210 }
211
212 if self
214 .client
215 .query_safe(blokli_client::api::v1::SafeSelector::SafeAddress(
216 (*safe_address).into(),
217 ))
218 .await
219 .map_err(SafeRegistrationError::processing)?
220 .is_none()
221 {
222 return Err(SafeRegistrationError::ProcessingError(
223 ConnectorError::SafeDoesNotExist(*safe_address),
224 ));
225 }
226
227 tracing::debug!(%safe_address, %my_node_addr, "safe exists, proceeding with registration");
228
229 let tx_req = self
230 .payload_generator
231 .register_safe_by_node(*safe_address)
232 .map_err(SafeRegistrationError::processing)?;
233
234 Ok(self
235 .send_tx(tx_req, None)
236 .map_err(SafeRegistrationError::processing)
237 .await?
238 .boxed())
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use hex_literal::hex;
245 use hopr_api::{
246 chain::{ChainReadAccountOperations, ChainWriteAccountOperations, DeployedSafe},
247 types::internal::account::AccountType,
248 };
249
250 use super::*;
251 use crate::{
252 connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
253 testing::BlokliTestStateBuilder,
254 };
255
256 #[tokio::test]
257 async fn connector_should_stream_and_count_accounts() -> anyhow::Result<()> {
258 let account = AccountEntry {
259 public_key: *OffchainKeypair::random().public(),
260 chain_addr: [1u8; Address::SIZE].into(),
261 entry_type: AccountType::NotAnnounced,
262 safe_address: Some([2u8; Address::SIZE].into()),
263 key_id: 1.into(),
264 };
265
266 let blokli_client = BlokliTestStateBuilder::default()
267 .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
268 .build_static_client();
269
270 let mut connector = create_connector(blokli_client)?;
271 connector.connect().await?;
272
273 let accounts = connector
274 .stream_accounts(AccountSelector::default())
275 .await?
276 .collect::<Vec<_>>()
277 .await;
278
279 let count = connector.count_accounts(AccountSelector::default()).await?;
280
281 assert_eq!(accounts.len(), 1);
282 assert_eq!(count, 1);
283 assert_eq!(&accounts[0], &account);
284
285 Ok(())
286 }
287
288 #[tokio::test]
289 async fn connector_should_stream_and_count_accounts_with_selector() -> anyhow::Result<()> {
290 let account_1 = AccountEntry {
291 public_key: *OffchainKeypair::random().public(),
292 chain_addr: [1u8; Address::SIZE].into(),
293 entry_type: AccountType::NotAnnounced,
294 safe_address: Some([2u8; Address::SIZE].into()),
295 key_id: 1.into(),
296 };
297
298 let account_2 = AccountEntry {
299 public_key: *OffchainKeypair::random().public(),
300 chain_addr: [2u8; Address::SIZE].into(),
301 entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
302 safe_address: Some([3u8; Address::SIZE].into()),
303 key_id: 2.into(),
304 };
305
306 let blokli_client = BlokliTestStateBuilder::default()
307 .with_accounts([
308 (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
309 (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
310 ])
311 .build_static_client();
312
313 let mut connector = create_connector(blokli_client)?;
314 connector.connect().await?;
315
316 let selector = AccountSelector::default().with_chain_key(account_1.chain_addr);
317 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
318 let count = connector.count_accounts(selector).await?;
319
320 assert_eq!(accounts.len(), count);
321 assert_eq!(accounts, vec![account_1.clone()]);
322
323 let selector = AccountSelector::default().with_offchain_key(account_1.public_key);
324 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
325 let count = connector.count_accounts(selector).await?;
326
327 assert_eq!(accounts.len(), count);
328 assert_eq!(accounts, vec![account_1.clone()]);
329
330 let selector = AccountSelector::default().with_public_only(true);
331 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
332 let count = connector.count_accounts(selector).await?;
333
334 assert_eq!(accounts.len(), count);
335 assert_eq!(accounts, vec![account_2.clone()]);
336
337 let selector = AccountSelector::default()
338 .with_chain_key(account_1.chain_addr)
339 .with_public_only(true);
340 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
341 let count = connector.count_accounts(selector).await?;
342
343 assert_eq!(count, 0);
344 assert!(accounts.is_empty());
345
346 Ok(())
347 }
348
349 #[test_log::test(tokio::test)]
350 async fn connector_should_announce_new_account_with_multiaddresses() -> anyhow::Result<()> {
351 let blokli_client = BlokliTestStateBuilder::default()
352 .with_balances([(
353 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
354 XDaiBalance::new_base(1),
355 )])
356 .with_hopr_network_chain_info("rotsee")
357 .build_dynamic_client(MODULE_ADDR.into());
358
359 let mut connector = create_connector(blokli_client)?;
360 connector.connect().await?;
361
362 let offchain_key = OffchainKeypair::from_secret(&hex!(
363 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
364 ))?;
365 let multiaddress = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?;
366
367 connector.announce(&[multiaddress], &offchain_key).await?.await?;
368
369 insta::assert_yaml_snapshot!(*connector.client.snapshot());
370
371 let accounts = connector
372 .stream_accounts(AccountSelector::default().with_public_only(true))
373 .await?
374 .collect::<Vec<_>>()
375 .await;
376
377 assert_eq!(accounts.len(), 1);
378 assert_eq!(
379 accounts[0].get_multiaddrs(),
380 &[Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?]
381 );
382
383 Ok(())
384 }
385
386 #[test_log::test(tokio::test)]
387 async fn connector_should_announce_new_account_without_multiaddresses() -> anyhow::Result<()> {
388 let blokli_client = BlokliTestStateBuilder::default()
389 .with_hopr_network_chain_info("rotsee")
390 .with_balances([(
391 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
392 XDaiBalance::new_base(1),
393 )])
394 .build_dynamic_client(MODULE_ADDR.into());
395
396 let mut connector = create_connector(blokli_client)?;
397 connector.connect().await?;
398
399 let offchain_key = OffchainKeypair::from_secret(&hex!(
400 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
401 ))?;
402
403 connector.announce(&[], &offchain_key).await?.await?;
404
405 insta::assert_yaml_snapshot!(*connector.client.snapshot());
406
407 let accounts = connector
408 .stream_accounts(AccountSelector::default())
409 .await?
410 .collect::<Vec<_>>()
411 .await;
412
413 assert_eq!(accounts.len(), 1);
414 assert!(accounts[0].get_multiaddrs().is_empty());
415
416 Ok(())
417 }
418
419 #[test_log::test(tokio::test)]
420 async fn connector_should_not_reannounce_when_existing_account_has_same_multiaddresses() -> anyhow::Result<()> {
421 let offchain_key = OffchainKeypair::from_secret(&hex!(
422 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
423 ))?;
424 let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
425 let account = AccountEntry {
426 public_key: *offchain_key.public(),
427 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
428 entry_type: AccountType::Announced(vec![multiaddr.clone()]),
429 safe_address: Some([2u8; Address::SIZE].into()),
430 key_id: 1.into(),
431 };
432
433 let blokli_client = BlokliTestStateBuilder::default()
434 .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
435 .with_hopr_network_chain_info("rotsee")
436 .build_dynamic_client(MODULE_ADDR.into());
437
438 let mut connector = create_connector(blokli_client)?;
439 connector.connect().await?;
440
441 assert!(matches!(
442 connector.announce(&[], &offchain_key).await,
443 Err(AnnouncementError::AlreadyAnnounced)
444 ));
445
446 assert!(matches!(
447 connector.announce(&[multiaddr], &offchain_key).await,
448 Err(AnnouncementError::AlreadyAnnounced)
449 ));
450
451 insta::assert_yaml_snapshot!(*connector.client.snapshot());
452
453 Ok(())
454 }
455
456 #[tokio::test]
457 async fn connector_should_reannounce_when_existing_account_has_no_multiaddresses() -> anyhow::Result<()> {
458 let offchain_key = OffchainKeypair::from_secret(&hex!(
459 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
460 ))?;
461 let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
462 let account = AccountEntry {
463 public_key: *offchain_key.public(),
464 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
465 entry_type: AccountType::NotAnnounced,
466 safe_address: Some([2u8; Address::SIZE].into()),
467 key_id: 1.into(),
468 };
469
470 let blokli_client = BlokliTestStateBuilder::default()
471 .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
472 .with_hopr_network_chain_info("rotsee")
473 .build_dynamic_client(MODULE_ADDR.into());
474
475 let mut connector = create_connector(blokli_client)?;
476 connector.connect().await?;
477
478 assert!(matches!(
479 connector.announce(&[], &offchain_key).await,
480 Err(AnnouncementError::AlreadyAnnounced)
481 ));
482
483 connector
484 .announce(std::slice::from_ref(&multiaddr), &offchain_key)
485 .await?
486 .await?;
487
488 insta::assert_yaml_snapshot!(*connector.client.snapshot());
489
490 let accounts = connector
491 .stream_accounts(AccountSelector::default().with_public_only(true))
492 .await?
493 .collect::<Vec<_>>()
494 .await;
495
496 assert_eq!(accounts.len(), 1);
497 assert_eq!(accounts[0].get_multiaddrs(), &[multiaddr]);
498
499 Ok(())
500 }
501
502 #[tokio::test]
503 async fn connector_should_withdraw() -> anyhow::Result<()> {
504 let blokli_client = BlokliTestStateBuilder::default()
505 .with_balances([([1u8; Address::SIZE].into(), HoprBalance::zero())])
506 .with_balances([([1u8; Address::SIZE].into(), XDaiBalance::zero())])
507 .with_balances([(
508 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
509 XDaiBalance::new_base(10),
510 )])
511 .with_balances([(
512 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
513 HoprBalance::new_base(1000),
514 )])
515 .with_hopr_network_chain_info("rotsee")
516 .build_dynamic_client(MODULE_ADDR.into());
517
518 let mut connector = create_connector(blokli_client)?;
519 connector.connect().await?;
520
521 connector
522 .withdraw(HoprBalance::new_base(10), &[1u8; Address::SIZE].into())
523 .await?
524 .await?;
525 connector
526 .withdraw(XDaiBalance::new_base(1), &[1u8; Address::SIZE].into())
527 .await?
528 .await?;
529
530 insta::assert_yaml_snapshot!(*connector.client.snapshot());
531
532 Ok(())
533 }
534
535 #[tokio::test]
536 async fn connector_should_register_safe() -> anyhow::Result<()> {
537 let blokli_client = BlokliTestStateBuilder::default()
538 .with_balances([(
539 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
540 XDaiBalance::new_base(10),
541 )])
542 .with_deployed_safes([DeployedSafe {
543 address: [1u8; Address::SIZE].into(),
544 owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
545 module: MODULE_ADDR.into(),
546 registered_nodes: vec![],
547 }])
548 .with_hopr_network_chain_info("rotsee")
549 .build_dynamic_client(MODULE_ADDR.into());
550
551 let mut connector = create_connector(blokli_client)?;
552 connector.connect().await?;
553
554 connector.register_safe(&[1u8; Address::SIZE].into()).await?.await?;
555
556 insta::assert_yaml_snapshot!(*connector.client.snapshot());
557
558 Ok(())
559 }
560
561 #[tokio::test]
562 async fn connector_should_register_safe_that_has_nodes_registered_already() -> anyhow::Result<()> {
563 let safe_addr: Address = [2u8; Address::SIZE].into();
564 let other_registered_node = ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address();
565
566 let blokli_client = BlokliTestStateBuilder::default()
567 .with_balances([(
568 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
569 XDaiBalance::new_base(10),
570 )])
571 .with_deployed_safes([DeployedSafe {
572 address: safe_addr,
573 owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
574 module: MODULE_ADDR.into(),
575 registered_nodes: vec![other_registered_node],
576 }])
577 .with_hopr_network_chain_info("rotsee")
578 .build_dynamic_client(MODULE_ADDR.into());
579
580 let mut connector = create_connector(blokli_client)?;
581 connector.connect().await?;
582
583 connector.register_safe(&safe_addr).await?.await?;
584
585 insta::assert_yaml_snapshot!(*connector.client.snapshot());
586
587 Ok(())
588 }
589
590 #[tokio::test]
591 async fn connector_should_not_register_safe_that_does_not_exist() -> anyhow::Result<()> {
592 let safe_addr: Address = [2u8; Address::SIZE].into();
593
594 let blokli_client = BlokliTestStateBuilder::default()
595 .with_balances([(
596 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
597 XDaiBalance::new_base(10),
598 )])
599 .with_hopr_network_chain_info("rotsee")
600 .build_dynamic_client(MODULE_ADDR.into());
601
602 let mut connector = create_connector(blokli_client)?;
603 connector.connect().await?;
604
605 assert!(connector.register_safe(&safe_addr).await.is_err());
606
607 insta::assert_yaml_snapshot!(*connector.client.snapshot());
608
609 Ok(())
610 }
611
612 #[tokio::test]
613 async fn connector_should_not_register_any_safe_when_node_already_registered() -> anyhow::Result<()> {
614 let blokli_client = BlokliTestStateBuilder::default()
615 .with_balances([(
616 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
617 XDaiBalance::new_base(10),
618 )])
619 .with_deployed_safes([
620 DeployedSafe {
621 address: [2u8; Address::SIZE].into(),
622 owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
623 module: MODULE_ADDR.into(),
624 registered_nodes: vec![ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address()],
625 },
626 DeployedSafe {
627 address: [1u8; Address::SIZE].into(),
628 owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
629 module: MODULE_ADDR.into(),
630 registered_nodes: vec![],
631 },
632 ])
633 .with_hopr_network_chain_info("rotsee")
634 .build_dynamic_client(MODULE_ADDR.into());
635
636 let mut connector = create_connector(blokli_client)?;
637 connector.connect().await?;
638
639 assert!(
640 matches!(connector.register_safe(&[1u8; Address::SIZE].into()).await, Err(SafeRegistrationError::AlreadyRegistered(a)) if a == [2u8; Address::SIZE].into())
641 );
642
643 insta::assert_yaml_snapshot!(*connector.client.snapshot());
644
645 Ok(())
646 }
647}