Use tokio select for polling client socket

This commit is contained in:
Aram 🍐 2021-10-14 19:31:29 -04:00
parent 5ba111002f
commit bf489900e6
2 changed files with 60 additions and 71 deletions

View file

@ -10,7 +10,7 @@ use anyhow::Context;
use smoltcp::iface::InterfaceBuilder; use smoltcp::iface::InterfaceBuilder;
use smoltcp::socket::{SocketSet, TcpSocket, TcpSocketBuffer}; use smoltcp::socket::{SocketSet, TcpSocket, TcpSocketBuffer};
use smoltcp::wire::{IpAddress, IpCidr}; use smoltcp::wire::{IpAddress, IpCidr};
use tokio::io::Interest; use tokio::io::{AsyncReadExt, Interest};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::error::TryRecvError;
@ -158,85 +158,76 @@ async fn handle_tcp_proxy_connection(
} }
loop { loop {
let ready = socket
.ready(Interest::READABLE | Interest::WRITABLE)
.await
.with_context(|| "Failed to wait for TCP proxy socket readiness")?;
if abort.load(Ordering::Relaxed) { if abort.load(Ordering::Relaxed) {
break; break;
} }
if ready.is_readable() { tokio::select! {
let mut buffer = [0u8; MAX_PACKET]; readable_result = socket.readable() => {
match readable_result {
match socket.try_read(&mut buffer) { Ok(_) => {
Ok(size) if size > 0 => { let mut buffer = vec![];
let data = &buffer[..size]; match socket.try_read_buf(&mut buffer) {
debug!( Ok(size) if size > 0 => {
"[{}] Read {} bytes of TCP data from real client", let data = &buffer[..size];
virtual_port, size debug!(
); "[{}] Read {} bytes of TCP data from real client",
match data_to_real_server_tx.send(data.to_vec()).await { virtual_port, size
);
match data_to_real_server_tx.send(data.to_vec()).await {
Err(e) => {
error!(
"[{}] Failed to dispatch data to virtual interface: {:?}",
virtual_port, e
);
}
_ => {}
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
error!(
"[{}] Failed to read from client TCP socket: {:?}",
virtual_port, e
);
break;
}
_ => {
break;
}
}
}
Err(e) => {
error!("[{}] Failed to check if readable: {:?}", virtual_port, e);
break;
}
}
}
data_recv_result = data_to_real_client_rx.recv() => {
match data_recv_result {
Some(data) => match socket.try_write(&data) {
Ok(size) => {
debug!(
"[{}] Wrote {} bytes of TCP data to real client",
virtual_port, size
);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => { Err(e) => {
error!( error!(
"[{}] Failed to dispatch data to virtual interface: {:?}", "[{}] Failed to write to client TCP socket: {:?}",
virtual_port, e virtual_port, e
); );
} }
_ => {} },
} None => continue,
} }
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
error!(
"[{}] Failed to read from client TCP socket: {:?}",
virtual_port, e
);
break;
}
_ => {}
} }
} }
if ready.is_writable() {
// Flush the data_to_real_client_rx channel
match data_to_real_client_rx.try_recv() {
Ok(data) => match socket.try_write(&data) {
Ok(size) => {
debug!(
"[{}] Wrote {} bytes of TCP data to real client",
virtual_port, size
);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
error!(
"[{}] Failed to write to client TCP socket: {:?}",
virtual_port, e
);
}
},
Err(e) => match e {
TryRecvError::Empty => {
// Nothing else to consume in the data channel.
}
TryRecvError::Disconnected => {
// Channel is broken, probably terminated.
}
},
}
}
if ready.is_read_closed() || ready.is_write_closed() {
break;
}
tokio::time::sleep(Duration::from_millis(1)).await;
} }
trace!("[{}] TCP socket handler task terminated", virtual_port); trace!("[{}] TCP socket handler task terminated", virtual_port);

View file

@ -14,9 +14,7 @@ impl PortPool {
pub fn new() -> Self { pub fn new() -> Self {
let inner = lockfree::queue::Queue::default(); let inner = lockfree::queue::Queue::default();
PORT_RANGE.for_each(|p| inner.push(p) as ()); PORT_RANGE.for_each(|p| inner.push(p) as ());
Self { Self { inner }
inner,
}
} }
pub fn next(&self) -> anyhow::Result<u16> { pub fn next(&self) -> anyhow::Result<u16> {