1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38pub use hopr_api as api;
39
/// Re-exports of the underlying HOPR crates, grouped by domain under short aliases.
///
/// Contains only `pub use` items; nothing is defined here.
#[doc(hidden)]
pub mod exports {
    /// Type crates: chain, internal and primitive types.
    pub mod types {
        pub use hopr_chain_types as chain;
        pub use hopr_internal_types as internal;
        pub use hopr_primitive_types as primitive;
    }

    /// Cryptography crates: key pairs and crypto types.
    pub mod crypto {
        pub use hopr_crypto_keypair as keypair;
        pub use hopr_crypto_types as types;
    }

    /// Network type crate.
    pub mod network {
        pub use hopr_network_types as types;
    }

    pub use hopr_transport as transport;
}
60
/// Short list of frequently needed items, re-exported through the aliases in [`exports`].
#[doc(hidden)]
pub mod prelude {
    pub use super::exports::{
        crypto::{
            keypair::key_pair::HoprKeys,
            types::prelude::{ChainKeypair, Hash, OffchainKeypair},
        },
        network::types::{
            prelude::ForeignDataMode,
            udp::{ConnectedUdpStream, UdpStreamParallelism},
        },
        transport::{OffchainPublicKey, socket::HoprSocket},
        types::primitive::prelude::Address,
    };
}
77
78use std::{
79 convert::identity,
80 future::Future,
81 sync::{Arc, OnceLock, atomic::Ordering},
82 time::Duration,
83};
84
85use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel, pin_mut};
86use futures_time::future::FutureExt as FuturesTimeFutureExt;
87use hopr_api::{
88 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
89 ct::TrafficGeneration,
90 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
91};
92pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
93use hopr_async_runtime::prelude::spawn;
94pub use hopr_async_runtime::{Abortable, AbortableList};
95pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
96pub use hopr_crypto_types::prelude::*;
97pub use hopr_internal_types::prelude::*;
98pub use hopr_network_types::prelude::*;
99#[cfg(all(feature = "prometheus", not(test)))]
100use hopr_platform::time::native::current_time;
101pub use hopr_primitive_types::prelude::*;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110 config::SafeModule,
111 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112 errors::{HoprLibError, HoprStatusError},
113 state::{HoprLibProcess, HoprState},
114 traits::chain::{CloseChannelResult, OpenChannelResult},
115};
116
// Process-wide metrics, compiled only with the `prometheus` feature and outside of tests.
// Each `unwrap()` panics on first access if the metric cannot be created.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Set once in `Hopr::new` to the current unix timestamp.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Labelled with the application version; the value is `<major>.<minor>` of this crate parsed as a float.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Set to 1.0 at the end of `Hopr::run`, with the node's identifiers as label values.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
134
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// # Arguments
/// * `num_cpu_threads` - size of the CPU thread pool passed to `hopr_parallelize::cpu::init_thread_pool`.
/// * `num_io_threads` - number of Tokio worker threads.
///
/// When either value is `None`, half of `std::thread::available_parallelism()` is used instead.
/// Both values are clamped to at least 1.
///
/// The worker thread stack size is read from the `HOPRD_THREAD_STACK_SIZE` environment variable
/// (in bytes). It defaults to 10 MiB when unset or unparsable and is never lower than 2 MiB.
///
/// # Errors
/// Fails if a thread count was not given and the available parallelism cannot be determined,
/// if the CPU thread pool cannot be initialized, or if the runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<std::num::NonZeroUsize>,
    num_io_threads: Option<std::num::NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    use std::str::FromStr;

    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // May be 0 on a single-core machine; the `.max(1)` calls below cover that case.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // `ok_or_else` builds the error only when it is actually needed.
    let cpu_threads = num_cpu_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(std::num::NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| usize::from_str(&v).ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
180
/// Socket type returned by [`Hopr::run`]: a receiver of incoming application data paired with
/// a sender of `(routing, outgoing application data)` tuples.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
186
/// Broadcast channel pair for winning tickets: the sender, plus an inactive receiver that is
/// kept only so new subscribers can be created from it.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
191
/// Timeout passed to `await_key_binding` in `Hopr::run` while waiting for the node's
/// key binding to appear on chain.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(120);

/// Timeout for on-chain resolution events.
// NOTE(review): not referenced in the portion of the file visible here; presumably used by
// channel operations further down. Confirm.
const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(90);
198
/// A HOPR node instance, generic over the chain API (`Chain`) and the node database (`Db`).
///
/// Created with `Hopr::new`, started with `Hopr::run` and stopped with `Hopr::shutdown`.
pub struct Hopr<Chain, Db> {
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    /// Configuration the node was created with.
    cfg: config::HoprLibConfig,
    /// Current lifecycle state, advanced step by step in `run`.
    state: Arc<state::AtomicHoprState>,
    /// Transport layer built from the identity, chain API and database.
    transport_api: HoprTransport<Chain, Db>,
    /// Sender for ticket redemption requests; set once in `run`.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Handle to the node database.
    node_db: Db,
    /// Handle to the chain API.
    chain_api: Chain,
    /// Broadcast channel on which newly received winning tickets are published.
    winning_ticket_subscribers: NewTicketEvents,
    /// Abort handles of the background processes started in `run`; set once at the end of `run`.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
221
222impl<Chain, Db> Hopr<Chain, Db>
223where
224 Chain: HoprChainApi + Clone + Send + Sync + 'static,
225 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
226{
227 pub async fn new(
228 identity: (&ChainKeypair, &OffchainKeypair),
229 hopr_chain_api: Chain,
230 hopr_node_db: Db,
231 cfg: config::HoprLibConfig,
232 ) -> errors::Result<Self> {
233 if hopr_crypto_random::is_rng_fixed() {
234 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
235 }
236
237 cfg.validate()?;
238
239 let hopr_transport_api = HoprTransport::new(
240 identity,
241 hopr_chain_api.clone(),
242 hopr_node_db.clone(),
243 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
244 cfg.protocol,
245 );
246
247 #[cfg(all(feature = "prometheus", not(test)))]
248 {
249 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
250 METRIC_HOPR_LIB_VERSION.set(
251 &[const_format::formatcp!("{}", constants::APP_VERSION)],
252 const_format::formatcp!(
253 "{}.{}",
254 env!("CARGO_PKG_VERSION_MAJOR"),
255 env!("CARGO_PKG_VERSION_MINOR")
256 )
257 .parse()
258 .unwrap_or(0.0),
259 );
260
261 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
263 error!(%error, "failed to initialize ticket statistics metrics");
264 }
265 }
266
267 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
268 new_tickets_tx.set_await_active(false);
269 new_tickets_tx.set_overflow(true);
270
271 Ok(Self {
272 me: identity.1.clone(),
273 cfg,
274 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
275 transport_api: hopr_transport_api,
276 chain_api: hopr_chain_api,
277 node_db: hopr_node_db,
278 redeem_requests: OnceLock::new(),
279 processes: OnceLock::new(),
280 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
281 })
282 }
283
284 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
285 if self.status() == state {
286 Ok(())
287 } else {
288 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
289 }
290 }
291
    /// Returns the current lifecycle state of the node (read with relaxed ordering).
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
295
    /// Queries the chain API for the balance of this node's on-chain address in currency `C`.
    ///
    /// # Errors
    /// Chain API failures are wrapped with `HoprLibError::chain`.
    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
        self.chain_api
            .balance(self.me_onchain())
            .await
            .map_err(HoprLibError::chain)
    }
