hopr_transport_p2p/behavior/
ticket_aggregation.rs1use std::{
8 collections::VecDeque,
9 task::{Context, Poll},
10};
11
12use futures::stream::{BoxStream, Stream, StreamExt};
13use libp2p::swarm::{dummy::ConnectionHandler, NetworkBehaviour, ToSwarm};
14
15use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregationProcessed;
16
17use crate::swarm::{TicketAggregationRequestType, TicketAggregationResponseType};
18
19pub type Event = TicketAggregationProcessed<TicketAggregationResponseType, TicketAggregationRequestType>;
20
21pub struct Behaviour {
22 events: BoxStream<'static, Event>,
23 pending_events: VecDeque<
24 libp2p::swarm::ToSwarm<
25 <Self as NetworkBehaviour>::ToSwarm,
26 <<Self as NetworkBehaviour>::ConnectionHandler as libp2p::swarm::ConnectionHandler>::FromBehaviour,
27 >,
28 >,
29}
30
31impl Behaviour {
32 pub fn new<T>(ticket_aggregation_events: T) -> Self
33 where
34 T: Stream<Item = Event> + Send + 'static,
35 {
36 Self {
37 events: Box::pin(ticket_aggregation_events),
38 pending_events: VecDeque::new(),
39 }
40 }
41}
42
43impl NetworkBehaviour for Behaviour {
44 type ConnectionHandler = ConnectionHandler;
45
46 type ToSwarm = Event;
47
48 fn handle_established_inbound_connection(
49 &mut self,
50 _connection_id: libp2p::swarm::ConnectionId,
51 _peer: libp2p::PeerId,
52 _local_addr: &libp2p::Multiaddr,
53 _remote_addr: &libp2p::Multiaddr,
54 ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
55 Ok(Self::ConnectionHandler {})
56 }
57
58 fn handle_established_outbound_connection(
59 &mut self,
60 _connection_id: libp2p::swarm::ConnectionId,
61 _peer: libp2p::PeerId,
62 _addr: &libp2p::Multiaddr,
63 _role_override: libp2p::core::Endpoint,
64 _port_use: libp2p::core::transport::PortUse,
65 ) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
66 Ok(Self::ConnectionHandler {})
67 }
68
69 fn on_swarm_event(&mut self, _event: libp2p::swarm::FromSwarm) {
70 }
72
73 fn on_connection_handler_event(
74 &mut self,
75 _peer_id: libp2p::PeerId,
76 _connection_id: libp2p::swarm::ConnectionId,
77 _event: libp2p::swarm::THandlerOutEvent<Self>,
78 ) {
79 }
81
82 fn poll(
83 &mut self,
84 cx: &mut Context<'_>,
85 ) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
86 if let Some(value) = self.pending_events.pop_front() {
87 return Poll::Ready(value);
88 };
89
90 match self.events.poll_next_unpin(cx) {
91 Poll::Ready(Some(ticket_agg_event)) => {
92 if let Some(value) = self.pending_events.pop_front() {
93 self.pending_events.push_back(ToSwarm::GenerateEvent(ticket_agg_event));
94
95 Poll::Ready(value)
96 } else {
97 Poll::Ready(ToSwarm::GenerateEvent(ticket_agg_event))
98 }
99 }
100 Poll::Ready(None) | Poll::Pending => Poll::Pending,
101 }
102 }
103}