1use std::{
13 fmt::Debug,
14 future::IntoFuture,
15 io::{BufWriter, Write},
16 sync::{
17 Arc,
18 atomic::{AtomicBool, AtomicUsize, Ordering},
19 },
20 task::{Context, Poll},
21 time::Duration,
22};
23
24use alloy::eips::eip1559::Eip1559Estimation;
26use alloy::{
27 network::{EthereumWallet, Network, TransactionBuilder},
28 primitives::utils::parse_units,
29 providers::{
30 Identity, Provider, RootProvider, SendableTx,
31 fillers::{
32 BlobGasFiller, ChainIdFiller, FillProvider, FillerControlFlow, GasFiller, JoinFill, NonceFiller, TxFiller,
33 WalletFiller,
34 },
35 },
36 rpc::json_rpc::{ErrorPayload, RequestPacket, ResponsePacket, ResponsePayload},
37 transports::{HttpError, TransportError, TransportErrorKind, TransportFut, TransportResult, layers::RetryPolicy},
38};
39use futures::{FutureExt, StreamExt};
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use tower::{Layer, Service};
43use tracing::{error, trace};
44use url::Url;
45use validator::Validate;
46
47pub const EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS: u128 = 3_000_000_000;
52pub const EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS: u128 = 100_000_000;
53
54use crate::{rpc::DEFAULT_GAS_ORACLE_URL, transport::HttpRequestor};
55
56#[cfg(all(feature = "prometheus", not(test)))]
57lazy_static::lazy_static! {
58 static ref METRIC_COUNT_RPC_CALLS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
59 "hopr_rpc_call_count",
60 "Number of Ethereum RPC calls over HTTP and their result",
61 &["call", "result"]
62 )
63 .unwrap();
64 static ref METRIC_RPC_CALLS_TIMING: hopr_metrics::MultiHistogram = hopr_metrics::MultiHistogram::new(
65 "hopr_rpc_call_time_sec",
66 "Timing of RPC calls over HTTP in seconds",
67 vec![0.1, 0.5, 1.0, 2.0, 5.0, 7.0, 10.0],
68 &["call"]
69 )
70 .unwrap();
71 static ref METRIC_RETRIES_PER_RPC_CALL: hopr_metrics::MultiHistogram = hopr_metrics::MultiHistogram::new(
72 "hopr_retries_per_rpc_call",
73 "Number of retries per RPC call",
74 vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
75 &["call"]
76 )
77 .unwrap();
78}
79
80#[serde_as]
110#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault, Serialize, Deserialize, Validate)]
111pub struct DefaultRetryPolicy {
112 #[validate(range(min = 0))]
116 #[default(Some(0))]
117 pub min_retries: Option<u32>,
118
119 #[default(Duration::from_secs(1))]
126 pub initial_backoff: Duration,
127
128 #[validate(range(min = 0.0))]
135 #[default(0.3)]
136 pub backoff_coefficient: f64,
137 #[default(Duration::from_secs(30))]
143 pub max_backoff: Duration,
144 pub backoff_on_transport_errors: bool,
148 #[default(_code = "vec![-32005, -32016, 429]")]
152 pub retryable_json_rpc_errors: Vec<i64>,
153
154 #[serde_as(as = "Vec<DisplayFromStr>")]
158 #[default(
159 _code = "vec![http::StatusCode::TOO_MANY_REQUESTS,http::StatusCode::GATEWAY_TIMEOUT,\
160 http::StatusCode::SERVICE_UNAVAILABLE]"
161 )]
162 pub retryable_http_errors: Vec<http::StatusCode>,
163
164 #[validate(range(min = 5))]
170 #[default = 100]
171 pub max_retry_queue_size: u32,
172}
173
174impl DefaultRetryPolicy {
175 fn is_retryable_json_rpc_errors(&self, rpc_err: &ErrorPayload) -> bool {
176 self.retryable_json_rpc_errors.contains(&rpc_err.code)
177 }
178
179 fn is_retryable_http_errors(&self, http_err: &HttpError) -> bool {
180 let status_code = match http::StatusCode::try_from(http_err.status) {
181 Ok(status_code) => status_code,
182 Err(_) => return false,
183 };
184 self.retryable_http_errors.contains(&status_code)
185 }
186}
187
188impl RetryPolicy for DefaultRetryPolicy {
189 fn should_retry(&self, err: &TransportError) -> bool {
190 match err {
191 TransportError::Transport(err) => {
194 match err {
195 TransportErrorKind::MissingBatchResponse(_) => true,
197 TransportErrorKind::HttpError(http_err) => {
198 http_err.is_rate_limit_err() || self.is_retryable_http_errors(http_err)
199 }
200 TransportErrorKind::Custom(err) => {
201 let msg = err.to_string();
202 msg.contains("429 Too Many Requests")
203 }
204 _ => false,
205 }
206 }
207 TransportError::SerError(_) => false,
210 TransportError::DeserError { text, .. } => {
211 if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
212 return self.is_retryable_json_rpc_errors(&resp);
213 }
214
215 #[derive(Deserialize)]
218 struct Resp {
219 error: ErrorPayload,
220 }
221
222 if let Ok(resp) = serde_json::from_str::<Resp>(text) {
223 return self.is_retryable_json_rpc_errors(&resp.error);
224 }
225
226 false
227 }
228 TransportError::ErrorResp(err) => self.is_retryable_json_rpc_errors(err),
229 TransportError::NullResp => true,
230 _ => false,
231 }
232 }
233
234 fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
237 None
238 }
239}
240
241#[derive(Debug, Clone)]
242pub struct ZeroRetryPolicy;
243impl RetryPolicy for ZeroRetryPolicy {
244 fn should_retry(&self, _err: &alloy::transports::TransportError) -> bool {
245 false
246 }
247
248 fn backoff_hint(&self, _error: &alloy::transports::TransportError) -> Option<std::time::Duration> {
249 None
250 }
251}
252
253#[derive(Clone, Copy, Default, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
255pub enum GasCategory {
256 SafeLow,
257 #[default]
258 Standard,
259 Fast,
260 Fastest,
261}
262
263#[derive(Clone, Debug)]
268pub struct GasOracleFiller<C> {
269 client: C,
270 url: Url,
271 gas_category: GasCategory,
272}
273
274#[derive(Clone, Debug, Deserialize, PartialEq)]
275pub struct GasOracleResponse {
276 pub status: String,
277 pub message: String,
278 pub result: GasOracleResponseResult,
279}
280
281#[derive(Clone, Debug, Deserialize, PartialEq)]
282#[serde(rename_all = "PascalCase")]
283pub struct GasOracleResponseResult {
284 pub last_block: String,
285 pub safe_gas_price: String,
286 pub propose_gas_price: String,
287 pub fast_gas_price: String,
288}
289
290impl GasOracleResponse {
291 #[inline]
292 pub fn gas_from_category(&self, gas_category: GasCategory) -> String {
293 self.result.gas_from_category(gas_category)
294 }
295}
296
297impl GasOracleResponseResult {
298 fn gas_from_category(&self, gas_category: GasCategory) -> String {
299 match gas_category {
300 GasCategory::SafeLow => self.safe_gas_price.clone(),
301 GasCategory::Standard => self.propose_gas_price.clone(),
302 GasCategory::Fast => self.fast_gas_price.clone(),
303 GasCategory::Fastest => self.fast_gas_price.clone(),
304 }
305 }
306}
307
308impl<C> GasOracleFiller<C>
309where
310 C: HttpRequestor + Clone,
311{
312 pub fn new(client: C, url: Option<Url>) -> Self {
314 Self {
315 client,
316 url: url.unwrap_or_else(|| Url::parse(DEFAULT_GAS_ORACLE_URL).unwrap()),
317 gas_category: GasCategory::Standard,
318 }
319 }
320
321 pub fn category(mut self, gas_category: GasCategory) -> Self {
323 self.gas_category = gas_category;
324 self
325 }
326
327 pub async fn query(&self) -> Result<GasOracleResponse, TransportError> {
329 let raw_value = self
330 .client
331 .http_get(self.url.as_str())
332 .await
333 .map_err(TransportErrorKind::custom)?;
334
335 let parsed: GasOracleResponse = serde_json::from_slice(raw_value.as_ref()).map_err(|e| {
336 error!(%e, "failed to deserialize gas price API response");
337 TransportErrorKind::Custom("failed to deserialize gas price API response".into())
338 })?;
339
340 Ok(parsed)
341 }
342
343 async fn prepare_legacy<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
344 where
345 P: Provider<N>,
346 N: Network,
347 {
348 let gas_limit_fut = tx.gas_limit().map_or_else(
349 || provider.estimate_gas(tx.clone()).into_future().right_future(),
350 |gas_limit| async move { Ok(gas_limit) }.left_future(),
351 );
352
353 let res_fut = self.query();
354
355 let (gas_limit, res) = futures::try_join!(gas_limit_fut, res_fut)?;
357
358 let gas_price_in_gwei = res.gas_from_category(self.gas_category);
363 let gas_price = parse_units(&gas_price_in_gwei, "gwei")
364 .map_err(|e| TransportErrorKind::custom_str(&format!("Failed to parse gwei from gas oracle: {e}")))?;
365 let gas_price_in_128: u128 = gas_price
366 .get_absolute()
367 .try_into()
368 .map_err(|_| TransportErrorKind::custom_str("Conversion overflow"))?;
369
370 Ok(GasOracleFillable::Legacy {
371 gas_limit,
372 gas_price: tx.gas_price().unwrap_or(gas_price_in_128),
373 })
374 }
375
376 async fn prepare_1559<P, N>(&self, provider: &P, tx: &N::TransactionRequest) -> TransportResult<GasOracleFillable>
377 where
378 P: Provider<N>,
379 N: Network,
380 {
381 let gas_limit_fut = tx.gas_limit().map_or_else(
382 || provider.estimate_gas(tx.clone()).into_future().right_future(),
383 |gas_limit| async move { Ok(gas_limit) }.left_future(),
384 );
385
386 let gas_limit = gas_limit_fut.await?;
388
389 Ok(GasOracleFillable::Eip1559 {
390 gas_limit,
391 estimate: self.estimate_eip1559_fees(),
392 })
393 }
394
395 fn estimate_eip1559_fees(&self) -> Eip1559Estimation {
399 Eip1559Estimation {
400 max_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_MAX_FEE_GNOSIS,
401 max_priority_fee_per_gas: EIP1559_FEE_ESTIMATION_DEFAULT_PRIORITY_FEE_GNOSIS,
402 }
403 }
404}
405
406#[doc(hidden)]
408#[derive(Clone, Copy, Debug, PartialEq, Eq)]
409pub enum GasOracleFillable {
410 Legacy {
411 gas_limit: u64,
412 gas_price: u128,
413 },
414 Eip1559 {
415 gas_limit: u64,
416 estimate: Eip1559Estimation,
417 },
418}
419
420impl<N, C> TxFiller<N> for GasOracleFiller<C>
421where
422 N: Network,
423 C: HttpRequestor + Clone,
424{
425 type Fillable = GasOracleFillable;
426
427 fn status(&self, tx: &<N as Network>::TransactionRequest) -> FillerControlFlow {
428 if tx.gas_price().is_some() && tx.gas_limit().is_some() {
430 return FillerControlFlow::Finished;
431 }
432
433 if tx.max_fee_per_gas().is_some() && tx.max_priority_fee_per_gas().is_some() && tx.gas_limit().is_some() {
435 return FillerControlFlow::Finished;
436 }
437
438 FillerControlFlow::Ready
439 }
440
441 fn fill_sync(&self, _tx: &mut SendableTx<N>) {}
442
443 async fn prepare<P>(&self, provider: &P, tx: &<N as Network>::TransactionRequest) -> TransportResult<Self::Fillable>
444 where
445 P: Provider<N>,
446 {
447 if tx.gas_price().is_some() {
448 self.prepare_legacy(provider, tx).await
449 } else {
450 match self.prepare_1559(provider, tx).await {
451 Ok(estimate) => Ok(estimate),
453 Err(e) => Err(e),
454 }
455 }
456 }
457
458 async fn fill(&self, fillable: Self::Fillable, mut tx: SendableTx<N>) -> TransportResult<SendableTx<N>> {
459 if let Some(builder) = tx.as_mut_builder() {
460 match fillable {
461 GasOracleFillable::Legacy { gas_limit, gas_price } => {
462 builder.set_gas_limit(gas_limit);
463 builder.set_gas_price(gas_price);
464 }
465 GasOracleFillable::Eip1559 { gas_limit, estimate } => {
466 builder.set_gas_limit(gas_limit);
467 builder.set_max_fee_per_gas(estimate.max_fee_per_gas);
468 builder.set_max_priority_fee_per_gas(estimate.max_priority_fee_per_gas);
469 }
470 }
471 };
472 Ok(tx)
473 }
474}
475
476pub struct MetricsLayer;
477
478#[derive(Debug, Clone)]
479pub struct MetricsService<S> {
480 inner: S,
481}
482
483impl<S> Layer<S> for MetricsLayer {
485 type Service = MetricsService<S>;
486
487 fn layer(&self, inner: S) -> Self::Service {
488 MetricsService { inner }
489 }
490}
491
492impl<S> Service<RequestPacket> for MetricsService<S>
494where
495 S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
496 S::Error: Send + 'static + Debug,
497{
498 type Error = TransportError;
499 type Future = TransportFut<'static>;
500 type Response = ResponsePacket;
501
502 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
503 self.inner.poll_ready(cx)
504 }
505
506 fn call(&mut self, request: RequestPacket) -> Self::Future {
507 let start = std::time::Instant::now();
509
510 let method_names = match request.clone() {
511 RequestPacket::Single(single_req) => vec![single_req.method().to_owned()],
512 RequestPacket::Batch(vec_req) => vec_req.iter().map(|s_req| s_req.method().to_owned()).collect(),
513 };
514
515 let future = self.inner.call(request);
516
517 Box::pin(async move {
519 let res = future.await;
520
521 let req_duration = start.elapsed();
522 method_names.iter().for_each(|method| {
523 trace!(method, duration_in_ms = req_duration.as_millis(), "rpc request took");
524 #[cfg(all(feature = "prometheus", not(test)))]
525 METRIC_RPC_CALLS_TIMING.observe(&[method], req_duration.as_secs_f64());
526 });
527
528 match &res {
530 Ok(result) => match result {
531 ResponsePacket::Single(a) => match a.payload {
532 ResponsePayload::Success(_) => {
533 #[cfg(all(feature = "prometheus", not(test)))]
534 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "success"]);
535 }
536 ResponsePayload::Failure(_) => {
537 #[cfg(all(feature = "prometheus", not(test)))]
538 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[0], "failure"]);
539 }
540 },
541 ResponsePacket::Batch(b) => {
542 b.iter().enumerate().for_each(|(i, _)| match b[i].payload {
543 ResponsePayload::Success(_) => {
544 #[cfg(all(feature = "prometheus", not(test)))]
545 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "success"]);
546 }
547 ResponsePayload::Failure(_) => {
548 #[cfg(all(feature = "prometheus", not(test)))]
549 METRIC_COUNT_RPC_CALLS.increment(&[&method_names[i], "failure"]);
550 }
551 });
552 }
553 },
554 Err(err) => {
555 error!(error = ?err, "Error occurred while processing request");
556 method_names.iter().for_each(|_m| {
557 #[cfg(all(feature = "prometheus", not(test)))]
558 METRIC_COUNT_RPC_CALLS.increment(&[_m, "failure"]);
559 });
560 }
561 };
562
563 res
564 })
565 }
566}
567
568#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
570pub struct RequestorResponseSnapshot {
571 id: usize,
572 request: String,
573 response: String,
574}
575
576#[derive(Debug, Clone)]
583pub struct SnapshotRequestor {
584 next_id: Arc<AtomicUsize>,
585 entries: moka::future::Cache<String, RequestorResponseSnapshot>,
586 file: String,
587 aggressive_save: bool,
588 fail_on_miss: bool,
589 ignore_snapshot: bool,
590}
591
592impl SnapshotRequestor {
593 pub fn new(snapshot_file: &str) -> Self {
600 Self {
601 next_id: Arc::new(AtomicUsize::new(1)),
602 entries: moka::future::Cache::builder().build(),
603 file: snapshot_file.to_owned(),
604 aggressive_save: false,
605 fail_on_miss: false,
606 ignore_snapshot: false,
607 }
608 }
609
610 pub fn snapshot_path(&self) -> &str {
612 &self.file
613 }
614
615 pub fn clear(&self) {
618 self.entries.invalidate_all();
619 self.next_id.store(1, Ordering::Relaxed);
620 }
621
622 pub async fn try_load(&mut self, fail_on_miss: bool) -> Result<(), std::io::Error> {
626 if self.ignore_snapshot {
627 return Ok(());
628 }
629
630 let loaded = serde_yaml::from_reader::<_, Vec<RequestorResponseSnapshot>>(std::fs::File::open(&self.file)?)
631 .map_err(std::io::Error::other)?;
632
633 self.clear();
634
635 let loaded_len = futures::stream::iter(loaded)
636 .then(|entry| {
637 self.next_id.fetch_max(entry.id, Ordering::Relaxed);
638 self.entries.insert(entry.request.clone(), entry)
639 })
640 .collect::<Vec<_>>()
641 .await
642 .len();
643
644 if loaded_len > 0 {
645 self.fail_on_miss = fail_on_miss;
646 }
647
648 tracing::debug!("snapshot with {loaded_len} entries has been loaded from {}", &self.file);
649 Ok(())
650 }
651
652 pub async fn load(mut self, fail_on_miss: bool) -> Self {
656 let _ = self.try_load(fail_on_miss).await;
657 self
658 }
659
660 pub fn with_aggresive_save(mut self) -> Self {
664 self.aggressive_save = true;
665 self
666 }
667
668 pub fn with_ignore_snapshot(mut self, ignore_snapshot: bool) -> Self {
674 self.ignore_snapshot = ignore_snapshot;
675 self
676 }
677
678 pub fn save(&self) -> Result<(), std::io::Error> {
683 if self.ignore_snapshot {
684 return Ok(());
685 }
686
687 let mut values: Vec<RequestorResponseSnapshot> = self.entries.iter().map(|(_, r)| r).collect();
688 values.sort_unstable_by_key(|a| a.id);
689
690 let mut writer = BufWriter::new(std::fs::File::create(&self.file)?);
691
692 serde_yaml::to_writer(&mut writer, &values).map_err(std::io::Error::other)?;
693
694 writer.flush()?;
695
696 tracing::debug!("snapshot with {} entries saved to file {}", values.len(), self.file);
697 Ok(())
698 }
699}
700
701impl Drop for SnapshotRequestor {
702 fn drop(&mut self) {
703 if let Err(e) = self.save() {
704 tracing::error!("failed to save snapshot: {e}");
705 }
706 }
707}
708
709#[derive(Debug, Clone)]
710pub struct SnapshotRequestorLayer {
711 snapshot_requestor: SnapshotRequestor,
712}
713
714impl SnapshotRequestorLayer {
715 pub fn new(snapshot_file: &str) -> Self {
716 Self {
717 snapshot_requestor: SnapshotRequestor::new(snapshot_file),
718 }
719 }
720
721 pub fn from_requestor(snapshot_requestor: SnapshotRequestor) -> Self {
722 Self { snapshot_requestor }
723 }
724}
725#[derive(Debug, Clone)]
726pub struct SnapshotRequestorService<S> {
727 inner: S,
728 snapshot_requestor: SnapshotRequestor,
729}
730
731impl<S> Layer<S> for SnapshotRequestorLayer {
733 type Service = SnapshotRequestorService<S>;
734
735 fn layer(&self, inner: S) -> Self::Service {
736 SnapshotRequestorService {
737 inner,
738 snapshot_requestor: self.snapshot_requestor.clone(),
739 }
740 }
741}
742
743impl<S> Service<RequestPacket> for SnapshotRequestorService<S>
745where
746 S: Service<RequestPacket, Future = TransportFut<'static>, Error = TransportError> + Send + 'static + Clone,
747 S::Error: Send + 'static + Debug,
748{
749 type Error = TransportError;
750 type Future = TransportFut<'static>;
751 type Response = ResponsePacket;
752
753 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
754 self.inner.poll_ready(cx)
755 }
756
757 fn call(&mut self, request: RequestPacket) -> Self::Future {
758 let mut inner = self.inner.clone(); let snapshot_requestor = self.snapshot_requestor.clone(); let future = inner.call(request.clone()); Box::pin(async move {
764 let res = future.await;
765
766 let request_string = serde_json::to_string(&request)
767 .map_err(|e| TransportErrorKind::Custom(format!("serialize error: {e}").into()))?;
768
769 let inserted = AtomicBool::new(false);
770
771 let _result = snapshot_requestor
772 .entries
773 .entry(request_string.clone())
774 .or_try_insert_with(async {
775 if snapshot_requestor.fail_on_miss {
776 tracing::error!("{request_string} is missing in {}", &snapshot_requestor.file);
777 return Err(TransportErrorKind::http_error(
778 http::StatusCode::NOT_FOUND.into(),
779 "".into(),
780 ));
781 }
782
783 let response_string = match &res {
784 Ok(result) => match result {
785 ResponsePacket::Single(resp) => match &resp.payload {
786 ResponsePayload::Success(success_payload) => success_payload.to_string(),
787 ResponsePayload::Failure(e) => {
788 return Err(TransportErrorKind::Custom(format!("RPC error: {e}").into()).into());
789 }
790 },
791 ResponsePacket::Batch(batch) => {
792 let mut responses = Vec::with_capacity(batch.len());
793 for (i, resp) in batch.iter().enumerate() {
794 match &resp.payload {
795 ResponsePayload::Success(success_payload) => {
796 responses.push(success_payload.to_string());
797 }
798 ResponsePayload::Failure(e) => {
799 return Err(TransportErrorKind::Custom(
800 format!("RPC error in batch item #{i}: {e}").into(),
801 )
802 .into());
803 }
804 }
805 }
806 responses.join(", ")
807 }
808 },
809 Err(err) => {
810 error!(error = ?err, "Error occurred while processing request");
811 return Err(TransportErrorKind::Custom(
812 format!("Error occurred while processing request: {err}").into(),
813 )
814 .into());
815 }
816 };
817
818 let id = snapshot_requestor.next_id.fetch_add(1, Ordering::SeqCst);
819 inserted.store(true, Ordering::Relaxed);
820 tracing::debug!("saved new snapshot entry #{id}");
821
822 Ok(RequestorResponseSnapshot {
823 id,
824 request: request_string.clone(),
825 response: response_string,
826 })
827 })
828 .await
829 .map(|e| e.into_value().response.into_bytes().into_boxed_slice())
830 .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
831
832 if inserted.load(Ordering::Relaxed) && snapshot_requestor.aggressive_save {
833 tracing::debug!("{request_string} was NOT found and was resolved");
834 snapshot_requestor
835 .save()
836 .map_err(|e| TransportErrorKind::Custom(format!("{e}").into()))?;
837 } else {
838 tracing::debug!("{request_string} was found");
839 }
840
841 res
842 })
843 }
844}
845
846pub type AnvilRpcClient = FillProvider<
847 JoinFill<
848 JoinFill<Identity, JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>>,
849 WalletFiller<EthereumWallet>,
850 >,
851 RootProvider,
852>;
853#[cfg(not(target_arch = "wasm32"))]
855pub fn create_rpc_client_to_anvil(
856 anvil: &alloy::node_bindings::AnvilInstance,
857 signer: &hopr_crypto_types::keypairs::ChainKeypair,
858) -> Arc<AnvilRpcClient> {
859 use alloy::{
860 providers::ProviderBuilder, rpc::client::ClientBuilder, signers::local::PrivateKeySigner,
861 transports::http::ReqwestTransport,
862 };
863 use hopr_crypto_types::keypairs::Keypair;
864
865 let wallet = PrivateKeySigner::from_slice(signer.secret().as_ref()).expect("failed to construct wallet");
866
867 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
868
869 let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
870
871 let provider = ProviderBuilder::new().wallet(wallet).connect_client(rpc_client);
872
873 Arc::new(provider)
874}
875
876#[cfg(test)]
877mod tests {
878 use std::time::Duration;
879
880 use alloy::{
881 network::TransactionBuilder,
882 primitives::{U256, address},
883 providers::{
884 Provider, ProviderBuilder,
885 fillers::{BlobGasFiller, CachedNonceManager, ChainIdFiller, GasFiller, NonceFiller},
886 },
887 rpc::{client::ClientBuilder, types::TransactionRequest},
888 signers::local::PrivateKeySigner,
889 transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
890 };
891 use anyhow::Ok;
892 use hopr_async_runtime::prelude::sleep;
893 use hopr_chain_types::{ContractAddresses, ContractInstances, utils::create_anvil};
894 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
895 use hopr_primitive_types::primitives::Address;
896 use serde_json::json;
897 use tempfile::NamedTempFile;
898
899 use crate::client::{
900 DefaultRetryPolicy, GasOracleFiller, MetricsLayer, SnapshotRequestor, SnapshotRequestorLayer, ZeroRetryPolicy,
901 };
902
903 #[tokio::test]
904 async fn test_client_should_deploy_contracts_via_reqwest() -> anyhow::Result<()> {
905 let anvil = create_anvil(None);
906 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
907 let signer_chain_key = ChainKeypair::from_secret(signer.to_bytes().as_ref())?;
908
909 let rpc_client = ClientBuilder::default().http(anvil.endpoint_url());
910
911 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
912
913 let contracts = ContractInstances::deploy_for_testing(provider.clone(), &signer_chain_key)
914 .await
915 .expect("deploy failed");
916
917 let contract_addrs = ContractAddresses::from(&contracts);
918
919 assert_ne!(contract_addrs.token, Address::default());
920 assert_ne!(contract_addrs.channels, Address::default());
921 assert_ne!(contract_addrs.announcements, Address::default());
922 assert_ne!(contract_addrs.network_registry, Address::default());
923 assert_ne!(contract_addrs.node_safe_registry, Address::default());
924 assert_ne!(contract_addrs.ticket_price_oracle, Address::default());
925
926 Ok(())
927 }
928
929 #[tokio::test]
930 async fn test_client_should_get_block_number() -> anyhow::Result<()> {
931 let block_time = Duration::from_millis(1100);
932
933 let anvil = create_anvil(Some(block_time));
934 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
935
936 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
937
938 let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
939
940 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
941
942 let mut last_number = 0;
943
944 for _ in 0..3 {
945 sleep(block_time).await;
946
947 let num = provider.get_block_number().await?;
948
949 assert!(num > last_number, "next block number must be greater");
950 last_number = num;
951 }
952
953 Ok(())
954 }
955
956 #[tokio::test]
957 async fn test_client_should_get_block_number_with_metrics_without_retry() -> anyhow::Result<()> {
958 let block_time = Duration::from_secs(1);
959
960 let anvil = create_anvil(Some(block_time));
961 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
962
963 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
964
965 let retry_layer = RetryBackoffLayer::new(2, 100, 100);
967
968 let rpc_client = ClientBuilder::default()
969 .layer(retry_layer)
970 .layer(MetricsLayer)
971 .transport(transport_client.clone(), transport_client.guess_local());
972
973 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
974
975 let mut last_number = 0;
976
977 for _ in 0..3 {
978 sleep(block_time).await;
979
980 let num = provider.get_block_number().await?;
981
982 assert!(num > last_number, "next block number must be greater");
983 last_number = num;
984 }
985
986 Ok(())
994 }
995
996 #[tokio::test]
997 async fn test_client_should_fail_on_malformed_request() -> anyhow::Result<()> {
998 let anvil = create_anvil(None);
999 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1000
1001 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1002
1003 let rpc_client = ClientBuilder::default().transport(transport_client.clone(), transport_client.guess_local());
1004
1005 let provider = ProviderBuilder::new().wallet(signer).connect_client(rpc_client);
1006
1007 let err = provider
1008 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber_bla".into(), ())
1009 .await
1010 .expect_err("expected error");
1011
1012 assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1013
1014 Ok(())
1015 }
1016
1017 #[tokio::test]
1018 async fn test_client_should_fail_on_malformed_response() {
1019 let mut server = mockito::Server::new_async().await;
1020
1021 let m = server
1022 .mock("POST", "/")
1023 .with_status(200)
1024 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1025 .with_body("}malformed{")
1026 .expect(1)
1027 .create();
1028
1029 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1030
1031 let rpc_client = ClientBuilder::default()
1032 .layer(RetryBackoffLayer::new(2, 100, 100))
1033 .transport(transport_client.clone(), transport_client.guess_local());
1034
1035 let provider = ProviderBuilder::new().connect_client(rpc_client);
1036
1037 let err = provider
1038 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1039 .await
1040 .expect_err("expected error");
1041
1042 m.assert();
1043 assert!(matches!(
1044 err,
1045 alloy::transports::RpcError::DeserError { err: _, text: _ }
1046 ));
1047 }
1048
1049 #[tokio::test]
1050 async fn test_client_should_retry_on_http_error() {
1051 let mut server = mockito::Server::new_async().await;
1052
1053 let too_many_requests: u16 = http::StatusCode::TOO_MANY_REQUESTS.as_u16();
1054
1055 let m = server
1056 .mock("POST", "/")
1057 .with_status(too_many_requests as usize)
1058 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1059 .with_body("{}")
1060 .expect(3)
1061 .create();
1062
1063 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1064
1065 let rpc_client = ClientBuilder::default()
1066 .layer(RetryBackoffLayer::new(2, 100, 100))
1067 .transport(transport_client.clone(), transport_client.guess_local());
1068
1069 let provider = ProviderBuilder::new().connect_client(rpc_client);
1082
1083 let err = provider
1084 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1085 .await
1086 .expect_err("expected error");
1087
1088 m.assert();
1089 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1090
1091 }
1098
1099 #[tokio::test]
1100 async fn test_client_should_not_retry_with_zero_retry_policy() {
1101 let mut server = mockito::Server::new_async().await;
1102
1103 let m = server
1104 .mock("POST", "/")
1105 .with_status(404)
1106 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1107 .with_body("{}")
1108 .expect(1)
1109 .create();
1110
1111 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1112
1113 let rpc_client = ClientBuilder::default()
1114 .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, ZeroRetryPolicy))
1115 .transport(transport_client.clone(), transport_client.guess_local());
1116
1117 let provider = ProviderBuilder::new().connect_client(rpc_client);
1118
1119 let err = provider
1120 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1121 .await
1122 .expect_err("expected error");
1123
1124 m.assert();
1125 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1126 }
1133
1134 #[tokio::test]
1135 async fn test_client_should_retry_on_json_rpc_error() {
1136 let mut server = mockito::Server::new_async().await;
1137
1138 let m = server
1139 .mock("POST", "/")
1140 .with_status(200)
1141 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1142 .with_body(
1143 r#"{
1144 "jsonrpc": "2.0",
1145 "id": 1,
1146 "error": {
1147 "message": "some message",
1148 "code": -32603
1149 }
1150 }"#,
1151 )
1152 .expect(3)
1153 .create();
1154
1155 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1156
1157 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1158 initial_backoff: Duration::from_millis(100),
1159 retryable_json_rpc_errors: vec![-32603],
1160 ..DefaultRetryPolicy::default()
1161 };
1162 let rpc_client = ClientBuilder::default()
1163 .layer(RetryBackoffLayer::new_with_policy(
1164 2,
1165 100,
1166 100,
1167 simple_json_rpc_retry_policy,
1168 ))
1169 .transport(transport_client.clone(), transport_client.guess_local());
1170
1171 let provider = ProviderBuilder::new().connect_client(rpc_client);
1172
1173 let err = provider
1174 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1175 .await
1176 .expect_err("expected error");
1177
1178 m.assert();
1179 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1180 }
1187
1188 #[tokio::test]
1189 async fn test_client_should_not_retry_on_nonretryable_json_rpc_error() {
1190 let mut server = mockito::Server::new_async().await;
1191
1192 let m = server
1193 .mock("POST", "/")
1194 .with_status(200)
1195 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1196 .with_body(
1197 r#"{
1198 "jsonrpc": "2.0",
1199 "id": 1,
1200 "error": {
1201 "message": "some message",
1202 "code": -32000
1203 }
1204 }"#,
1205 )
1206 .expect(1)
1207 .create();
1208
1209 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1210
1211 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1212 initial_backoff: Duration::from_millis(100),
1213 retryable_json_rpc_errors: vec![],
1214 ..DefaultRetryPolicy::default()
1215 };
1216 let rpc_client = ClientBuilder::default()
1217 .layer(RetryBackoffLayer::new_with_policy(
1218 2,
1219 100,
1220 100,
1221 simple_json_rpc_retry_policy,
1222 ))
1223 .transport(transport_client.clone(), transport_client.guess_local());
1224
1225 let provider = ProviderBuilder::new().connect_client(rpc_client);
1226
1227 let err = provider
1228 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1229 .await
1230 .expect_err("expected error");
1231
1232 m.assert();
1233 assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1234
1235 }
1242
1243 #[tokio::test]
1244 async fn test_client_should_retry_on_nonretryable_json_rpc_error_if_min_retries_is_given() {
1245 let mut server = mockito::Server::new_async().await;
1246
1247 let _m = server
1248 .mock("POST", "/")
1249 .with_status(200)
1250 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1251 .with_body(
1252 r#"{
1253 "jsonrpc": "2.0",
1254 "id": 1,
1255 "error": {
1256 "message": "some message",
1257 "code": -32000
1258 }
1259 }"#,
1260 )
1261 .expect(2)
1262 .create();
1263
1264 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1265
1266 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1267 initial_backoff: Duration::from_millis(100),
1268 retryable_json_rpc_errors: vec![],
1269 min_retries: Some(1),
1270 ..DefaultRetryPolicy::default()
1271 };
1272 let rpc_client = ClientBuilder::default()
1273 .layer(RetryBackoffLayer::new_with_policy(
1274 2,
1275 100,
1276 100,
1277 simple_json_rpc_retry_policy,
1278 ))
1279 .transport(transport_client.clone(), transport_client.guess_local());
1280
1281 let provider = ProviderBuilder::new().connect_client(rpc_client);
1282
1283 let err = provider
1284 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1285 .await
1286 .expect_err("expected error");
1287
1288 assert!(matches!(err, alloy::transports::RpcError::ErrorResp(..)));
1291
1292 }
1299
1300 #[tokio::test]
1301 async fn test_client_should_retry_on_malformed_json_rpc_error() {
1302 let mut server = mockito::Server::new_async().await;
1303
1304 let m = server
1305 .mock("POST", "/")
1306 .with_status(200)
1307 .match_body(mockito::Matcher::PartialJson(json!({"method": "eth_blockNumber"})))
1308 .with_body(
1309 r#"{
1310 "jsonrpc": "2.0",
1311 "error": {
1312 "message": "some message",
1313 "code": -32600
1314 }
1315 }"#,
1316 )
1317 .expect(3)
1318 .create();
1319
1320 let transport_client = ReqwestTransport::new(url::Url::parse(&server.url()).unwrap());
1321
1322 let simple_json_rpc_retry_policy = DefaultRetryPolicy {
1323 initial_backoff: Duration::from_millis(100),
1324 retryable_json_rpc_errors: vec![-32600],
1325 min_retries: Some(1),
1326 ..DefaultRetryPolicy::default()
1327 };
1328 let rpc_client = ClientBuilder::default()
1329 .layer(RetryBackoffLayer::new_with_policy(
1330 2,
1331 100,
1332 100,
1333 simple_json_rpc_retry_policy,
1334 ))
1335 .transport(transport_client.clone(), transport_client.guess_local());
1336
1337 let provider = ProviderBuilder::new().connect_client(rpc_client);
1338
1339 let err = provider
1340 .raw_request::<(), alloy::primitives::U64>("eth_blockNumber".into(), ())
1341 .await
1342 .expect_err("expected error");
1343
1344 m.assert();
1345 assert!(matches!(err, alloy::transports::RpcError::Transport(..)));
1346
1347 }
1354
1355 #[test_log::test(tokio::test)]
1356 async fn test_client_from_file() -> anyhow::Result<()> {
1357 let block_time = Duration::from_millis(1100);
1358 let snapshot_file = NamedTempFile::new()?;
1359
1360 let anvil = create_anvil(Some(block_time));
1361
1362 {
1363 let mut last_number = 0;
1364
1365 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1366
1367 let rpc_client = ClientBuilder::default()
1368 .layer(RetryBackoffLayer::new_with_policy(
1369 2,
1370 100,
1371 100,
1372 DefaultRetryPolicy::default(),
1373 ))
1374 .layer(SnapshotRequestorLayer::new(snapshot_file.path().to_str().unwrap()))
1375 .transport(transport_client.clone(), transport_client.guess_local());
1376
1377 let provider = ProviderBuilder::new().connect_client(rpc_client);
1378
1379 for _ in 0..3 {
1380 sleep(block_time).await;
1381
1382 let num = provider.get_block_number().await?;
1383
1384 assert!(num > last_number, "next block number must be greater");
1385 last_number = num;
1386 }
1387 }
1388
1389 {
1390 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1391
1392 let snapshot_requestor = SnapshotRequestor::new(snapshot_file.path().to_str().unwrap())
1393 .load(true)
1394 .await;
1395
1396 let rpc_client = ClientBuilder::default()
1397 .layer(RetryBackoffLayer::new_with_policy(
1398 2,
1399 100,
1400 100,
1401 DefaultRetryPolicy::default(),
1402 ))
1403 .layer(SnapshotRequestorLayer::from_requestor(snapshot_requestor))
1404 .transport(transport_client.clone(), transport_client.guess_local());
1405
1406 let provider = ProviderBuilder::new().connect_client(rpc_client);
1407
1408 let mut last_number = 0;
1409 for _ in 0..3 {
1410 sleep(block_time).await;
1411
1412 let num = provider.get_block_number().await?;
1413
1414 assert!(num > last_number, "next block number must be greater");
1415 last_number = num;
1416 }
1417 }
1418
1419 Ok(())
1420 }
1421
1422 #[tokio::test]
1423 async fn test_client_should_call_on_gas_oracle_for_eip1559_tx() -> anyhow::Result<()> {
1424 let _ = env_logger::builder().is_test(true).try_init();
1425
1426 let mut server = mockito::Server::new_async().await;
1427
1428 let m = server
1429 .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1430 .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1431 .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"1.1","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1432 .expect(0)
1433 .create();
1434
1435 let anvil = create_anvil(None);
1436 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1437
1438 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1439 let rpc_client = ClientBuilder::default()
1442 .layer(RetryBackoffLayer::new(2, 100, 100))
1443 .transport(transport_client.clone(), transport_client.guess_local());
1444
1445 let provider = ProviderBuilder::new()
1446 .disable_recommended_fillers()
1447 .wallet(signer)
1448 .filler(NonceFiller::new(CachedNonceManager::default()))
1449 .filler(GasOracleFiller::new(
1450 transport_client.client().clone(),
1451 Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1452 ))
1453 .filler(GasFiller)
1454 .connect_client(rpc_client);
1455
1456 let tx = TransactionRequest::default()
1457 .with_chain_id(provider.get_chain_id().await?)
1458 .to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1459 .value(U256::from(100))
1460 .transaction_type(2);
1461
1462 let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1463
1464 m.assert();
1465 assert_eq!(receipt.gas_used, 21000);
1466 Ok(())
1467 }
1468
1469 #[tokio::test]
1470 async fn test_client_should_call_on_gas_oracle_for_legacy_tx() -> anyhow::Result<()> {
1471 let _ = env_logger::builder().is_test(true).try_init();
1472
1473 let mut server = mockito::Server::new_async().await;
1474
1475 let m = server
1476 .mock("GET", "/gasapi.ashx?apikey=key&method=gasoracle")
1477 .with_status(http::StatusCode::ACCEPTED.as_u16().into())
1478 .with_body(r#"{"status":"1","message":"OK","result":{"LastBlock":"39864926","SafeGasPrice":"1.1","ProposeGasPrice":"3.5","FastGasPrice":"1.6","UsdPrice":"0.999968207972734"}}"#)
1479 .expect(1)
1480 .create();
1481
1482 let anvil = create_anvil(None);
1483 let signer: PrivateKeySigner = anvil.keys()[0].clone().into();
1484
1485 let transport_client = ReqwestTransport::new(anvil.endpoint_url());
1486
1487 let rpc_client = ClientBuilder::default()
1488 .layer(RetryBackoffLayer::new(2, 100, 100))
1489 .transport(transport_client.clone(), transport_client.guess_local());
1490
1491 let provider = ProviderBuilder::new()
1492 .disable_recommended_fillers()
1493 .wallet(signer)
1494 .filler(ChainIdFiller::default())
1495 .filler(NonceFiller::new(CachedNonceManager::default()))
1496 .filler(GasOracleFiller::new(
1497 transport_client.client().clone(),
1498 Some((server.url() + "/gasapi.ashx?apikey=key&method=gasoracle").parse()?),
1499 ))
1500 .filler(GasFiller)
1501 .filler(BlobGasFiller)
1502 .connect_client(rpc_client);
1503
1504 let tx = TransactionRequest::default()
1506 .with_to(address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045"))
1507 .with_value(U256::from(100))
1508 .with_gas_price(1000000000);
1509
1510 let receipt = provider.send_transaction(tx).await?.get_receipt().await?;
1511
1512 m.assert();
1513 assert_eq!(receipt.gas_used, 21000);
1514 Ok(())
1515 }
1516}