Merge pull request #13 from poanetwork/afck--remove-channels

Fix broadcast and example, enable more tests.
Vladimir Komendantskiy 2018-05-09 16:47:01 +01:00 committed by GitHub
commit 7748c9073b
14 changed files with 596 additions and 493 deletions

View File

@@ -17,9 +17,11 @@ env:
global:
- RUST_BACKTRACE=1
- RUSTFLAGS="-D warnings"
script:
- cargo fmt -- --write-mode=diff
- cargo clippy -- -D clippy
- cargo clippy --tests -- -D clippy
- cargo check --tests
- cargo test
script:
# TODO: This currently fails, claiming that `src/proto/message.rs` does not
# exist. Re-enable once the problem is resolved.
# - cargo fmt -- --write-mode=diff
- cargo clippy -- -D clippy
- cargo clippy --tests -- -D clippy
- cargo check --tests
- cargo test

View File

@@ -4,8 +4,8 @@ version = "0.1.0"
authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
[dependencies]
env_logger = "0.5.10"
log = "0.4.1"
simple_logger = "0.5"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
ring = "^0.12"

View File

@@ -4,10 +4,10 @@ extern crate crossbeam;
#[macro_use]
extern crate crossbeam_channel;
extern crate docopt;
extern crate env_logger;
extern crate hbbft;
#[macro_use]
extern crate log;
extern crate simple_logger;
mod network;
@@ -55,7 +55,7 @@ fn parse_args() -> Args {
}
pub fn main() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
env_logger::init();
let args: Args = parse_args();
println!("{:?}", args);
let node = Node::new(args.bind_address, args.remote_addresses, args.value);

View File

@@ -25,7 +25,7 @@ impl From<io::Error> for Error {
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> {
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> {
/// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<T>>,
/// The receive side of the channel to the comms thread.
@@ -36,7 +36,7 @@ pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + I
pub node_index: usize,
}
impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> CommsTask<'a, T> {
impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> CommsTask<'a, T> {
pub fn new(
tx: &'a Sender<SourcedMessage<T>>,
rx: &'a Receiver<Message<T>>,
@@ -72,7 +72,6 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> Co
loop {
// Receive a multicast message from the manager thread.
let message = rx.recv().unwrap();
debug!("Node {} <- {:?}", node_index, message);
// Forward the message to the remote node.
io1.send(message).unwrap();
}
@@ -83,7 +82,6 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> Co
loop {
match self.io.recv() {
Ok(message) => {
debug!("Node {} -> {:?}", node_index, message);
tx.send(SourcedMessage {
source: node_index,
message,

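The bound change from `Into<Vec<u8>>` to `AsRef<[u8]>` lets the comms task serialize a message payload without consuming it. A minimal sketch of the difference (the function name is illustrative, not from the commit):

```rust
// Sketch only: `AsRef<[u8]>` borrows the bytes of a payload, whereas
// `Into<Vec<u8>>` would take ownership and consume it.
fn serialize_payload<T: AsRef<[u8]>>(payload: &T) -> Vec<u8> {
    payload.as_ref().to_vec() // borrow, copy, and leave `payload` intact
}

fn main() {
    let payload: Vec<u8> = vec![1, 2, 3];
    let wire = serialize_payload(&payload);
    assert_eq!(wire, payload); // `payload` is still usable after sending
}
```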
View File

@@ -1,6 +1,6 @@
//! Connection data and initiation routines.
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::io::BufReader;
use std::net::{SocketAddr, TcpListener, TcpStream};
@@ -8,61 +8,47 @@ use std::net::{SocketAddr, TcpListener, TcpStream};
pub struct Connection {
pub stream: TcpStream,
pub reader: BufReader<TcpStream>,
pub node_str: String,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
pub fn new(stream: TcpStream, node_str: String) -> Self {
Connection {
// Create a read buffer of 1K bytes.
reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()),
stream,
node_str,
}
}
}
/// Connect this node to remote peers. A vector of successful connections is
/// returned.
pub fn make(bind_address: &SocketAddr, remote_addresses: &HashSet<SocketAddr>) -> Vec<Connection> {
// Connected remote nodes.
// let mut connected: Vec<SocketAddr> = Vec::new();
/// Connect this node to remote peers. A vector of successful connections is returned, as well as
/// our own node ID.
pub fn make(
bind_address: &SocketAddr,
remote_addresses: &HashSet<SocketAddr>,
) -> (String, Vec<Connection>) {
// Listen for incoming connections on a given TCP port.
let bind_address = bind_address;
let listener = TcpListener::bind(bind_address).unwrap();
// Initialise initial connection states.
let mut connections: Vec<Option<Connection>> = (0..remote_addresses.len())
.into_iter()
.map(|_| None)
.collect();
let listener = TcpListener::bind(bind_address).expect("start listener");
let here_str = format!("{}", bind_address);
// Use a `BTreeMap` to make sure we all iterate in the same order.
let remote_by_str: BTreeMap<String, _> = remote_addresses
.iter()
.map(|addr| (format!("{}", addr), addr))
.filter(|(there_str, _)| *there_str != here_str)
.collect();
// Wait for all nodes with larger addresses to connect.
for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address);
if here_str < there_str {
connections[n] = match listener.accept() {
Ok((stream, _)) => {
info!("Connected to {}", there_str);
Some(Connection::new(stream))
}
Err(_) => None,
}
}
}
// Try to connect to all nodes with smaller addresses.
for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address);
if here_str > there_str {
connections[n] = match TcpStream::connect(address) {
Ok(stream) => {
info!("Connected to {}", there_str);
Some(Connection::new(stream))
}
Err(_) => None,
}
}
}
// remove Nones from connections
connections.into_iter().filter_map(|c| c).collect()
let connections = remote_by_str
.into_iter()
.map(|(there_str, address)| {
let tcp_conn = if here_str < there_str {
listener.accept().expect("failed to connect").0
} else {
TcpStream::connect(address).expect("failed to connect")
};
Connection::new(tcp_conn, there_str.to_string())
})
.collect();
(here_str, connections)
}

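The rewrite makes connection setup deterministic: every node formats the same addresses, walks them in `BTreeMap` order, and for each pair the lexicographically smaller address accepts while the larger one dials. A standalone sketch of the pairing rule (the function name is illustrative):

```rust
// Sketch only: for each pair of peers exactly one side dials the other.
fn should_accept(here: &str, there: &str) -> bool {
    // The node with the lexicographically smaller address listens;
    // the one with the larger address initiates the TCP connection.
    here < there
}

fn main() {
    // From 127.0.0.1:5000's point of view, peer :5001 will dial us...
    assert!(should_accept("127.0.0.1:5000", "127.0.0.1:5001"));
    // ...while from :5001's point of view, we dial :5000.
    assert!(!should_accept("127.0.0.1:5001", "127.0.0.1:5000"));
}
```

Because every node iterates the peers in the same `BTreeMap` order, each `accept` is matched with the right `connect`, without the retry-and-filter logic of the old version.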
View File

@@ -9,13 +9,13 @@ use std::fmt::Debug;
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<T: Clone + Debug + Send + Sync> {
pub struct Messaging<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
/// Transmit sides of message channels to comms threads.
txs_to_comms: Vec<Sender<Message<T>>>,
/// Receive side of the routed message channel from comms threads.
rx_from_comms: Receiver<SourcedMessage<T>>,
/// Transmit sides of message channels to algo threads.
txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
/// Transmit sides of message channels to algo thread.
tx_to_algo: Sender<SourcedMessage<T>>,
/// Receive side of the routed message channel from comms threads.
rx_from_algo: Receiver<TargetedMessage<T>>,
@@ -23,9 +23,9 @@ pub struct Messaging<T: Clone + Debug + Send + Sync> {
rxs_to_comms: Vec<Receiver<Message<T>>>,
/// TX handle to be used by comms tasks.
tx_from_comms: Sender<SourcedMessage<T>>,
/// RX handles to be used by algo tasks.
rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
/// TX handle to be used by algo tasks.
/// RX handles to be used by algo task.
rx_to_algo: Receiver<SourcedMessage<T>>,
/// TX handle to be used by algo task.
tx_from_algo: Sender<TargetedMessage<T>>,
/// Control channel used to stop the listening thread.
@@ -33,23 +33,17 @@ pub struct Messaging<T: Clone + Debug + Send + Sync> {
stop_rx: Receiver<()>,
}
impl<T: Clone + Debug + Send + Sync> Messaging<T> {
impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> Messaging<T> {
/// Initialises all the required TX and RX handles for a network of
/// `num_nodes` consensus nodes.
pub fn new(num_nodes: usize) -> Self {
let to_comms: Vec<_> = (0..num_nodes - 1)
.map(|_| unbounded::<Message<T>>())
.collect();
let to_comms: Vec<_> = (0..num_nodes).map(|_| unbounded::<Message<T>>()).collect();
let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_comms: Vec<Receiver<Message<T>>> =
to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_comms, rx_from_comms) = unbounded();
let to_algo: Vec<_> = (0..num_nodes)
.map(|_| unbounded::<SourcedMessage<T>>())
.collect();
let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_to_algo, rx_to_algo) = unbounded();
let (tx_from_algo, rx_from_algo) = unbounded();
let (stop_tx, stop_rx) = bounded(1);
@@ -58,13 +52,13 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
// internally used handles
txs_to_comms,
rx_from_comms,
txs_to_algo,
tx_to_algo,
rx_from_algo,
// externally used handles
rxs_to_comms,
tx_from_comms,
rxs_to_algo,
rx_to_algo,
tx_from_algo,
stop_tx,
@@ -80,8 +74,8 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
&self.tx_from_comms
}
pub fn rxs_to_algo(&self) -> &Vec<Receiver<SourcedMessage<T>>> {
&self.rxs_to_algo
pub fn rx_to_algo(&self) -> &Receiver<SourcedMessage<T>> {
&self.rx_to_algo
}
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
@@ -100,7 +94,7 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
{
let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned();
let txs_to_algo = self.txs_to_algo.to_owned();
let tx_to_algo = self.tx_to_algo.to_owned();
let rx_from_algo = self.rx_from_algo.to_owned();
let stop_rx = self.stop_rx.to_owned();
@@ -128,8 +122,7 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
} else {
result
}
}).map_err(Error::from);
@@ -138,16 +131,10 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
target: Target::Node(i),
message
} => {
// Remote node indices start from 1.
assert!(i > 0);
// Convert node index to vector index.
let i = i - 1;
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(message.clone())
.map_err(Error::from)
}
else {
} else {
Err(Error::NoSuchTarget)
};
}
@@ -156,14 +143,7 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
result
}
}).map_err(Error::from)
result = tx_to_algo.send(message.clone()).map_err(Error::from)
},
recv(stop_rx, _) => {
// Flag the thread ready to exit.

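With a single broadcast instance per node, the vector of algorithm channels collapses into one `tx_to_algo`/`rx_to_algo` pair. A condensed sketch of the resulting routing shape, assuming a recent `crossbeam_channel`; the router thread and `u8` payload stand in for the spawned select loop and `SourcedMessage<T>`:

```rust
// Sketch only: messages from the comms side are funneled into the one
// algorithm channel instead of being cloned into a vector of channels.
use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (tx_from_comms, rx_from_comms) = unbounded::<u8>();
    let (tx_to_algo, rx_to_algo) = unbounded::<u8>();

    let router = thread::spawn(move || {
        // Previously a fold over `txs_to_algo`; now every sourced message
        // goes to the single algorithm task.
        for msg in rx_from_comms.iter() {
            if tx_to_algo.send(msg).is_err() {
                break;
            }
        }
    });

    tx_from_comms.send(42).unwrap();
    drop(tx_from_comms); // close the channel so the router loop terminates
    router.join().unwrap();
    assert_eq!(rx_to_algo.recv(), Ok(42));
}
```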
View File

@@ -37,9 +37,9 @@
use crossbeam;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::{io, iter, process, thread, time};
use hbbft::broadcast;
use network::commst;
@@ -50,7 +50,6 @@ use network::messaging::Messaging;
pub enum Error {
IoError(io::Error),
CommsError(commst::Error),
NotImplemented,
}
impl From<io::Error> for Error {
@@ -90,54 +89,65 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
/// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, Error> {
let value = &self.value;
let connections = connection::make(&self.addr, &self.remotes);
let (our_str, connections) = connection::make(&self.addr, &self.remotes);
let mut node_strs: Vec<String> = iter::once(our_str.clone())
.chain(connections.iter().map(|c| c.node_str.clone()))
.collect();
node_strs.sort();
debug!("Nodes: {:?}", node_strs);
let proposer_id = 0;
let our_id = node_strs.binary_search(&our_str).unwrap();
let num_nodes = connections.len() + 1;
if value.is_some() != (our_id == proposer_id) {
panic!("Exactly the first node must propose a value.");
}
// Initialise the message delivery system and obtain TX and RX handles.
let messaging: Messaging<Vec<u8>> = Messaging::new(num_nodes);
let rxs_to_comms = messaging.rxs_to_comms();
let tx_from_comms = messaging.tx_from_comms();
let rxs_to_algo = messaging.rxs_to_algo();
let rx_to_algo = messaging.rx_to_algo();
let tx_from_algo = messaging.tx_from_algo();
let stop_tx = messaging.stop_tx();
// All spawned threads will have exited by the end of the scope.
crossbeam::scope(|scope| {
// Start the centralised message delivery system.
let msg_handle = messaging.spawn(scope);
let mut broadcast_handles = Vec::new();
let _msg_handle = messaging.spawn(scope);
// Associate a broadcast instance with this node. This instance will
// broadcast the proposed value. There is no remote node
// corresponding to this instance, and no dedicated comms task. The
// node index is 0.
let rx_to_algo0 = &rxs_to_algo[0];
broadcast_handles.push(scope.spawn(move || {
let broadcast_handle = scope.spawn(move || {
match broadcast::Instance::new(
tx_from_algo,
rx_to_algo0,
rx_to_algo,
value.to_owned(),
num_nodes,
0,
(0..num_nodes).collect(),
our_id,
proposer_id,
).run()
{
Ok(t) => {
debug!(
"Broadcast instance 0 succeeded: {}",
println!(
"Broadcast succeeded! Node {} output: {}",
our_id,
String::from_utf8(T::into(t)).unwrap()
);
}
Err(e) => error!("Broadcast instance 0: {:?}", e),
Err(e) => error!("Broadcast instance: {:?}", e),
}
}));
});
// Start a comms task for each connection. Node indices of those
// tasks are 1 through N where N is the number of connections.
for (i, c) in connections.iter().enumerate() {
// Receive side of a single-consumer channel from algorithm
// actor tasks to the comms task.
let rx_to_comms = &rxs_to_comms[i];
let node_index = i + 1;
let node_index = if c.node_str < our_str { i } else { i + 1 };
let rx_to_comms = &rxs_to_comms[node_index];
scope.spawn(move || {
match commst::CommsTask::new(
@@ -152,35 +162,14 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
Err(e) => error!("Comms task {}: {:?}", node_index, e),
}
});
// Associate a broadcast instance to the above comms task.
let rx_to_algo = &rxs_to_algo[node_index];
broadcast_handles.push(scope.spawn(move || {
match broadcast::Instance::new(
tx_from_algo,
rx_to_algo,
None,
num_nodes,
node_index,
).run()
{
Ok(t) => {
debug!(
"Broadcast instance {} succeeded: {}",
node_index,
String::from_utf8(T::into(t)).unwrap()
);
}
Err(e) => error!("Broadcast instance {}: {:?}", node_index, e),
}
}));
}
// Wait for the broadcast instances to finish before stopping the
// messaging task.
for h in broadcast_handles {
h.join();
}
broadcast_handle.join();
// Wait another second so that pending messages get sent out.
thread::sleep(time::Duration::from_secs(1));
// Stop the messaging task.
stop_tx
@@ -190,30 +179,14 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
})
.unwrap();
match msg_handle.join() {
Ok(()) => debug!("Messaging stopped OK"),
Err(e) => debug!("Messaging error: {:?}", e),
}
// TODO: continue the implementation of the asynchronous common
// subset algorithm.
Err(Error::NotImplemented)
process::exit(0);
// TODO: Exit cleanly.
// match msg_handle.join() {
// Ok(()) => debug!("Messaging stopped OK"),
// Err(e) => debug!("Messaging error: {:?}", e),
// }
// Err(Error::NotImplemented)
}) // end of thread scope
}
}
// #[cfg(test)]
// mod tests {
// use std::collections::HashSet;
// use node;
// /// Test that the node works to completion.
// #[test]
// fn test_node_0() {
// let node = node::Node::new("127.0.0.1:10000".parse().unwrap(),
// HashSet::new(),
// Some("abc".as_bytes().to_vec()));
// let result = node.run();
// assert!(match result { Err(node::Error::NotImplemented) => true,
// _ => false });
// }
// }

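Node indices are no longer hard-coded: each node sorts the formatted addresses of the whole network and takes its own position in that order as its ID, so all nodes agree on which one is node 0, the proposer. A minimal sketch of the derivation:

```rust
// Sketch only: every node computes the same sorted list, so the indices
// are consistent across the network without any extra coordination.
fn main() {
    let our_str = "127.0.0.1:5002".to_string();
    let mut node_strs = vec![
        "127.0.0.1:5001".to_string(),
        our_str.clone(),
        "127.0.0.1:5000".to_string(),
    ];
    node_strs.sort();
    let our_id = node_strs.binary_search(&our_str).unwrap();
    assert_eq!(our_id, 2); // ":5002" sorts after ":5000" and ":5001"
    // Only the node whose index equals `proposer_id` (0) may pass a value.
}
```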
View File

@@ -0,0 +1,15 @@
#! /bin/bash
export RUST_LOG=hbbft=debug
cargo build --example consensus-node
cargo run --example consensus-node -- --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --value=Foo &
sleep 1
cargo run --example consensus-node -- --bind-address=127.0.0.1:5001 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 &
sleep 1
cargo run --example consensus-node -- --bind-address=127.0.0.1:5002 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 &
sleep 1
cargo run --example consensus-node -- --bind-address=127.0.0.1:5003 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5004 &
sleep 1
cargo run --example consensus-node -- --bind-address=127.0.0.1:5004 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 &
wait

View File

@@ -1,12 +1,11 @@
//! Reliable broadcast algorithm instance.
use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use merkle::proof::{Lemma, Positioned, Proof};
use merkle::{Hashable, MerkleTree};
use proto::*;
use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::hash::Hash;
use std::iter;
use std::marker::{Send, Sync};
@@ -19,6 +18,41 @@ type ProposedValue = Vec<u8>;
type MessageQueue<NodeUid> = VecDeque<TargetedBroadcastMessage<NodeUid>>;
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[derive(Clone, PartialEq)]
pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>),
Echo(Proof<T>),
Ready(Vec<u8>),
}
impl BroadcastMessage<ProposedValue> {
fn target_all<NodeUid>(self) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: self,
}
}
fn target_node<NodeUid>(self, id: NodeUid) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: BroadcastTarget::Node(id),
message: self,
}
}
}
impl<T: Send + Sync + Debug + AsRef<[u8]>> fmt::Debug for BroadcastMessage<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)),
BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)),
BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)),
}
}
}
/// A `BroadcastMessage` to be sent out, together with a target.
#[derive(Clone, Debug)]
pub struct TargetedBroadcastMessage<NodeUid> {
@@ -26,14 +60,11 @@ pub struct TargetedBroadcastMessage<NodeUid> {
pub message: BroadcastMessage<ProposedValue>,
}
impl TargetedBroadcastMessage<usize> {
pub fn into_targeted_message(self) -> TargetedMessage<ProposedValue> {
impl From<TargetedBroadcastMessage<usize>> for TargetedMessage<ProposedValue> {
fn from(msg: TargetedBroadcastMessage<usize>) -> TargetedMessage<ProposedValue> {
TargetedMessage {
target: match self.target {
BroadcastTarget::All => Target::All,
BroadcastTarget::Node(node) => Target::Node(node),
},
message: Message::Broadcast(self.message),
target: msg.target.into(),
message: Message::Broadcast(msg.message),
}
}
}
@@ -45,25 +76,93 @@ pub enum BroadcastTarget<NodeUid> {
Node(NodeUid),
}
struct BroadcastState {
root_hash: Option<Vec<u8>>,
leaf_values: Vec<Option<Box<[u8]>>>,
leaf_values_num: usize,
echo_num: usize,
readys: HashMap<Vec<u8>, usize>,
impl From<BroadcastTarget<usize>> for Target {
fn from(bt: BroadcastTarget<usize>) -> Target {
match bt {
BroadcastTarget::All => Target::All,
BroadcastTarget::Node(node) => Target::Node(node),
}
}
}
struct BroadcastState<NodeUid: Eq + Hash + Ord> {
/// Whether we have already multicast `Echo`.
echo_sent: bool,
/// Whether we have already multicast `Ready`.
ready_sent: bool,
ready_to_decode: bool,
/// Whether we have already output a value.
has_output: bool,
/// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<NodeUid, Proof<Vec<u8>>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<NodeUid, Vec<u8>>,
}
impl<NodeUid: Eq + Hash + Ord> BroadcastState<NodeUid> {
/// Returns the number of nodes that have sent us an `Echo` message with this hash.
fn count_echos(&self, hash: &[u8]) -> usize {
self.echos
.values()
.filter(|p| p.root_hash.as_slice() == hash)
.count()
}
/// Returns the number of nodes that have sent us a `Ready` message with this hash.
fn count_readys(&self, hash: &[u8]) -> usize {
self.readys
.values()
.filter(|h| h.as_slice() == hash)
.count()
}
}
/// Reliable Broadcast algorithm instance.
pub struct Broadcast<NodeUid: Eq + Hash> {
///
/// The Reliable Broadcast Protocol assumes a network of `N` nodes that send signed messages to
/// each other, with at most `f` of them malicious, where `3 * f < N`. Handling the networking and
/// signing is the responsibility of this crate's user: only when a message has been verified to be
/// "from node i", it can be handed to the `Broadcast` instance. One of the nodes is the "proposer"
/// who sends a value. Under the above conditions, the protocol guarantees that either all or none
/// of the good nodes output a value, and that if the proposer is good, all good nodes output the
/// proposed value.
///
/// The algorithm works as follows:
///
/// * The proposer uses a Reed-Solomon code to split the value into `N` chunks, `f + 1` of which
/// suffice to reconstruct the value. These chunks are put into a Merkle tree, so that with the
/// tree's root hash `h`, branch `bi` and chunk `si`, the `i`-th chunk `si` can be verified by
/// anyone to belong to the Merkle tree with root hash `h`. These values are "proof" number `i`:
/// `pi`.
/// * The proposer sends `Value(pi)` to node `i`. It translates to: "I am the proposer, and `pi`
/// contains the `i`-th share of my value."
/// * Every (good) node that receives `Value(pi)` from the proposer sends it on to everyone else as
/// `Echo(pi)`. An `Echo` translates to: "I have received `pi` directly from the proposer." If the
/// proposer sends another `Value` message, that is ignored.
/// * So every node that has received at least `f + 1` `Echo` messages with the same root
/// hash will be able to decode a value.
/// * Every node that has received `N - f` `Echo`s with the same root hash from different nodes
/// knows that at least `f + 1` _good_ nodes have sent an `Echo` with that hash to everyone, and
/// therefore everyone will eventually receive at least `f + 1` of them. So upon receiving `N - f`
/// `Echo`s, they send a `Ready(h)` to everyone to indicate that. `Ready` translates to: "I know
/// that everyone will eventually be able to decode the value." Moreover, since every good node
/// only ever sends one kind of `Echo` message, this cannot happen for two different root hashes.
/// * Even without enough `Echo` messages, if a node receives `f + 1` `Ready` messages, it knows
/// that at least one _good_ node has sent `Ready`. It therefore also knows that everyone will be
/// able to decode eventually, and multicasts `Ready` itself.
/// * If a node has received `2 * f + 1` `Ready`s (with matching root hash) from different nodes,
/// it knows that at least `f + 1` _good_ nodes have sent it. Therefore, every good node will
/// eventually receive `f + 1`, and multicast it itself. Therefore, every good node will eventually
/// receive `2 * f + 1` `Ready`s, too. _And_ we know at this point that every good node will
/// eventually be able to decode (i.e. receive at least `f + 1` `Echo` messages).
/// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value,
/// knowing that every other good node will eventually do the same.
pub struct Broadcast<NodeUid: Eq + Hash + Ord> {
/// The UID of this node.
our_id: NodeUid,
/// The UID of the sending node.
proposer_id: NodeUid,
/// UIDs of all nodes for iteration purposes.
all_uids: HashSet<NodeUid>,
all_uids: BTreeSet<NodeUid>,
num_nodes: usize,
num_faulty_nodes: usize,
data_shard_num: usize,
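The doc comment above pins down the protocol's thresholds. As a worked example for the smallest nontrivial network, using the same formulas as `Broadcast::new` (plain arithmetic, not code from this commit):

```rust
fn main() {
    let num_nodes = 4usize; // N
    let num_faulty = (num_nodes - 1) / 3; // f = 1, as in `Broadcast::new`

    // Reed-Solomon splits the value into N shards, of which `N - 2f`
    // data shards suffice to reconstruct it.
    let data_shard_num = num_nodes - 2 * num_faulty;
    assert_eq!(data_shard_num, 2);

    // Multicast `Ready` after `N - f` matching `Echo`s from distinct nodes...
    assert_eq!(num_nodes - num_faulty, 3);
    // ...or after `f + 1` matching `Ready`s (at least one from a good node).
    assert_eq!(num_faulty + 1, 2);
    // Output once `2f + 1` `Ready`s have arrived and decoding is possible.
    assert_eq!(2 * num_faulty + 1, 3);
}
```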
@@ -71,16 +170,16 @@ pub struct Broadcast<NodeUid: Eq + Hash> {
/// All the mutable state is confined to the `state` field. This allows to
/// mutate state even when the broadcast instance is referred to by an
/// immutable reference.
state: RwLock<BroadcastState>,
state: RwLock<BroadcastState<NodeUid>>,
}
impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
/// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
/// from node `proposer_id`.
pub fn new(
our_id: NodeUid,
proposer_id: NodeUid,
all_uids: HashSet<NodeUid>,
all_uids: BTreeSet<NodeUid>,
) -> Result<Self, Error> {
let num_nodes = all_uids.len();
let num_faulty_nodes = (num_nodes - 1) / 3;
@@ -97,14 +196,11 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
data_shard_num,
coding,
state: RwLock::new(BroadcastState {
root_hash: None,
leaf_values: vec![None; num_nodes],
leaf_values_num: 0,
echo_num: 0,
readys: HashMap::new(),
echo_sent: false,
ready_sent: false,
ready_to_decode: false,
has_output: false,
echos: BTreeMap::new(),
readys: BTreeMap::new(),
}),
})
}
@@ -114,23 +210,18 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
if self.our_id != self.proposer_id {
return Err(Error::UnexpectedMessage);
}
let mut state = self.state.write().unwrap();
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
self.send_shards(value).map(|(proof, remote_messages)| {
// Record the first proof as if it were sent by the node to itself.
let h = proof.root_hash.clone();
// Save the leaf value for reconstructing the tree later.
state.leaf_values[index_of_proof(&proof)] =
Some(proof.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
state.root_hash = Some(h);
remote_messages
})
let (proof, value_msgs) = self.send_shards(value)?;
// TODO: We'd actually need to return the output here, if it was only one node. Should that
// use-case be supported?
let state = self.state.write().unwrap();
let (_, echo_msgs) = self.handle_value(&self.our_id, proof, state)?;
Ok(value_msgs.into_iter().chain(echo_msgs).collect())
}
/// Returns this node's ID.
pub fn our_id(&self) -> &NodeUid {
&self.our_id
}
@@ -143,7 +234,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
fn send_shards(
&self,
mut value: ProposedValue,
) -> Result<(Proof<ProposedValue>, MessageQueue<NodeUid>), Error> {
) -> Result<(Proof<Vec<u8>>, MessageQueue<NodeUid>), Error> {
let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count();
@@ -172,14 +263,20 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
// Convert the iterator over slices into a vector of slices.
let mut shards: Vec<&mut [u8]> = shards_iter.collect();
debug!("Shards before encoding: {:?}", shards);
debug!("Shards before encoding: {:?}", HexList(&shards));
// Construct the parity chunks/shards
self.coding.encode(&mut shards)?;
debug!("Shards: {:?}", shards);
debug!("Shards: {:?}", HexList(&shards));
let shards_t: Vec<ProposedValue> = shards.into_iter().map(|s| s.to_vec()).collect();
// TODO: `MerkleTree` generates the wrong proof if a leaf occurs more than once, so we
// prepend an "index byte" to each shard. Consider using the `merkle_light` crate instead.
let shards_t: Vec<ProposedValue> = shards
.into_iter()
.enumerate()
.map(|(i, s)| iter::once(i as u8).chain(s.iter().cloned()).collect())
.collect();
// Convert the Merkle tree into a partial binary tree for later
// deconstruction into compound branches.
@@ -188,23 +285,19 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
// Default result in case of `gen_proof` error.
let mut result = Err(Error::ProofConstructionFailed);
let mut outgoing = VecDeque::new();
assert_eq!(self.num_nodes, mtree.iter().count());
// Send each proof to a node.
// TODO: This generates the wrong proof if a leaf occurs more than once. Consider using the
// `merkle_light` crate instead.
for (leaf_value, uid) in mtree.iter().zip(self.all_uids.clone()) {
for (leaf_value, uid) in mtree.iter().zip(&self.all_uids) {
let proof = mtree
.gen_proof(leaf_value.to_vec())
.ok_or(Error::ProofConstructionFailed)?;
if uid == self.our_id {
if *uid == self.our_id {
// The proof is addressed to this node.
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::Node(uid),
message: BroadcastMessage::Value(proof),
});
outgoing.push_back(BroadcastMessage::Value(proof).target_node(uid.clone()));
}
}
@@ -217,11 +310,14 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
sender_id: &NodeUid,
message: BroadcastMessage<ProposedValue>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
if !self.all_uids.contains(sender_id) {
return Err(Error::UnknownSender);
}
let state = self.state.write().unwrap();
match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p, state),
BroadcastMessage::Echo(p) => self.handle_echo(p, state),
BroadcastMessage::Ready(hash) => self.handle_ready(hash, state),
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p, state),
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash, state),
}
}
@@ -229,159 +325,166 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
fn handle_value(
&self,
sender_id: &NodeUid,
p: Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
// If the sender is not the proposer, this is not the first `Value` or the proof is invalid,
// ignore.
if *sender_id != self.proposer_id {
info!(
"Node {:?} received Value from {:?} instead of {:?}.",
self.our_id, sender_id, self.proposer_id
);
return Ok((None, VecDeque::new()));
}
// Initialize the root hash if not already initialised.
if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone());
debug!(
"Node {:?} Value root hash {:?}",
self.our_id,
HexBytes(&p.root_hash)
);
if state.echo_sent {
info!("Node {:?} received multiple Values.", self.our_id);
return Ok((None, VecDeque::new()));
}
if !self.validate_proof(&p, &self.our_id) {
return Ok((None, VecDeque::new()));
}
if state.root_hash.as_ref().map_or(false, |h| p.validate(h)) {
// TODO: Should messages failing this be echoed at all?
// Save the leaf value for reconstructing the tree later.
let idx = index_of_proof(&p);
state.leaf_values[idx] = Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
}
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
state.echo_sent = true;
let (output, echo_msgs) = self.handle_echo(&self.our_id, p.clone(), state)?;
let msgs = iter::once(BroadcastMessage::Echo(p).target_all())
.chain(echo_msgs)
.collect();
// Enqueue a broadcast of an echo of this proof.
let msgs = VecDeque::from(vec![TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Echo(p.clone()),
}]);
let (output, echo_msgs) = self.handle_echo(p, state)?;
Ok((output, msgs.into_iter().chain(echo_msgs).collect()))
Ok((output, msgs))
}
/// Handles a received echo and verifies the proof it contains.
/// Handles a received `Echo` message.
fn handle_echo(
&self,
p: Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone());
debug!(
"Node {:?} Echo root hash {:?}",
self.our_id, state.root_hash
// If the proof is invalid or the sender has already sent `Echo`, ignore.
if state.echos.contains_key(sender_id) {
info!(
"Node {:?} received multiple Echos from {:?}.",
self.our_id, sender_id,
);
}
// Call validate with the root hash as argument.
let h = if let Some(h) = state.root_hash.clone() {
h
} else {
error!("Broadcast/{:?} root hash not initialised", self.our_id);
return Ok((None, VecDeque::new()));
};
if !p.validate(h.as_slice()) {
debug!("Broadcast/{:?} cannot validate Echo {:?}", self.our_id, p);
}
if !self.validate_proof(&p, sender_id) {
return Ok((None, VecDeque::new()));
}
state.echo_num += 1;
// Save the leaf value for reconstructing the tree later.
let idx = index_of_proof(&p);
state.leaf_values[idx] = Some(p.value.into_boxed_slice());
state.leaf_values_num += 1;
let hash = p.root_hash.clone();
// Upon receiving 2f + 1 matching READY(h)
// messages, wait for N - 2f ECHO messages,
// then decode v. Return the decoded v to ACS.
if state.leaf_values_num < self.num_nodes - self.num_faulty_nodes {
return Ok((None, VecDeque::new()));
}
// TODO: Only decode once. Don't repeat for every ECHO message.
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
&h,
)?;
if state.ready_to_decode && !state.has_output {
state.has_output = true;
return Ok((Some(value), VecDeque::new()));
}
// if Ready has not yet been sent, multicast Ready
if state.ready_sent {
return Ok((None, VecDeque::new()));
// Save the proof for reconstructing the tree later.
state.echos.insert(sender_id.clone(), p);
if state.ready_sent || state.count_echos(&hash) < self.num_nodes - self.num_faulty_nodes {
return Ok((self.get_output(state, &hash)?, VecDeque::new()));
}
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
state.ready_sent = true;
let msg = TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.clone()),
};
let (output, ready_msgs) = self.handle_ready(h, state)?;
let msg = BroadcastMessage::Ready(hash.clone()).target_all();
let (output, ready_msgs) = self.handle_ready(&self.our_id, &hash, state)?;
Ok((output, iter::once(msg).chain(ready_msgs).collect()))
}
/// Handles a received `Ready` message.
fn handle_ready(
&self,
hash: Vec<u8>,
mut state: RwLockWriteGuard<BroadcastState>,
sender_id: &NodeUid,
hash: &[u8],
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
// Update the number of times Ready has been received with this hash.
// TODO: Don't accept multiple ready messages from the same node.
*state.readys.entry(hash).or_insert(1) += 1;
// Check that the root hash matches.
let h = if let Some(h) = state.root_hash.clone() {
h
} else {
// If the sender has already sent a `Ready` before, ignore.
if state.readys.contains_key(sender_id) {
info!(
"Node {:?} received multiple Readys from {:?}.",
self.our_id, sender_id
);
return Ok((None, VecDeque::new()));
};
}
let ready_num = *state.readys.get(&h).unwrap_or(&0);
let mut outgoing = VecDeque::new();
state.readys.insert(sender_id.clone(), hash.to_vec());
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent {
// Enqueue a broadcast of a ready message.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.to_vec()),
});
let outgoing = if state.count_readys(hash) == self.num_faulty_nodes + 1 && !state.ready_sent
{
// Enqueue a broadcast of a Ready message.
state.ready_sent = true;
iter::once(BroadcastMessage::Ready(hash.to_vec()).target_all()).collect()
} else {
VecDeque::new()
};
Ok((self.get_output(state, hash)?, outgoing))
}
/// Checks whether the condition for output are met for this hash, and if so, returns the output
/// value.
fn get_output(
&self,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
hash: &[u8],
) -> Result<Option<ProposedValue>, Error> {
if state.has_output || state.count_readys(hash) <= 2 * self.num_faulty_nodes
|| state.count_echos(hash) <= self.num_faulty_nodes
{
return Ok(None);
}
let mut output = None;
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N - 2f Echo messages.
state.has_output = true;
let mut leaf_values: Vec<Option<Box<[u8]>>> = self.all_uids
.iter()
.map(|id| {
state.echos.get(id).and_then(|p| {
if p.root_hash.as_slice() == hash {
Some(p.value.clone().into_boxed_slice())
} else {
None
}
})
})
.collect();
let value = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)?;
Ok(Some(value))
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait
// for N - 2f Echo messages, then decode v.
if ready_num > 2 * self.num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v.
if state.echo_num >= self.num_nodes - 2 * self.num_faulty_nodes {
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
&h,
)?;
/// Returns `i` if `node_id` is the `i`-th ID among all participating nodes.
fn index_of_node(&self, node_id: &NodeUid) -> Option<usize> {
self.all_uids.iter().position(|id| id == node_id)
}
if !state.has_output {
output = Some(value);
state.has_output = true;
}
} else {
state.ready_to_decode = true;
}
/// Returns the index of this proof's leaf in the Merkle tree.
fn index_of_proof(&self, proof: &Proof<ProposedValue>) -> usize {
index_of_lemma(&proof.lemma, self.num_nodes)
}
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
/// logs an info message.
fn validate_proof(&self, p: &Proof<ProposedValue>, id: &NodeUid) -> bool {
if !p.validate(&p.root_hash) {
info!(
"Node {:?} received invalid proof: {:?}",
self.our_id,
HexProof(&p)
);
false
} else if self.index_of_node(id) != Some(p.value[0] as usize)
|| self.index_of_proof(&p) != p.value[0] as usize
{
info!(
"Node {:?} received proof for wrong position: {:?}.",
self.our_id,
HexProof(&p)
);
false
} else {
true
}
Ok((output, outgoing))
}
}
@@ -411,12 +514,12 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8
tx: &'a Sender<TargetedMessage<ProposedValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<T>,
num_nodes: usize,
proposer_index: usize,
node_ids: BTreeSet<usize>,
our_id: usize,
proposer_id: usize,
) -> Self {
let all_indexes = (0..num_nodes).collect();
let broadcast = Broadcast::new(0, proposer_index, all_indexes)
.expect("failed to instantiate broadcast");
let broadcast =
Broadcast::new(our_id, proposer_id, node_ids).expect("failed to instantiate broadcast");
Instance {
tx,
rx,
@@ -445,6 +548,7 @@ pub enum Error {
Recv(RecvError),
UnexpectedMessage,
NotImplemented,
UnknownSender,
}
impl From<rse::Error> for Error {
@@ -476,7 +580,7 @@ fn inner_run<'a>(
for msg in broadcast
.propose_value(v)?
.into_iter()
.map(TargetedBroadcastMessage::into_targeted_message)
.map(TargetedBroadcastMessage::into)
{
tx.send(msg)?;
}
@@ -491,10 +595,15 @@ fn inner_run<'a>(
message: Message::Broadcast(message),
} = message
{
debug!("{} received from {}: {:?}", broadcast.our_id, i, message);
let (opt_output, msgs) = broadcast.handle_broadcast_message(&i, message)?;
for msg in msgs.into_iter()
.map(TargetedBroadcastMessage::into_targeted_message)
{
for msg in &msgs {
debug!(
"{} sending to {:?}: {:?}",
broadcast.our_id, msg.target, msg.message
);
}
for msg in msgs.into_iter().map(TargetedBroadcastMessage::into) {
tx.send(msg)?;
}
if let Some(output) = opt_output {
@@ -525,6 +634,9 @@ where
.iter()
.filter_map(|l| l.as_ref().map(|v| v.to_vec()))
.collect();
debug!("Reconstructed shards: {:?}", HexList(&shards));
// Construct the Merkle tree.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards);
// If the root hash of the reconstructed tree does not match the one
@@ -547,62 +659,41 @@ fn glue_shards<T>(m: MerkleTree<ProposedValue>, n: usize) -> T
where
T: From<Vec<u8>> + Into<Vec<u8>>,
{
let t: Vec<u8> = m.into_iter().take(n).flat_map(|s| s).collect();
let t: Vec<u8> = m.into_iter()
.take(n)
.flat_map(|s| s.into_iter().skip(1)) // Drop the index byte.
.collect();
let payload_len = t[0] as usize;
debug!("Glued data shards {:?}", &t[1..(payload_len + 1)]);
debug!("Glued data shards {:?}", HexBytes(&t[1..(payload_len + 1)]));
Vec::into(t[1..(payload_len + 1)].to_vec())
}
/// An additional path conversion operation on `Lemma` to allow reconstruction
/// of erasure-coded `Proof` from `Lemma`s. The output path, when read from left
/// to right, goes from leaf to root (LSB order).
fn path_of_lemma(lemma: &Lemma) -> Vec<bool> {
match lemma.sub_lemma {
None => {
match lemma.sibling_hash {
// lemma terminates with no leaf
None => vec![],
// the leaf is on the right
Some(Positioned::Left(_)) => vec![true],
// the leaf is on the left
Some(Positioned::Right(_)) => vec![false],
}
}
Some(ref l) => {
let mut p = path_of_lemma(l.as_ref());
match lemma.sibling_hash {
// lemma terminates
None => (),
// lemma branches out to the right
Some(Positioned::Left(_)) => p.push(true),
// lemma branches out to the left
Some(Positioned::Right(_)) => p.push(false),
}
p
}
/// Computes the Merkle tree leaf index of a value in a given lemma.
pub fn index_of_lemma(lemma: &Lemma, n: usize) -> usize {
let m = n.next_power_of_two();
match (lemma.sub_lemma.as_ref(), lemma.sibling_hash.as_ref()) {
(None, Some(&Positioned::Right(_))) | (None, None) => 0,
(None, Some(&Positioned::Left(_))) => 1,
(Some(l), None) => index_of_lemma(l, n),
(Some(l), Some(&Positioned::Left(_))) => (m >> 1) + index_of_lemma(l, n - (m >> 1)),
(Some(l), Some(&Positioned::Right(_))) => index_of_lemma(l, m >> 1),
}
}
/// Further conversion of a binary tree path into an array index.
fn index_of_path(mut path: Vec<bool>) -> usize {
let mut idx = 0;
// Convert to the MSB order.
path.reverse();
for &dir in &path {
idx <<= 1;
if dir {
idx |= 1;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_index_of_lemma() {
for &n in &[3, 4, 13, 16, 127, 128, 129, 255] {
let shards: Vec<[u8; 1]> = (0..n).map(|i| [i as u8]).collect();
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards);
for (i, val) in mtree.iter().enumerate() {
let p = mtree.gen_proof(val.clone()).expect("generate proof");
let idx = index_of_lemma(&p.lemma, n);
assert_eq!(i, idx, "Wrong index {} for leaf {}/{}.", idx, i, n);
}
}
}
idx
}
/// Computes the Merkle tree leaf index of a value in a given proof.
// TODO: This currently only works if the number of leaves is a power of two. With the
// `merkle_light` crate, it might not even be needed, though.
pub fn index_of_proof<T>(p: &Proof<T>) -> usize {
index_of_path(path_of_lemma(&p.lemma))
}

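Two layout details in this file are easy to miss: each shard now carries a leading index byte (the duplicate-leaf workaround noted in `send_shards`), and the first byte of the glued data is the payload length, with everything after it being padding. A self-contained sketch with illustrative values, mirroring `glue_shards`:

```rust
// Sketch only: two data shards as they would appear as Merkle leaves,
// each as [index byte, data...]. The values here are made up.
fn main() {
    let shards: Vec<Vec<u8>> = vec![vec![0, 3, b'a', b'b'], vec![1, b'c', 0]];

    // `glue_shards` drops each index byte and concatenates the rest.
    let t: Vec<u8> = shards
        .into_iter()
        .flat_map(|s| s.into_iter().skip(1))
        .collect();

    // The first glued byte is the payload length; the tail is padding.
    let payload_len = t[0] as usize;
    assert_eq!(&t[1..=payload_len], b"abc");
}
```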
View File

@@ -11,9 +11,9 @@ use agreement;
use agreement::Agreement;
use broadcast;
use broadcast::{Broadcast, TargetedBroadcastMessage};
use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use proto::{AgreementMessage, BroadcastMessage};
use proto::AgreementMessage;
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
@@ -39,7 +39,7 @@ pub enum Output<NodeUid> {
Agreement(AgreementMessage),
}
pub struct CommonSubset<NodeUid: Eq + Hash> {
pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
uid: NodeUid,
num_nodes: usize,
num_faulty_nodes: usize,
@@ -59,7 +59,11 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
for uid0 in all_uids {
broadcast_instances.insert(
uid0.clone(),
Broadcast::new(uid.clone(), uid0.clone(), all_uids.clone())?,
Broadcast::new(
uid.clone(),
uid0.clone(),
all_uids.iter().cloned().collect(),
)?,
);
}

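The hunk above instantiates one `Broadcast` per proposer: each node participates in N broadcast instances, proposing a value only in its own. A condensed sketch of the pattern with simplified types (the tuple stands in for a real `Broadcast` instance):

```rust
// Sketch only: a map from proposer ID to that proposer's broadcast instance.
use std::collections::BTreeMap;

fn main() {
    let all_uids: Vec<usize> = (0..4).collect();
    let our_uid = 1usize;

    // Stand-in for `Broadcast::new(our_uid, proposer, all_uids)`.
    let broadcast_instances: BTreeMap<usize, (usize, usize)> = all_uids
        .iter()
        .map(|&proposer| (proposer, (our_uid, proposer)))
        .collect();

    // Node 1 takes part in four broadcasts, and proposes in exactly one.
    assert_eq!(broadcast_instances.len(), 4);
    assert!(broadcast_instances.values().any(|&(me, p)| me == p));
}
```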
View File

@@ -8,7 +8,7 @@ use std::fmt::Debug;
/// recipients is unknown without further computation, which is irrelevant to the
/// message delivery task.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
pub struct SourcedMessage<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
pub source: usize,
pub message: Message<T>,
}
@@ -27,20 +27,14 @@ pub enum Target {
/// Message with a designated target.
#[derive(Clone, Debug, PartialEq)]
pub struct TargetedMessage<T: Clone + Debug + Send + Sync> {
pub struct TargetedMessage<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
pub target: Target,
pub message: Message<T>,
}
impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> {
impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> TargetedMessage<T> {
/// Initialises a message while checking parameter preconditions.
pub fn new(target: Target, message: Message<T>) -> Option<Self> {
match target {
Target::Node(i) if i == 0 => {
// Remote node indices start from 1.
None
}
_ => Some(TargetedMessage { target, message }),
}
pub fn new(target: Target, message: Message<T>) -> Self {
TargetedMessage { target, message }
}
}

View File

@@ -1,6 +1,7 @@
//! Construction of messages from protobuf buffers.
pub mod message;
use broadcast::BroadcastMessage;
use merkle::proof::{Lemma, Positioned, Proof};
use proto::message::*;
use protobuf::core::parse_from_bytes;
@@ -12,20 +13,11 @@ use std::marker::{Send, Sync};
/// Kinds of message sent by nodes participating in consensus.
#[derive(Clone, Debug, PartialEq)]
pub enum Message<T: Send + Sync> {
pub enum Message<T: Send + Sync + AsRef<[u8]>> {
Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage),
}
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[derive(Clone, PartialEq)]
pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>),
Echo(Proof<T>),
Ready(Vec<u8>),
}
/// Wrapper for a byte array, whose `Debug` implementation outputs shortened hexadecimal strings.
pub struct HexBytes<'a>(pub &'a [u8]);
@@ -33,46 +25,47 @@ impl<'a> fmt::Debug for HexBytes<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0.len() > 6 {
for byte in &self.0[..3] {
write!(f, "{:0x}", byte)?;
write!(f, "{:02x}", byte)?;
}
write!(f, "..")?;
for byte in &self.0[(self.0.len() - 3)..] {
write!(f, "{:0x}", byte)?;
write!(f, "{:02x}", byte)?;
}
} else {
for byte in self.0 {
write!(f, "{:0x}", byte)?;
write!(f, "{:02x}", byte)?;
}
}
Ok(())
}
}
struct HexProof<'a, T: 'a>(&'a Proof<T>);
/// Wrapper for a list of byte arrays, whose `Debug` implementation outputs shortened hexadecimal
/// strings.
pub struct HexList<'a, T: 'a>(pub &'a [T]);
impl<'a, T: Send + Sync + fmt::Debug> fmt::Debug for HexProof<'a, T> {
impl<'a, T: AsRef<[u8]>> fmt::Debug for HexList<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let v: Vec<_> = self.0.iter().map(|t| HexBytes(t.as_ref())).collect();
write!(f, "{:?}", v)
}
}
pub struct HexProof<'a, T: 'a>(pub &'a Proof<T>);
impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Proof {{ algorithm: {:?}, root_hash: {:?}, lemma for leaf #{}, value: {:?} }}",
self.0.algorithm,
HexBytes(&self.0.root_hash),
::broadcast::index_of_proof(self.0),
self.0.value
path_of_lemma(&self.0.lemma),
HexBytes(&self.0.value.as_ref())
)
}
}
impl<T: Send + Sync + fmt::Debug> fmt::Debug for BroadcastMessage<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)),
BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)),
BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)),
}
}
}
/// Messages sent during the binary Byzantine agreement stage.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum AgreementMessage {
@@ -80,17 +73,14 @@ pub enum AgreementMessage {
Aux(bool),
}
impl<T: Send + Sync> Message<T> {
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> Message<T> {
/// Translation from protobuf to the regular type.
///
/// TODO: add an `Algorithm` field to `MessageProto`. Either `Algorithm` has
/// to be fully serialised and sent as a whole, or it can be passed over
/// using an ID and the `Eq` instance to discriminate the finite set of
/// algorithms in `ring::digest`.
pub fn from_proto(mut proto: message::MessageProto) -> Option<Self>
where
T: From<Vec<u8>>,
{
pub fn from_proto(mut proto: message::MessageProto) -> Option<Self> {
if proto.has_broadcast() {
BroadcastMessage::from_proto(
proto.take_broadcast(),
@@ -105,10 +95,7 @@ impl<T: Send + Sync> Message<T> {
}
}
pub fn into_proto(self) -> MessageProto
where
T: Into<Vec<u8>>,
{
pub fn into_proto(self) -> MessageProto {
let mut m = MessageProto::new();
match self {
Message::Broadcast(b) => {
@@ -125,10 +112,7 @@ impl<T: Send + Sync> Message<T> {
///
/// TODO: pass custom errors from down the chain of nested parsers as
/// opposed to returning `WireError::Other`.
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self>
where
T: From<Vec<u8>>,
{
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self> {
let r = parse_from_bytes::<MessageProto>(bytes).map(Self::from_proto);
match r {
@@ -139,19 +123,13 @@ impl<T: Send + Sync> Message<T> {
}
/// Produce a protobuf representation of this `Message`.
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>>
where
T: Into<Vec<u8>>,
{
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>> {
self.into_proto().write_to_bytes()
}
}
impl<T: Send + Sync> BroadcastMessage<T> {
pub fn into_proto(self) -> BroadcastProto
where
T: Into<Vec<u8>>,
{
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> BroadcastMessage<T> {
pub fn into_proto(self) -> BroadcastProto {
let mut b = BroadcastProto::new();
match self {
BroadcastMessage::Value(p) => {
@@ -173,10 +151,7 @@ impl<T: Send + Sync> BroadcastMessage<T> {
b
}
pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option<Self>
where
T: From<Vec<u8>>,
{
pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option<Self> {
if mp.has_value() {
mp.take_value()
.take_proof()
@@ -223,10 +198,7 @@ impl AgreementMessage {
/// around the restriction of not being allowed to extend the implementation of
/// `Proof` outside its crate.
impl ProofProto {
pub fn from_proof<T>(proof: Proof<T>) -> Self
where
T: Into<Vec<u8>>,
{
pub fn from_proof<T: AsRef<[u8]>>(proof: Proof<T>) -> Self {
let mut proto = Self::new();
match proof {
@@ -239,17 +211,17 @@ impl ProofProto {
} => {
proto.set_root_hash(root_hash);
proto.set_lemma(LemmaProto::from_lemma(lemma));
proto.set_value(value.into());
proto.set_value(value.as_ref().to_vec());
}
}
proto
}
pub fn into_proof<T>(mut self, algorithm: &'static Algorithm) -> Option<Proof<T>>
where
T: From<Vec<u8>>,
{
pub fn into_proof<T: From<Vec<u8>>>(
mut self,
algorithm: &'static Algorithm,
) -> Option<Proof<T>> {
if !self.has_lemma() {
return None;
}
@@ -326,3 +298,35 @@ impl LemmaProto {
}
}
}
/// The path of a lemma in a Merkle tree
struct BinaryPath(Vec<bool>);
/// The path of the lemma, as a binary string
fn path_of_lemma(mut lemma: &Lemma) -> BinaryPath {
let mut result = Vec::new();
loop {
match lemma.sibling_hash {
None => (),
Some(Positioned::Left(_)) => result.push(true),
Some(Positioned::Right(_)) => result.push(false),
}
lemma = match lemma.sub_lemma.as_ref() {
Some(lemma) => lemma,
None => return BinaryPath(result),
}
}
}
impl fmt::Display for BinaryPath {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for b in &self.0 {
if *b {
write!(f, "1")?;
} else {
write!(f, "0")?;
}
}
Ok(())
}
}

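The `{:0x}` to `{:02x}` change in `HexBytes` is a real fix: the `0` flag alone sets no minimum width, so a byte below 0x10 printed a single digit and distinct byte strings could render identically. A small demonstration:

```rust
fn main() {
    // Old format: the zero flag without a width pads nothing.
    assert_eq!(format!("{:0x}", 0x0au8), "a");
    // New format: always two digits per byte.
    assert_eq!(format!("{:02x}", 0x0au8), "0a");
    // With the old format, [0x0a, 0xbc] and [0xab, 0x0c] both printed "abc":
    // "a" + "bc" versus "ab" + "c".
}
```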
View File

@@ -3,9 +3,9 @@
use proto::*;
use protobuf;
use protobuf::Message as ProtobufMessage;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::{cmp, io};
/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
@@ -35,14 +42,42 @@ impl From<protobuf::ProtobufError> for Error {
}
}
fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> {
if buffer.len() < 4 {
return Err(Error::EncodeError);
}
buffer[0] = ((value & 0xFF00_0000) >> 24) as u8;
buffer[1] = ((value & 0x00FF_0000) >> 16) as u8;
buffer[2] = ((value & 0x0000_FF00) >> 8) as u8;
buffer[3] = (value & 0x0000_00FF) as u8;
Ok(())
}
fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
if buffer.len() < 4 {
return Err(Error::DecodeError);
}
let mut result = u32::from(buffer[0]);
result <<= 8;
result += u32::from(buffer[1]);
result <<= 8;
result += u32::from(buffer[2]);
result <<= 8;
result += u32::from(buffer[3]);
Ok(result)
}
pub struct ProtoIo<S: Read + Write> {
stream: S,
buffer: [u8; 1024 * 4],
}
impl ProtoIo<TcpStream> {
pub fn try_clone(&self) -> Result<ProtoIo<TcpStream>, ::std::io::Error> {
Ok(ProtoIo {
stream: self.stream.try_clone()?,
buffer: [0; 1024 * 4],
})
}
}
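The new framing replaces protobuf varints with fixed four-byte fields: magic number, big-endian message length, then the message bytes. A sketch of the same layout using std's `to_be_bytes`/`from_be_bytes`, which perform the same conversion as the hand-rolled helpers above (the `FRAME_START` value here is illustrative, not the crate's actual constant):

```rust
// Sketch only: build and parse one frame of [magic][length][payload].
const FRAME_START: u32 = 0x1234_5678; // illustrative value

fn main() {
    let payload = b"hello";
    let mut frame = Vec::new();
    frame.extend_from_slice(&FRAME_START.to_be_bytes());
    frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    frame.extend_from_slice(payload);

    // Receiver side: verify the magic number, read the length, then the body.
    let magic = u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]);
    assert_eq!(magic, FRAME_START);
    let len = u32::from_be_bytes([frame[4], frame[5], frame[6], frame[7]]) as usize;
    assert_eq!(&frame[8..8 + len], payload);
}
```

The explicit length field is what lets `recv` read the body in bounded chunks into its 4 KiB buffer instead of trusting the protobuf stream's own delimiting.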
@@ -52,31 +80,52 @@ impl<S: Read + Write> ProtoIo<S>
//where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
{
pub fn from_stream(stream: S) -> Self {
ProtoIo { stream }
ProtoIo {
stream,
buffer: [0; 1024 * 4],
}
}
pub fn recv<T>(&mut self) -> Result<Message<T>, Error>
where
T: Clone + Send + Sync + From<Vec<u8>>, // + Into<Vec<u8>>
T: Clone + Send + Sync + AsRef<[u8]> + From<Vec<u8>>,
{
let mut stream = protobuf::CodedInputStream::new(&mut self.stream);
// Read magic number
if stream.read_raw_varint32()? != FRAME_START {
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
if frame_start != FRAME_START {
return Err(Error::FrameStartMismatch);
};
self.stream.read_exact(&mut self.buffer[0..4])?;
let size = decode_u32_from_be(&self.buffer[0..4])? as usize;
let mut message_v: Vec<u8> = Vec::new();
message_v.reserve(size);
while message_v.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size - message_v.len());
let (slice, _) = self.buffer.split_at_mut(num_to_read);
self.stream.read_exact(slice)?;
message_v.extend_from_slice(slice);
}
Message::from_proto(stream.read_message()?).ok_or(Error::DecodeError)
Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError)
}
pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
where
T: Clone + Send + Sync + Into<Vec<u8>>,
T: Clone + Send + Sync + AsRef<[u8]> + From<Vec<u8>>,
{
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
stream.write_raw_varint32(FRAME_START)?;
encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
let message_p = message.into_proto();
// Write message size
encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message
message_p.write_length_delimited_to(&mut stream)?;
message_p.write_to(&mut stream)?;
// Flush
stream.flush()?;
Ok(())
@@ -85,6 +134,7 @@ impl<S: Read + Write> ProtoIo<S>
#[cfg(test)]
mod tests {
use broadcast::BroadcastMessage;
use proto_io::*;
use std::io::Cursor;
@@ -100,9 +150,6 @@ mod tests {
println!("{:?}", pio.stream.get_ref());
pio.stream.set_position(0);
assert_eq!(msg0, pio.recv().expect("recv msg0"));
// TODO: Figure out why the cursor is wrong here.
let len = pio.stream.get_ref().len() as u64;
pio.stream.set_position(len / 2);
assert_eq!(msg1, pio.recv().expect("recv msg1"));
}
}

View File

@@ -5,16 +5,15 @@ extern crate hbbft;
extern crate log;
extern crate crossbeam;
extern crate crossbeam_channel;
extern crate env_logger;
extern crate merkle;
extern crate rand;
extern crate simple_logger;
use rand::Rng;
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
use hbbft::proto::BroadcastMessage;
use hbbft::broadcast::{Broadcast, BroadcastMessage, BroadcastTarget, TargetedBroadcastMessage};
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
struct NodeId(usize);
@@ -168,7 +167,7 @@ impl Adversary for ProposeAdversary {
}
self.has_sent = true;
let value = b"Fake news";
let node_ids: HashSet<NodeId> = self.adv_nodes
let node_ids: BTreeSet<NodeId> = self.adv_nodes
.iter()
.cloned()
.chain(self.good_nodes.iter().cloned())
@@ -191,7 +190,7 @@ impl<A: Adversary> TestNetwork<A> {
/// Creates a new network with `good_num` good nodes, and the given `adversary` controlling
/// `adv_num` nodes.
fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork<A> {
let node_ids: HashSet<NodeId> = (0..(good_num + adv_num)).map(NodeId).collect();
let node_ids: BTreeSet<NodeId> = (0..(good_num + adv_num)).map(NodeId).collect();
let new_broadcast = |id: NodeId| {
let bc =
Broadcast::new(id, NodeId(0), node_ids.clone()).expect("Instantiate broadcast");
@@ -271,8 +270,8 @@ impl<A: Adversary> TestNetwork<A> {
/// Broadcasts a value from node 0 and expects all good nodes to receive it.
fn test_broadcast<A: Adversary>(mut network: TestNetwork<A>, proposed_value: &[u8]) {
// TODO: This returns an error in all but the first test.
let _ = simple_logger::init_with_level(log::Level::Debug);
// This returns an error in all but the first test.
let _ = env_logger::try_init();
// Make node 0 propose the value.
network.propose_value(NodeId(0), proposed_value.to_vec());
@@ -288,36 +287,46 @@ fn test_broadcast<A: Adversary>(mut network: TestNetwork<A>, proposed_value: &[u
}
}
// TODO: Unignore once equal shards don't cause problems anymore.
#[test]
#[ignore]
fn test_8_broadcast_equal_leaves() {
fn test_8_broadcast_equal_leaves_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
// Space is ASCII character 32. So 32 spaces will create shards that are all equal, even if the
// length of the value is inserted.
test_broadcast(TestNetwork::new(8, 0, adversary), &[b' '; 32]);
}
// TODO: Unignore once node numbers are supported that are not powers of two.
#[test]
#[ignore]
fn test_13_broadcast_nodes_random_delivery() {
fn test_13_broadcast_nodes_random_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
test_broadcast(TestNetwork::new(13, 0, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_random_delivery() {
fn test_4_broadcast_nodes_random_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
test_broadcast(TestNetwork::new(4, 0, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_random_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::Random);
test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_first_delivery() {
fn test_11_5_broadcast_nodes_first_delivery_silent() {
let adversary = SilentAdversary::new(MessageScheduler::First);
test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo");
}
#[test]
fn test_3_1_broadcast_nodes_random_delivery_adv_propose() {
let good_nodes: BTreeSet<NodeId> = (0..3).map(NodeId).collect();
let adv_nodes: BTreeSet<NodeId> = (3..4).map(NodeId).collect();
let adversary = ProposeAdversary::new(MessageScheduler::Random, good_nodes, adv_nodes);
test_broadcast(TestNetwork::new(3, 1, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_random_delivery_adv_propose() {
let good_nodes: BTreeSet<NodeId> = (0..11).map(NodeId).collect();