From efb05de7e24665b79af2191257a9363eae07e40a Mon Sep 17 00:00:00 2001 From: Aram Peres Date: Sun, 9 Jan 2022 00:51:09 -0500 Subject: [PATCH] WIP: Remote port forwarding (close, but outgoing packets are going to the wrong place) --- src/config.rs | 10 ++++++- src/main.rs | 31 ++++++++++++++++++++- src/tunnel/mod.rs | 18 ++++++++++++ src/tunnel/udp.rs | 27 +++++++++++++++++- src/virtual_iface/udp.rs | 59 ++++++++++++++++++++++++++++------------ 5 files changed, 124 insertions(+), 21 deletions(-) diff --git a/src/config.rs b/src/config.rs index 0b60b23..99731e7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -468,7 +468,15 @@ impl PortForwardConfig { impl Display for PortForwardConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}:{}:{}", self.source, self.destination, self.protocol) + if self.remote { + write!( + f, + "(remote){}:{}:{}", + self.source, self.destination, self.protocol + ) + } else { + write!(f, "{}:{}:{}", self.source, self.destination, self.protocol) + } } } diff --git a/src/main.rs b/src/main.rs index 918f9a3..dab7b16 100644 --- a/src/main.rs +++ b/src/main.rs @@ -99,7 +99,13 @@ async fn main() -> anyhow::Result<()> { // Start UDP Virtual Interface let port_forwards = config.port_forwards.clone(); - let iface = UdpVirtualInterface::new(port_forwards, bus, config.source_peer_ip); + let remote_port_forwards = config.remote_port_forwards.clone(); + let iface = UdpVirtualInterface::new( + port_forwards, + remote_port_forwards, + bus, + config.source_peer_ip, + ); tokio::spawn(async move { iface.poll_loop(device).await }); } @@ -127,6 +133,29 @@ async fn main() -> anyhow::Result<()> { }); } + { + let remote_port_forwards = config.remote_port_forwards; + + remote_port_forwards + .into_iter() + .map(|pf| { + ( + pf, + wg.clone(), + tcp_port_pool.clone(), + udp_port_pool.clone(), + bus.clone(), + ) + }) + .for_each(move |(pf, wg, tcp_port_pool, udp_port_pool, bus)| { + tokio::spawn(async move { + tunnel::remote_port_forward(pf, tcp_port_pool, udp_port_pool, wg, bus) + .await + .unwrap_or_else(|e| error!("Remote port-forward failed for {} : {}", pf, e)) + }); + }); + } + futures::future::pending().await } diff --git a/src/tunnel/mod.rs b/src/tunnel/mod.rs index eadf8b0..4d52fd7 100644 --- a/src/tunnel/mod.rs +++ b/src/tunnel/mod.rs @@ -32,3 +32,21 @@ pub async fn port_forward( PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await, } } + +pub async fn remote_port_forward( + port_forward: PortForwardConfig, + _tcp_port_pool: TcpPortPool, + udp_port_pool: UdpPortPool, + wg: Arc, + bus: Bus, +) -> anyhow::Result<()> { + info!( + "Remote Tunneling {} [{}]<-[{}] (via [{}])", + port_forward.protocol, port_forward.destination, port_forward.source, &wg.endpoint, + ); + + match port_forward.protocol { + PortProtocol::Tcp => Ok(()), // TODO: Remote TCP forwarding + PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await, + } +} diff --git a/src/tunnel/udp.rs b/src/tunnel/udp.rs index 76005a9..0c70497 100644 --- a/src/tunnel/udp.rs +++ b/src/tunnel/udp.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, VecDeque}; use std::net::{IpAddr, SocketAddr}; use std::ops::Range; +use std::str::FromStr; use std::sync::Arc; use std::time::Instant; @@ -34,7 +35,22 @@ pub async fn udp_proxy_server( bus: Bus, ) -> anyhow::Result<()> { let mut endpoint = bus.new_endpoint(); - let socket = UdpSocket::bind(port_forward.source) + + // Remote port forwards bind on localhost. Regular port forwards bind on the given source. + let bind = if port_forward.remote { + port_pool + .reserve(port_forward.source.port(), port_forward.destination) + .await + .with_context(|| "Failed to assign virtual port for remote UDP port forward")?; + match port_forward.source.ip() { + IpAddr::V4(_) => SocketAddr::from((IpAddr::from_str("0.0.0.0").unwrap(), 0)), + IpAddr::V6(_) => SocketAddr::from((IpAddr::from_str("[::]").unwrap(), 0)), + } + } else { + port_forward.source + }; + + let socket = UdpSocket::bind(bind) .await .with_context(|| "Failed to bind on UDP proxy address")?; @@ -61,6 +77,7 @@ pub async fn udp_proxy_server( event = endpoint.recv() => { if let Event::RemoteData(port, data) = event { if let Some(peer) = port_pool.get_peer_addr(port).await { + trace!("Sending {} bytes to real client ({}->{})", data.len(), socket.local_addr().unwrap(), peer); if let Err(e) = socket.send_to(&data, peer).await { error!( "[{}] Failed to send UDP datagram to real client ({}): {:?}", @@ -139,6 +156,14 @@ impl UdpPortPool { } } + /// Takes the given port out of the pool, marking it with the given peer address, for an unlimited amount of time. + pub async fn reserve(&self, port: u16, peer_addr: SocketAddr) -> anyhow::Result { + let mut inner = self.inner.write().await; + inner.port_by_peer_addr.insert(peer_addr, port); + inner.peer_addr_by_port.insert(port, peer_addr); + Ok(VirtualPort::new(port, PortProtocol::Udp)) + } + /// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity). pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result { // A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock. diff --git a/src/virtual_iface/udp.rs b/src/virtual_iface/udp.rs index 67b227f..6c9e894 100644 --- a/src/virtual_iface/udp.rs +++ b/src/virtual_iface/udp.rs @@ -1,6 +1,6 @@ use anyhow::Context; use std::collections::{HashMap, HashSet, VecDeque}; -use std::net::IpAddr; +use std::net::{IpAddr, SocketAddr}; use crate::events::Event; use crate::{Bus, PortProtocol}; @@ -19,18 +19,28 @@ const MAX_PACKET: usize = 65536; pub struct UdpVirtualInterface { source_peer_ip: IpAddr, port_forwards: Vec, + remote_port_forwards: Vec, bus: Bus, } impl UdpVirtualInterface { /// Initialize the parameters for a new virtual interface. /// Use the `poll_loop()` future to start the virtual interface poll loop. - pub fn new(port_forwards: Vec, bus: Bus, source_peer_ip: IpAddr) -> Self { + pub fn new( + port_forwards: Vec, + remote_port_forwards: Vec, + bus: Bus, + source_peer_ip: IpAddr, + ) -> Self { Self { port_forwards: port_forwards .into_iter() .filter(|f| matches!(f.protocol, PortProtocol::Udp)) .collect(), + remote_port_forwards: remote_port_forwards + .into_iter() + .filter(|f| matches!(f.protocol, PortProtocol::Udp)) + .collect(), source_peer_ip, bus, } @@ -114,14 +124,28 @@ impl VirtualInterfacePoll for UdpVirtualInterface { let mut port_client_handle_map: HashMap = HashMap::new(); // Data packets to send from a virtual client - let mut send_queue: HashMap)>> = - HashMap::new(); + let mut send_queue: HashMap)>> = HashMap::new(); + + // Create sockets for remote port forwards + for remote_port_forward in self.remote_port_forwards.iter() { + let virtual_port = + VirtualPort::new(remote_port_forward.source.port(), PortProtocol::Udp); + let client_socket = UdpVirtualInterface::new_client_socket( + remote_port_forward.source.ip(), + virtual_port, + )?; + let client_handle = iface.add_socket(client_socket); + port_client_handle_map.insert(virtual_port, client_handle); + send_queue.insert(virtual_port, VecDeque::new()); + } + + let mut wake = false; loop { tokio::select! { - _ = match (next_poll, port_client_handle_map.len()) { - (None, 0) => tokio::time::sleep(Duration::MAX), - (None, _) => tokio::time::sleep(Duration::ZERO), + _ = match (next_poll, wake) { + (None, false) => tokio::time::sleep(Duration::MAX), + (None, true) => tokio::time::sleep(Duration::ZERO), (Some(until), _) => tokio::time::sleep_until(until), } => { let loop_start = smoltcp::time::Instant::now(); @@ -134,17 +158,16 @@ impl VirtualInterfacePoll for UdpVirtualInterface { _ => {} } - // Find client socket send data to for (virtual_port, client_handle) in port_client_handle_map.iter() { let client_socket = iface.get_socket::(*client_handle); if client_socket.can_send() { if let Some(send_queue) = send_queue.get_mut(virtual_port) { let to_transfer = send_queue.pop_front(); - if let Some((port_forward, data)) = to_transfer { + if let Some((destination, data)) = to_transfer { client_socket .send_slice( &data, - (IpAddress::from(port_forward.destination.ip()), port_forward.destination.port()).into(), + (IpAddress::from(destination.ip()), destination.port()).into(), ) .unwrap_or_else(|e| { error!( @@ -155,15 +178,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface { } } } - } - - // Find client socket recv data from - for (virtual_port, client_handle) in port_client_handle_map.iter() { - let client_socket = iface.get_socket::(*client_handle); if client_socket.can_recv() { match client_socket.recv() { - Ok((data, _peer)) => { + Ok((data, peer)) => { if !data.is_empty() { + trace!("notifying remote data from peer: {}", peer); endpoint.send(Event::RemoteData(*virtual_port, data.to_vec())); } } @@ -189,9 +208,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface { event = endpoint.recv() => { match event { Event::LocalData(port_forward, virtual_port, data) => { + let destination = port_forward.destination; + if let Some(send_queue) = send_queue.get_mut(&virtual_port) { // Client socket already exists - send_queue.push_back((port_forward, data)); + send_queue.push_back((destination, data)); } else { // Client socket does not exist let client_socket = UdpVirtualInterface::new_client_socket(self.source_peer_ip, virtual_port)?; @@ -199,12 +220,14 @@ impl VirtualInterfacePoll for UdpVirtualInterface { // Add handle to map port_client_handle_map.insert(virtual_port, client_handle); - send_queue.insert(virtual_port, VecDeque::from(vec![(port_forward, data)])); + send_queue.insert(virtual_port, VecDeque::from(vec![(destination, data)])); } next_poll = None; + wake = true; } Event::VirtualDeviceFed(protocol) if protocol == PortProtocol::Udp => { next_poll = None; + wake = true; } _ => {} }