Skip to main content

hopr_chain_connector/connector/
accounts.rs

1use std::str::FromStr;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::BoxFuture, pin_mut, stream::BoxStream};
5use futures_time::future::FutureExt as TimeFutureExt;
6use hopr_api::{
7    chain::{AccountSelector, AnnouncementError, ChainReceipt, Multiaddr, SafeRegistrationError},
8    types::{
9        chain::prelude::*,
10        crypto::prelude::*,
11        internal::{
12            account::AccountEntry,
13            prelude::{AnnouncementData, KeyBinding},
14        },
15        primitive::prelude::*,
16    },
17};
18
19use crate::{
20    backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError, utils::model_to_account_entry,
21};
22
23impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
24where
25    B: Backend + Send + Sync + 'static,
26{
27    pub(crate) fn build_account_stream(
28        &self,
29        selector: AccountSelector,
30    ) -> Result<impl futures::Stream<Item = AccountEntry> + Send + 'static, ConnectorError> {
31        let mut accounts = self.graph.read().nodes().collect::<Vec<_>>();
32
33        // Ensure the returned accounts are always perfectly ordered by their id.
34        accounts.sort_unstable();
35
36        let backend = self.backend.clone();
37        Ok(futures::stream::iter(accounts).filter_map(move |account_id| {
38            let backend = backend.clone();
39            // This avoids the cache on purpose so it does not get spammed
40            async move {
41                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_account_by_id(&account_id)).await
42                {
43                    Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
44                    Ok(Err(error)) => {
45                        tracing::error!(%error, %account_id, "backend error when looking up account");
46                        None
47                    }
48                    Err(error) => {
49                        tracing::error!(%error, %account_id, "join error when looking up account");
50                        None
51                    }
52                }
53            }
54        }))
55    }
56}
57
58#[async_trait::async_trait]
59impl<B, C, P, R> hopr_api::chain::ChainReadAccountOperations for HoprBlockchainConnector<C, B, P, R>
60where
61    B: Backend + Send + Sync + 'static,
62    C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync + 'static,
63    P: Send + Sync + 'static,
64    R: Send + Sync,
65{
66    type Error = ConnectorError;
67
68    async fn stream_accounts<'a>(
69        &'a self,
70        selector: AccountSelector,
71    ) -> Result<BoxStream<'a, AccountEntry>, Self::Error> {
72        self.check_connection_state()?;
73
74        Ok(self.build_account_stream(selector)?.boxed())
75    }
76
77    async fn count_accounts(&self, selector: AccountSelector) -> Result<usize, Self::Error> {
78        self.check_connection_state()?;
79
80        Ok(self.stream_accounts(selector).await?.count().await)
81    }
82
83    async fn await_key_binding(
84        &self,
85        offchain_key: &OffchainPublicKey,
86        timeout: std::time::Duration,
87    ) -> Result<AccountEntry, Self::Error> {
88        self.check_connection_state()?;
89
90        let selector = blokli_client::api::v1::AccountSelector::PacketKey((*offchain_key).into());
91        if let Some(node) = self.client.query_accounts(selector.clone()).await?.first().cloned() {
92            return model_to_account_entry(node);
93        }
94
95        let stream = self.client.subscribe_accounts(selector)?.map_err(ConnectorError::from);
96        pin_mut!(stream);
97        if let Some(node) = stream
98            .try_next()
99            .timeout(futures_time::time::Duration::from(
100                timeout.max(std::time::Duration::from_secs(1)),
101            ))
102            .await
103            .map_err(|_| ConnectorError::other(anyhow::anyhow!("timeout while waiting for key binding")))??
104        {
105            model_to_account_entry(node)
106        } else {
107            Err(ConnectorError::AccountDoesNotExist(format!(
108                "with packet key {offchain_key}"
109            )))
110        }
111    }
112}
113
114#[async_trait::async_trait]
115impl<B, C, P> hopr_api::chain::ChainWriteAccountOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
116where
117    B: Send + Sync,
118    C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
119    P: PayloadGenerator + Send + Sync + 'static,
120    P::TxRequest: Send + Sync + 'static,
121{
122    type Error = ConnectorError;
123
124    async fn announce(
125        &self,
126        multiaddrs: &[Multiaddr],
127        key: &OffchainKeypair,
128    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, AnnouncementError<Self::Error>> {
129        self.check_connection_state().map_err(AnnouncementError::processing)?;
130
131        let new_announced_addrs = ahash::HashSet::from_iter(multiaddrs.iter().map(|a| a.to_string()));
132
133        let existing_account = self
134            .client
135            .query_accounts(blokli_client::api::v1::AccountSelector::Address(
136                self.chain_key.public().to_address().into(),
137            ))
138            .await
139            .map_err(AnnouncementError::processing)?
140            .into_iter()
141            .find(|account| OffchainPublicKey::from_str(&account.packet_key).is_ok_and(|k| &k == key.public()));
142
143        if let Some(account) = &existing_account {
144            let old_announced_addrs = ahash::HashSet::from_iter(account.multi_addresses.iter().cloned());
145            if old_announced_addrs == new_announced_addrs || old_announced_addrs.is_superset(&new_announced_addrs) {
146                return Err(AnnouncementError::AlreadyAnnounced);
147            }
148        }
149
150        // No key-binding fee must be set when the account already exists (with multi-addresses or not)
151        let key_binding = KeyBinding::new(self.chain_key.public().to_address(), key);
152        let key_binding_fee = if existing_account.is_none() {
153            self.query_cached_chain_info()
154                .await
155                .map_err(AnnouncementError::processing)?
156                .key_binding_fee
157        } else {
158            HoprBalance::zero()
159        };
160
161        let tx_req = self
162            .payload_generator
163            .announce(
164                AnnouncementData::new(key_binding, multiaddrs.first().cloned())
165                    .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::OtherError(e.into())))?,
166                key_binding_fee,
167            )
168            .map_err(AnnouncementError::processing)?;
169
170        Ok(self
171            .send_tx(tx_req, None)
172            .map_err(AnnouncementError::processing)
173            .await?
174            .boxed())
175    }
176
177    async fn withdraw<Cy: Currency + Send>(
178        &self,
179        balance: Balance<Cy>,
180        recipient: &Address,
181    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, Self::Error> {
182        self.check_connection_state()?;
183
184        let tx_req = self.payload_generator.transfer(*recipient, balance)?;
185
186        Ok(self.send_tx(tx_req, None).await?.boxed())
187    }
188
189    async fn register_safe(
190        &self,
191        safe_address: &Address,
192    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, SafeRegistrationError<Self::Error>> {
193        self.check_connection_state()
194            .map_err(SafeRegistrationError::processing)?;
195
196        // Check if the node isn't already registered with some Safe
197        let my_node_addr = self.chain_key.public().to_address();
198        if let Some(safe_with_node) = self
199            .client
200            .query_safe(blokli_client::api::v1::SafeSelector::RegisteredNode(
201                my_node_addr.into(),
202            ))
203            .await
204            .map_err(SafeRegistrationError::processing)?
205        {
206            // If already registered, return which Safe it is registered with
207            let registered_safe_addr =
208                Address::from_hex(&safe_with_node.address).map_err(SafeRegistrationError::processing)?;
209            return Err(SafeRegistrationError::AlreadyRegistered(registered_safe_addr));
210        }
211
212        // Check if Safe with this address even exists (has been deployed)
213        if self
214            .client
215            .query_safe(blokli_client::api::v1::SafeSelector::SafeAddress(
216                (*safe_address).into(),
217            ))
218            .await
219            .map_err(SafeRegistrationError::processing)?
220            .is_none()
221        {
222            return Err(SafeRegistrationError::ProcessingError(
223                ConnectorError::SafeDoesNotExist(*safe_address),
224            ));
225        }
226
227        tracing::debug!(%safe_address, %my_node_addr, "safe exists, proceeding with registration");
228
229        let tx_req = self
230            .payload_generator
231            .register_safe_by_node(*safe_address)
232            .map_err(SafeRegistrationError::processing)?;
233
234        Ok(self
235            .send_tx(tx_req, None)
236            .map_err(SafeRegistrationError::processing)
237            .await?
238            .boxed())
239    }
240}
241
242#[cfg(test)]
243mod tests {
244    use hex_literal::hex;
245    use hopr_api::{
246        chain::{ChainReadAccountOperations, ChainWriteAccountOperations, DeployedSafe},
247        types::internal::account::AccountType,
248    };
249
250    use super::*;
251    use crate::{
252        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
253        testing::BlokliTestStateBuilder,
254    };
255
256    #[tokio::test]
257    async fn connector_should_stream_and_count_accounts() -> anyhow::Result<()> {
258        let account = AccountEntry {
259            public_key: *OffchainKeypair::random().public(),
260            chain_addr: [1u8; Address::SIZE].into(),
261            entry_type: AccountType::NotAnnounced,
262            safe_address: Some([2u8; Address::SIZE].into()),
263            key_id: 1.into(),
264        };
265
266        let blokli_client = BlokliTestStateBuilder::default()
267            .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
268            .build_static_client();
269
270        let mut connector = create_connector(blokli_client)?;
271        connector.connect().await?;
272
273        let accounts = connector
274            .stream_accounts(AccountSelector::default())
275            .await?
276            .collect::<Vec<_>>()
277            .await;
278
279        let count = connector.count_accounts(AccountSelector::default()).await?;
280
281        assert_eq!(accounts.len(), 1);
282        assert_eq!(count, 1);
283        assert_eq!(&accounts[0], &account);
284
285        Ok(())
286    }
287
288    #[tokio::test]
289    async fn connector_should_stream_and_count_accounts_with_selector() -> anyhow::Result<()> {
290        let account_1 = AccountEntry {
291            public_key: *OffchainKeypair::random().public(),
292            chain_addr: [1u8; Address::SIZE].into(),
293            entry_type: AccountType::NotAnnounced,
294            safe_address: Some([2u8; Address::SIZE].into()),
295            key_id: 1.into(),
296        };
297
298        let account_2 = AccountEntry {
299            public_key: *OffchainKeypair::random().public(),
300            chain_addr: [2u8; Address::SIZE].into(),
301            entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
302            safe_address: Some([3u8; Address::SIZE].into()),
303            key_id: 2.into(),
304        };
305
306        let blokli_client = BlokliTestStateBuilder::default()
307            .with_accounts([
308                (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
309                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
310            ])
311            .build_static_client();
312
313        let mut connector = create_connector(blokli_client)?;
314        connector.connect().await?;
315
316        let selector = AccountSelector::default().with_chain_key(account_1.chain_addr);
317        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
318        let count = connector.count_accounts(selector).await?;
319
320        assert_eq!(accounts.len(), count);
321        assert_eq!(accounts, vec![account_1.clone()]);
322
323        let selector = AccountSelector::default().with_offchain_key(account_1.public_key);
324        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
325        let count = connector.count_accounts(selector).await?;
326
327        assert_eq!(accounts.len(), count);
328        assert_eq!(accounts, vec![account_1.clone()]);
329
330        let selector = AccountSelector::default().with_public_only(true);
331        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
332        let count = connector.count_accounts(selector).await?;
333
334        assert_eq!(accounts.len(), count);
335        assert_eq!(accounts, vec![account_2.clone()]);
336
337        let selector = AccountSelector::default()
338            .with_chain_key(account_1.chain_addr)
339            .with_public_only(true);
340        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
341        let count = connector.count_accounts(selector).await?;
342
343        assert_eq!(count, 0);
344        assert!(accounts.is_empty());
345
346        Ok(())
347    }
348
349    #[test_log::test(tokio::test)]
350    async fn connector_should_announce_new_account_with_multiaddresses() -> anyhow::Result<()> {
351        let blokli_client = BlokliTestStateBuilder::default()
352            .with_balances([(
353                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
354                XDaiBalance::new_base(1),
355            )])
356            .with_hopr_network_chain_info("rotsee")
357            .build_dynamic_client(MODULE_ADDR.into());
358
359        let mut connector = create_connector(blokli_client)?;
360        connector.connect().await?;
361
362        let offchain_key = OffchainKeypair::from_secret(&hex!(
363            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
364        ))?;
365        let multiaddress = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?;
366
367        connector.announce(&[multiaddress], &offchain_key).await?.await?;
368
369        insta::assert_yaml_snapshot!(*connector.client.snapshot());
370
371        let accounts = connector
372            .stream_accounts(AccountSelector::default().with_public_only(true))
373            .await?
374            .collect::<Vec<_>>()
375            .await;
376
377        assert_eq!(accounts.len(), 1);
378        assert_eq!(
379            accounts[0].get_multiaddrs(),
380            &[Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?]
381        );
382
383        Ok(())
384    }
385
386    #[test_log::test(tokio::test)]
387    async fn connector_should_announce_new_account_without_multiaddresses() -> anyhow::Result<()> {
388        let blokli_client = BlokliTestStateBuilder::default()
389            .with_hopr_network_chain_info("rotsee")
390            .with_balances([(
391                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
392                XDaiBalance::new_base(1),
393            )])
394            .build_dynamic_client(MODULE_ADDR.into());
395
396        let mut connector = create_connector(blokli_client)?;
397        connector.connect().await?;
398
399        let offchain_key = OffchainKeypair::from_secret(&hex!(
400            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
401        ))?;
402
403        connector.announce(&[], &offchain_key).await?.await?;
404
405        insta::assert_yaml_snapshot!(*connector.client.snapshot());
406
407        let accounts = connector
408            .stream_accounts(AccountSelector::default())
409            .await?
410            .collect::<Vec<_>>()
411            .await;
412
413        assert_eq!(accounts.len(), 1);
414        assert!(accounts[0].get_multiaddrs().is_empty());
415
416        Ok(())
417    }
418
419    #[test_log::test(tokio::test)]
420    async fn connector_should_not_reannounce_when_existing_account_has_same_multiaddresses() -> anyhow::Result<()> {
421        let offchain_key = OffchainKeypair::from_secret(&hex!(
422            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
423        ))?;
424        let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
425        let account = AccountEntry {
426            public_key: *offchain_key.public(),
427            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
428            entry_type: AccountType::Announced(vec![multiaddr.clone()]),
429            safe_address: Some([2u8; Address::SIZE].into()),
430            key_id: 1.into(),
431        };
432
433        let blokli_client = BlokliTestStateBuilder::default()
434            .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
435            .with_hopr_network_chain_info("rotsee")
436            .build_dynamic_client(MODULE_ADDR.into());
437
438        let mut connector = create_connector(blokli_client)?;
439        connector.connect().await?;
440
441        assert!(matches!(
442            connector.announce(&[], &offchain_key).await,
443            Err(AnnouncementError::AlreadyAnnounced)
444        ));
445
446        assert!(matches!(
447            connector.announce(&[multiaddr], &offchain_key).await,
448            Err(AnnouncementError::AlreadyAnnounced)
449        ));
450
451        insta::assert_yaml_snapshot!(*connector.client.snapshot());
452
453        Ok(())
454    }
455
456    #[tokio::test]
457    async fn connector_should_reannounce_when_existing_account_has_no_multiaddresses() -> anyhow::Result<()> {
458        let offchain_key = OffchainKeypair::from_secret(&hex!(
459            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
460        ))?;
461        let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
462        let account = AccountEntry {
463            public_key: *offchain_key.public(),
464            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
465            entry_type: AccountType::NotAnnounced,
466            safe_address: Some([2u8; Address::SIZE].into()),
467            key_id: 1.into(),
468        };
469
470        let blokli_client = BlokliTestStateBuilder::default()
471            .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
472            .with_hopr_network_chain_info("rotsee")
473            .build_dynamic_client(MODULE_ADDR.into());
474
475        let mut connector = create_connector(blokli_client)?;
476        connector.connect().await?;
477
478        assert!(matches!(
479            connector.announce(&[], &offchain_key).await,
480            Err(AnnouncementError::AlreadyAnnounced)
481        ));
482
483        connector
484            .announce(std::slice::from_ref(&multiaddr), &offchain_key)
485            .await?
486            .await?;
487
488        insta::assert_yaml_snapshot!(*connector.client.snapshot());
489
490        let accounts = connector
491            .stream_accounts(AccountSelector::default().with_public_only(true))
492            .await?
493            .collect::<Vec<_>>()
494            .await;
495
496        assert_eq!(accounts.len(), 1);
497        assert_eq!(accounts[0].get_multiaddrs(), &[multiaddr]);
498
499        Ok(())
500    }
501
502    #[tokio::test]
503    async fn connector_should_withdraw() -> anyhow::Result<()> {
504        let blokli_client = BlokliTestStateBuilder::default()
505            .with_balances([([1u8; Address::SIZE].into(), HoprBalance::zero())])
506            .with_balances([([1u8; Address::SIZE].into(), XDaiBalance::zero())])
507            .with_balances([(
508                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
509                XDaiBalance::new_base(10),
510            )])
511            .with_balances([(
512                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
513                HoprBalance::new_base(1000),
514            )])
515            .with_hopr_network_chain_info("rotsee")
516            .build_dynamic_client(MODULE_ADDR.into());
517
518        let mut connector = create_connector(blokli_client)?;
519        connector.connect().await?;
520
521        connector
522            .withdraw(HoprBalance::new_base(10), &[1u8; Address::SIZE].into())
523            .await?
524            .await?;
525        connector
526            .withdraw(XDaiBalance::new_base(1), &[1u8; Address::SIZE].into())
527            .await?
528            .await?;
529
530        insta::assert_yaml_snapshot!(*connector.client.snapshot());
531
532        Ok(())
533    }
534
535    #[tokio::test]
536    async fn connector_should_register_safe() -> anyhow::Result<()> {
537        let blokli_client = BlokliTestStateBuilder::default()
538            .with_balances([(
539                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
540                XDaiBalance::new_base(10),
541            )])
542            .with_deployed_safes([DeployedSafe {
543                address: [1u8; Address::SIZE].into(),
544                owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
545                module: MODULE_ADDR.into(),
546                registered_nodes: vec![],
547            }])
548            .with_hopr_network_chain_info("rotsee")
549            .build_dynamic_client(MODULE_ADDR.into());
550
551        let mut connector = create_connector(blokli_client)?;
552        connector.connect().await?;
553
554        connector.register_safe(&[1u8; Address::SIZE].into()).await?.await?;
555
556        insta::assert_yaml_snapshot!(*connector.client.snapshot());
557
558        Ok(())
559    }
560
561    #[tokio::test]
562    async fn connector_should_register_safe_that_has_nodes_registered_already() -> anyhow::Result<()> {
563        let safe_addr: Address = [2u8; Address::SIZE].into();
564        let other_registered_node = ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address();
565
566        let blokli_client = BlokliTestStateBuilder::default()
567            .with_balances([(
568                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
569                XDaiBalance::new_base(10),
570            )])
571            .with_deployed_safes([DeployedSafe {
572                address: safe_addr,
573                owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
574                module: MODULE_ADDR.into(),
575                registered_nodes: vec![other_registered_node],
576            }])
577            .with_hopr_network_chain_info("rotsee")
578            .build_dynamic_client(MODULE_ADDR.into());
579
580        let mut connector = create_connector(blokli_client)?;
581        connector.connect().await?;
582
583        connector.register_safe(&safe_addr).await?.await?;
584
585        insta::assert_yaml_snapshot!(*connector.client.snapshot());
586
587        Ok(())
588    }
589
590    #[tokio::test]
591    async fn connector_should_not_register_safe_that_does_not_exist() -> anyhow::Result<()> {
592        let safe_addr: Address = [2u8; Address::SIZE].into();
593
594        let blokli_client = BlokliTestStateBuilder::default()
595            .with_balances([(
596                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
597                XDaiBalance::new_base(10),
598            )])
599            .with_hopr_network_chain_info("rotsee")
600            .build_dynamic_client(MODULE_ADDR.into());
601
602        let mut connector = create_connector(blokli_client)?;
603        connector.connect().await?;
604
605        assert!(connector.register_safe(&safe_addr).await.is_err());
606
607        insta::assert_yaml_snapshot!(*connector.client.snapshot());
608
609        Ok(())
610    }
611
612    #[tokio::test]
613    async fn connector_should_not_register_any_safe_when_node_already_registered() -> anyhow::Result<()> {
614        let blokli_client = BlokliTestStateBuilder::default()
615            .with_balances([(
616                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
617                XDaiBalance::new_base(10),
618            )])
619            .with_deployed_safes([
620                DeployedSafe {
621                    address: [2u8; Address::SIZE].into(),
622                    owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
623                    module: MODULE_ADDR.into(),
624                    registered_nodes: vec![ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address()],
625                },
626                DeployedSafe {
627                    address: [1u8; Address::SIZE].into(),
628                    owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
629                    module: MODULE_ADDR.into(),
630                    registered_nodes: vec![],
631                },
632            ])
633            .with_hopr_network_chain_info("rotsee")
634            .build_dynamic_client(MODULE_ADDR.into());
635
636        let mut connector = create_connector(blokli_client)?;
637        connector.connect().await?;
638
639        assert!(
640            matches!(connector.register_safe(&[1u8; Address::SIZE].into()).await, Err(SafeRegistrationError::AlreadyRegistered(a)) if a == [2u8; Address::SIZE].into())
641        );
642
643        insta::assert_yaml_snapshot!(*connector.client.snapshot());
644
645        Ok(())
646    }
647}