diff --git a/README.md b/README.md index f652574..1defff3 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ local port, say `127.0.0.1:8080`, that will tunnel through WireGuard to reach th You'll then see this log: ``` -INFO onetun > Tunnelling [127.0.0.1:8080]->[192.168.4.2:8080] (via [140.30.3.182:51820] as peer 192.168.4.3) +INFO onetun > Tunneling [127.0.0.1:8080]->[192.168.4.2:8080] (via [140.30.3.182:51820] as peer 192.168.4.3) ``` Which means you can now access the port locally! diff --git a/src/main.rs b/src/main.rs index 28846fa..b5da3ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,22 @@ #[macro_use] extern crate log; -use std::net::IpAddr; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::Context; -use tokio::net::{TcpListener, TcpStream}; -use crate::config::{Config, PortForwardConfig, PortProtocol}; +use crate::config::Config; use crate::port_pool::PortPool; -use crate::virtual_iface::tcp::TcpVirtualInterface; -use crate::virtual_iface::VirtualInterfacePoll; use crate::wg::WireGuardTunnel; pub mod config; pub mod ip_sink; pub mod port_pool; +pub mod tunnel; pub mod virtual_device; pub mod virtual_iface; pub mod wg; -pub const MAX_PACKET: usize = 65536; - #[tokio::main] async fn main() -> anyhow::Result<()> { let config = Config::from_args().with_context(|| "Failed to read config")?; @@ -63,7 +57,7 @@ async fn main() -> anyhow::Result<()> { std::thread::spawn(move || { let cpu_pool = tokio::runtime::Runtime::new().unwrap(); cpu_pool.block_on(async move { - port_forward(pf, source_peer_ip, port_pool, wg) + tunnel::port_forward(pf, source_peer_ip, port_pool, wg) .await .unwrap_or_else(|e| error!("Port-forward failed for {} : {}", pf, e)) }); @@ -74,212 +68,6 @@ async fn main() -> anyhow::Result<()> { futures::future::pending().await } -async fn port_forward( - port_forward: PortForwardConfig, - source_peer_ip: IpAddr, - port_pool: Arc, - wg: Arc, -) -> anyhow::Result<()> { - info!( - "Tunnelling {} [{}]->[{}] (via [{}] as peer {})", - port_forward.protocol, - port_forward.source, - port_forward.destination, - &wg.endpoint, - source_peer_ip - ); - - match port_forward.protocol { - PortProtocol::Tcp => tcp_proxy_server(port_forward, port_pool, wg).await, - PortProtocol::Udp => Err(anyhow::anyhow!("UDP isn't supported just yet.")), - } -} - -/// Starts the server that listens on TCP connections. -async fn tcp_proxy_server( - port_forward: PortForwardConfig, - port_pool: Arc, - wg: Arc, -) -> anyhow::Result<()> { - let listener = TcpListener::bind(port_forward.source) - .await - .with_context(|| "Failed to listen on TCP proxy server")?; - - loop { - let wg = wg.clone(); - let port_pool = port_pool.clone(); - let (socket, peer_addr) = listener - .accept() - .await - .with_context(|| "Failed to accept connection on TCP proxy server")?; - - // Assign a 'virtual port': this is a unique port number used to route IP packets - // received from the WireGuard tunnel. It is the port number that the virtual client will - // listen on. - let virtual_port = match port_pool.next() { - Ok(port) => port, - Err(e) => { - error!( - "Failed to assign virtual port number for connection [{}]: {:?}", - peer_addr, e - ); - continue; - } - }; - - info!("[{}] Incoming connection from {}", virtual_port, peer_addr); - - tokio::spawn(async move { - let port_pool = Arc::clone(&port_pool); - let result = - handle_tcp_proxy_connection(socket, virtual_port, port_forward, wg.clone()).await; - - if let Err(e) = result { - error!( - "[{}] Connection dropped un-gracefully: {:?}", - virtual_port, e - ); - } else { - info!("[{}] Connection closed by client", virtual_port); - } - - // Release port when connection drops - wg.release_virtual_interface(virtual_port); - port_pool.release(virtual_port); - }); - } -} - -/// Handles a new TCP connection with its assigned virtual port. -async fn handle_tcp_proxy_connection( - socket: TcpStream, - virtual_port: u16, - port_forward: PortForwardConfig, - wg: Arc, -) -> anyhow::Result<()> { - // Abort signal for stopping the Virtual Interface - let abort = Arc::new(AtomicBool::new(false)); - - // Signals that the Virtual Client is ready to send data - let (virtual_client_ready_tx, virtual_client_ready_rx) = tokio::sync::oneshot::channel::<()>(); - - // data_to_real_client_(tx/rx): This task reads the data from this mpsc channel to send back - // to the real client. - let (data_to_real_client_tx, mut data_to_real_client_rx) = tokio::sync::mpsc::channel(1_000); - - // data_to_real_server_(tx/rx): This task sends the data received from the real client to the - // virtual interface (virtual server socket). - let (data_to_virtual_server_tx, data_to_virtual_server_rx) = tokio::sync::mpsc::channel(1_000); - - // Spawn virtual interface - { - let abort = abort.clone(); - let virtual_interface = TcpVirtualInterface::new( - virtual_port, - port_forward, - wg, - abort.clone(), - data_to_real_client_tx, - data_to_virtual_server_rx, - virtual_client_ready_tx, - ); - - tokio::spawn(async move { - virtual_interface.poll_loop().await.unwrap_or_else(|e| { - error!("Virtual interface poll loop failed unexpectedly: {}", e); - abort.store(true, Ordering::Relaxed); - }) - }); - } - - // Wait for virtual client to be ready. - virtual_client_ready_rx - .await - .with_context(|| "Virtual client dropped before being ready.")?; - trace!("[{}] Virtual client is ready to send data", virtual_port); - - loop { - tokio::select! { - readable_result = socket.readable() => { - match readable_result { - Ok(_) => { - // Buffer for the individual TCP segment. - let mut buffer = Vec::with_capacity(MAX_PACKET); - match socket.try_read_buf(&mut buffer) { - Ok(size) if size > 0 => { - let data = &buffer[..size]; - debug!( - "[{}] Read {} bytes of TCP data from real client", - virtual_port, size - ); - if let Err(e) = data_to_virtual_server_tx.send(data.to_vec()).await { - error!( - "[{}] Failed to dispatch data to virtual interface: {:?}", - virtual_port, e - ); - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - error!( - "[{}] Failed to read from client TCP socket: {:?}", - virtual_port, e - ); - break; - } - _ => { - break; - } - } - } - Err(e) => { - error!("[{}] Failed to check if readable: {:?}", virtual_port, e); - break; - } - } - } - data_recv_result = data_to_real_client_rx.recv() => { - match data_recv_result { - Some(data) => match socket.try_write(&data) { - Ok(size) => { - debug!( - "[{}] Wrote {} bytes of TCP data to real client", - virtual_port, size - ); - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - if abort.load(Ordering::Relaxed) { - break; - } else { - continue; - } - } - Err(e) => { - error!( - "[{}] Failed to write to client TCP socket: {:?}", - virtual_port, e - ); - } - }, - None => { - if abort.load(Ordering::Relaxed) { - break; - } else { - continue; - } - }, - } - } - } - } - - trace!("[{}] TCP socket handler task terminated", virtual_port); - abort.store(true, Ordering::Relaxed); - Ok(()) -} - fn init_logger(config: &Config) -> anyhow::Result<()> { let mut builder = pretty_env_logger::formatted_builder(); builder.parse_filters(&config.log); diff --git a/src/tunnel/mod.rs b/src/tunnel/mod.rs new file mode 100644 index 0000000..7eab856 --- /dev/null +++ b/src/tunnel/mod.rs @@ -0,0 +1,29 @@ +use std::net::IpAddr; +use std::sync::Arc; + +use crate::config::{PortForwardConfig, PortProtocol}; +use crate::port_pool::PortPool; +use crate::wg::WireGuardTunnel; + +mod tcp; + +pub async fn port_forward( + port_forward: PortForwardConfig, + source_peer_ip: IpAddr, + port_pool: Arc, + wg: Arc, +) -> anyhow::Result<()> { + info!( + "Tunneling {} [{}]->[{}] (via [{}] as peer {})", + port_forward.protocol, + port_forward.source, + port_forward.destination, + &wg.endpoint, + source_peer_ip + ); + + match port_forward.protocol { + PortProtocol::Tcp => tcp::tcp_proxy_server(port_forward, port_pool, wg).await, + PortProtocol::Udp => Err(anyhow::anyhow!("UDP isn't supported just yet.")), + } +} diff --git a/src/tunnel/tcp.rs b/src/tunnel/tcp.rs new file mode 100644 index 0000000..987aa5e --- /dev/null +++ b/src/tunnel/tcp.rs @@ -0,0 +1,196 @@ +use crate::config::PortForwardConfig; +use crate::port_pool::PortPool; +use crate::virtual_iface::tcp::TcpVirtualInterface; +use crate::virtual_iface::VirtualInterfacePoll; +use crate::wg::WireGuardTunnel; +use anyhow::Context; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::net::{TcpListener, TcpStream}; + +const MAX_PACKET: usize = 65536; + +/// Starts the server that listens on TCP connections. +pub async fn tcp_proxy_server( + port_forward: PortForwardConfig, + port_pool: Arc, + wg: Arc, +) -> anyhow::Result<()> { + let listener = TcpListener::bind(port_forward.source) + .await + .with_context(|| "Failed to listen on TCP proxy server")?; + + loop { + let wg = wg.clone(); + let port_pool = port_pool.clone(); + let (socket, peer_addr) = listener + .accept() + .await + .with_context(|| "Failed to accept connection on TCP proxy server")?; + + // Assign a 'virtual port': this is a unique port number used to route IP packets + // received from the WireGuard tunnel. It is the port number that the virtual client will + // listen on. + let virtual_port = match port_pool.next() { + Ok(port) => port, + Err(e) => { + error!( + "Failed to assign virtual port number for connection [{}]: {:?}", + peer_addr, e + ); + continue; + } + }; + + info!("[{}] Incoming connection from {}", virtual_port, peer_addr); + + tokio::spawn(async move { + let port_pool = Arc::clone(&port_pool); + let result = + handle_tcp_proxy_connection(socket, virtual_port, port_forward, wg.clone()).await; + + if let Err(e) = result { + error!( + "[{}] Connection dropped un-gracefully: {:?}", + virtual_port, e + ); + } else { + info!("[{}] Connection closed by client", virtual_port); + } + + // Release port when connection drops + wg.release_virtual_interface(virtual_port); + port_pool.release(virtual_port); + }); + } +} + +/// Handles a new TCP connection with its assigned virtual port. +async fn handle_tcp_proxy_connection( + socket: TcpStream, + virtual_port: u16, + port_forward: PortForwardConfig, + wg: Arc, +) -> anyhow::Result<()> { + // Abort signal for stopping the Virtual Interface + let abort = Arc::new(AtomicBool::new(false)); + + // Signals that the Virtual Client is ready to send data + let (virtual_client_ready_tx, virtual_client_ready_rx) = tokio::sync::oneshot::channel::<()>(); + + // data_to_real_client_(tx/rx): This task reads the data from this mpsc channel to send back + // to the real client. + let (data_to_real_client_tx, mut data_to_real_client_rx) = tokio::sync::mpsc::channel(1_000); + + // data_to_real_server_(tx/rx): This task sends the data received from the real client to the + // virtual interface (virtual server socket). + let (data_to_virtual_server_tx, data_to_virtual_server_rx) = tokio::sync::mpsc::channel(1_000); + + // Spawn virtual interface + { + let abort = abort.clone(); + let virtual_interface = TcpVirtualInterface::new( + virtual_port, + port_forward, + wg, + abort.clone(), + data_to_real_client_tx, + data_to_virtual_server_rx, + virtual_client_ready_tx, + ); + + tokio::spawn(async move { + virtual_interface.poll_loop().await.unwrap_or_else(|e| { + error!("Virtual interface poll loop failed unexpectedly: {}", e); + abort.store(true, Ordering::Relaxed); + }) + }); + } + + // Wait for virtual client to be ready. + virtual_client_ready_rx + .await + .with_context(|| "Virtual client dropped before being ready.")?; + trace!("[{}] Virtual client is ready to send data", virtual_port); + + loop { + tokio::select! { + readable_result = socket.readable() => { + match readable_result { + Ok(_) => { + // Buffer for the individual TCP segment. + let mut buffer = Vec::with_capacity(MAX_PACKET); + match socket.try_read_buf(&mut buffer) { + Ok(size) if size > 0 => { + let data = &buffer[..size]; + debug!( + "[{}] Read {} bytes of TCP data from real client", + virtual_port, size + ); + if let Err(e) = data_to_virtual_server_tx.send(data.to_vec()).await { + error!( + "[{}] Failed to dispatch data to virtual interface: {:?}", + virtual_port, e + ); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + error!( + "[{}] Failed to read from client TCP socket: {:?}", + virtual_port, e + ); + break; + } + _ => { + break; + } + } + } + Err(e) => { + error!("[{}] Failed to check if readable: {:?}", virtual_port, e); + break; + } + } + } + data_recv_result = data_to_real_client_rx.recv() => { + match data_recv_result { + Some(data) => match socket.try_write(&data) { + Ok(size) => { + debug!( + "[{}] Wrote {} bytes of TCP data to real client", + virtual_port, size + ); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + if abort.load(Ordering::Relaxed) { + break; + } else { + continue; + } + } + Err(e) => { + error!( + "[{}] Failed to write to client TCP socket: {:?}", + virtual_port, e + ); + } + }, + None => { + if abort.load(Ordering::Relaxed) { + break; + } else { + continue; + } + }, + } + } + } + } + + trace!("[{}] TCP socket handler task terminated", virtual_port); + abort.store(true, Ordering::Relaxed); + Ok(()) +} diff --git a/src/wg.rs b/src/wg.rs index 3d3cc5f..498fb98 100644 --- a/src/wg.rs +++ b/src/wg.rs @@ -9,10 +9,10 @@ use tokio::net::UdpSocket; use tokio::sync::RwLock; use crate::config::Config; -use crate::MAX_PACKET; /// The capacity of the channel for received IP packets. const DISPATCH_CAPACITY: usize = 1_000; +const MAX_PACKET: usize = 65536; /// A WireGuard tunnel. Encapsulates and decapsulates IP packets /// to be sent to and received from a remote UDP endpoint.