1use hopr_api::{
2 chain::{ChainKeyOperations, ChainReadChannelOperations, ChainReadTicketOperations, ChainValues},
3 tickets::TicketFactory,
4 types::{
5 crypto::prelude::*,
6 internal::{prelude::*, routing::ResolvedTransportRouting},
7 },
8};
9use hopr_async_runtime::AbortableList;
10use hopr_crypto_packet::HoprSurb;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use hopr_transport_protocol::run_packet_pipeline;
14
15use crate::{HoprTransportProcess, config::HoprPacketPipelineConfig};
16
17#[derive(Clone)]
19pub struct HoprPipelineComponents<TEvt, S, Chain, TFact> {
20 pub ticket_events: TEvt,
22 pub surb_store: S,
24 pub chain_api: Chain,
26 pub ticket_factory: TFact,
28 pub counters: hopr_transport_protocol::PeerProtocolCounterRegistry,
30}
31
32pub fn run_hopr_packet_pipeline<WIn, WOut, Chain, S, TEvt, TFact, AppOut, AppIn>(
33 (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
34 wire_msg: (WOut, WIn),
35 api: (AppOut, AppIn),
36 components: HoprPipelineComponents<TEvt, S, Chain, TFact>,
37 channels_dst: Hash,
38 cfg: HoprPacketPipelineConfig,
39) -> AbortableList<HoprTransportProcess>
40where
41 WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
42 WOut::Error: std::error::Error,
43 WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
44 Chain: ChainKeyOperations
45 + ChainReadChannelOperations
46 + ChainReadTicketOperations
47 + ChainValues
48 + Clone
49 + Send
50 + Sync
51 + 'static,
52 S: SurbStore + Clone + Send + Sync + 'static,
53 TEvt: futures::Sink<hopr_api::node::TicketEvent> + Clone + Unpin + Send + 'static,
54 TEvt::Error: std::error::Error,
55 TFact: TicketFactory + Clone + Send + Sync + 'static,
56 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
57 AppOut::Error: std::error::Error,
58 AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
59{
60 let HoprPipelineComponents {
61 ticket_events,
62 surb_store,
63 chain_api,
64 ticket_factory,
65 counters,
66 } = components;
67
68 let unack_ticket_proc =
69 HoprUnacknowledgedTicketProcessor::new(chain_api.clone(), chain_key.clone(), channels_dst, cfg.ack_processor);
70
71 let encoder = HoprEncoder::new(
72 chain_key.clone(),
73 chain_api.clone(),
74 surb_store.clone(),
75 ticket_factory.clone(),
76 channels_dst,
77 cfg.codec,
78 );
79
80 let decoder = HoprDecoder::new(
81 (packet_key.clone(), chain_key.clone()),
82 chain_api.clone(),
83 surb_store,
84 ticket_factory.clone(),
85 channels_dst,
86 cfg.codec,
87 );
88
89 let mut processes = AbortableList::default();
90
91 #[cfg(feature = "capture")]
92 let (encoder, decoder) = {
93 use crate::capture;
94
95 let writer: Box<dyn capture::PacketWriter + Send + 'static> =
96 if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
97 if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
98 tracing::warn!("pcap file packet capture initialized to {desc}");
99 Box::new(pcap_writer)
100 } else {
101 tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
102 Box::new(capture::NullWriter)
103 }
104 } else {
105 tracing::warn!("no packet capture specified");
106 Box::new(capture::NullWriter)
107 };
108
109 let (sender, ah) = capture::packet_capture_channel(writer);
110 processes.insert(HoprTransportProcess::Capture, ah);
111 (
112 capture::CapturePacketCodec::new(encoder, *packet_key.public(), sender.clone()),
113 capture::CapturePacketCodec::new(decoder, *packet_key.public(), sender.clone()),
114 )
115 };
116
117 processes.flat_map_extend_from(
118 run_packet_pipeline(
119 packet_key.clone(),
120 wire_msg,
121 (encoder, decoder),
122 unack_ticket_proc,
123 ticket_events,
124 cfg.pipeline,
125 api,
126 counters,
127 ),
128 HoprTransportProcess::Pipeline,
129 );
130
131 processes
132}