diff --git a/src/main.rs b/src/main.rs index c1e4a12..649dafe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,6 +46,13 @@ fn main() -> anyhow::Result<()> { let source_peer_ip = config.source_peer_ip; let dest_addr_ip = config.dest_addr.ip(); let dest_addr_port = config.dest_addr.port(); + let endpoint_addr = config.endpoint_addr; + + // tx/rx for unencrypted IP packets to send through wireguard tunnel + let (wg_send_tx, wg_send_rx) = crossbeam_channel::unbounded::>(); + + // tx/rx for unencrypted IP packets that were received through wireguard tunnel + let (wg_recv_tx, wg_recv_rx) = crossbeam_channel::unbounded::>(); // Initialize peer based on config let peer = Tunn::new( @@ -59,10 +66,125 @@ fn main() -> anyhow::Result<()> { .map_err(|s| anyhow::anyhow!("{}", s)) .with_context(|| "Failed to initialize peer")?; + let peer = Arc::new(peer); + + let endpoint_socket = + Arc::new(UdpSocket::bind("0.0.0.0:0").with_context(|| "Failed to bind endpoint socket")?); + + // Thread that encapsulates and sends WG packets + { + let peer = peer.clone(); + let endpoint_socket = endpoint_socket.clone(); + + thread::spawn(move || { + let peer = peer.clone(); + loop { + let mut send_buf = [0u8; MAX_PACKET]; + match wg_send_rx.recv() { + Ok(next) => match peer.encapsulate(next.as_slice(), &mut send_buf) { + TunnResult::WriteToNetwork(packet) => { + endpoint_socket + .send_to(packet, endpoint_addr) + .expect("failed to send packet to wg endpoint"); + } + TunnResult::Err(e) => { + error!("Failed to encapsulate: {:?}", e); + } + TunnResult::Done => { + // Ignored + } + other => { + error!("Unexpected TunnResult during encapsulation: {:?}", other); + } + }, + Err(e) => { + error!("Failed to consume from wg_send_rx channel: {}", e); + } + } + } + }); + } + { + let peer = peer.clone(); + let endpoint_socket = endpoint_socket.clone(); + + thread::spawn(move || loop { + // Listen on the network + let mut recv_buf = [0u8; MAX_PACKET]; + let mut send_buf = [0u8; MAX_PACKET]; + + let n = match endpoint_socket.recv(&mut recv_buf) { + Ok(n) => n, + Err(e) => { + error!("Failed to read from endpoint socket: {}", e); + break; + } + }; + + let data = &recv_buf[..n]; + match peer.decapsulate(None, data, &mut send_buf) { + TunnResult::WriteToNetwork(packet) => { + endpoint_socket + .send_to(packet, endpoint_addr) + .expect("failed to send packet to wg endpoint"); + loop { + let mut send_buf = [0u8; MAX_PACKET]; + match peer.decapsulate(None, &[], &mut send_buf) { + TunnResult::WriteToNetwork(packet) => { + endpoint_socket + .send_to(packet, endpoint_addr) + .expect("failed to send packet to wg endpoint"); + } + _ => { + break; + } + } + } + } + TunnResult::WriteToTunnelV4(packet, _) => { + debug!("Got {} bytes to send back to client", packet.len()); + wg_recv_tx + .send(packet.to_vec()) + .expect("failed to queue received wg packet"); + } + TunnResult::WriteToTunnelV6(packet, _) => { + debug!("Got {} bytes to send back to client", packet.len()); + wg_recv_tx + .send(packet.to_vec()) + .expect("failed to queue received wg packet"); + } + _ => {} + } + }); + } + + // Maintenance thread + { + let peer = peer.clone(); + let endpoint_socket = endpoint_socket.clone(); + + thread::spawn(move || loop { + let mut send_buf = [0u8; MAX_PACKET]; + match peer.update_timers(&mut send_buf) { + TunnResult::WriteToNetwork(packet) => { + debug!("Sending maintenance message: {} bytes", packet.len()); + endpoint_socket + .send_to(packet, endpoint_addr) + .expect("failed to send maintenance packet to endpoint address"); + } + _ => {} + } + + thread::sleep(Duration::from_millis(200)); + }); + } + let proxy_listener = TcpListener::bind(config.source_addr).unwrap(); for client_stream in proxy_listener.incoming() { client_stream .map(|client_stream| { + let wg_send_tx = wg_send_tx.clone(); + // Pick a port // TODO: Pool let port = 60000; @@ -161,8 +283,9 @@ fn main() -> anyhow::Result<()> { }; if src_addr == source_peer_ip { - // TODO: Encapsulate and send to WG debug!("[{}] IP packet: {} bytes from {} to send to WG", port, recv.len(), src_addr); + // Add to queue to be encapsulated and sent by other thread + wg_send_tx.send(recv).expect("failed to write to wg_send_tx channel"); } } @@ -181,7 +304,6 @@ fn main() -> anyhow::Result<()> { // This thread simulates the IP-layer communication between the client and server. // * When we get data from the 'real' client, we send it via the virtual client // * When the virtual client sends data, it generates IP packets, which are captures via ip_rx/ip_tx - // * When the real destination sends data (via WG endpoint), we send it via the virtual server thread::spawn(move || { let stopped = Arc::clone(&stopped_2); @@ -315,143 +437,9 @@ fn main() -> anyhow::Result<()> { debug!("[{}] Virtual thread stopped", port); stopped.store(true, Ordering::Relaxed); }); + // * When the real destination sends IP packets (via WG endpoint), we send it via the device/interface }) .unwrap_or_else(|e| error!("{:?}", e)); } - - // TCP thread - // let mut handles = Vec::with_capacity(3); - // - // let endpoint_sock = - // Arc::new(UdpSocket::bind("0.0.0.0:0").with_context(|| "Failed to bind endpoint socket")?); - // - // let endpoint_addr = config.endpoint_addr; - // - // let source_peer_addr = SocketAddr::new(config.source_peer_ip, 1234); - // let destination_addr = config.dest_addr; - // - // let close = Arc::new(AtomicBool::new(false)); - // - // - // // thread 1: read from endpoint, forward to peer - // { - // let close = close.clone(); - // let peer = peer.clone(); - // let source_sock = source_sock.clone(); - // let endpoint_sock = endpoint_sock.clone(); - // - // handles.push(thread::spawn(move || loop { - // // Listen on the network - // let mut recv_buf = [0u8; MAX_PACKET]; - // let mut send_buf = [0u8; MAX_PACKET]; - // - // let n = match endpoint_sock.recv(&mut recv_buf) { - // Ok(n) => n, - // Err(_) => { - // if close.load(Ordering::Relaxed) { - // return; - // } - // continue; - // } - // }; - // - // let data = &recv_buf[..n]; - // match peer.decapsulate(None, data, &mut send_buf) { - // TunnResult::WriteToNetwork(packet) => { - // send_packet(packet, endpoint_sock.clone(), endpoint_addr).unwrap(); - // loop { - // let mut send_buf = [0u8; MAX_PACKET]; - // match peer.decapsulate(None, &[], &mut send_buf) { - // TunnResult::WriteToNetwork(packet) => { - // send_packet(packet, endpoint_sock.clone(), endpoint_addr).unwrap(); - // } - // _ => { - // break; - // } - // } - // } - // } - // TunnResult::WriteToTunnelV4(packet, _) => { - // source_sock.send(packet).unwrap(); - // } - // TunnResult::WriteToTunnelV6(packet, _) => { - // source_sock.send(packet).unwrap(); - // } - // _ => {} - // } - // })); - // } - // - // // thread 2: read from peer socket - // { - // let close = close.clone(); - // let peer = peer.clone(); - // let source_sock = source_sock.clone(); - // let endpoint_sock = endpoint_sock.clone(); - // - // handles.push(thread::spawn(move || loop { - // let mut recv_buf = [0u8; MAX_PACKET]; - // let mut send_buf = [0u8; MAX_PACKET]; - // - // let n = match source_sock.recv(&mut recv_buf) { - // Ok(n) => n, - // Err(_) => { - // if close.load(Ordering::Relaxed) { - // return; - // } - // continue; - // } - // }; - // - // let data = &recv_buf[..n]; - // - // // TODO: Support TCP - // let ip_packet = - // wrap_data_packet(Protocol::Udp, data, source_peer_addr, destination_addr) - // .expect("Failed to wrap data packet"); - // - // debug!("Crafted IP packet: {:#?}", ip_packet); - // - // match peer.encapsulate(ip_packet.as_slice(), &mut send_buf) { - // TunnResult::WriteToNetwork(packet) => { - // send_packet(packet, endpoint_sock.clone(), endpoint_addr).unwrap(); - // } - // TunnResult::Err(e) => { - // error!("Failed to encapsulate: {:?}", e); - // } - // other => { - // error!("Unexpected TunnResult during encapsulation: {:?}", other); - // } - // } - // })); - // } - // - // // thread 3: maintenance - // { - // let close = close.clone(); - // let peer = peer.clone(); - // let endpoint_sock = endpoint_sock.clone(); - // - // handles.push(thread::spawn(move || loop { - // if close.load(Ordering::Relaxed) { - // return; - // } - // - // let mut send_buf = [0u8; MAX_PACKET]; - // match peer.update_timers(&mut send_buf) { - // TunnResult::WriteToNetwork(packet) => { - // send_packet(packet, endpoint_sock.clone(), endpoint_addr).unwrap(); - // } - // _ => {} - // } - // - // thread::sleep(Duration::from_millis(200)); - // })); - // } - // - // - // for handle in handles { - // handle.join().expect("Failed to join thread") - // } Ok(()) }