1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25pub mod traits;
27pub mod utils;
29
30pub use hopr_api as api;
31
/// Re-exports of the underlying HOPR crates, grouped by area (types, crypto, network, transport).
///
/// The module is hidden from the generated documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub mod exports {
    /// The `chain`, `internal` and `primitive` modules of `hopr_api::types`.
    pub mod types {
        pub use hopr_api::types::{chain, internal, primitive};
    }

    /// Cryptography-related re-exports: `hopr_api::types::crypto` (as `types`) and
    /// the `hopr_crypto_keypair` crate (as `keypair`).
    pub mod crypto {
        pub use hopr_api::types::crypto as types;
        pub use hopr_crypto_keypair as keypair;
    }

    /// The `hopr_network_types` crate, re-exported as `types`.
    pub mod network {
        pub use hopr_network_types as types;
    }

    // The whole transport crate is reachable as `exports::transport`.
    pub use hopr_transport as transport;
}
50
/// Convenience re-exports of commonly used items, all reached through the `exports` module.
///
/// The module is hidden from the generated documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub mod prelude {
    // UDP stream helpers are re-exported only when the `runtime-tokio` feature is enabled.
    #[cfg(feature = "runtime-tokio")]
    pub use super::exports::network::types::{
        prelude::ForeignDataMode,
        udp::{ConnectedUdpStream, UdpStreamParallelism},
    };
    // Key material, hash, socket and address types.
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
68
69use std::{
70 convert::identity,
71 future::Future,
72 sync::{Arc, OnceLock, atomic::Ordering},
73 time::Duration,
74};
75
76use futures::{
77 FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
78 channel::mpsc::{SendError, channel},
79 pin_mut,
80 sink::SinkMapErr,
81};
82use futures_time::future::FutureExt as FuturesTimeFutureExt;
83use hopr_api::{
84 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
86 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
87 node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
88};
89pub use hopr_api::{
90 db::ChannelTicketStatistics,
91 graph::EdgeLinkObservable,
92 network::{NetworkBuilder, NetworkStreamControl},
93 node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
94 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_network_types::prelude::*;
100#[cfg(all(feature = "telemetry", not(test)))]
101use hopr_platform::time::native::current_time;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110 config::SafeModule,
111 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112 errors::{HoprLibError, HoprStatusError},
113};
114
/// Hop count used for the forward and return paths of a session
/// (see `HoprSessionClientConfig`).
///
/// Wraps a `BoundedSize` whose upper bound is `RoutingOptions::MAX_INTERMEDIATE_HOPS`;
/// the default value is `BoundedSize::MIN`.
///
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
127
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Upper bound on the hop count, taken from `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Returns the wrapped hop count as a plain `usize`.
    pub fn hop_count(self) -> usize {
        let Self(hops) = self;
        hops.into()
    }
}
138
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    /// Builds a [`HopRouting`] from a raw hop count.
    ///
    /// # Errors
    /// Propagates the error of the underlying bounded-size conversion when it rejects `value`.
    fn try_from(value: usize) -> Result<Self, Self::Error> {
        let hops = value.try_into()?;
        Ok(Self(hops))
    }
}
147
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    /// Converts the hop count into the `Hops` variant of `RoutingOptions`.
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
154
/// Client-side configuration of a session, convertible into
/// `hopr_transport::SessionClientConfig`.
///
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Hop count of the forward path (mapped to `forward_path_options`).
    pub forward_path: HopRouting,
    /// Hop count of the return path (mapped to `return_path_options`).
    pub return_path: HopRouting,
    /// Session capabilities; defaults to `SessionCapability::Segmentation`.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym for the session; defaults to `None`.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Optional SURB balancer configuration; defaults to `Some(SurbBalancerConfig::default())`.
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// Forwarded unchanged to the transport-level configuration; defaults to `false`.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
179
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    /// Maps the library-level session configuration onto the transport-level one.
    ///
    /// Both hop counts are converted into routing options; all other fields are moved over as-is.
    fn from(value: HoprSessionClientConfig) -> Self {
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
193
/// Identifiers of the long-running background processes tracked by [`Hopr`].
///
/// The `strum` display strings are used as the `task` field in log messages.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
pub enum HoprLibProcess {
    /// A process owned by the transport layer, wrapped with its own identifier.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Task serving incoming sessions (spawned only with the `session-server` feature).
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// Task draining the ticket redemption request queue.
    #[strum(to_string = "ticket redemption queue driver")]
    TicketRedemptions,
    /// Task reacting to on-chain channel-closure events.
    #[strum(to_string = "subscription for on-chain channel updates")]
    ChannelEvents,
    /// Task processing winning and rejected ticket events.
    #[strum(to_string = "on received ticket event (winning or rejected)")]
    TicketEvents,
}
208
// Process-wide metrics; compiled only with the `telemetry` feature and outside of tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Set once in `Hopr::new` to the current unix timestamp.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Labelled with the application version; the value is "<major>.<minor>" parsed as a float.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Set to 1.0 with the node's identifiers as labels once the node is running.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
226
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// * `num_cpu_threads` - number of threads for the CPU-bound pool
///   (`hopr_parallelize::cpu::init_thread_pool`).
/// * `num_io_threads` - number of Tokio worker threads.
///
/// When either argument is `None`, half of the available parallelism is used instead.
/// Both counts are clamped to at least 1.
///
/// The worker thread stack size is read from the `HOPRD_THREAD_STACK_SIZE` environment
/// variable (in bytes), defaults to 10 MiB, and is never set below 2 MiB.
///
/// # Errors
/// Fails when a thread count is neither given nor derivable from the available
/// parallelism, when the CPU thread pool cannot be initialized, or when the runtime
/// cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Half of the available parallelism may be 0 on single-core machines; clamped below.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error values are built lazily, only when no thread count could be determined.
    let cpu_threads = num_cpu_threads
        .map(|v| v.get())
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(|v| v.get())
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // An unset or unparsable value falls back to the default stack size.
    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
272
/// Socket type returned by [`Hopr::run`]: a `HoprSocket` built from an mpsc receiver of
/// incoming application data and an mpsc sender of routed outgoing application data.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
278
/// Broadcast channel pair for winning tickets: the sender plus a deactivated receiver,
/// both stored on [`Hopr`].
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
283
/// Timeout passed to `await_key_binding` while the node waits for its on-chain key binding
/// during startup.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
286
/// Timeout of 90 seconds for on-chain resolution events.
// NOTE(review): no use of this constant is visible in this part of the file; presumably it is
// passed as the `timeout` of `spawn_wait_for_on_chain_event` — confirm against the callers.
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
290
/// The HOPR node: combines the transport layer with the chain API and the node database,
/// and tracks the node's lifecycle state and background processes.
///
/// Type parameters:
/// * `Chain` - on-chain API implementation.
/// * `Db` - node database implementation.
/// * `Graph` - network graph keyed by `OffchainPublicKey`.
/// * `Net` - network view and stream control implementation.
pub struct Hopr<Chain, Db, Graph, Net>
where
    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
{
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    /// Validated library configuration.
    cfg: config::HoprLibConfig,
    /// Current lifecycle state, read via `HoprNodeOperations::status`.
    state: Arc<api::node::state::AtomicHoprState>,
    /// Transport layer API.
    transport_api: HoprTransport<Chain, Db, Graph, Net>,
    /// Sender side of the ticket redemption queue; set once in `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Node database handle.
    node_db: Db,
    /// On-chain API handle.
    chain_api: Chain,
    /// Broadcast channel on which winning tickets are published.
    winning_ticket_subscribers: NewTicketEvents,
    /// Abort handles of all background processes; set once at the end of `run`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