302
    /// Queries the chain API for the balance of the configured safe address in currency `C`.
    ///
    /// # Errors
    /// Chain API failures are wrapped with `HoprLibError::chain`.
    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
        self.chain_api
            .balance(self.cfg.safe_module.safe_address)
            .await
            .map_err(HoprLibError::chain)
    }
309
    /// Returns the chain information reported by the chain API.
    pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
        self.chain_api.chain_info().await.map_err(HoprLibError::chain)
    }
313
    /// Returns a copy of the safe/module configuration this node was created with.
    pub fn get_safe_config(&self) -> SafeModule {
        self.cfg.safe_module.clone()
    }
317
    /// Returns a reference to the configuration this node was created with.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
321
    /// Returns the `publish` configuration flag; `run` announces multiaddresses on chain only when it is set.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.publish
    }
326
    /// Starts the node and drives it through the startup sequence until it reaches
    /// [`HoprState::Running`].
    ///
    /// Steps, in order:
    /// 1. wait for funds and check the native balance,
    /// 2. validate the configured ticket price and winning probability against network minimums,
    /// 3. subscribe to on-chain account announcements and feed them into peer discovery,
    /// 4. register the safe and announce the node on chain, then await its key binding,
    /// 5. start the session server (feature `session-server`), ticket event processing,
    ///    the transport, the ticket redemption service and the channel-closure handler,
    /// 6. synchronize stored ticket states with the current incoming channels.
    ///
    /// # Arguments
    /// * `cover_traffic` - traffic generator handed to the transport.
    /// * `serve_handler` - handler for incoming sessions (only with feature `session-server`).
    ///
    /// # Returns
    /// The transport socket for sending and receiving application data.
    ///
    /// # Errors
    /// Fails if the node is not in [`HoprState::Uninitialized`], or if any startup step fails.
    //
    // NOTE(review): on any early return the state stays at the last stored value, so the
    // `Uninitialized` check below rejects a second `run` on the same instance. Handles of tasks
    // spawned before the failure are only in the local `processes` list; whether dropping that
    // list aborts them depends on `AbortableList` — confirm.
    pub async fn run<
        Ct,
        #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
    >(
        &self,
        cover_traffic: Ct,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<HoprTransportIO>
    where
        Ct: TrafficGeneration + Send + Sync + 'static,
    {
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "cannot start the hopr node multiple times".into(),
        )?;

        #[cfg(feature = "testing")]
        warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");

        info!(
            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "node is not started, please fund this node",
        );

        self.state.store(HoprState::WaitingForFunds, Ordering::Relaxed);
        // Presumably blocks until the on-chain address holds enough native funds; see `helpers::wait_for_funds`.
        helpers::wait_for_funds(
            *MIN_NATIVE_BALANCE,
            *SUGGESTED_NATIVE_BALANCE,
            Duration::from_secs(200),
            self.me_onchain(),
            &self.chain_api,
        )
        .await?;

        // Abort handles of every background task started below; stored in `self.processes` at the end.
        let mut processes = AbortableList::<HoprLibProcess>::default();

        info!("starting HOPR node...");
        self.state.store(HoprState::CheckingBalance, Ordering::Relaxed);

        let balance: XDaiBalance = self.get_balance().await?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %self.me_onchain(),
            %balance,
            %minimum_balance,
            "node information"
        );

        // A balance exactly equal to the minimum is rejected as well.
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "cannot start the node without a sufficiently funded wallet".into(),
            ));
        }

        self.state.store(HoprState::ValidatingNetworkConfig, Ordering::Relaxed);

        // A configured outgoing ticket price must not be below the network minimum.
        let network_min_ticket_price = self
            .chain_api
            .minimum_ticket_price()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            )));
        }
        // Same for the winning probability, unless HOPR_TEST_DISABLE_CHECKS is set to "true" (case-insensitive).
        let network_min_win_prob = self
            .chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)?;
        let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::GeneralError(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            )));
        }

        // Lower bound for the discovery channel capacity: twice the number of public accounts plus 100.
        let minimum_capacity = self
            .chain_api
            .count_accounts(AccountSelector {
                public_only: true,
                ..Default::default()
            })
            .await
            .map_err(HoprLibError::chain)?
            .saturating_mul(2)
            .saturating_add(100);

        // The environment override (default 2048) is never allowed below `minimum_capacity`.
        let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(2048)
            .max(minimum_capacity);

        debug!(
            capacity = chain_discovery_events_capacity,
            minimum_required = minimum_capacity,
            "creating chain discovery events channel"
        );
        let (indexer_peer_update_tx, indexer_peer_update_rx) =
            channel::<PeerDiscovery>(chain_discovery_events_capacity);

        self.state
            .store(HoprState::SubscribingToAnnouncements, Ordering::Relaxed);

        // Abortable subscription to chain events, synchronized with the public accounts state.
        let (announcements_stream, announcements_handle) = futures::stream::abortable(
            self.chain_api
                .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
                .map_err(HoprLibError::chain)?,
        );
        processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);

        // Forward every announcement event to the transport as a peer-discovery update.
        spawn(
            announcements_stream
                .filter_map(|event| {
                    futures::future::ready(event.try_as_announcement().map(|account| {
                        PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
                    }))
                })
                .map(Ok)
                .forward(indexer_peer_update_tx)
                .inspect(
                    |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
                ),
        );

        info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");

        let safe_addr = self.cfg.safe_module.safe_address;

        // The node's own on-chain address cannot serve as its safe.
        if self.me_onchain() == safe_addr {
            return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
        }

        self.state.store(HoprState::RegisteringSafe, Ordering::Relaxed);
        info!(%safe_addr, "registering safe with this node");
        match self.chain_api.register_safe(&safe_addr).await {
            // Registration submitted: wait for it to complete.
            Ok(awaiter) => {
                awaiter.await.map_err(|error| {
                    error!(%safe_addr, %error, "safe registration failed with error");
                    HoprLibError::chain(error)
                })?;
                info!(%safe_addr, "safe successfully registered with this node");
            }
            // Already registered with the same safe: nothing to do.
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
                info!(%safe_addr, "this safe is already registered with this node");
            }
            // Registered with a different safe: refuse to start.
            Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
                error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
            }
            Err(error) => {
                error!(%safe_addr, %error, "safe registration failed");
                return Err(HoprLibError::chain(error));
            }
        }

        // Only a node configured as public announces multiaddresses; otherwise the list is empty.
        let multiaddresses_to_announce = if self.is_public() {
            self.transport_api.announceable_multiaddresses()
        } else {
            Vec::with_capacity(0)
        };

        // Warn about addresses that are not public; they are still announced.
        multiaddresses_to_announce
            .iter()
            .filter(|a| !is_public_address(a))
            .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));

        self.state.store(HoprState::AnnouncingNode, Ordering::Relaxed);

        // Start waiting for the key binding before announcing, so the wait is already running when
        // the announcement goes out.
        let chain_api = self.chain_api.clone();
        let me_offchain = *self.me.public();
        let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });

        info!(?multiaddresses_to_announce, "announcing node on chain");
        match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
            Ok(awaiter) => {
                awaiter.await.map_err(|error| {
                    error!(?multiaddresses_to_announce, %error, "node announcement failed");
                    HoprLibError::chain(error)
                })?;
                info!(?multiaddresses_to_announce, "node has been successfully announced");
            }
            // An existing announcement is not an error.
            Err(AnnouncementError::AlreadyAnnounced) => {
                info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
            }
            Err(error) => {
                error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
                return Err(HoprLibError::chain(error));
            }
        }

        self.state.store(HoprState::AwaitingKeyBinding, Ordering::Relaxed);

        // First `map_err` covers the spawned task failing, the second the key-binding lookup itself.
        let this_node_account = node_ready
            .await
            .map_err(HoprLibError::other)?
            .map_err(HoprLibError::chain)?;
        // The bound account must carry this node's chain address and the configured safe.
        if this_node_account.chain_addr != self.me_onchain()
            || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
        {
            error!(%this_node_account, "account bound to offchain key does not match this node");
            return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
        }

        info!(%this_node_account, "node account is ready");

        self.state.store(HoprState::InitializingServices, Ordering::Relaxed);

        info!("initializing session infrastructure");
        let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&c| c > 0)
            .unwrap_or(256);

        // `_session_rx` is consumed only when the `session-server` feature is enabled.
        let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
        #[cfg(feature = "session-server")]
        {
            debug!(capacity = incoming_session_channel_capacity, "creating session server");
            processes.insert(
                HoprLibProcess::SessionServer,
                hopr_async_runtime::spawn_as_abortable!(
                    _session_rx
                        // `None` puts no limit on the number of sessions processed concurrently.
                        .for_each_concurrent(None, move |session| {
                            let serve_handler = serve_handler.clone();
                            async move {
                                let session_id = *session.session.id();
                                match serve_handler.process(session).await {
                                    Ok(_) => debug!(?session_id, "client session processed successfully"),
                                    Err(error) => error!(
                                        ?session_id,
                                        %error,
                                        "client session processing failed"
                                    ),
                                }
                            }
                        })
                        .inspect(|_| tracing::warn!(
                            task = %HoprLibProcess::SessionServer,
                            "long-running background task finished"
                        ))
                ),
            );
        }

        info!("starting ticket events processor");
        let (tickets_tx, tickets_rx) = channel(8192);
        let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
        processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
        let node_db = self.node_db.clone();
        let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
        spawn(
            tickets_rx
                .filter_map(move |ticket_event| {
                    let node_db = node_db.clone();
                    async move {
                        match ticket_event {
                            TicketEvent::WinningTicket(winning) => {
                                if let Err(error) = node_db.insert_ticket(*winning).await {
                                    tracing::error!(%error, %winning, "failed to insert ticket into database");
                                } else {
                                    tracing::debug!(%winning, "inserted ticket into database");
                                }
                                // Passed on to subscribers even when the database insert failed.
                                Some(winning)
                            }
                            TicketEvent::RejectedTicket(rejected, issuer) => {
                                if let Some(issuer) = &issuer {
                                    if let Err(error) =
                                        node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
                                    {
                                        tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
                                    } else {
                                        tracing::debug!(%rejected, "marked ticket as rejected");
                                    }
                                } else {
                                    tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
                                }
                                None
                            }
                        }
                    }
                })
                // Non-blocking broadcast; a failure is only logged.
                .for_each(move |ticket| {
                    if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
                        tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
                    }
                    futures::future::ready(())
                })
                .inspect(|_| {
                    tracing::warn!(
                        task = %HoprLibProcess::TicketEvents,
                        "long-running background task finished"
                    )
                }),
        );

        info!("starting transport");
        let (hopr_socket, transport_processes) = self
            .transport_api
            .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
            .await?;
        processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

        info!("starting ticket redemption service");
        let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
        // The result of `set` is ignored: if a sender was already stored, it is kept.
        let _ = self.redeem_requests.set(redemption_req_tx);
        let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
        processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        // Requests are handled one at a time (`for_each`); failures are only logged.
        spawn(redemption_req_rx
            .for_each(move |selector| {
                let chain = chain.clone();
                let db = node_db.clone();
                async move {
                    match chain.redeem_tickets_via_selectors(&db, [selector]).await {
                        Ok(res) => debug!(%res, "redemption complete"),
                        Err(error) => error!(%error, "redemption failed"),
                    }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
        );

        info!("subscribing to channel events");
        let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
        processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
        let chain = self.chain_api.clone();
        let node_db = self.node_db.clone();
        let events = chain.subscribe().map_err(HoprLibError::chain)?;
        // React to channel-closed events only; all other chain events are filtered out.
        spawn(
            futures::stream::Abortable::new(
                events
                    .filter_map(move |event|
                        futures::future::ready(event.try_as_channel_closed())
                    ),
                chain_events_sub_reg
            )
            .for_each(move |closed_channel| {
                let node_db = node_db.clone();
                let chain = chain.clone();
                async move {
                    match closed_channel.direction(chain.me()) {
                        // Incoming channel closed: mark its tickets as neglected.
                        Some(ChannelDirection::Incoming) => {
                            match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
                                Ok(num_neglected) if num_neglected > 0 => {
                                    warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
                                },
                                Ok(_) => {
                                    debug!(%closed_channel, "no neglected tickets on incoming closed channel");
                                },
                                Err(error) => {
                                    error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
                                }
                            }
                        },
                        // Outgoing channel closed: remove the stored outgoing ticket index for its epoch.
                        Some(ChannelDirection::Outgoing) => {
                            if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
                                error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
                            } else {
                                debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
                            }
                        }
                        // No direction relative to this node: ignore.
                        _ => {} }
                }
            })
            .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
        );

        info!("synchronizing ticket states");
        // All channels whose destination is this node.
        let mut channels = self
            .chain_api
            .stream_channels(ChannelSelector {
                destination: self.me_onchain().into(),
                ..Default::default()
            })
            .map_err(HoprLibError::chain)
            .await?;

        while let Some(channel) = channels.next().await {
            // Tickets at or above the channel's ticket index that are still marked `BeingRedeemed`
            // are reset to `Untouched`.
            self.node_db
                .update_ticket_states_and_fetch(
                    [TicketSelector::from(&channel)
                        .with_state(AcknowledgedTicketStatus::BeingRedeemed)
                        .with_index_range(channel.ticket_index..)],
                    AcknowledgedTicketStatus::Untouched,
                )
                .map_err(HoprLibError::db)
                .await?
                .for_each(|ticket| {
                    info!(%ticket, "fixed next out-of-sync ticket");
                    futures::future::ready(())
                })
                .await;

            // Tickets below the channel's ticket index are marked as neglected.
            self.node_db
                .mark_tickets_as(
                    [TicketSelector::from(&channel).with_index_range(..channel.ticket_index)],
                    TicketMarker::Neglected,
                )
                .map_err(HoprLibError::db)
                .await?;
        }

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &self.me_onchain().to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        // Keep the abort handles so `shutdown` can stop the background tasks.
        let _ = self.processes.set(processes);
        Ok(hopr_socket)
    }
