Define a common DistAlgorithm trait.

This will allow us to deduplicate network simulations in tests for the
different algorithms. More generally, it facilitates writing general
tools and applying them to all distributed algorithms.
This commit is contained in:
Andreas Fackler 2018-05-14 14:35:06 +02:00
parent 93a6ce1bb4
commit 5528fc2de8
6 changed files with 350 additions and 272 deletions

View File

@ -42,7 +42,7 @@ use std::net::SocketAddr;
use std::{io, iter, process, thread, time};
use hbbft::broadcast::{Broadcast, BroadcastMessage};
use hbbft::messaging::SourcedMessage;
use hbbft::messaging::{DistAlgorithm, SourcedMessage};
use hbbft::proto::message::BroadcastProto;
use network::commst;
use network::connection;
@ -127,10 +127,8 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
.expect("failed to instantiate broadcast");
if let Some(v) = value {
for msg in broadcast
.propose_value(v.clone().into())
.expect("propose value")
{
broadcast.input(v.clone().into()).expect("propose value");
for msg in broadcast.message_iter() {
tx_from_algo.send(msg).expect("send from algo");
}
}
@ -140,16 +138,14 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
let message = rx_to_algo.recv().expect("receive from algo");
let SourcedMessage { source: i, message } = message;
debug!("{} received from {}: {:?}", our_id, i, message);
let (opt_output, msgs) = broadcast
.handle_broadcast_message(&i, message)
broadcast
.handle_message(&i, message)
.expect("handle broadcast message");
for msg in &msgs {
for msg in broadcast.message_iter() {
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
}
for msg in msgs {
tx_from_algo.send(msg).expect("send from algo");
}
if let Some(output) = opt_output {
if let Some(output) = broadcast.next_output() {
println!(
"Broadcast succeeded! Node {} output: {}",
our_id,

View File

@ -8,12 +8,9 @@ use ring::digest;
use serde::{Deserialize, Deserializer};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::hash::Hash;
use std::iter;
use messaging::{Target, TargetedMessage};
type MessageQueue<NodeUid> = VecDeque<TargetedMessage<BroadcastMessage, NodeUid>>;
use messaging::{DistAlgorithm, Target, TargetedMessage};
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
@ -87,13 +84,13 @@ impl fmt::Debug for BroadcastMessage {
/// eventually be able to decode (i.e. receive at least `f + 1` `Echo` messages).
/// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value,
/// knowing that every other good node will eventually do the same.
pub struct Broadcast<NodeUid> {
pub struct Broadcast<N> {
/// The UID of this node.
our_id: NodeUid,
our_id: N,
/// The UID of the sending node.
proposer_id: NodeUid,
proposer_id: N,
/// UIDs of all nodes for iteration purposes.
all_uids: BTreeSet<NodeUid>,
all_uids: BTreeSet<N>,
num_nodes: usize,
num_faulty_nodes: usize,
data_shard_num: usize,
@ -105,19 +102,68 @@ pub struct Broadcast<NodeUid> {
/// Whether we have already output a value.
has_output: bool,
/// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<NodeUid, Proof<Vec<u8>>>,
echos: BTreeMap<N, Proof<Vec<u8>>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<NodeUid, Vec<u8>>,
readys: BTreeMap<N, Vec<u8>>,
/// The outgoing message queue.
messages: VecDeque<TargetedMessage<BroadcastMessage, N>>,
/// The output, if any.
output: Option<Vec<u8>>,
}
impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
impl<N: Eq + Debug + Clone + Ord> DistAlgorithm for Broadcast<N> {
type NodeUid = N;
type Input = Vec<u8>; // TODO: Allow anything serializable.
type Output = Self::Input;
type Message = BroadcastMessage;
type Error = Error;
fn input(&mut self, input: Self::Input) -> Result<(), Self::Error> {
if self.our_id != self.proposer_id {
return Err(Error::UnexpectedMessage);
}
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
let proof = self.send_shards(input)?;
// TODO: We'd actually need to return the output here, if it was only one node. Should that
// use-case be supported?
let our_id = self.our_id.clone();
self.handle_value(&our_id, proof)
}
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<(), Self::Error> {
if !self.all_uids.contains(sender_id) {
return Err(Error::UnknownSender);
}
match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p),
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p),
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash),
}
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, N>> {
self.messages.pop_front()
}
fn next_output(&mut self) -> Option<Self::Output> {
self.output.take()
}
fn terminated(&self) -> bool {
self.has_output
}
fn our_id(&self) -> &N {
&self.our_id
}
}
impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
/// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
/// from node `proposer_id`.
pub fn new(
our_id: NodeUid,
proposer_id: NodeUid,
all_uids: BTreeSet<NodeUid>,
) -> Result<Self, Error> {
pub fn new(our_id: N, proposer_id: N, all_uids: BTreeSet<N>) -> Result<Self, Error> {
let num_nodes = all_uids.len();
let num_faulty_nodes = (num_nodes - 1) / 3;
let parity_shard_num = 2 * num_faulty_nodes;
@ -137,39 +183,17 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
has_output: false,
echos: BTreeMap::new(),
readys: BTreeMap::new(),
messages: VecDeque::new(),
output: None,
})
}
/// Processes the proposed value input by broadcasting it.
pub fn propose_value(&mut self, value: Vec<u8>) -> Result<MessageQueue<NodeUid>, Error> {
if self.our_id != self.proposer_id {
return Err(Error::UnexpectedMessage);
}
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
let (proof, value_msgs) = self.send_shards(value)?;
// TODO: We'd actually need to return the output here, if it was only one node. Should that
// use-case be supported?
let our_id = self.our_id.clone();
let (_, echo_msgs) = self.handle_value(&our_id, proof)?;
Ok(value_msgs.into_iter().chain(echo_msgs).collect())
}
/// Returns this node's ID.
pub fn our_id(&self) -> &NodeUid {
&self.our_id
}
/// Breaks the input value into shards of equal length and encodes them --
/// and some extra parity shards -- with a Reed-Solomon erasure coding
/// scheme. The returned value contains the shard assigned to this
/// node. That shard doesn't need to be sent anywhere. It gets recorded in
/// the broadcast instance.
fn send_shards(
&self,
mut value: Vec<u8>,
) -> Result<(Proof<Vec<u8>>, MessageQueue<NodeUid>), Error> {
fn send_shards(&mut self, mut value: Vec<u8>) -> Result<Proof<Vec<u8>>, Error> {
let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count();
@ -219,7 +243,6 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
// Default result in case of `gen_proof` error.
let mut result = Err(Error::ProofConstructionFailed);
let mut outgoing = VecDeque::new();
assert_eq!(self.num_nodes, mtree.iter().count());
// Send each proof to a node.
@ -232,36 +255,16 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
let msg = BroadcastMessage::Value(proof);
outgoing.push_back(Target::Node(uid.clone()).message(msg));
let msg = Target::Node(uid.clone()).message(BroadcastMessage::Value(proof));
self.messages.push_back(msg);
}
}
result.map(|r| (r, outgoing))
}
/// Handler of messages received from remote nodes.
pub fn handle_broadcast_message(
&mut self,
sender_id: &NodeUid,
message: BroadcastMessage,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
if !self.all_uids.contains(sender_id) {
return Err(Error::UnknownSender);
}
match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p),
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p),
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash),
}
result
}
/// Handles a received echo and verifies the proof it contains.
fn handle_value(
&mut self,
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
fn handle_value(&mut self, sender_id: &N, p: Proof<Vec<u8>>) -> Result<(), Error> {
// If the sender is not the proposer, this is not the first `Value` or the proof is invalid,
// ignore.
if *sender_id != self.proposer_id {
@ -269,43 +272,37 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
"Node {:?} received Value from {:?} instead of {:?}.",
self.our_id, sender_id, self.proposer_id
);
return Ok((None, VecDeque::new()));
return Ok(());
}
if self.echo_sent {
info!("Node {:?} received multiple Values.", self.our_id);
return Ok((None, VecDeque::new()));
return Ok(());
}
if !self.validate_proof(&p, &self.our_id) {
return Ok((None, VecDeque::new()));
return Ok(());
}
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
self.echo_sent = true;
let our_id = self.our_id.clone();
let (output, echo_msgs) = self.handle_echo(&our_id, p.clone())?;
let msgs = iter::once(Target::All.message(BroadcastMessage::Echo(p)))
.chain(echo_msgs)
.collect();
Ok((output, msgs))
self.handle_echo(&our_id, p.clone())?;
let echo_msg = Target::All.message(BroadcastMessage::Echo(p));
self.messages.push_back(echo_msg);
Ok(())
}
/// Handles a received `Echo` message.
fn handle_echo(
&mut self,
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
fn handle_echo(&mut self, sender_id: &N, p: Proof<Vec<u8>>) -> Result<(), Error> {
// If the proof is invalid or the sender has already sent `Echo`, ignore.
if self.echos.contains_key(sender_id) {
info!(
"Node {:?} received multiple Echos from {:?}.",
self.our_id, sender_id,
);
return Ok((None, VecDeque::new()));
return Ok(());
}
if !self.validate_proof(&p, sender_id) {
return Ok((None, VecDeque::new()));
return Ok(());
}
let hash = p.root_hash.clone();
@ -314,54 +311,48 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
self.echos.insert(sender_id.clone(), p);
if self.ready_sent || self.count_echos(&hash) < self.num_nodes - self.num_faulty_nodes {
return Ok((self.get_output(&hash)?, VecDeque::new()));
return self.compute_output(&hash);
}
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
self.ready_sent = true;
let msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
self.messages.push_back(ready_msg);
let our_id = self.our_id.clone();
let (output, ready_msgs) = self.handle_ready(&our_id, &hash)?;
Ok((output, iter::once(msg).chain(ready_msgs).collect()))
self.handle_ready(&our_id, &hash)
}
/// Handles a received `Ready` message.
fn handle_ready(
&mut self,
sender_id: &NodeUid,
hash: &[u8],
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
fn handle_ready(&mut self, sender_id: &N, hash: &[u8]) -> Result<(), Error> {
// If the sender has already sent a `Ready` before, ignore.
if self.readys.contains_key(sender_id) {
info!(
"Node {:?} received multiple Readys from {:?}.",
self.our_id, sender_id
);
return Ok((None, VecDeque::new()));
return Ok(());
}
self.readys.insert(sender_id.clone(), hash.to_vec());
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
let outgoing = if self.count_readys(hash) == self.num_faulty_nodes + 1 && !self.ready_sent {
if self.count_readys(hash) == self.num_faulty_nodes + 1 && !self.ready_sent {
// Enqueue a broadcast of a Ready message.
self.ready_sent = true;
iter::once(Target::All.message(BroadcastMessage::Ready(hash.to_vec()))).collect()
} else {
VecDeque::new()
};
Ok((self.get_output(hash)?, outgoing))
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.to_vec()));
self.messages.push_back(ready_msg);
}
self.compute_output(&hash)
}
/// Checks whether the condition for output are met for this hash, and if so, returns the output
/// Checks whether the condition for output are met for this hash, and if so, sets the output
/// value.
fn get_output(&mut self, hash: &[u8]) -> Result<Option<Vec<u8>>, Error> {
fn compute_output(&mut self, hash: &[u8]) -> Result<(), Error> {
if self.has_output || self.count_readys(hash) <= 2 * self.num_faulty_nodes
|| self.count_echos(hash) <= self.num_faulty_nodes
{
return Ok(None);
return Ok(());
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N 2f Echo messages.
@ -379,11 +370,12 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
})
.collect();
let value = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)?;
Ok(Some(value))
self.output = Some(value);
Ok(())
}
/// Returns `i` if `node_id` is the `i`-th ID among all participating nodes.
fn index_of_node(&self, node_id: &NodeUid) -> Option<usize> {
fn index_of_node(&self, node_id: &N) -> Option<usize> {
self.all_uids.iter().position(|id| id == node_id)
}
@ -394,7 +386,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
/// logs an info message.
fn validate_proof(&self, p: &Proof<Vec<u8>>, id: &NodeUid) -> bool {
fn validate_proof(&self, p: &Proof<Vec<u8>>, id: &N) -> bool {
if !p.validate(&p.root_hash) {
info!(
"Node {:?} received invalid proof: {:?}",

View File

@ -13,7 +13,7 @@ use agreement::{Agreement, AgreementMessage};
use broadcast;
use broadcast::{Broadcast, BroadcastMessage};
use messaging::{Target, TargetedMessage};
use messaging::{DistAlgorithm, Target, TargetedMessage};
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
@ -25,6 +25,7 @@ type CommonSubsetOutput<NodeUid> = (
/// Message from Common Subset to remote nodes.
#[cfg_attr(feature = "serialization-serde", derive(Serialize))]
#[derive(Debug)]
pub enum Message<NodeUid> {
/// A message for the broadcast algorithm concerning the set element proposed by the given node.
Broadcast(NodeUid, BroadcastMessage),
@ -109,10 +110,10 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
) -> Result<VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>, Error> {
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
if let Some(instance) = self.broadcast_instances.get_mut(&self.uid) {
instance.input(value)?;
let uid = self.uid.clone();
Ok(instance
.propose_value(value)?
.into_iter()
.message_iter()
.map(|msg| msg.map(|b_msg| Message::Broadcast(uid.clone(), b_msg)))
.collect())
} else {
@ -165,18 +166,12 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
Error,
> = {
if let Some(broadcast_instance) = self.broadcast_instances.get_mut(proposer_id) {
broadcast_instance
.handle_broadcast_message(sender_id, bmessage)
.map(|(opt_value, queue)| {
instance_result = opt_value;
queue
.into_iter()
.map(|msg| {
msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))
})
.collect()
})
.map_err(Error::from)
broadcast_instance.handle_message(sender_id, bmessage)?;
instance_result = broadcast_instance.next_output();
Ok(broadcast_instance
.message_iter()
.map(|msg| msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg)))
.collect())
} else {
Err(Error::NoSuchBroadcastInstance)
}

View File

@ -1,13 +1,14 @@
use std::collections::{HashSet, VecDeque};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::iter;
use bincode;
use serde::de::DeserializeOwned;
use serde::Serialize;
use common_subset::{self, CommonSubset};
use messaging::TargetedMessage;
use messaging::{DistAlgorithm, TargetedMessage};
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone + Display> {
@ -22,11 +23,58 @@ pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone + Display> {
/// This node's ID.
id: N,
/// The set of all node IDs of the participants (including ourselves).
all_ids: HashSet<N>,
all_uids: HashSet<N>,
/// The target number of transactions to be included in each batch.
// TODO: Do experiments and recommend a batch size. It should be proportional to
// `num_nodes * num_nodes * log(num_nodes)`.
batch_size: usize,
/// The messages that need to be sent to other nodes.
messages: VecDeque<TargetedMessage<Message<N>, N>>,
/// The outputs from completed epochs.
output: VecDeque<Batch<T>>,
}
impl<T, N> DistAlgorithm for HoneyBadger<T, N>
where
T: Ord + Serialize + DeserializeOwned,
N: Eq + Hash + Ord + Clone + Display + Debug,
{
type NodeUid = N;
type Input = T;
type Output = Batch<T>;
type Message = Message<N>;
type Error = Error;
fn input(&mut self, input: Self::Input) -> Result<(), Self::Error> {
self.add_transactions(iter::once(input))
}
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<(), Self::Error> {
if !self.all_uids.contains(sender_id) {
return Err(Error::UnknownSender);
}
match message {
Message::CommonSubset(epoch, cs_msg) => {
self.handle_common_subset_message(sender_id, epoch, cs_msg)
}
}
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, N>> {
self.messages.pop_front()
}
fn next_output(&mut self) -> Option<Self::Output> {
self.output.pop_front()
}
fn terminated(&self) -> bool {
false
}
fn our_id(&self) -> &N {
&self.id
}
}
// TODO: Use a threshold encryption scheme to encrypt the proposed transactions.
@ -37,104 +85,84 @@ pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone + Display> {
// buffer instead of 1/n of it.
impl<T, N> HoneyBadger<T, N>
where
T: Ord + AsRef<[u8]> + Serialize + DeserializeOwned,
T: Ord + Serialize + DeserializeOwned,
N: Eq + Hash + Ord + Clone + Display + Debug,
{
/// Returns a new Honey Badger instance with the given parameters, starting at epoch `0`.
pub fn new<I>(id: N, all_ids_iter: I, batch_size: usize) -> Result<Self, Error>
pub fn new<I>(id: N, all_uids_iter: I, batch_size: usize) -> Result<Self, Error>
where
I: IntoIterator<Item = N>,
{
let all_ids: HashSet<N> = all_ids_iter.into_iter().collect();
if !all_ids.contains(&id) {
let all_uids: HashSet<N> = all_uids_iter.into_iter().collect();
if !all_uids.contains(&id) {
return Err(Error::OwnIdMissing);
}
Ok(HoneyBadger {
buffer: VecDeque::new(),
epoch: 0,
common_subset: CommonSubset::new(id.clone(), &all_ids)?,
common_subset: CommonSubset::new(id.clone(), &all_uids)?,
id,
batch_size,
all_ids,
all_uids,
messages: VecDeque::new(),
output: VecDeque::new(),
})
}
/// Adds transactions into the buffer.
pub fn add_transactions<I>(&mut self, txs: I) -> Result<HoneyBadgerOutput<T, N>, Error>
pub fn add_transactions<I>(&mut self, txs: I) -> Result<(), Error>
where
I: IntoIterator<Item = T>,
{
self.buffer.extend(txs);
if self.buffer.len() < self.batch_size {
return Ok((None, Vec::new()));
return Ok(());
}
// TODO: Handle the case `all_ids.len() == 1`.
let share = bincode::serialize(&self.buffer)?;
let msgs = self.common_subset
.send_proposed_value(share)?
.into_iter()
.map(|targeted_msg| {
targeted_msg.map(|cs_msg| Message::CommonSubset(self.epoch, cs_msg))
})
.collect();
Ok((None, msgs))
}
/// Handles a message from another node, and returns the next batch, if any, and the messages
/// to be sent out.
pub fn handle_message(
&mut self,
sender_id: &N,
message: Message<N>,
) -> Result<HoneyBadgerOutput<T, N>, Error> {
match message {
Message::CommonSubset(epoch, cs_msg) => {
self.handle_common_subset_message(sender_id, epoch, cs_msg)
}
for targeted_msg in self.common_subset.send_proposed_value(share)? {
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(self.epoch, cs_msg));
self.messages.push_back(msg);
}
Ok(())
}
/// Handles a message for the common subset sub-algorithm.
fn handle_common_subset_message(
&mut self,
sender_id: &N,
epoch: u64,
message: common_subset::Message<N>,
) -> Result<HoneyBadgerOutput<T, N>, Error> {
) -> Result<(), Error> {
if epoch != self.epoch {
// TODO: Do we need to cache messages for future epochs?
return Ok((None, Vec::new()));
return Ok(());
}
let (cs_out, cs_msgs) = self.common_subset.handle_message(sender_id, message)?;
let mut msgs: Vec<TargetedMessage<Message<N>, N>> = cs_msgs
.into_iter()
.map(|targeted_msg| targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg)))
.collect();
let output = if let Some(ser_batches) = cs_out {
let mut transactions: Vec<T> = ser_batches
for targeted_msg in cs_msgs {
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg));
self.messages.push_back(msg);
}
let batches: Vec<Vec<T>> = if let Some(ser_batches) = cs_out {
ser_batches
.into_iter()
.map(|ser_batch| bincode::deserialize::<Vec<T>>(&ser_batch))
.collect::<Result<Vec<_>, Box<bincode::ErrorKind>>>()?
.into_iter()
.flat_map(|txs| txs)
.collect();
transactions.sort();
self.epoch += 1;
self.common_subset = CommonSubset::new(self.id.clone(), &self.all_ids)?;
let (_, new_epoch_msgs) = self.add_transactions(None)?;
msgs.extend(new_epoch_msgs);
Some(Batch {
epoch,
transactions,
})
.map(|ser_batch| bincode::deserialize(&ser_batch))
.collect::<Result<_, _>>()?
} else {
None
return Ok(());
};
Ok((output, msgs))
let mut transactions: Vec<T> = batches.into_iter().flat_map(|txs| txs).collect();
transactions.sort();
self.epoch += 1;
self.common_subset = CommonSubset::new(self.id.clone(), &self.all_uids)?;
self.add_transactions(None)?;
self.output.push_back(Batch {
epoch,
transactions,
});
Ok(())
}
}
type HoneyBadgerOutput<T, N> = (Option<Batch<T>>, Vec<TargetedMessage<Message<N>, N>>);
/// A batch of transactions the algorithm has output.
pub struct Batch<T> {
pub epoch: u64,
@ -143,6 +171,7 @@ pub struct Batch<T> {
/// A message sent to or received from another node's Honey Badger instance.
#[cfg_attr(feature = "serialization-serde", derive(Serialize))]
#[derive(Debug)]
pub enum Message<N> {
/// A message belonging to the common subset algorithm in the given epoch.
CommonSubset(u64, common_subset::Message<N>),
@ -150,8 +179,10 @@ pub enum Message<N> {
}
/// A Honey Badger error.
#[derive(Debug)]
pub enum Error {
OwnIdMissing,
UnknownSender,
CommonSubset(common_subset::Error),
Bincode(Box<bincode::ErrorKind>),
}

View File

@ -1,3 +1,5 @@
use std::fmt::Debug;
/// Message sent by a given source.
#[derive(Clone, Debug)]
pub struct SourcedMessage<M, N> {
@ -43,3 +45,61 @@ impl<M, N> TargetedMessage<M, N> {
}
}
}
/// A distributed algorithm that defines a message flow.
pub trait DistAlgorithm {
/// Unique node identifier.
type NodeUid: Debug + Clone + Ord + Eq;
/// The input provided by the user.
type Input;
/// The output type. Some algorithms return an output exactly once, others return multiple
/// times.
type Output;
/// The messages that need to be exchanged between the instances in the participating nodes.
type Message: Debug;
/// The errors that can occur during execution.
type Error: Debug;
/// Handles an input provided by the user, and returns
fn input(&mut self, input: Self::Input) -> Result<(), Self::Error>;
/// Handles a message received from node `sender_id`.
fn handle_message(
&mut self,
sender_id: &Self::NodeUid,
message: Self::Message,
) -> Result<(), Self::Error>;
/// Returns a message that needs to be sent to another node.
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>>;
/// Returns the algorithm's output.
fn next_output(&mut self) -> Option<Self::Output>;
/// Returns `true` if execution has completed and this instance can be dropped.
fn terminated(&self) -> bool;
/// Returns this node's own ID.
fn our_id(&self) -> &Self::NodeUid;
/// Returns an iterator over the outgoing messages.
fn message_iter(&mut self) -> MessageIter<Self>
where
Self: Sized,
{
MessageIter { algorithm: self }
}
}
/// An iterator over a distributed algorithm's outgoing messages.
pub struct MessageIter<'a, D: DistAlgorithm + 'a> {
algorithm: &'a mut D,
}
impl<'a, D: DistAlgorithm + 'a> Iterator for MessageIter<'a, D> {
type Item = TargetedMessage<D::Message, D::NodeUid>;
fn next(&mut self) -> Option<Self::Item> {
self.algorithm.next_message()
}
}

View File

@ -12,49 +12,44 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use hbbft::broadcast::{Broadcast, BroadcastMessage};
use hbbft::messaging::{Target, TargetedMessage};
use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage};
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
struct NodeId(usize);
struct NodeUid(usize);
type ProposedValue = Vec<u8>;
type MessageQueue = VecDeque<TargetedMessage<BroadcastMessage, NodeId>>;
/// A "node" running a broadcast instance.
struct TestNode {
struct TestNode<D: DistAlgorithm> {
/// This node's own ID.
id: NodeId,
id: D::NodeUid,
/// The instance of the broadcast algorithm.
broadcast: Broadcast<NodeId>,
algo: D,
/// Incoming messages from other nodes that this node has not yet handled.
queue: VecDeque<(NodeId, BroadcastMessage)>,
queue: VecDeque<(D::NodeUid, D::Message)>,
/// The values this node has output so far.
outputs: Vec<ProposedValue>,
outputs: Vec<D::Output>,
}
impl TestNode {
impl<D: DistAlgorithm> TestNode<D> {
/// Creates a new test node with the given broadcast instance.
fn new(broadcast: Broadcast<NodeId>) -> TestNode {
fn new(algo: D) -> TestNode<D> {
TestNode {
id: *broadcast.our_id(),
broadcast,
id: algo.our_id().clone(),
algo,
queue: VecDeque::new(),
outputs: Vec::new(),
}
}
/// Handles the first message in the node's queue.
fn handle_message(&mut self) -> (Option<ProposedValue>, MessageQueue) {
fn handle_message(&mut self) {
let (from_id, msg) = self.queue.pop_front().expect("message not found");
debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg);
let (output, msgs) = self.broadcast
.handle_broadcast_message(&from_id, msg)
self.algo
.handle_message(&from_id, msg)
.expect("handling message");
if let Some(output) = output.clone() {
self.outputs.push(output);
}
(output, msgs)
self.outputs.extend(self.algo.next_output());
}
}
@ -68,37 +63,43 @@ enum MessageScheduler {
impl MessageScheduler {
/// Chooses a node to be the next one to handle a message.
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId {
fn pick_node<D: DistAlgorithm>(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid {
match *self {
MessageScheduler::First => nodes
.iter()
.find(|(_, node)| !node.queue.is_empty())
.map(|(id, _)| *id)
.map(|(id, _)| id.clone())
.expect("no more messages in queue"),
MessageScheduler::Random => {
let ids: Vec<NodeId> = nodes
let ids: Vec<D::NodeUid> = nodes
.iter()
.filter(|(_, node)| !node.queue.is_empty())
.map(|(id, _)| *id)
.map(|(id, _)| id.clone())
.collect();
*rand::thread_rng()
rand::thread_rng()
.choose(&ids)
.expect("no more messages in queue")
.clone()
}
}
}
}
type MessageWithSender<D> = (
<D as DistAlgorithm>::NodeUid,
TargetedMessage<<D as DistAlgorithm>::Message, <D as DistAlgorithm>::NodeUid>,
);
/// An adversary that can control a set of nodes and pick the next good node to receive a message.
trait Adversary {
trait Adversary<D: DistAlgorithm> {
/// Chooses a node to be the next one to handle a message.
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId;
fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid;
/// Adds a message sent to one of the adversary's nodes.
fn push_message(&mut self, sender_id: NodeId, msg: TargetedMessage<BroadcastMessage, NodeId>);
fn push_message(&mut self, sender_id: D::NodeUid, msg: TargetedMessage<D::Message, D::NodeUid>);
/// Produces a list of messages to be sent from the adversary's nodes.
fn step(&mut self) -> Vec<(NodeId, TargetedMessage<BroadcastMessage, NodeId>)>;
fn step(&mut self) -> Vec<MessageWithSender<D>>;
}
/// An adversary whose nodes never send any messages.
@ -113,25 +114,25 @@ impl SilentAdversary {
}
}
impl Adversary for SilentAdversary {
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId {
impl<D: DistAlgorithm> Adversary<D> for SilentAdversary {
fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid {
self.scheduler.pick_node(nodes)
}
fn push_message(&mut self, _: NodeId, _: TargetedMessage<BroadcastMessage, NodeId>) {
fn push_message(&mut self, _: D::NodeUid, _: TargetedMessage<D::Message, D::NodeUid>) {
// All messages are ignored.
}
fn step(&mut self) -> Vec<(NodeId, TargetedMessage<BroadcastMessage, NodeId>)> {
fn step(&mut self) -> Vec<MessageWithSender<D>> {
vec![] // No messages are sent.
}
}
/// An adversary that proposes an alternate value.
/// An adversary that inputs an alternate value.
struct ProposeAdversary {
scheduler: MessageScheduler,
good_nodes: BTreeSet<NodeId>,
adv_nodes: BTreeSet<NodeId>,
good_nodes: BTreeSet<NodeUid>,
adv_nodes: BTreeSet<NodeUid>,
has_sent: bool,
}
@ -139,8 +140,8 @@ impl ProposeAdversary {
/// Creates a new replay adversary with the given message scheduler.
fn new(
scheduler: MessageScheduler,
good_nodes: BTreeSet<NodeId>,
adv_nodes: BTreeSet<NodeId>,
good_nodes: BTreeSet<NodeUid>,
adv_nodes: BTreeSet<NodeUid>,
) -> ProposeAdversary {
ProposeAdversary {
scheduler,
@ -151,54 +152,53 @@ impl ProposeAdversary {
}
}
impl Adversary for ProposeAdversary {
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId {
impl Adversary<Broadcast<NodeUid>> for ProposeAdversary {
fn pick_node(&self, nodes: &BTreeMap<NodeUid, TestNode<Broadcast<NodeUid>>>) -> NodeUid {
self.scheduler.pick_node(nodes)
}
fn push_message(&mut self, _: NodeId, _: TargetedMessage<BroadcastMessage, NodeId>) {
fn push_message(&mut self, _: NodeUid, _: TargetedMessage<BroadcastMessage, NodeUid>) {
// All messages are ignored.
}
fn step(&mut self) -> Vec<(NodeId, TargetedMessage<BroadcastMessage, NodeId>)> {
fn step(&mut self) -> Vec<(NodeUid, TargetedMessage<BroadcastMessage, NodeUid>)> {
if self.has_sent {
return vec![];
}
self.has_sent = true;
let value = b"Fake news";
let node_ids: BTreeSet<NodeId> = self.adv_nodes
let node_ids: BTreeSet<NodeUid> = self.adv_nodes
.iter()
.chain(self.good_nodes.iter())
.cloned()
.chain(self.good_nodes.iter().cloned())
.collect();
let id = *self.adv_nodes.iter().next().unwrap();
let mut bc = Broadcast::new(id, id, node_ids).expect("broadcast instance");
let msgs = bc.propose_value(value.to_vec()).expect("propose");
msgs.into_iter().map(|msg| (id, msg)).collect()
bc.input(b"Fake news".to_vec()).expect("propose");
bc.message_iter().map(|msg| (id, msg)).collect()
}
}
/// A collection of `TestNode`s representing a network.
struct TestNetwork<A: Adversary> {
nodes: BTreeMap<NodeId, TestNode>,
adv_nodes: BTreeSet<NodeId>,
struct TestNetwork<A: Adversary<D>, D: DistAlgorithm> {
nodes: BTreeMap<D::NodeUid, TestNode<D>>,
adv_nodes: BTreeSet<D::NodeUid>,
adversary: A,
}
impl<A: Adversary> TestNetwork<A> {
impl<A: Adversary<Broadcast<NodeUid>>> TestNetwork<A, Broadcast<NodeUid>> {
/// Creates a new network with `good_num` good nodes, and the given `adversary` controlling
/// `adv_num` nodes.
fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork<A> {
let node_ids: BTreeSet<NodeId> = (0..(good_num + adv_num)).map(NodeId).collect();
let new_broadcast = |id: NodeId| {
fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork<A, Broadcast<NodeUid>> {
let node_ids: BTreeSet<NodeUid> = (0..(good_num + adv_num)).map(NodeUid).collect();
let new_broadcast = |id: NodeUid| {
let bc =
Broadcast::new(id, NodeId(0), node_ids.clone()).expect("Instantiate broadcast");
Broadcast::new(id, NodeUid(0), node_ids.clone()).expect("Instantiate broadcast");
(id, TestNode::new(bc))
};
let mut network = TestNetwork {
nodes: (0..good_num).map(NodeId).map(new_broadcast).collect(),
nodes: (0..good_num).map(NodeUid).map(new_broadcast).collect(),
adversary,
adv_nodes: (good_num..(good_num + adv_num)).map(NodeId).collect(),
adv_nodes: (good_num..(good_num + adv_num)).map(NodeUid).collect(),
};
let msgs = network.adversary.step();
for (sender_id, msg) in msgs {
@ -208,9 +208,9 @@ impl<A: Adversary> TestNetwork<A> {
}
/// Pushes the messages into the queues of the corresponding recipients.
fn dispatch_messages<Q>(&mut self, sender_id: NodeId, msgs: Q)
fn dispatch_messages<Q>(&mut self, sender_id: NodeUid, msgs: Q)
where
Q: IntoIterator<Item = TargetedMessage<BroadcastMessage, NodeId>> + fmt::Debug,
Q: IntoIterator<Item = TargetedMessage<BroadcastMessage, NodeUid>> + fmt::Debug,
{
for msg in msgs {
match msg {
@ -243,46 +243,50 @@ impl<A: Adversary> TestNetwork<A> {
}
}
/// Handles a queued message in a randomly selected node and returns the selected node's ID and
/// its output value, if any.
fn step(&mut self) -> (NodeId, Option<ProposedValue>) {
/// Handles a queued message in a randomly selected node and returns the selected node's ID.
fn step(&mut self) -> NodeUid {
let msgs = self.adversary.step();
for (sender_id, msg) in msgs {
self.dispatch_messages(sender_id, Some(msg));
}
// Pick a random non-idle node..
let id = self.adversary.pick_node(&self.nodes);
let (output, msgs) = self.nodes.get_mut(&id).unwrap().handle_message();
let msgs: Vec<_> = {
let node = self.nodes.get_mut(&id).unwrap();
node.handle_message();
node.algo.message_iter().collect()
};
self.dispatch_messages(id, msgs);
(id, output)
id
}
/// Makes the node `proposer_id` propose a value.
fn propose_value(&mut self, proposer_id: NodeId, value: ProposedValue) {
let msgs = self.nodes
.get_mut(&proposer_id)
.expect("proposer instance")
.broadcast
.propose_value(value)
.expect("propose");
fn input(&mut self, proposer_id: NodeUid, value: ProposedValue) {
let msgs: Vec<_> = {
let node = self.nodes.get_mut(&proposer_id).expect("proposer instance");
node.algo.input(value).expect("propose");
node.algo.message_iter().collect()
};
self.dispatch_messages(proposer_id, msgs);
}
}
/// Broadcasts a value from node 0 and expects all good nodes to receive it.
fn test_broadcast<A: Adversary>(mut network: TestNetwork<A>, proposed_value: &[u8]) {
fn test_broadcast<A: Adversary<Broadcast<NodeUid>>>(
mut network: TestNetwork<A, Broadcast<NodeUid>>,
proposed_value: &[u8],
) {
// This returns an error in all but the first test.
let _ = env_logger::try_init();
// Make node 0 propose the value.
network.propose_value(NodeId(0), proposed_value.to_vec());
network.input(NodeUid(0), proposed_value.to_vec());
// Handle messages in random order until all nodes have output the proposed value.
while network.nodes.values().any(|node| node.outputs.is_empty()) {
let (id, output) = network.step();
if let Some(value) = output {
assert_eq!(value, proposed_value);
assert_eq!(1, network.nodes[&id].outputs.len());
let id = network.step();
if !network.nodes[&id].outputs.is_empty() {
assert_eq!(vec![proposed_value.to_vec()], network.nodes[&id].outputs);
debug!("Node {:?} received", id);
}
}
@ -322,24 +326,24 @@ fn test_11_5_broadcast_nodes_first_delivery_silent() {
#[test]
fn test_3_1_broadcast_nodes_random_delivery_adv_propose() {
let good_nodes: BTreeSet<NodeId> = (0..3).map(NodeId).collect();
let adv_nodes: BTreeSet<NodeId> = (3..4).map(NodeId).collect();
let good_nodes: BTreeSet<NodeUid> = (0..3).map(NodeUid).collect();
let adv_nodes: BTreeSet<NodeUid> = (3..4).map(NodeUid).collect();
let adversary = ProposeAdversary::new(MessageScheduler::Random, good_nodes, adv_nodes);
test_broadcast(TestNetwork::new(3, 1, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_random_delivery_adv_propose() {
let good_nodes: BTreeSet<NodeId> = (0..11).map(NodeId).collect();
let adv_nodes: BTreeSet<NodeId> = (11..16).map(NodeId).collect();
let good_nodes: BTreeSet<NodeUid> = (0..11).map(NodeUid).collect();
let adv_nodes: BTreeSet<NodeUid> = (11..16).map(NodeUid).collect();
let adversary = ProposeAdversary::new(MessageScheduler::Random, good_nodes, adv_nodes);
test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo");
}
#[test]
fn test_11_5_broadcast_nodes_first_delivery_adv_propose() {
let good_nodes: BTreeSet<NodeId> = (0..11).map(NodeId).collect();
let adv_nodes: BTreeSet<NodeId> = (11..16).map(NodeId).collect();
let good_nodes: BTreeSet<NodeUid> = (0..11).map(NodeUid).collect();
let adv_nodes: BTreeSet<NodeUid> = (11..16).map(NodeUid).collect();
let adversary = ProposeAdversary::new(MessageScheduler::First, good_nodes, adv_nodes);
test_broadcast(TestNetwork::new(11, 5, adversary), b"Foo");
}