1pub mod config;
18pub mod constants;
20pub mod errors;
22
23use std::{
24 collections::HashMap,
25 fmt::{Display, Formatter},
26 ops::Deref,
27 path::PathBuf,
28 sync::{Arc, atomic::Ordering},
29 time::Duration,
30};
31
32use async_lock::RwLock;
33use errors::{HoprLibError, HoprStatusError};
34use futures::{
35 SinkExt, Stream, StreamExt,
36 channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded},
37 future::AbortHandle,
38 stream::{self},
39};
40use hopr_async_runtime::prelude::{sleep, spawn};
41pub use hopr_chain_actions::errors::ChainActionsError;
42use hopr_chain_actions::{
43 action_state::{ActionState, IndexerActionTracker},
44 channels::ChannelActions,
45 node::NodeActions,
46 redeem::TicketRedeemActions,
47};
48pub use hopr_chain_api::config::{
49 Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
50};
51use hopr_chain_api::{
52 HoprChain, HoprChainProcess, SignificantChainEvent, can_register_with_safe, config::ChainNetworkConfig,
53 errors::HoprChainError, wait_for_funds,
54};
55use hopr_chain_rpc::HoprRpcOperations;
56use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
57use hopr_crypto_types::prelude::OffchainPublicKey;
58use hopr_db_api::logs::HoprDbLogOperations;
59use hopr_db_sql::{
60 HoprDbAllOperations,
61 accounts::HoprDbAccountOperations,
62 api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
63 channels::HoprDbChannelOperations,
64 db::{HoprDb, HoprDbConfig},
65 info::{HoprDbInfoOperations, IndexerStateInfo},
66 prelude::{ChainOrPacketKey::ChainKey, HoprDbPeersOperations},
67 registry::HoprDbRegistryOperations,
68};
69pub use hopr_internal_types::prelude::*;
70pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
71pub use hopr_path::channel_graph::GraphExportConfig;
72use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
73use hopr_platform::file::native::{join, remove_dir_all};
74pub use hopr_primitive_types::prelude::*;
75pub use hopr_strategy::Strategy;
76use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
77#[cfg(feature = "runtime-tokio")]
78pub use hopr_transport::transfer_session;
79pub use hopr_transport::{
80 ApplicationData, HalfKeyChallenge, Health, IncomingSession as HoprIncomingSession, Keypair, Multiaddr,
81 OffchainKeypair as HoprOffchainKeypair, PeerId, PingQueryReplier, ProbeError, SESSION_PAYLOAD_SIZE, SendMsg,
82 ServiceId, Session as HoprSession, SessionCapabilities, SessionCapability, SessionClientConfig,
83 SessionId as HoprSessionId, SessionTarget, SurbBalancerConfig, Tag, TicketStatistics,
84 config::{HostConfig, HostType, looks_like_domain},
85 errors::{HoprTransportError, NetworkingError, ProtocolError},
86};
87use hopr_transport::{
88 ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession, OffchainKeypair,
89 PeerDiscovery, PeerStatus, execute_on_tick,
90};
91use tracing::{debug, error, info, trace, warn};
92#[cfg(all(feature = "prometheus", not(test)))]
93use {
94 hopr_metrics::metrics::{MultiGauge, SimpleGauge},
95 hopr_platform::time::native::current_time,
96 std::str::FromStr,
97};
98
99use crate::{
100 config::SafeModule,
101 constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE},
102};
103
// Prometheus metrics exported by this crate. They are only compiled when the
// `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge holding the process start time; set once in `Hopr::new`.
    static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Gauge labelled with the application version; set once in `Hopr::new`.
    static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Gauge labelled with the node identities; set at the end of `Hopr::run`.
    static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
