hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API, key management...
9//!
//! The intended way to use hopr_lib is to build a specific tool on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
/// Helper functions.
mod helpers;

/// Configuration-related public types
pub mod config;
/// Various public constants.
pub mod constants;
/// Lists all errors thrown from this library.
pub mod errors;

/// Utility module with helper types and functionality over hopr-lib behavior.
pub mod utils;

/// Public traits for interactions with this library.
pub mod traits;

/// Functionality related to the HOPR node state.
pub mod state;

/// Testing support, compiled only with the `testing` feature or in test builds.
#[cfg(any(feature = "testing", test))]
pub mod testing;

/// Re-export of the `hopr_api` crate under the shorter `api` name.
pub use hopr_api as api;
39
/// Exports of libraries necessary for API and interface operations.
///
/// Each underlying crate is re-exported under a short alias, grouped by area,
/// so that dependants can reach it through `hopr_lib::exports`.
#[doc(hidden)]
pub mod exports {
    /// Internal and primitive type crates.
    pub mod types {
        pub use hopr_internal_types as internal;
        pub use hopr_primitive_types as primitive;
    }

    /// Keypair and cryptographic type crates.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    /// Network type crate.
    pub mod network {
        pub use hopr_network_types as types;
    }

    // The transport crate is re-exported as a whole.
    pub use hopr_transport as transport;
}
59
/// Export of relevant types for easier integration.
///
/// Everything here is taken from the aliases defined in [`exports`], so a single
/// glob import of this module brings the listed key, hash, network and transport
/// types into scope.
#[doc(hidden)]
pub mod prelude {
    pub use super::exports::{
        // Node identity keys and hashing.
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        // UDP stream types and related settings.
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        // Transport-level public key and socket.
        transport::{OffchainPublicKey, socket::HoprSocket},
        // On-chain address type.
        types::primitive::prelude::Address,
    };
}
76
77use std::{
78    convert::identity,
79    future::Future,
80    num::NonZeroUsize,
81    sync::{Arc, OnceLock, atomic::Ordering},
82    time::Duration,
83};
84
85use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel, pin_mut};
86use futures_time::future::FutureExt as FuturesTimeFutureExt;
87use hopr_api::{
88    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
89    ct::TrafficGeneration,
90    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
91};
92pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
93use hopr_async_runtime::prelude::spawn;
94pub use hopr_async_runtime::{Abortable, AbortableList};
95pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
96pub use hopr_crypto_types::prelude::*;
97pub use hopr_internal_types::prelude::*;
98pub use hopr_network_types::prelude::*;
99#[cfg(all(feature = "prometheus", not(test)))]
100use hopr_platform::time::native::current_time;
101pub use hopr_primitive_types::prelude::*;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110    config::SafeModule,
111    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112    errors::{HoprLibError, HoprStatusError},
113    state::{HoprLibProcess, HoprState},
114    traits::chain::{CloseChannelResult, OpenChannelResult},
115};
116
// Process-level Prometheus metrics. They exist only when the `prometheus` feature
// is enabled and the crate is not compiled for tests. Each metric is created lazily
// on first access, and the `unwrap()` panics if the metric cannot be created.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Set in `Hopr::new` to the current unix timestamp.
    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Set in `Hopr::new`: the label carries `constants::APP_VERSION`, the value is
    // the crate's `<major>.<minor>` version parsed as a float.
    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Set to 1.0 at the end of `Hopr::run`, labelled with the node's peer ID,
    // on-chain address, Safe address and module address.
    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
134
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// Divide the available CPU parallelism by 2, since half of the available threads are
/// to be used for IO-bound and half for CPU-bound tasks.
///
/// # Arguments
///
/// * `num_cpu_threads` - number of threads for the CPU-bound thread pool; when `None`, half of the available
///   parallelism is used.
/// * `num_io_threads` - number of tokio worker threads; when `None`, half of the available parallelism is used.
///
/// Both values are clamped to at least 1. The worker thread stack size is read from the
/// `HOPRD_THREAD_STACK_SIZE` environment variable (in bytes); it defaults to 10 MiB when unset or
/// unparsable and is never smaller than 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count is neither given nor derivable from the available parallelism,
/// if the CPU thread pool cannot be initialized, or if the tokio runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    use std::str::FromStr;
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    hopr_parallelize::cpu::init_thread_pool(
        num_cpu_threads
            .map(NonZeroUsize::get)
            .or(avail_parallelism)
            // Build the error lazily so that nothing is constructed on the success path.
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                     environment variable."
                )
            })?
            // Halving a parallelism of 1 yields 0, so always keep at least one thread.
            .max(1),
    )?;

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(
            num_io_threads
                .map(NonZeroUsize::get)
                .or(avail_parallelism)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                         environment variable."
                    )
                })?
                .max(1),
        )
        .thread_name("hoprd")
        .thread_stack_size(
            std::env::var("HOPRD_THREAD_STACK_SIZE")
                .ok()
                .and_then(|v| usize::from_str(&v).ok())
                .unwrap_or(10 * 1024 * 1024)
                .max(2 * 1024 * 1024),
        )
        .build()?)
}
180
/// Type alias used to send and receive transport data via a running HOPR node.
///
/// The receiving half yields [`ApplicationDataIn`]; the sending half accepts
/// pairs of [`DestinationRouting`] and [`ApplicationDataOut`].
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;

