hopr_transport_p2p/
swarm.rs

1use std::num::NonZeroU8;
2
3use futures::{Stream, StreamExt};
4use hopr_network_types::prelude::is_public_address;
5use hopr_transport_identity::{
6    Multiaddr,
7    multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
8};
9use hopr_transport_protocol::PeerDiscovery;
10use libp2p::{
11    PeerId, autonat,
12    identity::PublicKey,
13    swarm::{NetworkInfo, SwarmEvent},
14};
15use tracing::{debug, error, info, trace, warn};
16
17use crate::{HoprNetwork, HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
22        "hopr_transport_p2p_active_connection_count",
23        "Number of currently active connections"
24    ).unwrap();
25    static ref METRIC_TRANSPORT_NAT_STATUS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
26        "hopr_transport_p2p_nat_status",
27        "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private"
28    ).unwrap();
29    static ref METRIC_NETWORK_HEALTH: hopr_metrics::SimpleGauge =
30         hopr_metrics::SimpleGauge::new("hopr_network_health", "Connectivity health indicator").unwrap();
31}
32
33pub struct InactiveNetwork {
34    swarm: libp2p::Swarm<HoprNetworkBehavior>,
35}
36
37/// Build objects comprising an inactive p2p network.
38///
39/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
40impl InactiveNetwork {
41    #[cfg(feature = "runtime-tokio")]
42    pub async fn build<T>(me: libp2p::identity::Keypair, external_discovery_events: T) -> Result<Self>
43    where
44        T: Stream<Item = PeerDiscovery> + Send + 'static,
45    {
46        let me_public: PublicKey = me.public();
47
48        let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
49            .with_tokio()
50            .with_tcp(
51                libp2p::tcp::Config::default().nodelay(true),
52                libp2p::noise::Config::new,
53                // use default yamux configuration to enable auto-tuning
54                // see https://github.com/libp2p/rust-libp2p/pull/4970
55                libp2p::yamux::Config::default,
56            )
57            .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
58
59        #[cfg(feature = "transport-quic")]
60        let swarm = swarm.with_quic();
61
62        let swarm = swarm.with_dns();
63
64        Ok(Self {
65            swarm: swarm
66                .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
67                .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, external_discovery_events))
68                .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
69                .with_swarm_config(|cfg| {
70                    cfg.with_dial_concurrency_factor(
71                        NonZeroU8::new({
72                            let v = std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
73                                .ok()
74                                .and_then(|v| v.trim().parse::<u8>().ok())
75                                .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT);
76                            v.max(1)
77                        })
78                        .expect("clamped to >= 1, will never fail"),
79                    )
80                    .with_max_negotiating_inbound_streams(
81                        std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
82                            .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
83                            .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
84                    )
85                    .with_idle_connection_timeout(
86                        std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
87                            .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
88                            .map(std::time::Duration::from_secs)
89                            .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
90                    )
91                })
92                .build(),
93        })
94    }
95
96    pub fn with_listen_on(mut self, multiaddresses: Vec<Multiaddr>) -> Result<InactiveConfiguredNetwork> {
97        for multiaddress in multiaddresses.iter() {
98            match resolve_dns_if_any(multiaddress) {
99                Ok(ma) => {
100                    if let Err(e) = self.swarm.listen_on(ma.clone()) {
101                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
102
103                        match replace_transport_with_unspecified(&ma) {
104                            Ok(ma) => {
105                                if let Err(e) = self.swarm.listen_on(ma.clone()) {
106                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
107                                } else {
108                                    info!(
109                                        listen_on = ?ma,
110                                        multiaddress = ?multiaddress,
111                                        "Listening for p2p connections"
112                                    );
113                                    self.swarm.add_external_address(multiaddress.clone());
114                                }
115                            }
116                            Err(e) => {
117                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
118                            }
119                        }
120                    } else {
121                        info!(
122                            listen_on = ?ma,
123                            multiaddress = ?multiaddress,
124                            "Listening for p2p connections"
125                        );
126                        self.swarm.add_external_address(multiaddress.clone());
127                    }
128                }
129                Err(error) => error!(%multiaddress, %error, "Failed to transform the multiaddress"),
130            }
131        }
132
133        Ok(InactiveConfiguredNetwork { swarm: self.swarm })
134    }
135}
136
137pub struct InactiveConfiguredNetwork {
138    swarm: libp2p::Swarm<HoprNetworkBehavior>,
139}
140
141/// Builder of the network view and an actual background process running the libp2p core
142/// event processing loop.
143///
144/// This object is primarily constructed to allow delayed starting of the background process,
145/// as well as setup all the interconnections with the underlying network views to allow complex
146/// functionality and signalling.
147pub struct HoprLibp2pNetworkBuilder {
148    pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
149    me: PeerId,
150    my_addresses: Vec<Multiaddr>,
151}
152
153impl std::fmt::Debug for HoprLibp2pNetworkBuilder {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        f.debug_struct("HoprSwarm").finish()
156    }
157}
158
159impl From<HoprLibp2pNetworkBuilder> for libp2p::Swarm<HoprNetworkBehavior> {
160    fn from(value: HoprLibp2pNetworkBuilder) -> Self {
161        value.swarm
162    }
163}
164
165impl HoprLibp2pNetworkBuilder {
166    pub async fn new<T>(
167        identity: libp2p::identity::Keypair,
168        external_discovery_events: T,
169        my_multiaddresses: Vec<Multiaddr>,
170    ) -> Self
171    where
172        T: Stream<Item = PeerDiscovery> + Send + 'static,
173    {
174        #[cfg(all(feature = "prometheus", not(test)))]
175        {
176            METRIC_NETWORK_HEALTH.set(0.0);
177        }
178
179        let me = identity.public().to_peer_id();
180        let swarm = InactiveNetwork::build(identity, external_discovery_events)
181            .await
182            .expect("swarm must be constructible");
183
184        let swarm = swarm
185            .with_listen_on(my_multiaddresses.clone())
186            .expect("swarm must be configurable");
187
188        Self {
189            swarm: swarm.swarm,
190            me,
191            my_addresses: my_multiaddresses,
192        }
193    }
194
195    pub fn into_network_with_stream_protocol_process(
196        self,
197        protocol: &'static str,
198        allow_private_addresses: bool,
199    ) -> (HoprNetwork, impl std::future::Future<Output = ()>) {
200        let tracker = hopr_transport_network::track::NetworkPeerTracker::new();
201        let store =
202            hopr_transport_network::store::NetworkPeerStore::new(self.me, self.my_addresses.into_iter().collect());
203
204        let network = HoprNetwork {
205            tracker: tracker.clone(),
206            store: store.clone(),
207            control: self.swarm.behaviour().streams.new_control(),
208            protocol: libp2p::StreamProtocol::new(protocol),
209        };
210
211        #[cfg(all(feature = "prometheus", not(test)))]
212        let network_inner = network.clone();
213        let mut swarm = self.swarm;
214        let process = async move {
215            while let Some(event) = swarm.next().await {
216                match event {
217                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
218                    SwarmEvent::Behaviour(
219                        HoprNetworkBehaviorEvent::Autonat(event)
220                    ) => {
221                        match event {
222                            autonat::Event::StatusChanged { old, new } => {
223                                info!(?old, ?new, "AutoNAT status changed");
224                                #[cfg(all(feature = "prometheus", not(test)))]
225                                {
226                                    let value = match new {
227                                        autonat::NatStatus::Unknown => 0.0,
228                                        autonat::NatStatus::Public(_) => 1.0,
229                                        autonat::NatStatus::Private => 2.0,
230                                    };
231                                    METRIC_TRANSPORT_NAT_STATUS.set(value);
232                                }
233                            }
234                            autonat::Event::InboundProbe { .. } => {}
235                            autonat::Event::OutboundProbe { .. } => {}
236                        }
237                    }
238                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
239                    SwarmEvent::ConnectionEstablished {
240                        peer_id,
241                        connection_id,
242                        num_established,
243                        established_in,
244                        endpoint,
245                        ..
246                        // concurrent_dial_errors,
247                    } => {
248                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
249
250                        if num_established == std::num::NonZero::<u32>::new(1).expect("must be a non-zero value") {
251                            match endpoint {
252                                libp2p::core::ConnectedPoint::Dialer { address, .. } => {
253                                    if allow_private_addresses || is_public_address(&address) {
254                                        if let Err(error) = store.add(peer_id, std::collections::HashSet::from([address])) {
255                                            error!(peer = %peer_id, %error, direction = "outgoing", "failed to add connected peer to the peer store");
256                                        }
257                                        tracker.add(peer_id);
258                                    } else {
259                                        debug!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
260                                    }
261                                },
262                                libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
263                                    if allow_private_addresses || is_public_address(&send_back_addr) {
264                                        if let Err(error) = store.add(peer_id, std::collections::HashSet::from([send_back_addr])) {
265                                            error!(peer = %peer_id, %error, direction = "incoming", "failed to add connected peer to the peer store");
266                                        }
267                                        tracker.add(peer_id);
268                                    } else {
269                                        debug!(transport="libp2p", peer = %peer_id, multiaddress = %send_back_addr, "Private/local peer address ignored")
270                                    }
271                                }
272                            }
273                        } else {
274                            trace!(transport="libp2p", peer = %peer_id, num_established, "Additional connection established")
275                        }
276
277                        print_network_info(swarm.network_info(), "connection established");
278
279                        #[cfg(all(feature = "prometheus", not(test)))]
280                        {
281                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
282                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
283                        }
284                    }
285                    SwarmEvent::ConnectionClosed {
286                        peer_id,
287                        connection_id,
288                        cause,
289                        num_established,
290                        ..
291                        // endpoint,
292                    } => {
293                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
294
295                        if num_established == 0 {
296                            tracker.remove(&peer_id);
297                        }
298
299                        print_network_info(swarm.network_info(), "connection closed");
300
301                        #[cfg(all(feature = "prometheus", not(test)))]
302                        {
303                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
304                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
305                        }
306                    }
307                    SwarmEvent::IncomingConnection {
308                        connection_id,
309                        local_addr,
310                        send_back_addr,
311                    } => {
312                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p",  "incoming connection");
313                    }
314                    SwarmEvent::IncomingConnectionError {
315                        local_addr,
316                        connection_id,
317                        error,
318                        send_back_addr,
319                        peer_id
320                    } => {
321                        debug!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
322                    }
323                    SwarmEvent::OutgoingConnectionError {
324                        connection_id,
325                        error,
326                        peer_id
327                    } => {
328                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
329
330                        if let Some(peer_id) = peer_id
331                            && !swarm.is_connected(&peer_id) {
332                                if let Err(error) = store.remove(&peer_id) {
333                                    error!(peer = %peer_id, %error, "failed to remove undialable peer from the peer store");
334                                }
335                                tracker.remove(&peer_id);
336                            }
337
338                        #[cfg(all(feature = "prometheus", not(test)))]
339                        {
340                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
341                        }
342                    }
343                    SwarmEvent::NewListenAddr {
344                        listener_id,
345                        address,
346                    } => {
347                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
348                    }
349                    SwarmEvent::ExpiredListenAddr {
350                        listener_id,
351                        address,
352                    } => {
353                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
354                    }
355                    SwarmEvent::ListenerClosed {
356                        listener_id,
357                        addresses,
358                        reason,
359                    } => {
360                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
361                    }
362                    SwarmEvent::ListenerError {
363                        listener_id,
364                        error,
365                    } => {
366                        debug!(%listener_id, transport="libp2p", %error, "listener error")
367                    }
368                    SwarmEvent::Dialing {
369                        peer_id,
370                        connection_id,
371                    } => {
372                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
373                    }
374                    SwarmEvent::NewExternalAddrCandidate {address} => {
375                        debug!(%address, "Detected new external address candidate")
376                    }
377                    SwarmEvent::ExternalAddrConfirmed { address } => {
378                        info!(%address, "Detected external address")
379                    }
380                    SwarmEvent::ExternalAddrExpired {
381                        ..  // address: Multiaddr
382                    } => {}
383                    SwarmEvent::NewExternalAddrOfPeer {
384                        peer_id, address
385                    } => {
386                        // Only store public/routable addresses
387                        if allow_private_addresses || is_public_address(&address) {
388                            swarm.add_peer_address(peer_id, address.clone());
389                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
390                        } else {
391                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
392                        }
393                    },
394                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
395                }
396            }
397        };
398
399        (network, process)
400    }
401}
402
403fn print_network_info(network_info: NetworkInfo, event: &str) {
404    let num_peers = network_info.num_peers();
405    let connection_counters = network_info.connection_counters();
406    let num_incoming = connection_counters.num_established_incoming();
407    let num_outgoing = connection_counters.num_established_outgoing();
408    info!(
409        num_peers,
410        num_incoming, num_outgoing, "swarm network status after {event}"
411    );
412}