Task is refactored to provide a stream of messages

This commit is contained in:
Vladimir Komendantskiy 2018-03-15 23:43:58 +00:00
parent e8cde28f93
commit cb4c55a5c5
3 changed files with 79 additions and 52 deletions

View File

@ -1,9 +1,10 @@
//! Reliable broadcast algorithm.
use std::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::net::{TcpStream, TcpListener, SocketAddr};
use errors::ResultExt;
//use errors::ResultExt;
use task::{Error, MessageLoop, Task};
use proto::message::{MessageProto, ValueProto, EchoProto, ReadyProto};
use proto::Message;
use merkle::*;
/// A broadcast task is an instance of `Task`, a message-handling task with a
@ -22,7 +23,7 @@ pub struct BroadcastTask<T> {
readys: HashSet<Vec<u8>>
}
impl<T> BroadcastTask<T> {
impl<T: Debug> BroadcastTask<T> {
pub fn new(stream: TcpStream) -> Self {
BroadcastTask {
task: Task::new(stream),
@ -31,9 +32,20 @@ impl<T> BroadcastTask<T> {
readys: Default::default()
}
}
fn on_message_received(&mut self, message: Message<T>)
-> Result<(), Error>
{
info!("Message received: {:?}", message);
Ok(())
// else {
// warn!("Unexpected message type");
// return Err(Error::ProtocolError);
// }
}
}
impl<T> MessageLoop for BroadcastTask<T> {
impl<T: Debug + From<Vec<u8>>> MessageLoop for BroadcastTask<T> {
fn run(&mut self) {
loop {
match self.task.receive_message() {
@ -46,24 +58,4 @@ impl<T> MessageLoop for BroadcastTask<T> {
}
}
}
fn on_message_received(&mut self, message: MessageProto) -> Result<(), Error> {
if message.has_broadcast() {
let broadcast = message.get_broadcast();
if broadcast.has_value() {
let value = broadcast.get_value();
}
else if broadcast.has_echo() {
let echo = broadcast.get_echo();
}
else if broadcast.has_ready() {
let ready = broadcast.get_ready();
}
return Ok(());
}
else {
warn!("Unexpected message type");
return Err(Error::ProtocolError);
}
}
}

View File

@ -3,12 +3,13 @@ pub mod message;
use ring::digest::Algorithm;
use merkle::proof::{Proof, Lemma, Positioned};
//use protobuf::Message;
use self::message::*;
use protobuf::error::ProtobufResult;
use protobuf::Message as ProtobufMessage;
use proto::message::*;
use protobuf::error::{ProtobufResult, ProtobufError, WireError};
use protobuf::core::parse_from_bytes;
/// Kinds of message sent by nodes participating in consensus.
#[derive (Clone, Debug, PartialEq)]
pub enum Message<T> {
Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage)
@ -16,6 +17,7 @@ pub enum Message<T> {
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[derive (Clone, Debug, PartialEq)]
pub enum BroadcastMessage<T> {
Value(Proof<T>),
Echo(Proof<T>),
@ -23,18 +25,24 @@ pub enum BroadcastMessage<T> {
}
/// Messages sent during the binary Byzantine agreement stage.
#[derive (Clone, Debug, PartialEq)]
pub enum AgreementMessage {
// TODO
}
impl<T> Message<T> {
/// Translation from protobuf to the regular type.
pub fn from_proto(algorithm: &'static Algorithm,
mut proto: message::MessageProto) -> Option<Self>
///
/// TODO: add an `Algorithm` field to `MessageProto`.
pub fn from_proto(mut proto: message::MessageProto)
-> Option<Self>
where T: From<Vec<u8>>
{
if proto.has_broadcast() {
BroadcastMessage::from_proto(proto.take_broadcast(), algorithm)
BroadcastMessage::from_proto(proto.take_broadcast(),
// TODO, possibly move Algorithm inside
// BroadcastMessage
&::ring::digest::SHA256)
.map(|b| Message::Broadcast(b))
}
else if proto.has_agreement() {
@ -60,6 +68,30 @@ impl<T> Message<T> {
}
m
}
/// Parse a `Message` from its protobuf binary representation.
///
/// TODO: pass custom errors from down the chain of nested parsers as
/// opposed to returning `WireError::Other`.
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self>
where T: From<Vec<u8>>
{
let r = parse_from_bytes::<MessageProto>(bytes)
.map(|proto| Self::from_proto(proto));
match r {
Ok(Some(m)) => Ok(m),
Ok(None) => Err(ProtobufError::WireError(WireError::Other)),
Err(e) => Err(e)
}
}
/// Produce a protobuf representation of this `Message`.
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>>
where T: Into<Vec<u8>>
{
self.into_proto().write_to_bytes()
}
}
impl<T> BroadcastMessage<T> {
@ -115,7 +147,7 @@ impl AgreementMessage {
unimplemented!();
}
pub fn from_proto(mut mp: AgreementProto) -> Option<Self>
pub fn from_proto(_mp: AgreementProto) -> Option<Self>
{
unimplemented!();
}

View File

@ -6,8 +6,8 @@ use std::{cmp,io};
use std::io::Read;
use std::net::TcpStream;
use protobuf;
use protobuf::Message as ProtoBufMessage;
use proto::message::{MessageProto};
use protobuf::Message as ProtobufMessage;
use proto::*;
/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
@ -59,13 +59,9 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
Ok(result)
}
/// A trait allowing custom definitions of the main loop and the received
/// message callback.
/// A trait allowing custom definitions of the main loop.
pub trait MessageLoop {
fn run(&mut self);
fn on_message_received(&mut self,
message: MessageProto)
-> Result<(), Error>;
}
pub struct Task {
@ -74,11 +70,10 @@ pub struct Task {
}
/// Placeholder `MessageLoop` definition for a generic `Task`.
///
/// TODO: not needed? remove?
impl MessageLoop for Task {
fn run(&mut self) {}
fn on_message_received(&mut self, _: MessageProto) -> Result<(), Error> {
Ok(())
}
}
/// A message handling task.
@ -90,7 +85,9 @@ impl Task where Self: MessageLoop {
}
}
pub fn receive_message(&mut self) -> Result<MessageProto, Error> {
pub fn receive_message<T>(&mut self) -> Result<Message<T>, Error>
where T: From<Vec<u8>>
{
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
if frame_start != FRAME_START {
@ -99,30 +96,36 @@ impl Task where Self: MessageLoop {
self.stream.read_exact(&mut self.buffer[0..4])?;
let size = decode_u32_from_be(&self.buffer[0..4])? as usize;
let mut message: Vec<u8> = Vec::new();
message.reserve(size);
while message.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size - message.len());
let mut message_v: Vec<u8> = Vec::new();
message_v.reserve(size);
while message_v.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size -
message_v.len());
let (slice, _) = self.buffer.split_at_mut(num_to_read);
self.stream.read_exact(slice)?;
message.extend_from_slice(slice);
message_v.extend_from_slice(slice);
}
let message = protobuf::parse_from_bytes::<MessageProto>(&message)?;
Ok(message)
Message::parse_from_bytes(&message_v)
.map_err(|e| Error::ProtobufError(e))
}
pub fn send_message(&mut self, message: &MessageProto) -> Result<(), Error> {
pub fn send_message<T>(&mut self, message: Message<T>)
-> Result<(), Error>
where T: Into<Vec<u8>>
{
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
let message_p = message.into_proto();
// Write message size
encode_u32_to_be(message.compute_size(), &mut buffer[0..4])?;
encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message
message.write_to(&mut stream)?;
message_p.write_to(&mut stream)?;
// Flush
stream.flush()?;
Ok(())