applied rustfmt

This commit is contained in:
Andreas Fackler 2018-04-30 18:55:51 +03:00
parent f399ed2c07
commit 6117e11a9e
13 changed files with 802 additions and 821 deletions

View File

@ -12,18 +12,19 @@ fn protoc_exists() -> bool {
} }
} }
} }
None => println!("PATH environment variable is not defined.") None => println!("PATH environment variable is not defined."),
} }
false false
} }
fn main() { fn main() {
if !protoc_exists() { if !protoc_exists() {
panic!(" panic!(
"
protoc cannot be found. Install the protobuf compiler in the \ protoc cannot be found. Install the protobuf compiler in the \
system path."); system path."
} );
else { } else {
println!("cargo:rerun-if-changed=proto/message.proto"); println!("cargo:rerun-if-changed=proto/message.proto");
protoc_rust::run(protoc_rust::Args { protoc_rust::run(protoc_rust::Args {
out_dir: "src/proto", out_dir: "src/proto",

View File

@ -1,13 +1,13 @@
//! Example of a consensus node that uses the `hbbft::node::Node` struct for //! Example of a consensus node that uses the `hbbft::node::Node` struct for
//! running the distributed consensus state machine. //! running the distributed consensus state machine.
//#[macro_use] //#[macro_use]
extern crate log;
extern crate simple_logger;
extern crate docopt; extern crate docopt;
extern crate hbbft; extern crate hbbft;
extern crate log;
extern crate simple_logger;
use hbbft::node::Node;
use docopt::Docopt; use docopt::Docopt;
use hbbft::node::Node;
use std::collections::HashSet; use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::vec::Vec; use std::vec::Vec;
@ -38,15 +38,14 @@ fn parse_args() -> Args {
Args { Args {
value: if args.get_count("--value") > 0 { value: if args.get_count("--value") > 0 {
Some(args.get_str("--value").as_bytes().to_vec()) Some(args.get_str("--value").as_bytes().to_vec())
} } else {
else {
None None
}, },
bind_address: args.get_str("--bind-address").parse().unwrap(), bind_address: args.get_str("--bind-address").parse().unwrap(),
remote_addresses: args.get_vec("--remote-address") remote_addresses: args.get_vec("--remote-address")
.iter() .iter()
.map(|s| s.parse().unwrap()) .map(|s| s.parse().unwrap())
.collect() .collect(),
} }
} }

View File

@ -1,21 +1,20 @@
//! Reliable broadcast algorithm instance. //! Reliable broadcast algorithm instance.
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::sync::{Arc, Mutex, RwLock};
use crossbeam; use crossbeam;
use proto::*; use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use std::marker::{Send, Sync}; use merkle::proof::{Lemma, Positioned, Proof};
use merkle::{Hashable, MerkleTree}; use merkle::{Hashable, MerkleTree};
use merkle::proof::{Proof, Lemma, Positioned}; use proto::*;
use reed_solomon_erasure as rse; use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon; use reed_solomon_erasure::ReedSolomon;
use crossbeam_channel::{Sender, Receiver, SendError, RecvError}; use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::marker::{Send, Sync};
use std::sync::{Arc, Mutex, RwLock};
use messaging::{Target, TargetedMessage, SourcedMessage,
NodeUid, Algorithm, ProposedValue, QMessage, MessageLoopState,
Handler, LocalMessage, RemoteMessage, AlgoMessage,
RemoteNode};
use messaging; use messaging;
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid,
ProposedValue, QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target,
TargetedMessage};
struct BroadcastState { struct BroadcastState {
root_hash: Option<Vec<u8>>, root_hash: Option<Vec<u8>>,
@ -40,13 +39,11 @@ pub struct Broadcast {
/// All the mutable state is confined to the `state` field. This allows to /// All the mutable state is confined to the `state` field. This allows to
/// mutate state even when the broadcast instance is referred to by an /// mutate state even when the broadcast instance is referred to by an
/// immutable reference. /// immutable reference.
state: RwLock<BroadcastState> state: RwLock<BroadcastState>,
} }
impl Broadcast { impl Broadcast {
pub fn new(uid: NodeUid, all_uids: HashSet<NodeUid>, num_nodes: usize) -> pub fn new(uid: NodeUid, all_uids: HashSet<NodeUid>, num_nodes: usize) -> Result<Self, Error> {
Result<Self, Error>
{
let num_faulty_nodes = (num_nodes - 1) / 3; let num_faulty_nodes = (num_nodes - 1) / 3;
let parity_shard_num = 2 * num_faulty_nodes; let parity_shard_num = 2 * num_faulty_nodes;
let data_shard_num = num_nodes - parity_shard_num; let data_shard_num = num_nodes - parity_shard_num;
@ -67,68 +64,61 @@ impl Broadcast {
readys: HashMap::new(), readys: HashMap::new(),
ready_sent: false, ready_sent: false,
ready_to_decode: false, ready_to_decode: false,
}) }),
}) })
} }
/// The message-driven interface function for calls from the main message /// The message-driven interface function for calls from the main message
/// loop. /// loop.
pub fn on_message<E>(&self, m: QMessage, tx: &Sender<QMessage>) -> pub fn on_message<E>(&self, m: QMessage, tx: &Sender<QMessage>) -> Result<MessageLoopState, E>
Result<MessageLoopState, E> where
where E: From<Error> + From<messaging::Error> E: From<Error> + From<messaging::Error>,
{ match m { {
QMessage::Local(LocalMessage { match m {
message, QMessage::Local(LocalMessage { message, .. }) => match message {
.. AlgoMessage::BroadcastInput(value) => self.on_local_message(&mut value.to_owned()),
}) => {
match message { _ => Err(Error::UnexpectedMessage).map_err(E::from),
AlgoMessage::BroadcastInput(value) => { },
self.on_local_message(&mut value.to_owned())
QMessage::Remote(RemoteMessage {
node: RemoteNode::Node(uid),
message,
}) => {
if let Message::Broadcast(b) = message {
self.on_remote_message(uid, &b, tx)
} else {
Err(Error::UnexpectedMessage).map_err(E::from)
} }
_ => Err(Error::UnexpectedMessage).map_err(E::from)
} }
},
QMessage::Remote(RemoteMessage { _ => Err(Error::UnexpectedMessage).map_err(E::from),
node: RemoteNode::Node(uid), }
message }
}) => {
if let Message::Broadcast(b) = message {
self.on_remote_message(uid, &b, tx)
}
else {
Err(Error::UnexpectedMessage).map_err(E::from)
}
},
_ => Err(Error::UnexpectedMessage).map_err(E::from)
}}
/// Processes the proposed value input by broadcasting it. /// Processes the proposed value input by broadcasting it.
fn on_local_message<E>(&self, value: &mut ProposedValue) -> fn on_local_message<E>(&self, value: &mut ProposedValue) -> Result<MessageLoopState, E>
Result<MessageLoopState, E> where
where E: From<Error> + From<messaging::Error> E: From<Error> + From<messaging::Error>,
{ {
let mut state = self.state.write().unwrap(); let mut state = self.state.write().unwrap();
// Split the value into chunks/shards, encode them with erasure codes. // Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs // Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node. // from this tree and send them, each to its own node.
self.send_shards(value) self.send_shards(value).map(|(proof, remote_messages)| {
.map(|(proof, remote_messages)| { // Record the first proof as if it were sent by the node to
// Record the first proof as if it were sent by the node to // itself.
// itself. let h = proof.root_hash.clone();
let h = proof.root_hash.clone(); if proof.validate(h.as_slice()) {
if proof.validate(h.as_slice()) { // Save the leaf value for reconstructing the tree later.
// Save the leaf value for reconstructing the tree later. state.leaf_values[index_of_proof(&proof)] =
state.leaf_values[index_of_proof(&proof)] = Some(proof.value.clone().into_boxed_slice());
Some(proof.value.clone().into_boxed_slice()); state.leaf_values_num += 1;
state.leaf_values_num += 1; state.root_hash = Some(h);
state.root_hash = Some(h); }
}
MessageLoopState::Processing(remote_messages) MessageLoopState::Processing(remote_messages)
}) })
} }
/// Breaks the input value into shards of equal length and encodes them -- /// Breaks the input value into shards of equal length and encodes them --
@ -136,15 +126,20 @@ impl Broadcast {
/// scheme. The returned value contains the shard assigned to this /// scheme. The returned value contains the shard assigned to this
/// node. That shard doesn't need to be sent anywhere. It gets recorded in /// node. That shard doesn't need to be sent anywhere. It gets recorded in
/// the broadcast instance. /// the broadcast instance.
fn send_shards<E>(&self, value: &mut ProposedValue) -> fn send_shards<E>(
Result<(Proof<ProposedValue>, VecDeque<RemoteMessage>), E> &self,
where E: From<Error> + From<messaging::Error> value: &mut ProposedValue,
) -> Result<(Proof<ProposedValue>, VecDeque<RemoteMessage>), E>
where
E: From<Error> + From<messaging::Error>,
{ {
let data_shard_num = self.coding.data_shard_count(); let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count(); let parity_shard_num = self.coding.parity_shard_count();
debug!("Data shards: {}, parity shards: {}", debug!(
self.data_shard_num, parity_shard_num); "Data shards: {}, parity shards: {}",
self.data_shard_num, parity_shard_num
);
// Insert the length of `v` so it can be decoded without the padding. // Insert the length of `v` so it can be decoded without the padding.
let payload_len = value.len() as u8; let payload_len = value.len() as u8;
value.insert(0, payload_len); // TODO: Handle messages larger than 255 value.insert(0, payload_len); // TODO: Handle messages larger than 255
@ -153,8 +148,7 @@ impl Broadcast {
// Size of a Merkle tree leaf value, in bytes. // Size of a Merkle tree leaf value, in bytes.
let shard_len = if value_len % data_shard_num > 0 { let shard_len = if value_len % data_shard_num > 0 {
value_len / data_shard_num + 1 value_len / data_shard_num + 1
} } else {
else {
value_len / data_shard_num value_len / data_shard_num
}; };
// Pad the last data shard with zeros. Fill the parity shards with // Pad the last data shard with zeros. Fill the parity shards with
@ -174,12 +168,13 @@ impl Broadcast {
debug!("Shards before encoding: {:?}", shards); debug!("Shards before encoding: {:?}", shards);
// Construct the parity chunks/shards // Construct the parity chunks/shards
self.coding.encode(shards.as_mut_slice()).map_err(Error::from)?; self.coding
.encode(shards.as_mut_slice())
.map_err(Error::from)?;
debug!("Shards: {:?}", shards); debug!("Shards: {:?}", shards);
let shards_t: Vec<ProposedValue> = let shards_t: Vec<ProposedValue> = shards.into_iter().map(|s| s.to_vec()).collect();
shards.into_iter().map(|s| s.to_vec()).collect();
// Convert the Merkle tree into a partial binary tree for later // Convert the Merkle tree into a partial binary tree for later
// deconstruction into compound branches. // deconstruction into compound branches.
@ -196,15 +191,12 @@ impl Broadcast {
if uid == self.uid { if uid == self.uid {
// The proof is addressed to this node. // The proof is addressed to this node.
result = Ok(proof); result = Ok(proof);
} } else {
else {
// Rest of the proofs are sent to remote nodes. // Rest of the proofs are sent to remote nodes.
outgoing.push_back( outgoing.push_back(RemoteMessage {
RemoteMessage { node: RemoteNode::Node(uid),
node: RemoteNode::Node(uid), message: Message::Broadcast(BroadcastMessage::Value(proof)),
message: Message::Broadcast( });
BroadcastMessage::Value(proof))
});
} }
} }
} }
@ -213,11 +205,14 @@ impl Broadcast {
} }
/// Handler of messages received from remote nodes. /// Handler of messages received from remote nodes.
fn on_remote_message<E>(&self, uid: NodeUid, fn on_remote_message<E>(
message: &BroadcastMessage<ProposedValue>, &self,
tx: &Sender<QMessage>) -> uid: NodeUid,
Result<MessageLoopState, E> message: &BroadcastMessage<ProposedValue>,
where E: From<Error> + From<messaging::Error> tx: &Sender<QMessage>,
) -> Result<MessageLoopState, E>
where
E: From<Error> + From<messaging::Error>,
{ {
let mut state = self.state.write().unwrap(); let mut state = self.state.write().unwrap();
let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new())); let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new()));
@ -228,13 +223,15 @@ impl Broadcast {
if uid != self.uid { if uid != self.uid {
// Ignore value messages from unrelated remote nodes. // Ignore value messages from unrelated remote nodes.
no_outgoing no_outgoing
} } else {
else {
// Initialise the root hash if not already initialised. // Initialise the root hash if not already initialised.
if state.root_hash.is_none() { if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone()); state.root_hash = Some(p.root_hash.clone());
debug!("Node {} Value root hash {:?}", debug!(
self.uid, HexBytes(&p.root_hash)); "Node {} Value root hash {:?}",
self.uid,
HexBytes(&p.root_hash)
);
} }
if let Some(ref h) = state.root_hash.clone() { if let Some(ref h) = state.root_hash.clone() {
@ -248,22 +245,20 @@ impl Broadcast {
} }
// Enqueue a broadcast of an echo of this proof. // Enqueue a broadcast of an echo of this proof.
Ok(MessageLoopState::Processing(VecDeque::from( Ok(MessageLoopState::Processing(VecDeque::from(vec![
vec![RemoteMessage { RemoteMessage {
node: RemoteNode::All, node: RemoteNode::All,
message: Message::Broadcast( message: Message::Broadcast(BroadcastMessage::Echo(p.clone())),
BroadcastMessage::Echo(p.clone())) },
}] ])))
)))
} }
}, }
// An echo received. Verify the proof it contains. // An echo received. Verify the proof it contains.
BroadcastMessage::Echo(p) => { BroadcastMessage::Echo(p) => {
if state.root_hash.is_none() && uid == self.uid { if state.root_hash.is_none() && uid == self.uid {
state.root_hash = Some(p.root_hash.clone()); state.root_hash = Some(p.root_hash.clone());
debug!("Node {} Echo root hash {:?}", debug!("Node {} Echo root hash {:?}", self.uid, state.root_hash);
self.uid, state.root_hash);
} }
// Call validate with the root hash as argument. // Call validate with the root hash as argument.
@ -279,61 +274,61 @@ impl Broadcast {
// Upon receiving 2f + 1 matching READY(h) // Upon receiving 2f + 1 matching READY(h)
// messages, wait for N 2 f ECHO messages, // messages, wait for N 2 f ECHO messages,
// then decode v. Return the decoded v to ACS. // then decode v. Return the decoded v to ACS.
if state.ready_to_decode && if state.ready_to_decode
state.leaf_values_num >= && state.leaf_values_num >= self.num_nodes - 2 * self.num_faulty_nodes
self.num_nodes - 2 * self.num_faulty_nodes
{ {
let value = let value = decode_from_shards(
decode_from_shards( &mut state.leaf_values,
&mut state.leaf_values, &self.coding,
&self.coding, self.data_shard_num,
self.data_shard_num, h)?; h,
)?;
tx.send(QMessage::Local(LocalMessage { tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::CommonSubset, dst: Algorithm::CommonSubset,
message: AlgoMessage::BroadcastOutput( message: AlgoMessage::BroadcastOutput(uid, value),
uid,
value)
})).map_err(Error::from)?; })).map_err(Error::from)?;
no_outgoing no_outgoing
} } else if state.leaf_values_num >= self.num_nodes - self.num_faulty_nodes {
else if state.leaf_values_num >= let result: Result<ProposedValue, Error> = decode_from_shards(
self.num_nodes - self.num_faulty_nodes &mut state.leaf_values,
{ &self.coding,
let result: Result<ProposedValue, Error> = self.data_shard_num,
decode_from_shards( h,
&mut state.leaf_values, );
&self.coding, match result {
self.data_shard_num, h); Ok(_) => {
match result { Ok(_) => { // if Ready has not yet been sent, multicast
// if Ready has not yet been sent, multicast // Ready
// Ready if !state.ready_sent {
if !state.ready_sent { state.ready_sent = true;
state.ready_sent = true;
Ok(MessageLoopState::Processing( Ok(MessageLoopState::Processing(VecDeque::from(vec![
VecDeque::from(vec![RemoteMessage { RemoteMessage {
node: RemoteNode::All, node: RemoteNode::All,
message: Message::Broadcast( message: Message::Broadcast(
BroadcastMessage::Ready( BroadcastMessage::Ready(h.to_owned()),
h.to_owned())) ),
}]) },
)) ])))
} else { no_outgoing } } else {
}, Err(e) => Err(E::from(e)) } no_outgoing
} else { no_outgoing } }
} }
else { Err(e) => Err(E::from(e)),
debug!("Broadcast/{} cannot validate Echo {:?}", }
self.uid, p); } else {
no_outgoing
}
} else {
debug!("Broadcast/{} cannot validate Echo {:?}", self.uid, p);
no_outgoing no_outgoing
} }
} } else {
else {
error!("Broadcast/{} root hash not initialised", self.uid); error!("Broadcast/{} root hash not initialised", self.uid);
no_outgoing no_outgoing
} }
}, }
BroadcastMessage::Ready(ref hash) => { BroadcastMessage::Ready(ref hash) => {
// Update the number Ready has been received with this hash. // Update the number Ready has been received with this hash.
@ -346,14 +341,11 @@ impl Broadcast {
// Upon receiving f + 1 matching Ready(h) messages, if Ready // Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h). // has not yet been sent, multicast Ready(h).
if (ready_num == self.num_faulty_nodes + 1) && if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent {
!state.ready_sent
{
// Enqueue a broadcast of a ready message. // Enqueue a broadcast of a ready message.
outgoing.push_back(RemoteMessage { outgoing.push_back(RemoteMessage {
node: RemoteNode::All, node: RemoteNode::All,
message: Message::Broadcast( message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())),
BroadcastMessage::Ready(h.to_vec()))
}); });
} }
@ -361,39 +353,37 @@ impl Broadcast {
// for N 2f Echo messages, then decode v. // for N 2f Echo messages, then decode v.
if ready_num > 2 * self.num_faulty_nodes { if ready_num > 2 * self.num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v. // Wait for N - 2f Echo messages, then decode v.
if state.echo_num >= if state.echo_num >= self.num_nodes - 2 * self.num_faulty_nodes {
self.num_nodes - 2 * self.num_faulty_nodes
{
let value = decode_from_shards( let value = decode_from_shards(
&mut state.leaf_values, &mut state.leaf_values,
&self.coding, &self.coding,
self.data_shard_num, h)?; self.data_shard_num,
h,
)?;
tx.send(QMessage::Local(LocalMessage { tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::CommonSubset, dst: Algorithm::CommonSubset,
message: AlgoMessage::BroadcastOutput( message: AlgoMessage::BroadcastOutput(self.uid, value),
self.uid,
value)
})).map_err(Error::from)?; })).map_err(Error::from)?;
} } else {
else {
state.ready_to_decode = true; state.ready_to_decode = true;
} }
} }
Ok(MessageLoopState::Processing(outgoing)) Ok(MessageLoopState::Processing(outgoing))
} else {
no_outgoing
} }
else { no_outgoing }
} }
} }
} }
} }
impl<'a, E> Handler<E> for Broadcast impl<'a, E> Handler<E> for Broadcast
where E: From<Error> + From<messaging::Error> { where
fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> E: From<Error> + From<messaging::Error>,
Result<MessageLoopState, E> {
{ fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, E> {
self.on_message(m, &tx) self.on_message(m, &tx)
} }
} }
@ -418,27 +408,26 @@ pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> {
/// Number of nodes participating in broadcast. /// Number of nodes participating in broadcast.
num_nodes: usize, num_nodes: usize,
/// Maximum allowed number of faulty nodes. /// Maximum allowed number of faulty nodes.
num_faulty_nodes: usize num_faulty_nodes: usize,
} }
impl<'a, T: Clone + Debug + Hashable + Send + Sync impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>>
+ Into<Vec<u8>> + From<Vec<u8>>>
Instance<'a, T> Instance<'a, T>
{ {
pub fn new(tx: &'a Sender<TargetedMessage<ProposedValue>>, pub fn new(
rx: &'a Receiver<SourcedMessage<ProposedValue>>, tx: &'a Sender<TargetedMessage<ProposedValue>>,
broadcast_value: Option<T>, rx: &'a Receiver<SourcedMessage<ProposedValue>>,
num_nodes: usize, broadcast_value: Option<T>,
node_index: usize) -> num_nodes: usize,
Self node_index: usize,
{ ) -> Self {
Instance { Instance {
tx, tx,
rx, rx,
broadcast_value, broadcast_value,
node_index, node_index,
num_nodes, num_nodes,
num_faulty_nodes: (num_nodes - 1) / 3 num_faulty_nodes: (num_nodes - 1) / 3,
} }
} }
@ -453,16 +442,19 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
scope.spawn(move || { scope.spawn(move || {
*result_r_scoped.lock().unwrap() = *result_r_scoped.lock().unwrap() = Some(inner_run(
Some(inner_run(self.tx, self.rx, bvalue, self.tx,
self.node_index, self.num_nodes, self.rx,
self.num_faulty_nodes)); bvalue,
self.node_index,
self.num_nodes,
self.num_faulty_nodes,
));
}); });
}); });
if let Some(ref r) = *result_r.lock().unwrap() { if let Some(ref r) = *result_r.lock().unwrap() {
result = r.to_owned(); result = r.to_owned();
} } else {
else {
result = Err(Error::Threading); result = Err(Error::Threading);
} }
result result
@ -480,28 +472,31 @@ pub enum Error {
SendDeprecated(SendError<TargetedMessage<ProposedValue>>), SendDeprecated(SendError<TargetedMessage<ProposedValue>>),
Recv(RecvError), Recv(RecvError),
UnexpectedMessage, UnexpectedMessage,
NotImplemented NotImplemented,
} }
impl From<rse::Error> for Error impl From<rse::Error> for Error {
{ fn from(err: rse::Error) -> Error {
fn from(err: rse::Error) -> Error { Error::ReedSolomon(err) } Error::ReedSolomon(err)
}
} }
impl From<SendError<QMessage>> for Error impl From<SendError<QMessage>> for Error {
{ fn from(err: SendError<QMessage>) -> Error {
fn from(err: SendError<QMessage>) -> Error { Error::Send(err) } Error::Send(err)
}
} }
impl From<SendError<TargetedMessage<ProposedValue>>> for Error impl From<SendError<TargetedMessage<ProposedValue>>> for Error {
{ fn from(err: SendError<TargetedMessage<ProposedValue>>) -> Error {
fn from(err: SendError<TargetedMessage<ProposedValue>>) -> Error::SendDeprecated(err)
Error { Error::SendDeprecated(err) } }
} }
impl From<RecvError> for Error impl From<RecvError> for Error {
{ fn from(err: RecvError) -> Error {
fn from(err: RecvError) -> Error { Error::Recv(err) } Error::Recv(err)
}
} }
/// Breaks the input value into shards of equal length and encodes them -- and /// Breaks the input value into shards of equal length and encodes them -- and
@ -509,17 +504,21 @@ impl From<RecvError> for Error
/// returned value contains the shard assigned to this node. That shard doesn't /// returned value contains the shard assigned to this node. That shard doesn't
/// need to be sent anywhere. It is returned to the broadcast instance and gets /// need to be sent anywhere. It is returned to the broadcast instance and gets
/// recorded immediately. /// recorded immediately.
fn send_shards<'a, T>(value: T, fn send_shards<'a, T>(
tx: &'a Sender<TargetedMessage<ProposedValue>>, value: T,
coding: &ReedSolomon) -> tx: &'a Sender<TargetedMessage<ProposedValue>>,
Result<Proof<ProposedValue>, Error> coding: &ReedSolomon,
where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>> ) -> Result<Proof<ProposedValue>, Error>
where
T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>,
{ {
let data_shard_num = coding.data_shard_count(); let data_shard_num = coding.data_shard_count();
let parity_shard_num = coding.parity_shard_count(); let parity_shard_num = coding.parity_shard_count();
debug!("Data shards: {}, parity shards: {}", debug!(
data_shard_num, parity_shard_num); "Data shards: {}, parity shards: {}",
data_shard_num, parity_shard_num
);
let mut v: Vec<u8> = T::into(value); let mut v: Vec<u8> = T::into(value);
// Insert the length of `v` so it can be decoded without the padding. // Insert the length of `v` so it can be decoded without the padding.
let payload_len = v.len() as u8; let payload_len = v.len() as u8;
@ -528,8 +527,7 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
// Size of a Merkle tree leaf value, in bytes. // Size of a Merkle tree leaf value, in bytes.
let shard_len = if value_len % data_shard_num > 0 { let shard_len = if value_len % data_shard_num > 0 {
value_len / data_shard_num + 1 value_len / data_shard_num + 1
} } else {
else {
value_len / data_shard_num value_len / data_shard_num
}; };
// Pad the last data shard with zeros. Fill the parity shards with zeros. // Pad the last data shard with zeros. Fill the parity shards with zeros.
@ -552,8 +550,7 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
debug!("Shards: {:?}", shards); debug!("Shards: {:?}", shards);
let shards_t: Vec<ProposedValue> = let shards_t: Vec<ProposedValue> = shards.into_iter().map(|s| s.to_vec()).collect();
shards.into_iter().map(|s| s.to_vec()).collect();
// Convert the Merkle tree into a partial binary tree for later // Convert the Merkle tree into a partial binary tree for later
// deconstruction into compound branches. // deconstruction into compound branches.
@ -569,15 +566,12 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
if i == 0 { if i == 0 {
// The first proof is addressed to this node. // The first proof is addressed to this node.
result = Ok(proof); result = Ok(proof);
} } else {
else {
// Rest of the proofs are sent to remote nodes. // Rest of the proofs are sent to remote nodes.
tx.send( tx.send(TargetedMessage {
TargetedMessage { target: Target::Node(i),
target: Target::Node(i), message: Message::Broadcast(BroadcastMessage::Value(proof)),
message: Message::Broadcast( })?;
BroadcastMessage::Value(proof))
})?;
} }
} }
} }
@ -586,14 +580,16 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
} }
/// The main loop of the broadcast task. /// The main loop of the broadcast task.
fn inner_run<'a, T>(tx: &'a Sender<TargetedMessage<ProposedValue>>, fn inner_run<'a, T>(
rx: &'a Receiver<SourcedMessage<ProposedValue>>, tx: &'a Sender<TargetedMessage<ProposedValue>>,
broadcast_value: Option<T>, rx: &'a Receiver<SourcedMessage<ProposedValue>>,
node_index: usize, broadcast_value: Option<T>,
num_nodes: usize, node_index: usize,
num_faulty_nodes: usize) -> num_nodes: usize,
Result<T, Error> num_faulty_nodes: usize,
where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>> ) -> Result<T, Error>
where
T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>,
{ {
// Erasure coding scheme: N - 2f value shards and 2f parity shards // Erasure coding scheme: N - 2f value shards and 2f parity shards
let parity_shard_num = 2 * num_faulty_nodes; let parity_shard_num = 2 * num_faulty_nodes;
@ -611,19 +607,17 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
// Assemble a Merkle tree from data and parity shards. Take all proofs from // Assemble a Merkle tree from data and parity shards. Take all proofs from
// this tree and send them, each to its own node. // this tree and send them, each to its own node.
if let Some(v) = broadcast_value { if let Some(v) = broadcast_value {
send_shards(v, tx, &coding) send_shards(v, tx, &coding).map(|proof| {
.map(|proof| { // Record the first proof as if it were sent by the node to
// Record the first proof as if it were sent by the node to // itself.
// itself. let h = proof.root_hash.clone();
let h = proof.root_hash.clone(); if proof.validate(h.as_slice()) {
if proof.validate(h.as_slice()) { // Save the leaf value for reconstructing the tree later.
// Save the leaf value for reconstructing the tree later. leaf_values[index_of_proof(&proof)] = Some(proof.value.clone().into_boxed_slice());
leaf_values[index_of_proof(&proof)] = leaf_values_num += 1;
Some(proof.value.clone().into_boxed_slice()); root_hash = Some(h);
leaf_values_num += 1; }
root_hash = Some(h); })?
}
})?
} }
// return value // return value
@ -641,8 +635,9 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
let message = rx.recv()?; let message = rx.recv()?;
if let SourcedMessage { if let SourcedMessage {
source: i, source: i,
message: Message::Broadcast(message) message: Message::Broadcast(message),
} = message { } = message
{
match message { match message {
// A value received. Record the value and multicast an echo. // A value received. Record the value and multicast an echo.
BroadcastMessage::Value(p) => { BroadcastMessage::Value(p) => {
@ -653,8 +648,11 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
if root_hash.is_none() { if root_hash.is_none() {
root_hash = Some(p.root_hash.clone()); root_hash = Some(p.root_hash.clone());
debug!("Node {} Value root hash {:?}", debug!(
node_index, HexBytes(&p.root_hash)); "Node {} Value root hash {:?}",
node_index,
HexBytes(&p.root_hash)
);
} }
if let Some(ref h) = root_hash { if let Some(ref h) = root_hash {
@ -669,16 +667,15 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
// Broadcast an echo of this proof. // Broadcast an echo of this proof.
tx.send(TargetedMessage { tx.send(TargetedMessage {
target: Target::All, target: Target::All,
message: Message::Broadcast(BroadcastMessage::Echo(p)) message: Message::Broadcast(BroadcastMessage::Echo(p)),
})? })?
}, }
// An echo received. Verify the proof it contains. // An echo received. Verify the proof it contains.
BroadcastMessage::Echo(p) => { BroadcastMessage::Echo(p) => {
if root_hash.is_none() && i == node_index { if root_hash.is_none() && i == node_index {
root_hash = Some(p.root_hash.clone()); root_hash = Some(p.root_hash.clone());
debug!("Node {} Echo root hash {:?}", debug!("Node {} Echo root hash {:?}", node_index, root_hash);
node_index, root_hash);
} }
// call validate with the root hash as argument // call validate with the root hash as argument
@ -694,37 +691,37 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
// upon receiving 2f + 1 matching READY(h) // upon receiving 2f + 1 matching READY(h)
// messages, wait for N 2 f ECHO messages, then // messages, wait for N 2 f ECHO messages, then
// decode v // decode v
if ready_to_decode && if ready_to_decode
leaf_values_num >= && leaf_values_num >= num_nodes - 2 * num_faulty_nodes
num_nodes - 2 * num_faulty_nodes
{ {
result = Some( result = Some(decode_from_shards(
decode_from_shards(&mut leaf_values, &mut leaf_values,
&coding, &coding,
data_shard_num, h)); data_shard_num,
} h,
else if leaf_values_num >= ));
num_nodes - num_faulty_nodes } else if leaf_values_num >= num_nodes - num_faulty_nodes {
{ result = Some(decode_from_shards(
result = Some( &mut leaf_values,
decode_from_shards(&mut leaf_values, &coding,
&coding, data_shard_num,
data_shard_num, h)); h,
));
// if Ready has not yet been sent, multicast // if Ready has not yet been sent, multicast
// Ready // Ready
if !ready_sent { if !ready_sent {
ready_sent = true; ready_sent = true;
tx.send(TargetedMessage { tx.send(TargetedMessage {
target: Target::All, target: Target::All,
message: Message::Broadcast( message: Message::Broadcast(BroadcastMessage::Ready(
BroadcastMessage::Ready( h.to_owned(),
h.to_owned())) )),
})?; })?;
} }
} }
} }
} }
}, }
BroadcastMessage::Ready(ref hash) => { BroadcastMessage::Ready(ref hash) => {
// Update the number Ready has been received with this hash. // Update the number Ready has been received with this hash.
@ -736,14 +733,10 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
// Upon receiving f + 1 matching Ready(h) messages, if // Upon receiving f + 1 matching Ready(h) messages, if
// Ready has not yet been sent, multicast Ready(h). // Ready has not yet been sent, multicast Ready(h).
if (ready_num == num_faulty_nodes + 1) && if (ready_num == num_faulty_nodes + 1) && !ready_sent {
!ready_sent
{
tx.send(TargetedMessage { tx.send(TargetedMessage {
target: Target::All, target: Target::All,
message: Message::Broadcast( message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())),
BroadcastMessage::Ready(
h.to_vec()))
})?; })?;
} }
@ -752,20 +745,20 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
if ready_num > 2 * num_faulty_nodes { if ready_num > 2 * num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v. // Wait for N - 2f Echo messages, then decode v.
if echo_num >= num_nodes - 2 * num_faulty_nodes { if echo_num >= num_nodes - 2 * num_faulty_nodes {
result = Some( result = Some(decode_from_shards(
decode_from_shards(&mut leaf_values, &mut leaf_values,
&coding, &coding,
data_shard_num, h)); data_shard_num,
} h,
else { ));
} else {
ready_to_decode = true; ready_to_decode = true;
} }
} }
} }
} }
} }
} } else {
else {
error!("Incorrect message from the socket: {:?}", message); error!("Incorrect message from the socket: {:?}", message);
} }
} }
@ -773,12 +766,14 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
result.unwrap() result.unwrap()
} }
fn decode_from_shards<T>(leaf_values: &mut Vec<Option<Box<[u8]>>>, fn decode_from_shards<T>(
coding: &ReedSolomon, leaf_values: &mut Vec<Option<Box<[u8]>>>,
data_shard_num: usize, coding: &ReedSolomon,
root_hash: &[u8]) -> data_shard_num: usize,
Result<T, Error> root_hash: &[u8],
where T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>> ) -> Result<T, Error>
where
T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>,
{ {
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding // Try to interpolate the Merkle tree using the Reed-Solomon erasure coding
// scheme. // scheme.
@ -802,8 +797,7 @@ where T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
// sensible not to continue trying to reconstruct the tree after this // sensible not to continue trying to reconstruct the tree after this
// point. This instance must have received incorrect shards. // point. This instance must have received incorrect shards.
Err(Error::RootHashMismatch) Err(Error::RootHashMismatch)
} } else {
else {
// Reconstruct the value from the data shards. // Reconstruct the value from the data shards.
Ok(glue_shards(mtree, data_shard_num)) Ok(glue_shards(mtree, data_shard_num))
} }
@ -813,7 +807,8 @@ where T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
/// type `T`. This is useful for reconstructing the data value held in the tree /// type `T`. This is useful for reconstructing the data value held in the tree
/// and forgetting the leaves that contain parity information. /// and forgetting the leaves that contain parity information.
fn glue_shards<T>(m: MerkleTree<ProposedValue>, n: usize) -> T fn glue_shards<T>(m: MerkleTree<ProposedValue>, n: usize) -> T
where T: From<Vec<u8>> + Into<Vec<u8>> where
T: From<Vec<u8>> + Into<Vec<u8>>,
{ {
let t: Vec<u8> = m.into_iter().take(n).flat_map(|s| s).collect(); let t: Vec<u8> = m.into_iter().take(n).flat_map(|s| s).collect();
let payload_len = t[0] as usize; let payload_len = t[0] as usize;

View File

@ -1,12 +1,11 @@
//! Asynchronous Common Subset algorithm. //! Asynchronous Common Subset algorithm.
use crossbeam_channel::{SendError, Sender};
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::RwLock; use std::sync::RwLock;
use crossbeam_channel::{Sender, SendError};
use messaging; use messaging;
use messaging::{NodeUid, QMessage, MessageLoopState, Handler, LocalMessage, use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid, QMessage};
Algorithm, AlgoMessage};
struct CommonSubsetState { struct CommonSubsetState {
agreement_inputs: HashMap<NodeUid, bool>, agreement_inputs: HashMap<NodeUid, bool>,
@ -18,14 +17,11 @@ pub struct CommonSubset {
uid: NodeUid, uid: NodeUid,
num_nodes: usize, num_nodes: usize,
num_faulty_nodes: usize, num_faulty_nodes: usize,
state: RwLock<CommonSubsetState> state: RwLock<CommonSubsetState>,
} }
impl CommonSubset { impl CommonSubset {
pub fn new(uid: NodeUid, num_nodes: usize, pub fn new(uid: NodeUid, num_nodes: usize, node_uids: HashSet<NodeUid>) -> Self {
node_uids: HashSet<NodeUid>) ->
Self
{
let num_faulty_nodes = (num_nodes - 1) / 3; let num_faulty_nodes = (num_nodes - 1) / 3;
CommonSubset { CommonSubset {
@ -35,98 +31,97 @@ impl CommonSubset {
state: RwLock::new(CommonSubsetState { state: RwLock::new(CommonSubsetState {
agreement_inputs: HashMap::new(), agreement_inputs: HashMap::new(),
agreement_true_outputs: HashSet::new(), agreement_true_outputs: HashSet::new(),
agreements_without_input: node_uids agreements_without_input: node_uids,
}) }),
} }
} }
pub fn on_message<E>(&self, m: QMessage, tx: &Sender<QMessage>) -> pub fn on_message<E>(&self, m: QMessage, tx: &Sender<QMessage>) -> Result<MessageLoopState, E>
Result<MessageLoopState, E> where
where E: From<Error> + From<messaging::Error> E: From<Error> + From<messaging::Error>,
{ match m { {
QMessage::Local(LocalMessage { match m {
message, QMessage::Local(LocalMessage { message, .. }) => {
.. let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new()));
}) => {
let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new()));
match message { match message {
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2. // Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
AlgoMessage::CommonSubsetInput(value) => { AlgoMessage::CommonSubsetInput(value) => {
tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::Broadcast(self.uid),
message: AlgoMessage::BroadcastInput(value)
})).map_err(Error::from)?;
no_outgoing
}
// Upon delivery of v_j from RBC_j, if input has not yet been
// provided to BA_j, then provide input 1 to BA_j. See Figure
// 11.
//
// FIXME: Use the output value.
AlgoMessage::BroadcastOutput(uid, _value) => {
let mut state = self.state.write().unwrap();
if state.agreement_inputs.get(&uid).is_none() {
tx.send(QMessage::Local(LocalMessage { tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::Agreement(uid), dst: Algorithm::Broadcast(self.uid),
message: AlgoMessage::AgreementInput(true) message: AlgoMessage::BroadcastInput(value),
})).map_err(Error::from)?; })).map_err(Error::from)?;
let _ = state.agreement_inputs.insert(uid, true); no_outgoing
state.agreements_without_input.remove(&uid);
} }
no_outgoing // Upon delivery of v_j from RBC_j, if input has not yet been
} // provided to BA_j, then provide input 1 to BA_j. See Figure
// 11.
// Upon delivery of value 1 from at least N f instances of BA, //
// provide input 0 to each instance of BA that has not yet been // FIXME: Use the output value.
// provided input. AlgoMessage::BroadcastOutput(uid, _value) => {
AlgoMessage::AgreementOutput(uid, true) => { let mut state = self.state.write().unwrap();
let mut state = self.state.write().unwrap(); if state.agreement_inputs.get(&uid).is_none() {
state.agreement_true_outputs.insert(uid);
if state.agreement_true_outputs.len() >=
self.num_nodes - self.num_faulty_nodes
{
// FIXME: Avoid cloning the set.
for uid0 in state.agreements_without_input.clone() {
tx.send(QMessage::Local(LocalMessage { tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::Agreement(uid0), dst: Algorithm::Agreement(uid),
message: AlgoMessage::AgreementInput(false) message: AlgoMessage::AgreementInput(true),
})).map_err(Error::from)?; })).map_err(Error::from)?;
// TODO: Possibly not required. Keeping in place to let _ = state.agreement_inputs.insert(uid, true);
// avoid resending `false`. state.agreements_without_input.remove(&uid);
let _ = state.agreement_inputs.insert(uid0, false);
} }
no_outgoing
} }
no_outgoing // Upon delivery of value 1 from at least N f instances of BA,
} // provide input 0 to each instance of BA that has not yet been
// provided input.
AlgoMessage::AgreementOutput(uid, true) => {
let mut state = self.state.write().unwrap();
state.agreement_true_outputs.insert(uid);
// FIXME (missing clause): if state.agreement_true_outputs.len()
>= self.num_nodes - self.num_faulty_nodes
{
// FIXME: Avoid cloning the set.
for uid0 in state.agreements_without_input.clone() {
tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::Agreement(uid0),
message: AlgoMessage::AgreementInput(false),
})).map_err(Error::from)?;
// TODO: Possibly not required. Keeping in place to
// avoid resending `false`.
let _ = state.agreement_inputs.insert(uid0, false);
}
}
no_outgoing
}
// FIXME (missing clause):
// //
// Once all instances of BA have completed, let C ⊂ [1..N] be // Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output // the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output j∈C v_j. // v_j for each RBC_j such that j∈C. Finally output j∈C v_j.
// Catchall // Catchall
_ => Err(Error::UnexpectedMessage).map_err(E::from) _ => Err(Error::UnexpectedMessage).map_err(E::from),
}
} }
},
_ => Err(Error::UnexpectedMessage).map_err(E::from) _ => Err(Error::UnexpectedMessage).map_err(E::from),
}} }
}
} }
impl<E> Handler<E> for CommonSubset impl<E> Handler<E> for CommonSubset
where E: From<Error> + From<messaging::Error> { where
fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> E: From<Error> + From<messaging::Error>,
Result<MessageLoopState, E> {
{ fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, E> {
self.on_message(m, &tx) self.on_message(m, &tx)
} }
} }
@ -138,7 +133,8 @@ pub enum Error {
Send(SendError<QMessage>), Send(SendError<QMessage>),
} }
impl From<SendError<QMessage>> for Error impl From<SendError<QMessage>> for Error {
{ fn from(err: SendError<QMessage>) -> Error {
fn from(err: SendError<QMessage>) -> Error { Error::Send(err) } Error::Send(err)
}
} }