121
122pub use async_trait::async_trait;
123
#[cfg(feature = "session-server")]
/// Handler for sessions that other nodes open towards this node.
///
/// Only available with the `session-server` feature. `Hopr::run` clones the
/// handler for every incoming session and calls [`HoprSessionReactor::process`]
/// on it; a returned error is logged there and does not stop the server loop.
#[async_trait::async_trait]
pub trait HoprSessionReactor {
    /// Processes one incoming session until the implementation is done with it.
    async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
}
132
#[atomic_enum::atomic_enum]
/// Lifecycle state of the node.
///
/// The `Hopr` struct keeps it in an `AtomicHoprState` so that it can be read and
/// updated through a shared reference.
#[derive(PartialEq, Eq)]
pub enum HoprState {
    /// State assigned by `Hopr::new`; `Hopr::run` only starts from this state.
    Uninitialized = 0,
    /// Entered in `Hopr::run` once `wait_for_funds` has returned.
    Initializing = 1,
    /// Entered in `Hopr::run` after the safe information was stored in the DB.
    Indexing = 2,
    /// NOTE(review): not assigned anywhere in the visible part of this file.
    Starting = 3,
    /// Entered at the end of `Hopr::run`; required by `ping`, `send_message`
    /// and the session operations.
    Running = 4,
}
143
144impl Display for HoprState {
145 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146 write!(f, "{:?}", self)
147 }
148}
149
/// Result data of a channel opening operation.
pub struct OpenChannelResult {
    /// Transaction hash (presumably of the on-chain open transaction; confirm at the producer).
    pub tx_hash: Hash,
    /// Identifier of the channel, expressed as a hash.
    pub channel_id: Hash,
}
154
/// Result data of a channel closing operation.
pub struct CloseChannelResult {
    /// Transaction hash (presumably of the on-chain close transaction; confirm at the producer).
    pub tx_hash: Hash,
    /// Channel status reported together with the transaction.
    pub status: ChannelStatus,
}
159
/// Identifiers of the long-running processes returned by `Hopr::run`.
///
/// Each identifier is used as the key of the abort-handle map returned by `Hopr::run`.
/// The `strum` strings give the human-readable `Display` text of each variant.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum HoprLibProcesses {
    /// A process owned by the transport layer, identified by the wrapped value.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Dispatches incoming sessions to the `HoprSessionReactor` (feature `session-server`).
    #[cfg(feature = "session-server")]
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// Periodically invokes `on_tick` of the configured strategies.
    #[strum(to_string = "tick wake up the strategies to perform an action")]
    StrategyTick,
    /// The chain indexer process; the only variant for which `can_finish` returns `true`.
    #[strum(to_string = "initial indexing operation into the DB")]
    Indexing,
    /// NOTE(review): not inserted into the process map in the visible part of this file.
    #[strum(to_string = "processing of indexed operations in internal components")]
    IndexReflection,
    /// The queue of outgoing on-chain actions started by the chain component.
    #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
    OutgoingOnchainActionQueue,
    /// Periodically persists outgoing ticket indices to the DB.
    #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
    TicketIndexFlush,
    /// Forwards acknowledged winning tickets to the strategies.
    #[strum(to_string = "on received ack ticket trigger")]
    OnReceivedAcknowledgement,
}
184
185impl HoprLibProcesses {
186 pub fn can_finish(&self) -> bool {
189 matches!(self, HoprLibProcesses::Indexing)
190 }
191}
192
193impl From<HoprTransportProcess> for HoprLibProcesses {
194 fn from(value: HoprTransportProcess) -> Self {
195 HoprLibProcesses::Transport(value)
196 }
197}
198
199#[allow(clippy::too_many_arguments)]
208pub async fn chain_events_to_transport_events<StreamIn, Db>(
209 event_stream: StreamIn,
210 me_onchain: Address,
211 db: Db,
212 multi_strategy: Arc<MultiStrategy>,
213 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
214 indexer_action_tracker: Arc<IndexerActionTracker>,
215) -> impl Stream<Item = PeerDiscovery> + Send + 'static
216where
217 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
218 StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
219{
220 Box::pin(event_stream.filter_map(move |event| {
221 let db = db.clone();
222 let multi_strategy = multi_strategy.clone();
223 let channel_graph = channel_graph.clone();
224 let indexer_action_tracker = indexer_action_tracker.clone();
225
226 async move {
227 let resolved = indexer_action_tracker.match_and_resolve(&event).await;
228 if resolved.is_empty() {
229 trace!(%event, "No indexer expectations resolved for the event");
230 } else {
231 debug!(count = resolved.len(), %event, "resolved indexer expectations");
232 }
233
234 match event.event_type {
235 ChainEventType::Announcement{peer, address, multiaddresses} => {
236 let allowed = db
237 .is_allowed_in_network_registry(None, &address)
238 .await
239 .unwrap_or(false);
240
241 Some(vec![PeerDiscovery::Announce(peer, multiaddresses), if allowed {
242 PeerDiscovery::Allow(peer)
243 } else {
244 PeerDiscovery::Ban(peer)
245 }])
246 }
247 ChainEventType::ChannelOpened(channel) |
248 ChainEventType::ChannelClosureInitiated(channel) |
249 ChainEventType::ChannelClosed(channel) |
250 ChainEventType::ChannelBalanceIncreased(channel, _) | ChainEventType::ChannelBalanceDecreased(channel, _) | ChainEventType::TicketRedeemed(channel, _) => { let maybe_direction = channel.direction(&me_onchain);
254
255 let change = channel_graph
256 .write_arc()
257 .await
258 .update_channel(channel);
259
260 if let Some(own_channel_direction) = maybe_direction {
262 if let Some(change_set) = change {
263 for channel_change in change_set {
264 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
265 &*multi_strategy,
266 &channel,
267 own_channel_direction,
268 channel_change,
269 )
270 .await;
271 }
272 } else if channel.status == ChannelStatus::Open {
273 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
275 &*multi_strategy,
276 &channel,
277 own_channel_direction,
278 ChannelChange::Status {
279 left: ChannelStatus::Closed,
280 right: ChannelStatus::Open,
281 },
282 )
283 .await;
284 }
285 }
286
287 None
288 }
289 ChainEventType::NetworkRegistryUpdate(address, allowed) => {
290 let packet_key = db.translate_key(None, address).await;
291 match packet_key {
292 Ok(pk) => {
293 if let Some(pk) = pk {
294 let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
295
296 if let Ok(offchain_key) = offchain_key {
297 let peer_id = offchain_key.into();
298
299 let res = match allowed {
300 hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
301 hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
302 };
303
304 Some(vec![res])
305 } else {
306 error!("Failed to unwrap as offchain key at this point");
307 None
308 }
309 } else {
310 None
311 }
312 }
313 Err(error) => {
314 error!(%error, "on_network_registry_node_allowed failed");
315 None
316 },
317 }
318 }
319 ChainEventType::NodeSafeRegistered(safe_address) => {
320 info!(%safe_address, "Node safe registered");
321 None
322 }
323 }
324 }
325 })
326 .flat_map(stream::iter)
327)
328}
329
/// Pair of unbounded channel ends carrying `ApplicationData`.
///
/// `Hopr::run` hands the sending half to the transport and returns the socket to the caller.
pub struct HoprSocket {
    // Receiving half, surrendered to the caller by `reader`.
    rx: UnboundedReceiver<ApplicationData>,
    // Sending half, cloned out by `writer`.
    tx: UnboundedSender<ApplicationData>,
}
337
338impl Default for HoprSocket {
339 fn default() -> Self {
340 let (tx, rx) = unbounded::<ApplicationData>();
341 Self { rx, tx }
342 }
343}
344
345impl HoprSocket {
346 pub fn new() -> Self {
347 Self::default()
348 }
349
350 pub fn reader(self) -> UnboundedReceiver<ApplicationData> {
351 self.rx
352 }
353
354 pub fn writer(&self) -> UnboundedSender<ApplicationData> {
355 self.tx.clone()
356 }
357}
358
/// The node object tying together the transport, chain and database components.
///
/// Constructed with `Hopr::new` and started with `Hopr::run`.
pub struct Hopr {
    // Off-chain (packet) keypair of this node.
    me: OffchainKeypair,
    // On-chain keypair of this node.
    me_chain: ChainKeypair,
    // Effective configuration; `new` may adjust it (e.g. forces DB initialization on force-init).
    cfg: config::HoprLibConfig,
    // Current lifecycle state, see `HoprState`.
    state: Arc<AtomicHoprState>,
    // Transport component.
    transport_api: HoprTransport<HoprDb>,
    // Chain component (RPC, indexer, on-chain actions).
    hopr_chain_api: HoprChain<HoprDb>,
    // Database handle shared with the other components.
    db: HoprDb,
    // Chain network configuration resolved in `new` from the configured network name.
    chain_cfg: ChainNetworkConfig,
    // Channel graph shared with the transport and the chain event pipeline.
    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
    // Strategies built from `cfg.strategy`.
    multistrategy: Arc<MultiStrategy>,
    // Receiving end of the channel into which the chain component pushes significant events.
    rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
}
384
385impl Hopr {
    /// Builds a node from the configuration and the two keypairs without starting it.
    ///
    /// The constructor prepares the database directory and opens the database,
    /// resolves the chain network configuration, and wires up the transport,
    /// chain and strategy components. The returned node is in
    /// `HoprState::Uninitialized`; call `run` to start it.
    ///
    /// # Errors
    /// Fails when the host configuration cannot be converted into a multiaddress,
    /// when the DB directory cannot be removed/created, when the DB cannot be
    /// opened, when the chain network cannot be resolved, or when the chain
    /// component cannot be constructed.
    ///
    /// Note that the database is opened (and, with metrics enabled, queried) via
    /// `futures::executor::block_on`, so this function blocks the calling thread.
    pub fn new(
        mut cfg: config::HoprLibConfig,
        me: &OffchainKeypair,
        me_onchain: &ChainKeypair,
    ) -> crate::errors::Result<Self> {
        let multiaddress: Multiaddr = (&cfg.host).try_into()?;

        // The database lives in the "db" entry below the configured data directory.
        let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
        info!(path = ?db_path, "Initiating DB");

        if cfg.db.force_initialize {
            info!("Force cleaning up existing database");
            remove_dir_all(db_path.as_path()).map_err(|e| {
                HoprLibError::GeneralError(format!(
                    "Failed to remove the existing DB directory at '{db_path:?}': {e}"
                ))
            })?;
            // After wiping the directory the DB has to be created again.
            cfg.db.initialize = true
        }

        // Make sure the parent directory of the DB path exists.
        if let Some(parent_dir_path) = db_path.as_path().parent() {
            if !parent_dir_path.is_dir() {
                std::fs::create_dir_all(parent_dir_path).map_err(|e| {
                    HoprLibError::GeneralError(format!(
                        "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
                    ))
                })?
            }
        }

        let db_cfg = HoprDbConfig {
            create_if_missing: cfg.db.initialize,
            force_create: cfg.db.force_initialize,
            log_slow_queries: std::time::Duration::from_millis(150),
        };
        // Blocks the current thread until the DB has been opened.
        let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;

        if let Some(provider) = &cfg.chain.provider {
            info!(provider, "Creating chain components using the custom provider");
        } else {
            info!("Creating chain components using the default provider");
        }
        // Takes `cfg.chain.protocols` mutably; NOTE(review): it may be modified during resolution.
        let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
            &cfg.chain.network,
            crate::constants::APP_VERSION_COERCED,
            cfg.chain.provider.as_deref(),
            cfg.chain.max_rpc_requests_per_sec,
            &mut cfg.chain.protocols,
        )
        .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;

        // Only used for the log line below; the chain component receives its own copy further down.
        let contract_addresses = ContractAddresses::from(&resolved_environment);
        info!(
            myself = me_onchain.public().to_hex(),
            contract_addresses = ?contract_addresses,
            "Resolved contract addresses",
        );

        let my_multiaddresses = vec![multiaddress];

        // Channel through which the chain component reports significant events;
        // the receiving end is consumed in `run`.
        let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();

        let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
            me_onchain.public().to_address(),
            ChannelGraphConfig::default(),
        )));

        let hopr_transport_api = HoprTransport::new(
            me,
            me_onchain,
            HoprTransportConfig {
                transport: cfg.transport.clone(),
                network: cfg.network_options.clone(),
                protocol: cfg.protocol,
                probe: cfg.probe,
                session: cfg.session,
            },
            db.clone(),
            channel_graph.clone(),
            my_multiaddresses,
        );

        let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
            me_onchain.clone(),
            db.clone(),
            resolved_environment.clone(),
            cfg.safe_module.module_address,
            ContractAddresses {
                announcements: resolved_environment.announcements,
                channels: resolved_environment.channels,
                token: resolved_environment.token,
                price_oracle: resolved_environment.ticket_price_oracle,
                win_prob_oracle: resolved_environment.winning_probability_oracle,
                network_registry: resolved_environment.network_registry,
                network_registry_proxy: resolved_environment.network_registry_proxy,
                stake_factory: resolved_environment.node_stake_v2_factory,
                safe_registry: resolved_environment.node_safe_registry,
                module_implementation: resolved_environment.module_implementation,
            },
            cfg.safe_module.safe_address,
            hopr_chain_indexer::IndexerConfig {
                // Indexing starts at the block in which the channels contract was deployed.
                start_block_number: resolved_environment.channel_contract_deploy_block as u64,
                fast_sync: cfg.chain.fast_sync,
            },
            tx_indexer_events,
        )?;

        let multi_strategy = Arc::new(MultiStrategy::new(
            cfg.strategy.clone(),
            db.clone(),
            hopr_hopr_chain_api.actions_ref().clone(),
            hopr_transport_api.ticket_aggregator(),
        ));
        debug!(
            strategies = tracing::field::debug(&multi_strategy),
            "Initialized strategies"
        );

        #[cfg(all(feature = "prometheus", not(test)))]
        {
            METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
            // The gauge value is "<major>.<minor>" parsed as a float, or 0.0 if that fails.
            METRIC_HOPR_LIB_VERSION.set(
                &[const_format::formatcp!("{}", constants::APP_VERSION)],
                f64::from_str(const_format::formatcp!(
                    "{}.{}",
                    env!("CARGO_PKG_VERSION_MAJOR"),
                    env!("CARGO_PKG_VERSION_MINOR")
                ))
                .unwrap_or(0.0),
            );

            // The statistics themselves are discarded; only a failure is reported.
            if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
                error!(error = %e, "Failed to initialize ticket statistics metrics");
            }
        }

        Ok(Self {
            me: me.clone(),
            me_chain: me_onchain.clone(),
            cfg,
            state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
            transport_api: hopr_transport_api,
            hopr_chain_api: hopr_hopr_chain_api,
            db,
            chain_cfg: resolved_environment,
            channel_graph,
            multistrategy: multi_strategy,
            rx_indexer_significant_events: rx_indexer_events,
        })
    }