789
    /// Stops the node: aborts all background processes registered by `run` and moves the
    /// state to [`HoprState::Terminated`].
    ///
    /// # Errors
    /// Returns a status error when the node is not in [`HoprState::Running`].
    pub fn shutdown(&self) -> Result<(), HoprLibError> {
        self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
        // `processes` is set at the very end of `run`, right after the state becomes `Running`.
        if let Some(processes) = self.processes.get() {
            processes.abort_all();
        }
        self.state.store(HoprState::Terminated, Ordering::Relaxed);
        info!("NODE SHUTDOWN COMPLETE");
        Ok(())
    }
806
    /// Returns a new stream of winning tickets, created by activating a clone of the stored
    /// inactive broadcast receiver.
    pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
        self.winning_ticket_subscribers.1.activate_cloned()
    }
811
    /// Returns this node's peer ID, derived from its off-chain public key.
    pub fn me_peer_id(&self) -> PeerId {
        (*self.me.public()).into()
    }
817
818 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
820 Ok(self
821 .chain_api
822 .stream_accounts(AccountSelector {
823 public_only: true,
824 ..Default::default()
825 })
826 .map_err(HoprLibError::chain)
827 .await?
828 .map(|entry| {
829 (
830 PeerId::from(entry.public_key),
831 entry.chain_addr,
832 entry.get_multiaddrs().to_vec(),
833 )
834 })
835 .collect()
836 .await)
837 }
838
839 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
843 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
844
845 Ok(self.transport_api.ping(peer).await?)
846 }
847
848 #[cfg(feature = "session-client")]
851 pub async fn connect_to(
852 &self,
853 destination: Address,
854 target: SessionTarget,
855 cfg: SessionClientConfig,
856 ) -> errors::Result<HoprSession> {
857 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
858
859 let backoff = backon::ConstantBuilder::default()
860 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
861 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
862 .with_jitter();
863
864 use backon::Retryable;
865
866 Ok((|| {
867 let cfg = cfg.clone();
868 let target = target.clone();
869 async { self.transport_api.new_session(destination, target, cfg).await }
870 })
871 .retry(backoff)
872 .sleep(backon::FuturesTimerSleeper)
873 .await?)
874 }
875
876 #[cfg(feature = "session-client")]
879 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
880 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
881 Ok(self.transport_api.probe_session(id).await?)
882 }
883
884 #[cfg(feature = "session-client")]
885 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
886 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
887 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
888 }
889
890 #[cfg(all(feature = "session-client", feature = "telemetry"))]
891 pub async fn get_session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
892 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
893 Ok(self.transport_api.session_stats(id).await?)
894 }
895
896 #[cfg(feature = "session-client")]
897 pub async fn update_session_surb_balancer_config(
898 &self,
899 id: &SessionId,
900 cfg: SurbBalancerConfig,
901 ) -> errors::Result<()> {
902 self.error_if_not_in_state(HoprState::Running, "Node is not ready for session operations".into())?;
903 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
904 }
905
    /// Returns the local multiaddresses reported by the transport.
    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.local_multiaddresses()
    }
