1pub mod config;
18pub mod constants;
20pub mod errors;
22
23use std::{
24 collections::HashMap,
25 fmt::{Display, Formatter},
26 ops::Deref,
27 path::PathBuf,
28 str::FromStr,
29 sync::{Arc, atomic::Ordering},
30 time::Duration,
31};
32
33use async_lock::RwLock;
34use errors::{HoprLibError, HoprStatusError};
35use futures::{
36 SinkExt, Stream, StreamExt,
37 channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded},
38 future::AbortHandle,
39 stream::{self},
40};
41use hopr_async_runtime::prelude::{sleep, spawn};
42pub use hopr_chain_actions::errors::ChainActionsError;
43use hopr_chain_actions::{
44 action_state::{ActionState, IndexerActionTracker},
45 channels::ChannelActions,
46 node::NodeActions,
47 redeem::TicketRedeemActions,
48};
49pub use hopr_chain_api::config::{
50 Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
51};
52use hopr_chain_api::{
53 HoprChain, HoprChainProcess, SignificantChainEvent, can_register_with_safe, config::ChainNetworkConfig,
54 errors::HoprChainError, wait_for_funds,
55};
56use hopr_chain_rpc::HoprRpcOperations;
57use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
58pub use hopr_crypto_keypair::key_pair::{HoprKeys, IdentityRetrievalModes};
59use hopr_crypto_types::prelude::OffchainPublicKey;
60use hopr_db_api::logs::HoprDbLogOperations;
61use hopr_db_sql::{
62 HoprDbAllOperations,
63 accounts::HoprDbAccountOperations,
64 api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
65 channels::HoprDbChannelOperations,
66 db::{HoprDb, HoprDbConfig},
67 info::{HoprDbInfoOperations, IndexerStateInfo},
68 prelude::{ChainOrPacketKey::ChainKey, HoprDbPeersOperations},
69 registry::HoprDbRegistryOperations,
70};
71pub use hopr_internal_types::prelude::*;
72pub use hopr_network_types::prelude::{DestinationRouting, IpProtocol, RoutingOptions};
73pub use hopr_path::channel_graph::GraphExportConfig;
74use hopr_path::channel_graph::{ChannelGraph, ChannelGraphConfig, NodeScoreUpdate};
75use hopr_platform::file::native::remove_dir_all;
76pub use hopr_primitive_types::prelude::*;
77pub use hopr_strategy::Strategy;
78use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
79#[cfg(feature = "runtime-tokio")]
80pub use hopr_transport::transfer_session;
81pub use hopr_transport::{
82 ApplicationData, ApplicationDataIn, ApplicationDataOut, HalfKeyChallenge, Health, HoprSession,
83 IncomingSession as HoprIncomingSession, Keypair, Multiaddr, OffchainKeypair as HoprOffchainKeypair, PeerId,
84 PingQueryReplier, ProbeError, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionCapability,
85 SessionClientConfig, SessionId as HoprSessionId, SessionManagerError, SessionTarget, SurbBalancerConfig, Tag,
86 TicketStatistics, TransportSessionError,
87 config::{HostConfig, HostType, looks_like_domain},
88 errors::{HoprTransportError, NetworkingError, ProtocolError},
89};
90use hopr_transport::{
91 ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession, OffchainKeypair,
92 PeerDiscovery, PeerStatus, execute_on_tick,
93};
94use tracing::{debug, error, info, trace, warn};
95#[cfg(all(feature = "prometheus", not(test)))]
96use {
97 hopr_metrics::metrics::{MultiGauge, SimpleGauge},
98 hopr_platform::time::native::current_time,
99};
100
101use crate::{
102 config::SafeModule,
103 constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE},
104};
105
// Process-wide Prometheus metrics. They are compiled only when the `prometheus`
// feature is enabled and the crate is not built for tests.
// Each metric is created lazily on first access; the `unwrap()` calls panic if the
// constructor reports an error.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge `hopr_start_time`; set in `Hopr::new`.
    static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started"
    ).unwrap();
    // Gauge `hopr_lib_version` with a single `version` label; set in `Hopr::new`.
    static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"]
    ).unwrap();
    // Gauge `hopr_node_addresses` labelled with the node identities; set at the end of `Hopr::run`.
    static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"]
    ).unwrap();
}
123
124pub use async_trait::async_trait;
125
#[cfg(feature = "session-server")]
/// Handler of sessions that are incoming to this node.
///
/// Only available with the `session-server` feature. [`Hopr::run`] clones the implementor
/// and invokes it for every incoming session it receives.
#[async_trait::async_trait]
pub trait HoprSessionReactor {
    /// Processes a single incoming `session`.
    ///
    /// An error returned from here is logged by [`Hopr::run`]; it is not propagated further.
    async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
}
134
#[atomic_enum::atomic_enum]
/// Lifecycle state of a [`Hopr`] node.
///
/// The node keeps the current value in an `AtomicHoprState` and exposes it via [`Hopr::status`].
#[derive(PartialEq, Eq)]
pub enum HoprState {
    /// State assigned by [`Hopr::new`]; [`Hopr::run`] refuses to proceed from any other state.
    Uninitialized = 0,
    /// Stored by [`Hopr::run`] after `wait_for_funds` has returned successfully.
    Initializing = 1,
    /// Stored by [`Hopr::run`] after the start-up checks, before the chain processes are started.
    Indexing = 2,
    /// Not stored anywhere in the visible part of this file.
    Starting = 3,
    /// Stored by [`Hopr::run`] once all background processes have been spawned.
    Running = 4,
}
145
146impl Display for HoprState {
147 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
148 write!(f, "{self:?}")
149 }
150}
151
/// Pair of hashes describing an opened channel.
///
/// NOTE(review): presumably returned by the channel-opening API, which is outside the
/// visible part of this file — confirm.
pub struct OpenChannelResult {
    /// Presumably the hash of the on-chain transaction that opened the channel — confirm.
    pub tx_hash: Hash,
    /// Presumably the identifier of the opened channel — confirm.
    pub channel_id: Hash,
}
156
/// Transaction hash and channel status describing a channel closure.
///
/// NOTE(review): presumably returned by the channel-closing API, which is outside the
/// visible part of this file — confirm.
pub struct CloseChannelResult {
    /// Presumably the hash of the on-chain closure transaction — confirm.
    pub tx_hash: Hash,
    /// Presumably the status of the channel after the closure transaction — confirm.
    pub status: ChannelStatus,
}
161
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
/// Identifiers of the background processes spawned by [`Hopr::run`].
///
/// The values are used as keys of the map of abort handles returned by [`Hopr::run`].
/// The `Display` text of each variant is given by its `strum` attribute.
pub enum HoprLibProcesses {
    /// A process started by the transport component, wrapping the transport's own identifier.
    #[strum(to_string = "transport: {0}")]
    Transport(HoprTransportProcess),
    /// Task that hands incoming sessions to the `HoprSessionReactor` (feature `session-server`).
    #[cfg(feature = "session-server")]
    #[strum(to_string = "session server providing the exit node session stream functionality")]
    SessionServer,
    /// Periodic task that invokes `on_tick` of the strategies.
    #[strum(to_string = "tick wake up the strategies to perform an action")]
    StrategyTick,
    /// The indexer process of the chain component.
    #[strum(to_string = "initial indexing operation into the DB")]
    Indexing,
    /// Not inserted into the process map in the visible part of this file.
    #[strum(to_string = "processing of indexed operations in internal components")]
    IndexReflection,
    /// The outgoing on-chain action queue process of the chain component.
    #[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
    OutgoingOnchainActionQueue,
    /// Periodic task that persists the outgoing ticket indices into the DB.
    #[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
    TicketIndexFlush,
    /// Task that passes acknowledged winning tickets to the strategies.
    #[strum(to_string = "on received ack ticket trigger")]
    OnReceivedAcknowledgement,
}
186
187impl HoprLibProcesses {
188 pub fn can_finish(&self) -> bool {
191 matches!(self, HoprLibProcesses::Indexing)
192 }
193}
194
195impl From<HoprTransportProcess> for HoprLibProcesses {
196 fn from(value: HoprTransportProcess) -> Self {
197 HoprLibProcesses::Transport(value)
198 }
199}
200
201#[allow(clippy::too_many_arguments)]
210pub async fn chain_events_to_transport_events<StreamIn, Db>(
211 event_stream: StreamIn,
212 me_onchain: Address,
213 db: Db,
214 multi_strategy: Arc<MultiStrategy>,
215 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
216 indexer_action_tracker: Arc<IndexerActionTracker>,
217) -> impl Stream<Item = PeerDiscovery> + Send + 'static
218where
219 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
220 StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
221{
222 Box::pin(event_stream.filter_map(move |event| {
223 let db = db.clone();
224 let multi_strategy = multi_strategy.clone();
225 let channel_graph = channel_graph.clone();
226 let indexer_action_tracker = indexer_action_tracker.clone();
227
228 async move {
229 let resolved = indexer_action_tracker.match_and_resolve(&event).await;
230 if resolved.is_empty() {
231 trace!(%event, "No indexer expectations resolved for the event");
232 } else {
233 debug!(count = resolved.len(), %event, "resolved indexer expectations");
234 }
235
236 match event.event_type {
237 ChainEventType::Announcement{peer, address, multiaddresses} => {
238 let allowed = db
239 .is_allowed_in_network_registry(None, &address)
240 .await
241 .unwrap_or(false);
242
243 Some(vec![PeerDiscovery::Announce(peer, multiaddresses), if allowed {
244 PeerDiscovery::Allow(peer)
245 } else {
246 PeerDiscovery::Ban(peer)
247 }])
248 }
249 ChainEventType::ChannelOpened(channel) |
250 ChainEventType::ChannelClosureInitiated(channel) |
251 ChainEventType::ChannelClosed(channel) |
252 ChainEventType::ChannelBalanceIncreased(channel, _) | ChainEventType::ChannelBalanceDecreased(channel, _) | ChainEventType::TicketRedeemed(channel, _) => { let maybe_direction = channel.direction(&me_onchain);
256
257 let change = channel_graph
258 .write_arc()
259 .await
260 .update_channel(channel);
261
262 if let Some(own_channel_direction) = maybe_direction {
264 if let Some(change_set) = change {
265 for channel_change in change_set {
266 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
267 &*multi_strategy,
268 &channel,
269 own_channel_direction,
270 channel_change,
271 )
272 .await;
273 }
274 } else if channel.status == ChannelStatus::Open {
275 let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
277 &*multi_strategy,
278 &channel,
279 own_channel_direction,
280 ChannelChange::Status {
281 left: ChannelStatus::Closed,
282 right: ChannelStatus::Open,
283 },
284 )
285 .await;
286 }
287 }
288
289 None
290 }
291 ChainEventType::NetworkRegistryUpdate(address, allowed) => {
292 let packet_key = db.translate_key(None, address).await;
293 match packet_key {
294 Ok(pk) => {
295 if let Some(pk) = pk {
296 let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
297
298 if let Ok(offchain_key) = offchain_key {
299 let peer_id = offchain_key.into();
300
301 let res = match allowed {
302 hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
303 hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
304 };
305
306 Some(vec![res])
307 } else {
308 error!("Failed to unwrap as offchain key at this point");
309 None
310 }
311 } else {
312 None
313 }
314 }
315 Err(error) => {
316 error!(%error, "on_network_registry_node_allowed failed");
317 None
318 },
319 }
320 }
321 ChainEventType::NodeSafeRegistered(safe_address) => {
322 info!(%safe_address, "Node safe registered");
323 None
324 }
325 }
326 }
327 })
328 .flat_map(stream::iter)
329)
330}
331
/// Holds both ends of an unbounded channel that carries [`ApplicationDataIn`] items.
///
/// [`Hopr::run`] hands a clone of the sending end to the transport and returns the
/// socket to the caller.
pub struct HoprSocket {
    /// Receiving end; given away by [`HoprSocket::reader`].
    rx: UnboundedReceiver<ApplicationDataIn>,
    /// Sending end; cloned by [`HoprSocket::writer`].
    tx: UnboundedSender<ApplicationDataIn>,
}
339
340impl Default for HoprSocket {
341 fn default() -> Self {
342 let (tx, rx) = unbounded::<ApplicationDataIn>();
343 Self { rx, tx }
344 }
345}
346
347impl HoprSocket {
348 pub fn new() -> Self {
349 Self::default()
350 }
351
352 pub fn reader(self) -> UnboundedReceiver<ApplicationDataIn> {
353 self.rx
354 }
355
356 pub fn writer(&self) -> UnboundedSender<ApplicationDataIn> {
357 self.tx.clone()
358 }
359}
360
/// The HOPR node: bundles the node's key pairs, configuration, database and the chain and
/// transport components.
///
/// Instances are created with [`Hopr::new`] and started with [`Hopr::run`].
pub struct Hopr {
    /// Off-chain key pair of this node.
    me: OffchainKeypair,
    /// On-chain key pair of this node.
    me_chain: ChainKeypair,
    /// Configuration the node was created with (possibly adjusted by [`Hopr::new`]).
    cfg: config::HoprLibConfig,
    /// Current lifecycle state, see [`HoprState`].
    state: Arc<AtomicHoprState>,
    /// Transport component.
    transport_api: HoprTransport<HoprDb>,
    /// Chain component.
    hopr_chain_api: HoprChain<HoprDb>,
    /// Node database; clones of it are handed to the other components.
    db: HoprDb,
    /// Chain network configuration resolved in [`Hopr::new`].
    chain_cfg: ChainNetworkConfig,
    /// Channel graph shared with the transport component.
    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
    /// Strategies configured via `cfg.strategy`.
    multistrategy: Arc<MultiStrategy>,
    /// Receiving end of the channel into which the chain component sends significant chain events.
    rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
}
386
387impl Hopr {
388 pub fn new(
389 mut cfg: config::HoprLibConfig,
390 me: &OffchainKeypair,
391 me_onchain: &ChainKeypair,
392 ) -> crate::errors::Result<Self> {
393 let multiaddress: Multiaddr = (&cfg.host).try_into()?;
394
395 let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
396 info!(path = ?db_path, "Initiating DB");
397
398 if cfg.db.force_initialize {
399 info!("Force cleaning up existing database");
400 remove_dir_all(db_path.as_path()).map_err(|e| {
401 HoprLibError::GeneralError(format!(
402 "Failed to remove the existing DB directory at '{db_path:?}': {e}"
403 ))
404 })?;
405 cfg.db.initialize = true
406 }
407
408 if let Some(parent_dir_path) = db_path.as_path().parent() {
410 if !parent_dir_path.is_dir() {
411 std::fs::create_dir_all(parent_dir_path).map_err(|e| {
412 HoprLibError::GeneralError(format!(
413 "Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
414 ))
415 })?
416 }
417 }
418
419 let db_cfg = HoprDbConfig {
420 create_if_missing: cfg.db.initialize,
421 force_create: cfg.db.force_initialize,
422 log_slow_queries: std::time::Duration::from_millis(150),
423 surb_ring_buffer_size: std::env::var("HOPR_PROTOCOL_SURB_RB_SIZE")
424 .ok()
425 .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
426 .unwrap_or_else(|| HoprDbConfig::default().surb_ring_buffer_size),
427 surb_distress_threshold: std::env::var("HOPR_PROTOCOL_SURB_RB_DISTRESS")
428 .ok()
429 .and_then(|s| u64::from_str(&s).map(|v| v as usize).ok())
430 .unwrap_or_else(|| HoprDbConfig::default().surb_distress_threshold),
431 };
432 let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
433
434 if let Some(provider) = &cfg.chain.provider {
435 info!(provider, "Creating chain components using the custom provider");
436 } else {
437 info!("Creating chain components using the default provider");
438 }
439 let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
440 &cfg.chain.network,
441 crate::constants::APP_VERSION_COERCED,
442 cfg.chain.provider.as_deref(),
443 cfg.chain.max_rpc_requests_per_sec,
444 &mut cfg.chain.protocols,
445 )
446 .map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
447
448 let contract_addresses = ContractAddresses::from(&resolved_environment);
449 info!(
450 myself = me_onchain.public().to_hex(),
451 contract_addresses = ?contract_addresses,
452 "Resolved contract addresses",
453 );
454
455 let my_multiaddresses = vec![multiaddress];
456
457 let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
458
459 let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(
460 me_onchain.public().to_address(),
461 ChannelGraphConfig::default(),
462 )));
463
464 let hopr_transport_api = HoprTransport::new(
465 me,
466 me_onchain,
467 HoprTransportConfig {
468 transport: cfg.transport.clone(),
469 network: cfg.network_options.clone(),
470 protocol: cfg.protocol,
471 probe: cfg.probe,
472 session: cfg.session,
473 },
474 db.clone(),
475 channel_graph.clone(),
476 my_multiaddresses,
477 );
478
479 let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
480 me_onchain.clone(),
481 db.clone(),
482 resolved_environment.clone(),
483 cfg.safe_module.module_address,
484 ContractAddresses {
485 announcements: resolved_environment.announcements,
486 channels: resolved_environment.channels,
487 token: resolved_environment.token,
488 price_oracle: resolved_environment.ticket_price_oracle,
489 win_prob_oracle: resolved_environment.winning_probability_oracle,
490 network_registry: resolved_environment.network_registry,
491 network_registry_proxy: resolved_environment.network_registry_proxy,
492 stake_factory: resolved_environment.node_stake_v2_factory,
493 safe_registry: resolved_environment.node_safe_registry,
494 module_implementation: resolved_environment.module_implementation,
495 },
496 cfg.safe_module.safe_address,
497 hopr_chain_indexer::IndexerConfig {
498 start_block_number: resolved_environment.channel_contract_deploy_block as u64,
499 fast_sync: cfg.chain.fast_sync,
500 enable_logs_snapshot: cfg.chain.enable_logs_snapshot,
501 logs_snapshot_url: cfg.chain.logs_snapshot_url.clone(),
502 data_directory: cfg.db.data.clone(),
503 },
504 tx_indexer_events,
505 )?;
506
507 let multi_strategy = Arc::new(MultiStrategy::new(
508 cfg.strategy.clone(),
509 db.clone(),
510 hopr_hopr_chain_api.actions_ref().clone(),
511 hopr_transport_api.ticket_aggregator(),
512 ));
513 debug!(
514 strategies = tracing::field::debug(&multi_strategy),
515 "Initialized strategies"
516 );
517
518 #[cfg(all(feature = "prometheus", not(test)))]
519 {
520 METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
521 METRIC_HOPR_LIB_VERSION.set(
522 &[const_format::formatcp!("{}", constants::APP_VERSION)],
523 f64::from_str(const_format::formatcp!(
524 "{}.{}",
525 env!("CARGO_PKG_VERSION_MAJOR"),
526 env!("CARGO_PKG_VERSION_MINOR")
527 ))
528 .unwrap_or(0.0),
529 );
530
531 if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
533 error!(error = %e, "Failed to initialize ticket statistics metrics");
534 }
535 }
536
537 Ok(Self {
538 me: me.clone(),
539 me_chain: me_onchain.clone(),
540 cfg,
541 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
542 transport_api: hopr_transport_api,
543 hopr_chain_api: hopr_hopr_chain_api,
544 db,
545 chain_cfg: resolved_environment,
546 channel_graph,
547 multistrategy: multi_strategy,
548 rx_indexer_significant_events: rx_indexer_events,
549 })
550 }
551
552 fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
553 if self.status() == state {
554 Ok(())
555 } else {
556 Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
557 }
558 }
559
    /// Returns the current lifecycle state of the node.
    ///
    /// The state is read with `Ordering::Relaxed`.
    pub fn status(&self) -> HoprState {
        self.state.load(Ordering::Relaxed)
    }
