hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it;
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Helper functions.
17mod helpers;
18
19/// Configuration-related public types
20pub mod config;
21/// Various public constants.
22pub mod constants;
23/// Lists all errors thrown from this library.
24pub mod errors;
25
26/// Utility module with helper types and functionality over hopr-lib behavior.
27pub mod utils;
28
29/// Public traits for interactions with this library.
30pub mod traits;
31
32/// Functionality related to the HOPR node state.
33pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38pub use hopr_api as api;
39
40/// Exports of libraries necessary for API and interface operations.
41#[doc(hidden)]
42pub mod exports {
43    pub mod types {
44        pub use hopr_internal_types as internal;
45        pub use hopr_primitive_types as primitive;
46    }
47
48    pub mod crypto {
49        pub use hopr_crypto_keypair as keypair;
50        pub use hopr_crypto_types as types;
51    }
52
53    pub mod network {
54        pub use hopr_network_types as types;
55    }
56
57    pub use hopr_transport as transport;
58}
59
60/// Export of relevant types for easier integration.
61#[doc(hidden)]
62pub mod prelude {
63    pub use super::exports::{
64        crypto::{
65            keypair::key_pair::HoprKeys,
66            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67        },
68        network::types::{
69            prelude::ForeignDataMode,
70            udp::{ConnectedUdpStream, UdpStreamParallelism},
71        },
72        transport::{OffchainPublicKey, socket::HoprSocket},
73        types::primitive::prelude::Address,
74    };
75}
76
77use std::{
78    convert::identity,
79    num::NonZeroUsize,
80    sync::{Arc, OnceLock, atomic::Ordering},
81    time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel};
85use hopr_api::{
86    chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
87    ct::TrafficGeneration,
88    db::{HoprNodeDbApi, TicketMarker, TicketSelector},
89};
90pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
91use hopr_async_runtime::prelude::spawn;
92pub use hopr_async_runtime::{Abortable, AbortableList};
93pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
94pub use hopr_crypto_types::prelude::*;
95pub use hopr_internal_types::prelude::*;
96pub use hopr_network_types::prelude::*;
97#[cfg(all(feature = "prometheus", not(test)))]
98use hopr_platform::time::native::current_time;
99pub use hopr_primitive_types::prelude::*;
100use hopr_transport::errors::HoprTransportError;
101#[cfg(feature = "runtime-tokio")]
102pub use hopr_transport::transfer_session;
103pub use hopr_transport::*;
104use tracing::{debug, error, info, warn};
105use validator::Validate;
106
107pub use crate::{
108    config::SafeModule,
109    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
110    errors::{HoprLibError, HoprStatusError},
111    state::{HoprLibProcess, HoprState},
112    traits::chain::{CloseChannelResult, OpenChannelResult},
113};
114
115#[cfg(all(feature = "prometheus", not(test)))]
116lazy_static::lazy_static! {
117    static ref METRIC_PROCESS_START_TIME:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
118        "hopr_start_time",
119        "The unix timestamp in seconds at which the process was started"
120    ).unwrap();
121    static ref METRIC_HOPR_LIB_VERSION:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
122        "hopr_lib_version",
123        "Executed version of hopr-lib",
124        &["version"]
125    ).unwrap();
126    static ref METRIC_HOPR_NODE_INFO:  hopr_metrics::MultiGauge =  hopr_metrics::MultiGauge::new(
127        "hopr_node_addresses",
128        "Node on-chain and off-chain addresses",
129        &["peerid", "address", "safe_address", "module_address"]
130    ).unwrap();
131}
132
133/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
134///
135/// Divide the available CPU parallelism by 2, since half of the available threads are
136/// to be used for IO-bound and half for CPU-bound tasks.
137#[cfg(feature = "runtime-tokio")]
138pub fn prepare_tokio_runtime(
139    num_cpu_threads: Option<NonZeroUsize>,
140    num_io_threads: Option<NonZeroUsize>,
141) -> anyhow::Result<tokio::runtime::Runtime> {
142    use std::str::FromStr;
143    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
144
145    hopr_parallelize::cpu::init_thread_pool(
146        num_cpu_threads
147            .map(|v| v.get())
148            .or(avail_parallelism)
149            .ok_or(anyhow::anyhow!(
150                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
151                 environment variable."
152            ))?
153            .max(1),
154    )?;
155
156    Ok(tokio::runtime::Builder::new_multi_thread()
157        .enable_all()
158        .worker_threads(
159            num_io_threads
160                .map(|v| v.get())
161                .or(avail_parallelism)
162                .ok_or(anyhow::anyhow!(
163                    "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
164                     environment variable."
165                ))?
166                .max(1),
167        )
168        .thread_name("hoprd")
169        .thread_stack_size(
170            std::env::var("HOPRD_THREAD_STACK_SIZE")
171                .ok()
172                .and_then(|v| usize::from_str(&v).ok())
173                .unwrap_or(10 * 1024 * 1024)
174                .max(2 * 1024 * 1024),
175        )
176        .build()?)
177}
178
179/// Type alias used to send and receive transport data via a running HOPR node.
180pub type HoprTransportIO = socket::HoprSocket<
181    futures::channel::mpsc::Receiver<ApplicationDataIn>,
182    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
183>;
184
185type NewTicketEvents = (
186    async_broadcast::Sender<VerifiedTicket>,
187    async_broadcast::InactiveReceiver<VerifiedTicket>,
188);
189
190/// HOPR main object providing the entire HOPR node functionality
191///
192/// Instantiating this object creates all processes and objects necessary for
193/// running the HOPR node. Once created, the node can be started using the
194/// `run()` method.
195///
196/// Externally offered API should be enough to perform all necessary tasks
197/// with the HOPR node manually, but it is advised to create such a configuration
198/// that manual interaction is unnecessary.
199///
200/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
201pub struct Hopr<Chain, Db> {
202    me: OffchainKeypair,
203    cfg: config::HoprLibConfig,
204    state: Arc<state::AtomicHoprState>,
205    transport_api: HoprTransport<Chain, Db>,
206    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
207    node_db: Db,
208    chain_api: Chain,
209    winning_ticket_subscribers: NewTicketEvents,
210    processes: OnceLock<AbortableList<HoprLibProcess>>,
211}
212
213impl<Chain, Db> Hopr<Chain, Db>
214where
215    Chain: HoprChainApi + Clone + Send + Sync + 'static,
216    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
217{
218    pub async fn new(
219        identity: (&ChainKeypair, &OffchainKeypair),
220        hopr_chain_api: Chain,
221        hopr_node_db: Db,
222        cfg: config::HoprLibConfig,
223    ) -> errors::Result<Self> {
224        if hopr_crypto_random::is_rng_fixed() {
225            warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
226        }
227
228        cfg.validate()?;
229
230        let hopr_transport_api = HoprTransport::new(
231            identity,
232            hopr_chain_api.clone(),
233            hopr_node_db.clone(),
234            vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
235            cfg.protocol,
236        );
237
238        #[cfg(all(feature = "prometheus", not(test)))]
239        {
240            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
241            METRIC_HOPR_LIB_VERSION.set(
242                &[const_format::formatcp!("{}", constants::APP_VERSION)],
243                const_format::formatcp!(
244                    "{}.{}",
245                    env!("CARGO_PKG_VERSION_MAJOR"),
246                    env!("CARGO_PKG_VERSION_MINOR")
247                )
248                .parse()
249                .unwrap_or(0.0),
250            );
251
252            // Calling get_ticket_statistics will initialize the respective metrics on tickets
253            if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
254                error!(%error, "failed to initialize ticket statistics metrics");
255            }
256        }
257
258        let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
259        new_tickets_tx.set_await_active(false);
260        new_tickets_tx.set_overflow(true);
261
262        Ok(Self {
263            me: identity.1.clone(),
264            cfg,
265            state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
266            transport_api: hopr_transport_api,
267            chain_api: hopr_chain_api,
268            node_db: hopr_node_db,
269            redeem_requests: OnceLock::new(),
270            processes: OnceLock::new(),
271            winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
272        })
273    }
274
275    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
276        if self.status() == state {
277            Ok(())
278        } else {
279            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
280        }
281    }
282
283    pub fn status(&self) -> HoprState {
284        self.state.load(Ordering::Relaxed)
285    }
286
287    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
288        self.chain_api
289            .balance(self.me_onchain())
290            .await
291            .map_err(HoprLibError::chain)
292    }
293
294    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
295        self.chain_api
296            .balance(self.cfg.safe_module.safe_address)
297            .await
298            .map_err(HoprLibError::chain)
299    }
300
301    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
302        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
303    }
304
305    pub fn get_safe_config(&self) -> SafeModule {
306        self.cfg.safe_module.clone()
307    }
308
309    pub fn config(&self) -> &config::HoprLibConfig {
310        &self.cfg
311    }
312
313    #[inline]
314    fn is_public(&self) -> bool {
315        self.cfg.publish
316    }
317
318    pub async fn run<
319        Ct,
320        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
321    >(
322        &self,
323        cover_traffic: Ct,
324        #[cfg(feature = "session-server")] serve_handler: T,
325    ) -> errors::Result<HoprTransportIO>
326    where
327        Ct: TrafficGeneration + Send + Sync + 'static,
328    {
329        self.error_if_not_in_state(
330            HoprState::Uninitialized,
331            "cannot start the hopr node multiple times".into(),
332        )?;
333
334        #[cfg(feature = "testing")]
335        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
336
337        info!(
338            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
339            "node is not started, please fund this node",
340        );
341
342        helpers::wait_for_funds(
343            *MIN_NATIVE_BALANCE,
344            *SUGGESTED_NATIVE_BALANCE,
345            Duration::from_secs(200),
346            self.me_onchain(),
347            &self.chain_api,
348        )
349        .await?;
350
351        let mut processes = AbortableList::<HoprLibProcess>::default();
352
353        info!("starting HOPR node...");
354        self.state.store(HoprState::Initializing, Ordering::Relaxed);
355
356        let balance: XDaiBalance = self.get_balance().await?;
357        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
358
359        info!(
360            address = %self.me_onchain(),
361            %balance,
362            %minimum_balance,
363            "node information"
364        );
365
366        if balance.le(&minimum_balance) {
367            return Err(HoprLibError::GeneralError(
368                "cannot start the node without a sufficiently funded wallet".into(),
369            ));
370        }
371
372        // Once we are able to query the chain,
373        // check if the ticket price is configured correctly.
374        let network_min_ticket_price = self
375            .chain_api
376            .minimum_ticket_price()
377            .await
378            .map_err(HoprLibError::chain)?;
379        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
380        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
381            return Err(HoprLibError::GeneralError(format!(
382                "configured outgoing ticket price is lower than the network minimum ticket price: \
383                 {configured_ticket_price:?} < {network_min_ticket_price}"
384            )));
385        }
386        // Once we are able to query the chain,
387        // check if the winning probability is configured correctly.
388        let network_min_win_prob = self
389            .chain_api
390            .minimum_incoming_ticket_win_prob()
391            .await
392            .map_err(HoprLibError::chain)?;
393        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
394        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
395            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
396        {
397            return Err(HoprLibError::GeneralError(format!(
398                "configured outgoing ticket winning probability is lower than the network minimum winning \
399                 probability: {configured_win_prob:?} < {network_min_win_prob}"
400            )));
401        }
402
403        // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
404        // plus 100 as an additional buffer
405        let minimum_capacity = self
406            .chain_api
407            .count_accounts(AccountSelector {
408                public_only: true,
409                ..Default::default()
410            })
411            .await
412            .map_err(HoprLibError::chain)?
413            .saturating_mul(2)
414            .saturating_add(100);
415
416        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
417            .ok()
418            .and_then(|s| s.trim().parse::<usize>().ok())
419            .filter(|&c| c > 0)
420            .unwrap_or(2048)
421            .max(minimum_capacity);
422
423        debug!(
424            capacity = chain_discovery_events_capacity,
425            minimum_required = minimum_capacity,
426            "creating chain discovery events channel"
427        );
428        let (indexer_peer_update_tx, indexer_peer_update_rx) =
429            channel::<PeerDiscovery>(chain_discovery_events_capacity);
430
431        // Stream all the existing announcements and also subscribe to all future on-chain
432        // announcements
433        let (announcements_stream, announcements_handle) = futures::stream::abortable(
434            self.chain_api
435                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
436                .map_err(HoprLibError::chain)?,
437        );
438        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
439
440        spawn(
441            announcements_stream
442                .filter_map(|event| {
443                    futures::future::ready(event.try_as_announcement().map(|account| {
444                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
445                    }))
446                })
447                .map(Ok)
448                .forward(indexer_peer_update_tx)
449                .inspect(
450                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
451                ),
452        );
453
454        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
455
456        let safe_addr = self.cfg.safe_module.safe_address;
457
458        if self.me_onchain() == safe_addr {
459            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
460        }
461
462        info!(%safe_addr, "registering safe with this node");
463        match self.chain_api.register_safe(&safe_addr).await {
464            Ok(awaiter) => {
465                // Wait until the registration is confirmed on-chain, otherwise we cannot proceed.
466                awaiter.await.map_err(|error| {
467                    error!(%safe_addr, %error, "safe registration failed with error");
468                    HoprLibError::chain(error)
469                })?;
470                info!(%safe_addr, "safe successfully registered with this node");
471            }
472            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
473                info!(%safe_addr, "this safe is already registered with this node");
474            }
475            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
476                // TODO: support safe deregistration flow
477                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
478                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
479            }
480            Err(error) => {
481                error!(%safe_addr, %error, "safe registration failed");
482                return Err(HoprLibError::chain(error));
483            }
484        }
485
486        // Only public nodes announce multiaddresses
487        let multiaddresses_to_announce = if self.is_public() {
488            // The multiaddresses are filtered for the non-private ones,
489            // unless `announce_local_addresses` is set to `true`.
490            self.transport_api.announceable_multiaddresses()
491        } else {
492            Vec::with_capacity(0)
493        };
494
495        // Warn when announcing a private multiaddress, which is acceptable in certain scenarios
496        multiaddresses_to_announce
497            .iter()
498            .filter(|a| !is_public_address(a))
499            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
500
501        // At this point the node is already registered with Safe, so
502        // we can announce via Safe-compliant TX
503        info!(?multiaddresses_to_announce, "announcing node on chain");
504        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
505            Ok(awaiter) => {
506                // Wait until the announcement is confirmed on-chain, otherwise we cannot proceed.
507                awaiter.await.map_err(|error| {
508                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
509                    HoprLibError::chain(error)
510                })?;
511                info!(?multiaddresses_to_announce, "node has been successfully announced");
512            }
513            Err(AnnouncementError::AlreadyAnnounced) => {
514                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
515            }
516            Err(error) => {
517                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
518                return Err(HoprLibError::chain(error));
519            }
520        }
521
522        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
523            .ok()
524            .and_then(|s| s.trim().parse::<usize>().ok())
525            .filter(|&c| c > 0)
526            .unwrap_or(256);
527
528        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
529        #[cfg(feature = "session-server")]
530        {
531            debug!(capacity = incoming_session_channel_capacity, "creating session server");
532            processes.insert(
533                HoprLibProcess::SessionServer,
534                hopr_async_runtime::spawn_as_abortable!(
535                    _session_rx
536                        .for_each_concurrent(None, move |session| {
537                            let serve_handler = serve_handler.clone();
538                            async move {
539                                let session_id = *session.session.id();
540                                match serve_handler.process(session).await {
541                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
542                                    Err(error) => error!(
543                                        ?session_id,
544                                        %error,
545                                        "client session processing failed"
546                                    ),
547                                }
548                            }
549                        })
550                        .inspect(|_| tracing::warn!(
551                            task = %HoprLibProcess::SessionServer,
552                            "long-running background task finished"
553                        ))
554                ),
555            );
556        }
557
558        info!("starting ticket events processor");
559        let (tickets_tx, tickets_rx) = channel(1024);
560        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
561        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
562        let node_db = self.node_db.clone();
563        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
564        spawn(
565            tickets_rx
566                .filter_map(move |ticket_event| {
567                    let node_db = node_db.clone();
568                    async move {
569                        match ticket_event {
570                            TicketEvent::WinningTicket(winning) => {
571                                if let Err(error) = node_db.insert_ticket(*winning).await {
572                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
573                                } else {
574                                    tracing::debug!(%winning, "inserted ticket into database");
575                                }
576                                Some(winning)
577                            }
578                            TicketEvent::RejectedTicket(rejected, issuer) => {
579                                if let Some(issuer) = &issuer {
580                                    if let Err(error) =
581                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
582                                    {
583                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
584                                    } else {
585                                        tracing::debug!(%rejected, "marked ticket as rejected");
586                                    }
587                                } else {
588                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
589                                }
590                                None
591                            }
592                        }
593                    }
594                })
595                .for_each(move |ticket| {
596                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
597                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
598                    }
599                    futures::future::ready(())
600                })
601                .inspect(|_| {
602                    tracing::warn!(
603                        task = %HoprLibProcess::TicketEvents,
604                        "long-running background task finished"
605                    )
606                }),
607        );
608
609        info!("starting transport");
610        let (hopr_socket, transport_processes) = self
611            .transport_api
612            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
613            .await?;
614        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
615
616        // Start a queue that takes care of redeeming tickets via given TicketSelectors
617        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
618        let _ = self.redeem_requests.set(redemption_req_tx);
619        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
620        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
621        let chain = self.chain_api.clone();
622        let node_db = self.node_db.clone();
623        spawn(redemption_req_rx
624            .for_each(move |selector| {
625                let chain = chain.clone();
626                let db = node_db.clone();
627                async move {
628                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
629                        Ok(res) => debug!(%res, "redemption complete"),
630                        Err(error) => error!(%error, "redemption failed"),
631                    }
632                }
633            })
634            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
635        );
636
637        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
638        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
639        let chain = self.chain_api.clone();
640        let node_db = self.node_db.clone();
641        let events = chain.subscribe().map_err(HoprLibError::chain)?;
642        spawn(
643            futures::stream::Abortable::new(
644                events
645                    .filter_map(move |event|
646                        futures::future::ready(event.try_as_channel_closed())
647                    ),
648                chain_events_sub_reg
649            )
650            .for_each(move |closed_channel| {
651                let node_db = node_db.clone();
652                let chain = chain.clone();
653                async move {
654                    match closed_channel.direction(chain.me()) {
655                        Some(ChannelDirection::Incoming) => {
656                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
657                                Ok(num_neglected) if num_neglected > 0 => {
658                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
659                                },
660                                Ok(_) => {
661                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
662                                },
663                                Err(error) => {
664                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
665                                }
666                            }
667                        },
668                        Some(ChannelDirection::Outgoing) => {
669                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
670                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
671                            } else {
672                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
673                            }
674                        }
675                        _ => {} // Event for a channel that is not our own
676                    }
677                }
678            })
679            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
680        );
681
682        // NOTE: after the chain is synced, we can reset tickets which are considered
683        // redeemed but on-chain state does not align with that. This implies there was a problem
684        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
685        // handle errors appropriately.
686        let mut channels = self
687            .chain_api
688            .stream_channels(ChannelSelector {
689                destination: self.me_onchain().into(),
690                ..Default::default()
691            })
692            .map_err(HoprLibError::chain)
693            .await?;
694
695        while let Some(channel) = channels.next().await {
696            self.node_db
697                .update_ticket_states_and_fetch(
698                    [TicketSelector::from(&channel)
699                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
700                        .with_index_range(channel.ticket_index..)],
701                    AcknowledgedTicketStatus::Untouched,
702                )
703                .map_err(HoprLibError::db)
704                .await?
705                .for_each(|ticket| {
706                    info!(%ticket, "fixed next out-of-sync ticket");
707                    futures::future::ready(())
708                })
709                .await;
710        }
711
712        self.state.store(HoprState::Running, Ordering::Relaxed);
713
714        info!(
715            id = %self.me_peer_id(),
716            version = constants::APP_VERSION,
717            "NODE STARTED AND RUNNING"
718        );
719
720        #[cfg(all(feature = "prometheus", not(test)))]
721        METRIC_HOPR_NODE_INFO.set(
722            &[
723                &self.me.public().to_peerid_str(),
724                &self.me_onchain().to_string(),
725                &self.cfg.safe_module.safe_address.to_string(),
726                &self.cfg.safe_module.module_address.to_string(),
727            ],
728            1.0,
729        );
730
731        let _ = self.processes.set(processes);
732        Ok(hopr_socket)
733    }
734
735    /// Used to practically shut down all node's processes without dropping the instance.
736    ///
737    /// This means that the instance can be used to retrieve some information, but all
738    /// active operations will stop and new will be impossible to perform.
739    /// Such operations will return [`HoprStatusError::NotThereYet`].
740    ///
741    /// This is the final state and cannot be reversed by calling [`HoprLib::run`] again.
742    pub fn shutdown(&self) -> Result<(), HoprLibError> {
743        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
744        if let Some(processes) = self.processes.get() {
745            processes.abort_all();
746        }
747        self.state.store(HoprState::Terminated, Ordering::Relaxed);
748        info!("NODE SHUTDOWN COMPLETE");
749        Ok(())
750    }
751
752    /// Allows external users to receive notifications about new winning tickets.
753    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
754        self.winning_ticket_subscribers.1.activate_cloned()
755    }
756
757    // p2p transport =========
758    /// Own PeerId used in the libp2p transport layer
759    pub fn me_peer_id(&self) -> PeerId {
760        (*self.me.public()).into()
761    }
762
763    /// Get the list of all announced public nodes in the network
764    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
765        Ok(self
766            .chain_api
767            .stream_accounts(AccountSelector {
768                public_only: true,
769                ..Default::default()
770            })
771            .map_err(HoprLibError::chain)
772            .await?
773            .map(|entry| {
774                (
775                    PeerId::from(entry.public_key),
776                    entry.chain_addr,
777                    entry.get_multiaddrs().to_vec(),
778                )
779            })
780            .collect()
781            .await)
782    }
783
784    /// Ping another node in the network based on the PeerId
785    ///
786    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
787    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
788        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
789
790        Ok(self.transport_api.ping(peer).await?)
791    }
792
793    /// Create a client session connection returning a session object that implements
794    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
795    #[cfg(feature = "session-client")]
796    pub async fn connect_to(
797        &self,
798        destination: Address,
799        target: SessionTarget,
800        cfg: SessionClientConfig,
801    ) -> errors::Result<HoprSession> {
802        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
803
804        let backoff = backon::ConstantBuilder::default()
805            .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
806            .with_delay(self.cfg.protocol.session.establish_retry_timeout)
807            .with_jitter();
808
809        use backon::Retryable;
810
811        Ok((|| {
812            let cfg = cfg.clone();
813            let target = target.clone();
814            async { self.transport_api.new_session(destination, target, cfg).await }
815        })
816        .retry(backoff)
817        .sleep(backon::FuturesTimerSleeper)
818        .await?)
819    }
820
821    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
822    /// closed due to inactivity.
823    #[cfg(feature = "session-client")]
824    pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
825        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
826        Ok(self.transport_api.probe_session(id).await?)
827    }
828
829    #[cfg(feature = "session-client")]
830    pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
831        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
832        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
833    }
834
835    #[cfg(feature = "session-client")]
836    pub async fn update_session_surb_balancer_config(
837        &self,
838        id: &SessionId,
839        cfg: SurbBalancerConfig,
840    ) -> errors::Result<()> {
841        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
842        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
843    }
844
845    /// List all multiaddresses announced by this node
846    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
847        self.transport_api.local_multiaddresses()
848    }
849
850    /// List all multiaddresses on which the node is listening
851    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
852        self.transport_api.listening_multiaddresses().await
853    }
854
855    /// List all multiaddresses observed for a PeerId
856    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
857        self.transport_api.network_observed_multiaddresses(peer).await
858    }
859
860    /// List all multiaddresses announced on-chain for the given node.
861    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
862        let peer = *peer;
863        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
864        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
865
866        match self
867            .chain_api
868            .stream_accounts(AccountSelector {
869                public_only: false,
870                offchain_key: Some(pubkey),
871                ..Default::default()
872            })
873            .map_err(HoprLibError::chain)
874            .await?
875            .next()
876            .await
877        {
878            Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
879            None => {
880                error!(%peer, "no information");
881                Ok(vec![])
882            }
883        }
884    }
885
886    // Network =========
887
888    /// Get measured network health
889    pub async fn network_health(&self) -> Health {
890        self.transport_api.network_health().await
891    }
892
893    /// List all peers connected to this
894    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
895        Ok(self.transport_api.network_connected_peers().await?)
896    }
897
898    /// Get all data collected from the network relevant for a PeerId
899    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
900        Ok(self.transport_api.network_peer_observations(peer).await?)
901    }
902
903    /// Get peers connected peers with quality higher than some value
904    pub async fn all_network_peers(
905        &self,
906        minimum_score: f64,
907    ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
908        Ok(
909            futures::stream::iter(self.transport_api.network_connected_peers().await?)
910                .filter_map(|peer| async move {
911                    if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
912                        if info.score() >= minimum_score {
913                            Some((peer, info))
914                        } else {
915                            None
916                        }
917                    } else {
918                        None
919                    }
920                })
921                .filter_map(|(peer_id, info)| async move {
922                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
923                    Some((address, peer_id, info))
924                })
925                .collect::<Vec<_>>()
926                .await,
927        )
928    }
929
930    // Ticket ========
931    /// Get all tickets in a channel specified by [`channel_id`](ChannelId).
932    pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
933        if let Some(channel) = self
934            .chain_api
935            .channel_by_id(channel_id)
936            .await
937            .map_err(|e| HoprTransportError::Other(e.into()))?
938        {
939            if &channel.destination == self.chain_api.me() {
940                Ok(Some(
941                    self.node_db
942                        .stream_tickets([&channel])
943                        .await
944                        .map_err(HoprLibError::db)?
945                        .collect()
946                        .await,
947                ))
948            } else {
949                Ok(None)
950            }
951        } else {
952            Ok(None)
953        }
954    }
955
956    /// Get all tickets
957    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
958        Ok(self
959            .node_db
960            .stream_tickets(None::<TicketSelector>)
961            .await
962            .map_err(HoprLibError::db)?
963            .map(|v| v.ticket)
964            .collect()
965            .await)
966    }
967
968    /// Get statistics for all tickets
969    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
970        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
971    }
972
973    /// Reset the ticket metrics to zero
974    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
975        self.node_db
976            .reset_ticket_statistics()
977            .await
978            .map_err(HoprLibError::chain)
979    }
980
981    // Chain =========
982    pub fn me_onchain(&self) -> Address {
983        *self.chain_api.me()
984    }
985
986    /// Get ticket price
987    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
988        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
989    }
990
991    /// Get minimum incoming ticket winning probability
992    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
993        self.chain_api
994            .minimum_incoming_ticket_win_prob()
995            .await
996            .map_err(HoprLibError::chain)
997    }
998
999    /// List of all accounts announced on the chain
1000    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1001        Ok(self
1002            .chain_api
1003            .stream_accounts(AccountSelector {
1004                public_only: true,
1005                ..Default::default()
1006            })
1007            .map_err(HoprLibError::chain)
1008            .await?
1009            .collect()
1010            .await)
1011    }
1012
1013    /// Get the channel entry from Hash.
1014    /// @returns the channel entry of those two nodes
1015    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1016        self.chain_api
1017            .channel_by_id(channel_id)
1018            .await
1019            .map_err(HoprLibError::chain)
1020    }
1021
1022    /// Get the channel entry between source and destination node.
1023    /// @param src Address
1024    /// @param dest Address
1025    /// @returns the channel entry of those two nodes
1026    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1027        self.chain_api
1028            .channel_by_parties(src, dest)
1029            .await
1030            .map_err(HoprLibError::chain)
1031    }
1032
1033    /// List all channels open from a specified Address
1034    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1035        Ok(self
1036            .chain_api
1037            .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1038                ChannelStatusDiscriminants::Closed,
1039                ChannelStatusDiscriminants::Open,
1040                ChannelStatusDiscriminants::PendingToClose,
1041            ]))
1042            .map_err(HoprLibError::chain)
1043            .await?
1044            .collect()
1045            .await)
1046    }
1047
1048    /// List all channels open to a specified address
1049    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1050        Ok(self
1051            .chain_api
1052            .stream_channels(
1053                ChannelSelector::default()
1054                    .with_destination(*dest)
1055                    .with_allowed_states(&[
1056                        ChannelStatusDiscriminants::Closed,
1057                        ChannelStatusDiscriminants::Open,
1058                        ChannelStatusDiscriminants::PendingToClose,
1059                    ]),
1060            )
1061            .map_err(HoprLibError::chain)
1062            .await?
1063            .collect()
1064            .await)
1065    }
1066
1067    /// List all channels
1068    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1069        Ok(self
1070            .chain_api
1071            .stream_channels(ChannelSelector::default().with_allowed_states(&[
1072                ChannelStatusDiscriminants::Closed,
1073                ChannelStatusDiscriminants::Open,
1074                ChannelStatusDiscriminants::PendingToClose,
1075            ]))
1076            .map_err(HoprLibError::chain)
1077            .await?
1078            .collect()
1079            .await)
1080    }
1081
1082    /// Current safe allowance balance
1083    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1084        self.chain_api
1085            .safe_allowance(self.cfg.safe_module.safe_address)
1086            .await
1087            .map_err(HoprLibError::chain)
1088    }
1089
1090    /// Withdraw on-chain assets to a given address
1091    /// @param recipient the account where the assets should be transferred to
1092    /// @param amount how many tokens to be transferred
1093    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1094        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1095
1096        self.chain_api
1097            .withdraw(amount, &recipient)
1098            .and_then(identity)
1099            .map_err(HoprLibError::chain)
1100            .await
1101    }
1102
1103    /// Withdraw on-chain native assets to a given address
1104    /// @param recipient the account where the assets should be transferred to
1105    /// @param amount how many tokens to be transferred
1106    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1107        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1108
1109        self.chain_api
1110            .withdraw(amount, &recipient)
1111            .and_then(identity)
1112            .map_err(HoprLibError::chain)
1113            .await
1114    }
1115
1116    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1117        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1118
1119        let (channel_id, tx_hash) = self
1120            .chain_api
1121            .open_channel(destination, amount)
1122            .and_then(identity)
1123            .map_err(HoprLibError::chain)
1124            .await?;
1125
1126        Ok(OpenChannelResult { tx_hash, channel_id })
1127    }
1128
1129    pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1130        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1131
1132        self.chain_api
1133            .fund_channel(channel_id, amount)
1134            .and_then(identity)
1135            .map_err(HoprLibError::chain)
1136            .await
1137    }
1138
1139    pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1140        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1141
1142        let tx_hash = self
1143            .chain_api
1144            .close_channel(channel_id)
1145            .and_then(identity)
1146            .map_err(HoprLibError::chain)
1147            .await?;
1148
1149        Ok(CloseChannelResult { tx_hash })
1150    }
1151
1152    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1153        self.chain_api
1154            .channel_closure_notice_period()
1155            .await
1156            .map_err(HoprLibError::chain)
1157    }
1158
1159    pub fn redemption_requests(
1160        &self,
1161    ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1162        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1163
1164        // TODO: add universal timeout sink here
1165        Ok(self
1166            .redeem_requests
1167            .get()
1168            .cloned()
1169            .expect("redeem_requests is not initialized")
1170            .sink_map_err(HoprLibError::other))
1171    }
1172
1173    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1174        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1175
1176        let min_value = min_value.into();
1177
1178        self.chain_api
1179            .stream_channels(
1180                ChannelSelector::default()
1181                    .with_destination(self.me_onchain())
1182                    .with_allowed_states(&[
1183                        ChannelStatusDiscriminants::Open,
1184                        ChannelStatusDiscriminants::PendingToClose,
1185                    ]),
1186            )
1187            .map_err(HoprLibError::chain)
1188            .await?
1189            .map(|channel| {
1190                Ok(TicketSelector::from(&channel)
1191                    .with_amount(min_value..)
1192                    .with_index_range(channel.ticket_index..)
1193                    .with_state(AcknowledgedTicketStatus::Untouched))
1194            })
1195            .forward(self.redemption_requests()?)
1196            .await?;
1197
1198        Ok(())
1199    }
1200
1201    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1202        &self,
1203        counterparty: &Address,
1204        min_value: B,
1205    ) -> errors::Result<()> {
1206        self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1207            .await
1208    }
1209
1210    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1211        &self,
1212        channel_id: &Hash,
1213        min_value: B,
1214    ) -> errors::Result<()> {
1215        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1216
1217        let channel = self
1218            .chain_api
1219            .channel_by_id(channel_id)
1220            .await
1221            .map_err(HoprLibError::chain)?
1222            .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1223
1224        self.redemption_requests()?
1225            .send(
1226                TicketSelector::from(channel)
1227                    .with_amount(min_value.into()..)
1228                    .with_index_range(channel.ticket_index..)
1229                    .with_state(AcknowledgedTicketStatus::Untouched),
1230            )
1231            .await?;
1232
1233        Ok(())
1234    }
1235
1236    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1237        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1238
1239        self.redemption_requests()?
1240            .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1241            .await?;
1242
1243        Ok(())
1244    }
1245
1246    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1247        let peer_id = *peer_id;
1248        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1249        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1250            .await
1251            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1252
1253        self.chain_api
1254            .packet_key_to_chain_key(&pubkey)
1255            .await
1256            .map_err(HoprLibError::chain)
1257    }
1258
1259    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1260        self.chain_api
1261            .chain_key_to_packet_key(address)
1262            .await
1263            .map(|pk| pk.map(|v| v.into()))
1264            .map_err(HoprLibError::chain)
1265    }
1266}
1267
1268impl<Chain, Db> Hopr<Chain, Db> {
1269    // === telemetry
1270    /// Prometheus formatted metrics collected by the hopr-lib components.
1271    pub fn collect_hopr_metrics() -> errors::Result<String> {
1272        cfg_if::cfg_if! {
1273            if #[cfg(all(feature = "prometheus", not(test)))] {
1274                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1275            } else {
1276                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1277            }
1278        }
1279    }
1280}