hopr_chain_api/
lib.rs

1//! Crate containing the API object for chain operations used by the HOPRd node.
2
3pub mod config;
4pub mod errors;
5pub mod executors;
6
7use std::{collections::HashMap, sync::Arc, time::Duration};
8
9use alloy::{
10    rpc::{client::ClientBuilder, types::TransactionRequest},
11    transports::{
12        http::{Http, ReqwestTransport},
13        layers::RetryBackoffLayer,
14    },
15};
16use config::ChainNetworkConfig;
17use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
18use futures::future::AbortHandle;
19use hopr_async_runtime::{prelude::sleep, spawn_as_abortable};
20use hopr_chain_actions::{
21    ChainActions,
22    action_queue::{ActionQueue, ActionQueueConfig},
23    action_state::IndexerActionTracker,
24    payload::SafePayloadGenerator,
25};
26use hopr_chain_indexer::{IndexerConfig, block::Indexer, handlers::ContractEventHandlers};
27use hopr_chain_rpc::{
28    HoprRpcOperations,
29    client::DefaultRetryPolicy,
30    rpc::{RpcOperations, RpcOperationsConfig},
31    transport::ReqwestClient,
32};
33use hopr_chain_types::ContractAddresses;
34pub use hopr_chain_types::chain_events::SignificantChainEvent;
35use hopr_crypto_types::prelude::*;
36use hopr_db_sql::HoprDbAllOperations;
37pub use hopr_internal_types::channels::ChannelEntry;
38use hopr_internal_types::{
39    account::AccountEntry, channels::CorruptedChannelEntry, prelude::ChannelDirection, tickets::WinningProbability,
40};
41use hopr_primitive_types::prelude::*;
42use tracing::{debug, error, info, warn};
43
44use crate::errors::{HoprChainError, Result};
45
46pub type DefaultHttpRequestor = hopr_chain_rpc::transport::ReqwestClient;
47
48/// Checks whether the node can be registered with the Safe in the NodeSafeRegistry
49pub async fn can_register_with_safe<Rpc: HoprRpcOperations>(
50    me: Address,
51    safe_address: Address,
52    rpc: &Rpc,
53) -> Result<bool> {
54    let target_address = rpc.get_module_target_address().await?;
55    debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
56
57    if target_address != safe_address {
58        // cannot proceed when the safe address is not the target/owner of given module
59        return Err(HoprChainError::Api("safe is not the module target".into()));
60    }
61
62    let registered_address = rpc.get_safe_from_node_safe_registry(me).await?;
63    info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
64
65    if registered_address.is_zero() {
66        info!("Node is not associated with a Safe in NodeSafeRegistry yet");
67        Ok(true)
68    } else if registered_address != safe_address {
69        Err(HoprChainError::Api(
70            "Node is associated with a different Safe in NodeSafeRegistry".into(),
71        ))
72    } else {
73        info!("Node is associated with correct Safe in NodeSafeRegistry");
74        Ok(false)
75    }
76}
77
78/// Waits until the given address is funded.
79///
80/// This is done by querying the RPC provider for balance with backoff until `max_delay` argument.
81pub async fn wait_for_funds<Rpc: HoprRpcOperations>(
82    address: Address,
83    min_balance: XDaiBalance,
84    max_delay: Duration,
85    rpc: &Rpc,
86) -> Result<()> {
87    let multiplier = 1.05;
88    let mut current_delay = Duration::from_secs(2).min(max_delay);
89
90    while current_delay <= max_delay {
91        match rpc.get_xdai_balance(address).await {
92            Ok(current_balance) => {
93                info!(balance = %current_balance, "balance status");
94                if current_balance.ge(&min_balance) {
95                    info!("node is funded");
96                    return Ok(());
97                } else {
98                    warn!("still unfunded, trying again soon");
99                }
100            }
101            Err(e) => error!(error = %e, "failed to fetch balance from the chain"),
102        }
103
104        sleep(current_delay).await;
105        current_delay = current_delay.mul_f64(multiplier);
106    }
107
108    Err(HoprChainError::Api("timeout waiting for funds".into()))
109}
110
111fn build_transport_client(url: &str) -> Result<Http<ReqwestClient>> {
112    let parsed_url = url::Url::parse(url).unwrap_or_else(|_| panic!("failed to parse URL: {url}"));
113    Ok(ReqwestTransport::new(parsed_url))
114}
115
116#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
117pub enum HoprChainProcess {
118    Indexer,
119    OutgoingOnchainActionQueue,
120}
121
122type ActionQueueType<T> = ActionQueue<
123    T,
124    IndexerActionTracker,
125    EthereumTransactionExecutor<
126        TransactionRequest,
127        RpcEthereumClient<RpcOperations<DefaultHttpRequestor>>,
128        SafePayloadGenerator,
129    >,
130>;
131
132/// Represents all chain interactions exported to be used in the hopr-lib
133///
134/// NOTE: instead of creating a unified interface the [HoprChain] exports
135/// some functionality (e.g. the [ChainActions] as a referentially used)
136/// object. This behavior will be refactored and hidden behind a trait
137/// in the future implementations.
138#[derive(Debug, Clone)]
139pub struct HoprChain<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug> {
140    me_onchain: ChainKeypair,
141    safe_address: Address,
142    contract_addresses: ContractAddresses,
143    indexer_cfg: IndexerConfig,
144    indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
145    db: T,
146    hopr_chain_actions: ChainActions<T>,
147    action_queue: ActionQueueType<T>,
148    action_state: Arc<IndexerActionTracker>,
149    rpc_operations: RpcOperations<DefaultHttpRequestor>,
150}
151
152impl<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug + 'static> HoprChain<T> {
153    #[allow(clippy::too_many_arguments)] // TODO: refactor this function into a reasonable group of components once fully rearchitected
154    pub fn new(
155        me_onchain: ChainKeypair,
156        db: T,
157        // --
158        chain_config: ChainNetworkConfig,
159        module_address: Address,
160        // --
161        contract_addresses: ContractAddresses,
162        safe_address: Address,
163        indexer_cfg: IndexerConfig,
164        indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
165    ) -> Result<Self> {
166        // TODO: extract this from the global config type
167        let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
168        if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
169            rpc_http_config.max_requests_per_sec = Some(max_rpc_req); // override the default if set
170        }
171
172        // TODO(#7140): replace this DefaultRetryPolicy with a custom one that computes backoff with the number of
173        // retries
174        let rpc_http_retry_policy = DefaultRetryPolicy::default();
175
176        // TODO: extract this from the global config type
177        let rpc_cfg = RpcOperationsConfig {
178            chain_id: chain_config.chain.chain_id as u64,
179            contract_addrs: contract_addresses,
180            module_address,
181            safe_address,
182            expected_block_time: Duration::from_millis(chain_config.chain.block_time),
183            tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
184            finality: chain_config.confirmations,
185            max_block_range_fetch_size: chain_config.max_block_range,
186            ..Default::default()
187        };
188
189        // TODO: extract this from the global config type
190        let rpc_client_cfg = RpcEthereumClientConfig::default();
191
192        // TODO: extract this from the global config type
193        let action_queue_cfg = ActionQueueConfig::default();
194
195        // --- Configs done ---
196
197        let transport_client = build_transport_client(&chain_config.chain.default_provider)?;
198
199        let rpc_client = ClientBuilder::default()
200            .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, rpc_http_retry_policy))
201            .transport(transport_client.clone(), transport_client.guess_local());
202
203        let requestor = DefaultHttpRequestor::new();
204
205        // Build RPC operations
206        let rpc_operations =
207            RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg, None).expect("failed to initialize RPC");
208
209        // Build the Ethereum Transaction Executor that uses RpcOperations as backend
210        let ethereum_tx_executor = EthereumTransactionExecutor::new(
211            RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
212            SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
213        );
214
215        // Build the Action Queue
216        let action_queue = ActionQueue::new(
217            db.clone(),
218            IndexerActionTracker::default(),
219            ethereum_tx_executor,
220            action_queue_cfg,
221        );
222
223        let action_state = action_queue.action_state();
224        let action_sender = action_queue.new_sender();
225
226        // Instantiate Chain Actions
227        let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), action_sender);
228
229        Ok(Self {
230            me_onchain,
231            safe_address,
232            contract_addresses,
233            indexer_cfg,
234            indexer_events_tx,
235            db,
236            hopr_chain_actions,
237            action_queue,
238            action_state,
239            rpc_operations,
240        })
241    }
242
243    /// Execute all processes of the [`HoprChain`] object.
244    ///
245    /// This method will spawn the [`HoprChainProcess::Indexer`] and [`HoprChainProcess::OutgoingOnchainActionQueue`]
246    /// processes and return join handles to the calling function.
247    pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, AbortHandle>> {
248        let mut processes: HashMap<HoprChainProcess, AbortHandle> = HashMap::new();
249
250        processes.insert(
251            HoprChainProcess::OutgoingOnchainActionQueue,
252            spawn_as_abortable!(self.action_queue.clone().start()),
253        );
254        processes.insert(
255            HoprChainProcess::Indexer,
256            Indexer::new(
257                self.rpc_operations.clone(),
258                ContractEventHandlers::new(
259                    self.contract_addresses,
260                    self.safe_address,
261                    self.me_onchain.clone(),
262                    self.db.clone(),
263                    self.rpc_operations.clone(),
264                ),
265                self.db.clone(),
266                self.indexer_cfg.clone(),
267                self.indexer_events_tx.clone(),
268            )
269            .start()
270            .await?,
271        );
272        Ok(processes)
273    }
274
275    pub fn me_onchain(&self) -> Address {
276        self.me_onchain.public().to_address()
277    }
278
279    pub fn action_state(&self) -> Arc<IndexerActionTracker> {
280        self.action_state.clone()
281    }
282
283    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
284        Ok(self.db.get_accounts(None, true).await?)
285    }
286
287    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
288        self.db
289            .get_channel_by_parties(None, src, dest, false)
290            .await
291            .map_err(HoprChainError::from)
292            .and_then(|v| {
293                v.ok_or(errors::HoprChainError::Api(format!(
294                    "Channel entry not available {src}-{dest}"
295                )))
296            })
297    }
298
299    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
300        Ok(self.db.get_channels_via(None, ChannelDirection::Outgoing, src).await?)
301    }
302
303    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
304        Ok(self.db.get_channels_via(None, ChannelDirection::Incoming, dest).await?)
305    }
306
307    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
308        Ok(self.db.get_all_channels(None).await?)
309    }
310
311    pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
312        Ok(self.db.get_all_corrupted_channels(None).await?)
313    }
314
315    pub async fn ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
316        Ok(self.db.get_indexer_data(None).await?.ticket_price)
317    }
318
319    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
320        Ok(self.db.get_safe_hopr_allowance(None).await?)
321    }
322
323    pub fn actions_ref(&self) -> &ChainActions<T> {
324        &self.hopr_chain_actions
325    }
326
327    pub fn actions_mut_ref(&mut self) -> &mut ChainActions<T> {
328        &mut self.hopr_chain_actions
329    }
330
331    pub fn rpc(&self) -> &RpcOperations<DefaultHttpRequestor> {
332        &self.rpc_operations
333    }
334
335    /// Retrieves the balance of the node's on-chain account for the specified currency.
336    ///
337    /// This method queries the on-chain balance of the node's account for the given currency.
338    /// It supports querying balances for XDai and WxHOPR currencies. If the currency is unsupported,
339    /// an error is returned.
340    ///
341    /// # Returns
342    /// * `Result<Balance<C>>` - The balance of the node's account for the specified currency, or an error if the query
343    ///   fails.
344    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
345        let bal = if C::is::<XDai>() {
346            self.rpc_operations
347                .get_xdai_balance(self.me_onchain())
348                .await?
349                .to_be_bytes()
350        } else if C::is::<WxHOPR>() {
351            self.rpc_operations
352                .get_hopr_balance(self.me_onchain())
353                .await?
354                .to_be_bytes()
355        } else {
356            return Err(HoprChainError::Api("unsupported currency".into()));
357        };
358
359        Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
360    }
361
362    /// Retrieves the balance of the specified address for the given currency.
363    ///
364    /// This method queries the on-chain balance of the provided address for the specified currency.
365    /// It supports querying balances for XDai and WxHOPR currencies. If the currency is unsupported,
366    /// an error is returned.
367    ///
368    /// # Arguments
369    /// * `address` - The address whose balance is to be retrieved.
370    ///
371    /// # Returns
372    /// * `Result<Balance<C>>` - The balance of the specified address for the given currency, or an error if the query
373    ///   fails.
374    pub async fn get_safe_balance<C: Currency + Send>(&self, safe_address: Address) -> errors::Result<Balance<C>> {
375        let bal = if C::is::<XDai>() {
376            self.rpc_operations.get_xdai_balance(safe_address).await?.to_be_bytes()
377        } else if C::is::<WxHOPR>() {
378            self.rpc_operations.get_hopr_balance(safe_address).await?.to_be_bytes()
379        } else {
380            return Err(HoprChainError::Api("unsupported currency".into()));
381        };
382
383        Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
384    }
385
386    /// Retrieves the HOPR token allowance granted by the safe address to the channels contract.
387    ///
388    /// This method queries the on-chain HOPR token contract to determine how many tokens
389    /// the safe address has approved the channels contract to spend on its behalf.
390    ///
391    /// # Returns
392    /// * `Result<HoprBalance>` - The current allowance amount, or an error if the query fails
393    pub async fn get_safe_hopr_allowance(&self) -> Result<HoprBalance> {
394        Ok(self
395            .rpc_operations
396            .get_hopr_allowance(self.safe_address, self.contract_addresses.channels)
397            .await?)
398    }
399
400    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
401        Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
402    }
403
404    pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
405        Ok(self.rpc_operations.get_eligibility_status(self.me_onchain()).await?)
406    }
407
408    pub async fn get_minimum_winning_probability(&self) -> errors::Result<WinningProbability> {
409        Ok(self.rpc_operations.get_minimum_network_winning_probability().await?)
410    }
411
412    pub async fn get_minimum_ticket_price(&self) -> errors::Result<HoprBalance> {
413        Ok(self.rpc_operations.get_minimum_network_ticket_price().await?)
414    }
415}