Skip to main content

hopr_lib/
lib.rs

//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
//! as well as integrated into other systems and libraries.
//!
//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
//! it will perform its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities, and provides
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API, key management, etc.
//!
//! The intended way to use `hopr_lib` is to build a specific tool on top of it,
//! should the default `hoprd` implementation not be acceptable.
//!
//! For most practical use cases, the `hoprd` application should be the preferable
//! choice.
15/// Helper functions.
16mod helpers;
17
18/// Builder module for the [`Hopr`] object.
19pub mod builder;
20/// Configuration-related public types
21pub mod config;
22/// Various public constants.
23pub mod constants;
24/// Lists all errors thrown from this library.
25pub mod errors;
26// Re-export peer discovery types from hopr-api.
27pub use hopr_api::node::{AnnouncedPeer, AnnouncementOrigin};
28/// Utility module with helper types and functionality over hopr-lib behavior.
29pub mod utils;
30
31pub use hopr_api as api;
32
33/// Exports of libraries necessary for API and interface operations.
34#[doc(hidden)]
35pub mod exports {
36    pub mod types {
37        pub use hopr_api::types::{chain, internal, primitive};
38    }
39
40    pub mod crypto {
41        pub use hopr_api::types::crypto as types;
42        pub use hopr_crypto_keypair as keypair;
43    }
44
45    pub mod network {
46        pub use hopr_network_types as types;
47    }
48
49    pub use hopr_transport as transport;
50}
51
52/// Export of relevant types for easier integration.
53#[doc(hidden)]
54pub mod prelude {
55    #[cfg(feature = "runtime-tokio")]
56    pub use super::exports::network::types::{
57        prelude::ForeignDataMode,
58        udp::{ConnectedUdpStream, UdpStreamParallelism},
59    };
60    pub use super::exports::{
61        crypto::{
62            keypair::key_pair::HoprKeys,
63            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
64        },
65        transport::{OffchainPublicKey, socket::HoprSocket},
66        types::primitive::prelude::Address,
67    };
68}
69
70use std::{
71    sync::{Arc, atomic::Ordering},
72    time::Duration,
73};
74
75use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut};
76use futures_time::future::FutureExt as FuturesTimeFutureExt;
77#[cfg(feature = "session-client")]
78pub use hopr_api::node::HoprSessionClientOperations;
79use hopr_api::{
80    chain::*,
81    graph::HoprGraphApi,
82    node::{
83        AtomicHoprState, ComponentStatus, EitherErrExt, EventWaitResult, HasChainApi, HasGraphView, HasNetworkView,
84        HasTicketManagement, HasTransportApi, NodeOnchainIdentity,
85    },
86};
87pub use hopr_api::{
88    graph::EdgeLinkObservable,
89    network::NetworkStreamControl,
90    node::{
91        EitherErr, HoprNodeOperations, HoprState, IncentiveChannelOperations, IncentiveRedeemOperations,
92        TransportOperations,
93    },
94    tickets::{ChannelStats, RedemptionResult, TicketManagement, TicketManagementExt},
95    types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
96};
97use hopr_async_runtime::prelude::spawn;
98pub use hopr_async_runtime::{Abortable, AbortableList};
99pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
100pub use hopr_network_types::prelude::*;
101#[cfg(feature = "runtime-tokio")]
102pub use hopr_transport::transfer_session;
103pub use hopr_transport::*;
104use tracing::debug;
105
106pub use crate::{
107    config::SafeModule,
108    constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
109    errors::{HoprLibError, HoprStatusError},
110};
111
/// Hop-count based routing configuration used when opening sessions via `hopr-lib`.
///
/// Only the number of hops can be expressed here; this restriction is intentional.
/// The wrapped value is a `BoundedSize` limited by
/// `RoutingOptions::MAX_INTERMEDIATE_HOPS` and defaults to `BoundedSize::MIN`.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
124
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Upper bound of the configurable hop count; mirrors
    /// `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Returns the number of hops held by this routing configuration.
    pub fn hop_count(self) -> usize {
        let Self(hops) = self;
        hops.into()
    }
}
135
/// Fallible construction of a [`HopRouting`] from a plain hop count.
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    /// Converts `value` into the bounded hop count; a failed conversion
    /// is propagated as the error.
    fn try_from(value: usize) -> Result<Self, Self::Error> {
        let bounded = value.try_into()?;
        Ok(Self(bounded))
    }
}
144
/// Maps the hop-count routing onto the `Hops` variant of `RoutingOptions`.
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
151
/// Configuration of a client session opened through `hopr-lib`.
///
/// In contrast to the transport-level configuration, explicit intermediate
/// paths are deliberately not part of this API; routes are given by hop count only.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Routing used for the forward path.
    pub forward_path: HopRouting,
    /// Routing used for the return path.
    pub return_path: HopRouting,
    /// Capabilities offered by the session. Defaults to `SessionCapability::Segmentation`.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym used for the session, mostly useful for testing only.
    /// Defaults to `None`.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Automatic SURB management for the session. Defaults to
    /// `Some(SurbBalancerConfig::default())`, i.e. enabled.
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// If set, the maximum number of possible SURBs will always be sent with
    /// session data packets. Defaults to `false`.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
