Skip to main content

hopr_lib/
lib.rs

//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
//! as well as integrated into other systems and libraries.
//!
//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
//! it will perform its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities, and provides
//! a complete, fully featured HOPR node stripped of top-level functionality
//! such as the REST API, key management, etc.
//!
//! The intended way to use `hopr_lib` is for a specific tool to be built on top of it,
//! should the default `hoprd` implementation not be acceptable.
//!
//! For most practical use cases, the `hoprd` application should be the preferable
//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25/// Public traits for interactions with this library.
26pub mod traits;
27/// Utility module with helper types and functionality over hopr-lib behavior.
28pub mod utils;
29
30pub use hopr_api as api;
31
/// Exports of libraries necessary for API and interface operations.
///
/// Groups re-exports of the underlying HOPR crates under stable paths so that
/// dependents can reach them through `hopr_lib` alone.
#[doc(hidden)]
pub mod exports {
    /// Chain, internal and primitive type modules re-exported from `hopr_api`.
    pub mod types {
        pub use hopr_api::types::{chain, internal, primitive};
    }

    /// Cryptographic types and keypair handling.
    pub mod crypto {
        pub use hopr_api::types::crypto as types;
        pub use hopr_crypto_keypair as keypair;
    }

    /// Network-related types.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
50
/// Export of relevant types for easier integration.
///
/// Everything here is re-exported via [`exports`]; the UDP stream types are only
/// available when the `runtime-tokio` feature is enabled.
#[doc(hidden)]
pub mod prelude {
    #[cfg(feature = "runtime-tokio")]
    pub use super::exports::network::types::{
        prelude::ForeignDataMode,
        udp::{ConnectedUdpStream, UdpStreamParallelism},
    };
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
68
69use std::{
70    convert::identity,
71    future::Future,
72    sync::{Arc, OnceLock, atomic::Ordering},
73    time::Duration,
74};
75
76use futures::{
77    FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
78    channel::mpsc::{SendError, channel},
79    pin_mut,
80    sink::SinkMapErr,
81};
82use futures_time::future::FutureExt as FuturesTimeFutureExt;
83use hopr_api::{
84    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
86    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
87    node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
88};
89pub use hopr_api::{
90    db::ChannelTicketStatistics,
91    graph::EdgeLinkObservable,
92    network::{NetworkBuilder, NetworkStreamControl},
93    node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
94    types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_network_types::prelude::*;
100#[cfg(all(feature = "telemetry", not(test)))]
101use hopr_platform::time::native::current_time;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110    config::SafeModule,
111    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112    errors::{HoprLibError, HoprStatusError},
113};
114
/// Public routing configuration for session opening in `hopr-lib`.
///
/// This intentionally exposes only hop-count based routing.
///
/// The wrapped value is a hop count bounded by
/// `RoutingOptions::MAX_INTERMEDIATE_HOPS`; the default value is `BoundedSize::MIN`.
/// Use [`TryFrom<usize>`] to construct a value from a plain number.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    // Number of hops; the const generic argument caps it at the maximum number of intermediate hops.
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
127
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Upper bound on the hop count this type accepts.
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Gets the hop count stored in this routing configuration.
    pub fn hop_count(self) -> usize {
        let Self(hops) = self;
        hops.into()
    }
}
138
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    /// Builds a hop routing from a plain hop count.
    ///
    /// Fails when `value` cannot be converted into the underlying bounded size.
    fn try_from(value: usize) -> Result<Self, Self::Error> {
        value.try_into().map(Self).map_err(From::from)
    }
}
147
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    /// Converts the hop-count routing into the `Hops` routing option.
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
154
/// Session client configuration for `hopr-lib`.
///
/// Unlike transport-level configuration, this API intentionally does not expose
/// explicit intermediate paths.
///
/// Converts into [`hopr_transport::SessionClientConfig`] via `From`, mapping the
/// hop-count based paths onto the transport-level path options.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Forward route selection policy.
    pub forward_path: HopRouting,
    /// Return route selection policy.
    pub return_path: HopRouting,
    /// Capabilities offered by the session.
    ///
    /// Defaults to [`SessionCapability::Segmentation`] only.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym used for the session. Mostly useful for testing only.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Enable automatic SURB management for the session.
    ///
    /// Defaults to `Some` with the default [`SurbBalancerConfig`].
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// If set, the maximum number of possible SURBs will always be sent with session data packets.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
179
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    /// Translates the `hopr-lib` session configuration into its transport-level counterpart.
    fn from(value: HoprSessionClientConfig) -> Self {
        // Destructure exhaustively, so a newly added field cannot be silently forgotten here.
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
193
/// Long-running tasks that are spawned by the HOPR node.
///
/// Used as the key type of the [`AbortableList`] that tracks the node's background
/// tasks. The `Display` implementation (derived via `strum`) yields a human-readable
/// description of each task, which is used in log messages.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
pub enum HoprLibProcess {
    /// A process owned by the transport layer, wrapping the transport's own process identifier.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Task serving incoming sessions via the session server handler.
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// Task draining the queue of ticket redemption requests.
    #[strum(to_string = "ticket redemption queue driver")]
    TicketRedemptions,
    /// Task consuming the on-chain event subscription for channel updates.
    #[strum(to_string = "subscription for on-chain channel updates")]
    ChannelEvents,
    /// Task processing winning and rejected ticket events.
    #[strum(to_string = "on received ticket event (winning or rejected)")]
    TicketEvents,
}
208
// Process-level metrics. Only compiled with the `telemetry` feature and outside of tests.
// Each gauge is created lazily on first access; creation failure panics via `unwrap()`.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Unix timestamp (seconds) of the process start.
    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Version of hopr-lib being executed, labelled by the version string.
    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Node identity information, labelled by peer ID, node address, Safe address and module address.
    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
226
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// When a thread count is not given explicitly, the available CPU parallelism is divided
/// by 2, since half of the available threads are to be used for IO-bound and half for
/// CPU-bound tasks. Either count is always at least 1.
///
/// # Arguments
///
/// * `num_cpu_threads` - number of threads of the CPU-bound thread pool; `None` selects
///   half of the available parallelism.
/// * `num_io_threads` - number of tokio worker threads; `None` selects half of the
///   available parallelism.
///
/// The worker thread stack size is read from the `HOPRD_THREAD_STACK_SIZE` environment
/// variable (in bytes). It defaults to 10 MiB when the variable is unset or unparsable
/// and is never lower than 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count was not given and the available parallelism cannot be
/// determined, if the CPU thread pool cannot be initialized, or if the tokio runtime fails
/// to build.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    /// Stack size used when `HOPRD_THREAD_STACK_SIZE` is unset or cannot be parsed.
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    /// Lower bound enforced on the configured stack size.
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // May be `Some(0)` on a single-core machine; the `.max(1)` calls below correct that.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error values are built lazily, so nothing is allocated on the success path.
    let cpu_threads = num_cpu_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
272
/// Type alias used to send and receive transport data via a running HOPR node.
///
/// Pairs a receiver of [`ApplicationDataIn`] with a sender of
/// `(DestinationRouting, ApplicationDataOut)` tuples.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;

/// Endpoints of the broadcast channel carrying newly received winning tickets:
/// the sender used to publish them and an inactive receiver kept alongside it.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);

/// Time to wait until the node's keybinding appears on-chain
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);

