From 6117e11a9e7cb5fa73ef39c7d2055eda377f5a8c Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Mon, 30 Apr 2018 18:55:51 +0300 Subject: [PATCH] applied rustfmt --- build.rs | 11 +- examples/consensus-node.rs | 11 +- src/broadcast.rs | 555 ++++++++++++++++++------------------- src/common_subset.rs | 148 +++++----- src/commst.rs | 63 ++--- src/connection.rs | 39 ++- src/lib.rs | 12 +- src/messaging.rs | 364 ++++++++++++------------ src/node.rs | 118 ++++---- src/proto/mod.rs | 153 +++++----- src/proto_io.rs | 30 +- tests/broadcast.rs | 103 +++---- tests/netsim.rs | 16 +- 13 files changed, 802 insertions(+), 821 deletions(-) diff --git a/build.rs b/build.rs index cc992b3..6f22916 100644 --- a/build.rs +++ b/build.rs @@ -12,18 +12,19 @@ fn protoc_exists() -> bool { } } } - None => println!("PATH environment variable is not defined.") + None => println!("PATH environment variable is not defined."), } false } fn main() { if !protoc_exists() { - panic!(" + panic!( + " protoc cannot be found. Install the protobuf compiler in the \ -system path."); - } - else { +system path." + ); + } else { println!("cargo:rerun-if-changed=proto/message.proto"); protoc_rust::run(protoc_rust::Args { out_dir: "src/proto", diff --git a/examples/consensus-node.rs b/examples/consensus-node.rs index 237f83e..1af2aa2 100644 --- a/examples/consensus-node.rs +++ b/examples/consensus-node.rs @@ -1,13 +1,13 @@ //! Example of a consensus node that uses the `hbbft::node::Node` struct for //! running the distributed consensus state machine. //#[macro_use] -extern crate log; -extern crate simple_logger; extern crate docopt; extern crate hbbft; +extern crate log; +extern crate simple_logger; -use hbbft::node::Node; use docopt::Docopt; +use hbbft::node::Node; use std::collections::HashSet; use std::net::SocketAddr; use std::vec::Vec; @@ -38,15 +38,14 @@ fn parse_args() -> Args { Args { value: if args.get_count("--value") > 0 { Some(args.get_str("--value").as_bytes().to_vec()) - } - else { + } else { None }, bind_address: args.get_str("--bind-address").parse().unwrap(), remote_addresses: args.get_vec("--remote-address") .iter() .map(|s| s.parse().unwrap()) - .collect() + .collect(), } } diff --git a/src/broadcast.rs b/src/broadcast.rs index 14297bc..8e34e98 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,21 +1,20 @@ //! Reliable broadcast algorithm instance. -use std::collections::{HashMap, HashSet, VecDeque}; -use std::fmt::Debug; -use std::sync::{Arc, Mutex, RwLock}; use crossbeam; -use proto::*; -use std::marker::{Send, Sync}; +use crossbeam_channel::{Receiver, RecvError, SendError, Sender}; +use merkle::proof::{Lemma, Positioned, Proof}; use merkle::{Hashable, MerkleTree}; -use merkle::proof::{Proof, Lemma, Positioned}; +use proto::*; use reed_solomon_erasure as rse; use reed_solomon_erasure::ReedSolomon; -use crossbeam_channel::{Sender, Receiver, SendError, RecvError}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::fmt::Debug; +use std::marker::{Send, Sync}; +use std::sync::{Arc, Mutex, RwLock}; -use messaging::{Target, TargetedMessage, SourcedMessage, - NodeUid, Algorithm, ProposedValue, QMessage, MessageLoopState, - Handler, LocalMessage, RemoteMessage, AlgoMessage, - RemoteNode}; use messaging; +use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid, + ProposedValue, QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target, + TargetedMessage}; struct BroadcastState { root_hash: Option>, @@ -40,13 +39,11 @@ pub struct Broadcast { /// All the mutable state is confined to the `state` field. This allows to /// mutate state even when the broadcast instance is referred to by an /// immutable reference. - state: RwLock + state: RwLock, } impl Broadcast { - pub fn new(uid: NodeUid, all_uids: HashSet, num_nodes: usize) -> - Result - { + pub fn new(uid: NodeUid, all_uids: HashSet, num_nodes: usize) -> Result { let num_faulty_nodes = (num_nodes - 1) / 3; let parity_shard_num = 2 * num_faulty_nodes; let data_shard_num = num_nodes - parity_shard_num; @@ -67,68 +64,61 @@ impl Broadcast { readys: HashMap::new(), ready_sent: false, ready_to_decode: false, - }) + }), }) } /// The message-driven interface function for calls from the main message /// loop. - pub fn on_message(&self, m: QMessage, tx: &Sender) -> - Result - where E: From + From - { match m { - QMessage::Local(LocalMessage { - message, - .. - }) => { - match message { - AlgoMessage::BroadcastInput(value) => { - self.on_local_message(&mut value.to_owned()) + pub fn on_message(&self, m: QMessage, tx: &Sender) -> Result + where + E: From + From, + { + match m { + QMessage::Local(LocalMessage { message, .. }) => match message { + AlgoMessage::BroadcastInput(value) => self.on_local_message(&mut value.to_owned()), + + _ => Err(Error::UnexpectedMessage).map_err(E::from), + }, + + QMessage::Remote(RemoteMessage { + node: RemoteNode::Node(uid), + message, + }) => { + if let Message::Broadcast(b) = message { + self.on_remote_message(uid, &b, tx) + } else { + Err(Error::UnexpectedMessage).map_err(E::from) } - - _ => Err(Error::UnexpectedMessage).map_err(E::from) } - }, - QMessage::Remote(RemoteMessage { - node: RemoteNode::Node(uid), - message - }) => { - if let Message::Broadcast(b) = message { - self.on_remote_message(uid, &b, tx) - } - else { - Err(Error::UnexpectedMessage).map_err(E::from) - } - }, - - _ => Err(Error::UnexpectedMessage).map_err(E::from) - }} + _ => Err(Error::UnexpectedMessage).map_err(E::from), + } + } /// Processes the proposed value input by broadcasting it. - fn on_local_message(&self, value: &mut ProposedValue) -> - Result - where E: From + From + fn on_local_message(&self, value: &mut ProposedValue) -> Result + where + E: From + From, { let mut state = self.state.write().unwrap(); // Split the value into chunks/shards, encode them with erasure codes. // Assemble a Merkle tree from data and parity shards. Take all proofs // from this tree and send them, each to its own node. - self.send_shards(value) - .map(|(proof, remote_messages)| { - // Record the first proof as if it were sent by the node to - // itself. - let h = proof.root_hash.clone(); - if proof.validate(h.as_slice()) { - // Save the leaf value for reconstructing the tree later. - state.leaf_values[index_of_proof(&proof)] = - Some(proof.value.clone().into_boxed_slice()); - state.leaf_values_num += 1; - state.root_hash = Some(h); - } + self.send_shards(value).map(|(proof, remote_messages)| { + // Record the first proof as if it were sent by the node to + // itself. + let h = proof.root_hash.clone(); + if proof.validate(h.as_slice()) { + // Save the leaf value for reconstructing the tree later. + state.leaf_values[index_of_proof(&proof)] = + Some(proof.value.clone().into_boxed_slice()); + state.leaf_values_num += 1; + state.root_hash = Some(h); + } - MessageLoopState::Processing(remote_messages) - }) + MessageLoopState::Processing(remote_messages) + }) } /// Breaks the input value into shards of equal length and encodes them -- @@ -136,15 +126,20 @@ impl Broadcast { /// scheme. The returned value contains the shard assigned to this /// node. That shard doesn't need to be sent anywhere. It gets recorded in /// the broadcast instance. - fn send_shards(&self, value: &mut ProposedValue) -> - Result<(Proof, VecDeque), E> - where E: From + From + fn send_shards( + &self, + value: &mut ProposedValue, + ) -> Result<(Proof, VecDeque), E> + where + E: From + From, { let data_shard_num = self.coding.data_shard_count(); let parity_shard_num = self.coding.parity_shard_count(); - debug!("Data shards: {}, parity shards: {}", - self.data_shard_num, parity_shard_num); + debug!( + "Data shards: {}, parity shards: {}", + self.data_shard_num, parity_shard_num + ); // Insert the length of `v` so it can be decoded without the padding. let payload_len = value.len() as u8; value.insert(0, payload_len); // TODO: Handle messages larger than 255 @@ -153,8 +148,7 @@ impl Broadcast { // Size of a Merkle tree leaf value, in bytes. let shard_len = if value_len % data_shard_num > 0 { value_len / data_shard_num + 1 - } - else { + } else { value_len / data_shard_num }; // Pad the last data shard with zeros. Fill the parity shards with @@ -174,12 +168,13 @@ impl Broadcast { debug!("Shards before encoding: {:?}", shards); // Construct the parity chunks/shards - self.coding.encode(shards.as_mut_slice()).map_err(Error::from)?; + self.coding + .encode(shards.as_mut_slice()) + .map_err(Error::from)?; debug!("Shards: {:?}", shards); - let shards_t: Vec = - shards.into_iter().map(|s| s.to_vec()).collect(); + let shards_t: Vec = shards.into_iter().map(|s| s.to_vec()).collect(); // Convert the Merkle tree into a partial binary tree for later // deconstruction into compound branches. @@ -196,15 +191,12 @@ impl Broadcast { if uid == self.uid { // The proof is addressed to this node. result = Ok(proof); - } - else { + } else { // Rest of the proofs are sent to remote nodes. - outgoing.push_back( - RemoteMessage { - node: RemoteNode::Node(uid), - message: Message::Broadcast( - BroadcastMessage::Value(proof)) - }); + outgoing.push_back(RemoteMessage { + node: RemoteNode::Node(uid), + message: Message::Broadcast(BroadcastMessage::Value(proof)), + }); } } } @@ -213,11 +205,14 @@ impl Broadcast { } /// Handler of messages received from remote nodes. - fn on_remote_message(&self, uid: NodeUid, - message: &BroadcastMessage, - tx: &Sender) -> - Result - where E: From + From + fn on_remote_message( + &self, + uid: NodeUid, + message: &BroadcastMessage, + tx: &Sender, + ) -> Result + where + E: From + From, { let mut state = self.state.write().unwrap(); let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new())); @@ -228,13 +223,15 @@ impl Broadcast { if uid != self.uid { // Ignore value messages from unrelated remote nodes. no_outgoing - } - else { + } else { // Initialise the root hash if not already initialised. if state.root_hash.is_none() { state.root_hash = Some(p.root_hash.clone()); - debug!("Node {} Value root hash {:?}", - self.uid, HexBytes(&p.root_hash)); + debug!( + "Node {} Value root hash {:?}", + self.uid, + HexBytes(&p.root_hash) + ); } if let Some(ref h) = state.root_hash.clone() { @@ -248,22 +245,20 @@ impl Broadcast { } // Enqueue a broadcast of an echo of this proof. - Ok(MessageLoopState::Processing(VecDeque::from( - vec![RemoteMessage { + Ok(MessageLoopState::Processing(VecDeque::from(vec![ + RemoteMessage { node: RemoteNode::All, - message: Message::Broadcast( - BroadcastMessage::Echo(p.clone())) - }] - ))) + message: Message::Broadcast(BroadcastMessage::Echo(p.clone())), + }, + ]))) } - }, + } // An echo received. Verify the proof it contains. BroadcastMessage::Echo(p) => { if state.root_hash.is_none() && uid == self.uid { state.root_hash = Some(p.root_hash.clone()); - debug!("Node {} Echo root hash {:?}", - self.uid, state.root_hash); + debug!("Node {} Echo root hash {:?}", self.uid, state.root_hash); } // Call validate with the root hash as argument. @@ -279,61 +274,61 @@ impl Broadcast { // Upon receiving 2f + 1 matching READY(h) // messages, wait for N − 2 f ECHO messages, // then decode v. Return the decoded v to ACS. - if state.ready_to_decode && - state.leaf_values_num >= - self.num_nodes - 2 * self.num_faulty_nodes + if state.ready_to_decode + && state.leaf_values_num >= self.num_nodes - 2 * self.num_faulty_nodes { - let value = - decode_from_shards( - &mut state.leaf_values, - &self.coding, - self.data_shard_num, h)?; + let value = decode_from_shards( + &mut state.leaf_values, + &self.coding, + self.data_shard_num, + h, + )?; tx.send(QMessage::Local(LocalMessage { dst: Algorithm::CommonSubset, - message: AlgoMessage::BroadcastOutput( - uid, - value) + message: AlgoMessage::BroadcastOutput(uid, value), })).map_err(Error::from)?; no_outgoing - } - else if state.leaf_values_num >= - self.num_nodes - self.num_faulty_nodes - { - let result: Result = - decode_from_shards( - &mut state.leaf_values, - &self.coding, - self.data_shard_num, h); - match result { Ok(_) => { - // if Ready has not yet been sent, multicast - // Ready - if !state.ready_sent { - state.ready_sent = true; + } else if state.leaf_values_num >= self.num_nodes - self.num_faulty_nodes { + let result: Result = decode_from_shards( + &mut state.leaf_values, + &self.coding, + self.data_shard_num, + h, + ); + match result { + Ok(_) => { + // if Ready has not yet been sent, multicast + // Ready + if !state.ready_sent { + state.ready_sent = true; - Ok(MessageLoopState::Processing( - VecDeque::from(vec![RemoteMessage { - node: RemoteNode::All, - message: Message::Broadcast( - BroadcastMessage::Ready( - h.to_owned())) - }]) - )) - } else { no_outgoing } - }, Err(e) => Err(E::from(e)) } - } else { no_outgoing } - } - else { - debug!("Broadcast/{} cannot validate Echo {:?}", - self.uid, p); + Ok(MessageLoopState::Processing(VecDeque::from(vec![ + RemoteMessage { + node: RemoteNode::All, + message: Message::Broadcast( + BroadcastMessage::Ready(h.to_owned()), + ), + }, + ]))) + } else { + no_outgoing + } + } + Err(e) => Err(E::from(e)), + } + } else { + no_outgoing + } + } else { + debug!("Broadcast/{} cannot validate Echo {:?}", self.uid, p); no_outgoing } - } - else { + } else { error!("Broadcast/{} root hash not initialised", self.uid); no_outgoing } - }, + } BroadcastMessage::Ready(ref hash) => { // Update the number Ready has been received with this hash. @@ -346,14 +341,11 @@ impl Broadcast { // Upon receiving f + 1 matching Ready(h) messages, if Ready // has not yet been sent, multicast Ready(h). - if (ready_num == self.num_faulty_nodes + 1) && - !state.ready_sent - { + if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent { // Enqueue a broadcast of a ready message. outgoing.push_back(RemoteMessage { node: RemoteNode::All, - message: Message::Broadcast( - BroadcastMessage::Ready(h.to_vec())) + message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())), }); } @@ -361,39 +353,37 @@ impl Broadcast { // for N − 2f Echo messages, then decode v. if ready_num > 2 * self.num_faulty_nodes { // Wait for N - 2f Echo messages, then decode v. - if state.echo_num >= - self.num_nodes - 2 * self.num_faulty_nodes - { + if state.echo_num >= self.num_nodes - 2 * self.num_faulty_nodes { let value = decode_from_shards( &mut state.leaf_values, &self.coding, - self.data_shard_num, h)?; + self.data_shard_num, + h, + )?; tx.send(QMessage::Local(LocalMessage { dst: Algorithm::CommonSubset, - message: AlgoMessage::BroadcastOutput( - self.uid, - value) + message: AlgoMessage::BroadcastOutput(self.uid, value), })).map_err(Error::from)?; - } - else { + } else { state.ready_to_decode = true; } } Ok(MessageLoopState::Processing(outgoing)) + } else { + no_outgoing } - else { no_outgoing } } } } } impl<'a, E> Handler for Broadcast -where E: From + From { - fn handle(&self, m: QMessage, tx: Sender) -> - Result - { +where + E: From + From, +{ + fn handle(&self, m: QMessage, tx: Sender) -> Result { self.on_message(m, &tx) } } @@ -418,27 +408,26 @@ pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> { /// Number of nodes participating in broadcast. num_nodes: usize, /// Maximum allowed number of faulty nodes. - num_faulty_nodes: usize + num_faulty_nodes: usize, } -impl<'a, T: Clone + Debug + Hashable + Send + Sync - + Into> + From>> +impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into> + From>> Instance<'a, T> { - pub fn new(tx: &'a Sender>, - rx: &'a Receiver>, - broadcast_value: Option, - num_nodes: usize, - node_index: usize) -> - Self - { + pub fn new( + tx: &'a Sender>, + rx: &'a Receiver>, + broadcast_value: Option, + num_nodes: usize, + node_index: usize, + ) -> Self { Instance { tx, rx, broadcast_value, node_index, num_nodes, - num_faulty_nodes: (num_nodes - 1) / 3 + num_faulty_nodes: (num_nodes - 1) / 3, } } @@ -453,16 +442,19 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync crossbeam::scope(|scope| { scope.spawn(move || { - *result_r_scoped.lock().unwrap() = - Some(inner_run(self.tx, self.rx, bvalue, - self.node_index, self.num_nodes, - self.num_faulty_nodes)); + *result_r_scoped.lock().unwrap() = Some(inner_run( + self.tx, + self.rx, + bvalue, + self.node_index, + self.num_nodes, + self.num_faulty_nodes, + )); }); }); if let Some(ref r) = *result_r.lock().unwrap() { result = r.to_owned(); - } - else { + } else { result = Err(Error::Threading); } result @@ -480,28 +472,31 @@ pub enum Error { SendDeprecated(SendError>), Recv(RecvError), UnexpectedMessage, - NotImplemented + NotImplemented, } -impl From for Error -{ - fn from(err: rse::Error) -> Error { Error::ReedSolomon(err) } +impl From for Error { + fn from(err: rse::Error) -> Error { + Error::ReedSolomon(err) + } } -impl From> for Error -{ - fn from(err: SendError) -> Error { Error::Send(err) } +impl From> for Error { + fn from(err: SendError) -> Error { + Error::Send(err) + } } -impl From>> for Error -{ - fn from(err: SendError>) -> - Error { Error::SendDeprecated(err) } +impl From>> for Error { + fn from(err: SendError>) -> Error { + Error::SendDeprecated(err) + } } -impl From for Error -{ - fn from(err: RecvError) -> Error { Error::Recv(err) } +impl From for Error { + fn from(err: RecvError) -> Error { + Error::Recv(err) + } } /// Breaks the input value into shards of equal length and encodes them -- and @@ -509,17 +504,21 @@ impl From for Error /// returned value contains the shard assigned to this node. That shard doesn't /// need to be sent anywhere. It is returned to the broadcast instance and gets /// recorded immediately. -fn send_shards<'a, T>(value: T, - tx: &'a Sender>, - coding: &ReedSolomon) -> - Result, Error> -where T: Clone + Debug + Hashable + Send + Sync + Into> + From> +fn send_shards<'a, T>( + value: T, + tx: &'a Sender>, + coding: &ReedSolomon, +) -> Result, Error> +where + T: Clone + Debug + Hashable + Send + Sync + Into> + From>, { let data_shard_num = coding.data_shard_count(); let parity_shard_num = coding.parity_shard_count(); - debug!("Data shards: {}, parity shards: {}", - data_shard_num, parity_shard_num); + debug!( + "Data shards: {}, parity shards: {}", + data_shard_num, parity_shard_num + ); let mut v: Vec = T::into(value); // Insert the length of `v` so it can be decoded without the padding. let payload_len = v.len() as u8; @@ -528,8 +527,7 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> // Size of a Merkle tree leaf value, in bytes. let shard_len = if value_len % data_shard_num > 0 { value_len / data_shard_num + 1 - } - else { + } else { value_len / data_shard_num }; // Pad the last data shard with zeros. Fill the parity shards with zeros. @@ -552,8 +550,7 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> debug!("Shards: {:?}", shards); - let shards_t: Vec = - shards.into_iter().map(|s| s.to_vec()).collect(); + let shards_t: Vec = shards.into_iter().map(|s| s.to_vec()).collect(); // Convert the Merkle tree into a partial binary tree for later // deconstruction into compound branches. @@ -569,15 +566,12 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> if i == 0 { // The first proof is addressed to this node. result = Ok(proof); - } - else { + } else { // Rest of the proofs are sent to remote nodes. - tx.send( - TargetedMessage { - target: Target::Node(i), - message: Message::Broadcast( - BroadcastMessage::Value(proof)) - })?; + tx.send(TargetedMessage { + target: Target::Node(i), + message: Message::Broadcast(BroadcastMessage::Value(proof)), + })?; } } } @@ -586,14 +580,16 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> } /// The main loop of the broadcast task. -fn inner_run<'a, T>(tx: &'a Sender>, - rx: &'a Receiver>, - broadcast_value: Option, - node_index: usize, - num_nodes: usize, - num_faulty_nodes: usize) -> - Result -where T: Clone + Debug + Hashable + Send + Sync + Into> + From> +fn inner_run<'a, T>( + tx: &'a Sender>, + rx: &'a Receiver>, + broadcast_value: Option, + node_index: usize, + num_nodes: usize, + num_faulty_nodes: usize, +) -> Result +where + T: Clone + Debug + Hashable + Send + Sync + Into> + From>, { // Erasure coding scheme: N - 2f value shards and 2f parity shards let parity_shard_num = 2 * num_faulty_nodes; @@ -611,19 +607,17 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> // Assemble a Merkle tree from data and parity shards. Take all proofs from // this tree and send them, each to its own node. if let Some(v) = broadcast_value { - send_shards(v, tx, &coding) - .map(|proof| { - // Record the first proof as if it were sent by the node to - // itself. - let h = proof.root_hash.clone(); - if proof.validate(h.as_slice()) { - // Save the leaf value for reconstructing the tree later. - leaf_values[index_of_proof(&proof)] = - Some(proof.value.clone().into_boxed_slice()); - leaf_values_num += 1; - root_hash = Some(h); - } - })? + send_shards(v, tx, &coding).map(|proof| { + // Record the first proof as if it were sent by the node to + // itself. + let h = proof.root_hash.clone(); + if proof.validate(h.as_slice()) { + // Save the leaf value for reconstructing the tree later. + leaf_values[index_of_proof(&proof)] = Some(proof.value.clone().into_boxed_slice()); + leaf_values_num += 1; + root_hash = Some(h); + } + })? } // return value @@ -641,8 +635,9 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> let message = rx.recv()?; if let SourcedMessage { source: i, - message: Message::Broadcast(message) - } = message { + message: Message::Broadcast(message), + } = message + { match message { // A value received. Record the value and multicast an echo. BroadcastMessage::Value(p) => { @@ -653,8 +648,11 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> if root_hash.is_none() { root_hash = Some(p.root_hash.clone()); - debug!("Node {} Value root hash {:?}", - node_index, HexBytes(&p.root_hash)); + debug!( + "Node {} Value root hash {:?}", + node_index, + HexBytes(&p.root_hash) + ); } if let Some(ref h) = root_hash { @@ -669,16 +667,15 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> // Broadcast an echo of this proof. tx.send(TargetedMessage { target: Target::All, - message: Message::Broadcast(BroadcastMessage::Echo(p)) + message: Message::Broadcast(BroadcastMessage::Echo(p)), })? - }, + } // An echo received. Verify the proof it contains. BroadcastMessage::Echo(p) => { if root_hash.is_none() && i == node_index { root_hash = Some(p.root_hash.clone()); - debug!("Node {} Echo root hash {:?}", - node_index, root_hash); + debug!("Node {} Echo root hash {:?}", node_index, root_hash); } // call validate with the root hash as argument @@ -694,37 +691,37 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> // upon receiving 2f + 1 matching READY(h) // messages, wait for N − 2 f ECHO messages, then // decode v - if ready_to_decode && - leaf_values_num >= - num_nodes - 2 * num_faulty_nodes + if ready_to_decode + && leaf_values_num >= num_nodes - 2 * num_faulty_nodes { - result = Some( - decode_from_shards(&mut leaf_values, - &coding, - data_shard_num, h)); - } - else if leaf_values_num >= - num_nodes - num_faulty_nodes - { - result = Some( - decode_from_shards(&mut leaf_values, - &coding, - data_shard_num, h)); + result = Some(decode_from_shards( + &mut leaf_values, + &coding, + data_shard_num, + h, + )); + } else if leaf_values_num >= num_nodes - num_faulty_nodes { + result = Some(decode_from_shards( + &mut leaf_values, + &coding, + data_shard_num, + h, + )); // if Ready has not yet been sent, multicast // Ready if !ready_sent { ready_sent = true; tx.send(TargetedMessage { target: Target::All, - message: Message::Broadcast( - BroadcastMessage::Ready( - h.to_owned())) + message: Message::Broadcast(BroadcastMessage::Ready( + h.to_owned(), + )), })?; } } } } - }, + } BroadcastMessage::Ready(ref hash) => { // Update the number Ready has been received with this hash. @@ -736,14 +733,10 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> // Upon receiving f + 1 matching Ready(h) messages, if // Ready has not yet been sent, multicast Ready(h). - if (ready_num == num_faulty_nodes + 1) && - !ready_sent - { + if (ready_num == num_faulty_nodes + 1) && !ready_sent { tx.send(TargetedMessage { target: Target::All, - message: Message::Broadcast( - BroadcastMessage::Ready( - h.to_vec())) + message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())), })?; } @@ -752,20 +745,20 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> if ready_num > 2 * num_faulty_nodes { // Wait for N - 2f Echo messages, then decode v. if echo_num >= num_nodes - 2 * num_faulty_nodes { - result = Some( - decode_from_shards(&mut leaf_values, - &coding, - data_shard_num, h)); - } - else { + result = Some(decode_from_shards( + &mut leaf_values, + &coding, + data_shard_num, + h, + )); + } else { ready_to_decode = true; } } } } } - } - else { + } else { error!("Incorrect message from the socket: {:?}", message); } } @@ -773,12 +766,14 @@ where T: Clone + Debug + Hashable + Send + Sync + Into> + From> result.unwrap() } -fn decode_from_shards(leaf_values: &mut Vec>>, - coding: &ReedSolomon, - data_shard_num: usize, - root_hash: &[u8]) -> - Result -where T: Clone + Debug + Hashable + Send + Sync + From> + Into> +fn decode_from_shards( + leaf_values: &mut Vec>>, + coding: &ReedSolomon, + data_shard_num: usize, + root_hash: &[u8], +) -> Result +where + T: Clone + Debug + Hashable + Send + Sync + From> + Into>, { // Try to interpolate the Merkle tree using the Reed-Solomon erasure coding // scheme. @@ -802,8 +797,7 @@ where T: Clone + Debug + Hashable + Send + Sync + From> + Into> // sensible not to continue trying to reconstruct the tree after this // point. This instance must have received incorrect shards. Err(Error::RootHashMismatch) - } - else { + } else { // Reconstruct the value from the data shards. Ok(glue_shards(mtree, data_shard_num)) } @@ -813,7 +807,8 @@ where T: Clone + Debug + Hashable + Send + Sync + From> + Into> /// type `T`. This is useful for reconstructing the data value held in the tree /// and forgetting the leaves that contain parity information. fn glue_shards(m: MerkleTree, n: usize) -> T -where T: From> + Into> +where + T: From> + Into>, { let t: Vec = m.into_iter().take(n).flat_map(|s| s).collect(); let payload_len = t[0] as usize; diff --git a/src/common_subset.rs b/src/common_subset.rs index 6ef9c54..644a246 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -1,12 +1,11 @@ //! Asynchronous Common Subset algorithm. +use crossbeam_channel::{SendError, Sender}; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::RwLock; -use crossbeam_channel::{Sender, SendError}; use messaging; -use messaging::{NodeUid, QMessage, MessageLoopState, Handler, LocalMessage, - Algorithm, AlgoMessage}; +use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid, QMessage}; struct CommonSubsetState { agreement_inputs: HashMap, @@ -18,14 +17,11 @@ pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, - state: RwLock + state: RwLock, } impl CommonSubset { - pub fn new(uid: NodeUid, num_nodes: usize, - node_uids: HashSet) -> - Self - { + pub fn new(uid: NodeUid, num_nodes: usize, node_uids: HashSet) -> Self { let num_faulty_nodes = (num_nodes - 1) / 3; CommonSubset { @@ -35,98 +31,97 @@ impl CommonSubset { state: RwLock::new(CommonSubsetState { agreement_inputs: HashMap::new(), agreement_true_outputs: HashSet::new(), - agreements_without_input: node_uids - }) + agreements_without_input: node_uids, + }), } } - pub fn on_message(&self, m: QMessage, tx: &Sender) -> - Result - where E: From + From - { match m { - QMessage::Local(LocalMessage { - message, - .. - }) => { - let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new())); + pub fn on_message(&self, m: QMessage, tx: &Sender) -> Result + where + E: From + From, + { + match m { + QMessage::Local(LocalMessage { message, .. }) => { + let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new())); - match message { - // Upon receiving input v_i , input v_i to RBC_i. See Figure 2. - AlgoMessage::CommonSubsetInput(value) => { - tx.send(QMessage::Local(LocalMessage { - dst: Algorithm::Broadcast(self.uid), - message: AlgoMessage::BroadcastInput(value) - })).map_err(Error::from)?; - - no_outgoing - } - - // Upon delivery of v_j from RBC_j, if input has not yet been - // provided to BA_j, then provide input 1 to BA_j. See Figure - // 11. - // - // FIXME: Use the output value. - AlgoMessage::BroadcastOutput(uid, _value) => { - let mut state = self.state.write().unwrap(); - if state.agreement_inputs.get(&uid).is_none() { + match message { + // Upon receiving input v_i , input v_i to RBC_i. See Figure 2. + AlgoMessage::CommonSubsetInput(value) => { tx.send(QMessage::Local(LocalMessage { - dst: Algorithm::Agreement(uid), - message: AlgoMessage::AgreementInput(true) + dst: Algorithm::Broadcast(self.uid), + message: AlgoMessage::BroadcastInput(value), })).map_err(Error::from)?; - let _ = state.agreement_inputs.insert(uid, true); - state.agreements_without_input.remove(&uid); + no_outgoing } - no_outgoing - } - - // Upon delivery of value 1 from at least N − f instances of BA, - // provide input 0 to each instance of BA that has not yet been - // provided input. - AlgoMessage::AgreementOutput(uid, true) => { - let mut state = self.state.write().unwrap(); - state.agreement_true_outputs.insert(uid); - - if state.agreement_true_outputs.len() >= - self.num_nodes - self.num_faulty_nodes - { - // FIXME: Avoid cloning the set. - for uid0 in state.agreements_without_input.clone() { + // Upon delivery of v_j from RBC_j, if input has not yet been + // provided to BA_j, then provide input 1 to BA_j. See Figure + // 11. + // + // FIXME: Use the output value. + AlgoMessage::BroadcastOutput(uid, _value) => { + let mut state = self.state.write().unwrap(); + if state.agreement_inputs.get(&uid).is_none() { tx.send(QMessage::Local(LocalMessage { - dst: Algorithm::Agreement(uid0), - message: AlgoMessage::AgreementInput(false) + dst: Algorithm::Agreement(uid), + message: AlgoMessage::AgreementInput(true), })).map_err(Error::from)?; - // TODO: Possibly not required. Keeping in place to - // avoid resending `false`. - let _ = state.agreement_inputs.insert(uid0, false); + let _ = state.agreement_inputs.insert(uid, true); + state.agreements_without_input.remove(&uid); } + + no_outgoing } - no_outgoing - } + // Upon delivery of value 1 from at least N − f instances of BA, + // provide input 0 to each instance of BA that has not yet been + // provided input. + AlgoMessage::AgreementOutput(uid, true) => { + let mut state = self.state.write().unwrap(); + state.agreement_true_outputs.insert(uid); - // FIXME (missing clause): + if state.agreement_true_outputs.len() + >= self.num_nodes - self.num_faulty_nodes + { + // FIXME: Avoid cloning the set. + for uid0 in state.agreements_without_input.clone() { + tx.send(QMessage::Local(LocalMessage { + dst: Algorithm::Agreement(uid0), + message: AlgoMessage::AgreementInput(false), + })).map_err(Error::from)?; + + // TODO: Possibly not required. Keeping in place to + // avoid resending `false`. + let _ = state.agreement_inputs.insert(uid0, false); + } + } + + no_outgoing + } + + // FIXME (missing clause): // // Once all instances of BA have completed, let C ⊂ [1..N] be // the indexes of each BA that delivered 1. Wait for the output // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. // Catchall - _ => Err(Error::UnexpectedMessage).map_err(E::from) + _ => Err(Error::UnexpectedMessage).map_err(E::from), + } } - }, - _ => Err(Error::UnexpectedMessage).map_err(E::from) - }} + _ => Err(Error::UnexpectedMessage).map_err(E::from), + } + } } impl Handler for CommonSubset -where E: From + From { - fn handle(&self, m: QMessage, tx: Sender) -> - Result - { +where + E: From + From, +{ + fn handle(&self, m: QMessage, tx: Sender) -> Result { self.on_message(m, &tx) } } @@ -138,7 +133,8 @@ pub enum Error { Send(SendError), } -impl From> for Error -{ - fn from(err: SendError) -> Error { Error::Send(err) } +impl From> for Error { + fn from(err: SendError) -> Error { + Error::Send(err) + } } diff --git a/src/commst.rs b/src/commst.rs index 56f55c7..5429479 100644 --- a/src/commst.rs +++ b/src/commst.rs @@ -1,17 +1,17 @@ //! Comms task structure. A comms task communicates with a remote node through a //! socket. Local communication with coordinating threads is made via //! `crossbeam_channel::unbounded()`. -use std::io; -use std::fmt::Debug; -use std::sync::Arc; -use std::net::TcpStream; use crossbeam; -use crossbeam_channel::{Sender, Receiver}; +use crossbeam_channel::{Receiver, Sender}; +use std::fmt::Debug; +use std::io; +use std::net::TcpStream; +use std::sync::Arc; +use messaging::SourcedMessage; use proto::Message; use proto_io; use proto_io::ProtoIo; -use messaging::SourcedMessage; #[derive(Debug)] pub enum Error { @@ -19,14 +19,14 @@ pub enum Error { } impl From for Error { - fn from(err: io::Error) -> Error { Error::IoError(err) } + fn from(err: io::Error) -> Error { + Error::IoError(err) + } } /// A communication task connects a remote node to the thread that manages the /// consensus algorithm. -pub struct CommsTask - <'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> -{ +pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> { /// The transmit side of the multiple producer channel from comms threads. tx: &'a Sender>, /// The receive side of the channel to the comms thread. @@ -34,26 +34,27 @@ pub struct CommsTask /// The socket IO task. io: ProtoIo, /// The index of this comms task for identification against its remote node. - pub node_index: usize + pub node_index: usize, } -impl <'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> - CommsTask<'a, T> -{ - pub fn new(tx: &'a Sender>, - rx: &'a Receiver>, - stream: TcpStream, - node_index: usize) -> - Self - { - debug!("Creating comms task #{} for {:?}", node_index, - stream.peer_addr().unwrap()); +impl<'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> CommsTask<'a, T> { + pub fn new( + tx: &'a Sender>, + rx: &'a Receiver>, + stream: TcpStream, + node_index: usize, + ) -> Self { + debug!( + "Creating comms task #{} for {:?}", + node_index, + stream.peer_addr().unwrap() + ); CommsTask { tx, rx, io: ProtoIo::from_stream(stream), - node_index + node_index, } } @@ -84,14 +85,14 @@ impl <'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> match self.io.recv() { Ok(message) => { debug!("Node {} -> {:?}", node_index, message); - tx.send( - SourcedMessage { - source: node_index, - message - }).unwrap(); - }, - Err(proto_io::Error::ProtobufError(e)) => - warn!("Node {} - Protobuf error {}", node_index, e), + tx.send(SourcedMessage { + source: node_index, + message, + }).unwrap(); + } + Err(proto_io::Error::ProtobufError(e)) => { + warn!("Node {} - Protobuf error {}", node_index, e) + } Err(e) => { warn!("Node {} - Critical error {:?}", node_index, e); break; diff --git a/src/connection.rs b/src/connection.rs index 34d8453..c2061d3 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use std::io::BufReader; -use std::net::{TcpStream, TcpListener, SocketAddr}; +use std::net::{SocketAddr, TcpListener, TcpStream}; #[derive(Debug)] pub struct Connection { @@ -15,24 +15,21 @@ impl Connection { Connection { // Create a read buffer of 1K bytes. reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()), - stream + stream, } } } /// Connect this node to remote peers. A vector of successful connections is /// returned. -pub fn make(bind_address: &SocketAddr, - remote_addresses: &HashSet) -> Vec -{ +pub fn make(bind_address: &SocketAddr, remote_addresses: &HashSet) -> Vec { // Connected remote nodes. -// let mut connected: Vec = Vec::new(); + // let mut connected: Vec = Vec::new(); // Listen for incoming connections on a given TCP port. let bind_address = bind_address; let listener = TcpListener::bind(bind_address).unwrap(); // Initialise initial connection states. - let mut connections: Vec> = - (0 .. remote_addresses.len()) + let mut connections: Vec> = (0..remote_addresses.len()) .into_iter() .map(|_| None) .collect(); @@ -42,14 +39,13 @@ pub fn make(bind_address: &SocketAddr, for (n, &address) in remote_addresses.iter().enumerate() { let there_str = format!("{}", address); if here_str < there_str { - connections[n] = - match listener.accept() { - Ok((stream, _)) => { - info!("Connected to {}", there_str); - Some(Connection::new(stream)) - }, - Err(_) => None + connections[n] = match listener.accept() { + Ok((stream, _)) => { + info!("Connected to {}", there_str); + Some(Connection::new(stream)) } + Err(_) => None, + } } } @@ -57,14 +53,13 @@ pub fn make(bind_address: &SocketAddr, for (n, &address) in remote_addresses.iter().enumerate() { let there_str = format!("{}", address); if here_str > there_str { - connections[n] = - match TcpStream::connect(address) { - Ok(stream) => { - info!("Connected to {}", there_str); - Some(Connection::new(stream)) - }, - Err(_) => None + connections[n] = match TcpStream::connect(address) { + Ok(stream) => { + info!("Connected to {}", there_str); + Some(Connection::new(stream)) } + Err(_) => None, + } } } diff --git a/src/lib.rs b/src/lib.rs index 8743d4b..052f030 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,21 +40,21 @@ #![feature(optin_builtin_traits)] #[macro_use] extern crate log; +extern crate crossbeam; +extern crate merkle; extern crate protobuf; extern crate ring; -extern crate merkle; -extern crate crossbeam; #[macro_use] extern crate crossbeam_channel; extern crate reed_solomon_erasure; +pub mod agreement; +pub mod broadcast; +pub mod common_subset; +mod commst; mod connection; pub mod messaging; pub mod proto; mod proto_io; -mod commst; -pub mod common_subset; -pub mod broadcast; -pub mod agreement; pub mod node; diff --git a/src/messaging.rs b/src/messaging.rs index bb2776a..9663a0a 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,12 +1,12 @@ //! The local message delivery system. -use std::collections::{HashSet, HashMap, VecDeque}; +use crossbeam::{Scope, ScopedJoinHandle}; +use crossbeam_channel; +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; +use proto::Message; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt::Debug; use std::net::SocketAddr; use std::sync::RwLock; -use crossbeam::{Scope, ScopedJoinHandle}; -use crossbeam_channel; -use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; -use proto::Message; /// Unique ID of a node. pub type NodeUid = SocketAddr; @@ -62,7 +62,7 @@ pub struct LocalMessage { /// Identifier of the message destination algorithm. pub dst: Algorithm, /// Payload - pub message: AlgoMessage + pub message: AlgoMessage, } /// The message destinations corresponding to a remote node `i`. It can be @@ -75,21 +75,21 @@ pub struct LocalMessage { #[derive(Clone, Debug, PartialEq, Eq)] pub enum RemoteNode { All, - Node(NodeUid) + Node(NodeUid), } /// Message to or from a remote node. #[derive(Clone, Debug, PartialEq)] pub struct RemoteMessage { pub node: RemoteNode, - pub message: Message + pub message: Message, } /// The union type of local and remote messages. #[derive(Clone)] pub enum QMessage { Local(LocalMessage), - Remote(RemoteMessage) + Remote(RemoteMessage), } /// States of the message loop consided as an automaton with output. There is @@ -99,15 +99,14 @@ pub enum QMessage { #[derive(Clone, PartialEq)] pub enum MessageLoopState { Processing(VecDeque), - Finished + Finished, } impl MessageLoopState { pub fn is_processing(&self) -> bool { if let MessageLoopState::Processing(_) = self { true - } - else { + } else { false } } @@ -116,10 +115,8 @@ impl MessageLoopState { /// emitted by the handler to the messages already queued from previous /// iterations of a message handling loop. pub fn append(&mut self, other: &mut MessageLoopState) { - if let MessageLoopState::Processing(ref mut new_msgs) = other - { - if let MessageLoopState::Processing(ref mut msgs) = self - { + if let MessageLoopState::Processing(ref mut new_msgs) = other { + if let MessageLoopState::Processing(ref mut msgs) = self { msgs.append(new_msgs); } } @@ -132,8 +129,7 @@ impl MessageLoopState { /// - either `Finished` or a state with outgoing messages to remote nodes - or /// an error. pub trait Handler>: Send + Sync { - fn handle(&self, m: QMessage, tx: Sender) -> - Result; + fn handle(&self, m: QMessage, tx: Sender) -> Result; } /// The queue functionality for messages sent between algorithm instances. @@ -149,21 +145,20 @@ pub struct MessageLoop<'a, HandlerError: 'a + From> { /// Remote send handles. Messages are sent through channels as opposed to /// directly to sockets. This is done to make tests independent of socket /// IO. - remote_txs: HashMap>> + remote_txs: HashMap>>, } impl<'a, HandlerError> MessageLoop<'a, HandlerError> -where HandlerError: 'a + From +where + HandlerError: 'a + From, { - pub fn new(remote_txs: HashMap>>) -> - Self - { + pub fn new(remote_txs: HashMap>>) -> Self { let (queue_tx, queue_rx) = unbounded(); MessageLoop { algos: RwLock::new(HashMap::new()), queue_tx, queue_rx, - remote_txs + remote_txs, } } @@ -172,14 +167,11 @@ where HandlerError: 'a + From } /// Registers a handler for messages sent to the given algorithm. - pub fn insert_algo(&'a self, algo: Algorithm, - handler: &'a Handler) - { + pub fn insert_algo(&'a self, algo: Algorithm, handler: &'a Handler) { let lock = self.algos.write(); if let Ok(mut map) = lock { map.insert(algo, handler); - } - else { + } else { error!("Cannot insert {:?}", algo); } } @@ -189,15 +181,13 @@ where HandlerError: 'a + From let lock = self.algos.write(); if let Ok(mut map) = lock { map.remove(algo); - } - else { + } else { error!("Cannot remove {:?}", algo); } } /// The message loop. - pub fn run(&self) -> Result - { + pub fn run(&self) -> Result { let mut result = Ok(MessageLoopState::Processing(VecDeque::new())); while let Ok(mut state) = result { @@ -207,107 +197,99 @@ where HandlerError: 'a + From self.send_remote(messages) .map(|_| MessageLoopState::Processing(VecDeque::new())) .map_err(HandlerError::from) - } - else { + } else { Ok(MessageLoopState::Finished) })?; // Receive local and remote messages. - if let Ok(m) = self.queue_rx.recv() { result = match m { - QMessage::Local(LocalMessage { - dst, - message - }) => { - // FIXME: error handling - if let Some(mut handler) = - self.algos.write().unwrap().get_mut(&dst) - { - let mut new_result = - handler.handle(QMessage::Local( - LocalMessage { - dst, - message - }), self.queue_tx.clone() + if let Ok(m) = self.queue_rx.recv() { + result = match m { + QMessage::Local(LocalMessage { dst, message }) => { + // FIXME: error handling + if let Some(mut handler) = self.algos.write().unwrap().get_mut(&dst) { + let mut new_result = handler.handle( + QMessage::Local(LocalMessage { dst, message }), + self.queue_tx.clone(), ); - if let Ok(ref mut new_state) = new_result { - state.append(new_state); - Ok(state) - } - else { - // Error overrides the previous state. - new_result + if let Ok(ref mut new_state) = new_result { + state.append(new_state); + Ok(state) + } else { + // Error overrides the previous state. + new_result + } + } else { + Err(Error::NoSuchAlgorithm).map_err(HandlerError::from) } } - else { - Err(Error::NoSuchAlgorithm).map_err(HandlerError::from) - } - } - // A message FROM a remote node. - QMessage::Remote(RemoteMessage { - node, - message - }) => { - // Multicast the message to all algorithm instances, - // collecting output messages iteratively and appending them - // to result. - // - // FIXME: error handling - self.algos.write().unwrap().iter_mut() - .fold(Ok(state), - |result1, (_, handler)| { - if let Ok(mut state1) = result1 { - handler.handle( - QMessage::Remote(RemoteMessage { - node: node.clone(), - message: message.clone() - }), self.queue_tx.clone() - ).map(|ref mut state2| { - state1.append(state2); - state1 - }) - } - else { - result1 - } - } + // A message FROM a remote node. + QMessage::Remote(RemoteMessage { node, message }) => { + // Multicast the message to all algorithm instances, + // collecting output messages iteratively and appending them + // to result. + // + // FIXME: error handling + self.algos.write().unwrap().iter_mut().fold( + Ok(state), + |result1, (_, handler)| { + if let Ok(mut state1) = result1 { + handler + .handle( + QMessage::Remote(RemoteMessage { + node: node.clone(), + message: message.clone(), + }), + self.queue_tx.clone(), + ) + .map(|ref mut state2| { + state1.append(state2); + state1 + }) + } else { + result1 + } + }, ) + } } - }} else { result = Err(Error::RecvError) - .map_err(HandlerError::from) } + } else { + result = Err(Error::RecvError).map_err(HandlerError::from) + } } // end of while loop result } /// Send a message queue to remote nodes. - fn send_remote(&self, messages: &VecDeque) -> - Result<(), Error> - { + fn send_remote(&self, messages: &VecDeque) -> Result<(), Error> { messages.iter().fold(Ok(()), |result, m| { - if result.is_err() { result } else { match m { - RemoteMessage { - node: RemoteNode::Node(uid), - message - } => { - if let Some(tx) = self.remote_txs.get(&uid) { - tx.send(message.clone()).map_err(Error::from) + if result.is_err() { + result + } else { + match m { + RemoteMessage { + node: RemoteNode::Node(uid), + message, + } => { + if let Some(tx) = self.remote_txs.get(&uid) { + tx.send(message.clone()).map_err(Error::from) + } else { + Err(Error::SendError) + } } - else { - Err(Error::SendError) - } - }, - RemoteMessage { - node: RemoteNode::All, - message - } => { - self.remote_txs.iter().fold(result, |result1, (_, tx)| { - if result1.is_err() { result1 } else { + RemoteMessage { + node: RemoteNode::All, + message, + } => self.remote_txs.iter().fold(result, |result1, (_, tx)| { + if result1.is_err() { + result1 + } else { tx.send(message.clone()).map_err(Error::from) } - }) + }), } - }} + } }) } } @@ -321,26 +303,25 @@ where HandlerError: 'a + From #[derive(Clone, Debug, PartialEq, Eq)] pub enum Target { All, - Node(usize) + Node(usize), } /// Message with a designated target. #[derive(Clone, Debug, PartialEq)] pub struct TargetedMessage { pub target: Target, - pub message: Message + pub message: Message, } -impl TargetedMessage -{ +impl TargetedMessage { /// Initialises a message while checking parameter preconditions. pub fn new(target: Target, message: Message) -> Option { match target { Target::Node(i) if i == 0 => { // Remote node indices start from 1. None - }, - _ => Some(TargetedMessage{target, message}) + } + _ => Some(TargetedMessage { target, message }), } } } @@ -353,7 +334,7 @@ impl TargetedMessage #[derive(Clone, Debug)] pub struct SourcedMessage { pub source: usize, - pub message: Message + pub message: Message, } /// The messaging struct allows for targeted message exchange between comms @@ -388,27 +369,20 @@ pub struct Messaging { impl Messaging { /// Initialises all the required TX and RX handles for the case on a total /// number `num_nodes` of consensus nodes. - pub fn new(num_nodes: usize) -> Self - { - let to_comms: Vec<_> = (0 .. num_nodes - 1) + pub fn new(num_nodes: usize) -> Self { + let to_comms: Vec<_> = (0..num_nodes - 1) .map(|_| unbounded::>()) .collect(); - let txs_to_comms = to_comms.iter() - .map(|&(ref tx, _)| tx.to_owned()) - .collect(); - let rxs_to_comms: Vec>> = to_comms.iter() - .map(|&(_, ref rx)| rx.to_owned()) - .collect(); + let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect(); + let rxs_to_comms: Vec>> = + to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect(); let (tx_from_comms, rx_from_comms) = unbounded(); - let to_algo: Vec<_> = (0 .. num_nodes) + let to_algo: Vec<_> = (0..num_nodes) .map(|_| unbounded::>()) .collect(); - let txs_to_algo = to_algo.iter() - .map(|&(ref tx, _)| tx.to_owned()) - .collect(); - let rxs_to_algo: Vec>> = to_algo.iter() - .map(|&(_, ref rx)| rx.to_owned()) - .collect(); + let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect(); + let rxs_to_algo: Vec>> = + to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect(); let (tx_from_algo, rx_from_algo) = unbounded(); let (stop_tx, stop_rx) = bounded(1); @@ -459,9 +433,9 @@ impl Messaging { } /// Spawns the message delivery thread in a given thread scope. - pub fn spawn<'a>(&self, scope: &Scope<'a>) -> - ScopedJoinHandle> - where T: 'a + pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle> + where + T: 'a, { let txs_to_comms = self.txs_to_comms.to_owned(); let rx_from_comms = self.rx_from_comms.to_owned(); @@ -472,66 +446,70 @@ impl Messaging { let mut stop = false; // TODO: `select_loop!` seems to really confuse Clippy. - #[cfg_attr(feature = "cargo-clippy", allow(never_loop, - if_let_redundant_pattern_matching, deref_addrof))] + #[cfg_attr( + feature = "cargo-clippy", + allow(never_loop, if_let_redundant_pattern_matching, deref_addrof) + )] scope.spawn(move || { let mut result = Ok(()); // This loop forwards messages according to their metadata. - while !stop && result.is_ok() { select_loop! { - recv(rx_from_algo, message) => { - match message { - TargetedMessage { - target: Target::All, - message - } => { - // Send the message to all remote nodes, stopping at - // the first error. - result = txs_to_comms.iter() - .fold(Ok(()), |result, tx| { - if result.is_ok() { - tx.send(message.clone()) - } - else { - result - } - }).map_err(Error::from); - }, - TargetedMessage { - target: Target::Node(i), - message - } => { - // Remote node indices start from 1. - assert!(i > 0); - // Convert node index to vector index. - let i = i - 1; + while !stop && result.is_ok() { + select_loop! { + recv(rx_from_algo, message) => { + match message { + TargetedMessage { + target: Target::All, + message + } => { + // Send the message to all remote nodes, stopping at + // the first error. + result = txs_to_comms.iter() + .fold(Ok(()), |result, tx| { + if result.is_ok() { + tx.send(message.clone()) + } + else { + result + } + }).map_err(Error::from); + }, + TargetedMessage { + target: Target::Node(i), + message + } => { + // Remote node indices start from 1. + assert!(i > 0); + // Convert node index to vector index. + let i = i - 1; - result = if i < txs_to_comms.len() { - txs_to_comms[i].send(message.clone()) - .map_err(Error::from) + result = if i < txs_to_comms.len() { + txs_to_comms[i].send(message.clone()) + .map_err(Error::from) + } + else { + Err(Error::NoSuchTarget) + }; + } + } + }, + recv(rx_from_comms, message) => { + // Send the message to all algorithm instances, stopping at + // the first error. + result = txs_to_algo.iter().fold(Ok(()), |result, tx| { + if result.is_ok() { + tx.send(message.clone()) } else { - Err(Error::NoSuchTarget) - }; - } + result + } + }).map_err(Error::from) + }, + recv(stop_rx, _) => { + // Flag the thread ready to exit. + stop = true; } - }, - recv(rx_from_comms, message) => { - // Send the message to all algorithm instances, stopping at - // the first error. - result = txs_to_algo.iter().fold(Ok(()), |result, tx| { - if result.is_ok() { - tx.send(message.clone()) - } - else { - result - } - }).map_err(Error::from) - }, - recv(stop_rx, _) => { - // Flag the thread ready to exit. - stop = true; } - }} // end of select_loop! + } // end of select_loop! result }) } @@ -547,5 +525,7 @@ pub enum Error { } impl From> for Error { - fn from(_: crossbeam_channel::SendError) -> Error { Error::SendError } + fn from(_: crossbeam_channel::SendError) -> Error { + Error::SendError + } } diff --git a/src/node.rs b/src/node.rs index faf473e..4fb9d89 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,30 +1,34 @@ //! Networking controls of the consensus node. -use std::io; -use std::collections::HashSet; -use std::fmt::Debug; -use std::marker::{Send, Sync}; -use std::net::SocketAddr; use crossbeam; use merkle::Hashable; +use std::collections::HashSet; +use std::fmt::Debug; +use std::io; +use std::marker::{Send, Sync}; +use std::net::SocketAddr; -use connection; use broadcast; use commst; +use connection; use messaging::Messaging; #[derive(Debug)] pub enum Error { IoError(io::Error), CommsError(commst::Error), - NotImplemented + NotImplemented, } impl From for Error { - fn from(err: io::Error) -> Error { Error::IoError(err) } + fn from(err: io::Error) -> Error { + Error::IoError(err) + } } impl From for Error { - fn from(err: commst::Error) -> Error { Error::CommsError(err) } + fn from(err: commst::Error) -> Error { + Error::CommsError(err) + } } /// This is a structure to start a consensus node. @@ -34,24 +38,23 @@ pub struct Node { /// Sockets of remote nodes. remotes: HashSet, /// Optionally, a value to be broadcast by this node. - value: Option + value: Option, } -impl> + Into>> +impl> + Into>> Node { /// Consensus node constructor. It only initialises initial parameters. - pub fn new(addr: SocketAddr, - remotes: HashSet, - value: Option) -> Self - { - Node {addr, remotes, value} + pub fn new(addr: SocketAddr, remotes: HashSet, value: Option) -> Self { + Node { + addr, + remotes, + value, + } } /// Consensus node procedure implementing HoneyBadgerBFT. - pub fn run(&self) -> Result - { + pub fn run(&self) -> Result { let value = &self.value; let connections = connection::make(&self.addr, &self.remotes); let num_nodes = connections.len() + 1; @@ -76,61 +79,65 @@ impl { - debug!("Broadcast instance 0 succeeded: {}", - String::from_utf8(T::into(t)).unwrap()); - }, - Err(e) => error!("Broadcast instance 0: {:?}", e) + debug!( + "Broadcast instance 0 succeeded: {}", + String::from_utf8(T::into(t)).unwrap() + ); + } + Err(e) => error!("Broadcast instance 0: {:?}", e), } })); // Start a comms task for each connection. Node indices of those // tasks are 1 through N where N is the number of connections. for (i, c) in connections.iter().enumerate() { - // Receive side of a single-consumer channel from algorithm // actor tasks to the comms task. let rx_to_comms = &rxs_to_comms[i]; let node_index = i + 1; scope.spawn(move || { - match commst::CommsTask::new(tx_from_comms, - rx_to_comms, - // FIXME: handle error - c.stream.try_clone().unwrap(), - node_index) - .run() + match commst::CommsTask::new( + tx_from_comms, + rx_to_comms, + // FIXME: handle error + c.stream.try_clone().unwrap(), + node_index, + ).run() { Ok(_) => debug!("Comms task {} succeeded", node_index), - Err(e) => error!("Comms task {}: {:?}", node_index, e) + Err(e) => error!("Comms task {}: {:?}", node_index, e), } }); - // Associate a broadcast instance to the above comms task. let rx_to_algo = &rxs_to_algo[node_index]; broadcast_handles.push(scope.spawn(move || { - match broadcast::Instance::new(tx_from_algo, - rx_to_algo, - None, - num_nodes, - node_index) - .run() + match broadcast::Instance::new( + tx_from_algo, + rx_to_algo, + None, + num_nodes, + node_index, + ).run() { Ok(t) => { - debug!("Broadcast instance {} succeeded: {}", - node_index, - String::from_utf8(T::into(t)).unwrap()); - }, - Err(e) => error!("Broadcast instance {}: {:?}", - node_index, e) + debug!( + "Broadcast instance {} succeeded: {}", + node_index, + String::from_utf8(T::into(t)).unwrap() + ); + } + Err(e) => error!("Broadcast instance {}: {:?}", node_index, e), } })); } @@ -142,13 +149,16 @@ impl debug!("Messaging stopped OK"), - Err(e) => debug!("Messaging error: {:?}", e) + Err(e) => debug!("Messaging error: {:?}", e), } // TODO: continue the implementation of the asynchronous common // subset algorithm. diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 9ff952f..8c40e3d 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,29 +1,29 @@ //! Construction of messages from protobuf buffers. pub mod message; +use merkle::proof::{Lemma, Positioned, Proof}; +use proto::message::*; +use protobuf::core::parse_from_bytes; +use protobuf::error::{ProtobufError, ProtobufResult, WireError}; +use protobuf::Message as ProtobufMessage; +use ring::digest::Algorithm; use std::fmt; use std::marker::{Send, Sync}; -use ring::digest::Algorithm; -use merkle::proof::{Proof, Lemma, Positioned}; -use protobuf::Message as ProtobufMessage; -use proto::message::*; -use protobuf::error::{ProtobufResult, ProtobufError, WireError}; -use protobuf::core::parse_from_bytes; /// Kinds of message sent by nodes participating in consensus. -#[derive (Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum Message { Broadcast(BroadcastMessage), - Agreement(AgreementMessage) + Agreement(AgreementMessage), } /// The three kinds of message sent during the reliable broadcast stage of the /// consensus algorithm. -#[derive (Clone, PartialEq)] +#[derive(Clone, PartialEq)] pub enum BroadcastMessage { Value(Proof), Echo(Proof), - Ready(Vec) + Ready(Vec), } pub struct HexBytes<'a>(pub &'a [u8]); @@ -51,10 +51,13 @@ struct HexProof<'a, T: 'a>(&'a Proof); impl<'a, T: Send + Sync + fmt::Debug> fmt::Debug for HexProof<'a, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Proof {{ algorithm: {:?}, root_hash: {:?}, lemma: .., value: {:?} }}", - self.0.algorithm, - HexBytes(&self.0.root_hash), - self.0.value) + write!( + f, + "Proof {{ algorithm: {:?}, root_hash: {:?}, lemma: .., value: {:?} }}", + self.0.algorithm, + HexBytes(&self.0.root_hash), + self.0.value + ) } } @@ -63,15 +66,13 @@ impl fmt::Debug for BroadcastMessage { match *self { BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)), BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)), - BroadcastMessage::Ready(ref bytes) => { - write!(f, "Ready({:?})", HexBytes(bytes)) - } + BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)), } } } /// Messages sent during the binary Byzantine agreement stage. -#[derive (Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum AgreementMessage { // TODO } @@ -83,34 +84,33 @@ impl Message { /// to be fully serialised and sent as a whole, or it can be passed over /// using an ID and the `Eq` instance to discriminate the finite set of /// algorithms in `ring::digest`. - pub fn from_proto(mut proto: message::MessageProto) - -> Option - where T: From> + pub fn from_proto(mut proto: message::MessageProto) -> Option + where + T: From>, { if proto.has_broadcast() { - BroadcastMessage::from_proto(proto.take_broadcast(), - // TODO, possibly move Algorithm inside - // BroadcastMessage - &::ring::digest::SHA256) - .map(Message::Broadcast) - } - else if proto.has_agreement() { - AgreementMessage::from_proto(proto.take_agreement()) - .map(Message::Agreement) - } - else { + BroadcastMessage::from_proto( + proto.take_broadcast(), + // TODO, possibly move Algorithm inside + // BroadcastMessage + &::ring::digest::SHA256, + ).map(Message::Broadcast) + } else if proto.has_agreement() { + AgreementMessage::from_proto(proto.take_agreement()).map(Message::Agreement) + } else { None } } pub fn into_proto(self) -> MessageProto - where T: Into> + where + T: Into>, { let mut m = MessageProto::new(); match self { Message::Broadcast(b) => { m.set_broadcast(b.into_proto()); - }, + } Message::Agreement(a) => { m.set_agreement(a.into_proto()); } @@ -123,21 +123,22 @@ impl Message { /// TODO: pass custom errors from down the chain of nested parsers as /// opposed to returning `WireError::Other`. pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult - where T: From> + where + T: From>, { - let r = parse_from_bytes::(bytes) - .map(Self::from_proto); + let r = parse_from_bytes::(bytes).map(Self::from_proto); match r { Ok(Some(m)) => Ok(m), Ok(None) => Err(ProtobufError::WireError(WireError::Other)), - Err(e) => Err(e) + Err(e) => Err(e), } } /// Produce a protobuf representation of this `Message`. pub fn write_to_bytes(self) -> ProtobufResult> - where T: Into> + where + T: Into>, { self.into_proto().write_to_bytes() } @@ -145,7 +146,8 @@ impl Message { impl BroadcastMessage { pub fn into_proto(self) -> BroadcastProto - where T: Into> + where + T: Into>, { let mut b = BroadcastProto::new(); match self { @@ -153,12 +155,12 @@ impl BroadcastMessage { let mut v = ValueProto::new(); v.set_proof(ProofProto::from_proof(p)); b.set_value(v); - }, + } BroadcastMessage::Echo(p) => { let mut e = EchoProto::new(); e.set_proof(ProofProto::from_proof(p)); b.set_echo(e); - }, + } BroadcastMessage::Ready(h) => { let mut r = ReadyProto::new(); r.set_root_hash(h); @@ -167,39 +169,37 @@ impl BroadcastMessage { b } - pub fn from_proto(mut mp: BroadcastProto, - algorithm: &'static Algorithm) - -> Option - where T: From> + pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option + where + T: From>, { if mp.has_value() { - mp.take_value().take_proof().into_proof(algorithm) + mp.take_value() + .take_proof() + .into_proof(algorithm) .map(BroadcastMessage::Value) - } - else if mp.has_echo() { - mp.take_echo().take_proof().into_proof(algorithm) + } else if mp.has_echo() { + mp.take_echo() + .take_proof() + .into_proof(algorithm) .map(BroadcastMessage::Echo) - } - else if mp.has_ready() { + } else if mp.has_ready() { let h = mp.take_ready().take_root_hash(); Some(BroadcastMessage::Ready(h)) - } - else { + } else { None } } } impl AgreementMessage { - pub fn into_proto(self) -> AgreementProto - { + pub fn into_proto(self) -> AgreementProto { unimplemented!(); } // TODO: Re-enable lint once implemented. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] - pub fn from_proto(_mp: AgreementProto) -> Option - { + pub fn from_proto(_mp: AgreementProto) -> Option { unimplemented!(); } } @@ -209,9 +209,9 @@ impl AgreementMessage { /// `Proof` outside its crate. impl ProofProto { pub fn from_proof(proof: Proof) -> Self - where T: Into> + where + T: Into>, { - let mut proto = Self::new(); match proof { @@ -231,10 +231,9 @@ impl ProofProto { proto } - pub fn into_proof(mut self, - algorithm: &'static Algorithm) - -> Option> - where T: From> + pub fn into_proof(mut self, algorithm: &'static Algorithm) -> Option> + where + T: From>, { if !self.has_lemma() { return None; @@ -256,21 +255,21 @@ impl LemmaProto { let mut proto = Self::new(); match lemma { - Lemma {node_hash, sibling_hash, sub_lemma} => { + Lemma { + node_hash, + sibling_hash, + sub_lemma, + } => { proto.set_node_hash(node_hash); - if let Some(sub_proto) = sub_lemma.map( - |l| Self::from_lemma(*l)) - { + if let Some(sub_proto) = sub_lemma.map(|l| Self::from_lemma(*l)) { proto.set_sub_lemma(sub_proto); } match sibling_hash { - Some(Positioned::Left(hash)) => - proto.set_left_sibling_hash(hash), + Some(Positioned::Left(hash)) => proto.set_left_sibling_hash(hash), - Some(Positioned::Right(hash)) => - proto.set_right_sibling_hash(hash), + Some(Positioned::Right(hash)) => proto.set_right_sibling_hash(hash), None => {} } @@ -295,12 +294,10 @@ impl LemmaProto { // If a `sub_lemma` is present is the Protobuf, // then we expect it to unserialize to a valid `Lemma`, // otherwise we return `None` - self.take_sub_lemma().into_lemma().map(|sub_lemma| { - Lemma { - node_hash, - sibling_hash, - sub_lemma: Some(Box::new(sub_lemma)), - } + self.take_sub_lemma().into_lemma().map(|sub_lemma| Lemma { + node_hash, + sibling_hash, + sub_lemma: Some(Box::new(sub_lemma)), }) } else { // We might very well not have a sub_lemma, diff --git a/src/proto_io.rs b/src/proto_io.rs index eda00a5..9b551a2 100644 --- a/src/proto_io.rs +++ b/src/proto_io.rs @@ -1,11 +1,11 @@ //! Protobuf message IO task structure. -use std::{cmp,io}; -use std::io::Read; -use std::net::TcpStream; +use proto::*; use protobuf; use protobuf::Message as ProtobufMessage; -use proto::*; +use std::io::Read; +use std::net::TcpStream; +use std::{cmp, io}; /// A magic key to put right before each message. An atavism of primitive serial /// protocols. @@ -24,14 +24,18 @@ pub enum Error { } impl From for Error { - fn from(err: io::Error) -> Error { Error::IoError(err) } + fn from(err: io::Error) -> Error { + Error::IoError(err) + } } impl From for Error { - fn from(err: protobuf::ProtobufError) -> Error { Error::ProtobufError(err) } + fn from(err: protobuf::ProtobufError) -> Error { + Error::ProtobufError(err) + } } -fn encode_u32_to_be(value: u32, buffer: &mut[u8]) -> Result<(), Error> { +fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> { if buffer.len() < 4 { return Err(Error::EncodeError); } @@ -81,7 +85,8 @@ impl ProtoIo } pub fn recv(&mut self) -> Result, Error> - where T: Clone + Send + Sync + From> // + Into> + where + T: Clone + Send + Sync + From>, // + Into> { self.stream.read_exact(&mut self.buffer[0..4])?; let frame_start = decode_u32_from_be(&self.buffer[0..4])?; @@ -94,19 +99,18 @@ impl ProtoIo let mut message_v: Vec = Vec::new(); message_v.reserve(size); while message_v.len() < size { - let num_to_read = cmp::min(self.buffer.len(), size - - message_v.len()); + let num_to_read = cmp::min(self.buffer.len(), size - message_v.len()); let (slice, _) = self.buffer.split_at_mut(num_to_read); self.stream.read_exact(slice)?; message_v.extend_from_slice(slice); } - Message::parse_from_bytes(&message_v) - .map_err(Error::ProtobufError) + Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError) } pub fn send(&mut self, message: Message) -> Result<(), Error> - where T: Clone + Send + Sync + Into> + where + T: Clone + Send + Sync + Into>, { let mut buffer: [u8; 4] = [0; 4]; // Wrap stream diff --git a/tests/broadcast.rs b/tests/broadcast.rs index 434b2cd..fb16890 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -3,24 +3,23 @@ extern crate hbbft; #[macro_use] extern crate log; -extern crate simple_logger; extern crate crossbeam; extern crate crossbeam_channel; extern crate merkle; +extern crate simple_logger; mod netsim; -use std::collections::{HashSet, HashMap}; -use crossbeam_channel::{Sender, Receiver}; +use crossbeam_channel::{Receiver, Sender}; +use std::collections::{HashMap, HashSet}; -use hbbft::proto::*; -use hbbft::messaging; -use hbbft::messaging::{QMessage, NodeUid, Algorithm, ProposedValue, - AlgoMessage, Handler, LocalMessage, - MessageLoop, RemoteMessage, RemoteNode}; -use hbbft::broadcast::Broadcast; use hbbft::broadcast; +use hbbft::broadcast::Broadcast; use hbbft::common_subset; +use hbbft::messaging; +use hbbft::messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoop, NodeUid, + ProposedValue, QMessage, RemoteMessage, RemoteNode}; +use hbbft::proto::*; use netsim::NetSim; @@ -37,14 +36,14 @@ pub struct TestNode<'a> { message_loop: MessageLoop<'a, Error>, } -impl<'a> TestNode<'a> -{ +impl<'a> TestNode<'a> { /// Consensus node constructor. It only initialises initial parameters. - pub fn new(uid: NodeUid, - txs: HashMap>>, - rxs: HashMap>>, - value: Option) -> Self - { + pub fn new( + uid: NodeUid, + txs: HashMap>>, + rxs: HashMap>>, + value: Option, + ) -> Self { TestNode { uid, rxs, @@ -53,22 +52,18 @@ impl<'a> TestNode<'a> } } - pub fn add_handler>(&'a self, - algo: Algorithm, - handler: &'a H) - { + pub fn add_handler>(&'a self, algo: Algorithm, handler: &'a H) { self.message_loop.insert_algo(algo, handler); } - pub fn run(&'a self) -> Result, Error> - { + pub fn run(&'a self) -> Result, Error> { let tx = self.message_loop.queue_tx(); if let Some(value) = &self.value { // Start the broadcast value transmission. tx.send(QMessage::Local(LocalMessage { dst: Algorithm::Broadcast(self.uid), - message: AlgoMessage::BroadcastInput(value.clone()) + message: AlgoMessage::BroadcastInput(value.clone()), }))?; } @@ -86,7 +81,7 @@ impl<'a> TestNode<'a> // FIXME: error handling tx.send(QMessage::Remote(RemoteMessage { node: RemoteNode::Node(*uid), - message + message, })).unwrap(); } debug!("Node {} receiver {} terminated", self_uid, uid); @@ -106,23 +101,31 @@ pub enum Error { Broadcast(broadcast::Error), CommonSubset(common_subset::Error), Send(crossbeam_channel::SendError), - NotImplemented + NotImplemented, } impl From for Error { - fn from(e: messaging::Error) -> Error { Error::Messaging(e) } + fn from(e: messaging::Error) -> Error { + Error::Messaging(e) + } } impl From for Error { - fn from(e: broadcast::Error) -> Error { Error::Broadcast(e) } + fn from(e: broadcast::Error) -> Error { + Error::Broadcast(e) + } } impl From for Error { - fn from(e: common_subset::Error) -> Error { Error::CommonSubset(e) } + fn from(e: common_subset::Error) -> Error { + Error::CommonSubset(e) + } } impl From> for Error { - fn from(e: crossbeam_channel::SendError) -> Error { Error::Send(e) } + fn from(e: crossbeam_channel::SendError) -> Error { + Error::Send(e) + } } fn proposed_value(n: usize) -> ProposedValue { @@ -135,10 +138,10 @@ fn node_addr(node_index: usize) -> NodeUid { } /// Creates test nodes but does not run them. -fn create_test_nodes(num_nodes: usize, - net: &NetSim>>) -> - HashMap)> -{ +fn create_test_nodes( + num_nodes: usize, + net: &NetSim>>, +) -> HashMap)> { let mut nodes = HashMap::new(); for n in 0..num_nodes { let value = proposed_value(n); @@ -156,8 +159,7 @@ fn create_test_nodes(num_nodes: usize, } let uid = node_addr(n); - let all_uids: HashSet = - (0..num_nodes).into_iter().map(node_addr).collect(); + let all_uids: HashSet = (0..num_nodes).into_iter().map(node_addr).collect(); let all_uids_copy = all_uids.clone(); // Create a broadcast algorithm instance for each node. @@ -166,15 +168,20 @@ fn create_test_nodes(num_nodes: usize, match Broadcast::new(uid, all_uids_copy.clone(), num_nodes) { Ok(instance) => { broadcast_instances.insert(uid, instance); - }, + } Err(e) => { panic!("{:?}", e); } } } - nodes.insert(uid, (TestNode::new(uid, txs, rxs, Some(value)), - broadcast_instances)); + nodes.insert( + uid, + ( + TestNode::new(uid, txs, rxs, Some(value)), + broadcast_instances, + ), + ); } nodes } @@ -192,15 +199,17 @@ fn test_4_broadcast_nodes() { crossbeam::scope(|scope| { // Run the test nodes, each in its own thread. for (uid, (node, broadcast_instances)) in &nodes { - join_handles.insert(*uid, scope.spawn(move || { - // Register broadcast instance handlers with the message loop. - for (instance_uid, instance) in broadcast_instances { - node.add_handler(Algorithm::Broadcast(*instance_uid), - instance); - } - debug!("Running {:?}", node.uid); - node.run() - })); + join_handles.insert( + *uid, + scope.spawn(move || { + // Register broadcast instance handlers with the message loop. + for (instance_uid, instance) in broadcast_instances { + node.add_handler(Algorithm::Broadcast(*instance_uid), instance); + } + debug!("Running {:?}", node.uid); + node.run() + }), + ); } for (uid, join_handle) in join_handles { diff --git a/tests/netsim.rs b/tests/netsim.rs index c046adb..55b0b2e 100644 --- a/tests/netsim.rs +++ b/tests/netsim.rs @@ -5,7 +5,7 @@ extern crate crossbeam_channel; extern crate log; -use crossbeam_channel::{Sender, Receiver, unbounded}; +use crossbeam_channel::{unbounded, Receiver, Sender}; pub struct NetSim { /// The number of simulated nodes. @@ -21,19 +21,13 @@ impl NetSim { assert!(num_nodes > 1); // All channels of a totally connected network of size `num_nodes`. let channels: Vec<(Sender, Receiver)> = - (0 .. num_nodes * num_nodes) - .map(|_| unbounded()) - .collect(); - let txs = channels.iter() - .map(|&(ref tx, _)| tx.to_owned()) - .collect(); - let rxs = channels.iter() - .map(|&(_, ref rx)| rx.to_owned()) - .collect(); + (0..num_nodes * num_nodes).map(|_| unbounded()).collect(); + let txs = channels.iter().map(|&(ref tx, _)| tx.to_owned()).collect(); + let rxs = channels.iter().map(|&(_, ref rx)| rx.to_owned()).collect(); NetSim { num_nodes, txs, - rxs + rxs, } }