Skip to main content

hopr_transport/protocol/pipeline/
config.rs

1//! Configuration structures for the HOPR packet processing pipeline.
2
3use validator::{Validate, ValidationError, ValidationErrors};
4
/// Default time window used to buffer acknowledgements before sending them out (200 ms).
fn default_ack_buffer_interval() -> std::time::Duration {
    const DEFAULT_INTERVAL_MS: u64 = 200;
    std::time::Duration::from_millis(DEFAULT_INTERVAL_MS)
}
8
/// Default initial capacity used when grouping outgoing acknowledgements (5).
fn default_ack_grouping_capacity() -> usize {
    const DEFAULT_GROUPING_CAPACITY: usize = 5;
    DEFAULT_GROUPING_CAPACITY
}
12
/// Default value of the `ticket_ack_buffer_size` configuration field (50 000).
fn default_ticket_ack_buffer_size() -> usize {
    const DEFAULT_TICKET_ACK_BUFFER_SIZE: usize = 50_000;
    DEFAULT_TICKET_ACK_BUFFER_SIZE
}
16
/// Default value of the `ack_out_buffer_size` configuration field (50 000).
fn default_ack_out_buffer_size() -> usize {
    const DEFAULT_ACK_OUT_BUFFER_SIZE: usize = 50_000;
    DEFAULT_ACK_OUT_BUFFER_SIZE
}
20
21/// Configuration for the acknowledgement processing pipeline.
22#[derive(Debug, Copy, Clone, smart_default::SmartDefault, Eq, PartialEq)]
23#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
24pub struct AcknowledgementPipelineConfig {
25    /// Interval for which to wait to buffer acknowledgements before sending them out.
26    ///
27    /// Default is 200 ms.
28    #[default(default_ack_buffer_interval())]
29    #[cfg_attr(
30        feature = "serde",
31        serde(default = "default_ack_buffer_interval", with = "humantime_serde")
32    )]
33    pub ack_buffer_interval: std::time::Duration,
34    /// Initial capacity when grouping outgoing acknowledgements.
35    ///
36    /// If set too low, it causes additional reallocations in the outgoing acknowledgement processing pipeline.
37    /// The value should grow if `ack_buffer_interval` grows.
38    ///
39    /// Default is 5.
40    #[default(default_ack_grouping_capacity())]
41    #[cfg_attr(feature = "serde", serde(default = "default_ack_grouping_capacity"))]
42    pub ack_grouping_capacity: usize,
43    /// Capacity of the `incoming_ack` MPSC channel carrying received acknowledgements
44    /// to the ticket-ack processing pipeline.
45    ///
46    /// The previous hardcoded value of 1_000_000 pre-allocated ~MBs of ring buffer per node even
47    /// though real-world throughput rarely saturates more than a few thousand entries. Let the
48    /// 50 ms sink timeouts (`QUEUE_SEND_TIMEOUT`) propagate backpressure instead.
49    ///
50    /// The default is 50 000.
51    #[default(default_ticket_ack_buffer_size())]
52    #[cfg_attr(feature = "serde", serde(default = "default_ticket_ack_buffer_size"))]
53    pub ticket_ack_buffer_size: usize,
54    /// Capacity of the `outgoing_ack` MPSC channel carrying acknowledgements to be sent back
55    /// to the previous hop.
56    ///
57    /// The default is 50 000. See [`ticket_ack_buffer_size`](Self::ticket_ack_buffer_size) for the
58    /// rationale on why this is smaller than the original hardcoded 1_000_000.
59    #[default(default_ack_out_buffer_size())]
60    #[cfg_attr(feature = "serde", serde(default = "default_ack_out_buffer_size"))]
61    pub ack_out_buffer_size: usize,
62    /// Maximum concurrency when processing incoming (received) acknowledgements.
63    ///
64    /// `None` or `Some(0)` both fall back to a default of 10.
65    pub ack_input_concurrency: Option<usize>,
66    /// Maximum concurrency when processing outgoing (sent-back) acknowledgements.
67    ///
68    /// `None` or `Some(0)` both fall back to a default of 10.
69    pub ack_output_concurrency: Option<usize>,
70}
71
72// Requires manual implementation due to https://github.com/Keats/validator/issues/285
73impl Validate for AcknowledgementPipelineConfig {
74    fn validate(&self) -> Result<(), ValidationErrors> {
75        let mut errors = ValidationErrors::new();
76        if self.ack_grouping_capacity == 0 {
77            errors.add("ack_grouping_capacity", ValidationError::new("must be greater than 0"));
78        }
79        if self.ack_buffer_interval < std::time::Duration::from_millis(10) {
80            errors.add("ack_buffer_interval", ValidationError::new("must be at least 10 ms"));
81        }
82        if self.ticket_ack_buffer_size == 0 {
83            errors.add("ticket_ack_buffer_size", ValidationError::new("must be greater than 0"));
84        }
85        if self.ack_out_buffer_size == 0 {
86            errors.add("ack_out_buffer_size", ValidationError::new("must be greater than 0"));
87        }
88        if errors.is_empty() { Ok(()) } else { Err(errors) }
89    }
90}
91
92/// Overall configuration of the input/output packet processing pipeline.
93#[derive(Clone, Copy, Debug, Default, PartialEq, Validate)]
94#[cfg_attr(
95    feature = "serde",
96    derive(serde::Serialize, serde::Deserialize),
97    serde(deny_unknown_fields)
98)]
99pub struct PacketPipelineConfig {
100    /// Maximum concurrency when processing outgoing packets.
101    ///
102    /// `None` or `Some(0)` both fall back to the default (available parallelism * 8).
103    pub output_concurrency: Option<usize>,
104    /// Maximum concurrency when processing incoming packets.
105    ///
106    /// `None` or `Some(0)` both fall back to the default (available parallelism * 8).
107    pub input_concurrency: Option<usize>,
108    /// Configuration of the packet acknowledgement processing
109    #[validate(nested)]
110    pub ack_config: AcknowledgementPipelineConfig,
111}