1use std::{
2 ops::{Mul, Sub},
3 time::Duration,
4};
5
6use hopr_api::chain::*;
7use hopr_crypto_packet::prelude::*;
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::prelude::*;
10use hopr_platform::trace_timed;
11use hopr_primitive_types::prelude::*;
12
13use crate::{
14 AuxiliaryPacketInfo, HoprCodecConfig, IncomingAcknowledgementPacket, IncomingFinalPacket, IncomingForwardedPacket,
15 IncomingPacket, IncomingPacketError, PacketDecoder, SurbStore, TicketCreationError, TicketTracker,
16 errors::HoprProtocolError, tbf::TagBloomFilter,
17};
18
/// Decoder of incoming HOPR packets.
///
/// Holds the chain access object, SURB store, ticket tracker, the node's key pairs and the codec
/// configuration needed to decode a packet and, for forwarded packets, to validate the incoming
/// ticket and sign its replacement.
pub struct HoprDecoder<Chain, S, T> {
    /// Chain access used for key resolution, channel lookups and ticket price / winning
    /// probability values.
    chain_api: Chain,
    /// Shared SURB store: queried for reply openers during packet decoding and filled with the
    /// SURBs carried by final packets.
    surb_store: std::sync::Arc<S>,
    /// Ticket tracker providing the unrealized balance of incoming channels and creating
    /// multi-hop tickets for outgoing channels.
    tracker: T,
    /// Off-chain key pair handed to the packet decoding routine.
    packet_key: OffchainKeypair,
    /// On-chain key pair: identifies this node in channel lookups and signs outgoing tickets.
    chain_key: ChainKeypair,
    /// Domain separator passed to ticket validation and ticket signing.
    channels_dst: Hash,
    /// Codec configuration (minimum incoming ticket price, outgoing ticket price and winning
    /// probability).
    cfg: HoprCodecConfig,
    /// Bloom filter of packet tags; a tag that is already present causes the packet to be
    /// rejected as a replay.
    tbf: parking_lot::Mutex<TagBloomFilter>,
    /// Cache of `PeerId` to `OffchainPublicKey` conversions.
    peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey>,
}
31
32impl<Chain, S, T> HoprDecoder<Chain, S, T>
33where
34 Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
35 S: SurbStore + Send + Sync,
36 T: TicketTracker + Send + Sync,
37{
38 pub fn new(
40 (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
41 chain_api: Chain,
42 surb_store: S,
43 tracker: T,
44 channels_dst: Hash,
45 cfg: HoprCodecConfig,
46 ) -> Self {
47 Self {
48 chain_api,
49 surb_store: std::sync::Arc::new(surb_store),
50 packet_key,
51 chain_key,
52 channels_dst,
53 cfg,
54 tracker,
55 tbf: parking_lot::Mutex::new(TagBloomFilter::default()),
56 peer_id_cache: moka::future::Cache::builder()
57 .time_to_idle(Duration::from_secs(600))
58 .max_capacity(100_000)
59 .build(),
60 }
61 }
62
63 #[tracing::instrument(skip(self, fwd), level = "debug", fields(path_pos = fwd.path_pos))]
64 async fn validate_and_replace_ticket(
65 &self,
66 mut fwd: HoprForwardedPacket,
67 ) -> Result<(HoprForwardedPacket, UnacknowledgedTicket), HoprProtocolError> {
68 let previous_hop_addr = trace_timed!("previous_hop_addr lookup", {
69 self.chain_api
70 .packet_key_to_chain_key(&fwd.previous_hop)
71 .await
72 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
73 .ok_or(HoprProtocolError::KeyNotFound)?
74 });
75
76 let next_hop_addr = trace_timed!("next_hop_addr lookup", {
77 self.chain_api
78 .packet_key_to_chain_key(&fwd.outgoing.next_hop)
79 .await
80 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
81 .ok_or(HoprProtocolError::KeyNotFound)?
82 });
83
84 let incoming_channel = trace_timed!("incoming_channel lookup", {
85 self.chain_api
86 .channel_by_parties(&previous_hop_addr, self.chain_key.as_ref())
87 .await
88 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
89 .ok_or_else(|| HoprProtocolError::ChannelNotFound(previous_hop_addr, *self.chain_key.as_ref()))?
90 });
91
92 let minimum_ticket_price = trace_timed!("minimum_ticket_price lookup", {
95 self.chain_api
96 .minimum_ticket_price()
97 .await
98 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
99 .mul(U256::from(fwd.path_pos))
100 })
101 .max(self.cfg.min_incoming_ticket_price.unwrap_or_default());
102
103 let remaining_balance = trace_timed!("unrealized_balance lookup", {
104 incoming_channel.balance.sub(
105 self.tracker
106 .incoming_channel_unrealized_balance(
107 incoming_channel.get_id(),
108 incoming_channel.channel_epoch,
109 incoming_channel.ticket_index,
110 )
111 .await
112 .map_err(|e| HoprProtocolError::TicketTrackerError(e.into()))?,
113 )
114 });
115
116 let win_prob = trace_timed!("win_prob lookup", {
121 self.chain_api
122 .minimum_incoming_ticket_win_prob()
123 .await
124 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
125 });
126
127 let domain_separator = self.channels_dst;
128
129 let verified_incoming_ticket = trace_timed!("ticket_signature_verification", {
130 hopr_parallelize::cpu::spawn_fifo_blocking(
131 move || {
132 validate_unacknowledged_ticket(
133 fwd.outgoing.ticket,
134 &incoming_channel,
135 minimum_ticket_price,
136 win_prob,
137 remaining_balance,
138 &domain_separator,
139 )
140 },
141 "ticket_verify",
142 )
143 .await??
144 });
145
146 tracing::trace!(%verified_incoming_ticket, "successfully verified incoming ticket");
148
149 let ticket_builder = if fwd.path_pos > 1 {
155 let outgoing_channel = self
159 .chain_api
160 .channel_by_parties(self.chain_key.as_ref(), &next_hop_addr)
161 .await
162 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
163 .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_hop_addr))?;
164
165 let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
166 .chain_api
167 .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
168 .await
169 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
170
171 let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
175
176 self.tracker
180 .create_multihop_ticket(
181 &outgoing_channel,
182 fwd.path_pos,
183 outgoing_ticket_win_prob,
184 outgoing_ticket_price,
185 )
186 .await
187 .map_err(|e| match e {
188 TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
189 e => HoprProtocolError::TicketTrackerError(e.into()),
190 })?
191 } else {
192 TicketBuilder::zero_hop().counterparty(next_hop_addr)
193 };
194
195 let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
197 fwd.outgoing.ticket = trace_timed!("ticket_signing", {
198 ticket_builder.build_signed(&self.chain_key, &domain_separator)?.leak()
199 });
200
201 let unack_ticket = verified_incoming_ticket.into_unacknowledged(fwd.own_key);
202 Ok((fwd, unack_ticket))
203 }
204}
205
206#[async_trait::async_trait]
207impl<Chain, S, T> PacketDecoder for HoprDecoder<Chain, S, T>
208where
209 Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
210 S: SurbStore + Send + Sync + 'static,
211 T: TicketTracker + Send + Sync,
212{
213 type Error = HoprProtocolError;
214
215 #[tracing::instrument(skip(self, sender, data), level = "trace", fields(%sender))]
216 async fn decode(
217 &self,
218 sender: PeerId,
219 data: Box<[u8]>,
220 ) -> Result<IncomingPacket, IncomingPacketError<Self::Error>> {
221 #[cfg(feature = "trace-timing")]
222 let decode_start = std::time::Instant::now();
223 tracing::trace!(data_len = data.len(), "decoding packet");
224
225 let previous_hop = trace_timed!("peer_id_conversion complete", {
229 match self
230 .peer_id_cache
231 .try_get_with_by_ref(&sender, async {
232 hopr_parallelize::cpu::spawn_fifo_blocking(
233 move || OffchainPublicKey::from_peerid(&sender),
234 "peerid_lookup",
235 )
236 .await
237 .map_err(HoprProtocolError::from)?
238 .map_err(HoprProtocolError::from)
239 })
240 .await
241 {
242 Ok(peer) => peer,
243 Err(error) => {
244 return match error.as_ref() {
245 HoprProtocolError::SpawnError(spawn_err) => {
246 tracing::warn!(%sender, %error, "dropping packet due to local CPU overload (not sender's fault)");
247 Err(IncomingPacketError::Overloaded(HoprProtocolError::SpawnError(
248 *spawn_err,
249 )))
250 }
251 _ => {
252 tracing::error!(%sender, %error, "dropping packet - cannot convert peer id");
253 Err(IncomingPacketError::Undecodable(HoprProtocolError::InvalidSender))
254 }
255 };
256 }
257 }
258 });
259
260 let offchain_keypair = self.packet_key.clone();
262 let surb_store = self.surb_store.clone();
263 let mapper = self.chain_api.key_id_mapper_ref().clone();
264
265 let packet = trace_timed!("sphinx_decode complete", {
268 hopr_parallelize::cpu::spawn_fifo_blocking(
269 move || {
270 HoprPacket::from_incoming(&data, &offchain_keypair, previous_hop, &mapper, |p| {
271 surb_store.find_reply_opener(p)
272 })
273 },
274 "packet_decode",
275 )
276 .await
277 .map_err(|e| IncomingPacketError::Overloaded(e.into()))?
278 .map_err(|e| IncomingPacketError::Undecodable(e.into()))?
279 });
280
281 let packet_type = match &packet {
282 HoprPacket::Final(_) => "final",
283 HoprPacket::Forwarded(_) => "forwarded",
284 HoprPacket::Outgoing(_) => "outgoing",
285 };
286
287 if let Some(tag) = packet.packet_tag() {
290 if self.tbf.lock().check_and_set(tag) {
293 return Err(IncomingPacketError::ProcessingError(
294 previous_hop,
295 HoprProtocolError::Replay,
296 ));
297 }
298 }
299
300 match packet {
301 HoprPacket::Final(incoming) => {
302 let info = AuxiliaryPacketInfo {
304 packet_signals: incoming.signals,
305 num_surbs: incoming.surbs.len(),
306 };
307
308 if !incoming.surbs.is_empty() {
310 self.surb_store.insert_surbs(incoming.sender, incoming.surbs).await;
311 tracing::trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, packet_type, "stored incoming surbs for pseudonym");
312 }
313
314 let result = match incoming.ack_key {
315 None => {
316 if incoming.plain_text.len() < size_of::<u16>() {
317 return Err(IncomingPacketError::Undecodable(
318 GeneralError::ParseError("invalid acknowledgement packet size".into()).into(),
319 ));
320 }
321
322 let num_acks =
323 u16::from_be_bytes(incoming.plain_text[..size_of::<u16>()].try_into().map_err(|_| {
324 IncomingPacketError::Undecodable(
325 GeneralError::ParseError("invalid num acks".into()).into(),
326 )
327 })?);
328
329 if incoming.plain_text.len() < size_of::<u16>() + (num_acks as usize) * Acknowledgement::SIZE {
330 return Err(IncomingPacketError::Undecodable(
331 GeneralError::ParseError("invalid number of acknowledgements in packet".into()).into(),
332 ));
333 }
334 tracing::trace!(num_acks, packet_type, "received acknowledgement packet");
335
336 IncomingPacket::Acknowledgement(
338 IncomingAcknowledgementPacket {
339 packet_tag: incoming.packet_tag,
340 previous_hop: incoming.previous_hop,
341 received_acks: incoming.plain_text
342 [size_of::<u16>()..size_of::<u16>() + num_acks as usize * Acknowledgement::SIZE]
343 .chunks_exact(Acknowledgement::SIZE)
344 .map(Acknowledgement::try_from)
345 .collect::<Result<Vec<_>, _>>()
346 .map_err(|e: GeneralError| IncomingPacketError::Undecodable(e.into()))?,
347 }
348 .into(),
349 )
350 }
351 Some(ack_key) => IncomingPacket::Final(
352 IncomingFinalPacket {
353 packet_tag: incoming.packet_tag,
354 previous_hop: incoming.previous_hop,
355 sender: incoming.sender,
356 plain_text: incoming.plain_text,
357 ack_key,
358 info,
359 }
360 .into(),
361 ),
362 };
363 #[cfg(feature = "trace-timing")]
364 tracing::trace!(
365 total_ms = decode_start.elapsed().as_millis() as u64,
366 packet_type,
367 "decode complete"
368 );
369 Ok(result)
370 }
371 HoprPacket::Forwarded(fwd) => {
372 let (fwd, verified_unack_ticket) = trace_timed!("ticket_validation complete", {
375 self.validate_and_replace_ticket(*fwd)
376 .await
377 .map_err(|error| match error {
378 HoprProtocolError::TicketValidationError(e) => {
380 IncomingPacketError::InvalidTicket(previous_hop, e)
381 }
382 e => IncomingPacketError::ProcessingError(previous_hop, e),
383 })?
384 });
385
386 let mut payload = Vec::with_capacity(HoprPacket::SIZE);
387 payload.extend_from_slice(fwd.outgoing.packet.as_ref());
388 payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
389
390 #[cfg(feature = "trace-timing")]
391 tracing::trace!(
392 total_ms = decode_start.elapsed().as_millis() as u64,
393 packet_type,
394 "decode complete"
395 );
396 Ok(IncomingPacket::Forwarded(
397 IncomingForwardedPacket {
398 packet_tag: fwd.packet_tag,
399 previous_hop: fwd.previous_hop,
400 next_hop: fwd.outgoing.next_hop,
401 data: payload.into_boxed_slice(),
402 ack_challenge: fwd.outgoing.ack_challenge,
403 received_ticket: verified_unack_ticket,
404 ack_key_prev_hop: fwd.ack_key,
405 }
406 .into(),
407 ))
408 }
409 HoprPacket::Outgoing(_) => {
410 #[cfg(feature = "trace-timing")]
411 tracing::trace!(
412 total_ms = decode_start.elapsed().as_millis() as u64,
413 packet_type,
414 "decode complete"
415 );
416 Err(IncomingPacketError::ProcessingError(
417 previous_hop,
418 HoprProtocolError::InvalidState("cannot be outgoing packet"),
419 ))
420 }
421 }
422 }
423}