910
    /// Returns the multiaddresses the transport reports it is listening on.
    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.listening_multiaddresses().await
    }
915
    /// Returns the multiaddresses the transport has observed for `peer`.
    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
        self.transport_api.network_observed_multiaddresses(peer).await
    }
920
921 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
923 let peer = *peer;
924 let pubkey =
926 hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer), "multiaddr_lookup")
927 .await??;
928
929 match self
930 .chain_api
931 .stream_accounts(AccountSelector {
932 public_only: false,
933 offchain_key: Some(pubkey),
934 ..Default::default()
935 })
936 .map_err(HoprLibError::chain)
937 .await?
938 .next()
939 .await
940 {
941 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
942 None => {
943 error!(%peer, "no information");
944 Ok(vec![])
945 }
946 }
947 }
948
    /// Returns the network health reported by the transport.
    pub async fn network_health(&self) -> Health {
        self.transport_api.network_health().await
    }
955
    /// Returns the peers the transport reports as connected.
    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
        Ok(self.transport_api.network_connected_peers().await?)
    }
960
    /// Returns the transport's observations for `peer`, or `None` when it has none.
    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
        Ok(self.transport_api.network_peer_observations(peer).await?)
    }
965
    /// Returns the transport's packet statistics snapshot for `peer`, if any (feature `telemetry`).
    #[cfg(feature = "telemetry")]
    pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
        Ok(self.transport_api.network_peer_packet_stats(peer).await?)
    }
