1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
/// Re-exports of the underlying HOPR crates under short aliases, grouped by area.
///
/// The module is hidden from the generated documentation.
#[doc(hidden)]
pub mod exports {
    pub use hopr_api as api;

    /// Crates providing type definitions.
    pub mod types {
        pub use hopr_internal_types as internal;
        pub use hopr_primitive_types as primitive;
    }

    /// Crates providing cryptographic key pairs and types.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    /// Crates providing network-level types.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
59
/// A small selection of frequently needed items, re-exported through [`exports`].
///
/// The module is hidden from the generated documentation.
#[doc(hidden)]
pub mod prelude {
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
76
77use std::{
78 convert::identity,
79 num::NonZeroUsize,
80 sync::{Arc, OnceLock, atomic::Ordering},
81 time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel};
85pub use hopr_api::db::ChannelTicketStatistics;
86use hopr_api::{
87 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
88 db::{HoprNodeDbApi, PeerStatus, TicketMarker, TicketSelector},
89};
90use hopr_async_runtime::prelude::spawn;
91pub use hopr_async_runtime::{Abortable, AbortableList};
92pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
93pub use hopr_crypto_types::prelude::*;
94pub use hopr_internal_types::prelude::*;
95pub use hopr_network_types::prelude::*;
96#[cfg(all(feature = "prometheus", not(test)))]
97use hopr_platform::time::native::current_time;
98pub use hopr_primitive_types::prelude::*;
99use hopr_transport::errors::HoprTransportError;
100#[cfg(feature = "runtime-tokio")]
101pub use hopr_transport::transfer_session;
102pub use hopr_transport::*;
103use tracing::{debug, error, info, warn};
104use validator::Validate;
105
106pub use crate::{
107 config::SafeModule,
108 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
109 errors::{HoprLibError, HoprStatusError},
110 state::{HoprLibProcess, HoprState},
111 traits::chain::{CloseChannelResult, OpenChannelResult},
112};
113
// Process-level metrics. They are compiled in only when the `prometheus` feature
// is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Set in `Hopr::new` to the current unix timestamp.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Set in `Hopr::new`; the label carries the application version, the value is `major.minor`.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Set to 1.0 at the end of `Hopr::run`, labelled with the node's identifiers.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
131
/// Placeholder [`TrafficGeneration`] implementation.
///
/// Its only field is private, so values of this type cannot be constructed
/// outside of this module. Presumably meant to be used as the type parameter
/// when no cover traffic is passed to [`Hopr::run`] — TODO confirm.
pub struct DummyCoverTrafficType {
    #[allow(dead_code)]
    _unconstructable: (),
}
136
137impl TrafficGeneration for DummyCoverTrafficType {
138 fn build(
139 self,
140 ) -> (
141 impl futures::Stream<Item = DestinationRouting> + Send,
142 impl futures::Sink<
143 std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
144 Error = impl std::error::Error,
145 > + Send
146 + Sync
147 + Clone
148 + 'static,
149 ) {
150 (
151 futures::stream::empty(),
152 futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
153 )
154 }
155}
156
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// # Arguments
/// * `num_cpu_threads` - size of the CPU-bound thread pool; when `None`, half of the
///   available parallelism is used.
/// * `num_io_threads` - number of Tokio worker threads; when `None`, half of the
///   available parallelism is used.
///
/// Both thread counts are clamped to at least 1. The worker thread stack size is read
/// from the `HOPRD_THREAD_STACK_SIZE` environment variable (in bytes), defaults to
/// 10 MiB when unset or unparsable, and is never lower than 2 MiB.
///
/// # Errors
/// Fails if a thread count was not given and the available parallelism cannot be
/// determined, if the CPU thread pool cannot be initialized, or if the runtime
/// cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error is built lazily so that nothing is constructed on the success path.
    let cpu_threads = num_cpu_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
202
/// Socket type returned by [`Hopr::run`]: a [`socket::HoprSocket`] over a receiver
/// of incoming application data and a sender of outgoing application data paired
/// with its destination routing.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
208
/// Both ends of the broadcast channel carrying newly received winning tickets:
/// the sender and an inactive receiver that new subscribers are activated from.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
213
/// The HOPR node object, generic over the chain API (`Chain`) and the node
/// database (`Db`) implementations.
///
/// It is created with [`Hopr::new`], started with [`Hopr::run`] and stopped with
/// [`Hopr::shutdown`].
pub struct Hopr<Chain, Db> {
    /// Off-chain key pair of this node; the peer ID is derived from its public key.
    me: OffchainKeypair,
    /// Validated library configuration.
    cfg: config::HoprLibConfig,
    /// Current lifecycle state of the node.
    state: Arc<state::AtomicHoprState>,
    /// Transport layer of the node.
    transport_api: HoprTransport<Chain, Db>,
    /// Sender of ticket redemption requests; populated by `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Node database handle.
    node_db: Db,
    /// Chain API handle.
    chain_api: Chain,
    /// Broadcast channel used to notify subscribers about winning tickets.
    winning_ticket_subscribers: NewTicketEvents,
    /// Abort handles of the background processes; populated at the end of `run`
    /// and aborted by `shutdown`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
236
237impl<Chain, Db> Hopr<Chain, Db>
238where
239 Chain: HoprChainApi + Clone + Send + Sync + 'static,
240 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
241{
242 pub async fn new(
243 identity: (&ChainKeypair, &OffchainKeypair),
244 hopr_chain_api: Chain,
245 hopr_node_db: Db,
246 cfg: config::HoprLibConfig,
247 ) -> errors::Result<Self> {
248 if hopr_crypto_random::is_rng_fixed() {
249 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
250 }
251
252 cfg.validate()?;
253
254 let hopr_transport_api = HoprTransport::new(
255 identity,
256 hopr_chain_api.clone(),
257 hopr_node_db.clone(),
258 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
259 cfg.protocol,
260 );
261
262 #[cfg(all(feature = "prometheus", not(test)))]
263 {
264 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
265 METRIC_HOPR_LIB_VERSION.set(
266 &[const_format::formatcp!("{}", constants::APP_VERSION)],
267 const_format::formatcp!(
268 "{}.{}",
269 env!("CARGO_PKG_VERSION_MAJOR"),
270 env!("CARGO_PKG_VERSION_MINOR")
271 )
272 .parse()
273 .unwrap_or(0.0),
274 );
275
276 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
278 error!(%error, "failed to initialize ticket statistics metrics");
279 }
280 }
281
282 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
283 new_tickets_tx.set_await_active(false);
284 new_tickets_tx.set_overflow(true);
285
286 Ok(Self {
287 me: identity.1.clone(),
288 cfg,
289 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
290 transport_api: hopr_transport_api,
291 chain_api: hopr_chain_api,
292 node_db: hopr_node_db,
293 redeem_requests: OnceLock::new(),
294 processes: OnceLock::new(),
295 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
296 })
297 }
298
299 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
300 if self.status() == state {
301 Ok(())
302 } else {
303 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
304 }
305 }
306
    /// Returns the current lifecycle state of the node.
    ///
    /// The state is read with `Ordering::Relaxed`.
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
310
311 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
312 self.chain_api
313 .get_balance(self.me_onchain())
314 .await
315 .map_err(HoprLibError::chain)
316 }
317
318 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
319 self.chain_api
320 .get_balance(self.cfg.safe_module.safe_address)
321 .await
322 .map_err(HoprLibError::chain)
323 }
324
    /// Returns the chain information reported by the chain API.
    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
    }
