From cdcd3ee8da270913c3e06442a243852ceed940fb Mon Sep 17 00:00:00 2001 From: Aram Peres Date: Wed, 12 Jan 2022 03:11:00 -0500 Subject: [PATCH] Proof of concept for using crossbeam queues instead of VecDequeue for IP device --- Cargo.lock | 21 +++++++++++++++++++++ Cargo.toml | 1 + src/events.rs | 7 +++++-- src/virtual_device.rs | 25 +++++++++---------------- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a423ebd..05b41ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,6 +171,26 @@ dependencies = [ "unreachable", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b979d76c9fcb84dffc80a73f7290da0f83e4c95773494674cb44b76d13a7a110" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "either" version = "1.6.1" @@ -539,6 +559,7 @@ dependencies = [ "async-trait", "boringtun", "clap", + "crossbeam-queue", "futures", "log", "nom", diff --git a/Cargo.toml b/Cargo.toml index 02a45ad..db47142 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ rand = "0.8.4" nom = "7" async-trait = "0.1.51" priority-queue = "1.2.0" +crossbeam-queue = "0.3.3" diff --git a/src/events.rs b/src/events.rs index cd76a49..23cfdd2 100644 --- a/src/events.rs +++ b/src/events.rs @@ -127,8 +127,11 @@ impl BusEndpoint { return event; } } - Err(_) => { - error!("Failed to read event bus from endpoint #{}", self.id); + Err(e) => { + error!( + "Failed to read event bus from endpoint #{}: {:?}", + self.id, e + ); return futures::future::pending().await; } } diff --git a/src/virtual_device.rs b/src/virtual_device.rs index e0e7e4d..4be7c66 100644 --- a/src/virtual_device.rs +++ b/src/virtual_device.rs @@ -3,8 +3,7 @@ use crate::events::{BusSender, Event}; use crate::Bus; use smoltcp::phy::{Device, DeviceCapabilities, Medium}; use smoltcp::time::Instant; -use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; /// A virtual device that processes IP packets through smoltcp and WireGuard. pub struct VirtualIpDevice { @@ -13,7 +12,7 @@ pub struct VirtualIpDevice { /// Channel receiver for received IP packets. bus_sender: BusSender, /// Local queue for packets received from the bus that need to go through the smoltcp interface. - process_queue: Arc>>>, + process_queue: Arc>>, } impl VirtualIpDevice { @@ -21,7 +20,7 @@ impl VirtualIpDevice { pub fn new(protocol: PortProtocol, bus: Bus, max_transmission_unit: usize) -> Self { let mut bus_endpoint = bus.new_endpoint(); let bus_sender = bus_endpoint.sender(); - let process_queue = Arc::new(Mutex::new(VecDeque::new())); + let process_queue = Arc::new(crossbeam_queue::ArrayQueue::new(1000)); // TODO: Configurable? { let process_queue = process_queue.clone(); @@ -29,11 +28,11 @@ impl VirtualIpDevice { loop { match bus_endpoint.recv().await { Event::InboundInternetPacket(ip_proto, data) if ip_proto == protocol => { - let mut queue = process_queue - .lock() - .expect("Failed to acquire process queue lock"); - queue.push_back(data); - bus_endpoint.send(Event::VirtualDeviceFed(ip_proto)); + if process_queue.push(data).is_err() { + error!("VirtualIpDevice process queue full"); + } else { + bus_endpoint.send(Event::VirtualDeviceFed(ip_proto)); + } } _ => {} } @@ -54,13 +53,7 @@ impl<'a> Device<'a> for VirtualIpDevice { type TxToken = TxToken; fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> { - let next = { - let mut queue = self - .process_queue - .lock() - .expect("Failed to acquire process queue lock"); - queue.pop_front() - }; + let next = self.process_queue.pop(); match next { Some(buffer) => Some(( Self::RxToken { buffer },