WIP: Remote port forwarding (close, but outgoing packets are going to the wrong place)

This commit is contained in:
Aram 🍐 2022-01-09 00:51:09 -05:00
parent c19af07342
commit efb05de7e2
5 changed files with 124 additions and 21 deletions

View file

@ -468,8 +468,16 @@ impl PortForwardConfig {
impl Display for PortForwardConfig { impl Display for PortForwardConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.remote {
write!(
f,
"(remote){}:{}:{}",
self.source, self.destination, self.protocol
)
} else {
write!(f, "{}:{}:{}", self.source, self.destination, self.protocol) write!(f, "{}:{}:{}", self.source, self.destination, self.protocol)
} }
}
} }
/// Layer 7 protocols for ports. /// Layer 7 protocols for ports.

View file

@ -99,7 +99,13 @@ async fn main() -> anyhow::Result<()> {
// Start UDP Virtual Interface // Start UDP Virtual Interface
let port_forwards = config.port_forwards.clone(); let port_forwards = config.port_forwards.clone();
let iface = UdpVirtualInterface::new(port_forwards, bus, config.source_peer_ip); let remote_port_forwards = config.remote_port_forwards.clone();
let iface = UdpVirtualInterface::new(
port_forwards,
remote_port_forwards,
bus,
config.source_peer_ip,
);
tokio::spawn(async move { iface.poll_loop(device).await }); tokio::spawn(async move { iface.poll_loop(device).await });
} }
@ -127,6 +133,29 @@ async fn main() -> anyhow::Result<()> {
}); });
} }
{
let remote_port_forwards = config.remote_port_forwards;
remote_port_forwards
.into_iter()
.map(|pf| {
(
pf,
wg.clone(),
tcp_port_pool.clone(),
udp_port_pool.clone(),
bus.clone(),
)
})
.for_each(move |(pf, wg, tcp_port_pool, udp_port_pool, bus)| {
tokio::spawn(async move {
tunnel::remote_port_forward(pf, tcp_port_pool, udp_port_pool, wg, bus)
.await
.unwrap_or_else(|e| error!("Remote port-forward failed for {} : {}", pf, e))
});
});
}
futures::future::pending().await futures::future::pending().await
} }

View file

@ -32,3 +32,21 @@ pub async fn port_forward(
PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await, PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await,
} }
} }
pub async fn remote_port_forward(
port_forward: PortForwardConfig,
_tcp_port_pool: TcpPortPool,
udp_port_pool: UdpPortPool,
wg: Arc<WireGuardTunnel>,
bus: Bus,
) -> anyhow::Result<()> {
info!(
"Remote Tunneling {} [{}]<-[{}] (via [{}])",
port_forward.protocol, port_forward.destination, port_forward.source, &wg.endpoint,
);
match port_forward.protocol {
PortProtocol::Tcp => Ok(()), // TODO: Remote TCP forwarding
PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await,
}
}

View file

