1use std::str::FromStr;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::BoxFuture, pin_mut, stream::BoxStream};
5use futures_time::future::FutureExt as TimeFutureExt;
6use hopr_api::chain::{AccountSelector, AnnouncementError, ChainReceipt, Multiaddr, SafeRegistrationError};
7use hopr_chain_types::prelude::*;
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::{
10 account::AccountEntry,
11 prelude::{AnnouncementData, KeyBinding},
12};
13use hopr_primitive_types::prelude::*;
14
15use crate::{
16 backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError, utils::model_to_account_entry,
17};
18
19impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
20where
21 B: Backend + Send + Sync + 'static,
22{
23 pub(crate) fn build_account_stream(
24 &self,
25 selector: AccountSelector,
26 ) -> Result<impl futures::Stream<Item = AccountEntry> + Send + 'static, ConnectorError> {
27 let mut accounts = self.graph.read().nodes().collect::<Vec<_>>();
28
29 accounts.sort_unstable();
31
32 let backend = self.backend.clone();
33 Ok(futures::stream::iter(accounts).filter_map(move |account_id| {
34 let backend = backend.clone();
35 async move {
37 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_account_by_id(&account_id)).await
38 {
39 Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
40 Ok(Err(error)) => {
41 tracing::error!(%error, %account_id, "backend error when looking up account");
42 None
43 }
44 Err(error) => {
45 tracing::error!(%error, %account_id, "join error when looking up account");
46 None
47 }
48 }
49 }
50 }))
51 }
52}
53
54#[async_trait::async_trait]
55impl<B, C, P, R> hopr_api::chain::ChainReadAccountOperations for HoprBlockchainConnector<C, B, P, R>
56where
57 B: Backend + Send + Sync + 'static,
58 C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync + 'static,
59 P: Send + Sync + 'static,
60 R: Send + Sync,
61{
62 type Error = ConnectorError;
63
64 async fn stream_accounts<'a>(
65 &'a self,
66 selector: AccountSelector,
67 ) -> Result<BoxStream<'a, AccountEntry>, Self::Error> {
68 self.check_connection_state()?;
69
70 Ok(self.build_account_stream(selector)?.boxed())
71 }
72
73 async fn count_accounts(&self, selector: AccountSelector) -> Result<usize, Self::Error> {
74 self.check_connection_state()?;
75
76 Ok(self.stream_accounts(selector).await?.count().await)
77 }
78
79 async fn await_key_binding(
80 &self,
81 offchain_key: &OffchainPublicKey,
82 timeout: std::time::Duration,
83 ) -> Result<AccountEntry, Self::Error> {
84 self.check_connection_state()?;
85
86 let selector = blokli_client::api::v1::AccountSelector::PacketKey((*offchain_key).into());
87 if let Some(node) = self.client.query_accounts(selector.clone()).await?.first().cloned() {
88 return model_to_account_entry(node);
89 }
90
91 let stream = self.client.subscribe_accounts(selector)?.map_err(ConnectorError::from);
92 pin_mut!(stream);
93 if let Some(node) = stream
94 .try_next()
95 .timeout(futures_time::time::Duration::from(
96 timeout.max(std::time::Duration::from_secs(1)),
97 ))
98 .await??
99 {
100 model_to_account_entry(node)
101 } else {
102 Err(ConnectorError::AccountDoesNotExist(format!(
103 "with packet key {offchain_key}"
104 )))
105 }
106 }
107}
108
109#[async_trait::async_trait]
110impl<B, C, P> hopr_api::chain::ChainWriteAccountOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
111where
112 B: Send + Sync,
113 C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
114 P: PayloadGenerator + Send + Sync + 'static,
115 P::TxRequest: Send + Sync + 'static,
116{
117 type Error = ConnectorError;
118
119 async fn announce(
120 &self,
121 multiaddrs: &[Multiaddr],
122 key: &OffchainKeypair,
123 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, AnnouncementError<Self::Error>> {
124 self.check_connection_state().map_err(AnnouncementError::processing)?;
125
126 let new_announced_addrs = ahash::HashSet::from_iter(multiaddrs.iter().map(|a| a.to_string()));
127
128 let existing_account = self
129 .client
130 .query_accounts(blokli_client::api::v1::AccountSelector::Address(
131 self.chain_key.public().to_address().into(),
132 ))
133 .await
134 .map_err(AnnouncementError::processing)?
135 .into_iter()
136 .find(|account| OffchainPublicKey::from_str(&account.packet_key).is_ok_and(|k| &k == key.public()));
137
138 if let Some(account) = &existing_account {
139 let old_announced_addrs = ahash::HashSet::from_iter(account.multi_addresses.iter().cloned());
140 if old_announced_addrs == new_announced_addrs || old_announced_addrs.is_superset(&new_announced_addrs) {
141 return Err(AnnouncementError::AlreadyAnnounced);
142 }
143 }
144
145 let key_binding = KeyBinding::new(self.chain_key.public().to_address(), key);
147 let key_binding_fee = if existing_account.is_none() {
148 self.query_cached_chain_info()
149 .await
150 .map_err(AnnouncementError::processing)?
151 .key_binding_fee
152 } else {
153 HoprBalance::zero()
154 };
155
156 let tx_req = self
157 .payload_generator
158 .announce(
159 AnnouncementData::new(key_binding, multiaddrs.first().cloned())
160 .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::OtherError(e.into())))?,
161 key_binding_fee,
162 )
163 .map_err(AnnouncementError::processing)?;
164
165 Ok(self
166 .send_tx(tx_req)
167 .map_err(AnnouncementError::processing)
168 .await?
169 .boxed())
170 }
171
172 async fn withdraw<Cy: Currency + Send>(
173 &self,
174 balance: Balance<Cy>,
175 recipient: &Address,
176 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, Self::Error> {
177 self.check_connection_state()?;
178
179 let tx_req = self.payload_generator.transfer(*recipient, balance)?;
180
181 Ok(self.send_tx(tx_req).await?.boxed())
182 }
183
184 async fn register_safe(
185 &self,
186 safe_address: &Address,
187 ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, SafeRegistrationError<Self::Error>> {
188 self.check_connection_state()
189 .map_err(SafeRegistrationError::processing)?;
190
191 let my_node_addr = self.chain_key.public().to_address();
193 if let Some(safe_with_node) = self
194 .client
195 .query_safe(blokli_client::api::v1::SafeSelector::RegisteredNode(
196 my_node_addr.into(),
197 ))
198 .await
199 .map_err(SafeRegistrationError::processing)?
200 {
201 let registered_safe_addr =
203 Address::from_hex(&safe_with_node.address).map_err(SafeRegistrationError::processing)?;
204 return Err(SafeRegistrationError::AlreadyRegistered(registered_safe_addr));
205 }
206
207 if self
209 .client
210 .query_safe(blokli_client::api::v1::SafeSelector::SafeAddress(
211 (*safe_address).into(),
212 ))
213 .await
214 .map_err(SafeRegistrationError::processing)?
215 .is_none()
216 {
217 return Err(SafeRegistrationError::ProcessingError(
218 ConnectorError::SafeDoesNotExist(*safe_address),
219 ));
220 }
221
222 let tx_req = self
223 .payload_generator
224 .register_safe_by_node(*safe_address)
225 .map_err(SafeRegistrationError::processing)?;
226
227 Ok(self
228 .send_tx(tx_req)
229 .map_err(SafeRegistrationError::processing)
230 .await?
231 .boxed())
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use hex_literal::hex;
238 use hopr_api::chain::{ChainReadAccountOperations, ChainWriteAccountOperations, DeployedSafe};
239 use hopr_internal_types::account::AccountType;
240
241 use super::*;
242 use crate::{
243 connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
244 testing::BlokliTestStateBuilder,
245 };
246
247 #[tokio::test]
248 async fn connector_should_stream_and_count_accounts() -> anyhow::Result<()> {
249 let account = AccountEntry {
250 public_key: *OffchainKeypair::random().public(),
251 chain_addr: [1u8; Address::SIZE].into(),
252 entry_type: AccountType::NotAnnounced,
253 safe_address: Some([2u8; Address::SIZE].into()),
254 key_id: 1.into(),
255 };
256
257 let blokli_client = BlokliTestStateBuilder::default()
258 .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
259 .build_static_client();
260
261 let mut connector = create_connector(blokli_client)?;
262 connector.connect().await?;
263
264 let accounts = connector
265 .stream_accounts(AccountSelector::default())
266 .await?
267 .collect::<Vec<_>>()
268 .await;
269
270 let count = connector.count_accounts(AccountSelector::default()).await?;
271
272 assert_eq!(accounts.len(), 1);
273 assert_eq!(count, 1);
274 assert_eq!(&accounts[0], &account);
275
276 Ok(())
277 }
278
279 #[tokio::test]
280 async fn connector_should_stream_and_count_accounts_with_selector() -> anyhow::Result<()> {
281 let account_1 = AccountEntry {
282 public_key: *OffchainKeypair::random().public(),
283 chain_addr: [1u8; Address::SIZE].into(),
284 entry_type: AccountType::NotAnnounced,
285 safe_address: Some([2u8; Address::SIZE].into()),
286 key_id: 1.into(),
287 };
288
289 let account_2 = AccountEntry {
290 public_key: *OffchainKeypair::random().public(),
291 chain_addr: [2u8; Address::SIZE].into(),
292 entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
293 safe_address: Some([3u8; Address::SIZE].into()),
294 key_id: 2.into(),
295 };
296
297 let blokli_client = BlokliTestStateBuilder::default()
298 .with_accounts([
299 (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
300 (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
301 ])
302 .build_static_client();
303
304 let mut connector = create_connector(blokli_client)?;
305 connector.connect().await?;
306
307 let selector = AccountSelector::default().with_chain_key(account_1.chain_addr);
308 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
309 let count = connector.count_accounts(selector).await?;
310
311 assert_eq!(accounts.len(), count);
312 assert_eq!(accounts, vec![account_1.clone()]);
313
314 let selector = AccountSelector::default().with_offchain_key(account_1.public_key);
315 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
316 let count = connector.count_accounts(selector).await?;
317
318 assert_eq!(accounts.len(), count);
319 assert_eq!(accounts, vec![account_1.clone()]);
320
321 let selector = AccountSelector::default().with_public_only(true);
322 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
323 let count = connector.count_accounts(selector).await?;
324
325 assert_eq!(accounts.len(), count);
326 assert_eq!(accounts, vec![account_2.clone()]);
327
328 let selector = AccountSelector::default()
329 .with_chain_key(account_1.chain_addr)
330 .with_public_only(true);
331 let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
332 let count = connector.count_accounts(selector).await?;
333
334 assert_eq!(count, 0);
335 assert!(accounts.is_empty());
336
337 Ok(())
338 }
339
340 #[test_log::test(tokio::test)]
341 async fn connector_should_announce_new_account_with_multiaddresses() -> anyhow::Result<()> {
342 let blokli_client = BlokliTestStateBuilder::default()
343 .with_balances([(
344 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
345 XDaiBalance::new_base(1),
346 )])
347 .with_hopr_network_chain_info("rotsee")
348 .build_dynamic_client(MODULE_ADDR.into());
349
350 let mut connector = create_connector(blokli_client)?;
351 connector.connect().await?;
352
353 let offchain_key = OffchainKeypair::from_secret(&hex!(
354 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
355 ))?;
356 let multiaddress = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?;
357
358 connector.announce(&[multiaddress], &offchain_key).await?.await?;
359
360 insta::assert_yaml_snapshot!(*connector.client.snapshot());
361
362 let accounts = connector
363 .stream_accounts(AccountSelector::default().with_public_only(true))
364 .await?
365 .collect::<Vec<_>>()
366 .await;
367
368 assert_eq!(accounts.len(), 1);
369 assert_eq!(
370 accounts[0].get_multiaddrs(),
371 &[Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?]
372 );
373
374 Ok(())
375 }
376
377 #[test_log::test(tokio::test)]
378 async fn connector_should_announce_new_account_without_multiaddresses() -> anyhow::Result<()> {
379 let blokli_client = BlokliTestStateBuilder::default()
380 .with_hopr_network_chain_info("rotsee")
381 .with_balances([(
382 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
383 XDaiBalance::new_base(1),
384 )])
385 .build_dynamic_client(MODULE_ADDR.into());
386
387 let mut connector = create_connector(blokli_client)?;
388 connector.connect().await?;
389
390 let offchain_key = OffchainKeypair::from_secret(&hex!(
391 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
392 ))?;
393
394 connector.announce(&[], &offchain_key).await?.await?;
395
396 insta::assert_yaml_snapshot!(*connector.client.snapshot());
397
398 let accounts = connector
399 .stream_accounts(AccountSelector::default())
400 .await?
401 .collect::<Vec<_>>()
402 .await;
403
404 assert_eq!(accounts.len(), 1);
405 assert!(accounts[0].get_multiaddrs().is_empty());
406
407 Ok(())
408 }
409
410 #[test_log::test(tokio::test)]
411 async fn connector_should_not_reannounce_when_existing_account_has_same_multiaddresses() -> anyhow::Result<()> {
412 let offchain_key = OffchainKeypair::from_secret(&hex!(
413 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
414 ))?;
415 let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
416 let account = AccountEntry {
417 public_key: *offchain_key.public(),
418 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
419 entry_type: AccountType::Announced(vec![multiaddr.clone()]),
420 safe_address: Some([2u8; Address::SIZE].into()),
421 key_id: 1.into(),
422 };
423
424 let blokli_client = BlokliTestStateBuilder::default()
425 .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
426 .with_hopr_network_chain_info("rotsee")
427 .build_dynamic_client(MODULE_ADDR.into());
428
429 let mut connector = create_connector(blokli_client)?;
430 connector.connect().await?;
431
432 assert!(matches!(
433 connector.announce(&[], &offchain_key).await,
434 Err(AnnouncementError::AlreadyAnnounced)
435 ));
436
437 assert!(matches!(
438 connector.announce(&[multiaddr], &offchain_key).await,
439 Err(AnnouncementError::AlreadyAnnounced)
440 ));
441
442 insta::assert_yaml_snapshot!(*connector.client.snapshot());
443
444 Ok(())
445 }
446
447 #[tokio::test]
448 async fn connector_should_reannounce_when_existing_account_has_no_multiaddresses() -> anyhow::Result<()> {
449 let offchain_key = OffchainKeypair::from_secret(&hex!(
450 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
451 ))?;
452 let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
453 let account = AccountEntry {
454 public_key: *offchain_key.public(),
455 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
456 entry_type: AccountType::NotAnnounced,
457 safe_address: Some([2u8; Address::SIZE].into()),
458 key_id: 1.into(),
459 };
460
461 let blokli_client = BlokliTestStateBuilder::default()
462 .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
463 .with_hopr_network_chain_info("rotsee")
464 .build_dynamic_client(MODULE_ADDR.into());
465
466 let mut connector = create_connector(blokli_client)?;
467 connector.connect().await?;
468
469 assert!(matches!(
470 connector.announce(&[], &offchain_key).await,
471 Err(AnnouncementError::AlreadyAnnounced)
472 ));
473
474 connector.announce(&[multiaddr.clone()], &offchain_key).await?.await?;
475
476 insta::assert_yaml_snapshot!(*connector.client.snapshot());
477
478 let accounts = connector
479 .stream_accounts(AccountSelector::default().with_public_only(true))
480 .await?
481 .collect::<Vec<_>>()
482 .await;
483
484 assert_eq!(accounts.len(), 1);
485 assert_eq!(accounts[0].get_multiaddrs(), &[multiaddr]);
486
487 Ok(())
488 }
489
490 #[tokio::test]
491 async fn connector_should_withdraw() -> anyhow::Result<()> {
492 let blokli_client = BlokliTestStateBuilder::default()
493 .with_balances([([1u8; Address::SIZE].into(), HoprBalance::zero())])
494 .with_balances([([1u8; Address::SIZE].into(), XDaiBalance::zero())])
495 .with_balances([(
496 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
497 XDaiBalance::new_base(10),
498 )])
499 .with_balances([(
500 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
501 HoprBalance::new_base(1000),
502 )])
503 .with_hopr_network_chain_info("rotsee")
504 .build_dynamic_client(MODULE_ADDR.into());
505
506 let mut connector = create_connector(blokli_client)?;
507 connector.connect().await?;
508
509 connector
510 .withdraw(HoprBalance::new_base(10), &[1u8; Address::SIZE].into())
511 .await?
512 .await?;
513 connector
514 .withdraw(XDaiBalance::new_base(1), &[1u8; Address::SIZE].into())
515 .await?
516 .await?;
517
518 insta::assert_yaml_snapshot!(*connector.client.snapshot());
519
520 Ok(())
521 }
522
523 #[tokio::test]
524 async fn connector_should_register_safe() -> anyhow::Result<()> {
525 let blokli_client = BlokliTestStateBuilder::default()
526 .with_balances([(
527 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
528 XDaiBalance::new_base(10),
529 )])
530 .with_deployed_safes([DeployedSafe {
531 address: [1u8; Address::SIZE].into(),
532 owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
533 module: MODULE_ADDR.into(),
534 registered_nodes: vec![],
535 }])
536 .with_hopr_network_chain_info("rotsee")
537 .build_dynamic_client(MODULE_ADDR.into());
538
539 let mut connector = create_connector(blokli_client)?;
540 connector.connect().await?;
541
542 connector.register_safe(&[1u8; Address::SIZE].into()).await?.await?;
543
544 insta::assert_yaml_snapshot!(*connector.client.snapshot());
545
546 Ok(())
547 }
548
549 #[tokio::test]
550 async fn connector_should_register_safe_that_has_nodes_registered_already() -> anyhow::Result<()> {
551 let safe_addr: Address = [2u8; Address::SIZE].into();
552 let other_registered_node = ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address();
553
554 let blokli_client = BlokliTestStateBuilder::default()
555 .with_balances([(
556 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
557 XDaiBalance::new_base(10),
558 )])
559 .with_deployed_safes([DeployedSafe {
560 address: safe_addr,
561 owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
562 module: MODULE_ADDR.into(),
563 registered_nodes: vec![other_registered_node],
564 }])
565 .with_hopr_network_chain_info("rotsee")
566 .build_dynamic_client(MODULE_ADDR.into());
567
568 let mut connector = create_connector(blokli_client)?;
569 connector.connect().await?;
570
571 connector.register_safe(&safe_addr).await?.await?;
572
573 insta::assert_yaml_snapshot!(*connector.client.snapshot());
574
575 Ok(())
576 }
577
578 #[tokio::test]
579 async fn connector_should_not_register_safe_that_does_not_exist() -> anyhow::Result<()> {
580 let safe_addr: Address = [2u8; Address::SIZE].into();
581
582 let blokli_client = BlokliTestStateBuilder::default()
583 .with_balances([(
584 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
585 XDaiBalance::new_base(10),
586 )])
587 .with_hopr_network_chain_info("rotsee")
588 .build_dynamic_client(MODULE_ADDR.into());
589
590 let mut connector = create_connector(blokli_client)?;
591 connector.connect().await?;
592
593 assert!(connector.register_safe(&safe_addr).await.is_err());
594
595 insta::assert_yaml_snapshot!(*connector.client.snapshot());
596
597 Ok(())
598 }
599
600 #[tokio::test]
601 async fn connector_should_not_register_any_safe_when_node_already_registered() -> anyhow::Result<()> {
602 let blokli_client = BlokliTestStateBuilder::default()
603 .with_balances([(
604 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
605 XDaiBalance::new_base(10),
606 )])
607 .with_deployed_safes([
608 DeployedSafe {
609 address: [2u8; Address::SIZE].into(),
610 owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
611 module: MODULE_ADDR.into(),
612 registered_nodes: vec![ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address()],
613 },
614 DeployedSafe {
615 address: [1u8; Address::SIZE].into(),
616 owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
617 module: MODULE_ADDR.into(),
618 registered_nodes: vec![],
619 },
620 ])
621 .with_hopr_network_chain_info("rotsee")
622 .build_dynamic_client(MODULE_ADDR.into());
623
624 let mut connector = create_connector(blokli_client)?;
625 connector.connect().await?;
626
627 assert!(
628 matches!(connector.register_safe(&[1u8; Address::SIZE].into()).await, Err(SafeRegistrationError::AlreadyRegistered(a)) if a == [2u8; Address::SIZE].into())
629 );
630
631 insta::assert_yaml_snapshot!(*connector.client.snapshot());
632
633 Ok(())
634 }
635}