971
    /// Returns the transport's packet statistics snapshots for all peers (feature `telemetry`).
    #[cfg(feature = "telemetry")]
    pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
        Ok(self.transport_api.network_all_packet_stats().await?)
    }
977
978 pub async fn all_network_peers(
980 &self,
981 minimum_score: f64,
982 ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
983 Ok(
984 futures::stream::iter(self.transport_api.network_connected_peers().await?)
985 .filter_map(|peer| async move {
986 if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
987 if info.score() >= minimum_score {
988 Some((peer, info))
989 } else {
990 None
991 }
992 } else {
993 None
994 }
995 })
996 .filter_map(|(peer_id, info)| async move {
997 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
998 Some((address, peer_id, info))
999 })
1000 .collect::<Vec<_>>()
1001 .await,
1002 )
1003 }
1004
1005 pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
1008 if let Some(channel) = self
1009 .chain_api
1010 .channel_by_id(channel_id)
1011 .await
1012 .map_err(|e| HoprTransportError::Other(e.into()))?
1013 {
1014 if &channel.destination == self.chain_api.me() {
1015 Ok(Some(
1016 self.node_db
1017 .stream_tickets([&channel])
1018 .await
1019 .map_err(HoprLibError::db)?
1020 .collect()
1021 .await,
1022 ))
1023 } else {
1024 Ok(None)
1025 }
1026 } else {
1027 Ok(None)
1028 }
1029 }
1030
1031 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
1033 Ok(self
1034 .node_db
1035 .stream_tickets(None::<TicketSelector>)
1036 .await
1037 .map_err(HoprLibError::db)?
1038 .map(|v| v.ticket)
1039 .collect()
1040 .await)
1041 }
1042
    /// Returns the ticket statistics from the node database (queried with no channel filter).
    pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
        self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
    }
