Skip to main content

hoprd/
exit.rs

1use std::{net::SocketAddr, num::NonZeroUsize};
2
3use hopr_lib::{
4    OffchainKeypair, ServiceId,
5    errors::HoprLibError,
6    prelude::{ConnectedUdpStream, ForeignDataMode, UdpStreamParallelism},
7    transfer_session,
8};
9use hoprd_api::{HOPR_TCP_BUFFER_SIZE, HOPR_UDP_BUFFER_SIZE, HOPR_UDP_QUEUE_SIZE};
10
11use crate::config::SessionIpForwardingConfig;
12
// Gauge tracking the target connections currently bridged by this Exit node.
// It carries a single `type` label; this file increments it under the label
// values "udp", "tcp" and "loopback" via `MultiGaugeGuard`.
// Compiled only when the `prometheus` feature is enabled and not under test.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_target_connections",
        "Number of currently active HOPR session target connections on this Exit node",
        &["type"]
    ).unwrap();
}
21
/// Implementation of [`hopr_lib::traits::session::HoprSessionServer`] that facilitates
/// bridging of TCP or UDP sockets from the Session Exit node to a destination.
///
/// Besides TCP and UDP targets, it also serves the loopback service
/// identified by [`SERVICE_ID_LOOPBACK`].
#[derive(Debug, Clone)]
pub struct HoprServerIpForwardingReactor {
    // Key pair used to unseal the (sealed) TCP/UDP target of an incoming session.
    keypair: OffchainKeypair,
    // Forwarding configuration: the target allow list and the TCP connection retry settings.
    cfg: SessionIpForwardingConfig,
}
29
30impl HoprServerIpForwardingReactor {
31    pub fn new(keypair: OffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
32        Self { keypair, cfg }
33    }
34
35    fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
36        if self.cfg.use_target_allow_list {
37            for addr in addrs {
38                if !self.cfg.target_allow_list.contains(addr) {
39                    tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
40                    return false;
41                }
42                tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
43            }
44        }
45        true
46    }
47}
48
/// Service ID of the loopback service on the Exit node.
///
/// A session targeting this service has everything read from it
/// copied straight back into the same session.
pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
50
51#[async_trait::async_trait]
52impl hopr_lib::traits::session::HoprSessionServer for HoprServerIpForwardingReactor {
53    #[tracing::instrument(level = "debug", skip(self, session))]
54    async fn process(
55        &self,
56        mut session: hopr_lib::exports::transport::IncomingSession,
57    ) -> hopr_lib::errors::Result<()> {
58        let session_id = *session.session.id();
59        match session.target {
60            hopr_lib::SessionTarget::UdpStream(udp_target) => {
61                let kp = self.keypair.clone();
62                let udp_target =
63                    hopr_lib::utils::parallelize::cpu::spawn_blocking(move || udp_target.unseal(&kp), "udp_unseal")
64                        .await
65                        .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
66                        .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
67
68                tracing::debug!(
69                    session_id = ?session_id,
70                    %udp_target,
71                    "binding socket to the UDP server"
72                );
73
74                // In UDP, it is impossible to determine if the target is viable,
75                // so we just take the first resolved address.
76                let resolved_udp_target = udp_target
77                    .clone()
78                    .resolve_tokio()
79                    .await
80                    .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
81                    .first()
82                    .ok_or(HoprLibError::GeneralError(format!(
83                        "failed to resolve DNS name {udp_target}"
84                    )))?
85                    .to_owned();
86                tracing::debug!(
87                    ?session_id,
88                    %udp_target,
89                    resolution = ?resolved_udp_target,
90                    "UDP target resolved"
91                );
92
93                if !self.all_ips_allowed(&[resolved_udp_target]) {
94                    return Err(HoprLibError::GeneralError(format!(
95                        "denied target address {resolved_udp_target}"
96                    )));
97                }
98
99                let mut udp_bridge = ConnectedUdpStream::builder()
100                    .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
101                    .with_counterparty(resolved_udp_target)
102                    .with_foreign_data_mode(ForeignDataMode::Error)
103                    .with_queue_size(HOPR_UDP_QUEUE_SIZE)
104                    .with_receiver_parallelism(
105                        std::env::var("HOPRD_SESSION_EXIT_UDP_RX_PARALLELISM")
106                            .ok()
107                            .and_then(|s| s.parse::<NonZeroUsize>().ok())
108                            .map(UdpStreamParallelism::Specific)
109                            .unwrap_or(UdpStreamParallelism::Auto),
110                    )
111                    .build(("0.0.0.0", 0))
112                    .map_err(|e| {
113                        HoprLibError::GeneralError(format!(
114                            "could not bridge the incoming session to {udp_target}: {e}"
115                        ))
116                    })?;
117
118                tracing::debug!(
119                    ?session_id,
120                    %udp_target,
121                    "bridging the session to the UDP server"
122                );
123
124                tokio::task::spawn(async move {
125                    #[cfg(all(feature = "prometheus", not(test)))]
126                    let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["udp"], 1.0);
127
128                    // The Session forwards the termination to the udp_bridge, terminating
129                    // the UDP socket.
130                    match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE, None).await {
131                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
132                            ?session_id,
133                            session_to_stream_bytes,
134                            stream_to_session_bytes,
135                            %udp_target,
136                            "server bridged session to UDP ended"
137                        ),
138                        Err(e) => tracing::error!(
139                            ?session_id,
140                            %udp_target,
141                            error = %e,
142                            "UDP server stream is closed"
143                        ),
144                    }
145                });
146
147                Ok(())
148            }
149            hopr_lib::SessionTarget::TcpStream(tcp_target) => {
150                let kp = self.keypair.clone();
151                let tcp_target =
152                    hopr_lib::utils::parallelize::cpu::spawn_blocking(move || tcp_target.unseal(&kp), "tcp_unseal")
153                        .await
154                        .map_err(|e| HoprLibError::GeneralError(format!("failed to spawn unseal task: {e}")))?
155                        .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
156
157                tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
158
159                // TCP is able to determine which of the resolved multiple addresses is viable,
160                // and therefore we can pass all of them.
161                let resolved_tcp_targets =
162                    tcp_target.clone().resolve_tokio().await.map_err(|e| {
163                        HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
164                    })?;
165                tracing::debug!(
166                    ?session_id,
167                    %tcp_target,
168                    resolution = ?resolved_tcp_targets,
169                    "TCP target resolved"
170                );
171
172                if !self.all_ips_allowed(&resolved_tcp_targets) {
173                    return Err(HoprLibError::GeneralError(format!(
174                        "denied target address {resolved_tcp_targets:?}"
175                    )));
176                }
177
178                let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
179                    .take(self.cfg.max_tcp_target_retries as usize);
180
181                let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
182                    tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
183                })
184                .await
185                .map_err(|e| {
186                    HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
187                })?;
188
189                tcp_bridge.set_nodelay(true).map_err(|e| {
190                    HoprLibError::GeneralError(format!(
191                        "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
192                    ))
193                })?;
194
195                tracing::debug!(
196                    ?session_id,
197                    %tcp_target,
198                    "bridging the session to the TCP server"
199                );
200
201                tokio::task::spawn(async move {
202                    #[cfg(all(feature = "prometheus", not(test)))]
203                    let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["tcp"], 1.0);
204
205                    match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE, None).await {
206                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
207                            ?session_id,
208                            session_to_stream_bytes,
209                            stream_to_session_bytes,
210                            %tcp_target,
211                            "server bridged session to TCP ended"
212                        ),
213                        Err(error) => tracing::error!(
214                            ?session_id,
215                            %tcp_target,
216                            %error,
217                            "TCP server stream is closed"
218                        ),
219                    }
220                });
221
222                Ok(())
223            }
224            hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
225                tracing::debug!(?session_id, "bridging the session to the loopback service");
226                let (mut reader, mut writer) = tokio::io::split(session.session);
227
228                #[cfg(all(feature = "prometheus", not(test)))]
229                let _g = hopr_metrics::MultiGaugeGuard::new(&METRIC_ACTIVE_TARGETS, &["loopback"], 1.0);
230
231                // Uses 4 kB buffer for copying
232                match tokio::io::copy(&mut reader, &mut writer).await {
233                    Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
234                    Err(error) => tracing::error!(
235                        ?session_id,
236                        %error,
237                        "server loopback session service ended with an error"
238                    ),
239                }
240
241                Ok(())
242            }
243            hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
244                "server does not support internal session processing".into(),
245            )),
246        }
247    }
248}