1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25pub mod traits;
27pub mod utils;
29
30pub use hopr_api as api;
31
/// Re-exports of the underlying crates and type namespaces, grouped by concern.
///
/// The module is excluded from the rendered documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub mod exports {
    /// The `chain`, `internal` and `primitive` type namespaces of `hopr_api::types`.
    pub mod types {
        pub use hopr_api::types::{chain, internal, primitive};
    }

    /// Cryptographic types (`types`) and key pair handling (`keypair`).
    pub mod crypto {
        pub use hopr_api::types::crypto as types;
        pub use hopr_crypto_keypair as keypair;
    }

    /// The `hopr_network_types` crate, exposed as `types`.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
50
/// Shorthand re-exports of selected items, all routed through [`exports`].
///
/// The module is excluded from the rendered documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub mod prelude {
    // The UDP stream helpers are only re-exported when the `runtime-tokio` feature is enabled.
    #[cfg(feature = "runtime-tokio")]
    pub use super::exports::network::types::{
        prelude::ForeignDataMode,
        udp::{ConnectedUdpStream, UdpStreamParallelism},
    };
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
68
69use std::{
70 convert::identity,
71 future::Future,
72 sync::{Arc, OnceLock, atomic::Ordering},
73 time::Duration,
74};
75
76use futures::{
77 FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
78 channel::mpsc::{SendError, channel},
79 pin_mut,
80 sink::SinkMapErr,
81};
82use futures_time::future::FutureExt as FuturesTimeFutureExt;
83use hopr_api::{
84 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
85 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
86 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
87 node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
88};
89pub use hopr_api::{
90 db::ChannelTicketStatistics,
91 graph::EdgeLinkObservable,
92 network::{NetworkBuilder, NetworkStreamControl},
93 node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
94 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_network_types::prelude::*;
100#[cfg(all(feature = "telemetry", not(test)))]
101use hopr_platform::time::native::current_time;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110 config::SafeModule,
111 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112 errors::{HoprLibError, HoprStatusError},
113};
114
/// Hop count used for routing, bounded by `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
///
/// This is a newtype over `BoundedSize`; the default value is `BoundedSize::MIN`.
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, smart_default::SmartDefault)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HopRouting(
    #[default(hopr_api::types::primitive::bounded::BoundedSize::MIN)]
    hopr_api::types::primitive::bounded::BoundedSize<
        { hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS },
    >,
);
127
#[cfg(feature = "session-client")]
impl HopRouting {
    /// Upper bound of the wrapped hop count; mirrors `RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    pub const MAX_HOPS: usize = hopr_api::types::internal::routing::RoutingOptions::MAX_INTERMEDIATE_HOPS;

    /// Returns the wrapped hop count as a plain `usize`.
    pub fn hop_count(self) -> usize {
        let Self(hops) = self;
        usize::from(hops)
    }
}
138
#[cfg(feature = "session-client")]
impl TryFrom<usize> for HopRouting {
    type Error = hopr_api::types::primitive::errors::GeneralError;

