hopr_transport_p2p/
swarm.rs

1use futures::{select, Stream, StreamExt};
2use libp2p::swarm::NetworkInfo;
3use libp2p::{request_response::OutboundRequestId, request_response::ResponseChannel, swarm::SwarmEvent};
4use std::num::NonZeroU8;
5use tracing::{debug, error, info, trace, warn};
6
7use hopr_internal_types::prelude::*;
8use hopr_transport_identity::{
9    multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
10    Multiaddr, PeerId,
11};
12use hopr_transport_network::{messaging::ControlMessage, network::NetworkTriggeredEvent, ping::PingQueryReplier};
13use hopr_transport_protocol::{
14    config::ProtocolConfig,
15    ticket_aggregation::processor::{TicketAggregationActions, TicketAggregationFinalizer, TicketAggregationProcessed},
16    PeerDiscovery,
17};
18
19use crate::{constants, errors::Result, HoprNetworkBehavior, HoprNetworkBehaviorEvent, Ping, Pong};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22use hopr_metrics::metrics::SimpleGauge;
23
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge tracking the connections currently held by the swarm: the event loop in this file
    // increments it on `SwarmEvent::ConnectionEstablished` and decrements it on
    // `SwarmEvent::ConnectionClosed`.
    // Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: SimpleGauge = SimpleGauge::new(
        "hopr_transport_p2p_opened_connection_count",
        "Number of currently open connections"
    ).unwrap();
}
31
32/// Build objects comprising the p2p network.
33///
34/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
35async fn build_p2p_network<T, U>(
36    me: libp2p::identity::Keypair,
37    network_update_input: futures::channel::mpsc::Receiver<NetworkTriggeredEvent>,
38    indexer_update_input: U,
39    heartbeat_requests: futures::channel::mpsc::UnboundedReceiver<(PeerId, PingQueryReplier)>,
40    ticket_aggregation_interactions: T,
41    protocol_cfg: ProtocolConfig,
42) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
43where
44    T: Stream<Item = crate::behavior::ticket_aggregation::Event> + Send + 'static,
45    U: Stream<Item = PeerDiscovery> + Send + 'static,
46{
47    let me_peerid: PeerId = me.public().into();
48
49    #[cfg(feature = "runtime-async-std")]
50    let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
51        .with_async_std()
52        .with_tcp(
53            libp2p::tcp::Config::default().nodelay(true),
54            libp2p::noise::Config::new,
55            // use default yamux configuration to enable auto-tuning
56            // see https://github.com/libp2p/rust-libp2p/pull/4970
57            libp2p::yamux::Config::default,
58        )
59        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
60        .with_quic()
61        .with_dns();
62
63    // Both features could be enabled during testing, therefore we only use tokio when its
64    // exclusively enabled.
65    #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))]
66    let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
67        .with_tokio()
68        .with_tcp(
69            libp2p::tcp::Config::default().nodelay(true),
70            libp2p::noise::Config::new,
71            // use default yamux configuration to enable auto-tuning
72            // see https://github.com/libp2p/rust-libp2p/pull/4970
73            libp2p::yamux::Config::default,
74        )
75        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
76        .with_quic()
77        .with_dns();
78
79    Ok(swarm
80        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
81        .with_behaviour(|_key| {
82            HoprNetworkBehavior::new(
83                me_peerid,
84                network_update_input,
85                indexer_update_input,
86                heartbeat_requests,
87                ticket_aggregation_interactions,
88                protocol_cfg.heartbeat.timeout,
89                protocol_cfg.ticket_aggregation.timeout,
90            )
91        })
92        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
93        .with_swarm_config(|cfg| {
94            cfg.with_dial_concurrency_factor(
95                NonZeroU8::new(
96                    std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
97                        .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
98                        .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
99                )
100                .expect("concurrently dialed peer count must be > 0"),
101            )
102            .with_max_negotiating_inbound_streams(
103                std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
104                    .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
105                    .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
106            )
107            .with_idle_connection_timeout(
108                std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
109                    .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
110                    .map(std::time::Duration::from_secs)
111                    .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
112            )
113        })
114        .build())
115}
116
/// Handle through which the swarm event loop passes ticket aggregation requests and aggregated
/// tickets received from the network to the ticket aggregation processing
/// (via `receive_aggregation_request` and `receive_ticket`).
pub type TicketAggregationWriter =
    TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>;
