hopr_chain_api/
lib.rs

1//! Crate containing the API object for chain operations used by the HOPRd node.
2
3pub mod config;
4pub mod errors;
5pub mod executors;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tracing::{debug, error, info, warn};
11
12use config::ChainNetworkConfig;
13use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
14use hopr_async_runtime::prelude::{sleep, spawn, JoinHandle};
15use hopr_chain_actions::action_queue::{ActionQueue, ActionQueueConfig};
16use hopr_chain_actions::action_state::IndexerActionTracker;
17use hopr_chain_actions::payload::SafePayloadGenerator;
18use hopr_chain_actions::ChainActions;
19use hopr_chain_indexer::{block::Indexer, handlers::ContractEventHandlers, IndexerConfig};
20use hopr_chain_rpc::client::SimpleJsonRpcRetryPolicy;
21use hopr_chain_rpc::rpc::{RpcOperations, RpcOperationsConfig};
22use hopr_chain_rpc::HoprRpcOperations;
23pub use hopr_chain_types::chain_events::SignificantChainEvent;
24use hopr_chain_types::ContractAddresses;
25use hopr_crypto_types::prelude::*;
26use hopr_db_sql::HoprDbAllOperations;
27use hopr_internal_types::account::AccountEntry;
28pub use hopr_internal_types::channels::ChannelEntry;
29use hopr_internal_types::prelude::ChannelDirection;
30use hopr_primitive_types::prelude::*;
31
32use crate::errors::{HoprChainError, Result};
33
/// The default HTTP request engine (surf-based), selected whenever the
/// `runtime-async-std` feature is enabled.
///
/// TODO: Should be an internal type, `hopr_lib::chain` must be moved to this package
#[cfg(feature = "runtime-async-std")]
pub type DefaultHttpRequestor = hopr_chain_rpc::client::surf_client::SurfRequestor;

/// The default HTTP request engine (reqwest-based).
///
/// Both runtime features could be enabled during testing; therefore, tokio is only used
/// when it is exclusively enabled.
#[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))]
pub type DefaultHttpRequestor = hopr_chain_rpc::client::reqwest_client::ReqwestRequestor;
44
/// The default JSON RPC provider client.
///
/// This is the JSON RPC provider client parametrized with the [`DefaultHttpRequestor`]
/// as the HTTP engine and the [`SimpleJsonRpcRetryPolicy`] as the retry policy.
///
/// TODO: Should be an internal type, `hopr_lib::chain` must be moved to this package
pub type JsonRpcClient = hopr_chain_rpc::client::JsonRpcProviderClient<DefaultHttpRequestor, SimpleJsonRpcRetryPolicy>;
49
50/// Checks whether the node can be registered with the Safe in the NodeSafeRegistry
51pub async fn can_register_with_safe<Rpc: HoprRpcOperations>(
52    me: Address,
53    safe_address: Address,
54    rpc: &Rpc,
55) -> Result<bool> {
56    let target_address = rpc.get_module_target_address().await?;
57    debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
58
59    if target_address != safe_address {
60        // cannot proceed when the safe address is not the target/owner of given module
61        return Err(HoprChainError::Api("safe is not the module target".into()));
62    }
63
64    let registered_address = rpc.get_safe_from_node_safe_registry(me).await?;
65    info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
66
67    if registered_address.is_zero() {
68        info!("Node is not associated with a Safe in NodeSafeRegistry yet");
69        Ok(true)
70    } else if registered_address != safe_address {
71        Err(HoprChainError::Api(
72            "Node is associated with a different Safe in NodeSafeRegistry".into(),
73        ))
74    } else {
75        info!("Node is associated with correct Safe in NodeSafeRegistry");
76        Ok(false)
77    }
78}
79
80/// Waits until the given address is funded.
81///
82/// This is done by querying the RPC provider for balance with backoff until `max_delay` argument.
83pub async fn wait_for_funds<Rpc: HoprRpcOperations>(
84    address: Address,
85    min_balance: Balance,
86    max_delay: Duration,
87    rpc: &Rpc,
88) -> Result<()> {
89    let multiplier = 1.05;
90    let mut current_delay = Duration::from_secs(2).min(max_delay);
91
92    while current_delay <= max_delay {
93        match rpc.get_balance(address, min_balance.balance_type()).await {
94            Ok(current_balance) => {
95                info!(balance = %current_balance, "balance status");
96                if current_balance.ge(&min_balance) {
97                    info!("node is funded");
98                    return Ok(());
99                } else {
100                    warn!("still unfunded, trying again soon");
101                }
102            }
103            Err(e) => error!(error = %e, "failed to fetch balance from the chain"),
104        }
105
106        sleep(current_delay).await;
107        current_delay = current_delay.mul_f64(multiplier);
108    }
109
110    Err(HoprChainError::Api("timeout waiting for funds".into()))
111}
112
/// Identifies the long-running processes spawned by [`HoprChain::start`].
///
/// The variants are used as keys of the map of join handles returned from that method.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum HoprChainProcess {
    /// The process started via `Indexer::start`.
    Indexer,
    /// The process running the action queue (`ActionQueue::start`).
    OutgoingOnchainActionQueue,
}
118
/// Concrete type of the action queue used by [`HoprChain`], generic over the database `T`.
///
/// It combines the [`IndexerActionTracker`] with an [`EthereumTransactionExecutor`]
/// that submits transactions through [`RpcOperations`] (wrapped in an [`RpcEthereumClient`])
/// and builds the payloads using the [`SafePayloadGenerator`].
type ActionQueueType<T> = ActionQueue<
    T,
    IndexerActionTracker,
    EthereumTransactionExecutor<
        hopr_chain_rpc::TypedTransaction,
        RpcEthereumClient<
            RpcOperations<
                hopr_chain_rpc::client::JsonRpcProviderClient<DefaultHttpRequestor, SimpleJsonRpcRetryPolicy>,
                DefaultHttpRequestor,
            >,
        >,
        SafePayloadGenerator,
    >,
