Fix numerous issues with fragmentation of large packets.

This commit is contained in:
Aram 🍐 2021-10-16 15:56:53 -04:00
parent 822216c24b
commit 5debcca268
6 changed files with 137 additions and 16 deletions

View file

@ -136,6 +136,9 @@ async fn handle_tcp_proxy_connection(
// Abort signal for stopping the Virtual Interface
let abort = Arc::new(AtomicBool::new(false));
// Signals that the Virtual Client is ready to send data
let (virtual_client_ready_tx, virtual_client_ready_rx) = tokio::sync::oneshot::channel::<()>();
// data_to_real_client_(tx/rx): This task reads the data from this mpsc channel to send back
// to the real client.
let (data_to_real_client_tx, mut data_to_real_client_rx) = tokio::sync::mpsc::channel(1_000);
@ -156,11 +159,22 @@ async fn handle_tcp_proxy_connection(
abort,
data_to_real_client_tx,
data_to_virtual_server_rx,
virtual_client_ready_tx,
)
.await
});
}
// Wait for virtual client to be ready.
virtual_client_ready_rx
.await
.expect("failed to wait for virtual client to be ready");
trace!("[{}] Virtual client is ready to send data", virtual_port);
// Data that has been received from the client, but hasn't been flushed to the
// virtual client just yet.
let mut unflushed_data = Vec::with_capacity(MAX_PACKET);
loop {
if abort.load(Ordering::Relaxed) {
break;
@ -170,7 +184,8 @@ async fn handle_tcp_proxy_connection(
readable_result = socket.readable() => {
match readable_result {
Ok(_) => {
let mut buffer = vec![];
// Buffer for the individual TCP segment.
let mut buffer = Vec::with_capacity(MAX_PACKET);
match socket.try_read_buf(&mut buffer) {
Ok(size) if size > 0 => {
let data = &buffer[..size];
@ -178,14 +193,21 @@ async fn handle_tcp_proxy_connection(
"[{}] Read {} bytes of TCP data from real client",
virtual_port, size
);
if let Err(e) = data_to_virtual_server_tx.send(data.to_vec()).await {
error!(
"[{}] Failed to dispatch data to virtual interface: {:?}",
virtual_port, e
);
}
unflushed_data.extend_from_slice(data);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
trace!("[{}] Real client blocked; have {} bytes to flush", virtual_port, unflushed_data.len());
if !unflushed_data.is_empty() {
let data = unflushed_data;
// Reset unflushed data
unflushed_data = Vec::with_capacity(MAX_PACKET);
if let Err(e) = data_to_virtual_server_tx.send(data).await {
error!(
"[{}] Failed to dispatch data to virtual interface: {:?}",
virtual_port, e
);
}
}
continue;
}
Err(e) => {
@ -236,6 +258,7 @@ async fn handle_tcp_proxy_connection(
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn virtual_tcp_interface(
virtual_port: u16,
source_peer_ip: IpAddr,
@ -244,7 +267,10 @@ async fn virtual_tcp_interface(
abort: Arc<AtomicBool>,
data_to_real_client_tx: tokio::sync::mpsc::Sender<Vec<u8>>,
mut data_to_virtual_server_rx: tokio::sync::mpsc::Receiver<Vec<u8>>,
virtual_client_ready_tx: tokio::sync::oneshot::Sender<()>,
) -> anyhow::Result<()> {
let mut virtual_client_ready_tx = Some(virtual_client_ready_tx);
// Create a device and interface to simulate IP packets
// In essence:
// * TCP packets received from the 'real' client are 'sent' to the 'virtual server' via the 'virtual client'
@ -304,8 +330,12 @@ async fn virtual_tcp_interface(
let _server_handle = socket_set.add(server_socket?);
let client_handle = socket_set.add(client_socket?);
// Instructs that this is the last poll, after which the connection is closed.
let mut graceful_shutdown = false;
// Any data that wasn't sent because it was over the sending buffer limit
let mut tx_extra = Vec::new();
loop {
let loop_start = smoltcp::time::Instant::now();
let forceful_shutdown = abort.load(Ordering::Relaxed);
@ -352,13 +382,40 @@ async fn virtual_tcp_interface(
}
}
if client_socket.can_send() {
// Check if there is anything to send
if let Ok(data) = data_to_virtual_server_rx.try_recv() {
if let Err(e) = client_socket.send_slice(&data) {
error!(
"[{}] Failed to send slice via virtual client socket: {:?}",
virtual_port, e
);
if let Some(virtual_client_ready_tx) = virtual_client_ready_tx.take() {
virtual_client_ready_tx
.send(())
.expect("Failed to notify real client that virtual client is ready");
}
let mut to_transfer = None;
if tx_extra.is_empty() {
// We can read the next data in the queue
if let Ok(data) = data_to_virtual_server_rx.try_recv() {
to_transfer = Some(data);
}
}
let to_transfer_slice = to_transfer.as_ref().unwrap_or(&tx_extra).as_slice();
if !to_transfer_slice.is_empty() {
let total = to_transfer_slice.len();
match client_socket.send_slice(to_transfer_slice) {
Ok(sent) => {
trace!(
"[{}] Sent {}/{} bytes via virtual client socket",
virtual_port,
sent,
total,
);
tx_extra = Vec::from(&to_transfer_slice[sent..total]);
}
Err(e) => {
error!(
"[{}] Failed to send slice via virtual client socket: {:?}",
virtual_port, e
);
}
}
}
}

View file

@ -1,6 +1,8 @@
use std::ops::Range;
use anyhow::Context;
use rand::seq::SliceRandom;
use rand::thread_rng;
const MIN_PORT: u16 = 32768;
const MAX_PORT: u16 = 60999;
@ -25,7 +27,9 @@ impl PortPool {
/// Initializes a new pool of virtual ports.
pub fn new() -> Self {
let inner = lockfree::queue::Queue::default();
PORT_RANGE.for_each(|p| inner.push(p) as ());
let mut ports: Vec<u16> = PORT_RANGE.collect();
ports.shuffle(&mut thread_rng());
ports.into_iter().for_each(|p| inner.push(p) as ());
Self {
inner,
taken: lockfree::set::Set::new(),

View file

@ -49,7 +49,7 @@ impl<'a> Device<'a> for VirtualIpDevice {
fn capabilities(&self) -> DeviceCapabilities {
let mut cap = DeviceCapabilities::default();
cap.medium = Medium::Ip;
cap.max_transmission_unit = 65535;
cap.max_transmission_unit = 1420;
cap
}
}