Skip to main content

hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
//! The intended way to use hopr_lib is for a specific tool to be built on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15/// Helper functions.
16mod helpers;
17
18/// Configuration-related public types
19pub mod config;
20/// Various public constants.
21pub mod constants;
22/// Lists all errors thrown from this library.
23pub mod errors;
24/// Public traits for interactions with this library.
25pub mod traits;
26/// Public domain types for peer discovery.
27pub mod types;
28/// Utility module with helper types and functionality over hopr-lib behavior.
29pub mod utils;
30
31pub use hopr_api as api;
32
/// Exports of libraries necessary for API and interface operations.
///
/// The module only contains re-exports grouped by area (`types`, `crypto`, `network`)
/// plus the transport crate; it defines no items of its own.
#[doc(hidden)]
pub mod exports {
    /// Re-exports the `chain`, `internal` and `primitive` type modules of `hopr_api`.
    pub mod types {
        pub use hopr_api::types::{chain, internal, primitive};
    }

    /// Re-exports the crypto types of `hopr_api` (as `types`) and the keypair crate (as `keypair`).
    pub mod crypto {
        pub use hopr_api::types::crypto as types;
        pub use hopr_crypto_keypair as keypair;
    }

    /// Re-exports the `hopr_network_types` crate as `types`.
    pub mod network {
        pub use hopr_network_types as types;
    }

    // The whole transport crate is made available under the `transport` name.
    pub use hopr_transport as transport;
}
51
/// Export of relevant types for easier integration.
///
/// Everything here is re-exported through the [`exports`] module.
#[doc(hidden)]
pub mod prelude {
    // UDP stream related types are only re-exported with the `runtime-tokio` feature enabled.
    #[cfg(feature = "runtime-tokio")]
    pub use super::exports::network::types::{
        prelude::ForeignDataMode,
        udp::{ConnectedUdpStream, UdpStreamParallelism},
    };
    // Key pair, hash, socket and address types that are re-exported unconditionally.
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
69
70use std::{
71    convert::identity,
72    future::Future,
73    sync::{Arc, OnceLock, atomic::Ordering},
74    time::Duration,
75};
76
77use anyhow::anyhow;
78use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, channel::mpsc::channel, pin_mut};
79use futures_time::future::FutureExt as FuturesTimeFutureExt;
80use hopr_api::{
81    chain::*,
82    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
83    node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
84};
85pub use hopr_api::{
86    graph::EdgeLinkObservable,
87    network::{NetworkBuilder, NetworkStreamControl},
88    node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
89    tickets::{ChannelStats, RedemptionResult, TicketManagement, TicketManagementExt},
90    types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
91};
92use hopr_async_runtime::prelude::spawn;
93pub use hopr_async_runtime::{Abortable, AbortableList};
94pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
95pub use hopr_network_types::prelude::*;
96#[cfg(all(feature = "telemetry", not(test)))]
97use hopr_platform::time::native::current_time;
98use hopr_ticket_manager::{HoprTicketManager, RedbStore, RedbTicketQueue};
99#[cfg(feature = "runtime-tokio")]
100pub use hopr_transport::transfer_session;
101pub use hopr_transport::*;
102use tracing::{debug, error, info, warn};
103use validator::Validate;
104
105pub use crate::{
106    config::SafeModule,
107    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
108    errors::{HoprLibError, HoprStatusError},
109    types::{AnnouncedPeer, AnnouncementOrigin},
110};
111
/// Public routing configuration for session opening in `hopr-lib`.
///
/// This intentionally exposes only hop-count based routing.
///
/// The wrapped hop count is a `BoundedSize` limited by
/// `RoutingOptions::MAX_INTERMEDIATE_HOPS`; the default value is `BoundedSize::MIN`.
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    // Configured number of hops, bounded by `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
124
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Upper bound on the number of hops a [`HopRouting`] can be configured with.
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Gets the number of hops this routing has been configured with.
    pub fn hop_count(self) -> usize {
        let Self(hops) = self;
        hops.into()
    }
}
135
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    /// Builds a [`HopRouting`] from a plain hop count.
    ///
    /// Fails with the conversion error of the wrapped bounded type
    /// when `value` cannot be represented by it.
    fn try_from(value: usize) -> Result<Self, Self::Error> {
        let hops = value.try_into()?;
        Ok(Self(hops))
    }
}
144
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    /// Converts the hop-count based routing into the `Hops` variant of the routing options.
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
151
/// Session client configuration for `hopr-lib`.
///
/// Unlike transport-level configuration, this API intentionally does not expose
/// explicit intermediate paths.
///
/// Converts into `hopr_transport::SessionClientConfig` via `From`.
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Forward route selection policy.
    ///
    /// Defaults to the default [`HopRouting`].
    pub forward_path: HopRouting,
    /// Return route selection policy.
    ///
    /// Defaults to the default [`HopRouting`].
    pub return_path: HopRouting,
    /// Capabilities offered by the session.
    ///
    /// Defaults to `SessionCapability::Segmentation` only.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym used for the session. Mostly useful for testing only.
    ///
    /// Defaults to `None`.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Enable automatic SURB management for the session.
    ///
    /// Defaults to `Some` with the default `SurbBalancerConfig`; `None` means no configuration is passed on.
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// If set, the maximum number of possible SURBs will always be sent with session data packets.
    ///
    /// Defaults to `false`.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