325
326impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
327where
328 Chain: HoprChainApi + Clone + Send + Sync + 'static,
329 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
330 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
331 + hopr_api::graph::NetworkGraphUpdate
332 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
333 + Clone
334 + Send
335 + Sync
336 + 'static,
337 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
338 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
339 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
340{
341 pub async fn new(
342 identity: (&ChainKeypair, &OffchainKeypair),
343 hopr_chain_api: Chain,
344 hopr_node_db: Db,
345 graph: Graph,
346 cfg: config::HoprLibConfig,
347 ) -> errors::Result<Self> {
348 if hopr_api::types::crypto_random::is_rng_fixed() {
349 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
350 }
351
352 cfg.validate()?;
353
354 let hopr_transport_api = HoprTransport::new(
355 identity,
356 hopr_chain_api.clone(),
357 hopr_node_db.clone(),
358 graph,
359 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
360 cfg.protocol,
361 )
362 .map_err(HoprLibError::TransportError)?;
363
364 #[cfg(all(feature = "telemetry", not(test)))]
365 {
366 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
367 METRIC_HOPR_LIB_VERSION.set(
368 &[const_format::formatcp!("{}", constants::APP_VERSION)],
369 const_format::formatcp!(
370 "{}.{}",
371 env!("CARGO_PKG_VERSION_MAJOR"),
372 env!("CARGO_PKG_VERSION_MINOR")
373 )
374 .parse()
375 .unwrap_or(0.0),
376 );
377
378 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
380 error!(%error, "failed to initialize ticket statistics metrics");
381 }
382 }
383
384 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
385 new_tickets_tx.set_await_active(false);
386 new_tickets_tx.set_overflow(true);
387
388 Ok(Self {
389 me: identity.1.clone(),
390 cfg,
391 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
392 transport_api: hopr_transport_api,
393 chain_api: hopr_chain_api,
394 node_db: hopr_node_db,
395 redeem_requests: OnceLock::new(),
396 processes: OnceLock::new(),
397 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
398 })
399 }
400
401 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
402 if HoprNodeOperations::status(self) == state {
403 Ok(())
404 } else {
405 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
406 }
407 }
408
409 pub fn config(&self) -> &config::HoprLibConfig {
410 &self.cfg
411 }
412
413 #[inline]
414 fn is_public(&self) -> bool {
415 self.cfg.publish
416 }
417
418 #[cfg(feature = "telemetry")]
421 pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
422 Ok(self.transport_api.network_peer_packet_stats(peer).await?)
423 }
424
425 #[cfg(feature = "telemetry")]
428 pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
429 Ok(self.transport_api.network_all_packet_stats().await?)
430 }
431
    /// Starts the node and returns the transport socket for application data.
    ///
    /// The startup walks through these phases, updating the node state along the way:
    /// 1. wait for funds and verify the native balance,
    /// 2. validate the configured ticket price and winning probability against the network minimums,
    /// 3. register the configured safe and announce the node on chain,
    /// 4. wait for the on-chain key binding of this node,
    /// 5. spawn the background processes (session server, ticket events, transport,
    ///    ticket redemptions, channel events),
    /// 6. synchronize ticket states with the current incoming channels.
    ///
    /// # Parameters
    /// * `cover_traffic` - probing/cover traffic generator handed to the transport.
    /// * `network_builder` - builder of the network handed to the transport.
    /// * `serve_handler` - handler of incoming sessions (only with the `session-server` feature).
    ///
    /// # Errors
    /// Fails when the node is not in the `Uninitialized` state, or when any of the phases
    /// above fails. The state is not reset on failure.
    pub async fn run<
        Ct,
        NetBuilder,
        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
    >(
        &self,
        cover_traffic: Ct,
        network_builder: NetBuilder,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<HoprTransportIO>
    where
        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
    {
        // The node can be started only once.
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "cannot start the hopr node multiple times".into(),
        )?;

        #[cfg(feature = "testing")]
        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");

        let me_onchain = *self.chain_api.me();
        info!(
            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "node is not started, please fund this node",
        );

        // NOTE(review): `wait_for_funds` lives in the `helpers` module; presumably it waits
        // (bounded by the 200 s duration) until the balance is sufficient — confirm.
        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
        helpers::wait_for_funds(
            *MIN_NATIVE_BALANCE,
            *SUGGESTED_NATIVE_BALANCE,
            Duration::from_secs(200),
            me_onchain,
            &self.chain_api,
        )
        .await?;

        // Collects the abort handles of everything spawned below.
        let mut processes = AbortableList::<HoprLibProcess>::default();

        info!("starting HOPR node...");
        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);

        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %me_onchain,
            %balance,
            %minimum_balance,
            "node information"
        );

        // A balance equal to the minimum is rejected as well.
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "cannot start the node without a sufficiently funded wallet".into(),
            ));
        }

        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);

        // An explicitly configured outgoing ticket price must not be below the network minimum.
        let network_min_ticket_price = self
            .chain_api
            .minimum_ticket_price()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            )));
        }
        // The same applies to the winning probability, unless the check is disabled by
        // setting the `HOPR_TEST_DISABLE_CHECKS` environment variable to "true".
        let network_min_win_prob = self
            .chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            )));
        }

        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);

        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");

        let safe_addr = self.cfg.safe_module.safe_address;

        // The node's own on-chain address cannot double as its safe.
        if self.me_onchain() == safe_addr {
            return Err(HoprLibError::GeneralError(
                "cannot use self as staking safe address".into(),
            ));
        }

        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
        info!(%safe_addr, "registering safe with this node");
        match self.chain_api.register_safe(&safe_addr).await {
            // The registration was submitted; wait for its outcome.
            Ok(awaiter) => {
                awaiter.await.map_err(|error| {
                    error!(%safe_addr, %error, "safe registration failed with error");
                    HoprLibError::chain(error)
                })?;
                info!(%safe_addr, "safe successfully registered with this node");
            }
            // Already registered with the very same safe: nothing to do.
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
                info!(%safe_addr, "this safe is already registered with this node");
            }
            // Registered with another safe: refuse to start.
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
            }
            Err(error) => {
                error!(%safe_addr, %error, "safe registration failed");
                return Err(HoprLibError::chain(error));
            }
        }

        // Only a public node announces multiaddresses; otherwise the list is empty.
        let multiaddresses_to_announce = if self.is_public() {
            self.transport_api.announceable_multiaddresses()
        } else {
            Vec::with_capacity(0)
        };

        // Warn about every address that is not public; it is announced nonetheless.
        multiaddresses_to_announce
            .iter()
            .filter(|a| !is_public_address(a))
            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));

        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);

        // Start waiting for the key binding before the announcement is sent.
        let chain_api = self.chain_api.clone();
        let me_offchain = *self.me.public();
        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });

        info!(?multiaddresses_to_announce, "announcing node on chain");
        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
            Ok(awaiter) => {
                awaiter.await.map_err(|error| {
                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
                    HoprLibError::chain(error)
                })?;
                info!(?multiaddresses_to_announce, "node has been successfully announced");
            }
            // An existing announcement is not an error.
            Err(AnnouncementError::AlreadyAnnounced) => {
                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
            }
            Err(error) => {
                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
                return Err(HoprLibError::chain(error));
            }
        }

        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);

        // First `map_err` covers the spawned task itself, the second the chain operation.
        let this_node_account = node_ready
            .await
            .map_err(HoprLibError::other)?
            .map_err(HoprLibError::chain)?;
        // The bound account must carry this node's chain address and the configured safe.
        if this_node_account.chain_addr != self.me_onchain()
            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
        {
            error!(%this_node_account, "account bound to offchain key does not match this node");
            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
        }

        info!(%this_node_account, "node account is ready");

        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);

        info!("initializing session infrastructure");
        // Capacity of the incoming session channel; a missing, unparsable or zero value
        // of the environment variable falls back to 256.
        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(256);

        // The receiver is consumed only by the session server below (feature-gated).
        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
        #[cfg(feature = "session-server")]
        {
            debug!(capacity = incoming_session_channel_capacity, "creating session server");
            processes.insert(
                HoprLibProcess::SessionServer,
                hopr_async_runtime::spawn_as_abortable!(
                    _session_rx
                        // Incoming sessions are processed concurrently without a limit.
                        .for_each_concurrent(None, move |session| {
                            let serve_handler = serve_handler.clone();
                            async move {
                                let session_id = *session.session.id();
                                match serve_handler.process(session).await {
                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
                                    Err(error) => error!(
                                        ?session_id,
                                        %error,
                                        "client session processing failed"
                                    ),
                                }
                            }
                        })
                        .inspect(|_| tracing::warn!(
                            task = %HoprLibProcess::SessionServer,
                            "long-running background task finished"
                        ))
                ),
            );
        }

        info!("starting ticket events processor");
        let (tickets_tx, tickets_rx) = channel(8192);
        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
        let node_db = self.node_db.clone();
        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
        spawn(
            tickets_rx
                .filter_map(move |ticket_event| {
                    let node_db = node_db.clone();
                    async move {
                        match ticket_event {
                            // Winning tickets are stored and passed on to the broadcast step,
                            // even when the database insertion failed.
                            TicketEvent::WinningTicket(winning) => {
                                if let Err(error) = node_db.insert_ticket(*winning).await {
                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
                                } else {
                                    tracing::debug!(%winning, "inserted ticket into database");
                                }
                                Some(winning)
                            }
                            // Rejected tickets are only recorded (when the issuer is known)
                            // and never broadcast.
                            TicketEvent::RejectedTicket(rejected, issuer) => {
                                if let Some(issuer) = &issuer {
                                    if let Err(error) =
                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
                                    {
                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
                                    } else {
                                        tracing::debug!(%rejected, "marked ticket as rejected");
                                    }
                                } else {
                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
                                }
                                None
                            }
                        }
                    }
                })
                .for_each(move |ticket| {
                    // Non-blocking broadcast; a failure is only logged.
                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
                    }
                    futures::future::ready(())
                })
                .inspect(|_| {
                    tracing::warn!(
                        task = %HoprLibProcess::TicketEvents,
                        "long-running background task finished"
                    )
                }),
        );

        info!("starting transport");
        let (hopr_socket, transport_processes) = self
            .transport_api
            .run(cover_traffic, network_builder, tickets_tx, session_tx)
            .await?;
        // Transport processes are tracked under the `Transport` variant.
        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

        info!("starting ticket redemption service");
        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
        // The result of `set` is ignored; it fails only when the sender was already set.
        let _ = self.redeem_requests.set(redemption_req_tx);
        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        spawn(redemption_req_rx
            // Redemption requests are handled one at a time.
            .for_each(move |selector| {
                let chain = chain.clone();
                let db = node_db.clone();
                async move {
                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
                        Ok(res) => debug!(%res, "redemption complete"),
                        Err(error) => error!(%error, "redemption failed"),
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
        );

        info!("subscribing to channel events");
        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        let events = chain.subscribe().map_err(HoprLibError::chain)?;
        spawn(
            futures::stream::Abortable::new(
                events
                    // Only channel-closure events are of interest here.
                    .filter_map(move |event|
                        futures::future::ready(event.try_as_channel_closed())
                    ),
                chain_events_sub_reg
            )
            .for_each(move |closed_channel| {
                let node_db = node_db.clone();
                let chain = chain.clone();
                async move {
                    match closed_channel.direction(chain.me()) {
                        // Incoming channel closed: its tickets are marked as neglected.
                        Some(ChannelDirection::Incoming) => {
                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
                                Ok(num_neglected) if num_neglected > 0 => {
                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
                                },
                                Ok(_) => {
                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
                                },
                                Err(error) => {
                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
                                }
                            }
                        },
                        // Outgoing channel closed: its outgoing ticket index is removed.
                        Some(ChannelDirection::Outgoing) => {
                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
                            } else {
                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
                            }
                        }
                        // Channels with no direction relative to this node are ignored.
                        _ => {}
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
        );

        info!("synchronizing ticket states");
        // All channels whose destination is this node.
        let mut channels = self
            .chain_api
            .stream_channels(ChannelSelector {
                destination: self.me_onchain().into(),
                ..Default::default()
            })
            .map_err(HoprLibError::chain)
            .await?;

        while let Some(channel) = channels.next().await {
            // Tickets at or above the channel's ticket index that are still marked as
            // being redeemed are reset to untouched.
            self.node_db
                .update_ticket_states_and_fetch(
                    [TicketSelector::from(&channel)
                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
                        .with_index_range(channel.ticket_index..)],
                    AcknowledgedTicketStatus::Untouched,
                )
                .map_err(HoprLibError::db)
                .await?
                .for_each(|ticket| {
                    info!(%ticket, "fixed next out-of-sync ticket");
                    futures::future::ready(())
                })
                .await;

            // Tickets below the channel's ticket index are marked as neglected.
            self.node_db
                .mark_tickets_as(
                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
                    TicketMarker::Neglected,
                )
                .map_err(HoprLibError::db)
                .await?;
        }

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "telemetry", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &me_onchain.to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        // Keep the abort handles so that `shutdown` can stop everything later.
        let _ = self.processes.set(processes);
        Ok(hopr_socket)
    }
848
849 pub fn shutdown(&self) -> Result<(), HoprLibError> {
857 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
858 if let Some(processes) = self.processes.get() {
859 processes.abort_all();
860 }
861 self.state.store(HoprState::Terminated, Ordering::Relaxed);
862 info!("NODE SHUTDOWN COMPLETE");
863 Ok(())
864 }
865
866 #[cfg(feature = "session-client")]
869 pub async fn connect_to(
870 &self,
871 destination: Address,
872 target: SessionTarget,
873 cfg: HoprSessionClientConfig,
874 ) -> errors::Result<HoprSession> {
875 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
876
877 let backoff = backon::ConstantBuilder::default()
878 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
879 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
880 .with_jitter();
881
882 use backon::Retryable;
883
884 Ok((|| {
885 let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
886 let target = target.clone();
887 async { self.transport_api.new_session(destination, target, cfg).await }
888 })
889 .retry(backoff)
890 .sleep(backon::FuturesTimerSleeper)
891 .await?)
892 }
893
894 #[cfg(feature = "session-client")]
897 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
898 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
899 Ok(self.transport_api.probe_session(id).await?)
900 }
901
902 #[cfg(feature = "session-client")]
903 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
904 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
905 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
906 }
907
908 #[cfg(all(feature = "session-client", feature = "telemetry"))]
909 pub async fn get_session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
910 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
911 Ok(self.transport_api.session_stats(id).await?)
912 }
913
914 #[cfg(feature = "session-client")]
915 pub async fn update_session_surb_balancer_config(
916 &self,
917 id: &SessionId,
918 cfg: SurbBalancerConfig,
919 ) -> errors::Result<()> {
920 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
921 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
922 }
923
924 fn spawn_wait_for_on_chain_event(
927 &self,
928 context: impl std::fmt::Display,
929 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
930 timeout: Duration,
931 ) -> errors::Result<(
932 impl Future<Output = errors::Result<ChainEvent>>,
933 hopr_async_runtime::AbortHandle,
934 )> {
935 debug!(%context, "registering wait for on-chain event");
936 let (event_stream, handle) = futures::stream::abortable(
937 self.chain_api
938 .subscribe()
939 .map_err(HoprLibError::chain)?
940 .skip_while(move |event| futures::future::ready(!predicate(event))),
941 );
942
943 let ctx = context.to_string();
944
945 Ok((
946 spawn(async move {
947 pin_mut!(event_stream);
948 let res = event_stream
949 .next()
950 .timeout(futures_time::time::Duration::from(timeout))
951 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
952 .await?
953 .ok_or(HoprLibError::GeneralError(format!(
954 "failed to yield an on-chain event for {ctx}"
955 )));
956 debug!(%ctx, ?res, "on-chain event waiting done");
957 res
958 })
959 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
960 .and_then(futures::future::ready),
961 handle,
962 ))
963 }
964}
965
/// Functionality that needs neither the chain API nor the database bounds.
impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
where
    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
{
    /// Gathers all metrics into a single string.
    ///
    /// # Errors
    /// Returns an error when gathering fails, and always returns a `GeneralError` when the
    /// crate was built without the `telemetry` feature or is compiled for tests.
    pub fn collect_hopr_metrics() -> errors::Result<String> {
        cfg_if::cfg_if! {
            if #[cfg(all(feature = "telemetry", not(test)))] {
                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
            } else {
                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
            }
        }
    }
}
991
992impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
995where
996 Chain: HoprChainApi + Clone + Send + Sync + 'static,
997 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
998 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
999 + hopr_api::graph::NetworkGraphUpdate
1000 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1001 + Clone
1002 + Send
1003 + Sync
1004 + 'static,
1005 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1006 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1007 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1008{
1009 fn status(&self) -> HoprState {
1010 self.state.load(Ordering::Relaxed)
1011 }
1012}
1013
1014#[async_trait::async_trait]
1015impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
1016where
1017 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1018 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1019 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1020 + hopr_api::graph::NetworkGraphUpdate
1021 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1022 + Clone
1023 + Send
1024 + Sync
1025 + 'static,
1026 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1027 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1028 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1029{
1030 type Error = HoprLibError;
1031 type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1032
1033 fn me_peer_id(&self) -> PeerId {
1034 (*self.me.public()).into()
1035 }
1036
1037 async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1038 Ok(self
1039 .chain_api
1040 .stream_accounts(AccountSelector {
1041 public_only: true,
1042 ..Default::default()
1043 })
1044 .map_err(HoprLibError::chain)
1045 .await?
1046 .map(|entry| {
1047 (
1048 PeerId::from(entry.public_key),
1049 entry.chain_addr,
1050 entry.get_multiaddrs().to_vec(),
1051 )
1052 })
1053 .collect()
1054 .await)
1055 }
1056
1057 async fn network_health(&self) -> hopr_api::network::Health {
1058 self.transport_api.network_health().await
1059 }
1060
1061 async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1062 Ok(self
1063 .transport_api
1064 .network_connected_peers()
1065 .await?
1066 .into_iter()
1067 .map(PeerId::from)
1068 .collect())
1069 }
1070
1071 fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1072 let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1073 self.transport_api.network_peer_observations(&pubkey)
1074 }
1075
1076 async fn all_network_peers(
1077 &self,
1078 minimum_score: f64,
1079 ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1080 Ok(
1081 futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1082 .filter_map(|(pubkey, info)| async move {
1083 let peer_id = PeerId::from(pubkey);
1084 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1085 Some((address, peer_id, info))
1086 })
1087 .collect::<Vec<_>>()
1088 .await,
1089 )
1090 }
1091
1092 fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1093 self.transport_api.local_multiaddresses()
1094 }
1095
1096 async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1097 self.transport_api.listening_multiaddresses().await
1098 }
1099
1100 async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1101 let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1102 return vec![];
1103 };
1104 self.transport_api.network_observed_multiaddresses(&pubkey).await
1105 }
1106
1107 async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1108 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1109 .await
1110 .map_err(HoprLibError::TransportError)?;
1111
1112 match self
1113 .chain_api
1114 .stream_accounts(AccountSelector {
1115 public_only: false,
1116 offchain_key: Some(pubkey),
1117 ..Default::default()
1118 })
1119 .map_err(HoprLibError::chain)
1120 .await?
1121 .next()
1122 .await
1123 {
1124 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1125 None => {
1126 error!(%peer, "no information");
1127 Ok(vec![])
1128 }
1129 }
1130 }
1131
1132 async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1133 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1134 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1135 .await
1136 .map_err(HoprLibError::TransportError)?;
1137 Ok(self.transport_api.ping(&pubkey).await?)
1138 }
1139}
1140
/// Sink of [`TicketSelector`]s backed by an mpsc `Sender`, whose [`SendError`]s are
/// mapped into [`HoprLibError`] by a plain function pointer.
pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1142
1143impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1144where
1145 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1146 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1147 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1148 + hopr_api::graph::NetworkGraphUpdate
1149 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1150 + Clone
1151 + Send
1152 + Sync
1153 + 'static,
1154 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1155 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1156 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1157{
1158 pub fn me_onchain(&self) -> Address {
1159 *self.chain_api.me()
1160 }
1161
1162 pub fn get_safe_config(&self) -> SafeModuleConfig {
1163 SafeModuleConfig {
1164 safe_address: self.cfg.safe_module.safe_address,
1165 module_address: self.cfg.safe_module.module_address,
1166 }
1167 }
1168
1169 pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1170 self.chain_api
1171 .balance(self.me_onchain())
1172 .await
1173 .map_err(HoprLibError::chain)
1174 }
1175
1176 pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1177 self.chain_api
1178 .balance(self.cfg.safe_module.safe_address)
1179 .await
1180 .map_err(HoprLibError::chain)
1181 }
1182
1183 pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1184 self.chain_api
1185 .safe_allowance(self.cfg.safe_module.safe_address)
1186 .await
1187 .map_err(HoprLibError::chain)
1188 }
1189
1190 pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1191 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1192 }
1193
1194 pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1195 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1196 }
1197
1198 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1199 self.chain_api
1200 .minimum_incoming_ticket_win_prob()
1201 .await
1202 .map_err(HoprLibError::chain)
1203 }
1204
1205 pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1206 self.chain_api
1207 .channel_closure_notice_period()
1208 .await
1209 .map_err(HoprLibError::chain)
1210 }
1211
1212 pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1213 Ok(self
1214 .chain_api
1215 .stream_accounts(AccountSelector {
1216 public_only: true,
1217 ..Default::default()
1218 })
1219 .map_err(HoprLibError::chain)
1220 .await?
1221 .collect()
1222 .await)
1223 }
1224
1225 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1226 let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1227 .await
1228 .map_err(HoprLibError::TransportError)?;
1229
1230 self.chain_api
1231 .packet_key_to_chain_key(&pubkey)
1232 .await
1233 .map_err(HoprLibError::chain)
1234 }
1235
1236 pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1237 self.chain_api
1238 .chain_key_to_packet_key(address)
1239 .await
1240 .map(|pk| pk.map(|v| v.into()))
1241 .map_err(HoprLibError::chain)
1242 }
1243
1244 pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1245 self.chain_api
1246 .channel_by_id(channel_id)
1247 .await
1248 .map_err(HoprLibError::chain)
1249 }
1250
1251 pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1252 self.chain_api
1253 .channel_by_parties(src, dest)
1254 .await
1255 .map_err(HoprLibError::chain)
1256 }
1257
1258 pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1259 Ok(self
1260 .chain_api
1261 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1262 ChannelStatusDiscriminants::Closed,
1263 ChannelStatusDiscriminants::Open,
1264 ChannelStatusDiscriminants::PendingToClose,
1265 ]))
1266 .map_err(HoprLibError::chain)
1267 .await?
1268 .collect()
1269 .await)
1270 }
1271
1272 pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1273 Ok(self
1274 .chain_api
1275 .stream_channels(
1276 ChannelSelector::default()
1277 .with_destination(*dest)
1278 .with_allowed_states(&[
1279 ChannelStatusDiscriminants::Closed,
1280 ChannelStatusDiscriminants::Open,
1281 ChannelStatusDiscriminants::PendingToClose,
1282 ]),
1283 )
1284 .map_err(HoprLibError::chain)
1285 .await?
1286 .collect()
1287 .await)
1288 }
1289
1290 pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1291 Ok(self
1292 .chain_api
1293 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1294 ChannelStatusDiscriminants::Closed,
1295 ChannelStatusDiscriminants::Open,
1296 ChannelStatusDiscriminants::PendingToClose,
1297 ]))
1298 .map_err(HoprLibError::chain)
1299 .await?
1300 .collect()
1301 .await)
1302 }
1303
1304 pub async fn open_channel(
1305 &self,
1306 destination: &Address,
1307 amount: HoprBalance,
1308 ) -> Result<OpenChannelResult, HoprLibError> {
1309 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1310
1311 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1312
1313 let confirm_awaiter = self
1314 .chain_api
1315 .open_channel(destination, amount)
1316 .await
1317 .map_err(HoprLibError::chain)?;
1318
1319 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1320 format!("open channel to {destination} ({channel_id})"),
1321 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1322 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1323 )?;
1324
1325 let tx_hash = confirm_awaiter.await.map_err(|e| {
1326 event_abort.abort();
1327 HoprLibError::chain(e)
1328 })?;
1329
1330 let event = event_awaiter.await?;
1331 debug!(%event, "open channel event received");
1332
1333 Ok(OpenChannelResult { tx_hash, channel_id })
1334 }
1335
1336 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1337 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1338
1339 let channel_id = *channel_id;
1340
1341 let confirm_awaiter = self
1342 .chain_api
1343 .fund_channel(&channel_id, amount)
1344 .await
1345 .map_err(HoprLibError::chain)?;
1346
1347 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1348 format!("fund channel {channel_id}"),
1349 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1350 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1351 )?;
1352
1353 let res = confirm_awaiter.await.map_err(|e| {
1354 event_abort.abort();
1355 HoprLibError::chain(e)
1356 })?;
1357
1358 let event = event_awaiter.await?;
1359 debug!(%event, "fund channel event received");
1360
1361 Ok(res)
1362 }
1363
1364 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1365 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1366
1367 let channel_id = *channel_id;
1368
1369 let confirm_awaiter = self
1370 .chain_api
1371 .close_channel(&channel_id)
1372 .await
1373 .map_err(HoprLibError::chain)?;
1374
1375 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1376 format!("close channel {channel_id}"),
1377 move |event| {
1378 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1379 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1380 },
1381 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1382 )?;
1383
1384 let tx_hash = confirm_awaiter.await.map_err(|e| {
1385 event_abort.abort();
1386 HoprLibError::chain(e)
1387 })?;
1388
1389 let event = event_awaiter.await?;
1390 debug!(%event, "close channel event received");
1391
1392 Ok(CloseChannelResult { tx_hash })
1393 }
1394
1395 pub async fn tickets_in_channel(
1396 &self,
1397 channel_id: &ChannelId,
1398 ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1399 if let Some(channel) = self
1400 .chain_api
1401 .channel_by_id(channel_id)
1402 .await
1403 .map_err(|e| HoprTransportError::Other(e.into()))?
1404 {
1405 if &channel.destination == self.chain_api.me() {
1406 Ok(Some(
1407 self.node_db
1408 .stream_tickets([&channel])
1409 .await
1410 .map_err(HoprLibError::db)?
1411 .collect()
1412 .await,
1413 ))
1414 } else {
1415 Ok(None)
1416 }
1417 } else {
1418 Ok(None)
1419 }
1420 }
1421
1422 pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1423 Ok(self
1424 .node_db
1425 .stream_tickets(None::<TicketSelector>)
1426 .await
1427 .map_err(HoprLibError::db)?
1428 .map(|v| v.ticket)
1429 .collect()
1430 .await)
1431 }
1432
1433 pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1434 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1435 }
1436
1437 pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1438 self.node_db
1439 .reset_ticket_statistics()
1440 .await
1441 .map_err(HoprLibError::chain)
1442 }
1443
1444 pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1445 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1446
1447 let min_value = min_value.into();
1448
1449 self.chain_api
1450 .stream_channels(
1451 ChannelSelector::default()
1452 .with_destination(self.me_onchain())
1453 .with_allowed_states(&[
1454 ChannelStatusDiscriminants::Open,
1455 ChannelStatusDiscriminants::PendingToClose,
1456 ]),
1457 )
1458 .map_err(HoprLibError::chain)
1459 .await?
1460 .map(|channel| {
1461 Ok(TicketSelector::from(&channel)
1462 .with_amount(min_value..)
1463 .with_index_range(channel.ticket_index..)
1464 .with_state(AcknowledgedTicketStatus::Untouched))
1465 })
1466 .forward(self.redemption_requests()?)
1467 .await?;
1468
1469 Ok(())
1470 }
1471
1472 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1473 &self,
1474 counterparty: &Address,
1475 min_value: B,
1476 ) -> Result<(), HoprLibError> {
1477 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1478 .await
1479 }
1480
1481 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1482 &self,
1483 channel_id: &Hash,
1484 min_value: B,
1485 ) -> Result<(), HoprLibError> {
1486 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1487
1488 let channel = self
1489 .chain_api
1490 .channel_by_id(channel_id)
1491 .await
1492 .map_err(HoprLibError::chain)?
1493 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1494
1495 self.redemption_requests()?
1496 .send(
1497 TicketSelector::from(channel)
1498 .with_amount(min_value.into()..)
1499 .with_index_range(channel.ticket_index..)
1500 .with_state(AcknowledgedTicketStatus::Untouched),
1501 )
1502 .await?;
1503
1504 Ok(())
1505 }
1506
1507 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1508 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1509
1510 self.redemption_requests()?
1511 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1512 .await?;
1513
1514 Ok(())
1515 }
1516
1517 pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
1518 self.winning_ticket_subscribers.1.activate_cloned()
1519 }
1520
1521 pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1522 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1523
1524 Ok(self
1525 .redeem_requests
1526 .get()
1527 .cloned()
1528 .expect("redeem_requests is not initialized")
1529 .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1530 }
1531
1532 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1533 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1534
1535 self.chain_api
1536 .withdraw(amount, &recipient)
1537 .and_then(identity)
1538 .map_err(HoprLibError::chain)
1539 .await
1540 }
1541
1542 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1543 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1544
1545 self.chain_api
1546 .withdraw(amount, &recipient)
1547 .and_then(identity)
1548 .map_err(HoprLibError::chain)
1549 .await
1550 }
1551}