Skip to main content

hopr_reference/
exit.rs

1use std::{net::SocketAddr, num::NonZeroUsize};
2
3use hopr_lib::{
4    OffchainKeypair, ServiceId,
5    errors::HoprLibError,
6    prelude::{ConnectedUdpStream, ForeignDataMode, UdpStreamParallelism},
7    transfer_session,
8};
9
10use crate::config::SessionIpForwardingConfig;
11
// Compiled only when the `telemetry` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Gauge of the currently active session target connections, labelled by `type`.
    /// The label values used in this file are "udp", "tcp" and "loopback".
    // The `unwrap` below panics if the gauge cannot be created.
    static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_target_connections",
        "Number of currently active HOPR session target connections on this Exit node",
        &["type"]
    ).unwrap();
}
20
/// Size of the buffer for forwarding data to/from a TCP stream.
///
/// Passed to `transfer_session` when a session is bridged to a TCP target.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Size of the buffer for forwarding data to/from a UDP stream.
///
/// Used both as the buffer size of the `ConnectedUdpStream` builder and as the
/// buffer size passed to `transfer_session` when a session is bridged to a UDP target.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Size of the queue (back-pressure) for data incoming from a UDP stream.
///
/// Passed to the `ConnectedUdpStream` builder via `with_queue_size`.
// NOTE(review): the unit of this value (datagrams vs. bytes) is defined by
// `ConnectedUdpStream` and is not visible here — confirm against its documentation.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
29
30/// Implementation of `HoprSessionServer` that facilitates
31/// bridging of TCP or UDP sockets from the Session Exit node to a destination.
32#[derive(Debug, Clone)]
33pub struct HoprServerIpForwardingReactor {
34    keypair: OffchainKeypair,
35    cfg: SessionIpForwardingConfig,
36}
37
38impl HoprServerIpForwardingReactor {
39    pub fn new(keypair: OffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
40        Self { keypair, cfg }
41    }
42
43    fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
44        if self.cfg.use_target_allow_list {
45            for addr in addrs {
46                if !self.cfg.target_allow_list.contains(addr) {
47                    tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
48                    return false;
49                }
50                tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
51            }
52        }
53        true
54    }
55}
56
/// Service ID of the loopback service: for a session targeting `SessionTarget::ExitNode`
/// with this ID, the session server copies everything read from the session back into it.
pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
58
59#[async_trait::async_trait]
60impl hopr_lib::api::node::HoprSessionServer for HoprServerIpForwardingReactor {
61    type Error = hopr_lib::errors::HoprLibError;
62    type Session = hopr_lib::exports::transport::IncomingSession;
63
64    #[tracing::instrument(level = "debug", skip(self, session))]
65    async fn process(
66        &self,
67        mut session: hopr_lib::exports::transport::IncomingSession,
68    ) -> Result<(), hopr_lib::errors::HoprLibError> {
69        let session_id = *session.session.id();
70        match session.target {
71            hopr_lib::SessionTarget::UdpStream(udp_target) => {
72                let kp = self.keypair.clone();
73                let udp_target = hopr_lib::utils::hopr_parallelize::cpu::spawn_blocking(
74                    move || udp_target.unseal(&kp),
75                    "udp_unseal",
76                )
77                .await
78                .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
79                .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
80
81                tracing::debug!(
82                    session_id = ?session_id,
83                    %udp_target,
84                    "binding socket to the UDP server"
85                );
86
87                // In UDP, it is impossible to determine if the target is viable,
88                // so we just take the first resolved address.
89                let resolved_udp_target = udp_target
90                    .clone()
91                    .resolve_tokio()
92                    .await
93                    .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
94                    .first()
95                    .ok_or(HoprLibError::GeneralError(format!(
96                        "failed to resolve DNS name {udp_target}"
97                    )))?
98                    .to_owned();
99                tracing::debug!(
100                    ?session_id,
101                    %udp_target,
102                    resolution = ?resolved_udp_target,
103                    "UDP target resolved"
104                );
105
106                if !self.all_ips_allowed(&[resolved_udp_target]) {
107                    return Err(HoprLibError::GeneralError(format!(
108                        "denied target address {resolved_udp_target}"
109                    )));
110                }
111
112                let mut udp_bridge = ConnectedUdpStream::builder()
113                    .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
114                    .with_counterparty(resolved_udp_target)
115                    .with_foreign_data_mode(ForeignDataMode::Error)
116                    .with_queue_size(HOPR_UDP_QUEUE_SIZE)
117                    .with_receiver_parallelism(
118                        std::env::var("HOPRD_SESSION_EXIT_UDP_RX_PARALLELISM")
119                            .ok()
120                            .and_then(|s| s.parse::<NonZeroUsize>().ok())
121                            .map(UdpStreamParallelism::Specific)
122                            .unwrap_or(UdpStreamParallelism::Auto),
123                    )
124                    .build(("0.0.0.0", 0))
125                    .map_err(|e| {
126                        HoprLibError::GeneralError(format!(
127                            "could not bridge the incoming session to {udp_target}: {e}"
128                        ))
129                    })?;
130
131                tracing::debug!(
132                    ?session_id,
133                    %udp_target,
134                    "bridging the session to the UDP server"
135                );
136
137                tokio::task::spawn(async move {
138                    #[cfg(all(feature = "telemetry", not(test)))]
139                    let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["udp"], 1.0);
140
141                    // The Session forwards the termination to the udp_bridge, terminating
142                    // the UDP socket.
143                    match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE, None).await {
144                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
145                            ?session_id,
146                            session_to_stream_bytes,
147                            stream_to_session_bytes,
148                            %udp_target,
149                            "server bridged session to UDP ended"
150                        ),
151                        Err(e) => tracing::error!(
152                            ?session_id,
153                            %udp_target,
154                            error = %e,
155                            "UDP server stream is closed"
156                        ),
157                    }
158                });
159
160                Ok(())
161            }
162            hopr_lib::SessionTarget::TcpStream(tcp_target) => {
163                let kp = self.keypair.clone();
164                let tcp_target = hopr_lib::utils::hopr_parallelize::cpu::spawn_blocking(
165                    move || tcp_target.unseal(&kp),
166                    "tcp_unseal",
167                )
168                .await
169                .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
170                .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
171
172                tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
173
174                // TCP is able to determine which of the resolved multiple addresses is viable,
175                // and therefore we can pass all of them.
176                let resolved_tcp_targets =
177                    tcp_target.clone().resolve_tokio().await.map_err(|e| {
178                        HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
179                    })?;
180                tracing::debug!(
181                    ?session_id,
182                    %tcp_target,
183                    resolution = ?resolved_tcp_targets,
184                    "TCP target resolved"
185                );
186
187                if !self.all_ips_allowed(&resolved_tcp_targets) {
188                    return Err(HoprLibError::GeneralError(format!(
189                        "denied target address {resolved_tcp_targets:?}"
190                    )));
191                }
192
193                let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
194                    .take(self.cfg.max_tcp_target_retries as usize);
195
196                let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
197                    tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
198                })
199                .await
200                .map_err(|e| {
201                    HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
202                })?;
203
204                tcp_bridge.set_nodelay(true).map_err(|e| {
205                    HoprLibError::GeneralError(format!(
206                        "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
207                    ))
208                })?;
209
210                tracing::debug!(
211                    ?session_id,
212                    %tcp_target,
213                    "bridging the session to the TCP server"
214                );
215
216                tokio::task::spawn(async move {
217                    #[cfg(all(feature = "telemetry", not(test)))]
218                    let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["tcp"], 1.0);
219
220                    match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE, None).await {
221                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
222                            ?session_id,
223                            session_to_stream_bytes,
224                            stream_to_session_bytes,
225                            %tcp_target,
226                            "server bridged session to TCP ended"
227                        ),
228                        Err(error) => tracing::error!(
229                            ?session_id,
230                            %tcp_target,
231                            %error,
232                            "TCP server stream is closed"
233                        ),
234                    }
235                });
236
237                Ok(())
238            }
239            hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
240                tracing::debug!(?session_id, "bridging the session to the loopback service");
241                let (mut reader, mut writer) = tokio::io::split(session.session);
242
243                #[cfg(all(feature = "telemetry", not(test)))]
244                let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["loopback"], 1.0);
245
246                // Uses 4 kB buffer for copying
247                match tokio::io::copy(&mut reader, &mut writer).await {
248                    Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
249                    Err(error) => tracing::error!(
250                        ?session_id,
251                        %error,
252                        "server loopback session service ended with an error"
253                    ),
254                }
255
256                Ok(())
257            }
258            hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
259                "server does not support internal session processing".into(),
260            )),
261        }
262    }
263}