Add sentence I/O to blocking ConnectionStream

This commit is contained in:
Aram 🍐 2021-08-04 17:48:20 -04:00
parent ea96f433e6
commit a92500e67b
10 changed files with 343 additions and 18 deletions

62
Cargo.lock generated
View file

@ -52,6 +52,22 @@ version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
dependencies = [
"byteorder",
"iovec",
]
[[package]]
name = "bytes"
version = "1.0.1"
@ -85,6 +101,12 @@ dependencies = [
"vec_map",
]
[[package]]
name = "futures"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -94,6 +116,15 @@ dependencies = [
"libc",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.51"
@ -152,6 +183,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "mockstream"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35bbe0c0c9d254b463b13734bc361d1423289547e052b1e77e5a77292496ba2e"
[[package]]
name = "ntapi"
version = "0.3.6"
@ -220,9 +257,11 @@ dependencies = [
name = "rups"
version = "0.5.2"
dependencies = [
"mockstream",
"rustls",
"shell-words",
"tokio",
"tokio-mockstream",
"tokio-rustls",
"webpki",
"webpki-roots",
@ -305,7 +344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7b349f11a7047e6d1276853e612d152f5e8a352c61917887cc2169e2366b4c"
dependencies = [
"autocfg",
"bytes",
"bytes 1.0.1",
"libc",
"memchr",
"mio",
@ -315,6 +354,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "tokio-io"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
dependencies = [
"bytes 0.4.12",
"futures",
"log",
]
[[package]]
name = "tokio-macros"
version = "1.3.0"
@ -326,6 +376,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-mockstream"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41bfc436ef8b7f60c19adf3df086330ae9992385e4d8c53b17a323cad288e155"
dependencies = [
"futures",
"tokio-io",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"

View file

@ -21,6 +21,10 @@ webpki-roots = { version = "0.21", optional = true }
tokio = { version = "1", optional = true, features = ["net", "io-util", "rt"] }
tokio-rustls = { version = "0.22", optional = true }
[dev-dependencies]
mockstream = "0.0.3"
tokio-mockstream = "1.1.0"
[features]
default = []
ssl = ["rustls", "rustls/dangerous_configuration", "webpki", "webpki-roots"]

View file

@ -0,0 +1,12 @@
use crate::blocking::stream::ConnectionStream;
use crate::Config;
/// A synchronous NUT client.
pub struct Client {
/// The client configuration.
config: Config,
/// The client connection.
stream: ConnectionStream,
}
impl Client {}

View file

@ -5,6 +5,7 @@ use crate::blocking::stream::ConnectionStream;
use crate::cmd::{Command, Response};
use crate::{Config, Error, Host, NutError};
mod client;
mod stream;
/// A blocking NUT client connection.

View file

@ -1,4 +1,7 @@
use std::io::{Read, Write};
use crate::proto::util::{join_sentence, split_sentence};
use crate::proto::Sentence;
use crate::{Error, NutError};
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpStream;
/// A wrapper for various synchronous stream types.
@ -13,6 +16,10 @@ pub enum ConnectionStream {
/// A server stream wrapped with SSL using `rustls`.
#[cfg(feature = "ssl")]
SslServer(Box<rustls::StreamOwned<rustls::ServerSession, ConnectionStream>>),
/// A mock stream, used for testing.
#[cfg(test)]
Mock(mockstream::SharedMockStream),
}
impl ConnectionStream {
@ -37,6 +44,82 @@ impl ConnectionStream {
rustls::StreamOwned::new(session, self),
)))
}
/// Writes a sentence on the current stream.
pub fn write_sentence<T: Sentence>(&mut self, sentence: &T) -> crate::Result<()> {
let encoded = sentence.encode();
let joined = join_sentence(encoded);
self.write_literal(&joined)?;
self.flush().map_err(crate::Error::Io)
}
/// Writes a collection of sentences on the current stream.
pub fn write_sentences<T: Sentence>(&mut self, sentences: &[T]) -> crate::Result<()> {
for sentence in sentences {
let encoded = sentence.encode();
let joined = join_sentence(encoded);
self.write_literal(&joined)?;
}
self.flush().map_err(crate::Error::Io)
}
/// Writes a literal string to the current stream.
/// Note: the literal string MUST end with a new-line character (`\n`).
///
/// Note: does not automatically flush.
pub fn write_literal(&mut self, literal: &str) -> crate::Result<()> {
assert!(literal.ends_with('\n'));
self.write_all(literal.as_bytes())?;
Ok(())
}
/// Reads a literal string from the current stream.
/// Note: the literal string will ends with a new-line character (`\n`).
pub fn read_literal(reader: &mut BufReader<&mut Self>) -> crate::Result<String> {
let mut raw = String::new();
reader.read_line(&mut raw)?;
Ok(raw)
}
/// Reads a sentence from the given `BufReader`.
pub fn read_sentence<T: Sentence>(reader: &mut BufReader<&mut Self>) -> crate::Result<T> {
let raw = Self::read_literal(reader)?;
if raw.is_empty() {
return Err(Error::eof());
}
let split = split_sentence(raw).ok_or(crate::NutError::NotProcessable)?;
T::decode(split)
.ok_or(Error::Nut(NutError::InvalidArgument))?
.into()
}
/// Reads all sentences in the buffer until the given `matcher` function evaluates to `true`.
///
/// The final sentence is excluded.
pub fn read_sentences_until<T: Sentence, F: Fn(&T) -> bool>(
reader: &mut BufReader<&mut Self>,
matcher: F,
) -> crate::Result<Vec<T>> {
let mut result = Vec::new();
let max_iter = 1000; // Exit after 1000 lines to prevent overflow.
for _ in 0..max_iter {
let sentence: T = Self::read_sentence(reader)?;
if matcher(&sentence) {
return Ok(result);
} else {
result.push(sentence);
}
}
Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Reached maximum read capacity.",
)))
}
/// Initializes a new `BufReader` for the current stream.
pub fn buffer(&mut self) -> BufReader<&mut Self> {
BufReader::new(self)
}
}
impl Read for ConnectionStream {
@ -47,6 +130,8 @@ impl Read for ConnectionStream {
Self::SslClient(stream) => stream.read(buf),
#[cfg(feature = "ssl")]
Self::SslServer(stream) => stream.read(buf),
#[cfg(test)]
Self::Mock(stream) => stream.read(buf),
}
}
}
@ -59,6 +144,12 @@ impl Write for ConnectionStream {
Self::SslClient(stream) => stream.write(buf),
#[cfg(feature = "ssl")]
Self::SslServer(stream) => stream.write(buf),
#[cfg(test)]
Self::Mock(stream) => {
let len = buf.len();
stream.push_bytes_to_read(buf);
Ok(len)
}
}
}
@ -69,6 +160,113 @@ impl Write for ConnectionStream {
Self::SslClient(stream) => stream.flush(),
#[cfg(feature = "ssl")]
Self::SslServer(stream) => stream.flush(),
#[cfg(test)]
Self::Mock(stream) => stream.flush(),
}
}
}
#[cfg(test)]
mod tests {
use super::ConnectionStream;
use crate::proto::{ClientSentences, ServerSentences};
use std::io::{Read, Write};
#[test]
fn read_write_sentence() {
let mut client_stream = mockstream::SharedMockStream::new();
let mut server_stream = client_stream.clone();
let mut client_stream = ConnectionStream::Mock(client_stream);
let mut server_stream = ConnectionStream::Mock(server_stream);
// Client requests list of UPS devices
client_stream
.write_sentence(&ServerSentences::QueryListUps {})
.expect("Failed to write LIST UPS");
// Server reads query for list of UPS devices
let mut server_buffer = server_stream.buffer();
let sentence: ServerSentences =
ConnectionStream::read_sentence(&mut server_buffer).expect("Failed to read LIST UPS");
assert_eq!(sentence, ServerSentences::QueryListUps {});
// Server sends list of UPS devices.
server_stream
.write_sentences(&[
ClientSentences::BeginListUps {},
ClientSentences::RespondUps {
ups_name: "nutdev0".into(),
description: "A NUT device.".into(),
},
ClientSentences::RespondUps {
ups_name: "nutdev1".into(),
description: "Another NUT device.".into(),
},
ClientSentences::EndListUps {},
])
.expect("Failed to write UPS LIST");
// Client reads list of UPS devices.
let mut client_buffer = client_stream.buffer();
let sentence: ClientSentences = ConnectionStream::read_sentence(&mut client_buffer)
.expect("Failed to read BEGIN LIST UPS");
assert_eq!(sentence, ClientSentences::BeginListUps {});
let sentences: Vec<ClientSentences> =
ConnectionStream::read_sentences_until(&mut client_buffer, |s| {
matches!(s, ClientSentences::EndListUps {})
})
.expect("Failed to read UPS items");
assert_eq!(
sentences,
vec![
ClientSentences::RespondUps {
ups_name: "nutdev0".into(),
description: "A NUT device.".into(),
},
ClientSentences::RespondUps {
ups_name: "nutdev1".into(),
description: "Another NUT device.".into(),
},
]
);
// Client sends login
client_stream
.write_sentence(&ServerSentences::ExecLogin {
ups_name: "nutdev0".into(),
})
.expect("Failed to write LOGIN nutdev0");
// Server receives login
let mut server_buffer = server_stream.buffer();
let sentence: ServerSentences = ConnectionStream::read_sentence(&mut server_buffer)
.expect("Failed to read LOGIN nutdev0");
assert_eq!(
sentence,
ServerSentences::ExecLogin {
ups_name: "nutdev0".into()
}
);
// Server rejects login
server_stream
.write_sentence(&ClientSentences::RespondErr {
message: "USERNAME-REQUIRED".into(),
extras: vec![],
})
.expect("Failed to write ERR USERNAME-REQUIRED");
// Client expects error
let mut client_buffer = client_stream.buffer();
let error: crate::Error =
ConnectionStream::read_sentence::<ClientSentences>(&mut client_buffer)
.expect_err("Failed to read ERR");
assert!(matches!(
error,
crate::Error::Nut(crate::NutError::UsernameRequired)
));
}
}

