Proof of concept for using crossbeam queues instead of VecDequeue for IP device

This commit is contained in:
Aram 🍐 2022-01-12 03:11:00 -05:00
parent 648154b5ee
commit cdcd3ee8da
4 changed files with 36 additions and 18 deletions

21
Cargo.lock generated
View file

@ -171,6 +171,26 @@ dependencies = [
"unreachable", "unreachable",
] ]
[[package]]
name = "crossbeam-queue"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b979d76c9fcb84dffc80a73f7290da0f83e4c95773494674cb44b76d13a7a110"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@ -539,6 +559,7 @@ dependencies = [
"async-trait", "async-trait",
"boringtun", "boringtun",
"clap", "clap",
"crossbeam-queue",
"futures", "futures",
"log", "log",
"nom", "nom",

View file

@ -18,3 +18,4 @@ rand = "0.8.4"
nom = "7" nom = "7"
async-trait = "0.1.51" async-trait = "0.1.51"
priority-queue = "1.2.0" priority-queue = "1.2.0"
crossbeam-queue = "0.3.3"

View file

@ -127,8 +127,11 @@ impl BusEndpoint {
return event; return event;
} }
} }
Err(_) => { Err(e) => {
error!("Failed to read event bus from endpoint #{}", self.id); error!(
"Failed to read event bus from endpoint #{}: {:?}",
self.id, e
);
return futures::future::pending().await; return futures::future::pending().await;
} }
} }

View file

@ -3,8 +3,7 @@ use crate::events::{BusSender, Event};
use crate::Bus; use crate::Bus;
use smoltcp::phy::{Device, DeviceCapabilities, Medium}; use smoltcp::phy::{Device, DeviceCapabilities, Medium};
use smoltcp::time::Instant; use smoltcp::time::Instant;
use std::collections::VecDeque; use std::sync::Arc;
use std::sync::{Arc, Mutex};
/// A virtual device that processes IP packets through smoltcp and WireGuard. /// A virtual device that processes IP packets through smoltcp and WireGuard.
pub struct VirtualIpDevice { pub struct VirtualIpDevice {
@ -13,7 +12,7 @@ pub struct VirtualIpDevice {
/// Channel receiver for received IP packets. /// Channel receiver for received IP packets.
bus_sender: BusSender, bus_sender: BusSender,
/// Local queue for packets received from the bus that need to go through the smoltcp interface. /// Local queue for packets received from the bus that need to go through the smoltcp interface.
process_queue: Arc<Mutex<VecDeque<Vec<u8>>>>, process_queue: Arc<crossbeam_queue::ArrayQueue<Vec<u8>>>,
} }
impl VirtualIpDevice { impl VirtualIpDevice {
@ -21,7 +20,7 @@ impl VirtualIpDevice {
pub fn new(protocol: PortProtocol, bus: Bus, max_transmission_unit: usize) -> Self { pub fn new(protocol: PortProtocol, bus: Bus, max_transmission_unit: usize) -> Self {
let mut bus_endpoint = bus.new_endpoint(); let mut bus_endpoint = bus.new_endpoint();
let bus_sender = bus_endpoint.sender(); let bus_sender = bus_endpoint.sender();
let process_queue = Arc::new(Mutex::new(VecDeque::new())); let process_queue = Arc::new(crossbeam_queue::ArrayQueue::new(1000)); // TODO: Configurable?
{ {
let process_queue = process_queue.clone(); let process_queue = process_queue.clone();
@ -29,12 +28,12 @@ impl VirtualIpDevice {
loop { loop {
match bus_endpoint.recv().await { match bus_endpoint.recv().await {
Event::InboundInternetPacket(ip_proto, data) if ip_proto == protocol => { Event::InboundInternetPacket(ip_proto, data) if ip_proto == protocol => {
let mut queue = process_queue if process_queue.push(data).is_err() {
.lock() error!("VirtualIpDevice process queue full");
.expect("Failed to acquire process queue lock"); } else {
queue.push_back(data);
bus_endpoint.send(Event::VirtualDeviceFed(ip_proto)); bus_endpoint.send(Event::VirtualDeviceFed(ip_proto));
} }
}
_ => {} _ => {}
} }
} }
@ -54,13 +53,7 @@ impl<'a> Device<'a> for VirtualIpDevice {
type TxToken = TxToken; type TxToken = TxToken;
fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> { fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
let next = { let next = self.process_queue.pop();
let mut queue = self
.process_queue
.lock()
.expect("Failed to acquire process queue lock");
queue.pop_front()
};
match next { match next {
Some(buffer) => Some(( Some(buffer) => Some((
Self::RxToken { buffer }, Self::RxToken { buffer },