1pub mod config;
18pub mod constants;
20pub mod errors;
22
23use std::{
24 collections::HashMap,
25 fmt::{Display, Formatter},
26 ops::Deref,
27 path::PathBuf,
28 str::FromStr,
29 sync::{Arc, atomic::Ordering},
30 time::Duration,
31};
32
33use async_lock::RwLock;
34use errors::{HoprLibError, HoprStatusError};
35use futures::{
36 SinkExt, Stream, StreamExt,
37 channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded},
38 future::AbortHandle,
39 stream::{self},
40};
41use hopr_async_runtime::prelude::{sleep, spawn};
42pub use hopr_chain_actions::errors::ChainActionsError;
43use hopr_chain_actions::{
44 action_state::{ActionState, IndexerActionTracker},
45 channels::ChannelActions,
46 node::NodeActions,
47 redeem::TicketRedeemActions,
48};
49pub use hopr_chain_api::config::{
50 Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
51};
52use hopr_chain_api::{
53 HoprChain, HoprChainProcess, SignificantChainEvent, can_register_with_safe, config::ChainNetworkConfig,
54 errors::HoprChainError, wait_for_funds,
55};
56use hopr_chain_rpc::HoprRpcOperations;
57use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
58use hopr_crypto_types::prelude::OffchainPublicKey;
59use hopr_db_api::logs::HoprDbLogOperations;
60use hopr_db_sql::{
61 HoprDbAllOperations,
62 accounts::HoprDbAccountOperations,
63 api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
64 channels::HoprDbChannelOperations,
65 db::{HoprDb, HoprDbConfig},
66 info::{HoprDbInfoOperations, IndexerStateInfo},
67 prelude::{ChainOrPacketKey::ChainKey, HoprDbPeersOperations},
68 registry::HoprDbRegistryOperations,
69};
70pub use hopr_internal_types::prelude::*;
71pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
72pub use hopr_path::channel_graph::GraphExportConfig;
73use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
74use hopr_platform::file::native::{join, remove_dir_all};
75pub use hopr_primitive_types::prelude::*;
76pub use hopr_strategy::Strategy;
77use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
78#[cfg(feature = "runtime-tokio")]
79pub use hopr_transport::transfer_session;
80pub use hopr_transport::{
81 ApplicationData, HalfKeyChallenge, Health, IncomingSession as HoprIncomingSession, Keypair, Multiaddr,
82 OffchainKeypair as HoprOffchainKeypair, PeerId, PingQueryReplier, ProbeError, SESSION_MTU, SURB_SIZE, ServiceId,
83 Session as HoprSession, SessionCapabilities, SessionCapability, SessionClientConfig, SessionId as HoprSessionId,
84 SessionManagerError, SessionTarget, SurbBalancerConfig, Tag, TicketStatistics, TransportSessionError,
85 config::{HostConfig, HostType, looks_like_domain},
86 errors::{HoprTransportError, NetworkingError, ProtocolError},
87};
88use hopr_transport::{
89 ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession, OffchainKeypair,
90 PeerDiscovery, PeerStatus, execute_on_tick,
91};
92use tracing::{debug, error, info, trace, warn};
93#[cfg(all(feature = "prometheus", not(test)))]
94use {
95 hopr_metrics::metrics::{MultiGauge, SimpleGauge},
96 hopr_platform::time::native::current_time,
97};
98
99use crate::{
100 config::SafeModule,
101 constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE},
102};
103
104#[cfg(all(feature = "prometheus", not(test)))]
105lazy_static::lazy_static! {
106 static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
107 "hopr_start_time",
108 "The unix timestamp in seconds at which the process was started"
109 ).unwrap();
110 static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
111 "hopr_lib_version",
112 "Executed version of hopr-lib",
113 &["version"]
114 ).unwrap();
115 static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
116 "hopr_node_addresses",
117 "Node on-chain and off-chain addresses",
118 &["peerid", "address", "safe_address", "module_address"]
119 ).unwrap();
120}
121
122pub use async_trait::async_trait;
123
124#[cfg(feature = "session-server")]
127#[async_trait::async_trait]
128pub trait HoprSessionReactor {
129 async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
131}
132
133#[atomic_enum::atomic_enum]
135#[derive(PartialEq, Eq)]
136pub enum HoprState {
137 Uninitialized = 0,
138 Initializing = 1,
139 Indexing = 2,
140 Starting = 3,
141 Running = 4,
142}
143
144impl Display for HoprState {
145 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146 write!(f, "{self:?}")
147 }
148}
149
150pub struct OpenChannelResult {
151 pub tx_hash: Hash,
152 pub channel_id: Hash,
153}
154
155pub struct CloseChannelResult {
156 pub tx_hash: Hash,
157 pub status: ChannelStatus,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
165pub enum HoprLibProcesses {
166 #[strum(to_string = "transport: {0}")]
167 Transport(HoprTransportProcess),
168 #[cfg(feature = "session-server")]
169 #[strum(to_string = "session server providing the exit node session stream functionality")]
170 SessionServer,
171 #[strum(to_string = "tick wake up the strategies to perform an action")]
172 StrategyTick,
173 #[strum(to_string = "initial indexing operation into the DB")]
174 Indexing,
175 #[strum(to_string = "processing of indexed operations in internal components")]
176 IndexReflection,
177 #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
178 OutgoingOnchainActionQueue,
179 #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
180 TicketIndexFlush,
181 #[strum(to_string = "on received ack ticket trigger")]
182 OnReceivedAcknowledgement,
183}
184
185impl HoprLibProcesses {
186 pub fn can_finish(&self) -> bool {
189 matches!(self, HoprLibProcesses::Indexing)
190 }
191}
192
193impl From<HoprTransportProcess> for HoprLibProcesses {
194 fn from(value: HoprTransportProcess) -> Self {
195 HoprLibProcesses::Transport(value)
196 }
197}
198
199#[allow(clippy::too_many_arguments)]
208pub async fn chain_events_to_transport_events<StreamIn, Db>(
209 event_stream: StreamIn,
210 me_onchain: Address,
211 db: Db,
212 multi_strategy: Arc<MultiStrategy>,
213 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
214 indexer_action_tracker: Arc<IndexerActionTracker>,
215) -> impl Stream<Item = PeerDiscovery> + Send + 'static
216where
217 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
218 StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
219{
220 Box::pin(event_stream.filter_map(move |event| {
221 let db = db.clone();
222 let multi_strategy = multi_strategy.clone();
223 let channel_graph = channel_graph.clone();
224 let indexer_action_tracker = indexer_action_tracker.clone();
225
226 async move {
227 let resolved = indexer_action_tracker.match_and_resolve(&event).await;
228 if resolved.is_empty() {
229 trace!(%event, "No indexer expectations resolved for the event");
230 } else {
231 debug!(count = resolved.len(), %event, "resolved indexer expectations");
232 }
233
234 match event.event_type {
235 ChainEventType::Announcement{peer, address, multiaddresses} => {
236 let allowed = db
237 .is_allowed_in_network_registry(None, &address)
238 .await
239 .unwrap_or(false);
240
241 Some(vec![PeerDiscovery::Announce(peer, multiaddresses), if allowed {
242 PeerDiscovery::Allow(peer)
243 } else {
244 PeerDiscovery::Ban(peer)
245 }])
246 }
247 ChainEventType::ChannelOpened(channel) |
248 ChainEventType::ChannelClosureInitiated(channel) |
249 ChainEventType::ChannelClosed(channel) |
250 ChainEventType::ChannelBalanceIncreased(channel, _) | ChainEventType::ChannelBalanceDecreased(channel, _) | ChainEventType::TicketRedeemed(channel, _) => { let maybe_direction = channel.direction(&me_onchain);
254
255 let change = channel_graph
256 .write_arc()
257 .await
258 .update_channel(channel);
259
260 if let Some(own_channel_direction) = maybe_direction {
262 if let Some(change_set) = change {
263 for channel_change in change_set {
264 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
265 &*multi_strategy,
266 &channel,
267 own_channel_direction,
268 channel_change,
269 )
270 .await;
271 }
272 } else if channel.status == ChannelStatus::Open {
273 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
275 &*multi_strategy,
276 &channel,
277 own_channel_direction,
278 ChannelChange::Status {
279 left: ChannelStatus::Closed,
280 right: ChannelStatus::Open,
281 },
282 )
283 .await;
284 }
285 }
286
287 None
288 }
289 ChainEventType::NetworkRegistryUpdate(address, allowed) => {
290 let packet_key = db.translate_key(None, address).await;
291 match packet_key {
292 Ok(pk) => {
293 if let Some(pk) = pk {
294 let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
295
296 if let Ok(offchain_key) = offchain_key {
297 let peer_id = offchain_key.into();
298
299 let res = match allowed {
300 hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
301 hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
302 };
303
304 Some(vec![res])
305 } else {
306 error!("Failed to unwrap as offchain key at this point");
307 None
308 }
309 } else {
310 None
311 }
312 }
313 Err(error) => {
314 error!(%error, "on_network_registry_node_allowed failed");
315 None
316 },
317 }
318 }
319 ChainEventType::NodeSafeRegistered(safe_address) => {
320 info!(%safe_address, "Node safe registered");
321 None
322 }
323 }
324 }
325 })
326 .flat_map(stream::iter)
327)
328}
329
330pub struct HoprSocket {
334 rx: UnboundedReceiver<ApplicationData>,
335 tx: UnboundedSender<ApplicationData>,
336}
337
338impl Default for HoprSocket {
339 fn default() -> Self {
340 let (tx, rx) = unbounded::<ApplicationData>();
341 Self { rx, tx }
342 }
343}
344
345impl HoprSocket {
346 pub fn new() -> Self {
347 Self::default()
348 }
349
350 pub fn reader(self) -> UnboundedReceiver<ApplicationData> {
351 self.rx
352 }
353
354 pub fn writer(&self) -> UnboundedSender<ApplicationData> {
355 self.tx.clone()
356 }
357}
358
359pub struct Hopr {
371 me: OffchainKeypair,
372 me_chain: ChainKeypair,
373 cfg: config::HoprLibConfig,
374 state: Arc<AtomicHoprState>,
375 transport_api: HoprTransport<HoprDb>,
376 hopr_chain_api: HoprChain<HoprDb>,
377 db: HoprDb,
379 chain_cfg: ChainNetworkConfig,
380 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
381 multistrategy: Arc<MultiStrategy>,
382 rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
383}
384
385impl Hopr {
386 pub fn new(
387 mut cfg: config::HoprLibConfig,
388 me: &OffchainKeypair,
389 me_onchain: &ChainKeypair,
390 ) -> crate::errors::Result<Self> {
391 let multiaddress: Multiaddr = (&cfg.host).try_into()?;
392
393 let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
394 info!(path = ?db_path, "Initiating DB");
395
396 if cfg.db.force_initialize {
397 info!("Force cleaning up existing database");
398 remove_dir_all(db_path.as_path()).map_err(|e| {
399 HoprLibError::GeneralError(format!(
400 "Failed to remove the existing DB directory at '{db_path:?}': {e}"
401 ))
402 })?;
403 cfg.db.initialize = true
404 }
405
406 if let Some(parent_dir_path) = db_path.as_path().parent() {
408 if !parent_dir_path.is_dir() {
409 std::fs::create_dir_all(parent_dir_path).map_err(|e| {
410 HoprLibError::GeneralError(format!(
411 "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
412 ))
413 })?
414 }
415 }
416
417 let db_cfg = HoprDbConfig {
418 create_if_missing: cfg.db.initialize,
419 force_create: cfg.db.force_initialize,
420 log_slow_queries: std::time::Duration::from_millis(150),
421 surb_ring_buffer_size: std::env::var("HOPR_PROTOCOL_SURB_RB_SIZE")
422 .ok()
423 .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
424 .unwrap_or_else(|| HoprDbConfig::default().surb_ring_buffer_size),
425 surb_distress_threshold: std::env::var("HOPR_PROTOCOL_SURB_RB_DISTRESS")
426 .ok()
427 .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
428 .unwrap_or_else(|| HoprDbConfig::default().surb_distress_threshold),
429 };
430 let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
431
432 if let Some(provider) = &cfg.chain.provider {
433 info!(provider, "Creating chain components using the custom provider");
434 } else {
435 info!("Creating chain components using the default provider");
436 }
437 let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
438 &cfg.chain.network,
439 crate::constants::APP_VERSION_COERCED,
440 cfg.chain.provider.as_deref(),
441 cfg.chain.max_rpc_requests_per_sec,
442 &mut cfg.chain.protocols,
443 )
444 .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
445
446 let contract_addresses = ContractAddresses::from(&resolved_environment);
447 info!(
448 myself = me_onchain.public().to_hex(),
449 contract_addresses = ?contract_addresses,
450 "Resolved contract addresses",
451 );
452
453 let my_multiaddresses = vec![multiaddress];
454
455 let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
456
457 let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
458 me_onchain.public().to_address(),
459 ChannelGraphConfig::default(),
460 )));
461
462 let hopr_transport_api = HoprTransport::new(
463 me,
464 me_onchain,
465 HoprTransportConfig {
466 transport: cfg.transport.clone(),
467 network: cfg.network_options.clone(),
468 protocol: cfg.protocol,
469 probe: cfg.probe,
470 session: cfg.session,
471 },
472 db.clone(),
473 channel_graph.clone(),
474 my_multiaddresses,
475 );
476
477 let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
478 me_onchain.clone(),
479 db.clone(),
480 resolved_environment.clone(),
481 cfg.safe_module.module_address,
482 ContractAddresses {
483 announcements: resolved_environment.announcements,
484 channels: resolved_environment.channels,
485 token: resolved_environment.token,
486 price_oracle: resolved_environment.ticket_price_oracle,
487 win_prob_oracle: resolved_environment.winning_probability_oracle,
488 network_registry: resolved_environment.network_registry,
489 network_registry_proxy: resolved_environment.network_registry_proxy,
490 stake_factory: resolved_environment.node_stake_v2_factory,
491 safe_registry: resolved_environment.node_safe_registry,
492 module_implementation: resolved_environment.module_implementation,
493 },
494 cfg.safe_module.safe_address,
495 hopr_chain_indexer::IndexerConfig {
496 start_block_number: resolved_environment.channel_contract_deploy_block as u64,
497 fast_sync: cfg.chain.fast_sync,
498 enable_logs_snapshot: cfg.chain.enable_logs_snapshot,
499 logs_snapshot_url: cfg.chain.logs_snapshot_url.clone(),
500 data_directory: cfg.db.data.clone(),
501 },
502 tx_indexer_events,
503 )?;
504
505 let multi_strategy = Arc::new(MultiStrategy::new(
506 cfg.strategy.clone(),
507 db.clone(),
508 hopr_hopr_chain_api.actions_ref().clone(),
509 hopr_transport_api.ticket_aggregator(),
510 ));
511 debug!(
512 strategies = tracing::field::debug(&multi_strategy),
513 "Initialized strategies"
514 );
515
516 #[cfg(all(feature = "prometheus", not(test)))]
517 {
518 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
519 METRIC_HOPR_LIB_VERSION.set(
520 &[const_format::formatcp!("{}", constants::APP_VERSION)],
521 f64::from_str(const_format::formatcp!(
522 "{}.{}",
523 env!("CARGO_PKG_VERSION_MAJOR"),
524 env!("CARGO_PKG_VERSION_MINOR")
525 ))
526 .unwrap_or(0.0),
527 );
528
529 if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
531 error!(error = %e, "Failed to initialize ticket statistics metrics");
532 }
533 }
534
535 Ok(Self {
536 me: me.clone(),
537 me_chain: me_onchain.clone(),
538 cfg,
539 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
540 transport_api: hopr_transport_api,
541 hopr_chain_api: hopr_hopr_chain_api,
542 db,
543 chain_cfg: resolved_environment,
544 channel_graph,
545 multistrategy: multi_strategy,
546 rx_indexer_significant_events: rx_indexer_events,
547 })
548 }
549
550 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
551 if self.status() == state {
552 Ok(())
553 } else {
554 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
555 }
556 }
557
558 pub fn status(&self) -> HoprState {
559 self.state.load(Ordering::Relaxed)
560 }
561
562 pub fn version_coerced(&self) -> String {
563 String::from(constants::APP_VERSION_COERCED)
564 }
565
566 pub fn version(&self) -> String {
567 String::from(constants::APP_VERSION)
568 }
569
570 pub fn network(&self) -> String {
571 self.cfg.chain.network.clone()
572 }
573
574 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
575 Ok(self.hopr_chain_api.get_balance().await?)
576 }
577
578 pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
579 Ok(self.hopr_chain_api.get_eligibility_status().await?)
580 }
581
582 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
583 let safe_balance = self
584 .hopr_chain_api
585 .get_safe_balance(self.cfg.safe_module.safe_address)
586 .await?;
587 Ok(safe_balance)
588 }
589
590 pub fn get_safe_config(&self) -> SafeModule {
591 self.cfg.safe_module.clone()
592 }
593
594 pub fn chain_config(&self) -> ChainNetworkConfig {
595 self.chain_cfg.clone()
596 }
597
598 pub fn config(&self) -> &config::HoprLibConfig {
599 &self.cfg
600 }
601
602 pub fn get_provider(&self) -> String {
603 self.cfg
604 .chain
605 .provider
606 .clone()
607 .unwrap_or(self.chain_cfg.chain.default_provider.clone())
608 }
609
610 #[inline]
611 fn is_public(&self) -> bool {
612 self.cfg.chain.announce
613 }
614
615 pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
616 &self,
617 #[cfg(feature = "session-server")] serve_handler: T,
618 ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, AbortHandle>)> {
619 self.error_if_not_in_state(
620 HoprState::Uninitialized,
621 "Cannot start the hopr node multiple times".into(),
622 )?;
623
624 info!(
625 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
626 "Node is not started, please fund this node",
627 );
628
629 let mut processes: HashMap<HoprLibProcesses, AbortHandle> = HashMap::new();
630
631 wait_for_funds(
632 self.me_onchain(),
633 *MIN_NATIVE_BALANCE,
634 Duration::from_secs(200),
635 self.hopr_chain_api.rpc(),
636 )
637 .await?;
638
639 info!("Starting the node...");
640
641 self.state.store(HoprState::Initializing, Ordering::Relaxed);
642
643 let balance: XDaiBalance = self.get_balance().await?;
644 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
645
646 info!(
647 address = %self.hopr_chain_api.me_onchain(),
648 %balance,
649 %minimum_balance,
650 "Node information"
651 );
652
653 if balance.le(&minimum_balance) {
654 return Err(HoprLibError::GeneralError(
655 "Cannot start the node without a sufficiently funded wallet".to_string(),
656 ));
657 }
658
659 let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;
662
663 let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
664 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
665 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
666 "configured outgoing ticket price is lower than the network minimum ticket price: \
667 {configured_ticket_price:?} < {network_min_ticket_price}"
668 ))));
669 }
670
671 let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
674 let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
675 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
676 && configured_win_prob
677 .and_then(|c| WinningProbability::try_from(c).ok())
678 .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
679 {
680 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
681 "configured outgoing ticket winning probability is lower than the network minimum winning \
682 probability: {configured_win_prob:?} < {network_min_win_prob}"
683 ))));
684 }
685
686 self.db
688 .set_safe_info(
689 None,
690 SafeInfo {
691 safe_address: self.cfg.safe_module.safe_address,
692 module_address: self.cfg.safe_module.module_address,
693 },
694 )
695 .await?;
696
697 self.state.store(HoprState::Indexing, Ordering::Relaxed);
698
699 let (mut indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
700
701 let indexer_event_pipeline = chain_events_to_transport_events(
702 self.rx_indexer_significant_events.clone(),
703 self.me_onchain(),
704 self.db.clone(),
705 self.multistrategy.clone(),
706 self.channel_graph.clone(),
707 self.hopr_chain_api.action_state(),
708 )
709 .await;
710
711 {
712 info!("Syncing peer announcements and network registry updates from previous runs");
715 let accounts = self.db.get_accounts(None, true).await?;
716 for account in accounts.into_iter() {
717 match account.entry_type {
718 AccountType::NotAnnounced => {}
719 AccountType::Announced { multiaddr, .. } => {
720 indexer_peer_update_tx
721 .send(PeerDiscovery::Announce(account.public_key.into(), vec![multiaddr]))
722 .await
723 .map_err(|e| {
724 HoprLibError::GeneralError(format!("Failed to send peer discovery announcement: {e}"))
725 })?;
726
727 let allow_status = if self
728 .db
729 .is_allowed_in_network_registry(None, &account.chain_addr)
730 .await?
731 {
732 PeerDiscovery::Allow(account.public_key.into())
733 } else {
734 PeerDiscovery::Ban(account.public_key.into())
735 };
736
737 indexer_peer_update_tx.send(allow_status).await.map_err(|e| {
738 HoprLibError::GeneralError(format!(
739 "Failed to send peer discovery network registry event: {e}"
740 ))
741 })?;
742 }
743 }
744 }
745 }
746
747 spawn(async move {
748 indexer_event_pipeline
749 .map(Ok)
750 .forward(indexer_peer_update_tx)
751 .await
752 .expect("The index to transport event chain failed");
753 });
754
755 info!("Start the chain process and sync the indexer");
756 for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
757 let nid = match id {
758 HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
759 HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
760 };
761 processes.insert(nid, proc);
762 }
763
764 {
765 let my_ethereum_address = self.me_onchain();
767 let my_peer_id = self.me_peer_id();
768 let my_version = crate::constants::APP_VERSION;
769
770 while !self
771 .db
772 .clone()
773 .is_allowed_in_network_registry(None, &my_ethereum_address)
774 .await
775 .unwrap_or(false)
776 {
777 info!(
778 "Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.",
779 my_ethereum_address.to_hex()
780 );
781
782 sleep(ONBOARDING_INFORMATION_INTERVAL).await;
783
784 info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
785 info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
786 }
787 }
788
789 let safe_module_configuration = self
795 .hopr_chain_api
796 .rpc()
797 .check_node_safe_module_status(self.me_onchain())
798 .await
799 .map_err(HoprChainError::Rpc)?;
800
801 if !safe_module_configuration.should_pass() {
802 error!(
803 ?safe_module_configuration,
804 "Something is wrong with the safe module configuration",
805 );
806 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
807 "Safe and module are not configured correctly {safe_module_configuration:?}",
808 ))));
809 }
810
811 if can_register_with_safe(
814 self.me_onchain(),
815 self.cfg.safe_module.safe_address,
816 self.hopr_chain_api.rpc(),
817 )
818 .await?
819 {
820 info!("Registering safe by node");
821
822 if self.me_onchain() == self.cfg.safe_module.safe_address {
823 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
824 }
825
826 if let Err(e) = self
827 .hopr_chain_api
828 .actions_ref()
829 .register_safe_by_node(self.cfg.safe_module.safe_address)
830 .await?
831 .await
832 {
833 error!(error = %e, "Failed to register node with safe")
835 }
836 }
837
838 if self.is_public() {
839 let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
843
844 match self
846 .hopr_chain_api
847 .actions_ref()
848 .announce(&multiaddresses_to_announce, &self.me)
849 .await
850 {
851 Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
852 Err(ChainActionsError::AlreadyAnnounced) => {
853 info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
854 }
855 Err(e) => error!(error = %e, "Failed to transmit node announcement"),
859 }
860 }
861
862 {
863 let channel_graph = self.channel_graph.clone();
867 let mut cg = channel_graph.write_arc().await;
868
869 info!("Syncing channels from the previous runs");
870 let mut channel_stream = self
871 .db
872 .stream_active_channels()
873 .await
874 .map_err(hopr_db_sql::api::errors::DbError::from)?;
875
876 while let Some(maybe_channel) = channel_stream.next().await {
877 match maybe_channel {
878 Ok(channel) => {
879 cg.update_channel(channel);
880 }
881 Err(error) => error!(%error, "Failed to sync channel into the graph"),
882 }
883 }
884
885 info!("Syncing peer qualities from the previous runs");
890 let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
891 .map_err(|e| e.to_string())
892 .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
893 .unwrap_or_else(|error| {
894 warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
895 constants::DEFAULT_MIN_QUALITY_TO_SYNC
896 });
897
898 let mut peer_stream = self
899 .db
900 .get_network_peers(Default::default(), false)
901 .await?
902 .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
903
904 while let Some(peer) = peer_stream.next().await {
905 if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
906 cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
908 } else {
909 error!(peer = %peer.id.1, "Could not translate peer information");
910 }
911 }
912
913 info!(
914 channels = cg.count_channels(),
915 nodes = cg.count_nodes(),
916 "Channel graph sync complete"
917 );
918 }
919
920 let socket = HoprSocket::new();
921 let transport_output_tx = socket.writer();
922
923 let multi_strategy_ack_ticket = self.multistrategy.clone();
925 let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
926 self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
927
928 processes.insert(
929 HoprLibProcesses::OnReceivedAcknowledgement,
930 hopr_async_runtime::spawn_as_abortable!(async move {
931 while let Some(ack) = on_ack_tkt_rx.next().await {
932 if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
933 &*multi_strategy_ack_ticket,
934 &ack,
935 )
936 .await
937 {
938 error!(%error, "Failed to process acknowledged winning ticket with the strategy");
939 }
940 }
941 }),
942 );
943
944 let (session_tx, _session_rx) = unbounded::<IncomingSession>();
945
946 #[cfg(feature = "session-server")]
947 {
948 processes.insert(
949 HoprLibProcesses::SessionServer,
950 hopr_async_runtime::spawn_as_abortable!(_session_rx.for_each_concurrent(None, move |session| {
951 let serve_handler = serve_handler.clone();
952 async move {
953 let session_id = *session.session.id();
954 match serve_handler.process(session).await {
955 Ok(_) => debug!(
956 session_id = ?session_id,
957 "Client session processed successfully"
958 ),
959 Err(e) => error!(
960 session_id = ?session_id,
961 error = %e,
962 "Client session processing failed"
963 ),
964 }
965 }
966 })),
967 );
968 }
969
970 for (id, proc) in self
971 .transport_api
972 .run(
973 &self.me_chain,
974 join(&[&self.cfg.db.data, "tbf"])
975 .map_err(|e| HoprLibError::GeneralError(format!("Failed to construct the bloom filter: {e}")))?,
976 transport_output_tx,
977 indexer_peer_update_rx,
978 session_tx,
979 )
980 .await?
981 .into_iter()
982 {
983 processes.insert(id.into(), proc);
984 }
985
986 let db_clone = self.db.clone();
987 processes.insert(
988 HoprLibProcesses::TicketIndexFlush,
989 hopr_async_runtime::spawn_as_abortable!(Box::pin(execute_on_tick(
990 Duration::from_secs(5),
991 move || {
992 let db_clone = db_clone.clone();
993 async move {
994 match db_clone.persist_outgoing_ticket_indices().await {
995 Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
996 Err(e) => error!(error = %e, "Failed to flush ticket indices"),
997 }
998 }
999 },
1000 "flush the states of outgoing ticket indices".into(),
1001 ))),
1002 );
1003
1004 if let Err(e) = self.db.fix_channels_next_ticket_state().await {
1009 error!(error = %e, "failed to fix channels ticket states");
1010 }
1011
1012 let multi_strategy = self.multistrategy.clone();
1016 let strategy_interval = self.cfg.strategy.execution_interval;
1017 processes.insert(
1018 HoprLibProcesses::StrategyTick,
1019 hopr_async_runtime::spawn_as_abortable!(async move {
1020 execute_on_tick(
1021 Duration::from_secs(strategy_interval),
1022 move || {
1023 let multi_strategy = multi_strategy.clone();
1024
1025 async move {
1026 trace!(state = "started", "strategy tick");
1027 let _ = multi_strategy.on_tick().await;
1028 trace!(state = "finished", "strategy tick");
1029 }
1030 },
1031 "run strategies".into(),
1032 )
1033 .await;
1034 }),
1035 );
1036
1037 self.state.store(HoprState::Running, Ordering::Relaxed);
1038
1039 info!(
1040 id = %self.me_peer_id(),
1041 version = constants::APP_VERSION,
1042 "NODE STARTED AND RUNNING"
1043 );
1044
1045 #[cfg(all(feature = "prometheus", not(test)))]
1046 METRIC_HOPR_NODE_INFO.set(
1047 &[
1048 &self.me.public().to_peerid_str(),
1049 &self.me_onchain().to_string(),
1050 &self.cfg.safe_module.safe_address.to_string(),
1051 &self.cfg.safe_module.module_address.to_string(),
1052 ],
1053 1.0,
1054 );
1055
1056 Ok((socket, processes))
1057 }
1058
1059 pub fn me_peer_id(&self) -> PeerId {
1062 (*self.me.public()).into()
1063 }
1064
1065 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1067 Ok(self.transport_api.get_public_nodes().await?)
1068 }
1069
1070 pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1072 let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1073
1074 match self.db.get_last_checksummed_log().await? {
1075 Some(log) => {
1076 let checksum = match log.checksum {
1077 Some(checksum) => Hash::from_hex(checksum.as_str())?,
1078 None => Hash::default(),
1079 };
1080 Ok(IndexerStateInfo {
1081 latest_log_block_number: log.block_number as u32,
1082 latest_log_checksum: checksum,
1083 ..indexer_state_info
1084 })
1085 }
1086 None => Ok(indexer_state_info),
1087 }
1088 }
1089
1090 pub async fn is_allowed_to_access_network(
1092 &self,
1093 address_like: either::Either<&PeerId, Address>,
1094 ) -> errors::Result<bool> {
1095 Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1096 }
1097
1098 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1102 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1103
1104 Ok(self.transport_api.ping(peer).await?)
1105 }
1106
1107 #[cfg(feature = "session-client")]
1110 pub async fn connect_to(
1111 &self,
1112 destination: Address,
1113 target: SessionTarget,
1114 cfg: SessionClientConfig,
1115 ) -> errors::Result<HoprSession> {
1116 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1117
1118 let backoff = backon::ConstantBuilder::default()
1119 .with_max_times(self.cfg.session.establish_max_retries as usize)
1120 .with_delay(self.cfg.session.establish_retry_timeout)
1121 .with_jitter();
1122
1123 use backon::Retryable;
1124
1125 Ok((|| {
1126 let cfg = cfg.clone();
1127 let target = target.clone();
1128 async { self.transport_api.new_session(destination, target, cfg).await }
1129 })
1130 .retry(backoff)
1131 .sleep(backon::FuturesTimerSleeper)
1132 .await?)
1133 }
1134
1135 #[cfg(feature = "session-client")]
1138 pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
1139 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1140 Ok(self.transport_api.probe_session(id).await?)
1141 }
1142
1143 #[cfg(feature = "session-client")]
1144 pub async fn get_session_surb_balancer_config(
1145 &self,
1146 id: &HoprSessionId,
1147 ) -> errors::Result<Option<SurbBalancerConfig>> {
1148 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1149 Ok(self.transport_api.session_surb_balancing_cfg(id).await?)
1150 }
1151
1152 #[cfg(feature = "session-client")]
1153 pub async fn update_session_surb_balancer_config(
1154 &self,
1155 id: &HoprSessionId,
1156 cfg: SurbBalancerConfig,
1157 ) -> errors::Result<()> {
1158 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1159 Ok(self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?)
1160 }
1161
1162 #[tracing::instrument(level = "debug", skip(self, msg))]
1170 pub async fn send_message(
1171 &self,
1172 msg: Box<[u8]>,
1173 routing: DestinationRouting,
1174 application_tag: Tag,
1175 ) -> errors::Result<()> {
1176 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1177
1178 self.transport_api.send_message(msg, routing, application_tag).await?;
1179
1180 Ok(())
1181 }
1182
1183 pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1185 Ok(self.transport_api.aggregate_tickets(channel).await?)
1186 }
1187
1188 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1190 self.transport_api.local_multiaddresses()
1191 }
1192
1193 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1195 self.transport_api.listening_multiaddresses().await
1196 }
1197
1198 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1200 self.transport_api.network_observed_multiaddresses(peer).await
1201 }
1202
1203 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1205 let key = match OffchainPublicKey::try_from(peer) {
1206 Ok(k) => k,
1207 Err(e) => {
1208 error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1209 return vec![];
1210 }
1211 };
1212
1213 match self.db.get_account(None, key).await {
1214 Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1215 Ok(None) => {
1216 error!(%peer, "no information");
1217 vec![]
1218 }
1219 Err(e) => {
1220 error!(%peer, error = %e, "failed to retrieve information");
1221 vec![]
1222 }
1223 }
1224 }
1225
1226 pub async fn network_health(&self) -> Health {
1230 self.transport_api.network_health().await
1231 }
1232
1233 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1235 Ok(self.transport_api.network_connected_peers().await?)
1236 }
1237
1238 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1240 Ok(self.transport_api.network_peer_info(peer).await?)
1241 }
1242
1243 pub async fn all_network_peers(
1245 &self,
1246 minimum_quality: f64,
1247 ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1248 Ok(
1249 futures::stream::iter(self.transport_api.network_connected_peers().await?)
1250 .filter_map(|peer| async move {
1251 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1252 if info.get_average_quality() >= minimum_quality {
1253 Some((peer, info))
1254 } else {
1255 None
1256 }
1257 } else {
1258 None
1259 }
1260 })
1261 .filter_map(|(peer_id, info)| async move {
1262 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1263 Some((address, peer_id, info))
1264 })
1265 .collect::<Vec<_>>()
1266 .await,
1267 )
1268 }
1269
1270 pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1273 Ok(self.transport_api.tickets_in_channel(channel).await?)
1274 }
1275
1276 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1278 Ok(self.transport_api.all_tickets().await?)
1279 }
1280
1281 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1283 Ok(self.transport_api.ticket_statistics().await?)
1284 }
1285
1286 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1288 Ok(self.db.reset_ticket_statistics().await?)
1289 }
1290
1291 pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
1293 &self.db
1294 }
1295
1296 pub fn me_onchain(&self) -> Address {
1298 self.hopr_chain_api.me_onchain()
1299 }
1300
1301 pub async fn get_ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
1303 Ok(self.hopr_chain_api.ticket_price().await?)
1304 }
1305
1306 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1308 Ok(self
1309 .db
1310 .get_indexer_data(None)
1311 .await?
1312 .minimum_incoming_ticket_winning_prob)
1313 }
1314
1315 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1317 Ok(self.hopr_chain_api.accounts_announced_on_chain().await?)
1318 }
1319
1320 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1323 Ok(self.db.get_channel_by_id(None, channel_id).await?)
1324 }
1325
1326 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1331 Ok(self.hopr_chain_api.channel(src, dest).await?)
1332 }
1333
1334 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1336 Ok(self.hopr_chain_api.channels_from(src).await?)
1337 }
1338
1339 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1341 Ok(self.hopr_chain_api.channels_to(dest).await?)
1342 }
1343
1344 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1346 Ok(self.hopr_chain_api.all_channels().await?)
1347 }
1348
1349 pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
1351 Ok(self.hopr_chain_api.corrupted_channels().await?)
1352 }
1353
1354 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1356 Ok(self.hopr_chain_api.safe_allowance().await?)
1357 }
1358
1359 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<Hash> {
1363 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1364
1365 let awaiter = self.hopr_chain_api.actions_ref().withdraw(recipient, amount).await?;
1366
1367 Ok(awaiter.await?.tx_hash)
1368 }
1369
1370 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<Hash> {
1374 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1375
1376 let awaiter = self
1377 .hopr_chain_api
1378 .actions_ref()
1379 .withdraw_native(recipient, amount)
1380 .await?;
1381
1382 Ok(awaiter.await?.tx_hash)
1383 }
1384
1385 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1386 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1387
1388 let awaiter = self
1389 .hopr_chain_api
1390 .actions_ref()
1391 .open_channel(*destination, amount)
1392 .await?;
1393
1394 let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1395 Ok(awaiter.await.map(|confirm| OpenChannelResult {
1396 tx_hash: confirm.tx_hash,
1397 channel_id,
1398 })?)
1399 }
1400
1401 pub async fn fund_channel(&self, channel_id: &Hash, amount: HoprBalance) -> errors::Result<Hash> {
1402 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1403
1404 let awaiter = self
1405 .hopr_chain_api
1406 .actions_ref()
1407 .fund_channel(*channel_id, amount)
1408 .await?;
1409
1410 Ok(awaiter.await?.tx_hash)
1411 }
1412
1413 pub async fn close_channel(
1414 &self,
1415 counterparty: &Address,
1416 direction: ChannelDirection,
1417 redeem_before_close: bool,
1418 ) -> errors::Result<CloseChannelResult> {
1419 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1420
1421 let confirmation = self
1422 .hopr_chain_api
1423 .actions_ref()
1424 .close_channel(*counterparty, direction, redeem_before_close)
1425 .await?
1426 .await?;
1427
1428 match confirmation
1429 .event
1430 .expect("channel close action confirmation must have associated chain event")
1431 {
1432 ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1433 tx_hash: confirmation.tx_hash,
1434 status: c.status, }),
1436 ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1437 tx_hash: confirmation.tx_hash,
1438 status: ChannelStatus::Closed,
1439 }),
1440 _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1441 }
1442 }
1443
1444 pub async fn close_channel_by_id(
1445 &self,
1446 channel_id: Hash,
1447 redeem_before_close: bool,
1448 ) -> errors::Result<CloseChannelResult> {
1449 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1450
1451 match self.channel_from_hash(&channel_id).await? {
1452 Some(channel) => match channel.orientation(&self.me_onchain()) {
1453 Some((direction, counterparty)) => {
1454 self.close_channel(&counterparty, direction, redeem_before_close).await
1455 }
1456 None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1457 "cannot close channel that is not own".into(),
1458 ))),
1459 },
1460 None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1461 }
1462 }
1463
1464 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1465 Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1466 }
1467
1468 pub async fn redeem_all_tickets(&self, only_aggregated: bool) -> errors::Result<()> {
1469 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1470
1471 self.hopr_chain_api
1473 .actions_ref()
1474 .redeem_all_tickets(only_aggregated)
1475 .await?;
1476
1477 Ok(())
1478 }
1479
1480 pub async fn redeem_tickets_with_counterparty(
1481 &self,
1482 counterparty: &Address,
1483 only_aggregated: bool,
1484 ) -> errors::Result<()> {
1485 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1486
1487 let _ = self
1489 .hopr_chain_api
1490 .actions_ref()
1491 .redeem_tickets_with_counterparty(counterparty, only_aggregated)
1492 .await?;
1493
1494 Ok(())
1495 }
1496
1497 pub async fn redeem_tickets_in_channel(&self, channel_id: &Hash, only_aggregated: bool) -> errors::Result<usize> {
1498 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1499
1500 let channel = self.db.get_channel_by_id(None, channel_id).await?;
1501 let mut redeem_count = 0;
1502
1503 if let Some(channel) = channel {
1504 if channel.destination == self.hopr_chain_api.me_onchain() {
1505 redeem_count = self
1507 .hopr_chain_api
1508 .actions_ref()
1509 .redeem_tickets_in_channel(&channel, only_aggregated)
1510 .await?
1511 .len();
1512 }
1513 }
1514
1515 Ok(redeem_count)
1516 }
1517
1518 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1519 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1520
1521 #[allow(clippy::let_underscore_future)]
1523 let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1524
1525 Ok(())
1526 }
1527
1528 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1529 let pk = hopr_transport::OffchainPublicKey::try_from(peer_id)?;
1530 Ok(self.db.resolve_chain_key(&pk).await?)
1531 }
1532
1533 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1534 Ok(self
1535 .db
1536 .resolve_packet_key(address)
1537 .await
1538 .map(|pk| pk.map(|v| v.into()))?)
1539 }
1540
1541 pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1542 self.channel_graph.read_arc().await.as_dot(cfg)
1543 }
1544
1545 pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1546 let cg = self.channel_graph.read_arc().await;
1547 serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1548 }
1549}