mirror of
https://github.com/aramperes/onetun.git
synced 2025-09-09 06:38:32 -04:00
Drain IP broadcast sink receiver
This commit is contained in:
parent
d0497e437a
commit
25bcedc855
4 changed files with 161 additions and 6 deletions
121
Cargo.lock
generated
121
Cargo.lock
generated
|
@ -242,6 +242,100 @@ dependencies = [
|
|||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-core"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
|
||||
|
||||
[[package]]
|
||||
name = "futures-executor"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"proc-macro-hack",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11"
|
||||
|
||||
[[package]]
|
||||
name = "futures-task"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"proc-macro-hack",
|
||||
"proc-macro-nested",
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.2.3"
|
||||
|
@ -481,11 +575,12 @@ checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
|
|||
|
||||
[[package]]
|
||||
name = "onetun"
|
||||
version = "0.1.0"
|
||||
version = "0.1.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"boringtun",
|
||||
"clap",
|
||||
"futures",
|
||||
"lockfree",
|
||||
"log",
|
||||
"pretty_env_logger",
|
||||
|
@ -530,6 +625,12 @@ version = "0.2.7"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443"
|
||||
|
||||
[[package]]
|
||||
name = "pin-utils"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pretty_env_logger"
|
||||
version = "0.3.1"
|
||||
|
@ -541,6 +642,18 @@ dependencies = [
|
|||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-hack"
|
||||
version = "0.5.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-nested"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.29"
|
||||
|
@ -652,6 +765,12 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
|
||||
|
||||
[[package]]
|
||||
name = "slog"
|
||||
version = "2.7.0"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "onetun"
|
||||
version = "0.1.0"
|
||||
version = "0.1.2"
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
@ -14,3 +14,4 @@ anyhow = "1"
|
|||
smoltcp = { git = "https://github.com/smoltcp-rs/smoltcp", branch = "master" }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
lockfree = "0.5.1"
|
||||
futures = "0.3.17"
|
||||
|
|
|
@ -47,6 +47,12 @@ async fn main() -> anyhow::Result<()> {
|
|||
tokio::spawn(async move { wg.consume_task().await });
|
||||
}
|
||||
|
||||
{
|
||||
// Start IP broadcast drain task for WireGuard
|
||||
let wg = wg.clone();
|
||||
tokio::spawn(async move { wg.broadcast_drain_task().await });
|
||||
}
|
||||
|
||||
info!(
|
||||
"Tunnelling [{}]->[{}] (via [{}] as peer {})",
|
||||
&config.source_addr, &config.dest_addr, &config.endpoint_addr, &config.source_peer_ip
|
||||
|
|
37
src/wg.rs
37
src/wg.rs
|
@ -4,6 +4,7 @@ use std::time::Duration;
|
|||
|
||||
use anyhow::Context;
|
||||
use boringtun::noise::{Tunn, TunnResult};
|
||||
use futures::lock::Mutex;
|
||||
use log::Level;
|
||||
use smoltcp::phy::ChecksumCapabilities;
|
||||
use smoltcp::wire::{
|
||||
|
@ -11,6 +12,7 @@ use smoltcp::wire::{
|
|||
TcpPacket, TcpRepr, TcpSeqNumber,
|
||||
};
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::port_pool::PortPool;
|
||||
|
@ -32,8 +34,8 @@ pub struct WireGuardTunnel {
|
|||
endpoint: SocketAddr,
|
||||
/// Broadcast sender for received IP packets.
|
||||
ip_broadcast_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
|
||||
/// Placeholder so that the broadcaster doesn't close.
|
||||
_ip_broadcast_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
/// Sink so that the broadcaster doesn't close. A repeating task should drain this as much as possible.
|
||||
ip_broadcast_rx_sink: Mutex<tokio::sync::broadcast::Receiver<Vec<u8>>>,
|
||||
/// Port pool.
|
||||
port_pool: Arc<PortPool>,
|
||||
}
|
||||
|
@ -47,7 +49,7 @@ impl WireGuardTunnel {
|
|||
.await
|
||||
.with_context(|| "Failed to create UDP socket for WireGuard connection")?;
|
||||
let endpoint = config.endpoint_addr;
|
||||
let (ip_broadcast_tx, ip_broadcast_rx) =
|
||||
let (ip_broadcast_tx, ip_broadcast_rx_sink) =
|
||||
tokio::sync::broadcast::channel(BROADCAST_CAPACITY);
|
||||
|
||||
Ok(Self {
|
||||
|
@ -56,7 +58,7 @@ impl WireGuardTunnel {
|
|||
udp,
|
||||
endpoint,
|
||||
ip_broadcast_tx,
|
||||
_ip_broadcast_rx: ip_broadcast_rx,
|
||||
ip_broadcast_rx_sink: Mutex::new(ip_broadcast_rx_sink),
|
||||
port_pool,
|
||||
})
|
||||
}
|
||||
|
@ -228,6 +230,33 @@ impl WireGuardTunnel {
|
|||
}
|
||||
}
|
||||
|
||||
/// A repeating task that drains the default IP broadcast channel receiver.
|
||||
/// It is necessary to keep this receiver alive to prevent the overall channel from closing,
|
||||
/// so draining its backlog regularly is required to avoid memory leaks.
|
||||
pub async fn broadcast_drain_task(&self) {
|
||||
trace!("Starting IP broadcast sink drain task");
|
||||
|
||||
loop {
|
||||
let mut sink = self.ip_broadcast_rx_sink.lock().await;
|
||||
match sink.recv().await {
|
||||
Ok(_) => {
|
||||
trace!("Drained a packet from IP broadcast sink");
|
||||
}
|
||||
Err(e) => match e {
|
||||
RecvError::Closed => {
|
||||
trace!("IP broadcast sink finished draining: channel closed");
|
||||
break;
|
||||
}
|
||||
RecvError::Lagged(_) => {
|
||||
warn!("IP broadcast sink is falling behind");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Stopped IP broadcast sink drain");
|
||||
}
|
||||
|
||||
fn create_tunnel(config: &Config) -> anyhow::Result<Box<Tunn>> {
|
||||
Tunn::new(
|
||||
config.private_key.clone(),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue