hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API, key management...
9//!
//! The intended way to use hopr_lib is for a specific tool to be built on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25
26/// Utility module with helper types and functionality over hopr-lib behavior.
27pub mod utils;
28
29/// Public traits for interactions with this library.
30pub mod traits;
31
32/// Functionality related to the HOPR node state.
33pub mod state;
34
/// Testing support module; compiled only with the `testing` feature enabled or in test builds.
#[cfg(any(feature = "testing", test))]
pub mod testing;
37
38/// Re-exports of libraries necessary for API and interface operations.
39#[doc(hidden)]
40pub mod exports {
41    pub use hopr_api as api;
42    pub mod types {
43        pub use hopr_internal_types as internal;
44        pub use hopr_primitive_types as primitive;
45    }
46    pub mod crypto {
47        pub use hopr_crypto_keypair as keypair;
48        pub use hopr_crypto_types as types;
49    }
50
51    pub mod network {
52        pub use hopr_network_types as types;
53    }
54
55    pub use hopr_transport as transport;
56}
57
58/// Export of relevant types for easier integration.
59#[doc(hidden)]
60pub mod prelude {
61    pub use super::exports::{
62        crypto::{
63            keypair::key_pair::HoprKeys,
64            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
65        },
66        network::types::{
67            prelude::ForeignDataMode,
68            udp::{ConnectedUdpStream, UdpStreamParallelism},
69        },
70        transport::{OffchainPublicKey, socket::HoprSocket},
71        types::primitive::prelude::Address,
72    };
73}
74
75use std::{
76    convert::identity,
77    num::NonZeroUsize,
78    sync::{Arc, OnceLock, atomic::Ordering},
79    time::Duration,
80};
81
82use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt, channel::mpsc::channel};
83use hopr_api::{
84    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85    db::{HoprNodeDbApi, PeerStatus, TicketMarker, TicketSelector},
86};
87use hopr_async_runtime::prelude::spawn;
88pub use hopr_async_runtime::{Abortable, AbortableList};
89pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
90pub use hopr_crypto_types::prelude::*;
91pub use hopr_internal_types::prelude::*;
92pub use hopr_network_types::prelude::*;
93#[cfg(all(feature = "prometheus", not(test)))]
94use hopr_platform::time::native::current_time;
95pub use hopr_primitive_types::prelude::*;
96#[cfg(feature = "runtime-tokio")]
97pub use hopr_transport::transfer_session;
98pub use hopr_transport::*;
99use tracing::{debug, error, info, trace, warn};
100
101pub use crate::{
102    config::SafeModule,
103    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
104    errors::{HoprLibError, HoprStatusError},
105    state::{HoprLibProcess, HoprState},
106    traits::chain::{CloseChannelResult, OpenChannelResult},
107};
108
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Process start timestamp; set once in `Hopr::new`.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started",
    )
    .unwrap();

    // Library version, labelled by the application version string; set once in `Hopr::new`.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"],
    )
    .unwrap();

    // Node identity labels; set in `Hopr::run` once the node reaches the running state.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"],
    )
    .unwrap();
}
126
127pub struct DummyCoverTrafficType {
128    #[allow(dead_code)]
129    _unconstructable: (),
130}
131
132impl TrafficGeneration for DummyCoverTrafficType {
133    fn build(
134        self,
135    ) -> (
136        impl futures::Stream<Item = DestinationRouting> + Send,
137        impl futures::Sink<
138            std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
139            Error = impl std::error::Error,
140        > + Send
141        + Sync
142        + Clone
143        + 'static,
144    ) {
145        (
146            futures::stream::empty(),
147            futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
148        )
149    }
150}
151
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// Two thread pools are set up: the CPU-bound pool (initialized through
/// `hopr_parallelize::cpu::init_thread_pool`) and the tokio worker pool used for IO-bound tasks.
///
/// When a thread count is not given explicitly, the respective pool defaults to half of the
/// available CPU parallelism, since half of the available threads are to be used for IO-bound
/// and half for CPU-bound tasks. Each pool always gets at least one thread.
///
/// The stack size of the tokio worker threads is read from the `HOPRD_THREAD_STACK_SIZE`
/// environment variable (in bytes). It defaults to 10 MiB when unset or unparsable and is
/// never set lower than 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count was neither given nor could be derived from the
/// available parallelism, if the CPU thread pool initialization fails, or if the tokio
/// runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // May be 0 on a single-core machine; clamped to at least 1 below.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error is built lazily (`ok_or_else`), so nothing is allocated on the success path.
    let cpu_threads = num_cpu_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // The CPU pool is initialized before the IO thread count is resolved, as in the original order.
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
197
/// Type alias used to send and receive transport data via a running HOPR node.
///
/// Incoming [`ApplicationDataIn`] items are read from the receiver half, while outgoing data
/// is written to the sender half as `(DestinationRouting, ApplicationDataOut)` pairs.
/// An instance of this type is returned by `Hopr::run`.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
203
/// HOPR main object providing the entire HOPR node functionality
///
/// Instantiating this object creates all processes and objects necessary for
/// running the HOPR node. Once created, the node can be started using the
/// `run()` method.
///
/// Externally offered API should be enough to perform all necessary tasks
/// with the HOPR node manually, but it is advised to create such a configuration
/// that manual interaction is unnecessary.
///
/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
pub struct Hopr<Chain, Db> {
    // Off-chain keypair of this node; also the source of the node's PeerId.
    me: OffchainKeypair,
    // Configuration the node was created with.
    cfg: config::HoprLibConfig,
    // Current lifecycle state of the node, stored atomically.
    state: Arc<state::AtomicHoprState>,
    // Transport layer handle, constructed in `new` and started in `run`.
    transport_api: HoprTransport<Db, Chain>,
    // Sender side of the ticket redemption request queue; set once in `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    // Handle to the node database.
    node_db: Db,
    // Handle to the on-chain API.
    chain_api: Chain,
    // Abort handles of the background processes started by `run`; set once at the end of `run`
    // and aborted in `shutdown`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
225
226impl<Chain, Db> Hopr<Chain, Db>
227where
228    Chain: HoprChainApi + Clone + Send + Sync + 'static,
229    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
230{
231    pub async fn new(
232        cfg: config::HoprLibConfig,
233        hopr_chain_api: Chain,
234        hopr_node_db: Db,
235        me: &OffchainKeypair,
236        me_onchain: &ChainKeypair,
237    ) -> errors::Result<Self> {
238        if hopr_crypto_random::is_rng_fixed() {
239            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
240        }
241
242        let multiaddress: Multiaddr = (&cfg.host).try_into().map_err(HoprLibError::TransportError)?;
243
244        let my_multiaddresses = vec![multiaddress];
245
246        let hopr_transport_api = HoprTransport::new(
247            me,
248            me_onchain,
249            HoprTransportConfig {
250                transport: cfg.transport.clone(),
251                network: cfg.network_options.clone(),
252                protocol: cfg.protocol,
253                probe: cfg.probe,
254                session: cfg.session,
255            },
256            hopr_node_db.clone(),
257            hopr_chain_api.clone(),
258            my_multiaddresses,
259        );
260
261        #[cfg(all(feature = "prometheus", not(test)))]
262        {
263            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
264            METRIC_HOPR_LIB_VERSION.set(
265                &[const_format::formatcp!("{}", constants::APP_VERSION)],
266                const_format::formatcp!(
267                    "{}.{}",
268                    env!("CARGO_PKG_VERSION_MAJOR"),
269                    env!("CARGO_PKG_VERSION_MINOR")
270                )
271                .parse()
272                .unwrap_or(0.0),
273            );
274
275            // Calling get_ticket_statistics will initialize the respective metrics on tickets
276            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
277                error!(%error, "failed to initialize ticket statistics metrics");
278            }
279        }
280
281        Ok(Self {
282            me: me.clone(),
283            cfg,
284            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
285            transport_api: hopr_transport_api,
286            chain_api: hopr_chain_api,
287            node_db: hopr_node_db,
288            redeem_requests: OnceLock::new(),
289            processes: OnceLock::new(),
290        })
291    }
292
293    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
294        if self.status() == state {
295            Ok(())
296        } else {
297            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
298        }
299    }
300
    /// Returns the current lifecycle state of the node.
    ///
    /// The state is read using relaxed atomic ordering.
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
304
305    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
306        self.chain_api
307            .get_balance(self.me_onchain())
308            .await
309            .map_err(HoprLibError::chain)
310    }
311
312    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
313        self.chain_api
314            .get_balance(self.cfg.safe_module.safe_address)
315            .await
316            .map_err(HoprLibError::chain)
317    }
318
319    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
320        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
321    }
322
    /// Returns a copy of the Safe and module configuration (`safe_module`) of this node.
    pub fn get_safe_config(&self) -> SafeModule {
        self.cfg.safe_module.clone()
    }
