hoprd/
exit.rs

1use std::{net::SocketAddr, num::NonZeroUsize};
2
3use hopr_lib::{HoprOffchainKeypair, ServiceId, errors::HoprLibError, transfer_session};
4use hopr_network_types::{prelude::ForeignDataMode, udp::UdpStreamParallelism};
5use hoprd_api::{HOPR_TCP_BUFFER_SIZE, HOPR_UDP_BUFFER_SIZE, HOPR_UDP_QUEUE_SIZE};
6
7use crate::config::SessionIpForwardingConfig;
8
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge counting the target connections currently open from this Exit node.
    ///
    /// The single `type` label distinguishes the kind of target; this file
    /// reports the values `udp`, `tcp` and `loopback`.
    static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = {
        let gauge = hopr_metrics::MultiGauge::new(
            "hopr_session_hoprd_target_connections",
            "Number of currently active HOPR session target connections on this Exit node",
            &["type"],
        );
        // Creating the metric is expected to succeed; a failure aborts on first use.
        gauge.unwrap()
    };
}
17
18/// Implementation of [`hopr_lib::HoprSessionReactor`] that facilitates
19/// bridging of TCP or UDP sockets from the Session Exit node to a destination.
20#[derive(Debug, Clone)]
21pub struct HoprServerIpForwardingReactor {
22    keypair: HoprOffchainKeypair,
23    cfg: SessionIpForwardingConfig,
24}
25
26impl HoprServerIpForwardingReactor {
27    pub fn new(keypair: HoprOffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
28        Self { keypair, cfg }
29    }
30
31    fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
32        if self.cfg.use_target_allow_list {
33            for addr in addrs {
34                if !self.cfg.target_allow_list.contains(addr) {
35                    tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
36                    return false;
37                }
38                tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
39            }
40        }
41        true
42    }
43}
44
45pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
46
47#[hopr_lib::async_trait]
48impl hopr_lib::HoprSessionReactor for HoprServerIpForwardingReactor {
49    #[tracing::instrument(level = "debug", skip(self, session))]
50    async fn process(&self, mut session: hopr_lib::HoprIncomingSession) -> hopr_lib::errors::Result<()> {
51        let session_id = *session.session.id();
52        match session.target {
53            hopr_lib::SessionTarget::UdpStream(udp_target) => {
54                let kp = self.keypair.clone();
55                let udp_target = hopr_parallelize::cpu::spawn_blocking(move || udp_target.unseal(&kp))
56                    .await
57                    .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
58
59                tracing::debug!(
60                    session_id = ?session_id,
61                    %udp_target,
62                    "binding socket to the UDP server"
63                );
64
65                // In UDP, it is impossible to determine if the target is viable,
66                // so we just take the first resolved address.
67                let resolved_udp_target = udp_target
68                    .clone()
69                    .resolve_tokio()
70                    .await
71                    .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
72                    .first()
73                    .ok_or(HoprLibError::GeneralError(format!(
74                        "failed to resolve DNS name {udp_target}"
75                    )))?
76                    .to_owned();
77                tracing::debug!(
78                    ?session_id,
79                    %udp_target,
80                    resolution = ?resolved_udp_target,
81                    "UDP target resolved"
82                );
83
84                if !self.all_ips_allowed(&[resolved_udp_target]) {
85                    return Err(HoprLibError::GeneralError(format!(
86                        "denied target address {resolved_udp_target}"
87                    )));
88                }
89
90                let mut udp_bridge = hopr_network_types::udp::ConnectedUdpStream::builder()
91                    .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
92                    .with_counterparty(resolved_udp_target)
93                    .with_foreign_data_mode(ForeignDataMode::Error)
94                    .with_queue_size(HOPR_UDP_QUEUE_SIZE)
95                    .with_receiver_parallelism(
96                        std::env::var("HOPRD_SESSION_EXIT_UDP_RX_PARALLELISM")
97                            .ok()
98                            .and_then(|s| s.parse::<NonZeroUsize>().ok())
99                            .map(UdpStreamParallelism::Specific)
100                            .unwrap_or(UdpStreamParallelism::Auto),
101                    )
102                    .build(("0.0.0.0", 0))
103                    .map_err(|e| {
104                        HoprLibError::GeneralError(format!(
105                            "could not bridge the incoming session to {udp_target}: {e}"
106                        ))
107                    })?;
108
109                tracing::debug!(
110                    ?session_id,
111                    %udp_target,
112                    "bridging the session to the UDP server"
113                );
114
115                tokio::task::spawn(async move {
116                    #[cfg(all(feature = "prometheus", not(test)))]
117                    METRIC_ACTIVE_TARGETS.increment(&["udp"], 1.0);
118
119                    // The Session forwards the termination to the udp_bridge, terminating
120                    // the UDP socket.
121                    match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE, None).await {
122                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
123                            ?session_id,
124                            session_to_stream_bytes,
125                            stream_to_session_bytes,
126                            %udp_target,
127                            "server bridged session to UDP ended"
128                        ),
129                        Err(e) => tracing::error!(
130                            ?session_id,
131                            %udp_target,
132                            error = %e,
133                            "UDP server stream is closed"
134                        ),
135                    }
136
137                    #[cfg(all(feature = "prometheus", not(test)))]
138                    METRIC_ACTIVE_TARGETS.decrement(&["udp"], 1.0);
139                });
140
141                Ok(())
142            }
143            hopr_lib::SessionTarget::TcpStream(tcp_target) => {
144                let kp = self.keypair.clone();
145                let tcp_target = hopr_parallelize::cpu::spawn_blocking(move || tcp_target.unseal(&kp))
146                    .await
147                    .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
148
149                tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
150
151                // TCP is able to determine which of the resolved multiple addresses is viable,
152                // and therefore we can pass all of them.
153                let resolved_tcp_targets =
154                    tcp_target.clone().resolve_tokio().await.map_err(|e| {
155                        HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
156                    })?;
157                tracing::debug!(
158                    ?session_id,
159                    %tcp_target,
160                    resolution = ?resolved_tcp_targets,
161                    "TCP target resolved"
162                );
163
164                if !self.all_ips_allowed(&resolved_tcp_targets) {
165                    return Err(HoprLibError::GeneralError(format!(
166                        "denied target address {resolved_tcp_targets:?}"
167                    )));
168                }
169
170                let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
171                    .take(self.cfg.max_tcp_target_retries as usize);
172
173                let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
174                    tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
175                })
176                .await
177                .map_err(|e| {
178                    HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
179                })?;
180
181                tcp_bridge.set_nodelay(true).map_err(|e| {
182                    HoprLibError::GeneralError(format!(
183                        "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
184                    ))
185                })?;
186
187                tracing::debug!(
188                    ?session_id,
189                    %tcp_target,
190                    "bridging the session to the TCP server"
191                );
192                tokio::task::spawn(async move {
193                    #[cfg(all(feature = "prometheus", not(test)))]
194                    METRIC_ACTIVE_TARGETS.increment(&["tcp"], 1.0);
195
196                    match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE, None).await {
197                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
198                            ?session_id,
199                            session_to_stream_bytes,
200                            stream_to_session_bytes,
201                            %tcp_target,
202                            "server bridged session to TCP ended"
203                        ),
204                        Err(error) => tracing::error!(
205                            ?session_id,
206                            %tcp_target,
207                            %error,
208                            "TCP server stream is closed"
209                        ),
210                    }
211
212                    #[cfg(all(feature = "prometheus", not(test)))]
213                    METRIC_ACTIVE_TARGETS.decrement(&["tcp"], 1.0);
214                });
215
216                Ok(())
217            }
218            hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
219                tracing::debug!(?session_id, "bridging the session to the loopback service");
220                let (mut reader, mut writer) = tokio::io::split(session.session);
221
222                #[cfg(all(feature = "prometheus", not(test)))]
223                METRIC_ACTIVE_TARGETS.increment(&["loopback"], 1.0);
224
225                // Uses 4 kB buffer for copying
226                match tokio::io::copy(&mut reader, &mut writer).await {
227                    Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
228                    Err(error) => tracing::error!(
229                        ?session_id,
230                        %error,
231                        "server loopback session service ended with an error"
232                    ),
233                }
234
235                #[cfg(all(feature = "prometheus", not(test)))]
236                METRIC_ACTIVE_TARGETS.decrement(&["loopback"], 1.0);
237
238                Ok(())
239            }
240            hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
241                "server does not support internal session processing".into(),
242            )),
243        }
244    }
245}