/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
290
/// HOPR main object providing the entire HOPR node functionality
///
/// Instantiating this object creates all processes and objects necessary for
/// running the HOPR node. Once created, the node can be started using the
/// `run()` method.
///
/// Externally offered API should be enough to perform all necessary tasks
/// with the HOPR node manually, but it is advised to create such a configuration
/// that manual interaction is unnecessary.
///
/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
///
/// # Type parameters
///
/// * `Chain` - the on-chain API implementation.
/// * `Db` - the node database implementation.
/// * `Graph` - the network graph, keyed by [`OffchainPublicKey`].
/// * `Net` - the network implementation.
pub struct Hopr<Chain, Db, Graph, Net>
where
    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
{
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    /// Configuration the node was created with.
    cfg: config::HoprLibConfig,
    /// Current lifecycle state of the node, updated as `run()` progresses.
    state: Arc<api::node::state::AtomicHoprState>,
    /// Transport layer of the node.
    transport_api: HoprTransport<Chain, Db, Graph, Net>,
    /// Sender side of the ticket redemption request queue; set once by `run()`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Node database handle.
    node_db: Db,
    /// On-chain API handle.
    chain_api: Chain,
    /// Broadcast channel endpoints used to publish newly received winning tickets.
    winning_ticket_subscribers: NewTicketEvents,
    /// Background processes of the node.
    // NOTE(review): presumably set once at the end of `run()` - confirm against the rest of the file.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
325
326impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
327where
328    Chain: HoprChainApi + Clone + Send + Sync + 'static,
329    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
330    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
331        + hopr_api::graph::NetworkGraphUpdate
332        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
333        + Clone
334        + Send
335        + Sync
336        + 'static,
337    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
338        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
339    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
340{
    /// Creates a new HOPR node instance without starting it.
    ///
    /// Validates the configuration, constructs the transport layer and prepares the
    /// broadcast channel for new winning tickets. The node starts out in the
    /// [`HoprState::Uninitialized`] state and must be started via `run()`.
    ///
    /// # Arguments
    ///
    /// * `identity` - the chain and off-chain keypairs of the node.
    /// * `hopr_chain_api` - the on-chain API implementation.
    /// * `hopr_node_db` - the node database implementation.
    /// * `graph` - the network graph passed on to the transport layer.
    /// * `cfg` - the node configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration fails validation, or a
    /// [`HoprLibError::TransportError`] if the host configuration cannot be converted
    /// or the transport cannot be constructed.
    pub async fn new(
        identity: (&ChainKeypair, &OffchainKeypair),
        hopr_chain_api: Chain,
        hopr_node_db: Db,
        graph: Graph,
        cfg: config::HoprLibConfig,
    ) -> errors::Result<Self> {
        if hopr_api::types::crypto_random::is_rng_fixed() {
            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
        }

        cfg.validate()?;

        let hopr_transport_api = HoprTransport::new(
            identity,
            hopr_chain_api.clone(),
            hopr_node_db.clone(),
            graph,
            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
            cfg.protocol,
        )
        .map_err(HoprLibError::TransportError)?;

        #[cfg(all(feature = "telemetry", not(test)))]
        {
            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
            // The gauge value is "<major>.<minor>" of this crate parsed as a float (0.0 if unparsable).
            METRIC_HOPR_LIB_VERSION.set(
                &[const_format::formatcp!("{}", constants::APP_VERSION)],
                const_format::formatcp!(
                    "{}.{}",
                    env!("CARGO_PKG_VERSION_MAJOR"),
                    env!("CARGO_PKG_VERSION_MINOR")
                )
                .parse()
                .unwrap_or(0.0),
            );

            // Calling get_ticket_statistics will initialize the respective metrics on tickets
            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
                error!(%error, "failed to initialize ticket statistics metrics");
            }
        }

        // Broadcast channel for new winning tickets: senders do not wait for active receivers,
        // and in overflow mode the oldest message is dropped when the channel is full.
        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
        new_tickets_tx.set_await_active(false);
        new_tickets_tx.set_overflow(true);

        Ok(Self {
            me: identity.1.clone(),
            cfg,
            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
            transport_api: hopr_transport_api,
            chain_api: hopr_chain_api,
            node_db: hopr_node_db,
            redeem_requests: OnceLock::new(),
            processes: OnceLock::new(),
            // Only a deactivated receiver is stored alongside the sender.
            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
        })
    }
400
401    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
402        if HoprNodeOperations::status(self) == state {
403            Ok(())
404        } else {
405            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
406        }
407    }
408
    /// Returns the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
