hopr_transport_p2p/
swarm.rs

1use std::num::NonZeroU8;
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::is_public_address;
6use hopr_transport_identity::{
7    Multiaddr,
8    multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
9};
10use hopr_transport_protocol::PeerDiscovery;
11use libp2p::{
12    autonat,
13    identity::PublicKey,
14    request_response::{OutboundRequestId, ResponseChannel},
15    swarm::{NetworkInfo, SwarmEvent},
16};
17use tracing::{debug, error, info, trace, warn};
18
19use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT:  hopr_metrics::SimpleGauge =  hopr_metrics::SimpleGauge::new(
24        "hopr_transport_p2p_opened_connection_count",
25        "Number of currently open connections"
26    ).unwrap();
27    static ref METRIC_TRANSPORT_NAT_STATUS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
28        "hopr_transport_p2p_nat_status",
29        "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private"
30    ).unwrap();
31}
32
33/// Build objects comprising the p2p network.
34///
35/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
36async fn build_p2p_network<T>(
37    me: libp2p::identity::Keypair,
38    indexer_update_input: T,
39) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
40where
41    T: Stream<Item = PeerDiscovery> + Send + 'static,
42{
43    let me_public: PublicKey = me.public();
44
45    // Both features could be enabled during testing, therefore we only use tokio when its
46    // exclusively enabled.
47    //
48    // NOTE: Private address filtering is implemented at multiple levels for defense-in-depth:
49    // 1. Discovery behavior filters PeerDiscovery::Allow and PeerDiscovery::Announce events
50    // 2. SwarmEvent::NewExternalAddrOfPeer events are filtered using is_public_address()
51    // 3. Network layer filters addresses in both add() and get() methods (primary protection)
52    // 4. libp2p's global_only transport wrapper could be added here but the above filtering provides equivalent
53    //    protection while maintaining compatibility with the existing code
54    #[cfg(feature = "runtime-tokio")]
55    let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
56        .with_tokio()
57        .with_tcp(
58            libp2p::tcp::Config::default().nodelay(true),
59            libp2p::noise::Config::new,
60            // use default yamux configuration to enable auto-tuning
61            // see https://github.com/libp2p/rust-libp2p/pull/4970
62            libp2p::yamux::Config::default,
63        )
64        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
65
66    #[cfg(all(feature = "transport-quic", feature = "runtime-tokio"))]
67    let swarm = swarm.with_quic();
68
69    #[cfg(feature = "runtime-tokio")]
70    let swarm = swarm.with_dns();
71
72    Ok(swarm
73        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
74        .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, indexer_update_input))
75        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
76        .with_swarm_config(|cfg| {
77            cfg.with_dial_concurrency_factor(
78                NonZeroU8::new(
79                    std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
80                        .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
81                        .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
82                )
83                .expect("concurrently dialed peer count must be > 0"),
84            )
85            .with_max_negotiating_inbound_streams(
86                std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
87                    .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
88                    .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
89            )
90            .with_idle_connection_timeout(
91                std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
92                    .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
93                    .map(std::time::Duration::from_secs)
94                    .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
95            )
96        })
97        .build())
98}
99
100pub struct HoprSwarm {
101    pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
102}
103
104impl std::fmt::Debug for HoprSwarm {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("HoprSwarm").finish()
107    }
108}
109
110impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
111    fn from(value: HoprSwarm) -> Self {
112        value.swarm
113    }
114}
115
116impl HoprSwarm {
117    pub async fn new<T>(
118        identity: libp2p::identity::Keypair,
119        indexer_update_input: T,
120        my_multiaddresses: Vec<Multiaddr>,
121    ) -> Self
122    where
123        T: Stream<Item = PeerDiscovery> + Send + 'static,
124    {
125        let mut swarm = build_p2p_network(identity, indexer_update_input)
126            .await
127            .expect("swarm must be constructible");
128
129        for multiaddress in my_multiaddresses.iter() {
130            match resolve_dns_if_any(multiaddress) {
131                Ok(ma) => {
132                    if let Err(e) = swarm.listen_on(ma.clone()) {
133                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
134
135                        match replace_transport_with_unspecified(&ma) {
136                            Ok(ma) => {
137                                if let Err(e) = swarm.listen_on(ma.clone()) {
138                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
139                                } else {
140                                    info!(
141                                        listen_on = ?ma,
142                                        multiaddress = ?multiaddress,
143                                        "Listening for p2p connections"
144                                    );
145                                    swarm.add_external_address(multiaddress.clone());
146                                }
147                            }
148                            Err(e) => {
149                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
150                            }
151                        }
152                    } else {
153                        info!(
154                            listen_on = ?ma,
155                            multiaddress = ?multiaddress,
156                            "Listening for p2p connections"
157                        );
158                        swarm.add_external_address(multiaddress.clone());
159                    }
160                }
161                Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
162            }
163        }
164
165        // TODO: perform this check
166        // NOTE: This would be a valid check but is not immediate
167        // assert!(
168        //     swarm.listeners().count() > 0,
169        //     "The node failed to listen on at least one of the specified interfaces"
170        // );
171
172        Self { swarm }
173    }
174
175    pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
176        crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
177    }
178
179    /// Main p2p loop that instantiates a new libp2p::Swarm instance and sets up listening and reacting pipelines
180    /// running in a neverending loop future.
181    ///
182    /// The function represents the entirety of the business logic of the hopr daemon related to core operations.
183    ///
184    /// This future can only be resolved by an unrecoverable error or a panic.
185    pub async fn run<T>(self, events: T)
186    where
187        T: Sink<crate::behavior::discovery::Event> + Send + 'static,
188    {
189        let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
190        futures::pin_mut!(events);
191
192        loop {
193            select! {
194                event = swarm.select_next_some() => match event {
195                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
196                        if let Err(_error) = events.send(event).await {
197                            tracing::error!("Failed to send discovery event from the transport layer");
198                        }
199                    }
200                    SwarmEvent::Behaviour(
201                        HoprNetworkBehaviorEvent::Autonat(event)
202                    ) => {
203                            match event {
204                                autonat::Event::StatusChanged { old, new } => {
205                                    info!(?old, ?new, "AutoNAT status changed");
206                                    #[cfg(all(feature = "prometheus", not(test)))]
207                                    {
208                                        let value = match new {
209                                            autonat::NatStatus::Unknown => 0.0,
210                                            autonat::NatStatus::Public(_) => 1.0,
211                                            autonat::NatStatus::Private => 2.0,
212                                        };
213                                        METRIC_TRANSPORT_NAT_STATUS.set(value);
214                                    }
215                                }
216                                autonat::Event::InboundProbe { .. } => {}
217                                autonat::Event::OutboundProbe { .. } => {}
218                            }
219                    }
220                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
221                    SwarmEvent::ConnectionEstablished {
222                        peer_id,
223                        connection_id,
224                        num_established,
225                        established_in,
226                        ..
227                        // concurrent_dial_errors,
228                        // endpoint,
229                    } => {
230                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
231
232                        print_network_info(swarm.network_info(), "connection established");
233
234                        #[cfg(all(feature = "prometheus", not(test)))]
235                        {
236                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
237                        }
238                    }
239                    SwarmEvent::ConnectionClosed {
240                        peer_id,
241                        connection_id,
242                        cause,
243                        num_established,
244                        ..
245                        // endpoint,
246                    } => {
247                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
248
249                        print_network_info(swarm.network_info(), "connection closed");
250
251                        #[cfg(all(feature = "prometheus", not(test)))]
252                        {
253                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
254                        }
255                    }
256                    SwarmEvent::IncomingConnection {
257                        connection_id,
258                        local_addr,
259                        send_back_addr,
260                    } => {
261                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p",  "incoming connection");
262                    }
263                    SwarmEvent::IncomingConnectionError {
264                        local_addr,
265                        connection_id,
266                        error,
267                        send_back_addr,
268                        peer_id
269                    } => {
270                        error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
271
272                        print_network_info(swarm.network_info(), "incoming connection error");
273                    }
274                    SwarmEvent::OutgoingConnectionError {
275                        connection_id,
276                        error,
277                        peer_id
278                    } => {
279                        error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
280
281                        print_network_info(swarm.network_info(), "outgoing connection error");
282                    }
283                    SwarmEvent::NewListenAddr {
284                        listener_id,
285                        address,
286                    } => {
287                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
288                    }
289                    SwarmEvent::ExpiredListenAddr {
290                        listener_id,
291                        address,
292                    } => {
293                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
294                    }
295                    SwarmEvent::ListenerClosed {
296                        listener_id,
297                        addresses,
298                        reason,
299                    } => {
300                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
301                    }
302                    SwarmEvent::ListenerError {
303                        listener_id,
304                        error,
305                    } => {
306                        debug!(%listener_id, transport="libp2p", %error, "listener error")
307                    }
308                    SwarmEvent::Dialing {
309                        peer_id,
310                        connection_id,
311                    } => {
312                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
313                    }
314                    SwarmEvent::NewExternalAddrCandidate {address} => {
315                        debug!(%address, "Detected new external address candidate")
316                    }
317                    SwarmEvent::ExternalAddrConfirmed { address } => {
318                        info!(%address, "Detected external address")
319                    }
320                    SwarmEvent::ExternalAddrExpired {
321                        ..  // address: Multiaddr
322                    } => {}
323                    SwarmEvent::NewExternalAddrOfPeer {
324                        peer_id, address
325                    } => {
326                        // Only store public/routable addresses
327                        if is_public_address(&address) {
328                            swarm.add_peer_address(peer_id, address.clone());
329                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
330                        } else {
331                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
332                        }
333                    },
334                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
335                }
336            }
337        }
338    }
339}
340
341fn print_network_info(network_info: NetworkInfo, event: &str) {
342    let num_peers = network_info.num_peers();
343    let connection_counters = network_info.connection_counters();
344    let num_incoming = connection_counters.num_established_incoming();
345    let num_outgoing = connection_counters.num_established_outgoing();
346    info!(
347        num_peers,
348        num_incoming, num_outgoing, "swarm network status after {event}"
349    );
350}
351
352pub type TicketAggregationRequestType = OutboundRequestId;
353pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;