328
    /// Returns a copy of the Safe/module part of the configuration.
    pub fn get_safe_config(&self) -> SafeModule {
        self.cfg.safe_module.clone()
    }
332
    /// Returns a reference to the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
336
    /// Returns the `publish` flag of the configuration; `run` announces
    /// multiaddresses on chain only when it is set.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
341
    /// Starts the node and returns the transport socket for application data.
    ///
    /// The startup sequence, in order:
    /// 1. verify that the node is [`HoprState::Uninitialized`] and wait until the
    ///    node's on-chain address is funded;
    /// 2. switch to [`HoprState::Initializing`] and validate the balance and the
    ///    configured outgoing ticket price and winning probability against the
    ///    network minimums;
    /// 3. subscribe to on-chain account announcements and forward them to the
    ///    transport as peer discovery events;
    /// 4. register the configured Safe and announce the node on chain;
    /// 5. start the session server (with the `session-server` feature), the ticket
    ///    event processor, the transport, the ticket redemption processor and the
    ///    closed-channel handler;
    /// 6. reset tickets left in the `BeingRedeemed` state on incoming channels;
    /// 7. switch to [`HoprState::Running`] and store the background process handles.
    ///
    /// # Arguments
    /// * `cover_traffic` - optional traffic generator handed over to the transport.
    /// * `serve_handler` - handler of incoming sessions (`session-server` feature only).
    ///
    /// # Errors
    /// Returns an error if the node is not in the `Uninitialized` state, if any of
    /// the validations fail, or if any chain, database or transport operation fails.
    ///
    // NOTE(review): the state check and the later store of `Initializing` are separate
    // operations with awaits in between, so two concurrent calls may both pass the check.
    // NOTE(review): an error returned after the state was set to `Initializing` leaves the
    // node in that state, so `run` cannot be retried on the same instance — confirm intended.
    pub async fn run<
        Ct,
        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
    >(
        &self,
        cover_traffic: Option<Ct>,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<HoprTransportIO>
    where
        Ct: TrafficGeneration + Send + Sync + 'static,
    {
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "cannot start the hopr node multiple times".into(),
        )?;

        #[cfg(feature = "testing")]
        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");

        info!(
            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "node is not started, please fund this node",
        );

        // Wait for the node's on-chain address to be funded (see `helpers::wait_for_funds`).
        helpers::wait_for_funds(
            *MIN_NATIVE_BALANCE,
            *SUGGESTED_NATIVE_BALANCE,
            Duration::from_secs(200),
            self.me_onchain(),
            &self.chain_api,
        )
        .await?;

        // Collects the abort handles of all background processes started below.
        let mut processes = AbortableList::<HoprLibProcess>::default();

        info!("starting HOPR node...");
        self.state.store(HoprState::Initializing, Ordering::Relaxed);

        let balance: XDaiBalance = self.get_balance().await?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %self.me_onchain(),
            %balance,
            %minimum_balance,
            "node information"
        );

        // A balance equal to the minimum is rejected as well.
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "cannot start the node without a sufficiently funded wallet".into(),
            ));
        }

        // The configured outgoing ticket price (if any) must not be below the network minimum.
        let network_min_ticket_price = self
            .chain_api
            .minimum_ticket_price()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            )));
        }
        // The configured outgoing winning probability (if any) must not be below the network
        // minimum, unless the `HOPR_TEST_DISABLE_CHECKS` environment variable is set to "true".
        let network_min_win_prob = self
            .chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            )));
        }

        // Lower bound of the discovery channel capacity: twice the number of public
        // accounts known to the chain API plus 100.
        let minimum_capacity = self
            .chain_api
            .count_accounts(AccountSelector {
                public_only: true,
                ..Default::default()
            })
            .await
            .map_err(HoprLibError::chain)?
            .saturating_mul(2)
            .saturating_add(100);

        // The capacity can be overridden by the environment (positive integers only, default
        // 2048), but never below `minimum_capacity`.
        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(2048)
            .max(minimum_capacity);

        debug!(
            capacity = chain_discovery_events_capacity,
            minimum_required = minimum_capacity,
            "creating chain discovery events channel"
        );
        let (indexer_peer_update_tx, indexer_peer_update_rx) =
            channel::<PeerDiscovery>(chain_discovery_events_capacity);

        // Subscribe to chain events, requesting a state sync of public accounts; the stream is
        // abortable so that it can be stopped on shutdown.
        let (announcements_stream, announcements_handle) = futures::stream::abortable(
            self.chain_api
                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
                .map_err(HoprLibError::chain)?,
        );
        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);

        // Forward every announcement event to the transport as a peer discovery event.
        spawn(
            announcements_stream
                .filter_map(|event| {
                    futures::future::ready(event.try_as_announcement().map(|account| {
                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
                    }))
                })
                .map(Ok)
                .forward(indexer_peer_update_tx)
                .inspect(
                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
                ),
        );

        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");

        let safe_addr = self.cfg.safe_module.safe_address;

        // The node's own address must differ from the Safe address.
        if self.me_onchain() == safe_addr {
            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
        }

        // Register the Safe; a Safe that is already registered with the same address is accepted.
        info!(%safe_addr, "registering safe with this node");
        match self.chain_api.register_safe(&safe_addr).await {
            Ok(awaiter) => {
                awaiter.await.map_err(HoprLibError::chain)?;
                info!(%safe_addr, "safe successfully registered with this node");
            }
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
                info!(%safe_addr, "this safe is already registered with this node");
            }
            Err(error) => {
                error!(%safe_addr, %error, "safe registration failed");
                return Err(HoprLibError::chain(error));
            }
        }

        // Multiaddresses are announced only when the node is configured as public;
        // otherwise the announcement is made with an empty list.
        let multiaddresses_to_announce = if self.is_public() {
            self.transport_api.announceable_multiaddresses()
        } else {
            Vec::with_capacity(0)
        };

        // Warn about every address for which `is_public_address` returns false.
        multiaddresses_to_announce
            .iter()
            .filter(|a| !is_public_address(a))
            .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));

        // Announce the node; an already existing announcement is not an error.
        info!(?multiaddresses_to_announce, "announcing node on chain");
        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
            Ok(awaiter) => {
                awaiter.await.map_err(HoprLibError::chain)?;
                info!(?multiaddresses_to_announce, "node has been successfully announced");
            }
            Err(AnnouncementError::AlreadyAnnounced) => {
                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
            }
            Err(error) => {
                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
                return Err(HoprLibError::chain(error));
            }
        }

        // Capacity of the incoming session channel (positive integers only, default 256).
        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(256);

        // The receiving end is consumed only when the `session-server` feature is enabled.
        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
        #[cfg(feature = "session-server")]
        {
            debug!(capacity = incoming_session_channel_capacity, "creating session server");
            processes.insert(
                HoprLibProcess::SessionServer,
                hopr_async_runtime::spawn_as_abortable!(
                    _session_rx
                        .for_each_concurrent(None, move |session| {
                            let serve_handler = serve_handler.clone();
                            async move {
                                let session_id = *session.session.id();
                                match serve_handler.process(session).await {
                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
                                    Err(error) => error!(
                                        ?session_id,
                                        %error,
                                        "client session processing failed"
                                    ),
                                }
                            }
                        })
                        .inspect(|_| tracing::warn!(
                            task = %HoprLibProcess::SessionServer,
                            "long-running background task finished"
                        ))
                ),
            );
        }

        // Ticket events: winning tickets are stored in the database and broadcast to
        // subscribers; rejected tickets are only recorded in the database.
        info!("starting ticket events processor");
        let (tickets_tx, tickets_rx) = channel(1024);
        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
        let node_db = self.node_db.clone();
        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
        spawn(
            tickets_rx
                .filter_map(move |ticket_event| {
                    let node_db = node_db.clone();
                    async move {
                        match ticket_event {
                            TicketEvent::WinningTicket(winning) => {
                                if let Err(error) = node_db.insert_ticket(*winning).await {
                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
                                } else {
                                    tracing::debug!(%winning, "inserted ticket into database");
                                }
                                // The ticket is passed on to subscribers even if the insertion failed.
                                Some(winning)
                            }
                            TicketEvent::RejectedTicket(rejected, issuer) => {
                                if let Some(issuer) = &issuer {
                                    if let Err(error) =
                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
                                    {
                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
                                    } else {
                                        tracing::debug!(%rejected, "marked ticket as rejected");
                                    }
                                } else {
                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
                                }
                                None
                            }
                        }
                    }
                })
                .for_each(move |ticket| {
                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
                    }
                    futures::future::ready(())
                })
                .inspect(|_| {
                    tracing::warn!(
                        task = %HoprLibProcess::TicketEvents,
                        "long-running background task finished"
                    )
                }),
        );

        info!("starting transport");
        let (hopr_socket, transport_processes) = self
            .transport_api
            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
            .await?;
        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

        // Ticket redemption requests: each received selector is redeemed via the chain API.
        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
        // The result of `set` is ignored: if a sender was already stored, it is kept.
        let _ = self.redeem_requests.set(redemption_req_tx);
        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        spawn(redemption_req_rx
            .for_each(move |selector| {
                let chain = chain.clone();
                let db = node_db.clone();
                async move {
                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
                        Ok(res) => debug!(%res, "redemption complete"),
                        Err(error) => error!(%error, "redemption failed"),
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
        );

        // Closed-channel handling: tickets on closed incoming channels are marked as neglected,
        // the outgoing ticket index is removed for closed outgoing channels.
        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        let events = chain.subscribe().map_err(HoprLibError::chain)?;
        spawn(
            futures::stream::Abortable::new(
                events
                    .filter_map(move |event|
                        futures::future::ready(event.try_as_channel_closed())
                    ),
                chain_events_sub_reg
            )
            .for_each(move |closed_channel| {
                let node_db = node_db.clone();
                let chain = chain.clone();
                async move {
                    match closed_channel.direction(chain.me()) {
                        Some(ChannelDirection::Incoming) => {
                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
                                Ok(num_neglected) if num_neglected > 0 => {
                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
                                },
                                Ok(_) => {
                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
                                },
                                Err(error) => {
                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
                                }
                            }
                        },
                        Some(ChannelDirection::Outgoing) => {
                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
                            } else {
                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
                            }
                        }
                        // Any other direction result is ignored.
                        _ => {} }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
        );

        // For every channel whose destination is this node, tickets in the `BeingRedeemed`
        // state with an index at or above the channel's ticket index are set back to `Untouched`.
        let mut channels = self
            .chain_api
            .stream_channels(ChannelSelector {
                destination: self.me_onchain().into(),
                ..Default::default()
            })
            .map_err(HoprLibError::chain)
            .await?;

        while let Some(channel) = channels.next().await {
            self.node_db
                .update_ticket_states_and_fetch(
                    [TicketSelector::from(&channel)
                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
                        .with_index_range(channel.ticket_index..)],
                    AcknowledgedTicketStatus::Untouched,
                )
                .map_err(HoprLibError::db)
                .await?
                .for_each(|ticket| {
                    info!(%ticket, "fixed next out-of-sync ticket");
                    futures::future::ready(())
                })
                .await;
        }

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &self.me_onchain().to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        // Keep the abort handles so that `shutdown` can stop the background processes.
        let _ = self.processes.set(processes);
        Ok(hopr_socket)
    }
747
748 pub fn shutdown(&self) -> Result<(), HoprLibError> {
756 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
757 if let Some(processes) = self.processes.get() {
758 processes.abort_all();
759 }
760 self.state.store(HoprState::Terminated, Ordering::Relaxed);
761 info!("NODE SHUTDOWN COMPLETE");
762 Ok(())
763 }
764
    /// Returns a new stream of winning tickets, created by activating a clone of
    /// the stored inactive broadcast receiver.
    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
        self.winning_ticket_subscribers.1.activate_cloned()
    }
769
    /// Returns the peer ID of this node, converted from its off-chain public key.
    pub fn me_peer_id(&self) -> PeerId {
        (*self.me.public()).into()
    }
775
776 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
778 Ok(self
779 .chain_api
780 .stream_accounts(AccountSelector {
781 public_only: true,
782 ..Default::default()
783 })
784 .map_err(HoprLibError::chain)
785 .await?
786 .map(|entry| {
787 (
788 PeerId::from(entry.public_key),
789 entry.chain_addr,
790 entry.get_multiaddrs().to_vec(),
791 )
792 })
793 .collect()
794 .await)
795 }
796
797 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
801 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
802
803 Ok(self.transport_api.ping(peer).await?)
804 }
805
806 #[cfg(feature = "session-client")]
809 pub async fn connect_to(
810 &self,
811 destination: Address,
812 target: SessionTarget,
813 cfg: SessionClientConfig,
814 ) -> errors::Result<HoprSession> {
815 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
816
817 let backoff = backon::ConstantBuilder::default()
818 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
819 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
820 .with_jitter();
821
822 use backon::Retryable;
823
824 Ok((|| {
825 let cfg = cfg.clone();
826 let target = target.clone();
827 async { self.transport_api.new_session(destination, target, cfg).await }
828 })
829 .retry(backoff)
830 .sleep(backon::FuturesTimerSleeper)
831 .await?)
832 }
833
834 #[cfg(feature = "session-client")]
837 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
838 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
839 Ok(self.transport_api.probe_session(id).await?)
840 }
841
842 #[cfg(feature = "session-client")]
843 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
844 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
845 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
846 }
847
848 #[cfg(feature = "session-client")]
849 pub async fn update_session_surb_balancer_config(
850 &self,
851 id: &SessionId,
852 cfg: SurbBalancerConfig,
853 ) -> errors::Result<()> {
854 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
855 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
856 }
857
    /// Returns the local multiaddresses as reported by the transport.
    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.local_multiaddresses()
    }