View File

@ -1,17 +1,17 @@
//! Comms task structure. A comms task communicates with a remote node through a //! Comms task structure. A comms task communicates with a remote node through a
//! socket. Local communication with coordinating threads is made via //! socket. Local communication with coordinating threads is made via
//! `crossbeam_channel::unbounded()`. //! `crossbeam_channel::unbounded()`.
use std::io;
use std::fmt::Debug;
use std::sync::Arc;
use std::net::TcpStream;
use crossbeam; use crossbeam;
use crossbeam_channel::{Sender, Receiver}; use crossbeam_channel::{Receiver, Sender};
use std::fmt::Debug;
use std::io;
use std::net::TcpStream;
use std::sync::Arc;
use messaging::SourcedMessage;
use proto::Message; use proto::Message;
use proto_io; use proto_io;
use proto_io::ProtoIo; use proto_io::ProtoIo;
use messaging::SourcedMessage;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -19,14 +19,14 @@ pub enum Error {
} }
impl From<io::Error> for Error { impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::IoError(err) } fn from(err: io::Error) -> Error {
Error::IoError(err)
}
} }
/// A communication task connects a remote node to the thread that manages the /// A communication task connects a remote node to the thread that manages the
/// consensus algorithm. /// consensus algorithm.
pub struct CommsTask pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> {
<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
{
/// The transmit side of the multiple producer channel from comms threads. /// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<T>>, tx: &'a Sender<SourcedMessage<T>>,
/// The receive side of the channel to the comms thread. /// The receive side of the channel to the comms thread.
@ -34,26 +34,27 @@ pub struct CommsTask
/// The socket IO task. /// The socket IO task.
io: ProtoIo, io: ProtoIo,
/// The index of this comms task for identification against its remote node. /// The index of this comms task for identification against its remote node.
pub node_index: usize pub node_index: usize,
} }
impl <'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> CommsTask<'a, T> {
CommsTask<'a, T> pub fn new(
{ tx: &'a Sender<SourcedMessage<T>>,
pub fn new(tx: &'a Sender<SourcedMessage<T>>, rx: &'a Receiver<Message<T>>,
rx: &'a Receiver<Message<T>>, stream: TcpStream,
stream: TcpStream, node_index: usize,
node_index: usize) -> ) -> Self {
Self debug!(
{ "Creating comms task #{} for {:?}",
debug!("Creating comms task #{} for {:?}", node_index, node_index,
stream.peer_addr().unwrap()); stream.peer_addr().unwrap()
);
CommsTask { CommsTask {
tx, tx,
rx, rx,
io: ProtoIo::from_stream(stream), io: ProtoIo::from_stream(stream),
node_index node_index,
} }
} }
@ -84,14 +85,14 @@ impl <'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
match self.io.recv() { match self.io.recv() {
Ok(message) => { Ok(message) => {
debug!("Node {} -> {:?}", node_index, message); debug!("Node {} -> {:?}", node_index, message);
tx.send( tx.send(SourcedMessage {
SourcedMessage { source: node_index,
source: node_index, message,
message }).unwrap();
}).unwrap(); }
}, Err(proto_io::Error::ProtobufError(e)) => {
Err(proto_io::Error::ProtobufError(e)) => warn!("Node {} - Protobuf error {}", node_index, e)
warn!("Node {} - Protobuf error {}", node_index, e), }
Err(e) => { Err(e) => {
warn!("Node {} - Critical error {:?}", node_index, e); warn!("Node {} - Critical error {:?}", node_index, e);
break; break;

View File

@ -2,7 +2,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::io::BufReader; use std::io::BufReader;
use std::net::{TcpStream, TcpListener, SocketAddr}; use std::net::{SocketAddr, TcpListener, TcpStream};
#[derive(Debug)] #[derive(Debug)]
pub struct Connection { pub struct Connection {
@ -15,24 +15,21 @@ impl Connection {
Connection { Connection {
// Create a read buffer of 1K bytes. // Create a read buffer of 1K bytes.
reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()), reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()),
stream stream,
} }
} }
} }
/// Connect this node to remote peers. A vector of successful connections is /// Connect this node to remote peers. A vector of successful connections is
/// returned. /// returned.
pub fn make(bind_address: &SocketAddr, pub fn make(bind_address: &SocketAddr, remote_addresses: &HashSet<SocketAddr>) -> Vec<Connection> {
remote_addresses: &HashSet<SocketAddr>) -> Vec<Connection>
{
// Connected remote nodes. // Connected remote nodes.
// let mut connected: Vec<SocketAddr> = Vec::new(); // let mut connected: Vec<SocketAddr> = Vec::new();
// Listen for incoming connections on a given TCP port. // Listen for incoming connections on a given TCP port.
let bind_address = bind_address; let bind_address = bind_address;
let listener = TcpListener::bind(bind_address).unwrap(); let listener = TcpListener::bind(bind_address).unwrap();
// Initialise initial connection states. // Initialise initial connection states.
let mut connections: Vec<Option<Connection>> = let mut connections: Vec<Option<Connection>> = (0..remote_addresses.len())
(0 .. remote_addresses.len())
.into_iter() .into_iter()
.map(|_| None) .map(|_| None)
.collect(); .collect();
@ -42,14 +39,13 @@ pub fn make(bind_address: &SocketAddr,
for (n, &address) in remote_addresses.iter().enumerate() { for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address); let there_str = format!("{}", address);
if here_str < there_str { if here_str < there_str {
connections[n] = connections[n] = match listener.accept() {
match listener.accept() { Ok((stream, _)) => {
Ok((stream, _)) => { info!("Connected to {}", there_str);
info!("Connected to {}", there_str); Some(Connection::new(stream))
Some(Connection::new(stream))
},
Err(_) => None
} }
Err(_) => None,
}
} }
} }
@ -57,14 +53,13 @@ pub fn make(bind_address: &SocketAddr,
for (n, &address) in remote_addresses.iter().enumerate() { for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address); let there_str = format!("{}", address);
if here_str > there_str { if here_str > there_str {
connections[n] = connections[n] = match TcpStream::connect(address) {
match TcpStream::connect(address) { Ok(stream) => {
Ok(stream) => { info!("Connected to {}", there_str);
info!("Connected to {}", there_str); Some(Connection::new(stream))
Some(Connection::new(stream))
},
Err(_) => None
} }
Err(_) => None,
}
} }
} }

View File

@ -40,21 +40,21 @@
#![feature(optin_builtin_traits)] #![feature(optin_builtin_traits)]
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate crossbeam;
extern crate merkle;
extern crate protobuf; extern crate protobuf;
extern crate ring; extern crate ring;
extern crate merkle;
extern crate crossbeam;
#[macro_use] #[macro_use]
extern crate crossbeam_channel; extern crate crossbeam_channel;
extern crate reed_solomon_erasure; extern crate reed_solomon_erasure;
pub mod agreement;
pub mod broadcast;
pub mod common_subset;
mod commst;
mod connection; mod connection;
pub mod messaging; pub mod messaging;
pub mod proto; pub mod proto;
mod proto_io; mod proto_io;
mod commst;
pub mod common_subset;
pub mod broadcast;
pub mod agreement;
pub mod node; pub mod node;

View File

@ -1,12 +1,12 @@
//! The local message delivery system. //! The local message delivery system.
use std::collections::{HashSet, HashMap, VecDeque}; use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use proto::Message;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug; use std::fmt::Debug;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::RwLock; use std::sync::RwLock;
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
use proto::Message;
/// Unique ID of a node. /// Unique ID of a node.
pub type NodeUid = SocketAddr; pub type NodeUid = SocketAddr;
@ -62,7 +62,7 @@ pub struct LocalMessage {
/// Identifier of the message destination algorithm. /// Identifier of the message destination algorithm.
pub dst: Algorithm, pub dst: Algorithm,
/// Payload /// Payload
pub message: AlgoMessage pub message: AlgoMessage,
} }
/// The message destinations corresponding to a remote node `i`. It can be /// The message destinations corresponding to a remote node `i`. It can be
@ -75,21 +75,21 @@ pub struct LocalMessage {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteNode { pub enum RemoteNode {
All, All,
Node(NodeUid) Node(NodeUid),
} }
/// Message to or from a remote node. /// Message to or from a remote node.
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct RemoteMessage { pub struct RemoteMessage {
pub node: RemoteNode, pub node: RemoteNode,
pub message: Message<ProposedValue> pub message: Message<ProposedValue>,
} }
/// The union type of local and remote messages. /// The union type of local and remote messages.
#[derive(Clone)] #[derive(Clone)]
pub enum QMessage { pub enum QMessage {
Local(LocalMessage), Local(LocalMessage),
Remote(RemoteMessage) Remote(RemoteMessage),
} }
/// States of the message loop consided as an automaton with output. There is /// States of the message loop consided as an automaton with output. There is
@ -99,15 +99,14 @@ pub enum QMessage {
#[derive(Clone, PartialEq)] #[derive(Clone, PartialEq)]
pub enum MessageLoopState { pub enum MessageLoopState {
Processing(VecDeque<RemoteMessage>), Processing(VecDeque<RemoteMessage>),
Finished Finished,
} }
impl MessageLoopState { impl MessageLoopState {
pub fn is_processing(&self) -> bool { pub fn is_processing(&self) -> bool {
if let MessageLoopState::Processing(_) = self { if let MessageLoopState::Processing(_) = self {
true true
} } else {
else {
false false
} }
} }
@ -116,10 +115,8 @@ impl MessageLoopState {
/// emitted by the handler to the messages already queued from previous /// emitted by the handler to the messages already queued from previous
/// iterations of a message handling loop. /// iterations of a message handling loop.
pub fn append(&mut self, other: &mut MessageLoopState) { pub fn append(&mut self, other: &mut MessageLoopState) {
if let MessageLoopState::Processing(ref mut new_msgs) = other if let MessageLoopState::Processing(ref mut new_msgs) = other {
{ if let MessageLoopState::Processing(ref mut msgs) = self {
if let MessageLoopState::Processing(ref mut msgs) = self
{
msgs.append(new_msgs); msgs.append(new_msgs);
} }
} }
@ -132,8 +129,7 @@ impl MessageLoopState {
/// - either `Finished` or a state with outgoing messages to remote nodes - or /// - either `Finished` or a state with outgoing messages to remote nodes - or
/// an error. /// an error.
pub trait Handler<HandlerError: From<Error>>: Send + Sync { pub trait Handler<HandlerError: From<Error>>: Send + Sync {
fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, HandlerError>;
Result<MessageLoopState, HandlerError>;
} }
/// The queue functionality for messages sent between algorithm instances. /// The queue functionality for messages sent between algorithm instances.
@ -149,21 +145,20 @@ pub struct MessageLoop<'a, HandlerError: 'a + From<Error>> {
/// Remote send handles. Messages are sent through channels as opposed to /// Remote send handles. Messages are sent through channels as opposed to
/// directly to sockets. This is done to make tests independent of socket /// directly to sockets. This is done to make tests independent of socket
/// IO. /// IO.
remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>> remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>,
} }
impl<'a, HandlerError> MessageLoop<'a, HandlerError> impl<'a, HandlerError> MessageLoop<'a, HandlerError>
where HandlerError: 'a + From<Error> where
HandlerError: 'a + From<Error>,
{ {
pub fn new(remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>) -> pub fn new(remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>) -> Self {
Self
{
let (queue_tx, queue_rx) = unbounded(); let (queue_tx, queue_rx) = unbounded();
MessageLoop { MessageLoop {
algos: RwLock::new(HashMap::new()), algos: RwLock::new(HashMap::new()),
queue_tx, queue_tx,
queue_rx, queue_rx,
remote_txs remote_txs,
} }
} }
@ -172,14 +167,11 @@ where HandlerError: 'a + From<Error>
} }
/// Registers a handler for messages sent to the given algorithm. /// Registers a handler for messages sent to the given algorithm.
pub fn insert_algo(&'a self, algo: Algorithm, pub fn insert_algo(&'a self, algo: Algorithm, handler: &'a Handler<HandlerError>) {
handler: &'a Handler<HandlerError>)
{
let lock = self.algos.write(); let lock = self.algos.write();
if let Ok(mut map) = lock { if let Ok(mut map) = lock {
map.insert(algo, handler); map.insert(algo, handler);
} } else {
else {
error!("Cannot insert {:?}", algo); error!("Cannot insert {:?}", algo);
} }
} }
@ -189,15 +181,13 @@ where HandlerError: 'a + From<Error>
let lock = self.algos.write(); let lock = self.algos.write();
if let Ok(mut map) = lock { if let Ok(mut map) = lock {
map.remove(algo); map.remove(algo);
} } else {
else {
error!("Cannot remove {:?}", algo); error!("Cannot remove {:?}", algo);
} }
} }
/// The message loop. /// The message loop.
pub fn run(&self) -> Result<MessageLoopState, HandlerError> pub fn run(&self) -> Result<MessageLoopState, HandlerError> {
{
let mut result = Ok(MessageLoopState::Processing(VecDeque::new())); let mut result = Ok(MessageLoopState::Processing(VecDeque::new()));
while let Ok(mut state) = result { while let Ok(mut state) = result {
@ -207,107 +197,99 @@ where HandlerError: 'a + From<Error>
self.send_remote(messages) self.send_remote(messages)
.map(|_| MessageLoopState::Processing(VecDeque::new())) .map(|_| MessageLoopState::Processing(VecDeque::new()))
.map_err(HandlerError::from) .map_err(HandlerError::from)
} } else {
else {
Ok(MessageLoopState::Finished) Ok(MessageLoopState::Finished)
})?; })?;
// Receive local and remote messages. // Receive local and remote messages.
if let Ok(m) = self.queue_rx.recv() { result = match m { if let Ok(m) = self.queue_rx.recv() {
QMessage::Local(LocalMessage { result = match m {
dst, QMessage::Local(LocalMessage { dst, message }) => {
message // FIXME: error handling
}) => { if let Some(mut handler) = self.algos.write().unwrap().get_mut(&dst) {
// FIXME: error handling let mut new_result = handler.handle(
if let Some(mut handler) = QMessage::Local(LocalMessage { dst, message }),
self.algos.write().unwrap().get_mut(&dst) self.queue_tx.clone(),
{
let mut new_result =
handler.handle(QMessage::Local(
LocalMessage {
dst,
message
}), self.queue_tx.clone()
); );
if let Ok(ref mut new_state) = new_result { if let Ok(ref mut new_state) = new_result {
state.append(new_state); state.append(new_state);
Ok(state) Ok(state)
} } else {
else { // Error overrides the previous state.
// Error overrides the previous state. new_result
new_result }
} else {
Err(Error::NoSuchAlgorithm).map_err(HandlerError::from)
} }
} }
else {
Err(Error::NoSuchAlgorithm).map_err(HandlerError::from)
}
}
// A message FROM a remote node. // A message FROM a remote node.
QMessage::Remote(RemoteMessage { QMessage::Remote(RemoteMessage { node, message }) => {
node, // Multicast the message to all algorithm instances,
message // collecting output messages iteratively and appending them
}) => { // to result.
// Multicast the message to all algorithm instances, //
// collecting output messages iteratively and appending them // FIXME: error handling
// to result. self.algos.write().unwrap().iter_mut().fold(
// Ok(state),
// FIXME: error handling |result1, (_, handler)| {
self.algos.write().unwrap().iter_mut() if let Ok(mut state1) = result1 {
.fold(Ok(state), handler
|result1, (_, handler)| { .handle(
if let Ok(mut state1) = result1 { QMessage::Remote(RemoteMessage {
handler.handle( node: node.clone(),
QMessage::Remote(RemoteMessage { message: message.clone(),
node: node.clone(), }),
message: message.clone() self.queue_tx.clone(),
}), self.queue_tx.clone() )
).map(|ref mut state2| { .map(|ref mut state2| {
state1.append(state2); state1.append(state2);
state1 state1
}) })
} } else {
else { result1
result1 }
} },
}
) )
}
} }
}} else { result = Err(Error::RecvError) } else {
.map_err(HandlerError::from) } result = Err(Error::RecvError).map_err(HandlerError::from)
}
} // end of while loop } // end of while loop
result result
} }
/// Send a message queue to remote nodes. /// Send a message queue to remote nodes.
fn send_remote(&self, messages: &VecDeque<RemoteMessage>) -> fn send_remote(&self, messages: &VecDeque<RemoteMessage>) -> Result<(), Error> {
Result<(), Error>
{
messages.iter().fold(Ok(()), |result, m| { messages.iter().fold(Ok(()), |result, m| {
if result.is_err() { result } else { match m { if result.is_err() {
RemoteMessage { result
node: RemoteNode::Node(uid), } else {
message match m {
} => { RemoteMessage {
if let Some(tx) = self.remote_txs.get(&uid) { node: RemoteNode::Node(uid),
tx.send(message.clone()).map_err(Error::from) message,
} => {
if let Some(tx) = self.remote_txs.get(&uid) {
tx.send(message.clone()).map_err(Error::from)
} else {
Err(Error::SendError)
}
} }
else {
Err(Error::SendError)
}
},
RemoteMessage { RemoteMessage {
node: RemoteNode::All, node: RemoteNode::All,
message message,
} => { } => self.remote_txs.iter().fold(result, |result1, (_, tx)| {
self.remote_txs.iter().fold(result, |result1, (_, tx)| { if result1.is_err() {
if result1.is_err() { result1 } else { result1
} else {
tx.send(message.clone()).map_err(Error::from) tx.send(message.clone()).map_err(Error::from)
} }
}) }),
} }
}} }
}) })
} }
} }
@ -321,26 +303,25 @@ where HandlerError: 'a + From<Error>
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum Target { pub enum Target {
All, All,
Node(usize) Node(usize),
} }
/// Message with a designated target. /// Message with a designated target.
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct TargetedMessage<T: Clone + Debug + Send + Sync> { pub struct TargetedMessage<T: Clone + Debug + Send + Sync> {
pub target: Target, pub target: Target,
pub message: Message<T> pub message: Message<T>,
} }
impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> {
{
/// Initialises a message while checking parameter preconditions. /// Initialises a message while checking parameter preconditions.
pub fn new(target: Target, message: Message<T>) -> Option<Self> { pub fn new(target: Target, message: Message<T>) -> Option<Self> {
match target { match target {
Target::Node(i) if i == 0 => { Target::Node(i) if i == 0 => {
// Remote node indices start from 1. // Remote node indices start from 1.
None None
}, }
_ => Some(TargetedMessage{target, message}) _ => Some(TargetedMessage { target, message }),
} }
} }
} }
@ -353,7 +334,7 @@ impl<T: Clone + Debug + Send + Sync> TargetedMessage<T>
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> { pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
pub source: usize, pub source: usize,
pub message: Message<T> pub message: Message<T>,
} }
/// The messaging struct allows for targeted message exchange between comms /// The messaging struct allows for targeted message exchange between comms
@ -388,27 +369,20 @@ pub struct Messaging<T: Clone + Debug + Send + Sync> {
impl<T: Clone + Debug + Send + Sync> Messaging<T> { impl<T: Clone + Debug + Send + Sync> Messaging<T> {
/// Initialises all the required TX and RX handles for the case on a total /// Initialises all the required TX and RX handles for the case on a total
/// number `num_nodes` of consensus nodes. /// number `num_nodes` of consensus nodes.
pub fn new(num_nodes: usize) -> Self pub fn new(num_nodes: usize) -> Self {
{ let to_comms: Vec<_> = (0..num_nodes - 1)
let to_comms: Vec<_> = (0 .. num_nodes - 1)
.map(|_| unbounded::<Message<T>>()) .map(|_| unbounded::<Message<T>>())
.collect(); .collect();
let txs_to_comms = to_comms.iter() let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
.map(|&(ref tx, _)| tx.to_owned()) let rxs_to_comms: Vec<Receiver<Message<T>>> =
.collect(); to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let rxs_to_comms: Vec<Receiver<Message<T>>> = to_comms.iter()
.map(|&(_, ref rx)| rx.to_owned())
.collect();
let (tx_from_comms, rx_from_comms) = unbounded(); let (tx_from_comms, rx_from_comms) = unbounded();
let to_algo: Vec<_> = (0 .. num_nodes) let to_algo: Vec<_> = (0..num_nodes)
.map(|_| unbounded::<SourcedMessage<T>>()) .map(|_| unbounded::<SourcedMessage<T>>())
.collect(); .collect();
let txs_to_algo = to_algo.iter() let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
.map(|&(ref tx, _)| tx.to_owned()) let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
.collect(); to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> = to_algo.iter()
.map(|&(_, ref rx)| rx.to_owned())
.collect();
let (tx_from_algo, rx_from_algo) = unbounded(); let (tx_from_algo, rx_from_algo) = unbounded();
let (stop_tx, stop_rx) = bounded(1); let (stop_tx, stop_rx) = bounded(1);
@ -459,9 +433,9 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
} }
/// Spawns the message delivery thread in a given thread scope. /// Spawns the message delivery thread in a given thread scope.
pub fn spawn<'a>(&self, scope: &Scope<'a>) -> pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
ScopedJoinHandle<Result<(), Error>> where
where T: 'a T: 'a,
{ {
let txs_to_comms = self.txs_to_comms.to_owned(); let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned(); let rx_from_comms = self.rx_from_comms.to_owned();
@ -472,66 +446,70 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
let mut stop = false; let mut stop = false;
// TODO: `select_loop!` seems to really confuse Clippy. // TODO: `select_loop!` seems to really confuse Clippy.
#[cfg_attr(feature = "cargo-clippy", allow(never_loop, #[cfg_attr(
if_let_redundant_pattern_matching, deref_addrof))] feature = "cargo-clippy",
allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
)]
scope.spawn(move || { scope.spawn(move || {
let mut result = Ok(()); let mut result = Ok(());
// This loop forwards messages according to their metadata. // This loop forwards messages according to their metadata.
while !stop && result.is_ok() { select_loop! { while !stop && result.is_ok() {
recv(rx_from_algo, message) => { select_loop! {
match message { recv(rx_from_algo, message) => {
TargetedMessage { match message {
target: Target::All, TargetedMessage {
message target: Target::All,
} => { message
// Send the message to all remote nodes, stopping at } => {
// the first error. // Send the message to all remote nodes, stopping at
result = txs_to_comms.iter() // the first error.
.fold(Ok(()), |result, tx| { result = txs_to_comms.iter()
if result.is_ok() { .fold(Ok(()), |result, tx| {
tx.send(message.clone()) if result.is_ok() {
} tx.send(message.clone())
else { }
result else {
} result
}).map_err(Error::from); }
}, }).map_err(Error::from);
TargetedMessage { },
target: Target::Node(i), TargetedMessage {
message target: Target::Node(i),
} => { message
// Remote node indices start from 1. } => {
assert!(i > 0); // Remote node indices start from 1.
// Convert node index to vector index. assert!(i > 0);
let i = i - 1; // Convert node index to vector index.
let i = i - 1;
result = if i < txs_to_comms.len() { result = if i < txs_to_comms.len() {
txs_to_comms[i].send(message.clone()) txs_to_comms[i].send(message.clone())
.map_err(Error::from) .map_err(Error::from)
}
else {
Err(Error::NoSuchTarget)
};
}
}
},
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
} }
else { else {
Err(Error::NoSuchTarget) result
}; }
} }).map_err(Error::from)
},
recv(stop_rx, _) => {
// Flag the thread ready to exit.
stop = true;
} }
},
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
}
else {
result
}
}).map_err(Error::from)
},
recv(stop_rx, _) => {
// Flag the thread ready to exit.
stop = true;
} }
}} // end of select_loop! } // end of select_loop!
result result
}) })
} }
@ -547,5 +525,7 @@ pub enum Error {
} }
impl<T> From<crossbeam_channel::SendError<T>> for Error { impl<T> From<crossbeam_channel::SendError<T>> for Error {
fn from(_: crossbeam_channel::SendError<T>) -> Error { Error::SendError } fn from(_: crossbeam_channel::SendError<T>) -> Error {
Error::SendError
}
} }

