1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
/// Re-exports of the underlying HOPR crates, grouped by area (`chain`, `types`,
/// `crypto`, `network`) plus the whole transport crate as `transport`.
///
/// Hidden from the generated documentation.
#[doc(hidden)]
pub mod exports {
    // Chain-related types.
    pub mod chain {
        pub use hopr_chain_types as types;
    }

    // Primitive types.
    pub mod types {
        pub use hopr_primitive_types as primitive;
    }
    // Cryptographic key pairs and types.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    // Network-level types.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
59
/// Flat re-export of a selection of items from [`exports`], intended for glob imports.
///
/// Hidden from the generated documentation.
#[doc(hidden)]
pub mod prelude {
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
76
77use std::{
78 collections::HashMap,
79 ops::Deref,
80 path::PathBuf,
81 str::FromStr,
82 sync::{Arc, atomic::Ordering},
83 time::Duration,
84};
85
86use async_lock::RwLock;
87pub use async_trait::async_trait;
88use errors::{HoprLibError, HoprStatusError};
89use futures::{FutureExt, StreamExt, channel::mpsc::channel, future::AbortHandle};
90use hopr_api::{
91 chain::{
92 AccountSelector, AnnouncementError, ChainEvents, ChainKeyOperations, ChainReadAccountOperations,
93 ChainReadChannelOperations, ChainValues, ChainWriteAccountOperations, ChainWriteChannelOperations,
94 ChainWriteTicketOperations, ChannelSelector,
95 },
96 db::{HoprDbPeersOperations, HoprDbTicketOperations, PeerStatus, TicketSelector},
97};
98use hopr_async_runtime::prelude::spawn;
99pub use hopr_chain_api::config::{
100 Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
101};
102use hopr_chain_api::{HoprChain, HoprChainProcess, config::ChainNetworkConfig, errors::HoprChainError, wait_for_funds};
103use hopr_chain_types::ContractAddresses;
104pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
105use hopr_crypto_types::prelude::Hash;
106use hopr_db_node::{HoprNodeDb, HoprNodeDbConfig};
107pub use hopr_internal_types::prelude::*;
108pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
109pub use hopr_path::channel_graph::GraphExportConfig;
110use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
111#[cfg(all(feature = "prometheus", not(test)))]
112use hopr_platform::time::native::current_time;
113pub use hopr_primitive_types::prelude::*;
114pub use hopr_strategy::Strategy;
115use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
116#[cfg(feature = "runtime-tokio")]
117pub use hopr_transport::transfer_session;
118pub use hopr_transport::{
119 ApplicationData, ApplicationDataIn, ApplicationDataOut, HalfKeyChallenge, Health, HoprSession, IncomingSession,
120 Keypair, Multiaddr, OffchainKeypair as HoprOffchainKeypair, PeerId, PingQueryReplier, ProbeError, SESSION_MTU,
121 SURB_SIZE, ServiceId, SessionCapabilities, SessionCapability, SessionClientConfig, SessionId as HoprSessionId,
122 SessionManagerError, SessionTarget, SurbBalancerConfig, Tag, Telemetry, TicketStatistics, TrafficGeneration,
123 TransportSessionError,
124 config::{HostConfig, HostType, looks_like_domain},
125 errors::{HoprTransportError, NetworkingError, ProtocolError},
126};
127use hopr_transport::{
128 ChainKeypair, HoprTransport, HoprTransportConfig, OffchainKeypair, PeerDiscovery, execute_on_tick,
129};
130use tracing::{debug, error, info, trace, warn};
131
132use crate::{
133 config::SafeModule,
134 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
135 state::HoprState,
136 traits::chain::{CloseChannelResult, OpenChannelResult},
137};
138
// Lazily initialized metrics. They are compiled only when the `prometheus`
// feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Set in `Hopr::new` to the current unix timestamp (seconds).
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Set in `Hopr::new`: the label carries `APP_VERSION`, the value is `<major>.<minor>` parsed as a float.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Set to 1.0 at the end of `Hopr::run`, labelled with the node's peer id and addresses.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
156
/// Placeholder [`TrafficGeneration`] implementor whose `build` produces an empty
/// stream and a draining sink.
///
/// The only field is private, so values of this type cannot be created with a
/// struct literal outside of this module tree.
pub struct DummyCoverTrafficType {
    // Never read; it exists solely to keep the struct from being constructed elsewhere.
    #[allow(dead_code)]
    _unconstructable: (),
}
161
162impl TrafficGeneration for DummyCoverTrafficType {
163 fn build(
164 self,
165 ) -> (
166 impl futures::Stream<Item = DestinationRouting> + Send,
167 impl futures::Sink<
168 std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
169 Error = impl std::error::Error,
170 > + Send
171 + Sync
172 + Clone
173 + 'static,
174 ) {
175 (
176 futures::stream::empty(),
177 futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
178 )
179 }
180}
181
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// Environment variables (values that are unset or do not parse as `usize` are ignored):
/// - `HOPRD_NUM_CPU_THREADS`: size of the CPU thread pool,
/// - `HOPRD_NUM_IO_THREADS`: number of Tokio worker threads,
/// - `HOPRD_THREAD_STACK_SIZE`: worker thread stack size in bytes (defaults to 10 MiB, never below 2 MiB).
///
/// When a thread count is not given, half of the available parallelism is used.
/// Both thread counts are clamped to at least 1.
///
/// # Errors
///
/// Fails if a thread count is neither configured nor detectable, if the CPU thread
/// pool cannot be initialized, or if the Tokio runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime() -> anyhow::Result<tokio::runtime::Runtime> {
    // Reads an environment variable as `usize`; whitespace is trimmed as done for
    // the other numeric environment variables in this file.
    let env_usize = |name: &str| {
        std::env::var(name)
            .ok()
            .and_then(|v| usize::from_str(v.trim()).ok())
    };

    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error is built lazily so that nothing is allocated on the success path.
    let cpu_threads = env_usize("HOPRD_NUM_CPU_THREADS")
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = env_usize("HOPRD_NUM_IO_THREADS")
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let stack_size = env_usize("HOPRD_THREAD_STACK_SIZE")
        .unwrap_or(10 * 1024 * 1024)
        .max(2 * 1024 * 1024);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(stack_size)
        .build()?)
}
225
/// The HOPR node object.
///
/// Holds the configuration, the node state and handles to the transport layer,
/// the chain API and the node database. Created via `Hopr::new` and started via
/// `Hopr::run`.
pub struct Hopr {
    // Off-chain keypair of this node.
    me: OffchainKeypair,
    // Library configuration the node was created with.
    cfg: config::HoprLibConfig,
    // Current lifecycle state, shared atomically.
    state: Arc<state::AtomicHoprState>,
    // Transport layer API.
    transport_api: HoprTransport<HoprNodeDb, HoprChain>,
    // Chain API.
    hopr_chain_api: HoprChain,
    // Node database handle.
    node_db: HoprNodeDb,
    // Chain network configuration resolved during construction.
    chain_cfg: ChainNetworkConfig,
    // Channel graph shared with the transport layer and the chain event pipeline.
    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
    // Strategies configured for this node.
    multistrategy: Arc<MultiStrategy>,
}
249
250impl Hopr {
251 pub async fn new(
252 mut cfg: config::HoprLibConfig,
253 me: &OffchainKeypair,
254 me_onchain: &ChainKeypair,
255 ) -> crate::errors::Result<Self> {
256 if hopr_crypto_random::is_rng_fixed() {
257 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
258 }
259
260 let multiaddress: Multiaddr = (&cfg.host).try_into()?;
261
262 let db_path: PathBuf = [&cfg.db.data, "node_db"].iter().collect();
263 info!(path = ?db_path, "Initiating DB");
264
265 if cfg.db.force_initialize {
266 info!("Force cleaning up existing database");
267 hopr_platform::file::native::remove_dir_all(db_path.as_path()).map_err(|e| {
268 HoprLibError::GeneralError(format!(
269 "Failed to remove the existing DB directory at '{db_path:?}': {e}"
270 ))
271 })?;
272 cfg.db.initialize = true
273 }
274
275 if let Some(parent_dir_path) = db_path.as_path().parent() {
277 if !parent_dir_path.is_dir() {
278 std::fs::create_dir_all(parent_dir_path).map_err(|e| {
279 HoprLibError::GeneralError(format!(
280 "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
281 ))
282 })?
283 }
284 }
285
286 let db_cfg = HoprNodeDbConfig {
287 create_if_missing: cfg.db.initialize,
288 force_create: cfg.db.force_initialize,
289 log_slow_queries: std::time::Duration::from_millis(150),
290 surb_ring_buffer_size: std::env::var("HOPR_PROTOCOL_SURB_RB_SIZE")
291 .ok()
292 .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
293 .unwrap_or_else(|| HoprNodeDbConfig::default().surb_ring_buffer_size),
294 surb_distress_threshold: std::env::var("HOPR_PROTOCOL_SURB_RB_DISTRESS")
295 .ok()
296 .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
297 .unwrap_or_else(|| HoprNodeDbConfig::default().surb_distress_threshold),
298 };
299 let node_db = HoprNodeDb::new(db_path.as_path(), me_onchain.clone(), db_cfg).await?;
300
301 if let Some(provider) = &cfg.chain.provider {
302 info!(provider, "Creating chain components using the custom provider");
303 } else {
304 info!("Creating chain components using the default provider");
305 }
306 let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
307 &cfg.chain.network,
308 crate::constants::APP_VERSION_COERCED,
309 cfg.chain.provider.as_deref(),
310 cfg.chain.max_rpc_requests_per_sec,
311 &mut cfg.chain.protocols,
312 )
313 .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
314
315 let contract_addresses = ContractAddresses::from(&resolved_environment);
316 info!(
317 myself = me_onchain.public().to_hex(),
318 contract_addresses = ?contract_addresses,
319 "Resolved contract addresses",
320 );
321
322 let my_multiaddresses = vec![multiaddress];
323
324 let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
325 me_onchain.public().to_address(),
326 ChannelGraphConfig::default(),
327 )));
328
329 let hopr_chain_api = hopr_chain_api::HoprChain::new(
331 me_onchain.clone(),
332 resolved_environment.clone(),
333 node_db.clone(),
334 &cfg.db.data,
335 cfg.safe_module.module_address,
336 ContractAddresses {
337 announcements: resolved_environment.announcements,
338 channels: resolved_environment.channels,
339 token: resolved_environment.token,
340 ticket_price_oracle: resolved_environment.ticket_price_oracle,
341 winning_probability_oracle: resolved_environment.winning_probability_oracle,
342 network_registry: resolved_environment.network_registry,
343 network_registry_proxy: resolved_environment.network_registry_proxy,
344 node_stake_v2_factory: resolved_environment.node_stake_v2_factory,
345 node_safe_registry: resolved_environment.node_safe_registry,
346 module_implementation: resolved_environment.module_implementation,
347 },
348 cfg.safe_module.safe_address,
349 hopr_chain_api::IndexerConfig {
350 start_block_number: resolved_environment.channel_contract_deploy_block as u64,
351 fast_sync: cfg.chain.fast_sync,
352 enable_logs_snapshot: cfg.chain.enable_logs_snapshot,
353 logs_snapshot_url: cfg.chain.logs_snapshot_url.clone(),
354 data_directory: cfg.db.data.clone(),
355 },
356 )?;
357
358 let hopr_transport_api = HoprTransport::new(
359 me,
360 me_onchain,
361 HoprTransportConfig {
362 transport: cfg.transport.clone(),
363 network: cfg.network_options.clone(),
364 protocol: cfg.protocol,
365 probe: cfg.probe,
366 session: cfg.session,
367 },
368 node_db.clone(),
369 hopr_chain_api.clone(),
370 channel_graph.clone(),
371 my_multiaddresses,
372 );
373
374 let multi_strategy = Arc::new(MultiStrategy::new(cfg.strategy.clone(), hopr_chain_api.clone()));
375 debug!(
376 strategies = tracing::field::debug(&multi_strategy),
377 "Initialized strategies"
378 );
379
380 #[cfg(all(feature = "prometheus", not(test)))]
381 {
382 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
383 METRIC_HOPR_LIB_VERSION.set(
384 &[const_format::formatcp!("{}", constants::APP_VERSION)],
385 f64::from_str(const_format::formatcp!(
386 "{}.{}",
387 env!("CARGO_PKG_VERSION_MAJOR"),
388 env!("CARGO_PKG_VERSION_MINOR")
389 ))
390 .unwrap_or(0.0),
391 );
392
393 if let Err(e) = node_db.get_ticket_statistics(None).await {
395 error!(error = %e, "Failed to initialize ticket statistics metrics");
396 }
397 }
398
399 Ok(Self {
400 me: me.clone(),
401 cfg,
402 state: Arc::new(state::AtomicHoprState::new(state::HoprState::Uninitialized)),
403 transport_api: hopr_transport_api,
404 hopr_chain_api,
405 node_db,
406 chain_cfg: resolved_environment,
407 channel_graph,
408 multistrategy: multi_strategy,
409 })
410 }
411
412 fn error_if_not_in_state(&self, state: state::HoprState, error: String) -> errors::Result<()> {
413 if self.status() == state {
414 Ok(())
415 } else {
416 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
417 }
418 }
419
    /// Returns the current state of the node, read with relaxed memory ordering.
    pub fn status(&self) -> state::HoprState {
        self.state.load(Ordering::Relaxed)
    }
