diff --git a/src/ip_sink.rs b/src/ip_sink.rs new file mode 100644 index 0000000..f17eb21 --- /dev/null +++ b/src/ip_sink.rs @@ -0,0 +1,35 @@ +use crate::virtual_device::VirtualIpDevice; +use crate::wg::WireGuardTunnel; +use smoltcp::iface::InterfaceBuilder; +use smoltcp::socket::SocketSet; +use std::sync::Arc; +use tokio::time::Duration; + +/// A repeating task that processes unroutable IP packets. +pub async fn run_ip_sink_interface(wg: Arc) -> ! { + // Initialize interface + let device = VirtualIpDevice::new_sink(wg) + .await + .expect("Failed to initialize VirtualIpDevice for sink interface"); + + // No sockets on sink interface + let mut socket_set_entries: [_; 0] = Default::default(); + let mut socket_set = SocketSet::new(&mut socket_set_entries[..]); + let mut virtual_interface = InterfaceBuilder::new(device).ip_addrs([]).finalize(); + + loop { + let loop_start = smoltcp::time::Instant::now(); + match virtual_interface.poll(&mut socket_set, loop_start) { + Ok(processed) if processed => { + trace!("[SINK] Virtual interface polled some packets to be processed",); + tokio::time::sleep(Duration::from_millis(1)).await; + } + Err(e) => { + error!("[SINK] Virtual interface poll error: {:?}", e); + } + _ => { + tokio::time::sleep(Duration::from_millis(5)).await; + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 7ae3e0f..afacddd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ use crate::virtual_device::VirtualIpDevice; use crate::wg::WireGuardTunnel; pub mod config; +pub mod ip_sink; pub mod port_pool; pub mod virtual_device; pub mod wg; @@ -47,6 +48,12 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { wg.consume_task().await }); } + { + // Start IP sink task for incoming IP packets + let wg = wg.clone(); + tokio::spawn(async move { ip_sink::run_ip_sink_interface(wg).await }); + } + info!( "Tunnelling [{}]->[{}] (via [{}] as peer {})", &config.source_addr, &config.dest_addr, &config.endpoint_addr, &config.source_peer_ip @@ -278,7 +285,6 @@ async fn virtual_tcp_interface( IpCidr::new(IpAddress::from(source_peer_ip), 32), IpCidr::new(IpAddress::from(dest_addr.ip()), 32), ]) - .any_ip(true) .finalize(); // Server socket: this is a placeholder for the interface to route new connections to. diff --git a/src/virtual_device.rs b/src/virtual_device.rs index 3ca2416..b8a61ce 100644 --- a/src/virtual_device.rs +++ b/src/virtual_device.rs @@ -22,6 +22,14 @@ impl VirtualIpDevice { Ok(Self { wg, ip_dispatch_rx }) } + + pub async fn new_sink(wg: Arc) -> anyhow::Result { + let ip_dispatch_rx = wg + .register_sink_interface() + .await + .with_context(|| "Failed to register IP dispatch for sink virtual interface")?; + Ok(Self { wg, ip_dispatch_rx }) + } } impl<'a> Device<'a> for VirtualIpDevice { diff --git a/src/wg.rs b/src/wg.rs index f748794..f91f6d1 100644 --- a/src/wg.rs +++ b/src/wg.rs @@ -10,6 +10,7 @@ use smoltcp::wire::{ TcpPacket, TcpRepr, TcpSeqNumber, }; use tokio::net::UdpSocket; +use tokio::sync::RwLock; use crate::config::Config; use crate::MAX_PACKET; @@ -30,6 +31,8 @@ pub struct WireGuardTunnel { endpoint: SocketAddr, /// Maps virtual ports to the corresponding IP packet dispatcher. virtual_port_ip_tx: lockfree::map::Map>>, + /// IP packet dispatcher for unroutable packets. `None` if not initialized. + sink_ip_tx: RwLock>>>, } impl WireGuardTunnel { @@ -49,6 +52,7 @@ impl WireGuardTunnel { udp, endpoint, virtual_port_ip_tx, + sink_ip_tx: RwLock::new(None), }) } @@ -98,6 +102,18 @@ impl WireGuardTunnel { } } + /// Register a virtual interface (using its assigned virtual port) with the given IP packet `Sender`. + pub async fn register_sink_interface( + &self, + ) -> anyhow::Result>> { + let (sender, receiver) = tokio::sync::mpsc::channel(DISPATCH_CAPACITY); + + let mut sink_ip_tx = self.sink_ip_tx.write().await; + *sink_ip_tx = Some(sender); + + Ok(receiver) + } + /// Releases the virtual interface from IP dispatch. pub fn release_virtual_interface(&self, virtual_port: u16) { self.virtual_port_ip_tx.remove(&virtual_port); @@ -223,7 +239,7 @@ impl WireGuardTunnel { } RouteResult::TcpReset => { trace!("Resetting dead TCP connection after packet from WireGuard endpoint"); - self.route_tcp_sink(packet).await.unwrap_or_else(|e| { + self.route_ip_sink(packet).await.unwrap_or_else(|e| { error!("Failed to send TCP reset to sink: {:?}", e) }); } @@ -295,10 +311,21 @@ impl WireGuardTunnel { .unwrap_or(RouteResult::Drop) } - /// Route a packet to the TCP sink interface. - async fn route_tcp_sink(&self, _packet: &[u8]) -> anyhow::Result<()> { - // TODO - Ok(()) + /// Route a packet to the IP sink interface. + async fn route_ip_sink(&self, packet: &[u8]) -> anyhow::Result<()> { + let ip_sink_tx = self.sink_ip_tx.read().await; + + if let Some(ip_sink_tx) = &*ip_sink_tx { + ip_sink_tx + .send(packet.to_vec()) + .await + .with_context(|| "Failed to dispatch IP packet to sink interface") + } else { + warn!( + "Could not dispatch unroutable IP packet to sink because interface is not active." + ); + Ok(()) + } } }