862
    /// Returns the multiaddresses the transport reports as listening addresses.
    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.listening_multiaddresses().await
    }
867
    /// Returns the multiaddresses the transport reports as observed for `peer`.
    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
        self.transport_api.network_observed_multiaddresses(peer).await
    }
872
873 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
875 let peer = *peer;
876 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
878
879 match self
880 .chain_api
881 .stream_accounts(AccountSelector {
882 public_only: false,
883 offchain_key: Some(pubkey),
884 ..Default::default()
885 })
886 .map_err(HoprLibError::chain)
887 .await?
888 .next()
889 .await
890 {
891 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
892 None => {
893 error!(%peer, "no information");
894 Ok(vec![])
895 }
896 }
897 }
898
    /// Returns the network health as reported by the transport.
    pub async fn network_health(&self) -> Health {
        self.transport_api.network_health().await
    }
905
906 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
908 Ok(self.transport_api.network_connected_peers().await?)
909 }
910
911 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
913 Ok(self.transport_api.network_peer_info(peer).await?)
914 }
915
916 pub async fn all_network_peers(
918 &self,
919 minimum_quality: f64,
920 ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
921 Ok(
922 futures::stream::iter(self.transport_api.network_connected_peers().await?)
923 .filter_map(|peer| async move {
924 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
925 if info.get_average_quality() >= minimum_quality {
926 Some((peer, info))
927 } else {
928 None
929 }
930 } else {
931 None
932 }
933 })
934 .filter_map(|(peer_id, info)| async move {
935 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
936 Some((address, peer_id, info))
937 })
938 .collect::<Vec<_>>()
939 .await,
940 )
941 }
942
943 pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
946 if let Some(channel) = self
947 .chain_api
948 .channel_by_id(channel_id)
949 .await
950 .map_err(|e| HoprTransportError::Other(e.into()))?
951 {
952 if &channel.destination == self.chain_api.me() {
953 Ok(Some(
954 self.node_db
955 .stream_tickets([&channel])
956 .await
957 .map_err(HoprLibError::db)?
958 .collect()
959 .await,
960 ))
961 } else {
962 Ok(None)
963 }
964 } else {
965 Ok(None)
966 }
967 }
968
969 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
971 Ok(self
972 .node_db
973 .stream_tickets(None::<TicketSelector>)
974 .await
975 .map_err(HoprLibError::db)?
976 .map(|v| v.ticket)
977 .collect()
978 .await)
979 }
980
    /// Returns the ticket statistics from the node database; `None` is passed as
    /// the selector argument.
    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
    }
