1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38#[doc(hidden)]
40pub mod exports {
41 pub use hopr_api as api;
42
43 pub mod types {
44 pub use hopr_internal_types as internal;
45 pub use hopr_primitive_types as primitive;
46 }
47
48 pub mod crypto {
49 pub use hopr_crypto_keypair as keypair;
50 pub use hopr_crypto_types as types;
51 }
52
53 pub mod network {
54 pub use hopr_network_types as types;
55 }
56
57 pub use hopr_transport as transport;
58}
59
60#[doc(hidden)]
62pub mod prelude {
63 pub use super::exports::{
64 crypto::{
65 keypair::key_pair::HoprKeys,
66 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67 },
68 network::types::{
69 prelude::ForeignDataMode,
70 udp::{ConnectedUdpStream, UdpStreamParallelism},
71 },
72 transport::{OffchainPublicKey, socket::HoprSocket},
73 types::primitive::prelude::Address,
74 };
75}
76
77use std::{
78 convert::identity,
79 num::NonZeroUsize,
80 sync::{Arc, OnceLock, atomic::Ordering},
81 time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt, channel::mpsc::channel};
85use hopr_api::{
86 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
87 db::{HoprNodeDbApi, PeerStatus, TicketMarker, TicketSelector},
88};
89use hopr_async_runtime::prelude::spawn;
90pub use hopr_async_runtime::{Abortable, AbortableList};
91pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
92pub use hopr_crypto_types::prelude::*;
93pub use hopr_internal_types::prelude::*;
94pub use hopr_network_types::prelude::*;
95#[cfg(all(feature = "prometheus", not(test)))]
96use hopr_platform::time::native::current_time;
97pub use hopr_primitive_types::prelude::*;
98#[cfg(feature = "runtime-tokio")]
99pub use hopr_transport::transfer_session;
100pub use hopr_transport::*;
101use tracing::{debug, error, info, trace, warn};
102
103pub use crate::{
104 config::SafeModule,
105 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
106 errors::{HoprLibError, HoprStatusError},
107 state::{HoprLibProcess, HoprState},
108 traits::chain::{CloseChannelResult, OpenChannelResult},
109};
110
111#[cfg(all(feature = "prometheus", not(test)))]
112lazy_static::lazy_static! {
113 static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
114 "hopr_start_time",
115 "The unix timestamp in seconds at which the process was started"
116 ).unwrap();
117 static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
118 "hopr_lib_version",
119 "Executed version of hopr-lib",
120 &["version"]
121 ).unwrap();
122 static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
123 "hopr_node_addresses",
124 "Node on-chain and off-chain addresses",
125 &["peerid", "address", "safe_address", "module_address"]
126 ).unwrap();
127}
128
129pub struct DummyCoverTrafficType {
130 #[allow(dead_code)]
131 _unconstructable: (),
132}
133
134impl TrafficGeneration for DummyCoverTrafficType {
135 fn build(
136 self,
137 ) -> (
138 impl futures::Stream<Item = DestinationRouting> + Send,
139 impl futures::Sink<
140 std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
141 Error = impl std::error::Error,
142 > + Send
143 + Sync
144 + Clone
145 + 'static,
146 ) {
147 (
148 futures::stream::empty(),
149 futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
150 )
151 }
152}
153
154#[cfg(feature = "runtime-tokio")]
159pub fn prepare_tokio_runtime(
160 num_cpu_threads: Option<NonZeroUsize>,
161 num_io_threads: Option<NonZeroUsize>,
162) -> anyhow::Result<tokio::runtime::Runtime> {
163 use std::str::FromStr;
164 let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
165
166 hopr_parallelize::cpu::init_thread_pool(
167 num_cpu_threads
168 .map(|v| v.get())
169 .or(avail_parallelism)
170 .ok_or(anyhow::anyhow!(
171 "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
172 environment variable."
173 ))?
174 .max(1),
175 )?;
176
177 Ok(tokio::runtime::Builder::new_multi_thread()
178 .enable_all()
179 .worker_threads(
180 num_io_threads
181 .map(|v| v.get())
182 .or(avail_parallelism)
183 .ok_or(anyhow::anyhow!(
184 "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
185 environment variable."
186 ))?
187 .max(1),
188 )
189 .thread_name("hoprd")
190 .thread_stack_size(
191 std::env::var("HOPRD_THREAD_STACK_SIZE")
192 .ok()
193 .and_then(|v| usize::from_str(&v).ok())
194 .unwrap_or(10 * 1024 * 1024)
195 .max(2 * 1024 * 1024),
196 )
197 .build()?)
198}
199
200pub type HoprTransportIO = socket::HoprSocket<
202 futures::channel::mpsc::Receiver<ApplicationDataIn>,
203 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
204>;
205
206pub struct Hopr<Chain, Db> {
218 me: OffchainKeypair,
219 cfg: config::HoprLibConfig,
220 state: Arc<state::AtomicHoprState>,
221 transport_api: HoprTransport<Db, Chain>,
222 redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
223 node_db: Db,
224 chain_api: Chain,
225 processes: OnceLock<AbortableList<HoprLibProcess>>,
226}
227
228impl<Chain, Db> Hopr<Chain, Db>
229where
230 Chain: HoprChainApi + Clone + Send + Sync + 'static,
231 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
232{
233 pub async fn new(
234 cfg: config::HoprLibConfig,
235 hopr_chain_api: Chain,
236 hopr_node_db: Db,
237 me: &OffchainKeypair,
238 me_onchain: &ChainKeypair,
239 ) -> errors::Result<Self> {
240 if hopr_crypto_random::is_rng_fixed() {
241 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
242 }
243
244 let multiaddress: Multiaddr = (&cfg.host).try_into().map_err(HoprLibError::TransportError)?;
245
246 let my_multiaddresses = vec![multiaddress];
247
248 let hopr_transport_api = HoprTransport::new(
249 me,
250 me_onchain,
251 HoprTransportConfig {
252 transport: cfg.transport.clone(),
253 network: cfg.network_options.clone(),
254 protocol: cfg.protocol,
255 probe: cfg.probe,
256 session: cfg.session,
257 },
258 hopr_node_db.clone(),
259 hopr_chain_api.clone(),
260 my_multiaddresses,
261 );
262
263 #[cfg(all(feature = "prometheus", not(test)))]
264 {
265 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
266 METRIC_HOPR_LIB_VERSION.set(
267 &[const_format::formatcp!("{}", constants::APP_VERSION)],
268 const_format::formatcp!(
269 "{}.{}",
270 env!("CARGO_PKG_VERSION_MAJOR"),
271 env!("CARGO_PKG_VERSION_MINOR")
272 )
273 .parse()
274 .unwrap_or(0.0),
275 );
276
277 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
279 error!(%error, "failed to initialize ticket statistics metrics");
280 }
281 }
282
283 Ok(Self {
284 me: me.clone(),
285 cfg,
286 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
287 transport_api: hopr_transport_api,
288 chain_api: hopr_chain_api,
289 node_db: hopr_node_db,
290 redeem_requests: OnceLock::new(),
291 processes: OnceLock::new(),
292 })
293 }
294
295 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
296 if self.status() == state {
297 Ok(())
298 } else {
299 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
300 }
301 }
302
303 pub fn status(&self) -> HoprState {
304 self.state.load(Ordering::Relaxed)
305 }
306
307 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
308 self.chain_api
309 .get_balance(self.me_onchain())
310 .await
311 .map_err(HoprLibError::chain)
312 }
313
314 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
315 self.chain_api
316 .get_balance(self.cfg.safe_module.safe_address)
317 .await
318 .map_err(HoprLibError::chain)
319 }
320
321 pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
322 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
323 }
324
325 pub fn get_safe_config(&self) -> SafeModule {
326 self.cfg.safe_module.clone()
327 }
328
329 pub fn config(&self) -> &config::HoprLibConfig {
330 &self.cfg
331 }
332
333 #[inline]
334 fn is_public(&self) -> bool {
335 self.cfg.publish
336 }
337
338 pub async fn run<
339 Ct,
340 #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
341 >(
342 &self,
343 cover_traffic: Option<Ct>,
344 #[cfg(feature = "session-server")] serve_handler: T,
345 ) -> errors::Result<HoprTransportIO>
346 where
347 Ct: TrafficGeneration + Send + Sync + 'static,
348 {
349 self.error_if_not_in_state(
350 HoprState::Uninitialized,
351 "cannot start the hopr node multiple times".into(),
352 )?;
353
354 #[cfg(feature = "testing")]
355 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
356
357 info!(
358 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
359 "node is not started, please fund this node",
360 );
361
362 helpers::wait_for_funds(
363 *MIN_NATIVE_BALANCE,
364 *SUGGESTED_NATIVE_BALANCE,
365 Duration::from_secs(200),
366 self.me_onchain(),
367 &self.chain_api,
368 )
369 .await?;
370
371 let mut processes = AbortableList::<HoprLibProcess>::default();
372
373 info!("starting HOPR node...");
374 self.state.store(HoprState::Initializing, Ordering::Relaxed);
375
376 let balance: XDaiBalance = self.get_balance().await?;
377 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
378
379 info!(
380 address = %self.me_onchain(),
381 %balance,
382 %minimum_balance,
383 "node information"
384 );
385
386 if balance.le(&minimum_balance) {
387 return Err(HoprLibError::GeneralError(
388 "cannot start the node without a sufficiently funded wallet".into(),
389 ));
390 }
391
392 let network_min_ticket_price = self
395 .chain_api
396 .minimum_ticket_price()
397 .await
398 .map_err(HoprLibError::chain)?;
399 let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
400 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
401 return Err(HoprLibError::GeneralError(format!(
402 "configured outgoing ticket price is lower than the network minimum ticket price: \
403 {configured_ticket_price:?} < {network_min_ticket_price}"
404 )));
405 }
406 let network_min_win_prob = self
409 .chain_api
410 .minimum_incoming_ticket_win_prob()
411 .await
412 .map_err(HoprLibError::chain)?;
413 let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
414 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
415 && configured_win_prob
416 .and_then(|c| WinningProbability::try_from(c).ok())
417 .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
418 {
419 return Err(HoprLibError::GeneralError(format!(
420 "configured outgoing ticket winning probability is lower than the network minimum winning \
421 probability: {configured_win_prob:?} < {network_min_win_prob}"
422 )));
423 }
424
425 let minimum_capacity = self
428 .chain_api
429 .count_accounts(AccountSelector {
430 public_only: true,
431 ..Default::default()
432 })
433 .await
434 .map_err(HoprLibError::chain)?
435 .saturating_mul(2)
436 .saturating_add(100);
437
438 let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
439 .ok()
440 .and_then(|s| s.trim().parse::<usize>().ok())
441 .filter(|&c| c > 0)
442 .unwrap_or(2048)
443 .max(minimum_capacity);
444
445 debug!(
446 capacity = chain_discovery_events_capacity,
447 minimum_required = minimum_capacity,
448 "creating chain discovery events channel"
449 );
450 let (indexer_peer_update_tx, indexer_peer_update_rx) =
451 channel::<PeerDiscovery>(chain_discovery_events_capacity);
452
453 let (announcements_stream, announcements_handle) = futures::stream::abortable(
456 self.chain_api
457 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
458 .map_err(HoprLibError::chain)?,
459 );
460 processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
461
462 spawn(
463 announcements_stream
464 .filter_map(|event| {
465 futures::future::ready(event.try_as_announcement().map(|account| {
466 PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
467 }))
468 })
469 .map(Ok)
470 .forward(indexer_peer_update_tx)
471 .inspect(
472 |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
473 ),
474 );
475
476 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
477
478 let safe_addr = self.cfg.safe_module.safe_address;
479
480 if self.me_onchain() == safe_addr {
481 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
482 }
483
484 info!(%safe_addr, "registering safe with this node");
485 match self.chain_api.register_safe(&safe_addr).await {
486 Ok(awaiter) => {
487 awaiter.await.map_err(HoprLibError::chain)?;
489 info!(%safe_addr, "safe successfully registered with this node");
490 }
491 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
492 info!(%safe_addr, "this safe is already registered with this node");
493 }
494 Err(error) => {
495 error!(%safe_addr, %error, "safe registration failed");
496 return Err(HoprLibError::chain(error));
497 }
498 }
499
500 let multiaddresses_to_announce = if self.is_public() {
502 self.transport_api.announceable_multiaddresses()
505 } else {
506 Vec::with_capacity(0)
507 };
508
509 multiaddresses_to_announce
511 .iter()
512 .filter(|a| !is_public_address(a))
513 .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
514
515 info!(?multiaddresses_to_announce, "announcing node on chain");
518 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
519 Ok(awaiter) => {
520 awaiter.await.map_err(HoprLibError::chain)?;
522 info!(?multiaddresses_to_announce, "node has been successfully announced");
523 }
524 Err(AnnouncementError::AlreadyAnnounced) => {
525 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
526 }
527 Err(error) => {
528 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
529 return Err(HoprLibError::chain(error));
530 }
531 }
532
533 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
534 .ok()
535 .and_then(|s| s.trim().parse::<usize>().ok())
536 .filter(|&c| c > 0)
537 .unwrap_or(256);
538
539 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
540 #[cfg(feature = "session-server")]
541 {
542 debug!(capacity = incoming_session_channel_capacity, "creating session server");
543 processes.insert(
544 HoprLibProcess::SessionServer,
545 hopr_async_runtime::spawn_as_abortable!(
546 _session_rx
547 .for_each_concurrent(None, move |session| {
548 let serve_handler = serve_handler.clone();
549 async move {
550 let session_id = *session.session.id();
551 match serve_handler.process(session).await {
552 Ok(_) => debug!(?session_id, "client session processed successfully"),
553 Err(error) => error!(
554 ?session_id,
555 %error,
556 "client session processing failed"
557 ),
558 }
559 }
560 })
561 .inspect(|_| tracing::warn!(
562 task = %HoprLibProcess::SessionServer,
563 "long-running background task finished"
564 ))
565 ),
566 );
567 }
568
569 info!("starting transport");
570
571 let (hopr_socket, transport_processes) = self
572 .transport_api
573 .run(cover_traffic, indexer_peer_update_rx, session_tx)
574 .await?;
575 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
576
577 info!("starting outgoing ticket flush process");
578 let (index_flush_stream, index_flush_handle) =
579 futures::stream::abortable(futures_time::stream::interval(Duration::from_secs(5).into()));
580 processes.insert(HoprLibProcess::TicketIndexFlush, index_flush_handle);
581 let node_db = self.node_db.clone();
582 spawn(
583 index_flush_stream
584 .for_each(move |_| {
585 let node_db = node_db.clone();
586 async move {
587 match node_db.persist_outgoing_ticket_indices().await {
588 Ok(count) => trace!(count, "successfully flushed states of outgoing ticket indices"),
589 Err(error) => error!(%error, "Failed to flush ticket indices"),
590 }
591 }
592 })
593 .inspect(|_| {
594 tracing::warn!(
595 task = %HoprLibProcess::TicketIndexFlush,
596 "long-running background task finished"
597 )
598 }),
599 );
600
601 let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
603 let _ = self.redeem_requests.set(redemption_req_tx);
604 let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
605 processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
606 let chain = self.chain_api.clone();
607 let node_db = self.node_db.clone();
608 spawn(redemption_req_rx
609 .for_each(move |selector| {
610 let chain = chain.clone();
611 let db = node_db.clone();
612 async move {
613 match chain.redeem_tickets_via_selectors(&db, [selector]).await {
614 Ok(res) => debug!(%res, "redemption complete"),
615 Err(error) => error!(%error, "redemption failed"),
616 }
617 }
618 })
619 .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
620 );
621
622 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
623 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
624 let chain = self.chain_api.clone();
625 let node_db = self.node_db.clone();
626 let events = chain.subscribe().map_err(HoprLibError::chain)?;
627 spawn(
628 futures::stream::Abortable::new(
629 events
630 .filter_map(move |event|
631 futures::future::ready(event.try_as_channel_closed())
632 ),
633 chain_events_sub_reg
634 )
635 .for_each(move |closed_channel| {
636 let node_db = node_db.clone();
637 let chain = chain.clone();
638 async move {
639 match closed_channel.direction(chain.me()) {
640 Some(ChannelDirection::Incoming) => {
641 match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
642 Ok(num_neglected) if num_neglected > 0 => {
643 warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
644 },
645 Ok(_) => {
646 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
647 },
648 Err(error) => {
649 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
650 }
651 }
652 },
653 Some(ChannelDirection::Outgoing) => {
654 if let Err(error) = node_db.reset_outgoing_ticket_index(closed_channel.get_id()).await {
655 error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
656 } else {
657 debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
658 }
659 }
660 _ => {} }
662 }
663 })
664 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
665 );
666
667 let mut channels = self
672 .chain_api
673 .stream_channels(ChannelSelector {
674 destination: self.me_onchain().into(),
675 ..Default::default()
676 })
677 .map_err(HoprLibError::chain)
678 .await?;
679
680 while let Some(channel) = channels.next().await {
681 self.node_db
682 .update_ticket_states_and_fetch(
683 [TicketSelector::from(&channel)
684 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
685 .with_index_range(channel.ticket_index..)],
686 AcknowledgedTicketStatus::Untouched,
687 )
688 .map_err(HoprLibError::db)
689 .await?
690 .for_each(|ticket| {
691 info!(%ticket, "fixed next out-of-sync ticket");
692 futures::future::ready(())
693 })
694 .await;
695 }
696
697 self.state.store(HoprState::Running, Ordering::Relaxed);
698
699 info!(
700 id = %self.me_peer_id(),
701 version = constants::APP_VERSION,
702 "NODE STARTED AND RUNNING"
703 );
704
705 #[cfg(all(feature = "prometheus", not(test)))]
706 METRIC_HOPR_NODE_INFO.set(
707 &[
708 &self.me.public().to_peerid_str(),
709 &self.me_onchain().to_string(),
710 &self.cfg.safe_module.safe_address.to_string(),
711 &self.cfg.safe_module.module_address.to_string(),
712 ],
713 1.0,
714 );
715
716 let _ = self.processes.set(processes);
717 Ok(hopr_socket)
718 }
719
720 pub fn shutdown(&self) -> Result<(), HoprLibError> {
728 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
729 if let Some(processes) = self.processes.get() {
730 processes.abort_all();
731 }
732 self.state.store(HoprState::Terminated, Ordering::Relaxed);
733 info!("NODE SHUTDOWN COMPLETE");
734 Ok(())
735 }
736
737 pub fn me_peer_id(&self) -> PeerId {
740 (*self.me.public()).into()
741 }
742
743 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
745 Ok(self
746 .chain_api
747 .stream_accounts(AccountSelector {
748 public_only: true,
749 ..Default::default()
750 })
751 .map_err(HoprLibError::chain)
752 .await?
753 .map(|entry| {
754 (
755 PeerId::from(entry.public_key),
756 entry.chain_addr,
757 entry.get_multiaddrs().to_vec(),
758 )
759 })
760 .collect()
761 .await)
762 }
763
764 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
768 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
769
770 Ok(self.transport_api.ping(peer).await?)
771 }
772
773 #[cfg(feature = "session-client")]
776 pub async fn connect_to(
777 &self,
778 destination: Address,
779 target: SessionTarget,
780 cfg: SessionClientConfig,
781 ) -> errors::Result<HoprSession> {
782 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
783
784 let backoff = backon::ConstantBuilder::default()
785 .with_max_times(self.cfg.session.establish_max_retries as usize)
786 .with_delay(self.cfg.session.establish_retry_timeout)
787 .with_jitter();
788
789 use backon::Retryable;
790
791 Ok((|| {
792 let cfg = cfg.clone();
793 let target = target.clone();
794 async { self.transport_api.new_session(destination, target, cfg).await }
795 })
796 .retry(backoff)
797 .sleep(backon::FuturesTimerSleeper)
798 .await?)
799 }
800
801 #[cfg(feature = "session-client")]
804 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
805 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
806 Ok(self.transport_api.probe_session(id).await?)
807 }
808
809 #[cfg(feature = "session-client")]
810 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
811 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
812 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
813 }
814
815 #[cfg(feature = "session-client")]
816 pub async fn update_session_surb_balancer_config(
817 &self,
818 id: &SessionId,
819 cfg: SurbBalancerConfig,
820 ) -> errors::Result<()> {
821 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
822 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
823 }
824
825 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
827 self.transport_api.local_multiaddresses()
828 }
829
830 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
832 self.transport_api.listening_multiaddresses().await
833 }
834
835 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
837 self.transport_api.network_observed_multiaddresses(peer).await
838 }
839
840 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
842 let peer = *peer;
843 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
845
846 match self
847 .chain_api
848 .stream_accounts(AccountSelector {
849 public_only: false,
850 offchain_key: Some(pubkey),
851 ..Default::default()
852 })
853 .map_err(HoprLibError::chain)
854 .await?
855 .next()
856 .await
857 {
858 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
859 None => {
860 error!(%peer, "no information");
861 Ok(vec![])
862 }
863 }
864 }
865
866 pub async fn network_health(&self) -> Health {
870 self.transport_api.network_health().await
871 }
872
873 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
875 Ok(self.transport_api.network_connected_peers().await?)
876 }
877
878 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
880 Ok(self.transport_api.network_peer_info(peer).await?)
881 }
882
883 pub async fn all_network_peers(
885 &self,
886 minimum_quality: f64,
887 ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
888 Ok(
889 futures::stream::iter(self.transport_api.network_connected_peers().await?)
890 .filter_map(|peer| async move {
891 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
892 if info.get_average_quality() >= minimum_quality {
893 Some((peer, info))
894 } else {
895 None
896 }
897 } else {
898 None
899 }
900 })
901 .filter_map(|(peer_id, info)| async move {
902 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
903 Some((address, peer_id, info))
904 })
905 .collect::<Vec<_>>()
906 .await,
907 )
908 }
909
910 pub async fn tickets_in_channel(&self, channel: &prelude::Hash) -> errors::Result<Option<Vec<RedeemableTicket>>> {
913 Ok(self.transport_api.tickets_in_channel(channel).await?)
914 }
915
916 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
918 Ok(self.transport_api.all_tickets().await?)
919 }
920
921 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
923 Ok(self.transport_api.ticket_statistics().await?)
924 }
925
926 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
928 self.node_db
929 .reset_ticket_statistics()
930 .await
931 .map_err(HoprLibError::chain)
932 }
933
934 pub fn me_onchain(&self) -> Address {
936 *self.chain_api.me()
937 }
938
939 pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
941 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
942 }
943
944 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
946 self.chain_api
947 .minimum_incoming_ticket_win_prob()
948 .await
949 .map_err(HoprLibError::chain)
950 }
951
952 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
954 Ok(self
955 .chain_api
956 .stream_accounts(AccountSelector {
957 public_only: true,
958 ..Default::default()
959 })
960 .map_err(HoprLibError::chain)
961 .await?
962 .collect()
963 .await)
964 }
965
966 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
969 self.chain_api
970 .channel_by_id(channel_id)
971 .await
972 .map_err(HoprLibError::chain)
973 }
974
975 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
980 self.chain_api
981 .channel_by_parties(src, dest)
982 .await
983 .map_err(HoprLibError::chain)
984 }
985
986 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
988 Ok(self
989 .chain_api
990 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
991 ChannelStatusDiscriminants::Closed,
992 ChannelStatusDiscriminants::Open,
993 ChannelStatusDiscriminants::PendingToClose,
994 ]))
995 .map_err(HoprLibError::chain)
996 .await?
997 .collect()
998 .await)
999 }
1000
1001 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1003 Ok(self
1004 .chain_api
1005 .stream_channels(
1006 ChannelSelector::default()
1007 .with_destination(*dest)
1008 .with_allowed_states(&[
1009 ChannelStatusDiscriminants::Closed,
1010 ChannelStatusDiscriminants::Open,
1011 ChannelStatusDiscriminants::PendingToClose,
1012 ]),
1013 )
1014 .map_err(HoprLibError::chain)
1015 .await?
1016 .collect()
1017 .await)
1018 }
1019
1020 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1022 Ok(self
1023 .chain_api
1024 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1025 ChannelStatusDiscriminants::Closed,
1026 ChannelStatusDiscriminants::Open,
1027 ChannelStatusDiscriminants::PendingToClose,
1028 ]))
1029 .map_err(HoprLibError::chain)
1030 .await?
1031 .collect()
1032 .await)
1033 }
1034
1035 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1037 self.chain_api
1038 .safe_allowance(self.cfg.safe_module.safe_address)
1039 .await
1040 .map_err(HoprLibError::chain)
1041 }
1042
1043 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1047 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1048
1049 self.chain_api
1050 .withdraw(amount, &recipient)
1051 .and_then(identity)
1052 .map_err(HoprLibError::chain)
1053 .await
1054 }
1055
1056 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1060 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1061
1062 self.chain_api
1063 .withdraw(amount, &recipient)
1064 .and_then(identity)
1065 .map_err(HoprLibError::chain)
1066 .await
1067 }
1068
1069 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1070 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1071
1072 let (channel_id, tx_hash) = self
1073 .chain_api
1074 .open_channel(destination, amount)
1075 .and_then(identity)
1076 .map_err(HoprLibError::chain)
1077 .await?;
1078
1079 Ok(OpenChannelResult { tx_hash, channel_id })
1080 }
1081
1082 pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1083 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1084
1085 self.chain_api
1086 .fund_channel(channel_id, amount)
1087 .and_then(identity)
1088 .map_err(HoprLibError::chain)
1089 .await
1090 }
1091
1092 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1093 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1094
1095 let tx_hash = self
1096 .chain_api
1097 .close_channel(channel_id)
1098 .and_then(identity)
1099 .map_err(HoprLibError::chain)
1100 .await?;
1101
1102 Ok(CloseChannelResult { tx_hash })
1103 }
1104
1105 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1106 self.chain_api
1107 .channel_closure_notice_period()
1108 .await
1109 .map_err(HoprLibError::chain)
1110 }
1111
1112 pub fn redemption_requests(
1113 &self,
1114 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1115 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1116
1117 Ok(self
1119 .redeem_requests
1120 .get()
1121 .cloned()
1122 .expect("redeem_requests is not initialized")
1123 .sink_map_err(HoprLibError::other))
1124 }
1125
1126 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1127 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1128
1129 let min_value = min_value.into();
1130
1131 self.chain_api
1132 .stream_channels(
1133 ChannelSelector::default()
1134 .with_destination(self.me_onchain())
1135 .with_allowed_states(&[
1136 ChannelStatusDiscriminants::Open,
1137 ChannelStatusDiscriminants::PendingToClose,
1138 ]),
1139 )
1140 .map_err(HoprLibError::chain)
1141 .await?
1142 .map(|channel| {
1143 Ok(TicketSelector::from(&channel)
1144 .with_amount(min_value..)
1145 .with_index_range(channel.ticket_index..)
1146 .with_state(AcknowledgedTicketStatus::Untouched))
1147 })
1148 .forward(self.redemption_requests()?)
1149 .await?;
1150
1151 Ok(())
1152 }
1153
1154 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1155 &self,
1156 counterparty: &Address,
1157 min_value: B,
1158 ) -> errors::Result<()> {
1159 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1160 .await
1161 }
1162
1163 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1164 &self,
1165 channel_id: &Hash,
1166 min_value: B,
1167 ) -> errors::Result<()> {
1168 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1169
1170 let channel = self
1171 .chain_api
1172 .channel_by_id(channel_id)
1173 .await
1174 .map_err(HoprLibError::chain)?
1175 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1176
1177 self.redemption_requests()?
1178 .send(
1179 TicketSelector::from(channel)
1180 .with_amount(min_value.into()..)
1181 .with_index_range(channel.ticket_index..)
1182 .with_state(AcknowledgedTicketStatus::Untouched),
1183 )
1184 .await?;
1185
1186 Ok(())
1187 }
1188
1189 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1190 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1191
1192 self.redemption_requests()?
1193 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1194 .await?;
1195
1196 Ok(())
1197 }
1198
1199 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1200 let peer_id = *peer_id;
1201 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1203 .await
1204 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1205
1206 self.chain_api
1207 .packet_key_to_chain_key(&pubkey)
1208 .await
1209 .map_err(HoprLibError::chain)
1210 }
1211
1212 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1213 self.chain_api
1214 .chain_key_to_packet_key(address)
1215 .await
1216 .map(|pk| pk.map(|v| v.into()))
1217 .map_err(HoprLibError::chain)
1218 }
1219}
1220
1221impl<Chain, Db> Hopr<Chain, Db> {
1222 pub fn collect_hopr_metrics() -> errors::Result<String> {
1225 cfg_if::cfg_if! {
1226 if #[cfg(all(feature = "prometheus", not(test)))] {
1227 hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1228 } else {
1229 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1230 }
1231 }
1232 }
1233}