From 4164af1702afd355a9225a25c444b7507aef8093 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Wed, 16 May 2018 14:23:57 +0200 Subject: [PATCH] Generalize TestNetwork and test HoneyBadger. --- Cargo.toml | 2 +- src/common_subset.rs | 262 +++++++++++++++++++++-------------------- src/honey_badger.rs | 195 ++++++++++++++++++++---------- src/lib.rs | 3 +- tests/broadcast.rs | 247 ++++---------------------------------- tests/common_subset.rs | 19 +-- tests/honey_badger.rs | 106 +++++++++++++++++ tests/network/mod.rs | 248 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 657 insertions(+), 425 deletions(-) create mode 100644 tests/honey_badger.rs create mode 100644 tests/network/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 4de2df2..7d38743 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ itertools = "0.7" log = "0.4.1" merkle = { git = "https://github.com/afck/merkle.rs", branch = "public-proof" } protobuf = { version = "1.6.0", optional = true } +rand = "0.4.2" reed-solomon-erasure = "3.0" ring = "^0.12" serde = "1.0.55" @@ -26,7 +27,6 @@ protoc-rust = { version = "1.6.0", optional = true } crossbeam = "0.3.2" crossbeam-channel = "0.1" docopt = "0.8" -rand = "0.3" [[example]] name = "consensus-node" diff --git a/src/common_subset.rs b/src/common_subset.rs index 41a2826..39a8503 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -9,10 +9,9 @@ use std::hash::Hash; use agreement; use agreement::{Agreement, AgreementMessage}; - use broadcast; use broadcast::{Broadcast, BroadcastMessage}; - +use fmt::HexBytes; use messaging::{DistAlgorithm, Target, TargetedMessage}; // TODO: Make this a generic argument of `Broadcast`. @@ -65,6 +64,47 @@ pub struct CommonSubset { agreement_instances: HashMap>, broadcast_results: HashMap, agreement_results: HashMap, + messages: VecDeque, NodeUid>>, + output: Option>, +} + +impl DistAlgorithm for CommonSubset { + type NodeUid = NodeUid; + type Input = ProposedValue; + type Output = HashMap; + type Message = Message; + type Error = Error; + + fn input(&mut self, input: Self::Input) -> Result<(), Self::Error> { + self.send_proposed_value(input) + } + + fn handle_message( + &mut self, + sender_id: &Self::NodeUid, + message: Self::Message, + ) -> Result<(), Self::Error> { + match message { + Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg), + Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, &a_msg), + } + } + + fn next_message(&mut self) -> Option> { + self.messages.pop_front() + } + + fn next_output(&mut self) -> Option { + self.output.take() + } + + fn terminated(&self) -> bool { + self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated) + } + + fn our_id(&self) -> &Self::NodeUid { + &self.uid + } } impl CommonSubset { @@ -99,141 +139,111 @@ impl CommonSubset { agreement_instances, broadcast_results: HashMap::new(), agreement_results: HashMap::new(), + messages: VecDeque::new(), + output: None, }) } /// Common Subset input message handler. It receives a value for broadcast /// and redirects it to the corresponding broadcast instance. - pub fn send_proposed_value( - &mut self, - value: ProposedValue, - ) -> Result, NodeUid>>, Error> { + pub fn send_proposed_value(&mut self, value: ProposedValue) -> Result<(), Error> { // Upon receiving input v_i , input v_i to RBC_i. See Figure 2. if let Some(instance) = self.broadcast_instances.get_mut(&self.uid) { instance.input(value)?; let uid = self.uid.clone(); - Ok(instance - .message_iter() - .map(|msg| msg.map(|b_msg| Message::Broadcast(uid.clone(), b_msg))) - .collect()) + self.messages.extend( + instance + .message_iter() + .map(|msg| msg.map(|b_msg| Message::Broadcast(uid.clone(), b_msg))), + ); } else { - Err(Error::NoSuchBroadcastInstance) + return Err(Error::NoSuchBroadcastInstance); } + self.try_agreement_completion(); + Ok(()) } /// Upon delivery of v_j from RBC_j, if input has not yet been provided to /// BA_j, then provide input 1 to BA_j. See Figure 11. - fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result, Error> { + fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result<(), Error> { if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { if agreement_instance.accepts_input() { - Ok(Some(agreement_instance.set_input(true)?)) - } else { - Ok(None) + let msg = agreement_instance.set_input(true)?; + self.messages + .push_back(Target::All.message(Message::Agreement(uid.clone(), msg))); } } else { - Err(Error::NoSuchBroadcastInstance) - } - } - - /// Receives a message form a remote node `sender_id`, and returns an optional result of the - /// Common Subset algorithm - a set of proposed values - and a queue of messages to be sent to - /// remote nodes, or an error. - pub fn handle_message( - &mut self, - sender_id: &NodeUid, - message: Message, - ) -> Result, Error> { - match message { - Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg), - Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, &a_msg), + return Err(Error::NoSuchBroadcastInstance); } + self.try_agreement_completion(); + Ok(()) } /// Receives a broadcast message from a remote node `sender_id` concerning a - /// value proposed by the node `proposer_id`. The output contains an - /// optional result of the Common Subset algorithm - a set of proposed - /// values - and a queue of messages to be sent to remote nodes, or an - /// error. + /// value proposed by the node `proposer_id`. fn handle_broadcast( &mut self, sender_id: &NodeUid, proposer_id: &NodeUid, bmessage: BroadcastMessage, - ) -> Result, Error> { - let mut instance_result = None; - let input_result: Result< - VecDeque, NodeUid>>, - Error, - > = { - if let Some(broadcast_instance) = self.broadcast_instances.get_mut(proposer_id) { - broadcast_instance.handle_message(sender_id, bmessage)?; - instance_result = broadcast_instance.next_output(); - Ok(broadcast_instance + ) -> Result<(), Error> { + let value; + if let Some(broadcast_instance) = self.broadcast_instances.get_mut(proposer_id) { + broadcast_instance.handle_message(sender_id, bmessage)?; + self.messages.extend( + broadcast_instance .message_iter() - .map(|msg| msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))) - .collect()) - } else { - Err(Error::NoSuchBroadcastInstance) - } - }; - let mut opt_message: Option = None; - if let Some(value) = instance_result { - self.broadcast_results.insert(proposer_id.clone(), value); - opt_message = self.on_broadcast_result(proposer_id)?; + .map(|msg| msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))), + ); + value = match broadcast_instance.next_output() { + None => return Ok(()), + Some(result) => result, + }; + } else { + return Err(Error::NoSuchBroadcastInstance); } - input_result.map(|mut queue| { - if let Some(agreement_message) = opt_message { - // Append the message to agreement nodes to the common output queue. - queue.push_back( - Target::All.message(Message::Agreement(proposer_id.clone(), agreement_message)), - ); - } - (None, queue) - }) + self.broadcast_results.insert(proposer_id.clone(), value); + self.on_broadcast_result(proposer_id) } /// Receives an agreement message from a remote node `sender_id` concerning - /// a value proposed by the node `proposer_id`. The output contains an - /// optional result of the Common Subset algorithm - a set of proposed - /// values - and a queue of messages to be sent to remote nodes, or an - /// error. + /// a value proposed by the node `proposer_id`. fn handle_agreement( &mut self, sender_id: &NodeUid, proposer_id: &NodeUid, amessage: &AgreementMessage, - ) -> Result, Error> { - // The result defaults to error. - let mut result = Err(Error::NoSuchAgreementInstance); - + ) -> Result<(), Error> { + let input_result; // Send the message to the local instance of Agreement if let Some(agreement_instance) = self.agreement_instances.get_mut(proposer_id) { // Optional output of agreement and outgoing agreement // messages to remote nodes. - result = if agreement_instance.terminated() { + if agreement_instance.terminated() { // This instance has terminated and does not accept input. - Ok((None, VecDeque::new())) - } else { - // Send the message to the agreement instance. - agreement_instance - .handle_agreement_message(sender_id, &amessage) - .map_err(Error::from) + return Ok(()); } + // Send the message to the agreement instance. + let (opt_output, msgs) = + agreement_instance.handle_agreement_message(sender_id, &amessage)?; + self.messages.extend( + msgs.into_iter().map(|a_msg| { + Target::All.message(Message::Agreement(proposer_id.clone(), a_msg)) + }), + ); + input_result = opt_output; + } else { + debug!("Proposer {:?} does not exist.", proposer_id); + return Ok(()); } - let (output, mut outgoing) = result?; - - // Process Agreement outputs. - if let Some(b) = output { - outgoing.append(&mut self.on_agreement_output(proposer_id, b)?); + if let Some(output) = input_result { + // Process Agreement outputs. + self.on_agreement_output(proposer_id, output)?; } - // Check whether Agreement has completed. - let into_msg = |a_msg| Target::All.message(Message::Agreement(proposer_id.clone(), a_msg)); - Ok(( - self.try_agreement_completion(), - outgoing.into_iter().map(into_msg).collect(), - )) + self.try_agreement_completion(); + Ok(()) } /// Callback to be invoked on receipt of the decision value of the Agreement @@ -242,23 +252,28 @@ impl CommonSubset { &mut self, element_proposer_id: &NodeUid, result: bool, - ) -> Result, Error> { + ) -> Result<(), Error> { self.agreement_results .insert(element_proposer_id.clone(), result); - debug!("Updated Agreement results: {:?}", self.agreement_results); + debug!( + "{:?} Updated Agreement results: {:?}", + self.uid, self.agreement_results + ); if !result || self.count_true() < self.num_nodes - self.num_faulty_nodes { - return Ok(VecDeque::new()); + return Ok(()); } // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. - let mut outgoing = VecDeque::new(); - for instance in self.agreement_instances.values_mut() { - if instance.accepts_input() { - outgoing.push_back(instance.set_input(false)?); + for agreement_instance in self.agreement_instances.values_mut() { + if agreement_instance.accepts_input() { + let msg = agreement_instance.set_input(false)?; + let uid = agreement_instance.our_id().clone(); + self.messages + .push_back(Target::All.message(Message::Agreement(uid, msg))); } } - Ok(outgoing) + Ok(()) } /// Returns the number of agreement instances that have decided "yes". @@ -266,42 +281,35 @@ impl CommonSubset { self.agreement_results.values().filter(|v| **v).count() } - fn try_agreement_completion(&self) -> Option> { + fn try_agreement_completion(&mut self) { // Once all instances of BA have completed, let C ⊂ [1..N] be // the indexes of each BA that delivered 1. Wait for the output // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. - if self.agreement_instances - .values() - .all(|instance| instance.terminated()) - { - debug!("All Agreement instances have terminated"); - // All instances of Agreement that delivered `true` (or "1" in the paper). - let delivered_1: HashSet<&NodeUid> = self.agreement_results - .iter() - .filter(|(_, v)| **v) - .map(|(k, _)| k) - .collect(); - debug!("Agreement instances that delivered 1: {:?}", delivered_1); + if self.agreement_results.len() < self.num_nodes { + return; + } + debug!("{:?} All Agreement instances have terminated", self.uid); + // All instances of Agreement that delivered `true` (or "1" in the paper). + let delivered_1: HashSet<&NodeUid> = self.agreement_results + .iter() + .filter(|(_, v)| **v) + .map(|(k, _)| k) + .collect(); + debug!("Agreement instances that delivered 1: {:?}", delivered_1); - // Results of Broadcast instances in `delivered_1` - let broadcast_results: HashMap = self.broadcast_results - .iter() - .filter(|(k, _)| delivered_1.get(k).is_some()) - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - debug!( - "Broadcast results among the Agreement instances that delivered 1: {:?}", - broadcast_results - ); + // Results of Broadcast instances in `delivered_1` + let broadcast_results: HashMap = self.broadcast_results + .iter() + .filter(|(k, _)| delivered_1.contains(k)) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); - if delivered_1.len() == broadcast_results.len() { - debug!("Agreement instances completed with {:?}", broadcast_results); - Some(broadcast_results) - } else { - None + if delivered_1.len() == broadcast_results.len() { + debug!("{:?} Agreement instances completed:", self.uid); + for (uid, result) in &broadcast_results { + debug!(" {:?} → {:?}", uid, HexBytes(&result)); } - } else { - None + self.output = Some(broadcast_results) } } } diff --git a/src/honey_badger.rs b/src/honey_badger.rs index 1caf1e6..7049dbc 100644 --- a/src/honey_badger.rs +++ b/src/honey_badger.rs @@ -1,9 +1,11 @@ -use std::collections::{HashSet, VecDeque}; -use std::fmt::{Debug, Display}; +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use std::fmt::Debug; use std::hash::Hash; -use std::iter; +use std::{cmp, iter}; use bincode; +use rand; use serde::de::DeserializeOwned; use serde::Serialize; @@ -11,15 +13,14 @@ use common_subset::{self, CommonSubset}; use messaging::{DistAlgorithm, TargetedMessage}; /// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm. -pub struct HoneyBadger { - /// The buffer of transactions that have not yet been included in any batch. +pub struct HoneyBadger { + /// The buffer of transactions that have not yet been included in any output batch. buffer: VecDeque, - /// The current epoch, i.e. the number of batches that have been output so far. + /// The earliest epoch from which we have not yet received output. epoch: u64, - /// The Asynchronous Common Subset instance that decides which nodes' transactions to include. - // TODO: Common subset could be optimized to output before it is allowed to terminate. In that - // case, we would need to keep track of one or two previous instances, too. - common_subset: CommonSubset, + /// The Asynchronous Common Subset instance that decides which nodes' transactions to include, + /// indexed by epoch. + common_subsets: BTreeMap>, /// This node's ID. id: N, /// The set of all node IDs of the participants (including ourselves). @@ -36,8 +37,8 @@ pub struct HoneyBadger { impl DistAlgorithm for HoneyBadger where - T: Ord + Serialize + DeserializeOwned, - N: Eq + Hash + Ord + Clone + Display + Debug, + T: Ord + Serialize + DeserializeOwned + Debug, + N: Eq + Hash + Ord + Clone + Debug, { type NodeUid = N; type Input = T; @@ -78,54 +79,75 @@ where } // TODO: Use a threshold encryption scheme to encrypt the proposed transactions. -// TODO: We only contribute a proposal to the next round once we have `batch_size` buffered -// transactions. This should be more configurable: `min_batch_size`, `max_batch_size` and maybe a -// timeout? The paper assumes that all nodes will often have more or less the same set of -// transactions in the buffer; if the sets are disjoint on average, we can just send our whole -// buffer instead of 1/n of it. impl HoneyBadger where - T: Ord + Serialize + DeserializeOwned, - N: Eq + Hash + Ord + Clone + Display + Debug, + T: Ord + Serialize + DeserializeOwned + Debug, + N: Eq + Hash + Ord + Clone + Debug, { /// Returns a new Honey Badger instance with the given parameters, starting at epoch `0`. - pub fn new(id: N, all_uids_iter: I, batch_size: usize) -> Result + pub fn new(id: N, all_uids_iter: I, batch_size: usize, txs: TI) -> Result where I: IntoIterator, + TI: IntoIterator, { let all_uids: HashSet = all_uids_iter.into_iter().collect(); if !all_uids.contains(&id) { return Err(Error::OwnIdMissing); } - Ok(HoneyBadger { - buffer: VecDeque::new(), + let mut honey_badger = HoneyBadger { + buffer: txs.into_iter().collect(), epoch: 0, - common_subset: CommonSubset::new(id.clone(), &all_uids)?, + common_subsets: BTreeMap::new(), id, batch_size, all_uids, messages: VecDeque::new(), output: VecDeque::new(), - }) + }; + honey_badger.propose()?; + Ok(honey_badger) } /// Adds transactions into the buffer. - pub fn add_transactions(&mut self, txs: I) -> Result<(), Error> - where - I: IntoIterator, - { + pub fn add_transactions>(&mut self, txs: I) -> Result<(), Error> { self.buffer.extend(txs); - if self.buffer.len() < self.batch_size { - return Ok(()); - } - let share = bincode::serialize(&self.buffer)?; - for targeted_msg in self.common_subset.send_proposed_value(share)? { - let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(self.epoch, cs_msg)); + Ok(()) + } + + /// Proposes a new batch in the current epoch. + fn propose(&mut self) -> Result<(), Error> { + let proposal = self.choose_transactions()?; + let cs = match self.common_subsets.entry(self.epoch) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + entry.insert(CommonSubset::new(self.id.clone(), &self.all_uids)?) + } + }; + cs.input(proposal)?; + for targeted_msg in cs.message_iter() { + let epoch = self.epoch; + let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg)); self.messages.push_back(msg); } Ok(()) } + /// Returns a random choice of `batch_size / all_uids.len()` buffered transactions, and + /// serializes them. + fn choose_transactions(&self) -> Result, Error> { + let mut rng = rand::thread_rng(); + let amount = cmp::max(1, self.batch_size / self.all_uids.len()); + let sample = match rand::seq::sample_iter(&mut rng, &self.buffer, amount) { + Ok(choice) => choice, + Err(choice) => choice, // Fewer than `amount` were available, which is fine. + }; + debug!( + "{:?} Proposing in epoch {}: {:?}", + self.id, self.epoch, sample + ); + Ok(bincode::serialize(&sample)?) + } + /// Handles a message for the common subset sub-algorithm. fn handle_common_subset_message( &mut self, @@ -133,47 +155,96 @@ where epoch: u64, message: common_subset::Message, ) -> Result<(), Error> { - if epoch != self.epoch { - // TODO: Do we need to cache messages for future epochs? - return Ok(()); + { + // Borrow the instance for `epoch`, or create it. + let cs = match self.common_subsets.entry(epoch) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + if epoch < self.epoch { + return Ok(()); // Epoch has already terminated. Message is obsolete. + } else { + entry.insert(CommonSubset::new(self.id.clone(), &self.all_uids)?) + } + } + }; + // Handle the message and put the outgoing messages into the queue. + cs.handle_message(sender_id, message)?; + for targeted_msg in cs.message_iter() { + let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg)); + self.messages.push_back(msg); + } } - let (cs_out, cs_msgs) = self.common_subset.handle_message(sender_id, message)?; - - for targeted_msg in cs_msgs { - let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg)); - self.messages.push_back(msg); + // If this is the current epoch, the message could cause a new output. + if epoch == self.epoch { + self.process_output()?; } - // FIXME: Handle the node IDs in `ser_batches`. - let batches: Vec> = if let Some(ser_batches) = cs_out { - ser_batches - .values() - .map(|ser_batch| bincode::deserialize(&ser_batch)) - .collect::>()? - } else { - return Ok(()); - }; - let mut transactions: Vec = batches.into_iter().flat_map(|txs| txs).collect(); - transactions.sort(); - self.epoch += 1; - self.common_subset = CommonSubset::new(self.id.clone(), &self.all_uids)?; - self.add_transactions(None)?; - self.output.push_back(Batch { - epoch, - transactions, - }); + self.remove_terminated(epoch); Ok(()) } + + /// Checks whether the current epoch has output, and if it does, advances the epoch and + /// proposes a new batch. + fn process_output(&mut self) -> Result<(), Error> { + let old_epoch = self.epoch; + while let Some(ser_batches) = self.take_current_output() { + // Deserialize the output. + let transactions: BTreeSet = ser_batches + .into_iter() + .map(|(_, ser_batch)| bincode::deserialize::>(&ser_batch)) + .collect::>, _>>()? + .into_iter() + .flat_map(|txs| txs) + .collect(); + // Remove the output transactions from our buffer. + self.buffer.retain(|tx| !transactions.contains(tx)); + debug!( + "{:?} Epoch {} output {:?}", + self.id, self.epoch, transactions + ); + // Queue the output and advance the epoch. + self.output.push_back(Batch { + epoch: self.epoch, + transactions, + }); + self.epoch += 1; + } + // If we have moved to a new epoch, propose a new batch of transactions. + if self.epoch > old_epoch { + self.propose()?; + } + Ok(()) + } + + /// Returns the output of the current epoch's `CommonSubset` instance, if any. + fn take_current_output(&mut self) -> Option>> { + self.common_subsets + .get_mut(&self.epoch) + .and_then(CommonSubset::next_output) + } + + /// Removes all `CommonSubset` instances from _past_ epochs that have terminated. + fn remove_terminated(&mut self, from_epoch: u64) { + for epoch in from_epoch..self.epoch { + if self.common_subsets + .get(&epoch) + .map_or(false, CommonSubset::terminated) + { + debug!("{:?} Epoch {} has terminated.", self.id, epoch); + self.common_subsets.remove(&epoch); + } + } + } } /// A batch of transactions the algorithm has output. pub struct Batch { pub epoch: u64, - pub transactions: Vec, + pub transactions: BTreeSet, } /// A message sent to or received from another node's Honey Badger instance. #[cfg_attr(feature = "serialization-serde", derive(Serialize))] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Message { /// A message belonging to the common subset algorithm in the given epoch. CommonSubset(u64, common_subset::Message), diff --git a/src/lib.rs b/src/lib.rs index a9bdf9e..c5c7565 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ extern crate itertools; extern crate merkle; #[cfg(feature = "serialization-protobuf")] extern crate protobuf; +extern crate rand; extern crate reed_solomon_erasure; extern crate ring; extern crate serde; @@ -22,10 +23,10 @@ extern crate serde_derive; pub mod agreement; pub mod broadcast; pub mod common_subset; +mod fmt; pub mod honey_badger; pub mod messaging; #[cfg(feature = "serialization-protobuf")] pub mod proto; #[cfg(feature = "serialization-protobuf")] pub mod proto_io; -mod fmt; diff --git a/tests/broadcast.rs b/tests/broadcast.rs index 43f8ec0..c694198 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -4,135 +4,18 @@ extern crate hbbft; #[macro_use] extern crate log; extern crate env_logger; -extern crate merkle; extern crate rand; +mod network; + +use std::collections::{BTreeMap, BTreeSet}; +use std::iter; + use rand::Rng; -use std::collections::{BTreeMap, BTreeSet, VecDeque}; -use std::{fmt, iter}; use hbbft::broadcast::{Broadcast, BroadcastMessage}; -use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage}; - -#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)] -struct NodeUid(usize); - -type ProposedValue = Vec; - -/// A "node" running a broadcast instance. -struct TestNode { - /// This node's own ID. - id: D::NodeUid, - /// The instance of the broadcast algorithm. - algo: D, - /// Incoming messages from other nodes that this node has not yet handled. - queue: VecDeque<(D::NodeUid, D::Message)>, - /// The values this node has output so far. - outputs: Vec, -} - -impl TestNode { - /// Creates a new test node with the given broadcast instance. - fn new(algo: D) -> TestNode { - TestNode { - id: algo.our_id().clone(), - algo, - queue: VecDeque::new(), - outputs: Vec::new(), - } - } - - /// Handles the first message in the node's queue. - fn handle_message(&mut self) { - let (from_id, msg) = self.queue.pop_front().expect("message not found"); - debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg); - self.algo - .handle_message(&from_id, msg) - .expect("handling message"); - self.outputs.extend(self.algo.output_iter()); - } - - /// Inputs a value into the instance. - fn input(&mut self, input: D::Input) { - self.algo.input(input).expect("input"); - self.outputs.extend(self.algo.output_iter()); - } -} - -/// A strategy for picking the next good node to handle a message. -enum MessageScheduler { - /// Picks a random node. - Random, - /// Picks the first non-idle node. - First, -} - -impl MessageScheduler { - /// Chooses a node to be the next one to handle a message. - fn pick_node(&self, nodes: &BTreeMap>) -> D::NodeUid { - match *self { - MessageScheduler::First => nodes - .iter() - .find(|(_, node)| !node.queue.is_empty()) - .map(|(id, _)| id.clone()) - .expect("no more messages in queue"), - MessageScheduler::Random => { - let ids: Vec = nodes - .iter() - .filter(|(_, node)| !node.queue.is_empty()) - .map(|(id, _)| id.clone()) - .collect(); - rand::thread_rng() - .choose(&ids) - .expect("no more messages in queue") - .clone() - } - } - } -} - -type MessageWithSender = ( - ::NodeUid, - TargetedMessage<::Message, ::NodeUid>, -); - -/// An adversary that can control a set of nodes and pick the next good node to receive a message. -trait Adversary { - /// Chooses a node to be the next one to handle a message. - fn pick_node(&self, nodes: &BTreeMap>) -> D::NodeUid; - - /// Adds a message sent to one of the adversary's nodes. - fn push_message(&mut self, sender_id: D::NodeUid, msg: TargetedMessage); - - /// Produces a list of messages to be sent from the adversary's nodes. - fn step(&mut self) -> Vec>; -} - -/// An adversary whose nodes never send any messages. -struct SilentAdversary { - scheduler: MessageScheduler, -} - -impl SilentAdversary { - /// Creates a new silent adversary with the given message scheduler. - fn new(scheduler: MessageScheduler) -> SilentAdversary { - SilentAdversary { scheduler } - } -} - -impl Adversary for SilentAdversary { - fn pick_node(&self, nodes: &BTreeMap>) -> D::NodeUid { - self.scheduler.pick_node(nodes) - } - - fn push_message(&mut self, _: D::NodeUid, _: TargetedMessage) { - // All messages are ignored. - } - - fn step(&mut self) -> Vec> { - vec![] // No messages are sent. - } -} +use hbbft::messaging::{DistAlgorithm, TargetedMessage}; +use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode}; /// An adversary that inputs an alternate value. struct ProposeAdversary { @@ -187,99 +70,6 @@ impl Adversary> for ProposeAdversary { } } -/// A collection of `TestNode`s representing a network. -struct TestNetwork, D: DistAlgorithm> { - nodes: BTreeMap>, - adv_nodes: BTreeSet, - adversary: A, -} - -impl>> TestNetwork> { - /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling - /// `adv_num` nodes. - fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork> { - let node_ids: BTreeSet = (0..(good_num + adv_num)).map(NodeUid).collect(); - let new_broadcast = |id: NodeUid| { - let bc = - Broadcast::new(id, NodeUid(0), node_ids.clone()).expect("Instantiate broadcast"); - (id, TestNode::new(bc)) - }; - let mut network = TestNetwork { - nodes: (0..good_num).map(NodeUid).map(new_broadcast).collect(), - adversary, - adv_nodes: (good_num..(good_num + adv_num)).map(NodeUid).collect(), - }; - let msgs = network.adversary.step(); - for (sender_id, msg) in msgs { - network.dispatch_messages(sender_id, vec![msg]); - } - network - } - - /// Pushes the messages into the queues of the corresponding recipients. - fn dispatch_messages(&mut self, sender_id: NodeUid, msgs: Q) - where - Q: IntoIterator> + fmt::Debug, - { - for msg in msgs { - match msg { - TargetedMessage { - target: Target::All, - ref message, - } => { - for node in self.nodes.values_mut() { - if node.id != sender_id { - node.queue.push_back((sender_id, message.clone())) - } - } - self.adversary.push_message(sender_id, msg.clone()); - } - TargetedMessage { - target: Target::Node(to_id), - ref message, - } => { - if self.adv_nodes.contains(&to_id) { - self.adversary.push_message(sender_id, msg.clone()); - } else { - self.nodes - .get_mut(&to_id) - .unwrap() - .queue - .push_back((sender_id, message.clone())); - } - } - } - } - } - - /// Handles a queued message in a randomly selected node and returns the selected node's ID. - fn step(&mut self) -> NodeUid { - let msgs = self.adversary.step(); - for (sender_id, msg) in msgs { - self.dispatch_messages(sender_id, Some(msg)); - } - // Pick a random non-idle node.. - let id = self.adversary.pick_node(&self.nodes); - let msgs: Vec<_> = { - let node = self.nodes.get_mut(&id).unwrap(); - node.handle_message(); - node.algo.message_iter().collect() - }; - self.dispatch_messages(id, msgs); - id - } - - /// Makes the node `proposer_id` propose a value. - fn input(&mut self, proposer_id: NodeUid, value: ProposedValue) { - let msgs: Vec<_> = { - let node = self.nodes.get_mut(&proposer_id).expect("proposer instance"); - node.input(value); - node.algo.message_iter().collect() - }; - self.dispatch_messages(proposer_id, msgs); - } -} - /// Broadcasts a value from node 0 and expects all good nodes to receive it. fn test_broadcast>>( mut network: TestNetwork>, @@ -292,15 +82,19 @@ fn test_broadcast>>( network.input(NodeUid(0), proposed_value.to_vec()); // Handle messages in random order until all nodes have output the proposed value. - while network.nodes.values().any(|node| node.outputs.is_empty()) { + while network.nodes.values().any(|node| node.outputs().is_empty()) { let id = network.step(); - if !network.nodes[&id].outputs.is_empty() { - assert_eq!(vec![proposed_value.to_vec()], network.nodes[&id].outputs); + if !network.nodes[&id].outputs().is_empty() { + assert_eq!(vec![proposed_value.to_vec()], network.nodes[&id].outputs()); debug!("Node {:?} received", id); } } } +fn new_broadcast(id: NodeUid, all_ids: BTreeSet) -> Broadcast { + Broadcast::new(id, NodeUid(0), all_ids).expect("Instantiate broadcast") +} + fn test_broadcast_different_sizes(new_adversary: F, proposed_value: &[u8]) where A: Adversary>, @@ -318,7 +112,7 @@ where num_good_nodes, num_faulty_nodes ); let adversary = new_adversary(num_good_nodes, num_faulty_nodes); - let network = TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary); + let network = TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_broadcast); test_broadcast(network, proposed_value); } } @@ -328,7 +122,10 @@ fn test_8_broadcast_equal_leaves_silent() { let adversary = SilentAdversary::new(MessageScheduler::Random); // Space is ASCII character 32. So 32 spaces will create shards that are all equal, even if the // length of the value is inserted. - test_broadcast(TestNetwork::new(8, 0, adversary), &[b' '; 32]); + test_broadcast( + TestNetwork::new(8, 0, adversary, new_broadcast), + &[b' '; 32], + ); } #[test] @@ -338,13 +135,13 @@ fn test_broadcast_random_delivery_silent() { } #[test] -fn test_broadcast_nodes_first_delivery_silent() { +fn test_broadcast_first_delivery_silent() { let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First); test_broadcast_different_sizes(new_adversary, b"Foo"); } #[test] -fn test_broadcast_nodes_random_delivery_adv_propose() { +fn test_broadcast_random_delivery_adv_propose() { let new_adversary = |num_good_nodes: usize, num_faulty_nodes: usize| { let good_nodes: BTreeSet = (0..num_good_nodes).map(NodeUid).collect(); let adv_nodes: BTreeSet = (num_good_nodes..(num_good_nodes + num_faulty_nodes)) @@ -356,7 +153,7 @@ fn test_broadcast_nodes_random_delivery_adv_propose() { } #[test] -fn test_broadcast_nodes_first_delivery_adv_propose() { +fn test_broadcast_first_delivery_adv_propose() { let new_adversary = |num_good_nodes: usize, num_faulty_nodes: usize| { let good_nodes: BTreeSet = (0..num_good_nodes).map(NodeUid).collect(); let adv_nodes: BTreeSet = (num_good_nodes..(num_good_nodes + num_faulty_nodes)) diff --git a/tests/common_subset.rs b/tests/common_subset.rs index 99df1b5..64ee2e8 100644 --- a/tests/common_subset.rs +++ b/tests/common_subset.rs @@ -9,7 +9,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use hbbft::common_subset; use hbbft::common_subset::CommonSubset; -use hbbft::messaging::{Target, TargetedMessage}; +use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage}; type ProposedValue = Vec; @@ -45,9 +45,11 @@ impl TestNode { let (sender_id, message) = self.queue .pop_front() .expect("popping a message off the queue"); - let (output, messages) = self.cs + self.cs .handle_message(&sender_id, message) .expect("handling a Common Subset message"); + let output = self.cs.next_output(); + let messages = self.cs.message_iter().collect(); debug!("{:?} produced messages: {:?}", self.id, messages); if let Some(ref decision) = output { self.decision = Some(decision.clone()); @@ -133,12 +135,11 @@ impl TestNetwork { /// Make Node 0 propose a value. fn send_proposed_value(&mut self, sender_id: NodeUid, value: ProposedValue) { - let messages = self.nodes - .get_mut(&sender_id) - .unwrap() - .cs - .send_proposed_value(value) - .expect("send proposed value"); + let messages = { + let cs = &mut self.nodes.get_mut(&sender_id).unwrap().cs; + cs.send_proposed_value(value).expect("send proposed value"); + cs.message_iter().collect() + }; self.dispatch_messages(sender_id, messages); } } @@ -149,7 +150,7 @@ fn test_common_subset(mut network: TestNetwork) -> BTreeMap { // Pick the first node with a non-empty queue. network.pick_node(); - while network.nodes.values().any(|node| node.decision.is_none()) { + while network.nodes.values().any(|node| !node.cs.terminated()) { let (NodeUid(id), output) = network.step(); if let Some(decision) = output { debug!("Node {} output {:?}", id, decision); diff --git a/tests/honey_badger.rs b/tests/honey_badger.rs new file mode 100644 index 0000000..6449822 --- /dev/null +++ b/tests/honey_badger.rs @@ -0,0 +1,106 @@ +//! Network tests for Honey Badger. + +extern crate hbbft; +#[macro_use] +extern crate log; +extern crate env_logger; +extern crate rand; + +mod network; + +use std::collections::BTreeSet; +use std::iter; + +use rand::Rng; + +use hbbft::honey_badger::{self, HoneyBadger}; +use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode}; + +/// Proposes `num_txs` values and expects nodes to output and order them. +fn test_honey_badger(mut network: TestNetwork>, num_txs: usize) +where + A: Adversary>, +{ + for tx in 0..num_txs { + network.input_all(tx); + } + + // Returns `true` if the node has not output all transactions yet. + // If it has, and has advanced another epoch, it clears all messages for later epochs. + let node_busy = |node: &mut TestNode>| { + let mut min_missing = 0; + for batch in node.outputs() { + for tx in &batch.transactions { + if *tx >= min_missing { + min_missing = tx + 1; + } + } + } + if min_missing < num_txs { + return true; + } + if node.outputs().last().unwrap().transactions.is_empty() { + let last = node.outputs().last().unwrap().epoch; + node.queue.retain(|(_, ref msg)| match msg { + honey_badger::Message::CommonSubset(e, _) => *e < last, + }); + } + false + }; + + // Handle messages in random order until all nodes have output all transactions. + while network.nodes.values_mut().any(node_busy) { + network.step(); + } + // TODO: Verify that all nodes output the same epochs. +} + +fn new_honey_badger(id: NodeUid, all_ids: BTreeSet) -> HoneyBadger { + HoneyBadger::new(id, all_ids, 12, 0..5).expect("Instantiate honey_badger") +} + +fn test_honey_badger_different_sizes(new_adversary: F, num_txs: usize) +where + A: Adversary>, + F: Fn(usize, usize) -> A, +{ + // This returns an error in all but the first test. + let _ = env_logger::try_init(); + + let mut rng = rand::thread_rng(); + let sizes = (4..5) + .chain(iter::once(rng.gen_range(6, 10))) + .chain(iter::once(rng.gen_range(11, 15))); + for size in sizes { + // TODO: Fix `CommonSubset` to tolerate faulty nodes. + // let num_faulty_nodes = (size - 1) / 3; + let num_faulty_nodes = 0; + let num_good_nodes = size - num_faulty_nodes; + info!( + "Network size: {} good nodes, {} faulty nodes", + num_good_nodes, num_faulty_nodes + ); + let adversary = new_adversary(num_good_nodes, num_faulty_nodes); + let network = TestNetwork::new( + num_good_nodes, + num_faulty_nodes, + adversary, + new_honey_badger, + ); + test_honey_badger(network, num_txs); + } +} + +#[test] +fn test_honey_badger_random_delivery_silent() { + let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::Random); + test_honey_badger_different_sizes(new_adversary, 10); +} + +// TODO: This test is flaky. Debug. +#[ignore] +#[test] +fn test_honey_badger_first_delivery_silent() { + let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First); + test_honey_badger_different_sizes(new_adversary, 10); +} diff --git a/tests/network/mod.rs b/tests/network/mod.rs new file mode 100644 index 0000000..e6f64a3 --- /dev/null +++ b/tests/network/mod.rs @@ -0,0 +1,248 @@ +use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::fmt::Debug; + +use rand::{self, Rng}; + +use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage}; + +/// A node identifier. In the tests, nodes are simply numbered. +#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)] +pub struct NodeUid(pub usize); + +/// A "node" running an instance of the algorithm `D`. +pub struct TestNode { + /// This node's own ID. + id: D::NodeUid, + /// The instance of the broadcast algorithm. + algo: D, + /// Incoming messages from other nodes that this node has not yet handled. + pub queue: VecDeque<(D::NodeUid, D::Message)>, + /// The values this node has output so far. + outputs: Vec, +} + +impl TestNode { + /// Returns the list of outputs received by this node. + pub fn outputs(&self) -> &[D::Output] { + &self.outputs + } + + /// Creates a new test node with the given broadcast instance. + fn new(mut algo: D) -> TestNode { + let outputs = algo.output_iter().collect(); + TestNode { + id: algo.our_id().clone(), + algo, + queue: VecDeque::new(), + outputs, + } + } + + /// Handles the first message in the node's queue. + fn handle_message(&mut self) { + let (from_id, msg) = self.queue.pop_front().expect("message not found"); + debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg); + self.algo + .handle_message(&from_id, msg) + .expect("handling message"); + self.outputs.extend(self.algo.output_iter()); + } + + /// Inputs a value into the instance. + fn input(&mut self, input: D::Input) { + self.algo.input(input).expect("input"); + self.outputs.extend(self.algo.output_iter()); + } +} + +/// A strategy for picking the next good node to handle a message. +pub enum MessageScheduler { + /// Picks a random node. + Random, + /// Picks the first non-idle node. + First, +} + +impl MessageScheduler { + /// Chooses a node to be the next one to handle a message. + pub fn pick_node( + &self, + nodes: &BTreeMap>, + ) -> D::NodeUid { + match *self { + MessageScheduler::First => nodes + .iter() + .find(|(_, node)| !node.queue.is_empty()) + .map(|(id, _)| id.clone()) + .expect("no more messages in queue"), + MessageScheduler::Random => { + let ids: Vec = nodes + .iter() + .filter(|(_, node)| !node.queue.is_empty()) + .map(|(id, _)| id.clone()) + .collect(); + rand::thread_rng() + .choose(&ids) + .expect("no more messages in queue") + .clone() + } + } + } +} + +type MessageWithSender = ( + ::NodeUid, + TargetedMessage<::Message, ::NodeUid>, +); + +/// An adversary that can control a set of nodes and pick the next good node to receive a message. +pub trait Adversary { + /// Chooses a node to be the next one to handle a message. + fn pick_node(&self, nodes: &BTreeMap>) -> D::NodeUid; + + /// Adds a message sent to one of the adversary's nodes. + fn push_message(&mut self, sender_id: D::NodeUid, msg: TargetedMessage); + + /// Produces a list of messages to be sent from the adversary's nodes. + fn step(&mut self) -> Vec>; +} + +/// An adversary whose nodes never send any messages. +pub struct SilentAdversary { + scheduler: MessageScheduler, +} + +impl SilentAdversary { + /// Creates a new silent adversary with the given message scheduler. + pub fn new(scheduler: MessageScheduler) -> SilentAdversary { + SilentAdversary { scheduler } + } +} + +impl Adversary for SilentAdversary { + fn pick_node(&self, nodes: &BTreeMap>) -> D::NodeUid { + self.scheduler.pick_node(nodes) + } + + fn push_message(&mut self, _: D::NodeUid, _: TargetedMessage) { + // All messages are ignored. + } + + fn step(&mut self) -> Vec> { + vec![] // No messages are sent. + } +} + +/// A collection of `TestNode`s representing a network. +pub struct TestNetwork, D: DistAlgorithm> { + pub nodes: BTreeMap>, + adv_nodes: BTreeSet, + adversary: A, +} + +impl, D: DistAlgorithm> TestNetwork +where + D::Message: Clone, +{ + /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling + /// `adv_num` nodes. + pub fn new(good_num: usize, adv_num: usize, adversary: A, new_algo: F) -> TestNetwork + where + F: Fn(NodeUid, BTreeSet) -> D, + { + let node_ids: BTreeSet = (0..(good_num + adv_num)).map(NodeUid).collect(); + let new_node_by_id = |id: NodeUid| (id, TestNode::new(new_algo(id, node_ids.clone()))); + let mut network = TestNetwork { + nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(), + adversary, + adv_nodes: (good_num..(good_num + adv_num)).map(NodeUid).collect(), + }; + let msgs = network.adversary.step(); + for (sender_id, msg) in msgs { + network.dispatch_messages(sender_id, vec![msg]); + } + let mut initial_msgs: Vec<(D::NodeUid, Vec<_>)> = Vec::new(); + for (id, node) in &mut network.nodes { + initial_msgs.push((*id, node.algo.message_iter().collect())); + } + for (id, msgs) in initial_msgs { + network.dispatch_messages(id, msgs); + } + network + } + + /// Pushes the messages into the queues of the corresponding recipients. + fn dispatch_messages(&mut self, sender_id: NodeUid, msgs: Q) + where + Q: IntoIterator> + Debug, + { + for msg in msgs { + match msg { + TargetedMessage { + target: Target::All, + ref message, + } => { + for node in self.nodes.values_mut() { + if node.id != sender_id { + node.queue.push_back((sender_id, message.clone())) + } + } + self.adversary.push_message(sender_id, msg.clone()); + } + TargetedMessage { + target: Target::Node(to_id), + ref message, + } => { + if self.adv_nodes.contains(&to_id) { + self.adversary.push_message(sender_id, msg.clone()); + } else { + self.nodes + .get_mut(&to_id) + .unwrap() + .queue + .push_back((sender_id, message.clone())); + } + } + } + } + } + + /// Handles a queued message in a randomly selected node and returns the selected node's ID. + pub fn step(&mut self) -> NodeUid { + let msgs = self.adversary.step(); + for (sender_id, msg) in msgs { + self.dispatch_messages(sender_id, Some(msg)); + } + // Pick a random non-idle node.. + let id = self.adversary.pick_node(&self.nodes); + let msgs: Vec<_> = { + let node = self.nodes.get_mut(&id).unwrap(); + node.handle_message(); + node.algo.message_iter().collect() + }; + self.dispatch_messages(id, msgs); + id + } + + /// Inputs a value in node `id`. + pub fn input(&mut self, id: NodeUid, value: D::Input) { + let msgs: Vec<_> = { + let node = self.nodes.get_mut(&id).expect("proposer instance"); + node.input(value); + node.algo.message_iter().collect() + }; + self.dispatch_messages(id, msgs); + } + + /// Inputs a value in all nodes. + #[allow(unused)] // Not used in all tests. + pub fn input_all(&mut self, value: D::Input) + where + D::Input: Clone, + { + let ids: Vec = self.nodes.keys().cloned().collect(); + for id in ids { + self.input(id, value.clone()); + } + } +}