176
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    /// Maps the `hopr-lib` session configuration onto the transport-level one.
    ///
    /// Both hop-count based routes are converted into routing options, all remaining
    /// fields are carried over unchanged.
    fn from(value: HoprSessionClientConfig) -> Self {
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
190
/// Long-running tasks that are spawned by the HOPR node.
///
/// The variants serve as keys of the abortable process list built up in `Hopr::run`,
/// and their `Display` text (from the `strum` attributes) is used when logging
/// that a task has finished.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
pub enum HoprLibProcess {
    /// A process spawned by the transport layer, wrapping its own process identifier.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Task handing incoming sessions over to the session server handler.
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// Driver of the ticket redemption queue.
    #[strum(to_string = "ticket redemption queue driver")]
    TicketRedemptions,
    /// Subscription to on-chain channel updates.
    #[strum(to_string = "subscription for on-chain channel updates")]
    ChannelEvents,
    /// Processor of received ticket events (winning or rejected tickets).
    #[strum(to_string = "on received ticket event (winning or rejected)")]
    TicketEvents,
    /// Periodic persisting of outgoing ticket indices.
    #[strum(to_string = "persisting of outgoing ticket indices")]
    OutIndexSync,
}
207
// Library-level metrics; only compiled with the `telemetry` feature and never in test builds.
// Construction failures of any metric panic on first access due to the `unwrap()` calls.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Set in `Hopr::new` to the current unix timestamp (in seconds).
    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Set in `Hopr::new`: the `version` label carries the application version,
    // the gauge value is the `major.minor` crate version parsed as a number.
    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // NOTE(review): presumably populated once the node addresses are known — the place
    // where it is set is not visible in this part of the file.
    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
225
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// Initializes the `hopr_parallelize` CPU thread pool first and then builds a multi-threaded
/// tokio runtime with all drivers enabled and worker threads named `hoprd`.
///
/// When a thread count is not given explicitly, the available CPU parallelism divided by 2
/// is used instead, since half of the available threads are to be used for IO-bound and
/// half for CPU-bound tasks. At least 1 thread is always used for each of the two pools.
///
/// The stack size of the runtime worker threads is taken from the `HOPRD_THREAD_STACK_SIZE`
/// environment variable. If the variable is not set or cannot be parsed as a number,
/// 10 MiB is used. Values below 2 MiB are raised to 2 MiB.
///
/// # Arguments
/// * `num_cpu_threads` - number of threads in the CPU-bound thread pool, `None` to auto-detect.
/// * `num_io_threads` - number of tokio worker threads, `None` to auto-detect.
///
/// # Errors
/// Returns an error if a thread count was not given and the available parallelism could not
/// be determined, if the CPU thread pool initialization fails, or if the tokio runtime
/// cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Half of the detected parallelism; can be 0 on a single-core machine,
    // which is why both thread counts below are clamped to at least 1.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error is constructed lazily, so nothing is allocated on the success path.
    let cpu_threads = num_cpu_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // The CPU pool is initialized before the IO thread count is resolved.
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // An unset or unparsable value silently falls back to the default stack size.
    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
271
/// Type alias used to send and receive transport data via a running HOPR node.
///
/// The socket is parametrized with a receiver of incoming application data and
/// a sender of outgoing application data paired with its destination routing.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;

/// Broadcast sender of ticket events together with an inactive receiver of the same channel.
type TicketEvents = (
    async_broadcast::Sender<TicketEvent>,
    async_broadcast::InactiveReceiver<TicketEvent>,
);

/// Time to wait until the node's keybinding appears on-chain
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);