    /// Builds a [`HopRouting`] from a raw hop count.
    ///
    /// # Errors
    ///
    /// Propagates the error of the underlying bounded-size conversion when `value`
    /// is not accepted by it.
    fn try_from(value: usize) -> Result<Self, Self::Error> {
        let hops = value.try_into()?;
        Ok(Self(hops))
    }
}
147
#[cfg(feature = "session-client")]
impl From<HopRouting> for hopr_api::types::internal::routing::RoutingOptions {
    /// Wraps the hop count into the `RoutingOptions::Hops` variant.
    fn from(value: HopRouting) -> Self {
        let HopRouting(hops) = value;
        Self::Hops(hops)
    }
}
154
/// Client-side configuration of a session opened via `Hopr::connect_to`.
///
/// It converts into `hopr_transport::SessionClientConfig`.
/// Only available with the `session-client` feature.
#[cfg(feature = "session-client")]
#[derive(Debug, Clone, PartialEq, smart_default::SmartDefault)]
pub struct HoprSessionClientConfig {
    /// Hop routing of the forward path (becomes `forward_path_options`).
    pub forward_path: HopRouting,
    /// Hop routing of the return path (becomes `return_path_options`).
    pub return_path: HopRouting,
    /// Session capabilities; defaults to `SessionCapability::Segmentation` only.
    #[default(_code = "SessionCapability::Segmentation.into()")]
    pub capabilities: SessionCapabilities,
    /// Optional pseudonym passed on to the transport; defaults to `None`.
    // NOTE(review): presumably `None` lets the transport pick a pseudonym - confirm.
    #[default(None)]
    pub pseudonym: Option<hopr_api::types::internal::protocol::HoprPseudonym>,
    /// Optional SURB balancer configuration; defaults to `Some(SurbBalancerConfig::default())`.
    #[default(Some(SurbBalancerConfig::default()))]
    pub surb_management: Option<SurbBalancerConfig>,
    /// Flag passed on to the transport unchanged; defaults to `false`.
    // NOTE(review): exact effect is defined by `hopr_transport::SessionClientConfig` - confirm there.
    #[default(false)]
    pub always_max_out_surbs: bool,
}
179
#[cfg(feature = "session-client")]
impl From<HoprSessionClientConfig> for hopr_transport::SessionClientConfig {
    /// Maps the library-level session configuration onto the transport-level one.
    ///
    /// Both path fields are converted via `Into`; all other fields are moved over unchanged.
    fn from(value: HoprSessionClientConfig) -> Self {
        let HoprSessionClientConfig {
            forward_path,
            return_path,
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        } = value;

        Self {
            forward_path_options: forward_path.into(),
            return_path_options: return_path.into(),
            capabilities,
            pseudonym,
            surb_management,
            always_max_out_surbs,
        }
    }
}
193
/// Identifiers of the long-running background processes tracked by a running node.
///
/// The `strum` attributes define the human-readable name used when a variant is displayed.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
pub enum HoprLibProcess {
    /// A process owned by the transport layer.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Handler of incoming sessions.
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// Consumer of the ticket redemption request queue.
    #[strum(to_string = "ticket redemption queue driver")]
    TicketRedemptions,
    /// Subscription to on-chain channel updates.
    #[strum(to_string = "subscription for on-chain channel updates")]
    ChannelEvents,
    /// Processor of winning and rejected ticket events.
    #[strum(to_string = "on received ticket event (winning or rejected)")]
    TicketEvents,
}
208
// Process-wide metrics; only compiled with the `telemetry` feature and outside of tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Unix timestamp (seconds) of the process start; set in `Hopr::new`.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Version gauge labelled by the application version; set in `Hopr::new`.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Node identity gauge labelled by peer ID and on-chain addresses; set at the end of `Hopr::run`.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
226
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// # Arguments
///
/// * `num_cpu_threads` - number of threads for the CPU pool; when `None`, half of the
///   available parallelism is used.
/// * `num_io_threads` - number of Tokio worker threads; when `None`, half of the
///   available parallelism is used.
///
/// Both thread counts are clamped to at least 1. The worker thread stack size is read
/// from the `HOPRD_THREAD_STACK_SIZE` environment variable (in bytes), falls back to
/// 10 MiB when unset or unparsable, and is never smaller than 2 MiB.
///
/// # Errors
///
/// Fails when a thread count is neither given nor derivable from the available
/// parallelism, when the CPU thread pool cannot be initialized, or when the runtime
/// cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    use std::str::FromStr;
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    hopr_parallelize::cpu::init_thread_pool(
        num_cpu_threads
            .map(|v| v.get())
            .or(avail_parallelism)
            // The error is built lazily so that nothing is constructed on the success path.
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                     environment variable."
                )
            })?
            // Halving the available parallelism may yield 0 on single-core machines.
            .max(1),
    )?;

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(
            num_io_threads
                .map(|v| v.get())
                .or(avail_parallelism)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                         environment variable."
                    )
                })?
                .max(1),
        )
        .thread_name("hoprd")
        .thread_stack_size(
            std::env::var("HOPRD_THREAD_STACK_SIZE")
                .ok()
                .and_then(|v| usize::from_str(&v).ok())
                .unwrap_or(10 * 1024 * 1024)
                .max(2 * 1024 * 1024),
        )
        .build()?)
}
272
/// Socket type returned by `Hopr::run`.
///
/// It pairs a receiver of incoming application data with a sender of outgoing
/// `(DestinationRouting, ApplicationDataOut)` items.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
278
/// Broadcast channel pair for verified tickets: the sender plus an inactive receiver.
// NOTE(review): presumably the inactive receiver is kept so new subscribers can be
// activated from it later - confirm against its users outside this view.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
283
/// Timeout passed to `await_key_binding` while the node waits for its key binding in `Hopr::run`.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
286
/// Timeout (90 seconds) for the resolution of an on-chain event.
// NOTE(review): no use is visible in this part of the file; presumably passed as the
// `timeout` of `spawn_wait_for_on_chain_event` - confirm.
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
290
/// The HOPR node: ties together the chain connector, the node database, the network
/// graph and the transport layer.
///
/// An instance is created with `Hopr::new`, started with `Hopr::run` and stopped with
/// `Hopr::shutdown`.
///
/// Type parameters:
/// * `Chain` - chain API implementation,
/// * `Db` - node database implementation,
/// * `Graph` - network graph implementation keyed by `OffchainPublicKey`,
/// * `Net` - network implementation.
pub struct Hopr<Chain, Db, Graph, Net>
where
    Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
{
    /// Off-chain key pair of this node.
    me: OffchainKeypair,
    /// Configuration the node was created with.
    cfg: config::HoprLibConfig,
    /// Current lifecycle state of the node.
    state: Arc<api::node::state::AtomicHoprState>,
    /// Transport layer instance.
    transport_api: HoprTransport<Chain, Db, Graph, Net>,
    /// Sender side of the ticket redemption request queue; set once in `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Node database handle.
    node_db: Db,
    /// Chain API handle.
    chain_api: Chain,
    /// Broadcast channel on which winning tickets are published.
    winning_ticket_subscribers: NewTicketEvents,
    /// Background processes started by `run`; aborted in `shutdown`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
327
328impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
329where
330 Chain: HoprChainApi + Clone + Send + Sync + 'static,
331 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
332 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
333 + hopr_api::graph::NetworkGraphUpdate
334 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
335 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
336 + Clone
337 + Send
338 + Sync
339 + 'static,
340 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
341 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
342 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
343 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
344{
345 pub async fn new(
346 identity: (&ChainKeypair, &OffchainKeypair),
347 hopr_chain_api: Chain,
348 hopr_node_db: Db,
349 graph: Graph,
350 cfg: config::HoprLibConfig,
351 ) -> errors::Result<Self> {
352 if hopr_api::types::crypto_random::is_rng_fixed() {
353 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
354 }
355
356 cfg.validate()?;
357
358 let hopr_transport_api = HoprTransport::new(
359 identity,
360 hopr_chain_api.clone(),
361 hopr_node_db.clone(),
362 graph,
363 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
364 cfg.protocol,
365 )
366 .map_err(HoprLibError::TransportError)?;
367
368 #[cfg(all(feature = "telemetry", not(test)))]
369 {
370 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
371 METRIC_HOPR_LIB_VERSION.set(
372 &[const_format::formatcp!("{}", constants::APP_VERSION)],
373 const_format::formatcp!(
374 "{}.{}",
375 env!("CARGO_PKG_VERSION_MAJOR"),
376 env!("CARGO_PKG_VERSION_MINOR")
377 )
378 .parse()
379 .unwrap_or(0.0),
380 );
381
382 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
384 error!(%error, "failed to initialize ticket statistics metrics");
385 }
386 }
387
388 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
389 new_tickets_tx.set_await_active(false);
390 new_tickets_tx.set_overflow(true);
391
392 Ok(Self {
393 me: identity.1.clone(),
394 cfg,
395 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
396 transport_api: hopr_transport_api,
397 chain_api: hopr_chain_api,
398 node_db: hopr_node_db,
399 redeem_requests: OnceLock::new(),
400 processes: OnceLock::new(),
401 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
402 })
403 }
404
405 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
406 if HoprNodeOperations::status(self) == state {
407 Ok(())
408 } else {
409 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
410 }
411 }
412
    /// Returns the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
