hopr_transport_p2p/
swarm.rs

1use std::{net::Ipv4Addr, num::NonZeroU8};
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5#[cfg(all(feature = "prometheus", not(test)))]
6use hopr_metrics::metrics::SimpleGauge;
7use hopr_transport_identity::{
8    Multiaddr, PeerId,
9    multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
10};
11use hopr_transport_protocol::PeerDiscovery;
12use libp2p::{
13    autonat,
14    multiaddr::Protocol,
15    request_response::{OutboundRequestId, ResponseChannel},
16    swarm::{NetworkInfo, SwarmEvent, dial_opts::DialOpts},
17};
18use tracing::{debug, error, info, trace, warn};
19
20use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
21
22#[cfg(all(feature = "prometheus", not(test)))]
23lazy_static::lazy_static! {
24    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: SimpleGauge = SimpleGauge::new(
25        "hopr_transport_p2p_opened_connection_count",
26        "Number of currently open connections"
27    ).unwrap();
28}
29
30/// Build objects comprising the p2p network.
31///
32/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
33async fn build_p2p_network<T>(
34    me: libp2p::identity::Keypair,
35    indexer_update_input: T,
36) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
37where
38    T: Stream<Item = PeerDiscovery> + Send + 'static,
39{
40    let me_peerid: PeerId = me.public().into();
41
42    // Both features could be enabled during testing, therefore we only use tokio when its
43    // exclusively enabled.
44    #[cfg(feature = "runtime-tokio")]
45    let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
46        .with_tokio()
47        .with_tcp(
48            libp2p::tcp::Config::default().nodelay(true),
49            libp2p::noise::Config::new,
50            // use default yamux configuration to enable auto-tuning
51            // see https://github.com/libp2p/rust-libp2p/pull/4970
52            libp2p::yamux::Config::default,
53        )
54        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
55        .with_quic()
56        .with_dns();
57
58    Ok(swarm
59        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
60        .with_behaviour(|_key| HoprNetworkBehavior::new(me_peerid, indexer_update_input))
61        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
62        .with_swarm_config(|cfg| {
63            cfg.with_dial_concurrency_factor(
64                NonZeroU8::new(
65                    std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
66                        .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
67                        .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
68                )
69                .expect("concurrently dialed peer count must be > 0"),
70            )
71            .with_max_negotiating_inbound_streams(
72                std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
73                    .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
74                    .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
75            )
76            .with_idle_connection_timeout(
77                std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
78                    .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
79                    .map(std::time::Duration::from_secs)
80                    .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
81            )
82        })
83        .build())
84}
85
86pub struct HoprSwarm {
87    pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
88}
89
90impl std::fmt::Debug for HoprSwarm {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("HoprSwarm").finish()
93    }
94}
95
96impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
97    fn from(value: HoprSwarm) -> Self {
98        value.swarm
99    }
100}
101
102impl HoprSwarm {
103    pub async fn new<T>(
104        identity: libp2p::identity::Keypair,
105        indexer_update_input: T,
106        my_multiaddresses: Vec<Multiaddr>,
107    ) -> Self
108    where
109        T: Stream<Item = PeerDiscovery> + Send + 'static,
110    {
111        let mut swarm = build_p2p_network(identity, indexer_update_input)
112            .await
113            .expect("swarm must be constructible");
114
115        for multiaddress in my_multiaddresses.iter() {
116            match resolve_dns_if_any(multiaddress) {
117                Ok(ma) => {
118                    if let Err(e) = swarm.listen_on(ma.clone()) {
119                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
120
121                        match replace_transport_with_unspecified(&ma) {
122                            Ok(ma) => {
123                                if let Err(e) = swarm.listen_on(ma.clone()) {
124                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
125                                } else {
126                                    info!(
127                                        listen_on = ?ma,
128                                        multiaddress = ?multiaddress,
129                                        "Listening for p2p connections"
130                                    );
131                                    swarm.add_external_address(multiaddress.clone());
132                                }
133                            }
134                            Err(e) => {
135                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
136                            }
137                        }
138                    } else {
139                        info!(
140                            listen_on = ?ma,
141                            multiaddress = ?multiaddress,
142                            "Listening for p2p connections"
143                        );
144                        swarm.add_external_address(multiaddress.clone());
145                    }
146                }
147                Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
148            }
149        }
150
151        // TODO: perform this check
152        // NOTE: This would be a valid check but is not immediate
153        // assert!(
154        //     swarm.listeners().count() > 0,
155        //     "The node failed to listen on at least one of the specified interfaces"
156        // );
157
158        Self { swarm }
159    }
160
161    pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
162        crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
163    }
164
165    /// Main p2p loop that instantiates a new libp2p::Swarm instance and sets up listening and reacting pipelines
166    /// running in a neverending loop future.
167    ///
168    /// The function represents the entirety of the business logic of the hopr daemon related to core operations.
169    ///
170    /// This future can only be resolved by an unrecoverable error or a panic.
171    pub async fn run<T>(self, events: T)
172    where
173        T: Sink<crate::behavior::discovery::Event> + Send + 'static,
174    {
175        let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
176        futures::pin_mut!(events);
177
178        loop {
179            select! {
180                event = swarm.select_next_some() => match event {
181                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
182                        if let Err(_error) = events.send(event).await {
183                            tracing::error!("Failed to send discovery event from the transport layer");
184                        }
185                    }
186                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatClient(autonat::v2::client::Event {
187                        server,
188                        tested_addr,
189                        bytes_sent,
190                        result,
191                    })) => {
192                        match result {
193                            Ok(_) => {
194                                debug!(%server, %tested_addr, %bytes_sent, "Autonat server successfully tested");
195                            }
196                            Err(error) => {
197                                warn!(%server, %tested_addr, %bytes_sent, %error, "Autonat server test failed");
198                            }
199                        }
200                    }
201                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatServer(event)) => {
202                        warn!(?event, "Autonat server event");
203                    }
204                    SwarmEvent::ConnectionEstablished {
205                        peer_id,
206                        connection_id,
207                        num_established,
208                        established_in,
209                        ..
210                        // concurrent_dial_errors,
211                        // endpoint,
212                    } => {
213                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
214
215                        print_network_info(swarm.network_info(), "connection established");
216
217                        #[cfg(all(feature = "prometheus", not(test)))]
218                        {
219                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
220                        }
221                    }
222                    SwarmEvent::ConnectionClosed {
223                        peer_id,
224                        connection_id,
225                        cause,
226                        num_established,
227                        ..
228                        // endpoint,
229                    } => {
230                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
231
232                        print_network_info(swarm.network_info(), "connection closed");
233
234                        #[cfg(all(feature = "prometheus", not(test)))]
235                        {
236                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
237                        }
238                    }
239                    SwarmEvent::IncomingConnection {
240                        connection_id,
241                        local_addr,
242                        send_back_addr,
243                    } => {
244                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p",  "incoming connection");
245                    }
246                    SwarmEvent::IncomingConnectionError {
247                        local_addr,
248                        connection_id,
249                        error,
250                        send_back_addr,
251                        peer_id
252                    } => {
253                        error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
254
255                        print_network_info(swarm.network_info(), "incoming connection error");
256                    }
257                    SwarmEvent::OutgoingConnectionError {
258                        connection_id,
259                        error,
260                        peer_id
261                    } => {
262                        error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
263
264                        print_network_info(swarm.network_info(), "outgoing connection error");
265                    }
266                    SwarmEvent::NewListenAddr {
267                        listener_id,
268                        address,
269                    } => {
270                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
271                    }
272                    SwarmEvent::ExpiredListenAddr {
273                        listener_id,
274                        address,
275                    } => {
276                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
277                    }
278                    SwarmEvent::ListenerClosed {
279                        listener_id,
280                        addresses,
281                        reason,
282                    } => {
283                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
284                    }
285                    SwarmEvent::ListenerError {
286                        listener_id,
287                        error,
288                    } => {
289                        debug!(%listener_id, transport="libp2p", %error, "listener error")
290                    }
291                    SwarmEvent::Dialing {
292                        peer_id,
293                        connection_id,
294                    } => {
295                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
296                    }
297                    SwarmEvent::NewExternalAddrCandidate {
298                        ..  // address: Multiaddr
299                    } => {}
300                    SwarmEvent::ExternalAddrConfirmed { address } => {
301                        info!(%address, "Detected external address")
302                    }
303                    SwarmEvent::ExternalAddrExpired {
304                        ..  // address: Multiaddr
305                    } => {}
306                    SwarmEvent::NewExternalAddrOfPeer {
307                        peer_id, address
308                    } => {
309                        swarm.add_peer_address(peer_id, address.clone());
310                        trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "New peer stored in swarm")
311                    },
312                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
313                }
314            }
315        }
316    }
317
318    pub fn run_nat_server(&mut self, port: u16) {
319        info!(listen_on = port, "Starting NAT server");
320
321        match self.swarm.listen_on(
322            Multiaddr::empty()
323                .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
324                .with(Protocol::Tcp(port)),
325        ) {
326            Ok(_) => {
327                info!("NAT server started");
328            }
329            Err(e) => {
330                warn!(error = %e, "Failed to listen on NAT server");
331            }
332        }
333    }
334
335    pub fn dial_nat_server(&mut self, addresses: Vec<Multiaddr>) {
336        // let dial_opts = DialOpts::peer_id(PeerId::random())
337        //     .addresses(addresses)
338        //     .extend_addresses_through_behaviour()
339        //     .build();
340        info!(
341            num_addresses = addresses.len(),
342            "Dialing NAT servers with multiple candidate addresses"
343        );
344
345        for addr in addresses {
346            let dial_opts = DialOpts::unknown_peer_id().address(addr.clone()).build();
347            if let Err(e) = self.swarm.dial(dial_opts) {
348                warn!(%addr, %e, "Failed to dial NAT server address");
349            } else {
350                info!(%addr, "Dialed NAT server address");
351                break;
352            }
353        }
354    }
355}
356
357fn print_network_info(network_info: NetworkInfo, event: &str) {
358    let num_peers = network_info.num_peers();
359    let connection_counters = network_info.connection_counters();
360    let num_incoming = connection_counters.num_established_incoming();
361    let num_outgoing = connection_counters.num_established_outgoing();
362    info!(
363        num_peers,
364        num_incoming, num_outgoing, "swarm network status after {event}"
365    );
366}
367
368pub type TicketAggregationRequestType = OutboundRequestId;
369pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;