176
/// Translates the `hopr-lib` session configuration into the transport-level one.
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    fn from(value: HoprSessionClientConfig) -> Self {
        // Exhaustive destructuring: a newly added field cannot be silently forgotten here.
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
190
191/// Long-running tasks that are spawned by the HOPR node.
192#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
193pub(crate) enum HoprLibProcess {
194    #[strum(to_string = "transport: {0}")]
195    Transport(HoprTransportProcess),
196    #[strum(to_string = "session server providing the exit node session stream functionality")]
197    #[allow(dead_code)] // constructed only with feature = "session-server"
198    SessionServer,
199    #[strum(to_string = "subscription for on-chain channel updates")]
200    ChannelEvents,
201    #[strum(to_string = "on received ticket event (winning or rejected)")]
202    TicketEvents,
203    #[strum(to_string = "neglecting tickets on closed channels")]
204    ChannelClosureNeglect,
205}
206
/// Prepare an optimized version of the tokio runtime setup for hopr-lib specifically.
///
/// Besides building the runtime, this also initializes the CPU thread pool via
/// `hopr_parallelize::cpu::init_thread_pool`.
///
/// When a thread count is not given explicitly, half of the available parallelism
/// is used for it, since half of the available threads are to be used for IO-bound
/// and half for CPU-bound tasks. Either count is raised to at least 1.
///
/// # Arguments
///
/// * `num_cpu_threads` - number of threads of the CPU thread pool.
/// * `num_io_threads` - number of tokio worker threads.
///
/// # Environment
///
/// The stack size of the runtime threads is read from `HOPRD_THREAD_STACK_SIZE`
/// (in bytes). If the variable is unset or cannot be parsed, 10 MiB is used;
/// values below 2 MiB are raised to 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count was not given and the available parallelism
/// could not be determined, if the CPU thread pool initialization fails, or if
/// the runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Integer division: yields 0 on a single-core machine, hence the `.max(1)` below.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    let cpu_threads = num_cpu_threads
        .map(|v| v.get())
        .or(avail_parallelism)
        // The error is built lazily, only when no thread count is available.
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // The CPU pool is initialized before the IO thread count is resolved.
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(|v| v.get())
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
252
253/// Type alias used to send and receive transport data via a running HOPR node.
254pub type HoprTransportIO = socket::HoprSocket<
255    futures::channel::mpsc::Receiver<ApplicationDataIn>,
256    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
257>;
258
259type TicketEvents = (
260    async_broadcast::Sender<hopr_api::node::TicketEvent>,
261    async_broadcast::InactiveReceiver<hopr_api::node::TicketEvent>,
262);
263
/// How long to wait for the node's keybinding to appear on-chain (2 minutes).
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(2 * 60);
266
267/// HOPR main object providing the entire HOPR node functionality
268///
269/// Instantiating this object creates all processes and objects necessary for
270/// running the HOPR node. Once created, the node can be started using the
271/// `run()` method.
272///
273/// Externally offered API should be enough to perform all necessary tasks
274/// with the HOPR node manually, but it is advised to create such a configuration
275/// that manual interaction is unnecessary.
276///
277/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
278pub struct Hopr<Chain, Graph, Net, TMgr> {
279    pub(crate) transport_id: OffchainKeypair,
280    pub(crate) chain_id: NodeOnchainIdentity,
281    pub(crate) cfg: config::HoprLibConfig,
282    pub(crate) state: Arc<AtomicHoprState>,
283    pub(crate) transport_api: HoprTransport<Chain, Graph, Net>,
284    pub(crate) chain_api: Chain,
285    pub(crate) ticket_event_subscribers: TicketEvents,
286    pub(crate) ticket_manager: TMgr,
287    #[allow(dead_code)] // Handles must stay alive to keep background tasks running
288    pub(crate) processes: AbortableList<HoprLibProcess>,
289}
290
291impl<Chain, Graph, Net, TMgr> Hopr<Chain, Graph, Net, TMgr>
292where
293    Chain: HoprChainApi + Clone + Send + Sync + 'static,
294    Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
295    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
296        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
297    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
298    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
299{
300    pub fn config(&self) -> &config::HoprLibConfig {
301        &self.cfg
302    }
303
304    /// Returns a reference to the network graph.
305    pub fn graph(&self) -> &Graph {
306        self.transport_api.graph()
307    }
308
309    #[cfg(feature = "session-client")]
310    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
311        if HoprNodeOperations::status(self) == state {
312            Ok(())
313        } else {
314            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
315        }
316    }
317}
318
/// Session client operations, available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[async_trait::async_trait]
impl<Chain, Graph, Net, TMgr> hopr_api::node::HoprSessionClientOperations for Hopr<Chain, Graph, Net, TMgr>
where
    Chain: HoprChainApi + Clone + Send + Sync + 'static,
    Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: hopr_api::network::NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
    TMgr: Send + Sync + 'static,
{
    type Config = HoprSessionClientConfig;
    type Error = HoprLibError;
    type Session = HoprSession;
    type SessionConfigurator = HoprSessionConfigurator;
    type Target = SessionTarget;

    /// Opens a new session towards `destination` for the given `target`.
    ///
    /// The node must be in the `HoprState::Running` state, otherwise a status
    /// error is returned immediately. Session establishment is retried at most
    /// `establish_max_retries` times, waiting `establish_retry_timeout` (with
    /// jitter) between attempts; both values come from the session protocol
    /// configuration. The error of the last failed attempt is returned.
    async fn connect_to(
        &self,
        destination: Address,
        target: Self::Target,
        cfg: Self::Config,
    ) -> Result<(Self::Session, Self::SessionConfigurator), Self::Error> {
        use backon::Retryable;

        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

        let session_cfg = &self.cfg.protocol.session;
        let retry_policy = backon::ConstantBuilder::default()
            .with_max_times(session_cfg.establish_max_retries as usize)
            .with_delay(session_cfg.establish_retry_timeout)
            .with_jitter();

        // Each attempt needs its own copy of the configuration and the target,
        // because `new_session` takes both by value.
        let attempt = || {
            let attempt_cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
            let attempt_target = target.clone();
            async { self.transport_api.new_session(destination, attempt_target, attempt_cfg).await }
        };

        let session = attempt.retry(retry_policy).sleep(backon::FuturesTimerSleeper).await?;
        Ok(session)
    }
}
362
363// ---------------------------------------------------------------------------
364// Has* accessor trait implementations
365// ---------------------------------------------------------------------------
366
367/// Converts [`HoprState`] into a [`ComponentStatus`] for a named component.
368///
369/// Returns [`ComponentStatus::Ready`] when the node is [`HoprState::Running`],
370/// otherwise [`ComponentStatus::Initializing`] with the component name.
371fn component_status(state: HoprState, component: &str) -> ComponentStatus {
372    if state == HoprState::Running {
373        ComponentStatus::Ready
374    } else {
375        ComponentStatus::Initializing(format!("{component} not yet running"))
376    }
377}
378
379impl<Chain, Graph, Net, TMgr> HasChainApi for Hopr<Chain, Graph, Net, TMgr>
380where
381    Chain: HoprChainApi + Clone + Send + Sync + 'static,
382{
383    type ChainApi = Chain;
384    type ChainError = HoprLibError;
385
386    fn identity(&self) -> &NodeOnchainIdentity {
387        &self.chain_id
388    }
389
390    fn chain_api(&self) -> &Chain {
391        &self.chain_api
392    }
393
394    fn status(&self) -> ComponentStatus {
395        // Chain is considered ready once the node has passed the initial funding check
396        let state = HoprNodeOperations::status(self);
397        if state == HoprState::ValidatingNetworkConfig {
398            ComponentStatus::Ready
399        } else {
400            component_status(state, "chain")
401        }
402    }
403
404    fn wait_for_on_chain_event<F>(
405        &self,
406        predicate: F,
407        context: String,
408        timeout: Duration,
409    ) -> EventWaitResult<<Self::ChainApi as HoprChainApi>::ChainError, Self::ChainError>
410    where
411        F: Fn(&ChainEvent) -> bool + Send + Sync + 'static,
412    {
413        debug!(%context, "registering wait for on-chain event");
414        let (event_stream, handle) = futures::stream::abortable(
415            self.chain_api
416                .subscribe()?
417                .skip_while(move |event| futures::future::ready(!predicate(event))),
418        );
419
420        let ctx = context.clone();
421
422        Ok((
423            spawn(async move {
424                pin_mut!(event_stream);
425                let res = event_stream
426                    .next()
427                    .timeout(futures_time::time::Duration::from(timeout))
428                    .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")).into_right())
429                    .await?
430                    .ok_or(
431                        HoprLibError::GeneralError(format!("failed to yield an on-chain event for {ctx}")).into_right(),
432                    );
433                debug!(%ctx, ?res, "on-chain event waiting done");
434                res
435            })
436            .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")).into_right())
437            .and_then(futures::future::ready)
438            .boxed(),
439            handle,
440        ))
441    }
442}
443
444impl<Chain, Graph, Net, TMgr> HasNetworkView for Hopr<Chain, Graph, Net, TMgr>
445where
446    Chain: Send + Sync + 'static,
447    Graph: Send + Sync + 'static,
448    Net: hopr_api::network::NetworkView + Send + Sync + 'static,
449{
450    type NetworkView = HoprTransport<Chain, Graph, Net>;
451
452    fn network_view(&self) -> &Self::NetworkView {
453        &self.transport_api
454    }
455
456    fn status(&self) -> ComponentStatus {
457        component_status(HoprNodeOperations::status(self), "network")
458    }
459}
460
461impl<Chain, Graph, Net, TMgr> HasGraphView for Hopr<Chain, Graph, Net, TMgr>
462where
463    Chain: HoprChainApi + Clone + Send + Sync + 'static,
464    Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
465    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
466        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
467    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
468    Net: hopr_api::network::NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
469{
470    type Graph = Graph;
471
472    fn graph(&self) -> &Graph {
473        self.transport_api.graph()
474    }
475}
476
477impl<Chain, Graph, Net, TMgr> HasTransportApi for Hopr<Chain, Graph, Net, TMgr>
478where
479    Chain: HoprChainApi + Clone + Send + Sync + 'static,
480    Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
481    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
482        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
483    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
484    Net: hopr_api::network::NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
485    TMgr: Send + Sync + 'static,
486{
487    type Transport = HoprTransport<Chain, Graph, Net>;
488
489    fn transport(&self) -> &Self::Transport {
490        &self.transport_api
491    }
492
493    fn status(&self) -> ComponentStatus {
494        component_status(HoprNodeOperations::status(self), "transport")
495    }
496}
497
498// Available only on Relay nodes that specify `TMgr` that implements TicketManagement
499impl<Chain, Graph, Net, TMgr> HasTicketManagement for Hopr<Chain, Graph, Net, TMgr>
500where
501    Chain: HoprChainApi + Clone + Send + Sync + 'static,
502    TMgr: TicketManagement + Clone + Send + Sync + 'static,
503{
504    type TicketManager = TMgr;
505
506    fn ticket_management(&self) -> &TMgr {
507        &self.ticket_manager
508    }
509
510    fn subscribe_ticket_events(&self) -> impl Stream<Item = hopr_api::node::TicketEvent> + Send + 'static {
511        self.ticket_event_subscribers.1.activate_cloned()
512    }
513
514    fn status(&self) -> ComponentStatus {
515        ComponentStatus::Ready
516    }
517}
518
519impl<Chain, Graph, Net, TMgr> Hopr<Chain, Graph, Net, TMgr> {
520    /// Prometheus formatted metrics collected by the hopr-lib components.
521    pub fn collect_hopr_metrics() -> errors::Result<String> {
522        cfg_if::cfg_if! {
523            if #[cfg(all(feature = "telemetry", not(test)))] {
524                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
525            } else {
526                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
527            }
528        }
529    }
530}
531
532impl<Chain, Graph, Net, TMgr> HoprNodeOperations for Hopr<Chain, Graph, Net, TMgr> {
533    fn status(&self) -> HoprState {
534        self.state.load(Ordering::Relaxed)
535    }
536}
537
#[cfg(test)]
mod tests {
    use super::*;

    /// A running node maps to a ready component.
    #[test]
    fn component_status_returns_ready_when_running() {
        let actual = component_status(HoprState::Running, "test");
        assert_eq!(actual, ComponentStatus::Ready);
    }

    /// A node that is not running maps to an initializing component.
    #[test]
    fn component_status_returns_initializing_when_not_running() {
        let expected = ComponentStatus::Initializing("network not yet running".into());
        let actual = component_status(HoprState::Uninitialized, "network");
        assert_eq!(actual, expected);
    }

    /// The initializing message starts with the name of the component.
    #[test]
    fn component_status_includes_component_name_in_message() {
        let expected = ComponentStatus::Initializing("chain not yet running".into());
        let actual = component_status(HoprState::Uninitialized, "chain");
        assert_eq!(actual, expected);
    }
}
559
560/// Converts a PeerId to an OffchainPublicKey.
561///
562/// This is a standalone utility function, not part of the API traits.
563pub fn peer_id_to_offchain_key(peer_id: &PeerId) -> errors::Result<OffchainPublicKey> {
564    Ok(hopr_transport::peer_id_to_public_key(peer_id)?)
565}