From 4abb4f9b1ea7b498bdbca7c30c9f21b57dbf56d7 Mon Sep 17 00:00:00 2001 From: Aram Peres Date: Sat, 8 Jan 2022 22:00:36 -0500 Subject: [PATCH] WIP: Remote port forwarding --- src/config.rs | 1 - src/lib.rs | 35 +++++++++++++++++++++++++++++- src/tunnel/mod.rs | 18 ++++++++++++++++ src/tunnel/udp.rs | 26 ++++++++++++++++++++++- src/virtual_iface/udp.rs | 46 ++++++++++++++++++++++++++++++++++------ 5 files changed, 116 insertions(+), 10 deletions(-) diff --git a/src/config.rs b/src/config.rs index 411efa3..97dedd2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,7 +13,6 @@ const DEFAULT_PORT_FORWARD_SOURCE: &str = "127.0.0.1"; #[derive(Clone)] pub struct Config { pub port_forwards: Vec, - #[allow(dead_code)] pub remote_port_forwards: Vec, pub private_key: Arc, pub endpoint_public_key: Arc, diff --git a/src/lib.rs b/src/lib.rs index a76fa18..b2fe9e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,10 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> { .port_forwards .iter() .any(|pf| pf.protocol == PortProtocol::Udp) + || config + .remote_port_forwards + .iter() + .any(|pf| pf.protocol == PortProtocol::Udp) { // UDP device let bus = bus.clone(); @@ -90,7 +94,13 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> { // Start UDP Virtual Interface let port_forwards = config.port_forwards.clone(); - let iface = UdpVirtualInterface::new(port_forwards, bus, config.source_peer_ip); + let remote_port_forwards = config.remote_port_forwards.clone(); + let iface = UdpVirtualInterface::new( + port_forwards, + remote_port_forwards, + bus, + config.source_peer_ip, + ); tokio::spawn(async move { iface.poll_loop(device).await }); } @@ -118,5 +128,28 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> { }); } + { + let remote_port_forwards = config.remote_port_forwards; + + remote_port_forwards + .into_iter() + .map(|pf| { + ( + pf, + wg.clone(), + tcp_port_pool.clone(), + udp_port_pool.clone(), + bus.clone(), + ) + }) + .for_each(move |(pf, wg, tcp_port_pool, udp_port_pool, bus)| { + tokio::spawn(async move { + tunnel::remote_port_forward(pf, tcp_port_pool, udp_port_pool, wg, bus) + .await + .unwrap_or_else(|e| error!("Remote port-forward failed for {} : {}", pf, e)) + }); + }); + } + Ok(()) } diff --git a/src/tunnel/mod.rs b/src/tunnel/mod.rs index eadf8b0..4d52fd7 100644 --- a/src/tunnel/mod.rs +++ b/src/tunnel/mod.rs @@ -32,3 +32,21 @@ pub async fn port_forward( PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await, } } + +pub async fn remote_port_forward( + port_forward: PortForwardConfig, + _tcp_port_pool: TcpPortPool, + udp_port_pool: UdpPortPool, + wg: Arc, + bus: Bus, +) -> anyhow::Result<()> { + info!( + "Remote Tunneling {} [{}]<-[{}] (via [{}])", + port_forward.protocol, port_forward.destination, port_forward.source, &wg.endpoint, + ); + + match port_forward.protocol { + PortProtocol::Tcp => Ok(()), // TODO: Remote TCP forwarding + PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await, + } +} diff --git a/src/tunnel/udp.rs b/src/tunnel/udp.rs index ab52dc7..aebcf04 100644 --- a/src/tunnel/udp.rs +++ b/src/tunnel/udp.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, VecDeque}; use std::net::{IpAddr, SocketAddr}; use std::ops::Range; +use std::str::FromStr; use std::sync::Arc; use std::time::Instant; @@ -35,7 +36,22 @@ pub async fn udp_proxy_server( bus: Bus, ) -> anyhow::Result<()> { let mut endpoint = bus.new_endpoint(); - let socket = UdpSocket::bind(port_forward.source) + + // Remote port forwards bind on localhost. Regular port forwards bind on the given source. + let bind = if port_forward.remote { + port_pool + .reserve(port_forward.source.port(), port_forward.destination) + .await + .with_context(|| "Failed to assign virtual port for remote UDP port forward")?; + match port_forward.source.ip() { + IpAddr::V4(_) => SocketAddr::from((IpAddr::from_str("0.0.0.0").unwrap(), 0)), + IpAddr::V6(_) => SocketAddr::from((IpAddr::from_str("[::]").unwrap(), 0)), + } + } else { + port_forward.source + }; + + let socket = UdpSocket::bind(bind) .await .context("Failed to bind on UDP proxy address")?; @@ -156,6 +172,14 @@ impl UdpPortPool { } } + /// Takes the given port out of the pool, marking it with the given peer address, for an unlimited amount of time. + pub async fn reserve(&self, port: u16, peer_addr: SocketAddr) -> anyhow::Result { + let mut inner = self.inner.write().await; + inner.port_by_peer_addr.insert(peer_addr, port); + inner.peer_addr_by_port.insert(port, peer_addr); + Ok(VirtualPort::new(port, PortProtocol::Udp)) + } + /// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity). pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result { // A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock. diff --git a/src/virtual_iface/udp.rs b/src/virtual_iface/udp.rs index 3ca4c2d..e146f44 100644 --- a/src/virtual_iface/udp.rs +++ b/src/virtual_iface/udp.rs @@ -23,6 +23,7 @@ const MAX_PACKET: usize = 65536; pub struct UdpVirtualInterface { source_peer_ip: IpAddr, port_forwards: Vec, + remote_port_forwards: Vec, bus: Bus, sockets: SocketSet<'static>, } @@ -30,12 +31,21 @@ pub struct UdpVirtualInterface { impl UdpVirtualInterface { /// Initialize the parameters for a new virtual interface. /// Use the `poll_loop()` future to start the virtual interface poll loop. - pub fn new(port_forwards: Vec, bus: Bus, source_peer_ip: IpAddr) -> Self { + pub fn new( + port_forwards: Vec, + remote_port_forwards: Vec, + bus: Bus, + source_peer_ip: IpAddr, + ) -> Self { Self { port_forwards: port_forwards .into_iter() .filter(|f| matches!(f.protocol, PortProtocol::Udp)) .collect(), + remote_port_forwards: remote_port_forwards + .into_iter() + .filter(|f| matches!(f.protocol, PortProtocol::Udp)) + .collect(), source_peer_ip, bus, sockets: SocketSet::new([]), @@ -129,11 +139,30 @@ impl VirtualInterfacePoll for UdpVirtualInterface { let mut send_queue: HashMap> = HashMap::new(); + // Create sockets for remote port forwards + for remote_port_forward in self.remote_port_forwards.iter() { + let virtual_port = + VirtualPort::new(remote_port_forward.source.port(), PortProtocol::Udp); + let client_socket = UdpVirtualInterface::new_client_socket( + remote_port_forward.source.ip(), + virtual_port, + )?; + debug!( + "Created remote client socket: {:?}", + client_socket.endpoint() + ); + let client_handle = self.sockets.add(client_socket); + port_client_handle_map.insert(virtual_port, client_handle); + send_queue.insert(virtual_port, VecDeque::new()); + } + + let mut wake = false; + loop { tokio::select! { - _ = match (next_poll, port_client_handle_map.len()) { - (None, 0) => tokio::time::sleep(Duration::MAX), - (None, _) => tokio::time::sleep(Duration::ZERO), + _ = match (next_poll, wake) { + (None, false) => tokio::time::sleep(Duration::MAX), + (None, true) => tokio::time::sleep(Duration::ZERO), (Some(until), _) => tokio::time::sleep_until(until), } => { let loop_start = smoltcp::time::Instant::now(); @@ -147,11 +176,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface { if client_socket.can_send() { if let Some(send_queue) = send_queue.get_mut(virtual_port) { let to_transfer = send_queue.pop_front(); - if let Some((port_forward, data)) = to_transfer { + if let Some((destination, data)) = to_transfer { client_socket .send_slice( &data, - UdpMetadata::from(port_forward.destination), + UdpMetadata::from(destination.destination), ) .unwrap_or_else(|e| { error!( @@ -164,8 +193,9 @@ impl VirtualInterfacePoll for UdpVirtualInterface { } if client_socket.can_recv() { match client_socket.recv() { - Ok((data, _peer)) => { + Ok((data, peer)) => { if !data.is_empty() { + trace!("notifying remote data from peer: {}", peer); endpoint.send(Event::RemoteData(*virtual_port, data.to_vec().into())); } } @@ -204,9 +234,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface { send_queue.insert(virtual_port, VecDeque::from(vec![(port_forward, data)])); } next_poll = None; + wake = true; } Event::VirtualDeviceFed(PortProtocol::Udp) => { next_poll = None; + wake = true; } _ => {} }