pub mod config;
pub mod constants;
pub mod errors;
use async_lock::RwLock;
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
Stream, StreamExt,
};
use std::ops::Deref;
use std::{
collections::HashMap,
fmt::{Display, Formatter},
path::PathBuf,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use tracing::{debug, error, info, trace, warn};
use errors::{HoprLibError, HoprStatusError};
use hopr_async_runtime::prelude::{sleep, spawn, JoinHandle};
use hopr_chain_actions::{
action_state::{ActionState, IndexerActionTracker},
channels::ChannelActions,
node::NodeActions,
redeem::TicketRedeemActions,
};
use hopr_chain_api::{
can_register_with_safe, config::ChainNetworkConfig, errors::HoprChainError, wait_for_funds, HoprChain,
HoprChainProcess, SignificantChainEvent,
};
use hopr_chain_rpc::HoprRpcOperations;
use hopr_chain_types::chain_events::ChainEventType;
use hopr_chain_types::ContractAddresses;
use hopr_crypto_types::prelude::OffchainPublicKey;
use hopr_db_api::logs::HoprDbLogOperations;
use hopr_db_sql::{
accounts::HoprDbAccountOperations,
api::{info::SafeInfo, resolver::HoprDbResolverOperations, tickets::HoprDbTicketOperations},
channels::HoprDbChannelOperations,
db::{HoprDb, HoprDbConfig},
info::{HoprDbInfoOperations, IndexerStateInfo},
prelude::{ChainOrPacketKey::ChainKey, DbSqlError, HoprDbPeersOperations},
HoprDbAllOperations, HoprDbGeneralModelOperations,
};
use hopr_path::channel_graph::ChannelGraph;
use hopr_platform::file::native::{join, remove_dir_all};
use hopr_strategy::strategy::{MultiStrategy, SingularStrategy};
use hopr_transport::{
execute_on_tick, ChainKeypair, Hash, HoprTransport, HoprTransportConfig, HoprTransportProcess, IncomingSession,
OffchainKeypair, PeerDiscovery, PeerStatus,
};
pub use {
hopr_chain_actions::errors::ChainActionsError,
hopr_chain_api::config::{
Addresses as NetworkContractAddresses, EnvironmentType, Network as ChainNetwork, ProtocolsConfig,
},
hopr_internal_types::prelude::*,
hopr_network_types::prelude::{IpProtocol, RoutingOptions},
hopr_path::channel_graph::GraphExportConfig,
hopr_primitive_types::prelude::*,
hopr_strategy::Strategy,
hopr_transport::{
config::{looks_like_domain, HostConfig, HostType},
constants::RESERVED_TAG_UPPER_LIMIT,
errors::{HoprTransportError, NetworkingError, ProtocolError},
ApplicationData, HalfKeyChallenge, Health, IncomingSession as HoprIncomingSession, Keypair, Multiaddr,
OffchainKeypair as HoprOffchainKeypair, PeerId, SendMsg, ServiceId, Session as HoprSession, SessionCapability,
SessionClientConfig, SessionId as HoprSessionId, SessionTarget, TicketStatistics, SESSION_USABLE_MTU_SIZE,
},
};
#[cfg(feature = "runtime-tokio")]
pub use hopr_transport::transfer_session;
use crate::config::SafeModule;
use crate::constants::{MIN_NATIVE_BALANCE, ONBOARDING_INFORMATION_INTERVAL, SUGGESTED_NATIVE_BALANCE};
#[cfg(all(feature = "prometheus", not(test)))]
use {
hopr_metrics::metrics::{MultiGauge, SimpleGauge},
hopr_platform::time::native::current_time,
std::str::FromStr,
};
// Process-wide metrics. They are compiled in only when the `prometheus` feature is enabled
// and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
// Gauge `hopr_up`; `Hopr::new` sets it to the current unix timestamp.
static ref METRIC_PROCESS_START_TIME: SimpleGauge = SimpleGauge::new(
"hopr_up",
"The unix timestamp in seconds at which the process was started"
).unwrap();
// Multi-gauge `hopr_lib_version` labelled by `version`; `Hopr::new` sets it to `<major>.<minor>`.
static ref METRIC_HOPR_LIB_VERSION: MultiGauge = MultiGauge::new(
"hopr_lib_version",
"Executed version of hopr-lib",
&["version"]
).unwrap();
// Multi-gauge `hopr_node_addresses`; `Hopr::run` sets it to 1.0 for the node's
// peer id, chain address, safe address and module address once the node is running.
static ref METRIC_HOPR_NODE_INFO: MultiGauge = MultiGauge::new(
"hopr_node_addresses",
"Node on-chain and off-chain addresses",
&["peerid", "address", "safe_address", "module_address"]
).unwrap();
}
pub use async_trait::async_trait;
/// Handler of incoming sessions, available only with the `session-server` feature.
///
/// `Hopr::run` clones the supplied handler for every incoming session and calls
/// [`HoprSessionReactor::process`] on it; sessions are processed concurrently.
#[cfg(feature = "session-server")]
#[async_trait::async_trait]
pub trait HoprSessionReactor {
/// Processes a single incoming `session`.
///
/// An `Err` result is only logged by `Hopr::run`; it does not stop the session server.
async fn process(&self, session: HoprIncomingSession) -> errors::Result<()>;
}
/// Lifecycle state of the node as reported by `Hopr::status`.
///
/// The `atomic_enum` attribute is expected to generate the `AtomicHoprState` type
/// that `Hopr` uses to store the state.
#[atomic_enum::atomic_enum]
#[derive(PartialEq, Eq)]
pub enum HoprState {
// Initial state assigned by `Hopr::new`.
Uninitialized = 0,
// Set by `Hopr::run` once the node has been funded.
Initializing = 1,
// Set by `Hopr::run` after the chain and packet keys were linked in the DB.
Indexing = 2,
// Not assigned anywhere in this file.
Starting = 3,
// Set by `Hopr::run` after all processes have been spawned.
Running = 4,
}
/// Displays the state using its `Debug` representation (i.e. the variant name).
impl Display for HoprState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{self:?}"))
    }
}
/// Result of a successful `Hopr::open_channel` call.
pub struct OpenChannelResult {
/// Hash of the confirmed transaction.
pub tx_hash: Hash,
/// Channel ID generated from this node's chain address and the destination address.
pub channel_id: Hash,
}
/// Result of a successful `Hopr::close_channel` call.
pub struct CloseChannelResult {
/// Hash of the confirmed transaction.
pub tx_hash: Hash,
/// Channel status after the close action: the status carried by the closure-initiated
/// event, or `ChannelStatus::Closed` when the channel was closed.
pub status: ChannelStatus,
}
/// Identifiers of the long-running processes whose join handles are returned by `Hopr::run`.
///
/// The `strum` `to_string` attributes define the `Display` text of each variant.
#[derive(Debug, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum HoprLibProcesses {
/// A process started by the transport layer.
#[strum(to_string = "transport: {0}")]
Transport(HoprTransportProcess),
/// Task feeding incoming sessions to the `HoprSessionReactor` (feature `session-server`).
#[cfg(feature = "session-server")]
#[strum(to_string = "session server providing the exit node session stream functionality")]
SessionServer,
/// Periodic task invoking `on_tick` on the strategies.
#[strum(to_string = "tick wake up the strategies to perform an action")]
StrategyTick,
/// The chain indexer process; the only variant for which `can_finish` returns `true`.
#[strum(to_string = "initial indexing operation into the DB")]
Indexing,
/// Not inserted into the process map anywhere in this file.
#[strum(to_string = "processing of indexed operations in internal components")]
IndexReflection,
/// The outgoing on-chain action queue of the chain component.
#[strum(to_string = "on-chain transaction queue component for outgoing transactions")]
OutgoingOnchainActionQueue,
/// Periodic task persisting outgoing ticket indices to the DB.
#[strum(to_string = "flush operation of outgoing ticket indices to the DB")]
TicketIndexFlush,
/// Task passing acknowledged winning tickets to the strategies.
#[strum(to_string = "on received ack ticket trigger")]
OnReceivedAcknowledgement,
}
impl HoprLibProcesses {
pub fn can_finish(&self) -> bool {
matches!(self, HoprLibProcesses::Indexing)
}
}
impl From<HoprTransportProcess> for HoprLibProcesses {
fn from(value: HoprTransportProcess) -> Self {
HoprLibProcesses::Transport(value)
}
}
#[allow(clippy::too_many_arguments)]
pub async fn chain_events_to_transport_events<StreamIn, Db>(
event_stream: StreamIn,
me_onchain: Address,
db: Db,
multi_strategy: Arc<MultiStrategy>,
channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
indexer_action_tracker: Arc<IndexerActionTracker>,
) -> impl Stream<Item = PeerDiscovery> + Send + 'static
where
Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
StreamIn: Stream<Item = SignificantChainEvent> + Send + 'static,
{
Box::pin(event_stream.filter_map(move |event| {
let db = db.clone();
let multi_strategy = multi_strategy.clone();
let channel_graph = channel_graph.clone();
let indexer_action_tracker = indexer_action_tracker.clone();
async move {
let resolved = indexer_action_tracker.match_and_resolve(&event).await;
debug!(count = resolved.len(), event = %event, "resolved indexer expectations", );
match event.event_type {
ChainEventType::Announcement{peer, multiaddresses, ..} => {
Some(PeerDiscovery::Announce(peer, multiaddresses))
}
ChainEventType::ChannelOpened(channel) |
ChainEventType::ChannelClosureInitiated(channel) |
ChainEventType::ChannelClosed(channel) |
ChainEventType::ChannelBalanceIncreased(channel, _) | ChainEventType::ChannelBalanceDecreased(channel, _) | ChainEventType::TicketRedeemed(channel, _) => { let maybe_direction = channel.direction(&me_onchain);
let change = channel_graph
.write()
.await
.update_channel(channel);
if let Some(own_channel_direction) = maybe_direction {
if let Some(change_set) = change {
for channel_change in change_set {
let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
&*multi_strategy,
&channel,
own_channel_direction,
channel_change,
)
.await;
}
} else if channel.status == ChannelStatus::Open {
let _ = hopr_strategy::strategy::SingularStrategy::on_own_channel_changed(
&*multi_strategy,
&channel,
own_channel_direction,
ChannelChange::Status {
left: ChannelStatus::Closed,
right: ChannelStatus::Open,
},
)
.await;
}
}
None
}
ChainEventType::NetworkRegistryUpdate(address, allowed) => {
let packet_key = db.translate_key(None, address).await;
match packet_key {
Ok(pk) => {
if let Some(pk) = pk {
let offchain_key: Result<OffchainPublicKey, _> = pk.try_into();
if let Ok(offchain_key) = offchain_key {
let peer_id = offchain_key.into();
let res = match allowed {
hopr_chain_types::chain_events::NetworkRegistryStatus::Allowed => PeerDiscovery::Allow(peer_id),
hopr_chain_types::chain_events::NetworkRegistryStatus::Denied => PeerDiscovery::Ban(peer_id),
};
Some(res)
} else {
error!("Failed to unwrap as offchain key at this point");
None
}
} else {
None
}
}
Err(e) => {
error!(error = %e, "on_network_registry_node_allowed failed");
None
},
}
}
ChainEventType::NodeSafeRegistered(safe_address) => {
info!(%safe_address, "Node safe registered");
None
}
}
}
}))
}
/// Pair of unbounded channel ends carrying [`ApplicationData`].
///
/// `Hopr::run` passes the writer to the transport layer and returns the socket to the caller.
pub struct HoprSocket {
/// Receiving end, handed out by `reader`.
rx: UnboundedReceiver<ApplicationData>,
/// Sending end, cloned by `writer`.
tx: UnboundedSender<ApplicationData>,
}
/// Creates a socket backed by a fresh unbounded channel.
impl Default for HoprSocket {
    fn default() -> Self {
        let (sender, receiver) = unbounded::<ApplicationData>();
        Self {
            rx: receiver,
            tx: sender,
        }
    }
}
impl HoprSocket {
    /// Creates a new socket; equivalent to [`HoprSocket::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Consumes the socket and returns its receiving end.
    pub fn reader(self) -> UnboundedReceiver<ApplicationData> {
        let Self { rx, .. } = self;
        rx
    }

