hopr_transport_session/balancer/
mod.rs1mod controller;
2pub mod pid;
5#[allow(dead_code)]
6mod rate_limiting;
7pub mod simple;
9
10pub use controller::{BalancerConfigFeedback, SurbBalancer, SurbBalancerConfig};
11pub use rate_limiting::{RateController, RateLimitSinkExt, RateLimitStreamExt};
12
13pub const MIN_BALANCER_SAMPLING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
15
16pub trait SurbFlowEstimator {
18 fn estimate_surbs_consumed(&self) -> u64;
22 fn estimate_surbs_produced(&self) -> u64;
26
27 fn estimated_surb_buffer_change<E: SurbFlowEstimator>(&self, earlier: &E) -> Option<i64> {
36 match (
37 self.estimate_surbs_produced()
38 .checked_sub(earlier.estimate_surbs_produced()),
39 self.estimate_surbs_consumed()
40 .checked_sub(earlier.estimate_surbs_consumed()),
41 ) {
42 (Some(surbs_delivered_delta), Some(surbs_consumed_delta)) => {
43 Some(surbs_delivered_delta as i64 - surbs_consumed_delta as i64)
44 }
45 _ => None,
46 }
47 }
48}
49
50#[cfg_attr(test, mockall::automock)]
52pub trait SurbFlowController {
53 fn adjust_surb_flow(&self, surbs_per_sec: usize);
55}
56
57#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
59pub struct BalancerControllerBounds(u64, u64);
60
61impl BalancerControllerBounds {
62 pub fn new(target: u64, output_limit: u64) -> Self {
64 Self(target, output_limit)
65 }
66
67 pub fn target(&self) -> u64 {
69 self.0
70 }
71
72 pub fn output_limit(&self) -> u64 {
74 self.1
75 }
76}
77
78pub trait SurbBalancerController {
80 fn bounds(&self) -> BalancerControllerBounds;
82 fn set_target_and_limit(&mut self, bounds: BalancerControllerBounds);
84 fn next_control_output(&mut self, current_buffer_level: u64) -> u64;
86}
87
88#[derive(Clone, Copy, Debug, Default)]
94pub struct SimpleSurbFlowEstimator {
95 pub produced: u64,
97 pub consumed: u64,
99}
100
101impl<T: SurbFlowEstimator> From<&T> for SimpleSurbFlowEstimator {
102 fn from(value: &T) -> Self {
103 Self {
104 produced: value.estimate_surbs_produced(),
105 consumed: value.estimate_surbs_consumed(),
106 }
107 }
108}
109
110impl SurbFlowEstimator for SimpleSurbFlowEstimator {
111 fn estimate_surbs_consumed(&self) -> u64 {
112 self.consumed
113 }
114
115 fn estimate_surbs_produced(&self) -> u64 {
116 self.produced
117 }
118}
119
120#[derive(Clone, Debug, Default)]
123pub struct AtomicSurbFlowEstimator {
124 pub consumed: std::sync::Arc<std::sync::atomic::AtomicU64>,
126 pub produced: std::sync::Arc<std::sync::atomic::AtomicU64>,
128}
129
130impl SurbFlowEstimator for AtomicSurbFlowEstimator {
131 fn estimate_surbs_consumed(&self) -> u64 {
132 self.consumed.load(std::sync::atomic::Ordering::Relaxed)
133 }
134
135 fn estimate_surbs_produced(&self) -> u64 {
136 self.produced.load(std::sync::atomic::Ordering::Relaxed)
137 }
138}
139
140pub struct SurbControllerWithCorrection(pub RateController, pub u32);
150
151impl SurbFlowController for SurbControllerWithCorrection {
152 fn adjust_surb_flow(&self, surbs_per_sec: usize) {
153 self.0
154 .set_rate_per_unit(surbs_per_sec, self.1 * std::time::Duration::from_secs(1));
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161
162 #[test]
163 fn test_estimated_surb_buffer_change() {
164 let estimator_1 = SimpleSurbFlowEstimator {
165 produced: 10,
166 consumed: 5,
167 };
168 let estimator_2 = SimpleSurbFlowEstimator {
169 produced: 15,
170 consumed: 11,
171 };
172 let estimator_3 = SimpleSurbFlowEstimator {
173 produced: 25,
174 consumed: 16,
175 };
176 assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_1), Some(0));
177 assert_eq!(estimator_2.estimated_surb_buffer_change(&estimator_1), Some(-1));
178 assert_eq!(estimator_3.estimated_surb_buffer_change(&estimator_2), Some(5));
179 assert_eq!(estimator_1.estimated_surb_buffer_change(&estimator_2), None);
180 }
181}