423
    /// Returns a copy of the configured chain network identifier (`cfg.chain.network`).
    pub fn network(&self) -> String {
        self.cfg.chain.network.clone()
    }
427
428 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
429 Ok(self.hopr_chain_api.node_balance().await?)
430 }
431
432 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
433 Ok(self.hopr_chain_api.safe_balance().await?)
434 }
435
    /// Returns a clone of the Safe and module configuration (`cfg.safe_module`).
    pub fn get_safe_config(&self) -> SafeModule {
        self.cfg.safe_module.clone()
    }
439
    /// Returns a clone of the chain network configuration that was resolved when the node was created.
    pub fn chain_config(&self) -> ChainNetworkConfig {
        self.chain_cfg.clone()
    }
443
    /// Returns a reference to the library configuration held by this node.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
447
448 pub fn get_provider(&self) -> String {
449 self.cfg
450 .chain
451 .provider
452 .clone()
453 .unwrap_or(self.chain_cfg.chain.default_provider.clone())
454 }
455
    /// Returns the `cfg.chain.announce` flag; `run` announces the node on chain only when it is set.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.chain.announce
    }
460
461 pub async fn run<
462 Ct,
463 #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
464 >(
465 &self,
466 cover_traffic: Option<Ct>,
467 #[cfg(feature = "session-server")] serve_handler: T,
468 ) -> errors::Result<(
469 hopr_transport::socket::HoprSocket<
470 futures::channel::mpsc::Receiver<ApplicationDataIn>,
471 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
472 >,
473 HashMap<state::HoprLibProcesses, AbortHandle>,
474 )>
475 where
476 Ct: TrafficGeneration + Send + Sync + 'static,
477 {
478 self.error_if_not_in_state(
479 state::HoprState::Uninitialized,
480 "Cannot start the hopr node multiple times".into(),
481 )?;
482
483 #[cfg(feature = "testing")]
484 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
485
486 info!(
487 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
488 "Node is not started, please fund this node",
489 );
490
491 wait_for_funds(
492 *MIN_NATIVE_BALANCE,
493 *SUGGESTED_NATIVE_BALANCE,
494 Duration::from_secs(200),
495 &self.hopr_chain_api,
496 )
497 .await?;
498
499 let mut processes: HashMap<state::HoprLibProcesses, AbortHandle> = HashMap::new();
500
501 info!("Starting the node...");
502
503 self.state.store(state::HoprState::Initializing, Ordering::Relaxed);
504
505 let balance: XDaiBalance = self.get_balance().await?;
506 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
507
508 info!(
509 address = %self.hopr_chain_api.me_onchain(),
510 %balance,
511 %minimum_balance,
512 "Node information"
513 );
514
515 if balance.le(&minimum_balance) {
516 return Err(HoprLibError::GeneralError(
517 "Cannot start the node without a sufficiently funded wallet".to_string(),
518 ));
519 }
520
521 #[cfg(not(feature = "testing"))]
522 {
523 let network_min_win_prob = self.hopr_chain_api.minimum_incoming_ticket_win_prob().await?;
526 let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
527
528 if configured_win_prob
529 .and_then(|c| WinningProbability::try_from(c).ok())
530 .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
531 {
532 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
533 "configured outgoing ticket winning probability is lower than the network minimum winning \
534 probability: {configured_win_prob:?} < {network_min_win_prob}"
535 ))));
536 }
537 }
538
539 self.state.store(state::HoprState::Indexing, Ordering::Relaxed);
540
541 let minimum_capacity = self
544 .hopr_chain_api
545 .count_accounts(AccountSelector {
546 public_only: true,
547 ..Default::default()
548 })
549 .await?
550 .saturating_mul(2)
551 .saturating_add(100);
552
553 let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
554 .ok()
555 .and_then(|s| s.trim().parse::<usize>().ok())
556 .filter(|&c| c > 0)
557 .unwrap_or(2048)
558 .max(minimum_capacity);
559
560 debug!(
561 capacity = chain_discovery_events_capacity,
562 minimum_required = minimum_capacity,
563 "Creating chain discovery events channel"
564 );
565 let (indexer_peer_update_tx, indexer_peer_update_rx) =
566 futures::channel::mpsc::channel::<PeerDiscovery>(chain_discovery_events_capacity);
567
568 let indexer_event_pipeline = helpers::chain_events_to_transport_events(
569 self.hopr_chain_api.subscribe()?,
570 self.me_onchain(),
571 self.multistrategy.clone(),
572 self.channel_graph.clone(),
573 self.node_db.clone(),
574 );
575
576 spawn(async move {
577 let result = indexer_event_pipeline
578 .map(Ok)
579 .forward(indexer_peer_update_tx)
580 .inspect(|result| {
581 tracing::warn!(
582 ?result,
583 task = "indexer -> transport",
584 "long-running background task finished"
585 )
586 })
587 .await;
588
589 result.expect("The index to transport event chain failed")
590 });
591
592 info!("Start the chain process and sync the indexer");
593 for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
594 let nid = match id {
595 HoprChainProcess::Indexer => state::HoprLibProcesses::Indexing,
596 HoprChainProcess::OutgoingOnchainActionQueue => state::HoprLibProcesses::OutgoingOnchainActionQueue,
597 };
598 processes.insert(nid, proc);
599 }
600
601 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
602
603 if !self.hopr_chain_api.check_node_safe_module_status().await? {
609 return Err(HoprLibError::ChainApi(HoprChainError::Api(
610 "Safe and module are not configured correctly".into(),
611 )));
612 }
613
614 if self
618 .hopr_chain_api
619 .can_register_with_safe(&self.cfg.safe_module.safe_address)
620 .await?
621 {
622 info!("Registering safe by node");
623
624 if self.me_onchain() == self.cfg.safe_module.safe_address {
625 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
626 }
627
628 if let Err(error) = self
629 .hopr_chain_api
630 .register_safe(&self.cfg.safe_module.safe_address)
631 .await?
632 .await
633 {
634 error!(%error, "Failed to register node with safe")
636 }
637 }
638
639 if self.is_public() {
640 let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
644
645 match self
647 .hopr_chain_api
648 .announce(&multiaddresses_to_announce, &self.me)
649 .await
650 {
651 Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
652 Err(AnnouncementError::AlreadyAnnounced) => {
653 info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
654 }
655 Err(error) => error!(%error, "Failed to transmit node announcement"),
659 }
660 }
661
662 {
663 let channel_graph = self.channel_graph.clone();
667 let mut cg = channel_graph.write_arc().await;
668
669 info!("Syncing channels from the previous runs");
670 let mut channel_stream = self
671 .hopr_chain_api
672 .stream_channels(
673 ChannelSelector::default()
674 .with_allowed_states(&[
675 ChannelStatusDiscriminants::Open,
676 ChannelStatusDiscriminants::PendingToClose,
677 ])
678 .with_closure_time_range(Utc::now()..),
679 )
680 .await?;
681 while let Some(channel) = channel_stream.next().await {
682 cg.update_channel(channel);
683 }
684
685 info!("Syncing peer qualities from the previous runs");
690 let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
691 .map_err(|e| e.to_string())
692 .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
693 .unwrap_or_else(|error| {
694 warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
695 constants::DEFAULT_MIN_QUALITY_TO_SYNC
696 });
697
698 let mut peer_stream = self
699 .node_db
700 .get_network_peers(Default::default(), false)
701 .await?
702 .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
703
704 while let Some(peer) = peer_stream.next().await {
705 if let Some(key) = self.hopr_chain_api.packet_key_to_chain_key(&peer.id.0).await? {
706 cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
708 } else {
709 error!(peer = %peer.id.1, "Could not translate peer information");
710 }
711 }
712
713 info!(
714 channels = cg.count_channels(),
715 nodes = cg.count_nodes(),
716 "Channel graph sync complete"
717 );
718 }
719
720 let multi_strategy_ack_ticket = self.multistrategy.clone();
722
723 let ack_ticket_channel_capacity = std::env::var("HOPR_INTERNAL_ACKED_TICKET_CHANNEL_CAPACITY")
724 .ok()
725 .and_then(|s| s.trim().parse::<usize>().ok())
726 .filter(|&c| c > 0)
727 .unwrap_or(2048);
728
729 debug!(
730 capacity = ack_ticket_channel_capacity,
731 "Creating acknowledged ticket channel"
732 );
733 let (on_ack_tkt_tx, mut on_ack_tkt_rx) = channel::<AcknowledgedTicket>(ack_ticket_channel_capacity);
734 self.node_db.start_ticket_processing(Some(on_ack_tkt_tx))?;
735
736 processes.insert(
737 state::HoprLibProcesses::OnReceivedAcknowledgement,
738 hopr_async_runtime::spawn_as_abortable!(async move {
739 while let Some(ack) = on_ack_tkt_rx.next().await {
740 if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
741 &*multi_strategy_ack_ticket,
742 &ack,
743 )
744 .await
745 {
746 error!(%error, "Failed to process acknowledged winning ticket with the strategy");
747 }
748 }
749
750 tracing::warn!(
751 task = %state::HoprLibProcesses::OnReceivedAcknowledgement,
752 "long-running background task finished"
753 )
754 }),
755 );
756
757 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
758 .ok()
759 .and_then(|s| s.trim().parse::<usize>().ok())
760 .filter(|&c| c > 0)
761 .unwrap_or(256);
762
763 debug!(
764 capacity = incoming_session_channel_capacity,
765 "Creating incoming session channel"
766 );
767 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
768
769 #[cfg(feature = "session-server")]
770 {
771 processes.insert(
772 state::HoprLibProcesses::SessionServer,
773 hopr_async_runtime::spawn_as_abortable!(
774 _session_rx
775 .for_each_concurrent(None, move |session| {
776 let serve_handler = serve_handler.clone();
777 async move {
778 let session_id = *session.session.id();
779 match serve_handler.process(session).await {
780 Ok(_) => debug!(
781 session_id = ?session_id,
782 "Client session processed successfully"
783 ),
784 Err(e) => error!(
785 session_id = ?session_id,
786 error = %e,
787 "Client session processing failed"
788 ),
789 }
790 }
791 })
792 .inspect(|_| tracing::warn!(
793 task = %state::HoprLibProcesses::SessionServer,
794 "long-running background task finished"
795 ))
796 ),
797 );
798 }
799
800 info!("Starting transport");
801
802 let (hopr_socket, transport_processes) = self
803 .transport_api
804 .run(cover_traffic, indexer_peer_update_rx, session_tx)
805 .await?;
806
807 for (id, proc) in transport_processes.into_iter() {
808 processes.insert(id.into(), proc);
809 }
810
811 let db_clone = self.node_db.clone();
812 processes.insert(
813 state::HoprLibProcesses::TicketIndexFlush,
814 hopr_async_runtime::spawn_as_abortable!(
815 Box::pin(execute_on_tick(
816 Duration::from_secs(5),
817 move || {
818 let db_clone = db_clone.clone();
819 async move {
820 match db_clone.persist_outgoing_ticket_indices().await {
821 Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
822 Err(e) => error!(error = %e, "Failed to flush ticket indices"),
823 }
824 }
825 },
826 "flush the states of outgoing ticket indices".into(),
827 ))
828 .inspect(|_| tracing::warn!(
829 task = %state::HoprLibProcesses::TicketIndexFlush,
830 "long-running background task finished"
831 ))
832 ),
833 );
834
835 let mut channels = self
840 .hopr_chain_api
841 .stream_channels(ChannelSelector {
842 destination: self.me_onchain().into(),
843 ..Default::default()
844 })
845 .await?;
846
847 while let Some(channel) = channels.next().await {
848 self.node_db
849 .update_ticket_states_and_fetch(
850 TicketSelector::from(&channel)
851 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
852 .with_index_range(channel.ticket_index.as_u64()..),
853 AcknowledgedTicketStatus::Untouched,
854 )
855 .await?
856 .for_each(|ticket| {
857 info!(%ticket, "fixed next out-of-sync ticket");
858 futures::future::ready(())
859 })
860 .await;
861 }
862
863 let multi_strategy = self.multistrategy.clone();
867 let strategy_interval = self.cfg.strategy.execution_interval;
868 processes.insert(
869 state::HoprLibProcesses::StrategyTick,
870 hopr_async_runtime::spawn_as_abortable!(
871 execute_on_tick(
872 Duration::from_secs(strategy_interval),
873 move || {
874 let multi_strategy = multi_strategy.clone();
875
876 async move {
877 trace!(state = "started", "strategy tick");
878 let _ = multi_strategy.on_tick().await;
879 trace!(state = "finished", "strategy tick");
880 }
881 },
882 "run strategies".into(),
883 )
884 .inspect(
885 |_| tracing::warn!(task = %state::HoprLibProcesses::StrategyTick, "long-running background task finished")
886 )
887 ),
888 );
889
890 self.state.store(state::HoprState::Running, Ordering::Relaxed);
891
892 info!(
893 id = %self.me_peer_id(),
894 version = constants::APP_VERSION,
895 "NODE STARTED AND RUNNING"
896 );
897
898 #[cfg(all(feature = "prometheus", not(test)))]
899 METRIC_HOPR_NODE_INFO.set(
900 &[
901 &self.me.public().to_peerid_str(),
902 &self.me_onchain().to_string(),
903 &self.cfg.safe_module.safe_address.to_string(),
904 &self.cfg.safe_module.module_address.to_string(),
905 ],
906 1.0,
907 );
908
909 Ok((hopr_socket, processes))
910 }
911
    /// Returns the [`PeerId`] converted from this node's off-chain public key.
    pub fn me_peer_id(&self) -> PeerId {
        (*self.me.public()).into()
    }
