Skip to main content

hoprd_localcluster/
client_helper.rs

1use std::process::Child;
2
3use anyhow::{Context, Result};
4use futures::future::try_join_all;
5use hopr_lib::HoprBalance;
6use hoprd_api_client;
7use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
8
9#[derive(Debug, Clone)]
10pub struct HoprdApiClient {
11    inner: hoprd_api_client::Client,
12}
13
14impl HoprdApiClient {
15    pub fn new(base_url: String, token: Option<String>) -> Result<Self> {
16        let mut headers = HeaderMap::new();
17        if let Some(token) = token {
18            let value = format!("Bearer {token}");
19            headers.insert(
20                AUTHORIZATION,
21                HeaderValue::from_str(&value).context("invalid api token")?,
22            );
23        }
24
25        let http_client = reqwest::ClientBuilder::new()
26            .timeout(std::time::Duration::from_secs(10))
27            .default_headers(headers)
28            .build()
29            .context("failed to build http client")?;
30
31        Ok(Self {
32            inner: hoprd_api_client::Client::new_with_client(base_url.as_ref(), http_client),
33        })
34    }
35
36    pub async fn wait_started(&self, timeout: std::time::Duration) -> Result<()> {
37        self.wait_status("/startedz", timeout).await
38    }
39
40    pub async fn wait_ready(&self, timeout: std::time::Duration) -> Result<()> {
41        self.wait_status("/readyz", timeout).await
42    }
43
44    async fn wait_status(&self, path: &str, timeout: std::time::Duration) -> Result<()> {
45        let start = std::time::Instant::now();
46        loop {
47            let ready = match path {
48                "/startedz" => self.inner.startedz().await,
49                "/readyz" => self.inner.readyz().await,
50                _ => anyhow::bail!("unknown status path: {path}"),
51            };
52            if ready.is_ok() {
53                return Ok(());
54            }
55
56            if start.elapsed() > timeout {
57                anyhow::bail!("timeout while waiting for {}", path);
58            }
59
60            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
61        }
62    }
63
64    pub async fn addresses(&self) -> Result<String> {
65        let response = self.inner.addresses().await?;
66        Ok(response.into_inner().native)
67    }
68
69    pub async fn open_channel(&self, destination: &str, amount: &str) -> Result<()> {
70        let req = hoprd_api_client::types::OpenChannelBodyRequest {
71            amount: amount.to_string(),
72            destination: destination.to_string(),
73        };
74        let _ = self.inner.open_channel(&req).await?;
75        Ok(())
76    }
77}
78
79pub struct NodeProcess {
80    pub id: usize,
81    pub api_port: u16,
82    pub p2p_port: u16,
83    pub api: HoprdApiClient,
84    pub child: Child,
85    pub address: Option<String>,
86}
87
88pub async fn open_full_mesh_channels(nodes: &[NodeProcess], amount: &HoprBalance) -> Result<()> {
89    let amount = amount.to_string();
90    let mut tasks = Vec::new();
91    for src in nodes {
92        let Some(src_addr) = src.address.clone() else {
93            anyhow::bail!("node {} address missing", src.id);
94        };
95        for dst in nodes {
96            let Some(dst_addr) = dst.address.clone() else {
97                anyhow::bail!("node {} address missing", dst.id);
98            };
99            if src_addr == dst_addr {
100                continue;
101            }
102            let api = src.api.clone();
103            let amount = amount.clone();
104            tasks.push(async move { api.open_channel(&dst_addr, &amount).await });
105        }
106    }
107
108    try_join_all(tasks).await.context("failed to open channels")?;
109    Ok(())
110}