1use bytes::Bytes;
4use hopr_api::{
5 PeerId,
6 node::TicketEvent,
7 types::{crypto::prelude::*, internal::prelude::*},
8};
9use hopr_crypto_packet::HoprSurb;
10use hopr_protocol_app::prelude::*;
11use hopr_protocol_hopr::prelude::*;
12use hopr_utils::runtime::AbortableList;
13
14use super::{
15 NodeType, NoopTicketProcessor, PacketPipelineProcesses, config::PacketPipelineConfig, run_packet_pipeline_inner,
16};
17use crate::PeerProtocolCounterRegistry;
18
19pub struct Unset;
22
23pub struct PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> {
41 packet_key: OffchainKeypair,
42 wire_msg: (WOut, WIn),
43 codec: (C, D),
44 cfg: PacketPipelineConfig,
45 api: (AppOut, AppIn),
46 counters: PeerProtocolCounterRegistry,
47 ticket_proc: Option<T>,
48 ticket_events: Option<TEvt>,
49}
50
51impl
52 PacketPipelineBuilder<
53 Unset,
54 Unset,
55 Unset,
56 Unset,
57 NoopTicketProcessor,
58 futures::sink::Drain<TicketEvent>,
59 Unset,
60 Unset,
61 >
62{
63 pub fn new(packet_key: OffchainKeypair) -> Self {
77 Self {
78 packet_key,
79 wire_msg: (Unset, Unset),
80 codec: (Unset, Unset),
81 cfg: PacketPipelineConfig::default(),
82 api: (Unset, Unset),
83 counters: PeerProtocolCounterRegistry::default(),
84 ticket_proc: None,
85 ticket_events: None,
86 }
87 }
88}
89
90impl<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> {
91 #[must_use]
93 pub fn with_config(mut self, cfg: PacketPipelineConfig) -> Self {
94 self.cfg = cfg;
95 self
96 }
97
98 #[must_use]
100 pub fn with_counters(mut self, counters: PeerProtocolCounterRegistry) -> Self {
101 self.counters = counters;
102 self
103 }
104
105 #[must_use]
107 pub fn transport<WIn2, WOut2>(
108 self,
109 wire_msg: (WOut2, WIn2),
110 ) -> PacketPipelineBuilder<WIn2, WOut2, C, D, T, TEvt, AppOut, AppIn> {
111 PacketPipelineBuilder {
112 packet_key: self.packet_key,
113 wire_msg,
114 codec: self.codec,
115 cfg: self.cfg,
116 api: self.api,
117 counters: self.counters,
118 ticket_proc: self.ticket_proc,
119 ticket_events: self.ticket_events,
120 }
121 }
122
123 #[must_use]
125 pub fn codec<C2, D2>(self, codec: (C2, D2)) -> PacketPipelineBuilder<WIn, WOut, C2, D2, T, TEvt, AppOut, AppIn> {
126 PacketPipelineBuilder {
127 packet_key: self.packet_key,
128 wire_msg: self.wire_msg,
129 codec,
130 cfg: self.cfg,
131 api: self.api,
132 counters: self.counters,
133 ticket_proc: self.ticket_proc,
134 ticket_events: self.ticket_events,
135 }
136 }
137
138 #[must_use]
140 pub fn api<AppOut2, AppIn2>(
141 self,
142 api: (AppOut2, AppIn2),
143 ) -> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut2, AppIn2> {
144 PacketPipelineBuilder {
145 packet_key: self.packet_key,
146 wire_msg: self.wire_msg,
147 codec: self.codec,
148 cfg: self.cfg,
149 api,
150 counters: self.counters,
151 ticket_proc: self.ticket_proc,
152 ticket_events: self.ticket_events,
153 }
154 }
155
156 #[must_use]
161 pub fn with_ticket_processing<T2, TEvt2>(
162 self,
163 ticket_proc: T2,
164 ticket_events: TEvt2,
165 ) -> PacketPipelineBuilder<WIn, WOut, C, D, T2, TEvt2, AppOut, AppIn>
166 where
167 T2: UnacknowledgedTicketProcessor + Sync + Send + 'static,
168 TEvt2: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
169 TEvt2::Error: std::error::Error,
170 {
171 PacketPipelineBuilder {
172 packet_key: self.packet_key,
173 wire_msg: self.wire_msg,
174 codec: self.codec,
175 cfg: self.cfg,
176 api: self.api,
177 counters: self.counters,
178 ticket_proc: Some(ticket_proc),
179 ticket_events: Some(ticket_events),
180 }
181 }
182}
183
184impl<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>
185where
186 WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
187 WOut::Error: std::error::Error,
188 WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
189 C: PacketEncoder + Sync + Send + 'static,
190 D: PacketDecoder + Sync + Send + 'static,
191 T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
192 TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
193 TEvt::Error: std::error::Error,
194 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
195 AppOut::Error: std::error::Error,
196 AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
197{
198 #[must_use]
207 pub fn build_for_relay(self) -> AbortableList<PacketPipelineProcesses> {
208 let ticket_proc = self
209 .ticket_proc
210 .expect("Relay node requires ticket processing; call with_ticket_processing() first");
211 let ticket_events = self
212 .ticket_events
213 .expect("Relay node requires ticket processing; call with_ticket_processing() first");
214 run_packet_pipeline_inner(
215 NodeType::Relay,
216 self.packet_key,
217 self.wire_msg,
218 self.codec,
219 ticket_proc,
220 ticket_events,
221 self.cfg,
222 self.api,
223 self.counters,
224 )
225 }
226}
227
228impl<WIn, WOut, C, D, T, TEvt, AppOut, AppIn> PacketPipelineBuilder<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>
229where
230 WOut: futures::Sink<(PeerId, Bytes)> + Clone + Unpin + Send + 'static,
231 WOut::Error: std::error::Error,
232 WIn: futures::Stream<Item = (PeerId, Bytes)> + Send + 'static,
233 C: PacketEncoder + Sync + Send + 'static,
234 D: PacketDecoder + Sync + Send + 'static,
235 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
236 AppOut::Error: std::error::Error,
237 AppIn: futures::Stream<Item = (ResolvedTransportRouting<HoprSurb>, ApplicationDataOut)> + Send + 'static,
238{
239 #[must_use]
246 pub fn build_for_entry(self) -> AbortableList<PacketPipelineProcesses> {
247 run_packet_pipeline_inner::<_, _, _, _, NoopTicketProcessor, futures::sink::Drain<TicketEvent>, _, _>(
248 NodeType::Entry,
249 self.packet_key,
250 self.wire_msg,
251 self.codec,
252 NoopTicketProcessor,
253 futures::sink::drain(),
254 self.cfg,
255 self.api,
256 self.counters,
257 )
258 }
259
260 #[must_use]
268 pub fn build_for_exit(self) -> AbortableList<PacketPipelineProcesses> {
269 run_packet_pipeline_inner::<_, _, _, _, NoopTicketProcessor, futures::sink::Drain<TicketEvent>, _, _>(
270 NodeType::Exit,
271 self.packet_key,
272 self.wire_msg,
273 self.codec,
274 NoopTicketProcessor,
275 futures::sink::drain(),
276 self.cfg,
277 self.api,
278 self.counters,
279 )
280 }
281}