View File

@ -1,30 +1,34 @@
//! Networking controls of the consensus node. //! Networking controls of the consensus node.
use std::io;
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use crossbeam; use crossbeam;
use merkle::Hashable; use merkle::Hashable;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use connection;
use broadcast; use broadcast;
use commst; use commst;
use connection;
use messaging::Messaging; use messaging::Messaging;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
IoError(io::Error), IoError(io::Error),
CommsError(commst::Error), CommsError(commst::Error),
NotImplemented NotImplemented,
} }
impl From<io::Error> for Error { impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::IoError(err) } fn from(err: io::Error) -> Error {
Error::IoError(err)
}
} }
impl From<commst::Error> for Error { impl From<commst::Error> for Error {
fn from(err: commst::Error) -> Error { Error::CommsError(err) } fn from(err: commst::Error) -> Error {
Error::CommsError(err)
}
} }
/// This is a structure to start a consensus node. /// This is a structure to start a consensus node.
@ -34,24 +38,23 @@ pub struct Node<T> {
/// Sockets of remote nodes. /// Sockets of remote nodes.
remotes: HashSet<SocketAddr>, remotes: HashSet<SocketAddr>,
/// Optionally, a value to be broadcast by this node. /// Optionally, a value to be broadcast by this node.
value: Option<T> value: Option<T>,
} }
impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
+ From<Vec<u8>> + Into<Vec<u8>>>
Node<T> Node<T>
{ {
/// Consensus node constructor. It only initialises initial parameters. /// Consensus node constructor. It only initialises initial parameters.
pub fn new(addr: SocketAddr, pub fn new(addr: SocketAddr, remotes: HashSet<SocketAddr>, value: Option<T>) -> Self {
remotes: HashSet<SocketAddr>, Node {
value: Option<T>) -> Self addr,
{ remotes,
Node {addr, remotes, value} value,
}
} }
/// Consensus node procedure implementing HoneyBadgerBFT. /// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, Error> pub fn run(&self) -> Result<T, Error> {
{
let value = &self.value; let value = &self.value;
let connections = connection::make(&self.addr, &self.remotes); let connections = connection::make(&self.addr, &self.remotes);
let num_nodes = connections.len() + 1; let num_nodes = connections.len() + 1;
@ -76,61 +79,65 @@ impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync
// node index is 0. // node index is 0.
let rx_to_algo0 = &rxs_to_algo[0]; let rx_to_algo0 = &rxs_to_algo[0];
broadcast_handles.push(scope.spawn(move || { broadcast_handles.push(scope.spawn(move || {
match broadcast::Instance::new(tx_from_algo, match broadcast::Instance::new(
rx_to_algo0, tx_from_algo,
value.to_owned(), rx_to_algo0,
num_nodes, value.to_owned(),
0) num_nodes,
.run() 0,
).run()
{ {
Ok(t) => { Ok(t) => {
debug!("Broadcast instance 0 succeeded: {}", debug!(
String::from_utf8(T::into(t)).unwrap()); "Broadcast instance 0 succeeded: {}",
}, String::from_utf8(T::into(t)).unwrap()
Err(e) => error!("Broadcast instance 0: {:?}", e) );
}
Err(e) => error!("Broadcast instance 0: {:?}", e),
} }
})); }));
// Start a comms task for each connection. Node indices of those // Start a comms task for each connection. Node indices of those
// tasks are 1 through N where N is the number of connections. // tasks are 1 through N where N is the number of connections.
for (i, c) in connections.iter().enumerate() { for (i, c) in connections.iter().enumerate() {
// Receive side of a single-consumer channel from algorithm // Receive side of a single-consumer channel from algorithm
// actor tasks to the comms task. // actor tasks to the comms task.
let rx_to_comms = &rxs_to_comms[i]; let rx_to_comms = &rxs_to_comms[i];
let node_index = i + 1; let node_index = i + 1;
scope.spawn(move || { scope.spawn(move || {
match commst::CommsTask::new(tx_from_comms, match commst::CommsTask::new(
rx_to_comms, tx_from_comms,
// FIXME: handle error rx_to_comms,
c.stream.try_clone().unwrap(), // FIXME: handle error
node_index) c.stream.try_clone().unwrap(),
.run() node_index,
).run()
{ {
Ok(_) => debug!("Comms task {} succeeded", node_index), Ok(_) => debug!("Comms task {} succeeded", node_index),
Err(e) => error!("Comms task {}: {:?}", node_index, e) Err(e) => error!("Comms task {}: {:?}", node_index, e),
} }
}); });
// Associate a broadcast instance to the above comms task. // Associate a broadcast instance to the above comms task.
let rx_to_algo = &rxs_to_algo[node_index]; let rx_to_algo = &rxs_to_algo[node_index];
broadcast_handles.push(scope.spawn(move || { broadcast_handles.push(scope.spawn(move || {
match broadcast::Instance::new(tx_from_algo, match broadcast::Instance::new(
rx_to_algo, tx_from_algo,
None, rx_to_algo,
num_nodes, None,
node_index) num_nodes,
.run() node_index,
).run()
{ {
Ok(t) => { Ok(t) => {
debug!("Broadcast instance {} succeeded: {}", debug!(
node_index, "Broadcast instance {} succeeded: {}",
String::from_utf8(T::into(t)).unwrap()); node_index,
}, String::from_utf8(T::into(t)).unwrap()
Err(e) => error!("Broadcast instance {}: {:?}", );
node_index, e) }
Err(e) => error!("Broadcast instance {}: {:?}", node_index, e),
} }
})); }));
} }
@ -142,13 +149,16 @@ impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync
} }
// Stop the messaging task. // Stop the messaging task.
stop_tx.send(()).map_err(|e| { stop_tx
error!("{}", e); .send(())
}).unwrap(); .map_err(|e| {
error!("{}", e);
})
.unwrap();
match msg_handle.join() { match msg_handle.join() {
Ok(()) => debug!("Messaging stopped OK"), Ok(()) => debug!("Messaging stopped OK"),
Err(e) => debug!("Messaging error: {:?}", e) Err(e) => debug!("Messaging error: {:?}", e),
} }
// TODO: continue the implementation of the asynchronous common // TODO: continue the implementation of the asynchronous common
// subset algorithm. // subset algorithm.

View File

@ -1,29 +1,29 @@
//! Construction of messages from protobuf buffers. //! Construction of messages from protobuf buffers.
pub mod message; pub mod message;
use merkle::proof::{Lemma, Positioned, Proof};
use proto::message::*;
use protobuf::core::parse_from_bytes;
use protobuf::error::{ProtobufError, ProtobufResult, WireError};
use protobuf::Message as ProtobufMessage;
use ring::digest::Algorithm;
use std::fmt; use std::fmt;
use std::marker::{Send, Sync}; use std::marker::{Send, Sync};
use ring::digest::Algorithm;
use merkle::proof::{Proof, Lemma, Positioned};
use protobuf::Message as ProtobufMessage;
use proto::message::*;
use protobuf::error::{ProtobufResult, ProtobufError, WireError};
use protobuf::core::parse_from_bytes;
/// Kinds of message sent by nodes participating in consensus. /// Kinds of message sent by nodes participating in consensus.
#[derive (Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub enum Message<T: Send + Sync> { pub enum Message<T: Send + Sync> {
Broadcast(BroadcastMessage<T>), Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage) Agreement(AgreementMessage),
} }
/// The three kinds of message sent during the reliable broadcast stage of the /// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm. /// consensus algorithm.
#[derive (Clone, PartialEq)] #[derive(Clone, PartialEq)]
pub enum BroadcastMessage<T: Send + Sync> { pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>), Value(Proof<T>),
Echo(Proof<T>), Echo(Proof<T>),
Ready(Vec<u8>) Ready(Vec<u8>),
} }
pub struct HexBytes<'a>(pub &'a [u8]); pub struct HexBytes<'a>(pub &'a [u8]);
@ -51,10 +51,13 @@ struct HexProof<'a, T: 'a>(&'a Proof<T>);
impl<'a, T: Send + Sync + fmt::Debug> fmt::Debug for HexProof<'a, T> { impl<'a, T: Send + Sync + fmt::Debug> fmt::Debug for HexProof<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Proof {{ algorithm: {:?}, root_hash: {:?}, lemma: .., value: {:?} }}", write!(
self.0.algorithm, f,
HexBytes(&self.0.root_hash), "Proof {{ algorithm: {:?}, root_hash: {:?}, lemma: .., value: {:?} }}",
self.0.value) self.0.algorithm,
HexBytes(&self.0.root_hash),
self.0.value
)
} }
} }
@ -63,15 +66,13 @@ impl<T: Send + Sync + fmt::Debug> fmt::Debug for BroadcastMessage<T> {
match *self { match *self {
BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)), BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)),
BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)), BroadcastMessage::Echo(ref v) => write!(f, "Echo({:?})", HexProof(&v)),
BroadcastMessage::Ready(ref bytes) => { BroadcastMessage::Ready(ref bytes) => write!(f, "Ready({:?})", HexBytes(bytes)),
write!(f, "Ready({:?})", HexBytes(bytes))
}
} }
} }
} }
/// Messages sent during the binary Byzantine agreement stage. /// Messages sent during the binary Byzantine agreement stage.
#[derive (Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub enum AgreementMessage { pub enum AgreementMessage {
// TODO // TODO
} }
@ -83,34 +84,33 @@ impl<T: Send + Sync> Message<T> {
/// to be fully serialised and sent as a whole, or it can be passed over /// to be fully serialised and sent as a whole, or it can be passed over
/// using an ID and the `Eq` instance to discriminate the finite set of /// using an ID and the `Eq` instance to discriminate the finite set of
/// algorithms in `ring::digest`. /// algorithms in `ring::digest`.
pub fn from_proto(mut proto: message::MessageProto) pub fn from_proto(mut proto: message::MessageProto) -> Option<Self>
-> Option<Self> where
where T: From<Vec<u8>> T: From<Vec<u8>>,
{ {
if proto.has_broadcast() { if proto.has_broadcast() {
BroadcastMessage::from_proto(proto.take_broadcast(), BroadcastMessage::from_proto(
// TODO, possibly move Algorithm inside proto.take_broadcast(),
// BroadcastMessage // TODO, possibly move Algorithm inside
&::ring::digest::SHA256) // BroadcastMessage
.map(Message::Broadcast) &::ring::digest::SHA256,
} ).map(Message::Broadcast)
else if proto.has_agreement() { } else if proto.has_agreement() {
AgreementMessage::from_proto(proto.take_agreement()) AgreementMessage::from_proto(proto.take_agreement()).map(Message::Agreement)
.map(Message::Agreement) } else {
}
else {
None None
} }
} }
pub fn into_proto(self) -> MessageProto pub fn into_proto(self) -> MessageProto
where T: Into<Vec<u8>> where
T: Into<Vec<u8>>,
{ {
let mut m = MessageProto::new(); let mut m = MessageProto::new();
match self { match self {
Message::Broadcast(b) => { Message::Broadcast(b) => {
m.set_broadcast(b.into_proto()); m.set_broadcast(b.into_proto());
}, }
Message::Agreement(a) => { Message::Agreement(a) => {
m.set_agreement(a.into_proto()); m.set_agreement(a.into_proto());
} }
@ -123,21 +123,22 @@ impl<T: Send + Sync> Message<T> {
/// TODO: pass custom errors from down the chain of nested parsers as /// TODO: pass custom errors from down the chain of nested parsers as
/// opposed to returning `WireError::Other`. /// opposed to returning `WireError::Other`.
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self> pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self>
where T: From<Vec<u8>> where
T: From<Vec<u8>>,
{ {
let r = parse_from_bytes::<MessageProto>(bytes) let r = parse_from_bytes::<MessageProto>(bytes).map(Self::from_proto);
.map(Self::from_proto);
match r { match r {
Ok(Some(m)) => Ok(m), Ok(Some(m)) => Ok(m),
Ok(None) => Err(ProtobufError::WireError(WireError::Other)), Ok(None) => Err(ProtobufError::WireError(WireError::Other)),
Err(e) => Err(e) Err(e) => Err(e),
} }
} }
/// Produce a protobuf representation of this `Message`. /// Produce a protobuf representation of this `Message`.
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>> pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>>
where T: Into<Vec<u8>> where
T: Into<Vec<u8>>,
{ {
self.into_proto().write_to_bytes() self.into_proto().write_to_bytes()
} }
@ -145,7 +146,8 @@ impl<T: Send + Sync> Message<T> {
impl<T: Send + Sync> BroadcastMessage<T> { impl<T: Send + Sync> BroadcastMessage<T> {
pub fn into_proto(self) -> BroadcastProto pub fn into_proto(self) -> BroadcastProto
where T: Into<Vec<u8>> where
T: Into<Vec<u8>>,
{ {
let mut b = BroadcastProto::new(); let mut b = BroadcastProto::new();
match self { match self {
@ -153,12 +155,12 @@ impl<T: Send + Sync> BroadcastMessage<T> {
let mut v = ValueProto::new(); let mut v = ValueProto::new();
v.set_proof(ProofProto::from_proof(p)); v.set_proof(ProofProto::from_proof(p));
b.set_value(v); b.set_value(v);
}, }
BroadcastMessage::Echo(p) => { BroadcastMessage::Echo(p) => {
let mut e = EchoProto::new(); let mut e = EchoProto::new();
e.set_proof(ProofProto::from_proof(p)); e.set_proof(ProofProto::from_proof(p));
b.set_echo(e); b.set_echo(e);
}, }
BroadcastMessage::Ready(h) => { BroadcastMessage::Ready(h) => {
let mut r = ReadyProto::new(); let mut r = ReadyProto::new();
r.set_root_hash(h); r.set_root_hash(h);
@ -167,39 +169,37 @@ impl<T: Send + Sync> BroadcastMessage<T> {
b b
} }
pub fn from_proto(mut mp: BroadcastProto, pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option<Self>
algorithm: &'static Algorithm) where
-> Option<Self> T: From<Vec<u8>>,
where T: From<Vec<u8>>
{ {
if mp.has_value() { if mp.has_value() {
mp.take_value().take_proof().into_proof(algorithm) mp.take_value()
.take_proof()
.into_proof(algorithm)
.map(BroadcastMessage::Value) .map(BroadcastMessage::Value)
} } else if mp.has_echo() {
else if mp.has_echo() { mp.take_echo()
mp.take_echo().take_proof().into_proof(algorithm) .take_proof()
.into_proof(algorithm)
.map(BroadcastMessage::Echo) .map(BroadcastMessage::Echo)
} } else if mp.has_ready() {
else if mp.has_ready() {
let h = mp.take_ready().take_root_hash(); let h = mp.take_ready().take_root_hash();
Some(BroadcastMessage::Ready(h)) Some(BroadcastMessage::Ready(h))
} } else {
else {
None None
} }
} }
} }
impl AgreementMessage { impl AgreementMessage {
pub fn into_proto(self) -> AgreementProto pub fn into_proto(self) -> AgreementProto {
{
unimplemented!(); unimplemented!();
} }
// TODO: Re-enable lint once implemented. // TODO: Re-enable lint once implemented.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn from_proto(_mp: AgreementProto) -> Option<Self> pub fn from_proto(_mp: AgreementProto) -> Option<Self> {
{
unimplemented!(); unimplemented!();
} }
} }
@ -209,9 +209,9 @@ impl AgreementMessage {
/// `Proof` outside its crate. /// `Proof` outside its crate.
impl ProofProto { impl ProofProto {
pub fn from_proof<T>(proof: Proof<T>) -> Self pub fn from_proof<T>(proof: Proof<T>) -> Self
where T: Into<Vec<u8>> where
T: Into<Vec<u8>>,
{ {
let mut proto = Self::new(); let mut proto = Self::new();
match proof { match proof {
@ -231,10 +231,9 @@ impl ProofProto {
proto proto
} }
pub fn into_proof<T>(mut self, pub fn into_proof<T>(mut self, algorithm: &'static Algorithm) -> Option<Proof<T>>
algorithm: &'static Algorithm) where
-> Option<Proof<T>> T: From<Vec<u8>>,
where T: From<Vec<u8>>
{ {
if !self.has_lemma() { if !self.has_lemma() {
return None; return None;
@ -256,21 +255,21 @@ impl LemmaProto {
let mut proto = Self::new(); let mut proto = Self::new();
match lemma { match lemma {
Lemma {node_hash, sibling_hash, sub_lemma} => { Lemma {
node_hash,
sibling_hash,
sub_lemma,
} => {
proto.set_node_hash(node_hash); proto.set_node_hash(node_hash);
if let Some(sub_proto) = sub_lemma.map( if let Some(sub_proto) = sub_lemma.map(|l| Self::from_lemma(*l)) {
|l| Self::from_lemma(*l))
{
proto.set_sub_lemma(sub_proto); proto.set_sub_lemma(sub_proto);
} }
match sibling_hash { match sibling_hash {
Some(Positioned::Left(hash)) => Some(Positioned::Left(hash)) => proto.set_left_sibling_hash(hash),
proto.set_left_sibling_hash(hash),
Some(Positioned::Right(hash)) => Some(Positioned::Right(hash)) => proto.set_right_sibling_hash(hash),
proto.set_right_sibling_hash(hash),
None => {} None => {}
} }
@ -295,12 +294,10 @@ impl LemmaProto {
// If a `sub_lemma` is present is the Protobuf, // If a `sub_lemma` is present is the Protobuf,
// then we expect it to unserialize to a valid `Lemma`, // then we expect it to unserialize to a valid `Lemma`,
// otherwise we return `None` // otherwise we return `None`
self.take_sub_lemma().into_lemma().map(|sub_lemma| { self.take_sub_lemma().into_lemma().map(|sub_lemma| Lemma {
Lemma { node_hash,
node_hash, sibling_hash,
sibling_hash, sub_lemma: Some(Box::new(sub_lemma)),
sub_lemma: Some(Box::new(sub_lemma)),
}
}) })
} else { } else {
// We might very well not have a sub_lemma, // We might very well not have a sub_lemma,

View File

@ -1,11 +1,11 @@
//! Protobuf message IO task structure. //! Protobuf message IO task structure.
use std::{cmp,io}; use proto::*;
use std::io::Read;
use std::net::TcpStream;
use protobuf; use protobuf;
use protobuf::Message as ProtobufMessage; use protobuf::Message as ProtobufMessage;
use proto::*; use std::io::Read;
use std::net::TcpStream;
use std::{cmp, io};
/// A magic key to put right before each message. An atavism of primitive serial /// A magic key to put right before each message. An atavism of primitive serial
/// protocols. /// protocols.
@ -24,14 +24,18 @@ pub enum Error {
} }
impl From<io::Error> for Error { impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::IoError(err) } fn from(err: io::Error) -> Error {
Error::IoError(err)
}
} }
impl From<protobuf::ProtobufError> for Error { impl From<protobuf::ProtobufError> for Error {
fn from(err: protobuf::ProtobufError) -> Error { Error::ProtobufError(err) } fn from(err: protobuf::ProtobufError) -> Error {
Error::ProtobufError(err)
}
} }
fn encode_u32_to_be(value: u32, buffer: &mut[u8]) -> Result<(), Error> { fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> {
if buffer.len() < 4 { if buffer.len() < 4 {
return Err(Error::EncodeError); return Err(Error::EncodeError);
} }
@ -81,7 +85,8 @@ impl ProtoIo
} }
pub fn recv<T>(&mut self) -> Result<Message<T>, Error> pub fn recv<T>(&mut self) -> Result<Message<T>, Error>
where T: Clone + Send + Sync + From<Vec<u8>> // + Into<Vec<u8>> where
T: Clone + Send + Sync + From<Vec<u8>>, // + Into<Vec<u8>>
{ {
self.stream.read_exact(&mut self.buffer[0..4])?; self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?; let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
@ -94,19 +99,18 @@ impl ProtoIo
let mut message_v: Vec<u8> = Vec::new(); let mut message_v: Vec<u8> = Vec::new();
message_v.reserve(size); message_v.reserve(size);
while message_v.len() < size { while message_v.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size - let num_to_read = cmp::min(self.buffer.len(), size - message_v.len());
message_v.len());
let (slice, _) = self.buffer.split_at_mut(num_to_read); let (slice, _) = self.buffer.split_at_mut(num_to_read);
self.stream.read_exact(slice)?; self.stream.read_exact(slice)?;
message_v.extend_from_slice(slice); message_v.extend_from_slice(slice);
} }
Message::parse_from_bytes(&message_v) Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError)
.map_err(Error::ProtobufError)
} }
pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error> pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
where T: Clone + Send + Sync + Into<Vec<u8>> where
T: Clone + Send + Sync + Into<Vec<u8>>,
{ {
let mut buffer: [u8; 4] = [0; 4]; let mut buffer: [u8; 4] = [0; 4];
// Wrap stream // Wrap stream

View File

@ -3,24 +3,23 @@
extern crate hbbft; extern crate hbbft;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate simple_logger;
extern crate crossbeam; extern crate crossbeam;
extern crate crossbeam_channel; extern crate crossbeam_channel;
extern crate merkle; extern crate merkle;
extern crate simple_logger;
mod netsim; mod netsim;
use std::collections::{HashSet, HashMap}; use crossbeam_channel::{Receiver, Sender};
use crossbeam_channel::{Sender, Receiver}; use std::collections::{HashMap, HashSet};
use hbbft::proto::*;
use hbbft::messaging;
use hbbft::messaging::{QMessage, NodeUid, Algorithm, ProposedValue,
AlgoMessage, Handler, LocalMessage,
MessageLoop, RemoteMessage, RemoteNode};
use hbbft::broadcast::Broadcast;
use hbbft::broadcast; use hbbft::broadcast;
use hbbft::broadcast::Broadcast;
use hbbft::common_subset; use hbbft::common_subset;
use hbbft::messaging;
use hbbft::messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoop, NodeUid,
ProposedValue, QMessage, RemoteMessage, RemoteNode};
use hbbft::proto::*;
use netsim::NetSim; use netsim::NetSim;
@ -37,14 +36,14 @@ pub struct TestNode<'a> {
message_loop: MessageLoop<'a, Error>, message_loop: MessageLoop<'a, Error>,
} }
impl<'a> TestNode<'a> impl<'a> TestNode<'a> {
{
/// Consensus node constructor. It only initialises initial parameters. /// Consensus node constructor. It only initialises initial parameters.
pub fn new(uid: NodeUid, pub fn new(
txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>, uid: NodeUid,
rxs: HashMap<NodeUid, Receiver<Message<ProposedValue>>>, txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>,
value: Option<ProposedValue>) -> Self rxs: HashMap<NodeUid, Receiver<Message<ProposedValue>>>,
{ value: Option<ProposedValue>,
) -> Self {
TestNode { TestNode {
uid, uid,
rxs, rxs,
@ -53,22 +52,18 @@ impl<'a> TestNode<'a>
} }
} }
pub fn add_handler<H: 'a + Handler<Error>>(&'a self, pub fn add_handler<H: 'a + Handler<Error>>(&'a self, algo: Algorithm, handler: &'a H) {
algo: Algorithm,
handler: &'a H)
{
self.message_loop.insert_algo(algo, handler); self.message_loop.insert_algo(algo, handler);
} }
pub fn run(&'a self) -> Result<HashSet<ProposedValue>, Error> pub fn run(&'a self) -> Result<HashSet<ProposedValue>, Error> {
{
let tx = self.message_loop.queue_tx(); let tx = self.message_loop.queue_tx();
if let Some(value) = &self.value { if let Some(value) = &self.value {
// Start the broadcast value transmission. // Start the broadcast value transmission.
tx.send(QMessage::Local(LocalMessage { tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::Broadcast(self.uid), dst: Algorithm::Broadcast(self.uid),
message: AlgoMessage::BroadcastInput(value.clone()) message: AlgoMessage::BroadcastInput(value.clone()),
}))?; }))?;
} }
@ -86,7 +81,7 @@ impl<'a> TestNode<'a>
// FIXME: error handling // FIXME: error handling
tx.send(QMessage::Remote(RemoteMessage { tx.send(QMessage::Remote(RemoteMessage {
node: RemoteNode::Node(*uid), node: RemoteNode::Node(*uid),
message message,
})).unwrap(); })).unwrap();
} }
debug!("Node {} receiver {} terminated", self_uid, uid); debug!("Node {} receiver {} terminated", self_uid, uid);
@ -106,23 +101,31 @@ pub enum Error {
Broadcast(broadcast::Error), Broadcast(broadcast::Error),
CommonSubset(common_subset::Error), CommonSubset(common_subset::Error),
Send(crossbeam_channel::SendError<QMessage>), Send(crossbeam_channel::SendError<QMessage>),
NotImplemented NotImplemented,
} }
impl From<messaging::Error> for Error { impl From<messaging::Error> for Error {
fn from(e: messaging::Error) -> Error { Error::Messaging(e) } fn from(e: messaging::Error) -> Error {
Error::Messaging(e)
}
} }
impl From<broadcast::Error> for Error { impl From<broadcast::Error> for Error {
fn from(e: broadcast::Error) -> Error { Error::Broadcast(e) } fn from(e: broadcast::Error) -> Error {
Error::Broadcast(e)
}
} }
impl From<common_subset::Error> for Error { impl From<common_subset::Error> for Error {
fn from(e: common_subset::Error) -> Error { Error::CommonSubset(e) } fn from(e: common_subset::Error) -> Error {
Error::CommonSubset(e)
}
} }
impl From<crossbeam_channel::SendError<QMessage>> for Error { impl From<crossbeam_channel::SendError<QMessage>> for Error {
fn from(e: crossbeam_channel::SendError<QMessage>) -> Error { Error::Send(e) } fn from(e: crossbeam_channel::SendError<QMessage>) -> Error {
Error::Send(e)
}
} }
fn proposed_value(n: usize) -> ProposedValue { fn proposed_value(n: usize) -> ProposedValue {
@ -135,10 +138,10 @@ fn node_addr(node_index: usize) -> NodeUid {
} }
/// Creates test nodes but does not run them. /// Creates test nodes but does not run them.
fn create_test_nodes(num_nodes: usize, fn create_test_nodes(
net: &NetSim<Message<Vec<u8>>>) -> num_nodes: usize,
HashMap<NodeUid, (TestNode, HashMap<NodeUid, Broadcast>)> net: &NetSim<Message<Vec<u8>>>,
{ ) -> HashMap<NodeUid, (TestNode, HashMap<NodeUid, Broadcast>)> {
let mut nodes = HashMap::new(); let mut nodes = HashMap::new();
for n in 0..num_nodes { for n in 0..num_nodes {
let value = proposed_value(n); let value = proposed_value(n);
@ -156,8 +159,7 @@ fn create_test_nodes(num_nodes: usize,
} }
let uid = node_addr(n); let uid = node_addr(n);
let all_uids: HashSet<NodeUid> = let all_uids: HashSet<NodeUid> = (0..num_nodes).into_iter().map(node_addr).collect();
(0..num_nodes).into_iter().map(node_addr).collect();
let all_uids_copy = all_uids.clone(); let all_uids_copy = all_uids.clone();
// Create a broadcast algorithm instance for each node. // Create a broadcast algorithm instance for each node.
@ -166,15 +168,20 @@ fn create_test_nodes(num_nodes: usize,
match Broadcast::new(uid, all_uids_copy.clone(), num_nodes) { match Broadcast::new(uid, all_uids_copy.clone(), num_nodes) {
Ok(instance) => { Ok(instance) => {
broadcast_instances.insert(uid, instance); broadcast_instances.insert(uid, instance);
}, }
Err(e) => { Err(e) => {
panic!("{:?}", e); panic!("{:?}", e);
} }
} }
} }
nodes.insert(uid, (TestNode::new(uid, txs, rxs, Some(value)), nodes.insert(
broadcast_instances)); uid,
(
TestNode::new(uid, txs, rxs, Some(value)),
broadcast_instances,
),
);
} }
nodes nodes
} }
@ -192,15 +199,17 @@ fn test_4_broadcast_nodes() {
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
// Run the test nodes, each in its own thread. // Run the test nodes, each in its own thread.
for (uid, (node, broadcast_instances)) in &nodes { for (uid, (node, broadcast_instances)) in &nodes {
join_handles.insert(*uid, scope.spawn(move || { join_handles.insert(
// Register broadcast instance handlers with the message loop. *uid,
for (instance_uid, instance) in broadcast_instances { scope.spawn(move || {
node.add_handler(Algorithm::Broadcast(*instance_uid), // Register broadcast instance handlers with the message loop.
instance); for (instance_uid, instance) in broadcast_instances {
} node.add_handler(Algorithm::Broadcast(*instance_uid), instance);
debug!("Running {:?}", node.uid); }
node.run() debug!("Running {:?}", node.uid);
})); node.run()
}),
);
} }
for (uid, join_handle) in join_handles { for (uid, join_handle) in join_handles {

View File

@ -5,7 +5,7 @@
extern crate crossbeam_channel; extern crate crossbeam_channel;
extern crate log; extern crate log;
use crossbeam_channel::{Sender, Receiver, unbounded}; use crossbeam_channel::{unbounded, Receiver, Sender};
pub struct NetSim<Message: Clone + Send + Sync> { pub struct NetSim<Message: Clone + Send + Sync> {
/// The number of simulated nodes. /// The number of simulated nodes.
@ -21,19 +21,13 @@ impl<Message: Clone + Send + Sync> NetSim<Message> {
assert!(num_nodes > 1); assert!(num_nodes > 1);
// All channels of a totally connected network of size `num_nodes`. // All channels of a totally connected network of size `num_nodes`.
let channels: Vec<(Sender<Message>, Receiver<Message>)> = let channels: Vec<(Sender<Message>, Receiver<Message>)> =
(0 .. num_nodes * num_nodes) (0..num_nodes * num_nodes).map(|_| unbounded()).collect();
.map(|_| unbounded()) let txs = channels.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
.collect(); let rxs = channels.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let txs = channels.iter()
.map(|&(ref tx, _)| tx.to_owned())
.collect();
let rxs = channels.iter()
.map(|&(_, ref rx)| rx.to_owned())
.collect();
NetSim { NetSim {
num_nodes, num_nodes,
txs, txs,
rxs rxs,
} }
} }