985
986 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
988 self.node_db
989 .reset_ticket_statistics()
990 .await
991 .map_err(HoprLibError::chain)
992 }
993
    /// Returns the on-chain address of this node as reported by the chain API.
    pub fn me_onchain(&self) -> Address {
        *self.chain_api.me()
    }
998
    /// Returns the minimum ticket price reported by the chain API.
    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
    }
1003
1004 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1006 self.chain_api
1007 .minimum_incoming_ticket_win_prob()
1008 .await
1009 .map_err(HoprLibError::chain)
1010 }
1011
1012 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1014 Ok(self
1015 .chain_api
1016 .stream_accounts(AccountSelector {
1017 public_only: true,
1018 ..Default::default()
1019 })
1020 .map_err(HoprLibError::chain)
1021 .await?
1022 .collect()
1023 .await)
1024 }
1025
    /// Looks up a channel by its ID via the chain API; `None` if the chain API
    /// returns no such channel.
    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
        self.chain_api
            .channel_by_id(channel_id)
            .await
            .map_err(HoprLibError::chain)
    }
1034
    /// Looks up the channel from `src` to `dest` via the chain API; `None` if the
    /// chain API returns no such channel.
    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
        self.chain_api
            .channel_by_parties(src, dest)
            .await
            .map_err(HoprLibError::chain)
    }
