hopr_transport_p2p/behavior/
ticket_aggregation.rs

1/// Behavior emitting the ticket aggregation processor events used to trigger the ticket
2/// aggregation request response protocol.
3///
4/// This behavior is used with cooperation with the ticket aggregation request response protocol
5/// and as such is used primarily as the source of events to be performed with ticket aggregation
6/// on the wire level.
7use std::{
8    collections::VecDeque,
9    task::{Context, Poll},
10};
11
12use futures::stream::{BoxStream, Stream, StreamExt};
13use libp2p::swarm::{dummy::ConnectionHandler, NetworkBehaviour, ToSwarm};
14
15use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregationProcessed;
16
17use crate::swarm::{TicketAggregationRequestType, TicketAggregationResponseType};
18
19pub type Event = TicketAggregationProcessed<TicketAggregationResponseType, TicketAggregationRequestType>;
20
21pub struct Behaviour {
22    events: BoxStream<'static, Event>,
23    pending_events: VecDeque<
24        libp2p::swarm::ToSwarm<
25            <Self as NetworkBehaviour>::ToSwarm,
26            <<Self as NetworkBehaviour>::ConnectionHandler as libp2p::swarm::ConnectionHandler>::FromBehaviour,
27        >,
28    >,
29}
30
31impl Behaviour {
32    pub fn new<T>(ticket_aggregation_events: T) -> Self
33    where
34        T: Stream<Item = Event> + Send + 'static,
35    {
36        Self {
37            events: Box::pin(ticket_aggregation_events),
38            pending_events: VecDeque::new(),
39        }
40    }
41}
42
43impl NetworkBehaviour for Behaviour {
44    type ConnectionHandler = ConnectionHandler;
45
46    type ToSwarm = Event;
47
48    fn handle_established_inbound_connection(
49        &mut self,
50        _connection_id: libp2p::swarm::ConnectionId,
51        _peer: libp2p::PeerId,
52        _local_addr: &libp2p::Multiaddr,
53        _remote_addr: &libp2p::Multiaddr,
54    ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
55        Ok(Self::ConnectionHandler {})
56    }
57
58    fn handle_established_outbound_connection(
59        &mut self,
60        _connection_id: libp2p::swarm::ConnectionId,
61        _peer: libp2p::PeerId,
62        _addr: &libp2p::Multiaddr,
63        _role_override: libp2p::core::Endpoint,
64        _port_use: libp2p::core::transport::PortUse,
65    ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
66        Ok(Self::ConnectionHandler {})
67    }
68
69    fn on_swarm_event(&mut self, _event: libp2p::swarm::FromSwarm) {
70        // No reaction to swarm event is necessary here, responses are handled by the protocol
71    }
72
73    fn on_connection_handler_event(
74        &mut self,
75        _peer_id: libp2p::PeerId,
76        _connection_id: libp2p::swarm::ConnectionId,
77        _event: libp2p::swarm::THandlerOutEvent<Self>,
78    ) {
79        // Nothing is necessary here, because no ConnectionHandler events should be generated
80    }
81
82    fn poll(
83        &mut self,
84        cx: &mut Context<'_>,
85    ) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
86        if let Some(value) = self.pending_events.pop_front() {
87            return Poll::Ready(value);
88        };
89
90        match self.events.poll_next_unpin(cx) {
91            Poll::Ready(Some(ticket_agg_event)) => {
92                if let Some(value) = self.pending_events.pop_front() {
93                    self.pending_events.push_back(ToSwarm::GenerateEvent(ticket_agg_event));
94
95                    Poll::Ready(value)
96                } else {
97                    Poll::Ready(ToSwarm::GenerateEvent(ticket_agg_event))
98                }
99            }
100            Poll::Ready(None) | Poll::Pending => Poll::Pending,
101        }
102    }
103}