1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38pub use hopr_api as api;
39
40#[doc(hidden)]
42pub mod exports {
43 pub mod types {
44 pub use hopr_internal_types as internal;
45 pub use hopr_primitive_types as primitive;
46 }
47
48 pub mod crypto {
49 pub use hopr_crypto_keypair as keypair;
50 pub use hopr_crypto_types as types;
51 }
52
53 pub mod network {
54 pub use hopr_network_types as types;
55 }
56
57 pub use hopr_transport as transport;
58}
59
60#[doc(hidden)]
62pub mod prelude {
63 pub use super::exports::{
64 crypto::{
65 keypair::key_pair::HoprKeys,
66 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67 },
68 network::types::{
69 prelude::ForeignDataMode,
70 udp::{ConnectedUdpStream, UdpStreamParallelism},
71 },
72 transport::{OffchainPublicKey, socket::HoprSocket},
73 types::primitive::prelude::Address,
74 };
75}
76
77use std::{
78 convert::identity,
79 future::Future,
80 num::NonZeroUsize,
81 sync::{Arc, OnceLock, atomic::Ordering},
82 time::Duration,
83};
84
85use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel, pin_mut};
86use futures_time::future::FutureExt as FuturesTimeFutureExt;
87use hopr_api::{
88 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
89 ct::TrafficGeneration,
90 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
91};
92pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
93use hopr_async_runtime::prelude::spawn;
94pub use hopr_async_runtime::{Abortable, AbortableList};
95pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
96pub use hopr_crypto_types::prelude::*;
97pub use hopr_internal_types::prelude::*;
98pub use hopr_network_types::prelude::*;
99#[cfg(all(feature = "prometheus", not(test)))]
100use hopr_platform::time::native::current_time;
101pub use hopr_primitive_types::prelude::*;
102use hopr_transport::errors::HoprTransportError;
103#[cfg(feature = "runtime-tokio")]
104pub use hopr_transport::transfer_session;
105pub use hopr_transport::*;
106use tracing::{debug, error, info, warn};
107use validator::Validate;
108
109pub use crate::{
110 config::SafeModule,
111 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
112 errors::{HoprLibError, HoprStatusError},
113 state::{HoprLibProcess, HoprState},
114 traits::chain::{CloseChannelResult, OpenChannelResult},
115};
116
117#[cfg(all(feature = "prometheus", not(test)))]
118lazy_static::lazy_static! {
119 static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
120 "hopr_start_time",
121 "The unix timestamp in seconds at which the process was started"
122 ).unwrap();
123 static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
124 "hopr_lib_version",
125 "Executed version of hopr-lib",
126 &["version"]
127 ).unwrap();
128 static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
129 "hopr_node_addresses",
130 "Node on-chain and off-chain addresses",
131 &["peerid", "address", "safe_address", "module_address"]
132 ).unwrap();
133}
134
135#[cfg(feature = "runtime-tokio")]
140pub fn prepare_tokio_runtime(
141 num_cpu_threads: Option<NonZeroUsize>,
142 num_io_threads: Option<NonZeroUsize>,
143) -> anyhow::Result<tokio::runtime::Runtime> {
144 use std::str::FromStr;
145 let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);
146
147 hopr_parallelize::cpu::init_thread_pool(
148 num_cpu_threads
149 .map(|v| v.get())
150 .or(avail_parallelism)
151 .ok_or(anyhow::anyhow!(
152 "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
153 environment variable."
154 ))?
155 .max(1),
156 )?;
157
158 Ok(tokio::runtime::Builder::new_multi_thread()
159 .enable_all()
160 .worker_threads(
161 num_io_threads
162 .map(|v| v.get())
163 .or(avail_parallelism)
164 .ok_or(anyhow::anyhow!(
165 "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
166 environment variable."
167 ))?
168 .max(1),
169 )
170 .thread_name("hoprd")
171 .thread_stack_size(
172 std::env::var("HOPRD_THREAD_STACK_SIZE")
173 .ok()
174 .and_then(|v| usize::from_str(&v).ok())
175 .unwrap_or(10 * 1024 * 1024)
176 .max(2 * 1024 * 1024),
177 )
178 .build()?)
179}
180
181pub type HoprTransportIO = socket::HoprSocket<
183 futures::channel::mpsc::Receiver<ApplicationDataIn>,
184 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
185>;
186
187type NewTicketEvents = (
188 async_broadcast::Sender<VerifiedTicket>,
189 async_broadcast::InactiveReceiver<VerifiedTicket>,
190);
191
192const NODE_READY_TIMEOUT: Duration = Duration::from_secs(30);
194
195const ON_CHAIN_RESOLUTION_EVENT_TIMEOUT: Duration = Duration::from_secs(30);
197
198pub struct Hopr<Chain, Db> {
210 me: OffchainKeypair,
211 cfg: config::HoprLibConfig,
212 state: Arc<state::AtomicHoprState>,
213 transport_api: HoprTransport<Chain, Db>,
214 redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
215 node_db: Db,
216 chain_api: Chain,
217 winning_ticket_subscribers: NewTicketEvents,
218 processes: OnceLock<AbortableList<HoprLibProcess>>,
219}
220
221impl<Chain, Db> Hopr<Chain, Db>
222where
223 Chain: HoprChainApi + Clone + Send + Sync + 'static,
224 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
225{
226 pub async fn new(
227 identity: (&ChainKeypair, &OffchainKeypair),
228 hopr_chain_api: Chain,
229 hopr_node_db: Db,
230 cfg: config::HoprLibConfig,
231 ) -> errors::Result<Self> {
232 if hopr_crypto_random::is_rng_fixed() {
233 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
234 }
235
236 cfg.validate()?;
237
238 let hopr_transport_api = HoprTransport::new(
239 identity,
240 hopr_chain_api.clone(),
241 hopr_node_db.clone(),
242 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
243 cfg.protocol,
244 );
245
246 #[cfg(all(feature = "prometheus", not(test)))]
247 {
248 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
249 METRIC_HOPR_LIB_VERSION.set(
250 &[const_format::formatcp!("{}", constants::APP_VERSION)],
251 const_format::formatcp!(
252 "{}.{}",
253 env!("CARGO_PKG_VERSION_MAJOR"),
254 env!("CARGO_PKG_VERSION_MINOR")
255 )
256 .parse()
257 .unwrap_or(0.0),
258 );
259
260 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
262 error!(%error, "failed to initialize ticket statistics metrics");
263 }
264 }
265
266 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
267 new_tickets_tx.set_await_active(false);
268 new_tickets_tx.set_overflow(true);
269
270 Ok(Self {
271 me: identity.1.clone(),
272 cfg,
273 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
274 transport_api: hopr_transport_api,
275 chain_api: hopr_chain_api,
276 node_db: hopr_node_db,
277 redeem_requests: OnceLock::new(),
278 processes: OnceLock::new(),
279 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
280 })
281 }
282
283 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
284 if self.status() == state {
285 Ok(())
286 } else {
287 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
288 }
289 }
290
291 pub fn status(&self) -> HoprState {
292 self.state.load(Ordering::Relaxed)
293 }
294
295 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
296 self.chain_api
297 .balance(self.me_onchain())
298 .await
299 .map_err(HoprLibError::chain)
300 }
301
302 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
303 self.chain_api
304 .balance(self.cfg.safe_module.safe_address)
305 .await
306 .map_err(HoprLibError::chain)
307 }
308
309 pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
310 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
311 }
312
313 pub fn get_safe_config(&self) -> SafeModule {
314 self.cfg.safe_module.clone()
315 }
316
317 pub fn config(&self) -> &config::HoprLibConfig {
318 &self.cfg
319 }
320
321 #[inline]
322 fn is_public(&self) -> bool {
323 self.cfg.publish
324 }
325
326 pub async fn run<
327 Ct,
328 #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
329 >(
330 &self,
331 cover_traffic: Ct,
332 #[cfg(feature = "session-server")] serve_handler: T,
333 ) -> errors::Result<HoprTransportIO>
334 where
335 Ct: TrafficGeneration + Send + Sync + 'static,
336 {
337 self.error_if_not_in_state(
338 HoprState::Uninitialized,
339 "cannot start the hopr node multiple times".into(),
340 )?;
341
342 #[cfg(feature = "testing")]
343 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
344
345 info!(
346 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
347 "node is not started, please fund this node",
348 );
349
350 helpers::wait_for_funds(
351 *MIN_NATIVE_BALANCE,
352 *SUGGESTED_NATIVE_BALANCE,
353 Duration::from_secs(200),
354 self.me_onchain(),
355 &self.chain_api,
356 )
357 .await?;
358
359 let mut processes = AbortableList::<HoprLibProcess>::default();
360
361 info!("starting HOPR node...");
362 self.state.store(HoprState::Initializing, Ordering::Relaxed);
363
364 let balance: XDaiBalance = self.get_balance().await?;
365 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
366
367 info!(
368 address = %self.me_onchain(),
369 %balance,
370 %minimum_balance,
371 "node information"
372 );
373
374 if balance.le(&minimum_balance) {
375 return Err(HoprLibError::GeneralError(
376 "cannot start the node without a sufficiently funded wallet".into(),
377 ));
378 }
379
380 let network_min_ticket_price = self
383 .chain_api
384 .minimum_ticket_price()
385 .await
386 .map_err(HoprLibError::chain)?;
387 let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
388 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
389 return Err(HoprLibError::GeneralError(format!(
390 "configured outgoing ticket price is lower than the network minimum ticket price: \
391 {configured_ticket_price:?} < {network_min_ticket_price}"
392 )));
393 }
394 let network_min_win_prob = self
397 .chain_api
398 .minimum_incoming_ticket_win_prob()
399 .await
400 .map_err(HoprLibError::chain)?;
401 let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
402 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
403 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
404 {
405 return Err(HoprLibError::GeneralError(format!(
406 "configured outgoing ticket winning probability is lower than the network minimum winning \
407 probability: {configured_win_prob:?} < {network_min_win_prob}"
408 )));
409 }
410
411 let minimum_capacity = self
414 .chain_api
415 .count_accounts(AccountSelector {
416 public_only: true,
417 ..Default::default()
418 })
419 .await
420 .map_err(HoprLibError::chain)?
421 .saturating_mul(2)
422 .saturating_add(100);
423
424 let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
425 .ok()
426 .and_then(|s| s.trim().parse::<usize>().ok())
427 .filter(|&c| c > 0)
428 .unwrap_or(2048)
429 .max(minimum_capacity);
430
431 debug!(
432 capacity = chain_discovery_events_capacity,
433 minimum_required = minimum_capacity,
434 "creating chain discovery events channel"
435 );
436 let (indexer_peer_update_tx, indexer_peer_update_rx) =
437 channel::<PeerDiscovery>(chain_discovery_events_capacity);
438
439 let (announcements_stream, announcements_handle) = futures::stream::abortable(
442 self.chain_api
443 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
444 .map_err(HoprLibError::chain)?,
445 );
446 processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
447
448 spawn(
449 announcements_stream
450 .filter_map(|event| {
451 futures::future::ready(event.try_as_announcement().map(|account| {
452 PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
453 }))
454 })
455 .map(Ok)
456 .forward(indexer_peer_update_tx)
457 .inspect(
458 |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
459 ),
460 );
461
462 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
463
464 let safe_addr = self.cfg.safe_module.safe_address;
465
466 if self.me_onchain() == safe_addr {
467 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
468 }
469
470 info!(%safe_addr, "registering safe with this node");
471 match self.chain_api.register_safe(&safe_addr).await {
472 Ok(awaiter) => {
473 awaiter.await.map_err(|error| {
475 error!(%safe_addr, %error, "safe registration failed with error");
476 HoprLibError::chain(error)
477 })?;
478 info!(%safe_addr, "safe successfully registered with this node");
479 }
480 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
481 info!(%safe_addr, "this safe is already registered with this node");
482 }
483 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
484 error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
486 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
487 }
488 Err(error) => {
489 error!(%safe_addr, %error, "safe registration failed");
490 return Err(HoprLibError::chain(error));
491 }
492 }
493
494 let multiaddresses_to_announce = if self.is_public() {
496 self.transport_api.announceable_multiaddresses()
499 } else {
500 Vec::with_capacity(0)
501 };
502
503 multiaddresses_to_announce
505 .iter()
506 .filter(|a| !is_public_address(a))
507 .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
508
509 let chain_api = self.chain_api.clone();
510 let me_offchain = *self.me.public();
511 let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
512
513 info!(?multiaddresses_to_announce, "announcing node on chain");
516 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
517 Ok(awaiter) => {
518 awaiter.await.map_err(|error| {
520 error!(?multiaddresses_to_announce, %error, "node announcement failed");
521 HoprLibError::chain(error)
522 })?;
523 info!(?multiaddresses_to_announce, "node has been successfully announced");
524 }
525 Err(AnnouncementError::AlreadyAnnounced) => {
526 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
527 }
528 Err(error) => {
529 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
530 return Err(HoprLibError::chain(error));
531 }
532 }
533
534 let this_node_account = node_ready
536 .await
537 .map_err(HoprLibError::other)?
538 .map_err(HoprLibError::chain)?;
539 if this_node_account.chain_addr != self.me_onchain()
540 || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
541 {
542 error!(%this_node_account, "account bound to offchain key does not match this node");
543 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
544 }
545
546 info!(%this_node_account, "node account is ready");
547
548 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
549 .ok()
550 .and_then(|s| s.trim().parse::<usize>().ok())
551 .filter(|&c| c > 0)
552 .unwrap_or(256);
553
554 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
555 #[cfg(feature = "session-server")]
556 {
557 debug!(capacity = incoming_session_channel_capacity, "creating session server");
558 processes.insert(
559 HoprLibProcess::SessionServer,
560 hopr_async_runtime::spawn_as_abortable!(
561 _session_rx
562 .for_each_concurrent(None, move |session| {
563 let serve_handler = serve_handler.clone();
564 async move {
565 let session_id = *session.session.id();
566 match serve_handler.process(session).await {
567 Ok(_) => debug!(?session_id, "client session processed successfully"),
568 Err(error) => error!(
569 ?session_id,
570 %error,
571 "client session processing failed"
572 ),
573 }
574 }
575 })
576 .inspect(|_| tracing::warn!(
577 task = %HoprLibProcess::SessionServer,
578 "long-running background task finished"
579 ))
580 ),
581 );
582 }
583
584 info!("starting ticket events processor");
585 let (tickets_tx, tickets_rx) = channel(1024);
586 let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
587 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
588 let node_db = self.node_db.clone();
589 let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
590 spawn(
591 tickets_rx
592 .filter_map(move |ticket_event| {
593 let node_db = node_db.clone();
594 async move {
595 match ticket_event {
596 TicketEvent::WinningTicket(winning) => {
597 if let Err(error) = node_db.insert_ticket(*winning).await {
598 tracing::error!(%error, %winning, "failed to insert ticket into database");
599 } else {
600 tracing::debug!(%winning, "inserted ticket into database");
601 }
602 Some(winning)
603 }
604 TicketEvent::RejectedTicket(rejected, issuer) => {
605 if let Some(issuer) = &issuer {
606 if let Err(error) =
607 node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
608 {
609 tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
610 } else {
611 tracing::debug!(%rejected, "marked ticket as rejected");
612 }
613 } else {
614 tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
615 }
616 None
617 }
618 }
619 }
620 })
621 .for_each(move |ticket| {
622 if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
623 tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
624 }
625 futures::future::ready(())
626 })
627 .inspect(|_| {
628 tracing::warn!(
629 task = %HoprLibProcess::TicketEvents,
630 "long-running background task finished"
631 )
632 }),
633 );
634
635 info!("starting transport");
636 let (hopr_socket, transport_processes) = self
637 .transport_api
638 .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
639 .await?;
640 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
641
642 let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
644 let _ = self.redeem_requests.set(redemption_req_tx);
645 let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
646 processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
647 let chain = self.chain_api.clone();
648 let node_db = self.node_db.clone();
649 spawn(redemption_req_rx
650 .for_each(move |selector| {
651 let chain = chain.clone();
652 let db = node_db.clone();
653 async move {
654 match chain.redeem_tickets_via_selectors(&db, [selector]).await {
655 Ok(res) => debug!(%res, "redemption complete"),
656 Err(error) => error!(%error, "redemption failed"),
657 }
658 }
659 })
660 .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
661 );
662
663 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
664 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
665 let chain = self.chain_api.clone();
666 let node_db = self.node_db.clone();
667 let events = chain.subscribe().map_err(HoprLibError::chain)?;
668 spawn(
669 futures::stream::Abortable::new(
670 events
671 .filter_map(move |event|
672 futures::future::ready(event.try_as_channel_closed())
673 ),
674 chain_events_sub_reg
675 )
676 .for_each(move |closed_channel| {
677 let node_db = node_db.clone();
678 let chain = chain.clone();
679 async move {
680 match closed_channel.direction(chain.me()) {
681 Some(ChannelDirection::Incoming) => {
682 match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
683 Ok(num_neglected) if num_neglected > 0 => {
684 warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
685 },
686 Ok(_) => {
687 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
688 },
689 Err(error) => {
690 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
691 }
692 }
693 },
694 Some(ChannelDirection::Outgoing) => {
695 if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
696 error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
697 } else {
698 debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
699 }
700 }
701 _ => {} }
703 }
704 })
705 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
706 );
707
708 let mut channels = self
713 .chain_api
714 .stream_channels(ChannelSelector {
715 destination: self.me_onchain().into(),
716 ..Default::default()
717 })
718 .map_err(HoprLibError::chain)
719 .await?;
720
721 while let Some(channel) = channels.next().await {
722 self.node_db
723 .update_ticket_states_and_fetch(
724 [TicketSelector::from(&channel)
725 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
726 .with_index_range(channel.ticket_index..)],
727 AcknowledgedTicketStatus::Untouched,
728 )
729 .map_err(HoprLibError::db)
730 .await?
731 .for_each(|ticket| {
732 info!(%ticket, "fixed next out-of-sync ticket");
733 futures::future::ready(())
734 })
735 .await;
736 }
737
738 self.state.store(HoprState::Running, Ordering::Relaxed);
739
740 info!(
741 id = %self.me_peer_id(),
742 version = constants::APP_VERSION,
743 "NODE STARTED AND RUNNING"
744 );
745
746 #[cfg(all(feature = "prometheus", not(test)))]
747 METRIC_HOPR_NODE_INFO.set(
748 &[
749 &self.me.public().to_peerid_str(),
750 &self.me_onchain().to_string(),
751 &self.cfg.safe_module.safe_address.to_string(),
752 &self.cfg.safe_module.module_address.to_string(),
753 ],
754 1.0,
755 );
756
757 let _ = self.processes.set(processes);
758 Ok(hopr_socket)
759 }
760
761 pub fn shutdown(&self) -> Result<(), HoprLibError> {
769 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
770 if let Some(processes) = self.processes.get() {
771 processes.abort_all();
772 }
773 self.state.store(HoprState::Terminated, Ordering::Relaxed);
774 info!("NODE SHUTDOWN COMPLETE");
775 Ok(())
776 }
777
778 pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
780 self.winning_ticket_subscribers.1.activate_cloned()
781 }
782
783 pub fn me_peer_id(&self) -> PeerId {
786 (*self.me.public()).into()
787 }
788
789 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
791 Ok(self
792 .chain_api
793 .stream_accounts(AccountSelector {
794 public_only: true,
795 ..Default::default()
796 })
797 .map_err(HoprLibError::chain)
798 .await?
799 .map(|entry| {
800 (
801 PeerId::from(entry.public_key),
802 entry.chain_addr,
803 entry.get_multiaddrs().to_vec(),
804 )
805 })
806 .collect()
807 .await)
808 }
809
810 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
814 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
815
816 Ok(self.transport_api.ping(peer).await?)
817 }
818
819 #[cfg(feature = "session-client")]
822 pub async fn connect_to(
823 &self,
824 destination: Address,
825 target: SessionTarget,
826 cfg: SessionClientConfig,
827 ) -> errors::Result<HoprSession> {
828 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
829
830 let backoff = backon::ConstantBuilder::default()
831 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
832 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
833 .with_jitter();
834
835 use backon::Retryable;
836
837 Ok((|| {
838 let cfg = cfg.clone();
839 let target = target.clone();
840 async { self.transport_api.new_session(destination, target, cfg).await }
841 })
842 .retry(backoff)
843 .sleep(backon::FuturesTimerSleeper)
844 .await?)
845 }
846
847 #[cfg(feature = "session-client")]
850 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
851 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
852 Ok(self.transport_api.probe_session(id).await?)
853 }
854
855 #[cfg(feature = "session-client")]
856 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
857 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
858 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
859 }
860
861 #[cfg(feature = "session-client")]
862 pub async fn update_session_surb_balancer_config(
863 &self,
864 id: &SessionId,
865 cfg: SurbBalancerConfig,
866 ) -> errors::Result<()> {
867 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
868 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
869 }
870
871 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
873 self.transport_api.local_multiaddresses()
874 }
875
876 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
878 self.transport_api.listening_multiaddresses().await
879 }
880
881 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
883 self.transport_api.network_observed_multiaddresses(peer).await
884 }
885
886 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
888 let peer = *peer;
889 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
891
892 match self
893 .chain_api
894 .stream_accounts(AccountSelector {
895 public_only: false,
896 offchain_key: Some(pubkey),
897 ..Default::default()
898 })
899 .map_err(HoprLibError::chain)
900 .await?
901 .next()
902 .await
903 {
904 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
905 None => {
906 error!(%peer, "no information");
907 Ok(vec![])
908 }
909 }
910 }
911
912 pub async fn network_health(&self) -> Health {
916 self.transport_api.network_health().await
917 }
918
919 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
921 Ok(self.transport_api.network_connected_peers().await?)
922 }
923
924 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
926 Ok(self.transport_api.network_peer_observations(peer).await?)
927 }
928
929 pub async fn all_network_peers(
931 &self,
932 minimum_score: f64,
933 ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
934 Ok(
935 futures::stream::iter(self.transport_api.network_connected_peers().await?)
936 .filter_map(|peer| async move {
937 if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
938 if info.score() >= minimum_score {
939 Some((peer, info))
940 } else {
941 None
942 }
943 } else {
944 None
945 }
946 })
947 .filter_map(|(peer_id, info)| async move {
948 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
949 Some((address, peer_id, info))
950 })
951 .collect::<Vec<_>>()
952 .await,
953 )
954 }
955
956 pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
959 if let Some(channel) = self
960 .chain_api
961 .channel_by_id(channel_id)
962 .await
963 .map_err(|e| HoprTransportError::Other(e.into()))?
964 {
965 if &channel.destination == self.chain_api.me() {
966 Ok(Some(
967 self.node_db
968 .stream_tickets([&channel])
969 .await
970 .map_err(HoprLibError::db)?
971 .collect()
972 .await,
973 ))
974 } else {
975 Ok(None)
976 }
977 } else {
978 Ok(None)
979 }
980 }
981
982 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
984 Ok(self
985 .node_db
986 .stream_tickets(None::<TicketSelector>)
987 .await
988 .map_err(HoprLibError::db)?
989 .map(|v| v.ticket)
990 .collect()
991 .await)
992 }
993
994 pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
996 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
997 }
998
999 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1001 self.node_db
1002 .reset_ticket_statistics()
1003 .await
1004 .map_err(HoprLibError::chain)
1005 }
1006
1007 pub fn me_onchain(&self) -> Address {
1009 *self.chain_api.me()
1010 }
1011
1012 pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1014 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1015 }
1016
1017 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1019 self.chain_api
1020 .minimum_incoming_ticket_win_prob()
1021 .await
1022 .map_err(HoprLibError::chain)
1023 }
1024
1025 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1027 Ok(self
1028 .chain_api
1029 .stream_accounts(AccountSelector {
1030 public_only: true,
1031 ..Default::default()
1032 })
1033 .map_err(HoprLibError::chain)
1034 .await?
1035 .collect()
1036 .await)
1037 }
1038
1039 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1042 self.chain_api
1043 .channel_by_id(channel_id)
1044 .await
1045 .map_err(HoprLibError::chain)
1046 }
1047
1048 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1053 self.chain_api
1054 .channel_by_parties(src, dest)
1055 .await
1056 .map_err(HoprLibError::chain)
1057 }
1058
1059 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1061 Ok(self
1062 .chain_api
1063 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1064 ChannelStatusDiscriminants::Closed,
1065 ChannelStatusDiscriminants::Open,
1066 ChannelStatusDiscriminants::PendingToClose,
1067 ]))
1068 .map_err(HoprLibError::chain)
1069 .await?
1070 .collect()
1071 .await)
1072 }
1073
1074 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1076 Ok(self
1077 .chain_api
1078 .stream_channels(
1079 ChannelSelector::default()
1080 .with_destination(*dest)
1081 .with_allowed_states(&[
1082 ChannelStatusDiscriminants::Closed,
1083 ChannelStatusDiscriminants::Open,
1084 ChannelStatusDiscriminants::PendingToClose,
1085 ]),
1086 )
1087 .map_err(HoprLibError::chain)
1088 .await?
1089 .collect()
1090 .await)
1091 }
1092
1093 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1095 Ok(self
1096 .chain_api
1097 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1098 ChannelStatusDiscriminants::Closed,
1099 ChannelStatusDiscriminants::Open,
1100 ChannelStatusDiscriminants::PendingToClose,
1101 ]))
1102 .map_err(HoprLibError::chain)
1103 .await?
1104 .collect()
1105 .await)
1106 }
1107
1108 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1110 self.chain_api
1111 .safe_allowance(self.cfg.safe_module.safe_address)
1112 .await
1113 .map_err(HoprLibError::chain)
1114 }
1115
1116 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1120 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1121
1122 self.chain_api
1123 .withdraw(amount, &recipient)
1124 .and_then(identity)
1125 .map_err(HoprLibError::chain)
1126 .await
1127 }
1128
1129 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1133 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1134
1135 self.chain_api
1136 .withdraw(amount, &recipient)
1137 .and_then(identity)
1138 .map_err(HoprLibError::chain)
1139 .await
1140 }
1141
1142 fn spawn_wait_for_on_chain_event(
1145 &self,
1146 context: impl std::fmt::Display,
1147 predicate: impl Fn(&ChainEvent) -> bool + Send + Sync + 'static,
1148 timeout: Duration,
1149 ) -> errors::Result<impl Future<Output = errors::Result<ChainEvent>>> {
1150 let event_stream = self
1151 .chain_api
1152 .subscribe()
1153 .map_err(HoprLibError::chain)?
1154 .skip_while(move |event| futures::future::ready(!predicate(event)));
1155
1156 let ctx = context.to_string();
1157 Ok(spawn(async move {
1158 pin_mut!(event_stream);
1159 event_stream
1160 .next()
1161 .timeout(futures_time::time::Duration::from(timeout))
1162 .map_err(|_| HoprLibError::GeneralError(format!("{ctx} timed out after {timeout:?}")))
1163 .await?
1164 .ok_or(HoprLibError::GeneralError(format!(
1165 "failed to yield an on-chain event for {ctx}"
1166 )))
1167 })
1168 .map_err(move |_| HoprLibError::GeneralError(format!("failed to spawn future for {context}")))
1169 .and_then(futures::future::ready))
1170 }
1171
1172 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1173 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1174
1175 let channel_id = generate_channel_id(&self.me_onchain(), destination);
1176
1177 let event_awaiter = self.spawn_wait_for_on_chain_event(
1178 format!("open channel to {destination}"),
1179 move |event| matches!(event, ChainEvent::ChannelOpened(c) if c.get_id() == &channel_id),
1180 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1181 )?;
1182
1183 let tx_hash = self
1184 .chain_api
1185 .open_channel(destination, amount)
1186 .and_then(identity)
1187 .map_err(HoprLibError::chain)
1188 .await?;
1189
1190 let event = event_awaiter.await?;
1191 debug!(%event, "open channel event received");
1192
1193 Ok(OpenChannelResult { tx_hash, channel_id })
1194 }
1195
1196 pub async fn fund_channel(&self, channel_id: &ChannelId, amount: HoprBalance) -> errors::Result<Hash> {
1197 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1198
1199 let channel_id = *channel_id;
1200 let event_awaiter = self.spawn_wait_for_on_chain_event(
1201 format!("fund channel {channel_id}"),
1202 move |event| matches!(event, ChainEvent::ChannelBalanceIncreased(c, a) if c.get_id() == &channel_id && a == &amount),
1203 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT
1204 )?;
1205
1206 let res = self
1207 .chain_api
1208 .fund_channel(&channel_id, amount)
1209 .and_then(identity)
1210 .map_err(HoprLibError::chain)
1211 .await?;
1212
1213 let event = event_awaiter.await?;
1214 debug!(%event, "fund channel event received");
1215
1216 Ok(res)
1217 }
1218
1219 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1220 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1221
1222 let channel_id = *channel_id;
1223 let event_awaiter = self.spawn_wait_for_on_chain_event(
1224 format!("close channel {channel_id}"),
1225 move |event| {
1226 matches!(event, ChainEvent::ChannelClosed(c) if c.get_id() == &channel_id)
1227 || matches!(event, ChainEvent::ChannelClosureInitiated(c) if c.get_id() == &channel_id)
1228 },
1229 ON_CHAIN_RESOLUTION_EVENT_TIMEOUT,
1230 )?;
1231
1232 let tx_hash = self
1233 .chain_api
1234 .close_channel(&channel_id)
1235 .and_then(identity)
1236 .map_err(HoprLibError::chain)
1237 .await?;
1238
1239 let event = event_awaiter.await?;
1240 debug!(%event, "close channel event received");
1241
1242 Ok(CloseChannelResult { tx_hash })
1243 }
1244
1245 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1246 self.chain_api
1247 .channel_closure_notice_period()
1248 .await
1249 .map_err(HoprLibError::chain)
1250 }
1251
1252 pub fn redemption_requests(
1253 &self,
1254 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1255 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1256
1257 Ok(self
1259 .redeem_requests
1260 .get()
1261 .cloned()
1262 .expect("redeem_requests is not initialized")
1263 .sink_map_err(HoprLibError::other))
1264 }
1265
1266 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1267 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1268
1269 let min_value = min_value.into();
1270
1271 self.chain_api
1272 .stream_channels(
1273 ChannelSelector::default()
1274 .with_destination(self.me_onchain())
1275 .with_allowed_states(&[
1276 ChannelStatusDiscriminants::Open,
1277 ChannelStatusDiscriminants::PendingToClose,
1278 ]),
1279 )
1280 .map_err(HoprLibError::chain)
1281 .await?
1282 .map(|channel| {
1283 Ok(TicketSelector::from(&channel)
1284 .with_amount(min_value..)
1285 .with_index_range(channel.ticket_index..)
1286 .with_state(AcknowledgedTicketStatus::Untouched))
1287 })
1288 .forward(self.redemption_requests()?)
1289 .await?;
1290
1291 Ok(())
1292 }
1293
1294 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1295 &self,
1296 counterparty: &Address,
1297 min_value: B,
1298 ) -> errors::Result<()> {
1299 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1300 .await
1301 }
1302
1303 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1304 &self,
1305 channel_id: &Hash,
1306 min_value: B,
1307 ) -> errors::Result<()> {
1308 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1309
1310 let channel = self
1311 .chain_api
1312 .channel_by_id(channel_id)
1313 .await
1314 .map_err(HoprLibError::chain)?
1315 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1316
1317 self.redemption_requests()?
1318 .send(
1319 TicketSelector::from(channel)
1320 .with_amount(min_value.into()..)
1321 .with_index_range(channel.ticket_index..)
1322 .with_state(AcknowledgedTicketStatus::Untouched),
1323 )
1324 .await?;
1325
1326 Ok(())
1327 }
1328
1329 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1330 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1331
1332 self.redemption_requests()?
1333 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1334 .await?;
1335
1336 Ok(())
1337 }
1338
1339 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1340 let peer_id = *peer_id;
1341 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1343 .await
1344 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1345
1346 self.chain_api
1347 .packet_key_to_chain_key(&pubkey)
1348 .await
1349 .map_err(HoprLibError::chain)
1350 }
1351
1352 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1353 self.chain_api
1354 .chain_key_to_packet_key(address)
1355 .await
1356 .map(|pk| pk.map(|v| v.into()))
1357 .map_err(HoprLibError::chain)
1358 }
1359}
1360
1361impl<Chain, Db> Hopr<Chain, Db> {
1362 pub fn collect_hopr_metrics() -> errors::Result<String> {
1365 cfg_if::cfg_if! {
1366 if #[cfg(all(feature = "prometheus", not(test)))] {
1367 hopr_metrics::gather_all_metrics().map_err(HoprLibError::other)
1368 } else {
1369 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1370 }
1371 }
1372 }
1373}