Skip to main content

hopr_protocol_hopr/codec/
decoder.rs

1use std::{
2    ops::{Mul, Sub},
3    time::Duration,
4};
5
6use hopr_api::chain::*;
7use hopr_crypto_packet::prelude::*;
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::prelude::*;
10use hopr_platform::trace_timed;
11use hopr_primitive_types::prelude::*;
12
13use crate::{
14    AuxiliaryPacketInfo, HoprCodecConfig, IncomingAcknowledgementPacket, IncomingFinalPacket, IncomingForwardedPacket,
15    IncomingPacket, IncomingPacketError, PacketDecoder, SurbStore, TicketCreationError, TicketTracker,
16    errors::HoprProtocolError, tbf::TagBloomFilter,
17};
18
19/// Default [decoder](PacketDecoder) implementation for HOPR packets.
20pub struct HoprDecoder<Chain, S, T> {
21    chain_api: Chain,
22    surb_store: std::sync::Arc<S>,
23    tracker: T,
24    packet_key: OffchainKeypair,
25    chain_key: ChainKeypair,
26    channels_dst: Hash,
27    cfg: HoprCodecConfig,
28    tbf: parking_lot::Mutex<TagBloomFilter>,
29    peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey>,
30}
31
32impl<Chain, S, T> HoprDecoder<Chain, S, T>
33where
34    Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
35    S: SurbStore + Send + Sync,
36    T: TicketTracker + Send + Sync,
37{
38    /// Creates a new instance of the decoder.
39    pub fn new(
40        (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
41        chain_api: Chain,
42        surb_store: S,
43        tracker: T,
44        channels_dst: Hash,
45        cfg: HoprCodecConfig,
46    ) -> Self {
47        Self {
48            chain_api,
49            surb_store: std::sync::Arc::new(surb_store),
50            packet_key,
51            chain_key,
52            channels_dst,
53            cfg,
54            tracker,
55            tbf: parking_lot::Mutex::new(TagBloomFilter::default()),
56            peer_id_cache: moka::future::Cache::builder()
57                .time_to_idle(Duration::from_secs(600))
58                .max_capacity(100_000)
59                .build(),
60        }
61    }
62
63    #[tracing::instrument(skip(self, fwd), level = "debug", fields(path_pos = fwd.path_pos))]
64    async fn validate_and_replace_ticket(
65        &self,
66        mut fwd: HoprForwardedPacket,
67    ) -> Result<(HoprForwardedPacket, UnacknowledgedTicket), HoprProtocolError> {
68        let previous_hop_addr = trace_timed!("previous_hop_addr lookup", {
69            self.chain_api
70                .packet_key_to_chain_key(&fwd.previous_hop)
71                .await
72                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
73                .ok_or(HoprProtocolError::KeyNotFound)?
74        });
75
76        let next_hop_addr = trace_timed!("next_hop_addr lookup", {
77            self.chain_api
78                .packet_key_to_chain_key(&fwd.outgoing.next_hop)
79                .await
80                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
81                .ok_or(HoprProtocolError::KeyNotFound)?
82        });
83
84        let incoming_channel = trace_timed!("incoming_channel lookup", {
85            self.chain_api
86                .channel_by_parties(&previous_hop_addr, self.chain_key.as_ref())
87                .await
88                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
89                .ok_or_else(|| HoprProtocolError::ChannelNotFound(previous_hop_addr, *self.chain_key.as_ref()))?
90        });
91
92        // The ticket price from the oracle times my node's position on the
93        // path is the acceptable minimum
94        let minimum_ticket_price = trace_timed!("minimum_ticket_price lookup", {
95            self.chain_api
96                .minimum_ticket_price()
97                .await
98                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
99                .mul(U256::from(fwd.path_pos))
100        })
101        .max(self.cfg.min_incoming_ticket_price.unwrap_or_default());
102
103        let remaining_balance = trace_timed!("unrealized_balance lookup", {
104            incoming_channel.balance.sub(
105                self.tracker
106                    .incoming_channel_unrealized_balance(
107                        incoming_channel.get_id(),
108                        incoming_channel.channel_epoch,
109                        incoming_channel.ticket_index,
110                    )
111                    .await
112                    .map_err(|e| HoprProtocolError::TicketTrackerError(e.into()))?,
113            )
114        });
115
116        // Here also the signature on the ticket gets validated,
117        // so afterward we are sure the source of the `channel`
118        // (which is equal to `previous_hop_addr`) has issued this
119        // ticket.
120        let win_prob = trace_timed!("win_prob lookup", {
121            self.chain_api
122                .minimum_incoming_ticket_win_prob()
123                .await
124                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
125        });
126
127        let domain_separator = self.channels_dst;
128
129        let verified_incoming_ticket = trace_timed!("ticket_signature_verification", {
130            hopr_parallelize::cpu::spawn_fifo_blocking(
131                move || {
132                    validate_unacknowledged_ticket(
133                        fwd.outgoing.ticket,
134                        &incoming_channel,
135                        minimum_ticket_price,
136                        win_prob,
137                        remaining_balance,
138                        &domain_separator,
139                    )
140                },
141                "ticket_verify",
142            )
143            .await??
144        });
145
146        // The ticket is now validated:
147        tracing::trace!(%verified_incoming_ticket, "successfully verified incoming ticket");
148
149        // NOTE: that the path position according to the ticket value
150        // may no longer match the path position from the packet header,
151        // because the ticket issuer may set the price of the ticket higher.
152
153        // Create the new ticket for the new packet
154        let ticket_builder = if fwd.path_pos > 1 {
155            // There must be a channel to the next node if it's not the final hop.
156            // If the channel does not exist, the ticket we extracted before cannot be saved,
157            // as there would be no way to acknowledge it without the channel.
158            let outgoing_channel = self
159                .chain_api
160                .channel_by_parties(self.chain_key.as_ref(), &next_hop_addr)
161                .await
162                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
163                .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_hop_addr))?;
164
165            let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
166                .chain_api
167                .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
168                .await
169                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
170
171            // We currently take the maximum of the win prob from the incoming ticket
172            // and the one configured on this node.
173            // Therefore, the winning probability can only increase along the path.
174            let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
175
176            // The following operation fails if there's not enough balance on the channel to the next hop.
177            // Again, in this case, we cannot save the ticket we previously extracted because there is no way it gets
178            // acknowledged without enough balance.
179            self.tracker
180                .create_multihop_ticket(
181                    &outgoing_channel,
182                    fwd.path_pos,
183                    outgoing_ticket_win_prob,
184                    outgoing_ticket_price,
185                )
186                .await
187                .map_err(|e| match e {
188                    TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
189                    e => HoprProtocolError::TicketTrackerError(e.into()),
190                })?
191        } else {
192            TicketBuilder::zero_hop().counterparty(next_hop_addr)
193        };
194
195        // Finally, replace the ticket in the outgoing packet with a new one
196        let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
197        fwd.outgoing.ticket = trace_timed!("ticket_signing", {
198            ticket_builder.build_signed(&self.chain_key, &domain_separator)?.leak()
199        });
200
201        let unack_ticket = verified_incoming_ticket.into_unacknowledged(fwd.own_key);
202        Ok((fwd, unack_ticket))
203    }
204}
205
206#[async_trait::async_trait]
207impl<Chain, S, T> PacketDecoder for HoprDecoder<Chain, S, T>
208where
209    Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
210    S: SurbStore + Send + Sync + 'static,
211    T: TicketTracker + Send + Sync,
212{
213    type Error = HoprProtocolError;
214
215    #[tracing::instrument(skip(self, sender, data), level = "trace", fields(%sender))]
216    async fn decode(
217        &self,
218        sender: PeerId,
219        data: Box<[u8]>,
220    ) -> Result<IncomingPacket, IncomingPacketError<Self::Error>> {
221        #[cfg(feature = "trace-timing")]
222        let decode_start = std::time::Instant::now();
223        tracing::trace!(data_len = data.len(), "decoding packet");
224
225        // Phase 1: Peer ID conversion
226        // Try to retrieve the peer's public key from the cache or compute it if it does not exist yet.
227        // The async block ensures the Rayon task is only submitted on cache miss.
228        let previous_hop = trace_timed!("peer_id_conversion complete", {
229            match self
230                .peer_id_cache
231                .try_get_with_by_ref(&sender, async {
232                    hopr_parallelize::cpu::spawn_fifo_blocking(
233                        move || OffchainPublicKey::from_peerid(&sender),
234                        "peerid_lookup",
235                    )
236                    .await
237                    .map_err(HoprProtocolError::from)?
238                    .map_err(HoprProtocolError::from)
239                })
240                .await
241            {
242                Ok(peer) => peer,
243                Err(error) => {
244                    return match error.as_ref() {
245                        HoprProtocolError::SpawnError(spawn_err) => {
246                            tracing::warn!(%sender, %error, "dropping packet due to local CPU overload (not sender's fault)");
247                            Err(IncomingPacketError::Overloaded(HoprProtocolError::SpawnError(
248                                *spawn_err,
249                            )))
250                        }
251                        _ => {
252                            tracing::error!(%sender, %error, "dropping packet - cannot convert peer id");
253                            Err(IncomingPacketError::Undecodable(HoprProtocolError::InvalidSender))
254                        }
255                    };
256                }
257            }
258        });
259
260        // Phase 2: Sphinx packet decoding
261        let offchain_keypair = self.packet_key.clone();
262        let surb_store = self.surb_store.clone();
263        let mapper = self.chain_api.key_id_mapper_ref().clone();
264
265        // If the following operation fails, it means that the packet is not a valid Hopr packet,
266        // and as such should not be acknowledged later.
267        let packet = trace_timed!("sphinx_decode complete", {
268            hopr_parallelize::cpu::spawn_fifo_blocking(
269                move || {
270                    HoprPacket::from_incoming(&data, &offchain_keypair, previous_hop, &mapper, |p| {
271                        surb_store.find_reply_opener(p)
272                    })
273                },
274                "packet_decode",
275            )
276            .await
277            .map_err(|e| IncomingPacketError::Overloaded(e.into()))?
278            .map_err(|e| IncomingPacketError::Undecodable(e.into()))?
279        });
280
281        let packet_type = match &packet {
282            HoprPacket::Final(_) => "final",
283            HoprPacket::Forwarded(_) => "forwarded",
284            HoprPacket::Outgoing(_) => "outgoing",
285        };
286
287        // This is checked on both Final and Forwarded packets,
288        // Outgoing packets are not allowed to pass and are later reported as invalid state.
289        if let Some(tag) = packet.packet_tag() {
290            // This operation has run-time of ~10 nanoseconds,
291            // and therefore does not need to be invoked via spawn_blocking
292            if self.tbf.lock().check_and_set(tag) {
293                return Err(IncomingPacketError::ProcessingError(
294                    previous_hop,
295                    HoprProtocolError::Replay,
296                ));
297            }
298        }
299
300        match packet {
301            HoprPacket::Final(incoming) => {
302                // Extract additional information from the packet that will be passed upwards
303                let info = AuxiliaryPacketInfo {
304                    packet_signals: incoming.signals,
305                    num_surbs: incoming.surbs.len(),
306                };
307
308                // Store all incoming SURBs if any
309                if !incoming.surbs.is_empty() {
310                    self.surb_store.insert_surbs(incoming.sender, incoming.surbs).await;
311                    tracing::trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, packet_type, "stored incoming surbs for pseudonym");
312                }
313
314                let result = match incoming.ack_key {
315                    None => {
316                        if incoming.plain_text.len() < size_of::<u16>() {
317                            return Err(IncomingPacketError::Undecodable(
318                                GeneralError::ParseError("invalid acknowledgement packet size".into()).into(),
319                            ));
320                        }
321
322                        let num_acks =
323                            u16::from_be_bytes(incoming.plain_text[..size_of::<u16>()].try_into().map_err(|_| {
324                                IncomingPacketError::Undecodable(
325                                    GeneralError::ParseError("invalid num acks".into()).into(),
326                                )
327                            })?);
328
329                        if incoming.plain_text.len() < size_of::<u16>() + (num_acks as usize) * Acknowledgement::SIZE {
330                            return Err(IncomingPacketError::Undecodable(
331                                GeneralError::ParseError("invalid number of acknowledgements in packet".into()).into(),
332                            ));
333                        }
334                        tracing::trace!(num_acks, packet_type, "received acknowledgement packet");
335
336                        // The contained payload represents an Acknowledgement
337                        IncomingPacket::Acknowledgement(
338                            IncomingAcknowledgementPacket {
339                                packet_tag: incoming.packet_tag,
340                                previous_hop: incoming.previous_hop,
341                                received_acks: incoming.plain_text
342                                    [size_of::<u16>()..size_of::<u16>() + num_acks as usize * Acknowledgement::SIZE]
343                                    .chunks_exact(Acknowledgement::SIZE)
344                                    .map(Acknowledgement::try_from)
345                                    .collect::<Result<Vec<_>, _>>()
346                                    .map_err(|e: GeneralError| IncomingPacketError::Undecodable(e.into()))?,
347                            }
348                            .into(),
349                        )
350                    }
351                    Some(ack_key) => IncomingPacket::Final(
352                        IncomingFinalPacket {
353                            packet_tag: incoming.packet_tag,
354                            previous_hop: incoming.previous_hop,
355                            sender: incoming.sender,
356                            plain_text: incoming.plain_text,
357                            ack_key,
358                            info,
359                        }
360                        .into(),
361                    ),
362                };
363                #[cfg(feature = "trace-timing")]
364                tracing::trace!(
365                    total_ms = decode_start.elapsed().as_millis() as u64,
366                    packet_type,
367                    "decode complete"
368                );
369                Ok(result)
370            }
371            HoprPacket::Forwarded(fwd) => {
372                // Phase 3: Ticket validation and replacement for forwarded packets
373                // Transform the ticket so it can be sent to the next hop
374                let (fwd, verified_unack_ticket) = trace_timed!("ticket_validation complete", {
375                    self.validate_and_replace_ticket(*fwd)
376                        .await
377                        .map_err(|error| match error {
378                            // Distinguish ticket validation errors so that they can get extra treatment later
379                            HoprProtocolError::TicketValidationError(e) => {
380                                IncomingPacketError::InvalidTicket(previous_hop, e)
381                            }
382                            e => IncomingPacketError::ProcessingError(previous_hop, e),
383                        })?
384                });
385
386                let mut payload = Vec::with_capacity(HoprPacket::SIZE);
387                payload.extend_from_slice(fwd.outgoing.packet.as_ref());
388                payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
389
390                #[cfg(feature = "trace-timing")]
391                tracing::trace!(
392                    total_ms = decode_start.elapsed().as_millis() as u64,
393                    packet_type,
394                    "decode complete"
395                );
396                Ok(IncomingPacket::Forwarded(
397                    IncomingForwardedPacket {
398                        packet_tag: fwd.packet_tag,
399                        previous_hop: fwd.previous_hop,
400                        next_hop: fwd.outgoing.next_hop,
401                        data: payload.into_boxed_slice(),
402                        ack_challenge: fwd.outgoing.ack_challenge,
403                        received_ticket: verified_unack_ticket,
404                        ack_key_prev_hop: fwd.ack_key,
405                    }
406                    .into(),
407                ))
408            }
409            HoprPacket::Outgoing(_) => {
410                #[cfg(feature = "trace-timing")]
411                tracing::trace!(
412                    total_ms = decode_start.elapsed().as_millis() as u64,
413                    packet_type,
414                    "decode complete"
415                );
416                Err(IncomingPacketError::ProcessingError(
417                    previous_hop,
418                    HoprProtocolError::InvalidState("cannot be outgoing packet"),
419                ))
420            }
421        }
422    }
423}