>;
133
/// Represents all chain interactions exported to be used in the hopr-lib
///
/// NOTE: instead of creating a unified interface, the [HoprChain] exposes
/// some of its functionality (e.g. the [ChainActions]) as directly referenced
/// objects. This behavior will be refactored and hidden behind a trait
/// in future implementations.
#[derive(Debug, Clone)]
pub struct HoprChain<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug> {
    // On-chain keypair of this node.
    me_onchain: ChainKeypair,
    // Address of the Safe; handed to the RPC configuration and to the contract event handlers.
    safe_address: Address,
    // Contract addresses; handed to the RPC configuration, payload generator and event handlers.
    contract_addresses: ContractAddresses,
    // Configuration passed to the indexer when it is started.
    indexer_cfg: IndexerConfig,
    // Sender of `SignificantChainEvent`s that is passed to the indexer when it is started.
    indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
    // Database handle; cloned into the action queue, chain actions, indexer and event handlers.
    db: T,
    // Chain actions bound to a sender of the action queue.
    hopr_chain_actions: ChainActions<T>,
    // The action queue itself; a clone of it is started as a separate process.
    action_queue: ActionQueueType<T>,
    // Action state tracker obtained from the action queue.
    action_state: Arc<IndexerActionTracker>,
    // RPC operations used by the direct on-chain queries and by the indexer.
    rpc_operations: RpcOperations<JsonRpcClient, DefaultHttpRequestor>,
}
153
154impl<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug + 'static> HoprChain<T> {
155    #[allow(clippy::too_many_arguments)] // TODO: refactor this function into a reasonable group of components once fully rearchitected
156    pub fn new(
157        me_onchain: ChainKeypair,
158        db: T,
159        // --
160        chain_config: ChainNetworkConfig,
161        module_address: Address,
162        // --
163        contract_addresses: ContractAddresses,
164        safe_address: Address,
165        indexer_cfg: IndexerConfig,
166        indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
167    ) -> Self {
168        // TODO: extract this from the global config type
169        let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
170        if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
171            rpc_http_config.max_requests_per_sec = Some(max_rpc_req); // override the default if set
172        }
173
174        // TODO: extract this from the global config type
175        let rpc_http_retry_policy = SimpleJsonRpcRetryPolicy {
176            min_retries: Some(2),
177            ..SimpleJsonRpcRetryPolicy::default()
178        };
179
180        // TODO: extract this from the global config type
181        let rpc_cfg = RpcOperationsConfig {
182            chain_id: chain_config.chain.chain_id as u64,
183            contract_addrs: contract_addresses,
184            module_address,
185            safe_address,
186            expected_block_time: Duration::from_millis(chain_config.chain.block_time),
187            tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
188            finality: chain_config.confirmations,
189            max_block_range_fetch_size: chain_config.max_block_range,
190            ..Default::default()
191        };
192
193        // TODO: extract this from the global config type
194        let rpc_client_cfg = RpcEthereumClientConfig::default();
195
196        // TODO: extract this from the global config type
197        let action_queue_cfg = ActionQueueConfig::default();
198
199        // --- Configs done ---
200
201        let requestor = DefaultHttpRequestor::new(rpc_http_config);
202
203        // Build JSON RPC client
204        let rpc_client = JsonRpcClient::new(
205            &chain_config.chain.default_provider,
206            requestor.clone(),
207            rpc_http_retry_policy,
208        );
209
210        // Build RPC operations
211        let rpc_operations =
212            RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg).expect("failed to initialize RPC");
213
214        // Build the Ethereum Transaction Executor that uses RpcOperations as backend
215        let ethereum_tx_executor = EthereumTransactionExecutor::new(
216            RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
217            SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
218        );
219
220        // Build the Action Queue
221        let action_queue = ActionQueue::new(
222            db.clone(),
223            IndexerActionTracker::default(),
224            ethereum_tx_executor,
225            action_queue_cfg,
226        );
227
228        let action_state = action_queue.action_state();
229        let action_sender = action_queue.new_sender();
230
231        // Instantiate Chain Actions
232        let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), action_sender);
233
234        Self {
235            me_onchain,
236            safe_address,
237            contract_addresses,
238            indexer_cfg,
239            indexer_events_tx,
240            db,
241            hopr_chain_actions,
242            action_queue,
243            action_state,
244            rpc_operations,
245        }
246    }
247
248    /// Execute all processes of the [`HoprChain`] object.
249    ///
250    /// This method will spawn the [`HoprChainProcess::Indexer`] and [`HoprChainProcess::OutgoingOnchainActionQueue`]
251    /// processes and return join handles to the calling function.
252    pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, JoinHandle<()>>> {
253        let mut processes: HashMap<HoprChainProcess, JoinHandle<()>> = HashMap::new();
254
255        processes.insert(
256            HoprChainProcess::OutgoingOnchainActionQueue,
257            spawn(self.action_queue.clone().start()),
258        );
259        processes.insert(
260            HoprChainProcess::Indexer,
261            Indexer::new(
262                self.rpc_operations.clone(),
263                ContractEventHandlers::new(
264                    self.contract_addresses,
265                    self.safe_address,
266                    self.me_onchain.clone(),
267                    self.db.clone(),
268                ),
269                self.db.clone(),
270                self.indexer_cfg,
271                self.indexer_events_tx.clone(),
272            )
273            .start()
274            .await?,
275        );
276
277        Ok(processes)
278    }
279
280    pub fn me_onchain(&self) -> Address {
281        self.me_onchain.public().to_address()
282    }
283
284    pub fn action_state(&self) -> Arc<IndexerActionTracker> {
285        self.action_state.clone()
286    }
287
288    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
289        Ok(self.db.get_accounts(None, true).await?)
290    }
291
292    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
293        self.db
294            .get_channel_by_parties(None, src, dest, false)
295            .await
296            .map_err(HoprChainError::from)
297            .and_then(|v| {
298                v.ok_or(errors::HoprChainError::Api(format!(
299                    "Channel entry not available {}-{}",
300                    src, dest
301                )))
302            })
303    }
304
305    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
306        Ok(self.db.get_channels_via(None, ChannelDirection::Outgoing, src).await?)
307    }
308
309    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
310        Ok(self.db.get_channels_via(None, ChannelDirection::Incoming, dest).await?)
311    }
312
313    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
314        Ok(self.db.get_all_channels(None).await?)
315    }
316
317    pub async fn ticket_price(&self) -> errors::Result<Option<U256>> {
318        Ok(self.db.get_indexer_data(None).await?.ticket_price.map(|b| b.amount()))
319    }
320
321    pub async fn safe_allowance(&self) -> errors::Result<Balance> {
322        Ok(self.db.get_safe_hopr_allowance(None).await?)
323    }
324
325    pub fn actions_ref(&self) -> &ChainActions<T> {
326        &self.hopr_chain_actions
327    }
328
329    pub fn actions_mut_ref(&mut self) -> &mut ChainActions<T> {
330        &mut self.hopr_chain_actions
331    }
332
333    pub fn rpc(&self) -> &RpcOperations<JsonRpcClient, DefaultHttpRequestor> {
334        &self.rpc_operations
335    }
336
337    pub async fn get_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
338        Ok(self.rpc_operations.get_balance(self.me_onchain(), balance_type).await?)
339    }
340
341    pub async fn get_safe_balance(&self, safe_address: Address, balance_type: BalanceType) -> errors::Result<Balance> {
342        Ok(self.rpc_operations.get_balance(safe_address, balance_type).await?)
343    }
344
345    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
346        Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
347    }
348
349    pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
350        Ok(self.rpc_operations.get_eligibility_status(self.me_onchain()).await?)
351    }
352
353    pub async fn get_minimum_winning_probability(&self) -> errors::Result<f64> {
354        Ok(self.rpc_operations.get_minimum_network_winning_probability().await?)
355    }
356
357    pub async fn get_minimum_ticket_price(&self) -> errors::Result<Balance> {
358        Ok(self.rpc_operations.get_minimum_network_ticket_price().await?)
359    }
360}