hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API, key management, etc.
9//!
//! The intended way to use hopr_lib is to build a specific tool on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
/// Helper functions.
mod helpers;

/// Configuration-related public types.
pub mod config;
/// Various public constants.
pub mod constants;
/// Lists all errors returned from this library.
pub mod errors;

/// Utility module with helper types and functionality over hopr-lib behavior.
pub mod utils;

/// Public traits for interactions with this library.
pub mod traits;

/// Functionality related to the HOPR node state.
pub mod state;

/// Testing support; compiled only with the `testing` feature or in test builds.
#[cfg(any(feature = "testing", test))]
pub mod testing;

/// Re-export of the `hopr_api` crate under the shorter `api` name.
pub use hopr_api as api;
39
/// Exports of libraries necessary for API and interface operations.
///
/// Each dependency crate is re-exported under a shorter alias and grouped
/// into a sub-module by area.
#[doc(hidden)]
pub mod exports {
    /// Re-exports of the `hopr_internal_types` and `hopr_primitive_types` crates.
    pub mod types {
        pub use hopr_internal_types as internal;
        pub use hopr_primitive_types as primitive;
    }

    /// Re-exports of the `hopr_crypto_keypair` and `hopr_crypto_types` crates.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    /// Re-export of the `hopr_network_types` crate.
    pub mod network {
        pub use hopr_network_types as types;
    }

    // The transport crate is exposed directly, without a grouping sub-module.
    pub use hopr_transport as transport;
}
59
/// Export of relevant types for easier integration.
///
/// All items are pulled in through the aliases defined in [`exports`], so a single
/// glob import of this module brings the listed key, hash, address, UDP stream
/// and socket types into scope.
#[doc(hidden)]
pub mod prelude {
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
76
77use std::{
78    convert::identity,
79    num::NonZeroUsize,
80    sync::{Arc, OnceLock, atomic::Ordering},
81    time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel};
85use hopr_api::{
86    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
87    ct::TrafficGeneration,
88    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
89};
90pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
91use hopr_async_runtime::prelude::spawn;
92pub use hopr_async_runtime::{Abortable, AbortableList};
93pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
94pub use hopr_crypto_types::prelude::*;
95pub use hopr_internal_types::prelude::*;
96pub use hopr_network_types::prelude::*;
97#[cfg(all(feature = "prometheus", not(test)))]
98use hopr_platform::time::native::current_time;
99pub use hopr_primitive_types::prelude::*;
100use hopr_transport::errors::HoprTransportError;
101#[cfg(feature = "runtime-tokio")]
102pub use hopr_transport::transfer_session;
103pub use hopr_transport::*;
104use tracing::{debug, error, info, warn};
105use validator::Validate;
106
107pub use crate::{
108    config::SafeModule,
109    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
110    errors::{HoprLibError, HoprStatusError},
111    state::{HoprLibProcess, HoprState},
112    traits::chain::{CloseChannelResult, OpenChannelResult},
113};
114
// Metrics exist only when the `prometheus` feature is enabled and the crate
// is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Set once in `Hopr::new` to the current unix timestamp (in seconds).
    // NOTE(review): the `unwrap()` calls below panic if a metric cannot be created
    // (presumably on the first access to the static) - confirm this is intended.
    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Set in `Hopr::new`: labelled with the application version, the value is
    // the `<major>.<minor>` crate version parsed as a number.
    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Set to 1.0 at the end of `Hopr::run`, labelled with the node's peer id,
    // on-chain address, Safe address and module address.
    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
132
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// Two thread pools are configured: the CPU-bound pool initialized via
/// `hopr_parallelize::cpu::init_thread_pool` (sized by `num_cpu_threads`) and the
/// worker threads of the multi-threaded tokio runtime (sized by `num_io_threads`).
///
/// When a thread count is not given explicitly, the available CPU parallelism is divided
/// by 2, since half of the available threads are to be used for IO-bound and half for
/// CPU-bound tasks. Each pool always gets at least one thread.
///
/// The stack size (in bytes) of the runtime worker threads can be overridden using the
/// `HOPRD_THREAD_STACK_SIZE` environment variable. It defaults to 10 MiB when the variable
/// is unset or cannot be parsed, and is never set lower than 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count was not specified and the available parallelism
/// could not be determined, if the CPU thread pool initialization fails, or if the tokio
/// runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    // Stack size used when `HOPRD_THREAD_STACK_SIZE` is unset or invalid.
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    // Lower bound enforced on the worker thread stack size.
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // Resolves the size of one pool: an explicit request wins, otherwise half of the
    // available parallelism is used. The error is built lazily, only when neither is known.
    let thread_count = |requested: Option<NonZeroUsize>, kind: &str, env_var: &str| -> anyhow::Result<usize> {
        requested
            .map(NonZeroUsize::get)
            .or(avail_parallelism)
            // Halving a parallelism of 1 yields 0, so clamp to at least one thread.
            .map(|count| count.max(1))
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Could not determine the number of {} threads to use. Please set the {} environment variable.",
                    kind,
                    env_var
                )
            })
    };

    hopr_parallelize::cpu::init_thread_pool(thread_count(num_cpu_threads, "CPU", "HOPRD_NUM_CPU_THREADS")?)?;

    // Surrounding whitespace is trimmed, consistently with the other numeric
    // environment variables parsed in this file.
    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(thread_count(num_io_threads, "IO", "HOPRD_NUM_IO_THREADS")?)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