538
539 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
540 if self.status() == state {
541 Ok(())
542 } else {
543 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
544 }
545 }
546
    /// Returns the current lifecycle state of the node.
    ///
    /// The state is read with `Ordering::Relaxed`.
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
550
551 pub fn version_coerced(&self) -> String {
552 String::from(constants::APP_VERSION_COERCED)
553 }
554
555 pub fn version(&self) -> String {
556 String::from(constants::APP_VERSION)
557 }
558
559 pub fn network(&self) -> String {
560 self.cfg.chain.network.clone()
561 }
562
563 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
564 Ok(self.hopr_chain_api.get_balance().await?)
565 }
566
567 pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
568 Ok(self.hopr_chain_api.get_eligibility_status().await?)
569 }
570
571 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
572 let safe_balance = self
573 .hopr_chain_api
574 .get_safe_balance(self.cfg.safe_module.safe_address)
575 .await?;
576 Ok(safe_balance)
577 }
578
579 pub fn get_safe_config(&self) -> SafeModule {
580 self.cfg.safe_module.clone()
581 }
582
583 pub fn chain_config(&self) -> ChainNetworkConfig {
584 self.chain_cfg.clone()
585 }
586
    /// Returns a reference to the node configuration.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
590
591 pub fn get_provider(&self) -> String {
592 self.cfg
593 .chain
594 .provider
595 .clone()
596 .unwrap_or(self.chain_cfg.chain.default_provider.clone())
597 }
598
    /// Returns the `announce` flag of the chain configuration.
    ///
    /// `run` announces the node on chain only when this is `true`.
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.chain.announce
    }
