hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it;
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25
26/// Utility module with helper types and functionality over hopr-lib behavior.
27pub mod utils;
28
29/// Public traits for interactions with this library.
30pub mod traits;
31
32/// Functionality related to the HOPR node state.
33pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38/// Exports of libraries necessary for API and interface operations.
39#[doc(hidden)]
40pub mod exports {
41    pub use hopr_api as api;
42
43    pub mod types {
44        pub use hopr_internal_types as internal;
45        pub use hopr_primitive_types as primitive;
46    }
47
48    pub mod crypto {
49        pub use hopr_crypto_keypair as keypair;
50        pub use hopr_crypto_types as types;
51    }
52
53    pub mod network {
54        pub use hopr_network_types as types;
55    }
56
57    pub use hopr_transport as transport;
58}
59
60/// Export of relevant types for easier integration.
61#[doc(hidden)]
62pub mod prelude {
63    pub use super::exports::{
64        crypto::{
65            keypair::key_pair::HoprKeys,
66            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67        },
68        network::types::{
69            prelude::ForeignDataMode,
70            udp::{ConnectedUdpStream, UdpStreamParallelism},
71        },
72        transport::{OffchainPublicKey, socket::HoprSocket},
73        types::primitive::prelude::Address,
74    };
75}
76
77use std::{
78    convert::identity,
79    num::NonZeroUsize,
80    sync::{Arc, OnceLock, atomic::Ordering},
81    time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel};
85pub use hopr_api::db::ChannelTicketStatistics;
86use hopr_api::{
87    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
88    db::{HoprNodeDbApi, PeerStatus, TicketMarker, TicketSelector},
89};
90use hopr_async_runtime::prelude::spawn;
91pub use hopr_async_runtime::{Abortable, AbortableList};
92pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
93pub use hopr_crypto_types::prelude::*;
94pub use hopr_internal_types::prelude::*;
95pub use hopr_network_types::prelude::*;
96#[cfg(all(feature = "prometheus", not(test)))]
97use hopr_platform::time::native::current_time;
98pub use hopr_primitive_types::prelude::*;
99use hopr_transport::errors::HoprTransportError;
100#[cfg(feature = "runtime-tokio")]
101pub use hopr_transport::transfer_session;
102pub use hopr_transport::*;
103use tracing::{debug, error, info, warn};
104use validator::Validate;
105
106pub use crate::{
107    config::SafeModule,
108    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
109    errors::{HoprLibError, HoprStatusError},
110    state::{HoprLibProcess, HoprState},
111    traits::chain::{CloseChannelResult, OpenChannelResult},
112};
113
114#[cfg(all(feature = "prometheus", not(test)))]
115lazy_static::lazy_static! {
116    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
117        "hopr_start_time",
118        "The unix timestamp in seconds at which the process was started"
119    ).unwrap();
120    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
121        "hopr_lib_version",
122        "Executed version of hopr-lib",
123        &["version"]
124    ).unwrap();
125    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
126        "hopr_node_addresses",
127        "Node on-chain and off-chain addresses",
128        &["peerid", "address", "safe_address", "module_address"]
129    ).unwrap();
130}
131
132pub struct DummyCoverTrafficType {
133    #[allow(dead_code)]
134    _unconstructable: (),
135}
136
137impl TrafficGeneration for DummyCoverTrafficType {
138    fn build(
139        self,
140    ) -> (
141        impl futures::Stream<Item = DestinationRouting> + Send,
142        impl futures::Sink<
143            std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
144            Error = impl std::error::Error,
145        > + Send
146        + Sync
147        + Clone
148        + 'static,
149    ) {
150        (
151            futures::stream::empty(),
152            futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
153        )
154    }
155}
156
157/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
158///
159/// Divide the available CPU parallelism by 2, since half of the available threads are
160/// to be used for IO-bound and half for CPU-bound tasks.
161#[cfg(feature = "runtime-tokio")]
162pub fn prepare_tokio_runtime(
163    num_cpu_threads: Option<NonZeroUsize>,
164    num_io_threads: Option<NonZeroUsize>,
165) -> anyhow::Result<tokio::runtime::Runtime> {
166    use std::str::FromStr;
167    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
168
169    hopr_parallelize::cpu::init_thread_pool(
170        num_cpu_threads
171            .map(|v| v.get())
172            .or(avail_parallelism)
173            .ok_or(anyhow::anyhow!(
174                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
175                 environment variable."
176            ))?
177            .max(1),
178    )?;
179
180    Ok(tokio::runtime::Builder::new_multi_thread()
181        .enable_all()
182        .worker_threads(
183            num_io_threads
184                .map(|v| v.get())
185                .or(avail_parallelism)
186                .ok_or(anyhow::anyhow!(
187                    "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
188                     environment variable."
189                ))?
190                .max(1),
191        )
192        .thread_name("hoprd")
193        .thread_stack_size(
194            std::env::var("HOPRD_THREAD_STACK_SIZE")
195                .ok()
196                .and_then(|v| usize::from_str(&v).ok())
197                .unwrap_or(10 * 1024 * 1024)
198                .max(2 * 1024 * 1024),
199        )
200        .build()?)
201}
202
203/// Type alias used to send and receive transport data via a running HOPR node.
204pub type HoprTransportIO = socket::HoprSocket<
205    futures::channel::mpsc::Receiver<ApplicationDataIn>,
206    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
207>;
208
209type NewTicketEvents = (
210    async_broadcast::Sender<VerifiedTicket>,
211    async_broadcast::InactiveReceiver<VerifiedTicket>,
212);
213
214/// HOPR main object providing the entire HOPR node functionality
215///
216/// Instantiating this object creates all processes and objects necessary for
217/// running the HOPR node. Once created, the node can be started using the
218/// `run()` method.
219///
220/// Externally offered API should be enough to perform all necessary tasks
221/// with the HOPR node manually, but it is advised to create such a configuration
222/// that manual interaction is unnecessary.
223///
224/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
225pub struct Hopr<Chain, Db> {
226    me: OffchainKeypair,
227    cfg: config::HoprLibConfig,
228    state: Arc<state::AtomicHoprState>,
229    transport_api: HoprTransport<Chain, Db>,
230    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
231    node_db: Db,
232    chain_api: Chain,
233    winning_ticket_subscribers: NewTicketEvents,
234    processes: OnceLock<AbortableList<HoprLibProcess>>,
235}
236
237impl<Chain, Db> Hopr<Chain, Db>
238where
239    Chain: HoprChainApi + Clone + Send + Sync + 'static,
240    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
241{
242    pub async fn new(
243        identity: (&ChainKeypair, &OffchainKeypair),
244        hopr_chain_api: Chain,
245        hopr_node_db: Db,
246        cfg: config::HoprLibConfig,
247    ) -> errors::Result<Self> {
248        if hopr_crypto_random::is_rng_fixed() {
249            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
250        }
251
252        cfg.validate()?;
253
254        let hopr_transport_api = HoprTransport::new(
255            identity,
256            hopr_chain_api.clone(),
257            hopr_node_db.clone(),
258            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
259            cfg.protocol,
260        );
261
262        #[cfg(all(feature = "prometheus", not(test)))]
263        {
264            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
265            METRIC_HOPR_LIB_VERSION.set(
266                &[const_format::formatcp!("{}", constants::APP_VERSION)],
267                const_format::formatcp!(
268                    "{}.{}",
269                    env!("CARGO_PKG_VERSION_MAJOR"),
270                    env!("CARGO_PKG_VERSION_MINOR")
271                )
272                .parse()
273                .unwrap_or(0.0),
274            );
275
276            // Calling get_ticket_statistics will initialize the respective metrics on tickets
277            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
278                error!(%error, "failed to initialize ticket statistics metrics");
279            }
280        }
281
282        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
283        new_tickets_tx.set_await_active(false);
284        new_tickets_tx.set_overflow(true);
285
286        Ok(Self {
287            me: identity.1.clone(),
288            cfg,
289            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
290            transport_api: hopr_transport_api,
291            chain_api: hopr_chain_api,
292            node_db: hopr_node_db,
293            redeem_requests: OnceLock::new(),
294            processes: OnceLock::new(),
295            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
296        })
297    }
298
299    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
300        if self.status() == state {
301            Ok(())
302        } else {
303            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
304        }
305    }
306
307    pub fn status(&self) -> HoprState {
308        self.state.load(Ordering::Relaxed)
309    }
310
311    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
312        self.chain_api
313            .get_balance(self.me_onchain())
314            .await
315            .map_err(HoprLibError::chain)
316    }
317
318    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
319        self.chain_api
320            .get_balance(self.cfg.safe_module.safe_address)
321            .await
322            .map_err(HoprLibError::chain)
323    }
324
325    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
326        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
327    }
328
329    pub fn get_safe_config(&self) -> SafeModule {
330        self.cfg.safe_module.clone()
331    }
332
333    pub fn config(&self) -> &config::HoprLibConfig {
334        &self.cfg
335    }
336
337    #[inline]
338    fn is_public(&self) -> bool {
339        self.cfg.publish
340    }
341
342    pub async fn run<
343        Ct,
344        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
345    >(
346        &self,
347        cover_traffic: Option<Ct>,
348        #[cfg(feature = "session-server")] serve_handler: T,
349    ) -> errors::Result<HoprTransportIO>
350    where
351        Ct: TrafficGeneration + Send + Sync + 'static,
352    {
353        self.error_if_not_in_state(
354            HoprState::Uninitialized,
355            "cannot start the hopr node multiple times".into(),
356        )?;
357
358        #[cfg(feature = "testing")]
359        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
360
361        info!(
362            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
363            "node is not started, please fund this node",
364        );
365
366        helpers::wait_for_funds(
367            *MIN_NATIVE_BALANCE,
368            *SUGGESTED_NATIVE_BALANCE,
369            Duration::from_secs(200),
370            self.me_onchain(),
371            &self.chain_api,
372        )
373        .await?;
374
375        let mut processes = AbortableList::<HoprLibProcess>::default();
376
377        info!("starting HOPR node...");
378        self.state.store(HoprState::Initializing, Ordering::Relaxed);
379
380        let balance: XDaiBalance = self.get_balance().await?;
381        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
382
383        info!(
384            address = %self.me_onchain(),
385            %balance,
386            %minimum_balance,
387            "node information"
388        );
389
390        if balance.le(&minimum_balance) {
391            return Err(HoprLibError::GeneralError(
392                "cannot start the node without a sufficiently funded wallet".into(),
393            ));
394        }
395
396        // Once we are able to query the chain,
397        // check if the ticket price is configured correctly.
398        let network_min_ticket_price = self
399            .chain_api
400            .minimum_ticket_price()
401            .await
402            .map_err(HoprLibError::chain)?;
403        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
404        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
405            return Err(HoprLibError::GeneralError(format!(
406                "configured outgoing ticket price is lower than the network minimum ticket price: \
407                 {configured_ticket_price:?} < {network_min_ticket_price}"
408            )));
409        }
410        // Once we are able to query the chain,
411        // check if the winning probability is configured correctly.
412        let network_min_win_prob = self
413            .chain_api
414            .minimum_incoming_ticket_win_prob()
415            .await
416            .map_err(HoprLibError::chain)?;
417        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
418        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
419            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
420        {
421            return Err(HoprLibError::GeneralError(format!(
422                "configured outgoing ticket winning probability is lower than the network minimum winning \
423                 probability: {configured_win_prob:?} < {network_min_win_prob}"
424            )));
425        }
426
427        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
428        // plus 100 as an additional buffer
429        let minimum_capacity = self
430            .chain_api
431            .count_accounts(AccountSelector {
432                public_only: true,
433                ..Default::default()
434            })
435            .await
436            .map_err(HoprLibError::chain)?
437            .saturating_mul(2)
438            .saturating_add(100);
439
440        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
441            .ok()
442            .and_then(|s| s.trim().parse::<usize>().ok())
443            .filter(|&c| c > 0)
444            .unwrap_or(2048)
445            .max(minimum_capacity);
446
447        debug!(
448            capacity = chain_discovery_events_capacity,
449            minimum_required = minimum_capacity,
450            "creating chain discovery events channel"
451        );
452        let (indexer_peer_update_tx, indexer_peer_update_rx) =
453            channel::<PeerDiscovery>(chain_discovery_events_capacity);
454
455        // Stream all the existing announcements and also subscribe to all future on-chain
456        // announcements
457        let (announcements_stream, announcements_handle) = futures::stream::abortable(
458            self.chain_api
459                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
460                .map_err(HoprLibError::chain)?,
461        );
462        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
463
464        spawn(
465            announcements_stream
466                .filter_map(|event| {
467                    futures::future::ready(event.try_as_announcement().map(|account| {
468                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
469                    }))
470                })
471                .map(Ok)
472                .forward(indexer_peer_update_tx)
473                .inspect(
474                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
475                ),
476        );
477
478        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
479
480        let safe_addr = self.cfg.safe_module.safe_address;
481
482        if self.me_onchain() == safe_addr {
483            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
484        }
485
486        info!(%safe_addr, "registering safe with this node");
487        match self.chain_api.register_safe(&safe_addr).await {
488            Ok(awaiter) => {
489                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
490                awaiter.await.map_err(HoprLibError::chain)?;
491                info!(%safe_addr, "safe successfully registered with this node");
492            }
493            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
494                info!(%safe_addr, "this safe is already registered with this node");
495            }
496            Err(error) => {
497                error!(%safe_addr, %error, "safe registration failed");
498                return Err(HoprLibError::chain(error));
499            }
500        }
501
502        // Only public nodes announce multiaddresses
503        let multiaddresses_to_announce = if self.is_public() {
504            // The multiaddresses are filtered for the non-private ones,
505            // unless `announce_local_addresses` is set to `true`.
506            self.transport_api.announceable_multiaddresses()
507        } else {
508            Vec::with_capacity(0)
509        };
510
511        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
512        multiaddresses_to_announce
513            .iter()
514            .filter(|a| !is_public_address(a))
515            .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
516
517        // At this point the node is already registered with Safe, so
518        // we can announce via Safe-compliant TX
519        info!(?multiaddresses_to_announce, "announcing node on chain");
520        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
521            Ok(awaiter) => {
522                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
523                awaiter.await.map_err(HoprLibError::chain)?;
524                info!(?multiaddresses_to_announce, "node has been successfully announced");
525            }
526            Err(AnnouncementError::AlreadyAnnounced) => {
527                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
528            }
529            Err(error) => {
530                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
531                return Err(HoprLibError::chain(error));
532            }
533        }
534
535        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
536            .ok()
537            .and_then(|s| s.trim().parse::<usize>().ok())
538            .filter(|&c| c > 0)
539            .unwrap_or(256);
540
541        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
542        #[cfg(feature = "session-server")]
543        {
544            debug!(capacity = incoming_session_channel_capacity, "creating session server");
545            processes.insert(
546                HoprLibProcess::SessionServer,
547                hopr_async_runtime::spawn_as_abortable!(
548                    _session_rx
549                        .for_each_concurrent(None, move |session| {
550                            let serve_handler = serve_handler.clone();
551                            async move {
552                                let session_id = *session.session.id();
553                                match serve_handler.process(session).await {
554                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
555                                    Err(error) => error!(
556                                        ?session_id,
557                                        %error,
558                                        "client session processing failed"
559                                    ),
560                                }
561                            }
562                        })
563                        .inspect(|_| tracing::warn!(
564                            task = %HoprLibProcess::SessionServer,
565                            "long-running background task finished"
566                        ))
567                ),
568            );
569        }
570
571        info!("starting ticket events processor");
572        let (tickets_tx, tickets_rx) = channel(1024);
573        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
574        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
575        let node_db = self.node_db.clone();
576        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
577        spawn(
578            tickets_rx
579                .filter_map(move |ticket_event| {
580                    let node_db = node_db.clone();
581                    async move {
582                        match ticket_event {
583                            TicketEvent::WinningTicket(winning) => {
584                                if let Err(error) = node_db.insert_ticket(*winning).await {
585                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
586                                } else {
587                                    tracing::debug!(%winning, "inserted ticket into database");
588                                }
589                                Some(winning)
590                            }
591                            TicketEvent::RejectedTicket(rejected, issuer) => {
592                                if let Some(issuer) = &issuer {
593                                    if let Err(error) =
594                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
595                                    {
596                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
597                                    } else {
598                                        tracing::debug!(%rejected, "marked ticket as rejected");
599                                    }
600                                } else {
601                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
602                                }
603                                None
604                            }
605                        }
606                    }
607                })
608                .for_each(move |ticket| {
609                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
610                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
611                    }
612                    futures::future::ready(())
613                })
614                .inspect(|_| {
615                    tracing::warn!(
616                        task = %HoprLibProcess::TicketEvents,
617                        "long-running background task finished"
618                    )
619                }),
620        );
621
622        info!("starting transport");
623        let (hopr_socket, transport_processes) = self
624            .transport_api
625            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
626            .await?;
627        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
628
629        // Start a queue that takes care of redeeming tickets via given TicketSelectors
630        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
631        let _ = self.redeem_requests.set(redemption_req_tx);
632        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
633        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
634        let chain = self.chain_api.clone();
635        let node_db = self.node_db.clone();
636        spawn(redemption_req_rx
637            .for_each(move |selector| {
638                let chain = chain.clone();
639                let db = node_db.clone();
640                async move {
641                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
642                        Ok(res) => debug!(%res, "redemption complete"),
643                        Err(error) => error!(%error, "redemption failed"),
644                    }
645                }
646            })
647            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
648        );
649
650        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
651        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
652        let chain = self.chain_api.clone();
653        let node_db = self.node_db.clone();
654        let events = chain.subscribe().map_err(HoprLibError::chain)?;
655        spawn(
656            futures::stream::Abortable::new(
657                events
658                    .filter_map(move |event|
659                        futures::future::ready(event.try_as_channel_closed())
660                    ),
661                chain_events_sub_reg
662            )
663            .for_each(move |closed_channel| {
664                let node_db = node_db.clone();
665                let chain = chain.clone();
666                async move {
667                    match closed_channel.direction(chain.me()) {
668                        Some(ChannelDirection::Incoming) => {
669                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
670                                Ok(num_neglected) if num_neglected > 0 => {
671                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
672                                },
673                                Ok(_) => {
674                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
675                                },
676                                Err(error) => {
677                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
678                                }
679                            }
680                        },
681                        Some(ChannelDirection::Outgoing) => {
682                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
683                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
684                            } else {
685                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
686                            }
687                        }
688                        _ => {} // Event for a channel that is not our own
689                    }
690                }
691            })
692            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
693        );
694
695        // NOTE: after the chain is synced, we can reset tickets which are considered
696        // redeemed but on-chain state does not align with that. This implies there was a problem
697        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
698        // handle errors appropriately.
699        let mut channels = self
700            .chain_api
701            .stream_channels(ChannelSelector {
702                destination: self.me_onchain().into(),
703                ..Default::default()
704            })
705            .map_err(HoprLibError::chain)
706            .await?;
707
708        while let Some(channel) = channels.next().await {
709            self.node_db
710                .update_ticket_states_and_fetch(
711                    [TicketSelector::from(&channel)
712                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
713                        .with_index_range(channel.ticket_index..)],
714                    AcknowledgedTicketStatus::Untouched,
715                )
716                .map_err(HoprLibError::db)
717                .await?
718                .for_each(|ticket| {
719                    info!(%ticket, "fixed next out-of-sync ticket");
720                    futures::future::ready(())
721                })
722                .await;
723        }
724
725        self.state.store(HoprState::Running, Ordering::Relaxed);
726
727        info!(
728            id = %self.me_peer_id(),
729            version = constants::APP_VERSION,
730            "NODE STARTED AND RUNNING"
731        );
732
733        #[cfg(all(feature = "prometheus", not(test)))]
734        METRIC_HOPR_NODE_INFO.set(
735            &[
736                &self.me.public().to_peerid_str(),
737                &self.me_onchain().to_string(),
738                &self.cfg.safe_module.safe_address.to_string(),
739                &self.cfg.safe_module.module_address.to_string(),
740            ],
741            1.0,
742        );
743
744        let _ = self.processes.set(processes);
745        Ok(hopr_socket)
746    }
747
748    /// Used to practically shut down all node's processes without dropping the instance.
749    ///
750    /// This means that the instance can be used to retrieve some information, but all
751    /// active operations will stop and new will be impossible to perform.
752    /// Such operations will return [`HoprStatusError::NotThereYet`].
753    ///
754    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
755    pub fn shutdown(&self) -> Result<(), HoprLibError> {
756        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
757        if let Some(processes) = self.processes.get() {
758            processes.abort_all();
759        }
760        self.state.store(HoprState::Terminated, Ordering::Relaxed);
761        info!("NODE SHUTDOWN COMPLETE");
762        Ok(())
763    }
764
765    /// Allows external users to receive notifications about new winning tickets.
766    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
767        self.winning_ticket_subscribers.1.activate_cloned()
768    }
769
770    // p2p transport =========
771    /// Own PeerId used in the libp2p transport layer
772    pub fn me_peer_id(&self) -> PeerId {
773        (*self.me.public()).into()
774    }
775
776    /// Get the list of all announced public nodes in the network
777    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
778        Ok(self
779            .chain_api
780            .stream_accounts(AccountSelector {
781                public_only: true,
782                ..Default::default()
783            })
784            .map_err(HoprLibError::chain)
785            .await?
786            .map(|entry| {
787                (
788                    PeerId::from(entry.public_key),
789                    entry.chain_addr,
790                    entry.get_multiaddrs().to_vec(),
791                )
792            })
793            .collect()
794            .await)
795    }
796
797    /// Ping another node in the network based on the PeerId
798    ///
799    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
800    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
801        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
802
803        Ok(self.transport_api.ping(peer).await?)
804    }
805
806    /// Create a client session connection returning a session object that implements
807    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
808    #[cfg(feature = "session-client")]
809    pub async fn connect_to(
810        &self,
811        destination: Address,
812        target: SessionTarget,
813        cfg: SessionClientConfig,
814    ) -> errors::Result<HoprSession> {
815        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
816
817        let backoff = backon::ConstantBuilder::default()
818            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
819            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
820            .with_jitter();
821
822        use backon::Retryable;
823
824        Ok((|| {
825            let cfg = cfg.clone();
826            let target = target.clone();
827            async { self.transport_api.new_session(destination, target, cfg).await }
828        })
829        .retry(backoff)
830        .sleep(backon::FuturesTimerSleeper)
831        .await?)
832    }
833
834    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
835    /// closed due to inactivity.
836    #[cfg(feature = "session-client")]
837    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
838        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
839        Ok(self.transport_api.probe_session(id).await?)
840    }
841
842    #[cfg(feature = "session-client")]
843    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
844        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
845        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
846    }
847
848    #[cfg(feature = "session-client")]
849    pub async fn update_session_surb_balancer_config(
850        &self,
851        id: &SessionId,
852        cfg: SurbBalancerConfig,
853    ) -> errors::Result<()> {
854        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
855        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
856    }
857
858    /// List all multiaddresses announced by this node
859    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
860        self.transport_api.local_multiaddresses()
861    }
862
863    /// List all multiaddresses on which the node is listening
864    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
865        self.transport_api.listening_multiaddresses().await
866    }
867
868    /// List all multiaddresses observed for a PeerId
869    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
870        self.transport_api.network_observed_multiaddresses(peer).await
871    }
872
873    /// List all multiaddresses announced on-chain for the given node.
874    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
875        let peer = *peer;
876        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
877        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
878
879        match self
880            .chain_api
881            .stream_accounts(AccountSelector {
882                public_only: false,
883                offchain_key: Some(pubkey),
884                ..Default::default()
885            })
886            .map_err(HoprLibError::chain)
887            .await?
888            .next()
889            .await
890        {
891            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
892            None => {
893                error!(%peer, "no information");
894                Ok(vec![])
895            }
896        }
897    }
898
899    // Network =========
900
901    /// Get measured network health
902    pub async fn network_health(&self) -> Health {
903        self.transport_api.network_health().await
904    }
905
906    /// List all peers connected to this
907    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
908        Ok(self.transport_api.network_connected_peers().await?)
909    }
910
911    /// Get all data collected from the network relevant for a PeerId
912    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
913        Ok(self.transport_api.network_peer_info(peer).await?)
914    }
915
916    /// Get peers connected peers with quality higher than some value
917    pub async fn all_network_peers(
918        &self,
919        minimum_quality: f64,
920    ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
921        Ok(
922            futures::stream::iter(self.transport_api.network_connected_peers().await?)
923                .filter_map(|peer| async move {
924                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
925                        if info.get_average_quality() >= minimum_quality {
926                            Some((peer, info))
927                        } else {
928                            None
929                        }
930                    } else {
931                        None
932                    }
933                })
934                .filter_map(|(peer_id, info)| async move {
935                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
936                    Some((address, peer_id, info))
937                })
938                .collect::<Vec<_>>()
939                .await,
940        )
941    }
942
943    // Ticket ========
944    /// Get all tickets in a channel specified by [`channel_id`](ChannelId).
945    pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
946        if let Some(channel) = self
947            .chain_api
948            .channel_by_id(channel_id)
949            .await
950            .map_err(|e| HoprTransportError::Other(e.into()))?
951        {
952            if &channel.destination == self.chain_api.me() {
953                Ok(Some(
954                    self.node_db
955                        .stream_tickets([&channel])
956                        .await
957                        .map_err(HoprLibError::db)?
958                        .collect()
959                        .await,
960                ))
961            } else {
962                Ok(None)
963            }
964        } else {
965            Ok(None)
966        }
967    }
968
969    /// Get all tickets
970    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
971        Ok(self
972            .node_db
973            .stream_tickets(None::<TicketSelector>)
974            .await
975            .map_err(HoprLibError::db)?
976            .map(|v| v.ticket)
977            .collect()
978            .await)
979    }
980
981    /// Get statistics for all tickets
982    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
983        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
984    }
985
986    /// Reset the ticket metrics to zero
987    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
988        self.node_db
989            .reset_ticket_statistics()
990            .await
991            .map_err(HoprLibError::chain)
992    }
993
994    // Chain =========
995    pub fn me_onchain(&self) -> Address {
996        *self.chain_api.me()
997    }
998
999    /// Get ticket price
1000    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1001        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1002    }
1003
1004    /// Get minimum incoming ticket winning probability
1005    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1006        self.chain_api
1007            .minimum_incoming_ticket_win_prob()
1008            .await
1009            .map_err(HoprLibError::chain)
1010    }
1011
1012    /// List of all accounts announced on the chain
1013    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1014        Ok(self
1015            .chain_api
1016            .stream_accounts(AccountSelector {
1017                public_only: true,
1018                ..Default::default()
1019            })
1020            .map_err(HoprLibError::chain)
1021            .await?
1022            .collect()
1023            .await)
1024    }
1025
1026    /// Get the channel entry from Hash.
1027    /// @returns the channel entry of those two nodes
1028    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1029        self.chain_api
1030            .channel_by_id(channel_id)
1031            .await
1032            .map_err(HoprLibError::chain)
1033    }
1034
1035    /// Get the channel entry between source and destination node.
1036    /// @param src Address
1037    /// @param dest Address
1038    /// @returns the channel entry of those two nodes
1039    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1040        self.chain_api
1041            .channel_by_parties(src, dest)
1042            .await
1043            .map_err(HoprLibError::chain)
1044    }
1045
1046    /// List all channels open from a specified Address
1047    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1048        Ok(self
1049            .chain_api
1050            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1051                ChannelStatusDiscriminants::Closed,
1052                ChannelStatusDiscriminants::Open,
1053                ChannelStatusDiscriminants::PendingToClose,
1054            ]))
1055            .map_err(HoprLibError::chain)
1056            .await?
1057            .collect()
1058            .await)
1059    }
1060
1061    /// List all channels open to a specified address
1062    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1063        Ok(self
1064            .chain_api
1065            .stream_channels(
1066                ChannelSelector::default()
1067                    .with_destination(*dest)
1068                    .with_allowed_states(&[
1069                        ChannelStatusDiscriminants::Closed,
1070                        ChannelStatusDiscriminants::Open,
1071                        ChannelStatusDiscriminants::PendingToClose,
1072                    ]),
1073            )
1074            .map_err(HoprLibError::chain)
1075            .await?
1076            .collect()
1077            .await)
1078    }
1079
1080    /// List all channels
1081    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1082        Ok(self
1083            .chain_api
1084            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1085                ChannelStatusDiscriminants::Closed,
1086                ChannelStatusDiscriminants::Open,
1087                ChannelStatusDiscriminants::PendingToClose,
1088            ]))
1089            .map_err(HoprLibError::chain)
1090            .await?
1091            .collect()
1092            .await)
1093    }
1094
1095    /// Current safe allowance balance
1096    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1097        self.chain_api
1098            .safe_allowance(self.cfg.safe_module.safe_address)
1099            .await
1100            .map_err(HoprLibError::chain)
1101    }
1102
1103    /// Withdraw on-chain assets to a given address
1104    /// @param recipient the account where the assets should be transferred to
1105    /// @param amount how many tokens to be transferred
1106    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1107        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1108
1109        self.chain_api
1110            .withdraw(amount, &recipient)
1111            .and_then(identity)
1112            .map_err(HoprLibError::chain)
1113            .await
1114    }
1115
1116    /// Withdraw on-chain native assets to a given address
1117    /// @param recipient the account where the assets should be transferred to
1118    /// @param amount how many tokens to be transferred
1119    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1120        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1121
1122        self.chain_api
1123            .withdraw(amount, &recipient)
1124            .and_then(identity)
1125            .map_err(HoprLibError::chain)
1126            .await
1127    }
1128
1129    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1130        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1131
1132        let (channel_id, tx_hash) = self
1133            .chain_api
1134            .open_channel(destination, amount)
1135            .and_then(identity)
1136            .map_err(HoprLibError::chain)
1137            .await?;
1138
1139        Ok(OpenChannelResult { tx_hash, channel_id })
1140    }
1141
1142    pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1143        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1144
1145        self.chain_api
1146            .fund_channel(channel_id, amount)
1147            .and_then(identity)
1148            .map_err(HoprLibError::chain)
1149            .await
1150    }
1151
1152    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1153        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1154
1155        let tx_hash = self
1156            .chain_api
1157            .close_channel(channel_id)
1158            .and_then(identity)
1159            .map_err(HoprLibError::chain)
1160            .await?;
1161
1162        Ok(CloseChannelResult { tx_hash })
1163    }
1164
1165    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1166        self.chain_api
1167            .channel_closure_notice_period()
1168            .await
1169            .map_err(HoprLibError::chain)
1170    }
1171
1172    pub fn redemption_requests(
1173        &self,
1174    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1175        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1176
1177        // TODO: add universal timeout sink here
1178        Ok(self
1179            .redeem_requests
1180            .get()
1181            .cloned()
1182            .expect("redeem_requests is not initialized")
1183            .sink_map_err(HoprLibError::other))
1184    }
1185
1186    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1187        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1188
1189        let min_value = min_value.into();
1190
1191        self.chain_api
1192            .stream_channels(
1193                ChannelSelector::default()
1194                    .with_destination(self.me_onchain())
1195                    .with_allowed_states(&[
1196                        ChannelStatusDiscriminants::Open,
1197                        ChannelStatusDiscriminants::PendingToClose,
1198                    ]),
1199            )
1200            .map_err(HoprLibError::chain)
1201            .await?
1202            .map(|channel| {
1203                Ok(TicketSelector::from(&channel)
1204                    .with_amount(min_value..)
1205                    .with_index_range(channel.ticket_index..)
1206                    .with_state(AcknowledgedTicketStatus::Untouched))
1207            })
1208            .forward(self.redemption_requests()?)
1209            .await?;
1210
1211        Ok(())
1212    }
1213
1214    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1215        &self,
1216        counterparty: &Address,
1217        min_value: B,
1218    ) -> errors::Result<()> {
1219        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1220            .await
1221    }
1222
1223    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1224        &self,
1225        channel_id: &Hash,
1226        min_value: B,
1227    ) -> errors::Result<()> {
1228        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1229
1230        let channel = self
1231            .chain_api
1232            .channel_by_id(channel_id)
1233            .await
1234            .map_err(HoprLibError::chain)?
1235            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1236
1237        self.redemption_requests()?
1238            .send(
1239                TicketSelector::from(channel)
1240                    .with_amount(min_value.into()..)
1241                    .with_index_range(channel.ticket_index..)
1242                    .with_state(AcknowledgedTicketStatus::Untouched),
1243            )
1244            .await?;
1245
1246        Ok(())
1247    }
1248
1249    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1250        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1251
1252        self.redemption_requests()?
1253            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1254            .await?;
1255
1256        Ok(())
1257    }
1258
1259    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1260        let peer_id = *peer_id;
1261        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1262        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1263            .await
1264            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1265
1266        self.chain_api
1267            .packet_key_to_chain_key(&pubkey)
1268            .await
1269            .map_err(HoprLibError::chain)
1270    }
1271
1272    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1273        self.chain_api
1274            .chain_key_to_packet_key(address)
1275            .await
1276            .map(|pk| pk.map(|v| v.into()))
1277            .map_err(HoprLibError::chain)
1278    }
1279}
1280
1281impl<Chain, Db> Hopr<Chain, Db> {
1282    // === telemetry
1283    /// Prometheus formatted metrics collected by the hopr-lib components.
1284    pub fn collect_hopr_metrics() -> errors::Result<String> {
1285        cfg_if::cfg_if! {
1286            if #[cfg(all(feature = "prometheus", not(test)))] {
1287                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1288            } else {
1289                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1290            }
1291        }
1292    }
1293}