326
    /// Returns a reference to the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
330
    /// Indicates whether the node is configured as public (`publish` configuration flag).
    ///
    /// Only public nodes announce their multiaddresses when started.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
335
336    pub async fn run<
337        Ct,
338        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
339    >(
340        &self,
341        cover_traffic: Option<Ct>,
342        #[cfg(feature = "session-server")] serve_handler: T,
343    ) -> errors::Result<HoprTransportIO>
344    where
345        Ct: TrafficGeneration + Send + Sync + 'static,
346    {
347        self.error_if_not_in_state(
348            HoprState::Uninitialized,
349            "cannot start the hopr node multiple times".into(),
350        )?;
351
352        #[cfg(feature = "testing")]
353        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
354
355        info!(
356            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
357            "node is not started, please fund this node",
358        );
359
360        helpers::wait_for_funds(
361            *MIN_NATIVE_BALANCE,
362            *SUGGESTED_NATIVE_BALANCE,
363            Duration::from_secs(200),
364            self.me_onchain(),
365            &self.chain_api,
366        )
367        .await?;
368
369        let mut processes = AbortableList::<HoprLibProcess>::default();
370
371        info!("starting HOPR node...");
372        self.state.store(HoprState::Initializing, Ordering::Relaxed);
373
374        let balance: XDaiBalance = self.get_balance().await?;
375        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
376
377        info!(
378            address = %self.me_onchain(),
379            %balance,
380            %minimum_balance,
381            "node information"
382        );
383
384        if balance.le(&minimum_balance) {
385            return Err(HoprLibError::GeneralError(
386                "cannot start the node without a sufficiently funded wallet".into(),
387            ));
388        }
389
390        // Once we are able to query the chain,
391        // check if the ticket price is configured correctly.
392        let network_min_ticket_price = self
393            .chain_api
394            .minimum_ticket_price()
395            .await
396            .map_err(HoprLibError::chain)?;
397        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
398        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
399            return Err(HoprLibError::GeneralError(format!(
400                "configured outgoing ticket price is lower than the network minimum ticket price: \
401                 {configured_ticket_price:?} < {network_min_ticket_price}"
402            )));
403        }
404        // Once we are able to query the chain,
405        // check if the winning probability is configured correctly.
406        let network_min_win_prob = self
407            .chain_api
408            .minimum_incoming_ticket_win_prob()
409            .await
410            .map_err(HoprLibError::chain)?;
411        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
412        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
413            && configured_win_prob
414                .and_then(|c| WinningProbability::try_from(c).ok())
415                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
416        {
417            return Err(HoprLibError::GeneralError(format!(
418                "configured outgoing ticket winning probability is lower than the network minimum winning \
419                 probability: {configured_win_prob:?} < {network_min_win_prob}"
420            )));
421        }
422
423        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
424        // plus 100 as an additional buffer
425        let minimum_capacity = self
426            .chain_api
427            .count_accounts(AccountSelector {
428                public_only: true,
429                ..Default::default()
430            })
431            .await
432            .map_err(HoprLibError::chain)?
433            .saturating_mul(2)
434            .saturating_add(100);
435
436        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
437            .ok()
438            .and_then(|s| s.trim().parse::<usize>().ok())
439            .filter(|&c| c > 0)
440            .unwrap_or(2048)
441            .max(minimum_capacity);
442
443        debug!(
444            capacity = chain_discovery_events_capacity,
445            minimum_required = minimum_capacity,
446            "creating chain discovery events channel"
447        );
448        let (indexer_peer_update_tx, indexer_peer_update_rx) =
449            channel::<PeerDiscovery>(chain_discovery_events_capacity);
450
451        // Stream all the existing announcements and also subscribe to all future on-chain
452        // announcements
453        let (announcements_stream, announcements_handle) = futures::stream::abortable(
454            self.chain_api
455                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
456                .map_err(HoprLibError::chain)?,
457        );
458        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
459
460        spawn(
461            announcements_stream
462                .filter_map(|event| {
463                    futures::future::ready(event.try_as_announcement().map(|account| {
464                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
465                    }))
466                })
467                .map(Ok)
468                .forward(indexer_peer_update_tx)
469                .inspect(
470                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
471                ),
472        );
473
474        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
475
476        let safe_addr = self.cfg.safe_module.safe_address;
477
478        if self.me_onchain() == safe_addr {
479            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
480        }
481
482        info!(%safe_addr, "registering safe with this node");
483        match self.chain_api.register_safe(&safe_addr).await {
484            Ok(awaiter) => {
485                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
486                awaiter.await.map_err(HoprLibError::chain)?;
487                info!(%safe_addr, "safe successfully registered with this node");
488            }
489            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
490                info!(%safe_addr, "this safe is already registered with this node");
491            }
492            Err(error) => {
493                error!(%safe_addr, %error, "safe registration failed");
494                return Err(HoprLibError::chain(error));
495            }
496        }
497
498        // Only public nodes announce multiaddresses
499        let multiaddresses_to_announce = if self.is_public() {
500            // The multiaddresses are filtered for the non-private ones,
501            // unless `announce_local_addresses` is set to `true`.
502            self.transport_api.announceable_multiaddresses()
503        } else {
504            Vec::with_capacity(0)
505        };
506
507        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
508        multiaddresses_to_announce
509            .iter()
510            .filter(|a| !is_public_address(a))
511            .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
512
513        // At this point the node is already registered with Safe, so
514        // we can announce via Safe-compliant TX
515        info!(?multiaddresses_to_announce, "announcing node on chain");
516        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
517            Ok(awaiter) => {
518                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
519                awaiter.await.map_err(HoprLibError::chain)?;
520                info!(?multiaddresses_to_announce, "node has been successfully announced");
521            }
522            Err(AnnouncementError::AlreadyAnnounced) => {
523                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
524            }
525            Err(error) => {
526                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
527                return Err(HoprLibError::chain(error));
528            }
529        }
530
531        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
532            .ok()
533            .and_then(|s| s.trim().parse::<usize>().ok())
534            .filter(|&c| c > 0)
535            .unwrap_or(256);
536
537        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
538        #[cfg(feature = "session-server")]
539        {
540            debug!(capacity = incoming_session_channel_capacity, "creating session server");
541            processes.insert(
542                HoprLibProcess::SessionServer,
543                hopr_async_runtime::spawn_as_abortable!(
544                    _session_rx
545                        .for_each_concurrent(None, move |session| {
546                            let serve_handler = serve_handler.clone();
547                            async move {
548                                let session_id = *session.session.id();
549                                match serve_handler.process(session).await {
550                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
551                                    Err(error) => error!(
552                                        ?session_id,
553                                        %error,
554                                        "client session processing failed"
555                                    ),
556                                }
557                            }
558                        })
559                        .inspect(|_| tracing::warn!(
560                            task = %HoprLibProcess::SessionServer,
561                            "long-running background task finished"
562                        ))
563                ),
564            );
565        }
566
567        info!("starting transport");
568
569        let (hopr_socket, transport_processes) = self
570            .transport_api
571            .run(cover_traffic, indexer_peer_update_rx, session_tx)
572            .await?;
573        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
574
575        info!("starting outgoing ticket flush process");
576        let (index_flush_stream, index_flush_handle) =
577            futures::stream::abortable(futures_time::stream::interval(Duration::from_secs(5).into()));
578        processes.insert(HoprLibProcess::TicketIndexFlush, index_flush_handle);
579        let node_db = self.node_db.clone();
580        spawn(
581            index_flush_stream
582                .for_each(move |_| {
583                    let node_db = node_db.clone();
584                    async move {
585                        match node_db.persist_outgoing_ticket_indices().await {
586                            Ok(count) => trace!(count, "successfully flushed states of outgoing ticket indices"),
587                            Err(error) => error!(%error, "Failed to flush ticket indices"),
588                        }
589                    }
590                })
591                .inspect(|_| {
592                    tracing::warn!(
593                        task = %HoprLibProcess::TicketIndexFlush,
594                        "long-running background task finished"
595                    )
596                }),
597        );
598
599        // Start a queue that takes care of redeeming tickets via given TicketSelectors
600        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
601        let _ = self.redeem_requests.set(redemption_req_tx);
602        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
603        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
604        let chain = self.chain_api.clone();
605        let node_db = self.node_db.clone();
606        spawn(redemption_req_rx
607            .for_each(move |selector| {
608                let chain = chain.clone();
609                let db = node_db.clone();
610                async move {
611                    match chain.redeem_tickets_via_selector(&db, selector).await {
612                        Ok(res) => debug!(%res, "redemption complete"),
613                        Err(error) => error!(%error, "redemption failed"),
614                    }
615                }
616            })
617            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
618        );
619
620        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
621        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
622        let chain = self.chain_api.clone();
623        let node_db = self.node_db.clone();
624        let events = chain.subscribe().map_err(HoprLibError::chain)?;
625        spawn(
626            futures::stream::Abortable::new(
627                events
628                    .filter_map(move |event|
629                        futures::future::ready(
630                            event
631                                .try_as_channel_closed()
632                                .filter(|channel| channel.direction(chain.me()) == Some(ChannelDirection::Incoming))
633                        )
634                    ),
635                chain_events_sub_reg
636            )
637            .for_each(move |closed_channel| {
638                let node_db = node_db.clone();
639                async move {
640                    match node_db.mark_tickets_as(closed_channel.into(), TicketMarker::Neglected).await {
641                        Ok(num_neglected) if num_neglected > 0 => {
642                            warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
643                        },
644                        Ok(_) => {
645                            debug!(%closed_channel, "no neglected tickets on incoming closed channel");
646                        },
647                        Err(error) => {
648                            error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
649                        }
650                    }
651                }
652            })
653            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
654        );
655
656        // NOTE: after the chain is synced, we can reset tickets which are considered
657        // redeemed but on-chain state does not align with that. This implies there was a problem
658        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
659        // handle errors appropriately.
660        let mut channels = self
661            .chain_api
662            .stream_channels(ChannelSelector {
663                destination: self.me_onchain().into(),
664                ..Default::default()
665            })
666            .map_err(HoprLibError::chain)
667            .await?;
668
669        while let Some(channel) = channels.next().await {
670            self.node_db
671                .update_ticket_states_and_fetch(
672                    TicketSelector::from(&channel)
673                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
674                        .with_index_range(channel.ticket_index.as_u64()..),
675                    AcknowledgedTicketStatus::Untouched,
676                )
677                .map_err(HoprLibError::db)
678                .await?
679                .for_each(|ticket| {
680                    info!(%ticket, "fixed next out-of-sync ticket");
681                    futures::future::ready(())
682                })
683                .await;
684        }
685
686        self.state.store(HoprState::Running, Ordering::Relaxed);
687
688        info!(
689            id = %self.me_peer_id(),
690            version = constants::APP_VERSION,
691            "NODE STARTED AND RUNNING"
692        );
693
694        #[cfg(all(feature = "prometheus", not(test)))]
695        METRIC_HOPR_NODE_INFO.set(
696            &[
697                &self.me.public().to_peerid_str(),
698                &self.me_onchain().to_string(),
699                &self.cfg.safe_module.safe_address.to_string(),
700                &self.cfg.safe_module.module_address.to_string(),
701            ],
702            1.0,
703        );
704
705        let _ = self.processes.set(processes);
706        Ok(hopr_socket)
707    }
708
709    /// Used to practically shut down all node's processes without dropping the instance.
710    ///
711    /// This means that the instance can be used to retrieve some information, but all
712    /// active operations will stop and new will be impossible to perform.
713    /// Such operations will return [`HoprStatusError::NotThereYet`].
714    ///
715    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
716    pub fn shutdown(&self) -> Result<(), HoprLibError> {
717        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
718        if let Some(processes) = self.processes.get() {
719            processes.abort_all();
720        }
721        self.state.store(HoprState::Terminated, Ordering::Relaxed);
722        info!("NODE SHUTDOWN COMPLETE");
723        Ok(())
724    }
725
726    // p2p transport =========
    /// Own PeerId used in the libp2p transport layer
    ///
    /// The PeerId is converted from the public part of the node's off-chain keypair.
    pub fn me_peer_id(&self) -> PeerId {
        (*self.me.public()).into()
    }