603
    /// Starts the node and all of its background processes.
    ///
    /// The start-up sequence is, in order:
    /// 1. wait until the node is funded and validate the balance,
    /// 2. validate the configured ticket price and winning probability against the network minimums,
    /// 3. persist the safe information and replay known peers from the DB,
    /// 4. start the chain processes and wait until the node is allowed in the network registry,
    /// 5. verify the safe/module setup, register the safe and announce the node if applicable,
    /// 6. load channels and peer qualities into the channel graph,
    /// 7. start the ticket, session, transport and strategy processes.
    ///
    /// With the `session-server` feature, `serve_handler` receives every incoming session.
    ///
    /// # Returns
    /// The socket on which the transport delivers application data, and the abort handles
    /// of all started processes keyed by their identifier.
    ///
    /// # Errors
    /// Fails when the node is not in `HoprState::Uninitialized`, or when any of the
    /// validation, database, RPC or transport steps above fails.
    pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
        &self,
        #[cfg(feature = "session-server")] serve_handler: T,
    ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, AbortHandle>)> {
        self.error_if_not_in_state(
            HoprState::Uninitialized,
            "Cannot start the hopr node multiple times".into(),
        )?;

        info!(
            address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
            "Node is not started, please fund this node",
        );

        let mut processes: HashMap<HoprLibProcesses, AbortHandle> = HashMap::new();

        // NOTE(review): the exact meaning of the 200 s duration is defined by `wait_for_funds`.
        wait_for_funds(
            self.me_onchain(),
            *MIN_NATIVE_BALANCE,
            Duration::from_secs(200),
            self.hopr_chain_api.rpc(),
        )
        .await?;

        info!("Starting the node...");

        self.state.store(HoprState::Initializing, Ordering::Relaxed);

        let balance: XDaiBalance = self.get_balance().await?;
        let minimum_balance = *constants::MIN_NATIVE_BALANCE;

        info!(
            address = %self.hopr_chain_api.me_onchain(),
            %balance,
            %minimum_balance,
            "Node information"
        );

        // A balance equal to the minimum is rejected as well (`le`).
        if balance.le(&minimum_balance) {
            return Err(HoprLibError::GeneralError(
                "Cannot start the node without a sufficiently funded wallet".to_string(),
            ));
        }

        let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;

        // An unset outgoing ticket price passes the check.
        let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
        if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
                "configured outgoing ticket price is lower than the network minimum ticket price: \
                 {configured_ticket_price:?} < {network_min_ticket_price}"
            ))));
        }

        let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
        let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
        // The check is skipped when HOPR_TEST_DISABLE_CHECKS is set to "true" (case-insensitive),
        // and it passes when the probability is unset or cannot be converted.
        if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
            && configured_win_prob
                .and_then(|c| WinningProbability::try_from(c).ok())
                .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
        {
            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
                "configured outgoing ticket winning probability is lower than the network minimum winning \
                 probability: {configured_win_prob:?} < {network_min_win_prob}"
            ))));
        }

        // Persist the configured safe and module addresses in the DB.
        self.db
            .set_safe_info(
                None,
                SafeInfo {
                    safe_address: self.cfg.safe_module.safe_address,
                    module_address: self.cfg.safe_module.module_address,
                },
            )
            .await?;

        self.state.store(HoprState::Indexing, Ordering::Relaxed);

        // Peer discovery events flow through this channel into the transport (receiver passed below).
        let (mut indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();

        let indexer_event_pipeline = chain_events_to_transport_events(
            self.rx_indexer_significant_events.clone(),
            self.me_onchain(),
            self.db.clone(),
            self.multistrategy.clone(),
            self.channel_graph.clone(),
            self.hopr_chain_api.action_state(),
        )
        .await;

        {
            // Replay the accounts already stored in the DB before live events are forwarded.
            info!("Syncing peer announcements and network registry updates from previous runs");
            // NOTE(review): the meaning of the `true` flag is defined by `get_accounts`.
            let accounts = self.db.get_accounts(None, true).await?;
            for account in accounts.into_iter() {
                match account.entry_type {
                    AccountType::NotAnnounced => {}
                    AccountType::Announced { multiaddr, .. } => {
                        indexer_peer_update_tx
                            .send(PeerDiscovery::Announce(account.public_key.into(), vec![multiaddr]))
                            .await
                            .map_err(|e| {
                                HoprLibError::GeneralError(format!("Failed to send peer discovery announcement: {e}"))
                            })?;

                        // Unlike in the live event pipeline, a failed registry lookup aborts the start here.
                        let allow_status = if self
                            .db
                            .is_allowed_in_network_registry(None, &account.chain_addr)
                            .await?
                        {
                            PeerDiscovery::Allow(account.public_key.into())
                        } else {
                            PeerDiscovery::Ban(account.public_key.into())
                        };

                        indexer_peer_update_tx.send(allow_status).await.map_err(|e| {
                            HoprLibError::GeneralError(format!(
                                "Failed to send peer discovery network registry event: {e}"
                            ))
                        })?;
                    }
                }
            }
        }

        // Forward live discovery events into the same channel. The task is not tracked in
        // `processes`, and it panics (inside the task) if forwarding fails.
        spawn(async move {
            indexer_event_pipeline
                .map(Ok)
                .forward(indexer_peer_update_tx)
                .await
                .expect("The index to transport event chain failed");
        });

        info!("Start the chain process and sync the indexer");
        for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
            let nid = match id {
                HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
                HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
            };
            processes.insert(nid, proc);
        }

        {
            let my_ethereum_address = self.me_onchain();
            let my_peer_id = self.me_peer_id();
            let my_version = crate::constants::APP_VERSION;

            // Poll until this node is allowed in the network registry; a failed lookup
            // counts as "not allowed", so the loop keeps waiting.
            while !self
                .db
                .clone()
                .is_allowed_in_network_registry(None, &my_ethereum_address)
                .await
                .unwrap_or(false)
            {
                info!(
                    "Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.",
                    my_ethereum_address.to_hex()
                );

                sleep(ONBOARDING_INFORMATION_INTERVAL).await;

                info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
                info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
            }
        }

        // Verify the safe/module configuration of this node via RPC.
        let safe_module_configuration = self
            .hopr_chain_api
            .rpc()
            .check_node_safe_module_status(self.me_onchain())
            .await
            .map_err(HoprChainError::Rpc)?;

        if !safe_module_configuration.should_pass() {
            error!(
                ?safe_module_configuration,
                "Something is wrong with the safe module configuration",
            );
            return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
                "Safe and module are not configured correctly {:?}",
                safe_module_configuration,
            ))));
        }

        if can_register_with_safe(
            self.me_onchain(),
            self.cfg.safe_module.safe_address,
            self.hopr_chain_api.rpc(),
        )
        .await?
        {
            info!("Registering safe by node");

            // The node's own address must not be used as the safe address.
            if self.me_onchain() == self.cfg.safe_module.safe_address {
                return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
            }

            // The first await submits the action, the second waits for its outcome.
            // A failed outcome is only logged and does not abort the start.
            if let Err(e) = self
                .hopr_chain_api
                .actions_ref()
                .register_safe_by_node(self.cfg.safe_module.safe_address)
                .await?
                .await
            {
                error!(error = %e, "Failed to register node with safe")
            }
        }

        if self.is_public() {
            let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();

            // Announcement failures are logged only; the start continues regardless.
            match self
                .hopr_chain_api
                .actions_ref()
                .announce(&multiaddresses_to_announce, &self.me)
                .await
            {
                Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
                Err(ChainActionsError::AlreadyAnnounced) => {
                    info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
                }
                Err(e) => error!(error = %e, "Failed to transmit node announcement"),
            }
        }

        {
            let channel_graph = self.channel_graph.clone();
            // The write lock is held for the whole block, i.e. for the entire graph sync.
            let mut cg = channel_graph.write_arc().await;

            info!("Syncing channels from the previous runs");
            let mut channel_stream = self
                .db
                .stream_active_channels()
                .await
                .map_err(hopr_db_sql::api::errors::DbError::from)?;

            while let Some(maybe_channel) = channel_stream.next().await {
                match maybe_channel {
                    Ok(channel) => {
                        cg.update_channel(channel);
                    }
                    Err(error) => error!(%error, "Failed to sync channel into the graph"),
                }
            }

            info!("Syncing peer qualities from the previous runs");
            // Falls back to the default, with a warning, when the variable is unset or not a number.
            let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
                .map_err(|e| e.to_string())
                .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
                .unwrap_or_else(|error| {
                    warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
                    constants::DEFAULT_MIN_QUALITY_TO_SYNC
                });

            let mut peer_stream = self
                .db
                .get_network_peers(Default::default(), false)
                .await?
                .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));

            while let Some(peer) = peer_stream.next().await {
                if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
                    cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
                } else {
                    error!(peer = %peer.id.1, "Could not translate peer information");
                }
            }

            info!(
                channels = cg.count_channels(),
                nodes = cg.count_nodes(),
                "Channel graph sync complete"
            );
        }

        // The transport writes into the socket; the socket itself is returned to the caller.
        let socket = HoprSocket::new();
        let transport_output_tx = socket.writer();

        let multi_strategy_ack_ticket = self.multistrategy.clone();
        let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
        self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;

        // Hand every ticket received on this channel to the strategies.
        processes.insert(
            HoprLibProcesses::OnReceivedAcknowledgement,
            hopr_async_runtime::spawn_as_abortable(async move {
                while let Some(ack) = on_ack_tkt_rx.next().await {
                    if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
                        &*multi_strategy_ack_ticket,
                        &ack,
                    )
                    .await
                    {
                        error!(%error, "Failed to process acknowledged winning ticket with the strategy");
                    }
                }
            }),
        );

        // The receiver is only consumed with the `session-server` feature.
        let (session_tx, _session_rx) = unbounded::<IncomingSession>();

        #[cfg(feature = "session-server")]
        {
            // Sessions are processed concurrently without a limit (`None`).
            processes.insert(
                HoprLibProcesses::SessionServer,
                hopr_async_runtime::spawn_as_abortable(_session_rx.for_each_concurrent(None, move |session| {
                    let serve_handler = serve_handler.clone();
                    async move {
                        let session_id = *session.session.id();
                        match serve_handler.process(session).await {
                            Ok(_) => debug!(
                                session_id = ?session_id,
                                "Client session processed successfully"
                            ),
                            Err(e) => error!(
                                session_id = ?session_id,
                                error = %e,
                                "Client session processing failed"
                            ),
                        }
                    }
                })),
            );
        }

        // Start the transport; its processes are tracked under `HoprLibProcesses::Transport`.
        for (id, proc) in self
            .transport_api
            .run(
                &self.me_chain,
                join(&[&self.cfg.db.data, "tbf"])
                    .map_err(|e| HoprLibError::GeneralError(format!("Failed to construct the bloom filter: {e}")))?,
                transport_output_tx,
                indexer_peer_update_rx,
                session_tx,
            )
            .await?
            .into_iter()
        {
            processes.insert(id.into(), proc);
        }

        // Persist the outgoing ticket indices every 5 seconds.
        let db_clone = self.db.clone();
        processes.insert(
            HoprLibProcesses::TicketIndexFlush,
            hopr_async_runtime::spawn_as_abortable(Box::pin(execute_on_tick(
                Duration::from_secs(5),
                move || {
                    let db_clone = db_clone.clone();
                    async move {
                        match db_clone.persist_outgoing_ticket_indices().await {
                            Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
                            Err(e) => error!(error = %e, "Failed to flush ticket indices"),
                        }
                    }
                },
                "flush the states of outgoing ticket indices".into(),
            ))),
        );

        // Best effort: a failure is logged and the start continues.
        if let Err(e) = self.db.fix_channels_next_ticket_state().await {
            error!(error = %e, "failed to fix channels ticket states");
        }

        // Run the strategies periodically; the configured interval is interpreted as seconds.
        let multi_strategy = self.multistrategy.clone();
        let strategy_interval = self.cfg.strategy.execution_interval;
        processes.insert(
            HoprLibProcesses::StrategyTick,
            hopr_async_runtime::spawn_as_abortable(async move {
                execute_on_tick(
                    Duration::from_secs(strategy_interval),
                    move || {
                        let multi_strategy = multi_strategy.clone();

                        async move {
                            trace!(state = "started", "strategy tick");
                            // The result of the tick is intentionally ignored.
                            let _ = multi_strategy.on_tick().await;
                            trace!(state = "finished", "strategy tick");
                        }
                    },
                    "run strategies".into(),
                )
                .await;
            }),
        );

        self.state.store(HoprState::Running, Ordering::Relaxed);

        info!(
            id = %self.me_peer_id(),
            version = constants::APP_VERSION,
            "NODE STARTED AND RUNNING"
        );

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_HOPR_NODE_INFO.set(
            &[
                &self.me.public().to_peerid_str(),
                &self.me_onchain().to_string(),
                &self.cfg.safe_module.safe_address.to_string(),
                &self.cfg.safe_module.module_address.to_string(),
            ],
            1.0,
        );

        Ok((socket, processes))
    }
