hopr_transport_p2p/behavior/
heartbeat.rs
1use std::{
6 collections::VecDeque,
7 task::{Context, Poll},
8};
9
10use futures::stream::{BoxStream, Stream, StreamExt};
11use libp2p::{
12 swarm::{dummy::ConnectionHandler, NetworkBehaviour, ToSwarm},
13 PeerId,
14};
15
16use hopr_transport_network::ping::PingQueryReplier;
17
18#[derive(Debug)]
19pub enum Event {
20 ToProbe((PeerId, PingQueryReplier)),
21}
22
23pub struct Behaviour {
24 events: BoxStream<'static, (PeerId, PingQueryReplier)>,
25 pending_events: VecDeque<
26 libp2p::swarm::ToSwarm<
27 <Self as NetworkBehaviour>::ToSwarm,
28 <<Self as NetworkBehaviour>::ConnectionHandler as libp2p::swarm::ConnectionHandler>::FromBehaviour,
29 >,
30 >,
31}
32
33impl Behaviour {
34 pub fn new<T>(heartbeat_queue: T) -> Self
35 where
36 T: Stream<Item = (PeerId, PingQueryReplier)> + Send + 'static,
37 {
38 Self {
39 events: Box::pin(heartbeat_queue),
40 pending_events: VecDeque::new(),
41 }
42 }
43}
44
45impl NetworkBehaviour for Behaviour {
46 type ConnectionHandler = ConnectionHandler;
47
48 type ToSwarm = Event;
49
50 fn handle_established_inbound_connection(
51 &mut self,
52 _connection_id: libp2p::swarm::ConnectionId,
53 _peer: libp2p::PeerId,
54 _local_addr: &libp2p::Multiaddr,
55 _remote_addr: &libp2p::Multiaddr,
56 ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
57 Ok(Self::ConnectionHandler {})
58 }
59
60 fn handle_established_outbound_connection(
61 &mut self,
62 _connection_id: libp2p::swarm::ConnectionId,
63 _peer: libp2p::PeerId,
64 _addr: &libp2p::Multiaddr,
65 _role_override: libp2p::core::Endpoint,
66 _port_use: libp2p::core::transport::PortUse,
67 ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
68 Ok(Self::ConnectionHandler {})
69 }
70
71 fn on_swarm_event(&mut self, _event: libp2p::swarm::FromSwarm) {
72 }
74
75 fn on_connection_handler_event(
76 &mut self,
77 _peer_id: libp2p::PeerId,
78 _connection_id: libp2p::swarm::ConnectionId,
79 _event: libp2p::swarm::THandlerOutEvent<Self>,
80 ) {
81 }
83
84 fn poll(
85 &mut self,
86 cx: &mut Context<'_>,
87 ) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
88 if let Some(value) = self.pending_events.pop_front() {
89 return Poll::Ready(value);
90 };
91
92 match self.events.poll_next_unpin(cx) {
93 Poll::Ready(Some((peer_id, replier))) => {
94 if let Some(value) = self.pending_events.pop_front() {
95 self.pending_events
96 .push_back(ToSwarm::GenerateEvent(Event::ToProbe((peer_id, replier))));
97 Poll::Ready(value)
98 } else {
99 Poll::Ready(ToSwarm::GenerateEvent(Event::ToProbe((peer_id, replier))))
100 }
101 }
102 Poll::Ready(None) | Poll::Pending => Poll::Pending,
103 }
104 }
105}