Fix broadcast and example, enable more tests.

This fixes several issues with the broadcast algorithm and enables the
previously ignored tests that now pass:
* Don't decide on a root hash based on anyone's `Echo` message.
* Work around the `merkle` crate's inability to produce the proof of the
  `i`-th leaf for a given index `i`.
* Ignore messages from unknown nodes.
* Avoid decoding multiple times.
* Document the full algorithm.
* Don't count multiple `Echo` or `Ready` messages coming from the same
  node (see the sketch after this list).
* Fix index computation for a given proof.
* Move `BroadcastMessage` into `broadcast` to make the module more
  self-contained.
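
A minimal sketch of the deduplication approach, with hypothetical names
simplified from the diff (`count_echos` mirrors the new `BroadcastState`
helper): keying received messages by sender ID makes repeated `Echo`s
from one node count only once.

    use std::collections::BTreeMap;

    /// Hypothetical helper; the diff keeps this map inside `BroadcastState`.
    struct EchoState {
        /// The root hash from each sender's `Echo`, at most one per sender.
        echos: BTreeMap<usize, Vec<u8>>,
    }

    impl EchoState {
        /// Records an `Echo`; returns `false` if this sender already sent one.
        fn record_echo(&mut self, sender: usize, root_hash: Vec<u8>) -> bool {
            if self.echos.contains_key(&sender) {
                return false; // Don't count multiple `Echo`s from the same node.
            }
            self.echos.insert(sender, root_hash);
            true
        }

        /// Counts distinct senders whose `Echo` carried the given root hash.
        fn count_echos(&self, hash: &[u8]) -> usize {
            self.echos.values().filter(|h| h.as_slice() == hash).count()
        }
    }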

The example now only executes a single broadcast instance, expecting the
first node (the one with the lexicographically lowest address) to
propose a value. A shell script is added that runs four example nodes.

Use env_logger instead of simple_logger, so the log level can be controlled
via the RUST_LOG environment variable. E.g. to log all output from the
broadcast test and the crate itself at debug level, run the tests with:
RUST_LOG=hbbft=debug,broadcast=debug cargo test

Some debugging messages are more concise now and use hexadecimal
notation instead of printing arrays of decimal values.

An indentation error in the Travis script is also fixed.
Andreas Fackler 2018-05-08 16:20:32 +02:00
parent 8af526d61f
commit b98bbe9dcd
14 changed files with 513 additions and 481 deletions


@ -17,9 +17,11 @@ env:
global:
- RUST_BACKTRACE=1
- RUSTFLAGS="-D warnings"
script:
- cargo fmt -- --write-mode=diff
- cargo clippy -- -D clippy
- cargo clippy --tests -- -D clippy
- cargo check --tests
- cargo test
script:
# TODO: This currently fails, claiming that `src/proto/message.rs` does not
# exist. Re-enable once the problem is resolved.
# - cargo fmt -- --write-mode=diff
- cargo clippy -- -D clippy
- cargo clippy --tests -- -D clippy
- cargo check --tests
- cargo test


@ -4,8 +4,8 @@ version = "0.1.0"
authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
[dependencies]
env_logger = "0.5.10"
log = "0.4.1"
simple_logger = "0.5"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
ring = "^0.12"


@ -4,10 +4,10 @@ extern crate crossbeam;
#[macro_use]
extern crate crossbeam_channel;
extern crate docopt;
extern crate env_logger;
extern crate hbbft;
#[macro_use]
extern crate log;
extern crate simple_logger;
mod network;
@ -55,7 +55,7 @@ fn parse_args() -> Args {
}
pub fn main() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
env_logger::init();
let args: Args = parse_args();
println!("{:?}", args);
let node = Node::new(args.bind_address, args.remote_addresses, args.value);


@ -25,7 +25,7 @@ impl From<io::Error> for Error {
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> {
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> {
/// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<T>>,
/// The receive side of the channel to the comms thread.
@ -36,7 +36,7 @@ pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + I
pub node_index: usize,
}
impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> CommsTask<'a, T> {
impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> CommsTask<'a, T> {
pub fn new(
tx: &'a Sender<SourcedMessage<T>>,
rx: &'a Receiver<Message<T>>,
@ -72,7 +72,6 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> Co
loop {
// Receive a multicast message from the manager thread.
let message = rx.recv().unwrap();
debug!("Node {} <- {:?}", node_index, message);
// Forward the message to the remote node.
io1.send(message).unwrap();
}
@ -83,7 +82,6 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> Co
loop {
match self.io.recv() {
Ok(message) => {
debug!("Node {} -> {:?}", node_index, message);
tx.send(SourcedMessage {
source: node_index,
message,


@ -1,6 +1,6 @@
//! Connection data and initiation routines.
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::io::BufReader;
use std::net::{SocketAddr, TcpListener, TcpStream};
@ -8,61 +8,47 @@ use std::net::{SocketAddr, TcpListener, TcpStream};
pub struct Connection {
pub stream: TcpStream,
pub reader: BufReader<TcpStream>,
pub node_str: String,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
pub fn new(stream: TcpStream, node_str: String) -> Self {
Connection {
// Create a read buffer of 1K bytes.
reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()),
stream,
node_str,
}
}
}
/// Connect this node to remote peers. A vector of successful connections is
/// returned.
pub fn make(bind_address: &SocketAddr, remote_addresses: &HashSet<SocketAddr>) -> Vec<Connection> {
// Connected remote nodes.
// let mut connected: Vec<SocketAddr> = Vec::new();
/// Connect this node to remote peers. A vector of successful connections is returned, as well as
/// our own node ID.
pub fn make(
bind_address: &SocketAddr,
remote_addresses: &HashSet<SocketAddr>,
) -> (String, Vec<Connection>) {
// Listen for incoming connections on a given TCP port.
let bind_address = bind_address;
let listener = TcpListener::bind(bind_address).unwrap();
// Initialise initial connection states.
let mut connections: Vec<Option<Connection>> = (0..remote_addresses.len())
.into_iter()
.map(|_| None)
.collect();
let listener = TcpListener::bind(bind_address).expect("start listener");
let here_str = format!("{}", bind_address);
// Use a `BTreeMap` to make sure we all iterate in the same order.
let remote_by_str: BTreeMap<String, _> = remote_addresses
.iter()
.map(|addr| (format!("{}", addr), addr))
.filter(|(there_str, _)| *there_str != here_str)
.collect();
// Wait for all nodes with larger addresses to connect.
for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address);
if here_str < there_str {
connections[n] = match listener.accept() {
Ok((stream, _)) => {
info!("Connected to {}", there_str);
Some(Connection::new(stream))
}
Err(_) => None,
}
}
}
// Try to connect to all nodes with smaller addresses.
for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address);
if here_str > there_str {
connections[n] = match TcpStream::connect(address) {
Ok(stream) => {
info!("Connected to {}", there_str);
Some(Connection::new(stream))
}
Err(_) => None,
}
}
}
// remove Nones from connections
connections.into_iter().filter_map(|c| c).collect()
let connections = remote_by_str
.into_iter()
.map(|(there_str, address)| {
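            // Exactly one side initiates each connection: the node with the
            // lexicographically smaller address accepts, the larger one dials.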
let tcp_conn = if here_str < there_str {
listener.accept().expect("failed to connect").0
} else {
TcpStream::connect(address).expect("failed to connect")
};
Connection::new(tcp_conn, there_str.to_string())
})
.collect();
(here_str, connections)
}


@ -9,13 +9,13 @@ use std::fmt::Debug;
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<T: Clone + Debug + Send + Sync> {
pub struct Messaging<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
/// Transmit sides of message channels to comms threads.
txs_to_comms: Vec<Sender<Message<T>>>,
/// Receive side of the routed message channel from comms threads.
rx_from_comms: Receiver<SourcedMessage<T>>,
/// Transmit sides of message channels to algo threads.
txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
/// Transmit sides of message channels to algo thread.
tx_to_algo: Sender<SourcedMessage<T>>,
/// Receive side of the routed message channel from comms threads.
rx_from_algo: Receiver<TargetedMessage<T>>,
@ -23,9 +23,9 @@ pub struct Messaging<T: Clone + Debug + Send + Sync> {
rxs_to_comms: Vec<Receiver<Message<T>>>,
/// TX handle to be used by comms tasks.
tx_from_comms: Sender<SourcedMessage<T>>,
/// RX handles to be used by algo tasks.
rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
/// TX handle to be used by algo tasks.
/// RX handles to be used by algo task.
rx_to_algo: Receiver<SourcedMessage<T>>,
/// TX handle to be used by algo task.
tx_from_algo: Sender<TargetedMessage<T>>,
/// Control channel used to stop the listening thread.
@ -33,23 +33,17 @@ pub struct Messaging<T: Clone + Debug + Send + Sync> {
stop_rx: Receiver<()>,
}
impl<T: Clone + Debug + Send + Sync> Messaging<T> {
impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> Messaging<T> {
/// Initialises all the required TX and RX handles for a total number
/// `num_nodes` of consensus nodes.
pub fn new(num_nodes: usize) -> Self {
let to_comms: Vec<_> = (0..num_nodes - 1)
.map(|_| unbounded::<Message<T>>())
.collect();
let to_comms: Vec<_> = (0..num_nodes).map(|_| unbounded::<Message<T>>()).collect();
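        // Presumably one channel per global node index, with our own slot unused,
        // so comms tasks can be looked up directly by node index.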
let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_comms: Vec<Receiver<Message<T>>> =
to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_comms, rx_from_comms) = unbounded();
let to_algo: Vec<_> = (0..num_nodes)
.map(|_| unbounded::<SourcedMessage<T>>())
.collect();
let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_to_algo, rx_to_algo) = unbounded();
let (tx_from_algo, rx_from_algo) = unbounded();
let (stop_tx, stop_rx) = bounded(1);
@ -58,13 +52,13 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
// internally used handles
txs_to_comms,
rx_from_comms,
txs_to_algo,
tx_to_algo,
rx_from_algo,
// externally used handles
rxs_to_comms,
tx_from_comms,
rxs_to_algo,
rx_to_algo,
tx_from_algo,
stop_tx,
@ -80,8 +74,8 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
&self.tx_from_comms
}
pub fn rxs_to_algo(&self) -> &Vec<Receiver<SourcedMessage<T>>> {
&self.rxs_to_algo
pub fn rx_to_algo(&self) -> &Receiver<SourcedMessage<T>> {
&self.rx_to_algo
}
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
@ -100,7 +94,7 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
{
let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned();
let txs_to_algo = self.txs_to_algo.to_owned();
let tx_to_algo = self.tx_to_algo.to_owned();
let rx_from_algo = self.rx_from_algo.to_owned();
let stop_rx = self.stop_rx.to_owned();
@ -128,8 +122,7 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
} else {
result
}
}).map_err(Error::from);
@ -138,16 +131,10 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
target: Target::Node(i),
message
} => {
// Remote node indices start from 1.
assert!(i > 0);
// Convert node index to vector index.
let i = i - 1;
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(message.clone())
.map_err(Error::from)
}
else {
} else {
Err(Error::NoSuchTarget)
};
}
@ -156,14 +143,7 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
result
}
}).map_err(Error::from)
result = tx_to_algo.send(message.clone()).map_err(Error::from)
},
recv(stop_rx, _) => {
// Flag the thread ready to exit.


@ -37,9 +37,9 @@
use crossbeam;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::{io, iter, process};
use hbbft::broadcast;
use network::commst;
@ -50,7 +50,6 @@ use network::messaging::Messaging;
pub enum Error {
IoError(io::Error),
CommsError(commst::Error),
NotImplemented,
}
impl From<io::Error> for Error {
@ -90,54 +89,64 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
/// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, Error> {
let value = &self.value;
let connections = connection::make(&self.addr, &self.remotes);
let (our_str, connections) = connection::make(&self.addr, &self.remotes);
let mut node_strs: Vec<String> = iter::once(our_str.clone())
.chain(connections.iter().map(|c| c.node_str.clone()))
.collect();
node_strs.sort();
debug!("Nodes: {:?}", node_strs);
let proposer_id = 0;
let our_id = node_strs.binary_search(&our_str).unwrap();
let num_nodes = connections.len() + 1;
if value.is_some() != (our_id == proposer_id) {
panic!("Exactly the first node must propose a value.");
}
// Initialise the message delivery system and obtain TX and RX handles.
let messaging: Messaging<Vec<u8>> = Messaging::new(num_nodes);
let rxs_to_comms = messaging.rxs_to_comms();
let tx_from_comms = messaging.tx_from_comms();
let rxs_to_algo = messaging.rxs_to_algo();
let rx_to_algo = messaging.rx_to_algo();
let tx_from_algo = messaging.tx_from_algo();
let stop_tx = messaging.stop_tx();
// All spawned threads will have exited by the end of the scope.
crossbeam::scope(|scope| {
// Start the centralised message delivery system.
let msg_handle = messaging.spawn(scope);
let mut broadcast_handles = Vec::new();
let _msg_handle = messaging.spawn(scope);
// Associate a broadcast instance with this node. This instance will
// broadcast the proposed value. There is no remote node
// corresponding to this instance, and no dedicated comms task. The
// node index is 0.
let rx_to_algo0 = &rxs_to_algo[0];
broadcast_handles.push(scope.spawn(move || {
let broadcast_handle = scope.spawn(move || {
match broadcast::Instance::new(
tx_from_algo,
rx_to_algo0,
rx_to_algo,
value.to_owned(),
num_nodes,
0,
(0..num_nodes).collect(),
our_id,
proposer_id,
).run()
{
Ok(t) => {
debug!(
"Broadcast instance 0 succeeded: {}",
"Broadcast succeeded: {}",
String::from_utf8(T::into(t)).unwrap()
);
}
Err(e) => error!("Broadcast instance 0: {:?}", e),
Err(e) => error!("Broadcast instance: {:?}", e),
}
}));
});
// Start a comms task for each connection. Node indices of those
// tasks are 1 through N where N is the number of connections.
for (i, c) in connections.iter().enumerate() {
// Receive side of a single-consumer channel from algorithm
// actor tasks to the comms task.
let rx_to_comms = &rxs_to_comms[i];
let node_index = i + 1;
let node_index = if c.node_str < our_str { i } else { i + 1 };
let rx_to_comms = &rxs_to_comms[node_index];
scope.spawn(move || {
match commst::CommsTask::new(
@ -152,35 +161,11 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
Err(e) => error!("Comms task {}: {:?}", node_index, e),
}
});
// Associate a broadcast instance to the above comms task.
let rx_to_algo = &rxs_to_algo[node_index];
broadcast_handles.push(scope.spawn(move || {
match broadcast::Instance::new(
tx_from_algo,
rx_to_algo,
None,
num_nodes,
node_index,
).run()
{
Ok(t) => {
debug!(
"Broadcast instance {} succeeded: {}",
node_index,
String::from_utf8(T::into(t)).unwrap()
);
}
Err(e) => error!("Broadcast instance {}: {:?}", node_index, e),
}
}));
}
// Wait for the broadcast instances to finish before stopping the
// messaging task.
for h in broadcast_handles {
h.join();
}
broadcast_handle.join();
// Stop the messaging task.
stop_tx
@ -190,30 +175,14 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
})
.unwrap();
match msg_handle.join() {
Ok(()) => debug!("Messaging stopped OK"),
Err(e) => debug!("Messaging error: {:?}", e),
}
// TODO: continue the implementation of the asynchronous common
// subset algorithm.
Err(Error::NotImplemented)
process::exit(0);
// TODO: Exit cleanly.
// match msg_handle.join() {
// Ok(()) => debug!("Messaging stopped OK"),
// Err(e) => debug!("Messaging error: {:?}", e),
// }
// Err(Error::NotImplemented)
}) // end of thread scope
}
}
// #[cfg(test)]
// mod tests {
// use std::collections::HashSet;
// use node;
// /// Test that the node works to completion.
// #[test]
// fn test_node_0() {
// let node = node::Node::new("127.0.0.1:10000".parse().unwrap(),
// HashSet::new(),
// Some("abc".as_bytes().to_vec()));
// let result = node.run();
// assert!(match result { Err(node::Error::NotImplemented) => true,
// _ => false });
// }
// }


@ -0,0 +1,10 @@
#! /bin/bash
cargo build --example consensus-node
cargo run --example consensus-node -- --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --value=Foo &
sleep 1
cargo run --example consensus-node -- --bind-address=127.0.0.1:5001 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 &
sleep 1
cargo run --example consensus-node -- --bind-address=127.0.0.1:5002 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5003 &
sleep 1
cargo run --example consensus-node -- --bind-address=127.0.0.1:5003 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 &


@ -1,12 +1,11 @@
//! Reliable broadcast algorithm instance.
use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use merkle::proof::{Lemma, Positioned, Proof};
use merkle::{Hashable, MerkleTree};
use proto::*;
use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::hash::Hash;
use std::iter;
use std::marker::{Send, Sync};
@ -19,6 +18,41 @@ type ProposedValue = Vec<u8>;
type MessageQueue<NodeUid> = VecDeque<TargetedBroadcastMessage<NodeUid>>;
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[derive(Clone, PartialEq)]
pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>),
Echo(Proof<T>),
Ready(Vec<u8>),
}
impl BroadcastMessage<ProposedValue> {
fn target_all<NodeUid>(self) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: self,
}
}
fn target_node<NodeUid>(self, id: NodeUid) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: BroadcastTarget::Node(id),
message: self,
}
}
}
impl<T: Send + Sync + Debug + AsRef<[u8]>> fmt::Debug for BroadcastMessage<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)),
BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)),
BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)),
}
}
}
/// A `BroadcastMessage` to be sent out, together with a target.
#[derive(Clone, Debug)]
pub struct TargetedBroadcastMessage<NodeUid> {
@ -26,14 +60,11 @@ pub struct TargetedBroadcastMessage<NodeUid> {
pub message: BroadcastMessage<ProposedValue>,
}
impl TargetedBroadcastMessage<usize> {
pub fn into_targeted_message(self) -> TargetedMessage<ProposedValue> {
impl From<TargetedBroadcastMessage<usize>> for TargetedMessage<ProposedValue> {
fn from(msg: TargetedBroadcastMessage<usize>) -> TargetedMessage<ProposedValue> {
TargetedMessage {
target: match self.target {
BroadcastTarget::All => Target::All,
BroadcastTarget::Node(node) => Target::Node(node),
},
message: Message::Broadcast(self.message),
target: msg.target.into(),
message: Message::Broadcast(msg.message),
}
}
}
@ -45,25 +76,93 @@ pub enum BroadcastTarget<NodeUid> {
Node(NodeUid),
}
struct BroadcastState {
root_hash: Option<Vec<u8>>,
leaf_values: Vec<Option<Box<[u8]>>>,
leaf_values_num: usize,
echo_num: usize,
readys: HashMap<Vec<u8>, usize>,
impl From<BroadcastTarget<usize>> for Target {
fn from(bt: BroadcastTarget<usize>) -> Target {
match bt {
BroadcastTarget::All => Target::All,
BroadcastTarget::Node(node) => Target::Node(node),
}
}
}
struct BroadcastState<NodeUid: Eq + Hash + Ord> {
/// Whether we have already multicast `Echo`.
echo_sent: bool,
/// Whether we have already multicast `Ready`.
ready_sent: bool,
ready_to_decode: bool,
/// Whether we have already output a value.
has_output: bool,
/// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<NodeUid, Proof<Vec<u8>>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<NodeUid, Vec<u8>>,
}
impl<NodeUid: Eq + Hash + Ord> BroadcastState<NodeUid> {
/// Returns the number of nodes that have sent us an `Echo` message with this hash.
fn count_echos(&self, hash: &[u8]) -> usize {
self.echos
.values()
.filter(|p| p.root_hash.as_slice() == hash)
.count()
}
/// Returns the number of nodes that have sent us a `Ready` message with this hash.
fn count_readys(&self, hash: &[u8]) -> usize {
self.readys
.values()
.filter(|h| h.as_slice() == hash)
.count()
}
}
/// Reliable Broadcast algorithm instance.
pub struct Broadcast<NodeUid: Eq + Hash> {
///
/// The Reliable Broadcast Protocol assumes a network of `N` nodes that send signed messages to
/// each other, with at most `f` of them malicious, where `3 * f < N`. Handling the networking and
/// signing is the responsibility of this crate's user: only when a message has been verified to be
/// "from node i", it can be handed to the `Broadcast` instance. One of the nodes is the "proposer"
/// who sends a value. Under the above conditions, the protocol guarantees that either all or none
/// of the good nodes output a value, and that if the proposer is good, all good nodes output the
/// proposed value.
///
/// The algorithm works as follows:
///
/// * The proposer uses a Reed-Solomon code to split the value into `N` chunks, `f + 1` of which
/// suffice to reconstruct the value. These chunks are put into a Merkle tree, so that with the
/// tree's root hash `h`, branch `bi` and chunk `si`, the `i`-th chunk `si` can be verified by
/// anyone to belong to the Merkle tree with root hash `h`. These values are "proof" number `i`:
/// `pi`.
/// * The proposer sends `Value(pi)` to node `i`. It translates to: "I am the proposer, and `pi`
/// contains the `i`-th share of my value."
/// * Every (good) node that receives `Value(pi)` from the proposer sends it on to everyone else as
/// `Echo(pi)`. An `Echo` translates to: "I have received `pi` directly from the proposer." If the
/// proposer sends another `Value` message, that is ignored.
/// * So every node that has received at least `f + 1` `Echo` messages with the same root
/// hash will be able to decode a value.
/// * Every node that has received `N - f` `Echo`s with the same root hash from different nodes
/// knows that at least `f + 1` _good_ nodes have sent an `Echo` with that hash to everyone, and
/// therefore everyone will eventually receive at least `f + 1` of them. So upon receiving `N - f`
/// `Echo`s, they send a `Ready(h)` to everyone to indicate that. `Ready` translates to: "I know
/// that everyone will eventually be able to decode the value." Moreover, since every good node
/// only ever sends one kind of `Echo` message, this cannot happen for two different root hashes.
/// * Even without enough `Echo` messages, if a node receives `f + 1` `Ready` messages, it knows
/// that at least one _good_ node has sent `Ready`. It therefore also knows that everyone will be
/// able to decode eventually, and multicasts `Ready` itself.
/// * If a node has received `2 * f + 1` `Ready`s (with matching root hash) from different nodes,
/// it knows that at least `f + 1` _good_ nodes have sent it. Therefore, every good node will
/// eventually receive `f + 1`, and multicast it itself. Therefore, every good node will eventually
/// receive `2 * f + 1` `Ready`s, too. _And_ we know at this point that every good node will
/// eventually be able to decode (i.e. receive at least `f + 1` `Echo` messages).
/// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value,
/// knowing that every other good node will eventually do the same.
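/// * As a concrete instance of these thresholds: with `N = 4` and `f = 1`, any `f + 1 = 2`
/// chunks reconstruct the value, a node multicasts `Ready` after `N - f = 3` matching
/// `Echo`s (or after `f + 1 = 2` `Ready`s), and it outputs once it holds `2 * f + 1 = 3`
/// `Ready`s and `f + 1 = 2` `Echo`s with the same root hash.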
pub struct Broadcast<NodeUid: Eq + Hash + Ord> {
/// The UID of this node.
our_id: NodeUid,
/// The UID of the sending node.
proposer_id: NodeUid,
/// UIDs of all nodes for iteration purposes.
all_uids: HashSet<NodeUid>,
all_uids: BTreeSet<NodeUid>,
num_nodes: usize,
num_faulty_nodes: usize,
data_shard_num: usize,
@ -71,16 +170,16 @@ pub struct Broadcast<NodeUid: Eq + Hash> {
/// All the mutable state is confined to the `state` field. This allows state
/// to be mutated even when the broadcast instance is referred to by an
/// immutable reference.
state: RwLock<BroadcastState>,
state: RwLock<BroadcastState<NodeUid>>,
}
impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
/// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
/// from node `proposer_id`.
pub fn new(
our_id: NodeUid,
proposer_id: NodeUid,
all_uids: HashSet<NodeUid>,
all_uids: BTreeSet<NodeUid>,
) -> Result<Self, Error> {
let num_nodes = all_uids.len();
let num_faulty_nodes = (num_nodes - 1) / 3;
@ -97,14 +196,11 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
data_shard_num,
coding,
state: RwLock::new(BroadcastState {
root_hash: None,
leaf_values: vec![None; num_nodes],
leaf_values_num: 0,
echo_num: 0,
readys: HashMap::new(),
echo_sent: false,
ready_sent: false,
ready_to_decode: false,
has_output: false,
echos: BTreeMap::new(),
readys: BTreeMap::new(),
}),
})
}
@ -114,23 +210,18 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
if self.our_id != self.proposer_id {
return Err(Error::UnexpectedMessage);
}
let mut state = self.state.write().unwrap();
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
self.send_shards(value).map(|(proof, remote_messages)| {
// Record the first proof as if it were sent by the node to itself.
let h = proof.root_hash.clone();
// Save the leaf value for reconstructing the tree later.
state.leaf_values[index_of_proof(&proof)] =
Some(proof.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
state.root_hash = Some(h);
remote_messages
})
let (proof, value_msgs) = self.send_shards(value)?;
// TODO: We'd actually need to return the output here, if it was only one node. Should that
// use-case be supported?
let state = self.state.write().unwrap();
let (_, echo_msgs) = self.handle_value(&self.our_id, proof, state)?;
Ok(value_msgs.into_iter().chain(echo_msgs).collect())
}
/// Returns this node's ID.
pub fn our_id(&self) -> &NodeUid {
&self.our_id
}
@ -143,7 +234,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
fn send_shards(
&self,
mut value: ProposedValue,
) -> Result<(Proof<ProposedValue>, MessageQueue<NodeUid>), Error> {
) -> Result<(Proof<Vec<u8>>, MessageQueue<NodeUid>), Error> {
let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count();
@ -172,14 +263,20 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
// Convert the iterator over slices into a vector of slices.
let mut shards: Vec<&mut [u8]> = shards_iter.collect();
debug!("Shards before encoding: {:?}", shards);
debug!("Shards before encoding: {:?}", HexList(&shards));
// Construct the parity chunks/shards
self.coding.encode(&mut shards)?;
debug!("Shards: {:?}", shards);
debug!("Shards: {:?}", HexList(&shards));
let shards_t: Vec<ProposedValue> = shards.into_iter().map(|s| s.to_vec()).collect();
// TODO: `MerkleTree` generates the wrong proof if a leaf occurs more than once, so we
// prepend an "index byte" to each shard. Consider using the `merkle_light` crate instead.
let shards_t: Vec<ProposedValue> = shards
.into_iter()
.enumerate()
.map(|(i, s)| iter::once(i as u8).chain(s.iter().cloned()).collect())
.collect();
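        // The index byte is stripped again in `glue_shards` when the value is reassembled.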
// Convert the Merkle tree into a partial binary tree for later
// deconstruction into compound branches.
@ -188,23 +285,19 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
// Default result in case of `gen_proof` error.
let mut result = Err(Error::ProofConstructionFailed);
let mut outgoing = VecDeque::new();
assert_eq!(self.num_nodes, mtree.iter().count());
// Send each proof to a node.
// TODO: This generates the wrong proof if a leaf occurs more than once. Consider using the
// `merkle_light` crate instead.
for (leaf_value, uid) in mtree.iter().zip(self.all_uids.clone()) {
for (leaf_value, uid) in mtree.iter().zip(&self.all_uids) {
let proof = mtree
.gen_proof(leaf_value.to_vec())
.ok_or(Error::ProofConstructionFailed)?;
if uid == self.our_id {
if *uid == self.our_id {
// The proof is addressed to this node.
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::Node(uid),
message: BroadcastMessage::Value(proof),
});
outgoing.push_back(BroadcastMessage::Value(proof).target_node(uid.clone()));
}
}
@ -217,11 +310,14 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
sender_id: &NodeUid,
message: BroadcastMessage<ProposedValue>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
if !self.all_uids.contains(sender_id) {
return Err(Error::UnknownSender);
}
let state = self.state.write().unwrap();
match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p, state),
BroadcastMessage::Echo(p) => self.handle_echo(p, state),
BroadcastMessage::Ready(hash) => self.handle_ready(hash, state),
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p, state),
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash, state),
}
}
@ -229,159 +325,166 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
fn handle_value(
&self,
sender_id: &NodeUid,
p: Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
// If the sender is not the proposer, this is not the first `Value`, or the proof is invalid,
// ignore.
if *sender_id != self.proposer_id {
info!(
"Node {:?} received Value from {:?} instead of {:?}.",
self.our_id, sender_id, self.proposer_id
);
return Ok((None, VecDeque::new()));
}
// Initialize the root hash if not already initialised.
if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone());
debug!(
"Node {:?} Value root hash {:?}",
self.our_id,
HexBytes(&p.root_hash)
);
if state.echo_sent {
info!("Node {:?} received multiple Values.", self.our_id);
return Ok((None, VecDeque::new()));
}
if !self.validate_proof(&p, &self.our_id) {
return Ok((None, VecDeque::new()));
}
if state.root_hash.as_ref().map_or(false, |h| p.validate(h)) {
// TODO: Should messages failing this be echoed at all?
// Save the leaf value for reconstructing the tree later.
let idx = index_of_proof(&p);
state.leaf_values[idx] = Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
}
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
state.echo_sent = true;
let (output, echo_msgs) = self.handle_echo(&self.our_id, p.clone(), state)?;
let msgs = iter::once(BroadcastMessage::Echo(p).target_all())
.chain(echo_msgs)
.collect();
// Enqueue a broadcast of an echo of this proof.
let msgs = VecDeque::from(vec![TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Echo(p.clone()),
}]);
let (output, echo_msgs) = self.handle_echo(p, state)?;
Ok((output, msgs.into_iter().chain(echo_msgs).collect()))
Ok((output, msgs))
}
/// Handles a received echo and verifies the proof it contains.
/// Handles a received `Echo` message.
fn handle_echo(
&self,
p: Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone());
debug!(
"Node {:?} Echo root hash {:?}",
self.our_id, state.root_hash
// If the proof is invalid or the sender has already sent `Echo`, ignore.
if state.echos.contains_key(sender_id) {
info!(
"Node {:?} received multiple Echos from {:?}.",
self.our_id, sender_id,
);
}
// Call validate with the root hash as argument.
let h = if let Some(h) = state.root_hash.clone() {
h
} else {
error!("Broadcast/{:?} root hash not initialised", self.our_id);
return Ok((None, VecDeque::new()));
};
if !p.validate(h.as_slice()) {
debug!("Broadcast/{:?} cannot validate Echo {:?}", self.our_id, p);
}
if !self.validate_proof(&p, sender_id) {
return Ok((None, VecDeque::new()));
}
state.echo_num += 1;
// Save the leaf value for reconstructing the tree later.
let idx = index_of_proof(&p);
state.leaf_values[idx] = Some(p.value.into_boxed_slice());
state.leaf_values_num += 1;
let hash = p.root_hash.clone();
// Upon receiving 2f + 1 matching READY(h)
// messages, wait for N - 2f ECHO messages,
// then decode v. Return the decoded v to ACS.
if state.leaf_values_num < self.num_nodes - self.num_faulty_nodes {
return Ok((None, VecDeque::new()));
}
// TODO: Only decode once. Don't repeat for every ECHO message.
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
&h,
)?;
if state.ready_to_decode && !state.has_output {
state.has_output = true;
return Ok((Some(value), VecDeque::new()));
}
// if Ready has not yet been sent, multicast Ready
if state.ready_sent {
return Ok((None, VecDeque::new()));
// Save the proof for reconstructing the tree later.
state.echos.insert(sender_id.clone(), p);
if state.ready_sent || state.count_echos(&hash) < self.num_nodes - self.num_faulty_nodes {
return Ok((self.get_output(state, &hash)?, VecDeque::new()));
}
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
state.ready_sent = true;
let msg = TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.clone()),
};
let (output, ready_msgs) = self.handle_ready(h, state)?;
let msg = BroadcastMessage::Ready(hash.clone()).target_all();
let (output, ready_msgs) = self.handle_ready(&self.our_id, &hash, state)?;
Ok((output, iter::once(msg).chain(ready_msgs).collect()))
}
/// Handles a received `Ready` message.
fn handle_ready(
&self,
hash: Vec<u8>,
mut state: RwLockWriteGuard<BroadcastState>,
sender_id: &NodeUid,
hash: &[u8],
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
// Update the number Ready has been received with this hash.
// TODO: Don't accept multiple ready messages from the same node.
*state.readys.entry(hash).or_insert(1) += 1;
// Check that the root hash matches.
let h = if let Some(h) = state.root_hash.clone() {
h
} else {
// If the sender has already sent a `Ready` before, ignore.
if state.readys.contains_key(sender_id) {
info!(
"Node {:?} received multiple Readys from {:?}.",
self.our_id, sender_id
);
return Ok((None, VecDeque::new()));
};
}
let ready_num = *state.readys.get(&h).unwrap_or(&0);
let mut outgoing = VecDeque::new();
state.readys.insert(sender_id.clone(), hash.to_vec());
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent {
// Enqueue a broadcast of a ready message.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.to_vec()),
});
let outgoing = if state.count_readys(hash) == self.num_faulty_nodes + 1 && !state.ready_sent
{
// Enqueue a broadcast of a Ready message.
state.ready_sent = true;
iter::once(BroadcastMessage::Ready(hash.to_vec()).target_all()).collect()
} else {
VecDeque::new()
};
Ok((self.get_output(state, hash)?, outgoing))
}
/// Checks whether the conditions for output are met for this hash, and if so, returns the output
/// value.
fn get_output(
&self,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
hash: &[u8],
) -> Result<Option<ProposedValue>, Error> {
if state.has_output || state.count_readys(hash) <= 2 * self.num_faulty_nodes
|| state.count_echos(hash) <= self.num_faulty_nodes
{
return Ok(None);
}
let mut output = None;
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N - 2f Echo messages.
state.has_output = true;
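        // Collect the leaf values in node-index order so each shard sits at its
        // Reed-Solomon position; nodes without a matching `Echo` contribute `None`,
        // and those shards are reconstructed during decoding.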
let mut leaf_values: Vec<Option<Box<[u8]>>> = self.all_uids
.iter()
.map(|id| {
state.echos.get(id).and_then(|p| {
if p.root_hash.as_slice() == hash {
Some(p.value.clone().into_boxed_slice())
} else {
None
}
})
})
.collect();
let value = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)?;
Ok(Some(value))
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait
// for N - 2f Echo messages, then decode v.
if ready_num > 2 * self.num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v.
if state.echo_num >= self.num_nodes - 2 * self.num_faulty_nodes {
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
&h,
)?;
/// Returns `i` if `node_id` is the `i`-th ID among all participating nodes.
fn index_of_node(&self, node_id: &NodeUid) -> Option<usize> {
self.all_uids.iter().position(|id| id == node_id)
}
if !state.has_output {
output = Some(value);
state.has_output = true;
}
} else {
state.ready_to_decode = true;
}
/// Returns the index of this proof's leaf in the Merkle tree.
fn index_of_proof(&self, proof: &Proof<ProposedValue>) -> usize {
index_of_lemma(&proof.lemma, self.num_nodes)
}
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
/// logs an info message.
fn validate_proof(&self, p: &Proof<ProposedValue>, id: &NodeUid) -> bool {
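        // `p.value[0]` is the index byte prepended in `send_shards`: it must match both
        // the position of `id` among the node IDs and the leaf index derived from the lemma.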
if !p.validate(&p.root_hash) {
info!(
"Node {:?} received invalid proof: {:?}",
self.our_id,
HexProof(&p)
);
false
} else if self.index_of_node(id) != Some(p.value[0] as usize)
|| self.index_of_proof(&p) != p.value[0] as usize
{
info!(
"Node {:?} received proof for wrong position: {:?}.",
self.our_id,
HexProof(&p)
);
false
} else {
true
}
Ok((output, outgoing))
}
}
@ -411,12 +514,12 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8
tx: &'a Sender<TargetedMessage<ProposedValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<T>,
num_nodes: usize,
proposer_index: usize,
node_ids: BTreeSet<usize>,
our_id: usize,
proposer_id: usize,
) -> Self {
let all_indexes = (0..num_nodes).collect();
let broadcast = Broadcast::new(0, proposer_index, all_indexes)
.expect("failed to instantiate broadcast");
let broadcast =
Broadcast::new(our_id, proposer_id, node_ids).expect("failed to instantiate broadcast");
Instance {
tx,
rx,
@ -445,6 +548,7 @@ pub enum Error {
Recv(RecvError),
UnexpectedMessage,
NotImplemented,
UnknownSender,
}
impl From<rse::Error> for Error {
@ -476,7 +580,7 @@ fn inner_run<'a>(
for msg in broadcast
.propose_value(v)?
.into_iter()
.map(TargetedBroadcastMessage::into_targeted_message)
.map(TargetedBroadcastMessage::into)
{
tx.send(msg)?;
}
@ -491,10 +595,15 @@ fn inner_run<'a>(
message: Message::Broadcast(message),
} = message
{
debug!("{} received from {}: {:?}", broadcast.our_id, i, message);
let (opt_output, msgs) = broadcast.handle_broadcast_message(&i, message)?;
for msg in msgs.into_iter()
.map(TargetedBroadcastMessage::into_targeted_message)
{
for msg in &msgs {
debug!(
"{} sending to {:?}: {:?}",
broadcast.our_id, msg.target, msg.message
);
}
for msg in msgs.into_iter().map(TargetedBroadcastMessage::into) {
tx.send(msg)?;
}
if let Some(output) = opt_output {
@ -525,6 +634,9 @@ where
.iter()
.filter_map(|l| l.as_ref().map(|v| v.to_vec()))
.collect();
debug!("Reconstructed shards: {:?}", HexList(&shards));
// Construct the Merkle tree.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards);
// If the root hash of the reconstructed tree does not match the one
@ -547,62 +659,41 @@ fn glue_shards<T>(m: MerkleTree<ProposedValue>, n: usize) -> T
where
T: From<Vec<u8>> + Into<Vec<u8>>,
{
let t: Vec<u8> = m.into_iter().take(n).flat_map(|s| s).collect();
let t: Vec<u8> = m.into_iter()
.take(n)
.flat_map(|s| s.into_iter().skip(1)) // Drop the index byte.
.collect();
let payload_len = t[0] as usize;
debug!("Glued data shards {:?}", &t[1..(payload_len + 1)]);
debug!("Glued data shards {:?}", HexBytes(&t[1..(payload_len + 1)]));
Vec::into(t[1..(payload_len + 1)].to_vec())
}
/// An additional path conversion operation on `Lemma` to allow reconstruction
/// of erasure-coded `Proof` from `Lemma`s. The output path, when read from left
/// to right, goes from leaf to root (LSB order).
fn path_of_lemma(lemma: &Lemma) -> Vec<bool> {
match lemma.sub_lemma {
None => {
match lemma.sibling_hash {
// lemma terminates with no leaf
None => vec![],
// the leaf is on the right
Some(Positioned::Left(_)) => vec![true],
// the leaf is on the left
Some(Positioned::Right(_)) => vec![false],
}
}
Some(ref l) => {
let mut p = path_of_lemma(l.as_ref());
match lemma.sibling_hash {
// lemma terminates
None => (),
// lemma branches out to the right
Some(Positioned::Left(_)) => p.push(true),
// lemma branches out to the left
Some(Positioned::Right(_)) => p.push(false),
}
p
}
/// Computes the Merkle tree leaf index of a value in a given lemma.
pub fn index_of_lemma(lemma: &Lemma, n: usize) -> usize {
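    // `MerkleTree::from_vec` builds a left-complete tree: with `m` the next power of two
    // at or above `n`, the root's left subtree spans `m / 2` leaves and the right one the
    // remaining `n - m / 2`. A `Left` sibling hash therefore means the leaf lies in the
    // right subtree, a `Right` sibling that it lies in the left one.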
let m = n.next_power_of_two();
match (lemma.sub_lemma.as_ref(), lemma.sibling_hash.as_ref()) {
(None, Some(&Positioned::Right(_))) | (None, None) => 0,
(None, Some(&Positioned::Left(_))) => 1,
(Some(l), None) => index_of_lemma(l, n),
(Some(l), Some(&Positioned::Left(_))) => (m >> 1) + index_of_lemma(l, n - (m >> 1)),
(Some(l), Some(&Positioned::Right(_))) => index_of_lemma(l, m >> 1),
}
}
/// Further conversion of a binary tree path into an array index.
fn index_of_path(mut path: Vec<bool>) -> usize {
let mut idx = 0;
// Convert to the MSB order.
path.reverse();
for &dir in &path {
idx <<= 1;
if dir {
idx |= 1;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_index_of_lemma() {
for &n in &[3, 4, 13, 16, 127, 128, 129, 255] {
let shards: Vec<[u8; 1]> = (0..n).map(|i| [i as u8]).collect();
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards);
for (i, val) in mtree.iter().enumerate() {
let p = mtree.gen_proof(val.clone()).expect("generate proof");
let idx = index_of_lemma(&p.lemma, n);
assert_eq!(i, idx, "Wrong index {} for leaf {}/{}.", idx, i, n);
}
}
}
idx
}
/// Computes the Merkle tree leaf index of a value in a given proof.
// TODO: This currently only works if the number of leaves is a power of two. With the
// `merkle_light` crate, it might not even be needed, though.
pub fn index_of_proof<T>(p: &Proof<T>) -> usize {
index_of_path(path_of_lemma(&p.lemma))
}


@ -11,9 +11,9 @@ use agreement;
use agreement::Agreement;
use broadcast;
use broadcast::{Broadcast, TargetedBroadcastMessage};
use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use proto::{AgreementMessage, BroadcastMessage};
use proto::AgreementMessage;
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
@ -39,7 +39,7 @@ pub enum Output<NodeUid> {
Agreement(AgreementMessage),
}
pub struct CommonSubset<NodeUid: Eq + Hash> {
pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
uid: NodeUid,
num_nodes: usize,
num_faulty_nodes: usize,
@ -59,7 +59,11 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
for uid0 in all_uids {
broadcast_instances.insert(
uid0.clone(),
Broadcast::new(uid.clone(), uid0.clone(), all_uids.clone())?,
Broadcast::new(
uid.clone(),
uid0.clone(),
all_uids.iter().cloned().collect(),
)?,
);
}


@ -8,7 +8,7 @@ use std::fmt::Debug;
/// recipients is unknown without further computation which is irrelevant to the
/// message delivery task.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
pub struct SourcedMessage<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
pub source: usize,
pub message: Message<T>,
}
@ -27,20 +27,14 @@ pub enum Target {
/// Message with a designated target.
#[derive(Clone, Debug, PartialEq)]
pub struct TargetedMessage<T: Clone + Debug + Send + Sync> {
pub struct TargetedMessage<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
pub target: Target,
pub message: Message<T>,
}
impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> {
impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> TargetedMessage<T> {
/// Initialises a message while checking parameter preconditions.
pub fn new(target: Target, message: Message<T>) -> Option<Self> {
match target {
Target::Node(i) if i == 0 => {
// Remote node indices start from 1.
None
}
_ => Some(TargetedMessage { target, message }),
}
pub fn new(target: Target, message: Message<T>) -> Self {
TargetedMessage { target, message }
}
}


@ -1,6 +1,7 @@
//! Construction of messages from protobuf buffers.
pub mod message;
use broadcast::BroadcastMessage;
use merkle::proof::{Lemma, Positioned, Proof};
use proto::message::*;
use protobuf::core::parse_from_bytes;
@ -12,20 +13,11 @@ use std::marker::{Send, Sync};
/// Kinds of message sent by nodes participating in consensus.
#[derive(Clone, Debug, PartialEq)]
pub enum Message<T: Send + Sync> {
pub enum Message<T: Send + Sync + AsRef<[u8]>> {
Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage),
}
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[derive(Clone, PartialEq)]
pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>),
Echo(Proof<T>),
Ready(Vec<u8>),
}
/// Wrapper for a byte array, whose `Debug` implementation outputs shortened hexadecimal strings.
pub struct HexBytes<'a>(pub &'a [u8]);
@ -33,46 +25,47 @@ impl<'a> fmt::Debug for HexBytes<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0.len() > 6 {
for byte in &self.0[..3] {
write!(f, "{:0x}", byte)?;
write!(f, "{:02x}", byte)?;
}
write!(f, "..")?;
for byte in &self.0[(self.0.len() - 3)..] {
write!(f, "{:0x}", byte)?;
write!(f, "{:02x}", byte)?;
}
} else {
for byte in self.0 {
write!(f, "{:0x}", byte)?;
write!(f, "{:02x}", byte)?;
}
}
Ok(())
}
}
struct HexProof<'a, T: 'a>(&'a Proof<T>);
/// Wrapper for a list of byte arrays, whose `Debug` implementation outputs shortened hexadecimal
/// strings.
pub struct HexList<'a, T: 'a>(pub &'a [T]);
impl<'a, T: Send + Sync + fmt::Debug> fmt::Debug for HexProof<'a, T> {
impl<'a, T: AsRef<[u8]>> fmt::Debug for HexList<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let v: Vec<_> = self.0.iter().map(|t| HexBytes(t.as_ref())).collect();
write!(f, "{:?}", v)
}
}
pub struct HexProof<'a, T: 'a>(pub &'a Proof<T>);
impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Proof {{ algorithm: {:?}, root_hash: {:?}, lemma for leaf #{}, value: {:?} }}",
self.0.algorithm,
HexBytes(&self.0.root_hash),
::broadcast::index_of_proof(self.0),
self.0.value
path_of_lemma(&self.0.lemma),
HexBytes(&self.0.value.as_ref())
)
}
}
impl<T: Send + Sync + fmt::Debug> fmt::Debug for BroadcastMessage<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)),
BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)),
BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)),
}
}
}
/// Messages sent during the binary Byzantine agreement stage.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum AgreementMessage {
@ -80,17 +73,14 @@ pub enum AgreementMessage {
Aux(bool),
}
impl<T: Send + Sync> Message<T> {
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> Message<T> {
/// Translation from protobuf to the regular type.
///
/// TODO: add an `Algorithm` field to `MessageProto`. Either `Algorithm` has
/// to be fully serialised and sent as a whole, or it can be passed over
/// using an ID and the `Eq` instance to discriminate the finite set of
/// algorithms in `ring::digest`.
pub fn from_proto(mut proto: message::MessageProto) -> Option<Self>
where
T: From<Vec<u8>>,
{
pub fn from_proto(mut proto: message::MessageProto) -> Option<Self> {
if proto.has_broadcast() {
BroadcastMessage::from_proto(
proto.take_broadcast(),
@ -105,10 +95,7 @@ impl<T: Send + Sync> Message<T> {
}
}
pub fn into_proto(self) -> MessageProto
where
T: Into<Vec<u8>>,
{
pub fn into_proto(self) -> MessageProto {
let mut m = MessageProto::new();
match self {
Message::Broadcast(b) => {
@ -125,10 +112,7 @@ impl<T: Send + Sync> Message<T> {
///
/// TODO: pass custom errors from down the chain of nested parsers as
/// opposed to returning `WireError::Other`.
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self>
where
T: From<Vec<u8>>,
{
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self> {
let r = parse_from_bytes::<MessageProto>(bytes).map(Self::from_proto);
match r {
@ -139,19 +123,13 @@ impl<T: Send + Sync> Message<T> {
}
/// Produce a protobuf representation of this `Message`.
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>>
where
T: Into<Vec<u8>>,
{
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>> {
self.into_proto().write_to_bytes()
}
}
impl<T: Send + Sync> BroadcastMessage<T> {
pub fn into_proto(self) -> BroadcastProto
where
T: Into<Vec<u8>>,
{
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> BroadcastMessage<T> {
pub fn into_proto(self) -> BroadcastProto {
let mut b = BroadcastProto::new();
match self {
BroadcastMessage::Value(p) => {
@ -173,10 +151,7 @@ impl<T: Send + Sync> BroadcastMessage<T> {
b
}
pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option<Self>
where
T: From<Vec<u8>>,
{
pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option<Self> {
if mp.has_value() {
mp.take_value()
.take_proof()
@ -223,10 +198,7 @@ impl AgreementMessage {
/// around the restriction of not being allowed to extend the implementation of
/// `Proof` outside its crate.
impl ProofProto {
pub fn from_proof<T>(proof: Proof<T>) -> Self
where
T: Into<Vec<u8>>,
{
pub fn from_proof<T: AsRef<[u8]>>(proof: Proof<T>) -> Self {
let mut proto = Self::new();
match proof {
@ -239,17 +211,17 @@ impl ProofProto {
} => {
proto.set_root_hash(root_hash);
proto.set_lemma(LemmaProto::from_lemma(lemma));
proto.set_value(value.into());
proto.set_value(value.as_ref().to_vec());
}
}
proto
}
pub fn into_proof<T>(mut self, algorithm: &'static Algorithm) -> Option<Proof<T>>
where
T: From<Vec<u8>>,
{
pub fn into_proof<T: From<Vec<u8>>>(
mut self,
algorithm: &'static Algorithm,
) -> Option<Proof<T>> {
if !self.has_lemma() {
return None;
}
@ -326,3 +298,19 @@ impl LemmaProto {
}
}
}
/// The path of the lemma, as a binary string.
fn path_of_lemma(mut lemma: &Lemma) -> String {
let mut result = String::new();
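    // Walk from the root toward the leaf: push '1' for a step into the right subtree
    // (sibling hash on the left) and '0' for a step into the left subtree.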
loop {
match lemma.sibling_hash {
None => (),
Some(Positioned::Left(_)) => result.push('1'),
Some(Positioned::Right(_)) => result.push('0'),
}
lemma = match lemma.sub_lemma.as_ref() {
Some(lemma) => lemma,
None => return result,
}
}
}


@ -57,7 +57,7 @@ impl<S: Read + Write> ProtoIo<S>
pub fn recv<T>(&mut self) -> Result<Message<T>, Error>
where
T: Clone + Send + Sync + From<Vec<u8>>, // + Into<Vec<u8>>
T: Clone + Send + Sync + AsRef<[u8]> + From<Vec<u8>>,
{
let mut stream = protobuf::CodedInputStream::new(&mut self.stream);
// Read magic number
@ -69,7 +69,7 @@ impl<S: Read + Write> ProtoIo<S>
pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
where
T: Clone + Send + Sync + Into<Vec<u8>>,
T: Clone + Send + Sync + AsRef<[u8]> + From<Vec<u8>>,
{
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
@ -85,6 +85,7 @@ impl<S: Read + Write> ProtoIo<S>
#[cfg(test)]
mod tests {
use broadcast::BroadcastMessage;
use proto_io::*;
use std::io::Cursor;


@ -5,16 +5,15 @@ extern crate hbbft;
extern crate log;
extern crate crossbeam;
extern crate crossbeam_channel;
extern crate env_logger;
extern crate merkle;
extern crate rand;
extern crate simple_logger;
use rand::Rng;
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
use hbbft::proto::BroadcastMessage;
use hbbft::broadcast::{Broadcast, BroadcastMessage, BroadcastTarget, TargetedBroadcastMessage};
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
struct NodeId(usize);
@ -168,7 +167,7 @@ impl Adversary for ProposeAdversary {
}
self.has_sent = true;
let value = b"Fake news";
let node_ids: HashSet<NodeId> = self.adv_nodes
let node_ids: BTreeSet<NodeId> = self.adv_nodes
.iter()
.cloned()
.chain(self.good_nodes.iter().cloned())
@ -191,7 +190,7 @@ impl<A: Adversary> TestNetwork<A> {
/// Creates a new network with `good_num` good nodes, and the given `adversary` controlling
/// `adv_num` nodes.
fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork<A> {
let node_ids: HashSet<NodeId> = (0..(good_num + adv_num)).map(NodeId).collect();
let node_ids: BTreeSet<NodeId> = (0..(good_num + adv_num)).map(NodeId).collect();
let new_broadcast = |id: NodeId| {
let bc =
Broadcast::new(id, NodeId(0), node_ids.clone()).expect("Instantiate broadcast");
@ -271,8 +270,8 @@ impl<A: Adversary> TestNetwork<A> {
/// Broadcasts a value from node 0 and expects all good nodes to receive it.
fn test_broadcast<A: Adversary>(mut network: TestNetwork<A>, proposed_value: &[u8]) {
// TODO: This returns an error in all but the first test.
let _ = simple_logger::init_with_level(log::Level::Debug);
// This returns an error in all but the first test.
let _ = env_logger::try_init();
// Make node 0 propose the value.
network.propose_value(NodeId(0), proposed_value.to_vec());
@ -288,36 +287,46 @@ fn test_broadcast<A: Adversary>(mut network: TestNetwork<A>, proposed_value: &[u
}
}
// TODO: Unignore once equal shards don't cause problems anymore.
#[test]
#[ignore]
fn test_8_broadcast_equal_leaves() {
fn test_8_broadcast_equal_leaves_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
// Space is ASCII character 32. So 32 spaces will create shards that are all equal, even if the
// length of the value is inserted.
test_broadcast(TestNetwork::new(8, 0, adversary), &[b' '; 32]);
}
// TODO: Unignore once node numbers are supported that are not powers of two.
#[test]
#[ignore]
fn test_13_broadcast_nodes_random_delivery() {
fn test_13_broadcast_nodes_random_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
test_broadcast(TestNetwork::new(13, 0, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_random_delivery() {
fn test_4_broadcast_nodes_random_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
test_broadcast(TestNetwork::new(4, 0, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_random_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_first_delivery() {
fn test_11_5_broadcast_nodes_first_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::First);
test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo");
}
#[test]
fn test_3_1_broadcast_nodes_random_delivery_adv_propose() {
let good_nodes: BTreeSet<NodeId> = (0..3).map(NodeId).collect();
let adv_nodes: BTreeSet<NodeId> = (3..4).map(NodeId).collect();
let adversary = ProposeAdversary::new(MessageScheduler::Random, good_nodes, adv_nodes);
test_broadcast(TestNetwork::new(3, 1, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_random_delivery_adv_propose() {
let good_nodes: BTreeSet<NodeId> = (0..11).map(NodeId).collect();