1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
#[doc(hidden)]
/// Re-exports of the underlying HOPR crates under short, namespaced aliases,
/// so that downstream code can reach them through this crate.
pub mod exports {
    pub use hopr_api as api;
    /// Aliases for the type-definition crates.
    pub mod types {
        pub use hopr_internal_types as internal;
        pub use hopr_primitive_types as primitive;
    }
    /// Aliases for the cryptography crates.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    /// Aliases for the networking crates.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
57
#[doc(hidden)]
/// A small selection of commonly used items, re-exported through the aliases
/// defined in [`exports`].
pub mod prelude {
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
74
75use std::{
76 convert::identity,
77 num::NonZeroUsize,
78 sync::{Arc, OnceLock, atomic::Ordering},
79 time::Duration,
80};
81
82use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt, channel::mpsc::channel};
83use hopr_api::{
84 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85 db::{HoprNodeDbApi, PeerStatus, TicketMarker, TicketSelector},
86};
87use hopr_async_runtime::prelude::spawn;
88pub use hopr_async_runtime::{Abortable, AbortableList};
89pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
90pub use hopr_crypto_types::prelude::*;
91pub use hopr_internal_types::prelude::*;
92pub use hopr_network_types::prelude::*;
93#[cfg(all(feature = "prometheus", not(test)))]
94use hopr_platform::time::native::current_time;
95pub use hopr_primitive_types::prelude::*;
96#[cfg(feature = "runtime-tokio")]
97pub use hopr_transport::transfer_session;
98pub use hopr_transport::*;
99use tracing::{debug, error, info, trace, warn};
100
101pub use crate::{
102 config::SafeModule,
103 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
104 errors::{HoprLibError, HoprStatusError},
105 state::{HoprLibProcess, HoprState},
106 traits::chain::{CloseChannelResult, OpenChannelResult},
107};
108
// Metrics are only compiled in when the `prometheus` feature is enabled and the crate
// is not being built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge with the process start time; written once in `Hopr::new`.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Gauge labelled with the application version; written once in `Hopr::new`.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Gauge labelled with the node identity and Safe/module addresses; written at the end of `Hopr::run`.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
126
/// Placeholder [`TrafficGeneration`] implementor.
///
/// The only field is private, so values of this type cannot be constructed outside of
/// this module; the type is presumably meant to be named as the `Ct` type parameter of
/// `Hopr::run` when passing `None` for the cover traffic — TODO confirm with callers.
pub struct DummyCoverTrafficType {
    #[allow(dead_code)]
    // Private unit field whose sole purpose is to prevent construction from outside.
    _unconstructable: (),
}
131
132impl TrafficGeneration for DummyCoverTrafficType {
133 fn build(
134 self,
135 ) -> (
136 impl futures::Stream<Item = DestinationRouting> + Send,
137 impl futures::Sink<
138 std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>,
139 Error = impl std::error::Error,
140 > + Send
141 + Sync
142 + Clone
143 + 'static,
144 ) {
145 (
146 futures::stream::empty(),
147 futures::sink::drain::<std::result::Result<hopr_transport::Telemetry, hopr_transport::ProbeError>>(),
148 )
149 }
150}
151
#[cfg(feature = "runtime-tokio")]
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// # Arguments
/// * `num_cpu_threads` - number of threads for the CPU-bound thread pool
///   (`hopr_parallelize`); defaults to half of the available parallelism.
/// * `num_io_threads` - number of Tokio worker threads; defaults to half of the
///   available parallelism.
///
/// Both thread counts are clamped to be at least 1. The worker stack size is read
/// from the `HOPRD_THREAD_STACK_SIZE` environment variable (bytes), defaulting to
/// 10 MiB and never going below 2 MiB.
///
/// # Errors
/// Fails if a thread count is neither given nor derivable from the available
/// parallelism, if the CPU thread pool cannot be initialized, or if the runtime
/// cannot be built.
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    // Stack size used when `HOPRD_THREAD_STACK_SIZE` is unset or cannot be parsed.
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    // Lower bound enforced on the worker thread stack size.
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Fallback for both pools: half of what the OS reports (may be 0, hence `.max(1)` below).
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error is built lazily so that nothing is allocated on the success path.
    let cpu_threads = num_cpu_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    // Deliberately resolved after the CPU pool is initialized, matching the original ordering.
    let io_threads = num_io_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
197
/// Socket type returned by `Hopr::run`.
///
/// It is a [`socket::HoprSocket`] parameterized with a `futures` mpsc receiver of
/// [`ApplicationDataIn`] and a `futures` mpsc sender of
/// `(DestinationRouting, ApplicationDataOut)` pairs.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
203
/// The HOPR node object, bundling the node identity, configuration, transport layer,
/// node database and on-chain API.
///
/// It is generic over the chain API implementation (`Chain`) and the node database
/// implementation (`Db`).
pub struct Hopr<Chain, Db> {
    /// Off-chain keypair of this node; the peer id is derived from its public key.
    me: OffchainKeypair,
    /// Configuration the node was created with.
    cfg: config::HoprLibConfig,
    /// Current lifecycle state of the node.
    state: Arc<state::AtomicHoprState>,
    /// Transport layer instance created in `new`.
    transport_api: HoprTransport<Db, Chain>,
    /// Sender half of the ticket redemption request queue; set once by `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Handle to the node database.
    node_db: Db,
    /// Handle to the on-chain API.
    chain_api: Chain,
    /// Abort handles of the background processes; set once by `run`, aborted by `shutdown`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
225
226impl<Chain, Db> Hopr<Chain, Db>
227where
228 Chain: HoprChainApi + Clone + Send + Sync + 'static,
229 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
230{
231 pub async fn new(
232 cfg: config::HoprLibConfig,
233 hopr_chain_api: Chain,
234 hopr_node_db: Db,
235 me: &OffchainKeypair,
236 me_onchain: &ChainKeypair,
237 ) -> errors::Result<Self> {
238 if hopr_crypto_random::is_rng_fixed() {
239 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
240 }
241
242 let multiaddress: Multiaddr = (&cfg.host).try_into().map_err(HoprLibError::TransportError)?;
243
244 let my_multiaddresses = vec![multiaddress];
245
246 let hopr_transport_api = HoprTransport::new(
247 me,
248 me_onchain,
249 HoprTransportConfig {
250 transport: cfg.transport.clone(),
251 network: cfg.network_options.clone(),
252 protocol: cfg.protocol,
253 probe: cfg.probe,
254 session: cfg.session,
255 },
256 hopr_node_db.clone(),
257 hopr_chain_api.clone(),
258 my_multiaddresses,
259 );
260
261 #[cfg(all(feature = "prometheus", not(test)))]
262 {
263 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
264 METRIC_HOPR_LIB_VERSION.set(
265 &[const_format::formatcp!("{}", constants::APP_VERSION)],
266 const_format::formatcp!(
267 "{}.{}",
268 env!("CARGO_PKG_VERSION_MAJOR"),
269 env!("CARGO_PKG_VERSION_MINOR")
270 )
271 .parse()
272 .unwrap_or(0.0),
273 );
274
275 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
277 error!(%error, "failed to initialize ticket statistics metrics");
278 }
279 }
280
281 Ok(Self {
282 me: me.clone(),
283 cfg,
284 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
285 transport_api: hopr_transport_api,
286 chain_api: hopr_chain_api,
287 node_db: hopr_node_db,
288 redeem_requests: OnceLock::new(),
289 processes: OnceLock::new(),
290 })
291 }
292
293 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
294 if self.status() == state {
295 Ok(())
296 } else {
297 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
298 }
299 }
300
    /// Returns the current lifecycle state of the node.
    ///
    /// The state is read with relaxed atomic ordering.
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
304
305 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
306 self.chain_api
307 .get_balance(self.me_onchain())
308 .await
309 .map_err(HoprLibError::chain)
310 }
311
312 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
313 self.chain_api
314 .get_balance(self.cfg.safe_module.safe_address)
315 .await
316 .map_err(HoprLibError::chain)
317 }
318
    /// Returns the [`ChainInfo`] reported by the chain API.
    ///
    /// # Errors
    /// Any chain API failure is returned as a chain error.
    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
    }
