hopr_chain_rpc/
client.rs

//! Following the migration of the RPC client to the `alloy` crate, this module contains the implementation
//! and parameters of the client layers. The underlying HTTP transport layer is defined in `transport.rs`.
//!
//! Extended layers of the RPC client:
//! - Replace the legacy retry backoff layer with the default [`RetryBackoffService`]. However, the backoff calculation
//!   still needs to be improved, as the number of retries is not passed to the `backoff_hint` method.
//! - Add a metrics layer
//! - Add a snapshot layer
//! - Use the tokio runtime for most of the tests
//!
//! This module also contains the default gas estimation constants for EIP-1559 on Gnosis Chain.
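//!
//! A minimal sketch (the endpoint URL and signer are placeholders) of how these layers are stacked
//! on an `alloy` client, mirroring the wiring used in the tests at the bottom of this module:
//!
//! ```ignore
//! let transport_client = ReqwestTransport::new(endpoint_url);
//! let rpc_client = ClientBuilder::default()
//!     .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, DefaultRetryPolicy::default()))
//!     .layer(MetricsLayer)
//!     .transport(transport_client.clone(), transport_client.guess_local());
//! let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
//! ```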
12use std::{
13    fmt::Debug,
14    future::IntoFuture,
15    io::{BufWriter, Write},
16    sync::{
17        Arc,
18        atomic::{AtomicBool, AtomicUsize, Ordering},
19    },
20    task::{Context, Poll},
21    time::Duration,
22};
23
// Eip1559Estimation is needed because the GasOracleMiddleware has been migrated to a GasFiller.
25use alloy::eips::eip1559::Eip1559Estimation;
26use alloy::{
27    network::{EthereumWallet, Network, TransactionBuilder},
28    primitives::utils::parse_units,
29    providers::{
30        Identity, Provider, RootProvider, SendableTx,
31        fillers::{
32            BlobGasFiller, ChainIdFiller, FillProvider, FillerControlFlow, GasFiller, JoinFill, NonceFiller, TxFiller,
33            WalletFiller,
34        },
35    },
36    rpc::json_rpc::{ErrorPayload, RequestPacket, ResponsePacket, ResponsePayload},
37    transports::{HttpError, TransportError, TransportErrorKind, TransportFut, TransportResult, layers::RetryPolicy},
38};
39use futures::{FutureExt, StreamExt};
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use tower::{Layer, Service};
43use tracing::{error, trace};
44use url::Url;
45use validator::Validate;
46
/// Gas estimation constants for EIP-1559 on Gnosis Chain, used to estimate the gas price for
/// transactions. Since the GasOracleMiddleware has been migrated to a GasFiller, these defaults
/// (3 gwei max fee, 0.1 gwei priority fee) are used instead.
51pub const EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS: u128 = 3_000_000_000;
52pub const EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS: u128 = 100_000_000;
53
54#[cfg(all(feature = "prometheus", not(test)))]
55use hopr_metrics::metrics::{MultiCounter, MultiHistogram};
56
57use crate::{rpc::DEFAULT_GAS_ORACLE_URL, transport::HttpRequestor};
58
59#[cfg(all(feature = "prometheus", not(test)))]
60lazy_static::lazy_static! {
61    static ref METRIC_COUNT_RPC_CALLS: MultiCounter = MultiCounter::new(
62        "hopr_rpc_call_count",
63        "Number of Ethereum RPC calls over HTTP and their result",
64        &["call", "result"]
65    )
66    .unwrap();
67    static ref METRIC_RPC_CALLS_TIMING: MultiHistogram = MultiHistogram::new(
68        "hopr_rpc_call_time_sec",
69        "Timing of RPC calls over HTTP in seconds",
70        vec![0.1, 0.5, 1.0, 2.0, 5.0, 7.0, 10.0],
71        &["call"]
72    )
73    .unwrap();
74    static ref METRIC_RETRIES_PER_RPC_CALL: MultiHistogram = MultiHistogram::new(
75        "hopr_retries_per_rpc_call",
76        "Number of retries per RPC call",
77        vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
78        &["call"]
79    )
80    .unwrap();
81}
82
/// Defines a default retry policy suitable for `RpcClient`.
/// This is a reimplementation of the legacy "retry policy suitable for `JsonRpcProviderClient`".
///
/// This retry policy distinguishes between 4 types of RPC request failures:
/// - JSON RPC error (based on error code)
/// - HTTP error (based on HTTP status)
/// - Transport error (e.g. connection timeout)
/// - Serde error (some of these are treated as JSON RPC errors above, if an error code can be obtained).
///
/// The standard `RetryBackoffLayer` defines the following properties:
/// - `max_rate_limit_retries`: (u32) The maximum number of retries for rate limit errors. Unlike the legacy
///   implementation, there is always an upper limit.
/// - `initial_backoff`: (u64) The initial backoff in milliseconds
/// - `compute_units_per_second`: (u64) The number of compute units per second for this service
///
/// The policy makes up to `max_retries` retries once a JSON RPC request fails, namely for all the JSON RPC
/// error codes specified in `retryable_json_rpc_errors` and all the HTTP errors specified in
/// `retryable_http_errors`.
/// A minimum number of retries `min_retries` can also be specified and applies to any type of error.
/// Each retry `k > 0` is separated by a delay of `initial_backoff * (1 + backoff_coefficient)^(k - 1)`.
///
/// The total wait time is `(initial_backoff/backoff_coefficient) * ((1 + backoff_coefficient)^max_retries - 1)`
/// or `max_backoff`, whichever is lower.
///
/// Transport and connection errors (such as connection timeouts) are retried without backoff
/// at a constant delay of `initial_backoff` if `backoff_on_transport_errors` is not set.
///
/// No additional retries are allowed on new requests once the number of concurrent
/// requests being retried reaches `max_retry_queue_size`.
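///
/// A minimal construction sketch, mirroring the policies built in the tests of this module:
///
/// ```ignore
/// let policy = DefaultRetryPolicy {
///     initial_backoff: Duration::from_millis(100),
///     retryable_json_rpc_errors: vec![-32603],
///     ..DefaultRetryPolicy::default()
/// };
/// let retry_layer = RetryBackoffLayer::new_with_policy(2, 100, 100, policy);
/// ```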
112#[serde_as]
113#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize, Validate)]
114pub struct DefaultRetryPolicy {
    /// Minimum number of retries for any error, regardless of the error code.
116    ///
117    /// Default is 0.
118    #[validate(range(min = 0))]
119    #[default(Some(0))]
120    pub min_retries: Option<u32>,
121
122    /// Initial wait before retries.
123    ///
124    /// NOTE: Transport and connection errors (such as connection timeouts) are retried at
125    /// a constant rate (no backoff) with this delay if `backoff_on_transport_errors` is not set.
126    ///
127    /// Default is 1 second.
128    #[default(Duration::from_secs(1))]
129    pub initial_backoff: Duration,
130
    /// Backoff coefficient by which each retry delay will be multiplied.
132    ///
133    /// Must be non-negative. If set to `0`, no backoff will be applied and the
134    /// requests will be retried at a constant rate.
135    ///
136    /// Default is 0.3
137    #[validate(range(min = 0.0))]
138    #[default(0.3)]
139    pub backoff_coefficient: f64,
140    /// Maximum backoff value.
141    ///
142    /// Once reached, the requests will be retried at a constant rate with this timeout.
143    ///
144    /// Default is 30 seconds.
145    #[default(Duration::from_secs(30))]
146    pub max_backoff: Duration,
147    /// Indicates whether to also apply backoff to transport and connection errors (such as connection timeouts).
148    ///
149    /// Default is false.
150    pub backoff_on_transport_errors: bool,
151    /// List of JSON RPC errors that should be retried with backoff
152    ///
153    /// Default is \[429, -32005, -32016\]
154    #[default(_code = "vec![-32005, -32016, 429]")]
155    pub retryable_json_rpc_errors: Vec<i64>,
156
157    /// List of HTTP errors that should be retried with backoff.
158    ///
159    /// Default is \[429, 504, 503\]
160    #[serde_as(as = "Vec<DisplayFromStr>")]
161    #[default(
162        _code = "vec![http::StatusCode::TOO_MANY_REQUESTS,http::StatusCode::GATEWAY_TIMEOUT,\
163                 http::StatusCode::SERVICE_UNAVAILABLE]"
164    )]
165    pub retryable_http_errors: Vec<http::StatusCode>,
166
167    /// Maximum number of different requests that are being retried at the same time.
168    ///
169    /// If any additional request fails after this number is attained, it won't be retried.
170    ///
171    /// Default is 100
172    #[validate(range(min = 5))]
173    #[default = 100]
174    pub max_retry_queue_size: u32,
175}
176
177impl DefaultRetryPolicy {
178    fn is_retryable_json_rpc_errors(&self, rpc_err: &ErrorPayload) -> bool {
179        self.retryable_json_rpc_errors.contains(&rpc_err.code)
180    }
181
182    fn is_retryable_http_errors(&self, http_err: &HttpError) -> bool {
183        let status_code = match http::StatusCode::try_from(http_err.status) {
184            Ok(status_code) => status_code,
185            Err(_) => return false,
186        };
187        self.retryable_http_errors.contains(&status_code)
188    }
189}
190
191impl RetryPolicy for DefaultRetryPolicy {
192    fn should_retry(&self, err: &TransportError) -> bool {
193        match err {
194            // There was a transport-level error. This is either a non-retryable error,
195            // or a server error that should be retried.
196            TransportError::Transport(err) => {
197                match err {
198                    // Missing batch response errors can be retried.
199                    TransportErrorKind::MissingBatchResponse(_) => true,
200                    TransportErrorKind::HttpError(http_err) => {
201                        http_err.is_rate_limit_err() || self.is_retryable_http_errors(http_err)
202                    }
203                    TransportErrorKind::Custom(err) => {
204                        let msg = err.to_string();
205                        msg.contains("429 Too Many Requests")
206                    }
207                    _ => false,
208                }
209            }
210            // The transport could not serialize the error itself. The request was malformed from
211            // the start.
212            TransportError::SerError(_) => false,
213            TransportError::DeserError { text, .. } => {
214                if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
215                    return self.is_retryable_json_rpc_errors(&resp);
216                }
217
218                // some providers send invalid JSON RPC in the error case (no `id:u64`), but the
219                // text should be a `JsonRpcError`
220                #[derive(Deserialize)]
221                struct Resp {
222                    error: ErrorPayload,
223                }
224
225                if let Ok(resp) = serde_json::from_str::<Resp>(text) {
226                    return self.is_retryable_json_rpc_errors(&resp.error);
227                }
228
229                false
230            }
231            TransportError::ErrorResp(err) => self.is_retryable_json_rpc_errors(err),
232            TransportError::NullResp => true,
233            _ => false,
234        }
235    }
236
237    // TODO(#7140): original implementation requires input param of `num_retries`
238    // next_backoff = initial_backoff * (1 + backoff_coefficient)^(num_retries - 1)
239    fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
240        None
241    }
242}
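
// Illustrative sketch only (not wired into the retry service): evaluates the documented
// backoff formula `initial_backoff * (1 + backoff_coefficient)^(k - 1)`, capped at `max_backoff`,
// for the k-th retry. The function name is hypothetical and is kept here purely to clarify the
// TODO above; it assumes the number of retries would be supplied by the caller.
#[allow(dead_code)]
fn example_backoff_delay(policy: &DefaultRetryPolicy, num_retries: u32) -> Duration {
    if num_retries == 0 {
        return Duration::ZERO;
    }
    let factor = (1.0 + policy.backoff_coefficient).powi(num_retries as i32 - 1);
    policy.initial_backoff.mul_f64(factor).min(policy.max_backoff)
}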
243
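/// A [`RetryPolicy`] that never retries and never suggests a backoff.
/// Useful in tests where retry behavior must be disabled entirely.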
244#[derive(Debug, Clone)]
245pub struct ZeroRetryPolicy;
246impl RetryPolicy for ZeroRetryPolicy {
247    fn should_retry(&self, _err: &alloy::transports::TransportError) -> bool {
248        false
249    }
250
251    fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
252        None
253    }
254}
255
/// Generic gas price categories used by the gas oracle.
257#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
258pub enum GasCategory {
259    SafeLow,
260    #[default]
261    Standard,
262    Fast,
263    Fastest,
264}
265
/// Uses the underlying gas tracker API of GnosisScan to populate the gas price (returned in gwei).
/// It is implemented as a transaction filler ([`TxFiller`]).
/// If no oracle URL is given, the default [`DEFAULT_GAS_ORACLE_URL`] is used.
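///
/// A minimal usage sketch (the signer, HTTP client, oracle URL and RPC client are placeholders),
/// mirroring the wiring used in the gas oracle tests at the bottom of this module:
///
/// ```ignore
/// let provider = ProviderBuilder::new()
///     .disable_recommended_fillers()
///     .wallet(signer)
///     .filler(NonceFiller::new(CachedNonceManager::default()))
///     .filler(GasOracleFiller::new(http_client, Some(oracle_url)))
///     .filler(GasFiller)
///     .connect_client(rpc_client);
/// ```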
270#[derive(Clone, Debug)]
271pub struct GasOracleFiller<C> {
272    client: C,
273    url: Url,
274    gas_category: GasCategory,
275}
276
277#[derive(Clone, Debug, Deserialize, PartialEq)]
278pub struct GasOracleResponse {
279    pub status: String,
280    pub message: String,
281    pub result: GasOracleResponseResult,
282}
283
284#[derive(Clone, Debug, Deserialize, PartialEq)]
285#[serde(rename_all = "PascalCase")]
286pub struct GasOracleResponseResult {
287    pub last_block: String,
288    pub safe_gas_price: String,
289    pub propose_gas_price: String,
290    pub fast_gas_price: String,
291}
292
293impl GasOracleResponse {
294    #[inline]
295    pub fn gas_from_category(&self, gas_category: GasCategory) -> String {
296        self.result.gas_from_category(gas_category)
297    }
298}
299
300impl GasOracleResponseResult {
301    fn gas_from_category(&self, gas_category: GasCategory) -> String {
302        match gas_category {
303            GasCategory::SafeLow => self.safe_gas_price.clone(),
304            GasCategory::Standard => self.propose_gas_price.clone(),
305            GasCategory::Fast => self.fast_gas_price.clone(),
306            GasCategory::Fastest => self.fast_gas_price.clone(),
307        }
308    }
309}
310
311impl<C> GasOracleFiller<C>
312where
313    C: HttpRequestor + Clone,
314{
    /// Creates a new gas oracle filler using the given HTTP client and an optional oracle URL
    /// (defaults to [`DEFAULT_GAS_ORACLE_URL`]).
316    pub fn new(client: C, url: Option<Url>) -> Self {
317        Self {
318            client,
319            url: url.unwrap_or_else(|| Url::parse(DEFAULT_GAS_ORACLE_URL).unwrap()),
320            gas_category: GasCategory::Standard,
321        }
322    }
323
324    /// Sets the gas price category to be used when fetching the gas price.
325    pub fn category(mut self, gas_category: GasCategory) -> Self {
326        self.gas_category = gas_category;
327        self
328    }
329
330    /// Perform a request to the gas price API and deserialize the response.
331    pub async fn query(&self) -> Result<GasOracleResponse, TransportError> {
332        let raw_value = self
333            .client
334            .http_get(self.url.as_str())
335            .await
336            .map_err(TransportErrorKind::custom)?;
337
338        let parsed: GasOracleResponse = serde_json::from_slice(raw_value.as_ref()).map_err(|e| {
339            error!(%e, "failed to deserialize gas price API response");
340            TransportErrorKind::Custom("failed to deserialize gas price API response".into())
341        })?;
342
343        Ok(parsed)
344    }
345
346    async fn prepare_legacy<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
347    where
348        P: Provider<N>,
349        N: Network,
350    {
351        let gas_limit_fut = tx.gas_limit().map_or_else(
352            || provider.estimate_gas(tx.clone()).into_future().right_future(),
353            |gas_limit| async move { Ok(gas_limit) }.left_future(),
354        );
355
356        let res_fut = self.query();
357
358        // Run both futures concurrently
359        let (gas_limit, res) = futures::try_join!(gas_limit_fut, res_fut)?;
360
365        let gas_price_in_gwei = res.gas_from_category(self.gas_category);
366        let gas_price = parse_units(&gas_price_in_gwei, "gwei")
367            .map_err(|e| TransportErrorKind::custom_str(&format!("Failed to parse gwei from gas oracle: {e}")))?;
368        let gas_price_in_128: u128 = gas_price
369            .get_absolute()
370            .try_into()
371            .map_err(|_| TransportErrorKind::custom_str("Conversion overflow"))?;
372
373        Ok(GasOracleFillable::Legacy {
374            gas_limit,
375            gas_price: tx.gas_price().unwrap_or(gas_price_in_128),
376        })
377    }
378
379    async fn prepare_1559<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
380    where
381        P: Provider<N>,
382        N: Network,
383    {
384        let gas_limit_fut = tx.gas_limit().map_or_else(
385            || provider.estimate_gas(tx.clone()).into_future().right_future(),
386            |gas_limit| async move { Ok(gas_limit) }.left_future(),
387        );
388
389        // Await the future to get the gas limit
390        let gas_limit = gas_limit_fut.await?;
391
392        Ok(GasOracleFillable::Eip1559 {
393            gas_limit,
394            estimate: self.estimate_eip1559_fees(),
395        })
396    }
397
    // Returns hardcoded (max_fee_per_gas, max_priority_fee_per_gas).
    // Because foundry is unable to estimate EIP-1559 fees for L2s (https://github.com/foundry-rs/foundry/issues/5709),
    // hardcoded values of (3 gwei, 0.1 gwei) for Gnosis Chain are returned.
401    fn estimate_eip1559_fees(&self) -> Eip1559Estimation {
402        Eip1559Estimation {
403            max_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS,
404            max_priority_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS,
405        }
406    }
407}
408
409/// An enum over the different types of gas fillable.
410#[doc(hidden)]
411#[derive(Clone, Copy, Debug, PartialEq, Eq)]
412pub enum GasOracleFillable {
413    Legacy {
414        gas_limit: u64,
415        gas_price: u128,
416    },
417    Eip1559 {
418        gas_limit: u64,
419        estimate: Eip1559Estimation,
420    },
421}
422
423impl<N, C> TxFiller<N> for GasOracleFiller<C>
424where
425    N: Network,
426    C: HttpRequestor + Clone,
427{
428    type Fillable = GasOracleFillable;
429
430    fn status(&self, tx: &<N as Network>::TransactionRequest) -> FillerControlFlow {
431        // legacy and eip2930 tx
432        if tx.gas_price().is_some() && tx.gas_limit().is_some() {
433            return FillerControlFlow::Finished;
434        }
435
436        // eip1559
437        if tx.max_fee_per_gas().is_some() && tx.max_priority_fee_per_gas().is_some() && tx.gas_limit().is_some() {
438            return FillerControlFlow::Finished;
439        }
440
441        FillerControlFlow::Ready
442    }
443
444    fn fill_sync(&self, _tx: &mut SendableTx<N>) {}
445
446    async fn prepare<P>(&self, provider: &P, tx: &<N as Network>::TransactionRequest) -> TransportResult<Self::Fillable>
447    where
448        P: Provider<N>,
449    {
        if tx.gas_price().is_some() {
            self.prepare_legacy(provider, tx).await
        } else {
            // Note: there is currently no fallback to legacy fee estimation here.
            self.prepare_1559(provider, tx).await
        }
459    }
460
461    async fn fill(&self, fillable: Self::Fillable, mut tx: SendableTx<N>) -> TransportResult<SendableTx<N>> {
462        if let Some(builder) = tx.as_mut_builder() {
463            match fillable {
464                GasOracleFillable::Legacy { gas_limit, gas_price } => {
465                    builder.set_gas_limit(gas_limit);
466                    builder.set_gas_price(gas_price);
467                }
468                GasOracleFillable::Eip1559 { gas_limit, estimate } => {
469                    builder.set_gas_limit(gas_limit);
470                    builder.set_max_fee_per_gas(estimate.max_fee_per_gas);
471                    builder.set_max_priority_fee_per_gas(estimate.max_priority_fee_per_gas);
472                }
473            }
474        };
475        Ok(tx)
476    }
477}
478
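/// A [`tower::Layer`] that wraps the RPC transport in a [`MetricsService`], recording
/// per-method call counts and timings (only when the `prometheus` feature is enabled).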
479pub struct MetricsLayer;
480
481#[derive(Debug, Clone)]
482pub struct MetricsService<S> {
483    inner: S,
484}
485
486// Implement tower::Layer for MetricsLayer.
487impl<S> Layer<S> for MetricsLayer {
488    type Service = MetricsService<S>;
489
490    fn layer(&self, inner: S) -> Self::Service {
491        MetricsService { inner }
492    }
493}
494
495/// Implement the [`tower::Service`] trait for the [`MetricsService`].
496impl<S> Service<RequestPacket> for MetricsService<S>
497where
498    S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
499    S::Error: Send + 'static + Debug,
500{
501    type Error = TransportError;
502    type Future = TransportFut<'static>;
503    type Response = ResponsePacket;
504
505    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
506        self.inner.poll_ready(cx)
507    }
508
509    fn call(&mut self, request: RequestPacket) -> Self::Future {
510        // metrics before calling
511        let start = std::time::Instant::now();
512
513        let method_names = match request.clone() {
514            RequestPacket::Single(single_req) => vec![single_req.method().to_owned()],
515            RequestPacket::Batch(vec_req) => vec_req.iter().map(|s_req| s_req.method().to_owned()).collect(),
516        };
517
518        let future = self.inner.call(request);
519
520        // metrics after calling
521        Box::pin(async move {
522            let res = future.await;
523
524            let req_duration = start.elapsed();
525            method_names.iter().for_each(|method| {
526                trace!(method, duration_in_ms = req_duration.as_millis(), "rpc request took");
527                #[cfg(all(feature = "prometheus", not(test)))]
528                METRIC_RPC_CALLS_TIMING.observe(&[method], req_duration.as_secs_f64());
529            });
530
531            // First deserialize the Response object
532            match &res {
533                Ok(result) => match result {
534                    ResponsePacket::Single(a) => match a.payload {
535                        ResponsePayload::Success(_) => {
536                            #[cfg(all(feature = "prometheus", not(test)))]
537                            METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "success"]);
538                        }
539                        ResponsePayload::Failure(_) => {
540                            #[cfg(all(feature = "prometheus", not(test)))]
541                            METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "failure"]);
542                        }
543                    },
544                    ResponsePacket::Batch(b) => {
                        b.iter().enumerate().for_each(|(i, resp)| match resp.payload {
546                            ResponsePayload::Success(_) => {
547                                #[cfg(all(feature = "prometheus", not(test)))]
548                                METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "success"]);
549                            }
550                            ResponsePayload::Failure(_) => {
551                                #[cfg(all(feature = "prometheus", not(test)))]
552                                METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "failure"]);
553                            }
554                        });
555                    }
556                },
557                Err(err) => {
558                    error!(error = ?err, "Error occurred while processing request");
559                    method_names.iter().for_each(|_m| {
560                        #[cfg(all(feature = "prometheus", not(test)))]
561                        METRIC_COUNT_RPC_CALLS.increment(&[_m, "failure"]);
562                    });
563                }
564            };
565
566            res
567        })
568    }
569}
570
/// Snapshot of a request/response pair cached by the [`SnapshotRequestor`].
572#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
573pub struct RequestorResponseSnapshot {
574    id: usize,
575    request: String,
576    response: String,
577}
578
/// Replays an RPC response to a request if it is found in the snapshot YAML file.
/// If no such request has been seen before,
/// it captures the new request/response pair obtained from the inner transport service
/// and stores it into the snapshot file.
///
/// This is useful for snapshot testing only and should **NOT** be used in production.
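///
/// A minimal wiring sketch (the file path and transport are placeholders), mirroring the
/// `test_client_from_file` test in this module:
///
/// ```ignore
/// let requestor = SnapshotRequestor::new("snapshot.yaml").load(true).await;
/// let rpc_client = ClientBuilder::default()
///     .layer(SnapshotRequestorLayer::from_requestor(requestor))
///     .transport(transport_client.clone(), transport_client.guess_local());
/// ```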
585#[derive(Debug, Clone)]
586pub struct SnapshotRequestor {
587    next_id: Arc<AtomicUsize>,
588    entries: moka::future::Cache<String, RequestorResponseSnapshot>,
589    file: String,
590    aggressive_save: bool,
591    fail_on_miss: bool,
592    ignore_snapshot: bool,
593}
594
595impl SnapshotRequestor {
    /// Creates a new instance that captures request/response pairs into the given snapshot file.
    ///
    /// The constructor does not load any existing snapshot entries from the `snapshot_file`.
    /// The [`SnapshotRequestor::load`] method must be used after construction to do that.
602    pub fn new(snapshot_file: &str) -> Self {
603        Self {
604            next_id: Arc::new(AtomicUsize::new(1)),
605            entries: moka::future::Cache::builder().build(),
606            file: snapshot_file.to_owned(),
607            aggressive_save: false,
608            fail_on_miss: false,
609            ignore_snapshot: false,
610        }
611    }
612
613    /// Gets the path to the snapshot disk file.
614    pub fn snapshot_path(&self) -> &str {
615        &self.file
616    }
617
618    /// Clears all entries from the snapshot in memory.
619    /// The snapshot file is not changed.
620    pub fn clear(&self) {
621        self.entries.invalidate_all();
622        self.next_id.store(1, Ordering::Relaxed);
623    }
624
625    /// Clears all entries and loads them from the snapshot file.
626    /// If `fail_on_miss` is set and the data is successfully loaded, all later
627    /// requests that miss the loaded snapshot will result in HTTP error 404.
628    pub async fn try_load(&mut self, fail_on_miss: bool) -> Result<(), std::io::Error> {
629        if self.ignore_snapshot {
630            return Ok(());
631        }
632
633        let loaded = serde_yaml::from_reader::<_, Vec<RequestorResponseSnapshot>>(std::fs::File::open(&self.file)?)
634            .map_err(std::io::Error::other)?;
635
636        self.clear();
637
638        let loaded_len = futures::stream::iter(loaded)
639            .then(|entry| {
640                self.next_id.fetch_max(entry.id, Ordering::Relaxed);
641                self.entries.insert(entry.request.clone(), entry)
642            })
643            .collect::<Vec<_>>()
644            .await
645            .len();
646
647        if loaded_len > 0 {
648            self.fail_on_miss = fail_on_miss;
649        }
650
651        tracing::debug!("snapshot with {loaded_len} entries has been loaded from {}", &self.file);
652        Ok(())
653    }
654
    /// Similar to [`SnapshotRequestor::try_load`], except that no entries are cleared if the load fails.
656    ///
657    /// This method consumes and returns self for easier call chaining.
658    pub async fn load(mut self, fail_on_miss: bool) -> Self {
659        let _ = self.try_load(fail_on_miss).await;
660        self
661    }
662
663    /// Forces saving to disk on each newly inserted entry.
664    ///
665    /// Use this only when the expected number of entries in the snapshot is small.
666    pub fn with_aggresive_save(mut self) -> Self {
667        self.aggressive_save = true;
668        self
669    }
670
    /// If set, the snapshot data will be ignored and resolution
    /// will always be done with the inner service.
    ///
    /// This will inhibit any attempts to [`load`](SnapshotRequestor::try_load) or
    /// [`save`](SnapshotRequestor::save) snapshot data.
676    pub fn with_ignore_snapshot(mut self, ignore_snapshot: bool) -> Self {
677        self.ignore_snapshot = ignore_snapshot;
678        self
679    }
680
681    /// Save the currently cached entries to the snapshot file on disk.
682    ///
683    /// Note that this method is automatically called on Drop, so usually it is unnecessary
684    /// to call it explicitly.
685    pub fn save(&self) -> Result<(), std::io::Error> {
686        if self.ignore_snapshot {
687            return Ok(());
688        }
689
690        let mut values: Vec<RequestorResponseSnapshot> = self.entries.iter().map(|(_, r)| r).collect();
691        values.sort_unstable_by_key(|a| a.id);
692
693        let mut writer = BufWriter::new(std::fs::File::create(&self.file)?);
694
695        serde_yaml::to_writer(&mut writer, &values).map_err(std::io::Error::other)?;
696
697        writer.flush()?;
698
699        tracing::debug!("snapshot with {} entries saved to file {}", values.len(), self.file);
700        Ok(())
701    }
702}
703
704impl Drop for SnapshotRequestor {
705    fn drop(&mut self) {
706        if let Err(e) = self.save() {
707            tracing::error!("failed to save snapshot: {e}");
708        }
709    }
710}
711
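/// A [`tower::Layer`] that wraps the RPC transport in a [`SnapshotRequestorService`]
/// backed by a [`SnapshotRequestor`].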
712#[derive(Debug, Clone)]
713pub struct SnapshotRequestorLayer {
714    snapshot_requestor: SnapshotRequestor,
715}
716
717impl SnapshotRequestorLayer {
718    pub fn new(snapshot_file: &str) -> Self {
719        Self {
720            snapshot_requestor: SnapshotRequestor::new(snapshot_file),
721        }
722    }
723
724    pub fn from_requestor(snapshot_requestor: SnapshotRequestor) -> Self {
725        Self { snapshot_requestor }
726    }
727}
728#[derive(Debug, Clone)]
729pub struct SnapshotRequestorService<S> {
730    inner: S,
731    snapshot_requestor: SnapshotRequestor,
732}
733
// Implement tower::Layer for SnapshotRequestorLayer.
735impl<S> Layer<S> for SnapshotRequestorLayer {
736    type Service = SnapshotRequestorService<S>;
737
738    fn layer(&self, inner: S) -> Self::Service {
739        SnapshotRequestorService {
740            inner,
741            snapshot_requestor: self.snapshot_requestor.clone(),
742        }
743    }
744}
745
746/// Implement the [`tower::Service`] trait for the [`SnapshotRequestorService`].
747impl<S> Service<RequestPacket> for SnapshotRequestorService<S>
748where
749    S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
750    S::Error: Send + 'static + Debug,
751{
752    type Error = TransportError;
753    type Future = TransportFut<'static>;
754    type Response = ResponsePacket;
755
756    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
757        self.inner.poll_ready(cx)
758    }
759
760    fn call(&mut self, request: RequestPacket) -> Self::Future {
761        let mut inner = self.inner.clone(); // Clone service
762        let snapshot_requestor = self.snapshot_requestor.clone(); // Clone Arc or similar wrapper
763
764        let future = inner.call(request.clone()); // Move request too
765
766        Box::pin(async move {
767            let res = future.await;
768
769            let request_string = serde_json::to_string(&request)
770                .map_err(|e| TransportErrorKind::Custom(format!("serialize error: {e}").into()))?;
771
772            let inserted = AtomicBool::new(false);
773
774            let _result = snapshot_requestor
775                .entries
776                .entry(request_string.clone())
777                .or_try_insert_with(async {
778                    if snapshot_requestor.fail_on_miss {
779                        tracing::error!("{request_string} is missing in {}", &snapshot_requestor.file);
780                        return Err(TransportErrorKind::http_error(
781                            http::StatusCode::NOT_FOUND.into(),
782                            "".into(),
783                        ));
784                    }
785
786                    let response_string = match &res {
787                        Ok(result) => match result {
788                            ResponsePacket::Single(resp) => match &resp.payload {
789                                ResponsePayload::Success(success_payload) => success_payload.to_string(),
790                                ResponsePayload::Failure(e) => {
791                                    return Err(TransportErrorKind::Custom(format!("RPC error: {e}").into()).into());
792                                }
793                            },
794                            ResponsePacket::Batch(batch) => {
795                                let mut responses = Vec::with_capacity(batch.len());
796                                for (i, resp) in batch.iter().enumerate() {
797                                    match &resp.payload {
798                                        ResponsePayload::Success(success_payload) => {
799                                            responses.push(success_payload.to_string());
800                                        }
801                                        ResponsePayload::Failure(e) => {
802                                            return Err(TransportErrorKind::Custom(
803                                                format!("RPC error in batch item #{i}: {e}").into(),
804                                            )
805                                            .into());
806                                        }
807                                    }
808                                }
809                                responses.join(", ")
810                            }
811                        },
812                        Err(err) => {
813                            error!(error = ?err, "Error occurred while processing request");
814                            return Err(TransportErrorKind::Custom(
815                                format!("Error occurred while processing request: {err}").into(),
816                            )
817                            .into());
818                        }
819                    };
820
821                    let id = snapshot_requestor.next_id.fetch_add(1, Ordering::SeqCst);
822                    inserted.store(true, Ordering::Relaxed);
823                    tracing::debug!("saved new snapshot entry #{id}");
824
825                    Ok(RequestorResponseSnapshot {
826                        id,
827                        request: request_string.clone(),
828                        response: response_string,
829                    })
830                })
831                .await
832                .map(|e| e.into_value().response.into_bytes().into_boxed_slice())
833                .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
834
835            if inserted.load(Ordering::Relaxed) && snapshot_requestor.aggressive_save {
836                tracing::debug!("{request_string} was NOT found and was resolved");
837                snapshot_requestor
838                    .save()
839                    .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
840            } else {
841                tracing::debug!("{request_string} was found");
842            }
843
844            res
845        })
846    }
847}
848
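/// Provider type with the default fillers and a wallet filler on top of a [`RootProvider`];
/// used by the Anvil test helper below.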
849pub type AnvilRpcClient = FillProvider<
850    JoinFill<
851        JoinFill<Identity, JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>>,
852        WalletFiller<EthereumWallet>,
853    >,
854    RootProvider,
855>;
/// Used for testing. Creates a provider connected to the local Anvil instance.
857#[cfg(not(target_arch = "wasm32"))]
858pub fn create_rpc_client_to_anvil(
859    anvil: &alloy::node_bindings::AnvilInstance,
860    signer: &hopr_crypto_types::keypairs::ChainKeypair,
861) -> Arc<AnvilRpcClient> {
862    use alloy::{
863        providers::ProviderBuilder, rpc::client::ClientBuilder, signers::local::PrivateKeySigner,
864        transports::http::ReqwestTransport,
865    };
866    use hopr_crypto_types::keypairs::Keypair;
867
868    let wallet = PrivateKeySigner::from_slice(signer.secret().as_ref()).expect("failed to construct wallet");
869
870    let transport_client = ReqwestTransport::new(anvil.endpoint_url());
871
872    let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
873
874    let provider = ProviderBuilder::new().wallet(wallet).connect_client(rpc_client);
875
876    Arc::new(provider)
877}
878
879#[cfg(test)]
880mod tests {
881    use std::time::Duration;
882
883    use alloy::{
884        network::TransactionBuilder,
885        primitives::{U256, address},
886        providers::{
887            Provider, ProviderBuilder,
888            fillers::{BlobGasFiller, CachedNonceManager, ChainIdFiller, GasFiller, NonceFiller},
889        },
890        rpc::{client::ClientBuilder, types::TransactionRequest},
891        signers::local::PrivateKeySigner,
892        transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
893    };
894    use anyhow::Ok;
895    use hopr_async_runtime::prelude::sleep;
896    use hopr_chain_types::{ContractAddresses, ContractInstances, utils::create_anvil};
897    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
898    use hopr_primitive_types::primitives::Address;
899    use serde_json::json;
900    use tempfile::NamedTempFile;
901
902    use crate::client::{
903        DefaultRetryPolicy, GasOracleFiller, MetricsLayer, SnapshotRequestor, SnapshotRequestorLayer, ZeroRetryPolicy,
904    };
905
906    #[tokio::test]
907    async fn test_client_should_deploy_contracts_via_reqwest() -> anyhow::Result<()> {
908        let anvil = create_anvil(None);
909        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
910        let signer_chain_key = ChainKeypair::from_secret(signer.to_bytes().as_ref())?;
911
912        let rpc_client = ClientBuilder::default().http(anvil.endpoint_url());
913
914        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
915
916        let contracts = ContractInstances::deploy_for_testing(provider.clone(), &signer_chain_key)
917            .await
918            .expect("deploy failed");
919
920        let contract_addrs = ContractAddresses::from(&contracts);
921
922        assert_ne!(contract_addrs.token, Address::default());
923        assert_ne!(contract_addrs.channels, Address::default());
924        assert_ne!(contract_addrs.announcements, Address::default());
925        assert_ne!(contract_addrs.network_registry, Address::default());
926        assert_ne!(contract_addrs.safe_registry, Address::default());
927        assert_ne!(contract_addrs.price_oracle, Address::default());
928
929        Ok(())
930    }
931
932    #[tokio::test]
933    async fn test_client_should_get_block_number() -> anyhow::Result<()> {
934        let block_time = Duration::from_millis(1100);
935
936        let anvil = create_anvil(Some(block_time));
937        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
938
939        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
940
941        let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
942
943        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
944
945        let mut last_number = 0;
946
947        for _ in 0..3 {
948            sleep(block_time).await;
949
950            let num = provider.get_block_number().await?;
951
952            assert!(num > last_number, "next block number must be greater");
953            last_number = num;
954        }
955
956        Ok(())
957    }
958
959    #[tokio::test]
960    async fn test_client_should_get_block_number_with_metrics_without_retry() -> anyhow::Result<()> {
961        let block_time = Duration::from_secs(1);
962
963        let anvil = create_anvil(Some(block_time));
964        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
965
966        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
967
968        // additional retry layer
969        let retry_layer = RetryBackoffLayer::new(2, 100, 100);
970
971        let rpc_client = ClientBuilder::default()
972            .layer(retry_layer)
973            .layer(MetricsLayer)
974            .transport(transport_client.clone(), transport_client.guess_local());
975
976        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
977
978        let mut last_number = 0;
979
980        for _ in 0..3 {
981            sleep(block_time).await;
982
983            let num = provider.get_block_number().await?;
984
985            assert!(num > last_number, "next block number must be greater");
986            last_number = num;
987        }
988
989        // FIXME: cannot get the private field `requests_enqueued`
990        // assert_eq!(
991        //     0,
992        //     rpc_client.requests_enqueued.load(Ordering::SeqCst),
993        //     "retry queue should be zero on successful requests"
994        // );
995
996        Ok(())
997    }
998
999    #[tokio::test]
1000    async fn test_client_should_fail_on_malformed_request() -> anyhow::Result<()> {
1001        let anvil = create_anvil(None);
1002        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1003
1004        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1005
1006        let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
1007
1008        let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
1009
1010        let err = provider
1011            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber_bla".into(), ())
1012            .await
1013            .expect_err("expected error");
1014
1015        assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1016
1017        Ok(())
1018    }
1019
1020    #[tokio::test]
1021    async fn test_client_should_fail_on_malformed_response() {
1022        let mut server = mockito::Server::new_async().await;
1023
1024        let m = server
1025            .mock("POST", "/")
1026            .with_status(200)
1027            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1028            .with_body("}malformed{")
1029            .expect(1)
1030            .create();
1031
1032        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1033
1034        let rpc_client = ClientBuilder::default()
1035            .layer(RetryBackoffLayer::new(2, 100, 100))
1036            .transport(transport_client.clone(), transport_client.guess_local());
1037
1038        let provider = ProviderBuilder::new().connect_client(rpc_client);
1039
1040        let err = provider
1041            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1042            .await
1043            .expect_err("expected error");
1044
1045        m.assert();
1046        assert!(matches!(
1047            err,
1048            alloy::transports::RpcError::DeserError { err: _, text: _ }
1049        ));
1050    }
1051
1052    #[tokio::test]
1053    async fn test_client_should_retry_on_http_error() {
1054        let mut server = mockito::Server::new_async().await;
1055
1056        let too_many_requests: u16 = http::StatusCode::TOO_MANY_REQUESTS.as_u16();
1057
1058        let m = server
1059            .mock("POST", "/")
1060            .with_status(too_many_requests as usize)
1061            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1062            .with_body("{}")
1063            .expect(3)
1064            .create();
1065
1066        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1067
1068        let rpc_client = ClientBuilder::default()
1069            .layer(RetryBackoffLayer::new(2, 100, 100))
1070            .transport(transport_client.clone(), transport_client.guess_local());
1071
1072        // TODO: FIXME: implement a CustomRetryBackoff policy/service and test its `requests_enqueued`
1073        // let client = JsonRpcProviderClient::new(
1074        //     &server.url(),
1075        //     SurfRequestor::default(),
1076        //     SimpleJsonRpcRetryPolicy {
1077        //         max_retries: Some(2),
1078        //         retryable_http_errors: vec![http_types::StatusCode::TooManyRequests],
1079        //         initial_backoff: Duration::from_millis(100),
1080        //         ..SimpleJsonRpcRetryPolicy::default()
1081        //     },
1082        // );
1083
1084        let provider = ProviderBuilder::new().connect_client(rpc_client);
1085
1086        let err = provider
1087            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1088            .await
1089            .expect_err("expected error");
1090
1091        m.assert();
1092        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1093
1094        // TODO: Create a customize RetryBackoffService that exposes `requests_enqueued`
1095        // assert_eq!(
1096        //     0,
1097        //     client.requests_enqueued.load(Ordering::SeqCst),
1098        //     "retry queue should be zero when policy says no more retries"
1099        // );
1100    }
1101
1102    #[tokio::test]
1103    async fn test_client_should_not_retry_with_zero_retry_policy() {
1104        let mut server = mockito::Server::new_async().await;
1105
1106        let m = server
1107            .mock("POST", "/")
1108            .with_status(404)
1109            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1110            .with_body("{}")
1111            .expect(1)
1112            .create();
1113
1114        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1115
1116        let rpc_client = ClientBuilder::default()
1117            .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, ZeroRetryPolicy))
1118            .transport(transport_client.clone(), transport_client.guess_local());
1119
1120        let provider = ProviderBuilder::new().connect_client(rpc_client);
1121
1122        let err = provider
1123            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1124            .await
1125            .expect_err("expected error");
1126
1127        m.assert();
1128        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1129        // TODO: Create a customize RetryBackoffService that exposes `requests_enqueued`
1130        // assert_eq!(
1131        //     0,
1132        //     client.requests_enqueued.load(Ordering::SeqCst),
1133        //     "retry queue should be zero when policy says no more retries"
1134        // );
1135    }
1136
1137    #[tokio::test]
1138    async fn test_client_should_retry_on_json_rpc_error() {
1139        let mut server = mockito::Server::new_async().await;
1140
1141        let m = server
1142            .mock("POST", "/")
1143            .with_status(200)
1144            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1145            .with_body(
1146                r#"{
1147              "jsonrpc": "2.0",
1148              "id": 1,
1149              "error": {
1150                "message": "some message",
1151                "code": -32603
1152              }
1153            }"#,
1154            )
1155            .expect(3)
1156            .create();
1157
1158        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1159
1160        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1161            initial_backoff: Duration::from_millis(100),
1162            retryable_json_rpc_errors: vec![-32603],
1163            ..DefaultRetryPolicy::default()
1164        };
1165        let rpc_client = ClientBuilder::default()
1166            .layer(RetryBackoffLayer::new_with_policy(
1167                2,
1168                100,
1169                100,
1170                simple_json_rpc_retry_policy,
1171            ))
1172            .transport(transport_client.clone(), transport_client.guess_local());
1173
1174        let provider = ProviderBuilder::new().connect_client(rpc_client);
1175
1176        let err = provider
1177            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1178            .await
1179            .expect_err("expected error");
1180
1181        m.assert();
1182        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1183        // TODO: Create a customize RetryBackoffService that exposes `requests_enqueued`
1184        // assert_eq!(
1185        //     0,
1186        //     client.requests_enqueued.load(Ordering::SeqCst),
1187        //     "retry queue should be zero when policy says no more retries"
1188        // );
1189    }
1190
1191    #[tokio::test]
1192    async fn test_client_should_not_retry_on_nonretryable_json_rpc_error() {
1193        let mut server = mockito::Server::new_async().await;
1194
1195        let m = server
1196            .mock("POST", "/")
1197            .with_status(200)
1198            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1199            .with_body(
1200                r#"{
1201              "jsonrpc": "2.0",
1202              "id": 1,
1203              "error": {
1204                "message": "some message",
1205                "code": -32000
1206              }
1207            }"#,
1208            )
1209            .expect(1)
1210            .create();
1211
1212        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1213
1214        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1215            initial_backoff: Duration::from_millis(100),
1216            retryable_json_rpc_errors: vec![],
1217            ..DefaultRetryPolicy::default()
1218        };
1219        let rpc_client = ClientBuilder::default()
1220            .layer(RetryBackoffLayer::new_with_policy(
1221                2,
1222                100,
1223                100,
1224                simple_json_rpc_retry_policy,
1225            ))
1226            .transport(transport_client.clone(), transport_client.guess_local());
1227
1228        let provider = ProviderBuilder::new().connect_client(rpc_client);
1229
1230        let err = provider
1231            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1232            .await
1233            .expect_err("expected error");
1234
1235        m.assert();
1236        assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1237
1238        // TODO: Create a customize RetryBackoffService that exposes `requests_enqueued`
1239        // assert_eq!(
1240        //     0,
1241        //     client.requests_enqueued.load(Ordering::SeqCst),
1242        //     "retry queue should be zero when policy says no more retries"
1243        // );
1244    }
1245
1246    #[tokio::test]
1247    async fn test_client_should_retry_on_nonretryable_json_rpc_error_if_min_retries_is_given() {
1248        let mut server = mockito::Server::new_async().await;
1249
1250        let _m = server
1251            .mock("POST", "/")
1252            .with_status(200)
1253            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1254            .with_body(
1255                r#"{
1256              "jsonrpc": "2.0",
1257              "id": 1,
1258              "error": {
1259                "message": "some message",
1260                "code": -32000
1261              }
1262            }"#,
1263            )
1264            .expect(2)
1265            .create();
1266
1267        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1268
1269        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1270            initial_backoff: Duration::from_millis(100),
1271            retryable_json_rpc_errors: vec![],
1272            min_retries: Some(1),
1273            ..DefaultRetryPolicy::default()
1274        };
1275        let rpc_client = ClientBuilder::default()
1276            .layer(RetryBackoffLayer::new_with_policy(
1277                2,
1278                100,
1279                100,
1280                simple_json_rpc_retry_policy,
1281            ))
1282            .transport(transport_client.clone(), transport_client.guess_local());
1283
1284        let provider = ProviderBuilder::new().connect_client(rpc_client);
1285
1286        let err = provider
1287            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1288            .await
1289            .expect_err("expected error");
1290
1291        // FIXME: implement minimum retry, and enable the assert
1292        // m.assert();
1293        assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1294
1295        // TODO: Create a customize RetryBackoffService that exposes `requests_enqueued`
1296        // assert_eq!(
1297        //     0,
1298        //     client.requests_enqueued.load(Ordering::SeqCst),
1299        //     "retry queue should be zero when policy says no more retries"
1300        // );
1301    }
1302
1303    #[tokio::test]
1304    async fn test_client_should_retry_on_malformed_json_rpc_error() {
1305        let mut server = mockito::Server::new_async().await;
1306
1307        let m = server
1308            .mock("POST", "/")
1309            .with_status(200)
1310            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1311            .with_body(
1312                r#"{
1313              "jsonrpc": "2.0",
1314              "error": {
1315                "message": "some message",
1316                "code": -32600
1317              }
1318            }"#,
1319            )
1320            .expect(3)
1321            .create();
1322
1323        let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1324
1325        let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1326            initial_backoff: Duration::from_millis(100),
1327            retryable_json_rpc_errors: vec![-32600],
1328            min_retries: Some(1),
1329            ..DefaultRetryPolicy::default()
1330        };
1331        let rpc_client = ClientBuilder::default()
1332            .layer(RetryBackoffLayer::new_with_policy(
1333                2,
1334                100,
1335                100,
1336                simple_json_rpc_retry_policy,
1337            ))
1338            .transport(transport_client.clone(), transport_client.guess_local());
1339
1340        let provider = ProviderBuilder::new().connect_client(rpc_client);
1341
1342        let err = provider
1343            .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1344            .await
1345            .expect_err("expected error");
1346
1347        m.assert();
1348        assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1349
1350        // TODO: Create a customize RetryBackoffService that exposes `requests_enqueued`
1351        // assert_eq!(
1352        //     0,
1353        //     client.requests_enqueued.load(Ordering::SeqCst),
1354        //     "retry queue should be zero when policy says no more retries"
1355        // );
1356    }
1357
1358    #[test_log::test(tokio::test)]
1359    async fn test_client_from_file() -> anyhow::Result<()> {
1360        let block_time = Duration::from_millis(1100);
1361        let snapshot_file = NamedTempFile::new()?;
1362
1363        let anvil = create_anvil(Some(block_time));
1364
1365        {
1366            let mut last_number = 0;
1367
1368            let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1369
1370            let rpc_client = ClientBuilder::default()
1371                .layer(RetryBackoffLayer::new_with_policy(
1372                    2,
1373                    100,
1374                    100,
1375                    DefaultRetryPolicy::default(),
1376                ))
1377                .layer(SnapshotRequestorLayer::new(snapshot_file.path().to_str().unwrap()))
1378                .transport(transport_client.clone(), transport_client.guess_local());
1379
1380            let provider = ProviderBuilder::new().connect_client(rpc_client);
1381
1382            for _ in 0..3 {
1383                sleep(block_time).await;
1384
1385                let num = provider.get_block_number().await?;
1386
1387                assert!(num > last_number, "next block number must be greater");
1388                last_number = num;
1389            }
1390        }
1391
1392        {
1393            let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1394
1395            let snapshot_requestor = SnapshotRequestor::new(snapshot_file.path().to_str().unwrap())
1396                .load(true)
1397                .await;
1398
1399            let rpc_client = ClientBuilder::default()
1400                .layer(RetryBackoffLayer::new_with_policy(
1401                    2,
1402                    100,
1403                    100,
1404                    DefaultRetryPolicy::default(),
1405                ))
1406                .layer(SnapshotRequestorLayer::from_requestor(snapshot_requestor))
1407                .transport(transport_client.clone(), transport_client.guess_local());
1408
1409            let provider = ProviderBuilder::new().connect_client(rpc_client);
1410
1411            let mut last_number = 0;
1412            for _ in 0..3 {
1413                sleep(block_time).await;
1414
1415                let num = provider.get_block_number().await?;
1416
1417                assert!(num > last_number, "next block number must be greater");
1418                last_number = num;
1419            }
1420        }
1421
1422        Ok(())
1423    }
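
    // A small sketch exercising the snapshot store directly (no RPC involved): an entry inserted
    // into one `SnapshotRequestor` and saved to disk should be loadable by a fresh instance.
    // The request/response strings used here are arbitrary test data.
    #[tokio::test]
    async fn test_snapshot_requestor_should_roundtrip_entries() -> anyhow::Result<()> {
        let snapshot_file = NamedTempFile::new()?;
        let path = snapshot_file.path().to_str().unwrap().to_owned();

        {
            let requestor = SnapshotRequestor::new(&path);
            requestor
                .entries
                .insert(
                    "request".to_string(),
                    crate::client::RequestorResponseSnapshot {
                        id: 1,
                        request: "request".to_string(),
                        response: "response".to_string(),
                    },
                )
                .await;
            requestor.save()?;
        }

        let reloaded = SnapshotRequestor::new(&path).load(true).await;
        assert_eq!(
            reloaded.entries.iter().count(),
            1,
            "reloaded snapshot should contain the saved entry"
        );
        Ok(())
    }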
1424
1425    #[tokio::test]
1426    async fn test_client_should_call_on_gas_oracle_for_eip1559_tx() -> anyhow::Result<()> {
1427        let _ = env_logger::builder().is_test(true).try_init();
1428
1429        let mut server = mockito::Server::new_async().await;
1430
1431        let m = server
1432            .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1433            .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1434            .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"1.1","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1435            .expect(0)
1436            .create();
1437
1438        let anvil = create_anvil(None);
1439        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1440
1441        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1443
1444        let rpc_client = ClientBuilder::default()
1445            .layer(RetryBackoffLayer::new(2, 100, 100))
1446            .transport(transport_client.clone(), transport_client.guess_local());
1447
1448        let provider = ProviderBuilder::new()
1449            .disable_recommended_fillers()
1450            .wallet(signer)
1451            .filler(NonceFiller::new(CachedNonceManager::default()))
1452            .filler(GasOracleFiller::new(
1453                transport_client.client().clone(),
1454                Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1455            ))
1456            .filler(GasFiller)
1457            .connect_client(rpc_client);
1458
1459        let tx = TransactionRequest::default()
1460            .with_chain_id(provider.get_chain_id().await?)
1461            .to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1462            .value(U256::from(100))
1463            .transaction_type(2);
1464
1465        let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1466
1467        m.assert();
1468        assert_eq!(receipt.gas_used, 21000);
1469        Ok(())
1470    }
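
    // A minimal sketch of how the GnosisScan-style gas oracle payload consumed by
    // `GasOracleFiller::query` deserializes into `GasOracleResponse`; the JSON body mirrors
    // the one mocked in the gas oracle tests of this module.
    #[test]
    fn test_gas_oracle_response_should_deserialize_gas_categories() -> anyhow::Result<()> {
        use crate::client::{GasCategory, GasOracleResponse};

        let body = r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"3.5","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#;
        let parsed: GasOracleResponse = serde_json::from_str(body)?;

        assert_eq!(parsed.status, "1");
        assert_eq!(parsed.gas_from_category(GasCategory::SafeLow), "1.1");
        assert_eq!(parsed.gas_from_category(GasCategory::Standard), "3.5");
        assert_eq!(parsed.gas_from_category(GasCategory::Fast), "1.6");
        Ok(())
    }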
1471
1472    #[tokio::test]
1473    async fn test_client_should_call_on_gas_oracle_for_legacy_tx() -> anyhow::Result<()> {
1474        let _ = env_logger::builder().is_test(true).try_init();
1475
1476        let mut server = mockito::Server::new_async().await;
1477
1478        let m = server
1479            .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1480            .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1481            .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"3.5","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1482            .expect(1)
1483            .create();
1484
1485        let anvil = create_anvil(None);
1486        let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1487
1488        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1489
1490        let rpc_client = ClientBuilder::default()
1491            .layer(RetryBackoffLayer::new(2, 100, 100))
1492            .transport(transport_client.clone(), transport_client.guess_local());
1493
1494        let provider = ProviderBuilder::new()
1495            .disable_recommended_fillers()
1496            .wallet(signer)
1497            .filler(ChainIdFiller::default())
1498            .filler(NonceFiller::new(CachedNonceManager::default()))
1499            .filler(GasOracleFiller::new(
1500                transport_client.client().clone(),
1501                Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1502            ))
1503            .filler(GasFiller)
1504            .filler(BlobGasFiller)
1505            .connect_client(rpc_client);
1506
1507        // GasEstimationLayer requires chain_id to be set to handle EIP-1559 tx
1508        let tx = TransactionRequest::default()
1509            .with_to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1510            .with_value(U256::from(100))
1511            .with_gas_price(1000000000);
1512
1513        let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1514
1515        m.assert();
1516        assert_eq!(receipt.gas_used, 21000);
1517        Ok(())
1518    }
1519}