1use std::{net::SocketAddr, num::NonZeroUsize};
2
3use hopr_lib::{
4 OffchainKeypair, ServiceId,
5 errors::HoprLibError,
6 prelude::{ConnectedUdpStream, ForeignDataMode, UdpStreamParallelism},
7 transfer_session,
8};
9
10use crate::config::SessionIpForwardingConfig;
11
// Telemetry gauge tracking the target connections opened by `process`.
// It carries a single label, `type`; the code in this file uses the label
// values "udp", "tcp" and "loopback". The static is only compiled when the
// `telemetry` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_target_connections",
        "Number of currently active HOPR session target connections on this Exit node",
        &["type"]
    // Failing to create the gauge is treated as unrecoverable: `unwrap` panics.
    ).unwrap();
}
20
/// Buffer size handed to `transfer_session` when a session is bridged to a
/// TCP target (presumably in bytes — TODO confirm against `transfer_session`).
pub const HOPR_TCP_BUFFER_SIZE: usize = 4 * 1024;

/// Buffer size configured on the UDP bridge socket and handed to
/// `transfer_session` when a session is bridged to a UDP target
/// (presumably in bytes — TODO confirm).
pub const HOPR_UDP_BUFFER_SIZE: usize = 16 * 1024;

/// Queue size configured on the UDP bridge socket via `with_queue_size`
/// (the unit of the queue is defined by `ConnectedUdpStream` — TODO confirm).
pub const HOPR_UDP_QUEUE_SIZE: usize = 8 * 1024;
29
/// Server-side handler of incoming sessions that forwards them to UDP or TCP
/// targets, or serves them from the built-in loopback service.
#[derive(Debug, Clone)]
pub struct HoprServerIpForwardingReactor {
    /// Keypair used to unseal the target carried by an incoming session.
    keypair: OffchainKeypair,
    /// Forwarding settings: the target allow list and the TCP connect retry
    /// delay / retry count read by this file.
    cfg: SessionIpForwardingConfig,
}
37
38impl HoprServerIpForwardingReactor {
39 pub fn new(keypair: OffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
40 Self { keypair, cfg }
41 }
42
43 fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
44 if self.cfg.use_target_allow_list {
45 for addr in addrs {
46 if !self.cfg.target_allow_list.contains(addr) {
47 tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
48 return false;
49 }
50 tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
51 }
52 }
53 true
54 }
55}
56
/// Service ID of the loopback service: a session targeting
/// `ExitNode(SERVICE_ID_LOOPBACK)` gets the data it sends copied back to it.
pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
58
59#[async_trait::async_trait]
60impl hopr_lib::api::node::HoprSessionServer for HoprServerIpForwardingReactor {
61 type Error = hopr_lib::errors::HoprLibError;
62 type Session = hopr_lib::exports::transport::IncomingSession;
63
64 #[tracing::instrument(level = "debug", skip(self, session))]
65 async fn process(
66 &self,
67 mut session: hopr_lib::exports::transport::IncomingSession,
68 ) -> Result<(), hopr_lib::errors::HoprLibError> {
69 let session_id = *session.session.id();
70 match session.target {
71 hopr_lib::SessionTarget::UdpStream(udp_target) => {
72 let kp = self.keypair.clone();
73 let udp_target = hopr_lib::utils::hopr_parallelize::cpu::spawn_blocking(
74 move || udp_target.unseal(&kp),
75 "udp_unseal",
76 )
77 .await
78 .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
79 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
80
81 tracing::debug!(
82 session_id = ?session_id,
83 %udp_target,
84 "binding socket to the UDP server"
85 );
86
87 let resolved_udp_target = udp_target
90 .clone()
91 .resolve_tokio()
92 .await
93 .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
94 .first()
95 .ok_or(HoprLibError::GeneralError(format!(
96 "failed to resolve DNS name {udp_target}"
97 )))?
98 .to_owned();
99 tracing::debug!(
100 ?session_id,
101 %udp_target,
102 resolution = ?resolved_udp_target,
103 "UDP target resolved"
104 );
105
106 if !self.all_ips_allowed(&[resolved_udp_target]) {
107 return Err(HoprLibError::GeneralError(format!(
108 "denied target address {resolved_udp_target}"
109 )));
110 }
111
112 let mut udp_bridge = ConnectedUdpStream::builder()
113 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
114 .with_counterparty(resolved_udp_target)
115 .with_foreign_data_mode(ForeignDataMode::Error)
116 .with_queue_size(HOPR_UDP_QUEUE_SIZE)
117 .with_receiver_parallelism(
118 std::env::var("HOPRD_SESSION_EXIT_UDP_RX_PARALLELISM")
119 .ok()
120 .and_then(|s| s.parse::<NonZeroUsize>().ok())
121 .map(UdpStreamParallelism::Specific)
122 .unwrap_or(UdpStreamParallelism::Auto),
123 )
124 .build(("0.0.0.0", 0))
125 .map_err(|e| {
126 HoprLibError::GeneralError(format!(
127 "could not bridge the incoming session to {udp_target}: {e}"
128 ))
129 })?;
130
131 tracing::debug!(
132 ?session_id,
133 %udp_target,
134 "bridging the session to the UDP server"
135 );
136
137 tokio::task::spawn(async move {
138 #[cfg(all(feature = "telemetry", not(test)))]
139 let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["udp"], 1.0);
140
141 match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE, None).await {
144 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
145 ?session_id,
146 session_to_stream_bytes,
147 stream_to_session_bytes,
148 %udp_target,
149 "server bridged session to UDP ended"
150 ),
151 Err(e) => tracing::error!(
152 ?session_id,
153 %udp_target,
154 error = %e,
155 "UDP server stream is closed"
156 ),
157 }
158 });
159
160 Ok(())
161 }
162 hopr_lib::SessionTarget::TcpStream(tcp_target) => {
163 let kp = self.keypair.clone();
164 let tcp_target = hopr_lib::utils::hopr_parallelize::cpu::spawn_blocking(
165 move || tcp_target.unseal(&kp),
166 "tcp_unseal",
167 )
168 .await
169 .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
170 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
171
172 tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
173
174 let resolved_tcp_targets =
177 tcp_target.clone().resolve_tokio().await.map_err(|e| {
178 HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
179 })?;
180 tracing::debug!(
181 ?session_id,
182 %tcp_target,
183 resolution = ?resolved_tcp_targets,
184 "TCP target resolved"
185 );
186
187 if !self.all_ips_allowed(&resolved_tcp_targets) {
188 return Err(HoprLibError::GeneralError(format!(
189 "denied target address {resolved_tcp_targets:?}"
190 )));
191 }
192
193 let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
194 .take(self.cfg.max_tcp_target_retries as usize);
195
196 let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
197 tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
198 })
199 .await
200 .map_err(|e| {
201 HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
202 })?;
203
204 tcp_bridge.set_nodelay(true).map_err(|e| {
205 HoprLibError::GeneralError(format!(
206 "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
207 ))
208 })?;
209
210 tracing::debug!(
211 ?session_id,
212 %tcp_target,
213 "bridging the session to the TCP server"
214 );
215
216 tokio::task::spawn(async move {
217 #[cfg(all(feature = "telemetry", not(test)))]
218 let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["tcp"], 1.0);
219
220 match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE, None).await {
221 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
222 ?session_id,
223 session_to_stream_bytes,
224 stream_to_session_bytes,
225 %tcp_target,
226 "server bridged session to TCP ended"
227 ),
228 Err(error) => tracing::error!(
229 ?session_id,
230 %tcp_target,
231 %error,
232 "TCP server stream is closed"
233 ),
234 }
235 });
236
237 Ok(())
238 }
239 hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
240 tracing::debug!(?session_id, "bridging the session to the loopback service");
241 let (mut reader, mut writer) = tokio::io::split(session.session);
242
243 #[cfg(all(feature = "telemetry", not(test)))]
244 let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["loopback"], 1.0);
245
246 match tokio::io::copy(&mut reader, &mut writer).await {
248 Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
249 Err(error) => tracing::error!(
250 ?session_id,
251 %error,
252 "server loopback session service ended with an error"
253 ),
254 }
255
256 Ok(())
257 }
258 hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
259 "server does not support internal session processing".into(),
260 )),
261 }
262 }
263}