917
918 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
920 Ok(self
921 .hopr_chain_api
922 .stream_accounts(AccountSelector {
923 public_only: true,
924 ..Default::default()
925 })
926 .await?
927 .filter_map(|entry| {
928 futures::future::ready(
929 entry
930 .get_multiaddr()
931 .map(|maddr| (PeerId::from(entry.public_key), entry.chain_addr, vec![maddr])),
932 )
933 })
934 .collect()
935 .await)
936 }
937
938 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
942 self.error_if_not_in_state(
943 state::HoprState::Running,
944 "Node is not ready for on-chain operations".into(),
945 )?;
946
947 Ok(self.transport_api.ping(peer).await?)
948 }
949
950 #[cfg(feature = "session-client")]
953 pub async fn connect_to(
954 &self,
955 destination: Address,
956 target: SessionTarget,
957 cfg: SessionClientConfig,
958 ) -> errors::Result<HoprSession> {
959 self.error_if_not_in_state(
960 state::HoprState::Running,
961 "Node is not ready for on-chain operations".into(),
962 )?;
963
964 let backoff = backon::ConstantBuilder::default()
965 .with_max_times(self.cfg.session.establish_max_retries as usize)
966 .with_delay(self.cfg.session.establish_retry_timeout)
967 .with_jitter();
968
969 use backon::Retryable;
970
971 Ok((|| {
972 let cfg = cfg.clone();
973 let target = target.clone();
974 async { self.transport_api.new_session(destination, target, cfg).await }
975 })
976 .retry(backoff)
977 .sleep(backon::FuturesTimerSleeper)
978 .await?)
979 }
980
981 #[cfg(feature = "session-client")]
984 pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
985 self.error_if_not_in_state(
986 state::HoprState::Running,
987 "Node is not ready for on-chain operations".into(),
988 )?;
989 Ok(self.transport_api.probe_session(id).await?)
990 }
991
992 #[cfg(feature = "session-client")]
993 pub async fn get_session_surb_balancer_config(
994 &self,
995 id: &HoprSessionId,
996 ) -> errors::Result<Option<SurbBalancerConfig>> {
997 self.error_if_not_in_state(
998 state::HoprState::Running,
999 "Node is not ready for on-chain operations".into(),
1000 )?;
1001 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
1002 }
1003
1004 #[cfg(feature = "session-client")]
1005 pub async fn update_session_surb_balancer_config(
1006 &self,
1007 id: &HoprSessionId,
1008 cfg: SurbBalancerConfig,
1009 ) -> errors::Result<()> {
1010 self.error_if_not_in_state(
1011 state::HoprState::Running,
1012 "Node is not ready for on-chain operations".into(),
1013 )?;
1014 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
1015 }
1016
    /// Returns the local multiaddresses as reported by the transport layer.
    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.local_multiaddresses()
    }