731
732    /// Get the list of all announced public nodes in the network
733    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
734        Ok(self
735            .chain_api
736            .stream_accounts(AccountSelector {
737                public_only: true,
738                ..Default::default()
739            })
740            .map_err(HoprLibError::chain)
741            .await?
742            .map(|entry| {
743                (
744                    PeerId::from(entry.public_key),
745                    entry.chain_addr,
746                    entry.get_multiaddrs().to_vec(),
747                )
748            })
749            .collect()
750            .await)
751    }
752
753    /// Ping another node in the network based on the PeerId
754    ///
755    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
756    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
757        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
758
759        Ok(self.transport_api.ping(peer).await?)
760    }
761
/// Creates a client session towards `destination` and returns the session object.
///
/// The returned session implements [`futures::io::AsyncRead`] and
/// [`futures::io::AsyncWrite`] and can be used as a read/write binary session.
///
/// Session establishment is retried with a constant, jittered back-off that is
/// configured from `cfg.session.establish_max_retries` (maximum number of retries)
/// and `cfg.session.establish_retry_timeout` (delay between attempts).
///
/// # Errors
///
/// Returns early with the error from `error_if_not_in_state` unless the node is in
/// [`HoprState::Running`]. If the retries do not yield a session, the resulting
/// error is converted by `?`.
#[cfg(feature = "session-client")]
pub async fn connect_to(
    &self,
    destination: Address,
    target: SessionTarget,
    cfg: SessionClientConfig,
) -> errors::Result<HoprSession> {
    use backon::Retryable;

    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let retry_policy = backon::ConstantBuilder::default()
        .with_max_times(self.cfg.session.establish_max_retries as usize)
        .with_delay(self.cfg.session.establish_retry_timeout)
        .with_jitter();

    // Each attempt hands its own clones of `cfg` and `target` to the transport,
    // so the closure can be invoked repeatedly.
    let establish = || {
        let cfg = cfg.clone();
        let target = target.clone();
        async { self.transport_api.new_session(destination, target, cfg).await }
    };

    let session = establish
        .retry(retry_policy)
        .sleep(backon::FuturesTimerSleeper)
        .await?;

    Ok(session)
}
789
/// Sends a keep-alive to the session identified by the given [`SessionId`], making
/// sure the session is not closed due to inactivity.
///
/// The keep-alive is performed by probing the session through the transport API.
///
/// # Errors
///
/// Returns early with the error from `error_if_not_in_state` unless the node is in
/// [`HoprState::Running`]. Errors of the session probe are converted by `?`.
#[cfg(feature = "session-client")]
pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let probed = self.transport_api.probe_session(id).await?;
    Ok(probed)
}
797
/// Fetches the [`SurbBalancerConfig`] that the transport API reports for the session
/// `id`; the `Option` returned by the transport is passed through unchanged.
///
/// # Errors
///
/// Returns early with the error from `error_if_not_in_state` unless the node is in
/// [`HoprState::Running`]. Errors of the transport lookup are converted by `?`.
#[cfg(feature = "session-client")]
pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let balancer_cfg = self.transport_api.session_surb_balancing_cfg(id).await?;
    Ok(balancer_cfg)
}
803
/// Hands a new [`SurbBalancerConfig`] for the session `id` over to the transport API.
///
/// # Errors
///
/// Returns early with the error from `error_if_not_in_state` unless the node is in
/// [`HoprState::Running`]. Errors of the transport update are converted by `?`.
#[cfg(feature = "session-client")]
pub async fn update_session_surb_balancer_config(
    &self,
    id: &SessionId,
    cfg: SurbBalancerConfig,
) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let updated = self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?;
    Ok(updated)
}
813
814    /// List all multiaddresses announced by this node
815    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
816        self.transport_api.local_multiaddresses()
817    }
818
819    /// List all multiaddresses on which the node is listening
820    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
821        self.transport_api.listening_multiaddresses().await
822    }
823
824    /// List all multiaddresses observed for a PeerId
825    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
826        self.transport_api.network_observed_multiaddresses(peer).await
827    }
828
829    /// List all multiaddresses announced on-chain for the given node.
830    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
831        let peer = *peer;
832        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
833        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
834
835        match self
836            .chain_api
837            .stream_accounts(AccountSelector {
838                public_only: false,
839                offchain_key: Some(pubkey),
840                ..Default::default()
841            })
842            .map_err(HoprLibError::chain)
843            .await?
844            .next()
845            .await
846        {
847            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
848            None => {
849                error!(%peer, "no information");
850                Ok(vec![])
851            }
852        }
853    }
854
855    // Network =========
856
857    /// Get measured network health
858    pub async fn network_health(&self) -> Health {
859        self.transport_api.network_health().await
860    }
861
862    /// List all peers connected to this
863    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
864        Ok(self.transport_api.network_connected_peers().await?)
865    }
866
867    /// Get all data collected from the network relevant for a PeerId
868    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
869        Ok(self.transport_api.network_peer_info(peer).await?)
870    }
871
872    /// Get peers connected peers with quality higher than some value
873    pub async fn all_network_peers(
874        &self,
875        minimum_quality: f64,
876    ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
877        Ok(
878            futures::stream::iter(self.transport_api.network_connected_peers().await?)
879                .filter_map(|peer| async move {
880                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
881                        if info.get_average_quality() >= minimum_quality {
882                            Some((peer, info))
883                        } else {
884                            None
885                        }
886                    } else {
887                        None
888                    }
889                })
890                .filter_map(|(peer_id, info)| async move {
891                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
892                    Some((address, peer_id, info))
893                })
894                .collect::<Vec<_>>()
895                .await,
896        )
897    }
898
899    // Ticket ========
900    /// Get all tickets in a channel specified by [`prelude::Hash`]
901    pub async fn tickets_in_channel(&self, channel: &prelude::Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
902        Ok(self.transport_api.tickets_in_channel(channel).await?)
903    }
904
905    /// Get all tickets
906    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
907        Ok(self.transport_api.all_tickets().await?)
908    }
909
910    /// Get statistics for all tickets
911    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
912        Ok(self.transport_api.ticket_statistics().await?)
913    }
914
915    /// Reset the ticket metrics to zero
916    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
917        self.node_db
918            .reset_ticket_statistics()
919            .await
920            .map_err(HoprLibError::chain)
921    }
922
923    // Chain =========
924    pub fn me_onchain(&self) -> Address {
925        *self.chain_api.me()
926    }
927
928    /// Get ticket price
929    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
930        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
931    }
932
933    /// Get minimum incoming ticket winning probability
934    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
935        self.chain_api
936            .minimum_incoming_ticket_win_prob()
937            .await
938            .map_err(HoprLibError::chain)
939    }
940
941    /// List of all accounts announced on the chain
942    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
943        Ok(self
944            .chain_api
945            .stream_accounts(AccountSelector {
946                public_only: true,
947                ..Default::default()
948            })
949            .map_err(HoprLibError::chain)
950            .await?
951            .collect()
952            .await)
953    }
954
955    /// Get the channel entry from Hash.
956    /// @returns the channel entry of those two nodes
957    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
958        self.chain_api
959            .channel_by_id(channel_id)
960            .await
961            .map_err(HoprLibError::chain)
962    }
963
964    /// Get the channel entry between source and destination node.
965    /// @param src Address
966    /// @param dest Address
967    /// @returns the channel entry of those two nodes
968    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
969        self.chain_api
970            .channel_by_parties(src, dest)
971            .await
972            .map_err(HoprLibError::chain)
973    }
974
975    /// List all channels open from a specified Address
976    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
977        Ok(self
978            .chain_api
979            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
980                ChannelStatusDiscriminants::Closed,
981                ChannelStatusDiscriminants::Open,
982                ChannelStatusDiscriminants::PendingToClose,
983            ]))
984            .map_err(HoprLibError::chain)
985            .await?
986            .collect()
987            .await)
988    }
989
990    /// List all channels open to a specified address
991    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
992        Ok(self
993            .chain_api
994            .stream_channels(
995                ChannelSelector::default()
996                    .with_destination(*dest)
997                    .with_allowed_states(&[
998                        ChannelStatusDiscriminants::Closed,
999                        ChannelStatusDiscriminants::Open,
1000                        ChannelStatusDiscriminants::PendingToClose,
1001                    ]),
1002            )
1003            .map_err(HoprLibError::chain)
1004            .await?
1005            .collect()
1006            .await)
1007    }
1008
1009    /// List all channels
1010    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1011        Ok(self
1012            .chain_api
1013            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1014                ChannelStatusDiscriminants::Closed,
1015                ChannelStatusDiscriminants::Open,
1016                ChannelStatusDiscriminants::PendingToClose,
1017            ]))
1018            .map_err(HoprLibError::chain)
1019            .await?
1020            .collect()
1021            .await)
1022    }
1023
1024    /// Current safe allowance balance
1025    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1026        self.chain_api
1027            .safe_allowance(self.cfg.safe_module.safe_address)
1028            .await
1029            .map_err(HoprLibError::chain)
1030    }
1031
1032    /// Withdraw on-chain assets to a given address
1033    /// @param recipient the account where the assets should be transferred to
1034    /// @param amount how many tokens to be transferred
1035    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1036        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1037
1038        self.chain_api
1039            .withdraw(amount, &recipient)
1040            .and_then(identity)
1041            .map_err(HoprLibError::chain)
1042            .await
1043    }
1044
1045    /// Withdraw on-chain native assets to a given address
1046    /// @param recipient the account where the assets should be transferred to
1047    /// @param amount how many tokens to be transferred
1048    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1049        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1050
1051        self.chain_api
1052            .withdraw(amount, &recipient)
1053            .and_then(identity)
1054            .map_err(HoprLibError::chain)
1055            .await
1056    }
1057
1058    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1059        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1060
1061        let (channel_id, tx_hash) = self
1062            .chain_api
1063            .open_channel(destination, amount)
1064            .and_then(identity)
1065            .map_err(HoprLibError::chain)
1066            .await?;
1067
1068        Ok(OpenChannelResult { tx_hash, channel_id })
1069    }
1070
1071    pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1072        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1073
1074        self.chain_api
1075            .fund_channel(channel_id, amount)
1076            .and_then(identity)
1077            .map_err(HoprLibError::chain)
1078            .await
1079    }
1080
1081    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1082        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1083
1084        let tx_hash = self
1085            .chain_api
1086            .close_channel(channel_id)
1087            .and_then(identity)
1088            .map_err(HoprLibError::chain)
1089            .await?;
1090
1091        Ok(CloseChannelResult { tx_hash })
1092    }
1093
1094    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1095        self.chain_api
1096            .channel_closure_notice_period()
1097            .await
1098            .map_err(HoprLibError::chain)
1099    }
1100
1101    pub fn redemption_requests(
1102        &self,
1103    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1104        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1105
1106        // TODO: add universal timeout sink here
1107        Ok(self
1108            .redeem_requests
1109            .get()
1110            .cloned()
1111            .expect("redeem_requests is not initialized")
1112            .sink_map_err(HoprLibError::other))
1113    }
1114
1115    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1116        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1117
1118        let min_value = min_value.into();
1119
1120        self.chain_api
1121            .stream_channels(
1122                ChannelSelector::default()
1123                    .with_destination(self.me_onchain())
1124                    .with_allowed_states(&[
1125                        ChannelStatusDiscriminants::Open,
1126                        ChannelStatusDiscriminants::PendingToClose,
1127                    ]),
1128            )
1129            .map_err(HoprLibError::chain)
1130            .await?
1131            .map(|channel| {
1132                Ok(TicketSelector::from(&channel)
1133                    .with_amount(min_value..)
1134                    .with_index_range(channel.ticket_index.as_u64()..)
1135                    .with_state(AcknowledgedTicketStatus::Untouched))
1136            })
1137            .forward(self.redemption_requests()?)
1138            .await?;
1139
1140        Ok(())
1141    }
1142
1143    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1144        &self,
1145        counterparty: &Address,
1146        min_value: B,
1147    ) -> errors::Result<()> {
1148        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1149            .await
1150    }
1151
1152    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1153        &self,
1154        channel_id: &Hash,
1155        min_value: B,
1156    ) -> errors::Result<()> {
1157        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1158
1159        let channel = self
1160            .chain_api
1161            .channel_by_id(channel_id)
1162            .await
1163            .map_err(HoprLibError::chain)?
1164            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1165
1166        self.redemption_requests()?
1167            .send(
1168                TicketSelector::from(channel)
1169                    .with_amount(min_value.into()..)
1170                    .with_index_range(channel.ticket_index.as_u64()..)
1171                    .with_state(AcknowledgedTicketStatus::Untouched),
1172            )
1173            .await?;
1174
1175        Ok(())
1176    }
1177
1178    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1179        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1180
1181        self.redemption_requests()?
1182            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1183            .await?;
1184
1185        Ok(())
1186    }
1187
1188    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1189        let peer_id = *peer_id;
1190        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1191        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1192            .await
1193            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1194
1195        self.chain_api
1196            .packet_key_to_chain_key(&pubkey)
1197            .await
1198            .map_err(HoprLibError::chain)
1199    }
1200
1201    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1202        self.chain_api
1203            .chain_key_to_packet_key(address)
1204            .await
1205            .map(|pk| pk.map(|v| v.into()))
1206            .map_err(HoprLibError::chain)
1207    }
1208
1209    // === telemetry
1210    /// Prometheus formatted metrics collected by the hopr-lib components.
1211    pub fn collect_hopr_metrics() -> errors::Result<String> {
1212        cfg_if::cfg_if! {
1213            if #[cfg(all(feature = "prometheus", not(test)))] {
1214                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1215            } else {
1216                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1217            }
1218        }
1219    }
1220}