Skip to main content

hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it;
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15/// Helper functions.
16mod helpers;
17
18/// Configuration-related public types
19pub mod config;
20/// Various public constants.
21pub mod constants;
22/// Lists all errors thrown from this library.
23pub mod errors;
24/// Public traits for interactions with this library.
25pub mod traits;
26/// Public domain types for peer discovery.
27pub mod types;
28/// Utility module with helper types and functionality over hopr-lib behavior.
29pub mod utils;
30
31pub use hopr_api as api;
32
33/// Exports of libraries necessary for API and interface operations.
34#[doc(hidden)]
35pub mod exports {
36    pub mod types {
37        pub use hopr_api::types::{chain, internal, primitive};
38    }
39
40    pub mod crypto {
41        pub use hopr_api::types::crypto as types;
42        pub use hopr_crypto_keypair as keypair;
43    }
44
45    pub mod network {
46        pub use hopr_network_types as types;
47    }
48
49    pub use hopr_transport as transport;
50}
51
52/// Export of relevant types for easier integration.
53#[doc(hidden)]
54pub mod prelude {
55    #[cfg(feature = "runtime-tokio")]
56    pub use super::exports::network::types::{
57        prelude::ForeignDataMode,
58        udp::{ConnectedUdpStream, UdpStreamParallelism},
59    };
60    pub use super::exports::{
61        crypto::{
62            keypair::key_pair::HoprKeys,
63            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
64        },
65        transport::{OffchainPublicKey, socket::HoprSocket},
66        types::primitive::prelude::Address,
67    };
68}
69
70use std::{
71    convert::identity,
72    future::Future,
73    sync::{Arc, OnceLock, atomic::Ordering},
74    time::Duration,
75};
76
77use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, channel::mpsc::channel, pin_mut};
78use futures_time::future::FutureExt as FuturesTimeFutureExt;
79use hopr_api::{
80    chain::*,
81    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
82    node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
83};
84pub use hopr_api::{
85    graph::EdgeLinkObservable,
86    network::{NetworkBuilder, NetworkStreamControl},
87    node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
88    tickets::{ChannelStats, RedemptionResult, TicketManagement, TicketManagementExt},
89    types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
90};
91use hopr_async_runtime::prelude::spawn;
92pub use hopr_async_runtime::{Abortable, AbortableList};
93pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
94pub use hopr_network_types::prelude::*;
95#[cfg(all(feature = "telemetry", not(test)))]
96use hopr_platform::time::native::current_time;
97#[cfg(feature = "runtime-tokio")]
98pub use hopr_transport::transfer_session;
99pub use hopr_transport::*;
100use tracing::{debug, error, info, warn};
101use validator::Validate;
102
103pub use crate::{
104    config::SafeModule,
105    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
106    errors::{HoprLibError, HoprStatusError},
107    types::{AnnouncedPeer, AnnouncementOrigin},
108};
109
110/// Public routing configuration for session opening in `hopr-lib`.
111///
112/// This intentionally exposes only hop-count based routing.
113#[cfg(feature = "session-client")]
114#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
115#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
116pub struct HopRouting(
117    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
118    hopr_api::types::primitive::bounded::BoundedSize<
119        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
120    >,
121);
122
123#[cfg(feature = "session-client")]
124impl HopRouting {
125    /// Maximum number of hops that can be configured.
126    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;
127
128    /// Returns the configured number of hops.
129    pub fn hop_count(self) -> usize {
130        self.0.into()
131    }
132}
133
134#[cfg(feature = "session-client")]
135impl TryFrom<usize> for HopRouting {
136    type Error = hopr_api::types::primitive::errors::GeneralError;
137
138    fn try_from(value: usize) -> Result<Self, Self::Error> {
139        Ok(Self(value.try_into()?))
140    }
141}
142
143#[cfg(feature = "session-client")]
144impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
145    fn from(value: HopRouting) -> Self {
146        Self::Hops(value.0)
147    }
148}
149
150/// Session client configuration for `hopr-lib`.
151///
152/// Unlike transport-level configuration, this API intentionally does not expose
153/// explicit intermediate paths.
154#[cfg(feature = "session-client")]
155#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
156pub struct HoprSessionClientConfig {
157    /// Forward route selection policy.
158    pub forward_path: HopRouting,
159    /// Return route selection policy.
160    pub return_path: HopRouting,
161    /// Capabilities offered by the session.
162    #[default(_code = "SessionCapability::Segmentation.into()")]
163    pub capabilities: SessionCapabilities,
164    /// Optional pseudonym used for the session. Mostly useful for testing only.
165    #[default(None)]
166    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
167    /// Enable automatic SURB management for the session.
168    #[default(Some(SurbBalancerConfig::default()))]
169    pub surb_management: Option<SurbBalancerConfig>,
170    /// If set, the maximum number of possible SURBs will always be sent with session data packets.
171    #[default(false)]
172    pub always_max_out_surbs: bool,
173}
174
175#[cfg(feature = "session-client")]
176impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
177    fn from(value: HoprSessionClientConfig) -> Self {
178        Self {
179            forward_path_options: value.forward_path.into(),
180            return_path_options: value.return_path.into(),
181            capabilities: value.capabilities,
182            pseudonym: value.pseudonym,
183            surb_management: value.surb_management,
184            always_max_out_surbs: value.always_max_out_surbs,
185        }
186    }
187}
188
189/// Long-running tasks that are spawned by the HOPR node.
190#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
191pub enum HoprLibProcess {
192    #[strum(to_string = "transport: {0}")]
193    Transport(HoprTransportProcess),
194    #[strum(to_string = "session server providing the exit node session stream functionality")]
195    SessionServer,
196    #[strum(to_string = "ticket redemption queue driver")]
197    TicketRedemptions,
198    #[strum(to_string = "subscription for on-chain channel updates")]
199    ChannelEvents,
200    #[strum(to_string = "on received ticket event (winning or rejected)")]
201    TicketEvents,
202}
203
204#[cfg(all(feature = "telemetry", not(test)))]
205lazy_static::lazy_static! {
206    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
207        "hopr_start_time",
208        "The unix timestamp in seconds at which the process was started"
209    ).unwrap();
210    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
211        "hopr_lib_version",
212        "Executed version of hopr-lib",
213        &["version"]
214    ).unwrap();
215    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
216        "hopr_node_addresses",
217        "Node on-chain and off-chain addresses",
218        &["peerid", "address", "safe_address", "module_address"]
219    ).unwrap();
220}
221
222/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
223///
224/// Divide the available CPU parallelism by 2, since half of the available threads are
225/// to be used for IO-bound and half for CPU-bound tasks.
226#[cfg(feature = "runtime-tokio")]
227pub fn prepare_tokio_runtime(
228    num_cpu_threads: Option<std::num::NonZeroUsize>,
229    num_io_threads: Option<std::num::NonZeroUsize>,
230) -> anyhow::Result<tokio::runtime::Runtime> {
231    use std::str::FromStr;
232    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
233
234    hopr_parallelize::cpu::init_thread_pool(
235        num_cpu_threads
236            .map(|v| v.get())
237            .or(avail_parallelism)
238            .ok_or(anyhow::anyhow!(
239                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
240                 environment variable."
241            ))?
242            .max(1),
243    )?;
244
245    Ok(tokio::runtime::Builder::new_multi_thread()
246        .enable_all()
247        .worker_threads(
248            num_io_threads
249                .map(|v| v.get())
250                .or(avail_parallelism)
251                .ok_or(anyhow::anyhow!(
252                    "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
253                     environment variable."
254                ))?
255                .max(1),
256        )
257        .thread_name("hoprd")
258        .thread_stack_size(
259            std::env::var("HOPRD_THREAD_STACK_SIZE")
260                .ok()
261                .and_then(|v| usize::from_str(&v).ok())
262                .unwrap_or(10 * 1024 * 1024)
263                .max(2 * 1024 * 1024),
264        )
265        .build()?)
266}
267
268/// Type alias used to send and receive transport data via a running HOPR node.
269pub type HoprTransportIO = socket::HoprSocket<
270    futures::channel::mpsc::Receiver<ApplicationDataIn>,
271    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
272>;
273
274type TicketEvents = (
275    async_broadcast::Sender<TicketEvent>,
276    async_broadcast::InactiveReceiver<TicketEvent>,
277);
278
279/// Time to wait until the node's keybinding appears on-chain
280const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
281
282/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
283// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
284const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
285
286/// HOPR main object providing the entire HOPR node functionality
287///
288/// Instantiating this object creates all processes and objects necessary for
289/// running the HOPR node. Once created, the node can be started using the
290/// `run()` method.
291///
292/// Externally offered API should be enough to perform all necessary tasks
293/// with the HOPR node manually, but it is advised to create such a configuration
294/// that manual interaction is unnecessary.
295///
296/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
297pub struct Hopr<Chain, Graph, Net>
298where
299    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
300        + hopr_api::graph::NetworkGraphUpdate
301        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
302        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
303        + Clone
304        + Send
305        + Sync
306        + 'static,
307    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
308        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
309    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
310    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
311{
312    me: OffchainKeypair,
313    cfg: config::HoprLibConfig,
314    state: Arc<api::node::state::AtomicHoprState>,
315    transport_api: HoprTransport<Chain, Graph, Net>,
316    chain_api: Chain,
317    ticket_event_subscribers: TicketEvents,
318    processes: OnceLock<AbortableList<HoprLibProcess>>,
319}
320
321impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
322where
323    Chain: HoprChainApi + Clone + Send + Sync + 'static,
324    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
325        + hopr_api::graph::NetworkGraphUpdate
326        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
327        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
328        + Clone
329        + Send
330        + Sync
331        + 'static,
332    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
333        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
334    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
335    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
336{
337    pub async fn new(
338        identity: (&ChainKeypair, &OffchainKeypair),
339        hopr_chain_api: Chain,
340        graph: Graph,
341        cfg: config::HoprLibConfig,
342    ) -> errors::Result<Self> {
343        if hopr_api::types::crypto_random::is_rng_fixed() {
344            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
345        }
346
347        cfg.validate()?;
348
349        let hopr_transport_api = HoprTransport::new(
350            identity,
351            hopr_chain_api.clone(),
352            graph,
353            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
354            cfg.protocol.clone(),
355        )
356        .map_err(HoprLibError::TransportError)?;
357
358        #[cfg(all(feature = "telemetry", not(test)))]
359        {
360            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
361            METRIC_HOPR_LIB_VERSION.set(
362                &[const_format::formatcp!("{}", constants::APP_VERSION)],
363                const_format::formatcp!(
364                    "{}.{}",
365                    env!("CARGO_PKG_VERSION_MAJOR"),
366                    env!("CARGO_PKG_VERSION_MINOR")
367                )
368                .parse()
369                .unwrap_or(0.0),
370            );
371        }
372
373        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
374        new_tickets_tx.set_await_active(false);
375        new_tickets_tx.set_overflow(true);
376
377        Ok(Self {
378            me: identity.1.clone(),
379            cfg,
380            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
381            transport_api: hopr_transport_api,
382            chain_api: hopr_chain_api,
383            processes: OnceLock::new(),
384            ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
385        })
386    }
387
388    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
389        if HoprNodeOperations::status(self) == state {
390            Ok(())
391        } else {
392            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
393        }
394    }
395
396    pub fn config(&self) -> &config::HoprLibConfig {
397        &self.cfg
398    }
399
400    /// Returns a reference to the network graph.
401    pub fn graph(&self) -> &Graph {
402        self.transport_api.graph()
403    }
404
405    #[inline]
406    fn is_public(&self) -> bool {
407        self.cfg.publish
408    }
409
410    pub async fn run<
411        Ct,
412        NetBuilder,
413        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
414    >(
415        &self,
416        cover_traffic: Ct,
417        network_builder: NetBuilder,
418        #[cfg(feature = "session-server")] serve_handler: T,
419    ) -> errors::Result<HoprTransportIO>
420    where
421        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
422        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
423    {
424        self.error_if_not_in_state(
425            HoprState::Uninitialized,
426            "cannot start the hopr node multiple times".into(),
427        )?;
428
429        #[cfg(feature = "testing")]
430        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
431
432        let me_onchain = *self.chain_api.me();
433        info!(
434            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
435            "node is not started, please fund this node",
436        );
437
438        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
439        helpers::wait_for_funds(
440            *MIN_NATIVE_BALANCE,
441            *SUGGESTED_NATIVE_BALANCE,
442            Duration::from_secs(200),
443            me_onchain,
444            &self.chain_api,
445        )
446        .await?;
447
448        let mut processes = AbortableList::<HoprLibProcess>::default();
449
450        info!("starting HOPR node...");
451        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
452
453        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
454        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
455
456        info!(
457            address = %me_onchain,
458            %balance,
459            %minimum_balance,
460            "node information"
461        );
462
463        if balance.le(&minimum_balance) {
464            return Err(HoprLibError::GeneralError(
465                "cannot start the node without a sufficiently funded wallet".into(),
466            ));
467        }
468
469        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
470
471        // Once we are able to query the chain,
472        // check if the ticket price is configured correctly.
473        let network_min_ticket_price = self
474            .chain_api
475            .minimum_ticket_price()
476            .await
477            .map_err(HoprLibError::chain)?;
478        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
479        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
480            return Err(HoprLibError::GeneralError(format!(
481                "configured outgoing ticket price is lower than the network minimum ticket price: \
482                 {configured_ticket_price:?} < {network_min_ticket_price}"
483            )));
484        }
485        // Once we are able to query the chain,
486        // check if the winning probability is configured correctly.
487        let network_min_win_prob = self
488            .chain_api
489            .minimum_incoming_ticket_win_prob()
490            .await
491            .map_err(HoprLibError::chain)?;
492        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
493        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
494            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
495        {
496            return Err(HoprLibError::GeneralError(format!(
497                "configured outgoing ticket winning probability is lower than the network minimum winning \
498                 probability: {configured_win_prob:?} < {network_min_win_prob}"
499            )));
500        }
501
502        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
503
504        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
505
506        let safe_addr = self.cfg.safe_module.safe_address;
507
508        if self.me_onchain() == safe_addr {
509            return Err(HoprLibError::GeneralError(
510                "cannot use self as staking safe address".into(),
511            ));
512        }
513
514        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
515        info!(%safe_addr, "registering safe with this node");
516        match self.chain_api.register_safe(&safe_addr).await {
517            Ok(awaiter) => {
518                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
519                awaiter.await.map_err(|error| {
520                    error!(%safe_addr, %error, "safe registration failed with error");
521                    HoprLibError::chain(error)
522                })?;
523                info!(%safe_addr, "safe successfully registered with this node");
524            }
525            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
526                info!(%safe_addr, "this safe is already registered with this node");
527            }
528            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
529                // TODO: support safe deregistration flow
530                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
531                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
532            }
533            Err(error) => {
534                error!(%safe_addr, %error, "safe registration failed");
535                return Err(HoprLibError::chain(error));
536            }
537        }
538
539        // Only public nodes announce multiaddresses
540        let multiaddresses_to_announce = if self.is_public() {
541            // The multiaddresses are filtered for the non-private ones,
542            // unless `announce_local_addresses` is set to `true`.
543            self.transport_api.announceable_multiaddresses()
544        } else {
545            Vec::with_capacity(0)
546        };
547
548        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
549        multiaddresses_to_announce
550            .iter()
551            .filter(|a| !is_public_address(a))
552            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
553
554        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
555
556        let chain_api = self.chain_api.clone();
557        let me_offchain = *self.me.public();
558        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
559
560        // At this point the node is already registered with Safe, so
561        // we can announce via Safe-compliant TX
562        info!(?multiaddresses_to_announce, "announcing node on chain");
563        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
564            Ok(awaiter) => {
565                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
566                awaiter.await.map_err(|error| {
567                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
568                    HoprLibError::chain(error)
569                })?;
570                info!(?multiaddresses_to_announce, "node has been successfully announced");
571            }
572            Err(AnnouncementError::AlreadyAnnounced) => {
573                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
574            }
575            Err(error) => {
576                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
577                return Err(HoprLibError::chain(error));
578            }
579        }
580
581        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
582
583        // Wait for the node key-binding readiness to return
584        let this_node_account = node_ready
585            .await
586            .map_err(HoprLibError::other)?
587            .map_err(HoprLibError::chain)?;
588        if this_node_account.chain_addr != self.me_onchain()
589            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
590        {
591            error!(%this_node_account, "account bound to offchain key does not match this node");
592            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
593        }
594
595        info!(%this_node_account, "node account is ready");
596
597        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
598
599        info!("initializing session infrastructure");
600        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
601            .ok()
602            .and_then(|s| s.trim().parse::<usize>().ok())
603            .filter(|&c| c > 0)
604            .unwrap_or(256);
605
606        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
607        #[cfg(feature = "session-server")]
608        {
609            debug!(capacity = incoming_session_channel_capacity, "creating session server");
610            processes.insert(
611                HoprLibProcess::SessionServer,
612                hopr_async_runtime::spawn_as_abortable!(
613                    _session_rx
614                        .for_each_concurrent(None, move |session| {
615                            let serve_handler = serve_handler.clone();
616                            async move {
617                                let session_id = *session.session.id();
618                                match serve_handler.process(session).await {
619                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
620                                    Err(error) => error!(
621                                        ?session_id,
622                                        %error,
623                                        "client session processing failed"
624                                    ),
625                                }
626                            }
627                        })
628                        .inspect(|_| tracing::warn!(
629                            task = %HoprLibProcess::SessionServer,
630                            "long-running background task finished"
631                        ))
632                ),
633            );
634        }
635
636        info!("starting ticket events processor");
637        let (tickets_tx, tickets_rx) = channel(8192);
638        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
639        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
640        let new_ticket_tx = self.ticket_event_subscribers.0.clone();
641        spawn(
642            tickets_rx
643                .for_each(move |event| {
644                    if let Err(error) = new_ticket_tx.try_broadcast(event) {
645                        tracing::error!(%error, "failed to broadcast new ticket event to subscribers");
646                    }
647                    futures::future::ready(())
648                })
649                .inspect(|_| {
650                    tracing::warn!(
651                        task = %HoprLibProcess::TicketEvents,
652                        "long-running background task finished"
653                    )
654                }),
655        );
656
657        info!("starting transport");
658        let (hopr_socket, transport_processes) = self
659            .transport_api
660            .run(cover_traffic, network_builder, tickets_tx, session_tx)
661            .await?;
662        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
663
664        info!("subscribing to channel events");
665        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
666        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
667        let chain = self.chain_api.clone();
668        let events = chain.subscribe().map_err(HoprLibError::chain)?;
669        let tmgr = self.transport_api.ticket_manager();
670        spawn(
671            futures::stream::Abortable::new(
672                events
673                    .filter_map(move |event|
674                        futures::future::ready(event.try_as_channel_closed())
675                    ),
676                chain_events_sub_reg
677            )
678            .for_each(move |closed_channel| {
679                let chain = chain.clone();
680                let tmgr = tmgr.clone();
681                async move {
682                    match closed_channel.direction(chain.me()) {
683                        Some(ChannelDirection::Incoming) => {
684                            match tmgr.neglect_tickets(closed_channel.get_id(), None) {
685                                Ok(neglected) if !neglected.is_empty() => {
686                                    warn!(num_neglected = neglected.len(), %closed_channel, "tickets on incoming closed channel were neglected");
687                                },
688                                Ok(_) => {
689                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
690                                },
691                                Err(error) => {
692                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
693                                }
694                            }
695                        },
696                        Some(ChannelDirection::Outgoing) => {
697                            // Removal of outgoing ticket index is done automatically be the ticket manager
698                            // when new epoch is encountered
699                        }
700                        _ => {} // Event for a channel that is not our own
701                    }
702                }
703            })
704            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
705        );
706
707        self.state.store(HoprState::Running, Ordering::Relaxed);
708
709        info!(
710            id = %self.me_peer_id(),
711            version = constants::APP_VERSION,
712            "NODE STARTED AND RUNNING"
713        );
714
715        #[cfg(all(feature = "telemetry", not(test)))]
716        METRIC_HOPR_NODE_INFO.set(
717            &[
718                &self.me.public().to_peerid_str(),
719                &me_onchain.to_string(),
720                &self.cfg.safe_module.safe_address.to_string(),
721                &self.cfg.safe_module.module_address.to_string(),
722            ],
723            1.0,
724        );
725
726        let _ = self.processes.set(processes);
727        Ok(hopr_socket)
728    }
729
730    /// Used to practically shut down all node's processes without dropping the instance.
731    ///
732    /// This means that the instance can be used to retrieve some information, but all
733    /// active operations will stop and new will be impossible to perform.
734    /// Such operations will return [`HoprStatusError::NotThereYet`].
735    ///
736    /// This is the final state and cannot be reversed by calling `run` again.
737    pub fn shutdown(&self) -> Result<(), HoprLibError> {
738        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
739        if let Some(processes) = self.processes.get() {
740            processes.abort_all();
741        }
742        self.state.store(HoprState::Terminated, Ordering::Relaxed);
743        info!("NODE SHUTDOWN COMPLETE");
744        Ok(())
745    }
746
747    /// Create a client session connection returning a session object that implements
748    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
749    #[cfg(feature = "session-client")]
750    pub async fn connect_to(
751        &self,
752        destination: Address,
753        target: SessionTarget,
754        cfg: HoprSessionClientConfig,
755    ) -> errors::Result<HoprSession> {
756        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
757
758        let backoff = backon::ConstantBuilder::default()
759            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
760            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
761            .with_jitter();
762
763        use backon::Retryable;
764
765        Ok((|| {
766            let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
767            let target = target.clone();
768            async { self.transport_api.new_session(destination, target, cfg).await }
769        })
770        .retry(backoff)
771        .sleep(backon::FuturesTimerSleeper)
772        .await?)
773    }
774
775    /// Sends keep-alive to the given [`SessionId`], making sure the session is not
776    /// closed due to inactivity.
777    #[cfg(feature = "session-client")]
778    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
779        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
780        Ok(self.transport_api.probe_session(id).await?)
781    }
782
783    #[cfg(feature = "session-client")]
784    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
785        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
786        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
787    }
788
789    #[cfg(feature = "session-client")]
790    pub async fn update_session_surb_balancer_config(
791        &self,
792        id: &SessionId,
793        cfg: SurbBalancerConfig,
794    ) -> errors::Result<()> {
795        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
796        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
797    }
798
799    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
800    /// successfully or timing out after `timeout`.
801    fn spawn_wait_for_on_chain_event(
802        &self,
803        context: impl std::fmt::Display,
804        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
805        timeout: Duration,
806    ) -> errors::Result<(
807        impl Future<Output = errors::Result<ChainEvent>>,
808        hopr_async_runtime::AbortHandle,
809    )> {
810        debug!(%context, "registering wait for on-chain event");
811        let (event_stream, handle) = futures::stream::abortable(
812            self.chain_api
813                .subscribe()
814                .map_err(HoprLibError::chain)?
815                .skip_while(move |event| futures::future::ready(!predicate(event))),
816        );
817
818        let ctx = context.to_string();
819
820        Ok((
821            spawn(async move {
822                pin_mut!(event_stream);
823                let res = event_stream
824                    .next()
825                    .timeout(futures_time::time::Duration::from(timeout))
826                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
827                    .await?
828                    .ok_or(HoprLibError::GeneralError(format!(
829                        "failed to yield an on-chain event for {ctx}"
830                    )));
831                debug!(%ctx, ?res, "on-chain event waiting done");
832                res
833            })
834            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
835            .and_then(futures::future::ready),
836            handle,
837        ))
838    }
839}
840
841impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
842where
843    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
844        + hopr_api::graph::NetworkGraphUpdate
845        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
846        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
847        + Clone
848        + Send
849        + Sync
850        + 'static,
851    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
852        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
853    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
854    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
855{
856    // === telemetry
857    /// Prometheus formatted metrics collected by the hopr-lib components.
858    pub fn collect_hopr_metrics() -> errors::Result<String> {
859        cfg_if::cfg_if! {
860            if #[cfg(all(feature = "telemetry", not(test)))] {
861                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
862            } else {
863                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
864            }
865        }
866    }
867}
868
869// === Trait implementations for the high-level node API ===
870
871impl<Chain, Graph, Net> HoprNodeOperations for Hopr<Chain, Graph, Net>
872where
873    Chain: HoprChainApi + Clone + Send + Sync + 'static,
874    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
875        + hopr_api::graph::NetworkGraphUpdate
876        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
877        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
878        + Clone
879        + Send
880        + Sync
881        + 'static,
882    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
883        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
884    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
885    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
886{
887    fn status(&self) -> HoprState {
888        self.state.load(Ordering::Relaxed)
889    }
890}
891
892#[async_trait::async_trait]
893impl<Chain, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Graph, Net>
894where
895    Chain: HoprChainApi + Clone + Send + Sync + 'static,
896    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
897        + hopr_api::graph::NetworkGraphUpdate
898        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
899        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
900        + Clone
901        + Send
902        + Sync
903        + 'static,
904    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
905        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
906    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
907    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
908{
909    type Error = HoprLibError;
910    type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
911
912    fn me_peer_id(&self) -> PeerId {
913        (*self.me.public()).into()
914    }
915
916    async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
917        Ok(self
918            .chain_api
919            .stream_accounts(AccountSelector {
920                public_only: true,
921                ..Default::default()
922            })
923            .map_err(HoprLibError::chain)
924            .await?
925            .map(|entry| {
926                (
927                    PeerId::from(entry.public_key),
928                    entry.chain_addr,
929                    entry.get_multiaddrs().to_vec(),
930                )
931            })
932            .collect()
933            .await)
934    }
935
936    async fn network_health(&self) -> hopr_api::network::Health {
937        self.transport_api.network_health().await
938    }
939
940    async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
941        Ok(self
942            .transport_api
943            .network_connected_peers()
944            .await?
945            .into_iter()
946            .map(PeerId::from)
947            .collect())
948    }
949
950    fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
951        let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
952        self.transport_api.network_peer_observations(&pubkey)
953    }
954
955    async fn all_network_peers(
956        &self,
957        minimum_score: f64,
958    ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
959        Ok(
960            futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
961                .filter_map(|(pubkey, info)| async move {
962                    let peer_id = PeerId::from(pubkey);
963                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
964                    Some((address, peer_id, info))
965                })
966                .collect::<Vec<_>>()
967                .await,
968        )
969    }
970
971    fn local_multiaddresses(&self) -> Vec<Multiaddr> {
972        self.transport_api.local_multiaddresses()
973    }
974
975    async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
976        self.transport_api.listening_multiaddresses().await
977    }
978
979    async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
980        let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
981            return vec![];
982        };
983        self.transport_api.network_observed_multiaddresses(&pubkey).await
984    }
985
986    async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
987        let pubkey = hopr_transport::peer_id_to_public_key(peer)
988            .await
989            .map_err(HoprLibError::TransportError)?;
990
991        match self
992            .chain_api
993            .stream_accounts(AccountSelector {
994                public_only: false,
995                offchain_key: Some(pubkey),
996                ..Default::default()
997            })
998            .map_err(HoprLibError::chain)
999            .await?
1000            .next()
1001            .await
1002        {
1003            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1004            None => {
1005                error!(%peer, "no information");
1006                Ok(vec![])
1007            }
1008        }
1009    }
1010
1011    async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1012        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1013        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1014            .await
1015            .map_err(HoprLibError::TransportError)?;
1016        Ok(self.transport_api.ping(&pubkey).await?)
1017    }
1018}
1019
1020impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
1021where
1022    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1023    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1024        + hopr_api::graph::NetworkGraphUpdate
1025        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1026        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1027        + Clone
1028        + Send
1029        + Sync
1030        + 'static,
1031    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1032        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1033    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1034    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1035{
1036    // Minimum grace period left in PendingToClose channel, so
1037    // that ticket redemption should still be attempted.
1038    const PENDING_TO_CLOSE_TOLERANCE: Duration = Duration::from_secs(5);
1039
1040    pub fn me_onchain(&self) -> Address {
1041        *self.chain_api.me()
1042    }
1043
1044    pub fn get_safe_config(&self) -> SafeModuleConfig {
1045        SafeModuleConfig {
1046            safe_address: self.cfg.safe_module.safe_address,
1047            module_address: self.cfg.safe_module.module_address,
1048        }
1049    }
1050
1051    pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1052        self.chain_api
1053            .balance(self.me_onchain())
1054            .await
1055            .map_err(HoprLibError::chain)
1056    }
1057
1058    pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1059        self.chain_api
1060            .balance(self.cfg.safe_module.safe_address)
1061            .await
1062            .map_err(HoprLibError::chain)
1063    }
1064
1065    pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1066        self.chain_api
1067            .safe_allowance(self.cfg.safe_module.safe_address)
1068            .await
1069            .map_err(HoprLibError::chain)
1070    }
1071
1072    pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1073        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1074    }
1075
1076    pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1077        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1078    }
1079
1080    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1081        self.chain_api
1082            .minimum_incoming_ticket_win_prob()
1083            .await
1084            .map_err(HoprLibError::chain)
1085    }
1086
1087    pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1088        self.chain_api
1089            .channel_closure_notice_period()
1090            .await
1091            .map_err(HoprLibError::chain)
1092    }
1093
1094    pub async fn announced_peers(&self) -> Result<Vec<AnnouncedPeer>, HoprLibError> {
1095        Ok(self
1096            .chain_api
1097            .stream_accounts(AccountSelector {
1098                public_only: true,
1099                ..Default::default()
1100            })
1101            .map_err(HoprLibError::chain)
1102            .await?
1103            .map(|entry| AnnouncedPeer {
1104                address: entry.chain_addr,
1105                multiaddresses: entry.get_multiaddrs().to_vec(),
1106                origin: AnnouncementOrigin::Chain,
1107            })
1108            .collect()
1109            .await)
1110    }
1111
1112    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1113        let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1114            .await
1115            .map_err(HoprLibError::TransportError)?;
1116
1117        self.chain_api
1118            .packet_key_to_chain_key(&pubkey)
1119            .await
1120            .map_err(HoprLibError::chain)
1121    }
1122
1123    pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1124        self.chain_api
1125            .chain_key_to_packet_key(address)
1126            .await
1127            .map(|pk| pk.map(|v| v.into()))
1128            .map_err(HoprLibError::chain)
1129    }
1130
1131    pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1132        self.chain_api
1133            .channel_by_id(channel_id)
1134            .await
1135            .map_err(HoprLibError::chain)
1136    }
1137
1138    pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1139        self.chain_api
1140            .channel_by_parties(src, dest)
1141            .await
1142            .map_err(HoprLibError::chain)
1143    }
1144
1145    pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1146        Ok(self
1147            .chain_api
1148            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1149                ChannelStatusDiscriminants::Closed,
1150                ChannelStatusDiscriminants::Open,
1151                ChannelStatusDiscriminants::PendingToClose,
1152            ]))
1153            .map_err(HoprLibError::chain)
1154            .await?
1155            .collect()
1156            .await)
1157    }
1158
1159    pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1160        Ok(self
1161            .chain_api
1162            .stream_channels(
1163                ChannelSelector::default()
1164                    .with_destination(*dest)
1165                    .with_allowed_states(&[
1166                        ChannelStatusDiscriminants::Closed,
1167                        ChannelStatusDiscriminants::Open,
1168                        ChannelStatusDiscriminants::PendingToClose,
1169                    ]),
1170            )
1171            .map_err(HoprLibError::chain)
1172            .await?
1173            .collect()
1174            .await)
1175    }
1176
1177    pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1178        Ok(self
1179            .chain_api
1180            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1181                ChannelStatusDiscriminants::Closed,
1182                ChannelStatusDiscriminants::Open,
1183                ChannelStatusDiscriminants::PendingToClose,
1184            ]))
1185            .map_err(HoprLibError::chain)
1186            .await?
1187            .collect()
1188            .await)
1189    }
1190
1191    pub async fn open_channel(
1192        &self,
1193        destination: &Address,
1194        amount: HoprBalance,
1195    ) -> Result<OpenChannelResult, HoprLibError> {
1196        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1197
1198        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1199
1200        // Subscribe to chain events BEFORE sending the transaction to avoid
1201        // a race where the event is broadcast before the subscriber activates.
1202        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1203            format!("open channel to {destination} ({channel_id})"),
1204            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1205            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1206        )?;
1207
1208        let confirm_awaiter = self
1209            .chain_api
1210            .open_channel(destination, amount)
1211            .await
1212            .map_err(HoprLibError::chain)?;
1213
1214        let tx_hash = confirm_awaiter.await.map_err(|e| {
1215            event_abort.abort();
1216            HoprLibError::chain(e)
1217        })?;
1218
1219        let event = event_awaiter.await?;
1220        debug!(%event, "open channel event received");
1221
1222        Ok(OpenChannelResult { tx_hash, channel_id })
1223    }
1224
1225    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1226        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1227
1228        let channel_id = *channel_id;
1229
1230        // Subscribe to chain events BEFORE sending the transaction to avoid
1231        // a race where the event is broadcast before the subscriber activates.
1232        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1233            format!("fund channel {channel_id}"),
1234            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1235            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1236        )?;
1237
1238        let confirm_awaiter = self
1239            .chain_api
1240            .fund_channel(&channel_id, amount)
1241            .await
1242            .map_err(HoprLibError::chain)?;
1243
1244        let res = confirm_awaiter.await.map_err(|e| {
1245            event_abort.abort();
1246            HoprLibError::chain(e)
1247        })?;
1248
1249        let event = event_awaiter.await?;
1250        debug!(%event, "fund channel event received");
1251
1252        Ok(res)
1253    }
1254
1255    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1256        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1257
1258        let channel_id = *channel_id;
1259
1260        // Subscribe to chain events BEFORE sending the transaction to avoid
1261        // a race where the event is broadcast before the subscriber activates.
1262        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1263            format!("close channel {channel_id}"),
1264            move |event| {
1265                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1266                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1267            },
1268            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1269        )?;
1270
1271        let confirm_awaiter = self
1272            .chain_api
1273            .close_channel(&channel_id)
1274            .await
1275            .map_err(HoprLibError::chain)?;
1276
1277        let tx_hash = confirm_awaiter.await.map_err(|e| {
1278            event_abort.abort();
1279            HoprLibError::chain(e)
1280        })?;
1281
1282        let event = event_awaiter.await?;
1283        debug!(%event, "close channel event received");
1284
1285        Ok(CloseChannelResult { tx_hash })
1286    }
1287
1288    // TODO: this method can be sync unless we allow querying of the redeemed value from Blokli
1289    pub async fn ticket_statistics(&self) -> Result<ChannelStats, HoprLibError> {
1290        self.transport_api
1291            .ticket_manager()
1292            .ticket_stats(None)
1293            .map_err(HoprLibError::ticket_manager)
1294    }
1295
1296    pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(
1297        &self,
1298        min_value: B,
1299    ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1300        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1301
1302        let min_value = min_value.into();
1303
1304        self.ticket_management()
1305            .redeem_in_channels(
1306                self.chain_api.clone(),
1307                None,
1308                min_value.into(),
1309                Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1310            )
1311            .await
1312            .map_err(HoprLibError::ticket_manager)?
1313            .try_collect::<Vec<_>>()
1314            .map_err(HoprLibError::ticket_manager)
1315            .await
1316    }
1317
1318    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1319        &self,
1320        counterparty: &Address,
1321        min_value: B,
1322    ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1323        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1324            .await
1325    }
1326
1327    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1328        &self,
1329        channel_id: &ChannelId,
1330        min_value: B,
1331    ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1332        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1333
1334        let min_value = min_value.into();
1335
1336        let channel = self
1337            .chain_api
1338            .channel_by_id(channel_id)
1339            .await
1340            .map_err(HoprLibError::chain)?
1341            .ok_or(HoprLibError::GeneralError("channel not found".into()))?;
1342
1343        self.transport_api
1344            .ticket_manager()
1345            .redeem_in_channels(
1346                self.chain_api.clone(),
1347                ChannelSelector::default()
1348                    .with_source(channel.source)
1349                    .with_destination(channel.destination)
1350                    .into(),
1351                min_value.into(),
1352                Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1353            )
1354            .await
1355            .map_err(HoprLibError::ticket_manager)?
1356            .map_err(HoprLibError::ticket_manager)
1357            .try_collect::<Vec<_>>()
1358            .await
1359    }
1360
1361    pub fn ticket_management(&self) -> impl TicketManagement + Clone + Send + 'static {
1362        self.transport_api.ticket_manager()
1363    }
1364
1365    pub fn subscribe_ticket_events(&self) -> impl Stream<Item = TicketEvent> + Send + 'static {
1366        self.ticket_event_subscribers.1.activate_cloned()
1367    }
1368
1369    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1370        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1371
1372        self.chain_api
1373            .withdraw(amount, &recipient)
1374            .and_then(identity)
1375            .map_err(HoprLibError::chain)
1376            .await
1377    }
1378
1379    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1380        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1381
1382        self.chain_api
1383            .withdraw(amount, &recipient)
1384            .and_then(identity)
1385            .map_err(HoprLibError::chain)
1386            .await
1387    }
1388}