/// Both ends of the broadcast channel carrying newly received winning tickets:
/// the sender, and a deactivated receiver kept alongside it.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);

/// Time to wait until the node's keybinding appears on-chain
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(30);
197
/// HOPR main object providing the entire HOPR node functionality
///
/// Instantiating this object creates all processes and objects necessary for
/// running the HOPR node. Once created, the node can be started using the
/// `run()` method.
///
/// Externally offered API should be enough to perform all necessary tasks
/// with the HOPR node manually, but it is advised to create such a configuration
/// that manual interaction is unnecessary.
///
/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
///
/// The `Chain` parameter is the on-chain API implementation and `Db` is the node
/// database implementation.
pub struct Hopr<Chain, Db> {
    /// Offchain keypair of this node, cloned from the identity given to `new`.
    me: OffchainKeypair,
    /// Node configuration, validated in `new`.
    cfg: config::HoprLibConfig,
    /// Current node state, stored atomically.
    state: Arc<state::AtomicHoprState>,
    /// Transport layer, constructed in `new` and started in `run`.
    transport_api: HoprTransport<Chain, Db>,
    /// Sender of the ticket redemption request queue; set once in `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Handle to the node database.
    node_db: Db,
    /// Handle to the on-chain API.
    chain_api: Chain,
    /// Broadcast channel ends used to publish new winning tickets.
    winning_ticket_subscribers: NewTicketEvents,
    /// Abort handles of the background processes; set once at the end of `run`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
220
221impl<Chain, Db> Hopr<Chain, Db>
222where
223    Chain: HoprChainApi + Clone + Send + Sync + 'static,
224    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
225{
226    pub async fn new(
227        identity: (&ChainKeypair, &OffchainKeypair),
228        hopr_chain_api: Chain,
229        hopr_node_db: Db,
230        cfg: config::HoprLibConfig,
231    ) -> errors::Result<Self> {
232        if hopr_crypto_random::is_rng_fixed() {
233            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
234        }
235
236        cfg.validate()?;
237
238        let hopr_transport_api = HoprTransport::new(
239            identity,
240            hopr_chain_api.clone(),
241            hopr_node_db.clone(),
242            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
243            cfg.protocol,
244        );
245
246        #[cfg(all(feature = "prometheus", not(test)))]
247        {
248            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
249            METRIC_HOPR_LIB_VERSION.set(
250                &[const_format::formatcp!("{}", constants::APP_VERSION)],
251                const_format::formatcp!(
252                    "{}.{}",
253                    env!("CARGO_PKG_VERSION_MAJOR"),
254                    env!("CARGO_PKG_VERSION_MINOR")
255                )
256                .parse()
257                .unwrap_or(0.0),
258            );
259
260            // Calling get_ticket_statistics will initialize the respective metrics on tickets
261            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
262                error!(%error, "failed to initialize ticket statistics metrics");
263            }
264        }
265
266        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
267        new_tickets_tx.set_await_active(false);
268        new_tickets_tx.set_overflow(true);
269
270        Ok(Self {
271            me: identity.1.clone(),
272            cfg,
273            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
274            transport_api: hopr_transport_api,
275            chain_api: hopr_chain_api,
276            node_db: hopr_node_db,
277            redeem_requests: OnceLock::new(),
278            processes: OnceLock::new(),
279            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
280        })
281    }
282
283    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
284        if self.status() == state {
285            Ok(())
286        } else {
287            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
288        }
289    }
290
    /// Returns the current state of the node, read with relaxed atomic ordering.
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
294
295    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
296        self.chain_api
297            .balance(self.me_onchain())
298            .await
299            .map_err(HoprLibError::chain)
300    }
301
302    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
303        self.chain_api
304            .balance(self.cfg.safe_module.safe_address)
305            .await
306            .map_err(HoprLibError::chain)
307    }
308
309    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
310        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
311    }
312
    /// Returns a copy of the Safe and module configuration of this node.
    pub fn get_safe_config(&self) -> SafeModule {
        self.cfg.safe_module.clone()
    }
316
    /// Returns a reference to the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
