1use std::{net::Ipv4Addr, num::NonZeroU8};
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::is_public_address;
6use hopr_transport_identity::{
7 Multiaddr, PeerId,
8 multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
9};
10use hopr_transport_protocol::PeerDiscovery;
11use libp2p::{
12 autonat,
13 multiaddr::Protocol,
14 request_response::{OutboundRequestId, ResponseChannel},
15 swarm::{NetworkInfo, SwarmEvent, dial_opts::DialOpts},
16};
17use tracing::{debug, error, info, trace, warn};
18
19use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23 static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
24 "hopr_transport_p2p_opened_connection_count",
25 "Number of currently open connections"
26 ).unwrap();
27}
28
29async fn build_p2p_network<T>(
33 me: libp2p::identity::Keypair,
34 indexer_update_input: T,
35) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
36where
37 T: Stream<Item = PeerDiscovery> + Send + 'static,
38{
39 let me_peerid: PeerId = me.public().into();
40
41 #[cfg(feature = "runtime-tokio")]
51 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
52 .with_tokio()
53 .with_tcp(
54 libp2p::tcp::Config::default().nodelay(true),
55 libp2p::noise::Config::new,
56 libp2p::yamux::Config::default,
59 )
60 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
61
62 #[cfg(all(feature = "transport-quic", feature = "runtime-tokio"))]
63 let swarm = swarm.with_quic();
64
65 #[cfg(feature = "runtime-tokio")]
66 let swarm = swarm.with_dns();
67
68 Ok(swarm
69 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
70 .with_behaviour(|_key| HoprNetworkBehavior::new(me_peerid, indexer_update_input))
71 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
72 .with_swarm_config(|cfg| {
73 cfg.with_dial_concurrency_factor(
74 NonZeroU8::new(
75 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
76 .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
77 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
78 )
79 .expect("concurrently dialed peer count must be > 0"),
80 )
81 .with_max_negotiating_inbound_streams(
82 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
83 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
84 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
85 )
86 .with_idle_connection_timeout(
87 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
88 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
89 .map(std::time::Duration::from_secs)
90 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
91 )
92 })
93 .build())
94}
95
96pub struct HoprSwarm {
97 pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
98}
99
100impl std::fmt::Debug for HoprSwarm {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("HoprSwarm").finish()
103 }
104}
105
106impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
107 fn from(value: HoprSwarm) -> Self {
108 value.swarm
109 }
110}
111
112impl HoprSwarm {
113 pub async fn new<T>(
114 identity: libp2p::identity::Keypair,
115 indexer_update_input: T,
116 my_multiaddresses: Vec<Multiaddr>,
117 ) -> Self
118 where
119 T: Stream<Item = PeerDiscovery> + Send + 'static,
120 {
121 let mut swarm = build_p2p_network(identity, indexer_update_input)
122 .await
123 .expect("swarm must be constructible");
124
125 for multiaddress in my_multiaddresses.iter() {
126 match resolve_dns_if_any(multiaddress) {
127 Ok(ma) => {
128 if let Err(e) = swarm.listen_on(ma.clone()) {
129 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
130
131 match replace_transport_with_unspecified(&ma) {
132 Ok(ma) => {
133 if let Err(e) = swarm.listen_on(ma.clone()) {
134 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
135 } else {
136 info!(
137 listen_on = ?ma,
138 multiaddress = ?multiaddress,
139 "Listening for p2p connections"
140 );
141 swarm.add_external_address(multiaddress.clone());
142 }
143 }
144 Err(e) => {
145 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
146 }
147 }
148 } else {
149 info!(
150 listen_on = ?ma,
151 multiaddress = ?multiaddress,
152 "Listening for p2p connections"
153 );
154 swarm.add_external_address(multiaddress.clone());
155 }
156 }
157 Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
158 }
159 }
160
161 Self { swarm }
169 }
170
171 pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
172 crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
173 }
174
175 pub async fn run<T>(self, events: T)
182 where
183 T: Sink<crate::behavior::discovery::Event> + Send + 'static,
184 {
185 let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
186 futures::pin_mut!(events);
187
188 loop {
189 select! {
190 event = swarm.select_next_some() => match event {
191 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
192 if let Err(_error) = events.send(event).await {
193 tracing::error!("Failed to send discovery event from the transport layer");
194 }
195 }
196 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatClient(autonat::v2::client::Event {
197 server,
198 tested_addr,
199 bytes_sent,
200 result,
201 })) => {
202 match result {
203 Ok(_) => {
204 debug!(%server, %tested_addr, %bytes_sent, "Autonat server successfully tested");
205 }
206 Err(error) => {
207 warn!(%server, %tested_addr, %bytes_sent, %error, "Autonat server test failed");
208 }
209 }
210 }
211 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatServer(event)) => {
212 warn!(?event, "Autonat server event");
213 }
214 SwarmEvent::ConnectionEstablished {
215 peer_id,
216 connection_id,
217 num_established,
218 established_in,
219 ..
220 } => {
223 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
224
225 print_network_info(swarm.network_info(), "connection established");
226
227 #[cfg(all(feature = "prometheus", not(test)))]
228 {
229 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
230 }
231 }
232 SwarmEvent::ConnectionClosed {
233 peer_id,
234 connection_id,
235 cause,
236 num_established,
237 ..
238 } => {
240 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
241
242 print_network_info(swarm.network_info(), "connection closed");
243
244 #[cfg(all(feature = "prometheus", not(test)))]
245 {
246 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
247 }
248 }
249 SwarmEvent::IncomingConnection {
250 connection_id,
251 local_addr,
252 send_back_addr,
253 } => {
254 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
255 }
256 SwarmEvent::IncomingConnectionError {
257 local_addr,
258 connection_id,
259 error,
260 send_back_addr,
261 peer_id
262 } => {
263 error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
264
265 print_network_info(swarm.network_info(), "incoming connection error");
266 }
267 SwarmEvent::OutgoingConnectionError {
268 connection_id,
269 error,
270 peer_id
271 } => {
272 error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
273
274 print_network_info(swarm.network_info(), "outgoing connection error");
275 }
276 SwarmEvent::NewListenAddr {
277 listener_id,
278 address,
279 } => {
280 debug!(%listener_id, %address, transport="libp2p", "new listen address")
281 }
282 SwarmEvent::ExpiredListenAddr {
283 listener_id,
284 address,
285 } => {
286 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
287 }
288 SwarmEvent::ListenerClosed {
289 listener_id,
290 addresses,
291 reason,
292 } => {
293 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
294 }
295 SwarmEvent::ListenerError {
296 listener_id,
297 error,
298 } => {
299 debug!(%listener_id, transport="libp2p", %error, "listener error")
300 }
301 SwarmEvent::Dialing {
302 peer_id,
303 connection_id,
304 } => {
305 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
306 }
307 SwarmEvent::NewExternalAddrCandidate {
308 .. } => {}
310 SwarmEvent::ExternalAddrConfirmed { address } => {
311 info!(%address, "Detected external address")
312 }
313 SwarmEvent::ExternalAddrExpired {
314 .. } => {}
316 SwarmEvent::NewExternalAddrOfPeer {
317 peer_id, address
318 } => {
319 if is_public_address(&address) {
321 swarm.add_peer_address(peer_id, address.clone());
322 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
323 } else {
324 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
325 }
326 },
327 _ => trace!(transport="libp2p", "Unsupported enum option detected")
328 }
329 }
330 }
331 }
332
333 pub fn run_nat_server(&mut self, port: u16) {
334 info!(listen_on = port, "Starting NAT server");
335
336 match self.swarm.listen_on(
337 Multiaddr::empty()
338 .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
339 .with(Protocol::Tcp(port)),
340 ) {
341 Ok(_) => {
342 info!("NAT server started");
343 }
344 Err(e) => {
345 warn!(error = %e, "Failed to listen on NAT server");
346 }
347 }
348 }
349
350 pub fn dial_nat_server(&mut self, addresses: Vec<Multiaddr>) {
351 info!(
356 num_addresses = addresses.len(),
357 "Dialing NAT servers with multiple candidate addresses"
358 );
359
360 for addr in addresses {
361 let dial_opts = DialOpts::unknown_peer_id().address(addr.clone()).build();
362 if let Err(e) = self.swarm.dial(dial_opts) {
363 warn!(%addr, %e, "Failed to dial NAT server address");
364 } else {
365 info!(%addr, "Dialed NAT server address");
366 break;
367 }
368 }
369 }
370}
371
372fn print_network_info(network_info: NetworkInfo, event: &str) {
373 let num_peers = network_info.num_peers();
374 let connection_counters = network_info.connection_counters();
375 let num_incoming = connection_counters.num_established_incoming();
376 let num_outgoing = connection_counters.num_established_outgoing();
377 info!(
378 num_peers,
379 num_incoming, num_outgoing, "swarm network status after {event}"
380 );
381}
382
383pub type TicketAggregationRequestType = OutboundRequestId;
384pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;