mirror of
https://github.com/arampoire/onetun.git
synced 2025-11-30 22:00:23 -05:00
WIP: Remote port forwarding
This commit is contained in:
parent
56c950d159
commit
4abb4f9b1e
5 changed files with 116 additions and 10 deletions
|
|
@ -13,7 +13,6 @@ const DEFAULT_PORT_FORWARD_SOURCE: &str = "127.0.0.1";
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub port_forwards: Vec<PortForwardConfig>,
|
pub port_forwards: Vec<PortForwardConfig>,
|
||||||
#[allow(dead_code)]
|
|
||||||
pub remote_port_forwards: Vec<PortForwardConfig>,
|
pub remote_port_forwards: Vec<PortForwardConfig>,
|
||||||
pub private_key: Arc<StaticSecret>,
|
pub private_key: Arc<StaticSecret>,
|
||||||
pub endpoint_public_key: Arc<PublicKey>,
|
pub endpoint_public_key: Arc<PublicKey>,
|
||||||
|
|
|
||||||
35
src/lib.rs
35
src/lib.rs
|
|
@ -82,6 +82,10 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> {
|
||||||
.port_forwards
|
.port_forwards
|
||||||
.iter()
|
.iter()
|
||||||
.any(|pf| pf.protocol == PortProtocol::Udp)
|
.any(|pf| pf.protocol == PortProtocol::Udp)
|
||||||
|
|| config
|
||||||
|
.remote_port_forwards
|
||||||
|
.iter()
|
||||||
|
.any(|pf| pf.protocol == PortProtocol::Udp)
|
||||||
{
|
{
|
||||||
// UDP device
|
// UDP device
|
||||||
let bus = bus.clone();
|
let bus = bus.clone();
|
||||||
|
|
@ -90,7 +94,13 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> {
|
||||||
|
|
||||||
// Start UDP Virtual Interface
|
// Start UDP Virtual Interface
|
||||||
let port_forwards = config.port_forwards.clone();
|
let port_forwards = config.port_forwards.clone();
|
||||||
let iface = UdpVirtualInterface::new(port_forwards, bus, config.source_peer_ip);
|
let remote_port_forwards = config.remote_port_forwards.clone();
|
||||||
|
let iface = UdpVirtualInterface::new(
|
||||||
|
port_forwards,
|
||||||
|
remote_port_forwards,
|
||||||
|
bus,
|
||||||
|
config.source_peer_ip,
|
||||||
|
);
|
||||||
tokio::spawn(async move { iface.poll_loop(device).await });
|
tokio::spawn(async move { iface.poll_loop(device).await });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,5 +128,28 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let remote_port_forwards = config.remote_port_forwards;
|
||||||
|
|
||||||
|
remote_port_forwards
|
||||||
|
.into_iter()
|
||||||
|
.map(|pf| {
|
||||||
|
(
|
||||||
|
pf,
|
||||||
|
wg.clone(),
|
||||||
|
tcp_port_pool.clone(),
|
||||||
|
udp_port_pool.clone(),
|
||||||
|
bus.clone(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.for_each(move |(pf, wg, tcp_port_pool, udp_port_pool, bus)| {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tunnel::remote_port_forward(pf, tcp_port_pool, udp_port_pool, wg, bus)
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|e| error!("Remote port-forward failed for {} : {}", pf, e))
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,3 +32,21 @@ pub async fn port_forward(
|
||||||
PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await,
|
PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn remote_port_forward(
|
||||||
|
port_forward: PortForwardConfig,
|
||||||
|
_tcp_port_pool: TcpPortPool,
|
||||||
|
udp_port_pool: UdpPortPool,
|
||||||
|
wg: Arc<WireGuardTunnel>,
|
||||||
|
bus: Bus,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
info!(
|
||||||
|
"Remote Tunneling {} [{}]<-[{}] (via [{}])",
|
||||||
|
port_forward.protocol, port_forward.destination, port_forward.source, &wg.endpoint,
|
||||||
|
);
|
||||||
|
|
||||||
|
match port_forward.protocol {
|
||||||
|
PortProtocol::Tcp => Ok(()), // TODO: Remote TCP forwarding
|
||||||
|
PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::ops::Range;
|
use std::ops::Range;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
|
|
@ -35,7 +36,22 @@ pub async fn udp_proxy_server(
|
||||||
bus: Bus,
|
bus: Bus,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut endpoint = bus.new_endpoint();
|
let mut endpoint = bus.new_endpoint();
|
||||||
let socket = UdpSocket::bind(port_forward.source)
|
|
||||||
|
// Remote port forwards bind on localhost. Regular port forwards bind on the given source.
|
||||||
|
let bind = if port_forward.remote {
|
||||||
|
port_pool
|
||||||
|
.reserve(port_forward.source.port(), port_forward.destination)
|
||||||
|
.await
|
||||||
|
.with_context(|| "Failed to assign virtual port for remote UDP port forward")?;
|
||||||
|
match port_forward.source.ip() {
|
||||||
|
IpAddr::V4(_) => SocketAddr::from((IpAddr::from_str("0.0.0.0").unwrap(), 0)),
|
||||||
|
IpAddr::V6(_) => SocketAddr::from((IpAddr::from_str("[::]").unwrap(), 0)),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
port_forward.source
|
||||||
|
};
|
||||||
|
|
||||||
|
let socket = UdpSocket::bind(bind)
|
||||||
.await
|
.await
|
||||||
.context("Failed to bind on UDP proxy address")?;
|
.context("Failed to bind on UDP proxy address")?;
|
||||||
|
|
||||||
|
|
@ -156,6 +172,14 @@ impl UdpPortPool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Takes the given port out of the pool, marking it with the given peer address, for an unlimited amount of time.
|
||||||
|
pub async fn reserve(&self, port: u16, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
|
||||||
|
let mut inner = self.inner.write().await;
|
||||||
|
inner.port_by_peer_addr.insert(peer_addr, port);
|
||||||
|
inner.peer_addr_by_port.insert(port, peer_addr);
|
||||||
|
Ok(VirtualPort::new(port, PortProtocol::Udp))
|
||||||
|
}
|
||||||
|
|
||||||
/// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity).
|
/// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity).
|
||||||
pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
|
pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
|
||||||
// A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock.
|
// A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock.
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ const MAX_PACKET: usize = 65536;
|
||||||
pub struct UdpVirtualInterface {
|
pub struct UdpVirtualInterface {
|
||||||
source_peer_ip: IpAddr,
|
source_peer_ip: IpAddr,
|
||||||
port_forwards: Vec<PortForwardConfig>,
|
port_forwards: Vec<PortForwardConfig>,
|
||||||
|
remote_port_forwards: Vec<PortForwardConfig>,
|
||||||
bus: Bus,
|
bus: Bus,
|
||||||
sockets: SocketSet<'static>,
|
sockets: SocketSet<'static>,
|
||||||
}
|
}
|
||||||
|
|
@ -30,12 +31,21 @@ pub struct UdpVirtualInterface {
|
||||||
impl UdpVirtualInterface {
|
impl UdpVirtualInterface {
|
||||||
/// Initialize the parameters for a new virtual interface.
|
/// Initialize the parameters for a new virtual interface.
|
||||||
/// Use the `poll_loop()` future to start the virtual interface poll loop.
|
/// Use the `poll_loop()` future to start the virtual interface poll loop.
|
||||||
pub fn new(port_forwards: Vec<PortForwardConfig>, bus: Bus, source_peer_ip: IpAddr) -> Self {
|
pub fn new(
|
||||||
|
port_forwards: Vec<PortForwardConfig>,
|
||||||
|
remote_port_forwards: Vec<PortForwardConfig>,
|
||||||
|
bus: Bus,
|
||||||
|
source_peer_ip: IpAddr,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
port_forwards: port_forwards
|
port_forwards: port_forwards
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|f| matches!(f.protocol, PortProtocol::Udp))
|
.filter(|f| matches!(f.protocol, PortProtocol::Udp))
|
||||||
.collect(),
|
.collect(),
|
||||||
|
remote_port_forwards: remote_port_forwards
|
||||||
|
.into_iter()
|
||||||
|
.filter(|f| matches!(f.protocol, PortProtocol::Udp))
|
||||||
|
.collect(),
|
||||||
source_peer_ip,
|
source_peer_ip,
|
||||||
bus,
|
bus,
|
||||||
sockets: SocketSet::new([]),
|
sockets: SocketSet::new([]),
|
||||||
|
|
@ -129,11 +139,30 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
|
||||||
let mut send_queue: HashMap<VirtualPort, VecDeque<(PortForwardConfig, Bytes)>> =
|
let mut send_queue: HashMap<VirtualPort, VecDeque<(PortForwardConfig, Bytes)>> =
|
||||||
HashMap::new();
|
HashMap::new();
|
||||||
|
|
||||||
|
// Create sockets for remote port forwards
|
||||||
|
for remote_port_forward in self.remote_port_forwards.iter() {
|
||||||
|
let virtual_port =
|
||||||
|
VirtualPort::new(remote_port_forward.source.port(), PortProtocol::Udp);
|
||||||
|
let client_socket = UdpVirtualInterface::new_client_socket(
|
||||||
|
remote_port_forward.source.ip(),
|
||||||
|
virtual_port,
|
||||||
|
)?;
|
||||||
|
debug!(
|
||||||
|
"Created remote client socket: {:?}",
|
||||||
|
client_socket.endpoint()
|
||||||
|
);
|
||||||
|
let client_handle = self.sockets.add(client_socket);
|
||||||
|
port_client_handle_map.insert(virtual_port, client_handle);
|
||||||
|
send_queue.insert(virtual_port, VecDeque::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut wake = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = match (next_poll, port_client_handle_map.len()) {
|
_ = match (next_poll, wake) {
|
||||||
(None, 0) => tokio::time::sleep(Duration::MAX),
|
(None, false) => tokio::time::sleep(Duration::MAX),
|
||||||
(None, _) => tokio::time::sleep(Duration::ZERO),
|
(None, true) => tokio::time::sleep(Duration::ZERO),
|
||||||
(Some(until), _) => tokio::time::sleep_until(until),
|
(Some(until), _) => tokio::time::sleep_until(until),
|
||||||
} => {
|
} => {
|
||||||
let loop_start = smoltcp::time::Instant::now();
|
let loop_start = smoltcp::time::Instant::now();
|
||||||
|
|
@ -147,11 +176,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
|
||||||
if client_socket.can_send() {
|
if client_socket.can_send() {
|
||||||
if let Some(send_queue) = send_queue.get_mut(virtual_port) {
|
if let Some(send_queue) = send_queue.get_mut(virtual_port) {
|
||||||
let to_transfer = send_queue.pop_front();
|
let to_transfer = send_queue.pop_front();
|
||||||
if let Some((port_forward, data)) = to_transfer {
|
if let Some((destination, data)) = to_transfer {
|
||||||
client_socket
|
client_socket
|
||||||
.send_slice(
|
.send_slice(
|
||||||
&data,
|
&data,
|
||||||
UdpMetadata::from(port_forward.destination),
|
UdpMetadata::from(destination.destination),
|
||||||
)
|
)
|
||||||
.unwrap_or_else(|e| {
|
.unwrap_or_else(|e| {
|
||||||
error!(
|
error!(
|
||||||
|
|
@ -164,8 +193,9 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
|
||||||
}
|
}
|
||||||
if client_socket.can_recv() {
|
if client_socket.can_recv() {
|
||||||
match client_socket.recv() {
|
match client_socket.recv() {
|
||||||
Ok((data, _peer)) => {
|
Ok((data, peer)) => {
|
||||||
if !data.is_empty() {
|
if !data.is_empty() {
|
||||||
|
trace!("notifying remote data from peer: {}", peer);
|
||||||
endpoint.send(Event::RemoteData(*virtual_port, data.to_vec().into()));
|
endpoint.send(Event::RemoteData(*virtual_port, data.to_vec().into()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -204,9 +234,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
|
||||||
send_queue.insert(virtual_port, VecDeque::from(vec![(port_forward, data)]));
|
send_queue.insert(virtual_port, VecDeque::from(vec![(port_forward, data)]));
|
||||||
}
|
}
|
||||||
next_poll = None;
|
next_poll = None;
|
||||||
|
wake = true;
|
||||||
}
|
}
|
||||||
Event::VirtualDeviceFed(PortProtocol::Udp) => {
|
Event::VirtualDeviceFed(PortProtocol::Udp) => {
|
||||||
next_poll = None;
|
next_poll = None;
|
||||||
|
wake = true;
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue