Skip to main content

hopr_transport_p2p/behavior/
discovery.rs

1//! The discovery mechanism uses an external stimulus to trigger the discovery
2//! process on the libp2p side. It is responsible for processing the events
3//! generated by other components and passing them to the libp2p swarm in
4//! an appropriate format.
5use std::{
6    cmp::{Ordering, Reverse},
7    collections::{BinaryHeap, HashMap, VecDeque},
8};
9
10use backon::BackoffBuilder;
11use futures::stream::{BoxStream, Stream, StreamExt};
12use libp2p::{
13    Multiaddr, PeerId,
14    core::Endpoint,
15    swarm::{
16        ConnectionDenied, ConnectionId, DialFailure, NetworkBehaviour, ToSwarm, dial_opts::DialOpts,
17        dummy::ConnectionHandler,
18    },
19};
20
21use crate::PeerDiscovery;
22
/// Data structure holding an item alongside a release timestamp.
///
/// Equality and ordering are defined only over [`Delayed::release_at`]; the
/// `item` is ignored. Two values with the same release time therefore compare
/// equal even if their items differ. Wrapping values in [`std::cmp::Reverse`]
/// turns a [`std::collections::BinaryHeap`] into a queue that yields the
/// earliest release time first.
#[derive(Debug)]
pub struct Delayed<T> {
    /// Instant at which the item becomes due.
    pub release_at: std::time::Instant,
    /// The delayed payload.
    pub item: T,
}

impl<T> PartialEq for Delayed<T> {
    /// Compares only the release timestamps.
    fn eq(&self, other: &Self) -> bool {
        self.release_at == other.release_at
    }
}

impl<T> Eq for Delayed<T> {}

impl<T> PartialOrd for Delayed<T> {
    /// Delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for Delayed<T> {
    /// Orders by release timestamp only (earlier is smaller).
    fn cmp(&self, other: &Self) -> Ordering {
        self.release_at.cmp(&other.release_at)
    }
}

