hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
//! it will perform its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities, and provides
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API or key management.
9//!
//! The intended way to use `hopr_lib` is to build a specific tool on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
/// Helper functions.
mod helpers;

/// Configuration-related public types
pub mod config;
/// Various public constants.
pub mod constants;
/// Lists all errors thrown from this library.
pub mod errors;

/// Utility module with helper types and functionality over hopr-lib behavior.
pub mod utils;

/// Public traits for interactions with this library.
pub mod traits;

/// Functionality related to the HOPR node state.
pub mod state;

/// Testing support; compiled only with the `testing` feature or in test builds.
#[cfg(any(feature = "testing", test))]
pub mod testing;

/// Re-export of the `hopr_api` crate under the shorter `api` name.
pub use hopr_api as api;
39
/// Exports of libraries necessary for API and interface operations.
#[doc(hidden)]
pub mod exports {
    /// Re-exports of the internal and primitive type crates.
    pub mod types {
        pub use hopr_internal_types as internal;
        pub use hopr_primitive_types as primitive;
    }

    /// Re-exports of the key pair and cryptographic type crates.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    /// Re-exports of the network type crate.
    pub mod network {
        pub use hopr_network_types as types;
    }

    // The transport crate is re-exported as a whole.
    pub use hopr_transport as transport;
}
59
/// Export of relevant types for easier integration.
///
/// Everything here is re-exported through the paths of the [`exports`] module.
#[doc(hidden)]
pub mod prelude {
    pub use super::exports::{
        // Key material types.
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        // Network-level types.
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        // Transport-level types.
        transport::{OffchainPublicKey, socket::HoprSocket},
        // Primitive types.
        types::primitive::prelude::Address,
    };
}
76
77use std::{
78    convert::identity,
79    future::Future,
80    num::NonZeroUsize,
81    sync::{Arc, OnceLock, atomic::Ordering},
82    time::Duration,
83};
84
85use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel, pin_mut};
86use futures_time::future::FutureExt as FuturesTimeFutureExt;
87use hopr_api::{
88    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
89    ct::TrafficGeneration,
90    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
91};
92pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
93use hopr_async_runtime::prelude::spawn;
94pub use hopr_async_runtime::{Abortable, AbortableList};
95pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
96pub use hopr_crypto_types::prelude::*;
97pub use hopr_internal_types::prelude::*;
98pub use hopr_network_types::prelude::*;
99#[cfg(all(feature = "prometheus", not(test)))]
100use hopr_platform::time::native::current_time;
101pub use hopr_primitive_types::prelude::*;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110    config::SafeModule,
111    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112    errors::{HoprLibError, HoprStatusError},
113    state::{HoprLibProcess, HoprState},
114    traits::chain::{CloseChannelResult, OpenChannelResult},
115};
116
// Metrics of this library; compiled only with the `prometheus` feature and outside of test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge set to the current unix timestamp when a `Hopr` instance is constructed.
    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Gauge labelled with the application version; its value is `<major>.<minor>` of this crate.
    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Gauge labelled with the node's identifiers; set to 1.0 once the node is running.
    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
134
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// The CPU-bound thread pool is initialized with `num_cpu_threads` threads and the returned
/// multi-threaded tokio runtime gets `num_io_threads` worker threads. When a count is not
/// given, the available CPU parallelism divided by 2 is used instead, since half of the
/// available threads are to be used for IO-bound and half for CPU-bound tasks. Either count
/// is always at least 1.
///
/// The stack size of the runtime threads is read from the `HOPRD_THREAD_STACK_SIZE`
/// environment variable (in bytes). It defaults to 10 MiB when the variable is unset or
/// cannot be parsed, and is never lower than 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count was not given and the available parallelism cannot
/// be determined, if the CPU thread pool fails to initialize, or if the tokio runtime
/// cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    // Stack size used when `HOPRD_THREAD_STACK_SIZE` is unset or cannot be parsed.
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    // Lower bound enforced on the configured stack size.
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Half of the machine's parallelism. This is 0 on a single-core machine,
    // hence the `.max(1)` applied to both thread counts below.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    let cpu_threads = num_cpu_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        // The error is built lazily, because it is needed only when no count can be determined.
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // Surrounding whitespace is tolerated, same as for the other numeric
    // environment variables parsed in this file.
    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
180
/// Type alias used to send and receive transport data via a running HOPR node.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;

/// Sender and inactive receiver halves of the broadcast channel carrying
/// [`VerifiedTicket`]s of newly received winning tickets.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);

