1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38pub use hopr_api as api;
39
40#[doc(hidden)]
42pub mod exports {
43 pub mod types {
44 pub use hopr_internal_types as internal;
45 pub use hopr_primitive_types as primitive;
46 }
47
48 pub mod crypto {
49 pub use hopr_crypto_keypair as keypair;
50 pub use hopr_crypto_types as types;
51 }
52
53 pub mod network {
54 pub use hopr_network_types as types;
55 }
56
57 pub use hopr_transport as transport;
58}
59
60#[doc(hidden)]
62pub mod prelude {
63 pub use super::exports::{
64 crypto::{
65 keypair::key_pair::HoprKeys,
66 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67 },
68 network::types::{
69 prelude::ForeignDataMode,
70 udp::{ConnectedUdpStream, UdpStreamParallelism},
71 },
72 transport::{OffchainPublicKey, socket::HoprSocket},
73 types::primitive::prelude::Address,
74 };
75}
76
77use std::{
78 convert::identity,
79 num::NonZeroUsize,
80 sync::{Arc, OnceLock, atomic::Ordering},
81 time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel};
85use hopr_api::{
86 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
87 ct::TrafficGeneration,
88 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
89};
90pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
91use hopr_async_runtime::prelude::spawn;
92pub use hopr_async_runtime::{Abortable, AbortableList};
93pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
94pub use hopr_crypto_types::prelude::*;
95pub use hopr_internal_types::prelude::*;
96pub use hopr_network_types::prelude::*;
97#[cfg(all(feature = "prometheus", not(test)))]
98use hopr_platform::time::native::current_time;
99pub use hopr_primitive_types::prelude::*;
100use hopr_transport::errors::HoprTransportError;
101#[cfg(feature = "runtime-tokio")]
102pub use hopr_transport::transfer_session;
103pub use hopr_transport::*;
104use tracing::{debug, error, info, warn};
105use validator::Validate;
106
107pub use crate::{
108 config::SafeModule,
109 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
110 errors::{HoprLibError, HoprStatusError},
111 state::{HoprLibProcess, HoprState},
112 traits::chain::{CloseChannelResult, OpenChannelResult},
113};
114
// Metrics of this crate. They are only compiled with the `prometheus` feature
// and are left out of test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Labelled gauge carrying the identifiers of this node
    // (`peerid`, `address`, `safe_address`, `module_address`).
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
    // Gauge labelled with the `version` of the executed hopr-lib.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Gauge holding the process start time as a unix timestamp in seconds.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
}
132
/// Initializes the CPU thread pool and builds the multi-threaded Tokio runtime.
///
/// # Arguments
/// * `num_cpu_threads` - size of the CPU thread pool passed to
///   `hopr_parallelize::cpu::init_thread_pool`.
/// * `num_io_threads` - number of Tokio worker threads.
///
/// When either value is `None`, half of the available parallelism reported by the
/// operating system is used instead. Both values are clamped to at least 1.
///
/// The thread stack size is read from the `HOPRD_THREAD_STACK_SIZE` environment variable
/// (in bytes). It defaults to 10 MiB when unset or unparsable and is never lower than 2 MiB.
///
/// # Errors
/// Returns an error if a thread count is neither given nor determinable from the system,
/// if the CPU thread pool cannot be initialized, or if the runtime fails to build.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    const DEFAULT_THREAD_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MIN_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024;

    // Fallback used by both pools; may be 0 on a single-core system, hence the `.max(1)` below.
    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    let cpu_threads = num_cpu_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        // `ok_or_else` constructs the error only when it is actually needed
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    let thread_stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_THREAD_STACK_SIZE)
        .max(MIN_THREAD_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(thread_stack_size)
        .build()?)
}
178
/// Socket type returned by [`Hopr::run`].
///
/// It is a [`socket::HoprSocket`] parameterized with a receiver of [`ApplicationDataIn`]
/// and a sender of ([`DestinationRouting`], [`ApplicationDataOut`]) pairs.
pub type HoprTransportIO = socket::HoprSocket<
    futures::channel::mpsc::Receiver<ApplicationDataIn>,
    futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
