hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
//! it will perform its functionality autonomously. The API it offers serves as a
//! high-level integration point for other applications and utilities. It provides
//! a complete and fully featured HOPR node, stripped of top-level functionality
//! such as the REST API or key management.
9//!
//! The intended way to use hopr_lib is to build a specific tool on top of it,
//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Configuration-related public types
17pub mod config;
18/// Various public constants.
19pub mod constants;
20/// Lists all errors thrown from this library.
21pub mod errors;
22
23use std::{
24    collections::HashMap,
25    fmt::{Display, Formatter},
26    ops::Deref,
27    path::PathBuf,
28    str::FromStr,
29    sync::{Arc, atomic::Ordering},
30    time::Duration,
31};
32
33use async_lock::RwLock;
34use errors::{HoprLibError, HoprStatusError};
35use futures::{
36    SinkExt, Stream, StreamExt,
37    channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded},
38    future::AbortHandle,
39    stream::{self},
40};
41use hopr_async_runtime::prelude::{sleep, spawn};
42pub use hopr_chain_actions::errors::ChainActionsError;
43use hopr_chain_actions::{
44    action_state::{ActionState, IndexerActionTracker},
45    channels::ChannelActions,
46    node::NodeActions,
47    redeem::TicketRedeemActions,
48};
49pub use hopr_chain_api::config::{
50    Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
51};
52use hopr_chain_api::{
53    HoprChain, HoprChainProcess, SignificantChainEvent, can_register_with_safe, config::ChainNetworkConfig,
54    errors::HoprChainError, wait_for_funds,
55};
56use hopr_chain_rpc::HoprRpcOperations;
57use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
58use hopr_crypto_types::prelude::OffchainPublicKey;
59use hopr_db_api::logs::HoprDbLogOperations;
60use hopr_db_sql::{
61    HoprDbAllOperations,
62    accounts::HoprDbAccountOperations,
63    api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
64    channels::HoprDbChannelOperations,
65    db::{HoprDb, HoprDbConfig},
66    info::{HoprDbInfoOperations, IndexerStateInfo},
67    prelude::{ChainOrPacketKey::ChainKey, HoprDbPeersOperations},
68    registry::HoprDbRegistryOperations,
69};
70pub use hopr_internal_types::prelude::*;
71pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
72pub use hopr_path::channel_graph::GraphExportConfig;
73use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
74use hopr_platform::file::native::{join, remove_dir_all};
75pub use hopr_primitive_types::prelude::*;
76pub use hopr_strategy::Strategy;
77use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
78#[cfg(feature = "runtime-tokio")]
79pub use hopr_transport::transfer_session;
80pub use hopr_transport::{
81    ApplicationData, HalfKeyChallenge, Health, IncomingSession as HoprIncomingSession, Keypair, Multiaddr,
82    OffchainKeypair as HoprOffchainKeypair, PeerId, PingQueryReplier, ProbeError, SESSION_MTU, SURB_SIZE, ServiceId,
83    Session as HoprSession, SessionCapabilities, SessionCapability, SessionClientConfig, SessionId as HoprSessionId,
84    SessionManagerError, SessionTarget, SurbBalancerConfig, Tag, TicketStatistics, TransportSessionError,
85    config::{HostConfig, HostType, looks_like_domain},
86    errors::{HoprTransportError, NetworkingError, ProtocolError},
87};
88use hopr_transport::{
89    ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession, OffchainKeypair,
90    PeerDiscovery, PeerStatus, execute_on_tick,
91};
92use tracing::{debug, error, info, trace, warn};
93#[cfg(all(feature = "prometheus", not(test)))]
94use {
95    hopr_metrics::metrics::{MultiGauge, SimpleGauge},
96    hopr_platform::time::native::current_time,
97};
98
99use crate::{
100    config::SafeModule,
101    constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE},
102};
103
// Prometheus metrics published by this library. They are compiled in only when the
// `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge `hopr_start_time`: unix timestamp (in seconds) at which the process was started.
    static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();

    /// Multi-gauge `hopr_lib_version`: the executed hopr-lib version, labelled by `version`.
    static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();

    /// Multi-gauge `hopr_node_addresses`: the node's on-chain and off-chain addresses,
    /// labelled by `peerid`, `address`, `safe_address` and `module_address`.
    static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
