// Source file: hopr_chain_connector/connector/mod.rs

1use std::{cmp::Ordering, str::FromStr, sync::atomic::Ordering as AtomicOrdering, time::Duration};
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use futures_concurrency::stream::Merge;
6use futures_time::future::FutureExt as FuturesTimeExt;
7use hopr_api::{
8    chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent},
9    types::{chain::prelude::*, crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
10};
11use hopr_utils::runtime::AbortHandle;
12use petgraph::prelude::DiGraphMap;
13
14use crate::{
15    backend::Backend,
16    connector::{keys::HoprKeyMapper, sequencer::TransactionSequencer, values::CHAIN_INFO_CACHE_KEY},
17    errors::ConnectorError,
18    utils::{
19        ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
20        process_channel_changes_into_events,
21    },
22};
23
24mod accounts;
25mod channels;
26mod events;
27mod keys;
28mod safe;
29mod sequencer;
30mod tickets;
31mod values;
32
/// Both halves of the [`ChainEvent`] broadcast channel owned by the connector:
/// the sender used to publish events and an inactive receiver.
///
/// NOTE(review): the inactive receiver is presumably kept so that active receivers
/// can be created from it on demand — confirm against the event subscription code.
type EventsChannel = (
    async_broadcast::Sender<ChainEvent>,
    async_broadcast::InactiveReceiver<ChainEvent>,
);
37
/// Lower bound applied to the configured connection sync timeout when connecting.
const MIN_CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
/// Lower bound applied to the computed transaction confirmation timeout.
const MIN_TX_CONFIRM_TIMEOUT: Duration = Duration::from_secs(1);
/// Default value of [`BlockchainConnectorConfig::tx_timeout_multiplier`].
const TX_TIMEOUT_MULTIPLIER: u32 = 2;
/// Default value of [`BlockchainConnectorConfig::sync_tolerance`], in percent.
const DEFAULT_SYNC_TOLERANCE_PCT: usize = 90;
42
/// Connector health states.
///
/// Each value maps to a `ComponentStatus` variant plus a fixed detail message.
/// Storage and updates are lock-free via [`AtomicChainHealthState`]; reads
/// convert to `ComponentStatus` which allocates for non-`Ready` variants.
#[atomic_enum::atomic_enum]
#[derive(PartialEq, Eq)]
enum ChainHealthState {
    /// The connection attempt succeeded; reported as `Ready`.
    Ready = 0,
    /// Initial state of a freshly constructed connector; reported as `Initializing`.
    WaitingForConnection = 1,
    /// A connection attempt is in progress; reported as `Initializing`.
    Connecting = 2,
    /// The subscription task finished while the state was `Connecting` or `Ready`;
    /// reported as `Degraded`.
    SubscriptionEnded = 3,
    /// The connection attempt failed with a connection timeout; reported as `Degraded`.
    SyncTimedOut = 4,
    /// The connection attempt failed because the server did not report a healthy status;
    /// reported as `Unavailable`.
    ServerNotHealthy = 5,
    /// The connection attempt failed for any other reason; reported as `Unavailable`.
    ConnectionFailed = 6,
    /// Reported as `Unavailable` with the detail "connector dropped".
    ///
    /// NOTE(review): presumably set when the connector is dropped — the code setting
    /// this state is not in this part of the file; confirm.
    Dropped = 7,
}
60
61impl From<ChainHealthState> for hopr_api::node::ComponentStatus {
62    fn from(state: ChainHealthState) -> Self {
63        match state {
64            ChainHealthState::Ready => Self::Ready,
65            ChainHealthState::WaitingForConnection => Self::Initializing("waiting for chain connection".into()),
66            ChainHealthState::Connecting => Self::Initializing("connecting to blokli".into()),
67            ChainHealthState::SubscriptionEnded => Self::Degraded("chain subscription ended".into()),
68            ChainHealthState::SyncTimedOut => Self::Degraded("connection sync timed out".into()),
69            ChainHealthState::ServerNotHealthy => Self::Unavailable("blokli server not healthy".into()),
70            ChainHealthState::ConnectionFailed => Self::Unavailable("chain connection failed".into()),
71            ChainHealthState::Dropped => Self::Unavailable("connector dropped".into()),
72        }
73    }
74}
75
/// Configuration of the [`HoprBlockchainConnector`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
pub struct BlockchainConnectorConfig {
    /// Maximum time to wait for [connection](HoprBlockchainConnector::connect) to complete,
    /// i.e., for the initial on-chain data sync to reach the required quota.
    ///
    /// Default is 30 seconds, minimum is 100 milliseconds (smaller values are raised to the minimum).
    #[default(Duration::from_secs(30))]
    pub connection_sync_timeout: Duration,
    /// Percentage of the total number of accounts and opened channels that must
    /// be received during a [connection attempt](HoprBlockchainConnector::connect)
    /// to be successful.
    ///
    /// Default is 90%, minimum is 1, maximum is 100 (values outside this range are clamped).
    #[default(DEFAULT_SYNC_TOLERANCE_PCT)]
    pub sync_tolerance: usize,
    /// Multiplier of the transaction confirmation timeout.
    ///
    /// The timeout is computed as this multiplier times the chain's finality times the chain's
    /// expected block time, and is never shorter than 1 second.
    /// Set it to higher values if transactions are failing due to timeout at the client.
    ///
    /// Default is 2, minimum is 1.
    #[default(TX_TIMEOUT_MULTIPLIER)]
    pub tx_timeout_multiplier: u32,
}
98
/// A connector acting as middleware between the HOPR APIs (see the [`hopr_api`] crate) and the Blokli Client API (see
/// the [`blokli_client`] crate).
///
/// The connector object cannot be cloned and shall be used inside an `Arc` if cloning is needed.
pub struct HoprBlockchainConnector<C, B, P, R> {
    // Generator of transaction payloads (not used in this part of the file)
    payload_generator: P,
    // Chain keypair of this node; its address identifies the node in channel events
    chain_key: ChainKeypair,
    // Shared Blokli client used for queries, subscriptions and transactions
    client: std::sync::Arc<C>,
    // Directed graph with key ids as nodes and channel ids as edge weights,
    // updated by the subscription handler
    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
    // Storage backend receiving accounts and channels from the subscription handler
    backend: std::sync::Arc<B>,
    // Abort handle of the running subscription stream; `None` until a connection succeeds
    connection_handle: Option<AbortHandle>,
    // Queue through which transactions are submitted
    sequencer: TransactionSequencer<C, R>,
    // Broadcast channel on which chain events are published
    events: EventsChannel,
    cfg: BlockchainConnectorConfig,
    // Current health state, shared with the subscription task
    health: std::sync::Arc<AtomicChainHealthState>,

