Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Peter van Nostrand 2018-05-06 19:32:15 -04:00
commit a6901b9be1
15 changed files with 381 additions and 1022 deletions

25
.travis.yml Normal file
View File

@ -0,0 +1,25 @@
language: rust
rust: nightly-2018-04-19
cache: cargo
addons:
apt:
packages:
- unzip
before_install:
- curl -OL https://github.com/google/protobuf/releases/download/v3.5.1/protoc-3.5.1-linux-x86_64.zip
- sudo unzip protoc-3.5.1-linux-x86_64.zip -d /usr/local bin/protoc
- sudo chown $(whoami) /usr/local/bin/protoc
- protoc --version
- rm protoc-3.5.1-linux-x86_64.zip
- rustup component add rustfmt-preview
- cargo install clippy -f --vers=0.0.195
env:
global:
- RUST_BACKTRACE=1
- RUSTFLAGS="-D warnings"
script:
- cargo fmt -- --write-mode=diff
- cargo clippy -- -D clippy
- cargo clippy --tests -- -D clippy
- cargo check --tests
- cargo test

View File

@ -1,13 +1,18 @@
//! Example of a consensus node that uses the `hbbft::node::Node` struct for
//! running the distributed consensus state machine.
//#[macro_use]
extern crate crossbeam;
#[macro_use]
extern crate crossbeam_channel;
extern crate docopt;
extern crate hbbft;
#[macro_use]
extern crate log;
extern crate simple_logger;
mod network;
use docopt::Docopt;
use hbbft::node::Node;
use network::node::Node;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::vec::Vec;

View File

@ -8,10 +8,9 @@ use std::io;
use std::net::TcpStream;
use std::sync::Arc;
use messaging::SourcedMessage;
use proto::Message;
use proto_io;
use proto_io::ProtoIo;
use hbbft::messaging::SourcedMessage;
use hbbft::proto::Message;
use hbbft::proto_io::{self, ProtoIo};
#[derive(Debug)]
pub enum Error {
@ -32,7 +31,7 @@ pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + I
/// The receive side of the channel to the comms thread.
rx: &'a Receiver<Message<T>>,
/// The socket IO task.
io: ProtoIo,
io: ProtoIo<TcpStream>,
/// The index of this comms task for identification against its remote node.
pub node_index: usize,
}

View File

@ -0,0 +1,189 @@
//! The local message delivery system.
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use hbbft::messaging::{SourcedMessage, Target, TargetedMessage};
use hbbft::proto::Message;
use std::fmt::Debug;
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<T: Clone + Debug + Send + Sync> {
/// Transmit sides of message channels to comms threads.
txs_to_comms: Vec<Sender<Message<T>>>,
/// Receive side of the routed message channel from comms threads.
rx_from_comms: Receiver<SourcedMessage<T>>,
/// Transmit sides of message channels to algo threads.
txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
/// Receive side of the routed message channel from comms threads.
rx_from_algo: Receiver<TargetedMessage<T>>,
/// RX handles to be used by comms tasks.
rxs_to_comms: Vec<Receiver<Message<T>>>,
/// TX handle to be used by comms tasks.
tx_from_comms: Sender<SourcedMessage<T>>,
/// RX handles to be used by algo tasks.
rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
/// TX handle to be used by algo tasks.
tx_from_algo: Sender<TargetedMessage<T>>,
/// Control channel used to stop the listening thread.
stop_tx: Sender<()>,
stop_rx: Receiver<()>,
}
impl<T: Clone + Debug + Send + Sync> Messaging<T> {
/// Initialises all the required TX and RX handles for the case on a total
/// number `num_nodes` of consensus nodes.
pub fn new(num_nodes: usize) -> Self {
let to_comms: Vec<_> = (0..num_nodes - 1)
.map(|_| unbounded::<Message<T>>())
.collect();
let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_comms: Vec<Receiver<Message<T>>> =
to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_comms, rx_from_comms) = unbounded();
let to_algo: Vec<_> = (0..num_nodes)
.map(|_| unbounded::<SourcedMessage<T>>())
.collect();
let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_algo, rx_from_algo) = unbounded();
let (stop_tx, stop_rx) = bounded(1);
Messaging {
// internally used handles
txs_to_comms,
rx_from_comms,
txs_to_algo,
rx_from_algo,
// externally used handles
rxs_to_comms,
tx_from_comms,
rxs_to_algo,
tx_from_algo,
stop_tx,
stop_rx,
}
}
pub fn rxs_to_comms(&self) -> &Vec<Receiver<Message<T>>> {
&self.rxs_to_comms
}
pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<T>> {
&self.tx_from_comms
}
pub fn rxs_to_algo(&self) -> &Vec<Receiver<SourcedMessage<T>>> {
&self.rxs_to_algo
}
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
&self.tx_from_algo
}
/// Gives the ownership of the handle to stop the message receive loop.
pub fn stop_tx(&self) -> Sender<()> {
self.stop_tx.to_owned()
}
/// Spawns the message delivery thread in a given thread scope.
pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
where
T: 'a,
{
let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned();
let txs_to_algo = self.txs_to_algo.to_owned();
let rx_from_algo = self.rx_from_algo.to_owned();
let stop_rx = self.stop_rx.to_owned();
let mut stop = false;
// TODO: `select_loop!` seems to really confuse Clippy.
#[cfg_attr(
feature = "cargo-clippy",
allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
)]
scope.spawn(move || {
let mut result = Ok(());
// This loop forwards messages according to their metadata.
while !stop && result.is_ok() {
select_loop! {
recv(rx_from_algo, message) => {
match message {
TargetedMessage {
target: Target::All,
message
} => {
// Send the message to all remote nodes, stopping at
// the first error.
result = txs_to_comms.iter()
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
result
}
}).map_err(Error::from);
},
TargetedMessage {
target: Target::Node(i),
message
} => {
// Remote node indices start from 1.
assert!(i > 0);
// Convert node index to vector index.
let i = i - 1;
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(message.clone())
.map_err(Error::from)
}
else {
Err(Error::NoSuchTarget)
};
}
}
},
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
result
}
}).map_err(Error::from)
},
recv(stop_rx, _) => {
// Flag the thread ready to exit.
stop = true;
}
}
} // end of select_loop!
result
})
}
}
#[derive(Clone, Debug)]
pub enum Error {
NoSuchTarget,
SendError,
}
impl<T> From<crossbeam_channel::SendError<T>> for Error {
fn from(_: crossbeam_channel::SendError<T>) -> Error {
Error::SendError
}
}