@ -1,6 +1,7 @@
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::ops::Range; use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
@ -34,7 +35,22 @@ pub async fn udp_proxy_server(
bus: Bus, bus: Bus,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut endpoint = bus.new_endpoint(); let mut endpoint = bus.new_endpoint();
let socket = UdpSocket::bind(port_forward.source)
// Remote port forwards bind on localhost. Regular port forwards bind on the given source.
let bind = if port_forward.remote {
port_pool
.reserve(port_forward.source.port(), port_forward.destination)
.await
.with_context(|| "Failed to assign virtual port for remote UDP port forward")?;
match port_forward.source.ip() {
IpAddr::V4(_) => SocketAddr::from((IpAddr::from_str("0.0.0.0").unwrap(), 0)),
IpAddr::V6(_) => SocketAddr::from((IpAddr::from_str("[::]").unwrap(), 0)),
}
} else {
port_forward.source
};
let socket = UdpSocket::bind(bind)
.await .await
.with_context(|| "Failed to bind on UDP proxy address")?; .with_context(|| "Failed to bind on UDP proxy address")?;
@ -61,6 +77,7 @@ pub async fn udp_proxy_server(
event = endpoint.recv() => { event = endpoint.recv() => {
if let Event::RemoteData(port, data) = event { if let Event::RemoteData(port, data) = event {
if let Some(peer) = port_pool.get_peer_addr(port).await { if let Some(peer) = port_pool.get_peer_addr(port).await {
trace!("Sending {} bytes to real client ({}->{})", data.len(), socket.local_addr().unwrap(), peer);
if let Err(e) = socket.send_to(&data, peer).await { if let Err(e) = socket.send_to(&data, peer).await {
error!( error!(
"[{}] Failed to send UDP datagram to real client ({}): {:?}", "[{}] Failed to send UDP datagram to real client ({}): {:?}",
@ -139,6 +156,14 @@ impl UdpPortPool {
} }
} }
/// Takes the given port out of the pool, marking it with the given peer address, for an unlimited amount of time.
pub async fn reserve(&self, port: u16, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
let mut inner = self.inner.write().await;
inner.port_by_peer_addr.insert(peer_addr, port);
inner.peer_addr_by_port.insert(port, peer_addr);
Ok(VirtualPort::new(port, PortProtocol::Udp))
}
/// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity). /// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity).
pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> { pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
// A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock. // A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock.

View file

@ -1,6 +1,6 @@
use anyhow::Context; use anyhow::Context;
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::net::IpAddr; use std::net::{IpAddr, SocketAddr};
use crate::events::Event; use crate::events::Event;
use crate::{Bus, PortProtocol}; use crate::{Bus, PortProtocol};
@ -19,18 +19,28 @@ const MAX_PACKET: usize = 65536;
pub struct UdpVirtualInterface { pub struct UdpVirtualInterface {
source_peer_ip: IpAddr, source_peer_ip: IpAddr,
port_forwards: Vec<PortForwardConfig>, port_forwards: Vec<PortForwardConfig>,
remote_port_forwards: Vec<PortForwardConfig>,
bus: Bus, bus: Bus,
} }
impl UdpVirtualInterface { impl UdpVirtualInterface {
/// Initialize the parameters for a new virtual interface. /// Initialize the parameters for a new virtual interface.
/// Use the `poll_loop()` future to start the virtual interface poll loop. /// Use the `poll_loop()` future to start the virtual interface poll loop.
pub fn new(port_forwards: Vec<PortForwardConfig>, bus: Bus, source_peer_ip: IpAddr) -> Self { pub fn new(
port_forwards: Vec<PortForwardConfig>,
remote_port_forwards: Vec<PortForwardConfig>,
bus: Bus,
source_peer_ip: IpAddr,
) -> Self {
Self { Self {
port_forwards: port_forwards port_forwards: port_forwards
.into_iter() .into_iter()
.filter(|f| matches!(f.protocol, PortProtocol::Udp)) .filter(|f| matches!(f.protocol, PortProtocol::Udp))
.collect(), .collect(),
remote_port_forwards: remote_port_forwards
.into_iter()
.filter(|f| matches!(f.protocol, PortProtocol::Udp))
.collect(),
source_peer_ip, source_peer_ip,
bus, bus,
} }
@ -114,14 +124,28 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
let mut port_client_handle_map: HashMap<VirtualPort, SocketHandle> = HashMap::new(); let mut port_client_handle_map: HashMap<VirtualPort, SocketHandle> = HashMap::new();
// Data packets to send from a virtual client // Data packets to send from a virtual client
let mut send_queue: HashMap<VirtualPort, VecDeque<(PortForwardConfig, Vec<u8>)>> = let mut send_queue: HashMap<VirtualPort, VecDeque<(SocketAddr, Vec<u8>)>> = HashMap::new();
HashMap::new();
// Create sockets for remote port forwards
for remote_port_forward in self.remote_port_forwards.iter() {
let virtual_port =
VirtualPort::new(remote_port_forward.source.port(), PortProtocol::Udp);
let client_socket = UdpVirtualInterface::new_client_socket(
remote_port_forward.source.ip(),
virtual_port,
)?;
let client_handle = iface.add_socket(client_socket);
port_client_handle_map.insert(virtual_port, client_handle);
send_queue.insert(virtual_port, VecDeque::new());
}
let mut wake = false;
loop { loop {
tokio::select! { tokio::select! {
_ = match (next_poll, port_client_handle_map.len()) { _ = match (next_poll, wake) {
(None, 0) => tokio::time::sleep(Duration::MAX), (None, false) => tokio::time::sleep(Duration::MAX),
(None, _) => tokio::time::sleep(Duration::ZERO), (None, true) => tokio::time::sleep(Duration::ZERO),
(Some(until), _) => tokio::time::sleep_until(until), (Some(until), _) => tokio::time::sleep_until(until),
} => { } => {
let loop_start = smoltcp::time::Instant::now(); let loop_start = smoltcp::time::Instant::now();
@ -134,17 +158,16 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
_ => {} _ => {}
} }
// Find client socket send data to
for (virtual_port, client_handle) in port_client_handle_map.iter() { for (virtual_port, client_handle) in port_client_handle_map.iter() {
let client_socket = iface.get_socket::<UdpSocket>(*client_handle); let client_socket = iface.get_socket::<UdpSocket>(*client_handle);
if client_socket.can_send() { if client_socket.can_send() {
if let Some(send_queue) = send_queue.get_mut(virtual_port) { if let Some(send_queue) = send_queue.get_mut(virtual_port) {
let to_transfer = send_queue.pop_front(); let to_transfer = send_queue.pop_front();
if let Some((port_forward, data)) = to_transfer { if let Some((destination, data)) = to_transfer {
client_socket client_socket
.send_slice( .send_slice(
&data, &data,
(IpAddress::from(port_forward.destination.ip()), port_forward.destination.port()).into(), (IpAddress::from(destination.ip()), destination.port()).into(),
) )
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
error!( error!(
@ -155,15 +178,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
} }
} }
} }
}
// Find client socket recv data from
for (virtual_port, client_handle) in port_client_handle_map.iter() {
let client_socket = iface.get_socket::<UdpSocket>(*client_handle);
if client_socket.can_recv() { if client_socket.can_recv() {
match client_socket.recv() { match client_socket.recv() {
Ok((data, _peer)) => { Ok((data, peer)) => {
if !data.is_empty() { if !data.is_empty() {
trace!("notifying remote data from peer: {}", peer);
endpoint.send(Event::RemoteData(*virtual_port, data.to_vec())); endpoint.send(Event::RemoteData(*virtual_port, data.to_vec()));
} }
} }
@ -189,9 +208,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
event = endpoint.recv() => { event = endpoint.recv() => {
match event { match event {
Event::LocalData(port_forward, virtual_port, data) => { Event::LocalData(port_forward, virtual_port, data) => {
let destination = port_forward.destination;
if let Some(send_queue) = send_queue.get_mut(&virtual_port) { if let Some(send_queue) = send_queue.get_mut(&virtual_port) {
// Client socket already exists // Client socket already exists
send_queue.push_back((port_forward, data)); send_queue.push_back((destination, data));
} else { } else {
// Client socket does not exist // Client socket does not exist
let client_socket = UdpVirtualInterface::new_client_socket(self.source_peer_ip, virtual_port)?; let client_socket = UdpVirtualInterface::new_client_socket(self.source_peer_ip, virtual_port)?;
@ -199,12 +220,14 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
// Add handle to map // Add handle to map
port_client_handle_map.insert(virtual_port, client_handle); port_client_handle_map.insert(virtual_port, client_handle);
send_queue.insert(virtual_port, VecDeque::from(vec![(port_forward, data)])); send_queue.insert(virtual_port, VecDeque::from(vec![(destination, data)]));
} }
next_poll = None; next_poll = None;
wake = true;
} }
Event::VirtualDeviceFed(protocol) if protocol == PortProtocol::Udp => { Event::VirtualDeviceFed(protocol) if protocol == PortProtocol::Udp => {
next_poll = None; next_poll = None;
wake = true;
} }
_ => {} _ => {}
} }