1pub mod config;
4pub mod errors;
5pub mod executors;
6
7use std::{collections::HashMap, sync::Arc, time::Duration};
8
9use alloy::{
10 rpc::{client::ClientBuilder, types::TransactionRequest},
11 transports::{
12 http::{Http, ReqwestTransport},
13 layers::RetryBackoffLayer,
14 },
15};
16use config::ChainNetworkConfig;
17use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
18use futures::future::AbortHandle;
19use hopr_async_runtime::{prelude::sleep, spawn_as_abortable};
20use hopr_chain_actions::{
21 ChainActions,
22 action_queue::{ActionQueue, ActionQueueConfig},
23 action_state::IndexerActionTracker,
24 payload::SafePayloadGenerator,
25};
26use hopr_chain_indexer::{IndexerConfig, block::Indexer, handlers::ContractEventHandlers};
27use hopr_chain_rpc::{
28 HoprRpcOperations,
29 client::DefaultRetryPolicy,
30 rpc::{RpcOperations, RpcOperationsConfig},
31 transport::ReqwestClient,
32};
33use hopr_chain_types::ContractAddresses;
34pub use hopr_chain_types::chain_events::SignificantChainEvent;
35use hopr_crypto_types::prelude::*;
36use hopr_db_sql::HoprDbAllOperations;
37pub use hopr_internal_types::channels::ChannelEntry;
38use hopr_internal_types::{account::AccountEntry, prelude::ChannelDirection, tickets::WinningProbability};
39use hopr_primitive_types::prelude::*;
40use tracing::{debug, error, info, warn};
41
42use crate::errors::{HoprChainError, Result};
43
44pub type DefaultHttpRequestor = hopr_chain_rpc::transport::ReqwestClient;
45
46pub async fn can_register_with_safe<Rpc: HoprRpcOperations>(
48 me: Address,
49 safe_address: Address,
50 rpc: &Rpc,
51) -> Result<bool> {
52 let target_address = rpc.get_module_target_address().await?;
53 debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
54
55 if target_address != safe_address {
56 return Err(HoprChainError::Api("safe is not the module target".into()));
58 }
59
60 let registered_address = rpc.get_safe_from_node_safe_registry(me).await?;
61 info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
62
63 if registered_address.is_zero() {
64 info!("Node is not associated with a Safe in NodeSafeRegistry yet");
65 Ok(true)
66 } else if registered_address != safe_address {
67 Err(HoprChainError::Api(
68 "Node is associated with a different Safe in NodeSafeRegistry".into(),
69 ))
70 } else {
71 info!("Node is associated with correct Safe in NodeSafeRegistry");
72 Ok(false)
73 }
74}
75
76pub async fn wait_for_funds<Rpc: HoprRpcOperations>(
80 address: Address,
81 min_balance: XDaiBalance,
82 max_delay: Duration,
83 rpc: &Rpc,
84) -> Result<()> {
85 let multiplier = 1.05;
86 let mut current_delay = Duration::from_secs(2).min(max_delay);
87
88 while current_delay <= max_delay {
89 match rpc.get_xdai_balance(address).await {
90 Ok(current_balance) => {
91 info!(balance = %current_balance, "balance status");
92 if current_balance.ge(&min_balance) {
93 info!("node is funded");
94 return Ok(());
95 } else {
96 warn!("still unfunded, trying again soon");
97 }
98 }
99 Err(e) => error!(error = %e, "failed to fetch balance from the chain"),
100 }
101
102 sleep(current_delay).await;
103 current_delay = current_delay.mul_f64(multiplier);
104 }
105
106 Err(HoprChainError::Api("timeout waiting for funds".into()))
107}
108
109fn build_transport_client(url: &str) -> Result<Http<ReqwestClient>> {
110 let parsed_url = url::Url::parse(url).unwrap_or_else(|_| panic!("failed to parse URL: {}", url));
111 Ok(ReqwestTransport::new(parsed_url))
112}
113
/// Identifies the long-running processes spawned by the chain component.
///
/// Values of this type are used as keys of the map returned from `HoprChain::start`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HoprChainProcess {
    /// The chain indexer process.
    Indexer,
    /// The queue processing outgoing on-chain actions.
    OutgoingOnchainActionQueue,
}
119
120type ActionQueueType<T> = ActionQueue<
121 T,
122 IndexerActionTracker,
123 EthereumTransactionExecutor<
124 TransactionRequest,
125 RpcEthereumClient<RpcOperations<DefaultHttpRequestor>>,
126 SafePayloadGenerator,
127 >,
128>;
129
130#[derive(Debug, Clone)]
137pub struct HoprChain<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug> {
138 me_onchain: ChainKeypair,
139 safe_address: Address,
140 contract_addresses: ContractAddresses,
141 indexer_cfg: IndexerConfig,
142 indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
143 db: T,
144 hopr_chain_actions: ChainActions<T>,
145 action_queue: ActionQueueType<T>,
146 action_state: Arc<IndexerActionTracker>,
147 rpc_operations: RpcOperations<DefaultHttpRequestor>,
148}
149
150impl<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug + 'static> HoprChain<T> {
151 #[allow(clippy::too_many_arguments)] pub fn new(
153 me_onchain: ChainKeypair,
154 db: T,
155 chain_config: ChainNetworkConfig,
157 module_address: Address,
158 contract_addresses: ContractAddresses,
160 safe_address: Address,
161 indexer_cfg: IndexerConfig,
162 indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
163 ) -> Result<Self> {
164 let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
166 if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
167 rpc_http_config.max_requests_per_sec = Some(max_rpc_req); }
169
170 let rpc_http_retry_policy = DefaultRetryPolicy::default();
173
174 let rpc_cfg = RpcOperationsConfig {
176 chain_id: chain_config.chain.chain_id as u64,
177 contract_addrs: contract_addresses,
178 module_address,
179 safe_address,
180 expected_block_time: Duration::from_millis(chain_config.chain.block_time),
181 tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
182 finality: chain_config.confirmations,
183 max_block_range_fetch_size: chain_config.max_block_range,
184 ..Default::default()
185 };
186
187 let rpc_client_cfg = RpcEthereumClientConfig::default();
189
190 let action_queue_cfg = ActionQueueConfig::default();
192
193 let transport_client = build_transport_client(&chain_config.chain.default_provider)?;
196
197 let rpc_client = ClientBuilder::default()
198 .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, rpc_http_retry_policy))
199 .transport(transport_client.clone(), transport_client.guess_local());
200
201 let requestor = DefaultHttpRequestor::new();
202
203 let rpc_operations =
205 RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg, None).expect("failed to initialize RPC");
206
207 let ethereum_tx_executor = EthereumTransactionExecutor::new(
209 RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
210 SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
211 );
212
213 let action_queue = ActionQueue::new(
215 db.clone(),
216 IndexerActionTracker::default(),
217 ethereum_tx_executor,
218 action_queue_cfg,
219 );
220
221 let action_state = action_queue.action_state();
222 let action_sender = action_queue.new_sender();
223
224 let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), action_sender);
226
227 Ok(Self {
228 me_onchain,
229 safe_address,
230 contract_addresses,
231 indexer_cfg,
232 indexer_events_tx,
233 db,
234 hopr_chain_actions,
235 action_queue,
236 action_state,
237 rpc_operations,
238 })
239 }
240
241 pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, AbortHandle>> {
246 let mut processes: HashMap<HoprChainProcess, AbortHandle> = HashMap::new();
247
248 processes.insert(
249 HoprChainProcess::OutgoingOnchainActionQueue,
250 spawn_as_abortable(self.action_queue.clone().start()),
251 );
252 processes.insert(
253 HoprChainProcess::Indexer,
254 Indexer::new(
255 self.rpc_operations.clone(),
256 ContractEventHandlers::new(
257 self.contract_addresses,
258 self.safe_address,
259 self.me_onchain.clone(),
260 self.db.clone(),
261 self.rpc_operations.clone(),
262 ),
263 self.db.clone(),
264 self.indexer_cfg,
265 self.indexer_events_tx.clone(),
266 )
267 .start()
268 .await?,
269 );
270 Ok(processes)
271 }
272
273 pub fn me_onchain(&self) -> Address {
274 self.me_onchain.public().to_address()
275 }
276
277 pub fn action_state(&self) -> Arc<IndexerActionTracker> {
278 self.action_state.clone()
279 }
280
281 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
282 Ok(self.db.get_accounts(None, true).await?)
283 }
284
285 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
286 self.db
287 .get_channel_by_parties(None, src, dest, false)
288 .await
289 .map_err(HoprChainError::from)
290 .and_then(|v| {
291 v.ok_or(errors::HoprChainError::Api(format!(
292 "Channel entry not available {}-{}",
293 src, dest
294 )))
295 })
296 }
297
298 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
299 Ok(self.db.get_channels_via(None, ChannelDirection::Outgoing, src).await?)
300 }
301
302 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
303 Ok(self.db.get_channels_via(None, ChannelDirection::Incoming, dest).await?)
304 }
305
306 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
307 Ok(self.db.get_all_channels(None).await?)
308 }
309
310 pub async fn ticket_price(&self) -> errors::Result<Option<HoprBalance>> {
311 Ok(self.db.get_indexer_data(None).await?.ticket_price)
312 }
313
314 pub async fn safe_allowance(&self) -> errors::Result<HoprBalance> {
315 Ok(self.db.get_safe_hopr_allowance(None).await?)
316 }
317
318 pub fn actions_ref(&self) -> &ChainActions<T> {
319 &self.hopr_chain_actions
320 }
321
322 pub fn actions_mut_ref(&mut self) -> &mut ChainActions<T> {
323 &mut self.hopr_chain_actions
324 }
325
326 pub fn rpc(&self) -> &RpcOperations<DefaultHttpRequestor> {
327 &self.rpc_operations
328 }
329
330 pub async fn get_balance<C: Currency + Send>(&self) -> errors::Result<Balance<C>> {
340 let bal = if C::is::<XDai>() {
341 self.rpc_operations
342 .get_xdai_balance(self.me_onchain())
343 .await?
344 .to_be_bytes()
345 } else if C::is::<WxHOPR>() {
346 self.rpc_operations
347 .get_hopr_balance(self.me_onchain())
348 .await?
349 .to_be_bytes()
350 } else {
351 return Err(HoprChainError::Api("unsupported currency".into()));
352 };
353
354 Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
355 }
356
357 pub async fn get_safe_balance<C: Currency + Send>(&self, safe_address: Address) -> errors::Result<Balance<C>> {
370 let bal = if C::is::<XDai>() {
371 self.rpc_operations.get_xdai_balance(safe_address).await?.to_be_bytes()
372 } else if C::is::<WxHOPR>() {
373 self.rpc_operations.get_hopr_balance(safe_address).await?.to_be_bytes()
374 } else {
375 return Err(HoprChainError::Api("unsupported currency".into()));
376 };
377
378 Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
379 }
380
381 pub async fn get_safe_hopr_allowance(&self) -> Result<HoprBalance> {
389 Ok(self
390 .rpc_operations
391 .get_hopr_allowance(self.safe_address, self.contract_addresses.channels)
392 .await?)
393 }
394
395 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
396 Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
397 }
398
399 pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
400 Ok(self.rpc_operations.get_eligibility_status(self.me_onchain()).await?)
401 }
402
403 pub async fn get_minimum_winning_probability(&self) -> errors::Result<WinningProbability> {
404 Ok(self.rpc_operations.get_minimum_network_winning_probability().await?)
405 }
406
407 pub async fn get_minimum_ticket_price(&self) -> errors::Result<HoprBalance> {
408 Ok(self.rpc_operations.get_minimum_network_ticket_price().await?)
409 }
410}