1048
1049 pub fn me_peer_id(&self) -> PeerId {
1052 (*self.me.public()).into()
1053 }
1054
1055 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1057 Ok(self.transport_api.get_public_nodes().await?)
1058 }
1059
1060 pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1062 let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1063
1064 match self.db.get_last_checksummed_log().await? {
1065 Some(log) => {
1066 let checksum = match log.checksum {
1067 Some(checksum) => Hash::from_hex(checksum.as_str())?,
1068 None => Hash::default(),
1069 };
1070 Ok(IndexerStateInfo {
1071 latest_log_block_number: log.block_number as u32,
1072 latest_log_checksum: checksum,
1073 ..indexer_state_info
1074 })
1075 }
1076 None => Ok(indexer_state_info),
1077 }
1078 }
1079
1080 pub async fn is_allowed_to_access_network(
1082 &self,
1083 address_like: either::Either<&PeerId, Address>,
1084 ) -> errors::Result<bool> {
1085 Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1086 }
1087
1088 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1092 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1093
1094 Ok(self.transport_api.ping(peer).await?)
1095 }
1096
    /// Opens a new session to `destination` for the given `target`, retrying
    /// on failure.
    ///
    /// The attempt is `transport_api.new_session(destination, target, cfg)`.
    /// It is retried with a constant (jittered) delay of
    /// `cfg.session.establish_retry_timeout`, at most
    /// `cfg.session.establish_max_retries` times, both taken from the node
    /// configuration.
    ///
    /// # Errors
    /// Fails when the `HoprState::Running` state check does not pass, or with
    /// the error surfaced by the retry wrapper once it gives up.
    #[cfg(feature = "session-client")]
    pub async fn connect_to(
        &self,
        destination: Address,
        target: SessionTarget,
        cfg: SessionClientConfig,
    ) -> errors::Result<HoprSession> {
        self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

        // Constant back-off policy driven by the session section of the node configuration.
        let backoff = backon::ConstantBuilder::default()
            .with_max_times(self.cfg.session.establish_max_retries as usize)
            .with_delay(self.cfg.session.establish_retry_timeout)
            .with_jitter();

        // Sleeper implementation for `backon` backed by `futures_timer::Delay`.
        struct Sleeper;
        impl backon::Sleeper for Sleeper {
            type Sleep = futures_timer::Delay;

            fn sleep(&self, dur: Duration) -> Self::Sleep {
                futures_timer::Delay::new(dur)
            }
        }

        use backon::Retryable;

        // Each attempt gets its own clones of `cfg` and `target`, because
        // `new_session` takes them by value.
        Ok((|| {
            let cfg = cfg.clone();
            let target = target.clone();
            async { self.transport_api.new_session(destination, target, cfg).await }
        })
        .retry(backoff)
        .sleep(Sleeper)
        .await?)
    }
