Skip to main content

hopr_chain_connector/connector/
mod.rs

1use std::{cmp::Ordering, str::FromStr, time::Duration};
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use futures_concurrency::stream::Merge;
6use futures_time::future::FutureExt as FuturesTimeExt;
7use hopr_api::{
8    chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent},
9    types::{chain::prelude::*, crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
10};
11use hopr_async_runtime::AbortHandle;
12use petgraph::prelude::DiGraphMap;
13
14use crate::{
15    backend::Backend,
16    connector::{keys::HoprKeyMapper, sequencer::TransactionSequencer, values::CHAIN_INFO_CACHE_KEY},
17    errors::ConnectorError,
18    utils::{
19        ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
20        process_channel_changes_into_events,
21    },
22};
23
24mod accounts;
25mod channels;
26mod events;
27mod keys;
28mod safe;
29mod sequencer;
30mod tickets;
31mod values;
32
33type EventsChannel = (
34    async_broadcast::Sender<ChainEvent>,
35    async_broadcast::InactiveReceiver<ChainEvent>,
36);
37
38const MIN_CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
39const MIN_TX_CONFIRM_TIMEOUT: Duration = Duration::from_secs(1);
40const TX_TIMEOUT_MULTIPLIER: u32 = 2;
41const DEFAULT_SYNC_TOLERANCE_PCT: usize = 90;
42
43/// Configuration of the [`HoprBlockchainConnector`].
44#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
45pub struct BlockchainConnectorConfig {
46    /// Maximum time to wait for [connection](HoprBlockchainConnector::connect) to complete.
47    ///
48    /// Default is 30 seconds, minimum is 100 milliseconds.
49    #[default(Duration::from_secs(30))]
50    pub connection_sync_timeout: Duration,
51    /// Percentage of the total number of accounts and opened channels that must
52    /// be received during a [connection attempt](HoprBlockchainConnector::connect)
53    /// to be successful.
54    ///
55    /// Default is 90%, minimum is 1, maximum is 100.
56    #[default(DEFAULT_SYNC_TOLERANCE_PCT)]
57    pub sync_tolerance: usize,
58    /// Transaction waits for confirmation by multiplying chain's blocktime, finality, and this multiplier.
59    /// Set it to higher values if transactions are failing due to timeout at the client.
60    ///
61    /// Default is 2, minimum is 1.
62    #[default(TX_TIMEOUT_MULTIPLIER)]
63    pub tx_timeout_multiplier: u32,
64}
65
66/// A connector acting as middleware between the HOPR APIs (see the [`hopr_api`] crate) and the Blokli Client API (see
67/// the [`blokli_client`] crate).
68///
69/// The connector object cannot be cloned and shall be used inside an `Arc` if cloning is needed.
70pub struct HoprBlockchainConnector<C, B, P, R> {
71    payload_generator: P,
72    chain_key: ChainKeypair,
73    client: std::sync::Arc<C>,
74    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
75    backend: std::sync::Arc<B>,
76    connection_handle: Option<AbortHandle>,
77    sequencer: TransactionSequencer<C, R>,
78    events: EventsChannel,
79    cfg: BlockchainConnectorConfig,
80
81    // KeyId <-> OffchainPublicKey mapping
82    mapper: HoprKeyMapper<B>,
83    // Fast retrieval of chain keys by address
84    chain_to_packet: moka::future::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
85    // Fast retrieval of packet keys by chain key
86    packet_to_chain: moka::future::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
87    // Fast retrieval of channel entries by id
88    channel_by_id: moka::future::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
89    // Fast retrieval of channel entries by parties
90    channel_by_parties: moka::future::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
91    // Contains chain info (no TTL - kept fresh by subscription handler)
92    values: moka::future::Cache<u32, ParsedChainInfo>,
93}
94
95const EXPECTED_NUM_NODES: usize = 10_000;
96const EXPECTED_NUM_CHANNELS: usize = 100_000;
97
98const DEFAULT_CACHE_TIMEOUT: Duration = Duration::from_mins(10);
99
100impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
101where
102    B: Backend + Send + Sync + 'static,
103    C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
104    P: PayloadGenerator + Send + Sync + 'static,
105    P::TxRequest: Send + Sync + 'static,
106{
107    /// Creates a new instance.
108    pub fn new(
109        chain_key: ChainKeypair,
110        cfg: BlockchainConnectorConfig,
111        client: C,
112        backend: B,
113        payload_generator: P,
114    ) -> Self {
115        let backend = std::sync::Arc::new(backend);
116        let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
117        events_tx.set_overflow(true);
118        events_tx.set_await_active(false);
119
120        let client = std::sync::Arc::new(client);
121        Self {
122            payload_generator,
123            graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
124                EXPECTED_NUM_NODES,
125                EXPECTED_NUM_CHANNELS,
126                ahash::RandomState::default(),
127            ))),
128            backend: backend.clone(),
129            connection_handle: None,
130            sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
131            events: (events_tx, events_rx.deactivate()),
132            client,
133            chain_key,
134            cfg,
135            mapper: HoprKeyMapper {
136                id_to_key: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
137                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
138                    .build_with_hasher(ahash::RandomState::default()),
139                key_to_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
140                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
141                    .build_with_hasher(ahash::RandomState::default()),
142                backend,
143            },
144            chain_to_packet: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
145                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
146                .build_with_hasher(ahash::RandomState::default()),
147            packet_to_chain: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
148                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
149                .build_with_hasher(ahash::RandomState::default()),
150            channel_by_id: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
151                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
152                .build_with_hasher(ahash::RandomState::default()),
153            channel_by_parties: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
154                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
155                .build_with_hasher(ahash::RandomState::default()),
156            // No TTL: kept fresh by the Blokli subscription handler
157            values: moka::future::CacheBuilder::new(1).build(),
158        }
159    }
160
161    async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
162        let sync_quota = self.cfg.sync_tolerance.clamp(1, 100) as f64 / 100.0;
163        let min_accounts = (self
164            .client
165            .count_accounts(blokli_client::api::AccountSelector::Any)
166            .await? as f64
167            * sync_quota)
168            .round() as u32;
169        let min_channels = (self
170            .client
171            .count_channels(blokli_client::api::ChannelSelector {
172                filter: None,
173                status: Some(blokli_client::api::types::ChannelStatus::Open),
174            })
175            .await? as f64
176            * sync_quota)
177            .round() as u32;
178        tracing::debug!(min_accounts, min_channels, "connection thresholds");
179
180        let server_health = self.client.query_health().await?;
181        if !server_health.eq_ignore_ascii_case("OK") {
182            tracing::error!(server_health, "blokli server not healthy");
183            return Err(ConnectorError::ServerNotHealthy);
184        }
185
186        let (abort_handle, abort_reg) = AbortHandle::new_pair();
187
188        let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
189        let mut connection_ready_tx = Some(connection_ready_tx);
190
191        let client = self.client.clone();
192        let mapper = self.mapper.clone();
193        let backend = self.backend.clone();
194        let graph = self.graph.clone();
195        let event_tx = self.events.0.clone();
196        let me = self.chain_key.public().to_address();
197        let values_cache = self.values.clone();
198
199        let chain_to_packet = self.chain_to_packet.clone();
200        let packet_to_chain = self.packet_to_chain.clone();
201
202        let channel_by_id = self.channel_by_id.clone();
203        let channel_by_parties = self.channel_by_parties.clone();
204
205        // Query chain info to populate the cache
206        self.query_cached_chain_info().await?;
207
208        #[allow(clippy::large_enum_variant)]
209        #[derive(Debug)]
210        enum SubscribedEventType {
211            Account((AccountEntry, Option<AccountEntry>)),
212            Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
213            WinningProbability((WinningProbability, Option<WinningProbability>)),
214            TicketPrice((HoprBalance, Option<HoprBalance>)),
215        }
216
217        hopr_async_runtime::prelude::spawn(async move {
218            let sync_started = std::time::Instant::now();
219
220            let connections = client
221                .subscribe_accounts(blokli_client::api::AccountSelector::Any)
222                .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
223                .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));
224
225            if let Err(error) = connections {
226                if let Some(connection_ready_tx) = connection_ready_tx.take() {
227                    let _ = connection_ready_tx.send(Err(error));
228                }
229                return;
230            }
231
232            let (account_stream, channel_stream, ticket_params_stream) = connections.unwrap();
233
234            // Stream of Account events (Announcements)
235            let graph_clone = graph.clone();
236            let account_stream = account_stream
237                .inspect_ok(|entry| tracing::trace!(?entry, "new account event"))
238                .map_err(ConnectorError::from)
239                .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
240                .and_then(move |account| {
241                    let graph = graph_clone.clone();
242                    let mapper = mapper.clone();
243                    let chain_to_packet = chain_to_packet.clone();
244                    let packet_to_chain = packet_to_chain.clone();
245                    hopr_async_runtime::prelude::spawn_blocking(move || {
246                        mapper.key_to_id.insert(account.public_key, Some(account.key_id));
247                        mapper.id_to_key.insert(account.key_id, Some(account.public_key));
248                        graph.write().add_node(account.key_id);
249                        mapper.backend.insert_account(account.clone()).map(|old| (account, old))
250                    })
251                    .map_err(ConnectorError::backend)
252                    .and_then(move |res| {
253                        let chain_to_packet = chain_to_packet.clone();
254                        let packet_to_chain = packet_to_chain.clone();
255                        async move {
256                            if let Ok((upserted_account, _)) = &res {
257                                // Rather update the cached entry than invalidating it
258                                chain_to_packet
259                                    .insert(upserted_account.chain_addr, Some(upserted_account.public_key))
260                                    .await;
261                                packet_to_chain
262                                    .insert(upserted_account.public_key, Some(upserted_account.chain_addr))
263                                    .await;
264                            }
265                            res.map(SubscribedEventType::Account).map_err(ConnectorError::backend)
266                        }
267                    })
268                })
269                .fuse();
270
271            // Stream of channel graph updates
272            let channel_stream = channel_stream
273                .map_err(ConnectorError::from)
274                .inspect_ok(|entry| tracing::trace!(?entry, "new graph event"))
275                .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
276                .and_then(move |(src, dst, channel)| {
277                    let graph = graph.clone();
278                    let backend = backend.clone();
279                    let channel_by_id = channel_by_id.clone();
280                    let channel_by_parties = channel_by_parties.clone();
281                    hopr_async_runtime::prelude::spawn_blocking(move || {
282                        graph.write().add_edge(src.key_id, dst.key_id, *channel.get_id());
283                        backend
284                            .insert_channel(channel)
285                            .map(|old| (channel, old.map(|old| old.diff(&channel))))
286                    })
287                    .map_err(ConnectorError::backend)
288                    .and_then(move |res| {
289                        let channel_by_id = channel_by_id.clone();
290                        let channel_by_parties = channel_by_parties.clone();
291                        async move {
292                            if let Ok((upserted_channel, _)) = &res {
293                                // Rather update the cached entry than invalidating it
294                                channel_by_id
295                                    .insert(*upserted_channel.get_id(), Some(*upserted_channel))
296                                    .await;
297                                channel_by_parties
298                                    .insert(ChannelParties::from(upserted_channel), Some(*upserted_channel))
299                                    .await;
300                            }
301                            res.map(SubscribedEventType::Channel).map_err(ConnectorError::backend)
302                        }
303                    })
304                })
305                .fuse();
306
307            // Stream of ticket parameter updates (ticket price, minimum winning probability)
308            let ticket_params_stream = ticket_params_stream
309                .map_err(ConnectorError::from)
310                .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
311                .try_filter_map(|ticket_value_event| {
312                    futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
313                })
314                .and_then(|(new_ticket_price, new_win_prob)| {
315                    let values_cache = values_cache.clone();
316                    async move {
317                        let mut events = Vec::<SubscribedEventType>::new();
318                        values_cache
319                            .entry(CHAIN_INFO_CACHE_KEY)
320                            .and_compute_with(|cached_entry| {
321                                futures::future::ready(match cached_entry {
322                                    Some(chain_info) => {
323                                        let mut chain_info = chain_info.into_value();
324                                        if chain_info.ticket_price != new_ticket_price {
325                                            events.push(SubscribedEventType::TicketPrice((
326                                                new_ticket_price,
327                                                Some(chain_info.ticket_price),
328                                            )));
329                                            chain_info.ticket_price = new_ticket_price;
330                                        }
331                                        if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
332                                            events.push(SubscribedEventType::WinningProbability((
333                                                new_win_prob,
334                                                Some(chain_info.ticket_win_prob),
335                                            )));
336                                            chain_info.ticket_win_prob = new_win_prob;
337                                        }
338
339                                        if !events.is_empty() {
340                                            moka::ops::compute::Op::Put(chain_info)
341                                        } else {
342                                            moka::ops::compute::Op::Nop
343                                        }
344                                    }
345                                    None => {
346                                        tracing::warn!(
347                                            "chain info not present in the cache before ticket params update"
348                                        );
349                                        events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
350                                        events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
351                                        moka::ops::compute::Op::Nop
352                                    }
353                                })
354                            })
355                            .await;
356                        Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
357                    }
358                })
359                .try_flatten()
360                .fuse();
361
362            let mut account_counter = 0;
363            let mut channel_counter = 0;
364            if min_accounts == 0 && min_channels == 0 {
365                tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
366                let _ = connection_ready_tx.take().unwrap().send(Ok(()));
367            }
368
369            futures::stream::Abortable::new(
370                (account_stream, channel_stream, ticket_params_stream).merge(),
371                abort_reg,
372            )
373            .inspect_ok(move |event_type| {
374                if connection_ready_tx.is_some() {
375                    match event_type {
376                        SubscribedEventType::Account(_) => account_counter += 1,
377                        SubscribedEventType::Channel(_) => channel_counter += 1,
378                        _ => {}
379                    }
380
381                    let pct_synced =
382                        ((account_counter + channel_counter) * 100 / (min_accounts + min_channels)).clamp(0, 100);
383                    tracing::debug!(
384                        pct_synced,
385                        sync_quota,
386                        account_counter,
387                        channel_counter,
388                        "percentage of connection quota synced"
389                    );
390
391                    // Send the completion notification
392                    // once we reach the expected number of accounts and channels with
393                    // the given tolerance
394                    if account_counter >= min_accounts && channel_counter >= min_channels {
395                        tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
396                        let _ = connection_ready_tx.take().unwrap().send(Ok(()));
397                    }
398                }
399            })
400            .for_each(|event_type| {
401                let event_tx = event_tx.clone();
402                async move {
403                    match event_type {
404                        Ok(SubscribedEventType::Account((new_account, old_account))) => {
405                            tracing::debug!(%new_account, "account inserted");
406                            // We only track public accounts as events and also
407                            // broadcast announcements of already existing accounts (old_account == None).
408                            if new_account.has_announced_with_routing_info()
409                                && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
410                            {
411                                tracing::debug!(account = %new_account, "new announcement");
412                                let _ = event_tx
413                                    .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
414                                    .await;
415                            }
416                        }
417                        Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
418                            tracing::debug!(
419                                id = %new_channel.get_id(),
420                                src = %new_channel.source, dst = %new_channel.destination,
421                                num_changes = changes.len(),
422                                "channel updated"
423                            );
424                            process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
425                        }
426                        Ok(SubscribedEventType::Channel((new_channel, None))) => {
427                            tracing::debug!(
428                                id = %new_channel.get_id(),
429                                src = %new_channel.source, dst = %new_channel.destination,
430                                "channel opened"
431                            );
432                            let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
433                        }
434                        Ok(SubscribedEventType::WinningProbability((new, old))) => {
435                            let old = old.unwrap_or_default();
436                            match new.approx_cmp(&old) {
437                                Ordering::Less => {
438                                    tracing::debug!(%new, %old, "winning probability decreased");
439                                    let _ = event_tx
440                                        .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
441                                        .await;
442                                }
443                                Ordering::Greater => {
444                                    tracing::debug!(%new, %old, "winning probability increased");
445                                    let _ = event_tx
446                                        .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
447                                        .await;
448                                }
449                                Ordering::Equal => {}
450                            }
451                        }
452                        Ok(SubscribedEventType::TicketPrice((new, old))) => {
453                            tracing::debug!(%new, ?old, "ticket price changed");
454                            let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
455                        }
456                        Err(error) => {
457                            tracing::error!(%error, "error processing account/graph/ticket params subscription");
458                        }
459                    }
460                }
461            })
462            .await;
463        });
464
465        connection_ready_rx
466            .timeout(futures_time::time::Duration::from(timeout))
467            .map(|res| match res {
468                Ok(Ok(Ok(_))) => Ok(abort_handle),
469                Ok(Ok(Err(error))) => {
470                    abort_handle.abort();
471                    Err(ConnectorError::from(error))
472                }
473                Ok(Err(_)) => {
474                    abort_handle.abort();
475                    Err(ConnectorError::InvalidState("failed to determine connection state"))
476                }
477                Err(_) => {
478                    abort_handle.abort();
479                    tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
480                    Err(ConnectorError::ConnectionTimeout)
481                }
482            })
483            .await
484    }
485
486    /// Connects to the chain using the underlying client, syncs all on-chain data,
487    /// and subscribes for all future updates.
488    ///
489    /// If the connection does not finish within
490    /// [`BlockchainConnectorConfig::connection_timeout`](BlockchainConnectorConfig)
491    /// the [`ConnectorError::ConnectionTimeout`] error is returned.
492    ///
493    /// Most of the operations with the Connector will fail if it is not connected first.
494    ///
495    /// There are some notable exceptions that DO NOT require a prior call to `connect`:
496    /// - all the [`ChainValues`](hopr_api::chain::ChainValues) methods,
497    /// - all the [`ChainReadSafeOperations`](hopr_api::chain::ChainReadSafeOperations) methods,
498    /// - all the [`ChainWriteSafeOperations`](hopr_api::chain::ChainWriteSafeOperations) methods,
499    /// - [`me`](hopr_api::chain::ChainReadChannelOperations::me)
500    ///
501    /// If you wish to only call operations from the above Chain APIs, consider constructing
502    /// the [`HoprBlockchainReader`](crate::HoprBlockchainReader) instead.
503    pub async fn connect(&mut self) -> Result<(), ConnectorError> {
504        if self
505            .connection_handle
506            .as_ref()
507            .filter(|handle| !handle.is_aborted())
508            .is_some()
509        {
510            return Err(ConnectorError::InvalidState("connector is already connected"));
511        }
512
513        let abort_handle = self
514            .do_connect(self.cfg.connection_sync_timeout.max(MIN_CONNECTION_TIMEOUT))
515            .await?;
516
517        self.connection_handle = Some(abort_handle);
518
519        tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
520        Ok(())
521    }
522
523    /// Returns the reference to the underlying client.
524    pub fn client(&self) -> &C {
525        self.client.as_ref()
526    }
527
528    /// Checks if the connector is [connected](HoprBlockchainConnector::connect) to the chain.
529    pub fn is_connected(&self) -> bool {
530        self.check_connection_state().is_ok()
531    }
532}
533
534impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
535where
536    C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
537    P: PayloadGenerator + Send + Sync,
538    P::TxRequest: Send + Sync,
539{
540    async fn send_tx<'a>(
541        &'a self,
542        tx_req: P::TxRequest,
543        custom_tx_multiplier: Option<u32>,
544    ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
545        let chain_info = self.query_cached_chain_info().await?;
546        let tx_timeout = custom_tx_multiplier.unwrap_or(self.cfg.tx_timeout_multiplier).max(1)
547            * chain_info.finality
548            * chain_info.expected_block_time;
549        Ok(self
550            .sequencer
551            .enqueue_transaction(tx_req, tx_timeout.max(MIN_TX_CONFIRM_TIMEOUT))
552            .await?
553            .and_then(|tx| {
554                futures::future::ready(
555                    ChainReceipt::from_str(&tx.transaction_hash.0)
556                        .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
557                )
558            }))
559    }
560}
561
562impl<B, C, P, R> HoprBlockchainConnector<C, R, B, P> {
563    pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
564        self.connection_handle
565            .as_ref()
566            .filter(|handle| !handle.is_aborted()) // Do a safety check
567            .ok_or(ConnectorError::InvalidState("connector is not connected"))
568            .map(|_| ())
569    }
570
571    /// Invalidates all cached on-chain data.
572    pub fn invalidate_caches(&self) {
573        self.channel_by_parties.invalidate_all();
574        self.channel_by_id.invalidate_all();
575        self.packet_to_chain.invalidate_all();
576        self.chain_to_packet.invalidate_all();
577        self.values.invalidate_all();
578    }
579}
580
581impl<B, C, P, R> Drop for HoprBlockchainConnector<C, R, B, P> {
582    fn drop(&mut self) {
583        self.events.0.close();
584        if let Some(abort_handle) = self.connection_handle.take() {
585            abort_handle.abort();
586        }
587    }
588}
589
590impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
591where
592    B: Backend + Send + Sync + 'static,
593    C: Send + Sync,
594    P: Send + Sync,
595    R: Send + Sync,
596{
597    /// Returns a [`PathAddressResolver`] using this connector.
598    pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
599        self.into()
600    }
601}
602
603#[cfg(test)]
604pub(crate) mod tests {
605    use blokli_client::BlokliTestState;
606    use hex_literal::hex;
607    use hopr_api::types::chain::contract_addresses_for_network;
608
609    use super::*;
610    use crate::{InMemoryBackend, testing::BlokliTestStateBuilder};
611
612    pub const PRIVATE_KEY_1: [u8; 32] = hex!("c14b8faa0a9b8a5fa4453664996f23a7e7de606d42297d723fc4a794f375e260");
613    pub const PRIVATE_KEY_2: [u8; 32] = hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775");
614    pub const MODULE_ADDR: [u8; 20] = hex!("1111111111111111111111111111111111111111");
615
616    pub type TestConnector<C> = HoprBlockchainConnector<
617        C,
618        InMemoryBackend,
619        SafePayloadGenerator,
620        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
621    >;
622
623    pub fn create_connector<C>(blokli_client: C) -> anyhow::Result<TestConnector<C>>
624    where
625        C: BlokliQueryClient + BlokliTransactionClient + BlokliSubscriptionClient + Send + Sync + 'static,
626    {
627        let ckp = ChainKeypair::from_secret(&PRIVATE_KEY_1)?;
628
629        Ok(HoprBlockchainConnector::new(
630            ckp.clone(),
631            Default::default(),
632            blokli_client,
633            InMemoryBackend::default(),
634            SafePayloadGenerator::new(
635                &ckp,
636                contract_addresses_for_network("rotsee").unwrap().1,
637                MODULE_ADDR.into(),
638            ),
639        ))
640    }
641
642    #[tokio::test]
643    async fn connector_should_connect() -> anyhow::Result<()> {
644        let blokli_client = BlokliTestStateBuilder::default().build_static_client();
645
646        let mut connector = create_connector(blokli_client)?;
647        connector.connect().await?;
648
649        assert!(connector.is_connected());
650
651        Ok(())
652    }
653
654    #[tokio::test]
655    async fn connector_should_not_connect_when_blokli_not_healthy() -> anyhow::Result<()> {
656        let state = BlokliTestState {
657            health: "DOWN".into(),
658            ..Default::default()
659        };
660
661        let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();
662
663        let mut connector = create_connector(blokli_client)?;
664
665        let res = connector.connect().await;
666
667        assert!(matches!(res, Err(ConnectorError::ServerNotHealthy)));
668        assert!(!connector.is_connected());
669
670        Ok(())
671    }
672}