hopr_chain_api/
lib.rs

1//! Crate containing the API object for chain operations used by the HOPRd node.
2
3pub mod config;
4pub mod errors;
5pub mod executors;
6
7use std::{collections::HashMap, sync::Arc, time::Duration};
8
9use alloy::{
10    rpc::{client::ClientBuilder, types::TransactionRequest},
11    transports::{
12        http::{Http, ReqwestTransport},
13        layers::RetryBackoffLayer,
14    },
15};
16use config::ChainNetworkConfig;
17use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
18use futures::future::AbortHandle;
19use hopr_async_runtime::{prelude::sleep, spawn_as_abortable};
20use hopr_chain_actions::{
21    ChainActions,
22    action_queue::{ActionQueue, ActionQueueConfig},
23    action_state::IndexerActionTracker,
24    payload::SafePayloadGenerator,
25};
26use hopr_chain_indexer::{IndexerConfig, block::Indexer, handlers::ContractEventHandlers};
27use hopr_chain_rpc::{
28    HoprRpcOperations,
29    client::DefaultRetryPolicy,
30    rpc::{RpcOperations, RpcOperationsConfig},
31    transport::ReqwestClient,
32};
33use hopr_chain_types::ContractAddresses;
34pub use hopr_chain_types::chain_events::SignificantChainEvent;
35use hopr_crypto_types::prelude::*;
36use hopr_db_sql::HoprDbAllOperations;
37pub use hopr_internal_types::channels::ChannelEntry;
38use hopr_internal_types::{account::AccountEntry, prelude::ChannelDirection, tickets::WinningProbability};
39use hopr_primitive_types::prelude::*;
40use tracing::{debug, error, info, warn};
41
42use crate::errors::{HoprChainError, Result};
43
/// HTTP requestor type used by this crate as the type parameter of its RPC operations.
///
/// This is an alias of [`hopr_chain_rpc::transport::ReqwestClient`].
pub type DefaultHttpRequestor = hopr_chain_rpc::transport::ReqwestClient;
45
46/// Checks whether the node can be registered with the Safe in the NodeSafeRegistry
47pub async fn can_register_with_safe<Rpc: HoprRpcOperations>(
48    me: Address,
49    safe_address: Address,
50    rpc: &Rpc,
51) -> Result<bool> {
52    let target_address = rpc.get_module_target_address().await?;
53    debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
54
55    if target_address != safe_address {
56        // cannot proceed when the safe address is not the target/owner of given module
57        return Err(HoprChainError::Api("safe is not the module target".into()));
58    }
59
60    let registered_address = rpc.get_safe_from_node_safe_registry(me).await?;
61    info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
62
63    if registered_address.is_zero() {
64        info!("Node is not associated with a Safe in NodeSafeRegistry yet");
65        Ok(true)
66    } else if registered_address != safe_address {
67        Err(HoprChainError::Api(
68            "Node is associated with a different Safe in NodeSafeRegistry".into(),
69        ))
70    } else {
71        info!("Node is associated with correct Safe in NodeSafeRegistry");
72        Ok(false)
73    }
74}
75
76/// Waits until the given address is funded.
77///
78/// This is done by querying the RPC provider for balance with backoff until `max_delay` argument.
79pub async fn wait_for_funds<Rpc: HoprRpcOperations>(
80    address: Address,
81    min_balance: XDaiBalance,
82    max_delay: Duration,
83    rpc: &Rpc,
84) -> Result<()> {
85    let multiplier = 1.05;
86    let mut current_delay = Duration::from_secs(2).min(max_delay);
87
88    while current_delay <= max_delay {
89        match rpc.get_xdai_balance(address).await {
90            Ok(current_balance) => {
91                info!(balance = %current_balance, "balance status");
92                if current_balance.ge(&min_balance) {
93                    info!("node is funded");
94                    return Ok(());
95                } else {
96                    warn!("still unfunded, trying again soon");
97                }
98            }
99            Err(e) => error!(error = %e, "failed to fetch balance from the chain"),
100        }
101
102        sleep(current_delay).await;
103        current_delay = current_delay.mul_f64(multiplier);
104    }
105
106    Err(HoprChainError::Api("timeout waiting for funds".into()))
107}
108
109fn build_transport_client(url: &str) -> Result<Http<ReqwestClient>> {
110    let parsed_url = url::Url::parse(url).unwrap_or_else(|_| panic!("failed to parse URL: {}", url));
111    Ok(ReqwestTransport::new(parsed_url))
112}
113
/// Identifies the background processes spawned by [`HoprChain::start`].
///
/// The variants are used as keys of the map of abort handles returned from that method.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum HoprChainProcess {
    /// The chain indexer process.
    Indexer,
    /// The process driving the queue of outgoing on-chain actions.
    OutgoingOnchainActionQueue,
}
119
/// Concrete [`ActionQueue`] type used by [`HoprChain`].
///
/// It is generic only over the database type `T`; the action state tracker, the transaction
/// executor, its RPC client and the payload generator are fixed to the types listed below.
type ActionQueueType<T> = ActionQueue<
    T,
    IndexerActionTracker,
    EthereumTransactionExecutor<
        TransactionRequest,
        RpcEthereumClient<RpcOperations<DefaultHttpRequestor>>,
        SafePayloadGenerator,
    >,
