1mod helpers;
17
18pub mod builder;
20pub mod config;
22pub mod constants;
24pub mod errors;
26pub use hopr_api::node::{AnnouncedPeer, AnnouncementOrigin};
28pub mod utils;
30
31pub use hopr_api as api;
32
33#[doc(hidden)]
35pub mod exports {
36 pub mod types {
37 pub use hopr_api::types::{chain, internal, primitive};
38 }
39
40 pub mod crypto {
41 pub use hopr_api::types::crypto as types;
42 pub use hopr_crypto_keypair as keypair;
43 }
44
45 pub mod network {
46 pub use hopr_network_types as types;
47 }
48
49 pub use hopr_transport as transport;
50}
51
52#[doc(hidden)]
54pub mod prelude {
55 #[cfg(feature = "runtime-tokio")]
56 pub use super::exports::network::types::{
57 prelude::ForeignDataMode,
58 udp::{ConnectedUdpStream, UdpStreamParallelism},
59 };
60 pub use super::exports::{
61 crypto::{
62 keypair::key_pair::HoprKeys,
63 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
64 },
65 transport::{OffchainPublicKey, socket::HoprSocket},
66 types::primitive::prelude::Address,
67 };
68}
69
70use std::{
71 sync::{Arc, atomic::Ordering},
72 time::Duration,
73};
74
75use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut};
76use futures_time::future::FutureExt as FuturesTimeFutureExt;
77#[cfg(feature = "session-client")]
78pub use hopr_api::node::HoprSessionClientOperations;
79use hopr_api::{
80 chain::*,
81 graph::HoprGraphApi,
82 node::{
83 AtomicHoprState, ComponentStatus, EitherErrExt, EventWaitResult, HasChainApi, HasGraphView, HasNetworkView,
84 HasTicketManagement, HasTransportApi, NodeOnchainIdentity,
85 },
86};
87pub use hopr_api::{
88 graph::EdgeLinkObservable,
89 network::NetworkStreamControl,
90 node::{
91 EitherErr, HoprNodeOperations, HoprState, IncentiveChannelOperations, IncentiveRedeemOperations,
92 TransportOperations,
93 },
94 tickets::{ChannelStats, RedemptionResult, TicketManagement, TicketManagementExt},
95 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
96};
97use hopr_async_runtime::prelude::spawn;
98pub use hopr_async_runtime::{Abortable, AbortableList};
99pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
100pub use hopr_network_types::prelude::*;
101#[cfg(feature = "runtime-tokio")]
102pub use hopr_transport::transfer_session;
103pub use hopr_transport::*;
104use tracing::debug;
105
106pub use crate::{
107 config::SafeModule,
108 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
109 errors::{HoprLibError, HoprStatusError},
110};
111
/// Hop-count based routing choice for a session path.
///
/// Wraps a `BoundedSize` whose upper bound is
/// `RoutingOptions::MAX_INTERMEDIATE_HOPS`; the default value is `BoundedSize::MIN`.
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
124
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Upper bound of the wrapped hop count (same value as
    /// `RoutingOptions::MAX_INTERMEDIATE_HOPS`).
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Returns the wrapped hop count as a plain `usize`.
    pub fn hop_count(self) -> usize {
        let Self(bounded) = self;
        bounded.into()
    }
}
135
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    /// Builds a [`HopRouting`] from a raw hop count.
    ///
    /// # Errors
    ///
    /// Fails when `value` cannot be converted into the underlying bounded size.
    fn try_from(value: usize) -> Result<Self, Self::Error> {
        let bounded = value.try_into()?;
        Ok(Self(bounded))
    }
}
144
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    /// Maps the hop count onto the `Hops` routing variant.
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
151
/// Configuration of a session opened by the session client.
///
/// Convertible into `hopr_transport::SessionClientConfig`. Only available with the
/// `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Hop routing used for the forward path.
    pub forward_path: HopRouting,
    /// Hop routing used for the return path.
    pub return_path: HopRouting,
    /// Session capabilities; defaults to `SessionCapability::Segmentation` only.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym for the session; defaults to `None`.
    // NOTE(review): presumably a pseudonym is chosen elsewhere when this is `None` — confirm in the transport.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Optional SURB balancer configuration; defaults to `Some(SurbBalancerConfig::default())`.
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// Passed through unchanged to the transport session configuration; defaults to `false`.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
176
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    /// Converts the library-level session configuration into the transport-level one.
    ///
    /// Both hop routings are converted into path options; all other fields are moved
    /// over unchanged.
    fn from(value: HoprSessionClientConfig) -> Self {
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
190
/// Identifiers of the long-running processes tracked by this crate.
///
/// The `strum::Display` strings give a human-readable description of each process.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
pub(crate) enum HoprLibProcess {
    /// A process owned by the transport layer; displayed with the inner process description.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Session server of an exit node.
    // NOTE(review): marked `dead_code`, presumably because it is not constructed in every build — confirm.
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    #[allow(dead_code)] SessionServer,
    /// Subscription for on-chain channel updates.
    #[strum(to_string = "subscription for on-chain channel updates")]
    ChannelEvents,
    /// Handling of received ticket events (winning or rejected).
    #[strum(to_string = "on received ticket event (winning or rejected)")]
    TicketEvents,
    /// Neglecting of tickets on closed channels.
    #[strum(to_string = "neglecting tickets on closed channels")]
    ChannelClosureNeglect,
}
206
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// Both thread counts fall back to half of the available parallelism when not given
/// explicitly, and are always at least 1.
///
/// The worker thread stack size is read from the `HOPRD_THREAD_STACK_SIZE` environment
/// variable (in bytes). It defaults to 10 MiB when unset or unparsable and is never
/// lower than 2 MiB.
///
/// # Errors
///
/// Returns an error if a thread count is neither given nor derivable from the available
/// parallelism, if the CPU thread pool cannot be initialized, or if the runtime fails
/// to build.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    hopr_parallelize::cpu::init_thread_pool(
        num_cpu_threads
            .map(|v| v.get())
            .or(avail_parallelism)
            // Build the error lazily: it is only needed when no thread count is available.
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                     environment variable."
                )
            })?
            .max(1),
    )?;

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(
            num_io_threads
                .map(|v| v.get())
                .or(avail_parallelism)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                         environment variable."
                    )
                })?
                .max(1),
        )
        .thread_name("hoprd")
        .thread_stack_size(
            std::env::var("HOPRD_THREAD_STACK_SIZE")
                .ok()
                .and_then(|v| v.parse::<usize>().ok())
                .unwrap_or(10 * 1024 * 1024)
                .max(2 * 1024 * 1024),
        )
        .build()?)
}
252
/// `HoprSocket` specialised with a `futures` mpsc `Receiver` of [`ApplicationDataIn`] and
/// an mpsc `Sender` of `(DestinationRouting, ApplicationDataOut)` pairs.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;

