1use std::{
13 fmt::Debug,
14 future::IntoFuture,
15 io::{BufWriter, Write},
16 sync::{
17 Arc,
18 atomic::{AtomicBool, AtomicUsize, Ordering},
19 },
20 task::{Context, Poll},
21 time::Duration,
22};
23
24use alloy::eips::eip1559::Eip1559Estimation;
26use alloy::{
27 network::{EthereumWallet, Network, TransactionBuilder},
28 primitives::utils::parse_units,
29 providers::{
30 Identity, Provider, RootProvider, SendableTx,
31 fillers::{
32 BlobGasFiller, ChainIdFiller, FillProvider, FillerControlFlow, GasFiller, JoinFill, NonceFiller, TxFiller,
33 WalletFiller,
34 },
35 },
36 rpc::json_rpc::{ErrorPayload, RequestPacket, ResponsePacket, ResponsePayload},
37 transports::{HttpError, TransportError, TransportErrorKind, TransportFut, TransportResult, layers::RetryPolicy},
38};
39use futures::{FutureExt, StreamExt};
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use tower::{Layer, Service};
43use tracing::{error, trace};
44use url::Url;
45use validator::Validate;
46
47pub const EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS: u128 = 3_000_000_000;
52pub const EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS: u128 = 100_000_000;
53
54#[cfg(all(feature = "prometheus", not(test)))]
55use hopr_metrics::metrics::{MultiCounter, MultiHistogram};
56
57use crate::{rpc::DEFAULT_GAS_ORACLE_URL, transport::HttpRequestor};
58
59#[cfg(all(feature = "prometheus", not(test)))]
60lazy_static::lazy_static! {
61 static ref METRIC_COUNT_RPC_CALLS: MultiCounter = MultiCounter::new(
62 "hopr_rpc_call_count",
63 "Number of Ethereum RPC calls over HTTP and their result",
64 &["call", "result"]
65 )
66 .unwrap();
67 static ref METRIC_RPC_CALLS_TIMING: MultiHistogram = MultiHistogram::new(
68 "hopr_rpc_call_time_sec",
69 "Timing of RPC calls over HTTP in seconds",
70 vec![0.1, 0.5, 1.0, 2.0, 5.0, 7.0, 10.0],
71 &["call"]
72 )
73 .unwrap();
74 static ref METRIC_RETRIES_PER_RPC_CALL: MultiHistogram = MultiHistogram::new(
75 "hopr_retries_per_rpc_call",
76 "Number of retries per RPC call",
77 vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
78 &["call"]
79 )
80 .unwrap();
81}
82
83#[serde_as]
113#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize, Validate)]
114pub struct DefaultRetryPolicy {
115 #[validate(range(min = 0))]
119 #[default(Some(0))]
120 pub min_retries: Option<u32>,
121
122 #[default(Duration::from_secs(1))]
129 pub initial_backoff: Duration,
130
131 #[validate(range(min = 0.0))]
138 #[default(0.3)]
139 pub backoff_coefficient: f64,
140 #[default(Duration::from_secs(30))]
146 pub max_backoff: Duration,
147 pub backoff_on_transport_errors: bool,
151 #[default(_code = "vec![-32005, -32016, 429]")]
155 pub retryable_json_rpc_errors: Vec<i64>,
156
157 #[serde_as(as = "Vec<DisplayFromStr>")]
161 #[default(
162 _code = "vec![http::StatusCode::TOO_MANY_REQUESTS,http::StatusCode::GATEWAY_TIMEOUT,\
163 http::StatusCode::SERVICE_UNAVAILABLE]"
164 )]
165 pub retryable_http_errors: Vec<http::StatusCode>,
166
167 #[validate(range(min = 5))]
173 #[default = 100]
174 pub max_retry_queue_size: u32,
175}
176
177impl DefaultRetryPolicy {
178 fn is_retryable_json_rpc_errors(&self, rpc_err: &ErrorPayload) -> bool {
179 self.retryable_json_rpc_errors.contains(&rpc_err.code)
180 }
181
182 fn is_retryable_http_errors(&self, http_err: &HttpError) -> bool {
183 let status_code = match http::StatusCode::try_from(http_err.status) {
184 Ok(status_code) => status_code,
185 Err(_) => return false,
186 };
187 self.retryable_http_errors.contains(&status_code)
188 }
189}
190
191impl RetryPolicy for DefaultRetryPolicy {
192 fn should_retry(&self, err: &TransportError) -> bool {
193 match err {
194 TransportError::Transport(err) => {
197 match err {
198 TransportErrorKind::MissingBatchResponse(_) => true,
200 TransportErrorKind::HttpError(http_err) => {
201 http_err.is_rate_limit_err() || self.is_retryable_http_errors(http_err)
202 }
203 TransportErrorKind::Custom(err) => {
204 let msg = err.to_string();
205 msg.contains("429 Too Many Requests")
206 }
207 _ => false,
208 }
209 }
210 TransportError::SerError(_) => false,
213 TransportError::DeserError { text, .. } => {
214 if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
215 return self.is_retryable_json_rpc_errors(&resp);
216 }
217
218 #[derive(Deserialize)]
221 struct Resp {
222 error: ErrorPayload,
223 }
224
225 if let Ok(resp) = serde_json::from_str::<Resp>(text) {
226 return self.is_retryable_json_rpc_errors(&resp.error);
227 }
228
229 false
230 }
231 TransportError::ErrorResp(err) => self.is_retryable_json_rpc_errors(err),
232 TransportError::NullResp => true,
233 _ => false,
234 }
235 }
236
237 fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
240 None
241 }
242}
243
244#[derive(Debug, Clone)]
245pub struct ZeroRetryPolicy;
246impl RetryPolicy for ZeroRetryPolicy {
247 fn should_retry(&self, _err: &alloy::transports::TransportError) -> bool {
248 false
249 }
250
251 fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
252 None
253 }
254}
255
256#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
258pub enum GasCategory {
259 SafeLow,
260 #[default]
261 Standard,
262 Fast,
263 Fastest,
264}
265
266#[derive(Clone, Debug)]
271pub struct GasOracleFiller<C> {
272 client: C,
273 url: Url,
274 gas_category: GasCategory,
275}
276
277#[derive(Clone, Debug, Deserialize, PartialEq)]
278pub struct GasOracleResponse {
279 pub status: String,
280 pub message: String,
281 pub result: GasOracleResponseResult,
282}
283
284#[derive(Clone, Debug, Deserialize, PartialEq)]
285#[serde(rename_all = "PascalCase")]
286pub struct GasOracleResponseResult {
287 pub last_block: String,
288 pub safe_gas_price: String,
289 pub propose_gas_price: String,
290 pub fast_gas_price: String,
291}
292
293impl GasOracleResponse {
294 #[inline]
295 pub fn gas_from_category(&self, gas_category: GasCategory) -> String {
296 self.result.gas_from_category(gas_category)
297 }
298}
299
300impl GasOracleResponseResult {
301 fn gas_from_category(&self, gas_category: GasCategory) -> String {
302 match gas_category {
303 GasCategory::SafeLow => self.safe_gas_price.clone(),
304 GasCategory::Standard => self.propose_gas_price.clone(),
305 GasCategory::Fast => self.fast_gas_price.clone(),
306 GasCategory::Fastest => self.fast_gas_price.clone(),
307 }
308 }
309}
310
311impl<C> GasOracleFiller<C>
312where
313 C: HttpRequestor + Clone,
314{
315 pub fn new(client: C, url: Option<Url>) -> Self {
317 Self {
318 client,
319 url: url.unwrap_or_else(|| Url::parse(DEFAULT_GAS_ORACLE_URL).unwrap()),
320 gas_category: GasCategory::Standard,
321 }
322 }
323
324 pub fn category(mut self, gas_category: GasCategory) -> Self {
326 self.gas_category = gas_category;
327 self
328 }
329
330 pub async fn query(&self) -> Result<GasOracleResponse, TransportError> {
332 let raw_value = self
333 .client
334 .http_get(self.url.as_str())
335 .await
336 .map_err(TransportErrorKind::custom)?;
337
338 let parsed: GasOracleResponse = serde_json::from_slice(raw_value.as_ref()).map_err(|e| {
339 error!(%e, "failed to deserialize gas price API response");
340 TransportErrorKind::Custom("failed to deserialize gas price API response".into())
341 })?;
342
343 Ok(parsed)
344 }
345
346 async fn prepare_legacy<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
347 where
348 P: Provider<N>,
349 N: Network,
350 {
351 let gas_limit_fut = tx.gas_limit().map_or_else(
352 || provider.estimate_gas(tx.clone()).into_future().right_future(),
353 |gas_limit| async move { Ok(gas_limit) }.left_future(),
354 );
355
356 let res_fut = self.query();
357
358 let (gas_limit, res) = futures::try_join!(gas_limit_fut, res_fut)?;
360
361 let gas_price_in_gwei = res.gas_from_category(self.gas_category);
366 let gas_price = parse_units(&gas_price_in_gwei, "gwei")
367 .map_err(|e| TransportErrorKind::custom_str(&format!("Failed to parse gwei from gas oracle: {e}")))?;
368 let gas_price_in_128: u128 = gas_price
369 .get_absolute()
370 .try_into()
371 .map_err(|_| TransportErrorKind::custom_str("Conversion overflow"))?;
372
373 Ok(GasOracleFillable::Legacy {
374 gas_limit,
375 gas_price: tx.gas_price().unwrap_or(gas_price_in_128),
376 })
377 }
378
379 async fn prepare_1559<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
380 where
381 P: Provider<N>,
382 N: Network,
383 {
384 let gas_limit_fut = tx.gas_limit().map_or_else(
385 || provider.estimate_gas(tx.clone()).into_future().right_future(),
386 |gas_limit| async move { Ok(gas_limit) }.left_future(),
387 );
388
389 let gas_limit = gas_limit_fut.await?;
391
392 Ok(GasOracleFillable::Eip1559 {
393 gas_limit,
394 estimate: self.estimate_eip1559_fees(),
395 })
396 }
397
398 fn estimate_eip1559_fees(&self) -> Eip1559Estimation {
402 Eip1559Estimation {
403 max_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS,
404 max_priority_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS,
405 }
406 }
407}
408
409#[doc(hidden)]
411#[derive(Clone, Copy, Debug, PartialEq, Eq)]
412pub enum GasOracleFillable {
413 Legacy {
414 gas_limit: u64,
415 gas_price: u128,
416 },
417 Eip1559 {
418 gas_limit: u64,
419 estimate: Eip1559Estimation,
420 },
421}
422
423impl<N, C> TxFiller<N> for GasOracleFiller<C>
424where
425 N: Network,
426 C: HttpRequestor + Clone,
427{
428 type Fillable = GasOracleFillable;
429
430 fn status(&self, tx: &<N as Network>::TransactionRequest) -> FillerControlFlow {
431 if tx.gas_price().is_some() && tx.gas_limit().is_some() {
433 return FillerControlFlow::Finished;
434 }
435
436 if tx.max_fee_per_gas().is_some() && tx.max_priority_fee_per_gas().is_some() && tx.gas_limit().is_some() {
438 return FillerControlFlow::Finished;
439 }
440
441 FillerControlFlow::Ready
442 }
443
444 fn fill_sync(&self, _tx: &mut SendableTx<N>) {}
445
446 async fn prepare<P>(&self, provider: &P, tx: &<N as Network>::TransactionRequest) -> TransportResult<Self::Fillable>
447 where
448 P: Provider<N>,
449 {
450 if tx.gas_price().is_some() {
451 self.prepare_legacy(provider, tx).await
452 } else {
453 match self.prepare_1559(provider, tx).await {
454 Ok(estimate) => Ok(estimate),
456 Err(e) => Err(e),
457 }
458 }
459 }
460
461 async fn fill(&self, fillable: Self::Fillable, mut tx: SendableTx<N>) -> TransportResult<SendableTx<N>> {
462 if let Some(builder) = tx.as_mut_builder() {
463 match fillable {
464 GasOracleFillable::Legacy { gas_limit, gas_price } => {
465 builder.set_gas_limit(gas_limit);
466 builder.set_gas_price(gas_price);
467 }
468 GasOracleFillable::Eip1559 { gas_limit, estimate } => {
469 builder.set_gas_limit(gas_limit);
470 builder.set_max_fee_per_gas(estimate.max_fee_per_gas);
471 builder.set_max_priority_fee_per_gas(estimate.max_priority_fee_per_gas);
472 }
473 }
474 };
475 Ok(tx)
476 }
477}
478
479pub struct MetricsLayer;
480
481#[derive(Debug, Clone)]
482pub struct MetricsService<S> {
483 inner: S,
484}
485
486impl<S> Layer<S> for MetricsLayer {
488 type Service = MetricsService<S>;
489
490 fn layer(&self, inner: S) -> Self::Service {
491 MetricsService { inner }
492 }
493}
494
495impl<S> Service<RequestPacket> for MetricsService<S>
497where
498 S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
499 S::Error: Send + 'static + Debug,
500{
501 type Error = TransportError;
502 type Future = TransportFut<'static>;
503 type Response = ResponsePacket;
504
505 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
506 self.inner.poll_ready(cx)
507 }
508
509 fn call(&mut self, request: RequestPacket) -> Self::Future {
510 let start = std::time::Instant::now();
512
513 let method_names = match request.clone() {
514 RequestPacket::Single(single_req) => vec![single_req.method().to_owned()],
515 RequestPacket::Batch(vec_req) => vec_req.iter().map(|s_req| s_req.method().to_owned()).collect(),
516 };
517
518 let future = self.inner.call(request);
519
520 Box::pin(async move {
522 let res = future.await;
523
524 let req_duration = start.elapsed();
525 method_names.iter().for_each(|method| {
526 trace!(method, duration_in_ms = req_duration.as_millis(), "rpc request took");
527 #[cfg(all(feature = "prometheus", not(test)))]
528 METRIC_RPC_CALLS_TIMING.observe(&[method], req_duration.as_secs_f64());
529 });
530
531 match &res {
533 Ok(result) => match result {
534 ResponsePacket::Single(a) => match a.payload {
535 ResponsePayload::Success(_) => {
536 #[cfg(all(feature = "prometheus", not(test)))]
537 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "success"]);
538 }
539 ResponsePayload::Failure(_) => {
540 #[cfg(all(feature = "prometheus", not(test)))]
541 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "failure"]);
542 }
543 },
544 ResponsePacket::Batch(b) => {
545 b.iter().enumerate().for_each(|(i, _)| match b[i].payload {
546 ResponsePayload::Success(_) => {
547 #[cfg(all(feature = "prometheus", not(test)))]
548 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "success"]);
549 }
550 ResponsePayload::Failure(_) => {
551 #[cfg(all(feature = "prometheus", not(test)))]
552 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "failure"]);
553 }
554 });
555 }
556 },
557 Err(err) => {
558 error!(error = ?err, "Error occurred while processing request");
559 method_names.iter().for_each(|_m| {
560 #[cfg(all(feature = "prometheus", not(test)))]
561 METRIC_COUNT_RPC_CALLS.increment(&[_m, "failure"]);
562 });
563 }
564 };
565
566 res
567 })
568 }
569}
570
571#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
573pub struct RequestorResponseSnapshot {
574 id: usize,
575 request: String,
576 response: String,
577}
578
579#[derive(Debug, Clone)]
586pub struct SnapshotRequestor {
587 next_id: Arc<AtomicUsize>,
588 entries: moka::future::Cache<String, RequestorResponseSnapshot>,
589 file: String,
590 aggressive_save: bool,
591 fail_on_miss: bool,
592 ignore_snapshot: bool,
593}
594
595impl SnapshotRequestor {
596 pub fn new(snapshot_file: &str) -> Self {
603 Self {
604 next_id: Arc::new(AtomicUsize::new(1)),
605 entries: moka::future::Cache::builder().build(),
606 file: snapshot_file.to_owned(),
607 aggressive_save: false,
608 fail_on_miss: false,
609 ignore_snapshot: false,
610 }
611 }
612
613 pub fn snapshot_path(&self) -> &str {
615 &self.file
616 }
617
618 pub fn clear(&self) {
621 self.entries.invalidate_all();
622 self.next_id.store(1, Ordering::Relaxed);
623 }
624
625 pub async fn try_load(&mut self, fail_on_miss: bool) -> Result<(), std::io::Error> {
629 if self.ignore_snapshot {
630 return Ok(());
631 }
632
633 let loaded = serde_yaml::from_reader::<_, Vec<RequestorResponseSnapshot>>(std::fs::File::open(&self.file)?)
634 .map_err(std::io::Error::other)?;
635
636 self.clear();
637
638 let loaded_len = futures::stream::iter(loaded)
639 .then(|entry| {
640 self.next_id.fetch_max(entry.id, Ordering::Relaxed);
641 self.entries.insert(entry.request.clone(), entry)
642 })
643 .collect::<Vec<_>>()
644 .await
645 .len();
646
647 if loaded_len > 0 {
648 self.fail_on_miss = fail_on_miss;
649 }
650
651 tracing::debug!("snapshot with {loaded_len} entries has been loaded from {}", &self.file);
652 Ok(())
653 }
654
655 pub async fn load(mut self, fail_on_miss: bool) -> Self {
659 let _ = self.try_load(fail_on_miss).await;
660 self
661 }
662
663 pub fn with_aggresive_save(mut self) -> Self {
667 self.aggressive_save = true;
668 self
669 }
670
671 pub fn with_ignore_snapshot(mut self, ignore_snapshot: bool) -> Self {
677 self.ignore_snapshot = ignore_snapshot;
678 self
679 }
680
681 pub fn save(&self) -> Result<(), std::io::Error> {
686 if self.ignore_snapshot {
687 return Ok(());
688 }
689
690 let mut values: Vec<RequestorResponseSnapshot> = self.entries.iter().map(|(_, r)| r).collect();
691 values.sort_unstable_by_key(|a| a.id);
692
693 let mut writer = BufWriter::new(std::fs::File::create(&self.file)?);
694
695 serde_yaml::to_writer(&mut writer, &values).map_err(std::io::Error::other)?;
696
697 writer.flush()?;
698
699 tracing::debug!("snapshot with {} entries saved to file {}", values.len(), self.file);
700 Ok(())
701 }
702}
703
704impl Drop for SnapshotRequestor {
705 fn drop(&mut self) {
706 if let Err(e) = self.save() {
707 tracing::error!("failed to save snapshot: {e}");
708 }
709 }
710}
711
712#[derive(Debug, Clone)]
713pub struct SnapshotRequestorLayer {
714 snapshot_requestor: SnapshotRequestor,
715}
716
717impl SnapshotRequestorLayer {
718 pub fn new(snapshot_file: &str) -> Self {
719 Self {
720 snapshot_requestor: SnapshotRequestor::new(snapshot_file),
721 }
722 }
723
724 pub fn from_requestor(snapshot_requestor: SnapshotRequestor) -> Self {
725 Self { snapshot_requestor }
726 }
727}
728#[derive(Debug, Clone)]
729pub struct SnapshotRequestorService<S> {
730 inner: S,
731 snapshot_requestor: SnapshotRequestor,
732}
733
734impl<S> Layer<S> for SnapshotRequestorLayer {
736 type Service = SnapshotRequestorService<S>;
737
738 fn layer(&self, inner: S) -> Self::Service {
739 SnapshotRequestorService {
740 inner,
741 snapshot_requestor: self.snapshot_requestor.clone(),
742 }
743 }
744}
745
746impl<S> Service<RequestPacket> for SnapshotRequestorService<S>
748where
749 S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
750 S::Error: Send + 'static + Debug,
751{
752 type Error = TransportError;
753 type Future = TransportFut<'static>;
754 type Response = ResponsePacket;
755
756 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
757 self.inner.poll_ready(cx)
758 }
759
760 fn call(&mut self, request: RequestPacket) -> Self::Future {
761 let mut inner = self.inner.clone(); let snapshot_requestor = self.snapshot_requestor.clone(); let future = inner.call(request.clone()); Box::pin(async move {
767 let res = future.await;
768
769 let request_string = serde_json::to_string(&request)
770 .map_err(|e| TransportErrorKind::Custom(format!("serialize error: {e}").into()))?;
771
772 let inserted = AtomicBool::new(false);
773
774 let _result = snapshot_requestor
775 .entries
776 .entry(request_string.clone())
777 .or_try_insert_with(async {
778 if snapshot_requestor.fail_on_miss {
779 tracing::error!("{request_string} is missing in {}", &snapshot_requestor.file);
780 return Err(TransportErrorKind::http_error(
781 http::StatusCode::NOT_FOUND.into(),
782 "".into(),
783 ));
784 }
785
786 let response_string = match &res {
787 Ok(result) => match result {
788 ResponsePacket::Single(resp) => match &resp.payload {
789 ResponsePayload::Success(success_payload) => success_payload.to_string(),
790 ResponsePayload::Failure(e) => {
791 return Err(TransportErrorKind::Custom(format!("RPC error: {e}").into()).into());
792 }
793 },
794 ResponsePacket::Batch(batch) => {
795 let mut responses = Vec::with_capacity(batch.len());
796 for (i, resp) in batch.iter().enumerate() {
797 match &resp.payload {
798 ResponsePayload::Success(success_payload) => {
799 responses.push(success_payload.to_string());
800 }
801 ResponsePayload::Failure(e) => {
802 return Err(TransportErrorKind::Custom(
803 format!("RPC error in batch item #{i}: {e}").into(),
804 )
805 .into());
806 }
807 }
808 }
809 responses.join(", ")
810 }
811 },
812 Err(err) => {
813 error!(error = ?err, "Error occurred while processing request");
814 return Err(TransportErrorKind::Custom(
815 format!("Error occurred while processing request: {err}").into(),
816 )
817 .into());
818 }
819 };
820
821 let id = snapshot_requestor.next_id.fetch_add(1, Ordering::SeqCst);
822 inserted.store(true, Ordering::Relaxed);
823 tracing::debug!("saved new snapshot entry #{id}");
824
825 Ok(RequestorResponseSnapshot {
826 id,
827 request: request_string.clone(),
828 response: response_string,
829 })
830 })
831 .await
832 .map(|e| e.into_value().response.into_bytes().into_boxed_slice())
833 .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
834
835 if inserted.load(Ordering::Relaxed) && snapshot_requestor.aggressive_save {
836 tracing::debug!("{request_string} was NOT found and was resolved");
837 snapshot_requestor
838 .save()
839 .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
840 } else {
841 tracing::debug!("{request_string} was found");
842 }
843
844 res
845 })
846 }
847}
848
849pub type AnvilRpcClient = FillProvider<
850 JoinFill<
851 JoinFill<Identity, JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>>,
852 WalletFiller<EthereumWallet>,
853 >,
854 RootProvider,
855>;
856#[cfg(not(target_arch = "wasm32"))]
858pub fn create_rpc_client_to_anvil(
859 anvil: &alloy::node_bindings::AnvilInstance,
860 signer: &hopr_crypto_types::keypairs::ChainKeypair,
861) -> Arc<AnvilRpcClient> {
862 use alloy::{
863 providers::ProviderBuilder, rpc::client::ClientBuilder, signers::local::PrivateKeySigner,
864 transports::http::ReqwestTransport,
865 };
866 use hopr_crypto_types::keypairs::Keypair;
867
868 let wallet = PrivateKeySigner::from_slice(signer.secret().as_ref()).expect("failed to construct wallet");
869
870 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
871
872 let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
873
874 let provider = ProviderBuilder::new().wallet(wallet).connect_client(rpc_client);
875
876 Arc::new(provider)
877}
878
879#[cfg(test)]
880mod tests {
881 use std::time::Duration;
882
883 use alloy::{
884 network::TransactionBuilder,
885 primitives::{U256, address},
886 providers::{
887 Provider, ProviderBuilder,
888 fillers::{BlobGasFiller, CachedNonceManager, ChainIdFiller, GasFiller, NonceFiller},
889 },
890 rpc::{client::ClientBuilder, types::TransactionRequest},
891 signers::local::PrivateKeySigner,
892 transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
893 };
894 use anyhow::Ok;
895 use hopr_async_runtime::prelude::sleep;
896 use hopr_chain_types::{ContractAddresses, ContractInstances, utils::create_anvil};
897 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
898 use hopr_primitive_types::primitives::Address;
899 use serde_json::json;
900 use tempfile::NamedTempFile;
901
902 use crate::client::{
903 DefaultRetryPolicy, GasOracleFiller, MetricsLayer, SnapshotRequestor, SnapshotRequestorLayer, ZeroRetryPolicy,
904 };
905
906 #[tokio::test]
907 async fn test_client_should_deploy_contracts_via_reqwest() -> anyhow::Result<()> {
908 let anvil = create_anvil(None);
909 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
910 let signer_chain_key = ChainKeypair::from_secret(signer.to_bytes().as_ref())?;
911
912 let rpc_client = ClientBuilder::default().http(anvil.endpoint_url());
913
914 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
915
916 let contracts = ContractInstances::deploy_for_testing(provider.clone(), &signer_chain_key)
917 .await
918 .expect("deploy failed");
919
920 let contract_addrs = ContractAddresses::from(&contracts);
921
922 assert_ne!(contract_addrs.token, Address::default());
923 assert_ne!(contract_addrs.channels, Address::default());
924 assert_ne!(contract_addrs.announcements, Address::default());
925 assert_ne!(contract_addrs.network_registry, Address::default());
926 assert_ne!(contract_addrs.safe_registry, Address::default());
927 assert_ne!(contract_addrs.price_oracle, Address::default());
928
929 Ok(())
930 }
931
932 #[tokio::test]
933 async fn test_client_should_get_block_number() -> anyhow::Result<()> {
934 let block_time = Duration::from_millis(1100);
935
936 let anvil = create_anvil(Some(block_time));
937 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
938
939 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
940
941 let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
942
943 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
944
945 let mut last_number = 0;
946
947 for _ in 0..3 {
948 sleep(block_time).await;
949
950 let num = provider.get_block_number().await?;
951
952 assert!(num > last_number, "next block number must be greater");
953 last_number = num;
954 }
955
956 Ok(())
957 }
958
959 #[tokio::test]
960 async fn test_client_should_get_block_number_with_metrics_without_retry() -> anyhow::Result<()> {
961 let block_time = Duration::from_secs(1);
962
963 let anvil = create_anvil(Some(block_time));
964 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
965
966 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
967
968 let retry_layer = RetryBackoffLayer::new(2, 100, 100);
970
971 let rpc_client = ClientBuilder::default()
972 .layer(retry_layer)
973 .layer(MetricsLayer)
974 .transport(transport_client.clone(), transport_client.guess_local());
975
976 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
977
978 let mut last_number = 0;
979
980 for _ in 0..3 {
981 sleep(block_time).await;
982
983 let num = provider.get_block_number().await?;
984
985 assert!(num > last_number, "next block number must be greater");
986 last_number = num;
987 }
988
989 Ok(())
997 }
998
999 #[tokio::test]
1000 async fn test_client_should_fail_on_malformed_request() -> anyhow::Result<()> {
1001 let anvil = create_anvil(None);
1002 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1003
1004 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1005
1006 let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
1007
1008 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
1009
1010 let err = provider
1011 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber_bla".into(), ())
1012 .await
1013 .expect_err("expected error");
1014
1015 assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1016
1017 Ok(())
1018 }
1019
1020 #[tokio::test]
1021 async fn test_client_should_fail_on_malformed_response() {
1022 let mut server = mockito::Server::new_async().await;
1023
1024 let m = server
1025 .mock("POST", "/")
1026 .with_status(200)
1027 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1028 .with_body("}malformed{")
1029 .expect(1)
1030 .create();
1031
1032 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1033
1034 let rpc_client = ClientBuilder::default()
1035 .layer(RetryBackoffLayer::new(2, 100, 100))
1036 .transport(transport_client.clone(), transport_client.guess_local());
1037
1038 let provider = ProviderBuilder::new().connect_client(rpc_client);
1039
1040 let err = provider
1041 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1042 .await
1043 .expect_err("expected error");
1044
1045 m.assert();
1046 assert!(matches!(
1047 err,
1048 alloy::transports::RpcError::DeserError { err: _, text: _ }
1049 ));
1050 }
1051
1052 #[tokio::test]
1053 async fn test_client_should_retry_on_http_error() {
1054 let mut server = mockito::Server::new_async().await;
1055
1056 let too_many_requests: u16 = http::StatusCode::TOO_MANY_REQUESTS.as_u16();
1057
1058 let m = server
1059 .mock("POST", "/")
1060 .with_status(too_many_requests as usize)
1061 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1062 .with_body("{}")
1063 .expect(3)
1064 .create();
1065
1066 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1067
1068 let rpc_client = ClientBuilder::default()
1069 .layer(RetryBackoffLayer::new(2, 100, 100))
1070 .transport(transport_client.clone(), transport_client.guess_local());
1071
1072 let provider = ProviderBuilder::new().connect_client(rpc_client);
1085
1086 let err = provider
1087 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1088 .await
1089 .expect_err("expected error");
1090
1091 m.assert();
1092 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1093
1094 }
1101
1102 #[tokio::test]
1103 async fn test_client_should_not_retry_with_zero_retry_policy() {
1104 let mut server = mockito::Server::new_async().await;
1105
1106 let m = server
1107 .mock("POST", "/")
1108 .with_status(404)
1109 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1110 .with_body("{}")
1111 .expect(1)
1112 .create();
1113
1114 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1115
1116 let rpc_client = ClientBuilder::default()
1117 .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, ZeroRetryPolicy))
1118 .transport(transport_client.clone(), transport_client.guess_local());
1119
1120 let provider = ProviderBuilder::new().connect_client(rpc_client);
1121
1122 let err = provider
1123 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1124 .await
1125 .expect_err("expected error");
1126
1127 m.assert();
1128 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1129 }
1136
1137 #[tokio::test]
1138 async fn test_client_should_retry_on_json_rpc_error() {
1139 let mut server = mockito::Server::new_async().await;
1140
1141 let m = server
1142 .mock("POST", "/")
1143 .with_status(200)
1144 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1145 .with_body(
1146 r#"{
1147 "jsonrpc": "2.0",
1148 "id": 1,
1149 "error": {
1150 "message": "some message",
1151 "code": -32603
1152 }
1153 }"#,
1154 )
1155 .expect(3)
1156 .create();
1157
1158 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1159
1160 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1161 initial_backoff: Duration::from_millis(100),
1162 retryable_json_rpc_errors: vec![-32603],
1163 ..DefaultRetryPolicy::default()
1164 };
1165 let rpc_client = ClientBuilder::default()
1166 .layer(RetryBackoffLayer::new_with_policy(
1167 2,
1168 100,
1169 100,
1170 simple_json_rpc_retry_policy,
1171 ))
1172 .transport(transport_client.clone(), transport_client.guess_local());
1173
1174 let provider = ProviderBuilder::new().connect_client(rpc_client);
1175
1176 let err = provider
1177 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1178 .await
1179 .expect_err("expected error");
1180
1181 m.assert();
1182 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1183 }
1190
1191 #[tokio::test]
1192 async fn test_client_should_not_retry_on_nonretryable_json_rpc_error() {
1193 let mut server = mockito::Server::new_async().await;
1194
1195 let m = server
1196 .mock("POST", "/")
1197 .with_status(200)
1198 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1199 .with_body(
1200 r#"{
1201 "jsonrpc": "2.0",
1202 "id": 1,
1203 "error": {
1204 "message": "some message",
1205 "code": -32000
1206 }
1207 }"#,
1208 )
1209 .expect(1)
1210 .create();
1211
1212 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1213
1214 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1215 initial_backoff: Duration::from_millis(100),
1216 retryable_json_rpc_errors: vec![],
1217 ..DefaultRetryPolicy::default()
1218 };
1219 let rpc_client = ClientBuilder::default()
1220 .layer(RetryBackoffLayer::new_with_policy(
1221 2,
1222 100,
1223 100,
1224 simple_json_rpc_retry_policy,
1225 ))
1226 .transport(transport_client.clone(), transport_client.guess_local());
1227
1228 let provider = ProviderBuilder::new().connect_client(rpc_client);
1229
1230 let err = provider
1231 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1232 .await
1233 .expect_err("expected error");
1234
1235 m.assert();
1236 assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1237
1238 }
1245
1246 #[tokio::test]
1247 async fn test_client_should_retry_on_nonretryable_json_rpc_error_if_min_retries_is_given() {
1248 let mut server = mockito::Server::new_async().await;
1249
1250 let _m = server
1251 .mock("POST", "/")
1252 .with_status(200)
1253 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1254 .with_body(
1255 r#"{
1256 "jsonrpc": "2.0",
1257 "id": 1,
1258 "error": {
1259 "message": "some message",
1260 "code": -32000
1261 }
1262 }"#,
1263 )
1264 .expect(2)
1265 .create();
1266
1267 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1268
1269 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1270 initial_backoff: Duration::from_millis(100),
1271 retryable_json_rpc_errors: vec![],
1272 min_retries: Some(1),
1273 ..DefaultRetryPolicy::default()
1274 };
1275 let rpc_client = ClientBuilder::default()
1276 .layer(RetryBackoffLayer::new_with_policy(
1277 2,
1278 100,
1279 100,
1280 simple_json_rpc_retry_policy,
1281 ))
1282 .transport(transport_client.clone(), transport_client.guess_local());
1283
1284 let provider = ProviderBuilder::new().connect_client(rpc_client);
1285
1286 let err = provider
1287 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1288 .await
1289 .expect_err("expected error");
1290
1291 assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1294
1295 }
1302
1303 #[tokio::test]
1304 async fn test_client_should_retry_on_malformed_json_rpc_error() {
1305 let mut server = mockito::Server::new_async().await;
1306
1307 let m = server
1308 .mock("POST", "/")
1309 .with_status(200)
1310 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1311 .with_body(
1312 r#"{
1313 "jsonrpc": "2.0",
1314 "error": {
1315 "message": "some message",
1316 "code": -32600
1317 }
1318 }"#,
1319 )
1320 .expect(3)
1321 .create();
1322
1323 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1324
1325 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1326 initial_backoff: Duration::from_millis(100),
1327 retryable_json_rpc_errors: vec![-32600],
1328 min_retries: Some(1),
1329 ..DefaultRetryPolicy::default()
1330 };
1331 let rpc_client = ClientBuilder::default()
1332 .layer(RetryBackoffLayer::new_with_policy(
1333 2,
1334 100,
1335 100,
1336 simple_json_rpc_retry_policy,
1337 ))
1338 .transport(transport_client.clone(), transport_client.guess_local());
1339
1340 let provider = ProviderBuilder::new().connect_client(rpc_client);
1341
1342 let err = provider
1343 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1344 .await
1345 .expect_err("expected error");
1346
1347 m.assert();
1348 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1349
1350 }
1357
1358 #[test_log::test(tokio::test)]
1359 async fn test_client_from_file() -> anyhow::Result<()> {
1360 let block_time = Duration::from_millis(1100);
1361 let snapshot_file = NamedTempFile::new()?;
1362
1363 let anvil = create_anvil(Some(block_time));
1364
1365 {
1366 let mut last_number = 0;
1367
1368 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1369
1370 let rpc_client = ClientBuilder::default()
1371 .layer(RetryBackoffLayer::new_with_policy(
1372 2,
1373 100,
1374 100,
1375 DefaultRetryPolicy::default(),
1376 ))
1377 .layer(SnapshotRequestorLayer::new(snapshot_file.path().to_str().unwrap()))
1378 .transport(transport_client.clone(), transport_client.guess_local());
1379
1380 let provider = ProviderBuilder::new().connect_client(rpc_client);
1381
1382 for _ in 0..3 {
1383 sleep(block_time).await;
1384
1385 let num = provider.get_block_number().await?;
1386
1387 assert!(num > last_number, "next block number must be greater");
1388 last_number = num;
1389 }
1390 }
1391
1392 {
1393 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1394
1395 let snapshot_requestor = SnapshotRequestor::new(snapshot_file.path().to_str().unwrap())
1396 .load(true)
1397 .await;
1398
1399 let rpc_client = ClientBuilder::default()
1400 .layer(RetryBackoffLayer::new_with_policy(
1401 2,
1402 100,
1403 100,
1404 DefaultRetryPolicy::default(),
1405 ))
1406 .layer(SnapshotRequestorLayer::from_requestor(snapshot_requestor))
1407 .transport(transport_client.clone(), transport_client.guess_local());
1408
1409 let provider = ProviderBuilder::new().connect_client(rpc_client);
1410
1411 let mut last_number = 0;
1412 for _ in 0..3 {
1413 sleep(block_time).await;
1414
1415 let num = provider.get_block_number().await?;
1416
1417 assert!(num > last_number, "next block number must be greater");
1418 last_number = num;
1419 }
1420 }
1421
1422 Ok(())
1423 }
1424
1425 #[tokio::test]
1426 async fn test_client_should_call_on_gas_oracle_for_eip1559_tx() -> anyhow::Result<()> {
1427 let _ = env_logger::builder().is_test(true).try_init();
1428
1429 let mut server = mockito::Server::new_async().await;
1430
1431 let m = server
1432 .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1433 .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1434 .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"1.1","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1435 .expect(0)
1436 .create();
1437
1438 let anvil = create_anvil(None);
1439 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1440
1441 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1442 let rpc_client = ClientBuilder::default()
1445 .layer(RetryBackoffLayer::new(2, 100, 100))
1446 .transport(transport_client.clone(), transport_client.guess_local());
1447
1448 let provider = ProviderBuilder::new()
1449 .disable_recommended_fillers()
1450 .wallet(signer)
1451 .filler(NonceFiller::new(CachedNonceManager::default()))
1452 .filler(GasOracleFiller::new(
1453 transport_client.client().clone(),
1454 Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1455 ))
1456 .filler(GasFiller)
1457 .connect_client(rpc_client);
1458
1459 let tx = TransactionRequest::default()
1460 .with_chain_id(provider.get_chain_id().await?)
1461 .to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1462 .value(U256::from(100))
1463 .transaction_type(2);
1464
1465 let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1466
1467 m.assert();
1468 assert_eq!(receipt.gas_used, 21000);
1469 Ok(())
1470 }
1471
1472 #[tokio::test]
1473 async fn test_client_should_call_on_gas_oracle_for_legacy_tx() -> anyhow::Result<()> {
1474 let _ = env_logger::builder().is_test(true).try_init();
1475
1476 let mut server = mockito::Server::new_async().await;
1477
1478 let m = server
1479 .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1480 .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1481 .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"3.5","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1482 .expect(1)
1483 .create();
1484
1485 let anvil = create_anvil(None);
1486 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1487
1488 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1489
1490 let rpc_client = ClientBuilder::default()
1491 .layer(RetryBackoffLayer::new(2, 100, 100))
1492 .transport(transport_client.clone(), transport_client.guess_local());
1493
1494 let provider = ProviderBuilder::new()
1495 .disable_recommended_fillers()
1496 .wallet(signer)
1497 .filler(ChainIdFiller::default())
1498 .filler(NonceFiller::new(CachedNonceManager::default()))
1499 .filler(GasOracleFiller::new(
1500 transport_client.client().clone(),
1501 Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1502 ))
1503 .filler(GasFiller)
1504 .filler(BlobGasFiller)
1505 .connect_client(rpc_client);
1506
1507 let tx = TransactionRequest::default()
1509 .with_to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1510 .with_value(U256::from(100))
1511 .with_gas_price(1000000000);
1512
1513 let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1514
1515 m.assert();
1516 assert_eq!(receipt.gas_used, 21000);
1517 Ok(())
1518 }
1519}