hopr_chain_rpc/
client.rs

1//! Extended `JsonRpcClient` abstraction.
2//!
3//! This module contains custom implementation of `ethers::providers::JsonRpcClient`
4//! which allows usage of non-`reqwest` based HTTP clients.
5//!
6//! The major type implemented in this module is the [JsonRpcProviderClient]
7//! which implements the [ethers::providers::JsonRpcClient] trait. That makes it possible to use it with `ethers`.
8//!
9//! The [JsonRpcProviderClient] is abstract over the [HttpRequestor] trait, which makes it possible
10//! to make the underlying HTTP client implementation easily replaceable. This is needed to make it possible
11//! for `ethers` to work with different async runtimes, since the HTTP client is typically not agnostic to
12//! async runtimes (the default HTTP client in `ethers` is using `reqwest`, which is `tokio` specific).
13//! Secondly, this abstraction also allows implementing WASM-compatible HTTP client if needed at some point.
14
15use async_trait::async_trait;
16use ethers::providers::{JsonRpcClient, JsonRpcError};
17use futures::StreamExt;
18use http_types::Method;
19use serde::de::DeserializeOwned;
20use serde::{Deserialize, Serialize};
21use std::fmt::{Debug, Formatter};
22use std::io::{BufWriter, Write};
23use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::{debug, error, trace, warn};
27use validator::Validate;
28
29use hopr_async_runtime::prelude::sleep;
30
31use crate::client::RetryAction::{NoRetry, RetryAfter};
32use crate::errors::{HttpRequestError, JsonRpcProviderClientError};
33use crate::helper::{Request, Response};
34use crate::{HttpRequestor, RetryAction, RetryPolicy};
35
36#[cfg(all(feature = "prometheus", not(test)))]
37use hopr_metrics::metrics::{MultiCounter, MultiHistogram};
38
39#[cfg(all(feature = "prometheus", not(test)))]
40lazy_static::lazy_static! {
41    static ref METRIC_COUNT_RPC_CALLS: MultiCounter = MultiCounter::new(
42        "hopr_rpc_call_count",
43        "Number of Ethereum RPC calls over HTTP and their result",
44        &["call", "result"]
45    )
46    .unwrap();
47    static ref METRIC_RPC_CALLS_TIMING: MultiHistogram = MultiHistogram::new(
48        "hopr_rpc_call_time_sec",
49        "Timing of RPC calls over HTTP in seconds",
50        vec![0.1, 0.5, 1.0, 2.0, 5.0, 7.0, 10.0],
51        &["call"]
52    )
53    .unwrap();
54    static ref METRIC_RETRIES_PER_RPC_CALL: MultiHistogram = MultiHistogram::new(
55        "hopr_retries_per_rpc_call",
56        "Number of retries per RPC call",
57        vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
58        &["call"]
59    )
60    .unwrap();
61}
62
63/// Defines a retry policy suitable for `JsonRpcProviderClient`.
64///
65/// This retry policy distinguishes between 4 types of RPC request failures:
66/// - JSON RPC error (based on error code)
67/// - HTTP error (based on HTTP status)
68/// - Transport error (e.g. connection timeout)
69/// - Serde error (some of these are treated as JSON RPC error above, if an error code can be obtained).
70///
71/// The policy will make up to `max_retries` once a JSON RPC request fails.
72/// The minimum number of retries `min_retries` can be also specified and applies to any type of error regardless.
73/// Each retry `k > 0` will be separated by a delay of `initial_backoff * (1 + backoff_coefficient)^(k - 1)`,
74/// namely all the JSON RPC error codes specified in `retryable_json_rpc_errors` and all the HTTP errors
75/// specified in `retryable_http_errors`.
76///
77/// The total wait time will be `(initial_backoff/backoff_coefficient) * ((1 + backoff_coefficient)^max_retries - 1)`.
78/// or `max_backoff`, whatever is lower.
79///
80/// Transport and connection errors (such as connection timeouts) are retried without backoff
81/// at a constant delay of `initial_backoff` if `backoff_on_transport_errors` is not set.
82///
83/// No more additional retries are allowed on new requests, if the maximum number of concurrent
84/// requests being retried has reached `max_retry_queue_size`.
85#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize, Validate)]
86pub struct SimpleJsonRpcRetryPolicy {
87    /// Minimum number of retries of any error, regardless the error code.
88    ///
89    /// Default is 0.
90    #[validate(range(min = 0))]
91    #[default(Some(0))]
92    pub min_retries: Option<u32>,
93
94    /// Maximum number of retries.
95    ///
96    /// If `None` is given, will keep retrying indefinitely.
97    ///
98    /// Default is 12.
99    #[validate(range(min = 1))]
100    #[default(Some(12))]
101    pub max_retries: Option<u32>,
102    /// Initial wait before retries.
103    ///
104    /// NOTE: Transport and connection errors (such as connection timeouts) are retried at
105    /// a constant rate (no backoff) with this delay if `backoff_on_transport_errors` is not set.
106    ///
107    /// Default is 1 second.
108    #[default(Duration::from_secs(1))]
109    pub initial_backoff: Duration,
110    /// Backoff coefficient by which will be each retry multiplied.
111    ///
112    /// Must be non-negative. If set to `0`, no backoff will be applied and the
113    /// requests will be retried at a constant rate.
114    ///
115    /// Default is 0.3
116    #[validate(range(min = 0.0))]
117    #[default(0.3)]
118    pub backoff_coefficient: f64,
119    /// Maximum backoff value.
120    ///
121    /// Once reached, the requests will be retried at a constant rate with this timeout.
122    ///
123    /// Default is 30 seconds.
124    #[default(Duration::from_secs(30))]
125    pub max_backoff: Duration,
126    /// Indicates whether to also apply backoff to transport and connection errors (such as connection timeouts).
127    ///
128    /// Default is false.
129    pub backoff_on_transport_errors: bool,
130    /// List of JSON RPC errors that should be retried with backoff
131    ///
132    /// Default is \[429, -32005, -32016\]
133    #[default(_code = "vec![-32005, -32016, 429]")]
134    pub retryable_json_rpc_errors: Vec<i64>,
135
136    /// List of HTTP errors that should be retried with backoff.
137    ///
138    /// Default is \[429, 504, 503\]
139    #[default(
140        _code = "vec![http_types::StatusCode::TooManyRequests,http_types::StatusCode::GatewayTimeout,http_types::StatusCode::ServiceUnavailable]"
141    )]
142    pub retryable_http_errors: Vec<http_types::StatusCode>,
143    /// Maximum number of different requests that are being retried at the same time.
144    ///
145    /// If any additional request fails after this number is attained, it won't be retried.
146    ///
147    /// Default is 100
148    #[validate(range(min = 5))]
149    #[default = 100]
150    pub max_retry_queue_size: u32,
151}
152
153impl SimpleJsonRpcRetryPolicy {
154    fn is_retryable_json_rpc_error(&self, err: &JsonRpcError) -> bool {
155        self.retryable_json_rpc_errors.contains(&err.code) || err.message.contains("rate limit")
156    }
157
158    fn is_retryable_http_error(&self, status: &http_types::StatusCode) -> bool {
159        self.retryable_http_errors.contains(status)
160    }
161}
162
163impl RetryPolicy<JsonRpcProviderClientError> for SimpleJsonRpcRetryPolicy {
164    fn is_retryable_error(
165        &self,
166        err: &JsonRpcProviderClientError,
167        num_retries: u32,
168        retry_queue_size: u32,
169    ) -> RetryAction {
170        if self.max_retries.is_some_and(|max| num_retries > max) {
171            warn!(
172                count = self.max_retries.expect("max_retries must be set"),
173                "max number of retries has been reached"
174            );
175            return NoRetry;
176        }
177
178        debug!(
179            size = retry_queue_size,
180            "checking retry queue size after retryable error"
181        );
182
183        if retry_queue_size > self.max_retry_queue_size {
184            warn!(
185                size = self.max_retry_queue_size,
186                "maximum size of retry queue has been reached"
187            );
188            return NoRetry;
189        }
190
191        // next_backoff = initial_backoff * (1 + backoff_coefficient)^(num_retries - 1)
192        let backoff = self
193            .initial_backoff
194            .mul_f64(f64::powi(1.0 + self.backoff_coefficient, (num_retries - 1) as i32))
195            .min(self.max_backoff);
196
197        // Retry if a global minimum of number of retries was given and wasn't yet attained
198        if self.min_retries.is_some_and(|min| num_retries <= min) {
199            debug!(num_retries, min_retries = ?self.min_retries,  "retrying because minimum number of retries not yet reached");
200            return RetryAfter(backoff);
201        }
202
203        match err {
204            // Retryable JSON RPC errors are retries with backoff
205            JsonRpcProviderClientError::JsonRpcError(e) if self.is_retryable_json_rpc_error(e) => {
206                debug!(error = %e, "encountered retryable JSON RPC error code");
207                RetryAfter(backoff)
208            }
209
210            // Retryable HTTP errors are retries with backoff
211            JsonRpcProviderClientError::BackendError(HttpRequestError::HttpError(e))
212                if self.is_retryable_http_error(e) =>
213            {
214                debug!(error = ?e, "encountered retryable HTTP error code");
215                RetryAfter(backoff)
216            }
217
218            // Transport error and timeouts are retried at a constant rate if specified
219            JsonRpcProviderClientError::BackendError(e @ HttpRequestError::Timeout)
220            | JsonRpcProviderClientError::BackendError(e @ HttpRequestError::TransportError(_))
221            | JsonRpcProviderClientError::BackendError(e @ HttpRequestError::UnknownError(_)) => {
222                debug!(error = %e, "encountered retryable transport error");
223                RetryAfter(if self.backoff_on_transport_errors {
224                    backoff
225                } else {
226                    self.initial_backoff
227                })
228            }
229
230            // Some providers send invalid JSON RPC in the error case (no `id:u64`), but the text is a `JsonRpcError`
231            JsonRpcProviderClientError::SerdeJson { text, .. } => {
232                #[derive(Deserialize)]
233                struct Resp {
234                    error: JsonRpcError,
235                }
236
237                match serde_json::from_str::<Resp>(text) {
238                    Ok(Resp { error }) if self.is_retryable_json_rpc_error(&error) => {
239                        debug!(%error, "encountered retryable JSON RPC error");
240                        RetryAfter(backoff)
241                    }
242                    _ => {
243                        debug!(error = %text, "unparseable JSON RPC error");
244                        NoRetry
245                    }
246                }
247            }
248
249            // Anything else is not retried
250            _ => NoRetry,
251        }
252    }
253}
254
255/// Modified implementation of `ethers::providers::Http` so that it can
256/// operate with any `HttpPostRequestor`.
257/// Also contains possible retry actions to be taken on various failures, therefore it
258/// implements also `ethers::providers::RetryClient` functionality.
259pub struct JsonRpcProviderClient<Req: HttpRequestor, R: RetryPolicy<JsonRpcProviderClientError>> {
260    id: AtomicU64,
261    requests_enqueued: AtomicU32,
262    url: String,
263    requestor: Req,
264    retry_policy: R,
265}
266
267impl<Req: HttpRequestor, R: RetryPolicy<JsonRpcProviderClientError>> JsonRpcProviderClient<Req, R> {
268    /// Creates the client given the `HttpPostRequestor`
269    pub fn new(base_url: &str, requestor: Req, retry_policy: R) -> Self {
270        Self {
271            id: AtomicU64::new(1),
272            requests_enqueued: AtomicU32::new(0),
273            url: base_url.to_owned(),
274            requestor,
275            retry_policy,
276        }
277    }
278
279    async fn send_request_internal<T, A>(&self, method: &str, params: T) -> Result<A, JsonRpcProviderClientError>
280    where
281        T: Serialize + Send + Sync,
282        A: DeserializeOwned,
283    {
284        // Create the Request object
285        let next_id = self.id.fetch_add(1, Ordering::SeqCst);
286        let payload = Request::new(next_id, method, params);
287
288        debug!(method, "sending rpc request");
289        trace!(
290            method,
291            request = serde_json::to_string(&payload).expect("request must be serializable"),
292            "sending rpc request",
293        );
294
295        // Perform the actual request
296        let start = std::time::Instant::now();
297        let body = self.requestor.http_post(self.url.as_ref(), payload).await?;
298        let req_duration = start.elapsed();
299
300        trace!(method, duration_in_ms = req_duration.as_millis(), "rpc request took");
301
302        #[cfg(all(feature = "prometheus", not(test)))]
303        METRIC_RPC_CALLS_TIMING.observe(&[method], req_duration.as_secs_f64());
304
305        // First deserialize the Response object
306        let raw = match serde_json::from_slice(&body) {
307            Ok(Response::Success { result, .. }) => result.to_owned(),
308            Ok(Response::Error { error, .. }) => {
309                #[cfg(all(feature = "prometheus", not(test)))]
310                METRIC_COUNT_RPC_CALLS.increment(&[method, "failure"]);
311
312                return Err(error.into());
313            }
314            Ok(_) => {
315                let err = JsonRpcProviderClientError::SerdeJson {
316                    err: serde::de::Error::custom("unexpected notification over HTTP transport"),
317                    text: String::from_utf8_lossy(&body).to_string(),
318                };
319                #[cfg(all(feature = "prometheus", not(test)))]
320                METRIC_COUNT_RPC_CALLS.increment(&[method, "failure"]);
321
322                return Err(err);
323            }
324            Err(err) => {
325                #[cfg(all(feature = "prometheus", not(test)))]
326                METRIC_COUNT_RPC_CALLS.increment(&[method, "failure"]);
327
328                return Err(JsonRpcProviderClientError::SerdeJson {
329                    err,
330                    text: String::from_utf8_lossy(&body).to_string(),
331                });
332            }
333        };
334
335        // Next, deserialize the data out of the Response object
336        let json_str = raw.get();
337        trace!(method, response = &json_str, "rpc request response received");
338
339        let res = serde_json::from_str(json_str).map_err(|err| JsonRpcProviderClientError::SerdeJson {
340            err,
341            text: raw.to_string(),
342        })?;
343
344        #[cfg(all(feature = "prometheus", not(test)))]
345        METRIC_COUNT_RPC_CALLS.increment(&[method, "success"]);
346
347        Ok(res)
348    }
349}
350
351impl<Req: HttpRequestor, R: RetryPolicy<JsonRpcProviderClientError>> Debug for JsonRpcProviderClient<Req, R> {
352    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
353        f.debug_struct("JsonRpcProviderClient")
354            .field("id", &self.id)
355            .field("url", &self.url)
356            .field("requests_enqueued", &self.requests_enqueued)
357            .finish_non_exhaustive()
358    }
359}
360
361impl<Req: HttpRequestor + Clone, R: RetryPolicy<JsonRpcProviderClientError> + Clone> Clone
362    for JsonRpcProviderClient<Req, R>
363{
364    fn clone(&self) -> Self {
365        Self {
366            id: AtomicU64::new(1),
367            url: self.url.clone(),
368            requests_enqueued: AtomicU32::new(0),
369            requestor: self.requestor.clone(),
370            retry_policy: self.retry_policy.clone(),
371        }
372    }
373}
374
375#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
376#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
377impl<Req, R> JsonRpcClient for JsonRpcProviderClient<Req, R>
378where
379    Req: HttpRequestor,
380    R: RetryPolicy<JsonRpcProviderClientError> + Send + Sync,
381{
382    type Error = JsonRpcProviderClientError;
383
384    async fn request<T, A>(&self, method: &str, params: T) -> Result<A, Self::Error>
385    where
386        T: Serialize + Send + Sync,
387        A: DeserializeOwned + Send,
388    {
389        // Helper type that caches the `params` value across several retries
390        // This is necessary because the wrapper provider is supposed to skip he `params` if it's of
391        // size 0, see `crate::transports::common::Request`
392        enum RetryParams<Params> {
393            Value(Params),
394            Zst(()),
395        }
396
397        let params = if std::mem::size_of::<A>() == 0 {
398            RetryParams::Zst(())
399        } else {
400            let params = serde_json::to_value(params)
401                .map_err(|err| JsonRpcProviderClientError::SerdeJson { err, text: "".into() })?;
402            RetryParams::Value(params)
403        };
404
405        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
406        let start = std::time::Instant::now();
407
408        let mut num_retries = 0;
409        loop {
410            let err;
411
412            // hack to not hold `A` across an await in the sleep future and prevent requiring
413            // A: Send + Sync
414            {
415                let resp = match params {
416                    RetryParams::Value(ref params) => self.send_request_internal(method, params).await,
417                    RetryParams::Zst(unit) => self.send_request_internal(method, unit).await,
418                };
419
420                match resp {
421                    Ok(ret) => {
422                        self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
423
424                        #[cfg(all(feature = "prometheus", not(test)))]
425                        METRIC_RETRIES_PER_RPC_CALL.observe(&[method], num_retries as f64);
426
427                        debug!(method, elapsed_in_ms = start.elapsed().as_millis(), "request succeeded",);
428                        return Ok(ret);
429                    }
430                    Err(req_err) => {
431                        err = req_err;
432                        error!(
433                            method,
434                            elapsed_in_ms = start.elapsed().as_millis(),
435                            error = %err,
436                            "request failed",
437                        );
438                        num_retries += 1;
439                    }
440                }
441            }
442
443            match self
444                .retry_policy
445                .is_retryable_error(&err, num_retries, self.requests_enqueued.load(Ordering::SeqCst))
446            {
447                NoRetry => {
448                    self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
449                    warn!(method, "no more retries for RPC call");
450
451                    #[cfg(all(feature = "prometheus", not(test)))]
452                    METRIC_RETRIES_PER_RPC_CALL.observe(&[method], num_retries as f64);
453
454                    debug!(
455                        method,
456                        duration_in_ms = start.elapsed().as_millis(),
457                        "failed request duration in the retry queue",
458                    );
459                    return Err(err);
460                }
461                RetryAfter(backoff) => {
462                    warn!(method, backoff_in_ms = backoff.as_millis(), "request will retry",);
463                    sleep(backoff).await
464                }
465            }
466        }
467    }
468}
469
470#[cfg(any(test, feature = "runtime-async-std"))]
471pub mod surf_client {
472    use async_std::prelude::FutureExt;
473    use async_trait::async_trait;
474    use serde::Serialize;
475    use tracing::info;
476
477    use crate::errors::HttpRequestError;
478    use crate::{HttpPostRequestorConfig, HttpRequestor};
479
480    /// HTTP client that uses a non-Tokio runtime based HTTP client library, such as `surf`.
481    /// `surf` works also for Browsers in WASM environments.
482    #[derive(Clone, Debug, Default)]
483    pub struct SurfRequestor {
484        client: surf::Client,
485        cfg: HttpPostRequestorConfig,
486    }
487
488    impl SurfRequestor {
489        pub fn new(cfg: HttpPostRequestorConfig) -> Self {
490            info!(?cfg, "creating surf client");
491
492            let mut client = surf::client().with(surf::middleware::Redirect::new(cfg.max_redirects));
493
494            // Rate limit of 0 also means unlimited as if None was given
495            if let Some(max) = cfg.max_requests_per_sec.and_then(|r| (r > 0).then_some(r)) {
496                client = client.with(
497                    surf_governor::GovernorMiddleware::per_second(max)
498                        .expect("cannot setup http rate limiter middleware"),
499                );
500            }
501
502            Self { client, cfg }
503        }
504    }
505
506    #[async_trait]
507    impl HttpRequestor for SurfRequestor {
508        async fn http_query<T>(
509            &self,
510            method: http_types::Method,
511            url: &str,
512            data: Option<T>,
513        ) -> Result<Box<[u8]>, HttpRequestError>
514        where
515            T: Serialize + Send + Sync,
516        {
517            let request = match method {
518                http_types::Method::Post => self
519                    .client
520                    .post(url)
521                    .body_json(&data.ok_or(HttpRequestError::UnknownError("missing data".to_string()))?)
522                    .map_err(|e| HttpRequestError::UnknownError(e.to_string()))?,
523                http_types::Method::Get => self.client.get(url),
524                _ => return Err(HttpRequestError::UnknownError("unsupported method".to_string())),
525            };
526
527            async move {
528                match request.await {
529                    Ok(mut response) if response.status().is_success() => match response.body_bytes().await {
530                        Ok(data) => Ok(data.into_boxed_slice()),
531                        Err(e) => Err(HttpRequestError::TransportError(e.to_string())),
532                    },
533                    Ok(response) => Err(HttpRequestError::HttpError(response.status())),
534                    Err(e) => Err(HttpRequestError::TransportError(e.to_string())),
535                }
536            }
537            .timeout(self.cfg.http_request_timeout)
538            .await
539            .map_err(|_| HttpRequestError::Timeout)?
540        }
541    }
542}
543
544#[cfg(any(test, feature = "runtime-tokio"))]
545pub mod reqwest_client {
546    use async_trait::async_trait;
547    use http_types::StatusCode;
548    use serde::Serialize;
549    use std::sync::Arc;
550    use std::time::Duration;
551    use tracing::info;
552
553    use crate::errors::HttpRequestError;
554    use crate::{HttpPostRequestorConfig, HttpRequestor};
555
556    /// HTTP client that uses a Tokio runtime-based HTTP client library, such as `reqwest`.
557    #[derive(Clone, Debug, Default)]
558    pub struct ReqwestRequestor {
559        client: reqwest::Client,
560        limiter: Option<Arc<governor::DefaultKeyedRateLimiter<String>>>,
561    }
562
563    impl ReqwestRequestor {
564        pub fn new(cfg: HttpPostRequestorConfig) -> Self {
565            info!(?cfg, "creating reqwest client");
566            Self {
567                client: reqwest::Client::builder()
568                    .timeout(cfg.http_request_timeout)
569                    .redirect(reqwest::redirect::Policy::limited(cfg.max_redirects as usize))
570                    // 30 seconds is longer than the normal interval between RPC requests, thus the
571                    // connection should remain available
572                    .tcp_keepalive(Some(Duration::from_secs(30)))
573                    // Enable all supported encodings to reduce the amount of data transferred
574                    // in responses. This is relevant for large eth_getLogs responses.
575                    .zstd(true)
576                    .brotli(true)
577                    .build()
578                    .expect("could not build reqwest client"),
579                limiter: cfg
580                    .max_requests_per_sec
581                    .filter(|reqs| *reqs > 0) // Ensures the following unwrapping won't fail
582                    .map(|reqs| {
583                        Arc::new(governor::DefaultKeyedRateLimiter::keyed(governor::Quota::per_second(
584                            reqs.try_into().unwrap(),
585                        )))
586                    }),
587            }
588        }
589    }
590
591    #[async_trait]
592    impl HttpRequestor for ReqwestRequestor {
593        async fn http_query<T>(
594            &self,
595            method: http_types::Method,
596            url: &str,
597            data: Option<T>,
598        ) -> Result<Box<[u8]>, HttpRequestError>
599        where
600            T: Serialize + Send + Sync,
601        {
602            let url = reqwest::Url::parse(url)
603                .map_err(|e| HttpRequestError::UnknownError(format!("url parse error: {e}")))?;
604
605            let builder = match method {
606                http_types::Method::Get => self.client.get(url.clone()),
607                http_types::Method::Post => self.client.post(url.clone()).body(
608                    serde_json::to_string(&data.ok_or(HttpRequestError::UnknownError("missing data".to_string()))?)
609                        .map_err(|e| HttpRequestError::UnknownError(format!("serialize error: {e}")))?,
610                ),
611                _ => return Err(HttpRequestError::UnknownError("unsupported method".to_string())),
612            };
613
614            if self
615                .limiter
616                .clone()
617                .map(|limiter| limiter.check_key(&url.host_str().unwrap_or(".").to_string()).is_ok())
618                .unwrap_or(true)
619            {
620                let resp = builder
621                    .header("content-type", "application/json")
622                    .send()
623                    .await
624                    .map_err(|e| {
625                        if e.is_status() {
626                            HttpRequestError::HttpError(
627                                StatusCode::try_from(e.status().map(|s| s.as_u16()).unwrap_or(500))
628                                    .expect("status code must be compatible"), // cannot happen
629                            )
630                        } else if e.is_timeout() {
631                            HttpRequestError::Timeout
632                        } else {
633                            HttpRequestError::UnknownError(e.to_string())
634                        }
635                    })?;
636
637                resp.bytes()
638                    .await
639                    .map(|b| Box::from(b.as_ref()))
640                    .map_err(|e| HttpRequestError::UnknownError(format!("error retrieving body: {e}")))
641            } else {
642                Err(HttpRequestError::HttpError(StatusCode::TooManyRequests))
643            }
644        }
645    }
646}
647
648/// Snapshot of a response cached by the [`SnapshotRequestor`].
649#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
650pub struct RequestorResponseSnapshot {
651    id: usize,
652    request: String,
653    response: String,
654}
655
656/// Replays an RPC response to a request if it is found in the snapshot YAML file.
657/// If no such request has been seen before,
658/// it captures the new request/response pair obtained from the inner [`HttpRequestor`]
659/// and stores it into the snapshot file.
660///
661/// This is useful for snapshot testing only and should **NOT** be used in production.
662#[derive(Debug, Clone)]
663pub struct SnapshotRequestor<T> {
664    inner: T,
665    next_id: Arc<AtomicUsize>,
666    entries: moka::future::Cache<String, RequestorResponseSnapshot>,
667    file: String,
668    aggressive_save: bool,
669    fail_on_miss: bool,
670    ignore_snapshot: bool,
671}
672
673impl<T> SnapshotRequestor<T> {
674    /// Creates a new instance by wrapping an existing [`HttpRequestor`] and capturing
675    /// the request/response pairs.
676    ///
677    /// The constructor does not load any [snapshot entries](SnapshotRequestor) from
678    /// the `snapshot_file`.
679    /// The [`SnapshotRequestor::load`] method must be used after construction to do that.
680    pub fn new(inner: T, snapshot_file: &str) -> Self {
681        Self {
682            inner,
683            next_id: Arc::new(AtomicUsize::new(1)),
684            entries: moka::future::Cache::builder().build(),
685            file: snapshot_file.to_owned(),
686            aggressive_save: false,
687            fail_on_miss: false,
688            ignore_snapshot: false,
689        }
690    }
691
692    /// Gets the path to the snapshot disk file.
693    pub fn snapshot_path(&self) -> &str {
694        &self.file
695    }
696
697    /// Clears all entries from the snapshot in memory.
698    /// The snapshot file is not changed.
699    pub fn clear(&self) {
700        self.entries.invalidate_all();
701        self.next_id.store(1, Ordering::Relaxed);
702    }
703
704    /// Clears all entries and loads them from the snapshot file.
705    /// If `fail_on_miss` is set and the data is successfully loaded, all later
706    /// requests that miss the loaded snapshot will result in HTTP error 404.
707    pub async fn try_load(&mut self, fail_on_miss: bool) -> Result<(), std::io::Error> {
708        if self.ignore_snapshot {
709            return Ok(());
710        }
711
712        let loaded = serde_yaml::from_reader::<_, Vec<RequestorResponseSnapshot>>(std::fs::File::open(&self.file)?)
713            .map_err(std::io::Error::other)?;
714
715        self.clear();
716
717        let loaded_len = futures::stream::iter(loaded)
718            .then(|entry| {
719                self.next_id.fetch_max(entry.id, Ordering::Relaxed);
720                self.entries.insert(entry.request.clone(), entry)
721            })
722            .collect::<Vec<_>>()
723            .await
724            .len();
725
726        if loaded_len > 0 {
727            self.fail_on_miss = fail_on_miss;
728        }
729
730        tracing::debug!("snapshot with {loaded_len} entries has been loaded from {}", &self.file);
731        Ok(())
732    }
733
734    /// Similar as [`SnapshotRequestor::try_load`], except that no entries are cleared if the load fails.
735    ///
736    /// This method consumes and returns self for easier call chaining.
737    pub async fn load(mut self, fail_on_miss: bool) -> Self {
738        let _ = self.try_load(fail_on_miss).await;
739        self
740    }
741
742    /// Forces saving to disk on each newly inserted entry.
743    ///
744    /// Use this only when the expected number of entries in the snapshot is small.
745    pub fn with_aggresive_save(mut self) -> Self {
746        self.aggressive_save = true;
747        self
748    }
749
750    /// If set, the snapshot data will be ignored and resolution
751    /// will always be done with the inner requestor.
752    ///
753    /// This will inhibit any attempts to [`load`](SnapshotRequestor::try_load) or
754    /// [`save`](SnapshotRequestor::save) snapshot data.
755    pub fn with_ignore_snapshot(mut self, ignore_snapshot: bool) -> Self {
756        self.ignore_snapshot = ignore_snapshot;
757        self
758    }
759
760    /// Save the currently cached entries to the snapshot file on disk.
761    ///
762    /// Note that this method is automatically called on Drop, so usually it is unnecessary
763    /// to call it explicitly.
764    pub fn save(&self) -> Result<(), std::io::Error> {
765        if self.ignore_snapshot {
766            return Ok(());
767        }
768
769        let mut values: Vec<RequestorResponseSnapshot> = self.entries.iter().map(|(_, r)| r).collect();
770        values.sort_unstable_by_key(|a| a.id);
771
772        let mut writer = BufWriter::new(std::fs::File::create(&self.file)?);
773
774        serde_yaml::to_writer(&mut writer, &values).map_err(std::io::Error::other)?;
775
776        writer.flush()?;
777
778        tracing::debug!("snapshot with {} entries saved to file {}", values.len(), self.file);
779        Ok(())
780    }
781}
782
783impl<R: HttpRequestor> SnapshotRequestor<R> {
784    async fn http_post_with_snapshot<In>(&self, url: &str, data: In) -> Result<Box<[u8]>, HttpRequestError>
785    where
786        In: Serialize + Send + Sync,
787    {
788        let request = serde_json::to_string(&data)
789            .map_err(|e| HttpRequestError::UnknownError(format!("serialize error: {e}")))?;
790
791        let inserted = AtomicBool::new(false);
792        let result = self
793            .entries
794            .entry(request.clone())
795            .or_try_insert_with(async {
796                if self.fail_on_miss {
797                    tracing::error!("{request} is missing in {}", &self.file);
798                    return Err(HttpRequestError::HttpError(http_types::StatusCode::NotFound));
799                }
800
801                let response = self.inner.http_post(url, data).await?;
802                let id = self.next_id.fetch_add(1, Ordering::SeqCst);
803                inserted.store(true, Ordering::Relaxed);
804
805                tracing::debug!("saved new snapshot entry #{id}");
806                Ok(RequestorResponseSnapshot {
807                    id,
808                    request: request.clone(),
809                    response: String::from_utf8(response.into_vec())
810                        .map_err(|e| HttpRequestError::UnknownError(format!("unparseable data: {e}")))?,
811                })
812            })
813            .await
814            .map(|e| e.into_value().response.into_bytes().into_boxed_slice())
815            .map_err(|e: Arc<HttpRequestError>| e.as_ref().clone())?;
816
817        if inserted.load(Ordering::Relaxed) && self.aggressive_save {
818            tracing::debug!("{request} was NOT found and was resolved");
819            self.save().map_err(|e| HttpRequestError::UnknownError(e.to_string()))?;
820        } else {
821            tracing::debug!("{request} was found");
822        }
823
824        Ok(result)
825    }
826}
827
828impl<T> Drop for SnapshotRequestor<T> {
829    fn drop(&mut self) {
830        if let Err(e) = self.save() {
831            tracing::error!("failed to save snapshot: {e}");
832        }
833    }
834}
835
836#[async_trait::async_trait]
837impl<R: HttpRequestor> HttpRequestor for SnapshotRequestor<R> {
838    async fn http_query<T>(&self, _: Method, _: &str, _: Option<T>) -> Result<Box<[u8]>, HttpRequestError>
839    where
840        T: Serialize + Send + Sync,
841    {
842        todo!()
843    }
844
845    async fn http_post<T>(&self, url: &str, data: T) -> Result<Box<[u8]>, HttpRequestError>
846    where
847        T: Serialize + Send + Sync,
848    {
849        self.http_post_with_snapshot(url, data).await
850    }
851
852    async fn http_get(&self, _url: &str) -> Result<Box<[u8]>, HttpRequestError> {
853        todo!()
854    }
855}
856
857#[async_trait]
858impl<R: HttpRequestor> HttpRequestor for &SnapshotRequestor<R> {
859    async fn http_query<T>(&self, _: Method, _: &str, _: Option<T>) -> Result<Box<[u8]>, HttpRequestError>
860    where
861        T: Serialize + Send + Sync,
862    {
863        todo!()
864    }
865
866    async fn http_post<T>(&self, url: &str, data: T) -> Result<Box<[u8]>, HttpRequestError>
867    where
868        T: Serialize + Send + Sync,
869    {
870        self.http_post_with_snapshot(url, data).await
871    }
872
873    async fn http_get(&self, _url: &str) -> Result<Box<[u8]>, HttpRequestError> {
874        todo!()
875    }
876}
877
878type AnvilRpcClient<R> = ethers::middleware::SignerMiddleware<
879    ethers::providers::Provider<JsonRpcProviderClient<R, SimpleJsonRpcRetryPolicy>>,
880    ethers::signers::Wallet<ethers::core::k256::ecdsa::SigningKey>,
881>;
882
883/// Used for testing. Creates Ethers RPC client to the local Anvil instance.
884#[cfg(not(target_arch = "wasm32"))]
885pub fn create_rpc_client_to_anvil<R: HttpRequestor>(
886    backend: R,
887    anvil: &ethers::utils::AnvilInstance,
888    signer: &hopr_crypto_types::keypairs::ChainKeypair,
889) -> Arc<AnvilRpcClient<R>> {
890    use ethers::signers::Signer;
891    use hopr_crypto_types::keypairs::Keypair;
892
893    let wallet =
894        ethers::signers::LocalWallet::from_bytes(signer.secret().as_ref()).expect("failed to construct wallet");
895    let json_client = JsonRpcProviderClient::new(&anvil.endpoint(), backend, SimpleJsonRpcRetryPolicy::default());
896    let provider = ethers::providers::Provider::new(json_client).interval(Duration::from_millis(10_u64));
897
898    Arc::new(ethers::middleware::SignerMiddleware::new(
899        provider,
900        wallet.with_chain_id(anvil.chain_id()),
901    ))
902}
903
904#[cfg(test)]
905mod tests {
906    use async_trait::async_trait;
907    use ethers::providers::JsonRpcClient;
908    use hopr_async_runtime::prelude::sleep;
909    use hopr_chain_types::utils::create_anvil;
910    use hopr_chain_types::{ContractAddresses, ContractInstances};
911    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
912    use hopr_primitive_types::primitives::Address;
913    use http_types::Method;
914    use serde::Serialize;
915    use serde_json::json;
916    use std::fmt::Debug;
917    use std::sync::atomic::Ordering;
918    use std::time::Duration;
919    use tempfile::NamedTempFile;
920
921    use crate::client::reqwest_client::ReqwestRequestor;
922    use crate::client::surf_client::SurfRequestor;
923    use crate::client::{
924        create_rpc_client_to_anvil, JsonRpcProviderClient, SimpleJsonRpcRetryPolicy, SnapshotRequestor,
925    };
926    use crate::errors::{HttpRequestError, JsonRpcProviderClientError};
927    use crate::{HttpRequestor, ZeroRetryPolicy};
928
929    async fn deploy_contracts<R: HttpRequestor + Debug>(req: R) -> anyhow::Result<ContractAddresses> {
930        let anvil = create_anvil(None);
931        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
932
933        let client = create_rpc_client_to_anvil(req, &anvil, &chain_key_0);
934
935        let contracts = ContractInstances::deploy_for_testing(client.clone(), &chain_key_0)
936            .await
937            .expect("deploy failed");
938
939        Ok(ContractAddresses::from(&contracts))
940    }
941
942    #[async_std::test]
943    async fn test_client_should_deploy_contracts_via_surf() -> anyhow::Result<()> {
944        let contract_addrs = deploy_contracts(SurfRequestor::default()).await?;
945
946        assert_ne!(contract_addrs.token, Address::default());
947        assert_ne!(contract_addrs.channels, Address::default());
948        assert_ne!(contract_addrs.announcements, Address::default());
949        assert_ne!(contract_addrs.network_registry, Address::default());
950        assert_ne!(contract_addrs.safe_registry, Address::default());
951        assert_ne!(contract_addrs.price_oracle, Address::default());
952
953        Ok(())
954    }
955
956    #[tokio::test]
957    async fn test_client_should_deploy_contracts_via_reqwest() -> anyhow::Result<()> {
958        let contract_addrs = deploy_contracts(ReqwestRequestor::default()).await?;
959
960        assert_ne!(contract_addrs.token, Address::default());
961        assert_ne!(contract_addrs.channels, Address::default());
962        assert_ne!(contract_addrs.announcements, Address::default());
963        assert_ne!(contract_addrs.network_registry, Address::default());
964        assert_ne!(contract_addrs.safe_registry, Address::default());
965        assert_ne!(contract_addrs.price_oracle, Address::default());
966
967        Ok(())
968    }
969
970    #[async_std::test]
971    async fn test_client_should_get_block_number() -> anyhow::Result<()> {
972        let block_time = Duration::from_secs(1);
973
974        let anvil = create_anvil(Some(block_time));
975        let client = JsonRpcProviderClient::new(
976            &anvil.endpoint(),
977            SurfRequestor::default(),
978            SimpleJsonRpcRetryPolicy::default(),
979        );
980
981        let mut last_number = 0;
982
983        for _ in 0..3 {
984            sleep(block_time).await;
985
986            let number: ethers::types::U64 = client.request("eth_blockNumber", ()).await?;
987
988            assert!(number.as_u64() > last_number, "next block number must be greater");
989            last_number = number.as_u64();
990        }
991
992        assert_eq!(
993            0,
994            client.requests_enqueued.load(Ordering::SeqCst),
995            "retry queue should be zero on successful requests"
996        );
997
998        Ok(())
999    }
1000
1001    #[async_std::test]
1002    async fn test_client_should_fail_on_malformed_request() {
1003        let anvil = create_anvil(None);
1004        let client = JsonRpcProviderClient::new(
1005            &anvil.endpoint(),
1006            SurfRequestor::default(),
1007            SimpleJsonRpcRetryPolicy::default(),
1008        );
1009
1010        let err = client
1011            .request::<_, ethers::types::U64>("eth_blockNumber_bla", ())
1012            .await
1013            .expect_err("expected error");
1014
1015        assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(..)));
1016    }
1017
1018    #[async_std::test]
1019    async fn test_client_should_fail_on_malformed_response() {
1020        let mut server = mockito::Server::new_async().await;
1021
1022        let m = server
1023            .mock("POST", "/")
1024            .with_status(200)
1025            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1026            .with_body("}malformed{")
1027            .expect(1)
1028            .create();
1029
1030        let client = JsonRpcProviderClient::new(
1031            &server.url(),
1032            SurfRequestor::default(),
1033            SimpleJsonRpcRetryPolicy::default(),
1034        );
1035
1036        let err = client
1037            .request::<_, ethers::types::U64>("eth_blockNumber", ())
1038            .await
1039            .expect_err("expected error");
1040
1041        m.assert();
1042        assert!(matches!(err, JsonRpcProviderClientError::SerdeJson { .. }));
1043    }
1044
1045    #[async_std::test]
1046    async fn test_client_should_retry_on_http_error() {
1047        let mut server = mockito::Server::new_async().await;
1048
1049        let m = server
1050            .mock("POST", "/")
1051            .with_status(http_types::StatusCode::TooManyRequests as usize)
1052            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1053            .with_body("{}")
1054            .expect(3)
1055            .create();
1056
1057        let client = JsonRpcProviderClient::new(
1058            &server.url(),
1059            SurfRequestor::default(),
1060            SimpleJsonRpcRetryPolicy {
1061                max_retries: Some(2),
1062                retryable_http_errors: vec![http_types::StatusCode::TooManyRequests],
1063                initial_backoff: Duration::from_millis(100),
1064                ..SimpleJsonRpcRetryPolicy::default()
1065            },
1066        );
1067
1068        let err = client
1069            .request::<_, ethers::types::U64>("eth_blockNumber", ())
1070            .await
1071            .expect_err("expected error");
1072
1073        m.assert();
1074        assert!(matches!(err, JsonRpcProviderClientError::BackendError(_)));
1075        assert_eq!(
1076            0,
1077            client.requests_enqueued.load(Ordering::SeqCst),
1078            "retry queue should be zero when policy says no more retries"
1079        );
1080    }
1081
1082    #[async_std::test]
1083    async fn test_client_should_not_retry_with_zero_retry_policy() {
1084        let mut server = mockito::Server::new_async().await;
1085
1086        let m = server
1087            .mock("POST", "/")
1088            .with_status(404)
1089            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1090            .with_body("{}")
1091            .expect(1)
1092            .create();
1093
1094        let client = JsonRpcProviderClient::new(&server.url(), SurfRequestor::default(), ZeroRetryPolicy::default());
1095
1096        let err = client
1097            .request::<_, ethers::types::U64>("eth_blockNumber", ())
1098            .await
1099            .expect_err("expected error");
1100
1101        m.assert();
1102        assert!(matches!(err, JsonRpcProviderClientError::BackendError(_)));
1103        assert_eq!(
1104            0,
1105            client.requests_enqueued.load(Ordering::SeqCst),
1106            "retry queue should be zero when policy says no more retries"
1107        );
1108    }
1109
1110    #[async_std::test]
1111    async fn test_client_should_retry_on_json_rpc_error() {
1112        let mut server = mockito::Server::new_async().await;
1113
1114        let m = server
1115            .mock("POST", "/")
1116            .with_status(200)
1117            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1118            .with_body(
1119                r#"{
1120              "jsonrpc": "2.0",
1121              "id": 1,
1122              "error": {
1123                "message": "some message",
1124                "code": -32603
1125              }
1126            }"#,
1127            )
1128            .expect(3)
1129            .create();
1130
1131        let client = JsonRpcProviderClient::new(
1132            &server.url(),
1133            SurfRequestor::default(),
1134            SimpleJsonRpcRetryPolicy {
1135                max_retries: Some(2),
1136                retryable_json_rpc_errors: vec![-32603],
1137                initial_backoff: Duration::from_millis(100),
1138                ..SimpleJsonRpcRetryPolicy::default()
1139            },
1140        );
1141
1142        let err = client
1143            .request::<_, ethers::types::U64>("eth_blockNumber", ())
1144            .await
1145            .expect_err("expected error");
1146
1147        m.assert();
1148        assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(_)));
1149        assert_eq!(
1150            0,
1151            client.requests_enqueued.load(Ordering::SeqCst),
1152            "retry queue should be zero when policy says no more retries"
1153        );
1154    }
1155
1156    #[async_std::test]
1157    async fn test_client_should_not_retry_on_nonretryable_json_rpc_error() {
1158        let mut server = mockito::Server::new_async().await;
1159
1160        let m = server
1161            .mock("POST", "/")
1162            .with_status(200)
1163            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1164            .with_body(
1165                r#"{
1166              "jsonrpc": "2.0",
1167              "id": 1,
1168              "error": {
1169                "message": "some message",
1170                "code": -32000
1171              }
1172            }"#,
1173            )
1174            .expect(1)
1175            .create();
1176
1177        let client = JsonRpcProviderClient::new(
1178            &server.url(),
1179            SurfRequestor::default(),
1180            SimpleJsonRpcRetryPolicy {
1181                max_retries: Some(2),
1182                retryable_json_rpc_errors: vec![],
1183                initial_backoff: Duration::from_millis(100),
1184                ..SimpleJsonRpcRetryPolicy::default()
1185            },
1186        );
1187
1188        let err = client
1189            .request::<_, ethers::types::U64>("eth_blockNumber", ())
1190            .await
1191            .expect_err("expected error");
1192
1193        m.assert();
1194        assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(_)));
1195        assert_eq!(
1196            0,
1197            client.requests_enqueued.load(Ordering::SeqCst),
1198            "retry queue should be zero when policy says no more retries"
1199        );
1200    }
1201
1202    #[async_std::test]
1203    async fn test_client_should_retry_on_nonretryable_json_rpc_error_if_min_retries_is_given() {
1204        let mut server = mockito::Server::new_async().await;
1205
1206        let m = server
1207            .mock("POST", "/")
1208            .with_status(200)
1209            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1210            .with_body(
1211                r#"{
1212              "jsonrpc": "2.0",
1213              "id": 1,
1214              "error": {
1215                "message": "some message",
1216                "code": -32000
1217              }
1218            }"#,
1219            )
1220            .expect(2)
1221            .create();
1222
1223        let client = JsonRpcProviderClient::new(
1224            &server.url(),
1225            SurfRequestor::default(),
1226            SimpleJsonRpcRetryPolicy {
1227                min_retries: Some(1),
1228                max_retries: Some(2),
1229                retryable_json_rpc_errors: vec![],
1230                initial_backoff: Duration::from_millis(100),
1231                ..SimpleJsonRpcRetryPolicy::default()
1232            },
1233        );
1234
1235        let err = client
1236            .request::<_, ethers::types::U64>("eth_blockNumber", ())
1237            .await
1238            .expect_err("expected error");
1239
1240        m.assert();
1241        assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(_)));
1242        assert_eq!(
1243            0,
1244            client.requests_enqueued.load(Ordering::SeqCst),
1245            "retry queue should be zero when policy says no more retries"
1246        );
1247    }
1248
1249    #[async_std::test]
1250    async fn test_client_should_retry_on_malformed_json_rpc_error() {
1251        let mut server = mockito::Server::new_async().await;
1252
1253        let m = server
1254            .mock("POST", "/")
1255            .with_status(200)
1256            .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1257            .with_body(
1258                r#"{
1259              "jsonrpc": "2.0",
1260              "error": {
1261                "message": "some message",
1262                "code": -32600
1263              }
1264            }"#,
1265            )
1266            .expect(3)
1267            .create();
1268
1269        let client = JsonRpcProviderClient::new(
1270            &server.url(),
1271            SurfRequestor::default(),
1272            SimpleJsonRpcRetryPolicy {
1273                max_retries: Some(2),
1274                retryable_json_rpc_errors: vec![-32600],
1275                initial_backoff: Duration::from_millis(100),
1276                ..SimpleJsonRpcRetryPolicy::default()
1277            },
1278        );
1279
1280        let err = client
1281            .request::<_, ethers::types::U64>("eth_blockNumber", ())
1282            .await
1283            .expect_err("expected error");
1284
1285        m.assert();
1286        assert!(matches!(err, JsonRpcProviderClientError::SerdeJson { .. }));
1287        assert_eq!(
1288            0,
1289            client.requests_enqueued.load(Ordering::SeqCst),
1290            "retry queue should be zero when policy says no more retries"
1291        );
1292    }
1293
1294    // Requires manual implementation, because mockall does not work well with generic methods
1295    // in non-generic traits.
1296    #[derive(Debug)]
1297    struct NullHttpPostRequestor;
1298
1299    #[async_trait]
1300    impl HttpRequestor for NullHttpPostRequestor {
1301        async fn http_query<T>(&self, _: Method, _: &str, _: Option<T>) -> Result<Box<[u8]>, HttpRequestError>
1302        where
1303            T: Serialize + Send + Sync,
1304        {
1305            Err(HttpRequestError::UnknownError("use of NullHttpPostRequestor".into()))
1306        }
1307    }
1308
1309    #[test_log::test(async_std::test)]
1310    async fn test_client_from_file() -> anyhow::Result<()> {
1311        let block_time = Duration::from_millis(1100);
1312        let snapshot_file = NamedTempFile::new()?;
1313
1314        let anvil = create_anvil(Some(block_time));
1315        {
1316            let client = JsonRpcProviderClient::new(
1317                &anvil.endpoint(),
1318                SnapshotRequestor::new(SurfRequestor::default(), snapshot_file.path().to_str().unwrap()),
1319                SimpleJsonRpcRetryPolicy::default(),
1320            );
1321
1322            let mut last_number = 0;
1323
1324            for _ in 0..3 {
1325                sleep(block_time).await;
1326
1327                let number: ethers::types::U64 = client.request("eth_blockNumber", ()).await?;
1328
1329                assert!(number.as_u64() > last_number, "next block number must be greater");
1330                last_number = number.as_u64();
1331            }
1332        }
1333
1334        {
1335            let client = JsonRpcProviderClient::new(
1336                &anvil.endpoint(),
1337                SnapshotRequestor::new(NullHttpPostRequestor, snapshot_file.path().to_str().unwrap())
1338                    .load(true)
1339                    .await,
1340                SimpleJsonRpcRetryPolicy::default(),
1341            );
1342
1343            let mut last_number = 0;
1344            for _ in 0..3 {
1345                sleep(block_time).await;
1346
1347                let number: ethers::types::U64 = client.request("eth_blockNumber", ()).await?;
1348
1349                assert!(number.as_u64() > last_number, "next block number must be greater");
1350                last_number = number.as_u64();
1351            }
1352        }
1353
1354        Ok(())
1355    }
1356}