1pub mod config;
18pub mod constants;
20pub mod errors;
22
23use async_lock::RwLock;
24use futures::{
25 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
26 Stream, StreamExt,
27};
28use std::ops::Deref;
29use std::{
30 collections::HashMap,
31 fmt::{Display, Formatter},
32 path::PathBuf,
33 sync::{atomic::Ordering, Arc},
34 time::Duration,
35};
36use tracing::{debug, error, info, trace, warn};
37
38use errors::{HoprLibError, HoprStatusError};
39use hopr_async_runtime::prelude::{sleep, spawn, JoinHandle};
40use hopr_chain_actions::{
41 action_state::{ActionState, IndexerActionTracker},
42 channels::ChannelActions,
43 node::NodeActions,
44 redeem::TicketRedeemActions,
45};
46use hopr_chain_api::{
47 can_register_with_safe, config::ChainNetworkConfig, errors::HoprChainError, wait_for_funds, HoprChain,
48 HoprChainProcess, SignificantChainEvent,
49};
50use hopr_chain_rpc::HoprRpcOperations;
51use hopr_chain_types::chain_events::ChainEventType;
52use hopr_chain_types::ContractAddresses;
53use hopr_crypto_types::prelude::OffchainPublicKey;
54use hopr_db_api::logs::HoprDbLogOperations;
55use hopr_db_sql::{
56 accounts::HoprDbAccountOperations,
57 api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
58 channels::HoprDbChannelOperations,
59 db::{HoprDb, HoprDbConfig},
60 info::{HoprDbInfoOperations, IndexerStateInfo},
61 prelude::{ChainOrPacketKey::ChainKey, DbSqlError, HoprDbPeersOperations},
62 registry::HoprDbRegistryOperations,
63 HoprDbAllOperations, HoprDbGeneralModelOperations,
64};
65use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
66use hopr_platform::file::native::{join, remove_dir_all};
67use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
68use hopr_transport::{
69 execute_on_tick, ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession,
70 OffchainKeypair, PeerDiscovery, PeerStatus,
71};
72pub use {
73 hopr_chain_actions::errors::ChainActionsError,
74 hopr_chain_api::config::{
75 Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
76 },
77 hopr_internal_types::prelude::*,
78 hopr_network_types::prelude::{IpProtocol, RoutingOptions},
79 hopr_path::channel_graph::GraphExportConfig,
80 hopr_primitive_types::prelude::*,
81 hopr_strategy::Strategy,
82 hopr_transport::{
83 config::{looks_like_domain, HostConfig, HostType},
84 constants::RESERVED_TAG_UPPER_LIMIT,
85 errors::{HoprTransportError, NetworkingError, ProtocolError},
86 ApplicationData, HalfKeyChallenge, Health, IncomingSession as HoprIncomingSession, Keypair, Multiaddr,
87 OffchainKeypair as HoprOffchainKeypair, PeerId, SendMsg, ServiceId, Session as HoprSession, SessionCapability,
88 SessionClientConfig, SessionId as HoprSessionId, SessionTarget, TicketStatistics, SESSION_USABLE_MTU_SIZE,
89 },
90};
91
92#[cfg(feature = "runtime-tokio")]
93pub use hopr_transport::transfer_session;
94
95use crate::config::SafeModule;
96use crate::constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE};
97
98#[cfg(all(feature = "prometheus", not(test)))]
99use {
100 hopr_metrics::metrics::{MultiGauge, SimpleGauge},
101 hopr_platform::time::native::current_time,
102 std::str::FromStr,
103};
104
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge `hopr_up`; `Hopr::new` sets it to the current unix timestamp (seconds).
    // A failure to construct a metric is treated as a programming error, hence
    // the `expect` calls that name the offending metric.
    static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
        "hopr_up",
        "The unix timestamp in seconds at which the process was started"
    ).expect("the 'hopr_up' gauge must be constructible");
    // Gauge `hopr_lib_version`, labelled by the full version string; `Hopr::new`
    // sets its value to `<major>.<minor>` parsed as a float.
    static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).expect("the 'hopr_lib_version' gauge must be constructible");
    // Gauge `hopr_node_addresses`; `Hopr::run` sets it to 1.0 with the node's
    // peer id, on-chain address, safe address and module address as labels.
    static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).expect("the 'hopr_node_addresses' gauge must be constructible");
}
122
123pub use async_trait::async_trait;
124
/// Handler for sessions that other nodes open towards this node.
///
/// Only available with the `session-server` feature. An implementation is
/// passed to `Hopr::run`, which clones it and invokes [`HoprSessionReactor::process`]
/// once for every incoming session.
#[cfg(feature = "session-server")]
#[async_trait]
pub trait HoprSessionReactor {
    /// Serves a single incoming `session`.
    ///
    /// # Errors
    /// Any error returned here is logged by the session server task in
    /// `Hopr::run`; it does not stop the processing of other sessions.
    async fn process(&self, session: HoprIncomingSession) -> crate::errors::Result<()>;
}
133
134#[atomic_enum::atomic_enum]
136#[derive(PartialEq, Eq)]
137pub enum HoprState {
138 Uninitialized = 0,
139 Initializing = 1,
140 Indexing = 2,
141 Starting = 3,
142 Running = 4,
143}
144
145impl Display for HoprState {
146 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
147 write!(f, "{:?}", self)
148 }
149}
150
151pub struct OpenChannelResult {
152 pub tx_hash: Hash,
153 pub channel_id: Hash,
154}
155
156pub struct CloseChannelResult {
157 pub tx_hash: Hash,
158 pub status: ChannelStatus,
159}
160
161#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
166pub enum HoprLibProcesses {
167 #[strum(to_string = "transport: {0}")]
168 Transport(HoprTransportProcess),
169 #[cfg(feature = "session-server")]
170 #[strum(to_string = "session server providing the exit node session stream functionality")]
171 SessionServer,
172 #[strum(to_string = "tick wake up the strategies to perform an action")]
173 StrategyTick,
174 #[strum(to_string = "initial indexing operation into the DB")]
175 Indexing,
176 #[strum(to_string = "processing of indexed operations in internal components")]
177 IndexReflection,
178 #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
179 OutgoingOnchainActionQueue,
180 #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
181 TicketIndexFlush,
182 #[strum(to_string = "on received ack ticket trigger")]
183 OnReceivedAcknowledgement,
184}
185
186impl HoprLibProcesses {
187 pub fn can_finish(&self) -> bool {
190 matches!(self, HoprLibProcesses::Indexing)
191 }
192}
193
194impl From<HoprTransportProcess> for HoprLibProcesses {
195 fn from(value: HoprTransportProcess) -> Self {
196 HoprLibProcesses::Transport(value)
197 }
198}
199
200#[allow(clippy::too_many_arguments)]
208pub async fn chain_events_to_transport_events<StreamIn, Db>(
209 event_stream: StreamIn,
210 me_onchain: Address,
211 db: Db,
212 multi_strategy: Arc<MultiStrategy>,
213 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
214 indexer_action_tracker: Arc<IndexerActionTracker>,
215) -> impl Stream<Item = PeerDiscovery> + Send + 'static
216where
217 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
218 StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
219{
220 Box::pin(event_stream.filter_map(move |event| {
221 let db = db.clone();
222 let multi_strategy = multi_strategy.clone();
223 let channel_graph = channel_graph.clone();
224 let indexer_action_tracker = indexer_action_tracker.clone();
225
226 async move {
227 let resolved = indexer_action_tracker.match_and_resolve(&event).await;
228 debug!(count = resolved.len(), event = %event, "resolved indexer expectations", );
229
230 match event.event_type {
231 ChainEventType::Announcement{peer, multiaddresses, ..} => {
232 Some(PeerDiscovery::Announce(peer, multiaddresses))
233 }
234 ChainEventType::ChannelOpened(channel) |
235 ChainEventType::ChannelClosureInitiated(channel) |
236 ChainEventType::ChannelClosed(channel) |
237 ChainEventType::ChannelBalanceIncreased(channel, _) | ChainEventType::ChannelBalanceDecreased(channel, _) | ChainEventType::TicketRedeemed(channel, _) => { let maybe_direction = channel.direction(&me_onchain);
241
242 let change = channel_graph
243 .write()
244 .await
245 .update_channel(channel);
246
247 if let Some(own_channel_direction) = maybe_direction {
249 if let Some(change_set) = change {
250 for channel_change in change_set {
251 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
252 &*multi_strategy,
253 &channel,
254 own_channel_direction,
255 channel_change,
256 )
257 .await;
258 }
259 } else if channel.status == ChannelStatus::Open {
260 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
262 &*multi_strategy,
263 &channel,
264 own_channel_direction,
265 ChannelChange::Status {
266 left: ChannelStatus::Closed,
267 right: ChannelStatus::Open,
268 },
269 )
270 .await;
271 }
272 }
273
274 None
275 }
276 ChainEventType::NetworkRegistryUpdate(address, allowed) => {
277 let packet_key = db.translate_key(None, address).await;
278 match packet_key {
279 Ok(pk) => {
280 if let Some(pk) = pk {
281 let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
282
283 if let Ok(offchain_key) = offchain_key {
284 let peer_id = offchain_key.into();
285
286 let res = match allowed {
287 hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
288 hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
289 };
290
291 Some(res)
292 } else {
293 error!("Failed to unwrap as offchain key at this point");
294 None
295 }
296 } else {
297 None
298 }
299 }
300 Err(e) => {
301 error!(error = %e, "on_network_registry_node_allowed failed");
302 None
303 },
304 }
305 }
306 ChainEventType::NodeSafeRegistered(safe_address) => {
307 info!(%safe_address, "Node safe registered");
308 None
309 }
310 }
311 }
312 }))
313}
314
315pub struct HoprSocket {
319 rx: UnboundedReceiver<ApplicationData>,
320 tx: UnboundedSender<ApplicationData>,
321}
322
323impl Default for HoprSocket {
324 fn default() -> Self {
325 let (tx, rx) = unbounded::<ApplicationData>();
326 Self { rx, tx }
327 }
328}
329
330impl HoprSocket {
331 pub fn new() -> Self {
332 Self::default()
333 }
334
335 pub fn reader(self) -> UnboundedReceiver<ApplicationData> {
336 self.rx
337 }
338
339 pub fn writer(&self) -> UnboundedSender<ApplicationData> {
340 self.tx.clone()
341 }
342}
343
344pub struct Hopr {
356 me: OffchainKeypair,
357 me_chain: ChainKeypair,
358 cfg: config::HoprLibConfig,
359 state: Arc<AtomicHoprState>,
360 transport_api: HoprTransport<HoprDb>,
361 hopr_chain_api: HoprChain<HoprDb>,
362 db: HoprDb,
364 chain_cfg: ChainNetworkConfig,
365 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
366 multistrategy: Arc<MultiStrategy>,
367 rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
368}
369
370impl Hopr {
371 pub fn new(
372 mut cfg: config::HoprLibConfig,
373 me: &OffchainKeypair,
374 me_onchain: &ChainKeypair,
375 ) -> crate::errors::Result<Self> {
376 let multiaddress: Multiaddr = (&cfg.host).try_into()?;
377
378 let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
379 info!(path = ?db_path, "Initiating DB");
380
381 if cfg.db.force_initialize {
382 info!("Force cleaning up existing database");
383 remove_dir_all(db_path.as_path()).map_err(|e| {
384 HoprLibError::GeneralError(format!(
385 "Failed to remove the existing DB directory at '{db_path:?}': {e}"
386 ))
387 })?;
388 cfg.db.initialize = true
389 }
390
391 if let Some(parent_dir_path) = db_path.as_path().parent() {
393 if !parent_dir_path.is_dir() {
394 std::fs::create_dir_all(parent_dir_path).map_err(|e| {
395 HoprLibError::GeneralError(format!(
396 "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
397 ))
398 })?
399 }
400 }
401
402 let db_cfg = HoprDbConfig {
403 create_if_missing: cfg.db.initialize,
404 force_create: cfg.db.force_initialize,
405 log_slow_queries: std::time::Duration::from_millis(150),
406 };
407 let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
408
409 if let Some(provider) = &cfg.chain.provider {
410 info!(provider, "Creating chain components using the custom provider");
411 } else {
412 info!("Creating chain components using the default provider");
413 }
414 let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
415 &cfg.chain.network,
416 crate::constants::APP_VERSION_COERCED,
417 cfg.chain.provider.as_deref(),
418 cfg.chain.max_rpc_requests_per_sec,
419 &mut cfg.chain.protocols,
420 )
421 .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
422
423 let contract_addresses = ContractAddresses::from(&resolved_environment);
424 info!(
425 myself = me_onchain.public().to_hex(),
426 contract_addresses = ?contract_addresses,
427 "Resolved contract addresses",
428 );
429
430 let my_multiaddresses = vec![multiaddress];
431
432 let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
433
434 let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
435 me_onchain.public().to_address(),
436 ChannelGraphConfig::default(),
437 )));
438
439 let hopr_transport_api = HoprTransport::new(
440 me,
441 me_onchain,
442 HoprTransportConfig {
443 transport: cfg.transport.clone(),
444 network: cfg.network_options.clone(),
445 protocol: cfg.protocol,
446 heartbeat: cfg.heartbeat,
447 session: cfg.session,
448 },
449 db.clone(),
450 channel_graph.clone(),
451 my_multiaddresses,
452 );
453
454 let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
455 me_onchain.clone(),
456 db.clone(),
457 resolved_environment.clone(),
458 cfg.safe_module.module_address,
459 ContractAddresses {
460 announcements: resolved_environment.announcements,
461 channels: resolved_environment.channels,
462 token: resolved_environment.token,
463 price_oracle: resolved_environment.ticket_price_oracle,
464 win_prob_oracle: resolved_environment.winning_probability_oracle,
465 network_registry: resolved_environment.network_registry,
466 network_registry_proxy: resolved_environment.network_registry_proxy,
467 stake_factory: resolved_environment.node_stake_v2_factory,
468 safe_registry: resolved_environment.node_safe_registry,
469 module_implementation: resolved_environment.module_implementation,
470 },
471 cfg.safe_module.safe_address,
472 hopr_chain_indexer::IndexerConfig {
473 start_block_number: resolved_environment.channel_contract_deploy_block as u64,
474 fast_sync: cfg.chain.fast_sync,
475 },
476 tx_indexer_events,
477 );
478
479 let multi_strategy = Arc::new(MultiStrategy::new(
480 cfg.strategy.clone(),
481 db.clone(),
482 hopr_hopr_chain_api.actions_ref().clone(),
483 hopr_transport_api.ticket_aggregator(),
484 ));
485 debug!(
486 strategies = tracing::field::debug(&multi_strategy),
487 "Initialized strategies"
488 );
489
490 #[cfg(all(feature = "prometheus", not(test)))]
491 {
492 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
493 METRIC_HOPR_LIB_VERSION.set(
494 &[const_format::formatcp!("{}", constants::APP_VERSION)],
495 f64::from_str(const_format::formatcp!(
496 "{}.{}",
497 env!("CARGO_PKG_VERSION_MAJOR"),
498 env!("CARGO_PKG_VERSION_MINOR")
499 ))
500 .unwrap_or(0.0),
501 );
502
503 if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
505 error!(error = %e, "Failed to initialize ticket statistics metrics");
506 }
507 }
508
509 Ok(Self {
510 me: me.clone(),
511 me_chain: me_onchain.clone(),
512 cfg,
513 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
514 transport_api: hopr_transport_api,
515 hopr_chain_api: hopr_hopr_chain_api,
516 db,
517 chain_cfg: resolved_environment,
518 channel_graph,
519 multistrategy: multi_strategy,
520 rx_indexer_significant_events: rx_indexer_events,
521 })
522 }
523
524 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
525 if self.status() == state {
526 Ok(())
527 } else {
528 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
529 }
530 }
531
532 pub fn status(&self) -> HoprState {
533 self.state.load(Ordering::Relaxed)
534 }
535
536 pub fn version_coerced(&self) -> String {
537 String::from(constants::APP_VERSION_COERCED)
538 }
539
540 pub fn version(&self) -> String {
541 String::from(constants::APP_VERSION)
542 }
543
544 pub fn network(&self) -> String {
545 self.cfg.chain.network.clone()
546 }
547
548 pub async fn get_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
549 Ok(self.hopr_chain_api.get_balance(balance_type).await?)
550 }
551
552 pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
553 Ok(self.hopr_chain_api.get_eligibility_status().await?)
554 }
555
556 pub async fn get_safe_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
557 let safe_balance = self
558 .hopr_chain_api
559 .get_safe_balance(self.cfg.safe_module.safe_address, balance_type)
560 .await?;
561
562 if balance_type == BalanceType::HOPR {
563 let my_db = self.db.clone();
564 self.db
565 .begin_transaction()
566 .await?
567 .perform(|tx| {
568 Box::pin(async move {
569 let db_safe_balance = my_db.get_safe_hopr_balance(Some(tx)).await?;
570 if safe_balance != db_safe_balance {
571 warn!(
572 %db_safe_balance,
573 %safe_balance,
574 "Safe balance in the DB mismatches on chain balance"
575 );
576 my_db.set_safe_hopr_balance(Some(tx), safe_balance).await?;
577 }
578 Ok::<_, DbSqlError>(())
579 })
580 })
581 .await?;
582 }
583 Ok(safe_balance)
584 }
585
586 pub fn get_safe_config(&self) -> SafeModule {
587 self.cfg.safe_module.clone()
588 }
589
590 pub fn chain_config(&self) -> ChainNetworkConfig {
591 self.chain_cfg.clone()
592 }
593
594 pub fn get_provider(&self) -> String {
595 self.cfg
596 .chain
597 .provider
598 .clone()
599 .unwrap_or(self.chain_cfg.chain.default_provider.clone())
600 }
601
602 #[inline]
603 fn is_public(&self) -> bool {
604 self.cfg.chain.announce
605 }
606
607 pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
608 &self,
609 #[cfg(feature = "session-server")] serve_handler: T,
610 ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, JoinHandle<()>>)> {
611 self.error_if_not_in_state(
612 HoprState::Uninitialized,
613 "Cannot start the hopr node multiple times".into(),
614 )?;
615
616 info!(
617 address = %self.me_onchain(), minimum_balance = %Balance::new_from_str(SUGGESTED_NATIVE_BALANCE, BalanceType::Native),
618 "Node is not started, please fund this node",
619 );
620
621 let mut processes: HashMap<HoprLibProcesses, JoinHandle<()>> = HashMap::new();
622
623 wait_for_funds(
624 self.me_onchain(),
625 Balance::new_from_str(MIN_NATIVE_BALANCE, BalanceType::Native),
626 Duration::from_secs(200),
627 self.hopr_chain_api.rpc(),
628 )
629 .await?;
630
631 info!("Starting the node...");
632
633 self.state.store(HoprState::Initializing, Ordering::Relaxed);
634
635 let balance = self.get_balance(BalanceType::Native).await?;
636
637 let minimum_balance = Balance::new_from_str(constants::MIN_NATIVE_BALANCE, BalanceType::Native);
638
639 info!(
640 address = %self.hopr_chain_api.me_onchain(),
641 %balance,
642 %minimum_balance,
643 "Node information"
644 );
645
646 if balance.le(&minimum_balance) {
647 return Err(HoprLibError::GeneralError(
648 "Cannot start the node without a sufficiently funded wallet".to_string(),
649 ));
650 }
651
652 let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;
655
656 let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
657 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
658 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
659 "configured outgoing ticket price is lower than the network minimum ticket price: {configured_ticket_price:?} < {network_min_ticket_price}"
660 ))));
661 }
662
663 let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
666 let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
667 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
668 && configured_win_prob.is_some_and(|c| c < network_min_win_prob)
669 {
670 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
671 "configured outgoing ticket winning probability is lower than the network minimum winning probability: {configured_win_prob:?} < {network_min_win_prob}"
672 ))));
673 }
674
675 info!("Linking chain and packet keys");
676 self.db
677 .insert_account(
678 None,
679 AccountEntry {
680 public_key: *self.me.public(),
681 chain_addr: self.hopr_chain_api.me_onchain(),
682 entry_type: AccountType::NotAnnounced,
684 },
685 )
686 .await?;
687
688 self.state.store(HoprState::Indexing, Ordering::Relaxed);
689
690 let (indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
691
692 let indexer_event_pipeline = chain_events_to_transport_events(
693 self.rx_indexer_significant_events.clone(),
694 self.me_onchain(),
695 self.db.clone(),
696 self.multistrategy.clone(),
697 self.channel_graph.clone(),
698 self.hopr_chain_api.action_state(),
699 )
700 .await;
701
702 spawn(async move {
704 indexer_event_pipeline
705 .map(Ok)
706 .forward(indexer_peer_update_tx)
707 .await
708 .expect("The index to transport event chain failed");
709 });
710
711 info!("Start the chain process and sync the indexer");
712 for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
713 let nid = match id {
714 HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
715 HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
716 };
717 processes.insert(nid, proc);
718 }
719
720 {
721 let my_ethereum_address = self.me_onchain();
723 let my_peer_id = self.me_peer_id();
724 let my_version = crate::constants::APP_VERSION;
725
726 while !self
727 .db
728 .clone()
729 .is_allowed_in_network_registry(None, &my_ethereum_address)
730 .await
731 .unwrap_or(false)
732 {
733 info!("Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.", my_ethereum_address.to_hex());
734
735 sleep(ONBOARDING_INFORMATION_INTERVAL).await;
736
737 info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
738 info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
739 }
740 }
741
742 let safe_module_configuration = self
748 .hopr_chain_api
749 .rpc()
750 .check_node_safe_module_status(self.me_onchain())
751 .await
752 .map_err(HoprChainError::Rpc)?;
753
754 if !safe_module_configuration.should_pass() {
755 error!(
756 ?safe_module_configuration,
757 "Something is wrong with the safe module configuration",
758 );
759 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
760 "Safe and module are not configured correctly {:?}",
761 safe_module_configuration,
762 ))));
763 }
764
765 if can_register_with_safe(
768 self.me_onchain(),
769 self.cfg.safe_module.safe_address,
770 self.hopr_chain_api.rpc(),
771 )
772 .await?
773 {
774 info!("Registering safe by node");
775
776 if self.me_onchain() == self.cfg.safe_module.safe_address {
777 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
778 }
779
780 if let Err(e) = self
781 .hopr_chain_api
782 .actions_ref()
783 .register_safe_by_node(self.cfg.safe_module.safe_address)
784 .await?
785 .await
786 {
787 error!(error = %e, "Failed to register node with safe")
789 }
790 }
791
792 self.db
793 .set_safe_info(
794 None,
795 SafeInfo {
796 safe_address: self.cfg.safe_module.safe_address,
797 module_address: self.cfg.safe_module.module_address,
798 },
799 )
800 .await?;
801
802 if self.is_public() {
803 let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
807
808 match self
810 .hopr_chain_api
811 .actions_ref()
812 .announce(&multiaddresses_to_announce, &self.me)
813 .await
814 {
815 Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
816 Err(ChainActionsError::AlreadyAnnounced) => {
817 info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain", )
818 }
819 Err(e) => error!(error = %e, "Failed to transmit node announcement"),
823 }
824 }
825
826 {
827 let channel_graph = self.channel_graph.clone();
828 let mut cg = channel_graph.write().await;
829
830 info!("Syncing channels from the previous runs");
831 let mut channel_stream = self
832 .db
833 .stream_active_channels()
834 .await
835 .map_err(hopr_db_sql::api::errors::DbError::from)?;
836
837 while let Some(maybe_channel) = channel_stream.next().await {
838 match maybe_channel {
839 Ok(channel) => {
840 cg.update_channel(channel);
841 }
842 Err(error) => error!(%error, "Failed to sync channel into the graph"),
843 }
844 }
845
846 info!("Syncing peer qualities from the previous runs");
851 let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
852 .map_err(|e| e.to_string())
853 .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
854 .unwrap_or_else(|error| {
855 warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
856 constants::DEFAULT_MIN_QUALITY_TO_SYNC
857 });
858
859 let mut peer_stream = self
860 .db
861 .get_network_peers(Default::default(), false)
862 .await?
863 .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
864
865 while let Some(peer) = peer_stream.next().await {
866 if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
867 cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
869 } else {
870 error!(peer = %peer.id.1, "Could not translate peer information");
871 }
872 }
873
874 info!(
875 channels = cg.count_channels(),
876 nodes = cg.count_nodes(),
877 "Channel graph sync complete"
878 );
879 }
880
881 let socket = HoprSocket::new();
882 let transport_output_tx = socket.writer();
883
884 let multi_strategy_ack_ticket = self.multistrategy.clone();
886 let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
887 self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
888 processes.insert(
889 HoprLibProcesses::OnReceivedAcknowledgement,
890 spawn(async move {
891 while let Some(ack) = on_ack_tkt_rx.next().await {
892 if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
893 &*multi_strategy_ack_ticket,
894 &ack,
895 )
896 .await
897 {
898 error!(%error, "Failed to process acknowledged winning ticket with the strategy");
899 }
900 }
901 }),
902 );
903
904 let (session_tx, _session_rx) = unbounded::<IncomingSession>();
905
906 #[cfg(feature = "session-server")]
907 {
908 processes.insert(
909 HoprLibProcesses::SessionServer,
910 spawn(_session_rx.for_each_concurrent(None, move |session| {
911 let serve_handler = serve_handler.clone();
912 async move {
913 let session_id = *session.session.id();
914 match serve_handler.process(session).await {
915 Ok(_) => debug!(
916 session_id = ?session_id,
917 "Client session processed successfully"
918 ),
919 Err(e) => error!(
920 session_id = ?session_id,
921 error = %e,
922 "Client session processing failed"
923 ),
924 }
925 }
926 })),
927 );
928 }
929
930 for (id, proc) in self
931 .transport_api
932 .run(
933 &self.me_chain,
934 String::from(constants::APP_VERSION),
935 join(&[&self.cfg.db.data, "tbf"])
936 .map_err(|e| HoprLibError::GeneralError(format!("Failed to construct the bloom filter: {e}")))?,
937 transport_output_tx,
938 indexer_peer_update_rx,
939 session_tx,
940 )
941 .await?
942 .into_iter()
943 {
944 processes.insert(id.into(), proc);
945 }
946
947 let db_clone = self.db.clone();
948 processes.insert(
949 HoprLibProcesses::TicketIndexFlush,
950 spawn(Box::pin(execute_on_tick(
951 Duration::from_secs(5),
952 move || {
953 let db_clone = db_clone.clone();
954 async move {
955 match db_clone.persist_outgoing_ticket_indices().await {
956 Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
957 Err(e) => error!(error = %e, "Failed to flush ticket indices"),
958 }
959 }
960 },
961 "flush the states of outgoing ticket indices".into(),
962 ))),
963 );
964
965 if let Err(e) = self.db.fix_channels_next_ticket_state().await {
970 error!(error = %e, "failed to fix channels ticket states");
971 }
972
973 let multi_strategy = self.multistrategy.clone();
977 let strategy_interval = self.cfg.strategy.execution_interval;
978 processes.insert(
979 HoprLibProcesses::StrategyTick,
980 spawn(async move {
981 execute_on_tick(
982 Duration::from_secs(strategy_interval),
983 move || {
984 let multi_strategy = multi_strategy.clone();
985
986 async move {
987 trace!(state = "started", "strategy tick");
988 let _ = multi_strategy.on_tick().await;
989 trace!(state = "finished", "strategy tick");
990 }
991 },
992 "run strategies".into(),
993 )
994 .await;
995 }),
996 );
997
998 self.state.store(HoprState::Running, Ordering::Relaxed);
999
1000 info!(
1001 id = %self.me_peer_id(),
1002 version = constants::APP_VERSION,
1003 "NODE STARTED AND RUNNING"
1004 );
1005
1006 #[cfg(all(feature = "prometheus", not(test)))]
1007 METRIC_HOPR_NODE_INFO.set(
1008 &[
1009 &self.me.public().to_peerid_str(),
1010 &self.me_onchain().to_string(),
1011 &self.cfg.safe_module.safe_address.to_string(),
1012 &self.cfg.safe_module.module_address.to_string(),
1013 ],
1014 1.0,
1015 );
1016
1017 Ok((socket, processes))
1018 }
1019
1020 pub fn me_peer_id(&self) -> PeerId {
1023 (*self.me.public()).into()
1024 }
1025
1026 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1028 Ok(self.transport_api.get_public_nodes().await?)
1029 }
1030
1031 pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1033 let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1034
1035 match self.db.get_last_checksummed_log().await? {
1036 Some(log) => {
1037 let checksum = match log.checksum {
1038 Some(checksum) => Hash::from_hex(checksum.as_str())?,
1039 None => Hash::default(),
1040 };
1041 Ok(IndexerStateInfo {
1042 latest_log_block_number: log.block_number as u32,
1043 latest_log_checksum: checksum,
1044 ..indexer_state_info
1045 })
1046 }
1047 None => Ok(indexer_state_info),
1048 }
1049 }
1050
1051 pub async fn is_allowed_to_access_network(
1053 &self,
1054 address_like: either::Either<&PeerId, Address>,
1055 ) -> errors::Result<bool> {
1056 Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1057 }
1058
1059 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1063 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1064
1065 Ok(self.transport_api.ping(peer).await?)
1066 }
1067
1068 #[cfg(feature = "session-client")]
1071 pub async fn connect_to(&self, cfg: SessionClientConfig) -> errors::Result<HoprSession> {
1072 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1073
1074 let backoff = backon::ConstantBuilder::default()
1075 .with_max_times(self.cfg.session.establish_max_retries as usize)
1076 .with_delay(self.cfg.session.establish_retry_timeout)
1077 .with_jitter();
1078
1079 struct Sleeper;
1080 impl backon::Sleeper for Sleeper {
1081 type Sleep = futures_timer::Delay;
1082
1083 fn sleep(&self, dur: Duration) -> Self::Sleep {
1084 futures_timer::Delay::new(dur)
1085 }
1086 }
1087
1088 use backon::Retryable;
1089
1090 Ok((|| {
1091 let cfg = cfg.clone();
1092 async { self.transport_api.new_session(cfg).await }
1093 })
1094 .retry(backoff)
1095 .sleep(Sleeper)
1096 .await?)
1097 }
1098
1099 #[tracing::instrument(level = "debug", skip(self, msg))]
1107 pub async fn send_message(
1108 &self,
1109 msg: Box<[u8]>,
1110 destination: PeerId,
1111 options: RoutingOptions,
1112 application_tag: Option<u16>,
1113 ) -> errors::Result<()> {
1114 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1115
1116 self.transport_api
1117 .send_message(msg, destination, options, application_tag)
1118 .await?;
1119
1120 Ok(())
1121 }
1122
1123 pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1125 Ok(self.transport_api.aggregate_tickets(channel).await?)
1126 }
1127
1128 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1130 self.transport_api.local_multiaddresses()
1131 }
1132
1133 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1135 self.transport_api.listening_multiaddresses().await
1136 }
1137
1138 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1140 self.transport_api.network_observed_multiaddresses(peer).await
1141 }
1142
1143 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1145 let key = match OffchainPublicKey::try_from(peer) {
1146 Ok(k) => k,
1147 Err(e) => {
1148 error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1149 return vec![];
1150 }
1151 };
1152
1153 match self.db.get_account(None, key).await {
1154 Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1155 Ok(None) => {
1156 error!(%peer, "no information");
1157 vec![]
1158 }
1159 Err(e) => {
1160 error!(%peer, error = %e, "failed to retrieve information");
1161 vec![]
1162 }
1163 }
1164 }
1165
1166 pub async fn network_health(&self) -> Health {
1170 self.transport_api.network_health().await
1171 }
1172
1173 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1175 Ok(self.transport_api.network_connected_peers().await?)
1176 }
1177
1178 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1180 Ok(self.transport_api.network_peer_info(peer).await?)
1181 }
1182
1183 pub async fn all_network_peers(
1185 &self,
1186 minimum_quality: f64,
1187 ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1188 Ok(
1189 futures::stream::iter(self.transport_api.network_connected_peers().await?)
1190 .filter_map(|peer| async move {
1191 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1192 if info.get_average_quality() >= minimum_quality {
1193 Some((peer, info))
1194 } else {
1195 None
1196 }
1197 } else {
1198 None
1199 }
1200 })
1201 .filter_map(|(peer_id, info)| async move {
1202 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1203 Some((address, peer_id, info))
1204 })
1205 .collect::<Vec<_>>()
1206 .await,
1207 )
1208 }
1209
1210 pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1213 Ok(self.transport_api.tickets_in_channel(channel).await?)
1214 }
1215
1216 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1218 Ok(self.transport_api.all_tickets().await?)
1219 }
1220
1221 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1223 Ok(self.transport_api.ticket_statistics().await?)
1224 }
1225
1226 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1228 Ok(self.db.reset_ticket_statistics().await?)
1229 }
1230
1231 pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
1233 &self.db
1234 }
1235
1236 pub fn me_onchain(&self) -> Address {
1238 self.hopr_chain_api.me_onchain()
1239 }
1240
1241 pub async fn get_ticket_price(&self) -> errors::Result<Option<U256>> {
1243 Ok(self.hopr_chain_api.ticket_price().await?)
1244 }
1245
1246 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<f64> {
1248 Ok(self
1249 .db
1250 .get_indexer_data(None)
1251 .await?
1252 .minimum_incoming_ticket_winning_prob)
1253 }
1254
1255 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1257 Ok(self.db.get_accounts(None, false).await?)
1258 }
1259
1260 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1263 Ok(self.db.get_channel_by_id(None, channel_id).await?)
1264 }
1265
1266 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1271 Ok(self.hopr_chain_api.channel(src, dest).await?)
1272 }
1273
1274 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1276 Ok(self.hopr_chain_api.channels_from(src).await?)
1277 }
1278
1279 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1281 Ok(self.hopr_chain_api.channels_to(dest).await?)
1282 }
1283
1284 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1286 Ok(self.hopr_chain_api.all_channels().await?)
1287 }
1288
1289 pub async fn safe_allowance(&self) -> errors::Result<Balance> {
1291 Ok(self.hopr_chain_api.safe_allowance().await?)
1292 }
1293
1294 pub async fn withdraw(&self, recipient: Address, amount: Balance) -> errors::Result<Hash> {
1298 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1299
1300 Ok(self
1301 .hopr_chain_api
1302 .actions_ref()
1303 .withdraw(recipient, amount)
1304 .await?
1305 .await?
1306 .tx_hash)
1307 }
1308
1309 pub async fn open_channel(&self, destination: &Address, amount: &Balance) -> errors::Result<OpenChannelResult> {
1310 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1311
1312 let awaiter = self
1313 .hopr_chain_api
1314 .actions_ref()
1315 .open_channel(*destination, *amount)
1316 .await?;
1317
1318 let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1319 Ok(awaiter.await.map(|confirm| OpenChannelResult {
1320 tx_hash: confirm.tx_hash,
1321 channel_id,
1322 })?)
1323 }
1324
1325 pub async fn fund_channel(&self, channel_id: &Hash, amount: &Balance) -> errors::Result<Hash> {
1326 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1327
1328 Ok(self
1329 .hopr_chain_api
1330 .actions_ref()
1331 .fund_channel(*channel_id, *amount)
1332 .await?
1333 .await
1334 .map(|confirm| confirm.tx_hash)?)
1335 }
1336
1337 pub async fn close_channel(
1338 &self,
1339 counterparty: &Address,
1340 direction: ChannelDirection,
1341 redeem_before_close: bool,
1342 ) -> errors::Result<CloseChannelResult> {
1343 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1344
1345 let confirmation = self
1346 .hopr_chain_api
1347 .actions_ref()
1348 .close_channel(*counterparty, direction, redeem_before_close)
1349 .await?
1350 .await?;
1351
1352 match confirmation
1353 .event
1354 .expect("channel close action confirmation must have associated chain event")
1355 {
1356 ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1357 tx_hash: confirmation.tx_hash,
1358 status: c.status, }),
1360 ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1361 tx_hash: confirmation.tx_hash,
1362 status: ChannelStatus::Closed,
1363 }),
1364 _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1365 }
1366 }
1367
1368 pub async fn close_channel_by_id(
1369 &self,
1370 channel_id: Hash,
1371 redeem_before_close: bool,
1372 ) -> errors::Result<CloseChannelResult> {
1373 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1374
1375 match self.channel_from_hash(&channel_id).await? {
1376 Some(channel) => match channel.orientation(&self.me_onchain()) {
1377 Some((direction, counterparty)) => {
1378 self.close_channel(&counterparty, direction, redeem_before_close).await
1379 }
1380 None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1381 "cannot close channel that is not own".into(),
1382 ))),
1383 },
1384 None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1385 }
1386 }
1387
1388 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1389 Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1390 }
1391
1392 pub async fn redeem_all_tickets(&self, only_aggregated: bool) -> errors::Result<()> {
1393 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1394
1395 self.hopr_chain_api
1397 .actions_ref()
1398 .redeem_all_tickets(only_aggregated)
1399 .await?;
1400
1401 Ok(())
1402 }
1403
1404 pub async fn redeem_tickets_with_counterparty(
1405 &self,
1406 counterparty: &Address,
1407 only_aggregated: bool,
1408 ) -> errors::Result<()> {
1409 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1410
1411 let _ = self
1413 .hopr_chain_api
1414 .actions_ref()
1415 .redeem_tickets_with_counterparty(counterparty, only_aggregated)
1416 .await?;
1417
1418 Ok(())
1419 }
1420
1421 pub async fn redeem_tickets_in_channel(&self, channel_id: &Hash, only_aggregated: bool) -> errors::Result<usize> {
1422 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1423
1424 let channel = self.db.get_channel_by_id(None, channel_id).await?;
1425 let mut redeem_count = 0;
1426
1427 if let Some(channel) = channel {
1428 if channel.destination == self.hopr_chain_api.me_onchain() {
1429 redeem_count = self
1431 .hopr_chain_api
1432 .actions_ref()
1433 .redeem_tickets_in_channel(&channel, only_aggregated)
1434 .await?
1435 .len();
1436 }
1437 }
1438
1439 Ok(redeem_count)
1440 }
1441
1442 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1443 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1444
1445 #[allow(clippy::let_underscore_future)]
1447 let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1448
1449 Ok(())
1450 }
1451
1452 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1453 let pk = hopr_transport::OffchainPublicKey::try_from(peer_id)?;
1454 Ok(self.db.resolve_chain_key(&pk).await?)
1455 }
1456
1457 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1458 Ok(self
1459 .db
1460 .resolve_packet_key(address)
1461 .await
1462 .map(|pk| pk.map(|v| v.into()))?)
1463 }
1464
1465 pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1466 self.channel_graph.read().await.as_dot(cfg)
1467 }
1468
1469 pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1470 let cg = self.channel_graph.read().await;
1471 serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1472 }
1473}