hopr_db_sql/
protocol.rs

1use async_trait::async_trait;
2use hopr_crypto_packet::chain::ChainPacketComponents;
3use hopr_crypto_packet::validation::validate_unacknowledged_ticket;
4use hopr_crypto_types::prelude::*;
5use hopr_db_api::errors::Result;
6use hopr_db_api::protocol::{
7    AckResult, HoprDbProtocolOperations, ResolvedAcknowledgement, TransportPacketWithChainData,
8};
9use hopr_db_api::resolver::HoprDbResolverOperations;
10use hopr_internal_types::prelude::*;
11use hopr_primitive_types::prelude::*;
12use std::ops::{Mul, Sub};
13use tracing::{instrument, trace, warn};
14
15use crate::channels::HoprDbChannelOperations;
16use crate::db::HoprDb;
17use crate::errors::DbSqlError;
18use crate::info::HoprDbInfoOperations;
19use crate::prelude::HoprDbTicketOperations;
20use crate::{HoprDbGeneralModelOperations, OptTx};
21
22use hopr_parallelize::cpu::spawn_fifo_blocking;
23
24#[cfg(all(feature = "prometheus", not(test)))]
25lazy_static::lazy_static! {
26    static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram =
27        hopr_metrics::SimpleHistogram::new(
28            "hopr_tickets_incoming_win_probability",
29            "Observes the winning probabilities on incoming tickets",
30            vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
31        ).unwrap();
32}
33
34#[async_trait]
35impl HoprDbProtocolOperations for HoprDb {
36    #[instrument(level = "trace", skip(self, ack, me), ret)]
37    async fn handle_acknowledgement(&self, ack: Acknowledgement, me: &ChainKeypair) -> Result<AckResult> {
38        let myself = self.clone();
39        let me_ckp = me.clone();
40
41        let result = self
42            .begin_transaction()
43            .await?
44            .perform(|tx| {
45                Box::pin(async move {
46                    match myself
47                        .caches
48                        .unacked_tickets
49                        .remove(&ack.ack_challenge())
50                        .await
51                        .ok_or_else(|| {
52                            DbSqlError::AcknowledgementValidationError(format!(
53                                "received unexpected acknowledgement for half key challenge {} - half key {}",
54                                ack.ack_challenge().to_hex(),
55                                ack.ack_key_share.to_hex()
56                            ))
57                        })? {
58                        PendingAcknowledgement::WaitingAsSender => {
59                            trace!("received acknowledgement as sender: first relayer has processed the packet");
60
61                            Ok(ResolvedAcknowledgement::Sending(ack.ack_challenge()))
62                        }
63
64                        PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
65                            if myself
66                                .get_channel_by_parties(
67                                    Some(tx),
68                                    unacknowledged.ticket.verified_issuer(),
69                                    &myself.me_onchain,
70                                    true,
71                                )
72                                .await?
73                                .is_some_and(|c| {
74                                    c.channel_epoch.as_u32() != unacknowledged.verified_ticket().channel_epoch
75                                })
76                            {
77                                return Err(DbSqlError::LogicalError(format!(
78                                    "no channel found for  address '{}'",
79                                    unacknowledged.ticket.verified_issuer()
80                                )));
81                            }
82
83                            let domain_separator = myself
84                                .get_indexer_data(Some(tx))
85                                .await?
86                                .channels_dst
87                                .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
88
89                            hopr_parallelize::cpu::spawn_blocking(move || {
90                                // This explicitly checks whether the acknowledgement
91                                // solves the challenge on the ticket. It must be done before we
92                                // check that the ticket is winning, which is a lengthy operation
93                                // and should not be done for bogus unacknowledged tickets
94                                let ack_ticket = unacknowledged.acknowledge(&ack.ack_key_share)?;
95
96                                if ack_ticket.is_winning(&me_ckp, &domain_separator) {
97                                    trace!("Found a winning ticket");
98                                    Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
99                                } else {
100                                    trace!("Found a losing ticket");
101                                    Ok(ResolvedAcknowledgement::RelayingLoss(
102                                        ack_ticket.ticket.verified_ticket().channel_id,
103                                    ))
104                                }
105                            })
106                            .await
107                        }
108                    }
109                })
110            })
111            .await?;
112
113        match &result {
114            ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
115                self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
116
117                #[cfg(all(feature = "prometheus", not(test)))]
118                {
119                    let verified_ticket = ack_ticket.ticket.verified_ticket();
120                    let channel = verified_ticket.channel_id.to_string();
121                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
122                        &[&channel, "unredeemed"],
123                        self.ticket_manager
124                            .unrealized_value(hopr_db_api::tickets::TicketSelector::new(
125                                verified_ticket.channel_id,
126                                verified_ticket.channel_epoch,
127                            ))
128                            .await?
129                            .amount()
130                            .as_u128() as f64,
131                    );
132                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
133                        .increment(&[&channel, "winning_count"], 1.0f64);
134                }
135            }
136            ResolvedAcknowledgement::RelayingLoss(_channel) => {
137                #[cfg(all(feature = "prometheus", not(test)))]
138                {
139                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
140                        .increment(&[&_channel.to_string(), "losing_count"], 1.0f64);
141                }
142            }
143            _ => {}
144        };
145
146        Ok(result.into())
147    }
148
149    async fn get_network_winning_probability(&self) -> Result<f64> {
150        Ok(self
151            .get_indexer_data(None)
152            .await
153            .map(|data| data.minimum_incoming_ticket_winning_prob)?)
154    }
155
156    async fn get_network_ticket_price(&self) -> Result<Balance> {
157        Ok(self.get_indexer_data(None).await.and_then(|data| {
158            data.ticket_price
159                .ok_or(DbSqlError::LogicalError("missing ticket price".into()))
160        })?)
161    }
162
163    #[tracing::instrument(level = "trace", skip(self, data, me, path))]
164    async fn to_send(
165        &self,
166        data: Box<[u8]>,
167        me: ChainKeypair,
168        path: Vec<OffchainPublicKey>,
169        outgoing_ticket_win_prob: f64,
170        outgoing_ticket_price: Balance,
171    ) -> Result<TransportPacketWithChainData> {
172        let myself = self.clone();
173        let next_peer = myself.resolve_chain_key(&path[0]).await?.ok_or_else(|| {
174            DbSqlError::LogicalError(format!(
175                "failed to find chain key for packet key {} on previous hop",
176                path[0].to_peerid_str()
177            ))
178        })?;
179
180        let components =
181            self.begin_transaction()
182                .await?
183                .perform(|tx| {
184                    Box::pin(async move {
185                        let domain_separator =
186                            myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
187                                DbSqlError::LogicalError("failed to fetch the domain separator".into())
188                            })?;
189
190                        // Decide whether to create a 0-hop or multihop ticket
191                        let next_ticket = if path.len() == 1 {
192                            TicketBuilder::zero_hop().direction(&myself.me_onchain, &next_peer)
193                        } else {
194                            myself
195                                .create_multihop_ticket(
196                                    Some(tx),
197                                    me.public().to_address(),
198                                    next_peer,
199                                    path.len() as u8,
200                                    outgoing_ticket_win_prob,
201                                    outgoing_ticket_price,
202                                )
203                                .await?
204                        };
205
206                        spawn_fifo_blocking(move || {
207                            ChainPacketComponents::into_outgoing(&data, &path, &me, next_ticket, &domain_separator)
208                                .map_err(|e| {
209                                    DbSqlError::LogicalError(format!(
210                                        "failed to construct chain components for a packet: {e}"
211                                    ))
212                                })
213                        })
214                        .await
215                    })
216                })
217                .await?;
218
219        match components {
220            ChainPacketComponents::Final { .. } | ChainPacketComponents::Forwarded { .. } => {
221                Err(DbSqlError::LogicalError("Must contain an outgoing packet type".into()).into())
222            }
223            ChainPacketComponents::Outgoing {
224                packet,
225                ticket,
226                next_hop,
227                ack_challenge,
228            } => {
229                self.caches
230                    .unacked_tickets
231                    .insert(ack_challenge, PendingAcknowledgement::WaitingAsSender)
232                    .await;
233
234                let payload = spawn_fifo_blocking(move || {
235                    let mut payload = Vec::with_capacity(ChainPacketComponents::SIZE);
236                    payload.extend_from_slice(packet.as_ref());
237
238                    let ticket_bytes: [u8; Ticket::SIZE] = ticket.into();
239                    payload.extend_from_slice(ticket_bytes.as_ref());
240
241                    payload.into_boxed_slice()
242                })
243                .await;
244
245                Ok(TransportPacketWithChainData::Outgoing {
246                    next_hop,
247                    ack_challenge,
248                    data: payload,
249                })
250            }
251        }
252    }
253
254    #[tracing::instrument(level = "trace", skip(self, data, me, pkt_keypair, sender), fields(sender = %sender))]
255    async fn from_recv(
256        &self,
257        data: Box<[u8]>,
258        me: ChainKeypair,
259        pkt_keypair: &OffchainKeypair,
260        sender: OffchainPublicKey,
261        outgoing_ticket_win_prob: f64,
262        outgoing_ticket_price: Balance,
263    ) -> Result<TransportPacketWithChainData> {
264        let offchain_keypair = pkt_keypair.clone();
265
266        let packet = spawn_fifo_blocking(move || {
267            ChainPacketComponents::from_incoming(&data, &offchain_keypair, sender)
268                .map_err(|e| DbSqlError::LogicalError(format!("failed to construct an incoming packet: {e}")))
269        })
270        .await?;
271
272        match packet {
273            ChainPacketComponents::Final {
274                packet_tag,
275                ack_key,
276                previous_hop,
277                plain_text,
278                ..
279            } => {
280                let offchain_keypair = pkt_keypair.clone();
281                let ack = spawn_fifo_blocking(move || Acknowledgement::new(ack_key, &offchain_keypair)).await;
282
283                Ok(TransportPacketWithChainData::Final {
284                    packet_tag,
285                    previous_hop,
286                    plain_text,
287                    ack,
288                })
289            }
290            ChainPacketComponents::Forwarded {
291                packet,
292                ticket,
293                ack_challenge,
294                packet_tag,
295                ack_key,
296                previous_hop,
297                own_key,
298                next_hop,
299                next_challenge,
300                path_pos,
301            } => {
302                let myself = self.clone();
303                let previous_hop_addr = myself.resolve_chain_key(&previous_hop).await?.ok_or_else(|| {
304                    DbSqlError::LogicalError(format!(
305                        "failed to find channel key for packet key {} on previous hop",
306                        previous_hop.to_peerid_str()
307                    ))
308                })?;
309
310                let next_hop_addr = myself.resolve_chain_key(&next_hop).await?.ok_or_else(|| {
311                    DbSqlError::LogicalError(format!(
312                        "failed to find channel key for packet key {} on next hop",
313                        next_hop.to_peerid_str()
314                    ))
315                })?;
316
317                let verified_ticket = match self
318                    .begin_transaction()
319                    .await?
320                    .perform(|tx| {
321                        Box::pin(async move {
322                            let chain_data = myself.get_indexer_data(Some(tx)).await?;
323
324                            let channel = myself
325                                .get_channel_by_parties(Some(tx), &previous_hop_addr, &myself.me_onchain, true)
326                                .await?
327                                .ok_or_else(|| {
328                                    DbSqlError::LogicalError(format!(
329                                        "no channel found for previous hop address '{previous_hop_addr}'"
330                                    ))
331                                })?;
332
333                            let remaining_balance = channel
334                                .balance
335                                .sub(myself.ticket_manager.unrealized_value((&channel).into()).await?);
336
337                            let domain_separator = chain_data.channels_dst.ok_or_else(|| {
338                                DbSqlError::LogicalError("failed to fetch the domain separator".into())
339                            })?;
340
341                            // The ticket price from the oracle times my node's position on the
342                            // path is the acceptable minimum
343                            let minimum_ticket_price = chain_data
344                                .ticket_price
345                                .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the ticket price".into()))?
346                                .mul(U256::from(path_pos));
347
348                            #[cfg(all(feature = "prometheus", not(test)))]
349                            METRIC_INCOMING_WIN_PROB.observe(ticket.win_prob());
350
351                            // Here also the signature on the ticket gets validated,
352                            // so afterward we are sure the source of the `channel`
353                            // (which is equal to `previous_hop_addr`) has issued this
354                            // ticket.
355                            let ticket = spawn_fifo_blocking(move || {
356                                validate_unacknowledged_ticket(
357                                    ticket,
358                                    &channel,
359                                    minimum_ticket_price,
360                                    chain_data.minimum_incoming_ticket_winning_prob,
361                                    remaining_balance,
362                                    &domain_separator,
363                                )
364                            })
365                            .await?;
366
367                            myself.increment_outgoing_ticket_index(channel.get_id()).await?;
368
369                            myself
370                                .caches
371                                .unacked_tickets
372                                .insert(
373                                    ack_challenge,
374                                    PendingAcknowledgement::WaitingAsRelayer(
375                                        ticket.clone().into_unacknowledged(own_key),
376                                    ),
377                                )
378                                .await;
379
380                            // NOTE: that the path position according to the ticket value
381                            // may no longer match the path position from the packet header,
382                            // because the price of the ticket may be set higher be the ticket
383                            // issuer.
384
385                            // Create the next ticket for the packet
386                            let ticket_builder = if path_pos == 1 {
387                                TicketBuilder::zero_hop().direction(&myself.me_onchain, &next_hop_addr)
388                            } else {
389                                // We currently take the maximum of the win prob on the ticket
390                                // and the one configured on this node.
391                                // Therefore, the winning probability can only increase on the path.
392                                myself
393                                    .create_multihop_ticket(
394                                        Some(tx),
395                                        myself.me_onchain,
396                                        next_hop_addr,
397                                        path_pos,
398                                        outgoing_ticket_win_prob.max(ticket.win_prob()),
399                                        outgoing_ticket_price,
400                                    )
401                                    .await?
402                            };
403
404                            // TODO: benchmark this to confirm, offload a CPU intensive task off the async executor onto a parallelized thread pool
405                            let ticket = spawn_fifo_blocking(move || {
406                                ticket_builder
407                                    .challenge(next_challenge.to_ethereum_challenge())
408                                    .build_signed(&me, &domain_separator)
409                            })
410                            .await?;
411
412                            // forward packet
413                            Ok(ticket)
414                        })
415                    })
416                    .await
417                {
418                    Ok(ticket) => Ok(ticket),
419                    Err(DbSqlError::TicketValidationError(boxed_error)) => {
420                        let (rejected_ticket, error) = *boxed_error;
421                        let rejected_value = rejected_ticket.amount;
422                        warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
423
424                        self.mark_unsaved_ticket_rejected(&rejected_ticket).await.map_err(|e| {
425                            DbSqlError::TicketValidationError(Box::new((
426                                rejected_ticket.clone(),
427                                format!("during validation error '{error}' update another error occurred: {e}"),
428                            )))
429                        })?;
430
431                        Err(DbSqlError::TicketValidationError(Box::new((rejected_ticket, error))))
432                    }
433                    Err(e) => Err(e),
434                }?;
435
436                let offchain_keypair = pkt_keypair.clone();
437                let (ack, payload) = spawn_fifo_blocking(move || {
438                    let ack = Acknowledgement::new(ack_key, &offchain_keypair);
439
440                    let mut payload = Vec::with_capacity(ChainPacketComponents::SIZE);
441                    payload.extend_from_slice(packet.as_ref());
442
443                    let ticket_bytes = verified_ticket.leak().into_encoded();
444                    payload.extend_from_slice(ticket_bytes.as_ref());
445
446                    (ack, payload.into_boxed_slice())
447                })
448                .await;
449
450                Ok(TransportPacketWithChainData::Forwarded {
451                    packet_tag,
452                    previous_hop,
453                    next_hop,
454                    data: payload,
455                    ack,
456                })
457            }
458            ChainPacketComponents::Outgoing { .. } => {
459                Err(DbSqlError::LogicalError("Cannot receive an outgoing packet".into()).into())
460            }
461        }
462    }
463}
464
465impl HoprDb {
466    async fn create_multihop_ticket<'a>(
467        &'a self,
468        tx: OptTx<'a>,
469        me_onchain: Address,
470        destination: Address,
471        current_path_pos: u8,
472        winning_prob: f64,
473        ticket_price: Balance,
474    ) -> crate::errors::Result<TicketBuilder> {
475        let channel = self
476            .get_channel_by_parties(tx, &me_onchain, &destination, true)
477            .await?
478            .ok_or(DbSqlError::LogicalError(format!(
479                "channel to '{destination}' not found",
480            )))?;
481
482        // The next ticket is worth: price * remaining hop count / winning probability
483        let amount = Balance::new(
484            ticket_price
485                .amount()
486                .mul(U256::from(current_path_pos - 1))
487                .div_f64(winning_prob)
488                .map_err(|e| {
489                    DbSqlError::LogicalError(format!(
490                        "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
491                    ))
492                })?,
493            BalanceType::HOPR,
494        );
495
496        if channel.balance.lt(&amount) {
497            return Err(DbSqlError::LogicalError(format!(
498                "out of funds: {} with counterparty {destination} has balance {} < {amount}",
499                channel.get_id(),
500                channel.balance
501            )));
502        }
503
504        let ticket_builder = TicketBuilder::default()
505            .direction(&me_onchain, &destination)
506            .balance(amount)
507            .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
508            .index_offset(1) // unaggregated always have index_offset == 1
509            .win_prob(winning_prob)
510            .channel_epoch(channel.channel_epoch.as_u32());
511
512        Ok(ticket_builder)
513    }
514}