hopr_chain_connector/connector/
accounts.rs

1use std::str::FromStr;
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::BoxFuture, pin_mut, stream::BoxStream};
5use futures_time::future::FutureExt as TimeFutureExt;
6use hopr_api::chain::{AccountSelector, AnnouncementError, ChainReceipt, Multiaddr, SafeRegistrationError};
7use hopr_chain_types::prelude::*;
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::{
10    account::AccountEntry,
11    prelude::{AnnouncementData, KeyBinding},
12};
13use hopr_primitive_types::prelude::*;
14
15use crate::{
16    backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError, utils::model_to_account_entry,
17};
18
19impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
20where
21    B: Backend + Send + Sync + 'static,
22{
23    pub(crate) fn build_account_stream(
24        &self,
25        selector: AccountSelector,
26    ) -> Result<impl futures::Stream<Item = AccountEntry> + Send + 'static, ConnectorError> {
27        let mut accounts = self.graph.read().nodes().collect::<Vec<_>>();
28
29        // Ensure the returned accounts are always perfectly ordered by their id.
30        accounts.sort_unstable();
31
32        let backend = self.backend.clone();
33        Ok(futures::stream::iter(accounts).filter_map(move |account_id| {
34            let backend = backend.clone();
35            // This avoids the cache on purpose so it does not get spammed
36            async move {
37                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_account_by_id(&account_id)).await
38                {
39                    Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
40                    Ok(Err(error)) => {
41                        tracing::error!(%error, %account_id, "backend error when looking up account");
42                        None
43                    }
44                    Err(error) => {
45                        tracing::error!(%error, %account_id, "join error when looking up account");
46                        None
47                    }
48                }
49            }
50        }))
51    }
52}
53
54#[async_trait::async_trait]
55impl<B, C, P, R> hopr_api::chain::ChainReadAccountOperations for HoprBlockchainConnector<C, B, P, R>
56where
57    B: Backend + Send + Sync + 'static,
58    C: BlokliQueryClient + BlokliSubscriptionClient + Send + Sync + 'static,
59    P: Send + Sync + 'static,
60    R: Send + Sync,
61{
62    type Error = ConnectorError;
63
64    async fn stream_accounts<'a>(
65        &'a self,
66        selector: AccountSelector,
67    ) -> Result<BoxStream<'a, AccountEntry>, Self::Error> {
68        self.check_connection_state()?;
69
70        Ok(self.build_account_stream(selector)?.boxed())
71    }
72
73    async fn count_accounts(&self, selector: AccountSelector) -> Result<usize, Self::Error> {
74        self.check_connection_state()?;
75
76        Ok(self.stream_accounts(selector).await?.count().await)
77    }
78
79    async fn await_key_binding(
80        &self,
81        offchain_key: &OffchainPublicKey,
82        timeout: std::time::Duration,
83    ) -> Result<AccountEntry, Self::Error> {
84        self.check_connection_state()?;
85
86        let selector = blokli_client::api::v1::AccountSelector::PacketKey((*offchain_key).into());
87        if let Some(node) = self.client.query_accounts(selector.clone()).await?.first().cloned() {
88            return model_to_account_entry(node);
89        }
90
91        let stream = self.client.subscribe_accounts(selector)?.map_err(ConnectorError::from);
92        pin_mut!(stream);
93        if let Some(node) = stream
94            .try_next()
95            .timeout(futures_time::time::Duration::from(
96                timeout.max(std::time::Duration::from_secs(1)),
97            ))
98            .await??
99        {
100            model_to_account_entry(node)
101        } else {
102            Err(ConnectorError::AccountDoesNotExist(format!(
103                "with packet key {offchain_key}"
104            )))
105        }
106    }
107}
108
109#[async_trait::async_trait]
110impl<B, C, P> hopr_api::chain::ChainWriteAccountOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
111where
112    B: Send + Sync,
113    C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
114    P: PayloadGenerator + Send + Sync + 'static,
115    P::TxRequest: Send + Sync + 'static,
116{
117    type Error = ConnectorError;
118
119    async fn announce(
120        &self,
121        multiaddrs: &[Multiaddr],
122        key: &OffchainKeypair,
123    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, AnnouncementError<Self::Error>> {
124        self.check_connection_state().map_err(AnnouncementError::processing)?;
125
126        let new_announced_addrs = ahash::HashSet::from_iter(multiaddrs.iter().map(|a| a.to_string()));
127
128        let existing_account = self
129            .client
130            .query_accounts(blokli_client::api::v1::AccountSelector::Address(
131                self.chain_key.public().to_address().into(),
132            ))
133            .await
134            .map_err(AnnouncementError::processing)?
135            .into_iter()
136            .find(|account| OffchainPublicKey::from_str(&account.packet_key).is_ok_and(|k| &k == key.public()));
137
138        if let Some(account) = &existing_account {
139            let old_announced_addrs = ahash::HashSet::from_iter(account.multi_addresses.iter().cloned());
140            if old_announced_addrs == new_announced_addrs || old_announced_addrs.is_superset(&new_announced_addrs) {
141                return Err(AnnouncementError::AlreadyAnnounced);
142            }
143        }
144
145        // No key-binding fee must be set when the account already exists (with multi-addresses or not)
146        let key_binding = KeyBinding::new(self.chain_key.public().to_address(), key);
147        let key_binding_fee = if existing_account.is_none() {
148            self.query_cached_chain_info()
149                .await
150                .map_err(AnnouncementError::processing)?
151                .key_binding_fee
152        } else {
153            HoprBalance::zero()
154        };
155
156        let tx_req = self
157            .payload_generator
158            .announce(
159                AnnouncementData::new(key_binding, multiaddrs.first().cloned())
160                    .map_err(|e| AnnouncementError::ProcessingError(ConnectorError::OtherError(e.into())))?,
161                key_binding_fee,
162            )
163            .map_err(AnnouncementError::processing)?;
164
165        Ok(self
166            .send_tx(tx_req)
167            .map_err(AnnouncementError::processing)
168            .await?
169            .boxed())
170    }
171
172    async fn withdraw<Cy: Currency + Send>(
173        &self,
174        balance: Balance<Cy>,
175        recipient: &Address,
176    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, Self::Error> {
177        self.check_connection_state()?;
178
179        let tx_req = self.payload_generator.transfer(*recipient, balance)?;
180
181        Ok(self.send_tx(tx_req).await?.boxed())
182    }
183
184    async fn register_safe(
185        &self,
186        safe_address: &Address,
187    ) -> Result<BoxFuture<'_, Result<ChainReceipt, Self::Error>>, SafeRegistrationError<Self::Error>> {
188        self.check_connection_state()
189            .map_err(SafeRegistrationError::processing)?;
190
191        // Check if the node isn't already registered with some Safe
192        let my_node_addr = self.chain_key.public().to_address();
193        if let Some(safe_with_node) = self
194            .client
195            .query_safe(blokli_client::api::v1::SafeSelector::RegisteredNode(
196                my_node_addr.into(),
197            ))
198            .await
199            .map_err(SafeRegistrationError::processing)?
200        {
201            // If already registered, return which Safe it is registered with
202            let registered_safe_addr =
203                Address::from_hex(&safe_with_node.address).map_err(SafeRegistrationError::processing)?;
204            return Err(SafeRegistrationError::AlreadyRegistered(registered_safe_addr));
205        }
206
207        // Check if Safe with this address even exists (has been deployed)
208        if self
209            .client
210            .query_safe(blokli_client::api::v1::SafeSelector::SafeAddress(
211                (*safe_address).into(),
212            ))
213            .await
214            .map_err(SafeRegistrationError::processing)?
215            .is_none()
216        {
217            return Err(SafeRegistrationError::ProcessingError(
218                ConnectorError::SafeDoesNotExist(*safe_address),
219            ));
220        }
221
222        let tx_req = self
223            .payload_generator
224            .register_safe_by_node(*safe_address)
225            .map_err(SafeRegistrationError::processing)?;
226
227        Ok(self
228            .send_tx(tx_req)
229            .map_err(SafeRegistrationError::processing)
230            .await?
231            .boxed())
232    }
233}
234
235#[cfg(test)]
236mod tests {
237    use hex_literal::hex;
238    use hopr_api::chain::{ChainReadAccountOperations, ChainWriteAccountOperations, DeployedSafe};
239    use hopr_internal_types::account::AccountType;
240
241    use super::*;
242    use crate::{
243        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
244        testing::BlokliTestStateBuilder,
245    };
246
247    #[tokio::test]
248    async fn connector_should_stream_and_count_accounts() -> anyhow::Result<()> {
249        let account = AccountEntry {
250            public_key: *OffchainKeypair::random().public(),
251            chain_addr: [1u8; Address::SIZE].into(),
252            entry_type: AccountType::NotAnnounced,
253            safe_address: Some([2u8; Address::SIZE].into()),
254            key_id: 1.into(),
255        };
256
257        let blokli_client = BlokliTestStateBuilder::default()
258            .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
259            .build_static_client();
260
261        let mut connector = create_connector(blokli_client)?;
262        connector.connect().await?;
263
264        let accounts = connector
265            .stream_accounts(AccountSelector::default())
266            .await?
267            .collect::<Vec<_>>()
268            .await;
269
270        let count = connector.count_accounts(AccountSelector::default()).await?;
271
272        assert_eq!(accounts.len(), 1);
273        assert_eq!(count, 1);
274        assert_eq!(&accounts[0], &account);
275
276        Ok(())
277    }
278
279    #[tokio::test]
280    async fn connector_should_stream_and_count_accounts_with_selector() -> anyhow::Result<()> {
281        let account_1 = AccountEntry {
282            public_key: *OffchainKeypair::random().public(),
283            chain_addr: [1u8; Address::SIZE].into(),
284            entry_type: AccountType::NotAnnounced,
285            safe_address: Some([2u8; Address::SIZE].into()),
286            key_id: 1.into(),
287        };
288
289        let account_2 = AccountEntry {
290            public_key: *OffchainKeypair::random().public(),
291            chain_addr: [2u8; Address::SIZE].into(),
292            entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
293            safe_address: Some([3u8; Address::SIZE].into()),
294            key_id: 2.into(),
295        };
296
297        let blokli_client = BlokliTestStateBuilder::default()
298            .with_accounts([
299                (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
300                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
301            ])
302            .build_static_client();
303
304        let mut connector = create_connector(blokli_client)?;
305        connector.connect().await?;
306
307        let selector = AccountSelector::default().with_chain_key(account_1.chain_addr);
308        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
309        let count = connector.count_accounts(selector).await?;
310
311        assert_eq!(accounts.len(), count);
312        assert_eq!(accounts, vec![account_1.clone()]);
313
314        let selector = AccountSelector::default().with_offchain_key(account_1.public_key);
315        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
316        let count = connector.count_accounts(selector).await?;
317
318        assert_eq!(accounts.len(), count);
319        assert_eq!(accounts, vec![account_1.clone()]);
320
321        let selector = AccountSelector::default().with_public_only(true);
322        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
323        let count = connector.count_accounts(selector).await?;
324
325        assert_eq!(accounts.len(), count);
326        assert_eq!(accounts, vec![account_2.clone()]);
327
328        let selector = AccountSelector::default()
329            .with_chain_key(account_1.chain_addr)
330            .with_public_only(true);
331        let accounts = connector.stream_accounts(selector).await?.collect::<Vec<_>>().await;
332        let count = connector.count_accounts(selector).await?;
333
334        assert_eq!(count, 0);
335        assert!(accounts.is_empty());
336
337        Ok(())
338    }
339
340    #[test_log::test(tokio::test)]
341    async fn connector_should_announce_new_account_with_multiaddresses() -> anyhow::Result<()> {
342        let blokli_client = BlokliTestStateBuilder::default()
343            .with_balances([(
344                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
345                XDaiBalance::new_base(1),
346            )])
347            .with_hopr_network_chain_info("rotsee")
348            .build_dynamic_client(MODULE_ADDR.into());
349
350        let mut connector = create_connector(blokli_client)?;
351        connector.connect().await?;
352
353        let offchain_key = OffchainKeypair::from_secret(&hex!(
354            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
355        ))?;
356        let multiaddress = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?;
357
358        connector.announce(&[multiaddress], &offchain_key).await?.await?;
359
360        insta::assert_yaml_snapshot!(*connector.client.snapshot());
361
362        let accounts = connector
363            .stream_accounts(AccountSelector::default().with_public_only(true))
364            .await?
365            .collect::<Vec<_>>()
366            .await;
367
368        assert_eq!(accounts.len(), 1);
369        assert_eq!(
370            accounts[0].get_multiaddrs(),
371            &[Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234")?]
372        );
373
374        Ok(())
375    }
376
377    #[test_log::test(tokio::test)]
378    async fn connector_should_announce_new_account_without_multiaddresses() -> anyhow::Result<()> {
379        let blokli_client = BlokliTestStateBuilder::default()
380            .with_hopr_network_chain_info("rotsee")
381            .with_balances([(
382                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
383                XDaiBalance::new_base(1),
384            )])
385            .build_dynamic_client(MODULE_ADDR.into());
386
387        let mut connector = create_connector(blokli_client)?;
388        connector.connect().await?;
389
390        let offchain_key = OffchainKeypair::from_secret(&hex!(
391            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
392        ))?;
393
394        connector.announce(&[], &offchain_key).await?.await?;
395
396        insta::assert_yaml_snapshot!(*connector.client.snapshot());
397
398        let accounts = connector
399            .stream_accounts(AccountSelector::default())
400            .await?
401            .collect::<Vec<_>>()
402            .await;
403
404        assert_eq!(accounts.len(), 1);
405        assert!(accounts[0].get_multiaddrs().is_empty());
406
407        Ok(())
408    }
409
410    #[test_log::test(tokio::test)]
411    async fn connector_should_not_reannounce_when_existing_account_has_same_multiaddresses() -> anyhow::Result<()> {
412        let offchain_key = OffchainKeypair::from_secret(&hex!(
413            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
414        ))?;
415        let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
416        let account = AccountEntry {
417            public_key: *offchain_key.public(),
418            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
419            entry_type: AccountType::Announced(vec![multiaddr.clone()]),
420            safe_address: Some([2u8; Address::SIZE].into()),
421            key_id: 1.into(),
422        };
423
424        let blokli_client = BlokliTestStateBuilder::default()
425            .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
426            .with_hopr_network_chain_info("rotsee")
427            .build_dynamic_client(MODULE_ADDR.into());
428
429        let mut connector = create_connector(blokli_client)?;
430        connector.connect().await?;
431
432        assert!(matches!(
433            connector.announce(&[], &offchain_key).await,
434            Err(AnnouncementError::AlreadyAnnounced)
435        ));
436
437        assert!(matches!(
438            connector.announce(&[multiaddr], &offchain_key).await,
439            Err(AnnouncementError::AlreadyAnnounced)
440        ));
441
442        insta::assert_yaml_snapshot!(*connector.client.snapshot());
443
444        Ok(())
445    }
446
447    #[tokio::test]
448    async fn connector_should_reannounce_when_existing_account_has_no_multiaddresses() -> anyhow::Result<()> {
449        let offchain_key = OffchainKeypair::from_secret(&hex!(
450            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
451        ))?;
452        let multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
453        let account = AccountEntry {
454            public_key: *offchain_key.public(),
455            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
456            entry_type: AccountType::NotAnnounced,
457            safe_address: Some([2u8; Address::SIZE].into()),
458            key_id: 1.into(),
459        };
460
461        let blokli_client = BlokliTestStateBuilder::default()
462            .with_accounts([(account.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
463            .with_hopr_network_chain_info("rotsee")
464            .build_dynamic_client(MODULE_ADDR.into());
465
466        let mut connector = create_connector(blokli_client)?;
467        connector.connect().await?;
468
469        assert!(matches!(
470            connector.announce(&[], &offchain_key).await,
471            Err(AnnouncementError::AlreadyAnnounced)
472        ));
473
474        connector.announce(&[multiaddr.clone()], &offchain_key).await?.await?;
475
476        insta::assert_yaml_snapshot!(*connector.client.snapshot());
477
478        let accounts = connector
479            .stream_accounts(AccountSelector::default().with_public_only(true))
480            .await?
481            .collect::<Vec<_>>()
482            .await;
483
484        assert_eq!(accounts.len(), 1);
485        assert_eq!(accounts[0].get_multiaddrs(), &[multiaddr]);
486
487        Ok(())
488    }
489
490    #[tokio::test]
491    async fn connector_should_withdraw() -> anyhow::Result<()> {
492        let blokli_client = BlokliTestStateBuilder::default()
493            .with_balances([([1u8; Address::SIZE].into(), HoprBalance::zero())])
494            .with_balances([([1u8; Address::SIZE].into(), XDaiBalance::zero())])
495            .with_balances([(
496                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
497                XDaiBalance::new_base(10),
498            )])
499            .with_balances([(
500                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
501                HoprBalance::new_base(1000),
502            )])
503            .with_hopr_network_chain_info("rotsee")
504            .build_dynamic_client(MODULE_ADDR.into());
505
506        let mut connector = create_connector(blokli_client)?;
507        connector.connect().await?;
508
509        connector
510            .withdraw(HoprBalance::new_base(10), &[1u8; Address::SIZE].into())
511            .await?
512            .await?;
513        connector
514            .withdraw(XDaiBalance::new_base(1), &[1u8; Address::SIZE].into())
515            .await?
516            .await?;
517
518        insta::assert_yaml_snapshot!(*connector.client.snapshot());
519
520        Ok(())
521    }
522
523    #[tokio::test]
524    async fn connector_should_register_safe() -> anyhow::Result<()> {
525        let blokli_client = BlokliTestStateBuilder::default()
526            .with_balances([(
527                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
528                XDaiBalance::new_base(10),
529            )])
530            .with_deployed_safes([DeployedSafe {
531                address: [1u8; Address::SIZE].into(),
532                owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
533                module: MODULE_ADDR.into(),
534                registered_nodes: vec![],
535            }])
536            .with_hopr_network_chain_info("rotsee")
537            .build_dynamic_client(MODULE_ADDR.into());
538
539        let mut connector = create_connector(blokli_client)?;
540        connector.connect().await?;
541
542        connector.register_safe(&[1u8; Address::SIZE].into()).await?.await?;
543
544        insta::assert_yaml_snapshot!(*connector.client.snapshot());
545
546        Ok(())
547    }
548
549    #[tokio::test]
550    async fn connector_should_register_safe_that_has_nodes_registered_already() -> anyhow::Result<()> {
551        let safe_addr: Address = [2u8; Address::SIZE].into();
552        let other_registered_node = ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address();
553
554        let blokli_client = BlokliTestStateBuilder::default()
555            .with_balances([(
556                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
557                XDaiBalance::new_base(10),
558            )])
559            .with_deployed_safes([DeployedSafe {
560                address: safe_addr,
561                owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
562                module: MODULE_ADDR.into(),
563                registered_nodes: vec![other_registered_node],
564            }])
565            .with_hopr_network_chain_info("rotsee")
566            .build_dynamic_client(MODULE_ADDR.into());
567
568        let mut connector = create_connector(blokli_client)?;
569        connector.connect().await?;
570
571        connector.register_safe(&safe_addr).await?.await?;
572
573        insta::assert_yaml_snapshot!(*connector.client.snapshot());
574
575        Ok(())
576    }
577
578    #[tokio::test]
579    async fn connector_should_not_register_safe_that_does_not_exist() -> anyhow::Result<()> {
580        let safe_addr: Address = [2u8; Address::SIZE].into();
581
582        let blokli_client = BlokliTestStateBuilder::default()
583            .with_balances([(
584                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
585                XDaiBalance::new_base(10),
586            )])
587            .with_hopr_network_chain_info("rotsee")
588            .build_dynamic_client(MODULE_ADDR.into());
589
590        let mut connector = create_connector(blokli_client)?;
591        connector.connect().await?;
592
593        assert!(connector.register_safe(&safe_addr).await.is_err());
594
595        insta::assert_yaml_snapshot!(*connector.client.snapshot());
596
597        Ok(())
598    }
599
600    #[tokio::test]
601    async fn connector_should_not_register_any_safe_when_node_already_registered() -> anyhow::Result<()> {
602        let blokli_client = BlokliTestStateBuilder::default()
603            .with_balances([(
604                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
605                XDaiBalance::new_base(10),
606            )])
607            .with_deployed_safes([
608                DeployedSafe {
609                    address: [2u8; Address::SIZE].into(),
610                    owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
611                    module: MODULE_ADDR.into(),
612                    registered_nodes: vec![ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address()],
613                },
614                DeployedSafe {
615                    address: [1u8; Address::SIZE].into(),
616                    owner: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
617                    module: MODULE_ADDR.into(),
618                    registered_nodes: vec![],
619                },
620            ])
621            .with_hopr_network_chain_info("rotsee")
622            .build_dynamic_client(MODULE_ADDR.into());
623
624        let mut connector = create_connector(blokli_client)?;
625        connector.connect().await?;
626
627        assert!(
628            matches!(connector.register_safe(&[1u8; Address::SIZE].into()).await, Err(SafeRegistrationError::AlreadyRegistered(a)) if a == [2u8; Address::SIZE].into())
629        );
630
631        insta::assert_yaml_snapshot!(*connector.client.snapshot());
632
633        Ok(())
634    }
635}