1133
1134 #[cfg(feature = "session-client")]
1137 pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
1138 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1139 Ok(self.transport_api.probe_session(id).await?)
1140 }
1141
1142 #[tracing::instrument(level = "debug", skip(self, msg))]
1150 pub async fn send_message(
1151 &self,
1152 msg: Box<[u8]>,
1153 routing: DestinationRouting,
1154 application_tag: Tag,
1155 ) -> errors::Result<()> {
1156 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1157
1158 self.transport_api.send_message(msg, routing, application_tag).await?;
1159
1160 Ok(())
1161 }
1162
1163 pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1165 Ok(self.transport_api.aggregate_tickets(channel).await?)
1166 }
1167
    /// Returns the multiaddresses reported by
    /// `transport_api.local_multiaddresses()`, unchanged.
    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.local_multiaddresses()
    }
1172
    /// Returns the multiaddresses reported by
    /// `transport_api.listening_multiaddresses()`, unchanged.
    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.transport_api.listening_multiaddresses().await
    }
1177
    /// Returns the multiaddresses that
    /// `transport_api.network_observed_multiaddresses` reports for `peer`.
    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
        self.transport_api.network_observed_multiaddresses(peer).await
    }
1182
1183 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1185 let key = match OffchainPublicKey::try_from(peer) {
1186 Ok(k) => k,
1187 Err(e) => {
1188 error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1189 return vec![];
1190 }
1191 };
1192
1193 match self.db.get_account(None, key).await {
1194 Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1195 Ok(None) => {
1196 error!(%peer, "no information");
1197 vec![]
1198 }
1199 Err(e) => {
1200 error!(%peer, error = %e, "failed to retrieve information");
1201 vec![]
1202 }
1203 }
1204 }
1205
    /// Returns the `Health` value reported by `transport_api.network_health()`.
    pub async fn network_health(&self) -> Health {
        self.transport_api.network_health().await
    }