121
122pub use async_trait::async_trait;
123
/// Server-side handler of incoming HOPR sessions.
///
/// An implementor describes what the node does with every incoming session it is
/// handed. `Hopr::run` accepts such an implementor as its `serve_handler` argument
/// when the `session-server` feature is enabled.
#[cfg(feature = "session-server")]
#[async_trait]
pub trait HoprSessionReactor {
    /// Processes a single incoming HOPR `session` from start to finish.
    ///
    /// # Errors
    /// Returns an error when the session could not be processed.
    async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
}
132
133/// An enum representing the current state of the HOPR node
134#[atomic_enum::atomic_enum]
135#[derive(PartialEq, Eq)]
136pub enum HoprState {
137    Uninitialized = 0,
138    Initializing = 1,
139    Indexing = 2,
140    Starting = 3,
141    Running = 4,
142}
143
144impl Display for HoprState {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        write!(f, "{self:?}")
147    }
148}
149
150pub struct OpenChannelResult {
151    pub tx_hash: Hash,
152    pub channel_id: Hash,
153}
154
155pub struct CloseChannelResult {
156    pub tx_hash: Hash,
157    pub status: ChannelStatus,
158}
159
160/// Enum differentiator for loop component futures.
161///
162/// Used to differentiate the type of the future that exits the loop premateruly
163/// by tagging it as an enum.
164#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
165pub enum HoprLibProcesses {
166    #[strum(to_string = "transport: {0}")]
167    Transport(HoprTransportProcess),
168    #[cfg(feature = "session-server")]
169    #[strum(to_string = "session server providing the exit node session stream functionality")]
170    SessionServer,
171    #[strum(to_string = "tick wake up the strategies to perform an action")]
172    StrategyTick,
173    #[strum(to_string = "initial indexing operation into the DB")]
174    Indexing,
175    #[strum(to_string = "processing of indexed operations in internal components")]
176    IndexReflection,
177    #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
178    OutgoingOnchainActionQueue,
179    #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
180    TicketIndexFlush,
181    #[strum(to_string = "on received ack ticket trigger")]
182    OnReceivedAcknowledgement,
183}
184
185impl HoprLibProcesses {
186    /// Identifies whether a loop is allowed to finish or should
187    /// run indefinitely.
188    pub fn can_finish(&self) -> bool {
189        matches!(self, HoprLibProcesses::Indexing)
190    }
191}
192
193impl From<HoprTransportProcess> for HoprLibProcesses {
194    fn from(value: HoprTransportProcess) -> Self {
195        HoprLibProcesses::Transport(value)
196    }
197}
198
199/// Creates a pipeline that chains the indexer-generated data, processes them into
200/// the individual components, and creates a filtered output stream that is fed into
201/// the transport layer swarm.
202///
203/// * `event_stream` - represents the events generated by the indexer. If the Indexer is not synced, it will not
204///   generate any events.
205/// * `preloading_event_stream` - a stream used by the components to preload the data from the objects (db, channel
206///   graph...)
207#[allow(clippy::too_many_arguments)]
208pub async fn chain_events_to_transport_events<StreamIn, Db>(
209    event_stream: StreamIn,
210    me_onchain: Address,
211    db: Db,
212    multi_strategy: Arc<MultiStrategy>,
213    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
214    indexer_action_tracker: Arc<IndexerActionTracker>,
215) -> impl Stream<Item = PeerDiscovery> + Send + 'static
216where
217    Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
218    StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
219{
220    Box::pin(event_stream.filter_map(move |event| {
221        let db = db.clone();
222        let multi_strategy = multi_strategy.clone();
223        let channel_graph = channel_graph.clone();
224        let indexer_action_tracker = indexer_action_tracker.clone();
225
226        async move {
227            let resolved = indexer_action_tracker.match_and_resolve(&event).await;
228            if resolved.is_empty() {
229                trace!(%event, "No indexer expectations resolved for the event");
230            } else {
231                debug!(count = resolved.len(), %event, "resolved indexer expectations");
232            }
233
234            match event.event_type {
235                ChainEventType::Announcement{peer, address, multiaddresses} => {
236                    let allowed = db
237                        .is_allowed_in_network_registry(None, &address)
238                        .await
239                        .unwrap_or(false);
240
241                    Some(vec![PeerDiscovery::Announce(peer, multiaddresses), if allowed {
242                        PeerDiscovery::Allow(peer)
243                    } else {
244                        PeerDiscovery::Ban(peer)
245                    }])
246                }
247                ChainEventType::ChannelOpened(channel) |
248                ChainEventType::ChannelClosureInitiated(channel) |
249                ChainEventType::ChannelClosed(channel) |
250                ChainEventType::ChannelBalanceIncreased(channel, _) | // needed ?
251                ChainEventType::ChannelBalanceDecreased(channel, _) | // needed ?
252                ChainEventType::TicketRedeemed(channel, _) => {   // needed ?
253                    let maybe_direction = channel.direction(&me_onchain);
254
255                    let change = channel_graph
256                        .write_arc()
257                        .await
258                        .update_channel(channel);
259
260                    // Check if this is our own channel
261                    if let Some(own_channel_direction) = maybe_direction {
262                        if let Some(change_set) = change {
263                            for channel_change in change_set {
264                                let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
265                                    &*multi_strategy,
266                                    &channel,
267                                    own_channel_direction,
268                                    channel_change,
269                                )
270                                .await;
271                            }
272                        } else if channel.status == ChannelStatus::Open {
273                            // Emit Opening event if the channel did not exist before in the graph
274                            let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
275                                &*multi_strategy,
276                                &channel,
277                                own_channel_direction,
278                                ChannelChange::Status {
279                                    left: ChannelStatus::Closed,
280                                    right: ChannelStatus::Open,
281                                },
282                            )
283                            .await;
284                        }
285                    }
286
287                    None
288                }
289                ChainEventType::NetworkRegistryUpdate(address, allowed) => {
290                    let packet_key = db.translate_key(None, address).await;
291                    match packet_key {
292                        Ok(pk) => {
293                            if let Some(pk) = pk {
294                                let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
295
296                                if let Ok(offchain_key) = offchain_key {
297                                    let peer_id = offchain_key.into();
298
299                                    let res = match allowed {
300                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
301                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
302                                    };
303
304                                    Some(vec![res])
305                                } else {
306                                    error!("Failed to unwrap as offchain key at this point");
307                                    None
308                                }
309                            } else {
310                                None
311                            }
312                        }
313                        Err(error) => {
314                            error!(%error, "on_network_registry_node_allowed failed");
315                            None
316                        },
317                    }
318                }
319                ChainEventType::NodeSafeRegistered(safe_address) =>  {
320                    info!(%safe_address, "Node safe registered");
321                    None
322                }
323            }
324        }
325    })
326    .flat_map(stream::iter)
327)
328}
329
330/// Represents the socket behavior of the hopr-lib spawned [`Hopr`] object.
331///
332/// Provides a read and write stream for Hopr socket recognized data formats.
333pub struct HoprSocket {
334    rx: UnboundedReceiver<ApplicationData>,
335    tx: UnboundedSender<ApplicationData>,
336}
337
338impl Default for HoprSocket {
339    fn default() -> Self {
340        let (tx, rx) = unbounded::<ApplicationData>();
341        Self { rx, tx }
342    }
343}
344
345impl HoprSocket {
346    pub fn new() -> Self {
347        Self::default()
348    }
349
350    pub fn reader(self) -> UnboundedReceiver<ApplicationData> {
351        self.rx
352    }
353
354    pub fn writer(&self) -> UnboundedSender<ApplicationData> {
355        self.tx.clone()
356    }
357}
358
359/// HOPR main object providing the entire HOPR node functionality
360///
361/// Instantiating this object creates all processes and objects necessary for
362/// running the HOPR node. Once created, the node can be started using the
363/// `run()` method.
364///
365/// Externally offered API should be sufficient to perform all necessary tasks
366/// with the HOPR node manually, but it is advised to create such a configuration
367/// that manual interaction is unnecessary.
368///
369/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
370pub struct Hopr {
371    me: OffchainKeypair,
372    me_chain: ChainKeypair,
373    cfg: config::HoprLibConfig,
374    state: Arc<AtomicHoprState>,
375    transport_api: HoprTransport<HoprDb>,
376    hopr_chain_api: HoprChain<HoprDb>,
377    // objects that could be removed pending architectural cleanup ========
378    db: HoprDb,
379    chain_cfg: ChainNetworkConfig,
380    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
381    multistrategy: Arc<MultiStrategy>,
382    rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
383}
384
385impl Hopr {
386    pub fn new(
387        mut cfg: config::HoprLibConfig,
388        me: &OffchainKeypair,
389        me_onchain: &ChainKeypair,
390    ) -> crate::errors::Result<Self> {
391        let multiaddress: Multiaddr = (&cfg.host).try_into()?;
392
393        let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
394        info!(path = ?db_path, "Initiating DB");
395
396        if cfg.db.force_initialize {
397            info!("Force cleaning up existing database");
398            remove_dir_all(db_path.as_path()).map_err(|e| {
399                HoprLibError::GeneralError(format!(
400                    "Failed to remove the existing DB directory at '{db_path:?}': {e}"
401                ))
402            })?;
403            cfg.db.initialize = true
404        }
405
406        // create DB dir if it does not exist
407        if let Some(parent_dir_path) = db_path.as_path().parent() {
408            if !parent_dir_path.is_dir() {
409                std::fs::create_dir_all(parent_dir_path).map_err(|e| {
410                    HoprLibError::GeneralError(format!(
411                        "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
412                    ))
413                })?
414            }
415        }
416
417        let db_cfg = HoprDbConfig {
418            create_if_missing: cfg.db.initialize,
419            force_create: cfg.db.force_initialize,
420            log_slow_queries: std::time::Duration::from_millis(150),
421            surb_ring_buffer_size: std::env::var("HOPR_PROTOCOL_SURB_RB_SIZE")
422                .ok()
423                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
424                .unwrap_or_else(|| HoprDbConfig::default().surb_ring_buffer_size),
425            surb_distress_threshold: std::env::var("HOPR_PROTOCOL_SURB_RB_DISTRESS")
426                .ok()
427                .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
428                .unwrap_or_else(|| HoprDbConfig::default().surb_distress_threshold),
429        };
430        let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
431
432        if let Some(provider) = &cfg.chain.provider {
433            info!(provider, "Creating chain components using the custom provider");
434        } else {
435            info!("Creating chain components using the default provider");
436        }
437        let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
438            &cfg.chain.network,
439            crate::constants::APP_VERSION_COERCED,
440            cfg.chain.provider.as_deref(),
441            cfg.chain.max_rpc_requests_per_sec,
442            &mut cfg.chain.protocols,
443        )
444        .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
445
446        let contract_addresses = ContractAddresses::from(&resolved_environment);
447        info!(
448            myself = me_onchain.public().to_hex(),
449            contract_addresses = ?contract_addresses,
450            "Resolved contract addresses",
451        );
452
453        let my_multiaddresses = vec![multiaddress];
454
455        let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
456
457        let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
458            me_onchain.public().to_address(),
459            ChannelGraphConfig::default(),
460        )));
461
462        let hopr_transport_api = HoprTransport::new(
463            me,
464            me_onchain,
465            HoprTransportConfig {
466                transport: cfg.transport.clone(),
467                network: cfg.network_options.clone(),
468                protocol: cfg.protocol,
469                probe: cfg.probe,
470                session: cfg.session,
471            },
472            db.clone(),
473            channel_graph.clone(),
474            my_multiaddresses,
475        );
476
477        let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
478            me_onchain.clone(),
479            db.clone(),
480            resolved_environment.clone(),
481            cfg.safe_module.module_address,
482            ContractAddresses {
483                announcements: resolved_environment.announcements,
484                channels: resolved_environment.channels,
485                token: resolved_environment.token,
486                price_oracle: resolved_environment.ticket_price_oracle,
487                win_prob_oracle: resolved_environment.winning_probability_oracle,
488                network_registry: resolved_environment.network_registry,
489                network_registry_proxy: resolved_environment.network_registry_proxy,
490                stake_factory: resolved_environment.node_stake_v2_factory,
491                safe_registry: resolved_environment.node_safe_registry,
492                module_implementation: resolved_environment.module_implementation,
493            },
494            cfg.safe_module.safe_address,
495            hopr_chain_indexer::IndexerConfig {
496                start_block_number: resolved_environment.channel_contract_deploy_block as u64,
497                fast_sync: cfg.chain.fast_sync,
498                enable_logs_snapshot: cfg.chain.enable_logs_snapshot,
499                logs_snapshot_url: cfg.chain.logs_snapshot_url.clone(),
500                data_directory: cfg.db.data.clone(),
501            },
502            tx_indexer_events,
503        )?;
504
505        let multi_strategy = Arc::new(MultiStrategy::new(
506            cfg.strategy.clone(),
507            db.clone(),
508            hopr_hopr_chain_api.actions_ref().clone(),
509            hopr_transport_api.ticket_aggregator(),
510        ));
511        debug!(
512            strategies = tracing::field::debug(&multi_strategy),
513            "Initialized strategies"
514        );
515
516        #[cfg(all(feature = "prometheus", not(test)))]
517        {
518            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
519            METRIC_HOPR_LIB_VERSION.set(
520                &[const_format::formatcp!("{}", constants::APP_VERSION)],
521                f64::from_str(const_format::formatcp!(
522                    "{}.{}",
523                    env!("CARGO_PKG_VERSION_MAJOR"),
524                    env!("CARGO_PKG_VERSION_MINOR")
525                ))
526                .unwrap_or(0.0),
527            );
528
529            // Calling get_ticket_statistics will initialize the respective metrics on tickets
530            if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
531                error!(error = %e, "Failed to initialize ticket statistics metrics");
532            }
533        }
534
535        Ok(Self {
536            me: me.clone(),
537            me_chain: me_onchain.clone(),
538            cfg,
539            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
540            transport_api: hopr_transport_api,
541            hopr_chain_api: hopr_hopr_chain_api,
542            db,
543            chain_cfg: resolved_environment,
544            channel_graph,
545            multistrategy: multi_strategy,
546            rx_indexer_significant_events: rx_indexer_events,
547        })
548    }
549
550    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
551        if self.status() == state {
552            Ok(())
553        } else {
554            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
555        }
556    }
557
558    pub fn status(&self) -> HoprState {
559        self.state.load(Ordering::Relaxed)
560    }
561
562    pub fn version_coerced(&self) -> String {
563        String::from(constants::APP_VERSION_COERCED)
564    }
565
566    pub fn version(&self) -> String {
567        String::from(constants::APP_VERSION)
568    }
569
570    pub fn network(&self) -> String {
571        self.cfg.chain.network.clone()
572    }
573
574    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
575        Ok(self.hopr_chain_api.get_balance().await?)
576    }
577
578    pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
579        Ok(self.hopr_chain_api.get_eligibility_status().await?)
580    }
581
582    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
583        let safe_balance = self
584            .hopr_chain_api
585            .get_safe_balance(self.cfg.safe_module.safe_address)
586            .await?;
587        Ok(safe_balance)
588    }
589
590    pub fn get_safe_config(&self) -> SafeModule {
591        self.cfg.safe_module.clone()
592    }
593
594    pub fn chain_config(&self) -> ChainNetworkConfig {
595        self.chain_cfg.clone()
596    }
597
598    pub fn config(&self) -> &config::HoprLibConfig {
599        &self.cfg
600    }
601
602    pub fn get_provider(&self) -> String {
603        self.cfg
604            .chain
605            .provider
606            .clone()
607            .unwrap_or(self.chain_cfg.chain.default_provider.clone())
608    }
609
610    #[inline]
611    fn is_public(&self) -> bool {
612        self.cfg.chain.announce
613    }
614
615    pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
616        &self,
617        #[cfg(feature = "session-server")] serve_handler: T,
618    ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, AbortHandle>)> {
619        self.error_if_not_in_state(
620            HoprState::Uninitialized,
621            "Cannot start the hopr node multiple times".into(),
622        )?;
623
624        info!(
625            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
626            "Node is not started, please fund this node",
627        );
628
629        let mut processes: HashMap<HoprLibProcesses, AbortHandle> = HashMap::new();
630
631        wait_for_funds(
632            self.me_onchain(),
633            *MIN_NATIVE_BALANCE,
634            Duration::from_secs(200),
635            self.hopr_chain_api.rpc(),
636        )
637        .await?;
638
639        info!("Starting the node...");
640
641        self.state.store(HoprState::Initializing, Ordering::Relaxed);
642
643        let balance: XDaiBalance = self.get_balance().await?;
644        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
645
646        info!(
647            address = %self.hopr_chain_api.me_onchain(),
648            %balance,
649            %minimum_balance,
650            "Node information"
651        );
652
653        if balance.le(&minimum_balance) {
654            return Err(HoprLibError::GeneralError(
655                "Cannot start the node without a sufficiently funded wallet".to_string(),
656            ));
657        }
658
659        // Once we are able to query the chain,
660        // check if the ticket price is configured correctly.
661        let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;
662
663        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
664        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
665            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
666                "configured outgoing ticket price is lower than the network minimum ticket price: \
667                 {configured_ticket_price:?} < {network_min_ticket_price}"
668            ))));
669        }
670
671        // Once we are able to query the chain,
672        // check if the winning probability is configured correctly.
673        let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
674        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
675        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
676            && configured_win_prob
677                .and_then(|c| WinningProbability::try_from(c).ok())
678                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
679        {
680            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
681                "configured outgoing ticket winning probability is lower than the network minimum winning \
682                 probability: {configured_win_prob:?} < {network_min_win_prob}"
683            ))));
684        }
685
686        // set safe and module addresses in the DB
687        self.db
688            .set_safe_info(
689                None,
690                SafeInfo {
691                    safe_address: self.cfg.safe_module.safe_address,
692                    module_address: self.cfg.safe_module.module_address,
693                },
694            )
695            .await?;
696
697        self.state.store(HoprState::Indexing, Ordering::Relaxed);
698
699        let (mut indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
700
701        let indexer_event_pipeline = chain_events_to_transport_events(
702            self.rx_indexer_significant_events.clone(),
703            self.me_onchain(),
704            self.db.clone(),
705            self.multistrategy.clone(),
706            self.channel_graph.clone(),
707            self.hopr_chain_api.action_state(),
708        )
709        .await;
710
711        {
712            // This has to happen before the indexing process starts in order to make sure that the pre-existing data is
713            // properly populated into the transport mechanism before the synced data in the follow up process.
714            info!("Syncing peer announcements and network registry updates from previous runs");
715            let accounts = self.db.get_accounts(None, true).await?;
716            for account in accounts.into_iter() {
717                match account.entry_type {
718                    AccountType::NotAnnounced => {}
719                    AccountType::Announced { multiaddr, .. } => {
720                        indexer_peer_update_tx
721                            .send(PeerDiscovery::Announce(account.public_key.into(), vec![multiaddr]))
722                            .await
723                            .map_err(|e| {
724                                HoprLibError::GeneralError(format!("Failed to send peer discovery announcement: {e}"))
725                            })?;
726
727                        let allow_status = if self
728                            .db
729                            .is_allowed_in_network_registry(None, &account.chain_addr)
730                            .await?
731                        {
732                            PeerDiscovery::Allow(account.public_key.into())
733                        } else {
734                            PeerDiscovery::Ban(account.public_key.into())
735                        };
736
737                        indexer_peer_update_tx.send(allow_status).await.map_err(|e| {
738                            HoprLibError::GeneralError(format!(
739                                "Failed to send peer discovery network registry event: {e}"
740                            ))
741                        })?;
742                    }
743                }
744            }
745        }
746
747        spawn(async move {
748            indexer_event_pipeline
749                .map(Ok)
750                .forward(indexer_peer_update_tx)
751                .await
752                .expect("The index to transport event chain failed");
753        });
754
755        info!("Start the chain process and sync the indexer");
756        for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
757            let nid = match id {
758                HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
759                HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
760            };
761            processes.insert(nid, proc);
762        }
763
764        {
765            // Show onboarding information
766            let my_ethereum_address = self.me_onchain();
767            let my_peer_id = self.me_peer_id();
768            let my_version = crate::constants::APP_VERSION;
769
770            while !self
771                .db
772                .clone()
773                .is_allowed_in_network_registry(None, &my_ethereum_address)
774                .await
775                .unwrap_or(false)
776            {
777                info!(
778                    "Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.",
779                    my_ethereum_address.to_hex()
780                );
781
782                sleep(ONBOARDING_INFORMATION_INTERVAL).await;
783
784                info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
785                info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
786            }
787        }
788
789        // Check Safe-module status:
790        // 1) if the node is already included into the module
791        // 2) if the module is enabled in the safe
792        // 3) if the safe is the owner of the module
793        // if any of the conditions is not met, return error
794        let safe_module_configuration = self
795            .hopr_chain_api
796            .rpc()
797            .check_node_safe_module_status(self.me_onchain())
798            .await
799            .map_err(HoprChainError::Rpc)?;
800
801        if !safe_module_configuration.should_pass() {
802            error!(
803                ?safe_module_configuration,
804                "Something is wrong with the safe module configuration",
805            );
806            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
807                "Safe and module are not configured correctly {safe_module_configuration:?}",
808            ))));
809        }
810
811        // Possibly register a node-safe pair to NodeSafeRegistry.
812        // Following that, the connector is set to use safe tx variants.
813        if can_register_with_safe(
814            self.me_onchain(),
815            self.cfg.safe_module.safe_address,
816            self.hopr_chain_api.rpc(),
817        )
818        .await?
819        {
820            info!("Registering safe by node");
821
822            if self.me_onchain() == self.cfg.safe_module.safe_address {
823                return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
824            }
825
826            if let Err(e) = self
827                .hopr_chain_api
828                .actions_ref()
829                .register_safe_by_node(self.cfg.safe_module.safe_address)
830                .await?
831                .await
832            {
833                // Intentionally ignoring the errored state
834                error!(error = %e, "Failed to register node with safe")
835            }
836        }
837
838        if self.is_public() {
839            // At this point the node is already registered with Safe, so
840            // we can announce via Safe-compliant TX
841
842            let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
843
844            // The announcement is intentionally not awaited until confirmation
845            match self
846                .hopr_chain_api
847                .actions_ref()
848                .announce(&multiaddresses_to_announce, &self.me)
849                .await
850            {
851                Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
852                Err(ChainActionsError::AlreadyAnnounced) => {
853                    info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
854                }
855                // If the announcement fails, we keep going to prevent the node from retrying
856                // after restart.
857                // Functionality is limited, and users must check the logs for errors.
858                Err(e) => error!(error = %e, "Failed to transmit node announcement"),
859            }
860        }
861
862        {
863            // Sync key ids from indexed Accounts
864
865            // Sync the Channel graph
866            let channel_graph = self.channel_graph.clone();
867            let mut cg = channel_graph.write_arc().await;
868
869            info!("Syncing channels from the previous runs");
870            let mut channel_stream = self
871                .db
872                .stream_active_channels()
873                .await
874                .map_err(hopr_db_sql::api::errors::DbError::from)?;
875
876            while let Some(maybe_channel) = channel_stream.next().await {
877                match maybe_channel {
878                    Ok(channel) => {
879                        cg.update_channel(channel);
880                    }
881                    Err(error) => error!(%error, "Failed to sync channel into the graph"),
882                }
883            }
884
885            // Initialize node latencies and scores in the channel graph:
886            // Sync only those nodes that we know that had a good quality
887            // Other nodes will be repopulated into the channel graph during heartbeat
888            // rounds.
889            info!("Syncing peer qualities from the previous runs");
890            let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
891                .map_err(|e| e.to_string())
892                .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
893                .unwrap_or_else(|error| {
894                    warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
895                    constants::DEFAULT_MIN_QUALITY_TO_SYNC
896                });
897
898            let mut peer_stream = self
899                .db
900                .get_network_peers(Default::default(), false)
901                .await?
902                .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
903
904            while let Some(peer) = peer_stream.next().await {
905                if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
906                    // For nodes that had a good quality, we assign a perfect score
907                    cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
908                } else {
909                    error!(peer = %peer.id.1, "Could not translate peer information");
910                }
911            }
912
913            info!(
914                channels = cg.count_channels(),
915                nodes = cg.count_nodes(),
916                "Channel graph sync complete"
917            );
918        }
919
920        let socket = HoprSocket::new();
921        let transport_output_tx = socket.writer();
922
923        // notifier on acknowledged ticket reception
924        let multi_strategy_ack_ticket = self.multistrategy.clone();
925        let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
926        self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
927
928        processes.insert(
929            HoprLibProcesses::OnReceivedAcknowledgement,
930            hopr_async_runtime::spawn_as_abortable!(async move {
931                while let Some(ack) = on_ack_tkt_rx.next().await {
932                    if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
933                        &*multi_strategy_ack_ticket,
934                        &ack,
935                    )
936                    .await
937                    {
938                        error!(%error, "Failed to process acknowledged winning ticket with the strategy");
939                    }
940                }
941            }),
942        );
943
944        let (session_tx, _session_rx) = unbounded::<IncomingSession>();
945
946        #[cfg(feature = "session-server")]
947        {
948            processes.insert(
949                HoprLibProcesses::SessionServer,
950                hopr_async_runtime::spawn_as_abortable!(_session_rx.for_each_concurrent(None, move |session| {
951                    let serve_handler = serve_handler.clone();
952                    async move {
953                        let session_id = *session.session.id();
954                        match serve_handler.process(session).await {
955                            Ok(_) => debug!(
956                                session_id = ?session_id,
957                                "Client session processed successfully"
958                            ),
959                            Err(e) => error!(
960                                session_id = ?session_id,
961                                error = %e,
962                                "Client session processing failed"
963                            ),
964                        }
965                    }
966                })),
967            );
968        }
969
970        for (id, proc) in self
971            .transport_api
972            .run(
973                &self.me_chain,
974                join(&[&self.cfg.db.data, "tbf"])
975                    .map_err(|e| HoprLibError::GeneralError(format!("Failed to construct the bloom filter: {e}")))?,
976                transport_output_tx,
977                indexer_peer_update_rx,
978                session_tx,
979            )
980            .await?
981            .into_iter()
982        {
983            processes.insert(id.into(), proc);
984        }
985
986        let db_clone = self.db.clone();
987        processes.insert(
988            HoprLibProcesses::TicketIndexFlush,
989            hopr_async_runtime::spawn_as_abortable!(Box::pin(execute_on_tick(
990                Duration::from_secs(5),
991                move || {
992                    let db_clone = db_clone.clone();
993                    async move {
994                        match db_clone.persist_outgoing_ticket_indices().await {
995                            Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
996                            Err(e) => error!(error = %e, "Failed to flush ticket indices"),
997                        }
998                    }
999                },
1000                "flush the states of outgoing ticket indices".into(),
1001            ))),
1002        );
1003
1004        // NOTE: after the chain is synced, we can reset tickets which are considered
1005        // redeemed but on-chain state does not align with that. This implies there was a problem
1006        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
1007        // handle errors appropriately.
1008        if let Err(e) = self.db.fix_channels_next_ticket_state().await {
1009            error!(error = %e, "failed to fix channels ticket states");
1010        }
1011
1012        // NOTE: strategy ticks must start after the chain is synced, otherwise
1013        // the strategy would react to historical data and drain through the native
1014        // balance on chain operations not relevant for the present network state
1015        let multi_strategy = self.multistrategy.clone();
1016        let strategy_interval = self.cfg.strategy.execution_interval;
1017        processes.insert(
1018            HoprLibProcesses::StrategyTick,
1019            hopr_async_runtime::spawn_as_abortable!(async move {
1020                execute_on_tick(
1021                    Duration::from_secs(strategy_interval),
1022                    move || {
1023                        let multi_strategy = multi_strategy.clone();
1024
1025                        async move {
1026                            trace!(state = "started", "strategy tick");
1027                            let _ = multi_strategy.on_tick().await;
1028                            trace!(state = "finished", "strategy tick");
1029                        }
1030                    },
1031                    "run strategies".into(),
1032                )
1033                .await;
1034            }),
1035        );
1036
1037        self.state.store(HoprState::Running, Ordering::Relaxed);
1038
1039        info!(
1040            id = %self.me_peer_id(),
1041            version = constants::APP_VERSION,
1042            "NODE STARTED AND RUNNING"
1043        );
1044
1045        #[cfg(all(feature = "prometheus", not(test)))]
1046        METRIC_HOPR_NODE_INFO.set(
1047            &[
1048                &self.me.public().to_peerid_str(),
1049                &self.me_onchain().to_string(),
1050                &self.cfg.safe_module.safe_address.to_string(),
1051                &self.cfg.safe_module.module_address.to_string(),
1052            ],
1053            1.0,
1054        );
1055
1056        Ok((socket, processes))
1057    }
1058
1059    // p2p transport =========
1060    /// Own PeerId used in the libp2p transport layer
1061    pub fn me_peer_id(&self) -> PeerId {
1062        (*self.me.public()).into()
1063    }
1064
1065    /// Get the list of all announced public nodes in the network
1066    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1067        Ok(self.transport_api.get_public_nodes().await?)
1068    }
1069
1070    /// Returns the most recently indexed log, if any.
1071    pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1072        let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1073
1074        match self.db.get_last_checksummed_log().await? {
1075            Some(log) => {
1076                let checksum = match log.checksum {
1077                    Some(checksum) => Hash::from_hex(checksum.as_str())?,
1078                    None => Hash::default(),
1079                };
1080                Ok(IndexerStateInfo {
1081                    latest_log_block_number: log.block_number as u32,
1082                    latest_log_checksum: checksum,
1083                    ..indexer_state_info
1084                })
1085            }
1086            None => Ok(indexer_state_info),
1087        }
1088    }
1089
1090    /// Test whether the peer with PeerId is allowed to access the network
1091    pub async fn is_allowed_to_access_network(
1092        &self,
1093        address_like: either::Either<&PeerId, Address>,
1094    ) -> errors::Result<bool> {
1095        Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1096    }
1097
1098    /// Ping another node in the network based on the PeerId
1099    ///
1100    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
1101    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1102        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1103
1104        Ok(self.transport_api.ping(peer).await?)
1105    }
1106
/// Creates a client session connection to `destination`.
///
/// The returned session object implements [`futures::io::AsyncRead`] and
/// [`futures::io::AsyncWrite`] and can be used as a read/write binary session.
///
/// Session establishment is retried with a constant, jittered delay taken from
/// `cfg.session.establish_retry_timeout`, at most `cfg.session.establish_max_retries` times.
///
/// # Errors
///
/// Fails the `error_if_not_in_state` check unless the node is in [`HoprState::Running`], and
/// returns the transport error if the session is still not established after the retries.
#[cfg(feature = "session-client")]
pub async fn connect_to(
    &self,
    destination: Address,
    target: SessionTarget,
    cfg: SessionClientConfig,
) -> errors::Result<HoprSession> {
    use backon::Retryable;

    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let retry_policy = backon::ConstantBuilder::default()
        .with_max_times(self.cfg.session.establish_max_retries as usize)
        .with_delay(self.cfg.session.establish_retry_timeout)
        .with_jitter();

    // Every attempt needs its own copies of the target and the session configuration,
    // because `new_session` takes them by value.
    let establish_session = || {
        let cfg = cfg.clone();
        let target = target.clone();
        async { self.transport_api.new_session(destination, target, cfg).await }
    };

    let session = establish_session
        .retry(retry_policy)
        .sleep(backon::FuturesTimerSleeper)
        .await?;

    Ok(session)
}
1134
/// Sends a keep-alive to the session with the given [`HoprSessionId`], so that the session
/// is not closed due to inactivity.
///
/// # Errors
///
/// Fails the `error_if_not_in_state` check unless the node is in [`HoprState::Running`], and
/// propagates any error returned when probing the session.
#[cfg(feature = "session-client")]
pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    self.transport_api.probe_session(id).await.map_err(Into::into)
}
1142
/// Returns the SURB balancer configuration of the session with the given id, as reported
/// by the transport API (which may report `None`).
///
/// # Errors
///
/// Fails the `error_if_not_in_state` check unless the node is in [`HoprState::Running`], and
/// propagates any error returned by the transport API.
#[cfg(feature = "session-client")]
pub async fn get_session_surb_balancer_config(
    &self,
    id: &HoprSessionId,
) -> errors::Result<Option<SurbBalancerConfig>> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let balancer_cfg = self.transport_api.session_surb_balancing_cfg(id).await?;
    Ok(balancer_cfg)
}
1151
/// Replaces the SURB balancer configuration of the session with the given id by `cfg`.
///
/// # Errors
///
/// Fails the `error_if_not_in_state` check unless the node is in [`HoprState::Running`], and
/// propagates any error returned by the transport API.
#[cfg(feature = "session-client")]
pub async fn update_session_surb_balancer_config(
    &self,
    id: &HoprSessionId,
    cfg: SurbBalancerConfig,
) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    self.transport_api
        .update_session_surb_balancing_cfg(id, cfg)
        .await
        .map_err(Into::into)
}
1161
1162    /// Send a message to another peer in the network
1163    ///
1164    /// @param msg message to send
1165    /// @param destination PeerId of the destination
1166    /// @param options optional configuration of the message path using hops and intermediatePath
1167    /// @param applicationTag optional tag identifying the sending application
1168    /// @returns ack challenge
1169    #[tracing::instrument(level = "debug", skip(self, msg))]
1170    pub async fn send_message(
1171        &self,
1172        msg: Box<[u8]>,
1173        routing: DestinationRouting,
1174        application_tag: Tag,
1175    ) -> errors::Result<()> {
1176        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1177
1178        self.transport_api.send_message(msg, routing, application_tag).await?;
1179
1180        Ok(())
1181    }
1182
1183    /// Attempts to aggregate all tickets in the given channel
1184    pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1185        Ok(self.transport_api.aggregate_tickets(channel).await?)
1186    }
1187
1188    /// List all multiaddresses announced by this node
1189    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1190        self.transport_api.local_multiaddresses()
1191    }
1192
1193    /// List all multiaddresses on which the node is listening
1194    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1195        self.transport_api.listening_multiaddresses().await
1196    }
1197
1198    /// List all multiaddresses observed for a PeerId
1199    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1200        self.transport_api.network_observed_multiaddresses(peer).await
1201    }
1202
1203    /// List all multiaddresses announced on-chain for the given node.
1204    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1205        let key = match OffchainPublicKey::try_from(peer) {
1206            Ok(k) => k,
1207            Err(e) => {
1208                error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1209                return vec![];
1210            }
1211        };
1212
1213        match self.db.get_account(None, key).await {
1214            Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1215            Ok(None) => {
1216                error!(%peer, "no information");
1217                vec![]
1218            }
1219            Err(e) => {
1220                error!(%peer, error = %e, "failed to retrieve information");
1221                vec![]
1222            }
1223        }
1224    }
1225
1226    // Network =========
1227
1228    /// Get measured network health
1229    pub async fn network_health(&self) -> Health {
1230        self.transport_api.network_health().await
1231    }
1232
1233    /// List all peers connected to this
1234    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1235        Ok(self.transport_api.network_connected_peers().await?)
1236    }
1237
1238    /// Get all data collected from the network relevant for a PeerId
1239    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1240        Ok(self.transport_api.network_peer_info(peer).await?)
1241    }
1242
1243    /// Get peers connected peers with quality higher than some value
1244    pub async fn all_network_peers(
1245        &self,
1246        minimum_quality: f64,
1247    ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1248        Ok(
1249            futures::stream::iter(self.transport_api.network_connected_peers().await?)
1250                .filter_map(|peer| async move {
1251                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1252                        if info.get_average_quality() >= minimum_quality {
1253                            Some((peer, info))
1254                        } else {
1255                            None
1256                        }
1257                    } else {
1258                        None
1259                    }
1260                })
1261                .filter_map(|(peer_id, info)| async move {
1262                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1263                    Some((address, peer_id, info))
1264                })
1265                .collect::<Vec<_>>()
1266                .await,
1267        )
1268    }
1269
1270    // Ticket ========
1271    /// Get all tickets in a channel specified by Hash
1272    pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1273        Ok(self.transport_api.tickets_in_channel(channel).await?)
1274    }
1275
1276    /// Get all tickets
1277    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1278        Ok(self.transport_api.all_tickets().await?)
1279    }
1280
1281    /// Get statistics for all tickets
1282    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1283        Ok(self.transport_api.ticket_statistics().await?)
1284    }
1285
1286    /// Reset the ticket metrics to zero
1287    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1288        Ok(self.db.reset_ticket_statistics().await?)
1289    }
1290
1291    // DB ============
1292    pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
1293        &self.db
1294    }
1295
1296    // Chain =========
1297    pub fn me_onchain(&self) -> Address {
1298        self.hopr_chain_api.me_onchain()
1299    }
1300
1301    /// Get ticket price
1302    pub async fn get_ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
1303        Ok(self.hopr_chain_api.ticket_price().await?)
1304    }
1305
1306    /// Get minimum incoming ticket winning probability
1307    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1308        Ok(self
1309            .db
1310            .get_indexer_data(None)
1311            .await?
1312            .minimum_incoming_ticket_winning_prob)
1313    }
1314
1315    /// List of all accounts announced on the chain
1316    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1317        Ok(self.hopr_chain_api.accounts_announced_on_chain().await?)
1318    }
1319
1320    /// Get the channel entry from Hash.
1321    /// @returns the channel entry of those two nodes
1322    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1323        Ok(self.db.get_channel_by_id(None, channel_id).await?)
1324    }
1325
1326    /// Get the channel entry between source and destination node.
1327    /// @param src Address
1328    /// @param dest Address
1329    /// @returns the channel entry of those two nodes
1330    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1331        Ok(self.hopr_chain_api.channel(src, dest).await?)
1332    }
1333
1334    /// List all channels open from a specified Address
1335    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1336        Ok(self.hopr_chain_api.channels_from(src).await?)
1337    }
1338
1339    /// List all channels open to a specified address
1340    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1341        Ok(self.hopr_chain_api.channels_to(dest).await?)
1342    }
1343
1344    /// List all channels
1345    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1346        Ok(self.hopr_chain_api.all_channels().await?)
1347    }
1348
1349    /// List all corrupted channels
1350    pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
1351        Ok(self.hopr_chain_api.corrupted_channels().await?)
1352    }
1353
1354    /// Current safe allowance balance
1355    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1356        Ok(self.hopr_chain_api.safe_allowance().await?)
1357    }
1358
1359    /// Withdraw on-chain assets to a given address
1360    /// @param recipient the account where the assets should be transferred to
1361    /// @param amount how many tokens to be transferred
1362    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<Hash> {
1363        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1364
1365        let awaiter = self.hopr_chain_api.actions_ref().withdraw(recipient, amount).await?;
1366
1367        Ok(awaiter.await?.tx_hash)
1368    }
1369
1370    /// Withdraw on-chain native assets to a given address
1371    /// @param recipient the account where the assets should be transferred to
1372    /// @param amount how many tokens to be transferred
1373    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<Hash> {
1374        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1375
1376        let awaiter = self
1377            .hopr_chain_api
1378            .actions_ref()
1379            .withdraw_native(recipient, amount)
1380            .await?;
1381
1382        Ok(awaiter.await?.tx_hash)
1383    }
1384
1385    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1386        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1387
1388        let awaiter = self
1389            .hopr_chain_api
1390            .actions_ref()
1391            .open_channel(*destination, amount)
1392            .await?;
1393
1394        let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1395        Ok(awaiter.await.map(|confirm| OpenChannelResult {
1396            tx_hash: confirm.tx_hash,
1397            channel_id,
1398        })?)
1399    }
1400
1401    pub async fn fund_channel(&self, channel_id: &Hash, amount: HoprBalance) -> errors::Result<Hash> {
1402        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1403
1404        let awaiter = self
1405            .hopr_chain_api
1406            .actions_ref()
1407            .fund_channel(*channel_id, amount)
1408            .await?;
1409
1410        Ok(awaiter.await?.tx_hash)
1411    }
1412
1413    pub async fn close_channel(
1414        &self,
1415        counterparty: &Address,
1416        direction: ChannelDirection,
1417        redeem_before_close: bool,
1418    ) -> errors::Result<CloseChannelResult> {
1419        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1420
1421        let confirmation = self
1422            .hopr_chain_api
1423            .actions_ref()
1424            .close_channel(*counterparty, direction, redeem_before_close)
1425            .await?
1426            .await?;
1427
1428        match confirmation
1429            .event
1430            .expect("channel close action confirmation must have associated chain event")
1431        {
1432            ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1433                tx_hash: confirmation.tx_hash,
1434                status: c.status, // copy the information about closure time
1435            }),
1436            ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1437                tx_hash: confirmation.tx_hash,
1438                status: ChannelStatus::Closed,
1439            }),
1440            _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1441        }
1442    }
1443
1444    pub async fn close_channel_by_id(
1445        &self,
1446        channel_id: Hash,
1447        redeem_before_close: bool,
1448    ) -> errors::Result<CloseChannelResult> {
1449        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1450
1451        match self.channel_from_hash(&channel_id).await? {
1452            Some(channel) => match channel.orientation(&self.me_onchain()) {
1453                Some((direction, counterparty)) => {
1454                    self.close_channel(&counterparty, direction, redeem_before_close).await
1455                }
1456                None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1457                    "cannot close channel that is not own".into(),
1458                ))),
1459            },
1460            None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1461        }
1462    }
1463
1464    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1465        Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1466    }
1467
1468    pub async fn redeem_all_tickets(&self, only_aggregated: bool) -> errors::Result<()> {
1469        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1470
1471        // We do not await the on-chain confirmation
1472        self.hopr_chain_api
1473            .actions_ref()
1474            .redeem_all_tickets(only_aggregated)
1475            .await?;
1476
1477        Ok(())
1478    }
1479
1480    pub async fn redeem_tickets_with_counterparty(
1481        &self,
1482        counterparty: &Address,
1483        only_aggregated: bool,
1484    ) -> errors::Result<()> {
1485        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1486
1487        // We do not await the on-chain confirmation
1488        let _ = self
1489            .hopr_chain_api
1490            .actions_ref()
1491            .redeem_tickets_with_counterparty(counterparty, only_aggregated)
1492            .await?;
1493
1494        Ok(())
1495    }
1496
1497    pub async fn redeem_tickets_in_channel(&self, channel_id: &Hash, only_aggregated: bool) -> errors::Result<usize> {
1498        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1499
1500        let channel = self.db.get_channel_by_id(None, channel_id).await?;
1501        let mut redeem_count = 0;
1502
1503        if let Some(channel) = channel {
1504            if channel.destination == self.hopr_chain_api.me_onchain() {
1505                // We do not await the on-chain confirmation
1506                redeem_count = self
1507                    .hopr_chain_api
1508                    .actions_ref()
1509                    .redeem_tickets_in_channel(&channel, only_aggregated)
1510                    .await?
1511                    .len();
1512            }
1513        }
1514
1515        Ok(redeem_count)
1516    }
1517
1518    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1519        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1520
1521        // We do not await the on-chain confirmation
1522        #[allow(clippy::let_underscore_future)]
1523        let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1524
1525        Ok(())
1526    }
1527
1528    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1529        let pk = hopr_transport::OffchainPublicKey::try_from(peer_id)?;
1530        Ok(self.db.resolve_chain_key(&pk).await?)
1531    }
1532
1533    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1534        Ok(self
1535            .db
1536            .resolve_packet_key(address)
1537            .await
1538            .map(|pk| pk.map(|v| v.into()))?)
1539    }
1540
1541    pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1542        self.channel_graph.read_arc().await.as_dot(cfg)
1543    }
1544
1545    pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1546        let cg = self.channel_graph.read_arc().await;
1547        serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1548    }
1549}