1#[cfg(feature = "session-server")]
2pub mod exit;
3
4#[cfg(feature = "session-server")]
5pub mod config;
6
7#[cfg(any(feature = "testing", test))]
8pub mod testing;
9
10use std::sync::Arc;
11
12#[cfg(feature = "runtime-tokio")]
13pub use hopr_lib;
14use hopr_ticket_manager::{HoprTicketManager, RedbStore, RedbTicketQueue};
15#[cfg(feature = "runtime-tokio")]
16use {
17 hopr_chain_connector::{
18 BlockchainConnectorConfig, HoprBlockchainSafeConnector,
19 api::HoprChainApi,
20 blokli_client::{BlokliClient, BlokliClientConfig},
21 create_trustful_hopr_blokli_connector,
22 },
23 hopr_lib::builder::{ChainKeypair, Keypair, OffchainKeypair},
24 hopr_lib::{Hopr, config::HoprLibConfig},
25 hopr_network_graph::{ChannelGraph, SharedChannelGraph},
26 hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork},
27 validator::Validate,
28};
29
30#[cfg(feature = "session-server")]
31use crate::{config::SessionIpForwardingConfig, exit::HoprServerIpForwardingReactor};
32
33pub type SharedTicketManager = Arc<HoprTicketManager<RedbStore, RedbTicketQueue>>;
35
36#[cfg(feature = "runtime-tokio")]
38pub type ReferenceHopr =
39 Hopr<Arc<HoprBlockchainSafeConnector<BlokliClient>>, SharedChannelGraph, HoprNetwork, SharedTicketManager>;
40
41#[cfg(feature = "runtime-tokio")]
43pub async fn build_reference(
44 identity: (&ChainKeypair, &OffchainKeypair),
45 config: HoprLibConfig,
46 blokli_url: String,
47 #[cfg(feature = "session-server")] server_config: SessionIpForwardingConfig,
48) -> anyhow::Result<Arc<ReferenceHopr>> {
49 let (chain_key, packet_key) = identity;
50
51 let mut chain_connector = create_trustful_hopr_blokli_connector(
52 chain_key,
53 BlockchainConnectorConfig {
54 connection_sync_timeout: std::time::Duration::from_mins(1),
55 sync_tolerance: 90,
56 tx_timeout_multiplier: std::env::var("HOPR_TX_TIMEOUT_MULTIPLIER")
57 .ok()
58 .and_then(|p| {
59 p.parse()
60 .inspect_err(|error| tracing::warn!(%error, "failed to parse HOPR_TX_TIMEOUT_MULTIPLIER"))
61 .ok()
62 })
63 .unwrap_or_else(|| BlockchainConnectorConfig::default().tx_timeout_multiplier),
64 },
65 BlokliClient::new(
66 blokli_url.parse()?,
67 BlokliClientConfig {
68 timeout: std::time::Duration::from_secs(30),
69 stream_reconnect_timeout: std::time::Duration::from_secs(30),
70 subscription_stream_restart_delay: Some(std::time::Duration::from_secs(1)),
71 ..Default::default()
72 },
73 ),
74 config.safe_module.module_address,
75 )
76 .await?;
77 chain_connector.connect().await?;
78 let chain_connector = Arc::new(chain_connector);
79
80 #[cfg(feature = "session-server")]
81 let session_server = HoprServerIpForwardingReactor::new(packet_key.clone(), server_config);
82
83 build_with_chain(
84 chain_key,
85 packet_key,
86 config,
87 None,
88 chain_connector,
89 #[cfg(feature = "session-server")]
90 session_server,
91 )
92 .await
93}
94
95#[cfg(feature = "runtime-tokio")]
98pub async fn build_with_chain<
99 Chain,
100 #[cfg(feature = "session-server")] Srv: hopr_lib::api::node::HoprSessionServer<Session = hopr_lib::IncomingSession, Error: std::fmt::Display>
101 + Clone
102 + Send
103 + 'static,
104>(
105 chain_key: &ChainKeypair,
106 packet_key: &OffchainKeypair,
107 config: HoprLibConfig,
108 probe_cfg: Option<hopr_ct_full_network::ProberConfig>,
109 chain_connector: Chain,
110 #[cfg(feature = "session-server")] server: Srv,
111) -> anyhow::Result<Arc<Hopr<Chain, SharedChannelGraph, HoprNetwork, SharedTicketManager>>>
112where
113 Chain: HoprChainApi + Clone + Send + Sync + 'static,
114{
115 if let Some(ref pcfg) = probe_cfg {
116 pcfg.validate()
117 .map_err(|e| anyhow::anyhow!("invalid ProberConfig: {e}"))?;
118 let probe_timeout = config.protocol.probe.timeout;
119 anyhow::ensure!(
120 pcfg.interval >= probe_timeout,
121 "ProberConfig interval ({:?}) must be >= ProbeConfig timeout ({:?})",
122 pcfg.interval,
123 probe_timeout,
124 );
125 }
126
127 let backend = RedbStore::new_temp().map_err(hopr_ticket_manager::TicketManagerError::store)?;
128 let (ticket_manager, ticket_factory) = HoprTicketManager::new_with_factory(backend);
129 let ticket_manager = Arc::new(ticket_manager);
130 let ticket_factory = Arc::new(ticket_factory);
131
132 {
134 use futures::StreamExt;
135 use hopr_lib::api::chain::ChannelSelector;
136
137 let me = chain_connector.me();
138 let incoming_channels: Vec<_> = chain_connector
139 .stream_channels(ChannelSelector::default().with_destination(*me))
140 .map_err(|e| anyhow::anyhow!("failed to stream incoming channels: {e}"))?
141 .collect()
142 .await;
143 ticket_manager
144 .sync_from_incoming_channels(&incoming_channels)
145 .map_err(|e| anyhow::anyhow!("failed to sync ticket manager: {e}"))?;
146
147 let outgoing_channels: Vec<_> = chain_connector
148 .stream_channels(ChannelSelector::default().with_source(*me))
149 .map_err(|e| anyhow::anyhow!("failed to stream outgoing channels: {e}"))?
150 .collect()
151 .await;
152 ticket_factory
153 .sync_from_outgoing_channels(&outgoing_channels)
154 .map_err(|e| anyhow::anyhow!("failed to sync ticket factory: {e}"))?;
155 }
156
157 let (peer_discovery_tx, peer_discovery_rx) = futures::channel::mpsc::channel(2048);
159 {
160 use futures::{SinkExt, StreamExt};
161 use hopr_lib::api::chain::StateSyncOptions;
162 let chain_events = chain_connector
163 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
164 .map_err(|e| anyhow::anyhow!("failed to subscribe to chain events: {e}"))?;
165 let tx = peer_discovery_tx;
166 tokio::spawn(async move {
167 chain_events
168 .for_each(|event| {
169 let mut tx = tx.clone();
170 async move {
171 if let hopr_lib::api::types::chain::chain_events::ChainEvent::Announcement(account) = event {
172 let peer_id: hopr_lib::api::PeerId = account.public_key.into();
173 if let Err(error) = tx
174 .send(hopr_transport_p2p::PeerDiscovery::Announce(
175 peer_id,
176 account.get_multiaddrs().to_vec(),
177 ))
178 .await
179 {
180 tracing::error!(%peer_id, %error, "failed to send peer discovery event");
181 }
182 }
183 }
184 })
185 .await;
186 });
187 }
188
189 let prober_cfg = probe_cfg.unwrap_or_default();
190 let graph: SharedChannelGraph = Arc::new(ChannelGraph::new(*packet_key.public()));
191 let graph_for_ct = graph.clone();
192
193 let safe_address = config.safe_module.safe_address;
194 let module_address = config.safe_module.module_address;
195
196 let builder = hopr_lib::builder::HoprBuilder::new()
197 .with_identity(chain_key, packet_key)
198 .with_config(config)
199 .with_safe_module(&safe_address, &module_address)
200 .with_chain_api(move |_ctx| chain_connector)
201 .with_graph(move |_ctx| graph)
202 .with_network(move |ctx| {
203 Box::pin(async move {
204 let multiaddresses = vec![
205 (&ctx.cfg.host)
206 .try_into()
207 .expect("host config must be a valid multiaddress"),
208 ];
209 let nb = HoprLibp2pNetworkBuilder::new(peer_discovery_rx);
210 nb.build(
211 &ctx.packet_key,
212 multiaddresses,
213 "/hopr/mix/1.1.0",
214 ctx.cfg.protocol.transport.prefer_local_addresses,
215 )
216 .await
217 .expect("network must be constructible")
218 })
219 })
220 .with_cover_traffic(move |ctx| {
221 hopr_ct_full_network::FullNetworkDiscovery::new(*ctx.packet_key.public(), prober_cfg, graph_for_ct)
222 });
223
224 #[cfg(feature = "session-server")]
225 let builder = builder.with_session_server(server);
226
227 let node = builder.build_full(ticket_manager, ticket_factory).await?;
228
229 Ok(Arc::new(node))
230}