hopr_transport_p2p/behavior/
discovery.rs1use std::{
6 cmp::{Ordering, Reverse},
7 collections::{BinaryHeap, HashMap, VecDeque},
8};
9
10use backon::BackoffBuilder;
11use futures::stream::{BoxStream, Stream, StreamExt};
12use libp2p::{
13 Multiaddr, PeerId,
14 core::Endpoint,
15 swarm::{
16 ConnectionDenied, ConnectionId, DialFailure, NetworkBehaviour, ToSwarm, dial_opts::DialOpts,
17 dummy::ConnectionHandler,
18 },
19};
20
21use crate::PeerDiscovery;
22
23pub struct Delayed<T> {
27 pub release_at: std::time::Instant,
28 pub item: T,
29}
30
31impl<T> PartialEq for Delayed<T> {
32 fn eq(&self, other: &Self) -> bool {
33 self.release_at == other.release_at
34 }
35}
36
37impl<T> PartialOrd for Delayed<T> {
38 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
39 Some(self.cmp(other))
40 }
41}
42
43impl<T> Eq for Delayed<T> {}
44
45impl<T> Ord for Delayed<T> {
46 fn cmp(&self, other: &Self) -> Ordering {
47 self.release_at.cmp(&other.release_at)
48 }
49}
50
51impl<T> From<(std::time::Instant, T)> for Delayed<T> {
52 fn from(value: (std::time::Instant, T)) -> Self {
53 Self {
54 release_at: value.0,
55 item: value.1,
56 }
57 }
58}
59
60#[inline]
61fn initial_backoff() -> backon::ExponentialBackoff {
62 backon::ExponentialBuilder::new()
63 .with_min_delay(std::time::Duration::from_secs(3))
64 .with_max_delay(std::time::Duration::from_secs(120))
65 .with_factor(1.5)
66 .with_jitter()
67 .build()
68}
69
70pub struct Behaviour {
71 me: PeerId,
72 events: BoxStream<'static, PeerDiscovery>,
73 pending_events: VecDeque<
74 libp2p::swarm::ToSwarm<
75 <Self as NetworkBehaviour>::ToSwarm,
76 <<Self as NetworkBehaviour>::ConnectionHandler as libp2p::swarm::ConnectionHandler>::FromBehaviour,
77 >,
78 >,
79 bootstrap_peers: HashMap<PeerId, Vec<Multiaddr>>,
80 connected_peers: HashMap<PeerId, usize>,
81 next_dial_attempts: BinaryHeap<Reverse<Delayed<PeerId>>>,
82 not_connected_peers: HashMap<PeerId, backon::ExponentialBackoff>,
83}
84
85impl Behaviour {
86 pub fn new<T>(me: PeerId, external_discovery_events: T) -> Self
87 where
88 T: Stream<Item = PeerDiscovery> + Send + 'static,
89 {
90 Self {
91 me,
92 events: Box::pin(external_discovery_events),
93 bootstrap_peers: HashMap::new(),
94 pending_events: VecDeque::new(),
95 connected_peers: HashMap::new(),
96 next_dial_attempts: BinaryHeap::with_capacity(1500),
97 not_connected_peers: HashMap::new(),
98 }
99 }
100
101 fn schedule_dial_with(&mut self, peer: PeerId, mut backoff: backon::ExponentialBackoff) {
102 let duration = backoff.next().unwrap_or_else(|| {
103 tracing::debug!(%peer, "failed to get next backoff duration, using 10s");
104 std::time::Duration::from_secs(10)
105 });
106 self.not_connected_peers.insert(peer, backoff);
107 self.next_dial_attempts.push(Reverse(Delayed {
108 release_at: std::time::Instant::now() + duration,
109 item: peer,
110 }));
111 }
112}
113
114impl NetworkBehaviour for Behaviour {
115 type ConnectionHandler = ConnectionHandler;
116 type ToSwarm = ();
117
118 #[tracing::instrument(
119 level = "debug",
120 name = "Discovery::handle_established_inbound_connection",
121 skip(self),
122 fields(transport = "p2p discovery"),
123 err(Display)
124 )]
125 fn handle_established_inbound_connection(
126 &mut self,
127 connection_id: libp2p::swarm::ConnectionId,
128 peer: libp2p::PeerId,
129 local_addr: &libp2p::Multiaddr,
130 remote_addr: &libp2p::Multiaddr,
131 ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
132 Some(Self::ConnectionHandler {}).ok_or_else(|| {
133 libp2p::swarm::ConnectionDenied::new(crate::errors::P2PError::Logic(format!(
134 "Connection from '{peer}' is not allowed"
135 )))
136 })
137 }
138
139 #[tracing::instrument(
140 level = "debug",
141 name = "Discovery::handle_pending_outbound_connection"
142 skip(self),
143 fields(transport = "p2p discovery"),
144 ret(Debug),
145 err(Display)
146 )]
147 fn handle_pending_outbound_connection(
148 &mut self,
149 connection_id: ConnectionId,
150 maybe_peer: Option<PeerId>,
151 addresses: &[Multiaddr],
152 effective_role: Endpoint,
153 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
154 if let Some(peer) = maybe_peer {
155 return Ok(self
157 .bootstrap_peers
158 .get(&peer)
159 .map_or(vec![], |addresses| addresses.clone()));
160 }
161
162 Ok(vec![])
163 }
164
165 #[tracing::instrument(
166 level = "trace",
167 name = "Discovery::handle_established_outbound_connection",
168 skip(self),
169 fields(transport = "p2p discovery"),
170 err(Display)
171 )]
172 fn handle_established_outbound_connection(
173 &mut self,
174 connection_id: libp2p::swarm::ConnectionId,
175 peer: libp2p::PeerId,
176 addr: &libp2p::Multiaddr,
177 role_override: libp2p::core::Endpoint,
178 port_use: libp2p::core::transport::PortUse,
179 ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
180 Ok(Self::ConnectionHandler {})
181 }
182
183 #[tracing::instrument(
184 level = "debug",
185 name = "Discovery::on_swarm_event"
186 skip(self),
187 fields(transport = "p2p discovery"),
188 )]
189 fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
190 match event {
191 libp2p::swarm::FromSwarm::ConnectionEstablished(data) => {
192 *self.connected_peers.entry(data.peer_id).or_insert(0) += 1;
193 self.not_connected_peers.remove(&data.peer_id);
194 }
195 libp2p::swarm::FromSwarm::ConnectionClosed(data) => {
196 let v = self.connected_peers.entry(data.peer_id).or_insert(0);
197 if *v > 0 {
198 *v -= 1;
199 }
200
201 let has_no_more_live_connections = *v == 0;
202
203 if has_no_more_live_connections {
204 self.connected_peers.remove(&data.peer_id);
205 self.schedule_dial_with(data.peer_id, initial_backoff());
206 }
207 }
208 libp2p::swarm::FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
209 tracing::debug!(?peer_id, %error, "Failed to dial peer");
210
211 if let Some(peer) = peer_id {
213 let backoff = self.not_connected_peers.remove(&peer).unwrap_or_else(|| {
214 tracing::debug!(%peer, "no backoff for a failed dial, creating new backoff");
215 initial_backoff()
216 });
217 self.schedule_dial_with(peer, backoff);
218 }
219 }
220 _ => {}
221 }
222 }
223
224 fn on_connection_handler_event(
225 &mut self,
226 _peer_id: libp2p::PeerId,
227 _connection_id: libp2p::swarm::ConnectionId,
228 _event: libp2p::swarm::THandlerOutEvent<Self>,
229 ) {
230 }
232
233 #[tracing::instrument(
234 level = "debug",
235 name = "Discovery::poll"
236 skip(self, cx),
237 fields(transport = "p2p discovery")
238 )]
239 fn poll(
240 &mut self,
241 cx: &mut std::task::Context<'_>,
242 ) -> std::task::Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
243 if let Some(value) = self.pending_events.pop_front() {
244 return std::task::Poll::Ready(value);
245 };
246
247 let poll_result = self.events.poll_next_unpin(cx).map(|e| {
248 if let Some(PeerDiscovery::Announce(peer, multiaddresses)) = e
249 && peer != self.me
250 {
251 tracing::debug!(%peer, addresses = ?&multiaddresses, "Announcement");
252
253 for multiaddress in &multiaddresses {
254 self.pending_events.push_back(ToSwarm::NewExternalAddrOfPeer {
255 peer_id: peer,
256 address: multiaddress.clone(),
257 });
258 }
259
260 if !multiaddresses.is_empty() {
262 self.bootstrap_peers.insert(peer, multiaddresses);
263 self.schedule_dial_with(peer, initial_backoff());
264 }
265 }
266 });
267
268 if matches!(poll_result, std::task::Poll::Ready(_))
269 && let Some(value) = self.pending_events.pop_front()
270 {
271 return std::task::Poll::Ready(value);
272 }
273
274 let now = std::time::Instant::now();
275 while self
276 .next_dial_attempts
277 .peek()
278 .map(|x| x.0.release_at <= now)
279 .unwrap_or(false)
280 {
281 let peer = self
282 .next_dial_attempts
283 .pop()
284 .expect("The value should be present within the same access")
285 .0
286 .item;
287
288 if !self.not_connected_peers.contains_key(&peer) {
290 continue;
291 }
292
293 tracing::trace!(%peer, "attempting a new dial attempt item");
294 self.pending_events.push_back(ToSwarm::Dial {
295 opts: DialOpts::peer_id(peer).build(),
296 });
297 }
298
299 if let Some(value) = self.pending_events.pop_front() {
300 std::task::Poll::Ready(value)
301 } else {
302 std::task::Poll::Pending
303 }
304 }
305}