1use hopr_lib::errors::HoprLibError;
2use hopr_lib::{transfer_session, HoprOffchainKeypair, ServiceId};
3use hopr_network_types::prelude::ForeignDataMode;
4use hopr_network_types::udp::UdpStreamParallelism;
5use hoprd_api::{HOPR_TCP_BUFFER_SIZE, HOPR_UDP_BUFFER_SIZE, HOPR_UDP_QUEUE_SIZE};
6use std::net::SocketAddr;
7
8use crate::config::SessionIpForwardingConfig;
9
10#[cfg(all(feature = "prometheus", not(test)))]
11lazy_static::lazy_static! {
12 static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
13 "hopr_session_hoprd_target_connections",
14 "Number of currently active HOPR session target connections on this Exit node",
15 &["type"]
16 ).unwrap();
17}
18
19#[derive(Debug, Clone)]
22pub struct HoprServerIpForwardingReactor {
23 keypair: HoprOffchainKeypair,
24 cfg: SessionIpForwardingConfig,
25}
26
27impl HoprServerIpForwardingReactor {
28 pub fn new(keypair: HoprOffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
29 Self { keypair, cfg }
30 }
31
32 fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
33 if self.cfg.use_target_allow_list {
34 for addr in addrs {
35 if !self.cfg.target_allow_list.contains(addr) {
36 tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
37 return false;
38 }
39 tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
40 }
41 }
42 true
43 }
44}
45
46pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
47
48#[hopr_lib::async_trait]
49impl hopr_lib::HoprSessionReactor for HoprServerIpForwardingReactor {
50 #[tracing::instrument(level = "debug", skip(self, session))]
51 async fn process(&self, mut session: hopr_lib::HoprIncomingSession) -> hopr_lib::errors::Result<()> {
52 let session_id = *session.session.id();
53 match session.target {
54 hopr_lib::SessionTarget::UdpStream(udp_target) => {
55 let udp_target = udp_target
56 .unseal(&self.keypair)
57 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
58
59 tracing::debug!(
60 session_id = ?session_id,
61 %udp_target,
62 "binding socket to the UDP server"
63 );
64
65 let resolved_udp_target = udp_target
68 .clone()
69 .resolve_tokio()
70 .await
71 .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
72 .first()
73 .ok_or(HoprLibError::GeneralError(format!(
74 "failed to resolve DNS name {udp_target}"
75 )))?
76 .to_owned();
77 tracing::debug!(
78 ?session_id,
79 %udp_target,
80 resolution = ?resolved_udp_target,
81 "UDP target resolved"
82 );
83
84 if !self.all_ips_allowed(&[resolved_udp_target]) {
85 return Err(HoprLibError::GeneralError(format!(
86 "denied target address {resolved_udp_target}"
87 )));
88 }
89
90 let mut udp_bridge = hopr_network_types::udp::ConnectedUdpStream::builder()
91 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
92 .with_counterparty(resolved_udp_target)
93 .with_foreign_data_mode(ForeignDataMode::Error)
94 .with_queue_size(HOPR_UDP_QUEUE_SIZE)
95 .with_receiver_parallelism(UdpStreamParallelism::Auto)
96 .build(("0.0.0.0", 0))
97 .map_err(|e| {
98 HoprLibError::GeneralError(format!(
99 "could not bridge the incoming session to {udp_target}: {e}"
100 ))
101 })?;
102
103 tracing::debug!(
104 ?session_id,
105 %udp_target,
106 "bridging the session to the UDP server"
107 );
108
109 tokio::task::spawn(async move {
110 #[cfg(all(feature = "prometheus", not(test)))]
111 METRIC_ACTIVE_TARGETS.increment(&["udp"], 1.0);
112
113 match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE).await {
114 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
115 ?session_id,
116 session_to_stream_bytes,
117 stream_to_session_bytes,
118 %udp_target,
119 "server bridged session to UDP ended"
120 ),
121 Err(e) => tracing::error!(
122 ?session_id,
123 %udp_target,
124 error = %e,
125 "UDP server stream is closed"
126 ),
127 }
128
129 #[cfg(all(feature = "prometheus", not(test)))]
130 METRIC_ACTIVE_TARGETS.decrement(&["udp"], 1.0);
131 });
132
133 Ok(())
134 }
135 hopr_lib::SessionTarget::TcpStream(tcp_target) => {
136 let tcp_target = tcp_target
137 .unseal(&self.keypair)
138 .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
139
140 tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
141
142 let resolved_tcp_targets =
145 tcp_target.clone().resolve_tokio().await.map_err(|e| {
146 HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
147 })?;
148 tracing::debug!(
149 ?session_id,
150 %tcp_target,
151 resolution = ?resolved_tcp_targets,
152 "TCP target resolved"
153 );
154
155 if !self.all_ips_allowed(&resolved_tcp_targets) {
156 return Err(HoprLibError::GeneralError(format!(
157 "denied target address {resolved_tcp_targets:?}"
158 )));
159 }
160
161 let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
162 .take(self.cfg.max_tcp_target_retries as usize);
163
164 let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
165 tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
166 })
167 .await
168 .map_err(|e| {
169 HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
170 })?;
171
172 tcp_bridge.set_nodelay(true).map_err(|e| {
173 HoprLibError::GeneralError(format!(
174 "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
175 ))
176 })?;
177
178 tracing::debug!(
179 ?session_id,
180 %tcp_target,
181 "bridging the session to the TCP server"
182 );
183 tokio::task::spawn(async move {
184 #[cfg(all(feature = "prometheus", not(test)))]
185 METRIC_ACTIVE_TARGETS.increment(&["tcp"], 1.0);
186
187 match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE).await {
188 Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
189 ?session_id,
190 session_to_stream_bytes,
191 stream_to_session_bytes,
192 %tcp_target,
193 "server bridged session to TCP ended"
194 ),
195 Err(error) => tracing::error!(
196 ?session_id,
197 %tcp_target,
198 %error,
199 "TCP server stream is closed"
200 ),
201 }
202
203 #[cfg(all(feature = "prometheus", not(test)))]
204 METRIC_ACTIVE_TARGETS.decrement(&["tcp"], 1.0);
205 });
206
207 Ok(())
208 }
209 hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
210 tracing::debug!(?session_id, "bridging the session to the loopback service");
211 let (mut reader, mut writer) = tokio::io::split(session.session);
212
213 #[cfg(all(feature = "prometheus", not(test)))]
214 METRIC_ACTIVE_TARGETS.increment(&["loopback"], 1.0);
215
216 match tokio::io::copy(&mut reader, &mut writer).await {
218 Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
219 Err(error) => tracing::error!(
220 ?session_id,
221 %error,
222 "server loopback session service ended with an error"
223 ),
224 }
225
226 #[cfg(all(feature = "prometheus", not(test)))]
227 METRIC_ACTIVE_TARGETS.decrement(&["loopback"], 1.0);
228
229 Ok(())
230 }
231 hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
232 "server does not support internal session processing".into(),
233 )),
234 }
235 }
236}