use std::{cmp::Ordering, str::FromStr, time::Duration};

use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use futures_concurrency::stream::Merge;
use futures_time::future::FutureExt as FuturesTimeExt;
use hopr_api::chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent};
use hopr_async_runtime::AbortHandle;
use hopr_chain_types::prelude::*;
use hopr_crypto_types::prelude::*;
use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;
use petgraph::prelude::DiGraphMap;

use crate::{
    backend::Backend,
    connector::{
        keys::HoprKeyMapper,
        sequencer::TransactionSequencer,
        utils::{
            ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
            process_channel_changes_into_events,
        },
        values::CHAIN_INFO_CACHE_KEY,
    },
    errors::ConnectorError,
};

mod accounts;
mod channels;
mod events;
mod keys;
mod sequencer;
mod tickets;
mod utils;
mod values;
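
/// Pair of the broadcast sender for [`ChainEvent`]s and an inactive receiver that keeps the
/// channel open while no subscriber is currently listening.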
type EventsChannel = (
    async_broadcast::Sender<ChainEvent>,
    async_broadcast::InactiveReceiver<ChainEvent>,
);
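
/// Configuration of the [`HoprBlockchainConnector`].
///
/// All values have defaults; a minimal usage sketch overriding only the transaction
/// confirmation timeout:
///
/// ```ignore
/// let cfg = BlockchainConnectorConfig {
///     tx_confirm_timeout: Duration::from_secs(60),
///     ..Default::default()
/// };
/// ```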
#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
pub struct BlockchainConnectorConfig {
    /// Maximum time to wait for a submitted transaction to be confirmed on-chain.
    #[default(Duration::from_secs(30))]
    pub tx_confirm_timeout: Duration,
    /// Fee used for new key bindings.
    #[default(HoprBalance::from_str("0.01 wxHOPR").unwrap())]
    pub new_key_binding_fee: HoprBalance,
}
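
/// Connector that maintains a local view of the HOPR on-chain state via a Blokli client.
///
/// The connector subscribes to account, channel and ticket-parameter updates, mirrors them into
/// the [`Backend`] and an in-memory channel graph, caches frequently used lookups, and
/// re-broadcasts the resulting [`ChainEvent`]s to subscribers. Transactions are submitted through
/// the internal [`TransactionSequencer`].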
pub struct HoprBlockchainConnector<C, B, P, R> {
    payload_generator: P,
    chain_key: ChainKeypair,
    client: std::sync::Arc<C>,
    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
    backend: std::sync::Arc<B>,
    connection_handle: Option<AbortHandle>,
    sequencer: TransactionSequencer<C, R>,
    events: EventsChannel,
    cfg: BlockchainConnectorConfig,

    mapper: HoprKeyMapper<B>,
    /// Cache of chain address to packet (off-chain) key resolutions.
    chain_to_packet: moka::future::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
    /// Cache of packet (off-chain) key to chain address resolutions.
    packet_to_chain: moka::future::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
    /// Cache of channels looked up by channel ID.
    channel_by_id: moka::future::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
    /// Cache of channels looked up by their source/destination parties.
    channel_by_parties: moka::future::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
    /// Single-entry cache holding the parsed chain info (ticket price, winning probability, ...).
    values: moka::future::Cache<u32, ParsedChainInfo>,
}
impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
where
    B: Backend + Send + Sync + 'static,
    C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
    P: PayloadGenerator + Send + Sync + 'static,
    P::TxRequest: Send + Sync + 'static,
{
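    /// Creates a new, not yet connected instance from the given chain key, configuration, Blokli
    /// client, backend and transaction payload generator.
    ///
    /// Call [`connect`](Self::connect) to start the on-chain synchronization.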
    pub fn new(
        chain_key: ChainKeypair,
        cfg: BlockchainConnectorConfig,
        client: C,
        backend: B,
        payload_generator: P,
    ) -> Self {
        let backend = std::sync::Arc::new(backend);
        let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
        events_tx.set_overflow(true);
        events_tx.set_await_active(false);

        let client = std::sync::Arc::new(client);
        Self {
            payload_generator,
            graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
                10_000,
                100_000,
                ahash::RandomState::default(),
            ))),
            backend: backend.clone(),
            connection_handle: None,
            sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
            events: (events_tx, events_rx.deactivate()),
            client,
            chain_key,
            cfg,
            mapper: HoprKeyMapper {
                id_to_key: moka::sync::CacheBuilder::new(10_000)
                    .time_to_idle(Duration::from_secs(600))
                    .build_with_hasher(ahash::RandomState::default()),
                key_to_id: moka::sync::CacheBuilder::new(10_000)
                    .time_to_idle(Duration::from_secs(600))
                    .build_with_hasher(ahash::RandomState::default()),
                backend,
            },
            chain_to_packet: moka::future::CacheBuilder::new(10_000)
                .time_to_idle(Duration::from_secs(600))
                .build_with_hasher(ahash::RandomState::default()),
            packet_to_chain: moka::future::CacheBuilder::new(10_000)
                .time_to_idle(Duration::from_secs(600))
                .build_with_hasher(ahash::RandomState::default()),
            channel_by_id: moka::future::CacheBuilder::new(100_000)
                .time_to_idle(Duration::from_secs(600))
                .build_with_hasher(ahash::RandomState::default()),
            channel_by_parties: moka::future::CacheBuilder::new(100_000)
                .time_to_idle(Duration::from_secs(600))
                .build_with_hasher(ahash::RandomState::default()),
            values: moka::future::CacheBuilder::new(1)
                .time_to_live(Duration::from_secs(600))
                .build(),
        }
    }
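
    /// Establishes the subscriptions to the Blokli account, graph and ticket-parameter streams and
    /// spawns the background task that keeps the backend, caches and channel graph in sync.
    ///
    /// Resolves once approximately 99% of the currently known accounts and channels (as reported
    /// by the client) have been received, returning the [`AbortHandle`] of the spawned task, or
    /// fails with [`ConnectorError::ConnectionTimeout`] if this does not happen within `timeout`.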
    async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
        const TOLERANCE: f64 = 0.01;
        let min_accounts = (self.client.count_accounts(None).await? as f64 * (1.0 - TOLERANCE)).round() as u32;
        let min_channels = (self.client.count_channels(None).await? as f64 * (1.0 - TOLERANCE)).round() as u32;
        tracing::debug!(min_accounts, min_channels, "connection thresholds");

        let (abort_handle, abort_reg) = AbortHandle::new_pair();

        let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
        let mut connection_ready_tx = Some(connection_ready_tx);

        let client = self.client.clone();
        let mapper = self.mapper.clone();
        let backend = self.backend.clone();
        let graph = self.graph.clone();
        let event_tx = self.events.0.clone();
        let me = self.chain_key.public().to_address();
        let values_cache = self.values.clone();

        let chain_to_packet = self.chain_to_packet.clone();
        let packet_to_chain = self.packet_to_chain.clone();

        let channel_by_id = self.channel_by_id.clone();
        let channel_by_parties = self.channel_by_parties.clone();

        self.query_cached_chain_info().await?;

        #[allow(clippy::large_enum_variant)]
        #[derive(Debug)]
        enum SubscribedEventType {
            Account((AccountEntry, Option<AccountEntry>)),
            Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
            WinningProbability((WinningProbability, Option<WinningProbability>)),
            TicketPrice((HoprBalance, Option<HoprBalance>)),
        }
        hopr_async_runtime::prelude::spawn(async move {
            let connections = client
                .subscribe_accounts(None)
                .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
                .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));

            // If any of the subscriptions could not be established, report the error and stop.
            let (account_stream, channel_stream, ticket_params_stream) = match connections {
                Ok(streams) => streams,
                Err(error) => {
                    if let Some(connection_ready_tx) = connection_ready_tx.take() {
                        let _ = connection_ready_tx.send(Err(error));
                    }
                    return;
                }
            };
            let graph_clone = graph.clone();
            let account_stream = account_stream
                .inspect_ok(|entry| tracing::trace!(?entry, "new account entry"))
                .map_err(ConnectorError::from)
                .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
                .and_then(move |account| {
                    let graph = graph_clone.clone();
                    let mapper = mapper.clone();
                    let chain_to_packet = chain_to_packet.clone();
                    let packet_to_chain = packet_to_chain.clone();
                    hopr_async_runtime::prelude::spawn_blocking(move || {
                        mapper.key_to_id.insert(account.public_key, Some(account.key_id));
                        mapper.id_to_key.insert(account.key_id, Some(account.public_key));
                        graph.write().add_node(account.key_id);
                        mapper.backend.insert_account(account.clone()).map(|old| (account, old))
                    })
                    .map_err(|e| ConnectorError::BackendError(e.into()))
                    .and_then(move |res| {
                        let chain_to_packet = chain_to_packet.clone();
                        let packet_to_chain = packet_to_chain.clone();
                        async move {
                            if let Ok((account, _)) = &res {
                                chain_to_packet
                                    .insert(account.chain_addr, Some(account.public_key))
                                    .await;
                                packet_to_chain
                                    .insert(account.public_key, Some(account.chain_addr))
                                    .await;
                            }
                            res.map(SubscribedEventType::Account)
                                .map_err(|e| ConnectorError::BackendError(e.into()))
                        }
                    })
                })
                .fuse();

            let channel_stream = channel_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new graph entry"))
                .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
                .and_then(move |(src, dst, channel)| {
                    let graph = graph.clone();
                    let backend = backend.clone();
                    let channel_by_id = channel_by_id.clone();
                    let channel_by_parties = channel_by_parties.clone();
                    hopr_async_runtime::prelude::spawn_blocking(move || {
                        graph.write().add_edge(src.key_id, dst.key_id, channel.get_id());
                        backend
                            .insert_channel(channel)
                            .map(|old| (channel, old.map(|old| old.diff(&channel))))
                    })
                    .map_err(|e| ConnectorError::BackendError(e.into()))
                    .and_then(move |res| {
                        let channel_by_id = channel_by_id.clone();
                        let channel_by_parties = channel_by_parties.clone();
                        async move {
                            if let Ok((channel, _)) = &res {
                                channel_by_id.insert(channel.get_id(), Some(*channel)).await;
                                channel_by_parties
                                    .insert(ChannelParties::from(channel), Some(*channel))
                                    .await;
                            }
                            res.map(SubscribedEventType::Channel)
                                .map_err(|e| ConnectorError::BackendError(e.into()))
                        }
                    })
                })
                .fuse();
            let ticket_params_stream = ticket_params_stream
                .map_err(ConnectorError::from)
                .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
                .try_filter_map(|ticket_value_event| {
                    futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
                })
                .and_then(|(new_ticket_price, new_win_prob)| {
                    let values_cache = values_cache.clone();
                    async move {
                        let mut events = Vec::<SubscribedEventType>::new();
                        values_cache
                            .entry(CHAIN_INFO_CACHE_KEY)
                            .and_compute_with(|cached_entry| {
                                futures::future::ready(match cached_entry {
                                    Some(chain_info) => {
                                        let mut chain_info = chain_info.into_value();
                                        if chain_info.ticket_price != new_ticket_price {
                                            events.push(SubscribedEventType::TicketPrice((
                                                new_ticket_price,
                                                Some(chain_info.ticket_price),
                                            )));
                                            chain_info.ticket_price = new_ticket_price;
                                        }
                                        if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
                                            events.push(SubscribedEventType::WinningProbability((
                                                new_win_prob,
                                                Some(chain_info.ticket_win_prob),
                                            )));
                                            chain_info.ticket_win_prob = new_win_prob;
                                        }

                                        if !events.is_empty() {
                                            moka::ops::compute::Op::Put(chain_info)
                                        } else {
                                            moka::ops::compute::Op::Nop
                                        }
                                    }
                                    None => {
                                        tracing::warn!(
                                            "chain info not present in the cache before ticket params update"
                                        );
                                        events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
                                        events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
                                        moka::ops::compute::Op::Nop
                                    }
                                })
                            })
                            .await;
                        Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
                    }
                })
                .try_flatten()
                .fuse();

            let mut account_counter = 0;
            let mut channel_counter = 0;
            if min_accounts == 0 && min_channels == 0 {
                tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
                let _ = connection_ready_tx.take().unwrap().send(Ok(()));
            }
            futures::stream::Abortable::new(
                (account_stream, channel_stream, ticket_params_stream).merge(),
                abort_reg,
            )
            .inspect_ok(move |event_type| {
                if connection_ready_tx.is_some() {
                    match event_type {
                        SubscribedEventType::Account(_) => account_counter += 1,
                        SubscribedEventType::Channel(_) => channel_counter += 1,
                        _ => {}
                    }

                    if account_counter >= min_accounts && channel_counter >= min_channels {
                        tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
                        let _ = connection_ready_tx.take().unwrap().send(Ok(()));
                    }
                }
            })
            .for_each(|event_type| {
                let event_tx = event_tx.clone();
                async move {
                    match event_type {
                        Ok(SubscribedEventType::Account((new_account, old_account))) => {
                            tracing::debug!(%new_account, "account inserted");
                            if new_account.has_announced_with_routing_info()
                                && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
                            {
                                tracing::debug!(account = %new_account, "new announcement");
                                let _ = event_tx
                                    .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
                                    .await;
                            }
                        }
                        Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                num_changes = changes.len(),
                                "channel updated"
                            );
                            process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
                        }
                        Ok(SubscribedEventType::Channel((new_channel, None))) => {
                            tracing::debug!(
                                id = %new_channel.get_id(),
                                src = %new_channel.source, dst = %new_channel.destination,
                                "channel opened"
                            );
                            let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
                        }
                        Ok(SubscribedEventType::WinningProbability((new, old))) => {
                            let old = old.unwrap_or_default();
                            match new.approx_cmp(&old) {
                                Ordering::Less => {
                                    tracing::debug!(%new, %old, "winning probability decreased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
                                        .await;
                                }
                                Ordering::Greater => {
                                    tracing::debug!(%new, %old, "winning probability increased");
                                    let _ = event_tx
                                        .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
                                        .await;
                                }
                                Ordering::Equal => {}
                            }
                        }
                        Ok(SubscribedEventType::TicketPrice((new, old))) => {
                            tracing::debug!(%new, ?old, "ticket price changed");
                            let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
                        }
                        Err(error) => {
                            tracing::error!(%error, "error processing account/graph/ticket params subscription");
                        }
                    }
                }
            })
            .await;
        });
        connection_ready_rx
            .timeout(futures_time::time::Duration::from(timeout))
            .map(|res| match res {
                Ok(Ok(Ok(_))) => Ok(abort_handle),
                Ok(Ok(Err(error))) => {
                    abort_handle.abort();
                    Err(ConnectorError::from(error))
                }
                Ok(Err(_)) => {
                    abort_handle.abort();
                    Err(ConnectorError::InvalidState("failed to determine connection state"))
                }
                Err(_) => {
                    abort_handle.abort();
                    tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
                    Err(ConnectorError::ConnectionTimeout)
                }
            })
            .await
    }
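
    /// Connects the connector to the chain.
    ///
    /// This establishes the Blokli subscriptions, starts the transaction sequencer and stores the
    /// abort handle of the background synchronization task. Returns an error if the connector is
    /// already connected or if syncing does not complete within `timeout`.
    ///
    /// A minimal usage sketch; `blokli_client`, `backend` and `payload_generator` are placeholders
    /// for concrete implementations of the respective traits and are not provided by this module:
    ///
    /// ```ignore
    /// let mut connector = HoprBlockchainConnector::new(
    ///     chain_key,
    ///     BlockchainConnectorConfig::default(),
    ///     blokli_client,     // placeholder: Blokli query/subscription/transaction client
    ///     backend,           // placeholder: `Backend` implementation
    ///     payload_generator, // placeholder: `PayloadGenerator` implementation
    /// );
    /// connector.connect(Duration::from_secs(30)).await?;
    /// ```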
    pub async fn connect(&mut self, timeout: Duration) -> Result<(), ConnectorError> {
        if self
            .connection_handle
            .as_ref()
            .filter(|handle| !handle.is_aborted())
            .is_some()
        {
            return Err(ConnectorError::InvalidState("connector is already connected"));
        }

        let abort_handle = self.do_connect(timeout).await?;

        if let Err(error) = self.sequencer.start().await {
            abort_handle.abort();
            return Err(error);
        }

        self.connection_handle = Some(abort_handle);

        tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
        Ok(())
    }
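
    /// Returns a reference to the underlying Blokli client.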
    pub fn client(&self) -> &C {
        self.client.as_ref()
    }
}
impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
where
    C: BlokliTransactionClient + Send + Sync + 'static,
    P: PayloadGenerator + Send + Sync,
    P::TxRequest: Send + Sync,
{
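    /// Enqueues a transaction request with the internal [`TransactionSequencer`] and returns a
    /// future resolving to the [`ChainReceipt`] of the confirmed transaction, using the
    /// confirmation timeout from the connector configuration.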
    async fn send_tx<'a>(
        &'a self,
        tx_req: P::TxRequest,
    ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
        Ok(self
            .sequencer
            .enqueue_transaction(tx_req, self.cfg.tx_confirm_timeout)
            .await?
            .and_then(|tx| {
                futures::future::ready(
                    ChainReceipt::from_str(&tx.transaction_hash.0)
                        .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
                )
            }))
    }
}
impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R> {
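    /// Succeeds only if the connector currently holds a live (non-aborted) connection handle.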
    pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
        self.connection_handle
            .as_ref()
            .filter(|handle| !handle.is_aborted())
            .ok_or(ConnectorError::InvalidState("connector is not connected"))
            .map(|_| ())
    }
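
    /// Invalidates all cached key mappings, channel lookups and chain-info values so that they are
    /// repopulated on the next lookup.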
    pub fn invalidate_caches(&self) {
        self.channel_by_parties.invalidate_all();
        self.channel_by_id.invalidate_all();
        self.packet_to_chain.invalidate_all();
        self.chain_to_packet.invalidate_all();
        self.values.invalidate_all();
    }
}
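
// Dropping the connector closes the event broadcast channel and aborts the background
// synchronization task, if one is still running.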
impl<B, C, P, R> Drop for HoprBlockchainConnector<C, B, P, R> {
    fn drop(&mut self) {
        self.events.0.close();
        if let Some(abort_handle) = self.connection_handle.take() {
            abort_handle.abort();
        }
    }
}
impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
where
    B: Backend + Send + Sync + 'static,
    C: Send + Sync,
    P: Send + Sync,
    R: Send + Sync,
{
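    /// Returns this connector wrapped as a [`ChainPathResolver`].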
    pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
        self.into()
    }
}