hopr_lib/
lib.rs

1//! HOPR library creating a unified [`Hopr`] object that can be used on its own,
2//! as well as integrated into other systems and libraries.
3//!
4//! The [`Hopr`] object is standalone, meaning that once it is constructed and run,
5//! it will perform its functionality autonomously. The API it offers serves as a
6//! high-level integration point for other applications and utilities, but offers
//! a complete and fully featured HOPR node stripped of top-level functionality
//! such as the REST API, key management, etc.
9//!
10//! The intended way to use hopr_lib is for a specific tool to be built on top of it,
11//! should the default `hoprd` implementation not be acceptable.
12//!
13//! For most of the practical use cases, the `hoprd` application should be a preferable
14//! choice.
15
16/// Configuration-related public types
17pub mod config;
18/// Various public constants.
19pub mod constants;
20/// Lists all errors thrown from this library.
21pub mod errors;
22
23use async_lock::RwLock;
24use futures::{
25    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
26    Stream, StreamExt,
27};
28use std::ops::Deref;
29use std::{
30    collections::HashMap,
31    fmt::{Display, Formatter},
32    path::PathBuf,
33    sync::{atomic::Ordering, Arc},
34    time::Duration,
35};
36use tracing::{debug, error, info, trace, warn};
37
38use errors::{HoprLibError, HoprStatusError};
39use hopr_async_runtime::prelude::{sleep, spawn, JoinHandle};
40use hopr_chain_actions::{
41    action_state::{ActionState, IndexerActionTracker},
42    channels::ChannelActions,
43    node::NodeActions,
44    redeem::TicketRedeemActions,
45};
46use hopr_chain_api::{
47    can_register_with_safe, config::ChainNetworkConfig, errors::HoprChainError, wait_for_funds, HoprChain,
48    HoprChainProcess, SignificantChainEvent,
49};
50use hopr_chain_rpc::HoprRpcOperations;
51use hopr_chain_types::chain_events::ChainEventType;
52use hopr_chain_types::ContractAddresses;
53use hopr_crypto_types::prelude::OffchainPublicKey;
54use hopr_db_api::logs::HoprDbLogOperations;
55use hopr_db_sql::{
56    accounts::HoprDbAccountOperations,
57    api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
58    channels::HoprDbChannelOperations,
59    db::{HoprDb, HoprDbConfig},
60    info::{HoprDbInfoOperations, IndexerStateInfo},
61    prelude::{ChainOrPacketKey::ChainKey, DbSqlError, HoprDbPeersOperations},
62    registry::HoprDbRegistryOperations,
63    HoprDbAllOperations, HoprDbGeneralModelOperations,
64};
65use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
66use hopr_platform::file::native::{join, remove_dir_all};
67use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
68use hopr_transport::{
69    execute_on_tick, ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession,
70    OffchainKeypair, PeerDiscovery, PeerStatus,
71};
72pub use {
73    hopr_chain_actions::errors::ChainActionsError,
74    hopr_chain_api::config::{
75        Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
76    },
77    hopr_internal_types::prelude::*,
78    hopr_network_types::prelude::{IpProtocol, RoutingOptions},
79    hopr_path::channel_graph::GraphExportConfig,
80    hopr_primitive_types::prelude::*,
81    hopr_strategy::Strategy,
82    hopr_transport::{
83        config::{looks_like_domain, HostConfig, HostType},
84        constants::RESERVED_TAG_UPPER_LIMIT,
85        errors::{HoprTransportError, NetworkingError, ProtocolError},
86        ApplicationData, HalfKeyChallenge, Health, IncomingSession as HoprIncomingSession, Keypair, Multiaddr,
87        OffchainKeypair as HoprOffchainKeypair, PeerId, SendMsg, ServiceId, Session as HoprSession, SessionCapability,
88        SessionClientConfig, SessionId as HoprSessionId, SessionTarget, TicketStatistics, SESSION_USABLE_MTU_SIZE,
89    },
90};
91
92#[cfg(feature = "runtime-tokio")]
93pub use hopr_transport::transfer_session;
94
95use crate::config::SafeModule;
96use crate::constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE};
97
98#[cfg(all(feature = "prometheus", not(test)))]
99use {
100    hopr_metrics::metrics::{MultiGauge, SimpleGauge},
101    hopr_platform::time::native::current_time,
102    std::str::FromStr,
103};
104
// Lazily created metrics of this library; compiled only for non-test builds
// that have the `prometheus` feature enabled.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge registered under the name `hopr_up`.
    static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
        "hopr_up",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();

    /// Multi-gauge registered under the name `hopr_lib_version`, keyed by the `version` label.
    static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();

    /// Multi-gauge registered under the name `hopr_node_addresses`, keyed by the
    /// `peerid`, `address`, `safe_address` and `module_address` labels.
    static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
