Skip to main content

hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it;
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25
26/// Utility module with helper types and functionality over hopr-lib behavior.
27pub mod utils;
28
29/// Public traits for interactions with this library.
30pub mod traits;
31
32/// Functionality related to the HOPR node state.
33pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38pub use hopr_api as api;
39
40/// Exports of libraries necessary for API and interface operations.
41#[doc(hidden)]
42pub mod exports {
43    pub mod types {
44        pub use hopr_chain_types as chain;
45        pub use hopr_internal_types as internal;
46        pub use hopr_primitive_types as primitive;
47    }
48
49    pub mod crypto {
50        pub use hopr_crypto_keypair as keypair;
51        pub use hopr_crypto_types as types;
52    }
53
54    pub mod network {
55        pub use hopr_network_types as types;
56    }
57
58    pub use hopr_transport as transport;
59}
60
61/// Export of relevant types for easier integration.
62#[doc(hidden)]
63pub mod prelude {
64    pub use super::exports::{
65        crypto::{
66            keypair::key_pair::HoprKeys,
67            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
68        },
69        network::types::{
70            prelude::ForeignDataMode,
71            udp::{ConnectedUdpStream, UdpStreamParallelism},
72        },
73        transport::{OffchainPublicKey, socket::HoprSocket},
74        types::primitive::prelude::Address,
75    };
76}
77
78use std::{
79    convert::identity,
80    future::Future,
81    sync::{Arc, OnceLock, atomic::Ordering},
82    time::Duration,
83};
84
85use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel, pin_mut};
86use futures_time::future::FutureExt as FuturesTimeFutureExt;
87use hopr_api::{
88    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
89    ct::TrafficGeneration,
90    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
91};
92pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
93use hopr_async_runtime::prelude::spawn;
94pub use hopr_async_runtime::{Abortable, AbortableList};
95pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
96pub use hopr_crypto_types::prelude::*;
97pub use hopr_internal_types::prelude::*;
98pub use hopr_network_types::prelude::*;
99#[cfg(all(feature = "prometheus", not(test)))]
100use hopr_platform::time::native::current_time;
101pub use hopr_primitive_types::prelude::*;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110    config::SafeModule,
111    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112    errors::{HoprLibError, HoprStatusError},
113    state::{HoprLibProcess, HoprState},
114    traits::chain::{CloseChannelResult, OpenChannelResult},
115};
116
117#[cfg(all(feature = "prometheus", not(test)))]
118lazy_static::lazy_static! {
119    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
120        "hopr_start_time",
121        "The unix timestamp in seconds at which the process was started"
122    ).unwrap();
123    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
124        "hopr_lib_version",
125        "Executed version of hopr-lib",
126        &["version"]
127    ).unwrap();
128    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
129        "hopr_node_addresses",
130        "Node on-chain and off-chain addresses",
131        &["peerid", "address", "safe_address", "module_address"]
132    ).unwrap();
133}
134
135/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
136///
137/// Divide the available CPU parallelism by 2, since half of the available threads are
138/// to be used for IO-bound and half for CPU-bound tasks.
139#[cfg(feature = "runtime-tokio")]
140pub fn prepare_tokio_runtime(
141    num_cpu_threads: Option<std::num::NonZeroUsize>,
142    num_io_threads: Option<std::num::NonZeroUsize>,
143) -> anyhow::Result<tokio::runtime::Runtime> {
144    use std::str::FromStr;
145    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
146
147    hopr_parallelize::cpu::init_thread_pool(
148        num_cpu_threads
149            .map(|v| v.get())
150            .or(avail_parallelism)
151            .ok_or(anyhow::anyhow!(
152                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
153                 environment variable."
154            ))?
155            .max(1),
156    )?;
157
158    Ok(tokio::runtime::Builder::new_multi_thread()
159        .enable_all()
160        .worker_threads(
161            num_io_threads
162                .map(|v| v.get())
163                .or(avail_parallelism)
164                .ok_or(anyhow::anyhow!(
165                    "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
166                     environment variable."
167                ))?
168                .max(1),
169        )
170        .thread_name("hoprd")
171        .thread_stack_size(
172            std::env::var("HOPRD_THREAD_STACK_SIZE")
173                .ok()
174                .and_then(|v| usize::from_str(&v).ok())
175                .unwrap_or(10 * 1024 * 1024)
176                .max(2 * 1024 * 1024),
177        )
178        .build()?)
179}
180
181/// Type alias used to send and receive transport data via a running HOPR node.
182pub type HoprTransportIO = socket::HoprSocket<
183    futures::channel::mpsc::Receiver<ApplicationDataIn>,
184    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
185>;
186
187type NewTicketEvents = (
188    async_broadcast::Sender<VerifiedTicket>,
189    async_broadcast::InactiveReceiver<VerifiedTicket>,
190);
191
192/// Time to wait until the node's keybinding appears on-chain
193const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
194
195/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
196// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
197const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
198
199/// HOPR main object providing the entire HOPR node functionality
200///
201/// Instantiating this object creates all processes and objects necessary for
202/// running the HOPR node. Once created, the node can be started using the
203/// `run()` method.
204///
205/// Externally offered API should be enough to perform all necessary tasks
206/// with the HOPR node manually, but it is advised to create such a configuration
207/// that manual interaction is unnecessary.
208///
209/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
210pub struct Hopr<Chain, Db> {
211    me: OffchainKeypair,
212    cfg: config::HoprLibConfig,
213    state: Arc<state::AtomicHoprState>,
214    transport_api: HoprTransport<Chain, Db>,
215    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
216    node_db: Db,
217    chain_api: Chain,
218    winning_ticket_subscribers: NewTicketEvents,
219    processes: OnceLock<AbortableList<HoprLibProcess>>,
220}
221
222impl<Chain, Db> Hopr<Chain, Db>
223where
224    Chain: HoprChainApi + Clone + Send + Sync + 'static,
225    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
226{
227    pub async fn new(
228        identity: (&ChainKeypair, &OffchainKeypair),
229        hopr_chain_api: Chain,
230        hopr_node_db: Db,
231        cfg: config::HoprLibConfig,
232    ) -> errors::Result<Self> {
233        if hopr_crypto_random::is_rng_fixed() {
234            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
235        }
236
237        cfg.validate()?;
238
239        let hopr_transport_api = HoprTransport::new(
240            identity,
241            hopr_chain_api.clone(),
242            hopr_node_db.clone(),
243            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
244            cfg.protocol,
245        );
246
247        #[cfg(all(feature = "prometheus", not(test)))]
248        {
249            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
250            METRIC_HOPR_LIB_VERSION.set(
251                &[const_format::formatcp!("{}", constants::APP_VERSION)],
252                const_format::formatcp!(
253                    "{}.{}",
254                    env!("CARGO_PKG_VERSION_MAJOR"),
255                    env!("CARGO_PKG_VERSION_MINOR")
256                )
257                .parse()
258                .unwrap_or(0.0),
259            );
260
261            // Calling get_ticket_statistics will initialize the respective metrics on tickets
262            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
263                error!(%error, "failed to initialize ticket statistics metrics");
264            }
265        }
266
267        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
268        new_tickets_tx.set_await_active(false);
269        new_tickets_tx.set_overflow(true);
270
271        Ok(Self {
272            me: identity.1.clone(),
273            cfg,
274            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
275            transport_api: hopr_transport_api,
276            chain_api: hopr_chain_api,
277            node_db: hopr_node_db,
278            redeem_requests: OnceLock::new(),
279            processes: OnceLock::new(),
280            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
281        })
282    }
283
284    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
285        if self.status() == state {
286            Ok(())
287        } else {
288            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
289        }
290    }
291
292    pub fn status(&self) -> HoprState {
293        self.state.load(Ordering::Relaxed)
294    }
295
296    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
297        self.chain_api
298            .balance(self.me_onchain())
299            .await
300            .map_err(HoprLibError::chain)
301    }
302
303    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
304        self.chain_api
305            .balance(self.cfg.safe_module.safe_address)
306            .await
307            .map_err(HoprLibError::chain)
308    }
309
310    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
311        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
312    }
313
314    pub fn get_safe_config(&self) -> SafeModule {
315        self.cfg.safe_module.clone()
316    }
317
318    pub fn config(&self) -> &config::HoprLibConfig {
319        &self.cfg
320    }
321
322    #[inline]
323    fn is_public(&self) -> bool {
324        self.cfg.publish
325    }
326
327    pub async fn run<
328        Ct,
329        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
330    >(
331        &self,
332        cover_traffic: Ct,
333        #[cfg(feature = "session-server")] serve_handler: T,
334    ) -> errors::Result<HoprTransportIO>
335    where
336        Ct: TrafficGeneration + Send + Sync + 'static,
337    {
338        self.error_if_not_in_state(
339            HoprState::Uninitialized,
340            "cannot start the hopr node multiple times".into(),
341        )?;
342
343        #[cfg(feature = "testing")]
344        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
345
346        info!(
347            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
348            "node is not started, please fund this node",
349        );
350
351        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
352        helpers::wait_for_funds(
353            *MIN_NATIVE_BALANCE,
354            *SUGGESTED_NATIVE_BALANCE,
355            Duration::from_secs(200),
356            self.me_onchain(),
357            &self.chain_api,
358        )
359        .await?;
360
361        let mut processes = AbortableList::<HoprLibProcess>::default();
362
363        info!("starting HOPR node...");
364        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
365
366        let balance: XDaiBalance = self.get_balance().await?;
367        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
368
369        info!(
370            address = %self.me_onchain(),
371            %balance,
372            %minimum_balance,
373            "node information"
374        );
375
376        if balance.le(&minimum_balance) {
377            return Err(HoprLibError::GeneralError(
378                "cannot start the node without a sufficiently funded wallet".into(),
379            ));
380        }
381
382        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
383
384        // Once we are able to query the chain,
385        // check if the ticket price is configured correctly.
386        let network_min_ticket_price = self
387            .chain_api
388            .minimum_ticket_price()
389            .await
390            .map_err(HoprLibError::chain)?;
391        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
392        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
393            return Err(HoprLibError::GeneralError(format!(
394                "configured outgoing ticket price is lower than the network minimum ticket price: \
395                 {configured_ticket_price:?} < {network_min_ticket_price}"
396            )));
397        }
398        // Once we are able to query the chain,
399        // check if the winning probability is configured correctly.
400        let network_min_win_prob = self
401            .chain_api
402            .minimum_incoming_ticket_win_prob()
403            .await
404            .map_err(HoprLibError::chain)?;
405        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
406        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
407            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
408        {
409            return Err(HoprLibError::GeneralError(format!(
410                "configured outgoing ticket winning probability is lower than the network minimum winning \
411                 probability: {configured_win_prob:?} < {network_min_win_prob}"
412            )));
413        }
414
415        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
416        // plus 100 as an additional buffer
417        let minimum_capacity = self
418            .chain_api
419            .count_accounts(AccountSelector {
420                public_only: true,
421                ..Default::default()
422            })
423            .await
424            .map_err(HoprLibError::chain)?
425            .saturating_mul(2)
426            .saturating_add(100);
427
428        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
429            .ok()
430            .and_then(|s| s.trim().parse::<usize>().ok())
431            .filter(|&c| c > 0)
432            .unwrap_or(2048)
433            .max(minimum_capacity);
434
435        debug!(
436            capacity = chain_discovery_events_capacity,
437            minimum_required = minimum_capacity,
438            "creating chain discovery events channel"
439        );
440        let (indexer_peer_update_tx, indexer_peer_update_rx) =
441            channel::<PeerDiscovery>(chain_discovery_events_capacity);
442
443        self.state
444            .store(HoprState::SubscribingToAnnouncements, Ordering::Relaxed);
445
446        // Stream all the existing announcements and also subscribe to all future on-chain
447        // announcements
448        let (announcements_stream, announcements_handle) = futures::stream::abortable(
449            self.chain_api
450                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
451                .map_err(HoprLibError::chain)?,
452        );
453        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
454
455        spawn(
456            announcements_stream
457                .filter_map(|event| {
458                    futures::future::ready(event.try_as_announcement().map(|account| {
459                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
460                    }))
461                })
462                .map(Ok)
463                .forward(indexer_peer_update_tx)
464                .inspect(
465                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
466                ),
467        );
468
469        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
470
471        let safe_addr = self.cfg.safe_module.safe_address;
472
473        if self.me_onchain() == safe_addr {
474            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
475        }
476
477        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
478        info!(%safe_addr, "registering safe with this node");
479        match self.chain_api.register_safe(&safe_addr).await {
480            Ok(awaiter) => {
481                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
482                awaiter.await.map_err(|error| {
483                    error!(%safe_addr, %error, "safe registration failed with error");
484                    HoprLibError::chain(error)
485                })?;
486                info!(%safe_addr, "safe successfully registered with this node");
487            }
488            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
489                info!(%safe_addr, "this safe is already registered with this node");
490            }
491            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
492                // TODO: support safe deregistration flow
493                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
494                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
495            }
496            Err(error) => {
497                error!(%safe_addr, %error, "safe registration failed");
498                return Err(HoprLibError::chain(error));
499            }
500        }
501
502        // Only public nodes announce multiaddresses
503        let multiaddresses_to_announce = if self.is_public() {
504            // The multiaddresses are filtered for the non-private ones,
505            // unless `announce_local_addresses` is set to `true`.
506            self.transport_api.announceable_multiaddresses()
507        } else {
508            Vec::with_capacity(0)
509        };
510
511        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
512        multiaddresses_to_announce
513            .iter()
514            .filter(|a| !is_public_address(a))
515            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
516
517        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
518
519        let chain_api = self.chain_api.clone();
520        let me_offchain = *self.me.public();
521        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
522
523        // At this point the node is already registered with Safe, so
524        // we can announce via Safe-compliant TX
525        info!(?multiaddresses_to_announce, "announcing node on chain");
526        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
527            Ok(awaiter) => {
528                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
529                awaiter.await.map_err(|error| {
530                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
531                    HoprLibError::chain(error)
532                })?;
533                info!(?multiaddresses_to_announce, "node has been successfully announced");
534            }
535            Err(AnnouncementError::AlreadyAnnounced) => {
536                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
537            }
538            Err(error) => {
539                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
540                return Err(HoprLibError::chain(error));
541            }
542        }
543
544        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
545
546        // Wait for the node key-binding readiness to return
547        let this_node_account = node_ready
548            .await
549            .map_err(HoprLibError::other)?
550            .map_err(HoprLibError::chain)?;
551        if this_node_account.chain_addr != self.me_onchain()
552            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
553        {
554            error!(%this_node_account, "account bound to offchain key does not match this node");
555            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
556        }
557
558        info!(%this_node_account, "node account is ready");
559
560        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
561
562        info!("initializing session infrastructure");
563        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
564            .ok()
565            .and_then(|s| s.trim().parse::<usize>().ok())
566            .filter(|&c| c > 0)
567            .unwrap_or(256);
568
569        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
570        #[cfg(feature = "session-server")]
571        {
572            debug!(capacity = incoming_session_channel_capacity, "creating session server");
573            processes.insert(
574                HoprLibProcess::SessionServer,
575                hopr_async_runtime::spawn_as_abortable!(
576                    _session_rx
577                        .for_each_concurrent(None, move |session| {
578                            let serve_handler = serve_handler.clone();
579                            async move {
580                                let session_id = *session.session.id();
581                                match serve_handler.process(session).await {
582                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
583                                    Err(error) => error!(
584                                        ?session_id,
585                                        %error,
586                                        "client session processing failed"
587                                    ),
588                                }
589                            }
590                        })
591                        .inspect(|_| tracing::warn!(
592                            task = %HoprLibProcess::SessionServer,
593                            "long-running background task finished"
594                        ))
595                ),
596            );
597        }
598
599        info!("starting ticket events processor");
600        let (tickets_tx, tickets_rx) = channel(8192);
601        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
602        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
603        let node_db = self.node_db.clone();
604        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
605        spawn(
606            tickets_rx
607                .filter_map(move |ticket_event| {
608                    let node_db = node_db.clone();
609                    async move {
610                        match ticket_event {
611                            TicketEvent::WinningTicket(winning) => {
612                                if let Err(error) = node_db.insert_ticket(*winning).await {
613                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
614                                } else {
615                                    tracing::debug!(%winning, "inserted ticket into database");
616                                }
617                                Some(winning)
618                            }
619                            TicketEvent::RejectedTicket(rejected, issuer) => {
620                                if let Some(issuer) = &issuer {
621                                    if let Err(error) =
622                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
623                                    {
624                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
625                                    } else {
626                                        tracing::debug!(%rejected, "marked ticket as rejected");
627                                    }
628                                } else {
629                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
630                                }
631                                None
632                            }
633                        }
634                    }
635                })
636                .for_each(move |ticket| {
637                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
638                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
639                    }
640                    futures::future::ready(())
641                })
642                .inspect(|_| {
643                    tracing::warn!(
644                        task = %HoprLibProcess::TicketEvents,
645                        "long-running background task finished"
646                    )
647                }),
648        );
649
650        info!("starting transport");
651        let (hopr_socket, transport_processes) = self
652            .transport_api
653            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
654            .await?;
655        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
656
657        info!("starting ticket redemption service");
658        // Start a queue that takes care of redeeming tickets via given TicketSelectors
659        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
660        let _ = self.redeem_requests.set(redemption_req_tx);
661        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
662        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
663        let chain = self.chain_api.clone();
664        let node_db = self.node_db.clone();
665        spawn(redemption_req_rx
666            .for_each(move |selector| {
667                let chain = chain.clone();
668                let db = node_db.clone();
669                async move {
670                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
671                        Ok(res) => debug!(%res, "redemption complete"),
672                        Err(error) => error!(%error, "redemption failed"),
673                    }
674                }
675            })
676            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
677        );
678
679        info!("subscribing to channel events");
680        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
681        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
682        let chain = self.chain_api.clone();
683        let node_db = self.node_db.clone();
684        let events = chain.subscribe().map_err(HoprLibError::chain)?;
685        spawn(
686            futures::stream::Abortable::new(
687                events
688                    .filter_map(move |event|
689                        futures::future::ready(event.try_as_channel_closed())
690                    ),
691                chain_events_sub_reg
692            )
693            .for_each(move |closed_channel| {
694                let node_db = node_db.clone();
695                let chain = chain.clone();
696                async move {
697                    match closed_channel.direction(chain.me()) {
698                        Some(ChannelDirection::Incoming) => {
699                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
700                                Ok(num_neglected) if num_neglected > 0 => {
701                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
702                                },
703                                Ok(_) => {
704                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
705                                },
706                                Err(error) => {
707                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
708                                }
709                            }
710                        },
711                        Some(ChannelDirection::Outgoing) => {
712                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
713                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
714                            } else {
715                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
716                            }
717                        }
718                        _ => {} // Event for a channel that is not our own
719                    }
720                }
721            })
722            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
723        );
724
725        info!("synchronizing ticket states");
726        // NOTE: after the chain is synced, we can reset tickets which are considered
727        // redeemed but on-chain state does not align with that. This implies there was a problem
728        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
729        // handle errors appropriately.
730        let mut channels = self
731            .chain_api
732            .stream_channels(ChannelSelector {
733                destination: self.me_onchain().into(),
734                ..Default::default()
735            })
736            .map_err(HoprLibError::chain)
737            .await?;
738
739        while let Some(channel) = channels.next().await {
740            // Set the state of all unredeemed tickets with a higher index than the current
741            // channel index as untouched.
742            self.node_db
743                .update_ticket_states_and_fetch(
744                    [TicketSelector::from(&channel)
745                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
746                        .with_index_range(channel.ticket_index..)],
747                    AcknowledgedTicketStatus::Untouched,
748                )
749                .map_err(HoprLibError::db)
750                .await?
751                .for_each(|ticket| {
752                    info!(%ticket, "fixed next out-of-sync ticket");
753                    futures::future::ready(())
754                })
755                .await;
756
757            // Mark all the tickets with a lower ticket index than the current channel index as neglected.
758            self.node_db
759                .mark_tickets_as(
760                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
761                    TicketMarker::Neglected,
762                )
763                .map_err(HoprLibError::db)
764                .await?;
765        }
766
767        self.state.store(HoprState::Running, Ordering::Relaxed);
768
769        info!(
770            id = %self.me_peer_id(),
771            version = constants::APP_VERSION,
772            "NODE STARTED AND RUNNING"
773        );
774
775        #[cfg(all(feature = "prometheus", not(test)))]
776        METRIC_HOPR_NODE_INFO.set(
777            &[
778                &self.me.public().to_peerid_str(),
779                &self.me_onchain().to_string(),
780                &self.cfg.safe_module.safe_address.to_string(),
781                &self.cfg.safe_module.module_address.to_string(),
782            ],
783            1.0,
784        );
785
786        let _ = self.processes.set(processes);
787        Ok(hopr_socket)
788    }
789
790    /// Used to practically shut down all node's processes without dropping the instance.
791    ///
792    /// This means that the instance can be used to retrieve some information, but all
793    /// active operations will stop and new will be impossible to perform.
794    /// Such operations will return [`HoprStatusError::NotThereYet`].
795    ///
796    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
797    pub fn shutdown(&self) -> Result<(), HoprLibError> {
798        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
799        if let Some(processes) = self.processes.get() {
800            processes.abort_all();
801        }
802        self.state.store(HoprState::Terminated, Ordering::Relaxed);
803        info!("NODE SHUTDOWN COMPLETE");
804        Ok(())
805    }
806
807    /// Allows external users to receive notifications about new winning tickets.
808    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
809        self.winning_ticket_subscribers.1.activate_cloned()
810    }
811
812    // p2p transport =========
813    /// Own PeerId used in the libp2p transport layer
814    pub fn me_peer_id(&self) -> PeerId {
815        (*self.me.public()).into()
816    }
817
818    /// Get the list of all announced public nodes in the network
819    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
820        Ok(self
821            .chain_api
822            .stream_accounts(AccountSelector {
823                public_only: true,
824                ..Default::default()
825            })
826            .map_err(HoprLibError::chain)
827            .await?
828            .map(|entry| {
829                (
830                    PeerId::from(entry.public_key),
831                    entry.chain_addr,
832                    entry.get_multiaddrs().to_vec(),
833                )
834            })
835            .collect()
836            .await)
837    }
838
839    /// Ping another node in the network based on the PeerId
840    ///
841    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
842    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
843        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
844
845        Ok(self.transport_api.ping(peer).await?)
846    }
847
848    /// Create a client session connection returning a session object that implements
849    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
850    #[cfg(feature = "session-client")]
851    pub async fn connect_to(
852        &self,
853        destination: Address,
854        target: SessionTarget,
855        cfg: SessionClientConfig,
856    ) -> errors::Result<HoprSession> {
857        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
858
859        let backoff = backon::ConstantBuilder::default()
860            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
861            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
862            .with_jitter();
863
864        use backon::Retryable;
865
866        Ok((|| {
867            let cfg = cfg.clone();
868            let target = target.clone();
869            async { self.transport_api.new_session(destination, target, cfg).await }
870        })
871        .retry(backoff)
872        .sleep(backon::FuturesTimerSleeper)
873        .await?)
874    }
875
876    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
877    /// closed due to inactivity.
878    #[cfg(feature = "session-client")]
879    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
880        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
881        Ok(self.transport_api.probe_session(id).await?)
882    }
883
884    #[cfg(feature = "session-client")]
885    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
886        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
887        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
888    }
889
890    #[cfg(all(feature = "session-client", feature = "telemetry"))]
891    pub async fn get_session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
892        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
893        Ok(self.transport_api.session_stats(id).await?)
894    }
895
896    #[cfg(feature = "session-client")]
897    pub async fn update_session_surb_balancer_config(
898        &self,
899        id: &SessionId,
900        cfg: SurbBalancerConfig,
901    ) -> errors::Result<()> {
902        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
903        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
904    }
905
906    /// List all multiaddresses announced by this node
907    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
908        self.transport_api.local_multiaddresses()
909    }
910
911    /// List all multiaddresses on which the node is listening
912    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
913        self.transport_api.listening_multiaddresses().await
914    }
915
916    /// List all multiaddresses observed for a PeerId
917    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
918        self.transport_api.network_observed_multiaddresses(peer).await
919    }
920
921    /// List all multiaddresses announced on-chain for the given node.
922    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
923        let peer = *peer;
924        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
925        let pubkey =
926            hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer), "multiaddr_lookup")
927                .await??;
928
929        match self
930            .chain_api
931            .stream_accounts(AccountSelector {
932                public_only: false,
933                offchain_key: Some(pubkey),
934                ..Default::default()
935            })
936            .map_err(HoprLibError::chain)
937            .await?
938            .next()
939            .await
940        {
941            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
942            None => {
943                error!(%peer, "no information");
944                Ok(vec![])
945            }
946        }
947    }
948
949    // Network =========
950
951    /// Get measured network health
952    pub async fn network_health(&self) -> Health {
953        self.transport_api.network_health().await
954    }
955
956    /// List all peers connected to this
957    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
958        Ok(self.transport_api.network_connected_peers().await?)
959    }
960
961    /// Get all data collected from the network relevant for a PeerId
962    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
963        Ok(self.transport_api.network_peer_observations(peer).await?)
964    }
965
966    /// Get packet stats for a specific peer.
967    #[cfg(feature = "telemetry")]
968    pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
969        Ok(self.transport_api.network_peer_packet_stats(peer).await?)
970    }
971
972    /// Get packet stats for all connected peers.
973    #[cfg(feature = "telemetry")]
974    pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
975        Ok(self.transport_api.network_all_packet_stats().await?)
976    }
977
978    /// Get peers connected peers with quality higher than some value
979    pub async fn all_network_peers(
980        &self,
981        minimum_score: f64,
982    ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
983        Ok(
984            futures::stream::iter(self.transport_api.network_connected_peers().await?)
985                .filter_map(|peer| async move {
986                    if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
987                        if info.score() >= minimum_score {
988                            Some((peer, info))
989                        } else {
990                            None
991                        }
992                    } else {
993                        None
994                    }
995                })
996                .filter_map(|(peer_id, info)| async move {
997                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
998                    Some((address, peer_id, info))
999                })
1000                .collect::<Vec<_>>()
1001                .await,
1002        )
1003    }
1004
1005    // Ticket ========
1006    /// Get all tickets in a channel specified by [`channel_id`](ChannelId).
1007    pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
1008        if let Some(channel) = self
1009            .chain_api
1010            .channel_by_id(channel_id)
1011            .await
1012            .map_err(|e| HoprTransportError::Other(e.into()))?
1013        {
1014            if &channel.destination == self.chain_api.me() {
1015                Ok(Some(
1016                    self.node_db
1017                        .stream_tickets([&channel])
1018                        .await
1019                        .map_err(HoprLibError::db)?
1020                        .collect()
1021                        .await,
1022                ))
1023            } else {
1024                Ok(None)
1025            }
1026        } else {
1027            Ok(None)
1028        }
1029    }
1030
1031    /// Get all tickets
1032    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
1033        Ok(self
1034            .node_db
1035            .stream_tickets(None::<TicketSelector>)
1036            .await
1037            .map_err(HoprLibError::db)?
1038            .map(|v| v.ticket)
1039            .collect()
1040            .await)
1041    }
1042
1043    /// Get statistics for all tickets
1044    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
1045        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1046    }
1047
1048    /// Reset the ticket metrics to zero
1049    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1050        self.node_db
1051            .reset_ticket_statistics()
1052            .await
1053            .map_err(HoprLibError::chain)
1054    }
1055
1056    // Chain =========
1057    pub fn me_onchain(&self) -> Address {
1058        *self.chain_api.me()
1059    }
1060
1061    /// Get ticket price
1062    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1063        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1064    }
1065
1066    /// Get minimum incoming ticket winning probability
1067    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1068        self.chain_api
1069            .minimum_incoming_ticket_win_prob()
1070            .await
1071            .map_err(HoprLibError::chain)
1072    }
1073
1074    /// List of all accounts announced on the chain
1075    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1076        Ok(self
1077            .chain_api
1078            .stream_accounts(AccountSelector {
1079                public_only: true,
1080                ..Default::default()
1081            })
1082            .map_err(HoprLibError::chain)
1083            .await?
1084            .collect()
1085            .await)
1086    }
1087
1088    /// Get the channel entry from Hash.
1089    /// @returns the channel entry of those two nodes
1090    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1091        self.chain_api
1092            .channel_by_id(channel_id)
1093            .await
1094            .map_err(HoprLibError::chain)
1095    }
1096
1097    /// Get the channel entry between source and destination node.
1098    /// @param src Address
1099    /// @param dest Address
1100    /// @returns the channel entry of those two nodes
1101    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1102        self.chain_api
1103            .channel_by_parties(src, dest)
1104            .await
1105            .map_err(HoprLibError::chain)
1106    }
1107
1108    /// List all channels open from a specified Address
1109    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1110        Ok(self
1111            .chain_api
1112            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1113                ChannelStatusDiscriminants::Closed,
1114                ChannelStatusDiscriminants::Open,
1115                ChannelStatusDiscriminants::PendingToClose,
1116            ]))
1117            .map_err(HoprLibError::chain)
1118            .await?
1119            .collect()
1120            .await)
1121    }
1122
1123    /// List all channels open to a specified address
1124    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1125        Ok(self
1126            .chain_api
1127            .stream_channels(
1128                ChannelSelector::default()
1129                    .with_destination(*dest)
1130                    .with_allowed_states(&[
1131                        ChannelStatusDiscriminants::Closed,
1132                        ChannelStatusDiscriminants::Open,
1133                        ChannelStatusDiscriminants::PendingToClose,
1134                    ]),
1135            )
1136            .map_err(HoprLibError::chain)
1137            .await?
1138            .collect()
1139            .await)
1140    }
1141
1142    /// List all channels
1143    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1144        Ok(self
1145            .chain_api
1146            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1147                ChannelStatusDiscriminants::Closed,
1148                ChannelStatusDiscriminants::Open,
1149                ChannelStatusDiscriminants::PendingToClose,
1150            ]))
1151            .map_err(HoprLibError::chain)
1152            .await?
1153            .collect()
1154            .await)
1155    }
1156
1157    /// Current safe allowance balance
1158    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1159        self.chain_api
1160            .safe_allowance(self.cfg.safe_module.safe_address)
1161            .await
1162            .map_err(HoprLibError::chain)
1163    }
1164
1165    /// Withdraw on-chain assets to a given address
1166    /// @param recipient the account where the assets should be transferred to
1167    /// @param amount how many tokens to be transferred
1168    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1169        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1170
1171        self.chain_api
1172            .withdraw(amount, &recipient)
1173            .and_then(identity)
1174            .map_err(HoprLibError::chain)
1175            .await
1176    }
1177
1178    /// Withdraw on-chain native assets to a given address
1179    /// @param recipient the account where the assets should be transferred to
1180    /// @param amount how many tokens to be transferred
1181    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1182        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1183
1184        self.chain_api
1185            .withdraw(amount, &recipient)
1186            .and_then(identity)
1187            .map_err(HoprLibError::chain)
1188            .await
1189    }
1190
1191    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
1192    /// successfully or timing out after `timeout`.
1193    fn spawn_wait_for_on_chain_event(
1194        &self,
1195        context: impl std::fmt::Display,
1196        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
1197        timeout: Duration,
1198    ) -> errors::Result<(
1199        impl Future<Output = errors::Result<ChainEvent>>,
1200        hopr_async_runtime::AbortHandle,
1201    )> {
1202        debug!(%context, "registering wait for on-chain event");
1203        let (event_stream, handle) = futures::stream::abortable(
1204            self.chain_api
1205                .subscribe()
1206                .map_err(HoprLibError::chain)?
1207                .skip_while(move |event| futures::future::ready(!predicate(event))),
1208        );
1209
1210        let ctx = context.to_string();
1211
1212        Ok((
1213            spawn(async move {
1214                pin_mut!(event_stream);
1215                let res = event_stream
1216                    .next()
1217                    .timeout(futures_time::time::Duration::from(timeout))
1218                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
1219                    .await?
1220                    .ok_or(HoprLibError::GeneralError(format!(
1221                        "failed to yield an on-chain event for {ctx}"
1222                    )));
1223                debug!(%ctx, ?res, "on-chain event waiting done");
1224                res
1225            })
1226            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
1227            .and_then(futures::future::ready),
1228            handle,
1229        ))
1230    }
1231
1232    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1233        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1234
1235        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1236
1237        let confirm_awaiter = self
1238            .chain_api
1239            .open_channel(destination, amount)
1240            .await
1241            .map_err(HoprLibError::chain)?;
1242
1243        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1244            format!("open channel to {destination} ({channel_id})"),
1245            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1246            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1247        )?;
1248
1249        let tx_hash = confirm_awaiter.await.map_err(|e| {
1250            event_abort.abort();
1251            HoprLibError::chain(e)
1252        })?;
1253
1254        let event = event_awaiter.await?;
1255        debug!(%event, "open channel event received");
1256
1257        Ok(OpenChannelResult { tx_hash, channel_id })
1258    }
1259
1260    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> errors::Result<Hash> {
1261        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1262
1263        let channel_id = *channel_id;
1264
1265        let confirm_awaiter = self
1266            .chain_api
1267            .fund_channel(&channel_id, amount)
1268            .await
1269            .map_err(HoprLibError::chain)?;
1270
1271        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1272            format!("fund channel {channel_id}"),
1273            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1274            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1275        )?;
1276
1277        let res = confirm_awaiter.await.map_err(|e| {
1278            event_abort.abort();
1279            HoprLibError::chain(e)
1280        })?;
1281
1282        let event = event_awaiter.await?;
1283        debug!(%event, "fund channel event received");
1284
1285        Ok(res)
1286    }
1287
1288    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1289        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1290
1291        let channel_id = *channel_id;
1292
1293        let confirm_awaiter = self
1294            .chain_api
1295            .close_channel(&channel_id)
1296            .await
1297            .map_err(HoprLibError::chain)?;
1298
1299        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1300            format!("close channel {channel_id}"),
1301            move |event| {
1302                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1303                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1304            },
1305            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1306        )?;
1307
1308        let tx_hash = confirm_awaiter.await.map_err(|e| {
1309            event_abort.abort();
1310            HoprLibError::chain(e)
1311        })?;
1312
1313        let event = event_awaiter.await?;
1314        debug!(%event, "close channel event received");
1315
1316        Ok(CloseChannelResult { tx_hash })
1317    }
1318
1319    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1320        self.chain_api
1321            .channel_closure_notice_period()
1322            .await
1323            .map_err(HoprLibError::chain)
1324    }
1325
1326    pub fn redemption_requests(
1327        &self,
1328    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1329        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1330
1331        // TODO: add universal timeout sink here
1332        Ok(self
1333            .redeem_requests
1334            .get()
1335            .cloned()
1336            .expect("redeem_requests is not initialized")
1337            .sink_map_err(HoprLibError::other))
1338    }
1339
1340    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1341        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1342
1343        let min_value = min_value.into();
1344
1345        self.chain_api
1346            .stream_channels(
1347                ChannelSelector::default()
1348                    .with_destination(self.me_onchain())
1349                    .with_allowed_states(&[
1350                        ChannelStatusDiscriminants::Open,
1351                        ChannelStatusDiscriminants::PendingToClose,
1352                    ]),
1353            )
1354            .map_err(HoprLibError::chain)
1355            .await?
1356            .map(|channel| {
1357                Ok(TicketSelector::from(&channel)
1358                    .with_amount(min_value..)
1359                    .with_index_range(channel.ticket_index..)
1360                    .with_state(AcknowledgedTicketStatus::Untouched))
1361            })
1362            .forward(self.redemption_requests()?)
1363            .await?;
1364
1365        Ok(())
1366    }
1367
1368    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1369        &self,
1370        counterparty: &Address,
1371        min_value: B,
1372    ) -> errors::Result<()> {
1373        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1374            .await
1375    }
1376
1377    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1378        &self,
1379        channel_id: &Hash,
1380        min_value: B,
1381    ) -> errors::Result<()> {
1382        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1383
1384        let channel = self
1385            .chain_api
1386            .channel_by_id(channel_id)
1387            .await
1388            .map_err(HoprLibError::chain)?
1389            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1390
1391        self.redemption_requests()?
1392            .send(
1393                TicketSelector::from(channel)
1394                    .with_amount(min_value.into()..)
1395                    .with_index_range(channel.ticket_index..)
1396                    .with_state(AcknowledgedTicketStatus::Untouched),
1397            )
1398            .await?;
1399
1400        Ok(())
1401    }
1402
1403    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1404        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1405
1406        self.redemption_requests()?
1407            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1408            .await?;
1409
1410        Ok(())
1411    }
1412
1413    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1414        let peer_id = *peer_id;
1415        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1416        let pubkey = hopr_parallelize::cpu::spawn_blocking(
1417            move || prelude::OffchainPublicKey::from_peerid(&peer_id),
1418            "chainkey_lookup",
1419        )
1420        .await??;
1421
1422        self.chain_api
1423            .packet_key_to_chain_key(&pubkey)
1424            .await
1425            .map_err(HoprLibError::chain)
1426    }
1427
1428    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1429        self.chain_api
1430            .chain_key_to_packet_key(address)
1431            .await
1432            .map(|pk| pk.map(|v| v.into()))
1433            .map_err(HoprLibError::chain)
1434    }
1435}
1436
1437impl<Chain, Db> Hopr<Chain, Db> {
1438    // === telemetry
1439    /// Prometheus formatted metrics collected by the hopr-lib components.
1440    pub fn collect_hopr_metrics() -> errors::Result<String> {
1441        cfg_if::cfg_if! {
1442            if #[cfg(all(feature = "prometheus", not(test)))] {
1443                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
1444            } else {
1445                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1446            }
1447        }
1448    }
1449}