diff --git a/examples/consensus-node.rs b/examples/consensus-node.rs
index 1af2aa2..d7de6c8 100644
--- a/examples/consensus-node.rs
+++ b/examples/consensus-node.rs
@@ -1,13 +1,18 @@
 //! Example of a consensus node that uses the `hbbft::node::Node` struct for
 //! running the distributed consensus state machine.
-//#[macro_use]
+extern crate crossbeam;
+#[macro_use]
+extern crate crossbeam_channel;
 extern crate docopt;
 extern crate hbbft;
+#[macro_use]
 extern crate log;
 extern crate simple_logger;
 
+mod network;
+
 use docopt::Docopt;
-use hbbft::node::Node;
+use network::node::Node;
 use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::vec::Vec;
diff --git a/src/commst.rs b/examples/network/commst.rs
similarity index 96%
rename from src/commst.rs
rename to examples/network/commst.rs
index 5429479..4005d3f 100644
--- a/src/commst.rs
+++ b/examples/network/commst.rs
@@ -8,10 +8,9 @@ use std::io;
 use std::net::TcpStream;
 use std::sync::Arc;
 
-use messaging::SourcedMessage;
-use proto::Message;
-use proto_io;
-use proto_io::ProtoIo;
+use hbbft::messaging::SourcedMessage;
+use hbbft::proto::Message;
+use hbbft::proto_io::{self, ProtoIo};
 
 #[derive(Debug)]
 pub enum Error {
@@ -32,7 +31,7 @@ pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
     /// The receive side of the channel to the comms thread.
     rx: &'a Receiver<Message<T>>,
     /// The socket IO task.
-    io: ProtoIo,
+    io: ProtoIo<TcpStream>,
     /// The index of this comms task for identification against its remote node.
     pub node_index: usize,
 }
diff --git a/src/connection.rs b/examples/network/connection.rs
similarity index 100%
rename from src/connection.rs
rename to examples/network/connection.rs
diff --git a/examples/network/messaging.rs b/examples/network/messaging.rs
new file mode 100644
index 0000000..4a51d40
--- /dev/null
+++ b/examples/network/messaging.rs
@@ -0,0 +1,189 @@
+//! The local message delivery system.
+use crossbeam::{Scope, ScopedJoinHandle};
+use crossbeam_channel;
+use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
+use hbbft::messaging::{SourcedMessage, Target, TargetedMessage};
+use hbbft::proto::Message;
+use std::fmt::Debug;
+
+/// The queue functionality for messages sent between algorithm instances.
+/// The messaging struct allows for targeted message exchange between comms
+/// tasks on one side and algo tasks on the other.
+pub struct Messaging<T: Clone + Debug + Send + Sync> {
+    /// Transmit sides of message channels to comms threads.
+    txs_to_comms: Vec<Sender<Message<T>>>,
+    /// Receive side of the routed message channel from comms threads.
+    rx_from_comms: Receiver<SourcedMessage<T>>,
+    /// Transmit sides of message channels to algo threads.
+    txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
+    /// Receive side of the routed message channel from algo threads.
+    rx_from_algo: Receiver<TargetedMessage<T>>,
+
+    /// RX handles to be used by comms tasks.
+    rxs_to_comms: Vec<Receiver<Message<T>>>,
+    /// TX handle to be used by comms tasks.
+    tx_from_comms: Sender<SourcedMessage<T>>,
+    /// RX handles to be used by algo tasks.
+    rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
+    /// TX handle to be used by algo tasks.
+    tx_from_algo: Sender<TargetedMessage<T>>,
+
+    /// Control channel used to stop the listening thread.
+    stop_tx: Sender<()>,
+    stop_rx: Receiver<()>,
+}
+
+impl<T: Clone + Debug + Send + Sync> Messaging<T> {
+    /// Initialises all the required TX and RX handles for a total number
+    /// `num_nodes` of consensus nodes.
+    pub fn new(num_nodes: usize) -> Self {
+        let to_comms: Vec<_> = (0..num_nodes - 1)
+            .map(|_| unbounded::<Message<T>>())
+            .collect();
+        let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
+        let rxs_to_comms: Vec<Receiver<Message<T>>> =
+            to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
+        let (tx_from_comms, rx_from_comms) = unbounded();
+        let to_algo: Vec<_> = (0..num_nodes)
+            .map(|_| unbounded::<SourcedMessage<T>>())
+            .collect();
+        let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
+        let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
+            to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
+        let (tx_from_algo, rx_from_algo) = unbounded();
+
+        let (stop_tx, stop_rx) = bounded(1);
+
+        Messaging {
+            // internally used handles
+            txs_to_comms,
+            rx_from_comms,
+            txs_to_algo,
+            rx_from_algo,
+
+            // externally used handles
+            rxs_to_comms,
+            tx_from_comms,
+            rxs_to_algo,
+            tx_from_algo,
+
+            stop_tx,
+            stop_rx,
+        }
+    }
+
+    pub fn rxs_to_comms(&self) -> &Vec<Receiver<Message<T>>> {
+        &self.rxs_to_comms
+    }
+
+    pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<T>> {
+        &self.tx_from_comms
+    }
+
+    pub fn rxs_to_algo(&self) -> &Vec<Receiver<SourcedMessage<T>>> {
+        &self.rxs_to_algo
+    }
+
+    pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
+        &self.tx_from_algo
+    }
+
+    /// Gives the ownership of the handle to stop the message receive loop.
+    pub fn stop_tx(&self) -> Sender<()> {
+        self.stop_tx.to_owned()
+    }
+
+    /// Spawns the message delivery thread in a given thread scope.
+    pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
+    where
+        T: 'a,
+    {
+        let txs_to_comms = self.txs_to_comms.to_owned();
+        let rx_from_comms = self.rx_from_comms.to_owned();
+        let txs_to_algo = self.txs_to_algo.to_owned();
+        let rx_from_algo = self.rx_from_algo.to_owned();
+
+        let stop_rx = self.stop_rx.to_owned();
+        let mut stop = false;
+
+        // TODO: `select_loop!` seems to really confuse Clippy.
+        #[cfg_attr(
+            feature = "cargo-clippy",
+            allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
+        )]
+        scope.spawn(move || {
+            let mut result = Ok(());
+            // This loop forwards messages according to their metadata.
+            while !stop && result.is_ok() {
+                select_loop! {
+                    recv(rx_from_algo, message) => {
+                        match message {
+                            TargetedMessage {
+                                target: Target::All,
+                                message
+                            } => {
+                                // Send the message to all remote nodes, stopping at
+                                // the first error.
+                                result = txs_to_comms.iter()
+                                    .fold(Ok(()), |result, tx| {
+                                        if result.is_ok() {
+                                            tx.send(message.clone())
+                                        }
+                                        else {
+                                            result
+                                        }
+                                    }).map_err(Error::from);
+                            },
+                            TargetedMessage {
+                                target: Target::Node(i),
+                                message
+                            } => {
+                                // Remote node indices start from 1.
+                                assert!(i > 0);
+                                // Convert node index to vector index.
+                                let i = i - 1;
+
+                                result = if i < txs_to_comms.len() {
+                                    txs_to_comms[i].send(message.clone())
+                                        .map_err(Error::from)
+                                }
+                                else {
+                                    Err(Error::NoSuchTarget)
+                                };
+                            }
+                        }
+                    },
+                    recv(rx_from_comms, message) => {
+                        // Send the message to all algorithm instances, stopping at
+                        // the first error.
+                        result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
+                            if result.is_ok() {
+                                tx.send(message.clone())
+                            }
+                            else {
+                                result
+                            }
+                        }).map_err(Error::from)
+                    },
+                    recv(stop_rx, _) => {
+                        // Flag the thread ready to exit.
+                        stop = true;
+                    }
+                }
+            } // end of select_loop!
+            result
+        })
+    }
+}
+
+#[derive(Clone, Debug)]
+pub enum Error {
+    NoSuchTarget,
+    SendError,
+}
+
+impl<T> From<crossbeam_channel::SendError<T>> for Error {
+    fn from(_: crossbeam_channel::SendError<T>) -> Error {
+        Error::SendError
+    }
+}
diff --git a/examples/network/mod.rs b/examples/network/mod.rs
new file mode 100644
index 0000000..8734b80
--- /dev/null
+++ b/examples/network/mod.rs
@@ -0,0 +1,4 @@
+pub mod commst;
+pub mod connection;
+pub mod messaging;
+pub mod node;
diff --git a/src/node.rs b/examples/network/node.rs
similarity index 82%
rename from src/node.rs
rename to examples/network/node.rs
index 4fb9d89..46c91cc 100644
--- a/src/node.rs
+++ b/examples/network/node.rs
@@ -1,16 +1,50 @@
 //! Networking controls of the consensus node.
+//!
+//! ## Example
+//!
+//! The following code could be run on host 192.168.1.1:
+//!
+//! ```ignore
+//! extern crate hbbft;
+//!
+//! use hbbft::node::Node;
+//! use std::net::SocketAddr;
+//! use std::vec::Vec;
+//!
+//! fn main() {
+//!     let bind_address = "127.0.0.1:10001".parse().unwrap();
+//!     let remote_addresses = vec!["192.168.1.2:10002",
+//!                                 "192.168.1.3:10003",
+//!                                 "192.168.1.4:10004"]
+//!         .iter()
+//!         .map(|s| s.parse().unwrap())
+//!         .collect();
+//!
+//!     let value = "Value #1".as_bytes().to_vec();
+//!
+//!     let result = Node::new(bind_address, remote_addresses, Some(value))
+//!         .run();
+//!     println!("Consensus result {:?}", result);
+//! }
+//! ```
+//!
+//! Similar code should then run on hosts 192.168.1.2, 192.168.1.3 and
+//! 192.168.1.4, with appropriate changes to `bind_address` and
+//! `remote_addresses`. Each host has its own optional broadcast `value`. If
+//! the consensus `result` is not an error, it will be the same on every
+//! successfully terminated consensus node.
+
 use crossbeam;
-use merkle::Hashable;
 use std::collections::HashSet;
 use std::fmt::Debug;
 use std::io;
 use std::marker::{Send, Sync};
 use std::net::SocketAddr;
 
-use broadcast;
-use commst;
-use connection;
-use messaging::Messaging;
+use hbbft::broadcast;
+use network::commst;
+use network::connection;
+use network::messaging::Messaging;
 
 #[derive(Debug)]
 pub enum Error {
@@ -41,7 +75,7 @@ pub struct Node<T> {
     value: Option<T>,
 }
 
-impl<T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
+impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
     Node<T>
 {
     /// Consensus node constructor. It only initialises initial parameters.
diff --git a/src/broadcast.rs b/src/broadcast.rs
index 46a5fbf..20ee669 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -1,5 +1,4 @@
 //! Reliable broadcast algorithm instance.
-use crossbeam;
 use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
 use merkle::proof::{Lemma, Positioned, Proof};
 use merkle::{Hashable, MerkleTree};
@@ -11,11 +10,12 @@ use std::fmt::Debug;
 use std::hash::Hash;
 use std::iter;
 use std::marker::{Send, Sync};
-use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
+use std::sync::{RwLock, RwLockWriteGuard};
 
-use messaging;
-use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, ProposedValue,
-                QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target, TargetedMessage};
+use messaging::{SourcedMessage, Target, TargetedMessage};
+
+// TODO: Make this a generic argument of `Broadcast`.
+type ProposedValue = Vec; type MessageQueue = VecDeque>; @@ -26,12 +26,12 @@ pub struct TargetedBroadcastMessage { pub message: BroadcastMessage, } -impl TargetedBroadcastMessage { - pub fn into_remote_message(self) -> RemoteMessage { - RemoteMessage { - node: match self.target { - BroadcastTarget::All => RemoteNode::All, - BroadcastTarget::Node(node) => RemoteNode::Node(node), +impl TargetedBroadcastMessage { + pub fn into_targeted_message(self) -> TargetedMessage { + TargetedMessage { + target: match self.target { + BroadcastTarget::All => Target::All, + BroadcastTarget::Node(node) => Target::Node(node), }, message: Message::Broadcast(self.message), } @@ -74,95 +74,6 @@ pub struct Broadcast { state: RwLock, } -impl Broadcast { - /// The message-driven interface function for calls from the main message - /// loop. - pub fn on_message( - &self, - m: QMessage, - tx: &Sender, - ) -> Result { - match m { - QMessage::Local(LocalMessage { message, .. }) => match message { - AlgoMessage::BroadcastInput(value) => self.on_local_message(&mut value.to_owned()), - - _ => Err(Error::UnexpectedMessage), - }, - - QMessage::Remote(RemoteMessage { - node: RemoteNode::Node(uid), - message, - }) => { - if let Message::Broadcast(b) = message { - self.on_remote_message(uid, b, tx) - } else { - Err(Error::UnexpectedMessage) - } - } - - _ => Err(Error::UnexpectedMessage), - } - } - - fn on_remote_message( - &self, - uid: messaging::NodeUid, - message: BroadcastMessage, - tx: &Sender, - ) -> Result { - let (output, messages) = self.handle_broadcast_message(&uid, message)?; - if let Some(value) = output { - tx.send(QMessage::Local(LocalMessage { - dst: Algorithm::CommonSubset, - message: AlgoMessage::BroadcastOutput(self.our_id, value), - })).map_err(Error::from)?; - } - Ok(MessageLoopState::Processing( - messages - .into_iter() - .map(TargetedBroadcastMessage::into_remote_message) - .collect(), - )) - } - - /// Processes the proposed value input by broadcasting it. - pub fn on_local_message(&self, value: &mut ProposedValue) -> Result { - let mut state = self.state.write().unwrap(); - // Split the value into chunks/shards, encode them with erasure codes. - // Assemble a Merkle tree from data and parity shards. Take all proofs - // from this tree and send them, each to its own node. - self.send_shards(value.clone()) - .map(|(proof, remote_messages)| { - // Record the first proof as if it were sent by the node to - // itself. - let h = proof.root_hash.clone(); - if proof.validate(h.as_slice()) { - // Save the leaf value for reconstructing the tree later. - state.leaf_values[index_of_proof(&proof)] = - Some(proof.value.clone().into_boxed_slice()); - state.leaf_values_num += 1; - state.root_hash = Some(h); - } - - MessageLoopState::Processing( - remote_messages - .into_iter() - .map(TargetedBroadcastMessage::into_remote_message) - .collect(), - ) - }) - } -} - -impl<'a, E> Handler for Broadcast -where - E: From + From, -{ - fn handle(&self, m: QMessage, tx: Sender) -> Result { - self.on_message(m, &tx).map_err(E::from) - } -} - impl Broadcast { /// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal /// from node `proposer_id`. @@ -487,14 +398,10 @@ pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> { tx: &'a Sender>, /// The receive side of the channel from comms threads. rx: &'a Receiver>, + /// The broadcast algorithm instance. + broadcast: Broadcast, /// Value to be broadcast. 
broadcast_value: Option, - /// This instance's index for identification against its comms task. - node_index: usize, - /// Number of nodes participating in broadcast. - num_nodes: usize, - /// Maximum allowed number of faulty nodes. - num_faulty_nodes: usize, } impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into> + From>> @@ -505,45 +412,25 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into> + From>, broadcast_value: Option, num_nodes: usize, - node_index: usize, + proposer_index: usize, ) -> Self { + let all_indexes = (0..num_nodes).collect(); + let broadcast = Broadcast::new(0, proposer_index, all_indexes) + .expect("failed to instantiate broadcast"); Instance { tx, rx, + broadcast, broadcast_value, - node_index, - num_nodes, - num_faulty_nodes: (num_nodes - 1) / 3, } } /// Broadcast stage task returning the computed values in case of success, /// and an error in case of failure. - pub fn run(&mut self) -> Result { + pub fn run(self) -> Result { // Broadcast state machine thread. - let bvalue = self.broadcast_value.to_owned(); - let result: Result; - let result_r = Arc::new(Mutex::new(None)); - let result_r_scoped = result_r.clone(); - - crossbeam::scope(|scope| { - scope.spawn(move || { - *result_r_scoped.lock().unwrap() = Some(inner_run( - self.tx, - self.rx, - bvalue, - self.node_index, - self.num_nodes, - self.num_faulty_nodes, - )); - }); - }); - if let Some(ref r) = *result_r.lock().unwrap() { - result = r.to_owned(); - } else { - result = Err(Error::Threading); - } - result + let bvalue: Option = self.broadcast_value.map(|v| v.into()); + inner_run(self.tx, self.rx, bvalue, &self.broadcast).map(ProposedValue::into) } } @@ -554,7 +441,6 @@ pub enum Error { Threading, ProofConstructionFailed, ReedSolomon(rse::Error), - Send(SendError), SendDeprecated(SendError>), Recv(RecvError), UnexpectedMessage, @@ -567,12 +453,6 @@ impl From for Error { } } -impl From> for Error { - fn from(err: SendError) -> Error { - Error::Send(err) - } -} - impl From>> for Error { fn from(err: SendError>) -> Error { Error::SendDeprecated(err) @@ -585,138 +465,25 @@ impl From for Error { } } -/// Breaks the input value into shards of equal length and encodes them -- and -/// some extra parity shards -- with a Reed-Solomon erasure coding scheme. The -/// returned value contains the shard assigned to this node. That shard doesn't -/// need to be sent anywhere. It is returned to the broadcast instance and gets -/// recorded immediately. -fn send_shards<'a, T>( - value: T, +/// The main loop of the broadcast task. +fn inner_run<'a>( tx: &'a Sender>, - coding: &ReedSolomon, -) -> Result, Error> -where - T: Clone + Debug + Hashable + Send + Sync + Into> + From>, -{ - let data_shard_num = coding.data_shard_count(); - let parity_shard_num = coding.parity_shard_count(); - - debug!( - "Data shards: {}, parity shards: {}", - data_shard_num, parity_shard_num - ); - let mut v: Vec = T::into(value); - // Insert the length of `v` so it can be decoded without the padding. - let payload_len = v.len() as u8; - v.insert(0, payload_len); // TODO: Handle messages larger than 255 bytes. - let value_len = v.len(); - // Size of a Merkle tree leaf value, in bytes. - let shard_len = if value_len % data_shard_num > 0 { - value_len / data_shard_num + 1 - } else { - value_len / data_shard_num - }; - // Pad the last data shard with zeros. Fill the parity shards with zeros. 
- v.resize(shard_len * (data_shard_num + parity_shard_num), 0); - - debug!("value_len {}, shard_len {}", value_len, shard_len); - - // Divide the vector into chunks/shards. - let shards_iter = v.chunks_mut(shard_len); - // Convert the iterator over slices into a vector of slices. - let mut shards: Vec<&mut [u8]> = Vec::new(); - for s in shards_iter { - shards.push(s); - } - - debug!("Shards before encoding: {:?}", shards); - - // Construct the parity chunks/shards - coding.encode(shards.as_mut_slice())?; - - debug!("Shards: {:?}", shards); - - let shards_t: Vec = shards.into_iter().map(|s| s.to_vec()).collect(); - - // Convert the Merkle tree into a partial binary tree for later - // deconstruction into compound branches. - let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t); - - // Default result in case of `gen_proof` error. - let mut result = Err(Error::ProofConstructionFailed); - - // Send each proof to a node. - for (i, leaf_value) in mtree.iter().enumerate() { - let proof = mtree.gen_proof(leaf_value.to_vec()); - if let Some(proof) = proof { - if i == 0 { - // The first proof is addressed to this node. - result = Ok(proof); - } else { - // Rest of the proofs are sent to remote nodes. - tx.send(TargetedMessage { - target: Target::Node(i), - message: Message::Broadcast(BroadcastMessage::Value(proof)), - })?; - } + rx: &'a Receiver>, + broadcast_value: Option, + broadcast: &Broadcast, +) -> Result { + if let Some(v) = broadcast_value { + for msg in broadcast + .propose_value(v)? + .into_iter() + .map(TargetedBroadcastMessage::into_targeted_message) + { + tx.send(msg)?; } } - result -} - -/// The main loop of the broadcast task. -fn inner_run<'a, T>( - tx: &'a Sender>, - rx: &'a Receiver>, - broadcast_value: Option, - node_index: usize, - num_nodes: usize, - num_faulty_nodes: usize, -) -> Result -where - T: Clone + Debug + Hashable + Send + Sync + Into> + From>, -{ - // Erasure coding scheme: N - 2f value shards and 2f parity shards - let parity_shard_num = 2 * num_faulty_nodes; - let data_shard_num = num_nodes - parity_shard_num; - let coding = ReedSolomon::new(data_shard_num, parity_shard_num)?; - // currently known leaf values - let mut leaf_values: Vec>> = vec![None; num_nodes]; - // Write-once root hash of a tree broadcast from the sender associated with - // this instance. - let mut root_hash: Option> = None; - // number of non-None leaf values - let mut leaf_values_num = 0; - - // Split the value into chunks/shards, encode them with erasure codes. - // Assemble a Merkle tree from data and parity shards. Take all proofs from - // this tree and send them, each to its own node. - if let Some(v) = broadcast_value { - send_shards(v, tx, &coding).map(|proof| { - // Record the first proof as if it were sent by the node to - // itself. - let h = proof.root_hash.clone(); - if proof.validate(h.as_slice()) { - // Save the leaf value for reconstructing the tree later. - leaf_values[index_of_proof(&proof)] = Some(proof.value.clone().into_boxed_slice()); - leaf_values_num += 1; - root_hash = Some(h); - } - })? - } - - // return value - let mut result: Option> = None; - // Number of times Echo was received with the same root hash. - let mut echo_num = 0; - // Number of times Ready was received with the same root hash. - let mut readys: HashMap, usize> = HashMap::new(); - let mut ready_sent = false; - let mut ready_to_decode = false; - // TODO: handle exit conditions - while result.is_none() { + loop { // Receive a message from the socket IO task. 
let message = rx.recv()?; if let SourcedMessage { @@ -724,132 +491,19 @@ where message: Message::Broadcast(message), } = message { - match message { - // A value received. Record the value and multicast an echo. - BroadcastMessage::Value(p) => { - if i != node_index { - // Ignore value messages from unrelated remote nodes. - continue; - } - - if root_hash.is_none() { - root_hash = Some(p.root_hash.clone()); - debug!( - "Node {} Value root hash {:?}", - node_index, - HexBytes(&p.root_hash) - ); - } - - if let Some(ref h) = root_hash { - if p.validate(h.as_slice()) { - // Save the leaf value for reconstructing the tree - // later. - leaf_values[index_of_proof(&p)] = - Some(p.value.clone().into_boxed_slice()); - leaf_values_num += 1; - } - } - // Broadcast an echo of this proof. - tx.send(TargetedMessage { - target: Target::All, - message: Message::Broadcast(BroadcastMessage::Echo(p)), - })? - } - - // An echo received. Verify the proof it contains. - BroadcastMessage::Echo(p) => { - if root_hash.is_none() && i == node_index { - root_hash = Some(p.root_hash.clone()); - debug!("Node {} Echo root hash {:?}", node_index, root_hash); - } - - // call validate with the root hash as argument - if let Some(ref h) = root_hash { - if p.validate(h.as_slice()) { - echo_num += 1; - // Save the leaf value for reconstructing the tree - // later. - leaf_values[index_of_proof(&p)] = - Some(p.value.clone().into_boxed_slice()); - leaf_values_num += 1; - - // upon receiving 2f + 1 matching READY(h) - // messages, wait for N − 2 f ECHO messages, then - // decode v - if ready_to_decode - && leaf_values_num >= num_nodes - 2 * num_faulty_nodes - { - result = Some(decode_from_shards( - &mut leaf_values, - &coding, - data_shard_num, - h, - )); - } else if leaf_values_num >= num_nodes - num_faulty_nodes { - result = Some(decode_from_shards( - &mut leaf_values, - &coding, - data_shard_num, - h, - )); - // if Ready has not yet been sent, multicast - // Ready - if !ready_sent { - ready_sent = true; - tx.send(TargetedMessage { - target: Target::All, - message: Message::Broadcast(BroadcastMessage::Ready( - h.to_owned(), - )), - })?; - } - } - } - } - } - - BroadcastMessage::Ready(ref hash) => { - // Update the number Ready has been received with this hash. - *readys.entry(hash.to_vec()).or_insert(1) += 1; - - // Check that the root hash matches. - if let Some(ref h) = root_hash { - let ready_num: usize = *readys.get(h).unwrap_or(&0); - - // Upon receiving f + 1 matching Ready(h) messages, if - // Ready has not yet been sent, multicast Ready(h). - if (ready_num == num_faulty_nodes + 1) && !ready_sent { - tx.send(TargetedMessage { - target: Target::All, - message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())), - })?; - } - - // Upon receiving 2f + 1 matching Ready(h) messages, - // wait for N − 2f Echo messages, then decode v. - if ready_num > 2 * num_faulty_nodes { - // Wait for N - 2f Echo messages, then decode v. 
- if echo_num >= num_nodes - 2 * num_faulty_nodes { - result = Some(decode_from_shards( - &mut leaf_values, - &coding, - data_shard_num, - h, - )); - } else { - ready_to_decode = true; - } - } - } - } + let (opt_output, msgs) = broadcast.handle_broadcast_message(&i, message)?; + for msg in msgs.into_iter() + .map(TargetedBroadcastMessage::into_targeted_message) + { + tx.send(msg)?; + } + if let Some(output) = opt_output { + return Ok(output); } } else { error!("Incorrect message from the socket: {:?}", message); } } - // result is not a None, safe to extract value - result.unwrap() } fn decode_from_shards( diff --git a/src/common_subset.rs b/src/common_subset.rs index c94fea5..e304d51 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -13,10 +13,11 @@ use agreement::Agreement; use broadcast; use broadcast::{Broadcast, TargetedBroadcastMessage}; -use messaging::ProposedValue; - use proto::{AgreementMessage, BroadcastMessage}; +// TODO: Make this a generic argument of `Broadcast`. +type ProposedValue = Vec; + /// Input from a remote node to Common Subset. pub enum Input { /// Message from a remote node `uid` to the broadcast instance `uid`. diff --git a/src/lib.rs b/src/lib.rs index 052f030..58deb95 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,59 +2,20 @@ //! //! Library of asynchronous Byzantine fault tolerant consensus known as "the //! honey badger of BFT protocols" after a paper with the same title. -//! -//! ## Example -//! -//! The following code could be run on host 192.168.1.1: -//! -//! ```ignore -//! extern crate hbbft; -//! -//! use hbbft::node::Node; -//! use std::net::SocketAddr; -//! use std::vec::Vec; -//! -//! fn main() { -//! let bind_address = "127.0.0.1:10001".parse().unwrap(); -//! let remote_addresses = vec!["192.168.1.2:10002", -//! "192.168.1.3:10003", -//! "192.168.1.4:10004"] -//! .iter() -//! .map(|s| s.parse().unwrap()) -//! .collect(); -//! -//! let value = "Value #1".as_bytes().to_vec(); -//! -//! let result = Node::new(bind_address, remote_addresses, Some(value)) -//! .run(); -//! println!("Consensus result {:?}", result); -//! } -//! ``` -//! -//! Similar code shall then run on hosts 192.168.1.2, 192.168.1.3 and -//! 192.168.1.4 with appropriate changes in `bind_address` and -//! `remote_addresses`. Each host has it's own optional broadcast `value`. If -//! the consensus `result` is not an error then every successfully terminated -//! consensus node will be the same `result`. #![feature(optin_builtin_traits)] #[macro_use] extern crate log; extern crate crossbeam; +extern crate crossbeam_channel; extern crate merkle; extern crate protobuf; -extern crate ring; -#[macro_use] -extern crate crossbeam_channel; extern crate reed_solomon_erasure; +extern crate ring; pub mod agreement; pub mod broadcast; pub mod common_subset; -mod commst; -mod connection; pub mod messaging; pub mod proto; -mod proto_io; - -pub mod node; +pub mod proto_io; diff --git a/src/messaging.rs b/src/messaging.rs index 9663a0a..b404f7a 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,297 +1,16 @@ //! The local message delivery system. -use crossbeam::{Scope, ScopedJoinHandle}; -use crossbeam_channel; -use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use proto::Message; -use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt::Debug; -use std::net::SocketAddr; -use std::sync::RwLock; -/// Unique ID of a node. -pub type NodeUid = SocketAddr; - -/// Type of algorithm primitive used in HoneyBadgerBFT. -/// -/// TODO: Add the epoch parameter? 
-#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub enum Algorithm { - /// Encryption stage. - Encryption, - /// Decryption stage. - Decryption, - /// Asynchronous Common Subset. - CommonSubset, - /// Reliable Broadcast instance. - Broadcast(NodeUid), - /// Binary Agreement instance. - Agreement(NodeUid), -} - -impl Iterator for Algorithm { - type Item = String; - - fn next(&mut self) -> Option { - Some(format!("{:?}", self)) - } -} - -/// Type of proposed (encrypted) value for consensus. -pub type ProposedValue = Vec; - -/// Kinds of messages sent between algorithm instances. -#[derive(Clone)] -pub enum AlgoMessage { - /// Asynchronous common subset input. - CommonSubsetInput(ProposedValue), - /// Asynchronous common subset output. - CommonSubsetOutput(HashSet), - /// Broadcast instance input. - BroadcastInput(ProposedValue), - /// Broadcast instance output. - BroadcastOutput(NodeUid, ProposedValue), - /// Binary agreement instance input. - AgreementInput(bool), - /// Binary agreement instance output. - AgreementOutput(NodeUid, bool), -} - -/// A message sent between algorithm instances. -#[derive(Clone)] -pub struct LocalMessage { - /// Identifier of the message destination algorithm. - pub dst: Algorithm, - /// Payload - pub message: AlgoMessage, -} - -/// The message destinations corresponding to a remote node `i`. It can be -/// either of the two: -/// -/// 1) `All`: all nodes if sent to socket tasks, or all local algorithm -/// instances if received from socket tasks. -/// -/// 2) `Node(i)`: node `i` or local algorithm instances with the node ID `i`. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum RemoteNode { - All, - Node(NodeUid), -} - -/// Message to or from a remote node. -#[derive(Clone, Debug, PartialEq)] -pub struct RemoteMessage { - pub node: RemoteNode, - pub message: Message, -} - -/// The union type of local and remote messages. -#[derive(Clone)] -pub enum QMessage { - Local(LocalMessage), - Remote(RemoteMessage), -} - -/// States of the message loop consided as an automaton with output. There is -/// one exit state `Finished` and one transitional (also initial) state -/// `Processing` whose argument is an output queue of messages to be sent to -/// remote nodes. -#[derive(Clone, PartialEq)] -pub enum MessageLoopState { - Processing(VecDeque), - Finished, -} - -impl MessageLoopState { - pub fn is_processing(&self) -> bool { - if let MessageLoopState::Processing(_) = self { - true - } else { - false - } - } - - /// Appends pending messages of another state. Used to append messages - /// emitted by the handler to the messages already queued from previous - /// iterations of a message handling loop. - pub fn append(&mut self, other: &mut MessageLoopState) { - if let MessageLoopState::Processing(ref mut new_msgs) = other { - if let MessageLoopState::Processing(ref mut msgs) = self { - msgs.append(new_msgs); - } - } - } -} - -/// Abstract type of message handler callback. A callback function has two -/// arguments: the sent message and the TX handle to send replies back to the -/// message loop. A call to the function returns either a new message loop state -/// - either `Finished` or a state with outgoing messages to remote nodes - or -/// an error. -pub trait Handler>: Send + Sync { - fn handle(&self, m: QMessage, tx: Sender) -> Result; -} - -/// The queue functionality for messages sent between algorithm instances. -pub struct MessageLoop<'a, HandlerError: 'a + From> { - /// Algorithm message handlers. 
Every message handler receives a message and - /// the TX handle of the incoming message queue for sending replies back to - /// the message loop. - algos: RwLock>>, - /// TX handle of the message queue. - queue_tx: Sender, - /// RX handle of the message queue. - queue_rx: Receiver, - /// Remote send handles. Messages are sent through channels as opposed to - /// directly to sockets. This is done to make tests independent of socket - /// IO. - remote_txs: HashMap>>, -} - -impl<'a, HandlerError> MessageLoop<'a, HandlerError> -where - HandlerError: 'a + From, -{ - pub fn new(remote_txs: HashMap>>) -> Self { - let (queue_tx, queue_rx) = unbounded(); - MessageLoop { - algos: RwLock::new(HashMap::new()), - queue_tx, - queue_rx, - remote_txs, - } - } - - pub fn queue_tx(&self) -> Sender { - self.queue_tx.clone() - } - - /// Registers a handler for messages sent to the given algorithm. - pub fn insert_algo(&'a self, algo: Algorithm, handler: &'a Handler) { - let lock = self.algos.write(); - if let Ok(mut map) = lock { - map.insert(algo, handler); - } else { - error!("Cannot insert {:?}", algo); - } - } - - /// Unregisters the handler for messages sent to the given algorithm. - pub fn remove_algo(&self, algo: &Algorithm) { - let lock = self.algos.write(); - if let Ok(mut map) = lock { - map.remove(algo); - } else { - error!("Cannot remove {:?}", algo); - } - } - - /// The message loop. - pub fn run(&self) -> Result { - let mut result = Ok(MessageLoopState::Processing(VecDeque::new())); - - while let Ok(mut state) = result { - // Send any outgoing messages to remote nodes using the provided - // function. - (if let MessageLoopState::Processing(messages) = &state { - self.send_remote(messages) - .map(|_| MessageLoopState::Processing(VecDeque::new())) - .map_err(HandlerError::from) - } else { - Ok(MessageLoopState::Finished) - })?; - - // Receive local and remote messages. - if let Ok(m) = self.queue_rx.recv() { - result = match m { - QMessage::Local(LocalMessage { dst, message }) => { - // FIXME: error handling - if let Some(mut handler) = self.algos.write().unwrap().get_mut(&dst) { - let mut new_result = handler.handle( - QMessage::Local(LocalMessage { dst, message }), - self.queue_tx.clone(), - ); - if let Ok(ref mut new_state) = new_result { - state.append(new_state); - Ok(state) - } else { - // Error overrides the previous state. - new_result - } - } else { - Err(Error::NoSuchAlgorithm).map_err(HandlerError::from) - } - } - - // A message FROM a remote node. - QMessage::Remote(RemoteMessage { node, message }) => { - // Multicast the message to all algorithm instances, - // collecting output messages iteratively and appending them - // to result. - // - // FIXME: error handling - self.algos.write().unwrap().iter_mut().fold( - Ok(state), - |result1, (_, handler)| { - if let Ok(mut state1) = result1 { - handler - .handle( - QMessage::Remote(RemoteMessage { - node: node.clone(), - message: message.clone(), - }), - self.queue_tx.clone(), - ) - .map(|ref mut state2| { - state1.append(state2); - state1 - }) - } else { - result1 - } - }, - ) - } - } - } else { - result = Err(Error::RecvError).map_err(HandlerError::from) - } - } // end of while loop - result - } - - /// Send a message queue to remote nodes. 
-    fn send_remote(&self, messages: &VecDeque<RemoteMessage>) -> Result<(), Error> {
-        messages.iter().fold(Ok(()), |result, m| {
-            if result.is_err() {
-                result
-            } else {
-                match m {
-                    RemoteMessage {
-                        node: RemoteNode::Node(uid),
-                        message,
-                    } => {
-                        if let Some(tx) = self.remote_txs.get(&uid) {
-                            tx.send(message.clone()).map_err(Error::from)
-                        } else {
-                            Err(Error::SendError)
-                        }
-                    }
-
-                    RemoteMessage {
-                        node: RemoteNode::All,
-                        message,
-                    } => self.remote_txs.iter().fold(result, |result1, (_, tx)| {
-                        if result1.is_err() {
-                            result1
-                        } else {
-                            tx.send(message.clone()).map_err(Error::from)
-                        }
-                    }),
-                }
-            }
-        })
-    }
+/// Message sent by a given source. The sources are consensus nodes indexed 1
+/// through N where N is the total number of nodes. Sourced messages are
+/// required when it is essential to know the message origin but the set of
+/// recipients is unknown without further computation, which is irrelevant to
+/// the message delivery task.
+#[derive(Clone, Debug)]
+pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
+    pub source: usize,
+    pub message: Message<T>,
 }
 
 /// Message destination can be either of the two:
@@ -325,207 +44,3 @@ impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> {
         }
     }
 }
-
-/// Message sent by a given source. The sources are consensus nodes indexed 1
-/// through N where N is the total number of nodes. Sourced messages are
-/// required when it is essential to know the message origin but the set of
-/// recepients is unknown without further computation which is irrelevant to the
-/// message delivery task.
-#[derive(Clone, Debug)]
-pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
-    pub source: usize,
-    pub message: Message<T>,
-}
-
-/// The messaging struct allows for targeted message exchange between comms
-/// tasks on one side and algo tasks on the other.
-pub struct Messaging<T: Clone + Debug + Send + Sync> {
-    /// The total number of consensus nodes for indexing purposes.
-    num_nodes: usize,
-
-    /// Transmit sides of message channels to comms threads.
-    txs_to_comms: Vec<Sender<Message<T>>>,
-    /// Receive side of the routed message channel from comms threads.
-    rx_from_comms: Receiver<SourcedMessage<T>>,
-    /// Transmit sides of message channels to algo threads.
-    txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
-    /// Receive side of the routed message channel from comms threads.
-    rx_from_algo: Receiver<TargetedMessage<T>>,
-
-    /// RX handles to be used by comms tasks.
-    rxs_to_comms: Vec<Receiver<Message<T>>>,
-    /// TX handle to be used by comms tasks.
-    tx_from_comms: Sender<SourcedMessage<T>>,
-    /// RX handles to be used by algo tasks.
-    rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
-    /// TX handle to be used by algo tasks.
-    tx_from_algo: Sender<TargetedMessage<T>>,
-
-    /// Control channel used to stop the listening thread.
-    stop_tx: Sender<()>,
-    stop_rx: Receiver<()>,
-}
-
-impl<T: Clone + Debug + Send + Sync> Messaging<T> {
-    /// Initialises all the required TX and RX handles for the case on a total
-    /// number `num_nodes` of consensus nodes.
- pub fn new(num_nodes: usize) -> Self { - let to_comms: Vec<_> = (0..num_nodes - 1) - .map(|_| unbounded::>()) - .collect(); - let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect(); - let rxs_to_comms: Vec>> = - to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect(); - let (tx_from_comms, rx_from_comms) = unbounded(); - let to_algo: Vec<_> = (0..num_nodes) - .map(|_| unbounded::>()) - .collect(); - let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect(); - let rxs_to_algo: Vec>> = - to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect(); - let (tx_from_algo, rx_from_algo) = unbounded(); - - let (stop_tx, stop_rx) = bounded(1); - - Messaging { - num_nodes, - - // internally used handles - txs_to_comms, - rx_from_comms, - txs_to_algo, - rx_from_algo, - - // externally used handles - rxs_to_comms, - tx_from_comms, - rxs_to_algo, - tx_from_algo, - - stop_tx, - stop_rx, - } - } - - pub fn num_nodes(&self) -> usize { - self.num_nodes - } - - pub fn rxs_to_comms(&self) -> &Vec>> { - &self.rxs_to_comms - } - - pub fn tx_from_comms(&self) -> &Sender> { - &self.tx_from_comms - } - - pub fn rxs_to_algo(&self) -> &Vec>> { - &self.rxs_to_algo - } - - pub fn tx_from_algo(&self) -> &Sender> { - &self.tx_from_algo - } - - /// Gives the ownership of the handle to stop the message receive loop. - pub fn stop_tx(&self) -> Sender<()> { - self.stop_tx.to_owned() - } - - /// Spawns the message delivery thread in a given thread scope. - pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle> - where - T: 'a, - { - let txs_to_comms = self.txs_to_comms.to_owned(); - let rx_from_comms = self.rx_from_comms.to_owned(); - let txs_to_algo = self.txs_to_algo.to_owned(); - let rx_from_algo = self.rx_from_algo.to_owned(); - - let stop_rx = self.stop_rx.to_owned(); - let mut stop = false; - - // TODO: `select_loop!` seems to really confuse Clippy. - #[cfg_attr( - feature = "cargo-clippy", - allow(never_loop, if_let_redundant_pattern_matching, deref_addrof) - )] - scope.spawn(move || { - let mut result = Ok(()); - // This loop forwards messages according to their metadata. - while !stop && result.is_ok() { - select_loop! { - recv(rx_from_algo, message) => { - match message { - TargetedMessage { - target: Target::All, - message - } => { - // Send the message to all remote nodes, stopping at - // the first error. - result = txs_to_comms.iter() - .fold(Ok(()), |result, tx| { - if result.is_ok() { - tx.send(message.clone()) - } - else { - result - } - }).map_err(Error::from); - }, - TargetedMessage { - target: Target::Node(i), - message - } => { - // Remote node indices start from 1. - assert!(i > 0); - // Convert node index to vector index. - let i = i - 1; - - result = if i < txs_to_comms.len() { - txs_to_comms[i].send(message.clone()) - .map_err(Error::from) - } - else { - Err(Error::NoSuchTarget) - }; - } - } - }, - recv(rx_from_comms, message) => { - // Send the message to all algorithm instances, stopping at - // the first error. - result = txs_to_algo.iter().fold(Ok(()), |result, tx| { - if result.is_ok() { - tx.send(message.clone()) - } - else { - result - } - }).map_err(Error::from) - }, - recv(stop_rx, _) => { - // Flag the thread ready to exit. - stop = true; - } - } - } // end of select_loop! 
- result - }) - } -} - -#[derive(Clone, Debug)] -pub enum Error { - NoSuchAlgorithm, - NoSuchRemote, - RecvError, - NoSuchTarget, - SendError, -} - -impl From> for Error { - fn from(_: crossbeam_channel::SendError) -> Error { - Error::SendError - } -} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 24c6478..47bba6f 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -26,6 +26,7 @@ pub enum BroadcastMessage { Ready(Vec), } +/// Wrapper for a byte array, whose `Debug` implementation outputs shortened hexadecimal strings. pub struct HexBytes<'a>(pub &'a [u8]); impl<'a> fmt::Debug for HexBytes<'a> { @@ -166,6 +167,7 @@ impl BroadcastMessage { BroadcastMessage::Ready(h) => { let mut r = ReadyProto::new(); r.set_root_hash(h); + b.set_ready(r); } } b diff --git a/src/proto_io.rs b/src/proto_io.rs index 9b551a2..169949d 100644 --- a/src/proto_io.rs +++ b/src/proto_io.rs @@ -3,9 +3,9 @@ use proto::*; use protobuf; use protobuf::Message as ProtobufMessage; -use std::io::Read; +use std::io; +use std::io::{Read, Write}; use std::net::TcpStream; -use std::{cmp, io}; /// A magic key to put right before each message. An atavism of primitive serial /// protocols. @@ -35,95 +35,48 @@ impl From for Error { } } -fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> { - if buffer.len() < 4 { - return Err(Error::EncodeError); - } - let value = value.to_le(); - buffer[0] = ((value & 0xFF00_0000) >> 24) as u8; - buffer[1] = ((value & 0x00FF_0000) >> 16) as u8; - buffer[2] = ((value & 0x0000_FF00) >> 8) as u8; - buffer[3] = (value & 0x0000_00FF) as u8; - Ok(()) +pub struct ProtoIo { + stream: S, } -fn decode_u32_from_be(buffer: &[u8]) -> Result { - if buffer.len() < 4 { - return Err(Error::DecodeError); +impl ProtoIo { + pub fn try_clone(&self) -> Result, ::std::io::Error> { + Ok(ProtoIo { + stream: self.stream.try_clone()?, + }) } - let mut result = u32::from(buffer[0]); - result <<= 8; - result += u32::from(buffer[1]); - result <<= 8; - result += u32::from(buffer[2]); - result <<= 8; - result += u32::from(buffer[3]); - Ok(result) -} - -pub struct ProtoIo { - stream: TcpStream, - buffer: [u8; 1024 * 4], } /// A message handling task. -impl ProtoIo +impl ProtoIo //where T: Clone + Send + Sync + From> + Into> { - pub fn from_stream(stream: TcpStream) -> Self { - ProtoIo { - stream, - buffer: [0; 1024 * 4], - } - } - - pub fn try_clone(&self) -> Result { - Ok(ProtoIo { - stream: self.stream.try_clone()?, - buffer: self.buffer, - }) + pub fn from_stream(stream: S) -> Self { + ProtoIo { stream } } pub fn recv(&mut self) -> Result, Error> where T: Clone + Send + Sync + From>, // + Into> { - self.stream.read_exact(&mut self.buffer[0..4])?; - let frame_start = decode_u32_from_be(&self.buffer[0..4])?; - if frame_start != FRAME_START { + let mut stream = protobuf::CodedInputStream::new(&mut self.stream); + // Read magic number + if stream.read_raw_varint32()? != FRAME_START { return Err(Error::FrameStartMismatch); - }; - self.stream.read_exact(&mut self.buffer[0..4])?; - let size = decode_u32_from_be(&self.buffer[0..4])? 
as usize;
-
-        let mut message_v: Vec<u8> = Vec::new();
-        message_v.reserve(size);
-        while message_v.len() < size {
-            let num_to_read = cmp::min(self.buffer.len(), size - message_v.len());
-            let (slice, _) = self.buffer.split_at_mut(num_to_read);
-            self.stream.read_exact(slice)?;
-            message_v.extend_from_slice(slice);
         }
-
-        Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError)
+        Message::from_proto(stream.read_message()?).ok_or(Error::DecodeError)
     }
 
     pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
     where
         T: Clone + Send + Sync + Into<Vec<u8>>,
     {
-        let mut buffer: [u8; 4] = [0; 4];
-        // Wrap stream
         let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
         // Write magic number
-        encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
-        stream.write_raw_bytes(&buffer)?;
+        stream.write_raw_varint32(FRAME_START)?;
         let message_p = message.into_proto();
-        // Write message size
-        encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?;
-        stream.write_raw_bytes(&buffer)?;
         // Write message
-        message_p.write_to(&mut stream)?;
+        message_p.write_length_delimited_to(&mut stream)?;
         // Flush
         stream.flush()?;
         Ok(())
@@ -133,14 +86,23 @@ impl ProtoIo
 #[cfg(test)]
 mod tests {
     use proto_io::*;
+    use std::io::Cursor;
 
-    /// Test the requirement that composing encoding with decoding yields the
-    /// identity.
     #[test]
-    fn encode_decode_identity() {
-        let mut buffer: [u8; 4] = [0; 4];
-        encode_u32_to_be(FRAME_START, &mut buffer[0..4]).unwrap();
-        let frame_start = decode_u32_from_be(&buffer[0..4]).unwrap();
-        assert_eq!(frame_start, FRAME_START);
+    fn encode_decode_message() {
+        let msg0: Message<Vec<u8>> =
+            Message::Broadcast(BroadcastMessage::Ready(b"Test 0".to_vec()));
+        let msg1: Message<Vec<u8>> =
+            Message::Broadcast(BroadcastMessage::Ready(b"Test 1".to_vec()));
+        let mut pio = ProtoIo::from_stream(Cursor::new(Vec::new()));
+        pio.send(msg0.clone()).expect("send msg0");
+        pio.send(msg1.clone()).expect("send msg1");
+        println!("{:?}", pio.stream.get_ref());
+        pio.stream.set_position(0);
+        assert_eq!(msg0, pio.recv().expect("recv msg0"));
+        // TODO: Figure out why the cursor is wrong here.
+        let len = pio.stream.get_ref().len() as u64;
+        pio.stream.set_position(len / 2);
+        assert_eq!(msg1, pio.recv().expect("recv msg1"));
     }
 }
diff --git a/tests/broadcast.rs b/tests/broadcast.rs
index 450afe9..7a333dc 100644
--- a/tests/broadcast.rs
+++ b/tests/broadcast.rs
@@ -14,12 +14,13 @@ use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
 use std::fmt;
 
 use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
-use hbbft::messaging::ProposedValue;
 use hbbft::proto::BroadcastMessage;
 
 #[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
 struct NodeId(usize);
 
+type ProposedValue = Vec<u8>;
+
 type MessageQueue = VecDeque<TargetedBroadcastMessage<NodeId>>;
 
 /// A "node" running a broadcast instance.
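
The routing hub that moves into `examples/network/messaging.rs` above is only exercised indirectly through `Node::run`. The following is a minimal sketch of driving it directly, assuming it lives inside the example binary (next to the `mod network;` added in `consensus-node.rs`) and that the payload type is `Vec<u8>`; the channel and scope APIs are the `crossbeam`/`crossbeam_channel` versions the patch already depends on.

```rust
extern crate crossbeam;
extern crate hbbft;

mod network; // the module tree added by this patch

use hbbft::messaging::{Target, TargetedMessage};
use hbbft::proto::{BroadcastMessage, Message};
use network::messaging::Messaging;

fn main() {
    // Four consensus nodes: three comms channels and four algo channels.
    let messaging: Messaging<Vec<u8>> = Messaging::new(4);
    let tx_from_algo = messaging.tx_from_algo().clone();
    let rx_to_comms = messaging.rxs_to_comms()[0].clone();
    let stop_tx = messaging.stop_tx();

    crossbeam::scope(|scope| {
        let handle = messaging.spawn(scope);

        // An algorithm task multicasts a message to all remote nodes...
        tx_from_algo
            .send(TargetedMessage {
                target: Target::All,
                message: Message::Broadcast(BroadcastMessage::Ready(b"hash".to_vec())),
            })
            .unwrap();

        // ...and the comms task of remote node 1 receives a copy of it.
        println!("forwarded: {:?}", rx_to_comms.recv().unwrap());

        // Ask the routing thread to exit, then collect its result.
        stop_tx.send(()).unwrap();
        handle.join().expect("messaging thread failed");
    });
}
```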
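The quorum logic removed from `src/broadcast.rs` (and now hidden behind `handle_broadcast_message`) is driven by a handful of derived parameters. A worked example for the four-node network from the `node.rs` docs, with the formulas taken verbatim from the code above; the concrete `N = 4` is illustrative only.

```rust
fn main() {
    let num_nodes = 4; // N, as in the node.rs example
    let num_faulty_nodes = (num_nodes - 1) / 3; // f
    // Erasure coding scheme: N - 2f data shards and 2f parity shards.
    let parity_shard_num = 2 * num_faulty_nodes;
    let data_shard_num = num_nodes - parity_shard_num;
    // Quorum thresholds from the message handling loop:
    let amplify_ready = num_faulty_nodes + 1; // f + 1 matching Readys: multicast Ready
    let decode_ready = 2 * num_faulty_nodes + 1; // 2f + 1 matching Readys: may decode...
    let decode_echo = num_nodes - 2 * num_faulty_nodes; // ...after N - 2f valid Echoes

    assert_eq!(num_faulty_nodes, 1);
    assert_eq!((data_shard_num, parity_shard_num), (2, 2));
    assert_eq!((amplify_ready, decode_ready, decode_echo), (2, 3, 2));
}
```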
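The removed `send_shards` pads the value so that it divides evenly into data plus parity shards; that arithmetic is easy to check in isolation. A sketch of just the size computation, where `value_len` already includes the 1-byte `payload_len` prefix inserted above (with the same 255-byte limitation noted in the TODO):

```rust
/// Shard size and padded buffer size, as computed by `send_shards`.
fn shard_layout(value_len: usize, data_shard_num: usize, parity_shard_num: usize) -> (usize, usize) {
    // Size of a Merkle tree leaf value, in bytes (rounded up).
    let shard_len = if value_len % data_shard_num > 0 {
        value_len / data_shard_num + 1
    } else {
        value_len / data_shard_num
    };
    // The last data shard is zero-padded; parity shards start out zeroed.
    (shard_len, shard_len * (data_shard_num + parity_shard_num))
}

fn main() {
    // A 10-byte value plus its length prefix, with 2 data + 2 parity shards:
    assert_eq!(shard_layout(11, 2, 2), (6, 24));
}
```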
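The new `inner_run` reduces the socket-facing task to a propose-then-handle loop around the `Broadcast` state machine. Roughly, detached from the channels, it looks like the sketch below; the `Broadcast::new`, `propose_value` and `handle_broadcast_message` calls are the ones visible in the patch, their exact signatures are assumed, and message delivery is elided.

```rust
extern crate hbbft;

use hbbft::broadcast::{Broadcast, Error};

fn run_node_zero(num_nodes: usize, value: Vec<u8>) -> Result<Vec<u8>, Error> {
    let all_indexes = (0..num_nodes).collect();
    // Node 0 expects the proposal from node 0, i.e. it is the proposer.
    let broadcast = Broadcast::new(0, 0, all_indexes)?;

    // Shard the value; these messages would go out via the comms tasks.
    let mut outgoing: Vec<_> = broadcast.propose_value(value)?.into_iter().collect();

    loop {
        // Delivery elided: a real node receives (source, message) pairs from
        // its comms tasks, exactly as the loop in `inner_run` does.
        let (source, message) = unimplemented!("delivered by the comms tasks");
        let (opt_output, msgs) = broadcast.handle_broadcast_message(&source, message)?;
        outgoing.extend(msgs);
        if let Some(output) = opt_output {
            return Ok(output); // the decoded, agreed-upon value
        }
    }
}
```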
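Both new `type ProposedValue = Vec<u8>;` aliases carry the same TODO. One hypothetical shape for resolving it, purely a sketch of the direction the TODO points at and not anything in this patch:

```rust
use std::marker::PhantomData;

/// `Broadcast` generic over the proposed value instead of hard-coded `Vec<u8>`.
pub struct Broadcast<V: Into<Vec<u8>> + From<Vec<u8>>> {
    our_id: usize,
    proposer_id: usize,
    _value: PhantomData<V>,
}

impl<V: Into<Vec<u8>> + From<Vec<u8>>> Broadcast<V> {
    /// Erasure-code the value as today, but take and return `V`.
    pub fn propose_value(&self, value: V) -> Vec<u8> {
        value.into() // sharding and Merkle tree construction as before
    }
}
```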
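On the wire, `send` now emits the magic number as a protobuf varint and the message itself length-delimited, replacing the two fixed 4-byte big-endian prefixes; the `b.set_ready(r)` fix in `proto/mod.rs` is what lets the new round-trip test use `Ready` messages at all. For reference, a varint32 occupies one to five bytes depending on the value; a minimal stand-alone encoder (the real one lives in the `protobuf` crate):

```rust
/// Little-endian base-128 ("varint") encoding of a u32, as written by
/// `write_raw_varint32`: seven payload bits per byte, high bit = "more".
fn varint32(mut value: u32) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(byte);
            return out;
        }
        out.push(byte | 0x80);
    }
}

fn main() {
    assert_eq!(varint32(1), vec![0x01]); // single byte
    assert_eq!(varint32(300), vec![0xac, 0x02]); // 300 = 0b10_0101100
    assert_eq!(varint32(u32::max_value()).len(), 5); // worst case
}
```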