From 1105d15995bcb734119eb5cf84f2bac1cca39b39 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Wed, 11 Apr 2018 13:21:50 +0100 Subject: [PATCH] created an abstract interface to streaming serialised IO --- src/broadcast/mod.rs | 8 +++----- src/commst.rs | 11 +++++------ src/lib.rs | 1 + src/proto/mod.rs | 3 --- src/proto_io.rs | 26 +++++++++++++++----------- src/stream_io.rs | 20 ++++++++++++++++++++ 6 files changed, 44 insertions(+), 25 deletions(-) create mode 100644 src/stream_io.rs diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs index 685e818..67784e4 100644 --- a/src/broadcast/mod.rs +++ b/src/broadcast/mod.rs @@ -494,11 +494,9 @@ fn index_of_path(mut path: Vec) -> usize { path.reverse(); for &dir in path.iter() { - if dir == false { - idx = idx << 1; - } - else { - idx = (idx << 1) | 1; + idx = idx << 1; + if dir == true { + idx = idx | 1; } } idx diff --git a/src/commst.rs b/src/commst.rs index 3195b1c..040bde2 100644 --- a/src/commst.rs +++ b/src/commst.rs @@ -12,6 +12,7 @@ use proto::Message; use proto_io; use proto_io::CodecIo; use messaging::SourcedMessage; +use stream_io::StreamIo; #[derive(Debug)] pub enum Error { @@ -26,21 +27,19 @@ impl From for Error { /// consensus algorithm. pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> -where Vec: From { /// The transmit side of the multiple producer channel from comms threads. tx: &'a Sender>, /// The receive side of the channel to the comms thread. rx: &'a Receiver>, /// The socket IO task. - io: CodecIo, + io: CodecIo, /// The index of this comms task for identification against its remote node. pub node_index: usize } impl<'a, T: Clone + Debug + Send + Sync + From> + Into>> CommsTask<'a, T> -where Vec: From { pub fn new(tx: &'a Sender>, rx: &'a Receiver>, @@ -54,7 +53,7 @@ where Vec: From CommsTask { tx: tx, rx: rx, - io: CodecIo::new(stream), + io: StreamIo::from_stream(stream), node_index: node_index } } @@ -76,14 +75,14 @@ where Vec: From let message = rx.recv().unwrap(); debug!("Node {} <- {:?}", node_index, message); // Forward the message to the remote node. - io1.send_message(message).unwrap(); + io1.send(message).unwrap(); } }); // Remote comms receive loop. debug!("Starting remote RX loop for node {}", node_index); loop { - match self.io.receive_message() { + match self.io.recv() { Ok(message) => { debug!("Node {} -> {:?}", node_index, message); tx.send( diff --git a/src/lib.rs b/src/lib.rs index 5f37c6a..4e1a9c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,7 @@ extern crate reed_solomon_erasure; mod connection; mod messaging; +mod stream_io; mod proto; mod proto_io; mod commst; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 49b0bbe..e3da713 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -16,9 +16,6 @@ pub enum Message { Agreement(AgreementMessage) } -//unsafe impl Send for Message { } -//impl !Sync for Message { } - /// The three kinds of message sent during the reliable broadcast stage of the /// consensus algorithm. #[derive (Clone, Debug, PartialEq)] diff --git a/src/proto_io.rs b/src/proto_io.rs index 7110c13..3ae4b2b 100644 --- a/src/proto_io.rs +++ b/src/proto_io.rs @@ -6,6 +6,7 @@ use std::net::TcpStream; use protobuf; use protobuf::Message as ProtobufMessage; use proto::*; +use stream_io::StreamIo; /// A magic key to put right before each message. An atavism of primitive serial /// protocols. @@ -57,29 +58,34 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result { Ok(result) } -pub struct CodecIo { +pub struct CodecIo { stream: TcpStream, buffer: [u8; 1024 * 4], + /// FIXME: remove this dependent argument + phant: T } /// A message handling task. -impl CodecIo where { - pub fn new(stream: TcpStream) -> Self { +impl StreamIo for CodecIo +where T: Clone + Send + Sync + From> + Into> +{ + fn from_stream(stream: TcpStream) -> Self { CodecIo { stream, - buffer: [0; 1024 * 4] + buffer: [0; 1024 * 4], + phant: T::from(Vec::new()) } } - pub fn try_clone(&self) -> Result { + fn try_clone(&self) -> Result, ::std::io::Error> { Ok(CodecIo { stream: self.stream.try_clone()?, - buffer: [0; 1024 * 4] + buffer: self.buffer.clone(), + phant: T::from(Vec::new()) }) } - pub fn receive_message(&mut self) -> Result, Error> - where T: From> + Send + Sync + fn recv(&mut self) -> Result, Error> { self.stream.read_exact(&mut self.buffer[0..4])?; let frame_start = decode_u32_from_be(&self.buffer[0..4])?; @@ -103,9 +109,7 @@ impl CodecIo where { .map_err(|e| Error::ProtobufError(e)) } - pub fn send_message(&mut self, message: Message) - -> Result<(), Error> - where T: Into> + Send + Sync + fn send(&mut self, message: Message) -> Result<(), Error> { let mut buffer: [u8; 4] = [0; 4]; // Wrap stream diff --git a/src/stream_io.rs b/src/stream_io.rs new file mode 100644 index 0000000..dc879b5 --- /dev/null +++ b/src/stream_io.rs @@ -0,0 +1,20 @@ +//! Abstract interface to serialised IO. + +use std::io; +use std::io::{Read, Write}; + +use proto::*; + +/// Trait of types of streams carrying payload of type `Message` and +/// returning errors of type `Error`. +/// +/// This is a stream interface independent of the choice of serialisation +/// methods. +pub trait StreamIo: Sized +where Stream: Read + Write, T: Send + Sync // From implies Into +{ + fn from_stream(stream: Stream) -> Self; + fn try_clone(&self) -> Result; + fn recv(&mut self) -> Result, Error>; + fn send(&mut self, m: Message) -> Result<(), Error>; +}