320
    /// Returns the `publish` configuration flag; `run` announces multiaddresses
    /// only when it is set.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
325
326    pub async fn run<
327        Ct,
328        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
329    >(
330        &self,
331        cover_traffic: Ct,
332        #[cfg(feature = "session-server")] serve_handler: T,
333    ) -> errors::Result<HoprTransportIO>
334    where
335        Ct: TrafficGeneration + Send + Sync + 'static,
336    {
337        self.error_if_not_in_state(
338            HoprState::Uninitialized,
339            "cannot start the hopr node multiple times".into(),
340        )?;
341
342        #[cfg(feature = "testing")]
343        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
344
345        info!(
346            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
347            "node is not started, please fund this node",
348        );
349
350        helpers::wait_for_funds(
351            *MIN_NATIVE_BALANCE,
352            *SUGGESTED_NATIVE_BALANCE,
353            Duration::from_secs(200),
354            self.me_onchain(),
355            &self.chain_api,
356        )
357        .await?;
358
359        let mut processes = AbortableList::<HoprLibProcess>::default();
360
361        info!("starting HOPR node...");
362        self.state.store(HoprState::Initializing, Ordering::Relaxed);
363
364        let balance: XDaiBalance = self.get_balance().await?;
365        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
366
367        info!(
368            address = %self.me_onchain(),
369            %balance,
370            %minimum_balance,
371            "node information"
372        );
373
374        if balance.le(&minimum_balance) {
375            return Err(HoprLibError::GeneralError(
376                "cannot start the node without a sufficiently funded wallet".into(),
377            ));
378        }
379
380        // Once we are able to query the chain,
381        // check if the ticket price is configured correctly.
382        let network_min_ticket_price = self
383            .chain_api
384            .minimum_ticket_price()
385            .await
386            .map_err(HoprLibError::chain)?;
387        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
388        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
389            return Err(HoprLibError::GeneralError(format!(
390                "configured outgoing ticket price is lower than the network minimum ticket price: \
391                 {configured_ticket_price:?} < {network_min_ticket_price}"
392            )));
393        }
394        // Once we are able to query the chain,
395        // check if the winning probability is configured correctly.
396        let network_min_win_prob = self
397            .chain_api
398            .minimum_incoming_ticket_win_prob()
399            .await
400            .map_err(HoprLibError::chain)?;
401        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
402        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
403            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
404        {
405            return Err(HoprLibError::GeneralError(format!(
406                "configured outgoing ticket winning probability is lower than the network minimum winning \
407                 probability: {configured_win_prob:?} < {network_min_win_prob}"
408            )));
409        }
410
411        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
412        // plus 100 as an additional buffer
413        let minimum_capacity = self
414            .chain_api
415            .count_accounts(AccountSelector {
416                public_only: true,
417                ..Default::default()
418            })
419            .await
420            .map_err(HoprLibError::chain)?
421            .saturating_mul(2)
422            .saturating_add(100);
423
424        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
425            .ok()
426            .and_then(|s| s.trim().parse::<usize>().ok())
427            .filter(|&c| c > 0)
428            .unwrap_or(2048)
429            .max(minimum_capacity);
430
431        debug!(
432            capacity = chain_discovery_events_capacity,
433            minimum_required = minimum_capacity,
434            "creating chain discovery events channel"
435        );
436        let (indexer_peer_update_tx, indexer_peer_update_rx) =
437            channel::<PeerDiscovery>(chain_discovery_events_capacity);
438
439        // Stream all the existing announcements and also subscribe to all future on-chain
440        // announcements
441        let (announcements_stream, announcements_handle) = futures::stream::abortable(
442            self.chain_api
443                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
444                .map_err(HoprLibError::chain)?,
445        );
446        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
447
448        spawn(
449            announcements_stream
450                .filter_map(|event| {
451                    futures::future::ready(event.try_as_announcement().map(|account| {
452                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
453                    }))
454                })
455                .map(Ok)
456                .forward(indexer_peer_update_tx)
457                .inspect(
458                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
459                ),
460        );
461
462        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
463
464        let safe_addr = self.cfg.safe_module.safe_address;
465
466        if self.me_onchain() == safe_addr {
467            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
468        }
469
470        info!(%safe_addr, "registering safe with this node");
471        match self.chain_api.register_safe(&safe_addr).await {
472            Ok(awaiter) => {
473                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
474                awaiter.await.map_err(|error| {
475                    error!(%safe_addr, %error, "safe registration failed with error");
476                    HoprLibError::chain(error)
477                })?;
478                info!(%safe_addr, "safe successfully registered with this node");
479            }
480            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
481                info!(%safe_addr, "this safe is already registered with this node");
482            }
483            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
484                // TODO: support safe deregistration flow
485                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
486                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
487            }
488            Err(error) => {
489                error!(%safe_addr, %error, "safe registration failed");
490                return Err(HoprLibError::chain(error));
491            }
492        }
493
494        // Only public nodes announce multiaddresses
495        let multiaddresses_to_announce = if self.is_public() {
496            // The multiaddresses are filtered for the non-private ones,
497            // unless `announce_local_addresses` is set to `true`.
498            self.transport_api.announceable_multiaddresses()
499        } else {
500            Vec::with_capacity(0)
501        };
502
503        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
504        multiaddresses_to_announce
505            .iter()
506            .filter(|a| !is_public_address(a))
507            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
508
509        let chain_api = self.chain_api.clone();
510        let me_offchain = *self.me.public();
511        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
512
513        // At this point the node is already registered with Safe, so
514        // we can announce via Safe-compliant TX
515        info!(?multiaddresses_to_announce, "announcing node on chain");
516        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
517            Ok(awaiter) => {
518                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
519                awaiter.await.map_err(|error| {
520                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
521                    HoprLibError::chain(error)
522                })?;
523                info!(?multiaddresses_to_announce, "node has been successfully announced");
524            }
525            Err(AnnouncementError::AlreadyAnnounced) => {
526                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
527            }
528            Err(error) => {
529                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
530                return Err(HoprLibError::chain(error));
531            }
532        }
533
534        // Wait for the node key-binding readiness to return
535        let this_node_account = node_ready
536            .await
537            .map_err(HoprLibError::other)?
538            .map_err(HoprLibError::chain)?;
539        if this_node_account.chain_addr != self.me_onchain()
540            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
541        {
542            error!(%this_node_account, "account bound to offchain key does not match this node");
543            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
544        }
545
546        info!(%this_node_account, "node account is ready");
547
548        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
549            .ok()
550            .and_then(|s| s.trim().parse::<usize>().ok())
551            .filter(|&c| c > 0)
552            .unwrap_or(256);
553
554        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
555        #[cfg(feature = "session-server")]
556        {
557            debug!(capacity = incoming_session_channel_capacity, "creating session server");
558            processes.insert(
559                HoprLibProcess::SessionServer,
560                hopr_async_runtime::spawn_as_abortable!(
561                    _session_rx
562                        .for_each_concurrent(None, move |session| {
563                            let serve_handler = serve_handler.clone();
564                            async move {
565                                let session_id = *session.session.id();
566                                match serve_handler.process(session).await {
567                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
568                                    Err(error) => error!(
569                                        ?session_id,
570                                        %error,
571                                        "client session processing failed"
572                                    ),
573                                }
574                            }
575                        })
576                        .inspect(|_| tracing::warn!(
577                            task = %HoprLibProcess::SessionServer,
578                            "long-running background task finished"
579                        ))
580                ),
581            );
582        }
583
584        info!("starting ticket events processor");
585        let (tickets_tx, tickets_rx) = channel(1024);
586        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
587        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
588        let node_db = self.node_db.clone();
589        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
590        spawn(
591            tickets_rx
592                .filter_map(move |ticket_event| {
593                    let node_db = node_db.clone();
594                    async move {
595                        match ticket_event {
596                            TicketEvent::WinningTicket(winning) => {
597                                if let Err(error) = node_db.insert_ticket(*winning).await {
598                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
599                                } else {
600                                    tracing::debug!(%winning, "inserted ticket into database");
601                                }
602                                Some(winning)
603                            }
604                            TicketEvent::RejectedTicket(rejected, issuer) => {
605                                if let Some(issuer) = &issuer {
606                                    if let Err(error) =
607                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
608                                    {
609                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
610                                    } else {
611                                        tracing::debug!(%rejected, "marked ticket as rejected");
612                                    }
613                                } else {
614                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
615                                }
616                                None
617                            }
618                        }
619                    }
620                })
621                .for_each(move |ticket| {
622                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
623                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
624                    }
625                    futures::future::ready(())
626                })
627                .inspect(|_| {
628                    tracing::warn!(
629                        task = %HoprLibProcess::TicketEvents,
630                        "long-running background task finished"
631                    )
632                }),
633        );
634
635        info!("starting transport");
636        let (hopr_socket, transport_processes) = self
637            .transport_api
638            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
639            .await?;
640        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
641
642        // Start a queue that takes care of redeeming tickets via given TicketSelectors
643        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
644        let _ = self.redeem_requests.set(redemption_req_tx);
645        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
646        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
647        let chain = self.chain_api.clone();
648        let node_db = self.node_db.clone();
649        spawn(redemption_req_rx
650            .for_each(move |selector| {
651                let chain = chain.clone();
652                let db = node_db.clone();
653                async move {
654                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
655                        Ok(res) => debug!(%res, "redemption complete"),
656                        Err(error) => error!(%error, "redemption failed"),
657                    }
658                }
659            })
660            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
661        );
662
663        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
664        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
665        let chain = self.chain_api.clone();
666        let node_db = self.node_db.clone();
667        let events = chain.subscribe().map_err(HoprLibError::chain)?;
668        spawn(
669            futures::stream::Abortable::new(
670                events
671                    .filter_map(move |event|
672                        futures::future::ready(event.try_as_channel_closed())
673                    ),
674                chain_events_sub_reg
675            )
676            .for_each(move |closed_channel| {
677                let node_db = node_db.clone();
678                let chain = chain.clone();
679                async move {
680                    match closed_channel.direction(chain.me()) {
681                        Some(ChannelDirection::Incoming) => {
682                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
683                                Ok(num_neglected) if num_neglected > 0 => {
684                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
685                                },
686                                Ok(_) => {
687                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
688                                },
689                                Err(error) => {
690                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
691                                }
692                            }
693                        },
694                        Some(ChannelDirection::Outgoing) => {
695                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
696                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
697                            } else {
698                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
699                            }
700                        }
701                        _ => {} // Event for a channel that is not our own
702                    }
703                }
704            })
705            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
706        );
707
708        // NOTE: after the chain is synced, we can reset tickets which are considered
709        // redeemed but on-chain state does not align with that. This implies there was a problem
710        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
711        // handle errors appropriately.
712        let mut channels = self
713            .chain_api
714            .stream_channels(ChannelSelector {
715                destination: self.me_onchain().into(),
716                ..Default::default()
717            })
718            .map_err(HoprLibError::chain)
719            .await?;
720
721        while let Some(channel) = channels.next().await {
722            self.node_db
723                .update_ticket_states_and_fetch(
724                    [TicketSelector::from(&channel)
725                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
726                        .with_index_range(channel.ticket_index..)],
727                    AcknowledgedTicketStatus::Untouched,
728                )
729                .map_err(HoprLibError::db)
730                .await?
731                .for_each(|ticket| {
732                    info!(%ticket, "fixed next out-of-sync ticket");
733                    futures::future::ready(())
734                })
735                .await;
736        }
737
738        self.state.store(HoprState::Running, Ordering::Relaxed);
739
740        info!(
741            id = %self.me_peer_id(),
742            version = constants::APP_VERSION,
743            "NODE STARTED AND RUNNING"
744        );
745
746        #[cfg(all(feature = "prometheus", not(test)))]
747        METRIC_HOPR_NODE_INFO.set(
748            &[
749                &self.me.public().to_peerid_str(),
750                &self.me_onchain().to_string(),
751                &self.cfg.safe_module.safe_address.to_string(),
752                &self.cfg.safe_module.module_address.to_string(),
753            ],
754            1.0,
755        );
756
757        let _ = self.processes.set(processes);
758        Ok(hopr_socket)
759    }
760
761    /// Used to practically shut down all node's processes without dropping the instance.
762    ///
763    /// This means that the instance can be used to retrieve some information, but all
764    /// active operations will stop and new will be impossible to perform.
765    /// Such operations will return [`HoprStatusError::NotThereYet`].
766    ///
767    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
768    pub fn shutdown(&self) -> Result<(), HoprLibError> {
769        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
770        if let Some(processes) = self.processes.get() {
771            processes.abort_all();
772        }
773        self.state.store(HoprState::Terminated, Ordering::Relaxed);
774        info!("NODE SHUTDOWN COMPLETE");
775        Ok(())
776    }
777
778    /// Allows external users to receive notifications about new winning tickets.
779    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
780        self.winning_ticket_subscribers.1.activate_cloned()
781    }
782
783    // p2p transport =========
784    /// Own PeerId used in the libp2p transport layer
785    pub fn me_peer_id(&self) -> PeerId {
786        (*self.me.public()).into()
787    }
788
789    /// Get the list of all announced public nodes in the network
790    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
791        Ok(self
792            .chain_api
793            .stream_accounts(AccountSelector {
794                public_only: true,
795                ..Default::default()
796            })
797            .map_err(HoprLibError::chain)
798            .await?
799            .map(|entry| {
800                (
801                    PeerId::from(entry.public_key),
802                    entry.chain_addr,
803                    entry.get_multiaddrs().to_vec(),
804                )
805            })
806            .collect()
807            .await)
808    }
809
810    /// Ping another node in the network based on the PeerId
811    ///
812    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
813    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
814        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
815
816        Ok(self.transport_api.ping(peer).await?)
817    }
818
819    /// Create a client session connection returning a session object that implements
820    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
821    #[cfg(feature = "session-client")]
822    pub async fn connect_to(
823        &self,
824        destination: Address,
825        target: SessionTarget,
826        cfg: SessionClientConfig,
827    ) -> errors::Result<HoprSession> {
828        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
829
830        let backoff = backon::ConstantBuilder::default()
831            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
832            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
833            .with_jitter();
834
835        use backon::Retryable;
836
837        Ok((|| {
838            let cfg = cfg.clone();
839            let target = target.clone();
840            async { self.transport_api.new_session(destination, target, cfg).await }
841        })
842        .retry(backoff)
843        .sleep(backon::FuturesTimerSleeper)
844        .await?)
845    }
846
847    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
848    /// closed due to inactivity.
849    #[cfg(feature = "session-client")]
850    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
851        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
852        Ok(self.transport_api.probe_session(id).await?)
853    }
854
855    #[cfg(feature = "session-client")]
856    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
857        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
858        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
859    }
860
861    #[cfg(feature = "session-client")]
862    pub async fn update_session_surb_balancer_config(
863        &self,
864        id: &SessionId,
865        cfg: SurbBalancerConfig,
866    ) -> errors::Result<()> {
867        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
868        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
869    }
870
871    /// List all multiaddresses announced by this node
872    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
873        self.transport_api.local_multiaddresses()
874    }
875
876    /// List all multiaddresses on which the node is listening
877    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
878        self.transport_api.listening_multiaddresses().await
879    }
880
881    /// List all multiaddresses observed for a PeerId
882    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
883        self.transport_api.network_observed_multiaddresses(peer).await
884    }
885
886    /// List all multiaddresses announced on-chain for the given node.
887    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
888        let peer = *peer;
889        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
890        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
891
892        match self
893            .chain_api
894            .stream_accounts(AccountSelector {
895                public_only: false,
896                offchain_key: Some(pubkey),
897                ..Default::default()
898            })
899            .map_err(HoprLibError::chain)
900            .await?
901            .next()
902            .await
903        {
904            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
905            None => {
906                error!(%peer, "no information");
907                Ok(vec![])
908            }
909        }
910    }
911
912    // Network =========
913
914    /// Get measured network health
915    pub async fn network_health(&self) -> Health {
916        self.transport_api.network_health().await
917    }
918
919    /// List all peers connected to this
920    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
921        Ok(self.transport_api.network_connected_peers().await?)
922    }
923
924    /// Get all data collected from the network relevant for a PeerId
925    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
926        Ok(self.transport_api.network_peer_observations(peer).await?)
927    }
928
929    /// Get peers connected peers with quality higher than some value
930    pub async fn all_network_peers(
931        &self,
932        minimum_score: f64,
933    ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
934        Ok(
935            futures::stream::iter(self.transport_api.network_connected_peers().await?)
936                .filter_map(|peer| async move {
937                    if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
938                        if info.score() >= minimum_score {
939                            Some((peer, info))
940                        } else {
941                            None
942                        }
943                    } else {
944                        None
945                    }
946                })
947                .filter_map(|(peer_id, info)| async move {
948                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
949                    Some((address, peer_id, info))
950                })
951                .collect::<Vec<_>>()
952                .await,
953        )
954    }
955
956    // Ticket ========
957    /// Get all tickets in a channel specified by [`channel_id`](ChannelId).
958    pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
959        if let Some(channel) = self
960            .chain_api
961            .channel_by_id(channel_id)
962            .await
963            .map_err(|e| HoprTransportError::Other(e.into()))?
964        {
965            if &channel.destination == self.chain_api.me() {
966                Ok(Some(
967                    self.node_db
968                        .stream_tickets([&channel])
969                        .await
970                        .map_err(HoprLibError::db)?
971                        .collect()
972                        .await,
973                ))
974            } else {
975                Ok(None)
976            }
977        } else {
978            Ok(None)
979        }
980    }
981
982    /// Get all tickets
983    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
984        Ok(self
985            .node_db
986            .stream_tickets(None::<TicketSelector>)
987            .await
988            .map_err(HoprLibError::db)?
989            .map(|v| v.ticket)
990            .collect()
991            .await)
992    }
993
994    /// Get statistics for all tickets
995    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
996        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
997    }
998
999    /// Reset the ticket metrics to zero
1000    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1001        self.node_db
1002            .reset_ticket_statistics()
1003            .await
1004            .map_err(HoprLibError::chain)
1005    }
1006
1007    // Chain =========
1008    pub fn me_onchain(&self) -> Address {
1009        *self.chain_api.me()
1010    }
1011
1012    /// Get ticket price
1013    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1014        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1015    }
1016
1017    /// Get minimum incoming ticket winning probability
1018    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1019        self.chain_api
1020            .minimum_incoming_ticket_win_prob()
1021            .await
1022            .map_err(HoprLibError::chain)
1023    }
1024
1025    /// List of all accounts announced on the chain
1026    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1027        Ok(self
1028            .chain_api
1029            .stream_accounts(AccountSelector {
1030                public_only: true,
1031                ..Default::default()
1032            })
1033            .map_err(HoprLibError::chain)
1034            .await?
1035            .collect()
1036            .await)
1037    }
1038
1039    /// Get the channel entry from Hash.
1040    /// @returns the channel entry of those two nodes
1041    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1042        self.chain_api
1043            .channel_by_id(channel_id)
1044            .await
1045            .map_err(HoprLibError::chain)
1046    }
1047
1048    /// Get the channel entry between source and destination node.
1049    /// @param src Address
1050    /// @param dest Address
1051    /// @returns the channel entry of those two nodes
1052    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1053        self.chain_api
1054            .channel_by_parties(src, dest)
1055            .await
1056            .map_err(HoprLibError::chain)
1057    }
1058
1059    /// List all channels open from a specified Address
1060    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1061        Ok(self
1062            .chain_api
1063            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1064                ChannelStatusDiscriminants::Closed,
1065                ChannelStatusDiscriminants::Open,
1066                ChannelStatusDiscriminants::PendingToClose,
1067            ]))
1068            .map_err(HoprLibError::chain)
1069            .await?
1070            .collect()
1071            .await)
1072    }
1073
1074    /// List all channels open to a specified address
1075    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1076        Ok(self
1077            .chain_api
1078            .stream_channels(
1079                ChannelSelector::default()
1080                    .with_destination(*dest)
1081                    .with_allowed_states(&[
1082                        ChannelStatusDiscriminants::Closed,
1083                        ChannelStatusDiscriminants::Open,
1084                        ChannelStatusDiscriminants::PendingToClose,
1085                    ]),
1086            )
1087            .map_err(HoprLibError::chain)
1088            .await?
1089            .collect()
1090            .await)
1091    }
1092
1093    /// List all channels
1094    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1095        Ok(self
1096            .chain_api
1097            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1098                ChannelStatusDiscriminants::Closed,
1099                ChannelStatusDiscriminants::Open,
1100                ChannelStatusDiscriminants::PendingToClose,
1101            ]))
1102            .map_err(HoprLibError::chain)
1103            .await?
1104            .collect()
1105            .await)
1106    }
1107
1108    /// Current safe allowance balance
1109    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1110        self.chain_api
1111            .safe_allowance(self.cfg.safe_module.safe_address)
1112            .await
1113            .map_err(HoprLibError::chain)
1114    }
1115
1116    /// Withdraw on-chain assets to a given address
1117    /// @param recipient the account where the assets should be transferred to
1118    /// @param amount how many tokens to be transferred
1119    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1120        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1121
1122        self.chain_api
1123            .withdraw(amount, &recipient)
1124            .and_then(identity)
1125            .map_err(HoprLibError::chain)
1126            .await
1127    }
1128
1129    /// Withdraw on-chain native assets to a given address
1130    /// @param recipient the account where the assets should be transferred to
1131    /// @param amount how many tokens to be transferred
1132    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1133        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1134
1135        self.chain_api
1136            .withdraw(amount, &recipient)
1137            .and_then(identity)
1138            .map_err(HoprLibError::chain)
1139            .await
1140    }
1141
1142    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
1143    /// successfully or timing out after `timeout`.
1144    fn spawn_wait_for_on_chain_event(
1145        &self,
1146        context: impl std::fmt::Display,
1147        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
1148        timeout: Duration,
1149    ) -> errors::Result<impl Future<Output = errors::Result<ChainEvent>>> {
1150        let event_stream = self
1151            .chain_api
1152            .subscribe()
1153            .map_err(HoprLibError::chain)?
1154            .skip_while(move |event| futures::future::ready(!predicate(event)));
1155
1156        let ctx = context.to_string();
1157        Ok(spawn(async move {
1158            pin_mut!(event_stream);
1159            event_stream
1160                .next()
1161                .timeout(futures_time::time::Duration::from(timeout))
1162                .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
1163                .await?
1164                .ok_or(HoprLibError::GeneralError(format!(
1165                    "failed to yield an on-chain event for {ctx}"
1166                )))
1167        })
1168        .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
1169        .and_then(futures::future::ready))
1170    }
1171
1172    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1173        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1174
1175        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1176
1177        let event_awaiter = self.spawn_wait_for_on_chain_event(
1178            format!("open channel to {destination}"),
1179            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1180            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1181        )?;
1182
1183        let tx_hash = self
1184            .chain_api
1185            .open_channel(destination, amount)
1186            .and_then(identity)
1187            .map_err(HoprLibError::chain)
1188            .await?;
1189
1190        let event = event_awaiter.await?;
1191        debug!(%event, "open channel event received");
1192
1193        Ok(OpenChannelResult { tx_hash, channel_id })
1194    }
1195
1196    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> errors::Result<Hash> {
1197        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1198
1199        let channel_id = *channel_id;
1200        let event_awaiter = self.spawn_wait_for_on_chain_event(
1201            format!("fund channel {channel_id}"),
1202            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1203            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1204        )?;
1205
1206        let res = self
1207            .chain_api
1208            .fund_channel(&channel_id, amount)
1209            .and_then(identity)
1210            .map_err(HoprLibError::chain)
1211            .await?;
1212
1213        let event = event_awaiter.await?;
1214        debug!(%event, "fund channel event received");
1215
1216        Ok(res)
1217    }
1218
1219    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1220        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1221
1222        let channel_id = *channel_id;
1223        let event_awaiter = self.spawn_wait_for_on_chain_event(
1224            format!("close channel {channel_id}"),
1225            move |event| {
1226                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1227                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1228            },
1229            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1230        )?;
1231
1232        let tx_hash = self
1233            .chain_api
1234            .close_channel(&channel_id)
1235            .and_then(identity)
1236            .map_err(HoprLibError::chain)
1237            .await?;
1238
1239        let event = event_awaiter.await?;
1240        debug!(%event, "close channel event received");
1241
1242        Ok(CloseChannelResult { tx_hash })
1243    }
1244
1245    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1246        self.chain_api
1247            .channel_closure_notice_period()
1248            .await
1249            .map_err(HoprLibError::chain)
1250    }
1251
1252    pub fn redemption_requests(
1253        &self,
1254    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1255        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1256
1257        // TODO: add universal timeout sink here
1258        Ok(self
1259            .redeem_requests
1260            .get()
1261            .cloned()
1262            .expect("redeem_requests is not initialized")
1263            .sink_map_err(HoprLibError::other))
1264    }
1265
1266    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1267        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1268
1269        let min_value = min_value.into();
1270
1271        self.chain_api
1272            .stream_channels(
1273                ChannelSelector::default()
1274                    .with_destination(self.me_onchain())
1275                    .with_allowed_states(&[
1276                        ChannelStatusDiscriminants::Open,
1277                        ChannelStatusDiscriminants::PendingToClose,
1278                    ]),
1279            )
1280            .map_err(HoprLibError::chain)
1281            .await?
1282            .map(|channel| {
1283                Ok(TicketSelector::from(&channel)
1284                    .with_amount(min_value..)
1285                    .with_index_range(channel.ticket_index..)
1286                    .with_state(AcknowledgedTicketStatus::Untouched))
1287            })
1288            .forward(self.redemption_requests()?)
1289            .await?;
1290
1291        Ok(())
1292    }
1293
1294    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1295        &self,
1296        counterparty: &Address,
1297        min_value: B,
1298    ) -> errors::Result<()> {
1299        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1300            .await
1301    }
1302
1303    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1304        &self,
1305        channel_id: &Hash,
1306        min_value: B,
1307    ) -> errors::Result<()> {
1308        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1309
1310        let channel = self
1311            .chain_api
1312            .channel_by_id(channel_id)
1313            .await
1314            .map_err(HoprLibError::chain)?
1315            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1316
1317        self.redemption_requests()?
1318            .send(
1319                TicketSelector::from(channel)
1320                    .with_amount(min_value.into()..)
1321                    .with_index_range(channel.ticket_index..)
1322                    .with_state(AcknowledgedTicketStatus::Untouched),
1323            )
1324            .await?;
1325
1326        Ok(())
1327    }
1328
1329    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1330        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1331
1332        self.redemption_requests()?
1333            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1334            .await?;
1335
1336        Ok(())
1337    }
1338
1339    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1340        let peer_id = *peer_id;
1341        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1342        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1343            .await
1344            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1345
1346        self.chain_api
1347            .packet_key_to_chain_key(&pubkey)
1348            .await
1349            .map_err(HoprLibError::chain)
1350    }
1351
1352    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1353        self.chain_api
1354            .chain_key_to_packet_key(address)
1355            .await
1356            .map(|pk| pk.map(|v| v.into()))
1357            .map_err(HoprLibError::chain)
1358    }
1359}
1360
1361impl<Chain, Db> Hopr<Chain, Db> {
1362    // === telemetry
1363    /// Prometheus formatted metrics collected by the hopr-lib components.
1364    pub fn collect_hopr_metrics() -> errors::Result<String> {
1365        cfg_if::cfg_if! {
1366            if #[cfg(all(feature = "prometheus", not(test)))] {
1367                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
1368            } else {
1369                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1370            }
1371        }
1372    }
1373}