1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25pub mod traits;
27pub mod utils;
29
30pub use hopr_api as api;
31
/// Re-exports of the underlying HOPR crates, grouped by domain.
///
/// Hidden from the generated documentation.
#[doc(hidden)]
pub mod exports {
    /// The `chain`, `internal` and `primitive` type modules of `hopr_api`.
    pub mod types {
        pub use hopr_api::types::{chain, internal, primitive};
    }

    /// Cryptography related re-exports: `hopr_api` crypto types (as `types`) and
    /// the `hopr_crypto_keypair` crate (as `keypair`).
    pub mod crypto {
        pub use hopr_api::types::crypto as types;
        pub use hopr_crypto_keypair as keypair;
    }

    /// The `hopr_network_types` crate, re-exported as `types`.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
50
/// Convenience re-exports of commonly used items, built on top of [`exports`].
///
/// Hidden from the generated documentation.
#[doc(hidden)]
pub mod prelude {
    // UDP stream helpers are only re-exported when the `runtime-tokio` feature is enabled.
    #[cfg(feature = "runtime-tokio")]
    pub use super::exports::network::types::{
        prelude::ForeignDataMode,
        udp::{ConnectedUdpStream, UdpStreamParallelism},
    };
    // Key material, transport socket and address types.
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
68
69use std::{
70 convert::identity,
71 future::Future,
72 sync::{Arc, OnceLock, atomic::Ordering},
73 time::Duration,
74};
75
76use futures::{
77 FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
78 channel::mpsc::{SendError, channel},
79 pin_mut,
80 sink::SinkMapErr,
81};
82use futures_time::future::FutureExt as FuturesTimeFutureExt;
83use hopr_api::{
84 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
86 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
87 node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
88};
89pub use hopr_api::{
90 db::ChannelTicketStatistics,
91 graph::EdgeLinkObservable,
92 network::{NetworkBuilder, NetworkStreamControl},
93 node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
94 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_network_types::prelude::*;
100#[cfg(all(feature = "telemetry", not(test)))]
101use hopr_platform::time::native::current_time;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110 config::SafeModule,
111 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112 errors::{HoprLibError, HoprStatusError},
113};
114
/// Hop count of a session path.
///
/// Wraps a `BoundedSize` whose upper bound is `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
/// The default value is `BoundedSize::MIN`.
///
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
127
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Upper bound of the hop count, taken from `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Returns the wrapped hop count as a plain `usize`.
    pub fn hop_count(self) -> usize {
        let Self(hops) = self;
        hops.into()
    }
}
138
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    /// Builds a [`HopRouting`] from a raw hop count.
    ///
    /// # Errors
    ///
    /// Fails when `value` cannot be converted into the underlying bounded size.
    fn try_from(value: usize) -> Result<Self, Self::Error> {
        value.try_into().map(Self).map_err(Into::into)
    }
}
147
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    /// Turns the hop count into the `Hops` routing option.
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
154
/// Client-side configuration of a session.
///
/// Converts into `hopr_transport::SessionClientConfig` via `From`.
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Hop routing of the forward path (becomes `forward_path_options`).
    pub forward_path: HopRouting,
    /// Hop routing of the return path (becomes `return_path_options`).
    pub return_path: HopRouting,
    /// Capabilities of the session; defaults to `SessionCapability::Segmentation` only.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym for the session; defaults to `None`.
    // NOTE(review): presumably a pseudonym is generated downstream when `None` — confirm in `hopr_transport`.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Optional SURB balancer configuration; defaults to `Some(SurbBalancerConfig::default())`.
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// Passed through unchanged to the transport session configuration; defaults to `false`.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
179
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    /// Maps the library-level session configuration onto the transport-level one.
    ///
    /// The two path routings are converted into routing options; all other fields
    /// are carried over unchanged.
    fn from(value: HoprSessionClientConfig) -> Self {
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
193
/// Identifiers of the long-running background processes tracked by the node.
///
/// The `Display` output of each variant is a human-readable description used in logs.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
pub enum HoprLibProcess {
    /// A process owned by the transport layer.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Task serving incoming sessions.
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// Task draining the ticket redemption request queue.
    #[strum(to_string = "ticket redemption queue driver")]
    TicketRedemptions,
    /// Task reacting to on-chain channel events.
    #[strum(to_string = "subscription for on-chain channel updates")]
    ChannelEvents,
    /// Task processing winning and rejected ticket events.
    #[strum(to_string = "on received ticket event (winning or rejected)")]
    TicketEvents,
}
208
// Process-wide metrics; only compiled with the `telemetry` feature and outside of tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Gauge holding the process start time.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Gauge labelled with the running library version.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Gauge labelled with the node's peer id, chain address, safe address and module address.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
226
/// Initializes the CPU thread pool and builds a multi-threaded Tokio runtime.
///
/// # Arguments
///
/// * `num_cpu_threads` - number of threads of the CPU-bound thread pool.
/// * `num_io_threads` - number of Tokio worker threads.
///
/// When an argument is `None`, half of the available parallelism is used instead.
/// Both values are clamped to at least 1.
///
/// The thread stack size is read from the `HOPRD_THREAD_STACK_SIZE` environment variable
/// (in bytes). It defaults to 10 MiB when unset or unparsable and is never lower than 2 MiB.
///
/// # Errors
///
/// Fails when a thread count is not given and the available parallelism cannot be
/// determined, when the CPU thread pool cannot be initialized, or when the runtime
/// cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    use std::str::FromStr;

    /// Stack size used when `HOPRD_THREAD_STACK_SIZE` is unset or invalid.
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    /// Lower bound enforced on the configured stack size.
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // The error is built lazily so that the success path does not pay for it.
    let cpu_threads = num_cpu_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| usize::from_str(&v).ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(stack_size)
        .build()?)
}
272
/// Socket type returned by `Hopr::run`: a `HoprSocket` built over a receiver of
/// incoming application data and a sender of routed outgoing application data.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
278
/// Broadcast channel pair for newly received winning tickets: the sender and an
/// inactive receiver kept alongside it.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
283
/// Timeout passed to `await_key_binding` while waiting for this node's key binding
/// during startup.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);

/// Timeout for waiting on an on-chain event.
// NOTE(review): not referenced in the visible part of this file; presumably passed to
// `spawn_wait_for_on_chain_event` by callers further down — confirm.
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
290
/// The HOPR node: ties together the chain API (`Chain`), the node database (`Db`),
/// the network graph (`Graph`) and the network implementation (`Net`).
///
/// Created with `Hopr::new`, started with `Hopr::run` and stopped with `Hopr::shutdown`.
pub struct Hopr<Chain, Db, Graph, Net>
where
    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
{
    /// Off-chain key pair of this node.
    me: OffchainKeypair,
    /// Validated library configuration.
    cfg: config::HoprLibConfig,
    /// Current lifecycle state of the node.
    state: Arc<api::node::state::AtomicHoprState>,
    /// Transport layer of the node.
    transport_api: HoprTransport<Chain, Db, Graph, Net>,
    /// Sender of ticket redemption requests; set once the redemption service is started in `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Node database handle.
    node_db: Db,
    /// On-chain API handle.
    chain_api: Chain,
    /// Broadcast channel on which new winning tickets are published.
    winning_ticket_subscribers: NewTicketEvents,
    /// Background processes started by `run`; aborted in `shutdown`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
327
328impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
329where
330 Chain: HoprChainApi + Clone + Send + Sync + 'static,
331 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
332 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
333 + hopr_api::graph::NetworkGraphUpdate
334 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
335 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
336 + Clone
337 + Send
338 + Sync
339 + 'static,
340 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
341 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
342 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
343 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
344{
345 pub async fn new(
346 identity: (&ChainKeypair, &OffchainKeypair),
347 hopr_chain_api: Chain,
348 hopr_node_db: Db,
349 graph: Graph,
350 cfg: config::HoprLibConfig,
351 ) -> errors::Result<Self> {
352 if hopr_api::types::crypto_random::is_rng_fixed() {
353 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
354 }
355
356 cfg.validate()?;
357
358 let hopr_transport_api = HoprTransport::new(
359 identity,
360 hopr_chain_api.clone(),
361 hopr_node_db.clone(),
362 graph,
363 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
364 cfg.protocol,
365 )
366 .map_err(HoprLibError::TransportError)?;
367
368 #[cfg(all(feature = "telemetry", not(test)))]
369 {
370 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
371 METRIC_HOPR_LIB_VERSION.set(
372 &[const_format::formatcp!("{}", constants::APP_VERSION)],
373 const_format::formatcp!(
374 "{}.{}",
375 env!("CARGO_PKG_VERSION_MAJOR"),
376 env!("CARGO_PKG_VERSION_MINOR")
377 )
378 .parse()
379 .unwrap_or(0.0),
380 );
381
382 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
384 error!(%error, "failed to initialize ticket statistics metrics");
385 }
386 }
387
388 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
389 new_tickets_tx.set_await_active(false);
390 new_tickets_tx.set_overflow(true);
391
392 Ok(Self {
393 me: identity.1.clone(),
394 cfg,
395 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
396 transport_api: hopr_transport_api,
397 chain_api: hopr_chain_api,
398 node_db: hopr_node_db,
399 redeem_requests: OnceLock::new(),
400 processes: OnceLock::new(),
401 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
402 })
403 }
404
405 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
406 if HoprNodeOperations::status(self) == state {
407 Ok(())
408 } else {
409 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
410 }
411 }
412
    /// Returns the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
416
    /// Returns the network graph held by the transport layer.
    pub fn graph(&self) -> &Graph {
        self.transport_api.graph()
    }
421
    /// Returns the `publish` flag of the configuration, i.e. whether the node
    /// announces its multiaddresses on chain.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
426
    /// Starts the node and returns the transport socket for application data.
    ///
    /// The startup goes through these states in order: `WaitingForFunds`, `CheckingBalance`,
    /// `ValidatingNetworkConfig`, `CheckingOnchainAddress`, `RegisteringSafe`, `AnnouncingNode`,
    /// `AwaitingKeyBinding`, `InitializingServices` and finally `Running`.
    ///
    /// While initializing services, the following background processes are started and
    /// registered for later abortion: the session server (only with the `session-server`
    /// feature), the ticket event processor, the transport processes, the ticket redemption
    /// service and the channel event subscription.
    ///
    /// # Arguments
    ///
    /// * `cover_traffic` - probing and cover traffic generator handed to the transport.
    /// * `network_builder` - builder of the network implementation handed to the transport.
    /// * `serve_handler` - handler of incoming sessions (only with the `session-server` feature).
    ///
    /// # Errors
    ///
    /// Fails when the node is not in the `Uninitialized` state, when the wallet is not
    /// sufficiently funded, when the configured ticket price or winning probability is below
    /// the network minimum, when the safe address equals the node address, when the safe
    /// registration, node announcement or key binding fails or mismatches, or when any chain,
    /// database or transport operation performed during startup fails.
    pub async fn run<
        Ct,
        NetBuilder,
        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
    >(
        &self,
        cover_traffic: Ct,
        network_builder: NetBuilder,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<HoprTransportIO>
    where
        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
    {
        // The node can only be started from the `Uninitialized` state.
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "cannot start the hopr node multiple times".into(),
        )?;

        #[cfg(feature = "testing")]
        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");

        let me_onchain = *self.chain_api.me();
        info!(
            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "node is not started, please fund this node",
        );

        // NOTE(review): `wait_for_funds` presumably blocks until the minimum balance is
        // available or the 200 s limit elapses — confirm in `helpers`.
        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
        helpers::wait_for_funds(
            *MIN_NATIVE_BALANCE,
            *SUGGESTED_NATIVE_BALANCE,
            Duration::from_secs(200),
            me_onchain,
            &self.chain_api,
        )
        .await?;

        // Collects abort handles of every background process started below.
        let mut processes = AbortableList::<HoprLibProcess>::default();

        info!("starting HOPR node...");
        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);

        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %me_onchain,
            %balance,
            %minimum_balance,
            "node information"
        );

        // The balance must be strictly greater than the minimum.
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "cannot start the node without a sufficiently funded wallet".into(),
            ));
        }

        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);

        // A configured outgoing ticket price must not be lower than the network minimum.
        let network_min_ticket_price = self
            .chain_api
            .minimum_ticket_price()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            )));
        }
        // A configured outgoing winning probability must not be lower than the network minimum.
        // This check is skipped when the `HOPR_TEST_DISABLE_CHECKS` environment variable is "true".
        let network_min_win_prob = self
            .chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            )));
        }

        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);

        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");

        let safe_addr = self.cfg.safe_module.safe_address;

        // The node's own chain address cannot serve as its safe.
        if self.me_onchain() == safe_addr {
            return Err(HoprLibError::GeneralError(
                "cannot use self as staking safe address".into(),
            ));
        }

        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
        info!(%safe_addr, "registering safe with this node");
        match self.chain_api.register_safe(&safe_addr).await {
            Ok(awaiter) => {
                // The registration was submitted; wait for its confirmation.
                awaiter.await.map_err(|error| {
                    error!(%safe_addr, %error, "safe registration failed with error");
                    HoprLibError::chain(error)
                })?;
                info!(%safe_addr, "safe successfully registered with this node");
            }
            // Already registered with the very same safe: nothing to do.
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
                info!(%safe_addr, "this safe is already registered with this node");
            }
            // Registered with another safe: the node must not start.
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
            }
            Err(error) => {
                error!(%safe_addr, %error, "safe registration failed");
                return Err(HoprLibError::chain(error));
            }
        }

        // Only public nodes announce multiaddresses; others announce an empty list.
        let multiaddresses_to_announce = if self.is_public() {
            self.transport_api.announceable_multiaddresses()
        } else {
            Vec::with_capacity(0)
        };

        // Warn about every address that `is_public_address` does not consider public.
        multiaddresses_to_announce
            .iter()
            .filter(|a| !is_public_address(a))
            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));

        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);

        // The wait for the key binding is spawned before the announcement is sent and
        // awaited only after the announcement has been handled.
        let chain_api = self.chain_api.clone();
        let me_offchain = *self.me.public();
        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });

        info!(?multiaddresses_to_announce, "announcing node on chain");
        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
            Ok(awaiter) => {
                // The announcement was submitted; wait for its confirmation.
                awaiter.await.map_err(|error| {
                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
                    HoprLibError::chain(error)
                })?;
                info!(?multiaddresses_to_announce, "node has been successfully announced");
            }
            Err(AnnouncementError::AlreadyAnnounced) => {
                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
            }
            Err(error) => {
                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
                return Err(HoprLibError::chain(error));
            }
        }

        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);

        // The first `map_err` covers a failure of the spawned task itself, the second
        // one a failure reported by the chain API.
        let this_node_account = node_ready
            .await
            .map_err(HoprLibError::other)?
            .map_err(HoprLibError::chain)?;
        // The bound account must carry this node's chain address and the configured safe.
        if this_node_account.chain_addr != self.me_onchain()
            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
        {
            error!(%this_node_account, "account bound to offchain key does not match this node");
            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
        }

        info!(%this_node_account, "node account is ready");

        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);

        info!("initializing session infrastructure");
        // Capacity of the incoming session channel: taken from the environment when it
        // parses to a positive number, 256 otherwise.
        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(256);

        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
        #[cfg(feature = "session-server")]
        {
            debug!(capacity = incoming_session_channel_capacity, "creating session server");
            // Incoming sessions are processed concurrently without a concurrency limit.
            processes.insert(
                HoprLibProcess::SessionServer,
                hopr_async_runtime::spawn_as_abortable!(
                    _session_rx
                        .for_each_concurrent(None, move |session| {
                            let serve_handler = serve_handler.clone();
                            async move {
                                let session_id = *session.session.id();
                                match serve_handler.process(session).await {
                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
                                    Err(error) => error!(
                                        ?session_id,
                                        %error,
                                        "client session processing failed"
                                    ),
                                }
                            }
                        })
                        .inspect(|_| tracing::warn!(
                            task = %HoprLibProcess::SessionServer,
                            "long-running background task finished"
                        ))
                ),
            );
        }

        info!("starting ticket events processor");
        let (tickets_tx, tickets_rx) = channel(8192);
        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
        let node_db = self.node_db.clone();
        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
        // Winning tickets are stored in the database and forwarded to the broadcast below
        // (even when the insertion failed); rejected tickets are only marked in the database.
        spawn(
            tickets_rx
                .filter_map(move |ticket_event| {
                    let node_db = node_db.clone();
                    async move {
                        match ticket_event {
                            TicketEvent::WinningTicket(winning) => {
                                if let Err(error) = node_db.insert_ticket(*winning).await {
                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
                                } else {
                                    tracing::debug!(%winning, "inserted ticket into database");
                                }
                                Some(winning)
                            }
                            TicketEvent::RejectedTicket(rejected, issuer) => {
                                if let Some(issuer) = &issuer {
                                    if let Err(error) =
                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
                                    {
                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
                                    } else {
                                        tracing::debug!(%rejected, "marked ticket as rejected");
                                    }
                                } else {
                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
                                }
                                None
                            }
                        }
                    }
                })
                .for_each(move |ticket| {
                    // Non-blocking broadcast; a failure is only logged.
                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
                    }
                    futures::future::ready(())
                })
                .inspect(|_| {
                    tracing::warn!(
                        task = %HoprLibProcess::TicketEvents,
                        "long-running background task finished"
                    )
                }),
        );

        info!("starting transport");
        let (hopr_socket, transport_processes) = self
            .transport_api
            .run(cover_traffic, network_builder, tickets_tx, session_tx)
            .await?;
        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

        info!("starting ticket redemption service");
        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
        // The result of `set` is ignored: an already stored sender is kept as is.
        let _ = self.redeem_requests.set(redemption_req_tx);
        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        // Redemption requests are handled one at a time, in the order they arrive.
        spawn(
            redemption_req_rx
                .for_each(move |selector| {
                    let chain = chain.clone();
                    let db = node_db.clone();
                    async move {
                        match chain.redeem_tickets_via_selectors(&db, [selector]).await {
                            Ok(res) => debug!(%res, "redemption complete"),
                            Err(error) => error!(%error, "redemption failed"),
                        }
                    }
                })
                .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
        );

        info!("subscribing to channel events");
        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        let events = chain.subscribe().map_err(HoprLibError::chain)?;
        // Only channel-closed events are of interest here.
        spawn(
            futures::stream::Abortable::new(
                events
                    .filter_map(move |event|
                        futures::future::ready(event.try_as_channel_closed())
                    ),
                chain_events_sub_reg
            )
            .for_each(move |closed_channel| {
                let node_db = node_db.clone();
                let chain = chain.clone();
                async move {
                    match closed_channel.direction(chain.me()) {
                        // Incoming channel closed: its tickets are marked as neglected.
                        Some(ChannelDirection::Incoming) => {
                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
                                Ok(num_neglected) if num_neglected > 0 => {
                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
                                },
                                Ok(_) => {
                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
                                },
                                Err(error) => {
                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
                                }
                            }
                        },
                        // Outgoing channel closed: its outgoing ticket index is removed.
                        Some(ChannelDirection::Outgoing) => {
                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
                            } else {
                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
                            }
                        }
                        // No direction relative to this node: the event is ignored.
                        _ => {}
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
        );

        info!("synchronizing ticket states");
        // Walk over all channels whose destination is this node.
        let mut channels = self
            .chain_api
            .stream_channels(ChannelSelector {
                destination: self.me_onchain().into(),
                ..Default::default()
            })
            .map_err(HoprLibError::chain)
            .await?;

        while let Some(channel) = channels.next().await {
            // Tickets stuck in `BeingRedeemed` with an index at or above the channel's
            // ticket index are reset to `Untouched`.
            self.node_db
                .update_ticket_states_and_fetch(
                    [TicketSelector::from(&channel)
                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
                        .with_index_range(channel.ticket_index..)],
                    AcknowledgedTicketStatus::Untouched,
                )
                .map_err(HoprLibError::db)
                .await?
                .for_each(|ticket| {
                    info!(%ticket, "fixed next out-of-sync ticket");
                    futures::future::ready(())
                })
                .await;

            // Tickets with an index below the channel's ticket index are marked as neglected.
            self.node_db
                .mark_tickets_as(
                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
                    TicketMarker::Neglected,
                )
                .map_err(HoprLibError::db)
                .await?;
        }

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "telemetry", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &me_onchain.to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        // Keep the process handles so that `shutdown` can abort them.
        let _ = self.processes.set(processes);
        Ok(hopr_socket)
    }
843
844 pub fn shutdown(&self) -> Result<(), HoprLibError> {
852 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
853 if let Some(processes) = self.processes.get() {
854 processes.abort_all();
855 }
856 self.state.store(HoprState::Terminated, Ordering::Relaxed);
857 info!("NODE SHUTDOWN COMPLETE");
858 Ok(())
859 }
860
861 #[cfg(feature = "session-client")]
864 pub async fn connect_to(
865 &self,
866 destination: Address,
867 target: SessionTarget,
868 cfg: HoprSessionClientConfig,
869 ) -> errors::Result<HoprSession> {
870 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
871
872 let backoff = backon::ConstantBuilder::default()
873 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
874 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
875 .with_jitter();
876
877 use backon::Retryable;
878
879 Ok((|| {
880 let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
881 let target = target.clone();
882 async { self.transport_api.new_session(destination, target, cfg).await }
883 })
884 .retry(backoff)
885 .sleep(backon::FuturesTimerSleeper)
886 .await?)
887 }
888
889 #[cfg(feature = "session-client")]
892 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
893 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
894 Ok(self.transport_api.probe_session(id).await?)
895 }
896
897 #[cfg(feature = "session-client")]
898 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
899 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
900 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
901 }
902
903 #[cfg(feature = "session-client")]
904 pub async fn update_session_surb_balancer_config(
905 &self,
906 id: &SessionId,
907 cfg: SurbBalancerConfig,
908 ) -> errors::Result<()> {
909 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
910 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
911 }
912
913 fn spawn_wait_for_on_chain_event(
916 &self,
917 context: impl std::fmt::Display,
918 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
919 timeout: Duration,
920 ) -> errors::Result<(
921 impl Future<Output = errors::Result<ChainEvent>>,
922 hopr_async_runtime::AbortHandle,
923 )> {
924 debug!(%context, "registering wait for on-chain event");
925 let (event_stream, handle) = futures::stream::abortable(
926 self.chain_api
927 .subscribe()
928 .map_err(HoprLibError::chain)?
929 .skip_while(move |event| futures::future::ready(!predicate(event))),
930 );
931
932 let ctx = context.to_string();
933
934 Ok((
935 spawn(async move {
936 pin_mut!(event_stream);
937 let res = event_stream
938 .next()
939 .timeout(futures_time::time::Duration::from(timeout))
940 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
941 .await?
942 .ok_or(HoprLibError::GeneralError(format!(
943 "failed to yield an on-chain event for {ctx}"
944 )));
945 debug!(%ctx, ?res, "on-chain event waiting done");
946 res
947 })
948 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
949 .and_then(futures::future::ready),
950 handle,
951 ))
952 }
953}
954
955impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
956where
957 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
958 + hopr_api::graph::NetworkGraphUpdate
959 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
960 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
961 + Clone
962 + Send
963 + Sync
964 + 'static,
965 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
966 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
967 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
968 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
969{
970 pub fn collect_hopr_metrics() -> errors::Result<String> {
973 cfg_if::cfg_if! {
974 if #[cfg(all(feature = "telemetry", not(test)))] {
975 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
976 } else {
977 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
978 }
979 }
980 }
981}
982
983impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
986where
987 Chain: HoprChainApi + Clone + Send + Sync + 'static,
988 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
989 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
990 + hopr_api::graph::NetworkGraphUpdate
991 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
992 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
993 + Clone
994 + Send
995 + Sync
996 + 'static,
997 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
998 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
999 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1000 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1001{
    /// Returns the current lifecycle state of the node (relaxed atomic load).
    fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
1005}
1006
1007#[async_trait::async_trait]
1008impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
1009where
1010 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1011 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1012 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1013 + hopr_api::graph::NetworkGraphUpdate
1014 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1015 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1016 + Clone
1017 + Send
1018 + Sync
1019 + 'static,
1020 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1021 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1022 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1023 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1024{
1025 type Error = HoprLibError;
1026 type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1027
1028 fn me_peer_id(&self) -> PeerId {
1029 (*self.me.public()).into()
1030 }
1031
1032 async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1033 Ok(self
1034 .chain_api
1035 .stream_accounts(AccountSelector {
1036 public_only: true,
1037 ..Default::default()
1038 })
1039 .map_err(HoprLibError::chain)
1040 .await?
1041 .map(|entry| {
1042 (
1043 PeerId::from(entry.public_key),
1044 entry.chain_addr,
1045 entry.get_multiaddrs().to_vec(),
1046 )
1047 })
1048 .collect()
1049 .await)
1050 }
1051
    /// Returns the network health as reported by the transport API.
    async fn network_health(&self) -> hopr_api::network::Health {
        self.transport_api.network_health().await
    }
