1#[cfg(feature = "runtime-tokio")]
2use std::num::NonZeroU8;
3use std::sync::Arc;
4
5use dashmap::DashSet;
6use futures::{FutureExt, Stream, StreamExt, stream::BoxStream};
7use hopr_api::{Multiaddr, OffchainKeypair, network::BoxedProcessFn};
8use hopr_utils::network_types::prelude::is_public_address;
9use libp2p::{
10 autonat,
11 identity::PublicKey,
12 swarm::{NetworkInfo, SwarmEvent},
13};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::{
17 HoprNetwork, HoprNetworkBehavior, HoprNetworkBehaviorEvent, PeerDiscovery, constants,
18 errors::Result,
19 utils::{replace_transport_with_unspecified, resolve_dns_if_any},
20};
21
// Telemetry gauges for the libp2p transport. They are compiled only when the
// `telemetry` feature is enabled and never in test builds.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Incremented on every `ConnectionEstablished` swarm event and decremented on every
    // `ConnectionClosed` swarm event.
    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_types::telemetry::SimpleGauge = hopr_types::telemetry::SimpleGauge::new(
        "hopr_transport_p2p_active_connection_count",
        "Number of currently active p2p connections as observed from libp2p events"
    ).unwrap();
    // Updated whenever libp2p autonat reports a status change (0=Unknown, 1=Public, 2=Private).
    static ref METRIC_TRANSPORT_NAT_STATUS: hopr_types::telemetry::SimpleGauge = hopr_types::telemetry::SimpleGauge::new(
        "hopr_transport_p2p_nat_status",
        "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private"
    ).unwrap();
    // Reset to 0 when the network is built and refreshed from the network view's health
    // after connection established/closed events and outgoing connection errors.
    static ref METRIC_NETWORK_HEALTH: hopr_types::telemetry::SimpleGauge =
        hopr_types::telemetry::SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
}
35
/// A libp2p swarm that has been constructed but has not yet been asked to listen on any address.
///
/// Created by [`InactiveNetwork::build`] and converted into an [`InactiveConfiguredNetwork`]
/// by [`InactiveNetwork::with_listen_on`].
pub struct InactiveNetwork {
    // The underlying libp2p swarm running `HoprNetworkBehavior`.
    swarm: libp2p::Swarm<HoprNetworkBehavior>,
}
39
/// Returns the DNS resolver settings used by the swarm on Android and iOS targets:
/// the Cloudflare resolver configuration paired with the default resolver options.
#[cfg(any(target_os = "android", target_os = "ios"))]
fn swarm_dns_config() -> (libp2p::dns::ResolverConfig, libp2p::dns::ResolverOpts) {
    let resolver_config = libp2p::dns::ResolverConfig::cloudflare();
    let resolver_opts = libp2p::dns::ResolverOpts::default();

    (resolver_config, resolver_opts)
}
47
48impl InactiveNetwork {
52 #[cfg(feature = "runtime-tokio")]
53 pub async fn build(
54 me: libp2p::identity::Keypair,
55 external_discovery_events: BoxStream<'static, PeerDiscovery>,
56 ) -> Result<Self> {
57 let me_public: PublicKey = me.public();
58
59 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
60 .with_tokio()
61 .with_tcp(
62 libp2p::tcp::Config::default().nodelay(true),
63 libp2p::noise::Config::new,
64 libp2p::yamux::Config::default,
67 )
68 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
69
70 #[cfg(feature = "transport-quic")]
71 let swarm = swarm.with_quic();
72
73 #[cfg(any(target_os = "android", target_os = "ios"))]
74 let swarm = {
75 let (dns_resolver_config, dns_resolver_opts) = swarm_dns_config();
76 swarm.with_dns_config(dns_resolver_config, dns_resolver_opts)
77 };
78
79 #[cfg(not(any(target_os = "android", target_os = "ios")))]
80 let swarm = swarm
81 .with_dns()
82 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
83
84 Ok(Self {
85 swarm: swarm
86 .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, external_discovery_events))
87 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
88 .with_swarm_config(|cfg| {
89 cfg.with_dial_concurrency_factor(
90 NonZeroU8::new({
91 let v = std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
92 .ok()
93 .and_then(|v| v.trim().parse::<u8>().ok())
94 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT);
95 v.max(1)
96 })
97 .expect("clamped to >= 1, will never fail"),
98 )
99 .with_max_negotiating_inbound_streams(
100 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
101 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
102 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
103 )
104 .with_idle_connection_timeout(
105 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
106 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
107 .map(std::time::Duration::from_secs)
108 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
109 )
110 })
111 .build(),
112 })
113 }
114
115 #[cfg(not(feature = "runtime-tokio"))]
116 pub async fn build<T>(_me: libp2p::identity::Keypair, _external_discovery_events: T) -> Result<Self>
117 where
118 T: Stream<Item = PeerDiscovery> + Send + 'static,
119 {
120 Err(crate::errors::P2PError::Libp2p(
121 "InactiveNetwork::build requires the runtime-tokio feature".to_string(),
122 ))
123 }
124
125 pub fn with_listen_on(mut self, multiaddresses: Vec<Multiaddr>) -> Result<InactiveConfiguredNetwork> {
126 for multiaddress in multiaddresses.iter() {
127 match resolve_dns_if_any(multiaddress) {
128 Ok(ma) => {
129 if let Err(e) = self.swarm.listen_on(ma.clone()) {
130 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
131
132 match replace_transport_with_unspecified(&ma) {
133 Ok(ma) => {
134 if let Err(e) = self.swarm.listen_on(ma.clone()) {
135 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
136 } else {
137 info!(
138 listen_on = ?ma,
139 multiaddress = ?multiaddress,
140 "Listening for p2p connections"
141 );
142 self.swarm.add_external_address(multiaddress.clone());
143 }
144 }
145 Err(e) => {
146 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
147 }
148 }
149 } else {
150 info!(
151 listen_on = ?ma,
152 multiaddress = ?multiaddress,
153 "Listening for p2p connections"
154 );
155 self.swarm.add_external_address(multiaddress.clone());
156 }
157 }
158 Err(error) => error!(%multiaddress, %error, "Failed to transform the multiaddress"),
159 }
160 }
161
162 Ok(InactiveConfiguredNetwork { swarm: self.swarm })
163 }
164}
165
/// A libp2p swarm for which listening on the requested multiaddresses has already been attempted.
///
/// Produced by [`InactiveNetwork::with_listen_on`].
pub struct InactiveConfiguredNetwork {
    // The underlying libp2p swarm running `HoprNetworkBehavior`.
    swarm: libp2p::Swarm<HoprNetworkBehavior>,
}
169
/// Builder of the libp2p-based [`HoprNetwork`] and of the process that drives its swarm.
pub struct HoprLibp2pNetworkBuilder {
    // Stream of peer discovery events that is handed over to the swarm when it is built.
    bootstrap: std::pin::Pin<Box<dyn Stream<Item = PeerDiscovery> + Send + Sync>>,
}
178
179impl HoprLibp2pNetworkBuilder {
180 pub fn new<T>(bootstrap: T) -> Self
181 where
182 T: Stream<Item = PeerDiscovery> + Send + Sync + 'static,
183 {
184 Self {
185 bootstrap: Box::pin(bootstrap),
186 }
187 }
188}
189
190impl std::fmt::Debug for HoprLibp2pNetworkBuilder {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 f.debug_struct("HoprSwarmBuilder").finish_non_exhaustive()
193 }
194}
195
196impl HoprLibp2pNetworkBuilder {
197 pub async fn build(
202 self,
203 identity: &OffchainKeypair,
204 my_multiaddresses: Vec<Multiaddr>,
205 protocol: &'static str,
206 allow_private_addresses: bool,
207 ) -> std::result::Result<(HoprNetwork, BoxedProcessFn), impl std::error::Error> {
208 #[cfg(all(feature = "telemetry", not(test)))]
209 {
210 METRIC_NETWORK_HEALTH.set(0.0);
211 }
212
213 let identity_p2p: libp2p::identity::Keypair = identity.into();
214 let me = identity_p2p.public().to_peer_id();
215 let swarm = InactiveNetwork::build(identity_p2p, self.bootstrap)
216 .await
217 .expect("swarm must be constructible");
218
219 let swarm = swarm
220 .with_listen_on(my_multiaddresses.clone())
221 .expect("swarm must be configurable");
222
223 let swarm = swarm.swarm;
224 let store = crate::peer_store::NetworkPeerStore::new(me, my_multiaddresses.into_iter().collect());
225 let tracker: Arc<DashSet<libp2p::PeerId>> = Default::default();
226
227 let (notifier, event_rx) = async_broadcast::broadcast(1000);
228
229 let network = HoprNetwork {
230 tracker: tracker.clone(),
231 store: Arc::new(store.clone()),
232 control: swarm.behaviour().streams.new_control(),
233 protocol: libp2p::StreamProtocol::new(protocol),
234 event_rx: event_rx.deactivate(),
235 };
236
237 #[cfg(all(feature = "telemetry", not(test)))]
238 let network_inner = network.clone();
239 let mut swarm = swarm;
240 let process = async move {
241 while let Some(event) = swarm.next().await {
242 match event {
243 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
244 SwarmEvent::Behaviour(
245 HoprNetworkBehaviorEvent::Autonat(event)
246 ) => {
247 match event {
248 autonat::Event::StatusChanged { old, new } => {
249 info!(?old, ?new, "AutoNAT status changed");
250 #[cfg(all(feature = "telemetry", not(test)))]
251 {
252 let value = match new {
253 autonat::NatStatus::Unknown => 0.0,
254 autonat::NatStatus::Public(_) => 1.0,
255 autonat::NatStatus::Private => 2.0,
256 };
257 METRIC_TRANSPORT_NAT_STATUS.set(value);
258 }
259 }
260 autonat::Event::InboundProbe { .. } => {}
261 autonat::Event::OutboundProbe { .. } => {}
262 }
263 }
264 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
265 SwarmEvent::ConnectionEstablished {
266 peer_id,
267 connection_id,
268 num_established,
269 established_in,
270 endpoint,
271 ..
272 } => {
274 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
275
276 if num_established == std::num::NonZero::<u32>::new(1).expect("must be a non-zero value") {
277 match endpoint {
278 libp2p::core::ConnectedPoint::Dialer { address, .. } => {
279 if allow_private_addresses || is_public_address(&address) {
280 if let Err(error) = store.add(peer_id, std::collections::HashSet::from([address])) {
281 error!(peer = %peer_id, %error, direction = "outgoing", "failed to add connected peer to the peer store");
282 }
283 } else {
284 debug!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address encountered")
285 }
286 tracker.insert(peer_id);
287 if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
288 error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
289 }
290 },
291 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
292 if allow_private_addresses || is_public_address(&send_back_addr) {
293 if let Err(error) = store.add(peer_id, std::collections::HashSet::from([send_back_addr])) {
294 error!(peer = %peer_id, %error, direction = "incoming", "failed to add connected peer to the peer store");
295 }
296 } else {
297 debug!(transport="libp2p", peer = %peer_id, multiaddress = %send_back_addr, "Private/local peer address ignored")
298 }
299 tracker.insert(peer_id);
300 if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
301 error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
302 }
303 }
304 }
305 } else {
306 trace!(transport="libp2p", peer = %peer_id, num_established, "Additional connection established")
307 }
308
309 print_network_info(swarm.network_info(), "connection established");
310
311 #[cfg(all(feature = "telemetry", not(test)))]
312 {
313 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
314 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
315 }
316 }
317 SwarmEvent::ConnectionClosed {
318 peer_id,
319 connection_id,
320 cause,
321 num_established,
322 ..
323 } => {
325 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
326
327 if num_established == 0 {
328 tracker.remove(&peer_id);
329 if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerDisconnected(peer_id)) {
330 error!(peer = %peer_id, %error, "failed to broadcast peer disconnected event");
331 }
332 }
333
334 print_network_info(swarm.network_info(), "connection closed");
335
336 #[cfg(all(feature = "telemetry", not(test)))]
337 {
338 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
339 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
340 }
341 }
342 SwarmEvent::IncomingConnection {
343 connection_id,
344 local_addr,
345 send_back_addr,
346 } => {
347 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
348 }
349 SwarmEvent::IncomingConnectionError {
350 local_addr,
351 connection_id,
352 error,
353 send_back_addr,
354 peer_id
355 } => {
356 debug!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
357 }
358 SwarmEvent::OutgoingConnectionError {
359 connection_id,
360 error,
361 peer_id
362 } => {
363 debug!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
364
365 if let Some(peer_id) = peer_id
366 && !swarm.is_connected(&peer_id) {
367 if let Err(error) = store.remove(&peer_id) {
368 error!(peer = %peer_id, %error, "failed to remove undialable peer from the peer store");
369 }
370 tracker.remove(&peer_id);
371 if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerDisconnected(peer_id)) {
372 error!(peer = %peer_id, %error, "failed to broadcast peer disconnected event");
373 }
374 }
375
376 #[cfg(all(feature = "telemetry", not(test)))]
377 {
378 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
379 }
380 }
381 SwarmEvent::NewListenAddr {
382 listener_id,
383 address,
384 } => {
385 debug!(%listener_id, %address, transport="libp2p", "new listen address")
386 }
387 SwarmEvent::ExpiredListenAddr {
388 listener_id,
389 address,
390 } => {
391 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
392 }
393 SwarmEvent::ListenerClosed {
394 listener_id,
395 addresses,
396 reason,
397 } => {
398 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
399 }
400 SwarmEvent::ListenerError {
401 listener_id,
402 error,
403 } => {
404 debug!(%listener_id, transport="libp2p", %error, "listener error")
405 }
406 SwarmEvent::Dialing {
407 peer_id,
408 connection_id,
409 } => {
410 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
411 }
412 SwarmEvent::NewExternalAddrCandidate {address} => {
413 debug!(%address, "Detected new external address candidate")
414 }
415 SwarmEvent::ExternalAddrConfirmed { address } => {
416 info!(%address, "Detected external address")
417 }
418 SwarmEvent::ExternalAddrExpired {
419 .. } => {}
421 SwarmEvent::NewExternalAddrOfPeer {
422 peer_id, address
423 } => {
424 if allow_private_addresses || is_public_address(&address) {
426 swarm.add_peer_address(peer_id, address.clone());
427 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
428 } else {
429 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
430 }
431 },
432 _ => trace!(transport="libp2p", "Unsupported enum option detected")
433 }
434 }
435 }.boxed();
436
437 Ok::<_, std::io::Error>((network, Box::new(move || process)))
438 }
439}
440
441fn print_network_info(network_info: NetworkInfo, event: &str) {
442 let num_peers = network_info.num_peers();
443 let connection_counters = network_info.connection_counters();
444 let num_incoming = connection_counters.num_established_incoming();
445 let num_outgoing = connection_counters.num_established_outgoing();
446 info!(
447 num_peers,
448 num_incoming, num_outgoing, "swarm network status after {event}"
449 );
450}
451
#[cfg(test)]
mod tests {
    /// On Android and iOS the swarm DNS settings must be the Cloudflare resolver configuration
    /// combined with the default resolver options.
    #[test]
    #[cfg(any(target_os = "android", target_os = "ios"))]
    fn uses_cloudflare_dns_resolver_config() {
        let expected_config = libp2p::dns::ResolverConfig::cloudflare();
        let expected_opts = libp2p::dns::ResolverOpts::default();

        let (actual_config, actual_opts) = super::swarm_dns_config();

        assert_eq!(actual_config, expected_config);
        assert_eq!(actual_opts, expected_opts);
    }
}