1use bytes::Bytes;
2use hopr_api::{
3 chain::{ChainKeyOperations, ChainReadChannelOperations, ChainReadTicketOperations, ChainValues},
4 tickets::TicketFactory,
5 types::{
6 crypto::prelude::*,
7 internal::{prelude::*, routing::ResolvedTransportRouting},
8 },
9};
10use hopr_crypto_packet::HoprSurb;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use hopr_utils::runtime::AbortableList;
14
15use crate::{
16 HoprTransportProcess, PeerProtocolCounterRegistry,
17 config::HoprPacketPipelineConfig,
18 protocol::{PacketPipelineBuilder, Unset},
19};
20
21pub struct HoprPacketPipelineBuilder<
49 WIn,
50 WOut,
51 Chain,
52 S,
53 TFact,
54 AppOut,
55 AppIn,
56 TEvt = futures::sink::Drain<hopr_api::node::TicketEvent>,
57> {
58 packet_key: Option<OffchainKeypair>,
59 chain_key: Option<ChainKeypair>,
60 wire_msg: (WOut, WIn),
61 api: (AppOut, AppIn),
62 surb_store: S,
63 chain_api: Chain,
64 ticket_factory: TFact,
65 counters: PeerProtocolCounterRegistry,
66 channels_dst: Option<Hash>,
67 cfg: HoprPacketPipelineConfig,
68 ticket_events: Option<TEvt>,
69}
70
71impl Default
72 for HoprPacketPipelineBuilder<
73 Unset,
74 Unset,
75 Unset,
76 Unset,
77 Unset,
78 Unset,
79 Unset,
80 futures::sink::Drain<hopr_api::node::TicketEvent>,
81 >
82{
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl
89 HoprPacketPipelineBuilder<
90 Unset,
91 Unset,
92 Unset,
93 Unset,
94 Unset,
95 Unset,
96 Unset,
97 futures::sink::Drain<hopr_api::node::TicketEvent>,
98 >
99{
100 pub fn new() -> Self {
103 Self {
104 packet_key: None,
105 chain_key: None,
106 wire_msg: (Unset, Unset),
107 api: (Unset, Unset),
108 surb_store: Unset,
109 chain_api: Unset,
110 ticket_factory: Unset,
111 counters: PeerProtocolCounterRegistry::default(),
112 channels_dst: None,
113 cfg: HoprPacketPipelineConfig::default(),
114 ticket_events: None,
115 }
116 }
117}
118
119impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
120 HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
121{
122 #[must_use]
124 pub fn with_config(mut self, cfg: HoprPacketPipelineConfig) -> Self {
125 self.cfg = cfg;
126 self
127 }
128
129 #[must_use]
131 pub fn with_counters(mut self, counters: PeerProtocolCounterRegistry) -> Self {
132 self.counters = counters;
133 self
134 }
135
136 #[must_use]
138 pub fn identity<'a, I>(mut self, identity: I) -> Self
139 where
140 I: Into<(&'a ChainKeypair, &'a OffchainKeypair)>,
141 {
142 let (chain_key, packet_key) = identity.into();
143 self.chain_key = Some(chain_key.clone());
144 self.packet_key = Some(packet_key.clone());
145 self
146 }
147
148 #[must_use]
150 pub fn channels_dst(mut self, channels_dst: Hash) -> Self {
151 self.channels_dst = Some(channels_dst);
152 self
153 }
154
155 #[must_use]
157 pub fn transport<WIn2, WOut2>(
158 self,
159 wire_msg: (WOut2, WIn2),
160 ) -> HoprPacketPipelineBuilder<WIn2, WOut2, Chain, S, TFact, AppOut, AppIn, TEvt> {
161 HoprPacketPipelineBuilder {
162 packet_key: self.packet_key,
163 chain_key: self.chain_key,
164 wire_msg,
165 api: self.api,
166 surb_store: self.surb_store,
167 chain_api: self.chain_api,
168 ticket_factory: self.ticket_factory,
169 counters: self.counters,
170 channels_dst: self.channels_dst,
171 cfg: self.cfg,
172 ticket_events: self.ticket_events,
173 }
174 }
175
176 #[must_use]
178 pub fn api<AppOut2, AppIn2>(
179 self,
180 api: (AppOut2, AppIn2),
181 ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut2, AppIn2, TEvt> {
182 HoprPacketPipelineBuilder {
183 packet_key: self.packet_key,
184 chain_key: self.chain_key,
185 wire_msg: self.wire_msg,
186 api,
187 surb_store: self.surb_store,
188 chain_api: self.chain_api,
189 ticket_factory: self.ticket_factory,
190 counters: self.counters,
191 channels_dst: self.channels_dst,
192 cfg: self.cfg,
193 ticket_events: self.ticket_events,
194 }
195 }
196
197 #[must_use]
199 pub fn surb_store<S2>(
200 self,
201 surb_store: S2,
202 ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S2, TFact, AppOut, AppIn, TEvt> {
203 HoprPacketPipelineBuilder {
204 packet_key: self.packet_key,
205 chain_key: self.chain_key,
206 wire_msg: self.wire_msg,
207 api: self.api,
208 surb_store,
209 chain_api: self.chain_api,
210 ticket_factory: self.ticket_factory,
211 counters: self.counters,
212 channels_dst: self.channels_dst,
213 cfg: self.cfg,
214 ticket_events: self.ticket_events,
215 }
216 }
217
218 #[must_use]
220 pub fn chain_api<Chain2>(
221 self,
222 chain_api: Chain2,
223 ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain2, S, TFact, AppOut, AppIn, TEvt> {
224 HoprPacketPipelineBuilder {
225 packet_key: self.packet_key,
226 chain_key: self.chain_key,
227 wire_msg: self.wire_msg,
228 api: self.api,
229 surb_store: self.surb_store,
230 chain_api,
231 ticket_factory: self.ticket_factory,
232 counters: self.counters,
233 channels_dst: self.channels_dst,
234 cfg: self.cfg,
235 ticket_events: self.ticket_events,
236 }
237 }
238
239 #[must_use]
241 pub fn ticket_factory<TFact2>(
242 self,
243 ticket_factory: TFact2,
244 ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact2, AppOut, AppIn, TEvt> {
245 HoprPacketPipelineBuilder {
246 packet_key: self.packet_key,
247 chain_key: self.chain_key,
248 wire_msg: self.wire_msg,
249 api: self.api,
250 surb_store: self.surb_store,
251 chain_api: self.chain_api,
252 ticket_factory,
253 counters: self.counters,
254 channels_dst: self.channels_dst,
255 cfg: self.cfg,
256 ticket_events: self.ticket_events,
257 }
258 }
259
260 #[must_use]
263 pub fn with_ticket_events<TEvt2>(
264 self,
265 ticket_events: TEvt2,
266 ) -> HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt2> {
267 HoprPacketPipelineBuilder {
268 packet_key: self.packet_key,
269 chain_key: self.chain_key,
270 wire_msg: self.wire_msg,
271 api: self.api,
272 surb_store: self.surb_store,
273 chain_api: self.chain_api,
274 ticket_factory: self.ticket_factory,
275 counters: self.counters,
276 channels_dst: self.channels_dst,
277 cfg: self.cfg,
278 ticket_events: Some(ticket_events),
279 }
280 }
281}
282
283impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
285 HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
286where
287 WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
288 WOut::Error: std::error::Error,
289 WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
290 Chain: ChainKeyOperations
291 + ChainReadChannelOperations
292 + ChainReadTicketOperations
293 + ChainValues
294 + Clone
295 + Send
296 + Sync
297 + 'static,
298 S: SurbStore + Clone + Send + Sync + 'static,
299 TFact: TicketFactory + Clone + Send + Sync + 'static,
300 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
301 AppOut::Error: std::error::Error,
302 AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
303{
304 #[allow(clippy::type_complexity)]
308 fn prepare(
309 self,
310 ) -> (
311 OffchainKeypair,
312 (WOut, WIn),
313 (AppOut, AppIn),
314 PeerProtocolCounterRegistry,
315 HoprUnacknowledgedTicketProcessor<Chain>,
316 Option<TEvt>,
317 HoprPacketPipelineConfig,
318 AbortableList<HoprTransportProcess>,
320 BuiltCodec<Chain, S, TFact>,
321 ) {
322 let HoprPacketPipelineBuilder {
323 packet_key,
324 chain_key,
325 wire_msg,
326 api,
327 surb_store,
328 chain_api,
329 ticket_factory,
330 counters,
331 channels_dst,
332 cfg,
333 ticket_events,
334 } = self;
335
336 let packet_key = packet_key.expect("identity() must be called before building the pipeline");
337 let chain_key = chain_key.expect("identity() must be called before building the pipeline");
338 let channels_dst = channels_dst.expect("channels_dst() must be called before building the pipeline");
339
340 let unack_ticket_proc = HoprUnacknowledgedTicketProcessor::new(
341 chain_api.clone(),
342 chain_key.clone(),
343 channels_dst,
344 cfg.ack_processor,
345 );
346
347 let encoder = HoprEncoder::new(
348 chain_key.clone(),
349 chain_api.clone(),
350 surb_store.clone(),
351 ticket_factory.clone(),
352 channels_dst,
353 cfg.codec,
354 );
355
356 let decoder = HoprDecoder::new(
357 (packet_key.clone(), chain_key.clone()),
358 chain_api.clone(),
359 surb_store,
360 ticket_factory.clone(),
361 channels_dst,
362 cfg.codec,
363 );
364
365 #[allow(unused_mut)]
366 let mut processes = AbortableList::default();
367
368 #[cfg(feature = "capture")]
369 let codec = {
370 use crate::capture;
371
372 let writer: Box<dyn capture::PacketWriter + Send + 'static> =
373 if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
374 if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
375 tracing::warn!("pcap file packet capture initialized to {desc}");
376 Box::new(pcap_writer)
377 } else {
378 tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
379 Box::new(capture::NullWriter)
380 }
381 } else {
382 tracing::warn!("no packet capture specified");
383 Box::new(capture::NullWriter)
384 };
385
386 let (sender, ah) = capture::packet_capture_channel(writer);
387 processes.insert(HoprTransportProcess::Capture, ah);
388 BuiltCodec::Captured(
389 capture::CapturePacketCodec::new(encoder, *packet_key.public(), sender.clone()),
390 capture::CapturePacketCodec::new(decoder, *packet_key.public(), sender),
391 )
392 };
393
394 #[cfg(not(feature = "capture"))]
395 let codec = BuiltCodec::Plain(encoder, decoder);
396
397 (
398 packet_key,
399 wire_msg,
400 api,
401 counters,
402 unack_ticket_proc,
403 ticket_events,
404 cfg,
405 processes,
406 codec,
407 )
408 }
409}
410
411enum BuiltCodec<Chain, S, TFact>
413where
414 Chain: ChainKeyOperations
415 + ChainReadChannelOperations
416 + ChainReadTicketOperations
417 + ChainValues
418 + Clone
419 + Send
420 + Sync
421 + 'static,
422 S: SurbStore + Clone + Send + Sync + 'static,
423 TFact: TicketFactory + Clone + Send + Sync + 'static,
424{
425 #[cfg(not(feature = "capture"))]
426 Plain(HoprEncoder<Chain, S, TFact>, HoprDecoder<Chain, S, TFact>),
427 #[cfg(feature = "capture")]
428 Captured(
429 crate::capture::CapturePacketCodec<HoprEncoder<Chain, S, TFact>>,
430 crate::capture::CapturePacketCodec<HoprDecoder<Chain, S, TFact>>,
431 ),
432}
433
434impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
436 HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
437where
438 WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
439 WOut::Error: std::error::Error,
440 WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
441 Chain: ChainKeyOperations
442 + ChainReadChannelOperations
443 + ChainReadTicketOperations
444 + ChainValues
445 + Clone
446 + Send
447 + Sync
448 + 'static,
449 S: SurbStore + Clone + Send + Sync + 'static,
450 TEvt: futures::Sink<hopr_api::node::TicketEvent> + Clone + Unpin + Send + 'static,
451 TEvt::Error: std::error::Error,
452 TFact: TicketFactory + Clone + Send + Sync + 'static,
453 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
454 AppOut::Error: std::error::Error,
455 AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
456{
457 pub fn build_for_relay(self) -> AbortableList<HoprTransportProcess> {
462 let (packet_key, wire_msg, api, counters, unack_ticket_proc, ticket_events, _cfg, mut processes, codec) =
463 self.prepare();
464
465 let ticket_events = ticket_events.expect("Relay node requires ticket events; call with_ticket_events() first");
466
467 let inner = match codec {
468 #[cfg(not(feature = "capture"))]
469 BuiltCodec::Plain(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
470 .transport(wire_msg)
471 .codec((encoder, decoder))
472 .api(api)
473 .with_counters(counters)
474 .with_config(_cfg.pipeline)
475 .with_ticket_processing(unack_ticket_proc, ticket_events)
476 .build_for_relay(),
477 #[cfg(feature = "capture")]
478 BuiltCodec::Captured(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
479 .transport(wire_msg)
480 .codec((encoder, decoder))
481 .api(api)
482 .with_counters(counters)
483 .with_config(_cfg.pipeline)
484 .with_ticket_processing(unack_ticket_proc, ticket_events)
485 .build_for_relay(),
486 };
487
488 processes.flat_map_extend_from(inner, HoprTransportProcess::Pipeline);
489 processes
490 }
491}
492
493impl<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
495 HoprPacketPipelineBuilder<WIn, WOut, Chain, S, TFact, AppOut, AppIn, TEvt>
496where
497 WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
498 WOut::Error: std::error::Error,
499 WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
500 Chain: ChainKeyOperations
501 + ChainReadChannelOperations
502 + ChainReadTicketOperations
503 + ChainValues
504 + Clone
505 + Send
506 + Sync
507 + 'static,
508 S: SurbStore + Clone + Send + Sync + 'static,
509 TFact: TicketFactory + Clone + Send + Sync + 'static,
510 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
511 AppOut::Error: std::error::Error,
512 AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
513{
514 pub fn build_for_entry(self) -> AbortableList<HoprTransportProcess> {
518 let (packet_key, wire_msg, api, counters, _unack, _ticket_events, _cfg, mut processes, codec) = self.prepare();
519
520 let inner = match codec {
521 #[cfg(not(feature = "capture"))]
522 BuiltCodec::Plain(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
523 .transport(wire_msg)
524 .codec((encoder, decoder))
525 .api(api)
526 .with_counters(counters)
527 .with_config(_cfg.pipeline)
528 .build_for_entry(),
529 #[cfg(feature = "capture")]
530 BuiltCodec::Captured(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
531 .transport(wire_msg)
532 .codec((encoder, decoder))
533 .api(api)
534 .with_counters(counters)
535 .with_config(_cfg.pipeline)
536 .build_for_entry(),
537 };
538
539 processes.flat_map_extend_from(inner, HoprTransportProcess::Pipeline);
540 processes
541 }
542
543 pub fn build_for_exit(self) -> AbortableList<HoprTransportProcess> {
548 let (packet_key, wire_msg, api, counters, _unack, _ticket_events, _cfg, mut processes, codec) = self.prepare();
549
550 let inner = match codec {
551 #[cfg(not(feature = "capture"))]
552 BuiltCodec::Plain(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
553 .transport(wire_msg)
554 .codec((encoder, decoder))
555 .api(api)
556 .with_counters(counters)
557 .with_config(_cfg.pipeline)
558 .build_for_exit(),
559 #[cfg(feature = "capture")]
560 BuiltCodec::Captured(encoder, decoder) => PacketPipelineBuilder::new(packet_key.clone())
561 .transport(wire_msg)
562 .codec((encoder, decoder))
563 .api(api)
564 .with_counters(counters)
565 .with_config(_cfg.pipeline)
566 .build_for_exit(),
567 };
568
569 processes.flat_map_extend_from(inner, HoprTransportProcess::Pipeline);
570 processes
571 }
572}