diff --git a/.travis.yml b/.travis.yml index 6a9e4c9..70da60b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,9 +17,11 @@ env: global: - RUST_BACKTRACE=1 - RUSTFLAGS="-D warnings" - script: - - cargo fmt -- --write-mode=diff - - cargo clippy -- -D clippy - - cargo clippy --tests -- -D clippy - - cargo check --tests - - cargo test +script: + # TODO: This currently fails, claiming that `src/proto/message.rs` does not + # exist. Re-enable once the problem is resolved. + # - cargo fmt -- --write-mode=diff + - cargo clippy -- -D clippy + - cargo clippy --tests -- -D clippy + - cargo check --tests + - cargo test diff --git a/Cargo.toml b/Cargo.toml index 732d73b..f0aad98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" authors = ["Vladimir Komendantskiy "] [dependencies] +env_logger = "0.5.10" log = "0.4.1" -simple_logger = "0.5" reed-solomon-erasure = "3.0" merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" } ring = "^0.12" diff --git a/examples/consensus-node.rs b/examples/consensus-node.rs index d7de6c8..438d993 100644 --- a/examples/consensus-node.rs +++ b/examples/consensus-node.rs @@ -4,10 +4,10 @@ extern crate crossbeam; #[macro_use] extern crate crossbeam_channel; extern crate docopt; +extern crate env_logger; extern crate hbbft; #[macro_use] extern crate log; -extern crate simple_logger; mod network; @@ -55,7 +55,7 @@ fn parse_args() -> Args { } pub fn main() { - simple_logger::init_with_level(log::Level::Debug).unwrap(); + env_logger::init(); let args: Args = parse_args(); println!("{:?}", args); let node = Node::new(args.bind_address, args.remote_addresses, args.value); diff --git a/examples/network/commst.rs b/examples/network/commst.rs index 4005d3f..d7988db 100644 --- a/examples/network/commst.rs +++ b/examples/network/commst.rs @@ -25,7 +25,7 @@ impl From for Error { /// A communication task connects a remote node to the thread that manages the /// consensus algorithm. -pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> { +pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From> + AsRef<[u8]>> { /// The transmit side of the multiple producer channel from comms threads. tx: &'a Sender>, /// The receive side of the channel to the comms thread. @@ -36,7 +36,7 @@ pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From> + I pub node_index: usize, } -impl<'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> CommsTask<'a, T> { +impl<'a, T: 'a + Clone + Debug + Send + Sync + From> + AsRef<[u8]>> CommsTask<'a, T> { pub fn new( tx: &'a Sender>, rx: &'a Receiver>, @@ -72,7 +72,6 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> Co loop { // Receive a multicast message from the manager thread. let message = rx.recv().unwrap(); - debug!("Node {} <- {:?}", node_index, message); // Forward the message to the remote node. io1.send(message).unwrap(); } @@ -83,7 +82,6 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> Co loop { match self.io.recv() { Ok(message) => { - debug!("Node {} -> {:?}", node_index, message); tx.send(SourcedMessage { source: node_index, message, diff --git a/examples/network/connection.rs b/examples/network/connection.rs index c2061d3..ecd49fc 100644 --- a/examples/network/connection.rs +++ b/examples/network/connection.rs @@ -1,6 +1,6 @@ //! Connection data and initiation routines. -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::io::BufReader; use std::net::{SocketAddr, TcpListener, TcpStream}; @@ -8,61 +8,47 @@ use std::net::{SocketAddr, TcpListener, TcpStream}; pub struct Connection { pub stream: TcpStream, pub reader: BufReader, + pub node_str: String, } impl Connection { - pub fn new(stream: TcpStream) -> Self { + pub fn new(stream: TcpStream, node_str: String) -> Self { Connection { // Create a read buffer of 1K bytes. reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()), stream, + node_str, } } } -/// Connect this node to remote peers. A vector of successful connections is -/// returned. -pub fn make(bind_address: &SocketAddr, remote_addresses: &HashSet) -> Vec { - // Connected remote nodes. - // let mut connected: Vec = Vec::new(); +/// Connect this node to remote peers. A vector of successful connections is returned, as well as +/// our own node ID. +pub fn make( + bind_address: &SocketAddr, + remote_addresses: &HashSet, +) -> (String, Vec) { // Listen for incoming connections on a given TCP port. let bind_address = bind_address; - let listener = TcpListener::bind(bind_address).unwrap(); - // Initialise initial connection states. - let mut connections: Vec> = (0..remote_addresses.len()) - .into_iter() - .map(|_| None) - .collect(); - + let listener = TcpListener::bind(bind_address).expect("start listener"); let here_str = format!("{}", bind_address); + // Use a `BTreeMap` to make sure we all iterate in the same order. + let remote_by_str: BTreeMap = remote_addresses + .iter() + .map(|addr| (format!("{}", addr), addr)) + .filter(|(there_str, _)| *there_str != here_str) + .collect(); // Wait for all nodes with larger addresses to connect. - for (n, &address) in remote_addresses.iter().enumerate() { - let there_str = format!("{}", address); - if here_str < there_str { - connections[n] = match listener.accept() { - Ok((stream, _)) => { - info!("Connected to {}", there_str); - Some(Connection::new(stream)) - } - Err(_) => None, - } - } - } - - // Try to connect to all nodes with smaller addresses. - for (n, &address) in remote_addresses.iter().enumerate() { - let there_str = format!("{}", address); - if here_str > there_str { - connections[n] = match TcpStream::connect(address) { - Ok(stream) => { - info!("Connected to {}", there_str); - Some(Connection::new(stream)) - } - Err(_) => None, - } - } - } - - // remove Nones from connections - connections.into_iter().filter_map(|c| c).collect() + let connections = remote_by_str + .into_iter() + .map(|(there_str, address)| { + let tcp_conn = if here_str < there_str { + listener.accept().expect("failed to connect").0 + } else { + TcpStream::connect(address).expect("failed to connect") + }; + Connection::new(tcp_conn, there_str.to_string()) + }) + .collect(); + (here_str, connections) } diff --git a/examples/network/messaging.rs b/examples/network/messaging.rs index 4a51d40..f570eca 100644 --- a/examples/network/messaging.rs +++ b/examples/network/messaging.rs @@ -9,13 +9,13 @@ use std::fmt::Debug; /// The queue functionality for messages sent between algorithm instances. /// The messaging struct allows for targeted message exchange between comms /// tasks on one side and algo tasks on the other. -pub struct Messaging { +pub struct Messaging> { /// Transmit sides of message channels to comms threads. txs_to_comms: Vec>>, /// Receive side of the routed message channel from comms threads. rx_from_comms: Receiver>, - /// Transmit sides of message channels to algo threads. - txs_to_algo: Vec>>, + /// Transmit sides of message channels to algo thread. + tx_to_algo: Sender>, /// Receive side of the routed message channel from comms threads. rx_from_algo: Receiver>, @@ -23,9 +23,9 @@ pub struct Messaging { rxs_to_comms: Vec>>, /// TX handle to be used by comms tasks. tx_from_comms: Sender>, - /// RX handles to be used by algo tasks. - rxs_to_algo: Vec>>, - /// TX handle to be used by algo tasks. + /// RX handles to be used by algo task. + rx_to_algo: Receiver>, + /// TX handle to be used by algo task. tx_from_algo: Sender>, /// Control channel used to stop the listening thread. @@ -33,23 +33,17 @@ pub struct Messaging { stop_rx: Receiver<()>, } -impl Messaging { +impl> Messaging { /// Initialises all the required TX and RX handles for the case on a total /// number `num_nodes` of consensus nodes. pub fn new(num_nodes: usize) -> Self { - let to_comms: Vec<_> = (0..num_nodes - 1) - .map(|_| unbounded::>()) - .collect(); + let to_comms: Vec<_> = (0..num_nodes).map(|_| unbounded::>()).collect(); let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect(); let rxs_to_comms: Vec>> = to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect(); let (tx_from_comms, rx_from_comms) = unbounded(); - let to_algo: Vec<_> = (0..num_nodes) - .map(|_| unbounded::>()) - .collect(); - let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect(); - let rxs_to_algo: Vec>> = - to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect(); + + let (tx_to_algo, rx_to_algo) = unbounded(); let (tx_from_algo, rx_from_algo) = unbounded(); let (stop_tx, stop_rx) = bounded(1); @@ -58,13 +52,13 @@ impl Messaging { // internally used handles txs_to_comms, rx_from_comms, - txs_to_algo, + tx_to_algo, rx_from_algo, // externally used handles rxs_to_comms, tx_from_comms, - rxs_to_algo, + rx_to_algo, tx_from_algo, stop_tx, @@ -80,8 +74,8 @@ impl Messaging { &self.tx_from_comms } - pub fn rxs_to_algo(&self) -> &Vec>> { - &self.rxs_to_algo + pub fn rx_to_algo(&self) -> &Receiver> { + &self.rx_to_algo } pub fn tx_from_algo(&self) -> &Sender> { @@ -100,7 +94,7 @@ impl Messaging { { let txs_to_comms = self.txs_to_comms.to_owned(); let rx_from_comms = self.rx_from_comms.to_owned(); - let txs_to_algo = self.txs_to_algo.to_owned(); + let tx_to_algo = self.tx_to_algo.to_owned(); let rx_from_algo = self.rx_from_algo.to_owned(); let stop_rx = self.stop_rx.to_owned(); @@ -128,8 +122,7 @@ impl Messaging { .fold(Ok(()), |result, tx| { if result.is_ok() { tx.send(message.clone()) - } - else { + } else { result } }).map_err(Error::from); @@ -138,16 +131,10 @@ impl Messaging { target: Target::Node(i), message } => { - // Remote node indices start from 1. - assert!(i > 0); - // Convert node index to vector index. - let i = i - 1; - result = if i < txs_to_comms.len() { txs_to_comms[i].send(message.clone()) .map_err(Error::from) - } - else { + } else { Err(Error::NoSuchTarget) }; } @@ -156,14 +143,7 @@ impl Messaging { recv(rx_from_comms, message) => { // Send the message to all algorithm instances, stopping at // the first error. - result = txs_to_algo.iter().fold(Ok(()), |result, tx| { - if result.is_ok() { - tx.send(message.clone()) - } - else { - result - } - }).map_err(Error::from) + result = tx_to_algo.send(message.clone()).map_err(Error::from) }, recv(stop_rx, _) => { // Flag the thread ready to exit. diff --git a/examples/network/node.rs b/examples/network/node.rs index 46c91cc..440e5d0 100644 --- a/examples/network/node.rs +++ b/examples/network/node.rs @@ -37,9 +37,9 @@ use crossbeam; use std::collections::HashSet; use std::fmt::Debug; -use std::io; use std::marker::{Send, Sync}; use std::net::SocketAddr; +use std::{io, iter, process, thread, time}; use hbbft::broadcast; use network::commst; @@ -50,7 +50,6 @@ use network::messaging::Messaging; pub enum Error { IoError(io::Error), CommsError(commst::Error), - NotImplemented, } impl From for Error { @@ -90,54 +89,65 @@ impl + PartialEq + Send + Sync + From> + /// Consensus node procedure implementing HoneyBadgerBFT. pub fn run(&self) -> Result { let value = &self.value; - let connections = connection::make(&self.addr, &self.remotes); + let (our_str, connections) = connection::make(&self.addr, &self.remotes); + let mut node_strs: Vec = iter::once(our_str.clone()) + .chain(connections.iter().map(|c| c.node_str.clone())) + .collect(); + node_strs.sort(); + debug!("Nodes: {:?}", node_strs); + let proposer_id = 0; + let our_id = node_strs.binary_search(&our_str).unwrap(); let num_nodes = connections.len() + 1; + if value.is_some() != (our_id == proposer_id) { + panic!("Exactly the first node must propose a value."); + } + // Initialise the message delivery system and obtain TX and RX handles. let messaging: Messaging> = Messaging::new(num_nodes); let rxs_to_comms = messaging.rxs_to_comms(); let tx_from_comms = messaging.tx_from_comms(); - let rxs_to_algo = messaging.rxs_to_algo(); + let rx_to_algo = messaging.rx_to_algo(); let tx_from_algo = messaging.tx_from_algo(); let stop_tx = messaging.stop_tx(); // All spawned threads will have exited by the end of the scope. crossbeam::scope(|scope| { // Start the centralised message delivery system. - let msg_handle = messaging.spawn(scope); - let mut broadcast_handles = Vec::new(); + let _msg_handle = messaging.spawn(scope); // Associate a broadcast instance with this node. This instance will // broadcast the proposed value. There is no remote node // corresponding to this instance, and no dedicated comms task. The // node index is 0. - let rx_to_algo0 = &rxs_to_algo[0]; - broadcast_handles.push(scope.spawn(move || { + let broadcast_handle = scope.spawn(move || { match broadcast::Instance::new( tx_from_algo, - rx_to_algo0, + rx_to_algo, value.to_owned(), - num_nodes, - 0, + (0..num_nodes).collect(), + our_id, + proposer_id, ).run() { Ok(t) => { - debug!( - "Broadcast instance 0 succeeded: {}", + println!( + "Broadcast succeeded! Node {} output: {}", + our_id, String::from_utf8(T::into(t)).unwrap() ); } - Err(e) => error!("Broadcast instance 0: {:?}", e), + Err(e) => error!("Broadcast instance: {:?}", e), } - })); + }); // Start a comms task for each connection. Node indices of those // tasks are 1 through N where N is the number of connections. for (i, c) in connections.iter().enumerate() { // Receive side of a single-consumer channel from algorithm // actor tasks to the comms task. - let rx_to_comms = &rxs_to_comms[i]; - let node_index = i + 1; + let node_index = if c.node_str < our_str { i } else { i + 1 }; + let rx_to_comms = &rxs_to_comms[node_index]; scope.spawn(move || { match commst::CommsTask::new( @@ -152,35 +162,14 @@ impl + PartialEq + Send + Sync + From> + Err(e) => error!("Comms task {}: {:?}", node_index, e), } }); - - // Associate a broadcast instance to the above comms task. - let rx_to_algo = &rxs_to_algo[node_index]; - broadcast_handles.push(scope.spawn(move || { - match broadcast::Instance::new( - tx_from_algo, - rx_to_algo, - None, - num_nodes, - node_index, - ).run() - { - Ok(t) => { - debug!( - "Broadcast instance {} succeeded: {}", - node_index, - String::from_utf8(T::into(t)).unwrap() - ); - } - Err(e) => error!("Broadcast instance {}: {:?}", node_index, e), - } - })); } // Wait for the broadcast instances to finish before stopping the // messaging task. - for h in broadcast_handles { - h.join(); - } + broadcast_handle.join(); + + // Wait another second so that pending messages get sent out. + thread::sleep(time::Duration::from_secs(1)); // Stop the messaging task. stop_tx @@ -190,30 +179,14 @@ impl + PartialEq + Send + Sync + From> + }) .unwrap(); - match msg_handle.join() { - Ok(()) => debug!("Messaging stopped OK"), - Err(e) => debug!("Messaging error: {:?}", e), - } - // TODO: continue the implementation of the asynchronous common - // subset algorithm. - Err(Error::NotImplemented) + process::exit(0); + + // TODO: Exit cleanly. + // match msg_handle.join() { + // Ok(()) => debug!("Messaging stopped OK"), + // Err(e) => debug!("Messaging error: {:?}", e), + // } + // Err(Error::NotImplemented) }) // end of thread scope } } - -// #[cfg(test)] -// mod tests { -// use std::collections::HashSet; -// use node; - -// /// Test that the node works to completion. -// #[test] -// fn test_node_0() { -// let node = node::Node::new("127.0.0.1:10000".parse().unwrap(), -// HashSet::new(), -// Some("abc".as_bytes().to_vec())); -// let result = node.run(); -// assert!(match result { Err(node::Error::NotImplemented) => true, -// _ => false }); -// } -// } diff --git a/examples/run-consensus-nodes.sh b/examples/run-consensus-nodes.sh new file mode 100644 index 0000000..3a6c6f3 --- /dev/null +++ b/examples/run-consensus-nodes.sh @@ -0,0 +1,15 @@ +#! /bin/bash + +export RUST_LOG=hbbft=debug + +cargo build --example consensus-node +cargo run --example consensus-node -- --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --value=Foo & +sleep 1 +cargo run --example consensus-node -- --bind-address=127.0.0.1:5001 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 & +sleep 1 +cargo run --example consensus-node -- --bind-address=127.0.0.1:5002 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 & +sleep 1 +cargo run --example consensus-node -- --bind-address=127.0.0.1:5003 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5004 & +sleep 1 +cargo run --example consensus-node -- --bind-address=127.0.0.1:5004 --remote-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 & +wait diff --git a/src/broadcast.rs b/src/broadcast.rs index 20ee669..ec0c79c 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,12 +1,11 @@ -//! Reliable broadcast algorithm instance. use crossbeam_channel::{Receiver, RecvError, SendError, Sender}; use merkle::proof::{Lemma, Positioned, Proof}; use merkle::{Hashable, MerkleTree}; use proto::*; use reed_solomon_erasure as rse; use reed_solomon_erasure::ReedSolomon; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::fmt::Debug; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::fmt::{self, Debug}; use std::hash::Hash; use std::iter; use std::marker::{Send, Sync}; @@ -19,6 +18,41 @@ type ProposedValue = Vec; type MessageQueue = VecDeque>; +/// The three kinds of message sent during the reliable broadcast stage of the +/// consensus algorithm. +#[derive(Clone, PartialEq)] +pub enum BroadcastMessage { + Value(Proof), + Echo(Proof), + Ready(Vec), +} + +impl BroadcastMessage { + fn target_all(self) -> TargetedBroadcastMessage { + TargetedBroadcastMessage { + target: BroadcastTarget::All, + message: self, + } + } + + fn target_node(self, id: NodeUid) -> TargetedBroadcastMessage { + TargetedBroadcastMessage { + target: BroadcastTarget::Node(id), + message: self, + } + } +} + +impl> fmt::Debug for BroadcastMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)), + BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)), + BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)), + } + } +} + /// A `BroadcastMessage` to be sent out, together with a target. #[derive(Clone, Debug)] pub struct TargetedBroadcastMessage { @@ -26,14 +60,11 @@ pub struct TargetedBroadcastMessage { pub message: BroadcastMessage, } -impl TargetedBroadcastMessage { - pub fn into_targeted_message(self) -> TargetedMessage { +impl From> for TargetedMessage { + fn from(msg: TargetedBroadcastMessage) -> TargetedMessage { TargetedMessage { - target: match self.target { - BroadcastTarget::All => Target::All, - BroadcastTarget::Node(node) => Target::Node(node), - }, - message: Message::Broadcast(self.message), + target: msg.target.into(), + message: Message::Broadcast(msg.message), } } } @@ -45,25 +76,93 @@ pub enum BroadcastTarget { Node(NodeUid), } -struct BroadcastState { - root_hash: Option>, - leaf_values: Vec>>, - leaf_values_num: usize, - echo_num: usize, - readys: HashMap, usize>, +impl From> for Target { + fn from(bt: BroadcastTarget) -> Target { + match bt { + BroadcastTarget::All => Target::All, + BroadcastTarget::Node(node) => Target::Node(node), + } + } +} + +struct BroadcastState { + /// Whether we have already multicas `Echo`. + echo_sent: bool, + /// Whether we have already multicast `Ready`. ready_sent: bool, - ready_to_decode: bool, + /// Whether we have already output a value. has_output: bool, + /// The proofs we have received via `Echo` messages, by sender ID. + echos: BTreeMap>>, + /// The root hashes we received via `Ready` messages, by sender ID. + readys: BTreeMap>, +} + +impl BroadcastState { + /// Returns the number of nodes that have sent us an `Echo` message with this hash. + fn count_echos(&self, hash: &[u8]) -> usize { + self.echos + .values() + .filter(|p| p.root_hash.as_slice() == hash) + .count() + } + + /// Returns the number of nodes that have sent us a `Ready` message with this hash. + fn count_readys(&self, hash: &[u8]) -> usize { + self.readys + .values() + .filter(|h| h.as_slice() == hash) + .count() + } } /// Reliable Broadcast algorithm instance. -pub struct Broadcast { +/// +/// The Reliable Broadcast Protocol assumes a network of `N` nodes that send signed messages to +/// each other, with at most `f` of them malicious, where `3 * f < N`. Handling the networking and +/// signing is the responsibility of this crate's user: only when a message has been verified to be +/// "from node i", it can be handed to the `Broadcast` instance. One of the nodes is the "proposer" +/// who sends a value. Under the above conditions, the protocol guarantees that either all or none +/// of the good nodes output a value, and that if the proposer is good, all good nodes output the +/// proposed value. +/// +/// The algorithm works as follows: +/// +/// * The proposer uses a Reed-Solomon code to split the value into `N` chunks, `f + 1` of which +/// suffice to reconstruct the value. These chunks are put into a Merkle tree, so that with the +/// tree's root hash `h`, branch `bi` and chunk `si`, the `i`-th chunk `si` can be verified by +/// anyone to belong to the Merkle tree with root hash `h`. These values are "proof" number `i`: +/// `pi`. +/// * The proposer sends `Value(pi)` to node `i`. It translates to: "I am the proposer, and `pi` +/// contains the `i`-th share of my value." +/// * Every (good) node that receives `Value(pi)` from the proposer sends it on to everyone else as +/// `Echo(pi)`. An `Echo` translates to: "I have received `pi` directly from the proposer." If the +/// proposer sends another `Value` message, that is ignored. +/// * So every node that has received at least `f + 1` `Echo` messages with the same root +/// hash will be able to decode a value. +/// * Every node that has received `N - f` `Echo`s with the same root hash from different nodes +/// knows that at least `f + 1` _good_ nodes have sent an `Echo` with that hash to everyone, and +/// therefore everyone will eventually receive at least `f + 1` of them. So upon receiving `N - f` +/// `Echo`s, they send a `Ready(h)` to everyone to indicate that. `Ready` translates to: "I know +/// that everyone will eventually be able to decode the value." Moreover, since every good node +/// only ever sends one kind of `Echo` message, this cannot happen for two different root hashes. +/// * Even without enough `Echo` messages, if a node receives `f + 1` `Ready` messages, it knows +/// that at least one _good_ node has sent `Ready`. It therefore also knows that everyone will be +/// able to decode eventually, and multicasts `Ready` itself. +/// * If a node has received `2 * f + 1` `Ready`s (with matching root hash) from different nodes, +/// it knows that at least `f + 1` _good_ nodes have sent it. Therefore, every good node will +/// eventually receive `f + 1`, and multicast it itself. Therefore, every good node will eventually +/// receive `2 * f + 1` `Ready`s, too. _And_ we know at this point that every good node will +/// eventually be able to decode (i.e. receive at least `f + 1` `Echo` messages). +/// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value, +/// knowing that every other good node will eventually do the same. +pub struct Broadcast { /// The UID of this node. our_id: NodeUid, /// The UID of the sending node. proposer_id: NodeUid, /// UIDs of all nodes for iteration purposes. - all_uids: HashSet, + all_uids: BTreeSet, num_nodes: usize, num_faulty_nodes: usize, data_shard_num: usize, @@ -71,16 +170,16 @@ pub struct Broadcast { /// All the mutable state is confined to the `state` field. This allows to /// mutate state even when the broadcast instance is referred to by an /// immutable reference. - state: RwLock, + state: RwLock>, } -impl Broadcast { +impl Broadcast { /// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal /// from node `proposer_id`. pub fn new( our_id: NodeUid, proposer_id: NodeUid, - all_uids: HashSet, + all_uids: BTreeSet, ) -> Result { let num_nodes = all_uids.len(); let num_faulty_nodes = (num_nodes - 1) / 3; @@ -97,14 +196,11 @@ impl Broadcast { data_shard_num, coding, state: RwLock::new(BroadcastState { - root_hash: None, - leaf_values: vec![None; num_nodes], - leaf_values_num: 0, - echo_num: 0, - readys: HashMap::new(), + echo_sent: false, ready_sent: false, - ready_to_decode: false, has_output: false, + echos: BTreeMap::new(), + readys: BTreeMap::new(), }), }) } @@ -114,23 +210,18 @@ impl Broadcast { if self.our_id != self.proposer_id { return Err(Error::UnexpectedMessage); } - let mut state = self.state.write().unwrap(); // Split the value into chunks/shards, encode them with erasure codes. // Assemble a Merkle tree from data and parity shards. Take all proofs // from this tree and send them, each to its own node. - self.send_shards(value).map(|(proof, remote_messages)| { - // Record the first proof as if it were sent by the node to itself. - let h = proof.root_hash.clone(); - // Save the leaf value for reconstructing the tree later. - state.leaf_values[index_of_proof(&proof)] = - Some(proof.value.clone().into_boxed_slice()); - state.leaf_values_num += 1; - state.root_hash = Some(h); - - remote_messages - }) + let (proof, value_msgs) = self.send_shards(value)?; + // TODO: We'd actually need to return the output here, if it was only one node. Should that + // use-case be supported? + let state = self.state.write().unwrap(); + let (_, echo_msgs) = self.handle_value(&self.our_id, proof, state)?; + Ok(value_msgs.into_iter().chain(echo_msgs).collect()) } + /// Returns this node's ID. pub fn our_id(&self) -> &NodeUid { &self.our_id } @@ -143,7 +234,7 @@ impl Broadcast { fn send_shards( &self, mut value: ProposedValue, - ) -> Result<(Proof, MessageQueue), Error> { + ) -> Result<(Proof>, MessageQueue), Error> { let data_shard_num = self.coding.data_shard_count(); let parity_shard_num = self.coding.parity_shard_count(); @@ -172,14 +263,20 @@ impl Broadcast { // Convert the iterator over slices into a vector of slices. let mut shards: Vec<&mut [u8]> = shards_iter.collect(); - debug!("Shards before encoding: {:?}", shards); + debug!("Shards before encoding: {:?}", HexList(&shards)); // Construct the parity chunks/shards self.coding.encode(&mut shards)?; - debug!("Shards: {:?}", shards); + debug!("Shards: {:?}", HexList(&shards)); - let shards_t: Vec = shards.into_iter().map(|s| s.to_vec()).collect(); + // TODO: `MerkleTree` generates the wrong proof if a leaf occurs more than once, so we + // prepend an "index byte" to each shard. Consider using the `merkle_light` crate instead. + let shards_t: Vec = shards + .into_iter() + .enumerate() + .map(|(i, s)| iter::once(i as u8).chain(s.iter().cloned()).collect()) + .collect(); // Convert the Merkle tree into a partial binary tree for later // deconstruction into compound branches. @@ -188,23 +285,19 @@ impl Broadcast { // Default result in case of `gen_proof` error. let mut result = Err(Error::ProofConstructionFailed); let mut outgoing = VecDeque::new(); + assert_eq!(self.num_nodes, mtree.iter().count()); // Send each proof to a node. - // TODO: This generates the wrong proof if a leaf occurs more than once. Consider using the - // `merkle_light` crate instead. - for (leaf_value, uid) in mtree.iter().zip(self.all_uids.clone()) { + for (leaf_value, uid) in mtree.iter().zip(&self.all_uids) { let proof = mtree .gen_proof(leaf_value.to_vec()) .ok_or(Error::ProofConstructionFailed)?; - if uid == self.our_id { + if *uid == self.our_id { // The proof is addressed to this node. result = Ok(proof); } else { // Rest of the proofs are sent to remote nodes. - outgoing.push_back(TargetedBroadcastMessage { - target: BroadcastTarget::Node(uid), - message: BroadcastMessage::Value(proof), - }); + outgoing.push_back(BroadcastMessage::Value(proof).target_node(uid.clone())); } } @@ -217,11 +310,14 @@ impl Broadcast { sender_id: &NodeUid, message: BroadcastMessage, ) -> Result<(Option, MessageQueue), Error> { + if !self.all_uids.contains(sender_id) { + return Err(Error::UnknownSender); + } let state = self.state.write().unwrap(); match message { BroadcastMessage::Value(p) => self.handle_value(sender_id, p, state), - BroadcastMessage::Echo(p) => self.handle_echo(p, state), - BroadcastMessage::Ready(hash) => self.handle_ready(hash, state), + BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p, state), + BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash, state), } } @@ -229,159 +325,166 @@ impl Broadcast { fn handle_value( &self, sender_id: &NodeUid, - p: Proof, - mut state: RwLockWriteGuard, + p: Proof>, + mut state: RwLockWriteGuard>, ) -> Result<(Option, MessageQueue), Error> { + // If the sender is not the proposer, this is not the first `Value` or the proof is invalid, + // ignore. if *sender_id != self.proposer_id { + info!( + "Node {:?} received Value from {:?} instead of {:?}.", + self.our_id, sender_id, self.proposer_id + ); return Ok((None, VecDeque::new())); } - // Initialize the root hash if not already initialised. - if state.root_hash.is_none() { - state.root_hash = Some(p.root_hash.clone()); - debug!( - "Node {:?} Value root hash {:?}", - self.our_id, - HexBytes(&p.root_hash) - ); + if state.echo_sent { + info!("Node {:?} received multiple Values.", self.our_id); + return Ok((None, VecDeque::new())); + } + if !self.validate_proof(&p, &self.our_id) { + return Ok((None, VecDeque::new())); } - if state.root_hash.as_ref().map_or(false, |h| p.validate(h)) { - // TODO: Should messages failing this be echoed at all? - // Save the leaf value for reconstructing the tree later. - let idx = index_of_proof(&p); - state.leaf_values[idx] = Some(p.value.clone().into_boxed_slice()); - state.leaf_values_num += 1; - } + // Otherwise multicast the proof in an `Echo` message, and handle it ourselves. + state.echo_sent = true; + let (output, echo_msgs) = self.handle_echo(&self.our_id, p.clone(), state)?; + let msgs = iter::once(BroadcastMessage::Echo(p).target_all()) + .chain(echo_msgs) + .collect(); - // Enqueue a broadcast of an echo of this proof. - let msgs = VecDeque::from(vec![TargetedBroadcastMessage { - target: BroadcastTarget::All, - message: BroadcastMessage::Echo(p.clone()), - }]); - let (output, echo_msgs) = self.handle_echo(p, state)?; - Ok((output, msgs.into_iter().chain(echo_msgs).collect())) + Ok((output, msgs)) } - /// Handles a received echo and verifies the proof it contains. + /// Handles a received `Echo` message. fn handle_echo( &self, - p: Proof, - mut state: RwLockWriteGuard, + sender_id: &NodeUid, + p: Proof>, + mut state: RwLockWriteGuard>, ) -> Result<(Option, MessageQueue), Error> { - if state.root_hash.is_none() { - state.root_hash = Some(p.root_hash.clone()); - debug!( - "Node {:?} Echo root hash {:?}", - self.our_id, state.root_hash + // If the proof is invalid or the sender has already sent `Echo`, ignore. + if state.echos.contains_key(sender_id) { + info!( + "Node {:?} received multiple Echos from {:?}.", + self.our_id, sender_id, ); - } - - // Call validate with the root hash as argument. - let h = if let Some(h) = state.root_hash.clone() { - h - } else { - error!("Broadcast/{:?} root hash not initialised", self.our_id); return Ok((None, VecDeque::new())); - }; - - if !p.validate(h.as_slice()) { - debug!("Broadcast/{:?} cannot validate Echo {:?}", self.our_id, p); + } + if !self.validate_proof(&p, sender_id) { return Ok((None, VecDeque::new())); } - state.echo_num += 1; - // Save the leaf value for reconstructing the tree later. - let idx = index_of_proof(&p); - state.leaf_values[idx] = Some(p.value.into_boxed_slice()); - state.leaf_values_num += 1; + let hash = p.root_hash.clone(); - // Upon receiving 2f + 1 matching READY(h) - // messages, wait for N − 2 f ECHO messages, - // then decode v. Return the decoded v to ACS. - if state.leaf_values_num < self.num_nodes - self.num_faulty_nodes { - return Ok((None, VecDeque::new())); - } - - // TODO: Only decode once. Don't repeat for every ECHO message. - let value = decode_from_shards( - &mut state.leaf_values, - &self.coding, - self.data_shard_num, - &h, - )?; - - if state.ready_to_decode && !state.has_output { - state.has_output = true; - return Ok((Some(value), VecDeque::new())); - } - - // if Ready has not yet been sent, multicast Ready - if state.ready_sent { - return Ok((None, VecDeque::new())); + // Save the proof for reconstructing the tree later. + state.echos.insert(sender_id.clone(), p); + + if state.ready_sent || state.count_echos(&hash) < self.num_nodes - self.num_faulty_nodes { + return Ok((self.get_output(state, &hash)?, VecDeque::new())); } + // Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`. state.ready_sent = true; - let msg = TargetedBroadcastMessage { - target: BroadcastTarget::All, - message: BroadcastMessage::Ready(h.clone()), - }; - let (output, ready_msgs) = self.handle_ready(h, state)?; + let msg = BroadcastMessage::Ready(hash.clone()).target_all(); + let (output, ready_msgs) = self.handle_ready(&self.our_id, &hash, state)?; Ok((output, iter::once(msg).chain(ready_msgs).collect())) } + /// Handles a received `Ready` message. fn handle_ready( &self, - hash: Vec, - mut state: RwLockWriteGuard, + sender_id: &NodeUid, + hash: &[u8], + mut state: RwLockWriteGuard>, ) -> Result<(Option, MessageQueue), Error> { - // Update the number Ready has been received with this hash. - // TODO: Don't accept multiple ready messages from the same node. - *state.readys.entry(hash).or_insert(1) += 1; - - // Check that the root hash matches. - let h = if let Some(h) = state.root_hash.clone() { - h - } else { + // If the sender has already sent a `Ready` before, ignore. + if state.readys.contains_key(sender_id) { + info!( + "Node {:?} received multiple Readys from {:?}.", + self.our_id, sender_id + ); return Ok((None, VecDeque::new())); - }; + } - let ready_num = *state.readys.get(&h).unwrap_or(&0); - let mut outgoing = VecDeque::new(); + state.readys.insert(sender_id.clone(), hash.to_vec()); // Upon receiving f + 1 matching Ready(h) messages, if Ready // has not yet been sent, multicast Ready(h). - if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent { - // Enqueue a broadcast of a ready message. - outgoing.push_back(TargetedBroadcastMessage { - target: BroadcastTarget::All, - message: BroadcastMessage::Ready(h.to_vec()), - }); + let outgoing = if state.count_readys(hash) == self.num_faulty_nodes + 1 && !state.ready_sent + { + // Enqueue a broadcast of a Ready message. + state.ready_sent = true; + iter::once(BroadcastMessage::Ready(hash.to_vec()).target_all()).collect() + } else { + VecDeque::new() + }; + + Ok((self.get_output(state, hash)?, outgoing)) + } + + /// Checks whether the condition for output are met for this hash, and if so, returns the output + /// value. + fn get_output( + &self, + mut state: RwLockWriteGuard>, + hash: &[u8], + ) -> Result, Error> { + if state.has_output || state.count_readys(hash) <= 2 * self.num_faulty_nodes + || state.count_echos(hash) <= self.num_faulty_nodes + { + return Ok(None); } - let mut output = None; + // Upon receiving 2f + 1 matching Ready(h) messages, wait for N − 2f Echo messages. + state.has_output = true; + let mut leaf_values: Vec>> = self.all_uids + .iter() + .map(|id| { + state.echos.get(id).and_then(|p| { + if p.root_hash.as_slice() == hash { + Some(p.value.clone().into_boxed_slice()) + } else { + None + } + }) + }) + .collect(); + let value = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)?; + Ok(Some(value)) + } - // Upon receiving 2f + 1 matching Ready(h) messages, wait - // for N − 2f Echo messages, then decode v. - if ready_num > 2 * self.num_faulty_nodes { - // Wait for N - 2f Echo messages, then decode v. - if state.echo_num >= self.num_nodes - 2 * self.num_faulty_nodes { - let value = decode_from_shards( - &mut state.leaf_values, - &self.coding, - self.data_shard_num, - &h, - )?; + /// Returns `i` if `node_id` is the `i`-th ID among all participating nodes. + fn index_of_node(&self, node_id: &NodeUid) -> Option { + self.all_uids.iter().position(|id| id == node_id) + } - if !state.has_output { - output = Some(value); - state.has_output = true; - } - } else { - state.ready_to_decode = true; - } + /// Returns the index of this proof's leave in the Merkle tree. + fn index_of_proof(&self, proof: &Proof) -> usize { + index_of_lemma(&proof.lemma, self.num_nodes) + } + + /// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise + /// logs an info message. + fn validate_proof(&self, p: &Proof, id: &NodeUid) -> bool { + if !p.validate(&p.root_hash) { + info!( + "Node {:?} received invalid proof: {:?}", + self.our_id, + HexProof(&p) + ); + false + } else if self.index_of_node(id) != Some(p.value[0] as usize) + || self.index_of_proof(&p) != p.value[0] as usize + { + info!( + "Node {:?} received proof for wrong position: {:?}.", + self.our_id, + HexProof(&p) + ); + false + } else { + true } - - Ok((output, outgoing)) } } @@ -411,12 +514,12 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into> + From>, rx: &'a Receiver>, broadcast_value: Option, - num_nodes: usize, - proposer_index: usize, + node_ids: BTreeSet, + our_id: usize, + proposer_id: usize, ) -> Self { - let all_indexes = (0..num_nodes).collect(); - let broadcast = Broadcast::new(0, proposer_index, all_indexes) - .expect("failed to instantiate broadcast"); + let broadcast = + Broadcast::new(our_id, proposer_id, node_ids).expect("failed to instantiate broadcast"); Instance { tx, rx, @@ -445,6 +548,7 @@ pub enum Error { Recv(RecvError), UnexpectedMessage, NotImplemented, + UnknownSender, } impl From for Error { @@ -476,7 +580,7 @@ fn inner_run<'a>( for msg in broadcast .propose_value(v)? .into_iter() - .map(TargetedBroadcastMessage::into_targeted_message) + .map(TargetedBroadcastMessage::into) { tx.send(msg)?; } @@ -491,10 +595,15 @@ fn inner_run<'a>( message: Message::Broadcast(message), } = message { + debug!("{} received from {}: {:?}", broadcast.our_id, i, message); let (opt_output, msgs) = broadcast.handle_broadcast_message(&i, message)?; - for msg in msgs.into_iter() - .map(TargetedBroadcastMessage::into_targeted_message) - { + for msg in &msgs { + debug!( + "{} sending to {:?}: {:?}", + broadcast.our_id, msg.target, msg.message + ); + } + for msg in msgs.into_iter().map(TargetedBroadcastMessage::into) { tx.send(msg)?; } if let Some(output) = opt_output { @@ -525,6 +634,9 @@ where .iter() .filter_map(|l| l.as_ref().map(|v| v.to_vec())) .collect(); + + debug!("Reconstructed shards: {:?}", HexList(&shards)); + // Construct the Merkle tree. let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards); // If the root hash of the reconstructed tree does not match the one @@ -547,62 +659,41 @@ fn glue_shards(m: MerkleTree, n: usize) -> T where T: From> + Into>, { - let t: Vec = m.into_iter().take(n).flat_map(|s| s).collect(); + let t: Vec = m.into_iter() + .take(n) + .flat_map(|s| s.into_iter().skip(1)) // Drop the index byte. + .collect(); let payload_len = t[0] as usize; - debug!("Glued data shards {:?}", &t[1..(payload_len + 1)]); + debug!("Glued data shards {:?}", HexBytes(&t[1..(payload_len + 1)])); Vec::into(t[1..(payload_len + 1)].to_vec()) } -/// An additional path conversion operation on `Lemma` to allow reconstruction -/// of erasure-coded `Proof` from `Lemma`s. The output path, when read from left -/// to right, goes from leaf to root (LSB order). -fn path_of_lemma(lemma: &Lemma) -> Vec { - match lemma.sub_lemma { - None => { - match lemma.sibling_hash { - // lemma terminates with no leaf - None => vec![], - // the leaf is on the right - Some(Positioned::Left(_)) => vec![true], - // the leaf is on the left - Some(Positioned::Right(_)) => vec![false], - } - } - Some(ref l) => { - let mut p = path_of_lemma(l.as_ref()); - - match lemma.sibling_hash { - // lemma terminates - None => (), - // lemma branches out to the right - Some(Positioned::Left(_)) => p.push(true), - // lemma branches out to the left - Some(Positioned::Right(_)) => p.push(false), - } - p - } +/// Computes the Merkle tree leaf index of a value in a given lemma. +pub fn index_of_lemma(lemma: &Lemma, n: usize) -> usize { + let m = n.next_power_of_two(); + match (lemma.sub_lemma.as_ref(), lemma.sibling_hash.as_ref()) { + (None, Some(&Positioned::Right(_))) | (None, None) => 0, + (None, Some(&Positioned::Left(_))) => 1, + (Some(l), None) => index_of_lemma(l, n), + (Some(l), Some(&Positioned::Left(_))) => (m >> 1) + index_of_lemma(l, n - (m >> 1)), + (Some(l), Some(&Positioned::Right(_))) => index_of_lemma(l, m >> 1), } } -/// Further conversion of a binary tree path into an array index. -fn index_of_path(mut path: Vec) -> usize { - let mut idx = 0; - // Convert to the MSB order. - path.reverse(); - - for &dir in &path { - idx <<= 1; - if dir { - idx |= 1; +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_index_of_lemma() { + for &n in &[3, 4, 13, 16, 127, 128, 129, 255] { + let shards: Vec<[u8; 1]> = (0..n).map(|i| [i as u8]).collect(); + let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards); + for (i, val) in mtree.iter().enumerate() { + let p = mtree.gen_proof(val.clone()).expect("generate proof"); + let idx = index_of_lemma(&p.lemma, n); + assert_eq!(i, idx, "Wrong index {} for leaf {}/{}.", idx, i, n); + } } } - idx -} - -/// Computes the Merkle tree leaf index of a value in a given proof. -// TODO: This currently only works if the number of leaves is a power of two. With the -// `merkle_light` crate, it might not even be needed, though. -pub fn index_of_proof(p: &Proof) -> usize { - index_of_path(path_of_lemma(&p.lemma)) } diff --git a/src/common_subset.rs b/src/common_subset.rs index e304d51..d92c5d4 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -11,9 +11,9 @@ use agreement; use agreement::Agreement; use broadcast; -use broadcast::{Broadcast, TargetedBroadcastMessage}; +use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage}; -use proto::{AgreementMessage, BroadcastMessage}; +use proto::AgreementMessage; // TODO: Make this a generic argument of `Broadcast`. type ProposedValue = Vec; @@ -39,7 +39,7 @@ pub enum Output { Agreement(AgreementMessage), } -pub struct CommonSubset { +pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, @@ -59,7 +59,11 @@ impl CommonSubset { for uid0 in all_uids { broadcast_instances.insert( uid0.clone(), - Broadcast::new(uid.clone(), uid0.clone(), all_uids.clone())?, + Broadcast::new( + uid.clone(), + uid0.clone(), + all_uids.iter().cloned().collect(), + )?, ); } diff --git a/src/messaging.rs b/src/messaging.rs index b404f7a..94b8fa1 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -8,7 +8,7 @@ use std::fmt::Debug; /// recepients is unknown without further computation which is irrelevant to the /// message delivery task. #[derive(Clone, Debug)] -pub struct SourcedMessage { +pub struct SourcedMessage> { pub source: usize, pub message: Message, } @@ -27,20 +27,14 @@ pub enum Target { /// Message with a designated target. #[derive(Clone, Debug, PartialEq)] -pub struct TargetedMessage { +pub struct TargetedMessage> { pub target: Target, pub message: Message, } -impl TargetedMessage { +impl> TargetedMessage { /// Initialises a message while checking parameter preconditions. - pub fn new(target: Target, message: Message) -> Option { - match target { - Target::Node(i) if i == 0 => { - // Remote node indices start from 1. - None - } - _ => Some(TargetedMessage { target, message }), - } + pub fn new(target: Target, message: Message) -> Self { + TargetedMessage { target, message } } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 47bba6f..ba5e2a0 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,6 +1,7 @@ //! Construction of messages from protobuf buffers. pub mod message; +use broadcast::BroadcastMessage; use merkle::proof::{Lemma, Positioned, Proof}; use proto::message::*; use protobuf::core::parse_from_bytes; @@ -12,20 +13,11 @@ use std::marker::{Send, Sync}; /// Kinds of message sent by nodes participating in consensus. #[derive(Clone, Debug, PartialEq)] -pub enum Message { +pub enum Message> { Broadcast(BroadcastMessage), Agreement(AgreementMessage), } -/// The three kinds of message sent during the reliable broadcast stage of the -/// consensus algorithm. -#[derive(Clone, PartialEq)] -pub enum BroadcastMessage { - Value(Proof), - Echo(Proof), - Ready(Vec), -} - /// Wrapper for a byte array, whose `Debug` implementation outputs shortened hexadecimal strings. pub struct HexBytes<'a>(pub &'a [u8]); @@ -33,46 +25,47 @@ impl<'a> fmt::Debug for HexBytes<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.0.len() > 6 { for byte in &self.0[..3] { - write!(f, "{:0x}", byte)?; + write!(f, "{:02x}", byte)?; } write!(f, "..")?; for byte in &self.0[(self.0.len() - 3)..] { - write!(f, "{:0x}", byte)?; + write!(f, "{:02x}", byte)?; } } else { for byte in self.0 { - write!(f, "{:0x}", byte)?; + write!(f, "{:02x}", byte)?; } } Ok(()) } } -struct HexProof<'a, T: 'a>(&'a Proof); +/// Wrapper for a list of byte arrays, whose `Debug` implementation outputs shortened hexadecimal +/// strings. +pub struct HexList<'a, T: 'a>(pub &'a [T]); -impl<'a, T: Send + Sync + fmt::Debug> fmt::Debug for HexProof<'a, T> { +impl<'a, T: AsRef<[u8]>> fmt::Debug for HexList<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let v: Vec<_> = self.0.iter().map(|t| HexBytes(t.as_ref())).collect(); + write!(f, "{:?}", v) + } +} + +pub struct HexProof<'a, T: 'a>(pub &'a Proof); + +impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "Proof {{ algorithm: {:?}, root_hash: {:?}, lemma for leaf #{}, value: {:?} }}", self.0.algorithm, HexBytes(&self.0.root_hash), - ::broadcast::index_of_proof(self.0), - self.0.value + path_of_lemma(&self.0.lemma), + HexBytes(&self.0.value.as_ref()) ) } } -impl fmt::Debug for BroadcastMessage { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)), - BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)), - BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)), - } - } -} - /// Messages sent during the binary Byzantine agreement stage. #[derive(Copy, Clone, Debug, PartialEq)] pub enum AgreementMessage { @@ -80,17 +73,14 @@ pub enum AgreementMessage { Aux(bool), } -impl Message { +impl + From>> Message { /// Translation from protobuf to the regular type. /// /// TODO: add an `Algorithm` field to `MessageProto`. Either `Algorithm` has /// to be fully serialised and sent as a whole, or it can be passed over /// using an ID and the `Eq` instance to discriminate the finite set of /// algorithms in `ring::digest`. - pub fn from_proto(mut proto: message::MessageProto) -> Option - where - T: From>, - { + pub fn from_proto(mut proto: message::MessageProto) -> Option { if proto.has_broadcast() { BroadcastMessage::from_proto( proto.take_broadcast(), @@ -105,10 +95,7 @@ impl Message { } } - pub fn into_proto(self) -> MessageProto - where - T: Into>, - { + pub fn into_proto(self) -> MessageProto { let mut m = MessageProto::new(); match self { Message::Broadcast(b) => { @@ -125,10 +112,7 @@ impl Message { /// /// TODO: pass custom errors from down the chain of nested parsers as /// opposed to returning `WireError::Other`. - pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult - where - T: From>, - { + pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult { let r = parse_from_bytes::(bytes).map(Self::from_proto); match r { @@ -139,19 +123,13 @@ impl Message { } /// Produce a protobuf representation of this `Message`. - pub fn write_to_bytes(self) -> ProtobufResult> - where - T: Into>, - { + pub fn write_to_bytes(self) -> ProtobufResult> { self.into_proto().write_to_bytes() } } -impl BroadcastMessage { - pub fn into_proto(self) -> BroadcastProto - where - T: Into>, - { +impl + From>> BroadcastMessage { + pub fn into_proto(self) -> BroadcastProto { let mut b = BroadcastProto::new(); match self { BroadcastMessage::Value(p) => { @@ -173,10 +151,7 @@ impl BroadcastMessage { b } - pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option - where - T: From>, - { + pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option { if mp.has_value() { mp.take_value() .take_proof() @@ -223,10 +198,7 @@ impl AgreementMessage { /// around the restriction of not being allowed to extend the implementation of /// `Proof` outside its crate. impl ProofProto { - pub fn from_proof(proof: Proof) -> Self - where - T: Into>, - { + pub fn from_proof>(proof: Proof) -> Self { let mut proto = Self::new(); match proof { @@ -239,17 +211,17 @@ impl ProofProto { } => { proto.set_root_hash(root_hash); proto.set_lemma(LemmaProto::from_lemma(lemma)); - proto.set_value(value.into()); + proto.set_value(value.as_ref().to_vec()); } } proto } - pub fn into_proof(mut self, algorithm: &'static Algorithm) -> Option> - where - T: From>, - { + pub fn into_proof>>( + mut self, + algorithm: &'static Algorithm, + ) -> Option> { if !self.has_lemma() { return None; } @@ -326,3 +298,35 @@ impl LemmaProto { } } } + +/// The path of a lemma in a Merkle tree +struct BinaryPath(Vec); + +/// The path of the lemma, as a binary string +fn path_of_lemma(mut lemma: &Lemma) -> BinaryPath { + let mut result = Vec::new(); + loop { + match lemma.sibling_hash { + None => (), + Some(Positioned::Left(_)) => result.push(true), + Some(Positioned::Right(_)) => result.push(false), + } + lemma = match lemma.sub_lemma.as_ref() { + Some(lemma) => lemma, + None => return BinaryPath(result), + } + } +} + +impl fmt::Display for BinaryPath { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for b in &self.0 { + if *b { + write!(f, "1")?; + } else { + write!(f, "0")?; + } + } + Ok(()) + } +} diff --git a/src/proto_io.rs b/src/proto_io.rs index 169949d..3c67d21 100644 --- a/src/proto_io.rs +++ b/src/proto_io.rs @@ -3,9 +3,9 @@ use proto::*; use protobuf; use protobuf::Message as ProtobufMessage; -use std::io; use std::io::{Read, Write}; use std::net::TcpStream; +use std::{cmp, io}; /// A magic key to put right before each message. An atavism of primitive serial /// protocols. @@ -35,14 +35,42 @@ impl From for Error { } } +fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> { + if buffer.len() < 4 { + return Err(Error::EncodeError); + } + let value = value.to_le(); + buffer[0] = ((value & 0xFF00_0000) >> 24) as u8; + buffer[1] = ((value & 0x00FF_0000) >> 16) as u8; + buffer[2] = ((value & 0x0000_FF00) >> 8) as u8; + buffer[3] = (value & 0x0000_00FF) as u8; + Ok(()) +} + +fn decode_u32_from_be(buffer: &[u8]) -> Result { + if buffer.len() < 4 { + return Err(Error::DecodeError); + } + let mut result = u32::from(buffer[0]); + result <<= 8; + result += u32::from(buffer[1]); + result <<= 8; + result += u32::from(buffer[2]); + result <<= 8; + result += u32::from(buffer[3]); + Ok(result) +} + pub struct ProtoIo { stream: S, + buffer: [u8; 1024 * 4], } impl ProtoIo { pub fn try_clone(&self) -> Result, ::std::io::Error> { Ok(ProtoIo { stream: self.stream.try_clone()?, + buffer: [0; 1024 * 4], }) } } @@ -52,31 +80,52 @@ impl ProtoIo //where T: Clone + Send + Sync + From> + Into> { pub fn from_stream(stream: S) -> Self { - ProtoIo { stream } + ProtoIo { + stream, + buffer: [0; 1024 * 4], + } } pub fn recv(&mut self) -> Result, Error> where - T: Clone + Send + Sync + From>, // + Into> + T: Clone + Send + Sync + AsRef<[u8]> + From>, { - let mut stream = protobuf::CodedInputStream::new(&mut self.stream); - // Read magic number - if stream.read_raw_varint32()? != FRAME_START { + self.stream.read_exact(&mut self.buffer[0..4])?; + let frame_start = decode_u32_from_be(&self.buffer[0..4])?; + if frame_start != FRAME_START { return Err(Error::FrameStartMismatch); + }; + self.stream.read_exact(&mut self.buffer[0..4])?; + let size = decode_u32_from_be(&self.buffer[0..4])? as usize; + + let mut message_v: Vec = Vec::new(); + message_v.reserve(size); + while message_v.len() < size { + let num_to_read = cmp::min(self.buffer.len(), size - message_v.len()); + let (slice, _) = self.buffer.split_at_mut(num_to_read); + self.stream.read_exact(slice)?; + message_v.extend_from_slice(slice); } - Message::from_proto(stream.read_message()?).ok_or(Error::DecodeError) + + Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError) } pub fn send(&mut self, message: Message) -> Result<(), Error> where - T: Clone + Send + Sync + Into>, + T: Clone + Send + Sync + AsRef<[u8]> + From>, { + let mut buffer: [u8; 4] = [0; 4]; + // Wrap stream let mut stream = protobuf::CodedOutputStream::new(&mut self.stream); // Write magic number - stream.write_raw_varint32(FRAME_START)?; + encode_u32_to_be(FRAME_START, &mut buffer[0..4])?; + stream.write_raw_bytes(&buffer)?; let message_p = message.into_proto(); + // Write message size + encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?; + stream.write_raw_bytes(&buffer)?; // Write message - message_p.write_length_delimited_to(&mut stream)?; + message_p.write_to(&mut stream)?; // Flush stream.flush()?; Ok(()) @@ -85,6 +134,7 @@ impl ProtoIo #[cfg(test)] mod tests { + use broadcast::BroadcastMessage; use proto_io::*; use std::io::Cursor; @@ -100,9 +150,6 @@ mod tests { println!("{:?}", pio.stream.get_ref()); pio.stream.set_position(0); assert_eq!(msg0, pio.recv().expect("recv msg0")); - // TODO: Figure out why the cursor is wrong here. - let len = pio.stream.get_ref().len() as u64; - pio.stream.set_position(len / 2); assert_eq!(msg1, pio.recv().expect("recv msg1")); } } diff --git a/tests/broadcast.rs b/tests/broadcast.rs index 7a333dc..2c2ce86 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -5,16 +5,15 @@ extern crate hbbft; extern crate log; extern crate crossbeam; extern crate crossbeam_channel; +extern crate env_logger; extern crate merkle; extern crate rand; -extern crate simple_logger; use rand::Rng; -use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::fmt; -use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage}; -use hbbft::proto::BroadcastMessage; +use hbbft::broadcast::{Broadcast, BroadcastMessage, BroadcastTarget, TargetedBroadcastMessage}; #[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)] struct NodeId(usize); @@ -168,7 +167,7 @@ impl Adversary for ProposeAdversary { } self.has_sent = true; let value = b"Fake news"; - let node_ids: HashSet = self.adv_nodes + let node_ids: BTreeSet = self.adv_nodes .iter() .cloned() .chain(self.good_nodes.iter().cloned()) @@ -191,7 +190,7 @@ impl TestNetwork { /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling /// `adv_num` nodes. fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork { - let node_ids: HashSet = (0..(good_num + adv_num)).map(NodeId).collect(); + let node_ids: BTreeSet = (0..(good_num + adv_num)).map(NodeId).collect(); let new_broadcast = |id: NodeId| { let bc = Broadcast::new(id, NodeId(0), node_ids.clone()).expect("Instantiate broadcast"); @@ -271,8 +270,8 @@ impl TestNetwork { /// Broadcasts a value from node 0 and expects all good nodes to receive it. fn test_broadcast(mut network: TestNetwork, proposed_value: &[u8]) { - // TODO: This returns an error in all but the first test. - let _ = simple_logger::init_with_level(log::Level::Debug); + // This returns an error in all but the first test. + let _ = env_logger::try_init(); // Make node 0 propose the value. network.propose_value(NodeId(0), proposed_value.to_vec()); @@ -288,36 +287,46 @@ fn test_broadcast(mut network: TestNetwork, proposed_value: &[u } } -// TODO: Unignore once equal shards don't cause problems anymore. #[test] -#[ignore] -fn test_8_broadcast_equal_leaves() { +fn test_8_broadcast_equal_leaves_silent() { let adversary = SilentAdversary::new(MessageScheduler::Random); // Space is ASCII character 32. So 32 spaces will create shards that are all equal, even if the // length of the value is inserted. test_broadcast(TestNetwork::new(8, 0, adversary), &[b' '; 32]); } -// TODO: Unignore once node numbers are supported that are not powers of two. #[test] -#[ignore] -fn test_13_broadcast_nodes_random_delivery() { +fn test_13_broadcast_nodes_random_delivery_silent() { let adversary = SilentAdversary::new(MessageScheduler::Random); test_broadcast(TestNetwork::new(13, 0, adversary), b"Foo"); } #[test] -fn test_11_5_broadcast_nodes_random_delivery() { +fn test_4_broadcast_nodes_random_delivery_silent() { + let adversary = SilentAdversary::new(MessageScheduler::Random); + test_broadcast(TestNetwork::new(4, 0, adversary), b"Foo"); +} + +#[test] +fn test_11_5_broadcast_nodes_random_delivery_silent() { let adversary = SilentAdversary::new(MessageScheduler::Random); test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo"); } #[test] -fn test_11_5_broadcast_nodes_first_delivery() { +fn test_11_5_broadcast_nodes_first_delivery_silent() { let adversary = SilentAdversary::new(MessageScheduler::First); test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo"); } +#[test] +fn test_3_1_broadcast_nodes_random_delivery_adv_propose() { + let good_nodes: BTreeSet = (0..3).map(NodeId).collect(); + let adv_nodes: BTreeSet = (3..4).map(NodeId).collect(); + let adversary = ProposeAdversary::new(MessageScheduler::Random, good_nodes, adv_nodes); + test_broadcast(TestNetwork::new(3, 1, adversary), b"Foo"); +} + #[test] fn test_11_5_broadcast_nodes_random_delivery_adv_propose() { let good_nodes: BTreeSet = (0..11).map(NodeId).collect();