1047
1048 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1050 self.node_db
1051 .reset_ticket_statistics()
1052 .await
1053 .map_err(HoprLibError::chain)
1054 }
1055
    /// Returns this node's on-chain address, as reported by the chain API.
    pub fn me_onchain(&self) -> Address {
        *self.chain_api.me()
    }
1060
    /// Returns the minimum ticket price reported by the chain API.
    pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
        self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
    }
1065
    /// Returns the minimum winning probability for incoming tickets reported by the chain API.
    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
        self.chain_api
            .minimum_incoming_ticket_win_prob()
            .await
            .map_err(HoprLibError::chain)
    }
1073
1074 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1076 Ok(self
1077 .chain_api
1078 .stream_accounts(AccountSelector {
1079 public_only: true,
1080 ..Default::default()
1081 })
1082 .map_err(HoprLibError::chain)
1083 .await?
1084 .collect()
1085 .await)
1086 }
1087
    /// Looks up a channel by its ID; returns `None` when the chain API knows no such channel.
    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
        self.chain_api
            .channel_by_id(channel_id)
            .await
            .map_err(HoprLibError::chain)
    }
1096
    /// Looks up the channel from `src` to `dest`; returns `None` when the chain API knows no such channel.
    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
        self.chain_api
            .channel_by_parties(src, dest)
            .await
            .map_err(HoprLibError::chain)
    }