/// Item type of the ticket aggregation interaction stream accepted by [`HoprSwarm::new`].
pub type TicketAggregationEvent = crate::behavior::ticket_aggregation::Event;
120
/// Wrapper owning the [`libp2p::Swarm`] configured with [`HoprNetworkBehavior`].
pub struct HoprSwarm {
    // The wrapped swarm; handed out by value through the `From<HoprSwarm>` conversion.
    pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
}
124
125impl HoprSwarm {
126    pub async fn new<U, T>(
127        identity: libp2p::identity::Keypair,
128        network_update_input: futures::channel::mpsc::Receiver<NetworkTriggeredEvent>,
129        indexer_update_input: U,
130        heartbeat_requests: futures::channel::mpsc::UnboundedReceiver<(PeerId, PingQueryReplier)>,
131        ticket_aggregation_interactions: T,
132        my_multiaddresses: Vec<Multiaddr>,
133        protocol_cfg: ProtocolConfig,
134    ) -> Self
135    where
136        T: Stream<Item = TicketAggregationEvent> + Send + 'static,
137        U: Stream<Item = PeerDiscovery> + Send + 'static,
138    {
139        let mut swarm = build_p2p_network(
140            identity,
141            network_update_input,
142            indexer_update_input,
143            heartbeat_requests,
144            ticket_aggregation_interactions,
145            protocol_cfg,
146        )
147        .await
148        .expect("swarm must be constructible");
149
150        for multiaddress in my_multiaddresses.iter() {
151            match resolve_dns_if_any(multiaddress) {
152                Ok(ma) => {
153                    if let Err(e) = swarm.listen_on(ma.clone()) {
154                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
155
156                        match replace_transport_with_unspecified(&ma) {
157                            Ok(ma) => {
158                                if let Err(e) = swarm.listen_on(ma.clone()) {
159                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
160                                } else {
161                                    info!(
162                                        listen_on = ?ma,
163                                        multiaddress = ?multiaddress,
164                                        "Listening for p2p connections"
165                                    );
166                                    swarm.add_external_address(multiaddress.clone());
167                                }
168                            }
169                            Err(e) => {
170                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
171                            }
172                        }
173                    } else {
174                        info!(
175                            listen_on = ?ma,
176                            multiaddress = ?multiaddress,
177                            "Listening for p2p connections"
178                        );
179                        swarm.add_external_address(multiaddress.clone());
180                    }
181                }
182                Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
183            }
184        }
185
186        // TODO: perform this check
187        // NOTE: This would be a valid check but is not immediate
188        // assert!(
189        //     swarm.listeners().count() > 0,
190        //     "The node failed to listen on at least one of the specified interfaces"
191        // );
192
193        Self { swarm }
194    }
195
196    pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
197        crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
198    }
199
200    // TODO: rename to with_outputs
201    pub fn with_processors(self, ticket_aggregation_writer: TicketAggregationWriter) -> HoprSwarmWithProcessors {
202        HoprSwarmWithProcessors {
203            swarm: self,
204            ticket_aggregation_writer,
205        }
206    }
207}
208
209fn print_network_info(network_info: NetworkInfo, event: &str) {
210    let num_peers = network_info.num_peers();
211    let connection_counters = network_info.connection_counters();
212    let num_incoming = connection_counters.num_established_incoming();
213    let num_outgoing = connection_counters.num_established_outgoing();
214    info!(
215        num_peers,
216        num_incoming, num_outgoing, "swarm network status after {event}"
217    );
218}
219
220impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
221    fn from(value: HoprSwarm) -> Self {
222        value.swarm
223    }
224}
225
/// Composition of all inputs allowing to produce a single stream of
/// input events passed into the swarm processing logic.
#[derive(Debug)]
pub enum Inputs {
    /// A peer identifier paired with an opaque byte payload.
    // NOTE(review): presumably the peer is the remote end of the message - confirm against callers.
    Message((PeerId, Box<[u8]>)),
    /// A peer identifier paired with an [`Acknowledgement`].
    Acknowledgement((PeerId, Acknowledgement)),
}
233
234impl From<(PeerId, Acknowledgement)> for Inputs {
235    fn from(value: (PeerId, Acknowledgement)) -> Self {
236        Self::Acknowledgement(value)
237    }
238}
239
240impl From<(PeerId, Box<[u8]>)> for Inputs {
241    fn from(value: (PeerId, Box<[u8]>)) -> Self {
242        Self::Message(value)
243    }
244}
245
246use hopr_internal_types::legacy;
247
/// Identifier of an outbound ticket aggregation request; the swarm event loop uses it as the key
/// for tracking requests that still await a response.
pub type TicketAggregationRequestType = OutboundRequestId;
/// Channel through which the reply to an inbound ticket aggregation request is sent: either the
/// aggregated ticket or an error description.
pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<legacy::Ticket, String>>;
250
251pub struct HoprSwarmWithProcessors {
252    swarm: HoprSwarm,
253    ticket_aggregation_writer: TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>,
254}
255
256impl std::fmt::Debug for HoprSwarmWithProcessors {
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        f.debug_struct("SwarmEventLoop").finish()
259    }
260}
261
262impl HoprSwarmWithProcessors {
263    /// Main p2p loop that instantiates a new libp2p::Swarm instance and sets up listening and reacting pipelines
264    /// running in a neverending loop future.
265    ///
266    /// The function represents the entirety of the business logic of the hopr daemon related to core operations.
267    ///
268    /// This future can only be resolved by an unrecoverable error or a panic.
269    pub async fn run(self, version: String) {
270        let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.swarm.into();
271
272        // NOTE: an improvement would be a forgetting cache for the active requests
273        let active_pings: moka::future::Cache<libp2p::request_response::OutboundRequestId, PingQueryReplier> =
274            moka::future::CacheBuilder::new(1000)
275                .time_to_live(std::time::Duration::from_secs(40))
276                .build();
277        let active_aggregation_requests: moka::future::Cache<
278            libp2p::request_response::OutboundRequestId,
279            TicketAggregationFinalizer,
280        > = moka::future::CacheBuilder::new(1000)
281            .time_to_live(std::time::Duration::from_secs(40))
282            .build();
283
284        let mut aggregation_writer = self.ticket_aggregation_writer;
285
286        loop {
287            select! {
288                event = swarm.select_next_some() => match event {
289                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::TicketAggregation(event)) => {
290                        let _span = tracing::span!(tracing::Level::DEBUG, "swarm protocol", protocol = "/hopr/ticket_aggregation/0.1.0");
291                        match event {
292                            libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::Message {
293                                peer,
294                                message,
295                                connection_id
296                            } => {
297                                match message {
298                                    libp2p::request_response::Message::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::Request {
299                                        request_id, request, channel
300                                    } => {
301                                        trace!(%peer, %request_id, %connection_id, "Received a ticket aggregation request");
302
303                                        let request = request.into_iter().map(TransferableWinningTicket::from).collect::<Vec<_>>();
304                                        if let Err(e) = aggregation_writer.receive_aggregation_request(peer, request, channel) {
305                                            error!(%peer, %request_id, %connection_id, error = %e, "Failed to process a ticket aggregation request");
306                                        }
307                                    },
308                                    libp2p::request_response::Message::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket, String>>::Response {
309                                        request_id, response
310                                    } => {
311                                        if let Err(e) = aggregation_writer.receive_ticket(peer, response.map(|t| t.0), request_id) {
312                                            error!(%peer, %request_id, %connection_id, error = %e,  "Failed to receive aggregated ticket");
313                                        }
314                                    }
315                                }
316                            },
317                            libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::OutboundFailure {
318                                peer, request_id, error, connection_id
319                            } => {
320                                error!(%peer, %request_id, %connection_id, %error, "Failed to send an aggregation request");
321                            },
322                            libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::InboundFailure {
323                                peer, request_id, error, connection_id
324                            } => {
325                                warn!(%peer, %request_id, %connection_id, %error, "Failed to receive an aggregated ticket");
326                            },
327                            libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::ResponseSent {..} => {
328                                // trace!("Discarded messages not relevant for the protocol!");
329                            },
330                        }
331                    }
332                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Heartbeat(event)) => {
333                        let _span = tracing::span!(tracing::Level::DEBUG, "swarm protocol", protocol = "/hopr/heartbeat/0.1.0");
334                        match event {
335                            libp2p::request_response::Event::<Ping,Pong>::Message {
336                                peer,
337                                message,
338                                connection_id
339                            } => {
340                                match message {
341                                    libp2p::request_response::Message::<Ping,Pong>::Request {
342                                        request_id, request, channel
343                                    } => {
344                                        trace!(%peer, %request_id, %connection_id, "Received a heartbeat Ping");
345
346                                        if let Ok(challenge_response) = ControlMessage::generate_pong_response(&request.0)
347                                        {
348                                            if swarm.behaviour_mut().heartbeat.send_response(channel, Pong(challenge_response, version.clone())).is_err() {
349                                                error!(%peer, %request_id, %connection_id, "Failed to reply to a Ping request");
350                                            };
351                                        }
352                                    },
353                                    libp2p::request_response::Message::<Ping,Pong>::Response {
354                                        request_id, response
355                                    } => {
356                                        if let Some(replier) = active_pings.remove(&request_id).await {
357                                            active_pings.run_pending_tasks().await;     // needed to remove the invalidated, but still present instance of Arc inside
358                                            trace!(%peer, %request_id, "Processing manual ping response");
359                                            replier.notify(response.0, response.1)
360                                        } else {
361                                            debug!(%peer, %request_id, "Failed to find heartbeat replier");
362                                        }
363                                    }
364                                }
365                            },
366                            libp2p::request_response::Event::<Ping,Pong>::OutboundFailure {
367                                peer, request_id, error, connection_id
368                            } => {
369                                active_pings.invalidate(&request_id).await;
370                                if matches!(error, libp2p::request_response::OutboundFailure::DialFailure) {
371                                    trace!(%peer, %request_id, %connection_id, %error, "Peer is offline");
372                                } else {
373                                    error!(%peer, %request_id, %connection_id, %error, "Failed heartbeat protocol on outbound");
374                                }
375                            },
376                            libp2p::request_response::Event::<Ping,Pong>::InboundFailure {
377                                peer, request_id, error, connection_id
378                            } => {
379                                warn!(%peer, %request_id, %connection_id, "Failed to receive a Pong request: {error}");
380                            },
381                            libp2p::request_response::Event::<Ping,Pong>::ResponseSent {..} => {
382                                // trace!("Discarded messages not relevant for the protocol!");
383                            },
384                        }
385                    }
386                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::KeepAlive(_)) => {}
387                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
388                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::TicketAggregationBehavior(event)) => {
389                        let _span = tracing::span!(tracing::Level::DEBUG, "swarm behavior", behavior="ticket aggregation");
390
391                        match event {
392                            TicketAggregationProcessed::Send(peer, acked_tickets, finalizer) => {
393                                let ack_tkt_count = acked_tickets.len();
394                                let request_id = swarm.behaviour_mut().ticket_aggregation.send_request(&peer, acked_tickets);
395                                debug!(%peer, %request_id, "Sending request to aggregate {ack_tkt_count} tickets");
396                                active_aggregation_requests.insert(request_id, finalizer).await;
397                            },
398                            TicketAggregationProcessed::Reply(peer, ticket, response) => {
399                                debug!(%peer, "Enqueuing a response'");
400                                if swarm.behaviour_mut().ticket_aggregation.send_response(response, ticket.map(legacy::Ticket)).is_err() {
401                                    error!(%peer, "Failed to enqueue response");
402                                }
403                            },
404                            TicketAggregationProcessed::Receive(peer, _, request) => {
405                                match active_aggregation_requests.remove(&request).await {
406                                    Some(finalizer) => {
407                                        active_aggregation_requests.run_pending_tasks().await;
408                                        finalizer.finalize();
409                                    },
410                                    None => {
411                                        warn!(%peer, request_id = %request, "Response already handled")
412                                    }
413                                }
414                            }
415                        }
416                    }
417                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::HeartbeatGenerator(event)) => {
418                        let _span = tracing::span!(tracing::Level::DEBUG, "swarm behavior", behavior="heartbeat generator");
419
420                        trace!(event = tracing::field::debug(&event), "Received a heartbeat event");
421                        match event {
422                            crate::behavior::heartbeat::Event::ToProbe((peer, replier)) => {
423                                let req_id = swarm.behaviour_mut().heartbeat.send_request(&peer, Ping(replier.challenge()));
424                                active_pings.insert(req_id, replier).await;
425                            },
426                        }
427                    }
428                    SwarmEvent::ConnectionEstablished {
429                        peer_id,
430                        connection_id,
431                        num_established,
432                        established_in,
433                        ..
434                        // concurrent_dial_errors,
435                        // endpoint,
436                    } => {
437                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
438
439                        print_network_info(swarm.network_info(), "connection established");
440
441                        #[cfg(all(feature = "prometheus", not(test)))]
442                        {
443                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
444                        }
445                    }
446                    SwarmEvent::ConnectionClosed {
447                        peer_id,
448                        connection_id,
449                        cause,
450                        num_established,
451                        ..
452                        // endpoint,
453                    } => {
454                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
455
456                        print_network_info(swarm.network_info(), "connection closed");
457
458                        #[cfg(all(feature = "prometheus", not(test)))]
459                        {
460                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
461                        }
462                    }
463                    SwarmEvent::IncomingConnection {
464                        connection_id,
465                        local_addr,
466                        send_back_addr,
467                    } => {
468                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p",  "incoming connection");
469                    }
470                    SwarmEvent::IncomingConnectionError {
471                        local_addr,
472                        connection_id,
473                        error,
474                        send_back_addr,
475                    } => {
476                        error!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
477
478                        print_network_info(swarm.network_info(), "incoming connection error");
479                    }
480                    SwarmEvent::OutgoingConnectionError {
481                        connection_id,
482                        error,
483                        peer_id
484                    } => {
485                        error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
486
487                        print_network_info(swarm.network_info(), "outgoing connection error");
488                    }
489                    SwarmEvent::NewListenAddr {
490                        listener_id,
491                        address,
492                    } => {
493                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
494                    }
495                    SwarmEvent::ExpiredListenAddr {
496                        listener_id,
497                        address,
498                    } => {
499                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
500                    }
501                    SwarmEvent::ListenerClosed {
502                        listener_id,
503                        addresses,
504                        reason,
505                    } => {
506                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
507                    }
508                    SwarmEvent::ListenerError {
509                        listener_id,
510                        error,
511                    } => {
512                        debug!(%listener_id, transport="libp2p", %error, "listener error")
513                    }
514                    SwarmEvent::Dialing {
515                        peer_id,
516                        connection_id,
517                    } => {
518                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
519                    }
520                    SwarmEvent::NewExternalAddrCandidate {
521                        ..  // address: Multiaddr
522                    } => {}
523                    SwarmEvent::ExternalAddrConfirmed {
524                        ..  // address: Multiaddr
525                    } => {}
526                    SwarmEvent::ExternalAddrExpired {
527                        ..  // address: Multiaddr
528                    } => {}
529                    SwarmEvent::NewExternalAddrOfPeer {
530                        peer_id, address
531                    } => {
532                        trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "New peer stored in swarm")
533                    },
534                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
535                }
536            }
537        }
538    }
539}