Merge branch 'master' into remove-autogenerated-message.rs

Vladimir Komendantskiy 2018-03-28 14:52:13 +01:00, committed by GitHub
commit 103e7c612d
12 changed files with 799 additions and 119 deletions

.gitignore

@@ -1 +1,4 @@
src/proto/message.rs
/target
**/*.rs.bk
Cargo.lock

Cargo.toml

@@ -7,15 +7,19 @@ authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
log = "0.4.1"
simple_logger = "0.5"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.1"
futures = "0.1"
#tokio-io = "0.1"
#tokio-timer = "0.1"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
ring = "^0.12"
rand = "*"
error-chain = "0.11"
protobuf = "1.4.4"
spmc = "0.2.2"
crossbeam = "0.3.2"
[build-dependencies]
protoc-rust = "1.4.4"
[dev-dependencies]
docopt = "0.8"

README.md

@@ -1,4 +1,4 @@
# Implementation of the paper "Honey Badger of BFT Protocols" in Rust
This is a modular library of consensus. There are *going to be*
This is a modular library of consensus. There are
[examples](./examples/README.md) illustrating the use of this algorithm.

examples/README.md

@@ -1 +1,5 @@
## Examples for the hbbft library
- [Consensus node](consensus-node.rs) - Example of a consensus node that uses
the `hbbft::node::Node` struct for running the distributed consensus state
machine.

examples/consensus-node.rs

@@ -0,0 +1,54 @@
//! Example of a consensus node that uses the `hbbft::node::Node` struct for
//! running the distributed consensus state machine.
extern crate docopt;
extern crate hbbft;
use hbbft::node::Node;
use docopt::Docopt;
use std::net::SocketAddr;
use std::vec::Vec;
const VERSION: &'static str = "0.1.0";
const USAGE: &'static str = "
Consensus node example
Usage:
consensus-node --bind-address=<host:port> [--value=VALUE] [--remote-address=<host:port>]...
consensus-node (--help | -h)
consensus-node --version
";
#[derive(Debug)]
struct Args {
bind_address: SocketAddr,
remote_addresses: Vec<SocketAddr>,
value: Option<Vec<u8>>,
}
fn parse_args() -> Args {
let args = Docopt::new(USAGE)
.unwrap_or_else(|e| e.exit())
.version(Some(VERSION.to_owned()))
.parse()
.unwrap_or_else(|e| e.exit());
Args {
value: if args.get_count("--value") > 0 {
Some(args.get_str("--value").as_bytes().to_vec())
}
else {
None
},
bind_address: args.get_str("--bind-address").parse().unwrap(),
remote_addresses: args.get_vec("--remote-address")
.iter()
.map(|s| s.parse().unwrap())
.collect()
}
}
pub fn main() {
let args: Args = parse_args();
println!("{:?}", args);
let node = Node::new(args.bind_address, args.remote_addresses, args.value);
node.run();
}
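A hypothetical invocation on the first of four hosts (the addresses are illustrative, and the binary is assumed to be built with `cargo build --example consensus-node`):

consensus-node --bind-address=192.168.1.1:10001 --value=Hello \
--remote-address=192.168.1.2:10002 \
--remote-address=192.168.1.3:10003 \
--remote-address=192.168.1.4:10004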

src/agreement.rs

@@ -1,4 +1,4 @@
//! Binary Byzantine agreement protocol from a common coin protocol.
use futures::{Future, Stream};
use futures::future::*;
//use futures::{Future, Stream};
//use futures::future::*;

src/broadcast.rs

@@ -1,69 +1,335 @@
//! Reliable broadcast algorithm.
use std::collections::{HashMap, HashSet};
use std::net::{TcpStream, TcpListener, SocketAddr};
use errors::ResultExt;
use task::{Error, MessageLoop, Task};
use proto::message::{MessageProto, ValueProto, EchoProto, ReadyProto};
use merkle::*;
use std::fmt::Debug;
use std::hash::Hash;
use std::collections::{HashSet, HashMap};
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use spmc;
use crossbeam;
use proto::*;
use std::marker::{Send, Sync};
use merkle::MerkleTree;
use merkle::proof::{Proof, Lemma, Positioned};
use reed_solomon_erasure::ReedSolomon;
/// A broadcast task is an instance of `Task`, a message-handling task with a
/// main loop.
pub struct BroadcastTask<T> {
/// The underlying task that handles sending and receiving messages.
task: Task,
/// Messages of type Value received so far, keyed with the root hash for
/// easy access.
values: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Echo received so far, keyed with the root hash for
/// easy access.
echos: HashMap<Vec<u8>, Proof<T>>,
/// Temporary placeholders for the number of participants and the maximum
/// envisaged number of faulty nodes. Only one is required since N >= 3f +
/// 1. There are at least two options for where N and f should come from:
///
/// - start-up parameters
///
/// - initial socket setup phase in node.rs
///
const PLACEHOLDER_N: usize = 8;
const PLACEHOLDER_F: usize = 2;
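// Since N >= 3f + 1, either parameter bounds the other. A hypothetical helper
// (not part of this commit) could derive the maximum tolerated number of
// faulty nodes from N:
//
//     fn max_faulty(n: usize) -> usize { (n - 1) / 3 }
//
// With N = 8 it yields f = 2, matching the placeholders above.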
/// Broadcast stage. See the TODO note below!
///
/// TODO: The ACS algorithm will require multiple broadcast instances running
/// asynchronously, see Figure 4 in the HBBFT paper. So, it's likely that the
/// broadcast *stage* has to be replaced with N asynchronous threads, each
/// responding to values from one particular remote node. The paper doesn't make
/// it clear though how other messages - Echo and Ready - are distributed over
/// the instances. Also it appears that the sender of a message has to become
/// part of the message for this to work.
pub struct Stage<T: Send + Sync> {
/// The transmit side of the multiple consumer channel to comms threads.
pub tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
/// The receive side of the multiple producer channel from comms threads.
pub rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
/// Messages of type Value received so far.
pub values: HashSet<Proof<T>>,
/// Messages of type Echo received so far.
pub echos: HashSet<Proof<T>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
readys: HashSet<Vec<u8>>
pub readys: HashMap<Vec<u8>, usize>,
/// Value to be broadcast
pub broadcast_value: Option<T>
}
impl<T> BroadcastTask<T> {
pub fn new(stream: TcpStream) -> Self {
BroadcastTask {
task: Task::new(stream),
impl<T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
+ From<Vec<u8>> + AsRef<[u8]>>
Stage<T>
where Vec<u8>: From<T>
{
pub fn new(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
broadcast_value: Option<T>) -> Self
{
Stage {
tx: tx,
rx: rx,
values: Default::default(),
echos: Default::default(),
readys: Default::default()
readys: Default::default(),
broadcast_value: broadcast_value
}
}
/// Broadcast stage task returning the computed values in case of success,
/// and an error in case of failure.
///
/// TODO: Detailed error status.
pub fn run(&mut self) -> Result<T, ()> {
// Broadcast state machine thread.
//
// rx cannot be cloned due to its type constraint but can be used inside
// a thread with the help of an `Arc` (`Rc` wouldn't work for the same
// reason). A `Mutex` is used to grant write access.
let rx = self.rx.to_owned();
let tx = self.tx.to_owned();
let values = Arc::new(Mutex::new(self.values.to_owned()));
let echos = Arc::new(Mutex::new(self.echos.to_owned()));
let readys = Arc::new(Mutex::new(self.readys.to_owned()));
let final_value = Arc::new(Mutex::new(None));
let final_value_r = final_value.clone();
let bvalue = self.broadcast_value.to_owned();
crossbeam::scope(|scope| {
scope.spawn(move || {
*final_value_r.lock().unwrap() =
inner_run(tx, rx, values, echos, readys, bvalue);
});
});
match final_value.lock().unwrap().take() {
None => Err(()),
Some(v) => Ok(v)
}
}
}
impl<T> MessageLoop for BroadcastTask<T> {
fn run(&mut self) {
loop {
match self.task.receive_message() {
Ok(message) => self.on_message_received(message).unwrap(),
Err(Error::ProtobufError(e)) => warn!("Protobuf error {}", e),
Err(e) => {
warn!("Critical error {:?}", e);
break;
/// The main loop of the broadcast task.
fn inner_run<T>(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
values: Arc<Mutex<HashSet<Proof<T>>>>,
echos: Arc<Mutex<HashSet<Proof<T>>>>,
readys: Arc<Mutex<HashMap<Vec<u8>, usize>>>,
broadcast_value: Option<T>) -> Option<T>
where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
+ From<Vec<u8>> + AsRef<[u8]>
, Vec<u8>: From<T>
{
// return value
let reconstructed_value: Option<T> = None;
// Ready sent flags
let mut ready_sent: HashSet<Vec<u8>> = Default::default();
// Erasure coding scheme: N - 2f value shards and 2f parity shards
let parity_shard_num = 2 * PLACEHOLDER_F;
let data_shard_num = PLACEHOLDER_N - parity_shard_num;
let coding = ReedSolomon::new(data_shard_num, parity_shard_num).unwrap();
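// With the placeholders above, N = 8 and f = 2, this yields 4 data and
// 4 parity shards; any 4 of the 8 encoded shards are then sufficient to
// reconstruct the value.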
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs from
// this tree and send them, each to its own node.
//
// FIXME: Does the node send a proof to itself?
if let Some(v) = broadcast_value {
let mut v: Vec<u8> = Vec::from(v).to_owned();
// Pad the value vector with zeros so that it divides into data shards of
// equal size.
let remainder = v.len() % data_shard_num;
let shard_pad_len = if remainder == 0 {
0
} else {
data_shard_num - remainder
};
for _i in 0..shard_pad_len {
v.push(0);
}
// Now the vector length is evenly divisible by the number of shards.
// Size of a Merkle tree leaf value, in bytes.
let shard_len = v.len() / data_shard_num;
// Pad the parity shards with zeros.
for _i in 0 .. shard_len * parity_shard_num {
v.push(0);
}
// Divide the vector into chunks/shards.
let shards_iter = v.chunks_mut(shard_len);
// Convert the iterator over slices into a vector of slices.
let mut shards: Vec<&mut [u8]> = Vec::new();
for s in shards_iter {
shards.push(s);
}
// Construct the parity chunks/shards
coding.encode(shards.as_mut_slice()).unwrap();
// Convert shards back to type `T` for proof generation.
let mut shards_T: Vec<T> = Vec::new();
for s in shards.iter() {
let s = Vec::into(s.to_vec());
shards_T.push(s);
}
// Assemble a Merkle tree from the data and parity shards; its per-leaf
// proofs are deconstructed into compound branches below.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_T);
// Send each proof to a node.
//
// FIXME: use a single consumer TX channel.
for leaf_value in mtree.iter().cloned() {
let proof = mtree.gen_proof(leaf_value);
if let Some(proof) = proof {
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Value(proof))).unwrap();
}
}
}
// TODO: handle exit conditions
while reconstructed_value == None {
// Receive a message from the socket IO task.
let message = rx.lock().unwrap().recv().unwrap();
if let Message::Broadcast(message) = message {
match message {
// A value received. Record the value and multicast an echo.
//
// TODO: determine if the paper treats multicast as reflexive and
// add an echo to this node if it does.
BroadcastMessage::Value(p) => {
values.lock().unwrap().insert(p.clone());
tx.lock().unwrap()
.send(Message::Broadcast(
BroadcastMessage::Echo(p)))
.unwrap()
},
// An echo received. Verify the proof it contains.
BroadcastMessage::Echo(p) => {
let root_hash = p.root_hash.clone();
//let echos = echos.lock().unwrap();
if p.validate(root_hash.as_slice()) {
echos.lock().unwrap().insert(p.clone());
// Upon receiving valid echos for the same root hash
// from N - f distinct parties, try to interpolate the
// Merkle tree.
//
// TODO: eliminate this iteration
let mut echo_n = 0;
for echo in echos.lock().unwrap().iter() {
if echo.root_hash == root_hash {
echo_n += 1;
}
}
if echo_n >= PLACEHOLDER_N - PLACEHOLDER_F {
// Try to interpolate the Merkle tree using the
// Reed-Solomon erasure coding scheme.
//
// FIXME: indicate the missing leaves with None
let mut leaves: Vec<Option<Box<[u8]>>> = Vec::new();
// TODO: optimise this loop out as well
for echo in
echos.lock().unwrap().iter()
{
if echo.root_hash == root_hash {
leaves.push(Some(
Box::from(echo.value.clone().into())));
}
}
coding.reconstruct_shards(leaves.as_mut_slice())
.unwrap();
// FIXME: Recompute Merkle tree root.
// if Ready has not yet been sent, multicast Ready
if !ready_sent.contains(&root_hash) {
ready_sent.insert(root_hash.clone());
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Ready(root_hash)))
.unwrap();
}
}
}
},
BroadcastMessage::Ready(ref h) => {
// Number of times Ready(h) was received. The lock is taken once
// here; locking again in an else branch would deadlock.
let ready_n = {
let mut readys = readys.lock().unwrap();
let n = readys.entry(h.clone()).or_insert(0);
*n += 1;
*n
};
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if (ready_n == PLACEHOLDER_F + 1) &&
!ready_sent.contains(h)
{
ready_sent.insert(h.clone());
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Ready(h.to_vec()))).unwrap();
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait
// for N - 2f Echo messages, then decode v.
if (ready_n > 2 * PLACEHOLDER_F) &&
(reconstructed_value == None) &&
(echos.lock().unwrap().len() >=
PLACEHOLDER_N - 2 * PLACEHOLDER_F)
{
// FIXME: decode v
}
}
}
}
}
fn on_message_received(&mut self, message: MessageProto) -> Result<(), Error> {
if message.has_broadcast() {
let broadcast = message.get_broadcast();
if broadcast.has_value() {
let value = broadcast.get_value();
}
else if broadcast.has_echo() {
let echo = broadcast.get_echo();
}
else if broadcast.has_ready() {
let ready = broadcast.get_ready();
}
return Ok(());
}
else {
warn!("Unexpected message type");
return Err(Error::ProtocolError);
error!("Incorrect message from the socket: {:?}",
message);
}
}
return reconstructed_value;
}
/// An additional path conversion operation on `Lemma` to allow reconstruction
/// of erasure-coded `Proof` from `Lemma`s. The output path, when read from left
/// to right, goes from leaf to root (LSB order).
pub fn lemma_to_path(lemma: &Lemma) -> Vec<bool> {
match lemma.sub_lemma {
None => {
match lemma.sibling_hash {
// lemma terminates with no leaf
None => vec![],
// the leaf is on the right
Some(Positioned::Left(_)) => vec![true],
// the leaf is on the left
Some(Positioned::Right(_)) => vec![false],
}
}
Some(ref l) => {
let mut p = lemma_to_path(l.as_ref());
match lemma.sibling_hash {
// lemma terminates
None => (),
// lemma branches out to the right
Some(Positioned::Left(_)) => p.push(true),
// lemma branches out to the left
Some(Positioned::Right(_)) => p.push(false),
}
p
}
}
}
/// Further conversion of a binary tree path into an array index.
pub fn path_to_index(mut path: Vec<bool>) -> usize {
let mut idx = 0;
// Convert to the MSB order.
path.reverse();
for &dir in path.iter() {
idx = (idx << 1) | (dir as usize);
}
idx
}
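To illustrate the two conversions together, a sketch (not part of this commit) assuming merkle.rs's `Lemma { node_hash, sibling_hash, sub_lemma }` shape with dummy hashes — a two-level proof whose leaf sits at index 1:

fn example_leaf_index() -> usize {
    let leaf = Lemma {
        node_hash: vec![0u8; 32],
        // Sibling on the left, so the leaf itself is on the right: bit 1.
        sibling_hash: Some(Positioned::Left(vec![1u8; 32])),
        sub_lemma: None,
    };
    let root = Lemma {
        node_hash: vec![2u8; 32],
        // Sibling on the right, so this branch is on the left: bit 0.
        sibling_hash: Some(Positioned::Right(vec![3u8; 32])),
        sub_lemma: Some(Box::new(leaf)),
    };
    // lemma_to_path yields [true, false] in LSB order; path_to_index
    // reverses it and reads 0b01 = 1.
    path_to_index(lemma_to_path(&root))
}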

src/commst.rs

@@ -0,0 +1,84 @@
//! Comms task structure. A comms task communicates with a remote node through a
//! socket. Local communication with coordinating threads is made via
//! `spmc::channel()` and `mpsc::channel()`.
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use spmc;
use crossbeam;
use proto::Message;
use task;
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<T: Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
where Vec<u8>: From<T>
{
/// The transmit side of the multiple producer channel from comms threads.
tx: mpsc::Sender<Message<T>>,
/// The receive side of the multiple consumer channel to comms threads.
rx: spmc::Receiver<Message<T>>,
/// The socket IO task.
task: task::Task
}
impl<T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask<T>
where Vec<u8>: From<T>
{
pub fn new(tx: mpsc::Sender<Message<T>>,
rx: spmc::Receiver<Message<T>>,
stream: ::std::net::TcpStream) -> Self {
CommsTask {
tx: tx,
rx: rx,
task: task::Task::new(stream)
}
}
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn run(&mut self) {
// Borrow parts of `self` before entering the thread binding scope.
let tx = Arc::new(&self.tx);
let rx = Arc::new(&self.rx);
let task = Arc::new(Mutex::new(&mut self.task));
crossbeam::scope(|scope| {
// Make a further copy of `task` for the thread stack.
let task1 = task.clone();
// Local comms receive loop thread.
scope.spawn(move || {
loop {
// Receive a message from the manager thread.
let message = rx.recv().unwrap();
// Forward the message to the remote node.
task1.lock().unwrap().send_message(message).unwrap();
}
});
// Remote comms receive loop.
loop {
match task.lock().unwrap().receive_message() {
Ok(message) => // self.on_message_received(message),
tx.send(message).unwrap(),
Err(task::Error::ProtobufError(e)) =>
warn!("Protobuf error {}", e),
Err(e) => {
warn!("Critical error {:?}", e);
break;
}
}
}
});
}
/// Handler of a received message.
fn on_message_received(&mut self, message: Message<T>) {
// Forward the message to the manager thread.
self.tx.send(message).unwrap();
}
}
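The channel topology described in the module comment can be exercised in isolation; a minimal, hypothetical sketch with `u32` standing in for `Message<T>`:

use std::sync::mpsc;
use std::thread;

fn channel_wiring_sketch() {
    // Manager-to-comms direction: single producer, multiple consumers.
    let (mut stx, srx) = spmc::channel::<u32>();
    // Comms-to-manager direction: multiple producers, single consumer.
    let (mtx, mrx) = mpsc::channel::<u32>();
    // Every comms task owns a clone of one end of each channel.
    let (tx, rx) = (mtx.clone(), srx.clone());
    thread::spawn(move || {
        // Echo a single message from the manager back to it.
        let m = rx.recv().unwrap();
        tx.send(m).unwrap();
    });
    stx.send(42).unwrap();
    assert_eq!(mrx.recv().unwrap(), 42);
}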

src/lib.rs

@@ -1,3 +1,39 @@
//! # HoneyBadgerBFT
//!
//! Library of asynchronous Byzantine fault tolerant consensus known as "the
//! honey badger of BFT protocols" after a paper with the same title.
//!
//! ## Example
//!
//! The following code could be run on host 192.168.1.1:
//!
//! ```rust
//! extern crate hbbft;
//!
//! use hbbft::node::Node;
//! use std::net::SocketAddr;
//! use std::vec::Vec;
//!
//! fn main() {
//! let bind_address = "192.168.1.1:10001".parse().unwrap();
//! let remote_addresses = vec!["192.168.1.2:10002".parse().unwrap(),
//! "192.168.1.3:10003".parse().unwrap(),
//! "192.168.1.4:10004".parse().unwrap(),
//! "192.168.1.5:10005".parse().unwrap()];
//! let value = "Proposed value".as_bytes().to_vec();
//!
//! let result = Node::new(bind_address, remote_addresses, Some(value))
//! .run();
//! println!("Consensus result {:?}", result);
//! }
//! ```
//!
//! Similar code shall then run on hosts 192.168.1.2, 192.168.1.3, 192.168.1.4
//! and 192.168.1.5, with appropriate changes in `bind_address` and
//! `remote_addresses`. Each host has its own optional broadcast `value`. If
//! the consensus `result` is not an error then every successfully terminated
//! consensus node will output the same `result`.
#[macro_use]
extern crate error_chain;
#[macro_use]
@@ -5,11 +41,16 @@ extern crate log;
extern crate protobuf;
extern crate ring;
extern crate merkle;
extern crate futures;
//extern crate futures;
extern crate spmc;
extern crate crossbeam;
extern crate reed_solomon_erasure;
mod errors;
mod proto;
mod task;
mod commst;
pub mod node;
pub mod broadcast;
pub mod agreement;

src/node.rs

@@ -0,0 +1,87 @@
//! Networking controls of the consensus node.
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::{Send, Sync};
use std::net::{TcpListener, SocketAddr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use spmc;
use crossbeam;
use broadcast;
use proto::Message;
use commst;
/// This is a structure to start a consensus node.
pub struct Node<T> {
/// Incoming connection socket.
addr: SocketAddr,
/// Sockets of remote nodes. TODO.
remotes: Vec<SocketAddr>,
/// Optionally, a value to be broadcast by this node.
value: Option<T>
}
impl<T: Clone + Debug + Eq + Hash + Send + Sync + From<Vec<u8>> + AsRef<[u8]>>
Node<T>
where Vec<u8>: From<T>
{
/// Consensus node constructor. It only initialises initial parameters.
pub fn new(addr: SocketAddr, remotes: Vec<SocketAddr>, value: Option<T>) ->
Self
{
Node {addr, remotes, value}
}
/// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, ()>
{
// Listen for incoming connections on a given TCP port.
let listener = TcpListener::bind(&self.addr).unwrap();
// Multicast channel from the manager task to comms tasks.
let (stx, srx): (spmc::Sender<Message<T>>,
spmc::Receiver<Message<T>>) = spmc::channel();
// Unicast channel from comms tasks to the manager task.
let (mtx, mrx): (mpsc::Sender<Message<T>>,
mpsc::Receiver<Message<T>>) = mpsc::channel();
let broadcast_value = self.value.to_owned();
// All spawned threads will have exited by the end of the scope.
crossbeam::scope(|scope| {
// Listen for incoming socket connections and start a comms task for
// each new connection.
for stream in listener.incoming() {
match stream {
Ok(stream) => {
info!("New connection from {:?}",
stream.peer_addr().unwrap());
let tx = mtx.clone();
let rx = srx.clone();
scope.spawn(move || {
commst::CommsTask::new(tx, rx, stream).run();
});
// TODO: break when all the remotes have joined
}
Err(e) => {
warn!("Failed to connect: {}", e);
}
}
}
// broadcast stage
let (tx, rx) = (Arc::new(Mutex::new(stx)),
Arc::new(Mutex::new(mrx)));
match broadcast::Stage::new(tx, rx, broadcast_value).run() {
Ok(_) => debug!("Broadcast stage succeeded"),
Err(_) => error!("Broadcast stage failed")
}
// TODO: other stages
}); // end of thread scope
Err(())
}
}

src/proto/mod.rs

@@ -1,65 +1,146 @@
//! Construction of messages from protobuf buffers.
#![feature(optin_builtin_traits)]
pub mod message;
use std::marker::{Send, Sync};
use ring::digest::Algorithm;
use merkle::proof::{Proof, Lemma, Positioned};
//use protobuf::Message;
use self::message::*;
use protobuf::error::ProtobufResult;
use protobuf::Message as ProtobufMessage;
use proto::message::*;
use protobuf::error::{ProtobufResult, ProtobufError, WireError};
use protobuf::core::parse_from_bytes;
/// Kinds of message sent by nodes participating in consensus.
enum Message<T> {
#[derive (Clone, Debug, PartialEq)]
pub enum Message<T: Send + Sync> {
Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage<T>)
Agreement(AgreementMessage)
}
//unsafe impl<T: Send + Sync> Send for Message<T> { }
//impl<T: Send + Sync> !Sync for Message<T> { }
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
enum BroadcastMessage<T> {
#[derive (Clone, Debug, PartialEq)]
pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>),
Echo(Proof<T>),
Ready(Vec<u8>)
}
/// Messages sent during the binary Byzantine agreement stage.
enum AgreementMessage<T> {
#[derive (Clone, Debug, PartialEq)]
pub enum AgreementMessage {
// TODO
Phantom(T)
}
impl<T> Message<T> {
pub fn from_protobuf(algorithm: &'static Algorithm,
mut proto: message::MessageProto) -> Option<Self>
where T: From<Vec<u8>>,
impl<T: Send + Sync> Message<T> {
/// Translation from protobuf to the regular type.
///
/// TODO: add an `Algorithm` field to `MessageProto`. Either `Algorithm` has
/// to be fully serialised and sent as a whole, or it can be passed over
/// using an ID and the `Eq` instance to discriminate the finite set of
/// algorithms in `ring::digest`.
pub fn from_proto(mut proto: message::MessageProto)
-> Option<Self>
where T: From<Vec<u8>>
{
if proto.has_broadcast() {
proto.take_broadcast().into_broadcast(algorithm)
BroadcastMessage::from_proto(proto.take_broadcast(),
// TODO, possibly move Algorithm inside
// BroadcastMessage
&::ring::digest::SHA256)
.map(|b| Message::Broadcast(b))
}
else if proto.has_agreement() {
AgreementMessage::from_proto(proto.take_agreement())
.map(|a| Message::Agreement(a))
}
else {
// TODO
None
}
}
pub fn into_proto(self) -> MessageProto
where T: Into<Vec<u8>>
{
let mut m = MessageProto::new();
match self {
Message::Broadcast(b) => {
m.set_broadcast(b.into_proto());
},
Message::Agreement(a) => {
m.set_agreement(a.into_proto());
}
}
m
}
/// Parse a `Message` from its protobuf binary representation.
///
/// TODO: pass custom errors from down the chain of nested parsers as
/// opposed to returning `WireError::Other`.
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self>
where T: From<Vec<u8>>
{
let r = parse_from_bytes::<MessageProto>(bytes)
.map(|proto| Self::from_proto(proto));
match r {
Ok(Some(m)) => Ok(m),
Ok(None) => Err(ProtobufError::WireError(WireError::Other)),
Err(e) => Err(e)
}
}
/// Produce a protobuf representation of this `Message`.
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>>
where T: Into<Vec<u8>>
{
self.into_proto().write_to_bytes()
}
}
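A round trip through these two functions makes a simple smoke test; a sketch (not part of this commit) assuming `T = Vec<u8>`, which satisfies all of the bounds involved:

fn message_roundtrip_sketch() {
    let msg: Message<Vec<u8>> =
        Message::Broadcast(BroadcastMessage::Ready(vec![0u8; 32]));
    // Serialise to protobuf bytes, then parse the bytes back.
    let bytes = msg.clone().write_to_bytes().unwrap();
    let parsed: Message<Vec<u8>> = Message::parse_from_bytes(&bytes).unwrap();
    assert_eq!(msg, parsed);
}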
impl BroadcastProto {
pub fn into_broadcast<T>(mut self,
algorithm: &'static Algorithm)
-> Option<BroadcastMessage<T>>
where T: From<Vec<u8>>,
impl<T: Send + Sync> BroadcastMessage<T> {
pub fn into_proto(self) -> BroadcastProto
where T: Into<Vec<u8>>
{
if self.has_value() {
self.take_value().take_proof().into_proof(algorithm)
let mut b = BroadcastProto::new();
match self {
BroadcastMessage::Value(p) => {
let mut v = ValueProto::new();
v.set_proof(ProofProto::into_proto(p));
b.set_value(v);
},
BroadcastMessage::Echo(p) => {
let mut e = EchoProto::new();
e.set_proof(ProofProto::into_proto(p));
b.set_echo(e);
},
BroadcastMessage::Ready(h) => {
let mut r = ReadyProto::new();
r.set_root_hash(h);
b.set_ready(r);
}
}
b
}
pub fn from_proto(mut mp: BroadcastProto,
algorithm: &'static Algorithm)
-> Option<Self>
where T: From<Vec<u8>>
{
if mp.has_value() {
mp.take_value().take_proof().from_proto(algorithm)
.map(|p| BroadcastMessage::Value(p))
}
else if self.has_echo() {
self.take_echo().take_proof().into_proof(algorithm)
else if mp.has_echo() {
mp.take_echo().take_proof().from_proto(algorithm)
.map(|p| BroadcastMessage::Echo(p))
}
else if self.has_ready() {
let h = self.take_ready().take_root_hash();
else if mp.has_ready() {
let h = mp.take_ready().take_root_hash();
Some(BroadcastMessage::Ready(h))
}
else {
@@ -68,17 +149,54 @@ impl BroadcastProto {
}
}
impl AgreementMessage {
pub fn into_proto(self) -> AgreementProto
{
unimplemented!();
}
pub fn from_proto(_mp: AgreementProto) -> Option<Self>
{
unimplemented!();
}
}
/// Serialisation of `Proof` defined against its protobuf interface to work
/// around the restriction of not being allowed to extend the implementation of
/// `Proof` outside its crate.
impl ProofProto {
pub fn into_proof<T>(mut self,
pub fn into_proto<T>(proof: Proof<T>) -> Self
where T: Into<Vec<u8>>
{
let mut proto = Self::new();
match proof {
Proof {
algorithm, // TODO: use
root_hash,
lemma,
value,
} => {
proto.set_root_hash(root_hash);
proto.set_lemma(LemmaProto::into_proto(lemma));
proto.set_value(value.into());
}
}
proto
}
pub fn from_proto<T>(mut self,
algorithm: &'static Algorithm)
-> Option<Proof<T>>
where T: From<Vec<u8>>
where T: From<Vec<u8>>
{
if !self.has_lemma() {
return None;
}
self.take_lemma().into_lemma().map(|lemma| {
self.take_lemma().from_proto().map(|lemma| {
Proof::new(
algorithm,
self.take_root_hash(),
@@ -90,7 +208,35 @@ impl ProofProto {
}
impl LemmaProto {
pub fn into_lemma(mut self) -> Option<Lemma> {
pub fn into_proto(lemma: Lemma) -> Self {
let mut proto = Self::new();
match lemma {
Lemma {node_hash, sibling_hash, sub_lemma} => {
proto.set_node_hash(node_hash);
if let Some(sub_proto) = sub_lemma.map(
|l| Self::into_proto(*l))
{
proto.set_sub_lemma(sub_proto);
}
match sibling_hash {
Some(Positioned::Left(hash)) =>
proto.set_left_sibling_hash(hash),
Some(Positioned::Right(hash)) =>
proto.set_right_sibling_hash(hash),
None => {}
}
}
}
proto
}
pub fn from_proto(mut self) -> Option<Lemma> {
let node_hash = self.take_node_hash();
let sibling_hash = if self.has_left_sibling_hash() {
@@ -105,7 +251,7 @@ impl LemmaProto {
// If a `sub_lemma` is present in the Protobuf,
// then we expect it to unserialize to a valid `Lemma`,
// otherwise we return `None`
self.take_sub_lemma().into_lemma().map(|sub_lemma| {
self.take_sub_lemma().from_proto().map(|sub_lemma| {
Lemma {
node_hash: node_hash,
sibling_hash: sibling_hash,

src/task.rs

@@ -6,8 +6,8 @@ use std::{cmp,io};
use std::io::Read;
use std::net::TcpStream;
use protobuf;
use protobuf::Message as ProtoBufMessage;
use proto::message::{MessageProto};
use protobuf::Message as ProtobufMessage;
use proto::*;
/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
@@ -59,30 +59,13 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
Ok(result)
}
/// A trait allowing custom definitions of the main loop and the received
/// message callback.
pub trait MessageLoop {
fn run(&mut self);
fn on_message_received(&mut self,
message: MessageProto)
-> Result<(), Error>;
}
pub struct Task {
stream: TcpStream,
buffer: [u8; 1024],
}
/// Placeholder `MessageLoop` definition for a generic `Task`.
impl MessageLoop for Task {
fn run(&mut self) {}
fn on_message_received(&mut self, _: MessageProto) -> Result<(), Error> {
Ok(())
}
}
/// A message handling task.
impl Task where Self: MessageLoop {
impl Task {
pub fn new(stream: TcpStream) -> Task {
Task {
stream,
@@ -90,7 +73,9 @@ impl Task where Self: MessageLoop {
}
}
pub fn receive_message(&mut self) -> Result<MessageProto, Error> {
pub fn receive_message<T>(&mut self) -> Result<Message<T>, Error>
where T: From<Vec<u8>> + Send + Sync
{
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
if frame_start != FRAME_START {
@@ -99,30 +84,36 @@ impl Task where Self: MessageLoop {
self.stream.read_exact(&mut self.buffer[0..4])?;
let size = decode_u32_from_be(&self.buffer[0..4])? as usize;
let mut message: Vec<u8> = Vec::new();
message.reserve(size);
while message.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size - message.len());
let mut message_v: Vec<u8> = Vec::new();
message_v.reserve(size);
while message_v.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size -
message_v.len());
let (slice, _) = self.buffer.split_at_mut(num_to_read);
self.stream.read_exact(slice)?;
message.extend_from_slice(slice);
message_v.extend_from_slice(slice);
}
let message = protobuf::parse_from_bytes::<MessageProto>(&message)?;
Ok(message)
Message::parse_from_bytes(&message_v)
.map_err(|e| Error::ProtobufError(e))
}
pub fn send_message(&mut self, message: &MessageProto) -> Result<(), Error> {
pub fn send_message<T>(&mut self, message: Message<T>)
-> Result<(), Error>
where T: Into<Vec<u8>> + Send + Sync
{
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
let message_p = message.into_proto();
// Write message size
encode_u32_to_be(message.compute_size(), &mut buffer[0..4])?;
encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message
message.write_to(&mut stream)?;
message_p.write_to(&mut stream)?;
// Flush
stream.flush()?;
Ok(())
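Taken together, `send_message` and `receive_message` agree on this frame layout:

// bytes 0..4  FRAME_START magic, big-endian u32
// bytes 4..8  payload size in bytes, big-endian u32
// bytes 8..   protobuf-encoded MessageProto of exactly that size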