178
/// Type alias used to send and receive transport data via a running HOPR node.
///
/// This is the socket type returned by `Hopr::run`: its receiving side yields
/// [`ApplicationDataIn`] items and its sending side accepts pairs of
/// [`DestinationRouting`] and [`ApplicationDataOut`].
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
184
/// Both ends of the broadcast channel carrying newly received winning tickets.
///
/// The sender is used by the ticket event processor started in `Hopr::run`.
/// NOTE(review): the inactive receiver presumably keeps the channel open while
/// there are no active subscribers - confirm against `async_broadcast` docs.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
189
/// Time to wait until the node's keybinding appears on-chain.
///
/// Passed as the timeout to `await_key_binding` during node start-up.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(30);
192
/// HOPR main object providing the entire HOPR node functionality
///
/// Instantiating this object creates all processes and objects necessary for
/// running the HOPR node. Once created, the node can be started using the
/// `run()` method.
///
/// Externally offered API should be enough to perform all necessary tasks
/// with the HOPR node manually, but it is advised to create such a configuration
/// that manual interaction is unnecessary.
///
/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
///
/// The `Chain` parameter is the on-chain API implementation and `Db` is the
/// node database implementation.
pub struct Hopr<Chain, Db> {
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    /// Configuration the node was created with (validated on construction).
    cfg: config::HoprLibConfig,
    /// Current lifecycle state of the node, starting as `HoprState::Uninitialized`.
    state: Arc<state::AtomicHoprState>,
    /// Transport layer of the node.
    transport_api: HoprTransport<Chain, Db>,
    /// Sender into the ticket redemption queue; set once the node is started via `run()`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Handle to the node database.
    node_db: Db,
    /// Handle to the on-chain API.
    chain_api: Chain,
    /// Broadcast channel used to notify subscribers about new winning tickets.
    winning_ticket_subscribers: NewTicketEvents,
    /// Abort handles of the background processes; set at the end of `run()` and
    /// aborted by `shutdown()`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
215
216impl<Chain, Db> Hopr<Chain, Db>
217where
218    Chain: HoprChainApi + Clone + Send + Sync + 'static,
219    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
220{
221    pub async fn new(
222        identity: (&ChainKeypair, &OffchainKeypair),
223        hopr_chain_api: Chain,
224        hopr_node_db: Db,
225        cfg: config::HoprLibConfig,
226    ) -> errors::Result<Self> {
227        if hopr_crypto_random::is_rng_fixed() {
228            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
229        }
230
231        cfg.validate()?;
232
233        let hopr_transport_api = HoprTransport::new(
234            identity,
235            hopr_chain_api.clone(),
236            hopr_node_db.clone(),
237            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
238            cfg.protocol,
239        );
240
241        #[cfg(all(feature = "prometheus", not(test)))]
242        {
243            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
244            METRIC_HOPR_LIB_VERSION.set(
245                &[const_format::formatcp!("{}", constants::APP_VERSION)],
246                const_format::formatcp!(
247                    "{}.{}",
248                    env!("CARGO_PKG_VERSION_MAJOR"),
249                    env!("CARGO_PKG_VERSION_MINOR")
250                )
251                .parse()
252                .unwrap_or(0.0),
253            );
254
255            // Calling get_ticket_statistics will initialize the respective metrics on tickets
256            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
257                error!(%error, "failed to initialize ticket statistics metrics");
258            }
259        }
260
261        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
262        new_tickets_tx.set_await_active(false);
263        new_tickets_tx.set_overflow(true);
264
265        Ok(Self {
266            me: identity.1.clone(),
267            cfg,
268            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
269            transport_api: hopr_transport_api,
270            chain_api: hopr_chain_api,
271            node_db: hopr_node_db,
272            redeem_requests: OnceLock::new(),
273            processes: OnceLock::new(),
274            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
275        })
276    }
277
278    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
279        if self.status() == state {
280            Ok(())
281        } else {
282            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
283        }
284    }
285
    /// Returns the current lifecycle state of the node.
    ///
    /// The state is read with relaxed memory ordering.
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
289
290    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
291        self.chain_api
292            .balance(self.me_onchain())
293            .await
294            .map_err(HoprLibError::chain)
295    }
296
297    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
298        self.chain_api
299            .balance(self.cfg.safe_module.safe_address)
300            .await
301            .map_err(HoprLibError::chain)
302    }
303
304    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
305        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
306    }
307
308    pub fn get_safe_config(&self) -> SafeModule {
309        self.cfg.safe_module.clone()
310    }
311
    /// Returns a reference to the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
