1pub mod config;
4pub mod errors;
5pub mod executors;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tracing::{debug, error, info, warn};
11
12use config::ChainNetworkConfig;
13use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
14use hopr_async_runtime::prelude::{sleep, spawn, JoinHandle};
15use hopr_chain_actions::action_queue::{ActionQueue, ActionQueueConfig};
16use hopr_chain_actions::action_state::IndexerActionTracker;
17use hopr_chain_actions::payload::SafePayloadGenerator;
18use hopr_chain_actions::ChainActions;
19use hopr_chain_indexer::{block::Indexer, handlers::ContractEventHandlers, IndexerConfig};
20use hopr_chain_rpc::client::SimpleJsonRpcRetryPolicy;
21use hopr_chain_rpc::rpc::{RpcOperations, RpcOperationsConfig};
22use hopr_chain_rpc::HoprRpcOperations;
23pub use hopr_chain_types::chain_events::SignificantChainEvent;
24use hopr_chain_types::ContractAddresses;
25use hopr_crypto_types::prelude::*;
26use hopr_db_sql::HoprDbAllOperations;
27use hopr_internal_types::account::AccountEntry;
28pub use hopr_internal_types::channels::ChannelEntry;
29use hopr_internal_types::prelude::ChannelDirection;
30use hopr_primitive_types::prelude::*;
31
32use crate::errors::{HoprChainError, Result};
33
34#[cfg(feature = "runtime-async-std")]
38pub type DefaultHttpRequestor = hopr_chain_rpc::client::surf_client::SurfRequestor;
39
40#[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))]
43pub type DefaultHttpRequestor = hopr_chain_rpc::client::reqwest_client::ReqwestRequestor;
44
45pub type JsonRpcClient = hopr_chain_rpc::client::JsonRpcProviderClient<DefaultHttpRequestor, SimpleJsonRpcRetryPolicy>;
49
50pub async fn can_register_with_safe<Rpc: HoprRpcOperations>(
52 me: Address,
53 safe_address: Address,
54 rpc: &Rpc,
55) -> Result<bool> {
56 let target_address = rpc.get_module_target_address().await?;
57 debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
58
59 if target_address != safe_address {
60 return Err(HoprChainError::Api("safe is not the module target".into()));
62 }
63
64 let registered_address = rpc.get_safe_from_node_safe_registry(me).await?;
65 info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
66
67 if registered_address.is_zero() {
68 info!("Node is not associated with a Safe in NodeSafeRegistry yet");
69 Ok(true)
70 } else if registered_address != safe_address {
71 Err(HoprChainError::Api(
72 "Node is associated with a different Safe in NodeSafeRegistry".into(),
73 ))
74 } else {
75 info!("Node is associated with correct Safe in NodeSafeRegistry");
76 Ok(false)
77 }
78}
79
80pub async fn wait_for_funds<Rpc: HoprRpcOperations>(
84 address: Address,
85 min_balance: Balance,
86 max_delay: Duration,
87 rpc: &Rpc,
88) -> Result<()> {
89 let multiplier = 1.05;
90 let mut current_delay = Duration::from_secs(2).min(max_delay);
91
92 while current_delay <= max_delay {
93 match rpc.get_balance(address, min_balance.balance_type()).await {
94 Ok(current_balance) => {
95 info!(balance = %current_balance, "balance status");
96 if current_balance.ge(&min_balance) {
97 info!("node is funded");
98 return Ok(());
99 } else {
100 warn!("still unfunded, trying again soon");
101 }
102 }
103 Err(e) => error!(error = %e, "failed to fetch balance from the chain"),
104 }
105
106 sleep(current_delay).await;
107 current_delay = current_delay.mul_f64(multiplier);
108 }
109
110 Err(HoprChainError::Api("timeout waiting for funds".into()))
111}
112
/// Identifies a long-running process started by the chain component.
///
/// Values of this type are the keys of the map of join handles returned by `HoprChain::start`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HoprChainProcess {
    /// The process returned by starting the `Indexer`.
    Indexer,
    /// The spawned task that runs the action queue.
    OutgoingOnchainActionQueue,
}
118
119type ActionQueueType<T> = ActionQueue<
120 T,
121 IndexerActionTracker,
122 EthereumTransactionExecutor<
123 hopr_chain_rpc::TypedTransaction,
124 RpcEthereumClient<
125 RpcOperations<
126 hopr_chain_rpc::client::JsonRpcProviderClient<DefaultHttpRequestor, SimpleJsonRpcRetryPolicy>,
127 DefaultHttpRequestor,
128 >,
129 >,
130 SafePayloadGenerator,
131 >,
132>;
133
134#[derive(Debug, Clone)]
141pub struct HoprChain<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug> {
142 me_onchain: ChainKeypair,
143 safe_address: Address,
144 contract_addresses: ContractAddresses,
145 indexer_cfg: IndexerConfig,
146 indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
147 db: T,
148 hopr_chain_actions: ChainActions<T>,
149 action_queue: ActionQueueType<T>,
150 action_state: Arc<IndexerActionTracker>,
151 rpc_operations: RpcOperations<JsonRpcClient, DefaultHttpRequestor>,
152}
153
154impl<T: HoprDbAllOperations + Send + Sync + Clone + std::fmt::Debug + 'static> HoprChain<T> {
155 #[allow(clippy::too_many_arguments)] pub fn new(
157 me_onchain: ChainKeypair,
158 db: T,
159 chain_config: ChainNetworkConfig,
161 module_address: Address,
162 contract_addresses: ContractAddresses,
164 safe_address: Address,
165 indexer_cfg: IndexerConfig,
166 indexer_events_tx: async_channel::Sender<SignificantChainEvent>,
167 ) -> Self {
168 let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
170 if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
171 rpc_http_config.max_requests_per_sec = Some(max_rpc_req); }
173
174 let rpc_http_retry_policy = SimpleJsonRpcRetryPolicy {
176 min_retries: Some(2),
177 ..SimpleJsonRpcRetryPolicy::default()
178 };
179
180 let rpc_cfg = RpcOperationsConfig {
182 chain_id: chain_config.chain.chain_id as u64,
183 contract_addrs: contract_addresses,
184 module_address,
185 safe_address,
186 expected_block_time: Duration::from_millis(chain_config.chain.block_time),
187 tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
188 finality: chain_config.confirmations,
189 max_block_range_fetch_size: chain_config.max_block_range,
190 ..Default::default()
191 };
192
193 let rpc_client_cfg = RpcEthereumClientConfig::default();
195
196 let action_queue_cfg = ActionQueueConfig::default();
198
199 let requestor = DefaultHttpRequestor::new(rpc_http_config);
202
203 let rpc_client = JsonRpcClient::new(
205 &chain_config.chain.default_provider,
206 requestor.clone(),
207 rpc_http_retry_policy,
208 );
209
210 let rpc_operations =
212 RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg).expect("failed to initialize RPC");
213
214 let ethereum_tx_executor = EthereumTransactionExecutor::new(
216 RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
217 SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
218 );
219
220 let action_queue = ActionQueue::new(
222 db.clone(),
223 IndexerActionTracker::default(),
224 ethereum_tx_executor,
225 action_queue_cfg,
226 );
227
228 let action_state = action_queue.action_state();
229 let action_sender = action_queue.new_sender();
230
231 let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), action_sender);
233
234 Self {
235 me_onchain,
236 safe_address,
237 contract_addresses,
238 indexer_cfg,
239 indexer_events_tx,
240 db,
241 hopr_chain_actions,
242 action_queue,
243 action_state,
244 rpc_operations,
245 }
246 }
247
248 pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, JoinHandle<()>>> {
253 let mut processes: HashMap<HoprChainProcess, JoinHandle<()>> = HashMap::new();
254
255 processes.insert(
256 HoprChainProcess::OutgoingOnchainActionQueue,
257 spawn(self.action_queue.clone().start()),
258 );
259 processes.insert(
260 HoprChainProcess::Indexer,
261 Indexer::new(
262 self.rpc_operations.clone(),
263 ContractEventHandlers::new(
264 self.contract_addresses,
265 self.safe_address,
266 self.me_onchain.clone(),
267 self.db.clone(),
268 ),
269 self.db.clone(),
270 self.indexer_cfg,
271 self.indexer_events_tx.clone(),
272 )
273 .start()
274 .await?,
275 );
276
277 Ok(processes)
278 }
279
280 pub fn me_onchain(&self) -> Address {
281 self.me_onchain.public().to_address()
282 }
283
284 pub fn action_state(&self) -> Arc<IndexerActionTracker> {
285 self.action_state.clone()
286 }
287
288 pub async fn accounts_announced_on_chain(&self) -> errors::Result<Vec<AccountEntry>> {
289 Ok(self.db.get_accounts(None, true).await?)
290 }
291
292 pub async fn channel(&self, src: &Address, dest: &Address) -> errors::Result<ChannelEntry> {
293 self.db
294 .get_channel_by_parties(None, src, dest, false)
295 .await
296 .map_err(HoprChainError::from)
297 .and_then(|v| {
298 v.ok_or(errors::HoprChainError::Api(format!(
299 "Channel entry not available {}-{}",
300 src, dest
301 )))
302 })
303 }
304
305 pub async fn channels_from(&self, src: &Address) -> errors::Result<Vec<ChannelEntry>> {
306 Ok(self.db.get_channels_via(None, ChannelDirection::Outgoing, src).await?)
307 }
308
309 pub async fn channels_to(&self, dest: &Address) -> errors::Result<Vec<ChannelEntry>> {
310 Ok(self.db.get_channels_via(None, ChannelDirection::Incoming, dest).await?)
311 }
312
313 pub async fn all_channels(&self) -> errors::Result<Vec<ChannelEntry>> {
314 Ok(self.db.get_all_channels(None).await?)
315 }
316
317 pub async fn ticket_price(&self) -> errors::Result<Option<U256>> {
318 Ok(self.db.get_indexer_data(None).await?.ticket_price.map(|b| b.amount()))
319 }
320
321 pub async fn safe_allowance(&self) -> errors::Result<Balance> {
322 Ok(self.db.get_safe_hopr_allowance(None).await?)
323 }
324
325 pub fn actions_ref(&self) -> &ChainActions<T> {
326 &self.hopr_chain_actions
327 }
328
329 pub fn actions_mut_ref(&mut self) -> &mut ChainActions<T> {
330 &mut self.hopr_chain_actions
331 }
332
333 pub fn rpc(&self) -> &RpcOperations<JsonRpcClient, DefaultHttpRequestor> {
334 &self.rpc_operations
335 }
336
337 pub async fn get_balance(&self, balance_type: BalanceType) -> errors::Result<Balance> {
338 Ok(self.rpc_operations.get_balance(self.me_onchain(), balance_type).await?)
339 }
340
341 pub async fn get_safe_balance(&self, safe_address: Address, balance_type: BalanceType) -> errors::Result<Balance> {
342 Ok(self.rpc_operations.get_balance(safe_address, balance_type).await?)
343 }
344
345 pub async fn get_channel_closure_notice_period(&self) -> errors::Result<Duration> {
346 Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
347 }
348
349 pub async fn get_eligibility_status(&self) -> errors::Result<bool> {
350 Ok(self.rpc_operations.get_eligibility_status(self.me_onchain()).await?)
351 }
352
353 pub async fn get_minimum_winning_probability(&self) -> errors::Result<f64> {
354 Ok(self.rpc_operations.get_minimum_network_winning_probability().await?)
355 }
356
357 pub async fn get_minimum_ticket_price(&self) -> errors::Result<Balance> {
358 Ok(self.rpc_operations.get_minimum_network_ticket_price().await?)
359 }
360}