use std::{cmp::Ordering, str::FromStr, time::Duration};

use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use futures_concurrency::stream::Merge;
use futures_time::future::FutureExt as FuturesTimeExt;
use hopr_api::chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent};
use hopr_async_runtime::AbortHandle;
use hopr_chain_types::prelude::*;
use hopr_crypto_types::prelude::*;
use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;
use petgraph::prelude::DiGraphMap;

use crate::{
    backend::Backend,
    connector::{keys::HoprKeyMapper, sequencer::TransactionSequencer, values::CHAIN_INFO_CACHE_KEY},
    errors::ConnectorError,
    utils::{
        ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
        process_channel_changes_into_events,
    },
};

mod accounts;
mod channels;
mod events;
mod keys;
mod safe;
mod sequencer;
mod tickets;
mod values;
/// Broadcast channel used to fan out `ChainEvent`s to subscribers.
type EventsChannel = (
    async_broadcast::Sender<ChainEvent>,
    async_broadcast::InactiveReceiver<ChainEvent>,
);

/// Lower bound enforced on the configured connection timeout.
const MIN_CONNECTION_TIMEOUT: Duration = Duration::from_millis(100);
/// Lower bound enforced on the configured transaction confirmation timeout.
const MIN_TX_CONFIRM_TIMEOUT: Duration = Duration::from_secs(1);

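/// Configuration of the `HoprBlockchainConnector`.
///
/// A minimal sketch of overriding the defaults (the values below are illustrative,
/// not recommendations):
///
/// ```ignore
/// let cfg = BlockchainConnectorConfig {
///     tx_confirm_timeout: Duration::from_secs(60),
///     connection_timeout: Duration::from_secs(10),
/// };
/// ```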
#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
pub struct BlockchainConnectorConfig {
    /// Maximum time to wait for a submitted transaction to be confirmed.
    ///
    /// Defaults to 30 seconds.
    #[default(Duration::from_secs(30))]
    pub tx_confirm_timeout: Duration,
    /// Maximum time to wait for the initial on-chain synchronization when connecting.
    ///
    /// Defaults to 30 seconds.
    #[default(Duration::from_secs(30))]
    pub connection_timeout: Duration,
}

pub struct HoprBlockchainConnector<C, B, P, R> {
    payload_generator: P,
    chain_key: ChainKeypair,
    client: std::sync::Arc<C>,
    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
    backend: std::sync::Arc<B>,
    connection_handle: Option<AbortHandle>,
    sequencer: TransactionSequencer<C, R>,
    events: EventsChannel,
    cfg: BlockchainConnectorConfig,

    mapper: HoprKeyMapper<B>,
    /// Cache of chain address -> off-chain (packet) public key.
    chain_to_packet: moka::future::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
    /// Cache of off-chain (packet) public key -> chain address.
    packet_to_chain: moka::future::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
    /// Cache of channel entries keyed by channel ID.
    channel_by_id: moka::future::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
    /// Cache of channel entries keyed by the (source, destination) parties.
    channel_by_parties: moka::future::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
    /// Single-entry cache of the parsed chain info (ticket price, winning probability, ...).
    values: moka::future::Cache<u32, ParsedChainInfo>,
}

const EXPECTED_NUM_NODES: usize = 10_000;
const EXPECTED_NUM_CHANNELS: usize = 100_000;

const DEFAULT_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
where
    B: Backend + Send + Sync + 'static,
    C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
    P: PayloadGenerator + Send + Sync + 'static,
    P::TxRequest: Send + Sync + 'static,
{
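    /// Creates a new connector instance; no connection is established until `connect()` is called.
    ///
    /// A minimal sketch, assuming a Blokli client, a backend and a payload generator are
    /// already available (the names below are placeholders):
    ///
    /// ```ignore
    /// let connector = HoprBlockchainConnector::new(
    ///     chain_key,
    ///     BlockchainConnectorConfig::default(),
    ///     blokli_client,
    ///     backend,
    ///     payload_generator,
    /// );
    /// ```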
    pub fn new(
        chain_key: ChainKeypair,
        cfg: BlockchainConnectorConfig,
        client: C,
        backend: B,
        payload_generator: P,
    ) -> Self {
        let backend = std::sync::Arc::new(backend);

        // Lossy broadcast: when the channel is full, the oldest events are overwritten,
        // and broadcasting does not wait for active receivers.
        let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
        events_tx.set_overflow(true);
        events_tx.set_await_active(false);

        let client = std::sync::Arc::new(client);
        Self {
            payload_generator,
            graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
                EXPECTED_NUM_NODES,
                EXPECTED_NUM_CHANNELS,
                ahash::RandomState::default(),
            ))),
            backend: backend.clone(),
            connection_handle: None,
            sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
            events: (events_tx, events_rx.deactivate()),
            client,
            chain_key,
            cfg,
            mapper: HoprKeyMapper {
                id_to_key: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                    .build_with_hasher(ahash::RandomState::default()),
                key_to_id: moka::sync::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                    .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                    .build_with_hasher(ahash::RandomState::default()),
                backend,
            },
            chain_to_packet: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            packet_to_chain: moka::future::CacheBuilder::new(EXPECTED_NUM_NODES as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            channel_by_id: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            channel_by_parties: moka::future::CacheBuilder::new(EXPECTED_NUM_CHANNELS as u64)
                .time_to_idle(DEFAULT_CACHE_TIMEOUT)
                .build_with_hasher(ahash::RandomState::default()),
            values: moka::future::CacheBuilder::new(1)
                .time_to_live(DEFAULT_CACHE_TIMEOUT)
                .build(),
        }
    }

    async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
        // Consider the initial sync complete once at least 99% of the accounts and channels
        // currently known to the server have been seen.
        const TOLERANCE: f64 = 0.01;
        let min_accounts = (self
            .client
            .count_accounts(blokli_client::api::AccountSelector::Any)
            .await? as f64
            * (1.0 - TOLERANCE))
            .round() as u32;
        let min_channels = (self
            .client
            .count_channels(blokli_client::api::ChannelSelector {
                filter: None,
                status: None,
            })
            .await? as f64
            * (1.0 - TOLERANCE))
            .round() as u32;
        tracing::debug!(min_accounts, min_channels, "connection thresholds");

        let server_health = self.client.query_health().await?;
        if !server_health.eq_ignore_ascii_case("OK") {
            tracing::error!(server_health, "blokli server not healthy");
            return Err(ConnectorError::ServerNotHealthy);
        }

        let (abort_handle, abort_reg) = AbortHandle::new_pair();

        let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
        let mut connection_ready_tx = Some(connection_ready_tx);

        let client = self.client.clone();
        let mapper = self.mapper.clone();
        let backend = self.backend.clone();
        let graph = self.graph.clone();
        let event_tx = self.events.0.clone();
        let me = self.chain_key.public().to_address();
        let values_cache = self.values.clone();

        let chain_to_packet = self.chain_to_packet.clone();
        let packet_to_chain = self.packet_to_chain.clone();

        let channel_by_id = self.channel_by_id.clone();
        let channel_by_parties = self.channel_by_parties.clone();

        self.query_cached_chain_info().await?;

        // Internal classification of the events received from the subscriptions below.
        #[allow(clippy::large_enum_variant)]
        #[derive(Debug)]
        enum SubscribedEventType {
            Account((AccountEntry, Option<AccountEntry>)),
            Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
            WinningProbability((WinningProbability, Option<WinningProbability>)),
            TicketPrice((HoprBalance, Option<HoprBalance>)),
        }

        hopr_async_runtime::prelude::spawn(async move {
            let connections = client
                .subscribe_accounts(blokli_client::api::AccountSelector::Any)
                .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
                .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));

            let (account_stream, channel_stream, ticket_params_stream) = match connections {
                Ok(streams) => streams,
                Err(error) => {
                    if let Some(connection_ready_tx) = connection_ready_tx.take() {
                        let _ = connection_ready_tx.send(Err(error));
                    }
                    return;
                }
            };

            let graph_clone = graph.clone();
            // Account updates: refresh the key-id mappings and address caches, add the node to the
            // channel graph and persist the entry in the backend.
            let account_stream = account_stream
                .inspect_ok(|entry| tracing::trace!(?entry, "new account entry"))
                .map_err(ConnectorError::from)
                .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
                .and_then(move |account| {
                    let graph = graph_clone.clone();
                    let mapper = mapper.clone();
                    let chain_to_packet = chain_to_packet.clone();
                    let packet_to_chain = packet_to_chain.clone();
                    hopr_async_runtime::prelude::spawn_blocking(move || {
                        mapper.key_to_id.insert(account.public_key, Some(account.key_id));
                        mapper.id_to_key.insert(account.key_id, Some(account.public_key));
                        graph.write().add_node(account.key_id);
                        mapper.backend.insert_account(account.clone()).map(|old| (account, old))
                    })
                    .map_err(|e| ConnectorError::BackendError(e.into()))
                    .and_then(move |res| {
                        let chain_to_packet = chain_to_packet.clone();
                        let packet_to_chain = packet_to_chain.clone();
                        async move {
                            if let Ok((account, _)) = &res {
                                chain_to_packet
                                    .insert(account.chain_addr, Some(account.public_key))
                                    .await;
                                packet_to_chain
                                    .insert(account.public_key, Some(account.chain_addr))
                                    .await;
                            }
                            res.map(SubscribedEventType::Account)
                                .map_err(|e| ConnectorError::BackendError(e.into()))
                        }
                    })
                })
                .fuse();

            // Channel updates: maintain the channel graph edges and the backend entry, and compute
            // the diff against the previous state so it can be turned into chain events below.
            let channel_stream = channel_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new graph entry"))
                .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
                .and_then(move |(src, dst, channel)| {
                    let graph = graph.clone();
                    let backend = backend.clone();
                    let channel_by_id = channel_by_id.clone();
                    let channel_by_parties = channel_by_parties.clone();
                    hopr_async_runtime::prelude::spawn_blocking(move || {
                        graph.write().add_edge(src.key_id, dst.key_id, *channel.get_id());
                        backend
                            .insert_channel(channel)
                            .map(|old| (channel, old.map(|old| old.diff(&channel))))
                    })
                    .map_err(|e| ConnectorError::BackendError(e.into()))
                    .and_then(move |res| {
                        let channel_by_id = channel_by_id.clone();
                        let channel_by_parties = channel_by_parties.clone();
                        async move {
                            if let Ok((channel, _)) = &res {
                                channel_by_id.insert(*channel.get_id(), Some(*channel)).await;
                                channel_by_parties
                                    .insert(ChannelParties::from(channel), Some(*channel))
                                    .await;
                            }
                            res.map(SubscribedEventType::Channel)
                                .map_err(|e| ConnectorError::BackendError(e.into()))
                        }
                    })
                })
                .fuse();

            // Ticket parameter updates: compare the incoming ticket price and winning probability
            // against the cached chain info and emit change events only for the values that differ.
            let ticket_params_stream = ticket_params_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
                .try_filter_map(|ticket_value_event| {
                    futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
                })
                .and_then(|(new_ticket_price, new_win_prob)| {
                    let values_cache = values_cache.clone();
                    async move {
                        let mut events = Vec::<SubscribedEventType>::new();
                        values_cache
                            .entry(CHAIN_INFO_CACHE_KEY)
                            .and_compute_with(|cached_entry| {
                                futures::future::ready(match cached_entry {
                                    Some(chain_info) => {
                                        let mut chain_info = chain_info.into_value();
                                        if chain_info.ticket_price != new_ticket_price {
                                            events.push(SubscribedEventType::TicketPrice((
                                                new_ticket_price,
                                                Some(chain_info.ticket_price),
                                            )));
                                            chain_info.ticket_price = new_ticket_price;
                                        }
                                        if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
                                            events.push(SubscribedEventType::WinningProbability((
                                                new_win_prob,
                                                Some(chain_info.ticket_win_prob),
                                            )));
                                            chain_info.ticket_win_prob = new_win_prob;
                                        }

                                        if !events.is_empty() {
                                            moka::ops::compute::Op::Put(chain_info)
                                        } else {
                                            moka::ops::compute::Op::Nop
                                        }
                                    }
                                    None => {
                                        tracing::warn!(
                                            "chain info not present in the cache before ticket params update"
                                        );
                                        events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
                                        events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
                                        moka::ops::compute::Op::Nop
                                    }
                                })
                            })
                            .await;
                        Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
                    }
                })
                .try_flatten()
                .fuse();

            // Count the events seen so far; once the thresholds derived above are reached, the
            // initial on-chain sync is considered complete and the connection is reported ready.
            let mut account_counter = 0;
            let mut channel_counter = 0;
            if min_accounts == 0 && min_channels == 0 {
                tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
                let _ = connection_ready_tx.take().unwrap().send(Ok(()));
            }

            futures::stream::Abortable::new(
                (account_stream, channel_stream, ticket_params_stream).merge(),
                abort_reg,
            )
            .inspect_ok(move |event_type| {
                if connection_ready_tx.is_some() {
                    match event_type {
                        SubscribedEventType::Account(_) => account_counter += 1,
                        SubscribedEventType::Channel(_) => channel_counter += 1,
                        _ => {}
                    }

                    if account_counter >= min_accounts && channel_counter >= min_channels {
                        tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
                        let _ = connection_ready_tx.take().unwrap().send(Ok(()));
                    }
                }
            })
            .for_each(|event_type| {
                let event_tx = event_tx.clone();
                async move {
                    match event_type {
                        Ok(SubscribedEventType::Account((new_account, old_account))) => {
                            tracing::debug!(%new_account, "account inserted");
                            if new_account.has_announced_with_routing_info()
                                && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
                            {
                                tracing::debug!(account = %new_account, "new announcement");
                                let _ = event_tx
                                    .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
                                    .await;
                            }
                        }
                        Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                num_changes = changes.len(),
                                "channel updated"
                            );
                            process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
                        }
                        Ok(SubscribedEventType::Channel((new_channel, None))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                "channel opened"
                            );
                            let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
                        }
                        Ok(SubscribedEventType::WinningProbability((new, old))) => {
                            let old = old.unwrap_or_default();
                            match new.approx_cmp(&old) {
                                Ordering::Less => {
                                    tracing::debug!(%new, %old, "winning probability decreased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
                                        .await;
                                }
                                Ordering::Greater => {
                                    tracing::debug!(%new, %old, "winning probability increased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
                                        .await;
                                }
                                Ordering::Equal => {}
                            }
                        }
                        Ok(SubscribedEventType::TicketPrice((new, old))) => {
                            tracing::debug!(%new, ?old, "ticket price changed");
                            let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
                        }
                        Err(error) => {
                            tracing::error!(%error, "error processing account/graph/ticket params subscription");
                        }
                    }
                }
            })
            .await;
        });

        // Wait (up to `timeout`) for the spawned task to signal that the initial sync has
        // completed; abort the background task on failure or timeout.
        connection_ready_rx
            .timeout(futures_time::time::Duration::from(timeout))
            .map(|res| match res {
                Ok(Ok(Ok(_))) => Ok(abort_handle),
                Ok(Ok(Err(error))) => {
                    abort_handle.abort();
                    Err(ConnectorError::from(error))
                }
                Ok(Err(_)) => {
                    abort_handle.abort();
                    Err(ConnectorError::InvalidState("failed to determine connection state"))
                }
                Err(_) => {
                    abort_handle.abort();
                    tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
                    Err(ConnectorError::ConnectionTimeout)
                }
            })
            .await
    }

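    /// Connects to the Blokli server, spawns the background subscription task and starts the
    /// transaction sequencer. Fails if the connector is already connected.
    ///
    /// A minimal usage sketch (error handling elided):
    ///
    /// ```ignore
    /// connector.connect().await?;
    /// assert!(connector.is_connected());
    /// ```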
    pub async fn connect(&mut self) -> Result<(), ConnectorError> {
        if self
            .connection_handle
            .as_ref()
            .filter(|handle| !handle.is_aborted())
            .is_some()
        {
            return Err(ConnectorError::InvalidState("connector is already connected"));
        }

        let abort_handle = self
            .do_connect(self.cfg.connection_timeout.max(MIN_CONNECTION_TIMEOUT))
            .await?;

        if let Err(error) = self.sequencer.start().await {
            abort_handle.abort();
            return Err(error);
        }

        self.connection_handle = Some(abort_handle);

        tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
        Ok(())
    }

    /// Returns a reference to the underlying Blokli client.
    pub fn client(&self) -> &C {
        self.client.as_ref()
    }

    /// Indicates whether the connector is currently connected.
    pub fn is_connected(&self) -> bool {
        self.check_connection_state().is_ok()
    }
}

impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
where
    C: BlokliTransactionClient + Send + Sync + 'static,
    P: PayloadGenerator + Send + Sync,
    P::TxRequest: Send + Sync,
{
    /// Enqueues a transaction request in the sequencer and returns a future that resolves to the
    /// on-chain receipt once the transaction has been confirmed.
    async fn send_tx<'a>(
        &'a self,
        tx_req: P::TxRequest,
    ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
        Ok(self
            .sequencer
            .enqueue_transaction(tx_req, self.cfg.tx_confirm_timeout.max(MIN_TX_CONFIRM_TIMEOUT))
            .await?
            .and_then(|tx| {
                futures::future::ready(
                    ChainReceipt::from_str(&tx.transaction_hash.0)
                        .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
                )
            }))
    }
}

impl<C, B, P, R> HoprBlockchainConnector<C, B, P, R> {
    /// Checks whether the background subscription task is still running.
    pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
        self.connection_handle
            .as_ref()
            .filter(|handle| !handle.is_aborted())
            .ok_or(ConnectorError::InvalidState("connector is not connected"))
            .map(|_| ())
    }

    /// Invalidates all internal caches (channels, address mappings and chain info).
    pub fn invalidate_caches(&self) {
        self.channel_by_parties.invalidate_all();
        self.channel_by_id.invalidate_all();
        self.packet_to_chain.invalidate_all();
        self.chain_to_packet.invalidate_all();
        self.values.invalidate_all();
    }
}

impl<C, B, P, R> Drop for HoprBlockchainConnector<C, B, P, R> {
    fn drop(&mut self) {
        // Close the event broadcast channel and stop the background subscription task.
        self.events.0.close();
        if let Some(abort_handle) = self.connection_handle.take() {
            abort_handle.abort();
        }
    }
}

impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
where
    B: Backend + Send + Sync + 'static,
    C: Send + Sync,
    P: Send + Sync,
    R: Send + Sync,
{
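    /// Returns a `ChainPathResolver` view over this connector for path resolution.
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let resolver = connector.as_path_resolver();
    /// ```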
    pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
        self.into()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use blokli_client::BlokliTestState;
    use hex_literal::hex;
    use hopr_chain_types::contract_addresses_for_network;

    use super::*;
    use crate::{InMemoryBackend, testing::BlokliTestStateBuilder};

    pub const PRIVATE_KEY_1: [u8; 32] = hex!("c14b8faa0a9b8a5fa4453664996f23a7e7de606d42297d723fc4a794f375e260");
    pub const PRIVATE_KEY_2: [u8; 32] = hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775");
    pub const MODULE_ADDR: [u8; 20] = hex!("1111111111111111111111111111111111111111");

    pub type TestConnector<C> = HoprBlockchainConnector<
        C,
        InMemoryBackend,
        SafePayloadGenerator,
        <SafePayloadGenerator as PayloadGenerator>::TxRequest,
    >;

    pub fn create_connector<C>(blokli_client: C) -> anyhow::Result<TestConnector<C>>
    where
        C: BlokliQueryClient + BlokliTransactionClient + BlokliSubscriptionClient + Send + Sync + 'static,
    {
        let ckp = ChainKeypair::from_secret(&PRIVATE_KEY_1)?;

        Ok(HoprBlockchainConnector::new(
            ckp.clone(),
            Default::default(),
            blokli_client,
            InMemoryBackend::default(),
            SafePayloadGenerator::new(
                &ckp,
                contract_addresses_for_network("rotsee").unwrap().1,
                MODULE_ADDR.into(),
            ),
        ))
    }

    #[tokio::test]
    async fn connector_should_connect() -> anyhow::Result<()> {
        let blokli_client = BlokliTestStateBuilder::default().build_static_client();

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        assert!(connector.is_connected());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_not_connect_when_blokli_not_healthy() -> anyhow::Result<()> {
        let mut state = BlokliTestState::default();
        state.health = "DOWN".into();

        let blokli_client = BlokliTestStateBuilder::from(state).build_static_client();

        let mut connector = create_connector(blokli_client)?;

        let res = connector.connect().await;

        assert!(matches!(res, Err(ConnectorError::ServerNotHealthy)));
        assert!(!connector.is_connected());

        Ok(())
    }
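
    // A sketch of an additional test exercising the double-connect guard in `connect()`;
    // it relies only on the helpers already used by the tests above.
    #[tokio::test]
    async fn connector_should_reject_second_connect() -> anyhow::Result<()> {
        let blokli_client = BlokliTestStateBuilder::default().build_static_client();

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        // A second call must fail because the connector is already connected.
        assert!(matches!(
            connector.connect().await,
            Err(ConnectorError::InvalidState(_))
        ));

        Ok(())
    }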
}