1107
1108 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1110 Ok(self
1111 .chain_api
1112 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1113 ChannelStatusDiscriminants::Closed,
1114 ChannelStatusDiscriminants::Open,
1115 ChannelStatusDiscriminants::PendingToClose,
1116 ]))
1117 .map_err(HoprLibError::chain)
1118 .await?
1119 .collect()
1120 .await)
1121 }
1122
1123 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1125 Ok(self
1126 .chain_api
1127 .stream_channels(
1128 ChannelSelector::default()
1129 .with_destination(*dest)
1130 .with_allowed_states(&[
1131 ChannelStatusDiscriminants::Closed,
1132 ChannelStatusDiscriminants::Open,
1133 ChannelStatusDiscriminants::PendingToClose,
1134 ]),
1135 )
1136 .map_err(HoprLibError::chain)
1137 .await?
1138 .collect()
1139 .await)
1140 }
1141
1142 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1144 Ok(self
1145 .chain_api
1146 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1147 ChannelStatusDiscriminants::Closed,
1148 ChannelStatusDiscriminants::Open,
1149 ChannelStatusDiscriminants::PendingToClose,
1150 ]))
1151 .map_err(HoprLibError::chain)
1152 .await?
1153 .collect()
1154 .await)
1155 }
1156
1157 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1159 self.chain_api
1160 .safe_allowance(self.cfg.safe_module.safe_address)
1161 .await
1162 .map_err(HoprLibError::chain)
1163 }
1164
1165 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1169 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1170
1171 self.chain_api
1172 .withdraw(amount, &recipient)
1173 .and_then(identity)
1174 .map_err(HoprLibError::chain)
1175 .await
1176 }
1177
1178 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1182 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1183
1184 self.chain_api
1185 .withdraw(amount, &recipient)
1186 .and_then(identity)
1187 .map_err(HoprLibError::chain)
1188 .await
1189 }
1190
1191 fn spawn_wait_for_on_chain_event(
1194 &self,
1195 context: impl std::fmt::Display,
1196 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
1197 timeout: Duration,
1198 ) -> errors::Result<(
1199 impl Future<Output = errors::Result<ChainEvent>>,
1200 hopr_async_runtime::AbortHandle,
1201 )> {
1202 debug!(%context, "registering wait for on-chain event");
1203 let (event_stream, handle) = futures::stream::abortable(
1204 self.chain_api
1205 .subscribe()
1206 .map_err(HoprLibError::chain)?
1207 .skip_while(move |event| futures::future::ready(!predicate(event))),
1208 );
1209
1210 let ctx = context.to_string();
1211
1212 Ok((
1213 spawn(async move {
1214 pin_mut!(event_stream);
1215 let res = event_stream
1216 .next()
1217 .timeout(futures_time::time::Duration::from(timeout))
1218 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
1219 .await?
1220 .ok_or(HoprLibError::GeneralError(format!(
1221 "failed to yield an on-chain event for {ctx}"
1222 )));
1223 debug!(%ctx, ?res, "on-chain event waiting done");
1224 res
1225 })
1226 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
1227 .and_then(futures::future::ready),
1228 handle,
1229 ))
1230 }
1231
1232 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1233 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1234
1235 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1236
1237 let confirm_awaiter = self
1238 .chain_api
1239 .open_channel(destination, amount)
1240 .await
1241 .map_err(HoprLibError::chain)?;
1242
1243 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1244 format!("open channel to {destination} ({channel_id})"),
1245 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1246 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1247 )?;
1248
1249 let tx_hash = confirm_awaiter.await.map_err(|e| {
1250 event_abort.abort();
1251 HoprLibError::chain(e)
1252 })?;
1253
1254 let event = event_awaiter.await?;
1255 debug!(%event, "open channel event received");
1256
1257 Ok(OpenChannelResult { tx_hash, channel_id })
1258 }
1259
1260 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> errors::Result<Hash> {
1261 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1262
1263 let channel_id = *channel_id;
1264
1265 let confirm_awaiter = self
1266 .chain_api
1267 .fund_channel(&channel_id, amount)
1268 .await
1269 .map_err(HoprLibError::chain)?;
1270
1271 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1272 format!("fund channel {channel_id}"),
1273 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1274 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1275 )?;
1276
1277 let res = confirm_awaiter.await.map_err(|e| {
1278 event_abort.abort();
1279 HoprLibError::chain(e)
1280 })?;
1281
1282 let event = event_awaiter.await?;
1283 debug!(%event, "fund channel event received");
1284
1285 Ok(res)
1286 }
1287
1288 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1289 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1290
1291 let channel_id = *channel_id;
1292
1293 let confirm_awaiter = self
1294 .chain_api
1295 .close_channel(&channel_id)
1296 .await
1297 .map_err(HoprLibError::chain)?;
1298
1299 let (event_awaiter, event_abort) = self.spawn_wait_for_on_chain_event(
1300 format!("close channel {channel_id}"),
1301 move |event| {
1302 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1303 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1304 },
1305 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1306 )?;
1307
1308 let tx_hash = confirm_awaiter.await.map_err(|e| {
1309 event_abort.abort();
1310 HoprLibError::chain(e)
1311 })?;
1312
1313 let event = event_awaiter.await?;
1314 debug!(%event, "close channel event received");
1315
1316 Ok(CloseChannelResult { tx_hash })
1317 }
1318
1319 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1320 self.chain_api
1321 .channel_closure_notice_period()
1322 .await
1323 .map_err(HoprLibError::chain)
1324 }
1325
1326 pub fn redemption_requests(
1327 &self,
1328 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1329 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1330
1331 Ok(self
1333 .redeem_requests
1334 .get()
1335 .cloned()
1336 .expect("redeem_requests is not initialized")
1337 .sink_map_err(HoprLibError::other))
1338 }
1339
1340 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1341 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1342
1343 let min_value = min_value.into();
1344
1345 self.chain_api
1346 .stream_channels(
1347 ChannelSelector::default()
1348 .with_destination(self.me_onchain())
1349 .with_allowed_states(&[
1350 ChannelStatusDiscriminants::Open,
1351 ChannelStatusDiscriminants::PendingToClose,
1352 ]),
1353 )
1354 .map_err(HoprLibError::chain)
1355 .await?
1356 .map(|channel| {
1357 Ok(TicketSelector::from(&channel)
1358 .with_amount(min_value..)
1359 .with_index_range(channel.ticket_index..)
1360 .with_state(AcknowledgedTicketStatus::Untouched))
1361 })
1362 .forward(self.redemption_requests()?)
1363 .await?;
1364
1365 Ok(())
1366 }
1367
1368 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1369 &self,
1370 counterparty: &Address,
1371 min_value: B,
1372 ) -> errors::Result<()> {
1373 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1374 .await
1375 }
1376
1377 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1378 &self,
1379 channel_id: &Hash,
1380 min_value: B,
1381 ) -> errors::Result<()> {
1382 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1383
1384 let channel = self
1385 .chain_api
1386 .channel_by_id(channel_id)
1387 .await
1388 .map_err(HoprLibError::chain)?
1389 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1390
1391 self.redemption_requests()?
1392 .send(
1393 TicketSelector::from(channel)
1394 .with_amount(min_value.into()..)
1395 .with_index_range(channel.ticket_index..)
1396 .with_state(AcknowledgedTicketStatus::Untouched),
1397 )
1398 .await?;
1399
1400 Ok(())
1401 }
1402
1403 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1404 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1405
1406 self.redemption_requests()?
1407 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1408 .await?;
1409
1410 Ok(())
1411 }
1412
1413 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1414 let peer_id = *peer_id;
1415 let pubkey = hopr_parallelize::cpu::spawn_blocking(
1417 move || prelude::OffchainPublicKey::from_peerid(&peer_id),
1418 "chainkey_lookup",
1419 )
1420 .await??;
1421
1422 self.chain_api
1423 .packet_key_to_chain_key(&pubkey)
1424 .await
1425 .map_err(HoprLibError::chain)
1426 }
1427
1428 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1429 self.chain_api
1430 .chain_key_to_packet_key(address)
1431 .await
1432 .map(|pk| pk.map(|v| v.into()))
1433 .map_err(HoprLibError::chain)
1434 }
1435}
1436
1437impl<Chain, Db> Hopr<Chain, Db> {
1438 pub fn collect_hopr_metrics() -> errors::Result<String> {
1441 cfg_if::cfg_if! {
1442 if #[cfg(all(feature = "prometheus", not(test)))] {
1443 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
1444 } else {
1445 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1446 }
1447 }
1448 }
1449}