1use std::{cmp::Ordering, str::FromStr, time::Duration};
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use futures_concurrency::stream::Merge;
6use futures_time::future::FutureExt as FuturesTimeExt;
7use hopr_api::{
8 chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent},
9 types::{chain::prelude::*, crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
10};
11use hopr_async_runtime::AbortHandle;
12use petgraph::prelude::DiGraphMap;
13
14use crate::{
15 backend::Backend,
16 connector::{keys::HoprKeyMapper, sequencer::TransactionSequencer, values::CHAIN_INFO_CACHE_KEY},
17 errors::ConnectorError,
18 utils::{
19 ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
20 process_channel_changes_into_events,
21 },
22};
23
24mod accounts;
25mod channels;
26mod events;
27mod keys;
28mod safe;
29mod sequencer;
30mod tickets;
31mod values;
32
/// Broadcast channel pair used to fan out [`ChainEvent`]s to subscribers.
///
/// The receiver half is stored deactivated (see `new`), so events are not
/// buffered until a subscriber activates a receiver clone.
type EventsChannel = (
    async_broadcast::Sender<ChainEvent>,
    async_broadcast::InactiveReceiver<ChainEvent>,
);
37
/// Lower bound applied to the configured connection/sync timeout in `connect`.
const MIN_CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
/// Lower bound applied to the computed transaction confirmation timeout in `send_tx`.
const MIN_TX_CONFIRM_TIMEOUT: Duration = Duration::from_secs(1);
/// Default multiplier applied to `finality * expected_block_time` when
/// deriving the transaction confirmation timeout.
const TX_TIMEOUT_MULTIPLIER: u32 = 2;
/// Default percentage of server-reported accounts/channels that must be
/// streamed before the initial sync is considered complete.
const DEFAULT_SYNC_TOLERANCE_PCT: usize = 90;
42
/// Configuration for [`HoprBlockchainConnector`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
pub struct BlockchainConnectorConfig {
    /// Maximum time to wait for the initial on-chain sync to complete
    /// (floored at `MIN_CONNECTION_TIMEOUT` when used).
    #[default(Duration::from_secs(30))]
    pub connection_sync_timeout: Duration,
    /// Percentage of the server-reported account/channel counts that must be
    /// received before the connector counts as synced; clamped to 1..=100 at
    /// the use site.
    #[default(DEFAULT_SYNC_TOLERANCE_PCT)]
    pub sync_tolerance: usize,
    /// Multiplier applied to `finality * expected_block_time` to derive the
    /// transaction confirmation timeout (floored at 1 when used).
    #[default(TX_TIMEOUT_MULTIPLIER)]
    pub tx_timeout_multiplier: u32,
}
65
/// Blockchain connector backed by a Blokli API client, a local [`Backend`]
/// store, a transaction payload generator `P` and its request type `R`.
pub struct HoprBlockchainConnector<C, B, P, R> {
    /// Generates chain-specific transaction payloads.
    payload_generator: P,
    /// This node's chain keypair; its address is used to recognize "own"
    /// channel events and for the sequencer identity.
    chain_key: ChainKeypair,
    /// Shared Blokli API client used for queries, subscriptions and transactions.
    client: std::sync::Arc<C>,
    /// In-memory directed channel graph: nodes are key identifiers, edges carry
    /// the channel id; kept in sync by the background subscription task.
    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
    /// Persistent store for accounts and channels.
    backend: std::sync::Arc<B>,
    /// Abort handle of the background subscription task; `None` while disconnected.
    connection_handle: Option<AbortHandle>,
    /// Serializes outgoing transactions.
    sequencer: TransactionSequencer<C, R>,
    /// Broadcast channel for emitting [`ChainEvent`]s.
    events: EventsChannel,
    cfg: BlockchainConnectorConfig,

    /// Maps between key ids and public keys, backed by `backend`.
    mapper: HoprKeyMapper<B>,
    /// Cache: chain address -> off-chain public key (None = known absent).
    chain_to_packet: moka::future::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
    /// Cache: off-chain public key -> chain address (None = known absent).
    packet_to_chain: moka::future::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
    /// Cache: channel id -> channel entry (None = known absent).
    channel_by_id: moka::future::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
    /// Cache: channel parties -> channel entry (None = known absent).
    channel_by_parties: moka::future::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
    /// Single-entry cache holding the parsed chain info under `CHAIN_INFO_CACHE_KEY`.
    values: moka::future::Cache<u32, ParsedChainInfo>,
}
94
/// Expected upper bound on the number of on-chain accounts; sizes the graph
/// node capacity and the key/address caches.
const EXPECTED_NUM_NODES: usize = 10_000;
/// Expected upper bound on the number of channels; sizes the graph edge
/// capacity and the channel caches.
const EXPECTED_NUM_CHANNELS: usize = 100_000;

