Skip to main content

hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
//! it will perform its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities, but offers
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API, key management...
9//!
//! The intended way to use `hopr_lib` is for a specific tool to be built on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25/// Public traits for interactions with this library.
26pub mod traits;
27/// Utility module with helper types and functionality over hopr-lib behavior.
28pub mod utils;
29
30pub use hopr_api as api;
31
/// Exports of libraries necessary for API and interface operations.
///
/// The module only re-exports other crates under shorter, grouped aliases
/// and is hidden from the generated documentation.
#[doc(hidden)]
pub mod exports {
    /// Aliases for the chain, internal and primitive type crates.
    pub mod types {
        pub use hopr_chain_types as chain;
        pub use hopr_internal_types as internal;
        pub use hopr_primitive_types as primitive;
    }

    /// Aliases for the keypair and cryptographic type crates.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    /// Alias for the network type crate.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
52
/// Export of relevant types for easier integration.
///
/// All items are re-exported through the aliases defined in [`exports`];
/// the module itself is hidden from the generated documentation.
#[doc(hidden)]
pub mod prelude {
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
69
70use std::{
71    convert::identity,
72    future::Future,
73    sync::{Arc, OnceLock, atomic::Ordering},
74    time::Duration,
75};
76
77use futures::{
78    FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
79    channel::mpsc::{SendError, channel},
80    pin_mut,
81    sink::SinkMapErr,
82};
83use futures_time::future::FutureExt as FuturesTimeFutureExt;
84use hopr_api::{
85    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
86    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
87    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
88    node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
89};
90pub use hopr_api::{
91    db::ChannelTicketStatistics,
92    graph::EdgeLinkObservable,
93    network::{NetworkBuilder, NetworkStreamControl},
94    node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_crypto_types::prelude::*;
100pub use hopr_internal_types::prelude::*;
101pub use hopr_network_types::prelude::*;
102#[cfg(all(feature = "prometheus", not(test)))]
103use hopr_platform::time::native::current_time;
104pub use hopr_primitive_types::prelude::*;
105use hopr_transport::errors::HoprTransportError;
106#[cfg(feature = "runtime-tokio")]
107pub use hopr_transport::transfer_session;
108pub use hopr_transport::*;
109use tracing::{debug, error, info, warn};
110use validator::Validate;
111
112pub use crate::{
113    config::SafeModule,
114    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
115    errors::{HoprLibError, HoprStatusError},
116};
117
/// Long-running tasks that are spawned by the HOPR node.
///
/// Each variant is used as a key when the corresponding task is registered in the
/// node's [`AbortableList`]. The `Display` implementation derived via `strum`
/// yields a human-readable description of the task, which is used in log messages.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
pub enum HoprLibProcess {
    /// A task started by the transport layer, identified by the wrapped transport process.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// The task serving incoming sessions via the user-supplied session server handler.
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// The task draining the queue of ticket redemption requests.
    #[strum(to_string = "ticket redemption queue driver")]
    TicketRedemptions,
    /// The task reacting to on-chain channel events.
    #[strum(to_string = "subscription for on-chain channel updates")]
    ChannelEvents,
    /// The task processing winning and rejected ticket events.
    #[strum(to_string = "on received ticket event (winning or rejected)")]
    TicketEvents,
}
132
// Prometheus metrics published by this crate. They are compiled in only when the
// `prometheus` feature is enabled and the crate is not built for tests.
// Every metric is constructed lazily on first access; the `unwrap()` makes that
// first access panic should the construction of the metric fail.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge carrying the process start time; it is set in `Hopr::new`.
    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Gauge labelled with the version string; it is set in `Hopr::new`.
    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Gauge labelled with the node's identifiers; it is set by `Hopr::run`.
    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
150
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// When a thread count is not given explicitly, half of the available CPU parallelism
/// is used for it, since half of the available threads are to be used for IO-bound
/// and half for CPU-bound tasks. Either count is always at least 1.
///
/// # Arguments
///
/// * `num_cpu_threads` - size of the thread pool for CPU-bound tasks; `None` selects the default described above.
/// * `num_io_threads` - number of tokio worker threads; `None` selects the default described above.
///
/// The stack size of the tokio worker threads is taken from the `HOPRD_THREAD_STACK_SIZE`
/// environment variable (in bytes). It defaults to 10 MiB when the variable is unset or
/// cannot be parsed, and is never lower than 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count was neither given nor could be derived from the
/// available parallelism, if the CPU thread pool could not be initialized, or if the
/// tokio runtime could not be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    use std::num::NonZeroUsize;

    /// Stack size used when `HOPRD_THREAD_STACK_SIZE` is missing or invalid.
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    /// Lower bound enforced on the configured stack size.
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Halving may yield 0 on a single-core machine, hence the `.max(1)` clamps below.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error is constructed lazily, so nothing is allocated on the success path.
    let cpu_threads = num_cpu_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // Surrounding whitespace is ignored, consistently with how other numeric
    // environment variables are parsed in this file.
    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
196
/// Type alias used to send and receive transport data via a running HOPR node.
///
/// The receiving half yields [`ApplicationDataIn`] items, while the sending half accepts
/// pairs of the [`DestinationRouting`] and the [`ApplicationDataOut`] to be sent.
/// This is the type returned by a successful `Hopr::run` call.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
202
/// Both ends of the broadcast channel carrying newly received winning tickets.
///
/// The sender is fed by the ticket event processing task. The receiver is kept
/// inactive, so it does not consume any tickets by itself.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
207
/// Time to wait until the node's keybinding appears on-chain.
///
/// Passed as the timeout to the chain API's `await_key_binding` during node start-up.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);

/// Timeout to wait until an on-chain event is received in response to a successful on-chain operation resolution.
// TODO: use the value from ChainInfo instead (once available via https://github.com/hoprnet/blokli/issues/200)
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
214
/// HOPR main object providing the entire HOPR node functionality
///
/// Instantiating this object creates all processes and objects necessary for
/// running the HOPR node. Once created, the node can be started using the
/// `run()` method.
///
/// Externally offered API should be enough to perform all necessary tasks
/// with the HOPR node manually, but it is advised to create such a configuration
/// that manual interaction is unnecessary.
///
/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
///
/// The type is generic over the on-chain API (`Chain`), the node database (`Db`),
/// the network graph (`Graph`) and the network implementation (`Net`).
pub struct Hopr<Chain, Db, Graph, Net>
where
    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphUpdate
        + Clone
        + Send
        + Sync
        + 'static,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
{
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    /// Configuration the node has been created with.
    cfg: config::HoprLibConfig,
    /// Current state of the node, updated as `run()` progresses through its start-up phases.
    state: Arc<api::node::state::AtomicHoprState>,
    /// Transport layer of the node.
    transport_api: HoprTransport<Chain, Db, Graph, Net>,
    /// Sending end of the ticket redemption request queue; set once `run()` starts the redemption service.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Node database.
    node_db: Db,
    /// On-chain API.
    chain_api: Chain,
    /// Broadcast channel for newly received winning tickets.
    winning_ticket_subscribers: NewTicketEvents,
    /// Long-running background tasks of the node.
    // NOTE(review): presumably set by `run()` once all tasks were spawned - the assignment is outside of this view.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
246
247impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
248where
249    Chain: HoprChainApi + Clone + Send + Sync + 'static,
250    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
251    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
252        + hopr_api::graph::NetworkGraphUpdate
253        + Clone
254        + Send
255        + Sync
256        + 'static,
257    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
258{
259    pub async fn new(
260        identity: (&ChainKeypair, &OffchainKeypair),
261        hopr_chain_api: Chain,
262        hopr_node_db: Db,
263        graph: Graph,
264        cfg: config::HoprLibConfig,
265    ) -> errors::Result<Self> {
266        if hopr_crypto_random::is_rng_fixed() {
267            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
268        }
269
270        cfg.validate()?;
271
272        let hopr_transport_api = HoprTransport::new(
273            identity,
274            hopr_chain_api.clone(),
275            hopr_node_db.clone(),
276            graph,
277            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
278            cfg.protocol,
279        );
280
281        #[cfg(all(feature = "prometheus", not(test)))]
282        {
283            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
284            METRIC_HOPR_LIB_VERSION.set(
285                &[const_format::formatcp!("{}", constants::APP_VERSION)],
286                const_format::formatcp!(
287                    "{}.{}",
288                    env!("CARGO_PKG_VERSION_MAJOR"),
289                    env!("CARGO_PKG_VERSION_MINOR")
290                )
291                .parse()
292                .unwrap_or(0.0),
293            );
294
295            // Calling get_ticket_statistics will initialize the respective metrics on tickets
296            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
297                error!(%error, "failed to initialize ticket statistics metrics");
298            }
299        }
300
301        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
302        new_tickets_tx.set_await_active(false);
303        new_tickets_tx.set_overflow(true);
304
305        Ok(Self {
306            me: identity.1.clone(),
307            cfg,
308            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
309            transport_api: hopr_transport_api,
310            chain_api: hopr_chain_api,
311            node_db: hopr_node_db,
312            redeem_requests: OnceLock::new(),
313            processes: OnceLock::new(),
314            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
315        })
316    }
317
318    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
319        if HoprNodeOperations::status(self) == state {
320            Ok(())
321        } else {
322            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
323        }
324    }
325
    /// Returns the configuration this node has been created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
329
330    #[inline]
331    fn is_public(&self) -> bool {
332        self.cfg.publish
333    }
334
335    // TODO(20260218): @NumberFour8 abstract the telemetry objects properly and extract this API into a telemetry trait
336    /// Get packet stats for a specific peer.
337    #[cfg(feature = "telemetry")]
338    pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
339        Ok(self.transport_api.network_peer_packet_stats(peer).await?)
340    }
341
342    // TODO(20260218): @NumberFour8 abstract the telemetry objects properly and extract this API into a telemetry trait
343    /// Get packet stats for all connected peers.
344    #[cfg(feature = "telemetry")]
345    pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
346        Ok(self.transport_api.network_all_packet_stats().await?)
347    }
348
349    pub async fn run<
350        Ct,
351        NetBuilder,
352        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
353    >(
354        &self,
355        cover_traffic: Ct,
356        network_builder: NetBuilder,
357        #[cfg(feature = "session-server")] serve_handler: T,
358    ) -> errors::Result<HoprTransportIO>
359    where
360        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
361        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
362    {
363        self.error_if_not_in_state(
364            HoprState::Uninitialized,
365            "cannot start the hopr node multiple times".into(),
366        )?;
367
368        #[cfg(feature = "testing")]
369        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
370
371        let me_onchain = *self.chain_api.me();
372        info!(
373            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
374            "node is not started, please fund this node",
375        );
376
377        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
378        helpers::wait_for_funds(
379            *MIN_NATIVE_BALANCE,
380            *SUGGESTED_NATIVE_BALANCE,
381            Duration::from_secs(200),
382            me_onchain,
383            &self.chain_api,
384        )
385        .await?;
386
387        let mut processes = AbortableList::<HoprLibProcess>::default();
388
389        info!("starting HOPR node...");
390        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
391
392        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
393        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
394
395        info!(
396            address = %me_onchain,
397            %balance,
398            %minimum_balance,
399            "node information"
400        );
401
402        if balance.le(&minimum_balance) {
403            return Err(HoprLibError::GeneralError(
404                "cannot start the node without a sufficiently funded wallet".into(),
405            ));
406        }
407
408        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
409
410        // Once we are able to query the chain,
411        // check if the ticket price is configured correctly.
412        let network_min_ticket_price = self
413            .chain_api
414            .minimum_ticket_price()
415            .await
416            .map_err(HoprLibError::chain)?;
417        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
418        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
419            return Err(HoprLibError::GeneralError(format!(
420                "configured outgoing ticket price is lower than the network minimum ticket price: \
421                 {configured_ticket_price:?} < {network_min_ticket_price}"
422            )));
423        }
424        // Once we are able to query the chain,
425        // check if the winning probability is configured correctly.
426        let network_min_win_prob = self
427            .chain_api
428            .minimum_incoming_ticket_win_prob()
429            .await
430            .map_err(HoprLibError::chain)?;
431        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
432        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
433            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
434        {
435            return Err(HoprLibError::GeneralError(format!(
436                "configured outgoing ticket winning probability is lower than the network minimum winning \
437                 probability: {configured_win_prob:?} < {network_min_win_prob}"
438            )));
439        }
440
441        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
442
443        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
444
445        let safe_addr = self.cfg.safe_module.safe_address;
446
447        if self.me_onchain() == safe_addr {
448            return Err(HoprLibError::GeneralError(
449                "cannot use self as staking safe address".into(),
450            ));
451        }
452
453        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
454        info!(%safe_addr, "registering safe with this node");
455        match self.chain_api.register_safe(&safe_addr).await {
456            Ok(awaiter) => {
457                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
458                awaiter.await.map_err(|error| {
459                    error!(%safe_addr, %error, "safe registration failed with error");
460                    HoprLibError::chain(error)
461                })?;
462                info!(%safe_addr, "safe successfully registered with this node");
463            }
464            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
465                info!(%safe_addr, "this safe is already registered with this node");
466            }
467            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
468                // TODO: support safe deregistration flow
469                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
470                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
471            }
472            Err(error) => {
473                error!(%safe_addr, %error, "safe registration failed");
474                return Err(HoprLibError::chain(error));
475            }
476        }
477
478        // Only public nodes announce multiaddresses
479        let multiaddresses_to_announce = if self.is_public() {
480            // The multiaddresses are filtered for the non-private ones,
481            // unless `announce_local_addresses` is set to `true`.
482            self.transport_api.announceable_multiaddresses()
483        } else {
484            Vec::with_capacity(0)
485        };
486
487        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
488        multiaddresses_to_announce
489            .iter()
490            .filter(|a| !is_public_address(a))
491            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
492
493        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
494
495        let chain_api = self.chain_api.clone();
496        let me_offchain = *self.me.public();
497        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
498
499        // At this point the node is already registered with Safe, so
500        // we can announce via Safe-compliant TX
501        info!(?multiaddresses_to_announce, "announcing node on chain");
502        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
503            Ok(awaiter) => {
504                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
505                awaiter.await.map_err(|error| {
506                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
507                    HoprLibError::chain(error)
508                })?;
509                info!(?multiaddresses_to_announce, "node has been successfully announced");
510            }
511            Err(AnnouncementError::AlreadyAnnounced) => {
512                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
513            }
514            Err(error) => {
515                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
516                return Err(HoprLibError::chain(error));
517            }
518        }
519
520        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
521
522        // Wait for the node key-binding readiness to return
523        let this_node_account = node_ready
524            .await
525            .map_err(HoprLibError::other)?
526            .map_err(HoprLibError::chain)?;
527        if this_node_account.chain_addr != self.me_onchain()
528            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
529        {
530            error!(%this_node_account, "account bound to offchain key does not match this node");
531            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
532        }
533
534        info!(%this_node_account, "node account is ready");
535
536        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
537
538        info!("initializing session infrastructure");
539        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
540            .ok()
541            .and_then(|s| s.trim().parse::<usize>().ok())
542            .filter(|&c| c > 0)
543            .unwrap_or(256);
544
545        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
546        #[cfg(feature = "session-server")]
547        {
548            debug!(capacity = incoming_session_channel_capacity, "creating session server");
549            processes.insert(
550                HoprLibProcess::SessionServer,
551                hopr_async_runtime::spawn_as_abortable!(
552                    _session_rx
553                        .for_each_concurrent(None, move |session| {
554                            let serve_handler = serve_handler.clone();
555                            async move {
556                                let session_id = *session.session.id();
557                                match serve_handler.process(session).await {
558                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
559                                    Err(error) => error!(
560                                        ?session_id,
561                                        %error,
562                                        "client session processing failed"
563                                    ),
564                                }
565                            }
566                        })
567                        .inspect(|_| tracing::warn!(
568                            task = %HoprLibProcess::SessionServer,
569                            "long-running background task finished"
570                        ))
571                ),
572            );
573        }
574
575        info!("starting ticket events processor");
576        let (tickets_tx, tickets_rx) = channel(8192);
577        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
578        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
579        let node_db = self.node_db.clone();
580        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
581        spawn(
582            tickets_rx
583                .filter_map(move |ticket_event| {
584                    let node_db = node_db.clone();
585                    async move {
586                        match ticket_event {
587                            TicketEvent::WinningTicket(winning) => {
588                                if let Err(error) = node_db.insert_ticket(*winning).await {
589                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
590                                } else {
591                                    tracing::debug!(%winning, "inserted ticket into database");
592                                }
593                                Some(winning)
594                            }
595                            TicketEvent::RejectedTicket(rejected, issuer) => {
596                                if let Some(issuer) = &issuer {
597                                    if let Err(error) =
598                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
599                                    {
600                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
601                                    } else {
602                                        tracing::debug!(%rejected, "marked ticket as rejected");
603                                    }
604                                } else {
605                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
606                                }
607                                None
608                            }
609                        }
610                    }
611                })
612                .for_each(move |ticket| {
613                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
614                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
615                    }
616                    futures::future::ready(())
617                })
618                .inspect(|_| {
619                    tracing::warn!(
620                        task = %HoprLibProcess::TicketEvents,
621                        "long-running background task finished"
622                    )
623                }),
624        );
625
626        info!("starting transport");
627        let (hopr_socket, transport_processes) = self
628            .transport_api
629            .run(cover_traffic, network_builder, tickets_tx, session_tx)
630            .await?;
631        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
632
633        info!("starting ticket redemption service");
634        // Start a queue that takes care of redeeming tickets via given TicketSelectors
635        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
636        let _ = self.redeem_requests.set(redemption_req_tx);
637        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
638        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
639        let chain = self.chain_api.clone();
640        let node_db = self.node_db.clone();
641        spawn(redemption_req_rx
642            .for_each(move |selector| {
643                let chain = chain.clone();
644                let db = node_db.clone();
645                async move {
646                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
647                        Ok(res) => debug!(%res, "redemption complete"),
648                        Err(error) => error!(%error, "redemption failed"),
649                    }
650                }
651            })
652            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
653        );
654
655        info!("subscribing to channel events");
656        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
657        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
658        let chain = self.chain_api.clone();
659        let node_db = self.node_db.clone();
660        let events = chain.subscribe().map_err(HoprLibError::chain)?;
661        spawn(
662            futures::stream::Abortable::new(
663                events
664                    .filter_map(move |event|
665                        futures::future::ready(event.try_as_channel_closed())
666                    ),
667                chain_events_sub_reg
668            )
669            .for_each(move |closed_channel| {
670                let node_db = node_db.clone();
671                let chain = chain.clone();
672                async move {
673                    match closed_channel.direction(chain.me()) {
674                        Some(ChannelDirection::Incoming) => {
675                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
676                                Ok(num_neglected) if num_neglected > 0 => {
677                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
678                                },
679                                Ok(_) => {
680                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
681                                },
682                                Err(error) => {
683                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
684                                }
685                            }
686                        },
687                        Some(ChannelDirection::Outgoing) => {
688                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
689                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
690                            } else {
691                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
692                            }
693                        }
694                        _ => {} // Event for a channel that is not our own
695                    }
696                }
697            })
698            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
699        );
700
701        info!("synchronizing ticket states");
702        // NOTE: after the chain is synced, we can reset tickets which are considered
703        // redeemed but on-chain state does not align with that. This implies there was a problem
704        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
705        // handle errors appropriately.
706        let mut channels = self
707            .chain_api
708            .stream_channels(ChannelSelector {
709                destination: self.me_onchain().into(),
710                ..Default::default()
711            })
712            .map_err(HoprLibError::chain)
713            .await?;
714
715        while let Some(channel) = channels.next().await {
716            // Set the state of all unredeemed tickets with a higher index than the current
717            // channel index as untouched.
718            self.node_db
719                .update_ticket_states_and_fetch(
720                    [TicketSelector::from(&channel)
721                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
722                        .with_index_range(channel.ticket_index..)],
723                    AcknowledgedTicketStatus::Untouched,
724                )
725                .map_err(HoprLibError::db)
726                .await?
727                .for_each(|ticket| {
728                    info!(%ticket, "fixed next out-of-sync ticket");
729                    futures::future::ready(())
730                })
731                .await;
732
733            // Mark all the tickets with a lower ticket index than the current channel index as neglected.
734            self.node_db
735                .mark_tickets_as(
736                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
737                    TicketMarker::Neglected,
738                )
739                .map_err(HoprLibError::db)
740                .await?;
741        }
742
743        self.state.store(HoprState::Running, Ordering::Relaxed);
744
745        info!(
746            id = %self.me_peer_id(),
747            version = constants::APP_VERSION,
748            "NODE STARTED AND RUNNING"
749        );
750
751        #[cfg(all(feature = "prometheus", not(test)))]
752        METRIC_HOPR_NODE_INFO.set(
753            &[
754                &self.me.public().to_peerid_str(),
755                &me_onchain.to_string(),
756                &self.cfg.safe_module.safe_address.to_string(),
757                &self.cfg.safe_module.module_address.to_string(),
758            ],
759            1.0,
760        );
761
762        let _ = self.processes.set(processes);
763        Ok(hopr_socket)
764    }
765
766    /// Used to practically shut down all node's processes without dropping the instance.
767    ///
768    /// This means that the instance can be used to retrieve some information, but all
769    /// active operations will stop and new will be impossible to perform.
770    /// Such operations will return [`HoprStatusError::NotThereYet`].
771    ///
772    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
773    pub fn shutdown(&self) -> Result<(), HoprLibError> {
774        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
775        if let Some(processes) = self.processes.get() {
776            processes.abort_all();
777        }
778        self.state.store(HoprState::Terminated, Ordering::Relaxed);
779        info!("NODE SHUTDOWN COMPLETE");
780        Ok(())
781    }
782
783    /// Create a client session connection returning a session object that implements
784    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
785    #[cfg(feature = "session-client")]
786    pub async fn connect_to(
787        &self,
788        destination: Address,
789        target: SessionTarget,
790        cfg: SessionClientConfig,
791    ) -> errors::Result<HoprSession> {
792        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
793
794        let backoff = backon::ConstantBuilder::default()
795            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
796            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
797            .with_jitter();
798
799        use backon::Retryable;
800
801        Ok((|| {
802            let cfg = cfg.clone();
803            let target = target.clone();
804            async { self.transport_api.new_session(destination, target, cfg).await }
805        })
806        .retry(backoff)
807        .sleep(backon::FuturesTimerSleeper)
808        .await?)
809    }
810
811    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
812    /// closed due to inactivity.
813    #[cfg(feature = "session-client")]
814    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
815        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
816        Ok(self.transport_api.probe_session(id).await?)
817    }
818
819    #[cfg(feature = "session-client")]
820    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
821        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
822        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
823    }
824
825    #[cfg(all(feature = "session-client", feature = "telemetry"))]
826    pub async fn get_session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
827        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
828        Ok(self.transport_api.session_stats(id).await?)
829    }
830
831    #[cfg(feature = "session-client")]
832    pub async fn update_session_surb_balancer_config(
833        &self,
834        id: &SessionId,
835        cfg: SurbBalancerConfig,
836    ) -> errors::Result<()> {
837        self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
838        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
839    }
840
841    /// Spawns a one-shot awaiter that hooks up to the [`ChainEvent`] bus and either matching the given `predicate`
842    /// successfully or timing out after `timeout`.
843    fn spawn_wait_for_on_chain_event(
844        &self,
845        context: impl std::fmt::Display,
846        predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
847        timeout: Duration,
848    ) -> errors::Result<(
849        impl Future<Output = errors::Result<ChainEvent>>,
850        hopr_async_runtime::AbortHandle,
851    )> {
852        debug!(%context, "registering wait for on-chain event");
853        let (event_stream, handle) = futures::stream::abortable(
854            self.chain_api
855                .subscribe()
856                .map_err(HoprLibError::chain)?
857                .skip_while(move |event| futures::future::ready(!predicate(event))),
858        );
859
860        let ctx = context.to_string();
861
862        Ok((
863            spawn(async move {
864                pin_mut!(event_stream);
865                let res = event_stream
866                    .next()
867                    .timeout(futures_time::time::Duration::from(timeout))
868                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
869                    .await?
870                    .ok_or(HoprLibError::GeneralError(format!(
871                        "failed to yield an on-chain event for {ctx}"
872                    )));
873                debug!(%ctx, ?res, "on-chain event waiting done");
874                res
875            })
876            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
877            .and_then(futures::future::ready),
878            handle,
879        ))
880    }
881}
882
883impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
884where
885    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
886        + hopr_api::graph::NetworkGraphUpdate
887        + Clone
888        + Send
889        + Sync
890        + 'static,
891    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
892{
893    // === telemetry
894    /// Prometheus formatted metrics collected by the hopr-lib components.
895    pub fn collect_hopr_metrics() -> errors::Result<String> {
896        cfg_if::cfg_if! {
897            if #[cfg(all(feature = "prometheus", not(test)))] {
898                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
899            } else {
900                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
901            }
902        }
903    }
904}
905
906// === Trait implementations for the high-level node API ===
907
908impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
909where
910    Chain: HoprChainApi + Clone + Send + Sync + 'static,
911    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
912    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
913        + hopr_api::graph::NetworkGraphUpdate
914        + Clone
915        + Send
916        + Sync
917        + 'static,
918    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
919{
920    fn status(&self) -> HoprState {
921        self.state.load(Ordering::Relaxed)
922    }
923}
924
925#[async_trait::async_trait]
926impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
927where
928    Chain: HoprChainApi + Clone + Send + Sync + 'static,
929    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
930    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
931        + hopr_api::graph::NetworkGraphUpdate
932        + Clone
933        + Send
934        + Sync
935        + 'static,
936    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
937{
938    type Error = HoprLibError;
939    type TransportObservable = Graph::Observed;
940
941    fn me_peer_id(&self) -> PeerId {
942        (*self.me.public()).into()
943    }
944
945    async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
946        Ok(self
947            .chain_api
948            .stream_accounts(AccountSelector {
949                public_only: true,
950                ..Default::default()
951            })
952            .map_err(HoprLibError::chain)
953            .await?
954            .map(|entry| {
955                (
956                    PeerId::from(entry.public_key),
957                    entry.chain_addr,
958                    entry.get_multiaddrs().to_vec(),
959                )
960            })
961            .collect()
962            .await)
963    }
964
965    async fn network_health(&self) -> hopr_api::network::Health {
966        self.transport_api.network_health().await
967    }
968
969    async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
970        Ok(self
971            .transport_api
972            .network_connected_peers()
973            .await?
974            .into_iter()
975            .map(PeerId::from)
976            .collect())
977    }
978
979    fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
980        let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
981        self.transport_api.network_peer_observations(&pubkey)
982    }
983
984    async fn all_network_peers(
985        &self,
986        minimum_score: f64,
987    ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
988        Ok(
989            futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
990                .filter_map(|(pubkey, info)| async move {
991                    let peer_id = PeerId::from(pubkey);
992                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
993                    Some((address, peer_id, info))
994                })
995                .collect::<Vec<_>>()
996                .await,
997        )
998    }
999
1000    fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1001        self.transport_api.local_multiaddresses()
1002    }
1003
1004    async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1005        self.transport_api.listening_multiaddresses().await
1006    }
1007
1008    async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1009        let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1010            return vec![];
1011        };
1012        self.transport_api.network_observed_multiaddresses(&pubkey).await
1013    }
1014
1015    async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1016        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1017            .await
1018            .map_err(HoprLibError::TransportError)?;
1019
1020        match self
1021            .chain_api
1022            .stream_accounts(AccountSelector {
1023                public_only: false,
1024                offchain_key: Some(pubkey),
1025                ..Default::default()
1026            })
1027            .map_err(HoprLibError::chain)
1028            .await?
1029            .next()
1030            .await
1031        {
1032            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1033            None => {
1034                error!(%peer, "no information");
1035                Ok(vec![])
1036            }
1037        }
1038    }
1039
1040    async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1041        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1042        let pubkey = hopr_transport::peer_id_to_public_key(peer)
1043            .await
1044            .map_err(HoprLibError::TransportError)?;
1045        Ok(self.transport_api.ping(&pubkey).await?)
1046    }
1047}
1048
/// Sink accepting [`TicketSelector`] values, backed by a [`futures::channel::mpsc::Sender`],
/// whose send errors are converted to [`HoprLibError`].
pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1050
1051impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1052where
1053    Chain: HoprChainApi + Clone + Send + Sync + 'static,
1054    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1055    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1056        + hopr_api::graph::NetworkGraphUpdate
1057        + Clone
1058        + Send
1059        + Sync
1060        + 'static,
1061    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1062{
1063    pub fn me_onchain(&self) -> Address {
1064        *self.chain_api.me()
1065    }
1066
1067    pub fn get_safe_config(&self) -> SafeModuleConfig {
1068        SafeModuleConfig {
1069            safe_address: self.cfg.safe_module.safe_address,
1070            module_address: self.cfg.safe_module.module_address,
1071        }
1072    }
1073
1074    pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1075        self.chain_api
1076            .balance(self.me_onchain())
1077            .await
1078            .map_err(HoprLibError::chain)
1079    }
1080
1081    pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1082        self.chain_api
1083            .balance(self.cfg.safe_module.safe_address)
1084            .await
1085            .map_err(HoprLibError::chain)
1086    }
1087
1088    pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1089        self.chain_api
1090            .safe_allowance(self.cfg.safe_module.safe_address)
1091            .await
1092            .map_err(HoprLibError::chain)
1093    }
1094
1095    pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1096        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1097    }
1098
1099    pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1100        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1101    }
1102
1103    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1104        self.chain_api
1105            .minimum_incoming_ticket_win_prob()
1106            .await
1107            .map_err(HoprLibError::chain)
1108    }
1109
1110    pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1111        self.chain_api
1112            .channel_closure_notice_period()
1113            .await
1114            .map_err(HoprLibError::chain)
1115    }
1116
1117    pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1118        Ok(self
1119            .chain_api
1120            .stream_accounts(AccountSelector {
1121                public_only: true,
1122                ..Default::default()
1123            })
1124            .map_err(HoprLibError::chain)
1125            .await?
1126            .collect()
1127            .await)
1128    }
1129
1130    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1131        let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1132            .await
1133            .map_err(HoprLibError::TransportError)?;
1134
1135        self.chain_api
1136            .packet_key_to_chain_key(&pubkey)
1137            .await
1138            .map_err(HoprLibError::chain)
1139    }
1140
1141    pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1142        self.chain_api
1143            .chain_key_to_packet_key(address)
1144            .await
1145            .map(|pk| pk.map(|v| v.into()))
1146            .map_err(HoprLibError::chain)
1147    }
1148
1149    pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1150        self.chain_api
1151            .channel_by_id(channel_id)
1152            .await
1153            .map_err(HoprLibError::chain)
1154    }
1155
1156    pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1157        self.chain_api
1158            .channel_by_parties(src, dest)
1159            .await
1160            .map_err(HoprLibError::chain)
1161    }
1162
1163    pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1164        Ok(self
1165            .chain_api
1166            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1167                ChannelStatusDiscriminants::Closed,
1168                ChannelStatusDiscriminants::Open,
1169                ChannelStatusDiscriminants::PendingToClose,
1170            ]))
1171            .map_err(HoprLibError::chain)
1172            .await?
1173            .collect()
1174            .await)
1175    }
1176
1177    pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1178        Ok(self
1179            .chain_api
1180            .stream_channels(
1181                ChannelSelector::default()
1182                    .with_destination(*dest)
1183                    .with_allowed_states(&[
1184                        ChannelStatusDiscriminants::Closed,
1185                        ChannelStatusDiscriminants::Open,
1186                        ChannelStatusDiscriminants::PendingToClose,
1187                    ]),
1188            )
1189            .map_err(HoprLibError::chain)
1190            .await?
1191            .collect()
1192            .await)
1193    }
1194
1195    pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1196        Ok(self
1197            .chain_api
1198            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1199                ChannelStatusDiscriminants::Closed,
1200                ChannelStatusDiscriminants::Open,
1201                ChannelStatusDiscriminants::PendingToClose,
1202            ]))
1203            .map_err(HoprLibError::chain)
1204            .await?
1205            .collect()
1206            .await)
1207    }
1208
1209    pub async fn open_channel(
1210        &self,
1211        destination: &Address,
1212        amount: HoprBalance,
1213    ) -> Result<OpenChannelResult, HoprLibError> {
1214        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1215
1216        let channel_id = generate_channel_id(&self.me_onchain(), destination);
1217
1218        let confirm_awaiter = self
1219            .chain_api
1220            .open_channel(destination, amount)
1221            .await
1222            .map_err(HoprLibError::chain)?;
1223
1224        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1225            format!("open channel to {destination} ({channel_id})"),
1226            move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1227            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1228        )?;
1229
1230        let tx_hash = confirm_awaiter.await.map_err(|e| {
1231            event_abort.abort();
1232            HoprLibError::chain(e)
1233        })?;
1234
1235        let event = event_awaiter.await?;
1236        debug!(%event, "open channel event received");
1237
1238        Ok(OpenChannelResult { tx_hash, channel_id })
1239    }
1240
1241    pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1242        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1243
1244        let channel_id = *channel_id;
1245
1246        let confirm_awaiter = self
1247            .chain_api
1248            .fund_channel(&channel_id, amount)
1249            .await
1250            .map_err(HoprLibError::chain)?;
1251
1252        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1253            format!("fund channel {channel_id}"),
1254            move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1255            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1256        )?;
1257
1258        let res = confirm_awaiter.await.map_err(|e| {
1259            event_abort.abort();
1260            HoprLibError::chain(e)
1261        })?;
1262
1263        let event = event_awaiter.await?;
1264        debug!(%event, "fund channel event received");
1265
1266        Ok(res)
1267    }
1268
1269    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1270        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1271
1272        let channel_id = *channel_id;
1273
1274        let confirm_awaiter = self
1275            .chain_api
1276            .close_channel(&channel_id)
1277            .await
1278            .map_err(HoprLibError::chain)?;
1279
1280        let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1281            format!("close channel {channel_id}"),
1282            move |event| {
1283                matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1284                    || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1285            },
1286            ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1287        )?;
1288
1289        let tx_hash = confirm_awaiter.await.map_err(|e| {
1290            event_abort.abort();
1291            HoprLibError::chain(e)
1292        })?;
1293
1294        let event = event_awaiter.await?;
1295        debug!(%event, "close channel event received");
1296
1297        Ok(CloseChannelResult { tx_hash })
1298    }
1299
1300    pub async fn tickets_in_channel(
1301        &self,
1302        channel_id: &ChannelId,
1303    ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1304        if let Some(channel) = self
1305            .chain_api
1306            .channel_by_id(channel_id)
1307            .await
1308            .map_err(|e| HoprTransportError::Other(e.into()))?
1309        {
1310            if &channel.destination == self.chain_api.me() {
1311                Ok(Some(
1312                    self.node_db
1313                        .stream_tickets([&channel])
1314                        .await
1315                        .map_err(HoprLibError::db)?
1316                        .collect()
1317                        .await,
1318                ))
1319            } else {
1320                Ok(None)
1321            }
1322        } else {
1323            Ok(None)
1324        }
1325    }
1326
1327    pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1328        Ok(self
1329            .node_db
1330            .stream_tickets(None::<TicketSelector>)
1331            .await
1332            .map_err(HoprLibError::db)?
1333            .map(|v| v.ticket)
1334            .collect()
1335            .await)
1336    }
1337
1338    pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1339        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1340    }
1341
1342    pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1343        self.node_db
1344            .reset_ticket_statistics()
1345            .await
1346            .map_err(HoprLibError::chain)
1347    }
1348
1349    pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1350        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1351
1352        let min_value = min_value.into();
1353
1354        self.chain_api
1355            .stream_channels(
1356                ChannelSelector::default()
1357                    .with_destination(self.me_onchain())
1358                    .with_allowed_states(&[
1359                        ChannelStatusDiscriminants::Open,
1360                        ChannelStatusDiscriminants::PendingToClose,
1361                    ]),
1362            )
1363            .map_err(HoprLibError::chain)
1364            .await?
1365            .map(|channel| {
1366                Ok(TicketSelector::from(&channel)
1367                    .with_amount(min_value..)
1368                    .with_index_range(channel.ticket_index..)
1369                    .with_state(AcknowledgedTicketStatus::Untouched))
1370            })
1371            .forward(self.redemption_requests()?)
1372            .await?;
1373
1374        Ok(())
1375    }
1376
1377    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1378        &self,
1379        counterparty: &Address,
1380        min_value: B,
1381    ) -> Result<(), HoprLibError> {
1382        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1383            .await
1384    }
1385
1386    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1387        &self,
1388        channel_id: &Hash,
1389        min_value: B,
1390    ) -> Result<(), HoprLibError> {
1391        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1392
1393        let channel = self
1394            .chain_api
1395            .channel_by_id(channel_id)
1396            .await
1397            .map_err(HoprLibError::chain)?
1398            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1399
1400        self.redemption_requests()?
1401            .send(
1402                TicketSelector::from(channel)
1403                    .with_amount(min_value.into()..)
1404                    .with_index_range(channel.ticket_index..)
1405                    .with_state(AcknowledgedTicketStatus::Untouched),
1406            )
1407            .await?;
1408
1409        Ok(())
1410    }
1411
1412    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1413        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1414
1415        self.redemption_requests()?
1416            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1417            .await?;
1418
1419        Ok(())
1420    }
1421
1422    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
1423        self.winning_ticket_subscribers.1.activate_cloned()
1424    }
1425
1426    pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1427        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1428
1429        Ok(self
1430            .redeem_requests
1431            .get()
1432            .cloned()
1433            .expect("redeem_requests is not initialized")
1434            .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1435    }
1436
1437    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1438        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1439
1440        self.chain_api
1441            .withdraw(amount, &recipient)
1442            .and_then(identity)
1443            .map_err(HoprLibError::chain)
1444            .await
1445    }
1446
1447    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1448        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1449
1450        self.chain_api
1451            .withdraw(amount, &recipient)
1452            .and_then(identity)
1453            .map_err(HoprLibError::chain)
1454            .await
1455    }
1456}