hopr_transport_session/balancer/
mod.rs1mod controller;
2pub mod pid;
5#[allow(dead_code)]
6mod rate_limiting;
7pub mod simple;
9
10pub use controller::{BalancerStateValues, SurbBalancer, SurbBalancerConfig};
11pub use rate_limiting::{RateController, RateLimitSinkExt, RateLimitStreamExt};
12
13pub const MIN_BALANCER_SAMPLING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
15
16pub trait SurbFlowEstimator {
18 fn estimate_surbs_consumed(&self) -> u64;
22 fn estimate_surbs_produced(&self) -> u64;
26
27 fn saturating_diff(&self) -> u64 {
29 self.estimate_surbs_produced()
30 .saturating_sub(self.estimate_surbs_consumed())
31 }
32
33 fn estimated_surb_buffer_change<E: SurbFlowEstimator>(&self, earlier: &E) -> Option<i64> {
42 match (
43 self.estimate_surbs_produced()
44 .checked_sub(earlier.estimate_surbs_produced()),
45 self.estimate_surbs_consumed()
46 .checked_sub(earlier.estimate_surbs_consumed()),
47 ) {
48 (Some(surbs_delivered_delta), Some(surbs_consumed_delta)) => {
49 Some(surbs_delivered_delta as i64 - surbs_consumed_delta as i64)
50 }
51 _ => None,
52 }
53 }
54}
55
56#[cfg_attr(test, mockall::automock)]
58pub trait SurbFlowController {
59 fn adjust_surb_flow(&self, surbs_per_sec: usize);
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
65pub struct BalancerControllerBounds(u64, u64);
66
67impl BalancerControllerBounds {
68 pub fn new(target: u64, output_limit: u64) -> Self {
70 Self(target, output_limit)
71 }
72
73 #[inline]
75 pub fn target(&self) -> u64 {
76 self.0
77 }
78
79 #[inline]
81 pub fn output_limit(&self) -> u64 {
82 self.1
83 }
84
85 #[inline]
87 pub fn unzip(&self) -> (u64, u64) {
88 (self.0, self.1)
89 }
90}
91
92pub trait SurbBalancerController {
94 fn bounds(&self) -> BalancerControllerBounds;
96 fn set_target_and_limit(&mut self, bounds: BalancerControllerBounds);
98 fn next_control_output(&mut self, current_buffer_level: u64) -> u64;
100}
101
102#[derive(Clone, Copy, Debug, Default)]
108pub struct SimpleSurbFlowEstimator {
109 pub produced: u64,
111 pub consumed: u64,
113}
114
115impl<T: SurbFlowEstimator> From<&T> for SimpleSurbFlowEstimator {
116 fn from(value: &T) -> Self {
117 Self {
118 produced: value.estimate_surbs_produced(),
119 consumed: value.estimate_surbs_consumed(),
120 }
121 }
122}
123
124impl SurbFlowEstimator for SimpleSurbFlowEstimator {
125 fn estimate_surbs_consumed(&self) -> u64 {
126 self.consumed
127 }
128
129 fn estimate_surbs_produced(&self) -> u64 {
130 self.produced
131 }
132}
133
134#[derive(Clone, Debug, Default)]
137pub struct AtomicSurbFlowEstimator {
138 pub consumed: std::sync::Arc<std::sync::atomic::AtomicU64>,
140 pub produced: std::sync::Arc<std::sync::atomic::AtomicU64>,
142}
143
144impl SurbFlowEstimator for AtomicSurbFlowEstimator {
145 fn estimate_surbs_consumed(&self) -> u64 {
146 self.consumed.load(std::sync::atomic::Ordering::Relaxed)
147 }
148
149 fn estimate_surbs_produced(&self) -> u64 {
150 self.produced.load(std::sync::atomic::Ordering::Relaxed)
151 }
152}
153
154pub struct SurbControllerWithCorrection(pub RateController, pub u32);
164
165impl SurbFlowController for SurbControllerWithCorrection {
166 fn adjust_surb_flow(&self, surbs_per_sec: usize) {
167 self.0
168 .set_rate_per_unit(surbs_per_sec, self.1 * std::time::Duration::from_secs(1));
169 }
170}
171
172#[cfg(test)]
173mod tests {
174 use super::*;
175
176 #[test]
177 fn test_estimated_surb_buffer_change() {
178 let estimator_1 = SimpleSurbFlowEstimator {
179 produced: 10,
180 consumed: 5,
181 };
182 let estimator_2 = SimpleSurbFlowEstimator {
183 produced: 15,
184 consumed: 11,
185 };
186 let estimator_3 = SimpleSurbFlowEstimator {
187 produced: 25,
188 consumed: 16,
189 };
190 assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_1), Some(0));
191 assert_eq!(estimator_2.estimated_surb_buffer_change(&estimator_1), Some(-1));
192 assert_eq!(estimator_3.estimated_surb_buffer_change(&estimator_2), Some(5));
193 assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_2), None);
194 }
195}