hopr_chain_rpc/
client.rs

//! Following the migration of the RPC client to the `alloy` crate, this module contains the
//! implementation and parameters of the client layers. The underlying HTTP transport layer is
//! defined in `transport.rs`.
//!
//! Extended layers of RPC clients:
//! - Replace the legacy retry backoff layer with the default [`RetryBackoffService`]. However, the backoff calculation
//!   still needs to be improved, as the number of retries is not passed to the `backoff_hint` method.
//! - Add a metrics layer
//! - Add a snapshot layer
//! - Use the tokio runtime for most of the tests
//!
//! This module also contains default gas estimation constants for EIP-1559 on Gnosis Chain.
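//!
//! # Example
//!
//! A minimal sketch (mirroring the tests in this module) of how the extra layers are combined with
//! the retry layer when building a client; the Anvil transport and `signer` are assumptions made
//! purely for illustration:
//!
//! ```ignore
//! use alloy::{
//!     providers::ProviderBuilder,
//!     rpc::client::ClientBuilder,
//!     transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
//! };
//!
//! let transport_client = ReqwestTransport::new(anvil.endpoint_url());
//! let rpc_client = ClientBuilder::default()
//!     .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, DefaultRetryPolicy::default()))
//!     .layer(MetricsLayer)
//!     .transport(transport_client.clone(), transport_client.guess_local());
//! let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
//! ```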
12use std::{
13    fmt::Debug,
14    future::IntoFuture,
15    io::{BufWriter, Write},
16    sync::{
17        Arc,
18        atomic::{AtomicBool, AtomicUsize, Ordering},
19    },
20    task::{Context, Poll},
21    time::Duration,
22};
23
// As the GasOracleMiddleware has been migrated to the GasFiller, the EIP-1559 estimation type is used directly.
25use alloy::eips::eip1559::Eip1559Estimation;
26use alloy::{
27    network::{EthereumWallet, Network, TransactionBuilder},
28    primitives::utils::parse_units,
29    providers::{
30        Identity, Provider, RootProvider, SendableTx,
31        fillers::{
32            BlobGasFiller, ChainIdFiller, FillProvider, FillerControlFlow, GasFiller, JoinFill, NonceFiller, TxFiller,
33            WalletFiller,
34        },
35    },
36    rpc::json_rpc::{ErrorPayload, RequestPacket, ResponsePacket, ResponsePayload},
37    transports::{HttpError, TransportError, TransportErrorKind, TransportFut, TransportResult, layers::RetryPolicy},
38};
39use futures::{FutureExt, StreamExt};
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use tower::{Layer, Service};
43use tracing::{error, trace};
44use url::Url;
45use validator::Validate;
46
/// Gas estimation constants for EIP-1559 on Gnosis Chain.
/// These values are used to estimate the gas price for transactions.
/// Since the GasOracleMiddleware has been migrated to the GasFiller, these
/// default values are used instead.
51pub const EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS: u128 = 3_000_000_000;
52pub const EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS: u128 = 100_000_000;
53
54use crate::{rpc::DEFAULT_GAS_ORACLE_URL, transport::HttpRequestor};
55
56#[cfg(all(feature = "prometheus", not(test)))]
57lazy_static::lazy_static! {
58    static ref METRIC_COUNT_RPC_CALLS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
59        "hopr_rpc_call_count",
60        "Number of Ethereum RPC calls over HTTP and their result",
61        &["call", "result"]
62    )
63    .unwrap();
64    static ref METRIC_RPC_CALLS_TIMING: hopr_metrics::MultiHistogram = hopr_metrics::MultiHistogram::new(
65        "hopr_rpc_call_time_sec",
66        "Timing of RPC calls over HTTP in seconds",
67        vec![0.1, 0.5, 1.0, 2.0, 5.0, 7.0, 10.0],
68        &["call"]
69    )
70    .unwrap();
71    static ref METRIC_RETRIES_PER_RPC_CALL: hopr_metrics::MultiHistogram = hopr_metrics::MultiHistogram::new(
72        "hopr_retries_per_rpc_call",
73        "Number of retries per RPC call",
74        vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
75        &["call"]
76    )
77    .unwrap();
78}
79
80/// Defines a default retry policy suitable for `RpcClient`.
/// This is a reimplementation of the legacy "retry policy suitable for `JsonRpcProviderClient`".
82///
83/// This retry policy distinguishes between 4 types of RPC request failures:
84/// - JSON RPC error (based on error code)
85/// - HTTP error (based on HTTP status)
86/// - Transport error (e.g. connection timeout)
87/// - Serde error (some of these are treated as JSON RPC error above, if an error code can be obtained).
88///
89/// The standard `RetryBackoffLayer` defines the following properties:
90/// - `max_rate_limit_retries`: (u32) The maximum number of retries for rate limit errors. Different from the legacy
91///   implementation, there is always an upper limit.
92/// - `initial_backoff`: (u64) The initial backoff in milliseconds
93/// - `compute_units_per_second`: (u64) The number of compute units per second for this service
94///
/// The policy will make up to `max_retries` retries once a JSON RPC request fails.
/// The minimum number of retries `min_retries` can also be specified and applies to any type of error regardless.
/// Each retry `k > 0` will be separated by a delay of `initial_backoff * (1 + backoff_coefficient)^(k - 1)`.
/// This backoff applies to all the JSON RPC error codes specified in `retryable_json_rpc_errors` and to all the
/// HTTP errors specified in `retryable_http_errors`.
///
/// The total wait time will be `(initial_backoff/backoff_coefficient) * ((1 + backoff_coefficient)^max_retries - 1)`
/// or `max_backoff`, whichever is lower.
103///
104/// Transport and connection errors (such as connection timeouts) are retried without backoff
105/// at a constant delay of `initial_backoff` if `backoff_on_transport_errors` is not set.
106///
/// No additional retries are allowed on new requests once the number of concurrently
/// retried requests has reached `max_retry_queue_size`.
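///
/// # Backoff schedule example
///
/// A minimal sketch (plain arithmetic, independent of this crate's API) of the delays produced by
/// the documented formula for the default parameters (`initial_backoff` = 1 s, `backoff_coefficient` = 0.3):
///
/// ```
/// let initial_backoff = 1.0_f64; // seconds
/// let backoff_coefficient = 0.3_f64;
/// // Delay before retry k (k >= 1): initial_backoff * (1 + backoff_coefficient)^(k - 1)
/// let delays: Vec<f64> = (1..=4)
///     .map(|k| initial_backoff * (1.0 + backoff_coefficient).powi(k - 1))
///     .collect();
/// // Approximately [1.0, 1.3, 1.69, 2.197] seconds, each capped at `max_backoff`.
/// assert!((delays[3] - 2.197).abs() < 1e-9);
/// ```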
109#[serde_as]
110#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize, Validate)]
111pub struct DefaultRetryPolicy {
    /// Minimum number of retries for any error, regardless of the error code.
113    ///
114    /// Default is 0.
115    #[validate(range(min = 0))]
116    #[default(Some(0))]
117    pub min_retries: Option<u32>,
118
119    /// Initial wait before retries.
120    ///
121    /// NOTE: Transport and connection errors (such as connection timeouts) are retried at
122    /// a constant rate (no backoff) with this delay if `backoff_on_transport_errors` is not set.
123    ///
124    /// Default is 1 second.
125    #[default(Duration::from_secs(1))]
126    pub initial_backoff: Duration,
127
    /// Backoff coefficient by which each retry delay will be multiplied.
129    ///
130    /// Must be non-negative. If set to `0`, no backoff will be applied and the
131    /// requests will be retried at a constant rate.
132    ///
133    /// Default is 0.3
134    #[validate(range(min = 0.0))]
135    #[default(0.3)]
136    pub backoff_coefficient: f64,
137    /// Maximum backoff value.
138    ///
139    /// Once reached, the requests will be retried at a constant rate with this timeout.
140    ///
141    /// Default is 30 seconds.
142    #[default(Duration::from_secs(30))]
143    pub max_backoff: Duration,
144    /// Indicates whether to also apply backoff to transport and connection errors (such as connection timeouts).
145    ///
146    /// Default is false.
147    pub backoff_on_transport_errors: bool,
148    /// List of JSON RPC errors that should be retried with backoff
149    ///
150    /// Default is \[429, -32005, -32016\]
151    #[default(_code = "vec![-32005, -32016, 429]")]
152    pub retryable_json_rpc_errors: Vec<i64>,
153
154    /// List of HTTP errors that should be retried with backoff.
155    ///
156    /// Default is \[429, 504, 503\]
157    #[serde_as(as = "Vec<DisplayFromStr>")]
158    #[default(
159        _code = "vec![http::StatusCode::TOO_MANY_REQUESTS,http::StatusCode::GATEWAY_TIMEOUT,\
160                 http::StatusCode::SERVICE_UNAVAILABLE]"
161    )]
162    pub retryable_http_errors: Vec<http::StatusCode>,
163
164    /// Maximum number of different requests that are being retried at the same time.
165    ///
166    /// If any additional request fails after this number is attained, it won't be retried.
167    ///
168    /// Default is 100
169    #[validate(range(min = 5))]
170    #[default = 100]
171    pub max_retry_queue_size: u32,
172}
173
174impl DefaultRetryPolicy {
175    fn is_retryable_json_rpc_errors(&self, rpc_err: &ErrorPayload) -> bool {
176        self.retryable_json_rpc_errors.contains(&rpc_err.code)
177    }
178
179    fn is_retryable_http_errors(&self, http_err: &HttpError) -> bool {
180        let status_code = match http::StatusCode::try_from(http_err.status) {
181            Ok(status_code) => status_code,
182            Err(_) => return false,
183        };
184        self.retryable_http_errors.contains(&status_code)
185    }
186}
187
188impl RetryPolicy for DefaultRetryPolicy {
189    fn should_retry(&self, err: &TransportError) -> bool {
190        match err {
191            // There was a transport-level error. This is either a non-retryable error,
192            // or a server error that should be retried.
193            TransportError::Transport(err) => {
194                match err {
195                    // Missing batch response errors can be retried.
196                    TransportErrorKind::MissingBatchResponse(_) => true,
197                    TransportErrorKind::HttpError(http_err) => {
198                        http_err.is_rate_limit_err() || self.is_retryable_http_errors(http_err)
199                    }
200                    TransportErrorKind::Custom(err) => {
201                        let msg = err.to_string();
202                        msg.contains("429 Too Many Requests")
203                    }
204                    _ => false,
205                }
206            }
207            // The transport could not serialize the error itself. The request was malformed from
208            // the start.
209            TransportError::SerError(_) => false,
210            TransportError::DeserError { text, .. } => {
211                if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
212                    return self.is_retryable_json_rpc_errors(&resp);
213                }
214
215                // some providers send invalid JSON RPC in the error case (no `id:u64`), but the
216                // text should be a `JsonRpcError`
217                #[derive(Deserialize)]
218                struct Resp {
219                    error: ErrorPayload,
220                }
221
222                if let Ok(resp) = serde_json::from_str::<Resp>(text) {
223                    return self.is_retryable_json_rpc_errors(&resp.error);
224                }
225
226                false
227            }
228            TransportError::ErrorResp(err) => self.is_retryable_json_rpc_errors(err),
229            TransportError::NullResp => true,
230            _ => false,
231        }
232    }
233
234    // TODO(#7140): original implementation requires input param of `num_retries`
235    // next_backoff = initial_backoff * (1 + backoff_coefficient)^(num_retries - 1)
236    fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
237        None
238    }
239}
240
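/// A retry policy that never retries and never suggests a backoff.
///
/// A minimal usage sketch (mirroring the tests below); the retry-layer parameters and `transport_client`
/// are assumptions made purely for illustration:
///
/// ```ignore
/// use alloy::{rpc::client::ClientBuilder, transports::layers::RetryBackoffLayer};
///
/// let rpc_client = ClientBuilder::default()
///     .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, ZeroRetryPolicy))
///     .transport(transport_client.clone(), transport_client.guess_local());
/// ```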
241#[derive(Debug, Clone)]
242pub struct ZeroRetryPolicy;
243impl RetryPolicy for ZeroRetryPolicy {
244    fn should_retry(&self, _err: &alloy::transports::TransportError) -> bool {
245        false
246    }
247
248    fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
249        None
250    }
251}
252
253/// Generic [`GasOracle`] gas price categories.
254#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
255pub enum GasCategory {
256    SafeLow,
257    #[default]
258    Standard,
259    Fast,
260    Fastest,
261}
262
/// Uses the underlying gas tracker API of GnosisScan to populate the gas price.
/// The oracle returns gas prices in gwei.
/// It is implemented as a [`TxFiller`], replacing the legacy `GasOracle` middleware.
/// If no oracle URL is given, the [`DEFAULT_GAS_ORACLE_URL`] is used.
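///
/// # Example
///
/// A minimal sketch (mirroring the tests below) of wiring the filler into a provider; the HTTP client,
/// oracle URL, signer and `rpc_client` are assumptions made purely for illustration:
///
/// ```ignore
/// use alloy::providers::{
///     ProviderBuilder,
///     fillers::{CachedNonceManager, ChainIdFiller, GasFiller, NonceFiller},
/// };
///
/// let provider = ProviderBuilder::new()
///     .disable_recommended_fillers()
///     .wallet(signer)
///     .filler(ChainIdFiller::default())
///     .filler(NonceFiller::new(CachedNonceManager::default()))
///     .filler(GasOracleFiller::new(http_client, Some(gas_oracle_url)))
///     .filler(GasFiller)
///     .connect_client(rpc_client);
/// ```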
267#[derive(Clone, Debug)]
268pub struct GasOracleFiller<C> {
269    client: C,
270    url: Url,
271    gas_category: GasCategory,
272}
273
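/// Response returned by the GnosisScan gas tracker API.
///
/// A minimal sketch of the expected JSON shape (taken from the mocked responses in the tests below;
/// unknown fields such as `UsdPrice` are ignored during deserialization):
///
/// ```ignore
/// let raw = r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"1.1","FastGasPrice":"1.6"}}"#;
/// let parsed: GasOracleResponse = serde_json::from_str(raw)?;
/// assert_eq!(parsed.gas_from_category(GasCategory::Fast), "1.6");
/// ```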
274#[derive(Clone, Debug, Deserialize, PartialEq)]
275pub struct GasOracleResponse {
276    pub status: String,
277    pub message: String,
278    pub result: GasOracleResponseResult,
279}
280
281#[derive(Clone, Debug, Deserialize, PartialEq)]
282#[serde(rename_all = "PascalCase")]
283pub struct GasOracleResponseResult {
284    pub last_block: String,
285    pub safe_gas_price: String,
286    pub propose_gas_price: String,
287    pub fast_gas_price: String,
288}
289
290impl GasOracleResponse {
291    #[inline]
292    pub fn gas_from_category(&self, gas_category: GasCategory) -> String {
293        self.result.gas_from_category(gas_category)
294    }
295}
296
297impl GasOracleResponseResult {
298    fn gas_from_category(&self, gas_category: GasCategory) -> String {
299        match gas_category {
300            GasCategory::SafeLow => self.safe_gas_price.clone(),
301            GasCategory::Standard => self.propose_gas_price.clone(),
302            GasCategory::Fast => self.fast_gas_price.clone(),
303            GasCategory::Fastest => self.fast_gas_price.clone(),
304        }
305    }
306}
307
308impl<C> GasOracleFiller<C>
309where
310    C: HttpRequestor + Clone,
311{
    /// Creates a new gas oracle filler from the given HTTP client and an optional oracle URL.
    /// If no URL is given, the [`DEFAULT_GAS_ORACLE_URL`] is used.
313    pub fn new(client: C, url: Option<Url>) -> Self {
314        Self {
315            client,
316            url: url.unwrap_or_else(|| Url::parse(DEFAULT_GAS_ORACLE_URL).unwrap()),
317            gas_category: GasCategory::Standard,
318        }
319    }
320
321    /// Sets the gas price category to be used when fetching the gas price.
322    pub fn category(mut self, gas_category: GasCategory) -> Self {
323        self.gas_category = gas_category;
324        self
325    }
326
327    /// Perform a request to the gas price API and deserialize the response.
328    pub async fn query(&self) -> Result<GasOracleResponse, TransportError> {
329        let raw_value = self
330            .client
331            .http_get(self.url.as_str())
332            .await
333            .map_err(TransportErrorKind::custom)?;
334
335        let parsed: GasOracleResponse = serde_json::from_slice(raw_value.as_ref()).map_err(|e| {
336            error!(%e, "failed to deserialize gas price API response");
337            TransportErrorKind::Custom("failed to deserialize gas price API response".into())
338        })?;
339
340        Ok(parsed)
341    }
342
343    async fn prepare_legacy<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
344    where
345        P: Provider<N>,
346        N: Network,
347    {
348        let gas_limit_fut = tx.gas_limit().map_or_else(
349            || provider.estimate_gas(tx.clone()).into_future().right_future(),
350            |gas_limit| async move { Ok(gas_limit) }.left_future(),
351        );
352
353        let res_fut = self.query();
354
355        // Run both futures concurrently
356        let (gas_limit, res) = futures::try_join!(gas_limit_fut, res_fut)?;
357
362        let gas_price_in_gwei = res.gas_from_category(self.gas_category);
363        let gas_price = parse_units(&gas_price_in_gwei, "gwei")
364            .map_err(|e| TransportErrorKind::custom_str(&format!("Failed to parse gwei from gas oracle: {e}")))?;
365        let gas_price_in_128: u128 = gas_price
366            .get_absolute()
367            .try_into()
368            .map_err(|_| TransportErrorKind::custom_str("Conversion overflow"))?;
369
370        Ok(GasOracleFillable::Legacy {
371            gas_limit,
372            gas_price: tx.gas_price().unwrap_or(gas_price_in_128),
373        })
374    }
375
376    async fn prepare_1559<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
377    where
378        P: Provider<N>,
379        N: Network,
380    {
381        let gas_limit_fut = tx.gas_limit().map_or_else(
382            || provider.estimate_gas(tx.clone()).into_future().right_future(),
383            |gas_limit| async move { Ok(gas_limit) }.left_future(),
384        );
385
386        // Await the future to get the gas limit
387        let gas_limit = gas_limit_fut.await?;
388
389        Ok(GasOracleFillable::Eip1559 {
390            gas_limit,
391            estimate: self.estimate_eip1559_fees(),
392        })
393    }
394
    // Returns hardcoded (max_fee_per_gas, max_priority_fee_per_gas).
    // Because Foundry is unable to estimate EIP-1559 fees for L2s (https://github.com/foundry-rs/foundry/issues/5709),
    // hardcoded values of (3 gwei, 0.1 gwei) are returned for Gnosis Chain.
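    // In wei: 3 gwei = 3_000_000_000 (EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS)
    // and 0.1 gwei = 100_000_000 (EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS).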
398    fn estimate_eip1559_fees(&self) -> Eip1559Estimation {
399        Eip1559Estimation {
400            max_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS,
401            max_priority_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS,
402        }
403    }
404}
405
406/// An enum over the different types of gas fillable.
407#[doc(hidden)]
408#[derive(Clone, Copy, Debug, PartialEq, Eq)]
409pub enum GasOracleFillable {
410    Legacy {
411        gas_limit: u64,
412        gas_price: u128,
413    },
414    Eip1559 {
415        gas_limit: u64,
416        estimate: Eip1559Estimation,
417    },
418}
419
420impl<N, C> TxFiller<N> for GasOracleFiller<C>
421where
422    N: Network,
423    C: HttpRequestor + Clone,
424{
425    type Fillable = GasOracleFillable;
426
427    fn status(&self, tx: &<N as Network>::TransactionRequest) -> FillerControlFlow {
428        // legacy and eip2930 tx
429        if tx.gas_price().is_some() && tx.gas_limit().is_some() {
430            return FillerControlFlow::Finished;
431        }
432
433        // eip1559
434        if tx.max_fee_per_gas().is_some() && tx.max_priority_fee_per_gas().is_some() && tx.gas_limit().is_some() {
435            return FillerControlFlow::Finished;
436        }
437
438        FillerControlFlow::Ready
439    }
440
441    fn fill_sync(&self, _tx: &mut SendableTx<N>) {}
442
443    async fn prepare<P>(&self, provider: &P, tx: &<N as Network>::TransactionRequest) -> TransportResult<Self::Fillable>
444    where
445        P: Provider<N>,
446    {
        if tx.gas_price().is_some() {
            self.prepare_legacy(provider, tx).await
        } else {
            // EIP-1559 path; there is currently no fallback to a legacy estimate on error
            self.prepare_1559(provider, tx).await
        }
456    }
457
458    async fn fill(&self, fillable: Self::Fillable, mut tx: SendableTx<N>) -> TransportResult<SendableTx<N>> {
459        if let Some(builder) = tx.as_mut_builder() {
460            match fillable {
461                GasOracleFillable::Legacy { gas_limit, gas_price } => {
462                    builder.set_gas_limit(gas_limit);
463                    builder.set_gas_price(gas_price);
464                }
465                GasOracleFillable::Eip1559 { gas_limit, estimate } => {
466                    builder.set_gas_limit(gas_limit);
467                    builder.set_max_fee_per_gas(estimate.max_fee_per_gas);
468                    builder.set_max_priority_fee_per_gas(estimate.max_priority_fee_per_gas);
469                }
470            }
471        };
472        Ok(tx)
473    }
474}
475
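/// A [`tower::Layer`] that records tracing and Prometheus metrics (with the `prometheus` feature,
/// outside of tests) for each RPC call passing through the client.
///
/// A minimal usage sketch (mirroring the tests below); `transport_client` is an assumption made
/// purely for illustration:
///
/// ```ignore
/// use alloy::{rpc::client::ClientBuilder, transports::layers::RetryBackoffLayer};
///
/// let rpc_client = ClientBuilder::default()
///     .layer(RetryBackoffLayer::new(2, 100, 100))
///     .layer(MetricsLayer)
///     .transport(transport_client.clone(), transport_client.guess_local());
/// ```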
476pub struct MetricsLayer;
477
478#[derive(Debug, Clone)]
479pub struct MetricsService<S> {
480    inner: S,
481}
482
483// Implement tower::Layer for MetricsLayer.
484impl<S> Layer<S> for MetricsLayer {
485    type Service = MetricsService<S>;
486
487    fn layer(&self, inner: S) -> Self::Service {
488        MetricsService { inner }
489    }
490}
491
492/// Implement the [`tower::Service`] trait for the [`MetricsService`].
493impl<S> Service<RequestPacket> for MetricsService<S>
494where
495    S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
496    S::Error: Send + 'static + Debug,
497{
498    type Error = TransportError;
499    type Future = TransportFut<'static>;
500    type Response = ResponsePacket;
501
502    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
503        self.inner.poll_ready(cx)
504    }
505
506    fn call(&mut self, request: RequestPacket) -> Self::Future {
507        // metrics before calling
508        let start = std::time::Instant::now();
509
510        let method_names = match request.clone() {
511            RequestPacket::Single(single_req) => vec![single_req.method().to_owned()],
512            RequestPacket::Batch(vec_req) => vec_req.iter().map(|s_req| s_req.method().to_owned()).collect(),
513        };
514
515        let future = self.inner.call(request);
516
517        // metrics after calling
518        Box::pin(async move {
519            let res = future.await;
520
521            let req_duration = start.elapsed();
522            method_names.iter().for_each(|method| {
523                trace!(method, duration_in_ms = req_duration.as_millis(), "rpc request took");
524                #[cfg(all(feature = "prometheus", not(test)))]
525                METRIC_RPC_CALLS_TIMING.observe(&[method], req_duration.as_secs_f64());
526            });
527
            // Inspect the response payload(s) and update the per-method call counters
529            match &res {
530                Ok(result) => match result {
531                    ResponsePacket::Single(a) => match a.payload {
532                        ResponsePayload::Success(_) => {
533                            #[cfg(all(feature = "prometheus", not(test)))]
534                            METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "success"]);
535                        }
536                        ResponsePayload::Failure(_) => {
537                            #[cfg(all(feature = "prometheus", not(test)))]
538                            METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "failure"]);
539                        }
540                    },
541                    ResponsePacket::Batch(b) => {
542                        b.iter().enumerate().for_each(|(i, _)| match b[i].payload {
543                            ResponsePayload::Success(_) => {
544                                #[cfg(all(feature = "prometheus", not(test)))]
545                                METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "success"]);
546                            }
547                            ResponsePayload::Failure(_) => {
548                                #[cfg(all(feature = "prometheus", not(test)))]
549                                METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "failure"]);
550                            }
551                        });
552                    }
553                },
554                Err(err) => {
555                    error!(error = ?err, "Error occurred while processing request");
556                    method_names.iter().for_each(|_m| {
557                        #[cfg(all(feature = "prometheus", not(test)))]
558                        METRIC_COUNT_RPC_CALLS.increment(&[_m, "failure"]);
559                    });
560                }
561            };
562
563            res
564        })
565    }
566}
567
568/// Snapshot of a response cached by the [`SnapshotRequestorLayer`].
569#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
570pub struct RequestorResponseSnapshot {
571    id: usize,
572    request: String,
573    response: String,
574}
575
/// Replays an RPC response to a request if it is found in the snapshot YAML file.
/// If no such request has been seen before,
/// it captures the new request/response pair obtained from the inner service
/// and stores it into the snapshot file.
///
/// This is useful for snapshot testing only and should **NOT** be used in production.
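///
/// A minimal sketch (mirroring the tests below) of loading an existing snapshot and wiring it into
/// the client via a [`SnapshotRequestorLayer`]; the snapshot path and `transport_client` are
/// assumptions made purely for illustration:
///
/// ```ignore
/// let requestor = SnapshotRequestor::new("snapshots/rpc.yaml").load(true).await;
///
/// let rpc_client = ClientBuilder::default()
///     .layer(SnapshotRequestorLayer::from_requestor(requestor))
///     .transport(transport_client.clone(), transport_client.guess_local());
/// ```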
582#[derive(Debug, Clone)]
583pub struct SnapshotRequestor {
584    next_id: Arc<AtomicUsize>,
585    entries: moka::future::Cache<String, RequestorResponseSnapshot>,
586    file: String,
587    aggressive_save: bool,
588    fail_on_miss: bool,
589    ignore_snapshot: bool,
590}
591
592impl SnapshotRequestor {
    /// Creates a new instance that captures request/response pairs into the given snapshot file.
    ///
    /// The constructor does not load any [snapshot entries](RequestorResponseSnapshot) from
    /// the `snapshot_file`.
    /// The [`SnapshotRequestor::load`] method must be used after construction to do that.
599    pub fn new(snapshot_file: &str) -> Self {
600        Self {
601            next_id: Arc::new(AtomicUsize::new(1)),
602            entries: moka::future::Cache::builder().build(),
603            file: snapshot_file.to_owned(),
604            aggressive_save: false,
605            fail_on_miss: false,
606            ignore_snapshot: false,
607        }
608    }
609
610    /// Gets the path to the snapshot disk file.
611    pub fn snapshot_path(&self) -> &str {
612        &self.file
613    }
614
615    /// Clears all entries from the snapshot in memory.
616    /// The snapshot file is not changed.
617    pub fn clear(&self) {
618        self.entries.invalidate_all();
619        self.next_id.store(1, Ordering::Relaxed);
620    }
621
622    /// Clears all entries and loads them from the snapshot file.
623    /// If `fail_on_miss` is set and the data is successfully loaded, all later
624    /// requests that miss the loaded snapshot will result in HTTP error 404.
625    pub async fn try_load(&mut self, fail_on_miss: bool) -> Result<(), std::io::Error> {
626        if self.ignore_snapshot {
627            return Ok(());
628        }
629
630        let loaded = serde_yaml::from_reader::<_, Vec<RequestorResponseSnapshot>>(std::fs::File::open(&self.file)?)
631            .map_err(std::io::Error::other)?;
632
633        self.clear();
634
635        let loaded_len = futures::stream::iter(loaded)
636            .then(|entry| {
637                self.next_id.fetch_max(entry.id, Ordering::Relaxed);
638                self.entries.insert(entry.request.clone(), entry)
639            })
640            .collect::<Vec<_>>()
641            .await
642            .len();
643
644        if loaded_len > 0 {
645            self.fail_on_miss = fail_on_miss;
646        }
647
648        tracing::debug!("snapshot with {loaded_len} entries has been loaded from {}", &self.file);
649        Ok(())
650    }
651
    /// Similar to [`SnapshotRequestor::try_load`], except that no entries are cleared if the load fails.
653    ///
654    /// This method consumes and returns self for easier call chaining.
655    pub async fn load(mut self, fail_on_miss: bool) -> Self {
656        let _ = self.try_load(fail_on_miss).await;
657        self
658    }
659
660    /// Forces saving to disk on each newly inserted entry.
661    ///
662    /// Use this only when the expected number of entries in the snapshot is small.
663    pub fn with_aggresive_save(mut self) -> Self {
664        self.aggressive_save = true;
665        self
666    }
667
668    /// If set, the snapshot data will be ignored and resolution
669    /// will always be done with the inner requestor.
670    ///
    /// This will inhibit any attempts to [`load`](SnapshotRequestor::try_load) or
    /// [`save`](SnapshotRequestor::save) snapshot data.
673    pub fn with_ignore_snapshot(mut self, ignore_snapshot: bool) -> Self {
674        self.ignore_snapshot = ignore_snapshot;
675        self
676    }
677
678    /// Save the currently cached entries to the snapshot file on disk.
679    ///
680    /// Note that this method is automatically called on Drop, so usually it is unnecessary
681    /// to call it explicitly.
682    pub fn save(&self) -> Result<(), std::io::Error> {
683        if self.ignore_snapshot {
684            return Ok(());
685        }
686
687        let mut values: Vec<RequestorResponseSnapshot> = self.entries.iter().map(|(_, r)| r).collect();
688        values.sort_unstable_by_key(|a| a.id);
689
690        let mut writer = BufWriter::new(std::fs::File::create(&self.file)?);
691
692        serde_yaml::to_writer(&mut writer, &values).map_err(std::io::Error::other)?;
693
694        writer.flush()?;
695
696        tracing::debug!("snapshot with {} entries saved to file {}", values.len(), self.file);
697        Ok(())
698    }
699}
700
701impl Drop for SnapshotRequestor {
702    fn drop(&mut self) {
703        if let Err(e) = self.save() {
704            tracing::error!("failed to save snapshot: {e}");
705        }
706    }
707}
708
709#[derive(Debug, Clone)]
710pub struct SnapshotRequestorLayer {
711    snapshot_requestor: SnapshotRequestor,
712}
713
714impl SnapshotRequestorLayer {
715    pub fn new(snapshot_file: &str) -> Self {
716        Self {
717            snapshot_requestor: SnapshotRequestor::new(snapshot_file),
718        }
719    }
720
721    pub fn from_requestor(snapshot_requestor: SnapshotRequestor) -> Self {
722        Self { snapshot_requestor }
723    }
724}
725#[derive(Debug, Clone)]
726pub struct SnapshotRequestorService<S> {
727    inner: S,
728    snapshot_requestor: SnapshotRequestor,
729}
730
// Implement tower::Layer for SnapshotRequestorLayer.
732impl<S> Layer<S> for SnapshotRequestorLayer {
733    type Service = SnapshotRequestorService<S>;
734
735    fn layer(&self, inner: S) -> Self::Service {
736        SnapshotRequestorService {
737            inner,
738            snapshot_requestor: self.snapshot_requestor.clone(),
739        }
740    }
741}
742
743/// Implement the [`tower::Service`] trait for the [`SnapshotRequestorService`].
744impl<S> Service<RequestPacket> for SnapshotRequestorService<S>
745where
746    S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
747    S::Error: Send + 'static + Debug,
748{
749    type Error = TransportError;
750    type Future = TransportFut<'static>;
751    type Response = ResponsePacket;
752
753    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
754        self.inner.poll_ready(cx)
755    }
756
757    fn call(&mut self, request: RequestPacket) -> Self::Future {
        let mut inner = self.inner.clone(); // clone the inner service
        let snapshot_requestor = self.snapshot_requestor.clone(); // clones share the same underlying cache

        let future = inner.call(request.clone()); // the request is also needed below as the snapshot key
762
763        Box::pin(async move {
764            let res = future.await;
765
766            let request_string = serde_json::to_string(&request)
767                .map_err(|e| TransportErrorKind::Custom(format!("serialize error: {e}").into()))?;
768
769            let inserted = AtomicBool::new(false);
770
771            let _result = snapshot_requestor
772                .entries
773                .entry(request_string.clone())
774                .or_try_insert_with(async {
775                    if snapshot_requestor.fail_on_miss {
776                        tracing::error!("{request_string} is missing in {}", &snapshot_requestor.file);
777                        return Err(TransportErrorKind::http_error(
778                            http::StatusCode::NOT_FOUND.into(),
779                            "".into(),
780                        ));
781                    }
782
783                    let response_string = match &res {
784                        Ok(result) => match result {
785                            ResponsePacket::Single(resp) => match &resp.payload {
786                                ResponsePayload::Success(success_payload) => success_payload.to_string(),
787                                ResponsePayload::Failure(e) => {
788                                    return Err(TransportErrorKind::Custom(format!("RPC error: {e}").into()).into());
789                                }
790                            },
791                            ResponsePacket::Batch(batch) => {
792                                let mut responses = Vec::with_capacity(batch.len());
793                                for (i, resp) in batch.iter().enumerate() {
794                                    match &resp.payload {
795                                        ResponsePayload::Success(success_payload) => {
796                                            responses.push(success_payload.to_string());
797                                        }
798                                        ResponsePayload::Failure(e) => {
799                                            return Err(TransportErrorKind::Custom(
800                                                format!("RPC error in batch item #{i}: {e}").into(),
801                                            )
802                                            .into());
803                                        }
804                                    }
805                                }
806                                responses.join(", ")
807                            }
808                        },
809                        Err(err) => {
810                            error!(error = ?err, "Error occurred while processing request");
811                            return Err(TransportErrorKind::Custom(
812                                format!("Error occurred while processing request: {err}").into(),
813                            )
814                            .into());
815                        }
816                    };
817
818                    let id = snapshot_requestor.next_id.fetch_add(1, Ordering::SeqCst);
819                    inserted.store(true, Ordering::Relaxed);
820                    tracing::debug!("saved new snapshot entry #{id}");
821
822                    Ok(RequestorResponseSnapshot {
823                        id,
824                        request: request_string.clone(),
825                        response: response_string,
826                    })
827                })
828                .await
829                .map(|e| e.into_value().response.into_bytes().into_boxed_slice())
830                .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
831
832            if inserted.load(Ordering::Relaxed) && snapshot_requestor.aggressive_save {
833                tracing::debug!("{request_string} was NOT found and was resolved");
834                snapshot_requestor
835                    .save()
836                    .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
837            } else {
838                tracing::debug!("{request_string} was found");
839            }
840
841            res
842        })
843    }
844}
845
846pub type AnvilRpcClient = FillProvider<
847    JoinFill<
848        JoinFill<Identity, JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>>,
849        WalletFiller<EthereumWallet>,
850    >,
851    RootProvider,
852>;
/// Used for testing. Creates an RPC client connected to a local Anvil instance.
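///
/// A minimal sketch (for tests only, mirroring the tests below); the Anvil key handling is an
/// assumption made purely for illustration:
///
/// ```ignore
/// use alloy::signers::local::PrivateKeySigner;
/// use hopr_chain_types::utils::create_anvil;
/// use hopr_crypto_types::keypairs::ChainKeypair;
///
/// let anvil = create_anvil(None);
/// let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
/// let keypair = ChainKeypair::from_secret(signer.to_bytes().as_ref())?;
/// let provider = create_rpc_client_to_anvil(&anvil, &keypair);
/// ```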
854#[cfg(not(target_arch = "wasm32"))]
855pub fn create_rpc_client_to_anvil(
856    anvil: &alloy::node_bindings::AnvilInstance,
857    signer: &hopr_crypto_types::keypairs::ChainKeypair,
858) -> Arc<AnvilRpcClient> {
859    use alloy::{
860        providers::ProviderBuilder, rpc::client::ClientBuilder, signers::local::PrivateKeySigner,
861        transports::http::ReqwestTransport,
862    };
863    use hopr_crypto_types::keypairs::Keypair;
864
865    let wallet = PrivateKeySigner::from_slice(signer.secret().as_ref()).expect("failed to construct wallet");
866
867    let transport_client = ReqwestTransport::new(anvil.endpoint_url());
868
869    let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
870
871    let provider = ProviderBuilder::new().wallet(wallet).connect_client(rpc_client);
872
873    Arc::new(provider)
874}
875
876#[cfg(test)]
877mod tests {
878    use std::time::Duration;
879
880    use alloy::{
881        network::TransactionBuilder,
882        primitives::{U256, address},
883        providers::{
884            Provider, ProviderBuilder,
885            fillers::{BlobGasFiller, CachedNonceManager, ChainIdFiller, GasFiller, NonceFiller},
886        },
887        rpc::{client::ClientBuilder, types::TransactionRequest},
888        signers::local::PrivateKeySigner,
889        transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
890    };
891    use anyhow::Ok;
892    use hopr_async_runtime::prelude::sleep;
893    use hopr_chain_types::{ContractAddresses, ContractInstances, utils::create_anvil};
894    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
895    use hopr_primitive_types::primitives::Address;
896    use serde_json::json;
897    use tempfile::NamedTempFile;
898
899    use crate::client::{
900        DefaultRetryPolicy, GasOracleFiller, MetricsLayer, SnapshotRequestor, SnapshotRequestorLayer, ZeroRetryPolicy,
901    };
902
903    #[tokio::test]
904    async fn test_client_should_deploy_contracts_via_reqwest() -> anyhow::Result<()> {
905        let anvil = create_anvil(None);
906        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
907        let signer_chain_key = ChainKeypair::from_secret(signer.to_bytes().as_ref())?;
908
909        let rpc_client = ClientBuilder::default().http(anvil.endpoint_url());
910
911        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
912
913        let contracts = ContractInstances::deploy_for_testing(provider.clone(), &signer_chain_key)
914            .await
915            .expect("deploy failed");
916
917        let contract_addrs = ContractAddresses::from(&contracts);
918
919        assert_ne!(contract_addrs.token, Address::default());
920        assert_ne!(contract_addrs.channels, Address::default());
921        assert_ne!(contract_addrs.announcements, Address::default());
922        assert_ne!(contract_addrs.network_registry, Address::default());
923        assert_ne!(contract_addrs.node_safe_registry, Address::default());
924        assert_ne!(contract_addrs.ticket_price_oracle, Address::default());
925
926        Ok(())
927    }
928
929    #[tokio::test]
930    async fn test_client_should_get_block_number() -> anyhow::Result<()> {
931        let block_time = Duration::from_millis(1100);
932
933        let anvil = create_anvil(Some(block_time));
934        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
935
936        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
937
938        let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
939
940        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
941
942        let mut last_number = 0;
943
944        for _ in 0..3 {
945            sleep(block_time).await;
946
947            let num = provider.get_block_number().await?;
948
949            assert!(num > last_number, "next block number must be greater");
950            last_number = num;
951        }
952
953        Ok(())
954    }
955
956    #[tokio::test]
957    async fn test_client_should_get_block_number_with_metrics_without_retry() -> anyhow::Result<()> {
958        let block_time = Duration::from_secs(1);
959
960        let anvil = create_anvil(Some(block_time));
961        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
962
963        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
964
965        // additional retry layer
966        let retry_layer = RetryBackoffLayer::new(2, 100, 100);
967
968        let rpc_client = ClientBuilder::default()
969            .layer(retry_layer)
970            .layer(MetricsLayer)
971            .transport(transport_client.clone(), transport_client.guess_local());
972
973        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
974
975        let mut last_number = 0;
976
977        for _ in 0..3 {
978            sleep(block_time).await;
979
980            let num = provider.get_block_number().await?;
981
982            assert!(num > last_number, "next block number must be greater");
983            last_number = num;
984        }
985
986        // FIXME: cannot get the private field `requests_enqueued`
987        // assert_eq!(
988        //     0,
989        //     rpc_client.requests_enqueued.load(Ordering::SeqCst),
990        //     "retry queue should be zero on successful requests"
991        // );
992
993        Ok(())
994    }
995
996    #[tokio::test]
997    async fn test_client_should_fail_on_malformed_request() -> anyhow::Result<()> {
998        let anvil = create_anvil(None);
999        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1000
1001        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1002
1003        let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
1004
1005        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
1006
1007        let err = provider
1008            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber_bla".into(), ())
1009            .await
1010            .expect_err("expected error");
1011
1012        assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1013
1014        Ok(())
1015    }
1016
1017    #[tokio::test]
1018    async fn test_client_should_fail_on_malformed_response() {
1019        let mut server = mockito::Server::new_async().await;
1020
1021        let m = server
1022            .mock("POST", "/")
1023            .with_status(200)
1024            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1025            .with_body("}malformed{")
1026            .expect(1)
1027            .create();
1028
1029        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1030
1031        let rpc_client = ClientBuilder::default()
1032            .layer(RetryBackoffLayer::new(2, 100, 100))
1033            .transport(transport_client.clone(), transport_client.guess_local());
1034
1035        let provider = ProviderBuilder::new().connect_client(rpc_client);
1036
1037        let err = provider
1038            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1039            .await
1040            .expect_err("expected error");
1041
1042        m.assert();
1043        assert!(matches!(
1044            err,
1045            alloy::transports::RpcError::DeserError { err: _, text: _ }
1046        ));
1047    }
1048
1049    #[tokio::test]
1050    async fn test_client_should_retry_on_http_error() {
1051        let mut server = mockito::Server::new_async().await;
1052
1053        let too_many_requests: u16 = http::StatusCode::TOO_MANY_REQUESTS.as_u16();
1054
1055        let m = server
1056            .mock("POST", "/")
1057            .with_status(too_many_requests as usize)
1058            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1059            .with_body("{}")
1060            .expect(3)
1061            .create();
1062
1063        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1064
1065        let rpc_client = ClientBuilder::default()
1066            .layer(RetryBackoffLayer::new(2, 100, 100))
1067            .transport(transport_client.clone(), transport_client.guess_local());
1068
1069        // TODO: FIXME: implement a CustomRetryBackoff policy/service and test its `requests_enqueued`
1070        // let client = JsonRpcProviderClient::new(
1071        //     &server.url(),
1072        //     SurfRequestor::default(),
1073        //     SimpleJsonRpcRetryPolicy {
1074        //         max_retries: Some(2),
1075        //         retryable_http_errors: vec![http_types::StatusCode::TooManyRequests],
1076        //         initial_backoff: Duration::from_millis(100),
1077        //         ..SimpleJsonRpcRetryPolicy::default()
1078        //     },
1079        // );
1080
1081        let provider = ProviderBuilder::new().connect_client(rpc_client);
1082
1083        let err = provider
1084            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1085            .await
1086            .expect_err("expected error");
1087
1088        m.assert();
1089        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1090
        // TODO: Create a customized RetryBackoffService that exposes `requests_enqueued`
1092        // assert_eq!(
1093        //     0,
1094        //     client.requests_enqueued.load(Ordering::SeqCst),
1095        //     "retry queue should be zero when policy says no more retries"
1096        // );
1097    }
1098
1099    #[tokio::test]
1100    async fn test_client_should_not_retry_with_zero_retry_policy() {
1101        let mut server = mockito::Server::new_async().await;
1102
1103        let m = server
1104            .mock("POST", "/")
1105            .with_status(404)
1106            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1107            .with_body("{}")
1108            .expect(1)
1109            .create();
1110
1111        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1112
1113        let rpc_client = ClientBuilder::default()
1114            .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, ZeroRetryPolicy))
1115            .transport(transport_client.clone(), transport_client.guess_local());
1116
1117        let provider = ProviderBuilder::new().connect_client(rpc_client);
1118
1119        let err = provider
1120            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1121            .await
1122            .expect_err("expected error");
1123
1124        m.assert();
1125        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
        // TODO: Create a customized RetryBackoffService that exposes `requests_enqueued`
1127        // assert_eq!(
1128        //     0,
1129        //     client.requests_enqueued.load(Ordering::SeqCst),
1130        //     "retry queue should be zero when policy says no more retries"
1131        // );
1132    }
1133
1134    #[tokio::test]
1135    async fn test_client_should_retry_on_json_rpc_error() {
1136        let mut server = mockito::Server::new_async().await;
1137
1138        let m = server
1139            .mock("POST", "/")
1140            .with_status(200)
1141            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1142            .with_body(
1143                r#"{
1144              "jsonrpc": "2.0",
1145              "id": 1,
1146              "error": {
1147                "message": "some message",
1148                "code": -32603
1149              }
1150            }"#,
1151            )
1152            .expect(3)
1153            .create();
1154
1155        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1156
1157        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1158            initial_backoff: Duration::from_millis(100),
1159            retryable_json_rpc_errors: vec![-32603],
1160            ..DefaultRetryPolicy::default()
1161        };
1162        let rpc_client = ClientBuilder::default()
1163            .layer(RetryBackoffLayer::new_with_policy(
1164                2,
1165                100,
1166                100,
1167                simple_json_rpc_retry_policy,
1168            ))
1169            .transport(transport_client.clone(), transport_client.guess_local());
1170
1171        let provider = ProviderBuilder::new().connect_client(rpc_client);
1172
1173        let err = provider
1174            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1175            .await
1176            .expect_err("expected error");
1177
1178        m.assert();
1179        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
        // TODO: Create a customized RetryBackoffService that exposes `requests_enqueued`
1181        // assert_eq!(
1182        //     0,
1183        //     client.requests_enqueued.load(Ordering::SeqCst),
1184        //     "retry queue should be zero when policy says no more retries"
1185        // );
1186    }
1187
1188    #[tokio::test]
1189    async fn test_client_should_not_retry_on_nonretryable_json_rpc_error() {
1190        let mut server = mockito::Server::new_async().await;
1191
1192        let m = server
1193            .mock("POST", "/")
1194            .with_status(200)
1195            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1196            .with_body(
1197                r#"{
1198              "jsonrpc": "2.0",
1199              "id": 1,
1200              "error": {
1201                "message": "some message",
1202                "code": -32000
1203              }
1204            }"#,
1205            )
1206            .expect(1)
1207            .create();
1208
1209        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1210
1211        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1212            initial_backoff: Duration::from_millis(100),
1213            retryable_json_rpc_errors: vec![],
1214            ..DefaultRetryPolicy::default()
1215        };
1216        let rpc_client = ClientBuilder::default()
1217            .layer(RetryBackoffLayer::new_with_policy(
1218                2,
1219                100,
1220                100,
1221                simple_json_rpc_retry_policy,
1222            ))
1223            .transport(transport_client.clone(), transport_client.guess_local());
1224
1225        let provider = ProviderBuilder::new().connect_client(rpc_client);
1226
1227        let err = provider
1228            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1229            .await
1230            .expect_err("expected error");
1231
1232        m.assert();
1233        assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1234
        // TODO: Create a customized RetryBackoffService that exposes `requests_enqueued`
1236        // assert_eq!(
1237        //     0,
1238        //     client.requests_enqueued.load(Ordering::SeqCst),
1239        //     "retry queue should be zero when policy says no more retries"
1240        // );
1241    }
1242
1243    #[tokio::test]
1244    async fn test_client_should_retry_on_nonretryable_json_rpc_error_if_min_retries_is_given() {
1245        let mut server = mockito::Server::new_async().await;
1246
1247        let _m = server
1248            .mock("POST", "/")
1249            .with_status(200)
1250            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1251            .with_body(
1252                r#"{
1253              "jsonrpc": "2.0",
1254              "id": 1,
1255              "error": {
1256                "message": "some message",
1257                "code": -32000
1258              }
1259            }"#,
1260            )
1261            .expect(2)
1262            .create();
1263
1264        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1265
1266        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1267            initial_backoff: Duration::from_millis(100),
1268            retryable_json_rpc_errors: vec![],
1269            min_retries: Some(1),
1270            ..DefaultRetryPolicy::default()
1271        };
1272        let rpc_client = ClientBuilder::default()
1273            .layer(RetryBackoffLayer::new_with_policy(
1274                2,
1275                100,
1276                100,
1277                simple_json_rpc_retry_policy,
1278            ))
1279            .transport(transport_client.clone(), transport_client.guess_local());
1280
1281        let provider = ProviderBuilder::new().connect_client(rpc_client);
1282
1283        let err = provider
1284            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1285            .await
1286            .expect_err("expected error");
1287
1288        // FIXME: implement minimum retry, and enable the assert
1289        // m.assert();
1290        assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1291
        // TODO: Create a customized RetryBackoffService that exposes `requests_enqueued`
1293        // assert_eq!(
1294        //     0,
1295        //     client.requests_enqueued.load(Ordering::SeqCst),
1296        //     "retry queue should be zero when policy says no more retries"
1297        // );
1298    }
1299
1300    #[tokio::test]
1301    async fn test_client_should_retry_on_malformed_json_rpc_error() {
1302        let mut server = mockito::Server::new_async().await;
1303
1304        let m = server
1305            .mock("POST", "/")
1306            .with_status(200)
1307            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1308            .with_body(
1309                r#"{
1310              "jsonrpc": "2.0",
1311              "error": {
1312                "message": "some message",
1313                "code": -32600
1314              }
1315            }"#,
1316            )
1317            .expect(3)
1318            .create();
1319
1320        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1321
1322        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1323            initial_backoff: Duration::from_millis(100),
1324            retryable_json_rpc_errors: vec![-32600],
1325            min_retries: Some(1),
1326            ..DefaultRetryPolicy::default()
1327        };
1328        let rpc_client = ClientBuilder::default()
1329            .layer(RetryBackoffLayer::new_with_policy(
1330                2,
1331                100,
1332                100,
1333                simple_json_rpc_retry_policy,
1334            ))
1335            .transport(transport_client.clone(), transport_client.guess_local());
1336
1337        let provider = ProviderBuilder::new().connect_client(rpc_client);
1338
1339        let err = provider
1340            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1341            .await
1342            .expect_err("expected error");
1343
1344        m.assert();
1345        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1346
        // TODO: Create a customized RetryBackoffService that exposes `requests_enqueued`
1348        // assert_eq!(
1349        //     0,
1350        //     client.requests_enqueued.load(Ordering::SeqCst),
1351        //     "retry queue should be zero when policy says no more retries"
1352        // );
1353    }
1354
1355    #[test_log::test(tokio::test)]
1356    async fn test_client_from_file() -> anyhow::Result<()> {
1357        let block_time = Duration::from_millis(1100);
1358        let snapshot_file = NamedTempFile::new()?;
1359
1360        let anvil = create_anvil(Some(block_time));
1361
1362        {
1363            let mut last_number = 0;
1364
1365            let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1366
1367            let rpc_client = ClientBuilder::default()
1368                .layer(RetryBackoffLayer::new_with_policy(
1369                    2,
1370                    100,
1371                    100,
1372                    DefaultRetryPolicy::default(),
1373                ))
1374                .layer(SnapshotRequestorLayer::new(snapshot_file.path().to_str().unwrap()))
1375                .transport(transport_client.clone(), transport_client.guess_local());
1376
1377            let provider = ProviderBuilder::new().connect_client(rpc_client);
1378
1379            for _ in 0..3 {
1380                sleep(block_time).await;
1381
1382                let num = provider.get_block_number().await?;
1383
1384                assert!(num > last_number, "next block number must be greater");
1385                last_number = num;
1386            }
1387        }
1388
1389        {
1390            let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1391
1392            let snapshot_requestor = SnapshotRequestor::new(snapshot_file.path().to_str().unwrap())
1393                .load(true)
1394                .await;
1395
1396            let rpc_client = ClientBuilder::default()
1397                .layer(RetryBackoffLayer::new_with_policy(
1398                    2,
1399                    100,
1400                    100,
1401                    DefaultRetryPolicy::default(),
1402                ))
1403                .layer(SnapshotRequestorLayer::from_requestor(snapshot_requestor))
1404                .transport(transport_client.clone(), transport_client.guess_local());
1405
1406            let provider = ProviderBuilder::new().connect_client(rpc_client);
1407
1408            let mut last_number = 0;
1409            for _ in 0..3 {
1410                sleep(block_time).await;
1411
1412                let num = provider.get_block_number().await?;
1413
1414                assert!(num > last_number, "next block number must be greater");
1415                last_number = num;
1416            }
1417        }
1418
1419        Ok(())
1420    }
1421
1422    #[tokio::test]
1423    async fn test_client_should_call_on_gas_oracle_for_eip1559_tx() -> anyhow::Result<()> {
1424        let _ = env_logger::builder().is_test(true).try_init();
1425
1426        let mut server = mockito::Server::new_async().await;
1427
1428        let m = server
1429            .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1430            .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1431            .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"1.1","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1432            .expect(0)
1433            .create();
1434
1435        let anvil = create_anvil(None);
1436        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1437
1438        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1440
1441        let rpc_client = ClientBuilder::default()
1442            .layer(RetryBackoffLayer::new(2, 100, 100))
1443            .transport(transport_client.clone(), transport_client.guess_local());
1444
1445        let provider = ProviderBuilder::new()
1446            .disable_recommended_fillers()
1447            .wallet(signer)
1448            .filler(NonceFiller::new(CachedNonceManager::default()))
1449            .filler(GasOracleFiller::new(
1450                transport_client.client().clone(),
1451                Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1452            ))
1453            .filler(GasFiller)
1454            .connect_client(rpc_client);
1455
1456        let tx = TransactionRequest::default()
1457            .with_chain_id(provider.get_chain_id().await?)
1458            .to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1459            .value(U256::from(100))
1460            .transaction_type(2);
1461
1462        let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1463
1464        m.assert();
1465        assert_eq!(receipt.gas_used, 21000);
1466        Ok(())
1467    }
1468
1469    #[tokio::test]
1470    async fn test_client_should_call_on_gas_oracle_for_legacy_tx() -> anyhow::Result<()> {
1471        let _ = env_logger::builder().is_test(true).try_init();
1472
1473        let mut server = mockito::Server::new_async().await;
1474
1475        let m = server
1476            .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1477            .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1478            .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"3.5","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1479            .expect(1)
1480            .create();
1481
1482        let anvil = create_anvil(None);
1483        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1484
1485        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1486
1487        let rpc_client = ClientBuilder::default()
1488            .layer(RetryBackoffLayer::new(2, 100, 100))
1489            .transport(transport_client.clone(), transport_client.guess_local());
1490
1491        let provider = ProviderBuilder::new()
1492            .disable_recommended_fillers()
1493            .wallet(signer)
1494            .filler(ChainIdFiller::default())
1495            .filler(NonceFiller::new(CachedNonceManager::default()))
1496            .filler(GasOracleFiller::new(
1497                transport_client.client().clone(),
1498                Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1499            ))
1500            .filler(GasFiller)
1501            .filler(BlobGasFiller)
1502            .connect_client(rpc_client);
1503
1504        // GasEstimationLayer requires chain_id to be set to handle EIP-1559 tx
1505        let tx = TransactionRequest::default()
1506            .with_to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1507            .with_value(U256::from(100))
1508            .with_gas_price(1000000000);
1509
1510        let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1511
1512        m.assert();
1513        assert_eq!(receipt.gas_used, 21000);
1514        Ok(())
1515    }
1516}