    // KeyId <-> OffchainPublicKey mapping
    mapper: HoprKeyMapper<B>,
    // Fast retrieval of packet keys by chain address
    chain_to_packet: moka::sync::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
    // Fast retrieval of chain addresses by packet key
    packet_to_chain: moka::sync::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
    // Fast retrieval of channel entries by id
    channel_by_id: moka::sync::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
    // Fast retrieval of channel entries by parties
    channel_by_parties: moka::sync::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
    // Contains chain info (no TTL - kept fresh by subscription handler)
    values: moka::future::Cache<u32, ParsedChainInfo>,
    // Ticket values (winning probability, price), kept fresh by subscription handler
    // Set only when connected
    ticket_values: std::sync::Arc<parking_lot::RwLock<Option<(WinningProbability, HoprBalance)>>>,
}
131
// Initial node capacity of the channel graph and maximum capacity of the per-account caches.
const EXPECTED_NUM_NODES: usize = 10_000;
// Initial edge capacity of the channel graph and maximum capacity of the per-channel caches.
const EXPECTED_NUM_CHANNELS: usize = 100_000;

// Time-to-idle applied to the key, account and channel caches.
const DEFAULT_CACHE_TIMEOUT: Duration = Duration::from_mins(10);
136
137impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
138where
139    B: Backend + Send + Sync + 'static,
140    C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
141    P: PayloadGenerator + Send + Sync + 'static,
142    P::TxRequest: Send + Sync + 'static,
143{
144    /// Creates a new instance.
145    pub fn new(
146        chain_key: ChainKeypair,
147        cfg: BlockchainConnectorConfig,
148        client: C,
149        backend: B,
150        payload_generator: P,
151    ) -> Self {
152        let backend = std::sync::Arc::new(backend);
153        let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
154        events_tx.set_overflow(true);
155        events_tx.set_await_active(false);
156
157        let client = std::sync::Arc::new(client);
158        Self {
159            payload_generator,
160            health: std::sync::Arc::new(AtomicChainHealthState::new(ChainHealthState::WaitingForConnection)),
161            graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
162                EXPECTED_NUM_NODES,
163                EXPECTED_NUM_CHANNELS,
164                ahash::RandomState::default(),
165            ))),
166            backend: backend.clone(),
167            connection_handle: None,
168            sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
169            events: (events_tx, events_rx.deactivate()),
170            client,
171            chain_key,
172            cfg,
173            mapper: HoprKeyMapper {
174                id_to_key: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
175                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
176                    .build_with_hasher(ahash::RandomState::default()),
177                key_to_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
178                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
179                    .build_with_hasher(ahash::RandomState::default()),
180                backend,
181            },
182            chain_to_packet: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
183                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
184                .build_with_hasher(ahash::RandomState::default()),
185            packet_to_chain: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
186                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
187                .build_with_hasher(ahash::RandomState::default()),
188            channel_by_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
189                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
190                .build_with_hasher(ahash::RandomState::default()),
191            channel_by_parties: moka::sync::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
192                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
193                .build_with_hasher(ahash::RandomState::default()),
194            // No TTL: kept fresh by the Blokli subscription handler
195            values: moka::future::CacheBuilder::new(1).build(),
196            ticket_values: Default::default(),
197        }
198    }
199
    /// Performs a single connection attempt and waits until the initial sync completes.
    ///
    /// The attempt proceeds as follows:
    /// 1. The minimum numbers of account and open-channel events to receive are derived from
    ///    the totals reported by the client, scaled by the configured sync tolerance
    ///    (clamped to 1-100 %).
    /// 2. The server health is queried; anything other than `OK` (ASCII case-insensitive)
    ///    fails with [`ConnectorError::ServerNotHealthy`].
    /// 3. The chain info is queried and the ticket values (winning probability, price) are stored.
    /// 4. A task is spawned which subscribes to accounts, the channel graph and ticket parameters,
    ///    merges the three streams, updates the backend, graph and caches, and broadcasts
    ///    the corresponding [`ChainEvent`]s.
    /// 5. The call waits up to `timeout` for the task to signal that the minimum numbers
    ///    of accounts and channels have been received.
    ///
    /// Returns the handle which aborts the merged subscription stream.
    ///
    /// # Errors
    /// - errors of the client queries and of the chain info query are propagated,
    /// - [`ConnectorError::ServerNotHealthy`] if the server is not healthy,
    /// - the subscription error if any of the subscriptions cannot be created,
    /// - [`ConnectorError::InvalidState`] if the task ends without signalling the result,
    /// - [`ConnectorError::ConnectionTimeout`] if the sync does not complete within `timeout`.
    ///
    /// In the last three cases the subscription stream is aborted before returning.
    async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
        // Fraction of the reported totals that must be received before the connection counts as synced
        let sync_quota = self.cfg.sync_tolerance.clamp(1, 100) as f64 / 100.0;
        let min_accounts = (self
            .client
            .count_accounts(blokli_client::api::AccountSelector::Any)
            .await? as f64
            * sync_quota)
            .round() as u32;
        let min_channels = (self
            .client
            .query_channel_stats(blokli_client::api::ChannelSelector {
                filter: None,
                status: Some(blokli_client::api::types::ChannelStatus::Open),
                ..Default::default()
            })
            .await?
            .count as f64
            * sync_quota)
            .round() as u32;
        tracing::debug!(min_accounts, min_channels, "connection thresholds");

        let server_health = self.client.query_health().await?;
        if !server_health.eq_ignore_ascii_case("OK") {
            tracing::error!(server_health, "blokli server not healthy");
            return Err(ConnectorError::ServerNotHealthy);
        }

        let (abort_handle, abort_reg) = AbortHandle::new_pair();

        // One-shot notification of the sync result; kept in an `Option` so it is sent at most once
        let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
        let mut connection_ready_tx = Some(connection_ready_tx);

        // Clones of the shared state that get moved into the spawned subscription task
        let client = self.client.clone();
        let mapper = self.mapper.clone();
        let backend = self.backend.clone();
        let graph = self.graph.clone();
        let event_tx = self.events.0.clone();
        let me = self.chain_key.public().to_address();
        let values_cache = self.values.clone();

        let chain_to_packet = self.chain_to_packet.clone();
        let packet_to_chain = self.packet_to_chain.clone();

        let channel_by_id = self.channel_by_id.clone();
        let channel_by_parties = self.channel_by_parties.clone();

        // Query chain info to populate the cache
        let initial_chain_values = self.query_cached_chain_info().await?;
        self.ticket_values
            .write()
            .replace((initial_chain_values.ticket_win_prob, initial_chain_values.ticket_price));

        // Common item type of the three merged subscription streams.
        // Each variant carries the new value and, if available, the previous value (or the changes).
        #[allow(clippy::large_enum_variant)]
        #[derive(Debug)]
        enum SubscribedEventType {
            Account((AccountEntry, Option<AccountEntry>)),
            Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
            WinningProbability((WinningProbability, Option<WinningProbability>)),
            TicketPrice((HoprBalance, Option<HoprBalance>)),
        }

        let ticket_values = self.ticket_values.clone();
        let health = self.health.clone();
        hopr_utils::runtime::prelude::spawn(async move {
            let sync_started = std::time::Instant::now();

            // All three subscriptions must be created, the first failure is reported to the caller
            let connections = client
                .subscribe_accounts(blokli_client::api::AccountSelector::Any)
                .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
                .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));

            if let Err(error) = connections {
                if let Some(connection_ready_tx) = connection_ready_tx.take() {
                    let _ = connection_ready_tx.send(Err(error));
                }
                return;
            }

            let (account_stream, channel_stream, ticket_params_stream) = connections.unwrap();

            // Stream of Account events (Announcements)
            let graph_clone = graph.clone();
            let account_stream = account_stream
                .inspect_ok(|entry| tracing::trace!(?entry, "new account event"))
                .map_err(ConnectorError::from)
                .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
                .and_then(move |account| {
                    let graph = graph_clone.clone();
                    let mapper = mapper.clone();
                    let chain_to_packet = chain_to_packet.clone();
                    let packet_to_chain = packet_to_chain.clone();
                    // Key mapping, graph and backend updates are offloaded via `spawn_blocking`
                    hopr_utils::runtime::prelude::spawn_blocking(move || {
                        mapper.key_to_id.insert(account.public_key, Some(account.key_id));
                        mapper.id_to_key.insert(account.key_id, Some(account.public_key));
                        graph.write().add_node(account.key_id);
                        mapper.backend.insert_account(account.clone()).map(|old| (account, old))
                    })
                    .map_err(ConnectorError::backend)
                    .and_then(move |res| {
                        if let Ok((upserted_account, _)) = &res {
                            // Rather update the cached entry than invalidating it
                            chain_to_packet.insert(upserted_account.chain_addr, Some(upserted_account.public_key));
                            packet_to_chain.insert(upserted_account.public_key, Some(upserted_account.chain_addr));
                        }
                        futures::future::ready(res.map(SubscribedEventType::Account).map_err(ConnectorError::backend))
                    })
                })
                .fuse();

            // Stream of channel graph updates
            let channel_stream = channel_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new graph event"))
                .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
                .and_then(move |(src, dst, channel)| {
                    let graph = graph.clone();
                    let backend = backend.clone();
                    let channel_by_id = channel_by_id.clone();
                    let channel_by_parties = channel_by_parties.clone();
                    hopr_utils::runtime::prelude::spawn_blocking(move || {
                        graph.write().add_edge(src.key_id, dst.key_id, *channel.get_id());
                        // A previously stored channel yields the list of changes, a new channel yields `None`
                        backend
                            .insert_channel(channel)
                            .map(|old| (channel, old.map(|old| old.diff(&channel))))
                    })
                    .map_err(ConnectorError::backend)
                    .and_then(move |res| {
                        let channel_by_id = channel_by_id.clone();
                        let channel_by_parties = channel_by_parties.clone();
                        if let Ok((upserted_channel, _)) = &res {
                            // Rather update the cached entry than invalidating it
                            channel_by_id.insert(*upserted_channel.get_id(), Some(*upserted_channel));
                            channel_by_parties.insert(ChannelParties::from(upserted_channel), Some(*upserted_channel));
                        }
                        futures::future::ready(res.map(SubscribedEventType::Channel).map_err(ConnectorError::backend))
                    })
                })
                .fuse();

            // Stream of ticket parameter updates (ticket price, minimum winning probability)
            let ticket_params_stream = ticket_params_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
                .try_filter_map(|ticket_value_event| {
                    futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
                })
                .inspect_ok(|(new_ticket_price, new_win_prob)| {
                    // This cannot block, because there are no other concurrent writers/upgradeable readers
                    let tv = ticket_values.upgradable_read();
                    // Only the components that actually changed are replaced;
                    // nothing is written when the ticket values have not been set yet
                    if let Some((current_win_prob, current_ticket_price)) = tv.as_ref().copied() {
                        if &current_ticket_price != new_ticket_price && !current_win_prob.approx_eq(new_win_prob) {
                            parking_lot::RwLockUpgradableReadGuard::upgrade(tv)
                                .replace((*new_win_prob, *new_ticket_price));
                        } else if &current_ticket_price != new_ticket_price {
                            parking_lot::RwLockUpgradableReadGuard::upgrade(tv)
                                .replace((current_win_prob, *new_ticket_price));
                        } else if !current_win_prob.approx_eq(new_win_prob) {
                            parking_lot::RwLockUpgradableReadGuard::upgrade(tv)
                                .replace((*new_win_prob, current_ticket_price));
                        }
                    }
                })
                .and_then(|(new_ticket_price, new_win_prob)| {
                    let values_cache = values_cache.clone();
                    async move {
                        // A single ticket params update can result in zero, one or two events
                        let mut events = Vec::<SubscribedEventType>::new();
                        values_cache
                            .entry(CHAIN_INFO_CACHE_KEY)
                            .and_compute_with(|cached_entry| {
                                futures::future::ready(match cached_entry {
                                    Some(chain_info) => {
                                        let mut chain_info = chain_info.into_value();
                                        if chain_info.ticket_price != new_ticket_price {
                                            events.push(SubscribedEventType::TicketPrice((
                                                new_ticket_price,
                                                Some(chain_info.ticket_price),
                                            )));
                                            chain_info.ticket_price = new_ticket_price;
                                        }
                                        if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
                                            events.push(SubscribedEventType::WinningProbability((
                                                new_win_prob,
                                                Some(chain_info.ticket_win_prob),
                                            )));
                                            chain_info.ticket_win_prob = new_win_prob;
                                        }

                                        // The cached chain info is rewritten only if something changed
                                        if !events.is_empty() {
                                            moka::ops::compute::Op::Put(chain_info)
                                        } else {
                                            moka::ops::compute::Op::Nop
                                        }
                                    }
                                    None => {
                                        tracing::warn!(
                                            "chain info not present in the cache before ticket params update"
                                        );
                                        // Both events are emitted without a previous value, the cache stays empty
                                        events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
                                        events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
                                        moka::ops::compute::Op::Nop
                                    }
                                })
                            })
                            .await;
                        Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
                    }
                })
                .try_flatten()
                .fuse();

            let mut account_counter = 0;
            let mut channel_counter = 0;
            // Nothing to wait for: signal readiness right away. Taking the sender here also means
            // the percentage computation below never runs with a zero denominator.
            if min_accounts == 0 && min_channels == 0 {
                tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
                let _ = connection_ready_tx.take().unwrap().send(Ok(()));
            }

            futures::stream::Abortable::new(
                (account_stream, channel_stream, ticket_params_stream).merge(),
                abort_reg,
            )
            .inspect_ok(move |event_type| {
                // Sync progress is tracked only until the readiness notification has been sent
                if connection_ready_tx.is_some() {
                    match event_type {
                        SubscribedEventType::Account(_) => account_counter += 1,
                        SubscribedEventType::Channel(_) => channel_counter += 1,
                        _ => {}
                    }

                    let pct_synced =
                        ((account_counter + channel_counter) * 100 / (min_accounts + min_channels)).clamp(0, 100);
                    tracing::debug!(
                        pct_synced,
                        sync_quota,
                        account_counter,
                        channel_counter,
                        "percentage of connection quota synced"
                    );

                    // Send the completion notification
                    // once we reach the expected number of accounts and channels with
                    // the given tolerance
                    if account_counter >= min_accounts && channel_counter >= min_channels {
                        tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
                        let _ = connection_ready_tx.take().unwrap().send(Ok(()));
                    }
                }
            })
            .for_each(|event_type| {
                let event_tx = event_tx.clone();
                async move {
                    match event_type {
                        Ok(SubscribedEventType::Account((new_account, old_account))) => {
                            tracing::debug!(%new_account, "account inserted");
                            // We only track public accounts as events and also
                            // broadcast announcements of already existing accounts (old_account == None).
                            if new_account.has_announced_with_routing_info()
                                && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
                            {
                                tracing::debug!(account = %new_account, "new announcement");
                                let _ = event_tx
                                    .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
                                    .await;
                            }
                        }
                        Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                num_changes = changes.len(),
                                "channel updated"
                            );
                            process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
                        }
                        Ok(SubscribedEventType::Channel((new_channel, None))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                "channel opened"
                            );
                            let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
                        }
                        Ok(SubscribedEventType::WinningProbability((new, old))) => {
                            // A missing previous value is compared as the default winning probability
                            let old = old.unwrap_or_default();
                            match new.approx_cmp(&old) {
                                Ordering::Less => {
                                    tracing::debug!(%new, %old, "winning probability decreased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
                                        .await;
                                }
                                Ordering::Greater => {
                                    tracing::debug!(%new, %old, "winning probability increased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
                                        .await;
                                }
                                Ordering::Equal => {}
                            }
                        }
                        Ok(SubscribedEventType::TicketPrice((new, old))) => {
                            tracing::debug!(%new, ?old, "ticket price changed");
                            let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
                        }
                        Err(error) => {
                            // Errors are only logged, processing continues with the next item
                            tracing::error!(%error, "error processing account/graph/ticket params subscription");
                        }
                    }
                }
            })
            .await;

            // Only transition to SubscriptionEnded if currently Ready or Connecting —
            // don't overwrite terminal error states (ServerNotHealthy, ConnectionFailed, etc.)
            tracing::warn!("chain subscription stream ended, marking chain health as degraded");
            let current = health.load(AtomicOrdering::Relaxed);
            if matches!(current, ChainHealthState::Connecting | ChainHealthState::Ready) {
                let _ = health.compare_exchange(
                    current,
                    ChainHealthState::SubscriptionEnded,
                    AtomicOrdering::Relaxed,
                    AtomicOrdering::Relaxed,
                );
            }
        });

        // Result layers, from the outside: timeout, one-shot channel, subscription result
        connection_ready_rx
            .timeout(futures_time::time::Duration::from(timeout))
            .map(|res| match res {
                Ok(Ok(Ok(_))) => Ok(abort_handle),
                Ok(Ok(Err(error))) => {
                    abort_handle.abort();
                    Err(ConnectorError::from(error))
                }
                // The sender was dropped without sending anything
                Ok(Err(_)) => {
                    abort_handle.abort();
                    Err(ConnectorError::InvalidState("failed to determine connection state"))
                }
                Err(_) => {
                    abort_handle.abort();
                    tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
                    Err(ConnectorError::ConnectionTimeout)
                }
            })
            .await
    }
