1use std::{cmp::Ordering, str::FromStr, sync::atomic::Ordering as AtomicOrdering, time::Duration};
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use futures_concurrency::stream::Merge;
6use futures_time::future::FutureExt as FuturesTimeExt;
7use hopr_api::{
8 chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent},
9 types::{chain::prelude::*, crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
10};
11use hopr_utils::runtime::AbortHandle;
12use petgraph::prelude::DiGraphMap;
13
14use crate::{
15 backend::Backend,
16 connector::{keys::HoprKeyMapper, sequencer::TransactionSequencer, values::CHAIN_INFO_CACHE_KEY},
17 errors::ConnectorError,
18 utils::{
19 ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
20 process_channel_changes_into_events,
21 },
22};
23
24mod accounts;
25mod channels;
26mod events;
27mod keys;
28mod safe;
29mod sequencer;
30mod tickets;
31mod values;
32
/// Pair of the broadcast sender and an inactive receiver used to distribute [`ChainEvent`]s.
type EventsChannel = (
    async_broadcast::Sender<ChainEvent>,
    async_broadcast::InactiveReceiver<ChainEvent>,
);

/// Lower bound applied to the configured connection sync timeout when connecting.
const MIN_CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
/// Lower bound applied to the computed transaction confirmation timeout.
const MIN_TX_CONFIRM_TIMEOUT: Duration = Duration::from_secs(1);
/// Default for `BlockchainConnectorConfig::tx_timeout_multiplier`.
const TX_TIMEOUT_MULTIPLIER: u32 = 2;
/// Default for `BlockchainConnectorConfig::sync_tolerance`, expressed in percent.
const DEFAULT_SYNC_TOLERANCE_PCT: usize = 90;
42
#[atomic_enum::atomic_enum]
/// Health of the chain connection as tracked by the connector.
///
/// The state is shared through `AtomicChainHealthState` (presumably generated by the
/// `atomic_enum` attribute above) and is converted into a component status for reporting.
#[derive(PartialEq, Eq)]
enum ChainHealthState {
    // Set by `connect` once the initial sync has succeeded.
    Ready = 0,
    // Initial state of a freshly constructed connector.
    WaitingForConnection = 1,
    // Set at the beginning of `connect`, before the sync starts.
    Connecting = 2,
    // Set by the background task when the merged subscription stream ends.
    SubscriptionEnded = 3,
    // Set when the initial sync did not finish within the timeout.
    SyncTimedOut = 4,
    // Set when the server health query did not report a healthy server.
    ServerNotHealthy = 5,
    // Set when connecting failed for any other reason.
    ConnectionFailed = 6,
    // Set when the connector instance is dropped.
    Dropped = 7,
}
60
61impl From<ChainHealthState> for hopr_api::node::ComponentStatus {
62 fn from(state: ChainHealthState) -> Self {
63 match state {
64 ChainHealthState::Ready => Self::Ready,
65 ChainHealthState::WaitingForConnection => Self::Initializing("waiting for chain connection".into()),
66 ChainHealthState::Connecting => Self::Initializing("connecting to blokli".into()),
67 ChainHealthState::SubscriptionEnded => Self::Degraded("chain subscription ended".into()),
68 ChainHealthState::SyncTimedOut => Self::Degraded("connection sync timed out".into()),
69 ChainHealthState::ServerNotHealthy => Self::Unavailable("blokli server not healthy".into()),
70 ChainHealthState::ConnectionFailed => Self::Unavailable("chain connection failed".into()),
71 ChainHealthState::Dropped => Self::Unavailable("connector dropped".into()),
72 }
73 }
74}
75
/// Configuration of the [`HoprBlockchainConnector`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
pub struct BlockchainConnectorConfig {
    /// Maximum time to wait for the initial sync when connecting.
    ///
    /// Values below `MIN_CONNECTION_TIMEOUT` are raised to that minimum. Defaults to 30 seconds.
    #[default(Duration::from_secs(30))]
    pub connection_sync_timeout: Duration,
    /// Percentage of the known accounts and open channels that must be received before the
    /// connection counts as synced.
    ///
    /// The value is clamped to the range 1..=100 when used. Defaults to `DEFAULT_SYNC_TOLERANCE_PCT`.
    #[default(DEFAULT_SYNC_TOLERANCE_PCT)]
    pub sync_tolerance: usize,
    /// Multiplier of `finality * expected_block_time` that yields the transaction confirmation
    /// timeout.
    ///
    /// A value of 0 is treated as 1. Defaults to `TX_TIMEOUT_MULTIPLIER`.
    #[default(TX_TIMEOUT_MULTIPLIER)]
    pub tx_timeout_multiplier: u32,
}
98
/// Connector to the chain, generic over the client `C`, the backend `B`, the payload
/// generator `P` and the transaction request type `R`.
pub struct HoprBlockchainConnector<C, B, P, R> {
    /// Generator of transaction payloads.
    payload_generator: P,
    /// Chain key pair of this node.
    chain_key: ChainKeypair,
    /// Shared client used for queries, subscriptions and transactions.
    client: std::sync::Arc<C>,
    /// Channel graph: nodes are key identifiers, edges carry channel IDs.
    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
    /// Backend that stores the received accounts and channels.
    backend: std::sync::Arc<B>,
    /// Handle that aborts the background subscription processing; `None` until connected.
    connection_handle: Option<AbortHandle>,
    /// Sequencer through which transactions are enqueued.
    sequencer: TransactionSequencer<C, R>,
    /// Broadcast channel on which chain events are published.
    events: EventsChannel,
    /// Connector configuration.
    cfg: BlockchainConnectorConfig,
    /// Shared health state, also updated by the background subscription task.
    health: std::sync::Arc<AtomicChainHealthState>,

