1use std::{net::Ipv4Addr, num::NonZeroU8};
2
3use futures::{Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5#[cfg(all(feature = "prometheus", not(test)))]
6use hopr_metrics::metrics::SimpleGauge;
7use hopr_transport_identity::{
8 Multiaddr, PeerId,
9 multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
10};
11use hopr_transport_protocol::PeerDiscovery;
12use libp2p::{
13 autonat,
14 multiaddr::Protocol,
15 request_response::{OutboundRequestId, ResponseChannel},
16 swarm::{NetworkInfo, SwarmEvent, dial_opts::DialOpts},
17};
18use tracing::{debug, error, info, trace, warn};
19
20use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
21
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge for the number of open libp2p connections. It is only compiled in when the
    // `prometheus` feature is enabled and the crate is not built for tests.
    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: SimpleGauge = {
        let name = "hopr_transport_p2p_opened_connection_count";
        let description = "Number of currently open connections";

        SimpleGauge::new(name, description).unwrap()
    };
}
29
30async fn build_p2p_network<T>(
34 me: libp2p::identity::Keypair,
35 indexer_update_input: T,
36) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
37where
38 T: Stream<Item = PeerDiscovery> + Send + 'static,
39{
40 let me_peerid: PeerId = me.public().into();
41
42 #[cfg(feature = "runtime-tokio")]
45 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
46 .with_tokio()
47 .with_tcp(
48 libp2p::tcp::Config::default().nodelay(true),
49 libp2p::noise::Config::new,
50 libp2p::yamux::Config::default,
53 )
54 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
55 .with_quic()
56 .with_dns();
57
58 Ok(swarm
59 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
60 .with_behaviour(|_key| HoprNetworkBehavior::new(me_peerid, indexer_update_input))
61 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
62 .with_swarm_config(|cfg| {
63 cfg.with_dial_concurrency_factor(
64 NonZeroU8::new(
65 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
66 .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
67 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
68 )
69 .expect("concurrently dialed peer count must be > 0"),
70 )
71 .with_max_negotiating_inbound_streams(
72 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
73 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
74 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
75 )
76 .with_idle_connection_timeout(
77 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
78 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
79 .map(std::time::Duration::from_secs)
80 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
81 )
82 })
83 .build())
84}
85
86pub struct HoprSwarm {
87 pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
88}
89
90impl std::fmt::Debug for HoprSwarm {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("HoprSwarm").finish()
93 }
94}
95
96impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
97 fn from(value: HoprSwarm) -> Self {
98 value.swarm
99 }
100}
101
102impl HoprSwarm {
103 pub async fn new<T>(
104 identity: libp2p::identity::Keypair,
105 indexer_update_input: T,
106 my_multiaddresses: Vec<Multiaddr>,
107 ) -> Self
108 where
109 T: Stream<Item = PeerDiscovery> + Send + 'static,
110 {
111 let mut swarm = build_p2p_network(identity, indexer_update_input)
112 .await
113 .expect("swarm must be constructible");
114
115 for multiaddress in my_multiaddresses.iter() {
116 match resolve_dns_if_any(multiaddress) {
117 Ok(ma) => {
118 if let Err(e) = swarm.listen_on(ma.clone()) {
119 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
120
121 match replace_transport_with_unspecified(&ma) {
122 Ok(ma) => {
123 if let Err(e) = swarm.listen_on(ma.clone()) {
124 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
125 } else {
126 info!(
127 listen_on = ?ma,
128 multiaddress = ?multiaddress,
129 "Listening for p2p connections"
130 );
131 swarm.add_external_address(multiaddress.clone());
132 }
133 }
134 Err(e) => {
135 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
136 }
137 }
138 } else {
139 info!(
140 listen_on = ?ma,
141 multiaddress = ?multiaddress,
142 "Listening for p2p connections"
143 );
144 swarm.add_external_address(multiaddress.clone());
145 }
146 }
147 Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
148 }
149 }
150
151 Self { swarm }
159 }
160
161 pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
162 crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
163 }
164
165 pub async fn run(self) {
172 let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
173
174 loop {
175 select! {
176 event = swarm.select_next_some() => match event {
177 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
178 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatClient(autonat::v2::client::Event {
179 server,
180 tested_addr,
181 bytes_sent,
182 result,
183 })) => {
184 match result {
185 Ok(_) => {
186 debug!(%server, %tested_addr, %bytes_sent, "Autonat server successfully tested");
187 }
188 Err(e) => {
189 warn!(%server, %tested_addr, %bytes_sent, %e, "Autonat server test failed");
190 }
191 }
192 }
193 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatServer(event)) => {
194 warn!(?event, "Autonat server event");
195 }
196 SwarmEvent::ConnectionEstablished {
197 peer_id,
198 connection_id,
199 num_established,
200 established_in,
201 ..
202 } => {
205 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
206
207 print_network_info(swarm.network_info(), "connection established");
208
209 #[cfg(all(feature = "prometheus", not(test)))]
210 {
211 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
212 }
213 }
214 SwarmEvent::ConnectionClosed {
215 peer_id,
216 connection_id,
217 cause,
218 num_established,
219 ..
220 } => {
222 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
223
224 print_network_info(swarm.network_info(), "connection closed");
225
226 #[cfg(all(feature = "prometheus", not(test)))]
227 {
228 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
229 }
230 }
231 SwarmEvent::IncomingConnection {
232 connection_id,
233 local_addr,
234 send_back_addr,
235 } => {
236 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
237 }
238 SwarmEvent::IncomingConnectionError {
239 local_addr,
240 connection_id,
241 error,
242 send_back_addr,
243 } => {
244 error!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
245
246 print_network_info(swarm.network_info(), "incoming connection error");
247 }
248 SwarmEvent::OutgoingConnectionError {
249 connection_id,
250 error,
251 peer_id
252 } => {
253 error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
254
255 print_network_info(swarm.network_info(), "outgoing connection error");
256 }
257 SwarmEvent::NewListenAddr {
258 listener_id,
259 address,
260 } => {
261 debug!(%listener_id, %address, transport="libp2p", "new listen address")
262 }
263 SwarmEvent::ExpiredListenAddr {
264 listener_id,
265 address,
266 } => {
267 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
268 }
269 SwarmEvent::ListenerClosed {
270 listener_id,
271 addresses,
272 reason,
273 } => {
274 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
275 }
276 SwarmEvent::ListenerError {
277 listener_id,
278 error,
279 } => {
280 debug!(%listener_id, transport="libp2p", %error, "listener error")
281 }
282 SwarmEvent::Dialing {
283 peer_id,
284 connection_id,
285 } => {
286 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
287 }
288 SwarmEvent::NewExternalAddrCandidate {
289 .. } => {}
291 SwarmEvent::ExternalAddrConfirmed { address } => {
292 info!(%address, "Detected external address")
293 }
294 SwarmEvent::ExternalAddrExpired {
295 .. } => {}
297 SwarmEvent::NewExternalAddrOfPeer {
298 peer_id, address
299 } => {
300 swarm.add_peer_address(peer_id, address.clone());
301 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "New peer stored in swarm")
302 },
303 _ => trace!(transport="libp2p", "Unsupported enum option detected")
304 }
305 }
306 }
307 }
308
309 pub fn run_nat_server(&mut self, port: u16) {
310 info!(listen_on = port, "Starting NAT server");
311
312 match self.swarm.listen_on(
313 Multiaddr::empty()
314 .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
315 .with(Protocol::Tcp(port)),
316 ) {
317 Ok(_) => {
318 info!("NAT server started");
319 }
320 Err(e) => {
321 warn!(error = %e, "Failed to listen on NAT server");
322 }
323 }
324 }
325
326 pub fn dial_nat_server(&mut self, addresses: Vec<Multiaddr>) {
327 info!(
332 num_addresses = addresses.len(),
333 "Dialing NAT servers with multiple candidate addresses"
334 );
335
336 for addr in addresses {
337 let dial_opts = DialOpts::unknown_peer_id().address(addr.clone()).build();
338 if let Err(e) = self.swarm.dial(dial_opts) {
339 warn!(%addr, %e, "Failed to dial NAT server address");
340 } else {
341 info!(%addr, "Dialed NAT server address");
342 break;
343 }
344 }
345 }
346}
347
348fn print_network_info(network_info: NetworkInfo, event: &str) {
349 let num_peers = network_info.num_peers();
350 let connection_counters = network_info.connection_counters();
351 let num_incoming = connection_counters.num_established_incoming();
352 let num_outgoing = connection_counters.num_established_outgoing();
353 info!(
354 num_peers,
355 num_incoming, num_outgoing, "swarm network status after {event}"
356 );
357}
358
359pub type TicketAggregationRequestType = OutboundRequestId;
360pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;