    /// Returns a clone of the sending end.
    pub fn writer(&self) -> UnboundedSender<ApplicationData> {
        let sender = &self.tx;
        sender.clone()
    }
}
/// The HOPR node: bundles the transport, chain and database components together with the
/// node configuration and identity.
pub struct Hopr {
/// Off-chain (packet) keypair of this node.
me: OffchainKeypair,
/// On-chain keypair of this node.
me_chain: ChainKeypair,
/// Library configuration the node was created with.
cfg: config::HoprLibConfig,
/// Current lifecycle state, see `HoprState`.
state: Arc<AtomicHoprState>,
/// Transport layer component.
transport_api: HoprTransport<HoprDb>,
/// Chain component (RPC, indexer, on-chain actions).
hopr_chain_api: HoprChain<HoprDb>,
/// Node database.
db: HoprDb,
/// Chain network configuration resolved in `new`.
chain_cfg: ChainNetworkConfig,
/// Channel graph shared with the transport layer.
channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
/// Strategies configured for this node.
multistrategy: Arc<MultiStrategy>,
/// Receiving end of the channel whose sender was handed to the chain component in `new`.
rx_indexer_significant_events: async_channel::Receiver<SignificantChainEvent>,
}
impl Hopr {
/// Creates a new node from the configuration and the off-chain and on-chain keypairs.
///
/// The constructor prepares the database directory under `cfg.db.data`, opens the database,
/// resolves the chain network configuration and creates the transport, chain and strategy
/// components. Nothing is started here; see `run`.
///
/// Note that the database is opened (and, with the `prometheus` feature, ticket statistics
/// are read) via `futures::executor::block_on`, i.e. this function blocks the calling thread.
///
/// # Errors
///
/// Returns an error when the host configuration cannot be converted to a multiaddress,
/// when the DB directory cannot be removed (force initialization) or its parent created,
/// when the DB cannot be opened, or when the chain network configuration cannot be resolved.
pub fn new(
mut cfg: config::HoprLibConfig,
me: &OffchainKeypair,
me_onchain: &ChainKeypair,
) -> crate::errors::Result<Self> {
let multiaddress: Multiaddr = (&cfg.host).try_into()?;
// The database lives in the `db` subdirectory of the configured data directory.
let db_path: PathBuf = [&cfg.db.data, "db"].iter().collect();
info!(path = ?db_path, "Initiating DB");
if cfg.db.force_initialize {
info!("Force cleaning up existing database");
remove_dir_all(db_path.as_path()).map_err(|e| {
HoprLibError::GeneralError(format!(
"Failed to remove the existing DB directory at '{db_path:?}': {e}"
))
})?;
// A forced initialization implies that the DB must be created afterwards.
cfg.db.initialize = true
}
// Make sure the parent directory of the DB exists.
if let Some(parent_dir_path) = db_path.as_path().parent() {
if !parent_dir_path.is_dir() {
std::fs::create_dir_all(parent_dir_path).map_err(|e| {
HoprLibError::GeneralError(format!(
"Failed to create DB parent directory at '{parent_dir_path:?}': {e}"
))
})?
}
}
let db_cfg = HoprDbConfig {
create_if_missing: cfg.db.initialize,
force_create: cfg.db.force_initialize,
log_slow_queries: std::time::Duration::from_millis(150),
};
// Blocks the current thread until the DB is opened.
let db = futures::executor::block_on(HoprDb::new(db_path.as_path(), me_onchain.clone(), db_cfg))?;
if let Some(provider) = &cfg.chain.provider {
info!(provider, "Creating chain components using the custom provider");
} else {
info!("Creating chain components using the default provider");
}
let resolved_environment = hopr_chain_api::config::ChainNetworkConfig::new(
&cfg.chain.network,
crate::constants::APP_VERSION_COERCED,
cfg.chain.provider.as_deref(),
cfg.chain.max_rpc_requests_per_sec,
&mut cfg.chain.protocols,
)
.map_err(|e| HoprLibError::GeneralError(format!("Failed to resolve blockchain environment: {e}")))?;
// Used only for the log record below.
let contract_addresses = ContractAddresses::from(&resolved_environment);
info!(
myself = me_onchain.public().to_hex(),
contract_addresses = ?contract_addresses,
"Resolved contract addresses",
);
let my_multiaddresses = vec![multiaddress];
// The sender goes to the chain component; the receiver is kept and consumed in `run`.
let (tx_indexer_events, rx_indexer_events) = async_channel::unbounded::<SignificantChainEvent>();
let channel_graph = Arc::new(RwLock::new(ChannelGraph::new(me_onchain.public().to_address())));
let hopr_transport_api = HoprTransport::new(
me,
HoprTransportConfig {
transport: cfg.transport.clone(),
network: cfg.network_options.clone(),
protocol: cfg.protocol,
heartbeat: cfg.heartbeat,
session: cfg.session,
},
db.clone(),
channel_graph.clone(),
my_multiaddresses,
);
let hopr_hopr_chain_api = hopr_chain_api::HoprChain::new(
me_onchain.clone(),
db.clone(),
resolved_environment.clone(),
cfg.safe_module.module_address,
ContractAddresses {
announcements: resolved_environment.announcements,
channels: resolved_environment.channels,
token: resolved_environment.token,
price_oracle: resolved_environment.ticket_price_oracle,
win_prob_oracle: resolved_environment.winning_probability_oracle,
network_registry: resolved_environment.network_registry,
network_registry_proxy: resolved_environment.network_registry_proxy,
stake_factory: resolved_environment.node_stake_v2_factory,
safe_registry: resolved_environment.node_safe_registry,
module_implementation: resolved_environment.module_implementation,
},
cfg.safe_module.safe_address,
hopr_chain_indexer::IndexerConfig {
start_block_number: resolved_environment.channel_contract_deploy_block as u64,
fast_sync: cfg.chain.fast_sync,
},
tx_indexer_events,
);
let multi_strategy = Arc::new(MultiStrategy::new(
cfg.strategy.clone(),
db.clone(),
hopr_hopr_chain_api.actions_ref().clone(),
hopr_transport_api.ticket_aggregator(),
));
debug!(
strategies = tracing::field::debug(&multi_strategy),
"Initialized strategies"
);
#[cfg(all(feature = "prometheus", not(test)))]
{
METRIC_PROCESS_START_TIME.set(current_time().as_unix_timestamp().as_secs_f64());
// The gauge value is `<major>.<minor>` parsed as a float, or 0.0 if parsing fails.
METRIC_HOPR_LIB_VERSION.set(
&[const_format::formatcp!("{}", constants::APP_VERSION)],
f64::from_str(const_format::formatcp!(
"{}.{}",
env!("CARGO_PKG_VERSION_MAJOR"),
env!("CARGO_PKG_VERSION_MINOR")
))
.unwrap_or(0.0),
);
// The statistics themselves are discarded; only a failure is logged.
if let Err(e) = futures::executor::block_on(db.get_ticket_statistics(None)) {
error!(error = %e, "Failed to initialize ticket statistics metrics");
}
}
Ok(Self {
me: me.clone(),
me_chain: me_onchain.clone(),
cfg,
state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
transport_api: hopr_transport_api,
hopr_chain_api: hopr_hopr_chain_api,
db,
chain_cfg: resolved_environment,
channel_graph,
multistrategy: multi_strategy,
rx_indexer_significant_events: rx_indexer_events,
})
}
fn error_if_not_in_state(&self, state: HoprState, error: String) -> errors::Result<()> {
if self.status() == state {
Ok(())
} else {
Err(HoprLibError::StatusError(HoprStatusError::NotThereYet(state, error)))
}
}
/// Returns the current lifecycle state of the node (relaxed atomic load).
pub fn status(&self) -> HoprState {
    let state = &self.state;
    state.load(Ordering::Relaxed)
}
/// Returns `constants::APP_VERSION_COERCED` as an owned string.
pub fn version_coerced(&self) -> String {
    constants::APP_VERSION_COERCED.into()
}
/// Returns `constants::APP_VERSION` as an owned string.
pub fn version(&self) -> String {
    constants::APP_VERSION.into()
}
/// Returns the name of the configured chain network.
pub fn network(&self) -> String {
    let chain = &self.cfg.chain;
    chain.network.clone()
}
/// Queries the chain component for this node's balance of the given `balance_type`.
pub async fn get_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
    let balance = self.hopr_chain_api.get_balance(balance_type).await?;
    Ok(balance)
}
/// Queries the chain component for the eligibility status of this node.
pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
    let eligible = self.hopr_chain_api.get_eligibility_status().await?;
    Ok(eligible)
}
/// Queries the chain component for the balance of the configured safe.
///
/// For `BalanceType::HOPR` the value stored in the DB is additionally compared with the
/// on-chain value inside a DB transaction; on mismatch a warning is logged and the DB
/// value is overwritten with the on-chain one.
pub async fn get_safe_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
    let safe_address = self.cfg.safe_module.safe_address;
    let safe_balance = self
        .hopr_chain_api
        .get_safe_balance(safe_address, balance_type)
        .await?;

    // Only the HOPR token balance is mirrored in the DB.
    if balance_type != BalanceType::HOPR {
        return Ok(safe_balance);
    }

    let db = self.db.clone();
    let transaction = self.db.begin_transaction().await?;
    transaction
        .perform(|tx| {
            Box::pin(async move {
                let db_safe_balance = db.get_safe_hopr_balance(Some(tx)).await?;
                if safe_balance != db_safe_balance {
                    warn!(
                        %db_safe_balance,
                        %safe_balance,
                        "Safe balance in the DB mismatches on chain balance"
                    );
                    db.set_safe_hopr_balance(Some(tx), safe_balance).await?;
                }
                Ok::<_, DbSqlError>(())
            })
        })
        .await?;

    Ok(safe_balance)
}
/// Returns a copy of the safe/module configuration of this node.
pub fn get_safe_config(&self) -> SafeModule {
    let safe_module = &self.cfg.safe_module;
    safe_module.clone()
}
/// Returns a copy of the chain network configuration resolved at construction time.
pub fn chain_config(&self) -> ChainNetworkConfig {
    let resolved = &self.chain_cfg;
    resolved.clone()
}
/// Returns the RPC provider in use: the custom provider from the configuration if set,
/// otherwise the default provider of the resolved chain network.
pub fn get_provider(&self) -> String {
    self.cfg
        .chain
        .provider
        .clone()
        // Lazily evaluated, so the default provider is cloned only when it is actually needed.
        .unwrap_or_else(|| self.chain_cfg.chain.default_provider.clone())
}
/// Returns the `announce` flag of the chain configuration; `run` announces the node
/// on chain only when it is set.
#[inline]
fn is_public(&self) -> bool {
    let chain = &self.cfg.chain;
    chain.announce
}
/// Starts the node and all of its background processes.
///
/// The start-up sequence is:
/// 1. wait until the node's chain address is funded,
/// 2. link the chain and packet keys in the DB,
/// 3. wire the indexer events into the transport layer and start the chain processes,
/// 4. wait until the node is allowed to access the network,
/// 5. verify the safe/module configuration and register the safe if possible,
/// 6. optionally announce the node on chain,
/// 7. restore the channel graph from the DB,
/// 8. spawn the ticket, session, transport, ticket-index-flush and strategy processes.
///
/// With the `session-server` feature, `serve_handler` processes the incoming sessions.
///
/// Returns the [`HoprSocket`] whose writer was handed to the transport layer, and the join
/// handles of the spawned processes keyed by [`HoprLibProcesses`].
///
/// # Errors
///
/// Fails if the node is not in the `Uninitialized` state, if the native balance is not
/// above the minimum, if the safe/module configuration check does not pass, if the node's
/// own address is configured as the safe address, or if any of the underlying DB, chain or
/// transport operations fail.
pub async fn run<#[cfg(feature = "session-server")] T: HoprSessionReactor + Clone + Send + 'static>(
&self,
#[cfg(feature = "session-server")] serve_handler: T,
) -> errors::Result<(HoprSocket, HashMap<HoprLibProcesses, JoinHandle<()>>)> {
self.error_if_not_in_state(
HoprState::Uninitialized,
"Cannot start the hopr node multiple times".into(),
)?;
info!(
address = %self.me_onchain(), minimum_balance = %Balance::new_from_str(SUGGESTED_NATIVE_BALANCE, BalanceType::Native),
"Node is not started, please fund this node",
);
let mut processes: HashMap<HoprLibProcesses, JoinHandle<()>> = HashMap::new();
// Step 1: wait for the node's chain address to hold the minimum native balance.
wait_for_funds(
self.me_onchain(),
Balance::new_from_str(MIN_NATIVE_BALANCE, BalanceType::Native),
Duration::from_secs(200),
self.hopr_chain_api.rpc(),
)
.await?;
info!("Starting the node...");
self.state.store(HoprState::Initializing, Ordering::Relaxed);
let balance = self.get_balance(BalanceType::Native).await?;
let minimum_balance = Balance::new_from_str(constants::MIN_NATIVE_BALANCE, BalanceType::Native);
info!(
address = %self.hopr_chain_api.me_onchain(),
%balance,
%minimum_balance,
"Node information"
);
// NOTE(review): a balance exactly equal to the minimum is rejected here (`le`).
if balance.le(&minimum_balance) {
return Err(HoprLibError::GeneralError(
"Cannot start the node without a sufficiently funded wallet".to_string(),
));
}
// Step 2: store the mapping between this node's packet key and chain address.
info!("Linking chain and packet keys");
self.db
.insert_account(
None,
AccountEntry {
public_key: *self.me.public(),
chain_addr: self.hopr_chain_api.me_onchain(),
entry_type: AccountType::NotAnnounced,
},
)
.await?;
self.state.store(HoprState::Indexing, Ordering::Relaxed);
// Step 3: translate indexer events into peer discovery events for the transport layer.
let (indexer_peer_update_tx, indexer_peer_update_rx) = futures::channel::mpsc::unbounded::<PeerDiscovery>();
let indexer_event_pipeline = chain_events_to_transport_events(
self.rx_indexer_significant_events.clone(),
self.me_onchain(),
self.db.clone(),
self.multistrategy.clone(),
self.channel_graph.clone(),
self.hopr_chain_api.action_state(),
)
.await;
// This task is not tracked in `processes`; it panics if forwarding into the channel fails.
spawn(async move {
indexer_event_pipeline
.map(Ok)
.forward(indexer_peer_update_tx)
.await
.expect("The index to transport event chain failed");
});
info!("Start the chain process and sync the indexer");
for (id, proc) in self.hopr_chain_api.start().await?.into_iter() {
let nid = match id {
HoprChainProcess::Indexer => HoprLibProcesses::Indexing,
HoprChainProcess::OutgoingOnchainActionQueue => HoprLibProcesses::OutgoingOnchainActionQueue,
};
processes.insert(nid, proc);
}
// Step 4: keep printing onboarding information until the node is allowed in the network.
// An error from the check is treated the same as "not allowed".
{
let my_ethereum_address = self.me_onchain().to_hex();
let my_peer_id = self.me_peer_id();
let my_version = crate::constants::APP_VERSION;
while !self.is_allowed_to_access_network(&my_peer_id).await.unwrap_or(false) {
info!("Once you become eligible to join the HOPR network, you can continue your onboarding by using the following URL: https://hub.hoprnet.org/staking/onboarding?HOPRdNodeAddressForOnboarding={my_ethereum_address}, or by manually entering the node address of your node on https://hub.hoprnet.org/.");
sleep(ONBOARDING_INFORMATION_INTERVAL).await;
info!(peer_id = %my_peer_id, address = %my_ethereum_address, version = &my_version, "Node information");
info!("Node Ethereum address: {my_ethereum_address} <- put this into staking hub");
}
}
// Step 5: the safe and module must be configured correctly before continuing.
let safe_module_configuration = self
.hopr_chain_api
.rpc()
.check_node_safe_module_status(self.me_onchain())
.await
.map_err(HoprChainError::Rpc)?;
if !safe_module_configuration.should_pass() {
error!(
?safe_module_configuration,
"Something is wrong with the safe module configuration",
);
return Err(HoprLibError::ChainApi(HoprChainError::Api(format!(
"Safe and module are not configured correctly {:?}",
safe_module_configuration,
))));
}
if can_register_with_safe(
self.me_onchain(),
self.cfg.safe_module.safe_address,
self.hopr_chain_api.rpc(),
)
.await?
{
info!("Registering safe by node");
if self.me_onchain() == self.cfg.safe_module.safe_address {
return Err(HoprLibError::GeneralError("cannot self as staking safe address".into()));
}
// A failed confirmation of the registration is logged only; start-up continues.
if let Err(e) = self
.hopr_chain_api
.actions_ref()
.register_safe_by_node(self.cfg.safe_module.safe_address)
.await?
.await
{
error!(error = %e, "Failed to register node with safe")
}
}
self.db
.set_safe_info(
None,
SafeInfo {
safe_address: self.cfg.safe_module.safe_address,
module_address: self.cfg.safe_module.module_address,
},
)
.await?;
// Step 6: announce the node on chain if configured; failures are logged only.
if self.is_public() {
let multiaddresses_to_announce = self.transport_api.announceable_multiaddresses();
match self
.hopr_chain_api
.actions_ref()
.announce(&multiaddresses_to_announce, &self.me)
.await
{
Ok(_) => info!(?multiaddresses_to_announce, "Announcing node on chain",),
Err(ChainActionsError::AlreadyAnnounced) => {
info!(multiaddresses_announced = ?multiaddresses_to_announce, "Node already announced on chain", )
}
Err(e) => error!(error = %e, "Failed to transmit node announcement"),
}
}
// Step 7: restore the channel graph from the DB. The write lock is held for the whole block.
{
let channel_graph = self.channel_graph.clone();
let mut cg = channel_graph.write().await;
info!("Syncing channels from the previous runs");
let mut channel_stream = self
.db
.stream_active_channels()
.await
.map_err(hopr_db_sql::api::errors::DbError::from)?;
while let Some(maybe_channel) = channel_stream.next().await {
match maybe_channel {
Ok(channel) => {
cg.update_channel(channel);
}
Err(error) => error!(%error, "Failed to sync channel into the graph"),
}
}
info!("Syncing peer qualities from the previous runs");
let mut peer_stream = self
.db
.get_network_peers(Default::default(), false)
.await
.map_err(hopr_db_sql::api::errors::DbError::from)?;
while let Some(peer) = peer_stream.next().await {
if let Some(ChainKey(key)) = self.db.translate_key(None, peer.id.0).await? {
cg.update_node_quality(&key, peer.get_quality());
} else {
error!(peer = %peer.id.1, "Could not translate peer information");
}
}
info!(
channels = cg.count_channels(),
nodes = cg.count_nodes(),
"Channel graph sync complete"
);
}
// Step 8: spawn the remaining processes.
let socket = HoprSocket::new();
let transport_output_tx = socket.writer();
// Acknowledged winning tickets emitted by the DB are passed on to the strategies.
let multi_strategy_ack_ticket = self.multistrategy.clone();
let (on_ack_tkt_tx, mut on_ack_tkt_rx) = unbounded::<AcknowledgedTicket>();
self.db.start_ticket_processing(Some(on_ack_tkt_tx))?;
processes.insert(
HoprLibProcesses::OnReceivedAcknowledgement,
spawn(async move {
while let Some(ack) = on_ack_tkt_rx.next().await {
if let Err(error) = hopr_strategy::strategy::SingularStrategy::on_acknowledged_winning_ticket(
&*multi_strategy_ack_ticket,
&ack,
)
.await
{
error!(%error, "Failed to process acknowledged winning ticket with the strategy");
}
}
}),
);
// Without the `session-server` feature the receiving end is unused and dropped.
let (session_tx, _session_rx) = unbounded::<IncomingSession>();
#[cfg(feature = "session-server")]
{
processes.insert(
HoprLibProcesses::SessionServer,
spawn(_session_rx.for_each_concurrent(None, move |session| {
let serve_handler = serve_handler.clone();
async move {
let session_id = *session.session.id();
match serve_handler.process(session).await {
Ok(_) => debug!(
session_id = ?session_id,
"Client session processed successfully"
),
Err(e) => error!(
session_id = ?session_id,
error = %e,
"Client session processing failed"
),
}
}
})),
);
}
// Start the transport layer; its processes are registered under `HoprLibProcesses::Transport`.
for (id, proc) in self
.transport_api
.run(
&self.me_chain,
String::from(constants::APP_VERSION),
join(&[&self.cfg.db.data, "tbf"])
.map_err(|e| HoprLibError::GeneralError(format!("Failed to construct the bloom filter: {e}")))?,
transport_output_tx,
indexer_peer_update_rx,
session_tx,
)
.await?
.into_iter()
{
processes.insert(id.into(), proc);
}
// Persist the outgoing ticket indices every 5 seconds.
let db_clone = self.db.clone();
processes.insert(
HoprLibProcesses::TicketIndexFlush,
spawn(Box::pin(execute_on_tick(
Duration::from_secs(5),
move || {
let db_clone = db_clone.clone();
async move {
match db_clone.persist_outgoing_ticket_indices().await {
Ok(n) => debug!(count = n, "Successfully flushed states of outgoing ticket indices"),
Err(e) => error!(error = %e, "Failed to flush ticket indices"),
}
}
},
"flush the states of outgoing ticket indices".into(),
))),
);
// Tick the strategies in the configured interval (interpreted as seconds); tick results are ignored.
let multi_strategy = self.multistrategy.clone();
let strategy_interval = self.cfg.strategy.execution_interval;
processes.insert(
HoprLibProcesses::StrategyTick,
spawn(async move {
execute_on_tick(
Duration::from_secs(strategy_interval),
move || {
let multi_strategy = multi_strategy.clone();
async move {
trace!(state = "started", "strategy tick");
let _ = multi_strategy.on_tick().await;
trace!(state = "finished", "strategy tick");
}
},
"run strategies".into(),
)
.await;
}),
);
self.state.store(HoprState::Running, Ordering::Relaxed);
info!(
id = %self.me_peer_id(),
version = constants::APP_VERSION,
"NODE STARTED AND RUNNING"
);
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_HOPR_NODE_INFO.set(
&[
&self.me.public().to_peerid_str(),
&self.me_onchain().to_string(),
&self.cfg.safe_module.safe_address.to_string(),
&self.cfg.safe_module.module_address.to_string(),
],
1.0,
);
Ok((socket, processes))
}
/// Returns the peer ID derived from this node's off-chain public key.
pub fn me_peer_id(&self) -> PeerId {
    let public_key = *self.me.public();
    public_key.into()
}
/// Returns the public nodes known to the transport layer as
/// `(peer id, chain address, multiaddresses)` triples.
pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
    let nodes = self.transport_api.get_public_nodes().await?;
    Ok(nodes)
}
/// Returns the indexer state stored in the DB.
///
/// If a last checksummed log exists, its block number and checksum override the
/// corresponding fields of the stored state; a log without a checksum yields the
/// default hash.
pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
    let stored_state = self.db.get_indexer_state_info(None).await?;

    if let Some(log) = self.db.get_last_checksummed_log().await? {
        let latest_log_checksum = if let Some(checksum) = log.checksum {
            Hash::from_hex(checksum.as_str())?
        } else {
            Hash::default()
        };

        Ok(IndexerStateInfo {
            // NOTE(review): `as` silently truncates if the block number does not fit into u32.
            latest_log_block_number: log.block_number as u32,
            latest_log_checksum,
            ..stored_state
        })
    } else {
        Ok(stored_state)
    }
}
/// Asks the transport layer whether `peer` is allowed to access the network.
pub async fn is_allowed_to_access_network(&self, peer: &PeerId) -> errors::Result<bool> {
    let allowed = self.transport_api.is_allowed_to_access_network(peer).await?;
    Ok(allowed)
}
/// Pings `peer` via the transport layer and returns the measured duration together
/// with the peer's status.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the transport
/// error if the ping fails.
pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
    // Ping is a transport operation, so the guard message must not refer to on-chain operations.
    self.error_if_not_in_state(HoprState::Running, "Node is not ready to ping peers".into())?;
    Ok(self.transport_api.ping(peer).await?)
}
/// Establishes a new session according to `cfg` (feature `session-client`).
///
/// Session creation is retried with a constant, jittered back-off configured by
/// `session.establish_max_retries` and `session.establish_retry_timeout`.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the last
/// transport error once the retries are exhausted.
#[cfg(feature = "session-client")]
pub async fn connect_to(&self, cfg: SessionClientConfig) -> errors::Result<HoprSession> {
    // Establishing a session is a transport operation, so the guard message must not refer
    // to on-chain operations.
    self.error_if_not_in_state(HoprState::Running, "Node is not ready to establish sessions".into())?;

    let backoff = backon::ConstantBuilder::default()
        .with_max_times(self.cfg.session.establish_max_retries as usize)
        .with_delay(self.cfg.session.establish_retry_timeout)
        .with_jitter();

    // Runtime-agnostic sleeper for the retry mechanism.
    struct Sleeper;
    impl backon::Sleeper for Sleeper {
        type Sleep = futures_timer::Delay;
        fn sleep(&self, dur: Duration) -> Self::Sleep {
            futures_timer::Delay::new(dur)
        }
    }

    use backon::Retryable;

    Ok((|| {
        // Every attempt consumes its own copy of the configuration.
        let cfg = cfg.clone();
        async { self.transport_api.new_session(cfg).await }
    })
    .retry(backoff)
    .sleep(Sleeper)
    .await?)
}
/// Sends `msg` to `destination` using the given routing `options` and optional
/// `application_tag`.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the transport
/// error if sending fails.
#[tracing::instrument(level = "debug", skip(self, msg))]
pub async fn send_message(
    &self,
    msg: Box<[u8]>,
    destination: PeerId,
    options: RoutingOptions,
    application_tag: Option<u16>,
) -> errors::Result<()> {
    // Sending a message is a transport operation, so the guard message must not refer to
    // on-chain operations.
    self.error_if_not_in_state(HoprState::Running, "Node is not ready to send messages".into())?;
    self.transport_api
        .send_message(msg, destination, options, application_tag)
        .await?;
    Ok(())
}
/// Asks the transport layer to aggregate tickets in the given `channel`.
pub async fn aggregate_tickets(&self, channel: &Hash) -> errors::Result<()> {
    self.transport_api.aggregate_tickets(channel).await?;
    Ok(())
}
/// Returns the local multiaddresses reported by the transport layer.
pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
    let transport = &self.transport_api;
    transport.local_multiaddresses()
}
/// Returns the multiaddresses the transport layer reports as listening.
pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
    let transport = &self.transport_api;
    transport.listening_multiaddresses().await
}
/// Returns the multiaddresses of `peer` as observed by the transport layer.
pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
    let transport = &self.transport_api;
    transport.network_observed_multiaddresses(peer).await
}
/// Returns the multiaddress stored in the DB account entry of `peer`.
///
/// Every failure (peer ID not convertible to an off-chain key, missing account, DB error)
/// is logged and yields an empty vector.
pub async fn multiaddresses_announced_on_chain(&self, peer: &PeerId) -> Vec<Multiaddr> {
    let account = match OffchainPublicKey::try_from(peer) {
        Ok(key) => self.db.get_account(None, key).await,
        Err(e) => {
            error!(%peer, error = %e, "failed to convert peer id to off-chain key");
            return Vec::new();
        }
    };

    match account {
        Ok(Some(entry)) => entry.get_multiaddr().into_iter().collect(),
        Ok(None) => {
            error!(%peer, "no information");
            Vec::new()
        }
        Err(e) => {
            error!(%peer, error = %e, "failed to retrieve information");
            Vec::new()
        }
    }
}
/// Returns the network health reported by the transport layer.
pub async fn network_health(&self) -> Health {
    let transport = &self.transport_api;
    transport.network_health().await
}
/// Returns the peers the transport layer reports as connected.
pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
    let peers = self.transport_api.network_connected_peers().await?;
    Ok(peers)
}
/// Returns the status the transport layer holds for `peer`, if any.
pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<hopr_transport::PeerStatus>> {
    let status = self.transport_api.network_peer_info(peer).await?;
    Ok(status)
}
/// Returns all connected peers whose average quality is at least `minimum_quality`.
///
/// Each item holds the peer's chain address (if it can be resolved), its peer ID and its
/// status. Peers whose status cannot be retrieved are skipped.
pub async fn all_network_peers(
    &self,
    minimum_quality: f64,
) -> errors::Result<Vec<(Option<Address>, PeerId, hopr_transport::PeerStatus)>> {
    let connected = self.transport_api.network_connected_peers().await?;

    let selected = futures::stream::iter(connected)
        .filter_map(|peer_id| async move {
            // Skip peers without retrievable status or with insufficient quality.
            let info = match self.transport_api.network_peer_info(&peer_id).await {
                Ok(Some(info)) if info.get_average_quality() >= minimum_quality => info,
                _ => return None,
            };

            // A failed or empty chain key lookup is not an error; the address is just omitted.
            let address = self.peerid_to_chain_key(&peer_id).await.ok().flatten();
            Some((address, peer_id, info))
        })
        .collect::<Vec<_>>()
        .await;

    Ok(selected)
}
/// Returns the acknowledged tickets the transport layer reports for `channel`.
pub async fn tickets_in_channel(&self, channel: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
    let tickets = self.transport_api.tickets_in_channel(channel).await?;
    Ok(tickets)
}
/// Returns all tickets reported by the transport layer.
pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
    let tickets = self.transport_api.all_tickets().await?;
    Ok(tickets)
}
/// Returns the ticket statistics reported by the transport layer.
pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
    let statistics = self.transport_api.ticket_statistics().await?;
    Ok(statistics)
}
/// Resets the ticket statistics stored in the DB.
pub async fn reset_ticket_statistics(&self) -> errors::Result<()> {
    self.db.reset_ticket_statistics().await?;
    Ok(())
}
pub fn peer_resolver(&self) -> &impl HoprDbResolverOperations {
&self.db
}
/// Returns the on-chain address of this node as reported by the chain component.
pub fn me_onchain(&self) -> Address {
    let chain = &self.hopr_chain_api;
    chain.me_onchain()
}
/// Returns the ticket price reported by the chain component, if any.
pub async fn get_ticket_price(&self) -> errors::Result<Option<U256>> {
    let price = self.hopr_chain_api.ticket_price().await?;
    Ok(price)
}
/// Returns the minimum incoming ticket winning probability from the indexer data in the DB.
pub async fn get_minimum_incoming_ticket_win_probability(&self) -> errors::Result<f64> {
    let indexer_data = self.db.get_indexer_data(None).await?;
    Ok(indexer_data.minimum_incoming_ticket_winning_prob)
}
/// Returns the account entries stored in the DB (queried with `get_accounts(None, false)`).
pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
    let accounts = self.db.get_accounts(None, false).await?;
    Ok(accounts)
}
/// Looks up the channel with the given `channel_id` in the DB.
pub async fn channel_from_hash(&self, channel_id: &Hash) -> errors::Result<Option<ChannelEntry>> {
    let channel = self.db.get_channel_by_id(None, channel_id).await?;
    Ok(channel)
}
/// Returns the channel from `src` to `dest` as reported by the chain component.
pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
    let entry = self.hopr_chain_api.channel(src, dest).await?;
    Ok(entry)
}
/// Returns the channels originating at `src` as reported by the chain component.
pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
    let channels = self.hopr_chain_api.channels_from(src).await?;
    Ok(channels)
}
/// Returns the channels ending at `dest` as reported by the chain component.
pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
    let channels = self.hopr_chain_api.channels_to(dest).await?;
    Ok(channels)
}
/// Returns all channels reported by the chain component.
pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
    let channels = self.hopr_chain_api.all_channels().await?;
    Ok(channels)
}
/// Returns the safe allowance reported by the chain component.
pub async fn safe_allowance(&self) -> errors::Result<Balance> {
    let allowance = self.hopr_chain_api.safe_allowance().await?;
    Ok(allowance)
}
/// Withdraws `amount` to `recipient` and returns the hash of the confirmed transaction.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the error of
/// the on-chain action.
pub async fn withdraw(&self, recipient: Address, amount: Balance) -> errors::Result<Hash> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let actions = self.hopr_chain_api.actions_ref();
    // The first await enqueues the action, the second one waits for its confirmation.
    let confirmation = actions.withdraw(recipient, amount).await?.await?;
    Ok(confirmation.tx_hash)
}
/// Opens a channel to `destination` funded with `amount`.
///
/// Returns the hash of the confirmed transaction and the ID of the channel, which is
/// generated from this node's chain address and `destination`.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the error of
/// the on-chain action.
pub async fn open_channel(&self, destination: &Address, amount: &Balance) -> errors::Result<OpenChannelResult> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let actions = self.hopr_chain_api.actions_ref();
    let pending_confirmation = actions.open_channel(*destination, *amount).await?;
    let channel_id = generate_channel_id(&self.hopr_chain_api.me_onchain(), destination);

    let confirmation = pending_confirmation.await?;
    Ok(OpenChannelResult {
        tx_hash: confirmation.tx_hash,
        channel_id,
    })
}
/// Funds the channel `channel_id` with `amount` and returns the hash of the confirmed
/// transaction.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the error of
/// the on-chain action.
pub async fn fund_channel(&self, channel_id: &Hash, amount: &Balance) -> errors::Result<Hash> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let actions = self.hopr_chain_api.actions_ref();
    let confirmation = actions.fund_channel(*channel_id, *amount).await?.await?;
    Ok(confirmation.tx_hash)
}
/// Closes the channel with `counterparty` in the given `direction`.
///
/// `redeem_before_close` is passed on to the on-chain action. The returned status is the
/// one carried by the closure-initiated event, or `ChannelStatus::Closed` when the channel
/// was closed.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, the error of the
/// on-chain action, or a general error if the confirmation carries no chain event or
/// an event other than channel closure initiation or channel closure.
pub async fn close_channel(
    &self,
    counterparty: &Address,
    direction: ChannelDirection,
    redeem_before_close: bool,
) -> errors::Result<CloseChannelResult> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let confirmation = self
        .hopr_chain_api
        .actions_ref()
        .close_channel(*counterparty, direction, redeem_before_close)
        .await?
        .await?;

    // A confirmation without an event is reported as an error instead of panicking,
    // because this is a fallible library API.
    let event = confirmation.event.ok_or_else(|| {
        HoprLibError::GeneralError("channel close action confirmation must have associated chain event".into())
    })?;

    match event {
        ChainEventType::ChannelClosureInitiated(c) => Ok(CloseChannelResult {
            tx_hash: confirmation.tx_hash,
            status: c.status,
        }),
        ChainEventType::ChannelClosed(_) => Ok(CloseChannelResult {
            tx_hash: confirmation.tx_hash,
            status: ChannelStatus::Closed,
        }),
        _ => Err(HoprLibError::GeneralError("close channel transaction failed".into())),
    }
}
/// Closes the channel identified by `channel_id`.
///
/// The channel is looked up in the DB; its direction and counterparty relative to this
/// node are then passed to `close_channel`.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state,
/// `ChainActionsError::ChannelDoesNotExist` if the channel is unknown,
/// `ChainActionsError::InvalidArguments` if the channel has no orientation relative to
/// this node, or any error of `close_channel`.
pub async fn close_channel_by_id(
    &self,
    channel_id: Hash,
    redeem_before_close: bool,
) -> errors::Result<CloseChannelResult> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let channel = match self.channel_from_hash(&channel_id).await? {
        Some(channel) => channel,
        None => return Err(HoprLibError::ChainError(ChainActionsError::ChannelDoesNotExist)),
    };

    if let Some((direction, counterparty)) = channel.orientation(&self.me_onchain()) {
        self.close_channel(&counterparty, direction, redeem_before_close).await
    } else {
        Err(HoprLibError::ChainError(ChainActionsError::InvalidArguments(
            "cannot close channel that is not own".into(),
        )))
    }
}
/// Returns the channel closure notice period reported by the chain component.
pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
    let notice_period = self.hopr_chain_api.get_channel_closure_notice_period().await?;
    Ok(notice_period)
}
/// Requests redemption of all tickets; `only_aggregated` is passed on to the chain actions.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the error of
/// the chain action.
pub async fn redeem_all_tickets(&self, only_aggregated: bool) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let actions = self.hopr_chain_api.actions_ref();
    actions.redeem_all_tickets(only_aggregated).await?;
    Ok(())
}
/// Requests redemption of the tickets with `counterparty`; `only_aggregated` is passed on
/// to the chain actions. The value returned by the chain action is discarded.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the error of
/// the chain action.
pub async fn redeem_tickets_with_counterparty(
    &self,
    counterparty: &Address,
    only_aggregated: bool,
) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let actions = self.hopr_chain_api.actions_ref();
    let _ = actions
        .redeem_tickets_with_counterparty(counterparty, only_aggregated)
        .await?;
    Ok(())
}
/// Requests redemption of the tickets in the channel `channel_id` and returns the number
/// of items returned by the chain action.
///
/// Returns `0` when the channel is not found in the DB or when this node is not the
/// channel's destination.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the DB or
/// chain action error.
pub async fn redeem_tickets_in_channel(&self, channel_id: &Hash, only_aggregated: bool) -> errors::Result<usize> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    match self.db.get_channel_by_id(None, channel_id).await? {
        // Only tickets in channels ending at this node are redeemed.
        Some(channel) if channel.destination == self.hopr_chain_api.me_onchain() => {
            let redeemed = self
                .hopr_chain_api
                .actions_ref()
                .redeem_tickets_in_channel(&channel, only_aggregated)
                .await?;
            Ok(redeemed.len())
        }
        _ => Ok(0),
    }
}
/// Requests redemption of a single acknowledged ticket.
///
/// The value returned by the chain action is intentionally dropped without being awaited.
///
/// # Errors
///
/// Returns a status error if the node is not in the `Running` state, or the error of
/// the chain action.
pub async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> errors::Result<()> {
    self.error_if_not_in_state(HoprState::Running, "Node is not ready for on-chain operations".into())?;

    let actions = self.hopr_chain_api.actions_ref();
    #[allow(clippy::let_underscore_future)]
    let _ = actions.redeem_ticket(ack_ticket).await?;
    Ok(())
}
/// Resolves the chain address belonging to `peer_id` using the DB.
///
/// # Errors
///
/// Fails if `peer_id` cannot be converted to an off-chain public key or if the DB lookup fails.
pub async fn peerid_to_chain_key(&self, peer_id: &PeerId) -> errors::Result<Option<Address>> {
    let offchain_key = hopr_transport::OffchainPublicKey::try_from(peer_id)?;
    let chain_key = self.db.resolve_chain_key(&offchain_key).await?;
    Ok(chain_key)
}
/// Resolves the peer ID belonging to the chain `address` using the DB.
pub async fn chain_key_to_peerid(&self, address: &Address) -> errors::Result<Option<PeerId>> {
    let packet_key = self.db.resolve_packet_key(address).await?;
    Ok(packet_key.map(|key| key.into()))
}
/// Exports the channel graph in DOT format according to `cfg`.
pub async fn export_channel_graph(&self, cfg: GraphExportConfig) -> String {
    let graph = self.channel_graph.read().await;
    graph.as_dot(cfg)
}
/// Exports the channel graph serialized as a JSON string.
///
/// # Errors
///
/// Returns a general error carrying the serializer's message if serialization fails.
pub async fn export_raw_channel_graph(&self) -> errors::Result<String> {
    let graph = self.channel_graph.read().await;
    match serde_json::to_string(graph.deref()) {
        Ok(json) => Ok(json),
        Err(e) => Err(HoprLibError::GeneralError(e.to_string())),
    }
}
}