hopr_chain_connector/connector/
mod.rs

1use std::{cmp::Ordering, str::FromStr, time::Duration};
2
3use blokli_client::api::{BlokliQueryClient, BlokliSubscriptionClient, BlokliTransactionClient};
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use futures_concurrency::stream::Merge;
6use futures_time::future::FutureExt as FuturesTimeExt;
7use hopr_api::chain::{ChainPathResolver, ChainReceipt, HoprKeyIdent};
8use hopr_async_runtime::AbortHandle;
9use hopr_chain_types::prelude::*;
10use hopr_crypto_types::prelude::*;
11use hopr_internal_types::prelude::*;
12use hopr_primitive_types::prelude::*;
13use petgraph::prelude::DiGraphMap;
14
15use crate::{
16    backend::Backend,
17    connector::{
18        keys::HoprKeyMapper,
19        sequencer::TransactionSequencer,
20        utils::{
21            ParsedChainInfo, model_to_account_entry, model_to_graph_entry, model_to_ticket_params,
22            process_channel_changes_into_events,
23        },
24        values::CHAIN_INFO_CACHE_KEY,
25    },
26    errors::ConnectorError,
27};
28
29mod accounts;
30mod channels;
31mod events;
32mod keys;
33mod sequencer;
34mod tickets;
35mod utils;
36mod values;
37
38type EventsChannel = (
39    async_broadcast::Sender<ChainEvent>,
40    async_broadcast::InactiveReceiver<ChainEvent>,
41);
42
43/// Configuration of the [`HoprBlockchainConnector`].
44#[derive(Clone, Copy, Debug, PartialEq, Eq, smart_default::SmartDefault)]
45pub struct BlockchainConnectorConfig {
46    /// Default time to wait until a transaction is confirmed.
47    #[default(Duration::from_secs(30))]
48    pub tx_confirm_timeout: Duration,
49    /// Fee to use for new key bindings.
50    #[default(HoprBalance::from_str("0.01 wxHOPR").unwrap())]
51    pub new_key_binding_fee: HoprBalance,
52}
53
54/// A connector acting as a middleware between the HOPR APIs (see the [`hopr_api`] crate) and the Blokli Client API (see
55/// the [`blokli_client`] crate).
56///
57/// The connector object cannot be cloned, and shall be used inside an `Arc` if cloning is needed.
58pub struct HoprBlockchainConnector<C, B, P, R> {
59    payload_generator: P,
60    chain_key: ChainKeypair,
61    client: std::sync::Arc<C>,
62    graph: std::sync::Arc<parking_lot::RwLock<DiGraphMap<HoprKeyIdent, ChannelId, ahash::RandomState>>>,
63    backend: std::sync::Arc<B>,
64    connection_handle: Option<AbortHandle>,
65    sequencer: TransactionSequencer<C, R>,
66    events: EventsChannel,
67    cfg: BlockchainConnectorConfig,
68
69    // KeyId <-> OffchainPublicKey mapping
70    mapper: HoprKeyMapper<B>,
71    // Fast retrieval of chain keys by address
72    chain_to_packet: moka::future::Cache<Address, Option<OffchainPublicKey>, ahash::RandomState>,
73    // Fast retrieval of packet keys by chain key
74    packet_to_chain: moka::future::Cache<OffchainPublicKey, Option<Address>, ahash::RandomState>,
75    // Fast retrieval of channel entries by id
76    channel_by_id: moka::future::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
77    // Fast retrieval of channel entries by parties
78    channel_by_parties: moka::future::Cache<ChannelParties, Option<ChannelEntry>, ahash::RandomState>,
79    // Contains only the chain info structure
80    values: moka::future::Cache<u32, ParsedChainInfo>,
81}
82
83impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
84where
85    B: Backend + Send + Sync + 'static,
86    C: BlokliSubscriptionClient + BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
87    P: PayloadGenerator + Send + Sync + 'static,
88    P::TxRequest: Send + Sync + 'static,
89{
90    /// Creates a new instance.
91    pub fn new(
92        chain_key: ChainKeypair,
93        cfg: BlockchainConnectorConfig,
94        client: C,
95        backend: B,
96        payload_generator: P,
97    ) -> Self {
98        let backend = std::sync::Arc::new(backend);
99        let (mut events_tx, events_rx) = async_broadcast::broadcast(1024);
100        events_tx.set_overflow(true);
101        events_tx.set_await_active(false);
102
103        let client = std::sync::Arc::new(client);
104        Self {
105            payload_generator,
106            graph: std::sync::Arc::new(parking_lot::RwLock::new(DiGraphMap::with_capacity_and_hasher(
107                10_000,
108                100_000,
109                ahash::RandomState::default(),
110            ))),
111            backend: backend.clone(),
112            connection_handle: None,
113            sequencer: TransactionSequencer::new(chain_key.clone(), client.clone()),
114            events: (events_tx, events_rx.deactivate()),
115            client,
116            chain_key,
117            cfg,
118            mapper: HoprKeyMapper {
119                id_to_key: moka::sync::CacheBuilder::new(10_000)
120                    .time_to_idle(Duration::from_secs(600))
121                    .build_with_hasher(ahash::RandomState::default()),
122                key_to_id: moka::sync::CacheBuilder::new(10_000)
123                    .time_to_idle(Duration::from_secs(600))
124                    .build_with_hasher(ahash::RandomState::default()),
125                backend,
126            },
127            chain_to_packet: moka::future::CacheBuilder::new(10_000)
128                .time_to_idle(Duration::from_secs(600))
129                .build_with_hasher(ahash::RandomState::default()),
130            packet_to_chain: moka::future::CacheBuilder::new(10_000)
131                .time_to_idle(Duration::from_secs(600))
132                .build_with_hasher(ahash::RandomState::default()),
133            channel_by_id: moka::future::CacheBuilder::new(100_000)
134                .time_to_idle(Duration::from_secs(600))
135                .build_with_hasher(ahash::RandomState::default()),
136            channel_by_parties: moka::future::CacheBuilder::new(100_000)
137                .time_to_idle(Duration::from_secs(600))
138                .build_with_hasher(ahash::RandomState::default()),
139            values: moka::future::CacheBuilder::new(1)
140                .time_to_live(Duration::from_secs(600))
141                .build(),
142        }
143    }
144
145    async fn do_connect(&self, timeout: Duration) -> Result<AbortHandle, ConnectorError> {
146        const TOLERANCE: f64 = 0.01;
147        let min_accounts = (self.client.count_accounts(None).await? as f64 * (1.0 - TOLERANCE)).round() as u32;
148        let min_channels = (self.client.count_channels(None).await? as f64 * (1.0 - TOLERANCE)).round() as u32;
149        tracing::debug!(min_accounts, min_channels, "connection thresholds");
150
151        let (abort_handle, abort_reg) = AbortHandle::new_pair();
152
153        let (connection_ready_tx, connection_ready_rx) = futures::channel::oneshot::channel();
154        let mut connection_ready_tx = Some(connection_ready_tx);
155
156        let client = self.client.clone();
157        let mapper = self.mapper.clone();
158        let backend = self.backend.clone();
159        let graph = self.graph.clone();
160        let event_tx = self.events.0.clone();
161        let me = self.chain_key.public().to_address();
162        let values_cache = self.values.clone();
163
164        let chain_to_packet = self.chain_to_packet.clone();
165        let packet_to_chain = self.packet_to_chain.clone();
166
167        let channel_by_id = self.channel_by_id.clone();
168        let channel_by_parties = self.channel_by_parties.clone();
169
170        // Query chain info to populate the cache
171        self.query_cached_chain_info().await?;
172
173        #[allow(clippy::large_enum_variant)]
174        #[derive(Debug)]
175        enum SubscribedEventType {
176            Account((AccountEntry, Option<AccountEntry>)),
177            Channel((ChannelEntry, Option<Vec<ChannelChange>>)),
178            WinningProbability((WinningProbability, Option<WinningProbability>)),
179            TicketPrice((HoprBalance, Option<HoprBalance>)),
180        }
181
182        hopr_async_runtime::prelude::spawn(async move {
183            let connections = client
184                .subscribe_accounts(None)
185                .and_then(|accounts| Ok((accounts, client.subscribe_graph()?)))
186                .and_then(|(accounts, channels)| Ok((accounts, channels, client.subscribe_ticket_params()?)));
187
188            if let Err(error) = connections {
189                if let Some(connection_ready_tx) = connection_ready_tx.take() {
190                    let _ = connection_ready_tx.send(Err(error));
191                }
192                return;
193            }
194
195            let (account_stream, channel_stream, ticket_params_stream) = connections.unwrap();
196
197            // Stream of Account events (Announcements)
198            let graph_clone = graph.clone();
199            let account_stream = account_stream
200                .inspect_ok(|entry| tracing::trace!(?entry, "new account entry"))
201                .map_err(ConnectorError::from)
202                .try_filter_map(|account| futures::future::ready(model_to_account_entry(account).map(Some)))
203                .and_then(move |account| {
204                    let graph = graph_clone.clone();
205                    let mapper = mapper.clone();
206                    let chain_to_packet = chain_to_packet.clone();
207                    let packet_to_chain = packet_to_chain.clone();
208                    hopr_async_runtime::prelude::spawn_blocking(move || {
209                        mapper.key_to_id.insert(account.public_key, Some(account.key_id));
210                        mapper.id_to_key.insert(account.key_id, Some(account.public_key));
211                        graph.write().add_node(account.key_id);
212                        mapper.backend.insert_account(account.clone()).map(|old| (account, old))
213                    })
214                    .map_err(|e| ConnectorError::BackendError(e.into()))
215                    .and_then(move |res| {
216                        let chain_to_packet = chain_to_packet.clone();
217                        let packet_to_chain = packet_to_chain.clone();
218                        async move {
219                            if let Ok((account, _)) = &res {
220                                // Rather update the cached entry than invalidating it
221                                chain_to_packet
222                                    .insert(account.chain_addr, Some(account.public_key))
223                                    .await;
224                                packet_to_chain
225                                    .insert(account.public_key, Some(account.chain_addr))
226                                    .await;
227                            }
228                            res.map(SubscribedEventType::Account)
229                                .map_err(|e| ConnectorError::BackendError(e.into()))
230                        }
231                    })
232                })
233                .fuse();
234
235            // Stream of channel graph updates
236            let channel_stream = channel_stream
237                .map_err(ConnectorError::from)
238                .inspect_ok(|entry| tracing::trace!(?entry, "new graph entry"))
239                .try_filter_map(|graph_event| futures::future::ready(model_to_graph_entry(graph_event).map(Some)))
240                .and_then(move |(src, dst, channel)| {
241                    let graph = graph.clone();
242                    let backend = backend.clone();
243                    let channel_by_id = channel_by_id.clone();
244                    let channel_by_parties = channel_by_parties.clone();
245                    hopr_async_runtime::prelude::spawn_blocking(move || {
246                        graph.write().add_edge(src.key_id, dst.key_id, channel.get_id());
247                        backend
248                            .insert_channel(channel)
249                            .map(|old| (channel, old.map(|old| old.diff(&channel))))
250                    })
251                    .map_err(|e| ConnectorError::BackendError(e.into()))
252                    .and_then(move |res| {
253                        let channel_by_id = channel_by_id.clone();
254                        let channel_by_parties = channel_by_parties.clone();
255                        async move {
256                            if let Ok((channel, _)) = &res {
257                                // Rather update the cached entry than invalidating it
258                                channel_by_id.insert(channel.get_id(), Some(*channel)).await;
259                                channel_by_parties
260                                    .insert(ChannelParties::from(channel), Some(*channel))
261                                    .await;
262                            }
263                            res.map(SubscribedEventType::Channel)
264                                .map_err(|e| ConnectorError::BackendError(e.into()))
265                        }
266                    })
267                })
268                .fuse();
269
270            // Stream of ticket parameter updates (ticket price, minimum winning probability)
271            let ticket_params_stream = ticket_params_stream
272                .map_err(ConnectorError::from)
273                .inspect_ok(|entry| tracing::trace!(?entry, "new ticket params"))
274                .try_filter_map(|ticket_value_event| {
275                    futures::future::ready(model_to_ticket_params(ticket_value_event).map(Some))
276                })
277                .and_then(|(new_ticket_price, new_win_prob)| {
278                    let values_cache = values_cache.clone();
279                    async move {
280                        let mut events = Vec::<SubscribedEventType>::new();
281                        values_cache
282                            .entry(CHAIN_INFO_CACHE_KEY)
283                            .and_compute_with(|cached_entry| {
284                                futures::future::ready(match cached_entry {
285                                    Some(chain_info) => {
286                                        let mut chain_info = chain_info.into_value();
287                                        if chain_info.ticket_price != new_ticket_price {
288                                            events.push(SubscribedEventType::TicketPrice((
289                                                new_ticket_price,
290                                                Some(chain_info.ticket_price),
291                                            )));
292                                            chain_info.ticket_price = new_ticket_price;
293                                        }
294                                        if !chain_info.ticket_win_prob.approx_eq(&new_win_prob) {
295                                            events.push(SubscribedEventType::WinningProbability((
296                                                new_win_prob,
297                                                Some(chain_info.ticket_win_prob),
298                                            )));
299                                            chain_info.ticket_win_prob = new_win_prob;
300                                        }
301
302                                        if !events.is_empty() {
303                                            moka::ops::compute::Op::Put(chain_info)
304                                        } else {
305                                            moka::ops::compute::Op::Nop
306                                        }
307                                    }
308                                    None => {
309                                        tracing::warn!(
310                                            "chain info not present in the cache before ticket params update"
311                                        );
312                                        events.push(SubscribedEventType::TicketPrice((new_ticket_price, None)));
313                                        events.push(SubscribedEventType::WinningProbability((new_win_prob, None)));
314                                        moka::ops::compute::Op::Nop
315                                    }
316                                })
317                            })
318                            .await;
319                        Ok(futures::stream::iter(events).map(Ok::<_, ConnectorError>))
320                    }
321                })
322                .try_flatten()
323                .fuse();
324
325            let mut account_counter = 0;
326            let mut channel_counter = 0;
327            if min_accounts == 0 && min_channels == 0 {
328                tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
329                let _ = connection_ready_tx.take().unwrap().send(Ok(()));
330            }
331
332            futures::stream::Abortable::new(
333                (account_stream, channel_stream, ticket_params_stream).merge(),
334                abort_reg,
335            )
336            .inspect_ok(move |event_type| {
337                if connection_ready_tx.is_some() {
338                    match event_type {
339                        SubscribedEventType::Account(_) => account_counter += 1,
340                        SubscribedEventType::Channel(_) => channel_counter += 1,
341                        _ => {}
342                    }
343
344                    // Send the completion notification
345                    // once we reach the expected number of accounts and channels with
346                    // the given tolerance
347                    if account_counter >= min_accounts && channel_counter >= min_channels {
348                        tracing::debug!(account_counter, channel_counter, "on-chain graph has been synced");
349                        let _ = connection_ready_tx.take().unwrap().send(Ok(()));
350                    }
351                }
352            })
353            .for_each(|event_type| {
354                let event_tx = event_tx.clone();
355                async move {
356                    match event_type {
357                        Ok(SubscribedEventType::Account((new_account, old_account))) => {
358                            tracing::debug!(%new_account, "account inserted");
359                            // We only track public accounts as events and also
360                            // broadcast announcements of already existing accounts (old_account == None).
361                            if new_account.has_announced_with_routing_info()
362                                && old_account.is_none_or(|a| !a.has_announced_with_routing_info())
363                            {
364                                tracing::debug!(account = %new_account, "new announcement");
365                                let _ = event_tx
366                                    .broadcast_direct(ChainEvent::Announcement(new_account.clone()))
367                                    .await;
368                            }
369                        }
370                        Ok(SubscribedEventType::Channel((new_channel, Some(changes)))) => {
371                            tracing::debug!(
372                                id = %new_channel.get_id(),
373                                src = %new_channel.source, dst = %new_channel.destination,
374                                num_changes = changes.len(),
375                                "channel updated"
376                            );
377                            process_channel_changes_into_events(new_channel, changes, &me, &event_tx).await;
378                        }
379                        Ok(SubscribedEventType::Channel((new_channel, None))) => {
380                            tracing::debug!(
381                                id = %new_channel.get_id(),
382                                src = %new_channel.source, dst = %new_channel.destination,
383                                "channel opened"
384                            );
385                            let _ = event_tx.broadcast_direct(ChainEvent::ChannelOpened(new_channel)).await;
386                        }
387                        Ok(SubscribedEventType::WinningProbability((new, old))) => {
388                            let old = old.unwrap_or_default();
389                            match new.approx_cmp(&old) {
390                                Ordering::Less => {
391                                    tracing::debug!(%new, %old, "winning probability decreased");
392                                    let _ = event_tx
393                                        .broadcast_direct(ChainEvent::WinningProbabilityDecreased(new))
394                                        .await;
395                                }
396                                Ordering::Greater => {
397                                    tracing::debug!(%new, %old, "winning probability increased");
398                                    let _ = event_tx
399                                        .broadcast_direct(ChainEvent::WinningProbabilityIncreased(new))
400                                        .await;
401                                }
402                                Ordering::Equal => {}
403                            }
404                        }
405                        Ok(SubscribedEventType::TicketPrice((new, old))) => {
406                            tracing::debug!(%new, ?old, "ticket price changed");
407                            let _ = event_tx.broadcast_direct(ChainEvent::TicketPriceChanged(new)).await;
408                        }
409                        Err(error) => {
410                            tracing::error!(%error, "error processing account/graph/ticket params subscription");
411                        }
412                    }
413                }
414            })
415            .await;
416        });
417
418        connection_ready_rx
419            .timeout(futures_time::time::Duration::from(timeout))
420            .map(|res| match res {
421                Ok(Ok(Ok(_))) => Ok(abort_handle),
422                Ok(Ok(Err(error))) => {
423                    abort_handle.abort();
424                    Err(ConnectorError::from(error))
425                }
426                Ok(Err(_)) => {
427                    abort_handle.abort();
428                    Err(ConnectorError::InvalidState("failed to determine connection state"))
429                }
430                Err(_) => {
431                    abort_handle.abort();
432                    tracing::error!(min_accounts, min_channels, "connection timeout when syncing");
433                    Err(ConnectorError::ConnectionTimeout)
434                }
435            })
436            .await
437    }
438
439    /// Connects to the chain using the underlying client, syncs all on-chain data
440    /// and subscribes for all future updates.
441    ///
442    /// If the sync of the current state does not happen within `timeout`, a [`ConnectorError::ConnectionTimeout`]
443    /// error is returned.
444    pub async fn connect(&mut self, timeout: Duration) -> Result<(), ConnectorError> {
445        if self
446            .connection_handle
447            .as_ref()
448            .filter(|handle| !handle.is_aborted())
449            .is_some()
450        {
451            return Err(ConnectorError::InvalidState("connector is already connected"));
452        }
453
454        let abort_handle = self.do_connect(timeout).await?;
455
456        if let Err(error) = self.sequencer.start().await {
457            abort_handle.abort();
458            return Err(error);
459        }
460
461        self.connection_handle = Some(abort_handle);
462
463        tracing::info!(node = %self.chain_key.public().to_address(), "connected to chain as node");
464        Ok(())
465    }
466
467    /// Returns the reference to the underlying client.
468    pub fn client(&self) -> &C {
469        self.client.as_ref()
470    }
471}
472
473impl<B, C, P> HoprBlockchainConnector<C, B, P, P::TxRequest>
474where
475    C: BlokliTransactionClient + Send + Sync + 'static,
476    P: PayloadGenerator + Send + Sync,
477    P::TxRequest: Send + Sync,
478{
479    async fn send_tx<'a>(
480        &'a self,
481        tx_req: P::TxRequest,
482    ) -> Result<impl Future<Output = Result<ChainReceipt, ConnectorError>> + Send + 'a, ConnectorError> {
483        Ok(self
484            .sequencer
485            .enqueue_transaction(tx_req, self.cfg.tx_confirm_timeout)
486            .await?
487            .and_then(|tx| {
488                futures::future::ready(
489                    ChainReceipt::from_str(&tx.transaction_hash.0)
490                        .map_err(|_| ConnectorError::TypeConversion("invalid tx hash".into())),
491                )
492            }))
493    }
494}
495
496impl<B, C, P, R> HoprBlockchainConnector<C, R, B, P> {
497    pub(crate) fn check_connection_state(&self) -> Result<(), ConnectorError> {
498        self.connection_handle
499            .as_ref()
500            .filter(|handle| !handle.is_aborted()) // Do a safety check
501            .ok_or(ConnectorError::InvalidState("connector is not connected"))
502            .map(|_| ())
503    }
504
505    /// Invalidates all cached on-chain data.
506    pub fn invalidate_caches(&self) {
507        self.channel_by_parties.invalidate_all();
508        self.channel_by_id.invalidate_all();
509        self.packet_to_chain.invalidate_all();
510        self.chain_to_packet.invalidate_all();
511        self.values.invalidate_all();
512    }
513}
514
515impl<B, C, P, R> Drop for HoprBlockchainConnector<C, R, B, P> {
516    fn drop(&mut self) {
517        self.events.0.close();
518        if let Some(abort_handle) = self.connection_handle.take() {
519            abort_handle.abort();
520        }
521    }
522}
523
524impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
525where
526    B: Backend + Send + Sync + 'static,
527    C: Send + Sync,
528    P: Send + Sync,
529    R: Send + Sync,
530{
531    /// Returns a [`PathAddressResolver`] using this connector.
532    pub fn as_path_resolver(&self) -> ChainPathResolver<'_, Self> {
533        self.into()
534    }
535}