546
547    /// Connects to the chain using the underlying client, syncs all on-chain data,
548    /// and subscribes for all future updates.
549    ///
550    /// If the connection does not finish within
551    /// [`BlockchainConnectorConfig::connection_timeout`](BlockchainConnectorConfig)
552    /// the [`ConnectorError::ConnectionTimeout`] error is returned.
553    ///
554    /// Most of the operations with the Connector will fail if it is not connected first.
555    ///
556    /// There are some notable exceptions that DO NOT require a prior call to `connect`:
557    /// - all the [`ChainValues`](hopr_api::chain::ChainValues) methods,
558    /// - all the [`ChainReadSafeOperations`](hopr_api::chain::ChainReadSafeOperations) methods,
559    /// - all the [`ChainWriteSafeOperations`](hopr_api::chain::ChainWriteSafeOperations) methods,
560    /// - [`me`](hopr_api::chain::ChainReadChannelOperations::me)
561    ///
562    /// If you wish to only call operations from the above Chain APIs, consider constructing
563    /// the [`HoprBlockchainReader`](crate::HoprBlockchainReader) instead.
564    pub async fn connect(&mut self) -> Result<(), ConnectorError> {
565        if self
566            .connection_handle
567            .as_ref()
568            .filter(|handle| !handle.is_aborted())
569            .is_some()
570        {
571            return Err(ConnectorError::InvalidState("connector is already connected"));
572        }
573
574        self.health.store(ChainHealthState::Connecting, AtomicOrdering::Relaxed);
575
576        let abort_handle = match self
577            .do_connect(self.cfg.connection_sync_timeout.max(MIN_CONNECTION_TIMEOUT))
578            .await
579        {
580            Ok(handle) => handle,
581            Err(e @ ConnectorError::ServerNotHealthy) => {
582                self.health
583                    .store(ChainHealthState::ServerNotHealthy, AtomicOrdering::Relaxed);
584                return Err(e);
585            }
586            Err(e @ ConnectorError::ConnectionTimeout) => {
587                self.health
588                    .store(ChainHealthState::SyncTimedOut, AtomicOrdering::Relaxed);
589                return Err(e);
590            }
591            Err(e) => {
592                self.health
593                    .store(ChainHealthState::ConnectionFailed, AtomicOrdering::Relaxed);
594                return Err(e);
595            }
596        };
597
598        self.connection_handle = Some(abort_handle);
599        // Only transition to Ready if still Connecting — the subscription task
600        // may have already set SubscriptionEnded in a race.
601        let _ = self.health.compare_exchange(
602            ChainHealthState::Connecting,
603            ChainHealthState::Ready,
604            AtomicOrdering::Relaxed,
605            AtomicOrdering::Relaxed,
606        );
607
608        tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
609        Ok(())
610    }
611
    /// Returns a shared reference to the underlying client of type `C`.
    ///
    /// The reference is borrowed from the connector, so it cannot outlive `self`.
    pub fn client(&self) -> &C {
        self.client.as_ref()
    }
