hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities, but offers
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API or key management.
9//!
//! The intended way to use hopr_lib is for a specific tool to be built on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25
26/// Utility module with helper types and functionality over hopr-lib behavior.
27pub mod utils;
28
29/// Public traits for interactions with this library.
30pub mod traits;
31
32/// Functionality related to the HOPR node state.
33pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38/// Re-exports of libraries necessary for API and interface operations.
39#[doc(hidden)]
40pub mod exports {
41    pub mod chain {
42        pub use hopr_chain_types as types;
43    }
44
45    pub mod types {
46        pub use hopr_primitive_types as primitive;
47    }
48    pub mod crypto {
49        pub use hopr_crypto_keypair as keypair;
50        pub use hopr_crypto_types as types;
51    }
52
53    pub mod network {
54        pub use hopr_network_types as types;
55    }
56
57    pub use hopr_transport as transport;
58}
59
60/// Export of relevant types for easier integration.
61#[doc(hidden)]
62pub mod prelude {
63    pub use super::exports::{
64        crypto::{
65            keypair::key_pair::HoprKeys,
66            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67        },
68        network::types::{
69            prelude::ForeignDataMode,
70            udp::{ConnectedUdpStream, UdpStreamParallelism},
71        },
72        transport::{OffchainPublicKey, socket::HoprSocket},
73        types::primitive::prelude::Address,
74    };
75}
76
77use std::{
78    collections::HashMap,
79    ops::Deref,
80    path::PathBuf,
81    str::FromStr,
82    sync::{Arc, atomic::Ordering},
83    time::Duration,
84};
85
86use async_lock::RwLock;
87pub use async_trait::async_trait;
88use errors::{HoprLibError, HoprStatusError};
89use futures::{FutureExt, StreamExt, channel::mpsc::channel, future::AbortHandle};
90use hopr_api::{
91    chain::{
92        AccountSelector, AnnouncementError, ChainEvents, ChainKeyOperations, ChainReadAccountOperations,
93        ChainReadChannelOperations, ChainValues, ChainWriteAccountOperations, ChainWriteChannelOperations,
94        ChainWriteTicketOperations, ChannelSelector,
95    },
96    db::{HoprDbPeersOperations, HoprDbTicketOperations, PeerStatus, TicketSelector},
97};
98use hopr_async_runtime::prelude::spawn;
99pub use hopr_chain_api::config::{
100    Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
101};
102use hopr_chain_api::{HoprChain, HoprChainProcess, config::ChainNetworkConfig, errors::HoprChainError, wait_for_funds};
103use hopr_chain_types::ContractAddresses;
104pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
105use hopr_crypto_types::prelude::Hash;
106use hopr_db_node::{HoprNodeDb, HoprNodeDbConfig};
107pub use hopr_internal_types::prelude::*;
108pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
109pub use hopr_path::channel_graph::GraphExportConfig;
110use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
111#[cfg(all(feature = "prometheus", not(test)))]
112use hopr_platform::time::native::current_time;
113pub use hopr_primitive_types::prelude::*;
114pub use hopr_strategy::Strategy;
115use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
116#[cfg(feature = "runtime-tokio")]
117pub use hopr_transport::transfer_session;
118pub use hopr_transport::{
119    ApplicationData, ApplicationDataIn, ApplicationDataOut, HalfKeyChallenge, Health, HoprSession, IncomingSession,
120    Keypair, Multiaddr, OffchainKeypair as HoprOffchainKeypair, PeerId, PingQueryReplier, ProbeError, SESSION_MTU,
121    SURB_SIZE, ServiceId, SessionCapabilities, SessionCapability, SessionClientConfig, SessionId as HoprSessionId,
122    SessionManagerError, SessionTarget, SurbBalancerConfig, Tag, TicketStatistics, TransportSessionError,
123    config::{HostConfig, HostType, looks_like_domain},
124    errors::{HoprTransportError, NetworkingError, ProtocolError},
125};
126use hopr_transport::{
127    ChainKeypair, HoprTransport, HoprTransportConfig, OffchainKeypair, PeerDiscovery, execute_on_tick,
128};
129use tracing::{debug, error, info, trace, warn};
130
131use crate::{
132    config::SafeModule,
133    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
134    state::HoprState,
135    traits::chain::{CloseChannelResult, OpenChannelResult},
136};
137
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge `hopr_start_time`: unix timestamp (seconds) of the process start.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started",
    )
    .unwrap();

    /// Gauge `hopr_lib_version`, labelled by the `version` string.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"],
    )
    .unwrap();

    /// Gauge `hopr_node_addresses`, labelled by the node's peer id, on-chain
    /// address, safe address and module address.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"],
    )
    .unwrap();
}
155
156/// HOPR main object providing the entire HOPR node functionality
157///
158/// Instantiating this object creates all processes and objects necessary for
159/// running the HOPR node. Once created, the node can be started using the
160/// `run()` method.
161///
162/// Externally offered API should be sufficient to perform all necessary tasks
163/// with the HOPR node manually, but it is advised to create such a configuration
164/// that manual interaction is unnecessary.
165///
166/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
167pub struct Hopr {
168    me: OffchainKeypair,
169    cfg: config::HoprLibConfig,
170    state: Arc<state::AtomicHoprState>,
171    transport_api: HoprTransport<HoprNodeDb, HoprChain>,
172    hopr_chain_api: HoprChain,
173    node_db: HoprNodeDb,
174    // objects that could be removed pending architectural cleanup ========
175    chain_cfg: ChainNetworkConfig,
176    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
177    multistrategy: Arc<MultiStrategy>,
178}
179
180impl Hopr {
181    pub fn new(
182        mut cfg: config::HoprLibConfig,
183        me: &OffchainKeypair,
184        me_onchain: &ChainKeypair,
185    ) -> crate::errors::Result<Self> {
186        if hopr_crypto_random::is_rng_fixed() {
187            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
188        }
189
190        let multiaddress: Multiaddr = (&cfg.host).try_into()?;
191
192        let db_path: PathBuf = [&cfg.db.data, "node_db"].iter().collect();
193        info!(path = ?db_path, "Initiating DB");
194
195        if cfg.db.force_initialize {
196            info!("Force cleaning up existing database");
197            hopr_platform::file::native::remove_dir_all(db_path.as_path()).map_err(|e| {
198                HoprLibError::GeneralError(format!(
199                    "Failed to remove the existing DB directory at '{db_path:?}': {e}"
200                ))
201            })?;
202            cfg.db.initialize = true
203        }
204
205        // create DB dir if it does not exist
206        if let Some(parent_dir_path) = db_path.as_path().parent() {
207            if !parent_dir_path.is_dir() {
208                std::fs::create_dir_all(parent_dir_path).map_err(|e| {
209                    HoprLibError::GeneralError(format!(
210                        "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
211                    ))
212                })?
213            }
214        }
215
216        let db_cfg = HoprNodeDbConfig {
217            create_if_missing: cfg.db.initialize,
218            force_create: cfg.db.force_initialize,
219            log_slow_queries: std::time::Duration::from_millis(150),
220            surb_ring_buffer_size: std::env::var("HOPR_PROTOCOL_SURB_RB_SIZE")
221                .ok()
222                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
223                .unwrap_or_else(|| HoprNodeDbConfig::default().surb_ring_buffer_size),
224            surb_distress_threshold: std::env::var("HOPR_PROTOCOL_SURB_RB_DISTRESS")
225                .ok()
226                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
227                .unwrap_or_else(|| HoprNodeDbConfig::default().surb_distress_threshold),
228        };
229        let node_db = futures::executor::block_on(HoprNodeDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
230
231        if let Some(provider) = &cfg.chain.provider {
232            info!(provider, "Creating chain components using the custom provider");
233        } else {
234            info!("Creating chain components using the default provider");
235        }
236        let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
237            &cfg.chain.network,
238            crate::constants::APP_VERSION_COERCED,
239            cfg.chain.provider.as_deref(),
240            cfg.chain.max_rpc_requests_per_sec,
241            &mut cfg.chain.protocols,
242        )
243        .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
244
245        let contract_addresses = ContractAddresses::from(&resolved_environment);
246        info!(
247            myself = me_onchain.public().to_hex(),
248            contract_addresses = ?contract_addresses,
249            "Resolved contract addresses",
250        );
251
252        let my_multiaddresses = vec![multiaddress];
253
254        let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
255            me_onchain.public().to_address(),
256            ChannelGraphConfig::default(),
257        )));
258
259        // TODO (4.0): replace this with new implementation that follows the chain traits from the hopr-api crate
260        let hopr_chain_api = hopr_chain_api::HoprChain::new(
261            me_onchain.clone(),
262            resolved_environment.clone(),
263            node_db.clone(),
264            &cfg.db.data,
265            cfg.safe_module.module_address,
266            ContractAddresses {
267                announcements: resolved_environment.announcements,
268                channels: resolved_environment.channels,
269                token: resolved_environment.token,
270                ticket_price_oracle: resolved_environment.ticket_price_oracle,
271                winning_probability_oracle: resolved_environment.winning_probability_oracle,
272                network_registry: resolved_environment.network_registry,
273                network_registry_proxy: resolved_environment.network_registry_proxy,
274                node_stake_v2_factory: resolved_environment.node_stake_v2_factory,
275                node_safe_registry: resolved_environment.node_safe_registry,
276                module_implementation: resolved_environment.module_implementation,
277            },
278            cfg.safe_module.safe_address,
279            hopr_chain_api::IndexerConfig {
280                start_block_number: resolved_environment.channel_contract_deploy_block as u64,
281                fast_sync: cfg.chain.fast_sync,
282                enable_logs_snapshot: cfg.chain.enable_logs_snapshot,
283                logs_snapshot_url: cfg.chain.logs_snapshot_url.clone(),
284                data_directory: cfg.db.data.clone(),
285            },
286        )?;
287
288        let hopr_transport_api = HoprTransport::new(
289            me,
290            me_onchain,
291            HoprTransportConfig {
292                transport: cfg.transport.clone(),
293                network: cfg.network_options.clone(),
294                protocol: cfg.protocol,
295                probe: cfg.probe,
296                session: cfg.session,
297            },
298            node_db.clone(),
299            hopr_chain_api.clone(),
300            channel_graph.clone(),
301            my_multiaddresses,
302        );
303
304        let multi_strategy = Arc::new(MultiStrategy::new(cfg.strategy.clone(), hopr_chain_api.clone()));
305        debug!(
306            strategies = tracing::field::debug(&multi_strategy),
307            "Initialized strategies"
308        );
309
310        #[cfg(all(feature = "prometheus", not(test)))]
311        {
312            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
313            METRIC_HOPR_LIB_VERSION.set(
314                &[const_format::formatcp!("{}", constants::APP_VERSION)],
315                f64::from_str(const_format::formatcp!(
316                    "{}.{}",
317                    env!("CARGO_PKG_VERSION_MAJOR"),
318                    env!("CARGO_PKG_VERSION_MINOR")
319                ))
320                .unwrap_or(0.0),
321            );
322
323            // Calling get_ticket_statistics will initialize the respective metrics on tickets
324            if let Err(e) = futures::executor::block_on(node_db.get_ticket_statistics(None)) {
325                error!(error = %e, "Failed to initialize ticket statistics metrics");
326            }
327        }
328
329        Ok(Self {
330            me: me.clone(),
331            cfg,
332            state: Arc::new(state::AtomicHoprState::new(state::HoprState::Uninitialized)),
333            transport_api: hopr_transport_api,
334            hopr_chain_api,
335            node_db,
336            chain_cfg: resolved_environment,
337            channel_graph,
338            multistrategy: multi_strategy,
339        })
340    }
341
342    fn error_if_not_in_state(&self, state: state::HoprState, error: String) -> errors::Result<()> {
343        if self.status() == state {
344            Ok(())
345        } else {
346            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
347        }
348    }
349
350    pub fn status(&self) -> state::HoprState {
351        self.state.load(Ordering::Relaxed)
352    }
353
354    pub fn network(&self) -> String {
355        self.cfg.chain.network.clone()
356    }
357
358    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
359        Ok(self.hopr_chain_api.node_balance().await?)
360    }
361
362    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
363        Ok(self.hopr_chain_api.safe_balance().await?)
364    }
365
366    pub fn get_safe_config(&self) -> SafeModule {
367        self.cfg.safe_module.clone()
368    }
369
370    pub fn chain_config(&self) -> ChainNetworkConfig {
371        self.chain_cfg.clone()
372    }
373
374    pub fn config(&self) -> &config::HoprLibConfig {
375        &self.cfg
376    }
377
378    pub fn get_provider(&self) -> String {
379        self.cfg
380            .chain
381            .provider
382            .clone()
383            .unwrap_or(self.chain_cfg.chain.default_provider.clone())
384    }
385
386    #[inline]
387    fn is_public(&self) -> bool {
388        self.cfg.chain.announce
389    }
390
391    pub async fn run<
392        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
393    >(
394        &self,
395        #[cfg(feature = "session-server")] serve_handler: T,
396    ) -> errors::Result<(
397        hopr_transport::socket::HoprSocket<
398            futures::channel::mpsc::Receiver<ApplicationDataIn>,
399            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
400        >,
401        HashMap<state::HoprLibProcesses, AbortHandle>,
402    )> {
403        self.error_if_not_in_state(
404            state::HoprState::Uninitialized,
405            "Cannot start the hopr node multiple times".into(),
406        )?;
407
408        info!(
409            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
410            "Node is not started, please fund this node",
411        );
412
413        wait_for_funds(
414            *MIN_NATIVE_BALANCE,
415            *SUGGESTED_NATIVE_BALANCE,
416            Duration::from_secs(200),
417            &self.hopr_chain_api,
418        )
419        .await?;
420
421        let mut processes: HashMap<state::HoprLibProcesses, AbortHandle> = HashMap::new();
422
423        info!("Starting the node...");
424
425        self.state.store(state::HoprState::Initializing, Ordering::Relaxed);
426
427        let balance: XDaiBalance = self.get_balance().await?;
428        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
429
430        info!(
431            address = %self.hopr_chain_api.me_onchain(),
432            %balance,
433            %minimum_balance,
434            "Node information"
435        );
436
437        if balance.le(&minimum_balance) {
438            return Err(HoprLibError::GeneralError(
439                "Cannot start the node without a sufficiently funded wallet".to_string(),
440            ));
441        }
442
443        // Once we are able to query the chain,
444        // check if the ticket price is configured correctly.
445        let network_min_ticket_price = self.hopr_chain_api.minimum_ticket_price().await?;
446        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
447        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
448            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
449                "configured outgoing ticket price is lower than the network minimum ticket price: \
450                 {configured_ticket_price:?} < {network_min_ticket_price}"
451            ))));
452        }
453        // Once we are able to query the chain,
454        // check if the winning probability is configured correctly.
455        let network_min_win_prob = self.hopr_chain_api.minimum_incoming_ticket_win_prob().await?;
456        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
457        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
458            && configured_win_prob
459                .and_then(|c| WinningProbability::try_from(c).ok())
460                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
461        {
462            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
463                "configured outgoing ticket winning probability is lower than the network minimum winning \
464                 probability: {configured_win_prob:?} < {network_min_win_prob}"
465            ))));
466        }
467
468        self.state.store(state::HoprState::Indexing, Ordering::Relaxed);
469
470        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
471        // plus 100 as an additional buffer
472        let minimum_capacity = self
473            .hopr_chain_api
474            .count_accounts(AccountSelector {
475                public_only: true,
476                ..Default::default()
477            })
478            .await?
479            .saturating_mul(2)
480            .saturating_add(100);
481
482        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
483            .ok()
484            .and_then(|s| s.trim().parse::<usize>().ok())
485            .filter(|&c| c > 0)
486            .unwrap_or(2048)
487            .max(minimum_capacity);
488
489        debug!(
490            capacity = chain_discovery_events_capacity,
491            minimum_required = minimum_capacity,
492            "Creating chain discovery events channel"
493        );
494        let (indexer_peer_update_tx, indexer_peer_update_rx) =
495            futures::channel::mpsc::channel::<PeerDiscovery>(chain_discovery_events_capacity);
496
497        let indexer_event_pipeline = helpers::chain_events_to_transport_events(
498            self.hopr_chain_api.subscribe()?,
499            self.me_onchain(),
500            self.multistrategy.clone(),
501            self.channel_graph.clone(),
502            self.node_db.clone(),
503        );
504
505        spawn(async move {
506            let result = indexer_event_pipeline
507                .map(Ok)
508                .forward(indexer_peer_update_tx)
509                .inspect(|result| {
510                    tracing::warn!(
511                        ?result,
512                        task = "indexer -> transport",
513                        "long-running background task finished"
514                    )
515                })
516                .await;
517
518            result.expect("The index to transport event chain failed")
519        });
520
521        info!("Start the chain process and sync the indexer");
522        for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
523            let nid = match id {
524                HoprChainProcess::Indexer => state::HoprLibProcesses::Indexing,
525                HoprChainProcess::OutgoingOnchainActionQueue => state::HoprLibProcesses::OutgoingOnchainActionQueue,
526            };
527            processes.insert(nid, proc);
528        }
529
530        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
531
532        // Check Safe-module status:
533        // 1) if the node is already included into the module
534        // 2) if the module is enabled in the safe
535        // 3) if the safe is the owner of the module
536        // if any of the conditions is not met, return error
537        if !self.hopr_chain_api.check_node_safe_module_status().await? {
538            return Err(HoprLibError::ChainApi(HoprChainError::Api(
539                "Safe and module are not configured correctly".into(),
540            )));
541        }
542
543        // Possibly register a node-safe pair to NodeSafeRegistry.
544        // Following that, the connector is set to use safe tx variants.
545
546        if self
547            .hopr_chain_api
548            .can_register_with_safe(&self.cfg.safe_module.safe_address)
549            .await?
550        {
551            info!("Registering safe by node");
552
553            if self.me_onchain() == self.cfg.safe_module.safe_address {
554                return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
555            }
556
557            if let Err(error) = self
558                .hopr_chain_api
559                .register_safe(&self.cfg.safe_module.safe_address)
560                .await?
561                .await
562            {
563                // Intentionally ignoring the errored state
564                error!(%error, "Failed to register node with safe")
565            }
566        }
567
568        if self.is_public() {
569            // At this point the node is already registered with Safe, so
570            // we can announce via Safe-compliant TX
571
572            let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
573
574            // The announcement is intentionally not awaited until confirmation
575            match self
576                .hopr_chain_api
577                .announce(&multiaddresses_to_announce, &self.me)
578                .await
579            {
580                Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
581                Err(AnnouncementError::AlreadyAnnounced) => {
582                    info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
583                }
584                // If the announcement fails, we keep going to prevent the node from retrying
585                // after restart.
586                // Functionality is limited, and users must check the logs for errors.
587                Err(error) => error!(%error, "Failed to transmit node announcement"),
588            }
589        }
590
591        {
592            // Sync key ids from indexed Accounts
593
594            // Sync the Channel graph
595            let channel_graph = self.channel_graph.clone();
596            let mut cg = channel_graph.write_arc().await;
597
598            info!("Syncing channels from the previous runs");
599            let mut channel_stream = self
600                .hopr_chain_api
601                .stream_channels(
602                    ChannelSelector::default()
603                        .with_allowed_states(&[
604                            ChannelStatusDiscriminants::Open,
605                            ChannelStatusDiscriminants::PendingToClose,
606                        ])
607                        .with_closure_time_range(Utc::now()..),
608                )
609                .await?;
610            while let Some(channel) = channel_stream.next().await {
611                cg.update_channel(channel);
612            }
613
614            // Initialize node latencies and scores in the channel graph:
615            // Sync only those nodes that we know that had a good quality
616            // Other nodes will be repopulated into the channel graph during heartbeat
617            // rounds.
618            info!("Syncing peer qualities from the previous runs");
619            let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
620                .map_err(|e| e.to_string())
621                .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
622                .unwrap_or_else(|error| {
623                    warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
624                    constants::DEFAULT_MIN_QUALITY_TO_SYNC
625                });
626
627            let mut peer_stream = self
628                .node_db
629                .get_network_peers(Default::default(), false)
630                .await?
631                .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
632
633            while let Some(peer) = peer_stream.next().await {
634                if let Some(key) = self.hopr_chain_api.packet_key_to_chain_key(&peer.id.0).await? {
635                    // For nodes that had a good quality, we assign a perfect score
636                    cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
637                } else {
638                    error!(peer = %peer.id.1, "Could not translate peer information");
639                }
640            }
641
642            info!(
643                channels = cg.count_channels(),
644                nodes = cg.count_nodes(),
645                "Channel graph sync complete"
646            );
647        }
648
649        // notifier on acknowledged ticket reception
650        let multi_strategy_ack_ticket = self.multistrategy.clone();
651
652        let ack_ticket_channel_capacity = std::env::var("HOPR_INTERNAL_ACKED_TICKET_CHANNEL_CAPACITY")
653            .ok()
654            .and_then(|s| s.trim().parse::<usize>().ok())
655            .filter(|&c| c > 0)
656            .unwrap_or(2048);
657
658        debug!(
659            capacity = ack_ticket_channel_capacity,
660            "Creating acknowledged ticket channel"
661        );
662        let (on_ack_tkt_tx, mut on_ack_tkt_rx) = channel::<AcknowledgedTicket>(ack_ticket_channel_capacity);
663        self.node_db.start_ticket_processing(Some(on_ack_tkt_tx))?;
664
665        processes.insert(
666            state::HoprLibProcesses::OnReceivedAcknowledgement,
667            hopr_async_runtime::spawn_as_abortable!(async move {
668                while let Some(ack) = on_ack_tkt_rx.next().await {
669                    if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
670                        &*multi_strategy_ack_ticket,
671                        &ack,
672                    )
673                    .await
674                    {
675                        error!(%error, "Failed to process acknowledged winning ticket with the strategy");
676                    }
677                }
678
679                tracing::warn!(
680                    task = %state::HoprLibProcesses::OnReceivedAcknowledgement,
681                    "long-running background task finished"
682                )
683            }),
684        );
685
686        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
687            .ok()
688            .and_then(|s| s.trim().parse::<usize>().ok())
689            .filter(|&c| c > 0)
690            .unwrap_or(256);
691
692        debug!(
693            capacity = incoming_session_channel_capacity,
694            "Creating incoming session channel"
695        );
696        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
697
698        #[cfg(feature = "session-server")]
699        {
700            processes.insert(
701                state::HoprLibProcesses::SessionServer,
702                hopr_async_runtime::spawn_as_abortable!(
703                    _session_rx
704                        .for_each_concurrent(None, move |session| {
705                            let serve_handler = serve_handler.clone();
706                            async move {
707                                let session_id = *session.session.id();
708                                match serve_handler.process(session).await {
709                                    Ok(_) => debug!(
710                                        session_id = ?session_id,
711                                        "Client session processed successfully"
712                                    ),
713                                    Err(e) => error!(
714                                        session_id = ?session_id,
715                                        error = %e,
716                                        "Client session processing failed"
717                                    ),
718                                }
719                            }
720                        })
721                        .inspect(|_| tracing::warn!(
722                            task = %state::HoprLibProcesses::SessionServer,
723                            "long-running background task finished"
724                        ))
725                ),
726            );
727        }
728
729        info!("Starting transport");
730        let (hopr_socket, transport_processes) = self.transport_api.run(indexer_peer_update_rx, session_tx).await?;
731        for (id, proc) in transport_processes.into_iter() {
732            processes.insert(id.into(), proc);
733        }
734
735        let db_clone = self.node_db.clone();
736        processes.insert(
737            state::HoprLibProcesses::TicketIndexFlush,
738            hopr_async_runtime::spawn_as_abortable!(
739                Box::pin(execute_on_tick(
740                    Duration::from_secs(5),
741                    move || {
742                        let db_clone = db_clone.clone();
743                        async move {
744                            match db_clone.persist_outgoing_ticket_indices().await {
745                                Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
746                                Err(e) => error!(error = %e, "Failed to flush ticket indices"),
747                            }
748                        }
749                    },
750                    "flush the states of outgoing ticket indices".into(),
751                ))
752                .inspect(|_| tracing::warn!(
753                    task = %state::HoprLibProcesses::TicketIndexFlush,
754                    "long-running background task finished"
755                ))
756            ),
757        );
758
759        // NOTE: after the chain is synced, we can reset tickets which are considered
760        // redeemed but on-chain state does not align with that. This implies there was a problem
761        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
762        // handle errors appropriately.
763        let mut channels = self
764            .hopr_chain_api
765            .stream_channels(ChannelSelector {
766                destination: self.me_onchain().into(),
767                ..Default::default()
768            })
769            .await?;
770
771        while let Some(channel) = channels.next().await {
772            self.node_db
773                .update_ticket_states_and_fetch(
774                    TicketSelector::from(&channel)
775                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
776                        .with_index_range(channel.ticket_index.as_u64()..),
777                    AcknowledgedTicketStatus::Untouched,
778                )
779                .await?
780                .for_each(|ticket| {
781                    info!(%ticket, "fixed next out-of-sync ticket");
782                    futures::future::ready(())
783                })
784                .await;
785        }
786
787        // NOTE: strategy ticks must start after the chain is synced, otherwise
788        // the strategy would react to historical data and drain through the native
789        // balance on chain operations not relevant for the present network state
790        let multi_strategy = self.multistrategy.clone();
791        let strategy_interval = self.cfg.strategy.execution_interval;
792        processes.insert(
793            state::HoprLibProcesses::StrategyTick,
794            hopr_async_runtime::spawn_as_abortable!(
795                execute_on_tick(
796                    Duration::from_secs(strategy_interval),
797                    move || {
798                        let multi_strategy = multi_strategy.clone();
799
800                        async move {
801                            trace!(state = "started", "strategy tick");
802                            let _ = multi_strategy.on_tick().await;
803                            trace!(state = "finished", "strategy tick");
804                        }
805                    },
806                    "run strategies".into(),
807                )
808                .inspect(
809                    |_| tracing::warn!(task = %state::HoprLibProcesses::StrategyTick, "long-running background task finished")
810                )
811            ),
812        );
813
814        self.state.store(state::HoprState::Running, Ordering::Relaxed);
815
816        info!(
817            id = %self.me_peer_id(),
818            version = constants::APP_VERSION,
819            "NODE STARTED AND RUNNING"
820        );
821
822        #[cfg(all(feature = "prometheus", not(test)))]
823        METRIC_HOPR_NODE_INFO.set(
824            &[
825                &self.me.public().to_peerid_str(),
826                &self.me_onchain().to_string(),
827                &self.cfg.safe_module.safe_address.to_string(),
828                &self.cfg.safe_module.module_address.to_string(),
829            ],
830            1.0,
831        );
832
833        Ok((hopr_socket, processes))
834    }
835
836    // p2p transport =========
837    /// Own PeerId used in the libp2p transport layer
838    pub fn me_peer_id(&self) -> PeerId {
839        (*self.me.public()).into()
840    }
841
842    /// Get the list of all announced public nodes in the network
843    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
844        Ok(self
845            .hopr_chain_api
846            .stream_accounts(AccountSelector {
847                public_only: true,
848                ..Default::default()
849            })
850            .await?
851            .filter_map(|entry| {
852                futures::future::ready(
853                    entry
854                        .get_multiaddr()
855                        .map(|maddr| (PeerId::from(entry.public_key), entry.chain_addr, vec![maddr])),
856                )
857            })
858            .collect()
859            .await)
860    }
861
862    /// Ping another node in the network based on the PeerId
863    ///
864    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
865    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
866        self.error_if_not_in_state(
867            state::HoprState::Running,
868            "Node is not ready for on-chain operations".into(),
869        )?;
870
871        Ok(self.transport_api.ping(peer).await?)
872    }
873
874    /// Create a client session connection returning a session object that implements
875    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
876    #[cfg(feature = "session-client")]
877    pub async fn connect_to(
878        &self,
879        destination: Address,
880        target: SessionTarget,
881        cfg: SessionClientConfig,
882    ) -> errors::Result<HoprSession> {
883        self.error_if_not_in_state(
884            state::HoprState::Running,
885            "Node is not ready for on-chain operations".into(),
886        )?;
887
888        let backoff = backon::ConstantBuilder::default()
889            .with_max_times(self.cfg.session.establish_max_retries as usize)
890            .with_delay(self.cfg.session.establish_retry_timeout)
891            .with_jitter();
892
893        use backon::Retryable;
894
895        Ok((|| {
896            let cfg = cfg.clone();
897            let target = target.clone();
898            async { self.transport_api.new_session(destination, target, cfg).await }
899        })
900        .retry(backoff)
901        .sleep(backon::FuturesTimerSleeper)
902        .await?)
903    }
904
905    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
906    /// closed due to inactivity.
907    #[cfg(feature = "session-client")]
908    pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
909        self.error_if_not_in_state(
910            state::HoprState::Running,
911            "Node is not ready for on-chain operations".into(),
912        )?;
913        Ok(self.transport_api.probe_session(id).await?)
914    }
915
916    #[cfg(feature = "session-client")]
917    pub async fn get_session_surb_balancer_config(
918        &self,
919        id: &HoprSessionId,
920    ) -> errors::Result<Option<SurbBalancerConfig>> {
921        self.error_if_not_in_state(
922            state::HoprState::Running,
923            "Node is not ready for on-chain operations".into(),
924        )?;
925        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
926    }
927
928    #[cfg(feature = "session-client")]
929    pub async fn update_session_surb_balancer_config(
930        &self,
931        id: &HoprSessionId,
932        cfg: SurbBalancerConfig,
933    ) -> errors::Result<()> {
934        self.error_if_not_in_state(
935            state::HoprState::Running,
936            "Node is not ready for on-chain operations".into(),
937        )?;
938        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
939    }
940
941    /// List all multiaddresses announced by this node
942    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
943        self.transport_api.local_multiaddresses()
944    }
945
946    /// List all multiaddresses on which the node is listening
947    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
948        self.transport_api.listening_multiaddresses().await
949    }
950
951    /// List all multiaddresses observed for a PeerId
952    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
953        self.transport_api.network_observed_multiaddresses(peer).await
954    }
955
956    /// List all multiaddresses announced on-chain for the given node.
957    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
958        let peer = *peer;
959        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
960        let pubkey =
961            match hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer)).await {
962                Ok(k) => k,
963                Err(e) => {
964                    error!(%peer, error = %e, "failed to convert peer id to off-chain key");
965                    return vec![];
966                }
967            };
968
969        match self.hopr_chain_api.find_account_by_packet_key(&pubkey).await {
970            Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
971            Ok(None) => {
972                error!(%peer, "no information");
973                vec![]
974            }
975            Err(e) => {
976                error!(%peer, error = %e, "failed to retrieve information");
977                vec![]
978            }
979        }
980    }
981
982    // Network =========
983
984    /// Get measured network health
985    pub async fn network_health(&self) -> Health {
986        self.transport_api.network_health().await
987    }
988
989    /// List all peers connected to this
990    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
991        Ok(self.transport_api.network_connected_peers().await?)
992    }
993
994    /// Get all data collected from the network relevant for a PeerId
995    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
996        Ok(self.transport_api.network_peer_info(peer).await?)
997    }
998
999    /// Get peers connected peers with quality higher than some value
1000    pub async fn all_network_peers(
1001        &self,
1002        minimum_quality: f64,
1003    ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
1004        Ok(
1005            futures::stream::iter(self.transport_api.network_connected_peers().await?)
1006                .filter_map(|peer| async move {
1007                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1008                        if info.get_average_quality() >= minimum_quality {
1009                            Some((peer, info))
1010                        } else {
1011                            None
1012                        }
1013                    } else {
1014                        None
1015                    }
1016                })
1017                .filter_map(|(peer_id, info)| async move {
1018                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1019                    Some((address, peer_id, info))
1020                })
1021                .collect::<Vec<_>>()
1022                .await,
1023        )
1024    }
1025
1026    // Ticket ========
1027    /// Get all tickets in a channel specified by [`prelude::Hash`]
1028    pub async fn tickets_in_channel(&self, channel: &prelude::Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1029        Ok(self.transport_api.tickets_in_channel(channel).await?)
1030    }
1031
1032    /// Get all tickets
1033    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1034        Ok(self.transport_api.all_tickets().await?)
1035    }
1036
1037    /// Get statistics for all tickets
1038    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1039        Ok(self.transport_api.ticket_statistics().await?)
1040    }
1041
1042    /// Reset the ticket metrics to zero
1043    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1044        Ok(self.node_db.reset_ticket_statistics().await?)
1045    }
1046
1047    // DB ============
1048    pub fn peer_resolver(&self) -> &impl ChainKeyOperations {
1049        &self.hopr_chain_api
1050    }
1051
1052    // Chain =========
1053    pub fn me_onchain(&self) -> Address {
1054        self.hopr_chain_api.me_onchain()
1055    }
1056
1057    /// Get ticket price
1058    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1059        Ok(self.hopr_chain_api.minimum_ticket_price().await?)
1060    }
1061
1062    /// Get minimum incoming ticket winning probability
1063    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1064        Ok(self.hopr_chain_api.minimum_incoming_ticket_win_prob().await?)
1065    }
1066
1067    /// List of all accounts announced on the chain
1068    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1069        Ok(self
1070            .hopr_chain_api
1071            .stream_accounts(AccountSelector {
1072                public_only: true,
1073                ..Default::default()
1074            })
1075            .await?
1076            .collect()
1077            .await)
1078    }
1079
1080    /// Get the channel entry from Hash.
1081    /// @returns the channel entry of those two nodes
1082    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1083        Ok(self.hopr_chain_api.channel_by_id(channel_id).await?)
1084    }
1085
1086    /// Get the channel entry between source and destination node.
1087    /// @param src Address
1088    /// @param dest Address
1089    /// @returns the channel entry of those two nodes
1090    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1091        Ok(self.hopr_chain_api.channel_by_parties(src, dest).await?)
1092    }
1093
1094    /// List all channels open from a specified Address
1095    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1096        Ok(self
1097            .hopr_chain_api
1098            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1099                ChannelStatusDiscriminants::Closed,
1100                ChannelStatusDiscriminants::Open,
1101                ChannelStatusDiscriminants::PendingToClose,
1102            ]))
1103            .await?
1104            .collect()
1105            .await)
1106    }
1107
1108    /// List all channels open to a specified address
1109    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1110        Ok(self
1111            .hopr_chain_api
1112            .stream_channels(
1113                ChannelSelector::default()
1114                    .with_destination(*dest)
1115                    .with_allowed_states(&[
1116                        ChannelStatusDiscriminants::Closed,
1117                        ChannelStatusDiscriminants::Open,
1118                        ChannelStatusDiscriminants::PendingToClose,
1119                    ]),
1120            )
1121            .await?
1122            .collect()
1123            .await)
1124    }
1125
1126    /// List all channels
1127    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1128        Ok(self
1129            .hopr_chain_api
1130            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1131                ChannelStatusDiscriminants::Closed,
1132                ChannelStatusDiscriminants::Open,
1133                ChannelStatusDiscriminants::PendingToClose,
1134            ]))
1135            .await?
1136            .collect()
1137            .await)
1138    }
1139
1140    /// List all corrupted channels
1141    pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
1142        Ok(self.hopr_chain_api.corrupted_channels().await?)
1143    }
1144
1145    /// Current safe allowance balance
1146    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1147        Ok(self.hopr_chain_api.safe_allowance().await?)
1148    }
1149
1150    /// Withdraw on-chain assets to a given address
1151    /// @param recipient the account where the assets should be transferred to
1152    /// @param amount how many tokens to be transferred
1153    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1154        self.error_if_not_in_state(
1155            state::HoprState::Running,
1156            "Node is not ready for on-chain operations".into(),
1157        )?;
1158
1159        let awaiter = self.hopr_chain_api.withdraw(amount, &recipient).await?;
1160
1161        Ok(awaiter.await?)
1162    }
1163
1164    /// Withdraw on-chain native assets to a given address
1165    /// @param recipient the account where the assets should be transferred to
1166    /// @param amount how many tokens to be transferred
1167    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1168        self.error_if_not_in_state(
1169            state::HoprState::Running,
1170            "Node is not ready for on-chain operations".into(),
1171        )?;
1172
1173        let awaiter = self.hopr_chain_api.withdraw(amount, &recipient).await?;
1174
1175        Ok(awaiter.await?)
1176    }
1177
1178    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1179        self.error_if_not_in_state(
1180            state::HoprState::Running,
1181            "Node is not ready for on-chain operations".into(),
1182        )?;
1183
1184        let (channel_id, tx_hash) = self.hopr_chain_api.open_channel(destination, amount).await?.await?;
1185
1186        Ok(OpenChannelResult { tx_hash, channel_id })
1187    }
1188
1189    pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1190        self.error_if_not_in_state(
1191            state::HoprState::Running,
1192            "Node is not ready for on-chain operations".into(),
1193        )?;
1194
1195        let awaiter = self.hopr_chain_api.fund_channel(channel_id, amount).await?;
1196
1197        Ok(awaiter.await?)
1198    }
1199
1200    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1201        self.error_if_not_in_state(
1202            state::HoprState::Running,
1203            "Node is not ready for on-chain operations".into(),
1204        )?;
1205
1206        let (status, tx_hash) = self.hopr_chain_api.close_channel(channel_id).await?.await?;
1207
1208        Ok(CloseChannelResult { tx_hash, status })
1209    }
1210
1211    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1212        Ok(self.hopr_chain_api.channel_closure_notice_period().await?)
1213    }
1214
1215    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(
1216        &self,
1217        min_value: B,
1218        only_aggregated: bool,
1219    ) -> errors::Result<()> {
1220        self.error_if_not_in_state(
1221            state::HoprState::Running,
1222            "Node is not ready for on-chain operations".into(),
1223        )?;
1224
1225        let min_value = min_value.into();
1226        let chain_api = self.hopr_chain_api.clone();
1227
1228        // Does not need to be done concurrently, because we do not await each channel's redemption
1229        self.hopr_chain_api
1230            .stream_channels(
1231                ChannelSelector::default()
1232                    .with_destination(chain_api.me_onchain())
1233                    .with_allowed_states(&[
1234                        ChannelStatusDiscriminants::Open,
1235                        ChannelStatusDiscriminants::PendingToClose,
1236                    ]),
1237            )
1238            .await?
1239            .for_each(|channel| {
1240                let chain_api = chain_api.clone();
1241                async move {
1242                    match chain_api
1243                        .redeem_tickets_via_selector(
1244                            TicketSelector::from(&channel)
1245                                .with_amount(min_value..)
1246                                .with_aggregated_only(only_aggregated)
1247                                .with_index_range(channel.ticket_index.as_u64()..)
1248                                .with_state(AcknowledgedTicketStatus::Untouched),
1249                        )
1250                        .await
1251                    {
1252                        Ok(awaiters) => info!(count = awaiters.len(), %channel, "redeemed tickets in channel"),
1253                        Err(error) => error!(%error, %channel, "failed to redeem tickets"),
1254                    }
1255                }
1256            })
1257            .await;
1258
1259        Ok(())
1260    }
1261
1262    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1263        &self,
1264        counterparty: &Address,
1265        min_value: B,
1266        only_aggregated: bool,
1267    ) -> errors::Result<usize> {
1268        self.redeem_tickets_in_channel(
1269            &generate_channel_id(counterparty, &self.me_onchain()),
1270            min_value,
1271            only_aggregated,
1272        )
1273        .await
1274    }
1275
1276    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1277        &self,
1278        channel_id: &Hash,
1279        min_value: B,
1280        only_aggregated: bool,
1281    ) -> errors::Result<usize> {
1282        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1283
1284        let channel = self
1285            .hopr_chain_api
1286            .channel_by_id(channel_id)
1287            .await?
1288            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1289
1290        let out = self
1291            .hopr_chain_api
1292            .redeem_tickets_via_selector(
1293                TicketSelector::from(channel)
1294                    .with_amount(min_value.into()..)
1295                    .with_aggregated_only(only_aggregated)
1296                    .with_index_range(channel.ticket_index.as_u64()..)
1297                    .with_state(AcknowledgedTicketStatus::Untouched),
1298            )
1299            .await?;
1300
1301        Ok(out.len())
1302    }
1303
1304    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1305        self.error_if_not_in_state(
1306            state::HoprState::Running,
1307            "Node is not ready for on-chain operations".into(),
1308        )?;
1309
1310        // We do not await the on-chain confirmation
1311        #[allow(clippy::let_underscore_future)]
1312        let _ = self.hopr_chain_api.redeem_ticket(ack_ticket).await?;
1313
1314        Ok(())
1315    }
1316
1317    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1318        let peer_id = *peer_id;
1319        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1320        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1321            .await
1322            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1323
1324        Ok(self.hopr_chain_api.packet_key_to_chain_key(&pubkey).await?)
1325    }
1326
1327    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1328        Ok(self
1329            .hopr_chain_api
1330            .chain_key_to_packet_key(address)
1331            .await
1332            .map(|pk| pk.map(|v| v.into()))?)
1333    }
1334
1335    pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1336        self.channel_graph.read_arc().await.as_dot(cfg)
1337    }
1338
1339    pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1340        let cg = self.channel_graph.read_arc().await;
1341        serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1342    }
1343
1344    pub async fn get_indexer_state(&self) -> errors::Result<hopr_chain_api::IndexerStateInfo> {
1345        Ok(self.hopr_chain_api.get_indexer_state().await?)
1346    }
1347
1348    // === telemetry
1349    /// Prometheus formatted metrics collected by the hopr-lib components.
1350    pub fn collect_hopr_metrics() -> errors::Result<String> {
1351        cfg_if::cfg_if! {
1352            if #[cfg(all(feature = "prometheus", not(test)))] {
1353                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::GeneralError(e.to_string()))
1354            } else {
1355        Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1356
1357            }
1358        }
1359    }
1360}