1use std::{net::SocketAddr, num::NonZeroUsize};
2
3use hopr_lib::{HoprOffchainKeypair, ServiceId, errors::HoprLibError, transfer_session};
4use hopr_network_types::{prelude::ForeignDataMode, udp::UdpStreamParallelism};
5use hoprd_api::{HOPR_TCP_BUFFER_SIZE, HOPR_UDP_BUFFER_SIZE, HOPR_UDP_QUEUE_SIZE};
6
7use crate::config::SessionIpForwardingConfig;
8
// Gauge with one series per value of the `type` label; this file uses the label
// values "udp", "tcp" and "loopback". It is incremented when a session starts being
// bridged to its target and decremented when the bridging ends.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
// The `unwrap` panics if the gauge cannot be created.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_target_connections",
        "Number of currently active HOPR session target connections on this Exit node",
        &["type"]
    ).unwrap();
}
17
/// Reactor for incoming HOPR sessions that forwards each session to the UDP or TCP
/// target it requests, or to the built-in loopback service.
#[derive(Debug, Clone)]
pub struct HoprServerIpForwardingReactor {
    /// Off-chain keypair used to unseal the target carried by an incoming session.
    keypair: HoprOffchainKeypair,
    /// Forwarding configuration: the target allow list and the TCP connection retry settings.
    cfg: SessionIpForwardingConfig,
}
25
26impl HoprServerIpForwardingReactor {
27 pub fn new(keypair: HoprOffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
28 Self { keypair, cfg }
29 }
30
31 fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
32 if self.cfg.use_target_allow_list {
33 for addr in addrs {
34 if !self.cfg.target_allow_list.contains(addr) {
35 tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
36 return false;
37 }
38 tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
39 }
40 }
41 true
42 }
43}
44
/// Identifier of the loopback Exit-node service: a session targeting this service
/// gets all the data it sends copied back to it.
pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
46
47#[hopr_lib::async_trait]
48impl hopr_lib::HoprSessionReactor for HoprServerIpForwardingReactor {
49 #[tracing::instrument(level = "debug", skip(self, session))]
50 async fn process(&self, mut session: hopr_lib::HoprIncomingSession) -> hopr_lib::errors::Result<()> {
51 let session_id = *session.session.id();
52 match session.target {
53 hopr_lib::SessionTarget::UdpStream(udp_target) => {
54 let kp = self.keypair.clone();
55 let udp_target = hopr_parallelize::cpu::spawn_blocking(move || udp_target.unseal(&kp))
56 .await
57 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
58
59 tracing::debug!(
60 session_id = ?session_id,
61 %udp_target,
62 "binding socket to the UDP server"
63 );
64
65 let resolved_udp_target = udp_target
68 .clone()
69 .resolve_tokio()
70 .await
71 .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
72 .first()
73 .ok_or(HoprLibError::GeneralError(format!(
74 "failed to resolve DNS name {udp_target}"
75 )))?
76 .to_owned();
77 tracing::debug!(
78 ?session_id,
79 %udp_target,
80 resolution = ?resolved_udp_target,
81 "UDP target resolved"
82 );
83
84 if !self.all_ips_allowed(&[resolved_udp_target]) {
85 return Err(HoprLibError::GeneralError(format!(
86 "denied target address {resolved_udp_target}"
87 )));
88 }
89
90 let mut udp_bridge = hopr_network_types::udp::ConnectedUdpStream::builder()
91 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
92 .with_counterparty(resolved_udp_target)
93 .with_foreign_data_mode(ForeignDataMode::Error)
94 .with_queue_size(HOPR_UDP_QUEUE_SIZE)
95 .with_receiver_parallelism(
96 std::env::var("HOPRD_SESSION_EXIT_UDP_RX_PARALLELISM")
97 .ok()
98 .and_then(|s| s.parse::<NonZeroUsize>().ok())
99 .map(UdpStreamParallelism::Specific)
100 .unwrap_or(UdpStreamParallelism::Auto),
101 )
102 .build(("0.0.0.0", 0))
103 .map_err(|e| {
104 HoprLibError::GeneralError(format!(
105 "could not bridge the incoming session to {udp_target}: {e}"
106 ))
107 })?;
108
109 tracing::debug!(
110 ?session_id,
111 %udp_target,
112 "bridging the session to the UDP server"
113 );
114
115 tokio::task::spawn(async move {
116 #[cfg(all(feature = "prometheus", not(test)))]
117 METRIC_ACTIVE_TARGETS.increment(&["udp"], 1.0);
118
119 match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE, None).await {
122 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
123 ?session_id,
124 session_to_stream_bytes,
125 stream_to_session_bytes,
126 %udp_target,
127 "server bridged session to UDP ended"
128 ),
129 Err(e) => tracing::error!(
130 ?session_id,
131 %udp_target,
132 error = %e,
133 "UDP server stream is closed"
134 ),
135 }
136
137 #[cfg(all(feature = "prometheus", not(test)))]
138 METRIC_ACTIVE_TARGETS.decrement(&["udp"], 1.0);
139 });
140
141 Ok(())
142 }
143 hopr_lib::SessionTarget::TcpStream(tcp_target) => {
144 let kp = self.keypair.clone();
145 let tcp_target = hopr_parallelize::cpu::spawn_blocking(move || tcp_target.unseal(&kp))
146 .await
147 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
148
149 tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
150
151 let resolved_tcp_targets =
154 tcp_target.clone().resolve_tokio().await.map_err(|e| {
155 HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
156 })?;
157 tracing::debug!(
158 ?session_id,
159 %tcp_target,
160 resolution = ?resolved_tcp_targets,
161 "TCP target resolved"
162 );
163
164 if !self.all_ips_allowed(&resolved_tcp_targets) {
165 return Err(HoprLibError::GeneralError(format!(
166 "denied target address {resolved_tcp_targets:?}"
167 )));
168 }
169
170 let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
171 .take(self.cfg.max_tcp_target_retries as usize);
172
173 let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
174 tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
175 })
176 .await
177 .map_err(|e| {
178 HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
179 })?;
180
181 tcp_bridge.set_nodelay(true).map_err(|e| {
182 HoprLibError::GeneralError(format!(
183 "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
184 ))
185 })?;
186
187 tracing::debug!(
188 ?session_id,
189 %tcp_target,
190 "bridging the session to the TCP server"
191 );
192 tokio::task::spawn(async move {
193 #[cfg(all(feature = "prometheus", not(test)))]
194 METRIC_ACTIVE_TARGETS.increment(&["tcp"], 1.0);
195
196 match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE, None).await {
197 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
198 ?session_id,
199 session_to_stream_bytes,
200 stream_to_session_bytes,
201 %tcp_target,
202 "server bridged session to TCP ended"
203 ),
204 Err(error) => tracing::error!(
205 ?session_id,
206 %tcp_target,
207 %error,
208 "TCP server stream is closed"
209 ),
210 }
211
212 #[cfg(all(feature = "prometheus", not(test)))]
213 METRIC_ACTIVE_TARGETS.decrement(&["tcp"], 1.0);
214 });
215
216 Ok(())
217 }
218 hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
219 tracing::debug!(?session_id, "bridging the session to the loopback service");
220 let (mut reader, mut writer) = tokio::io::split(session.session);
221
222 #[cfg(all(feature = "prometheus", not(test)))]
223 METRIC_ACTIVE_TARGETS.increment(&["loopback"], 1.0);
224
225 match tokio::io::copy(&mut reader, &mut writer).await {
227 Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
228 Err(error) => tracing::error!(
229 ?session_id,
230 %error,
231 "server loopback session service ended with an error"
232 ),
233 }
234
235 #[cfg(all(feature = "prometheus", not(test)))]
236 METRIC_ACTIVE_TARGETS.decrement(&["loopback"], 1.0);
237
238 Ok(())
239 }
240 hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
241 "server does not support internal session processing".into(),
242 )),
243 }
244 }
245}