hopr_protocol_hopr/codec/
decoder.rs1use std::{
2 ops::{Mul, Sub},
3 time::Duration,
4};
5
6use hopr_api::chain::*;
7use hopr_crypto_packet::prelude::*;
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::prelude::*;
10use hopr_primitive_types::prelude::*;
11
12use crate::{
13 AuxiliaryPacketInfo, HoprCodecConfig, IncomingAcknowledgementPacket, IncomingFinalPacket, IncomingForwardedPacket,
14 IncomingPacket, IncomingPacketError, PacketDecoder, SurbStore, TicketCreationError, TicketTracker,
15 errors::HoprProtocolError, tbf::TagBloomFilter,
16};
17
18pub struct HoprDecoder<Chain, S, T> {
20 chain_api: Chain,
21 surb_store: std::sync::Arc<S>,
22 tracker: T,
23 packet_key: OffchainKeypair,
24 chain_key: ChainKeypair,
25 channels_dst: Hash,
26 cfg: HoprCodecConfig,
27 tbf: parking_lot::Mutex<TagBloomFilter>,
28 peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey>,
29}
30
31impl<Chain, S, T> HoprDecoder<Chain, S, T>
32where
33 Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
34 S: SurbStore + Send + Sync,
35 T: TicketTracker + Send + Sync,
36{
37 pub fn new(
39 (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
40 chain_api: Chain,
41 surb_store: S,
42 tracker: T,
43 channels_dst: Hash,
44 cfg: HoprCodecConfig,
45 ) -> Self {
46 Self {
47 chain_api,
48 surb_store: std::sync::Arc::new(surb_store),
49 packet_key,
50 chain_key,
51 channels_dst,
52 cfg,
53 tracker,
54 tbf: parking_lot::Mutex::new(TagBloomFilter::default()),
55 peer_id_cache: moka::future::Cache::builder()
56 .time_to_idle(Duration::from_secs(600))
57 .max_capacity(100_000)
58 .build(),
59 }
60 }
61
62 async fn validate_and_replace_ticket(
63 &self,
64 mut fwd: HoprForwardedPacket,
65 ) -> Result<(HoprForwardedPacket, UnacknowledgedTicket), HoprProtocolError> {
66 let previous_hop_addr = self
67 .chain_api
68 .packet_key_to_chain_key(&fwd.previous_hop)
69 .await
70 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
71 .ok_or(HoprProtocolError::KeyNotFound)?;
72
73 let next_hop_addr = self
74 .chain_api
75 .packet_key_to_chain_key(&fwd.outgoing.next_hop)
76 .await
77 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
78 .ok_or(HoprProtocolError::KeyNotFound)?;
79
80 let incoming_channel = self
81 .chain_api
82 .channel_by_parties(&previous_hop_addr, self.chain_key.as_ref())
83 .await
84 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
85 .ok_or_else(|| HoprProtocolError::ChannelNotFound(previous_hop_addr, *self.chain_key.as_ref()))?;
86
87 let minimum_ticket_price = self
90 .chain_api
91 .minimum_ticket_price()
92 .await
93 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
94 .mul(U256::from(fwd.path_pos));
95
96 let remaining_balance = incoming_channel.balance.sub(
97 self.tracker
98 .incoming_channel_unrealized_balance(incoming_channel.get_id(), incoming_channel.channel_epoch)
99 .await
100 .map_err(|e| HoprProtocolError::TicketTrackerError(e.into()))?,
101 );
102
103 let win_prob = self
108 .chain_api
109 .minimum_incoming_ticket_win_prob()
110 .await
111 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
112 let domain_separator = self.channels_dst;
113
114 let verified_incoming_ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
115 validate_unacknowledged_ticket(
116 fwd.outgoing.ticket,
117 &incoming_channel,
118 minimum_ticket_price,
119 win_prob,
120 remaining_balance,
121 &domain_separator,
122 )
123 })
124 .await?;
125
126 tracing::trace!(%verified_incoming_ticket, "successfully verified incoming ticket");
128
129 let ticket_builder = if fwd.path_pos > 1 {
135 let outgoing_channel = self
139 .chain_api
140 .channel_by_parties(self.chain_key.as_ref(), &next_hop_addr)
141 .await
142 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
143 .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_hop_addr))?;
144
145 let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
146 .chain_api
147 .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
148 .await
149 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
150
151 let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
155
156 self.tracker
160 .create_multihop_ticket(
161 &outgoing_channel,
162 fwd.path_pos,
163 outgoing_ticket_win_prob,
164 outgoing_ticket_price,
165 )
166 .await
167 .map_err(|e| match e {
168 TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
169 e => HoprProtocolError::TicketTrackerError(e.into()),
170 })?
171 } else {
172 TicketBuilder::zero_hop().counterparty(next_hop_addr)
173 };
174
175 let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
177 let me_on_chain = self.chain_key.clone();
178 fwd.outgoing.ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
179 ticket_builder.build_signed(&me_on_chain, &domain_separator)
180 })
181 .await?
182 .leak();
183
184 let unack_ticket = verified_incoming_ticket.into_unacknowledged(fwd.own_key);
185 Ok((fwd, unack_ticket))
186 }
187}
188
189#[async_trait::async_trait]
190impl<Chain, S, T> PacketDecoder for HoprDecoder<Chain, S, T>
191where
192 Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
193 S: SurbStore + Send + Sync + 'static,
194 T: TicketTracker + Send + Sync,
195{
196 type Error = HoprProtocolError;
197
198 #[tracing::instrument(skip(self, sender, data), level = "trace", fields(%sender))]
199 async fn decode(
200 &self,
201 sender: PeerId,
202 data: Box<[u8]>,
203 ) -> Result<IncomingPacket, IncomingPacketError<Self::Error>> {
204 tracing::trace!(data_len = data.len(), "decoding packet");
205
206 let previous_hop = match self
208 .peer_id_cache
209 .try_get_with_by_ref(
210 &sender,
211 hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&sender)),
212 )
213 .await
214 {
215 Ok(peer) => peer,
216 Err(error) => {
217 tracing::error!(%sender, %error, "dropping packet - cannot convert peer id");
219 return Err(IncomingPacketError::Undecodable(HoprProtocolError::InvalidSender));
220 }
221 };
222
223 let offchain_keypair = self.packet_key.clone();
224 let surb_store = self.surb_store.clone();
225 let mapper = self.chain_api.key_id_mapper_ref().clone();
226
227 let packet = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
230 HoprPacket::from_incoming(&data, &offchain_keypair, previous_hop, &mapper, |p| {
231 surb_store.find_reply_opener(p)
232 })
233 })
234 .await
235 .map_err(|e| IncomingPacketError::Undecodable(e.into()))?;
236
237 if let Some(tag) = packet.packet_tag() {
240 if self.tbf.lock().check_and_set(tag) {
243 return Err(IncomingPacketError::ProcessingError(
244 previous_hop,
245 HoprProtocolError::Replay,
246 ));
247 }
248 }
249
250 match packet {
251 HoprPacket::Final(incoming) => {
252 let info = AuxiliaryPacketInfo {
254 packet_signals: incoming.signals,
255 num_surbs: incoming.surbs.len(),
256 };
257
258 if !incoming.surbs.is_empty() {
260 self.surb_store.insert_surbs(incoming.sender, incoming.surbs).await;
261 tracing::trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
262 }
263
264 Ok(match incoming.ack_key {
265 None => {
266 if incoming.plain_text.len() < size_of::<u16>() {
267 return Err(IncomingPacketError::Undecodable(
268 GeneralError::ParseError("invalid acknowledgement packet size".into()).into(),
269 ));
270 }
271
272 let num_acks =
273 u16::from_be_bytes(incoming.plain_text[..size_of::<u16>()].try_into().map_err(|_| {
274 IncomingPacketError::Undecodable(
275 GeneralError::ParseError("invalid num acks".into()).into(),
276 )
277 })?);
278
279 if incoming.plain_text.len() < size_of::<u16>() + (num_acks as usize) * Acknowledgement::SIZE {
280 return Err(IncomingPacketError::Undecodable(
281 GeneralError::ParseError("invalid number of acknowledgements in packet".into()).into(),
282 ));
283 }
284 tracing::trace!(num_acks, "received acknowledgement packet");
285
286 IncomingPacket::Acknowledgement(
288 IncomingAcknowledgementPacket {
289 packet_tag: incoming.packet_tag,
290 previous_hop: incoming.previous_hop,
291 received_acks: incoming.plain_text
292 [size_of::<u16>()..size_of::<u16>() + num_acks as usize * Acknowledgement::SIZE]
293 .chunks_exact(Acknowledgement::SIZE)
294 .map(Acknowledgement::try_from)
295 .collect::<Result<Vec<_>, _>>()
296 .map_err(|e: GeneralError| IncomingPacketError::Undecodable(e.into()))?,
297 }
298 .into(),
299 )
300 }
301 Some(ack_key) => IncomingPacket::Final(
302 IncomingFinalPacket {
303 packet_tag: incoming.packet_tag,
304 previous_hop: incoming.previous_hop,
305 sender: incoming.sender,
306 plain_text: incoming.plain_text,
307 ack_key,
308 info,
309 }
310 .into(),
311 ),
312 })
313 }
314 HoprPacket::Forwarded(fwd) => {
315 let (fwd, verified_unack_ticket) =
317 self.validate_and_replace_ticket(*fwd)
318 .await
319 .map_err(|error| match error {
320 HoprProtocolError::TicketValidationError(e) => {
322 IncomingPacketError::InvalidTicket(previous_hop, e)
323 }
324 e => IncomingPacketError::ProcessingError(previous_hop, e),
325 })?;
326
327 let mut payload = Vec::with_capacity(HoprPacket::SIZE);
328 payload.extend_from_slice(fwd.outgoing.packet.as_ref());
329 payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
330
331 Ok(IncomingPacket::Forwarded(
332 IncomingForwardedPacket {
333 packet_tag: fwd.packet_tag,
334 previous_hop: fwd.previous_hop,
335 next_hop: fwd.outgoing.next_hop,
336 data: payload.into_boxed_slice(),
337 ack_challenge: fwd.outgoing.ack_challenge,
338 received_ticket: verified_unack_ticket,
339 ack_key_prev_hop: fwd.ack_key,
340 }
341 .into(),
342 ))
343 }
344 HoprPacket::Outgoing(_) => Err(IncomingPacketError::ProcessingError(
345 previous_hop,
346 HoprProtocolError::InvalidState("cannot be outgoing packet"),
347 )),
348 }
349 }
350}