122
123pub use async_trait::async_trait;
124
/// Server-side handler of incoming HOPR sessions.
///
/// The implementor is handed every incoming session instance as the argument of
/// [`HoprSessionReactor::process`]. The trait only exists when the `session-server`
/// feature is enabled.
#[cfg(feature = "session-server")]
#[async_trait::async_trait]
pub trait HoprSessionReactor {
    /// Fully processes a single incoming HOPR `session`.
    ///
    /// Returns `Ok(())` on success, or an error of this library otherwise.
    async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
}
133
134/// An enum representing the current state of the HOPR node
135#[atomic_enum::atomic_enum]
136#[derive(PartialEq, Eq)]
137pub enum HoprState {
138    Uninitialized = 0,
139    Initializing = 1,
140    Indexing = 2,
141    Starting = 3,
142    Running = 4,
143}
144
145impl Display for HoprState {
146    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
147        write!(f, "{:?}", self)
148    }
149}
150
151pub struct OpenChannelResult {
152    pub tx_hash: Hash,
153    pub channel_id: Hash,
154}
155
156pub struct CloseChannelResult {
157    pub tx_hash: Hash,
158    pub status: ChannelStatus,
159}
160
161/// Enum differentiator for loop component futures.
162///
163/// Used to differentiate the type of the future that exits the loop premateruly
164/// by tagging it as an enum.
165#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
166pub enum HoprLibProcesses {
167    #[strum(to_string = "transport: {0}")]
168    Transport(HoprTransportProcess),
169    #[cfg(feature = "session-server")]
170    #[strum(to_string = "session server providing the exit node session stream functionality")]
171    SessionServer,
172    #[strum(to_string = "tick wake up the strategies to perform an action")]
173    StrategyTick,
174    #[strum(to_string = "initial indexing operation into the DB")]
175    Indexing,
176    #[strum(to_string = "processing of indexed operations in internal components")]
177    IndexReflection,
178    #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
179    OutgoingOnchainActionQueue,
180    #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
181    TicketIndexFlush,
182    #[strum(to_string = "on received ack ticket trigger")]
183    OnReceivedAcknowledgement,
184}
185
186impl HoprLibProcesses {
187    /// Identifies whether a loop is allowed to finish or should
188    /// run indefinitely.
189    pub fn can_finish(&self) -> bool {
190        matches!(self, HoprLibProcesses::Indexing)
191    }
192}
193
194impl From<HoprTransportProcess> for HoprLibProcesses {
195    fn from(value: HoprTransportProcess) -> Self {
196        HoprLibProcesses::Transport(value)
197    }
198}
199
200/// Creates a pipeline that chains the indexer-generated data, processes them into
201/// the individual components, and creates a filtered output stream that is fed into
202/// the transport layer swarm.
203///
204/// * `event_stream` - represents the events generated by the indexer.
205///   If the Indexer is not synced, it will not generate any events.
206/// * `preloading_event_stream` - a stream used by the components to preload the data from the objects (db, channel graph...)
207#[allow(clippy::too_many_arguments)]
208pub async fn chain_events_to_transport_events<StreamIn, Db>(
209    event_stream: StreamIn,
210    me_onchain: Address,
211    db: Db,
212    multi_strategy: Arc<MultiStrategy>,
213    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
214    indexer_action_tracker: Arc<IndexerActionTracker>,
215) -> impl Stream<Item = PeerDiscovery> + Send + 'static
216where
217    Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
218    StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
219{
220    Box::pin(event_stream.filter_map(move |event| {
221        let db = db.clone();
222        let multi_strategy = multi_strategy.clone();
223        let channel_graph = channel_graph.clone();
224        let indexer_action_tracker = indexer_action_tracker.clone();
225
226        async move {
227            let resolved = indexer_action_tracker.match_and_resolve(&event).await;
228            debug!(count = resolved.len(), event = %event, "resolved indexer expectations", );
229
230            match event.event_type {
231                ChainEventType::Announcement{peer, multiaddresses, ..} => {
232                    Some(PeerDiscovery::Announce(peer, multiaddresses))
233                }
234                ChainEventType::ChannelOpened(channel) |
235                ChainEventType::ChannelClosureInitiated(channel) |
236                ChainEventType::ChannelClosed(channel) |
237                ChainEventType::ChannelBalanceIncreased(channel, _) | // needed ?
238                ChainEventType::ChannelBalanceDecreased(channel, _) | // needed ?
239                ChainEventType::TicketRedeemed(channel, _) => {   // needed ?
240                    let maybe_direction = channel.direction(&me_onchain);
241
242                    let change = channel_graph
243                        .write()
244                        .await
245                        .update_channel(channel);
246
247                    // Check if this is our own channel
248                    if let Some(own_channel_direction) = maybe_direction {
249                        if let Some(change_set) = change {
250                            for channel_change in change_set {
251                                let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
252                                    &*multi_strategy,
253                                    &channel,
254                                    own_channel_direction,
255                                    channel_change,
256                                )
257                                .await;
258                            }
259                        } else if channel.status == ChannelStatus::Open {
260                            // Emit Opening event if the channel did not exist before in the graph
261                            let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
262                                &*multi_strategy,
263                                &channel,
264                                own_channel_direction,
265                                ChannelChange::Status {
266                                    left: ChannelStatus::Closed,
267                                    right: ChannelStatus::Open,
268                                },
269                            )
270                            .await;
271                        }
272                    }
273
274                    None
275                }
276                ChainEventType::NetworkRegistryUpdate(address, allowed) => {
277                    let packet_key = db.translate_key(None, address).await;
278                    match packet_key {
279                        Ok(pk) => {
280                            if let Some(pk) = pk {
281                                let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
282
283                                if let Ok(offchain_key) = offchain_key {
284                                    let peer_id = offchain_key.into();
285
286                                    let res = match allowed {
287                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
288                                        hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
289                                    };
290
291                                    Some(res)
292                                } else {
293                                    error!("Failed to unwrap as offchain key at this point");
294                                    None
295                                }
296                            } else {
297                                None
298                            }
299                        }
300                        Err(e) => {
301                            error!(error = %e, "on_network_registry_node_allowed failed");
302                            None
303                        },
304                    }
305                }
306                ChainEventType::NodeSafeRegistered(safe_address) =>  {
307                    info!(%safe_address, "Node safe registered");
308                    None
309                }
310            }
311        }
312    }))
313}
314
315/// Represents the socket behavior of the hopr-lib spawned [`Hopr`] object.
316///
317/// Provides a read and write stream for Hopr socket recognized data formats.
318pub struct HoprSocket {
319    rx: UnboundedReceiver<ApplicationData>,
320    tx: UnboundedSender<ApplicationData>,
321}
322
323impl Default for HoprSocket {
324    fn default() -> Self {
325        let (tx, rx) = unbounded::<ApplicationData>();
326        Self { rx, tx }
327    }
328}
329
330impl HoprSocket {
331    pub fn new() -> Self {
332        Self::default()
333    }
334
335    pub fn reader(self) -> UnboundedReceiver<ApplicationData> {
336        self.rx
337    }
338
339    pub fn writer(&self) -> UnboundedSender<ApplicationData> {
340        self.tx.clone()
341    }
342}
343
344/// HOPR main object providing the entire HOPR node functionality
345///
346/// Instantiating this object creates all processes and objects necessary for
347/// running the HOPR node. Once created, the node can be started using the
348/// `run()` method.
349///
350/// Externally offered API should be sufficient to perform all necessary tasks
351/// with the HOPR node manually, but it is advised to create such a configuration
352/// that manual interaction is unnecessary.
353///
354/// As such, the `hopr_lib` serves mainly as an integration point into Rust programs.
355pub struct Hopr {
356    me: OffchainKeypair,
357    me_chain: ChainKeypair,
358    cfg: config::HoprLibConfig,
359    state: Arc<AtomicHoprState>,
360    transport_api: HoprTransport<HoprDb>,
361    hopr_chain_api: HoprChain<HoprDb>,
362    // objects that could be removed pending architectural cleanup ========
363    db: HoprDb,
364    chain_cfg: ChainNetworkConfig,
365    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
366    multistrategy: Arc<MultiStrategy>,
367    rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
368}
369
370impl Hopr {
371    pub fn new(
372        mut cfg: config::HoprLibConfig,
373        me: &OffchainKeypair,
374        me_onchain: &ChainKeypair,
375    ) -> crate::errors::Result<Self> {
376        let multiaddress: Multiaddr = (&cfg.host).try_into()?;
377
378        let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
379        info!(path = ?db_path, "Initiating DB");
380
381        if cfg.db.force_initialize {
382            info!("Force cleaning up existing database");
383            remove_dir_all(db_path.as_path()).map_err(|e| {
384                HoprLibError::GeneralError(format!(
385                    "Failed to remove the existing DB directory at '{db_path:?}': {e}"
386                ))
387            })?;
388            cfg.db.initialize = true
389        }
390
391        // create DB dir if it does not exist
392        if let Some(parent_dir_path) = db_path.as_path().parent() {
393            if !parent_dir_path.is_dir() {
394                std::fs::create_dir_all(parent_dir_path).map_err(|e| {
395                    HoprLibError::GeneralError(format!(
396                        "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
397                    ))
398                })?
399            }
400        }
401
402        let db_cfg = HoprDbConfig {
403            create_if_missing: cfg.db.initialize,
404            force_create: cfg.db.force_initialize,
405            log_slow_queries: std::time::Duration::from_millis(150),
406        };
407        let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
408
409        if let Some(provider) = &cfg.chain.provider {
410            info!(provider, "Creating chain components using the custom provider");
411        } else {
412            info!("Creating chain components using the default provider");
413        }
414        let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
415            &cfg.chain.network,
416            crate::constants::APP_VERSION_COERCED,
417            cfg.chain.provider.as_deref(),
418            cfg.chain.max_rpc_requests_per_sec,
419            &mut cfg.chain.protocols,
420        )
421        .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
422
423        let contract_addresses = ContractAddresses::from(&resolved_environment);
424        info!(
425            myself = me_onchain.public().to_hex(),
426            contract_addresses = ?contract_addresses,
427            "Resolved contract addresses",
428        );
429
430        let my_multiaddresses = vec![multiaddress];
431
432        let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
433
434        let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
435            me_onchain.public().to_address(),
436            ChannelGraphConfig::default(),
437        )));
438
439        let hopr_transport_api = HoprTransport::new(
440            me,
441            me_onchain,
442            HoprTransportConfig {
443                transport: cfg.transport.clone(),
444                network: cfg.network_options.clone(),
445                protocol: cfg.protocol,
446                heartbeat: cfg.heartbeat,
447                session: cfg.session,
448            },
449            db.clone(),
450            channel_graph.clone(),
451            my_multiaddresses,
452        );
453
454        let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
455            me_onchain.clone(),
456            db.clone(),
457            resolved_environment.clone(),
458            cfg.safe_module.module_address,
459            ContractAddresses {
460                announcements: resolved_environment.announcements,
461                channels: resolved_environment.channels,
462                token: resolved_environment.token,
463                price_oracle: resolved_environment.ticket_price_oracle,
464                win_prob_oracle: resolved_environment.winning_probability_oracle,
465                network_registry: resolved_environment.network_registry,
466                network_registry_proxy: resolved_environment.network_registry_proxy,
467                stake_factory: resolved_environment.node_stake_v2_factory,
468                safe_registry: resolved_environment.node_safe_registry,
469                module_implementation: resolved_environment.module_implementation,
470            },
471            cfg.safe_module.safe_address,
472            hopr_chain_indexer::IndexerConfig {
473                start_block_number: resolved_environment.channel_contract_deploy_block as u64,
474                fast_sync: cfg.chain.fast_sync,
475            },
476            tx_indexer_events,
477        );
478
479        let multi_strategy = Arc::new(MultiStrategy::new(
480            cfg.strategy.clone(),
481            db.clone(),
482            hopr_hopr_chain_api.actions_ref().clone(),
483            hopr_transport_api.ticket_aggregator(),
484        ));
485        debug!(
486            strategies = tracing::field::debug(&multi_strategy),
487            "Initialized strategies"
488        );
489
490        #[cfg(all(feature = "prometheus", not(test)))]
491        {
492            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
493            METRIC_HOPR_LIB_VERSION.set(
494                &[const_format::formatcp!("{}", constants::APP_VERSION)],
495                f64::from_str(const_format::formatcp!(
496                    "{}.{}",
497                    env!("CARGO_PKG_VERSION_MAJOR"),
498                    env!("CARGO_PKG_VERSION_MINOR")
499                ))
500                .unwrap_or(0.0),
501            );
502
503            // Calling get_ticket_statistics will initialize the respective metrics on tickets
504            if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
505                error!(error = %e, "Failed to initialize ticket statistics metrics");
506            }
507        }
508
509        Ok(Self {
510            me: me.clone(),
511            me_chain: me_onchain.clone(),
512            cfg,
513            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
514            transport_api: hopr_transport_api,
515            hopr_chain_api: hopr_hopr_chain_api,
516            db,
517            chain_cfg: resolved_environment,
518            channel_graph,
519            multistrategy: multi_strategy,
520            rx_indexer_significant_events: rx_indexer_events,
521        })
522    }
523
524    fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
525        if self.status() == state {
526            Ok(())
527        } else {
528            Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
529        }
530    }
531
532    pub fn status(&self) -> HoprState {
533        self.state.load(Ordering::Relaxed)
534    }
535
536    pub fn version_coerced(&self) -> String {
537        String::from(constants::APP_VERSION_COERCED)
538    }
539
540    pub fn version(&self) -> String {
541        String::from(constants::APP_VERSION)
542    }
543
544    pub fn network(&self) -> String {
545        self.cfg.chain.network.clone()
546    }
547
548    pub async fn get_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
549        Ok(self.hopr_chain_api.get_balance(balance_type).await?)
550    }
551
552    pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
553        Ok(self.hopr_chain_api.get_eligibility_status().await?)
554    }
555
556    pub async fn get_safe_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
557        let safe_balance = self
558            .hopr_chain_api
559            .get_safe_balance(self.cfg.safe_module.safe_address, balance_type)
560            .await?;
561
562        if balance_type == BalanceType::HOPR {
563            let my_db = self.db.clone();
564            self.db
565                .begin_transaction()
566                .await?
567                .perform(|tx| {
568                    Box::pin(async move {
569                        let db_safe_balance = my_db.get_safe_hopr_balance(Some(tx)).await?;
570                        if safe_balance != db_safe_balance {
571                            warn!(
572                                %db_safe_balance,
573                                %safe_balance,
574                                "Safe balance in the DB mismatches on chain balance"
575                            );
576                            my_db.set_safe_hopr_balance(Some(tx), safe_balance).await?;
577                        }
578                        Ok::<_, DbSqlError>(())
579                    })
580                })
581                .await?;
582        }
583        Ok(safe_balance)
584    }
585
586    pub fn get_safe_config(&self) -> SafeModule {
587        self.cfg.safe_module.clone()
588    }
589
590    pub fn chain_config(&self) -> ChainNetworkConfig {
591        self.chain_cfg.clone()
592    }
593
594    pub fn get_provider(&self) -> String {
595        self.cfg
596            .chain
597            .provider
598            .clone()
599            .unwrap_or(self.chain_cfg.chain.default_provider.clone())
600    }
601
602    #[inline]
603    fn is_public(&self) -> bool {
604        self.cfg.chain.announce
605    }
606
607    pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
608        &self,
609        #[cfg(feature = "session-server")] serve_handler: T,
610    ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, JoinHandle<()>>)> {
611        self.error_if_not_in_state(
612            HoprState::Uninitialized,
613            "Cannot start the hopr node multiple times".into(),
614        )?;
615
616        info!(
617            address = %self.me_onchain(), minimum_balance = %Balance::new_from_str(SUGGESTED_NATIVE_BALANCE, BalanceType::Native),
618            "Node is not started, please fund this node",
619        );
620
621        let mut processes: HashMap<HoprLibProcesses, JoinHandle<()>> = HashMap::new();
622
623        wait_for_funds(
624            self.me_onchain(),
625            Balance::new_from_str(MIN_NATIVE_BALANCE, BalanceType::Native),
626            Duration::from_secs(200),
627            self.hopr_chain_api.rpc(),
628        )
629        .await?;
630
631        info!("Starting the node...");
632
633        self.state.store(HoprState::Initializing, Ordering::Relaxed);
634
635        let balance = self.get_balance(BalanceType::Native).await?;
636
637        let minimum_balance = Balance::new_from_str(constants::MIN_NATIVE_BALANCE, BalanceType::Native);
638
639        info!(
640            address = %self.hopr_chain_api.me_onchain(),
641            %balance,
642            %minimum_balance,
643            "Node information"
644        );
645
646        if balance.le(&minimum_balance) {
647            return Err(HoprLibError::GeneralError(
648                "Cannot start the node without a sufficiently funded wallet".to_string(),
649            ));
650        }
651
652        // Once we are able to query the chain,
653        // check if the ticket price is configured correctly.
654        let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;
655
656        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
657        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
658            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
659                "configured outgoing ticket price is lower than the network minimum ticket price: {configured_ticket_price:?} < {network_min_ticket_price}"
660            ))));
661        }
662
663        // Once we are able to query the chain,
664        // check if the winning probability is configured correctly.
665        let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
666        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
667        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
668            && configured_win_prob.is_some_and(|c| c < network_min_win_prob)
669        {
670            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
671                "configured outgoing ticket winning probability is lower than the network minimum winning probability: {configured_win_prob:?} < {network_min_win_prob}"
672            ))));
673        }
674
675        info!("Linking chain and packet keys");
676        self.db
677            .insert_account(
678                None,
679                AccountEntry {
680                    public_key: *self.me.public(),
681                    chain_addr: self.hopr_chain_api.me_onchain(),
682                    // Will be set once we announce ourselves and Indexer processes the announcement
683                    entry_type: AccountType::NotAnnounced,
684                },
685            )
686            .await?;
687
688        self.state.store(HoprState::Indexing, Ordering::Relaxed);
689
690        let (indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
691
692        let indexer_event_pipeline = chain_events_to_transport_events(
693            self.rx_indexer_significant_events.clone(),
694            self.me_onchain(),
695            self.db.clone(),
696            self.multistrategy.clone(),
697            self.channel_graph.clone(),
698            self.hopr_chain_api.action_state(),
699        )
700        .await;
701
702        // terminated once all senders are dropped and no items in the receiver remain
703        spawn(async move {
704            indexer_event_pipeline
705                .map(Ok)
706                .forward(indexer_peer_update_tx)
707                .await
708                .expect("The index to transport event chain failed");
709        });
710
711        info!("Start the chain process and sync the indexer");
712        for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
713            let nid = match id {
714                HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
715                HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
716            };
717            processes.insert(nid, proc);
718        }
719
720        {
721            // Show onboarding information
722            let my_ethereum_address = self.me_onchain();
723            let my_peer_id = self.me_peer_id();
724            let my_version = crate::constants::APP_VERSION;
725
726            while !self
727                .db
728                .clone()
729                .is_allowed_in_network_registry(None, &my_ethereum_address)
730                .await
731                .unwrap_or(false)
732            {
733                info!("Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.", my_ethereum_address.to_hex());
734
735                sleep(ONBOARDING_INFORMATION_INTERVAL).await;
736
737                info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
738                info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
739            }
740        }
741
742        // Check Safe-module status:
743        // 1) if the node is already included into the module
744        // 2) if the module is enabled in the safe
745        // 3) if the safe is the owner of the module
746        // if any of the conditions is not met, return error
747        let safe_module_configuration = self
748            .hopr_chain_api
749            .rpc()
750            .check_node_safe_module_status(self.me_onchain())
751            .await
752            .map_err(HoprChainError::Rpc)?;
753
754        if !safe_module_configuration.should_pass() {
755            error!(
756                ?safe_module_configuration,
757                "Something is wrong with the safe module configuration",
758            );
759            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
760                "Safe and module are not configured correctly {:?}",
761                safe_module_configuration,
762            ))));
763        }
764
765        // Possibly register node-safe pair to NodeSafeRegistry. Following that the
766        // connector is set to use safe tx variants.
767        if can_register_with_safe(
768            self.me_onchain(),
769            self.cfg.safe_module.safe_address,
770            self.hopr_chain_api.rpc(),
771        )
772        .await?
773        {
774            info!("Registering safe by node");
775
776            if self.me_onchain() == self.cfg.safe_module.safe_address {
777                return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
778            }
779
780            if let Err(e) = self
781                .hopr_chain_api
782                .actions_ref()
783                .register_safe_by_node(self.cfg.safe_module.safe_address)
784                .await?
785                .await
786            {
787                // Intentionally ignoring the errored state
788                error!(error = %e, "Failed to register node with safe")
789            }
790        }
791
792        self.db
793            .set_safe_info(
794                None,
795                SafeInfo {
796                    safe_address: self.cfg.safe_module.safe_address,
797                    module_address: self.cfg.safe_module.module_address,
798                },
799            )
800            .await?;
801
802        if self.is_public() {
803            // At this point the node is already registered with Safe, so
804            // we can announce via Safe-compliant TX
805
806            let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
807
808            // The announcement is intentionally not awaited until confirmation
809            match self
810                .hopr_chain_api
811                .actions_ref()
812                .announce(&multiaddresses_to_announce, &self.me)
813                .await
814            {
815                Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
816                Err(ChainActionsError::AlreadyAnnounced) => {
817                    info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain", )
818                }
819                // If the announcement fails, we keep going to prevent the node from retrying
820                // after restart.
821                // Functionality is limited, and users must check the logs for errors.
822                Err(e) => error!(error = %e, "Failed to transmit node announcement"),
823            }
824        }
825
826        {
827            let channel_graph = self.channel_graph.clone();
828            let mut cg = channel_graph.write().await;
829
830            info!("Syncing channels from the previous runs");
831            let mut channel_stream = self
832                .db
833                .stream_active_channels()
834                .await
835                .map_err(hopr_db_sql::api::errors::DbError::from)?;
836
837            while let Some(maybe_channel) = channel_stream.next().await {
838                match maybe_channel {
839                    Ok(channel) => {
840                        cg.update_channel(channel);
841                    }
842                    Err(error) => error!(%error, "Failed to sync channel into the graph"),
843                }
844            }
845
846            // Initialize node latencies and scores in the channel graph:
847            // Sync only those nodes that we know that had a good quality
848            // Other nodes will be repopulated into the channel graph during heartbeat
849            // rounds.
850            info!("Syncing peer qualities from the previous runs");
851            let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
852                .map_err(|e| e.to_string())
853                .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
854                .unwrap_or_else(|error| {
855                    warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
856                    constants::DEFAULT_MIN_QUALITY_TO_SYNC
857                });
858
859            let mut peer_stream = self
860                .db
861                .get_network_peers(Default::default(), false)
862                .await?
863                .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
864
865            while let Some(peer) = peer_stream.next().await {
866                if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
867                    // For nodes that had a good quality, we assign a perfect score
868                    cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
869                } else {
870                    error!(peer = %peer.id.1, "Could not translate peer information");
871                }
872            }
873
874            info!(
875                channels = cg.count_channels(),
876                nodes = cg.count_nodes(),
877                "Channel graph sync complete"
878            );
879        }
880
881        let socket = HoprSocket::new();
882        let transport_output_tx = socket.writer();
883
884        // notifier on acknowledged ticket reception
885        let multi_strategy_ack_ticket = self.multistrategy.clone();
886        let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
887        self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
888        processes.insert(
889            HoprLibProcesses::OnReceivedAcknowledgement,
890            spawn(async move {
891                while let Some(ack) = on_ack_tkt_rx.next().await {
892                    if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
893                        &*multi_strategy_ack_ticket,
894                        &ack,
895                    )
896                    .await
897                    {
898                        error!(%error, "Failed to process acknowledged winning ticket with the strategy");
899                    }
900                }
901            }),
902        );
903
904        let (session_tx, _session_rx) = unbounded::<IncomingSession>();
905
906        #[cfg(feature = "session-server")]
907        {
908            processes.insert(
909                HoprLibProcesses::SessionServer,
910                spawn(_session_rx.for_each_concurrent(None, move |session| {
911                    let serve_handler = serve_handler.clone();
912                    async move {
913                        let session_id = *session.session.id();
914                        match serve_handler.process(session).await {
915                            Ok(_) => debug!(
916                                session_id = ?session_id,
917                                "Client session processed successfully"
918                            ),
919                            Err(e) => error!(
920                                session_id = ?session_id,
921                                error = %e,
922                                "Client session processing failed"
923                            ),
924                        }
925                    }
926                })),
927            );
928        }
929
930        for (id, proc) in self
931            .transport_api
932            .run(
933                &self.me_chain,
934                String::from(constants::APP_VERSION),
935                join(&[&self.cfg.db.data, "tbf"])
936                    .map_err(|e| HoprLibError::GeneralError(format!("Failed to construct the bloom filter: {e}")))?,
937                transport_output_tx,
938                indexer_peer_update_rx,
939                session_tx,
940            )
941            .await?
942            .into_iter()
943        {
944            processes.insert(id.into(), proc);
945        }
946
947        let db_clone = self.db.clone();
948        processes.insert(
949            HoprLibProcesses::TicketIndexFlush,
950            spawn(Box::pin(execute_on_tick(
951                Duration::from_secs(5),
952                move || {
953                    let db_clone = db_clone.clone();
954                    async move {
955                        match db_clone.persist_outgoing_ticket_indices().await {
956                            Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
957                            Err(e) => error!(error = %e, "Failed to flush ticket indices"),
958                        }
959                    }
960                },
961                "flush the states of outgoing ticket indices".into(),
962            ))),
963        );
964
965        // NOTE: after the chain is synched we can reset tickets which are considered
966        // redeemed but on-chain state does not align with that. This implies there was a problem
967        // right when the transaction was sent on-chain. In such cases we simply let it retry and
968        // handle errors appropriately.
969        if let Err(e) = self.db.fix_channels_next_ticket_state().await {
970            error!(error = %e, "failed to fix channels ticket states");
971        }
972
973        // NOTE: strategy ticks must start after the chain is synced, otherwise
974        // the strategy would react to historical data and drain through the native
975        // balance on chain operations not relevant for the present network state
976        let multi_strategy = self.multistrategy.clone();
977        let strategy_interval = self.cfg.strategy.execution_interval;
978        processes.insert(
979            HoprLibProcesses::StrategyTick,
980            spawn(async move {
981                execute_on_tick(
982                    Duration::from_secs(strategy_interval),
983                    move || {
984                        let multi_strategy = multi_strategy.clone();
985
986                        async move {
987                            trace!(state = "started", "strategy tick");
988                            let _ = multi_strategy.on_tick().await;
989                            trace!(state = "finished", "strategy tick");
990                        }
991                    },
992                    "run strategies".into(),
993                )
994                .await;
995            }),
996        );
997
998        self.state.store(HoprState::Running, Ordering::Relaxed);
999
1000        info!(
1001            id = %self.me_peer_id(),
1002            version = constants::APP_VERSION,
1003            "NODE STARTED AND RUNNING"
1004        );
1005
1006        #[cfg(all(feature = "prometheus", not(test)))]
1007        METRIC_HOPR_NODE_INFO.set(
1008            &[
1009                &self.me.public().to_peerid_str(),
1010                &self.me_onchain().to_string(),
1011                &self.cfg.safe_module.safe_address.to_string(),
1012                &self.cfg.safe_module.module_address.to_string(),
1013            ],
1014            1.0,
1015        );
1016
1017        Ok((socket, processes))
1018    }
1019
1020    // p2p transport =========
1021    /// Own PeerId used in the libp2p transport layer
1022    pub fn me_peer_id(&self) -> PeerId {
1023        (*self.me.public()).into()
1024    }
1025
1026    /// Get the list of all announced public nodes in the network
1027    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1028        Ok(self.transport_api.get_public_nodes().await?)
1029    }
1030
1031    /// Returns the most recently indexed log, if any.
1032    pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1033        let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1034
1035        match self.db.get_last_checksummed_log().await? {
1036            Some(log) => {
1037                let checksum = match log.checksum {
1038                    Some(checksum) => Hash::from_hex(checksum.as_str())?,
1039                    None => Hash::default(),
1040                };
1041                Ok(IndexerStateInfo {
1042                    latest_log_block_number: log.block_number as u32,
1043                    latest_log_checksum: checksum,
1044                    ..indexer_state_info
1045                })
1046            }
1047            None => Ok(indexer_state_info),
1048        }
1049    }
1050
1051    /// Test whether the peer with PeerId is allowed to access the network
1052    pub async fn is_allowed_to_access_network(
1053        &self,
1054        address_like: either::Either<&PeerId, Address>,
1055    ) -> errors::Result<bool> {
1056        Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1057    }
1058
1059    /// Ping another node in the network based on the PeerId
1060    ///
1061    /// Returns the RTT (round trip time), i.e. how long it took for the ping to return.
1062    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1063        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1064
1065        Ok(self.transport_api.ping(peer).await?)
1066    }
1067
1068    /// Create a client session connection returning a session object that implements
1069    /// [`AsyncRead`] and [`AsyncWrite`] and can bu used as a read/write binary session.
1070    #[cfg(feature = "session-client")]
1071    pub async fn connect_to(&self, cfg: SessionClientConfig) -> errors::Result<HoprSession> {
1072        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1073
1074        let backoff = backon::ConstantBuilder::default()
1075            .with_max_times(self.cfg.session.establish_max_retries as usize)
1076            .with_delay(self.cfg.session.establish_retry_timeout)
1077            .with_jitter();
1078
1079        struct Sleeper;
1080        impl backon::Sleeper for Sleeper {
1081            type Sleep = futures_timer::Delay;
1082
1083            fn sleep(&self, dur: Duration) -> Self::Sleep {
1084                futures_timer::Delay::new(dur)
1085            }
1086        }
1087
1088        use backon::Retryable;
1089
1090        Ok((|| {
1091            let cfg = cfg.clone();
1092            async { self.transport_api.new_session(cfg).await }
1093        })
1094        .retry(backoff)
1095        .sleep(Sleeper)
1096        .await?)
1097    }
1098
1099    /// Send a message to another peer in the network
1100    ///
1101    /// @param msg message to send
1102    /// @param destination PeerId of the destination
1103    /// @param options optional configuration of the message path using hops and intermediatePath
1104    /// @param applicationTag optional tag identifying the sending application
1105    /// @returns ack challenge
1106    #[tracing::instrument(level = "debug", skip(self, msg))]
1107    pub async fn send_message(
1108        &self,
1109        msg: Box<[u8]>,
1110        destination: PeerId,
1111        options: RoutingOptions,
1112        application_tag: Option<u16>,
1113    ) -> errors::Result<()> {
1114        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1115
1116        self.transport_api
1117            .send_message(msg, destination, options, application_tag)
1118            .await?;
1119
1120        Ok(())
1121    }
1122
1123    /// Attempts to aggregate all tickets in the given channel
1124    pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1125        Ok(self.transport_api.aggregate_tickets(channel).await?)
1126    }
1127
1128    /// List all multiaddresses announced by this node
1129    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1130        self.transport_api.local_multiaddresses()
1131    }
1132
1133    /// List all multiaddresses on which the node is listening
1134    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1135        self.transport_api.listening_multiaddresses().await
1136    }
1137
1138    /// List all multiaddresses observed for a PeerId
1139    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1140        self.transport_api.network_observed_multiaddresses(peer).await
1141    }
1142
1143    /// List all multiaddresses announced on-chain for the given node.
1144    pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1145        let key = match OffchainPublicKey::try_from(peer) {
1146            Ok(k) => k,
1147            Err(e) => {
1148                error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1149                return vec![];
1150            }
1151        };
1152
1153        match self.db.get_account(None, key).await {
1154            Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1155            Ok(None) => {
1156                error!(%peer, "no information");
1157                vec![]
1158            }
1159            Err(e) => {
1160                error!(%peer, error = %e, "failed to retrieve information");
1161                vec![]
1162            }
1163        }
1164    }
1165
1166    // Network =========
1167
1168    /// Get measured network health
1169    pub async fn network_health(&self) -> Health {
1170        self.transport_api.network_health().await
1171    }
1172
1173    /// List all peers connected to this
1174    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1175        Ok(self.transport_api.network_connected_peers().await?)
1176    }
1177
1178    /// Get all data collected from the network relevant for a PeerId
1179    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1180        Ok(self.transport_api.network_peer_info(peer).await?)
1181    }
1182
1183    /// Get peers connected peers with quality higher than some value
1184    pub async fn all_network_peers(
1185        &self,
1186        minimum_quality: f64,
1187    ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1188        Ok(
1189            futures::stream::iter(self.transport_api.network_connected_peers().await?)
1190                .filter_map(|peer| async move {
1191                    if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1192                        if info.get_average_quality() >= minimum_quality {
1193                            Some((peer, info))
1194                        } else {
1195                            None
1196                        }
1197                    } else {
1198                        None
1199                    }
1200                })
1201                .filter_map(|(peer_id, info)| async move {
1202                    let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1203                    Some((address, peer_id, info))
1204                })
1205                .collect::<Vec<_>>()
1206                .await,
1207        )
1208    }
1209
1210    // Ticket ========
1211    /// Get all tickets in a channel specified by Hash
1212    pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1213        Ok(self.transport_api.tickets_in_channel(channel).await?)
1214    }
1215
1216    /// Get all tickets
1217    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1218        Ok(self.transport_api.all_tickets().await?)
1219    }
1220
1221    /// Get statistics for all tickets
1222    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1223        Ok(self.transport_api.ticket_statistics().await?)
1224    }
1225
1226    /// Reset the ticket metrics to zero
1227    pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1228        Ok(self.db.reset_ticket_statistics().await?)
1229    }
1230
1231    // DB ============
1232    pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
1233        &self.db
1234    }
1235
1236    // Chain =========
1237    pub fn me_onchain(&self) -> Address {
1238        self.hopr_chain_api.me_onchain()
1239    }
1240
1241    /// Get ticket price
1242    pub async fn get_ticket_price(&self) -> errors::Result<Option<U256>> {
1243        Ok(self.hopr_chain_api.ticket_price().await?)
1244    }
1245
1246    /// Get minimum incoming ticket winning probability
1247    pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<f64> {
1248        Ok(self
1249            .db
1250            .get_indexer_data(None)
1251            .await?
1252            .minimum_incoming_ticket_winning_prob)
1253    }
1254
1255    /// List of all accounts announced on the chain
1256    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1257        Ok(self.db.get_accounts(None, false).await?)
1258    }
1259
1260    /// Get the channel entry from Hash.
1261    /// @returns the channel entry of those two nodes
1262    pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1263        Ok(self.db.get_channel_by_id(None, channel_id).await?)
1264    }
1265
1266    /// Get the channel entry between source and destination node.
1267    /// @param src Address
1268    /// @param dest Address
1269    /// @returns the channel entry of those two nodes
1270    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1271        Ok(self.hopr_chain_api.channel(src, dest).await?)
1272    }
1273
1274    /// List all channels open from a specified Address
1275    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1276        Ok(self.hopr_chain_api.channels_from(src).await?)
1277    }
1278
1279    /// List all channels open to a specified address
1280    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1281        Ok(self.hopr_chain_api.channels_to(dest).await?)
1282    }
1283
1284    /// List all channels
1285    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1286        Ok(self.hopr_chain_api.all_channels().await?)
1287    }
1288
1289    /// Current safe allowance balance
1290    pub async fn safe_allowance(&self) -> errors::Result<Balance> {
1291        Ok(self.hopr_chain_api.safe_allowance().await?)
1292    }
1293
1294    /// Withdraw on-chain assets to a given address
1295    /// @param recipient the account where the assets should be transferred to
1296    /// @param amount how many tokens to be transferred
1297    pub async fn withdraw(&self, recipient: Address, amount: Balance) -> errors::Result<Hash> {
1298        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1299
1300        Ok(self
1301            .hopr_chain_api
1302            .actions_ref()
1303            .withdraw(recipient, amount)
1304            .await?
1305            .await?
1306            .tx_hash)
1307    }
1308
1309    pub async fn open_channel(&self, destination: &Address, amount: &Balance) -> errors::Result<OpenChannelResult> {
1310        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1311
1312        let awaiter = self
1313            .hopr_chain_api
1314            .actions_ref()
1315            .open_channel(*destination, *amount)
1316            .await?;
1317
1318        let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1319        Ok(awaiter.await.map(|confirm| OpenChannelResult {
1320            tx_hash: confirm.tx_hash,
1321            channel_id,
1322        })?)
1323    }
1324
1325    pub async fn fund_channel(&self, channel_id: &Hash, amount: &Balance) -> errors::Result<Hash> {
1326        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1327
1328        Ok(self
1329            .hopr_chain_api
1330            .actions_ref()
1331            .fund_channel(*channel_id, *amount)
1332            .await?
1333            .await
1334            .map(|confirm| confirm.tx_hash)?)
1335    }
1336
1337    pub async fn close_channel(
1338        &self,
1339        counterparty: &Address,
1340        direction: ChannelDirection,
1341        redeem_before_close: bool,
1342    ) -> errors::Result<CloseChannelResult> {
1343        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1344
1345        let confirmation = self
1346            .hopr_chain_api
1347            .actions_ref()
1348            .close_channel(*counterparty, direction, redeem_before_close)
1349            .await?
1350            .await?;
1351
1352        match confirmation
1353            .event
1354            .expect("channel close action confirmation must have associated chain event")
1355        {
1356            ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1357                tx_hash: confirmation.tx_hash,
1358                status: c.status, // copy the information about closure time
1359            }),
1360            ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1361                tx_hash: confirmation.tx_hash,
1362                status: ChannelStatus::Closed,
1363            }),
1364            _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1365        }
1366    }
1367
1368    pub async fn close_channel_by_id(
1369        &self,
1370        channel_id: Hash,
1371        redeem_before_close: bool,
1372    ) -> errors::Result<CloseChannelResult> {
1373        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1374
1375        match self.channel_from_hash(&channel_id).await? {
1376            Some(channel) => match channel.orientation(&self.me_onchain()) {
1377                Some((direction, counterparty)) => {
1378                    self.close_channel(&counterparty, direction, redeem_before_close).await
1379                }
1380                None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1381                    "cannot close channel that is not own".into(),
1382                ))),
1383            },
1384            None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1385        }
1386    }
1387
1388    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1389        Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1390    }
1391
1392    pub async fn redeem_all_tickets(&self, only_aggregated: bool) -> errors::Result<()> {
1393        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1394
1395        // We do not await the on-chain confirmation
1396        self.hopr_chain_api
1397            .actions_ref()
1398            .redeem_all_tickets(only_aggregated)
1399            .await?;
1400
1401        Ok(())
1402    }
1403
1404    pub async fn redeem_tickets_with_counterparty(
1405        &self,
1406        counterparty: &Address,
1407        only_aggregated: bool,
1408    ) -> errors::Result<()> {
1409        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1410
1411        // We do not await the on-chain confirmation
1412        let _ = self
1413            .hopr_chain_api
1414            .actions_ref()
1415            .redeem_tickets_with_counterparty(counterparty, only_aggregated)
1416            .await?;
1417
1418        Ok(())
1419    }
1420
1421    pub async fn redeem_tickets_in_channel(&self, channel_id: &Hash, only_aggregated: bool) -> errors::Result<usize> {
1422        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1423
1424        let channel = self.db.get_channel_by_id(None, channel_id).await?;
1425        let mut redeem_count = 0;
1426
1427        if let Some(channel) = channel {
1428            if channel.destination == self.hopr_chain_api.me_onchain() {
1429                // We do not await the on-chain confirmation
1430                redeem_count = self
1431                    .hopr_chain_api
1432                    .actions_ref()
1433                    .redeem_tickets_in_channel(&channel, only_aggregated)
1434                    .await?
1435                    .len();
1436            }
1437        }
1438
1439        Ok(redeem_count)
1440    }
1441
1442    pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1443        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1444
1445        // We do not await the on-chain confirmation
1446        #[allow(clippy::let_underscore_future)]
1447        let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1448
1449        Ok(())
1450    }
1451
1452    pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1453        let pk = hopr_transport::OffchainPublicKey::try_from(peer_id)?;
1454        Ok(self.db.resolve_chain_key(&pk).await?)
1455    }
1456
1457    pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1458        Ok(self
1459            .db
1460            .resolve_packet_key(address)
1461            .await
1462            .map(|pk| pk.map(|v| v.into()))?)
1463    }
1464
1465    pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1466        self.channel_graph.read().await.as_dot(cfg)
1467    }
1468
1469    pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1470        let cg = self.channel_graph.read().await;
1471        serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1472    }
1473}