Skip to main content

hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it;
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25/// Public traits for interactions with this library.
26pub mod traits;
27/// Utility module with helper types and functionality over hopr-lib behavior.
28pub mod utils;
29
30pub use hopr_api as api;
31
32/// Exports of libraries necessary for API and interface operations.
33#[doc(hidden)]
34pub mod exports {
35    pub mod types {
36        pub use hopr_api::types::{chain, internal, primitive};
37    }
38
39    pub mod crypto {
40        pub use hopr_api::types::crypto as types;
41        pub use hopr_crypto_keypair as keypair;
42    }
43
44    pub mod network {
45        pub use hopr_network_types as types;
46    }
47
48    pub use hopr_transport as transport;
49}
50
51/// Export of relevant types for easier integration.
52#[doc(hidden)]
53pub mod prelude {
54    #[cfg(feature = "runtime-tokio")]
55    pub use super::exports::network::types::{
56        prelude::ForeignDataMode,
57        udp::{ConnectedUdpStream, UdpStreamParallelism},
58    };
59    pub use super::exports::{
60        crypto::{
61            keypair::key_pair::HoprKeys,
62            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
63        },
64        transport::{OffchainPublicKey, socket::HoprSocket},
65        types::primitive::prelude::Address,
66    };
67}
68
69use std::{
70    convert::identity,
71    future::Future,
72    sync::{Arc, OnceLock, atomic::Ordering},
73    time::Duration,
74};
75
76use futures::{
77    FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
78    channel::mpsc::{SendError, channel},
79    pin_mut,
80    sink::SinkMapErr,
81};
82use futures_time::future::FutureExt as FuturesTimeFutureExt;
83use hopr_api::{
84    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
86    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
87    node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
88};
89pub use hopr_api::{
90    db::ChannelTicketStatistics,
91    graph::EdgeLinkObservable,
92    network::{NetworkBuilder, NetworkStreamControl},
93    node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
94    types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_network_types::prelude::*;
100#[cfg(all(feature = "telemetry", not(test)))]
101use hopr_platform::time::native::current_time;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110    config::SafeModule,
111    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112    errors::{HoprLibError, HoprStatusError},
113};
114
115/// Public routing configuration for session opening in `hopr-lib`.
116///
117/// This intentionally exposes only hop-count based routing.
118#[cfg(feature = "session-client")]
119#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
120#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
121pub struct HopRouting(
122    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
123    hopr_api::types::primitive::bounded::BoundedSize<
124        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
125    >,
126);
127
128#[cfg(feature = "session-client")]
129impl HopRouting {
130    /// Maximum number of hops that can be configured.
131    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;
132
133    /// Returns the configured number of hops.
134    pub fn hop_count(self) -> usize {
135        self.0.into()
136    }
137}
138
139#[cfg(feature = "session-client")]
140impl TryFrom<usize> for HopRouting {
141    type Error = hopr_api::types::primitive::errors::GeneralError;
142
143    fn try_from(value: usize) -> Result<Self, Self::Error> {
144        Ok(Self(value.try_into()?))
145    }
146}
147
148#[cfg(feature = "session-client")]
149impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
150    fn from(value: HopRouting) -> Self {
151        Self::Hops(value.0)
152    }
153}
154
155/// Session client configuration for `hopr-lib`.
156///
157/// Unlike transport-level configuration, this API intentionally does not expose
158/// explicit intermediate paths.
159#[cfg(feature = "session-client")]
160#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
161pub struct HoprSessionClientConfig {
162    /// Forward route selection policy.
163    pub forward_path: HopRouting,
164    /// Return route selection policy.
165    pub return_path: HopRouting,
166    /// Capabilities offered by the session.
167    #[default(_code = "SessionCapability::Segmentation.into()")]
168    pub capabilities: SessionCapabilities,
169    /// Optional pseudonym used for the session. Mostly useful for testing only.
170    #[default(None)]
171    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
172    /// Enable automatic SURB management for the session.
173    #[default(Some(SurbBalancerConfig::default()))]
174    pub surb_management: Option<SurbBalancerConfig>,
175    /// If set, the maximum number of possible SURBs will always be sent with session data packets.
176    #[default(false)]
177    pub always_max_out_surbs: bool,
178}
179
180#[cfg(feature = "session-client")]
181impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
182    fn from(value: HoprSessionClientConfig) -> Self {
183        Self {
184            forward_path_options: value.forward_path.into(),
185            return_path_options: value.return_path.into(),
186            capabilities: value.capabilities,
187            pseudonym: value.pseudonym,
188            surb_management: value.surb_management,
189            always_max_out_surbs: value.always_max_out_surbs,
190        }
191    }
192}
193
194/// Long-running tasks that are spawned by the HOPR node.
195#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
196pub enum HoprLibProcess {
197    #[strum(to_string = "transport: {0}")]
198    Transport(HoprTransportProcess),
199    #[strum(to_string = "session server providing the exit node session stream functionality")]
200    SessionServer,
201    #[strum(to_string = "ticket redemption queue driver")]
202    TicketRedemptions,
203    #[strum(to_string = "subscription for on-chain channel updates")]
204    ChannelEvents,
205    #[strum(to_string = "on received ticket event (winning or rejected)")]
206    TicketEvents,
207}
208
209#[cfg(all(feature = "telemetry", not(test)))]
210lazy_static::lazy_static! {
211    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
212        "hopr_start_time",
213        "The unix timestamp in seconds at which the process was started"
214    ).unwrap();
215    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
216        "hopr_lib_version",
217        "Executed version of hopr-lib",
218        &["version"]
219    ).unwrap();
220    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
221        "hopr_node_addresses",
222        "Node on-chain and off-chain addresses",
223        &["peerid", "address", "safe_address", "module_address"]
224    ).unwrap();
225}
226
227/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
228///
229/// Divide the available CPU parallelism by 2, since half of the available threads are
230/// to be used for IO-bound and half for CPU-bound tasks.
231#[cfg(feature = "runtime-tokio")]
232pub fn prepare_tokio_runtime(
233    num_cpu_threads: Option<std::num::NonZeroUsize>,
234    num_io_threads: Option<std::num::NonZeroUsize>,
235) -> anyhow::Result<tokio::runtime::Runtime> {
236    use std::str::FromStr;
237    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
238
239    hopr_parallelize::cpu::init_thread_pool(
240        num_cpu_threads
241            .map(|v| v.get())
242            .or(avail_parallelism)
243            .ok_or(anyhow::anyhow!(
244                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
245                 environment variable."
246            ))?
247            .max(1),
248    )?;
249
250    Ok(tokio::runtime::Builder::new_multi_thread()
251        .enable_all()
252        .worker_threads(
253            num_io_threads
254                .map(|v| v.get())
255                .or(avail_parallelism)
256                .ok_or(anyhow::anyhow!(
257                    "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
258                     environment variable."
259                ))?
260                .max(1),
261        )
262        .thread_name("hoprd")
263        .thread_stack_size(
264            std::env::var("HOPRD_THREAD_STACK_SIZE")
265                .ok()
266                .and_then(|v| usize::from_str(&v).ok())
267                .unwrap_or(10 * 1024 * 1024)
268                .max(2 * 1024 * 1024),
269        )
270        .build()?)
271}
272
273/// Type alias used to send and receive transport data via a running HOPR node.
274pub type HoprTransportIO = socket::HoprSocket<
275    futures::channel::mpsc::Receiver<ApplicationDataIn>,
276    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
277>;
278
279type NewTicketEvents = (
280    async_broadcast::Sender<VerifiedTicket>,
281    async_broadcast::InactiveReceiver<VerifiedTicket>,
282);
283
284/// Time to wait until the node's keybinding appears on-chain
285const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
286
287/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
288// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
289const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
290
291/// HOPR main object providing the entire HOPR node functionality
292///
293/// Instantiating this object creates all processes and objects necessary for
294/// running the HOPR node. Once created, the node can be started using the
295/// `run()` method.
296///
297/// Externally offered API should be enough to perform all necessary tasks
298/// with the HOPR node manually, but it is advised to create such a configuration
299/// that manual interaction is unnecessary.
300///
301/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
302pub struct Hopr<Chain, Db, Graph, Net>
303where
304    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
305        + hopr_api::graph::NetworkGraphUpdate
306        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
307        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
308        + Clone
309        + Send
310        + Sync
311        + 'static,
312    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
313        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
314    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
315    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
316{
317    me: OffchainKeypair,
318    cfg: config::HoprLibConfig,
319    state: Arc<api::node::state::AtomicHoprState>,
320    transport_api: HoprTransport<Chain, Db, Graph, Net>,
321    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
322    node_db: Db,
323    chain_api: Chain,
324    winning_ticket_subscribers: NewTicketEvents,
325    processes: OnceLock<AbortableList<HoprLibProcess>>,
326}
327
328impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
329where
330    Chain: HoprChainApi + Clone + Send + Sync + 'static,
331    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
332    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
333        + hopr_api::graph::NetworkGraphUpdate
334        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
335        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
336        + Clone
337        + Send
338        + Sync
339        + 'static,
340    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
341        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
342    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
343    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
344{
345    pub async fn new(
346        identity: (&ChainKeypair, &OffchainKeypair),
347        hopr_chain_api: Chain,
348        hopr_node_db: Db,
349        graph: Graph,
350        cfg: config::HoprLibConfig,
351    ) -> errors::Result<Self> {
352        if hopr_api::types::crypto_random::is_rng_fixed() {
353            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
354        }
355
356        cfg.validate()?;
357
358        let hopr_transport_api = HoprTransport::new(
359            identity,
360            hopr_chain_api.clone(),
361            hopr_node_db.clone(),
362            graph,
363            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
364            cfg.protocol,
365        )
366        .map_err(HoprLibError::TransportError)?;
367
368        #[cfg(all(feature = "telemetry", not(test)))]
369        {
370            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
371            METRIC_HOPR_LIB_VERSION.set(
372                &[const_format::formatcp!("{}", constants::APP_VERSION)],
373                const_format::formatcp!(
374                    "{}.{}",
375                    env!("CARGO_PKG_VERSION_MAJOR"),
376                    env!("CARGO_PKG_VERSION_MINOR")
377                )
378                .parse()
379                .unwrap_or(0.0),
380            );
381
382            // Calling get_ticket_statistics will initialize the respective metrics on tickets
383            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
384                error!(%error, "failed to initialize ticket statistics metrics");
385            }
386        }
387
388        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
389        new_tickets_tx.set_await_active(false);
390        new_tickets_tx.set_overflow(true);
391
392        Ok(Self {
393            me: identity.1.clone(),
394            cfg,
395            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
396            transport_api: hopr_transport_api,
397            chain_api: hopr_chain_api,
398            node_db: hopr_node_db,
399            redeem_requests: OnceLock::new(),
400            processes: OnceLock::new(),
401            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
402        })
403    }
404
405    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
406        if HoprNodeOperations::status(self) == state {
407            Ok(())
408        } else {
409            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
410        }
411    }
412
413    pub fn config(&self) -> &config::HoprLibConfig {
414        &self.cfg
415    }
416
417    /// Returns a reference to the network graph.
418    pub fn graph(&self) -> &Graph {
419        self.transport_api.graph()
420    }
421
422    #[inline]
423    fn is_public(&self) -> bool {
424        self.cfg.publish
425    }
426
427    pub async fn run<
428        Ct,
429        NetBuilder,
430        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
431    >(
432        &self,
433        cover_traffic: Ct,
434        network_builder: NetBuilder,
435        #[cfg(feature = "session-server")] serve_handler: T,
436    ) -> errors::Result<HoprTransportIO>
437    where
438        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
439        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
440    {
441        self.error_if_not_in_state(
442            HoprState::Uninitialized,
443            "cannot start the hopr node multiple times".into(),
444        )?;
445
446        #[cfg(feature = "testing")]
447        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
448
449        let me_onchain = *self.chain_api.me();
450        info!(
451            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
452            "node is not started, please fund this node",
453        );
454
455        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
456        helpers::wait_for_funds(
457            *MIN_NATIVE_BALANCE,
458            *SUGGESTED_NATIVE_BALANCE,
459            Duration::from_secs(200),
460            me_onchain,
461            &self.chain_api,
462        )
463        .await?;
464
465        let mut processes = AbortableList::<HoprLibProcess>::default();
466
467        info!("starting HOPR node...");
468        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
469
470        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
471        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
472
473        info!(
474            address = %me_onchain,
475            %balance,
476            %minimum_balance,
477            "node information"
478        );
479
480        if balance.le(&minimum_balance) {
481            return Err(HoprLibError::GeneralError(
482                "cannot start the node without a sufficiently funded wallet".into(),
483            ));
484        }
485
486        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
487
488        // Once we are able to query the chain,
489        // check if the ticket price is configured correctly.
490        let network_min_ticket_price = self
491            .chain_api
492            .minimum_ticket_price()
493            .await
494            .map_err(HoprLibError::chain)?;
495        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
496        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
497            return Err(HoprLibError::GeneralError(format!(
498                "configured outgoing ticket price is lower than the network minimum ticket price: \
499                 {configured_ticket_price:?} < {network_min_ticket_price}"
500            )));
501        }
502        // Once we are able to query the chain,
503        // check if the winning probability is configured correctly.
504        let network_min_win_prob = self
505            .chain_api
506            .minimum_incoming_ticket_win_prob()
507            .await
508            .map_err(HoprLibError::chain)?;
509        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
510        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
511            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
512        {
513            return Err(HoprLibError::GeneralError(format!(
514                "configured outgoing ticket winning probability is lower than the network minimum winning \
515                 probability: {configured_win_prob:?} < {network_min_win_prob}"
516            )));
517        }
518
519        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
520
521        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
522
523        let safe_addr = self.cfg.safe_module.safe_address;
524
525        if self.me_onchain() == safe_addr {
526            return Err(HoprLibError::GeneralError(
527                "cannot use self as staking safe address".into(),
528            ));
529        }
530
531        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
532        info!(%safe_addr, "registering safe with this node");
533        match self.chain_api.register_safe(&safe_addr).await {
534            Ok(awaiter) => {
535                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
536                awaiter.await.map_err(|error| {
537                    error!(%safe_addr, %error, "safe registration failed with error");
538                    HoprLibError::chain(error)
539                })?;
540                info!(%safe_addr, "safe successfully registered with this node");
541            }
542            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
543                info!(%safe_addr, "this safe is already registered with this node");
544            }
545            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
546                // TODO: support safe deregistration flow
547                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
548                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
549            }
550            Err(error) => {
551                error!(%safe_addr, %error, "safe registration failed");
552                return Err(HoprLibError::chain(error));
553            }
554        }
555
556        // Only public nodes announce multiaddresses
557        let multiaddresses_to_announce = if self.is_public() {
558            // The multiaddresses are filtered for the non-private ones,
559            // unless `announce_local_addresses` is set to `true`.
560            self.transport_api.announceable_multiaddresses()
561        } else {
562            Vec::with_capacity(0)
563        };
564
565        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
566        multiaddresses_to_announce
567            .iter()
568            .filter(|a| !is_public_address(a))
569            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
570
571        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
572
573        let chain_api = self.chain_api.clone();
574        let me_offchain = *self.me.public();
575        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
576
577        // At this point the node is already registered with Safe, so
578        // we can announce via Safe-compliant TX
579        info!(?multiaddresses_to_announce, "announcing node on chain");
580        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
581            Ok(awaiter) => {
582                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
583                awaiter.await.map_err(|error| {
584                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
585                    HoprLibError::chain(error)
586                })?;
587                info!(?multiaddresses_to_announce, "node has been successfully announced");
588            }
589            Err(AnnouncementError::AlreadyAnnounced) => {
590                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
591            }
592            Err(error) => {
593                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
594                return Err(HoprLibError::chain(error));
595            }
596        }
597
598        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
599
600        // Wait for the node key-binding readiness to return
601        let this_node_account = node_ready
602            .await
603            .map_err(HoprLibError::other)?
604            .map_err(HoprLibError::chain)?;
605        if this_node_account.chain_addr != self.me_onchain()
606            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
607        {
608            error!(%this_node_account, "account bound to offchain key does not match this node");
609            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
610        }
611
612        info!(%this_node_account, "node account is ready");
613
614        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
615
616        info!("initializing session infrastructure");
617        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
618            .ok()
619            .and_then(|s| s.trim().parse::<usize>().ok())
620            .filter(|&c| c > 0)
621            .unwrap_or(256);
622
623        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
624        #[cfg(feature = "session-server")]
625        {
626            debug!(capacity = incoming_session_channel_capacity, "creating session server");
627            processes.insert(
628                HoprLibProcess::SessionServer,
629                hopr_async_runtime::spawn_as_abortable!(
630                    _session_rx
631                        .for_each_concurrent(None, move |session| {
632                            let serve_handler = serve_handler.clone();
633                            async move {
634                                let session_id = *session.session.id();
635                                match serve_handler.process(session).await {
636                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
637                                    Err(error) => error!(
638                                        ?session_id,
639                                        %error,
640                                        "client session processing failed"
641                                    ),
642                                }
643                            }
644                        })
645                        .inspect(|_| tracing::warn!(
646                            task = %HoprLibProcess::SessionServer,
647                            "long-running background task finished"
648                        ))
649                ),
650            );
651        }
652
653        info!("starting ticket events processor");
654        let (tickets_tx, tickets_rx) = channel(8192);
655        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
656        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
657        let node_db = self.node_db.clone();
658        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
659        spawn(
660            tickets_rx
661                .filter_map(move |ticket_event| {
662                    let node_db = node_db.clone();
663                    async move {
664                        match ticket_event {
665                            TicketEvent::WinningTicket(winning) => {
666                                if let Err(error) = node_db.insert_ticket(*winning).await {
667                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
668                                } else {
669                                    tracing::debug!(%winning, "inserted ticket into database");
670                                }
671                                Some(winning)
672                            }
673                            TicketEvent::RejectedTicket(rejected, issuer) => {
674                                if let Some(issuer) = &issuer {
675                                    if let Err(error) =
676                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
677                                    {
678                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
679                                    } else {
680                                        tracing::debug!(%rejected, "marked ticket as rejected");
681                                    }
682                                } else {
683                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
684                                }
685                                None
686                            }
687                        }
688                    }
689                })
690                .for_each(move |ticket| {
691                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
692                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
693                    }
694                    futures::future::ready(())
695                })
696                .inspect(|_| {
697                    tracing::warn!(
698                        task = %HoprLibProcess::TicketEvents,
699                        "long-running background task finished"
700                    )
701                }),
702        );
703
704        info!("starting transport");
705        let (hopr_socket, transport_processes) = self
706            .transport_api
707            .run(cover_traffic, network_builder, tickets_tx, session_tx)
708            .await?;
709        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
710
711        info!("starting ticket redemption service");
712        // Start a queue that takes care of redeeming tickets via given TicketSelectors
713        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
714        let _ = self.redeem_requests.set(redemption_req_tx);
715        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
716        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
717        let chain = self.chain_api.clone();
718        let node_db = self.node_db.clone();
719        spawn(redemption_req_rx
720            .for_each(move |selector| {
721                let chain = chain.clone();
722                let db = node_db.clone();
723                async move {
724                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
725                        Ok(res) => debug!(%res, "redemption complete"),
726                        Err(error) => error!(%error, "redemption failed"),
727                    }
728                }
729            })
730            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
731        );
732
733        info!("subscribing to channel events");
734        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
735        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
736        let chain = self.chain_api.clone();
737        let node_db = self.node_db.clone();
738        let events = chain.subscribe().map_err(HoprLibError::chain)?;
739        spawn(
740            futures::stream::Abortable::new(
741                events
742                    .filter_map(move |event|
743                        futures::future::ready(event.try_as_channel_closed())
744                    ),
745                chain_events_sub_reg
746            )
747            .for_each(move |closed_channel| {
748                let node_db = node_db.clone();
749                let chain = chain.clone();
750                async move {
751                    match closed_channel.direction(chain.me()) {
752                        Some(ChannelDirection::Incoming) => {
753                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
754                                Ok(num_neglected) if num_neglected > 0 => {
755                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
756                                },
757                                Ok(_) => {
758                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
759                                },
760                                Err(error) => {
761                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
762                                }
763                            }
764                        },
765                        Some(ChannelDirection::Outgoing) => {
766                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
767                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
768                            } else {
769                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
770                            }
771                        }
772                        _ => {} // Event for a channel that is not our own
773                    }
774                }
775            })
776            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
777        );
778
779        info!("synchronizing ticket states");
780        // NOTE: after the chain is synced, we can reset tickets which are considered
781        // redeemed but on-chain state does not align with that. This implies there was a problem
782        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
783        // handle errors appropriately.
784        let mut channels = self
785            .chain_api
786            .stream_channels(ChannelSelector {
787                destination: self.me_onchain().into(),
788                ..Default::default()
789            })
790            .map_err(HoprLibError::chain)
791            .await?;
792
793        while let Some(channel) = channels.next().await {
794            // Set the state of all unredeemed tickets with a higher index than the current
795            // channel index as untouched.
796            self.node_db
797                .update_ticket_states_and_fetch(
798                    [TicketSelector::from(&channel)
799                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
800                        .with_index_range(channel.ticket_index..)],
801                    AcknowledgedTicketStatus::Untouched,
802                )
803                .map_err(HoprLibError::db)
804                .await?
805                .for_each(|ticket| {
806                    info!(%ticket, "fixed next out-of-sync ticket");
807                    futures::future::ready(())
808                })
809                .await;
810
811            // Mark all the tickets with a lower ticket index than the current channel index as neglected.
812            self.node_db
813                .mark_tickets_as(
814                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
815                    TicketMarker::Neglected,
816                )
817                .map_err(HoprLibError::db)
818                .await?;
819        }
820
821        self.state.store(HoprState::Running, Ordering::Relaxed);
822
823        info!(
824            id = %self.me_peer_id(),
825            version = constants::APP_VERSION,
826            "NODE STARTED AND RUNNING"
827        );
828
829        #[cfg(all(feature = "telemetry", not(test)))]
830        METRIC_HOPR_NODE_INFO.set(
831            &[
832                &self.me.public().to_peerid_str(),
833                &me_onchain.to_string(),
834                &self.cfg.safe_module.safe_address.to_string(),
835                &self.cfg.safe_module.module_address.to_string(),
836            ],
837            1.0,
838        );
839
840        let _ = self.processes.set(processes);
841        Ok(hopr_socket)
842    }
843
844    /// Used to practically shut down all node's processes without dropping the instance.
845    ///
846    /// This means that the instance can be used to retrieve some information, but all
847    /// active operations will stop and new will be impossible to perform.
848    /// Such operations will return [`HoprStatusError::NotThereYet`].
849    ///
850    /// This is the final state and cannot be reversed by calling `run` again.
851    pub fn shutdown(&self) -> Result<(), HoprLibError> {
852        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
853        if let Some(processes) = self.processes.get() {
854            processes.abort_all();
855        }
856        self.state.store(HoprState::Terminated, Ordering::Relaxed);
857        info!("NODE SHUTDOWN COMPLETE");
858        Ok(())
859    }
860
861    /// Create a client session connection returning a session object that implements
862    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
863    #[cfg(feature = "session-client")]
864    pub async fn connect_to(
865        &self,
866        destination: Address,
867        target: SessionTarget,
868        cfg: HoprSessionClientConfig,
869    ) -> errors::Result<HoprSession> {
870        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
871
872        let backoff = backon::ConstantBuilder::default()
873            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
874            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
875            .with_jitter();
876
877        use backon::Retryable;
878
879        Ok((|| {
880            let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
881            let target = target.clone();
882            async { self.transport_api.new_session(destination, target, cfg).await }
883        })
884        .retry(backoff)
885        .sleep(backon::FuturesTimerSleeper)
886        .await?)
887    }
888
889    /// Sends keep-alive to the given [`SessionId`], making sure the session is not
890    /// closed due to inactivity.
891    #[cfg(feature = "session-client")]
892    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
893        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
894        Ok(self.transport_api.probe_session(id).await?)
895    }
896
897    #[cfg(feature = "session-client")]
898    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
899        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
900        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
901    }
902
903    #[cfg(feature = "session-client")]
904    pub async fn update_session_surb_balancer_config(
905        &self,
906        id: &SessionId,
907        cfg: SurbBalancerConfig,
908    ) -> errors::Result<()> {
909        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
910        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
911    }
912
913    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
914    /// successfully or timing out after `timeout`.
915    fn spawn_wait_for_on_chain_event(
916        &self,
917        context: impl std::fmt::Display,
918        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
919        timeout: Duration,
920    ) -> errors::Result<(
921        impl Future<Output = errors::Result<ChainEvent>>,
922        hopr_async_runtime::AbortHandle,
923    )> {
924        debug!(%context, "registering wait for on-chain event");
925        let (event_stream, handle) = futures::stream::abortable(
926            self.chain_api
927                .subscribe()
928                .map_err(HoprLibError::chain)?
929                .skip_while(move |event| futures::future::ready(!predicate(event))),
930        );
931
932        let ctx = context.to_string();
933
934        Ok((
935            spawn(async move {
936                pin_mut!(event_stream);
937                let res = event_stream
938                    .next()
939                    .timeout(futures_time::time::Duration::from(timeout))
940                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
941                    .await?
942                    .ok_or(HoprLibError::GeneralError(format!(
943                        "failed to yield an on-chain event for {ctx}"
944                    )));
945                debug!(%ctx, ?res, "on-chain event waiting done");
946                res
947            })
948            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
949            .and_then(futures::future::ready),
950            handle,
951        ))
952    }
953}
954
955impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
956where
957    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
958        + hopr_api::graph::NetworkGraphUpdate
959        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
960        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
961        + Clone
962        + Send
963        + Sync
964        + 'static,
965    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
966        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
967    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
968    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
969{
970    // === telemetry
971    /// Prometheus formatted metrics collected by the hopr-lib components.
972    pub fn collect_hopr_metrics() -> errors::Result<String> {
973        cfg_if::cfg_if! {
974            if #[cfg(all(feature = "telemetry", not(test)))] {
975                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
976            } else {
977                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
978            }
979        }
980    }
981}
982
983// === Trait implementations for the high-level node API ===
984
985impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
986where
987    Chain: HoprChainApi + Clone + Send + Sync + 'static,
988    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
989    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
990        + hopr_api::graph::NetworkGraphUpdate
991        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
992        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
993        + Clone
994        + Send
995        + Sync
996        + 'static,
997    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
998        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
999    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1000    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1001{
1002    fn status(&self) -> HoprState {
1003        self.state.load(Ordering::Relaxed)
1004    }
1005}
1006
1007#[async_trait::async_trait]
1008impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
1009where
1010    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1011    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1012    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1013        + hopr_api::graph::NetworkGraphUpdate
1014        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1015        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1016        + Clone
1017        + Send
1018        + Sync
1019        + 'static,
1020    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1021        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1022    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1023    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1024{
1025    type Error = HoprLibError;
1026    type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1027
1028    fn me_peer_id(&self) -> PeerId {
1029        (*self.me.public()).into()
1030    }
1031
1032    async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1033        Ok(self
1034            .chain_api
1035            .stream_accounts(AccountSelector {
1036                public_only: true,
1037                ..Default::default()
1038            })
1039            .map_err(HoprLibError::chain)
1040            .await?
1041            .map(|entry| {
1042                (
1043                    PeerId::from(entry.public_key),
1044                    entry.chain_addr,
1045                    entry.get_multiaddrs().to_vec(),
1046                )
1047            })
1048            .collect()
1049            .await)
1050    }
1051
1052    async fn network_health(&self) -> hopr_api::network::Health {
1053        self.transport_api.network_health().await
1054    }
1055
1056    async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1057        Ok(self
1058            .transport_api
1059            .network_connected_peers()
1060            .await?
1061            .into_iter()
1062            .map(PeerId::from)
1063            .collect())
1064    }
1065
1066    fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1067        let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1068        self.transport_api.network_peer_observations(&pubkey)
1069    }
1070
1071    async fn all_network_peers(
1072        &self,
1073        minimum_score: f64,
1074    ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1075        Ok(
1076            futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1077                .filter_map(|(pubkey, info)| async move {
1078                    let peer_id = PeerId::from(pubkey);
1079                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1080                    Some((address, peer_id, info))
1081                })
1082                .collect::<Vec<_>>()
1083                .await,
1084        )
1085    }
1086
1087    fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1088        self.transport_api.local_multiaddresses()
1089    }
1090
1091    async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1092        self.transport_api.listening_multiaddresses().await
1093    }
1094
1095    async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1096        let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1097            return vec![];
1098        };
1099        self.transport_api.network_observed_multiaddresses(&pubkey).await
1100    }
1101
1102    async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1103        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1104            .await
1105            .map_err(HoprLibError::TransportError)?;
1106
1107        match self
1108            .chain_api
1109            .stream_accounts(AccountSelector {
1110                public_only: false,
1111                offchain_key: Some(pubkey),
1112                ..Default::default()
1113            })
1114            .map_err(HoprLibError::chain)
1115            .await?
1116            .next()
1117            .await
1118        {
1119            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1120            None => {
1121                error!(%peer, "no information");
1122                Ok(vec![])
1123            }
1124        }
1125    }
1126
1127    async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1128        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1129        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1130            .await
1131            .map_err(HoprLibError::TransportError)?;
1132        Ok(self.transport_api.ping(&pubkey).await?)
1133    }
1134}
1135
1136pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1137
1138impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1139where
1140    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1141    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1142    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1143        + hopr_api::graph::NetworkGraphUpdate
1144        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1145        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1146        + Clone
1147        + Send
1148        + Sync
1149        + 'static,
1150    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1151        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1152    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1153    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1154{
1155    pub fn me_onchain(&self) -> Address {
1156        *self.chain_api.me()
1157    }
1158
1159    pub fn get_safe_config(&self) -> SafeModuleConfig {
1160        SafeModuleConfig {
1161            safe_address: self.cfg.safe_module.safe_address,
1162            module_address: self.cfg.safe_module.module_address,
1163        }
1164    }
1165
1166    pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1167        self.chain_api
1168            .balance(self.me_onchain())
1169            .await
1170            .map_err(HoprLibError::chain)
1171    }
1172
1173    pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1174        self.chain_api
1175            .balance(self.cfg.safe_module.safe_address)
1176            .await
1177            .map_err(HoprLibError::chain)
1178    }
1179
1180    pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1181        self.chain_api
1182            .safe_allowance(self.cfg.safe_module.safe_address)
1183            .await
1184            .map_err(HoprLibError::chain)
1185    }
1186
1187    pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1188        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1189    }
1190
1191    pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1192        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1193    }
1194
1195    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1196        self.chain_api
1197            .minimum_incoming_ticket_win_prob()
1198            .await
1199            .map_err(HoprLibError::chain)
1200    }
1201
1202    pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1203        self.chain_api
1204            .channel_closure_notice_period()
1205            .await
1206            .map_err(HoprLibError::chain)
1207    }
1208
1209    pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1210        Ok(self
1211            .chain_api
1212            .stream_accounts(AccountSelector {
1213                public_only: true,
1214                ..Default::default()
1215            })
1216            .map_err(HoprLibError::chain)
1217            .await?
1218            .collect()
1219            .await)
1220    }
1221
1222    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1223        let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1224            .await
1225            .map_err(HoprLibError::TransportError)?;
1226
1227        self.chain_api
1228            .packet_key_to_chain_key(&pubkey)
1229            .await
1230            .map_err(HoprLibError::chain)
1231    }
1232
1233    pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1234        self.chain_api
1235            .chain_key_to_packet_key(address)
1236            .await
1237            .map(|pk| pk.map(|v| v.into()))
1238            .map_err(HoprLibError::chain)
1239    }
1240
1241    pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1242        self.chain_api
1243            .channel_by_id(channel_id)
1244            .await
1245            .map_err(HoprLibError::chain)
1246    }
1247
1248    pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1249        self.chain_api
1250            .channel_by_parties(src, dest)
1251            .await
1252            .map_err(HoprLibError::chain)
1253    }
1254
1255    pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1256        Ok(self
1257            .chain_api
1258            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1259                ChannelStatusDiscriminants::Closed,
1260                ChannelStatusDiscriminants::Open,
1261                ChannelStatusDiscriminants::PendingToClose,
1262            ]))
1263            .map_err(HoprLibError::chain)
1264            .await?
1265            .collect()
1266            .await)
1267    }
1268
1269    pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1270        Ok(self
1271            .chain_api
1272            .stream_channels(
1273                ChannelSelector::default()
1274                    .with_destination(*dest)
1275                    .with_allowed_states(&[
1276                        ChannelStatusDiscriminants::Closed,
1277                        ChannelStatusDiscriminants::Open,
1278                        ChannelStatusDiscriminants::PendingToClose,
1279                    ]),
1280            )
1281            .map_err(HoprLibError::chain)
1282            .await?
1283            .collect()
1284            .await)
1285    }
1286
1287    pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1288        Ok(self
1289            .chain_api
1290            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1291                ChannelStatusDiscriminants::Closed,
1292                ChannelStatusDiscriminants::Open,
1293                ChannelStatusDiscriminants::PendingToClose,
1294            ]))
1295            .map_err(HoprLibError::chain)
1296            .await?
1297            .collect()
1298            .await)
1299    }
1300
1301    pub async fn open_channel(
1302        &self,
1303        destination: &Address,
1304        amount: HoprBalance,
1305    ) -> Result<OpenChannelResult, HoprLibError> {
1306        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1307
1308        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1309
1310        // Subscribe to chain events BEFORE sending the transaction to avoid
1311        // a race where the event is broadcast before the subscriber activates.
1312        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1313            format!("open channel to {destination} ({channel_id})"),
1314            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1315            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1316        )?;
1317
1318        let confirm_awaiter = self
1319            .chain_api
1320            .open_channel(destination, amount)
1321            .await
1322            .map_err(HoprLibError::chain)?;
1323
1324        let tx_hash = confirm_awaiter.await.map_err(|e| {
1325            event_abort.abort();
1326            HoprLibError::chain(e)
1327        })?;
1328
1329        let event = event_awaiter.await?;
1330        debug!(%event, "open channel event received");
1331
1332        Ok(OpenChannelResult { tx_hash, channel_id })
1333    }
1334
1335    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1336        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1337
1338        let channel_id = *channel_id;
1339
1340        // Subscribe to chain events BEFORE sending the transaction to avoid
1341        // a race where the event is broadcast before the subscriber activates.
1342        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1343            format!("fund channel {channel_id}"),
1344            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1345            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1346        )?;
1347
1348        let confirm_awaiter = self
1349            .chain_api
1350            .fund_channel(&channel_id, amount)
1351            .await
1352            .map_err(HoprLibError::chain)?;
1353
1354        let res = confirm_awaiter.await.map_err(|e| {
1355            event_abort.abort();
1356            HoprLibError::chain(e)
1357        })?;
1358
1359        let event = event_awaiter.await?;
1360        debug!(%event, "fund channel event received");
1361
1362        Ok(res)
1363    }
1364
1365    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1366        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1367
1368        let channel_id = *channel_id;
1369
1370        // Subscribe to chain events BEFORE sending the transaction to avoid
1371        // a race where the event is broadcast before the subscriber activates.
1372        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1373            format!("close channel {channel_id}"),
1374            move |event| {
1375                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1376                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1377            },
1378            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1379        )?;
1380
1381        let confirm_awaiter = self
1382            .chain_api
1383            .close_channel(&channel_id)
1384            .await
1385            .map_err(HoprLibError::chain)?;
1386
1387        let tx_hash = confirm_awaiter.await.map_err(|e| {
1388            event_abort.abort();
1389            HoprLibError::chain(e)
1390        })?;
1391
1392        let event = event_awaiter.await?;
1393        debug!(%event, "close channel event received");
1394
1395        Ok(CloseChannelResult { tx_hash })
1396    }
1397
1398    pub async fn tickets_in_channel(
1399        &self,
1400        channel_id: &ChannelId,
1401    ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1402        if let Some(channel) = self
1403            .chain_api
1404            .channel_by_id(channel_id)
1405            .await
1406            .map_err(|e| HoprTransportError::Other(e.into()))?
1407        {
1408            if &channel.destination == self.chain_api.me() {
1409                Ok(Some(
1410                    self.node_db
1411                        .stream_tickets([&channel])
1412                        .await
1413                        .map_err(HoprLibError::db)?
1414                        .collect()
1415                        .await,
1416                ))
1417            } else {
1418                Ok(None)
1419            }
1420        } else {
1421            Ok(None)
1422        }
1423    }
1424
1425    pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1426        Ok(self
1427            .node_db
1428            .stream_tickets(None::<TicketSelector>)
1429            .await
1430            .map_err(HoprLibError::db)?
1431            .map(|v| v.ticket)
1432            .collect()
1433            .await)
1434    }
1435
1436    pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1437        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1438    }
1439
1440    pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1441        self.node_db
1442            .reset_ticket_statistics()
1443            .await
1444            .map_err(HoprLibError::chain)
1445    }
1446
1447    pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1448        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1449
1450        let min_value = min_value.into();
1451
1452        self.chain_api
1453            .stream_channels(
1454                ChannelSelector::default()
1455                    .with_destination(self.me_onchain())
1456                    .with_allowed_states(&[
1457                        ChannelStatusDiscriminants::Open,
1458                        ChannelStatusDiscriminants::PendingToClose,
1459                    ]),
1460            )
1461            .map_err(HoprLibError::chain)
1462            .await?
1463            .map(|channel| {
1464                Ok(TicketSelector::from(&channel)
1465                    .with_amount(min_value..)
1466                    .with_index_range(channel.ticket_index..)
1467                    .with_state(AcknowledgedTicketStatus::Untouched))
1468            })
1469            .forward(self.redemption_requests()?)
1470            .await?;
1471
1472        Ok(())
1473    }
1474
1475    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1476        &self,
1477        counterparty: &Address,
1478        min_value: B,
1479    ) -> Result<(), HoprLibError> {
1480        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1481            .await
1482    }
1483
1484    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1485        &self,
1486        channel_id: &Hash,
1487        min_value: B,
1488    ) -> Result<(), HoprLibError> {
1489        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1490
1491        let channel = self
1492            .chain_api
1493            .channel_by_id(channel_id)
1494            .await
1495            .map_err(HoprLibError::chain)?
1496            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1497
1498        self.redemption_requests()?
1499            .send(
1500                TicketSelector::from(channel)
1501                    .with_amount(min_value.into()..)
1502                    .with_index_range(channel.ticket_index..)
1503                    .with_state(AcknowledgedTicketStatus::Untouched),
1504            )
1505            .await?;
1506
1507        Ok(())
1508    }
1509
1510    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1511        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1512
1513        self.redemption_requests()?
1514            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1515            .await?;
1516
1517        Ok(())
1518    }
1519
1520    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
1521        self.winning_ticket_subscribers.1.activate_cloned()
1522    }
1523
1524    pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1525        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1526
1527        Ok(self
1528            .redeem_requests
1529            .get()
1530            .cloned()
1531            .expect("redeem_requests is not initialized")
1532            .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1533    }
1534
1535    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1536        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1537
1538        self.chain_api
1539            .withdraw(amount, &recipient)
1540            .and_then(identity)
1541            .map_err(HoprLibError::chain)
1542            .await
1543    }
1544
1545    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1546        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1547
1548        self.chain_api
1549            .withdraw(amount, &recipient)
1550            .and_then(identity)
1551            .map_err(HoprLibError::chain)
1552            .await
1553    }
1554}