4
examples/network/mod.rs Normal file
View File

@ -0,0 +1,4 @@
pub mod commst;
pub mod connection;
pub mod messaging;
pub mod node;

View File

@ -1,16 +1,50 @@
//! Networking controls of the consensus node.
//!
//! ## Example
//!
//! The following code could be run on host 192.168.1.1:
//!
//! ```ignore
//! extern crate hbbft;
//!
//! use hbbft::node::Node;
//! use std::net::SocketAddr;
//! use std::vec::Vec;
//!
//! fn main() {
//! let bind_address = "127.0.0.1:10001".parse().unwrap();
//! let remote_addresses = vec!["192.168.1.2:10002",
//! "192.168.1.3:10003",
//! "192.168.1.4:10004"]
//! .iter()
//! .map(|s| s.parse().unwrap())
//! .collect();
//!
//! let value = "Value #1".as_bytes().to_vec();
//!
//! let result = Node::new(bind_address, remote_addresses, Some(value))
//! .run();
//! println!("Consensus result {:?}", result);
//! }
//! ```
//!
//! Similar code shall then run on hosts 192.168.1.2, 192.168.1.3 and
//! 192.168.1.4 with appropriate changes in `bind_address` and
//! `remote_addresses`. Each host has it's own optional broadcast `value`. If
//! the consensus `result` is not an error then every successfully terminated
//! consensus node will be the same `result`.
use crossbeam;
use merkle::Hashable;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use broadcast;
use commst;
use connection;
use messaging::Messaging;
use hbbft::broadcast;
use network::commst;
use network::connection;
use network::messaging::Messaging;
#[derive(Debug)]
pub enum Error {
@ -41,7 +75,7 @@ pub struct Node<T> {
value: Option<T>,
}
impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
Node<T>
{
/// Consensus node constructor. It only initialises initial parameters.

View File

@ -3,16 +3,17 @@
use proto::AgreementMessage;
use std::collections::{BTreeSet, VecDeque};
#[derive(Default)]
pub struct Agreement {
input: Option<bool>,
bin_values: BTreeSet<bool>,
_bin_values: BTreeSet<bool>,
}
impl Agreement {
pub fn new() -> Self {
Agreement {
input: None,
bin_values: BTreeSet::new(),
_bin_values: BTreeSet::new(),
}
}
@ -29,7 +30,7 @@ impl Agreement {
/// Receive input from a remote node.
pub fn on_input(
&self,
_message: AgreementMessage,
_message: &AgreementMessage,
) -> Result<VecDeque<AgreementMessage>, Error> {
Err(Error::NotImplemented)
}

View File

@ -1,5 +1,4 @@
//! Reliable broadcast algorithm instance.
use crossbeam;
use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use merkle::proof::{Lemma, Positioned, Proof};
use merkle::{Hashable, MerkleTree};
@ -11,11 +10,12 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::iter;
use std::marker::{Send, Sync};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::sync::{RwLock, RwLockWriteGuard};
use messaging;
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, ProposedValue,
QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target, TargetedMessage};
use messaging::{SourcedMessage, Target, TargetedMessage};
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
type MessageQueue<NodeUid> = VecDeque<TargetedBroadcastMessage<NodeUid>>;
@ -26,12 +26,12 @@ pub struct TargetedBroadcastMessage<NodeUid> {
pub message: BroadcastMessage<ProposedValue>,
}
impl TargetedBroadcastMessage<messaging::NodeUid> {
pub fn into_remote_message(self) -> RemoteMessage {
RemoteMessage {
node: match self.target {
BroadcastTarget::All => RemoteNode::All,
BroadcastTarget::Node(node) => RemoteNode::Node(node),
impl TargetedBroadcastMessage<usize> {
pub fn into_targeted_message(self) -> TargetedMessage<ProposedValue> {
TargetedMessage {
target: match self.target {
BroadcastTarget::All => Target::All,
BroadcastTarget::Node(node) => Target::Node(node),
},
message: Message::Broadcast(self.message),
}
@ -74,95 +74,6 @@ pub struct Broadcast<NodeUid: Eq + Hash> {
state: RwLock<BroadcastState>,
}
impl Broadcast<messaging::NodeUid> {
/// The message-driven interface function for calls from the main message
/// loop.
pub fn on_message(
&self,
m: QMessage,
tx: &Sender<QMessage>,
) -> Result<MessageLoopState, Error> {
match m {
QMessage::Local(LocalMessage { message, .. }) => match message {
AlgoMessage::BroadcastInput(value) => self.on_local_message(&mut value.to_owned()),
_ => Err(Error::UnexpectedMessage),
},
QMessage::Remote(RemoteMessage {
node: RemoteNode::Node(uid),
message,
}) => {
if let Message::Broadcast(b) = message {
self.on_remote_message(uid, b, tx)
} else {
Err(Error::UnexpectedMessage)
}
}
_ => Err(Error::UnexpectedMessage),
}
}
fn on_remote_message(
&self,
uid: messaging::NodeUid,
message: BroadcastMessage<ProposedValue>,
tx: &Sender<QMessage>,
) -> Result<MessageLoopState, Error> {
let (output, messages) = self.handle_broadcast_message(&uid, message)?;
if let Some(value) = output {
tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::CommonSubset,
message: AlgoMessage::BroadcastOutput(self.our_id, value),
})).map_err(Error::from)?;
}
Ok(MessageLoopState::Processing(
messages
.into_iter()
.map(TargetedBroadcastMessage::into_remote_message)
.collect(),
))
}
/// Processes the proposed value input by broadcasting it.
pub fn on_local_message(&self, value: &mut ProposedValue) -> Result<MessageLoopState, Error> {
let mut state = self.state.write().unwrap();
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
self.send_shards(value.clone())
.map(|(proof, remote_messages)| {
// Record the first proof as if it were sent by the node to
// itself.
let h = proof.root_hash.clone();
if proof.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree later.
state.leaf_values[index_of_proof(&proof)] =
Some(proof.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
state.root_hash = Some(h);
}
MessageLoopState::Processing(
remote_messages
.into_iter()
.map(TargetedBroadcastMessage::into_remote_message)
.collect(),
)
})
}
}
impl<'a, E> Handler<E> for Broadcast<messaging::NodeUid>
where
E: From<Error> + From<messaging::Error>,
{
fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, E> {
self.on_message(m, &tx).map_err(E::from)
}
}
impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
/// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
/// from node `proposer_id`.
@ -487,14 +398,10 @@ pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> {
tx: &'a Sender<TargetedMessage<ProposedValue>>,
/// The receive side of the channel from comms threads.
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
/// The broadcast algorithm instance.
broadcast: Broadcast<usize>,
/// Value to be broadcast.
broadcast_value: Option<T>,
/// This instance's index for identification against its comms task.
node_index: usize,
/// Number of nodes participating in broadcast.
num_nodes: usize,
/// Maximum allowed number of faulty nodes.
num_faulty_nodes: usize,
}
impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>>
@ -505,45 +412,25 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<T>,
num_nodes: usize,
node_index: usize,
proposer_index: usize,
) -> Self {
let all_indexes = (0..num_nodes).collect();
let broadcast = Broadcast::new(0, proposer_index, all_indexes)
.expect("failed to instantiate broadcast");
Instance {
tx,
rx,
broadcast,
broadcast_value,
node_index,
num_nodes,
num_faulty_nodes: (num_nodes - 1) / 3,
}
}
/// Broadcast stage task returning the computed values in case of success,
/// and an error in case of failure.
pub fn run(&mut self) -> Result<T, Error> {
pub fn run(self) -> Result<T, Error> {
// Broadcast state machine thread.
let bvalue = self.broadcast_value.to_owned();
let result: Result<T, Error>;
let result_r = Arc::new(Mutex::new(None));
let result_r_scoped = result_r.clone();
crossbeam::scope(|scope| {
scope.spawn(move || {
*result_r_scoped.lock().unwrap() = Some(inner_run(
self.tx,
self.rx,
bvalue,
self.node_index,
self.num_nodes,
self.num_faulty_nodes,
));
});
});
if let Some(ref r) = *result_r.lock().unwrap() {
result = r.to_owned();
} else {
result = Err(Error::Threading);
}
result
let bvalue: Option<ProposedValue> = self.broadcast_value.map(|v| v.into());
inner_run(self.tx, self.rx, bvalue, &self.broadcast).map(ProposedValue::into)
}
}
@ -554,7 +441,6 @@ pub enum Error {
Threading,
ProofConstructionFailed,
ReedSolomon(rse::Error),
Send(SendError<QMessage>),
SendDeprecated(SendError<TargetedMessage<ProposedValue>>),
Recv(RecvError),
UnexpectedMessage,
@ -567,12 +453,6 @@ impl From<rse::Error> for Error {
}
}
impl From<SendError<QMessage>> for Error {
fn from(err: SendError<QMessage>) -> Error {
Error::Send(err)
}
}
impl From<SendError<TargetedMessage<ProposedValue>>> for Error {
fn from(err: SendError<TargetedMessage<ProposedValue>>) -> Error {
Error::SendDeprecated(err)
@ -585,138 +465,25 @@ impl From<RecvError> for Error {
}
}
/// Breaks the input value into shards of equal length and encodes them -- and
/// some extra parity shards -- with a Reed-Solomon erasure coding scheme. The
/// returned value contains the shard assigned to this node. That shard doesn't
/// need to be sent anywhere. It is returned to the broadcast instance and gets
/// recorded immediately.
fn send_shards<'a, T>(
value: T,
/// The main loop of the broadcast task.
fn inner_run<'a>(
tx: &'a Sender<TargetedMessage<ProposedValue>>,
coding: &ReedSolomon,
) -> Result<Proof<ProposedValue>, Error>
where
T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>,
{
let data_shard_num = coding.data_shard_count();
let parity_shard_num = coding.parity_shard_count();
debug!(
"Data shards: {}, parity shards: {}",
data_shard_num, parity_shard_num
);
let mut v: Vec<u8> = T::into(value);
// Insert the length of `v` so it can be decoded without the padding.
let payload_len = v.len() as u8;
v.insert(0, payload_len); // TODO: Handle messages larger than 255 bytes.
let value_len = v.len();
// Size of a Merkle tree leaf value, in bytes.
let shard_len = if value_len % data_shard_num > 0 {
value_len / data_shard_num + 1
} else {
value_len / data_shard_num
};
// Pad the last data shard with zeros. Fill the parity shards with zeros.
v.resize(shard_len * (data_shard_num + parity_shard_num), 0);
debug!("value_len {}, shard_len {}", value_len, shard_len);
// Divide the vector into chunks/shards.
let shards_iter = v.chunks_mut(shard_len);
// Convert the iterator over slices into a vector of slices.
let mut shards: Vec<&mut [u8]> = Vec::new();
for s in shards_iter {
shards.push(s);
}
debug!("Shards before encoding: {:?}", shards);
// Construct the parity chunks/shards
coding.encode(shards.as_mut_slice())?;
debug!("Shards: {:?}", shards);
let shards_t: Vec<ProposedValue> = shards.into_iter().map(|s| s.to_vec()).collect();
// Convert the Merkle tree into a partial binary tree for later
// deconstruction into compound branches.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t);
// Default result in case of `gen_proof` error.
let mut result = Err(Error::ProofConstructionFailed);
// Send each proof to a node.
for (i, leaf_value) in mtree.iter().enumerate() {
let proof = mtree.gen_proof(leaf_value.to_vec());
if let Some(proof) = proof {
if i == 0 {
// The first proof is addressed to this node.
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
tx.send(TargetedMessage {
target: Target::Node(i),
message: Message::Broadcast(BroadcastMessage::Value(proof)),
})?;
}
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<ProposedValue>,
broadcast: &Broadcast<usize>,
) -> Result<ProposedValue, Error> {
if let Some(v) = broadcast_value {
for msg in broadcast
.propose_value(v)?
.into_iter()
.map(TargetedBroadcastMessage::into_targeted_message)
{
tx.send(msg)?;
}
}
result
}
/// The main loop of the broadcast task.
fn inner_run<'a, T>(
tx: &'a Sender<TargetedMessage<ProposedValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<T>,
node_index: usize,
num_nodes: usize,
num_faulty_nodes: usize,
) -> Result<T, Error>
where
T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>,
{
// Erasure coding scheme: N - 2f value shards and 2f parity shards
let parity_shard_num = 2 * num_faulty_nodes;
let data_shard_num = num_nodes - parity_shard_num;
let coding = ReedSolomon::new(data_shard_num, parity_shard_num)?;
// currently known leaf values
let mut leaf_values: Vec<Option<Box<[u8]>>> = vec![None; num_nodes];
// Write-once root hash of a tree broadcast from the sender associated with
// this instance.
let mut root_hash: Option<Vec<u8>> = None;
// number of non-None leaf values
let mut leaf_values_num = 0;
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs from
// this tree and send them, each to its own node.
if let Some(v) = broadcast_value {
send_shards(v, tx, &coding).map(|proof| {
// Record the first proof as if it were sent by the node to
// itself.
let h = proof.root_hash.clone();
if proof.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree later.
leaf_values[index_of_proof(&proof)] = Some(proof.value.clone().into_boxed_slice());
leaf_values_num += 1;
root_hash = Some(h);
}
})?
}
// return value
let mut result: Option<Result<T, Error>> = None;
// Number of times Echo was received with the same root hash.
let mut echo_num = 0;
// Number of times Ready was received with the same root hash.
let mut readys: HashMap<Vec<u8>, usize> = HashMap::new();
let mut ready_sent = false;
let mut ready_to_decode = false;
// TODO: handle exit conditions
while result.is_none() {
loop {
// Receive a message from the socket IO task.
let message = rx.recv()?;
if let SourcedMessage {
@ -724,132 +491,19 @@ where
message: Message::Broadcast(message),
} = message
{
match message {
// A value received. Record the value and multicast an echo.
BroadcastMessage::Value(p) => {
if i != node_index {
// Ignore value messages from unrelated remote nodes.
continue;
}
if root_hash.is_none() {
root_hash = Some(p.root_hash.clone());
debug!(
"Node {} Value root hash {:?}",
node_index,
HexBytes(&p.root_hash)
);
}
if let Some(ref h) = root_hash {
if p.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree
// later.
leaf_values[index_of_proof(&p)] =
Some(p.value.clone().into_boxed_slice());
leaf_values_num += 1;
}
}
// Broadcast an echo of this proof.
tx.send(TargetedMessage {
target: Target::All,
message: Message::Broadcast(BroadcastMessage::Echo(p)),
})?
}
// An echo received. Verify the proof it contains.
BroadcastMessage::Echo(p) => {
if root_hash.is_none() && i == node_index {
root_hash = Some(p.root_hash.clone());
debug!("Node {} Echo root hash {:?}", node_index, root_hash);
}
// call validate with the root hash as argument
if let Some(ref h) = root_hash {
if p.validate(h.as_slice()) {
echo_num += 1;
// Save the leaf value for reconstructing the tree
// later.
leaf_values[index_of_proof(&p)] =
Some(p.value.clone().into_boxed_slice());
leaf_values_num += 1;
// upon receiving 2f + 1 matching READY(h)
// messages, wait for N 2 f ECHO messages, then
// decode v
if ready_to_decode
&& leaf_values_num >= num_nodes - 2 * num_faulty_nodes
{
result = Some(decode_from_shards(
&mut leaf_values,
&coding,
data_shard_num,
h,
));
} else if leaf_values_num >= num_nodes - num_faulty_nodes {
result = Some(decode_from_shards(
&mut leaf_values,
&coding,
data_shard_num,
h,
));
// if Ready has not yet been sent, multicast
// Ready
if !ready_sent {
ready_sent = true;
tx.send(TargetedMessage {
target: Target::All,
message: Message::Broadcast(BroadcastMessage::Ready(
h.to_owned(),
)),
})?;
}
}
}
}
}
BroadcastMessage::Ready(ref hash) => {
// Update the number Ready has been received with this hash.
*readys.entry(hash.to_vec()).or_insert(1) += 1;
// Check that the root hash matches.
if let Some(ref h) = root_hash {
let ready_num: usize = *readys.get(h).unwrap_or(&0);
// Upon receiving f + 1 matching Ready(h) messages, if
// Ready has not yet been sent, multicast Ready(h).
if (ready_num == num_faulty_nodes + 1) && !ready_sent {
tx.send(TargetedMessage {
target: Target::All,
message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())),
})?;
}
// Upon receiving 2f + 1 matching Ready(h) messages,
// wait for N 2f Echo messages, then decode v.
if ready_num > 2 * num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v.
if echo_num >= num_nodes - 2 * num_faulty_nodes {
result = Some(decode_from_shards(
&mut leaf_values,
&coding,
data_shard_num,
h,
));
} else {
ready_to_decode = true;
}
}
}
}
let (opt_output, msgs) = broadcast.handle_broadcast_message(&i, message)?;
for msg in msgs.into_iter()
.map(TargetedBroadcastMessage::into_targeted_message)
{
tx.send(msg)?;
}
if let Some(output) = opt_output {
return Ok(output);
}
} else {
error!("Incorrect message from the socket: {:?}", message);
}
}
// result is not a None, safe to extract value
result.unwrap()
}
fn decode_from_shards<T>(

View File

@ -1,5 +1,8 @@
//! Asynchronous Common Subset algorithm.
// TODO: This module is work in progress. Remove this attribute when it's not needed anymore.
#![allow(unused)]
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{Debug, Display};
use std::hash::Hash;
@ -10,10 +13,11 @@ use agreement::Agreement;
use broadcast;
use broadcast::{Broadcast, TargetedBroadcastMessage};
use messaging::ProposedValue;
use proto::{AgreementMessage, BroadcastMessage};
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
/// Input from a remote node to Common Subset.
pub enum Input<NodeUid> {
/// Message from a remote node `uid` to the broadcast instance `uid`.
@ -70,7 +74,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
num_nodes,
num_faulty_nodes,
agreement_true_outputs: HashSet::new(),
broadcast_instances: broadcast_instances,
broadcast_instances,
agreement_instances: HashMap::new(),
broadcast_results: HashMap::new(),
agreement_results: HashMap::new(),
@ -97,8 +101,11 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
pub fn on_broadcast_result(&mut self, uid: NodeUid) -> Result<Option<AgreementMessage>, Error> {
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
pub fn on_broadcast_result(
&mut self,
uid: &NodeUid,
) -> Result<Option<AgreementMessage>, Error> {
if let Some(agreement_instance) = self.agreement_instances.get_mut(uid) {
if !agreement_instance.has_input() {
Ok(Some(agreement_instance.set_input(true)))
} else {
@ -131,7 +138,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
};
if instance_result.is_some() {
self.on_broadcast_result(uid)?;
self.on_broadcast_result(&uid)?;
}
input_result
}

View File

@ -2,59 +2,20 @@
//!
//! Library of asynchronous Byzantine fault tolerant consensus known as "the
//! honey badger of BFT protocols" after a paper with the same title.
//!
//! ## Example
//!
//! The following code could be run on host 192.168.1.1:
//!
//! ```ignore
//! extern crate hbbft;
//!
//! use hbbft::node::Node;
//! use std::net::SocketAddr;
//! use std::vec::Vec;
//!
//! fn main() {
//! let bind_address = "127.0.0.1:10001".parse().unwrap();
//! let remote_addresses = vec!["192.168.1.2:10002",
//! "192.168.1.3:10003",
//! "192.168.1.4:10004"]
//! .iter()
//! .map(|s| s.parse().unwrap())
//! .collect();
//!
//! let value = "Value #1".as_bytes().to_vec();
//!
//! let result = Node::new(bind_address, remote_addresses, Some(value))
//! .run();
//! println!("Consensus result {:?}", result);
//! }
//! ```
//!
//! Similar code shall then run on hosts 192.168.1.2, 192.168.1.3 and
//! 192.168.1.4 with appropriate changes in `bind_address` and
//! `remote_addresses`. Each host has it's own optional broadcast `value`. If
//! the consensus `result` is not an error then every successfully terminated
//! consensus node will be the same `result`.
#![feature(optin_builtin_traits)]
#[macro_use]
extern crate log;
extern crate crossbeam;
extern crate crossbeam_channel;
extern crate merkle;
extern crate protobuf;
extern crate ring;
#[macro_use]
extern crate crossbeam_channel;
extern crate reed_solomon_erasure;
extern crate ring;
pub mod agreement;
pub mod broadcast;
pub mod common_subset;
mod commst;
mod connection;
pub mod messaging;
pub mod proto;
mod proto_io;
pub mod node;
pub mod proto_io;

View File

@ -1,297 +1,16 @@
//! The local message delivery system.
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use proto::Message;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::RwLock;
/// Unique ID of a node.
pub type NodeUid = SocketAddr;
/// Type of algorithm primitive used in HoneyBadgerBFT.
///
/// TODO: Add the epoch parameter?
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Algorithm {
/// Encryption stage.
Encryption,
/// Decryption stage.
Decryption,
/// Asynchronous Common Subset.
CommonSubset,
/// Reliable Broadcast instance.
Broadcast(NodeUid),
/// Binary Agreement instance.
Agreement(NodeUid),
}
impl Iterator for Algorithm {
type Item = String;
fn next(&mut self) -> Option<Self::Item> {
Some(format!("{:?}", self))
}
}
/// Type of proposed (encrypted) value for consensus.
pub type ProposedValue = Vec<u8>;
/// Kinds of messages sent between algorithm instances.
#[derive(Clone)]
pub enum AlgoMessage {
/// Asynchronous common subset input.
CommonSubsetInput(ProposedValue),
/// Asynchronous common subset output.
CommonSubsetOutput(HashSet<ProposedValue>),
/// Broadcast instance input.
BroadcastInput(ProposedValue),
/// Broadcast instance output.
BroadcastOutput(NodeUid, ProposedValue),
/// Binary agreement instance input.
AgreementInput(bool),
/// Binary agreement instance output.
AgreementOutput(NodeUid, bool),
}
/// A message sent between algorithm instances.
#[derive(Clone)]
pub struct LocalMessage {
/// Identifier of the message destination algorithm.
pub dst: Algorithm,
/// Payload
pub message: AlgoMessage,
}
/// The message destinations corresponding to a remote node `i`. It can be
/// either of the two:
///
/// 1) `All`: all nodes if sent to socket tasks, or all local algorithm
/// instances if received from socket tasks.
///
/// 2) `Node(i)`: node `i` or local algorithm instances with the node ID `i`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteNode {
All,
Node(NodeUid),
}
/// Message to or from a remote node.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteMessage {
pub node: RemoteNode,
pub message: Message<ProposedValue>,
}
/// The union type of local and remote messages.
#[derive(Clone)]
pub enum QMessage {
Local(LocalMessage),
Remote(RemoteMessage),
}
/// States of the message loop consided as an automaton with output. There is
/// one exit state `Finished` and one transitional (also initial) state
/// `Processing` whose argument is an output queue of messages to be sent to
/// remote nodes.
#[derive(Clone, PartialEq)]
pub enum MessageLoopState {
Processing(VecDeque<RemoteMessage>),
Finished,
}
impl MessageLoopState {
pub fn is_processing(&self) -> bool {
if let MessageLoopState::Processing(_) = self {
true
} else {
false
}
}
/// Appends pending messages of another state. Used to append messages
/// emitted by the handler to the messages already queued from previous
/// iterations of a message handling loop.
pub fn append(&mut self, other: &mut MessageLoopState) {
if let MessageLoopState::Processing(ref mut new_msgs) = other {
if let MessageLoopState::Processing(ref mut msgs) = self {
msgs.append(new_msgs);
}
}
}
}
/// Abstract type of message handler callback. A callback function has two
/// arguments: the sent message and the TX handle to send replies back to the
/// message loop. A call to the function returns either a new message loop state
/// - either `Finished` or a state with outgoing messages to remote nodes - or
/// an error.
pub trait Handler<HandlerError: From<Error>>: Send + Sync {
fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, HandlerError>;
}
/// The queue functionality for messages sent between algorithm instances.
pub struct MessageLoop<'a, HandlerError: 'a + From<Error>> {
/// Algorithm message handlers. Every message handler receives a message and
/// the TX handle of the incoming message queue for sending replies back to
/// the message loop.
algos: RwLock<HashMap<Algorithm, &'a Handler<HandlerError>>>,
/// TX handle of the message queue.
queue_tx: Sender<QMessage>,
/// RX handle of the message queue.
queue_rx: Receiver<QMessage>,
/// Remote send handles. Messages are sent through channels as opposed to
/// directly to sockets. This is done to make tests independent of socket
/// IO.
remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>,
}
impl<'a, HandlerError> MessageLoop<'a, HandlerError>
where
HandlerError: 'a + From<Error>,
{
pub fn new(remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>) -> Self {
let (queue_tx, queue_rx) = unbounded();
MessageLoop {
algos: RwLock::new(HashMap::new()),
queue_tx,
queue_rx,
remote_txs,
}
}
pub fn queue_tx(&self) -> Sender<QMessage> {
self.queue_tx.clone()
}
/// Registers a handler for messages sent to the given algorithm.
pub fn insert_algo(&'a self, algo: Algorithm, handler: &'a Handler<HandlerError>) {
let lock = self.algos.write();
if let Ok(mut map) = lock {
map.insert(algo, handler);
} else {
error!("Cannot insert {:?}", algo);
}
}
/// Unregisters the handler for messages sent to the given algorithm.
pub fn remove_algo(&self, algo: &Algorithm) {
let lock = self.algos.write();
if let Ok(mut map) = lock {
map.remove(algo);
} else {
error!("Cannot remove {:?}", algo);
}
}
/// The message loop.
pub fn run(&self) -> Result<MessageLoopState, HandlerError> {
let mut result = Ok(MessageLoopState::Processing(VecDeque::new()));
while let Ok(mut state) = result {
// Send any outgoing messages to remote nodes using the provided
// function.
(if let MessageLoopState::Processing(messages) = &state {
self.send_remote(messages)
.map(|_| MessageLoopState::Processing(VecDeque::new()))
.map_err(HandlerError::from)
} else {
Ok(MessageLoopState::Finished)
})?;
// Receive local and remote messages.
if let Ok(m) = self.queue_rx.recv() {
result = match m {
QMessage::Local(LocalMessage { dst, message }) => {
// FIXME: error handling
if let Some(mut handler) = self.algos.write().unwrap().get_mut(&dst) {
let mut new_result = handler.handle(
QMessage::Local(LocalMessage { dst, message }),
self.queue_tx.clone(),
);
if let Ok(ref mut new_state) = new_result {
state.append(new_state);
Ok(state)
} else {
// Error overrides the previous state.
new_result
}
} else {
Err(Error::NoSuchAlgorithm).map_err(HandlerError::from)
}
}
// A message FROM a remote node.
QMessage::Remote(RemoteMessage { node, message }) => {
// Multicast the message to all algorithm instances,
// collecting output messages iteratively and appending them
// to result.
//
// FIXME: error handling
self.algos.write().unwrap().iter_mut().fold(
Ok(state),
|result1, (_, handler)| {
if let Ok(mut state1) = result1 {
handler
.handle(
QMessage::Remote(RemoteMessage {
node: node.clone(),
message: message.clone(),
}),
self.queue_tx.clone(),
)
.map(|ref mut state2| {
state1.append(state2);
state1
})
} else {
result1
}
},
)
}
}
} else {
result = Err(Error::RecvError).map_err(HandlerError::from)
}
} // end of while loop
result
}
/// Send a message queue to remote nodes.
fn send_remote(&self, messages: &VecDeque<RemoteMessage>) -> Result<(), Error> {
messages.iter().fold(Ok(()), |result, m| {
if result.is_err() {
result
} else {
match m {
RemoteMessage {
node: RemoteNode::Node(uid),
message,
} => {
if let Some(tx) = self.remote_txs.get(&uid) {
tx.send(message.clone()).map_err(Error::from)
} else {
Err(Error::SendError)
}
}
RemoteMessage {
node: RemoteNode::All,
message,
} => self.remote_txs.iter().fold(result, |result1, (_, tx)| {
if result1.is_err() {
result1
} else {
tx.send(message.clone()).map_err(Error::from)
}
}),
}
}
})
}
/// Message sent by a given source. The sources are consensus nodes indexed 1
/// through N where N is the total number of nodes. Sourced messages are
/// required when it is essential to know the message origin but the set of
/// recepients is unknown without further computation which is irrelevant to the
/// message delivery task.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
pub source: usize,
pub message: Message<T>,
}
/// Message destination can be either of the two:
@ -325,207 +44,3 @@ impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> {
}
}
}
/// Message sent by a given source. The sources are consensus nodes indexed 1
/// through N where N is the total number of nodes. Sourced messages are
/// required when it is essential to know the message origin but the set of
/// recepients is unknown without further computation which is irrelevant to the
/// message delivery task.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
pub source: usize,
pub message: Message<T>,
}
/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<T: Clone + Debug + Send + Sync> {
/// The total number of consensus nodes for indexing purposes.
num_nodes: usize,
/// Transmit sides of message channels to comms threads.
txs_to_comms: Vec<Sender<Message<T>>>,
/// Receive side of the routed message channel from comms threads.
rx_from_comms: Receiver<SourcedMessage<T>>,
/// Transmit sides of message channels to algo threads.
txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
/// Receive side of the routed message channel from comms threads.
rx_from_algo: Receiver<TargetedMessage<T>>,
/// RX handles to be used by comms tasks.
rxs_to_comms: Vec<Receiver<Message<T>>>,
/// TX handle to be used by comms tasks.
tx_from_comms: Sender<SourcedMessage<T>>,
/// RX handles to be used by algo tasks.
rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
/// TX handle to be used by algo tasks.
tx_from_algo: Sender<TargetedMessage<T>>,
/// Control channel used to stop the listening thread.
stop_tx: Sender<()>,
stop_rx: Receiver<()>,
}
impl<T: Clone + Debug + Send + Sync> Messaging<T> {
/// Initialises all the required TX and RX handles for the case on a total
/// number `num_nodes` of consensus nodes.
pub fn new(num_nodes: usize) -> Self {
let to_comms: Vec<_> = (0..num_nodes - 1)
.map(|_| unbounded::<Message<T>>())
.collect();
let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_comms: Vec<Receiver<Message<T>>> =
to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_comms, rx_from_comms) = unbounded();
let to_algo: Vec<_> = (0..num_nodes)
.map(|_| unbounded::<SourcedMessage<T>>())
.collect();
let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_algo, rx_from_algo) = unbounded();
let (stop_tx, stop_rx) = bounded(1);
Messaging {
num_nodes,
// internally used handles
txs_to_comms,
rx_from_comms,
txs_to_algo,
rx_from_algo,
// externally used handles
rxs_to_comms,
tx_from_comms,
rxs_to_algo,
tx_from_algo,
stop_tx,
stop_rx,
}
}
pub fn num_nodes(&self) -> usize {
self.num_nodes
}
pub fn rxs_to_comms(&self) -> &Vec<Receiver<Message<T>>> {
&self.rxs_to_comms
}
pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<T>> {
&self.tx_from_comms
}
pub fn rxs_to_algo(&self) -> &Vec<Receiver<SourcedMessage<T>>> {
&self.rxs_to_algo
}
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
&self.tx_from_algo
}
/// Gives the ownership of the handle to stop the message receive loop.
pub fn stop_tx(&self) -> Sender<()> {
self.stop_tx.to_owned()
}
/// Spawns the message delivery thread in a given thread scope.
pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
where
T: 'a,
{
let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned();
let txs_to_algo = self.txs_to_algo.to_owned();
let rx_from_algo = self.rx_from_algo.to_owned();
let stop_rx = self.stop_rx.to_owned();
let mut stop = false;
// TODO: `select_loop!` seems to really confuse Clippy.
#[cfg_attr(
feature = "cargo-clippy",
allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
)]
scope.spawn(move || {
let mut result = Ok(());
// This loop forwards messages according to their metadata.
while !stop && result.is_ok() {
select_loop! {
recv(rx_from_algo, message) => {
match message {
TargetedMessage {
target: Target::All,
message
} => {
// Send the message to all remote nodes, stopping at
// the first error.
result = txs_to_comms.iter()
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
result
}
}).map_err(Error::from);
},
TargetedMessage {
target: Target::Node(i),
message
} => {
// Remote node indices start from 1.
assert!(i > 0);
// Convert node index to vector index.
let i = i - 1;
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(message.clone())
.map_err(Error::from)
}
else {
Err(Error::NoSuchTarget)
};
}
}
},
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
result
}
}).map_err(Error::from)
},
recv(stop_rx, _) => {
// Flag the thread ready to exit.
stop = true;
}
}
} // end of select_loop!
result
})
}
}
#[derive(Clone, Debug)]
pub enum Error {
NoSuchAlgorithm,
NoSuchRemote,
RecvError,
NoSuchTarget,
SendError,
}
impl<T> From<crossbeam_channel::SendError<T>> for Error {
fn from(_: crossbeam_channel::SendError<T>) -> Error {
Error::SendError
}
}

