diff --git a/Cargo.lock b/Cargo.lock index 885b05b..851e8c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -374,6 +374,12 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -398,6 +404,16 @@ dependencies = [ "quick-error", ] +[[package]] +name = "indexmap" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.11" @@ -615,6 +631,7 @@ dependencies = [ "log", "nom", "pretty_env_logger", + "priority-queue", "rand", "smoltcp", "tokio", @@ -674,6 +691,16 @@ dependencies = [ "log", ] +[[package]] +name = "priority-queue" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf40e51ccefb72d42720609e1d3c518de8b5800d723a09358d4a6d6245e1f8ca" +dependencies = [ + "autocfg", + "indexmap", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" diff --git a/Cargo.toml b/Cargo.toml index 905ac32..9549a80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ rand = "0.8.4" nom = "7" async-trait = "0.1.51" dashmap = "4.0.2" +priority-queue = "1.2.0" diff --git a/src/tunnel/udp.rs b/src/tunnel/udp.rs index 6eb7e02..bedcdf7 100644 --- a/src/tunnel/udp.rs +++ b/src/tunnel/udp.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::net::{IpAddr, SocketAddr}; use std::ops::Range; use std::sync::atomic::{AtomicBool, Ordering}; @@ -6,6 +6,8 @@ use std::sync::Arc; use std::time::Instant; use anyhow::Context; +use priority_queue::double_priority_queue::DoublePriorityQueue; +use priority_queue::priority_queue::PriorityQueue; use rand::seq::SliceRandom; use rand::thread_rng; use tokio::net::UdpSocket; @@ -107,6 +109,7 @@ pub async fn udp_proxy_server( e, ); } + port_pool.update_last_transmit(port.0).await; } } } @@ -145,6 +148,8 @@ async fn next_udp_datagram( port, size, peer_addr ); + port_pool.update_last_transmit(port.0).await; + let data = buffer[..size].to_vec(); Ok(Some((port, data))) } @@ -177,26 +182,81 @@ impl UdpPortPool { /// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity). pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result { + // A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock. + let mut port_reuse: Option = None; + { let inner = self.inner.read().await; if let Some(port) = inner.port_by_peer_addr.get(&peer_addr) { return Ok(*port); } + + // Count how many ports are being used by the peer IP + let peer_ip = peer_addr.ip(); + let peer_port_count = inner + .peer_port_usage + .get(&peer_ip) + .map(|v| v.len()) + .unwrap_or_default(); + + if peer_port_count >= PORTS_PER_IP { + // Return least recently used port in this IP's pool + port_reuse = Some( + *(inner + .peer_port_usage + .get(&peer_ip) + .unwrap() + .peek_min() + .unwrap() + .0), + ); + warn!( + "Peer [{}] is re-using active virtual port {} due to self-exhaustion.", + peer_addr, + port_reuse.unwrap() + ); + } } - // TODO: When the port pool is exhausted, it should re-queue the least recently used port. - // TODO: Limit number of ports in use by peer IP - let mut inner = self.inner.write().await; - let port = inner - .queue - .pop_front() - .with_context(|| "UDP virtual port pool is exhausted")?; + + let port = port_reuse + .or_else(|| inner.queue.pop_front()) + .or_else(|| { + // If there is no port to reuse, and the port pool is exhausted, take the last recently used port overall, + // as long as the last transmission exceeds the deadline + let last: (&u16, &Instant) = inner.port_usage.peek_min().unwrap(); + if Instant::now().duration_since(*last.1).as_secs() > UDP_TIMEOUT_SECONDS { + warn!( + "Peer [{}] is re-using inactive virtual port {} due to global exhaustion.", + peer_addr, last.0 + ); + Some(*last.0) + } else { + None + } + }) + .with_context(|| "virtual port pool is exhausted")?; + inner.port_by_peer_addr.insert(peer_addr, port); inner.peer_addr_by_port.insert(port, peer_addr); Ok(port) } + /// Notify that the given virtual port has received or transmitted a UDP datagram. + pub async fn update_last_transmit(&self, port: u16) { + let mut inner = self.inner.write().await; + if let Some(peer) = inner.peer_addr_by_port.get(&port).copied() { + let mut pq: &mut DoublePriorityQueue = inner + .peer_port_usage + .entry(peer.ip()) + .or_insert_with(Default::default); + pq.push(port, Instant::now()); + } + let mut pq: &mut DoublePriorityQueue = &mut inner.port_usage; + pq.push(port, Instant::now()); + } + pub async fn get_peer_addr(&self, port: u16) -> Option { let inner = self.inner.read().await; inner.peer_addr_by_port.get(&port).copied() @@ -208,8 +268,14 @@ impl UdpPortPool { struct UdpPortPoolInner { /// Remaining ports in the pool. queue: VecDeque, - /// The port assigned by peer IP/port. + /// The port assigned by peer IP/port. This is used to lookup an existing virtual port + /// for an incoming UDP datagram. port_by_peer_addr: HashMap, - /// The socket address assigned to a peer IP/port. + /// The socket address assigned to a peer IP/port. This is used to send a UDP datagram to + /// the real peer address, given the virtual port. peer_addr_by_port: HashMap, + /// Keeps an ordered map of the most recently used virtual ports by a peer (client) IP. + peer_port_usage: HashMap>, + /// Keeps an ordered map of the most recently used virtual ports in general. + port_usage: DoublePriorityQueue, }