1021
    /// Returns the listening multiaddresses as reported by the transport layer.
    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.listening_multiaddresses().await
    }
1026
    /// Returns the multiaddresses observed for `peer`, as reported by the transport layer.
    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
        self.transport_api.network_observed_multiaddresses(peer).await
    }
1031
1032 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1034 let peer = *peer;
1035 let pubkey =
1037 match hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer)).await {
1038 Ok(k) => k,
1039 Err(e) => {
1040 error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1041 return vec![];
1042 }
1043 };
1044
1045 match self.hopr_chain_api.find_account_by_packet_key(&pubkey).await {
1046 Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1047 Ok(None) => {
1048 error!(%peer, "no information");
1049 vec![]
1050 }
1051 Err(e) => {
1052 error!(%peer, error = %e, "failed to retrieve information");
1053 vec![]
1054 }
1055 }
1056 }
1057
    /// Returns the network health as reported by the transport layer.
    pub async fn network_health(&self) -> Health {
        self.transport_api.network_health().await
    }
1064
1065 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1067 Ok(self.transport_api.network_connected_peers().await?)
1068 }
1069
1070 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
1072 Ok(self.transport_api.network_peer_info(peer).await?)
1073 }
1074
1075 pub async fn all_network_peers(
1077 &self,
1078 minimum_quality: f64,
1079 ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
1080 Ok(
1081 futures::stream::iter(self.transport_api.network_connected_peers().await?)
1082 .filter_map(|peer| async move {
1083 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1084 if info.get_average_quality() >= minimum_quality {
1085 Some((peer, info))
1086 } else {
1087 None
1088 }
1089 } else {
1090 None
1091 }
1092 })
1093 .filter_map(|(peer_id, info)| async move {
1094 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1095 Some((address, peer_id, info))
1096 })
1097 .collect::<Vec<_>>()
1098 .await,
1099 )
1100 }
1101
1102 pub async fn tickets_in_channel(&self, channel: &prelude::Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1105 Ok(self.transport_api.tickets_in_channel(channel).await?)
1106 }
1107
1108 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1110 Ok(self.transport_api.all_tickets().await?)
1111 }
1112
1113 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1115 Ok(self.transport_api.ticket_statistics().await?)
1116 }
1117
1118 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1120 Ok(self.node_db.reset_ticket_statistics().await?)
1121 }
1122
    /// Returns the chain API as an opaque [`ChainKeyOperations`] implementation.
    pub fn peer_resolver(&self) -> &impl ChainKeyOperations {
        &self.hopr_chain_api
    }
