From a92500e67bea0cd1e45e612142b683fb6ce70fb2 Mon Sep 17 00:00:00 2001 From: Aram Peres Date: Wed, 4 Aug 2021 17:48:20 -0400 Subject: [PATCH] Add sentence I/O to blocking ConnectionStream --- Cargo.lock | 62 ++++++++++- rups/Cargo.toml | 4 + rups/src/blocking/client.rs | 12 +++ rups/src/blocking/mod.rs | 1 + rups/src/blocking/stream.rs | 200 +++++++++++++++++++++++++++++++++++- rups/src/error.rs | 19 +++- rups/src/proto/client.rs | 14 ++- rups/src/proto/mod.rs | 17 ++- rups/src/proto/server.rs | 22 +++- rups/src/proto/util.rs | 10 +- 10 files changed, 343 insertions(+), 18 deletions(-) create mode 100644 rups/src/blocking/client.rs diff --git a/Cargo.lock b/Cargo.lock index e07d53b..82d62ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,6 +52,22 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "1.0.1" @@ -85,6 +101,12 @@ dependencies = [ "vec_map", ] +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -94,6 +116,15 @@ dependencies = [ "libc", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.51" @@ -152,6 +183,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "mockstream" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bbe0c0c9d254b463b13734bc361d1423289547e052b1e77e5a77292496ba2e" + [[package]] name = "ntapi" version = "0.3.6" @@ -220,9 +257,11 @@ dependencies = [ name = "rups" version = "0.5.2" dependencies = [ + "mockstream", "rustls", "shell-words", "tokio", + "tokio-mockstream", "tokio-rustls", "webpki", "webpki-roots", @@ -305,7 +344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b7b349f11a7047e6d1276853e612d152f5e8a352c61917887cc2169e2366b4c" dependencies = [ "autocfg", - "bytes", + "bytes 1.0.1", "libc", "memchr", "mio", @@ -315,6 +354,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "tokio-io" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +dependencies = [ + "bytes 0.4.12", + "futures", + "log", +] + [[package]] name = "tokio-macros" version = "1.3.0" @@ -326,6 +376,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-mockstream" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41bfc436ef8b7f60c19adf3df086330ae9992385e4d8c53b17a323cad288e155" +dependencies = [ + "futures", + "tokio-io", +] + [[package]] name = "tokio-rustls" version = "0.22.0" diff --git a/rups/Cargo.toml b/rups/Cargo.toml index 8d729ee..781c602 100644 --- a/rups/Cargo.toml +++ b/rups/Cargo.toml @@ -21,6 +21,10 @@ webpki-roots = { version = "0.21", optional = true } tokio = { version = "1", optional = true, features = ["net", "io-util", "rt"] } tokio-rustls = { version = "0.22", optional = true } +[dev-dependencies] +mockstream = "0.0.3" +tokio-mockstream = "1.1.0" + [features] default = [] ssl = ["rustls", "rustls/dangerous_configuration", "webpki", "webpki-roots"] diff --git a/rups/src/blocking/client.rs b/rups/src/blocking/client.rs new file mode 100644 index 0000000..ddb961c --- /dev/null +++ b/rups/src/blocking/client.rs @@ -0,0 +1,12 @@ +use crate::blocking::stream::ConnectionStream; +use crate::Config; + +/// A synchronous NUT client. +pub struct Client { + /// The client configuration. + config: Config, + /// The client connection. + stream: ConnectionStream, +} + +impl Client {} diff --git a/rups/src/blocking/mod.rs b/rups/src/blocking/mod.rs index 195fc97..b3515b6 100644 --- a/rups/src/blocking/mod.rs +++ b/rups/src/blocking/mod.rs @@ -5,6 +5,7 @@ use crate::blocking::stream::ConnectionStream; use crate::cmd::{Command, Response}; use crate::{Config, Error, Host, NutError}; +mod client; mod stream; /// A blocking NUT client connection. diff --git a/rups/src/blocking/stream.rs b/rups/src/blocking/stream.rs index 96f3783..d1ba29a 100644 --- a/rups/src/blocking/stream.rs +++ b/rups/src/blocking/stream.rs @@ -1,4 +1,7 @@ -use std::io::{Read, Write}; +use crate::proto::util::{join_sentence, split_sentence}; +use crate::proto::Sentence; +use crate::{Error, NutError}; +use std::io::{BufRead, BufReader, Read, Write}; use std::net::TcpStream; /// A wrapper for various synchronous stream types. @@ -13,6 +16,10 @@ pub enum ConnectionStream { /// A server stream wrapped with SSL using `rustls`. #[cfg(feature = "ssl")] SslServer(Box>), + + /// A mock stream, used for testing. + #[cfg(test)] + Mock(mockstream::SharedMockStream), } impl ConnectionStream { @@ -37,6 +44,82 @@ impl ConnectionStream { rustls::StreamOwned::new(session, self), ))) } + + /// Writes a sentence on the current stream. + pub fn write_sentence(&mut self, sentence: &T) -> crate::Result<()> { + let encoded = sentence.encode(); + let joined = join_sentence(encoded); + self.write_literal(&joined)?; + self.flush().map_err(crate::Error::Io) + } + + /// Writes a collection of sentences on the current stream. + pub fn write_sentences(&mut self, sentences: &[T]) -> crate::Result<()> { + for sentence in sentences { + let encoded = sentence.encode(); + let joined = join_sentence(encoded); + self.write_literal(&joined)?; + } + self.flush().map_err(crate::Error::Io) + } + + /// Writes a literal string to the current stream. + /// Note: the literal string MUST end with a new-line character (`\n`). + /// + /// Note: does not automatically flush. + pub fn write_literal(&mut self, literal: &str) -> crate::Result<()> { + assert!(literal.ends_with('\n')); + self.write_all(literal.as_bytes())?; + Ok(()) + } + + /// Reads a literal string from the current stream. + /// Note: the literal string will ends with a new-line character (`\n`). + pub fn read_literal(reader: &mut BufReader<&mut Self>) -> crate::Result { + let mut raw = String::new(); + reader.read_line(&mut raw)?; + Ok(raw) + } + + /// Reads a sentence from the given `BufReader`. + pub fn read_sentence(reader: &mut BufReader<&mut Self>) -> crate::Result { + let raw = Self::read_literal(reader)?; + if raw.is_empty() { + return Err(Error::eof()); + } + let split = split_sentence(raw).ok_or(crate::NutError::NotProcessable)?; + T::decode(split) + .ok_or(Error::Nut(NutError::InvalidArgument))? + .into() + } + + /// Reads all sentences in the buffer until the given `matcher` function evaluates to `true`. + /// + /// The final sentence is excluded. + pub fn read_sentences_until bool>( + reader: &mut BufReader<&mut Self>, + matcher: F, + ) -> crate::Result> { + let mut result = Vec::new(); + let max_iter = 1000; // Exit after 1000 lines to prevent overflow. + for _ in 0..max_iter { + let sentence: T = Self::read_sentence(reader)?; + if matcher(&sentence) { + return Ok(result); + } else { + result.push(sentence); + } + } + Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Reached maximum read capacity.", + ))) + } + + /// Initializes a new `BufReader` for the current stream. + pub fn buffer(&mut self) -> BufReader<&mut Self> { + BufReader::new(self) + } } impl Read for ConnectionStream { @@ -47,6 +130,8 @@ impl Read for ConnectionStream { Self::SslClient(stream) => stream.read(buf), #[cfg(feature = "ssl")] Self::SslServer(stream) => stream.read(buf), + #[cfg(test)] + Self::Mock(stream) => stream.read(buf), } } } @@ -59,6 +144,12 @@ impl Write for ConnectionStream { Self::SslClient(stream) => stream.write(buf), #[cfg(feature = "ssl")] Self::SslServer(stream) => stream.write(buf), + #[cfg(test)] + Self::Mock(stream) => { + let len = buf.len(); + stream.push_bytes_to_read(buf); + Ok(len) + } } } @@ -69,6 +160,113 @@ impl Write for ConnectionStream { Self::SslClient(stream) => stream.flush(), #[cfg(feature = "ssl")] Self::SslServer(stream) => stream.flush(), + #[cfg(test)] + Self::Mock(stream) => stream.flush(), } } } + +#[cfg(test)] +mod tests { + use super::ConnectionStream; + use crate::proto::{ClientSentences, ServerSentences}; + use std::io::{Read, Write}; + + #[test] + fn read_write_sentence() { + let mut client_stream = mockstream::SharedMockStream::new(); + let mut server_stream = client_stream.clone(); + + let mut client_stream = ConnectionStream::Mock(client_stream); + let mut server_stream = ConnectionStream::Mock(server_stream); + + // Client requests list of UPS devices + client_stream + .write_sentence(&ServerSentences::QueryListUps {}) + .expect("Failed to write LIST UPS"); + + // Server reads query for list of UPS devices + let mut server_buffer = server_stream.buffer(); + let sentence: ServerSentences = + ConnectionStream::read_sentence(&mut server_buffer).expect("Failed to read LIST UPS"); + assert_eq!(sentence, ServerSentences::QueryListUps {}); + + // Server sends list of UPS devices. + server_stream + .write_sentences(&[ + ClientSentences::BeginListUps {}, + ClientSentences::RespondUps { + ups_name: "nutdev0".into(), + description: "A NUT device.".into(), + }, + ClientSentences::RespondUps { + ups_name: "nutdev1".into(), + description: "Another NUT device.".into(), + }, + ClientSentences::EndListUps {}, + ]) + .expect("Failed to write UPS LIST"); + + // Client reads list of UPS devices. + let mut client_buffer = client_stream.buffer(); + let sentence: ClientSentences = ConnectionStream::read_sentence(&mut client_buffer) + .expect("Failed to read BEGIN LIST UPS"); + assert_eq!(sentence, ClientSentences::BeginListUps {}); + + let sentences: Vec = + ConnectionStream::read_sentences_until(&mut client_buffer, |s| { + matches!(s, ClientSentences::EndListUps {}) + }) + .expect("Failed to read UPS items"); + + assert_eq!( + sentences, + vec![ + ClientSentences::RespondUps { + ups_name: "nutdev0".into(), + description: "A NUT device.".into(), + }, + ClientSentences::RespondUps { + ups_name: "nutdev1".into(), + description: "Another NUT device.".into(), + }, + ] + ); + + // Client sends login + client_stream + .write_sentence(&ServerSentences::ExecLogin { + ups_name: "nutdev0".into(), + }) + .expect("Failed to write LOGIN nutdev0"); + + // Server receives login + let mut server_buffer = server_stream.buffer(); + let sentence: ServerSentences = ConnectionStream::read_sentence(&mut server_buffer) + .expect("Failed to read LOGIN nutdev0"); + assert_eq!( + sentence, + ServerSentences::ExecLogin { + ups_name: "nutdev0".into() + } + ); + + // Server rejects login + server_stream + .write_sentence(&ClientSentences::RespondErr { + message: "USERNAME-REQUIRED".into(), + extras: vec![], + }) + .expect("Failed to write ERR USERNAME-REQUIRED"); + + // Client expects error + let mut client_buffer = client_stream.buffer(); + let error: crate::Error = + ConnectionStream::read_sentence::(&mut client_buffer) + .expect_err("Failed to read ERR"); + assert!(matches!( + error, + crate::Error::Nut(crate::NutError::UsernameRequired) + )); + } +} diff --git a/rups/src/error.rs b/rups/src/error.rs index 78dfd1d..5f49b1a 100644 --- a/rups/src/error.rs +++ b/rups/src/error.rs @@ -62,6 +62,8 @@ pub enum NutError { SslInvalidHostname, /// Occurs when the client used a feature that is disabled by the server. FeatureNotConfigured, + /// The client or server sent a message that could not be processed. + NotProcessable, /// Generic (usually internal) client error. Generic(String), } @@ -99,14 +101,15 @@ impl fmt::Display for NutError { "Given hostname cannot be used for a strict SSL connection" ), Self::FeatureNotConfigured => write!(f, "Feature not configured by server"), + Self::NotProcessable => write!(f, "Message could not be processed"), Self::Generic(msg) => write!(f, "NUT error: {}", msg), } } } -impl> From for NutError { - fn from(sentence: T) -> Self { - if let ClientSentences::RespondErr { message, .. } = sentence.as_ref() { +impl From for NutError { + fn from(sentence: ClientSentences) -> Self { + if let ClientSentences::RespondErr { message, .. } = sentence { match message.as_str() { "ACCESS-DENIED" => Self::AccessDenied, "UNKNOWN-UPS" => Self::UnknownUps, @@ -135,7 +138,7 @@ impl> From for NutError { } } else { // This is not supposed to happen... - panic!("Cannot convert {:?} into NutError", sentence.as_ref()); + panic!("Cannot convert {:?} into NutError", sentence); } } } @@ -163,6 +166,14 @@ impl Error { pub fn generic(message: T) -> Self { NutError::generic(message.to_string()).into() } + + /// Constructs an EOF (end-of-file) error. + pub fn eof() -> Self { + Self::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Reached end of stream while sentence was expected", + )) + } } impl fmt::Display for Error { diff --git a/rups/src/proto/client.rs b/rups/src/proto/client.rs index 87161d6..6377a26 100644 --- a/rups/src/proto/client.rs +++ b/rups/src/proto/client.rs @@ -1,4 +1,5 @@ use crate::proto::impl_sentences; +use crate::{Error, NutError}; impl_sentences! { /// A generic successful response with no additional data. @@ -456,9 +457,20 @@ impl_sentences! { ), } +#[allow(clippy::from_over_into)] +impl Into> for Sentences { + fn into(self) -> crate::Result { + if let Sentences::RespondErr { .. } = &self { + Err(Error::Nut(NutError::from(self))) + } else { + Ok(self) + } + } +} + #[cfg(test)] mod tests { - use crate::proto::test_encode_decode; + use crate::proto::{test_encode_decode, Sentence}; use super::Sentences; diff --git a/rups/src/proto/mod.rs b/rups/src/proto/mod.rs index a1d5c6e..01e1589 100644 --- a/rups/src/proto/mod.rs +++ b/rups/src/proto/mod.rs @@ -104,6 +104,15 @@ macro_rules! impl_words { }; } +/// A NUT protocol sentence that can be encoded and decoded from a Vector of strings. +pub trait Sentence: Sized + Into> { + /// Decodes a sentence. Returns `None` if the pattern cannot be recognized. + fn decode(raw: Vec) -> Option; + + /// Encodes the sentence. + fn encode(&self) -> Vec<&str>; +} + /// Implements the list of sentences, which are combinations /// of words that form commands (serverbound) and responses (clientbound). macro_rules! impl_sentences { @@ -147,9 +156,8 @@ macro_rules! impl_sentences { )* } - impl Sentences { - /// Decodes a sentence. Returns `None` if the pattern cannot be recognized. - pub(crate) fn decode(raw: Vec) -> Option { + impl crate::proto::Sentence for Sentences { + fn decode(raw: Vec) -> Option { use super::{Word::*, *}; use Sentences::*; let words = Word::decode_words(raw.as_slice()); @@ -168,8 +176,7 @@ macro_rules! impl_sentences { None } - /// Encodes the sentence. - pub(crate) fn encode(&self) -> Vec<&str> { + fn encode(&self) -> Vec<&str> { use super::Word::*; match self { $( diff --git a/rups/src/proto/server.rs b/rups/src/proto/server.rs index a659b8e..6f4b5dc 100644 --- a/rups/src/proto/server.rs +++ b/rups/src/proto/server.rs @@ -91,6 +91,15 @@ impl_sentences! { 3: cmd_name, } ), + /// Client requests the list of UPS devices. + QueryListUps ( + { + 0: List, + 1: Ups, + 2: EOL, + }, + {} + ), /// Client requests the list of variables for the given `ups_name` device. QueryListVar ( { @@ -313,10 +322,17 @@ impl_sentences! { ), } +#[allow(clippy::from_over_into)] +impl Into> for Sentences { + fn into(self) -> crate::Result { + Ok(self) + } +} + #[cfg(test)] mod tests { use super::Sentences; - use crate::proto::test_encode_decode; + use crate::proto::{test_encode_decode, Sentence}; #[test] fn test_encode_decode() { test_encode_decode!( @@ -371,6 +387,10 @@ mod tests { ups_name: "nutdev".into(), } ); + test_encode_decode!( + ["LIST", "UPS"] <=> + Sentences::QueryListUps {} + ); test_encode_decode!( ["LIST", "RW", "nutdev"] <=> Sentences::QueryListRw { diff --git a/rups/src/proto/util.rs b/rups/src/proto/util.rs index 8a9a0fa..43f03b3 100644 --- a/rups/src/proto/util.rs +++ b/rups/src/proto/util.rs @@ -3,7 +3,7 @@ /// /// Returns `None` if the sentence cannot be split safely (usually unbalanced quotation marks). pub fn split_sentence>(sentence: T) -> Option> { - shell_words::split(sentence.as_ref()).ok() + shell_words::split(sentence.as_ref().trim_end_matches('\n')).ok() } /// Joins a collection of words (`&str`) into one sentence string, @@ -13,7 +13,7 @@ where I: IntoIterator, S: AsRef, { - shell_words::join(words) + format!("{}\n", shell_words::join(words)) } #[cfg(test)] @@ -34,8 +34,8 @@ mod tests { #[test] fn test_join() { - assert_eq!(join_sentence(vec!["AbC", "dEf", "GHi"]), "AbC dEf GHi",); - assert_eq!(join_sentence(vec!["AbC dEf", "GHi"]), "'AbC dEf' GHi",); - assert_eq!(join_sentence(vec!["\"AbC dEf", "GHi"]), "'\"AbC dEf' GHi",); + assert_eq!(join_sentence(vec!["AbC", "dEf", "GHi"]), "AbC dEf GHi\n",); + assert_eq!(join_sentence(vec!["AbC dEf", "GHi"]), "'AbC dEf' GHi\n",); + assert_eq!(join_sentence(vec!["\"AbC dEf", "GHi"]), "'\"AbC dEf' GHi\n",); } }