Skip to main content

hopr_transport/
pipeline.rs

1use bytes::Bytes;
2use hopr_api::{
3    chain::{ChainKeyOperations, ChainReadChannelOperations, ChainReadTicketOperations, ChainValues},
4    tickets::TicketFactory,
5    types::{
6        crypto::prelude::*,
7        internal::{prelude::*, routing::ResolvedTransportRouting},
8    },
9};
10use hopr_crypto_packet::HoprSurb;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use hopr_utils::runtime::AbortableList;
14
15use crate::{
16    HoprTransportProcess, PeerProtocolCounterRegistry,
17    config::HoprPacketPipelineConfig,
18    protocol::{PacketPipelineBuilder, Unset},
19};
20
21/// Builder for the HOPR packet pipeline.
22///
23/// Creates the encoder/decoder, the unacknowledged-ticket processor, optionally hooks up the
24/// packet capture (when the `capture` feature is enabled) and finally delegates to the lower-level
25/// [`PacketPipelineBuilder`] to spawn the per-stage tasks. The shape of the spawned pipeline is
26/// selected by which terminal `build_for_*` method is called:
27///
28/// - [`HoprPacketPipelineBuilder::build_for_relay`] — full pipeline. Requires
29///   [`HoprPacketPipelineBuilder::with_ticket_events`] to be called beforehand.
30/// - [`HoprPacketPipelineBuilder::build_for_entry`] — Entry nodes. Ticket events are not needed (and any value
31///   previously set is ignored).
32/// - [`HoprPacketPipelineBuilder::build_for_exit`] — Exit nodes. Ticket events are not needed (and any value previously
33///   set is ignored).
34///
35/// The builder is constructed via [`HoprPacketPipelineBuilder::new`] which takes no arguments.
36/// The required components must then be supplied via the corresponding builder methods:
37/// [`identity`](HoprPacketPipelineBuilder::identity), [`transport`](HoprPacketPipelineBuilder::transport),
38/// [`api`](HoprPacketPipelineBuilder::api), [`surb_store`](HoprPacketPipelineBuilder::surb_store),
39/// [`chain_api`](HoprPacketPipelineBuilder::chain_api),
40/// [`ticket_factory`](HoprPacketPipelineBuilder::ticket_factory) and
41/// [`channels_dst`](HoprPacketPipelineBuilder::channels_dst).
42///
43/// The per-peer counter registry defaults to an empty one; override it via
44/// [`HoprPacketPipelineBuilder::with_counters`].
45///
46/// The configuration ([`HoprPacketPipelineConfig`]) is optional and defaults to
47/// `HoprPacketPipelineConfig::default()`; override it via [`HoprPacketPipelineBuilder::with_config`].
48pub struct HoprPacketPipelineBuilder<
49    WIn,
50    WOut,
51    Chain,
52    S,
53    TFact,
54    AppOut,
55    AppIn,
56    TEvt = futures::sink::Drain<hopr_api::node::TicketEvent>,
57> {
58    packet_key: Option<OffchainKeypair>,
59    chain_key: Option<ChainKeypair>,
60    wire_msg: (WOut, WIn),
61    api: (AppOut, AppIn),
62    surb_store: S,
63    chain_api: Chain,
64    ticket_factory: TFact,
65    counters: PeerProtocolCounterRegistry,
66    channels_dst: Option<Hash>,
67    cfg: HoprPacketPipelineConfig,
68    ticket_events: Option<TEvt>,
69}
70
71impl Default
72    for HoprPacketPipelineBuilder<
73        Unset,
74        Unset,
75        Unset,
76        Unset,
77        Unset,
78        Unset,
79        Unset,
80        futures::sink::Drain<hopr_api::node::TicketEvent>,
81    >
82{
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl
89    HoprPacketPipelineBuilder<
90        Unset,
91        Unset,
92        Unset,
93        Unset,
94        Unset,
95        Unset,
96        Unset,
97        futures::sink::Drain<hopr_api::node::TicketEvent>,
98    >
99{
100    /// Creates a new empty builder. All required components must then be supplied via the
101    /// corresponding builder methods before calling any of the terminal `build_for_*` methods.
102    pub fn new() -> Self {
103        Self {
104            packet_key: None,
105            chain_key: None,
106            wire_msg: (Unset, Unset),
107            api: (Unset, Unset),
108            surb_store: Unset,
109            chain_api: Unset,
110            ticket_factory: Unset,
111            counters: PeerProtocolCounterRegistry::default(),
112            channels_dst: None,
113            cfg: HoprPacketPipelineConfig::default(),
114            ticket_events: None,
115        }
116    }
117}
118
119impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
120    HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
121{
122    /// Overrides the default [`HoprPacketPipelineConfig`].
123    #[must_use]
124    pub fn with_config(mut self, cfg: HoprPacketPipelineConfig) -> Self {
125        self.cfg = cfg;
126        self
127    }
128
129    /// Overrides the default (empty) per-peer protocol counter registry.
130    #[must_use]
131    pub fn with_counters(mut self, counters: PeerProtocolCounterRegistry) -> Self {
132        self.counters = counters;
133        self
134    }
135
136    /// Sets the node identity (chain and offchain keypairs).
137    #[must_use]
138    pub fn identity<'a, I>(mut self, identity: I) -> Self
139    where
140        I: Into<(&'a ChainKeypair, &'a OffchainKeypair)>,
141    {
142        let (chain_key, packet_key) = identity.into();
143        self.chain_key = Some(chain_key.clone());
144        self.packet_key = Some(packet_key.clone());
145        self
146    }
147
148    /// Sets the channel-set domain separator used by the codec and ticket processor.
149    #[must_use]
150    pub fn channels_dst(mut self, channels_dst: Hash) -> Self {
151        self.channels_dst = Some(channels_dst);
152        self
153    }
154
155    /// Sets the underlying wire-message transport (outgoing sink, incoming stream).
156    #[must_use]
157    pub fn transport<WIn2, WOut2>(
158        self,
159        wire_msg: (WOut2, WIn2),
160    ) -> HoprPacketPipelineBuilder<WIn2, WOut2, Chain, S, TFact, AppOut, AppIn, TEvt> {
161        HoprPacketPipelineBuilder {
162            packet_key: self.packet_key,
163            chain_key: self.chain_key,
164            wire_msg,
165            api: self.api,
166            surb_store: self.surb_store,
167            chain_api: self.chain_api,
168            ticket_factory: self.ticket_factory,
169            counters: self.counters,
170            channels_dst: self.channels_dst,
171            cfg: self.cfg,
172            ticket_events: self.ticket_events,
173        }
174    }
175
176    /// Sets the application API (incoming sink, outgoing stream).
177    #[must_use]
178    pub fn api<AppOut2, AppIn2>(
179        self,
180        api: (AppOut2, AppIn2),
181    ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut2, AppIn2, TEvt> {
182        HoprPacketPipelineBuilder {
183            packet_key: self.packet_key,
184            chain_key: self.chain_key,
185            wire_msg: self.wire_msg,
186            api,
187            surb_store: self.surb_store,
188            chain_api: self.chain_api,
189            ticket_factory: self.ticket_factory,
190            counters: self.counters,
191            channels_dst: self.channels_dst,
192            cfg: self.cfg,
193            ticket_events: self.ticket_events,
194        }
195    }
196
197    /// Sets the SURB store used by the encoder/decoder.
198    #[must_use]
199    pub fn surb_store<S2>(
200        self,
201        surb_store: S2,
202    ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S2, TFact, AppOut, AppIn, TEvt> {
203        HoprPacketPipelineBuilder {
204            packet_key: self.packet_key,
205            chain_key: self.chain_key,
206            wire_msg: self.wire_msg,
207            api: self.api,
208            surb_store,
209            chain_api: self.chain_api,
210            ticket_factory: self.ticket_factory,
211            counters: self.counters,
212            channels_dst: self.channels_dst,
213            cfg: self.cfg,
214            ticket_events: self.ticket_events,
215        }
216    }
217
218    /// Sets the chain API used by the encoder/decoder and the unacknowledged ticket processor.
219    #[must_use]
220    pub fn chain_api<Chain2>(
221        self,
222        chain_api: Chain2,
223    ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain2, S, TFact, AppOut, AppIn, TEvt> {
224        HoprPacketPipelineBuilder {
225            packet_key: self.packet_key,
226            chain_key: self.chain_key,
227            wire_msg: self.wire_msg,
228            api: self.api,
229            surb_store: self.surb_store,
230            chain_api,
231            ticket_factory: self.ticket_factory,
232            counters: self.counters,
233            channels_dst: self.channels_dst,
234            cfg: self.cfg,
235            ticket_events: self.ticket_events,
236        }
237    }
238
239    /// Sets the ticket factory used by the encoder/decoder.
240    #[must_use]
241    pub fn ticket_factory<TFact2>(
242        self,
243        ticket_factory: TFact2,
244    ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact2, AppOut, AppIn, TEvt> {
245        HoprPacketPipelineBuilder {
246            packet_key: self.packet_key,
247            chain_key: self.chain_key,
248            wire_msg: self.wire_msg,
249            api: self.api,
250            surb_store: self.surb_store,
251            chain_api: self.chain_api,
252            ticket_factory,
253            counters: self.counters,
254            channels_dst: self.channels_dst,
255            cfg: self.cfg,
256            ticket_events: self.ticket_events,
257        }
258    }
259
260    /// Attaches the ticket events sink. Required for Relay nodes (see
261    /// [`HoprPacketPipelineBuilder::build_for_relay`]); ignored by Entry and Exit nodes.
262    #[must_use]
263    pub fn with_ticket_events<TEvt2>(
264        self,
265        ticket_events: TEvt2,
266    ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt2> {
267        HoprPacketPipelineBuilder {
268            packet_key: self.packet_key,
269            chain_key: self.chain_key,
270            wire_msg: self.wire_msg,
271            api: self.api,
272            surb_store: self.surb_store,
273            chain_api: self.chain_api,
274            ticket_factory: self.ticket_factory,
275            counters: self.counters,
276            channels_dst: self.channels_dst,
277            cfg: self.cfg,
278            ticket_events: Some(ticket_events),
279        }
280    }
281}
282
283// Implementation detail: codec, decoder and optional capture wiring shared by the three terminals.
284impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
285    HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
286where
287    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
288    WOut::Error: std::error::Error,
289    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
290    Chain: ChainKeyOperations
291        + ChainReadChannelOperations
292        + ChainReadTicketOperations
293        + ChainValues
294        + Clone
295        + Send
296        + Sync
297        + 'static,
298    S: SurbStore + Clone + Send + Sync + 'static,
299    TFact: TicketFactory + Clone + Send + Sync + 'static,
300    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
301    AppOut::Error: std::error::Error,
302    AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
303{
304    /// Builds the codec pair (and capture wiring when enabled) and the unacknowledged ticket
305    /// processor, returning them together with an [`AbortableList`] already containing the
306    /// capture task if any was started.
307    #[allow(clippy::type_complexity)]
308    fn prepare(
309        self,
310    ) -> (
311        OffchainKeypair,
312        (WOut, WIn),
313        (AppOut, AppIn),
314        PeerProtocolCounterRegistry,
315        HoprUnacknowledgedTicketProcessor<Chain>,
316        Option<TEvt>,
317        HoprPacketPipelineConfig,
318        // Codec parts in their final shape (possibly wrapped by capture)
319        AbortableList<HoprTransportProcess>,
320        BuiltCodec<Chain, S, TFact>,
321    ) {
322        let HoprPacketPipelineBuilder {
323            packet_key,
324            chain_key,
325            wire_msg,
326            api,
327            surb_store,
328            chain_api,
329            ticket_factory,
330            counters,
331            channels_dst,
332            cfg,
333            ticket_events,
334        } = self;
335
336        let packet_key = packet_key.expect("identity() must be called before building the pipeline");
337        let chain_key = chain_key.expect("identity() must be called before building the pipeline");
338        let channels_dst = channels_dst.expect("channels_dst() must be called before building the pipeline");
339
340        let unack_ticket_proc = HoprUnacknowledgedTicketProcessor::new(
341            chain_api.clone(),
342            chain_key.clone(),
343            channels_dst,
344            cfg.ack_processor,
345        );
346
347        let encoder = HoprEncoder::new(
348            chain_key.clone(),
349            chain_api.clone(),
350            surb_store.clone(),
351            ticket_factory.clone(),
352            channels_dst,
353            cfg.codec,
354        );
355
356        let decoder = HoprDecoder::new(
357            (packet_key.clone(), chain_key.clone()),
358            chain_api.clone(),
359            surb_store,
360            ticket_factory.clone(),
361            channels_dst,
362            cfg.codec,
363        );
364
365        #[allow(unused_mut)]
366        let mut processes = AbortableList::default();
367
368        #[cfg(feature = "capture")]
369        let codec = {
370            use crate::capture;
371
372            let writer: Box<dyn capture::PacketWriter + Send + 'static> =
373                if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
374                    if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
375                        tracing::warn!("pcap file packet capture initialized to {desc}");
376                        Box::new(pcap_writer)
377                    } else {
378                        tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
379                        Box::new(capture::NullWriter)
380                    }
381                } else {
382                    tracing::warn!("no packet capture specified");
383                    Box::new(capture::NullWriter)
384                };
385
386            let (sender, ah) = capture::packet_capture_channel(writer);
387            processes.insert(HoprTransportProcess::Capture, ah);
388            BuiltCodec::Captured(
389                capture::CapturePacketCodec::new(encoder, *packet_key.public(), sender.clone()),
390                capture::CapturePacketCodec::new(decoder, *packet_key.public(), sender),
391            )
392        };
393
394        #[cfg(not(feature = "capture"))]
395        let codec = BuiltCodec::Plain(encoder, decoder);
396
397        (
398            packet_key,
399            wire_msg,
400            api,
401            counters,
402            unack_ticket_proc,
403            ticket_events,
404            cfg,
405            processes,
406            codec,
407        )
408    }
409}
410
411/// Internal helper to keep the codec types abstracted between the capture/no-capture builds.
412enum BuiltCodec<Chain, S, TFact>
413where
414    Chain: ChainKeyOperations
415        + ChainReadChannelOperations
416        + ChainReadTicketOperations
417        + ChainValues
418        + Clone
419        + Send
420        + Sync
421        + 'static,
422    S: SurbStore + Clone + Send + Sync + 'static,
423    TFact: TicketFactory + Clone + Send + Sync + 'static,
424{
425    #[cfg(not(feature = "capture"))]
426    Plain(HoprEncoder<Chain, S, TFact>, HoprDecoder<Chain, S, TFact>),
427    #[cfg(feature = "capture")]
428    Captured(
429        crate::capture::CapturePacketCodec<HoprEncoder<Chain, S, TFact>>,
430        crate::capture::CapturePacketCodec<HoprDecoder<Chain, S, TFact>>,
431    ),
432}
433
434// Terminal: Relay
435impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
436    HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
437where
438    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
439    WOut::Error: std::error::Error,
440    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
441    Chain: ChainKeyOperations
442        + ChainReadChannelOperations
443        + ChainReadTicketOperations
444        + ChainValues
445        + Clone
446        + Send
447        + Sync
448        + 'static,
449    S: SurbStore + Clone + Send + Sync + 'static,
450    TEvt: futures::Sink<hopr_api::node::TicketEvent> + Clone + Unpin + Send + 'static,
451    TEvt::Error: std::error::Error,
452    TFact: TicketFactory + Clone + Send + Sync + 'static,
453    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
454    AppOut::Error: std::error::Error,
455    AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
456{
457    /// Builds the pipeline configured for a Relay node.
458    ///
459    /// # Panics
460    /// Panics if [`HoprPacketPipelineBuilder::with_ticket_events`] was not called.
461    pub fn build_for_relay(self) -> AbortableList<HoprTransportProcess> {
462        let (packet_key, wire_msg, api, counters, unack_ticket_proc, ticket_events, _cfg, mut processes, codec) =
463            self.prepare();
464
465        let ticket_events = ticket_events.expect("Relay node requires ticket events; call with_ticket_events() first");
466
467        let inner = match codec {
468            #[cfg(not(feature = "capture"))]
469            BuiltCodec::Plain(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
470                .transport(wire_msg)
471                .codec((encoder, decoder))
472                .api(api)
473                .with_counters(counters)
474                .with_config(_cfg.pipeline)
475                .with_ticket_processing(unack_ticket_proc, ticket_events)
476                .build_for_relay(),
477            #[cfg(feature = "capture")]
478            BuiltCodec::Captured(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
479                .transport(wire_msg)
480                .codec((encoder, decoder))
481                .api(api)
482                .with_counters(counters)
483                .with_config(_cfg.pipeline)
484                .with_ticket_processing(unack_ticket_proc, ticket_events)
485                .build_for_relay(),
486        };
487
488        processes.flat_map_extend_from(inner, HoprTransportProcess::Pipeline);
489        processes
490    }
491}
492
493// Terminal: Entry / Exit (no ticket events required)
494impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
495    HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
496where
497    WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
498    WOut::Error: std::error::Error,
499    WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
500    Chain: ChainKeyOperations
501        + ChainReadChannelOperations
502        + ChainReadTicketOperations
503        + ChainValues
504        + Clone
505        + Send
506        + Sync
507        + 'static,
508    S: SurbStore + Clone + Send + Sync + 'static,
509    TFact: TicketFactory + Clone + Send + Sync + 'static,
510    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
511    AppOut::Error: std::error::Error,
512    AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
513{
514    /// Builds the pipeline configured for an Entry node.
515    ///
516    /// The incoming acknowledgement pipeline is not started; ticket events (if any) are ignored.
517    pub fn build_for_entry(self) -> AbortableList<HoprTransportProcess> {
518        let (packet_key, wire_msg, api, counters, _unack, _ticket_events, _cfg, mut processes, codec) = self.prepare();
519
520        let inner = match codec {
521            #[cfg(not(feature = "capture"))]
522            BuiltCodec::Plain(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
523                .transport(wire_msg)
524                .codec((encoder, decoder))
525                .api(api)
526                .with_counters(counters)
527                .with_config(_cfg.pipeline)
528                .build_for_entry(),
529            #[cfg(feature = "capture")]
530            BuiltCodec::Captured(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
531                .transport(wire_msg)
532                .codec((encoder, decoder))
533                .api(api)
534                .with_counters(counters)
535                .with_config(_cfg.pipeline)
536                .build_for_entry(),
537        };
538
539        processes.flat_map_extend_from(inner, HoprTransportProcess::Pipeline);
540        processes
541    }
542
543    /// Builds the pipeline configured for an Exit node.
544    ///
545    /// The incoming acknowledgement pipeline is started but its acknowledgements are drained
546    /// (never forwarded to a ticket processor); ticket events (if any) are ignored.
547    pub fn build_for_exit(self) -> AbortableList<HoprTransportProcess> {
548        let (packet_key, wire_msg, api, counters, _unack, _ticket_events, _cfg, mut processes, codec) = self.prepare();
549
550        let inner = match codec {
551            #[cfg(not(feature = "capture"))]
552            BuiltCodec::Plain(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
553                .transport(wire_msg)
554                .codec((encoder, decoder))
555                .api(api)
556                .with_counters(counters)
557                .with_config(_cfg.pipeline)
558                .build_for_exit(),
559            #[cfg(feature = "capture")]
560            BuiltCodec::Captured(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
561                .transport(wire_msg)
562                .codec((encoder, decoder))
563                .api(api)
564                .with_counters(counters)
565                .with_config(_cfg.pipeline)
566                .build_for_exit(),
567        };
568
569        processes.flat_map_extend_from(inner, HoprTransportProcess::Pipeline);
570        processes
571    }
572}