416
    /// Returns the `publish` flag of the configuration.
    ///
    /// In `run`, it decides whether the node's multiaddresses are included in the
    /// on-chain announcement.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
421
    /// Starts the node and returns the socket for exchanging application data.
    ///
    /// The start-up runs through these phases, updating the node state along the way:
    /// 1. wait until the node's on-chain account is funded and check its balance,
    /// 2. validate the configured ticket price and winning probability against the network minimums,
    /// 3. register the configured safe and announce the node on chain,
    /// 4. wait for the key binding of this node and verify it,
    /// 5. start the background processes (session server, ticket events, transport,
    ///    ticket redemption, channel events),
    /// 6. synchronize the stored ticket states with the incoming channels.
    ///
    /// On success the state is set to `HoprState::Running`.
    ///
    /// # Arguments
    ///
    /// * `cover_traffic` - probing and cover traffic generator handed to the transport.
    /// * `network_builder` - builder of the network handed to the transport.
    /// * `serve_handler` - handler of incoming sessions (only with the `session-server` feature).
    ///
    /// # Errors
    ///
    /// Fails when the node is not in the `HoprState::Uninitialized` state, or when any of
    /// the phases above fails. On failure, the node state is left at the last phase that
    /// was entered.
    pub async fn run<
        Ct,
        NetBuilder,
        #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
    >(
        &self,
        cover_traffic: Ct,
        network_builder: NetBuilder,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<HoprTransportIO>
    where
        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
        NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
    {
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "cannot start the hopr node multiple times".into(),
        )?;

        #[cfg(feature = "testing")]
        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");

        // Phase 1: funding.
        let me_onchain = *self.chain_api.me();
        info!(
            address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "node is not started, please fund this node",
        );

        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
        helpers::wait_for_funds(
            *MIN_NATIVE_BALANCE,
            *SUGGESTED_NATIVE_BALANCE,
            Duration::from_secs(200),
            me_onchain,
            &self.chain_api,
        )
        .await?;

        // Collects the abort handles of everything spawned below.
        let mut processes = AbortableList::<HoprLibProcess>::default();

        info!("starting HOPR node...");
        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);

        let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %me_onchain,
            %balance,
            %minimum_balance,
            "node information"
        );

        // A balance equal to the minimum is rejected as well.
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "cannot start the node without a sufficiently funded wallet".into(),
            ));
        }

        // Phase 2: the configured outgoing ticket parameters must not be below the network minimums.
        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);

        let network_min_ticket_price = self
            .chain_api
            .minimum_ticket_price()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            )));
        }
        let network_min_win_prob = self
            .chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
        // The winning probability check is skipped when the `HOPR_TEST_DISABLE_CHECKS`
        // environment variable is set to "true" (case-insensitive).
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            )));
        }

        // Phase 3: safe registration and node announcement.
        self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);

        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");

        let safe_addr = self.cfg.safe_module.safe_address;

        if self.me_onchain() == safe_addr {
            return Err(HoprLibError::GeneralError(
                "cannot use self as staking safe address".into(),
            ));
        }

        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
        info!(%safe_addr, "registering safe with this node");
        match self.chain_api.register_safe(&safe_addr).await {
            Ok(awaiter) => {
                // The registration was submitted; wait for its outcome.
                awaiter.await.map_err(|error| {
                    error!(%safe_addr, %error, "safe registration failed with error");
                    HoprLibError::chain(error)
                })?;
                info!(%safe_addr, "safe successfully registered with this node");
            }
            // Being already registered with the same safe is not an error.
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
                info!(%safe_addr, "this safe is already registered with this node");
            }
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
            }
            Err(error) => {
                error!(%safe_addr, %error, "safe registration failed");
                return Err(HoprLibError::chain(error));
            }
        }

        // Multiaddresses are only announced when the node is configured as public.
        let multiaddresses_to_announce = if self.is_public() {
            self.transport_api.announceable_multiaddresses()
        } else {
            Vec::with_capacity(0)
        };

        // Non-public addresses are still announced, but a warning is logged for each of them.
        multiaddresses_to_announce
            .iter()
            .filter(|a| !is_public_address(a))
            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));

        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);

        // The wait for the key binding is spawned before the announcement is made.
        let chain_api = self.chain_api.clone();
        let me_offchain = *self.me.public();
        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });

        info!(?multiaddresses_to_announce, "announcing node on chain");
        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
            Ok(awaiter) => {
                // The announcement was submitted; wait for its outcome.
                awaiter.await.map_err(|error| {
                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
                    HoprLibError::chain(error)
                })?;
                info!(?multiaddresses_to_announce, "node has been successfully announced");
            }
            // An existing announcement is not an error.
            Err(AnnouncementError::AlreadyAnnounced) => {
                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
            }
            Err(error) => {
                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
                return Err(HoprLibError::chain(error));
            }
        }

        // Phase 4: the account bound to the off-chain key must match this node and its safe.
        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);

        let this_node_account = node_ready
            .await
            .map_err(HoprLibError::other)?
            .map_err(HoprLibError::chain)?;
        if this_node_account.chain_addr != self.me_onchain()
            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
        {
            error!(%this_node_account, "account bound to offchain key does not match this node");
            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
        }

        info!(%this_node_account, "node account is ready");

        // Phase 5: background processes.
        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);

        info!("initializing session infrastructure");
        // The capacity can be overridden via the environment; unparsable or zero values
        // fall back to 256.
        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(256);

        // Without the `session-server` feature, the receiver is never consumed.
        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
        #[cfg(feature = "session-server")]
        {
            debug!(capacity = incoming_session_channel_capacity, "creating session server");
            processes.insert(
                HoprLibProcess::SessionServer,
                hopr_async_runtime::spawn_as_abortable!(
                    _session_rx
                        .for_each_concurrent(None, move |session| {
                            let serve_handler = serve_handler.clone();
                            async move {
                                let session_id = *session.session.id();
                                match serve_handler.process(session).await {
                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
                                    Err(error) => error!(
                                        ?session_id,
                                        %error,
                                        "client session processing failed"
                                    ),
                                }
                            }
                        })
                        .inspect(|_| tracing::warn!(
                            task = %HoprLibProcess::SessionServer,
                            "long-running background task finished"
                        ))
                ),
            );
        }

        info!("starting ticket events processor");
        let (tickets_tx, tickets_rx) = channel(8192);
        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
        let node_db = self.node_db.clone();
        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
        spawn(
            tickets_rx
                .filter_map(move |ticket_event| {
                    let node_db = node_db.clone();
                    async move {
                        match ticket_event {
                            TicketEvent::WinningTicket(winning) => {
                                if let Err(error) = node_db.insert_ticket(*winning).await {
                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
                                } else {
                                    tracing::debug!(%winning, "inserted ticket into database");
                                }
                                // The ticket is passed on for broadcasting even if the insertion failed.
                                Some(winning)
                            }
                            TicketEvent::RejectedTicket(rejected, issuer) => {
                                if let Some(issuer) = &issuer {
                                    if let Err(error) =
                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
                                    {
                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
                                    } else {
                                        tracing::debug!(%rejected, "marked ticket as rejected");
                                    }
                                } else {
                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
                                }
                                // Rejected tickets are not broadcast.
                                None
                            }
                        }
                    }
                })
                .for_each(move |ticket| {
                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
                    }
                    futures::future::ready(())
                })
                .inspect(|_| {
                    tracing::warn!(
                        task = %HoprLibProcess::TicketEvents,
                        "long-running background task finished"
                    )
                }),
        );

        info!("starting transport");
        let (hopr_socket, transport_processes) = self
            .transport_api
            .run(cover_traffic, network_builder, tickets_tx, session_tx)
            .await?;
        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

        info!("starting ticket redemption service");
        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
        // The result of `set` is deliberately ignored.
        let _ = self.redeem_requests.set(redemption_req_tx);
        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        // Redemption requests are processed one at a time.
        spawn(redemption_req_rx
            .for_each(move |selector| {
                let chain = chain.clone();
                let db = node_db.clone();
                async move {
                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
                        Ok(res) => debug!(%res, "redemption complete"),
                        Err(error) => error!(%error, "redemption failed"),
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
        );

        info!("subscribing to channel events");
        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        let events = chain.subscribe().map_err(HoprLibError::chain)?;
        spawn(
            futures::stream::Abortable::new(
                events
                    // Only channel-closed events are of interest here.
                    .filter_map(move |event|
                        futures::future::ready(event.try_as_channel_closed())
                    ),
                chain_events_sub_reg
            )
            .for_each(move |closed_channel| {
                let node_db = node_db.clone();
                let chain = chain.clone();
                async move {
                    match closed_channel.direction(chain.me()) {
                        // Incoming channel closed: its tickets are marked as neglected.
                        Some(ChannelDirection::Incoming) => {
                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
                                Ok(num_neglected) if num_neglected > 0 => {
                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
                                },
                                Ok(_) => {
                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
                                },
                                Err(error) => {
                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
                                }
                            }
                        },
                        // Outgoing channel closed: the stored outgoing ticket index is removed.
                        Some(ChannelDirection::Outgoing) => {
                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
                            } else {
                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
                            }
                        }
                        // Channels with no direction relative to this node are ignored.
                        _ => {} }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
        );

        // Phase 6: bring the stored ticket states in line with the channels towards this node.
        info!("synchronizing ticket states");
        let mut channels = self
            .chain_api
            .stream_channels(ChannelSelector {
                destination: self.me_onchain().into(),
                ..Default::default()
            })
            .map_err(HoprLibError::chain)
            .await?;

        while let Some(channel) = channels.next().await {
            // Tickets selected by the `BeingRedeemed` state and an index at or above the
            // channel's ticket index are updated with the `Untouched` status.
            self.node_db
                .update_ticket_states_and_fetch(
                    [TicketSelector::from(&channel)
                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
                        .with_index_range(channel.ticket_index..)],
                    AcknowledgedTicketStatus::Untouched,
                )
                .map_err(HoprLibError::db)
                .await?
                .for_each(|ticket| {
                    info!(%ticket, "fixed next out-of-sync ticket");
                    futures::future::ready(())
                })
                .await;

            // Tickets with an index below the channel's ticket index are marked as neglected.
            self.node_db
                .mark_tickets_as(
                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
                    TicketMarker::Neglected,
                )
                .map_err(HoprLibError::db)
                .await?;
        }

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "telemetry", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &me_onchain.to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        // The result of `set` is deliberately ignored.
        let _ = self.processes.set(processes);
        Ok(hopr_socket)
    }
838
839 pub fn shutdown(&self) -> Result<(), HoprLibError> {
847 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
848 if let Some(processes) = self.processes.get() {
849 processes.abort_all();
850 }
851 self.state.store(HoprState::Terminated, Ordering::Relaxed);
852 info!("NODE SHUTDOWN COMPLETE");
853 Ok(())
854 }
855
856 #[cfg(feature = "session-client")]
859 pub async fn connect_to(
860 &self,
861 destination: Address,
862 target: SessionTarget,
863 cfg: HoprSessionClientConfig,
864 ) -> errors::Result<HoprSession> {
865 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
866
867 let backoff = backon::ConstantBuilder::default()
868 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
869 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
870 .with_jitter();
871
872 use backon::Retryable;
873
874 Ok((|| {
875 let cfg = hopr_transport::SessionClientConfig::from(cfg.clone());
876 let target = target.clone();
877 async { self.transport_api.new_session(destination, target, cfg).await }
878 })
879 .retry(backoff)
880 .sleep(backon::FuturesTimerSleeper)
881 .await?)
882 }
883
884 #[cfg(feature = "session-client")]
887 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
888 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
889 Ok(self.transport_api.probe_session(id).await?)
890 }
891
892 #[cfg(feature = "session-client")]
893 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
894 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
895 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
896 }
897
898 #[cfg(feature = "session-client")]
899 pub async fn update_session_surb_balancer_config(
900 &self,
901 id: &SessionId,
902 cfg: SurbBalancerConfig,
903 ) -> errors::Result<()> {
904 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
905 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
906 }
907
908 fn spawn_wait_for_on_chain_event(
911 &self,
912 context: impl std::fmt::Display,
913 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
914 timeout: Duration,
915 ) -> errors::Result<(
916 impl Future<Output = errors::Result<ChainEvent>>,
917 hopr_async_runtime::AbortHandle,
918 )> {
919 debug!(%context, "registering wait for on-chain event");
920 let (event_stream, handle) = futures::stream::abortable(
921 self.chain_api
922 .subscribe()
923 .map_err(HoprLibError::chain)?
924 .skip_while(move |event| futures::future::ready(!predicate(event))),
925 );
926
927 let ctx = context.to_string();
928
929 Ok((
930 spawn(async move {
931 pin_mut!(event_stream);
932 let res = event_stream
933 .next()
934 .timeout(futures_time::time::Duration::from(timeout))
935 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
936 .await?
937 .ok_or(HoprLibError::GeneralError(format!(
938 "failed to yield an on-chain event for {ctx}"
939 )));
940 debug!(%ctx, ?res, "on-chain event waiting done");
941 res
942 })
943 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
944 .and_then(futures::future::ready),
945 handle,
946 ))
947 }
948}
949
950impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
951where
952 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
953 + hopr_api::graph::NetworkGraphUpdate
954 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
955 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
956 + Clone
957 + Send
958 + Sync
959 + 'static,
960 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
961 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
962 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
963 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
964{
    /// Gathers all metrics into their textual form.
    ///
    /// # Errors
    ///
    /// Fails when the metrics cannot be gathered. In builds without the `telemetry`
    /// feature, and in test builds, it always fails with a general error.
    pub fn collect_hopr_metrics() -> errors::Result<String> {
        cfg_if::cfg_if! {
            if #[cfg(all(feature = "telemetry", not(test)))] {
                hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
            } else {
                Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
            }
        }
    }
