From 5debcca268d68bb4704ec8805ebbaa558732df00 Mon Sep 17 00:00:00 2001 From: Aram Peres Date: Sat, 16 Oct 2021 15:56:53 -0400 Subject: [PATCH] Fix numerous issues with fragmentation of large packets. --- .gitignore | 1 + Cargo.lock | 57 +++++++++++++++++++++++++++++ Cargo.toml | 2 + src/main.rs | 85 ++++++++++++++++++++++++++++++++++++------- src/port_pool.rs | 6 ++- src/virtual_device.rs | 2 +- 6 files changed, 137 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 6adbcee..911ee60 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /.idea .envrc +*.log diff --git a/Cargo.lock b/Cargo.lock index 35ac8ad..04ba5b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -408,6 +408,15 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e537132deb99c0eb4b752f0346b6a836200eaaa3516dd7e5514b63930a09e5d" +[[package]] +name = "itertools" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" +dependencies = [ + "either", +] + [[package]] name = "jni" version = "0.10.2" @@ -581,9 +590,11 @@ dependencies = [ "boringtun", "clap", "futures", + "itertools", "lockfree", "log", "pretty_env_logger", + "rand", "smoltcp", "tokio", ] @@ -631,6 +642,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ppv-lite86" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3ca011bd0129ff4ae15cd04c4eef202cadf6c51c21e47aba319b4e0501db741" + [[package]] name = "pretty_env_logger" version = "0.3.1" @@ -678,6 +695,46 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" +dependencies = [ + "rand_core", +] + [[package]] name = "redox_syscall" version = "0.2.10" diff --git a/Cargo.toml b/Cargo.toml index 94a447d..9d10ef3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,5 @@ smoltcp = { git = "https://github.com/smoltcp-rs/smoltcp", branch = "master" } tokio = { version = "1", features = ["full"] } lockfree = "0.5.1" futures = "0.3.17" +rand = "0.8.4" +itertools = "0.10.1" diff --git a/src/main.rs b/src/main.rs index 4913d82..12c520b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -136,6 +136,9 @@ async fn handle_tcp_proxy_connection( // Abort signal for stopping the Virtual Interface let abort = Arc::new(AtomicBool::new(false)); + // Signals that the Virtual Client is ready to send data + let (virtual_client_ready_tx, virtual_client_ready_rx) = tokio::sync::oneshot::channel::<()>(); + // data_to_real_client_(tx/rx): This task reads the data from this mpsc channel to send back // to the real client. let (data_to_real_client_tx, mut data_to_real_client_rx) = tokio::sync::mpsc::channel(1_000); @@ -156,11 +159,22 @@ async fn handle_tcp_proxy_connection( abort, data_to_real_client_tx, data_to_virtual_server_rx, + virtual_client_ready_tx, ) .await }); } + // Wait for virtual client to be ready. + virtual_client_ready_rx + .await + .expect("failed to wait for virtual client to be ready"); + trace!("[{}] Virtual client is ready to send data", virtual_port); + + // Data that has been received from the client, but hasn't been flushed to the + // virtual client just yet. + let mut unflushed_data = Vec::with_capacity(MAX_PACKET); + loop { if abort.load(Ordering::Relaxed) { break; @@ -170,7 +184,8 @@ async fn handle_tcp_proxy_connection( readable_result = socket.readable() => { match readable_result { Ok(_) => { - let mut buffer = vec![]; + // Buffer for the individual TCP segment. + let mut buffer = Vec::with_capacity(MAX_PACKET); match socket.try_read_buf(&mut buffer) { Ok(size) if size > 0 => { let data = &buffer[..size]; @@ -178,14 +193,21 @@ async fn handle_tcp_proxy_connection( "[{}] Read {} bytes of TCP data from real client", virtual_port, size ); - if let Err(e) = data_to_virtual_server_tx.send(data.to_vec()).await { - error!( - "[{}] Failed to dispatch data to virtual interface: {:?}", - virtual_port, e - ); - } + unflushed_data.extend_from_slice(data); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + trace!("[{}] Real client blocked; have {} bytes to flush", virtual_port, unflushed_data.len()); + if !unflushed_data.is_empty() { + let data = unflushed_data; + // Reset unflushed data + unflushed_data = Vec::with_capacity(MAX_PACKET); + if let Err(e) = data_to_virtual_server_tx.send(data).await { + error!( + "[{}] Failed to dispatch data to virtual interface: {:?}", + virtual_port, e + ); + } + } continue; } Err(e) => { @@ -236,6 +258,7 @@ async fn handle_tcp_proxy_connection( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn virtual_tcp_interface( virtual_port: u16, source_peer_ip: IpAddr, @@ -244,7 +267,10 @@ async fn virtual_tcp_interface( abort: Arc, data_to_real_client_tx: tokio::sync::mpsc::Sender>, mut data_to_virtual_server_rx: tokio::sync::mpsc::Receiver>, + virtual_client_ready_tx: tokio::sync::oneshot::Sender<()>, ) -> anyhow::Result<()> { + let mut virtual_client_ready_tx = Some(virtual_client_ready_tx); + // Create a device and interface to simulate IP packets // In essence: // * TCP packets received from the 'real' client are 'sent' to the 'virtual server' via the 'virtual client' @@ -304,8 +330,12 @@ async fn virtual_tcp_interface( let _server_handle = socket_set.add(server_socket?); let client_handle = socket_set.add(client_socket?); + // Instructs that this is the last poll, after which the connection is closed. let mut graceful_shutdown = false; + // Any data that wasn't sent because it was over the sending buffer limit + let mut tx_extra = Vec::new(); + loop { let loop_start = smoltcp::time::Instant::now(); let forceful_shutdown = abort.load(Ordering::Relaxed); @@ -352,13 +382,40 @@ async fn virtual_tcp_interface( } } if client_socket.can_send() { - // Check if there is anything to send - if let Ok(data) = data_to_virtual_server_rx.try_recv() { - if let Err(e) = client_socket.send_slice(&data) { - error!( - "[{}] Failed to send slice via virtual client socket: {:?}", - virtual_port, e - ); + if let Some(virtual_client_ready_tx) = virtual_client_ready_tx.take() { + virtual_client_ready_tx + .send(()) + .expect("Failed to notify real client that virtual client is ready"); + } + + let mut to_transfer = None; + + if tx_extra.is_empty() { + // We can read the next data in the queue + if let Ok(data) = data_to_virtual_server_rx.try_recv() { + to_transfer = Some(data); + } + } + + let to_transfer_slice = to_transfer.as_ref().unwrap_or(&tx_extra).as_slice(); + if !to_transfer_slice.is_empty() { + let total = to_transfer_slice.len(); + match client_socket.send_slice(to_transfer_slice) { + Ok(sent) => { + trace!( + "[{}] Sent {}/{} bytes via virtual client socket", + virtual_port, + sent, + total, + ); + tx_extra = Vec::from(&to_transfer_slice[sent..total]); + } + Err(e) => { + error!( + "[{}] Failed to send slice via virtual client socket: {:?}", + virtual_port, e + ); + } } } } diff --git a/src/port_pool.rs b/src/port_pool.rs index 771c7a9..a932b7c 100644 --- a/src/port_pool.rs +++ b/src/port_pool.rs @@ -1,6 +1,8 @@ use std::ops::Range; use anyhow::Context; +use rand::seq::SliceRandom; +use rand::thread_rng; const MIN_PORT: u16 = 32768; const MAX_PORT: u16 = 60999; @@ -25,7 +27,9 @@ impl PortPool { /// Initializes a new pool of virtual ports. pub fn new() -> Self { let inner = lockfree::queue::Queue::default(); - PORT_RANGE.for_each(|p| inner.push(p) as ()); + let mut ports: Vec = PORT_RANGE.collect(); + ports.shuffle(&mut thread_rng()); + ports.into_iter().for_each(|p| inner.push(p) as ()); Self { inner, taken: lockfree::set::Set::new(), diff --git a/src/virtual_device.rs b/src/virtual_device.rs index 9692963..ad60b63 100644 --- a/src/virtual_device.rs +++ b/src/virtual_device.rs @@ -49,7 +49,7 @@ impl<'a> Device<'a> for VirtualIpDevice { fn capabilities(&self) -> DeviceCapabilities { let mut cap = DeviceCapabilities::default(); cap.medium = Medium::Ip; - cap.max_transmission_unit = 65535; + cap.max_transmission_unit = 1420; cap } }