1mod helpers;
17
18pub mod config;
20pub mod constants;
22pub mod errors;
24pub mod traits;
26pub mod types;
28pub mod utils;
30
31pub use hopr_api as api;
32
33#[doc(hidden)]
35pub mod exports {
36 pub mod types {
37 pub use hopr_api::types::{chain, internal, primitive};
38 }
39
40 pub mod crypto {
41 pub use hopr_api::types::crypto as types;
42 pub use hopr_crypto_keypair as keypair;
43 }
44
45 pub mod network {
46 pub use hopr_network_types as types;
47 }
48
49 pub use hopr_transport as transport;
50}
51
52#[doc(hidden)]
54pub mod prelude {
55 #[cfg(feature = "runtime-tokio")]
56 pub use super::exports::network::types::{
57 prelude::ForeignDataMode,
58 udp::{ConnectedUdpStream, UdpStreamParallelism},
59 };
60 pub use super::exports::{
61 crypto::{
62 keypair::key_pair::HoprKeys,
63 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
64 },
65 transport::{OffchainPublicKey, socket::HoprSocket},
66 types::primitive::prelude::Address,
67 };
68}
69
70use std::{
71 convert::identity,
72 future::Future,
73 sync::{Arc, OnceLock, atomic::Ordering},
74 time::Duration,
75};
76
77use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, channel::mpsc::channel, pin_mut};
78use futures_time::future::FutureExt as FuturesTimeFutureExt;
79use hopr_api::{
80 chain::*,
81 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
82 node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
83};
84pub use hopr_api::{
85 graph::EdgeLinkObservable,
86 network::{NetworkBuilder, NetworkStreamControl},
87 node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
88 tickets::{ChannelStats, RedemptionResult, TicketManagement, TicketManagementExt},
89 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
90};
91use hopr_async_runtime::prelude::spawn;
92pub use hopr_async_runtime::{Abortable, AbortableList};
93pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
94pub use hopr_network_types::prelude::*;
95#[cfg(all(feature = "telemetry", not(test)))]
96use hopr_platform::time::native::current_time;
97#[cfg(feature = "runtime-tokio")]
98pub use hopr_transport::transfer_session;
99pub use hopr_transport::*;
100use tracing::{debug, error, info, warn};
101use validator::Validate;
102
103pub use crate::{
104 config::SafeModule,
105 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
106 errors::{HoprLibError, HoprStatusError},
107 types::{AnnouncedPeer, AnnouncementOrigin},
108};
109
110#[cfg(feature = "session-client")]
114#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
115#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
116pub struct HopRouting(
117 #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
118 hopr_api::types::primitive::bounded::BoundedSize<
119 { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
120 >,
121);
122
123#[cfg(feature = "session-client")]
124impl HopRouting {
125 pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;
127
128 pub fn hop_count(self) -> usize {
130 self.0.into()
131 }
132}
133
134#[cfg(feature = "session-client")]
135impl TryFrom<usize> for HopRouting {
136 type Error = hopr_api::types::primitive::errors::GeneralError;
137
138 fn try_from(value: usize) -> Result<Self, Self::Error> {
139 Ok(Self(value.try_into()?))
140 }
141}
142
143#[cfg(feature = "session-client")]
144impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
145 fn from(value: HopRouting) -> Self {
146 Self::Hops(value.0)
147 }
148}
149
150#[cfg(feature = "session-client")]
155#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
156pub struct HoprSessionClientConfig {
157 pub forward_path: HopRouting,
159 pub return_path: HopRouting,
161 #[default(_code = "SessionCapability::Segmentation.into()")]
163 pub capabilities: SessionCapabilities,
164 #[default(None)]
166 pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
167 #[default(Some(SurbBalancerConfig::default()))]
169 pub surb_management: Option<SurbBalancerConfig>,
170 #[default(false)]
172 pub always_max_out_surbs: bool,
173}
174
175#[cfg(feature = "session-client")]
176impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
177 fn from(value: HoprSessionClientConfig) -> Self {
178 Self {
179 forward_path_options: value.forward_path.into(),
180 return_path_options: value.return_path.into(),
181 capabilities: value.capabilities,
182 pseudonym: value.pseudonym,
183 surb_management: value.surb_management,
184 always_max_out_surbs: value.always_max_out_surbs,
185 }
186 }
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
191pub enum HoprLibProcess {
192 #[strum(to_string = "transport: {0}")]
193 Transport(HoprTransportProcess),
194 #[strum(to_string = "session server providing the exit node session stream functionality")]
195 SessionServer,
196 #[strum(to_string = "ticket redemption queue driver")]
197 TicketRedemptions,
198 #[strum(to_string = "subscription for on-chain channel updates")]
199 ChannelEvents,
200 #[strum(to_string = "on received ticket event (winning or rejected)")]
201 TicketEvents,
202}
203
204#[cfg(all(feature = "telemetry", not(test)))]
205lazy_static::lazy_static! {
206 static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
207 "hopr_start_time",
208 "The unix timestamp in seconds at which the process was started"
209 ).unwrap();
210 static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
211 "hopr_lib_version",
212 "Executed version of hopr-lib",
213 &["version"]
214 ).unwrap();
215 static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
216 "hopr_node_addresses",
217 "Node on-chain and off-chain addresses",
218 &["peerid", "address", "safe_address", "module_address"]
219 ).unwrap();
220}
221
222#[cfg(feature = "runtime-tokio")]
227pub fn prepare_tokio_runtime(
228 num_cpu_threads: Option<std::num::NonZeroUsize>,
229 num_io_threads: Option<std::num::NonZeroUsize>,
230) -> anyhow::Result<tokio::runtime::Runtime> {
231 use std::str::FromStr;
232 let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
233
234 hopr_parallelize::cpu::init_thread_pool(
235 num_cpu_threads
236 .map(|v| v.get())
237 .or(avail_parallelism)
238 .ok_or(anyhow::anyhow!(
239 "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
240 environment variable."
241 ))?
242 .max(1),
243 )?;
244
245 Ok(tokio::runtime::Builder::new_multi_thread()
246 .enable_all()
247 .worker_threads(
248 num_io_threads
249 .map(|v| v.get())
250 .or(avail_parallelism)
251 .ok_or(anyhow::anyhow!(
252 "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
253 environment variable."
254 ))?
255 .max(1),
256 )
257 .thread_name("hoprd")
258 .thread_stack_size(
259 std::env::var("HOPRD_THREAD_STACK_SIZE")
260 .ok()
261 .and_then(|v| usize::from_str(&v).ok())
262 .unwrap_or(10 * 1024 * 1024)
263 .max(2 * 1024 * 1024),
264 )
265 .build()?)
266}
267
268pub type HoprTransportIO = socket::HoprSocket<
270 futures::channel::mpsc::Receiver<ApplicationDataIn>,
271 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
272>;
273
274type TicketEvents = (
275 async_broadcast::Sender<TicketEvent>,
276 async_broadcast::InactiveReceiver<TicketEvent>,
277);
278
279const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
281
282const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
285
286pub struct Hopr<Chain, Graph, Net>
298where
299 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
300 + hopr_api::graph::NetworkGraphUpdate
301 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
302 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
303 + Clone
304 + Send
305 + Sync
306 + 'static,
307 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
308 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
309 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
310 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
311{
312 me: OffchainKeypair,
313 cfg: config::HoprLibConfig,
314 state: Arc<api::node::state::AtomicHoprState>,
315 transport_api: HoprTransport<Chain, Graph, Net>,
316 chain_api: Chain,
317 ticket_event_subscribers: TicketEvents,
318 processes: OnceLock<AbortableList<HoprLibProcess>>,
319}
320
321impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
322where
323 Chain: HoprChainApi + Clone + Send + Sync + 'static,
324 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
325 + hopr_api::graph::NetworkGraphUpdate
326 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
327 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
328 + Clone
329 + Send
330 + Sync
331 + 'static,
332 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
333 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
334 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
335 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
336{
337 pub async fn new(
338 identity: (&ChainKeypair, &OffchainKeypair),
339 hopr_chain_api: Chain,
340 graph: Graph,
341 cfg: config::HoprLibConfig,
342 ) -> errors::Result<Self> {
343 if hopr_api::types::crypto_random::is_rng_fixed() {
344 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
345 }
346
347 cfg.validate()?;
348
349 let hopr_transport_api = HoprTransport::new(
350 identity,
351 hopr_chain_api.clone(),
352 graph,
353 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
354 cfg.protocol.clone(),
355 )
356 .map_err(HoprLibError::TransportError)?;
357
358 #[cfg(all(feature = "telemetry", not(test)))]
359 {
360 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
361 METRIC_HOPR_LIB_VERSION.set(
362 &[const_format::formatcp!("{}", constants::APP_VERSION)],
363 const_format::formatcp!(
364 "{}.{}",
365 env!("CARGO_PKG_VERSION_MAJOR"),
366 env!("CARGO_PKG_VERSION_MINOR")
367 )
368 .parse()
369 .unwrap_or(0.0),
370 );
371 }
372
373 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
374 new_tickets_tx.set_await_active(false);
375 new_tickets_tx.set_overflow(true);
376
377 Ok(Self {
378 me: identity.1.clone(),
379 cfg,
380 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
381 transport_api: hopr_transport_api,
382 chain_api: hopr_chain_api,
383 processes: OnceLock::new(),
384 ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
385 })
386 }
387
388 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
389 if HoprNodeOperations::status(self) == state {
390 Ok(())
391 } else {
392 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
393 }
394 }
395
396 pub fn config(&self) -> &config::HoprLibConfig {
397 &self.cfg
398 }
399
400 pub fn graph(&self) -> &Graph {
402 self.transport_api.graph()
403 }
404
405 #[inline]
406 fn is_public(&self) -> bool {
407 self.cfg.publish
408 }
409
410 pub async fn run<
411 Ct,
412 NetBuilder,
413 #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
414 >(
415 &self,
416 cover_traffic: Ct,
417 network_builder: NetBuilder,
418 #[cfg(feature = "session-server")] serve_handler: T,
419 ) -> errors::Result<HoprTransportIO>
420 where
421 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
422 NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
423 {
424 self.error_if_not_in_state(
425 HoprState::Uninitialized,
426 "cannot start the hopr node multiple times".into(),
427 )?;
428
429 #[cfg(feature = "testing")]
430 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
431
432 let me_onchain = *self.chain_api.me();
433 info!(
434 address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
435 "node is not started, please fund this node",
436 );
437
438 self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
439 helpers::wait_for_funds(
440 *MIN_NATIVE_BALANCE,
441 *SUGGESTED_NATIVE_BALANCE,
442 Duration::from_secs(200),
443 me_onchain,
444 &self.chain_api,
445 )
446 .await?;
447
448 let mut processes = AbortableList::<HoprLibProcess>::default();
449
450 info!("starting HOPR node...");
451 self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
452
453 let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
454 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
455
456 info!(
457 address = %me_onchain,
458 %balance,
459 %minimum_balance,
460 "node information"
461 );
462
463 if balance.le(&minimum_balance) {
464 return Err(HoprLibError::GeneralError(
465 "cannot start the node without a sufficiently funded wallet".into(),
466 ));
467 }
468
469 self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
470
471 let network_min_ticket_price = self
474 .chain_api
475 .minimum_ticket_price()
476 .await
477 .map_err(HoprLibError::chain)?;
478 let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
479 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
480 return Err(HoprLibError::GeneralError(format!(
481 "configured outgoing ticket price is lower than the network minimum ticket price: \
482 {configured_ticket_price:?} < {network_min_ticket_price}"
483 )));
484 }
485 let network_min_win_prob = self
488 .chain_api
489 .minimum_incoming_ticket_win_prob()
490 .await
491 .map_err(HoprLibError::chain)?;
492 let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
493 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
494 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
495 {
496 return Err(HoprLibError::GeneralError(format!(
497 "configured outgoing ticket winning probability is lower than the network minimum winning \
498 probability: {configured_win_prob:?} < {network_min_win_prob}"
499 )));
500 }
501
502 self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
503
504 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
505
506 let safe_addr = self.cfg.safe_module.safe_address;
507
508 if self.me_onchain() == safe_addr {
509 return Err(HoprLibError::GeneralError(
510 "cannot use self as staking safe address".into(),
511 ));
512 }
513
514 self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
515 info!(%safe_addr, "registering safe with this node");
516 match self.chain_api.register_safe(&safe_addr).await {
517 Ok(awaiter) => {
518 awaiter.await.map_err(|error| {
520 error!(%safe_addr, %error, "safe registration failed with error");
521 HoprLibError::chain(error)
522 })?;
523 info!(%safe_addr, "safe successfully registered with this node");
524 }
525 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
526 info!(%safe_addr, "this safe is already registered with this node");
527 }
528 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
529 error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
531 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
532 }
533 Err(error) => {
534 error!(%safe_addr, %error, "safe registration failed");
535 return Err(HoprLibError::chain(error));
536 }
537 }
538
539 let multiaddresses_to_announce = if self.is_public() {
541 self.transport_api.announceable_multiaddresses()
544 } else {
545 Vec::with_capacity(0)
546 };
547
548 multiaddresses_to_announce
550 .iter()
551 .filter(|a| !is_public_address(a))
552 .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
553
554 self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
555
556 let chain_api = self.chain_api.clone();
557 let me_offchain = *self.me.public();
558 let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
559
560 info!(?multiaddresses_to_announce, "announcing node on chain");
563 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
564 Ok(awaiter) => {
565 awaiter.await.map_err(|error| {
567 error!(?multiaddresses_to_announce, %error, "node announcement failed");
568 HoprLibError::chain(error)
569 })?;
570 info!(?multiaddresses_to_announce, "node has been successfully announced");
571 }
572 Err(AnnouncementError::AlreadyAnnounced) => {
573 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
574 }
575 Err(error) => {
576 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
577 return Err(HoprLibError::chain(error));
578 }
579 }
580
581 self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
582
583 let this_node_account = node_ready
585 .await
586 .map_err(HoprLibError::other)?
587 .map_err(HoprLibError::chain)?;
588 if this_node_account.chain_addr != self.me_onchain()
589 || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
590 {
591 error!(%this_node_account, "account bound to offchain key does not match this node");
592 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
593 }
594
595 info!(%this_node_account, "node account is ready");
596
597 self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
598
599 info!("initializing session infrastructure");
600 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
601 .ok()
602 .and_then(|s| s.trim().parse::<usize>().ok())
603 .filter(|&c| c > 0)
604 .unwrap_or(256);
605
606 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
607 #[cfg(feature = "session-server")]
608 {
609 debug!(capacity = incoming_session_channel_capacity, "creating session server");
610 processes.insert(
611 HoprLibProcess::SessionServer,
612 hopr_async_runtime::spawn_as_abortable!(
613 _session_rx
614 .for_each_concurrent(None, move |session| {
615 let serve_handler = serve_handler.clone();
616 async move {
617 let session_id = *session.session.id();
618 match serve_handler.process(session).await {
619 Ok(_) => debug!(?session_id, "client session processed successfully"),
620 Err(error) => error!(
621 ?session_id,
622 %error,
623 "client session processing failed"
624 ),
625 }
626 }
627 })
628 .inspect(|_| tracing::warn!(
629 task = %HoprLibProcess::SessionServer,
630 "long-running background task finished"
631 ))
632 ),
633 );
634 }
635
636 info!("starting ticket events processor");
637 let (tickets_tx, tickets_rx) = channel(8192);
638 let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
639 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
640 let new_ticket_tx = self.ticket_event_subscribers.0.clone();
641 spawn(
642 tickets_rx
643 .for_each(move |event| {
644 if let Err(error) = new_ticket_tx.try_broadcast(event) {
645 tracing::error!(%error, "failed to broadcast new ticket event to subscribers");
646 }
647 futures::future::ready(())
648 })
649 .inspect(|_| {
650 tracing::warn!(
651 task = %HoprLibProcess::TicketEvents,
652 "long-running background task finished"
653 )
654 }),
655 );
656
657 info!("starting transport");
658 let (hopr_socket, transport_processes) = self
659 .transport_api
660 .run(cover_traffic, network_builder, tickets_tx, session_tx)
661 .await?;
662 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
663
664 info!("subscribing to channel events");
665 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
666 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
667 let chain = self.chain_api.clone();
668 let events = chain.subscribe().map_err(HoprLibError::chain)?;
669 let tmgr = self.transport_api.ticket_manager();
670 spawn(
671 futures::stream::Abortable::new(
672 events
673 .filter_map(move |event|
674 futures::future::ready(event.try_as_channel_closed())
675 ),
676 chain_events_sub_reg
677 )
678 .for_each(move |closed_channel| {
679 let chain = chain.clone();
680 let tmgr = tmgr.clone();
681 async move {
682 match closed_channel.direction(chain.me()) {
683 Some(ChannelDirection::Incoming) => {
684 match tmgr.neglect_tickets(closed_channel.get_id(), None) {
685 Ok(neglected) if !neglected.is_empty() => {
686 warn!(num_neglected = neglected.len(), %closed_channel, "tickets on incoming closed channel were neglected");
687 },
688 Ok(_) => {
689 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
690 },
691 Err(error) => {
692 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
693 }
694 }
695 },
696 Some(ChannelDirection::Outgoing) => {
697 }
700 _ => {} }
702 }
703 })
704 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
705 );
706
707 self.state.store(HoprState::Running, Ordering::Relaxed);
708
709 info!(
710 id = %self.me_peer_id(),
711 version = constants::APP_VERSION,
712 "NODE STARTED AND RUNNING"
713 );
714
715 #[cfg(all(feature = "telemetry", not(test)))]
716 METRIC_HOPR_NODE_INFO.set(
717 &[
718 &self.me.public().to_peerid_str(),
719 &me_onchain.to_string(),
720 &self.cfg.safe_module.safe_address.to_string(),
721 &self.cfg.safe_module.module_address.to_string(),
722 ],
723 1.0,
724 );
725
726 let _ = self.processes.set(processes);
727 Ok(hopr_socket)
728 }
729
730 pub fn shutdown(&self) -> Result<(), HoprLibError> {
738 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
739 if let Some(processes) = self.processes.get() {
740 processes.abort_all();
741 }
742 self.state.store(HoprState::Terminated, Ordering::Relaxed);
743 info!("NODE SHUTDOWN COMPLETE");
744 Ok(())
745 }
746
747 #[cfg(feature = "session-client")]
750 pub async fn connect_to(
751 &self,
752 destination: Address,
753 target: SessionTarget,
754 cfg: HoprSessionClientConfig,
755 ) -> errors::Result<HoprSession> {
756 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
757
758 let backoff = backon::ConstantBuilder::default()
759 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
760 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
761 .with_jitter();
762
763 use backon::Retryable;
764
765 Ok((|| {
766 let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
767 let target = target.clone();
768 async { self.transport_api.new_session(destination, target, cfg).await }
769 })
770 .retry(backoff)
771 .sleep(backon::FuturesTimerSleeper)
772 .await?)
773 }
774
775 #[cfg(feature = "session-client")]
778 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
779 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
780 Ok(self.transport_api.probe_session(id).await?)
781 }
782
783 #[cfg(feature = "session-client")]
784 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
785 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
786 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
787 }
788
789 #[cfg(feature = "session-client")]
790 pub async fn update_session_surb_balancer_config(
791 &self,
792 id: &SessionId,
793 cfg: SurbBalancerConfig,
794 ) -> errors::Result<()> {
795 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
796 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
797 }
798
799 fn spawn_wait_for_on_chain_event(
802 &self,
803 context: impl std::fmt::Display,
804 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
805 timeout: Duration,
806 ) -> errors::Result<(
807 impl Future<Output = errors::Result<ChainEvent>>,
808 hopr_async_runtime::AbortHandle,
809 )> {
810 debug!(%context, "registering wait for on-chain event");
811 let (event_stream, handle) = futures::stream::abortable(
812 self.chain_api
813 .subscribe()
814 .map_err(HoprLibError::chain)?
815 .skip_while(move |event| futures::future::ready(!predicate(event))),
816 );
817
818 let ctx = context.to_string();
819
820 Ok((
821 spawn(async move {
822 pin_mut!(event_stream);
823 let res = event_stream
824 .next()
825 .timeout(futures_time::time::Duration::from(timeout))
826 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
827 .await?
828 .ok_or(HoprLibError::GeneralError(format!(
829 "failed to yield an on-chain event for {ctx}"
830 )));
831 debug!(%ctx, ?res, "on-chain event waiting done");
832 res
833 })
834 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
835 .and_then(futures::future::ready),
836 handle,
837 ))
838 }
839}
840
841impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
842where
843 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
844 + hopr_api::graph::NetworkGraphUpdate
845 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
846 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
847 + Clone
848 + Send
849 + Sync
850 + 'static,
851 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
852 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
853 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
854 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
855{
856 pub fn collect_hopr_metrics() -> errors::Result<String> {
859 cfg_if::cfg_if! {
860 if #[cfg(all(feature = "telemetry", not(test)))] {
861 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
862 } else {
863 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
864 }
865 }
866 }
867}
868
869impl<Chain, Graph, Net> HoprNodeOperations for Hopr<Chain, Graph, Net>
872where
873 Chain: HoprChainApi + Clone + Send + Sync + 'static,
874 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
875 + hopr_api::graph::NetworkGraphUpdate
876 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
877 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
878 + Clone
879 + Send
880 + Sync
881 + 'static,
882 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
883 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
884 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
885 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
886{
887 fn status(&self) -> HoprState {
888 self.state.load(Ordering::Relaxed)
889 }
890}
891
892#[async_trait::async_trait]
893impl<Chain, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Graph, Net>
894where
895 Chain: HoprChainApi + Clone + Send + Sync + 'static,
896 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
897 + hopr_api::graph::NetworkGraphUpdate
898 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
899 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
900 + Clone
901 + Send
902 + Sync
903 + 'static,
904 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
905 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
906 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
907 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
908{
909 type Error = HoprLibError;
910 type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
911
912 fn me_peer_id(&self) -> PeerId {
913 (*self.me.public()).into()
914 }
915
916 async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
917 Ok(self
918 .chain_api
919 .stream_accounts(AccountSelector {
920 public_only: true,
921 ..Default::default()
922 })
923 .map_err(HoprLibError::chain)
924 .await?
925 .map(|entry| {
926 (
927 PeerId::from(entry.public_key),
928 entry.chain_addr,
929 entry.get_multiaddrs().to_vec(),
930 )
931 })
932 .collect()
933 .await)
934 }
935
936 async fn network_health(&self) -> hopr_api::network::Health {
937 self.transport_api.network_health().await
938 }
939
940 async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
941 Ok(self
942 .transport_api
943 .network_connected_peers()
944 .await?
945 .into_iter()
946 .map(PeerId::from)
947 .collect())
948 }
949
950 fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
951 let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
952 self.transport_api.network_peer_observations(&pubkey)
953 }
954
955 async fn all_network_peers(
956 &self,
957 minimum_score: f64,
958 ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
959 Ok(
960 futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
961 .filter_map(|(pubkey, info)| async move {
962 let peer_id = PeerId::from(pubkey);
963 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
964 Some((address, peer_id, info))
965 })
966 .collect::<Vec<_>>()
967 .await,
968 )
969 }
970
971 fn local_multiaddresses(&self) -> Vec<Multiaddr> {
972 self.transport_api.local_multiaddresses()
973 }
974
975 async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
976 self.transport_api.listening_multiaddresses().await
977 }
978
979 async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
980 let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
981 return vec![];
982 };
983 self.transport_api.network_observed_multiaddresses(&pubkey).await
984 }
985
986 async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
987 let pubkey = hopr_transport::peer_id_to_public_key(peer)
988 .await
989 .map_err(HoprLibError::TransportError)?;
990
991 match self
992 .chain_api
993 .stream_accounts(AccountSelector {
994 public_only: false,
995 offchain_key: Some(pubkey),
996 ..Default::default()
997 })
998 .map_err(HoprLibError::chain)
999 .await?
1000 .next()
1001 .await
1002 {
1003 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1004 None => {
1005 error!(%peer, "no information");
1006 Ok(vec![])
1007 }
1008 }
1009 }
1010
1011 async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1012 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1013 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1014 .await
1015 .map_err(HoprLibError::TransportError)?;
1016 Ok(self.transport_api.ping(&pubkey).await?)
1017 }
1018}
1019
1020impl<Chain, Graph, Net> Hopr<Chain, Graph, Net>
1021where
1022 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1023 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1024 + hopr_api::graph::NetworkGraphUpdate
1025 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1026 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1027 + Clone
1028 + Send
1029 + Sync
1030 + 'static,
1031 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1032 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1033 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1034 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1035{
1036 const PENDING_TO_CLOSE_TOLERANCE: Duration = Duration::from_secs(5);
1039
1040 pub fn me_onchain(&self) -> Address {
1041 *self.chain_api.me()
1042 }
1043
1044 pub fn get_safe_config(&self) -> SafeModuleConfig {
1045 SafeModuleConfig {
1046 safe_address: self.cfg.safe_module.safe_address,
1047 module_address: self.cfg.safe_module.module_address,
1048 }
1049 }
1050
1051 pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1052 self.chain_api
1053 .balance(self.me_onchain())
1054 .await
1055 .map_err(HoprLibError::chain)
1056 }
1057
1058 pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1059 self.chain_api
1060 .balance(self.cfg.safe_module.safe_address)
1061 .await
1062 .map_err(HoprLibError::chain)
1063 }
1064
1065 pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1066 self.chain_api
1067 .safe_allowance(self.cfg.safe_module.safe_address)
1068 .await
1069 .map_err(HoprLibError::chain)
1070 }
1071
1072 pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1073 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1074 }
1075
1076 pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1077 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1078 }
1079
1080 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1081 self.chain_api
1082 .minimum_incoming_ticket_win_prob()
1083 .await
1084 .map_err(HoprLibError::chain)
1085 }
1086
1087 pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1088 self.chain_api
1089 .channel_closure_notice_period()
1090 .await
1091 .map_err(HoprLibError::chain)
1092 }
1093
1094 pub async fn announced_peers(&self) -> Result<Vec<AnnouncedPeer>, HoprLibError> {
1095 Ok(self
1096 .chain_api
1097 .stream_accounts(AccountSelector {
1098 public_only: true,
1099 ..Default::default()
1100 })
1101 .map_err(HoprLibError::chain)
1102 .await?
1103 .map(|entry| AnnouncedPeer {
1104 address: entry.chain_addr,
1105 multiaddresses: entry.get_multiaddrs().to_vec(),
1106 origin: AnnouncementOrigin::Chain,
1107 })
1108 .collect()
1109 .await)
1110 }
1111
1112 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1113 let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1114 .await
1115 .map_err(HoprLibError::TransportError)?;
1116
1117 self.chain_api
1118 .packet_key_to_chain_key(&pubkey)
1119 .await
1120 .map_err(HoprLibError::chain)
1121 }
1122
1123 pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1124 self.chain_api
1125 .chain_key_to_packet_key(address)
1126 .await
1127 .map(|pk| pk.map(|v| v.into()))
1128 .map_err(HoprLibError::chain)
1129 }
1130
1131 pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1132 self.chain_api
1133 .channel_by_id(channel_id)
1134 .await
1135 .map_err(HoprLibError::chain)
1136 }
1137
1138 pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1139 self.chain_api
1140 .channel_by_parties(src, dest)
1141 .await
1142 .map_err(HoprLibError::chain)
1143 }
1144
1145 pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1146 Ok(self
1147 .chain_api
1148 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1149 ChannelStatusDiscriminants::Closed,
1150 ChannelStatusDiscriminants::Open,
1151 ChannelStatusDiscriminants::PendingToClose,
1152 ]))
1153 .map_err(HoprLibError::chain)
1154 .await?
1155 .collect()
1156 .await)
1157 }
1158
1159 pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1160 Ok(self
1161 .chain_api
1162 .stream_channels(
1163 ChannelSelector::default()
1164 .with_destination(*dest)
1165 .with_allowed_states(&[
1166 ChannelStatusDiscriminants::Closed,
1167 ChannelStatusDiscriminants::Open,
1168 ChannelStatusDiscriminants::PendingToClose,
1169 ]),
1170 )
1171 .map_err(HoprLibError::chain)
1172 .await?
1173 .collect()
1174 .await)
1175 }
1176
1177 pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1178 Ok(self
1179 .chain_api
1180 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1181 ChannelStatusDiscriminants::Closed,
1182 ChannelStatusDiscriminants::Open,
1183 ChannelStatusDiscriminants::PendingToClose,
1184 ]))
1185 .map_err(HoprLibError::chain)
1186 .await?
1187 .collect()
1188 .await)
1189 }
1190
1191 pub async fn open_channel(
1192 &self,
1193 destination: &Address,
1194 amount: HoprBalance,
1195 ) -> Result<OpenChannelResult, HoprLibError> {
1196 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1197
1198 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1199
1200 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1203 format!("open channel to {destination} ({channel_id})"),
1204 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1205 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1206 )?;
1207
1208 let confirm_awaiter = self
1209 .chain_api
1210 .open_channel(destination, amount)
1211 .await
1212 .map_err(HoprLibError::chain)?;
1213
1214 let tx_hash = confirm_awaiter.await.map_err(|e| {
1215 event_abort.abort();
1216 HoprLibError::chain(e)
1217 })?;
1218
1219 let event = event_awaiter.await?;
1220 debug!(%event, "open channel event received");
1221
1222 Ok(OpenChannelResult { tx_hash, channel_id })
1223 }
1224
1225 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1226 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1227
1228 let channel_id = *channel_id;
1229
1230 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1233 format!("fund channel {channel_id}"),
1234 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1235 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1236 )?;
1237
1238 let confirm_awaiter = self
1239 .chain_api
1240 .fund_channel(&channel_id, amount)
1241 .await
1242 .map_err(HoprLibError::chain)?;
1243
1244 let res = confirm_awaiter.await.map_err(|e| {
1245 event_abort.abort();
1246 HoprLibError::chain(e)
1247 })?;
1248
1249 let event = event_awaiter.await?;
1250 debug!(%event, "fund channel event received");
1251
1252 Ok(res)
1253 }
1254
1255 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1256 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1257
1258 let channel_id = *channel_id;
1259
1260 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1263 format!("close channel {channel_id}"),
1264 move |event| {
1265 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1266 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1267 },
1268 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1269 )?;
1270
1271 let confirm_awaiter = self
1272 .chain_api
1273 .close_channel(&channel_id)
1274 .await
1275 .map_err(HoprLibError::chain)?;
1276
1277 let tx_hash = confirm_awaiter.await.map_err(|e| {
1278 event_abort.abort();
1279 HoprLibError::chain(e)
1280 })?;
1281
1282 let event = event_awaiter.await?;
1283 debug!(%event, "close channel event received");
1284
1285 Ok(CloseChannelResult { tx_hash })
1286 }
1287
1288 pub async fn ticket_statistics(&self) -> Result<ChannelStats, HoprLibError> {
1290 self.transport_api
1291 .ticket_manager()
1292 .ticket_stats(None)
1293 .map_err(HoprLibError::ticket_manager)
1294 }
1295
1296 pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(
1297 &self,
1298 min_value: B,
1299 ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1300 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1301
1302 let min_value = min_value.into();
1303
1304 self.ticket_management()
1305 .redeem_in_channels(
1306 self.chain_api.clone(),
1307 None,
1308 min_value.into(),
1309 Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1310 )
1311 .await
1312 .map_err(HoprLibError::ticket_manager)?
1313 .try_collect::<Vec<_>>()
1314 .map_err(HoprLibError::ticket_manager)
1315 .await
1316 }
1317
1318 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1319 &self,
1320 counterparty: &Address,
1321 min_value: B,
1322 ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1323 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1324 .await
1325 }
1326
1327 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1328 &self,
1329 channel_id: &ChannelId,
1330 min_value: B,
1331 ) -> Result<Vec<RedemptionResult>, HoprLibError> {
1332 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1333
1334 let min_value = min_value.into();
1335
1336 let channel = self
1337 .chain_api
1338 .channel_by_id(channel_id)
1339 .await
1340 .map_err(HoprLibError::chain)?
1341 .ok_or(HoprLibError::GeneralError("channel not found".into()))?;
1342
1343 self.transport_api
1344 .ticket_manager()
1345 .redeem_in_channels(
1346 self.chain_api.clone(),
1347 ChannelSelector::default()
1348 .with_source(channel.source)
1349 .with_destination(channel.destination)
1350 .into(),
1351 min_value.into(),
1352 Some(Self::PENDING_TO_CLOSE_TOLERANCE),
1353 )
1354 .await
1355 .map_err(HoprLibError::ticket_manager)?
1356 .map_err(HoprLibError::ticket_manager)
1357 .try_collect::<Vec<_>>()
1358 .await
1359 }
1360
1361 pub fn ticket_management(&self) -> impl TicketManagement + Clone + Send + 'static {
1362 self.transport_api.ticket_manager()
1363 }
1364
1365 pub fn subscribe_ticket_events(&self) -> impl Stream<Item = TicketEvent> + Send + 'static {
1366 self.ticket_event_subscribers.1.activate_cloned()
1367 }
1368
1369 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1370 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1371
1372 self.chain_api
1373 .withdraw(amount, &recipient)
1374 .and_then(identity)
1375 .map_err(HoprLibError::chain)
1376 .await
1377 }
1378
1379 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1380 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1381
1382 self.chain_api
1383 .withdraw(amount, &recipient)
1384 .and_then(identity)
1385 .map_err(HoprLibError::chain)
1386 .await
1387 }
1388}