1use std::{net::SocketAddr, num::NonZeroUsize};
2
3use hopr_lib::{
4 OffchainKeypair, ServiceId,
5 errors::HoprLibError,
6 prelude::{ConnectedUdpStream, ForeignDataMode, UdpStreamParallelism},
7 transfer_session,
8};
9use hoprd_api::{HOPR_TCP_BUFFER_SIZE, HOPR_UDP_BUFFER_SIZE, HOPR_UDP_QUEUE_SIZE};
10
11use crate::config::SessionIpForwardingConfig;
12
// Gauge counting the currently active session target connections, labelled by "type".
// The `process` implementation in this file registers guards with the label values
// "udp", "tcp" and "loopback".
//
// Only compiled when the `prometheus` feature is enabled and never in test builds.
// The `unwrap` means a failure to create the gauge panics on first access of the static.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_target_connections",
        "Number of currently active HOPR session target connections on this Exit node",
        &["type"]
    ).unwrap();
}
21
/// Server-side reactor that bridges incoming HOPR sessions to the targets they request.
///
/// The `HoprSessionServer` implementation in this file handles UDP targets, TCP targets
/// and the loopback service ([`SERVICE_ID_LOOPBACK`]).
#[derive(Debug, Clone)]
pub struct HoprServerIpForwardingReactor {
    /// Off-chain keypair used to unseal the UDP/TCP target of an incoming session.
    keypair: OffchainKeypair,
    /// Forwarding configuration; supplies the target allow list and the TCP connection
    /// retry delay and retry count.
    cfg: SessionIpForwardingConfig,
}
29
30impl HoprServerIpForwardingReactor {
31 pub fn new(keypair: OffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
32 Self { keypair, cfg }
33 }
34
35 fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
36 if self.cfg.use_target_allow_list {
37 for addr in addrs {
38 if !self.cfg.target_allow_list.contains(addr) {
39 tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
40 return false;
41 }
42 tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
43 }
44 }
45 true
46 }
47}
48
/// Service ID of the loopback service: a session targeting `ExitNode(SERVICE_ID_LOOPBACK)`
/// has everything read from it copied back into the same session.
pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
50
51#[async_trait::async_trait]
52impl hopr_lib::traits::session::HoprSessionServer for HoprServerIpForwardingReactor {
53 #[tracing::instrument(level = "debug", skip(self, session))]
54 async fn process(
55 &self,
56 mut session: hopr_lib::exports::transport::IncomingSession,
57 ) -> hopr_lib::errors::Result<()> {
58 let session_id = *session.session.id();
59 match session.target {
60 hopr_lib::SessionTarget::UdpStream(udp_target) => {
61 let kp = self.keypair.clone();
62 let udp_target =
63 hopr_lib::utils::parallelize::cpu::spawn_blocking(move || udp_target.unseal(&kp), "udp_unseal")
64 .await
65 .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
66 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
67
68 tracing::debug!(
69 session_id = ?session_id,
70 %udp_target,
71 "binding socket to the UDP server"
72 );
73
74 let resolved_udp_target = udp_target
77 .clone()
78 .resolve_tokio()
79 .await
80 .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
81 .first()
82 .ok_or(HoprLibError::GeneralError(format!(
83 "failed to resolve DNS name {udp_target}"
84 )))?
85 .to_owned();
86 tracing::debug!(
87 ?session_id,
88 %udp_target,
89 resolution = ?resolved_udp_target,
90 "UDP target resolved"
91 );
92
93 if !self.all_ips_allowed(&[resolved_udp_target]) {
94 return Err(HoprLibError::GeneralError(format!(
95 "denied target address {resolved_udp_target}"
96 )));
97 }
98
99 let mut udp_bridge = ConnectedUdpStream::builder()
100 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
101 .with_counterparty(resolved_udp_target)
102 .with_foreign_data_mode(ForeignDataMode::Error)
103 .with_queue_size(HOPR_UDP_QUEUE_SIZE)
104 .with_receiver_parallelism(
105 std::env::var("HOPRD_SESSION_EXIT_UDP_RX_PARALLELISM")
106 .ok()
107 .and_then(|s| s.parse::<NonZeroUsize>().ok())
108 .map(UdpStreamParallelism::Specific)
109 .unwrap_or(UdpStreamParallelism::Auto),
110 )
111 .build(("0.0.0.0", 0))
112 .map_err(|e| {
113 HoprLibError::GeneralError(format!(
114 "could not bridge the incoming session to {udp_target}: {e}"
115 ))
116 })?;
117
118 tracing::debug!(
119 ?session_id,
120 %udp_target,
121 "bridging the session to the UDP server"
122 );
123
124 tokio::task::spawn(async move {
125 #[cfg(all(feature = "prometheus", not(test)))]
126 let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["udp"], 1.0);
127
128 match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE, None).await {
131 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
132 ?session_id,
133 session_to_stream_bytes,
134 stream_to_session_bytes,
135 %udp_target,
136 "server bridged session to UDP ended"
137 ),
138 Err(e) => tracing::error!(
139 ?session_id,
140 %udp_target,
141 error = %e,
142 "UDP server stream is closed"
143 ),
144 }
145 });
146
147 Ok(())
148 }
149 hopr_lib::SessionTarget::TcpStream(tcp_target) => {
150 let kp = self.keypair.clone();
151 let tcp_target =
152 hopr_lib::utils::parallelize::cpu::spawn_blocking(move || tcp_target.unseal(&kp), "tcp_unseal")
153 .await
154 .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
155 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
156
157 tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
158
159 let resolved_tcp_targets =
162 tcp_target.clone().resolve_tokio().await.map_err(|e| {
163 HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
164 })?;
165 tracing::debug!(
166 ?session_id,
167 %tcp_target,
168 resolution = ?resolved_tcp_targets,
169 "TCP target resolved"
170 );
171
172 if !self.all_ips_allowed(&resolved_tcp_targets) {
173 return Err(HoprLibError::GeneralError(format!(
174 "denied target address {resolved_tcp_targets:?}"
175 )));
176 }
177
178 let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
179 .take(self.cfg.max_tcp_target_retries as usize);
180
181 let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
182 tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
183 })
184 .await
185 .map_err(|e| {
186 HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
187 })?;
188
189 tcp_bridge.set_nodelay(true).map_err(|e| {
190 HoprLibError::GeneralError(format!(
191 "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
192 ))
193 })?;
194
195 tracing::debug!(
196 ?session_id,
197 %tcp_target,
198 "bridging the session to the TCP server"
199 );
200
201 tokio::task::spawn(async move {
202 #[cfg(all(feature = "prometheus", not(test)))]
203 let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["tcp"], 1.0);
204
205 match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE, None).await {
206 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
207 ?session_id,
208 session_to_stream_bytes,
209 stream_to_session_bytes,
210 %tcp_target,
211 "server bridged session to TCP ended"
212 ),
213 Err(error) => tracing::error!(
214 ?session_id,
215 %tcp_target,
216 %error,
217 "TCP server stream is closed"
218 ),
219 }
220 });
221
222 Ok(())
223 }
224 hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
225 tracing::debug!(?session_id, "bridging the session to the loopback service");
226 let (mut reader, mut writer) = tokio::io::split(session.session);
227
228 #[cfg(all(feature = "prometheus", not(test)))]
229 let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["loopback"], 1.0);
230
231 match tokio::io::copy(&mut reader, &mut writer).await {
233 Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
234 Err(error) => tracing::error!(
235 ?session_id,
236 %error,
237 "server loopback session service ended with an error"
238 ),
239 }
240
241 Ok(())
242 }
243 hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
244 "server does not support internal session processing".into(),
245 )),
246 }
247 }
248}