315
    /// Returns the value of the `publish` configuration flag.
    ///
    /// Only nodes with this flag set announce their multiaddresses on-chain.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
320
321    pub async fn run<
322        Ct,
323        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
324    >(
325        &self,
326        cover_traffic: Ct,
327        #[cfg(feature = "session-server")] serve_handler: T,
328    ) -> errors::Result<HoprTransportIO>
329    where
330        Ct: TrafficGeneration + Send + Sync + 'static,
331    {
332        self.error_if_not_in_state(
333            HoprState::Uninitialized,
334            "cannot start the hopr node multiple times".into(),
335        )?;
336
337        #[cfg(feature = "testing")]
338        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
339
340        info!(
341            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
342            "node is not started, please fund this node",
343        );
344
345        helpers::wait_for_funds(
346            *MIN_NATIVE_BALANCE,
347            *SUGGESTED_NATIVE_BALANCE,
348            Duration::from_secs(200),
349            self.me_onchain(),
350            &self.chain_api,
351        )
352        .await?;
353
354        let mut processes = AbortableList::<HoprLibProcess>::default();
355
356        info!("starting HOPR node...");
357        self.state.store(HoprState::Initializing, Ordering::Relaxed);
358
359        let balance: XDaiBalance = self.get_balance().await?;
360        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
361
362        info!(
363            address = %self.me_onchain(),
364            %balance,
365            %minimum_balance,
366            "node information"
367        );
368
369        if balance.le(&minimum_balance) {
370            return Err(HoprLibError::GeneralError(
371                "cannot start the node without a sufficiently funded wallet".into(),
372            ));
373        }
374
375        // Once we are able to query the chain,
376        // check if the ticket price is configured correctly.
377        let network_min_ticket_price = self
378            .chain_api
379            .minimum_ticket_price()
380            .await
381            .map_err(HoprLibError::chain)?;
382        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
383        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
384            return Err(HoprLibError::GeneralError(format!(
385                "configured outgoing ticket price is lower than the network minimum ticket price: \
386                 {configured_ticket_price:?} < {network_min_ticket_price}"
387            )));
388        }
389        // Once we are able to query the chain,
390        // check if the winning probability is configured correctly.
391        let network_min_win_prob = self
392            .chain_api
393            .minimum_incoming_ticket_win_prob()
394            .await
395            .map_err(HoprLibError::chain)?;
396        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
397        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
398            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
399        {
400            return Err(HoprLibError::GeneralError(format!(
401                "configured outgoing ticket winning probability is lower than the network minimum winning \
402                 probability: {configured_win_prob:?} < {network_min_win_prob}"
403            )));
404        }
405
406        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
407        // plus 100 as an additional buffer
408        let minimum_capacity = self
409            .chain_api
410            .count_accounts(AccountSelector {
411                public_only: true,
412                ..Default::default()
413            })
414            .await
415            .map_err(HoprLibError::chain)?
416            .saturating_mul(2)
417            .saturating_add(100);
418
419        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
420            .ok()
421            .and_then(|s| s.trim().parse::<usize>().ok())
422            .filter(|&c| c > 0)
423            .unwrap_or(2048)
424            .max(minimum_capacity);
425
426        debug!(
427            capacity = chain_discovery_events_capacity,
428            minimum_required = minimum_capacity,
429            "creating chain discovery events channel"
430        );
431        let (indexer_peer_update_tx, indexer_peer_update_rx) =
432            channel::<PeerDiscovery>(chain_discovery_events_capacity);
433
434        // Stream all the existing announcements and also subscribe to all future on-chain
435        // announcements
436        let (announcements_stream, announcements_handle) = futures::stream::abortable(
437            self.chain_api
438                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
439                .map_err(HoprLibError::chain)?,
440        );
441        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
442
443        spawn(
444            announcements_stream
445                .filter_map(|event| {
446                    futures::future::ready(event.try_as_announcement().map(|account| {
447                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
448                    }))
449                })
450                .map(Ok)
451                .forward(indexer_peer_update_tx)
452                .inspect(
453                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
454                ),
455        );
456
457        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
458
459        let safe_addr = self.cfg.safe_module.safe_address;
460
461        if self.me_onchain() == safe_addr {
462            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
463        }
464
465        info!(%safe_addr, "registering safe with this node");
466        match self.chain_api.register_safe(&safe_addr).await {
467            Ok(awaiter) => {
468                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
469                awaiter.await.map_err(|error| {
470                    error!(%safe_addr, %error, "safe registration failed with error");
471                    HoprLibError::chain(error)
472                })?;
473                info!(%safe_addr, "safe successfully registered with this node");
474            }
475            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
476                info!(%safe_addr, "this safe is already registered with this node");
477            }
478            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
479                // TODO: support safe deregistration flow
480                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
481                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
482            }
483            Err(error) => {
484                error!(%safe_addr, %error, "safe registration failed");
485                return Err(HoprLibError::chain(error));
486            }
487        }
488
489        // Only public nodes announce multiaddresses
490        let multiaddresses_to_announce = if self.is_public() {
491            // The multiaddresses are filtered for the non-private ones,
492            // unless `announce_local_addresses` is set to `true`.
493            self.transport_api.announceable_multiaddresses()
494        } else {
495            Vec::with_capacity(0)
496        };
497
498        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
499        multiaddresses_to_announce
500            .iter()
501            .filter(|a| !is_public_address(a))
502            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
503
504        let chain_api = self.chain_api.clone();
505        let me_offchain = *self.me.public();
506        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
507
508        // At this point the node is already registered with Safe, so
509        // we can announce via Safe-compliant TX
510        info!(?multiaddresses_to_announce, "announcing node on chain");
511        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
512            Ok(awaiter) => {
513                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
514                awaiter.await.map_err(|error| {
515                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
516                    HoprLibError::chain(error)
517                })?;
518                info!(?multiaddresses_to_announce, "node has been successfully announced");
519            }
520            Err(AnnouncementError::AlreadyAnnounced) => {
521                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
522            }
523            Err(error) => {
524                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
525                return Err(HoprLibError::chain(error));
526            }
527        }
528
529        // Wait for the node key-binding readiness to return
530        let this_node_account = node_ready
531            .await
532            .map_err(HoprLibError::other)?
533            .map_err(HoprLibError::chain)?;
534        if this_node_account.chain_addr != self.me_onchain()
535            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
536        {
537            error!(%this_node_account, "account bound to offchain key does not match this node");
538            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
539        }
540
541        info!(%this_node_account, "node account is ready");
542
543        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
544            .ok()
545            .and_then(|s| s.trim().parse::<usize>().ok())
546            .filter(|&c| c > 0)
547            .unwrap_or(256);
548
549        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
550        #[cfg(feature = "session-server")]
551        {
552            debug!(capacity = incoming_session_channel_capacity, "creating session server");
553            processes.insert(
554                HoprLibProcess::SessionServer,
555                hopr_async_runtime::spawn_as_abortable!(
556                    _session_rx
557                        .for_each_concurrent(None, move |session| {
558                            let serve_handler = serve_handler.clone();
559                            async move {
560                                let session_id = *session.session.id();
561                                match serve_handler.process(session).await {
562                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
563                                    Err(error) => error!(
564                                        ?session_id,
565                                        %error,
566                                        "client session processing failed"
567                                    ),
568                                }
569                            }
570                        })
571                        .inspect(|_| tracing::warn!(
572                            task = %HoprLibProcess::SessionServer,
573                            "long-running background task finished"
574                        ))
575                ),
576            );
577        }
578
579        info!("starting ticket events processor");
580        let (tickets_tx, tickets_rx) = channel(1024);
581        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
582        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
583        let node_db = self.node_db.clone();
584        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
585        spawn(
586            tickets_rx
587                .filter_map(move |ticket_event| {
588                    let node_db = node_db.clone();
589                    async move {
590                        match ticket_event {
591                            TicketEvent::WinningTicket(winning) => {
592                                if let Err(error) = node_db.insert_ticket(*winning).await {
593                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
594                                } else {
595                                    tracing::debug!(%winning, "inserted ticket into database");
596                                }
597                                Some(winning)
598                            }
599                            TicketEvent::RejectedTicket(rejected, issuer) => {
600                                if let Some(issuer) = &issuer {
601                                    if let Err(error) =
602                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
603                                    {
604                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
605                                    } else {
606                                        tracing::debug!(%rejected, "marked ticket as rejected");
607                                    }
608                                } else {
609                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
610                                }
611                                None
612                            }
613                        }
614                    }
615                })
616                .for_each(move |ticket| {
617                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
618                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
619                    }
620                    futures::future::ready(())
621                })
622                .inspect(|_| {
623                    tracing::warn!(
624                        task = %HoprLibProcess::TicketEvents,
625                        "long-running background task finished"
626                    )
627                }),
628        );
629
630        info!("starting transport");
631        let (hopr_socket, transport_processes) = self
632            .transport_api
633            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
634            .await?;
635        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
636
637        // Start a queue that takes care of redeeming tickets via given TicketSelectors
638        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
639        let _ = self.redeem_requests.set(redemption_req_tx);
640        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
641        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
642        let chain = self.chain_api.clone();
643        let node_db = self.node_db.clone();
644        spawn(redemption_req_rx
645            .for_each(move |selector| {
646                let chain = chain.clone();
647                let db = node_db.clone();
648                async move {
649                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
650                        Ok(res) => debug!(%res, "redemption complete"),
651                        Err(error) => error!(%error, "redemption failed"),
652                    }
653                }
654            })
655            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
656        );
657
658        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
659        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
660        let chain = self.chain_api.clone();
661        let node_db = self.node_db.clone();
662        let events = chain.subscribe().map_err(HoprLibError::chain)?;
663        spawn(
664            futures::stream::Abortable::new(
665                events
666                    .filter_map(move |event|
667                        futures::future::ready(event.try_as_channel_closed())
668                    ),
669                chain_events_sub_reg
670            )
671            .for_each(move |closed_channel| {
672                let node_db = node_db.clone();
673                let chain = chain.clone();
674                async move {
675                    match closed_channel.direction(chain.me()) {
676                        Some(ChannelDirection::Incoming) => {
677                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
678                                Ok(num_neglected) if num_neglected > 0 => {
679                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
680                                },
681                                Ok(_) => {
682                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
683                                },
684                                Err(error) => {
685                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
686                                }
687                            }
688                        },
689                        Some(ChannelDirection::Outgoing) => {
690                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
691                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
692                            } else {
693                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
694                            }
695                        }
696                        _ => {} // Event for a channel that is not our own
697                    }
698                }
699            })
700            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
701        );
702
703        // NOTE: after the chain is synced, we can reset tickets which are considered
704        // redeemed but on-chain state does not align with that. This implies there was a problem
705        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
706        // handle errors appropriately.
707        let mut channels = self
708            .chain_api
709            .stream_channels(ChannelSelector {
710                destination: self.me_onchain().into(),
711                ..Default::default()
712            })
713            .map_err(HoprLibError::chain)
714            .await?;
715
716        while let Some(channel) = channels.next().await {
717            self.node_db
718                .update_ticket_states_and_fetch(
719                    [TicketSelector::from(&channel)
720                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
721                        .with_index_range(channel.ticket_index..)],
722                    AcknowledgedTicketStatus::Untouched,
723                )
724                .map_err(HoprLibError::db)
725                .await?
726                .for_each(|ticket| {
727                    info!(%ticket, "fixed next out-of-sync ticket");
728                    futures::future::ready(())
729                })
730                .await;
731        }
732
733        self.state.store(HoprState::Running, Ordering::Relaxed);
734
735        info!(
736            id = %self.me_peer_id(),
737            version = constants::APP_VERSION,
738            "NODE STARTED AND RUNNING"
739        );
740
741        #[cfg(all(feature = "prometheus", not(test)))]
742        METRIC_HOPR_NODE_INFO.set(
743            &[
744                &self.me.public().to_peerid_str(),
745                &self.me_onchain().to_string(),
746                &self.cfg.safe_module.safe_address.to_string(),
747                &self.cfg.safe_module.module_address.to_string(),
748            ],
749            1.0,
750        );
751
752        let _ = self.processes.set(processes);
753        Ok(hopr_socket)
754    }
755
756    /// Used to practically shut down all node's processes without dropping the instance.
757    ///
758    /// This means that the instance can be used to retrieve some information, but all
759    /// active operations will stop and new will be impossible to perform.
760    /// Such operations will return [`HoprStatusError::NotThereYet`].
761    ///
762    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
763    pub fn shutdown(&self) -> Result<(), HoprLibError> {
764        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
765        if let Some(processes) = self.processes.get() {
766            processes.abort_all();
767        }
768        self.state.store(HoprState::Terminated, Ordering::Relaxed);
769        info!("NODE SHUTDOWN COMPLETE");
770        Ok(())
771    }
772
773    /// Allows external users to receive notifications about new winning tickets.
774    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
775        self.winning_ticket_subscribers.1.activate_cloned()
776    }
777
778    // p2p transport =========
779    /// Own PeerId used in the libp2p transport layer
780    pub fn me_peer_id(&self) -> PeerId {
781        (*self.me.public()).into()
782    }
783
784    /// Get the list of all announced public nodes in the network
785    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
786        Ok(self
787            .chain_api
788            .stream_accounts(AccountSelector {
789                public_only: true,
790                ..Default::default()
791            })
792            .map_err(HoprLibError::chain)
793            .await?
794            .map(|entry| {
795                (
796                    PeerId::from(entry.public_key),
797                    entry.chain_addr,
798                    entry.get_multiaddrs().to_vec(),
799                )
800            })
801            .collect()
802            .await)
803    }
804
805    /// Ping another node in the network based on the PeerId
806    ///
807    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
808    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
809        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
810
811        Ok(self.transport_api.ping(peer).await?)
812    }
813
814    /// Create a client session connection returning a session object that implements
815    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
816    #[cfg(feature = "session-client")]
817    pub async fn connect_to(
818        &self,
819        destination: Address,
820        target: SessionTarget,
821        cfg: SessionClientConfig,
822    ) -> errors::Result<HoprSession> {
823        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
824
825        let backoff = backon::ConstantBuilder::default()
826            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
827            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
828            .with_jitter();
829
830        use backon::Retryable;
831
832        Ok((|| {
833            let cfg = cfg.clone();
834            let target = target.clone();
835            async { self.transport_api.new_session(destination, target, cfg).await }
836        })
837        .retry(backoff)
838        .sleep(backon::FuturesTimerSleeper)
839        .await?)
840    }
841
842    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
843    /// closed due to inactivity.
844    #[cfg(feature = "session-client")]
845    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
846        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
847        Ok(self.transport_api.probe_session(id).await?)
848    }
849
850    #[cfg(feature = "session-client")]
851    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
852        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
853        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
854    }
855
856    #[cfg(feature = "session-client")]
857    pub async fn update_session_surb_balancer_config(
858        &self,
859        id: &SessionId,
860        cfg: SurbBalancerConfig,
861    ) -> errors::Result<()> {
862        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
863        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
864    }
865
866    /// List all multiaddresses announced by this node
867    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
868        self.transport_api.local_multiaddresses()
869    }
870
871    /// List all multiaddresses on which the node is listening
872    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
873        self.transport_api.listening_multiaddresses().await
874    }
875
876    /// List all multiaddresses observed for a PeerId
877    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
878        self.transport_api.network_observed_multiaddresses(peer).await
879    }
880
881    /// List all multiaddresses announced on-chain for the given node.
882    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
883        let peer = *peer;
884        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
885        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
886
887        match self
888            .chain_api
889            .stream_accounts(AccountSelector {
890                public_only: false,
891                offchain_key: Some(pubkey),
892                ..Default::default()
893            })
894            .map_err(HoprLibError::chain)
895            .await?
896            .next()
897            .await
898        {
899            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
900            None => {
901                error!(%peer, "no information");
902                Ok(vec![])
903            }
904        }
905    }
906
907    // Network =========
908
909    /// Get measured network health
910    pub async fn network_health(&self) -> Health {
911        self.transport_api.network_health().await
912    }
913
914    /// List all peers connected to this
915    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
916        Ok(self.transport_api.network_connected_peers().await?)
917    }
918
919    /// Get all data collected from the network relevant for a PeerId
920    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
921        Ok(self.transport_api.network_peer_observations(peer).await?)
922    }
923
924    /// Get peers connected peers with quality higher than some value
925    pub async fn all_network_peers(
926        &self,
927        minimum_score: f64,
928    ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
929        Ok(
930            futures::stream::iter(self.transport_api.network_connected_peers().await?)
931                .filter_map(|peer| async move {
932                    if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
933                        if info.score() >= minimum_score {
934                            Some((peer, info))
935                        } else {
936                            None
937                        }
938                    } else {
939                        None
940                    }
941                })
942                .filter_map(|(peer_id, info)| async move {
943                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
944                    Some((address, peer_id, info))
945                })
946                .collect::<Vec<_>>()
947                .await,
948        )
949    }
950
951    // Ticket ========
952    /// Get all tickets in a channel specified by [`channel_id`](ChannelId).
953    pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
954        if let Some(channel) = self
955            .chain_api
956            .channel_by_id(channel_id)
957            .await
958            .map_err(|e| HoprTransportError::Other(e.into()))?
959        {
960            if &channel.destination == self.chain_api.me() {
961                Ok(Some(
962                    self.node_db
963                        .stream_tickets([&channel])
964                        .await
965                        .map_err(HoprLibError::db)?
966                        .collect()
967                        .await,
968                ))
969            } else {
970                Ok(None)
971            }
972        } else {
973            Ok(None)
974        }
975    }
976
977    /// Get all tickets
978    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
979        Ok(self
980            .node_db
981            .stream_tickets(None::<TicketSelector>)
982            .await
983            .map_err(HoprLibError::db)?
984            .map(|v| v.ticket)
985            .collect()
986            .await)
987    }
988
989    /// Get statistics for all tickets
990    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
991        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
992    }
993
994    /// Reset the ticket metrics to zero
995    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
996        self.node_db
997            .reset_ticket_statistics()
998            .await
999            .map_err(HoprLibError::chain)
1000    }
1001
1002    // Chain =========
1003    pub fn me_onchain(&self) -> Address {
1004        *self.chain_api.me()
1005    }
1006
1007    /// Get ticket price
1008    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1009        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1010    }
1011
1012    /// Get minimum incoming ticket winning probability
1013    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1014        self.chain_api
1015            .minimum_incoming_ticket_win_prob()
1016            .await
1017            .map_err(HoprLibError::chain)
1018    }
1019
1020    /// List of all accounts announced on the chain
1021    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1022        Ok(self
1023            .chain_api
1024            .stream_accounts(AccountSelector {
1025                public_only: true,
1026                ..Default::default()
1027            })
1028            .map_err(HoprLibError::chain)
1029            .await?
1030            .collect()
1031            .await)
1032    }
1033
1034    /// Get the channel entry from Hash.
1035    /// @returns the channel entry of those two nodes
1036    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1037        self.chain_api
1038            .channel_by_id(channel_id)
1039            .await
1040            .map_err(HoprLibError::chain)
1041    }
1042
1043    /// Get the channel entry between source and destination node.
1044    /// @param src Address
1045    /// @param dest Address
1046    /// @returns the channel entry of those two nodes
1047    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1048        self.chain_api
1049            .channel_by_parties(src, dest)
1050            .await
1051            .map_err(HoprLibError::chain)
1052    }
1053
1054    /// List all channels open from a specified Address
1055    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1056        Ok(self
1057            .chain_api
1058            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1059                ChannelStatusDiscriminants::Closed,
1060                ChannelStatusDiscriminants::Open,
1061                ChannelStatusDiscriminants::PendingToClose,
1062            ]))
1063            .map_err(HoprLibError::chain)
1064            .await?
1065            .collect()
1066            .await)
1067    }
1068
1069    /// List all channels open to a specified address
1070    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1071        Ok(self
1072            .chain_api
1073            .stream_channels(
1074                ChannelSelector::default()
1075                    .with_destination(*dest)
1076                    .with_allowed_states(&[
1077                        ChannelStatusDiscriminants::Closed,
1078                        ChannelStatusDiscriminants::Open,
1079                        ChannelStatusDiscriminants::PendingToClose,
1080                    ]),
1081            )
1082            .map_err(HoprLibError::chain)
1083            .await?
1084            .collect()
1085            .await)
1086    }
1087
1088    /// List all channels
1089    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1090        Ok(self
1091            .chain_api
1092            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1093                ChannelStatusDiscriminants::Closed,
1094                ChannelStatusDiscriminants::Open,
1095                ChannelStatusDiscriminants::PendingToClose,
1096            ]))
1097            .map_err(HoprLibError::chain)
1098            .await?
1099            .collect()
1100            .await)
1101    }
1102
1103    /// Current safe allowance balance
1104    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1105        self.chain_api
1106            .safe_allowance(self.cfg.safe_module.safe_address)
1107            .await
1108            .map_err(HoprLibError::chain)
1109    }
1110
1111    /// Withdraw on-chain assets to a given address
1112    /// @param recipient the account where the assets should be transferred to
1113    /// @param amount how many tokens to be transferred
1114    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1115        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1116
1117        self.chain_api
1118            .withdraw(amount, &recipient)
1119            .and_then(identity)
1120            .map_err(HoprLibError::chain)
1121            .await
1122    }
1123
1124    /// Withdraw on-chain native assets to a given address
1125    /// @param recipient the account where the assets should be transferred to
1126    /// @param amount how many tokens to be transferred
1127    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1128        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1129
1130        self.chain_api
1131            .withdraw(amount, &recipient)
1132            .and_then(identity)
1133            .map_err(HoprLibError::chain)
1134            .await
1135    }
1136
1137    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1138        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1139
1140        let (channel_id, tx_hash) = self
1141            .chain_api
1142            .open_channel(destination, amount)
1143            .and_then(identity)
1144            .map_err(HoprLibError::chain)
1145            .await?;
1146
1147        Ok(OpenChannelResult { tx_hash, channel_id })
1148    }
1149
1150    pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1151        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1152
1153        self.chain_api
1154            .fund_channel(channel_id, amount)
1155            .and_then(identity)
1156            .map_err(HoprLibError::chain)
1157            .await
1158    }
1159
1160    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1161        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1162
1163        let tx_hash = self
1164            .chain_api
1165            .close_channel(channel_id)
1166            .and_then(identity)
1167            .map_err(HoprLibError::chain)
1168            .await?;
1169
1170        Ok(CloseChannelResult { tx_hash })
1171    }
1172
1173    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1174        self.chain_api
1175            .channel_closure_notice_period()
1176            .await
1177            .map_err(HoprLibError::chain)
1178    }
1179
1180    pub fn redemption_requests(
1181        &self,
1182    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1183        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1184
1185        // TODO: add universal timeout sink here
1186        Ok(self
1187            .redeem_requests
1188            .get()
1189            .cloned()
1190            .expect("redeem_requests is not initialized")
1191            .sink_map_err(HoprLibError::other))
1192    }
1193
1194    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1195        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1196
1197        let min_value = min_value.into();
1198
1199        self.chain_api
1200            .stream_channels(
1201                ChannelSelector::default()
1202                    .with_destination(self.me_onchain())
1203                    .with_allowed_states(&[
1204                        ChannelStatusDiscriminants::Open,
1205                        ChannelStatusDiscriminants::PendingToClose,
1206                    ]),
1207            )
1208            .map_err(HoprLibError::chain)
1209            .await?
1210            .map(|channel| {
1211                Ok(TicketSelector::from(&channel)
1212                    .with_amount(min_value..)
1213                    .with_index_range(channel.ticket_index..)
1214                    .with_state(AcknowledgedTicketStatus::Untouched))
1215            })
1216            .forward(self.redemption_requests()?)
1217            .await?;
1218
1219        Ok(())
1220    }
1221
1222    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1223        &self,
1224        counterparty: &Address,
1225        min_value: B,
1226    ) -> errors::Result<()> {
1227        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1228            .await
1229    }
1230
1231    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1232        &self,
1233        channel_id: &Hash,
1234        min_value: B,
1235    ) -> errors::Result<()> {
1236        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1237
1238        let channel = self
1239            .chain_api
1240            .channel_by_id(channel_id)
1241            .await
1242            .map_err(HoprLibError::chain)?
1243            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1244
1245        self.redemption_requests()?
1246            .send(
1247                TicketSelector::from(channel)
1248                    .with_amount(min_value.into()..)
1249                    .with_index_range(channel.ticket_index..)
1250                    .with_state(AcknowledgedTicketStatus::Untouched),
1251            )
1252            .await?;
1253
1254        Ok(())
1255    }
1256
1257    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1258        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1259
1260        self.redemption_requests()?
1261            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1262            .await?;
1263
1264        Ok(())
1265    }
1266
1267    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1268        let peer_id = *peer_id;
1269        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1270        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1271            .await
1272            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1273
1274        self.chain_api
1275            .packet_key_to_chain_key(&pubkey)
1276            .await
1277            .map_err(HoprLibError::chain)
1278    }
1279
1280    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1281        self.chain_api
1282            .chain_key_to_packet_key(address)
1283            .await
1284            .map(|pk| pk.map(|v| v.into()))
1285            .map_err(HoprLibError::chain)
1286    }
1287}
1288
1289impl<Chain, Db> Hopr<Chain, Db> {
1290    // === telemetry
1291    /// Prometheus formatted metrics collected by the hopr-lib components.
1292    pub fn collect_hopr_metrics() -> errors::Result<String> {
1293        cfg_if::cfg_if! {
1294            if #[cfg(all(feature = "prometheus", not(test)))] {
1295                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1296            } else {
1297                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1298            }
1299        }
1300    }
1301}