Skip to main content

hopr_transport/
pipeline.rs

1use hopr_api::{
2    chain::{ChainKeyOperations, ChainReadChannelOperations, ChainReadTicketOperations, ChainValues},
3    tickets::TicketFactory,
4    types::{
5        crypto::prelude::*,
6        internal::{prelude::*, routing::ResolvedTransportRouting},
7    },
8};
9use hopr_async_runtime::AbortableList;
10use hopr_crypto_packet::HoprSurb;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use hopr_transport_protocol::run_packet_pipeline;
14
15use crate::{HoprTransportProcess, config::HoprPacketPipelineConfig};
16
17/// Contains all components required to run the HOPR packet pipeline.
18#[derive(Clone)]
19pub struct HoprPipelineComponents<TEvt, S, Chain, TFact> {
20    /// Sink for ticket events.
21    pub ticket_events: TEvt,
22    /// Store for SURBs and Reply Openers.
23    pub surb_store: S,
24    /// Chain API for interacting with the blockchain.
25    pub chain_api: Chain,
26    /// Ticket factory for creating outgoing tickets
27    pub ticket_factory: TFact,
28    /// Per-peer protocol conformance counters.
29    pub counters: hopr_transport_protocol::PeerProtocolCounterRegistry,
30}
31
32pub fn run_hopr_packet_pipeline<WIn, WOut, Chain, S, TEvt, TFact, AppOut, AppIn>(
33    (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
34    wire_msg: (WOut, WIn),
35    api: (AppOut, AppIn),
36    components: HoprPipelineComponents<TEvt, S, Chain, TFact>,
37    channels_dst: Hash,
38    cfg: HoprPacketPipelineConfig,
39) -> AbortableList<HoprTransportProcess>
40where
41    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
42    WOut::Error: std::error::Error,
43    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
44    Chain: ChainKeyOperations
45        + ChainReadChannelOperations
46        + ChainReadTicketOperations
47        + ChainValues
48        + Clone
49        + Send
50        + Sync
51        + 'static,
52    S: SurbStore + Clone + Send + Sync + 'static,
53    TEvt: futures::Sink<hopr_api::node::TicketEvent> + Clone + Unpin + Send + 'static,
54    TEvt::Error: std::error::Error,
55    TFact: TicketFactory + Clone + Send + Sync + 'static,
56    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
57    AppOut::Error: std::error::Error,
58    AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
59{
60    let HoprPipelineComponents {
61        ticket_events,
62        surb_store,
63        chain_api,
64        ticket_factory,
65        counters,
66    } = components;
67
68    let unack_ticket_proc =
69        HoprUnacknowledgedTicketProcessor::new(chain_api.clone(), chain_key.clone(), channels_dst, cfg.ack_processor);
70
71    let encoder = HoprEncoder::new(
72        chain_key.clone(),
73        chain_api.clone(),
74        surb_store.clone(),
75        ticket_factory.clone(),
76        channels_dst,
77        cfg.codec,
78    );
79
80    let decoder = HoprDecoder::new(
81        (packet_key.clone(), chain_key.clone()),
82        chain_api.clone(),
83        surb_store,
84        ticket_factory.clone(),
85        channels_dst,
86        cfg.codec,
87    );
88
89    let mut processes = AbortableList::default();
90
91    #[cfg(feature = "capture")]
92    let (encoder, decoder) = {
93        use crate::capture;
94
95        let writer: Box<dyn capture::PacketWriter + Send + 'static> =
96            if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
97                if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
98                    tracing::warn!("pcap file packet capture initialized to {desc}");
99                    Box::new(pcap_writer)
100                } else {
101                    tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
102                    Box::new(capture::NullWriter)
103                }
104            } else {
105                tracing::warn!("no packet capture specified");
106                Box::new(capture::NullWriter)
107            };
108
109        let (sender, ah) = capture::packet_capture_channel(writer);
110        processes.insert(HoprTransportProcess::Capture, ah);
111        (
112            capture::CapturePacketCodec::new(encoder, *packet_key.public(), sender.clone()),
113            capture::CapturePacketCodec::new(decoder, *packet_key.public(), sender.clone()),
114        )
115    };
116
117    processes.flat_map_extend_from(
118        run_packet_pipeline(
119            packet_key.clone(),
120            wire_msg,
121            (encoder, decoder),
122            unack_ticket_proc,
123            ticket_events,
124            cfg.pipeline,
125            api,
126            counters,
127        ),
128        HoprTransportProcess::Pipeline,
129    );
130
131    processes
132}