/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
289
/// HOPR main object providing the entire HOPR node functionality
///
/// Instantiating this object creates all processes and objects necessary for
/// running the HOPR node. Once created, the node can be started using the
/// `run()` method.
///
/// Externally offered API should be enough to perform all necessary tasks
/// with the HOPR node manually, but it is advised to create such a configuration
/// that manual interaction is unnecessary.
///
/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
///
/// The type is generic over the chain API (`Chain`), the network graph (`Graph`)
/// and the network implementation (`Net`).
pub struct Hopr<Chain, Graph, Net>
where
    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
{
    /// Off-chain keypair of this node, cloned from the identity given to `new`.
    me: OffchainKeypair,
    /// Library configuration the node has been created with (validated in `new`).
    cfg: config::HoprLibConfig,
    /// Current state of the node; starts as `HoprState::Uninitialized` and is advanced by `run`.
    state: Arc<api::node::state::AtomicHoprState>,
    /// Transport layer instance constructed in `new`.
    transport_api: HoprTransport<Chain, Graph, Net>,
    /// Chain API implementation supplied by the caller of `new`.
    chain_api: Chain,
    /// Broadcast sender and inactive receiver of ticket events.
    ticket_event_subscribers: TicketEvents,
    /// Ticket manager; left unset by `new`.
    // NOTE(review): presumably set once the node is started via `run` — confirm.
    ticket_manager: OnceLock<Arc<HoprTicketManager<RedbStore, RedbTicketQueue>>>,
    /// List of abortable long-running processes; left unset by `new`.
    // NOTE(review): presumably set once the node is started via `run` — confirm.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
325
326impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
327where
328    Chain: HoprChainApi + Clone + Send + Sync + 'static,
329    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
330        + hopr_api::graph::NetworkGraphUpdate
331        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
332        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
333        + Clone
334        + Send
335        + Sync
336        + 'static,
337    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
338        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
339    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
340    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
341{
342    pub async fn new(
343        identity: (&ChainKeypair, &OffchainKeypair),
344        hopr_chain_api: Chain,
345        graph: Graph,
346        cfg: config::HoprLibConfig,
347    ) -> errors::Result<Self> {
348        if hopr_api::types::crypto_random::is_rng_fixed() {
349            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
350        }
351
352        cfg.validate()?;
353
354        let hopr_transport_api = HoprTransport::new(
355            identity,
356            hopr_chain_api.clone(),
357            graph,
358            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
359            cfg.protocol.clone(),
360        )
361        .map_err(HoprLibError::TransportError)?;
362
363        #[cfg(all(feature = "telemetry", not(test)))]
364        {
365            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
366            METRIC_HOPR_LIB_VERSION.set(
367                &[const_format::formatcp!("{}", constants::APP_VERSION)],
368                const_format::formatcp!(
369                    "{}.{}",
370                    env!("CARGO_PKG_VERSION_MAJOR"),
371                    env!("CARGO_PKG_VERSION_MINOR")
372                )
373                .parse()
374                .unwrap_or(0.0),
375            );
376        }
377
378        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
379        new_tickets_tx.set_await_active(false);
380        new_tickets_tx.set_overflow(true);
381
382        Ok(Self {
383            me: identity.1.clone(),
384            cfg,
385            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
386            transport_api: hopr_transport_api,
387            chain_api: hopr_chain_api,
388            processes: OnceLock::new(),
389            ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
390            ticket_manager: Default::default(),
391        })
392    }
393
394    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
395        if HoprNodeOperations::status(self) == state {
396            Ok(())
397        } else {
398            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
399        }
400    }
401
    /// Returns a reference to the configuration this node has been created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
405
    /// Returns a reference to the network graph.
    ///
    /// The graph is obtained from the transport layer, which took ownership of it in `new`.
    pub fn graph(&self) -> &Graph {
        self.transport_api.graph()
    }
410
    /// Indicates whether the node is configured as public (the `publish` configuration flag).
    ///
    /// Only public nodes announce their multiaddresses on-chain.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
415
416    pub async fn run<
417        Ct,
418        NetBuilder,
419        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
420    >(
421        &self,
422        cover_traffic: Ct,
423        network_builder: NetBuilder,
424        #[cfg(feature = "session-server")] serve_handler: T,
425    ) -> errors::Result<HoprTransportIO>
426    where
427        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
428        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
429    {
430        self.error_if_not_in_state(
431            HoprState::Uninitialized,
432            "cannot start the hopr node multiple times".into(),
433        )?;
434
435        #[cfg(feature = "testing")]
436        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
437
438        let me_onchain = *self.chain_api.me();
439        info!(
440            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
441            "node is not started, please fund this node",
442        );
443
444        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
445        helpers::wait_for_funds(
446            *MIN_NATIVE_BALANCE,
447            *SUGGESTED_NATIVE_BALANCE,
448            Duration::from_secs(200),
449            me_onchain,
450            &self.chain_api,
451        )
452        .await?;
453
454        let mut processes = AbortableList::<HoprLibProcess>::default();
455
456        info!("starting HOPR node...");
457        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
458
459        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
460        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
461
462        info!(
463            address = %me_onchain,
464            %balance,
465            %minimum_balance,
466            "node information"
467        );
468
469        if balance.le(&minimum_balance) {
470            return Err(HoprLibError::GeneralError(
471                "cannot start the node without a sufficiently funded wallet".into(),
472            ));
473        }
474
475        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
476
477        // Once we are able to query the chain,
478        // check if the ticket price is configured correctly.
479        let network_min_ticket_price = self
480            .chain_api
481            .minimum_ticket_price()
482            .await
483            .map_err(HoprLibError::chain)?;
484        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
485        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
486            return Err(HoprLibError::GeneralError(format!(
487                "configured outgoing ticket price is lower than the network minimum ticket price: \
488                 {configured_ticket_price:?} < {network_min_ticket_price}"
489            )));
490        }
491        // Once we are able to query the chain,
492        // check if the winning probability is configured correctly.
493        let network_min_win_prob = self
494            .chain_api
495            .minimum_incoming_ticket_win_prob()
496            .await
497            .map_err(HoprLibError::chain)?;
498        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
499        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
500            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
501        {
502            return Err(HoprLibError::GeneralError(format!(
503                "configured outgoing ticket winning probability is lower than the network minimum winning \
504                 probability: {configured_win_prob:?} < {network_min_win_prob}"
505            )));
506        }
507
508        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
509
510        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
511
512        let safe_addr = self.cfg.safe_module.safe_address;
513
514        if self.me_onchain() == safe_addr {
515            return Err(HoprLibError::GeneralError(
516                "cannot use self as staking safe address".into(),
517            ));
518        }
519
520        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
521        info!(%safe_addr, "registering safe with this node");
522        match self.chain_api.register_safe(&safe_addr).await {
523            Ok(awaiter) => {
524                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
525                awaiter.await.map_err(|error| {
526                    error!(%safe_addr, %error, "safe registration failed with error");
527                    HoprLibError::chain(error)
528                })?;
529                info!(%safe_addr, "safe successfully registered with this node");
530            }
531            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
532                info!(%safe_addr, "this safe is already registered with this node");
533            }
534            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
535                // TODO: support safe deregistration flow
536                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
537                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
538            }
539            Err(error) => {
540                error!(%safe_addr, %error, "safe registration failed");
541                return Err(HoprLibError::chain(error));
542            }
543        }
544
545        // Only public nodes announce multiaddresses
546        let multiaddresses_to_announce = if self.is_public() {
547            // The multiaddresses are filtered for the non-private ones,
548            // unless `announce_local_addresses` is set to `true`.
549            self.transport_api.announceable_multiaddresses()
550        } else {
551            Vec::with_capacity(0)
552        };
553
554        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
555        multiaddresses_to_announce
556            .iter()
557            .filter(|a| !is_public_address(a))
558            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
559
560        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
561
562        let chain_api = self.chain_api.clone();
563        let me_offchain = *self.me.public();
564        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
565
566        // At this point the node is already registered with Safe, so
567        // we can announce via Safe-compliant TX
568        info!(?multiaddresses_to_announce, "announcing node on chain");
569        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
570            Ok(awaiter) => {
571                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
572                awaiter.await.map_err(|error| {
573                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
574                    HoprLibError::chain(error)
575                })?;
576                info!(?multiaddresses_to_announce, "node has been successfully announced");
577            }
578            Err(AnnouncementError::AlreadyAnnounced) => {
579                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
580            }
581            Err(error) => {
582                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
583                return Err(HoprLibError::chain(error));
584            }
585        }
586
587        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
588
589        // Wait for the node key-binding readiness to return
590        let this_node_account = node_ready
591            .await
592            .map_err(HoprLibError::other)?
593            .map_err(HoprLibError::chain)?;
594        if this_node_account.chain_addr != self.me_onchain()
595            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
596        {
597            error!(%this_node_account, "account bound to offchain key does not match this node");
598            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
599        }
600
601        info!(%this_node_account, "node account is ready");
602
603        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
604
605        info!("initializing session infrastructure");
606        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
607            .ok()
608            .and_then(|s| s.trim().parse::<usize>().ok())
609            .filter(|&c| c > 0)
610            .unwrap_or(256);
611
612        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
613        #[cfg(feature = "session-server")]
614        {
615            debug!(capacity = incoming_session_channel_capacity, "creating session server");
616            processes.insert(
617                HoprLibProcess::SessionServer,
618                hopr_async_runtime::spawn_as_abortable!(
619                    _session_rx
620                        .for_each_concurrent(None, move |session| {
621                            let serve_handler = serve_handler.clone();
622                            async move {
623                                let session_id = *session.session.id();
624                                match serve_handler.process(session).await {
625                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
626                                    Err(error) => error!(
627                                        ?session_id,
628                                        %error,
629                                        "client session processing failed"
630                                    ),
631                                }
632                            }
633                        })
634                        .inspect(|_| tracing::warn!(
635                            task = %HoprLibProcess::SessionServer,
636                            "long-running background task finished"
637                        ))
638                ),
639            );
640        }
641
642        info!("starting ticket manager & factory");
643        let store = self
644            .cfg
645            .ticket_storage_file
646            .as_ref()
647            .map(RedbStore::new)
648            .unwrap_or_else(RedbStore::new_temp)
649            .map_err(HoprLibError::ticket_manager)?;
650
651        let (ticket_manager, ticket_factory) = HoprTicketManager::new_with_factory(store);
652        let ticket_manager = Arc::new(ticket_manager);
653        let ticket_factory = Arc::new(ticket_factory);
654
655        // Synchronize the ticket manager with the chain before starting the packet pipeline
656        ticket_manager
657            .sync_from_incoming_channels(
658                &self
659                    .chain_api
660                    .stream_channels(ChannelSelector::default().with_destination(me_onchain))
661                    .map_err(HoprLibError::chain)?
662                    .collect::<Vec<_>>()
663                    .await,
664            )
665            .map_err(HoprLibError::ticket_manager)?;
666
667        // Synchronize the ticket factory with the chain before starting the packet pipeline
668        ticket_factory
669            .sync_from_outgoing_channels(
670                &self
671                    .chain_api
672                    .stream_channels(ChannelSelector::default().with_source(me_onchain))
673                    .map_err(HoprLibError::chain)?
674                    .collect::<Vec<_>>()
675                    .await,
676            )
677            .map_err(HoprLibError::ticket_manager)?;
678
679        // Make sure outgoing ticket indices in the ticket factory are periodically persisted to disk
680        let (index_sync_handle, index_sync_reg) = futures::future::AbortHandle::new_pair();
681        let tfact = ticket_factory.clone();
682        let tfact2 = ticket_factory.clone();
683        spawn(
684            futures::stream::Abortable::new(
685                futures_time::stream::interval(self.cfg.out_index_sync_period.into()),
686                index_sync_reg,
687            )
688            .for_each(move |_| {
689                let tfact = tfact.clone();
690                async move {
691                    if let Err(error) =
692                        hopr_async_runtime::prelude::spawn_blocking(move || tfact.save_outgoing_indices())
693                            .map_err(hopr_ticket_manager::TicketManagerError::store)
694                            .and_then(futures::future::ready)
695                            .await
696                    {
697                        tracing::error!(%error, "failed to sync ticket indices to persistent storage:");
698                    } else {
699                        tracing::trace!("successfully synced ticket indices");
700                    }
701                }
702            })
703            .inspect(move |_| {
704                // Do an extra save here on graceful shutdown
705                if let Err(error) = tfact2.save_outgoing_indices() {
706                    tracing::error!(%error, "failed to sync ticket indices to persistent storage on shutdown");
707                }
708                tracing::warn!(
709                    task = %HoprLibProcess::OutIndexSync,
710                    "long-running background task finished"
711                )
712            }),
713        );
714        processes.insert(HoprLibProcess::OutIndexSync, index_sync_handle);
715
716        info!("starting ticket events processor");
717        let (tickets_tx, tickets_rx) = channel(8192);
718        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
719        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
720        let new_ticket_tx = self.ticket_event_subscribers.0.clone();
721        let tmgr = ticket_manager.clone();
722        spawn(
723            tickets_rx
724                .for_each(move |event| {
725                    // Ticket manager processes new winning tickets
726                    if let TicketEvent::WinningTicket(ticket) = &event
727                        && let Err(error) = tmgr.insert_incoming_ticket(**ticket)
728                    {
729                        tracing::error!(%error, "failed to insert incoming ticket into manager");
730                    }
731                    if let Err(error) = new_ticket_tx.try_broadcast(event) {
732                        tracing::error!(%error, "failed to broadcast new ticket event to subscribers");
733                    }
734                    futures::future::ready(())
735                })
736                .inspect(|_| {
737                    tracing::warn!(
738                        task = %HoprLibProcess::TicketEvents,
739                        "long-running background task finished"
740                    )
741                }),
742        );
743
744        info!("starting transport");
745        let (hopr_socket, transport_processes) = self
746            .transport_api
747            .run(cover_traffic, network_builder, tickets_tx, ticket_factory, session_tx)
748            .await?;
749        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
750
751        info!("subscribing to channel events");
752        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
753        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
754        let chain = self.chain_api.clone();
755        let events = chain.subscribe().map_err(HoprLibError::chain)?;
756        let tmgr = ticket_manager.clone();
757
758        spawn(
759            futures::stream::Abortable::new(
760                events
761                    .filter_map(move |event|
762                        futures::future::ready(event.try_as_channel_closed())
763                    ),
764                chain_events_sub_reg
765            )
766            .for_each(move |closed_channel| {
767                let chain = chain.clone();
768                let tmgr = tmgr.clone();
769                async move {
770                    match closed_channel.direction(chain.me()) {
771                        Some(ChannelDirection::Incoming) => {
772                            // Ticket manager neglects tickets on incoming channel closure
773                            match tmgr.neglect_tickets(closed_channel.get_id(), None) {
774                                Ok(neglected) if !neglected.is_empty() => {
775                                    warn!(num_neglected = neglected.len(), %closed_channel, "tickets on incoming closed channel were neglected");
776                                },
777                                Ok(_) => {
778                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
779                                },
780                                Err(error) => {
781                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
782                                }
783                            }
784                        },
785                        Some(ChannelDirection::Outgoing) => {
786                            // Removal of outgoing ticket index is done automatically be the ticket manager
787                            // when new epoch is encountered
788                        }
789                        _ => {} // Event for a channel that is not our own
790                    }
791                }
792            })
793            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
794        );
795
796        self.state.store(HoprState::Running, Ordering::Relaxed);
797
798        self.ticket_manager
799            .set(ticket_manager)
800            .map_err(|_| HoprLibError::other(anyhow!("cannot set ticket manager")))?;
801
802        info!(
803            id = %self.me_peer_id(),
804            version = constants::APP_VERSION,
805            "NODE STARTED AND RUNNING"
806        );
807
808        #[cfg(all(feature = "telemetry", not(test)))]
809        METRIC_HOPR_NODE_INFO.set(
810            &[
811                &self.me.public().to_peerid_str(),
812                &me_onchain.to_string(),
813                &self.cfg.safe_module.safe_address.to_string(),
814                &self.cfg.safe_module.module_address.to_string(),
815            ],
816            1.0,
817        );
818
819        let _ = self.processes.set(processes);
820
821        Ok(hopr_socket)
822    }
823
824    /// Used to practically shut down all node's processes without dropping the instance.
825    ///
826    /// This means that the instance can be used to retrieve some information, but all
827    /// active operations will stop and new will be impossible to perform.
828    /// Such operations will return [`HoprStatusError::NotThereYet`].
829    ///
830    /// This is the final state and cannot be reversed by calling `run` again.
831    pub fn shutdown(&self) -> Result<(), HoprLibError> {
832        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
833        if let Some(processes) = self.processes.get() {
834            processes.abort_all();
835        }
836        self.state.store(HoprState::Terminated, Ordering::Relaxed);
837        info!("NODE SHUTDOWN COMPLETE");
838        Ok(())
839    }
840
841    /// Create a client session connection returning a session object that implements
842    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
843    #[cfg(feature = "session-client")]
844    pub async fn connect_to(
845        &self,
846        destination: Address,
847        target: SessionTarget,
848        cfg: HoprSessionClientConfig,
849    ) -> errors::Result<HoprSession> {
850        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
851
852        let backoff = backon::ConstantBuilder::default()
853            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
854            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
855            .with_jitter();
856
857        use backon::Retryable;
858
859        Ok((|| {
860            let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
861            let target = target.clone();
862            async { self.transport_api.new_session(destination, target, cfg).await }
863        })
864        .retry(backoff)
865        .sleep(backon::FuturesTimerSleeper)
866        .await?)
867    }
868
869    /// Sends keep-alive to the given [`SessionId`], making sure the session is not
870    /// closed due to inactivity.
871    #[cfg(feature = "session-client")]
872    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
873        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
874        Ok(self.transport_api.probe_session(id).await?)
875    }
876
877    #[cfg(feature = "session-client")]
878    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
879        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
880        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
881    }
882
883    #[cfg(feature = "session-client")]
884    pub async fn update_session_surb_balancer_config(
885        &self,
886        id: &SessionId,
887        cfg: SurbBalancerConfig,
888    ) -> errors::Result<()> {
889        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
890        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
891    }
892
893    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
894    /// successfully or timing out after `timeout`.
895    fn spawn_wait_for_on_chain_event(
896        &self,
897        context: impl std::fmt::Display,
898        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
899        timeout: Duration,
900    ) -> errors::Result<(
901        impl Future<Output = errors::Result<ChainEvent>>,
902        hopr_async_runtime::AbortHandle,
903    )> {
904        debug!(%context, "registering wait for on-chain event");
905        let (event_stream, handle) = futures::stream::abortable(
906            self.chain_api
907                .subscribe()
908                .map_err(HoprLibError::chain)?
909                .skip_while(move |event| futures::future::ready(!predicate(event))),
910        );
911
912        let ctx = context.to_string();
913
914        Ok((
915            spawn(async move {
916                pin_mut!(event_stream);
917                let res = event_stream
918                    .next()
919                    .timeout(futures_time::time::Duration::from(timeout))
920                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
921                    .await?
922                    .ok_or(HoprLibError::GeneralError(format!(
923                        "failed to yield an on-chain event for {ctx}"
924                    )));
925                debug!(%ctx, ?res, "on-chain event waiting done");
926                res
927            })
928            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
929            .and_then(futures::future::ready),
930            handle,
931        ))
932    }
933}
934
935impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
936where
937    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
938        + hopr_api::graph::NetworkGraphUpdate
939        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
940        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
941        + Clone
942        + Send
943        + Sync
944        + 'static,
945    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
946        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
947    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
948    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
949{
950    // === telemetry
951    /// Prometheus formatted metrics collected by the hopr-lib components.
952    pub fn collect_hopr_metrics() -> errors::Result<String> {
953        cfg_if::cfg_if! {
954            if #[cfg(all(feature = "telemetry", not(test)))] {
955                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
956            } else {
957                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
958            }
959        }
960    }
961}
962
963// === Trait implementations for the high-level node API ===
964
965impl<Chain, Graph, Net> HoprNodeOperations for Hopr<Chain, Graph, Net>
966where
967    Chain: HoprChainApi + Clone + Send + Sync + 'static,
968    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
969        + hopr_api::graph::NetworkGraphUpdate
970        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
971        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
972        + Clone
973        + Send
974        + Sync
975        + 'static,
976    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
977        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
978    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
979    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
980{
981    fn status(&self) -> HoprState {
982        self.state.load(Ordering::Relaxed)
983    }
984}
985
986#[async_trait::async_trait]
987impl<Chain, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Graph, Net>
988where
989    Chain: HoprChainApi + Clone + Send + Sync + 'static,
990    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
991        + hopr_api::graph::NetworkGraphUpdate
992        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
993        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
994        + Clone
995        + Send
996        + Sync
997        + 'static,
998    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
999        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1000    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1001    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1002{
1003    type Error = HoprLibError;
1004    type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1005
1006    fn me_peer_id(&self) -> PeerId {
1007        (*self.me.public()).into()
1008    }
1009
1010    async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1011        Ok(self
1012            .chain_api
1013            .stream_accounts(AccountSelector {
1014                public_only: true,
1015                ..Default::default()
1016            })
1017            .map_err(HoprLibError::chain)?
1018            .map(|entry| {
1019                (
1020                    PeerId::from(entry.public_key),
1021                    entry.chain_addr,
1022                    entry.get_multiaddrs().to_vec(),
1023                )
1024            })
1025            .collect()
1026            .await)
1027    }
1028
1029    async fn network_health(&self) -> hopr_api::network::Health {
1030        self.transport_api.network_health().await
1031    }
1032
1033    async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1034        Ok(self
1035            .transport_api
1036            .network_connected_peers()
1037            .await?
1038            .into_iter()
1039            .map(PeerId::from)
1040            .collect())
1041    }
1042
1043    fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1044        let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1045        self.transport_api.network_peer_observations(&pubkey)
1046    }
1047
1048    async fn all_network_peers(
1049        &self,
1050        minimum_score: f64,
1051    ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1052        Ok(
1053            futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1054                .filter_map(|(pubkey, info)| async move {
1055                    let peer_id = PeerId::from(pubkey);
1056                    let address = self.peerid_to_chain_key(&peer_id).ok().flatten();
1057                    Some((address, peer_id, info))
1058                })
1059                .collect::<Vec<_>>()
1060                .await,
1061        )
1062    }
1063
1064    fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1065        self.transport_api.local_multiaddresses()
1066    }
1067
1068    async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1069        self.transport_api.listening_multiaddresses().await
1070    }
1071
1072    async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1073        let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer) else {
1074            return vec![];
1075        };
1076        self.transport_api.network_observed_multiaddresses(&pubkey).await
1077    }
1078
1079    async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1080        let pubkey = hopr_transport::peer_id_to_public_key(peer).map_err(HoprLibError::TransportError)?;
1081
1082        match self
1083            .chain_api
1084            .stream_accounts(AccountSelector {
1085                public_only: false,
1086                offchain_key: Some(pubkey),
1087                ..Default::default()
1088            })
1089            .map_err(HoprLibError::chain)?
1090            .next()
1091            .await
1092        {
1093            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1094            None => {
1095                error!(%peer, "no information");
1096                Ok(vec![])
1097            }
1098        }
1099    }
1100
1101    async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1102        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1103        let pubkey = hopr_transport::peer_id_to_public_key(peer).map_err(HoprLibError::TransportError)?;
1104        Ok(self.transport_api.ping(&pubkey).await?)
1105    }
1106}
1107
1108impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
1109where
1110    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1111    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1112        + hopr_api::graph::NetworkGraphUpdate
1113        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1114        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1115        + Clone
1116        + Send
1117        + Sync
1118        + 'static,
1119    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1120        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1121    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1122    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1123{
1124    // Minimum grace period left in PendingToClose channel, so
1125    // that ticket redemption should still be attempted.
1126    const PENDING_TO_CLOSE_TOLERANCE: Duration = Duration::from_secs(5);
1127
1128    pub fn me_onchain(&self) -> Address {
1129        *self.chain_api.me()
1130    }
1131
1132    pub fn get_safe_config(&self) -> SafeModuleConfig {
1133        SafeModuleConfig {
1134            safe_address: self.cfg.safe_module.safe_address,
1135            module_address: self.cfg.safe_module.module_address,
1136        }
1137    }
1138
1139    pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1140        self.chain_api
1141            .balance(self.me_onchain())
1142            .await
1143            .map_err(HoprLibError::chain)
1144    }
1145
1146    pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1147        self.chain_api
1148            .balance(self.cfg.safe_module.safe_address)
1149            .await
1150            .map_err(HoprLibError::chain)
1151    }
1152
1153    pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1154        self.chain_api
1155            .safe_allowance(self.cfg.safe_module.safe_address)
1156            .await
1157            .map_err(HoprLibError::chain)
1158    }
1159
1160    pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1161        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1162    }
1163
1164    pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1165        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1166    }
1167
1168    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1169        self.chain_api
1170            .minimum_incoming_ticket_win_prob()
1171            .await
1172            .map_err(HoprLibError::chain)
1173    }
1174
1175    pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1176        self.chain_api
1177            .channel_closure_notice_period()
1178            .await
1179            .map_err(HoprLibError::chain)
1180    }
1181
1182    pub async fn announced_peers(&self) -> Result<Vec<AnnouncedPeer>, HoprLibError> {
1183        Ok(self
1184            .chain_api
1185            .stream_accounts(AccountSelector {
1186                public_only: true,
1187                ..Default::default()
1188            })
1189            .map_err(HoprLibError::chain)?
1190            .map(|entry| AnnouncedPeer {
1191                address: entry.chain_addr,
1192                multiaddresses: entry.get_multiaddrs().to_vec(),
1193                origin: AnnouncementOrigin::Chain,
1194            })
1195            .collect()
1196            .await)
1197    }
1198
1199    pub fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1200        let pubkey = hopr_transport::peer_id_to_public_key(peer_id).map_err(HoprLibError::TransportError)?;
1201
1202        self.chain_api
1203            .packet_key_to_chain_key(&pubkey)
1204            .map_err(HoprLibError::chain)
1205    }
1206
1207    pub fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1208        self.chain_api
1209            .chain_key_to_packet_key(address)
1210            .map(|pk| pk.map(|v| v.into()))
1211            .map_err(HoprLibError::chain)
1212    }
1213
1214    pub fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, HoprLibError> {
1215        self.chain_api.channel_by_id(channel_id).map_err(HoprLibError::chain)
1216    }
1217
1218    pub fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1219        self.chain_api
1220            .channel_by_parties(src, dest)
1221            .map_err(HoprLibError::chain)
1222    }
1223
1224    pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1225        Ok(self
1226            .chain_api
1227            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1228                ChannelStatusDiscriminants::Closed,
1229                ChannelStatusDiscriminants::Open,
1230                ChannelStatusDiscriminants::PendingToClose,
1231            ]))
1232            .map_err(HoprLibError::chain)?
1233            .collect()
1234            .await)
1235    }
1236
1237    pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1238        Ok(self
1239            .chain_api
1240            .stream_channels(
1241                ChannelSelector::default()
1242                    .with_destination(*dest)
1243                    .with_allowed_states(&[
1244                        ChannelStatusDiscriminants::Closed,
1245                        ChannelStatusDiscriminants::Open,
1246                        ChannelStatusDiscriminants::PendingToClose,
1247                    ]),
1248            )
1249            .map_err(HoprLibError::chain)?
1250            .collect()
1251            .await)
1252    }
1253
1254    pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1255        Ok(self
1256            .chain_api
1257            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1258                ChannelStatusDiscriminants::Closed,
1259                ChannelStatusDiscriminants::Open,
1260                ChannelStatusDiscriminants::PendingToClose,
1261            ]))
1262            .map_err(HoprLibError::chain)?
1263            .collect()
1264            .await)
1265    }
1266
1267    pub async fn open_channel(
1268        &self,
1269        destination: &Address,
1270        amount: HoprBalance,
1271    ) -> Result<OpenChannelResult, HoprLibError> {
1272        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1273
1274        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1275
1276        // Subscribe to chain events BEFORE sending the transaction to avoid
1277        // a race where the event is broadcast before the subscriber activates.
1278        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1279            format!("open channel to {destination} ({channel_id})"),
1280            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1281            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1282        )?;
1283
1284        let confirm_awaiter = self
1285            .chain_api
1286            .open_channel(destination, amount)
1287            .await
1288            .map_err(HoprLibError::chain)?;
1289
1290        let tx_hash = confirm_awaiter.await.map_err(|e| {
1291            event_abort.abort();
1292            HoprLibError::chain(e)
1293        })?;
1294
1295        let event = event_awaiter.await?;
1296        debug!(%event, "open channel event received");
1297
1298        Ok(OpenChannelResult { tx_hash, channel_id })
1299    }
1300
1301    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1302        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1303
1304        let channel_id = *channel_id;
1305
1306        // Subscribe to chain events BEFORE sending the transaction to avoid
1307        // a race where the event is broadcast before the subscriber activates.
1308        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1309            format!("fund channel {channel_id}"),
1310            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1311            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1312        )?;
1313
1314        let confirm_awaiter = self
1315            .chain_api
1316            .fund_channel(&channel_id, amount)
1317            .await
1318            .map_err(HoprLibError::chain)?;
1319
1320        let res = confirm_awaiter.await.map_err(|e| {
1321            event_abort.abort();
1322            HoprLibError::chain(e)
1323        })?;
1324
1325        let event = event_awaiter.await?;
1326        debug!(%event, "fund channel event received");
1327
1328        Ok(res)
1329    }
1330
1331    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1332        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1333
1334        let channel_id = *channel_id;
1335
1336        // Subscribe to chain events BEFORE sending the transaction to avoid
1337        // a race where the event is broadcast before the subscriber activates.
1338        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1339            format!("close channel {channel_id}"),
1340            move |event| {
1341                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1342                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1343            },
1344            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1345        )?;
1346
1347        let confirm_awaiter = self
1348            .chain_api
1349            .close_channel(&channel_id)
1350            .await
1351            .map_err(HoprLibError::chain)?;
1352
1353        let tx_hash = confirm_awaiter.await.map_err(|e| {
1354            event_abort.abort();
1355            HoprLibError::chain(e)
1356        })?;
1357
1358        let event = event_awaiter.await?;
1359        debug!(%event, "close channel event received");
1360
1361        Ok(CloseChannelResult { tx_hash })
1362    }
1363
1364    pub fn ticket_management(&self) -> Result<impl TicketManagement + Clone + Send + 'static, HoprLibError> {
1365        self.error_if_not_in_state(HoprState::Running, "Node is not ready for transport operations".into())?;
1366
1367        self.ticket_manager
1368            .get()
1369            .cloned()
1370            .ok_or(HoprLibError::StatusError(HoprStatusError::NotThereYet(
1371                HoprState::Running,
1372                "Node is not ready for transport operations".into(),
1373            )))
1374    }
1375
1376    // TODO: this method can be sync unless we allow querying of the redeemed value from Blokli
1377    pub async fn ticket_statistics(&self) -> Result<ChannelStats, HoprLibError> {
1378        self.ticket_management()?
1379            .ticket_stats(None)
1380            .map_err(HoprLibError::ticket_manager)
1381    }
1382
1383    pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(
1384        &self,
1385        min_value: B,
1386    ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1387        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1388
1389        let min_value = min_value.into();
1390
1391        self.ticket_management()?
1392            .redeem_in_channels(
1393                self.chain_api.clone(),
1394                None,
1395                min_value.into(),
1396                Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1397            )
1398            .await
1399            .map_err(HoprLibError::ticket_manager)?
1400            .try_collect::<Vec<_>>()
1401            .map_err(HoprLibError::ticket_manager)
1402            .await
1403    }
1404
1405    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1406        &self,
1407        counterparty: &Address,
1408        min_value: B,
1409    ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1410        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1411            .await
1412    }
1413
1414    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1415        &self,
1416        channel_id: &ChannelId,
1417        min_value: B,
1418    ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1419        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1420
1421        let min_value = min_value.into();
1422
1423        let chain_api = self.chain_api.clone();
1424        let channel_id = *channel_id;
1425        let channel = hopr_async_runtime::prelude::spawn_blocking(move || {
1426            chain_api
1427                .channel_by_id(&channel_id)
1428                .map_err(HoprLibError::chain)?
1429                .ok_or(HoprLibError::GeneralError("channel not found".into()))
1430        })
1431        .await
1432        .map_err(HoprLibError::other)??;
1433
1434        self.ticket_management()?
1435            .redeem_in_channels(
1436                self.chain_api.clone(),
1437                ChannelSelector::default()
1438                    .with_source(channel.source)
1439                    .with_destination(channel.destination)
1440                    .into(),
1441                min_value.into(),
1442                Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1443            )
1444            .await
1445            .map_err(HoprLibError::ticket_manager)?
1446            .map_err(HoprLibError::ticket_manager)
1447            .try_collect::<Vec<_>>()
1448            .await
1449    }
1450
1451    pub fn subscribe_ticket_events(&self) -> impl Stream<Item = TicketEvent> + Send + 'static {
1452        self.ticket_event_subscribers.1.activate_cloned()
1453    }
1454
1455    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1456        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1457
1458        self.chain_api
1459            .withdraw(amount, &recipient)
1460            .and_then(identity)
1461            .map_err(HoprLibError::chain)
1462            .await
1463    }
1464
1465    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1466        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1467
1468        self.chain_api
1469            .withdraw(amount, &recipient)
1470            .and_then(identity)
1471            .map_err(HoprLibError::chain)
1472            .await
1473    }
1474}