1055
1056 async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1057 Ok(self
1058 .transport_api
1059 .network_connected_peers()
1060 .await?
1061 .into_iter()
1062 .map(PeerId::from)
1063 .collect())
1064 }
1065
1066 fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1067 let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1068 self.transport_api.network_peer_observations(&pubkey)
1069 }
1070
1071 async fn all_network_peers(
1072 &self,
1073 minimum_score: f64,
1074 ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1075 Ok(
1076 futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1077 .filter_map(|(pubkey, info)| async move {
1078 let peer_id = PeerId::from(pubkey);
1079 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1080 Some((address, peer_id, info))
1081 })
1082 .collect::<Vec<_>>()
1083 .await,
1084 )
1085 }
1086
    /// Returns the local multiaddresses as reported by the transport API.
    fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.local_multiaddresses()
    }
1090
    /// Returns the listening multiaddresses as reported by the transport API.
    async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.listening_multiaddresses().await
    }
1094
1095 async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1096 let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1097 return vec![];
1098 };
1099 self.transport_api.network_observed_multiaddresses(&pubkey).await
1100 }
1101
1102 async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1103 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1104 .await
1105 .map_err(HoprLibError::TransportError)?;
1106
1107 match self
1108 .chain_api
1109 .stream_accounts(AccountSelector {
1110 public_only: false,
1111 offchain_key: Some(pubkey),
1112 ..Default::default()
1113 })
1114 .map_err(HoprLibError::chain)
1115 .await?
1116 .next()
1117 .await
1118 {
1119 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1120 None => {
1121 error!(%peer, "no information");
1122 Ok(vec![])
1123 }
1124 }
1125 }
1126
1127 async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1128 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1129 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1130 .await
1131 .map_err(HoprLibError::TransportError)?;
1132 Ok(self.transport_api.ping(&pubkey).await?)
1133 }
1134}
1135
/// Sink type returned by [`Hopr::redemption_requests`]: an mpsc sender of [`TicketSelector`]s
/// whose [`SendError`] is mapped into a [`HoprLibError`].
pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1137
1138impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1139where
1140 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1141 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1142 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1143 + hopr_api::graph::NetworkGraphUpdate
1144 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1145 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1146 + Clone
1147 + Send
1148 + Sync
1149 + 'static,
1150 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1151 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1152 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1153 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1154{
    /// Returns a copy of this node's on-chain [`Address`] as reported by the chain API.
    pub fn me_onchain(&self) -> Address {
        *self.chain_api.me()
    }
1158
1159 pub fn get_safe_config(&self) -> SafeModuleConfig {
1160 SafeModuleConfig {
1161 safe_address: self.cfg.safe_module.safe_address,
1162 module_address: self.cfg.safe_module.module_address,
1163 }
1164 }
1165
1166 pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1167 self.chain_api
1168 .balance(self.me_onchain())
1169 .await
1170 .map_err(HoprLibError::chain)
1171 }
1172
1173 pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1174 self.chain_api
1175 .balance(self.cfg.safe_module.safe_address)
1176 .await
1177 .map_err(HoprLibError::chain)
1178 }
1179
1180 pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1181 self.chain_api
1182 .safe_allowance(self.cfg.safe_module.safe_address)
1183 .await
1184 .map_err(HoprLibError::chain)
1185 }
1186
1187 pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1188 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1189 }
1190
1191 pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1192 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1193 }
1194
1195 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1196 self.chain_api
1197 .minimum_incoming_ticket_win_prob()
1198 .await
1199 .map_err(HoprLibError::chain)
1200 }
1201
1202 pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1203 self.chain_api
1204 .channel_closure_notice_period()
1205 .await
1206 .map_err(HoprLibError::chain)
1207 }
1208
1209 pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1210 Ok(self
1211 .chain_api
1212 .stream_accounts(AccountSelector {
1213 public_only: true,
1214 ..Default::default()
1215 })
1216 .map_err(HoprLibError::chain)
1217 .await?
1218 .collect()
1219 .await)
1220 }
1221
1222 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1223 let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1224 .await
1225 .map_err(HoprLibError::TransportError)?;
1226
1227 self.chain_api
1228 .packet_key_to_chain_key(&pubkey)
1229 .await
1230 .map_err(HoprLibError::chain)
1231 }
1232
1233 pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1234 self.chain_api
1235 .chain_key_to_packet_key(address)
1236 .await
1237 .map(|pk| pk.map(|v| v.into()))
1238 .map_err(HoprLibError::chain)
1239 }
1240
1241 pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1242 self.chain_api
1243 .channel_by_id(channel_id)
1244 .await
1245 .map_err(HoprLibError::chain)
1246 }
1247
1248 pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1249 self.chain_api
1250 .channel_by_parties(src, dest)
1251 .await
1252 .map_err(HoprLibError::chain)
1253 }
1254
1255 pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1256 Ok(self
1257 .chain_api
1258 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1259 ChannelStatusDiscriminants::Closed,
1260 ChannelStatusDiscriminants::Open,
1261 ChannelStatusDiscriminants::PendingToClose,
1262 ]))
1263 .map_err(HoprLibError::chain)
1264 .await?
1265 .collect()
1266 .await)
1267 }
1268
1269 pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1270 Ok(self
1271 .chain_api
1272 .stream_channels(
1273 ChannelSelector::default()
1274 .with_destination(*dest)
1275 .with_allowed_states(&[
1276 ChannelStatusDiscriminants::Closed,
1277 ChannelStatusDiscriminants::Open,
1278 ChannelStatusDiscriminants::PendingToClose,
1279 ]),
1280 )
1281 .map_err(HoprLibError::chain)
1282 .await?
1283 .collect()
1284 .await)
1285 }
1286
1287 pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1288 Ok(self
1289 .chain_api
1290 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1291 ChannelStatusDiscriminants::Closed,
1292 ChannelStatusDiscriminants::Open,
1293 ChannelStatusDiscriminants::PendingToClose,
1294 ]))
1295 .map_err(HoprLibError::chain)
1296 .await?
1297 .collect()
1298 .await)
1299 }
1300
1301 pub async fn open_channel(
1302 &self,
1303 destination: &Address,
1304 amount: HoprBalance,
1305 ) -> Result<OpenChannelResult, HoprLibError> {
1306 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1307
1308 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1309
1310 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1313 format!("open channel to {destination} ({channel_id})"),
1314 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1315 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1316 )?;
1317
1318 let confirm_awaiter = self
1319 .chain_api
1320 .open_channel(destination, amount)
1321 .await
1322 .map_err(HoprLibError::chain)?;
1323
1324 let tx_hash = confirm_awaiter.await.map_err(|e| {
1325 event_abort.abort();
1326 HoprLibError::chain(e)
1327 })?;
1328
1329 let event = event_awaiter.await?;
1330 debug!(%event, "open channel event received");
1331
1332 Ok(OpenChannelResult { tx_hash, channel_id })
1333 }
1334
1335 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1336 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1337
1338 let channel_id = *channel_id;
1339
1340 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1343 format!("fund channel {channel_id}"),
1344 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1345 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1346 )?;
1347
1348 let confirm_awaiter = self
1349 .chain_api
1350 .fund_channel(&channel_id, amount)
1351 .await
1352 .map_err(HoprLibError::chain)?;
1353
1354 let res = confirm_awaiter.await.map_err(|e| {
1355 event_abort.abort();
1356 HoprLibError::chain(e)
1357 })?;
1358
1359 let event = event_awaiter.await?;
1360 debug!(%event, "fund channel event received");
1361
1362 Ok(res)
1363 }
1364
1365 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1366 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1367
1368 let channel_id = *channel_id;
1369
1370 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1373 format!("close channel {channel_id}"),
1374 move |event| {
1375 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1376 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1377 },
1378 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1379 )?;
1380
1381 let confirm_awaiter = self
1382 .chain_api
1383 .close_channel(&channel_id)
1384 .await
1385 .map_err(HoprLibError::chain)?;
1386
1387 let tx_hash = confirm_awaiter.await.map_err(|e| {
1388 event_abort.abort();
1389 HoprLibError::chain(e)
1390 })?;
1391
1392 let event = event_awaiter.await?;
1393 debug!(%event, "close channel event received");
1394
1395 Ok(CloseChannelResult { tx_hash })
1396 }
1397
1398 pub async fn tickets_in_channel(
1399 &self,
1400 channel_id: &ChannelId,
1401 ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1402 if let Some(channel) = self
1403 .chain_api
1404 .channel_by_id(channel_id)
1405 .await
1406 .map_err(|e| HoprTransportError::Other(e.into()))?
1407 {
1408 if &channel.destination == self.chain_api.me() {
1409 Ok(Some(
1410 self.node_db
1411 .stream_tickets([&channel])
1412 .await
1413 .map_err(HoprLibError::db)?
1414 .collect()
1415 .await,
1416 ))
1417 } else {
1418 Ok(None)
1419 }
1420 } else {
1421 Ok(None)
1422 }
1423 }
1424
1425 pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1426 Ok(self
1427 .node_db
1428 .stream_tickets(None::<TicketSelector>)
1429 .await
1430 .map_err(HoprLibError::db)?
1431 .map(|v| v.ticket)
1432 .collect()
1433 .await)
1434 }
1435
1436 pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1437 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1438 }
1439
1440 pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1441 self.node_db
1442 .reset_ticket_statistics()
1443 .await
1444 .map_err(HoprLibError::chain)
1445 }
1446
1447 pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1448 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1449
1450 let min_value = min_value.into();
1451
1452 self.chain_api
1453 .stream_channels(
1454 ChannelSelector::default()
1455 .with_destination(self.me_onchain())
1456 .with_allowed_states(&[
1457 ChannelStatusDiscriminants::Open,
1458 ChannelStatusDiscriminants::PendingToClose,
1459 ]),
1460 )
1461 .map_err(HoprLibError::chain)
1462 .await?
1463 .map(|channel| {
1464 Ok(TicketSelector::from(&channel)
1465 .with_amount(min_value..)
1466 .with_index_range(channel.ticket_index..)
1467 .with_state(AcknowledgedTicketStatus::Untouched))
1468 })
1469 .forward(self.redemption_requests()?)
1470 .await?;
1471
1472 Ok(())
1473 }
1474
1475 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1476 &self,
1477 counterparty: &Address,
1478 min_value: B,
1479 ) -> Result<(), HoprLibError> {
1480 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1481 .await
1482 }
1483
1484 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1485 &self,
1486 channel_id: &Hash,
1487 min_value: B,
1488 ) -> Result<(), HoprLibError> {
1489 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1490
1491 let channel = self
1492 .chain_api
1493 .channel_by_id(channel_id)
1494 .await
1495 .map_err(HoprLibError::chain)?
1496 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1497
1498 self.redemption_requests()?
1499 .send(
1500 TicketSelector::from(channel)
1501 .with_amount(min_value.into()..)
1502 .with_index_range(channel.ticket_index..)
1503 .with_state(AcknowledgedTicketStatus::Untouched),
1504 )
1505 .await?;
1506
1507 Ok(())
1508 }
1509
1510 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1511 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1512
1513 self.redemption_requests()?
1514 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1515 .await?;
1516
1517 Ok(())
1518 }
1519
    /// Returns a new stream of [`VerifiedTicket`]s, obtained by calling `activate_cloned()` on
    /// the second element of `self.winning_ticket_subscribers`.
    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
        self.winning_ticket_subscribers.1.activate_cloned()
    }
1523
1524 pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1525 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1526
1527 Ok(self
1528 .redeem_requests
1529 .get()
1530 .cloned()
1531 .expect("redeem_requests is not initialized")
1532 .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1533 }
1534
1535 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1536 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1537
1538 self.chain_api
1539 .withdraw(amount, &recipient)
1540 .and_then(identity)
1541 .map_err(HoprLibError::chain)
1542 .await
1543 }
1544
1545 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1546 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1547
1548 self.chain_api
1549 .withdraw(amount, &recipient)
1550 .and_then(identity)
1551 .map_err(HoprLibError::chain)
1552 .await
1553 }
1554}