UDP port re-use during flooding

This commit is contained in:
Aram 🍐 2021-10-26 00:03:44 -04:00
parent d975efefaf
commit faf157cfeb
3 changed files with 104 additions and 10 deletions

27
Cargo.lock generated
View file

@ -374,6 +374,12 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7"
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.1.19" version = "0.1.19"
@ -398,6 +404,16 @@ dependencies = [
"quick-error", "quick-error",
] ]
[[package]]
name = "indexmap"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.11" version = "0.1.11"
@ -615,6 +631,7 @@ dependencies = [
"log", "log",
"nom", "nom",
"pretty_env_logger", "pretty_env_logger",
"priority-queue",
"rand", "rand",
"smoltcp", "smoltcp",
"tokio", "tokio",
@ -674,6 +691,16 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "priority-queue"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf40e51ccefb72d42720609e1d3c518de8b5800d723a09358d4a6d6245e1f8ca"
dependencies = [
"autocfg",
"indexmap",
]
[[package]] [[package]]
name = "proc-macro-hack" name = "proc-macro-hack"
version = "0.5.19" version = "0.5.19"

View file

@ -18,3 +18,4 @@ rand = "0.8.4"
nom = "7" nom = "7"
async-trait = "0.1.51" async-trait = "0.1.51"
dashmap = "4.0.2" dashmap = "4.0.2"
priority-queue = "1.2.0"

View file

@ -1,4 +1,4 @@
use std::collections::{HashMap, VecDeque}; use std::collections::{BTreeMap, HashMap, VecDeque};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::ops::Range; use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -6,6 +6,8 @@ use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use anyhow::Context; use anyhow::Context;
use priority_queue::double_priority_queue::DoublePriorityQueue;
use priority_queue::priority_queue::PriorityQueue;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::thread_rng; use rand::thread_rng;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
@ -107,6 +109,7 @@ pub async fn udp_proxy_server(
e, e,
); );
} }
port_pool.update_last_transmit(port.0).await;
} }
} }
} }
@ -145,6 +148,8 @@ async fn next_udp_datagram(
port, size, peer_addr port, size, peer_addr
); );
port_pool.update_last_transmit(port.0).await;
let data = buffer[..size].to_vec(); let data = buffer[..size].to_vec();
Ok(Some((port, data))) Ok(Some((port, data)))
} }
@ -177,26 +182,81 @@ impl UdpPortPool {
/// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity). /// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity).
pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<u16> { pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<u16> {
// A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock.
let mut port_reuse: Option<u16> = None;
{ {
let inner = self.inner.read().await; let inner = self.inner.read().await;
if let Some(port) = inner.port_by_peer_addr.get(&peer_addr) { if let Some(port) = inner.port_by_peer_addr.get(&peer_addr) {
return Ok(*port); return Ok(*port);
} }
// Count how many ports are being used by the peer IP
let peer_ip = peer_addr.ip();
let peer_port_count = inner
.peer_port_usage
.get(&peer_ip)
.map(|v| v.len())
.unwrap_or_default();
if peer_port_count >= PORTS_PER_IP {
// Return least recently used port in this IP's pool
port_reuse = Some(
*(inner
.peer_port_usage
.get(&peer_ip)
.unwrap()
.peek_min()
.unwrap()
.0),
);
warn!(
"Peer [{}] is re-using active virtual port {} due to self-exhaustion.",
peer_addr,
port_reuse.unwrap()
);
}
} }
// TODO: When the port pool is exhausted, it should re-queue the least recently used port.
// TODO: Limit number of ports in use by peer IP
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
let port = inner
.queue let port = port_reuse
.pop_front() .or_else(|| inner.queue.pop_front())
.with_context(|| "UDP virtual port pool is exhausted")?; .or_else(|| {
// If there is no port to reuse, and the port pool is exhausted, take the last recently used port overall,
// as long as the last transmission exceeds the deadline
let last: (&u16, &Instant) = inner.port_usage.peek_min().unwrap();
if Instant::now().duration_since(*last.1).as_secs() > UDP_TIMEOUT_SECONDS {
warn!(
"Peer [{}] is re-using inactive virtual port {} due to global exhaustion.",
peer_addr, last.0
);
Some(*last.0)
} else {
None
}
})
.with_context(|| "virtual port pool is exhausted")?;
inner.port_by_peer_addr.insert(peer_addr, port); inner.port_by_peer_addr.insert(peer_addr, port);
inner.peer_addr_by_port.insert(port, peer_addr); inner.peer_addr_by_port.insert(port, peer_addr);
Ok(port) Ok(port)
} }
/// Notify that the given virtual port has received or transmitted a UDP datagram.
pub async fn update_last_transmit(&self, port: u16) {
let mut inner = self.inner.write().await;
if let Some(peer) = inner.peer_addr_by_port.get(&port).copied() {
let mut pq: &mut DoublePriorityQueue<u16, Instant> = inner
.peer_port_usage
.entry(peer.ip())
.or_insert_with(Default::default);
pq.push(port, Instant::now());
}
let mut pq: &mut DoublePriorityQueue<u16, Instant> = &mut inner.port_usage;
pq.push(port, Instant::now());
}
pub async fn get_peer_addr(&self, port: u16) -> Option<SocketAddr> { pub async fn get_peer_addr(&self, port: u16) -> Option<SocketAddr> {
let inner = self.inner.read().await; let inner = self.inner.read().await;
inner.peer_addr_by_port.get(&port).copied() inner.peer_addr_by_port.get(&port).copied()
@ -208,8 +268,14 @@ impl UdpPortPool {
struct UdpPortPoolInner { struct UdpPortPoolInner {
/// Remaining ports in the pool. /// Remaining ports in the pool.
queue: VecDeque<u16>, queue: VecDeque<u16>,
/// The port assigned by peer IP/port. /// The port assigned by peer IP/port. This is used to lookup an existing virtual port
/// for an incoming UDP datagram.
port_by_peer_addr: HashMap<SocketAddr, u16>, port_by_peer_addr: HashMap<SocketAddr, u16>,
/// The socket address assigned to a peer IP/port. /// The socket address assigned to a peer IP/port. This is used to send a UDP datagram to
/// the real peer address, given the virtual port.
peer_addr_by_port: HashMap<u16, SocketAddr>, peer_addr_by_port: HashMap<u16, SocketAddr>,
/// Keeps an ordered map of the most recently used virtual ports by a peer (client) IP.
peer_port_usage: HashMap<IpAddr, DoublePriorityQueue<u16, Instant>>,
/// Keeps an ordered map of the most recently used virtual ports in general.
port_usage: DoublePriorityQueue<u16, Instant>,
} }