1pub mod config;
4pub mod errors;
5pub mod executors;
6
7use std::{collections::HashMap, sync::Arc, time::Duration};
8
9use alloy::{
10 rpc::{client::ClientBuilder, types::TransactionRequest},
11 transports::{
12 http::{Http, ReqwestTransport},
13 layers::RetryBackoffLayer,
14 },
15};
16use config::ChainNetworkConfig;
17use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
18use futures::future::AbortHandle;
19use hopr_async_runtime::{prelude::sleep, spawn_as_abortable};
20use hopr_chain_actions::{
21 ChainActions,
22 action_queue::{ActionQueue, ActionQueueConfig},
23 action_state::IndexerActionTracker,
24 payload::SafePayloadGenerator,
25};
26use hopr_chain_indexer::{IndexerConfig, block::Indexer, handlers::ContractEventHandlers};
27use hopr_chain_rpc::{
28 HoprRpcOperations,
29 client::DefaultRetryPolicy,
30 rpc::{RpcOperations, RpcOperationsConfig},
31 transport::ReqwestClient,
32};
33use hopr_chain_types::ContractAddresses;
34pub use hopr_chain_types::chain_events::SignificantChainEvent;
35use hopr_crypto_types::prelude::*;
36use hopr_db_sql::HoprDbAllOperations;
37pub use hopr_internal_types::channels::ChannelEntry;
38use hopr_internal_types::{
39 account::AccountEntry, channels::CorruptedChannelEntry, prelude::ChannelDirection, tickets::WinningProbability,
40};
41use hopr_primitive_types::prelude::*;
42use tracing::{debug, error, info, warn};
43
44use crate::errors::{HoprChainError, Result};
45
46pub type DefaultHttpRequestor = hopr_chain_rpc::transport::ReqwestClient;
47
48pub async fn can_register_with_safe<Rpc: HoprRpcOperations>(
50 me: Address,
51 safe_address: Address,
52 rpc: &Rpc,
53) -> Result<bool> {
54 let target_address = rpc.get_module_target_address().await?;
55 debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
56
57 if target_address != safe_address {
58 return Err(HoprChainError::Api("safe is not the module target".into()));
60 }
61
62 let registered_address = rpc.get_safe_from_node_safe_registry(me).await?;
63 info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
64
65 if registered_address.is_zero() {
66 info!("Node is not associated with a Safe in NodeSafeRegistry yet");
67 Ok(true)
68 } else if registered_address != safe_address {
69 Err(HoprChainError::Api(
70 "Node is associated with a different Safe in NodeSafeRegistry".into(),
71 ))
72 } else {
73 info!("Node is associated with correct Safe in NodeSafeRegistry");
74 Ok(false)
75 }
76}
77
78pub async fn wait_for_funds<Rpc: HoprRpcOperations>(
82 address: Address,
83 min_balance: XDaiBalance,
84 max_delay: Duration,
85 rpc: &Rpc,
86) -> Result<()> {
87 let multiplier = 1.05;
88 let mut current_delay = Duration::from_secs(2).min(max_delay);
89
90 while current_delay <= max_delay {
91 match rpc.get_xdai_balance(address).await {
92 Ok(current_balance) => {
93 info!(balance = %current_balance, "balance status");
94 if current_balance.ge(&min_balance) {
95 info!("node is funded");
96 return Ok(());
97 } else {
98 warn!("still unfunded, trying again soon");
99 }
100 }
101 Err(e) => error!(error = %e, "failed to fetch balance from the chain"),
102 }
103
104 sleep(current_delay).await;
105 current_delay = current_delay.mul_f64(multiplier);
106 }
107
108 Err(HoprChainError::Api("timeout waiting for funds".into()))
109}
110
111fn build_transport_client(url: &str) -> Result<Http<ReqwestClient>> {
112 let parsed_url = url::Url::parse(url).unwrap_or_else(|_| panic!("failed to parse URL: {url}"));
113 Ok(ReqwestTransport::new(parsed_url))
114}
115
/// Identifies the individual processes started by `HoprChain::start`.
///
/// The values serve as keys of the map of abort handles returned from there.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum HoprChainProcess {
    /// The chain indexer process.
    Indexer,
    /// The process driving the queue of outgoing on-chain actions.
    OutgoingOnchainActionQueue,
}
121
122type ActionQueueType<T> = ActionQueue<
123 T,
124 IndexerActionTracker,
125 EthereumTransactionExecutor<
126 TransactionRequest,
127 RpcEthereumClient<RpcOperations<DefaultHttpRequestor>>,
128 SafePayloadGenerator,
129 >,
130>;
131
132#[derive(Debug, Clone)]
139pub struct HoprChain<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug> {
140 me_onchain: ChainKeypair,
141 safe_address: Address,
142 contract_addresses: ContractAddresses,
143 indexer_cfg: IndexerConfig,
144 indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
145 db: T,
146 hopr_chain_actions: ChainActions<T>,
147 action_queue: ActionQueueType<T>,
148 action_state: Arc<IndexerActionTracker>,
149 rpc_operations: RpcOperations<DefaultHttpRequestor>,
150}
151
152impl<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug + 'static> HoprChain<T> {
153 #[allow(clippy::too_many_arguments)] pub fn new(
155 me_onchain: ChainKeypair,
156 db: T,
157 chain_config: ChainNetworkConfig,
159 module_address: Address,
160 contract_addresses: ContractAddresses,
162 safe_address: Address,
163 indexer_cfg: IndexerConfig,
164 indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
165 ) -> Result<Self> {
166 let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
168 if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
169 rpc_http_config.max_requests_per_sec = Some(max_rpc_req); }
171
172 let rpc_http_retry_policy = DefaultRetryPolicy::default();
175
176 let rpc_cfg = RpcOperationsConfig {
178 chain_id: chain_config.chain.chain_id as u64,
179 contract_addrs: contract_addresses,
180 module_address,
181 safe_address,
182 expected_block_time: Duration::from_millis(chain_config.chain.block_time),
183 tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
184 finality: chain_config.confirmations,
185 max_block_range_fetch_size: chain_config.max_block_range,
186 ..Default::default()
187 };
188
189 let rpc_client_cfg = RpcEthereumClientConfig::default();
191
192 let action_queue_cfg = ActionQueueConfig::default();
194
195 let transport_client = build_transport_client(&chain_config.chain.default_provider)?;
198
199 let rpc_client = ClientBuilder::default()
200 .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, rpc_http_retry_policy))
201 .transport(transport_client.clone(), transport_client.guess_local());
202
203 let requestor = DefaultHttpRequestor::new();
204
205 let rpc_operations =
207 RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg, None).expect("failed to initialize RPC");
208
209 let ethereum_tx_executor = EthereumTransactionExecutor::new(
211 RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
212 SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
213 );
214
215 let action_queue = ActionQueue::new(
217 db.clone(),
218 IndexerActionTracker::default(),
219 ethereum_tx_executor,
220 action_queue_cfg,
221 );
222
223 let action_state = action_queue.action_state();
224 let action_sender = action_queue.new_sender();
225
226 let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), action_sender);
228
229 Ok(Self {
230 me_onchain,
231 safe_address,
232 contract_addresses,
233 indexer_cfg,
234 indexer_events_tx,
235 db,
236 hopr_chain_actions,
237 action_queue,
238 action_state,
239 rpc_operations,
240 })
241 }
242
243 pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, AbortHandle>> {
248 let mut processes: HashMap<HoprChainProcess, AbortHandle> = HashMap::new();
249
250 processes.insert(
251 HoprChainProcess::OutgoingOnchainActionQueue,
252 spawn_as_abortable!(self.action_queue.clone().start()),
253 );
254 processes.insert(
255 HoprChainProcess::Indexer,
256 Indexer::new(
257 self.rpc_operations.clone(),
258 ContractEventHandlers::new(
259 self.contract_addresses,
260 self.safe_address,
261 self.me_onchain.clone(),
262 self.db.clone(),
263 self.rpc_operations.clone(),
264 ),
265 self.db.clone(),
266 self.indexer_cfg.clone(),
267 self.indexer_events_tx.clone(),
268 )
269 .start()
270 .await?,
271 );
272 Ok(processes)
273 }
274
275 pub fn me_onchain(&self) -> Address {
276 self.me_onchain.public().to_address()
277 }
278
279 pub fn action_state(&self) -> Arc<IndexerActionTracker> {
280 self.action_state.clone()
281 }
282
283 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
284 Ok(self.db.get_accounts(None, true).await?)
285 }
286
287 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
288 self.db
289 .get_channel_by_parties(None, src, dest, false)
290 .await
291 .map_err(HoprChainError::from)
292 .and_then(|v| {
293 v.ok_or(errors::HoprChainError::Api(format!(
294 "Channel entry not available {src}-{dest}"
295 )))
296 })
297 }
298
299 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
300 Ok(self.db.get_channels_via(None, ChannelDirection::Outgoing, src).await?)
301 }
302
303 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
304 Ok(self.db.get_channels_via(None, ChannelDirection::Incoming, dest).await?)
305 }
306
307 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
308 Ok(self.db.get_all_channels(None).await?)
309 }
310
311 pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
312 Ok(self.db.get_all_corrupted_channels(None).await?)
313 }
314
315 pub async fn ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
316 Ok(self.db.get_indexer_data(None).await?.ticket_price)
317 }
318
319 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
320 Ok(self.db.get_safe_hopr_allowance(None).await?)
321 }
322
323 pub fn actions_ref(&self) -> &ChainActions<T> {
324 &self.hopr_chain_actions
325 }
326
327 pub fn actions_mut_ref(&mut self) -> &mut ChainActions<T> {
328 &mut self.hopr_chain_actions
329 }
330
331 pub fn rpc(&self) -> &RpcOperations<DefaultHttpRequestor> {
332 &self.rpc_operations
333 }
334
335 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
345 let bal = if C::is::<XDai>() {
346 self.rpc_operations
347 .get_xdai_balance(self.me_onchain())
348 .await?
349 .to_be_bytes()
350 } else if C::is::<WxHOPR>() {
351 self.rpc_operations
352 .get_hopr_balance(self.me_onchain())
353 .await?
354 .to_be_bytes()
355 } else {
356 return Err(HoprChainError::Api("unsupported currency".into()));
357 };
358
359 Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
360 }
361
362 pub async fn get_safe_balance<C: Currency + Send>(&self, safe_address: Address) -> errors::Result<Balance<C>> {
375 let bal = if C::is::<XDai>() {
376 self.rpc_operations.get_xdai_balance(safe_address).await?.to_be_bytes()
377 } else if C::is::<WxHOPR>() {
378 self.rpc_operations.get_hopr_balance(safe_address).await?.to_be_bytes()
379 } else {
380 return Err(HoprChainError::Api("unsupported currency".into()));
381 };
382
383 Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
384 }
385
386 pub async fn get_safe_hopr_allowance(&self) -> Result<HoprBalance> {
394 Ok(self
395 .rpc_operations
396 .get_hopr_allowance(self.safe_address, self.contract_addresses.channels)
397 .await?)
398 }
399
400 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
401 Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
402 }
403
404 pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
405 Ok(self.rpc_operations.get_eligibility_status(self.me_onchain()).await?)
406 }
407
408 pub async fn get_minimum_winning_probability(&self) -> errors::Result<WinningProbability> {
409 Ok(self.rpc_operations.get_minimum_network_winning_probability().await?)
410 }
411
412 pub async fn get_minimum_ticket_price(&self) -> errors::Result<HoprBalance> {
413 Ok(self.rpc_operations.get_minimum_network_ticket_price().await?)
414 }
415}