hopr_lib/
lib.rs

//! HOPR library providing a unified [`Hopr`] object that can be used on its own,
//! as well as integrated into other systems and libraries.
//!
//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
//! it performs its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities: it is a
//! complete and fully featured HOPR node, stripped only of top-level functionality
//! such as the REST API, key management, etc.
//!
//! The intended way to use `hopr_lib` is to build a specific tool on top of it,
//! should the default `hoprd` implementation not be acceptable.
//!
//! For most practical use cases, the `hoprd` application should be the preferable
//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25
26/// Utility module with helper types and functionality over hopr-lib behavior.
27pub mod utils;
28
29/// Public traits for interactions with this library.
30pub mod traits;
31
32/// Functionality related to the HOPR node state.
33pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38/// Exports of libraries necessary for API and interface operations.
39#[doc(hidden)]
40pub mod exports {
41    pub use hopr_api as api;
42
43    pub mod types {
44        pub use hopr_internal_types as internal;
45        pub use hopr_primitive_types as primitive;
46    }
47
48    pub mod crypto {
49        pub use hopr_crypto_keypair as keypair;
50        pub use hopr_crypto_types as types;
51    }
52
53    pub mod network {
54        pub use hopr_network_types as types;
55    }
56
57    pub use hopr_transport as transport;
58}
59
60/// Export of relevant types for easier integration.
61#[doc(hidden)]
62pub mod prelude {
63    pub use super::exports::{
64        crypto::{
65            keypair::key_pair::HoprKeys,
66            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67        },
68        network::types::{
69            prelude::ForeignDataMode,
70            udp::{ConnectedUdpStream, UdpStreamParallelism},
71        },
72        transport::{OffchainPublicKey, socket::HoprSocket},
73        types::primitive::prelude::Address,
74    };
75}
76
77use std::{
78    convert::identity,
79    num::NonZeroUsize,
80    sync::{Arc, OnceLock, atomic::Ordering},
81    time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt, channel::mpsc::channel};
85use hopr_api::{
86    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
87    db::{HoprNodeDbApi, PeerStatus, TicketMarker, TicketSelector},
88};
89use hopr_async_runtime::prelude::spawn;
90pub use hopr_async_runtime::{Abortable, AbortableList};
91pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
92pub use hopr_crypto_types::prelude::*;
93pub use hopr_internal_types::prelude::*;
94pub use hopr_network_types::prelude::*;
95#[cfg(all(feature = "prometheus", not(test)))]
96use hopr_platform::time::native::current_time;
97pub use hopr_primitive_types::prelude::*;
98#[cfg(feature = "runtime-tokio")]
99pub use hopr_transport::transfer_session;
100pub use hopr_transport::*;
101use tracing::{debug, error, info, trace, warn};
102
103pub use crate::{
104    config::SafeModule,
105    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
106    errors::{HoprLibError, HoprStatusError},
107    state::{HoprLibProcess, HoprState},
108    traits::chain::{CloseChannelResult, OpenChannelResult},
109};
110
111#[cfg(all(feature = "prometheus", not(test)))]
112lazy_static::lazy_static! {
113    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
114        "hopr_start_time",
115        "The unix timestamp in seconds at which the process was started"
116    ).unwrap();
117    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
118        "hopr_lib_version",
119        "Executed version of hopr-lib",
120        &["version"]
121    ).unwrap();
122    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
123        "hopr_node_addresses",
124        "Node on-chain and off-chain addresses",
125        &["peerid", "address", "safe_address", "module_address"]
126    ).unwrap();
127}
128
129pub struct DummyCoverTrafficType {
130    #[allow(dead_code)]
131    _unconstructable: (),
132}
133
134impl TrafficGeneration for DummyCoverTrafficType {
135    fn build(
136        self,
137    ) -> (
138        impl futures::Stream<Item = DestinationRouting> + Send,
139        impl futures::Sink<
140            std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
141            Error = impl std::error::Error,
142        > + Send
143        + Sync
144        + Clone
145        + 'static,
146    ) {
147        (
148            futures::stream::empty(),
149            futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
150        )
151    }
152}
153
154/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
155///
156/// Divide the available CPU parallelism by 2, since half of the available threads are
157/// to be used for IO-bound and half for CPU-bound tasks.
158#[cfg(feature = "runtime-tokio")]
159pub fn prepare_tokio_runtime(
160    num_cpu_threads: Option<NonZeroUsize>,
161    num_io_threads: Option<NonZeroUsize>,
162) -> anyhow::Result<tokio::runtime::Runtime> {
163    use std::str::FromStr;
164    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
165
166    hopr_parallelize::cpu::init_thread_pool(
167        num_cpu_threads
168            .map(|v| v.get())
169            .or(avail_parallelism)
170            .ok_or(anyhow::anyhow!(
171                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
172                 environment variable."
173            ))?
174            .max(1),
175    )?;
176
177    Ok(tokio::runtime::Builder::new_multi_thread()
178        .enable_all()
179        .worker_threads(
180            num_io_threads
181                .map(|v| v.get())
182                .or(avail_parallelism)
183                .ok_or(anyhow::anyhow!(
184                    "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
185                     environment variable."
186                ))?
187                .max(1),
188        )
189        .thread_name("hoprd")
190        .thread_stack_size(
191            std::env::var("HOPRD_THREAD_STACK_SIZE")
192                .ok()
193                .and_then(|v| usize::from_str(&v).ok())
194                .unwrap_or(10 * 1024 * 1024)
195                .max(2 * 1024 * 1024),
196        )
197        .build()?)
198}
199
200/// Type alias used to send and receive transport data via a running HOPR node.
///
/// This is the socket type returned by [`Hopr::run`]: a [`socket::HoprSocket`] parameterized
/// with a channel receiver of [`ApplicationDataIn`] and a channel sender of
/// `(DestinationRouting, ApplicationDataOut)` pairs.
201pub type HoprTransportIO = socket::HoprSocket<
202    futures::channel::mpsc::Receiver<ApplicationDataIn>,
203    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
204>;
205
206/// HOPR main object providing the entire HOPR node functionality
207///
208/// Instantiating this object creates all processes and objects necessary for
209/// running the HOPR node. Once created, the node can be started using the
210/// `run()` method.
211///
212/// Externally offered API should be enough to perform all necessary tasks
213/// with the HOPR node manually, but it is advised to create such a configuration
214/// that manual interaction is unnecessary.
215///
216/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
///
/// The type is generic over the chain API implementation (`Chain`) and the node database
/// implementation (`Db`).
217pub struct Hopr<Chain, Db> {
    /// Off-chain keypair of this node; its public key yields the node's `PeerId` and the
    /// keypair is passed to the chain API when announcing the node.
218    me: OffchainKeypair,
    /// Configuration the node was constructed with.
219    cfg: config::HoprLibConfig,
    /// Current lifecycle state of the node, readable via `status()`.
220    state: Arc<state::AtomicHoprState>,
    /// Transport layer instance created in `new()` and started in `run()`.
221    transport_api: HoprTransport<Db, Chain>,
    /// Sending half of the ticket redemption queue; set once in `run()`.
222    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Handle to the node database.
223    node_db: Db,
    /// Handle to the chain API.
224    chain_api: Chain,
    /// Background processes started by `run()`; set once at the end of `run()` and
    /// aborted by `shutdown()`.
225    processes: OnceLock<AbortableList<HoprLibProcess>>,
226}
227
228impl<Chain, Db> Hopr<Chain, Db>
229where
230    Chain: HoprChainApi + Clone + Send + Sync + 'static,
231    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
232{
233    pub async fn new(
234        cfg: config::HoprLibConfig,
235        hopr_chain_api: Chain,
236        hopr_node_db: Db,
237        me: &OffchainKeypair,
238        me_onchain: &ChainKeypair,
239    ) -> errors::Result<Self> {
240        if hopr_crypto_random::is_rng_fixed() {
241            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
242        }
243
244        let multiaddress: Multiaddr = (&cfg.host).try_into().map_err(HoprLibError::TransportError)?;
245
246        let my_multiaddresses = vec![multiaddress];
247
248        let hopr_transport_api = HoprTransport::new(
249            me,
250            me_onchain,
251            HoprTransportConfig {
252                transport: cfg.transport.clone(),
253                network: cfg.network_options.clone(),
254                protocol: cfg.protocol,
255                probe: cfg.probe,
256                session: cfg.session,
257            },
258            hopr_node_db.clone(),
259            hopr_chain_api.clone(),
260            my_multiaddresses,
261        );
262
263        #[cfg(all(feature = "prometheus", not(test)))]
264        {
265            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
266            METRIC_HOPR_LIB_VERSION.set(
267                &[const_format::formatcp!("{}", constants::APP_VERSION)],
268                const_format::formatcp!(
269                    "{}.{}",
270                    env!("CARGO_PKG_VERSION_MAJOR"),
271                    env!("CARGO_PKG_VERSION_MINOR")
272                )
273                .parse()
274                .unwrap_or(0.0),
275            );
276
277            // Calling get_ticket_statistics will initialize the respective metrics on tickets
278            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
279                error!(%error, "failed to initialize ticket statistics metrics");
280            }
281        }
282
283        Ok(Self {
284            me: me.clone(),
285            cfg,
286            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
287            transport_api: hopr_transport_api,
288            chain_api: hopr_chain_api,
289            node_db: hopr_node_db,
290            redeem_requests: OnceLock::new(),
291            processes: OnceLock::new(),
292        })
293    }
294
295    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
296        if self.status() == state {
297            Ok(())
298        } else {
299            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
300        }
301    }
302
303    pub fn status(&self) -> HoprState {
304        self.state.load(Ordering::Relaxed)
305    }
306
307    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
308        self.chain_api
309            .get_balance(self.me_onchain())
310            .await
311            .map_err(HoprLibError::chain)
312    }
313
314    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
315        self.chain_api
316            .get_balance(self.cfg.safe_module.safe_address)
317            .await
318            .map_err(HoprLibError::chain)
319    }
320
321    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
322        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
323    }
324
325    pub fn get_safe_config(&self) -> SafeModule {
326        self.cfg.safe_module.clone()
327    }
328
329    pub fn config(&self) -> &config::HoprLibConfig {
330        &self.cfg
331    }
332
333    #[inline]
334    fn is_public(&self) -> bool {
335        self.cfg.publish
336    }
337
338    pub async fn run<
339        Ct,
340        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
341    >(
342        &self,
343        cover_traffic: Option<Ct>,
344        #[cfg(feature = "session-server")] serve_handler: T,
345    ) -> errors::Result<HoprTransportIO>
346    where
347        Ct: TrafficGeneration + Send + Sync + 'static,
348    {
349        self.error_if_not_in_state(
350            HoprState::Uninitialized,
351            "cannot start the hopr node multiple times".into(),
352        )?;
353
354        #[cfg(feature = "testing")]
355        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
356
357        info!(
358            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
359            "node is not started, please fund this node",
360        );
361
362        helpers::wait_for_funds(
363            *MIN_NATIVE_BALANCE,
364            *SUGGESTED_NATIVE_BALANCE,
365            Duration::from_secs(200),
366            self.me_onchain(),
367            &self.chain_api,
368        )
369        .await?;
370
371        let mut processes = AbortableList::<HoprLibProcess>::default();
372
373        info!("starting HOPR node...");
374        self.state.store(HoprState::Initializing, Ordering::Relaxed);
375
376        let balance: XDaiBalance = self.get_balance().await?;
377        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
378
379        info!(
380            address = %self.me_onchain(),
381            %balance,
382            %minimum_balance,
383            "node information"
384        );
385
386        if balance.le(&minimum_balance) {
387            return Err(HoprLibError::GeneralError(
388                "cannot start the node without a sufficiently funded wallet".into(),
389            ));
390        }
391
392        // Once we are able to query the chain,
393        // check if the ticket price is configured correctly.
394        let network_min_ticket_price = self
395            .chain_api
396            .minimum_ticket_price()
397            .await
398            .map_err(HoprLibError::chain)?;
399        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
400        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
401            return Err(HoprLibError::GeneralError(format!(
402                "configured outgoing ticket price is lower than the network minimum ticket price: \
403                 {configured_ticket_price:?} < {network_min_ticket_price}"
404            )));
405        }
406        // Once we are able to query the chain,
407        // check if the winning probability is configured correctly.
408        let network_min_win_prob = self
409            .chain_api
410            .minimum_incoming_ticket_win_prob()
411            .await
412            .map_err(HoprLibError::chain)?;
413        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
414        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
415            && configured_win_prob
416                .and_then(|c| WinningProbability::try_from(c).ok())
417                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
418        {
419            return Err(HoprLibError::GeneralError(format!(
420                "configured outgoing ticket winning probability is lower than the network minimum winning \
421                 probability: {configured_win_prob:?} < {network_min_win_prob}"
422            )));
423        }
424
425        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
426        // plus 100 as an additional buffer
427        let minimum_capacity = self
428            .chain_api
429            .count_accounts(AccountSelector {
430                public_only: true,
431                ..Default::default()
432            })
433            .await
434            .map_err(HoprLibError::chain)?
435            .saturating_mul(2)
436            .saturating_add(100);
437
438        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
439            .ok()
440            .and_then(|s| s.trim().parse::<usize>().ok())
441            .filter(|&c| c > 0)
442            .unwrap_or(2048)
443            .max(minimum_capacity);
444
445        debug!(
446            capacity = chain_discovery_events_capacity,
447            minimum_required = minimum_capacity,
448            "creating chain discovery events channel"
449        );
450        let (indexer_peer_update_tx, indexer_peer_update_rx) =
451            channel::<PeerDiscovery>(chain_discovery_events_capacity);
452
453        // Stream all the existing announcements and also subscribe to all future on-chain
454        // announcements
455        let (announcements_stream, announcements_handle) = futures::stream::abortable(
456            self.chain_api
457                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
458                .map_err(HoprLibError::chain)?,
459        );
460        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
461
462        spawn(
463            announcements_stream
464                .filter_map(|event| {
465                    futures::future::ready(event.try_as_announcement().map(|account| {
466                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
467                    }))
468                })
469                .map(Ok)
470                .forward(indexer_peer_update_tx)
471                .inspect(
472                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
473                ),
474        );
475
476        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
477
478        let safe_addr = self.cfg.safe_module.safe_address;
479
480        if self.me_onchain() == safe_addr {
481            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
482        }
483
484        info!(%safe_addr, "registering safe with this node");
485        match self.chain_api.register_safe(&safe_addr).await {
486            Ok(awaiter) => {
487                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
488                awaiter.await.map_err(HoprLibError::chain)?;
489                info!(%safe_addr, "safe successfully registered with this node");
490            }
491            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
492                info!(%safe_addr, "this safe is already registered with this node");
493            }
494            Err(error) => {
495                error!(%safe_addr, %error, "safe registration failed");
496                return Err(HoprLibError::chain(error));
497            }
498        }
499
500        // Only public nodes announce multiaddresses
501        let multiaddresses_to_announce = if self.is_public() {
502            // The multiaddresses are filtered for the non-private ones,
503            // unless `announce_local_addresses` is set to `true`.
504            self.transport_api.announceable_multiaddresses()
505        } else {
506            Vec::with_capacity(0)
507        };
508
509        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
510        multiaddresses_to_announce
511            .iter()
512            .filter(|a| !is_public_address(a))
513            .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
514
515        // At this point the node is already registered with Safe, so
516        // we can announce via Safe-compliant TX
517        info!(?multiaddresses_to_announce, "announcing node on chain");
518        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
519            Ok(awaiter) => {
520                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
521                awaiter.await.map_err(HoprLibError::chain)?;
522                info!(?multiaddresses_to_announce, "node has been successfully announced");
523            }
524            Err(AnnouncementError::AlreadyAnnounced) => {
525                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
526            }
527            Err(error) => {
528                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
529                return Err(HoprLibError::chain(error));
530            }
531        }
532
533        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
534            .ok()
535            .and_then(|s| s.trim().parse::<usize>().ok())
536            .filter(|&c| c > 0)
537            .unwrap_or(256);
538
539        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
540        #[cfg(feature = "session-server")]
541        {
542            debug!(capacity = incoming_session_channel_capacity, "creating session server");
543            processes.insert(
544                HoprLibProcess::SessionServer,
545                hopr_async_runtime::spawn_as_abortable!(
546                    _session_rx
547                        .for_each_concurrent(None, move |session| {
548                            let serve_handler = serve_handler.clone();
549                            async move {
550                                let session_id = *session.session.id();
551                                match serve_handler.process(session).await {
552                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
553                                    Err(error) => error!(
554                                        ?session_id,
555                                        %error,
556                                        "client session processing failed"
557                                    ),
558                                }
559                            }
560                        })
561                        .inspect(|_| tracing::warn!(
562                            task = %HoprLibProcess::SessionServer,
563                            "long-running background task finished"
564                        ))
565                ),
566            );
567        }
568
569        info!("starting transport");
570
571        let (hopr_socket, transport_processes) = self
572            .transport_api
573            .run(cover_traffic, indexer_peer_update_rx, session_tx)
574            .await?;
575        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
576
577        info!("starting outgoing ticket flush process");
578        let (index_flush_stream, index_flush_handle) =
579            futures::stream::abortable(futures_time::stream::interval(Duration::from_secs(5).into()));
580        processes.insert(HoprLibProcess::TicketIndexFlush, index_flush_handle);
581        let node_db = self.node_db.clone();
582        spawn(
583            index_flush_stream
584                .for_each(move |_| {
585                    let node_db = node_db.clone();
586                    async move {
587                        match node_db.persist_outgoing_ticket_indices().await {
588                            Ok(count) => trace!(count, "successfully flushed states of outgoing ticket indices"),
589                            Err(error) => error!(%error, "Failed to flush ticket indices"),
590                        }
591                    }
592                })
593                .inspect(|_| {
594                    tracing::warn!(
595                        task = %HoprLibProcess::TicketIndexFlush,
596                        "long-running background task finished"
597                    )
598                }),
599        );
600
601        // Start a queue that takes care of redeeming tickets via given TicketSelectors
602        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
603        let _ = self.redeem_requests.set(redemption_req_tx);
604        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
605        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
606        let chain = self.chain_api.clone();
607        let node_db = self.node_db.clone();
608        spawn(redemption_req_rx
609            .for_each(move |selector| {
610                let chain = chain.clone();
611                let db = node_db.clone();
612                async move {
613                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
614                        Ok(res) => debug!(%res, "redemption complete"),
615                        Err(error) => error!(%error, "redemption failed"),
616                    }
617                }
618            })
619            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
620        );
621
622        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
623        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
624        let chain = self.chain_api.clone();
625        let node_db = self.node_db.clone();
626        let events = chain.subscribe().map_err(HoprLibError::chain)?;
627        spawn(
628            futures::stream::Abortable::new(
629                events
630                    .filter_map(move |event|
631                        futures::future::ready(event.try_as_channel_closed())
632                    ),
633                chain_events_sub_reg
634            )
635            .for_each(move |closed_channel| {
636                let node_db = node_db.clone();
637                let chain = chain.clone();
638                async move {
639                    match closed_channel.direction(chain.me()) {
640                        Some(ChannelDirection::Incoming) => {
641                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
642                                Ok(num_neglected) if num_neglected > 0 => {
643                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
644                                },
645                                Ok(_) => {
646                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
647                                },
648                                Err(error) => {
649                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
650                                }
651                            }
652                        },
653                        Some(ChannelDirection::Outgoing) => {
654                            if let Err(error) = node_db.reset_outgoing_ticket_index(closed_channel.get_id()).await {
655                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
656                            } else {
657                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
658                            }
659                        }
660                        _ => {} // Event for a channel that is not our own
661                    }
662                }
663            })
664            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
665        );
666
667        // NOTE: after the chain is synced, we can reset tickets which are considered
668        // redeemed but on-chain state does not align with that. This implies there was a problem
669        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
670        // handle errors appropriately.
671        let mut channels = self
672            .chain_api
673            .stream_channels(ChannelSelector {
674                destination: self.me_onchain().into(),
675                ..Default::default()
676            })
677            .map_err(HoprLibError::chain)
678            .await?;
679
680        while let Some(channel) = channels.next().await {
681            self.node_db
682                .update_ticket_states_and_fetch(
683                    [TicketSelector::from(&channel)
684                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
685                        .with_index_range(channel.ticket_index..)],
686                    AcknowledgedTicketStatus::Untouched,
687                )
688                .map_err(HoprLibError::db)
689                .await?
690                .for_each(|ticket| {
691                    info!(%ticket, "fixed next out-of-sync ticket");
692                    futures::future::ready(())
693                })
694                .await;
695        }
696
697        self.state.store(HoprState::Running, Ordering::Relaxed);
698
699        info!(
700            id = %self.me_peer_id(),
701            version = constants::APP_VERSION,
702            "NODE STARTED AND RUNNING"
703        );
704
705        #[cfg(all(feature = "prometheus", not(test)))]
706        METRIC_HOPR_NODE_INFO.set(
707            &[
708                &self.me.public().to_peerid_str(),
709                &self.me_onchain().to_string(),
710                &self.cfg.safe_module.safe_address.to_string(),
711                &self.cfg.safe_module.module_address.to_string(),
712            ],
713            1.0,
714        );
715
716        let _ = self.processes.set(processes);
717        Ok(hopr_socket)
718    }
719
720    /// Used to practically shut down all node's processes without dropping the instance.
721    ///
722    /// This means that the instance can be used to retrieve some information, but all
723    /// active operations will stop and new will be impossible to perform.
724    /// Such operations will return [`HoprStatusError::NotThereYet`].
725    ///
726    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
727    pub fn shutdown(&self) -> Result<(), HoprLibError> {
728        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
729        if let Some(processes) = self.processes.get() {
730            processes.abort_all();
731        }
732        self.state.store(HoprState::Terminated, Ordering::Relaxed);
733        info!("NODE SHUTDOWN COMPLETE");
734        Ok(())
735    }
736
737    // p2p transport =========
738    /// Own PeerId used in the libp2p transport layer
739    pub fn me_peer_id(&self) -> PeerId {
740        (*self.me.public()).into()
741    }
742
743    /// Get the list of all announced public nodes in the network
744    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
745        Ok(self
746            .chain_api
747            .stream_accounts(AccountSelector {
748                public_only: true,
749                ..Default::default()
750            })
751            .map_err(HoprLibError::chain)
752            .await?
753            .map(|entry| {
754                (
755                    PeerId::from(entry.public_key),
756                    entry.chain_addr,
757                    entry.get_multiaddrs().to_vec(),
758                )
759            })
760            .collect()
761            .await)
762    }
763
764    /// Ping another node in the network based on the PeerId
765    ///
766    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
767    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
768        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
769
770        Ok(self.transport_api.ping(peer).await?)
771    }
772
/// Establishes a client session to `destination`, retrying failed attempts.
///
/// The returned session object implements [`futures::io::AsyncRead`] and
/// [`futures::io::AsyncWrite`] and can be used as a read/write binary session.
///
/// Attempts are repeated with a constant, jittered delay; the maximum number of
/// retries and the delay are taken from the `session` section of the node configuration.
///
/// # Errors
/// Fails when the node is not in the [`HoprState::Running`] state or when the
/// session could not be established within the configured number of retries.
#[cfg(feature = "session-client")]
pub async fn connect_to(
    &self,
    destination: Address,
    target: SessionTarget,
    cfg: SessionClientConfig,
) -> errors::Result<HoprSession> {
    use backon::Retryable;

    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let retry_policy = backon::ConstantBuilder::default()
        .with_max_times(self.cfg.session.establish_max_retries as usize)
        .with_delay(self.cfg.session.establish_retry_timeout)
        .with_jitter();

    // `new_session` takes `target` and `cfg` by value, so each attempt gets fresh clones.
    let attempt = || {
        let cfg = cfg.clone();
        let target = target.clone();
        async { self.transport_api.new_session(destination, target, cfg).await }
    };

    let session = attempt
        .retry(retry_policy)
        .sleep(backon::FuturesTimerSleeper)
        .await?;

    Ok(session)
}
800
/// Sends a keep-alive for the session with the given [`SessionId`], so that the
/// session does not get closed due to inactivity.
///
/// # Errors
/// Fails when the node is not in the [`HoprState::Running`] state or when the
/// transport fails to probe the session.
#[cfg(feature = "session-client")]
pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let probed = self.transport_api.probe_session(id).await?;
    Ok(probed)
}
808
/// Returns the optional [`SurbBalancerConfig`] that the transport reports for the
/// session with the given [`SessionId`].
///
/// # Errors
/// Fails when the node is not in the [`HoprState::Running`] state or when the
/// transport lookup fails.
#[cfg(feature = "session-client")]
pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let balancer_cfg = self.transport_api.session_surb_balancing_cfg(id).await?;
    Ok(balancer_cfg)
}
814
/// Passes a new [`SurbBalancerConfig`] to the transport for the session with the
/// given [`SessionId`].
///
/// # Errors
/// Fails when the node is not in the [`HoprState::Running`] state or when the
/// transport rejects the update.
#[cfg(feature = "session-client")]
pub async fn update_session_surb_balancer_config(
    &self,
    id: &SessionId,
    cfg: SurbBalancerConfig,
) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let updated = self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?;
    Ok(updated)
}
824
825    /// List all multiaddresses announced by this node
826    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
827        self.transport_api.local_multiaddresses()
828    }
829
830    /// List all multiaddresses on which the node is listening
831    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
832        self.transport_api.listening_multiaddresses().await
833    }
834
835    /// List all multiaddresses observed for a PeerId
836    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
837        self.transport_api.network_observed_multiaddresses(peer).await
838    }
839
840    /// List all multiaddresses announced on-chain for the given node.
841    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
842        let peer = *peer;
843        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
844        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
845
846        match self
847            .chain_api
848            .stream_accounts(AccountSelector {
849                public_only: false,
850                offchain_key: Some(pubkey),
851                ..Default::default()
852            })
853            .map_err(HoprLibError::chain)
854            .await?
855            .next()
856            .await
857        {
858            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
859            None => {
860                error!(%peer, "no information");
861                Ok(vec![])
862            }
863        }
864    }
865
866    // Network =========
867
868    /// Get measured network health
869    pub async fn network_health(&self) -> Health {
870        self.transport_api.network_health().await
871    }
872
873    /// List all peers connected to this
874    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
875        Ok(self.transport_api.network_connected_peers().await?)
876    }
877
878    /// Get all data collected from the network relevant for a PeerId
879    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
880        Ok(self.transport_api.network_peer_info(peer).await?)
881    }
882
883    /// Get peers connected peers with quality higher than some value
884    pub async fn all_network_peers(
885        &self,
886        minimum_quality: f64,
887    ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
888        Ok(
889            futures::stream::iter(self.transport_api.network_connected_peers().await?)
890                .filter_map(|peer| async move {
891                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
892                        if info.get_average_quality() >= minimum_quality {
893                            Some((peer, info))
894                        } else {
895                            None
896                        }
897                    } else {
898                        None
899                    }
900                })
901                .filter_map(|(peer_id, info)| async move {
902                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
903                    Some((address, peer_id, info))
904                })
905                .collect::<Vec<_>>()
906                .await,
907        )
908    }
909
910    // Ticket ========
911    /// Get all tickets in a channel specified by [`prelude::Hash`]
912    pub async fn tickets_in_channel(&self, channel: &prelude::Hash) -> errors::Result<Option<Vec<RedeemableTicket>>> {
913        Ok(self.transport_api.tickets_in_channel(channel).await?)
914    }
915
916    /// Get all tickets
917    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
918        Ok(self.transport_api.all_tickets().await?)
919    }
920
921    /// Get statistics for all tickets
922    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
923        Ok(self.transport_api.ticket_statistics().await?)
924    }
925
926    /// Reset the ticket metrics to zero
927    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
928        self.node_db
929            .reset_ticket_statistics()
930            .await
931            .map_err(HoprLibError::chain)
932    }
933
934    // Chain =========
935    pub fn me_onchain(&self) -> Address {
936        *self.chain_api.me()
937    }
938
939    /// Get ticket price
940    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
941        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
942    }
943
944    /// Get minimum incoming ticket winning probability
945    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
946        self.chain_api
947            .minimum_incoming_ticket_win_prob()
948            .await
949            .map_err(HoprLibError::chain)
950    }
951
952    /// List of all accounts announced on the chain
953    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
954        Ok(self
955            .chain_api
956            .stream_accounts(AccountSelector {
957                public_only: true,
958                ..Default::default()
959            })
960            .map_err(HoprLibError::chain)
961            .await?
962            .collect()
963            .await)
964    }
965
966    /// Get the channel entry from Hash.
967    /// @returns the channel entry of those two nodes
968    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
969        self.chain_api
970            .channel_by_id(channel_id)
971            .await
972            .map_err(HoprLibError::chain)
973    }
974
975    /// Get the channel entry between source and destination node.
976    /// @param src Address
977    /// @param dest Address
978    /// @returns the channel entry of those two nodes
979    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
980        self.chain_api
981            .channel_by_parties(src, dest)
982            .await
983            .map_err(HoprLibError::chain)
984    }
985
986    /// List all channels open from a specified Address
987    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
988        Ok(self
989            .chain_api
990            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
991                ChannelStatusDiscriminants::Closed,
992                ChannelStatusDiscriminants::Open,
993                ChannelStatusDiscriminants::PendingToClose,
994            ]))
995            .map_err(HoprLibError::chain)
996            .await?
997            .collect()
998            .await)
999    }
1000
1001    /// List all channels open to a specified address
1002    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1003        Ok(self
1004            .chain_api
1005            .stream_channels(
1006                ChannelSelector::default()
1007                    .with_destination(*dest)
1008                    .with_allowed_states(&[
1009                        ChannelStatusDiscriminants::Closed,
1010                        ChannelStatusDiscriminants::Open,
1011                        ChannelStatusDiscriminants::PendingToClose,
1012                    ]),
1013            )
1014            .map_err(HoprLibError::chain)
1015            .await?
1016            .collect()
1017            .await)
1018    }
1019
1020    /// List all channels
1021    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1022        Ok(self
1023            .chain_api
1024            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1025                ChannelStatusDiscriminants::Closed,
1026                ChannelStatusDiscriminants::Open,
1027                ChannelStatusDiscriminants::PendingToClose,
1028            ]))
1029            .map_err(HoprLibError::chain)
1030            .await?
1031            .collect()
1032            .await)
1033    }
1034
1035    /// Current safe allowance balance
1036    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1037        self.chain_api
1038            .safe_allowance(self.cfg.safe_module.safe_address)
1039            .await
1040            .map_err(HoprLibError::chain)
1041    }
1042
1043    /// Withdraw on-chain assets to a given address
1044    /// @param recipient the account where the assets should be transferred to
1045    /// @param amount how many tokens to be transferred
1046    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1047        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1048
1049        self.chain_api
1050            .withdraw(amount, &recipient)
1051            .and_then(identity)
1052            .map_err(HoprLibError::chain)
1053            .await
1054    }
1055
1056    /// Withdraw on-chain native assets to a given address
1057    /// @param recipient the account where the assets should be transferred to
1058    /// @param amount how many tokens to be transferred
1059    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1060        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1061
1062        self.chain_api
1063            .withdraw(amount, &recipient)
1064            .and_then(identity)
1065            .map_err(HoprLibError::chain)
1066            .await
1067    }
1068
1069    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1070        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1071
1072        let (channel_id, tx_hash) = self
1073            .chain_api
1074            .open_channel(destination, amount)
1075            .and_then(identity)
1076            .map_err(HoprLibError::chain)
1077            .await?;
1078
1079        Ok(OpenChannelResult { tx_hash, channel_id })
1080    }
1081
1082    pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1083        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1084
1085        self.chain_api
1086            .fund_channel(channel_id, amount)
1087            .and_then(identity)
1088            .map_err(HoprLibError::chain)
1089            .await
1090    }
1091
1092    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1093        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1094
1095        let tx_hash = self
1096            .chain_api
1097            .close_channel(channel_id)
1098            .and_then(identity)
1099            .map_err(HoprLibError::chain)
1100            .await?;
1101
1102        Ok(CloseChannelResult { tx_hash })
1103    }
1104
1105    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1106        self.chain_api
1107            .channel_closure_notice_period()
1108            .await
1109            .map_err(HoprLibError::chain)
1110    }
1111
1112    pub fn redemption_requests(
1113        &self,
1114    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1115        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1116
1117        // TODO: add universal timeout sink here
1118        Ok(self
1119            .redeem_requests
1120            .get()
1121            .cloned()
1122            .expect("redeem_requests is not initialized")
1123            .sink_map_err(HoprLibError::other))
1124    }
1125
1126    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1127        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1128
1129        let min_value = min_value.into();
1130
1131        self.chain_api
1132            .stream_channels(
1133                ChannelSelector::default()
1134                    .with_destination(self.me_onchain())
1135                    .with_allowed_states(&[
1136                        ChannelStatusDiscriminants::Open,
1137                        ChannelStatusDiscriminants::PendingToClose,
1138                    ]),
1139            )
1140            .map_err(HoprLibError::chain)
1141            .await?
1142            .map(|channel| {
1143                Ok(TicketSelector::from(&channel)
1144                    .with_amount(min_value..)
1145                    .with_index_range(channel.ticket_index..)
1146                    .with_state(AcknowledgedTicketStatus::Untouched))
1147            })
1148            .forward(self.redemption_requests()?)
1149            .await?;
1150
1151        Ok(())
1152    }
1153
1154    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1155        &self,
1156        counterparty: &Address,
1157        min_value: B,
1158    ) -> errors::Result<()> {
1159        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1160            .await
1161    }
1162
1163    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1164        &self,
1165        channel_id: &Hash,
1166        min_value: B,
1167    ) -> errors::Result<()> {
1168        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1169
1170        let channel = self
1171            .chain_api
1172            .channel_by_id(channel_id)
1173            .await
1174            .map_err(HoprLibError::chain)?
1175            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1176
1177        self.redemption_requests()?
1178            .send(
1179                TicketSelector::from(channel)
1180                    .with_amount(min_value.into()..)
1181                    .with_index_range(channel.ticket_index..)
1182                    .with_state(AcknowledgedTicketStatus::Untouched),
1183            )
1184            .await?;
1185
1186        Ok(())
1187    }
1188
1189    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1190        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1191
1192        self.redemption_requests()?
1193            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1194            .await?;
1195
1196        Ok(())
1197    }
1198
1199    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1200        let peer_id = *peer_id;
1201        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1202        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1203            .await
1204            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1205
1206        self.chain_api
1207            .packet_key_to_chain_key(&pubkey)
1208            .await
1209            .map_err(HoprLibError::chain)
1210    }
1211
1212    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1213        self.chain_api
1214            .chain_key_to_packet_key(address)
1215            .await
1216            .map(|pk| pk.map(|v| v.into()))
1217            .map_err(HoprLibError::chain)
1218    }
1219}
1220
1221impl<Chain, Db> Hopr<Chain, Db> {
1222    // === telemetry
1223    /// Prometheus formatted metrics collected by the hopr-lib components.
1224    pub fn collect_hopr_metrics() -> errors::Result<String> {
1225        cfg_if::cfg_if! {
1226            if #[cfg(all(feature = "prometheus", not(test)))] {
1227                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1228            } else {
1229                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1230            }
1231        }
1232    }
1233}