hopr_transport_p2p/behavior/
heartbeat.rs

/// Behavior generating heartbeat requests for peers to be pinged.
///
/// This behavior works in cooperation with the heartbeat request-response protocol
/// and, as such, serves primarily as the source of peers to be pinged.
5use std::{
6    collections::VecDeque,
7    task::{Context, Poll},
8};
9
10use futures::stream::{BoxStream, Stream, StreamExt};
11use libp2p::{
12    swarm::{dummy::ConnectionHandler, NetworkBehaviour, ToSwarm},
13    PeerId,
14};
15
16use hopr_transport_network::ping::PingQueryReplier;
17
18#[derive(Debug)]
19pub enum Event {
20    ToProbe((PeerId, PingQueryReplier)),
21}
22
23pub struct Behaviour {
24    events: BoxStream<'static, (PeerId, PingQueryReplier)>,
25    pending_events: VecDeque<
26        libp2p::swarm::ToSwarm<
27            <Self as NetworkBehaviour>::ToSwarm,
28            <<Self as NetworkBehaviour>::ConnectionHandler as libp2p::swarm::ConnectionHandler>::FromBehaviour,
29        >,
30    >,
31}
32
33impl Behaviour {
34    pub fn new<T>(heartbeat_queue: T) -> Self
35    where
36        T: Stream<Item = (PeerId, PingQueryReplier)> + Send + 'static,
37    {
38        Self {
39            events: Box::pin(heartbeat_queue),
40            pending_events: VecDeque::new(),
41        }
42    }
43}
44
45impl NetworkBehaviour for Behaviour {
46    type ConnectionHandler = ConnectionHandler;
47
48    type ToSwarm = Event;
49
50    fn handle_established_inbound_connection(
51        &mut self,
52        _connection_id: libp2p::swarm::ConnectionId,
53        _peer: libp2p::PeerId,
54        _local_addr: &libp2p::Multiaddr,
55        _remote_addr: &libp2p::Multiaddr,
56    ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
57        Ok(Self::ConnectionHandler {})
58    }
59
60    fn handle_established_outbound_connection(
61        &mut self,
62        _connection_id: libp2p::swarm::ConnectionId,
63        _peer: libp2p::PeerId,
64        _addr: &libp2p::Multiaddr,
65        _role_override: libp2p::core::Endpoint,
66        _port_use: libp2p::core::transport::PortUse,
67    ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
68        Ok(Self::ConnectionHandler {})
69    }
70
71    fn on_swarm_event(&mut self, _event: libp2p::swarm::FromSwarm) {
72        // No reaction to swarm event is necessary here, responses are handled by the protocol
73    }
74
75    fn on_connection_handler_event(
76        &mut self,
77        _peer_id: libp2p::PeerId,
78        _connection_id: libp2p::swarm::ConnectionId,
79        _event: libp2p::swarm::THandlerOutEvent<Self>,
80    ) {
81        // Nothing is necessary here, because no ConnectionHandler events should be generated
82    }
83
84    fn poll(
85        &mut self,
86        cx: &mut Context<'_>,
87    ) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
88        if let Some(value) = self.pending_events.pop_front() {
89            return Poll::Ready(value);
90        };
91
92        match self.events.poll_next_unpin(cx) {
93            Poll::Ready(Some((peer_id, replier))) => {
94                if let Some(value) = self.pending_events.pop_front() {
95                    self.pending_events
96                        .push_back(ToSwarm::GenerateEvent(Event::ToProbe((peer_id, replier))));
97                    Poll::Ready(value)
98                } else {
99                    Poll::Ready(ToSwarm::GenerateEvent(Event::ToProbe((peer_id, replier))))
100                }
101            }
102            Poll::Ready(None) | Poll::Pending => Poll::Pending,
103        }
104    }
105}