1use std::{net::Ipv4Addr, num::NonZeroU8};
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5#[cfg(all(feature = "prometheus", not(test)))]
6use hopr_metrics::metrics::SimpleGauge;
7use hopr_transport_identity::{
8 Multiaddr, PeerId,
9 multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
10};
11use hopr_transport_protocol::PeerDiscovery;
12use libp2p::{
13 autonat,
14 multiaddr::Protocol,
15 request_response::{OutboundRequestId, ResponseChannel},
16 swarm::{NetworkInfo, SwarmEvent, dial_opts::DialOpts},
17};
18use tracing::{debug, error, info, trace, warn};
19
20use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
21
22#[cfg(all(feature = "prometheus", not(test)))]
23lazy_static::lazy_static! {
24 static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: SimpleGauge = SimpleGauge::new(
25 "hopr_transport_p2p_opened_connection_count",
26 "Number of currently open connections"
27 ).unwrap();
28}
29
30async fn build_p2p_network<T>(
34 me: libp2p::identity::Keypair,
35 indexer_update_input: T,
36) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
37where
38 T: Stream<Item = PeerDiscovery> + Send + 'static,
39{
40 let me_peerid: PeerId = me.public().into();
41
42 #[cfg(feature = "runtime-tokio")]
45 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
46 .with_tokio()
47 .with_tcp(
48 libp2p::tcp::Config::default().nodelay(true),
49 libp2p::noise::Config::new,
50 libp2p::yamux::Config::default,
53 )
54 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
55 .with_quic()
56 .with_dns();
57
58 Ok(swarm
59 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
60 .with_behaviour(|_key| HoprNetworkBehavior::new(me_peerid, indexer_update_input))
61 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
62 .with_swarm_config(|cfg| {
63 cfg.with_dial_concurrency_factor(
64 NonZeroU8::new(
65 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
66 .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
67 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
68 )
69 .expect("concurrently dialed peer count must be > 0"),
70 )
71 .with_max_negotiating_inbound_streams(
72 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
73 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
74 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
75 )
76 .with_idle_connection_timeout(
77 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
78 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
79 .map(std::time::Duration::from_secs)
80 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
81 )
82 })
83 .build())
84}
85
86pub struct HoprSwarm {
87 pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
88}
89
90impl std::fmt::Debug for HoprSwarm {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("HoprSwarm").finish()
93 }
94}
95
96impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
97 fn from(value: HoprSwarm) -> Self {
98 value.swarm
99 }
100}
101
102impl HoprSwarm {
103 pub async fn new<T>(
104 identity: libp2p::identity::Keypair,
105 indexer_update_input: T,
106 my_multiaddresses: Vec<Multiaddr>,
107 ) -> Self
108 where
109 T: Stream<Item = PeerDiscovery> + Send + 'static,
110 {
111 let mut swarm = build_p2p_network(identity, indexer_update_input)
112 .await
113 .expect("swarm must be constructible");
114
115 for multiaddress in my_multiaddresses.iter() {
116 match resolve_dns_if_any(multiaddress) {
117 Ok(ma) => {
118 if let Err(e) = swarm.listen_on(ma.clone()) {
119 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
120
121 match replace_transport_with_unspecified(&ma) {
122 Ok(ma) => {
123 if let Err(e) = swarm.listen_on(ma.clone()) {
124 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
125 } else {
126 info!(
127 listen_on = ?ma,
128 multiaddress = ?multiaddress,
129 "Listening for p2p connections"
130 );
131 swarm.add_external_address(multiaddress.clone());
132 }
133 }
134 Err(e) => {
135 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
136 }
137 }
138 } else {
139 info!(
140 listen_on = ?ma,
141 multiaddress = ?multiaddress,
142 "Listening for p2p connections"
143 );
144 swarm.add_external_address(multiaddress.clone());
145 }
146 }
147 Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
148 }
149 }
150
151 Self { swarm }
159 }
160
161 pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
162 crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
163 }
164
165 pub async fn run<T>(self, events: T)
172 where
173 T: Sink<crate::behavior::discovery::Event> + Send + 'static,
174 {
175 let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
176 futures::pin_mut!(events);
177
178 loop {
179 select! {
180 event = swarm.select_next_some() => match event {
181 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
182 if let Err(_error) = events.send(event).await {
183 tracing::error!("Failed to send discovery event from the transport layer");
184 }
185 }
186 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatClient(autonat::v2::client::Event {
187 server,
188 tested_addr,
189 bytes_sent,
190 result,
191 })) => {
192 match result {
193 Ok(_) => {
194 debug!(%server, %tested_addr, %bytes_sent, "Autonat server successfully tested");
195 }
196 Err(error) => {
197 warn!(%server, %tested_addr, %bytes_sent, %error, "Autonat server test failed");
198 }
199 }
200 }
201 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatServer(event)) => {
202 warn!(?event, "Autonat server event");
203 }
204 SwarmEvent::ConnectionEstablished {
205 peer_id,
206 connection_id,
207 num_established,
208 established_in,
209 ..
210 } => {
213 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
214
215 print_network_info(swarm.network_info(), "connection established");
216
217 #[cfg(all(feature = "prometheus", not(test)))]
218 {
219 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
220 }
221 }
222 SwarmEvent::ConnectionClosed {
223 peer_id,
224 connection_id,
225 cause,
226 num_established,
227 ..
228 } => {
230 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
231
232 print_network_info(swarm.network_info(), "connection closed");
233
234 #[cfg(all(feature = "prometheus", not(test)))]
235 {
236 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
237 }
238 }
239 SwarmEvent::IncomingConnection {
240 connection_id,
241 local_addr,
242 send_back_addr,
243 } => {
244 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
245 }
246 SwarmEvent::IncomingConnectionError {
247 local_addr,
248 connection_id,
249 error,
250 send_back_addr,
251 peer_id
252 } => {
253 error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
254
255 print_network_info(swarm.network_info(), "incoming connection error");
256 }
257 SwarmEvent::OutgoingConnectionError {
258 connection_id,
259 error,
260 peer_id
261 } => {
262 error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
263
264 print_network_info(swarm.network_info(), "outgoing connection error");
265 }
266 SwarmEvent::NewListenAddr {
267 listener_id,
268 address,
269 } => {
270 debug!(%listener_id, %address, transport="libp2p", "new listen address")
271 }
272 SwarmEvent::ExpiredListenAddr {
273 listener_id,
274 address,
275 } => {
276 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
277 }
278 SwarmEvent::ListenerClosed {
279 listener_id,
280 addresses,
281 reason,
282 } => {
283 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
284 }
285 SwarmEvent::ListenerError {
286 listener_id,
287 error,
288 } => {
289 debug!(%listener_id, transport="libp2p", %error, "listener error")
290 }
291 SwarmEvent::Dialing {
292 peer_id,
293 connection_id,
294 } => {
295 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
296 }
297 SwarmEvent::NewExternalAddrCandidate {
298 .. } => {}
300 SwarmEvent::ExternalAddrConfirmed { address } => {
301 info!(%address, "Detected external address")
302 }
303 SwarmEvent::ExternalAddrExpired {
304 .. } => {}
306 SwarmEvent::NewExternalAddrOfPeer {
307 peer_id, address
308 } => {
309 swarm.add_peer_address(peer_id, address.clone());
310 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "New peer stored in swarm")
311 },
312 _ => trace!(transport="libp2p", "Unsupported enum option detected")
313 }
314 }
315 }
316 }
317
318 pub fn run_nat_server(&mut self, port: u16) {
319 info!(listen_on = port, "Starting NAT server");
320
321 match self.swarm.listen_on(
322 Multiaddr::empty()
323 .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
324 .with(Protocol::Tcp(port)),
325 ) {
326 Ok(_) => {
327 info!("NAT server started");
328 }
329 Err(e) => {
330 warn!(error = %e, "Failed to listen on NAT server");
331 }
332 }
333 }
334
335 pub fn dial_nat_server(&mut self, addresses: Vec<Multiaddr>) {
336 info!(
341 num_addresses = addresses.len(),
342 "Dialing NAT servers with multiple candidate addresses"
343 );
344
345 for addr in addresses {
346 let dial_opts = DialOpts::unknown_peer_id().address(addr.clone()).build();
347 if let Err(e) = self.swarm.dial(dial_opts) {
348 warn!(%addr, %e, "Failed to dial NAT server address");
349 } else {
350 info!(%addr, "Dialed NAT server address");
351 break;
352 }
353 }
354 }
355}
356
357fn print_network_info(network_info: NetworkInfo, event: &str) {
358 let num_peers = network_info.num_peers();
359 let connection_counters = network_info.connection_counters();
360 let num_incoming = connection_counters.num_established_incoming();
361 let num_outgoing = connection_counters.num_established_outgoing();
362 info!(
363 num_peers,
364 num_incoming, num_outgoing, "swarm network status after {event}"
365 );
366}
367
368pub type TicketAggregationRequestType = OutboundRequestId;
369pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;