Skip to main content

hopr_transport_p2p/
swarm.rs

1#[cfg(feature = "runtime-tokio")]
2use std::num::NonZeroU8;
3use std::sync::Arc;
4
5use dashmap::DashSet;
6use futures::{FutureExt, Stream, StreamExt};
7use hopr_api::{Multiaddr, OffchainKeypair, network::BoxedProcessFn};
8use hopr_utils::network_types::prelude::is_public_address;
9use libp2p::{
10    autonat,
11    swarm::{NetworkInfo, SwarmEvent},
12};
13use tracing::{debug, error, info, trace, warn};
14
15use crate::{
16    HoprNetwork, HoprNetworkBehavior, HoprNetworkBehaviorEvent, PeerDiscovery,
17    errors::Result,
18    utils::{replace_transport_with_unspecified, resolve_dns_if_any},
19};
20
/// Gauge counting the currently active p2p connections, driven by the
/// `ConnectionEstablished` / `ConnectionClosed` swarm events.
///
/// Lazily created on first access; creation failure is treated as a bug and panics.
#[cfg(all(feature = "telemetry", not(test)))]
static METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: std::sync::LazyLock<hopr_api::types::telemetry::SimpleGauge> =
    std::sync::LazyLock::new(|| {
        hopr_api::types::telemetry::SimpleGauge::new(
            "hopr_transport_p2p_active_connection_count",
            "Number of currently active p2p connections as observed from libp2p events",
        )
        .expect("the active connection count gauge must be constructible")
    });

/// Gauge exposing the NAT status reported by libp2p autonat
/// (0 = Unknown, 1 = Public, 2 = Private).
///
/// Lazily created on first access; creation failure is treated as a bug and panics.
#[cfg(all(feature = "telemetry", not(test)))]
static METRIC_TRANSPORT_NAT_STATUS: std::sync::LazyLock<hopr_api::types::telemetry::SimpleGauge> =
    std::sync::LazyLock::new(|| {
        hopr_api::types::telemetry::SimpleGauge::new(
            "hopr_transport_p2p_nat_status",
            "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private",
        )
        .expect("the NAT status gauge must be constructible")
    });

/// Gauge exposing the connectivity health indicator of the network.
///
/// Lazily created on first access; creation failure is treated as a bug and panics.
#[cfg(all(feature = "telemetry", not(test)))]
static METRIC_NETWORK_HEALTH: std::sync::LazyLock<hopr_api::types::telemetry::SimpleGauge> =
    std::sync::LazyLock::new(|| {
        hopr_api::types::telemetry::SimpleGauge::new("hopr_network_health", "Connectivity health indicator")
            .expect("the network health gauge must be constructible")
    });