    /// Mapper between key identifiers and off-chain public keys.
    mapper: HoprKeyMapper<B>,
    /// Cache: chain address -> off-chain public key (`None` presumably caches a negative lookup).
    chain_to_packet: moka::sync::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
    /// Cache: off-chain public key -> chain address.
    packet_to_chain: moka::sync::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
    /// Cache: channel ID -> channel entry.
    channel_by_id: moka::sync::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
    /// Cache: channel parties -> channel entry.
    channel_by_parties: moka::sync::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
    /// Single-entry cache of the parsed chain info.
    values: moka::future::Cache<u32, ParsedChainInfo>,
    /// Most recently seen ticket winning probability and ticket price.
    ticket_values: std::sync::Arc<parking_lot::RwLock<Option<(WinningProbability, HoprBalance)>>>,
}
131
/// Capacity used for the graph nodes and for the per-account caches.
const EXPECTED_NUM_NODES: usize = 10_000;
/// Capacity used for the graph edges and for the per-channel caches.
const EXPECTED_NUM_CHANNELS: usize = 100_000;

/// Time-to-idle applied to the account, key and channel caches.
const DEFAULT_CACHE_TIMEOUT: Duration = Duration::from_mins(10);
136
137impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
138where
139 B: Backend + Send + Sync + 'static,
140 C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
141 P: PayloadGenerator + Send + Sync + 'static,
142 P::TxRequest: Send + Sync + 'static,
143{
144 pub fn new(
146 chain_key: ChainKeypair,
147 cfg: BlockchainConnectorConfig,
148 client: C,
149 backend: B,
150 payload_generator: P,
151 ) -> Self {
152 let backend = std::sync::Arc::new(backend);
153 let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
154 events_tx.set_overflow(true);
155 events_tx.set_await_active(false);
156
157 let client = std::sync::Arc::new(client);
158 Self {
159 payload_generator,
160 health: std::sync::Arc::new(AtomicChainHealthState::new(ChainHealthState::WaitingForConnection)),
161 graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
162 EXPECTED_NUM_NODES,
163 EXPECTED_NUM_CHANNELS,
164 ahash::RandomState::default(),
165 ))),
166 backend: backend.clone(),
167 connection_handle: None,
168 sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
169 events: (events_tx, events_rx.deactivate()),
170 client,
171 chain_key,
172 cfg,
173 mapper: HoprKeyMapper {
174 id_to_key: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
175 .time_to_idle(DEFAULT_CACHE_TIMEOUT)
176 .build_with_hasher(ahash::RandomState::default()),
177 key_to_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
178 .time_to_idle(DEFAULT_CACHE_TIMEOUT)
179 .build_with_hasher(ahash::RandomState::default()),
180 backend,
181 },
182 chain_to_packet: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
183 .time_to_idle(DEFAULT_CACHE_TIMEOUT)
184 .build_with_hasher(ahash::RandomState::default()),
185 packet_to_chain: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
186 .time_to_idle(DEFAULT_CACHE_TIMEOUT)
187 .build_with_hasher(ahash::RandomState::default()),
188 channel_by_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
189 .time_to_idle(DEFAULT_CACHE_TIMEOUT)
190 .build_with_hasher(ahash::RandomState::default()),
191 channel_by_parties: moka::sync::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
192 .time_to_idle(DEFAULT_CACHE_TIMEOUT)
193 .build_with_hasher(ahash::RandomState::default()),
194 values: moka::future::CacheBuilder::new(1).build(),
196 ticket_values: Default::default(),
197 }
198 }
199
200 async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
201 let sync_quota = self.cfg.sync_tolerance.clamp(1, 100) as f64 / 100.0;
202 let min_accounts = (self
203 .client
204 .count_accounts(blokli_client::api::AccountSelector::Any)
205 .await? as f64
206 * sync_quota)
207 .round() as u32;
208 let min_channels = (self
209 .client
210 .query_channel_stats(blokli_client::api::ChannelSelector {
211 filter: None,
212 status: Some(blokli_client::api::types::ChannelStatus::Open),
213 ..Default::default()
214 })
215 .await?
216 .count as f64
217 * sync_quota)
218 .round() as u32;
219 tracing::debug!(min_accounts, min_channels, "connection thresholds");
220
221 let server_health = self.client.query_health().await?;
222 if !server_health.eq_ignore_ascii_case("OK") {
223 tracing::error!(server_health, "blokli server not healthy");
224 return Err(ConnectorError::ServerNotHealthy);
225 }
226
227 let (abort_handle, abort_reg) = AbortHandle::new_pair();
228
229 let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
230 let mut connection_ready_tx = Some(connection_ready_tx);
231
232 let client = self.client.clone();
233 let mapper = self.mapper.clone();
234 let backend = self.backend.clone();
235 let graph = self.graph.clone();
236 let event_tx = self.events.0.clone();
237 let me = self.chain_key.public().to_address();
238 let values_cache = self.values.clone();
239
240 let chain_to_packet = self.chain_to_packet.clone();
241 let packet_to_chain = self.packet_to_chain.clone();
242
243 let channel_by_id = self.channel_by_id.clone();
244 let channel_by_parties = self.channel_by_parties.clone();
245
246 let initial_chain_values = self.query_cached_chain_info().await?;
248 self.ticket_values
249 .write()
250 .replace((initial_chain_values.ticket_win_prob, initial_chain_values.ticket_price));
251
252 #[allow(clippy::large_enum_variant)]
253 #[derive(Debug)]
254 enum SubscribedEventType {
255 Account((AccountEntry, Option<AccountEntry>)),
256 Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
257 WinningProbability((WinningProbability, Option<WinningProbability>)),
258 TicketPrice((HoprBalance, Option<HoprBalance>)),
259 }
260
261 let ticket_values = self.ticket_values.clone();
262 let health = self.health.clone();
263 hopr_utils::runtime::prelude::spawn(async move {
264 let sync_started = std::time::Instant::now();
265
266 let connections = client
267 .subscribe_accounts(blokli_client::api::AccountSelector::Any)
268 .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
269 .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));
270
271 if let Err(error) = connections {
272 if let Some(connection_ready_tx) = connection_ready_tx.take() {
273 let _ = connection_ready_tx.send(Err(error));
274 }
275 return;
276 }
277
278 let (account_stream, channel_stream, ticket_params_stream) = connections.unwrap();
279
280 let graph_clone = graph.clone();
282 let account_stream = account_stream
283 .inspect_ok(|entry| tracing::trace!(?entry, "new account event"))
284 .map_err(ConnectorError::from)
285 .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
286 .and_then(move |account| {
287 let graph = graph_clone.clone();
288 let mapper = mapper.clone();
289 let chain_to_packet = chain_to_packet.clone();
290 let packet_to_chain = packet_to_chain.clone();
291 hopr_utils::runtime::prelude::spawn_blocking(move || {
292 mapper.key_to_id.insert(account.public_key, Some(account.key_id));
293 mapper.id_to_key.insert(account.key_id, Some(account.public_key));
294 graph.write().add_node(account.key_id);
295 mapper.backend.insert_account(account.clone()).map(|old| (account, old))
296 })
297 .map_err(ConnectorError::backend)
298 .and_then(move |res| {
299 if let Ok((upserted_account, _)) = &res {
300 chain_to_packet.insert(upserted_account.chain_addr, Some(upserted_account.public_key));
302 packet_to_chain.insert(upserted_account.public_key, Some(upserted_account.chain_addr));
303 }
304 futures::future::ready(res.map(SubscribedEventType::Account).map_err(ConnectorError::backend))
305 })
306 })
307 .fuse();
308
309 let channel_stream = channel_stream
311 .map_err(ConnectorError::from)
312 .inspect_ok(|entry| tracing::trace!(?entry, "new graph event"))
313 .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
314 .and_then(move |(src, dst, channel)| {
315 let graph = graph.clone();
316 let backend = backend.clone();
317 let channel_by_id = channel_by_id.clone();
318 let channel_by_parties = channel_by_parties.clone();
319 hopr_utils::runtime::prelude::spawn_blocking(move || {
320 graph.write().add_edge(src.key_id, dst.key_id, *channel.get_id());
321 backend
322 .insert_channel(channel)
323 .map(|old| (channel, old.map(|old| old.diff(&channel))))
324 })
325 .map_err(ConnectorError::backend)
326 .and_then(move |res| {
327 let channel_by_id = channel_by_id.clone();
328 let channel_by_parties = channel_by_parties.clone();
329 if let Ok((upserted_channel, _)) = &res {
330 channel_by_id.insert(*upserted_channel.get_id(), Some(*upserted_channel));
332 channel_by_parties.insert(ChannelParties::from(upserted_channel), Some(*upserted_channel));
333 }
334 futures::future::ready(res.map(SubscribedEventType::Channel).map_err(ConnectorError::backend))
335 })
336 })
337 .fuse();
338
339 let ticket_params_stream = ticket_params_stream
341 .map_err(ConnectorError::from)
342 .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
343 .try_filter_map(|ticket_value_event| {
344 futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
345 })
346 .inspect_ok(|(new_ticket_price, new_win_prob)| {
347 let tv = ticket_values.upgradable_read();
349 if let Some((current_win_prob, current_ticket_price)) = tv.as_ref().copied() {
350 if ¤t_ticket_price != new_ticket_price && !current_win_prob.approx_eq(new_win_prob) {
351 parking_lot::RwLockUpgradableReadGuard::upgrade(tv)
352 .replace((*new_win_prob, *new_ticket_price));
353 } else if ¤t_ticket_price != new_ticket_price {
354 parking_lot::RwLockUpgradableReadGuard::upgrade(tv)
355 .replace((current_win_prob, *new_ticket_price));
356 } else if !current_win_prob.approx_eq(new_win_prob) {
357 parking_lot::RwLockUpgradableReadGuard::upgrade(tv)
358 .replace((*new_win_prob, current_ticket_price));
359 }
360 }
361 })
362 .and_then(|(new_ticket_price, new_win_prob)| {
363 let values_cache = values_cache.clone();
364 async move {
365 let mut events = Vec::<SubscribedEventType>::new();
366 values_cache
367 .entry(CHAIN_INFO_CACHE_KEY)
368 .and_compute_with(|cached_entry| {
369 futures::future::ready(match cached_entry {
370 Some(chain_info) => {
371 let mut chain_info = chain_info.into_value();
372 if chain_info.ticket_price != new_ticket_price {
373 events.push(SubscribedEventType::TicketPrice((
374 new_ticket_price,
375 Some(chain_info.ticket_price),
376 )));
377 chain_info.ticket_price = new_ticket_price;
378 }
379 if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
380 events.push(SubscribedEventType::WinningProbability((
381 new_win_prob,
382 Some(chain_info.ticket_win_prob),
383 )));
384 chain_info.ticket_win_prob = new_win_prob;
385 }
386
387 if !events.is_empty() {
388 moka::ops::compute::Op::Put(chain_info)
389 } else {
390 moka::ops::compute::Op::Nop
391 }
392 }
393 None => {
394 tracing::warn!(
395 "chain info not present in the cache before ticket params update"
396 );
397 events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
398 events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
399 moka::ops::compute::Op::Nop
400 }
401 })
402 })
403 .await;
404 Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
405 }
406 })
407 .try_flatten()
408 .fuse();
409
410 let mut account_counter = 0;
411 let mut channel_counter = 0;
412 if min_accounts == 0 && min_channels == 0 {
413 tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
414 let _ = connection_ready_tx.take().unwrap().send(Ok(()));
415 }
416
417 futures::stream::Abortable::new(
418 (account_stream, channel_stream, ticket_params_stream).merge(),
419 abort_reg,
420 )
421 .inspect_ok(move |event_type| {
422 if connection_ready_tx.is_some() {
423 match event_type {
424 SubscribedEventType::Account(_) => account_counter += 1,
425 SubscribedEventType::Channel(_) => channel_counter += 1,
426 _ => {}
427 }
428
429 let pct_synced =
430 ((account_counter + channel_counter) * 100 / (min_accounts + min_channels)).clamp(0, 100);
431 tracing::debug!(
432 pct_synced,
433 sync_quota,
434 account_counter,
435 channel_counter,
436 "percentage of connection quota synced"
437 );
438
439 if account_counter >= min_accounts && channel_counter >= min_channels {
443 tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
444 let _ = connection_ready_tx.take().unwrap().send(Ok(()));
445 }
446 }
447 })
448 .for_each(|event_type| {
449 let event_tx = event_tx.clone();
450 async move {
451 match event_type {
452 Ok(SubscribedEventType::Account((new_account, old_account))) => {
453 tracing::debug!(%new_account, "account inserted");
454 if new_account.has_announced_with_routing_info()
457 && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
458 {
459 tracing::debug!(account = %new_account, "new announcement");
460 let _ = event_tx
461 .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
462 .await;
463 }
464 }
465 Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
466 tracing::debug!(
467 id = %new_channel.get_id(),
468 src = %new_channel.source, dst = %new_channel.destination,
469 num_changes = changes.len(),
470 "channel updated"
471 );
472 process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
473 }
474 Ok(SubscribedEventType::Channel((new_channel, None))) => {
475 tracing::debug!(
476 id = %new_channel.get_id(),
477 src = %new_channel.source, dst = %new_channel.destination,
478 "channel opened"
479 );
480 let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
481 }
482 Ok(SubscribedEventType::WinningProbability((new, old))) => {
483 let old = old.unwrap_or_default();
484 match new.approx_cmp(&old) {
485 Ordering::Less => {
486 tracing::debug!(%new, %old, "winning probability decreased");
487 let _ = event_tx
488 .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
489 .await;
490 }
491 Ordering::Greater => {
492 tracing::debug!(%new, %old, "winning probability increased");
493 let _ = event_tx
494 .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
495 .await;
496 }
497 Ordering::Equal => {}
498 }
499 }
500 Ok(SubscribedEventType::TicketPrice((new, old))) => {
501 tracing::debug!(%new, ?old, "ticket price changed");
502 let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
503 }
504 Err(error) => {
505 tracing::error!(%error, "error processing account/graph/ticket params subscription");
506 }
507 }
508 }
509 })
510 .await;
511
512 tracing::warn!("chain subscription stream ended, marking chain health as degraded");
515 let current = health.load(AtomicOrdering::Relaxed);
516 if matches!(current, ChainHealthState::Connecting | ChainHealthState::Ready) {
517 let _ = health.compare_exchange(
518 current,
519 ChainHealthState::SubscriptionEnded,
520 AtomicOrdering::Relaxed,
521 AtomicOrdering::Relaxed,
522 );
523 }
524 });
525
526 connection_ready_rx
527 .timeout(futures_time::time::Duration::from(timeout))
528 .map(|res| match res {
529 Ok(Ok(Ok(_))) => Ok(abort_handle),
530 Ok(Ok(Err(error))) => {
531 abort_handle.abort();
532 Err(ConnectorError::from(error))
533 }
534 Ok(Err(_)) => {
535 abort_handle.abort();
536 Err(ConnectorError::InvalidState("failed to determine connection state"))
537 }
538 Err(_) => {
539 abort_handle.abort();
540 tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
541 Err(ConnectorError::ConnectionTimeout)
542 }
543 })
544 .await
545 }
546
547 pub async fn connect(&mut self) -> Result<(), ConnectorError> {
565 if self
566 .connection_handle
567 .as_ref()
568 .filter(|handle| !handle.is_aborted())
569 .is_some()
570 {
571 return Err(ConnectorError::InvalidState("connector is already connected"));
572 }
573
574 self.health.store(ChainHealthState::Connecting, AtomicOrdering::Relaxed);
575
576 let abort_handle = match self
577 .do_connect(self.cfg.connection_sync_timeout.max(MIN_CONNECTION_TIMEOUT))
578 .await
579 {
580 Ok(handle) => handle,
581 Err(e @ ConnectorError::ServerNotHealthy) => {
582 self.health
583 .store(ChainHealthState::ServerNotHealthy, AtomicOrdering::Relaxed);
584 return Err(e);
585 }
586 Err(e @ ConnectorError::ConnectionTimeout) => {
587 self.health
588 .store(ChainHealthState::SyncTimedOut, AtomicOrdering::Relaxed);
589 return Err(e);
590 }
591 Err(e) => {
592 self.health
593 .store(ChainHealthState::ConnectionFailed, AtomicOrdering::Relaxed);
594 return Err(e);
595 }
596 };
597
598 self.connection_handle = Some(abort_handle);
599 let _ = self.health.compare_exchange(
602 ChainHealthState::Connecting,
603 ChainHealthState::Ready,
604 AtomicOrdering::Relaxed,
605 AtomicOrdering::Relaxed,
606 );
607
608 tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
609 Ok(())
610 }
611
612 pub fn client(&self) -> &C {
614 self.client.as_ref()
615 }
616
617 pub fn is_connected(&self) -> bool {
619 self.check_connection_state().is_ok()
620 }
621}
622
623impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
624where
625 C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
626 P: PayloadGenerator + Send + Sync,
627 P::TxRequest: Send + Sync,
628{
629 async fn send_tx<'a>(
630 &'a self,
631 tx_req: P::TxRequest,
632 custom_tx_multiplier: Option<u32>,
633 ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
634 let chain_info = self.query_cached_chain_info().await?;
635 let tx_timeout = custom_tx_multiplier.unwrap_or(self.cfg.tx_timeout_multiplier).max(1)
636 * chain_info.finality
637 * chain_info.expected_block_time;
638 Ok(self
639 .sequencer
640 .enqueue_transaction(tx_req, tx_timeout.max(MIN_TX_CONFIRM_TIMEOUT))
641 .await?
642 .and_then(|tx| {
643 if let Some(tx_exec) = tx.safe_execution
644 && !tx_exec.success
645 {
646 return futures::future::err(ConnectorError::InnerTxFailed(
647 tx_exec.revert_reason.unwrap_or("n/a".into()),
648 ));
649 }
650 futures::future::ready(
651 ChainReceipt::from_str(&tx.transaction_hash.0)
652 .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
653 )
654 }))
655 }
656}
657
658impl<B, C, P, R> hopr_api::node::ComponentStatusReporter for HoprBlockchainConnector<C, B, P, R> {
659 fn component_status(&self) -> hopr_api::node::ComponentStatus {
660 self.health.load(AtomicOrdering::Relaxed).into()
661 }
662}
663
664impl<B, C, P, R> HoprBlockchainConnector<C, R, B, P> {
665 #[inline]
666 pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
667 self.connection_handle
668 .as_ref()
669 .filter(|handle| !handle.is_aborted()) .ok_or_else(|| ConnectorError::InvalidState("connector is not connected"))
671 .map(|_| ())
672 }
673
674 pub fn invalidate_caches(&self) {
676 self.channel_by_parties.invalidate_all();
677 self.channel_by_id.invalidate_all();
678 self.packet_to_chain.invalidate_all();
679 self.chain_to_packet.invalidate_all();
680 self.values.invalidate_all();
681 }
682}
683
684impl<B, C, P, R> Drop for HoprBlockchainConnector<C, R, B, P> {
685 fn drop(&mut self) {
686 self.health.store(ChainHealthState::Dropped, AtomicOrdering::Relaxed);
687 self.events.0.close();
688 if let Some(abort_handle) = self.connection_handle.take() {
689 abort_handle.abort();
690 }
691 }
692}
693
694impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
695where
696 B: Backend + Send + Sync + 'static,
697 C: Send + Sync,
698 P: Send + Sync,
699 R: Send + Sync,
700{
701 pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
703 self.into()
704 }
705}
706
707#[cfg(test)]
708pub(crate) mod tests {
709 use blokli_client::BlokliTestState;
710 use hex_literal::hex;
711 use hopr_api::{chain::ChainWriteTicketOperations, types::chain::contract_addresses_for_network};
712
713 use super::*;
714 use crate::{
715 InMemoryBackend,
716 testing::{BlokliTestStateBuilder, ChainMutator, FullStateEmulator},
717 };
718
    /// Secret key of the first test chain key pair; `create_connector` uses it as the connector's own key.
    pub const PRIVATE_KEY_1: [u8; 32] = hex!("c14b8faa0a9b8a5fa4453664996f23a7e7de606d42297d723fc4a794f375e260");
    /// Secret key of the second test chain key pair, used for the counterparty.
    pub const PRIVATE_KEY_2: [u8; 32] = hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775");
    /// Module address handed to the payload generator and to the state emulator in tests.
    pub const MODULE_ADDR: [u8; 20] = hex!("1111111111111111111111111111111111111111");

    /// Connector type used in tests: in-memory backend and the safe payload generator.
    pub type TestConnector<C> = HoprBlockchainConnector<
        C,
        InMemoryBackend,
        SafePayloadGenerator,
        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
    >;
729
730 pub fn create_connector<C>(blokli_client: C) -> anyhow::Result<TestConnector<C>>
731 where
732 C: BlokliQueryClient + BlokliTransactionClient + BlokliSubscriptionClient + Send + Sync + 'static,
733 {
734 let ckp = ChainKeypair::from_secret(&PRIVATE_KEY_1)?;
735
736 Ok(HoprBlockchainConnector::new(
737 ckp.clone(),
738 Default::default(),
739 blokli_client,
740 InMemoryBackend::default(),
741 SafePayloadGenerator::new(
742 &ckp,
743 contract_addresses_for_network("rotsee").unwrap().1,
744 MODULE_ADDR.into(),
745 ),
746 ))
747 }
748
749 #[tokio::test]
750 async fn connector_should_connect() -> anyhow::Result<()> {
751 let blokli_client = BlokliTestStateBuilder::default().build_static_client();
752
753 let mut connector = create_connector(blokli_client)?;
754 connector.connect().await?;
755
756 assert!(connector.is_connected());
757
758 Ok(())
759 }
760
761 #[tokio::test]
762 async fn connector_should_not_connect_when_blokli_not_healthy() -> anyhow::Result<()> {
763 let state = BlokliTestState {
764 health: "DOWN".into(),
765 ..Default::default()
766 };
767
768 let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();
769
770 let mut connector = create_connector(blokli_client)?;
771
772 let res = connector.connect().await;
773
774 assert!(matches!(res, Err(ConnectorError::ServerNotHealthy)));
775 assert!(!connector.is_connected());
776
777 Ok(())
778 }
779
780 #[tokio::test]
781 async fn connector_should_handle_inner_tx_failure_during_redemption() -> anyhow::Result<()> {
782 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
783 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
784 ))?;
785 let account_1 = AccountEntry {
786 public_key: *offchain_key_1.public(),
787 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
788 entry_type: AccountType::NotAnnounced,
789 safe_address: Some([1u8; Address::SIZE].into()),
790 key_id: 1.into(),
791 };
792 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
793 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
794 ))?;
795 let account_2 = AccountEntry {
796 public_key: *offchain_key_2.public(),
797 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
798 entry_type: AccountType::NotAnnounced,
799 safe_address: Some([2u8; Address::SIZE].into()),
800 key_id: 2.into(),
801 };
802
803 let channel_1 = ChannelEntry::builder()
804 .between(
805 &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
806 &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
807 )
808 .amount(10)
809 .ticket_index(1)
810 .status(ChannelStatus::Open)
811 .epoch(1)
812 .build()?;
813
814 let blokli_client = BlokliTestStateBuilder::default()
815 .with_accounts([
816 (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
817 (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
818 ])
819 .with_channels([channel_1])
820 .with_hopr_network_chain_info("rotsee")
821 .build_dynamic_client_with_mutator(ChainMutator::new(
822 move |_: &[u8], state: &mut BlokliTestState| -> Result<(), blokli_client::errors::BlokliClientError> {
823 if let Some(c) = state.get_channel_by_id_mut(&(*channel_1.get_id()).into()) {
827 c.ticket_index = blokli_client::api::types::Uint64("2".into());
828 Ok(())
829 } else {
830 Err(blokli_client::errors::ErrorKind::MockClientError(anyhow::anyhow!(
831 "channel unexpectedly not found"
832 ))
833 .into())
834 }
835 },
836 FullStateEmulator(MODULE_ADDR.into(), None),
837 ))
838 .with_tx_simulation_delay(Duration::from_millis(100))
839 .with_use_internal_txs(true);
840
841 let mut connector = create_connector(blokli_client.clone())?;
842 connector.connect().await?;
843
844 let hkc1 = ChainKeypair::from_secret(&hex!(
845 "e17fe86ce6e99f4806715b0c9412f8dad89334bf07f72d5834207a9d8f19d7f8"
846 ))?;
847 let hkc2 = ChainKeypair::from_secret(&hex!(
848 "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
849 ))?;
850
851 let ticket = TicketBuilder::default()
852 .counterparty(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?)
853 .amount(1)
854 .index(1)
855 .channel_epoch(1)
856 .eth_challenge(
857 Challenge::from_hint_and_share(
858 &HalfKeyChallenge::new(hkc1.public().as_ref()),
859 &HalfKeyChallenge::new(hkc2.public().as_ref()),
860 )?
861 .to_ethereum_challenge(),
862 )
863 .build_signed(&ChainKeypair::from_secret(&PRIVATE_KEY_2)?, &Hash::default())?
864 .into_acknowledged(Response::from_half_keys(
865 &HalfKey::try_from(hkc1.secret().as_ref())?,
866 &HalfKey::try_from(hkc2.secret().as_ref())?,
867 )?)
868 .into_redeemable(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?, &Hash::default())?;
869
870 let res = connector.redeem_ticket(ticket).await?;
871 let err = res.await;
872 assert!(matches!(err, Err(hopr_api::chain::TicketRedeemError::Rejected(_, _))));
873
874 Ok(())
875 }
876
877 #[test]
878 fn chain_health_all_variants_convert_to_component_status() {
879 use hopr_api::node::ComponentStatus;
880 let variants = [
881 ChainHealthState::Ready,
882 ChainHealthState::WaitingForConnection,
883 ChainHealthState::Connecting,
884 ChainHealthState::SubscriptionEnded,
885 ChainHealthState::SyncTimedOut,
886 ChainHealthState::ServerNotHealthy,
887 ChainHealthState::ConnectionFailed,
888 ChainHealthState::Dropped,
889 ];
890 for state in variants {
891 let _: ComponentStatus = state.into();
892 }
893 }
894
895 #[test]
896 fn chain_health_ready_maps_to_component_ready() {
897 use hopr_api::node::ComponentStatus;
898 let status: ComponentStatus = ChainHealthState::Ready.into();
899 assert!(status.is_ready());
900 }
901
902 #[test]
903 fn chain_health_degraded_states() {
904 use hopr_api::node::ComponentStatus;
905 let s: ComponentStatus = ChainHealthState::SubscriptionEnded.into();
906 assert!(s.is_degraded());
907 let s: ComponentStatus = ChainHealthState::SyncTimedOut.into();
908 assert!(s.is_degraded());
909 }
910
911 #[test]
912 fn chain_health_unavailable_states() {
913 use hopr_api::node::ComponentStatus;
914 let s: ComponentStatus = ChainHealthState::ServerNotHealthy.into();
915 assert!(s.is_unavailable());
916 let s: ComponentStatus = ChainHealthState::ConnectionFailed.into();
917 assert!(s.is_unavailable());
918 let s: ComponentStatus = ChainHealthState::Dropped.into();
919 assert!(s.is_unavailable());
920 }
921
922 #[test]
923 fn chain_health_initializing_states() {
924 use hopr_api::node::ComponentStatus;
925 let s: ComponentStatus = ChainHealthState::WaitingForConnection.into();
926 assert!(s.is_initializing());
927 let s: ComponentStatus = ChainHealthState::Connecting.into();
928 assert!(s.is_initializing());
929 }
930
931 #[tokio::test]
932 async fn connector_health_starts_as_initializing() -> anyhow::Result<()> {
933 use hopr_api::node::ComponentStatusReporter;
934 let blokli_client = BlokliTestStateBuilder::default().build_static_client();
935 let connector = create_connector(blokli_client)?;
936 assert!(connector.component_status().is_initializing());
937 Ok(())
938 }
939
940 #[tokio::test]
941 async fn connector_health_ready_after_connect() -> anyhow::Result<()> {
942 use hopr_api::node::ComponentStatusReporter;
943 let blokli_client = BlokliTestStateBuilder::default().build_static_client();
944 let mut connector = create_connector(blokli_client)?;
945 connector.connect().await?;
946 assert!(connector.component_status().is_ready());
947 Ok(())
948 }
949
950 #[tokio::test]
951 async fn connector_health_unavailable_when_server_not_healthy() -> anyhow::Result<()> {
952 use hopr_api::node::ComponentStatusReporter;
953 let state = BlokliTestState {
954 health: "DOWN".into(),
955 ..Default::default()
956 };
957 let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();
958 let mut connector = create_connector(blokli_client)?;
959 let _ = connector.connect().await;
960 assert!(connector.component_status().is_unavailable());
961 Ok(())
962 }
963
964 #[test]
965 fn health_cas_ready_only_from_connecting() {
966 let health = AtomicChainHealthState::new(ChainHealthState::Connecting);
967 let result = health.compare_exchange(
968 ChainHealthState::Connecting,
969 ChainHealthState::Ready,
970 AtomicOrdering::Relaxed,
971 AtomicOrdering::Relaxed,
972 );
973 assert!(result.is_ok());
974 assert_eq!(health.load(AtomicOrdering::Relaxed), ChainHealthState::Ready);
975 }
976
977 #[test]
978 fn health_cas_ready_fails_from_subscription_ended() {
979 let health = AtomicChainHealthState::new(ChainHealthState::SubscriptionEnded);
980 let result = health.compare_exchange(
981 ChainHealthState::Connecting,
982 ChainHealthState::Ready,
983 AtomicOrdering::Relaxed,
984 AtomicOrdering::Relaxed,
985 );
986 assert!(result.is_err());
987 assert_eq!(
988 health.load(AtomicOrdering::Relaxed),
989 ChainHealthState::SubscriptionEnded
990 );
991 }
992
993 #[test]
994 fn health_subscription_ended_preserves_terminal_state() {
995 let health = AtomicChainHealthState::new(ChainHealthState::ServerNotHealthy);
996 let current = health.load(AtomicOrdering::Relaxed);
997 assert!(!matches!(
999 current,
1000 ChainHealthState::Connecting | ChainHealthState::Ready
1001 ));
1002 }
1004
1005 #[test]
1006 fn health_subscription_ended_from_ready() {
1007 let health = AtomicChainHealthState::new(ChainHealthState::Ready);
1008 let current = health.load(AtomicOrdering::Relaxed);
1009 if matches!(current, ChainHealthState::Connecting | ChainHealthState::Ready) {
1010 let _ = health.compare_exchange(
1011 current,
1012 ChainHealthState::SubscriptionEnded,
1013 AtomicOrdering::Relaxed,
1014 AtomicOrdering::Relaxed,
1015 );
1016 }
1017 assert_eq!(
1018 health.load(AtomicOrdering::Relaxed),
1019 ChainHealthState::SubscriptionEnded
1020 );
1021 }
1022
1023 #[test]
1024 fn health_drop_overwrites_any_state() {
1025 for initial in [
1026 ChainHealthState::Ready,
1027 ChainHealthState::Connecting,
1028 ChainHealthState::SubscriptionEnded,
1029 ] {
1030 let health = AtomicChainHealthState::new(initial);
1031 health.store(ChainHealthState::Dropped, AtomicOrdering::Relaxed);
1032 assert_eq!(health.load(AtomicOrdering::Relaxed), ChainHealthState::Dropped);
1033 }
1034 }
1035}