Skip to main content

hopr_transport_session/balancer/
mod.rs

1mod controller;
2/// Contains implementation of the [`SurbBalancerController`] trait using a Proportional Integral Derivative (PID)
3/// controller.
4pub mod pid;
5#[allow(dead_code)]
6mod rate_limiting;
7/// Contains a simple proportional output implementation of the [`SurbBalancerController`] trait.
8pub mod simple;
9
10pub use controller::{BalancerStateValues, SurbBalancer, SurbBalancerConfig};
11pub use rate_limiting::{RateController, RateLimitSinkExt, RateLimitStreamExt};
12
13/// Smallest possible interval for balancer sampling.
14pub const MIN_BALANCER_SAMPLING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
15
16/// Allows estimating the flow of SURBs in a Session (production or consumption).
17pub trait SurbFlowEstimator {
18    /// Estimates the number of SURBs consumed.
19    ///
20    /// Value returned on each call must be equal or greater to the value returned by a previous call.
21    fn estimate_surbs_consumed(&self) -> u64;
22    /// Estimates the number of SURBs produced or received.
23    ///
24    /// Value returned on each call must be equal or greater to the value returned by a previous call.
25    fn estimate_surbs_produced(&self) -> u64;
26
27    /// Subtracts SURBs consumed from SURBs produced, saturating at zero.
28    fn saturating_diff(&self) -> u64 {
29        self.estimate_surbs_produced()
30            .saturating_sub(self.estimate_surbs_consumed())
31    }
32
33    /// Computes the estimated change in SURB buffer.
34    ///
35    /// This is done by computing the change in produced and consumed SURBs since the `earlier`
36    /// state and then taking their difference.
37    ///
38    /// A positive result is a surplus number of SURBs added to the buffer, a negative result is a loss of SURBs
39    /// from the buffer.
40    /// Returns `None` if `earlier` had more SURBs produced/consumed than this instance (overflow).
41    fn estimated_surb_buffer_change<E: SurbFlowEstimator>(&self, earlier: &E) -> Option<i64> {
42        match (
43            self.estimate_surbs_produced()
44                .checked_sub(earlier.estimate_surbs_produced()),
45            self.estimate_surbs_consumed()
46                .checked_sub(earlier.estimate_surbs_consumed()),
47        ) {
48            (Some(surbs_delivered_delta), Some(surbs_consumed_delta)) => {
49                Some(surbs_delivered_delta as i64 - surbs_consumed_delta as i64)
50            }
51            _ => None,
52        }
53    }
54}
55
56/// Allows controlling the production or consumption of SURBs in a Session.
57#[cfg_attr(test, mockall::automock)]
58pub trait SurbFlowController {
59    /// Adjusts the amount of SURB production or consumption.
60    fn adjust_surb_flow(&self, surbs_per_sec: usize);
61}
62
63/// Represents the setpoint (target) and output limit of a controller.
64#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
65pub struct BalancerControllerBounds(u64, u64);
66
67impl BalancerControllerBounds {
68    /// Creates a new instance.
69    pub fn new(target: u64, output_limit: u64) -> Self {
70        Self(target, output_limit)
71    }
72
73    /// Gets the target (setpoint) of a controller.
74    #[inline]
75    pub fn target(&self) -> u64 {
76        self.0
77    }
78
79    /// Gets the output limit of a controller.
80    #[inline]
81    pub fn output_limit(&self) -> u64 {
82        self.1
83    }
84
85    /// Unpacks the controller bounds into two `u64`s (target and output limit).
86    #[inline]
87    pub fn unzip(&self) -> (u64, u64) {
88        (self.0, self.1)
89    }
90}
91
92/// Trait abstracting a controller used in the [`SurbBalancer`].
93pub trait SurbBalancerController {
94    /// Gets the current bounds of the controller.
95    fn bounds(&self) -> BalancerControllerBounds;
96    /// Updates the controller's target (setpoint) and output limit.
97    fn set_target_and_limit(&mut self, bounds: BalancerControllerBounds);
98    /// Queries the controller for the next control output based on the `current_buffer_level` of SURBs.
99    fn next_control_output(&mut self, current_buffer_level: u64) -> u64;
100}
101
102/// Implementation of [`SurbFlowEstimator`] that tracks the number of produced
103/// and consumed SURBs via two `u64`s.
104///
105/// This implementation can take "snapshots" of other `SurbFlowEstimators` (via `From` trait) by simply
106/// calling their respective methods to fill in its values.
107#[derive(Clone, Copy, Debug, Default)]
108pub struct SimpleSurbFlowEstimator {
109    /// Number of produced SURBs.
110    pub produced: u64,
111    /// Number of consumed SURBs.
112    pub consumed: u64,
113}
114
115impl<T: SurbFlowEstimator> From<&T> for SimpleSurbFlowEstimator {
116    fn from(value: &T) -> Self {
117        Self {
118            produced: value.estimate_surbs_produced(),
119            consumed: value.estimate_surbs_consumed(),
120        }
121    }
122}
123
124impl SurbFlowEstimator for SimpleSurbFlowEstimator {
125    fn estimate_surbs_consumed(&self) -> u64 {
126        self.consumed
127    }
128
129    fn estimate_surbs_produced(&self) -> u64 {
130        self.produced
131    }
132}
133
134/// An implementation of `SurbFlowEstimator` that tracks the number of produced
135/// and consumed SURBs via two `AtomicU64`s.
136#[derive(Clone, Debug, Default)]
137pub struct AtomicSurbFlowEstimator {
138    /// Number of consumed SURBs.
139    pub consumed: std::sync::Arc<std::sync::atomic::AtomicU64>,
140    /// Number of produced or received SURBs.
141    pub produced: std::sync::Arc<std::sync::atomic::AtomicU64>,
142}
143
144impl SurbFlowEstimator for AtomicSurbFlowEstimator {
145    fn estimate_surbs_consumed(&self) -> u64 {
146        self.consumed.load(std::sync::atomic::Ordering::Relaxed)
147    }
148
149    fn estimate_surbs_produced(&self) -> u64 {
150        self.produced.load(std::sync::atomic::Ordering::Relaxed)
151    }
152}
153
154/// Wraps a [`RateController`] as [`SurbFlowController`] with the given correction
155/// factor on time unit.
156///
157/// For example, when this is used to control the flow of keep-alive messages (carrying SURBs),
158/// the correction factor is `HoprPacket::MAX_SURBS_IN_PACKET` - which is the number of SURBs
159/// a single keep-alive message can bear.
160///
161/// In another case, when this is used to control the egress of a Session, each outgoing packet
162/// consumes only a single SURB and therefore the correction factor is `1`.
163pub struct SurbControllerWithCorrection(pub RateController, pub u32);
164
165impl SurbFlowController for SurbControllerWithCorrection {
166    fn adjust_surb_flow(&self, surbs_per_sec: usize) {
167        self.0
168            .set_rate_per_unit(surbs_per_sec, self.1 * std::time::Duration::from_secs(1));
169    }
170}
171
172#[cfg(test)]
173mod tests {
174    use super::*;
175
176    #[test]
177    fn test_estimated_surb_buffer_change() {
178        let estimator_1 = SimpleSurbFlowEstimator {
179            produced: 10,
180            consumed: 5,
181        };
182        let estimator_2 = SimpleSurbFlowEstimator {
183            produced: 15,
184            consumed: 11,
185        };
186        let estimator_3 = SimpleSurbFlowEstimator {
187            produced: 25,
188            consumed: 16,
189        };
190        assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_1), Some(0));
191        assert_eq!(estimator_2.estimated_surb_buffer_change(&estimator_1), Some(-1));
192        assert_eq!(estimator_3.estimated_surb_buffer_change(&estimator_2), Some(5));
193        assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_2), None);
194    }
195}