/// Sender and inactive receiver of the ticket event broadcast channel.
type TicketEvents = (
    async_broadcast::Sender<hopr_api::node::TicketEvent>,
    async_broadcast::InactiveReceiver<hopr_api::node::TicketEvent>,
);

// NOTE(review): presumably the maximum time to wait for the node to become ready (120 s);
// not referenced in the visible part of this file — confirm at the use site.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
266
/// The HOPR node: bundles the node identities, configuration, state and the
/// chain, transport and ticket management components.
///
/// Generic over the chain API (`Chain`), the network graph (`Graph`), the network
/// backend (`Net`) and the ticket manager (`TMgr`).
pub struct Hopr<Chain, Graph, Net, TMgr> {
    /// Off-chain key pair of the node.
    pub(crate) transport_id: OffchainKeypair,
    /// On-chain identity of the node, exposed via `HasChainApi::identity`.
    pub(crate) chain_id: NodeOnchainIdentity,
    /// Library configuration, exposed via [`Hopr::config`].
    pub(crate) cfg: config::HoprLibConfig,
    /// Shared atomic node state, read by `HoprNodeOperations::status`.
    pub(crate) state: Arc<AtomicHoprState>,
    /// Transport component; also provides the network view and the graph.
    pub(crate) transport_api: HoprTransport<Chain, Graph, Net>,
    /// Chain API implementation.
    pub(crate) chain_api: Chain,
    /// Broadcast channel ends for ticket events; subscribers get an activated clone of the receiver.
    pub(crate) ticket_event_subscribers: TicketEvents,
    /// Ticket manager, exposed via `HasTicketManagement::ticket_management`.
    pub(crate) ticket_manager: TMgr,
    // NOTE(review): presumably held only to keep the spawned processes alive until the node is dropped — confirm.
    #[allow(dead_code)] pub(crate) processes: AbortableList<HoprLibProcess>,
}
290
291impl<Chain, Graph, Net, TMgr> Hopr<Chain, Graph, Net, TMgr>
292where
293 Chain: HoprChainApi + Clone + Send + Sync + 'static,
294 Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
295 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
296 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
297 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
298 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
299{
300 pub fn config(&self) -> &config::HoprLibConfig {
301 &self.cfg
302 }
303
304 pub fn graph(&self) -> &Graph {
306 self.transport_api.graph()
307 }
308
309 #[cfg(feature = "session-client")]
310 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
311 if HoprNodeOperations::status(self) == state {
312 Ok(())
313 } else {
314 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
315 }
316 }
317}
318
#[cfg(feature = "session-client")]
#[async_trait::async_trait]
impl<Chain, Graph, Net, TMgr> hopr_api::node::HoprSessionClientOperations for Hopr<Chain, Graph, Net, TMgr>
where
    Chain: HoprChainApi + Clone + Send + Sync + 'static,
    Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: hopr_api::network::NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
    TMgr: Send + Sync + 'static,
{
    type Config = HoprSessionClientConfig;
    type Error = HoprLibError;
    type Session = HoprSession;
    type SessionConfigurator = HoprSessionConfigurator;
    type Target = SessionTarget;

    /// Opens a session to `destination` for the given `target`.
    ///
    /// The node must be in the `Running` state. Session establishment is retried with a
    /// constant, jittered delay; the retry count and delay come from the session
    /// protocol configuration.
    async fn connect_to(
        &self,
        destination: Address,
        target: Self::Target,
        cfg: Self::Config,
    ) -> Result<(Self::Session, Self::SessionConfigurator), Self::Error> {
        use backon::Retryable;

        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

        let session_cfg = &self.cfg.protocol.session;
        let backoff = backon::ConstantBuilder::default()
            .with_max_times(session_cfg.establish_max_retries as usize)
            .with_delay(session_cfg.establish_retry_timeout)
            .with_jitter();

        // Each attempt needs its own copies of the target and the configuration.
        let attempt = || {
            let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
            let target = target.clone();
            async { self.transport_api.new_session(destination, target, cfg).await }
        };

        let established = attempt.retry(backoff).sleep(backon::FuturesTimerSleeper).await?;
        Ok(established)
    }
}
362
363fn component_status(state: HoprState, component: &str) -> ComponentStatus {
372 if state == HoprState::Running {
373 ComponentStatus::Ready
374 } else {
375 ComponentStatus::Initializing(format!("{component} not yet running"))
376 }
377}
378
379impl<Chain, Graph, Net, TMgr> HasChainApi for Hopr<Chain, Graph, Net, TMgr>
380where
381 Chain: HoprChainApi + Clone + Send + Sync + 'static,
382{
383 type ChainApi = Chain;
384 type ChainError = HoprLibError;
385
386 fn identity(&self) -> &NodeOnchainIdentity {
387 &self.chain_id
388 }
389
390 fn chain_api(&self) -> &Chain {
391 &self.chain_api
392 }
393
394 fn status(&self) -> ComponentStatus {
395 let state = HoprNodeOperations::status(self);
397 if state == HoprState::ValidatingNetworkConfig {
398 ComponentStatus::Ready
399 } else {
400 component_status(state, "chain")
401 }
402 }
403
404 fn wait_for_on_chain_event<F>(
405 &self,
406 predicate: F,
407 context: String,
408 timeout: Duration,
409 ) -> EventWaitResult<<Self::ChainApi as HoprChainApi>::ChainError, Self::ChainError>
410 where
411 F: Fn(&ChainEvent) -> bool + Send + Sync + 'static,
412 {
413 debug!(%context, "registering wait for on-chain event");
414 let (event_stream, handle) = futures::stream::abortable(
415 self.chain_api
416 .subscribe()?
417 .skip_while(move |event| futures::future::ready(!predicate(event))),
418 );
419
420 let ctx = context.clone();
421
422 Ok((
423 spawn(async move {
424 pin_mut!(event_stream);
425 let res = event_stream
426 .next()
427 .timeout(futures_time::time::Duration::from(timeout))
428 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")).into_right())
429 .await?
430 .ok_or(
431 HoprLibError::GeneralError(format!("failed to yield an on-chain event for {ctx}")).into_right(),
432 );
433 debug!(%ctx, ?res, "on-chain event waiting done");
434 res
435 })
436 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")).into_right())
437 .and_then(futures::future::ready)
438 .boxed(),
439 handle,
440 ))
441 }
442}
443
444impl<Chain, Graph, Net, TMgr> HasNetworkView for Hopr<Chain, Graph, Net, TMgr>
445where
446 Chain: Send + Sync + 'static,
447 Graph: Send + Sync + 'static,
448 Net: hopr_api::network::NetworkView + Send + Sync + 'static,
449{
450 type NetworkView = HoprTransport<Chain, Graph, Net>;
451
452 fn network_view(&self) -> &Self::NetworkView {
453 &self.transport_api
454 }
455
456 fn status(&self) -> ComponentStatus {
457 component_status(HoprNodeOperations::status(self), "network")
458 }
459}
460
461impl<Chain, Graph, Net, TMgr> HasGraphView for Hopr<Chain, Graph, Net, TMgr>
462where
463 Chain: HoprChainApi + Clone + Send + Sync + 'static,
464 Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
465 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
466 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
467 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
468 Net: hopr_api::network::NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
469{
470 type Graph = Graph;
471
472 fn graph(&self) -> &Graph {
473 self.transport_api.graph()
474 }
475}
476
477impl<Chain, Graph, Net, TMgr> HasTransportApi for Hopr<Chain, Graph, Net, TMgr>
478where
479 Chain: HoprChainApi + Clone + Send + Sync + 'static,
480 Graph: HoprGraphApi<HoprNodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
481 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
482 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
483 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
484 Net: hopr_api::network::NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
485 TMgr: Send + Sync + 'static,
486{
487 type Transport = HoprTransport<Chain, Graph, Net>;
488
489 fn transport(&self) -> &Self::Transport {
490 &self.transport_api
491 }
492
493 fn status(&self) -> ComponentStatus {
494 component_status(HoprNodeOperations::status(self), "transport")
495 }
496}
497
498impl<Chain, Graph, Net, TMgr> HasTicketManagement for Hopr<Chain, Graph, Net, TMgr>
500where
501 Chain: HoprChainApi + Clone + Send + Sync + 'static,
502 TMgr: TicketManagement + Clone + Send + Sync + 'static,
503{
504 type TicketManager = TMgr;
505
506 fn ticket_management(&self) -> &TMgr {
507 &self.ticket_manager
508 }
509
510 fn subscribe_ticket_events(&self) -> impl Stream<Item = hopr_api::node::TicketEvent> + Send + 'static {
511 self.ticket_event_subscribers.1.activate_cloned()
512 }
513
514 fn status(&self) -> ComponentStatus {
515 ComponentStatus::Ready
516 }
517}
518
519impl<Chain, Graph, Net, TMgr> Hopr<Chain, Graph, Net, TMgr> {
520 pub fn collect_hopr_metrics() -> errors::Result<String> {
522 cfg_if::cfg_if! {
523 if #[cfg(all(feature = "telemetry", not(test)))] {
524 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
525 } else {
526 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
527 }
528 }
529 }
530}
531
532impl<Chain, Graph, Net, TMgr> HoprNodeOperations for Hopr<Chain, Graph, Net, TMgr> {
533 fn status(&self) -> HoprState {
534 self.state.load(Ordering::Relaxed)
535 }
536}
537
#[cfg(test)]
mod tests {
    use super::*;

    /// A running node reports the component as ready.
    #[test]
    fn component_status_returns_ready_when_running() {
        let status = component_status(HoprState::Running, "test");
        assert_eq!(status, ComponentStatus::Ready);
    }

    /// Any non-running state reports the component as initializing.
    #[test]
    fn component_status_returns_initializing_when_not_running() {
        let expected = ComponentStatus::Initializing(String::from("network not yet running"));
        assert_eq!(component_status(HoprState::Uninitialized, "network"), expected);
    }

    /// The component name is part of the initializing message.
    #[test]
    fn component_status_includes_component_name_in_message() {
        let expected = ComponentStatus::Initializing(String::from("chain not yet running"));
        assert_eq!(component_status(HoprState::Uninitialized, "chain"), expected);
    }
}
559
560pub fn peer_id_to_offchain_key(peer_id: &PeerId) -> errors::Result<OffchainPublicKey> {
564 Ok(hopr_transport::peer_id_to_public_key(peer_id)?)
565}