hopr_transport_session/balancer/
mod.rs

1mod controller;
2/// Contains implementation of the [`SurbBalancerController`] trait using a Proportional Integral Derivative (PID)
3/// controller.
4pub mod pid;
5#[allow(dead_code)]
6mod rate_limiting;
7/// Contains a simple proportional output implementation of the [`SurbBalancerController`] trait.
8pub mod simple;
9
10pub use controller::{BalancerConfigFeedback, SurbBalancer, SurbBalancerConfig};
11pub use rate_limiting::{RateController, RateLimitSinkExt, RateLimitStreamExt};
12
13/// Smallest possible interval for balancer sampling.
14pub const MIN_BALANCER_SAMPLING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
15
16/// Allows estimating the flow of SURBs in a Session (production or consumption).
17pub trait SurbFlowEstimator {
18    /// Estimates the number of SURBs consumed.
19    ///
20    /// Value returned on each call must be equal or greater to the value returned by a previous call.
21    fn estimate_surbs_consumed(&self) -> u64;
22    /// Estimates the number of SURBs produced or received.
23    ///
24    /// Value returned on each call must be equal or greater to the value returned by a previous call.
25    fn estimate_surbs_produced(&self) -> u64;
26
27    /// Computes the estimated change in SURB buffer.
28    ///
29    /// This is done by computing the change in produced and consumed SURBs since the `earlier`
30    /// state and then taking their difference.
31    ///
32    /// A positive result is a surplus number of SURBs added to the buffer, a negative result is a loss of SURBs
33    /// from the buffer.
34    /// Returns `None` if `earlier` had more SURBs produced/consumed than this instance (overflow).
35    fn estimated_surb_buffer_change<E: SurbFlowEstimator>(&self, earlier: &E) -> Option<i64> {
36        match (
37            self.estimate_surbs_produced()
38                .checked_sub(earlier.estimate_surbs_produced()),
39            self.estimate_surbs_consumed()
40                .checked_sub(earlier.estimate_surbs_consumed()),
41        ) {
42            (Some(surbs_delivered_delta), Some(surbs_consumed_delta)) => {
43                Some(surbs_delivered_delta as i64 - surbs_consumed_delta as i64)
44            }
45            _ => None,
46        }
47    }
48}
49
50/// Allows controlling the production or consumption of SURBs in a Session.
51#[cfg_attr(test, mockall::automock)]
52pub trait SurbFlowController {
53    /// Adjusts the amount of SURB production or consumption.
54    fn adjust_surb_flow(&self, surbs_per_sec: usize);
55}
56
57/// Represents the setpoint (target) and output limit of a controller.
58#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
59pub struct BalancerControllerBounds(u64, u64);
60
61impl BalancerControllerBounds {
62    /// Creates a new instance.
63    pub fn new(target: u64, output_limit: u64) -> Self {
64        Self(target, output_limit)
65    }
66
67    /// Gets the target (setpoint) of a controller.
68    pub fn target(&self) -> u64 {
69        self.0
70    }
71
72    /// Gets the output limit of a controller.
73    pub fn output_limit(&self) -> u64 {
74        self.1
75    }
76}
77
78/// Trait abstracting a controller used in the [`SurbBalancer`].
79pub trait SurbBalancerController {
80    /// Gets the current bounds of the controller.
81    fn bounds(&self) -> BalancerControllerBounds;
82    /// Updates the controller's target (setpoint) and output limit.
83    fn set_target_and_limit(&mut self, bounds: BalancerControllerBounds);
84    /// Queries the controller for the next control output based on the `current_buffer_level` of SURBs.
85    fn next_control_output(&mut self, current_buffer_level: u64) -> u64;
86}
87
88/// Implementation of [`SurbFlowEstimator`] that tracks the number of produced
89/// and consumed SURBs via two `u64`s.
90///
91/// This implementation can take "snapshots" of other `SurbFlowEstimators` (via `From` trait) by simply
92/// calling their respective methods to fill in its values.
93#[derive(Clone, Copy, Debug, Default)]
94pub struct SimpleSurbFlowEstimator {
95    /// Number of produced SURBs.
96    pub produced: u64,
97    /// Number of consumed SURBs.
98    pub consumed: u64,
99}
100
101impl<T: SurbFlowEstimator> From<&T> for SimpleSurbFlowEstimator {
102    fn from(value: &T) -> Self {
103        Self {
104            produced: value.estimate_surbs_produced(),
105            consumed: value.estimate_surbs_consumed(),
106        }
107    }
108}
109
110impl SurbFlowEstimator for SimpleSurbFlowEstimator {
111    fn estimate_surbs_consumed(&self) -> u64 {
112        self.consumed
113    }
114
115    fn estimate_surbs_produced(&self) -> u64 {
116        self.produced
117    }
118}
119
120/// An implementation of [`SurbFlowEstimator`] that tracks the number of produced
121/// and consumed SURBs via two `AtomicU64`s.
122#[derive(Clone, Debug, Default)]
123pub struct AtomicSurbFlowEstimator {
124    /// Number of consumed SURBs.
125    pub consumed: std::sync::Arc<std::sync::atomic::AtomicU64>,
126    /// Number of produced or received SURBs.
127    pub produced: std::sync::Arc<std::sync::atomic::AtomicU64>,
128}
129
130impl SurbFlowEstimator for AtomicSurbFlowEstimator {
131    fn estimate_surbs_consumed(&self) -> u64 {
132        self.consumed.load(std::sync::atomic::Ordering::Relaxed)
133    }
134
135    fn estimate_surbs_produced(&self) -> u64 {
136        self.produced.load(std::sync::atomic::Ordering::Relaxed)
137    }
138}
139
140/// Wraps a [`RateController`] as [`SurbFlowController`] with the given correction
141/// factor on time unit.
142///
143/// For example, when this is used to control the flow of keep-alive messages (carrying SURBs),
144/// the correction factor is `HoprPacket::MAX_SURBS_IN_PACKET` - which is the number of SURBs
145/// a single keep-alive message can bear.
146///
147/// In another case, when this is used to control the egress of a Session, each outgoing packet
148/// consumes only a single SURB and therefore the correction factor is `1`.
149pub struct SurbControllerWithCorrection(pub RateController, pub u32);
150
151impl SurbFlowController for SurbControllerWithCorrection {
152    fn adjust_surb_flow(&self, surbs_per_sec: usize) {
153        self.0
154            .set_rate_per_unit(surbs_per_sec, self.1 * std::time::Duration::from_secs(1));
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161
162    #[test]
163    fn test_estimated_surb_buffer_change() {
164        let estimator_1 = SimpleSurbFlowEstimator {
165            produced: 10,
166            consumed: 5,
167        };
168        let estimator_2 = SimpleSurbFlowEstimator {
169            produced: 15,
170            consumed: 11,
171        };
172        let estimator_3 = SimpleSurbFlowEstimator {
173            produced: 25,
174            consumed: 16,
175        };
176        assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_1), Some(0));
177        assert_eq!(estimator_2.estimated_surb_buffer_change(&estimator_1), Some(-1));
178        assert_eq!(estimator_3.estimated_surb_buffer_change(&estimator_2), Some(5));
179        assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_2), None);
180    }
181}