hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API or key management.
9//!
//! The intended way to use hopr_lib is for a specific tool to be built on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
//! For most practical use cases, the `hoprd` application should be the preferable
//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25
26/// Utility module with helper types and functionality over hopr-lib behavior.
27pub mod utils;
28
29/// Public traits for interactions with this library.
30pub mod traits;
31
32/// Functionality related to the HOPR node state.
33pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38/// Re-exports of libraries necessary for API and interface operations.
39#[doc(hidden)]
40pub mod exports {
41    pub mod chain {
42        pub use hopr_chain_types as types;
43    }
44
45    pub mod types {
46        pub use hopr_primitive_types as primitive;
47    }
48    pub mod crypto {
49        pub use hopr_crypto_keypair as keypair;
50        pub use hopr_crypto_types as types;
51    }
52
53    pub mod network {
54        pub use hopr_network_types as types;
55    }
56
57    pub use hopr_transport as transport;
58}
59
60/// Export of relevant types for easier integration.
61#[doc(hidden)]
62pub mod prelude {
63    pub use super::exports::{
64        crypto::{
65            keypair::key_pair::HoprKeys,
66            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67        },
68        network::types::{
69            prelude::ForeignDataMode,
70            udp::{ConnectedUdpStream, UdpStreamParallelism},
71        },
72        transport::{OffchainPublicKey, socket::HoprSocket},
73        types::primitive::prelude::Address,
74    };
75}
76
77use std::{
78    collections::HashMap,
79    ops::Deref,
80    path::PathBuf,
81    str::FromStr,
82    sync::{Arc, atomic::Ordering},
83    time::Duration,
84};
85
86use async_lock::RwLock;
87pub use async_trait::async_trait;
88use errors::{HoprLibError, HoprStatusError};
89use futures::{FutureExt, StreamExt, channel::mpsc::channel, future::AbortHandle};
90use hopr_api::{
91    chain::{
92        AccountSelector, AnnouncementError, ChainEvents, ChainKeyOperations, ChainReadAccountOperations,
93        ChainReadChannelOperations, ChainValues, ChainWriteAccountOperations, ChainWriteChannelOperations,
94        ChainWriteTicketOperations, ChannelSelector,
95    },
96    db::{HoprDbPeersOperations, HoprDbTicketOperations, PeerStatus, TicketSelector},
97};
98use hopr_async_runtime::prelude::spawn;
99pub use hopr_chain_api::config::{
100    Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
101};
102use hopr_chain_api::{HoprChain, HoprChainProcess, config::ChainNetworkConfig, errors::HoprChainError, wait_for_funds};
103use hopr_chain_types::ContractAddresses;
104pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
105use hopr_crypto_types::prelude::Hash;
106use hopr_db_node::{HoprNodeDb, HoprNodeDbConfig};
107pub use hopr_internal_types::prelude::*;
108pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
109pub use hopr_path::channel_graph::GraphExportConfig;
110use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
111#[cfg(all(feature = "prometheus", not(test)))]
112use hopr_platform::time::native::current_time;
113pub use hopr_primitive_types::prelude::*;
114pub use hopr_strategy::Strategy;
115use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
116#[cfg(feature = "runtime-tokio")]
117pub use hopr_transport::transfer_session;
118pub use hopr_transport::{
119    ApplicationData, ApplicationDataIn, ApplicationDataOut, HalfKeyChallenge, Health, HoprSession, IncomingSession,
120    Keypair, Multiaddr, OffchainKeypair as HoprOffchainKeypair, PeerId, PingQueryReplier, ProbeError, SESSION_MTU,
121    SURB_SIZE, ServiceId, SessionCapabilities, SessionCapability, SessionClientConfig, SessionId as HoprSessionId,
122    SessionManagerError, SessionTarget, SurbBalancerConfig, Tag, Telemetry, TicketStatistics, TrafficGeneration,
123    TransportSessionError,
124    config::{HostConfig, HostType, looks_like_domain},
125    errors::{HoprTransportError, NetworkingError, ProtocolError},
126};
127use hopr_transport::{
128    ChainKeypair, HoprTransport, HoprTransportConfig, OffchainKeypair, PeerDiscovery, execute_on_tick,
129};
130use tracing::{debug, error, info, trace, warn};
131
132use crate::{
133    config::SafeModule,
134    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
135    state::HoprState,
136    traits::chain::{CloseChannelResult, OpenChannelResult},
137};
138
// Metrics are only compiled in for non-test builds with the `prometheus` feature enabled.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge `hopr_start_time`: unix timestamp (in seconds) of the process start.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    )
    .unwrap();

    /// Gauge `hopr_lib_version`: executed hopr-lib version, labelled by `version`.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    )
    .unwrap();

    /// Gauge `hopr_node_addresses`: on-chain and off-chain addresses of the node,
    /// labelled by `peerid`, `address`, `safe_address` and `module_address`.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    )
    .unwrap();
}
156
157pub struct DummyCoverTrafficType {
158    #[allow(dead_code)]
159    _unconstructable: (),
160}
161
162impl TrafficGeneration for DummyCoverTrafficType {
163    fn build(
164        self,
165    ) -> (
166        impl futures::Stream<Item = DestinationRouting> + Send,
167        impl futures::Sink<
168            std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
169            Error = impl std::error::Error,
170        > + Send
171        + Sync
172        + Clone
173        + 'static,
174    ) {
175        (
176            futures::stream::empty(),
177            futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
178        )
179    }
180}
181
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// By default, the available CPU parallelism is divided by 2, since half of the available
/// threads are to be used for IO-bound and half for CPU-bound tasks. At least one thread
/// is always used for each of the two pools.
///
/// The defaults can be overridden using environment variables (values that are unset or
/// cannot be parsed as an unsigned integer are ignored):
/// - `HOPRD_NUM_CPU_THREADS`: size of the CPU-bound thread pool
/// - `HOPRD_NUM_IO_THREADS`: number of tokio worker threads
/// - `HOPRD_THREAD_STACK_SIZE`: stack size of tokio threads in bytes (default 10 MiB, never
///   less than 2 MiB)
///
/// # Errors
///
/// Fails if the number of threads can be determined neither from the environment nor
/// from the available parallelism, if the CPU thread pool cannot be initialized, or if
/// the tokio runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime() -> anyhow::Result<tokio::runtime::Runtime> {
    /// Reads an unsigned integer from the given environment variable.
    /// Returns `None` when the variable is unset or does not parse.
    fn usize_from_env(name: &str) -> Option<usize> {
        std::env::var(name).ok().and_then(|raw| usize::from_str(&raw).ok())
    }

    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error values are built lazily, so nothing is allocated on the happy path.
    let cpu_threads = usize_from_env("HOPRD_NUM_CPU_THREADS")
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = usize_from_env("HOPRD_NUM_IO_THREADS")
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = usize_from_env("HOPRD_THREAD_STACK_SIZE")
        .unwrap_or(10 * 1024 * 1024)
        .max(2 * 1024 * 1024);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
