use std::net::SocketAddr;

use hopr_lib::errors::HoprLibError;
use hopr_lib::{transfer_session, HoprOffchainKeypair, ServiceId};
use hopr_network_types::prelude::ForeignDataMode;
use hopr_network_types::udp::UdpStreamParallelism;
use hoprd_api::{HOPR_TCP_BUFFER_SIZE, HOPR_UDP_BUFFER_SIZE, HOPR_UDP_QUEUE_SIZE};

use crate::config::SessionIpForwardingConfig;

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
static ref METRIC_ACTIVE_TARGETS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
"hopr_session_hoprd_target_connections",
"Number of currently active HOPR session target connections on this Exit node",
&["type"]
).unwrap();
}
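
/// Reactor that bridges incoming HOPR sessions arriving at this Exit node to their
/// UDP, TCP or loopback targets, enforcing the configured target allow-list.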
#[derive(Debug, Clone)]
pub struct HoprServerIpForwardingReactor {
keypair: HoprOffchainKeypair,
cfg: SessionIpForwardingConfig,
}

impl HoprServerIpForwardingReactor {
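/// Creates a new reactor that unseals session targets with `keypair` and applies the
/// forwarding rules from `cfg`.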
pub fn new(keypair: HoprOffchainKeypair, cfg: SessionIpForwardingConfig) -> Self {
Self { keypair, cfg }
}
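
/// Returns `true` when every given address is permitted by the configured allow-list;
/// if no allow-list is configured, all targets are allowed.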
fn all_ips_allowed(&self, addrs: &[SocketAddr]) -> bool {
addrs.iter().all(|addr| {
self.cfg
.target_allow_list
.as_ref()
.map_or(true, |list| list.contains(addr))
})
}
}
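
/// Service ID of the built-in loopback (echo) service on the Exit node.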
pub const SERVICE_ID_LOOPBACK: ServiceId = 0;

#[hopr_lib::async_trait]
impl hopr_lib::HoprSessionReactor for HoprServerIpForwardingReactor {
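/// Bridges an incoming session to its designated target.
///
/// UDP and TCP targets are unsealed with the node's offchain keypair, DNS-resolved and
/// checked against the optional allow-list before any traffic is relayed; the loopback
/// service echoes session data back to the sender. Other exit-node services are rejected.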
#[tracing::instrument(level = "debug", skip(self, session))]
async fn process(&self, mut session: hopr_lib::HoprIncomingSession) -> hopr_lib::errors::Result<()> {
let session_id = *session.session.id();
match session.target {
hopr_lib::SessionTarget::UdpStream(udp_target) => {
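// Decrypt the sealed UDP target with this node's offchain keypair.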
let udp_target = udp_target
.unseal(&self.keypair)
.map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
tracing::debug!(
?session_id,
%udp_target,
"binding socket to the UDP server"
);
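// Resolve the target (which may be a DNS name) and use the first returned address.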
let resolved_udp_target = udp_target
.clone()
.resolve_tokio()
.await
.map_err(|e| HoprLibError::GeneralError(format!("failed to resolve DNS name {udp_target}: {e}")))?
.first()
.ok_or_else(|| HoprLibError::GeneralError(format!(
"DNS resolution of {udp_target} returned no addresses"
)))?
.to_owned();
tracing::debug!(
?session_id,
%udp_target,
resolution = ?resolved_udp_target,
"UDP target resolved"
);
if !self.all_ips_allowed(&[resolved_udp_target]) {
return Err(HoprLibError::GeneralError(format!(
"denied target address {resolved_udp_target}"
)));
}
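// Bind a local UDP socket on an ephemeral port, connected to the resolved target;
// datagrams from senders other than the target are treated as errors.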
let mut udp_bridge = hopr_network_types::udp::ConnectedUdpStream::builder()
.with_buffer_size(HOPR_UDP_BUFFER_SIZE)
.with_counterparty(resolved_udp_target)
.with_foreign_data_mode(ForeignDataMode::Error)
.with_queue_size(HOPR_UDP_QUEUE_SIZE)
.with_receiver_parallelism(UdpStreamParallelism::Auto)
.build(("0.0.0.0", 0))
.map_err(|e| {
HoprLibError::GeneralError(format!(
"could not bridge the incoming session to {udp_target}: {e}"
))
})?;
tracing::debug!(
?session_id,
%udp_target,
"bridging the session to the UDP server"
);
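// Relay data between the HOPR session and the UDP socket in a background task.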
tokio::task::spawn(async move {
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_ACTIVE_TARGETS.increment(&["udp"], 1.0);
match transfer_session(&mut session.session, &mut udp_bridge, HOPR_UDP_BUFFER_SIZE).await {
Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
?session_id,
session_to_stream_bytes,
stream_to_session_bytes,
%udp_target,
"server bridged session to UDP ended"
),
Err(e) => tracing::error!(
?session_id,
%udp_target,
error = %e,
"UDP server stream is closed"
),
}
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_ACTIVE_TARGETS.decrement(&["udp"], 1.0);
});
Ok(())
}
hopr_lib::SessionTarget::TcpStream(tcp_target) => {
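// Decrypt the sealed TCP target with this node's offchain keypair.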
let tcp_target = tcp_target
.unseal(&self.keypair)
.map_err(|e| HoprLibError::GeneralError(format!("cannot unseal target: {e}")))?;
tracing::debug!(?session_id, %tcp_target, "creating a connection to the TCP server");
let resolved_tcp_targets =
tcp_target.clone().resolve_tokio().await.map_err(|e| {
HoprLibError::GeneralError(format!("failed to resolve DNS name {tcp_target}: {e}"))
})?;
tracing::debug!(
?session_id,
%tcp_target,
resolution = ?resolved_tcp_targets,
"TCP target resolved"
);
if !self.all_ips_allowed(&resolved_tcp_targets) {
return Err(HoprLibError::GeneralError(format!(
"denied target address {resolved_tcp_targets:?}"
)));
}
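// Connect to the first reachable resolved address, retrying with a fixed delay up to
// the configured number of attempts.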
let strategy = tokio_retry::strategy::FixedInterval::new(self.cfg.tcp_target_retry_delay)
.take(self.cfg.max_tcp_target_retries as usize);
let mut tcp_bridge = tokio_retry::Retry::spawn(strategy, || {
tokio::net::TcpStream::connect(resolved_tcp_targets.as_slice())
})
.await
.map_err(|e| {
HoprLibError::GeneralError(format!("could not bridge the incoming session to {tcp_target}: {e}"))
})?;
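// Disable Nagle's algorithm so small session packets are forwarded without delay.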
tcp_bridge.set_nodelay(true).map_err(|e| {
HoprLibError::GeneralError(format!(
"could not set the TCP_NODELAY option for the bridged session to {tcp_target}: {e}",
))
})?;
tracing::debug!(
?session_id,
%tcp_target,
"bridging the session to the TCP server"
);
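// Relay data between the HOPR session and the TCP connection in a background task.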
tokio::task::spawn(async move {
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_ACTIVE_TARGETS.increment(&["tcp"], 1.0);
match transfer_session(&mut session.session, &mut tcp_bridge, HOPR_TCP_BUFFER_SIZE).await {
Ok((session_to_stream_bytes, stream_to_session_bytes)) => tracing::info!(
?session_id,
session_to_stream_bytes,
stream_to_session_bytes,
%tcp_target,
"server bridged session to TCP ended"
),
Err(error) => tracing::error!(
?session_id,
%tcp_target,
%error,
"TCP server stream is closed"
),
}
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_ACTIVE_TARGETS.decrement(&["tcp"], 1.0);
});
Ok(())
}
hopr_lib::SessionTarget::ExitNode(SERVICE_ID_LOOPBACK) => {
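// Loopback (echo) service: copy everything read from the session back into it.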
tracing::debug!(?session_id, "bridging the session to the loopback service");
let (mut reader, mut writer) = tokio::io::split(session.session);
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_ACTIVE_TARGETS.increment(&["loopback"], 1.0);
match tokio::io::copy(&mut reader, &mut writer).await {
Ok(copied) => tracing::info!(?session_id, copied, "server loopback session service ended"),
Err(error) => tracing::error!(
?session_id,
%error,
"server loopback session service ended with an error"
),
}
#[cfg(all(feature = "prometheus", not(test)))]
METRIC_ACTIVE_TARGETS.decrement(&["loopback"], 1.0);
Ok(())
}
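// Other exit-node service IDs are not implemented.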
hopr_lib::SessionTarget::ExitNode(_) => Err(HoprLibError::GeneralError(
"server does not support internal session processing".into(),
)),
}
}
}