View File

@ -26,6 +26,7 @@ pub enum BroadcastMessage<T: Send + Sync> {
Ready(Vec<u8>),
}
/// Wrapper for a byte array, whose `Debug` implementation outputs shortened hexadecimal strings.
pub struct HexBytes<'a>(pub &'a [u8]);
impl<'a> fmt::Debug for HexBytes<'a> {
@ -73,7 +74,7 @@ impl<T: Send + Sync + fmt::Debug> fmt::Debug for BroadcastMessage<T> {
}
/// Messages sent during the binary Byzantine agreement stage.
#[derive(Clone, Debug, PartialEq)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum AgreementMessage {
BVal(bool),
Aux(bool),
@ -166,6 +167,7 @@ impl<T: Send + Sync> BroadcastMessage<T> {
BroadcastMessage::Ready(h) => {
let mut r = ReadyProto::new();
r.set_root_hash(h);
b.set_ready(r);
}
}
b

View File

@ -3,9 +3,9 @@
use proto::*;
use protobuf;
use protobuf::Message as ProtobufMessage;
use std::io::Read;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::{cmp, io};
/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
@ -35,95 +35,48 @@ impl From<protobuf::ProtobufError> for Error {
}
}
fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> {
if buffer.len() < 4 {
return Err(Error::EncodeError);
}
let value = value.to_le();
buffer[0] = ((value & 0xFF00_0000) >> 24) as u8;
buffer[1] = ((value & 0x00FF_0000) >> 16) as u8;
buffer[2] = ((value & 0x0000_FF00) >> 8) as u8;
buffer[3] = (value & 0x0000_00FF) as u8;
Ok(())
pub struct ProtoIo<S: Read + Write> {
stream: S,
}
fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
if buffer.len() < 4 {
return Err(Error::DecodeError);
impl ProtoIo<TcpStream> {
pub fn try_clone(&self) -> Result<ProtoIo<TcpStream>, ::std::io::Error> {
Ok(ProtoIo {
stream: self.stream.try_clone()?,
})
}
let mut result = u32::from(buffer[0]);
result <<= 8;
result += u32::from(buffer[1]);
result <<= 8;
result += u32::from(buffer[2]);
result <<= 8;
result += u32::from(buffer[3]);
Ok(result)
}
pub struct ProtoIo {
stream: TcpStream,
buffer: [u8; 1024 * 4],
}
/// A message handling task.
impl ProtoIo
impl<S: Read + Write> ProtoIo<S>
//where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
{
pub fn from_stream(stream: TcpStream) -> Self {
ProtoIo {
stream,
buffer: [0; 1024 * 4],
}
}
pub fn try_clone(&self) -> Result<ProtoIo, ::std::io::Error> {
Ok(ProtoIo {
stream: self.stream.try_clone()?,
buffer: self.buffer,
})
pub fn from_stream(stream: S) -> Self {
ProtoIo { stream }
}
pub fn recv<T>(&mut self) -> Result<Message<T>, Error>
where
T: Clone + Send + Sync + From<Vec<u8>>, // + Into<Vec<u8>>
{
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
if frame_start != FRAME_START {
let mut stream = protobuf::CodedInputStream::new(&mut self.stream);
// Read magic number
if stream.read_raw_varint32()? != FRAME_START {
return Err(Error::FrameStartMismatch);
};
self.stream.read_exact(&mut self.buffer[0..4])?;
let size = decode_u32_from_be(&self.buffer[0..4])? as usize;
let mut message_v: Vec<u8> = Vec::new();
message_v.reserve(size);
while message_v.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size - message_v.len());
let (slice, _) = self.buffer.split_at_mut(num_to_read);
self.stream.read_exact(slice)?;
message_v.extend_from_slice(slice);
}
Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError)
Message::from_proto(stream.read_message()?).ok_or(Error::DecodeError)
}
pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
where
T: Clone + Send + Sync + Into<Vec<u8>>,
{
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
stream.write_raw_varint32(FRAME_START)?;
let message_p = message.into_proto();
// Write message size
encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message
message_p.write_to(&mut stream)?;
message_p.write_length_delimited_to(&mut stream)?;
// Flush
stream.flush()?;
Ok(())
@ -133,14 +86,23 @@ impl ProtoIo
#[cfg(test)]
mod tests {
use proto_io::*;
use std::io::Cursor;
/// Test the requirement that composing encoding with decoding yields the
/// identity.
#[test]
fn encode_decode_identity() {
let mut buffer: [u8; 4] = [0; 4];
encode_u32_to_be(FRAME_START, &mut buffer[0..4]).unwrap();
let frame_start = decode_u32_from_be(&buffer[0..4]).unwrap();
assert_eq!(frame_start, FRAME_START);
fn encode_decode_message() {
let msg0: Message<Vec<u8>> =
Message::Broadcast(BroadcastMessage::Ready(b"Test 0".to_vec()));
let msg1: Message<Vec<u8>> =
Message::Broadcast(BroadcastMessage::Ready(b"Test 1".to_vec()));
let mut pio = ProtoIo::from_stream(Cursor::new(Vec::new()));
pio.send(msg0.clone()).expect("send msg0");
pio.send(msg1.clone()).expect("send msg1");
println!("{:?}", pio.stream.get_ref());
pio.stream.set_position(0);
assert_eq!(msg0, pio.recv().expect("recv msg0"));
// TODO: Figure out why the cursor is wrong here.
let len = pio.stream.get_ref().len() as u64;
pio.stream.set_position(len / 2);
assert_eq!(msg1, pio.recv().expect("recv msg1"));
}
}

View File

@ -14,12 +14,13 @@ use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::fmt;
use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
use hbbft::messaging::ProposedValue;
use hbbft::proto::BroadcastMessage;
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
struct NodeId(usize);
type ProposedValue = Vec<u8>;
type MessageQueue = VecDeque<TargetedBroadcastMessage<NodeId>>;
/// A "node" running a broadcast instance.
@ -294,7 +295,7 @@ fn test_8_broadcast_equal_leaves() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
// Space is ASCII character 32. So 32 spaces will create shards that are all equal, even if the
// length of the value is inserted.
test_broadcast(TestNetwork::new(8, 0, adversary), &vec![b' '; 32]);
test_broadcast(TestNetwork::new(8, 0, adversary), &[b' '; 32]);
}
// TODO: Unignore once node numbers are supported that are not powers of two.