created an abstract interface to streaming serialised IO

This commit is contained in:
Vladimir Komendantskiy 2018-04-11 13:21:50 +01:00
parent fda6b04ab5
commit 1105d15995
6 changed files with 44 additions and 25 deletions

View File

@ -494,11 +494,9 @@ fn index_of_path(mut path: Vec<bool>) -> usize {
path.reverse(); path.reverse();
for &dir in path.iter() { for &dir in path.iter() {
if dir == false { idx = idx << 1;
idx = idx << 1; if dir == true {
} idx = idx | 1;
else {
idx = (idx << 1) | 1;
} }
} }
idx idx

View File

@ -12,6 +12,7 @@ use proto::Message;
use proto_io; use proto_io;
use proto_io::CodecIo; use proto_io::CodecIo;
use messaging::SourcedMessage; use messaging::SourcedMessage;
use stream_io::StreamIo;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -26,21 +27,19 @@ impl From<io::Error> for Error {
/// consensus algorithm. /// consensus algorithm.
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync +
From<Vec<u8>> + Into<Vec<u8>>> From<Vec<u8>> + Into<Vec<u8>>>
where Vec<u8>: From<T>
{ {
/// The transmit side of the multiple producer channel from comms threads. /// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<T>>, tx: &'a Sender<SourcedMessage<T>>,
/// The receive side of the channel to the comms thread. /// The receive side of the channel to the comms thread.
rx: &'a Receiver<Message<T>>, rx: &'a Receiver<Message<T>>,
/// The socket IO task. /// The socket IO task.
io: CodecIo, io: CodecIo<T>,
/// The index of this comms task for identification against its remote node. /// The index of this comms task for identification against its remote node.
pub node_index: usize pub node_index: usize
} }
impl<'a, T: Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> impl<'a, T: Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask<'a, T> CommsTask<'a, T>
where Vec<u8>: From<T>
{ {
pub fn new(tx: &'a Sender<SourcedMessage<T>>, pub fn new(tx: &'a Sender<SourcedMessage<T>>,
rx: &'a Receiver<Message<T>>, rx: &'a Receiver<Message<T>>,
@ -54,7 +53,7 @@ where Vec<u8>: From<T>
CommsTask { CommsTask {
tx: tx, tx: tx,
rx: rx, rx: rx,
io: CodecIo::new(stream), io: StreamIo::from_stream(stream),
node_index: node_index node_index: node_index
} }
} }
@ -76,14 +75,14 @@ where Vec<u8>: From<T>
let message = rx.recv().unwrap(); let message = rx.recv().unwrap();
debug!("Node {} <- {:?}", node_index, message); debug!("Node {} <- {:?}", node_index, message);
// Forward the message to the remote node. // Forward the message to the remote node.
io1.send_message(message).unwrap(); io1.send(message).unwrap();
} }
}); });
// Remote comms receive loop. // Remote comms receive loop.
debug!("Starting remote RX loop for node {}", node_index); debug!("Starting remote RX loop for node {}", node_index);
loop { loop {
match self.io.receive_message() { match self.io.recv() {
Ok(message) => { Ok(message) => {
debug!("Node {} -> {:?}", node_index, message); debug!("Node {} -> {:?}", node_index, message);
tx.send( tx.send(

View File

@ -47,6 +47,7 @@ extern crate reed_solomon_erasure;
mod connection; mod connection;
mod messaging; mod messaging;
mod stream_io;
mod proto; mod proto;
mod proto_io; mod proto_io;
mod commst; mod commst;

View File

@ -16,9 +16,6 @@ pub enum Message<T: Send + Sync> {
Agreement(AgreementMessage) Agreement(AgreementMessage)
} }
//unsafe impl<T: Send + Sync> Send for Message<T> { }
//impl<T: Send + Sync> !Sync for Message<T> { }
/// The three kinds of message sent during the reliable broadcast stage of the /// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm. /// consensus algorithm.
#[derive (Clone, Debug, PartialEq)] #[derive (Clone, Debug, PartialEq)]

View File

@ -6,6 +6,7 @@ use std::net::TcpStream;
use protobuf; use protobuf;
use protobuf::Message as ProtobufMessage; use protobuf::Message as ProtobufMessage;
use proto::*; use proto::*;
use stream_io::StreamIo;
/// A magic key to put right before each message. An atavism of primitive serial /// A magic key to put right before each message. An atavism of primitive serial
/// protocols. /// protocols.
@ -57,29 +58,34 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
Ok(result) Ok(result)
} }
pub struct CodecIo { pub struct CodecIo<T> {
stream: TcpStream, stream: TcpStream,
buffer: [u8; 1024 * 4], buffer: [u8; 1024 * 4],
/// FIXME: remove this dependent argument
phant: T
} }
/// A message handling task. /// A message handling task.
impl CodecIo where { impl<T> StreamIo<TcpStream, T, Error> for CodecIo<T>
pub fn new(stream: TcpStream) -> Self { where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
{
fn from_stream(stream: TcpStream) -> Self {
CodecIo { CodecIo {
stream, stream,
buffer: [0; 1024 * 4] buffer: [0; 1024 * 4],
phant: T::from(Vec::new())
} }
} }
pub fn try_clone(&self) -> Result<CodecIo, ::std::io::Error> { fn try_clone(&self) -> Result<CodecIo<T>, ::std::io::Error> {
Ok(CodecIo { Ok(CodecIo {
stream: self.stream.try_clone()?, stream: self.stream.try_clone()?,
buffer: [0; 1024 * 4] buffer: self.buffer.clone(),
phant: T::from(Vec::new())
}) })
} }
pub fn receive_message<T>(&mut self) -> Result<Message<T>, Error> fn recv(&mut self) -> Result<Message<T>, Error>
where T: From<Vec<u8>> + Send + Sync
{ {
self.stream.read_exact(&mut self.buffer[0..4])?; self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?; let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
@ -103,9 +109,7 @@ impl CodecIo where {
.map_err(|e| Error::ProtobufError(e)) .map_err(|e| Error::ProtobufError(e))
} }
pub fn send_message<T>(&mut self, message: Message<T>) fn send(&mut self, message: Message<T>) -> Result<(), Error>
-> Result<(), Error>
where T: Into<Vec<u8>> + Send + Sync
{ {
let mut buffer: [u8; 4] = [0; 4]; let mut buffer: [u8; 4] = [0; 4];
// Wrap stream // Wrap stream

20
src/stream_io.rs Normal file
View File

@ -0,0 +1,20 @@
//! Abstract interface to serialised IO.
use std::io;
use std::io::{Read, Write};
use proto::*;
/// Trait of types of streams carrying payload of type `Message<T>` and
/// returning errors of type `Error`.
///
/// This is a stream interface independent of the choice of serialisation
/// methods.
pub trait StreamIo<Stream, T, Error>: Sized
where Stream: Read + Write, T: Send + Sync // From implies Into
{
fn from_stream(stream: Stream) -> Self;
fn try_clone(&self) -> Result<Self, io::Error>;
fn recv(&mut self) -> Result<Message<T>, Error>;
fn send(&mut self, m: Message<T>) -> Result<(), Error>;
}