1212
1213 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1215 Ok(self.transport_api.network_connected_peers().await?)
1216 }
1217
1218 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1220 Ok(self.transport_api.network_peer_info(peer).await?)
1221 }
1222
1223 pub async fn all_network_peers(
1225 &self,
1226 minimum_quality: f64,
1227 ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1228 Ok(
1229 futures::stream::iter(self.transport_api.network_connected_peers().await?)
1230 .filter_map(|peer| async move {
1231 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1232 if info.get_average_quality() >= minimum_quality {
1233 Some((peer, info))
1234 } else {
1235 None
1236 }
1237 } else {
1238 None
1239 }
1240 })
1241 .filter_map(|(peer_id, info)| async move {
1242 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1243 Some((address, peer_id, info))
1244 })
1245 .collect::<Vec<_>>()
1246 .await,
1247 )
1248 }
1249
1250 pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1253 Ok(self.transport_api.tickets_in_channel(channel).await?)
1254 }
1255
1256 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1258 Ok(self.transport_api.all_tickets().await?)
1259 }
1260
1261 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1263 Ok(self.transport_api.ticket_statistics().await?)
1264 }
1265
1266 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1268 Ok(self.db.reset_ticket_statistics().await?)
1269 }
1270
    /// Exposes the node database through its `HoprDbResolverOperations`
    /// interface only.
    pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
        &self.db
    }
1275
    /// Returns this node's on-chain address as reported by
    /// `hopr_chain_api.me_onchain()`.
    pub fn me_onchain(&self) -> Address {
        self.hopr_chain_api.me_onchain()
    }