/// Time-to-idle for all lookup caches (10 minutes).
///
/// Expressed via `Duration::from_secs` rather than `Duration::from_mins` so
/// the crate builds on stable toolchains that predate the stabilization of
/// the `duration_constructors_lite` API.
const DEFAULT_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
99
impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
where
    B: Backend + Send + Sync + 'static,
    C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
    P: PayloadGenerator + Send + Sync + 'static,
    P::TxRequest: Send + Sync + 'static,
{
    /// Creates a new connector instance that is not yet connected.
    ///
    /// Sets up the event broadcast channel (overflow enabled, non-blocking
    /// sends when no receiver is active), the in-memory channel graph, the
    /// transaction sequencer and all lookup caches. Call [`Self::connect`]
    /// afterwards to start the on-chain subscriptions.
    pub fn new(
        chain_key: ChainKeypair,
        cfg: BlockchainConnectorConfig,
        client: C,
        backend: B,
        payload_generator: P,
    ) -> Self {
        let backend = std::sync::Arc::new(backend);
        // Drop the oldest event on overflow and never block the sender when
        // no receiver is currently active.
        let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
        events_tx.set_overflow(true);
        events_tx.set_await_active(false);

        let client = std::sync::Arc::new(client);
        Self {
            payload_generator,
            graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
                EXPECTED_NUM_NODES,
                EXPECTED_NUM_CHANNELS,
                ahash::RandomState::default(),
            ))),
            backend: backend.clone(),
            connection_handle: None,
            sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
            // Keep the receiver deactivated until someone subscribes.
            events: (events_tx, events_rx.deactivate()),
            client,
            chain_key,
            cfg,
            mapper: HoprKeyMapper {
                id_to_key: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                    .build_with_hasher(ahash::RandomState::default()),
                key_to_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                    .build_with_hasher(ahash::RandomState::default()),
                backend,
            },
            chain_to_packet: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            packet_to_chain: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            channel_by_id: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            channel_by_parties: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            // Single-entry cache for the parsed chain info.
            values: moka::future::CacheBuilder::new(1).build(),
        }
    }

    /// Subscribes to accounts, graph and ticket-parameter updates on the
    /// Blokli server and spawns a background task that keeps the local
    /// graph, backend and caches in sync and re-broadcasts [`ChainEvent`]s.
    ///
    /// Resolves with the background task's [`AbortHandle`] once at least
    /// `sync_tolerance` percent of the server-reported accounts and open
    /// channels have been streamed; fails with
    /// [`ConnectorError::ConnectionTimeout`] if that does not happen within
    /// `timeout`, or with other errors on unhealthy server / subscription
    /// failure.
    async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
        // Fraction of server-reported totals required for readiness,
        // clamped to 1..=100 percent so a misconfigured 0 cannot disable
        // syncing entirely.
        let sync_quota = self.cfg.sync_tolerance.clamp(1, 100) as f64 / 100.0;
        let min_accounts = (self
            .client
            .count_accounts(blokli_client::api::AccountSelector::Any)
            .await? as f64
            * sync_quota)
            .round() as u32;
        // Only currently-open channels count towards the sync threshold.
        let min_channels = (self
            .client
            .count_channels(blokli_client::api::ChannelSelector {
                filter: None,
                status: Some(blokli_client::api::types::ChannelStatus::Open),
            })
            .await? as f64
            * sync_quota)
            .round() as u32;
        tracing::debug!(min_accounts, min_channels, "connection thresholds");

        // Refuse to connect to a server that does not report itself healthy.
        let server_health = self.client.query_health().await?;
        if !server_health.eq_ignore_ascii_case("OK") {
            tracing::error!(server_health, "blokli server not healthy");
            return Err(ConnectorError::ServerNotHealthy);
        }

        let (abort_handle, abort_reg) = AbortHandle::new_pair();

        // One-shot used by the background task to report initial-sync
        // completion (or failure) back to this method; wrapped in Option so
        // it can be consumed exactly once.
        let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
        let mut connection_ready_tx = Some(connection_ready_tx);

        // Clone every shared handle moved into the background task below.
        let client = self.client.clone();
        let mapper = self.mapper.clone();
        let backend = self.backend.clone();
        let graph = self.graph.clone();
        let event_tx = self.events.0.clone();
        let me = self.chain_key.public().to_address();
        let values_cache = self.values.clone();

        let chain_to_packet = self.chain_to_packet.clone();
        let packet_to_chain = self.packet_to_chain.clone();

        let channel_by_id = self.channel_by_id.clone();
        let channel_by_parties = self.channel_by_parties.clone();

        // Pre-warm the chain info cache before ticket-parameter updates can
        // arrive (the update handler warns if the entry is missing).
        self.query_cached_chain_info().await?;

        // Internal tagging of the three merged subscription streams; each
        // variant carries (new value, optional previous value/diff).
        #[allow(clippy::large_enum_variant)]
        #[derive(Debug)]
        enum SubscribedEventType {
            Account((AccountEntry, Option<AccountEntry>)),
            Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
            WinningProbability((WinningProbability, Option<WinningProbability>)),
            TicketPrice((HoprBalance, Option<HoprBalance>)),
        }

        hopr_async_runtime::prelude::spawn(async move {
            let sync_started = std::time::Instant::now();

            // Open all three subscriptions up front; any failure is reported
            // through the readiness one-shot and aborts the task.
            let connections = client
                .subscribe_accounts(blokli_client::api::AccountSelector::Any)
                .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
                .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));

            if let Err(error) = connections {
                if let Some(connection_ready_tx) = connection_ready_tx.take() {
                    let _ = connection_ready_tx.send(Err(error));
                }
                return;
            }

            let (account_stream, channel_stream, ticket_params_stream) = connections.unwrap();

            let graph_clone = graph.clone();
            // Account updates: record key-id mappings, add the node to the
            // graph, upsert into the backend (blocking pool) and refresh the
            // address <-> key caches.
            let account_stream = account_stream
                .inspect_ok(|entry| tracing::trace!(?entry, "new account event"))
                .map_err(ConnectorError::from)
                .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
                .and_then(move |account| {
                    let graph = graph_clone.clone();
                    let mapper = mapper.clone();
                    let chain_to_packet = chain_to_packet.clone();
                    let packet_to_chain = packet_to_chain.clone();
                    // Backend upsert may block, so run it off the async runtime.
                    hopr_async_runtime::prelude::spawn_blocking(move || {
                        mapper.key_to_id.insert(account.public_key, Some(account.key_id));
                        mapper.id_to_key.insert(account.key_id, Some(account.public_key));
                        graph.write().add_node(account.key_id);
                        mapper.backend.insert_account(account.clone()).map(|old| (account, old))
                    })
                    .map_err(ConnectorError::backend)
                    .and_then(move |res| {
                        let chain_to_packet = chain_to_packet.clone();
                        let packet_to_chain = packet_to_chain.clone();
                        async move {
                            if let Ok((upserted_account, _)) = &res {
                                chain_to_packet
                                    .insert(upserted_account.chain_addr, Some(upserted_account.public_key))
                                    .await;
                                packet_to_chain
                                    .insert(upserted_account.public_key, Some(upserted_account.chain_addr))
                                    .await;
                            }
                            res.map(SubscribedEventType::Account).map_err(ConnectorError::backend)
                        }
                    })
                })
                .fuse();

            // Channel updates: add the edge to the graph, upsert into the
            // backend (computing the diff against the previous entry) and
            // refresh both channel caches.
            let channel_stream = channel_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new graph event"))
                .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
                .and_then(move |(src, dst, channel)| {
                    let graph = graph.clone();
                    let backend = backend.clone();
                    let channel_by_id = channel_by_id.clone();
                    let channel_by_parties = channel_by_parties.clone();
                    hopr_async_runtime::prelude::spawn_blocking(move || {
                        graph.write().add_edge(src.key_id, dst.key_id, *channel.get_id());
                        backend
                            .insert_channel(channel)
                            // `None` diff means the channel was not previously known.
                            .map(|old| (channel, old.map(|old| old.diff(&channel))))
                    })
                    .map_err(ConnectorError::backend)
                    .and_then(move |res| {
                        let channel_by_id = channel_by_id.clone();
                        let channel_by_parties = channel_by_parties.clone();
                        async move {
                            if let Ok((upserted_channel, _)) = &res {
                                channel_by_id
                                    .insert(*upserted_channel.get_id(), Some(*upserted_channel))
                                    .await;
                                channel_by_parties
                                    .insert(ChannelParties::from(upserted_channel), Some(*upserted_channel))
                                    .await;
                            }
                            res.map(SubscribedEventType::Channel).map_err(ConnectorError::backend)
                        }
                    })
                })
                .fuse();

            // Ticket-parameter updates: compare against the cached chain
            // info, emit price/win-probability change events and update the
            // cache atomically via the entry compute API.
            let ticket_params_stream = ticket_params_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
                .try_filter_map(|ticket_value_event| {
                    futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
                })
                .and_then(|(new_ticket_price, new_win_prob)| {
                    let values_cache = values_cache.clone();
                    async move {
                        let mut events = Vec::<SubscribedEventType>::new();
                        values_cache
                            .entry(CHAIN_INFO_CACHE_KEY)
                            .and_compute_with(|cached_entry| {
                                futures::future::ready(match cached_entry {
                                    Some(chain_info) => {
                                        let mut chain_info = chain_info.into_value();
                                        if chain_info.ticket_price != new_ticket_price {
                                            events.push(SubscribedEventType::TicketPrice((
                                                new_ticket_price,
                                                Some(chain_info.ticket_price),
                                            )));
                                            chain_info.ticket_price = new_ticket_price;
                                        }
                                        // Win probabilities are floats; compare approximately.
                                        if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
                                            events.push(SubscribedEventType::WinningProbability((
                                                new_win_prob,
                                                Some(chain_info.ticket_win_prob),
                                            )));
                                            chain_info.ticket_win_prob = new_win_prob;
                                        }

                                        // Only write back if something actually changed.
                                        if !events.is_empty() {
                                            moka::ops::compute::Op::Put(chain_info)
                                        } else {
                                            moka::ops::compute::Op::Nop
                                        }
                                    }
                                    None => {
                                        // Cache was pre-warmed in do_connect; missing entry
                                        // means it expired or was invalidated in the meantime.
                                        tracing::warn!(
                                            "chain info not present in the cache before ticket params update"
                                        );
                                        events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
                                        events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
                                        moka::ops::compute::Op::Nop
                                    }
                                })
                            })
                            .await;
                        // A single update may yield zero, one or two events.
                        Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
                    }
                })
                .try_flatten()
                .fuse();

            let mut account_counter = 0;
            let mut channel_counter = 0;
            // An empty chain (no accounts, no open channels) is synced
            // immediately; this also consumes the ready sender so the
            // percentage bookkeeping below is skipped entirely.
            if min_accounts == 0 && min_channels == 0 {
                tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
                let _ = connection_ready_tx.take().unwrap().send(Ok(()));
            }

            futures::stream::Abortable::new(
                (account_stream, channel_stream, ticket_params_stream).merge(),
                abort_reg,
            )
            // Track sync progress only until the readiness signal was sent.
            .inspect_ok(move |event_type| {
                if connection_ready_tx.is_some() {
                    match event_type {
                        SubscribedEventType::Account(_) => account_counter += 1,
                        SubscribedEventType::Channel(_) => channel_counter += 1,
                        _ => {}
                    }

                    // Division is safe: if both minima were 0, the ready
                    // sender was already taken above and this branch is skipped.
                    let pct_synced =
                        ((account_counter + channel_counter) * 100 / (min_accounts + min_channels)).clamp(0, 100);
                    tracing::debug!(
                        pct_synced,
                        sync_quota,
                        account_counter,
                        channel_counter,
                        "percentage of connection quota synced"
                    );

                    if account_counter >= min_accounts && channel_counter >= min_channels {
                        tracing::info!(account_counter, channel_counter, time = ?sync_started.elapsed(), "on-chain graph has been synced");
                        let _ = connection_ready_tx.take().unwrap().send(Ok(()));
                    }
                }
            })
            // Translate tagged subscription events into broadcast ChainEvents.
            .for_each(|event_type| {
                let event_tx = event_tx.clone();
                async move {
                    match event_type {
                        Ok(SubscribedEventType::Account((new_account, old_account))) => {
                            tracing::debug!(%new_account, "account inserted");
                            // Only announce when routing info appears for the
                            // first time (i.e. the old entry lacked it).
                            if new_account.has_announced_with_routing_info()
                                && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
                            {
                                tracing::debug!(account = %new_account, "new announcement");
                                let _ = event_tx
                                    .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
                                    .await;
                            }
                        }
                        Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                num_changes = changes.len(),
                                "channel updated"
                            );
                            process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
                        }
                        // No diff means the channel was not previously known.
                        Ok(SubscribedEventType::Channel((new_channel, None))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                "channel opened"
                            );
                            let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
                        }
                        Ok(SubscribedEventType::WinningProbability((new, old))) => {
                            let old = old.unwrap_or_default();
                            match new.approx_cmp(&old) {
                                Ordering::Less => {
                                    tracing::debug!(%new, %old, "winning probability decreased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
                                        .await;
                                }
                                Ordering::Greater => {
                                    tracing::debug!(%new, %old, "winning probability increased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
                                        .await;
                                }
                                Ordering::Equal => {}
                            }
                        }
                        Ok(SubscribedEventType::TicketPrice((new, old))) => {
                            tracing::debug!(%new, ?old, "ticket price changed");
                            let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
                        }
                        Err(error) => {
                            tracing::error!(%error, "error processing account/graph/ticket params subscription");
                        }
                    }
                }
            })
            .await;
        });

        // Wait for the readiness signal, aborting the background task on any
        // failure path. Nesting: timeout(oneshot recv(inner Result)).
        connection_ready_rx
            .timeout(futures_time::time::Duration::from(timeout))
            .map(|res| match res {
                Ok(Ok(Ok(_))) => Ok(abort_handle),
                Ok(Ok(Err(error))) => {
                    abort_handle.abort();
                    Err(ConnectorError::from(error))
                }
                // Sender dropped without signalling readiness.
                Ok(Err(_)) => {
                    abort_handle.abort();
                    Err(ConnectorError::InvalidState("failed to determine connection state"))
                }
                Err(_) => {
                    abort_handle.abort();
                    tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
                    Err(ConnectorError::ConnectionTimeout)
                }
            })
            .await
    }

    /// Connects the connector to the chain.
    ///
    /// Fails with [`ConnectorError::InvalidState`] when already connected.
    /// Uses the configured sync timeout, floored at
    /// [`MIN_CONNECTION_TIMEOUT`], and stores the background task's abort
    /// handle on success.
    pub async fn connect(&mut self) -> Result<(), ConnectorError> {
        if self
            .connection_handle
            .as_ref()
            .filter(|handle| !handle.is_aborted())
            .is_some()
        {
            return Err(ConnectorError::InvalidState("connector is already connected"));
        }

        let abort_handle = self
            .do_connect(self.cfg.connection_sync_timeout.max(MIN_CONNECTION_TIMEOUT))
            .await?;

        self.connection_handle = Some(abort_handle);

        tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
        Ok(())
    }

    /// Returns a reference to the underlying Blokli client.
    pub fn client(&self) -> &C {
        self.client.as_ref()
    }

    /// Indicates whether the background subscription task is currently alive.
    pub fn is_connected(&self) -> bool {
        self.check_connection_state().is_ok()
    }
}
533
impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
where
    C: BlokliTransactionClient + BlokliQueryClient + Send + Sync + 'static,
    P: PayloadGenerator + Send + Sync,
    P::TxRequest: Send + Sync,
{
    /// Enqueues `tx_req` into the transaction sequencer and returns a future
    /// that resolves to the transaction's [`ChainReceipt`] once confirmed.
    ///
    /// The confirmation timeout is computed as
    /// `multiplier * finality * expected_block_time`, where `multiplier` is
    /// `custom_tx_multiplier` (if given) or the configured default, floored
    /// at 1; the result is never below [`MIN_TX_CONFIRM_TIMEOUT`].
    ///
    /// Errors immediately if the chain info cannot be queried or the
    /// transaction cannot be enqueued; the returned future errors if the
    /// reported transaction hash cannot be parsed into a receipt.
    async fn send_tx<'a>(
        &'a self,
        tx_req: P::TxRequest,
        custom_tx_multiplier: Option<u32>,
    ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
        let chain_info = self.query_cached_chain_info().await?;
        let tx_timeout = custom_tx_multiplier.unwrap_or(self.cfg.tx_timeout_multiplier).max(1)
            * chain_info.finality
            * chain_info.expected_block_time;
        Ok(self
            .sequencer
            .enqueue_transaction(tx_req, tx_timeout.max(MIN_TX_CONFIRM_TIMEOUT))
            .await?
            .and_then(|tx| {
                futures::future::ready(
                    ChainReceipt::from_str(&tx.transaction_hash.0)
                        .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
                )
            }))
    }
}
561
562impl<B, C, P, R> HoprBlockchainConnector<C, R, B, P> {
563 pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
564 self.connection_handle
565 .as_ref()
566 .filter(|handle| !handle.is_aborted()) .ok_or(ConnectorError::InvalidState("connector is not connected"))
568 .map(|_| ())
569 }
570
571 pub fn invalidate_caches(&self) {
573 self.channel_by_parties.invalidate_all();
574 self.channel_by_id.invalidate_all();
575 self.packet_to_chain.invalidate_all();
576 self.chain_to_packet.invalidate_all();
577 self.values.invalidate_all();
578 }
579}
580
impl<B, C, P, R> Drop for HoprBlockchainConnector<C, R, B, P> {
    fn drop(&mut self) {
        // Close the event broadcast channel first so subscribers observe the
        // end of the stream, then stop the background subscription task.
        self.events.0.close();
        if let Some(abort_handle) = self.connection_handle.take() {
            abort_handle.abort();
        }
    }
}
589
590impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
591where
592 B: Backend + Send + Sync + 'static,
593 C: Send + Sync,
594 P: Send + Sync,
595 R: Send + Sync,
596{
597 pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
599 self.into()
600 }
601}
602
#[cfg(test)]
pub(crate) mod tests {
    use blokli_client::BlokliTestState;
    use hex_literal::hex;
    use hopr_api::types::chain::contract_addresses_for_network;