322
    /// Returns a clone of the Safe/module part of the node configuration.
    pub fn get_safe_config(&self) -> SafeModule {
        self.cfg.safe_module.clone()
    }
326
    /// Returns a reference to the configuration the node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
330
    #[inline]
    /// Returns the `publish` flag of the configuration.
    ///
    /// `run` uses it to decide whether any multiaddresses are announced on chain.
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
335
    /// Starts the node: performs the pre-flight checks, registers and announces the
    /// node on chain, spawns all background processes and starts the transport.
    ///
    /// # Arguments
    /// * `cover_traffic` - optional traffic generator that is passed on to the transport.
    /// * `serve_handler` - (only with the `session-server` feature) handler that
    ///   processes every incoming session.
    ///
    /// # Returns
    /// The transport socket ([`HoprTransportIO`]) once the node reached
    /// [`HoprState::Running`].
    ///
    /// # Errors
    /// Fails if the node is not in the [`HoprState::Uninitialized`] state, if the node
    /// balance does not exceed the minimum, if the configured outgoing ticket price or
    /// winning probability is below the network minimum, if the node address equals
    /// the Safe address, or if any chain, DB or transport operation fails.
    ///
    /// NOTE(review): on an error after the state was switched to `Initializing`, the
    /// state is not reset, so `run` cannot be retried on the same instance.
    pub async fn run<
        Ct,
        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
    >(
        &self,
        cover_traffic: Option<Ct>,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<HoprTransportIO>
    where
        Ct: TrafficGeneration + Send + Sync + 'static,
    {
        // The node can be started only once.
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "cannot start the hopr node multiple times".into(),
        )?;

        #[cfg(feature = "testing")]
        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");

        info!(
            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "node is not started, please fund this node",
        );

        // NOTE(review): presumably waits (up to 200 s) until the node address holds enough
        // native funds — confirm against `helpers::wait_for_funds`.
        helpers::wait_for_funds(
            *MIN_NATIVE_BALANCE,
            *SUGGESTED_NATIVE_BALANCE,
            Duration::from_secs(200),
            self.me_onchain(),
            &self.chain_api,
        )
        .await?;

        // Collects the abort handles of everything spawned below.
        let mut processes = AbortableList::<HoprLibProcess>::default();

        info!("starting HOPR node...");
        self.state.store(HoprState::Initializing, Ordering::Relaxed);

        let balance: XDaiBalance = self.get_balance().await?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %self.me_onchain(),
            %balance,
            %minimum_balance,
            "node information"
        );

        // The balance must be strictly greater than the minimum.
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "cannot start the node without a sufficiently funded wallet".into(),
            ));
        }

        // A configured outgoing ticket price must not be lower than the network minimum.
        let network_min_ticket_price = self
            .chain_api
            .minimum_ticket_price()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            )));
        }
        // A configured outgoing winning probability must not be lower than the network minimum.
        // The check is skipped when the HOPR_TEST_DISABLE_CHECKS environment variable equals
        // "true" (case-insensitively), and also when the configured value cannot be converted
        // into a `WinningProbability`.
        let network_min_win_prob = self
            .chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob
                .and_then(|c| WinningProbability::try_from(c).ok())
                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            )));
        }

        // Lower bound for the discovery channel capacity: twice the number of public accounts
        // plus 100 (saturating).
        let minimum_capacity = self
            .chain_api
            .count_accounts(AccountSelector {
                public_only: true,
                ..Default::default()
            })
            .await
            .map_err(HoprLibError::chain)?
            .saturating_mul(2)
            .saturating_add(100);

        // The capacity can be overridden by the environment (positive integers only, default 2048),
        // but never drops below the computed minimum.
        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(2048)
            .max(minimum_capacity);

        debug!(
            capacity = chain_discovery_events_capacity,
            minimum_required = minimum_capacity,
            "creating chain discovery events channel"
        );
        let (indexer_peer_update_tx, indexer_peer_update_rx) =
            channel::<PeerDiscovery>(chain_discovery_events_capacity);

        // Subscribe to chain events (with a state sync of public accounts) and make the
        // subscription abortable.
        let (announcements_stream, announcements_handle) = futures::stream::abortable(
            self.chain_api
                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
                .map_err(HoprLibError::chain)?,
        );
        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);

        // Background task: turn every announcement event into a `PeerDiscovery::Announce`
        // and forward it to the receiver that is later handed to the transport.
        spawn(
            announcements_stream
                .filter_map(|event| {
                    futures::future::ready(event.try_as_announcement().map(|account| {
                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
                    }))
                })
                .map(Ok)
                .forward(indexer_peer_update_tx)
                .inspect(
                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
                ),
        );

        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");

        let safe_addr = self.cfg.safe_module.safe_address;

        // The node's own address cannot act as its Safe.
        if self.me_onchain() == safe_addr {
            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
        }

        // Register the Safe with this node; an already registered, identical Safe is not an error.
        info!(%safe_addr, "registering safe with this node");
        match self.chain_api.register_safe(&safe_addr).await {
            Ok(awaiter) => {
                // Wait for the registration itself to complete.
                awaiter.await.map_err(HoprLibError::chain)?;
                info!(%safe_addr, "safe successfully registered with this node");
            }
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
                info!(%safe_addr, "this safe is already registered with this node");
            }
            Err(error) => {
                error!(%safe_addr, %error, "safe registration failed");
                return Err(HoprLibError::chain(error));
            }
        }

        // Only public nodes announce multiaddresses; the announcement below is made either way,
        // with an empty list for non-public nodes.
        let multiaddresses_to_announce = if self.is_public() {
            self.transport_api.announceable_multiaddresses()
        } else {
            Vec::with_capacity(0)
        };

        // Warn about every address that is not considered public.
        multiaddresses_to_announce
            .iter()
            .filter(|a| !is_public_address(a))
            .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));

        // Announce the node on chain; an existing announcement is not an error.
        info!(?multiaddresses_to_announce, "announcing node on chain");
        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
            Ok(awaiter) => {
                // Wait for the announcement itself to complete.
                awaiter.await.map_err(HoprLibError::chain)?;
                info!(?multiaddresses_to_announce, "node has been successfully announced");
            }
            Err(AnnouncementError::AlreadyAnnounced) => {
                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
            }
            Err(error) => {
                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
                return Err(HoprLibError::chain(error));
            }
        }

        // Capacity of the incoming session queue: positive integer from the environment, default 256.
        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(256);

        // Without the `session-server` feature the receiver is unused and dropped at the end of `run`.
        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
        #[cfg(feature = "session-server")]
        {
            debug!(capacity = incoming_session_channel_capacity, "creating session server");
            // Background task: hand every incoming session to a clone of the handler,
            // with no limit on the number of sessions processed concurrently.
            processes.insert(
                HoprLibProcess::SessionServer,
                hopr_async_runtime::spawn_as_abortable!(
                    _session_rx
                        .for_each_concurrent(None, move |session| {
                            let serve_handler = serve_handler.clone();
                            async move {
                                let session_id = *session.session.id();
                                match serve_handler.process(session).await {
                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
                                    Err(error) => error!(
                                        ?session_id,
                                        %error,
                                        "client session processing failed"
                                    ),
                                }
                            }
                        })
                        .inspect(|_| tracing::warn!(
                            task = %HoprLibProcess::SessionServer,
                            "long-running background task finished"
                        ))
                ),
            );
        }

        info!("starting transport");

        // Start the transport and adopt its processes under the `Transport` variant.
        let (hopr_socket, transport_processes) = self
            .transport_api
            .run(cover_traffic, indexer_peer_update_rx, session_tx)
            .await?;
        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

        // Background task: persist the outgoing ticket indices every 5 seconds.
        info!("starting outgoing ticket flush process");
        let (index_flush_stream, index_flush_handle) =
            futures::stream::abortable(futures_time::stream::interval(Duration::from_secs(5).into()));
        processes.insert(HoprLibProcess::TicketIndexFlush, index_flush_handle);
        let node_db = self.node_db.clone();
        spawn(
            index_flush_stream
                .for_each(move |_| {
                    let node_db = node_db.clone();
                    async move {
                        match node_db.persist_outgoing_ticket_indices().await {
                            Ok(count) => trace!(count, "successfully flushed states of outgoing ticket indices"),
                            Err(error) => error!(%error, "Failed to flush ticket indices"),
                        }
                    }
                })
                .inspect(|_| {
                    tracing::warn!(
                        task = %HoprLibProcess::TicketIndexFlush,
                        "long-running background task finished"
                    )
                }),
        );

        // Background task: process ticket redemption requests one at a time.
        // The sender half is stored so that `redemption_requests` can hand out clones of it.
        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
        let _ = self.redeem_requests.set(redemption_req_tx);
        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        spawn(redemption_req_rx
            .for_each(move |selector| {
                let chain = chain.clone();
                let db = node_db.clone();
                async move {
                    match chain.redeem_tickets_via_selector(&db, selector).await {
                        Ok(res) => debug!(%res, "redemption complete"),
                        Err(error) => error!(%error, "redemption failed"),
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
        );

        // Background task: whenever an incoming channel gets closed, mark its tickets as neglected.
        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        let events = chain.subscribe().map_err(HoprLibError::chain)?;
        spawn(
            futures::stream::Abortable::new(
                events
                    .filter_map(move |event|
                        futures::future::ready(
                            event
                                .try_as_channel_closed()
                                .filter(|channel| channel.direction(chain.me()) == Some(ChannelDirection::Incoming))
                        )
                    ),
                chain_events_sub_reg
            )
            .for_each(move |closed_channel| {
                let node_db = node_db.clone();
                async move {
                    match node_db.mark_tickets_as(closed_channel.into(), TicketMarker::Neglected).await {
                        Ok(num_neglected) if num_neglected > 0 => {
                            warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
                        },
                        Ok(_) => {
                            debug!(%closed_channel, "no neglected tickets on incoming closed channel");
                        },
                        Err(error) => {
                            error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
                        }
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
        );

        // Go through all channels whose destination is this node...
        let mut channels = self
            .chain_api
            .stream_channels(ChannelSelector {
                destination: self.me_onchain().into(),
                ..Default::default()
            })
            .map_err(HoprLibError::chain)
            .await?;

        // ...and reset tickets left in the `BeingRedeemed` state (with an index at or above the
        // channel's current ticket index) back to `Untouched`.
        while let Some(channel) = channels.next().await {
            self.node_db
                .update_ticket_states_and_fetch(
                    TicketSelector::from(&channel)
                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
                        .with_index_range(channel.ticket_index.as_u64()..),
                    AcknowledgedTicketStatus::Untouched,
                )
                .map_err(HoprLibError::db)
                .await?
                .for_each(|ticket| {
                    info!(%ticket, "fixed next out-of-sync ticket");
                    futures::future::ready(())
                })
                .await;
        }

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &self.me_onchain().to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        // Keep the abort handles so that `shutdown` can stop all background processes.
        let _ = self.processes.set(processes);
        Ok(hopr_socket)
    }
708
    /// Stops the node by aborting all background processes registered by `run` and
    /// switching the state to [`HoprState::Terminated`].
    ///
    /// # Errors
    /// Returns a status error if the node is not in the [`HoprState::Running`] state.
    pub fn shutdown(&self) -> Result<(), HoprLibError> {
        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
        // `processes` is populated by `run` before the state becomes `Running`.
        if let Some(processes) = self.processes.get() {
            processes.abort_all();
        }
        self.state.store(HoprState::Terminated, Ordering::Relaxed);
        info!("NODE SHUTDOWN COMPLETE");
        Ok(())
    }
725
    /// Returns the peer id of this node, converted from its off-chain public key.
    pub fn me_peer_id(&self) -> PeerId {
        (*self.me.public()).into()
    }
731
732 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
734 Ok(self
735 .chain_api
736 .stream_accounts(AccountSelector {
737 public_only: true,
738 ..Default::default()
739 })
740 .map_err(HoprLibError::chain)
741 .await?
742 .map(|entry| {
743 (
744 PeerId::from(entry.public_key),
745 entry.chain_addr,
746 entry.get_multiaddrs().to_vec(),
747 )
748 })
749 .collect()
750 .await)
751 }
752
753 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
757 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
758
759 Ok(self.transport_api.ping(peer).await?)
760 }
761
762 #[cfg(feature = "session-client")]
765 pub async fn connect_to(
766 &self,
767 destination: Address,
768 target: SessionTarget,
769 cfg: SessionClientConfig,
770 ) -> errors::Result<HoprSession> {
771 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
772
773 let backoff = backon::ConstantBuilder::default()
774 .with_max_times(self.cfg.session.establish_max_retries as usize)
775 .with_delay(self.cfg.session.establish_retry_timeout)
776 .with_jitter();
777
778 use backon::Retryable;
779
780 Ok((|| {
781 let cfg = cfg.clone();
782 let target = target.clone();
783 async { self.transport_api.new_session(destination, target, cfg).await }
784 })
785 .retry(backoff)
786 .sleep(backon::FuturesTimerSleeper)
787 .await?)
788 }
789
790 #[cfg(feature = "session-client")]
793 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
794 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
795 Ok(self.transport_api.probe_session(id).await?)
796 }
797
798 #[cfg(feature = "session-client")]
799 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
800 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
801 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
802 }
803
804 #[cfg(feature = "session-client")]
805 pub async fn update_session_surb_balancer_config(
806 &self,
807 id: &SessionId,
808 cfg: SurbBalancerConfig,
809 ) -> errors::Result<()> {
810 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
811 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
812 }
813
    /// Returns the local multiaddresses as reported by the transport layer.
    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.local_multiaddresses()
    }
818
    /// Returns the multiaddresses the transport layer reports as being listened on.
    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.listening_multiaddresses().await
    }
823
    /// Returns the multiaddresses the transport layer has observed for the given peer.
    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
        self.transport_api.network_observed_multiaddresses(peer).await
    }
