hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it;
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Configuration-related public types
17pub mod config;
18/// Various public constants.
19pub mod constants;
20/// Lists all errors thrown from this library.
21pub mod errors;
22
23use std::{
24    collections::HashMap,
25    fmt::{Display, Formatter},
26    ops::Deref,
27    path::PathBuf,
28    str::FromStr,
29    sync::{Arc, atomic::Ordering},
30    time::Duration,
31};
32
33use async_lock::RwLock;
34use errors::{HoprLibError, HoprStatusError};
35use futures::{
36    SinkExt, Stream, StreamExt,
37    channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded},
38    future::AbortHandle,
39    stream::{self},
40};
41use hopr_async_runtime::prelude::{sleep, spawn};
42pub use hopr_chain_actions::errors::ChainActionsError;
43use hopr_chain_actions::{
44    action_state::{ActionState, IndexerActionTracker},
45    channels::ChannelActions,
46    node::NodeActions,
47    redeem::TicketRedeemActions,
48};
49pub use hopr_chain_api::config::{
50    Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
51};
52use hopr_chain_api::{
53    HoprChain, HoprChainProcess, SignificantChainEvent, can_register_with_safe, config::ChainNetworkConfig,
54    errors::HoprChainError, wait_for_funds,
55};
56use hopr_chain_rpc::HoprRpcOperations;
57use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
58pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
59use hopr_crypto_types::prelude::OffchainPublicKey;
60use hopr_db_api::logs::HoprDbLogOperations;
61use hopr_db_sql::{
62    HoprDbAllOperations,
63    accounts::HoprDbAccountOperations,
64    api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
65    channels::HoprDbChannelOperations,
66    db::{HoprDb, HoprDbConfig},
67    info::{HoprDbInfoOperations, IndexerStateInfo},
68    prelude::{ChainOrPacketKey::ChainKey, HoprDbPeersOperations},
69    registry::HoprDbRegistryOperations,
70};
71pub use hopr_internal_types::prelude::*;
72pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
73pub use hopr_path::channel_graph::GraphExportConfig;
74use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
75use hopr_platform::file::native::remove_dir_all;
76pub use hopr_primitive_types::prelude::*;
77pub use hopr_strategy::Strategy;
78use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
79#[cfg(feature = "runtime-tokio")]
80pub use hopr_transport::transfer_session;
81pub use hopr_transport::{
82    ApplicationData, ApplicationDataIn, ApplicationDataOut, HalfKeyChallenge, Health, HoprSession,
83    IncomingSession as HoprIncomingSession, Keypair, Multiaddr, OffchainKeypair as HoprOffchainKeypair, PeerId,
84    PingQueryReplier, ProbeError, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionCapability,
85    SessionClientConfig, SessionId as HoprSessionId, SessionManagerError, SessionTarget, SurbBalancerConfig, Tag,
86    TicketStatistics, TransportSessionError,
87    config::{HostConfig, HostType, looks_like_domain},
88    errors::{HoprTransportError, NetworkingError, ProtocolError},
89};
90use hopr_transport::{
91    ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession, OffchainKeypair,
92    PeerDiscovery, PeerStatus, execute_on_tick,
93};
94use tracing::{debug, error, info, trace, warn};
95#[cfg(all(feature = "prometheus", not(test)))]
96use {
97    hopr_metrics::metrics::{MultiGauge, SimpleGauge},
98    hopr_platform::time::native::current_time,
99};
100
101use crate::{
102    config::SafeModule,
103    constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE},
104};
105
106#[cfg(all(feature = "prometheus", not(test)))]
107lazy_static::lazy_static! {
108    static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
109        "hopr_start_time",
110        "The unix timestamp in seconds at which the process was started"
111    ).unwrap();
112    static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
113        "hopr_lib_version",
114        "Executed version of hopr-lib",
115        &["version"]
116    ).unwrap();
117    static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
118        "hopr_node_addresses",
119        "Node on-chain and off-chain addresses",
120        &["peerid", "address", "safe_address", "module_address"]
121    ).unwrap();
122}
123
124pub use async_trait::async_trait;
125
126/// Interface representing the HOPR server behavior for each incoming session instance
127/// supplied as an argument.
128#[cfg(feature = "session-server")]
129#[async_trait::async_trait]
130pub trait HoprSessionReactor {
131    /// Fully process a single HOPR session
132    async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
133}
134
135/// An enum representing the current state of the HOPR node
136#[atomic_enum::atomic_enum]
137#[derive(PartialEq, Eq)]
138pub enum HoprState {
139    Uninitialized = 0,
140    Initializing = 1,
141    Indexing = 2,
142    Starting = 3,
143    Running = 4,
144}
145
146impl Display for HoprState {
147    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
148        write!(f, "{self:?}")
149    }
150}
151
152pub struct OpenChannelResult {
153    pub tx_hash: Hash,
154    pub channel_id: Hash,
155}
156
157pub struct CloseChannelResult {
158    pub tx_hash: Hash,
159    pub status: ChannelStatus,
160}
161
162/// Enum differentiator for loop component futures.
163///
164/// Used to differentiate the type of the future that exits the loop premateruly
165/// by tagging it as an enum.
166#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
167pub enum HoprLibProcesses {
168    #[strum(to_string = "transport: {0}")]
169    Transport(HoprTransportProcess),
170    #[cfg(feature = "session-server")]
171    #[strum(to_string = "session server providing the exit node session stream functionality")]
172    SessionServer,
173    #[strum(to_string = "tick wake up the strategies to perform an action")]
174    StrategyTick,
175    #[strum(to_string = "initial indexing operation into the DB")]
176    Indexing,
177    #[strum(to_string = "processing of indexed operations in internal components")]
178    IndexReflection,
179    #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
180    OutgoingOnchainActionQueue,
181    #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
182    TicketIndexFlush,
183    #[strum(to_string = "on received ack ticket trigger")]
184    OnReceivedAcknowledgement,
185}
186
187impl HoprLibProcesses {
188    /// Identifies whether a loop is allowed to finish or should
189    /// run indefinitely.
190    pub fn can_finish(&self) -> bool {
191        matches!(self, HoprLibProcesses::Indexing)
192    }
193}
194
195impl From<HoprTransportProcess> for HoprLibProcesses {
196    fn from(value: HoprTransportProcess) -> Self {
197        HoprLibProcesses::Transport(value)
198    }
199}
200
201/// Creates a pipeline that chains the indexer-generated data, processes them into
202/// the individual components, and creates a filtered output stream that is fed into
203/// the transport layer swarm.
204///
205/// * `event_stream` - represents the events generated by the indexer. If the Indexer is not synced, it will not
206///   generate any events.
207/// * `preloading_event_stream` - a stream used by the components to preload the data from the objects (db, channel
208///   graph...)
209#[allow(clippy::too_many_arguments)]
210pub async fn chain_events_to_transport_events<StreamIn, Db>(
211    event_stream: StreamIn,
212    me_onchain: Address,
213    db: Db,
214    multi_strategy: Arc<MultiStrategy>,
215    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
216    indexer_action_tracker: Arc<IndexerActionTracker>,
217) -> impl Stream<Item = PeerDiscovery> + Send + 'static
218where
219    Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
220    StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
221{
222    Box::pin(event_stream.filter_map(move |event| {
223        let db = db.clone();
224        let multi_strategy = multi_strategy.clone();
225        let channel_graph = channel_graph.clone();
226        let indexer_action_tracker = indexer_action_tracker.clone();
227
228        async move {
229            let resolved = indexer_action_tracker.match_and_resolve(&event).await;
230            if resolved.is_empty() {
231                trace!(%event, "No indexer expectations resolved for the event");
232            } else {
233                debug!(count = resolved.len(), %event, "resolved indexer expectations");
234            }
235
236            match event.event_type {
237                ChainEventType::Announcement{peer, address, multiaddresses} => {
238                    let allowed = db
239                        .is_allowed_in_network_registry(None, &address)
240                        .await
241                        .unwrap_or(false);
242
243                    Some(vec![PeerDiscovery::Announce(peer, multiaddresses), if allowed {
244                        PeerDiscovery::Allow(peer)
245                    } else {
246                        PeerDiscovery::Ban(peer)
247                    }])
248                }
249                ChainEventType::ChannelOpened(channel) |
250                ChainEventType::ChannelClosureInitiated(channel) |
251                ChainEventType::ChannelClosed(channel) |
252                ChainEventType::ChannelBalanceIncreased(channel, _) | // needed ?
253                ChainEventType::ChannelBalanceDecreased(channel, _) | // needed ?
254                ChainEventType::TicketRedeemed(channel, _) => {   // needed ?
255                    let maybe_direction = channel.direction(&me_onchain);
256
257                    let change = channel_graph
258                        .write_arc()
259                        .await
260                        .update_channel(channel);
261
262                    // Check if this is our own channel
263                    if let Some(own_channel_direction) = maybe_direction {
264                        if let Some(change_set) = change {
265                            for channel_change in change_set {
266                                let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
267                                    &*multi_strategy,
268                                    &channel,
269                                    own_channel_direction,
270                                    channel_change,
271                                )
272                                .await;
273                            }
274                        } else if channel.status == ChannelStatus::Open {
275                            // Emit Opening event if the channel did not exist before in the graph
276                            let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
277                                &*multi_strategy,
278                                &channel,
279                                own_channel_direction,
280                                ChannelChange::Status {
281                                    left: ChannelStatus::Closed,
282                                    right: ChannelStatus::Open,
283                                },
284                            )
285                            .await;
286                        }
287                    }
288
289                    None
290                }
291                ChainEventType::NetworkRegistryUpdate(address, allowed) => {
292                    let packet_key = db.translate_key(None, address).await;
293                    match packet_key {
294                        Ok(pk) => {
295                            if let Some(pk) = pk {
296                                let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
297
298                                if let Ok(offchain_key) = offchain_key {
299                                    let peer_id = offchain_key.into();
300
301                                    let res = match allowed {
302                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
303                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
304                                    };
305
306                                    Some(vec![res])
307                                } else {
308                                    error!("Failed to unwrap as offchain key at this point");
309                                    None
310                                }
311                            } else {
312                                None
313                            }
314                        }
315                        Err(error) => {
316                            error!(%error, "on_network_registry_node_allowed failed");
317                            None
318                        },
319                    }
320                }
321                ChainEventType::NodeSafeRegistered(safe_address) =>  {
322                    info!(%safe_address, "Node safe registered");
323                    None
324                }
325            }
326        }
327    })
328    .flat_map(stream::iter)
329)
330}
331
332/// Represents the socket behavior of the hopr-lib spawned [`Hopr`] object.
333///
334/// Provides a read and write stream for Hopr socket recognized data formats.
335pub struct HoprSocket {
336    rx: UnboundedReceiver<ApplicationDataIn>,
337    tx: UnboundedSender<ApplicationDataIn>,
338}
339
340impl Default for HoprSocket {
341    fn default() -> Self {
342        let (tx, rx) = unbounded::<ApplicationDataIn>();
343        Self { rx, tx }
344    }
345}
346
347impl HoprSocket {
348    pub fn new() -> Self {
349        Self::default()
350    }
351
352    pub fn reader(self) -> UnboundedReceiver<ApplicationDataIn> {
353        self.rx
354    }
355
356    pub fn writer(&self) -> UnboundedSender<ApplicationDataIn> {
357        self.tx.clone()
358    }
359}
360
361/// HOPR main object providing the entire HOPR node functionality
362///
363/// Instantiating this object creates all processes and objects necessary for
364/// running the HOPR node. Once created, the node can be started using the
365/// `run()` method.
366///
367/// Externally offered API should be sufficient to perform all necessary tasks
368/// with the HOPR node manually, but it is advised to create such a configuration
369/// that manual interaction is unnecessary.
370///
371/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
372pub struct Hopr {
373    me: OffchainKeypair,
374    me_chain: ChainKeypair,
375    cfg: config::HoprLibConfig,
376    state: Arc<AtomicHoprState>,
377    transport_api: HoprTransport<HoprDb>,
378    hopr_chain_api: HoprChain<HoprDb>,
379    // objects that could be removed pending architectural cleanup ========
380    db: HoprDb,
381    chain_cfg: ChainNetworkConfig,
382    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
383    multistrategy: Arc<MultiStrategy>,
384    rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
385}
386
387impl Hopr {
388    pub fn new(
389        mut cfg: config::HoprLibConfig,
390        me: &OffchainKeypair,
391        me_onchain: &ChainKeypair,
392    ) -> crate::errors::Result<Self> {
393        let multiaddress: Multiaddr = (&cfg.host).try_into()?;
394
395        let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
396        info!(path = ?db_path, "Initiating DB");
397
398        if cfg.db.force_initialize {
399            info!("Force cleaning up existing database");
400            remove_dir_all(db_path.as_path()).map_err(|e| {
401                HoprLibError::GeneralError(format!(
402                    "Failed to remove the existing DB directory at '{db_path:?}': {e}"
403                ))
404            })?;
405            cfg.db.initialize = true
406        }
407
408        // create DB dir if it does not exist
409        if let Some(parent_dir_path) = db_path.as_path().parent() {
410            if !parent_dir_path.is_dir() {
411                std::fs::create_dir_all(parent_dir_path).map_err(|e| {
412                    HoprLibError::GeneralError(format!(
413                        "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
414                    ))
415                })?
416            }
417        }
418
419        let db_cfg = HoprDbConfig {
420            create_if_missing: cfg.db.initialize,
421            force_create: cfg.db.force_initialize,
422            log_slow_queries: std::time::Duration::from_millis(150),
423            surb_ring_buffer_size: std::env::var("HOPR_PROTOCOL_SURB_RB_SIZE")
424                .ok()
425                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
426                .unwrap_or_else(|| HoprDbConfig::default().surb_ring_buffer_size),
427            surb_distress_threshold: std::env::var("HOPR_PROTOCOL_SURB_RB_DISTRESS")
428                .ok()
429                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
430                .unwrap_or_else(|| HoprDbConfig::default().surb_distress_threshold),
431        };
432        let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
433
434        if let Some(provider) = &cfg.chain.provider {
435            info!(provider, "Creating chain components using the custom provider");
436        } else {
437            info!("Creating chain components using the default provider");
438        }
439        let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
440            &cfg.chain.network,
441            crate::constants::APP_VERSION_COERCED,
442            cfg.chain.provider.as_deref(),
443            cfg.chain.max_rpc_requests_per_sec,
444            &mut cfg.chain.protocols,
445        )
446        .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
447
448        let contract_addresses = ContractAddresses::from(&resolved_environment);
449        info!(
450            myself = me_onchain.public().to_hex(),
451            contract_addresses = ?contract_addresses,
452            "Resolved contract addresses",
453        );
454
455        let my_multiaddresses = vec![multiaddress];
456
457        let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
458
459        let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
460            me_onchain.public().to_address(),
461            ChannelGraphConfig::default(),
462        )));
463
464        let hopr_transport_api = HoprTransport::new(
465            me,
466            me_onchain,
467            HoprTransportConfig {
468                transport: cfg.transport.clone(),
469                network: cfg.network_options.clone(),
470                protocol: cfg.protocol,
471                probe: cfg.probe,
472                session: cfg.session,
473            },
474            db.clone(),
475            channel_graph.clone(),
476            my_multiaddresses,
477        );
478
479        let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
480            me_onchain.clone(),
481            db.clone(),
482            resolved_environment.clone(),
483            cfg.safe_module.module_address,
484            ContractAddresses {
485                announcements: resolved_environment.announcements,
486                channels: resolved_environment.channels,
487                token: resolved_environment.token,
488                price_oracle: resolved_environment.ticket_price_oracle,
489                win_prob_oracle: resolved_environment.winning_probability_oracle,
490                network_registry: resolved_environment.network_registry,
491                network_registry_proxy: resolved_environment.network_registry_proxy,
492                stake_factory: resolved_environment.node_stake_v2_factory,
493                safe_registry: resolved_environment.node_safe_registry,
494                module_implementation: resolved_environment.module_implementation,
495            },
496            cfg.safe_module.safe_address,
497            hopr_chain_indexer::IndexerConfig {
498                start_block_number: resolved_environment.channel_contract_deploy_block as u64,
499                fast_sync: cfg.chain.fast_sync,
500                enable_logs_snapshot: cfg.chain.enable_logs_snapshot,
501                logs_snapshot_url: cfg.chain.logs_snapshot_url.clone(),
502                data_directory: cfg.db.data.clone(),
503            },
504            tx_indexer_events,
505        )?;
506
507        let multi_strategy = Arc::new(MultiStrategy::new(
508            cfg.strategy.clone(),
509            db.clone(),
510            hopr_hopr_chain_api.actions_ref().clone(),
511            hopr_transport_api.ticket_aggregator(),
512        ));
513        debug!(
514            strategies = tracing::field::debug(&multi_strategy),
515            "Initialized strategies"
516        );
517
518        #[cfg(all(feature = "prometheus", not(test)))]
519        {
520            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
521            METRIC_HOPR_LIB_VERSION.set(
522                &[const_format::formatcp!("{}", constants::APP_VERSION)],
523                f64::from_str(const_format::formatcp!(
524                    "{}.{}",
525                    env!("CARGO_PKG_VERSION_MAJOR"),
526                    env!("CARGO_PKG_VERSION_MINOR")
527                ))
528                .unwrap_or(0.0),
529            );
530
531            // Calling get_ticket_statistics will initialize the respective metrics on tickets
532            if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
533                error!(error = %e, "Failed to initialize ticket statistics metrics");
534            }
535        }
536
537        Ok(Self {
538            me: me.clone(),
539            me_chain: me_onchain.clone(),
540            cfg,
541            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
542            transport_api: hopr_transport_api,
543            hopr_chain_api: hopr_hopr_chain_api,
544            db,
545            chain_cfg: resolved_environment,
546            channel_graph,
547            multistrategy: multi_strategy,
548            rx_indexer_significant_events: rx_indexer_events,
549        })
550    }
551
552    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
553        if self.status() == state {
554            Ok(())
555        } else {
556            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
557        }
558    }
559
560    pub fn status(&self) -> HoprState {
561        self.state.load(Ordering::Relaxed)
562    }
563
564    pub fn version_coerced(&self) -> String {
565        String::from(constants::APP_VERSION_COERCED)
566    }
567
568    pub fn version(&self) -> String {
569        String::from(constants::APP_VERSION)
570    }
571
572    pub fn network(&self) -> String {
573        self.cfg.chain.network.clone()
574    }
575
576    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
577        Ok(self.hopr_chain_api.get_balance().await?)
578    }
579
580    pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
581        Ok(self.hopr_chain_api.get_eligibility_status().await?)
582    }
583
584    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
585        let safe_balance = self
586            .hopr_chain_api
587            .get_safe_balance(self.cfg.safe_module.safe_address)
588            .await?;
589        Ok(safe_balance)
590    }
591
592    pub fn get_safe_config(&self) -> SafeModule {
593        self.cfg.safe_module.clone()
594    }
595
596    pub fn chain_config(&self) -> ChainNetworkConfig {
597        self.chain_cfg.clone()
598    }
599
600    pub fn config(&self) -> &config::HoprLibConfig {
601        &self.cfg
602    }
603
604    pub fn get_provider(&self) -> String {
605        self.cfg
606            .chain
607            .provider
608            .clone()
609            .unwrap_or(self.chain_cfg.chain.default_provider.clone())
610    }
611
612    #[inline]
613    fn is_public(&self) -> bool {
614        self.cfg.chain.announce
615    }
616
617    pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
618        &self,
619        #[cfg(feature = "session-server")] serve_handler: T,
620    ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, AbortHandle>)> {
621        self.error_if_not_in_state(
622            HoprState::Uninitialized,
623            "Cannot start the hopr node multiple times".into(),
624        )?;
625
626        info!(
627            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
628            "Node is not started, please fund this node",
629        );
630
631        let mut processes: HashMap<HoprLibProcesses, AbortHandle> = HashMap::new();
632
633        wait_for_funds(
634            self.me_onchain(),
635            *MIN_NATIVE_BALANCE,
636            Duration::from_secs(200),
637            self.hopr_chain_api.rpc(),
638        )
639        .await?;
640
641        info!("Starting the node...");
642
643        self.state.store(HoprState::Initializing, Ordering::Relaxed);
644
645        let balance: XDaiBalance = self.get_balance().await?;
646        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
647
648        info!(
649            address = %self.hopr_chain_api.me_onchain(),
650            %balance,
651            %minimum_balance,
652            "Node information"
653        );
654
655        if balance.le(&minimum_balance) {
656            return Err(HoprLibError::GeneralError(
657                "Cannot start the node without a sufficiently funded wallet".to_string(),
658            ));
659        }
660
661        // Once we are able to query the chain,
662        // check if the ticket price is configured correctly.
663        let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;
664
665        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
666        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
667            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
668                "configured outgoing ticket price is lower than the network minimum ticket price: \
669                 {configured_ticket_price:?} < {network_min_ticket_price}"
670            ))));
671        }
672
673        // Once we are able to query the chain,
674        // check if the winning probability is configured correctly.
675        let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
676        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
677        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
678            && configured_win_prob
679                .and_then(|c| WinningProbability::try_from(c).ok())
680                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
681        {
682            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
683                "configured outgoing ticket winning probability is lower than the network minimum winning \
684                 probability: {configured_win_prob:?} < {network_min_win_prob}"
685            ))));
686        }
687
688        // set safe and module addresses in the DB
689        self.db
690            .set_safe_info(
691                None,
692                SafeInfo {
693                    safe_address: self.cfg.safe_module.safe_address,
694                    module_address: self.cfg.safe_module.module_address,
695                },
696            )
697            .await?;
698
699        self.state.store(HoprState::Indexing, Ordering::Relaxed);
700
701        let (mut indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
702
703        let indexer_event_pipeline = chain_events_to_transport_events(
704            self.rx_indexer_significant_events.clone(),
705            self.me_onchain(),
706            self.db.clone(),
707            self.multistrategy.clone(),
708            self.channel_graph.clone(),
709            self.hopr_chain_api.action_state(),
710        )
711        .await;
712
713        {
714            // This has to happen before the indexing process starts in order to make sure that the pre-existing data is
715            // properly populated into the transport mechanism before the synced data in the follow up process.
716            info!("Syncing peer announcements and network registry updates from previous runs");
717            let accounts = self.db.get_accounts(None, true).await?;
718            for account in accounts.into_iter() {
719                match account.entry_type {
720                    AccountType::NotAnnounced => {}
721                    AccountType::Announced { multiaddr, .. } => {
722                        indexer_peer_update_tx
723                            .send(PeerDiscovery::Announce(account.public_key.into(), vec![multiaddr]))
724                            .await
725                            .map_err(|e| {
726                                HoprLibError::GeneralError(format!("Failed to send peer discovery announcement: {e}"))
727                            })?;
728
729                        let allow_status = if self
730                            .db
731                            .is_allowed_in_network_registry(None, &account.chain_addr)
732                            .await?
733                        {
734                            PeerDiscovery::Allow(account.public_key.into())
735                        } else {
736                            PeerDiscovery::Ban(account.public_key.into())
737                        };
738
739                        indexer_peer_update_tx.send(allow_status).await.map_err(|e| {
740                            HoprLibError::GeneralError(format!(
741                                "Failed to send peer discovery network registry event: {e}"
742                            ))
743                        })?;
744                    }
745                }
746            }
747        }
748
749        spawn(async move {
750            indexer_event_pipeline
751                .map(Ok)
752                .forward(indexer_peer_update_tx)
753                .await
754                .expect("The index to transport event chain failed");
755
756            tracing::info!(
757                task = "indexer pipeline for transport",
758                "long-running background task finished"
759            )
760        });
761
762        info!("Start the chain process and sync the indexer");
763        for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
764            let nid = match id {
765                HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
766                HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
767            };
768            processes.insert(nid, proc);
769        }
770
771        {
772            // Show onboarding information
773            let my_ethereum_address = self.me_onchain();
774            let my_peer_id = self.me_peer_id();
775            let my_version = crate::constants::APP_VERSION;
776
777            while !self
778                .db
779                .clone()
780                .is_allowed_in_network_registry(None, &my_ethereum_address)
781                .await
782                .unwrap_or(false)
783            {
784                info!(
785                    "Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.",
786                    my_ethereum_address.to_hex()
787                );
788
789                sleep(ONBOARDING_INFORMATION_INTERVAL).await;
790
791                info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
792                info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
793            }
794        }
795
796        // Check Safe-module status:
797        // 1) if the node is already included into the module
798        // 2) if the module is enabled in the safe
799        // 3) if the safe is the owner of the module
800        // if any of the conditions is not met, return error
801        let safe_module_configuration = self
802            .hopr_chain_api
803            .rpc()
804            .check_node_safe_module_status(self.me_onchain())
805            .await
806            .map_err(HoprChainError::Rpc)?;
807
808        if !safe_module_configuration.should_pass() {
809            error!(
810                ?safe_module_configuration,
811                "Something is wrong with the safe module configuration",
812            );
813            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
814                "Safe and module are not configured correctly {safe_module_configuration:?}",
815            ))));
816        }
817
818        // Possibly register a node-safe pair to NodeSafeRegistry.
819        // Following that, the connector is set to use safe tx variants.
820        if can_register_with_safe(
821            self.me_onchain(),
822            self.cfg.safe_module.safe_address,
823            self.hopr_chain_api.rpc(),
824        )
825        .await?
826        {
827            info!("Registering safe by node");
828
829            if self.me_onchain() == self.cfg.safe_module.safe_address {
830                return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
831            }
832
833            if let Err(e) = self
834                .hopr_chain_api
835                .actions_ref()
836                .register_safe_by_node(self.cfg.safe_module.safe_address)
837                .await?
838                .await
839            {
840                // Intentionally ignoring the errored state
841                error!(error = %e, "Failed to register node with safe")
842            }
843        }
844
845        if self.is_public() {
846            // At this point the node is already registered with Safe, so
847            // we can announce via Safe-compliant TX
848
849            let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
850
851            // The announcement is intentionally not awaited until confirmation
852            match self
853                .hopr_chain_api
854                .actions_ref()
855                .announce(&multiaddresses_to_announce, &self.me)
856                .await
857            {
858                Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
859                Err(ChainActionsError::AlreadyAnnounced) => {
860                    info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
861                }
862                // If the announcement fails, we keep going to prevent the node from retrying
863                // after restart.
864                // Functionality is limited, and users must check the logs for errors.
865                Err(e) => error!(error = %e, "Failed to transmit node announcement"),
866            }
867        }
868
869        {
870            // Sync key ids from indexed Accounts
871
872            // Sync the Channel graph
873            let channel_graph = self.channel_graph.clone();
874            let mut cg = channel_graph.write_arc().await;
875
876            info!("Syncing channels from the previous runs");
877            let mut channel_stream = self
878                .db
879                .stream_active_channels()
880                .await
881                .map_err(hopr_db_sql::api::errors::DbError::from)?;
882
883            while let Some(maybe_channel) = channel_stream.next().await {
884                match maybe_channel {
885                    Ok(channel) => {
886                        cg.update_channel(channel);
887                    }
888                    Err(error) => error!(%error, "Failed to sync channel into the graph"),
889                }
890            }
891
892            // Initialize node latencies and scores in the channel graph:
893            // Sync only those nodes that we know that had a good quality
894            // Other nodes will be repopulated into the channel graph during heartbeat
895            // rounds.
896            info!("Syncing peer qualities from the previous runs");
897            let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
898                .map_err(|e| e.to_string())
899                .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
900                .unwrap_or_else(|error| {
901                    warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
902                    constants::DEFAULT_MIN_QUALITY_TO_SYNC
903                });
904
905            let mut peer_stream = self
906                .db
907                .get_network_peers(Default::default(), false)
908                .await?
909                .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
910
911            while let Some(peer) = peer_stream.next().await {
912                if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
913                    // For nodes that had a good quality, we assign a perfect score
914                    cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
915                } else {
916                    error!(peer = %peer.id.1, "Could not translate peer information");
917                }
918            }
919
920            info!(
921                channels = cg.count_channels(),
922                nodes = cg.count_nodes(),
923                "Channel graph sync complete"
924            );
925        }
926
927        let socket = HoprSocket::new();
928        let transport_output_tx = socket.writer();
929
930        // notifier on acknowledged ticket reception
931        let multi_strategy_ack_ticket = self.multistrategy.clone();
932        let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
933        self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
934
935        processes.insert(
936            HoprLibProcesses::OnReceivedAcknowledgement,
937            hopr_async_runtime::spawn_as_abortable!(async move {
938                while let Some(ack) = on_ack_tkt_rx.next().await {
939                    if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
940                        &*multi_strategy_ack_ticket,
941                        &ack,
942                    )
943                    .await
944                    {
945                        error!(%error, "Failed to process acknowledged winning ticket with the strategy");
946                    }
947                }
948            }),
949        );
950
951        let (session_tx, _session_rx) = unbounded::<IncomingSession>();
952
953        #[cfg(feature = "session-server")]
954        {
955            processes.insert(
956                HoprLibProcesses::SessionServer,
957                hopr_async_runtime::spawn_as_abortable!(_session_rx.for_each_concurrent(None, move |session| {
958                    let serve_handler = serve_handler.clone();
959                    async move {
960                        let session_id = *session.session.id();
961                        match serve_handler.process(session).await {
962                            Ok(_) => debug!(
963                                session_id = ?session_id,
964                                "Client session processed successfully"
965                            ),
966                            Err(e) => error!(
967                                session_id = ?session_id,
968                                error = %e,
969                                "Client session processing failed"
970                            ),
971                        }
972                    }
973                })),
974            );
975        }
976
977        for (id, proc) in self
978            .transport_api
979            .run(&self.me_chain, transport_output_tx, indexer_peer_update_rx, session_tx)
980            .await?
981            .into_iter()
982        {
983            processes.insert(id.into(), proc);
984        }
985
986        let db_clone = self.db.clone();
987        processes.insert(
988            HoprLibProcesses::TicketIndexFlush,
989            hopr_async_runtime::spawn_as_abortable!(Box::pin(execute_on_tick(
990                Duration::from_secs(5),
991                move || {
992                    let db_clone = db_clone.clone();
993                    async move {
994                        match db_clone.persist_outgoing_ticket_indices().await {
995                            Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
996                            Err(e) => error!(error = %e, "Failed to flush ticket indices"),
997                        }
998                    }
999                },
1000                "flush the states of outgoing ticket indices".into(),
1001            ))),
1002        );
1003
1004        // NOTE: after the chain is synced, we can reset tickets which are considered
1005        // redeemed but on-chain state does not align with that. This implies there was a problem
1006        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
1007        // handle errors appropriately.
1008        if let Err(e) = self.db.fix_channels_next_ticket_state().await {
1009            error!(error = %e, "failed to fix channels ticket states");
1010        }
1011
1012        // NOTE: strategy ticks must start after the chain is synced, otherwise
1013        // the strategy would react to historical data and drain through the native
1014        // balance on chain operations not relevant for the present network state
1015        let multi_strategy = self.multistrategy.clone();
1016        let strategy_interval = self.cfg.strategy.execution_interval;
1017        processes.insert(
1018            HoprLibProcesses::StrategyTick,
1019            hopr_async_runtime::spawn_as_abortable!(async move {
1020                execute_on_tick(
1021                    Duration::from_secs(strategy_interval),
1022                    move || {
1023                        let multi_strategy = multi_strategy.clone();
1024
1025                        async move {
1026                            trace!(state = "started", "strategy tick");
1027                            let _ = multi_strategy.on_tick().await;
1028                            trace!(state = "finished", "strategy tick");
1029                        }
1030                    },
1031                    "run strategies".into(),
1032                )
1033                .await;
1034            }),
1035        );
1036
1037        self.state.store(HoprState::Running, Ordering::Relaxed);
1038
1039        info!(
1040            id = %self.me_peer_id(),
1041            version = constants::APP_VERSION,
1042            "NODE STARTED AND RUNNING"
1043        );
1044
1045        #[cfg(all(feature = "prometheus", not(test)))]
1046        METRIC_HOPR_NODE_INFO.set(
1047            &[
1048                &self.me.public().to_peerid_str(),
1049                &self.me_onchain().to_string(),
1050                &self.cfg.safe_module.safe_address.to_string(),
1051                &self.cfg.safe_module.module_address.to_string(),
1052            ],
1053            1.0,
1054        );
1055
1056        Ok((socket, processes))
1057    }
1058
1059    // p2p transport =========
1060    /// Own PeerId used in the libp2p transport layer
1061    pub fn me_peer_id(&self) -> PeerId {
1062        (*self.me.public()).into()
1063    }
1064
1065    /// Get the list of all announced public nodes in the network
1066    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1067        Ok(self.transport_api.get_public_nodes().await?)
1068    }
1069
1070    /// Returns the most recently indexed log, if any.
1071    pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1072        let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1073
1074        match self.db.get_last_checksummed_log().await? {
1075            Some(log) => {
1076                let checksum = match log.checksum {
1077                    Some(checksum) => Hash::from_hex(checksum.as_str())?,
1078                    None => Hash::default(),
1079                };
1080                Ok(IndexerStateInfo {
1081                    latest_log_block_number: log.block_number as u32,
1082                    latest_log_checksum: checksum,
1083                    ..indexer_state_info
1084                })
1085            }
1086            None => Ok(indexer_state_info),
1087        }
1088    }
1089
1090    /// Test whether the peer with PeerId is allowed to access the network
1091    pub async fn is_allowed_to_access_network(
1092        &self,
1093        address_like: either::Either<&PeerId, Address>,
1094    ) -> errors::Result<bool> {
1095        Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1096    }
1097
1098    /// Ping another node in the network based on the PeerId
1099    ///
1100    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
1101    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1102        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1103
1104        Ok(self.transport_api.ping(peer).await?)
1105    }
1106
1107    /// Create a client session connection returning a session object that implements
1108    /// [`futures::io::AsyncRead`] and [`futures::io::AsyncWrite`] and can bu used as a read/write binary session.
1109    #[cfg(feature = "session-client")]
1110    pub async fn connect_to(
1111        &self,
1112        destination: Address,
1113        target: SessionTarget,
1114        cfg: SessionClientConfig,
1115    ) -> errors::Result<HoprSession> {
1116        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1117
1118        let backoff = backon::ConstantBuilder::default()
1119            .with_max_times(self.cfg.session.establish_max_retries as usize)
1120            .with_delay(self.cfg.session.establish_retry_timeout)
1121            .with_jitter();
1122
1123        use backon::Retryable;
1124
1125        Ok((|| {
1126            let cfg = cfg.clone();
1127            let target = target.clone();
1128            async { self.transport_api.new_session(destination, target, cfg).await }
1129        })
1130        .retry(backoff)
1131        .sleep(backon::FuturesTimerSleeper)
1132        .await?)
1133    }
1134
1135    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
1136    /// closed due to inactivity.
1137    #[cfg(feature = "session-client")]
1138    pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
1139        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1140        Ok(self.transport_api.probe_session(id).await?)
1141    }
1142
1143    #[cfg(feature = "session-client")]
1144    pub async fn get_session_surb_balancer_config(
1145        &self,
1146        id: &HoprSessionId,
1147    ) -> errors::Result<Option<SurbBalancerConfig>> {
1148        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1149        Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
1150    }
1151
1152    #[cfg(feature = "session-client")]
1153    pub async fn update_session_surb_balancer_config(
1154        &self,
1155        id: &HoprSessionId,
1156        cfg: SurbBalancerConfig,
1157    ) -> errors::Result<()> {
1158        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1159        Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
1160    }
1161
1162    /// Send a message to another peer in the network
1163    ///
1164    /// @param msg message to send
1165    /// @param destination PeerId of the destination
1166    /// @param options optional configuration of the message path using hops and intermediatePath
1167    /// @param applicationTag optional tag identifying the sending application
1168    /// @returns ack challenge
1169    #[tracing::instrument(level = "debug", skip(self, msg))]
1170    pub async fn send_message(
1171        &self,
1172        msg: Box<[u8]>,
1173        routing: DestinationRouting,
1174        application_tag: Tag,
1175    ) -> errors::Result<()> {
1176        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1177
1178        self.transport_api.send_message(msg, routing, application_tag).await?;
1179
1180        Ok(())
1181    }
1182
1183    /// Attempts to aggregate all tickets in the given channel
1184    pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1185        Ok(self.transport_api.aggregate_tickets(channel).await?)
1186    }
1187
1188    /// List all multiaddresses announced by this node
1189    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1190        self.transport_api.local_multiaddresses()
1191    }
1192
1193    /// List all multiaddresses on which the node is listening
1194    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1195        self.transport_api.listening_multiaddresses().await
1196    }
1197
1198    /// List all multiaddresses observed for a PeerId
1199    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1200        self.transport_api.network_observed_multiaddresses(peer).await
1201    }
1202
1203    /// List all multiaddresses announced on-chain for the given node.
1204    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1205        let peer = *peer;
1206        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1207        let pubkey = match hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await {
1208            Ok(k) => k,
1209            Err(e) => {
1210                error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1211                return vec![];
1212            }
1213        };
1214
1215        match self.db.get_account(None, pubkey).await {
1216            Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1217            Ok(None) => {
1218                error!(%peer, "no information");
1219                vec![]
1220            }
1221            Err(e) => {
1222                error!(%peer, error = %e, "failed to retrieve information");
1223                vec![]
1224            }
1225        }
1226    }
1227
1228    // Network =========
1229
1230    /// Get measured network health
1231    pub async fn network_health(&self) -> Health {
1232        self.transport_api.network_health().await
1233    }
1234
1235    /// List all peers connected to this
1236    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1237        Ok(self.transport_api.network_connected_peers().await?)
1238    }
1239
1240    /// Get all data collected from the network relevant for a PeerId
1241    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1242        Ok(self.transport_api.network_peer_info(peer).await?)
1243    }
1244
1245    /// Get peers connected peers with quality higher than some value
1246    pub async fn all_network_peers(
1247        &self,
1248        minimum_quality: f64,
1249    ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1250        Ok(
1251            futures::stream::iter(self.transport_api.network_connected_peers().await?)
1252                .filter_map(|peer| async move {
1253                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1254                        if info.get_average_quality() >= minimum_quality {
1255                            Some((peer, info))
1256                        } else {
1257                            None
1258                        }
1259                    } else {
1260                        None
1261                    }
1262                })
1263                .filter_map(|(peer_id, info)| async move {
1264                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1265                    Some((address, peer_id, info))
1266                })
1267                .collect::<Vec<_>>()
1268                .await,
1269        )
1270    }
1271
1272    // Ticket ========
1273    /// Get all tickets in a channel specified by Hash
1274    pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1275        Ok(self.transport_api.tickets_in_channel(channel).await?)
1276    }
1277
1278    /// Get all tickets
1279    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1280        Ok(self.transport_api.all_tickets().await?)
1281    }
1282
1283    /// Get statistics for all tickets
1284    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1285        Ok(self.transport_api.ticket_statistics().await?)
1286    }
1287
1288    /// Reset the ticket metrics to zero
1289    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1290        Ok(self.db.reset_ticket_statistics().await?)
1291    }
1292
1293    // DB ============
1294    pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
1295        &self.db
1296    }
1297
1298    // Chain =========
1299    pub fn me_onchain(&self) -> Address {
1300        self.hopr_chain_api.me_onchain()
1301    }
1302
1303    /// Get ticket price
1304    pub async fn get_ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
1305        Ok(self.hopr_chain_api.ticket_price().await?)
1306    }
1307
1308    /// Get minimum incoming ticket winning probability
1309    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1310        Ok(self
1311            .db
1312            .get_indexer_data(None)
1313            .await?
1314            .minimum_incoming_ticket_winning_prob)
1315    }
1316
1317    /// List of all accounts announced on the chain
1318    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1319        Ok(self.hopr_chain_api.accounts_announced_on_chain().await?)
1320    }
1321
1322    /// Get the channel entry from Hash.
1323    /// @returns the channel entry of those two nodes
1324    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1325        Ok(self.db.get_channel_by_id(None, channel_id).await?)
1326    }
1327
1328    /// Get the channel entry between source and destination node.
1329    /// @param src Address
1330    /// @param dest Address
1331    /// @returns the channel entry of those two nodes
1332    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1333        Ok(self.hopr_chain_api.channel(src, dest).await?)
1334    }
1335
1336    /// List all channels open from a specified Address
1337    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1338        Ok(self.hopr_chain_api.channels_from(src).await?)
1339    }
1340
1341    /// List all channels open to a specified address
1342    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1343        Ok(self.hopr_chain_api.channels_to(dest).await?)
1344    }
1345
1346    /// List all channels
1347    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1348        Ok(self.hopr_chain_api.all_channels().await?)
1349    }
1350
1351    /// List all corrupted channels
1352    pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
1353        Ok(self.hopr_chain_api.corrupted_channels().await?)
1354    }
1355
1356    /// Current safe allowance balance
1357    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1358        Ok(self.hopr_chain_api.safe_allowance().await?)
1359    }
1360
1361    /// Withdraw on-chain assets to a given address
1362    /// @param recipient the account where the assets should be transferred to
1363    /// @param amount how many tokens to be transferred
1364    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<Hash> {
1365        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1366
1367        let awaiter = self.hopr_chain_api.actions_ref().withdraw(recipient, amount).await?;
1368
1369        Ok(awaiter.await?.tx_hash)
1370    }
1371
1372    /// Withdraw on-chain native assets to a given address
1373    /// @param recipient the account where the assets should be transferred to
1374    /// @param amount how many tokens to be transferred
1375    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<Hash> {
1376        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1377
1378        let awaiter = self
1379            .hopr_chain_api
1380            .actions_ref()
1381            .withdraw_native(recipient, amount)
1382            .await?;
1383
1384        Ok(awaiter.await?.tx_hash)
1385    }
1386
1387    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1388        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1389
1390        let awaiter = self
1391            .hopr_chain_api
1392            .actions_ref()
1393            .open_channel(*destination, amount)
1394            .await?;
1395
1396        let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1397        Ok(awaiter.await.map(|confirm| OpenChannelResult {
1398            tx_hash: confirm.tx_hash,
1399            channel_id,
1400        })?)
1401    }
1402
1403    pub async fn fund_channel(&self, channel_id: &Hash, amount: HoprBalance) -> errors::Result<Hash> {
1404        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1405
1406        let awaiter = self
1407            .hopr_chain_api
1408            .actions_ref()
1409            .fund_channel(*channel_id, amount)
1410            .await?;
1411
1412        Ok(awaiter.await?.tx_hash)
1413    }
1414
1415    pub async fn close_channel(
1416        &self,
1417        counterparty: &Address,
1418        direction: ChannelDirection,
1419        redeem_before_close: bool,
1420    ) -> errors::Result<CloseChannelResult> {
1421        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1422
1423        let confirmation = self
1424            .hopr_chain_api
1425            .actions_ref()
1426            .close_channel(*counterparty, direction, redeem_before_close)
1427            .await?
1428            .await?;
1429
1430        match confirmation
1431            .event
1432            .expect("channel close action confirmation must have associated chain event")
1433        {
1434            ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1435                tx_hash: confirmation.tx_hash,
1436                status: c.status, // copy the information about closure time
1437            }),
1438            ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1439                tx_hash: confirmation.tx_hash,
1440                status: ChannelStatus::Closed,
1441            }),
1442            _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1443        }
1444    }
1445
1446    pub async fn close_channel_by_id(
1447        &self,
1448        channel_id: Hash,
1449        redeem_before_close: bool,
1450    ) -> errors::Result<CloseChannelResult> {
1451        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1452
1453        match self.channel_from_hash(&channel_id).await? {
1454            Some(channel) => match channel.orientation(&self.me_onchain()) {
1455                Some((direction, counterparty)) => {
1456                    self.close_channel(&counterparty, direction, redeem_before_close).await
1457                }
1458                None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1459                    "cannot close channel that is not own".into(),
1460                ))),
1461            },
1462            None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1463        }
1464    }
1465
1466    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1467        Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1468    }
1469
1470    pub async fn redeem_all_tickets<B: Into<HoprBalance>>(
1471        &self,
1472        min_value: B,
1473        only_aggregated: bool,
1474    ) -> errors::Result<()> {
1475        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1476
1477        // We do not await the on-chain confirmation
1478        self.hopr_chain_api
1479            .actions_ref()
1480            .redeem_all_tickets(min_value.into(), only_aggregated)
1481            .await?;
1482
1483        Ok(())
1484    }
1485
1486    pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1487        &self,
1488        counterparty: &Address,
1489        min_value: B,
1490        only_aggregated: bool,
1491    ) -> errors::Result<()> {
1492        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1493
1494        // We do not await the on-chain confirmation
1495        let _ = self
1496            .hopr_chain_api
1497            .actions_ref()
1498            .redeem_tickets_with_counterparty(counterparty, min_value.into(), only_aggregated)
1499            .await?;
1500
1501        Ok(())
1502    }
1503
1504    pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1505        &self,
1506        channel_id: &Hash,
1507        min_value: B,
1508        only_aggregated: bool,
1509    ) -> errors::Result<usize> {
1510        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1511
1512        let channel = self.db.get_channel_by_id(None, channel_id).await?;
1513        let mut redeem_count = 0;
1514
1515        if let Some(channel) = channel {
1516            if channel.destination == self.hopr_chain_api.me_onchain() {
1517                // We do not await the on-chain confirmation
1518                redeem_count = self
1519                    .hopr_chain_api
1520                    .actions_ref()
1521                    .redeem_tickets_in_channel(&channel, min_value.into(), only_aggregated)
1522                    .await?
1523                    .len();
1524            }
1525        }
1526
1527        Ok(redeem_count)
1528    }
1529
1530    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1531        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1532
1533        // We do not await the on-chain confirmation
1534        #[allow(clippy::let_underscore_future)]
1535        let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1536
1537        Ok(())
1538    }
1539
1540    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1541        let peer_id = *peer_id;
1542        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
1543        let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer_id))
1544            .await
1545            .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1546        Ok(self.db.resolve_chain_key(&pubkey).await?)
1547    }
1548
1549    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1550        Ok(self
1551            .db
1552            .resolve_packet_key(address)
1553            .await
1554            .map(|pk| pk.map(|v| v.into()))?)
1555    }
1556
1557    pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1558        self.channel_graph.read_arc().await.as_dot(cfg)
1559    }
1560
1561    pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1562        let cg = self.channel_graph.read_arc().await;
1563        serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1564    }
1565}