Improve reliability using event-based synchronization

This commit is contained in:
Aram 🍐 2022-01-08 01:05:51 -05:00
parent 62b2641627
commit 51788c9557
12 changed files with 628 additions and 805 deletions

View file

@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use crate::events::{Bus, Event};
use anyhow::Context;
use priority_queue::double_priority_queue::DoublePriorityQueue;
use priority_queue::priority_queue::PriorityQueue;
@ -30,61 +31,24 @@ const UDP_TIMEOUT_SECONDS: u64 = 60;
/// TODO: Make this configurable by the CLI
const PORTS_PER_IP: usize = 100;
/// Starts the server that listens on UDP datagrams.
pub async fn udp_proxy_server(
port_forward: PortForwardConfig,
port_pool: UdpPortPool,
wg: Arc<WireGuardTunnel>,
bus: Bus,
) -> anyhow::Result<()> {
// Abort signal
let abort = Arc::new(AtomicBool::new(false));
// data_to_real_client_(tx/rx): This task reads the data from this mpsc channel to send back
// to the real client.
let (data_to_real_client_tx, mut data_to_real_client_rx) =
tokio::sync::mpsc::channel::<(VirtualPort, Vec<u8>)>(1_000);
// data_to_real_server_(tx/rx): This task sends the data received from the real client to the
// virtual interface (virtual server socket).
let (data_to_virtual_server_tx, data_to_virtual_server_rx) =
tokio::sync::mpsc::channel::<(VirtualPort, Vec<u8>)>(1_000);
{
// Spawn virtual interface
// Note: contrary to TCP, there is only one UDP virtual interface
let virtual_interface = UdpVirtualInterface::new(
port_forward,
wg,
data_to_real_client_tx,
data_to_virtual_server_rx,
);
let abort = abort.clone();
tokio::spawn(async move {
virtual_interface.poll_loop().await.unwrap_or_else(|e| {
error!("Virtual interface poll loop failed unexpectedly: {}", e);
abort.store(true, Ordering::Relaxed);
});
});
}
let mut endpoint = bus.new_endpoint();
let socket = UdpSocket::bind(port_forward.source)
.await
.with_context(|| "Failed to bind on UDP proxy address")?;
let mut buffer = [0u8; MAX_PACKET];
loop {
if abort.load(Ordering::Relaxed) {
break;
}
tokio::select! {
to_send_result = next_udp_datagram(&socket, &mut buffer, port_pool.clone()) => {
match to_send_result {
Ok(Some((port, data))) => {
data_to_virtual_server_tx.send((port, data)).await.unwrap_or_else(|e| {
error!(
"Failed to dispatch data to UDP virtual interface: {:?}",
e
);
});
endpoint.send(Event::LocalData(port, data));
}
Ok(None) => {
continue;
@ -98,9 +62,9 @@ pub async fn udp_proxy_server(
}
}
}
data_recv_result = data_to_real_client_rx.recv() => {
if let Some((port, data)) = data_recv_result {
if let Some(peer_addr) = port_pool.get_peer_addr(port.0).await {
event = endpoint.recv() => {
if let Event::RemoteData(port, data) = event {
if let Some(peer_addr) = port_pool.get_peer_addr(port).await {
if let Err(e) = socket.send_to(&data, peer_addr).await {
error!(
"[{}] Failed to send UDP datagram to real client ({}): {:?}",
@ -109,7 +73,7 @@ pub async fn udp_proxy_server(
e,
);
}
port_pool.update_last_transmit(port.0).await;
port_pool.update_last_transmit(port).await;
}
}
}
@ -141,14 +105,13 @@ async fn next_udp_datagram(
return Ok(None);
}
};
let port = VirtualPort(port, PortProtocol::Udp);
debug!(
"[{}] Received datagram of {} bytes from {}",
port, size, peer_addr
);
port_pool.update_last_transmit(port.0).await;
port_pool.update_last_transmit(port).await;
let data = buffer[..size].to_vec();
Ok(Some((port, data)))
@ -181,14 +144,14 @@ impl UdpPortPool {
}
/// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity).
pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<u16> {
pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
// A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock.
let mut port_reuse: Option<u16> = None;
{
let inner = self.inner.read().await;
if let Some(port) = inner.port_by_peer_addr.get(&peer_addr) {
return Ok(*port);
return Ok(VirtualPort::new(*port, PortProtocol::Udp));
}
// Count how many ports are being used by the peer IP
@ -240,26 +203,26 @@ impl UdpPortPool {
inner.port_by_peer_addr.insert(peer_addr, port);
inner.peer_addr_by_port.insert(port, peer_addr);
Ok(port)
Ok(VirtualPort::new(port, PortProtocol::Udp))
}
/// Notify that the given virtual port has received or transmitted a UDP datagram.
pub async fn update_last_transmit(&self, port: u16) {
pub async fn update_last_transmit(&self, port: VirtualPort) {
let mut inner = self.inner.write().await;
if let Some(peer) = inner.peer_addr_by_port.get(&port).copied() {
if let Some(peer) = inner.peer_addr_by_port.get(&port.num()).copied() {
let mut pq: &mut DoublePriorityQueue<u16, Instant> = inner
.peer_port_usage
.entry(peer.ip())
.or_insert_with(Default::default);
pq.push(port, Instant::now());
pq.push(port.num(), Instant::now());
}
let mut pq: &mut DoublePriorityQueue<u16, Instant> = &mut inner.port_usage;
pq.push(port, Instant::now());
pq.push(port.num(), Instant::now());
}
pub async fn get_peer_addr(&self, port: u16) -> Option<SocketAddr> {
pub async fn get_peer_addr(&self, port: VirtualPort) -> Option<SocketAddr> {
let inner = self.inner.read().await;
inner.peer_addr_by_port.get(&port).copied()
inner.peer_addr_by_port.get(&port.num()).copied()
}
}