563
564 pub fn version_coerced(&self) -> String {
565 String::from(constants::APP_VERSION_COERCED)
566 }
567
568 pub fn version(&self) -> String {
569 String::from(constants::APP_VERSION)
570 }
571
572 pub fn network(&self) -> String {
573 self.cfg.chain.network.clone()
574 }
575
576 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
577 Ok(self.hopr_chain_api.get_balance().await?)
578 }
579
580 pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
581 Ok(self.hopr_chain_api.get_eligibility_status().await?)
582 }
583
584 pub async fn get_safe_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
585 let safe_balance = self
586 .hopr_chain_api
587 .get_safe_balance(self.cfg.safe_module.safe_address)
588 .await?;
589 Ok(safe_balance)
590 }
591
592 pub fn get_safe_config(&self) -> SafeModule {
593 self.cfg.safe_module.clone()
594 }
595
596 pub fn chain_config(&self) -> ChainNetworkConfig {
597 self.chain_cfg.clone()
598 }
599
    /// Returns a reference to the configuration held by this node.
    pub fn config(&self) -> &config::HoprLibConfig {
        &self.cfg
    }
603
604 pub fn get_provider(&self) -> String {
605 self.cfg
606 .chain
607 .provider
608 .clone()
609 .unwrap_or(self.chain_cfg.chain.default_provider.clone())
610 }
611
    /// Tells whether the node is configured to announce itself on chain (`cfg.chain.announce`).
    #[inline]
    fn is_public(&self) -> bool {
        self.cfg.chain.announce
    }
