Skip to main content

hopr_protocol_hopr/codec/
encoder.rs

1use hopr_api::chain::*;
2use hopr_crypto_packet::prelude::*;
3use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::*;
6use hopr_primitive_types::prelude::*;
7use tracing::Instrument;
8
9use crate::{
10    HoprCodecConfig, OutgoingPacket, PacketEncoder, SurbStore, TicketCreationError, TicketTracker,
11    errors::HoprProtocolError,
12};
13
14/// Maximum number of acknowledgements that can be packed into a single HOPR packet.
15///
16/// Currently, the [`HoprPacket::PAYLOAD_SIZE`] minus 16-bit acknowledgement batch size counter
17/// divided by [`Acknowledgement::SIZE`].
18pub const MAX_ACKNOWLEDGEMENTS_BATCH_SIZE: usize =
19    (HoprPacket::PAYLOAD_SIZE - size_of::<u16>()) / Acknowledgement::SIZE;
20
21/// Default [encoder](PacketEncoder) implementation for HOPR packets.
22pub struct HoprEncoder<Chain, S, T> {
23    chain_api: Chain,
24    surb_store: S,
25    tracker: T,
26    chain_key: ChainKeypair,
27    channels_dst: Hash,
28    cfg: HoprCodecConfig,
29}
30
31impl<Chain, S, T> HoprEncoder<Chain, S, T> {
32    /// Creates a new instance of the encoder.
33    pub fn new(
34        chain_key: ChainKeypair,
35        chain_api: Chain,
36        surb_store: S,
37        tracker: T,
38        channels_dst: Hash,
39        cfg: HoprCodecConfig,
40    ) -> Self {
41        Self {
42            chain_api,
43            surb_store,
44            tracker,
45            chain_key,
46            channels_dst,
47            cfg,
48        }
49    }
50}
51
52impl<Chain, S, T> HoprEncoder<Chain, S, T>
53where
54    Chain: ChainKeyOperations + ChainReadChannelOperations + ChainValues + Sync,
55    S: SurbStore,
56    T: TicketTracker + Sync,
57{
58    async fn encode_packet_internal<D: AsRef<[u8]> + Send + 'static, Sig: Into<PacketSignals> + Send + 'static>(
59        &self,
60        next_peer: OffchainPublicKey,
61        data: D,
62        num_hops: usize,
63        signals: Sig,
64        routing: PacketRouting<ValidatedPath>,
65        pseudonym: HoprPseudonym,
66    ) -> Result<OutgoingPacket, HoprProtocolError> {
67        let next_peer = self
68            .chain_api
69            .packet_key_to_chain_key(&next_peer)
70            .await
71            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
72            .ok_or(HoprProtocolError::KeyNotFound)?;
73
74        // Decide whether to create a multi-hop or a zero-hop ticket
75        let next_ticket = if num_hops > 1 {
76            let channel = self
77                .chain_api
78                .channel_by_parties(self.chain_key.as_ref(), &next_peer)
79                .await
80                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
81                .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_peer))?;
82
83            let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
84                .chain_api
85                .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
86                .await
87                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
88
89            self.tracker
90                .create_multihop_ticket(
91                    &channel,
92                    num_hops as u8,
93                    outgoing_ticket_win_prob,
94                    outgoing_ticket_price,
95                )
96                .await
97                .map_err(|e| match e {
98                    TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
99                    e => HoprProtocolError::TicketTrackerError(e.into()),
100                })?
101        } else {
102            TicketBuilder::zero_hop().counterparty(next_peer)
103        };
104
105        // Construct the outgoing packet
106        let chain_key = self.chain_key.clone();
107        let mapper = self.chain_api.key_id_mapper_ref().clone();
108        let domain_separator = self.channels_dst;
109        let (packet, openers) = hopr_parallelize::cpu::spawn_fifo_blocking(
110            move || {
111                HoprPacket::into_outgoing(
112                    data.as_ref(),
113                    &pseudonym,
114                    routing,
115                    &chain_key,
116                    next_ticket,
117                    &mapper,
118                    &domain_separator,
119                    signals,
120                )
121            },
122            "packet_encode",
123        )
124        .await??;
125
126        // Store the reply openers under the given SenderId
127        // This is a no-op for reply packets
128        openers.into_iter().for_each(|(surb_id, opener)| {
129            self.surb_store
130                .insert_reply_opener(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener);
131        });
132
133        let out = packet.try_as_outgoing().ok_or(HoprProtocolError::InvalidState(
134            "cannot send out packet that is not outgoing",
135        ))?;
136
137        let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
138        transport_payload.extend_from_slice(out.packet.as_ref());
139        transport_payload.extend_from_slice(&out.ticket.into_encoded());
140
141        Ok(OutgoingPacket {
142            next_hop: out.next_hop,
143            ack_challenge: out.ack_challenge,
144            data: transport_payload.into_boxed_slice(),
145        })
146    }
147}
148
149#[async_trait::async_trait]
150impl<Chain, S, T> PacketEncoder for HoprEncoder<Chain, S, T>
151where
152    Chain: ChainKeyOperations + ChainReadChannelOperations + ChainValues + Send + Sync,
153    S: SurbStore + Send + Sync,
154    T: TicketTracker + Send + Sync,
155{
156    type Error = HoprProtocolError;
157
158    #[tracing::instrument(skip_all, level = "trace")]
159    async fn encode_packet<D: AsRef<[u8]> + Send + 'static, Sig: Into<PacketSignals> + Send + 'static>(
160        &self,
161        data: D,
162        routing: ResolvedTransportRouting,
163        signals: Sig,
164    ) -> Result<OutgoingPacket, Self::Error> {
165        // Get necessary packet routing values
166        let (next_peer, num_hops, pseudonym, routing) = match routing {
167            ResolvedTransportRouting::Forward {
168                pseudonym,
169                forward_path,
170                return_paths,
171            } => (
172                forward_path[0],
173                forward_path.num_hops(),
174                pseudonym,
175                PacketRouting::ForwardPath {
176                    forward_path,
177                    return_paths,
178                },
179            ),
180            ResolvedTransportRouting::Return(sender_id, surb) => {
181                let next = self
182                    .chain_api
183                    .key_id_mapper_ref()
184                    .map_id_to_public(&surb.first_relayer)
185                    .ok_or(HoprProtocolError::KeyNotFound)?;
186
187                (
188                    next,
189                    surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
190                    sender_id.pseudonym(),
191                    PacketRouting::Surb(sender_id.surb_id(), surb),
192                )
193            }
194        };
195
196        tracing::trace!(len = data.as_ref().len(), "encoding packet");
197        self.encode_packet_internal(next_peer, data, num_hops, signals, routing, pseudonym)
198            .in_current_span()
199            .await
200    }
201
202    #[tracing::instrument(skip_all, level = "trace", fields(destination = destination.to_peerid_str()))]
203    async fn encode_acknowledgements(
204        &self,
205        acks: &[VerifiedAcknowledgement],
206        destination: &OffchainPublicKey,
207    ) -> Result<OutgoingPacket, Self::Error> {
208        tracing::trace!(num_acks = acks.len(), "encoding acknowledgements");
209
210        let mut all_acks = Vec::<u8>::with_capacity(size_of::<u16>() + acks.len() * Acknowledgement::SIZE);
211        all_acks.extend((acks.len() as u16).to_be_bytes());
212        acks.iter().for_each(|ack| all_acks.extend(ack.leak().as_ref()));
213
214        self.encode_packet_internal(
215            *destination,
216            all_acks,
217            0,
218            None,
219            PacketRouting::NoAck(*destination),
220            HoprPseudonym::random(),
221        )
222        .in_current_span()
223        .await
224    }
225}