mirror of https://github.com/poanetwork/hbbft.git
Merge branch 'afck--examples' of github.com:poanetwork/hbbft
commit dda1570bfc

@@ -1,13 +1,18 @@
//! Example of a consensus node that uses the `hbbft::node::Node` struct for
//! running the distributed consensus state machine.
//#[macro_use]
extern crate crossbeam;
#[macro_use]
extern crate crossbeam_channel;
extern crate docopt;
extern crate hbbft;
#[macro_use]
extern crate log;
extern crate simple_logger;

mod network;

use docopt::Docopt;
use hbbft::node::Node;
use network::node::Node;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::vec::Vec;
@@ -8,10 +8,9 @@ use std::io;
use std::net::TcpStream;
use std::sync::Arc;

use messaging::SourcedMessage;
use proto::Message;
use proto_io;
use proto_io::ProtoIo;
use hbbft::messaging::SourcedMessage;
use hbbft::proto::Message;
use hbbft::proto_io::{self, ProtoIo};

#[derive(Debug)]
pub enum Error {
@@ -32,7 +31,7 @@ pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + I
    /// The receive side of the channel to the comms thread.
    rx: &'a Receiver<Message<T>>,
    /// The socket IO task.
    io: ProtoIo,
    io: ProtoIo<TcpStream>,
    /// The index of this comms task for identification against its remote node.
    pub node_index: usize,
}
@@ -0,0 +1,189 @@
//! The local message delivery system.
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use hbbft::messaging::{SourcedMessage, Target, TargetedMessage};
use hbbft::proto::Message;
use std::fmt::Debug;

/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<T: Clone + Debug + Send + Sync> {
    /// Transmit sides of message channels to comms threads.
    txs_to_comms: Vec<Sender<Message<T>>>,
    /// Receive side of the routed message channel from comms threads.
    rx_from_comms: Receiver<SourcedMessage<T>>,
    /// Transmit sides of message channels to algo threads.
    txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
    /// Receive side of the routed message channel from algo threads.
    rx_from_algo: Receiver<TargetedMessage<T>>,

    /// RX handles to be used by comms tasks.
    rxs_to_comms: Vec<Receiver<Message<T>>>,
    /// TX handle to be used by comms tasks.
    tx_from_comms: Sender<SourcedMessage<T>>,
    /// RX handles to be used by algo tasks.
    rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
    /// TX handle to be used by algo tasks.
    tx_from_algo: Sender<TargetedMessage<T>>,

    /// Control channel used to stop the listening thread.
    stop_tx: Sender<()>,
    stop_rx: Receiver<()>,
}

impl<T: Clone + Debug + Send + Sync> Messaging<T> {
    /// Initialises all the required TX and RX handles for a total of
    /// `num_nodes` consensus nodes.
    pub fn new(num_nodes: usize) -> Self {
        let to_comms: Vec<_> = (0..num_nodes - 1)
            .map(|_| unbounded::<Message<T>>())
            .collect();
        let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
        let rxs_to_comms: Vec<Receiver<Message<T>>> =
            to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
        let (tx_from_comms, rx_from_comms) = unbounded();
        let to_algo: Vec<_> = (0..num_nodes)
            .map(|_| unbounded::<SourcedMessage<T>>())
            .collect();
        let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
        let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
            to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
        let (tx_from_algo, rx_from_algo) = unbounded();

        let (stop_tx, stop_rx) = bounded(1);

        Messaging {
            // internally used handles
            txs_to_comms,
            rx_from_comms,
            txs_to_algo,
            rx_from_algo,

            // externally used handles
            rxs_to_comms,
            tx_from_comms,
            rxs_to_algo,
            tx_from_algo,

            stop_tx,
            stop_rx,
        }
    }

    pub fn rxs_to_comms(&self) -> &Vec<Receiver<Message<T>>> {
        &self.rxs_to_comms
    }

    pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<T>> {
        &self.tx_from_comms
    }

    pub fn rxs_to_algo(&self) -> &Vec<Receiver<SourcedMessage<T>>> {
        &self.rxs_to_algo
    }

    pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
        &self.tx_from_algo
    }

    /// Gives the ownership of the handle to stop the message receive loop.
    pub fn stop_tx(&self) -> Sender<()> {
        self.stop_tx.to_owned()
    }

    /// Spawns the message delivery thread in a given thread scope.
    pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
    where
        T: 'a,
    {
        let txs_to_comms = self.txs_to_comms.to_owned();
        let rx_from_comms = self.rx_from_comms.to_owned();
        let txs_to_algo = self.txs_to_algo.to_owned();
        let rx_from_algo = self.rx_from_algo.to_owned();

        let stop_rx = self.stop_rx.to_owned();
        let mut stop = false;

        // TODO: `select_loop!` seems to really confuse Clippy.
        #[cfg_attr(
            feature = "cargo-clippy",
            allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
        )]
        scope.spawn(move || {
            let mut result = Ok(());
            // This loop forwards messages according to their metadata.
            while !stop && result.is_ok() {
                select_loop! {
                    recv(rx_from_algo, message) => {
                        match message {
                            TargetedMessage {
                                target: Target::All,
                                message
                            } => {
                                // Send the message to all remote nodes, stopping at
                                // the first error.
                                result = txs_to_comms.iter()
                                    .fold(Ok(()), |result, tx| {
                                        if result.is_ok() {
                                            tx.send(message.clone())
                                        }
                                        else {
                                            result
                                        }
                                    }).map_err(Error::from);
                            },
                            TargetedMessage {
                                target: Target::Node(i),
                                message
                            } => {
                                // Remote node indices start from 1.
                                assert!(i > 0);
                                // Convert node index to vector index.
                                let i = i - 1;

                                result = if i < txs_to_comms.len() {
                                    txs_to_comms[i].send(message.clone())
                                        .map_err(Error::from)
                                }
                                else {
                                    Err(Error::NoSuchTarget)
                                };
                            }
                        }
                    },
                    recv(rx_from_comms, message) => {
                        // Send the message to all algorithm instances, stopping at
                        // the first error.
                        result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
                            if result.is_ok() {
                                tx.send(message.clone())
                            }
                            else {
                                result
                            }
                        }).map_err(Error::from)
                    },
                    recv(stop_rx, _) => {
                        // Flag the thread ready to exit.
                        stop = true;
                    }
                }
            } // end of select_loop!
            result
        })
    }
}

#[derive(Clone, Debug)]
pub enum Error {
    NoSuchTarget,
    SendError,
}

impl<T> From<crossbeam_channel::SendError<T>> for Error {
    fn from(_: crossbeam_channel::SendError<T>) -> Error {
        Error::SendError
    }
}
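A minimal usage sketch of the new `Messaging` queue (illustrative only, not code from this commit; it assumes the example's `network::messaging` module introduced above and the scoped-thread `crossbeam` API this file uses):

```rust
extern crate crossbeam;
extern crate hbbft;

mod network; // assumed: the example's network module from this commit

use network::messaging::Messaging;

fn main() {
    // One local node plus three remote peers: `new(4)` allocates three
    // comms channels and four algo channels, as in `Messaging::new`.
    let messaging: Messaging<Vec<u8>> = Messaging::new(4);
    let stop_tx = messaging.stop_tx();

    crossbeam::scope(|scope| {
        // Run the delivery loop inside the thread scope.
        let handle = messaging.spawn(scope);
        // Comms tasks would consume `messaging.rxs_to_comms()` and send into
        // `messaging.tx_from_comms()`; algo tasks use the `_algo` handles.
        stop_tx.send(()).unwrap();
        handle.join().unwrap();
    });
}
```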
@@ -0,0 +1,4 @@
pub mod commst;
pub mod connection;
pub mod messaging;
pub mod node;
@@ -1,16 +1,50 @@
//! Networking controls of the consensus node.
//!
//! ## Example
//!
//! The following code could be run on host 192.168.1.1:
//!
//! ```ignore
//! extern crate hbbft;
//!
//! use hbbft::node::Node;
//! use std::net::SocketAddr;
//! use std::vec::Vec;
//!
//! fn main() {
//!     let bind_address = "127.0.0.1:10001".parse().unwrap();
//!     let remote_addresses = vec!["192.168.1.2:10002",
//!                                 "192.168.1.3:10003",
//!                                 "192.168.1.4:10004"]
//!         .iter()
//!         .map(|s| s.parse().unwrap())
//!         .collect();
//!
//!     let value = "Value #1".as_bytes().to_vec();
//!
//!     let result = Node::new(bind_address, remote_addresses, Some(value))
//!         .run();
//!     println!("Consensus result {:?}", result);
//! }
//! ```
//!
//! Similar code should then run on hosts 192.168.1.2, 192.168.1.3 and
//! 192.168.1.4 with appropriate changes in `bind_address` and
//! `remote_addresses`. Each host has its own optional broadcast `value`. If
//! the consensus `result` is not an error then every successfully terminated
//! consensus node will output the same `result`.

use crossbeam;
use merkle::Hashable;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io;
use std::marker::{Send, Sync};
use std::net::SocketAddr;

use broadcast;
use commst;
use connection;
use messaging::Messaging;
use hbbft::broadcast;
use network::commst;
use network::connection;
use network::messaging::Messaging;

#[derive(Debug)]
pub enum Error {

@@ -41,7 +75,7 @@ pub struct Node<T> {
    value: Option<T>,
}

impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
    Node<T>
{
    /// Consensus node constructor. It only initialises initial parameters.
 src/broadcast.rs | 432
@@ -1,5 +1,4 @@
//! Reliable broadcast algorithm instance.
use crossbeam;
use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use merkle::proof::{Lemma, Positioned, Proof};
use merkle::{Hashable, MerkleTree};
@@ -11,11 +10,12 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::iter;
use std::marker::{Send, Sync};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::sync::{RwLock, RwLockWriteGuard};

use messaging;
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, ProposedValue,
                QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target, TargetedMessage};
use messaging::{SourcedMessage, Target, TargetedMessage};

// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;

type MessageQueue<NodeUid> = VecDeque<TargetedBroadcastMessage<NodeUid>>;
@@ -26,12 +26,12 @@ pub struct TargetedBroadcastMessage<NodeUid> {
    pub message: BroadcastMessage<ProposedValue>,
}

impl TargetedBroadcastMessage<messaging::NodeUid> {
    pub fn into_remote_message(self) -> RemoteMessage {
        RemoteMessage {
            node: match self.target {
                BroadcastTarget::All => RemoteNode::All,
                BroadcastTarget::Node(node) => RemoteNode::Node(node),
impl TargetedBroadcastMessage<usize> {
    pub fn into_targeted_message(self) -> TargetedMessage<ProposedValue> {
        TargetedMessage {
            target: match self.target {
                BroadcastTarget::All => Target::All,
                BroadcastTarget::Node(node) => Target::Node(node),
            },
            message: Message::Broadcast(self.message),
        }
@@ -74,95 +74,6 @@ pub struct Broadcast<NodeUid: Eq + Hash> {
    state: RwLock<BroadcastState>,
}

impl Broadcast<messaging::NodeUid> {
    /// The message-driven interface function for calls from the main message
    /// loop.
    pub fn on_message(
        &self,
        m: QMessage,
        tx: &Sender<QMessage>,
    ) -> Result<MessageLoopState, Error> {
        match m {
            QMessage::Local(LocalMessage { message, .. }) => match message {
                AlgoMessage::BroadcastInput(value) => self.on_local_message(&mut value.to_owned()),

                _ => Err(Error::UnexpectedMessage),
            },

            QMessage::Remote(RemoteMessage {
                node: RemoteNode::Node(uid),
                message,
            }) => {
                if let Message::Broadcast(b) = message {
                    self.on_remote_message(uid, b, tx)
                } else {
                    Err(Error::UnexpectedMessage)
                }
            }

            _ => Err(Error::UnexpectedMessage),
        }
    }

    fn on_remote_message(
        &self,
        uid: messaging::NodeUid,
        message: BroadcastMessage<ProposedValue>,
        tx: &Sender<QMessage>,
    ) -> Result<MessageLoopState, Error> {
        let (output, messages) = self.handle_broadcast_message(&uid, message)?;
        if let Some(value) = output {
            tx.send(QMessage::Local(LocalMessage {
                dst: Algorithm::CommonSubset,
                message: AlgoMessage::BroadcastOutput(self.our_id, value),
            })).map_err(Error::from)?;
        }
        Ok(MessageLoopState::Processing(
            messages
                .into_iter()
                .map(TargetedBroadcastMessage::into_remote_message)
                .collect(),
        ))
    }

    /// Processes the proposed value input by broadcasting it.
    pub fn on_local_message(&self, value: &mut ProposedValue) -> Result<MessageLoopState, Error> {
        let mut state = self.state.write().unwrap();
        // Split the value into chunks/shards, encode them with erasure codes.
        // Assemble a Merkle tree from data and parity shards. Take all proofs
        // from this tree and send them, each to its own node.
        self.send_shards(value.clone())
            .map(|(proof, remote_messages)| {
                // Record the first proof as if it were sent by the node to
                // itself.
                let h = proof.root_hash.clone();
                if proof.validate(h.as_slice()) {
                    // Save the leaf value for reconstructing the tree later.
                    state.leaf_values[index_of_proof(&proof)] =
                        Some(proof.value.clone().into_boxed_slice());
                    state.leaf_values_num += 1;
                    state.root_hash = Some(h);
                }

                MessageLoopState::Processing(
                    remote_messages
                        .into_iter()
                        .map(TargetedBroadcastMessage::into_remote_message)
                        .collect(),
                )
            })
    }
}

impl<'a, E> Handler<E> for Broadcast<messaging::NodeUid>
where
    E: From<Error> + From<messaging::Error>,
{
    fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, E> {
        self.on_message(m, &tx).map_err(E::from)
    }
}

impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
    /// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
    /// from node `proposer_id`.
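A stripped-down sketch of the shard-to-proof step that `on_local_message` above delegates to `send_shards` (later in this file): build a Merkle tree over the erasure-coded shards, then generate one proof per leaf. It reuses the same `merkle` and `ring` calls as the original; the helper name `proofs_for_shards` is illustrative, not from the commit:

```rust
extern crate merkle;
extern crate ring;

use merkle::proof::Proof;
use merkle::MerkleTree;

/// Builds a Merkle tree over erasure-coded shards and returns one proof per
/// leaf; proof `i` is what a `Value(proof)` message delivers to node `i`.
fn proofs_for_shards(shards: Vec<Vec<u8>>) -> Vec<Proof<Vec<u8>>> {
    let mtree = MerkleTree::from_vec(&ring::digest::SHA256, shards);
    mtree
        .iter()
        .filter_map(|leaf| mtree.gen_proof(leaf.to_vec()))
        .collect()
}
```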
@@ -487,14 +398,10 @@ pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> {
    tx: &'a Sender<TargetedMessage<ProposedValue>>,
    /// The receive side of the channel from comms threads.
    rx: &'a Receiver<SourcedMessage<ProposedValue>>,
    /// The broadcast algorithm instance.
    broadcast: Broadcast<usize>,
    /// Value to be broadcast.
    broadcast_value: Option<T>,
    /// This instance's index for identification against its comms task.
    node_index: usize,
    /// Number of nodes participating in broadcast.
    num_nodes: usize,
    /// Maximum allowed number of faulty nodes.
    num_faulty_nodes: usize,
}

impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>>
@@ -505,45 +412,25 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8
        rx: &'a Receiver<SourcedMessage<ProposedValue>>,
        broadcast_value: Option<T>,
        num_nodes: usize,
        node_index: usize,
        proposer_index: usize,
    ) -> Self {
        let all_indexes = (0..num_nodes).collect();
        let broadcast = Broadcast::new(0, proposer_index, all_indexes)
            .expect("failed to instantiate broadcast");
        Instance {
            tx,
            rx,
            broadcast,
            broadcast_value,
            node_index,
            num_nodes,
            num_faulty_nodes: (num_nodes - 1) / 3,
        }
    }

    /// Broadcast stage task returning the computed values in case of success,
    /// and an error in case of failure.
    pub fn run(&mut self) -> Result<T, Error> {
    pub fn run(self) -> Result<T, Error> {
        // Broadcast state machine thread.
        let bvalue = self.broadcast_value.to_owned();
        let result: Result<T, Error>;
        let result_r = Arc::new(Mutex::new(None));
        let result_r_scoped = result_r.clone();

        crossbeam::scope(|scope| {
            scope.spawn(move || {
                *result_r_scoped.lock().unwrap() = Some(inner_run(
                    self.tx,
                    self.rx,
                    bvalue,
                    self.node_index,
                    self.num_nodes,
                    self.num_faulty_nodes,
                ));
            });
        });
        if let Some(ref r) = *result_r.lock().unwrap() {
            result = r.to_owned();
        } else {
            result = Err(Error::Threading);
        }
        result
        let bvalue: Option<ProposedValue> = self.broadcast_value.map(|v| v.into());
        inner_run(self.tx, self.rx, bvalue, &self.broadcast).map(ProposedValue::into)
    }
}
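The `num_faulty_nodes: (num_nodes - 1) / 3` initialiser above encodes the usual BFT bound N ≥ 3f + 1. A small illustrative check, not code from the commit:

```rust
// For each network size N, f = (N - 1) / 3 is the largest number of faulty
// nodes the protocol tolerates while keeping N >= 3f + 1.
fn main() {
    for num_nodes in &[4usize, 7, 10, 16] {
        let num_faulty_nodes = (num_nodes - 1) / 3;
        assert!(3 * num_faulty_nodes + 1 <= *num_nodes);
        println!("N = {:2}: tolerates f = {}", num_nodes, num_faulty_nodes);
    }
}
```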
@@ -554,7 +441,6 @@ pub enum Error {
    Threading,
    ProofConstructionFailed,
    ReedSolomon(rse::Error),
    Send(SendError<QMessage>),
    SendDeprecated(SendError<TargetedMessage<ProposedValue>>),
    Recv(RecvError),
    UnexpectedMessage,
@@ -567,12 +453,6 @@ impl From<rse::Error> for Error {
    }
}

impl From<SendError<QMessage>> for Error {
    fn from(err: SendError<QMessage>) -> Error {
        Error::Send(err)
    }
}

impl From<SendError<TargetedMessage<ProposedValue>>> for Error {
    fn from(err: SendError<TargetedMessage<ProposedValue>>) -> Error {
        Error::SendDeprecated(err)
@@ -585,138 +465,25 @@ impl From<RecvError> for Error {
    }
}

/// Breaks the input value into shards of equal length and encodes them -- and
/// some extra parity shards -- with a Reed-Solomon erasure coding scheme. The
/// returned value contains the shard assigned to this node. That shard doesn't
/// need to be sent anywhere. It is returned to the broadcast instance and gets
/// recorded immediately.
fn send_shards<'a, T>(
    value: T,
/// The main loop of the broadcast task.
fn inner_run<'a>(
    tx: &'a Sender<TargetedMessage<ProposedValue>>,
    coding: &ReedSolomon,
) -> Result<Proof<ProposedValue>, Error>
where
    T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>,
{
    let data_shard_num = coding.data_shard_count();
    let parity_shard_num = coding.parity_shard_count();

    debug!(
        "Data shards: {}, parity shards: {}",
        data_shard_num, parity_shard_num
    );
    let mut v: Vec<u8> = T::into(value);
    // Insert the length of `v` so it can be decoded without the padding.
    let payload_len = v.len() as u8;
    v.insert(0, payload_len); // TODO: Handle messages larger than 255 bytes.
    let value_len = v.len();
    // Size of a Merkle tree leaf value, in bytes.
    let shard_len = if value_len % data_shard_num > 0 {
        value_len / data_shard_num + 1
    } else {
        value_len / data_shard_num
    };
    // Pad the last data shard with zeros. Fill the parity shards with zeros.
    v.resize(shard_len * (data_shard_num + parity_shard_num), 0);

    debug!("value_len {}, shard_len {}", value_len, shard_len);

    // Divide the vector into chunks/shards.
    let shards_iter = v.chunks_mut(shard_len);
    // Convert the iterator over slices into a vector of slices.
    let mut shards: Vec<&mut [u8]> = Vec::new();
    for s in shards_iter {
        shards.push(s);
    }

    debug!("Shards before encoding: {:?}", shards);

    // Construct the parity chunks/shards
    coding.encode(shards.as_mut_slice())?;

    debug!("Shards: {:?}", shards);

    let shards_t: Vec<ProposedValue> = shards.into_iter().map(|s| s.to_vec()).collect();

    // Convert the Merkle tree into a partial binary tree for later
    // deconstruction into compound branches.
    let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t);

    // Default result in case of `gen_proof` error.
    let mut result = Err(Error::ProofConstructionFailed);

    // Send each proof to a node.
    for (i, leaf_value) in mtree.iter().enumerate() {
        let proof = mtree.gen_proof(leaf_value.to_vec());
        if let Some(proof) = proof {
            if i == 0 {
                // The first proof is addressed to this node.
                result = Ok(proof);
            } else {
                // Rest of the proofs are sent to remote nodes.
                tx.send(TargetedMessage {
                    target: Target::Node(i),
                    message: Message::Broadcast(BroadcastMessage::Value(proof)),
                })?;
            }
    rx: &'a Receiver<SourcedMessage<ProposedValue>>,
    broadcast_value: Option<ProposedValue>,
    broadcast: &Broadcast<usize>,
) -> Result<ProposedValue, Error> {
    if let Some(v) = broadcast_value {
        for msg in broadcast
            .propose_value(v)?
            .into_iter()
            .map(TargetedBroadcastMessage::into_targeted_message)
        {
            tx.send(msg)?;
        }
    }

    result
}

/// The main loop of the broadcast task.
fn inner_run<'a, T>(
    tx: &'a Sender<TargetedMessage<ProposedValue>>,
    rx: &'a Receiver<SourcedMessage<ProposedValue>>,
    broadcast_value: Option<T>,
    node_index: usize,
    num_nodes: usize,
    num_faulty_nodes: usize,
) -> Result<T, Error>
where
    T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>,
{
    // Erasure coding scheme: N - 2f value shards and 2f parity shards
    let parity_shard_num = 2 * num_faulty_nodes;
    let data_shard_num = num_nodes - parity_shard_num;
    let coding = ReedSolomon::new(data_shard_num, parity_shard_num)?;
    // currently known leaf values
    let mut leaf_values: Vec<Option<Box<[u8]>>> = vec![None; num_nodes];
    // Write-once root hash of a tree broadcast from the sender associated with
    // this instance.
    let mut root_hash: Option<Vec<u8>> = None;
    // number of non-None leaf values
    let mut leaf_values_num = 0;

    // Split the value into chunks/shards, encode them with erasure codes.
    // Assemble a Merkle tree from data and parity shards. Take all proofs from
    // this tree and send them, each to its own node.
    if let Some(v) = broadcast_value {
        send_shards(v, tx, &coding).map(|proof| {
            // Record the first proof as if it were sent by the node to
            // itself.
            let h = proof.root_hash.clone();
            if proof.validate(h.as_slice()) {
                // Save the leaf value for reconstructing the tree later.
                leaf_values[index_of_proof(&proof)] = Some(proof.value.clone().into_boxed_slice());
                leaf_values_num += 1;
                root_hash = Some(h);
            }
        })?
    }

    // return value
    let mut result: Option<Result<T, Error>> = None;
    // Number of times Echo was received with the same root hash.
    let mut echo_num = 0;
    // Number of times Ready was received with the same root hash.
    let mut readys: HashMap<Vec<u8>, usize> = HashMap::new();
    let mut ready_sent = false;
    let mut ready_to_decode = false;

    // TODO: handle exit conditions
    while result.is_none() {
    loop {
        // Receive a message from the socket IO task.
        let message = rx.recv()?;
        if let SourcedMessage {
@@ -724,132 +491,19 @@ where
            message: Message::Broadcast(message),
        } = message
        {
            match message {
                // A value received. Record the value and multicast an echo.
                BroadcastMessage::Value(p) => {
                    if i != node_index {
                        // Ignore value messages from unrelated remote nodes.
                        continue;
                    }

                    if root_hash.is_none() {
                        root_hash = Some(p.root_hash.clone());
                        debug!(
                            "Node {} Value root hash {:?}",
                            node_index,
                            HexBytes(&p.root_hash)
                        );
                    }

                    if let Some(ref h) = root_hash {
                        if p.validate(h.as_slice()) {
                            // Save the leaf value for reconstructing the tree
                            // later.
                            leaf_values[index_of_proof(&p)] =
                                Some(p.value.clone().into_boxed_slice());
                            leaf_values_num += 1;
                        }
                    }
                    // Broadcast an echo of this proof.
                    tx.send(TargetedMessage {
                        target: Target::All,
                        message: Message::Broadcast(BroadcastMessage::Echo(p)),
                    })?
                }

                // An echo received. Verify the proof it contains.
                BroadcastMessage::Echo(p) => {
                    if root_hash.is_none() && i == node_index {
                        root_hash = Some(p.root_hash.clone());
                        debug!("Node {} Echo root hash {:?}", node_index, root_hash);
                    }

                    // Call validate with the root hash as the argument.
                    if let Some(ref h) = root_hash {
                        if p.validate(h.as_slice()) {
                            echo_num += 1;
                            // Save the leaf value for reconstructing the tree
                            // later.
                            leaf_values[index_of_proof(&p)] =
                                Some(p.value.clone().into_boxed_slice());
                            leaf_values_num += 1;

                            // Upon receiving 2f + 1 matching Ready(h)
                            // messages, wait for N − 2f Echo messages, then
                            // decode v.
                            if ready_to_decode
                                && leaf_values_num >= num_nodes - 2 * num_faulty_nodes
                            {
                                result = Some(decode_from_shards(
                                    &mut leaf_values,
                                    &coding,
                                    data_shard_num,
                                    h,
                                ));
                            } else if leaf_values_num >= num_nodes - num_faulty_nodes {
                                result = Some(decode_from_shards(
                                    &mut leaf_values,
                                    &coding,
                                    data_shard_num,
                                    h,
                                ));
                                // If Ready has not yet been sent, multicast
                                // Ready.
                                if !ready_sent {
                                    ready_sent = true;
                                    tx.send(TargetedMessage {
                                        target: Target::All,
                                        message: Message::Broadcast(BroadcastMessage::Ready(
                                            h.to_owned(),
                                        )),
                                    })?;
                                }
                            }
                        }
                    }
                }

                BroadcastMessage::Ready(ref hash) => {
                    // Update the number of times Ready has been received with
                    // this hash.
                    *readys.entry(hash.to_vec()).or_insert(1) += 1;

                    // Check that the root hash matches.
                    if let Some(ref h) = root_hash {
                        let ready_num: usize = *readys.get(h).unwrap_or(&0);

                        // Upon receiving f + 1 matching Ready(h) messages, if
                        // Ready has not yet been sent, multicast Ready(h).
                        if (ready_num == num_faulty_nodes + 1) && !ready_sent {
                            tx.send(TargetedMessage {
                                target: Target::All,
                                message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())),
                            })?;
                        }

                        // Upon receiving 2f + 1 matching Ready(h) messages,
                        // wait for N − 2f Echo messages, then decode v.
                        if ready_num > 2 * num_faulty_nodes {
                            // Wait for N - 2f Echo messages, then decode v.
                            if echo_num >= num_nodes - 2 * num_faulty_nodes {
                                result = Some(decode_from_shards(
                                    &mut leaf_values,
                                    &coding,
                                    data_shard_num,
                                    h,
                                ));
                            } else {
                                ready_to_decode = true;
                            }
                        }
                    }
                }
            }
            let (opt_output, msgs) = broadcast.handle_broadcast_message(&i, message)?;
            for msg in msgs.into_iter()
                .map(TargetedBroadcastMessage::into_targeted_message)
            {
                tx.send(msg)?;
            }
            if let Some(output) = opt_output {
                return Ok(output);
            }
        } else {
            error!("Incorrect message from the socket: {:?}", message);
        }
    }
    // `result` is not `None`, so it is safe to extract the value.
    result.unwrap()
}

fn decode_from_shards<T>(
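The two hunks above rely on a handful of numeric invariants: N − 2f data shards and 2f parity shards for the Reed-Solomon coding, Ready thresholds of f + 1 and 2f + 1, and a decode requirement of N − f (optimistic) or N − 2f (after 2f + 1 Readys) recorded proofs. A worked example of that arithmetic, illustrative only, with N = 4 and f = 1 assumed:

```rust
fn main() {
    let num_nodes = 4usize; // N
    let num_faulty_nodes = (num_nodes - 1) / 3; // f = 1

    // Reed-Solomon layout used by `inner_run`:
    let parity_shard_num = 2 * num_faulty_nodes; // 2 parity shards
    let data_shard_num = num_nodes - parity_shard_num; // 2 data shards
    assert_eq!((data_shard_num, parity_shard_num), (2, 2));

    // A 7-byte payload grows to 8 bytes once the length byte is prepended,
    // so each of the 2 data shards is 8 / 2 = 4 bytes long, and the padded
    // vector is 4 * (2 + 2) = 16 bytes.
    let value_len = 7 + 1;
    let shard_len = if value_len % data_shard_num > 0 {
        value_len / data_shard_num + 1
    } else {
        value_len / data_shard_num
    };
    assert_eq!(shard_len, 4);
    assert_eq!(shard_len * (data_shard_num + parity_shard_num), 16);

    // Thresholds from the Echo/Ready handlers:
    assert_eq!(num_faulty_nodes + 1, 2); // send Ready after f + 1 Readys
    assert_eq!(2 * num_faulty_nodes + 1, 3); // decode path armed after 2f + 1 Readys
    assert_eq!(num_nodes - num_faulty_nodes, 3); // optimistic decode after N - f proofs
    assert_eq!(num_nodes - 2 * num_faulty_nodes, 2); // otherwise need N - 2f proofs
}
```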
@@ -13,10 +13,11 @@ use agreement::Agreement;
use broadcast;
use broadcast::{Broadcast, TargetedBroadcastMessage};

use messaging::ProposedValue;

use proto::{AgreementMessage, BroadcastMessage};

// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;

/// Input from a remote node to Common Subset.
pub enum Input<NodeUid> {
    /// Message from a remote node `uid` to the broadcast instance `uid`.
 src/lib.rs | 45
@@ -2,59 +2,20 @@
//!
//! Library of asynchronous Byzantine fault tolerant consensus known as "the
//! honey badger of BFT protocols" after a paper with the same title.
//!
//! ## Example
//!
//! The following code could be run on host 192.168.1.1:
//!
//! ```ignore
//! extern crate hbbft;
//!
//! use hbbft::node::Node;
//! use std::net::SocketAddr;
//! use std::vec::Vec;
//!
//! fn main() {
//!     let bind_address = "127.0.0.1:10001".parse().unwrap();
//!     let remote_addresses = vec!["192.168.1.2:10002",
//!                                 "192.168.1.3:10003",
//!                                 "192.168.1.4:10004"]
//!         .iter()
//!         .map(|s| s.parse().unwrap())
//!         .collect();
//!
//!     let value = "Value #1".as_bytes().to_vec();
//!
//!     let result = Node::new(bind_address, remote_addresses, Some(value))
//!         .run();
//!     println!("Consensus result {:?}", result);
//! }
//! ```
//!
//! Similar code should then run on hosts 192.168.1.2, 192.168.1.3 and
//! 192.168.1.4 with appropriate changes in `bind_address` and
//! `remote_addresses`. Each host has its own optional broadcast `value`. If
//! the consensus `result` is not an error then every successfully terminated
//! consensus node will output the same `result`.

#![feature(optin_builtin_traits)]
#[macro_use]
extern crate log;
extern crate crossbeam;
extern crate crossbeam_channel;
extern crate merkle;
extern crate protobuf;
extern crate ring;
#[macro_use]
extern crate crossbeam_channel;
extern crate reed_solomon_erasure;
extern crate ring;

pub mod agreement;
pub mod broadcast;
pub mod common_subset;
mod commst;
mod connection;
pub mod messaging;
pub mod proto;
mod proto_io;

pub mod node;
pub mod proto_io;
 src/messaging.rs | 503
@@ -1,297 +1,16 @@
//! The local message delivery system.
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use proto::Message;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::RwLock;

/// Unique ID of a node.
pub type NodeUid = SocketAddr;

/// Type of algorithm primitive used in HoneyBadgerBFT.
///
/// TODO: Add the epoch parameter?
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Algorithm {
    /// Encryption stage.
    Encryption,
    /// Decryption stage.
    Decryption,
    /// Asynchronous Common Subset.
    CommonSubset,
    /// Reliable Broadcast instance.
    Broadcast(NodeUid),
    /// Binary Agreement instance.
    Agreement(NodeUid),
}

impl Iterator for Algorithm {
    type Item = String;

    fn next(&mut self) -> Option<Self::Item> {
        Some(format!("{:?}", self))
    }
}

/// Type of proposed (encrypted) value for consensus.
pub type ProposedValue = Vec<u8>;

/// Kinds of messages sent between algorithm instances.
#[derive(Clone)]
pub enum AlgoMessage {
    /// Asynchronous common subset input.
    CommonSubsetInput(ProposedValue),
    /// Asynchronous common subset output.
    CommonSubsetOutput(HashSet<ProposedValue>),
    /// Broadcast instance input.
    BroadcastInput(ProposedValue),
    /// Broadcast instance output.
    BroadcastOutput(NodeUid, ProposedValue),
    /// Binary agreement instance input.
    AgreementInput(bool),
    /// Binary agreement instance output.
    AgreementOutput(NodeUid, bool),
}

/// A message sent between algorithm instances.
#[derive(Clone)]
pub struct LocalMessage {
    /// Identifier of the message destination algorithm.
    pub dst: Algorithm,
    /// Payload
    pub message: AlgoMessage,
}

/// The message destinations corresponding to a remote node `i`. It can be
/// either of the two:
///
/// 1) `All`: all nodes if sent to socket tasks, or all local algorithm
/// instances if received from socket tasks.
///
/// 2) `Node(i)`: node `i` or local algorithm instances with the node ID `i`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteNode {
    All,
    Node(NodeUid),
}

/// Message to or from a remote node.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteMessage {
    pub node: RemoteNode,
    pub message: Message<ProposedValue>,
}

/// The union type of local and remote messages.
#[derive(Clone)]
pub enum QMessage {
    Local(LocalMessage),
    Remote(RemoteMessage),
}

/// States of the message loop considered as an automaton with output. There is
/// one exit state `Finished` and one transitional (also initial) state
/// `Processing` whose argument is an output queue of messages to be sent to
/// remote nodes.
#[derive(Clone, PartialEq)]
pub enum MessageLoopState {
    Processing(VecDeque<RemoteMessage>),
    Finished,
}

impl MessageLoopState {
    pub fn is_processing(&self) -> bool {
        if let MessageLoopState::Processing(_) = self {
            true
        } else {
            false
        }
    }

    /// Appends pending messages of another state. Used to append messages
    /// emitted by the handler to the messages already queued from previous
    /// iterations of a message handling loop.
    pub fn append(&mut self, other: &mut MessageLoopState) {
        if let MessageLoopState::Processing(ref mut new_msgs) = other {
            if let MessageLoopState::Processing(ref mut msgs) = self {
                msgs.append(new_msgs);
            }
        }
    }
}

/// Abstract type of message handler callback. A callback function has two
/// arguments: the sent message and the TX handle to send replies back to the
/// message loop. A call to the function returns either a new message loop state
/// - either `Finished` or a state with outgoing messages to remote nodes - or
/// an error.
pub trait Handler<HandlerError: From<Error>>: Send + Sync {
    fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, HandlerError>;
}

/// The queue functionality for messages sent between algorithm instances.
pub struct MessageLoop<'a, HandlerError: 'a + From<Error>> {
    /// Algorithm message handlers. Every message handler receives a message and
    /// the TX handle of the incoming message queue for sending replies back to
    /// the message loop.
    algos: RwLock<HashMap<Algorithm, &'a Handler<HandlerError>>>,
    /// TX handle of the message queue.
    queue_tx: Sender<QMessage>,
    /// RX handle of the message queue.
    queue_rx: Receiver<QMessage>,
    /// Remote send handles. Messages are sent through channels as opposed to
    /// directly to sockets. This is done to make tests independent of socket
    /// IO.
    remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>,
}

impl<'a, HandlerError> MessageLoop<'a, HandlerError>
where
    HandlerError: 'a + From<Error>,
{
    pub fn new(remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>) -> Self {
        let (queue_tx, queue_rx) = unbounded();
        MessageLoop {
            algos: RwLock::new(HashMap::new()),
            queue_tx,
            queue_rx,
            remote_txs,
        }
    }

    pub fn queue_tx(&self) -> Sender<QMessage> {
        self.queue_tx.clone()
    }

    /// Registers a handler for messages sent to the given algorithm.
    pub fn insert_algo(&'a self, algo: Algorithm, handler: &'a Handler<HandlerError>) {
        let lock = self.algos.write();
        if let Ok(mut map) = lock {
            map.insert(algo, handler);
        } else {
            error!("Cannot insert {:?}", algo);
        }
    }

    /// Unregisters the handler for messages sent to the given algorithm.
    pub fn remove_algo(&self, algo: &Algorithm) {
        let lock = self.algos.write();
        if let Ok(mut map) = lock {
            map.remove(algo);
        } else {
            error!("Cannot remove {:?}", algo);
        }
    }

    /// The message loop.
    pub fn run(&self) -> Result<MessageLoopState, HandlerError> {
        let mut result = Ok(MessageLoopState::Processing(VecDeque::new()));

        while let Ok(mut state) = result {
            // Send any outgoing messages to remote nodes using the provided
            // function.
            (if let MessageLoopState::Processing(messages) = &state {
                self.send_remote(messages)
                    .map(|_| MessageLoopState::Processing(VecDeque::new()))
                    .map_err(HandlerError::from)
            } else {
                Ok(MessageLoopState::Finished)
            })?;

            // Receive local and remote messages.
            if let Ok(m) = self.queue_rx.recv() {
                result = match m {
                    QMessage::Local(LocalMessage { dst, message }) => {
                        // FIXME: error handling
                        if let Some(mut handler) = self.algos.write().unwrap().get_mut(&dst) {
                            let mut new_result = handler.handle(
                                QMessage::Local(LocalMessage { dst, message }),
                                self.queue_tx.clone(),
                            );
                            if let Ok(ref mut new_state) = new_result {
                                state.append(new_state);
                                Ok(state)
                            } else {
                                // Error overrides the previous state.
                                new_result
                            }
                        } else {
                            Err(Error::NoSuchAlgorithm).map_err(HandlerError::from)
                        }
                    }

                    // A message FROM a remote node.
                    QMessage::Remote(RemoteMessage { node, message }) => {
                        // Multicast the message to all algorithm instances,
                        // collecting output messages iteratively and appending them
                        // to result.
                        //
                        // FIXME: error handling
                        self.algos.write().unwrap().iter_mut().fold(
                            Ok(state),
                            |result1, (_, handler)| {
                                if let Ok(mut state1) = result1 {
                                    handler
                                        .handle(
                                            QMessage::Remote(RemoteMessage {
                                                node: node.clone(),
                                                message: message.clone(),
                                            }),
                                            self.queue_tx.clone(),
                                        )
                                        .map(|ref mut state2| {
                                            state1.append(state2);
                                            state1
                                        })
                                } else {
                                    result1
                                }
                            },
                        )
                    }
                }
            } else {
                result = Err(Error::RecvError).map_err(HandlerError::from)
            }
        } // end of while loop
        result
    }

    /// Send a message queue to remote nodes.
    fn send_remote(&self, messages: &VecDeque<RemoteMessage>) -> Result<(), Error> {
        messages.iter().fold(Ok(()), |result, m| {
            if result.is_err() {
                result
            } else {
                match m {
                    RemoteMessage {
                        node: RemoteNode::Node(uid),
                        message,
                    } => {
                        if let Some(tx) = self.remote_txs.get(&uid) {
                            tx.send(message.clone()).map_err(Error::from)
                        } else {
                            Err(Error::SendError)
                        }
                    }

                    RemoteMessage {
                        node: RemoteNode::All,
                        message,
                    } => self.remote_txs.iter().fold(result, |result1, (_, tx)| {
                        if result1.is_err() {
                            result1
                        } else {
                            tx.send(message.clone()).map_err(Error::from)
                        }
                    }),
                }
            }
        })
    }
/// Message sent by a given source. The sources are consensus nodes indexed 1
/// through N where N is the total number of nodes. Sourced messages are
/// required when it is essential to know the message origin but the set of
/// recipients is unknown without further computation which is irrelevant to the
/// message delivery task.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
    pub source: usize,
    pub message: Message<T>,
}

/// Message destination can be either of the two:
@@ -325,207 +44,3 @@ impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> {
        }
    }
}

/// Message sent by a given source. The sources are consensus nodes indexed 1
/// through N where N is the total number of nodes. Sourced messages are
/// required when it is essential to know the message origin but the set of
/// recipients is unknown without further computation which is irrelevant to the
/// message delivery task.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {
    pub source: usize,
    pub message: Message<T>,
}

/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<T: Clone + Debug + Send + Sync> {
    /// The total number of consensus nodes for indexing purposes.
    num_nodes: usize,

    /// Transmit sides of message channels to comms threads.
    txs_to_comms: Vec<Sender<Message<T>>>,
    /// Receive side of the routed message channel from comms threads.
    rx_from_comms: Receiver<SourcedMessage<T>>,
    /// Transmit sides of message channels to algo threads.
    txs_to_algo: Vec<Sender<SourcedMessage<T>>>,
    /// Receive side of the routed message channel from algo threads.
    rx_from_algo: Receiver<TargetedMessage<T>>,

    /// RX handles to be used by comms tasks.
    rxs_to_comms: Vec<Receiver<Message<T>>>,
    /// TX handle to be used by comms tasks.
    tx_from_comms: Sender<SourcedMessage<T>>,
    /// RX handles to be used by algo tasks.
    rxs_to_algo: Vec<Receiver<SourcedMessage<T>>>,
    /// TX handle to be used by algo tasks.
    tx_from_algo: Sender<TargetedMessage<T>>,

    /// Control channel used to stop the listening thread.
    stop_tx: Sender<()>,
    stop_rx: Receiver<()>,
}

impl<T: Clone + Debug + Send + Sync> Messaging<T> {
    /// Initialises all the required TX and RX handles for a total of
    /// `num_nodes` consensus nodes.
    pub fn new(num_nodes: usize) -> Self {
        let to_comms: Vec<_> = (0..num_nodes - 1)
            .map(|_| unbounded::<Message<T>>())
            .collect();
        let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
        let rxs_to_comms: Vec<Receiver<Message<T>>> =
            to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
        let (tx_from_comms, rx_from_comms) = unbounded();
        let to_algo: Vec<_> = (0..num_nodes)
            .map(|_| unbounded::<SourcedMessage<T>>())
            .collect();
        let txs_to_algo = to_algo.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
        let rxs_to_algo: Vec<Receiver<SourcedMessage<T>>> =
            to_algo.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
        let (tx_from_algo, rx_from_algo) = unbounded();

        let (stop_tx, stop_rx) = bounded(1);

        Messaging {
            num_nodes,

            // internally used handles
            txs_to_comms,
            rx_from_comms,
            txs_to_algo,
            rx_from_algo,

            // externally used handles
            rxs_to_comms,
            tx_from_comms,
            rxs_to_algo,
            tx_from_algo,

            stop_tx,
            stop_rx,
        }
    }

    pub fn num_nodes(&self) -> usize {
        self.num_nodes
    }

    pub fn rxs_to_comms(&self) -> &Vec<Receiver<Message<T>>> {
        &self.rxs_to_comms
    }

    pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<T>> {
        &self.tx_from_comms
    }

    pub fn rxs_to_algo(&self) -> &Vec<Receiver<SourcedMessage<T>>> {
        &self.rxs_to_algo
    }

    pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
        &self.tx_from_algo
    }

    /// Gives the ownership of the handle to stop the message receive loop.
    pub fn stop_tx(&self) -> Sender<()> {
        self.stop_tx.to_owned()
    }

    /// Spawns the message delivery thread in a given thread scope.
    pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
    where
        T: 'a,
    {
        let txs_to_comms = self.txs_to_comms.to_owned();
        let rx_from_comms = self.rx_from_comms.to_owned();
        let txs_to_algo = self.txs_to_algo.to_owned();
        let rx_from_algo = self.rx_from_algo.to_owned();

        let stop_rx = self.stop_rx.to_owned();
        let mut stop = false;

        // TODO: `select_loop!` seems to really confuse Clippy.
        #[cfg_attr(
            feature = "cargo-clippy",
            allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
        )]
        scope.spawn(move || {
            let mut result = Ok(());
            // This loop forwards messages according to their metadata.
            while !stop && result.is_ok() {
                select_loop! {
                    recv(rx_from_algo, message) => {
                        match message {
                            TargetedMessage {
                                target: Target::All,
                                message
                            } => {
                                // Send the message to all remote nodes, stopping at
                                // the first error.
                                result = txs_to_comms.iter()
                                    .fold(Ok(()), |result, tx| {
                                        if result.is_ok() {
                                            tx.send(message.clone())
                                        }
                                        else {
                                            result
                                        }
                                    }).map_err(Error::from);
                            },
                            TargetedMessage {
                                target: Target::Node(i),
                                message
                            } => {
                                // Remote node indices start from 1.
                                assert!(i > 0);
                                // Convert node index to vector index.
                                let i = i - 1;

                                result = if i < txs_to_comms.len() {
                                    txs_to_comms[i].send(message.clone())
                                        .map_err(Error::from)
                                }
                                else {
                                    Err(Error::NoSuchTarget)
                                };
                            }
                        }
                    },
                    recv(rx_from_comms, message) => {
                        // Send the message to all algorithm instances, stopping at
                        // the first error.
                        result = txs_to_algo.iter().fold(Ok(()), |result, tx| {
                            if result.is_ok() {
                                tx.send(message.clone())
                            }
                            else {
                                result
                            }
                        }).map_err(Error::from)
                    },
                    recv(stop_rx, _) => {
                        // Flag the thread ready to exit.
                        stop = true;
                    }
                }
            } // end of select_loop!
            result
        })
    }
}

#[derive(Clone, Debug)]
pub enum Error {
    NoSuchAlgorithm,
    NoSuchRemote,
    RecvError,
    NoSuchTarget,
    SendError,
}

impl<T> From<crossbeam_channel::SendError<T>> for Error {
    fn from(_: crossbeam_channel::SendError<T>) -> Error {
        Error::SendError
    }
}
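For reference, the `Handler`/`MessageLoop` machinery deleted above was driven by trait objects like the following sketch (all names here — `EchoAlgo`, `MyError` — are illustrative, not library types):

```rust
use std::collections::VecDeque;

struct EchoAlgo;

#[derive(Debug)]
enum MyError {
    Messaging(Error),
}

impl From<Error> for MyError {
    fn from(e: Error) -> MyError {
        MyError::Messaging(e)
    }
}

// Re-broadcast every remote message; finish as soon as local input arrives.
impl Handler<MyError> for EchoAlgo {
    fn handle(&self, m: QMessage, _tx: Sender<QMessage>) -> Result<MessageLoopState, MyError> {
        match m {
            QMessage::Remote(remote) => {
                let mut out = VecDeque::new();
                out.push_back(RemoteMessage {
                    node: RemoteNode::All,
                    message: remote.message,
                });
                Ok(MessageLoopState::Processing(out))
            }
            QMessage::Local(_) => Ok(MessageLoopState::Finished),
        }
    }
}
```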
@@ -26,6 +26,7 @@ pub enum BroadcastMessage<T: Send + Sync> {
    Ready(Vec<u8>),
}

/// Wrapper for a byte array, whose `Debug` implementation outputs shortened hexadecimal strings.
pub struct HexBytes<'a>(pub &'a [u8]);

impl<'a> fmt::Debug for HexBytes<'a> {
@@ -166,6 +167,7 @@ impl<T: Send + Sync> BroadcastMessage<T> {
            BroadcastMessage::Ready(h) => {
                let mut r = ReadyProto::new();
                r.set_root_hash(h);
                b.set_ready(r);
            }
        }
        b
 src/proto_io.rs | 106
@@ -3,9 +3,9 @@
use proto::*;
use protobuf;
use protobuf::Message as ProtobufMessage;
use std::io::Read;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::{cmp, io};

/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
@@ -35,95 +35,48 @@ impl From<protobuf::ProtobufError> for Error {
    }
}

fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> {
    if buffer.len() < 4 {
        return Err(Error::EncodeError);
    }
    let value = value.to_le();
    buffer[0] = ((value & 0xFF00_0000) >> 24) as u8;
    buffer[1] = ((value & 0x00FF_0000) >> 16) as u8;
    buffer[2] = ((value & 0x0000_FF00) >> 8) as u8;
    buffer[3] = (value & 0x0000_00FF) as u8;
    Ok(())
pub struct ProtoIo<S: Read + Write> {
    stream: S,
}

fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
    if buffer.len() < 4 {
        return Err(Error::DecodeError);
impl ProtoIo<TcpStream> {
    pub fn try_clone(&self) -> Result<ProtoIo<TcpStream>, ::std::io::Error> {
        Ok(ProtoIo {
            stream: self.stream.try_clone()?,
        })
    }
    let mut result = u32::from(buffer[0]);
    result <<= 8;
    result += u32::from(buffer[1]);
    result <<= 8;
    result += u32::from(buffer[2]);
    result <<= 8;
    result += u32::from(buffer[3]);
    Ok(result)
}

pub struct ProtoIo {
    stream: TcpStream,
    buffer: [u8; 1024 * 4],
}

/// A message handling task.
impl ProtoIo
impl<S: Read + Write> ProtoIo<S>
//where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
{
    pub fn from_stream(stream: TcpStream) -> Self {
        ProtoIo {
            stream,
            buffer: [0; 1024 * 4],
        }
    }

    pub fn try_clone(&self) -> Result<ProtoIo, ::std::io::Error> {
        Ok(ProtoIo {
            stream: self.stream.try_clone()?,
            buffer: self.buffer,
        })
    pub fn from_stream(stream: S) -> Self {
        ProtoIo { stream }
    }

    pub fn recv<T>(&mut self) -> Result<Message<T>, Error>
    where
        T: Clone + Send + Sync + From<Vec<u8>>, // + Into<Vec<u8>>
    {
        self.stream.read_exact(&mut self.buffer[0..4])?;
        let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
        if frame_start != FRAME_START {
        let mut stream = protobuf::CodedInputStream::new(&mut self.stream);
        // Read magic number
        if stream.read_raw_varint32()? != FRAME_START {
            return Err(Error::FrameStartMismatch);
        };
        self.stream.read_exact(&mut self.buffer[0..4])?;
        let size = decode_u32_from_be(&self.buffer[0..4])? as usize;

        let mut message_v: Vec<u8> = Vec::new();
        message_v.reserve(size);
        while message_v.len() < size {
            let num_to_read = cmp::min(self.buffer.len(), size - message_v.len());
            let (slice, _) = self.buffer.split_at_mut(num_to_read);
            self.stream.read_exact(slice)?;
            message_v.extend_from_slice(slice);
        }

        Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError)
        Message::from_proto(stream.read_message()?).ok_or(Error::DecodeError)
    }

    pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
    where
        T: Clone + Send + Sync + Into<Vec<u8>>,
    {
        let mut buffer: [u8; 4] = [0; 4];
        // Wrap stream
        let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
        // Write magic number
        encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
        stream.write_raw_bytes(&buffer)?;
        stream.write_raw_varint32(FRAME_START)?;
        let message_p = message.into_proto();
        // Write message size
        encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?;
        stream.write_raw_bytes(&buffer)?;
        // Write message
        message_p.write_to(&mut stream)?;
        message_p.write_length_delimited_to(&mut stream)?;
        // Flush
        stream.flush()?;
        Ok(())
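After this change, a frame on the wire is a varint-encoded `FRAME_START` magic followed by a length-delimited protobuf message (a varint length, then that many message bytes), replacing the earlier fixed four-byte big-endian magic and size fields. An illustrative encoding of that layout with the same rust-protobuf primitives used by `ProtoIo::send` (the `0x2C0F_FEE5` constant is a stand-in, not the real `FRAME_START` value):

```rust
extern crate protobuf;

// Illustrative only: lay out one frame into a byte vector.
fn frame_layout_example(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    {
        let mut cos = protobuf::CodedOutputStream::vec(&mut out);
        cos.write_raw_varint32(0x2C0F_FEE5).unwrap(); // stand-in for FRAME_START
        cos.write_raw_varint32(payload.len() as u32).unwrap(); // length prefix
        cos.write_raw_bytes(payload).unwrap(); // the protobuf-encoded message
        cos.flush().unwrap();
    }
    out
}
```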
@@ -133,14 +86,23 @@ impl ProtoIo
#[cfg(test)]
mod tests {
    use proto_io::*;
    use std::io::Cursor;

    /// Test the requirement that composing encoding with decoding yields the
    /// identity.
    #[test]
    fn encode_decode_identity() {
        let mut buffer: [u8; 4] = [0; 4];
        encode_u32_to_be(FRAME_START, &mut buffer[0..4]).unwrap();
        let frame_start = decode_u32_from_be(&buffer[0..4]).unwrap();
        assert_eq!(frame_start, FRAME_START);
    fn encode_decode_message() {
        let msg0: Message<Vec<u8>> =
            Message::Broadcast(BroadcastMessage::Ready(b"Test 0".to_vec()));
        let msg1: Message<Vec<u8>> =
            Message::Broadcast(BroadcastMessage::Ready(b"Test 1".to_vec()));
        let mut pio = ProtoIo::from_stream(Cursor::new(Vec::new()));
        pio.send(msg0.clone()).expect("send msg0");
        pio.send(msg1.clone()).expect("send msg1");
        println!("{:?}", pio.stream.get_ref());
        pio.stream.set_position(0);
        assert_eq!(msg0, pio.recv().expect("recv msg0"));
        // TODO: Figure out why the cursor is wrong here.
        let len = pio.stream.get_ref().len() as u64;
        pio.stream.set_position(len / 2);
        assert_eq!(msg1, pio.recv().expect("recv msg1"));
    }
}
@@ -14,12 +14,13 @@ use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::fmt;

use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
use hbbft::messaging::ProposedValue;
use hbbft::proto::BroadcastMessage;

#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
struct NodeId(usize);

type ProposedValue = Vec<u8>;

type MessageQueue = VecDeque<TargetedBroadcastMessage<NodeId>>;

/// A "node" running a broadcast instance.