View file

@ -62,6 +62,8 @@ pub enum NutError {
SslInvalidHostname,
/// Occurs when the client used a feature that is disabled by the server.
FeatureNotConfigured,
/// The client or server sent a message that could not be processed.
NotProcessable,
/// Generic (usually internal) client error.
Generic(String),
}
@ -99,14 +101,15 @@ impl fmt::Display for NutError {
"Given hostname cannot be used for a strict SSL connection"
),
Self::FeatureNotConfigured => write!(f, "Feature not configured by server"),
Self::NotProcessable => write!(f, "Message could not be processed"),
Self::Generic(msg) => write!(f, "NUT error: {}", msg),
}
}
}
impl<T: AsRef<ClientSentences>> From<T> for NutError {
fn from(sentence: T) -> Self {
if let ClientSentences::RespondErr { message, .. } = sentence.as_ref() {
impl From<ClientSentences> for NutError {
fn from(sentence: ClientSentences) -> Self {
if let ClientSentences::RespondErr { message, .. } = sentence {
match message.as_str() {
"ACCESS-DENIED" => Self::AccessDenied,
"UNKNOWN-UPS" => Self::UnknownUps,
@ -135,7 +138,7 @@ impl<T: AsRef<ClientSentences>> From<T> for NutError {
}
} else {
// This is not supposed to happen...
panic!("Cannot convert {:?} into NutError", sentence.as_ref());
panic!("Cannot convert {:?} into NutError", sentence);
}
}
}
@ -163,6 +166,14 @@ impl Error {
pub fn generic<T: ToString>(message: T) -> Self {
NutError::generic(message.to_string()).into()
}
/// Constructs an EOF (end-of-file) error.
pub fn eof() -> Self {
Self::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Reached end of stream while sentence was expected",
))
}
}
impl fmt::Display for Error {

View file

@ -1,4 +1,5 @@
use crate::proto::impl_sentences;
use crate::{Error, NutError};
impl_sentences! {
/// A generic successful response with no additional data.
@ -456,9 +457,20 @@ impl_sentences! {
),
}
#[allow(clippy::from_over_into)]
impl Into<crate::Result<Self>> for Sentences {
fn into(self) -> crate::Result<Sentences> {
if let Sentences::RespondErr { .. } = &self {
Err(Error::Nut(NutError::from(self)))
} else {
Ok(self)
}
}
}
#[cfg(test)]
mod tests {
use crate::proto::test_encode_decode;
use crate::proto::{test_encode_decode, Sentence};
use super::Sentences;

View file

@ -104,6 +104,15 @@ macro_rules! impl_words {
};
}
/// A NUT protocol sentence that can be encoded and decoded from a Vector of strings.
pub trait Sentence: Sized + Into<crate::Result<Self>> {
/// Decodes a sentence. Returns `None` if the pattern cannot be recognized.
fn decode(raw: Vec<String>) -> Option<Self>;
/// Encodes the sentence.
fn encode(&self) -> Vec<&str>;
}
/// Implements the list of sentences, which are combinations
/// of words that form commands (serverbound) and responses (clientbound).
macro_rules! impl_sentences {
@ -147,9 +156,8 @@ macro_rules! impl_sentences {
)*
}
impl Sentences {
/// Decodes a sentence. Returns `None` if the pattern cannot be recognized.
pub(crate) fn decode(raw: Vec<String>) -> Option<Sentences> {
impl crate::proto::Sentence for Sentences {
fn decode(raw: Vec<String>) -> Option<Sentences> {
use super::{Word::*, *};
use Sentences::*;
let words = Word::decode_words(raw.as_slice());
@ -168,8 +176,7 @@ macro_rules! impl_sentences {
None
}
/// Encodes the sentence.
pub(crate) fn encode(&self) -> Vec<&str> {
fn encode(&self) -> Vec<&str> {
use super::Word::*;
match self {
$(

View file

@ -91,6 +91,15 @@ impl_sentences! {
3: cmd_name,
}
),
/// Client requests the list of UPS devices.
QueryListUps (
{
0: List,
1: Ups,
2: EOL,
},
{}
),
/// Client requests the list of variables for the given `ups_name` device.
QueryListVar (
{
@ -313,10 +322,17 @@ impl_sentences! {
),
}
#[allow(clippy::from_over_into)]
impl Into<crate::Result<Self>> for Sentences {
fn into(self) -> crate::Result<Sentences> {
Ok(self)
}
}
#[cfg(test)]
mod tests {
use super::Sentences;
use crate::proto::test_encode_decode;
use crate::proto::{test_encode_decode, Sentence};
#[test]
fn test_encode_decode() {
test_encode_decode!(
@ -371,6 +387,10 @@ mod tests {
ups_name: "nutdev".into(),
}
);
test_encode_decode!(
["LIST", "UPS"] <=>
Sentences::QueryListUps {}
);
test_encode_decode!(
["LIST", "RW", "nutdev"] <=>
Sentences::QueryListRw {

View file

@ -3,7 +3,7 @@
///
/// Returns `None` if the sentence cannot be split safely (usually unbalanced quotation marks).
pub fn split_sentence<T: AsRef<str>>(sentence: T) -> Option<Vec<String>> {
shell_words::split(sentence.as_ref()).ok()
shell_words::split(sentence.as_ref().trim_end_matches('\n')).ok()
}
/// Joins a collection of words (`&str`) into one sentence string,
@ -13,7 +13,7 @@ where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
shell_words::join(words)
format!("{}\n", shell_words::join(words))
}
#[cfg(test)]
@ -34,8 +34,8 @@ mod tests {
#[test]
fn test_join() {
assert_eq!(join_sentence(vec!["AbC", "dEf", "GHi"]), "AbC dEf GHi",);
assert_eq!(join_sentence(vec!["AbC dEf", "GHi"]), "'AbC dEf' GHi",);
assert_eq!(join_sentence(vec!["\"AbC dEf", "GHi"]), "'\"AbC dEf' GHi",);
assert_eq!(join_sentence(vec!["AbC", "dEf", "GHi"]), "AbC dEf GHi\n",);
assert_eq!(join_sentence(vec!["AbC dEf", "GHi"]), "'AbC dEf' GHi\n",);
assert_eq!(join_sentence(vec!["\"AbC dEf", "GHi"]), "'\"AbC dEf' GHi\n",);
}
}