impl<T> From<(std::time::Instant, T)> for Delayed<T> {
    /// Builds a [`Delayed`] from a `(release_at, item)` pair.
    fn from((release_at, item): (std::time::Instant, T)) -> Self {
        Self { release_at, item }
    }
}
59
60#[inline]
61fn initial_backoff() -> backon::ExponentialBackoff {
62    backon::ExponentialBuilder::new()
63        .with_min_delay(std::time::Duration::from_secs(3))
64        .with_max_delay(std::time::Duration::from_secs(120))
65        .with_factor(1.5)
66        .with_jitter()
67        .build()
68}
69
70pub struct Behaviour {
71    me: PeerId,
72    events: BoxStream<'static, PeerDiscovery>,
73    pending_events: VecDeque<
74        libp2p::swarm::ToSwarm<
75            <Self as NetworkBehaviour>::ToSwarm,
76            <<Self as NetworkBehaviour>::ConnectionHandler as libp2p::swarm::ConnectionHandler>::FromBehaviour,
77        >,
78    >,
79    bootstrap_peers: HashMap<PeerId, Vec<Multiaddr>>,
80    connected_peers: HashMap<PeerId, usize>,
81    next_dial_attempts: BinaryHeap<Reverse<Delayed<PeerId>>>,
82    not_connected_peers: HashMap<PeerId, backon::ExponentialBackoff>,
83}
84
85impl Behaviour {
86    pub fn new<T>(me: PeerId, external_discovery_events: T) -> Self
87    where
88        T: Stream<Item = PeerDiscovery> + Send + 'static,
89    {
90        Self {
91            me,
92            events: Box::pin(external_discovery_events),
93            bootstrap_peers: HashMap::new(),
94            pending_events: VecDeque::new(),
95            connected_peers: HashMap::new(),
96            next_dial_attempts: BinaryHeap::with_capacity(1500),
97            not_connected_peers: HashMap::new(),
98        }
99    }
100
101    fn schedule_dial_with(&mut self, peer: PeerId, mut backoff: backon::ExponentialBackoff) {
102        let duration = backoff.next().unwrap_or_else(|| {
103            tracing::debug!(%peer, "failed to get next backoff duration, using 10s");
104            std::time::Duration::from_secs(10)
105        });
106        self.not_connected_peers.insert(peer, backoff);
107        self.next_dial_attempts.push(Reverse(Delayed {
108            release_at: std::time::Instant::now() + duration,
109            item: peer,
110        }));
111    }
112}
113
114impl NetworkBehaviour for Behaviour {
115    type ConnectionHandler = ConnectionHandler;
116    type ToSwarm = ();
117
118    #[tracing::instrument(
119        level = "debug",
120        name = "Discovery::handle_established_inbound_connection",
121        skip(self),
122        fields(transport = "p2p discovery"),
123        err(Display)
124    )]
125    fn handle_established_inbound_connection(
126        &mut self,
127        connection_id: libp2p::swarm::ConnectionId,
128        peer: libp2p::PeerId,
129        local_addr: &libp2p::Multiaddr,
130        remote_addr: &libp2p::Multiaddr,
131    ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
132        Some(Self::ConnectionHandler {}).ok_or_else(|| {
133            libp2p::swarm::ConnectionDenied::new(crate::errors::P2PError::Logic(format!(
134                "Connection from '{peer}' is not allowed"
135            )))
136        })
137    }
138
139    #[tracing::instrument(
140        level = "debug",
141        name = "Discovery::handle_pending_outbound_connection"
142        skip(self),
143        fields(transport = "p2p discovery"),
144        ret(Debug),
145        err(Display)
146    )]
147    fn handle_pending_outbound_connection(
148        &mut self,
149        connection_id: ConnectionId,
150        maybe_peer: Option<PeerId>,
151        addresses: &[Multiaddr],
152        effective_role: Endpoint,
153    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
154        if let Some(peer) = maybe_peer {
155            // inject the multiaddress of the peer for possible dial usage by stream protocols
156            return Ok(self
157                .bootstrap_peers
158                .get(&peer)
159                .map_or(vec![], |addresses| addresses.clone()));
160        }
161
162        Ok(vec![])
163    }
164
165    #[tracing::instrument(
166        level = "trace",
167        name = "Discovery::handle_established_outbound_connection",
168        skip(self),
169        fields(transport = "p2p discovery"),
170        err(Display)
171    )]
172    fn handle_established_outbound_connection(
173        &mut self,
174        connection_id: libp2p::swarm::ConnectionId,
175        peer: libp2p::PeerId,
176        addr: &libp2p::Multiaddr,
177        role_override: libp2p::core::Endpoint,
178        port_use: libp2p::core::transport::PortUse,
179    ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
180        Ok(Self::ConnectionHandler {})
181    }
182
183    #[tracing::instrument(
184        level = "debug",
185        name = "Discovery::on_swarm_event"
186        skip(self),
187        fields(transport = "p2p discovery"),
188    )]
189    fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
190        match event {
191            libp2p::swarm::FromSwarm::ConnectionEstablished(data) => {
192                *self.connected_peers.entry(data.peer_id).or_insert(0) += 1;
193                self.not_connected_peers.remove(&data.peer_id);
194            }
195            libp2p::swarm::FromSwarm::ConnectionClosed(data) => {
196                let v = self.connected_peers.entry(data.peer_id).or_insert(0);
197                if *v > 0 {
198                    *v -= 1;
199                }
200
201                let has_no_more_live_connections = *v == 0;
202
203                if has_no_more_live_connections {
204                    self.connected_peers.remove(&data.peer_id);
205                    self.schedule_dial_with(data.peer_id, initial_backoff());
206                }
207            }
208            libp2p::swarm::FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
209                tracing::debug!(?peer_id, %error, "Failed to dial peer");
210
211                // on a failed dial get the next scheduled dial using the backoff
212                if let Some(peer) = peer_id {
213                    let backoff = self.not_connected_peers.remove(&peer).unwrap_or_else(|| {
214                        tracing::debug!(%peer, "no backoff for a failed dial, creating new backoff");
215                        initial_backoff()
216                    });
217                    self.schedule_dial_with(peer, backoff);
218                }
219            }
220            _ => {}
221        }
222    }
223
224    fn on_connection_handler_event(
225        &mut self,
226        _peer_id: libp2p::PeerId,
227        _connection_id: libp2p::swarm::ConnectionId,
228        _event: libp2p::swarm::THandlerOutEvent<Self>,
229    ) {
230        // Nothing is necessary here, because no ConnectionHandler events should be generated
231    }
232
233    #[tracing::instrument(
234        level = "debug",
235        name = "Discovery::poll"
236        skip(self, cx),
237        fields(transport = "p2p discovery")
238    )]
239    fn poll(
240        &mut self,
241        cx: &mut std::task::Context<'_>,
242    ) -> std::task::Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
243        if let Some(value) = self.pending_events.pop_front() {
244            return std::task::Poll::Ready(value);
245        };
246
247        let poll_result = self.events.poll_next_unpin(cx).map(|e| {
248            if let Some(PeerDiscovery::Announce(peer, multiaddresses)) = e
249                && peer != self.me
250            {
251                tracing::debug!(%peer, addresses = ?&multiaddresses, "Announcement");
252
253                for multiaddress in &multiaddresses {
254                    self.pending_events.push_back(ToSwarm::NewExternalAddrOfPeer {
255                        peer_id: peer,
256                        address: multiaddress.clone(),
257                    });
258                }
259
260                // Store announced addresses for later dialing / protocol use
261                if !multiaddresses.is_empty() {
262                    self.bootstrap_peers.insert(peer, multiaddresses);
263                    self.schedule_dial_with(peer, initial_backoff());
264                }
265            }
266        });
267
268        if matches!(poll_result, std::task::Poll::Ready(_))
269            && let Some(value) = self.pending_events.pop_front()
270        {
271            return std::task::Poll::Ready(value);
272        }
273
274        let now = std::time::Instant::now();
275        while self
276            .next_dial_attempts
277            .peek()
278            .map(|x| x.0.release_at <= now)
279            .unwrap_or(false)
280        {
281            let peer = self
282                .next_dial_attempts
283                .pop()
284                .expect("The value should be present within the same access")
285                .0
286                .item;
287
288            // Skip stale entries (peer reconnected or was otherwise unscheduled)
289            if !self.not_connected_peers.contains_key(&peer) {
290                continue;
291            }
292
293            tracing::trace!(%peer, "attempting a new dial attempt item");
294            self.pending_events.push_back(ToSwarm::Dial {
295                opts: DialOpts::peer_id(peer).build(),
296            });
297        }
298
299        if let Some(value) = self.pending_events.pop_front() {
300            std::task::Poll::Ready(value)
301        } else {
302            std::task::Poll::Pending
303        }
304    }
305}