hoprd_localcluster/
client_helper.rs1use std::process::Child;
2
3use anyhow::{Context, Result};
4use futures::future::try_join_all;
5use hopr_lib::HoprBalance;
6use hoprd_api_client;
7use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
8
9#[derive(Debug, Clone)]
10pub struct HoprdApiClient {
11 inner: hoprd_api_client::Client,
12}
13
14impl HoprdApiClient {
15 pub fn new(base_url: String, token: Option<String>) -> Result<Self> {
16 let mut headers = HeaderMap::new();
17 if let Some(token) = token {
18 let value = format!("Bearer {token}");
19 headers.insert(
20 AUTHORIZATION,
21 HeaderValue::from_str(&value).context("invalid api token")?,
22 );
23 }
24
25 let http_client = reqwest::ClientBuilder::new()
26 .timeout(std::time::Duration::from_secs(10))
27 .default_headers(headers)
28 .build()
29 .context("failed to build http client")?;
30
31 Ok(Self {
32 inner: hoprd_api_client::Client::new_with_client(base_url.as_ref(), http_client),
33 })
34 }
35
36 pub async fn wait_started(&self, timeout: std::time::Duration) -> Result<()> {
37 self.wait_status("/startedz", timeout).await
38 }
39
40 pub async fn wait_ready(&self, timeout: std::time::Duration) -> Result<()> {
41 self.wait_status("/readyz", timeout).await
42 }
43
44 async fn wait_status(&self, path: &str, timeout: std::time::Duration) -> Result<()> {
45 let start = std::time::Instant::now();
46 loop {
47 let ready = match path {
48 "/startedz" => self.inner.startedz().await,
49 "/readyz" => self.inner.readyz().await,
50 _ => anyhow::bail!("unknown status path: {path}"),
51 };
52 if ready.is_ok() {
53 return Ok(());
54 }
55
56 if start.elapsed() > timeout {
57 anyhow::bail!("timeout while waiting for {}", path);
58 }
59
60 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
61 }
62 }
63
64 pub async fn addresses(&self) -> Result<String> {
65 let response = self.inner.addresses().await?;
66 Ok(response.into_inner().native)
67 }
68
69 pub async fn open_channel(&self, destination: &str, amount: &str) -> Result<()> {
70 let req = hoprd_api_client::types::OpenChannelBodyRequest {
71 amount: amount.to_string(),
72 destination: destination.to_string(),
73 };
74 let _ = self.inner.open_channel(&req).await?;
75 Ok(())
76 }
77}
78
79pub struct NodeProcess {
80 pub id: usize,
81 pub api_port: u16,
82 pub p2p_port: u16,
83 pub api: HoprdApiClient,
84 pub child: Child,
85 pub address: Option<String>,
86}
87
88pub async fn open_full_mesh_channels(nodes: &[NodeProcess], amount: &HoprBalance) -> Result<()> {
89 let amount = amount.to_string();
90 let mut tasks = Vec::new();
91 for src in nodes {
92 let Some(src_addr) = src.address.clone() else {
93 anyhow::bail!("node {} address missing", src.id);
94 };
95 for dst in nodes {
96 let Some(dst_addr) = dst.address.clone() else {
97 anyhow::bail!("node {} address missing", dst.id);
98 };
99 if src_addr == dst_addr {
100 continue;
101 }
102 let api = src.api.clone();
103 let amount = amount.clone();
104 tasks.push(async move { api.open_channel(&dst_addr, &amount).await });
105 }
106 }
107
108 try_join_all(tasks).await.context("failed to open channels")?;
109 Ok(())
110}