/// Time to wait until the node's keybinding appears on-chain
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
198
/// HOPR main object providing the entire HOPR node functionality
///
/// Instantiating this object creates all processes and objects necessary for
/// running the HOPR node. Once created, the node can be started using the
/// `run()` method.
///
/// Externally offered API should be enough to perform all necessary tasks
/// with the HOPR node manually, but it is advised to create such a configuration
/// that manual interaction is unnecessary.
///
/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
pub struct Hopr<Chain, Db> {
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    /// Configuration of the node, validated on construction.
    cfg: config::HoprLibConfig,
    /// Current state of the node; starts as [`HoprState::Uninitialized`].
    state: Arc<state::AtomicHoprState>,
    /// Transport layer of the node.
    transport_api: HoprTransport<Chain, Db>,
    /// Sender into the ticket redemption queue; set once the queue is created by `run()`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Handle to the node database.
    node_db: Db,
    /// Handle to the chain API.
    chain_api: Chain,
    /// Broadcast channel on which newly received winning tickets are published.
    winning_ticket_subscribers: NewTicketEvents,
    /// Abort handles of the background processes; set at the end of a successful `run()`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
221
222impl<Chain, Db> Hopr<Chain, Db>
223where
224    Chain: HoprChainApi + Clone + Send + Sync + 'static,
225    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
226{
227    pub async fn new(
228        identity: (&ChainKeypair, &OffchainKeypair),
229        hopr_chain_api: Chain,
230        hopr_node_db: Db,
231        cfg: config::HoprLibConfig,
232    ) -> errors::Result<Self> {
233        if hopr_crypto_random::is_rng_fixed() {
234            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
235        }
236
237        cfg.validate()?;
238
239        let hopr_transport_api = HoprTransport::new(
240            identity,
241            hopr_chain_api.clone(),
242            hopr_node_db.clone(),
243            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
244            cfg.protocol,
245        );
246
247        #[cfg(all(feature = "prometheus", not(test)))]
248        {
249            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
250            METRIC_HOPR_LIB_VERSION.set(
251                &[const_format::formatcp!("{}", constants::APP_VERSION)],
252                const_format::formatcp!(
253                    "{}.{}",
254                    env!("CARGO_PKG_VERSION_MAJOR"),
255                    env!("CARGO_PKG_VERSION_MINOR")
256                )
257                .parse()
258                .unwrap_or(0.0),
259            );
260
261            // Calling get_ticket_statistics will initialize the respective metrics on tickets
262            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
263                error!(%error, "failed to initialize ticket statistics metrics");
264            }
265        }
266
267        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
268        new_tickets_tx.set_await_active(false);
269        new_tickets_tx.set_overflow(true);
270
271        Ok(Self {
272            me: identity.1.clone(),
273            cfg,
274            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
275            transport_api: hopr_transport_api,
276            chain_api: hopr_chain_api,
277            node_db: hopr_node_db,
278            redeem_requests: OnceLock::new(),
279            processes: OnceLock::new(),
280            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
281        })
282    }
283
284    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
285        if self.status() == state {
286            Ok(())
287        } else {
288            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
289        }
290    }
291
292    pub fn status(&self) -> HoprState {
293        self.state.load(Ordering::Relaxed)
294    }
295
296    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
297        self.chain_api
298            .balance(self.me_onchain())
299            .await
300            .map_err(HoprLibError::chain)
301    }
302
303    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
304        self.chain_api
305            .balance(self.cfg.safe_module.safe_address)
306            .await
307            .map_err(HoprLibError::chain)
308    }
309
310    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
311        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
312    }
313
314    pub fn get_safe_config(&self) -> SafeModule {
315        self.cfg.safe_module.clone()
316    }
317
318    pub fn config(&self) -> &config::HoprLibConfig {
319        &self.cfg
320    }
321
322    #[inline]
323    fn is_public(&self) -> bool {
324        self.cfg.publish
325    }
326
    /// Starts the node and returns the socket used to exchange transport data with it.
    ///
    /// The start-up sequence is as follows:
    /// 1. verify that the node is still [`HoprState::Uninitialized`] and wait for its funding,
    /// 2. switch to [`HoprState::Initializing`] and validate the balance, the configured outgoing
    ///    ticket price and the configured outgoing winning probability against the chain,
    /// 3. subscribe to on-chain account announcements and forward them as peer discovery events,
    /// 4. register the configured Safe with this node, announce the node on-chain and wait for
    ///    the node's key binding,
    /// 5. start the background processes (session server when the `session-server` feature is
    ///    enabled, ticket events, transport, ticket redemptions, channel closure events),
    /// 6. reset tickets left in the `BeingRedeemed` state on incoming channels back to `Untouched`,
    /// 7. switch to [`HoprState::Running`].
    ///
    /// The `cover_traffic` is handed over to the transport. With the `session-server` feature,
    /// the `serve_handler` processes every incoming session.
    ///
    /// # Errors
    ///
    /// Fails if the node is not in the [`HoprState::Uninitialized`] state, or if any of the
    /// start-up steps above fails.
    ///
    /// NOTE(review): once the state was switched to `Initializing`, an error return does not reset
    /// it, so it looks like `run` cannot be retried on the same instance — confirm this is intended.
    pub async fn run<
        Ct,
        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
    >(
        &self,
        cover_traffic: Ct,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<HoprTransportIO>
    where
        Ct: TrafficGeneration + Send + Sync + 'static,
    {
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "cannot start the hopr node multiple times".into(),
        )?;

        #[cfg(feature = "testing")]
        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");

        info!(
            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "node is not started, please fund this node",
        );

        // The state is still `Uninitialized` while waiting for the funds.
        helpers::wait_for_funds(
            *MIN_NATIVE_BALANCE,
            *SUGGESTED_NATIVE_BALANCE,
            Duration::from_secs(200),
            self.me_onchain(),
            &self.chain_api,
        )
        .await?;

        // Collects the abort handles of all background processes started below.
        let mut processes = AbortableList::<HoprLibProcess>::default();

        info!("starting HOPR node...");
        self.state.store(HoprState::Initializing, Ordering::Relaxed);

        let balance: XDaiBalance = self.get_balance().await?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %self.me_onchain(),
            %balance,
            %minimum_balance,
            "node information"
        );

        // A balance equal to the minimum is rejected as well.
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "cannot start the node without a sufficiently funded wallet".into(),
            ));
        }

        // Once we are able to query the chain,
        // check if the ticket price is configured correctly.
        let network_min_ticket_price = self
            .chain_api
            .minimum_ticket_price()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            )));
        }
        // Once we are able to query the chain,
        // check if the winning probability is configured correctly.
        let network_min_win_prob = self
            .chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
        // The check is skipped when the HOPR_TEST_DISABLE_CHECKS environment variable is set to "true"
        // (compared case-insensitively).
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            )));
        }

        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
        // plus 100 as an additional buffer
        let minimum_capacity = self
            .chain_api
            .count_accounts(AccountSelector {
                public_only: true,
                ..Default::default()
            })
            .await
            .map_err(HoprLibError::chain)?
            .saturating_mul(2)
            .saturating_add(100);

        // The capacity can be overridden via the environment (default 2048),
        // but it never drops below the computed minimum.
        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(2048)
            .max(minimum_capacity);

        debug!(
            capacity = chain_discovery_events_capacity,
            minimum_required = minimum_capacity,
            "creating chain discovery events channel"
        );
        let (indexer_peer_update_tx, indexer_peer_update_rx) =
            channel::<PeerDiscovery>(chain_discovery_events_capacity);

        // Stream all the existing announcements and also subscribe to all future on-chain
        // announcements
        let (announcements_stream, announcements_handle) = futures::stream::abortable(
            self.chain_api
                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
                .map_err(HoprLibError::chain)?,
        );
        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);

        // Only announcement events are turned into peer discovery events; other events are dropped.
        spawn(
            announcements_stream
                .filter_map(|event| {
                    futures::future::ready(event.try_as_announcement().map(|account| {
                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
                    }))
                })
                .map(Ok)
                .forward(indexer_peer_update_tx)
                .inspect(
                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
                ),
        );

        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");

        let safe_addr = self.cfg.safe_module.safe_address;

        // The node's own on-chain address must not be used as the Safe address.
        if self.me_onchain() == safe_addr {
            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
        }

        info!(%safe_addr, "registering safe with this node");
        match self.chain_api.register_safe(&safe_addr).await {
            Ok(awaiter) => {
                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
                awaiter.await.map_err(|error| {
                    error!(%safe_addr, %error, "safe registration failed with error");
                    HoprLibError::chain(error)
                })?;
                info!(%safe_addr, "safe successfully registered with this node");
            }
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
                info!(%safe_addr, "this safe is already registered with this node");
            }
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
                // TODO: support safe deregistration flow
                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
            }
            Err(error) => {
                error!(%safe_addr, %error, "safe registration failed");
                return Err(HoprLibError::chain(error));
            }
        }

        // Only public nodes announce multiaddresses
        let multiaddresses_to_announce = if self.is_public() {
            // The multiaddresses are filtered for the non-private ones,
            // unless `announce_local_addresses` is set to `true`.
            self.transport_api.announceable_multiaddresses()
        } else {
            Vec::with_capacity(0)
        };

        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
        multiaddresses_to_announce
            .iter()
            .filter(|a| !is_public_address(a))
            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));

        // The wait for the key binding is spawned before the announcement is sent
        // and its result is awaited only after the announcement below.
        let chain_api = self.chain_api.clone();
        let me_offchain = *self.me.public();
        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });

        // At this point the node is already registered with Safe, so
        // we can announce via Safe-compliant TX
        info!(?multiaddresses_to_announce, "announcing node on chain");
        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
            Ok(awaiter) => {
                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
                awaiter.await.map_err(|error| {
                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
                    HoprLibError::chain(error)
                })?;
                info!(?multiaddresses_to_announce, "node has been successfully announced");
            }
            Err(AnnouncementError::AlreadyAnnounced) => {
                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
            }
            Err(error) => {
                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
                return Err(HoprLibError::chain(error));
            }
        }

        // Wait for the node key-binding readiness to return
        let this_node_account = node_ready
            .await
            .map_err(HoprLibError::other)?
            .map_err(HoprLibError::chain)?;
        // The bound account must carry both this node's on-chain address and the configured Safe address.
        if this_node_account.chain_addr != self.me_onchain()
            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
        {
            error!(%this_node_account, "account bound to offchain key does not match this node");
            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
        }

        info!(%this_node_account, "node account is ready");

        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(256);

        // The receiving half is consumed only when the `session-server` feature is enabled.
        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
        #[cfg(feature = "session-server")]
        {
            debug!(capacity = incoming_session_channel_capacity, "creating session server");
            processes.insert(
                HoprLibProcess::SessionServer,
                hopr_async_runtime::spawn_as_abortable!(
                    _session_rx
                        .for_each_concurrent(None, move |session| {
                            let serve_handler = serve_handler.clone();
                            async move {
                                let session_id = *session.session.id();
                                match serve_handler.process(session).await {
                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
                                    Err(error) => error!(
                                        ?session_id,
                                        %error,
                                        "client session processing failed"
                                    ),
                                }
                            }
                        })
                        .inspect(|_| tracing::warn!(
                            task = %HoprLibProcess::SessionServer,
                            "long-running background task finished"
                        ))
                ),
            );
        }

        info!("starting ticket events processor");
        let (tickets_tx, tickets_rx) = channel(1024);
        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
        let node_db = self.node_db.clone();
        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
        spawn(
            tickets_rx
                .filter_map(move |ticket_event| {
                    let node_db = node_db.clone();
                    async move {
                        match ticket_event {
                            TicketEvent::WinningTicket(winning) => {
                                if let Err(error) = node_db.insert_ticket(*winning).await {
                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
                                } else {
                                    tracing::debug!(%winning, "inserted ticket into database");
                                }
                                // The ticket is passed on to the subscribers regardless of whether
                                // the database insert succeeded.
                                Some(winning)
                            }
                            TicketEvent::RejectedTicket(rejected, issuer) => {
                                if let Some(issuer) = &issuer {
                                    if let Err(error) =
                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
                                    {
                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
                                    } else {
                                        tracing::debug!(%rejected, "marked ticket as rejected");
                                    }
                                } else {
                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
                                }
                                // Rejected tickets are never broadcast.
                                None
                            }
                        }
                    }
                })
                .for_each(move |ticket| {
                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
                    }
                    futures::future::ready(())
                })
                .inspect(|_| {
                    tracing::warn!(
                        task = %HoprLibProcess::TicketEvents,
                        "long-running background task finished"
                    )
                }),
        );

        info!("starting transport");
        let (hopr_socket, transport_processes) = self
            .transport_api
            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
            .await?;
        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

        // Start a queue that takes care of redeeming tickets via given TicketSelectors
        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
        // The result of `set` is ignored: it fails only if a sender has already been stored.
        let _ = self.redeem_requests.set(redemption_req_tx);
        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        // Redemption requests are processed one at a time, in the order they arrive.
        spawn(redemption_req_rx
            .for_each(move |selector| {
                let chain = chain.clone();
                let db = node_db.clone();
                async move {
                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
                        Ok(res) => debug!(%res, "redemption complete"),
                        Err(error) => error!(%error, "redemption failed"),
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
        );

        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        let events = chain.subscribe().map_err(HoprLibError::chain)?;
        // React to channel closures only: tickets on closed incoming channels are marked as neglected,
        // and the outgoing ticket index is removed for closed outgoing channels.
        spawn(
            futures::stream::Abortable::new(
                events
                    .filter_map(move |event|
                        futures::future::ready(event.try_as_channel_closed())
                    ),
                chain_events_sub_reg
            )
            .for_each(move |closed_channel| {
                let node_db = node_db.clone();
                let chain = chain.clone();
                async move {
                    match closed_channel.direction(chain.me()) {
                        Some(ChannelDirection::Incoming) => {
                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
                                Ok(num_neglected) if num_neglected > 0 => {
                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
                                },
                                Ok(_) => {
                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
                                },
                                Err(error) => {
                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
                                }
                            }
                        },
                        Some(ChannelDirection::Outgoing) => {
                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
                            } else {
                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
                            }
                        }
                        _ => {} // Event for a channel that is not our own
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
        );

        // NOTE: after the chain is synced, we can reset tickets which are considered
        // redeemed but on-chain state does not align with that. This implies there was a problem
        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
        // handle errors appropriately.
        let mut channels = self
            .chain_api
            .stream_channels(ChannelSelector {
                destination: self.me_onchain().into(),
                ..Default::default()
            })
            .map_err(HoprLibError::chain)
            .await?;

        // For every channel with this node as the destination, tickets in the `BeingRedeemed` state
        // with an index at or above the channel's ticket index are set back to `Untouched`.
        while let Some(channel) = channels.next().await {
            self.node_db
                .update_ticket_states_and_fetch(
                    [TicketSelector::from(&channel)
                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
                        .with_index_range(channel.ticket_index..)],
                    AcknowledgedTicketStatus::Untouched,
                )
                .map_err(HoprLibError::db)
                .await?
                .for_each(|ticket| {
                    info!(%ticket, "fixed next out-of-sync ticket");
                    futures::future::ready(())
                })
                .await;
        }

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &self.me_onchain().to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        // Keep the abort handles of the background processes in the instance;
        // the result of `set` is ignored.
        let _ = self.processes.set(processes);
        Ok(hopr_socket)
    }
761
762    /// Used to practically shut down all node's processes without dropping the instance.
763    ///
764    /// This means that the instance can be used to retrieve some information, but all
765    /// active operations will stop and new will be impossible to perform.
766    /// Such operations will return [`HoprStatusError::NotThereYet`].
767    ///
768    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
769    pub fn shutdown(&self) -> Result<(), HoprLibError> {
770        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
771        if let Some(processes) = self.processes.get() {
772            processes.abort_all();
773        }
774        self.state.store(HoprState::Terminated, Ordering::Relaxed);
775        info!("NODE SHUTDOWN COMPLETE");
776        Ok(())
777    }
778
779    /// Allows external users to receive notifications about new winning tickets.
780    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
781        self.winning_ticket_subscribers.1.activate_cloned()
782    }
783
784    // p2p transport =========
785    /// Own PeerId used in the libp2p transport layer
786    pub fn me_peer_id(&self) -> PeerId {
787        (*self.me.public()).into()
788    }
789
790    /// Get the list of all announced public nodes in the network
791    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
792        Ok(self
793            .chain_api
794            .stream_accounts(AccountSelector {
795                public_only: true,
796                ..Default::default()
797            })
798            .map_err(HoprLibError::chain)
799            .await?
800            .map(|entry| {
801                (
802                    PeerId::from(entry.public_key),
803                    entry.chain_addr,
804                    entry.get_multiaddrs().to_vec(),
805                )
806            })
807            .collect()
808            .await)
809    }
810
811    /// Ping another node in the network based on the PeerId
812    ///
813    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
814    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
815        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
816
817        Ok(self.transport_api.ping(peer).await?)
818    }
819
820    /// Create a client session connection returning a session object that implements
821    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
822    #[cfg(feature = "session-client")]
823    pub async fn connect_to(
824        &self,
825        destination: Address,
826        target: SessionTarget,
827        cfg: SessionClientConfig,
828    ) -> errors::Result<HoprSession> {
829        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
830
831        let backoff = backon::ConstantBuilder::default()
832            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
833            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
834            .with_jitter();
835
836        use backon::Retryable;
837
838        Ok((|| {
839            let cfg = cfg.clone();
840            let target = target.clone();
841            async { self.transport_api.new_session(destination, target, cfg).await }
842        })
843        .retry(backoff)
844        .sleep(backon::FuturesTimerSleeper)
845        .await?)
846    }
847
848    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
849    /// closed due to inactivity.
850    #[cfg(feature = "session-client")]
851    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
852        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
853        Ok(self.transport_api.probe_session(id).await?)
854    }
855
856    #[cfg(feature = "session-client")]
857    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
858        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
859        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
860    }
861
862    #[cfg(feature = "session-client")]
863    pub async fn update_session_surb_balancer_config(
864        &self,
865        id: &SessionId,
866        cfg: SurbBalancerConfig,
867    ) -> errors::Result<()> {
868        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
869        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
870    }
871
872    /// List all multiaddresses announced by this node
873    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
874        self.transport_api.local_multiaddresses()
875    }
876
877    /// List all multiaddresses on which the node is listening
878    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
879        self.transport_api.listening_multiaddresses().await
880    }
881
882    /// List all multiaddresses observed for a PeerId
883    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
884        self.transport_api.network_observed_multiaddresses(peer).await
885    }
886
887    /// List all multiaddresses announced on-chain for the given node.
888    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
889        let peer = *peer;
890        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
891        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
892
893        match self
894            .chain_api
895            .stream_accounts(AccountSelector {
896                public_only: false,
897                offchain_key: Some(pubkey),
898                ..Default::default()
899            })
900            .map_err(HoprLibError::chain)
901            .await?
902            .next()
903            .await
904        {
905            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
906            None => {
907                error!(%peer, "no information");
908                Ok(vec![])
909            }
910        }
911    }
912
913    // Network =========
914
915    /// Get measured network health
916    pub async fn network_health(&self) -> Health {
917        self.transport_api.network_health().await
918    }
919
920    /// List all peers connected to this
921    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
922        Ok(self.transport_api.network_connected_peers().await?)
923    }
924
925    /// Get all data collected from the network relevant for a PeerId
926    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
927        Ok(self.transport_api.network_peer_observations(peer).await?)
928    }
929
930    /// Get peers connected peers with quality higher than some value
931    pub async fn all_network_peers(
932        &self,
933        minimum_score: f64,
934    ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
935        Ok(
936            futures::stream::iter(self.transport_api.network_connected_peers().await?)
937                .filter_map(|peer| async move {
938                    if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
939                        if info.score() >= minimum_score {
940                            Some((peer, info))
941                        } else {
942                            None
943                        }
944                    } else {
945                        None
946                    }
947                })
948                .filter_map(|(peer_id, info)| async move {
949                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
950                    Some((address, peer_id, info))
951                })
952                .collect::<Vec<_>>()
953                .await,
954        )
955    }
956
957    // Ticket ========
958    /// Get all tickets in a channel specified by [`channel_id`](ChannelId).
959    pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
960        if let Some(channel) = self
961            .chain_api
962            .channel_by_id(channel_id)
963            .await
964            .map_err(|e| HoprTransportError::Other(e.into()))?
965        {
966            if &channel.destination == self.chain_api.me() {
967                Ok(Some(
968                    self.node_db
969                        .stream_tickets([&channel])
970                        .await
971                        .map_err(HoprLibError::db)?
972                        .collect()
973                        .await,
974                ))
975            } else {
976                Ok(None)
977            }
978        } else {
979            Ok(None)
980        }
981    }
982
983    /// Get all tickets
984    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
985        Ok(self
986            .node_db
987            .stream_tickets(None::<TicketSelector>)
988            .await
989            .map_err(HoprLibError::db)?
990            .map(|v| v.ticket)
991            .collect()
992            .await)
993    }
994
995    /// Get statistics for all tickets
996    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
997        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
998    }
999
1000    /// Reset the ticket metrics to zero
1001    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1002        self.node_db
1003            .reset_ticket_statistics()
1004            .await
1005            .map_err(HoprLibError::chain)
1006    }
1007
1008    // Chain =========
1009    pub fn me_onchain(&self) -> Address {
1010        *self.chain_api.me()
1011    }
1012
1013    /// Get ticket price
1014    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1015        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1016    }
1017
1018    /// Get minimum incoming ticket winning probability
1019    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1020        self.chain_api
1021            .minimum_incoming_ticket_win_prob()
1022            .await
1023            .map_err(HoprLibError::chain)
1024    }
1025
1026    /// List of all accounts announced on the chain
1027    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1028        Ok(self
1029            .chain_api
1030            .stream_accounts(AccountSelector {
1031                public_only: true,
1032                ..Default::default()
1033            })
1034            .map_err(HoprLibError::chain)
1035            .await?
1036            .collect()
1037            .await)
1038    }
1039
1040    /// Get the channel entry from Hash.
1041    /// @returns the channel entry of those two nodes
1042    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1043        self.chain_api
1044            .channel_by_id(channel_id)
1045            .await
1046            .map_err(HoprLibError::chain)
1047    }
1048
1049    /// Get the channel entry between source and destination node.
1050    /// @param src Address
1051    /// @param dest Address
1052    /// @returns the channel entry of those two nodes
1053    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1054        self.chain_api
1055            .channel_by_parties(src, dest)
1056            .await
1057            .map_err(HoprLibError::chain)
1058    }
1059
1060    /// List all channels open from a specified Address
1061    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1062        Ok(self
1063            .chain_api
1064            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1065                ChannelStatusDiscriminants::Closed,
1066                ChannelStatusDiscriminants::Open,
1067                ChannelStatusDiscriminants::PendingToClose,
1068            ]))
1069            .map_err(HoprLibError::chain)
1070            .await?
1071            .collect()
1072            .await)
1073    }
1074
1075    /// List all channels open to a specified address
1076    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1077        Ok(self
1078            .chain_api
1079            .stream_channels(
1080                ChannelSelector::default()
1081                    .with_destination(*dest)
1082                    .with_allowed_states(&[
1083                        ChannelStatusDiscriminants::Closed,
1084                        ChannelStatusDiscriminants::Open,
1085                        ChannelStatusDiscriminants::PendingToClose,
1086                    ]),
1087            )
1088            .map_err(HoprLibError::chain)
1089            .await?
1090            .collect()
1091            .await)
1092    }
1093
1094    /// List all channels
1095    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1096        Ok(self
1097            .chain_api
1098            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1099                ChannelStatusDiscriminants::Closed,
1100                ChannelStatusDiscriminants::Open,
1101                ChannelStatusDiscriminants::PendingToClose,
1102            ]))
1103            .map_err(HoprLibError::chain)
1104            .await?
1105            .collect()
1106            .await)
1107    }
1108
1109    /// Current safe allowance balance
1110    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1111        self.chain_api
1112            .safe_allowance(self.cfg.safe_module.safe_address)
1113            .await
1114            .map_err(HoprLibError::chain)
1115    }
1116
1117    /// Withdraw on-chain assets to a given address
1118    /// @param recipient the account where the assets should be transferred to
1119    /// @param amount how many tokens to be transferred
1120    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1121        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1122
1123        self.chain_api
1124            .withdraw(amount, &recipient)
1125            .and_then(identity)
1126            .map_err(HoprLibError::chain)
1127            .await
1128    }
1129
1130    /// Withdraw on-chain native assets to a given address
1131    /// @param recipient the account where the assets should be transferred to
1132    /// @param amount how many tokens to be transferred
1133    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1134        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1135
1136        self.chain_api
1137            .withdraw(amount, &recipient)
1138            .and_then(identity)
1139            .map_err(HoprLibError::chain)
1140            .await
1141    }
1142
1143    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
1144    /// successfully or timing out after `timeout`.
1145    fn spawn_wait_for_on_chain_event(
1146        &self,
1147        context: impl std::fmt::Display,
1148        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
1149        timeout: Duration,
1150    ) -> errors::Result<impl Future<Output = errors::Result<ChainEvent>>> {
1151        debug!(%context, "registering wait for on-chain event");
1152        let event_stream = self
1153            .chain_api
1154            .subscribe()
1155            .map_err(HoprLibError::chain)?
1156            .skip_while(move |event| futures::future::ready(!predicate(event)));
1157
1158        let ctx = context.to_string();
1159        Ok(spawn(async move {
1160            pin_mut!(event_stream);
1161            let res = event_stream
1162                .next()
1163                .timeout(futures_time::time::Duration::from(timeout))
1164                .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
1165                .await?
1166                .ok_or(HoprLibError::GeneralError(format!(
1167                    "failed to yield an on-chain event for {ctx}"
1168                )));
1169            debug!(%ctx, ?res, "on-chain event waiting done");
1170            res
1171        })
1172        .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
1173        .and_then(futures::future::ready))
1174    }
1175
1176    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1177        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1178
1179        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1180
1181        let event_awaiter = self.spawn_wait_for_on_chain_event(
1182            format!("open channel to {destination} ({channel_id})"),
1183            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1184            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1185        )?;
1186
1187        let tx_hash = self
1188            .chain_api
1189            .open_channel(destination, amount)
1190            .and_then(identity)
1191            .map_err(HoprLibError::chain)
1192            .await?;
1193
1194        let event = event_awaiter.await?;
1195        debug!(%event, "open channel event received");
1196
1197        Ok(OpenChannelResult { tx_hash, channel_id })
1198    }
1199
1200    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> errors::Result<Hash> {
1201        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1202
1203        let channel_id = *channel_id;
1204        let event_awaiter = self.spawn_wait_for_on_chain_event(
1205            format!("fund channel {channel_id}"),
1206            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1207            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1208        )?;
1209
1210        let res = self
1211            .chain_api
1212            .fund_channel(&channel_id, amount)
1213            .and_then(identity)
1214            .map_err(HoprLibError::chain)
1215            .await?;
1216
1217        let event = event_awaiter.await?;
1218        debug!(%event, "fund channel event received");
1219
1220        Ok(res)
1221    }
1222
1223    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1224        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1225
1226        let channel_id = *channel_id;
1227        let event_awaiter = self.spawn_wait_for_on_chain_event(
1228            format!("close channel {channel_id}"),
1229            move |event| {
1230                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1231                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1232            },
1233            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1234        )?;
1235
1236        let tx_hash = self
1237            .chain_api
1238            .close_channel(&channel_id)
1239            .and_then(identity)
1240            .map_err(HoprLibError::chain)
1241            .await?;
1242
1243        let event = event_awaiter.await?;
1244        debug!(%event, "close channel event received");
1245
1246        Ok(CloseChannelResult { tx_hash })
1247    }
1248
1249    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1250        self.chain_api
1251            .channel_closure_notice_period()
1252            .await
1253            .map_err(HoprLibError::chain)
1254    }
1255
1256    pub fn redemption_requests(
1257        &self,
1258    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1259        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1260
1261        // TODO: add universal timeout sink here
1262        Ok(self
1263            .redeem_requests
1264            .get()
1265            .cloned()
1266            .expect("redeem_requests is not initialized")
1267            .sink_map_err(HoprLibError::other))
1268    }
1269
1270    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1271        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1272
1273        let min_value = min_value.into();
1274
1275        self.chain_api
1276            .stream_channels(
1277                ChannelSelector::default()
1278                    .with_destination(self.me_onchain())
1279                    .with_allowed_states(&[
1280                        ChannelStatusDiscriminants::Open,
1281                        ChannelStatusDiscriminants::PendingToClose,
1282                    ]),
1283            )
1284            .map_err(HoprLibError::chain)
1285            .await?
1286            .map(|channel| {
1287                Ok(TicketSelector::from(&channel)
1288                    .with_amount(min_value..)
1289                    .with_index_range(channel.ticket_index..)
1290                    .with_state(AcknowledgedTicketStatus::Untouched))
1291            })
1292            .forward(self.redemption_requests()?)
1293            .await?;
1294
1295        Ok(())
1296    }
1297
1298    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1299        &self,
1300        counterparty: &Address,
1301        min_value: B,
1302    ) -> errors::Result<()> {
1303        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1304            .await
1305    }
1306
1307    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1308        &self,
1309        channel_id: &Hash,
1310        min_value: B,
1311    ) -> errors::Result<()> {
1312        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1313
1314        let channel = self
1315            .chain_api
1316            .channel_by_id(channel_id)
1317            .await
1318            .map_err(HoprLibError::chain)?
1319            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1320
1321        self.redemption_requests()?
1322            .send(
1323                TicketSelector::from(channel)
1324                    .with_amount(min_value.into()..)
1325                    .with_index_range(channel.ticket_index..)
1326                    .with_state(AcknowledgedTicketStatus::Untouched),
1327            )
1328            .await?;
1329
1330        Ok(())
1331    }
1332
1333    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1334        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1335
1336        self.redemption_requests()?
1337            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1338            .await?;
1339
1340        Ok(())
1341    }
1342
1343    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1344        let peer_id = *peer_id;
1345        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1346        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1347            .await
1348            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1349
1350        self.chain_api
1351            .packet_key_to_chain_key(&pubkey)
1352            .await
1353            .map_err(HoprLibError::chain)
1354    }
1355
1356    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1357        self.chain_api
1358            .chain_key_to_packet_key(address)
1359            .await
1360            .map(|pk| pk.map(|v| v.into()))
1361            .map_err(HoprLibError::chain)
1362    }
1363}
1364
1365impl<Chain, Db> Hopr<Chain, Db> {
1366    // === telemetry
1367    /// Prometheus formatted metrics collected by the hopr-lib components.
1368    pub fn collect_hopr_metrics() -> errors::Result<String> {
1369        cfg_if::cfg_if! {
1370            if #[cfg(all(feature = "prometheus", not(test)))] {
1371                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
1372            } else {
1373                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1374            }
1375        }
1376    }
1377}