1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38pub use hopr_api as api;
39
40#[doc(hidden)]
42pub mod exports {
43 pub mod types {
44 pub use hopr_internal_types as internal;
45 pub use hopr_primitive_types as primitive;
46 }
47
48 pub mod crypto {
49 pub use hopr_crypto_keypair as keypair;
50 pub use hopr_crypto_types as types;
51 }
52
53 pub mod network {
54 pub use hopr_network_types as types;
55 }
56
57 pub use hopr_transport as transport;
58}
59
60#[doc(hidden)]
62pub mod prelude {
63 pub use super::exports::{
64 crypto::{
65 keypair::key_pair::HoprKeys,
66 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67 },
68 network::types::{
69 prelude::ForeignDataMode,
70 udp::{ConnectedUdpStream, UdpStreamParallelism},
71 },
72 transport::{OffchainPublicKey, socket::HoprSocket},
73 types::primitive::prelude::Address,
74 };
75}
76
77use std::{
78 convert::identity,
79 future::Future,
80 num::NonZeroUsize,
81 sync::{Arc, OnceLock, atomic::Ordering},
82 time::Duration,
83};
84
85use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel, pin_mut};
86use futures_time::future::FutureExt as FuturesTimeFutureExt;
87use hopr_api::{
88 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
89 ct::TrafficGeneration,
90 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
91};
92pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
93use hopr_async_runtime::prelude::spawn;
94pub use hopr_async_runtime::{Abortable, AbortableList};
95pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
96pub use hopr_crypto_types::prelude::*;
97pub use hopr_internal_types::prelude::*;
98pub use hopr_network_types::prelude::*;
99#[cfg(all(feature = "prometheus", not(test)))]
100use hopr_platform::time::native::current_time;
101pub use hopr_primitive_types::prelude::*;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110 config::SafeModule,
111 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112 errors::{HoprLibError, HoprStatusError},
113 state::{HoprLibProcess, HoprState},
114 traits::chain::{CloseChannelResult, OpenChannelResult},
115};
116
117#[cfg(all(feature = "prometheus", not(test)))]
118lazy_static::lazy_static! {
119 static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
120 "hopr_start_time",
121 "The unix timestamp in seconds at which the process was started"
122 ).unwrap();
123 static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
124 "hopr_lib_version",
125 "Executed version of hopr-lib",
126 &["version"]
127 ).unwrap();
128 static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
129 "hopr_node_addresses",
130 "Node on-chain and off-chain addresses",
131 &["peerid", "address", "safe_address", "module_address"]
132 ).unwrap();
133}
134
135#[cfg(feature = "runtime-tokio")]
140pub fn prepare_tokio_runtime(
141 num_cpu_threads: Option<NonZeroUsize>,
142 num_io_threads: Option<NonZeroUsize>,
143) -> anyhow::Result<tokio::runtime::Runtime> {
144 use std::str::FromStr;
145 let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
146
147 hopr_parallelize::cpu::init_thread_pool(
148 num_cpu_threads
149 .map(|v| v.get())
150 .or(avail_parallelism)
151 .ok_or(anyhow::anyhow!(
152 "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
153 environment variable."
154 ))?
155 .max(1),
156 )?;
157
158 Ok(tokio::runtime::Builder::new_multi_thread()
159 .enable_all()
160 .worker_threads(
161 num_io_threads
162 .map(|v| v.get())
163 .or(avail_parallelism)
164 .ok_or(anyhow::anyhow!(
165 "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
166 environment variable."
167 ))?
168 .max(1),
169 )
170 .thread_name("hoprd")
171 .thread_stack_size(
172 std::env::var("HOPRD_THREAD_STACK_SIZE")
173 .ok()
174 .and_then(|v| usize::from_str(&v).ok())
175 .unwrap_or(10 * 1024 * 1024)
176 .max(2 * 1024 * 1024),
177 )
178 .build()?)
179}
180
181pub type HoprTransportIO = socket::HoprSocket<
183 futures::channel::mpsc::Receiver<ApplicationDataIn>,
184 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
185>;
186
187type NewTicketEvents = (
188 async_broadcast::Sender<VerifiedTicket>,
189 async_broadcast::InactiveReceiver<VerifiedTicket>,
190);
191
192const NODE_READY_TIMEOUT: Duration = Duration::from_secs(30);
194
195const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
198
199pub struct Hopr<Chain, Db> {
211 me: OffchainKeypair,
212 cfg: config::HoprLibConfig,
213 state: Arc<state::AtomicHoprState>,
214 transport_api: HoprTransport<Chain, Db>,
215 redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
216 node_db: Db,
217 chain_api: Chain,
218 winning_ticket_subscribers: NewTicketEvents,
219 processes: OnceLock<AbortableList<HoprLibProcess>>,
220}
221
222impl<Chain, Db> Hopr<Chain, Db>
223where
224 Chain: HoprChainApi + Clone + Send + Sync + 'static,
225 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
226{
227 pub async fn new(
228 identity: (&ChainKeypair, &OffchainKeypair),
229 hopr_chain_api: Chain,
230 hopr_node_db: Db,
231 cfg: config::HoprLibConfig,
232 ) -> errors::Result<Self> {
233 if hopr_crypto_random::is_rng_fixed() {
234 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
235 }
236
237 cfg.validate()?;
238
239 let hopr_transport_api = HoprTransport::new(
240 identity,
241 hopr_chain_api.clone(),
242 hopr_node_db.clone(),
243 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
244 cfg.protocol,
245 );
246
247 #[cfg(all(feature = "prometheus", not(test)))]
248 {
249 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
250 METRIC_HOPR_LIB_VERSION.set(
251 &[const_format::formatcp!("{}", constants::APP_VERSION)],
252 const_format::formatcp!(
253 "{}.{}",
254 env!("CARGO_PKG_VERSION_MAJOR"),
255 env!("CARGO_PKG_VERSION_MINOR")
256 )
257 .parse()
258 .unwrap_or(0.0),
259 );
260
261 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
263 error!(%error, "failed to initialize ticket statistics metrics");
264 }
265 }
266
267 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
268 new_tickets_tx.set_await_active(false);
269 new_tickets_tx.set_overflow(true);
270
271 Ok(Self {
272 me: identity.1.clone(),
273 cfg,
274 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
275 transport_api: hopr_transport_api,
276 chain_api: hopr_chain_api,
277 node_db: hopr_node_db,
278 redeem_requests: OnceLock::new(),
279 processes: OnceLock::new(),
280 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
281 })
282 }
283
284 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
285 if self.status() == state {
286 Ok(())
287 } else {
288 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
289 }
290 }
291
292 pub fn status(&self) -> HoprState {
293 self.state.load(Ordering::Relaxed)
294 }
295
296 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
297 self.chain_api
298 .balance(self.me_onchain())
299 .await
300 .map_err(HoprLibError::chain)
301 }
302
303 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
304 self.chain_api
305 .balance(self.cfg.safe_module.safe_address)
306 .await
307 .map_err(HoprLibError::chain)
308 }
309
310 pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
311 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
312 }
313
314 pub fn get_safe_config(&self) -> SafeModule {
315 self.cfg.safe_module.clone()
316 }
317
318 pub fn config(&self) -> &config::HoprLibConfig {
319 &self.cfg
320 }
321
322 #[inline]
323 fn is_public(&self) -> bool {
324 self.cfg.publish
325 }
326
327 pub async fn run<
328 Ct,
329 #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
330 >(
331 &self,
332 cover_traffic: Ct,
333 #[cfg(feature = "session-server")] serve_handler: T,
334 ) -> errors::Result<HoprTransportIO>
335 where
336 Ct: TrafficGeneration + Send + Sync + 'static,
337 {
338 self.error_if_not_in_state(
339 HoprState::Uninitialized,
340 "cannot start the hopr node multiple times".into(),
341 )?;
342
343 #[cfg(feature = "testing")]
344 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
345
346 info!(
347 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
348 "node is not started, please fund this node",
349 );
350
351 helpers::wait_for_funds(
352 *MIN_NATIVE_BALANCE,
353 *SUGGESTED_NATIVE_BALANCE,
354 Duration::from_secs(200),
355 self.me_onchain(),
356 &self.chain_api,
357 )
358 .await?;
359
360 let mut processes = AbortableList::<HoprLibProcess>::default();
361
362 info!("starting HOPR node...");
363 self.state.store(HoprState::Initializing, Ordering::Relaxed);
364
365 let balance: XDaiBalance = self.get_balance().await?;
366 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
367
368 info!(
369 address = %self.me_onchain(),
370 %balance,
371 %minimum_balance,
372 "node information"
373 );
374
375 if balance.le(&minimum_balance) {
376 return Err(HoprLibError::GeneralError(
377 "cannot start the node without a sufficiently funded wallet".into(),
378 ));
379 }
380
381 let network_min_ticket_price = self
384 .chain_api
385 .minimum_ticket_price()
386 .await
387 .map_err(HoprLibError::chain)?;
388 let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
389 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
390 return Err(HoprLibError::GeneralError(format!(
391 "configured outgoing ticket price is lower than the network minimum ticket price: \
392 {configured_ticket_price:?} < {network_min_ticket_price}"
393 )));
394 }
395 let network_min_win_prob = self
398 .chain_api
399 .minimum_incoming_ticket_win_prob()
400 .await
401 .map_err(HoprLibError::chain)?;
402 let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
403 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
404 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
405 {
406 return Err(HoprLibError::GeneralError(format!(
407 "configured outgoing ticket winning probability is lower than the network minimum winning \
408 probability: {configured_win_prob:?} < {network_min_win_prob}"
409 )));
410 }
411
412 let minimum_capacity = self
415 .chain_api
416 .count_accounts(AccountSelector {
417 public_only: true,
418 ..Default::default()
419 })
420 .await
421 .map_err(HoprLibError::chain)?
422 .saturating_mul(2)
423 .saturating_add(100);
424
425 let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
426 .ok()
427 .and_then(|s| s.trim().parse::<usize>().ok())
428 .filter(|&c| c > 0)
429 .unwrap_or(2048)
430 .max(minimum_capacity);
431
432 debug!(
433 capacity = chain_discovery_events_capacity,
434 minimum_required = minimum_capacity,
435 "creating chain discovery events channel"
436 );
437 let (indexer_peer_update_tx, indexer_peer_update_rx) =
438 channel::<PeerDiscovery>(chain_discovery_events_capacity);
439
440 let (announcements_stream, announcements_handle) = futures::stream::abortable(
443 self.chain_api
444 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
445 .map_err(HoprLibError::chain)?,
446 );
447 processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
448
449 spawn(
450 announcements_stream
451 .filter_map(|event| {
452 futures::future::ready(event.try_as_announcement().map(|account| {
453 PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
454 }))
455 })
456 .map(Ok)
457 .forward(indexer_peer_update_tx)
458 .inspect(
459 |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
460 ),
461 );
462
463 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
464
465 let safe_addr = self.cfg.safe_module.safe_address;
466
467 if self.me_onchain() == safe_addr {
468 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
469 }
470
471 info!(%safe_addr, "registering safe with this node");
472 match self.chain_api.register_safe(&safe_addr).await {
473 Ok(awaiter) => {
474 awaiter.await.map_err(|error| {
476 error!(%safe_addr, %error, "safe registration failed with error");
477 HoprLibError::chain(error)
478 })?;
479 info!(%safe_addr, "safe successfully registered with this node");
480 }
481 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
482 info!(%safe_addr, "this safe is already registered with this node");
483 }
484 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
485 error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
487 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
488 }
489 Err(error) => {
490 error!(%safe_addr, %error, "safe registration failed");
491 return Err(HoprLibError::chain(error));
492 }
493 }
494
495 let multiaddresses_to_announce = if self.is_public() {
497 self.transport_api.announceable_multiaddresses()
500 } else {
501 Vec::with_capacity(0)
502 };
503
504 multiaddresses_to_announce
506 .iter()
507 .filter(|a| !is_public_address(a))
508 .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
509
510 let chain_api = self.chain_api.clone();
511 let me_offchain = *self.me.public();
512 let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
513
514 info!(?multiaddresses_to_announce, "announcing node on chain");
517 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
518 Ok(awaiter) => {
519 awaiter.await.map_err(|error| {
521 error!(?multiaddresses_to_announce, %error, "node announcement failed");
522 HoprLibError::chain(error)
523 })?;
524 info!(?multiaddresses_to_announce, "node has been successfully announced");
525 }
526 Err(AnnouncementError::AlreadyAnnounced) => {
527 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
528 }
529 Err(error) => {
530 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
531 return Err(HoprLibError::chain(error));
532 }
533 }
534
535 let this_node_account = node_ready
537 .await
538 .map_err(HoprLibError::other)?
539 .map_err(HoprLibError::chain)?;
540 if this_node_account.chain_addr != self.me_onchain()
541 || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
542 {
543 error!(%this_node_account, "account bound to offchain key does not match this node");
544 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
545 }
546
547 info!(%this_node_account, "node account is ready");
548
549 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
550 .ok()
551 .and_then(|s| s.trim().parse::<usize>().ok())
552 .filter(|&c| c > 0)
553 .unwrap_or(256);
554
555 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
556 #[cfg(feature = "session-server")]
557 {
558 debug!(capacity = incoming_session_channel_capacity, "creating session server");
559 processes.insert(
560 HoprLibProcess::SessionServer,
561 hopr_async_runtime::spawn_as_abortable!(
562 _session_rx
563 .for_each_concurrent(None, move |session| {
564 let serve_handler = serve_handler.clone();
565 async move {
566 let session_id = *session.session.id();
567 match serve_handler.process(session).await {
568 Ok(_) => debug!(?session_id, "client session processed successfully"),
569 Err(error) => error!(
570 ?session_id,
571 %error,
572 "client session processing failed"
573 ),
574 }
575 }
576 })
577 .inspect(|_| tracing::warn!(
578 task = %HoprLibProcess::SessionServer,
579 "long-running background task finished"
580 ))
581 ),
582 );
583 }
584
585 info!("starting ticket events processor");
586 let (tickets_tx, tickets_rx) = channel(1024);
587 let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
588 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
589 let node_db = self.node_db.clone();
590 let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
591 spawn(
592 tickets_rx
593 .filter_map(move |ticket_event| {
594 let node_db = node_db.clone();
595 async move {
596 match ticket_event {
597 TicketEvent::WinningTicket(winning) => {
598 if let Err(error) = node_db.insert_ticket(*winning).await {
599 tracing::error!(%error, %winning, "failed to insert ticket into database");
600 } else {
601 tracing::debug!(%winning, "inserted ticket into database");
602 }
603 Some(winning)
604 }
605 TicketEvent::RejectedTicket(rejected, issuer) => {
606 if let Some(issuer) = &issuer {
607 if let Err(error) =
608 node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
609 {
610 tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
611 } else {
612 tracing::debug!(%rejected, "marked ticket as rejected");
613 }
614 } else {
615 tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
616 }
617 None
618 }
619 }
620 }
621 })
622 .for_each(move |ticket| {
623 if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
624 tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
625 }
626 futures::future::ready(())
627 })
628 .inspect(|_| {
629 tracing::warn!(
630 task = %HoprLibProcess::TicketEvents,
631 "long-running background task finished"
632 )
633 }),
634 );
635
636 info!("starting transport");
637 let (hopr_socket, transport_processes) = self
638 .transport_api
639 .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
640 .await?;
641 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
642
643 let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
645 let _ = self.redeem_requests.set(redemption_req_tx);
646 let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
647 processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
648 let chain = self.chain_api.clone();
649 let node_db = self.node_db.clone();
650 spawn(redemption_req_rx
651 .for_each(move |selector| {
652 let chain = chain.clone();
653 let db = node_db.clone();
654 async move {
655 match chain.redeem_tickets_via_selectors(&db, [selector]).await {
656 Ok(res) => debug!(%res, "redemption complete"),
657 Err(error) => error!(%error, "redemption failed"),
658 }
659 }
660 })
661 .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
662 );
663
664 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
665 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
666 let chain = self.chain_api.clone();
667 let node_db = self.node_db.clone();
668 let events = chain.subscribe().map_err(HoprLibError::chain)?;
669 spawn(
670 futures::stream::Abortable::new(
671 events
672 .filter_map(move |event|
673 futures::future::ready(event.try_as_channel_closed())
674 ),
675 chain_events_sub_reg
676 )
677 .for_each(move |closed_channel| {
678 let node_db = node_db.clone();
679 let chain = chain.clone();
680 async move {
681 match closed_channel.direction(chain.me()) {
682 Some(ChannelDirection::Incoming) => {
683 match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
684 Ok(num_neglected) if num_neglected > 0 => {
685 warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
686 },
687 Ok(_) => {
688 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
689 },
690 Err(error) => {
691 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
692 }
693 }
694 },
695 Some(ChannelDirection::Outgoing) => {
696 if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
697 error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
698 } else {
699 debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
700 }
701 }
702 _ => {} }
704 }
705 })
706 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
707 );
708
709 let mut channels = self
714 .chain_api
715 .stream_channels(ChannelSelector {
716 destination: self.me_onchain().into(),
717 ..Default::default()
718 })
719 .map_err(HoprLibError::chain)
720 .await?;
721
722 while let Some(channel) = channels.next().await {
723 self.node_db
724 .update_ticket_states_and_fetch(
725 [TicketSelector::from(&channel)
726 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
727 .with_index_range(channel.ticket_index..)],
728 AcknowledgedTicketStatus::Untouched,
729 )
730 .map_err(HoprLibError::db)
731 .await?
732 .for_each(|ticket| {
733 info!(%ticket, "fixed next out-of-sync ticket");
734 futures::future::ready(())
735 })
736 .await;
737 }
738
739 self.state.store(HoprState::Running, Ordering::Relaxed);
740
741 info!(
742 id = %self.me_peer_id(),
743 version = constants::APP_VERSION,
744 "NODE STARTED AND RUNNING"
745 );
746
747 #[cfg(all(feature = "prometheus", not(test)))]
748 METRIC_HOPR_NODE_INFO.set(
749 &[
750 &self.me.public().to_peerid_str(),
751 &self.me_onchain().to_string(),
752 &self.cfg.safe_module.safe_address.to_string(),
753 &self.cfg.safe_module.module_address.to_string(),
754 ],
755 1.0,
756 );
757
758 let _ = self.processes.set(processes);
759 Ok(hopr_socket)
760 }
761
762 pub fn shutdown(&self) -> Result<(), HoprLibError> {
770 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
771 if let Some(processes) = self.processes.get() {
772 processes.abort_all();
773 }
774 self.state.store(HoprState::Terminated, Ordering::Relaxed);
775 info!("NODE SHUTDOWN COMPLETE");
776 Ok(())
777 }
778
779 pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
781 self.winning_ticket_subscribers.1.activate_cloned()
782 }
783
784 pub fn me_peer_id(&self) -> PeerId {
787 (*self.me.public()).into()
788 }
789
790 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
792 Ok(self
793 .chain_api
794 .stream_accounts(AccountSelector {
795 public_only: true,
796 ..Default::default()
797 })
798 .map_err(HoprLibError::chain)
799 .await?
800 .map(|entry| {
801 (
802 PeerId::from(entry.public_key),
803 entry.chain_addr,
804 entry.get_multiaddrs().to_vec(),
805 )
806 })
807 .collect()
808 .await)
809 }
810
811 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
815 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
816
817 Ok(self.transport_api.ping(peer).await?)
818 }
819
820 #[cfg(feature = "session-client")]
823 pub async fn connect_to(
824 &self,
825 destination: Address,
826 target: SessionTarget,
827 cfg: SessionClientConfig,
828 ) -> errors::Result<HoprSession> {
829 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
830
831 let backoff = backon::ConstantBuilder::default()
832 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
833 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
834 .with_jitter();
835
836 use backon::Retryable;
837
838 Ok((|| {
839 let cfg = cfg.clone();
840 let target = target.clone();
841 async { self.transport_api.new_session(destination, target, cfg).await }
842 })
843 .retry(backoff)
844 .sleep(backon::FuturesTimerSleeper)
845 .await?)
846 }
847
848 #[cfg(feature = "session-client")]
851 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
852 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
853 Ok(self.transport_api.probe_session(id).await?)
854 }
855
856 #[cfg(feature = "session-client")]
857 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
858 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
859 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
860 }
861
862 #[cfg(feature = "session-client")]
863 pub async fn update_session_surb_balancer_config(
864 &self,
865 id: &SessionId,
866 cfg: SurbBalancerConfig,
867 ) -> errors::Result<()> {
868 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
869 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
870 }
871
872 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
874 self.transport_api.local_multiaddresses()
875 }
876
877 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
879 self.transport_api.listening_multiaddresses().await
880 }
881
882 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
884 self.transport_api.network_observed_multiaddresses(peer).await
885 }
886
887 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
889 let peer = *peer;
890 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
892
893 match self
894 .chain_api
895 .stream_accounts(AccountSelector {
896 public_only: false,
897 offchain_key: Some(pubkey),
898 ..Default::default()
899 })
900 .map_err(HoprLibError::chain)
901 .await?
902 .next()
903 .await
904 {
905 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
906 None => {
907 error!(%peer, "no information");
908 Ok(vec![])
909 }
910 }
911 }
912
913 pub async fn network_health(&self) -> Health {
917 self.transport_api.network_health().await
918 }
919
920 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
922 Ok(self.transport_api.network_connected_peers().await?)
923 }
924
925 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
927 Ok(self.transport_api.network_peer_observations(peer).await?)
928 }
929
930 pub async fn all_network_peers(
932 &self,
933 minimum_score: f64,
934 ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
935 Ok(
936 futures::stream::iter(self.transport_api.network_connected_peers().await?)
937 .filter_map(|peer| async move {
938 if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
939 if info.score() >= minimum_score {
940 Some((peer, info))
941 } else {
942 None
943 }
944 } else {
945 None
946 }
947 })
948 .filter_map(|(peer_id, info)| async move {
949 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
950 Some((address, peer_id, info))
951 })
952 .collect::<Vec<_>>()
953 .await,
954 )
955 }
956
957 pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
960 if let Some(channel) = self
961 .chain_api
962 .channel_by_id(channel_id)
963 .await
964 .map_err(|e| HoprTransportError::Other(e.into()))?
965 {
966 if &channel.destination == self.chain_api.me() {
967 Ok(Some(
968 self.node_db
969 .stream_tickets([&channel])
970 .await
971 .map_err(HoprLibError::db)?
972 .collect()
973 .await,
974 ))
975 } else {
976 Ok(None)
977 }
978 } else {
979 Ok(None)
980 }
981 }
982
983 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
985 Ok(self
986 .node_db
987 .stream_tickets(None::<TicketSelector>)
988 .await
989 .map_err(HoprLibError::db)?
990 .map(|v| v.ticket)
991 .collect()
992 .await)
993 }
994
995 pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
997 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
998 }
999
1000 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1002 self.node_db
1003 .reset_ticket_statistics()
1004 .await
1005 .map_err(HoprLibError::chain)
1006 }
1007
1008 pub fn me_onchain(&self) -> Address {
1010 *self.chain_api.me()
1011 }
1012
1013 pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1015 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1016 }
1017
1018 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1020 self.chain_api
1021 .minimum_incoming_ticket_win_prob()
1022 .await
1023 .map_err(HoprLibError::chain)
1024 }
1025
1026 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1028 Ok(self
1029 .chain_api
1030 .stream_accounts(AccountSelector {
1031 public_only: true,
1032 ..Default::default()
1033 })
1034 .map_err(HoprLibError::chain)
1035 .await?
1036 .collect()
1037 .await)
1038 }
1039
1040 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1043 self.chain_api
1044 .channel_by_id(channel_id)
1045 .await
1046 .map_err(HoprLibError::chain)
1047 }
1048
1049 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1054 self.chain_api
1055 .channel_by_parties(src, dest)
1056 .await
1057 .map_err(HoprLibError::chain)
1058 }
1059
1060 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1062 Ok(self
1063 .chain_api
1064 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1065 ChannelStatusDiscriminants::Closed,
1066 ChannelStatusDiscriminants::Open,
1067 ChannelStatusDiscriminants::PendingToClose,
1068 ]))
1069 .map_err(HoprLibError::chain)
1070 .await?
1071 .collect()
1072 .await)
1073 }
1074
1075 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1077 Ok(self
1078 .chain_api
1079 .stream_channels(
1080 ChannelSelector::default()
1081 .with_destination(*dest)
1082 .with_allowed_states(&[
1083 ChannelStatusDiscriminants::Closed,
1084 ChannelStatusDiscriminants::Open,
1085 ChannelStatusDiscriminants::PendingToClose,
1086 ]),
1087 )
1088 .map_err(HoprLibError::chain)
1089 .await?
1090 .collect()
1091 .await)
1092 }
1093
1094 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1096 Ok(self
1097 .chain_api
1098 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1099 ChannelStatusDiscriminants::Closed,
1100 ChannelStatusDiscriminants::Open,
1101 ChannelStatusDiscriminants::PendingToClose,
1102 ]))
1103 .map_err(HoprLibError::chain)
1104 .await?
1105 .collect()
1106 .await)
1107 }
1108
1109 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1111 self.chain_api
1112 .safe_allowance(self.cfg.safe_module.safe_address)
1113 .await
1114 .map_err(HoprLibError::chain)
1115 }
1116
1117 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1121 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1122
1123 self.chain_api
1124 .withdraw(amount, &recipient)
1125 .and_then(identity)
1126 .map_err(HoprLibError::chain)
1127 .await
1128 }
1129
1130 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1134 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1135
1136 self.chain_api
1137 .withdraw(amount, &recipient)
1138 .and_then(identity)
1139 .map_err(HoprLibError::chain)
1140 .await
1141 }
1142
1143 fn spawn_wait_for_on_chain_event(
1146 &self,
1147 context: impl std::fmt::Display,
1148 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
1149 timeout: Duration,
1150 ) -> errors::Result<impl Future<Output = errors::Result<ChainEvent>>> {
1151 debug!(%context, "registering wait for on-chain event");
1152 let event_stream = self
1153 .chain_api
1154 .subscribe()
1155 .map_err(HoprLibError::chain)?
1156 .skip_while(move |event| futures::future::ready(!predicate(event)));
1157
1158 let ctx = context.to_string();
1159 Ok(spawn(async move {
1160 pin_mut!(event_stream);
1161 let res = event_stream
1162 .next()
1163 .timeout(futures_time::time::Duration::from(timeout))
1164 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
1165 .await?
1166 .ok_or(HoprLibError::GeneralError(format!(
1167 "failed to yield an on-chain event for {ctx}"
1168 )));
1169 debug!(%ctx, ?res, "on-chain event waiting done");
1170 res
1171 })
1172 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
1173 .and_then(futures::future::ready))
1174 }
1175
1176 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1177 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1178
1179 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1180
1181 let event_awaiter = self.spawn_wait_for_on_chain_event(
1182 format!("open channel to {destination} ({channel_id})"),
1183 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1184 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1185 )?;
1186
1187 let tx_hash = self
1188 .chain_api
1189 .open_channel(destination, amount)
1190 .and_then(identity)
1191 .map_err(HoprLibError::chain)
1192 .await?;
1193
1194 let event = event_awaiter.await?;
1195 debug!(%event, "open channel event received");
1196
1197 Ok(OpenChannelResult { tx_hash, channel_id })
1198 }
1199
1200 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> errors::Result<Hash> {
1201 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1202
1203 let channel_id = *channel_id;
1204 let event_awaiter = self.spawn_wait_for_on_chain_event(
1205 format!("fund channel {channel_id}"),
1206 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1207 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1208 )?;
1209
1210 let res = self
1211 .chain_api
1212 .fund_channel(&channel_id, amount)
1213 .and_then(identity)
1214 .map_err(HoprLibError::chain)
1215 .await?;
1216
1217 let event = event_awaiter.await?;
1218 debug!(%event, "fund channel event received");
1219
1220 Ok(res)
1221 }
1222
1223 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1224 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1225
1226 let channel_id = *channel_id;
1227 let event_awaiter = self.spawn_wait_for_on_chain_event(
1228 format!("close channel {channel_id}"),
1229 move |event| {
1230 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1231 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1232 },
1233 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1234 )?;
1235
1236 let tx_hash = self
1237 .chain_api
1238 .close_channel(&channel_id)
1239 .and_then(identity)
1240 .map_err(HoprLibError::chain)
1241 .await?;
1242
1243 let event = event_awaiter.await?;
1244 debug!(%event, "close channel event received");
1245
1246 Ok(CloseChannelResult { tx_hash })
1247 }
1248
1249 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1250 self.chain_api
1251 .channel_closure_notice_period()
1252 .await
1253 .map_err(HoprLibError::chain)
1254 }
1255
1256 pub fn redemption_requests(
1257 &self,
1258 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1259 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1260
1261 Ok(self
1263 .redeem_requests
1264 .get()
1265 .cloned()
1266 .expect("redeem_requests is not initialized")
1267 .sink_map_err(HoprLibError::other))
1268 }
1269
1270 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1271 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1272
1273 let min_value = min_value.into();
1274
1275 self.chain_api
1276 .stream_channels(
1277 ChannelSelector::default()
1278 .with_destination(self.me_onchain())
1279 .with_allowed_states(&[
1280 ChannelStatusDiscriminants::Open,
1281 ChannelStatusDiscriminants::PendingToClose,
1282 ]),
1283 )
1284 .map_err(HoprLibError::chain)
1285 .await?
1286 .map(|channel| {
1287 Ok(TicketSelector::from(&channel)
1288 .with_amount(min_value..)
1289 .with_index_range(channel.ticket_index..)
1290 .with_state(AcknowledgedTicketStatus::Untouched))
1291 })
1292 .forward(self.redemption_requests()?)
1293 .await?;
1294
1295 Ok(())
1296 }
1297
1298 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1299 &self,
1300 counterparty: &Address,
1301 min_value: B,
1302 ) -> errors::Result<()> {
1303 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1304 .await
1305 }
1306
1307 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1308 &self,
1309 channel_id: &Hash,
1310 min_value: B,
1311 ) -> errors::Result<()> {
1312 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1313
1314 let channel = self
1315 .chain_api
1316 .channel_by_id(channel_id)
1317 .await
1318 .map_err(HoprLibError::chain)?
1319 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1320
1321 self.redemption_requests()?
1322 .send(
1323 TicketSelector::from(channel)
1324 .with_amount(min_value.into()..)
1325 .with_index_range(channel.ticket_index..)
1326 .with_state(AcknowledgedTicketStatus::Untouched),
1327 )
1328 .await?;
1329
1330 Ok(())
1331 }
1332
1333 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1334 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1335
1336 self.redemption_requests()?
1337 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1338 .await?;
1339
1340 Ok(())
1341 }
1342
1343 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1344 let peer_id = *peer_id;
1345 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1347 .await
1348 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1349
1350 self.chain_api
1351 .packet_key_to_chain_key(&pubkey)
1352 .await
1353 .map_err(HoprLibError::chain)
1354 }
1355
1356 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1357 self.chain_api
1358 .chain_key_to_packet_key(address)
1359 .await
1360 .map(|pk| pk.map(|v| v.into()))
1361 .map_err(HoprLibError::chain)
1362 }
1363}
1364
1365impl<Chain, Db> Hopr<Chain, Db> {
1366 pub fn collect_hopr_metrics() -> errors::Result<String> {
1369 cfg_if::cfg_if! {
1370 if #[cfg(all(feature = "prometheus", not(test)))] {
1371 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
1372 } else {
1373 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1374 }
1375 }
1376 }
1377}