Skip to main content

hopr_lib/
lib.rs

//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
//! as well as integrated into other systems and libraries.
//!
//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
//! it will perform its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities, while still
//! providing a complete and fully featured HOPR node stripped of top-level
//! functionality such as the REST API or key management.
//!
//! The intended way to use `hopr_lib` is to build a specific tool on top of it,
//! should the default `hoprd` implementation not be acceptable.
//!
//! For most practical use cases, the `hoprd` application should be the preferable
//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25/// Public traits for interactions with this library.
26pub mod traits;
27/// Utility module with helper types and functionality over hopr-lib behavior.
28pub mod utils;
29
30pub use hopr_api as api;
31
32/// Exports of libraries necessary for API and interface operations.
33#[doc(hidden)]
34pub mod exports {
35    pub mod types {
36        pub use hopr_api::types::{chain, internal, primitive};
37    }
38
39    pub mod crypto {
40        pub use hopr_api::types::crypto as types;
41        pub use hopr_crypto_keypair as keypair;
42    }
43
44    pub mod network {
45        pub use hopr_network_types as types;
46    }
47
48    pub use hopr_transport as transport;
49}
50
51/// Export of relevant types for easier integration.
52#[doc(hidden)]
53pub mod prelude {
54    #[cfg(feature = "runtime-tokio")]
55    pub use super::exports::network::types::{
56        prelude::ForeignDataMode,
57        udp::{ConnectedUdpStream, UdpStreamParallelism},
58    };
59    pub use super::exports::{
60        crypto::{
61            keypair::key_pair::HoprKeys,
62            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
63        },
64        transport::{OffchainPublicKey, socket::HoprSocket},
65        types::primitive::prelude::Address,
66    };
67}
68
69use std::{
70    convert::identity,
71    future::Future,
72    sync::{Arc, OnceLock, atomic::Ordering},
73    time::Duration,
74};
75
76use futures::{
77    FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
78    channel::mpsc::{SendError, channel},
79    pin_mut,
80    sink::SinkMapErr,
81};
82use futures_time::future::FutureExt as FuturesTimeFutureExt;
83use hopr_api::{
84    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
86    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
87    node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
88};
89pub use hopr_api::{
90    db::ChannelTicketStatistics,
91    graph::EdgeLinkObservable,
92    network::{NetworkBuilder, NetworkStreamControl},
93    node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
94    types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_network_types::prelude::*;
100#[cfg(all(feature = "telemetry", not(test)))]
101use hopr_platform::time::native::current_time;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110    config::SafeModule,
111    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112    errors::{HoprLibError, HoprStatusError},
113};
114
/// Routing selection offered by `hopr-lib` when opening a session.
///
/// Only a hop count can be chosen through this type. The wrapped value is
/// bounded by `RoutingOptions::MAX_INTERMEDIATE_HOPS` and defaults to
/// `BoundedSize::MIN`.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
127
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Largest hop count that can be configured; mirrors
    /// `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Number of hops this routing is configured with.
    pub fn hop_count(self) -> usize {
        let Self(hops) = self;
        hops.into()
    }
}
138
/// Builds a [`HopRouting`] from a plain hop count, failing when the bounded
/// conversion of the value fails.
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    fn try_from(value: usize) -> Result<Self, Self::Error> {
        // The target bounded type is inferred from the tuple field below.
        let hops = value.try_into()?;
        Ok(Self(hops))
    }
}
147
/// Maps the hop-count routing onto the `Hops` variant of the routing options.
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
154
/// Configuration of a client session opened through `hopr-lib`.
///
/// In contrast to the transport-level configuration, explicit intermediate
/// paths are deliberately not exposed here; both directions are described by
/// a [`HopRouting`] only.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Route selection policy for the forward direction.
    pub forward_path: HopRouting,
    /// Route selection policy for the return direction.
    pub return_path: HopRouting,
    /// Capabilities offered by the session; defaults to segmentation only.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym to use for the session (mostly useful for testing); unset by default.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Automatic SURB management settings; enabled with the default balancer configuration
    /// unless overridden, `None` turns it off.
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// When set, session data packets always carry the maximum possible number of SURBs.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
179
/// Converts the `hopr-lib` session configuration into the transport-level one.
///
/// The two hop-count routings become the forward/return path options; all
/// remaining fields are carried over unchanged.
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    fn from(value: HoprSessionClientConfig) -> Self {
        // Exhaustive destructuring: a newly added config field must be handled here.
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
193
194/// Long-running tasks that are spawned by the HOPR node.
195#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
196pub enum HoprLibProcess {
197    #[strum(to_string = "transport: {0}")]
198    Transport(HoprTransportProcess),
199    #[strum(to_string = "session server providing the exit node session stream functionality")]
200    SessionServer,
201    #[strum(to_string = "ticket redemption queue driver")]
202    TicketRedemptions,
203    #[strum(to_string = "subscription for on-chain channel updates")]
204    ChannelEvents,
205    #[strum(to_string = "on received ticket event (winning or rejected)")]
206    TicketEvents,
207}
208
// Process-wide telemetry gauges; compiled only with the `telemetry` feature and outside of tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Unix timestamp of the process start.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_start_time",
            "The unix timestamp in seconds at which the process was started",
        )
        .unwrap();

    // Version of hopr-lib being executed, labelled by the version string.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new(
            "hopr_lib_version",
            "Executed version of hopr-lib",
            &["version"],
        )
        .unwrap();

    // On-chain and off-chain addresses of this node, exposed as labels.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new(
            "hopr_node_addresses",
            "Node on-chain and off-chain addresses",
            &["peerid", "address", "safe_address", "module_address"],
        )
        .unwrap();
}
226
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// When a thread count is not given explicitly, half of the available CPU parallelism
/// is used for it, since half of the available threads are to be used for IO-bound and
/// half for CPU-bound tasks. Either count is clamped to at least one thread.
///
/// Besides building the runtime, this function also initializes the CPU-bound thread pool
/// via `hopr_parallelize::cpu::init_thread_pool`.
///
/// The stack size of the runtime worker threads can be overridden (in bytes) with the
/// `HOPRD_THREAD_STACK_SIZE` environment variable. An unset or unparsable value falls back
/// to 10 MiB, and values below 2 MiB are raised to 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count is neither given nor determinable from the system,
/// if the CPU thread pool initialization fails, or if the tokio runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    /// Stack size used when `HOPRD_THREAD_STACK_SIZE` is unset or invalid.
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    /// Smallest stack size that is ever handed to the runtime.
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Half of the detected parallelism; may be 0 on a single-core machine, hence the `max(1)` below.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // `ok_or_else` keeps the error construction off the success path.
    let cpu_threads = num_cpu_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    // Resolved only after the CPU pool is initialized, preserving the original failure order.
    let io_threads = num_io_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?;

    Ok(runtime)
}
272
273/// Type alias used to send and receive transport data via a running HOPR node.
274pub type HoprTransportIO = socket::HoprSocket<
275    futures::channel::mpsc::Receiver<ApplicationDataIn>,
276    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
277>;
278
279type NewTicketEvents = (
280    async_broadcast::Sender<VerifiedTicket>,
281    async_broadcast::InactiveReceiver<VerifiedTicket>,
282);
283
284/// Time to wait until the node's keybinding appears on-chain
285const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
286
287/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
288// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
289const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
290
291/// HOPR main object providing the entire HOPR node functionality
292///
293/// Instantiating this object creates all processes and objects necessary for
294/// running the HOPR node. Once created, the node can be started using the
295/// `run()` method.
296///
297/// Externally offered API should be enough to perform all necessary tasks
298/// with the HOPR node manually, but it is advised to create such a configuration
299/// that manual interaction is unnecessary.
300///
301/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
302pub struct Hopr<Chain, Db, Graph, Net>
303where
304    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
305        + hopr_api::graph::NetworkGraphUpdate
306        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
307        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
308        + Clone
309        + Send
310        + Sync
311        + 'static,
312    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
313        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
314    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
315    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
316{
317    me: OffchainKeypair,
318    cfg: config::HoprLibConfig,
319    state: Arc<api::node::state::AtomicHoprState>,
320    transport_api: HoprTransport<Chain, Db, Graph, Net>,
321    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
322    node_db: Db,
323    chain_api: Chain,
324    winning_ticket_subscribers: NewTicketEvents,
325    processes: OnceLock<AbortableList<HoprLibProcess>>,
326}
327
328impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
329where
330    Chain: HoprChainApi + Clone + Send + Sync + 'static,
331    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
332    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
333        + hopr_api::graph::NetworkGraphUpdate
334        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
335        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
336        + Clone
337        + Send
338        + Sync
339        + 'static,
340    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
341        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
342    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
343    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
344{
345    pub async fn new(
346        identity: (&ChainKeypair, &OffchainKeypair),
347        hopr_chain_api: Chain,
348        hopr_node_db: Db,
349        graph: Graph,
350        cfg: config::HoprLibConfig,
351    ) -> errors::Result<Self> {
352        if hopr_api::types::crypto_random::is_rng_fixed() {
353            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
354        }
355
356        cfg.validate()?;
357
358        let hopr_transport_api = HoprTransport::new(
359            identity,
360            hopr_chain_api.clone(),
361            hopr_node_db.clone(),
362            graph,
363            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
364            cfg.protocol,
365        )
366        .map_err(HoprLibError::TransportError)?;
367
368        #[cfg(all(feature = "telemetry", not(test)))]
369        {
370            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
371            METRIC_HOPR_LIB_VERSION.set(
372                &[const_format::formatcp!("{}", constants::APP_VERSION)],
373                const_format::formatcp!(
374                    "{}.{}",
375                    env!("CARGO_PKG_VERSION_MAJOR"),
376                    env!("CARGO_PKG_VERSION_MINOR")
377                )
378                .parse()
379                .unwrap_or(0.0),
380            );
381
382            // Calling get_ticket_statistics will initialize the respective metrics on tickets
383            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
384                error!(%error, "failed to initialize ticket statistics metrics");
385            }
386        }
387
388        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
389        new_tickets_tx.set_await_active(false);
390        new_tickets_tx.set_overflow(true);
391
392        Ok(Self {
393            me: identity.1.clone(),
394            cfg,
395            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
396            transport_api: hopr_transport_api,
397            chain_api: hopr_chain_api,
398            node_db: hopr_node_db,
399            redeem_requests: OnceLock::new(),
400            processes: OnceLock::new(),
401            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
402        })
403    }
404
405    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
406        if HoprNodeOperations::status(self) == state {
407            Ok(())
408        } else {
409            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
410        }
411    }
412
413    pub fn config(&self) -> &config::HoprLibConfig {
414        &self.cfg
415    }
416
417    #[inline]
418    fn is_public(&self) -> bool {
419        self.cfg.publish
420    }
421
422    pub async fn run<
423        Ct,
424        NetBuilder,
425        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
426    >(
427        &self,
428        cover_traffic: Ct,
429        network_builder: NetBuilder,
430        #[cfg(feature = "session-server")] serve_handler: T,
431    ) -> errors::Result<HoprTransportIO>
432    where
433        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
434        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
435    {
436        self.error_if_not_in_state(
437            HoprState::Uninitialized,
438            "cannot start the hopr node multiple times".into(),
439        )?;
440
441        #[cfg(feature = "testing")]
442        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
443
444        let me_onchain = *self.chain_api.me();
445        info!(
446            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
447            "node is not started, please fund this node",
448        );
449
450        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
451        helpers::wait_for_funds(
452            *MIN_NATIVE_BALANCE,
453            *SUGGESTED_NATIVE_BALANCE,
454            Duration::from_secs(200),
455            me_onchain,
456            &self.chain_api,
457        )
458        .await?;
459
460        let mut processes = AbortableList::<HoprLibProcess>::default();
461
462        info!("starting HOPR node...");
463        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
464
465        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
466        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
467
468        info!(
469            address = %me_onchain,
470            %balance,
471            %minimum_balance,
472            "node information"
473        );
474
475        if balance.le(&minimum_balance) {
476            return Err(HoprLibError::GeneralError(
477                "cannot start the node without a sufficiently funded wallet".into(),
478            ));
479        }
480
481        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
482
483        // Once we are able to query the chain,
484        // check if the ticket price is configured correctly.
485        let network_min_ticket_price = self
486            .chain_api
487            .minimum_ticket_price()
488            .await
489            .map_err(HoprLibError::chain)?;
490        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
491        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
492            return Err(HoprLibError::GeneralError(format!(
493                "configured outgoing ticket price is lower than the network minimum ticket price: \
494                 {configured_ticket_price:?} < {network_min_ticket_price}"
495            )));
496        }
497        // Once we are able to query the chain,
498        // check if the winning probability is configured correctly.
499        let network_min_win_prob = self
500            .chain_api
501            .minimum_incoming_ticket_win_prob()
502            .await
503            .map_err(HoprLibError::chain)?;
504        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
505        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
506            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
507        {
508            return Err(HoprLibError::GeneralError(format!(
509                "configured outgoing ticket winning probability is lower than the network minimum winning \
510                 probability: {configured_win_prob:?} < {network_min_win_prob}"
511            )));
512        }
513
514        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
515
516        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
517
518        let safe_addr = self.cfg.safe_module.safe_address;
519
520        if self.me_onchain() == safe_addr {
521            return Err(HoprLibError::GeneralError(
522                "cannot use self as staking safe address".into(),
523            ));
524        }
525
526        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
527        info!(%safe_addr, "registering safe with this node");
528        match self.chain_api.register_safe(&safe_addr).await {
529            Ok(awaiter) => {
530                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
531                awaiter.await.map_err(|error| {
532                    error!(%safe_addr, %error, "safe registration failed with error");
533                    HoprLibError::chain(error)
534                })?;
535                info!(%safe_addr, "safe successfully registered with this node");
536            }
537            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
538                info!(%safe_addr, "this safe is already registered with this node");
539            }
540            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
541                // TODO: support safe deregistration flow
542                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
543                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
544            }
545            Err(error) => {
546                error!(%safe_addr, %error, "safe registration failed");
547                return Err(HoprLibError::chain(error));
548            }
549        }
550
551        // Only public nodes announce multiaddresses
552        let multiaddresses_to_announce = if self.is_public() {
553            // The multiaddresses are filtered for the non-private ones,
554            // unless `announce_local_addresses` is set to `true`.
555            self.transport_api.announceable_multiaddresses()
556        } else {
557            Vec::with_capacity(0)
558        };
559
560        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
561        multiaddresses_to_announce
562            .iter()
563            .filter(|a| !is_public_address(a))
564            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
565
566        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
567
568        let chain_api = self.chain_api.clone();
569        let me_offchain = *self.me.public();
570        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
571
572        // At this point the node is already registered with Safe, so
573        // we can announce via Safe-compliant TX
574        info!(?multiaddresses_to_announce, "announcing node on chain");
575        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
576            Ok(awaiter) => {
577                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
578                awaiter.await.map_err(|error| {
579                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
580                    HoprLibError::chain(error)
581                })?;
582                info!(?multiaddresses_to_announce, "node has been successfully announced");
583            }
584            Err(AnnouncementError::AlreadyAnnounced) => {
585                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
586            }
587            Err(error) => {
588                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
589                return Err(HoprLibError::chain(error));
590            }
591        }
592
593        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
594
595        // Wait for the node key-binding readiness to return
596        let this_node_account = node_ready
597            .await
598            .map_err(HoprLibError::other)?
599            .map_err(HoprLibError::chain)?;
600        if this_node_account.chain_addr != self.me_onchain()
601            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
602        {
603            error!(%this_node_account, "account bound to offchain key does not match this node");
604            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
605        }
606
607        info!(%this_node_account, "node account is ready");
608
609        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
610
611        info!("initializing session infrastructure");
612        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
613            .ok()
614            .and_then(|s| s.trim().parse::<usize>().ok())
615            .filter(|&c| c > 0)
616            .unwrap_or(256);
617
618        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
619        #[cfg(feature = "session-server")]
620        {
621            debug!(capacity = incoming_session_channel_capacity, "creating session server");
622            processes.insert(
623                HoprLibProcess::SessionServer,
624                hopr_async_runtime::spawn_as_abortable!(
625                    _session_rx
626                        .for_each_concurrent(None, move |session| {
627                            let serve_handler = serve_handler.clone();
628                            async move {
629                                let session_id = *session.session.id();
630                                match serve_handler.process(session).await {
631                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
632                                    Err(error) => error!(
633                                        ?session_id,
634                                        %error,
635                                        "client session processing failed"
636                                    ),
637                                }
638                            }
639                        })
640                        .inspect(|_| tracing::warn!(
641                            task = %HoprLibProcess::SessionServer,
642                            "long-running background task finished"
643                        ))
644                ),
645            );
646        }
647
648        info!("starting ticket events processor");
649        let (tickets_tx, tickets_rx) = channel(8192);
650        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
651        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
652        let node_db = self.node_db.clone();
653        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
654        spawn(
655            tickets_rx
656                .filter_map(move |ticket_event| {
657                    let node_db = node_db.clone();
658                    async move {
659                        match ticket_event {
660                            TicketEvent::WinningTicket(winning) => {
661                                if let Err(error) = node_db.insert_ticket(*winning).await {
662                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
663                                } else {
664                                    tracing::debug!(%winning, "inserted ticket into database");
665                                }
666                                Some(winning)
667                            }
668                            TicketEvent::RejectedTicket(rejected, issuer) => {
669                                if let Some(issuer) = &issuer {
670                                    if let Err(error) =
671                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
672                                    {
673                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
674                                    } else {
675                                        tracing::debug!(%rejected, "marked ticket as rejected");
676                                    }
677                                } else {
678                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
679                                }
680                                None
681                            }
682                        }
683                    }
684                })
685                .for_each(move |ticket| {
686                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
687                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
688                    }
689                    futures::future::ready(())
690                })
691                .inspect(|_| {
692                    tracing::warn!(
693                        task = %HoprLibProcess::TicketEvents,
694                        "long-running background task finished"
695                    )
696                }),
697        );
698
699        info!("starting transport");
700        let (hopr_socket, transport_processes) = self
701            .transport_api
702            .run(cover_traffic, network_builder, tickets_tx, session_tx)
703            .await?;
704        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
705
706        info!("starting ticket redemption service");
707        // Start a queue that takes care of redeeming tickets via given TicketSelectors
708        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
709        let _ = self.redeem_requests.set(redemption_req_tx);
710        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
711        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
712        let chain = self.chain_api.clone();
713        let node_db = self.node_db.clone();
714        spawn(redemption_req_rx
715            .for_each(move |selector| {
716                let chain = chain.clone();
717                let db = node_db.clone();
718                async move {
719                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
720                        Ok(res) => debug!(%res, "redemption complete"),
721                        Err(error) => error!(%error, "redemption failed"),
722                    }
723                }
724            })
725            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
726        );
727
728        info!("subscribing to channel events");
729        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
730        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
731        let chain = self.chain_api.clone();
732        let node_db = self.node_db.clone();
733        let events = chain.subscribe().map_err(HoprLibError::chain)?;
734        spawn(
735            futures::stream::Abortable::new(
736                events
737                    .filter_map(move |event|
738                        futures::future::ready(event.try_as_channel_closed())
739                    ),
740                chain_events_sub_reg
741            )
742            .for_each(move |closed_channel| {
743                let node_db = node_db.clone();
744                let chain = chain.clone();
745                async move {
746                    match closed_channel.direction(chain.me()) {
747                        Some(ChannelDirection::Incoming) => {
748                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
749                                Ok(num_neglected) if num_neglected > 0 => {
750                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
751                                },
752                                Ok(_) => {
753                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
754                                },
755                                Err(error) => {
756                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
757                                }
758                            }
759                        },
760                        Some(ChannelDirection::Outgoing) => {
761                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
762                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
763                            } else {
764                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
765                            }
766                        }
767                        _ => {} // Event for a channel that is not our own
768                    }
769                }
770            })
771            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
772        );
773
774        info!("synchronizing ticket states");
775        // NOTE: after the chain is synced, we can reset tickets which are considered
776        // redeemed but on-chain state does not align with that. This implies there was a problem
777        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
778        // handle errors appropriately.
779        let mut channels = self
780            .chain_api
781            .stream_channels(ChannelSelector {
782                destination: self.me_onchain().into(),
783                ..Default::default()
784            })
785            .map_err(HoprLibError::chain)
786            .await?;
787
788        while let Some(channel) = channels.next().await {
789            // Set the state of all unredeemed tickets with a higher index than the current
790            // channel index as untouched.
791            self.node_db
792                .update_ticket_states_and_fetch(
793                    [TicketSelector::from(&channel)
794                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
795                        .with_index_range(channel.ticket_index..)],
796                    AcknowledgedTicketStatus::Untouched,
797                )
798                .map_err(HoprLibError::db)
799                .await?
800                .for_each(|ticket| {
801                    info!(%ticket, "fixed next out-of-sync ticket");
802                    futures::future::ready(())
803                })
804                .await;
805
806            // Mark all the tickets with a lower ticket index than the current channel index as neglected.
807            self.node_db
808                .mark_tickets_as(
809                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
810                    TicketMarker::Neglected,
811                )
812                .map_err(HoprLibError::db)
813                .await?;
814        }
815
816        self.state.store(HoprState::Running, Ordering::Relaxed);
817
818        info!(
819            id = %self.me_peer_id(),
820            version = constants::APP_VERSION,
821            "NODE STARTED AND RUNNING"
822        );
823
824        #[cfg(all(feature = "telemetry", not(test)))]
825        METRIC_HOPR_NODE_INFO.set(
826            &[
827                &self.me.public().to_peerid_str(),
828                &me_onchain.to_string(),
829                &self.cfg.safe_module.safe_address.to_string(),
830                &self.cfg.safe_module.module_address.to_string(),
831            ],
832            1.0,
833        );
834
835        let _ = self.processes.set(processes);
836        Ok(hopr_socket)
837    }
838
839    /// Used to practically shut down all node's processes without dropping the instance.
840    ///
841    /// This means that the instance can be used to retrieve some information, but all
842    /// active operations will stop and new will be impossible to perform.
843    /// Such operations will return [`HoprStatusError::NotThereYet`].
844    ///
845    /// This is the final state and cannot be reversed by calling `run` again.
846    pub fn shutdown(&self) -> Result<(), HoprLibError> {
847        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
848        if let Some(processes) = self.processes.get() {
849            processes.abort_all();
850        }
851        self.state.store(HoprState::Terminated, Ordering::Relaxed);
852        info!("NODE SHUTDOWN COMPLETE");
853        Ok(())
854    }
855
856    /// Create a client session connection returning a session object that implements
857    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
858    #[cfg(feature = "session-client")]
859    pub async fn connect_to(
860        &self,
861        destination: Address,
862        target: SessionTarget,
863        cfg: HoprSessionClientConfig,
864    ) -> errors::Result<HoprSession> {
865        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
866
867        let backoff = backon::ConstantBuilder::default()
868            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
869            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
870            .with_jitter();
871
872        use backon::Retryable;
873
874        Ok((|| {
875            let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
876            let target = target.clone();
877            async { self.transport_api.new_session(destination, target, cfg).await }
878        })
879        .retry(backoff)
880        .sleep(backon::FuturesTimerSleeper)
881        .await?)
882    }
883
884    /// Sends keep-alive to the given [`SessionId`], making sure the session is not
885    /// closed due to inactivity.
886    #[cfg(feature = "session-client")]
887    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
888        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
889        Ok(self.transport_api.probe_session(id).await?)
890    }
891
892    #[cfg(feature = "session-client")]
893    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
894        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
895        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
896    }
897
898    #[cfg(feature = "session-client")]
899    pub async fn update_session_surb_balancer_config(
900        &self,
901        id: &SessionId,
902        cfg: SurbBalancerConfig,
903    ) -> errors::Result<()> {
904        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
905        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
906    }
907
908    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
909    /// successfully or timing out after `timeout`.
910    fn spawn_wait_for_on_chain_event(
911        &self,
912        context: impl std::fmt::Display,
913        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
914        timeout: Duration,
915    ) -> errors::Result<(
916        impl Future<Output = errors::Result<ChainEvent>>,
917        hopr_async_runtime::AbortHandle,
918    )> {
919        debug!(%context, "registering wait for on-chain event");
920        let (event_stream, handle) = futures::stream::abortable(
921            self.chain_api
922                .subscribe()
923                .map_err(HoprLibError::chain)?
924                .skip_while(move |event| futures::future::ready(!predicate(event))),
925        );
926
927        let ctx = context.to_string();
928
929        Ok((
930            spawn(async move {
931                pin_mut!(event_stream);
932                let res = event_stream
933                    .next()
934                    .timeout(futures_time::time::Duration::from(timeout))
935                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
936                    .await?
937                    .ok_or(HoprLibError::GeneralError(format!(
938                        "failed to yield an on-chain event for {ctx}"
939                    )));
940                debug!(%ctx, ?res, "on-chain event waiting done");
941                res
942            })
943            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
944            .and_then(futures::future::ready),
945            handle,
946        ))
947    }
948}
949
950impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
951where
952    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
953        + hopr_api::graph::NetworkGraphUpdate
954        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
955        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
956        + Clone
957        + Send
958        + Sync
959        + 'static,
960    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
961        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
962    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
963    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
964{
965    // === telemetry
966    /// Prometheus formatted metrics collected by the hopr-lib components.
967    pub fn collect_hopr_metrics() -> errors::Result<String> {
968        cfg_if::cfg_if! {
969            if #[cfg(all(feature = "telemetry", not(test)))] {
970                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
971            } else {
972                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
973            }
974        }
975    }
976}
977
978// === Trait implementations for the high-level node API ===
979
980impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
981where
982    Chain: HoprChainApi + Clone + Send + Sync + 'static,
983    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
984    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
985        + hopr_api::graph::NetworkGraphUpdate
986        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
987        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
988        + Clone
989        + Send
990        + Sync
991        + 'static,
992    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
993        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
994    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
995    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
996{
997    fn status(&self) -> HoprState {
998        self.state.load(Ordering::Relaxed)
999    }
1000}
1001
1002#[async_trait::async_trait]
1003impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
1004where
1005    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1006    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1007    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1008        + hopr_api::graph::NetworkGraphUpdate
1009        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1010        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1011        + Clone
1012        + Send
1013        + Sync
1014        + 'static,
1015    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1016        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1017    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1018    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1019{
1020    type Error = HoprLibError;
1021    type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1022
1023    fn me_peer_id(&self) -> PeerId {
1024        (*self.me.public()).into()
1025    }
1026
1027    async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1028        Ok(self
1029            .chain_api
1030            .stream_accounts(AccountSelector {
1031                public_only: true,
1032                ..Default::default()
1033            })
1034            .map_err(HoprLibError::chain)
1035            .await?
1036            .map(|entry| {
1037                (
1038                    PeerId::from(entry.public_key),
1039                    entry.chain_addr,
1040                    entry.get_multiaddrs().to_vec(),
1041                )
1042            })
1043            .collect()
1044            .await)
1045    }
1046
1047    async fn network_health(&self) -> hopr_api::network::Health {
1048        self.transport_api.network_health().await
1049    }
1050
1051    async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1052        Ok(self
1053            .transport_api
1054            .network_connected_peers()
1055            .await?
1056            .into_iter()
1057            .map(PeerId::from)
1058            .collect())
1059    }
1060
1061    fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1062        let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1063        self.transport_api.network_peer_observations(&pubkey)
1064    }
1065
1066    async fn all_network_peers(
1067        &self,
1068        minimum_score: f64,
1069    ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1070        Ok(
1071            futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1072                .filter_map(|(pubkey, info)| async move {
1073                    let peer_id = PeerId::from(pubkey);
1074                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1075                    Some((address, peer_id, info))
1076                })
1077                .collect::<Vec<_>>()
1078                .await,
1079        )
1080    }
1081
1082    fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1083        self.transport_api.local_multiaddresses()
1084    }
1085
1086    async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1087        self.transport_api.listening_multiaddresses().await
1088    }
1089
1090    async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1091        let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1092            return vec![];
1093        };
1094        self.transport_api.network_observed_multiaddresses(&pubkey).await
1095    }
1096
1097    async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1098        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1099            .await
1100            .map_err(HoprLibError::TransportError)?;
1101
1102        match self
1103            .chain_api
1104            .stream_accounts(AccountSelector {
1105                public_only: false,
1106                offchain_key: Some(pubkey),
1107                ..Default::default()
1108            })
1109            .map_err(HoprLibError::chain)
1110            .await?
1111            .next()
1112            .await
1113        {
1114            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1115            None => {
1116                error!(%peer, "no information");
1117                Ok(vec![])
1118            }
1119        }
1120    }
1121
1122    async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1123        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1124        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1125            .await
1126            .map_err(HoprLibError::TransportError)?;
1127        Ok(self.transport_api.ping(&pubkey).await?)
1128    }
1129}
1130
/// Sink accepting [`TicketSelector`] redemption requests over an mpsc channel, with the
/// channel's [`SendError`] mapped into a [`HoprLibError`].
pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1132
1133impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1134where
1135    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1136    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1137    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1138        + hopr_api::graph::NetworkGraphUpdate
1139        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1140        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1141        + Clone
1142        + Send
1143        + Sync
1144        + 'static,
1145    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1146        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1147    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1148    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1149{
1150    pub fn me_onchain(&self) -> Address {
1151        *self.chain_api.me()
1152    }
1153
1154    pub fn get_safe_config(&self) -> SafeModuleConfig {
1155        SafeModuleConfig {
1156            safe_address: self.cfg.safe_module.safe_address,
1157            module_address: self.cfg.safe_module.module_address,
1158        }
1159    }
1160
1161    pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1162        self.chain_api
1163            .balance(self.me_onchain())
1164            .await
1165            .map_err(HoprLibError::chain)
1166    }
1167
1168    pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1169        self.chain_api
1170            .balance(self.cfg.safe_module.safe_address)
1171            .await
1172            .map_err(HoprLibError::chain)
1173    }
1174
1175    pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1176        self.chain_api
1177            .safe_allowance(self.cfg.safe_module.safe_address)
1178            .await
1179            .map_err(HoprLibError::chain)
1180    }
1181
1182    pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1183        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1184    }
1185
1186    pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1187        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1188    }
1189
1190    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1191        self.chain_api
1192            .minimum_incoming_ticket_win_prob()
1193            .await
1194            .map_err(HoprLibError::chain)
1195    }
1196
1197    pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1198        self.chain_api
1199            .channel_closure_notice_period()
1200            .await
1201            .map_err(HoprLibError::chain)
1202    }
1203
1204    pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1205        Ok(self
1206            .chain_api
1207            .stream_accounts(AccountSelector {
1208                public_only: true,
1209                ..Default::default()
1210            })
1211            .map_err(HoprLibError::chain)
1212            .await?
1213            .collect()
1214            .await)
1215    }
1216
1217    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1218        let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1219            .await
1220            .map_err(HoprLibError::TransportError)?;
1221
1222        self.chain_api
1223            .packet_key_to_chain_key(&pubkey)
1224            .await
1225            .map_err(HoprLibError::chain)
1226    }
1227
1228    pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1229        self.chain_api
1230            .chain_key_to_packet_key(address)
1231            .await
1232            .map(|pk| pk.map(|v| v.into()))
1233            .map_err(HoprLibError::chain)
1234    }
1235
1236    pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1237        self.chain_api
1238            .channel_by_id(channel_id)
1239            .await
1240            .map_err(HoprLibError::chain)
1241    }
1242
1243    pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1244        self.chain_api
1245            .channel_by_parties(src, dest)
1246            .await
1247            .map_err(HoprLibError::chain)
1248    }
1249
1250    pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1251        Ok(self
1252            .chain_api
1253            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1254                ChannelStatusDiscriminants::Closed,
1255                ChannelStatusDiscriminants::Open,
1256                ChannelStatusDiscriminants::PendingToClose,
1257            ]))
1258            .map_err(HoprLibError::chain)
1259            .await?
1260            .collect()
1261            .await)
1262    }
1263
1264    pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1265        Ok(self
1266            .chain_api
1267            .stream_channels(
1268                ChannelSelector::default()
1269                    .with_destination(*dest)
1270                    .with_allowed_states(&[
1271                        ChannelStatusDiscriminants::Closed,
1272                        ChannelStatusDiscriminants::Open,
1273                        ChannelStatusDiscriminants::PendingToClose,
1274                    ]),
1275            )
1276            .map_err(HoprLibError::chain)
1277            .await?
1278            .collect()
1279            .await)
1280    }
1281
1282    pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1283        Ok(self
1284            .chain_api
1285            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1286                ChannelStatusDiscriminants::Closed,
1287                ChannelStatusDiscriminants::Open,
1288                ChannelStatusDiscriminants::PendingToClose,
1289            ]))
1290            .map_err(HoprLibError::chain)
1291            .await?
1292            .collect()
1293            .await)
1294    }
1295
1296    pub async fn open_channel(
1297        &self,
1298        destination: &Address,
1299        amount: HoprBalance,
1300    ) -> Result<OpenChannelResult, HoprLibError> {
1301        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1302
1303        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1304
1305        // Subscribe to chain events BEFORE sending the transaction to avoid
1306        // a race where the event is broadcast before the subscriber activates.
1307        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1308            format!("open channel to {destination} ({channel_id})"),
1309            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1310            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1311        )?;
1312
1313        let confirm_awaiter = self
1314            .chain_api
1315            .open_channel(destination, amount)
1316            .await
1317            .map_err(HoprLibError::chain)?;
1318
1319        let tx_hash = confirm_awaiter.await.map_err(|e| {
1320            event_abort.abort();
1321            HoprLibError::chain(e)
1322        })?;
1323
1324        let event = event_awaiter.await?;
1325        debug!(%event, "open channel event received");
1326
1327        Ok(OpenChannelResult { tx_hash, channel_id })
1328    }
1329
1330    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1331        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1332
1333        let channel_id = *channel_id;
1334
1335        // Subscribe to chain events BEFORE sending the transaction to avoid
1336        // a race where the event is broadcast before the subscriber activates.
1337        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1338            format!("fund channel {channel_id}"),
1339            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1340            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1341        )?;
1342
1343        let confirm_awaiter = self
1344            .chain_api
1345            .fund_channel(&channel_id, amount)
1346            .await
1347            .map_err(HoprLibError::chain)?;
1348
1349        let res = confirm_awaiter.await.map_err(|e| {
1350            event_abort.abort();
1351            HoprLibError::chain(e)
1352        })?;
1353
1354        let event = event_awaiter.await?;
1355        debug!(%event, "fund channel event received");
1356
1357        Ok(res)
1358    }
1359
1360    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1361        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1362
1363        let channel_id = *channel_id;
1364
1365        // Subscribe to chain events BEFORE sending the transaction to avoid
1366        // a race where the event is broadcast before the subscriber activates.
1367        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1368            format!("close channel {channel_id}"),
1369            move |event| {
1370                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1371                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1372            },
1373            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1374        )?;
1375
1376        let confirm_awaiter = self
1377            .chain_api
1378            .close_channel(&channel_id)
1379            .await
1380            .map_err(HoprLibError::chain)?;
1381
1382        let tx_hash = confirm_awaiter.await.map_err(|e| {
1383            event_abort.abort();
1384            HoprLibError::chain(e)
1385        })?;
1386
1387        let event = event_awaiter.await?;
1388        debug!(%event, "close channel event received");
1389
1390        Ok(CloseChannelResult { tx_hash })
1391    }
1392
1393    pub async fn tickets_in_channel(
1394        &self,
1395        channel_id: &ChannelId,
1396    ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1397        if let Some(channel) = self
1398            .chain_api
1399            .channel_by_id(channel_id)
1400            .await
1401            .map_err(|e| HoprTransportError::Other(e.into()))?
1402        {
1403            if &channel.destination == self.chain_api.me() {
1404                Ok(Some(
1405                    self.node_db
1406                        .stream_tickets([&channel])
1407                        .await
1408                        .map_err(HoprLibError::db)?
1409                        .collect()
1410                        .await,
1411                ))
1412            } else {
1413                Ok(None)
1414            }
1415        } else {
1416            Ok(None)
1417        }
1418    }
1419
1420    pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1421        Ok(self
1422            .node_db
1423            .stream_tickets(None::<TicketSelector>)
1424            .await
1425            .map_err(HoprLibError::db)?
1426            .map(|v| v.ticket)
1427            .collect()
1428            .await)
1429    }
1430
1431    pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1432        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1433    }
1434
1435    pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1436        self.node_db
1437            .reset_ticket_statistics()
1438            .await
1439            .map_err(HoprLibError::chain)
1440    }
1441
1442    pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1443        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1444
1445        let min_value = min_value.into();
1446
1447        self.chain_api
1448            .stream_channels(
1449                ChannelSelector::default()
1450                    .with_destination(self.me_onchain())
1451                    .with_allowed_states(&[
1452                        ChannelStatusDiscriminants::Open,
1453                        ChannelStatusDiscriminants::PendingToClose,
1454                    ]),
1455            )
1456            .map_err(HoprLibError::chain)
1457            .await?
1458            .map(|channel| {
1459                Ok(TicketSelector::from(&channel)
1460                    .with_amount(min_value..)
1461                    .with_index_range(channel.ticket_index..)
1462                    .with_state(AcknowledgedTicketStatus::Untouched))
1463            })
1464            .forward(self.redemption_requests()?)
1465            .await?;
1466
1467        Ok(())
1468    }
1469
1470    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1471        &self,
1472        counterparty: &Address,
1473        min_value: B,
1474    ) -> Result<(), HoprLibError> {
1475        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1476            .await
1477    }
1478
1479    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1480        &self,
1481        channel_id: &Hash,
1482        min_value: B,
1483    ) -> Result<(), HoprLibError> {
1484        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1485
1486        let channel = self
1487            .chain_api
1488            .channel_by_id(channel_id)
1489            .await
1490            .map_err(HoprLibError::chain)?
1491            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1492
1493        self.redemption_requests()?
1494            .send(
1495                TicketSelector::from(channel)
1496                    .with_amount(min_value.into()..)
1497                    .with_index_range(channel.ticket_index..)
1498                    .with_state(AcknowledgedTicketStatus::Untouched),
1499            )
1500            .await?;
1501
1502        Ok(())
1503    }
1504
1505    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1506        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1507
1508        self.redemption_requests()?
1509            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1510            .await?;
1511
1512        Ok(())
1513    }
1514
1515    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
1516        self.winning_ticket_subscribers.1.activate_cloned()
1517    }
1518
1519    pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1520        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1521
1522        Ok(self
1523            .redeem_requests
1524            .get()
1525            .cloned()
1526            .expect("redeem_requests is not initialized")
1527            .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1528    }
1529
1530    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1531        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1532
1533        self.chain_api
1534            .withdraw(amount, &recipient)
1535            .and_then(identity)
1536            .map_err(HoprLibError::chain)
1537            .await
1538    }
1539
1540    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1541        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1542
1543        self.chain_api
1544            .withdraw(amount, &recipient)
1545            .and_then(identity)
1546            .map_err(HoprLibError::chain)
1547            .await
1548    }
1549}