1mod helpers;
18
19pub mod config;
21pub mod constants;
23pub mod errors;
25
26pub mod utils;
28
29pub mod traits;
31
32pub mod state;
34
35#[cfg(any(feature = "testing", test))]
36pub mod testing;
37
38pub use hopr_api as api;
39
40#[doc(hidden)]
42pub mod exports {
43 pub mod types {
44 pub use hopr_internal_types as internal;
45 pub use hopr_primitive_types as primitive;
46 }
47
48 pub mod crypto {
49 pub use hopr_crypto_keypair as keypair;
50 pub use hopr_crypto_types as types;
51 }
52
53 pub mod network {
54 pub use hopr_network_types as types;
55 }
56
57 pub use hopr_transport as transport;
58}
59
60#[doc(hidden)]
62pub mod prelude {
63 pub use super::exports::{
64 crypto::{
65 keypair::key_pair::HoprKeys,
66 types::prelude::{ChainKeypair, Hash, OffchainKeypair},
67 },
68 network::types::{
69 prelude::ForeignDataMode,
70 udp::{ConnectedUdpStream, UdpStreamParallelism},
71 },
72 transport::{OffchainPublicKey, socket::HoprSocket},
73 types::primitive::prelude::Address,
74 };
75}
76
77use std::{
78 convert::identity,
79 num::NonZeroUsize,
80 sync::{Arc, OnceLock, atomic::Ordering},
81 time::Duration,
82};
83
84use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt, channel::mpsc::channel};
85use hopr_api::{
86 chain::{AccountSelector, AnnouncementError, ChannelSelector, *},
87 ct::TrafficGeneration,
88 db::{HoprNodeDbApi, TicketMarker, TicketSelector},
89};
90pub use hopr_api::{db::ChannelTicketStatistics, network::Observable};
91use hopr_async_runtime::prelude::spawn;
92pub use hopr_async_runtime::{Abortable, AbortableList};
93pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
94pub use hopr_crypto_types::prelude::*;
95pub use hopr_internal_types::prelude::*;
96pub use hopr_network_types::prelude::*;
97#[cfg(all(feature = "prometheus", not(test)))]
98use hopr_platform::time::native::current_time;
99pub use hopr_primitive_types::prelude::*;
100use hopr_transport::errors::HoprTransportError;
101#[cfg(feature = "runtime-tokio")]
102pub use hopr_transport::transfer_session;
103pub use hopr_transport::*;
104use tracing::{debug, error, info, warn};
105use validator::Validate;
106
107pub use crate::{
108 config::SafeModule,
109 constants::{MIN_NATIVE_BALANCE, SUGGESTED_NATIVE_BALANCE},
110 errors::{HoprLibError, HoprStatusError},
111 state::{HoprLibProcess, HoprState},
112 traits::chain::{CloseChannelResult, OpenChannelResult},
113};
114
// Metrics are only compiled in when the `prometheus` feature is enabled and the crate
// is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge holding the process start time (set in `Hopr::new`).
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_start_time",
            "The unix timestamp in seconds at which the process was started"
        )
        .unwrap();

    // Gauge labelled with the executed hopr-lib version (set in `Hopr::new`).
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new(
            "hopr_lib_version",
            "Executed version of hopr-lib",
            &["version"]
        )
        .unwrap();

    // Gauge labelled with the node's identifiers (set at the end of `Hopr::run`).
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new(
            "hopr_node_addresses",
            "Node on-chain and off-chain addresses",
            &["peerid", "address", "safe_address", "module_address"]
        )
        .unwrap();
}
132
/// Initializes the CPU thread pool and builds a multi-threaded Tokio runtime.
///
/// # Arguments
/// * `num_cpu_threads` - number of threads for the CPU-bound pool; when `None`, half of the
///   available parallelism is used.
/// * `num_io_threads` - number of Tokio worker threads; when `None`, half of the available
///   parallelism is used.
///
/// Both thread counts are clamped to at least 1. The per-thread stack size is read from the
/// `HOPRD_THREAD_STACK_SIZE` environment variable (default 10 MiB, never less than 2 MiB).
///
/// # Errors
/// Fails when a thread count is neither given nor derivable from the available parallelism,
/// when the CPU thread pool cannot be initialized, or when the runtime cannot be built.
#[cfg(feature = "runtime-tokio")]
pub fn prepare_tokio_runtime(
    num_cpu_threads: Option<NonZeroUsize>,
    num_io_threads: Option<NonZeroUsize>,
) -> anyhow::Result<tokio::runtime::Runtime> {
    const DEFAULT_STACK_SIZE: usize = 10 * 1024 * 1024;
    const MINIMUM_STACK_SIZE: usize = 2 * 1024 * 1024;

    let avail_parallelism = std::thread::available_parallelism().ok().map(|v| v.get() / 2);

    // `ok_or_else` builds the error lazily, only when no thread count could be determined.
    let cpu_threads = num_cpu_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of CPU threads to use. Please set the HOPRD_NUM_CPU_THREADS \
                 environment variable."
            )
        })?
        .max(1);
    hopr_parallelize::cpu::init_thread_pool(cpu_threads)?;

    let io_threads = num_io_threads
        .map(NonZeroUsize::get)
        .or(avail_parallelism)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Could not determine the number of IO threads to use. Please set the HOPRD_NUM_IO_THREADS \
                 environment variable."
            )
        })?
        .max(1);

    // An unset or unparsable variable falls back to the default stack size.
    let stack_size = std::env::var("HOPRD_THREAD_STACK_SIZE")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(DEFAULT_STACK_SIZE)
        .max(MINIMUM_STACK_SIZE);

    Ok(tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(io_threads)
        .thread_name("hoprd")
        .thread_stack_size(stack_size)
        .build()?)
}
178
179pub type HoprTransportIO = socket::HoprSocket<
181 futures::channel::mpsc::Receiver<ApplicationDataIn>,
182 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
183>;
184
185type NewTicketEvents = (
186 async_broadcast::Sender<VerifiedTicket>,
187 async_broadcast::InactiveReceiver<VerifiedTicket>,
188);
189
190pub struct Hopr<Chain, Db> {
202 me: OffchainKeypair,
203 cfg: config::HoprLibConfig,
204 state: Arc<state::AtomicHoprState>,
205 transport_api: HoprTransport<Chain, Db>,
206 redeem_requests: OnceLock<futures::channel::mpsc::Sender<TicketSelector>>,
207 node_db: Db,
208 chain_api: Chain,
209 winning_ticket_subscribers: NewTicketEvents,
210 processes: OnceLock<AbortableList<HoprLibProcess>>,
211}
212
213impl<Chain, Db> Hopr<Chain, Db>
214where
215 Chain: HoprChainApi + Clone + Send + Sync + 'static,
216 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
217{
218 pub async fn new(
219 identity: (&ChainKeypair, &OffchainKeypair),
220 hopr_chain_api: Chain,
221 hopr_node_db: Db,
222 cfg: config::HoprLibConfig,
223 ) -> errors::Result<Self> {
224 if hopr_crypto_random::is_rng_fixed() {
225 warn!("!! FOR TESTING ONLY !! THIS BUILD IS USING AN INSECURE FIXED RNG !!")
226 }
227
228 cfg.validate()?;
229
230 let hopr_transport_api = HoprTransport::new(
231 identity,
232 hopr_chain_api.clone(),
233 hopr_node_db.clone(),
234 vec![(&cfg.host).try_into().map_err(HoprLibError::TransportError)?],
235 cfg.protocol,
236 );
237
238 #[cfg(all(feature = "prometheus", not(test)))]
239 {
240 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
241 METRIC_HOPR_LIB_VERSION.set(
242 &[const_format::formatcp!("{}", constants::APP_VERSION)],
243 const_format::formatcp!(
244 "{}.{}",
245 env!("CARGO_PKG_VERSION_MAJOR"),
246 env!("CARGO_PKG_VERSION_MINOR")
247 )
248 .parse()
249 .unwrap_or(0.0),
250 );
251
252 if let Err(error) = hopr_node_db.get_ticket_statistics(None).await {
254 error!(%error, "failed to initialize ticket statistics metrics");
255 }
256 }
257
258 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
259 new_tickets_tx.set_await_active(false);
260 new_tickets_tx.set_overflow(true);
261
262 Ok(Self {
263 me: identity.1.clone(),
264 cfg,
265 state: Arc::new(state::AtomicHoprState::new(HoprState::Uninitialized)),
266 transport_api: hopr_transport_api,
267 chain_api: hopr_chain_api,
268 node_db: hopr_node_db,
269 redeem_requests: OnceLock::new(),
270 processes: OnceLock::new(),
271 winning_ticket_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
272 })
273 }
274
275 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
276 if self.status() == state {
277 Ok(())
278 } else {
279 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
280 }
281 }
282
283 pub fn status(&self) -> HoprState {
284 self.state.load(Ordering::Relaxed)
285 }
286
287 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
288 self.chain_api
289 .balance(self.me_onchain())
290 .await
291 .map_err(HoprLibError::chain)
292 }
293
294 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
295 self.chain_api
296 .balance(self.cfg.safe_module.safe_address)
297 .await
298 .map_err(HoprLibError::chain)
299 }
300
301 pub async fn chain_info(&self) -> errors::Result<ChainInfo> {
302 self.chain_api.chain_info().await.map_err(HoprLibError::chain)
303 }
304
305 pub fn get_safe_config(&self) -> SafeModule {
306 self.cfg.safe_module.clone()
307 }
308
309 pub fn config(&self) -> &config::HoprLibConfig {
310 &self.cfg
311 }
312
313 #[inline]
314 fn is_public(&self) -> bool {
315 self.cfg.publish
316 }
317
318 pub async fn run<
319 Ct,
320 #[cfg(feature = "session-server")] T: traits::session::HoprSessionServer + Clone + Send + 'static,
321 >(
322 &self,
323 cover_traffic: Ct,
324 #[cfg(feature = "session-server")] serve_handler: T,
325 ) -> errors::Result<HoprTransportIO>
326 where
327 Ct: TrafficGeneration + Send + Sync + 'static,
328 {
329 self.error_if_not_in_state(
330 HoprState::Uninitialized,
331 "cannot start the hopr node multiple times".into(),
332 )?;
333
334 #[cfg(feature = "testing")]
335 warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
336
337 info!(
338 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
339 "node is not started, please fund this node",
340 );
341
342 helpers::wait_for_funds(
343 *MIN_NATIVE_BALANCE,
344 *SUGGESTED_NATIVE_BALANCE,
345 Duration::from_secs(200),
346 self.me_onchain(),
347 &self.chain_api,
348 )
349 .await?;
350
351 let mut processes = AbortableList::<HoprLibProcess>::default();
352
353 info!("starting HOPR node...");
354 self.state.store(HoprState::Initializing, Ordering::Relaxed);
355
356 let balance: XDaiBalance = self.get_balance().await?;
357 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
358
359 info!(
360 address = %self.me_onchain(),
361 %balance,
362 %minimum_balance,
363 "node information"
364 );
365
366 if balance.le(&minimum_balance) {
367 return Err(HoprLibError::GeneralError(
368 "cannot start the node without a sufficiently funded wallet".into(),
369 ));
370 }
371
372 let network_min_ticket_price = self
375 .chain_api
376 .minimum_ticket_price()
377 .await
378 .map_err(HoprLibError::chain)?;
379 let configured_ticket_price = self.cfg.protocol.packet.codec.outgoing_ticket_price;
380 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
381 return Err(HoprLibError::GeneralError(format!(
382 "configured outgoing ticket price is lower than the network minimum ticket price: \
383 {configured_ticket_price:?} < {network_min_ticket_price}"
384 )));
385 }
386 let network_min_win_prob = self
389 .chain_api
390 .minimum_incoming_ticket_win_prob()
391 .await
392 .map_err(HoprLibError::chain)?;
393 let configured_win_prob = self.cfg.protocol.packet.codec.outgoing_win_prob;
394 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
395 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
396 {
397 return Err(HoprLibError::GeneralError(format!(
398 "configured outgoing ticket winning probability is lower than the network minimum winning \
399 probability: {configured_win_prob:?} < {network_min_win_prob}"
400 )));
401 }
402
403 let minimum_capacity = self
406 .chain_api
407 .count_accounts(AccountSelector {
408 public_only: true,
409 ..Default::default()
410 })
411 .await
412 .map_err(HoprLibError::chain)?
413 .saturating_mul(2)
414 .saturating_add(100);
415
416 let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
417 .ok()
418 .and_then(|s| s.trim().parse::<usize>().ok())
419 .filter(|&c| c > 0)
420 .unwrap_or(2048)
421 .max(minimum_capacity);
422
423 debug!(
424 capacity = chain_discovery_events_capacity,
425 minimum_required = minimum_capacity,
426 "creating chain discovery events channel"
427 );
428 let (indexer_peer_update_tx, indexer_peer_update_rx) =
429 channel::<PeerDiscovery>(chain_discovery_events_capacity);
430
431 let (announcements_stream, announcements_handle) = futures::stream::abortable(
434 self.chain_api
435 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
436 .map_err(HoprLibError::chain)?,
437 );
438 processes.insert(HoprLibProcess::AccountAnnouncements, announcements_handle);
439
440 spawn(
441 announcements_stream
442 .filter_map(|event| {
443 futures::future::ready(event.try_as_announcement().map(|account| {
444 PeerDiscovery::Announce(account.public_key.into(), account.get_multiaddrs().to_vec())
445 }))
446 })
447 .map(Ok)
448 .forward(indexer_peer_update_tx)
449 .inspect(
450 |_| warn!(task = %HoprLibProcess::AccountAnnouncements,"long-running background task finished"),
451 ),
452 );
453
454 info!(peer_id = %self.me_peer_id(), address = %self.me_onchain(), version = constants::APP_VERSION, "Node information");
455
456 let safe_addr = self.cfg.safe_module.safe_address;
457
458 if self.me_onchain() == safe_addr {
459 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
460 }
461
462 info!(%safe_addr, "registering safe with this node");
463 match self.chain_api.register_safe(&safe_addr).await {
464 Ok(awaiter) => {
465 awaiter.await.map_err(|error| {
467 error!(%safe_addr, %error, "safe registration failed with error");
468 HoprLibError::chain(error)
469 })?;
470 info!(%safe_addr, "safe successfully registered with this node");
471 }
472 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe == safe_addr => {
473 info!(%safe_addr, "this safe is already registered with this node");
474 }
475 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) if registered_safe != safe_addr => {
476 error!(%safe_addr, %registered_safe, "this node is currently registered with different safe");
478 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
479 }
480 Err(error) => {
481 error!(%safe_addr, %error, "safe registration failed");
482 return Err(HoprLibError::chain(error));
483 }
484 }
485
486 let multiaddresses_to_announce = if self.is_public() {
488 self.transport_api.announceable_multiaddresses()
491 } else {
492 Vec::with_capacity(0)
493 };
494
495 multiaddresses_to_announce
497 .iter()
498 .filter(|a| !is_public_address(a))
499 .for_each(|multi_addr| warn!(?multi_addr, "announcing private multiaddress"));
500
501 info!(?multiaddresses_to_announce, "announcing node on chain");
504 match self.chain_api.announce(&multiaddresses_to_announce, &self.me).await {
505 Ok(awaiter) => {
506 awaiter.await.map_err(|error| {
508 error!(?multiaddresses_to_announce, %error, "node announcement failed");
509 HoprLibError::chain(error)
510 })?;
511 info!(?multiaddresses_to_announce, "node has been successfully announced");
512 }
513 Err(AnnouncementError::AlreadyAnnounced) => {
514 info!(multiaddresses_announced = ?multiaddresses_to_announce, "node already announced on chain")
515 }
516 Err(error) => {
517 error!(%error, ?multiaddresses_to_announce, "failed to transmit node announcement");
518 return Err(HoprLibError::chain(error));
519 }
520 }
521
522 let incoming_session_channel_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
523 .ok()
524 .and_then(|s| s.trim().parse::<usize>().ok())
525 .filter(|&c| c > 0)
526 .unwrap_or(256);
527
528 let (session_tx, _session_rx) = channel::<IncomingSession>(incoming_session_channel_capacity);
529 #[cfg(feature = "session-server")]
530 {
531 debug!(capacity = incoming_session_channel_capacity, "creating session server");
532 processes.insert(
533 HoprLibProcess::SessionServer,
534 hopr_async_runtime::spawn_as_abortable!(
535 _session_rx
536 .for_each_concurrent(None, move |session| {
537 let serve_handler = serve_handler.clone();
538 async move {
539 let session_id = *session.session.id();
540 match serve_handler.process(session).await {
541 Ok(_) => debug!(?session_id, "client session processed successfully"),
542 Err(error) => error!(
543 ?session_id,
544 %error,
545 "client session processing failed"
546 ),
547 }
548 }
549 })
550 .inspect(|_| tracing::warn!(
551 task = %HoprLibProcess::SessionServer,
552 "long-running background task finished"
553 ))
554 ),
555 );
556 }
557
558 info!("starting ticket events processor");
559 let (tickets_tx, tickets_rx) = channel(1024);
560 let (tickets_rx, tickets_handle) = futures::stream::abortable(tickets_rx);
561 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
562 let node_db = self.node_db.clone();
563 let new_ticket_tx = self.winning_ticket_subscribers.0.clone();
564 spawn(
565 tickets_rx
566 .filter_map(move |ticket_event| {
567 let node_db = node_db.clone();
568 async move {
569 match ticket_event {
570 TicketEvent::WinningTicket(winning) => {
571 if let Err(error) = node_db.insert_ticket(*winning).await {
572 tracing::error!(%error, %winning, "failed to insert ticket into database");
573 } else {
574 tracing::debug!(%winning, "inserted ticket into database");
575 }
576 Some(winning)
577 }
578 TicketEvent::RejectedTicket(rejected, issuer) => {
579 if let Some(issuer) = &issuer {
580 if let Err(error) =
581 node_db.mark_unsaved_ticket_rejected(issuer, rejected.as_ref()).await
582 {
583 tracing::error!(%error, %rejected, "failed to mark ticket as rejected");
584 } else {
585 tracing::debug!(%rejected, "marked ticket as rejected");
586 }
587 } else {
588 tracing::debug!(%rejected, "issuer of the rejected ticket could not be determined");
589 }
590 None
591 }
592 }
593 }
594 })
595 .for_each(move |ticket| {
596 if let Err(error) = new_ticket_tx.try_broadcast(ticket.ticket) {
597 tracing::error!(%error, "failed to broadcast new winning ticket to subscribers");
598 }
599 futures::future::ready(())
600 })
601 .inspect(|_| {
602 tracing::warn!(
603 task = %HoprLibProcess::TicketEvents,
604 "long-running background task finished"
605 )
606 }),
607 );
608
609 info!("starting transport");
610 let (hopr_socket, transport_processes) = self
611 .transport_api
612 .run(cover_traffic, indexer_peer_update_rx, tickets_tx, session_tx)
613 .await?;
614 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
615
616 let (redemption_req_tx, redemption_req_rx) = channel::<TicketSelector>(1024);
618 let _ = self.redeem_requests.set(redemption_req_tx);
619 let (redemption_req_rx, redemption_req_handle) = futures::stream::abortable(redemption_req_rx);
620 processes.insert(HoprLibProcess::TicketRedemptions, redemption_req_handle);
621 let chain = self.chain_api.clone();
622 let node_db = self.node_db.clone();
623 spawn(redemption_req_rx
624 .for_each(move |selector| {
625 let chain = chain.clone();
626 let db = node_db.clone();
627 async move {
628 match chain.redeem_tickets_via_selectors(&db, [selector]).await {
629 Ok(res) => debug!(%res, "redemption complete"),
630 Err(error) => error!(%error, "redemption failed"),
631 }
632 }
633 })
634 .inspect(|_| tracing::warn!(task = %HoprLibProcess::TicketRedemptions, "long-running background task finished"))
635 );
636
637 let (chain_events_sub_handle, chain_events_sub_reg) = hopr_async_runtime::AbortHandle::new_pair();
638 processes.insert(HoprLibProcess::ChannelEvents, chain_events_sub_handle);
639 let chain = self.chain_api.clone();
640 let node_db = self.node_db.clone();
641 let events = chain.subscribe().map_err(HoprLibError::chain)?;
642 spawn(
643 futures::stream::Abortable::new(
644 events
645 .filter_map(move |event|
646 futures::future::ready(event.try_as_channel_closed())
647 ),
648 chain_events_sub_reg
649 )
650 .for_each(move |closed_channel| {
651 let node_db = node_db.clone();
652 let chain = chain.clone();
653 async move {
654 match closed_channel.direction(chain.me()) {
655 Some(ChannelDirection::Incoming) => {
656 match node_db.mark_tickets_as([&closed_channel], TicketMarker::Neglected).await {
657 Ok(num_neglected) if num_neglected > 0 => {
658 warn!(%num_neglected, %closed_channel, "tickets on incoming closed channel were neglected");
659 },
660 Ok(_) => {
661 debug!(%closed_channel, "no neglected tickets on incoming closed channel");
662 },
663 Err(error) => {
664 error!(%error, %closed_channel, "failed to mark tickets on incoming closed channel as neglected");
665 }
666 }
667 },
668 Some(ChannelDirection::Outgoing) => {
669 if let Err(error) = node_db.remove_outgoing_ticket_index(closed_channel.get_id(), closed_channel.channel_epoch).await {
670 error!(%error, %closed_channel, "failed to reset ticket index on closed outgoing channel");
671 } else {
672 debug!(%closed_channel, "outgoing ticket index has been resets on outgoing channel closure");
673 }
674 }
675 _ => {} }
677 }
678 })
679 .inspect(|_| tracing::warn!(task = %HoprLibProcess::ChannelEvents, "long-running background task finished"))
680 );
681
682 let mut channels = self
687 .chain_api
688 .stream_channels(ChannelSelector {
689 destination: self.me_onchain().into(),
690 ..Default::default()
691 })
692 .map_err(HoprLibError::chain)
693 .await?;
694
695 while let Some(channel) = channels.next().await {
696 self.node_db
697 .update_ticket_states_and_fetch(
698 [TicketSelector::from(&channel)
699 .with_state(AcknowledgedTicketStatus::BeingRedeemed)
700 .with_index_range(channel.ticket_index..)],
701 AcknowledgedTicketStatus::Untouched,
702 )
703 .map_err(HoprLibError::db)
704 .await?
705 .for_each(|ticket| {
706 info!(%ticket, "fixed next out-of-sync ticket");
707 futures::future::ready(())
708 })
709 .await;
710 }
711
712 self.state.store(HoprState::Running, Ordering::Relaxed);
713
714 info!(
715 id = %self.me_peer_id(),
716 version = constants::APP_VERSION,
717 "NODE STARTED AND RUNNING"
718 );
719
720 #[cfg(all(feature = "prometheus", not(test)))]
721 METRIC_HOPR_NODE_INFO.set(
722 &[
723 &self.me.public().to_peerid_str(),
724 &self.me_onchain().to_string(),
725 &self.cfg.safe_module.safe_address.to_string(),
726 &self.cfg.safe_module.module_address.to_string(),
727 ],
728 1.0,
729 );
730
731 let _ = self.processes.set(processes);
732 Ok(hopr_socket)
733 }
734
735 pub fn shutdown(&self) -> Result<(), HoprLibError> {
743 self.error_if_not_in_state(HoprState::Running, "node is not running".into())?;
744 if let Some(processes) = self.processes.get() {
745 processes.abort_all();
746 }
747 self.state.store(HoprState::Terminated, Ordering::Relaxed);
748 info!("NODE SHUTDOWN COMPLETE");
749 Ok(())
750 }
751
752 pub fn subscribe_winning_tickets(&self) -> impl Stream<Item = VerifiedTicket> + Send {
754 self.winning_ticket_subscribers.1.activate_cloned()
755 }
756
757 pub fn me_peer_id(&self) -> PeerId {
760 (*self.me.public()).into()
761 }
762
763 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
765 Ok(self
766 .chain_api
767 .stream_accounts(AccountSelector {
768 public_only: true,
769 ..Default::default()
770 })
771 .map_err(HoprLibError::chain)
772 .await?
773 .map(|entry| {
774 (
775 PeerId::from(entry.public_key),
776 entry.chain_addr,
777 entry.get_multiaddrs().to_vec(),
778 )
779 })
780 .collect()
781 .await)
782 }
783
784 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
788 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
789
790 Ok(self.transport_api.ping(peer).await?)
791 }
792
/// Opens a new session to `destination` for the given `target`, retrying with a constant
/// (jittered) delay as configured in the session protocol settings.
///
/// # Errors
/// Fails when the node is not `Running` or when all establishment attempts fail.
#[cfg(feature = "session-client")]
pub async fn connect_to(
    &self,
    destination: Address,
    target: SessionTarget,
    cfg: SessionClientConfig,
) -> errors::Result<HoprSession> {
    use backon::Retryable;

    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let session_cfg = &self.cfg.protocol.session;
    let backoff = backon::ConstantBuilder::default()
        .with_max_times(session_cfg.establish_max_retries as usize)
        .with_delay(session_cfg.establish_retry_timeout)
        .with_jitter();

    // Every attempt gets its own copies of the target and the client configuration.
    let attempt = || {
        let cfg = cfg.clone();
        let target = target.clone();
        async { self.transport_api.new_session(destination, target, cfg).await }
    };

    let session = attempt.retry(backoff).sleep(backon::FuturesTimerSleeper).await?;
    Ok(session)
}
820
/// Probes the session `id` through the transport.
///
/// # Errors
/// Fails when the node is not `Running` or when the transport probe fails.
#[cfg(feature = "session-client")]
pub async fn keep_alive_session(&self, id: &SessionId) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    self.transport_api.probe_session(id).await?;
    Ok(())
}
828
/// Returns the SURB balancer configuration of session `id`, if the transport reports one.
///
/// # Errors
/// Fails when the node is not `Running` or when the transport call fails.
#[cfg(feature = "session-client")]
pub async fn get_session_surb_balancer_config(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let balancer_cfg = self.transport_api.session_surb_balancing_cfg(id).await?;
    Ok(balancer_cfg)
}
834
/// Replaces the SURB balancer configuration of session `id` with `cfg`.
///
/// # Errors
/// Fails when the node is not `Running` or when the transport call fails.
#[cfg(feature = "session-client")]
pub async fn update_session_surb_balancer_config(
    &self,
    id: &SessionId,
    cfg: SurbBalancerConfig,
) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?;
    Ok(())
}
844
845 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
847 self.transport_api.local_multiaddresses()
848 }
849
850 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
852 self.transport_api.listening_multiaddresses().await
853 }
854
855 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
857 self.transport_api.network_observed_multiaddresses(peer).await
858 }
859
860 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> errors::Result<Vec<Multiaddr>> {
862 let peer = *peer;
863 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await?;
865
866 match self
867 .chain_api
868 .stream_accounts(AccountSelector {
869 public_only: false,
870 offchain_key: Some(pubkey),
871 ..Default::default()
872 })
873 .map_err(HoprLibError::chain)
874 .await?
875 .next()
876 .await
877 {
878 Some(entry) => Ok(entry.get_multiaddrs().to_vec()),
879 None => {
880 error!(%peer, "no information");
881 Ok(vec![])
882 }
883 }
884 }
885
886 pub async fn network_health(&self) -> Health {
890 self.transport_api.network_health().await
891 }
892
893 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
895 Ok(self.transport_api.network_connected_peers().await?)
896 }
897
898 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
900 Ok(self.transport_api.network_peer_observations(peer).await?)
901 }
902
903 pub async fn all_network_peers(
905 &self,
906 minimum_score: f64,
907 ) -> errors::Result<Vec<(Option<Address>, PeerId, Observations)>> {
908 Ok(
909 futures::stream::iter(self.transport_api.network_connected_peers().await?)
910 .filter_map(|peer| async move {
911 if let Ok(Some(info)) = self.transport_api.network_peer_observations(&peer).await {
912 if info.score() >= minimum_score {
913 Some((peer, info))
914 } else {
915 None
916 }
917 } else {
918 None
919 }
920 })
921 .filter_map(|(peer_id, info)| async move {
922 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
923 Some((address, peer_id, info))
924 })
925 .collect::<Vec<_>>()
926 .await,
927 )
928 }
929
930 pub async fn tickets_in_channel(&self, channel_id: &ChannelId) -> errors::Result<Option<Vec<RedeemableTicket>>> {
933 if let Some(channel) = self
934 .chain_api
935 .channel_by_id(channel_id)
936 .await
937 .map_err(|e| HoprTransportError::Other(e.into()))?
938 {
939 if &channel.destination == self.chain_api.me() {
940 Ok(Some(
941 self.node_db
942 .stream_tickets([&channel])
943 .await
944 .map_err(HoprLibError::db)?
945 .collect()
946 .await,
947 ))
948 } else {
949 Ok(None)
950 }
951 } else {
952 Ok(None)
953 }
954 }
955
956 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
958 Ok(self
959 .node_db
960 .stream_tickets(None::<TicketSelector>)
961 .await
962 .map_err(HoprLibError::db)?
963 .map(|v| v.ticket)
964 .collect()
965 .await)
966 }
967
968 pub async fn ticket_statistics(&self) -> errors::Result<ChannelTicketStatistics> {
970 self.node_db.get_ticket_statistics(None).await.map_err(HoprLibError::db)
971 }
972
973 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
975 self.node_db
976 .reset_ticket_statistics()
977 .await
978 .map_err(HoprLibError::chain)
979 }
980
981 pub fn me_onchain(&self) -> Address {
983 *self.chain_api.me()
984 }
985
986 pub async fn get_ticket_price(&self) -> errors::Result<HoprBalance> {
988 self.chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)
989 }
990
991 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
993 self.chain_api
994 .minimum_incoming_ticket_win_prob()
995 .await
996 .map_err(HoprLibError::chain)
997 }
998
999 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1001 Ok(self
1002 .chain_api
1003 .stream_accounts(AccountSelector {
1004 public_only: true,
1005 ..Default::default()
1006 })
1007 .map_err(HoprLibError::chain)
1008 .await?
1009 .collect()
1010 .await)
1011 }
1012
1013 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1016 self.chain_api
1017 .channel_by_id(channel_id)
1018 .await
1019 .map_err(HoprLibError::chain)
1020 }
1021
1022 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<Option<ChannelEntry>> {
1027 self.chain_api
1028 .channel_by_parties(src, dest)
1029 .await
1030 .map_err(HoprLibError::chain)
1031 }
1032
1033 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1035 Ok(self
1036 .chain_api
1037 .stream_channels(ChannelSelector::default().with_source(*src).with_allowed_states(&[
1038 ChannelStatusDiscriminants::Closed,
1039 ChannelStatusDiscriminants::Open,
1040 ChannelStatusDiscriminants::PendingToClose,
1041 ]))
1042 .map_err(HoprLibError::chain)
1043 .await?
1044 .collect()
1045 .await)
1046 }
1047
1048 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1050 Ok(self
1051 .chain_api
1052 .stream_channels(
1053 ChannelSelector::default()
1054 .with_destination(*dest)
1055 .with_allowed_states(&[
1056 ChannelStatusDiscriminants::Closed,
1057 ChannelStatusDiscriminants::Open,
1058 ChannelStatusDiscriminants::PendingToClose,
1059 ]),
1060 )
1061 .map_err(HoprLibError::chain)
1062 .await?
1063 .collect()
1064 .await)
1065 }
1066
1067 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1069 Ok(self
1070 .chain_api
1071 .stream_channels(ChannelSelector::default().with_allowed_states(&[
1072 ChannelStatusDiscriminants::Closed,
1073 ChannelStatusDiscriminants::Open,
1074 ChannelStatusDiscriminants::PendingToClose,
1075 ]))
1076 .map_err(HoprLibError::chain)
1077 .await?
1078 .collect()
1079 .await)
1080 }
1081
1082 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1084 self.chain_api
1085 .safe_allowance(self.cfg.safe_module.safe_address)
1086 .await
1087 .map_err(HoprLibError::chain)
1088 }
1089
1090 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1094 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1095
1096 self.chain_api
1097 .withdraw(amount, &recipient)
1098 .and_then(identity)
1099 .map_err(HoprLibError::chain)
1100 .await
1101 }
1102
1103 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<prelude::Hash> {
1107 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1108
1109 self.chain_api
1110 .withdraw(amount, &recipient)
1111 .and_then(identity)
1112 .map_err(HoprLibError::chain)
1113 .await
1114 }
1115
1116 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1117 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1118
1119 let (channel_id, tx_hash) = self
1120 .chain_api
1121 .open_channel(destination, amount)
1122 .and_then(identity)
1123 .map_err(HoprLibError::chain)
1124 .await?;
1125
1126 Ok(OpenChannelResult { tx_hash, channel_id })
1127 }
1128
1129 pub async fn fund_channel(&self, channel_id: &prelude::Hash, amount: HoprBalance) -> errors::Result<prelude::Hash> {
1130 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1131
1132 self.chain_api
1133 .fund_channel(channel_id, amount)
1134 .and_then(identity)
1135 .map_err(HoprLibError::chain)
1136 .await
1137 }
1138
1139 pub async fn close_channel_by_id(&self, channel_id: &ChannelId) -> errors::Result<CloseChannelResult> {
1140 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1141
1142 let tx_hash = self
1143 .chain_api
1144 .close_channel(channel_id)
1145 .and_then(identity)
1146 .map_err(HoprLibError::chain)
1147 .await?;
1148
1149 Ok(CloseChannelResult { tx_hash })
1150 }
1151
1152 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1153 self.chain_api
1154 .channel_closure_notice_period()
1155 .await
1156 .map_err(HoprLibError::chain)
1157 }
1158
1159 pub fn redemption_requests(
1160 &self,
1161 ) -> errors::Result<impl futures::Sink<TicketSelector, Error = HoprLibError> + Clone> {
1162 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1163
1164 Ok(self
1166 .redeem_requests
1167 .get()
1168 .cloned()
1169 .expect("redeem_requests is not initialized")
1170 .sink_map_err(HoprLibError::other))
1171 }
1172
1173 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(&self, min_value: B) -> errors::Result<()> {
1174 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1175
1176 let min_value = min_value.into();
1177
1178 self.chain_api
1179 .stream_channels(
1180 ChannelSelector::default()
1181 .with_destination(self.me_onchain())
1182 .with_allowed_states(&[
1183 ChannelStatusDiscriminants::Open,
1184 ChannelStatusDiscriminants::PendingToClose,
1185 ]),
1186 )
1187 .map_err(HoprLibError::chain)
1188 .await?
1189 .map(|channel| {
1190 Ok(TicketSelector::from(&channel)
1191 .with_amount(min_value..)
1192 .with_index_range(channel.ticket_index..)
1193 .with_state(AcknowledgedTicketStatus::Untouched))
1194 })
1195 .forward(self.redemption_requests()?)
1196 .await?;
1197
1198 Ok(())
1199 }
1200
1201 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1202 &self,
1203 counterparty: &Address,
1204 min_value: B,
1205 ) -> errors::Result<()> {
1206 self.redeem_tickets_in_channel(&generate_channel_id(counterparty, &self.me_onchain()), min_value)
1207 .await
1208 }
1209
1210 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1211 &self,
1212 channel_id: &Hash,
1213 min_value: B,
1214 ) -> errors::Result<()> {
1215 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1216
1217 let channel = self
1218 .chain_api
1219 .channel_by_id(channel_id)
1220 .await
1221 .map_err(HoprLibError::chain)?
1222 .ok_or(HoprLibError::GeneralError("Channel not found".into()))?;
1223
1224 self.redemption_requests()?
1225 .send(
1226 TicketSelector::from(channel)
1227 .with_amount(min_value.into()..)
1228 .with_index_range(channel.ticket_index..)
1229 .with_state(AcknowledgedTicketStatus::Untouched),
1230 )
1231 .await?;
1232
1233 Ok(())
1234 }
1235
1236 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1237 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1238
1239 self.redemption_requests()?
1240 .send(TicketSelector::from(&ack_ticket).with_state(AcknowledgedTicketStatus::Untouched))
1241 .await?;
1242
1243 Ok(())
1244 }
1245
1246 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1247 let peer_id = *peer_id;
1248 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || prelude::OffchainPublicKey::from_peerid(&peer_id))
1250 .await
1251 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1252
1253 self.chain_api
1254 .packet_key_to_chain_key(&pubkey)
1255 .await
1256 .map_err(HoprLibError::chain)
1257 }
1258
1259 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1260 self.chain_api
1261 .chain_key_to_packet_key(address)
1262 .await
1263 .map(|pk| pk.map(|v| v.into()))
1264 .map_err(HoprLibError::chain)
1265 }
1266}
1267
1268impl<Chain, Db> Hopr<Chain, Db> {
1269 pub fn collect_hopr_metrics() -> errors::Result<String> {
1272 cfg_if::cfg_if! {
1273 if #[cfg(all(feature = "prometheus", not(test)))] {
1274 hopr_metrics::gather_all_metrics().map_err(|e| HoprLibError::Other(e.into()))
1275 } else {
1276 Err(HoprLibError::GeneralError("BUILT WITHOUT METRICS SUPPORT".into()))
1277 }
1278 }
1279 }
1280}