>;
129
/// Represents all chain interactions exported to be used in the hopr-lib.
///
/// NOTE: instead of offering a unified interface, [HoprChain] currently exposes
/// some of its components directly (e.g. [ChainActions], which is handed out
/// by reference). This behavior will be refactored and hidden behind a trait
/// in future implementations.
#[derive(Debug, Clone)]
pub struct HoprChain<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug> {
    /// Chain keypair of this node; the node's on-chain address is derived from its public key.
    me_onchain: ChainKeypair,
    /// Address of the Safe this node operates with.
    safe_address: Address,
    /// Addresses of the contracts this instance interacts with.
    contract_addresses: ContractAddresses,
    /// Configuration handed to the indexer when it is started.
    indexer_cfg: IndexerConfig,
    /// Sender half of the channel handed to the indexer for [`SignificantChainEvent`]s.
    indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
    /// Database handle used for all DB-backed queries.
    db: T,
    /// Chain actions object, connected to a sender of `action_queue`.
    hopr_chain_actions: ChainActions<T>,
    /// Queue of outgoing on-chain actions; spawned as a background process on start.
    action_queue: ActionQueueType<T>,
    /// Action state tracker obtained from `action_queue`.
    action_state: Arc<IndexerActionTracker>,
    /// RPC operations used for direct on-chain queries and by the indexer.
    rpc_operations: RpcOperations<DefaultHttpRequestor>,
}
149
150impl<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug + 'static> HoprChain<T> {
151    #[allow(clippy::too_many_arguments)] // TODO: refactor this function into a reasonable group of components once fully rearchitected
152    pub fn new(
153        me_onchain: ChainKeypair,
154        db: T,
155        // --
156        chain_config: ChainNetworkConfig,
157        module_address: Address,
158        // --
159        contract_addresses: ContractAddresses,
160        safe_address: Address,
161        indexer_cfg: IndexerConfig,
162        indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
163    ) -> Result<Self> {
164        // TODO: extract this from the global config type
165        let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
166        if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
167            rpc_http_config.max_requests_per_sec = Some(max_rpc_req); // override the default if set
168        }
169
170        // TODO(#7140): replace this DefaultRetryPolicy with a custom one that computes backoff with the number of
171        // retries
172        let rpc_http_retry_policy = DefaultRetryPolicy::default();
173
174        // TODO: extract this from the global config type
175        let rpc_cfg = RpcOperationsConfig {
176            chain_id: chain_config.chain.chain_id as u64,
177            contract_addrs: contract_addresses,
178            module_address,
179            safe_address,
180            expected_block_time: Duration::from_millis(chain_config.chain.block_time),
181            tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
182            finality: chain_config.confirmations,
183            max_block_range_fetch_size: chain_config.max_block_range,
184            ..Default::default()
185        };
186
187        // TODO: extract this from the global config type
188        let rpc_client_cfg = RpcEthereumClientConfig::default();
189
190        // TODO: extract this from the global config type
191        let action_queue_cfg = ActionQueueConfig::default();
192
193        // --- Configs done ---
194
195        let transport_client = build_transport_client(&chain_config.chain.default_provider)?;
196
197        let rpc_client = ClientBuilder::default()
198            .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, rpc_http_retry_policy))
199            .transport(transport_client.clone(), transport_client.guess_local());
200
201        let requestor = DefaultHttpRequestor::new();
202
203        // Build RPC operations
204        let rpc_operations =
205            RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg, None).expect("failed to initialize RPC");
206
207        // Build the Ethereum Transaction Executor that uses RpcOperations as backend
208        let ethereum_tx_executor = EthereumTransactionExecutor::new(
209            RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
210            SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
211        );
212
213        // Build the Action Queue
214        let action_queue = ActionQueue::new(
215            db.clone(),
216            IndexerActionTracker::default(),
217            ethereum_tx_executor,
218            action_queue_cfg,
219        );
220
221        let action_state = action_queue.action_state();
222        let action_sender = action_queue.new_sender();
223
224        // Instantiate Chain Actions
225        let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), action_sender);
226
227        Ok(Self {
228            me_onchain,
229            safe_address,
230            contract_addresses,
231            indexer_cfg,
232            indexer_events_tx,
233            db,
234            hopr_chain_actions,
235            action_queue,
236            action_state,
237            rpc_operations,
238        })
239    }
240
241    /// Execute all processes of the [`HoprChain`] object.
242    ///
243    /// This method will spawn the [`HoprChainProcess::Indexer`] and [`HoprChainProcess::OutgoingOnchainActionQueue`]
244    /// processes and return join handles to the calling function.
245    pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, AbortHandle>> {
246        let mut processes: HashMap<HoprChainProcess, AbortHandle> = HashMap::new();
247
248        processes.insert(
249            HoprChainProcess::OutgoingOnchainActionQueue,
250            spawn_as_abortable(self.action_queue.clone().start()),
251        );
252        processes.insert(
253            HoprChainProcess::Indexer,
254            Indexer::new(
255                self.rpc_operations.clone(),
256                ContractEventHandlers::new(
257                    self.contract_addresses,
258                    self.safe_address,
259                    self.me_onchain.clone(),
260                    self.db.clone(),
261                    self.rpc_operations.clone(),
262                ),
263                self.db.clone(),
264                self.indexer_cfg,
265                self.indexer_events_tx.clone(),
266            )
267            .start()
268            .await?,
269        );
270        Ok(processes)
271    }
272
273    pub fn me_onchain(&self) -> Address {
274        self.me_onchain.public().to_address()
275    }
276
277    pub fn action_state(&self) -> Arc<IndexerActionTracker> {
278        self.action_state.clone()
279    }
280
281    pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
282        Ok(self.db.get_accounts(None, true).await?)
283    }
284
285    pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
286        self.db
287            .get_channel_by_parties(None, src, dest, false)
288            .await
289            .map_err(HoprChainError::from)
290            .and_then(|v| {
291                v.ok_or(errors::HoprChainError::Api(format!(
292                    "Channel entry not available {}-{}",
293                    src, dest
294                )))
295            })
296    }
297
298    pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
299        Ok(self.db.get_channels_via(None, ChannelDirection::Outgoing, src).await?)
300    }
301
302    pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
303        Ok(self.db.get_channels_via(None, ChannelDirection::Incoming, dest).await?)
304    }
305
306    pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
307        Ok(self.db.get_all_channels(None).await?)
308    }
309
310    pub async fn ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
311        Ok(self.db.get_indexer_data(None).await?.ticket_price)
312    }
313
314    pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
315        Ok(self.db.get_safe_hopr_allowance(None).await?)
316    }
317
318    pub fn actions_ref(&self) -> &ChainActions<T> {
319        &self.hopr_chain_actions
320    }
321
322    pub fn actions_mut_ref(&mut self) -> &mut ChainActions<T> {
323        &mut self.hopr_chain_actions
324    }
325
326    pub fn rpc(&self) -> &RpcOperations<DefaultHttpRequestor> {
327        &self.rpc_operations
328    }
329
330    /// Retrieves the balance of the node's on-chain account for the specified currency.
331    ///
332    /// This method queries the on-chain balance of the node's account for the given currency.
333    /// It supports querying balances for XDai and WxHOPR currencies. If the currency is unsupported,
334    /// an error is returned.
335    ///
336    /// # Returns
337    /// * `Result<Balance<C>>` - The balance of the node's account for the specified currency, or an error if the query
338    ///   fails.
339    pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
340        let bal = if C::is::<XDai>() {
341            self.rpc_operations
342                .get_xdai_balance(self.me_onchain())
343                .await?
344                .to_be_bytes()
345        } else if C::is::<WxHOPR>() {
346            self.rpc_operations
347                .get_hopr_balance(self.me_onchain())
348                .await?
349                .to_be_bytes()
350        } else {
351            return Err(HoprChainError::Api("unsupported currency".into()));
352        };
353
354        Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
355    }
356
357    /// Retrieves the balance of the specified address for the given currency.
358    ///
359    /// This method queries the on-chain balance of the provided address for the specified currency.
360    /// It supports querying balances for XDai and WxHOPR currencies. If the currency is unsupported,
361    /// an error is returned.
362    ///
363    /// # Arguments
364    /// * `address` - The address whose balance is to be retrieved.
365    ///
366    /// # Returns
367    /// * `Result<Balance<C>>` - The balance of the specified address for the given currency, or an error if the query
368    ///   fails.
369    pub async fn get_safe_balance<C: Currency + Send>(&self, safe_address: Address) -> errors::Result<Balance<C>> {
370        let bal = if C::is::<XDai>() {
371            self.rpc_operations.get_xdai_balance(safe_address).await?.to_be_bytes()
372        } else if C::is::<WxHOPR>() {
373            self.rpc_operations.get_hopr_balance(safe_address).await?.to_be_bytes()
374        } else {
375            return Err(HoprChainError::Api("unsupported currency".into()));
376        };
377
378        Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
379    }
380
381    /// Retrieves the HOPR token allowance granted by the safe address to the channels contract.
382    ///
383    /// This method queries the on-chain HOPR token contract to determine how many tokens
384    /// the safe address has approved the channels contract to spend on its behalf.
385    ///
386    /// # Returns
387    /// * `Result<HoprBalance>` - The current allowance amount, or an error if the query fails
388    pub async fn get_safe_hopr_allowance(&self) -> Result<HoprBalance> {
389        Ok(self
390            .rpc_operations
391            .get_hopr_allowance(self.safe_address, self.contract_addresses.channels)
392            .await?)
393    }
394
395    pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
396        Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
397    }
398
399    pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
400        Ok(self.rpc_operations.get_eligibility_status(self.me_onchain()).await?)
401    }
402
403    pub async fn get_minimum_winning_probability(&self) -> errors::Result<WinningProbability> {
404        Ok(self.rpc_operations.get_minimum_network_winning_probability().await?)
405    }
406
407    pub async fn get_minimum_ticket_price(&self) -> errors::Result<HoprBalance> {
408        Ok(self.rpc_operations.get_minimum_network_ticket_price().await?)
409    }
410}