1use async_trait::async_trait;
16use ethers::providers::{JsonRpcClient, JsonRpcError};
17use futures::StreamExt;
18use http_types::Method;
19use serde::de::DeserializeOwned;
20use serde::{Deserialize, Serialize};
21use std::fmt::{Debug, Formatter};
22use std::io::{BufWriter, Write};
23use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::{debug, error, trace, warn};
27use validator::Validate;
28
29use hopr_async_runtime::prelude::sleep;
30
31use crate::client::RetryAction::{NoRetry, RetryAfter};
32use crate::errors::{HttpRequestError, JsonRpcProviderClientError};
33use crate::helper::{Request, Response};
34use crate::{HttpRequestor, RetryAction, RetryPolicy};
35
36#[cfg(all(feature = "prometheus", not(test)))]
37use hopr_metrics::metrics::{MultiCounter, MultiHistogram};
38
39#[cfg(all(feature = "prometheus", not(test)))]
40lazy_static::lazy_static! {
41 static ref METRIC_COUNT_RPC_CALLS: MultiCounter = MultiCounter::new(
42 "hopr_rpc_call_count",
43 "Number of Ethereum RPC calls over HTTP and their result",
44 &["call", "result"]
45 )
46 .unwrap();
47 static ref METRIC_RPC_CALLS_TIMING: MultiHistogram = MultiHistogram::new(
48 "hopr_rpc_call_time_sec",
49 "Timing of RPC calls over HTTP in seconds",
50 vec![0.1, 0.5, 1.0, 2.0, 5.0, 7.0, 10.0],
51 &["call"]
52 )
53 .unwrap();
54 static ref METRIC_RETRIES_PER_RPC_CALL: MultiHistogram = MultiHistogram::new(
55 "hopr_retries_per_rpc_call",
56 "Number of retries per RPC call",
57 vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
58 &["call"]
59 )
60 .unwrap();
61}
62
63#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize, Validate)]
86pub struct SimpleJsonRpcRetryPolicy {
87 #[validate(range(min = 0))]
91 #[default(Some(0))]
92 pub min_retries: Option<u32>,
93
94 #[validate(range(min = 1))]
100 #[default(Some(12))]
101 pub max_retries: Option<u32>,
102 #[default(Duration::from_secs(1))]
109 pub initial_backoff: Duration,
110 #[validate(range(min = 0.0))]
117 #[default(0.3)]
118 pub backoff_coefficient: f64,
119 #[default(Duration::from_secs(30))]
125 pub max_backoff: Duration,
126 pub backoff_on_transport_errors: bool,
130 #[default(_code = "vec![-32005, -32016, 429]")]
134 pub retryable_json_rpc_errors: Vec<i64>,
135
136 #[default(
140 _code = "vec![http_types::StatusCode::TooManyRequests,http_types::StatusCode::GatewayTimeout,http_types::StatusCode::ServiceUnavailable]"
141 )]
142 pub retryable_http_errors: Vec<http_types::StatusCode>,
143 #[validate(range(min = 5))]
149 #[default = 100]
150 pub max_retry_queue_size: u32,
151}
152
153impl SimpleJsonRpcRetryPolicy {
154 fn is_retryable_json_rpc_error(&self, err: &JsonRpcError) -> bool {
155 self.retryable_json_rpc_errors.contains(&err.code) || err.message.contains("rate limit")
156 }
157
158 fn is_retryable_http_error(&self, status: &http_types::StatusCode) -> bool {
159 self.retryable_http_errors.contains(status)
160 }
161}
162
163impl RetryPolicy<JsonRpcProviderClientError> for SimpleJsonRpcRetryPolicy {
164 fn is_retryable_error(
165 &self,
166 err: &JsonRpcProviderClientError,
167 num_retries: u32,
168 retry_queue_size: u32,
169 ) -> RetryAction {
170 if self.max_retries.is_some_and(|max| num_retries > max) {
171 warn!(
172 count = self.max_retries.expect("max_retries must be set"),
173 "max number of retries has been reached"
174 );
175 return NoRetry;
176 }
177
178 debug!(
179 size = retry_queue_size,
180 "checking retry queue size after retryable error"
181 );
182
183 if retry_queue_size > self.max_retry_queue_size {
184 warn!(
185 size = self.max_retry_queue_size,
186 "maximum size of retry queue has been reached"
187 );
188 return NoRetry;
189 }
190
191 let backoff = self
193 .initial_backoff
194 .mul_f64(f64::powi(1.0 + self.backoff_coefficient, (num_retries - 1) as i32))
195 .min(self.max_backoff);
196
197 if self.min_retries.is_some_and(|min| num_retries <= min) {
199 debug!(num_retries, min_retries = ?self.min_retries, "retrying because minimum number of retries not yet reached");
200 return RetryAfter(backoff);
201 }
202
203 match err {
204 JsonRpcProviderClientError::JsonRpcError(e) if self.is_retryable_json_rpc_error(e) => {
206 debug!(error = %e, "encountered retryable JSON RPC error code");
207 RetryAfter(backoff)
208 }
209
210 JsonRpcProviderClientError::BackendError(HttpRequestError::HttpError(e))
212 if self.is_retryable_http_error(e) =>
213 {
214 debug!(error = ?e, "encountered retryable HTTP error code");
215 RetryAfter(backoff)
216 }
217
218 JsonRpcProviderClientError::BackendError(e @ HttpRequestError::Timeout)
220 | JsonRpcProviderClientError::BackendError(e @ HttpRequestError::TransportError(_))
221 | JsonRpcProviderClientError::BackendError(e @ HttpRequestError::UnknownError(_)) => {
222 debug!(error = %e, "encountered retryable transport error");
223 RetryAfter(if self.backoff_on_transport_errors {
224 backoff
225 } else {
226 self.initial_backoff
227 })
228 }
229
230 JsonRpcProviderClientError::SerdeJson { text, .. } => {
232 #[derive(Deserialize)]
233 struct Resp {
234 error: JsonRpcError,
235 }
236
237 match serde_json::from_str::<Resp>(text) {
238 Ok(Resp { error }) if self.is_retryable_json_rpc_error(&error) => {
239 debug!(%error, "encountered retryable JSON RPC error");
240 RetryAfter(backoff)
241 }
242 _ => {
243 debug!(error = %text, "unparseable JSON RPC error");
244 NoRetry
245 }
246 }
247 }
248
249 _ => NoRetry,
251 }
252 }
253}
254
255pub struct JsonRpcProviderClient<Req: HttpRequestor, R: RetryPolicy<JsonRpcProviderClientError>> {
260 id: AtomicU64,
261 requests_enqueued: AtomicU32,
262 url: String,
263 requestor: Req,
264 retry_policy: R,
265}
266
267impl<Req: HttpRequestor, R: RetryPolicy<JsonRpcProviderClientError>> JsonRpcProviderClient<Req, R> {
268 pub fn new(base_url: &str, requestor: Req, retry_policy: R) -> Self {
270 Self {
271 id: AtomicU64::new(1),
272 requests_enqueued: AtomicU32::new(0),
273 url: base_url.to_owned(),
274 requestor,
275 retry_policy,
276 }
277 }
278
279 async fn send_request_internal<T, A>(&self, method: &str, params: T) -> Result<A, JsonRpcProviderClientError>
280 where
281 T: Serialize + Send + Sync,
282 A: DeserializeOwned,
283 {
284 let next_id = self.id.fetch_add(1, Ordering::SeqCst);
286 let payload = Request::new(next_id, method, params);
287
288 debug!(method, "sending rpc request");
289 trace!(
290 method,
291 request = serde_json::to_string(&payload).expect("request must be serializable"),
292 "sending rpc request",
293 );
294
295 let start = std::time::Instant::now();
297 let body = self.requestor.http_post(self.url.as_ref(), payload).await?;
298 let req_duration = start.elapsed();
299
300 trace!(method, duration_in_ms = req_duration.as_millis(), "rpc request took");
301
302 #[cfg(all(feature = "prometheus", not(test)))]
303 METRIC_RPC_CALLS_TIMING.observe(&[method], req_duration.as_secs_f64());
304
305 let raw = match serde_json::from_slice(&body) {
307 Ok(Response::Success { result, .. }) => result.to_owned(),
308 Ok(Response::Error { error, .. }) => {
309 #[cfg(all(feature = "prometheus", not(test)))]
310 METRIC_COUNT_RPC_CALLS.increment(&[method, "failure"]);
311
312 return Err(error.into());
313 }
314 Ok(_) => {
315 let err = JsonRpcProviderClientError::SerdeJson {
316 err: serde::de::Error::custom("unexpected notification over HTTP transport"),
317 text: String::from_utf8_lossy(&body).to_string(),
318 };
319 #[cfg(all(feature = "prometheus", not(test)))]
320 METRIC_COUNT_RPC_CALLS.increment(&[method, "failure"]);
321
322 return Err(err);
323 }
324 Err(err) => {
325 #[cfg(all(feature = "prometheus", not(test)))]
326 METRIC_COUNT_RPC_CALLS.increment(&[method, "failure"]);
327
328 return Err(JsonRpcProviderClientError::SerdeJson {
329 err,
330 text: String::from_utf8_lossy(&body).to_string(),
331 });
332 }
333 };
334
335 let json_str = raw.get();
337 trace!(method, response = &json_str, "rpc request response received");
338
339 let res = serde_json::from_str(json_str).map_err(|err| JsonRpcProviderClientError::SerdeJson {
340 err,
341 text: raw.to_string(),
342 })?;
343
344 #[cfg(all(feature = "prometheus", not(test)))]
345 METRIC_COUNT_RPC_CALLS.increment(&[method, "success"]);
346
347 Ok(res)
348 }
349}
350
351impl<Req: HttpRequestor, R: RetryPolicy<JsonRpcProviderClientError>> Debug for JsonRpcProviderClient<Req, R> {
352 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
353 f.debug_struct("JsonRpcProviderClient")
354 .field("id", &self.id)
355 .field("url", &self.url)
356 .field("requests_enqueued", &self.requests_enqueued)
357 .finish_non_exhaustive()
358 }
359}
360
361impl<Req: HttpRequestor + Clone, R: RetryPolicy<JsonRpcProviderClientError> + Clone> Clone
362 for JsonRpcProviderClient<Req, R>
363{
364 fn clone(&self) -> Self {
365 Self {
366 id: AtomicU64::new(1),
367 url: self.url.clone(),
368 requests_enqueued: AtomicU32::new(0),
369 requestor: self.requestor.clone(),
370 retry_policy: self.retry_policy.clone(),
371 }
372 }
373}
374
375#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
376#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
377impl<Req, R> JsonRpcClient for JsonRpcProviderClient<Req, R>
378where
379 Req: HttpRequestor,
380 R: RetryPolicy<JsonRpcProviderClientError> + Send + Sync,
381{
382 type Error = JsonRpcProviderClientError;
383
384 async fn request<T, A>(&self, method: &str, params: T) -> Result<A, Self::Error>
385 where
386 T: Serialize + Send + Sync,
387 A: DeserializeOwned + Send,
388 {
389 enum RetryParams<Params> {
393 Value(Params),
394 Zst(()),
395 }
396
397 let params = if std::mem::size_of::<A>() == 0 {
398 RetryParams::Zst(())
399 } else {
400 let params = serde_json::to_value(params)
401 .map_err(|err| JsonRpcProviderClientError::SerdeJson { err, text: "".into() })?;
402 RetryParams::Value(params)
403 };
404
405 self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
406 let start = std::time::Instant::now();
407
408 let mut num_retries = 0;
409 loop {
410 let err;
411
412 {
415 let resp = match params {
416 RetryParams::Value(ref params) => self.send_request_internal(method, params).await,
417 RetryParams::Zst(unit) => self.send_request_internal(method, unit).await,
418 };
419
420 match resp {
421 Ok(ret) => {
422 self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
423
424 #[cfg(all(feature = "prometheus", not(test)))]
425 METRIC_RETRIES_PER_RPC_CALL.observe(&[method], num_retries as f64);
426
427 debug!(method, elapsed_in_ms = start.elapsed().as_millis(), "request succeeded",);
428 return Ok(ret);
429 }
430 Err(req_err) => {
431 err = req_err;
432 error!(
433 method,
434 elapsed_in_ms = start.elapsed().as_millis(),
435 error = %err,
436 "request failed",
437 );
438 num_retries += 1;
439 }
440 }
441 }
442
443 match self
444 .retry_policy
445 .is_retryable_error(&err, num_retries, self.requests_enqueued.load(Ordering::SeqCst))
446 {
447 NoRetry => {
448 self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
449 warn!(method, "no more retries for RPC call");
450
451 #[cfg(all(feature = "prometheus", not(test)))]
452 METRIC_RETRIES_PER_RPC_CALL.observe(&[method], num_retries as f64);
453
454 debug!(
455 method,
456 duration_in_ms = start.elapsed().as_millis(),
457 "failed request duration in the retry queue",
458 );
459 return Err(err);
460 }
461 RetryAfter(backoff) => {
462 warn!(method, backoff_in_ms = backoff.as_millis(), "request will retry",);
463 sleep(backoff).await
464 }
465 }
466 }
467 }
468}
469
470#[cfg(any(test, feature = "runtime-async-std"))]
471pub mod surf_client {
472 use async_std::prelude::FutureExt;
473 use async_trait::async_trait;
474 use serde::Serialize;
475 use tracing::info;
476
477 use crate::errors::HttpRequestError;
478 use crate::{HttpPostRequestorConfig, HttpRequestor};
479
480 #[derive(Clone, Debug, Default)]
483 pub struct SurfRequestor {
484 client: surf::Client,
485 cfg: HttpPostRequestorConfig,
486 }
487
488 impl SurfRequestor {
489 pub fn new(cfg: HttpPostRequestorConfig) -> Self {
490 info!(?cfg, "creating surf client");
491
492 let mut client = surf::client().with(surf::middleware::Redirect::new(cfg.max_redirects));
493
494 if let Some(max) = cfg.max_requests_per_sec.and_then(|r| (r > 0).then_some(r)) {
496 client = client.with(
497 surf_governor::GovernorMiddleware::per_second(max)
498 .expect("cannot setup http rate limiter middleware"),
499 );
500 }
501
502 Self { client, cfg }
503 }
504 }
505
506 #[async_trait]
507 impl HttpRequestor for SurfRequestor {
508 async fn http_query<T>(
509 &self,
510 method: http_types::Method,
511 url: &str,
512 data: Option<T>,
513 ) -> Result<Box<[u8]>, HttpRequestError>
514 where
515 T: Serialize + Send + Sync,
516 {
517 let request = match method {
518 http_types::Method::Post => self
519 .client
520 .post(url)
521 .body_json(&data.ok_or(HttpRequestError::UnknownError("missing data".to_string()))?)
522 .map_err(|e| HttpRequestError::UnknownError(e.to_string()))?,
523 http_types::Method::Get => self.client.get(url),
524 _ => return Err(HttpRequestError::UnknownError("unsupported method".to_string())),
525 };
526
527 async move {
528 match request.await {
529 Ok(mut response) if response.status().is_success() => match response.body_bytes().await {
530 Ok(data) => Ok(data.into_boxed_slice()),
531 Err(e) => Err(HttpRequestError::TransportError(e.to_string())),
532 },
533 Ok(response) => Err(HttpRequestError::HttpError(response.status())),
534 Err(e) => Err(HttpRequestError::TransportError(e.to_string())),
535 }
536 }
537 .timeout(self.cfg.http_request_timeout)
538 .await
539 .map_err(|_| HttpRequestError::Timeout)?
540 }
541 }
542}
543
544#[cfg(any(test, feature = "runtime-tokio"))]
545pub mod reqwest_client {
546 use async_trait::async_trait;
547 use http_types::StatusCode;
548 use serde::Serialize;
549 use std::sync::Arc;
550 use std::time::Duration;
551 use tracing::info;
552
553 use crate::errors::HttpRequestError;
554 use crate::{HttpPostRequestorConfig, HttpRequestor};
555
556 #[derive(Clone, Debug, Default)]
558 pub struct ReqwestRequestor {
559 client: reqwest::Client,
560 limiter: Option<Arc<governor::DefaultKeyedRateLimiter<String>>>,
561 }
562
563 impl ReqwestRequestor {
564 pub fn new(cfg: HttpPostRequestorConfig) -> Self {
565 info!(?cfg, "creating reqwest client");
566 Self {
567 client: reqwest::Client::builder()
568 .timeout(cfg.http_request_timeout)
569 .redirect(reqwest::redirect::Policy::limited(cfg.max_redirects as usize))
570 .tcp_keepalive(Some(Duration::from_secs(30)))
573 .zstd(true)
576 .brotli(true)
577 .build()
578 .expect("could not build reqwest client"),
579 limiter: cfg
580 .max_requests_per_sec
581 .filter(|reqs| *reqs > 0) .map(|reqs| {
583 Arc::new(governor::DefaultKeyedRateLimiter::keyed(governor::Quota::per_second(
584 reqs.try_into().unwrap(),
585 )))
586 }),
587 }
588 }
589 }
590
591 #[async_trait]
592 impl HttpRequestor for ReqwestRequestor {
593 async fn http_query<T>(
594 &self,
595 method: http_types::Method,
596 url: &str,
597 data: Option<T>,
598 ) -> Result<Box<[u8]>, HttpRequestError>
599 where
600 T: Serialize + Send + Sync,
601 {
602 let url = reqwest::Url::parse(url)
603 .map_err(|e| HttpRequestError::UnknownError(format!("url parse error: {e}")))?;
604
605 let builder = match method {
606 http_types::Method::Get => self.client.get(url.clone()),
607 http_types::Method::Post => self.client.post(url.clone()).body(
608 serde_json::to_string(&data.ok_or(HttpRequestError::UnknownError("missing data".to_string()))?)
609 .map_err(|e| HttpRequestError::UnknownError(format!("serialize error: {e}")))?,
610 ),
611 _ => return Err(HttpRequestError::UnknownError("unsupported method".to_string())),
612 };
613
614 if self
615 .limiter
616 .clone()
617 .map(|limiter| limiter.check_key(&url.host_str().unwrap_or(".").to_string()).is_ok())
618 .unwrap_or(true)
619 {
620 let resp = builder
621 .header("content-type", "application/json")
622 .send()
623 .await
624 .map_err(|e| {
625 if e.is_status() {
626 HttpRequestError::HttpError(
627 StatusCode::try_from(e.status().map(|s| s.as_u16()).unwrap_or(500))
628 .expect("status code must be compatible"), )
630 } else if e.is_timeout() {
631 HttpRequestError::Timeout
632 } else {
633 HttpRequestError::UnknownError(e.to_string())
634 }
635 })?;
636
637 resp.bytes()
638 .await
639 .map(|b| Box::from(b.as_ref()))
640 .map_err(|e| HttpRequestError::UnknownError(format!("error retrieving body: {e}")))
641 } else {
642 Err(HttpRequestError::HttpError(StatusCode::TooManyRequests))
643 }
644 }
645 }
646}
647
648#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
650pub struct RequestorResponseSnapshot {
651 id: usize,
652 request: String,
653 response: String,
654}
655
656#[derive(Debug, Clone)]
663pub struct SnapshotRequestor<T> {
664 inner: T,
665 next_id: Arc<AtomicUsize>,
666 entries: moka::future::Cache<String, RequestorResponseSnapshot>,
667 file: String,
668 aggressive_save: bool,
669 fail_on_miss: bool,
670 ignore_snapshot: bool,
671}
672
673impl<T> SnapshotRequestor<T> {
674 pub fn new(inner: T, snapshot_file: &str) -> Self {
681 Self {
682 inner,
683 next_id: Arc::new(AtomicUsize::new(1)),
684 entries: moka::future::Cache::builder().build(),
685 file: snapshot_file.to_owned(),
686 aggressive_save: false,
687 fail_on_miss: false,
688 ignore_snapshot: false,
689 }
690 }
691
692 pub fn snapshot_path(&self) -> &str {
694 &self.file
695 }
696
697 pub fn clear(&self) {
700 self.entries.invalidate_all();
701 self.next_id.store(1, Ordering::Relaxed);
702 }
703
704 pub async fn try_load(&mut self, fail_on_miss: bool) -> Result<(), std::io::Error> {
708 if self.ignore_snapshot {
709 return Ok(());
710 }
711
712 let loaded = serde_yaml::from_reader::<_, Vec<RequestorResponseSnapshot>>(std::fs::File::open(&self.file)?)
713 .map_err(std::io::Error::other)?;
714
715 self.clear();
716
717 let loaded_len = futures::stream::iter(loaded)
718 .then(|entry| {
719 self.next_id.fetch_max(entry.id, Ordering::Relaxed);
720 self.entries.insert(entry.request.clone(), entry)
721 })
722 .collect::<Vec<_>>()
723 .await
724 .len();
725
726 if loaded_len > 0 {
727 self.fail_on_miss = fail_on_miss;
728 }
729
730 tracing::debug!("snapshot with {loaded_len} entries has been loaded from {}", &self.file);
731 Ok(())
732 }
733
734 pub async fn load(mut self, fail_on_miss: bool) -> Self {
738 let _ = self.try_load(fail_on_miss).await;
739 self
740 }
741
742 pub fn with_aggresive_save(mut self) -> Self {
746 self.aggressive_save = true;
747 self
748 }
749
750 pub fn with_ignore_snapshot(mut self, ignore_snapshot: bool) -> Self {
756 self.ignore_snapshot = ignore_snapshot;
757 self
758 }
759
760 pub fn save(&self) -> Result<(), std::io::Error> {
765 if self.ignore_snapshot {
766 return Ok(());
767 }
768
769 let mut values: Vec<RequestorResponseSnapshot> = self.entries.iter().map(|(_, r)| r).collect();
770 values.sort_unstable_by_key(|a| a.id);
771
772 let mut writer = BufWriter::new(std::fs::File::create(&self.file)?);
773
774 serde_yaml::to_writer(&mut writer, &values).map_err(std::io::Error::other)?;
775
776 writer.flush()?;
777
778 tracing::debug!("snapshot with {} entries saved to file {}", values.len(), self.file);
779 Ok(())
780 }
781}
782
783impl<R: HttpRequestor> SnapshotRequestor<R> {
784 async fn http_post_with_snapshot<In>(&self, url: &str, data: In) -> Result<Box<[u8]>, HttpRequestError>
785 where
786 In: Serialize + Send + Sync,
787 {
788 let request = serde_json::to_string(&data)
789 .map_err(|e| HttpRequestError::UnknownError(format!("serialize error: {e}")))?;
790
791 let inserted = AtomicBool::new(false);
792 let result = self
793 .entries
794 .entry(request.clone())
795 .or_try_insert_with(async {
796 if self.fail_on_miss {
797 tracing::error!("{request} is missing in {}", &self.file);
798 return Err(HttpRequestError::HttpError(http_types::StatusCode::NotFound));
799 }
800
801 let response = self.inner.http_post(url, data).await?;
802 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
803 inserted.store(true, Ordering::Relaxed);
804
805 tracing::debug!("saved new snapshot entry #{id}");
806 Ok(RequestorResponseSnapshot {
807 id,
808 request: request.clone(),
809 response: String::from_utf8(response.into_vec())
810 .map_err(|e| HttpRequestError::UnknownError(format!("unparseable data: {e}")))?,
811 })
812 })
813 .await
814 .map(|e| e.into_value().response.into_bytes().into_boxed_slice())
815 .map_err(|e: Arc<HttpRequestError>| e.as_ref().clone())?;
816
817 if inserted.load(Ordering::Relaxed) && self.aggressive_save {
818 tracing::debug!("{request} was NOT found and was resolved");
819 self.save().map_err(|e| HttpRequestError::UnknownError(e.to_string()))?;
820 } else {
821 tracing::debug!("{request} was found");
822 }
823
824 Ok(result)
825 }
826}
827
828impl<T> Drop for SnapshotRequestor<T> {
829 fn drop(&mut self) {
830 if let Err(e) = self.save() {
831 tracing::error!("failed to save snapshot: {e}");
832 }
833 }
834}
835
836#[async_trait::async_trait]
837impl<R: HttpRequestor> HttpRequestor for SnapshotRequestor<R> {
838 async fn http_query<T>(&self, _: Method, _: &str, _: Option<T>) -> Result<Box<[u8]>, HttpRequestError>
839 where
840 T: Serialize + Send + Sync,
841 {
842 todo!()
843 }
844
845 async fn http_post<T>(&self, url: &str, data: T) -> Result<Box<[u8]>, HttpRequestError>
846 where
847 T: Serialize + Send + Sync,
848 {
849 self.http_post_with_snapshot(url, data).await
850 }
851
852 async fn http_get(&self, _url: &str) -> Result<Box<[u8]>, HttpRequestError> {
853 todo!()
854 }
855}
856
857#[async_trait]
858impl<R: HttpRequestor> HttpRequestor for &SnapshotRequestor<R> {
859 async fn http_query<T>(&self, _: Method, _: &str, _: Option<T>) -> Result<Box<[u8]>, HttpRequestError>
860 where
861 T: Serialize + Send + Sync,
862 {
863 todo!()
864 }
865
866 async fn http_post<T>(&self, url: &str, data: T) -> Result<Box<[u8]>, HttpRequestError>
867 where
868 T: Serialize + Send + Sync,
869 {
870 self.http_post_with_snapshot(url, data).await
871 }
872
873 async fn http_get(&self, _url: &str) -> Result<Box<[u8]>, HttpRequestError> {
874 todo!()
875 }
876}
877
878type AnvilRpcClient<R> = ethers::middleware::SignerMiddleware<
879 ethers::providers::Provider<JsonRpcProviderClient<R, SimpleJsonRpcRetryPolicy>>,
880 ethers::signers::Wallet<ethers::core::k256::ecdsa::SigningKey>,
881>;
882
883#[cfg(not(target_arch = "wasm32"))]
885pub fn create_rpc_client_to_anvil<R: HttpRequestor>(
886 backend: R,
887 anvil: ðers::utils::AnvilInstance,
888 signer: &hopr_crypto_types::keypairs::ChainKeypair,
889) -> Arc<AnvilRpcClient<R>> {
890 use ethers::signers::Signer;
891 use hopr_crypto_types::keypairs::Keypair;
892
893 let wallet =
894 ethers::signers::LocalWallet::from_bytes(signer.secret().as_ref()).expect("failed to construct wallet");
895 let json_client = JsonRpcProviderClient::new(&anvil.endpoint(), backend, SimpleJsonRpcRetryPolicy::default());
896 let provider = ethers::providers::Provider::new(json_client).interval(Duration::from_millis(10_u64));
897
898 Arc::new(ethers::middleware::SignerMiddleware::new(
899 provider,
900 wallet.with_chain_id(anvil.chain_id()),
901 ))
902}
903
904#[cfg(test)]
905mod tests {
906 use async_trait::async_trait;
907 use ethers::providers::JsonRpcClient;
908 use hopr_async_runtime::prelude::sleep;
909 use hopr_chain_types::utils::create_anvil;
910 use hopr_chain_types::{ContractAddresses, ContractInstances};
911 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
912 use hopr_primitive_types::primitives::Address;
913 use http_types::Method;
914 use serde::Serialize;
915 use serde_json::json;
916 use std::fmt::Debug;
917 use std::sync::atomic::Ordering;
918 use std::time::Duration;
919 use tempfile::NamedTempFile;
920
921 use crate::client::reqwest_client::ReqwestRequestor;
922 use crate::client::surf_client::SurfRequestor;
923 use crate::client::{
924 create_rpc_client_to_anvil, JsonRpcProviderClient, SimpleJsonRpcRetryPolicy, SnapshotRequestor,
925 };
926 use crate::errors::{HttpRequestError, JsonRpcProviderClientError};
927 use crate::{HttpRequestor, ZeroRetryPolicy};
928
929 async fn deploy_contracts<R: HttpRequestor + Debug>(req: R) -> anyhow::Result<ContractAddresses> {
930 let anvil = create_anvil(None);
931 let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
932
933 let client = create_rpc_client_to_anvil(req, &anvil, &chain_key_0);
934
935 let contracts = ContractInstances::deploy_for_testing(client.clone(), &chain_key_0)
936 .await
937 .expect("deploy failed");
938
939 Ok(ContractAddresses::from(&contracts))
940 }
941
942 #[async_std::test]
943 async fn test_client_should_deploy_contracts_via_surf() -> anyhow::Result<()> {
944 let contract_addrs = deploy_contracts(SurfRequestor::default()).await?;
945
946 assert_ne!(contract_addrs.token, Address::default());
947 assert_ne!(contract_addrs.channels, Address::default());
948 assert_ne!(contract_addrs.announcements, Address::default());
949 assert_ne!(contract_addrs.network_registry, Address::default());
950 assert_ne!(contract_addrs.safe_registry, Address::default());
951 assert_ne!(contract_addrs.price_oracle, Address::default());
952
953 Ok(())
954 }
955
956 #[tokio::test]
957 async fn test_client_should_deploy_contracts_via_reqwest() -> anyhow::Result<()> {
958 let contract_addrs = deploy_contracts(ReqwestRequestor::default()).await?;
959
960 assert_ne!(contract_addrs.token, Address::default());
961 assert_ne!(contract_addrs.channels, Address::default());
962 assert_ne!(contract_addrs.announcements, Address::default());
963 assert_ne!(contract_addrs.network_registry, Address::default());
964 assert_ne!(contract_addrs.safe_registry, Address::default());
965 assert_ne!(contract_addrs.price_oracle, Address::default());
966
967 Ok(())
968 }
969
970 #[async_std::test]
971 async fn test_client_should_get_block_number() -> anyhow::Result<()> {
972 let block_time = Duration::from_secs(1);
973
974 let anvil = create_anvil(Some(block_time));
975 let client = JsonRpcProviderClient::new(
976 &anvil.endpoint(),
977 SurfRequestor::default(),
978 SimpleJsonRpcRetryPolicy::default(),
979 );
980
981 let mut last_number = 0;
982
983 for _ in 0..3 {
984 sleep(block_time).await;
985
986 let number: ethers::types::U64 = client.request("eth_blockNumber", ()).await?;
987
988 assert!(number.as_u64() > last_number, "next block number must be greater");
989 last_number = number.as_u64();
990 }
991
992 assert_eq!(
993 0,
994 client.requests_enqueued.load(Ordering::SeqCst),
995 "retry queue should be zero on successful requests"
996 );
997
998 Ok(())
999 }
1000
1001 #[async_std::test]
1002 async fn test_client_should_fail_on_malformed_request() {
1003 let anvil = create_anvil(None);
1004 let client = JsonRpcProviderClient::new(
1005 &anvil.endpoint(),
1006 SurfRequestor::default(),
1007 SimpleJsonRpcRetryPolicy::default(),
1008 );
1009
1010 let err = client
1011 .request::<_, ethers::types::U64>("eth_blockNumber_bla", ())
1012 .await
1013 .expect_err("expected error");
1014
1015 assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(..)));
1016 }
1017
1018 #[async_std::test]
1019 async fn test_client_should_fail_on_malformed_response() {
1020 let mut server = mockito::Server::new_async().await;
1021
1022 let m = server
1023 .mock("POST", "/")
1024 .with_status(200)
1025 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1026 .with_body("}malformed{")
1027 .expect(1)
1028 .create();
1029
1030 let client = JsonRpcProviderClient::new(
1031 &server.url(),
1032 SurfRequestor::default(),
1033 SimpleJsonRpcRetryPolicy::default(),
1034 );
1035
1036 let err = client
1037 .request::<_, ethers::types::U64>("eth_blockNumber", ())
1038 .await
1039 .expect_err("expected error");
1040
1041 m.assert();
1042 assert!(matches!(err, JsonRpcProviderClientError::SerdeJson { .. }));
1043 }
1044
1045 #[async_std::test]
1046 async fn test_client_should_retry_on_http_error() {
1047 let mut server = mockito::Server::new_async().await;
1048
1049 let m = server
1050 .mock("POST", "/")
1051 .with_status(http_types::StatusCode::TooManyRequests as usize)
1052 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1053 .with_body("{}")
1054 .expect(3)
1055 .create();
1056
1057 let client = JsonRpcProviderClient::new(
1058 &server.url(),
1059 SurfRequestor::default(),
1060 SimpleJsonRpcRetryPolicy {
1061 max_retries: Some(2),
1062 retryable_http_errors: vec![http_types::StatusCode::TooManyRequests],
1063 initial_backoff: Duration::from_millis(100),
1064 ..SimpleJsonRpcRetryPolicy::default()
1065 },
1066 );
1067
1068 let err = client
1069 .request::<_, ethers::types::U64>("eth_blockNumber", ())
1070 .await
1071 .expect_err("expected error");
1072
1073 m.assert();
1074 assert!(matches!(err, JsonRpcProviderClientError::BackendError(_)));
1075 assert_eq!(
1076 0,
1077 client.requests_enqueued.load(Ordering::SeqCst),
1078 "retry queue should be zero when policy says no more retries"
1079 );
1080 }
1081
1082 #[async_std::test]
1083 async fn test_client_should_not_retry_with_zero_retry_policy() {
1084 let mut server = mockito::Server::new_async().await;
1085
1086 let m = server
1087 .mock("POST", "/")
1088 .with_status(404)
1089 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1090 .with_body("{}")
1091 .expect(1)
1092 .create();
1093
1094 let client = JsonRpcProviderClient::new(&server.url(), SurfRequestor::default(), ZeroRetryPolicy::default());
1095
1096 let err = client
1097 .request::<_, ethers::types::U64>("eth_blockNumber", ())
1098 .await
1099 .expect_err("expected error");
1100
1101 m.assert();
1102 assert!(matches!(err, JsonRpcProviderClientError::BackendError(_)));
1103 assert_eq!(
1104 0,
1105 client.requests_enqueued.load(Ordering::SeqCst),
1106 "retry queue should be zero when policy says no more retries"
1107 );
1108 }
1109
1110 #[async_std::test]
1111 async fn test_client_should_retry_on_json_rpc_error() {
1112 let mut server = mockito::Server::new_async().await;
1113
1114 let m = server
1115 .mock("POST", "/")
1116 .with_status(200)
1117 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1118 .with_body(
1119 r#"{
1120 "jsonrpc": "2.0",
1121 "id": 1,
1122 "error": {
1123 "message": "some message",
1124 "code": -32603
1125 }
1126 }"#,
1127 )
1128 .expect(3)
1129 .create();
1130
1131 let client = JsonRpcProviderClient::new(
1132 &server.url(),
1133 SurfRequestor::default(),
1134 SimpleJsonRpcRetryPolicy {
1135 max_retries: Some(2),
1136 retryable_json_rpc_errors: vec![-32603],
1137 initial_backoff: Duration::from_millis(100),
1138 ..SimpleJsonRpcRetryPolicy::default()
1139 },
1140 );
1141
1142 let err = client
1143 .request::<_, ethers::types::U64>("eth_blockNumber", ())
1144 .await
1145 .expect_err("expected error");
1146
1147 m.assert();
1148 assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(_)));
1149 assert_eq!(
1150 0,
1151 client.requests_enqueued.load(Ordering::SeqCst),
1152 "retry queue should be zero when policy says no more retries"
1153 );
1154 }
1155
1156 #[async_std::test]
1157 async fn test_client_should_not_retry_on_nonretryable_json_rpc_error() {
1158 let mut server = mockito::Server::new_async().await;
1159
1160 let m = server
1161 .mock("POST", "/")
1162 .with_status(200)
1163 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1164 .with_body(
1165 r#"{
1166 "jsonrpc": "2.0",
1167 "id": 1,
1168 "error": {
1169 "message": "some message",
1170 "code": -32000
1171 }
1172 }"#,
1173 )
1174 .expect(1)
1175 .create();
1176
1177 let client = JsonRpcProviderClient::new(
1178 &server.url(),
1179 SurfRequestor::default(),
1180 SimpleJsonRpcRetryPolicy {
1181 max_retries: Some(2),
1182 retryable_json_rpc_errors: vec![],
1183 initial_backoff: Duration::from_millis(100),
1184 ..SimpleJsonRpcRetryPolicy::default()
1185 },
1186 );
1187
1188 let err = client
1189 .request::<_, ethers::types::U64>("eth_blockNumber", ())
1190 .await
1191 .expect_err("expected error");
1192
1193 m.assert();
1194 assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(_)));
1195 assert_eq!(
1196 0,
1197 client.requests_enqueued.load(Ordering::SeqCst),
1198 "retry queue should be zero when policy says no more retries"
1199 );
1200 }
1201
1202 #[async_std::test]
1203 async fn test_client_should_retry_on_nonretryable_json_rpc_error_if_min_retries_is_given() {
1204 let mut server = mockito::Server::new_async().await;
1205
1206 let m = server
1207 .mock("POST", "/")
1208 .with_status(200)
1209 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1210 .with_body(
1211 r#"{
1212 "jsonrpc": "2.0",
1213 "id": 1,
1214 "error": {
1215 "message": "some message",
1216 "code": -32000
1217 }
1218 }"#,
1219 )
1220 .expect(2)
1221 .create();
1222
1223 let client = JsonRpcProviderClient::new(
1224 &server.url(),
1225 SurfRequestor::default(),
1226 SimpleJsonRpcRetryPolicy {
1227 min_retries: Some(1),
1228 max_retries: Some(2),
1229 retryable_json_rpc_errors: vec![],
1230 initial_backoff: Duration::from_millis(100),
1231 ..SimpleJsonRpcRetryPolicy::default()
1232 },
1233 );
1234
1235 let err = client
1236 .request::<_, ethers::types::U64>("eth_blockNumber", ())
1237 .await
1238 .expect_err("expected error");
1239
1240 m.assert();
1241 assert!(matches!(err, JsonRpcProviderClientError::JsonRpcError(_)));
1242 assert_eq!(
1243 0,
1244 client.requests_enqueued.load(Ordering::SeqCst),
1245 "retry queue should be zero when policy says no more retries"
1246 );
1247 }
1248
1249 #[async_std::test]
1250 async fn test_client_should_retry_on_malformed_json_rpc_error() {
1251 let mut server = mockito::Server::new_async().await;
1252
1253 let m = server
1254 .mock("POST", "/")
1255 .with_status(200)
1256 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1257 .with_body(
1258 r#"{
1259 "jsonrpc": "2.0",
1260 "error": {
1261 "message": "some message",
1262 "code": -32600
1263 }
1264 }"#,
1265 )
1266 .expect(3)
1267 .create();
1268
1269 let client = JsonRpcProviderClient::new(
1270 &server.url(),
1271 SurfRequestor::default(),
1272 SimpleJsonRpcRetryPolicy {
1273 max_retries: Some(2),
1274 retryable_json_rpc_errors: vec![-32600],
1275 initial_backoff: Duration::from_millis(100),
1276 ..SimpleJsonRpcRetryPolicy::default()
1277 },
1278 );
1279
1280 let err = client
1281 .request::<_, ethers::types::U64>("eth_blockNumber", ())
1282 .await
1283 .expect_err("expected error");
1284
1285 m.assert();
1286 assert!(matches!(err, JsonRpcProviderClientError::SerdeJson { .. }));
1287 assert_eq!(
1288 0,
1289 client.requests_enqueued.load(Ordering::SeqCst),
1290 "retry queue should be zero when policy says no more retries"
1291 );
1292 }
1293
1294 #[derive(Debug)]
1297 struct NullHttpPostRequestor;
1298
1299 #[async_trait]
1300 impl HttpRequestor for NullHttpPostRequestor {
1301 async fn http_query<T>(&self, _: Method, _: &str, _: Option<T>) -> Result<Box<[u8]>, HttpRequestError>
1302 where
1303 T: Serialize + Send + Sync,
1304 {
1305 Err(HttpRequestError::UnknownError("use of NullHttpPostRequestor".into()))
1306 }
1307 }
1308
1309 #[test_log::test(async_std::test)]
1310 async fn test_client_from_file() -> anyhow::Result<()> {
1311 let block_time = Duration::from_millis(1100);
1312 let snapshot_file = NamedTempFile::new()?;
1313
1314 let anvil = create_anvil(Some(block_time));
1315 {
1316 let client = JsonRpcProviderClient::new(
1317 &anvil.endpoint(),
1318 SnapshotRequestor::new(SurfRequestor::default(), snapshot_file.path().to_str().unwrap()),
1319 SimpleJsonRpcRetryPolicy::default(),
1320 );
1321
1322 let mut last_number = 0;
1323
1324 for _ in 0..3 {
1325 sleep(block_time).await;
1326
1327 let number: ethers::types::U64 = client.request("eth_blockNumber", ()).await?;
1328
1329 assert!(number.as_u64() > last_number, "next block number must be greater");
1330 last_number = number.as_u64();
1331 }
1332 }
1333
1334 {
1335 let client = JsonRpcProviderClient::new(
1336 &anvil.endpoint(),
1337 SnapshotRequestor::new(NullHttpPostRequestor, snapshot_file.path().to_str().unwrap())
1338 .load(true)
1339 .await,
1340 SimpleJsonRpcRetryPolicy::default(),
1341 );
1342
1343 let mut last_number = 0;
1344 for _ in 0..3 {
1345 sleep(block_time).await;
1346
1347 let number: ethers::types::U64 = client.request("eth_blockNumber", ()).await?;
1348
1349 assert!(number.as_u64() > last_number, "next block number must be greater");
1350 last_number = number.as_u64();
1351 }
1352 }
1353
1354 Ok(())
1355 }
1356}