828
829 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
831 let peer = *peer;
832 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
834
835 match self
836 .chain_api
837 .stream_accounts(AccountSelector {
838 public_only: false,
839 offchain_key: Some(pubkey),
840 ..Default::default()
841 })
842 .map_err(HoprLibError::chain)
843 .await?
844 .next()
845 .await
846 {
847 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
848 None => {
849 error!(%peer, "no information");
850 Ok(vec![])
851 }
852 }
853 }
854
    /// Returns the network [`Health`] as reported by the transport layer.
    pub async fn network_health(&self) -> Health {
        self.transport_api.network_health().await
    }
861
862 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
864 Ok(self.transport_api.network_connected_peers().await?)
865 }
866
867 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
869 Ok(self.transport_api.network_peer_info(peer).await?)
870 }
871
872 pub async fn all_network_peers(
874 &self,
875 minimum_quality: f64,
876 ) -> errors::Result<Vec<(Option<Address>, PeerId, PeerStatus)>> {
877 Ok(
878 futures::stream::iter(self.transport_api.network_connected_peers().await?)
879 .filter_map(|peer| async move {
880 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
881 if info.get_average_quality() >= minimum_quality {
882 Some((peer, info))
883 } else {
884 None
885 }
886 } else {
887 None
888 }
889 })
890 .filter_map(|(peer_id, info)| async move {
891 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
892 Some((address, peer_id, info))
893 })
894 .collect::<Vec<_>>()
895 .await,
896 )
897 }
898
899 pub async fn tickets_in_channel(&self, channel: &prelude::Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
902 Ok(self.transport_api.tickets_in_channel(channel).await?)
903 }
904
905 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
907 Ok(self.transport_api.all_tickets().await?)
908 }
909
910 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
912 Ok(self.transport_api.ticket_statistics().await?)
913 }
914
915 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
917 self.node_db
918 .reset_ticket_statistics()
919 .await
920 .map_err(HoprLibError::chain)
921 }
922
    /// Returns the on-chain address of this node as reported by the chain API.
    pub fn me_onchain(&self) -> Address {
        *self.chain_api.me()
    }