1127
    /// Returns this node's on-chain address as reported by the chain API.
    pub fn me_onchain(&self) -> Address {
        self.hopr_chain_api.me_onchain()
    }
1132
1133 pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1135 Ok(self.hopr_chain_api.minimum_ticket_price().await?)
1136 }
1137
1138 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1140 Ok(self.hopr_chain_api.minimum_incoming_ticket_win_prob().await?)
1141 }
1142
1143 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1145 Ok(self
1146 .hopr_chain_api
1147 .stream_accounts(AccountSelector {
1148 public_only: true,
1149 ..Default::default()
1150 })
1151 .await?
1152 .collect()
1153 .await)
1154 }
1155
1156 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1159 Ok(self.hopr_chain_api.channel_by_id(channel_id).await?)
1160 }
1161
1162 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1167 Ok(self.hopr_chain_api.channel_by_parties(src, dest).await?)
1168 }
1169
1170 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1172 Ok(self
1173 .hopr_chain_api
1174 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1175 ChannelStatusDiscriminants::Closed,
1176 ChannelStatusDiscriminants::Open,
1177 ChannelStatusDiscriminants::PendingToClose,
1178 ]))
1179 .await?
1180 .collect()
1181 .await)
1182 }
1183
1184 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1186 Ok(self
1187 .hopr_chain_api
1188 .stream_channels(
1189 ChannelSelector::default()
1190 .with_destination(*dest)
1191 .with_allowed_states(&[
1192 ChannelStatusDiscriminants::Closed,
1193 ChannelStatusDiscriminants::Open,
1194 ChannelStatusDiscriminants::PendingToClose,
1195 ]),
1196 )
1197 .await?
1198 .collect()
1199 .await)
1200 }
1201
1202 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1204 Ok(self
1205 .hopr_chain_api
1206 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1207 ChannelStatusDiscriminants::Closed,
1208 ChannelStatusDiscriminants::Open,
1209 ChannelStatusDiscriminants::PendingToClose,
1210 ]))
1211 .await?
1212 .collect()
1213 .await)
1214 }
1215
1216 pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
1218 Ok(self.hopr_chain_api.corrupted_channels().await?)
1219 }
1220
1221 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1223 Ok(self.hopr_chain_api.safe_allowance().await?)
1224 }
1225
1226 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1230 self.error_if_not_in_state(
1231 state::HoprState::Running,
1232 "Node is not ready for on-chain operations".into(),
1233 )?;
1234
1235 let awaiter = self.hopr_chain_api.withdraw(amount, &recipient).await?;
1236
1237 Ok(awaiter.await?)
1238 }
1239
1240 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1244 self.error_if_not_in_state(
1245 state::HoprState::Running,
1246 "Node is not ready for on-chain operations".into(),
1247 )?;
1248
1249 let awaiter = self.hopr_chain_api.withdraw(amount, &recipient).await?;
1250
1251 Ok(awaiter.await?)
1252 }
1253
1254 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1255 self.error_if_not_in_state(
1256 state::HoprState::Running,
1257 "Node is not ready for on-chain operations".into(),
1258 )?;
1259
1260 let (channel_id, tx_hash) = self.hopr_chain_api.open_channel(destination, amount).await?.await?;
1261
1262 Ok(OpenChannelResult { tx_hash, channel_id })
1263 }
1264
1265 pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1266 self.error_if_not_in_state(
1267 state::HoprState::Running,
1268 "Node is not ready for on-chain operations".into(),
1269 )?;
1270
1271 let awaiter = self.hopr_chain_api.fund_channel(channel_id, amount).await?;
1272
1273 Ok(awaiter.await?)
1274 }
1275
1276 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1277 self.error_if_not_in_state(
1278 state::HoprState::Running,
1279 "Node is not ready for on-chain operations".into(),
1280 )?;
1281
1282 let (status, tx_hash) = self.hopr_chain_api.close_channel(channel_id).await?.await?;
1283
1284 Ok(CloseChannelResult { tx_hash, status })
1285 }
1286
1287 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1288 Ok(self.hopr_chain_api.channel_closure_notice_period().await?)
1289 }
1290
1291 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(
1292 &self,
1293 min_value: B,
1294 only_aggregated: bool,
1295 ) -> errors::Result<()> {
1296 self.error_if_not_in_state(
1297 state::HoprState::Running,
1298 "Node is not ready for on-chain operations".into(),
1299 )?;
1300
1301 let min_value = min_value.into();
1302 let chain_api = self.hopr_chain_api.clone();
1303
1304 self.hopr_chain_api
1306 .stream_channels(
1307 ChannelSelector::default()
1308 .with_destination(chain_api.me_onchain())
1309 .with_allowed_states(&[
1310 ChannelStatusDiscriminants::Open,
1311 ChannelStatusDiscriminants::PendingToClose,
1312 ]),
1313 )
1314 .await?
1315 .for_each(|channel| {
1316 let chain_api = chain_api.clone();
1317 async move {
1318 match chain_api
1319 .redeem_tickets_via_selector(
1320 TicketSelector::from(&channel)
1321 .with_amount(min_value..)
1322 .with_aggregated_only(only_aggregated)
1323 .with_index_range(channel.ticket_index.as_u64()..)
1324 .with_state(AcknowledgedTicketStatus::Untouched),
1325 )
1326 .await
1327 {
1328 Ok(awaiters) => info!(count = awaiters.len(), %channel, "redeemed tickets in channel"),
1329 Err(error) => error!(%error, %channel, "failed to redeem tickets"),
1330 }
1331 }
1332 })
1333 .await;
1334
1335 Ok(())
1336 }
1337
1338 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1339 &self,
1340 counterparty: &Address,
1341 min_value: B,
1342 only_aggregated: bool,
1343 ) -> errors::Result<usize> {
1344 self.redeem_tickets_in_channel(
1345 &generate_channel_id(counterparty, &self.me_onchain()),
1346 min_value,
1347 only_aggregated,
1348 )
1349 .await
1350 }
1351
1352 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1353 &self,
1354 channel_id: &Hash,
1355 min_value: B,
1356 only_aggregated: bool,
1357 ) -> errors::Result<usize> {
1358 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1359
1360 let channel = self
1361 .hopr_chain_api
1362 .channel_by_id(channel_id)
1363 .await?
1364 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1365
1366 let out = self
1367 .hopr_chain_api
1368 .redeem_tickets_via_selector(
1369 TicketSelector::from(channel)
1370 .with_amount(min_value.into()..)
1371 .with_aggregated_only(only_aggregated)
1372 .with_index_range(channel.ticket_index.as_u64()..)
1373 .with_state(AcknowledgedTicketStatus::Untouched),
1374 )
1375 .await?;
1376
1377 Ok(out.len())
1378 }
1379
1380 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1381 self.error_if_not_in_state(
1382 state::HoprState::Running,
1383 "Node is not ready for on-chain operations".into(),
1384 )?;
1385
1386 #[allow(clippy::let_underscore_future)]
1388 let _ = self.hopr_chain_api.redeem_ticket(ack_ticket).await?;
1389
1390 Ok(())
1391 }
1392
1393 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1394 let peer_id = *peer_id;
1395 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1397 .await
1398 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1399
1400 Ok(self.hopr_chain_api.packet_key_to_chain_key(&pubkey).await?)
1401 }
1402
1403 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1404 Ok(self
1405 .hopr_chain_api
1406 .chain_key_to_packet_key(address)
1407 .await
1408 .map(|pk| pk.map(|v| v.into()))?)
1409 }
1410
1411 pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1412 self.channel_graph.read_arc().await.as_dot(cfg)
1413 }
1414
1415 pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1416 let cg = self.channel_graph.read_arc().await;
1417 serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1418 }
1419
1420 pub async fn get_indexer_state(&self) -> errors::Result<hopr_chain_api::IndexerStateInfo> {
1421 Ok(self.hopr_chain_api.get_indexer_state().await?)
1422 }
1423
1424 pub fn collect_hopr_metrics() -> errors::Result<String> {
1427 cfg_if::cfg_if! {
1428 if #[cfg(all(feature = "prometheus", not(test)))] {
1429 hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::GeneralError(e.to_string()))
1430 } else {
1431 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1432
1433 }
1434 }
1435 }
1436}