hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
7//! a complete and fully featured HOPR node stripped from top level functionality
8//! such as the REST API, key management...
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it;
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Configuration-related public types
17pub mod config;
18/// Various public constants.
19pub mod constants;
20/// Lists all errors thrown from this library.
21pub mod errors;
22
23use std::{
24    collections::HashMap,
25    fmt::{Display, Formatter},
26    ops::Deref,
27    path::PathBuf,
28    sync::{Arc, atomic::Ordering},
29    time::Duration,
30};
31
32use async_lock::RwLock;
33use errors::{HoprLibError, HoprStatusError};
34use futures::{
35    SinkExt, Stream, StreamExt,
36    channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded},
37    future::AbortHandle,
38    stream::{self},
39};
40use hopr_async_runtime::prelude::{sleep, spawn};
41pub use hopr_chain_actions::errors::ChainActionsError;
42use hopr_chain_actions::{
43    action_state::{ActionState, IndexerActionTracker},
44    channels::ChannelActions,
45    node::NodeActions,
46    redeem::TicketRedeemActions,
47};
48pub use hopr_chain_api::config::{
49    Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
50};
51use hopr_chain_api::{
52    HoprChain, HoprChainProcess, SignificantChainEvent, can_register_with_safe, config::ChainNetworkConfig,
53    errors::HoprChainError, wait_for_funds,
54};
55use hopr_chain_rpc::HoprRpcOperations;
56use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
57use hopr_crypto_types::prelude::OffchainPublicKey;
58use hopr_db_api::logs::HoprDbLogOperations;
59use hopr_db_sql::{
60    HoprDbAllOperations,
61    accounts::HoprDbAccountOperations,
62    api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
63    channels::HoprDbChannelOperations,
64    db::{HoprDb, HoprDbConfig},
65    info::{HoprDbInfoOperations, IndexerStateInfo},
66    prelude::{ChainOrPacketKey::ChainKey, HoprDbPeersOperations},
67    registry::HoprDbRegistryOperations,
68};
69pub use hopr_internal_types::prelude::*;
70pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
71pub use hopr_path::channel_graph::GraphExportConfig;
72use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
73use hopr_platform::file::native::{join, remove_dir_all};
74pub use hopr_primitive_types::prelude::*;
75pub use hopr_strategy::Strategy;
76use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
77#[cfg(feature = "runtime-tokio")]
78pub use hopr_transport::transfer_session;
79pub use hopr_transport::{
80    ApplicationData, HalfKeyChallenge, Health, IncomingSession as HoprIncomingSession, Keypair, Multiaddr,
81    OffchainKeypair as HoprOffchainKeypair, PeerId, PingQueryReplier, ProbeError, SESSION_PAYLOAD_SIZE, SendMsg,
82    ServiceId, Session as HoprSession, SessionCapabilities, SessionCapability, SessionClientConfig,
83    SessionId as HoprSessionId, SessionTarget, SurbBalancerConfig, Tag, TicketStatistics,
84    config::{HostConfig, HostType, looks_like_domain},
85    errors::{HoprTransportError, NetworkingError, ProtocolError},
86};
87use hopr_transport::{
88    ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession, OffchainKeypair,
89    PeerDiscovery, PeerStatus, execute_on_tick,
90};
91use tracing::{debug, error, info, trace, warn};
92#[cfg(all(feature = "prometheus", not(test)))]
93use {
94    hopr_metrics::metrics::{MultiGauge, SimpleGauge},
95    hopr_platform::time::native::current_time,
96    std::str::FromStr,
97};
98
99use crate::{
100    config::SafeModule,
101    constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE},
102};
103
104#[cfg(all(feature = "prometheus", not(test)))]
105lazy_static::lazy_static! {
106    static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
107        "hopr_start_time",
108        "The unix timestamp in seconds at which the process was started"
109    ).unwrap();
110    static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
111        "hopr_lib_version",
112        "Executed version of hopr-lib",
113        &["version"]
114    ).unwrap();
115    static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
116        "hopr_node_addresses",
117        "Node on-chain and off-chain addresses",
118        &["peerid", "address", "safe_address", "module_address"]
119    ).unwrap();
120}
121
122pub use async_trait::async_trait;
123
124/// Interface representing the HOPR server behavior for each incoming session instance
125/// supplied as an argument.
126#[cfg(feature = "session-server")]
127#[async_trait::async_trait]
128pub trait HoprSessionReactor {
129    /// Fully process a single HOPR session
130    async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
131}
132
133/// An enum representing the current state of the HOPR node
134#[atomic_enum::atomic_enum]
135#[derive(PartialEq, Eq)]
136pub enum HoprState {
137    Uninitialized = 0,
138    Initializing = 1,
139    Indexing = 2,
140    Starting = 3,
141    Running = 4,
142}
143
144impl Display for HoprState {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        write!(f, "{:?}", self)
147    }
148}
149
150pub struct OpenChannelResult {
151    pub tx_hash: Hash,
152    pub channel_id: Hash,
153}
154
155pub struct CloseChannelResult {
156    pub tx_hash: Hash,
157    pub status: ChannelStatus,
158}
159
160/// Enum differentiator for loop component futures.
161///
162/// Used to differentiate the type of the future that exits the loop premateruly
163/// by tagging it as an enum.
164#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
165pub enum HoprLibProcesses {
166    #[strum(to_string = "transport: {0}")]
167    Transport(HoprTransportProcess),
168    #[cfg(feature = "session-server")]
169    #[strum(to_string = "session server providing the exit node session stream functionality")]
170    SessionServer,
171    #[strum(to_string = "tick wake up the strategies to perform an action")]
172    StrategyTick,
173    #[strum(to_string = "initial indexing operation into the DB")]
174    Indexing,
175    #[strum(to_string = "processing of indexed operations in internal components")]
176    IndexReflection,
177    #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
178    OutgoingOnchainActionQueue,
179    #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
180    TicketIndexFlush,
181    #[strum(to_string = "on received ack ticket trigger")]
182    OnReceivedAcknowledgement,
183}
184
185impl HoprLibProcesses {
186    /// Identifies whether a loop is allowed to finish or should
187    /// run indefinitely.
188    pub fn can_finish(&self) -> bool {
189        matches!(self, HoprLibProcesses::Indexing)
190    }
191}
192
193impl From<HoprTransportProcess> for HoprLibProcesses {
194    fn from(value: HoprTransportProcess) -> Self {
195        HoprLibProcesses::Transport(value)
196    }
197}
198
199/// Creates a pipeline that chains the indexer-generated data, processes them into
200/// the individual components, and creates a filtered output stream that is fed into
201/// the transport layer swarm.
202///
203/// * `event_stream` - represents the events generated by the indexer. If the Indexer is not synced, it will not
204///   generate any events.
205/// * `preloading_event_stream` - a stream used by the components to preload the data from the objects (db, channel
206///   graph...)
207#[allow(clippy::too_many_arguments)]
208pub async fn chain_events_to_transport_events<StreamIn, Db>(
209    event_stream: StreamIn,
210    me_onchain: Address,
211    db: Db,
212    multi_strategy: Arc<MultiStrategy>,
213    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
214    indexer_action_tracker: Arc<IndexerActionTracker>,
215) -> impl Stream<Item = PeerDiscovery> + Send + 'static
216where
217    Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
218    StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
219{
220    Box::pin(event_stream.filter_map(move |event| {
221        let db = db.clone();
222        let multi_strategy = multi_strategy.clone();
223        let channel_graph = channel_graph.clone();
224        let indexer_action_tracker = indexer_action_tracker.clone();
225
226        async move {
227            let resolved = indexer_action_tracker.match_and_resolve(&event).await;
228            if resolved.is_empty() {
229                trace!(%event, "No indexer expectations resolved for the event");
230            } else {
231                debug!(count = resolved.len(), %event, "resolved indexer expectations");
232            }
233
234            match event.event_type {
235                ChainEventType::Announcement{peer, address, multiaddresses} => {
236                    let allowed = db
237                        .is_allowed_in_network_registry(None, &address)
238                        .await
239                        .unwrap_or(false);
240
241                    Some(vec![PeerDiscovery::Announce(peer, multiaddresses), if allowed {
242                        PeerDiscovery::Allow(peer)
243                    } else {
244                        PeerDiscovery::Ban(peer)
245                    }])
246                }
247                ChainEventType::ChannelOpened(channel) |
248                ChainEventType::ChannelClosureInitiated(channel) |
249                ChainEventType::ChannelClosed(channel) |
250                ChainEventType::ChannelBalanceIncreased(channel, _) | // needed ?
251                ChainEventType::ChannelBalanceDecreased(channel, _) | // needed ?
252                ChainEventType::TicketRedeemed(channel, _) => {   // needed ?
253                    let maybe_direction = channel.direction(&me_onchain);
254
255                    let change = channel_graph
256                        .write_arc()
257                        .await
258                        .update_channel(channel);
259
260                    // Check if this is our own channel
261                    if let Some(own_channel_direction) = maybe_direction {
262                        if let Some(change_set) = change {
263                            for channel_change in change_set {
264                                let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
265                                    &*multi_strategy,
266                                    &channel,
267                                    own_channel_direction,
268                                    channel_change,
269                                )
270                                .await;
271                            }
272                        } else if channel.status == ChannelStatus::Open {
273                            // Emit Opening event if the channel did not exist before in the graph
274                            let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
275                                &*multi_strategy,
276                                &channel,
277                                own_channel_direction,
278                                ChannelChange::Status {
279                                    left: ChannelStatus::Closed,
280                                    right: ChannelStatus::Open,
281                                },
282                            )
283                            .await;
284                        }
285                    }
286
287                    None
288                }
289                ChainEventType::NetworkRegistryUpdate(address, allowed) => {
290                    let packet_key = db.translate_key(None, address).await;
291                    match packet_key {
292                        Ok(pk) => {
293                            if let Some(pk) = pk {
294                                let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
295
296                                if let Ok(offchain_key) = offchain_key {
297                                    let peer_id = offchain_key.into();
298
299                                    let res = match allowed {
300                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
301                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
302                                    };
303
304                                    Some(vec![res])
305                                } else {
306                                    error!("Failed to unwrap as offchain key at this point");
307                                    None
308                                }
309                            } else {
310                                None
311                            }
312                        }
313                        Err(error) => {
314                            error!(%error, "on_network_registry_node_allowed failed");
315                            None
316                        },
317                    }
318                }
319                ChainEventType::NodeSafeRegistered(safe_address) =>  {
320                    info!(%safe_address, "Node safe registered");
321                    None
322                }
323            }
324        }
325    })
326    .flat_map(stream::iter)
327)
328}
329
330/// Represents the socket behavior of the hopr-lib spawned [`Hopr`] object.
331///
332/// Provides a read and write stream for Hopr socket recognized data formats.
333pub struct HoprSocket {
334    rx: UnboundedReceiver<ApplicationData>,
335    tx: UnboundedSender<ApplicationData>,
336}
337
338impl Default for HoprSocket {
339    fn default() -> Self {
340        let (tx, rx) = unbounded::<ApplicationData>();
341        Self { rx, tx }
342    }
343}
344
345impl HoprSocket {
346    pub fn new() -> Self {
347        Self::default()
348    }
349
350    pub fn reader(self) -> UnboundedReceiver<ApplicationData> {
351        self.rx
352    }
353
354    pub fn writer(&self) -> UnboundedSender<ApplicationData> {
355        self.tx.clone()
356    }
357}
358
359/// HOPR main object providing the entire HOPR node functionality
360///
361/// Instantiating this object creates all processes and objects necessary for
362/// running the HOPR node. Once created, the node can be started using the
363/// `run()` method.
364///
365/// Externally offered API should be sufficient to perform all necessary tasks
366/// with the HOPR node manually, but it is advised to create such a configuration
367/// that manual interaction is unnecessary.
368///
369/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
370pub struct Hopr {
371    me: OffchainKeypair,
372    me_chain: ChainKeypair,
373    cfg: config::HoprLibConfig,
374    state: Arc<AtomicHoprState>,
375    transport_api: HoprTransport<HoprDb>,
376    hopr_chain_api: HoprChain<HoprDb>,
377    // objects that could be removed pending architectural cleanup ========
378    db: HoprDb,
379    chain_cfg: ChainNetworkConfig,
380    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
381    multistrategy: Arc<MultiStrategy>,
382    rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
383}
384
385impl Hopr {
386    pub fn new(
387        mut cfg: config::HoprLibConfig,
388        me: &OffchainKeypair,
389        me_onchain: &ChainKeypair,
390    ) -> crate::errors::Result<Self> {
391        let multiaddress: Multiaddr = (&cfg.host).try_into()?;
392
393        let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
394        info!(path = ?db_path, "Initiating DB");
395
396        if cfg.db.force_initialize {
397            info!("Force cleaning up existing database");
398            remove_dir_all(db_path.as_path()).map_err(|e| {
399                HoprLibError::GeneralError(format!(
400                    "Failed to remove the existing DB directory at '{db_path:?}': {e}"
401                ))
402            })?;
403            cfg.db.initialize = true
404        }
405
406        // create DB dir if it does not exist
407        if let Some(parent_dir_path) = db_path.as_path().parent() {
408            if !parent_dir_path.is_dir() {
409                std::fs::create_dir_all(parent_dir_path).map_err(|e| {
410                    HoprLibError::GeneralError(format!(
411                        "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
412                    ))
413                })?
414            }
415        }
416
417        let db_cfg = HoprDbConfig {
418            create_if_missing: cfg.db.initialize,
419            force_create: cfg.db.force_initialize,
420            log_slow_queries: std::time::Duration::from_millis(150),
421        };
422        let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
423
424        if let Some(provider) = &cfg.chain.provider {
425            info!(provider, "Creating chain components using the custom provider");
426        } else {
427            info!("Creating chain components using the default provider");
428        }
429        let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
430            &cfg.chain.network,
431            crate::constants::APP_VERSION_COERCED,
432            cfg.chain.provider.as_deref(),
433            cfg.chain.max_rpc_requests_per_sec,
434            &mut cfg.chain.protocols,
435        )
436        .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
437
438        let contract_addresses = ContractAddresses::from(&resolved_environment);
439        info!(
440            myself = me_onchain.public().to_hex(),
441            contract_addresses = ?contract_addresses,
442            "Resolved contract addresses",
443        );
444
445        let my_multiaddresses = vec![multiaddress];
446
447        let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
448
449        let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
450            me_onchain.public().to_address(),
451            ChannelGraphConfig::default(),
452        )));
453
454        let hopr_transport_api = HoprTransport::new(
455            me,
456            me_onchain,
457            HoprTransportConfig {
458                transport: cfg.transport.clone(),
459                network: cfg.network_options.clone(),
460                protocol: cfg.protocol,
461                probe: cfg.probe,
462                session: cfg.session,
463            },
464            db.clone(),
465            channel_graph.clone(),
466            my_multiaddresses,
467        );
468
469        let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
470            me_onchain.clone(),
471            db.clone(),
472            resolved_environment.clone(),
473            cfg.safe_module.module_address,
474            ContractAddresses {
475                announcements: resolved_environment.announcements,
476                channels: resolved_environment.channels,
477                token: resolved_environment.token,
478                price_oracle: resolved_environment.ticket_price_oracle,
479                win_prob_oracle: resolved_environment.winning_probability_oracle,
480                network_registry: resolved_environment.network_registry,
481                network_registry_proxy: resolved_environment.network_registry_proxy,
482                stake_factory: resolved_environment.node_stake_v2_factory,
483                safe_registry: resolved_environment.node_safe_registry,
484                module_implementation: resolved_environment.module_implementation,
485            },
486            cfg.safe_module.safe_address,
487            hopr_chain_indexer::IndexerConfig {
488                start_block_number: resolved_environment.channel_contract_deploy_block as u64,
489                fast_sync: cfg.chain.fast_sync,
490            },
491            tx_indexer_events,
492        )?;
493
494        let multi_strategy = Arc::new(MultiStrategy::new(
495            cfg.strategy.clone(),
496            db.clone(),
497            hopr_hopr_chain_api.actions_ref().clone(),
498            hopr_transport_api.ticket_aggregator(),
499        ));
500        debug!(
501            strategies = tracing::field::debug(&multi_strategy),
502            "Initialized strategies"
503        );
504
505        #[cfg(all(feature = "prometheus", not(test)))]
506        {
507            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
508            METRIC_HOPR_LIB_VERSION.set(
509                &[const_format::formatcp!("{}", constants::APP_VERSION)],
510                f64::from_str(const_format::formatcp!(
511                    "{}.{}",
512                    env!("CARGO_PKG_VERSION_MAJOR"),
513                    env!("CARGO_PKG_VERSION_MINOR")
514                ))
515                .unwrap_or(0.0),
516            );
517
518            // Calling get_ticket_statistics will initialize the respective metrics on tickets
519            if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
520                error!(error = %e, "Failed to initialize ticket statistics metrics");
521            }
522        }
523
524        Ok(Self {
525            me: me.clone(),
526            me_chain: me_onchain.clone(),
527            cfg,
528            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
529            transport_api: hopr_transport_api,
530            hopr_chain_api: hopr_hopr_chain_api,
531            db,
532            chain_cfg: resolved_environment,
533            channel_graph,
534            multistrategy: multi_strategy,
535            rx_indexer_significant_events: rx_indexer_events,
536        })
537    }
538
539    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
540        if self.status() == state {
541            Ok(())
542        } else {
543            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
544        }
545    }
546
547    pub fn status(&self) -> HoprState {
548        self.state.load(Ordering::Relaxed)
549    }
550
551    pub fn version_coerced(&self) -> String {
552        String::from(constants::APP_VERSION_COERCED)
553    }
554
555    pub fn version(&self) -> String {
556        String::from(constants::APP_VERSION)
557    }
558
559    pub fn network(&self) -> String {
560        self.cfg.chain.network.clone()
561    }
562
563    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
564        Ok(self.hopr_chain_api.get_balance().await?)
565    }
566
567    pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
568        Ok(self.hopr_chain_api.get_eligibility_status().await?)
569    }
570
571    pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
572        let safe_balance = self
573            .hopr_chain_api
574            .get_safe_balance(self.cfg.safe_module.safe_address)
575            .await?;
576        Ok(safe_balance)
577    }
578
579    pub fn get_safe_config(&self) -> SafeModule {
580        self.cfg.safe_module.clone()
581    }
582
583    pub fn chain_config(&self) -> ChainNetworkConfig {
584        self.chain_cfg.clone()
585    }
586
587    pub fn config(&self) -> &config::HoprLibConfig {
588        &self.cfg
589    }
590
591    pub fn get_provider(&self) -> String {
592        self.cfg
593            .chain
594            .provider
595            .clone()
596            .unwrap_or(self.chain_cfg.chain.default_provider.clone())
597    }
598
599    #[inline]
600    fn is_public(&self) -> bool {
601        self.cfg.chain.announce
602    }
603
604    pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
605        &self,
606        #[cfg(feature = "session-server")] serve_handler: T,
607    ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, AbortHandle>)> {
608        self.error_if_not_in_state(
609            HoprState::Uninitialized,
610            "Cannot start the hopr node multiple times".into(),
611        )?;
612
613        info!(
614            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
615            "Node is not started, please fund this node",
616        );
617
618        let mut processes: HashMap<HoprLibProcesses, AbortHandle> = HashMap::new();
619
620        wait_for_funds(
621            self.me_onchain(),
622            *MIN_NATIVE_BALANCE,
623            Duration::from_secs(200),
624            self.hopr_chain_api.rpc(),
625        )
626        .await?;
627
628        info!("Starting the node...");
629
630        self.state.store(HoprState::Initializing, Ordering::Relaxed);
631
632        let balance: XDaiBalance = self.get_balance().await?;
633        let minimum_balance = *constants::MIN_NATIVE_BALANCE;
634
635        info!(
636            address = %self.hopr_chain_api.me_onchain(),
637            %balance,
638            %minimum_balance,
639            "Node information"
640        );
641
642        if balance.le(&minimum_balance) {
643            return Err(HoprLibError::GeneralError(
644                "Cannot start the node without a sufficiently funded wallet".to_string(),
645            ));
646        }
647
648        // Once we are able to query the chain,
649        // check if the ticket price is configured correctly.
650        let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;
651
652        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
653        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
654            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
655                "configured outgoing ticket price is lower than the network minimum ticket price: \
656                 {configured_ticket_price:?} < {network_min_ticket_price}"
657            ))));
658        }
659
660        // Once we are able to query the chain,
661        // check if the winning probability is configured correctly.
662        let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
663        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
664        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
665            && configured_win_prob
666                .and_then(|c| WinningProbability::try_from(c).ok())
667                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
668        {
669            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
670                "configured outgoing ticket winning probability is lower than the network minimum winning \
671                 probability: {configured_win_prob:?} < {network_min_win_prob}"
672            ))));
673        }
674
675        // set safe and module addresses in the DB
676        self.db
677            .set_safe_info(
678                None,
679                SafeInfo {
680                    safe_address: self.cfg.safe_module.safe_address,
681                    module_address: self.cfg.safe_module.module_address,
682                },
683            )
684            .await?;
685
686        self.state.store(HoprState::Indexing, Ordering::Relaxed);
687
688        let (mut indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
689
690        let indexer_event_pipeline = chain_events_to_transport_events(
691            self.rx_indexer_significant_events.clone(),
692            self.me_onchain(),
693            self.db.clone(),
694            self.multistrategy.clone(),
695            self.channel_graph.clone(),
696            self.hopr_chain_api.action_state(),
697        )
698        .await;
699
700        {
701            // This has to happen before the indexing process starts in order to make sure that the pre-existing data is
702            // properly populated into the transport mechanism before the synced data in the follow up process.
703            info!("Syncing peer announcements and network registry updates from previous runs");
704            let accounts = self.db.get_accounts(None, true).await?;
705            for account in accounts.into_iter() {
706                match account.entry_type {
707                    AccountType::NotAnnounced => {}
708                    AccountType::Announced { multiaddr, .. } => {
709                        indexer_peer_update_tx
710                            .send(PeerDiscovery::Announce(account.public_key.into(), vec![multiaddr]))
711                            .await
712                            .map_err(|e| {
713                                HoprLibError::GeneralError(format!("Failed to send peer discovery announcement: {e}"))
714                            })?;
715
716                        let allow_status = if self
717                            .db
718                            .is_allowed_in_network_registry(None, &account.chain_addr)
719                            .await?
720                        {
721                            PeerDiscovery::Allow(account.public_key.into())
722                        } else {
723                            PeerDiscovery::Ban(account.public_key.into())
724                        };
725
726                        indexer_peer_update_tx.send(allow_status).await.map_err(|e| {
727                            HoprLibError::GeneralError(format!(
728                                "Failed to send peer discovery network registry event: {e}"
729                            ))
730                        })?;
731                    }
732                }
733            }
734        }
735
736        spawn(async move {
737            indexer_event_pipeline
738                .map(Ok)
739                .forward(indexer_peer_update_tx)
740                .await
741                .expect("The index to transport event chain failed");
742        });
743
744        info!("Start the chain process and sync the indexer");
745        for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
746            let nid = match id {
747                HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
748                HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
749            };
750            processes.insert(nid, proc);
751        }
752
753        {
754            // Show onboarding information
755            let my_ethereum_address = self.me_onchain();
756            let my_peer_id = self.me_peer_id();
757            let my_version = crate::constants::APP_VERSION;
758
759            while !self
760                .db
761                .clone()
762                .is_allowed_in_network_registry(None, &my_ethereum_address)
763                .await
764                .unwrap_or(false)
765            {
766                info!(
767                    "Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.",
768                    my_ethereum_address.to_hex()
769                );
770
771                sleep(ONBOARDING_INFORMATION_INTERVAL).await;
772
773                info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
774                info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
775            }
776        }
777
778        // Check Safe-module status:
779        // 1) if the node is already included into the module
780        // 2) if the module is enabled in the safe
781        // 3) if the safe is the owner of the module
782        // if any of the conditions is not met, return error
783        let safe_module_configuration = self
784            .hopr_chain_api
785            .rpc()
786            .check_node_safe_module_status(self.me_onchain())
787            .await
788            .map_err(HoprChainError::Rpc)?;
789
790        if !safe_module_configuration.should_pass() {
791            error!(
792                ?safe_module_configuration,
793                "Something is wrong with the safe module configuration",
794            );
795            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
796                "Safe and module are not configured correctly {:?}",
797                safe_module_configuration,
798            ))));
799        }
800
801        // Possibly register a node-safe pair to NodeSafeRegistry.
802        // Following that, the connector is set to use safe tx variants.
803        if can_register_with_safe(
804            self.me_onchain(),
805            self.cfg.safe_module.safe_address,
806            self.hopr_chain_api.rpc(),
807        )
808        .await?
809        {
810            info!("Registering safe by node");
811
812            if self.me_onchain() == self.cfg.safe_module.safe_address {
813                return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
814            }
815
816            if let Err(e) = self
817                .hopr_chain_api
818                .actions_ref()
819                .register_safe_by_node(self.cfg.safe_module.safe_address)
820                .await?
821                .await
822            {
823                // Intentionally ignoring the errored state
824                error!(error = %e, "Failed to register node with safe")
825            }
826        }
827
828        if self.is_public() {
829            // At this point the node is already registered with Safe, so
830            // we can announce via Safe-compliant TX
831
832            let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
833
834            // The announcement is intentionally not awaited until confirmation
835            match self
836                .hopr_chain_api
837                .actions_ref()
838                .announce(&multiaddresses_to_announce, &self.me)
839                .await
840            {
841                Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
842                Err(ChainActionsError::AlreadyAnnounced) => {
843                    info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
844                }
845                // If the announcement fails, we keep going to prevent the node from retrying
846                // after restart.
847                // Functionality is limited, and users must check the logs for errors.
848                Err(e) => error!(error = %e, "Failed to transmit node announcement"),
849            }
850        }
851
852        {
853            // Sync key ids from indexed Accounts
854
855            // Sync the Channel graph
856            let channel_graph = self.channel_graph.clone();
857            let mut cg = channel_graph.write_arc().await;
858
859            info!("Syncing channels from the previous runs");
860            let mut channel_stream = self
861                .db
862                .stream_active_channels()
863                .await
864                .map_err(hopr_db_sql::api::errors::DbError::from)?;
865
866            while let Some(maybe_channel) = channel_stream.next().await {
867                match maybe_channel {
868                    Ok(channel) => {
869                        cg.update_channel(channel);
870                    }
871                    Err(error) => error!(%error, "Failed to sync channel into the graph"),
872                }
873            }
874
875            // Initialize node latencies and scores in the channel graph:
876            // Sync only those nodes that we know that had a good quality
877            // Other nodes will be repopulated into the channel graph during heartbeat
878            // rounds.
879            info!("Syncing peer qualities from the previous runs");
880            let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
881                .map_err(|e| e.to_string())
882                .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
883                .unwrap_or_else(|error| {
884                    warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
885                    constants::DEFAULT_MIN_QUALITY_TO_SYNC
886                });
887
888            let mut peer_stream = self
889                .db
890                .get_network_peers(Default::default(), false)
891                .await?
892                .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
893
894            while let Some(peer) = peer_stream.next().await {
895                if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
896                    // For nodes that had a good quality, we assign a perfect score
897                    cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
898                } else {
899                    error!(peer = %peer.id.1, "Could not translate peer information");
900                }
901            }
902
903            info!(
904                channels = cg.count_channels(),
905                nodes = cg.count_nodes(),
906                "Channel graph sync complete"
907            );
908        }
909
910        let socket = HoprSocket::new();
911        let transport_output_tx = socket.writer();
912
913        // notifier on acknowledged ticket reception
914        let multi_strategy_ack_ticket = self.multistrategy.clone();
915        let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
916        self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
917
918        processes.insert(
919            HoprLibProcesses::OnReceivedAcknowledgement,
920            hopr_async_runtime::spawn_as_abortable(async move {
921                while let Some(ack) = on_ack_tkt_rx.next().await {
922                    if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
923                        &*multi_strategy_ack_ticket,
924                        &ack,
925                    )
926                    .await
927                    {
928                        error!(%error, "Failed to process acknowledged winning ticket with the strategy");
929                    }
930                }
931            }),
932        );
933
934        let (session_tx, _session_rx) = unbounded::<IncomingSession>();
935
936        #[cfg(feature = "session-server")]
937        {
938            processes.insert(
939                HoprLibProcesses::SessionServer,
940                hopr_async_runtime::spawn_as_abortable(_session_rx.for_each_concurrent(None, move |session| {
941                    let serve_handler = serve_handler.clone();
942                    async move {
943                        let session_id = *session.session.id();
944                        match serve_handler.process(session).await {
945                            Ok(_) => debug!(
946                                session_id = ?session_id,
947                                "Client session processed successfully"
948                            ),
949                            Err(e) => error!(
950                                session_id = ?session_id,
951                                error = %e,
952                                "Client session processing failed"
953                            ),
954                        }
955                    }
956                })),
957            );
958        }
959
960        for (id, proc) in self
961            .transport_api
962            .run(
963                &self.me_chain,
964                join(&[&self.cfg.db.data, "tbf"])
965                    .map_err(|e| HoprLibError::GeneralError(format!("Failed to construct the bloom filter: {e}")))?,
966                transport_output_tx,
967                indexer_peer_update_rx,
968                session_tx,
969            )
970            .await?
971            .into_iter()
972        {
973            processes.insert(id.into(), proc);
974        }
975
976        let db_clone = self.db.clone();
977        processes.insert(
978            HoprLibProcesses::TicketIndexFlush,
979            hopr_async_runtime::spawn_as_abortable(Box::pin(execute_on_tick(
980                Duration::from_secs(5),
981                move || {
982                    let db_clone = db_clone.clone();
983                    async move {
984                        match db_clone.persist_outgoing_ticket_indices().await {
985                            Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
986                            Err(e) => error!(error = %e, "Failed to flush ticket indices"),
987                        }
988                    }
989                },
990                "flush the states of outgoing ticket indices".into(),
991            ))),
992        );
993
994        // NOTE: after the chain is synced, we can reset tickets which are considered
995        // redeemed but on-chain state does not align with that. This implies there was a problem
996        // right when the transaction was sent on-chain. In such cases, we simply let it retry and
997        // handle errors appropriately.
998        if let Err(e) = self.db.fix_channels_next_ticket_state().await {
999            error!(error = %e, "failed to fix channels ticket states");
1000        }
1001
1002        // NOTE: strategy ticks must start after the chain is synced, otherwise
1003        // the strategy would react to historical data and drain through the native
1004        // balance on chain operations not relevant for the present network state
1005        let multi_strategy = self.multistrategy.clone();
1006        let strategy_interval = self.cfg.strategy.execution_interval;
1007        processes.insert(
1008            HoprLibProcesses::StrategyTick,
1009            hopr_async_runtime::spawn_as_abortable(async move {
1010                execute_on_tick(
1011                    Duration::from_secs(strategy_interval),
1012                    move || {
1013                        let multi_strategy = multi_strategy.clone();
1014
1015                        async move {
1016                            trace!(state = "started", "strategy tick");
1017                            let _ = multi_strategy.on_tick().await;
1018                            trace!(state = "finished", "strategy tick");
1019                        }
1020                    },
1021                    "run strategies".into(),
1022                )
1023                .await;
1024            }),
1025        );
1026
1027        self.state.store(HoprState::Running, Ordering::Relaxed);
1028
1029        info!(
1030            id = %self.me_peer_id(),
1031            version = constants::APP_VERSION,
1032            "NODE STARTED AND RUNNING"
1033        );
1034
1035        #[cfg(all(feature = "prometheus", not(test)))]
1036        METRIC_HOPR_NODE_INFO.set(
1037            &[
1038                &self.me.public().to_peerid_str(),
1039                &self.me_onchain().to_string(),
1040                &self.cfg.safe_module.safe_address.to_string(),
1041                &self.cfg.safe_module.module_address.to_string(),
1042            ],
1043            1.0,
1044        );
1045
1046        Ok((socket, processes))
1047    }
1048
1049    // p2p transport =========
1050    /// Own PeerId used in the libp2p transport layer
1051    pub fn me_peer_id(&self) -> PeerId {
1052        (*self.me.public()).into()
1053    }
1054
1055    /// Get the list of all announced public nodes in the network
1056    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1057        Ok(self.transport_api.get_public_nodes().await?)
1058    }
1059
1060    /// Returns the most recently indexed log, if any.
1061    pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1062        let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1063
1064        match self.db.get_last_checksummed_log().await? {
1065            Some(log) => {
1066                let checksum = match log.checksum {
1067                    Some(checksum) => Hash::from_hex(checksum.as_str())?,
1068                    None => Hash::default(),
1069                };
1070                Ok(IndexerStateInfo {
1071                    latest_log_block_number: log.block_number as u32,
1072                    latest_log_checksum: checksum,
1073                    ..indexer_state_info
1074                })
1075            }
1076            None => Ok(indexer_state_info),
1077        }
1078    }
1079
1080    /// Test whether the peer with PeerId is allowed to access the network
1081    pub async fn is_allowed_to_access_network(
1082        &self,
1083        address_like: either::Either<&PeerId, Address>,
1084    ) -> errors::Result<bool> {
1085        Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1086    }
1087
1088    /// Ping another node in the network based on the PeerId
1089    ///
1090    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
1091    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1092        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1093
1094        Ok(self.transport_api.ping(peer).await?)
1095    }
1096
1097    /// Create a client session connection returning a session object that implements
1098    /// [`AsyncRead`] and [`AsyncWrite`] and can bu used as a read/write binary session.
1099    #[cfg(feature = "session-client")]
1100    pub async fn connect_to(
1101        &self,
1102        destination: Address,
1103        target: SessionTarget,
1104        cfg: SessionClientConfig,
1105    ) -> errors::Result<HoprSession> {
1106        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1107
1108        let backoff = backon::ConstantBuilder::default()
1109            .with_max_times(self.cfg.session.establish_max_retries as usize)
1110            .with_delay(self.cfg.session.establish_retry_timeout)
1111            .with_jitter();
1112
1113        struct Sleeper;
1114        impl backon::Sleeper for Sleeper {
1115            type Sleep = futures_timer::Delay;
1116
1117            fn sleep(&self, dur: Duration) -> Self::Sleep {
1118                futures_timer::Delay::new(dur)
1119            }
1120        }
1121
1122        use backon::Retryable;
1123
1124        Ok((|| {
1125            let cfg = cfg.clone();
1126            let target = target.clone();
1127            async { self.transport_api.new_session(destination, target, cfg).await }
1128        })
1129        .retry(backoff)
1130        .sleep(Sleeper)
1131        .await?)
1132    }
1133
1134    /// Sends keep-alive to the given [`HoprSessionId`], making sure the session is not
1135    /// closed due to inactivity.
1136    #[cfg(feature = "session-client")]
1137    pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
1138        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1139        Ok(self.transport_api.probe_session(id).await?)
1140    }
1141
1142    /// Send a message to another peer in the network
1143    ///
1144    /// @param msg message to send
1145    /// @param destination PeerId of the destination
1146    /// @param options optional configuration of the message path using hops and intermediatePath
1147    /// @param applicationTag optional tag identifying the sending application
1148    /// @returns ack challenge
1149    #[tracing::instrument(level = "debug", skip(self, msg))]
1150    pub async fn send_message(
1151        &self,
1152        msg: Box<[u8]>,
1153        routing: DestinationRouting,
1154        application_tag: Tag,
1155    ) -> errors::Result<()> {
1156        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1157
1158        self.transport_api.send_message(msg, routing, application_tag).await?;
1159
1160        Ok(())
1161    }
1162
1163    /// Attempts to aggregate all tickets in the given channel
1164    pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1165        Ok(self.transport_api.aggregate_tickets(channel).await?)
1166    }
1167
1168    /// List all multiaddresses announced by this node
1169    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1170        self.transport_api.local_multiaddresses()
1171    }
1172
1173    /// List all multiaddresses on which the node is listening
1174    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1175        self.transport_api.listening_multiaddresses().await
1176    }
1177
1178    /// List all multiaddresses observed for a PeerId
1179    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1180        self.transport_api.network_observed_multiaddresses(peer).await
1181    }
1182
1183    /// List all multiaddresses announced on-chain for the given node.
1184    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1185        let key = match OffchainPublicKey::try_from(peer) {
1186            Ok(k) => k,
1187            Err(e) => {
1188                error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1189                return vec![];
1190            }
1191        };
1192
1193        match self.db.get_account(None, key).await {
1194            Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1195            Ok(None) => {
1196                error!(%peer, "no information");
1197                vec![]
1198            }
1199            Err(e) => {
1200                error!(%peer, error = %e, "failed to retrieve information");
1201                vec![]
1202            }
1203        }
1204    }
1205
1206    // Network =========
1207
1208    /// Get measured network health
1209    pub async fn network_health(&self) -> Health {
1210        self.transport_api.network_health().await
1211    }
1212
1213    /// List all peers connected to this
1214    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1215        Ok(self.transport_api.network_connected_peers().await?)
1216    }
1217
1218    /// Get all data collected from the network relevant for a PeerId
1219    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1220        Ok(self.transport_api.network_peer_info(peer).await?)
1221    }
1222
1223    /// Get peers connected peers with quality higher than some value
1224    pub async fn all_network_peers(
1225        &self,
1226        minimum_quality: f64,
1227    ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1228        Ok(
1229            futures::stream::iter(self.transport_api.network_connected_peers().await?)
1230                .filter_map(|peer| async move {
1231                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1232                        if info.get_average_quality() >= minimum_quality {
1233                            Some((peer, info))
1234                        } else {
1235                            None
1236                        }
1237                    } else {
1238                        None
1239                    }
1240                })
1241                .filter_map(|(peer_id, info)| async move {
1242                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1243                    Some((address, peer_id, info))
1244                })
1245                .collect::<Vec<_>>()
1246                .await,
1247        )
1248    }
1249
1250    // Ticket ========
1251    /// Get all tickets in a channel specified by Hash
1252    pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1253        Ok(self.transport_api.tickets_in_channel(channel).await?)
1254    }
1255
1256    /// Get all tickets
1257    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1258        Ok(self.transport_api.all_tickets().await?)
1259    }
1260
1261    /// Get statistics for all tickets
1262    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1263        Ok(self.transport_api.ticket_statistics().await?)
1264    }
1265
1266    /// Reset the ticket metrics to zero
1267    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1268        Ok(self.db.reset_ticket_statistics().await?)
1269    }
1270
1271    // DB ============
1272    pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
1273        &self.db
1274    }
1275
1276    // Chain =========
1277    pub fn me_onchain(&self) -> Address {
1278        self.hopr_chain_api.me_onchain()
1279    }
1280
1281    /// Get ticket price
1282    pub async fn get_ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
1283        Ok(self.hopr_chain_api.ticket_price().await?)
1284    }
1285
1286    /// Get minimum incoming ticket winning probability
1287    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1288        Ok(self
1289            .db
1290            .get_indexer_data(None)
1291            .await?
1292            .minimum_incoming_ticket_winning_prob)
1293    }
1294
1295    /// List of all accounts announced on the chain
1296    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1297        Ok(self.hopr_chain_api.accounts_announced_on_chain().await?)
1298    }
1299
1300    /// Get the channel entry from Hash.
1301    /// @returns the channel entry of those two nodes
1302    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1303        Ok(self.db.get_channel_by_id(None, channel_id).await?)
1304    }
1305
1306    /// Get the channel entry between source and destination node.
1307    /// @param src Address
1308    /// @param dest Address
1309    /// @returns the channel entry of those two nodes
1310    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1311        Ok(self.hopr_chain_api.channel(src, dest).await?)
1312    }
1313
1314    /// List all channels open from a specified Address
1315    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1316        Ok(self.hopr_chain_api.channels_from(src).await?)
1317    }
1318
1319    /// List all channels open to a specified address
1320    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1321        Ok(self.hopr_chain_api.channels_to(dest).await?)
1322    }
1323
1324    /// List all channels
1325    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1326        Ok(self.hopr_chain_api.all_channels().await?)
1327    }
1328
1329    /// Current safe allowance balance
1330    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1331        Ok(self.hopr_chain_api.safe_allowance().await?)
1332    }
1333
1334    /// Withdraw on-chain assets to a given address
1335    /// @param recipient the account where the assets should be transferred to
1336    /// @param amount how many tokens to be transferred
1337    pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<Hash> {
1338        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1339
1340        Ok(self
1341            .hopr_chain_api
1342            .actions_ref()
1343            .withdraw(recipient, amount)
1344            .await?
1345            .await?
1346            .tx_hash)
1347    }
1348
1349    /// Withdraw on-chain native assets to a given address
1350    /// @param recipient the account where the assets should be transferred to
1351    /// @param amount how many tokens to be transferred
1352    pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<Hash> {
1353        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1354
1355        Ok(self
1356            .hopr_chain_api
1357            .actions_ref()
1358            .withdraw_native(recipient, amount)
1359            .await?
1360            .await?
1361            .tx_hash)
1362    }
1363
1364    pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1365        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1366
1367        let awaiter = self
1368            .hopr_chain_api
1369            .actions_ref()
1370            .open_channel(*destination, amount)
1371            .await?;
1372
1373        let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1374        Ok(awaiter.await.map(|confirm| OpenChannelResult {
1375            tx_hash: confirm.tx_hash,
1376            channel_id,
1377        })?)
1378    }
1379
1380    pub async fn fund_channel(&self, channel_id: &Hash, amount: HoprBalance) -> errors::Result<Hash> {
1381        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1382
1383        Ok(self
1384            .hopr_chain_api
1385            .actions_ref()
1386            .fund_channel(*channel_id, amount)
1387            .await?
1388            .await
1389            .map(|confirm| confirm.tx_hash)?)
1390    }
1391
1392    pub async fn close_channel(
1393        &self,
1394        counterparty: &Address,
1395        direction: ChannelDirection,
1396        redeem_before_close: bool,
1397    ) -> errors::Result<CloseChannelResult> {
1398        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1399
1400        let confirmation = self
1401            .hopr_chain_api
1402            .actions_ref()
1403            .close_channel(*counterparty, direction, redeem_before_close)
1404            .await?
1405            .await?;
1406
1407        match confirmation
1408            .event
1409            .expect("channel close action confirmation must have associated chain event")
1410        {
1411            ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1412                tx_hash: confirmation.tx_hash,
1413                status: c.status, // copy the information about closure time
1414            }),
1415            ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1416                tx_hash: confirmation.tx_hash,
1417                status: ChannelStatus::Closed,
1418            }),
1419            _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1420        }
1421    }
1422
1423    pub async fn close_channel_by_id(
1424        &self,
1425        channel_id: Hash,
1426        redeem_before_close: bool,
1427    ) -> errors::Result<CloseChannelResult> {
1428        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1429
1430        match self.channel_from_hash(&channel_id).await? {
1431            Some(channel) => match channel.orientation(&self.me_onchain()) {
1432                Some((direction, counterparty)) => {
1433                    self.close_channel(&counterparty, direction, redeem_before_close).await
1434                }
1435                None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1436                    "cannot close channel that is not own".into(),
1437                ))),
1438            },
1439            None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1440        }
1441    }
1442
1443    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1444        Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1445    }
1446
1447    pub async fn redeem_all_tickets(&self, only_aggregated: bool) -> errors::Result<()> {
1448        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1449
1450        // We do not await the on-chain confirmation
1451        self.hopr_chain_api
1452            .actions_ref()
1453            .redeem_all_tickets(only_aggregated)
1454            .await?;
1455
1456        Ok(())
1457    }
1458
1459    pub async fn redeem_tickets_with_counterparty(
1460        &self,
1461        counterparty: &Address,
1462        only_aggregated: bool,
1463    ) -> errors::Result<()> {
1464        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1465
1466        // We do not await the on-chain confirmation
1467        let _ = self
1468            .hopr_chain_api
1469            .actions_ref()
1470            .redeem_tickets_with_counterparty(counterparty, only_aggregated)
1471            .await?;
1472
1473        Ok(())
1474    }
1475
1476    pub async fn redeem_tickets_in_channel(&self, channel_id: &Hash, only_aggregated: bool) -> errors::Result<usize> {
1477        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1478
1479        let channel = self.db.get_channel_by_id(None, channel_id).await?;
1480        let mut redeem_count = 0;
1481
1482        if let Some(channel) = channel {
1483            if channel.destination == self.hopr_chain_api.me_onchain() {
1484                // We do not await the on-chain confirmation
1485                redeem_count = self
1486                    .hopr_chain_api
1487                    .actions_ref()
1488                    .redeem_tickets_in_channel(&channel, only_aggregated)
1489                    .await?
1490                    .len();
1491            }
1492        }
1493
1494        Ok(redeem_count)
1495    }
1496
1497    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1498        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1499
1500        // We do not await the on-chain confirmation
1501        #[allow(clippy::let_underscore_future)]
1502        let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1503
1504        Ok(())
1505    }
1506
1507    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1508        let pk = hopr_transport::OffchainPublicKey::try_from(peer_id)?;
1509        Ok(self.db.resolve_chain_key(&pk).await?)
1510    }
1511
1512    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1513        Ok(self
1514            .db
1515            .resolve_packet_key(address)
1516            .await
1517            .map(|pk| pk.map(|v| v.into()))?)
1518    }
1519
1520    pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1521        self.channel_graph.read_arc().await.as_dot(cfg)
1522    }
1523
1524    pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1525        let cg = self.channel_graph.read_arc().await;
1526        serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1527    }
1528}