mirror of https://github.com/poanetwork/hbbft.git
Merge pull request #28 from poanetwork/afck-testnet
Generalize TestNetwork and test HoneyBadger.
This commit is contained in:
commit
4b0eebf299
|
@ -10,6 +10,7 @@ itertools = "0.7"
|
|||
log = "0.4.1"
|
||||
merkle = { git = "https://github.com/afck/merkle.rs", branch = "public-proof" }
|
||||
protobuf = { version = "1.6.0", optional = true }
|
||||
rand = "0.4.2"
|
||||
reed-solomon-erasure = "3.0"
|
||||
ring = "^0.12"
|
||||
serde = "1.0.55"
|
||||
|
@ -26,7 +27,6 @@ protoc-rust = { version = "1.6.0", optional = true }
|
|||
crossbeam = "0.3.2"
|
||||
crossbeam-channel = "0.1"
|
||||
docopt = "0.8"
|
||||
rand = "0.3"
|
||||
|
||||
[[example]]
|
||||
name = "consensus-node"
|
||||
|
|
|
@ -9,10 +9,9 @@ use std::hash::Hash;
|
|||
|
||||
use agreement;
|
||||
use agreement::{Agreement, AgreementMessage};
|
||||
|
||||
use broadcast;
|
||||
use broadcast::{Broadcast, BroadcastMessage};
|
||||
|
||||
use fmt::HexBytes;
|
||||
use messaging::{DistAlgorithm, Target, TargetedMessage};
|
||||
|
||||
// TODO: Make this a generic argument of `Broadcast`.
|
||||
|
@ -65,6 +64,47 @@ pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
|
|||
agreement_instances: HashMap<NodeUid, Agreement<NodeUid>>,
|
||||
broadcast_results: HashMap<NodeUid, ProposedValue>,
|
||||
agreement_results: HashMap<NodeUid, bool>,
|
||||
messages: VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
|
||||
output: Option<HashMap<NodeUid, ProposedValue>>,
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<NodeUid> {
|
||||
type NodeUid = NodeUid;
|
||||
type Input = ProposedValue;
|
||||
type Output = HashMap<NodeUid, ProposedValue>;
|
||||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> Result<(), Self::Error> {
|
||||
self.send_proposed_value(input)
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<(), Self::Error> {
|
||||
match message {
|
||||
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
|
||||
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, &a_msg),
|
||||
}
|
||||
}
|
||||
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>> {
|
||||
self.messages.pop_front()
|
||||
}
|
||||
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.take()
|
||||
}
|
||||
|
||||
fn terminated(&self) -> bool {
|
||||
self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated)
|
||||
}
|
||||
|
||||
fn our_id(&self) -> &Self::NodeUid {
|
||||
&self.uid
|
||||
}
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
||||
|
@ -99,141 +139,111 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
agreement_instances,
|
||||
broadcast_results: HashMap::new(),
|
||||
agreement_results: HashMap::new(),
|
||||
messages: VecDeque::new(),
|
||||
output: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Common Subset input message handler. It receives a value for broadcast
|
||||
/// and redirects it to the corresponding broadcast instance.
|
||||
pub fn send_proposed_value(
|
||||
&mut self,
|
||||
value: ProposedValue,
|
||||
) -> Result<VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>, Error> {
|
||||
pub fn send_proposed_value(&mut self, value: ProposedValue) -> Result<(), Error> {
|
||||
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
|
||||
if let Some(instance) = self.broadcast_instances.get_mut(&self.uid) {
|
||||
instance.input(value)?;
|
||||
let uid = self.uid.clone();
|
||||
Ok(instance
|
||||
.message_iter()
|
||||
.map(|msg| msg.map(|b_msg| Message::Broadcast(uid.clone(), b_msg)))
|
||||
.collect())
|
||||
self.messages.extend(
|
||||
instance
|
||||
.message_iter()
|
||||
.map(|msg| msg.map(|b_msg| Message::Broadcast(uid.clone(), b_msg))),
|
||||
);
|
||||
} else {
|
||||
Err(Error::NoSuchBroadcastInstance)
|
||||
return Err(Error::NoSuchBroadcastInstance);
|
||||
}
|
||||
self.try_agreement_completion();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
|
||||
/// BA_j, then provide input 1 to BA_j. See Figure 11.
|
||||
fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result<Option<AgreementMessage>, Error> {
|
||||
fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result<(), Error> {
|
||||
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
|
||||
if agreement_instance.accepts_input() {
|
||||
Ok(Some(agreement_instance.set_input(true)?))
|
||||
} else {
|
||||
Ok(None)
|
||||
let msg = agreement_instance.set_input(true)?;
|
||||
self.messages
|
||||
.push_back(Target::All.message(Message::Agreement(uid.clone(), msg)));
|
||||
}
|
||||
} else {
|
||||
Err(Error::NoSuchBroadcastInstance)
|
||||
}
|
||||
}
|
||||
|
||||
/// Receives a message form a remote node `sender_id`, and returns an optional result of the
|
||||
/// Common Subset algorithm - a set of proposed values - and a queue of messages to be sent to
|
||||
/// remote nodes, or an error.
|
||||
pub fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Message<NodeUid>,
|
||||
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
|
||||
match message {
|
||||
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
|
||||
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, &a_msg),
|
||||
return Err(Error::NoSuchBroadcastInstance);
|
||||
}
|
||||
self.try_agreement_completion();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Receives a broadcast message from a remote node `sender_id` concerning a
|
||||
/// value proposed by the node `proposer_id`. The output contains an
|
||||
/// optional result of the Common Subset algorithm - a set of proposed
|
||||
/// values - and a queue of messages to be sent to remote nodes, or an
|
||||
/// error.
|
||||
/// value proposed by the node `proposer_id`.
|
||||
fn handle_broadcast(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
proposer_id: &NodeUid,
|
||||
bmessage: BroadcastMessage,
|
||||
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
|
||||
let mut instance_result = None;
|
||||
let input_result: Result<
|
||||
VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
|
||||
Error,
|
||||
> = {
|
||||
if let Some(broadcast_instance) = self.broadcast_instances.get_mut(proposer_id) {
|
||||
broadcast_instance.handle_message(sender_id, bmessage)?;
|
||||
instance_result = broadcast_instance.next_output();
|
||||
Ok(broadcast_instance
|
||||
) -> Result<(), Error> {
|
||||
let value;
|
||||
if let Some(broadcast_instance) = self.broadcast_instances.get_mut(proposer_id) {
|
||||
broadcast_instance.handle_message(sender_id, bmessage)?;
|
||||
self.messages.extend(
|
||||
broadcast_instance
|
||||
.message_iter()
|
||||
.map(|msg| msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg)))
|
||||
.collect())
|
||||
} else {
|
||||
Err(Error::NoSuchBroadcastInstance)
|
||||
}
|
||||
};
|
||||
let mut opt_message: Option<AgreementMessage> = None;
|
||||
if let Some(value) = instance_result {
|
||||
self.broadcast_results.insert(proposer_id.clone(), value);
|
||||
opt_message = self.on_broadcast_result(proposer_id)?;
|
||||
.map(|msg| msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))),
|
||||
);
|
||||
value = match broadcast_instance.next_output() {
|
||||
None => return Ok(()),
|
||||
Some(result) => result,
|
||||
};
|
||||
} else {
|
||||
return Err(Error::NoSuchBroadcastInstance);
|
||||
}
|
||||
input_result.map(|mut queue| {
|
||||
if let Some(agreement_message) = opt_message {
|
||||
// Append the message to agreement nodes to the common output queue.
|
||||
queue.push_back(
|
||||
Target::All.message(Message::Agreement(proposer_id.clone(), agreement_message)),
|
||||
);
|
||||
}
|
||||
(None, queue)
|
||||
})
|
||||
self.broadcast_results.insert(proposer_id.clone(), value);
|
||||
self.on_broadcast_result(proposer_id)
|
||||
}
|
||||
|
||||
/// Receives an agreement message from a remote node `sender_id` concerning
|
||||
/// a value proposed by the node `proposer_id`. The output contains an
|
||||
/// optional result of the Common Subset algorithm - a set of proposed
|
||||
/// values - and a queue of messages to be sent to remote nodes, or an
|
||||
/// error.
|
||||
/// a value proposed by the node `proposer_id`.
|
||||
fn handle_agreement(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
proposer_id: &NodeUid,
|
||||
amessage: &AgreementMessage,
|
||||
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
|
||||
// The result defaults to error.
|
||||
let mut result = Err(Error::NoSuchAgreementInstance);
|
||||
|
||||
) -> Result<(), Error> {
|
||||
let input_result;
|
||||
// Send the message to the local instance of Agreement
|
||||
if let Some(agreement_instance) = self.agreement_instances.get_mut(proposer_id) {
|
||||
// Optional output of agreement and outgoing agreement
|
||||
// messages to remote nodes.
|
||||
result = if agreement_instance.terminated() {
|
||||
if agreement_instance.terminated() {
|
||||
// This instance has terminated and does not accept input.
|
||||
Ok((None, VecDeque::new()))
|
||||
} else {
|
||||
// Send the message to the agreement instance.
|
||||
agreement_instance
|
||||
.handle_agreement_message(sender_id, &amessage)
|
||||
.map_err(Error::from)
|
||||
return Ok(());
|
||||
}
|
||||
// Send the message to the agreement instance.
|
||||
let (opt_output, msgs) =
|
||||
agreement_instance.handle_agreement_message(sender_id, &amessage)?;
|
||||
self.messages.extend(
|
||||
msgs.into_iter().map(|a_msg| {
|
||||
Target::All.message(Message::Agreement(proposer_id.clone(), a_msg))
|
||||
}),
|
||||
);
|
||||
input_result = opt_output;
|
||||
} else {
|
||||
debug!("Proposer {:?} does not exist.", proposer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let (output, mut outgoing) = result?;
|
||||
|
||||
// Process Agreement outputs.
|
||||
if let Some(b) = output {
|
||||
outgoing.append(&mut self.on_agreement_output(proposer_id, b)?);
|
||||
if let Some(output) = input_result {
|
||||
// Process Agreement outputs.
|
||||
self.on_agreement_output(proposer_id, output)?;
|
||||
}
|
||||
|
||||
// Check whether Agreement has completed.
|
||||
let into_msg = |a_msg| Target::All.message(Message::Agreement(proposer_id.clone(), a_msg));
|
||||
Ok((
|
||||
self.try_agreement_completion(),
|
||||
outgoing.into_iter().map(into_msg).collect(),
|
||||
))
|
||||
self.try_agreement_completion();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Callback to be invoked on receipt of the decision value of the Agreement
|
||||
|
@ -242,23 +252,28 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
&mut self,
|
||||
element_proposer_id: &NodeUid,
|
||||
result: bool,
|
||||
) -> Result<VecDeque<AgreementMessage>, Error> {
|
||||
) -> Result<(), Error> {
|
||||
self.agreement_results
|
||||
.insert(element_proposer_id.clone(), result);
|
||||
debug!("Updated Agreement results: {:?}", self.agreement_results);
|
||||
debug!(
|
||||
"{:?} Updated Agreement results: {:?}",
|
||||
self.uid, self.agreement_results
|
||||
);
|
||||
if !result || self.count_true() < self.num_nodes - self.num_faulty_nodes {
|
||||
return Ok(VecDeque::new());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Upon delivery of value 1 from at least N − f instances of BA, provide
|
||||
// input 0 to each instance of BA that has not yet been provided input.
|
||||
let mut outgoing = VecDeque::new();
|
||||
for instance in self.agreement_instances.values_mut() {
|
||||
if instance.accepts_input() {
|
||||
outgoing.push_back(instance.set_input(false)?);
|
||||
for agreement_instance in self.agreement_instances.values_mut() {
|
||||
if agreement_instance.accepts_input() {
|
||||
let msg = agreement_instance.set_input(false)?;
|
||||
let uid = agreement_instance.our_id().clone();
|
||||
self.messages
|
||||
.push_back(Target::All.message(Message::Agreement(uid, msg)));
|
||||
}
|
||||
}
|
||||
Ok(outgoing)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the number of agreement instances that have decided "yes".
|
||||
|
@ -266,42 +281,35 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
self.agreement_results.values().filter(|v| **v).count()
|
||||
}
|
||||
|
||||
fn try_agreement_completion(&self) -> Option<HashMap<NodeUid, ProposedValue>> {
|
||||
fn try_agreement_completion(&mut self) {
|
||||
// Once all instances of BA have completed, let C ⊂ [1..N] be
|
||||
// the indexes of each BA that delivered 1. Wait for the output
|
||||
// v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j.
|
||||
if self.agreement_instances
|
||||
.values()
|
||||
.all(|instance| instance.terminated())
|
||||
{
|
||||
debug!("All Agreement instances have terminated");
|
||||
// All instances of Agreement that delivered `true` (or "1" in the paper).
|
||||
let delivered_1: HashSet<&NodeUid> = self.agreement_results
|
||||
.iter()
|
||||
.filter(|(_, v)| **v)
|
||||
.map(|(k, _)| k)
|
||||
.collect();
|
||||
debug!("Agreement instances that delivered 1: {:?}", delivered_1);
|
||||
if self.agreement_results.len() < self.num_nodes {
|
||||
return;
|
||||
}
|
||||
debug!("{:?} All Agreement instances have terminated", self.uid);
|
||||
// All instances of Agreement that delivered `true` (or "1" in the paper).
|
||||
let delivered_1: HashSet<&NodeUid> = self.agreement_results
|
||||
.iter()
|
||||
.filter(|(_, v)| **v)
|
||||
.map(|(k, _)| k)
|
||||
.collect();
|
||||
debug!("Agreement instances that delivered 1: {:?}", delivered_1);
|
||||
|
||||
// Results of Broadcast instances in `delivered_1`
|
||||
let broadcast_results: HashMap<NodeUid, ProposedValue> = self.broadcast_results
|
||||
.iter()
|
||||
.filter(|(k, _)| delivered_1.get(k).is_some())
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
debug!(
|
||||
"Broadcast results among the Agreement instances that delivered 1: {:?}",
|
||||
broadcast_results
|
||||
);
|
||||
// Results of Broadcast instances in `delivered_1`
|
||||
let broadcast_results: HashMap<NodeUid, ProposedValue> = self.broadcast_results
|
||||
.iter()
|
||||
.filter(|(k, _)| delivered_1.contains(k))
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect();
|
||||
|
||||
if delivered_1.len() == broadcast_results.len() {
|
||||
debug!("Agreement instances completed with {:?}", broadcast_results);
|
||||
Some(broadcast_results)
|
||||
} else {
|
||||
None
|
||||
if delivered_1.len() == broadcast_results.len() {
|
||||
debug!("{:?} Agreement instances completed:", self.uid);
|
||||
for (uid, result) in &broadcast_results {
|
||||
debug!(" {:?} → {:?}", uid, HexBytes(&result));
|
||||
}
|
||||
} else {
|
||||
None
|
||||
self.output = Some(broadcast_results)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
use std::collections::{HashSet, VecDeque};
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::iter;
|
||||
use std::{cmp, iter};
|
||||
|
||||
use bincode;
|
||||
use rand;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
|
||||
|
@ -11,15 +13,14 @@ use common_subset::{self, CommonSubset};
|
|||
use messaging::{DistAlgorithm, TargetedMessage};
|
||||
|
||||
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
|
||||
pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone + Display> {
|
||||
/// The buffer of transactions that have not yet been included in any batch.
|
||||
pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone> {
|
||||
/// The buffer of transactions that have not yet been included in any output batch.
|
||||
buffer: VecDeque<T>,
|
||||
/// The current epoch, i.e. the number of batches that have been output so far.
|
||||
/// The earliest epoch from which we have not yet received output.
|
||||
epoch: u64,
|
||||
/// The Asynchronous Common Subset instance that decides which nodes' transactions to include.
|
||||
// TODO: Common subset could be optimized to output before it is allowed to terminate. In that
|
||||
// case, we would need to keep track of one or two previous instances, too.
|
||||
common_subset: CommonSubset<N>,
|
||||
/// The Asynchronous Common Subset instance that decides which nodes' transactions to include,
|
||||
/// indexed by epoch.
|
||||
common_subsets: BTreeMap<u64, CommonSubset<N>>,
|
||||
/// This node's ID.
|
||||
id: N,
|
||||
/// The set of all node IDs of the participants (including ourselves).
|
||||
|
@ -36,8 +37,8 @@ pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone + Display> {
|
|||
|
||||
impl<T, N> DistAlgorithm for HoneyBadger<T, N>
|
||||
where
|
||||
T: Ord + Serialize + DeserializeOwned,
|
||||
N: Eq + Hash + Ord + Clone + Display + Debug,
|
||||
T: Ord + Serialize + DeserializeOwned + Debug,
|
||||
N: Eq + Hash + Ord + Clone + Debug,
|
||||
{
|
||||
type NodeUid = N;
|
||||
type Input = T;
|
||||
|
@ -78,54 +79,75 @@ where
|
|||
}
|
||||
|
||||
// TODO: Use a threshold encryption scheme to encrypt the proposed transactions.
|
||||
// TODO: We only contribute a proposal to the next round once we have `batch_size` buffered
|
||||
// transactions. This should be more configurable: `min_batch_size`, `max_batch_size` and maybe a
|
||||
// timeout? The paper assumes that all nodes will often have more or less the same set of
|
||||
// transactions in the buffer; if the sets are disjoint on average, we can just send our whole
|
||||
// buffer instead of 1/n of it.
|
||||
impl<T, N> HoneyBadger<T, N>
|
||||
where
|
||||
T: Ord + Serialize + DeserializeOwned,
|
||||
N: Eq + Hash + Ord + Clone + Display + Debug,
|
||||
T: Ord + Serialize + DeserializeOwned + Debug,
|
||||
N: Eq + Hash + Ord + Clone + Debug,
|
||||
{
|
||||
/// Returns a new Honey Badger instance with the given parameters, starting at epoch `0`.
|
||||
pub fn new<I>(id: N, all_uids_iter: I, batch_size: usize) -> Result<Self, Error>
|
||||
pub fn new<I, TI>(id: N, all_uids_iter: I, batch_size: usize, txs: TI) -> Result<Self, Error>
|
||||
where
|
||||
I: IntoIterator<Item = N>,
|
||||
TI: IntoIterator<Item = T>,
|
||||
{
|
||||
let all_uids: HashSet<N> = all_uids_iter.into_iter().collect();
|
||||
if !all_uids.contains(&id) {
|
||||
return Err(Error::OwnIdMissing);
|
||||
}
|
||||
Ok(HoneyBadger {
|
||||
buffer: VecDeque::new(),
|
||||
let mut honey_badger = HoneyBadger {
|
||||
buffer: txs.into_iter().collect(),
|
||||
epoch: 0,
|
||||
common_subset: CommonSubset::new(id.clone(), &all_uids)?,
|
||||
common_subsets: BTreeMap::new(),
|
||||
id,
|
||||
batch_size,
|
||||
all_uids,
|
||||
messages: VecDeque::new(),
|
||||
output: VecDeque::new(),
|
||||
})
|
||||
};
|
||||
honey_badger.propose()?;
|
||||
Ok(honey_badger)
|
||||
}
|
||||
|
||||
/// Adds transactions into the buffer.
|
||||
pub fn add_transactions<I>(&mut self, txs: I) -> Result<(), Error>
|
||||
where
|
||||
I: IntoIterator<Item = T>,
|
||||
{
|
||||
pub fn add_transactions<I: IntoIterator<Item = T>>(&mut self, txs: I) -> Result<(), Error> {
|
||||
self.buffer.extend(txs);
|
||||
if self.buffer.len() < self.batch_size {
|
||||
return Ok(());
|
||||
}
|
||||
let share = bincode::serialize(&self.buffer)?;
|
||||
for targeted_msg in self.common_subset.send_proposed_value(share)? {
|
||||
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(self.epoch, cs_msg));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Proposes a new batch in the current epoch.
|
||||
fn propose(&mut self) -> Result<(), Error> {
|
||||
let proposal = self.choose_transactions()?;
|
||||
let cs = match self.common_subsets.entry(self.epoch) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(CommonSubset::new(self.id.clone(), &self.all_uids)?)
|
||||
}
|
||||
};
|
||||
cs.input(proposal)?;
|
||||
for targeted_msg in cs.message_iter() {
|
||||
let epoch = self.epoch;
|
||||
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg));
|
||||
self.messages.push_back(msg);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a random choice of `batch_size / all_uids.len()` buffered transactions, and
|
||||
/// serializes them.
|
||||
fn choose_transactions(&self) -> Result<Vec<u8>, Error> {
|
||||
let mut rng = rand::thread_rng();
|
||||
let amount = cmp::max(1, self.batch_size / self.all_uids.len());
|
||||
let sample = match rand::seq::sample_iter(&mut rng, &self.buffer, amount) {
|
||||
Ok(choice) => choice,
|
||||
Err(choice) => choice, // Fewer than `amount` were available, which is fine.
|
||||
};
|
||||
debug!(
|
||||
"{:?} Proposing in epoch {}: {:?}",
|
||||
self.id, self.epoch, sample
|
||||
);
|
||||
Ok(bincode::serialize(&sample)?)
|
||||
}
|
||||
|
||||
/// Handles a message for the common subset sub-algorithm.
|
||||
fn handle_common_subset_message(
|
||||
&mut self,
|
||||
|
@ -133,47 +155,96 @@ where
|
|||
epoch: u64,
|
||||
message: common_subset::Message<N>,
|
||||
) -> Result<(), Error> {
|
||||
if epoch != self.epoch {
|
||||
// TODO: Do we need to cache messages for future epochs?
|
||||
return Ok(());
|
||||
{
|
||||
// Borrow the instance for `epoch`, or create it.
|
||||
let cs = match self.common_subsets.entry(epoch) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
Entry::Vacant(entry) => {
|
||||
if epoch < self.epoch {
|
||||
return Ok(()); // Epoch has already terminated. Message is obsolete.
|
||||
} else {
|
||||
entry.insert(CommonSubset::new(self.id.clone(), &self.all_uids)?)
|
||||
}
|
||||
}
|
||||
};
|
||||
// Handle the message and put the outgoing messages into the queue.
|
||||
cs.handle_message(sender_id, message)?;
|
||||
for targeted_msg in cs.message_iter() {
|
||||
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg));
|
||||
self.messages.push_back(msg);
|
||||
}
|
||||
}
|
||||
let (cs_out, cs_msgs) = self.common_subset.handle_message(sender_id, message)?;
|
||||
|
||||
for targeted_msg in cs_msgs {
|
||||
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg));
|
||||
self.messages.push_back(msg);
|
||||
// If this is the current epoch, the message could cause a new output.
|
||||
if epoch == self.epoch {
|
||||
self.process_output()?;
|
||||
}
|
||||
// FIXME: Handle the node IDs in `ser_batches`.
|
||||
let batches: Vec<Vec<T>> = if let Some(ser_batches) = cs_out {
|
||||
ser_batches
|
||||
.values()
|
||||
.map(|ser_batch| bincode::deserialize(&ser_batch))
|
||||
.collect::<Result<_, _>>()?
|
||||
} else {
|
||||
return Ok(());
|
||||
};
|
||||
let mut transactions: Vec<T> = batches.into_iter().flat_map(|txs| txs).collect();
|
||||
transactions.sort();
|
||||
self.epoch += 1;
|
||||
self.common_subset = CommonSubset::new(self.id.clone(), &self.all_uids)?;
|
||||
self.add_transactions(None)?;
|
||||
self.output.push_back(Batch {
|
||||
epoch,
|
||||
transactions,
|
||||
});
|
||||
self.remove_terminated(epoch);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Checks whether the current epoch has output, and if it does, advances the epoch and
|
||||
/// proposes a new batch.
|
||||
fn process_output(&mut self) -> Result<(), Error> {
|
||||
let old_epoch = self.epoch;
|
||||
while let Some(ser_batches) = self.take_current_output() {
|
||||
// Deserialize the output.
|
||||
let transactions: BTreeSet<T> = ser_batches
|
||||
.into_iter()
|
||||
.map(|(_, ser_batch)| bincode::deserialize::<Vec<T>>(&ser_batch))
|
||||
.collect::<Result<Vec<Vec<T>>, _>>()?
|
||||
.into_iter()
|
||||
.flat_map(|txs| txs)
|
||||
.collect();
|
||||
// Remove the output transactions from our buffer.
|
||||
self.buffer.retain(|tx| !transactions.contains(tx));
|
||||
debug!(
|
||||
"{:?} Epoch {} output {:?}",
|
||||
self.id, self.epoch, transactions
|
||||
);
|
||||
// Queue the output and advance the epoch.
|
||||
self.output.push_back(Batch {
|
||||
epoch: self.epoch,
|
||||
transactions,
|
||||
});
|
||||
self.epoch += 1;
|
||||
}
|
||||
// If we have moved to a new epoch, propose a new batch of transactions.
|
||||
if self.epoch > old_epoch {
|
||||
self.propose()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the output of the current epoch's `CommonSubset` instance, if any.
|
||||
fn take_current_output(&mut self) -> Option<HashMap<N, Vec<u8>>> {
|
||||
self.common_subsets
|
||||
.get_mut(&self.epoch)
|
||||
.and_then(CommonSubset::next_output)
|
||||
}
|
||||
|
||||
/// Removes all `CommonSubset` instances from _past_ epochs that have terminated.
|
||||
fn remove_terminated(&mut self, from_epoch: u64) {
|
||||
for epoch in from_epoch..self.epoch {
|
||||
if self.common_subsets
|
||||
.get(&epoch)
|
||||
.map_or(false, CommonSubset::terminated)
|
||||
{
|
||||
debug!("{:?} Epoch {} has terminated.", self.id, epoch);
|
||||
self.common_subsets.remove(&epoch);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A batch of transactions the algorithm has output.
|
||||
pub struct Batch<T> {
|
||||
pub epoch: u64,
|
||||
pub transactions: Vec<T>,
|
||||
pub transactions: BTreeSet<T>,
|
||||
}
|
||||
|
||||
/// A message sent to or received from another node's Honey Badger instance.
|
||||
#[cfg_attr(feature = "serialization-serde", derive(Serialize))]
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Message<N> {
|
||||
/// A message belonging to the common subset algorithm in the given epoch.
|
||||
CommonSubset(u64, common_subset::Message<N>),
|
||||
|
|
|
@ -12,6 +12,7 @@ extern crate itertools;
|
|||
extern crate merkle;
|
||||
#[cfg(feature = "serialization-protobuf")]
|
||||
extern crate protobuf;
|
||||
extern crate rand;
|
||||
extern crate reed_solomon_erasure;
|
||||
extern crate ring;
|
||||
extern crate serde;
|
||||
|
@ -22,10 +23,10 @@ extern crate serde_derive;
|
|||
pub mod agreement;
|
||||
pub mod broadcast;
|
||||
pub mod common_subset;
|
||||
mod fmt;
|
||||
pub mod honey_badger;
|
||||
pub mod messaging;
|
||||
#[cfg(feature = "serialization-protobuf")]
|
||||
pub mod proto;
|
||||
#[cfg(feature = "serialization-protobuf")]
|
||||
pub mod proto_io;
|
||||
mod fmt;
|
||||
|
|
|
@ -4,135 +4,18 @@ extern crate hbbft;
|
|||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate env_logger;
|
||||
extern crate merkle;
|
||||
extern crate rand;
|
||||
|
||||
mod network;
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::iter;
|
||||
|
||||
use rand::Rng;
|
||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||
use std::{fmt, iter};
|
||||
|
||||
use hbbft::broadcast::{Broadcast, BroadcastMessage};
|
||||
use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage};
|
||||
|
||||
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
|
||||
struct NodeUid(usize);
|
||||
|
||||
type ProposedValue = Vec<u8>;
|
||||
|
||||
/// A "node" running a broadcast instance.
|
||||
struct TestNode<D: DistAlgorithm> {
|
||||
/// This node's own ID.
|
||||
id: D::NodeUid,
|
||||
/// The instance of the broadcast algorithm.
|
||||
algo: D,
|
||||
/// Incoming messages from other nodes that this node has not yet handled.
|
||||
queue: VecDeque<(D::NodeUid, D::Message)>,
|
||||
/// The values this node has output so far.
|
||||
outputs: Vec<D::Output>,
|
||||
}
|
||||
|
||||
impl<D: DistAlgorithm> TestNode<D> {
|
||||
/// Creates a new test node with the given broadcast instance.
|
||||
fn new(algo: D) -> TestNode<D> {
|
||||
TestNode {
|
||||
id: algo.our_id().clone(),
|
||||
algo,
|
||||
queue: VecDeque::new(),
|
||||
outputs: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the first message in the node's queue.
|
||||
fn handle_message(&mut self) {
|
||||
let (from_id, msg) = self.queue.pop_front().expect("message not found");
|
||||
debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg);
|
||||
self.algo
|
||||
.handle_message(&from_id, msg)
|
||||
.expect("handling message");
|
||||
self.outputs.extend(self.algo.output_iter());
|
||||
}
|
||||
|
||||
/// Inputs a value into the instance.
|
||||
fn input(&mut self, input: D::Input) {
|
||||
self.algo.input(input).expect("input");
|
||||
self.outputs.extend(self.algo.output_iter());
|
||||
}
|
||||
}
|
||||
|
||||
/// A strategy for picking the next good node to handle a message.
|
||||
enum MessageScheduler {
|
||||
/// Picks a random node.
|
||||
Random,
|
||||
/// Picks the first non-idle node.
|
||||
First,
|
||||
}
|
||||
|
||||
impl MessageScheduler {
|
||||
/// Chooses a node to be the next one to handle a message.
|
||||
fn pick_node<D: DistAlgorithm>(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid {
|
||||
match *self {
|
||||
MessageScheduler::First => nodes
|
||||
.iter()
|
||||
.find(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| id.clone())
|
||||
.expect("no more messages in queue"),
|
||||
MessageScheduler::Random => {
|
||||
let ids: Vec<D::NodeUid> = nodes
|
||||
.iter()
|
||||
.filter(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| id.clone())
|
||||
.collect();
|
||||
rand::thread_rng()
|
||||
.choose(&ids)
|
||||
.expect("no more messages in queue")
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type MessageWithSender<D> = (
|
||||
<D as DistAlgorithm>::NodeUid,
|
||||
TargetedMessage<<D as DistAlgorithm>::Message, <D as DistAlgorithm>::NodeUid>,
|
||||
);
|
||||
|
||||
/// An adversary that can control a set of nodes and pick the next good node to receive a message.
|
||||
trait Adversary<D: DistAlgorithm> {
|
||||
/// Chooses a node to be the next one to handle a message.
|
||||
fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid;
|
||||
|
||||
/// Adds a message sent to one of the adversary's nodes.
|
||||
fn push_message(&mut self, sender_id: D::NodeUid, msg: TargetedMessage<D::Message, D::NodeUid>);
|
||||
|
||||
/// Produces a list of messages to be sent from the adversary's nodes.
|
||||
fn step(&mut self) -> Vec<MessageWithSender<D>>;
|
||||
}
|
||||
|
||||
/// An adversary whose nodes never send any messages.
|
||||
struct SilentAdversary {
|
||||
scheduler: MessageScheduler,
|
||||
}
|
||||
|
||||
impl SilentAdversary {
|
||||
/// Creates a new silent adversary with the given message scheduler.
|
||||
fn new(scheduler: MessageScheduler) -> SilentAdversary {
|
||||
SilentAdversary { scheduler }
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: DistAlgorithm> Adversary<D> for SilentAdversary {
|
||||
fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid {
|
||||
self.scheduler.pick_node(nodes)
|
||||
}
|
||||
|
||||
fn push_message(&mut self, _: D::NodeUid, _: TargetedMessage<D::Message, D::NodeUid>) {
|
||||
// All messages are ignored.
|
||||
}
|
||||
|
||||
fn step(&mut self) -> Vec<MessageWithSender<D>> {
|
||||
vec![] // No messages are sent.
|
||||
}
|
||||
}
|
||||
use hbbft::messaging::{DistAlgorithm, TargetedMessage};
|
||||
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
|
||||
|
||||
/// An adversary that inputs an alternate value.
|
||||
struct ProposeAdversary {
|
||||
|
@ -187,99 +70,6 @@ impl Adversary<Broadcast<NodeUid>> for ProposeAdversary {
|
|||
}
|
||||
}
|
||||
|
||||
/// A collection of `TestNode`s representing a network.
|
||||
struct TestNetwork<A: Adversary<D>, D: DistAlgorithm> {
|
||||
nodes: BTreeMap<D::NodeUid, TestNode<D>>,
|
||||
adv_nodes: BTreeSet<D::NodeUid>,
|
||||
adversary: A,
|
||||
}
|
||||
|
||||
impl<A: Adversary<Broadcast<NodeUid>>> TestNetwork<A, Broadcast<NodeUid>> {
|
||||
/// Creates a new network with `good_num` good nodes, and the given `adversary` controlling
|
||||
/// `adv_num` nodes.
|
||||
fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork<A, Broadcast<NodeUid>> {
|
||||
let node_ids: BTreeSet<NodeUid> = (0..(good_num + adv_num)).map(NodeUid).collect();
|
||||
let new_broadcast = |id: NodeUid| {
|
||||
let bc =
|
||||
Broadcast::new(id, NodeUid(0), node_ids.clone()).expect("Instantiate broadcast");
|
||||
(id, TestNode::new(bc))
|
||||
};
|
||||
let mut network = TestNetwork {
|
||||
nodes: (0..good_num).map(NodeUid).map(new_broadcast).collect(),
|
||||
adversary,
|
||||
adv_nodes: (good_num..(good_num + adv_num)).map(NodeUid).collect(),
|
||||
};
|
||||
let msgs = network.adversary.step();
|
||||
for (sender_id, msg) in msgs {
|
||||
network.dispatch_messages(sender_id, vec![msg]);
|
||||
}
|
||||
network
|
||||
}
|
||||
|
||||
/// Pushes the messages into the queues of the corresponding recipients.
|
||||
fn dispatch_messages<Q>(&mut self, sender_id: NodeUid, msgs: Q)
|
||||
where
|
||||
Q: IntoIterator<Item = TargetedMessage<BroadcastMessage, NodeUid>> + fmt::Debug,
|
||||
{
|
||||
for msg in msgs {
|
||||
match msg {
|
||||
TargetedMessage {
|
||||
target: Target::All,
|
||||
ref message,
|
||||
} => {
|
||||
for node in self.nodes.values_mut() {
|
||||
if node.id != sender_id {
|
||||
node.queue.push_back((sender_id, message.clone()))
|
||||
}
|
||||
}
|
||||
self.adversary.push_message(sender_id, msg.clone());
|
||||
}
|
||||
TargetedMessage {
|
||||
target: Target::Node(to_id),
|
||||
ref message,
|
||||
} => {
|
||||
if self.adv_nodes.contains(&to_id) {
|
||||
self.adversary.push_message(sender_id, msg.clone());
|
||||
} else {
|
||||
self.nodes
|
||||
.get_mut(&to_id)
|
||||
.unwrap()
|
||||
.queue
|
||||
.push_back((sender_id, message.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a queued message in a randomly selected node and returns the selected node's ID.
|
||||
fn step(&mut self) -> NodeUid {
|
||||
let msgs = self.adversary.step();
|
||||
for (sender_id, msg) in msgs {
|
||||
self.dispatch_messages(sender_id, Some(msg));
|
||||
}
|
||||
// Pick a random non-idle node..
|
||||
let id = self.adversary.pick_node(&self.nodes);
|
||||
let msgs: Vec<_> = {
|
||||
let node = self.nodes.get_mut(&id).unwrap();
|
||||
node.handle_message();
|
||||
node.algo.message_iter().collect()
|
||||
};
|
||||
self.dispatch_messages(id, msgs);
|
||||
id
|
||||
}
|
||||
|
||||
/// Makes the node `proposer_id` propose a value.
|
||||
fn input(&mut self, proposer_id: NodeUid, value: ProposedValue) {
|
||||
let msgs: Vec<_> = {
|
||||
let node = self.nodes.get_mut(&proposer_id).expect("proposer instance");
|
||||
node.input(value);
|
||||
node.algo.message_iter().collect()
|
||||
};
|
||||
self.dispatch_messages(proposer_id, msgs);
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcasts a value from node 0 and expects all good nodes to receive it.
|
||||
fn test_broadcast<A: Adversary<Broadcast<NodeUid>>>(
|
||||
mut network: TestNetwork<A, Broadcast<NodeUid>>,
|
||||
|
@ -292,15 +82,19 @@ fn test_broadcast<A: Adversary<Broadcast<NodeUid>>>(
|
|||
network.input(NodeUid(0), proposed_value.to_vec());
|
||||
|
||||
// Handle messages in random order until all nodes have output the proposed value.
|
||||
while network.nodes.values().any(|node| node.outputs.is_empty()) {
|
||||
while network.nodes.values().any(|node| node.outputs().is_empty()) {
|
||||
let id = network.step();
|
||||
if !network.nodes[&id].outputs.is_empty() {
|
||||
assert_eq!(vec![proposed_value.to_vec()], network.nodes[&id].outputs);
|
||||
if !network.nodes[&id].outputs().is_empty() {
|
||||
assert_eq!(vec![proposed_value.to_vec()], network.nodes[&id].outputs());
|
||||
debug!("Node {:?} received", id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_broadcast(id: NodeUid, all_ids: BTreeSet<NodeUid>) -> Broadcast<NodeUid> {
|
||||
Broadcast::new(id, NodeUid(0), all_ids).expect("Instantiate broadcast")
|
||||
}
|
||||
|
||||
fn test_broadcast_different_sizes<A, F>(new_adversary: F, proposed_value: &[u8])
|
||||
where
|
||||
A: Adversary<Broadcast<NodeUid>>,
|
||||
|
@ -318,7 +112,7 @@ where
|
|||
num_good_nodes, num_faulty_nodes
|
||||
);
|
||||
let adversary = new_adversary(num_good_nodes, num_faulty_nodes);
|
||||
let network = TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary);
|
||||
let network = TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_broadcast);
|
||||
test_broadcast(network, proposed_value);
|
||||
}
|
||||
}
|
||||
|
@ -328,7 +122,10 @@ fn test_8_broadcast_equal_leaves_silent() {
|
|||
let adversary = SilentAdversary::new(MessageScheduler::Random);
|
||||
// Space is ASCII character 32. So 32 spaces will create shards that are all equal, even if the
|
||||
// length of the value is inserted.
|
||||
test_broadcast(TestNetwork::new(8, 0, adversary), &[b' '; 32]);
|
||||
test_broadcast(
|
||||
TestNetwork::new(8, 0, adversary, new_broadcast),
|
||||
&[b' '; 32],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -338,13 +135,13 @@ fn test_broadcast_random_delivery_silent() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_broadcast_nodes_first_delivery_silent() {
|
||||
fn test_broadcast_first_delivery_silent() {
|
||||
let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First);
|
||||
test_broadcast_different_sizes(new_adversary, b"Foo");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_broadcast_nodes_random_delivery_adv_propose() {
|
||||
fn test_broadcast_random_delivery_adv_propose() {
|
||||
let new_adversary = |num_good_nodes: usize, num_faulty_nodes: usize| {
|
||||
let good_nodes: BTreeSet<NodeUid> = (0..num_good_nodes).map(NodeUid).collect();
|
||||
let adv_nodes: BTreeSet<NodeUid> = (num_good_nodes..(num_good_nodes + num_faulty_nodes))
|
||||
|
@ -356,7 +153,7 @@ fn test_broadcast_nodes_random_delivery_adv_propose() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_broadcast_nodes_first_delivery_adv_propose() {
|
||||
fn test_broadcast_first_delivery_adv_propose() {
|
||||
let new_adversary = |num_good_nodes: usize, num_faulty_nodes: usize| {
|
||||
let good_nodes: BTreeSet<NodeUid> = (0..num_good_nodes).map(NodeUid).collect();
|
||||
let adv_nodes: BTreeSet<NodeUid> = (num_good_nodes..(num_good_nodes + num_faulty_nodes))
|
||||
|
|
|
@ -9,7 +9,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
|
|||
|
||||
use hbbft::common_subset;
|
||||
use hbbft::common_subset::CommonSubset;
|
||||
use hbbft::messaging::{Target, TargetedMessage};
|
||||
use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage};
|
||||
|
||||
type ProposedValue = Vec<u8>;
|
||||
|
||||
|
@ -45,9 +45,11 @@ impl TestNode {
|
|||
let (sender_id, message) = self.queue
|
||||
.pop_front()
|
||||
.expect("popping a message off the queue");
|
||||
let (output, messages) = self.cs
|
||||
self.cs
|
||||
.handle_message(&sender_id, message)
|
||||
.expect("handling a Common Subset message");
|
||||
let output = self.cs.next_output();
|
||||
let messages = self.cs.message_iter().collect();
|
||||
debug!("{:?} produced messages: {:?}", self.id, messages);
|
||||
if let Some(ref decision) = output {
|
||||
self.decision = Some(decision.clone());
|
||||
|
@ -133,12 +135,11 @@ impl TestNetwork {
|
|||
|
||||
/// Make Node 0 propose a value.
|
||||
fn send_proposed_value(&mut self, sender_id: NodeUid, value: ProposedValue) {
|
||||
let messages = self.nodes
|
||||
.get_mut(&sender_id)
|
||||
.unwrap()
|
||||
.cs
|
||||
.send_proposed_value(value)
|
||||
.expect("send proposed value");
|
||||
let messages = {
|
||||
let cs = &mut self.nodes.get_mut(&sender_id).unwrap().cs;
|
||||
cs.send_proposed_value(value).expect("send proposed value");
|
||||
cs.message_iter().collect()
|
||||
};
|
||||
self.dispatch_messages(sender_id, messages);
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +150,7 @@ fn test_common_subset(mut network: TestNetwork) -> BTreeMap<NodeUid, TestNode> {
|
|||
// Pick the first node with a non-empty queue.
|
||||
network.pick_node();
|
||||
|
||||
while network.nodes.values().any(|node| node.decision.is_none()) {
|
||||
while network.nodes.values().any(|node| !node.cs.terminated()) {
|
||||
let (NodeUid(id), output) = network.step();
|
||||
if let Some(decision) = output {
|
||||
debug!("Node {} output {:?}", id, decision);
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
//! Network tests for Honey Badger.
|
||||
|
||||
extern crate hbbft;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate env_logger;
|
||||
extern crate rand;
|
||||
|
||||
mod network;
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::iter;
|
||||
|
||||
use rand::Rng;
|
||||
|
||||
use hbbft::honey_badger::{self, HoneyBadger};
|
||||
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
|
||||
|
||||
/// Proposes `num_txs` values and expects nodes to output and order them.
|
||||
fn test_honey_badger<A>(mut network: TestNetwork<A, HoneyBadger<usize, NodeUid>>, num_txs: usize)
|
||||
where
|
||||
A: Adversary<HoneyBadger<usize, NodeUid>>,
|
||||
{
|
||||
for tx in 0..num_txs {
|
||||
network.input_all(tx);
|
||||
}
|
||||
|
||||
// Returns `true` if the node has not output all transactions yet.
|
||||
// If it has, and has advanced another epoch, it clears all messages for later epochs.
|
||||
let node_busy = |node: &mut TestNode<HoneyBadger<usize, NodeUid>>| {
|
||||
let mut min_missing = 0;
|
||||
for batch in node.outputs() {
|
||||
for tx in &batch.transactions {
|
||||
if *tx >= min_missing {
|
||||
min_missing = tx + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if min_missing < num_txs {
|
||||
return true;
|
||||
}
|
||||
if node.outputs().last().unwrap().transactions.is_empty() {
|
||||
let last = node.outputs().last().unwrap().epoch;
|
||||
node.queue.retain(|(_, ref msg)| match msg {
|
||||
honey_badger::Message::CommonSubset(e, _) => *e < last,
|
||||
});
|
||||
}
|
||||
false
|
||||
};
|
||||
|
||||
// Handle messages in random order until all nodes have output all transactions.
|
||||
while network.nodes.values_mut().any(node_busy) {
|
||||
network.step();
|
||||
}
|
||||
// TODO: Verify that all nodes output the same epochs.
|
||||
}
|
||||
|
||||
fn new_honey_badger(id: NodeUid, all_ids: BTreeSet<NodeUid>) -> HoneyBadger<usize, NodeUid> {
|
||||
HoneyBadger::new(id, all_ids, 12, 0..5).expect("Instantiate honey_badger")
|
||||
}
|
||||
|
||||
fn test_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
|
||||
where
|
||||
A: Adversary<HoneyBadger<usize, NodeUid>>,
|
||||
F: Fn(usize, usize) -> A,
|
||||
{
|
||||
// This returns an error in all but the first test.
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let mut rng = rand::thread_rng();
|
||||
let sizes = (4..5)
|
||||
.chain(iter::once(rng.gen_range(6, 10)))
|
||||
.chain(iter::once(rng.gen_range(11, 15)));
|
||||
for size in sizes {
|
||||
// TODO: Fix `CommonSubset` to tolerate faulty nodes.
|
||||
// let num_faulty_nodes = (size - 1) / 3;
|
||||
let num_faulty_nodes = 0;
|
||||
let num_good_nodes = size - num_faulty_nodes;
|
||||
info!(
|
||||
"Network size: {} good nodes, {} faulty nodes",
|
||||
num_good_nodes, num_faulty_nodes
|
||||
);
|
||||
let adversary = new_adversary(num_good_nodes, num_faulty_nodes);
|
||||
let network = TestNetwork::new(
|
||||
num_good_nodes,
|
||||
num_faulty_nodes,
|
||||
adversary,
|
||||
new_honey_badger,
|
||||
);
|
||||
test_honey_badger(network, num_txs);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_honey_badger_random_delivery_silent() {
|
||||
let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::Random);
|
||||
test_honey_badger_different_sizes(new_adversary, 10);
|
||||
}
|
||||
|
||||
// TODO: This test is flaky. Debug.
|
||||
#[ignore]
|
||||
#[test]
|
||||
fn test_honey_badger_first_delivery_silent() {
|
||||
let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First);
|
||||
test_honey_badger_different_sizes(new_adversary, 10);
|
||||
}
|
|
@ -0,0 +1,248 @@
|
|||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
|
||||
use rand::{self, Rng};
|
||||
|
||||
use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage};
|
||||
|
||||
/// A node identifier. In the tests, nodes are simply numbered.
|
||||
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
|
||||
pub struct NodeUid(pub usize);
|
||||
|
||||
/// A "node" running an instance of the algorithm `D`.
|
||||
pub struct TestNode<D: DistAlgorithm> {
|
||||
/// This node's own ID.
|
||||
id: D::NodeUid,
|
||||
/// The instance of the broadcast algorithm.
|
||||
algo: D,
|
||||
/// Incoming messages from other nodes that this node has not yet handled.
|
||||
pub queue: VecDeque<(D::NodeUid, D::Message)>,
|
||||
/// The values this node has output so far.
|
||||
outputs: Vec<D::Output>,
|
||||
}
|
||||
|
||||
impl<D: DistAlgorithm> TestNode<D> {
|
||||
/// Returns the list of outputs received by this node.
|
||||
pub fn outputs(&self) -> &[D::Output] {
|
||||
&self.outputs
|
||||
}
|
||||
|
||||
/// Creates a new test node with the given broadcast instance.
|
||||
fn new(mut algo: D) -> TestNode<D> {
|
||||
let outputs = algo.output_iter().collect();
|
||||
TestNode {
|
||||
id: algo.our_id().clone(),
|
||||
algo,
|
||||
queue: VecDeque::new(),
|
||||
outputs,
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the first message in the node's queue.
|
||||
fn handle_message(&mut self) {
|
||||
let (from_id, msg) = self.queue.pop_front().expect("message not found");
|
||||
debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg);
|
||||
self.algo
|
||||
.handle_message(&from_id, msg)
|
||||
.expect("handling message");
|
||||
self.outputs.extend(self.algo.output_iter());
|
||||
}
|
||||
|
||||
/// Inputs a value into the instance.
|
||||
fn input(&mut self, input: D::Input) {
|
||||
self.algo.input(input).expect("input");
|
||||
self.outputs.extend(self.algo.output_iter());
|
||||
}
|
||||
}
|
||||
|
||||
/// A strategy for picking the next good node to handle a message.
|
||||
pub enum MessageScheduler {
|
||||
/// Picks a random node.
|
||||
Random,
|
||||
/// Picks the first non-idle node.
|
||||
First,
|
||||
}
|
||||
|
||||
impl MessageScheduler {
|
||||
/// Chooses a node to be the next one to handle a message.
|
||||
pub fn pick_node<D: DistAlgorithm>(
|
||||
&self,
|
||||
nodes: &BTreeMap<D::NodeUid, TestNode<D>>,
|
||||
) -> D::NodeUid {
|
||||
match *self {
|
||||
MessageScheduler::First => nodes
|
||||
.iter()
|
||||
.find(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| id.clone())
|
||||
.expect("no more messages in queue"),
|
||||
MessageScheduler::Random => {
|
||||
let ids: Vec<D::NodeUid> = nodes
|
||||
.iter()
|
||||
.filter(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| id.clone())
|
||||
.collect();
|
||||
rand::thread_rng()
|
||||
.choose(&ids)
|
||||
.expect("no more messages in queue")
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type MessageWithSender<D> = (
|
||||
<D as DistAlgorithm>::NodeUid,
|
||||
TargetedMessage<<D as DistAlgorithm>::Message, <D as DistAlgorithm>::NodeUid>,
|
||||
);
|
||||
|
||||
/// An adversary that can control a set of nodes and pick the next good node to receive a message.
|
||||
pub trait Adversary<D: DistAlgorithm> {
|
||||
/// Chooses a node to be the next one to handle a message.
|
||||
fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid;
|
||||
|
||||
/// Adds a message sent to one of the adversary's nodes.
|
||||
fn push_message(&mut self, sender_id: D::NodeUid, msg: TargetedMessage<D::Message, D::NodeUid>);
|
||||
|
||||
/// Produces a list of messages to be sent from the adversary's nodes.
|
||||
fn step(&mut self) -> Vec<MessageWithSender<D>>;
|
||||
}
|
||||
|
||||
/// An adversary whose nodes never send any messages.
|
||||
pub struct SilentAdversary {
|
||||
scheduler: MessageScheduler,
|
||||
}
|
||||
|
||||
impl SilentAdversary {
|
||||
/// Creates a new silent adversary with the given message scheduler.
|
||||
pub fn new(scheduler: MessageScheduler) -> SilentAdversary {
|
||||
SilentAdversary { scheduler }
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: DistAlgorithm> Adversary<D> for SilentAdversary {
|
||||
fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid {
|
||||
self.scheduler.pick_node(nodes)
|
||||
}
|
||||
|
||||
fn push_message(&mut self, _: D::NodeUid, _: TargetedMessage<D::Message, D::NodeUid>) {
|
||||
// All messages are ignored.
|
||||
}
|
||||
|
||||
fn step(&mut self) -> Vec<MessageWithSender<D>> {
|
||||
vec![] // No messages are sent.
|
||||
}
|
||||
}
|
||||
|
||||
/// A collection of `TestNode`s representing a network.
|
||||
pub struct TestNetwork<A: Adversary<D>, D: DistAlgorithm> {
|
||||
pub nodes: BTreeMap<D::NodeUid, TestNode<D>>,
|
||||
adv_nodes: BTreeSet<D::NodeUid>,
|
||||
adversary: A,
|
||||
}
|
||||
|
||||
impl<A: Adversary<D>, D: DistAlgorithm<NodeUid = NodeUid>> TestNetwork<A, D>
|
||||
where
|
||||
D::Message: Clone,
|
||||
{
|
||||
/// Creates a new network with `good_num` good nodes, and the given `adversary` controlling
|
||||
/// `adv_num` nodes.
|
||||
pub fn new<F>(good_num: usize, adv_num: usize, adversary: A, new_algo: F) -> TestNetwork<A, D>
|
||||
where
|
||||
F: Fn(NodeUid, BTreeSet<NodeUid>) -> D,
|
||||
{
|
||||
let node_ids: BTreeSet<NodeUid> = (0..(good_num + adv_num)).map(NodeUid).collect();
|
||||
let new_node_by_id = |id: NodeUid| (id, TestNode::new(new_algo(id, node_ids.clone())));
|
||||
let mut network = TestNetwork {
|
||||
nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(),
|
||||
adversary,
|
||||
adv_nodes: (good_num..(good_num + adv_num)).map(NodeUid).collect(),
|
||||
};
|
||||
let msgs = network.adversary.step();
|
||||
for (sender_id, msg) in msgs {
|
||||
network.dispatch_messages(sender_id, vec![msg]);
|
||||
}
|
||||
let mut initial_msgs: Vec<(D::NodeUid, Vec<_>)> = Vec::new();
|
||||
for (id, node) in &mut network.nodes {
|
||||
initial_msgs.push((*id, node.algo.message_iter().collect()));
|
||||
}
|
||||
for (id, msgs) in initial_msgs {
|
||||
network.dispatch_messages(id, msgs);
|
||||
}
|
||||
network
|
||||
}
|
||||
|
||||
/// Pushes the messages into the queues of the corresponding recipients.
|
||||
fn dispatch_messages<Q>(&mut self, sender_id: NodeUid, msgs: Q)
|
||||
where
|
||||
Q: IntoIterator<Item = TargetedMessage<D::Message, NodeUid>> + Debug,
|
||||
{
|
||||
for msg in msgs {
|
||||
match msg {
|
||||
TargetedMessage {
|
||||
target: Target::All,
|
||||
ref message,
|
||||
} => {
|
||||
for node in self.nodes.values_mut() {
|
||||
if node.id != sender_id {
|
||||
node.queue.push_back((sender_id, message.clone()))
|
||||
}
|
||||
}
|
||||
self.adversary.push_message(sender_id, msg.clone());
|
||||
}
|
||||
TargetedMessage {
|
||||
target: Target::Node(to_id),
|
||||
ref message,
|
||||
} => {
|
||||
if self.adv_nodes.contains(&to_id) {
|
||||
self.adversary.push_message(sender_id, msg.clone());
|
||||
} else {
|
||||
self.nodes
|
||||
.get_mut(&to_id)
|
||||
.unwrap()
|
||||
.queue
|
||||
.push_back((sender_id, message.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a queued message in a randomly selected node and returns the selected node's ID.
|
||||
pub fn step(&mut self) -> NodeUid {
|
||||
let msgs = self.adversary.step();
|
||||
for (sender_id, msg) in msgs {
|
||||
self.dispatch_messages(sender_id, Some(msg));
|
||||
}
|
||||
// Pick a random non-idle node..
|
||||
let id = self.adversary.pick_node(&self.nodes);
|
||||
let msgs: Vec<_> = {
|
||||
let node = self.nodes.get_mut(&id).unwrap();
|
||||
node.handle_message();
|
||||
node.algo.message_iter().collect()
|
||||
};
|
||||
self.dispatch_messages(id, msgs);
|
||||
id
|
||||
}
|
||||
|
||||
/// Inputs a value in node `id`.
|
||||
pub fn input(&mut self, id: NodeUid, value: D::Input) {
|
||||
let msgs: Vec<_> = {
|
||||
let node = self.nodes.get_mut(&id).expect("proposer instance");
|
||||
node.input(value);
|
||||
node.algo.message_iter().collect()
|
||||
};
|
||||
self.dispatch_messages(id, msgs);
|
||||
}
|
||||
|
||||
/// Inputs a value in all nodes.
|
||||
#[allow(unused)] // Not used in all tests.
|
||||
pub fn input_all(&mut self, value: D::Input)
|
||||
where
|
||||
D::Input: Clone,
|
||||
{
|
||||
let ids: Vec<D::NodeUid> = self.nodes.keys().cloned().collect();
|
||||
for id in ids {
|
||||
self.input(id, value.clone());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue