hopr_transport_session/
utils.rs1use std::time::Duration;
2
3use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
4use hopr_async_runtime::AbortHandle;
5use hopr_crypto_packet::prelude::HoprPacket;
6use hopr_network_types::prelude::DestinationRouting;
7use hopr_protocol_app::prelude::ApplicationData;
8use tracing::{debug, error};
9
10use crate::{
11 SessionId,
12 balancer::{RateController, RateLimitStreamExt, SurbControllerWithCorrection},
13 errors::TransportSessionError,
14 types::HoprStartProtocol,
15};
16
/// Copies data in both directions between a [`Session`](crate::Session) and an
/// arbitrary asynchronous I/O `stream`.
///
/// This function is only compiled when the `runtime-tokio` feature is enabled.
///
/// # Arguments
///
/// * `session` - the session forming one end of the duplex copy.
/// * `stream` - the Tokio `AsyncRead + AsyncWrite` object forming the other end.
/// * `max_buffer` - buffer size handed to the copy helper; the same value is used for
///   both directions.
/// * `abort_stream` - optional abort registration. When present, the abortable copy helper
///   is used and this registration is passed in the second position of the abort pair
///   (presumably the one associated with `stream` - confirm against
///   `hopr_network_types::utils::copy_duplex_abortable`).
///
/// # Returns
///
/// The pair of counters reported by the underlying copy helper, converted to `usize`
/// (presumably the number of bytes transferred in each direction).
///
/// # Errors
///
/// Returns whatever [`std::io::Error`] the underlying copy helper reports.
#[cfg(feature = "runtime-tokio")]
pub async fn transfer_session<S>(
    session: &mut crate::Session,
    stream: &mut S,
    max_buffer: usize,
    abort_stream: Option<futures::future::AbortRegistration>,
) -> std::io::Result<(usize, usize)>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    tracing::debug!(
        session_id = ?session.id(),
        egress_buffer = max_buffer,
        ingress_buffer = max_buffer,
        "session buffers"
    );

    // The same buffer size applies to both directions of the copy.
    let buffers = (max_buffer, max_buffer);

    let copied = match abort_stream {
        Some(abort_stream) => {
            // The handle belonging to this registration is discarded right away, so the
            // first abort slot can never be triggered; only `abort_stream` can abort.
            let (_, dummy) = futures::future::AbortHandle::new_pair();
            hopr_network_types::utils::copy_duplex_abortable(session, stream, buffers, (dummy, abort_stream)).await
        }
        None => hopr_network_types::utils::copy_duplex(session, stream, buffers).await,
    };

    copied.map(|(a, b)| (a as usize, b as usize))
}
62
63pub(crate) async fn insert_into_next_slot<K, V, F>(
69 cache: &moka::future::Cache<K, V>,
70 generator: F,
71 value: V,
72) -> Option<K>
73where
74 K: Copy + std::hash::Hash + Eq + Send + Sync + 'static,
75 V: Clone + Send + Sync + 'static,
76 F: Fn(Option<K>) -> K,
77{
78 cache.run_pending_tasks().await;
79
80 let initial = generator(None);
81 let mut next = initial;
82 loop {
83 let insertion_result = cache
84 .entry(next)
85 .and_try_compute_with(|e| {
86 if e.is_none() {
87 futures::future::ok::<_, ()>(moka::ops::compute::Op::Put(value.clone()))
88 } else {
89 futures::future::ok::<_, ()>(moka::ops::compute::Op::Nop)
90 }
91 })
92 .await;
93
94 if let Ok(moka::ops::compute::CompResult::Inserted(_)) = insertion_result {
96 return Some(next);
97 }
98
99 next = generator(Some(next));
101
102 if next == initial {
104 return None;
105 }
106 }
107}
108
109pub(crate) fn spawn_keep_alive_stream<S>(
110 session_id: SessionId,
111 sender: S,
112 routing: DestinationRouting,
113) -> (SurbControllerWithCorrection, AbortHandle)
114where
115 S: futures::Sink<(DestinationRouting, ApplicationData)> + Clone + Send + Sync + Unpin + 'static,
116 S::Error: std::error::Error + Send + Sync + 'static,
117{
118 let elem = HoprStartProtocol::KeepAlive(session_id.into());
119
120 let controller = RateController::new(0, Duration::from_secs(1));
122
123 let (ka_stream, abort_handle) =
124 futures::stream::abortable(futures::stream::repeat(elem).rate_limit_with_controller(&controller));
125
126 let sender_clone = sender.clone();
127 let fwd_routing_clone = routing.clone();
128
129 debug!(%session_id, "spawning keep-alive stream");
131 hopr_async_runtime::prelude::spawn(
132 ka_stream
133 .map(move |msg| ApplicationData::try_from(msg).map(|m| (fwd_routing_clone.clone(), m)))
134 .map_err(TransportSessionError::from)
135 .try_for_each_concurrent(None, move |msg| {
136 let mut sender_clone = sender_clone.clone();
137 async move {
138 sender_clone
139 .send(msg)
140 .await
141 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))
142 }
143 })
144 .then(move |res| {
145 match res {
146 Ok(_) => debug!(%session_id, "keep-alive stream done"),
147 Err(error) => error!(%session_id, %error, "keep-alive stream failed"),
148 }
149 futures::future::ready(())
150 }),
151 );
152
153 (
156 SurbControllerWithCorrection(controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
157 abort_handle,
158 )
159}
160
#[cfg(test)]
mod tests {
    use anyhow::anyhow;

    use super::*;

    /// Fills all slots reachable by a cyclic key generator and then verifies
    /// that a further insertion attempt reports that no slot is available.
    #[tokio::test]
    async fn test_insert_into_next_slot() -> anyhow::Result<()> {
        /// Number of distinct keys the generator cycles through.
        const SLOTS: i32 = 5;

        // Starts at key 0 and then walks the keys 0..SLOTS cyclically.
        let next_key = |prev: Option<i32>| prev.map_or(0, |key| (key + 1) % SLOTS);

        let cache = moka::future::Cache::new(10);

        for expected in 0..SLOTS {
            let key = insert_into_next_slot(&cache, next_key, "foo".to_string())
                .await
                .ok_or_else(|| anyhow!("should insert"))?;
            assert_eq!(key, expected);
            assert_eq!(Some("foo".to_string()), cache.get(&expected).await);
        }

        // All keys of the cycle are occupied now, so the search must wrap around and give up.
        let overflow = insert_into_next_slot(&cache, next_key, "foo".to_string()).await;
        assert!(overflow.is_none(), "must not find slot when full");

        Ok(())
    }
}