initial commit: elements of profobuf interface

This commit is contained in:
Vladimir Komendantskiy 2018-03-15 00:03:21 +00:00
commit 5301123872
12 changed files with 2529 additions and 0 deletions

21
Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "hbbft"
version = "0.1.0"
authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
[dependencies]
log = "0.4.1"
simple_logger = "0.5"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.1"
futures = "0.1"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
ring = "^0.12"
rand = "*"
error-chain = "0.11"
protobuf = "1.4.4"
[build-dependencies]
protoc-rust = "1.4.4"

4
README.md Normal file
View File

@ -0,0 +1,4 @@
# Implementation of the paper "Honey Badger of BFT Protocols" in Rust
This is a modular library of consensus. There are *going to be*
[examples](./examples/README.md) illustrating the use of this algorithm.

9
build.rs Normal file
View File

@ -0,0 +1,9 @@
extern crate protoc_rust;
fn main() {
protoc_rust::run(protoc_rust::Args {
out_dir: "src/proto",
input: &["proto/message.proto"],
includes: &["proto"],
}).expect("protoc");
}

1
examples/README.md Normal file
View File

@ -0,0 +1 @@
## Examples for the hbbft library

49
proto/message.proto Normal file
View File

@ -0,0 +1,49 @@
syntax = "proto3";
message MessageProto {
oneof payload {
BroadcastProto broadcast = 1;
AgreementProto agreement = 2;
}
}
message BroadcastProto {
oneof payload {
ValueProto value = 1;
EchoProto echo = 2;
ReadyProto ready = 3;
}
}
message ValueProto {
ProofProto proof = 1;
}
message EchoProto {
ProofProto proof = 1;
}
message ReadyProto {
bytes root_hash = 1;
}
message ProofProto {
bytes root_hash = 1;
LemmaProto lemma = 2;
bytes value = 3;
}
message LemmaProto {
bytes node_hash = 1;
LemmaProto sub_lemma = 2;
oneof sibling_hash {
bytes left_sibling_hash = 3;
bytes right_sibling_hash = 4;
}
}
message AgreementProto {
// TODO
}

4
src/agreement/mod.rs Normal file
View File

@ -0,0 +1,4 @@
//! Binary Byzantine agreement protocol from a common coin protocol.
use futures::{Future, Stream};
use futures::future::*;

69
src/broadcast/mod.rs Normal file
View File

@ -0,0 +1,69 @@
//! Reliable broadcast algorithm.
use std::collections::{HashMap, HashSet};
use std::net::{TcpStream, TcpListener, SocketAddr};
use errors::ResultExt;
use task::{Error, MessageLoop, Task};
use proto::message::{MessageProto, ValueProto, EchoProto, ReadyProto};
use merkle::*;
/// A broadcast task is an instance of `Task`, a message-handling task with a
/// main loop.
pub struct BroadcastTask<T> {
/// The underlying task that handles sending and receiving messages.
task: Task,
/// Messages of type Value received so far, keyed with the root hash for
/// easy access.
values: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Echo received so far, keyed with the root hash for
/// easy access.
echos: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
readys: HashSet<Vec<u8>>
}
impl<T> BroadcastTask<T> {
pub fn new(stream: TcpStream) -> Self {
BroadcastTask {
task: Task::new(stream),
values: Default::default(),
echos: Default::default(),
readys: Default::default()
}
}
}
impl<T> MessageLoop for BroadcastTask<T> {
fn run(&mut self) {
loop {
match self.task.receive_message() {
Ok(message) => self.on_message_received(message).unwrap(),
Err(Error::ProtobufError(e)) => warn!("Protobuf error {}", e),
Err(e) => {
warn!("Critical error {:?}", e);
break;
}
}
}
}
fn on_message_received(&mut self, message: MessageProto) -> Result<(), Error> {
if message.has_broadcast() {
let broadcast = message.get_broadcast();
if broadcast.has_value() {
let value = broadcast.get_value();
}
else if broadcast.has_echo() {
let echo = broadcast.get_echo();
}
else if broadcast.has_ready() {
let ready = broadcast.get_ready();
}
return Ok(());
}
else {
warn!("Unexpected message type");
return Err(Error::ProtocolError);
}
}
}