1045
1046 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1048 Ok(self
1049 .chain_api
1050 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1051 ChannelStatusDiscriminants::Closed,
1052 ChannelStatusDiscriminants::Open,
1053 ChannelStatusDiscriminants::PendingToClose,
1054 ]))
1055 .map_err(HoprLibError::chain)
1056 .await?
1057 .collect()
1058 .await)
1059 }
1060
1061 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1063 Ok(self
1064 .chain_api
1065 .stream_channels(
1066 ChannelSelector::default()
1067 .with_destination(*dest)
1068 .with_allowed_states(&[
1069 ChannelStatusDiscriminants::Closed,
1070 ChannelStatusDiscriminants::Open,
1071 ChannelStatusDiscriminants::PendingToClose,
1072 ]),
1073 )
1074 .map_err(HoprLibError::chain)
1075 .await?
1076 .collect()
1077 .await)
1078 }
1079
1080 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1082 Ok(self
1083 .chain_api
1084 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1085 ChannelStatusDiscriminants::Closed,
1086 ChannelStatusDiscriminants::Open,
1087 ChannelStatusDiscriminants::PendingToClose,
1088 ]))
1089 .map_err(HoprLibError::chain)
1090 .await?
1091 .collect()
1092 .await)
1093 }
1094
    /// Returns the allowance of the configured Safe address as reported by the
    /// chain API.
    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
        self.chain_api
            .safe_allowance(self.cfg.safe_module.safe_address)
            .await
            .map_err(HoprLibError::chain)
    }