>;
184
/// Broadcast channel endpoints for winning tickets: the sender used to publish
/// [`VerifiedTicket`]s and an inactive receiver from which subscribers are created.
type NewTicketEvents = (
    async_broadcast::Sender<VerifiedTicket>,
    async_broadcast::InactiveReceiver<VerifiedTicket>,
);
189
/// Timeout passed to the chain API when awaiting the key binding of this node during start-up.
const NODE_READY_TIMEOUT: Duration = Duration::from_secs(30);
192
/// A HOPR node, generic over the chain API (`Chain`) and the node database (`Db`).
///
/// An instance is created with [`Hopr::new`] and started with [`Hopr::run`].
pub struct Hopr<Chain, Db> {
    /// Off-chain key pair of this node.
    me: OffchainKeypair,
    /// Configuration the node was created with.
    cfg: config::HoprLibConfig,
    /// Current life-cycle state of the node.
    state: Arc<state::AtomicHoprState>,
    /// Transport layer built from the identity, chain API and node database.
    transport_api: HoprTransport<Chain, Db>,
    /// Sender of ticket redemption requests; set while the node is being started.
    redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
    /// Handle to the node database.
    node_db: Db,
    /// Handle to the chain API.
    chain_api: Chain,
    /// Broadcast endpoints used to distribute newly received winning tickets.
    winning_ticket_subscribers: NewTicketEvents,
    /// Abortable background processes; set once the node has been started.
    processes: OnceLock<AbortableList<HoprLibProcess>>,
}
215
216impl<Chain, Db> Hopr<Chain, Db>
217where
218 Chain: HoprChainApi + Clone + Send + Sync + 'static,
219 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
220{
221 pub async fn new(
222 identity: (&ChainKeypair, &OffchainKeypair),
223 hopr_chain_api: Chain,
224 hopr_node_db: Db,
225 cfg: config::HoprLibConfig,
226 ) -> errors::Result<Self> {
227 if hopr_crypto_random::is_rng_fixed() {
228 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
229 }
230
231 cfg.validate()?;
232
233 let hopr_transport_api = HoprTransport::new(
234 identity,
235 hopr_chain_api.clone(),
236 hopr_node_db.clone(),
237 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
238 cfg.protocol,
239 );
240
241 #[cfg(all(feature = "prometheus", not(test)))]
242 {
243 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
244 METRIC_HOPR_LIB_VERSION.set(
245 &[const_format::formatcp!("{}", constants::APP_VERSION)],
246 const_format::formatcp!(
247 "{}.{}",
248 env!("CARGO_PKG_VERSION_MAJOR"),
249 env!("CARGO_PKG_VERSION_MINOR")
250 )
251 .parse()
252 .unwrap_or(0.0),
253 );
254
255 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
257 error!(%error, "failed to initialize ticket statistics metrics");
258 }
259 }
260
261 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
262 new_tickets_tx.set_await_active(false);
263 new_tickets_tx.set_overflow(true);
264
265 Ok(Self {
266 me: identity.1.clone(),
267 cfg,
268 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
269 transport_api: hopr_transport_api,
270 chain_api: hopr_chain_api,
271 node_db: hopr_node_db,
272 redeem_requests: OnceLock::new(),
273 processes: OnceLock::new(),
274 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
275 })
276 }
277
278 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
279 if self.status() == state {
280 Ok(())
281 } else {
282 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
283 }
284 }
285
286 pub fn status(&self) -> HoprState {
287 self.state.load(Ordering::Relaxed)
288 }
289
290 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
291 self.chain_api
292 .balance(self.me_onchain())
293 .await
294 .map_err(HoprLibError::chain)
295 }
296
297 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
298 self.chain_api
299 .balance(self.cfg.safe_module.safe_address)
300 .await
301 .map_err(HoprLibError::chain)
302 }
303
304 pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
305 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
306 }
307
308 pub fn get_safe_config(&self) -> SafeModule {
309 self.cfg.safe_module.clone()
310 }
311
312 pub fn config(&self) -> &config::HoprLibConfig {
313 &self.cfg
314 }
315
316 #[inline]
317 fn is_public(&self) -> bool {
318 self.cfg.publish
319 }
320
321 pub async fn run<
322 Ct,
323 #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
324 >(
325 &self,
326 cover_traffic: Ct,
327 #[cfg(feature = "session-server")] serve_handler: T,
328 ) -> errors::Result<HoprTransportIO>
329 where
330 Ct: TrafficGeneration + Send + Sync + 'static,
331 {
332 self.error_if_not_in_state(
333 HoprState::Uninitialized,
334 "cannot start the hopr node multiple times".into(),
335 )?;
336
337 #[cfg(feature = "testing")]
338 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
339
340 info!(
341 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
342 "node is not started, please fund this node",
343 );
344
345 helpers::wait_for_funds(
346 *MIN_NATIVE_BALANCE,
347 *SUGGESTED_NATIVE_BALANCE,
348 Duration::from_secs(200),
349 self.me_onchain(),
350 &self.chain_api,
351 )
352 .await?;
353
354 let mut processes = AbortableList::<HoprLibProcess>::default();
355
356 info!("starting HOPR node...");
357 self.state.store(HoprState::Initializing, Ordering::Relaxed);
358
359 let balance: XDaiBalance = self.get_balance().await?;
360 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
361
362 info!(
363 address = %self.me_onchain(),
364 %balance,
365 %minimum_balance,
366 "node information"
367 );
368
369 if balance.le(&minimum_balance) {
370 return Err(HoprLibError::GeneralError(
371 "cannot start the node without a sufficiently funded wallet".into(),
372 ));
373 }
374
375 let network_min_ticket_price = self
378 .chain_api
379 .minimum_ticket_price()
380 .await
381 .map_err(HoprLibError::chain)?;
382 let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
383 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
384 return Err(HoprLibError::GeneralError(format!(
385 "configured outgoing ticket price is lower than the network minimum ticket price: \
386 {configured_ticket_price:?} < {network_min_ticket_price}"
387 )));
388 }
389 let network_min_win_prob = self
392 .chain_api
393 .minimum_incoming_ticket_win_prob()
394 .await
395 .map_err(HoprLibError::chain)?;
396 let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
397 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
398 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
399 {
400 return Err(HoprLibError::GeneralError(format!(
401 "configured outgoing ticket winning probability is lower than the network minimum winning \
402 probability: {configured_win_prob:?} < {network_min_win_prob}"
403 )));
404 }
405
406 let minimum_capacity = self
409 .chain_api
410 .count_accounts(AccountSelector {
411 public_only: true,
412 ..Default::default()
413 })
414 .await
415 .map_err(HoprLibError::chain)?
416 .saturating_mul(2)
417 .saturating_add(100);
418
419 let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
420 .ok()
421 .and_then(|s| s.trim().parse::<usize>().ok())
422 .filter(|&c| c > 0)
423 .unwrap_or(2048)
424 .max(minimum_capacity);
425
426 debug!(
427 capacity = chain_discovery_events_capacity,
428 minimum_required = minimum_capacity,
429 "creating chain discovery events channel"
430 );
431 let (indexer_peer_update_tx, indexer_peer_update_rx) =
432 channel::<PeerDiscovery>(chain_discovery_events_capacity);
433
434 let (announcements_stream, announcements_handle) = futures::stream::abortable(
437 self.chain_api
438 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
439 .map_err(HoprLibError::chain)?,
440 );
441 processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
442
443 spawn(
444 announcements_stream
445 .filter_map(|event| {
446 futures::future::ready(event.try_as_announcement().map(|account| {
447 PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
448 }))
449 })
450 .map(Ok)
451 .forward(indexer_peer_update_tx)
452 .inspect(
453 |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
454 ),
455 );
456
457 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
458
459 let safe_addr = self.cfg.safe_module.safe_address;
460
461 if self.me_onchain() == safe_addr {
462 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
463 }
464
465 info!(%safe_addr, "registering safe with this node");
466 match self.chain_api.register_safe(&safe_addr).await {
467 Ok(awaiter) => {
468 awaiter.await.map_err(|error| {
470 error!(%safe_addr, %error, "safe registration failed with error");
471 HoprLibError::chain(error)
472 })?;
473 info!(%safe_addr, "safe successfully registered with this node");
474 }
475 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
476 info!(%safe_addr, "this safe is already registered with this node");
477 }
478 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
479 error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
481 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
482 }
483 Err(error) => {
484 error!(%safe_addr, %error, "safe registration failed");
485 return Err(HoprLibError::chain(error));
486 }
487 }
488
489 let multiaddresses_to_announce = if self.is_public() {
491 self.transport_api.announceable_multiaddresses()
494 } else {
495 Vec::with_capacity(0)
496 };
497
498 multiaddresses_to_announce
500 .iter()
501 .filter(|a| !is_public_address(a))
502 .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
503
504 let chain_api = self.chain_api.clone();
505 let me_offchain = *self.me.public();
506 let node_ready = spawn(async move { chain_api.await_key_binding(&me_offchain, NODE_READY_TIMEOUT).await });
507
508 info!(?multiaddresses_to_announce, "announcing node on chain");
511 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
512 Ok(awaiter) => {
513 awaiter.await.map_err(|error| {
515 error!(?multiaddresses_to_announce, %error, "node announcement failed");
516 HoprLibError::chain(error)
517 })?;
518 info!(?multiaddresses_to_announce, "node has been successfully announced");
519 }
520 Err(AnnouncementError::AlreadyAnnounced) => {
521 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
522 }
523 Err(error) => {
524 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
525 return Err(HoprLibError::chain(error));
526 }
527 }
528
529 let this_node_account = node_ready
531 .await
532 .map_err(HoprLibError::other)?
533 .map_err(HoprLibError::chain)?;
534 if this_node_account.chain_addr != self.me_onchain()
535 || this_node_account.safe_address.is_none_or(|a| a != safe_addr)
536 {
537 error!(%this_node_account, "account bound to offchain key does not match this node");
538 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
539 }
540
541 info!(%this_node_account, "node account is ready");
542
543 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
544 .ok()
545 .and_then(|s| s.trim().parse::<usize>().ok())
546 .filter(|&c| c > 0)
547 .unwrap_or(256);
548
549 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
550 #[cfg(feature = "session-server")]
551 {
552 debug!(capacity = incoming_session_channel_capacity, "creating session server");
553 processes.insert(
554 HoprLibProcess::SessionServer,
555 hopr_async_runtime::spawn_as_abortable!(
556 _session_rx
557 .for_each_concurrent(None, move |session| {
558 let serve_handler = serve_handler.clone();
559 async move {
560 let session_id = *session.session.id();
561 match serve_handler.process(session).await {
562 Ok(_) => debug!(?session_id, "client session processed successfully"),
563 Err(error) => error!(
564 ?session_id,
565 %error,
566 "client session processing failed"
567 ),
568 }
569 }
570 })
571 .inspect(|_| tracing::warn!(
572 task = %HoprLibProcess::SessionServer,
573 "long-running background task finished"
574 ))
575 ),
576 );
577 }
578
579 info!("starting ticket events processor");
580 let (tickets_tx, tickets_rx) = channel(1024);
581 let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
582 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
583 let node_db = self.node_db.clone();
584 let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
585 spawn(
586 tickets_rx
587 .filter_map(move |ticket_event| {
588 let node_db = node_db.clone();
589 async move {
590 match ticket_event {
591 TicketEvent::WinningTicket(winning) => {
592 if let Err(error) = node_db.insert_ticket(*winning).await {
593 tracing::error!(%error, %winning, "failed to insert ticket into database");
594 } else {
595 tracing::debug!(%winning, "inserted ticket into database");
596 }
597 Some(winning)
598 }
599 TicketEvent::RejectedTicket(rejected, issuer) => {
600 if let Some(issuer) = &issuer {
601 if let Err(error) =
602 node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
603 {
604 tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
605 } else {
606 tracing::debug!(%rejected, "marked ticket as rejected");
607 }
608 } else {
609 tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
610 }
611 None
612 }
613 }
614 }
615 })
616 .for_each(move |ticket| {
617 if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
618 tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
619 }
620 futures::future::ready(())
621 })
622 .inspect(|_| {
623 tracing::warn!(
624 task = %HoprLibProcess::TicketEvents,
625 "long-running background task finished"
626 )
627 }),
628 );
629
630 info!("starting transport");
631 let (hopr_socket, transport_processes) = self
632 .transport_api
633 .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
634 .await?;
635 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
636
637 let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
639 let _ = self.redeem_requests.set(redemption_req_tx);
640 let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
641 processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
642 let chain = self.chain_api.clone();
643 let node_db = self.node_db.clone();
644 spawn(redemption_req_rx
645 .for_each(move |selector| {
646 let chain = chain.clone();
647 let db = node_db.clone();
648 async move {
649 match chain.redeem_tickets_via_selectors(&db, [selector]).await {
650 Ok(res) => debug!(%res, "redemption complete"),
651 Err(error) => error!(%error, "redemption failed"),
652 }
653 }
654 })
655 .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
656 );
657
658 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
659 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
660 let chain = self.chain_api.clone();
661 let node_db = self.node_db.clone();
662 let events = chain.subscribe().map_err(HoprLibError::chain)?;
663 spawn(
664 futures::stream::Abortable::new(
665 events
666 .filter_map(move |event|
667 futures::future::ready(event.try_as_channel_closed())
668 ),
669 chain_events_sub_reg
670 )
671 .for_each(move |closed_channel| {
672 let node_db = node_db.clone();
673 let chain = chain.clone();
674 async move {
675 match closed_channel.direction(chain.me()) {
676 Some(ChannelDirection::Incoming) => {
677 match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
678 Ok(num_neglected) if num_neglected > 0 => {
679 warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
680 },
681 Ok(_) => {
682 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
683 },
684 Err(error) => {
685 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
686 }
687 }
688 },
689 Some(ChannelDirection::Outgoing) => {
690 if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
691 error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
692 } else {
693 debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
694 }
695 }
696 _ => {} }
698 }
699 })
700 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
701 );
702
703 let mut channels = self
708 .chain_api
709 .stream_channels(ChannelSelector {
710 destination: self.me_onchain().into(),
711 ..Default::default()
712 })
713 .map_err(HoprLibError::chain)
714 .await?;
715
716 while let Some(channel) = channels.next().await {
717 self.node_db
718 .update_ticket_states_and_fetch(
719 [TicketSelector::from(&channel)
720 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
721 .with_index_range(channel.ticket_index..)],
722 AcknowledgedTicketStatus::Untouched,
723 )
724 .map_err(HoprLibError::db)
725 .await?
726 .for_each(|ticket| {
727 info!(%ticket, "fixed next out-of-sync ticket");
728 futures::future::ready(())
729 })
730 .await;
731 }
732
733 self.state.store(HoprState::Running, Ordering::Relaxed);
734
735 info!(
736 id = %self.me_peer_id(),
737 version = constants::APP_VERSION,
738 "NODE STARTED AND RUNNING"
739 );
740
741 #[cfg(all(feature = "prometheus", not(test)))]
742 METRIC_HOPR_NODE_INFO.set(
743 &[
744 &self.me.public().to_peerid_str(),
745 &self.me_onchain().to_string(),
746 &self.cfg.safe_module.safe_address.to_string(),
747 &self.cfg.safe_module.module_address.to_string(),
748 ],
749 1.0,
750 );
751
752 let _ = self.processes.set(processes);
753 Ok(hopr_socket)
754 }
755
756 pub fn shutdown(&self) -> Result<(), HoprLibError> {
764 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
765 if let Some(processes) = self.processes.get() {
766 processes.abort_all();
767 }
768 self.state.store(HoprState::Terminated, Ordering::Relaxed);
769 info!("NODE SHUTDOWN COMPLETE");
770 Ok(())
771 }
772
773 pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
775 self.winning_ticket_subscribers.1.activate_cloned()
776 }
777
778 pub fn me_peer_id(&self) -> PeerId {
781 (*self.me.public()).into()
782 }
783
784 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
786 Ok(self
787 .chain_api
788 .stream_accounts(AccountSelector {
789 public_only: true,
790 ..Default::default()
791 })
792 .map_err(HoprLibError::chain)
793 .await?
794 .map(|entry| {
795 (
796 PeerId::from(entry.public_key),
797 entry.chain_addr,
798 entry.get_multiaddrs().to_vec(),
799 )
800 })
801 .collect()
802 .await)
803 }
804
805 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
809 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
810
811 Ok(self.transport_api.ping(peer).await?)
812 }
813
814 #[cfg(feature = "session-client")]
817 pub async fn connect_to(
818 &self,
819 destination: Address,
820 target: SessionTarget,
821 cfg: SessionClientConfig,
822 ) -> errors::Result<HoprSession> {
823 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
824
825 let backoff = backon::ConstantBuilder::default()
826 .with_max_times(self.cfg.protocol.session.establish_max_retries as usize)
827 .with_delay(self.cfg.protocol.session.establish_retry_timeout)
828 .with_jitter();
829
830 use backon::Retryable;
831
832 Ok((|| {
833 let cfg = cfg.clone();
834 let target = target.clone();
835 async { self.transport_api.new_session(destination, target, cfg).await }
836 })
837 .retry(backoff)
838 .sleep(backon::FuturesTimerSleeper)
839 .await?)
840 }
841
842 #[cfg(feature = "session-client")]
845 pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
846 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
847 Ok(self.transport_api.probe_session(id).await?)
848 }
849
850 #[cfg(feature = "session-client")]
851 pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
852 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
853 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
854 }
855
856 #[cfg(feature = "session-client")]
857 pub async fn update_session_surb_balancer_config(
858 &self,
859 id: &SessionId,
860 cfg: SurbBalancerConfig,
861 ) -> errors::Result<()> {
862 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
863 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
864 }
865
866 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
868 self.transport_api.local_multiaddresses()
869 }
870
871 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
873 self.transport_api.listening_multiaddresses().await
874 }
875
876 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
878 self.transport_api.network_observed_multiaddresses(peer).await
879 }
880
881 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
883 let peer = *peer;
884 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
886
887 match self
888 .chain_api
889 .stream_accounts(AccountSelector {
890 public_only: false,
891 offchain_key: Some(pubkey),
892 ..Default::default()
893 })
894 .map_err(HoprLibError::chain)
895 .await?
896 .next()
897 .await
898 {
899 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
900 None => {
901 error!(%peer, "no information");
902 Ok(vec![])
903 }
904 }
905 }
906
907 pub async fn network_health(&self) -> Health {
911 self.transport_api.network_health().await
912 }
913
914 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
916 Ok(self.transport_api.network_connected_peers().await?)
917 }
918
919 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
921 Ok(self.transport_api.network_peer_observations(peer).await?)
922 }
923
924 pub async fn all_network_peers(
926 &self,
927 minimum_score: f64,
928 ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
929 Ok(
930 futures::stream::iter(self.transport_api.network_connected_peers().await?)
931 .filter_map(|peer| async move {
932 if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
933 if info.score() >= minimum_score {
934 Some((peer, info))
935 } else {
936 None
937 }
938 } else {
939 None
940 }
941 })
942 .filter_map(|(peer_id, info)| async move {
943 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
944 Some((address, peer_id, info))
945 })
946 .collect::<Vec<_>>()
947 .await,
948 )
949 }
950
951 pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
954 if let Some(channel) = self
955 .chain_api
956 .channel_by_id(channel_id)
957 .await
958 .map_err(|e| HoprTransportError::Other(e.into()))?
959 {
960 if &channel.destination == self.chain_api.me() {
961 Ok(Some(
962 self.node_db
963 .stream_tickets([&channel])
964 .await
965 .map_err(HoprLibError::db)?
966 .collect()
967 .await,
968 ))
969 } else {
970 Ok(None)
971 }
972 } else {
973 Ok(None)
974 }
975 }
976
977 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
979 Ok(self
980 .node_db
981 .stream_tickets(None::<TicketSelector>)
982 .await
983 .map_err(HoprLibError::db)?
984 .map(|v| v.ticket)
985 .collect()
986 .await)
987 }
988
989 pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
991 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
992 }
993
994 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
996 self.node_db
997 .reset_ticket_statistics()
998 .await
999 .map_err(HoprLibError::chain)
1000 }
1001
1002 pub fn me_onchain(&self) -> Address {
1004 *self.chain_api.me()
1005 }
1006
1007 pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
1009 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
1010 }
1011
1012 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1014 self.chain_api
1015 .minimum_incoming_ticket_win_prob()
1016 .await
1017 .map_err(HoprLibError::chain)
1018 }
1019
1020 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1022 Ok(self
1023 .chain_api
1024 .stream_accounts(AccountSelector {
1025 public_only: true,
1026 ..Default::default()
1027 })
1028 .map_err(HoprLibError::chain)
1029 .await?
1030 .collect()
1031 .await)
1032 }
1033
1034 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1037 self.chain_api
1038 .channel_by_id(channel_id)
1039 .await
1040 .map_err(HoprLibError::chain)
1041 }
1042
1043 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1048 self.chain_api
1049 .channel_by_parties(src, dest)
1050 .await
1051 .map_err(HoprLibError::chain)
1052 }
1053
1054 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1056 Ok(self
1057 .chain_api
1058 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1059 ChannelStatusDiscriminants::Closed,
1060 ChannelStatusDiscriminants::Open,
1061 ChannelStatusDiscriminants::PendingToClose,
1062 ]))
1063 .map_err(HoprLibError::chain)
1064 .await?
1065 .collect()
1066 .await)
1067 }
1068
1069 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1071 Ok(self
1072 .chain_api
1073 .stream_channels(
1074 ChannelSelector::default()
1075 .with_destination(*dest)
1076 .with_allowed_states(&[
1077 ChannelStatusDiscriminants::Closed,
1078 ChannelStatusDiscriminants::Open,
1079 ChannelStatusDiscriminants::PendingToClose,
1080 ]),
1081 )
1082 .map_err(HoprLibError::chain)
1083 .await?
1084 .collect()
1085 .await)
1086 }
1087
1088 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1090 Ok(self
1091 .chain_api
1092 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1093 ChannelStatusDiscriminants::Closed,
1094 ChannelStatusDiscriminants::Open,
1095 ChannelStatusDiscriminants::PendingToClose,
1096 ]))
1097 .map_err(HoprLibError::chain)
1098 .await?
1099 .collect()
1100 .await)
1101 }
1102
1103 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1105 self.chain_api
1106 .safe_allowance(self.cfg.safe_module.safe_address)
1107 .await
1108 .map_err(HoprLibError::chain)
1109 }
1110
1111 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1115 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1116
1117 self.chain_api
1118 .withdraw(amount, &recipient)
1119 .and_then(identity)
1120 .map_err(HoprLibError::chain)
1121 .await
1122 }
1123
1124 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1128 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1129
1130 self.chain_api
1131 .withdraw(amount, &recipient)
1132 .and_then(identity)
1133 .map_err(HoprLibError::chain)
1134 .await
1135 }
1136
1137 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1138 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1139
1140 let (channel_id, tx_hash) = self
1141 .chain_api
1142 .open_channel(destination, amount)
1143 .and_then(identity)
1144 .map_err(HoprLibError::chain)
1145 .await?;
1146
1147 Ok(OpenChannelResult { tx_hash, channel_id })
1148 }
1149
1150 pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1151 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1152
1153 self.chain_api
1154 .fund_channel(channel_id, amount)
1155 .and_then(identity)
1156 .map_err(HoprLibError::chain)
1157 .await
1158 }
1159
1160 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1161 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1162
1163 let tx_hash = self
1164 .chain_api
1165 .close_channel(channel_id)
1166 .and_then(identity)
1167 .map_err(HoprLibError::chain)
1168 .await?;
1169
1170 Ok(CloseChannelResult { tx_hash })
1171 }
1172
1173 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1174 self.chain_api
1175 .channel_closure_notice_period()
1176 .await
1177 .map_err(HoprLibError::chain)
1178 }
1179
1180 pub fn redemption_requests(
1181 &self,
1182 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1183 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1184
1185 Ok(self
1187 .redeem_requests
1188 .get()
1189 .cloned()
1190 .expect("redeem_requests is not initialized")
1191 .sink_map_err(HoprLibError::other))
1192 }
1193
1194 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1195 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1196
1197 let min_value = min_value.into();
1198
1199 self.chain_api
1200 .stream_channels(
1201 ChannelSelector::default()
1202 .with_destination(self.me_onchain())
1203 .with_allowed_states(&[
1204 ChannelStatusDiscriminants::Open,
1205 ChannelStatusDiscriminants::PendingToClose,
1206 ]),
1207 )
1208 .map_err(HoprLibError::chain)
1209 .await?
1210 .map(|channel| {
1211 Ok(TicketSelector::from(&channel)
1212 .with_amount(min_value..)
1213 .with_index_range(channel.ticket_index..)
1214 .with_state(AcknowledgedTicketStatus::Untouched))
1215 })
1216 .forward(self.redemption_requests()?)
1217 .await?;
1218
1219 Ok(())
1220 }
1221
1222 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1223 &self,
1224 counterparty: &Address,
1225 min_value: B,
1226 ) -> errors::Result<()> {
1227 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1228 .await
1229 }
1230
1231 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1232 &self,
1233 channel_id: &Hash,
1234 min_value: B,
1235 ) -> errors::Result<()> {
1236 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1237
1238 let channel = self
1239 .chain_api
1240 .channel_by_id(channel_id)
1241 .await
1242 .map_err(HoprLibError::chain)?
1243 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1244
1245 self.redemption_requests()?
1246 .send(
1247 TicketSelector::from(channel)
1248 .with_amount(min_value.into()..)
1249 .with_index_range(channel.ticket_index..)
1250 .with_state(AcknowledgedTicketStatus::Untouched),
1251 )
1252 .await?;
1253
1254 Ok(())
1255 }
1256
1257 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1258 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1259
1260 self.redemption_requests()?
1261 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1262 .await?;
1263
1264 Ok(())
1265 }
1266
1267 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1268 let peer_id = *peer_id;
1269 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1271 .await
1272 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1273
1274 self.chain_api
1275 .packet_key_to_chain_key(&pubkey)
1276 .await
1277 .map_err(HoprLibError::chain)
1278 }
1279
1280 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1281 self.chain_api
1282 .chain_key_to_packet_key(address)
1283 .await
1284 .map(|pk| pk.map(|v| v.into()))
1285 .map_err(HoprLibError::chain)
1286 }
1287}
1288
1289impl<Chain, Db> Hopr<Chain, Db> {
1290 pub fn collect_hopr_metrics() -> errors::Result<String> {
1293 cfg_if::cfg_if! {
1294 if #[cfg(all(feature = "prometheus", not(test)))] {
1295 hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1296 } else {
1297 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1298 }
1299 }
1300 }
1301}