hopr_chain_connector/connector/
mod.rs

1use std::{cmp::Ordering, str::FromStr, time::Duration};
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use futures_concurrency::stream::Merge;
6use futures_time::future::FutureExt as FuturesTimeExt;
7use hopr_api::chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent};
8use hopr_async_runtime::AbortHandle;
9use hopr_chain_types::prelude::*;
10use hopr_crypto_types::prelude::*;
11use hopr_internal_types::prelude::*;
12use hopr_primitive_types::prelude::*;
13use petgraph::prelude::DiGraphMap;
14
15use crate::{
16    backend::Backend,
17    connector::{keys::HoprKeyMapper, sequencer::TransactionSequencer, values::CHAIN_INFO_CACHE_KEY},
18    errors::ConnectorError,
19    utils::{
20        ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
21        process_channel_changes_into_events,
22    },
23};
24
25mod accounts;
26mod channels;
27mod events;
28mod keys;
29mod safe;
30mod sequencer;
31mod tickets;
32mod values;
33
34type EventsChannel = (
35    async_broadcast::Sender<ChainEvent>,
36    async_broadcast::InactiveReceiver<ChainEvent>,
37);
38
39const MIN_CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
40const MIN_TX_CONFIRM_TIMEOUT: Duration = Duration::from_secs(1);
41
42/// Configuration of the [`HoprBlockchainConnector`].
43#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
44pub struct BlockchainConnectorConfig {
45    /// Default time to wait until a transaction is confirmed.
46    ///
47    /// Default is 30 seconds, minimum is 1 second.
48    #[default(Duration::from_secs(30))]
49    pub tx_confirm_timeout: Duration,
50    /// Time to wait for [connection](HoprBlockchainConnector::connect) to complete.
51    ///
52    /// Default is 30 seconds, minimum is 100 milliseconds.
53    #[default(Duration::from_secs(30))]
54    pub connection_timeout: Duration,
55}
56
57/// A connector acting as middleware between the HOPR APIs (see the [`hopr_api`] crate) and the Blokli Client API (see
58/// the [`blokli_client`] crate).
59///
60/// The connector object cannot be cloned and shall be used inside an `Arc` if cloning is needed.
61pub struct HoprBlockchainConnector<C, B, P, R> {
62    payload_generator: P,
63    chain_key: ChainKeypair,
64    client: std::sync::Arc<C>,
65    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
66    backend: std::sync::Arc<B>,
67    connection_handle: Option<AbortHandle>,
68    sequencer: TransactionSequencer<C, R>,
69    events: EventsChannel,
70    cfg: BlockchainConnectorConfig,
71
72    // KeyId <-> OffchainPublicKey mapping
73    mapper: HoprKeyMapper<B>,
74    // Fast retrieval of chain keys by address
75    chain_to_packet: moka::future::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
76    // Fast retrieval of packet keys by chain key
77    packet_to_chain: moka::future::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
78    // Fast retrieval of channel entries by id
79    channel_by_id: moka::future::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
80    // Fast retrieval of channel entries by parties
81    channel_by_parties: moka::future::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
82    // Contains only the chain info structure
83    values: moka::future::Cache<u32, ParsedChainInfo>,
84}
85
86const EXPECTED_NUM_NODES: usize = 10_000;
87const EXPECTED_NUM_CHANNELS: usize = 100_000;
88
89const DEFAULT_CACHE_TIMEOUT: Duration = Duration::from_mins(10);
90
91impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
92where
93    B: Backend + Send + Sync + 'static,
94    C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
95    P: PayloadGenerator + Send + Sync + 'static,
96    P::TxRequest: Send + Sync + 'static,
97{
98    /// Creates a new instance.
99    pub fn new(
100        chain_key: ChainKeypair,
101        cfg: BlockchainConnectorConfig,
102        client: C,
103        backend: B,
104        payload_generator: P,
105    ) -> Self {
106        let backend = std::sync::Arc::new(backend);
107        let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
108        events_tx.set_overflow(true);
109        events_tx.set_await_active(false);
110
111        let client = std::sync::Arc::new(client);
112        Self {
113            payload_generator,
114            graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
115                EXPECTED_NUM_NODES,
116                EXPECTED_NUM_CHANNELS,
117                ahash::RandomState::default(),
118            ))),
119            backend: backend.clone(),
120            connection_handle: None,
121            sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
122            events: (events_tx, events_rx.deactivate()),
123            client,
124            chain_key,
125            cfg,
126            mapper: HoprKeyMapper {
127                id_to_key: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
128                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
129                    .build_with_hasher(ahash::RandomState::default()),
130                key_to_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
131                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
132                    .build_with_hasher(ahash::RandomState::default()),
133                backend,
134            },
135            chain_to_packet: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
136                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
137                .build_with_hasher(ahash::RandomState::default()),
138            packet_to_chain: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
139                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
140                .build_with_hasher(ahash::RandomState::default()),
141            channel_by_id: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
142                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
143                .build_with_hasher(ahash::RandomState::default()),
144            channel_by_parties: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
145                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
146                .build_with_hasher(ahash::RandomState::default()),
147            values: moka::future::CacheBuilder::new(1)
148                .time_to_live(DEFAULT_CACHE_TIMEOUT)
149                .build(),
150        }
151    }
152
153    async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
154        const TOLERANCE: f64 = 0.01;
155        let min_accounts = (self
156            .client
157            .count_accounts(blokli_client::api::AccountSelector::Any)
158            .await? as f64
159            * (1.0 - TOLERANCE))
160            .round() as u32;
161        let min_channels = (self
162            .client
163            .count_channels(blokli_client::api::ChannelSelector {
164                filter: None,
165                status: None,
166            })
167            .await? as f64
168            * (1.0 - TOLERANCE))
169            .round() as u32;
170        tracing::debug!(min_accounts, min_channels, "connection thresholds");
171
172        let server_health = self.client.query_health().await?;
173        if !server_health.eq_ignore_ascii_case("OK") {
174            tracing::error!(server_health, "blokli server not healthy");
175            return Err(ConnectorError::ServerNotHealthy);
176        }
177
178        let (abort_handle, abort_reg) = AbortHandle::new_pair();
179
180        let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
181        let mut connection_ready_tx = Some(connection_ready_tx);
182
183        let client = self.client.clone();
184        let mapper = self.mapper.clone();
185        let backend = self.backend.clone();
186        let graph = self.graph.clone();
187        let event_tx = self.events.0.clone();
188        let me = self.chain_key.public().to_address();
189        let values_cache = self.values.clone();
190
191        let chain_to_packet = self.chain_to_packet.clone();
192        let packet_to_chain = self.packet_to_chain.clone();
193
194        let channel_by_id = self.channel_by_id.clone();
195        let channel_by_parties = self.channel_by_parties.clone();
196
197        // Query chain info to populate the cache
198        self.query_cached_chain_info().await?;
199
200        #[allow(clippy::large_enum_variant)]
201        #[derive(Debug)]
202        enum SubscribedEventType {
203            Account((AccountEntry, Option<AccountEntry>)),
204            Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
205            WinningProbability((WinningProbability, Option<WinningProbability>)),
206            TicketPrice((HoprBalance, Option<HoprBalance>)),
207        }
208
209        hopr_async_runtime::prelude::spawn(async move {
210            let connections = client
211                .subscribe_accounts(blokli_client::api::AccountSelector::Any)
212                .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
213                .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));
214
215            if let Err(error) = connections {
216                if let Some(connection_ready_tx) = connection_ready_tx.take() {
217                    let _ = connection_ready_tx.send(Err(error));
218                }
219                return;
220            }
221
222            let (account_stream, channel_stream, ticket_params_stream) = connections.unwrap();
223
224            // Stream of Account events (Announcements)
225            let graph_clone = graph.clone();
226            let account_stream = account_stream
227                .inspect_ok(|entry| tracing::trace!(?entry, "new account entry"))
228                .map_err(ConnectorError::from)
229                .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
230                .and_then(move |account| {
231                    let graph = graph_clone.clone();
232                    let mapper = mapper.clone();
233                    let chain_to_packet = chain_to_packet.clone();
234                    let packet_to_chain = packet_to_chain.clone();
235                    hopr_async_runtime::prelude::spawn_blocking(move || {
236                        mapper.key_to_id.insert(account.public_key, Some(account.key_id));
237                        mapper.id_to_key.insert(account.key_id, Some(account.public_key));
238                        graph.write().add_node(account.key_id);
239                        mapper.backend.insert_account(account.clone()).map(|old| (account, old))
240                    })
241                    .map_err(|e| ConnectorError::BackendError(e.into()))
242                    .and_then(move |res| {
243                        let chain_to_packet = chain_to_packet.clone();
244                        let packet_to_chain = packet_to_chain.clone();
245                        async move {
246                            if let Ok((account, _)) = &res {
247                                // Rather update the cached entry than invalidating it
248                                chain_to_packet
249                                    .insert(account.chain_addr, Some(account.public_key))
250                                    .await;
251                                packet_to_chain
252                                    .insert(account.public_key, Some(account.chain_addr))
253                                    .await;
254                            }
255                            res.map(SubscribedEventType::Account)
256                                .map_err(|e| ConnectorError::BackendError(e.into()))
257                        }
258                    })
259                })
260                .fuse();
261
262            // Stream of channel graph updates
263            let channel_stream = channel_stream
264                .map_err(ConnectorError::from)
265                .inspect_ok(|entry| tracing::trace!(?entry, "new graph entry"))
266                .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
267                .and_then(move |(src, dst, channel)| {
268                    let graph = graph.clone();
269                    let backend = backend.clone();
270                    let channel_by_id = channel_by_id.clone();
271                    let channel_by_parties = channel_by_parties.clone();
272                    hopr_async_runtime::prelude::spawn_blocking(move || {
273                        graph.write().add_edge(src.key_id, dst.key_id, *channel.get_id());
274                        backend
275                            .insert_channel(channel)
276                            .map(|old| (channel, old.map(|old| old.diff(&channel))))
277                    })
278                    .map_err(|e| ConnectorError::BackendError(e.into()))
279                    .and_then(move |res| {
280                        let channel_by_id = channel_by_id.clone();
281                        let channel_by_parties = channel_by_parties.clone();
282                        async move {
283                            if let Ok((channel, _)) = &res {
284                                // Rather update the cached entry than invalidating it
285                                channel_by_id.insert(*channel.get_id(), Some(*channel)).await;
286                                channel_by_parties
287                                    .insert(ChannelParties::from(channel), Some(*channel))
288                                    .await;
289                            }
290                            res.map(SubscribedEventType::Channel)
291                                .map_err(|e| ConnectorError::BackendError(e.into()))
292                        }
293                    })
294                })
295                .fuse();
296
297            // Stream of ticket parameter updates (ticket price, minimum winning probability)
298            let ticket_params_stream = ticket_params_stream
299                .map_err(ConnectorError::from)
300                .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
301                .try_filter_map(|ticket_value_event| {
302                    futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
303                })
304                .and_then(|(new_ticket_price, new_win_prob)| {
305                    let values_cache = values_cache.clone();
306                    async move {
307                        let mut events = Vec::<SubscribedEventType>::new();
308                        values_cache
309                            .entry(CHAIN_INFO_CACHE_KEY)
310                            .and_compute_with(|cached_entry| {
311                                futures::future::ready(match cached_entry {
312                                    Some(chain_info) => {
313                                        let mut chain_info = chain_info.into_value();
314                                        if chain_info.ticket_price != new_ticket_price {
315                                            events.push(SubscribedEventType::TicketPrice((
316                                                new_ticket_price,
317                                                Some(chain_info.ticket_price),
318                                            )));
319                                            chain_info.ticket_price = new_ticket_price;
320                                        }
321                                        if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
322                                            events.push(SubscribedEventType::WinningProbability((
323                                                new_win_prob,
324                                                Some(chain_info.ticket_win_prob),
325                                            )));
326                                            chain_info.ticket_win_prob = new_win_prob;
327                                        }
328
329                                        if !events.is_empty() {
330                                            moka::ops::compute::Op::Put(chain_info)
331                                        } else {
332                                            moka::ops::compute::Op::Nop
333                                        }
334                                    }
335                                    None => {
336                                        tracing::warn!(
337                                            "chain info not present in the cache before ticket params update"
338                                        );
339                                        events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
340                                        events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
341                                        moka::ops::compute::Op::Nop
342                                    }
343                                })
344                            })
345                            .await;
346                        Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
347                    }
348                })
349                .try_flatten()
350                .fuse();
351
352            let mut account_counter = 0;
353            let mut channel_counter = 0;
354            if min_accounts == 0 && min_channels == 0 {
355                tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
356                let _ = connection_ready_tx.take().unwrap().send(Ok(()));
357            }
358
359            futures::stream::Abortable::new(
360                (account_stream, channel_stream, ticket_params_stream).merge(),
361                abort_reg,
362            )
363            .inspect_ok(move |event_type| {
364                if connection_ready_tx.is_some() {
365                    match event_type {
366                        SubscribedEventType::Account(_) => account_counter += 1,
367                        SubscribedEventType::Channel(_) => channel_counter += 1,
368                        _ => {}
369                    }
370
371                    // Send the completion notification
372                    // once we reach the expected number of accounts and channels with
373                    // the given tolerance
374                    if account_counter >= min_accounts && channel_counter >= min_channels {
375                        tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
376                        let _ = connection_ready_tx.take().unwrap().send(Ok(()));
377                    }
378                }
379            })
380            .for_each(|event_type| {
381                let event_tx = event_tx.clone();
382                async move {
383                    match event_type {
384                        Ok(SubscribedEventType::Account((new_account, old_account))) => {
385                            tracing::debug!(%new_account, "account inserted");
386                            // We only track public accounts as events and also
387                            // broadcast announcements of already existing accounts (old_account == None).
388                            if new_account.has_announced_with_routing_info()
389                                && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
390                            {
391                                tracing::debug!(account = %new_account, "new announcement");
392                                let _ = event_tx
393                                    .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
394                                    .await;
395                            }
396                        }
397                        Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
398                            tracing::debug!(
399                                id = %new_channel.get_id(),
400                                src = %new_channel.source, dst = %new_channel.destination,
401                                num_changes = changes.len(),
402                                "channel updated"
403                            );
404                            process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
405                        }
406                        Ok(SubscribedEventType::Channel((new_channel, None))) => {
407                            tracing::debug!(
408                                id = %new_channel.get_id(),
409                                src = %new_channel.source, dst = %new_channel.destination,
410                                "channel opened"
411                            );
412                            let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
413                        }
414                        Ok(SubscribedEventType::WinningProbability((new, old))) => {
415                            let old = old.unwrap_or_default();
416                            match new.approx_cmp(&old) {
417                                Ordering::Less => {
418                                    tracing::debug!(%new, %old, "winning probability decreased");
419                                    let _ = event_tx
420                                        .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
421                                        .await;
422                                }
423                                Ordering::Greater => {
424                                    tracing::debug!(%new, %old, "winning probability increased");
425                                    let _ = event_tx
426                                        .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
427                                        .await;
428                                }
429                                Ordering::Equal => {}
430                            }
431                        }
432                        Ok(SubscribedEventType::TicketPrice((new, old))) => {
433                            tracing::debug!(%new, ?old, "ticket price changed");
434                            let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
435                        }
436                        Err(error) => {
437                            tracing::error!(%error, "error processing account/graph/ticket params subscription");
438                        }
439                    }
440                }
441            })
442            .await;
443        });
444
445        connection_ready_rx
446            .timeout(futures_time::time::Duration::from(timeout))
447            .map(|res| match res {
448                Ok(Ok(Ok(_))) => Ok(abort_handle),
449                Ok(Ok(Err(error))) => {
450                    abort_handle.abort();
451                    Err(ConnectorError::from(error))
452                }
453                Ok(Err(_)) => {
454                    abort_handle.abort();
455                    Err(ConnectorError::InvalidState("failed to determine connection state"))
456                }
457                Err(_) => {
458                    abort_handle.abort();
459                    tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
460                    Err(ConnectorError::ConnectionTimeout)
461                }
462            })
463            .await
464    }
465
466    /// Connects to the chain using the underlying client, syncs all on-chain data,
467    /// and subscribes for all future updates.
468    ///
469    /// If the connection does not finish within
470    /// [`BlockchainConnectorConfig::connection_timeout`](BlockchainConnectorConfig)
471    /// the [`ConnectorError::ConnectionTimeout`] error is returned.
472    ///
473    /// Most of the operations with the Connector will fail if it is not connected first.
474    ///
475    /// There are some notable exceptions that do not require a prior call to `connect`:
476    /// - all the [`ChainValues`](hopr_api::chain::ChainValues) methods,
477    /// - all the [`ChainReadSafeOperations`](hopr_api::chain::ChainReadSafeOperations) methods,
478    /// - [`me`](hopr_api::chain::ChainReadChannelOperations::me)
479    ///
480    /// If you wish to only call operations from the above Chain APIs, consider constructing
481    /// the [`HoprBlockchainReader`](crate::HoprBlockchainReader) instead.
482    pub async fn connect(&mut self) -> Result<(), ConnectorError> {
483        if self
484            .connection_handle
485            .as_ref()
486            .filter(|handle| !handle.is_aborted())
487            .is_some()
488        {
489            return Err(ConnectorError::InvalidState("connector is already connected"));
490        }
491
492        let abort_handle = self
493            .do_connect(self.cfg.connection_timeout.max(MIN_CONNECTION_TIMEOUT))
494            .await?;
495
496        if let Err(error) = self.sequencer.start().await {
497            abort_handle.abort();
498            return Err(error);
499        }
500
501        self.connection_handle = Some(abort_handle);
502
503        tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
504        Ok(())
505    }
506
507    /// Returns the reference to the underlying client.
508    pub fn client(&self) -> &C {
509        self.client.as_ref()
510    }
511
512    /// Checks if the connector is [connected](HoprBlockchainConnector::connect) to the chain.
513    pub fn is_connected(&self) -> bool {
514        self.check_connection_state().is_ok()
515    }
516}
517
518impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
519where
520    C: BlokliTransactionClient + Send + Sync + 'static,
521    P: PayloadGenerator + Send + Sync,
522    P::TxRequest: Send + Sync,
523{
524    async fn send_tx<'a>(
525        &'a self,
526        tx_req: P::TxRequest,
527    ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
528        Ok(self
529            .sequencer
530            .enqueue_transaction(tx_req, self.cfg.tx_confirm_timeout.max(MIN_TX_CONFIRM_TIMEOUT))
531            .await?
532            .and_then(|tx| {
533                futures::future::ready(
534                    ChainReceipt::from_str(&tx.transaction_hash.0)
535                        .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
536                )
537            }))
538    }
539}
540
541impl<B, C, P, R> HoprBlockchainConnector<C, R, B, P> {
542    pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
543        self.connection_handle
544            .as_ref()
545            .filter(|handle| !handle.is_aborted()) // Do a safety check
546            .ok_or(ConnectorError::InvalidState("connector is not connected"))
547            .map(|_| ())
548    }
549
550    /// Invalidates all cached on-chain data.
551    pub fn invalidate_caches(&self) {
552        self.channel_by_parties.invalidate_all();
553        self.channel_by_id.invalidate_all();
554        self.packet_to_chain.invalidate_all();
555        self.chain_to_packet.invalidate_all();
556        self.values.invalidate_all();
557    }
558}
559
560impl<B, C, P, R> Drop for HoprBlockchainConnector<C, R, B, P> {
561    fn drop(&mut self) {
562        self.events.0.close();
563        if let Some(abort_handle) = self.connection_handle.take() {
564            abort_handle.abort();
565        }
566    }
567}
568
569impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
570where
571    B: Backend + Send + Sync + 'static,
572    C: Send + Sync,
573    P: Send + Sync,
574    R: Send + Sync,
575{
576    /// Returns a [`PathAddressResolver`] using this connector.
577    pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
578        self.into()
579    }
580}
581
582#[cfg(test)]
583pub(crate) mod tests {
584    use blokli_client::BlokliTestState;
585    use hex_literal::hex;
586    use hopr_chain_types::contract_addresses_for_network;
587
588    use super::*;
589    use crate::{InMemoryBackend, testing::BlokliTestStateBuilder};
590
591    pub const PRIVATE_KEY_1: [u8; 32] = hex!("c14b8faa0a9b8a5fa4453664996f23a7e7de606d42297d723fc4a794f375e260");
592    pub const PRIVATE_KEY_2: [u8; 32] = hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775");
593    pub const MODULE_ADDR: [u8; 20] = hex!("1111111111111111111111111111111111111111");
594
595    pub type TestConnector<C> = HoprBlockchainConnector<
596        C,
597        InMemoryBackend,
598        SafePayloadGenerator,
599        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
600    >;
601
602    pub fn create_connector<C>(blokli_client: C) -> anyhow::Result<TestConnector<C>>
603    where
604        C: BlokliQueryClient + BlokliTransactionClient + BlokliSubscriptionClient + Send + Sync + 'static,
605    {
606        let ckp = ChainKeypair::from_secret(&PRIVATE_KEY_1)?;
607
608        Ok(HoprBlockchainConnector::new(
609            ckp.clone(),
610            Default::default(),
611            blokli_client,
612            InMemoryBackend::default(),
613            SafePayloadGenerator::new(
614                &ckp,
615                contract_addresses_for_network("rotsee").unwrap().1,
616                MODULE_ADDR.into(),
617            ),
618        ))
619    }
620
621    #[tokio::test]
622    async fn connector_should_connect() -> anyhow::Result<()> {
623        let blokli_client = BlokliTestStateBuilder::default().build_static_client();
624
625        let mut connector = create_connector(blokli_client)?;
626        connector.connect().await?;
627
628        assert!(connector.is_connected());
629
630        Ok(())
631    }
632
633    #[tokio::test]
634    async fn connector_should_not_connect_when_blokli_not_healthy() -> anyhow::Result<()> {
635        let mut state = BlokliTestState::default();
636        state.health = "DOWN".into();
637
638        let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();
639
640        let mut connector = create_connector(blokli_client)?;
641
642        let res = connector.connect().await;
643
644        assert!(matches!(res, Err(ConnectorError::ServerNotHealthy)));
645        assert!(!connector.is_connected());
646
647        Ok(())
648    }
649}