225
226/// HOPR main object providing the entire HOPR node functionality
227///
228/// Instantiating this object creates all processes and objects necessary for
229/// running the HOPR node. Once created, the node can be started using the
230/// `run()` method.
231///
232/// Externally offered API should be sufficient to perform all necessary tasks
233/// with the HOPR node manually, but it is advised to create such a configuration
234/// that manual interaction is unnecessary.
235///
236/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
237pub struct Hopr {
238    me: OffchainKeypair,
239    cfg: config::HoprLibConfig,
240    state: Arc<state::AtomicHoprState>,
241    transport_api: HoprTransport<HoprNodeDb, HoprChain>,
242    hopr_chain_api: HoprChain,
243    node_db: HoprNodeDb,
244    // objects that could be removed pending architectural cleanup ========
245    chain_cfg: ChainNetworkConfig,
246    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
247    multistrategy: Arc<MultiStrategy>,
248}
249
250impl Hopr {
251    pub async fn new(
252        mut cfg: config::HoprLibConfig,
253        me: &OffchainKeypair,
254        me_onchain: &ChainKeypair,
255    ) -> crate::errors::Result<Self> {
256        if hopr_crypto_random::is_rng_fixed() {
257            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
258        }
259
260        let multiaddress: Multiaddr = (&cfg.host).try_into()?;
261
262        let db_path: PathBuf = [&cfg.db.data, "node_db"].iter().collect();
263        info!(path = ?db_path, "Initiating DB");
264
265        if cfg.db.force_initialize {
266            info!("Force cleaning up existing database");
267            hopr_platform::file::native::remove_dir_all(db_path.as_path()).map_err(|e| {
268                HoprLibError::GeneralError(format!(
269                    "Failed to remove the existing DB directory at '{db_path:?}': {e}"
270                ))
271            })?;
272            cfg.db.initialize = true
273        }
274
275        // create DB dir if it does not exist
276        if let Some(parent_dir_path) = db_path.as_path().parent() {
277            if !parent_dir_path.is_dir() {
278                std::fs::create_dir_all(parent_dir_path).map_err(|e| {
279                    HoprLibError::GeneralError(format!(
280                        "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
281                    ))
282                })?
283            }
284        }
285
286        let db_cfg = HoprNodeDbConfig {
287            create_if_missing: cfg.db.initialize,
288            force_create: cfg.db.force_initialize,
289            log_slow_queries: std::time::Duration::from_millis(150),
290            surb_ring_buffer_size: std::env::var("HOPR_PROTOCOL_SURB_RB_SIZE")
291                .ok()
292                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
293                .unwrap_or_else(|| HoprNodeDbConfig::default().surb_ring_buffer_size),
294            surb_distress_threshold: std::env::var("HOPR_PROTOCOL_SURB_RB_DISTRESS")
295                .ok()
296                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
297                .unwrap_or_else(|| HoprNodeDbConfig::default().surb_distress_threshold),
298        };
299        let node_db = HoprNodeDb::new(db_path.as_path(), me_onchain.clone(), db_cfg).await?;
300
301        if let Some(provider) = &cfg.chain.provider {
302            info!(provider, "Creating chain components using the custom provider");
303        } else {
304            info!("Creating chain components using the default provider");
305        }
306        let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
307            &cfg.chain.network,
308            crate::constants::APP_VERSION_COERCED,
309            cfg.chain.provider.as_deref(),
310            cfg.chain.max_rpc_requests_per_sec,
311            &mut cfg.chain.protocols,
312        )
313        .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
314
315        let contract_addresses = ContractAddresses::from(&resolved_environment);
316        info!(
317            myself = me_onchain.public().to_hex(),
318            contract_addresses = ?contract_addresses,
319            "Resolved contract addresses",
320        );
321
322        let my_multiaddresses = vec![multiaddress];
323
324        let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
325            me_onchain.public().to_address(),
326            ChannelGraphConfig::default(),
327        )));
328
329        // TODO (4.0): replace this with new implementation that follows the chain traits from the hopr-api crate
330        let hopr_chain_api = hopr_chain_api::HoprChain::new(
331            me_onchain.clone(),
332            resolved_environment.clone(),
333            node_db.clone(),
334            &cfg.db.data,
335            cfg.safe_module.module_address,
336            ContractAddresses {
337                announcements: resolved_environment.announcements,
338                channels: resolved_environment.channels,
339                token: resolved_environment.token,
340                ticket_price_oracle: resolved_environment.ticket_price_oracle,
341                winning_probability_oracle: resolved_environment.winning_probability_oracle,
342                network_registry: resolved_environment.network_registry,
343                network_registry_proxy: resolved_environment.network_registry_proxy,
344                node_stake_v2_factory: resolved_environment.node_stake_v2_factory,
345                node_safe_registry: resolved_environment.node_safe_registry,
346                module_implementation: resolved_environment.module_implementation,
347            },
348            cfg.safe_module.safe_address,
349            hopr_chain_api::IndexerConfig {
350                start_block_number: resolved_environment.channel_contract_deploy_block as u64,
351                fast_sync: cfg.chain.fast_sync,
352                enable_logs_snapshot: cfg.chain.enable_logs_snapshot,
353                logs_snapshot_url: cfg.chain.logs_snapshot_url.clone(),
354                data_directory: cfg.db.data.clone(),
355            },
356        )?;
357
358        let hopr_transport_api = HoprTransport::new(
359            me,
360            me_onchain,
361            HoprTransportConfig {
362                transport: cfg.transport.clone(),
363                network: cfg.network_options.clone(),
364                protocol: cfg.protocol,
365                probe: cfg.probe,
366                session: cfg.session,
367            },
368            node_db.clone(),
369            hopr_chain_api.clone(),
370            channel_graph.clone(),
371            my_multiaddresses,
372        );
373
374        let multi_strategy = Arc::new(MultiStrategy::new(cfg.strategy.clone(), hopr_chain_api.clone()));
375        debug!(
376            strategies = tracing::field::debug(&multi_strategy),
377            "Initialized strategies"
378        );
379
380        #[cfg(all(feature = "prometheus", not(test)))]
381        {
382            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
383            METRIC_HOPR_LIB_VERSION.set(
384                &[const_format::formatcp!("{}", constants::APP_VERSION)],
385                f64::from_str(const_format::formatcp!(
386                    "{}.{}",
387                    env!("CARGO_PKG_VERSION_MAJOR"),
388                    env!("CARGO_PKG_VERSION_MINOR")
389                ))
390                .unwrap_or(0.0),
391            );
392
393            // Calling get_ticket_statistics will initialize the respective metrics on tickets
394            if let Err(e) = node_db.get_ticket_statistics(None).await {
395                error!(error = %e, "Failed to initialize ticket statistics metrics");
396            }
397        }
398
399        Ok(Self {
400            me: me.clone(),
401            cfg,
402            state: Arc::new(state::AtomicHoprState::new(state::HoprState::Uninitialized)),
403            transport_api: hopr_transport_api,
404            hopr_chain_api,
405            node_db,
406            chain_cfg: resolved_environment,
407            channel_graph,
408            multistrategy: multi_strategy,
409        })
410    }
411
412    fn error_if_not_in_state(&self, state: state::HoprState, error: String) -> errors::Result<()> {
413        if self.status() == state {
414            Ok(())
415        } else {
416            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
417        }
418    }
419
420    pub fn status(&self) -> state::HoprState {
421        self.state.load(Ordering::Relaxed)
422    }
423
424    pub fn network(&self) -> String {
425        self.cfg.chain.network.clone()
426    }
427
428    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
429        Ok(self.hopr_chain_api.node_balance().await?)
430    }
431
432    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
433        Ok(self.hopr_chain_api.safe_balance().await?)
434    }
435
436    pub fn get_safe_config(&self) -> SafeModule {
437        self.cfg.safe_module.clone()
438    }
439
440    pub fn chain_config(&self) -> ChainNetworkConfig {
441        self.chain_cfg.clone()
442    }
443
444    pub fn config(&self) -> &config::HoprLibConfig {
445        &self.cfg
446    }
447
448    pub fn get_provider(&self) -> String {
449        self.cfg
450            .chain
451            .provider
452            .clone()
453            .unwrap_or(self.chain_cfg.chain.default_provider.clone())
454    }
455
456    #[inline]
457    fn is_public(&self) -> bool {
458        self.cfg.chain.announce
459    }
460
461    pub async fn run<
462        Ct,
463        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
464    >(
465        &self,
466        cover_traffic: Option<Ct>,
467        #[cfg(feature = "session-server")] serve_handler: T,
468    ) -> errors::Result<(
469        hopr_transport::socket::HoprSocket<
470            futures::channel::mpsc::Receiver<ApplicationDataIn>,
471            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
472        >,
473        HashMap<state::HoprLibProcesses, AbortHandle>,
474    )>
475    where
476        Ct: TrafficGeneration + Send + Sync + 'static,
477    {
478        self.error_if_not_in_state(
479            state::HoprState::Uninitialized,
480            "Cannot start the hopr node multiple times".into(),
481        )?;
482
483        #[cfg(feature = "testing")]
484        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
485
486        info!(
487            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
488            "Node is not started, please fund this node",
489        );
490
491        wait_for_funds(
492            *MIN_NATIVE_BALANCE,
493            *SUGGESTED_NATIVE_BALANCE,
494            Duration::from_secs(200),
495            &self.hopr_chain_api,
496        )
497        .await?;
498
499        let mut processes: HashMap<state::HoprLibProcesses, AbortHandle> = HashMap::new();
500
501        info!("Starting the node...");
502
503        self.state.store(state::HoprState::Initializing, Ordering::Relaxed);
504
505        let balance: XDaiBalance = self.get_balance().await?;
506        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
507
508        info!(
509            address = %self.hopr_chain_api.me_onchain(),
510            %balance,
511            %minimum_balance,
512            "Node information"
513        );
514
515        if balance.le(&minimum_balance) {
516            return Err(HoprLibError::GeneralError(
517                "Cannot start the node without a sufficiently funded wallet".to_string(),
518            ));
519        }
520
521        #[cfg(not(feature = "testing"))]
522        {
523            // Once we are able to query the chain,
524            // check if the winning probability is configured correctly.
525            let network_min_win_prob = self.hopr_chain_api.minimum_incoming_ticket_win_prob().await?;
526            let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
527
528            if configured_win_prob
529                .and_then(|c| WinningProbability::try_from(c).ok())
530                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
531            {
532                return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
533                    "configured outgoing ticket winning probability is lower than the network minimum winning \
534                     probability: {configured_win_prob:?} < {network_min_win_prob}"
535                ))));
536            }
537        }
538
539        self.state.store(state::HoprState::Indexing, Ordering::Relaxed);
540
541        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
542        // plus 100 as an additional buffer
543        let minimum_capacity = self
544            .hopr_chain_api
545            .count_accounts(AccountSelector {
546                public_only: true,
547                ..Default::default()
548            })
549            .await?
550            .saturating_mul(2)
551            .saturating_add(100);
552
553        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
554            .ok()
555            .and_then(|s| s.trim().parse::<usize>().ok())
556            .filter(|&c| c > 0)
557            .unwrap_or(2048)
558            .max(minimum_capacity);
559
560        debug!(
561            capacity = chain_discovery_events_capacity,
562            minimum_required = minimum_capacity,
563            "Creating chain discovery events channel"
564        );
565        let (indexer_peer_update_tx, indexer_peer_update_rx) =
566            futures::channel::mpsc::channel::<PeerDiscovery>(chain_discovery_events_capacity);
567
568        let indexer_event_pipeline = helpers::chain_events_to_transport_events(
569            self.hopr_chain_api.subscribe()?,
570            self.me_onchain(),
571            self.multistrategy.clone(),
572            self.channel_graph.clone(),
573            self.node_db.clone(),
574        );
575
576        spawn(async move {
577            let result = indexer_event_pipeline
578                .map(Ok)
579                .forward(indexer_peer_update_tx)
580                .inspect(|result| {
581                    tracing::warn!(
582                        ?result,
583                        task = "indexer -> transport",
584                        "long-running background task finished"
585                    )
586                })
587                .await;
588
589            result.expect("The index to transport event chain failed")
590        });
591
592        info!("Start the chain process and sync the indexer");
593        for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
594            let nid = match id {
595                HoprChainProcess::Indexer => state::HoprLibProcesses::Indexing,
596                HoprChainProcess::OutgoingOnchainActionQueue => state::HoprLibProcesses::OutgoingOnchainActionQueue,
597            };
598            processes.insert(nid, proc);
599        }
600
601        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
602
603        // Check Safe-module status:
604        // 1) if the node is already included into the module
605        // 2) if the module is enabled in the safe
606        // 3) if the safe is the owner of the module
607        // if any of the conditions is not met, return error
608        if !self.hopr_chain_api.check_node_safe_module_status().await? {
609            return Err(HoprLibError::ChainApi(HoprChainError::Api(
610                "Safe and module are not configured correctly".into(),
611            )));
612        }
613
614        // Possibly register a node-safe pair to NodeSafeRegistry.
615        // Following that, the connector is set to use safe tx variants.
616
617        if self
618            .hopr_chain_api
619            .can_register_with_safe(&self.cfg.safe_module.safe_address)
620            .await?
621        {
622            info!("Registering safe by node");
623
624            if self.me_onchain() == self.cfg.safe_module.safe_address {
625                return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
626            }
627
628            if let Err(error) = self
629                .hopr_chain_api
630                .register_safe(&self.cfg.safe_module.safe_address)
631                .await?
632                .await
633            {
634                // Intentionally ignoring the errored state
635                error!(%error, "Failed to register node with safe")
636            }
637        }
638
639        if self.is_public() {
640            // At this point the node is already registered with Safe, so
641            // we can announce via Safe-compliant TX
642
643            let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
644
645            // The announcement is intentionally not awaited until confirmation
646            match self
647                .hopr_chain_api
648                .announce(&multiaddresses_to_announce, &self.me)
649                .await
650            {
651                Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
652                Err(AnnouncementError::AlreadyAnnounced) => {
653                    info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
654                }
655                // If the announcement fails, we keep going to prevent the node from retrying
656                // after restart.
657                // Functionality is limited, and users must check the logs for errors.
658                Err(error) => error!(%error, "Failed to transmit node announcement"),
659            }
660        }
661
662        {
663            // Sync key ids from indexed Accounts
664
665            // Sync the Channel graph
666            let channel_graph = self.channel_graph.clone();
667            let mut cg = channel_graph.write_arc().await;
668
669            info!("Syncing channels from the previous runs");
670            let mut channel_stream = self
671                .hopr_chain_api
672                .stream_channels(
673                    ChannelSelector::default()
674                        .with_allowed_states(&[
675                            ChannelStatusDiscriminants::Open,
676                            ChannelStatusDiscriminants::PendingToClose,
677                        ])
678                        .with_closure_time_range(Utc::now()..),
679                )
680                .await?;
681            while let Some(channel) = channel_stream.next().await {
682                cg.update_channel(channel);
683            }
684
685            // Initialize node latencies and scores in the channel graph:
686            // Sync only those nodes that we know that had a good quality
687            // Other nodes will be repopulated into the channel graph during heartbeat
688            // rounds.
689            info!("Syncing peer qualities from the previous runs");
690            let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
691                .map_err(|e| e.to_string())
692                .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
693                .unwrap_or_else(|error| {
694                    warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
695                    constants::DEFAULT_MIN_QUALITY_TO_SYNC
696                });
697
698            let mut peer_stream = self
699                .node_db
700                .get_network_peers(Default::default(), false)
701                .await?
702                .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
703
704            while let Some(peer) = peer_stream.next().await {
705                if let Some(key) = self.hopr_chain_api.packet_key_to_chain_key(&peer.id.0).await? {
706                    // For nodes that had a good quality, we assign a perfect score
707                    cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
708                } else {
709                    error!(peer = %peer.id.1, "Could not translate peer information");
710                }
711            }
712
713            info!(
714                channels = cg.count_channels(),
715                nodes = cg.count_nodes(),
716                "Channel graph sync complete"
717            );
718        }
719
720        // notifier on acknowledged ticket reception
721        let multi_strategy_ack_ticket = self.multistrategy.clone();
722
723        let ack_ticket_channel_capacity = std::env::var("HOPR_INTERNAL_ACKED_TICKET_CHANNEL_CAPACITY")
724            .ok()
725            .and_then(|s| s.trim().parse::<usize>().ok())
726            .filter(|&c| c > 0)
727            .unwrap_or(2048);
728
729        debug!(
730            capacity = ack_ticket_channel_capacity,
731            "Creating acknowledged ticket channel"
732        );
733        let (on_ack_tkt_tx, mut on_ack_tkt_rx) = channel::<AcknowledgedTicket>(ack_ticket_channel_capacity);
734        self.node_db.start_ticket_processing(Some(on_ack_tkt_tx))?;
735
736        processes.insert(
737            state::HoprLibProcesses::OnReceivedAcknowledgement,
738            hopr_async_runtime::spawn_as_abortable!(async move {
739                while let Some(ack) = on_ack_tkt_rx.next().await {
740                    if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
741                        &*multi_strategy_ack_ticket,
742                        &ack,
743                    )
744                    .await
745                    {
746                        error!(%error, "Failed to process acknowledged winning ticket with the strategy");
747                    }
748                }
749
750                tracing::warn!(
751                    task = %state::HoprLibProcesses::OnReceivedAcknowledgement,
752                    "long-running background task finished"
753                )
754            }),
755        );
756
757        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
758            .ok()
759            .and_then(|s| s.trim().parse::<usize>().ok())
760            .filter(|&c| c > 0)
761            .unwrap_or(256);
762
763        debug!(
764            capacity = incoming_session_channel_capacity,
765            "Creating incoming session channel"
766        );
767        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
768
769        #[cfg(feature = "session-server")]
770        {
771            processes.insert(
772                state::HoprLibProcesses::SessionServer,
773                hopr_async_runtime::spawn_as_abortable!(
774                    _session_rx
775                        .for_each_concurrent(None, move |session| {
776                            let serve_handler = serve_handler.clone();
777                            async move {
778                                let session_id = *session.session.id();
779                                match serve_handler.process(session).await {
780                                    Ok(_) => debug!(
781                                        session_id = ?session_id,
782                                        "Client session processed successfully"
783                                    ),
784                                    Err(e) => error!(
785                                        session_id = ?session_id,
786                                        error = %e,
787                                        "Client session processing failed"
788                                    ),
789                                }
790                            }
791                        })
792                        .inspect(|_| tracing::warn!(
793                            task = %state::HoprLibProcesses::SessionServer,
794                            "long-running background task finished"
795                        ))
796                ),
797            );
798        }
799
800        info!("Starting transport");
801
802        let (hopr_socket, transport_processes) = self
803            .transport_api
804            .run(cover_traffic, indexer_peer_update_rx, session_tx)
805            .await?;
806
807        for (id, proc) in transport_processes.into_iter() {
808            processes.insert(id.into(), proc);
809        }
810
811        let db_clone = self.node_db.clone();
812        processes.insert(
813            state::HoprLibProcesses::TicketIndexFlush,
814            hopr_async_runtime::spawn_as_abortable!(
815                Box::pin(execute_on_tick(
816                    Duration::from_secs(5),
817                    move || {
818                        let db_clone = db_clone.clone();
819                        async move {
820                            match db_clone.persist_outgoing_ticket_indices().await {
821                                Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
822                                Err(e) => error!(error = %e, "Failed to flush ticket indices"),
823                            }
824                        }
825                    },
826                    "flush the states of outgoing ticket indices".into(),
827                ))
828                .inspect(|_| tracing::warn!(
829                    task = %state::HoprLibProcesses::TicketIndexFlush,
830                    "long-running background task finished"
831                ))
832            ),
833        );
834
835        // NOTE: after the chain is synced, we can reset tickets which are considered
836        // redeemed but on-chain state does not align with that. This implies there was a problem
837        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
838        // handle errors appropriately.
839        let mut channels = self
840            .hopr_chain_api
841            .stream_channels(ChannelSelector {
842                destination: self.me_onchain().into(),
843                ..Default::default()
844            })
845            .await?;
846
847        while let Some(channel) = channels.next().await {
848            self.node_db
849                .update_ticket_states_and_fetch(
850                    TicketSelector::from(&channel)
851                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
852                        .with_index_range(channel.ticket_index.as_u64()..),
853                    AcknowledgedTicketStatus::Untouched,
854                )
855                .await?
856                .for_each(|ticket| {
857                    info!(%ticket, "fixed next out-of-sync ticket");
858                    futures::future::ready(())
859                })
860                .await;
861        }
862
863        // NOTE: strategy ticks must start after the chain is synced, otherwise
864        // the strategy would react to historical data and drain through the native
865        // balance on chain operations not relevant for the present network state
866        let multi_strategy = self.multistrategy.clone();
867        let strategy_interval = self.cfg.strategy.execution_interval;
868        processes.insert(
869            state::HoprLibProcesses::StrategyTick,
870            hopr_async_runtime::spawn_as_abortable!(
871                execute_on_tick(
872                    Duration::from_secs(strategy_interval),
873                    move || {
874                        let multi_strategy = multi_strategy.clone();
875
876                        async move {
877                            trace!(state = "started", "strategy tick");
878                            let _ = multi_strategy.on_tick().await;
879                            trace!(state = "finished", "strategy tick");
880                        }
881                    },
882                    "run strategies".into(),
883                )
884                .inspect(
885                    |_| tracing::warn!(task = %state::HoprLibProcesses::StrategyTick, "long-running background task finished")
886                )
887            ),
888        );
889
890        self.state.store(state::HoprState::Running, Ordering::Relaxed);
891
892        info!(
893            id = %self.me_peer_id(),
894            version = constants::APP_VERSION,
895            "NODE STARTED AND RUNNING"
896        );
897
898        #[cfg(all(feature = "prometheus", not(test)))]
899        METRIC_HOPR_NODE_INFO.set(
900            &[
901                &self.me.public().to_peerid_str(),
902                &self.me_onchain().to_string(),
903                &self.cfg.safe_module.safe_address.to_string(),
904                &self.cfg.safe_module.module_address.to_string(),
905            ],
906            1.0,
907        );
908
909        Ok((hopr_socket, processes))
910    }
911
912    // p2p transport =========
913    /// Own PeerId used in the libp2p transport layer
914    pub fn me_peer_id(&self) -> PeerId {
915        (*self.me.public()).into()
916    }
917
918    /// Get the list of all announced public nodes in the network
919    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
920        Ok(self
921            .hopr_chain_api
922            .stream_accounts(AccountSelector {
923                public_only: true,
924                ..Default::default()
925            })
926            .await?
927            .filter_map(|entry| {
928                futures::future::ready(
929                    entry
930                        .get_multiaddr()
931                        .map(|maddr| (PeerId::from(entry.public_key), entry.chain_addr, vec![maddr])),
932                )
933            })
934            .collect()
935            .await)
936    }
937
938    /// Ping another node in the network based on the PeerId
939    ///
940    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
941    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
942        self.error_if_not_in_state(
943            state::HoprState::Running,
944            "Node is not ready for on-chain operations".into(),
945        )?;
946
947        Ok(self.transport_api.ping(peer).await?)
948    }
949
950    /// Create a client session connection returning a session object that implements
951    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
952    #[cfg(feature = "session-client")]
953    pub async fn connect_to(
954        &self,
955        destination: Address,
956        target: SessionTarget,
957        cfg: SessionClientConfig,
958    ) -> errors::Result<HoprSession> {
959        self.error_if_not_in_state(
960            state::HoprState::Running,
961            "Node is not ready for on-chain operations".into(),
962        )?;
963
964        let backoff = backon::ConstantBuilder::default()
965            .with_max_times(self.cfg.session.establish_max_retries as usize)
966            .with_delay(self.cfg.session.establish_retry_timeout)
967            .with_jitter();
968
969        use backon::Retryable;
970
971        Ok((|| {
972            let cfg = cfg.clone();
973            let target = target.clone();
974            async { self.transport_api.new_session(destination, target, cfg).await }
975        })
976        .retry(backoff)
977        .sleep(backon::FuturesTimerSleeper)
978        .await?)
979    }
980
981    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
982    /// closed due to inactivity.
983    #[cfg(feature = "session-client")]
984    pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
985        self.error_if_not_in_state(
986            state::HoprState::Running,
987            "Node is not ready for on-chain operations".into(),
988        )?;
989        Ok(self.transport_api.probe_session(id).await?)
990    }
991
992    #[cfg(feature = "session-client")]
993    pub async fn get_session_surb_balancer_config(
994        &self,
995        id: &HoprSessionId,
996    ) -> errors::Result<Option<SurbBalancerConfig>> {
997        self.error_if_not_in_state(
998            state::HoprState::Running,
999            "Node is not ready for on-chain operations".into(),
1000        )?;
1001        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
1002    }
1003
1004    #[cfg(feature = "session-client")]
1005    pub async fn update_session_surb_balancer_config(
1006        &self,
1007        id: &HoprSessionId,
1008        cfg: SurbBalancerConfig,
1009    ) -> errors::Result<()> {
1010        self.error_if_not_in_state(
1011            state::HoprState::Running,
1012            "Node is not ready for on-chain operations".into(),
1013        )?;
1014        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
1015    }
1016
1017    /// List all multiaddresses announced by this node
1018    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1019        self.transport_api.local_multiaddresses()
1020    }
1021
1022    /// List all multiaddresses on which the node is listening
1023    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1024        self.transport_api.listening_multiaddresses().await
1025    }
1026
1027    /// List all multiaddresses observed for a PeerId
1028    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1029        self.transport_api.network_observed_multiaddresses(peer).await
1030    }
1031
1032    /// List all multiaddresses announced on-chain for the given node.
1033    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1034        let peer = *peer;
1035        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1036        let pubkey =
1037            match hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer)).await {
1038                Ok(k) => k,
1039                Err(e) => {
1040                    error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1041                    return vec![];
1042                }
1043            };
1044
1045        match self.hopr_chain_api.find_account_by_packet_key(&pubkey).await {
1046            Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1047            Ok(None) => {
1048                error!(%peer, "no information");
1049                vec![]
1050            }
1051            Err(e) => {
1052                error!(%peer, error = %e, "failed to retrieve information");
1053                vec![]
1054            }
1055        }
1056    }
1057
1058    // Network =========
1059
1060    /// Get measured network health
1061    pub async fn network_health(&self) -> Health {
1062        self.transport_api.network_health().await
1063    }
1064
1065    /// List all peers connected to this
1066    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1067        Ok(self.transport_api.network_connected_peers().await?)
1068    }
1069
1070    /// Get all data collected from the network relevant for a PeerId
1071    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
1072        Ok(self.transport_api.network_peer_info(peer).await?)
1073    }
1074
1075    /// Get peers connected peers with quality higher than some value
1076    pub async fn all_network_peers(
1077        &self,
1078        minimum_quality: f64,
1079    ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
1080        Ok(
1081            futures::stream::iter(self.transport_api.network_connected_peers().await?)
1082                .filter_map(|peer| async move {
1083                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1084                        if info.get_average_quality() >= minimum_quality {
1085                            Some((peer, info))
1086                        } else {
1087                            None
1088                        }
1089                    } else {
1090                        None
1091                    }
1092                })
1093                .filter_map(|(peer_id, info)| async move {
1094                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1095                    Some((address, peer_id, info))
1096                })
1097                .collect::<Vec<_>>()
1098                .await,
1099        )
1100    }
1101
1102    // Ticket ========
1103    /// Get all tickets in a channel specified by [`prelude::Hash`]
1104    pub async fn tickets_in_channel(&self, channel: &prelude::Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1105        Ok(self.transport_api.tickets_in_channel(channel).await?)
1106    }
1107
1108    /// Get all tickets
1109    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1110        Ok(self.transport_api.all_tickets().await?)
1111    }
1112
1113    /// Get statistics for all tickets
1114    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1115        Ok(self.transport_api.ticket_statistics().await?)
1116    }
1117
1118    /// Reset the ticket metrics to zero
1119    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1120        Ok(self.node_db.reset_ticket_statistics().await?)
1121    }
1122
1123    // DB ============
1124    pub fn peer_resolver(&self) -> &impl ChainKeyOperations {
1125        &self.hopr_chain_api
1126    }
1127
1128    // Chain =========
1129    pub fn me_onchain(&self) -> Address {
1130        self.hopr_chain_api.me_onchain()
1131    }
1132
1133    /// Get ticket price
1134    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1135        Ok(self.hopr_chain_api.minimum_ticket_price().await?)
1136    }
1137
1138    /// Get minimum incoming ticket winning probability
1139    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1140        Ok(self.hopr_chain_api.minimum_incoming_ticket_win_prob().await?)
1141    }
1142
1143    /// List of all accounts announced on the chain
1144    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1145        Ok(self
1146            .hopr_chain_api
1147            .stream_accounts(AccountSelector {
1148                public_only: true,
1149                ..Default::default()
1150            })
1151            .await?
1152            .collect()
1153            .await)
1154    }
1155
1156    /// Get the channel entry from Hash.
1157    /// @returns the channel entry of those two nodes
1158    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1159        Ok(self.hopr_chain_api.channel_by_id(channel_id).await?)
1160    }
1161
1162    /// Get the channel entry between source and destination node.
1163    /// @param src Address
1164    /// @param dest Address
1165    /// @returns the channel entry of those two nodes
1166    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1167        Ok(self.hopr_chain_api.channel_by_parties(src, dest).await?)
1168    }
1169
1170    /// List all channels open from a specified Address
1171    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1172        Ok(self
1173            .hopr_chain_api
1174            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1175                ChannelStatusDiscriminants::Closed,
1176                ChannelStatusDiscriminants::Open,
1177                ChannelStatusDiscriminants::PendingToClose,
1178            ]))
1179            .await?
1180            .collect()
1181            .await)
1182    }
1183
1184    /// List all channels open to a specified address
1185    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1186        Ok(self
1187            .hopr_chain_api
1188            .stream_channels(
1189                ChannelSelector::default()
1190                    .with_destination(*dest)
1191                    .with_allowed_states(&[
1192                        ChannelStatusDiscriminants::Closed,
1193                        ChannelStatusDiscriminants::Open,
1194                        ChannelStatusDiscriminants::PendingToClose,
1195                    ]),
1196            )
1197            .await?
1198            .collect()
1199            .await)
1200    }
1201
1202    /// List all channels
1203    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1204        Ok(self
1205            .hopr_chain_api
1206            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1207                ChannelStatusDiscriminants::Closed,
1208                ChannelStatusDiscriminants::Open,
1209                ChannelStatusDiscriminants::PendingToClose,
1210            ]))
1211            .await?
1212            .collect()
1213            .await)
1214    }
1215
1216    /// List all corrupted channels
1217    pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
1218        Ok(self.hopr_chain_api.corrupted_channels().await?)
1219    }
1220
1221    /// Current safe allowance balance
1222    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1223        Ok(self.hopr_chain_api.safe_allowance().await?)
1224    }
1225
1226    /// Withdraw on-chain assets to a given address
1227    /// @param recipient the account where the assets should be transferred to
1228    /// @param amount how many tokens to be transferred
1229    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1230        self.error_if_not_in_state(
1231            state::HoprState::Running,
1232            "Node is not ready for on-chain operations".into(),
1233        )?;
1234
1235        let awaiter = self.hopr_chain_api.withdraw(amount, &recipient).await?;
1236
1237        Ok(awaiter.await?)
1238    }
1239
1240    /// Withdraw on-chain native assets to a given address
1241    /// @param recipient the account where the assets should be transferred to
1242    /// @param amount how many tokens to be transferred
1243    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1244        self.error_if_not_in_state(
1245            state::HoprState::Running,
1246            "Node is not ready for on-chain operations".into(),
1247        )?;
1248
1249        let awaiter = self.hopr_chain_api.withdraw(amount, &recipient).await?;
1250
1251        Ok(awaiter.await?)
1252    }
1253
1254    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1255        self.error_if_not_in_state(
1256            state::HoprState::Running,
1257            "Node is not ready for on-chain operations".into(),
1258        )?;
1259
1260        let (channel_id, tx_hash) = self.hopr_chain_api.open_channel(destination, amount).await?.await?;
1261
1262        Ok(OpenChannelResult { tx_hash, channel_id })
1263    }
1264
1265    pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1266        self.error_if_not_in_state(
1267            state::HoprState::Running,
1268            "Node is not ready for on-chain operations".into(),
1269        )?;
1270
1271        let awaiter = self.hopr_chain_api.fund_channel(channel_id, amount).await?;
1272
1273        Ok(awaiter.await?)
1274    }
1275
1276    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1277        self.error_if_not_in_state(
1278            state::HoprState::Running,
1279            "Node is not ready for on-chain operations".into(),
1280        )?;
1281
1282        let (status, tx_hash) = self.hopr_chain_api.close_channel(channel_id).await?.await?;
1283
1284        Ok(CloseChannelResult { tx_hash, status })
1285    }
1286
1287    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1288        Ok(self.hopr_chain_api.channel_closure_notice_period().await?)
1289    }
1290
1291    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(
1292        &self,
1293        min_value: B,
1294        only_aggregated: bool,
1295    ) -> errors::Result<()> {
1296        self.error_if_not_in_state(
1297            state::HoprState::Running,
1298            "Node is not ready for on-chain operations".into(),
1299        )?;
1300
1301        let min_value = min_value.into();
1302        let chain_api = self.hopr_chain_api.clone();
1303
1304        // Does not need to be done concurrently, because we do not await each channel's redemption
1305        self.hopr_chain_api
1306            .stream_channels(
1307                ChannelSelector::default()
1308                    .with_destination(chain_api.me_onchain())
1309                    .with_allowed_states(&[
1310                        ChannelStatusDiscriminants::Open,
1311                        ChannelStatusDiscriminants::PendingToClose,
1312                    ]),
1313            )
1314            .await?
1315            .for_each(|channel| {
1316                let chain_api = chain_api.clone();
1317                async move {
1318                    match chain_api
1319                        .redeem_tickets_via_selector(
1320                            TicketSelector::from(&channel)
1321                                .with_amount(min_value..)
1322                                .with_aggregated_only(only_aggregated)
1323                                .with_index_range(channel.ticket_index.as_u64()..)
1324                                .with_state(AcknowledgedTicketStatus::Untouched),
1325                        )
1326                        .await
1327                    {
1328                        Ok(awaiters) => info!(count = awaiters.len(), %channel, "redeemed tickets in channel"),
1329                        Err(error) => error!(%error, %channel, "failed to redeem tickets"),
1330                    }
1331                }
1332            })
1333            .await;
1334
1335        Ok(())
1336    }
1337
1338    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1339        &self,
1340        counterparty: &Address,
1341        min_value: B,
1342        only_aggregated: bool,
1343    ) -> errors::Result<usize> {
1344        self.redeem_tickets_in_channel(
1345            &generate_channel_id(counterparty, &self.me_onchain()),
1346            min_value,
1347            only_aggregated,
1348        )
1349        .await
1350    }
1351
1352    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1353        &self,
1354        channel_id: &Hash,
1355        min_value: B,
1356        only_aggregated: bool,
1357    ) -> errors::Result<usize> {
1358        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1359
1360        let channel = self
1361            .hopr_chain_api
1362            .channel_by_id(channel_id)
1363            .await?
1364            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1365
1366        let out = self
1367            .hopr_chain_api
1368            .redeem_tickets_via_selector(
1369                TicketSelector::from(channel)
1370                    .with_amount(min_value.into()..)
1371                    .with_aggregated_only(only_aggregated)
1372                    .with_index_range(channel.ticket_index.as_u64()..)
1373                    .with_state(AcknowledgedTicketStatus::Untouched),
1374            )
1375            .await?;
1376
1377        Ok(out.len())
1378    }
1379
1380    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1381        self.error_if_not_in_state(
1382            state::HoprState::Running,
1383            "Node is not ready for on-chain operations".into(),
1384        )?;
1385
1386        // We do not await the on-chain confirmation
1387        #[allow(clippy::let_underscore_future)]
1388        let _ = self.hopr_chain_api.redeem_ticket(ack_ticket).await?;
1389
1390        Ok(())
1391    }
1392
1393    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1394        let peer_id = *peer_id;
1395        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1396        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1397            .await
1398            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1399
1400        Ok(self.hopr_chain_api.packet_key_to_chain_key(&pubkey).await?)
1401    }
1402
1403    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1404        Ok(self
1405            .hopr_chain_api
1406            .chain_key_to_packet_key(address)
1407            .await
1408            .map(|pk| pk.map(|v| v.into()))?)
1409    }
1410
1411    pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1412        self.channel_graph.read_arc().await.as_dot(cfg)
1413    }
1414
1415    pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1416        let cg = self.channel_graph.read_arc().await;
1417        serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1418    }
1419
1420    pub async fn get_indexer_state(&self) -> errors::Result<hopr_chain_api::IndexerStateInfo> {
1421        Ok(self.hopr_chain_api.get_indexer_state().await?)
1422    }
1423
1424    // === telemetry
1425    /// Prometheus formatted metrics collected by the hopr-lib components.
1426    pub fn collect_hopr_metrics() -> errors::Result<String> {
1427        cfg_if::cfg_if! {
1428            if #[cfg(all(feature = "prometheus", not(test)))] {
1429                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::GeneralError(e.to_string()))
1430            } else {
1431        Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1432
1433            }
1434        }
1435    }
1436}