1102
1103 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1107 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1108
1109 self.chain_api
1110 .withdraw(amount, &recipient)
1111 .and_then(identity)
1112 .map_err(HoprLibError::chain)
1113 .await
1114 }
1115
1116 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1120 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1121
1122 self.chain_api
1123 .withdraw(amount, &recipient)
1124 .and_then(identity)
1125 .map_err(HoprLibError::chain)
1126 .await
1127 }
1128
1129 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1130 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1131
1132 let (channel_id, tx_hash) = self
1133 .chain_api
1134 .open_channel(destination, amount)
1135 .and_then(identity)
1136 .map_err(HoprLibError::chain)
1137 .await?;
1138
1139 Ok(OpenChannelResult { tx_hash, channel_id })
1140 }
1141
1142 pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1143 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1144
1145 self.chain_api
1146 .fund_channel(channel_id, amount)
1147 .and_then(identity)
1148 .map_err(HoprLibError::chain)
1149 .await
1150 }
1151
1152 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1153 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1154
1155 let tx_hash = self
1156 .chain_api
1157 .close_channel(channel_id)
1158 .and_then(identity)
1159 .map_err(HoprLibError::chain)
1160 .await?;
1161
1162 Ok(CloseChannelResult { tx_hash })
1163 }
1164
1165 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1166 self.chain_api
1167 .channel_closure_notice_period()
1168 .await
1169 .map_err(HoprLibError::chain)
1170 }
1171
1172 pub fn redemption_requests(
1173 &self,
1174 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1175 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1176
1177 Ok(self
1179 .redeem_requests
1180 .get()
1181 .cloned()
1182 .expect("redeem_requests is not initialized")
1183 .sink_map_err(HoprLibError::other))
1184 }
1185
1186 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1187 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1188
1189 let min_value = min_value.into();
1190
1191 self.chain_api
1192 .stream_channels(
1193 ChannelSelector::default()
1194 .with_destination(self.me_onchain())
1195 .with_allowed_states(&[
1196 ChannelStatusDiscriminants::Open,
1197 ChannelStatusDiscriminants::PendingToClose,
1198 ]),
1199 )
1200 .map_err(HoprLibError::chain)
1201 .await?
1202 .map(|channel| {
1203 Ok(TicketSelector::from(&channel)
1204 .with_amount(min_value..)
1205 .with_index_range(channel.ticket_index..)
1206 .with_state(AcknowledgedTicketStatus::Untouched))
1207 })
1208 .forward(self.redemption_requests()?)
1209 .await?;
1210
1211 Ok(())
1212 }
1213
1214 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1215 &self,
1216 counterparty: &Address,
1217 min_value: B,
1218 ) -> errors::Result<()> {
1219 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1220 .await
1221 }
1222
1223 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1224 &self,
1225 channel_id: &Hash,
1226 min_value: B,
1227 ) -> errors::Result<()> {
1228 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1229
1230 let channel = self
1231 .chain_api
1232 .channel_by_id(channel_id)
1233 .await
1234 .map_err(HoprLibError::chain)?
1235 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1236
1237 self.redemption_requests()?
1238 .send(
1239 TicketSelector::from(channel)
1240 .with_amount(min_value.into()..)
1241 .with_index_range(channel.ticket_index..)
1242 .with_state(AcknowledgedTicketStatus::Untouched),
1243 )
1244 .await?;
1245
1246 Ok(())
1247 }
1248
1249 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1250 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1251
1252 self.redemption_requests()?
1253 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1254 .await?;
1255
1256 Ok(())
1257 }
1258
1259 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1260 let peer_id = *peer_id;
1261 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1263 .await
1264 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1265
1266 self.chain_api
1267 .packet_key_to_chain_key(&pubkey)
1268 .await
1269 .map_err(HoprLibError::chain)
1270 }
1271
1272 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1273 self.chain_api
1274 .chain_key_to_packet_key(address)
1275 .await
1276 .map(|pk| pk.map(|v| v.into()))
1277 .map_err(HoprLibError::chain)
1278 }
1279}
1280
1281impl<Chain, Db> Hopr<Chain, Db> {
1282 pub fn collect_hopr_metrics() -> errors::Result<String> {
1285 cfg_if::cfg_if! {
1286 if #[cfg(all(feature = "prometheus", not(test)))] {
1287 hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1288 } else {
1289 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1290 }
1291 }
1292 }
1293}