1mod helpers;
17
18pub mod config;
20pub mod constants;
22pub mod errors;
24pub mod traits;
26pub mod types;
28pub mod utils;
30
31pub use hopr_api as api;
32
33#[doc(hidden)]
35pub mod exports {
36 pub mod types {
37 pub use hopr_api::types::{chain, internal, primitive};
38 }
39
40 pub mod crypto {
41 pub use hopr_api::types::crypto as types;
42 pub use hopr_crypto_keypair as keypair;
43 }
44
45 pub mod network {
46 pub use hopr_network_types as types;
47 }
48
49 pub use hopr_transport as transport;
50}
51
52#[doc(hidden)]
54pub mod prelude {
55 #[cfg(feature = "runtime-tokio")]
56 pub use super::exports::network::types::{
57 prelude::ForeignDataMode,
58 udp::{ConnectedUdpStream, UdpStreamParallelism},
59 };
60 pub use super::exports::{
61 crypto::{
62 keypair::key_pair::HoprKeys,
63 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
64 },
65 transport::{OffchainPublicKey, socket::HoprSocket},
66 types::primitive::prelude::Address,
67 };
68}
69
70use std::{
71 convert::identity,
72 future::Future,
73 sync::{Arc, OnceLock, atomic::Ordering},
74 time::Duration,
75};
76
77use anyhow::anyhow;
78use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, channel::mpsc::channel, pin_mut};
79use futures_time::future::FutureExt as FuturesTimeFutureExt;
80use hopr_api::{
81 chain::*,
82 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
83 node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
84};
85pub use hopr_api::{
86 graph::EdgeLinkObservable,
87 network::{NetworkBuilder, NetworkStreamControl},
88 node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
89 tickets::{ChannelStats, RedemptionResult, TicketManagement, TicketManagementExt},
90 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
91};
92use hopr_async_runtime::prelude::spawn;
93pub use hopr_async_runtime::{Abortable, AbortableList};
94pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
95pub use hopr_network_types::prelude::*;
96#[cfg(all(feature = "telemetry", not(test)))]
97use hopr_platform::time::native::current_time;
98use hopr_ticket_manager::{HoprTicketManager, RedbStore, RedbTicketQueue};
99#[cfg(feature = "runtime-tokio")]
100pub use hopr_transport::transfer_session;
101pub use hopr_transport::*;
102use tracing::{debug, error, info, warn};
103use validator::Validate;
104
105pub use crate::{
106 config::SafeModule,
107 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
108 errors::{HoprLibError, HoprStatusError},
109 types::{AnnouncedPeer, AnnouncementOrigin},
110};
111
112#[cfg(feature = "session-client")]
116#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
117#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
118pub struct HopRouting(
119 #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
120 hopr_api::types::primitive::bounded::BoundedSize<
121 { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
122 >,
123);
124
125#[cfg(feature = "session-client")]
126impl HopRouting {
127 pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;
129
130 pub fn hop_count(self) -> usize {
132 self.0.into()
133 }
134}
135
136#[cfg(feature = "session-client")]
137impl TryFrom<usize> for HopRouting {
138 type Error = hopr_api::types::primitive::errors::GeneralError;
139
140 fn try_from(value: usize) -> Result<Self, Self::Error> {
141 Ok(Self(value.try_into()?))
142 }
143}
144
145#[cfg(feature = "session-client")]
146impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
147 fn from(value: HopRouting) -> Self {
148 Self::Hops(value.0)
149 }
150}
151
152#[cfg(feature = "session-client")]
157#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
158pub struct HoprSessionClientConfig {
159 pub forward_path: HopRouting,
161 pub return_path: HopRouting,
163 #[default(_code = "SessionCapability::Segmentation.into()")]
165 pub capabilities: SessionCapabilities,
166 #[default(None)]
168 pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
169 #[default(Some(SurbBalancerConfig::default()))]
171 pub surb_management: Option<SurbBalancerConfig>,
172 #[default(false)]
174 pub always_max_out_surbs: bool,
175}
176
177#[cfg(feature = "session-client")]
178impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
179 fn from(value: HoprSessionClientConfig) -> Self {
180 Self {
181 forward_path_options: value.forward_path.into(),
182 return_path_options: value.return_path.into(),
183 capabilities: value.capabilities,
184 pseudonym: value.pseudonym,
185 surb_management: value.surb_management,
186 always_max_out_surbs: value.always_max_out_surbs,
187 }
188 }
189}
190
191#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
193pub enum HoprLibProcess {
194 #[strum(to_string = "transport: {0}")]
195 Transport(HoprTransportProcess),
196 #[strum(to_string = "session server providing the exit node session stream functionality")]
197 SessionServer,
198 #[strum(to_string = "ticket redemption queue driver")]
199 TicketRedemptions,
200 #[strum(to_string = "subscription for on-chain channel updates")]
201 ChannelEvents,
202 #[strum(to_string = "on received ticket event (winning or rejected)")]
203 TicketEvents,
204 #[strum(to_string = "persisting of outgoing ticket indices")]
205 OutIndexSync,
206}
207
208#[cfg(all(feature = "telemetry", not(test)))]
209lazy_static::lazy_static! {
210 static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
211 "hopr_start_time",
212 "The unix timestamp in seconds at which the process was started"
213 ).unwrap();
214 static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
215 "hopr_lib_version",
216 "Executed version of hopr-lib",
217 &["version"]
218 ).unwrap();
219 static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
220 "hopr_node_addresses",
221 "Node on-chain and off-chain addresses",
222 &["peerid", "address", "safe_address", "module_address"]
223 ).unwrap();
224}
225
226#[cfg(feature = "runtime-tokio")]
231pub fn prepare_tokio_runtime(
232 num_cpu_threads: Option<std::num::NonZeroUsize>,
233 num_io_threads: Option<std::num::NonZeroUsize>,
234) -> anyhow::Result<tokio::runtime::Runtime> {
235 use std::str::FromStr;
236 let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
237
238 hopr_parallelize::cpu::init_thread_pool(
239 num_cpu_threads
240 .map(|v| v.get())
241 .or(avail_parallelism)
242 .ok_or(anyhow::anyhow!(
243 "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
244 environment variable."
245 ))?
246 .max(1),
247 )?;
248
249 Ok(tokio::runtime::Builder::new_multi_thread()
250 .enable_all()
251 .worker_threads(
252 num_io_threads
253 .map(|v| v.get())
254 .or(avail_parallelism)
255 .ok_or(anyhow::anyhow!(
256 "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
257 environment variable."
258 ))?
259 .max(1),
260 )
261 .thread_name("hoprd")
262 .thread_stack_size(
263 std::env::var("HOPRD_THREAD_STACK_SIZE")
264 .ok()
265 .and_then(|v| usize::from_str(&v).ok())
266 .unwrap_or(10 * 1024 * 1024)
267 .max(2 * 1024 * 1024),
268 )
269 .build()?)
270}
271
272pub type HoprTransportIO = socket::HoprSocket<
274 futures::channel::mpsc::Receiver<ApplicationDataIn>,
275 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
276>;
277
278type TicketEvents = (
279 async_broadcast::Sender<TicketEvent>,
280 async_broadcast::InactiveReceiver<TicketEvent>,
281);
282
283const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
285
286const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
289
290pub struct Hopr<Chain, Graph, Net>
302where
303 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
304 + hopr_api::graph::NetworkGraphUpdate
305 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
306 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
307 + Clone
308 + Send
309 + Sync
310 + 'static,
311 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
312 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
313 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
314 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
315{
316 me: OffchainKeypair,
317 cfg: config::HoprLibConfig,
318 state: Arc<api::node::state::AtomicHoprState>,
319 transport_api: HoprTransport<Chain, Graph, Net>,
320 chain_api: Chain,
321 ticket_event_subscribers: TicketEvents,
322 ticket_manager: OnceLock<Arc<HoprTicketManager<RedbStore, RedbTicketQueue>>>,
323 processes: OnceLock<AbortableList<HoprLibProcess>>,
324}
325
326impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
327where
328 Chain: HoprChainApi + Clone + Send + Sync + 'static,
329 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
330 + hopr_api::graph::NetworkGraphUpdate
331 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
332 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
333 + Clone
334 + Send
335 + Sync
336 + 'static,
337 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
338 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
339 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
340 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
341{
342 pub async fn new(
343 identity: (&ChainKeypair, &OffchainKeypair),
344 hopr_chain_api: Chain,
345 graph: Graph,
346 cfg: config::HoprLibConfig,
347 ) -> errors::Result<Self> {
348 if hopr_api::types::crypto_random::is_rng_fixed() {
349 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
350 }
351
352 cfg.validate()?;
353
354 let hopr_transport_api = HoprTransport::new(
355 identity,
356 hopr_chain_api.clone(),
357 graph,
358 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
359 cfg.protocol.clone(),
360 )
361 .map_err(HoprLibError::TransportError)?;
362
363 #[cfg(all(feature = "telemetry", not(test)))]
364 {
365 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
366 METRIC_HOPR_LIB_VERSION.set(
367 &[const_format::formatcp!("{}", constants::APP_VERSION)],
368 const_format::formatcp!(
369 "{}.{}",
370 env!("CARGO_PKG_VERSION_MAJOR"),
371 env!("CARGO_PKG_VERSION_MINOR")
372 )
373 .parse()
374 .unwrap_or(0.0),
375 );
376 }
377
378 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
379 new_tickets_tx.set_await_active(false);
380 new_tickets_tx.set_overflow(true);
381
382 Ok(Self {
383 me: identity.1.clone(),
384 cfg,
385 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
386 transport_api: hopr_transport_api,
387 chain_api: hopr_chain_api,
388 processes: OnceLock::new(),
389 ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
390 ticket_manager: Default::default(),
391 })
392 }
393
394 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
395 if HoprNodeOperations::status(self) == state {
396 Ok(())
397 } else {
398 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
399 }
400 }
401
402 pub fn config(&self) -> &config::HoprLibConfig {
403 &self.cfg
404 }
405
406 pub fn graph(&self) -> &Graph {
408 self.transport_api.graph()
409 }
410
411 #[inline]
412 fn is_public(&self) -> bool {
413 self.cfg.publish
414 }
415
416 pub async fn run<
417 Ct,
418 NetBuilder,
419 #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
420 >(
421 &self,
422 cover_traffic: Ct,
423 network_builder: NetBuilder,
424 #[cfg(feature = "session-server")] serve_handler: T,
425 ) -> errors::Result<HoprTransportIO>
426 where
427 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
428 NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
429 {
430 self.error_if_not_in_state(
431 HoprState::Uninitialized,
432 "cannot start the hopr node multiple times".into(),
433 )?;
434
435 #[cfg(feature = "testing")]
436 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
437
438 let me_onchain = *self.chain_api.me();
439 info!(
440 address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
441 "node is not started, please fund this node",
442 );
443
444 self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
445 helpers::wait_for_funds(
446 *MIN_NATIVE_BALANCE,
447 *SUGGESTED_NATIVE_BALANCE,
448 Duration::from_secs(200),
449 me_onchain,
450 &self.chain_api,
451 )
452 .await?;
453
454 let mut processes = AbortableList::<HoprLibProcess>::default();
455
456 info!("starting HOPR node...");
457 self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
458
459 let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
460 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
461
462 info!(
463 address = %me_onchain,
464 %balance,
465 %minimum_balance,
466 "node information"
467 );
468
469 if balance.le(&minimum_balance) {
470 return Err(HoprLibError::GeneralError(
471 "cannot start the node without a sufficiently funded wallet".into(),
472 ));
473 }
474
475 self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
476
477 let network_min_ticket_price = self
480 .chain_api
481 .minimum_ticket_price()
482 .await
483 .map_err(HoprLibError::chain)?;
484 let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
485 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
486 return Err(HoprLibError::GeneralError(format!(
487 "configured outgoing ticket price is lower than the network minimum ticket price: \
488 {configured_ticket_price:?} < {network_min_ticket_price}"
489 )));
490 }
491 let network_min_win_prob = self
494 .chain_api
495 .minimum_incoming_ticket_win_prob()
496 .await
497 .map_err(HoprLibError::chain)?;
498 let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
499 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
500 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
501 {
502 return Err(HoprLibError::GeneralError(format!(
503 "configured outgoing ticket winning probability is lower than the network minimum winning \
504 probability: {configured_win_prob:?} < {network_min_win_prob}"
505 )));
506 }
507
508 self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
509
510 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
511
512 let safe_addr = self.cfg.safe_module.safe_address;
513
514 if self.me_onchain() == safe_addr {
515 return Err(HoprLibError::GeneralError(
516 "cannot use self as staking safe address".into(),
517 ));
518 }
519
520 self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
521 info!(%safe_addr, "registering safe with this node");
522 match self.chain_api.register_safe(&safe_addr).await {
523 Ok(awaiter) => {
524 awaiter.await.map_err(|error| {
526 error!(%safe_addr, %error, "safe registration failed with error");
527 HoprLibError::chain(error)
528 })?;
529 info!(%safe_addr, "safe successfully registered with this node");
530 }
531 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
532 info!(%safe_addr, "this safe is already registered with this node");
533 }
534 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
535 error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
537 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
538 }
539 Err(error) => {
540 error!(%safe_addr, %error, "safe registration failed");
541 return Err(HoprLibError::chain(error));
542 }
543 }
544
545 let multiaddresses_to_announce = if self.is_public() {
547 self.transport_api.announceable_multiaddresses()
550 } else {
551 Vec::with_capacity(0)
552 };
553
554 multiaddresses_to_announce
556 .iter()
557 .filter(|a| !is_public_address(a))
558 .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
559
560 self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
561
562 let chain_api = self.chain_api.clone();
563 let me_offchain = *self.me.public();
564 let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
565
566 info!(?multiaddresses_to_announce, "announcing node on chain");
569 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
570 Ok(awaiter) => {
571 awaiter.await.map_err(|error| {
573 error!(?multiaddresses_to_announce, %error, "node announcement failed");
574 HoprLibError::chain(error)
575 })?;
576 info!(?multiaddresses_to_announce, "node has been successfully announced");
577 }
578 Err(AnnouncementError::AlreadyAnnounced) => {
579 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
580 }
581 Err(error) => {
582 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
583 return Err(HoprLibError::chain(error));
584 }
585 }
586
587 self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
588
589 let this_node_account = node_ready
591 .await
592 .map_err(HoprLibError::other)?
593 .map_err(HoprLibError::chain)?;
594 if this_node_account.chain_addr != self.me_onchain()
595 || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
596 {
597 error!(%this_node_account, "account bound to offchain key does not match this node");
598 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
599 }
600
601 info!(%this_node_account, "node account is ready");
602
603 self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
604
605 info!("initializing session infrastructure");
606 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
607 .ok()
608 .and_then(|s| s.trim().parse::<usize>().ok())
609 .filter(|&c| c > 0)
610 .unwrap_or(256);
611
612 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
613 #[cfg(feature = "session-server")]
614 {
615 debug!(capacity = incoming_session_channel_capacity, "creating session server");
616 processes.insert(
617 HoprLibProcess::SessionServer,
618 hopr_async_runtime::spawn_as_abortable!(
619 _session_rx
620 .for_each_concurrent(None, move |session| {
621 let serve_handler = serve_handler.clone();
622 async move {
623 let session_id = *session.session.id();
624 match serve_handler.process(session).await {
625 Ok(_) => debug!(?session_id, "client session processed successfully"),
626 Err(error) => error!(
627 ?session_id,
628 %error,
629 "client session processing failed"
630 ),
631 }
632 }
633 })
634 .inspect(|_| tracing::warn!(
635 task = %HoprLibProcess::SessionServer,
636 "long-running background task finished"
637 ))
638 ),
639 );
640 }
641
642 info!("starting ticket manager & factory");
643 let store = self
644 .cfg
645 .ticket_storage_file
646 .as_ref()
647 .map(RedbStore::new)
648 .unwrap_or_else(RedbStore::new_temp)
649 .map_err(HoprLibError::ticket_manager)?;
650
651 let (ticket_manager, ticket_factory) = HoprTicketManager::new_with_factory(store);
652 let ticket_manager = Arc::new(ticket_manager);
653 let ticket_factory = Arc::new(ticket_factory);
654
655 ticket_manager
657 .sync_from_incoming_channels(
658 &self
659 .chain_api
660 .stream_channels(ChannelSelector::default().with_destination(me_onchain))
661 .map_err(HoprLibError::chain)?
662 .collect::<Vec<_>>()
663 .await,
664 )
665 .map_err(HoprLibError::ticket_manager)?;
666
667 ticket_factory
669 .sync_from_outgoing_channels(
670 &self
671 .chain_api
672 .stream_channels(ChannelSelector::default().with_source(me_onchain))
673 .map_err(HoprLibError::chain)?
674 .collect::<Vec<_>>()
675 .await,
676 )
677 .map_err(HoprLibError::ticket_manager)?;
678
679 let (index_sync_handle, index_sync_reg) = futures::future::AbortHandle::new_pair();
681 let tfact = ticket_factory.clone();
682 let tfact2 = ticket_factory.clone();
683 spawn(
684 futures::stream::Abortable::new(
685 futures_time::stream::interval(self.cfg.out_index_sync_period.into()),
686 index_sync_reg,
687 )
688 .for_each(move |_| {
689 let tfact = tfact.clone();
690 async move {
691 if let Err(error) =
692 hopr_async_runtime::prelude::spawn_blocking(move || tfact.save_outgoing_indices())
693 .map_err(hopr_ticket_manager::TicketManagerError::store)
694 .and_then(futures::future::ready)
695 .await
696 {
697 tracing::error!(%error, "failed to sync ticket indices to persistent storage:");
698 } else {
699 tracing::trace!("successfully synced ticket indices");
700 }
701 }
702 })
703 .inspect(move |_| {
704 if let Err(error) = tfact2.save_outgoing_indices() {
706 tracing::error!(%error, "failed to sync ticket indices to persistent storage on shutdown");
707 }
708 tracing::warn!(
709 task = %HoprLibProcess::OutIndexSync,
710 "long-running background task finished"
711 )
712 }),
713 );
714 processes.insert(HoprLibProcess::OutIndexSync, index_sync_handle);
715
716 info!("starting ticket events processor");
717 let (tickets_tx, tickets_rx) = channel(8192);
718 let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
719 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
720 let new_ticket_tx = self.ticket_event_subscribers.0.clone();
721 let tmgr = ticket_manager.clone();
722 spawn(
723 tickets_rx
724 .for_each(move |event| {
725 if let TicketEvent::WinningTicket(ticket) = &event
727 && let Err(error) = tmgr.insert_incoming_ticket(**ticket)
728 {
729 tracing::error!(%error, "failed to insert incoming ticket into manager");
730 }
731 if let Err(error) = new_ticket_tx.try_broadcast(event) {
732 tracing::error!(%error, "failed to broadcast new ticket event to subscribers");
733 }
734 futures::future::ready(())
735 })
736 .inspect(|_| {
737 tracing::warn!(
738 task = %HoprLibProcess::TicketEvents,
739 "long-running background task finished"
740 )
741 }),
742 );
743
744 info!("starting transport");
745 let (hopr_socket, transport_processes) = self
746 .transport_api
747 .run(cover_traffic, network_builder, tickets_tx, ticket_factory, session_tx)
748 .await?;
749 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
750
751 info!("subscribing to channel events");
752 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
753 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
754 let chain = self.chain_api.clone();
755 let events = chain.subscribe().map_err(HoprLibError::chain)?;
756 let tmgr = ticket_manager.clone();
757
758 spawn(
759 futures::stream::Abortable::new(
760 events
761 .filter_map(move |event|
762 futures::future::ready(event.try_as_channel_closed())
763 ),
764 chain_events_sub_reg
765 )
766 .for_each(move |closed_channel| {
767 let chain = chain.clone();
768 let tmgr = tmgr.clone();
769 async move {
770 match closed_channel.direction(chain.me()) {
771 Some(ChannelDirection::Incoming) => {
772 match tmgr.neglect_tickets(closed_channel.get_id(), None) {
774 Ok(neglected) if !neglected.is_empty() => {
775 warn!(num_neglected = neglected.len(), %closed_channel, "tickets on incoming closed channel were neglected");
776 },
777 Ok(_) => {
778 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
779 },
780 Err(error) => {
781 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
782 }
783 }
784 },
785 Some(ChannelDirection::Outgoing) => {
786 }
789 _ => {} }
791 }
792 })
793 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
794 );
795
796 self.state.store(HoprState::Running, Ordering::Relaxed);
797
798 self.ticket_manager
799 .set(ticket_manager)
800 .map_err(|_| HoprLibError::other(anyhow!("cannot set ticket manager")))?;
801
802 info!(
803 id = %self.me_peer_id(),
804 version = constants::APP_VERSION,
805 "NODE STARTED AND RUNNING"
806 );
807
808 #[cfg(all(feature = "telemetry", not(test)))]
809 METRIC_HOPR_NODE_INFO.set(
810 &[
811 &self.me.public().to_peerid_str(),
812 &me_onchain.to_string(),
813 &self.cfg.safe_module.safe_address.to_string(),
814 &self.cfg.safe_module.module_address.to_string(),
815 ],
816 1.0,
817 );
818
819 let _ = self.processes.set(processes);
820
821 Ok(hopr_socket)
822 }
823
824 pub fn shutdown(&self) -> Result<(), HoprLibError> {
832 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
833 if let Some(processes) = self.processes.get() {
834 processes.abort_all();
835 }
836 self.state.store(HoprState::Terminated, Ordering::Relaxed);
837 info!("NODE SHUTDOWN COMPLETE");
838 Ok(())
839 }
840
841 #[cfg(feature = "session-client")]
844 pub async fn connect_to(
845 &self,
846 destination: Address,
847 target: SessionTarget,
848 cfg: HoprSessionClientConfig,
849 ) -> errors::Result<HoprSession> {
850 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
851
852 let backoff = backon::ConstantBuilder::default()
853 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
854 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
855 .with_jitter();
856
857 use backon::Retryable;
858
859 Ok((|| {
860 let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
861 let target = target.clone();
862 async { self.transport_api.new_session(destination, target, cfg).await }
863 })
864 .retry(backoff)
865 .sleep(backon::FuturesTimerSleeper)
866 .await?)
867 }
868
869 #[cfg(feature = "session-client")]
872 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
873 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
874 Ok(self.transport_api.probe_session(id).await?)
875 }
876
877 #[cfg(feature = "session-client")]
878 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
879 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
880 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
881 }
882
883 #[cfg(feature = "session-client")]
884 pub async fn update_session_surb_balancer_config(
885 &self,
886 id: &SessionId,
887 cfg: SurbBalancerConfig,
888 ) -> errors::Result<()> {
889 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
890 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
891 }
892
893 fn spawn_wait_for_on_chain_event(
896 &self,
897 context: impl std::fmt::Display,
898 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
899 timeout: Duration,
900 ) -> errors::Result<(
901 impl Future<Output = errors::Result<ChainEvent>>,
902 hopr_async_runtime::AbortHandle,
903 )> {
904 debug!(%context, "registering wait for on-chain event");
905 let (event_stream, handle) = futures::stream::abortable(
906 self.chain_api
907 .subscribe()
908 .map_err(HoprLibError::chain)?
909 .skip_while(move |event| futures::future::ready(!predicate(event))),
910 );
911
912 let ctx = context.to_string();
913
914 Ok((
915 spawn(async move {
916 pin_mut!(event_stream);
917 let res = event_stream
918 .next()
919 .timeout(futures_time::time::Duration::from(timeout))
920 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
921 .await?
922 .ok_or(HoprLibError::GeneralError(format!(
923 "failed to yield an on-chain event for {ctx}"
924 )));
925 debug!(%ctx, ?res, "on-chain event waiting done");
926 res
927 })
928 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
929 .and_then(futures::future::ready),
930 handle,
931 ))
932 }
933}
934
935impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
936where
937 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
938 + hopr_api::graph::NetworkGraphUpdate
939 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
940 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
941 + Clone
942 + Send
943 + Sync
944 + 'static,
945 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
946 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
947 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
948 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
949{
950 pub fn collect_hopr_metrics() -> errors::Result<String> {
953 cfg_if::cfg_if! {
954 if #[cfg(all(feature = "telemetry", not(test)))] {
955 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
956 } else {
957 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
958 }
959 }
960 }
961}
962
963impl<Chain, Graph, Net> HoprNodeOperations for Hopr<Chain, Graph, Net>
966where
967 Chain: HoprChainApi + Clone + Send + Sync + 'static,
968 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
969 + hopr_api::graph::NetworkGraphUpdate
970 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
971 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
972 + Clone
973 + Send
974 + Sync
975 + 'static,
976 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
977 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
978 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
979 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
980{
981 fn status(&self) -> HoprState {
982 self.state.load(Ordering::Relaxed)
983 }
984}
985
986#[async_trait::async_trait]
987impl<Chain, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Graph, Net>
988where
989 Chain: HoprChainApi + Clone + Send + Sync + 'static,
990 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
991 + hopr_api::graph::NetworkGraphUpdate
992 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
993 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
994 + Clone
995 + Send
996 + Sync
997 + 'static,
998 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
999 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1000 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1001 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1002{
1003 type Error = HoprLibError;
1004 type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1005
1006 fn me_peer_id(&self) -> PeerId {
1007 (*self.me.public()).into()
1008 }
1009
1010 async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1011 Ok(self
1012 .chain_api
1013 .stream_accounts(AccountSelector {
1014 public_only: true,
1015 ..Default::default()
1016 })
1017 .map_err(HoprLibError::chain)?
1018 .map(|entry| {
1019 (
1020 PeerId::from(entry.public_key),
1021 entry.chain_addr,
1022 entry.get_multiaddrs().to_vec(),
1023 )
1024 })
1025 .collect()
1026 .await)
1027 }
1028
1029 async fn network_health(&self) -> hopr_api::network::Health {
1030 self.transport_api.network_health().await
1031 }
1032
1033 async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1034 Ok(self
1035 .transport_api
1036 .network_connected_peers()
1037 .await?
1038 .into_iter()
1039 .map(PeerId::from)
1040 .collect())
1041 }
1042
1043 fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1044 let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1045 self.transport_api.network_peer_observations(&pubkey)
1046 }
1047
1048 async fn all_network_peers(
1049 &self,
1050 minimum_score: f64,
1051 ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1052 Ok(
1053 futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1054 .filter_map(|(pubkey, info)| async move {
1055 let peer_id = PeerId::from(pubkey);
1056 let address = self.peerid_to_chain_key(&peer_id).ok().flatten();
1057 Some((address, peer_id, info))
1058 })
1059 .collect::<Vec<_>>()
1060 .await,
1061 )
1062 }
1063
1064 fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1065 self.transport_api.local_multiaddresses()
1066 }
1067
1068 async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1069 self.transport_api.listening_multiaddresses().await
1070 }
1071
1072 async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1073 let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer) else {
1074 return vec![];
1075 };
1076 self.transport_api.network_observed_multiaddresses(&pubkey).await
1077 }
1078
1079 async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1080 let pubkey = hopr_transport::peer_id_to_public_key(peer).map_err(HoprLibError::TransportError)?;
1081
1082 match self
1083 .chain_api
1084 .stream_accounts(AccountSelector {
1085 public_only: false,
1086 offchain_key: Some(pubkey),
1087 ..Default::default()
1088 })
1089 .map_err(HoprLibError::chain)?
1090 .next()
1091 .await
1092 {
1093 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1094 None => {
1095 error!(%peer, "no information");
1096 Ok(vec![])
1097 }
1098 }
1099 }
1100
1101 async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1102 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1103 let pubkey = hopr_transport::peer_id_to_public_key(peer).map_err(HoprLibError::TransportError)?;
1104 Ok(self.transport_api.ping(&pubkey).await?)
1105 }
1106}
1107
1108impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
1109where
1110 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1111 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1112 + hopr_api::graph::NetworkGraphUpdate
1113 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1114 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1115 + Clone
1116 + Send
1117 + Sync
1118 + 'static,
1119 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1120 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1121 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1122 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1123{
1124 const PENDING_TO_CLOSE_TOLERANCE: Duration = Duration::from_secs(5);
1127
1128 pub fn me_onchain(&self) -> Address {
1129 *self.chain_api.me()
1130 }
1131
1132 pub fn get_safe_config(&self) -> SafeModuleConfig {
1133 SafeModuleConfig {
1134 safe_address: self.cfg.safe_module.safe_address,
1135 module_address: self.cfg.safe_module.module_address,
1136 }
1137 }
1138
1139 pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1140 self.chain_api
1141 .balance(self.me_onchain())
1142 .await
1143 .map_err(HoprLibError::chain)
1144 }
1145
1146 pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1147 self.chain_api
1148 .balance(self.cfg.safe_module.safe_address)
1149 .await
1150 .map_err(HoprLibError::chain)
1151 }
1152
1153 pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1154 self.chain_api
1155 .safe_allowance(self.cfg.safe_module.safe_address)
1156 .await
1157 .map_err(HoprLibError::chain)
1158 }
1159
1160 pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1161 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1162 }
1163
1164 pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1165 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1166 }
1167
1168 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1169 self.chain_api
1170 .minimum_incoming_ticket_win_prob()
1171 .await
1172 .map_err(HoprLibError::chain)
1173 }
1174
1175 pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1176 self.chain_api
1177 .channel_closure_notice_period()
1178 .await
1179 .map_err(HoprLibError::chain)
1180 }
1181
1182 pub async fn announced_peers(&self) -> Result<Vec<AnnouncedPeer>, HoprLibError> {
1183 Ok(self
1184 .chain_api
1185 .stream_accounts(AccountSelector {
1186 public_only: true,
1187 ..Default::default()
1188 })
1189 .map_err(HoprLibError::chain)?
1190 .map(|entry| AnnouncedPeer {
1191 address: entry.chain_addr,
1192 multiaddresses: entry.get_multiaddrs().to_vec(),
1193 origin: AnnouncementOrigin::Chain,
1194 })
1195 .collect()
1196 .await)
1197 }
1198
1199 pub fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1200 let pubkey = hopr_transport::peer_id_to_public_key(peer_id).map_err(HoprLibError::TransportError)?;
1201
1202 self.chain_api
1203 .packet_key_to_chain_key(&pubkey)
1204 .map_err(HoprLibError::chain)
1205 }
1206
1207 pub fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1208 self.chain_api
1209 .chain_key_to_packet_key(address)
1210 .map(|pk| pk.map(|v| v.into()))
1211 .map_err(HoprLibError::chain)
1212 }
1213
1214 pub fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, HoprLibError> {
1215 self.chain_api.channel_by_id(channel_id).map_err(HoprLibError::chain)
1216 }
1217
1218 pub fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1219 self.chain_api
1220 .channel_by_parties(src, dest)
1221 .map_err(HoprLibError::chain)
1222 }
1223
1224 pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1225 Ok(self
1226 .chain_api
1227 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1228 ChannelStatusDiscriminants::Closed,
1229 ChannelStatusDiscriminants::Open,
1230 ChannelStatusDiscriminants::PendingToClose,
1231 ]))
1232 .map_err(HoprLibError::chain)?
1233 .collect()
1234 .await)
1235 }
1236
1237 pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1238 Ok(self
1239 .chain_api
1240 .stream_channels(
1241 ChannelSelector::default()
1242 .with_destination(*dest)
1243 .with_allowed_states(&[
1244 ChannelStatusDiscriminants::Closed,
1245 ChannelStatusDiscriminants::Open,
1246 ChannelStatusDiscriminants::PendingToClose,
1247 ]),
1248 )
1249 .map_err(HoprLibError::chain)?
1250 .collect()
1251 .await)
1252 }
1253
1254 pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1255 Ok(self
1256 .chain_api
1257 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1258 ChannelStatusDiscriminants::Closed,
1259 ChannelStatusDiscriminants::Open,
1260 ChannelStatusDiscriminants::PendingToClose,
1261 ]))
1262 .map_err(HoprLibError::chain)?
1263 .collect()
1264 .await)
1265 }
1266
1267 pub async fn open_channel(
1268 &self,
1269 destination: &Address,
1270 amount: HoprBalance,
1271 ) -> Result<OpenChannelResult, HoprLibError> {
1272 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1273
1274 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1275
1276 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1279 format!("open channel to {destination} ({channel_id})"),
1280 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1281 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1282 )?;
1283
1284 let confirm_awaiter = self
1285 .chain_api
1286 .open_channel(destination, amount)
1287 .await
1288 .map_err(HoprLibError::chain)?;
1289
1290 let tx_hash = confirm_awaiter.await.map_err(|e| {
1291 event_abort.abort();
1292 HoprLibError::chain(e)
1293 })?;
1294
1295 let event = event_awaiter.await?;
1296 debug!(%event, "open channel event received");
1297
1298 Ok(OpenChannelResult { tx_hash, channel_id })
1299 }
1300
1301 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1302 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1303
1304 let channel_id = *channel_id;
1305
1306 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1309 format!("fund channel {channel_id}"),
1310 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1311 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1312 )?;
1313
1314 let confirm_awaiter = self
1315 .chain_api
1316 .fund_channel(&channel_id, amount)
1317 .await
1318 .map_err(HoprLibError::chain)?;
1319
1320 let res = confirm_awaiter.await.map_err(|e| {
1321 event_abort.abort();
1322 HoprLibError::chain(e)
1323 })?;
1324
1325 let event = event_awaiter.await?;
1326 debug!(%event, "fund channel event received");
1327
1328 Ok(res)
1329 }
1330
1331 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1332 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1333
1334 let channel_id = *channel_id;
1335
1336 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1339 format!("close channel {channel_id}"),
1340 move |event| {
1341 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1342 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1343 },
1344 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1345 )?;
1346
1347 let confirm_awaiter = self
1348 .chain_api
1349 .close_channel(&channel_id)
1350 .await
1351 .map_err(HoprLibError::chain)?;
1352
1353 let tx_hash = confirm_awaiter.await.map_err(|e| {
1354 event_abort.abort();
1355 HoprLibError::chain(e)
1356 })?;
1357
1358 let event = event_awaiter.await?;
1359 debug!(%event, "close channel event received");
1360
1361 Ok(CloseChannelResult { tx_hash })
1362 }
1363
1364 pub fn ticket_management(&self) -> Result<impl TicketManagement + Clone + Send + 'static, HoprLibError> {
1365 self.error_if_not_in_state(HoprState::Running, "Node is not ready for transport operations".into())?;
1366
1367 self.ticket_manager
1368 .get()
1369 .cloned()
1370 .ok_or(HoprLibError::StatusError(HoprStatusError::NotThereYet(
1371 HoprState::Running,
1372 "Node is not ready for transport operations".into(),
1373 )))
1374 }
1375
1376 pub async fn ticket_statistics(&self) -> Result<ChannelStats, HoprLibError> {
1378 self.ticket_management()?
1379 .ticket_stats(None)
1380 .map_err(HoprLibError::ticket_manager)
1381 }
1382
1383 pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(
1384 &self,
1385 min_value: B,
1386 ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1387 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1388
1389 let min_value = min_value.into();
1390
1391 self.ticket_management()?
1392 .redeem_in_channels(
1393 self.chain_api.clone(),
1394 None,
1395 min_value.into(),
1396 Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1397 )
1398 .await
1399 .map_err(HoprLibError::ticket_manager)?
1400 .try_collect::<Vec<_>>()
1401 .map_err(HoprLibError::ticket_manager)
1402 .await
1403 }
1404
1405 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1406 &self,
1407 counterparty: &Address,
1408 min_value: B,
1409 ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1410 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1411 .await
1412 }
1413
1414 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1415 &self,
1416 channel_id: &ChannelId,
1417 min_value: B,
1418 ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1419 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1420
1421 let min_value = min_value.into();
1422
1423 let chain_api = self.chain_api.clone();
1424 let channel_id = *channel_id;
1425 let channel = hopr_async_runtime::prelude::spawn_blocking(move || {
1426 chain_api
1427 .channel_by_id(&channel_id)
1428 .map_err(HoprLibError::chain)?
1429 .ok_or(HoprLibError::GeneralError("channel not found".into()))
1430 })
1431 .await
1432 .map_err(HoprLibError::other)??;
1433
1434 self.ticket_management()?
1435 .redeem_in_channels(
1436 self.chain_api.clone(),
1437 ChannelSelector::default()
1438 .with_source(channel.source)
1439 .with_destination(channel.destination)
1440 .into(),
1441 min_value.into(),
1442 Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1443 )
1444 .await
1445 .map_err(HoprLibError::ticket_manager)?
1446 .map_err(HoprLibError::ticket_manager)
1447 .try_collect::<Vec<_>>()
1448 .await
1449 }
1450
1451 pub fn subscribe_ticket_events(&self) -> impl Stream<Item = TicketEvent> + Send + 'static {
1452 self.ticket_event_subscribers.1.activate_cloned()
1453 }
1454
1455 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1456 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1457
1458 self.chain_api
1459 .withdraw(amount, &recipient)
1460 .and_then(identity)
1461 .map_err(HoprLibError::chain)
1462 .await
1463 }
1464
1465 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1466 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1467
1468 self.chain_api
1469 .withdraw(amount, &recipient)
1470 .and_then(identity)
1471 .map_err(HoprLibError::chain)
1472 .await
1473 }
1474}