1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25pub mod traits;
27pub mod utils;
29
30pub use hopr_api as api;
31
32#[doc(hidden)]
34pub mod exports {
35 pub mod types {
36 pub use hopr_chain_types as chain;
37 pub use hopr_internal_types as internal;
38 pub use hopr_primitive_types as primitive;
39 }
40
41 pub mod crypto {
42 pub use hopr_crypto_keypair as keypair;
43 pub use hopr_crypto_types as types;
44 }
45
46 pub mod network {
47 pub use hopr_network_types as types;
48 }
49
50 pub use hopr_transport as transport;
51}
52
53#[doc(hidden)]
55pub mod prelude {
56 pub use super::exports::{
57 crypto::{
58 keypair::key_pair::HoprKeys,
59 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
60 },
61 network::types::{
62 prelude::ForeignDataMode,
63 udp::{ConnectedUdpStream, UdpStreamParallelism},
64 },
65 transport::{OffchainPublicKey, socket::HoprSocket},
66 types::primitive::prelude::Address,
67 };
68}
69
70use std::{
71 convert::identity,
72 future::Future,
73 sync::{Arc, OnceLock, atomic::Ordering},
74 time::Duration,
75};
76
77use futures::{
78 FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
79 channel::mpsc::{SendError, channel},
80 pin_mut,
81 sink::SinkMapErr,
82};
83use futures_time::future::FutureExt as FuturesTimeFutureExt;
84use hopr_api::{
85 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
86 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
87 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
88 node::{ChainInfo, CloseChannelResult, OpenChannelResult, SafeModuleConfig, state::AtomicHoprState},
89};
90pub use hopr_api::{
91 db::ChannelTicketStatistics,
92 graph::EdgeLinkObservable,
93 network::{NetworkBuilder, NetworkStreamControl},
94 node::{HoprNodeNetworkOperations, HoprNodeOperations, state::HoprState},
95};
96use hopr_async_runtime::prelude::spawn;
97pub use hopr_async_runtime::{Abortable, AbortableList};
98pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
99pub use hopr_crypto_types::prelude::*;
100pub use hopr_internal_types::prelude::*;
101pub use hopr_network_types::prelude::*;
102#[cfg(all(feature = "prometheus", not(test)))]
103use hopr_platform::time::native::current_time;
104pub use hopr_primitive_types::prelude::*;
105use hopr_transport::errors::HoprTransportError;
106#[cfg(feature = "runtime-tokio")]
107pub use hopr_transport::transfer_session;
108pub use hopr_transport::*;
109use tracing::{debug, error, info, warn};
110use validator::Validate;
111
112pub use crate::{
113 config::SafeModule,
114 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
115 errors::{HoprLibError, HoprStatusError},
116};
117
118#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display, strum::EnumCount)]
120pub enum HoprLibProcess {
121 #[strum(to_string = "transport: {0}")]
122 Transport(HoprTransportProcess),
123 #[strum(to_string = "session server providing the exit node session stream functionality")]
124 SessionServer,
125 #[strum(to_string = "ticket redemption queue driver")]
126 TicketRedemptions,
127 #[strum(to_string = "subscription for on-chain channel updates")]
128 ChannelEvents,
129 #[strum(to_string = "on received ticket event (winning or rejected)")]
130 TicketEvents,
131}
132
133#[cfg(all(feature = "prometheus", not(test)))]
134lazy_static::lazy_static! {
135 static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
136 "hopr_start_time",
137 "The unix timestamp in seconds at which the process was started"
138 ).unwrap();
139 static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
140 "hopr_lib_version",
141 "Executed version of hopr-lib",
142 &["version"]
143 ).unwrap();
144 static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
145 "hopr_node_addresses",
146 "Node on-chain and off-chain addresses",
147 &["peerid", "address", "safe_address", "module_address"]
148 ).unwrap();
149}
150
151#[cfg(feature = "runtime-tokio")]
156pub fn prepare_tokio_runtime(
157 num_cpu_threads: Option<std::num::NonZeroUsize>,
158 num_io_threads: Option<std::num::NonZeroUsize>,
159) -> anyhow::Result<tokio::runtime::Runtime> {
160 use std::str::FromStr;
161 let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
162
163 hopr_parallelize::cpu::init_thread_pool(
164 num_cpu_threads
165 .map(|v| v.get())
166 .or(avail_parallelism)
167 .ok_or(anyhow::anyhow!(
168 "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
169 environment variable."
170 ))?
171 .max(1),
172 )?;
173
174 Ok(tokio::runtime::Builder::new_multi_thread()
175 .enable_all()
176 .worker_threads(
177 num_io_threads
178 .map(|v| v.get())
179 .or(avail_parallelism)
180 .ok_or(anyhow::anyhow!(
181 "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
182 environment variable."
183 ))?
184 .max(1),
185 )
186 .thread_name("hoprd")
187 .thread_stack_size(
188 std::env::var("HOPRD_THREAD_STACK_SIZE")
189 .ok()
190 .and_then(|v| usize::from_str(&v).ok())
191 .unwrap_or(10 * 1024 * 1024)
192 .max(2 * 1024 * 1024),
193 )
194 .build()?)
195}
196
197pub type HoprTransportIO = socket::HoprSocket<
199 futures::channel::mpsc::Receiver<ApplicationDataIn>,
200 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
201>;
202
203type NewTicketEvents = (
204 async_broadcast::Sender<VerifiedTicket>,
205 async_broadcast::InactiveReceiver<VerifiedTicket>,
206);
207
208const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);
210
211const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
214
215pub struct Hopr<Chain, Db, Graph, Net>
227where
228 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
229 + hopr_api::graph::NetworkGraphUpdate
230 + Clone
231 + Send
232 + Sync
233 + 'static,
234 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
235{
236 me: OffchainKeypair,
237 cfg: config::HoprLibConfig,
238 state: Arc<api::node::state::AtomicHoprState>,
239 transport_api: HoprTransport<Chain, Db, Graph, Net>,
240 redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
241 node_db: Db,
242 chain_api: Chain,
243 winning_ticket_subscribers: NewTicketEvents,
244 processes: OnceLock<AbortableList<HoprLibProcess>>,
245}
246
247impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
248where
249 Chain: HoprChainApi + Clone + Send + Sync + 'static,
250 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
251 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
252 + hopr_api::graph::NetworkGraphUpdate
253 + Clone
254 + Send
255 + Sync
256 + 'static,
257 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
258{
259 pub async fn new(
260 identity: (&ChainKeypair, &OffchainKeypair),
261 hopr_chain_api: Chain,
262 hopr_node_db: Db,
263 graph: Graph,
264 cfg: config::HoprLibConfig,
265 ) -> errors::Result<Self> {
266 if hopr_crypto_random::is_rng_fixed() {
267 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
268 }
269
270 cfg.validate()?;
271
272 let hopr_transport_api = HoprTransport::new(
273 identity,
274 hopr_chain_api.clone(),
275 hopr_node_db.clone(),
276 graph,
277 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
278 cfg.protocol,
279 );
280
281 #[cfg(all(feature = "prometheus", not(test)))]
282 {
283 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
284 METRIC_HOPR_LIB_VERSION.set(
285 &[const_format::formatcp!("{}", constants::APP_VERSION)],
286 const_format::formatcp!(
287 "{}.{}",
288 env!("CARGO_PKG_VERSION_MAJOR"),
289 env!("CARGO_PKG_VERSION_MINOR")
290 )
291 .parse()
292 .unwrap_or(0.0),
293 );
294
295 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
297 error!(%error, "failed to initialize ticket statistics metrics");
298 }
299 }
300
301 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
302 new_tickets_tx.set_await_active(false);
303 new_tickets_tx.set_overflow(true);
304
305 Ok(Self {
306 me: identity.1.clone(),
307 cfg,
308 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
309 transport_api: hopr_transport_api,
310 chain_api: hopr_chain_api,
311 node_db: hopr_node_db,
312 redeem_requests: OnceLock::new(),
313 processes: OnceLock::new(),
314 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
315 })
316 }
317
318 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
319 if HoprNodeOperations::status(self) == state {
320 Ok(())
321 } else {
322 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
323 }
324 }
325
326 pub fn config(&self) -> &config::HoprLibConfig {
327 &self.cfg
328 }
329
330 #[inline]
331 fn is_public(&self) -> bool {
332 self.cfg.publish
333 }
334
335 #[cfg(feature = "telemetry")]
338 pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
339 Ok(self.transport_api.network_peer_packet_stats(peer).await?)
340 }
341
342 #[cfg(feature = "telemetry")]
345 pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
346 Ok(self.transport_api.network_all_packet_stats().await?)
347 }
348
349 pub async fn run<
350 Ct,
351 NetBuilder,
352 #[cfg(feature = "session-server")] T: traits::HoprSessionServer + Clone + Send + 'static,
353 >(
354 &self,
355 cover_traffic: Ct,
356 network_builder: NetBuilder,
357 #[cfg(feature = "session-server")] serve_handler: T,
358 ) -> errors::Result<HoprTransportIO>
359 where
360 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
361 NetBuilder: NetworkBuilder<Network = Net> + Send + Sync + 'static,
362 {
363 self.error_if_not_in_state(
364 HoprState::Uninitialized,
365 "cannot start the hopr node multiple times".into(),
366 )?;
367
368 #[cfg(feature = "testing")]
369 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
370
371 let me_onchain = *self.chain_api.me();
372 info!(
373 address = %me_onchain, minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
374 "node is not started, please fund this node",
375 );
376
377 self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
378 helpers::wait_for_funds(
379 *MIN_NATIVE_BALANCE,
380 *SUGGESTED_NATIVE_BALANCE,
381 Duration::from_secs(200),
382 me_onchain,
383 &self.chain_api,
384 )
385 .await?;
386
387 let mut processes = AbortableList::<HoprLibProcess>::default();
388
389 info!("starting HOPR node...");
390 self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);
391
392 let balance: XDaiBalance = self.chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
393 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
394
395 info!(
396 address = %me_onchain,
397 %balance,
398 %minimum_balance,
399 "node information"
400 );
401
402 if balance.le(&minimum_balance) {
403 return Err(HoprLibError::GeneralError(
404 "cannot start the node without a sufficiently funded wallet".into(),
405 ));
406 }
407
408 self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);
409
410 let network_min_ticket_price = self
413 .chain_api
414 .minimum_ticket_price()
415 .await
416 .map_err(HoprLibError::chain)?;
417 let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
418 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
419 return Err(HoprLibError::GeneralError(format!(
420 "configured outgoing ticket price is lower than the network minimum ticket price: \
421 {configured_ticket_price:?} < {network_min_ticket_price}"
422 )));
423 }
424 let network_min_win_prob = self
427 .chain_api
428 .minimum_incoming_ticket_win_prob()
429 .await
430 .map_err(HoprLibError::chain)?;
431 let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
432 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
433 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
434 {
435 return Err(HoprLibError::GeneralError(format!(
436 "configured outgoing ticket winning probability is lower than the network minimum winning \
437 probability: {configured_win_prob:?} < {network_min_win_prob}"
438 )));
439 }
440
441 self.state.store(HoprState::CheckingOnchainAddress, Ordering::Relaxed);
442
443 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
444
445 let safe_addr = self.cfg.safe_module.safe_address;
446
447 if self.me_onchain() == safe_addr {
448 return Err(HoprLibError::GeneralError(
449 "cannot use self as staking safe address".into(),
450 ));
451 }
452
453 self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
454 info!(%safe_addr, "registering safe with this node");
455 match self.chain_api.register_safe(&safe_addr).await {
456 Ok(awaiter) => {
457 awaiter.await.map_err(|error| {
459 error!(%safe_addr, %error, "safe registration failed with error");
460 HoprLibError::chain(error)
461 })?;
462 info!(%safe_addr, "safe successfully registered with this node");
463 }
464 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
465 info!(%safe_addr, "this safe is already registered with this node");
466 }
467 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
468 error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
470 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
471 }
472 Err(error) => {
473 error!(%safe_addr, %error, "safe registration failed");
474 return Err(HoprLibError::chain(error));
475 }
476 }
477
478 let multiaddresses_to_announce = if self.is_public() {
480 self.transport_api.announceable_multiaddresses()
483 } else {
484 Vec::with_capacity(0)
485 };
486
487 multiaddresses_to_announce
489 .iter()
490 .filter(|a| !is_public_address(a))
491 .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
492
493 self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);
494
495 let chain_api = self.chain_api.clone();
496 let me_offchain = *self.me.public();
497 let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
498
499 info!(?multiaddresses_to_announce, "announcing node on chain");
502 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
503 Ok(awaiter) => {
504 awaiter.await.map_err(|error| {
506 error!(?multiaddresses_to_announce, %error, "node announcement failed");
507 HoprLibError::chain(error)
508 })?;
509 info!(?multiaddresses_to_announce, "node has been successfully announced");
510 }
511 Err(AnnouncementError::AlreadyAnnounced) => {
512 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
513 }
514 Err(error) => {
515 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
516 return Err(HoprLibError::chain(error));
517 }
518 }
519
520 self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);
521
522 let this_node_account = node_ready
524 .await
525 .map_err(HoprLibError::other)?
526 .map_err(HoprLibError::chain)?;
527 if this_node_account.chain_addr != self.me_onchain()
528 || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
529 {
530 error!(%this_node_account, "account bound to offchain key does not match this node");
531 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
532 }
533
534 info!(%this_node_account, "node account is ready");
535
536 self.state.store(HoprState::InitializingServices, Ordering::Relaxed);
537
538 info!("initializing session infrastructure");
539 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
540 .ok()
541 .and_then(|s| s.trim().parse::<usize>().ok())
542 .filter(|&c| c > 0)
543 .unwrap_or(256);
544
545 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
546 #[cfg(feature = "session-server")]
547 {
548 debug!(capacity = incoming_session_channel_capacity, "creating session server");
549 processes.insert(
550 HoprLibProcess::SessionServer,
551 hopr_async_runtime::spawn_as_abortable!(
552 _session_rx
553 .for_each_concurrent(None, move |session| {
554 let serve_handler = serve_handler.clone();
555 async move {
556 let session_id = *session.session.id();
557 match serve_handler.process(session).await {
558 Ok(_) => debug!(?session_id, "client session processed successfully"),
559 Err(error) => error!(
560 ?session_id,
561 %error,
562 "client session processing failed"
563 ),
564 }
565 }
566 })
567 .inspect(|_| tracing::warn!(
568 task = %HoprLibProcess::SessionServer,
569 "long-running background task finished"
570 ))
571 ),
572 );
573 }
574
575 info!("starting ticket events processor");
576 let (tickets_tx, tickets_rx) = channel(8192);
577 let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
578 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
579 let node_db = self.node_db.clone();
580 let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
581 spawn(
582 tickets_rx
583 .filter_map(move |ticket_event| {
584 let node_db = node_db.clone();
585 async move {
586 match ticket_event {
587 TicketEvent::WinningTicket(winning) => {
588 if let Err(error) = node_db.insert_ticket(*winning).await {
589 tracing::error!(%error, %winning, "failed to insert ticket into database");
590 } else {
591 tracing::debug!(%winning, "inserted ticket into database");
592 }
593 Some(winning)
594 }
595 TicketEvent::RejectedTicket(rejected, issuer) => {
596 if let Some(issuer) = &issuer {
597 if let Err(error) =
598 node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
599 {
600 tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
601 } else {
602 tracing::debug!(%rejected, "marked ticket as rejected");
603 }
604 } else {
605 tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
606 }
607 None
608 }
609 }
610 }
611 })
612 .for_each(move |ticket| {
613 if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
614 tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
615 }
616 futures::future::ready(())
617 })
618 .inspect(|_| {
619 tracing::warn!(
620 task = %HoprLibProcess::TicketEvents,
621 "long-running background task finished"
622 )
623 }),
624 );
625
626 info!("starting transport");
627 let (hopr_socket, transport_processes) = self
628 .transport_api
629 .run(cover_traffic, network_builder, tickets_tx, session_tx)
630 .await?;
631 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
632
633 info!("starting ticket redemption service");
634 let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
636 let _ = self.redeem_requests.set(redemption_req_tx);
637 let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
638 processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
639 let chain = self.chain_api.clone();
640 let node_db = self.node_db.clone();
641 spawn(redemption_req_rx
642 .for_each(move |selector| {
643 let chain = chain.clone();
644 let db = node_db.clone();
645 async move {
646 match chain.redeem_tickets_via_selectors(&db, [selector]).await {
647 Ok(res) => debug!(%res, "redemption complete"),
648 Err(error) => error!(%error, "redemption failed"),
649 }
650 }
651 })
652 .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
653 );
654
655 info!("subscribing to channel events");
656 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
657 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
658 let chain = self.chain_api.clone();
659 let node_db = self.node_db.clone();
660 let events = chain.subscribe().map_err(HoprLibError::chain)?;
661 spawn(
662 futures::stream::Abortable::new(
663 events
664 .filter_map(move |event|
665 futures::future::ready(event.try_as_channel_closed())
666 ),
667 chain_events_sub_reg
668 )
669 .for_each(move |closed_channel| {
670 let node_db = node_db.clone();
671 let chain = chain.clone();
672 async move {
673 match closed_channel.direction(chain.me()) {
674 Some(ChannelDirection::Incoming) => {
675 match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
676 Ok(num_neglected) if num_neglected > 0 => {
677 warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
678 },
679 Ok(_) => {
680 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
681 },
682 Err(error) => {
683 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
684 }
685 }
686 },
687 Some(ChannelDirection::Outgoing) => {
688 if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
689 error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
690 } else {
691 debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
692 }
693 }
694 _ => {} }
696 }
697 })
698 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
699 );
700
701 info!("synchronizing ticket states");
702 let mut channels = self
707 .chain_api
708 .stream_channels(ChannelSelector {
709 destination: self.me_onchain().into(),
710 ..Default::default()
711 })
712 .map_err(HoprLibError::chain)
713 .await?;
714
715 while let Some(channel) = channels.next().await {
716 self.node_db
719 .update_ticket_states_and_fetch(
720 [TicketSelector::from(&channel)
721 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
722 .with_index_range(channel.ticket_index..)],
723 AcknowledgedTicketStatus::Untouched,
724 )
725 .map_err(HoprLibError::db)
726 .await?
727 .for_each(|ticket| {
728 info!(%ticket, "fixed next out-of-sync ticket");
729 futures::future::ready(())
730 })
731 .await;
732
733 self.node_db
735 .mark_tickets_as(
736 [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
737 TicketMarker::Neglected,
738 )
739 .map_err(HoprLibError::db)
740 .await?;
741 }
742
743 self.state.store(HoprState::Running, Ordering::Relaxed);
744
745 info!(
746 id = %self.me_peer_id(),
747 version = constants::APP_VERSION,
748 "NODE STARTED AND RUNNING"
749 );
750
751 #[cfg(all(feature = "prometheus", not(test)))]
752 METRIC_HOPR_NODE_INFO.set(
753 &[
754 &self.me.public().to_peerid_str(),
755 &me_onchain.to_string(),
756 &self.cfg.safe_module.safe_address.to_string(),
757 &self.cfg.safe_module.module_address.to_string(),
758 ],
759 1.0,
760 );
761
762 let _ = self.processes.set(processes);
763 Ok(hopr_socket)
764 }
765
766 pub fn shutdown(&self) -> Result<(), HoprLibError> {
774 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
775 if let Some(processes) = self.processes.get() {
776 processes.abort_all();
777 }
778 self.state.store(HoprState::Terminated, Ordering::Relaxed);
779 info!("NODE SHUTDOWN COMPLETE");
780 Ok(())
781 }
782
783 #[cfg(feature = "session-client")]
786 pub async fn connect_to(
787 &self,
788 destination: Address,
789 target: SessionTarget,
790 cfg: SessionClientConfig,
791 ) -> errors::Result<HoprSession> {
792 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
793
794 let backoff = backon::ConstantBuilder::default()
795 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
796 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
797 .with_jitter();
798
799 use backon::Retryable;
800
801 Ok((|| {
802 let cfg = cfg.clone();
803 let target = target.clone();
804 async { self.transport_api.new_session(destination, target, cfg).await }
805 })
806 .retry(backoff)
807 .sleep(backon::FuturesTimerSleeper)
808 .await?)
809 }
810
811 #[cfg(feature = "session-client")]
814 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
815 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
816 Ok(self.transport_api.probe_session(id).await?)
817 }
818
819 #[cfg(feature = "session-client")]
820 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
821 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
822 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
823 }
824
825 #[cfg(all(feature = "session-client", feature = "telemetry"))]
826 pub async fn get_session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
827 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
828 Ok(self.transport_api.session_stats(id).await?)
829 }
830
831 #[cfg(feature = "session-client")]
832 pub async fn update_session_surb_balancer_config(
833 &self,
834 id: &SessionId,
835 cfg: SurbBalancerConfig,
836 ) -> errors::Result<()> {
837 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
838 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
839 }
840
841 fn spawn_wait_for_on_chain_event(
844 &self,
845 context: impl std::fmt::Display,
846 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
847 timeout: Duration,
848 ) -> errors::Result<(
849 impl Future<Output = errors::Result<ChainEvent>>,
850 hopr_async_runtime::AbortHandle,
851 )> {
852 debug!(%context, "registering wait for on-chain event");
853 let (event_stream, handle) = futures::stream::abortable(
854 self.chain_api
855 .subscribe()
856 .map_err(HoprLibError::chain)?
857 .skip_while(move |event| futures::future::ready(!predicate(event))),
858 );
859
860 let ctx = context.to_string();
861
862 Ok((
863 spawn(async move {
864 pin_mut!(event_stream);
865 let res = event_stream
866 .next()
867 .timeout(futures_time::time::Duration::from(timeout))
868 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
869 .await?
870 .ok_or(HoprLibError::GeneralError(format!(
871 "failed to yield an on-chain event for {ctx}"
872 )));
873 debug!(%ctx, ?res, "on-chain event waiting done");
874 res
875 })
876 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
877 .and_then(futures::future::ready),
878 handle,
879 ))
880 }
881}
882
883impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
884where
885 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
886 + hopr_api::graph::NetworkGraphUpdate
887 + Clone
888 + Send
889 + Sync
890 + 'static,
891 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
892{
893 pub fn collect_hopr_metrics() -> errors::Result<String> {
896 cfg_if::cfg_if! {
897 if #[cfg(all(feature = "prometheus", not(test)))] {
898 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
899 } else {
900 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
901 }
902 }
903 }
904}
905
906impl<Chain, Db, Graph, Net> HoprNodeOperations for Hopr<Chain, Db, Graph, Net>
909where
910 Chain: HoprChainApi + Clone + Send + Sync + 'static,
911 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
912 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
913 + hopr_api::graph::NetworkGraphUpdate
914 + Clone
915 + Send
916 + Sync
917 + 'static,
918 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
919{
920 fn status(&self) -> HoprState {
921 self.state.load(Ordering::Relaxed)
922 }
923}
924
925#[async_trait::async_trait]
926impl<Chain, Db, Graph, Net> HoprNodeNetworkOperations for Hopr<Chain, Db, Graph, Net>
927where
928 Chain: HoprChainApi + Clone + Send + Sync + 'static,
929 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
930 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
931 + hopr_api::graph::NetworkGraphUpdate
932 + Clone
933 + Send
934 + Sync
935 + 'static,
936 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
937{
938 type Error = HoprLibError;
939 type TransportObservable = Graph::Observed;
940
941 fn me_peer_id(&self) -> PeerId {
942 (*self.me.public()).into()
943 }
944
945 async fn get_public_nodes(&self) -> Result<Vec<(PeerId, Address, Vec<Multiaddr>)>, Self::Error> {
946 Ok(self
947 .chain_api
948 .stream_accounts(AccountSelector {
949 public_only: true,
950 ..Default::default()
951 })
952 .map_err(HoprLibError::chain)
953 .await?
954 .map(|entry| {
955 (
956 PeerId::from(entry.public_key),
957 entry.chain_addr,
958 entry.get_multiaddrs().to_vec(),
959 )
960 })
961 .collect()
962 .await)
963 }
964
965 async fn network_health(&self) -> hopr_api::network::Health {
966 self.transport_api.network_health().await
967 }
968
969 async fn network_connected_peers(&self) -> Result<Vec<PeerId>, Self::Error> {
970 Ok(self
971 .transport_api
972 .network_connected_peers()
973 .await?
974 .into_iter()
975 .map(PeerId::from)
976 .collect())
977 }
978
979 fn network_peer_info(&self, peer: &PeerId) -> Option<Self::TransportObservable> {
980 let pubkey = OffchainPublicKey::from_peerid(peer).ok()?;
981 self.transport_api.network_peer_observations(&pubkey)
982 }
983
984 async fn all_network_peers(
985 &self,
986 minimum_score: f64,
987 ) -> Result<Vec<(Option<Address>, PeerId, Self::TransportObservable)>, Self::Error> {
988 Ok(
989 futures::stream::iter(self.transport_api.all_network_peers(minimum_score).await?)
990 .filter_map(|(pubkey, info)| async move {
991 let peer_id = PeerId::from(pubkey);
992 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
993 Some((address, peer_id, info))
994 })
995 .collect::<Vec<_>>()
996 .await,
997 )
998 }
999
1000 fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1001 self.transport_api.local_multiaddresses()
1002 }
1003
1004 async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1005 self.transport_api.listening_multiaddresses().await
1006 }
1007
1008 async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1009 let Ok(pubkey) = hopr_transport::peer_id_to_public_key(peer).await else {
1010 return vec![];
1011 };
1012 self.transport_api.network_observed_multiaddresses(&pubkey).await
1013 }
1014
1015 async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Result<Vec<Multiaddr>, Self::Error> {
1016 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1017 .await
1018 .map_err(HoprLibError::TransportError)?;
1019
1020 match self
1021 .chain_api
1022 .stream_accounts(AccountSelector {
1023 public_only: false,
1024 offchain_key: Some(pubkey),
1025 ..Default::default()
1026 })
1027 .map_err(HoprLibError::chain)
1028 .await?
1029 .next()
1030 .await
1031 {
1032 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
1033 None => {
1034 error!(%peer, "no information");
1035 Ok(vec![])
1036 }
1037 }
1038 }
1039
1040 async fn ping(&self, peer: &PeerId) -> Result<(Duration, Self::TransportObservable), Self::Error> {
1041 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1042 let pubkey = hopr_transport::peer_id_to_public_key(peer)
1043 .await
1044 .map_err(HoprLibError::TransportError)?;
1045 Ok(self.transport_api.ping(&pubkey).await?)
1046 }
1047}
1048
1049pub type SinkMap = SinkMapErr<futures::channel::mpsc::Sender<TicketSelector>, fn(SendError) -> HoprLibError>;
1050
1051impl<Chain, Db, Graph, Net> Hopr<Chain, Db, Graph, Net>
1052where
1053 Chain: HoprChainApi + Clone + Send + Sync + 'static,
1054 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
1055 Graph: hopr_api::graph::NetworkGraphView<NodeId = OffchainPublicKey>
1056 + hopr_api::graph::NetworkGraphUpdate
1057 + Clone
1058 + Send
1059 + Sync
1060 + 'static,
1061 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
1062{
1063 pub fn me_onchain(&self) -> Address {
1064 *self.chain_api.me()
1065 }
1066
1067 pub fn get_safe_config(&self) -> SafeModuleConfig {
1068 SafeModuleConfig {
1069 safe_address: self.cfg.safe_module.safe_address,
1070 module_address: self.cfg.safe_module.module_address,
1071 }
1072 }
1073
1074 pub async fn get_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1075 self.chain_api
1076 .balance(self.me_onchain())
1077 .await
1078 .map_err(HoprLibError::chain)
1079 }
1080
1081 pub async fn get_safe_balance<C: Currency + Send>(&self) -> Result<Balance<C>, HoprLibError> {
1082 self.chain_api
1083 .balance(self.cfg.safe_module.safe_address)
1084 .await
1085 .map_err(HoprLibError::chain)
1086 }
1087
1088 pub async fn safe_allowance(&self) -> Result<HoprBalance, HoprLibError> {
1089 self.chain_api
1090 .safe_allowance(self.cfg.safe_module.safe_address)
1091 .await
1092 .map_err(HoprLibError::chain)
1093 }
1094
1095 pub async fn chain_info(&self) -> Result<ChainInfo, HoprLibError> {
1096 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
1097 }
1098
1099 pub async fn get_ticket_price(&self) -> Result<HoprBalance, HoprLibError> {
1100 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1101 }
1102
1103 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> Result<WinningProbability, HoprLibError> {
1104 self.chain_api
1105 .minimum_incoming_ticket_win_prob()
1106 .await
1107 .map_err(HoprLibError::chain)
1108 }
1109
1110 pub async fn get_channel_closure_notice_period(&self) -> Result<Duration, HoprLibError> {
1111 self.chain_api
1112 .channel_closure_notice_period()
1113 .await
1114 .map_err(HoprLibError::chain)
1115 }
1116
1117 pub async fn accounts_announced_on_chain(&self) -> Result<Vec<AccountEntry>, HoprLibError> {
1118 Ok(self
1119 .chain_api
1120 .stream_accounts(AccountSelector {
1121 public_only: true,
1122 ..Default::default()
1123 })
1124 .map_err(HoprLibError::chain)
1125 .await?
1126 .collect()
1127 .await)
1128 }
1129
1130 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> Result<Option<Address>, HoprLibError> {
1131 let pubkey = hopr_transport::peer_id_to_public_key(peer_id)
1132 .await
1133 .map_err(HoprLibError::TransportError)?;
1134
1135 self.chain_api
1136 .packet_key_to_chain_key(&pubkey)
1137 .await
1138 .map_err(HoprLibError::chain)
1139 }
1140
1141 pub async fn chain_key_to_peerid(&self, address: &Address) -> Result<Option<PeerId>, HoprLibError> {
1142 self.chain_api
1143 .chain_key_to_packet_key(address)
1144 .await
1145 .map(|pk| pk.map(|v| v.into()))
1146 .map_err(HoprLibError::chain)
1147 }
1148
1149 pub async fn channel_from_hash(&self, channel_id: &Hash) -> Result<Option<ChannelEntry>, HoprLibError> {
1150 self.chain_api
1151 .channel_by_id(channel_id)
1152 .await
1153 .map_err(HoprLibError::chain)
1154 }
1155
1156 pub async fn channel(&self, src: &Address, dest: &Address) -> Result<Option<ChannelEntry>, HoprLibError> {
1157 self.chain_api
1158 .channel_by_parties(src, dest)
1159 .await
1160 .map_err(HoprLibError::chain)
1161 }
1162
1163 pub async fn channels_from(&self, src: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1164 Ok(self
1165 .chain_api
1166 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1167 ChannelStatusDiscriminants::Closed,
1168 ChannelStatusDiscriminants::Open,
1169 ChannelStatusDiscriminants::PendingToClose,
1170 ]))
1171 .map_err(HoprLibError::chain)
1172 .await?
1173 .collect()
1174 .await)
1175 }
1176
1177 pub async fn channels_to(&self, dest: &Address) -> Result<Vec<ChannelEntry>, HoprLibError> {
1178 Ok(self
1179 .chain_api
1180 .stream_channels(
1181 ChannelSelector::default()
1182 .with_destination(*dest)
1183 .with_allowed_states(&[
1184 ChannelStatusDiscriminants::Closed,
1185 ChannelStatusDiscriminants::Open,
1186 ChannelStatusDiscriminants::PendingToClose,
1187 ]),
1188 )
1189 .map_err(HoprLibError::chain)
1190 .await?
1191 .collect()
1192 .await)
1193 }
1194
1195 pub async fn all_channels(&self) -> Result<Vec<ChannelEntry>, HoprLibError> {
1196 Ok(self
1197 .chain_api
1198 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1199 ChannelStatusDiscriminants::Closed,
1200 ChannelStatusDiscriminants::Open,
1201 ChannelStatusDiscriminants::PendingToClose,
1202 ]))
1203 .map_err(HoprLibError::chain)
1204 .await?
1205 .collect()
1206 .await)
1207 }
1208
1209 pub async fn open_channel(
1210 &self,
1211 destination: &Address,
1212 amount: HoprBalance,
1213 ) -> Result<OpenChannelResult, HoprLibError> {
1214 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1215
1216 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1217
1218 let confirm_awaiter = self
1219 .chain_api
1220 .open_channel(destination, amount)
1221 .await
1222 .map_err(HoprLibError::chain)?;
1223
1224 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1225 format!("open channel to {destination} ({channel_id})"),
1226 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1227 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1228 )?;
1229
1230 let tx_hash = confirm_awaiter.await.map_err(|e| {
1231 event_abort.abort();
1232 HoprLibError::chain(e)
1233 })?;
1234
1235 let event = event_awaiter.await?;
1236 debug!(%event, "open channel event received");
1237
1238 Ok(OpenChannelResult { tx_hash, channel_id })
1239 }
1240
1241 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1242 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1243
1244 let channel_id = *channel_id;
1245
1246 let confirm_awaiter = self
1247 .chain_api
1248 .fund_channel(&channel_id, amount)
1249 .await
1250 .map_err(HoprLibError::chain)?;
1251
1252 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1253 format!("fund channel {channel_id}"),
1254 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1255 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1256 )?;
1257
1258 let res = confirm_awaiter.await.map_err(|e| {
1259 event_abort.abort();
1260 HoprLibError::chain(e)
1261 })?;
1262
1263 let event = event_awaiter.await?;
1264 debug!(%event, "fund channel event received");
1265
1266 Ok(res)
1267 }
1268
1269 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> Result<CloseChannelResult, HoprLibError> {
1270 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1271
1272 let channel_id = *channel_id;
1273
1274 let confirm_awaiter = self
1275 .chain_api
1276 .close_channel(&channel_id)
1277 .await
1278 .map_err(HoprLibError::chain)?;
1279
1280 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1281 format!("close channel {channel_id}"),
1282 move |event| {
1283 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1284 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1285 },
1286 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1287 )?;
1288
1289 let tx_hash = confirm_awaiter.await.map_err(|e| {
1290 event_abort.abort();
1291 HoprLibError::chain(e)
1292 })?;
1293
1294 let event = event_awaiter.await?;
1295 debug!(%event, "close channel event received");
1296
1297 Ok(CloseChannelResult { tx_hash })
1298 }
1299
1300 pub async fn tickets_in_channel(
1301 &self,
1302 channel_id: &ChannelId,
1303 ) -> Result<Option<Vec<RedeemableTicket>>, HoprLibError> {
1304 if let Some(channel) = self
1305 .chain_api
1306 .channel_by_id(channel_id)
1307 .await
1308 .map_err(|e| HoprTransportError::Other(e.into()))?
1309 {
1310 if &channel.destination == self.chain_api.me() {
1311 Ok(Some(
1312 self.node_db
1313 .stream_tickets([&channel])
1314 .await
1315 .map_err(HoprLibError::db)?
1316 .collect()
1317 .await,
1318 ))
1319 } else {
1320 Ok(None)
1321 }
1322 } else {
1323 Ok(None)
1324 }
1325 }
1326
1327 pub async fn all_tickets(&self) -> Result<Vec<VerifiedTicket>, HoprLibError> {
1328 Ok(self
1329 .node_db
1330 .stream_tickets(None::<TicketSelector>)
1331 .await
1332 .map_err(HoprLibError::db)?
1333 .map(|v| v.ticket)
1334 .collect()
1335 .await)
1336 }
1337
1338 pub async fn ticket_statistics(&self) -> Result<ChannelTicketStatistics, HoprLibError> {
1339 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
1340 }
1341
1342 pub async fn reset_ticket_statistics(&self) -> Result<(), HoprLibError> {
1343 self.node_db
1344 .reset_ticket_statistics()
1345 .await
1346 .map_err(HoprLibError::chain)
1347 }
1348
1349 pub async fn redeem_all_tickets<B: Into<HoprBalance> + Send>(&self, min_value: B) -> Result<(), HoprLibError> {
1350 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1351
1352 let min_value = min_value.into();
1353
1354 self.chain_api
1355 .stream_channels(
1356 ChannelSelector::default()
1357 .with_destination(self.me_onchain())
1358 .with_allowed_states(&[
1359 ChannelStatusDiscriminants::Open,
1360 ChannelStatusDiscriminants::PendingToClose,
1361 ]),
1362 )
1363 .map_err(HoprLibError::chain)
1364 .await?
1365 .map(|channel| {
1366 Ok(TicketSelector::from(&channel)
1367 .with_amount(min_value..)
1368 .with_index_range(channel.ticket_index..)
1369 .with_state(AcknowledgedTicketStatus::Untouched))
1370 })
1371 .forward(self.redemption_requests()?)
1372 .await?;
1373
1374 Ok(())
1375 }
1376
1377 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance> + Send>(
1378 &self,
1379 counterparty: &Address,
1380 min_value: B,
1381 ) -> Result<(), HoprLibError> {
1382 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1383 .await
1384 }
1385
1386 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance> + Send>(
1387 &self,
1388 channel_id: &Hash,
1389 min_value: B,
1390 ) -> Result<(), HoprLibError> {
1391 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1392
1393 let channel = self
1394 .chain_api
1395 .channel_by_id(channel_id)
1396 .await
1397 .map_err(HoprLibError::chain)?
1398 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1399
1400 self.redemption_requests()?
1401 .send(
1402 TicketSelector::from(channel)
1403 .with_amount(min_value.into()..)
1404 .with_index_range(channel.ticket_index..)
1405 .with_state(AcknowledgedTicketStatus::Untouched),
1406 )
1407 .await?;
1408
1409 Ok(())
1410 }
1411
1412 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<(), HoprLibError> {
1413 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1414
1415 self.redemption_requests()?
1416 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1417 .await?;
1418
1419 Ok(())
1420 }
1421
1422 pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send + 'static {
1423 self.winning_ticket_subscribers.1.activate_cloned()
1424 }
1425
1426 pub fn redemption_requests(&self) -> Result<SinkMap, HoprLibError> {
1427 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1428
1429 Ok(self
1430 .redeem_requests
1431 .get()
1432 .cloned()
1433 .expect("redeem_requests is not initialized")
1434 .sink_map_err(|e| HoprLibError::GeneralError(format!("failed to send redemption request: {e}"))))
1435 }
1436
1437 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> Result<Hash, HoprLibError> {
1438 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1439
1440 self.chain_api
1441 .withdraw(amount, &recipient)
1442 .and_then(identity)
1443 .map_err(HoprLibError::chain)
1444 .await
1445 }
1446
1447 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> Result<Hash, HoprLibError> {
1448 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1449
1450 self.chain_api
1451 .withdraw(amount, &recipient)
1452 .and_then(identity)
1453 .map_err(HoprLibError::chain)
1454 .await
1455 }
1456}