927
    /// Returns the minimum ticket price reported by the chain API.
    ///
    /// # Errors
    /// Any chain API failure is returned as a chain error.
    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
    }
932
933 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
935 self.chain_api
936 .minimum_incoming_ticket_win_prob()
937 .await
938 .map_err(HoprLibError::chain)
939 }
940
941 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
943 Ok(self
944 .chain_api
945 .stream_accounts(AccountSelector {
946 public_only: true,
947 ..Default::default()
948 })
949 .map_err(HoprLibError::chain)
950 .await?
951 .collect()
952 .await)
953 }
954
955 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
958 self.chain_api
959 .channel_by_id(channel_id)
960 .await
961 .map_err(HoprLibError::chain)
962 }
963
964 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
969 self.chain_api
970 .channel_by_parties(src, dest)
971 .await
972 .map_err(HoprLibError::chain)
973 }
974
975 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
977 Ok(self
978 .chain_api
979 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
980 ChannelStatusDiscriminants::Closed,
981 ChannelStatusDiscriminants::Open,
982 ChannelStatusDiscriminants::PendingToClose,
983 ]))
984 .map_err(HoprLibError::chain)
985 .await?
986 .collect()
987 .await)
988 }
989
990 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
992 Ok(self
993 .chain_api
994 .stream_channels(
995 ChannelSelector::default()
996 .with_destination(*dest)
997 .with_allowed_states(&[
998 ChannelStatusDiscriminants::Closed,
999 ChannelStatusDiscriminants::Open,
1000 ChannelStatusDiscriminants::PendingToClose,
1001 ]),
1002 )
1003 .map_err(HoprLibError::chain)
1004 .await?
1005 .collect()
1006 .await)
1007 }
1008
1009 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1011 Ok(self
1012 .chain_api
1013 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1014 ChannelStatusDiscriminants::Closed,
1015 ChannelStatusDiscriminants::Open,
1016 ChannelStatusDiscriminants::PendingToClose,
1017 ]))
1018 .map_err(HoprLibError::chain)
1019 .await?
1020 .collect()
1021 .await)
1022 }
1023
1024 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1026 self.chain_api
1027 .safe_allowance(self.cfg.safe_module.safe_address)
1028 .await
1029 .map_err(HoprLibError::chain)
1030 }
1031
1032 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1036 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1037
1038 self.chain_api
1039 .withdraw(amount, &recipient)
1040 .and_then(identity)
1041 .map_err(HoprLibError::chain)
1042 .await
1043 }
1044
1045 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1049 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1050
1051 self.chain_api
1052 .withdraw(amount, &recipient)
1053 .and_then(identity)
1054 .map_err(HoprLibError::chain)
1055 .await
1056 }
1057
1058 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1059 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1060
1061 let (channel_id, tx_hash) = self
1062 .chain_api
1063 .open_channel(destination, amount)
1064 .and_then(identity)
1065 .map_err(HoprLibError::chain)
1066 .await?;
1067
1068 Ok(OpenChannelResult { tx_hash, channel_id })
1069 }
1070
1071 pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1072 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1073
1074 self.chain_api
1075 .fund_channel(channel_id, amount)
1076 .and_then(identity)
1077 .map_err(HoprLibError::chain)
1078 .await
1079 }
1080
1081 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1082 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1083
1084 let tx_hash = self
1085 .chain_api
1086 .close_channel(channel_id)
1087 .and_then(identity)
1088 .map_err(HoprLibError::chain)
1089 .await?;
1090
1091 Ok(CloseChannelResult { tx_hash })
1092 }
1093
1094 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1095 self.chain_api
1096 .channel_closure_notice_period()
1097 .await
1098 .map_err(HoprLibError::chain)
1099 }
1100
1101 pub fn redemption_requests(
1102 &self,
1103 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1104 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1105
1106 Ok(self
1108 .redeem_requests
1109 .get()
1110 .cloned()
1111 .expect("redeem_requests is not initialized")
1112 .sink_map_err(HoprLibError::other))
1113 }
1114
1115 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1116 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1117
1118 let min_value = min_value.into();
1119
1120 self.chain_api
1121 .stream_channels(
1122 ChannelSelector::default()
1123 .with_destination(self.me_onchain())
1124 .with_allowed_states(&[
1125 ChannelStatusDiscriminants::Open,
1126 ChannelStatusDiscriminants::PendingToClose,
1127 ]),
1128 )
1129 .map_err(HoprLibError::chain)
1130 .await?
1131 .map(|channel| {
1132 Ok(TicketSelector::from(&channel)
1133 .with_amount(min_value..)
1134 .with_index_range(channel.ticket_index.as_u64()..)
1135 .with_state(AcknowledgedTicketStatus::Untouched))
1136 })
1137 .forward(self.redemption_requests()?)
1138 .await?;
1139
1140 Ok(())
1141 }
1142
1143 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1144 &self,
1145 counterparty: &Address,
1146 min_value: B,
1147 ) -> errors::Result<()> {
1148 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1149 .await
1150 }
1151
1152 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1153 &self,
1154 channel_id: &Hash,
1155 min_value: B,
1156 ) -> errors::Result<()> {
1157 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1158
1159 let channel = self
1160 .chain_api
1161 .channel_by_id(channel_id)
1162 .await
1163 .map_err(HoprLibError::chain)?
1164 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1165
1166 self.redemption_requests()?
1167 .send(
1168 TicketSelector::from(channel)
1169 .with_amount(min_value.into()..)
1170 .with_index_range(channel.ticket_index.as_u64()..)
1171 .with_state(AcknowledgedTicketStatus::Untouched),
1172 )
1173 .await?;
1174
1175 Ok(())
1176 }
1177
1178 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1179 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1180
1181 self.redemption_requests()?
1182 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1183 .await?;
1184
1185 Ok(())
1186 }
1187
1188 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1189 let peer_id = *peer_id;
1190 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1192 .await
1193 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1194
1195 self.chain_api
1196 .packet_key_to_chain_key(&pubkey)
1197 .await
1198 .map_err(HoprLibError::chain)
1199 }
1200
1201 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1202 self.chain_api
1203 .chain_key_to_packet_key(address)
1204 .await
1205 .map(|pk| pk.map(|v| v.into()))
1206 .map_err(HoprLibError::chain)
1207 }
1208
    /// Gathers all metrics into a single string.
    ///
    /// # Errors
    /// When the crate is built without the `prometheus` feature (or for tests),
    /// this always returns a general error stating that metrics support is missing.
    /// Otherwise, any failure of the metrics gathering is returned.
    pub fn collect_hopr_metrics() -> errors::Result<String> {
        cfg_if::cfg_if! {
            if #[cfg(all(feature = "prometheus", not(test)))] {
                hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
            } else {
                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
            }
        }
    }
1220}