1280
1281 pub async fn get_ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
1283 Ok(self.hopr_chain_api.ticket_price().await?)
1284 }
1285
1286 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1288 Ok(self
1289 .db
1290 .get_indexer_data(None)
1291 .await?
1292 .minimum_incoming_ticket_winning_prob)
1293 }
1294
1295 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1297 Ok(self.hopr_chain_api.accounts_announced_on_chain().await?)
1298 }
1299
1300 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1303 Ok(self.db.get_channel_by_id(None, channel_id).await?)
1304 }
1305
1306 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1311 Ok(self.hopr_chain_api.channel(src, dest).await?)
1312 }
1313
1314 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1316 Ok(self.hopr_chain_api.channels_from(src).await?)
1317 }
1318
1319 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1321 Ok(self.hopr_chain_api.channels_to(dest).await?)
1322 }
1323
1324 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1326 Ok(self.hopr_chain_api.all_channels().await?)
1327 }
1328
1329 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1331 Ok(self.hopr_chain_api.safe_allowance().await?)
1332 }
1333
1334 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<Hash> {
1338 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1339
1340 Ok(self
1341 .hopr_chain_api
1342 .actions_ref()
1343 .withdraw(recipient, amount)
1344 .await?
1345 .await?
1346 .tx_hash)
1347 }
1348
1349 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<Hash> {
1353 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1354
1355 Ok(self
1356 .hopr_chain_api
1357 .actions_ref()
1358 .withdraw_native(recipient, amount)
1359 .await?
1360 .await?
1361 .tx_hash)
1362 }
1363
1364 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1365 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1366
1367 let awaiter = self
1368 .hopr_chain_api
1369 .actions_ref()
1370 .open_channel(*destination, amount)
1371 .await?;
1372
1373 let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1374 Ok(awaiter.await.map(|confirm| OpenChannelResult {
1375 tx_hash: confirm.tx_hash,
1376 channel_id,
1377 })?)
1378 }
1379
1380 pub async fn fund_channel(&self, channel_id: &Hash, amount: HoprBalance) -> errors::Result<Hash> {
1381 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1382
1383 Ok(self
1384 .hopr_chain_api
1385 .actions_ref()
1386 .fund_channel(*channel_id, amount)
1387 .await?
1388 .await
1389 .map(|confirm| confirm.tx_hash)?)
1390 }
1391
1392 pub async fn close_channel(
1393 &self,
1394 counterparty: &Address,
1395 direction: ChannelDirection,
1396 redeem_before_close: bool,
1397 ) -> errors::Result<CloseChannelResult> {
1398 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1399
1400 let confirmation = self
1401 .hopr_chain_api
1402 .actions_ref()
1403 .close_channel(*counterparty, direction, redeem_before_close)
1404 .await?
1405 .await?;
1406
1407 match confirmation
1408 .event
1409 .expect("channel close action confirmation must have associated chain event")
1410 {
1411 ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1412 tx_hash: confirmation.tx_hash,
1413 status: c.status, }),
1415 ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1416 tx_hash: confirmation.tx_hash,
1417 status: ChannelStatus::Closed,
1418 }),
1419 _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1420 }
1421 }
1422
1423 pub async fn close_channel_by_id(
1424 &self,
1425 channel_id: Hash,
1426 redeem_before_close: bool,
1427 ) -> errors::Result<CloseChannelResult> {
1428 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1429
1430 match self.channel_from_hash(&channel_id).await? {
1431 Some(channel) => match channel.orientation(&self.me_onchain()) {
1432 Some((direction, counterparty)) => {
1433 self.close_channel(&counterparty, direction, redeem_before_close).await
1434 }
1435 None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1436 "cannot close channel that is not own".into(),
1437 ))),
1438 },
1439 None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1440 }
1441 }
1442
1443 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1444 Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1445 }
1446
1447 pub async fn redeem_all_tickets(&self, only_aggregated: bool) -> errors::Result<()> {
1448 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1449
1450 self.hopr_chain_api
1452 .actions_ref()
1453 .redeem_all_tickets(only_aggregated)
1454 .await?;
1455
1456 Ok(())
1457 }
1458
1459 pub async fn redeem_tickets_with_counterparty(
1460 &self,
1461 counterparty: &Address,
1462 only_aggregated: bool,
1463 ) -> errors::Result<()> {
1464 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1465
1466 let _ = self
1468 .hopr_chain_api
1469 .actions_ref()
1470 .redeem_tickets_with_counterparty(counterparty, only_aggregated)
1471 .await?;
1472
1473 Ok(())
1474 }
1475
1476 pub async fn redeem_tickets_in_channel(&self, channel_id: &Hash, only_aggregated: bool) -> errors::Result<usize> {
1477 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1478
1479 let channel = self.db.get_channel_by_id(None, channel_id).await?;
1480 let mut redeem_count = 0;
1481
1482 if let Some(channel) = channel {
1483 if channel.destination == self.hopr_chain_api.me_onchain() {
1484 redeem_count = self
1486 .hopr_chain_api
1487 .actions_ref()
1488 .redeem_tickets_in_channel(&channel, only_aggregated)
1489 .await?
1490 .len();
1491 }
1492 }
1493
1494 Ok(redeem_count)
1495 }
1496
1497 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1498 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1499
1500 #[allow(clippy::let_underscore_future)]
1502 let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1503
1504 Ok(())
1505 }
1506
1507 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1508 let pk = hopr_transport::OffchainPublicKey::try_from(peer_id)?;
1509 Ok(self.db.resolve_chain_key(&pk).await?)
1510 }
1511
1512 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1513 Ok(self
1514 .db
1515 .resolve_packet_key(address)
1516 .await
1517 .map(|pk| pk.map(|v| v.into()))?)
1518 }
1519
1520 pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1521 self.channel_graph.read_arc().await.as_dot(cfg)
1522 }
1523
1524 pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1525 let cg = self.channel_graph.read_arc().await;
1526 serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1527 }
1528}