hopr_protocol_hopr/codec/
encoder.rs1use hopr_api::chain::*;
2use hopr_crypto_packet::prelude::*;
3use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::*;
6use hopr_primitive_types::prelude::*;
7use tracing::Instrument;
8
9use crate::{
10 HoprCodecConfig, OutgoingPacket, PacketEncoder, SurbStore, TicketCreationError, TicketTracker,
11 errors::HoprProtocolError,
12};
13
14pub const MAX_ACKNOWLEDGEMENTS_BATCH_SIZE: usize =
19 (HoprPacket::PAYLOAD_SIZE - size_of::<u16>()) / Acknowledgement::SIZE;
20
21pub struct HoprEncoder<Chain, S, T> {
23 chain_api: Chain,
24 surb_store: S,
25 tracker: T,
26 chain_key: ChainKeypair,
27 channels_dst: Hash,
28 cfg: HoprCodecConfig,
29}
30
31impl<Chain, S, T> HoprEncoder<Chain, S, T> {
32 pub fn new(
34 chain_key: ChainKeypair,
35 chain_api: Chain,
36 surb_store: S,
37 tracker: T,
38 channels_dst: Hash,
39 cfg: HoprCodecConfig,
40 ) -> Self {
41 Self {
42 chain_api,
43 surb_store,
44 tracker,
45 chain_key,
46 channels_dst,
47 cfg,
48 }
49 }
50}
51
52impl<Chain, S, T> HoprEncoder<Chain, S, T>
53where
54 Chain: ChainKeyOperations + ChainReadChannelOperations + ChainValues + Sync,
55 S: SurbStore,
56 T: TicketTracker + Sync,
57{
58 async fn encode_packet_internal<D: AsRef<[u8]> + Send + 'static, Sig: Into<PacketSignals> + Send + 'static>(
59 &self,
60 next_peer: OffchainPublicKey,
61 data: D,
62 num_hops: usize,
63 signals: Sig,
64 routing: PacketRouting<ValidatedPath>,
65 pseudonym: HoprPseudonym,
66 ) -> Result<OutgoingPacket, HoprProtocolError> {
67 let next_peer = self
68 .chain_api
69 .packet_key_to_chain_key(&next_peer)
70 .await
71 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
72 .ok_or(HoprProtocolError::KeyNotFound)?;
73
74 let next_ticket = if num_hops > 1 {
76 let channel = self
77 .chain_api
78 .channel_by_parties(self.chain_key.as_ref(), &next_peer)
79 .await
80 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
81 .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_peer))?;
82
83 let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
84 .chain_api
85 .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
86 .await
87 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
88
89 self.tracker
90 .create_multihop_ticket(
91 &channel,
92 num_hops as u8,
93 outgoing_ticket_win_prob,
94 outgoing_ticket_price,
95 )
96 .await
97 .map_err(|e| match e {
98 TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
99 e => HoprProtocolError::TicketTrackerError(e.into()),
100 })?
101 } else {
102 TicketBuilder::zero_hop().counterparty(next_peer)
103 };
104
105 let chain_key = self.chain_key.clone();
107 let mapper = self.chain_api.key_id_mapper_ref().clone();
108 let domain_separator = self.channels_dst;
109 let (packet, openers) = hopr_parallelize::cpu::spawn_fifo_blocking(
110 move || {
111 HoprPacket::into_outgoing(
112 data.as_ref(),
113 &pseudonym,
114 routing,
115 &chain_key,
116 next_ticket,
117 &mapper,
118 &domain_separator,
119 signals,
120 )
121 },
122 "packet_encode",
123 )
124 .await??;
125
126 openers.into_iter().for_each(|(surb_id, opener)| {
129 self.surb_store
130 .insert_reply_opener(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener);
131 });
132
133 let out = packet.try_as_outgoing().ok_or(HoprProtocolError::InvalidState(
134 "cannot send out packet that is not outgoing",
135 ))?;
136
137 let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
138 transport_payload.extend_from_slice(out.packet.as_ref());
139 transport_payload.extend_from_slice(&out.ticket.into_encoded());
140
141 Ok(OutgoingPacket {
142 next_hop: out.next_hop,
143 ack_challenge: out.ack_challenge,
144 data: transport_payload.into_boxed_slice(),
145 })
146 }
147}
148
149#[async_trait::async_trait]
150impl<Chain, S, T> PacketEncoder for HoprEncoder<Chain, S, T>
151where
152 Chain: ChainKeyOperations + ChainReadChannelOperations + ChainValues + Send + Sync,
153 S: SurbStore + Send + Sync,
154 T: TicketTracker + Send + Sync,
155{
156 type Error = HoprProtocolError;
157
158 #[tracing::instrument(skip_all, level = "trace")]
159 async fn encode_packet<D: AsRef<[u8]> + Send + 'static, Sig: Into<PacketSignals> + Send + 'static>(
160 &self,
161 data: D,
162 routing: ResolvedTransportRouting,
163 signals: Sig,
164 ) -> Result<OutgoingPacket, Self::Error> {
165 let (next_peer, num_hops, pseudonym, routing) = match routing {
167 ResolvedTransportRouting::Forward {
168 pseudonym,
169 forward_path,
170 return_paths,
171 } => (
172 forward_path[0],
173 forward_path.num_hops(),
174 pseudonym,
175 PacketRouting::ForwardPath {
176 forward_path,
177 return_paths,
178 },
179 ),
180 ResolvedTransportRouting::Return(sender_id, surb) => {
181 let next = self
182 .chain_api
183 .key_id_mapper_ref()
184 .map_id_to_public(&surb.first_relayer)
185 .ok_or(HoprProtocolError::KeyNotFound)?;
186
187 (
188 next,
189 surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
190 sender_id.pseudonym(),
191 PacketRouting::Surb(sender_id.surb_id(), surb),
192 )
193 }
194 };
195
196 tracing::trace!(len = data.as_ref().len(), "encoding packet");
197 self.encode_packet_internal(next_peer, data, num_hops, signals, routing, pseudonym)
198 .in_current_span()
199 .await
200 }
201
202 #[tracing::instrument(skip_all, level = "trace", fields(destination = destination.to_peerid_str()))]
203 async fn encode_acknowledgements(
204 &self,
205 acks: &[VerifiedAcknowledgement],
206 destination: &OffchainPublicKey,
207 ) -> Result<OutgoingPacket, Self::Error> {
208 tracing::trace!(num_acks = acks.len(), "encoding acknowledgements");
209
210 let mut all_acks = Vec::<u8>::with_capacity(size_of::<u16>() + acks.len() * Acknowledgement::SIZE);
211 all_acks.extend((acks.len() as u16).to_be_bytes());
212 acks.iter().for_each(|ack| all_acks.extend(ack.leak().as_ref()));
213
214 self.encode_packet_internal(
215 *destination,
216 all_acks,
217 0,
218 None,
219 PacketRouting::NoAck(*destination),
220 HoprPseudonym::random(),
221 )
222 .in_current_span()
223 .await
224 }
225}