hopr_protocol_hopr/codec/
decoder.rs

1use std::{
2    ops::{Mul, Sub},
3    time::Duration,
4};
5
6use hopr_api::chain::*;
7use hopr_crypto_packet::prelude::*;
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::prelude::*;
10use hopr_primitive_types::prelude::*;
11
12use crate::{
13    AuxiliaryPacketInfo, HoprCodecConfig, IncomingAcknowledgementPacket, IncomingFinalPacket, IncomingForwardedPacket,
14    IncomingPacket, IncomingPacketError, PacketDecoder, SurbStore, TicketCreationError, TicketTracker,
15    errors::HoprProtocolError, tbf::TagBloomFilter,
16};
17
18/// Default [decoder](PacketDecoder) implementation for HOPR packets.
19pub struct HoprDecoder<Chain, S, T> {
20    chain_api: Chain,
21    surb_store: std::sync::Arc<S>,
22    tracker: T,
23    packet_key: OffchainKeypair,
24    chain_key: ChainKeypair,
25    channels_dst: Hash,
26    cfg: HoprCodecConfig,
27    tbf: parking_lot::Mutex<TagBloomFilter>,
28    peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey>,
29}
30
31impl<Chain, S, T> HoprDecoder<Chain, S, T>
32where
33    Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
34    S: SurbStore + Send + Sync,
35    T: TicketTracker + Send + Sync,
36{
37    /// Creates a new instance of the decoder.
38    pub fn new(
39        (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
40        chain_api: Chain,
41        surb_store: S,
42        tracker: T,
43        channels_dst: Hash,
44        cfg: HoprCodecConfig,
45    ) -> Self {
46        Self {
47            chain_api,
48            surb_store: std::sync::Arc::new(surb_store),
49            packet_key,
50            chain_key,
51            channels_dst,
52            cfg,
53            tracker,
54            tbf: parking_lot::Mutex::new(TagBloomFilter::default()),
55            peer_id_cache: moka::future::Cache::builder()
56                .time_to_idle(Duration::from_secs(600))
57                .max_capacity(100_000)
58                .build(),
59        }
60    }
61
62    async fn validate_and_replace_ticket(
63        &self,
64        mut fwd: HoprForwardedPacket,
65    ) -> Result<(HoprForwardedPacket, UnacknowledgedTicket), HoprProtocolError> {
66        let previous_hop_addr = self
67            .chain_api
68            .packet_key_to_chain_key(&fwd.previous_hop)
69            .await
70            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
71            .ok_or(HoprProtocolError::KeyNotFound)?;
72
73        let next_hop_addr = self
74            .chain_api
75            .packet_key_to_chain_key(&fwd.outgoing.next_hop)
76            .await
77            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
78            .ok_or(HoprProtocolError::KeyNotFound)?;
79
80        let incoming_channel = self
81            .chain_api
82            .channel_by_parties(&previous_hop_addr, self.chain_key.as_ref())
83            .await
84            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
85            .ok_or_else(|| HoprProtocolError::ChannelNotFound(previous_hop_addr, *self.chain_key.as_ref()))?;
86
87        // The ticket price from the oracle times my node's position on the
88        // path is the acceptable minimum
89        let minimum_ticket_price = self
90            .chain_api
91            .minimum_ticket_price()
92            .await
93            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
94            .mul(U256::from(fwd.path_pos));
95
96        let remaining_balance = incoming_channel.balance.sub(
97            self.tracker
98                .incoming_channel_unrealized_balance(incoming_channel.get_id(), incoming_channel.channel_epoch)
99                .await
100                .map_err(|e| HoprProtocolError::TicketTrackerError(e.into()))?,
101        );
102
103        // Here also the signature on the ticket gets validated,
104        // so afterward we are sure the source of the `channel`
105        // (which is equal to `previous_hop_addr`) has issued this
106        // ticket.
107        let win_prob = self
108            .chain_api
109            .minimum_incoming_ticket_win_prob()
110            .await
111            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
112        let domain_separator = self.channels_dst;
113
114        let verified_incoming_ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
115            validate_unacknowledged_ticket(
116                fwd.outgoing.ticket,
117                &incoming_channel,
118                minimum_ticket_price,
119                win_prob,
120                remaining_balance,
121                &domain_separator,
122            )
123        })
124        .await?;
125
126        // The ticket is now validated:
127        tracing::trace!(%verified_incoming_ticket, "successfully verified incoming ticket");
128
129        // NOTE: that the path position according to the ticket value
130        // may no longer match the path position from the packet header,
131        // because the ticket issuer may set the price of the ticket higher.
132
133        // Create the new ticket for the new packet
134        let ticket_builder = if fwd.path_pos > 1 {
135            // There must be a channel to the next node if it's not the final hop.
136            // If the channel does not exist, the ticket we extracted before cannot be saved,
137            // as there would be no way to acknowledge it without the channel.
138            let outgoing_channel = self
139                .chain_api
140                .channel_by_parties(self.chain_key.as_ref(), &next_hop_addr)
141                .await
142                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
143                .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_hop_addr))?;
144
145            let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
146                .chain_api
147                .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
148                .await
149                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
150
151            // We currently take the maximum of the win prob from the incoming ticket
152            // and the one configured on this node.
153            // Therefore, the winning probability can only increase along the path.
154            let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
155
156            // The following operation fails if there's not enough balance on the channel to the next hop.
157            // Again, in this case, we cannot save the ticket we previously extracted because there is no way it gets
158            // acknowledged without enough balance.
159            self.tracker
160                .create_multihop_ticket(
161                    &outgoing_channel,
162                    fwd.path_pos,
163                    outgoing_ticket_win_prob,
164                    outgoing_ticket_price,
165                )
166                .await
167                .map_err(|e| match e {
168                    TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
169                    e => HoprProtocolError::TicketTrackerError(e.into()),
170                })?
171        } else {
172            TicketBuilder::zero_hop().counterparty(next_hop_addr)
173        };
174
175        // Finally, replace the ticket in the outgoing packet with a new one
176        let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
177        let me_on_chain = self.chain_key.clone();
178        fwd.outgoing.ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
179            ticket_builder.build_signed(&me_on_chain, &domain_separator)
180        })
181        .await?
182        .leak();
183
184        let unack_ticket = verified_incoming_ticket.into_unacknowledged(fwd.own_key);
185        Ok((fwd, unack_ticket))
186    }
187}
188
189#[async_trait::async_trait]
190impl<Chain, S, T> PacketDecoder for HoprDecoder<Chain, S, T>
191where
192    Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
193    S: SurbStore + Send + Sync + 'static,
194    T: TicketTracker + Send + Sync,
195{
196    type Error = HoprProtocolError;
197
198    #[tracing::instrument(skip(self, sender, data), level = "trace", fields(%sender))]
199    async fn decode(
200        &self,
201        sender: PeerId,
202        data: Box<[u8]>,
203    ) -> Result<IncomingPacket, IncomingPacketError<Self::Error>> {
204        tracing::trace!(data_len = data.len(), "decoding packet");
205
206        // Try to retrieve the peer's public key from the cache or compute it if it does not exist yet
207        let previous_hop = match self
208            .peer_id_cache
209            .try_get_with_by_ref(
210                &sender,
211                hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&sender)),
212            )
213            .await
214        {
215            Ok(peer) => peer,
216            Err(error) => {
217                // There absolutely nothing we can do when the peer id is unparseable (e.g., non-ed25519 based)
218                tracing::error!(%sender, %error, "dropping packet - cannot convert peer id");
219                return Err(IncomingPacketError::Undecodable(HoprProtocolError::InvalidSender));
220            }
221        };
222
223        let offchain_keypair = self.packet_key.clone();
224        let surb_store = self.surb_store.clone();
225        let mapper = self.chain_api.key_id_mapper_ref().clone();
226
227        // If the following operation fails, it means that the packet is not a valid Hopr packet,
228        // and as such should not be acknowledged later.
229        let packet = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
230            HoprPacket::from_incoming(&data, &offchain_keypair, previous_hop, &mapper, |p| {
231                surb_store.find_reply_opener(p)
232            })
233        })
234        .await
235        .map_err(|e| IncomingPacketError::Undecodable(e.into()))?;
236
237        // This is checked on both Final and Forwarded packets,
238        // Outgoing packets are not allowed to pass and are later reported as invalid state.
239        if let Some(tag) = packet.packet_tag() {
240            // This operation has run-time of ~10 nanoseconds,
241            // and therefore does not need to be invoked via spawn_blocking
242            if self.tbf.lock().check_and_set(tag) {
243                return Err(IncomingPacketError::ProcessingError(
244                    previous_hop,
245                    HoprProtocolError::Replay,
246                ));
247            }
248        }
249
250        match packet {
251            HoprPacket::Final(incoming) => {
252                // Extract additional information from the packet that will be passed upwards
253                let info = AuxiliaryPacketInfo {
254                    packet_signals: incoming.signals,
255                    num_surbs: incoming.surbs.len(),
256                };
257
258                // Store all incoming SURBs if any
259                if !incoming.surbs.is_empty() {
260                    self.surb_store.insert_surbs(incoming.sender, incoming.surbs).await;
261                    tracing::trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
262                }
263
264                Ok(match incoming.ack_key {
265                    None => {
266                        if incoming.plain_text.len() < size_of::<u16>() {
267                            return Err(IncomingPacketError::Undecodable(
268                                GeneralError::ParseError("invalid acknowledgement packet size".into()).into(),
269                            ));
270                        }
271
272                        let num_acks =
273                            u16::from_be_bytes(incoming.plain_text[..size_of::<u16>()].try_into().map_err(|_| {
274                                IncomingPacketError::Undecodable(
275                                    GeneralError::ParseError("invalid num acks".into()).into(),
276                                )
277                            })?);
278
279                        if incoming.plain_text.len() < size_of::<u16>() + (num_acks as usize) * Acknowledgement::SIZE {
280                            return Err(IncomingPacketError::Undecodable(
281                                GeneralError::ParseError("invalid number of acknowledgements in packet".into()).into(),
282                            ));
283                        }
284                        tracing::trace!(num_acks, "received acknowledgement packet");
285
286                        // The contained payload represents an Acknowledgement
287                        IncomingPacket::Acknowledgement(
288                            IncomingAcknowledgementPacket {
289                                packet_tag: incoming.packet_tag,
290                                previous_hop: incoming.previous_hop,
291                                received_acks: incoming.plain_text
292                                    [size_of::<u16>()..size_of::<u16>() + num_acks as usize * Acknowledgement::SIZE]
293                                    .chunks_exact(Acknowledgement::SIZE)
294                                    .map(Acknowledgement::try_from)
295                                    .collect::<Result<Vec<_>, _>>()
296                                    .map_err(|e: GeneralError| IncomingPacketError::Undecodable(e.into()))?,
297                            }
298                            .into(),
299                        )
300                    }
301                    Some(ack_key) => IncomingPacket::Final(
302                        IncomingFinalPacket {
303                            packet_tag: incoming.packet_tag,
304                            previous_hop: incoming.previous_hop,
305                            sender: incoming.sender,
306                            plain_text: incoming.plain_text,
307                            ack_key,
308                            info,
309                        }
310                        .into(),
311                    ),
312                })
313            }
314            HoprPacket::Forwarded(fwd) => {
315                // Transform the ticket so it can be sent to the next hop
316                let (fwd, verified_unack_ticket) =
317                    self.validate_and_replace_ticket(*fwd)
318                        .await
319                        .map_err(|error| match error {
320                            // Distinguish ticket validation errors so that they can get extra treatment later
321                            HoprProtocolError::TicketValidationError(e) => {
322                                IncomingPacketError::InvalidTicket(previous_hop, e)
323                            }
324                            e => IncomingPacketError::ProcessingError(previous_hop, e),
325                        })?;
326
327                let mut payload = Vec::with_capacity(HoprPacket::SIZE);
328                payload.extend_from_slice(fwd.outgoing.packet.as_ref());
329                payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
330
331                Ok(IncomingPacket::Forwarded(
332                    IncomingForwardedPacket {
333                        packet_tag: fwd.packet_tag,
334                        previous_hop: fwd.previous_hop,
335                        next_hop: fwd.outgoing.next_hop,
336                        data: payload.into_boxed_slice(),
337                        ack_challenge: fwd.outgoing.ack_challenge,
338                        received_ticket: verified_unack_ticket,
339                        ack_key_prev_hop: fwd.ack_key,
340                    }
341                    .into(),
342                ))
343            }
344            HoprPacket::Outgoing(_) => Err(IncomingPacketError::ProcessingError(
345                previous_hop,
346                HoprProtocolError::InvalidState("cannot be outgoing packet"),
347            )),
348        }
349    }
350}