1#[cfg(feature = "runtime-tokio")]
2use std::num::NonZeroU8;
3use std::sync::Arc;
4
5use dashmap::DashSet;
6use futures::{FutureExt, Stream, StreamExt};
7use hopr_api::{Multiaddr, OffchainKeypair, network::BoxedProcessFn};
8use hopr_utils::network_types::prelude::is_public_address;
9use libp2p::{
10 autonat,
11 swarm::{NetworkInfo, SwarmEvent},
12};
13use tracing::{debug, error, info, trace, warn};
14
15use crate::{
16 HoprNetwork, HoprNetworkBehavior, HoprNetworkBehaviorEvent, PeerDiscovery,
17 errors::Result,
18 utils::{replace_transport_with_unspecified, resolve_dns_if_any},
19};
20
21#[cfg(all(feature = "telemetry", not(test)))]
22lazy_static::lazy_static! {
23 static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_api::types::telemetry::SimpleGauge = hopr_api::types::telemetry::SimpleGauge::new(
24 "hopr_transport_p2p_active_connection_count",
25 "Number of currently active p2p connections as observed from libp2p events"
26 ).unwrap();
27 static ref METRIC_TRANSPORT_NAT_STATUS: hopr_api::types::telemetry::SimpleGauge = hopr_api::types::telemetry::SimpleGauge::new(
28 "hopr_transport_p2p_nat_status",
29 "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private"
30 ).unwrap();
31 static ref METRIC_NETWORK_HEALTH: hopr_api::types::telemetry::SimpleGauge =
32 hopr_api::types::telemetry::SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
33}
34
35pub struct InactiveNetwork {
36 swarm: libp2p::Swarm<HoprNetworkBehavior>,
37}
38
39#[cfg(any(target_os = "android", target_os = "ios"))]
40fn swarm_dns_config() -> (libp2p::dns::ResolverConfig, libp2p::dns::ResolverOpts) {
41 (
42 libp2p::dns::ResolverConfig::cloudflare(),
43 libp2p::dns::ResolverOpts::default(),
44 )
45}
46
47impl InactiveNetwork {
51 #[cfg(feature = "runtime-tokio")]
52 pub async fn build(
53 me: libp2p::identity::Keypair,
54 external_discovery_events: futures::stream::BoxStream<'static, PeerDiscovery>,
55 ) -> Result<Self> {
56 let me_public: libp2p::identity::PublicKey = me.public();
57
58 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
59 .with_tokio()
60 .with_tcp(
61 libp2p::tcp::Config::default().nodelay(true),
62 libp2p::noise::Config::new,
63 libp2p::yamux::Config::default,
66 )
67 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
68
69 #[cfg(feature = "transport-quic")]
70 let swarm = swarm.with_quic();
71
72 #[cfg(any(target_os = "android", target_os = "ios"))]
73 let swarm = {
74 let (dns_resolver_config, dns_resolver_opts) = swarm_dns_config();
75 swarm.with_dns_config(dns_resolver_config, dns_resolver_opts)
76 };
77
78 #[cfg(not(any(target_os = "android", target_os = "ios")))]
79 let swarm = swarm
80 .with_dns()
81 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
82
83 Ok(Self {
84 swarm: swarm
85 .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, external_discovery_events))
86 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
87 .with_swarm_config(|cfg| {
88 cfg.with_dial_concurrency_factor(
89 NonZeroU8::new({
90 let v = std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
91 .ok()
92 .and_then(|v| v.trim().parse::<u8>().ok())
93 .unwrap_or(crate::constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT);
94 v.max(1)
95 })
96 .expect("clamped to >= 1, will never fail"),
97 )
98 .with_max_negotiating_inbound_streams(
99 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
100 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
101 .unwrap_or(crate::constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
102 )
103 .with_idle_connection_timeout(
104 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
105 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
106 .map(std::time::Duration::from_secs)
107 .unwrap_or(crate::constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
108 )
109 })
110 .build(),
111 })
112 }
113
114 #[cfg(not(feature = "runtime-tokio"))]
115 pub async fn build<T>(_me: libp2p::identity::Keypair, _external_discovery_events: T) -> Result<Self>
116 where
117 T: Stream<Item = PeerDiscovery> + Send + 'static,
118 {
119 Err(crate::errors::P2PError::Libp2p(
120 "InactiveNetwork::build requires the runtime-tokio feature".to_string(),
121 ))
122 }
123
124 pub fn with_listen_on(mut self, multiaddresses: Vec<Multiaddr>) -> Result<InactiveConfiguredNetwork> {
125 for multiaddress in multiaddresses.iter() {
126 match resolve_dns_if_any(multiaddress) {
127 Ok(ma) => {
128 if let Err(e) = self.swarm.listen_on(ma.clone()) {
129 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
130
131 match replace_transport_with_unspecified(&ma) {
132 Ok(ma) => {
133 if let Err(e) = self.swarm.listen_on(ma.clone()) {
134 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
135 } else {
136 info!(
137 listen_on = ?ma,
138 multiaddress = ?multiaddress,
139 "Listening for p2p connections"
140 );
141 self.swarm.add_external_address(multiaddress.clone());
142 }
143 }
144 Err(e) => {
145 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
146 }
147 }
148 } else {
149 info!(
150 listen_on = ?ma,
151 multiaddress = ?multiaddress,
152 "Listening for p2p connections"
153 );
154 self.swarm.add_external_address(multiaddress.clone());
155 }
156 }
157 Err(error) => error!(%multiaddress, %error, "Failed to transform the multiaddress"),
158 }
159 }
160
161 Ok(InactiveConfiguredNetwork { swarm: self.swarm })
162 }
163}
164
165pub struct InactiveConfiguredNetwork {
166 swarm: libp2p::Swarm<HoprNetworkBehavior>,
167}
168
169pub struct HoprLibp2pNetworkBuilder {
175 bootstrap: std::pin::Pin<Box<dyn Stream<Item = PeerDiscovery> + Send + Sync>>,
176}
177
178impl HoprLibp2pNetworkBuilder {
179 pub fn new<T>(bootstrap: T) -> Self
180 where
181 T: Stream<Item = PeerDiscovery> + Send + Sync + 'static,
182 {
183 Self {
184 bootstrap: Box::pin(bootstrap),
185 }
186 }
187}
188
189impl std::fmt::Debug for HoprLibp2pNetworkBuilder {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 f.debug_struct("HoprSwarmBuilder").finish_non_exhaustive()
192 }
193}
194
195impl HoprLibp2pNetworkBuilder {
196 pub async fn build(
201 self,
202 identity: &OffchainKeypair,
203 my_multiaddresses: Vec<Multiaddr>,
204 protocol: &'static str,
205 allow_private_addresses: bool,
206 ) -> std::result::Result<(HoprNetwork, BoxedProcessFn), impl std::error::Error> {
207 #[cfg(all(feature = "telemetry", not(test)))]
208 {
209 METRIC_NETWORK_HEALTH.set(0.0);
210 }
211
212 let identity_p2p: libp2p::identity::Keypair = identity.into();
213 let me = identity_p2p.public().to_peer_id();
214 let swarm = InactiveNetwork::build(identity_p2p, self.bootstrap)
215 .await
216 .expect("swarm must be constructible");
217
218 let swarm = swarm
219 .with_listen_on(my_multiaddresses.clone())
220 .expect("swarm must be configurable");
221
222 let swarm = swarm.swarm;
223 let store = crate::peer_store::NetworkPeerStore::new(me, my_multiaddresses.into_iter().collect());
224 let tracker: Arc<DashSet<libp2p::PeerId>> = Default::default();
225 let liveness: crate::liveness::LivenessRegistry = Default::default();
226
227 let (notifier, event_rx) = async_broadcast::broadcast(1000);
228
229 let network = HoprNetwork {
230 tracker: tracker.clone(),
231 store: Arc::new(store.clone()),
232 control: swarm.behaviour().streams.new_control(),
233 protocol: libp2p::StreamProtocol::new(protocol),
234 event_rx: event_rx.deactivate(),
235 liveness: liveness.clone(),
236 };
237
238 #[cfg(all(feature = "telemetry", not(test)))]
239 let network_inner = network.clone();
240 let mut swarm = swarm;
241 let process = async move {
242 let disconnect_peer = |peer_id: libp2p::PeerId| {
245 tracker.remove(&peer_id);
246 liveness.remove(&peer_id);
247 if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerDisconnected(peer_id)) {
248 error!(peer = %peer_id, %error, "failed to broadcast peer disconnected event");
249 }
250 };
251
252 while let Some(event) = swarm.next().await {
253 match event {
254 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
255 SwarmEvent::Behaviour(
256 HoprNetworkBehaviorEvent::Autonat(event)
257 ) => {
258 match event {
259 autonat::Event::StatusChanged { old, new } => {
260 info!(?old, ?new, "AutoNAT status changed");
261 #[cfg(all(feature = "telemetry", not(test)))]
262 {
263 let value = match new {
264 autonat::NatStatus::Unknown => 0.0,
265 autonat::NatStatus::Public(_) => 1.0,
266 autonat::NatStatus::Private => 2.0,
267 };
268 METRIC_TRANSPORT_NAT_STATUS.set(value);
269 }
270 }
271 autonat::Event::InboundProbe { .. } => {}
272 autonat::Event::OutboundProbe { .. } => {}
273 }
274 }
275 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
276 SwarmEvent::ConnectionEstablished {
277 peer_id,
278 connection_id,
279 num_established,
280 established_in,
281 endpoint,
282 ..
283 } => {
285 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
286
287 if num_established == std::num::NonZero::<u32>::new(1).expect("must be a non-zero value") {
288 match endpoint {
289 libp2p::core::ConnectedPoint::Dialer { address, .. } => {
290 if allow_private_addresses || is_public_address(&address) {
291 if let Err(error) = store.add(peer_id, std::collections::HashSet::from([address])) {
292 error!(peer = %peer_id, %error, direction = "outgoing", "failed to add connected peer to the peer store");
293 }
294 } else {
295 debug!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address encountered")
296 }
297 tracker.insert(peer_id);
298 if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
299 error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
300 }
301 },
302 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
303 if allow_private_addresses || is_public_address(&send_back_addr) {
304 if let Err(error) = store.add(peer_id, std::collections::HashSet::from([send_back_addr])) {
305 error!(peer = %peer_id, %error, direction = "incoming", "failed to add connected peer to the peer store");
306 }
307 } else {
308 debug!(transport="libp2p", peer = %peer_id, multiaddress = %send_back_addr, "Private/local peer address ignored")
309 }
310 tracker.insert(peer_id);
311 if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
312 error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
313 }
314 }
315 }
316 } else {
317 trace!(transport="libp2p", peer = %peer_id, num_established, "Additional connection established")
318 }
319
320 print_network_info(swarm.network_info(), "connection established");
321
322 #[cfg(all(feature = "telemetry", not(test)))]
323 {
324 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
325 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
326 }
327 }
328 SwarmEvent::ConnectionClosed {
329 peer_id,
330 connection_id,
331 cause,
332 num_established,
333 ..
334 } => {
336 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
337
338 if num_established == 0 {
339 disconnect_peer(peer_id);
340 }
341
342 print_network_info(swarm.network_info(), "connection closed");
343
344 #[cfg(all(feature = "telemetry", not(test)))]
345 {
346 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
347 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
348 }
349 }
350 SwarmEvent::IncomingConnection {
351 connection_id,
352 local_addr,
353 send_back_addr,
354 } => {
355 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
356 }
357 SwarmEvent::IncomingConnectionError {
358 local_addr,
359 connection_id,
360 error,
361 send_back_addr,
362 peer_id
363 } => {
364 debug!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
365 }
366 SwarmEvent::OutgoingConnectionError {
367 connection_id,
368 error,
369 peer_id
370 } => {
371 debug!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
372
373 if let Some(peer_id) = peer_id
374 && !swarm.is_connected(&peer_id) {
375 if let Err(error) = store.remove(&peer_id) {
376 error!(peer = %peer_id, %error, "failed to remove undialable peer from the peer store");
377 }
378 disconnect_peer(peer_id);
379 }
380
381 #[cfg(all(feature = "telemetry", not(test)))]
382 {
383 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
384 }
385 }
386 SwarmEvent::NewListenAddr {
387 listener_id,
388 address,
389 } => {
390 debug!(%listener_id, %address, transport="libp2p", "new listen address")
391 }
392 SwarmEvent::ExpiredListenAddr {
393 listener_id,
394 address,
395 } => {
396 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
397 }
398 SwarmEvent::ListenerClosed {
399 listener_id,
400 addresses,
401 reason,
402 } => {
403 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
404 }
405 SwarmEvent::ListenerError {
406 listener_id,
407 error,
408 } => {
409 debug!(%listener_id, transport="libp2p", %error, "listener error")
410 }
411 SwarmEvent::Dialing {
412 peer_id,
413 connection_id,
414 } => {
415 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
416 }
417 SwarmEvent::NewExternalAddrCandidate {address} => {
418 debug!(%address, "Detected new external address candidate")
419 }
420 SwarmEvent::ExternalAddrConfirmed { address } => {
421 info!(%address, "Detected external address")
422 }
423 SwarmEvent::ExternalAddrExpired {
424 .. } => {}
426 SwarmEvent::NewExternalAddrOfPeer {
427 peer_id, address
428 } => {
429 if allow_private_addresses || is_public_address(&address) {
431 swarm.add_peer_address(peer_id, address.clone());
432 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
433 } else {
434 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
435 }
436 },
437 _ => trace!(transport="libp2p", "Unsupported enum option detected")
438 }
439 }
440 }.boxed();
441
442 Ok::<_, std::io::Error>((network, Box::new(move || process)))
443 }
444}
445
446fn print_network_info(network_info: NetworkInfo, event: &str) {
447 let num_peers = network_info.num_peers();
448 let connection_counters = network_info.connection_counters();
449 let num_incoming = connection_counters.num_established_incoming();
450 let num_outgoing = connection_counters.num_established_outgoing();
451 info!(
452 num_peers,
453 num_incoming, num_outgoing, "swarm network status after {event}"
454 );
455}
456
457#[cfg(test)]
458mod tests {
459 #[test]
460 #[cfg(any(target_os = "android", target_os = "ios"))]
461 fn uses_cloudflare_dns_resolver_config() {
462 let (resolver_config, resolver_opts) = super::swarm_dns_config();
463 assert_eq!(resolver_config, libp2p::dns::ResolverConfig::cloudflare());
464 assert_eq!(resolver_opts, libp2p::dns::ResolverOpts::default());
465 }
466}