616
617 pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
618 &self,
619 #[cfg(feature = "session-server")] serve_handler: T,
620 ) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, AbortHandle>)> {
621 self.error_if_not_in_state(
622 HoprState::Uninitialized,
623 "Cannot start the hopr node multiple times".into(),
624 )?;
625
626 info!(
627 address = %self.me_onchain(), minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
628 "Node is not started, please fund this node",
629 );
630
631 let mut processes: HashMap<HoprLibProcesses, AbortHandle> = HashMap::new();
632
633 wait_for_funds(
634 self.me_onchain(),
635 *MIN_NATIVE_BALANCE,
636 Duration::from_secs(200),
637 self.hopr_chain_api.rpc(),
638 )
639 .await?;
640
641 info!("Starting the node...");
642
643 self.state.store(HoprState::Initializing, Ordering::Relaxed);
644
645 let balance: XDaiBalance = self.get_balance().await?;
646 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
647
648 info!(
649 address = %self.hopr_chain_api.me_onchain(),
650 %balance,
651 %minimum_balance,
652 "Node information"
653 );
654
655 if balance.le(&minimum_balance) {
656 return Err(HoprLibError::GeneralError(
657 "Cannot start the node without a sufficiently funded wallet".to_string(),
658 ));
659 }
660
661 let network_min_ticket_price = self.hopr_chain_api.get_minimum_ticket_price().await?;
664
665 let configured_ticket_price = self.cfg.protocol.outgoing_ticket_price;
666 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
667 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
668 "configured outgoing ticket price is lower than the network minimum ticket price: \
669 {configured_ticket_price:?} < {network_min_ticket_price}"
670 ))));
671 }
672
673 let network_min_win_prob = self.hopr_chain_api.get_minimum_winning_probability().await?;
676 let configured_win_prob = self.cfg.protocol.outgoing_ticket_winning_prob;
677 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
678 && configured_win_prob
679 .and_then(|c| WinningProbability::try_from(c).ok())
680 .is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
681 {
682 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
683 "configured outgoing ticket winning probability is lower than the network minimum winning \
684 probability: {configured_win_prob:?} < {network_min_win_prob}"
685 ))));
686 }
687
688 self.db
690 .set_safe_info(
691 None,
692 SafeInfo {
693 safe_address: self.cfg.safe_module.safe_address,
694 module_address: self.cfg.safe_module.module_address,
695 },
696 )
697 .await?;
698
699 self.state.store(HoprState::Indexing, Ordering::Relaxed);
700
701 let (mut indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
702
703 let indexer_event_pipeline = chain_events_to_transport_events(
704 self.rx_indexer_significant_events.clone(),
705 self.me_onchain(),
706 self.db.clone(),
707 self.multistrategy.clone(),
708 self.channel_graph.clone(),
709 self.hopr_chain_api.action_state(),
710 )
711 .await;
712
713 {
714 info!("Syncing peer announcements and network registry updates from previous runs");
717 let accounts = self.db.get_accounts(None, true).await?;
718 for account in accounts.into_iter() {
719 match account.entry_type {
720 AccountType::NotAnnounced => {}
721 AccountType::Announced { multiaddr, .. } => {
722 indexer_peer_update_tx
723 .send(PeerDiscovery::Announce(account.public_key.into(), vec![multiaddr]))
724 .await
725 .map_err(|e| {
726 HoprLibError::GeneralError(format!("Failed to send peer discovery announcement: {e}"))
727 })?;
728
729 let allow_status = if self
730 .db
731 .is_allowed_in_network_registry(None, &account.chain_addr)
732 .await?
733 {
734 PeerDiscovery::Allow(account.public_key.into())
735 } else {
736 PeerDiscovery::Ban(account.public_key.into())
737 };
738
739 indexer_peer_update_tx.send(allow_status).await.map_err(|e| {
740 HoprLibError::GeneralError(format!(
741 "Failed to send peer discovery network registry event: {e}"
742 ))
743 })?;
744 }
745 }
746 }
747 }
748
749 spawn(async move {
750 indexer_event_pipeline
751 .map(Ok)
752 .forward(indexer_peer_update_tx)
753 .await
754 .expect("The index to transport event chain failed");
755
756 tracing::info!(
757 task = "indexer pipeline for transport",
758 "long-running background task finished"
759 )
760 });
761
762 info!("Start the chain process and sync the indexer");
763 for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
764 let nid = match id {
765 HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
766 HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
767 };
768 processes.insert(nid, proc);
769 }
770
771 {
772 let my_ethereum_address = self.me_onchain();
774 let my_peer_id = self.me_peer_id();
775 let my_version = crate::constants::APP_VERSION;
776
777 while !self
778 .db
779 .clone()
780 .is_allowed_in_network_registry(None, &my_ethereum_address)
781 .await
782 .unwrap_or(false)
783 {
784 info!(
785 "Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={}, or by manually entering the node address of your node on https://hub.hoprnet.org/.",
786 my_ethereum_address.to_hex()
787 );
788
789 sleep(ONBOARDING_INFORMATION_INTERVAL).await;
790
791 info!(peer_id = %my_peer_id, address = %my_ethereum_address.to_hex(), version = &my_version, "Node information");
792 info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
793 }
794 }
795
796 let safe_module_configuration = self
802 .hopr_chain_api
803 .rpc()
804 .check_node_safe_module_status(self.me_onchain())
805 .await
806 .map_err(HoprChainError::Rpc)?;
807
808 if !safe_module_configuration.should_pass() {
809 error!(
810 ?safe_module_configuration,
811 "Something is wrong with the safe module configuration",
812 );
813 return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
814 "Safe and module are not configured correctly {safe_module_configuration:?}",
815 ))));
816 }
817
818 if can_register_with_safe(
821 self.me_onchain(),
822 self.cfg.safe_module.safe_address,
823 self.hopr_chain_api.rpc(),
824 )
825 .await?
826 {
827 info!("Registering safe by node");
828
829 if self.me_onchain() == self.cfg.safe_module.safe_address {
830 return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
831 }
832
833 if let Err(e) = self
834 .hopr_chain_api
835 .actions_ref()
836 .register_safe_by_node(self.cfg.safe_module.safe_address)
837 .await?
838 .await
839 {
840 error!(error = %e, "Failed to register node with safe")
842 }
843 }
844
845 if self.is_public() {
846 let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
850
851 match self
853 .hopr_chain_api
854 .actions_ref()
855 .announce(&multiaddresses_to_announce, &self.me)
856 .await
857 {
858 Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
859 Err(ChainActionsError::AlreadyAnnounced) => {
860 info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain")
861 }
862 Err(e) => error!(error = %e, "Failed to transmit node announcement"),
866 }
867 }
868
869 {
870 let channel_graph = self.channel_graph.clone();
874 let mut cg = channel_graph.write_arc().await;
875
876 info!("Syncing channels from the previous runs");
877 let mut channel_stream = self
878 .db
879 .stream_active_channels()
880 .await
881 .map_err(hopr_db_sql::api::errors::DbError::from)?;
882
883 while let Some(maybe_channel) = channel_stream.next().await {
884 match maybe_channel {
885 Ok(channel) => {
886 cg.update_channel(channel);
887 }
888 Err(error) => error!(%error, "Failed to sync channel into the graph"),
889 }
890 }
891
892 info!("Syncing peer qualities from the previous runs");
897 let min_quality_to_sync: f64 = std::env::var("HOPR_MIN_PEER_QUALITY_TO_SYNC")
898 .map_err(|e| e.to_string())
899 .and_then(|v| std::str::FromStr::from_str(&v).map_err(|_| "parse error".to_string()))
900 .unwrap_or_else(|error| {
901 warn!(error, "invalid value for HOPR_MIN_PEER_QUALITY_TO_SYNC env variable");
902 constants::DEFAULT_MIN_QUALITY_TO_SYNC
903 });
904
905 let mut peer_stream = self
906 .db
907 .get_network_peers(Default::default(), false)
908 .await?
909 .filter(|status| futures::future::ready(status.quality >= min_quality_to_sync));
910
911 while let Some(peer) = peer_stream.next().await {
912 if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
913 cg.update_node_score(&key, NodeScoreUpdate::Initialize(peer.last_seen_latency, 1.0));
915 } else {
916 error!(peer = %peer.id.1, "Could not translate peer information");
917 }
918 }
919
920 info!(
921 channels = cg.count_channels(),
922 nodes = cg.count_nodes(),
923 "Channel graph sync complete"
924 );
925 }
926
927 let socket = HoprSocket::new();
928 let transport_output_tx = socket.writer();
929
930 let multi_strategy_ack_ticket = self.multistrategy.clone();
932 let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
933 self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
934
935 processes.insert(
936 HoprLibProcesses::OnReceivedAcknowledgement,
937 hopr_async_runtime::spawn_as_abortable!(async move {
938 while let Some(ack) = on_ack_tkt_rx.next().await {
939 if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
940 &*multi_strategy_ack_ticket,
941 &ack,
942 )
943 .await
944 {
945 error!(%error, "Failed to process acknowledged winning ticket with the strategy");
946 }
947 }
948 }),
949 );
950
951 let (session_tx, _session_rx) = unbounded::<IncomingSession>();
952
953 #[cfg(feature = "session-server")]
954 {
955 processes.insert(
956 HoprLibProcesses::SessionServer,
957 hopr_async_runtime::spawn_as_abortable!(_session_rx.for_each_concurrent(None, move |session| {
958 let serve_handler = serve_handler.clone();
959 async move {
960 let session_id = *session.session.id();
961 match serve_handler.process(session).await {
962 Ok(_) => debug!(
963 session_id = ?session_id,
964 "Client session processed successfully"
965 ),
966 Err(e) => error!(
967 session_id = ?session_id,
968 error = %e,
969 "Client session processing failed"
970 ),
971 }
972 }
973 })),
974 );
975 }
976
977 for (id, proc) in self
978 .transport_api
979 .run(&self.me_chain, transport_output_tx, indexer_peer_update_rx, session_tx)
980 .await?
981 .into_iter()
982 {
983 processes.insert(id.into(), proc);
984 }
985
986 let db_clone = self.db.clone();
987 processes.insert(
988 HoprLibProcesses::TicketIndexFlush,
989 hopr_async_runtime::spawn_as_abortable!(Box::pin(execute_on_tick(
990 Duration::from_secs(5),
991 move || {
992 let db_clone = db_clone.clone();
993 async move {
994 match db_clone.persist_outgoing_ticket_indices().await {
995 Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
996 Err(e) => error!(error = %e, "Failed to flush ticket indices"),
997 }
998 }
999 },
1000 "flush the states of outgoing ticket indices".into(),
1001 ))),
1002 );
1003
1004 if let Err(e) = self.db.fix_channels_next_ticket_state().await {
1009 error!(error = %e, "failed to fix channels ticket states");
1010 }
1011
1012 let multi_strategy = self.multistrategy.clone();
1016 let strategy_interval = self.cfg.strategy.execution_interval;
1017 processes.insert(
1018 HoprLibProcesses::StrategyTick,
1019 hopr_async_runtime::spawn_as_abortable!(async move {
1020 execute_on_tick(
1021 Duration::from_secs(strategy_interval),
1022 move || {
1023 let multi_strategy = multi_strategy.clone();
1024
1025 async move {
1026 trace!(state = "started", "strategy tick");
1027 let _ = multi_strategy.on_tick().await;
1028 trace!(state = "finished", "strategy tick");
1029 }
1030 },
1031 "run strategies".into(),
1032 )
1033 .await;
1034 }),
1035 );
1036
1037 self.state.store(HoprState::Running, Ordering::Relaxed);
1038
1039 info!(
1040 id = %self.me_peer_id(),
1041 version = constants::APP_VERSION,
1042 "NODE STARTED AND RUNNING"
1043 );
1044
1045 #[cfg(all(feature = "prometheus", not(test)))]
1046 METRIC_HOPR_NODE_INFO.set(
1047 &[
1048 &self.me.public().to_peerid_str(),
1049 &self.me_onchain().to_string(),
1050 &self.cfg.safe_module.safe_address.to_string(),
1051 &self.cfg.safe_module.module_address.to_string(),
1052 ],
1053 1.0,
1054 );
1055
1056 Ok((socket, processes))
1057 }
1058
1059 pub fn me_peer_id(&self) -> PeerId {
1062 (*self.me.public()).into()
1063 }
1064
1065 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
1067 Ok(self.transport_api.get_public_nodes().await?)
1068 }
1069
1070 pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
1072 let indexer_state_info = self.db.get_indexer_state_info(None).await?;
1073
1074 match self.db.get_last_checksummed_log().await? {
1075 Some(log) => {
1076 let checksum = match log.checksum {
1077 Some(checksum) => Hash::from_hex(checksum.as_str())?,
1078 None => Hash::default(),
1079 };
1080 Ok(IndexerStateInfo {
1081 latest_log_block_number: log.block_number as u32,
1082 latest_log_checksum: checksum,
1083 ..indexer_state_info
1084 })
1085 }
1086 None => Ok(indexer_state_info),
1087 }
1088 }
1089
1090 pub async fn is_allowed_to_access_network(
1092 &self,
1093 address_like: either::Either<&PeerId, Address>,
1094 ) -> errors::Result<bool> {
1095 Ok(self.transport_api.is_allowed_to_access_network(address_like).await?)
1096 }
1097
1098 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
1102 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1103
1104 Ok(self.transport_api.ping(peer).await?)
1105 }
1106
/// Establishes a new session towards `destination` for the given `target`.
///
/// Session establishment is retried with a constant, jittered delay taken from
/// `cfg.session.establish_retry_timeout`, at most
/// `cfg.session.establish_max_retries` times.
///
/// # Errors
///
/// Fails if the node is not in the [`HoprState::Running`] state, or with the
/// error of the last attempt once the retries are exhausted.
#[cfg(feature = "session-client")]
pub async fn connect_to(
    &self,
    destination: Address,
    target: SessionTarget,
    cfg: SessionClientConfig,
) -> errors::Result<HoprSession> {
    use backon::Retryable;

    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let retry_policy = backon::ConstantBuilder::default()
        .with_max_times(self.cfg.session.establish_max_retries as usize)
        .with_delay(self.cfg.session.establish_retry_timeout)
        .with_jitter();

    // Every attempt needs its own copies of the target and the session
    // configuration, because `new_session` takes both by value.
    let establish = || {
        let attempt_cfg = cfg.clone();
        let attempt_target = target.clone();
        async {
            self.transport_api
                .new_session(destination, attempt_target, attempt_cfg)
                .await
        }
    };

    let session = establish
        .retry(retry_policy)
        .sleep(backon::FuturesTimerSleeper)
        .await?;

    Ok(session)
}
1134
/// Probes the session with the given id via the transport layer.
///
/// # Errors
///
/// Fails if the node is not in the [`HoprState::Running`] state, or if the
/// transport API returns an error.
#[cfg(feature = "session-client")]
pub async fn keep_alive_session(&self, id: &HoprSessionId) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let probed = self.transport_api.probe_session(id).await?;
    Ok(probed)
}
1142
/// Fetches the SURB balancer configuration of the session with the given id
/// from the transport layer.
///
/// # Errors
///
/// Fails if the node is not in the [`HoprState::Running`] state, or if the
/// transport API returns an error.
#[cfg(feature = "session-client")]
pub async fn get_session_surb_balancer_config(
    &self,
    id: &HoprSessionId,
) -> errors::Result<Option<SurbBalancerConfig>> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let balancer_cfg = self.transport_api.session_surb_balancing_cfg(id).await?;
    Ok(balancer_cfg)
}
1151
/// Passes a new SURB balancer configuration for the session with the given id
/// to the transport layer.
///
/// # Errors
///
/// Fails if the node is not in the [`HoprState::Running`] state, or if the
/// transport API returns an error.
#[cfg(feature = "session-client")]
pub async fn update_session_surb_balancer_config(
    &self,
    id: &HoprSessionId,
    cfg: SurbBalancerConfig,
) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let updated = self.transport_api.update_session_surb_balancing_cfg(id, cfg).await?;
    Ok(updated)
}
1161
1162 #[tracing::instrument(level = "debug", skip(self, msg))]
1170 pub async fn send_message(
1171 &self,
1172 msg: Box<[u8]>,
1173 routing: DestinationRouting,
1174 application_tag: Tag,
1175 ) -> errors::Result<()> {
1176 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1177
1178 self.transport_api.send_message(msg, routing, application_tag).await?;
1179
1180 Ok(())
1181 }
1182
1183 pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
1185 Ok(self.transport_api.aggregate_tickets(channel).await?)
1186 }
1187
1188 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
1190 self.transport_api.local_multiaddresses()
1191 }
1192
1193 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
1195 self.transport_api.listening_multiaddresses().await
1196 }
1197
1198 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
1200 self.transport_api.network_observed_multiaddresses(peer).await
1201 }
1202
1203 pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
1205 let peer = *peer;
1206 let pubkey = match hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await {
1208 Ok(k) => k,
1209 Err(e) => {
1210 error!(%peer, error = %e, "failed to convert peer id to off-chain key");
1211 return vec![];
1212 }
1213 };
1214
1215 match self.db.get_account(None, pubkey).await {
1216 Ok(Some(entry)) => Vec::from_iter(entry.get_multiaddr()),
1217 Ok(None) => {
1218 error!(%peer, "no information");
1219 vec![]
1220 }
1221 Err(e) => {
1222 error!(%peer, error = %e, "failed to retrieve information");
1223 vec![]
1224 }
1225 }
1226 }
1227
1228 pub async fn network_health(&self) -> Health {
1232 self.transport_api.network_health().await
1233 }
1234
1235 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
1237 Ok(self.transport_api.network_connected_peers().await?)
1238 }
1239
1240 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
1242 Ok(self.transport_api.network_peer_info(peer).await?)
1243 }
1244
1245 pub async fn all_network_peers(
1247 &self,
1248 minimum_quality: f64,
1249 ) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
1250 Ok(
1251 futures::stream::iter(self.transport_api.network_connected_peers().await?)
1252 .filter_map(|peer| async move {
1253 if let Ok(Some(info)) = self.transport_api.network_peer_info(&peer).await {
1254 if info.get_average_quality() >= minimum_quality {
1255 Some((peer, info))
1256 } else {
1257 None
1258 }
1259 } else {
1260 None
1261 }
1262 })
1263 .filter_map(|(peer_id, info)| async move {
1264 let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
1265 Some((address, peer_id, info))
1266 })
1267 .collect::<Vec<_>>()
1268 .await,
1269 )
1270 }
1271
1272 pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
1275 Ok(self.transport_api.tickets_in_channel(channel).await?)
1276 }
1277
1278 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
1280 Ok(self.transport_api.all_tickets().await?)
1281 }
1282
1283 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
1285 Ok(self.transport_api.ticket_statistics().await?)
1286 }
1287
1288 pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
1290 Ok(self.db.reset_ticket_statistics().await?)
1291 }
1292
1293 pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
1295 &self.db
1296 }
1297
1298 pub fn me_onchain(&self) -> Address {
1300 self.hopr_chain_api.me_onchain()
1301 }
1302
1303 pub async fn get_ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
1305 Ok(self.hopr_chain_api.ticket_price().await?)
1306 }
1307
1308 pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<WinningProbability> {
1310 Ok(self
1311 .db
1312 .get_indexer_data(None)
1313 .await?
1314 .minimum_incoming_ticket_winning_prob)
1315 }
1316
1317 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
1319 Ok(self.hopr_chain_api.accounts_announced_on_chain().await?)
1320 }
1321
1322 pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
1325 Ok(self.db.get_channel_by_id(None, channel_id).await?)
1326 }
1327
1328 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
1333 Ok(self.hopr_chain_api.channel(src, dest).await?)
1334 }
1335
1336 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
1338 Ok(self.hopr_chain_api.channels_from(src).await?)
1339 }
1340
1341 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
1343 Ok(self.hopr_chain_api.channels_to(dest).await?)
1344 }
1345
1346 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
1348 Ok(self.hopr_chain_api.all_channels().await?)
1349 }
1350
1351 pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
1353 Ok(self.hopr_chain_api.corrupted_channels().await?)
1354 }
1355
1356 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
1358 Ok(self.hopr_chain_api.safe_allowance().await?)
1359 }
1360
1361 pub async fn withdraw_tokens(&self, recipient: Address, amount: HoprBalance) -> errors::Result<Hash> {
1365 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1366
1367 let awaiter = self.hopr_chain_api.actions_ref().withdraw(recipient, amount).await?;
1368
1369 Ok(awaiter.await?.tx_hash)
1370 }
1371
1372 pub async fn withdraw_native(&self, recipient: Address, amount: XDaiBalance) -> errors::Result<Hash> {
1376 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1377
1378 let awaiter = self
1379 .hopr_chain_api
1380 .actions_ref()
1381 .withdraw_native(recipient, amount)
1382 .await?;
1383
1384 Ok(awaiter.await?.tx_hash)
1385 }
1386
1387 pub async fn open_channel(&self, destination: &Address, amount: HoprBalance) -> errors::Result<OpenChannelResult> {
1388 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1389
1390 let awaiter = self
1391 .hopr_chain_api
1392 .actions_ref()
1393 .open_channel(*destination, amount)
1394 .await?;
1395
1396 let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);
1397 Ok(awaiter.await.map(|confirm| OpenChannelResult {
1398 tx_hash: confirm.tx_hash,
1399 channel_id,
1400 })?)
1401 }
1402
1403 pub async fn fund_channel(&self, channel_id: &Hash, amount: HoprBalance) -> errors::Result<Hash> {
1404 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1405
1406 let awaiter = self
1407 .hopr_chain_api
1408 .actions_ref()
1409 .fund_channel(*channel_id, amount)
1410 .await?;
1411
1412 Ok(awaiter.await?.tx_hash)
1413 }
1414
1415 pub async fn close_channel(
1416 &self,
1417 counterparty: &Address,
1418 direction: ChannelDirection,
1419 redeem_before_close: bool,
1420 ) -> errors::Result<CloseChannelResult> {
1421 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1422
1423 let confirmation = self
1424 .hopr_chain_api
1425 .actions_ref()
1426 .close_channel(*counterparty, direction, redeem_before_close)
1427 .await?
1428 .await?;
1429
1430 match confirmation
1431 .event
1432 .expect("channel close action confirmation must have associated chain event")
1433 {
1434 ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
1435 tx_hash: confirmation.tx_hash,
1436 status: c.status, }),
1438 ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
1439 tx_hash: confirmation.tx_hash,
1440 status: ChannelStatus::Closed,
1441 }),
1442 _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
1443 }
1444 }
1445
1446 pub async fn close_channel_by_id(
1447 &self,
1448 channel_id: Hash,
1449 redeem_before_close: bool,
1450 ) -> errors::Result<CloseChannelResult> {
1451 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1452
1453 match self.channel_from_hash(&channel_id).await? {
1454 Some(channel) => match channel.orientation(&self.me_onchain()) {
1455 Some((direction, counterparty)) => {
1456 self.close_channel(&counterparty, direction, redeem_before_close).await
1457 }
1458 None => Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
1459 "cannot close channel that is not own".into(),
1460 ))),
1461 },
1462 None => Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
1463 }
1464 }
1465
1466 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
1467 Ok(self.hopr_chain_api.get_channel_closure_notice_period().await?)
1468 }
1469
1470 pub async fn redeem_all_tickets<B: Into<HoprBalance>>(
1471 &self,
1472 min_value: B,
1473 only_aggregated: bool,
1474 ) -> errors::Result<()> {
1475 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1476
1477 self.hopr_chain_api
1479 .actions_ref()
1480 .redeem_all_tickets(min_value.into(), only_aggregated)
1481 .await?;
1482
1483 Ok(())
1484 }
1485
1486 pub async fn redeem_tickets_with_counterparty<B: Into<HoprBalance>>(
1487 &self,
1488 counterparty: &Address,
1489 min_value: B,
1490 only_aggregated: bool,
1491 ) -> errors::Result<()> {
1492 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1493
1494 let _ = self
1496 .hopr_chain_api
1497 .actions_ref()
1498 .redeem_tickets_with_counterparty(counterparty, min_value.into(), only_aggregated)
1499 .await?;
1500
1501 Ok(())
1502 }
1503
1504 pub async fn redeem_tickets_in_channel<B: Into<HoprBalance>>(
1505 &self,
1506 channel_id: &Hash,
1507 min_value: B,
1508 only_aggregated: bool,
1509 ) -> errors::Result<usize> {
1510 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1511
1512 let channel = self.db.get_channel_by_id(None, channel_id).await?;
1513 let mut redeem_count = 0;
1514
1515 if let Some(channel) = channel {
1516 if channel.destination == self.hopr_chain_api.me_onchain() {
1517 redeem_count = self
1519 .hopr_chain_api
1520 .actions_ref()
1521 .redeem_tickets_in_channel(&channel, min_value.into(), only_aggregated)
1522 .await?
1523 .len();
1524 }
1525 }
1526
1527 Ok(redeem_count)
1528 }
1529
1530 pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
1531 self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;
1532
1533 #[allow(clippy::let_underscore_future)]
1535 let _ = self.hopr_chain_api.actions_ref().redeem_ticket(ack_ticket).await?;
1536
1537 Ok(())
1538 }
1539
1540 pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
1541 let peer_id = *peer_id;
1542 let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer_id))
1544 .await
1545 .map_err(|e| HoprLibError::GeneralError(format!("failed to convert peer id to off-chain key: {}", e)))?;
1546 Ok(self.db.resolve_chain_key(&pubkey).await?)
1547 }
1548
1549 pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
1550 Ok(self
1551 .db
1552 .resolve_packet_key(address)
1553 .await
1554 .map(|pk| pk.map(|v| v.into()))?)
1555 }
1556
1557 pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
1558 self.channel_graph.read_arc().await.as_dot(cfg)
1559 }
1560
1561 pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
1562 let cg = self.channel_graph.read_arc().await;
1563 serde_json::to_string(cg.deref()).map_err(|e| HoprLibError::GeneralError(e.to_string()))
1564 }
1565}