412
    /// Returns the value of the `publish` configuration flag.
    ///
    /// Only public nodes announce their multiaddresses on-chain.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
417
418    // TODO(20260218): @NumberFour8 abstract the telemetry objects properly and extract this API into a telemetry trait
419    /// Get packet stats for a specific peer.
420    #[cfg(feature = "telemetry")]
421    pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
422        Ok(self.transport_api.network_peer_packet_stats(peer).await?)
423    }
424
425    // TODO(20260218): @NumberFour8 abstract the telemetry objects properly and extract this API into a telemetry trait
426    /// Get packet stats for all connected peers.
427    #[cfg(feature = "telemetry")]
428    pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
429        Ok(self.transport_api.network_all_packet_stats().await?)
430    }
431
432    pub async fn run<
433        Ct,
434        NetBuilder,
435        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
436    >(
437        &self,
438        cover_traffic: Ct,
439        network_builder: NetBuilder,
440        #[cfg(feature = "session-server")] serve_handler: T,
441    ) -> errors::Result<HoprTransportIO>
442    where
443        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
444        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
445    {
446        self.error_if_not_in_state(
447            HoprState::Uninitialized,
448            "cannot start the hopr node multiple times".into(),
449        )?;
450
451        #[cfg(feature = "testing")]
452        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
453
454        let me_onchain = *self.chain_api.me();
455        info!(
456            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
457            "node is not started, please fund this node",
458        );
459
460        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
461        helpers::wait_for_funds(
462            *MIN_NATIVE_BALANCE,
463            *SUGGESTED_NATIVE_BALANCE,
464            Duration::from_secs(200),
465            me_onchain,
466            &self.chain_api,
467        )
468        .await?;
469
470        let mut processes = AbortableList::<HoprLibProcess>::default();
471
472        info!("starting HOPR node...");
473        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
474
475        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
476        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
477
478        info!(
479            address = %me_onchain,
480            %balance,
481            %minimum_balance,
482            "node information"
483        );
484
485        if balance.le(&minimum_balance) {
486            return Err(HoprLibError::GeneralError(
487                "cannot start the node without a sufficiently funded wallet".into(),
488            ));
489        }
490
491        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
492
493        // Once we are able to query the chain,
494        // check if the ticket price is configured correctly.
495        let network_min_ticket_price = self
496            .chain_api
497            .minimum_ticket_price()
498            .await
499            .map_err(HoprLibError::chain)?;
500        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
501        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
502            return Err(HoprLibError::GeneralError(format!(
503                "configured outgoing ticket price is lower than the network minimum ticket price: \
504                 {configured_ticket_price:?} < {network_min_ticket_price}"
505            )));
506        }
507        // Once we are able to query the chain,
508        // check if the winning probability is configured correctly.
509        let network_min_win_prob = self
510            .chain_api
511            .minimum_incoming_ticket_win_prob()
512            .await
513            .map_err(HoprLibError::chain)?;
514        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
515        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
516            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
517        {
518            return Err(HoprLibError::GeneralError(format!(
519                "configured outgoing ticket winning probability is lower than the network minimum winning \
520                 probability: {configured_win_prob:?} < {network_min_win_prob}"
521            )));
522        }
523
524        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
525
526        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
527
528        let safe_addr = self.cfg.safe_module.safe_address;
529
530        if self.me_onchain() == safe_addr {
531            return Err(HoprLibError::GeneralError(
532                "cannot use self as staking safe address".into(),
533            ));
534        }
535
536        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
537        info!(%safe_addr, "registering safe with this node");
538        match self.chain_api.register_safe(&safe_addr).await {
539            Ok(awaiter) => {
540                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
541                awaiter.await.map_err(|error| {
542                    error!(%safe_addr, %error, "safe registration failed with error");
543                    HoprLibError::chain(error)
544                })?;
545                info!(%safe_addr, "safe successfully registered with this node");
546            }
547            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
548                info!(%safe_addr, "this safe is already registered with this node");
549            }
550            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
551                // TODO: support safe deregistration flow
552                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
553                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
554            }
555            Err(error) => {
556                error!(%safe_addr, %error, "safe registration failed");
557                return Err(HoprLibError::chain(error));
558            }
559        }
560
561        // Only public nodes announce multiaddresses
562        let multiaddresses_to_announce = if self.is_public() {
563            // The multiaddresses are filtered for the non-private ones,
564            // unless `announce_local_addresses` is set to `true`.
565            self.transport_api.announceable_multiaddresses()
566        } else {
567            Vec::with_capacity(0)
568        };
569
570        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
571        multiaddresses_to_announce
572            .iter()
573            .filter(|a| !is_public_address(a))
574            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
575
576        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
577
578        let chain_api = self.chain_api.clone();
579        let me_offchain = *self.me.public();
580        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
581
582        // At this point the node is already registered with Safe, so
583        // we can announce via Safe-compliant TX
584        info!(?multiaddresses_to_announce, "announcing node on chain");
585        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
586            Ok(awaiter) => {
587                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
588                awaiter.await.map_err(|error| {
589                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
590                    HoprLibError::chain(error)
591                })?;
592                info!(?multiaddresses_to_announce, "node has been successfully announced");
593            }
594            Err(AnnouncementError::AlreadyAnnounced) => {
595                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
596            }
597            Err(error) => {
598                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
599                return Err(HoprLibError::chain(error));
600            }
601        }
602
603        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
604
605        // Wait for the node key-binding readiness to return
606        let this_node_account = node_ready
607            .await
608            .map_err(HoprLibError::other)?
609            .map_err(HoprLibError::chain)?;
610        if this_node_account.chain_addr != self.me_onchain()
611            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
612        {
613            error!(%this_node_account, "account bound to offchain key does not match this node");
614            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
615        }
616
617        info!(%this_node_account, "node account is ready");
618
619        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
620
621        info!("initializing session infrastructure");
622        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
623            .ok()
624            .and_then(|s| s.trim().parse::<usize>().ok())
625            .filter(|&c| c > 0)
626            .unwrap_or(256);
627
628        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
629        #[cfg(feature = "session-server")]
630        {
631            debug!(capacity = incoming_session_channel_capacity, "creating session server");
632            processes.insert(
633                HoprLibProcess::SessionServer,
634                hopr_async_runtime::spawn_as_abortable!(
635                    _session_rx
636                        .for_each_concurrent(None, move |session| {
637                            let serve_handler = serve_handler.clone();
638                            async move {
639                                let session_id = *session.session.id();
640                                match serve_handler.process(session).await {
641                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
642                                    Err(error) => error!(
643                                        ?session_id,
644                                        %error,
645                                        "client session processing failed"
646                                    ),
647                                }
648                            }
649                        })
650                        .inspect(|_| tracing::warn!(
651                            task = %HoprLibProcess::SessionServer,
652                            "long-running background task finished"
653                        ))
654                ),
655            );
656        }
657
658        info!("starting ticket events processor");
659        let (tickets_tx, tickets_rx) = channel(8192);
660        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
661        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
662        let node_db = self.node_db.clone();
663        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
664        spawn(
665            tickets_rx
666                .filter_map(move |ticket_event| {
667                    let node_db = node_db.clone();
668                    async move {
669                        match ticket_event {
670                            TicketEvent::WinningTicket(winning) => {
671                                if let Err(error) = node_db.insert_ticket(*winning).await {
672                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
673                                } else {
674                                    tracing::debug!(%winning, "inserted ticket into database");
675                                }
676                                Some(winning)
677                            }
678                            TicketEvent::RejectedTicket(rejected, issuer) => {
679                                if let Some(issuer) = &issuer {
680                                    if let Err(error) =
681                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
682                                    {
683                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
684                                    } else {
685                                        tracing::debug!(%rejected, "marked ticket as rejected");
686                                    }
687                                } else {
688                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
689                                }
690                                None
691                            }
692                        }
693                    }
694                })
695                .for_each(move |ticket| {
696                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
697                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
698                    }
699                    futures::future::ready(())
700                })
701                .inspect(|_| {
702                    tracing::warn!(
703                        task = %HoprLibProcess::TicketEvents,
704                        "long-running background task finished"
705                    )
706                }),
707        );
708
709        info!("starting transport");
710        let (hopr_socket, transport_processes) = self
711            .transport_api
712            .run(cover_traffic, network_builder, tickets_tx, session_tx)
713            .await?;
714        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
715
716        info!("starting ticket redemption service");
717        // Start a queue that takes care of redeeming tickets via given TicketSelectors
718        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
719        let _ = self.redeem_requests.set(redemption_req_tx);
720        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
721        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
722        let chain = self.chain_api.clone();
723        let node_db = self.node_db.clone();
724        spawn(redemption_req_rx
725            .for_each(move |selector| {
726                let chain = chain.clone();
727                let db = node_db.clone();
728                async move {
729                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
730                        Ok(res) => debug!(%res, "redemption complete"),
731                        Err(error) => error!(%error, "redemption failed"),
732                    }
733                }
734            })
735            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
736        );
737
738        info!("subscribing to channel events");
739        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
740        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
741        let chain = self.chain_api.clone();
742        let node_db = self.node_db.clone();
743        let events = chain.subscribe().map_err(HoprLibError::chain)?;
744        spawn(
745            futures::stream::Abortable::new(
746                events
747                    .filter_map(move |event|
748                        futures::future::ready(event.try_as_channel_closed())
749                    ),
750                chain_events_sub_reg
751            )
752            .for_each(move |closed_channel| {
753                let node_db = node_db.clone();
754                let chain = chain.clone();
755                async move {
756                    match closed_channel.direction(chain.me()) {
757                        Some(ChannelDirection::Incoming) => {
758                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
759                                Ok(num_neglected) if num_neglected > 0 => {
760                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
761                                },
762                                Ok(_) => {
763                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
764                                },
765                                Err(error) => {
766                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
767                                }
768                            }
769                        },
770                        Some(ChannelDirection::Outgoing) => {
771                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
772                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
773                            } else {
774                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
775                            }
776                        }
777                        _ => {} // Event for a channel that is not our own
778                    }
779                }
780            })
781            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
782        );
783
784        info!("synchronizing ticket states");
785        // NOTE: after the chain is synced, we can reset tickets which are considered
786        // redeemed but on-chain state does not align with that. This implies there was a problem
787        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
788        // handle errors appropriately.
789        let mut channels = self
790            .chain_api
791            .stream_channels(ChannelSelector {
792                destination: self.me_onchain().into(),
793                ..Default::default()
794            })
795            .map_err(HoprLibError::chain)
796            .await?;
797
798        while let Some(channel) = channels.next().await {
799            // Set the state of all unredeemed tickets with a higher index than the current
800            // channel index as untouched.
801            self.node_db
802                .update_ticket_states_and_fetch(
803                    [TicketSelector::from(&channel)
804                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
805                        .with_index_range(channel.ticket_index..)],
806                    AcknowledgedTicketStatus::Untouched,
807                )
808                .map_err(HoprLibError::db)
809                .await?
810                .for_each(|ticket| {
811                    info!(%ticket, "fixed next out-of-sync ticket");
812                    futures::future::ready(())
813                })
814                .await;
815
816            // Mark all the tickets with a lower ticket index than the current channel index as neglected.
817            self.node_db
818                .mark_tickets_as(
819                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
820                    TicketMarker::Neglected,
821                )
822                .map_err(HoprLibError::db)
823                .await?;
824        }
825
826        self.state.store(HoprState::Running, Ordering::Relaxed);
827
828        info!(
829            id = %self.me_peer_id(),
830            version = constants::APP_VERSION,
831            "NODE STARTED AND RUNNING"
832        );
833
834        #[cfg(all(feature = "telemetry", not(test)))]
835        METRIC_HOPR_NODE_INFO.set(
836            &[
837                &self.me.public().to_peerid_str(),
838                &me_onchain.to_string(),
839                &self.cfg.safe_module.safe_address.to_string(),
840                &self.cfg.safe_module.module_address.to_string(),
841            ],
842            1.0,
843        );
844
845        let _ = self.processes.set(processes);
846        Ok(hopr_socket)
847    }
848
849    /// Used to practically shut down all node's processes without dropping the instance.
850    ///
851    /// This means that the instance can be used to retrieve some information, but all
852    /// active operations will stop and new will be impossible to perform.
853    /// Such operations will return [`HoprStatusError::NotThereYet`].
854    ///
855    /// This is the final state and cannot be reversed by calling `run` again.
856    pub fn shutdown(&self) -> Result<(), HoprLibError> {
857        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
858        if let Some(processes) = self.processes.get() {
859            processes.abort_all();
860        }
861        self.state.store(HoprState::Terminated, Ordering::Relaxed);
862        info!("NODE SHUTDOWN COMPLETE");
863        Ok(())
864    }
865
866    /// Create a client session connection returning a session object that implements
867    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
868    #[cfg(feature = "session-client")]
869    pub async fn connect_to(
870        &self,
871        destination: Address,
872        target: SessionTarget,
873        cfg: HoprSessionClientConfig,
874    ) -> errors::Result<HoprSession> {
875        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
876
877        let backoff = backon::ConstantBuilder::default()
878            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
879            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
880            .with_jitter();
881
882        use backon::Retryable;
883
884        Ok((|| {
885            let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
886            let target = target.clone();
887            async { self.transport_api.new_session(destination, target, cfg).await }
888        })
889        .retry(backoff)
890        .sleep(backon::FuturesTimerSleeper)
891        .await?)
892    }
893
894    /// Sends keep-alive to the given [`SessionId`], making sure the session is not
895    /// closed due to inactivity.
896    #[cfg(feature = "session-client")]
897    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
898        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
899        Ok(self.transport_api.probe_session(id).await?)
900    }
901
902    #[cfg(feature = "session-client")]
903    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
904        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
905        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
906    }
907
908    #[cfg(all(feature = "session-client", feature = "telemetry"))]
909    pub async fn get_session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
910        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
911        Ok(self.transport_api.session_stats(id).await?)
912    }
913
914    #[cfg(feature = "session-client")]
915    pub async fn update_session_surb_balancer_config(
916        &self,
917        id: &SessionId,
918        cfg: SurbBalancerConfig,
919    ) -> errors::Result<()> {
920        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
921        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
922    }
923
924    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
925    /// successfully or timing out after `timeout`.
926    fn spawn_wait_for_on_chain_event(
927        &self,
928        context: impl std::fmt::Display,
929        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
930        timeout: Duration,
931    ) -> errors::Result<(
932        impl Future<Output = errors::Result<ChainEvent>>,
933        hopr_async_runtime::AbortHandle,
934    )> {
935        debug!(%context, "registering wait for on-chain event");
936        let (event_stream, handle) = futures::stream::abortable(
937            self.chain_api
938                .subscribe()
939                .map_err(HoprLibError::chain)?
940                .skip_while(move |event| futures::future::ready(!predicate(event))),
941        );
942
943        let ctx = context.to_string();
944
945        Ok((
946            spawn(async move {
947                pin_mut!(event_stream);
948                let res = event_stream
949                    .next()
950                    .timeout(futures_time::time::Duration::from(timeout))
951                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
952                    .await?
953                    .ok_or(HoprLibError::GeneralError(format!(
954                        "failed to yield an on-chain event for {ctx}"
955                    )));
956                debug!(%ctx, ?res, "on-chain event waiting done");
957                res
958            })
959            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
960            .and_then(futures::future::ready),
961            handle,
962        ))
963    }
964}
965
966impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
967where
968    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
969        + hopr_api::graph::NetworkGraphUpdate
970        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
971        + Clone
972        + Send
973        + Sync
974        + 'static,
975    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
976        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
977    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
978{
979    // === telemetry
980    /// Prometheus formatted metrics collected by the hopr-lib components.
981    pub fn collect_hopr_metrics() -> errors::Result<String> {
982        cfg_if::cfg_if! {
983            if #[cfg(all(feature = "telemetry", not(test)))] {
984                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
985            } else {
986                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
987            }
988        }
989    }
990}
991
992// === Trait implementations for the high-level node API ===
993
994impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
995where
996    Chain: HoprChainApi + Clone + Send + Sync + 'static,
997    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
998    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
999        + hopr_api::graph::NetworkGraphUpdate
1000        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1001        + Clone
1002        + Send
1003        + Sync
1004        + 'static,
1005    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1006        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1007    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1008{
1009    fn status(&self) -> HoprState {
1010        self.state.load(Ordering::Relaxed)
1011    }
1012}
1013
1014#[async_trait::async_trait]
1015impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
1016where
1017    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1018    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1019    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1020        + hopr_api::graph::NetworkGraphUpdate
1021        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1022        + Clone
1023        + Send
1024        + Sync
1025        + 'static,
1026    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1027        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1028    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1029{
1030    type Error = HoprLibError;
1031    type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1032
1033    fn me_peer_id(&self) -> PeerId {
1034        (*self.me.public()).into()
1035    }
1036
1037    async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1038        Ok(self
1039            .chain_api
1040            .stream_accounts(AccountSelector {
1041                public_only: true,
1042                ..Default::default()
1043            })
1044            .map_err(HoprLibError::chain)
1045            .await?
1046            .map(|entry| {
1047                (
1048                    PeerId::from(entry.public_key),
1049                    entry.chain_addr,
1050                    entry.get_multiaddrs().to_vec(),
1051                )
1052            })
1053            .collect()
1054            .await)
1055    }
1056
1057    async fn network_health(&self) -> hopr_api::network::Health {
1058        self.transport_api.network_health().await
1059    }
1060
1061    async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1062        Ok(self
1063            .transport_api
1064            .network_connected_peers()
1065            .await?
1066            .into_iter()
1067            .map(PeerId::from)
1068            .collect())
1069    }
1070
1071    fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1072        let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1073        self.transport_api.network_peer_observations(&pubkey)
1074    }
1075
1076    async fn all_network_peers(
1077        &self,
1078        minimum_score: f64,
1079    ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1080        Ok(
1081            futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1082                .filter_map(|(pubkey, info)| async move {
1083                    let peer_id = PeerId::from(pubkey);
1084                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1085                    Some((address, peer_id, info))
1086                })
1087                .collect::<Vec<_>>()
1088                .await,
1089        )
1090    }
1091
1092    fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1093        self.transport_api.local_multiaddresses()
1094    }
1095
1096    async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1097        self.transport_api.listening_multiaddresses().await
1098    }
1099
1100    async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1101        let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1102            return vec![];
1103        };
1104        self.transport_api.network_observed_multiaddresses(&pubkey).await
1105    }
1106
1107    async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1108        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1109            .await
1110            .map_err(HoprLibError::TransportError)?;
1111
1112        match self
1113            .chain_api
1114            .stream_accounts(AccountSelector {
1115                public_only: false,
1116                offchain_key: Some(pubkey),
1117                ..Default::default()
1118            })
1119            .map_err(HoprLibError::chain)
1120            .await?
1121            .next()
1122            .await
1123        {
1124            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1125            None => {
1126                error!(%peer, "no information");
1127                Ok(vec![])
1128            }
1129        }
1130    }
1131
1132    async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1133        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1134        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1135            .await
1136            .map_err(HoprLibError::TransportError)?;
1137        Ok(self.transport_api.ping(&pubkey).await?)
1138    }
1139}
1140
/// Sink accepting [`TicketSelector`] redemption requests: an mpsc sender whose
/// [`SendError`] is mapped into a [`HoprLibError`].
pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1142
1143impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1144where
1145    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1146    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1147    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1148        + hopr_api::graph::NetworkGraphUpdate
1149        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1150        + Clone
1151        + Send
1152        + Sync
1153        + 'static,
1154    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1155        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1156    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1157{
1158    pub fn me_onchain(&self) -> Address {
1159        *self.chain_api.me()
1160    }
1161
1162    pub fn get_safe_config(&self) -> SafeModuleConfig {
1163        SafeModuleConfig {
1164            safe_address: self.cfg.safe_module.safe_address,
1165            module_address: self.cfg.safe_module.module_address,
1166        }
1167    }
1168
1169    pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1170        self.chain_api
1171            .balance(self.me_onchain())
1172            .await
1173            .map_err(HoprLibError::chain)
1174    }
1175
1176    pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1177        self.chain_api
1178            .balance(self.cfg.safe_module.safe_address)
1179            .await
1180            .map_err(HoprLibError::chain)
1181    }
1182
1183    pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1184        self.chain_api
1185            .safe_allowance(self.cfg.safe_module.safe_address)
1186            .await
1187            .map_err(HoprLibError::chain)
1188    }
1189
1190    pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1191        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1192    }
1193
1194    pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1195        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1196    }
1197
1198    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1199        self.chain_api
1200            .minimum_incoming_ticket_win_prob()
1201            .await
1202            .map_err(HoprLibError::chain)
1203    }
1204
1205    pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1206        self.chain_api
1207            .channel_closure_notice_period()
1208            .await
1209            .map_err(HoprLibError::chain)
1210    }
1211
1212    pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1213        Ok(self
1214            .chain_api
1215            .stream_accounts(AccountSelector {
1216                public_only: true,
1217                ..Default::default()
1218            })
1219            .map_err(HoprLibError::chain)
1220            .await?
1221            .collect()
1222            .await)
1223    }
1224
1225    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1226        let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1227            .await
1228            .map_err(HoprLibError::TransportError)?;
1229
1230        self.chain_api
1231            .packet_key_to_chain_key(&pubkey)
1232            .await
1233            .map_err(HoprLibError::chain)
1234    }
1235
1236    pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1237        self.chain_api
1238            .chain_key_to_packet_key(address)
1239            .await
1240            .map(|pk| pk.map(|v| v.into()))
1241            .map_err(HoprLibError::chain)
1242    }
1243
1244    pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1245        self.chain_api
1246            .channel_by_id(channel_id)
1247            .await
1248            .map_err(HoprLibError::chain)
1249    }
1250
1251    pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1252        self.chain_api
1253            .channel_by_parties(src, dest)
1254            .await
1255            .map_err(HoprLibError::chain)
1256    }
1257
1258    pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1259        Ok(self
1260            .chain_api
1261            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1262                ChannelStatusDiscriminants::Closed,
1263                ChannelStatusDiscriminants::Open,
1264                ChannelStatusDiscriminants::PendingToClose,
1265            ]))
1266            .map_err(HoprLibError::chain)
1267            .await?
1268            .collect()
1269            .await)
1270    }
1271
1272    pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1273        Ok(self
1274            .chain_api
1275            .stream_channels(
1276                ChannelSelector::default()
1277                    .with_destination(*dest)
1278                    .with_allowed_states(&[
1279                        ChannelStatusDiscriminants::Closed,
1280                        ChannelStatusDiscriminants::Open,
1281                        ChannelStatusDiscriminants::PendingToClose,
1282                    ]),
1283            )
1284            .map_err(HoprLibError::chain)
1285            .await?
1286            .collect()
1287            .await)
1288    }
1289
1290    pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1291        Ok(self
1292            .chain_api
1293            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1294                ChannelStatusDiscriminants::Closed,
1295                ChannelStatusDiscriminants::Open,
1296                ChannelStatusDiscriminants::PendingToClose,
1297            ]))
1298            .map_err(HoprLibError::chain)
1299            .await?
1300            .collect()
1301            .await)
1302    }
1303
1304    pub async fn open_channel(
1305        &self,
1306        destination: &Address,
1307        amount: HoprBalance,
1308    ) -> Result<OpenChannelResult, HoprLibError> {
1309        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1310
1311        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1312
1313        let confirm_awaiter = self
1314            .chain_api
1315            .open_channel(destination, amount)
1316            .await
1317            .map_err(HoprLibError::chain)?;
1318
1319        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1320            format!("open channel to {destination} ({channel_id})"),
1321            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1322            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1323        )?;
1324
1325        let tx_hash = confirm_awaiter.await.map_err(|e| {
1326            event_abort.abort();
1327            HoprLibError::chain(e)
1328        })?;
1329
1330        let event = event_awaiter.await?;
1331        debug!(%event, "open channel event received");
1332
1333        Ok(OpenChannelResult { tx_hash, channel_id })
1334    }
1335
1336    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1337        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1338
1339        let channel_id = *channel_id;
1340
1341        let confirm_awaiter = self
1342            .chain_api
1343            .fund_channel(&channel_id, amount)
1344            .await
1345            .map_err(HoprLibError::chain)?;
1346
1347        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1348            format!("fund channel {channel_id}"),
1349            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1350            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1351        )?;
1352
1353        let res = confirm_awaiter.await.map_err(|e| {
1354            event_abort.abort();
1355            HoprLibError::chain(e)
1356        })?;
1357
1358        let event = event_awaiter.await?;
1359        debug!(%event, "fund channel event received");
1360
1361        Ok(res)
1362    }
1363
1364    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1365        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1366
1367        let channel_id = *channel_id;
1368
1369        let confirm_awaiter = self
1370            .chain_api
1371            .close_channel(&channel_id)
1372            .await
1373            .map_err(HoprLibError::chain)?;
1374
1375        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1376            format!("close channel {channel_id}"),
1377            move |event| {
1378                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1379                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1380            },
1381            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1382        )?;
1383
1384        let tx_hash = confirm_awaiter.await.map_err(|e| {
1385            event_abort.abort();
1386            HoprLibError::chain(e)
1387        })?;
1388
1389        let event = event_awaiter.await?;
1390        debug!(%event, "close channel event received");
1391
1392        Ok(CloseChannelResult { tx_hash })
1393    }
1394
1395    pub async fn tickets_in_channel(
1396        &self,
1397        channel_id: &ChannelId,
1398    ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1399        if let Some(channel) = self
1400            .chain_api
1401            .channel_by_id(channel_id)
1402            .await
1403            .map_err(|e| HoprTransportError::Other(e.into()))?
1404        {
1405            if &channel.destination == self.chain_api.me() {
1406                Ok(Some(
1407                    self.node_db
1408                        .stream_tickets([&channel])
1409                        .await
1410                        .map_err(HoprLibError::db)?
1411                        .collect()
1412                        .await,
1413                ))
1414            } else {
1415                Ok(None)
1416            }
1417        } else {
1418            Ok(None)
1419        }
1420    }
1421
1422    pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1423        Ok(self
1424            .node_db
1425            .stream_tickets(None::<TicketSelector>)
1426            .await
1427            .map_err(HoprLibError::db)?
1428            .map(|v| v.ticket)
1429            .collect()
1430            .await)
1431    }
1432
1433    pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1434        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1435    }
1436
1437    pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1438        self.node_db
1439            .reset_ticket_statistics()
1440            .await
1441            .map_err(HoprLibError::chain)
1442    }
1443
1444    pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1445        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1446
1447        let min_value = min_value.into();
1448
1449        self.chain_api
1450            .stream_channels(
1451                ChannelSelector::default()
1452                    .with_destination(self.me_onchain())
1453                    .with_allowed_states(&[
1454                        ChannelStatusDiscriminants::Open,
1455                        ChannelStatusDiscriminants::PendingToClose,
1456                    ]),
1457            )
1458            .map_err(HoprLibError::chain)
1459            .await?
1460            .map(|channel| {
1461                Ok(TicketSelector::from(&channel)
1462                    .with_amount(min_value..)
1463                    .with_index_range(channel.ticket_index..)
1464                    .with_state(AcknowledgedTicketStatus::Untouched))
1465            })
1466            .forward(self.redemption_requests()?)
1467            .await?;
1468
1469        Ok(())
1470    }
1471
1472    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1473        &self,
1474        counterparty: &Address,
1475        min_value: B,
1476    ) -> Result<(), HoprLibError> {
1477        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1478            .await
1479    }
1480
1481    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1482        &self,
1483        channel_id: &Hash,
1484        min_value: B,
1485    ) -> Result<(), HoprLibError> {
1486        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1487
1488        let channel = self
1489            .chain_api
1490            .channel_by_id(channel_id)
1491            .await
1492            .map_err(HoprLibError::chain)?
1493            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1494
1495        self.redemption_requests()?
1496            .send(
1497                TicketSelector::from(channel)
1498                    .with_amount(min_value.into()..)
1499                    .with_index_range(channel.ticket_index..)
1500                    .with_state(AcknowledgedTicketStatus::Untouched),
1501            )
1502            .await?;
1503
1504        Ok(())
1505    }
1506
1507    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1508        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1509
1510        self.redemption_requests()?
1511            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1512            .await?;
1513
1514        Ok(())
1515    }
1516
1517    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
1518        self.winning_ticket_subscribers.1.activate_cloned()
1519    }
1520
1521    pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1522        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1523
1524        Ok(self
1525            .redeem_requests
1526            .get()
1527            .cloned()
1528            .expect("redeem_requests is not initialized")
1529            .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1530    }
1531
1532    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1533        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1534
1535        self.chain_api
1536            .withdraw(amount, &recipient)
1537            .and_then(identity)
1538            .map_err(HoprLibError::chain)
1539            .await
1540    }
1541
1542    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1543        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1544
1545        self.chain_api
1546            .withdraw(amount, &recipient)
1547            .and_then(identity)
1548            .map_err(HoprLibError::chain)
1549            .await
1550    }
1551}