Merge pull request #14 from poanetwork/afck--types

Simplify the message types.
This commit is contained in:
Vladimir Komendantskiy 2018-05-10 17:23:08 +01:00 committed by GitHub
commit a83536dc6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 190 additions and 385 deletions

View File

@ -5,18 +5,18 @@ authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
[dependencies]
env_logger = "0.5.10"
itertools = "0.7"
log = "0.4.1"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
ring = "^0.12"
protobuf = "1.4.4"
crossbeam = "0.3.2"
crossbeam-channel = "0.1"
itertools = "0.7"
ring = "^0.12"
[build-dependencies]
protoc-rust = "1.4.4"
[dev-dependencies]
crossbeam = "0.3.2"
crossbeam-channel = "0.1"
docopt = "0.8"
rand = "0.3"

View File

@ -8,6 +8,7 @@ extern crate env_logger;
extern crate hbbft;
#[macro_use]
extern crate log;
extern crate protobuf;
mod network;

View File

@ -3,14 +3,12 @@
//! `crossbeam_channel::unbounded()`.
use crossbeam;
use crossbeam_channel::{Receiver, Sender};
use std::fmt::Debug;
use std::io;
use std::net::TcpStream;
use std::sync::Arc;
use hbbft::messaging::SourcedMessage;
use hbbft::proto::Message;
use hbbft::proto_io::{self, ProtoIo};
use protobuf::{Message, MessageStatic};
#[derive(Debug)]
pub enum Error {
@ -25,21 +23,21 @@ impl From<io::Error> for Error {
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> {
pub struct CommsTask<'a, P: 'a, M: 'a> {
/// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<T>>,
tx: &'a Sender<SourcedMessage<M, usize>>,
/// The receive side of the channel to the comms thread.
rx: &'a Receiver<Message<T>>,
rx: &'a Receiver<M>,
/// The socket IO task.
io: ProtoIo<TcpStream>,
io: ProtoIo<TcpStream, P>,
/// The index of this comms task for identification against its remote node.
pub node_index: usize,
}
impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> CommsTask<'a, T> {
impl<'a, P: Message + MessageStatic + 'a, M: Into<P> + From<P> + Send + 'a> CommsTask<'a, P, M> {
pub fn new(
tx: &'a Sender<SourcedMessage<T>>,
rx: &'a Receiver<Message<T>>,
tx: &'a Sender<SourcedMessage<M, usize>>,
rx: &'a Receiver<M>,
stream: TcpStream,
node_index: usize,
) -> Self {
@ -59,21 +57,21 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> Comm
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn run(&mut self) -> Result<(), Error> {
pub fn run(mut self) -> Result<(), Error> {
// Borrow parts of `self` before entering the thread binding scope.
let tx = Arc::new(self.tx);
let rx = Arc::new(self.rx);
let tx = self.tx;
let rx = self.rx;
let mut io1 = self.io.try_clone()?;
let node_index = self.node_index;
crossbeam::scope(|scope| {
crossbeam::scope(move |scope| {
// Local comms receive loop thread.
scope.spawn(move || {
loop {
// Receive a multicast message from the manager thread.
let message = rx.recv().unwrap();
// Forward the message to the remote node.
io1.send(message).unwrap();
io1.send(&message.into()).unwrap();
}
});
@ -84,7 +82,7 @@ impl<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + AsRef<[u8]>> Comm
Ok(message) => {
tx.send(SourcedMessage {
source: node_index,
message,
message: message.into(),
}).unwrap();
}
Err(proto_io::Error::ProtobufError(e)) => {

View File

@ -3,43 +3,41 @@ use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use hbbft::messaging::{SourcedMessage, Target, TargetedMessage};
use hbbft::proto::Message;
use std::fmt::Debug;
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
pub struct Messaging<M> {
/// Transmit sides of message channels to comms threads.
txs_to_comms: Vec<Sender<Message<T>>>,
txs_to_comms: Vec<Sender<M>>,
/// Receive side of the routed message channel from comms threads.
rx_from_comms: Receiver<SourcedMessage<T>>,
rx_from_comms: Receiver<SourcedMessage<M, usize>>,
/// Transmit sides of message channels to algo thread.
tx_to_algo: Sender<SourcedMessage<T>>,
tx_to_algo: Sender<SourcedMessage<M, usize>>,
/// Receive side of the routed message channel from comms threads.
rx_from_algo: Receiver<TargetedMessage<T>>,
rx_from_algo: Receiver<TargetedMessage<M, usize>>,
/// RX handles to be used by comms tasks.
rxs_to_comms: Vec<Receiver<Message<T>>>,
rxs_to_comms: Vec<Receiver<M>>,
/// TX handle to be used by comms tasks.
tx_from_comms: Sender<SourcedMessage<T>>,
tx_from_comms: Sender<SourcedMessage<M, usize>>,
/// RX handles to be used by algo task.
rx_to_algo: Receiver<SourcedMessage<T>>,
rx_to_algo: Receiver<SourcedMessage<M, usize>>,
/// TX handle to be used by algo task.
tx_from_algo: Sender<TargetedMessage<T>>,
tx_from_algo: Sender<TargetedMessage<M, usize>>,
/// Control channel used to stop the listening thread.
stop_tx: Sender<()>,
stop_rx: Receiver<()>,
}
impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> Messaging<T> {
impl<M: Send> Messaging<M> {
/// Initialises all the required TX and RX handles for the case on a total
/// number `num_nodes` of consensus nodes.
pub fn new(num_nodes: usize) -> Self {
let to_comms: Vec<_> = (0..num_nodes).map(|_| unbounded::<Message<T>>()).collect();
let to_comms: Vec<_> = (0..num_nodes).map(|_| unbounded::<M>()).collect();
let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_comms: Vec<Receiver<Message<T>>> =
let rxs_to_comms: Vec<Receiver<M>> =
to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_comms, rx_from_comms) = unbounded();
@ -66,19 +64,19 @@ impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> Messaging<T> {
}
}
pub fn rxs_to_comms(&self) -> &Vec<Receiver<Message<T>>> {
pub fn rxs_to_comms(&self) -> &Vec<Receiver<M>> {
&self.rxs_to_comms
}
pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<T>> {
pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<M, usize>> {
&self.tx_from_comms
}
pub fn rx_to_algo(&self) -> &Receiver<SourcedMessage<T>> {
pub fn rx_to_algo(&self) -> &Receiver<SourcedMessage<M, usize>> {
&self.rx_to_algo
}
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<T>> {
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<M, usize>> {
&self.tx_from_algo
}
@ -90,7 +88,7 @@ impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> Messaging<T> {
/// Spawns the message delivery thread in a given thread scope.
pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
where
T: 'a,
M: Clone + 'a,
{
let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned();
@ -110,29 +108,23 @@ impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> Messaging<T> {
// This loop forwards messages according to their metadata.
while !stop && result.is_ok() {
select_loop! {
recv(rx_from_algo, message) => {
match message {
TargetedMessage {
target: Target::All,
message
} => {
recv(rx_from_algo, tm) => {
match tm.target {
Target::All => {
// Send the message to all remote nodes, stopping at
// the first error.
result = txs_to_comms.iter()
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(message.clone())
tx.send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
TargetedMessage {
target: Target::Node(i),
message
} => {
Target::Node(i) => {
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(message.clone())
txs_to_comms[i].send(tm.message)
.map_err(Error::from)
} else {
Err(Error::NoSuchTarget)

View File

@ -41,7 +41,9 @@ use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::{io, iter, process, thread, time};
use hbbft::broadcast;
use hbbft::broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use hbbft::messaging::SourcedMessage;
use hbbft::proto::message::BroadcastProto;
use network::commst;
use network::connection;
use network::messaging::Messaging;
@ -104,7 +106,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
}
// Initialise the message delivery system and obtain TX and RX handles.
let messaging: Messaging<Vec<u8>> = Messaging::new(num_nodes);
let messaging: Messaging<BroadcastMessage> = Messaging::new(num_nodes);
let rxs_to_comms = messaging.rxs_to_comms();
let tx_from_comms = messaging.tx_from_comms();
let rx_to_algo = messaging.rx_to_algo();
@ -121,23 +123,42 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
// corresponding to this instance, and no dedicated comms task. The
// node index is 0.
let broadcast_handle = scope.spawn(move || {
match broadcast::Instance::new(
tx_from_algo,
rx_to_algo,
value.to_owned(),
(0..num_nodes).collect(),
our_id,
proposer_id,
).run()
{
Ok(t) => {
let broadcast = Broadcast::new(our_id, proposer_id, (0..num_nodes).collect())
.expect("failed to instantiate broadcast");
if let Some(v) = value {
for msg in broadcast
.propose_value(v.clone().into())
.expect("propose value")
.into_iter()
.map(TargetedBroadcastMessage::into)
{
tx_from_algo.send(msg).expect("send from algo");
}
}
loop {
// Receive a message from the socket IO task.
let message = rx_to_algo.recv().expect("receive from algo");
let SourcedMessage { source: i, message } = message;
debug!("{} received from {}: {:?}", our_id, i, message);
let (opt_output, msgs) = broadcast
.handle_broadcast_message(&i, message)
.expect("handle broadcast message");
for msg in &msgs {
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
}
for msg in msgs.into_iter().map(TargetedBroadcastMessage::into) {
tx_from_algo.send(msg).expect("send from algo");
}
if let Some(output) = opt_output {
println!(
"Broadcast succeeded! Node {} output: {}",
our_id,
String::from_utf8(T::into(t)).unwrap()
String::from_utf8(output).unwrap()
);
break;
}
Err(e) => error!("Broadcast instance: {:?}", e),
}
});
@ -150,7 +171,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
let rx_to_comms = &rxs_to_comms[node_index];
scope.spawn(move || {
match commst::CommsTask::new(
match commst::CommsTask::<BroadcastProto, BroadcastMessage>::new(
tx_from_comms,
rx_to_comms,
// FIXME: handle error

View File

@ -1,6 +1,6 @@
#! /bin/bash
export RUST_LOG=hbbft=debug
export RUST_LOG=hbbft=debug,consensus_node=debug
cargo build --example consensus-node
cargo run --example consensus-node -- --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --value=Foo &

View File

@ -4,8 +4,6 @@ use itertools::Itertools;
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::hash::Hash;
use proto::message;
/// Type of output from the Agreement message handler. The first component is
/// the value on which the Agreement has decided, also called "output" in the
/// HoneyadgerBFT paper. The second component is a queue of messages to be sent
@ -21,36 +19,6 @@ pub enum AgreementMessage {
Aux((u32, bool)),
}
impl AgreementMessage {
pub fn into_proto(self) -> message::AgreementProto {
let mut p = message::AgreementProto::new();
match self {
AgreementMessage::BVal((e, b)) => {
p.set_epoch(e);
p.set_bval(b);
}
AgreementMessage::Aux((e, b)) => {
p.set_epoch(e);
p.set_aux(b);
}
}
p
}
// TODO: Re-enable lint once implemented.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn from_proto(mp: message::AgreementProto) -> Option<Self> {
let epoch = mp.get_epoch();
if mp.has_bval() {
Some(AgreementMessage::BVal((epoch, mp.get_bval())))
} else if mp.has_aux() {
Some(AgreementMessage::Aux((epoch, mp.get_aux())))
} else {
None
}
}
}
/// Binary Agreement instance.
pub struct Agreement<NodeUid> {
/// The UID of the corresponding proposer node.

View File

@ -1,6 +1,5 @@
use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use merkle::proof::{Lemma, Positioned, Proof};
use merkle::{Hashable, MerkleTree};
use merkle::MerkleTree;
use proto::*;
use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
@ -8,42 +7,38 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::hash::Hash;
use std::iter;
use std::marker::{Send, Sync};
use std::sync::{RwLock, RwLockWriteGuard};
use messaging::{SourcedMessage, Target, TargetedMessage};
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
use messaging::{Target, TargetedMessage};
type MessageQueue<NodeUid> = VecDeque<TargetedBroadcastMessage<NodeUid>>;
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[derive(Clone, PartialEq)]
pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>),
Echo(Proof<T>),
pub enum BroadcastMessage {
Value(Proof<Vec<u8>>),
Echo(Proof<Vec<u8>>),
Ready(Vec<u8>),
}
impl BroadcastMessage<ProposedValue> {
impl BroadcastMessage {
fn target_all<NodeUid>(self) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: BroadcastTarget::All,
target: Target::All,
message: self,
}
}
fn target_node<NodeUid>(self, id: NodeUid) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: BroadcastTarget::Node(id),
target: Target::Node(id),
message: self,
}
}
}
impl<T: Send + Sync + Debug + AsRef<[u8]>> fmt::Debug for BroadcastMessage<T> {
impl fmt::Debug for BroadcastMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)),
@ -56,36 +51,20 @@ impl<T: Send + Sync + Debug + AsRef<[u8]>> fmt::Debug for BroadcastMessage<T> {
/// A `BroadcastMessage` to be sent out, together with a target.
#[derive(Clone, Debug)]
pub struct TargetedBroadcastMessage<NodeUid> {
pub target: BroadcastTarget<NodeUid>,
pub message: BroadcastMessage<ProposedValue>,
pub target: Target<NodeUid>,
pub message: BroadcastMessage,
}
impl From<TargetedBroadcastMessage<usize>> for TargetedMessage<ProposedValue> {
fn from(msg: TargetedBroadcastMessage<usize>) -> TargetedMessage<ProposedValue> {
impl From<TargetedBroadcastMessage<usize>> for TargetedMessage<BroadcastMessage, usize> {
fn from(msg: TargetedBroadcastMessage<usize>) -> TargetedMessage<BroadcastMessage, usize> {
TargetedMessage {
target: msg.target.into(),
message: Message::Broadcast(msg.message),
target: msg.target,
message: msg.message,
}
}
}
/// A target node for a `BroadcastMessage`.
#[derive(Clone, Debug)]
pub enum BroadcastTarget<NodeUid> {
All,
Node(NodeUid),
}
impl From<BroadcastTarget<usize>> for Target {
fn from(bt: BroadcastTarget<usize>) -> Target {
match bt {
BroadcastTarget::All => Target::All,
BroadcastTarget::Node(node) => Target::Node(node),
}
}
}
struct BroadcastState<NodeUid: Eq + Hash + Ord> {
struct BroadcastState<NodeUid> {
/// Whether we have already multicas `Echo`.
echo_sent: bool,
/// Whether we have already multicast `Ready`.
@ -156,7 +135,7 @@ impl<NodeUid: Eq + Hash + Ord> BroadcastState<NodeUid> {
/// eventually be able to decode (i.e. receive at least `f + 1` `Echo` messages).
/// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value,
/// knowing that every other good node will eventually do the same.
pub struct Broadcast<NodeUid: Eq + Hash + Ord> {
pub struct Broadcast<NodeUid> {
/// The UID of this node.
our_id: NodeUid,
/// The UID of the sending node.
@ -206,7 +185,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
}
/// Processes the proposed value input by broadcasting it.
pub fn propose_value(&self, value: ProposedValue) -> Result<MessageQueue<NodeUid>, Error> {
pub fn propose_value(&self, value: Vec<u8>) -> Result<MessageQueue<NodeUid>, Error> {
if self.our_id != self.proposer_id {
return Err(Error::UnexpectedMessage);
}
@ -233,7 +212,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
/// the broadcast instance.
fn send_shards(
&self,
mut value: ProposedValue,
mut value: Vec<u8>,
) -> Result<(Proof<Vec<u8>>, MessageQueue<NodeUid>), Error> {
let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count();
@ -272,7 +251,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
// TODO: `MerkleTree` generates the wrong proof if a leaf occurs more than once, so we
// prepend an "index byte" to each shard. Consider using the `merkle_light` crate instead.
let shards_t: Vec<ProposedValue> = shards
let shards_t: Vec<Vec<u8>> = shards
.into_iter()
.enumerate()
.map(|(i, s)| iter::once(i as u8).chain(s.iter().cloned()).collect())
@ -308,8 +287,8 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
pub fn handle_broadcast_message(
&self,
sender_id: &NodeUid,
message: BroadcastMessage<ProposedValue>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
message: BroadcastMessage,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
if !self.all_uids.contains(sender_id) {
return Err(Error::UnknownSender);
}
@ -327,7 +306,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
// If the sender is not the proposer, this is not the first `Value` or the proof is invalid,
// ignore.
if *sender_id != self.proposer_id {
@ -361,7 +340,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
// If the proof is invalid or the sender has already sent `Echo`, ignore.
if state.echos.contains_key(sender_id) {
info!(
@ -396,7 +375,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
sender_id: &NodeUid,
hash: &[u8],
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
// If the sender has already sent a `Ready` before, ignore.
if state.readys.contains_key(sender_id) {
info!(
@ -428,7 +407,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
&self,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
hash: &[u8],
) -> Result<Option<ProposedValue>, Error> {
) -> Result<Option<Vec<u8>>, Error> {
if state.has_output || state.count_readys(hash) <= 2 * self.num_faulty_nodes
|| state.count_echos(hash) <= self.num_faulty_nodes
{
@ -459,13 +438,13 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
}
/// Returns the index of this proof's leave in the Merkle tree.
fn index_of_proof(&self, proof: &Proof<ProposedValue>) -> usize {
fn index_of_proof(&self, proof: &Proof<Vec<u8>>) -> usize {
index_of_lemma(&proof.lemma, self.num_nodes)
}
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
/// logs an info message.
fn validate_proof(&self, p: &Proof<ProposedValue>, id: &NodeUid) -> bool {
fn validate_proof(&self, p: &Proof<Vec<u8>>, id: &NodeUid) -> bool {
if !p.validate(&p.root_hash) {
info!(
"Node {:?} received invalid proof: {:?}",
@ -488,55 +467,6 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
}
}
/// Broadcast algorithm instance.
///
/// The ACS algorithm requires multiple broadcast instances running
/// asynchronously, see Figure 4 in the HBBFT paper. Those are N asynchronous
/// coroutines, each responding to values from one particular remote node. The
/// paper doesn't make it clear though how other messages - Echo and Ready - are
/// distributed over the instances. Also it appears that the sender of a message
/// might become part of the message for this to work.
pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> {
/// The transmit side of the channel to comms threads.
tx: &'a Sender<TargetedMessage<ProposedValue>>,
/// The receive side of the channel from comms threads.
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
/// The broadcast algorithm instance.
broadcast: Broadcast<usize>,
/// Value to be broadcast.
broadcast_value: Option<T>,
}
impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>>
Instance<'a, T>
{
pub fn new(
tx: &'a Sender<TargetedMessage<ProposedValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<T>,
node_ids: BTreeSet<usize>,
our_id: usize,
proposer_id: usize,
) -> Self {
let broadcast =
Broadcast::new(our_id, proposer_id, node_ids).expect("failed to instantiate broadcast");
Instance {
tx,
rx,
broadcast,
broadcast_value,
}
}
/// Broadcast stage task returning the computed values in case of success,
/// and an error in case of failure.
pub fn run(self) -> Result<T, Error> {
// Broadcast state machine thread.
let bvalue: Option<ProposedValue> = self.broadcast_value.map(|v| v.into());
inner_run(self.tx, self.rx, bvalue, &self.broadcast).map(ProposedValue::into)
}
}
/// Errors returned by the broadcast instance.
#[derive(Debug, Clone)]
pub enum Error {
@ -544,8 +474,6 @@ pub enum Error {
Threading,
ProofConstructionFailed,
ReedSolomon(rse::Error),
SendDeprecated(SendError<TargetedMessage<ProposedValue>>),
Recv(RecvError),
UnexpectedMessage,
NotImplemented,
UnknownSender,
@ -557,64 +485,6 @@ impl From<rse::Error> for Error {
}
}
impl From<SendError<TargetedMessage<ProposedValue>>> for Error {
fn from(err: SendError<TargetedMessage<ProposedValue>>) -> Error {
Error::SendDeprecated(err)
}
}
impl From<RecvError> for Error {
fn from(err: RecvError) -> Error {
Error::Recv(err)
}
}
/// The main loop of the broadcast task.
fn inner_run<'a>(
tx: &'a Sender<TargetedMessage<ProposedValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<ProposedValue>,
broadcast: &Broadcast<usize>,
) -> Result<ProposedValue, Error> {
if let Some(v) = broadcast_value {
for msg in broadcast
.propose_value(v)?
.into_iter()
.map(TargetedBroadcastMessage::into)
{
tx.send(msg)?;
}
}
// TODO: handle exit conditions
loop {
// Receive a message from the socket IO task.
let message = rx.recv()?;
if let SourcedMessage {
source: i,
message: Message::Broadcast(message),
} = message
{
debug!("{} received from {}: {:?}", broadcast.our_id, i, message);
let (opt_output, msgs) = broadcast.handle_broadcast_message(&i, message)?;
for msg in &msgs {
debug!(
"{} sending to {:?}: {:?}",
broadcast.our_id, msg.target, msg.message
);
}
for msg in msgs.into_iter().map(TargetedBroadcastMessage::into) {
tx.send(msg)?;
}
if let Some(output) = opt_output {
return Ok(output);
}
} else {
error!("Incorrect message from the socket: {:?}", message);
}
}
}
fn decode_from_shards<T>(
leaf_values: &mut [Option<Box<[u8]>>],
coding: &ReedSolomon,
@ -622,7 +492,7 @@ fn decode_from_shards<T>(
root_hash: &[u8],
) -> Result<T, Error>
where
T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>,
T: From<Vec<u8>>,
{
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding scheme.
coding.reconstruct_shards(leaf_values)?;
@ -630,7 +500,7 @@ where
// Recompute the Merkle tree root.
// Collect shards for tree construction.
let shards: Vec<ProposedValue> = leaf_values
let shards: Vec<Vec<u8>> = leaf_values
.iter()
.filter_map(|l| l.as_ref().map(|v| v.to_vec()))
.collect();
@ -655,9 +525,9 @@ where
/// Concatenates the first `n` leaf values of a Merkle tree `m` in one value of
/// type `T`. This is useful for reconstructing the data value held in the tree
/// and forgetting the leaves that contain parity information.
fn glue_shards<T>(m: MerkleTree<ProposedValue>, n: usize) -> T
fn glue_shards<T>(m: MerkleTree<Vec<u8>>, n: usize) -> T
where
T: From<Vec<u8>> + Into<Vec<u8>>,
T: From<Vec<u8>>,
{
let t: Vec<u8> = m.into_iter()
.take(n)
@ -666,7 +536,7 @@ where
let payload_len = t[0] as usize;
debug!("Glued data shards {:?}", HexBytes(&t[1..(payload_len + 1)]));
Vec::into(t[1..(payload_len + 1)].to_vec())
t[1..(payload_len + 1)].to_vec().into()
}
/// Computes the Merkle tree leaf index of a value in a given lemma.

View File

@ -114,7 +114,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
&mut self,
sender_id: &NodeUid,
proposer_id: &NodeUid,
bmessage: BroadcastMessage<ProposedValue>,
bmessage: BroadcastMessage,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
let mut instance_result = None;
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {

View File

@ -6,8 +6,6 @@
#![feature(optin_builtin_traits)]
#[macro_use]
extern crate log;
extern crate crossbeam;
extern crate crossbeam_channel;
extern crate itertools;
extern crate merkle;
extern crate protobuf;

View File

@ -1,16 +1,8 @@
//! The local message delivery system.
use proto::Message;
use std::fmt::Debug;
/// Message sent by a given source. The sources are consensus nodes indexed 1
/// through N where N is the total number of nodes. Sourced messages are
/// required when it is essential to know the message origin but the set of
/// recepients is unknown without further computation which is irrelevant to the
/// message delivery task.
/// Message sent by a given source.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
pub source: usize,
pub message: Message<T>,
pub struct SourcedMessage<M, N> {
pub source: N,
pub message: M,
}
/// Message destination can be either of the two:
@ -20,21 +12,14 @@ pub struct SourcedMessage<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
///
/// 2) `Node(i)`: node i or local algorithm instances with the node index i.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Target {
pub enum Target<N> {
All,
Node(usize),
Node(N),
}
/// Message with a designated target.
#[derive(Clone, Debug, PartialEq)]
pub struct TargetedMessage<T: Clone + Debug + Send + Sync + AsRef<[u8]>> {
pub target: Target,
pub message: Message<T>,
}
impl<T: Clone + Debug + Send + Sync + AsRef<[u8]>> TargetedMessage<T> {
/// Initialises a message while checking parameter preconditions.
pub fn new(target: Target, message: Message<T>) -> Self {
TargetedMessage { target, message }
}
pub struct TargetedMessage<M, N> {
pub target: Target<N>,
pub message: M,
}

View File

@ -5,19 +5,8 @@ use agreement::AgreementMessage;
use broadcast::BroadcastMessage;
use merkle::proof::{Lemma, Positioned, Proof};
use proto::message::*;
use protobuf::core::parse_from_bytes;
use protobuf::error::{ProtobufError, ProtobufResult, WireError};
use protobuf::Message as ProtobufMessage;
use ring::digest::Algorithm;
use std::fmt;
use std::marker::{Send, Sync};
/// Kinds of message sent by nodes participating in consensus.
#[derive(Clone, Debug, PartialEq)]
pub enum Message<T: Send + Sync + AsRef<[u8]>> {
Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage),
}
/// Wrapper for a byte array, whose `Debug` implementation outputs shortened hexadecimal strings.
pub struct HexBytes<'a>(pub &'a [u8]);
@ -67,62 +56,20 @@ impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> {
}
}
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> Message<T> {
/// Translation from protobuf to the regular type.
///
/// TODO: add an `Algorithm` field to `MessageProto`. Either `Algorithm` has
/// to be fully serialised and sent as a whole, or it can be passed over
/// using an ID and the `Eq` instance to discriminate the finite set of
/// algorithms in `ring::digest`.
pub fn from_proto(mut proto: message::MessageProto) -> Option<Self> {
if proto.has_broadcast() {
BroadcastMessage::from_proto(
proto.take_broadcast(),
// TODO, possibly move Algorithm inside
// BroadcastMessage
&::ring::digest::SHA256,
).map(Message::Broadcast)
} else if proto.has_agreement() {
AgreementMessage::from_proto(proto.take_agreement()).map(Message::Agreement)
} else {
None
}
}
pub fn into_proto(self) -> MessageProto {
let mut m = MessageProto::new();
match self {
Message::Broadcast(b) => {
m.set_broadcast(b.into_proto());
}
Message::Agreement(a) => {
m.set_agreement(a.into_proto());
}
}
m
}
/// Parse a `Message` from its protobuf binary representation.
///
/// TODO: pass custom errors from down the chain of nested parsers as
/// opposed to returning `WireError::Other`.
pub fn parse_from_bytes(bytes: &[u8]) -> ProtobufResult<Self> {
let r = parse_from_bytes::<MessageProto>(bytes).map(Self::from_proto);
match r {
Ok(Some(m)) => Ok(m),
Ok(None) => Err(ProtobufError::WireError(WireError::Other)),
Err(e) => Err(e),
}
}
/// Produce a protobuf representation of this `Message`.
pub fn write_to_bytes(self) -> ProtobufResult<Vec<u8>> {
self.into_proto().write_to_bytes()
impl From<message::BroadcastProto> for BroadcastMessage {
fn from(proto: message::BroadcastProto) -> BroadcastMessage {
BroadcastMessage::from_proto(proto, &::ring::digest::SHA256)
.expect("invalid broadcast message")
}
}
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> BroadcastMessage<T> {
impl From<BroadcastMessage> for message::BroadcastProto {
fn from(msg: BroadcastMessage) -> message::BroadcastProto {
msg.into_proto()
}
}
impl BroadcastMessage {
pub fn into_proto(self) -> BroadcastProto {
let mut b = BroadcastProto::new();
match self {
@ -165,6 +112,36 @@ impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> BroadcastMessage<T> {
}
}
impl AgreementMessage {
pub fn into_proto(self) -> message::AgreementProto {
let mut p = message::AgreementProto::new();
match self {
AgreementMessage::BVal((e, b)) => {
p.set_epoch(e);
p.set_bval(b);
}
AgreementMessage::Aux((e, b)) => {
p.set_epoch(e);
p.set_aux(b);
}
}
p
}
// TODO: Re-enable lint once implemented.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn from_proto(mp: message::AgreementProto) -> Option<Self> {
let epoch = mp.get_epoch();
if mp.has_bval() {
Some(AgreementMessage::BVal((epoch, mp.get_bval())))
} else if mp.has_aux() {
Some(AgreementMessage::Aux((epoch, mp.get_aux())))
} else {
None
}
}
}
/// Serialisation of `Proof` defined against its protobuf interface to work
/// around the restriction of not being allowed to extend the implementation of
/// `Proof` outside its crate.

View File

@ -1,9 +1,8 @@
//! Protobuf message IO task structure.
use proto::*;
use protobuf;
use protobuf::Message as ProtobufMessage;
use protobuf::{self, Message, MessageStatic};
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::net::TcpStream;
use std::{cmp, io};
@ -61,35 +60,35 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
Ok(result)
}
pub struct ProtoIo<S: Read + Write> {
pub struct ProtoIo<S: Read + Write, M> {
stream: S,
buffer: [u8; 1024 * 4],
_phantom: PhantomData<M>,
}
impl ProtoIo<TcpStream> {
pub fn try_clone(&self) -> Result<ProtoIo<TcpStream>, ::std::io::Error> {
impl<M> ProtoIo<TcpStream, M> {
pub fn try_clone(&self) -> Result<Self, ::std::io::Error> {
Ok(ProtoIo {
stream: self.stream.try_clone()?,
buffer: [0; 1024 * 4],
_phantom: PhantomData,
})
}
}
/// A message handling task.
impl<S: Read + Write> ProtoIo<S>
impl<S: Read + Write, M: Message + MessageStatic> ProtoIo<S, M>
//where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
{
pub fn from_stream(stream: S) -> Self {
ProtoIo {
stream,
buffer: [0; 1024 * 4],
_phantom: PhantomData,
}
}
pub fn recv<T>(&mut self) -> Result<Message<T>, Error>
where
T: Clone + Send + Sync + AsRef<[u8]> + From<Vec<u8>>,
{
pub fn recv(&mut self) -> Result<M, Error> {
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
if frame_start != FRAME_START {
@ -107,25 +106,21 @@ impl<S: Read + Write> ProtoIo<S>
message_v.extend_from_slice(slice);
}
Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError)
protobuf::parse_from_bytes(&message_v).map_err(Error::ProtobufError)
}
pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
where
T: Clone + Send + Sync + AsRef<[u8]> + From<Vec<u8>>,
{
pub fn send(&mut self, message: &M) -> Result<(), Error> {
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
let message_p = message.into_proto();
// Write message size
encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?;
encode_u32_to_be(message.compute_size(), &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message
message_p.write_to(&mut stream)?;
message.write_to(&mut stream)?;
// Flush
stream.flush()?;
Ok(())
@ -135,21 +130,20 @@ impl<S: Read + Write> ProtoIo<S>
#[cfg(test)]
mod tests {
use broadcast::BroadcastMessage;
use proto::message::BroadcastProto;
use proto_io::*;
use std::io::Cursor;
#[test]
fn encode_decode_message() {
let msg0: Message<Vec<u8>> =
Message::Broadcast(BroadcastMessage::Ready(b"Test 0".to_vec()));
let msg1: Message<Vec<u8>> =
Message::Broadcast(BroadcastMessage::Ready(b"Test 1".to_vec()));
let mut pio = ProtoIo::from_stream(Cursor::new(Vec::new()));
pio.send(msg0.clone()).expect("send msg0");
pio.send(msg1.clone()).expect("send msg1");
let msg0 = BroadcastMessage::Ready(b"Test 0".to_vec());
let msg1 = BroadcastMessage::Ready(b"Test 1".to_vec());
let mut pio = ProtoIo::<_, BroadcastProto>::from_stream(Cursor::new(Vec::new()));
pio.send(&msg0.clone().into()).expect("send msg0");
pio.send(&msg1.clone().into()).expect("send msg1");
println!("{:?}", pio.stream.get_ref());
pio.stream.set_position(0);
assert_eq!(msg0, pio.recv().expect("recv msg0"));
assert_eq!(msg1, pio.recv().expect("recv msg1"));
assert_eq!(msg0, pio.recv().expect("recv msg0").into());
assert_eq!(msg1, pio.recv().expect("recv msg1").into());
}
}

View File

@ -13,7 +13,8 @@ use rand::Rng;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use hbbft::broadcast::{Broadcast, BroadcastMessage, BroadcastTarget, TargetedBroadcastMessage};
use hbbft::broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use hbbft::messaging::Target;
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
struct NodeId(usize);
@ -29,7 +30,7 @@ struct TestNode {
/// The instance of the broadcast algorithm.
broadcast: Broadcast<NodeId>,
/// Incoming messages from other nodes that this node has not yet handled.
queue: VecDeque<(NodeId, BroadcastMessage<ProposedValue>)>,
queue: VecDeque<(NodeId, BroadcastMessage)>,
/// The values this node has output so far.
outputs: Vec<ProposedValue>,
}
@ -216,7 +217,7 @@ impl<A: Adversary> TestNetwork<A> {
for msg in msgs {
match msg {
TargetedBroadcastMessage {
target: BroadcastTarget::All,
target: Target::All,
ref message,
} => {
for node in self.nodes.values_mut() {
@ -227,7 +228,7 @@ impl<A: Adversary> TestNetwork<A> {
self.adversary.push_message(sender_id, msg.clone());
}
TargetedBroadcastMessage {
target: BroadcastTarget::Node(to_id),
target: Target::Node(to_id),
ref message,
} => {
if self.adv_nodes.contains(&to_id) {