Skip to main content

hopr_transport/protocol/pipeline/
builder.rs

1//! Builder for constructing the HOPR packet pipeline.
2
3use bytes::Bytes;
4use hopr_api::{
5    PeerId,
6    node::TicketEvent,
7    types::{crypto::prelude::*, internal::prelude::*},
8};
9use hopr_crypto_packet::HoprSurb;
10use hopr_protocol_app::prelude::*;
11use hopr_protocol_hopr::prelude::*;
12use hopr_utils::runtime::AbortableList;
13
14use super::{
15    NodeType, NoopTicketProcessor, PacketPipelineProcesses, config::PacketPipelineConfig, run_packet_pipeline_inner,
16};
17use crate::PeerProtocolCounterRegistry;
18
/// Placeholder type used by [`PacketPipelineBuilder`] for generic parameters that have
/// not yet been provided via the corresponding builder method.
///
/// It is a unit struct carrying no data. The common marker traits are derived so that a
/// partially configured builder's placeholder slots can be printed, copied and compared
/// like any other unit value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Unset;
22
/// Builder for constructing the HOPR packet pipeline for a specific node type.
///
/// The builder is constructed from a packet key via [`PacketPipelineBuilder::new`]; the
/// transport, codec and application API must then be provided via the
/// [`PacketPipelineBuilder::transport`], [`PacketPipelineBuilder::codec`] and
/// [`PacketPipelineBuilder::api`] builder methods.
///
/// Terminal methods for each node type are then exposed:
/// - [`PacketPipelineBuilder::build_for_relay`] — full pipeline, requires ticket processing via
///   [`PacketPipelineBuilder::with_ticket_processing`].
/// - [`PacketPipelineBuilder::build_for_entry`] — Entry nodes; the incoming acknowledgement pipeline is not started at
///   all.
/// - [`PacketPipelineBuilder::build_for_exit`] — Exit nodes; the incoming acknowledgement pipeline is started but only
///   drains the stream (no ticket processing).
///
/// The pipeline does not handle mixing itself; it needs to be injected as a separate process
/// overlay on top of the `wire_msg` Stream or Sink.
///
/// # Type parameters
///
/// Each generic parameter starts out as a placeholder and is replaced by the builder method
/// that supplies the corresponding component:
/// - `WIn` / `WOut` — incoming stream / outgoing sink of the wire-message transport.
/// - `C` / `D` — packet encoder / decoder.
/// - `T` / `TEvt` — ticket processor / ticket-events sink.
/// - `AppOut` / `AppIn` — application-facing sink / stream.
///
/// Note that the generic list names `WIn` before `WOut`, whereas the `wire_msg` tuple stores
/// the outgoing half first (`(WOut, WIn)`).
pub struct PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> {
    /// Packet key supplied to [`PacketPipelineBuilder::new`] and handed to the pipeline on build.
    packet_key: OffchainKeypair,
    /// Wire-message transport as an `(outgoing sink, incoming stream)` pair.
    wire_msg: (WOut, WIn),
    /// Packet codec as an `(encoder, decoder)` pair.
    codec: (C, D),
    /// Pipeline configuration; starts as [`PacketPipelineConfig::default`].
    cfg: PacketPipelineConfig,
    /// Application API as an `(outgoing sink for received data, incoming stream for data to send)` pair.
    api: (AppOut, AppIn),
    /// Per-peer protocol counter registry; starts as the registry's `Default` value.
    counters: PeerProtocolCounterRegistry,
    /// Ticket processor; `None` until [`PacketPipelineBuilder::with_ticket_processing`] is called.
    ticket_proc: Option<T>,
    /// Ticket-events sink; `None` until [`PacketPipelineBuilder::with_ticket_processing`] is called.
    ticket_events: Option<TEvt>,
}
50
51impl
52    PacketPipelineBuilder<
53        Unset,
54        Unset,
55        Unset,
56        Unset,
57        NoopTicketProcessor,
58        futures::sink::Drain<TicketEvent>,
59        Unset,
60        Unset,
61    >
62{
63    /// Creates a new builder with the common parameters shared by all node types.
64    ///
65    /// The transport, codec and application API must be supplied via
66    /// [`PacketPipelineBuilder::transport`], [`PacketPipelineBuilder::codec`] and
67    /// [`PacketPipelineBuilder::api`] before any of the terminal `build_for_*` methods can
68    /// be called.
69    ///
70    /// The pipeline configuration defaults to [`PacketPipelineConfig::default`]; use
71    /// [`PacketPipelineBuilder::with_config`] to override it. The per-peer counter registry
72    /// defaults to an empty one; use [`PacketPipelineBuilder::with_counters`] to override it.
73    ///
74    /// Use [`PacketPipelineBuilder::with_ticket_processing`] to attach ticket processing
75    /// before calling [`PacketPipelineBuilder::build_for_relay`].
76    pub fn new(packet_key: OffchainKeypair) -> Self {
77        Self {
78            packet_key,
79            wire_msg: (Unset, Unset),
80            codec: (Unset, Unset),
81            cfg: PacketPipelineConfig::default(),
82            api: (Unset, Unset),
83            counters: PeerProtocolCounterRegistry::default(),
84            ticket_proc: None,
85            ticket_events: None,
86        }
87    }
88}
89
90impl<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> {
91    /// Overrides the default [`PacketPipelineConfig`].
92    #[must_use]
93    pub fn with_config(mut self, cfg: PacketPipelineConfig) -> Self {
94        self.cfg = cfg;
95        self
96    }
97
98    /// Overrides the default (empty) per-peer protocol counter registry.
99    #[must_use]
100    pub fn with_counters(mut self, counters: PeerProtocolCounterRegistry) -> Self {
101        self.counters = counters;
102        self
103    }
104
105    /// Sets the underlying wire-message transport (outgoing sink, incoming stream).
106    #[must_use]
107    pub fn transport<WIn2, WOut2>(
108        self,
109        wire_msg: (WOut2, WIn2),
110    ) -> PacketPipelineBuilder<WIn2, WOut2, C, D, T, TEvt, AppOut, AppIn> {
111        PacketPipelineBuilder {
112            packet_key: self.packet_key,
113            wire_msg,
114            codec: self.codec,
115            cfg: self.cfg,
116            api: self.api,
117            counters: self.counters,
118            ticket_proc: self.ticket_proc,
119            ticket_events: self.ticket_events,
120        }
121    }
122
123    /// Sets the packet codec (encoder, decoder).
124    #[must_use]
125    pub fn codec<C2, D2>(self, codec: (C2, D2)) -> PacketPipelineBuilder<WIn, WOut, C2, D2, T, TEvt, AppOut, AppIn> {
126        PacketPipelineBuilder {
127            packet_key: self.packet_key,
128            wire_msg: self.wire_msg,
129            codec,
130            cfg: self.cfg,
131            api: self.api,
132            counters: self.counters,
133            ticket_proc: self.ticket_proc,
134            ticket_events: self.ticket_events,
135        }
136    }
137
138    /// Sets the application API (outgoing sink for received data, incoming stream for data to send).
139    #[must_use]
140    pub fn api<AppOut2, AppIn2>(
141        self,
142        api: (AppOut2, AppIn2),
143    ) -> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut2, AppIn2> {
144        PacketPipelineBuilder {
145            packet_key: self.packet_key,
146            wire_msg: self.wire_msg,
147            codec: self.codec,
148            cfg: self.cfg,
149            api,
150            counters: self.counters,
151            ticket_proc: self.ticket_proc,
152            ticket_events: self.ticket_events,
153        }
154    }
155
156    /// Attaches a ticket processor and a ticket-events sink to the builder.
157    ///
158    /// Required before calling [`PacketPipelineBuilder::build_for_relay`]. Has no effect on
159    /// Entry/Exit builds, which never process tickets.
160    #[must_use]
161    pub fn with_ticket_processing<T2, TEvt2>(
162        self,
163        ticket_proc: T2,
164        ticket_events: TEvt2,
165    ) -> PacketPipelineBuilder<WIn, WOut, C, D, T2, TEvt2, AppOut, AppIn>
166    where
167        T2: UnacknowledgedTicketProcessor + Sync + Send + 'static,
168        TEvt2: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
169        TEvt2::Error: std::error::Error,
170    {
171        PacketPipelineBuilder {
172            packet_key: self.packet_key,
173            wire_msg: self.wire_msg,
174            codec: self.codec,
175            cfg: self.cfg,
176            api: self.api,
177            counters: self.counters,
178            ticket_proc: Some(ticket_proc),
179            ticket_events: Some(ticket_events),
180        }
181    }
182}
183
184impl<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>
185where
186    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
187    WOut::Error: std::error::Error,
188    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
189    C: PacketEncoder + Sync + Send + 'static,
190    D: PacketDecoder + Sync + Send + 'static,
191    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
192    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
193    TEvt::Error: std::error::Error,
194    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
195    AppOut::Error: std::error::Error,
196    AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
197{
198    /// Builds and starts the full packet pipeline for a HOPR **Relay** node.
199    ///
200    /// Relay nodes run the full pipeline: outgoing/incoming messages, outgoing acknowledgements,
201    /// and incoming acknowledgements (with ticket processing).
202    ///
203    /// # Panics
204    ///
205    /// Panics if [`PacketPipelineBuilder::with_ticket_processing`] was not called before this method.
206    #[must_use]
207    pub fn build_for_relay(self) -> AbortableList<PacketPipelineProcesses> {
208        let ticket_proc = self
209            .ticket_proc
210            .expect("Relay node requires ticket processing; call with_ticket_processing() first");
211        let ticket_events = self
212            .ticket_events
213            .expect("Relay node requires ticket processing; call with_ticket_processing() first");
214        run_packet_pipeline_inner(
215            NodeType::Relay,
216            self.packet_key,
217            self.wire_msg,
218            self.codec,
219            ticket_proc,
220            ticket_events,
221            self.cfg,
222            self.api,
223            self.counters,
224        )
225    }
226}
227
228impl<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>
229where
230    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
231    WOut::Error: std::error::Error,
232    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
233    C: PacketEncoder + Sync + Send + 'static,
234    D: PacketDecoder + Sync + Send + 'static,
235    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
236    AppOut::Error: std::error::Error,
237    AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
238{
239    /// Builds and starts the packet pipeline for a HOPR **Entry** node.
240    ///
241    /// Entry nodes never relay packets and therefore do not process tickets. As a consequence,
242    /// the incoming acknowledgement pipeline is **not** started.
243    /// Any ticket processor or ticket events sink previously set via
244    /// [`PacketPipelineBuilder::with_ticket_processing`] is ignored.
245    #[must_use]
246    pub fn build_for_entry(self) -> AbortableList<PacketPipelineProcesses> {
247        run_packet_pipeline_inner::<_, _, _, _, NoopTicketProcessor, futures::sink::Drain<TicketEvent>, _, _>(
248            NodeType::Entry,
249            self.packet_key,
250            self.wire_msg,
251            self.codec,
252            NoopTicketProcessor,
253            futures::sink::drain(),
254            self.cfg,
255            self.api,
256            self.counters,
257        )
258    }
259
260    /// Builds and starts the packet pipeline for a HOPR **Exit** node.
261    ///
262    /// Exit nodes do not process tickets either. However, in contrast to
263    /// [`PacketPipelineBuilder::build_for_entry`], the incoming acknowledgement pipeline is kept
264    /// running (it only drains the stream) to support future development.
265    /// Any ticket processor or ticket events sink previously set via
266    /// [`PacketPipelineBuilder::with_ticket_processing`] is ignored.
267    #[must_use]
268    pub fn build_for_exit(self) -> AbortableList<PacketPipelineProcesses> {
269        run_packet_pipeline_inner::<_, _, _, _, NoopTicketProcessor, futures::sink::Drain<TicketEvent>, _, _>(
270            NodeType::Exit,
271            self.packet_key,
272            self.wire_msg,
273            self.codec,
274            NoopTicketProcessor,
275            futures::sink::drain(),
276            self.cfg,
277            self.api,
278            self.counters,
279        )
280    }
281}