4
src/errors.rs Normal file
View File

@ -0,0 +1,4 @@
//! Template-assisted definition of the crate's error strategy.
error_chain! {
}

15
src/lib.rs Normal file
View File

@ -0,0 +1,15 @@
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
extern crate protobuf;
extern crate ring;
extern crate merkle;
extern crate futures;
mod errors;
mod proto;
mod task;
pub mod broadcast;
pub mod agreement;

2082
src/proto/message.rs Normal file

File diff suppressed because it is too large Load Diff

126
src/proto/mod.rs Normal file
View File

@ -0,0 +1,126 @@
//! Construction of messages from protobuf buffers.
pub mod message;
use ring::digest::Algorithm;
use merkle::proof::{Proof, Lemma, Positioned};
//use protobuf::Message;
use self::message::*;
use protobuf::error::ProtobufResult;
use protobuf::core::parse_from_bytes;
/// Kinds of message sent by nodes participating in consensus.
enum Message<T> {
Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage<T>)
}
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
enum BroadcastMessage<T> {
Value(Proof<T>),
Echo(Proof<T>),
Ready(Vec<u8>)
}
/// Messages sent during the binary Byzantine agreement stage.
enum AgreementMessage<T> {
// TODO
Phantom(T)
}
impl<T> Message<T> {
pub fn from_protobuf(algorithm: &'static Algorithm,
mut proto: message::MessageProto) -> Option<Self>
where T: From<Vec<u8>>,
{
if proto.has_broadcast() {
proto.take_broadcast().into_broadcast(algorithm)
.map(|b| Message::Broadcast(b))
}
else {
// TODO
None
}
}
}
impl BroadcastProto {
pub fn into_broadcast<T>(mut self,
algorithm: &'static Algorithm)
-> Option<BroadcastMessage<T>>
where T: From<Vec<u8>>,
{
if self.has_value() {
self.take_value().take_proof().into_proof(algorithm)
.map(|p| BroadcastMessage::Value(p))
}
else if self.has_echo() {
self.take_echo().take_proof().into_proof(algorithm)
.map(|p| BroadcastMessage::Echo(p))
}
else if self.has_ready() {
let h = self.take_ready().take_root_hash();
Some(BroadcastMessage::Ready(h))
}
else {
None
}
}
}
impl ProofProto {
pub fn into_proof<T>(mut self,
algorithm: &'static Algorithm)
-> Option<Proof<T>>
where T: From<Vec<u8>>
{
if !self.has_lemma() {
return None;
}
self.take_lemma().into_lemma().map(|lemma| {
Proof::new(
algorithm,
self.take_root_hash(),
lemma,
self.take_value().into(),
)
})
}
}
impl LemmaProto {
pub fn into_lemma(mut self) -> Option<Lemma> {
let node_hash = self.take_node_hash();
let sibling_hash = if self.has_left_sibling_hash() {
Some(Positioned::Left(self.take_left_sibling_hash()))
} else if self.has_right_sibling_hash() {
Some(Positioned::Right(self.take_right_sibling_hash()))
} else {
None
};
if self.has_sub_lemma() {
// If a `sub_lemma` is present is the Protobuf,
// then we expect it to unserialize to a valid `Lemma`,
// otherwise we return `None`
self.take_sub_lemma().into_lemma().map(|sub_lemma| {
Lemma {
node_hash: node_hash,
sibling_hash: sibling_hash,
sub_lemma: Some(Box::new(sub_lemma)),
}
})
} else {
// We might very well not have a sub_lemma,
// in which case we just set it to `None`,
// but still return a potentially valid `Lemma`.
Some(Lemma {
node_hash: node_hash,
sibling_hash: sibling_hash,
sub_lemma: None,
})
}
}
}

145
src/task.rs Normal file
View File

@ -0,0 +1,145 @@
//! Protobuf message IO task structure. The generic, shared behaviour is
//! contained in the implementation of `Task` while any specific behaviour
//! should be defined by means of the `MessageLoop` trait interface.
use std::{cmp,io};
use std::io::Read;
use std::net::TcpStream;
use protobuf;
use protobuf::Message as ProtoBufMessage;
use proto::message::{MessageProto};
/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
///
/// TODO: Replace it with a proper handshake at connection initiation.
const FRAME_START: u32 = 0x2C0FFEE5;
#[derive(Debug)]
pub enum Error {
IoError(io::Error),
EncodeError,
DecodeError,
FrameStartMismatch,
ProtocolError,
ProtobufError(protobuf::ProtobufError),
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::IoError(err) }
}
impl From<protobuf::ProtobufError> for Error {
fn from(err: protobuf::ProtobufError) -> Error { Error::ProtobufError(err) }
}
fn encode_u32_to_be(value: u32, buffer: &mut[u8]) -> Result<(), Error> {
if buffer.len() < 4 {
return Err(Error::EncodeError);
}
let value = value.to_le();
buffer[0] = ((value & 0xFF000000) >> 24) as u8;
buffer[1] = ((value & 0x00FF0000) >> 16) as u8;
buffer[2] = ((value & 0x0000FF00) >> 8) as u8;
buffer[3] = (value & 0x000000FF) as u8;
Ok(())
}
fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
if buffer.len() < 4 {
return Err(Error::DecodeError);
}
let mut result: u32 = buffer[0] as u32;
result = result << 8;
result += buffer[1] as u32;
result = result << 8;
result += buffer[2] as u32;
result = result << 8;
result += buffer[3] as u32;
Ok(result)
}
/// A trait allowing custom definitions of the main loop and the received
/// message callback.
pub trait MessageLoop {
fn run(&mut self);
fn on_message_received(&mut self,
message: MessageProto)
-> Result<(), Error>;
}
pub struct Task {
stream: TcpStream,
buffer: [u8; 1024],
}
/// Placeholder `MessageLoop` definition for a generic `Task`.
impl MessageLoop for Task {
fn run(&mut self) {}
fn on_message_received(&mut self, _: MessageProto) -> Result<(), Error> {
Ok(())
}
}
/// A message handling task.
impl Task where Self: MessageLoop {
pub fn new(stream: TcpStream) -> Task {
Task {
stream,
buffer: [0; 1024]
}
}
pub fn receive_message(&mut self) -> Result<MessageProto, Error> {
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
if frame_start != FRAME_START {
return Err(Error::FrameStartMismatch);
};
self.stream.read_exact(&mut self.buffer[0..4])?;
let size = decode_u32_from_be(&self.buffer[0..4])? as usize;
let mut message: Vec<u8> = Vec::new();
message.reserve(size);
while message.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size - message.len());
let (slice, _) = self.buffer.split_at_mut(num_to_read);
self.stream.read_exact(slice)?;
message.extend_from_slice(slice);
}
let message = protobuf::parse_from_bytes::<MessageProto>(&message)?;
Ok(message)
}
pub fn send_message(&mut self, message: &MessageProto) -> Result<(), Error> {
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message size
encode_u32_to_be(message.compute_size(), &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message
message.write_to(&mut stream)?;
// Flush
stream.flush()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use task::*;
/// Test the requirement that composing encoding with decoding yields the
/// identity.
#[test]
fn encode_decode_identity() {
let mut buffer: [u8; 4] = [0; 4];
encode_u32_to_be(FRAME_START, &mut buffer[0..4]).unwrap();
let frame_start = decode_u32_from_be(&buffer[0..4]).unwrap();
assert_eq!(frame_start, FRAME_START);
}
}