hopr_transport_session/
utils.rs1use std::time::Duration;
2
3use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
4use hopr_async_runtime::AbortHandle;
5use hopr_crypto_packet::prelude::HoprPacket;
6use hopr_network_types::prelude::DestinationRouting;
7use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataOut};
8use tracing::{debug, error};
9
10use crate::{
11 SessionId,
12 balancer::{RateController, RateLimitStreamExt, SurbControllerWithCorrection},
13 errors::TransportSessionError,
14 types::HoprStartProtocol,
15};
16
/// Copies data in both directions between a [`HoprSession`](crate::HoprSession) and an
/// arbitrary Tokio async IO `stream`.
///
/// Only compiled when the `runtime-tokio` feature is enabled.
///
/// # Arguments
/// * `session` - the Session side of the transfer.
/// * `stream` - the other side of the transfer.
/// * `max_buffer` - buffer size passed to the duplex copy for both directions.
/// * `abort_stream` - optional abort registration; when present, the abortable variant of the
///   duplex copy is used and this registration is passed as the second element of the
///   registration pair.
///
/// # Returns
/// The two counters reported by the underlying duplex copy, each cast to `usize`.
///
/// # Errors
/// Propagates any [`std::io::Error`] returned by the underlying duplex copy.
#[cfg(feature = "runtime-tokio")]
pub async fn transfer_session<S>(
    session: &mut crate::HoprSession,
    stream: &mut S,
    max_buffer: usize,
    abort_stream: Option<futures::future::AbortRegistration>,
) -> std::io::Result<(usize, usize)>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    tracing::debug!(
        session_id = ?session.id(),
        egress_buffer = max_buffer,
        ingress_buffer = max_buffer,
        "session buffers"
    );

    // The same buffer size is used for both directions.
    let buffer_sizes = (max_buffer, max_buffer);

    match abort_stream {
        Some(stream_abort) => {
            // The first registration is a placeholder: its handle is discarded immediately,
            // so nothing can ever trigger an abort through it. Only `stream_abort` is live.
            // NOTE(review): presumably the pair positions correspond to (`session`, `stream`) —
            // confirm against `copy_duplex_abortable`.
            let (_, never_triggered) = futures::future::AbortHandle::new_pair();
            let (first, second) = hopr_network_types::utils::copy_duplex_abortable(
                session,
                stream,
                buffer_sizes,
                (never_triggered, stream_abort),
            )
            .await?;
            Ok((first as usize, second as usize))
        }
        None => {
            let (first, second) = hopr_network_types::utils::copy_duplex(session, stream, buffer_sizes).await?;
            Ok((first as usize, second as usize))
        }
    }
}
62
63pub(crate) async fn insert_into_next_slot<K, V, F>(
69 cache: &moka::future::Cache<K, V>,
70 generator: F,
71 value: V,
72) -> Option<K>
73where
74 K: Copy + std::hash::Hash + Eq + Send + Sync + 'static,
75 V: Clone + Send + Sync + 'static,
76 F: Fn(Option<K>) -> K,
77{
78 cache.run_pending_tasks().await;
79
80 let initial = generator(None);
81 let mut next = initial;
82 loop {
83 let insertion_result = cache
84 .entry(next)
85 .and_try_compute_with(|e| {
86 if e.is_none() {
87 futures::future::ok::<_, ()>(moka::ops::compute::Op::Put(value.clone()))
88 } else {
89 futures::future::ok::<_, ()>(moka::ops::compute::Op::Nop)
90 }
91 })
92 .await;
93
94 if let Ok(moka::ops::compute::CompResult::Inserted(_)) = insertion_result {
96 return Some(next);
97 }
98
99 next = generator(Some(next));
101
102 if next == initial {
104 return None;
105 }
106 }
107}
108
109pub(crate) fn spawn_keep_alive_stream<S>(
110 session_id: SessionId,
111 sender: S,
112 routing: DestinationRouting,
113) -> (SurbControllerWithCorrection, AbortHandle)
114where
115 S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
116 S::Error: std::error::Error + Send + Sync + 'static,
117{
118 let elem = HoprStartProtocol::KeepAlive(session_id.into());
119
120 let controller = RateController::new(0, Duration::from_secs(1));
122
123 let (ka_stream, abort_handle) =
124 futures::stream::abortable(futures::stream::repeat(elem).rate_limit_with_controller(&controller));
125
126 let sender_clone = sender.clone();
127 let fwd_routing_clone = routing.clone();
128
129 debug!(%session_id, "spawning keep-alive stream");
131 hopr_async_runtime::prelude::spawn(
132 ka_stream
133 .map(move |msg| {
134 ApplicationData::try_from(msg)
135 .map(|data| (fwd_routing_clone.clone(), ApplicationDataOut::with_no_packet_info(data)))
136 })
137 .map_err(TransportSessionError::from)
138 .try_for_each_concurrent(None, move |msg| {
139 let mut sender_clone = sender_clone.clone();
140 async move {
141 sender_clone
142 .send(msg)
143 .await
144 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))
145 }
146 })
147 .then(move |res| {
148 match res {
149 Ok(_) => tracing::trace!(
150 component = "session",
151 %session_id,
152 task = "session keepalive",
153 "background task finished"
154 ),
155 Err(error) => error!(%session_id, %error, "keep-alive stream failed"),
156 }
157 futures::future::ready(())
158 }),
159 );
160
161 (
164 SurbControllerWithCorrection(controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
165 abort_handle,
166 )
167}
168
#[cfg(test)]
mod tests {
    use anyhow::anyhow;

    use super::*;

    /// Key generator cycling through the slots `0..5`: starts at 0 and wraps around after 4.
    fn cyclic_slot(previous: Option<i32>) -> i32 {
        match previous {
            Some(slot) => (slot + 1) % 5,
            None => 0,
        }
    }

    #[tokio::test]
    async fn test_insert_into_next_slot() -> anyhow::Result<()> {
        let cache = moka::future::Cache::new(10);

        // The five available slots must be handed out one after another, in order.
        for expected_slot in 0..5 {
            let slot = insert_into_next_slot(&cache, cyclic_slot, "foo".to_string())
                .await
                .ok_or_else(|| anyhow!("should insert"))?;
            assert_eq!(slot, expected_slot);
            assert_eq!(Some("foo".to_string()), cache.get(&expected_slot).await);
        }

        // All slots are occupied now, so one more insertion must fail.
        let overflow = insert_into_next_slot(&cache, cyclic_slot, "foo".to_string()).await;
        assert!(overflow.is_none(), "must not find slot when full");

        Ok(())
    }
}