    use super::*;
    use crate::{InMemoryBackend, testing::BlokliTestStateBuilder};

    // Deterministic test secrets and module address shared across connector tests.
    pub const PRIVATE_KEY_1: [u8; 32] = hex!("c14b8faa0a9b8a5fa4453664996f23a7e7de606d42297d723fc4a794f375e260");
    pub const PRIVATE_KEY_2: [u8; 32] = hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775");
    pub const MODULE_ADDR: [u8; 20] = hex!("1111111111111111111111111111111111111111");

    /// Connector type under test: in-memory backend + safe payload generator
    /// over an arbitrary (usually mocked) Blokli client `C`.
    pub type TestConnector<C> = HoprBlockchainConnector<
        C,
        InMemoryBackend,
        SafePayloadGenerator,
        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
    >;

    /// Builds a test connector around `blokli_client` using `PRIVATE_KEY_1`
    /// and the "rotsee" network contract addresses.
    pub fn create_connector<C>(blokli_client: C) -> anyhow::Result<TestConnector<C>>
    where
        C: BlokliQueryClient + BlokliTransactionClient + BlokliSubscriptionClient + Send + Sync + 'static,
    {
        let ckp = ChainKeypair::from_secret(&PRIVATE_KEY_1)?;

        Ok(HoprBlockchainConnector::new(
            ckp.clone(),
            Default::default(),
            blokli_client,
            InMemoryBackend::default(),
            SafePayloadGenerator::new(
                &ckp,
                contract_addresses_for_network("rotsee").unwrap().1,
                MODULE_ADDR.into(),
            ),
        ))
    }

    // Connecting against an empty (default) test state should succeed
    // immediately, since the zero-account/zero-channel case short-circuits
    // the sync wait.
    #[tokio::test]
    async fn connector_should_connect() -> anyhow::Result<()> {
        let blokli_client = BlokliTestStateBuilder::default().build_static_client();

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        assert!(connector.is_connected());

        Ok(())
    }

    // A server reporting anything other than "OK" (case-insensitive) must be
    // rejected before any subscription is attempted.
    #[tokio::test]
    async fn connector_should_not_connect_when_blokli_not_healthy() -> anyhow::Result<()> {
        let state = BlokliTestState {
            health: "DOWN".into(),
            ..Default::default()
        };

        let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();

        let mut connector = create_connector(blokli_client)?;

        let res = connector.connect().await;

        assert!(matches!(res, Err(ConnectorError::ServerNotHealthy)));
        assert!(!connector.is_connected());

        Ok(())
    }
}