616
    /// Checks if the connector is [connected](HoprBlockchainConnector::connect) to the chain.
    ///
    /// Returns `true` exactly when `check_connection_state` succeeds, i.e. when a
    /// connection handle is stored and that handle has not been aborted.
    pub fn is_connected(&self) -> bool {
        self.check_connection_state().is_ok()
    }
621}
622
623impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
624where
625    C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
626    P: PayloadGenerator + Send + Sync,
627    P::TxRequest: Send + Sync,
628{
629    async fn send_tx<'a>(
630        &'a self,
631        tx_req: P::TxRequest,
632        custom_tx_multiplier: Option<u32>,
633    ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
634        let chain_info = self.query_cached_chain_info().await?;
635        let tx_timeout = custom_tx_multiplier.unwrap_or(self.cfg.tx_timeout_multiplier).max(1)
636            * chain_info.finality
637            * chain_info.expected_block_time;
638        Ok(self
639            .sequencer
640            .enqueue_transaction(tx_req, tx_timeout.max(MIN_TX_CONFIRM_TIMEOUT))
641            .await?
642            .and_then(|tx| {
643                if let Some(tx_exec) = tx.safe_execution
644                    && !tx_exec.success
645                {
646                    return futures::future::err(ConnectorError::InnerTxFailed(
647                        tx_exec.revert_reason.unwrap_or("n/a".into()),
648                    ));
649                }
650                futures::future::ready(
651                    ChainReceipt::from_str(&tx.transaction_hash.0)
652                        .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
653                )
654            }))
655    }
656}
657
658impl<B, C, P, R> hopr_api::node::ComponentStatusReporter for HoprBlockchainConnector<C, B, P, R> {
659    fn component_status(&self) -> hopr_api::node::ComponentStatus {
660        self.health.load(AtomicOrdering::Relaxed).into()
661    }
662}
663
664impl<B, C, P, R> HoprBlockchainConnector<C, R, B, P> {
665    #[inline]
666    pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
667        self.connection_handle
668            .as_ref()
669            .filter(|handle| !handle.is_aborted()) // Do a safety check
670            .ok_or_else(|| ConnectorError::InvalidState("connector is not connected"))
671            .map(|_| ())
672    }
673
674    /// Invalidates all cached on-chain data.
675    pub fn invalidate_caches(&self) {
676        self.channel_by_parties.invalidate_all();
677        self.channel_by_id.invalidate_all();
678        self.packet_to_chain.invalidate_all();
679        self.chain_to_packet.invalidate_all();
680        self.values.invalidate_all();
681    }
682}
683
684impl<B, C, P, R> Drop for HoprBlockchainConnector<C, R, B, P> {
685    fn drop(&mut self) {
686        self.health.store(ChainHealthState::Dropped, AtomicOrdering::Relaxed);
687        self.events.0.close();
688        if let Some(abort_handle) = self.connection_handle.take() {
689            abort_handle.abort();
690        }
691    }
692}
693
694impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
695where
696    B: Backend + Send + Sync + 'static,
697    C: Send + Sync,
698    P: Send + Sync,
699    R: Send + Sync,
700{
701    /// Returns a [`PathAddressResolver`] using this connector.
702    pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
703        self.into()
704    }
705}
706
707#[cfg(test)]
708pub(crate) mod tests {
709    use blokli_client::BlokliTestState;
710    use hex_literal::hex;
711    use hopr_api::{chain::ChainWriteTicketOperations, types::chain::contract_addresses_for_network};
712
713    use super::*;
714    use crate::{
715        InMemoryBackend,
716        testing::{BlokliTestStateBuilder, ChainMutator, FullStateEmulator},
717    };
718
    /// Secret key used as the chain key of the connector built by `create_connector`.
    pub const PRIVATE_KEY_1: [u8; 32] = hex!("c14b8faa0a9b8a5fa4453664996f23a7e7de606d42297d723fc4a794f375e260");
    /// Secret key of the second party (the counterparty) in tests that need two nodes.
    pub const PRIVATE_KEY_2: [u8; 32] = hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775");
    /// Module address handed to the payload generator and the state emulator in tests.
    pub const MODULE_ADDR: [u8; 20] = hex!("1111111111111111111111111111111111111111");

    /// Connector type used throughout the tests: generic over the client `C` only,
    /// with an in-memory backend and the Safe payload generator (and its request type).
    pub type TestConnector<C> = HoprBlockchainConnector<
        C,
        InMemoryBackend,
        SafePayloadGenerator,
        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
    >;
729
730    pub fn create_connector<C>(blokli_client: C) -> anyhow::Result<TestConnector<C>>
731    where
732        C: BlokliQueryClient + BlokliTransactionClient + BlokliSubscriptionClient + Send + Sync + 'static,
733    {
734        let ckp = ChainKeypair::from_secret(&PRIVATE_KEY_1)?;
735
736        Ok(HoprBlockchainConnector::new(
737            ckp.clone(),
738            Default::default(),
739            blokli_client,
740            InMemoryBackend::default(),
741            SafePayloadGenerator::new(
742                &ckp,
743                contract_addresses_for_network("rotsee").unwrap().1,
744                MODULE_ADDR.into(),
745            ),
746        ))
747    }
748
749    #[tokio::test]
750    async fn connector_should_connect() -> anyhow::Result<()> {
751        let blokli_client = BlokliTestStateBuilder::default().build_static_client();
752
753        let mut connector = create_connector(blokli_client)?;
754        connector.connect().await?;
755
756        assert!(connector.is_connected());
757
758        Ok(())
759    }
760
761    #[tokio::test]
762    async fn connector_should_not_connect_when_blokli_not_healthy() -> anyhow::Result<()> {
763        let state = BlokliTestState {
764            health: "DOWN".into(),
765            ..Default::default()
766        };
767
768        let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();
769
770        let mut connector = create_connector(blokli_client)?;
771
772        let res = connector.connect().await;
773
774        assert!(matches!(res, Err(ConnectorError::ServerNotHealthy)));
775        assert!(!connector.is_connected());
776
777        Ok(())
778    }
779
780    #[tokio::test]
781    async fn connector_should_handle_inner_tx_failure_during_redemption() -> anyhow::Result<()> {
782        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
783            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
784        ))?;
785        let account_1 = AccountEntry {
786            public_key: *offchain_key_1.public(),
787            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
788            entry_type: AccountType::NotAnnounced,
789            safe_address: Some([1u8; Address::SIZE].into()),
790            key_id: 1.into(),
791        };
792        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
793            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
794        ))?;
795        let account_2 = AccountEntry {
796            public_key: *offchain_key_2.public(),
797            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
798            entry_type: AccountType::NotAnnounced,
799            safe_address: Some([2u8; Address::SIZE].into()),
800            key_id: 2.into(),
801        };
802
803        let channel_1 = ChannelEntry::builder()
804            .between(
805                &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
806                &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
807            )
808            .amount(10)
809            .ticket_index(1)
810            .status(ChannelStatus::Open)
811            .epoch(1)
812            .build()?;
813
814        let blokli_client = BlokliTestStateBuilder::default()
815            .with_accounts([
816                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
817                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
818            ])
819            .with_channels([channel_1])
820            .with_hopr_network_chain_info("rotsee")
821            .build_dynamic_client_with_mutator(ChainMutator::new(
822                move |_: &[u8], state: &mut BlokliTestState| -> Result<(), blokli_client::errors::BlokliClientError> {
823                    // Update the channel ticket index, without the client noticing the change
824                    // This will cause the transaction to be rejected in the Emulator, and
825                    // not by the checks performed by the Connector before the redemption.
826                    if let Some(c) = state.get_channel_by_id_mut(&(*channel_1.get_id()).into()) {
827                        c.ticket_index = blokli_client::api::types::Uint64("2".into());
828                        Ok(())
829                    } else {
830                        Err(blokli_client::errors::ErrorKind::MockClientError(anyhow::anyhow!(
831                            "channel unexpectedly not found"
832                        ))
833                        .into())
834                    }
835                },
836                FullStateEmulator(MODULE_ADDR.into(), None),
837            ))
838            .with_tx_simulation_delay(Duration::from_millis(100))
839            .with_use_internal_txs(true);
840
841        let mut connector = create_connector(blokli_client.clone())?;
842        connector.connect().await?;
843
844        let hkc1 = ChainKeypair::from_secret(&hex!(
845            "e17fe86ce6e99f4806715b0c9412f8dad89334bf07f72d5834207a9d8f19d7f8"
846        ))?;
847        let hkc2 = ChainKeypair::from_secret(&hex!(
848            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
849        ))?;
850
851        let ticket = TicketBuilder::default()
852            .counterparty(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?)
853            .amount(1)
854            .index(1)
855            .channel_epoch(1)
856            .eth_challenge(
857                Challenge::from_hint_and_share(
858                    &HalfKeyChallenge::new(hkc1.public().as_ref()),
859                    &HalfKeyChallenge::new(hkc2.public().as_ref()),
860                )?
861                .to_ethereum_challenge(),
862            )
863            .build_signed(&ChainKeypair::from_secret(&PRIVATE_KEY_2)?, &Hash::default())?
864            .into_acknowledged(Response::from_half_keys(
865                &HalfKey::try_from(hkc1.secret().as_ref())?,
866                &HalfKey::try_from(hkc2.secret().as_ref())?,
867            )?)
868            .into_redeemable(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?, &Hash::default())?;
869
870        let res = connector.redeem_ticket(ticket).await?;
871        let err = res.await;
872        assert!(matches!(err, Err(hopr_api::chain::TicketRedeemError::Rejected(_, _))));
873
874        Ok(())
875    }
876
877    #[test]
878    fn chain_health_all_variants_convert_to_component_status() {
879        use hopr_api::node::ComponentStatus;
880        let variants = [
881            ChainHealthState::Ready,
882            ChainHealthState::WaitingForConnection,
883            ChainHealthState::Connecting,
884            ChainHealthState::SubscriptionEnded,
885            ChainHealthState::SyncTimedOut,
886            ChainHealthState::ServerNotHealthy,
887            ChainHealthState::ConnectionFailed,
888            ChainHealthState::Dropped,
889        ];
890        for state in variants {
891            let _: ComponentStatus = state.into();
892        }
893    }
894
895    #[test]
896    fn chain_health_ready_maps_to_component_ready() {
897        use hopr_api::node::ComponentStatus;
898        let status: ComponentStatus = ChainHealthState::Ready.into();
899        assert!(status.is_ready());
900    }
901
902    #[test]
903    fn chain_health_degraded_states() {
904        use hopr_api::node::ComponentStatus;
905        let s: ComponentStatus = ChainHealthState::SubscriptionEnded.into();
906        assert!(s.is_degraded());
907        let s: ComponentStatus = ChainHealthState::SyncTimedOut.into();
908        assert!(s.is_degraded());
909    }
910
911    #[test]
912    fn chain_health_unavailable_states() {
913        use hopr_api::node::ComponentStatus;
914        let s: ComponentStatus = ChainHealthState::ServerNotHealthy.into();
915        assert!(s.is_unavailable());
916        let s: ComponentStatus = ChainHealthState::ConnectionFailed.into();
917        assert!(s.is_unavailable());
918        let s: ComponentStatus = ChainHealthState::Dropped.into();
919        assert!(s.is_unavailable());
920    }
921
922    #[test]
923    fn chain_health_initializing_states() {
924        use hopr_api::node::ComponentStatus;
925        let s: ComponentStatus = ChainHealthState::WaitingForConnection.into();
926        assert!(s.is_initializing());
927        let s: ComponentStatus = ChainHealthState::Connecting.into();
928        assert!(s.is_initializing());
929    }
930
931    #[tokio::test]
932    async fn connector_health_starts_as_initializing() -> anyhow::Result<()> {
933        use hopr_api::node::ComponentStatusReporter;
934        let blokli_client = BlokliTestStateBuilder::default().build_static_client();
935        let connector = create_connector(blokli_client)?;
936        assert!(connector.component_status().is_initializing());
937        Ok(())
938    }
939
940    #[tokio::test]
941    async fn connector_health_ready_after_connect() -> anyhow::Result<()> {
942        use hopr_api::node::ComponentStatusReporter;
943        let blokli_client = BlokliTestStateBuilder::default().build_static_client();
944        let mut connector = create_connector(blokli_client)?;
945        connector.connect().await?;
946        assert!(connector.component_status().is_ready());
947        Ok(())
948    }
949
950    #[tokio::test]
951    async fn connector_health_unavailable_when_server_not_healthy() -> anyhow::Result<()> {
952        use hopr_api::node::ComponentStatusReporter;
953        let state = BlokliTestState {
954            health: "DOWN".into(),
955            ..Default::default()
956        };
957        let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();
958        let mut connector = create_connector(blokli_client)?;
959        let _ = connector.connect().await;
960        assert!(connector.component_status().is_unavailable());
961        Ok(())
962    }
963
964    #[test]
965    fn health_cas_ready_only_from_connecting() {
966        let health = AtomicChainHealthState::new(ChainHealthState::Connecting);
967        let result = health.compare_exchange(
968            ChainHealthState::Connecting,
969            ChainHealthState::Ready,
970            AtomicOrdering::Relaxed,
971            AtomicOrdering::Relaxed,
972        );
973        assert!(result.is_ok());
974        assert_eq!(health.load(AtomicOrdering::Relaxed), ChainHealthState::Ready);
975    }
976
977    #[test]
978    fn health_cas_ready_fails_from_subscription_ended() {
979        let health = AtomicChainHealthState::new(ChainHealthState::SubscriptionEnded);
980        let result = health.compare_exchange(
981            ChainHealthState::Connecting,
982            ChainHealthState::Ready,
983            AtomicOrdering::Relaxed,
984            AtomicOrdering::Relaxed,
985        );
986        assert!(result.is_err());
987        assert_eq!(
988            health.load(AtomicOrdering::Relaxed),
989            ChainHealthState::SubscriptionEnded
990        );
991    }
992
993    #[test]
994    fn health_subscription_ended_preserves_terminal_state() {
995        let health = AtomicChainHealthState::new(ChainHealthState::ServerNotHealthy);
996        let current = health.load(AtomicOrdering::Relaxed);
997        // ServerNotHealthy is a terminal state — should NOT transition to SubscriptionEnded
998        assert!(!matches!(
999            current,
1000            ChainHealthState::Connecting | ChainHealthState::Ready
1001        ));
1002        // The conditional store would skip this
1003    }
1004
1005    #[test]
1006    fn health_subscription_ended_from_ready() {
1007        let health = AtomicChainHealthState::new(ChainHealthState::Ready);
1008        let current = health.load(AtomicOrdering::Relaxed);
1009        if matches!(current, ChainHealthState::Connecting | ChainHealthState::Ready) {
1010            let _ = health.compare_exchange(
1011                current,
1012                ChainHealthState::SubscriptionEnded,
1013                AtomicOrdering::Relaxed,
1014                AtomicOrdering::Relaxed,
1015            );
1016        }
1017        assert_eq!(
1018            health.load(AtomicOrdering::Relaxed),
1019            ChainHealthState::SubscriptionEnded
1020        );
1021    }
1022
1023    #[test]
1024    fn health_drop_overwrites_any_state() {
1025        for initial in [
1026            ChainHealthState::Ready,
1027            ChainHealthState::Connecting,
1028            ChainHealthState::SubscriptionEnded,
1029        ] {
1030            let health = AtomicChainHealthState::new(initial);
1031            health.store(ChainHealthState::Dropped, AtomicOrdering::Relaxed);
1032            assert_eq!(health.load(AtomicOrdering::Relaxed), ChainHealthState::Dropped);
1033        }
1034    }
1035}