hoprd/
exit.rs

1use hopr_lib::errors::HoprLibError;
2use hopr_lib::{transfer_session, HoprOffchainKeypair, ServiceId};
3use hopr_network_types::prelude::ForeignDataMode;
4use hopr_network_types::udp::UdpStreamParallelism;
5use hoprd_api::{HOPR_TCP_BUFFER_SIZE, HOPR_UDP_BUFFER_SIZE, HOPR_UDP_QUEUE_SIZE};
6use std::net::SocketAddr;
7
8use crate::config::SessionIpForwardingConfig;
9
10#[cfg(all(feature = "prometheus", not(test)))]
11lazy_static::lazy_static! {
12    static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
13        "hopr_session_hoprd_target_connections",
14        "Number of currently active HOPR session target connections on this Exit node",
15        &["type"]
16    ).unwrap();
17}
18
19/// Implementation of [`hopr_lib::HoprSessionReactor`] that facilitates
20/// bridging of TCP or UDP sockets from the Session Exit node to a destination.
21#[derive(Debug, Clone)]
22pub struct HoprServerIpForwardingReactor {
23    keypair: HoprOffchainKeypair,
24    cfg: SessionIpForwardingConfig,
25}
26
27impl HoprServerIpForwardingReactor {
28    pub fn new(keypair: HoprOffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
29        Self { keypair, cfg }
30    }
31
32    fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
33        if self.cfg.use_target_allow_list {
34            for addr in addrs {
35                if !self.cfg.target_allow_list.contains(addr) {
36                    tracing::error!(%addr, "address not allowed by the target allow list, denying the target");
37                    return false;
38                }
39                tracing::debug!(%addr, "address allowed by the target allow list, accepting the target");
40            }
41        }
42        true
43    }
44}
45
46pub const SERVICE_ID_LOOPBACK: ServiceId = 0;
47
48#[hopr_lib::async_trait]
49impl hopr_lib::HoprSessionReactor for HoprServerIpForwardingReactor {
50    #[tracing::instrument(level = "debug", skip(self, session))]
51    async fn process(&self, mut session: hopr_lib::HoprIncomingSession) -> hopr_lib::errors::Result<()> {
52        let session_id = *session.session.id();
53        match session.target {
54            hopr_lib::SessionTarget::UdpStream(udp_target) => {
55                let udp_target = udp_target
56                    .unseal(&self.keypair)
57                    .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
58
59                tracing::debug!(
60                    session_id = ?session_id,
61                    %udp_target,
62                    "binding socket to the UDP server"
63                );
64
65                // In UDP, it is impossible to determine if the target is viable,
66                // so we just take the first resolved address.
67                let resolved_udp_target = udp_target
68                    .clone()
69                    .resolve_tokio()
70                    .await
71                    .map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
72                    .first()
73                    .ok_or(HoprLibError::GeneralError(format!(
74                        "failed to resolve DNS name {udp_target}"
75                    )))?
76                    .to_owned();
77                tracing::debug!(
78                    ?session_id,
79                    %udp_target,
80                    resolution = ?resolved_udp_target,
81                    "UDP target resolved"
82                );
83
84                if !self.all_ips_allowed(&[resolved_udp_target]) {
85                    return Err(HoprLibError::GeneralError(format!(
86                        "denied target address {resolved_udp_target}"
87                    )));
88                }
89
90                let mut udp_bridge = hopr_network_types::udp::ConnectedUdpStream::builder()
91                    .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
92                    .with_counterparty(resolved_udp_target)
93                    .with_foreign_data_mode(ForeignDataMode::Error)
94                    .with_queue_size(HOPR_UDP_QUEUE_SIZE)
95                    .with_receiver_parallelism(UdpStreamParallelism::Auto)
96                    .build(("0.0.0.0", 0))
97                    .map_err(|e| {
98                        HoprLibError::GeneralError(format!(
99                            "could not bridge the incoming session to {udp_target}: {e}"
100                        ))
101                    })?;
102
103                tracing::debug!(
104                    ?session_id,
105                    %udp_target,
106                    "bridging the session to the UDP server"
107                );
108
109                tokio::task::spawn(async move {
110                    #[cfg(all(feature = "prometheus", not(test)))]
111                    METRIC_ACTIVE_TARGETS.increment(&["udp"], 1.0);
112
113                    match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE).await {
114                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
115                            ?session_id,
116                            session_to_stream_bytes,
117                            stream_to_session_bytes,
118                            %udp_target,
119                            "server bridged session to UDP ended"
120                        ),
121                        Err(e) => tracing::error!(
122                            ?session_id,
123                            %udp_target,
124                            error = %e,
125                            "UDP server stream is closed"
126                        ),
127                    }
128
129                    #[cfg(all(feature = "prometheus", not(test)))]
130                    METRIC_ACTIVE_TARGETS.decrement(&["udp"], 1.0);
131                });
132
133                Ok(())
134            }
135            hopr_lib::SessionTarget::TcpStream(tcp_target) => {
136                let tcp_target = tcp_target
137                    .unseal(&self.keypair)
138                    .map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
139
140                tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
141
142                // TCP is able to determine which of the resolved multiple addresses is viable,
143                // and therefore we can pass all of them.
144                let resolved_tcp_targets =
145                    tcp_target.clone().resolve_tokio().await.map_err(|e| {
146                        HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
147                    })?;
148                tracing::debug!(
149                    ?session_id,
150                    %tcp_target,
151                    resolution = ?resolved_tcp_targets,
152                    "TCP target resolved"
153                );
154
155                if !self.all_ips_allowed(&resolved_tcp_targets) {
156                    return Err(HoprLibError::GeneralError(format!(
157                        "denied target address {resolved_tcp_targets:?}"
158                    )));
159                }
160
161                let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
162                    .take(self.cfg.max_tcp_target_retries as usize);
163
164                let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
165                    tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
166                })
167                .await
168                .map_err(|e| {
169                    HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
170                })?;
171
172                tcp_bridge.set_nodelay(true).map_err(|e| {
173                    HoprLibError::GeneralError(format!(
174                        "could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
175                    ))
176                })?;
177
178                tracing::debug!(
179                    ?session_id,
180                    %tcp_target,
181                    "bridging the session to the TCP server"
182                );
183                tokio::task::spawn(async move {
184                    #[cfg(all(feature = "prometheus", not(test)))]
185                    METRIC_ACTIVE_TARGETS.increment(&["tcp"], 1.0);
186
187                    match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE).await {
188                        Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
189                            ?session_id,
190                            session_to_stream_bytes,
191                            stream_to_session_bytes,
192                            %tcp_target,
193                            "server bridged session to TCP ended"
194                        ),
195                        Err(error) => tracing::error!(
196                            ?session_id,
197                            %tcp_target,
198                            %error,
199                            "TCP server stream is closed"
200                        ),
201                    }
202
203                    #[cfg(all(feature = "prometheus", not(test)))]
204                    METRIC_ACTIVE_TARGETS.decrement(&["tcp"], 1.0);
205                });
206
207                Ok(())
208            }
209            hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
210                tracing::debug!(?session_id, "bridging the session to the loopback service");
211                let (mut reader, mut writer) = tokio::io::split(session.session);
212
213                #[cfg(all(feature = "prometheus", not(test)))]
214                METRIC_ACTIVE_TARGETS.increment(&["loopback"], 1.0);
215
216                // Uses 4 kB buffer for copying
217                match tokio::io::copy(&mut reader, &mut writer).await {
218                    Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
219                    Err(error) => tracing::error!(
220                        ?session_id,
221                        %error,
222                        "server loopback session service ended with an error"
223                    ),
224                }
225
226                #[cfg(all(feature = "prometheus", not(test)))]
227                METRIC_ACTIVE_TARGETS.decrement(&["loopback"], 1.0);
228
229                Ok(())
230            }
231            hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
232                "server does not support internal session processing".into(),
233            )),
234        }
235    }
236}