976}
977
978impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
981where
982 Chain: HoprChainApi + Clone + Send + Sync + 'static,
983 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
984 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
985 + hopr_api::graph::NetworkGraphUpdate
986 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
987 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
988 + Clone
989 + Send
990 + Sync
991 + 'static,
992 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
993 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
994 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
995 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
996{
    /// Returns the current state of the node, read with relaxed memory ordering.
    fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
1000}
1001
1002#[async_trait::async_trait]
1003impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
1004where
1005 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1006 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1007 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1008 + hopr_api::graph::NetworkGraphUpdate
1009 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1010 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1011 + Clone
1012 + Send
1013 + Sync
1014 + 'static,
1015 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1016 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1017 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1018 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1019{
1020 type Error = HoprLibError;
1021 type TransportObservable = <Graph as hopr_api::graph::NetworkGraphView>::Observed;
1022
1023 fn me_peer_id(&self) -> PeerId {
1024 (*self.me.public()).into()
1025 }
1026
1027 async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
1028 Ok(self
1029 .chain_api
1030 .stream_accounts(AccountSelector {
1031 public_only: true,
1032 ..Default::default()
1033 })
1034 .map_err(HoprLibError::chain)
1035 .await?
1036 .map(|entry| {
1037 (
1038 PeerId::from(entry.public_key),
1039 entry.chain_addr,
1040 entry.get_multiaddrs().to_vec(),
1041 )
1042 })
1043 .collect()
1044 .await)
1045 }
1046
1047 async fn network_health(&self) -> hopr_api::network::Health {
1048 self.transport_api.network_health().await
1049 }
1050
1051 async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
1052 Ok(self
1053 .transport_api
1054 .network_connected_peers()
1055 .await?
1056 .into_iter()
1057 .map(PeerId::from)
1058 .collect())
1059 }
1060
1061 fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
1062 let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
1063 self.transport_api.network_peer_observations(&pubkey)
1064 }
1065
1066 async fn all_network_peers(
1067 &self,
1068 minimum_score: f64,
1069 ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
1070 Ok(
1071 futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
1072 .filter_map(|(pubkey, info)| async move {
1073 let peer_id = PeerId::from(pubkey);
1074 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1075 Some((address, peer_id, info))
1076 })
1077 .collect::<Vec<_>>()
1078 .await,
1079 )
1080 }
1081
1082 fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1083 self.transport_api.local_multiaddresses()
1084 }
1085
1086 async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1087 self.transport_api.listening_multiaddresses().await
1088 }
1089
1090 async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1091 let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1092 return vec![];
1093 };
1094 self.transport_api.network_observed_multiaddresses(&pubkey).await
1095 }
1096
1097 async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1098 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1099 .await
1100 .map_err(HoprLibError::TransportError)?;
1101
1102 match self
1103 .chain_api
1104 .stream_accounts(AccountSelector {
1105 public_only: false,
1106 offchain_key: Some(pubkey),
1107 ..Default::default()
1108 })
1109 .map_err(HoprLibError::chain)
1110 .await?
1111 .next()
1112 .await
1113 {
1114 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1115 None => {
1116 error!(%peer, "no information");
1117 Ok(vec![])
1118 }
1119 }
1120 }
1121
1122 async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1123 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1124 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1125 .await
1126 .map_err(HoprLibError::TransportError)?;
1127 Ok(self.transport_api.ping(&pubkey).await?)
1128 }
1129}
1130
/// Sink accepting [`TicketSelector`]s over an mpsc [`Sender`](futures::channel::mpsc::Sender),
/// with the channel's [`SendError`] mapped to [`HoprLibError`] by a plain function pointer.
pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1132
1133impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1134where
1135 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1136 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1137 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1138 + hopr_api::graph::NetworkGraphUpdate
1139 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1140 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1141 + Clone
1142 + Send
1143 + Sync
1144 + 'static,
1145 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
1146 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
1147 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1148 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1149{
1150 pub fn me_onchain(&self) -> Address {
1151 *self.chain_api.me()
1152 }
1153
1154 pub fn get_safe_config(&self) -> SafeModuleConfig {
1155 SafeModuleConfig {
1156 safe_address: self.cfg.safe_module.safe_address,
1157 module_address: self.cfg.safe_module.module_address,
1158 }
1159 }
1160
1161 pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1162 self.chain_api
1163 .balance(self.me_onchain())
1164 .await
1165 .map_err(HoprLibError::chain)
1166 }
1167
1168 pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1169 self.chain_api
1170 .balance(self.cfg.safe_module.safe_address)
1171 .await
1172 .map_err(HoprLibError::chain)
1173 }
1174
1175 pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1176 self.chain_api
1177 .safe_allowance(self.cfg.safe_module.safe_address)
1178 .await
1179 .map_err(HoprLibError::chain)
1180 }
1181
1182 pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1183 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1184 }
1185
1186 pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1187 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1188 }
1189
1190 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1191 self.chain_api
1192 .minimum_incoming_ticket_win_prob()
1193 .await
1194 .map_err(HoprLibError::chain)
1195 }
1196
1197 pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1198 self.chain_api
1199 .channel_closure_notice_period()
1200 .await
1201 .map_err(HoprLibError::chain)
1202 }
1203
1204 pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1205 Ok(self
1206 .chain_api
1207 .stream_accounts(AccountSelector {
1208 public_only: true,
1209 ..Default::default()
1210 })
1211 .map_err(HoprLibError::chain)
1212 .await?
1213 .collect()
1214 .await)
1215 }
1216
1217 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1218 let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1219 .await
1220 .map_err(HoprLibError::TransportError)?;
1221
1222 self.chain_api
1223 .packet_key_to_chain_key(&pubkey)
1224 .await
1225 .map_err(HoprLibError::chain)
1226 }
1227
1228 pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1229 self.chain_api
1230 .chain_key_to_packet_key(address)
1231 .await
1232 .map(|pk| pk.map(|v| v.into()))
1233 .map_err(HoprLibError::chain)
1234 }
1235
1236 pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1237 self.chain_api
1238 .channel_by_id(channel_id)
1239 .await
1240 .map_err(HoprLibError::chain)
1241 }
1242
1243 pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1244 self.chain_api
1245 .channel_by_parties(src, dest)
1246 .await
1247 .map_err(HoprLibError::chain)
1248 }
1249
1250 pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1251 Ok(self
1252 .chain_api
1253 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1254 ChannelStatusDiscriminants::Closed,
1255 ChannelStatusDiscriminants::Open,
1256 ChannelStatusDiscriminants::PendingToClose,
1257 ]))
1258 .map_err(HoprLibError::chain)
1259 .await?
1260 .collect()
1261 .await)
1262 }
1263
1264 pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1265 Ok(self
1266 .chain_api
1267 .stream_channels(
1268 ChannelSelector::default()
1269 .with_destination(*dest)
1270 .with_allowed_states(&[
1271 ChannelStatusDiscriminants::Closed,
1272 ChannelStatusDiscriminants::Open,
1273 ChannelStatusDiscriminants::PendingToClose,
1274 ]),
1275 )
1276 .map_err(HoprLibError::chain)
1277 .await?
1278 .collect()
1279 .await)
1280 }
1281
1282 pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1283 Ok(self
1284 .chain_api
1285 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1286 ChannelStatusDiscriminants::Closed,
1287 ChannelStatusDiscriminants::Open,
1288 ChannelStatusDiscriminants::PendingToClose,
1289 ]))
1290 .map_err(HoprLibError::chain)
1291 .await?
1292 .collect()
1293 .await)
1294 }
1295
1296 pub async fn open_channel(
1297 &self,
1298 destination: &Address,
1299 amount: HoprBalance,
1300 ) -> Result<OpenChannelResult, HoprLibError> {
1301 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1302
1303 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1304
1305 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1308 format!("open channel to {destination} ({channel_id})"),
1309 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1310 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1311 )?;
1312
1313 let confirm_awaiter = self
1314 .chain_api
1315 .open_channel(destination, amount)
1316 .await
1317 .map_err(HoprLibError::chain)?;
1318
1319 let tx_hash = confirm_awaiter.await.map_err(|e| {
1320 event_abort.abort();
1321 HoprLibError::chain(e)
1322 })?;
1323
1324 let event = event_awaiter.await?;
1325 debug!(%event, "open channel event received");
1326
1327 Ok(OpenChannelResult { tx_hash, channel_id })
1328 }
1329
1330 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1331 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1332
1333 let channel_id = *channel_id;
1334
1335 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1338 format!("fund channel {channel_id}"),
1339 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1340 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1341 )?;
1342
1343 let confirm_awaiter = self
1344 .chain_api
1345 .fund_channel(&channel_id, amount)
1346 .await
1347 .map_err(HoprLibError::chain)?;
1348
1349 let res = confirm_awaiter.await.map_err(|e| {
1350 event_abort.abort();
1351 HoprLibError::chain(e)
1352 })?;
1353
1354 let event = event_awaiter.await?;
1355 debug!(%event, "fund channel event received");
1356
1357 Ok(res)
1358 }
1359
1360 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1361 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1362
1363 let channel_id = *channel_id;
1364
1365 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1368 format!("close channel {channel_id}"),
1369 move |event| {
1370 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1371 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1372 },
1373 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1374 )?;
1375
1376 let confirm_awaiter = self
1377 .chain_api
1378 .close_channel(&channel_id)
1379 .await
1380 .map_err(HoprLibError::chain)?;
1381
1382 let tx_hash = confirm_awaiter.await.map_err(|e| {
1383 event_abort.abort();
1384 HoprLibError::chain(e)
1385 })?;
1386
1387 let event = event_awaiter.await?;
1388 debug!(%event, "close channel event received");
1389
1390 Ok(CloseChannelResult { tx_hash })
1391 }
1392
1393 pub async fn tickets_in_channel(
1394 &self,
1395 channel_id: &ChannelId,
1396 ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1397 if let Some(channel) = self
1398 .chain_api
1399 .channel_by_id(channel_id)
1400 .await
1401 .map_err(|e| HoprTransportError::Other(e.into()))?
1402 {
1403 if &channel.destination == self.chain_api.me() {
1404 Ok(Some(
1405 self.node_db
1406 .stream_tickets([&channel])
1407 .await
1408 .map_err(HoprLibError::db)?
1409 .collect()
1410 .await,
1411 ))
1412 } else {
1413 Ok(None)
1414 }
1415 } else {
1416 Ok(None)
1417 }
1418 }
1419
1420 pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1421 Ok(self
1422 .node_db
1423 .stream_tickets(None::<TicketSelector>)
1424 .await
1425 .map_err(HoprLibError::db)?
1426 .map(|v| v.ticket)
1427 .collect()
1428 .await)
1429 }
1430
1431 pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1432 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1433 }
1434
1435 pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1436 self.node_db
1437 .reset_ticket_statistics()
1438 .await
1439 .map_err(HoprLibError::chain)
1440 }
1441
1442 pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1443 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1444
1445 let min_value = min_value.into();
1446
1447 self.chain_api
1448 .stream_channels(
1449 ChannelSelector::default()
1450 .with_destination(self.me_onchain())
1451 .with_allowed_states(&[
1452 ChannelStatusDiscriminants::Open,
1453 ChannelStatusDiscriminants::PendingToClose,
1454 ]),
1455 )
1456 .map_err(HoprLibError::chain)
1457 .await?
1458 .map(|channel| {
1459 Ok(TicketSelector::from(&channel)
1460 .with_amount(min_value..)
1461 .with_index_range(channel.ticket_index..)
1462 .with_state(AcknowledgedTicketStatus::Untouched))
1463 })
1464 .forward(self.redemption_requests()?)
1465 .await?;
1466
1467 Ok(())
1468 }
1469
1470 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1471 &self,
1472 counterparty: &Address,
1473 min_value: B,
1474 ) -> Result<(), HoprLibError> {
1475 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1476 .await
1477 }
1478
1479 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1480 &self,
1481 channel_id: &Hash,
1482 min_value: B,
1483 ) -> Result<(), HoprLibError> {
1484 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1485
1486 let channel = self
1487 .chain_api
1488 .channel_by_id(channel_id)
1489 .await
1490 .map_err(HoprLibError::chain)?
1491 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1492
1493 self.redemption_requests()?
1494 .send(
1495 TicketSelector::from(channel)
1496 .with_amount(min_value.into()..)
1497 .with_index_range(channel.ticket_index..)
1498 .with_state(AcknowledgedTicketStatus::Untouched),
1499 )
1500 .await?;
1501
1502 Ok(())
1503 }
1504
1505 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1506 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1507
1508 self.redemption_requests()?
1509 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1510 .await?;
1511
1512 Ok(())
1513 }
1514
1515 pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
1516 self.winning_ticket_subscribers.1.activate_cloned()
1517 }
1518
1519 pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1520 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1521
1522 Ok(self
1523 .redeem_requests
1524 .get()
1525 .cloned()
1526 .expect("redeem_requests is not initialized")
1527 .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1528 }
1529
1530 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1531 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1532
1533 self.chain_api
1534 .withdraw(amount, &recipient)
1535 .and_then(identity)
1536 .map_err(HoprLibError::chain)
1537 .await
1538 }
1539
1540 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1541 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1542
1543 self.chain_api
1544 .withdraw(amount, &recipient)
1545 .and_then(identity)
1546 .map_err(HoprLibError::chain)
1547 .await
1548 }
1549}