34
35pub struct InactiveNetwork {
36    swarm: libp2p::Swarm<HoprNetworkBehavior>,
37}
38
/// Returns the DNS resolver configuration used for the swarm on Android and iOS:
/// the Cloudflare resolver configuration paired with the default resolver options.
#[cfg(any(target_os = "android", target_os = "ios"))]
fn swarm_dns_config() -> (libp2p::dns::ResolverConfig, libp2p::dns::ResolverOpts) {
    use libp2p::dns::{ResolverConfig, ResolverOpts};

    let resolver_config = ResolverConfig::cloudflare();
    let resolver_opts = ResolverOpts::default();

    (resolver_config, resolver_opts)
}
46
47/// Build objects comprising an inactive p2p network.
48///
49/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
50impl InactiveNetwork {
51    #[cfg(feature = "runtime-tokio")]
52    pub async fn build(
53        me: libp2p::identity::Keypair,
54        external_discovery_events: futures::stream::BoxStream<'static, PeerDiscovery>,
55    ) -> Result<Self> {
56        let me_public: libp2p::identity::PublicKey = me.public();
57
58        let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
59            .with_tokio()
60            .with_tcp(
61                libp2p::tcp::Config::default().nodelay(true),
62                libp2p::noise::Config::new,
63                // use default yamux configuration to enable auto-tuning
64                // see https://github.com/libp2p/rust-libp2p/pull/4970
65                libp2p::yamux::Config::default,
66            )
67            .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
68
69        #[cfg(feature = "transport-quic")]
70        let swarm = swarm.with_quic();
71
72        #[cfg(any(target_os = "android", target_os = "ios"))]
73        let swarm = {
74            let (dns_resolver_config, dns_resolver_opts) = swarm_dns_config();
75            swarm.with_dns_config(dns_resolver_config, dns_resolver_opts)
76        };
77
78        #[cfg(not(any(target_os = "android", target_os = "ios")))]
79        let swarm = swarm
80            .with_dns()
81            .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
82
83        Ok(Self {
84            swarm: swarm
85                .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, external_discovery_events))
86                .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
87                .with_swarm_config(|cfg| {
88                    cfg.with_dial_concurrency_factor(
89                        NonZeroU8::new({
90                            let v = std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
91                                .ok()
92                                .and_then(|v| v.trim().parse::<u8>().ok())
93                                .unwrap_or(crate::constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT);
94                            v.max(1)
95                        })
96                        .expect("clamped to >= 1, will never fail"),
97                    )
98                    .with_max_negotiating_inbound_streams(
99                        std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
100                            .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
101                            .unwrap_or(crate::constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
102                    )
103                    .with_idle_connection_timeout(
104                        std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
105                            .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
106                            .map(std::time::Duration::from_secs)
107                            .unwrap_or(crate::constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
108                    )
109                })
110                .build(),
111        })
112    }
113
114    #[cfg(not(feature = "runtime-tokio"))]
115    pub async fn build<T>(_me: libp2p::identity::Keypair, _external_discovery_events: T) -> Result<Self>
116    where
117        T: Stream<Item = PeerDiscovery> + Send + 'static,
118    {
119        Err(crate::errors::P2PError::Libp2p(
120            "InactiveNetwork::build requires the runtime-tokio feature".to_string(),
121        ))
122    }
123
124    pub fn with_listen_on(mut self, multiaddresses: Vec<Multiaddr>) -> Result<InactiveConfiguredNetwork> {
125        for multiaddress in multiaddresses.iter() {
126            match resolve_dns_if_any(multiaddress) {
127                Ok(ma) => {
128                    if let Err(e) = self.swarm.listen_on(ma.clone()) {
129                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
130
131                        match replace_transport_with_unspecified(&ma) {
132                            Ok(ma) => {
133                                if let Err(e) = self.swarm.listen_on(ma.clone()) {
134                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
135                                } else {
136                                    info!(
137                                        listen_on = ?ma,
138                                        multiaddress = ?multiaddress,
139                                        "Listening for p2p connections"
140                                    );
141                                    self.swarm.add_external_address(multiaddress.clone());
142                                }
143                            }
144                            Err(e) => {
145                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
146                            }
147                        }
148                    } else {
149                        info!(
150                            listen_on = ?ma,
151                            multiaddress = ?multiaddress,
152                            "Listening for p2p connections"
153                        );
154                        self.swarm.add_external_address(multiaddress.clone());
155                    }
156                }
157                Err(error) => error!(%multiaddress, %error, "Failed to transform the multiaddress"),
158            }
159        }
160
161        Ok(InactiveConfiguredNetwork { swarm: self.swarm })
162    }
163}
164
165pub struct InactiveConfiguredNetwork {
166    swarm: libp2p::Swarm<HoprNetworkBehavior>,
167}
168
169/// Factory for constructing the libp2p network and its background process.
170///
171/// Accepts a peer discovery stream and produces a [`HoprNetwork`] + background
172/// process function. The network supports event subscription via
173/// [`NetworkView::subscribe_network_events`](hopr_api::network::NetworkView::subscribe_network_events).
174pub struct HoprLibp2pNetworkBuilder {
175    bootstrap: std::pin::Pin<Box<dyn Stream<Item = PeerDiscovery> + Send + Sync>>,
176}
177
178impl HoprLibp2pNetworkBuilder {
179    pub fn new<T>(bootstrap: T) -> Self
180    where
181        T: Stream<Item = PeerDiscovery> + Send + Sync + 'static,
182    {
183        Self {
184            bootstrap: Box::pin(bootstrap),
185        }
186    }
187}
188
189impl std::fmt::Debug for HoprLibp2pNetworkBuilder {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        f.debug_struct("HoprSwarmBuilder").finish_non_exhaustive()
192    }
193}
194
195impl HoprLibp2pNetworkBuilder {
196    /// Build the network and return it along with its background process.
197    ///
198    /// The returned [`HoprNetwork`] supports event subscription — subscribe
199    /// before starting the process to avoid missing events.
200    pub async fn build(
201        self,
202        identity: &OffchainKeypair,
203        my_multiaddresses: Vec<Multiaddr>,
204        protocol: &'static str,
205        allow_private_addresses: bool,
206    ) -> std::result::Result<(HoprNetwork, BoxedProcessFn), impl std::error::Error> {
207        #[cfg(all(feature = "telemetry", not(test)))]
208        {
209            METRIC_NETWORK_HEALTH.set(0.0);
210        }
211
212        let identity_p2p: libp2p::identity::Keypair = identity.into();
213        let me = identity_p2p.public().to_peer_id();
214        let swarm = InactiveNetwork::build(identity_p2p, self.bootstrap)
215            .await
216            .expect("swarm must be constructible");
217
218        let swarm = swarm
219            .with_listen_on(my_multiaddresses.clone())
220            .expect("swarm must be configurable");
221
222        let swarm = swarm.swarm;
223        let store = crate::peer_store::NetworkPeerStore::new(me, my_multiaddresses.into_iter().collect());
224        let tracker: Arc<DashSet<libp2p::PeerId>> = Default::default();
225        let liveness: crate::liveness::LivenessRegistry = Default::default();
226
227        let (notifier, event_rx) = async_broadcast::broadcast(1000);
228
229        let network = HoprNetwork {
230            tracker: tracker.clone(),
231            store: Arc::new(store.clone()),
232            control: swarm.behaviour().streams.new_control(),
233            protocol: libp2p::StreamProtocol::new(protocol),
234            event_rx: event_rx.deactivate(),
235            liveness: liveness.clone(),
236        };
237
238        #[cfg(all(feature = "telemetry", not(test)))]
239        let network_inner = network.clone();
240        let mut swarm = swarm;
241        let process = async move {
242            // Shared teardown for both ConnectionClosed and OutgoingConnectionError: remove
243            // from the tracker and liveness registry and broadcast the disconnection event.
244            let disconnect_peer = |peer_id: libp2p::PeerId| {
245                tracker.remove(&peer_id);
246                liveness.remove(&peer_id);
247                if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerDisconnected(peer_id)) {
248                    error!(peer = %peer_id, %error, "failed to broadcast peer disconnected event");
249                }
250            };
251
252            while let Some(event) = swarm.next().await {
253                match event {
254                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
255                    SwarmEvent::Behaviour(
256                        HoprNetworkBehaviorEvent::Autonat(event)
257                    ) => {
258                        match event {
259                            autonat::Event::StatusChanged { old, new } => {
260                                info!(?old, ?new, "AutoNAT status changed");
261                                #[cfg(all(feature = "telemetry", not(test)))]
262                                {
263                                    let value = match new {
264                                        autonat::NatStatus::Unknown => 0.0,
265                                        autonat::NatStatus::Public(_) => 1.0,
266                                        autonat::NatStatus::Private => 2.0,
267                                    };
268                                    METRIC_TRANSPORT_NAT_STATUS.set(value);
269                                }
270                            }
271                            autonat::Event::InboundProbe { .. } => {}
272                            autonat::Event::OutboundProbe { .. } => {}
273                        }
274                    }
275                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
276                    SwarmEvent::ConnectionEstablished {
277                        peer_id,
278                        connection_id,
279                        num_established,
280                        established_in,
281                        endpoint,
282                        ..
283                        // concurrent_dial_errors,
284                    } => {
285                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
286
287                        if num_established == std::num::NonZero::<u32>::new(1).expect("must be a non-zero value") {
288                            match endpoint {
289                                libp2p::core::ConnectedPoint::Dialer { address, .. } => {
290                                    if allow_private_addresses || is_public_address(&address) {
291                                        if let Err(error) = store.add(peer_id, std::collections::HashSet::from([address])) {
292                                            error!(peer = %peer_id, %error, direction = "outgoing", "failed to add connected peer to the peer store");
293                                        }
294                                    } else {
295                                        debug!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address encountered")
296                                    }
297                                    tracker.insert(peer_id);
298                                    if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
299                                        error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
300                                    }
301                                },
302                                libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
303                                    if allow_private_addresses || is_public_address(&send_back_addr) {
304                                        if let Err(error) = store.add(peer_id, std::collections::HashSet::from([send_back_addr])) {
305                                            error!(peer = %peer_id, %error, direction = "incoming", "failed to add connected peer to the peer store");
306                                        }
307                                    } else {
308                                        debug!(transport="libp2p", peer = %peer_id, multiaddress = %send_back_addr, "Private/local peer address ignored")
309                                    }
310                                    tracker.insert(peer_id);
311                                    if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
312                                        error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
313                                    }
314                                }
315                            }
316                        } else {
317                            trace!(transport="libp2p", peer = %peer_id, num_established, "Additional connection established")
318                        }
319
320                        print_network_info(swarm.network_info(), "connection established");
321
322                        #[cfg(all(feature = "telemetry", not(test)))]
323                        {
324                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
325                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
326                        }
327                    }
328                    SwarmEvent::ConnectionClosed {
329                        peer_id,
330                        connection_id,
331                        cause,
332                        num_established,
333                        ..
334                        // endpoint,
335                    } => {
336                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
337
338                        if num_established == 0 {
339                            disconnect_peer(peer_id);
340                        }
341
342                        print_network_info(swarm.network_info(), "connection closed");
343
344                        #[cfg(all(feature = "telemetry", not(test)))]
345                        {
346                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
347                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
348                        }
349                    }
350                    SwarmEvent::IncomingConnection {
351                        connection_id,
352                        local_addr,
353                        send_back_addr,
354                    } => {
355                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
356                    }
357                    SwarmEvent::IncomingConnectionError {
358                        local_addr,
359                        connection_id,
360                        error,
361                        send_back_addr,
362                        peer_id
363                    } => {
364                        debug!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
365                    }
366                    SwarmEvent::OutgoingConnectionError {
367                        connection_id,
368                        error,
369                        peer_id
370                    } => {
371                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
372
373                        if let Some(peer_id) = peer_id
374                            && !swarm.is_connected(&peer_id) {
375                                if let Err(error) = store.remove(&peer_id) {
376                                    error!(peer = %peer_id, %error, "failed to remove undialable peer from the peer store");
377                                }
378                                disconnect_peer(peer_id);
379                            }
380
381                        #[cfg(all(feature = "telemetry", not(test)))]
382                        {
383                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
384                        }
385                    }
386                    SwarmEvent::NewListenAddr {
387                        listener_id,
388                        address,
389                    } => {
390                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
391                    }
392                    SwarmEvent::ExpiredListenAddr {
393                        listener_id,
394                        address,
395                    } => {
396                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
397                    }
398                    SwarmEvent::ListenerClosed {
399                        listener_id,
400                        addresses,
401                        reason,
402                    } => {
403                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
404                    }
405                    SwarmEvent::ListenerError {
406                        listener_id,
407                        error,
408                    } => {
409                        debug!(%listener_id, transport="libp2p", %error, "listener error")
410                    }
411                    SwarmEvent::Dialing {
412                        peer_id,
413                        connection_id,
414                    } => {
415                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
416                    }
417                    SwarmEvent::NewExternalAddrCandidate {address} => {
418                        debug!(%address, "Detected new external address candidate")
419                    }
420                    SwarmEvent::ExternalAddrConfirmed { address } => {
421                        info!(%address, "Detected external address")
422                    }
423                    SwarmEvent::ExternalAddrExpired {
424                        ..  // address: Multiaddr
425                    } => {}
426                    SwarmEvent::NewExternalAddrOfPeer {
427                        peer_id, address
428                    } => {
429                        // Only store public/routable addresses
430                        if allow_private_addresses || is_public_address(&address) {
431                            swarm.add_peer_address(peer_id, address.clone());
432                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
433                        } else {
434                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
435                        }
436                    },
437                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
438                }
439            }
440        }.boxed();
441
442        Ok::<_, std::io::Error>((network, Box::new(move || process)))
443    }
444}
445
446fn print_network_info(network_info: NetworkInfo, event: &str) {
447    let num_peers = network_info.num_peers();
448    let connection_counters = network_info.connection_counters();
449    let num_incoming = connection_counters.num_established_incoming();
450    let num_outgoing = connection_counters.num_established_outgoing();
451    info!(
452        num_peers,
453        num_incoming, num_outgoing, "swarm network status after {event}"
454    );
455}
456
#[cfg(test)]
mod tests {
    /// On Android and iOS the swarm must resolve DNS through the Cloudflare resolver
    /// configuration with the default resolver options.
    #[test]
    #[cfg(any(target_os = "android", target_os = "ios"))]
    fn uses_cloudflare_dns_resolver_config() {
        use libp2p::dns::{ResolverConfig, ResolverOpts};

        let (actual_config, actual_opts) = super::swarm_dns_config();

        assert_eq!(actual_config, ResolverConfig::cloudflare());
        assert_eq!(actual_opts, ResolverOpts::default());
    }
}