Extend tests and fix CommonSubset.

Verify termination and more outputs.

`CommonSubset` now instantiates `Agreement` with the correct ID.
This commit is contained in:
Andreas Fackler 2018-05-19 14:29:31 +02:00
parent 8a406dd154
commit b8a2463d1c
10 changed files with 190 additions and 170 deletions

View File

@ -5,6 +5,7 @@ authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
[dependencies]
bincode = "1.0.0"
derive_deref = "1.0.1"
env_logger = "0.5.10"
itertools = "0.7"
log = "0.4.1"

View File

@ -187,18 +187,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
// where w ∈ bin_values_r
if bin_values_was_empty {
// Send an AUX message at most once per epoch.
self.messages
.push_back(AgreementMessage::Aux(self.epoch, b));
// Receive the AUX message locally.
let our_uid = self.uid.clone();
self.handle_aux(&our_uid, b)?;
self.send_aux(b)?;
}
self.try_coin()?;
}
// upon receiving BVAL_r(b) messages from f + 1 nodes, if
// BVAL_r(b) has not been sent, multicast BVAL_r(b)
else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
} else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
// upon receiving BVAL_r(b) messages from f + 1 nodes, if
// BVAL_r(b) has not been sent, multicast BVAL_r(b)
self.send_bval(b)?;
}
Ok(())
@ -217,10 +210,16 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> {
self.received_aux.insert(sender_id.clone(), b);
if !self.bin_values.is_empty() {
self.try_coin()?;
}
Ok(())
self.try_coin()
}
fn send_aux(&mut self, b: bool) -> Result<(), Error> {
// Multicast AUX.
self.messages
.push_back(AgreementMessage::Aux(self.epoch, b));
// Receive the AUX message locally.
let our_uid = self.uid.clone();
self.handle_aux(&our_uid, b)
}
/// AUX_r messages such that the set of values carried by those messages is
@ -253,6 +252,9 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
/// value. The function may start the next epoch. In that case, it also
/// returns a message for broadcast.
fn try_coin(&mut self) -> Result<(), Error> {
if self.bin_values.is_empty() {
return Ok(());
}
let (count_aux, vals) = self.count_aux();
if count_aux < self.num_nodes - self.num_faulty_nodes {
// Continue waiting for the (N - f) AUX messages.
@ -288,8 +290,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.estimated = Some(coin);
} else {
// NOTE: `vals` has exactly one element due to `vals.len() == 1`
let v: Vec<bool> = vals.into_iter().collect();
let b = v[0];
let b = vals.into_iter().next().unwrap();
self.estimated = Some(b);
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {

View File

@ -33,6 +33,28 @@ pub enum Message<NodeUid> {
Agreement(NodeUid, AgreementMessage),
}
/// The queue of outgoing messages in a `CommonSubset` instance.
#[derive(Deref, DerefMut)]
struct MessageQueue<NodeUid>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>);
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> MessageQueue<NodeUid> {
/// Appends to the queue the messages from `agr`, wrapped with `proposer_id`.
fn extend_agreement(&mut self, proposer_id: &NodeUid, agr: &mut Agreement<NodeUid>) {
let convert = |msg: TargetedMessage<AgreementMessage, NodeUid>| {
msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg))
};
self.extend(agr.message_iter().map(convert));
}
/// Appends to the queue the messages from `bc`, wrapped with `proposer_id`.
fn extend_broadcast(&mut self, proposer_id: &NodeUid, bc: &mut Broadcast<NodeUid>) {
let convert = |msg: TargetedMessage<BroadcastMessage, NodeUid>| {
msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))
};
self.extend(bc.message_iter().map(convert));
}
}
/// Asynchronous Common Subset algorithm instance
///
/// The Asynchronous Common Subset protocol assumes a network of `N` nodes that send signed
@ -64,8 +86,9 @@ pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
agreement_instances: BTreeMap<NodeUid, Agreement<NodeUid>>,
broadcast_results: BTreeMap<NodeUid, ProposedValue>,
agreement_results: BTreeMap<NodeUid, bool>,
messages: VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
messages: MessageQueue<NodeUid>,
output: Option<BTreeMap<NodeUid, ProposedValue>>,
decided: bool,
}
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<NodeUid> {
@ -86,7 +109,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<No
) -> Result<(), Self::Error> {
match message {
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, &a_msg),
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg),
}
}
@ -99,10 +122,6 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<No
}
fn terminated(&self) -> bool {
debug!(
"Termination check. Terminated Agreement instances: {:?}",
self.agreement_instances.values().all(Agreement::terminated)
);
self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated)
}
@ -132,7 +151,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
// Create all agreement instances.
let mut agreement_instances: BTreeMap<NodeUid, Agreement<NodeUid>> = BTreeMap::new();
for uid0 in all_uids {
agreement_instances.insert(uid0.clone(), Agreement::new(uid0.clone(), num_nodes));
agreement_instances.insert(uid0.clone(), Agreement::new(uid.clone(), num_nodes));
}
Ok(CommonSubset {
@ -143,47 +162,18 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
agreement_instances,
broadcast_results: BTreeMap::new(),
agreement_results: BTreeMap::new(),
messages: VecDeque::new(),
messages: MessageQueue(VecDeque::new()),
output: None,
decided: false,
})
}
/// Common Subset input message handler. It receives a value for broadcast
/// and redirects it to the corresponding broadcast instance.
pub fn send_proposed_value(&mut self, value: ProposedValue) -> Result<(), Error> {
let uid = self.uid.clone();
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
if let Some(instance) = self.broadcast_instances.get_mut(&self.uid) {
instance.input(value)?;
let uid = self.uid.clone();
self.messages.extend(
instance
.message_iter()
.map(|msg| msg.map(|b_msg| Message::Broadcast(uid.clone(), b_msg))),
);
} else {
return Err(Error::NoSuchBroadcastInstance);
}
self.try_agreement_completion();
Ok(())
}
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result<(), Error> {
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
if agreement_instance.accepts_input() {
agreement_instance.set_input(true)?;
self.messages.extend(
agreement_instance
.message_iter()
.map(|msg| msg.map(|a_msg| Message::Agreement(uid.clone(), a_msg))),
);
}
} else {
return Err(Error::NoSuchBroadcastInstance);
}
self.try_agreement_completion();
Ok(())
self.process_broadcast(&uid, |bc| bc.input(value))
}
/// Receives a broadcast message from a remote node `sender_id` concerning a
@ -194,23 +184,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
proposer_id: &NodeUid,
bmessage: BroadcastMessage,
) -> Result<(), Error> {
let value;
if let Some(broadcast_instance) = self.broadcast_instances.get_mut(proposer_id) {
broadcast_instance.handle_message(sender_id, bmessage)?;
self.messages.extend(
broadcast_instance
.message_iter()
.map(|msg| msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))),
);
value = match broadcast_instance.next_output() {
None => return Ok(()),
Some(result) => result,
};
} else {
return Err(Error::NoSuchBroadcastInstance);
}
self.broadcast_results.insert(proposer_id.clone(), value);
self.on_broadcast_result(proposer_id)
self.process_broadcast(proposer_id, |bc| bc.handle_message(sender_id, bmessage))
}
/// Receives an agreement message from a remote node `sender_id` concerning
@ -219,69 +193,90 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
&mut self,
sender_id: &NodeUid,
proposer_id: &NodeUid,
amessage: &AgreementMessage,
amessage: AgreementMessage,
) -> Result<(), Error> {
let input_result;
// Send the message to the local instance of Agreement
if let Some(agreement_instance) = self.agreement_instances.get_mut(proposer_id) {
// Optional output of agreement and outgoing agreement
// messages to remote nodes.
if agreement_instance.terminated() {
// This instance has terminated and does not accept input.
self.process_agreement(proposer_id, |agreement| {
agreement.handle_message(sender_id, amessage)
})
}
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
fn process_broadcast<F>(&mut self, proposer_id: &NodeUid, f: F) -> Result<(), Error>
where
F: FnOnce(&mut Broadcast<NodeUid>) -> Result<(), broadcast::Error>,
{
let value = {
let broadcast = self.broadcast_instances
.get_mut(proposer_id)
.ok_or(Error::NoSuchBroadcastInstance)?;
f(broadcast)?;
self.messages.extend_broadcast(&proposer_id, broadcast);
if let Some(output) = broadcast.next_output() {
output
} else {
return Ok(());
}
// Send the message to the agreement instance.
agreement_instance.handle_message(sender_id, amessage.clone())?;
self.messages.extend(
agreement_instance
.message_iter()
.map(|msg| msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg))),
);
input_result = agreement_instance.next_output();
} else {
debug!("Proposer {:?} does not exist.", proposer_id);
return Err(Error::NoSuchAgreementInstance);
}
if let Some(output) = input_result {
// Process Agreement outputs.
self.on_agreement_output(proposer_id, output)?;
}
// Check whether Agreement has completed.
self.try_agreement_completion();
Ok(())
};
self.broadcast_results.insert(proposer_id.clone(), value);
self.process_agreement(proposer_id, |agreement| {
if agreement.accepts_input() {
agreement.set_input(true)
} else {
Ok(())
}
})
}
/// Callback to be invoked on receipt of the decision value of the Agreement
/// instance `uid`.
fn on_agreement_output(
&mut self,
element_proposer_id: &NodeUid,
result: bool,
) -> Result<(), Error> {
self.agreement_results
.insert(element_proposer_id.clone(), result);
fn process_agreement<F>(&mut self, proposer_id: &NodeUid, f: F) -> Result<(), Error>
where
F: FnOnce(&mut Agreement<NodeUid>) -> Result<(), agreement::Error>,
{
let value = {
let agreement = self.agreement_instances
.get_mut(proposer_id)
.ok_or(Error::NoSuchAgreementInstance)?;
if agreement.terminated() {
return Ok(());
}
f(agreement);
self.messages.extend_agreement(proposer_id, agreement);
if let Some(output) = agreement.next_output() {
output
} else {
return Ok(());
}
};
if self.agreement_results
.insert(proposer_id.clone(), value)
.is_some()
{
return Err(Error::MultipleAgreementResults);
}
debug!(
"{:?} Updated Agreement results: {:?}",
self.uid, self.agreement_results
);
if !result || self.count_true() < self.num_nodes - self.num_faulty_nodes {
return Ok(());
}
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for agreement_instance in self.agreement_instances.values_mut() {
if agreement_instance.accepts_input() {
agreement_instance.set_input(false)?;
let uid = agreement_instance.our_id().clone();
self.messages.extend(
agreement_instance
.message_iter()
.map(|msg| msg.map(|a_msg| Message::Agreement(uid.clone(), a_msg))),
);
if value && self.count_true() == self.num_nodes - self.num_faulty_nodes {
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (uid, agreement) in &mut self.agreement_instances {
if agreement.accepts_input() {
agreement.set_input(false)?;
self.messages.extend_agreement(uid, agreement);
if let Some(output) = agreement.next_output() {
if self.agreement_results.insert(uid.clone(), output).is_some() {
return Err(Error::MultipleAgreementResults);
}
}
}
}
}
self.try_agreement_completion();
Ok(())
}
@ -291,6 +286,9 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
fn try_agreement_completion(&mut self) {
if self.decided || self.count_true() < self.num_nodes - self.num_faulty_nodes {
return;
}
// Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output j∈C v_j.
@ -318,6 +316,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
for (uid, result) in &broadcast_results {
debug!(" {:?} → {:?}", uid, HexBytes(&result));
}
self.decided = true;
self.output = Some(broadcast_results)
}
}
@ -329,6 +328,7 @@ pub enum Error {
NotImplemented,
NoSuchBroadcastInstance,
NoSuchAgreementInstance,
MultipleAgreementResults,
Broadcast(broadcast::Error),
Agreement(agreement::Error),
}

View File

@ -30,7 +30,7 @@ pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone> {
// `num_nodes * num_nodes * log(num_nodes)`.
batch_size: usize,
/// The messages that need to be sent to other nodes.
messages: VecDeque<TargetedMessage<Message<N>, N>>,
messages: MessageQueue<N>,
/// The outputs from completed epochs.
output: VecDeque<Batch<T>>,
}
@ -101,7 +101,7 @@ where
id,
batch_size,
all_uids,
messages: VecDeque::new(),
messages: MessageQueue(VecDeque::new()),
output: VecDeque::new(),
};
honey_badger.propose()?;
@ -124,11 +124,7 @@ where
}
};
cs.input(proposal)?;
for targeted_msg in cs.message_iter() {
let epoch = self.epoch;
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg));
self.messages.push_back(msg);
}
self.messages.extend_with_epoch(self.epoch, cs);
Ok(())
}
@ -170,10 +166,7 @@ where
};
// Handle the message and put the outgoing messages into the queue.
cs.handle_message(sender_id, message)?;
for targeted_msg in cs.message_iter() {
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg));
self.messages.push_back(msg);
}
self.messages.extend_with_epoch(epoch, cs);
}
// If this is the current epoch, the message could cause a new output.
if epoch == self.epoch {
@ -253,6 +246,20 @@ pub enum Message<N> {
// TODO: Decryption share.
}
/// The queue of outgoing messages in a `HoneyBadger` instance.
#[derive(Deref, DerefMut)]
struct MessageQueue<N>(VecDeque<TargetedMessage<Message<N>, N>>);
impl<N: Clone + Debug + Eq + Hash + Ord> MessageQueue<N> {
/// Appends to the queue the messages from `cs`, wrapped with `epoch`.
fn extend_with_epoch(&mut self, epoch: u64, cs: &mut CommonSubset<N>) {
let convert = |msg: TargetedMessage<common_subset::Message<N>, N>| {
msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg))
};
self.extend(cs.message_iter().map(convert));
}
}
/// A Honey Badger error.
#[derive(Debug)]
pub enum Error {

View File

@ -6,6 +6,8 @@
#![feature(optin_builtin_traits)]
extern crate bincode;
#[macro_use(Deref, DerefMut)]
extern crate derive_deref;
#[macro_use]
extern crate log;
extern crate itertools;

View File

@ -27,7 +27,7 @@ use std::iter;
use rand::Rng;
use hbbft::agreement::Agreement;
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork};
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
fn test_agreement<A: Adversary<Agreement<NodeUid>>>(
mut network: TestNetwork<A, Agreement<NodeUid>>,
@ -39,13 +39,17 @@ fn test_agreement<A: Adversary<Agreement<NodeUid>>>(
}
// Handle messages in random order until all nodes have output the proposed value.
while network.nodes.values().any(|node| node.outputs().is_empty()) {
let id = network.step();
if let Some(&b) = network.nodes[&id].outputs().iter().next() {
if let Some(expected) = input {
assert_eq!(expected, b);
}
debug!("Node {:?} decided: {}", id, b);
while !network.nodes.values().all(TestNode::terminated) {
network.step();
}
// Verify that all instances output the same value.
let mut expected = input;
for node in network.nodes.values() {
if let Some(b) = expected {
assert!(iter::once(&b).eq(node.outputs()));
} else {
assert_eq!(1, node.outputs().len());
expected = Some(node.outputs()[0]);
}
}
}
@ -80,13 +84,13 @@ where
}
#[test]
fn test_agreement_random_silent_all_true() {
fn test_agreement_random_silent() {
let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::Random);
test_agreement_different_sizes(new_adversary);
}
#[test]
fn test_agreement_first_silent_all_true() {
fn test_agreement_first_silent() {
let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First);
test_agreement_different_sizes(new_adversary);
}

View File

@ -82,12 +82,12 @@ fn test_broadcast<A: Adversary<Broadcast<NodeUid>>>(
network.input(NodeUid(0), proposed_value.to_vec());
// Handle messages in random order until all nodes have output the proposed value.
while network.nodes.values().any(|node| node.outputs().is_empty()) {
let id = network.step();
if !network.nodes[&id].outputs().is_empty() {
assert_eq!(vec![proposed_value.to_vec()], network.nodes[&id].outputs());
debug!("Node {:?} received", id);
}
while !network.nodes.values().all(TestNode::terminated) {
network.step();
}
// Verify that all instances output the proposed value.
for node in network.nodes.values() {
assert!(iter::once(&proposed_value.to_vec()).eq(node.outputs()));
}
}

View File

@ -9,11 +9,11 @@ extern crate rand;
mod network;
use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use hbbft::common_subset::CommonSubset;
use hbbft::messaging::DistAlgorithm;
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork};
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
type ProposedValue = Vec<u8>;
@ -22,7 +22,6 @@ fn test_common_subset<A: Adversary<CommonSubset<NodeUid>>>(
inputs: &BTreeMap<NodeUid, ProposedValue>,
) {
let ids: Vec<NodeUid> = network.nodes.keys().cloned().collect();
let mut decided_nodes: BTreeSet<NodeUid> = BTreeSet::new();
for id in ids {
if let Some(value) = inputs.get(&id) {
@ -31,21 +30,23 @@ fn test_common_subset<A: Adversary<CommonSubset<NodeUid>>>(
}
// Terminate when all good nodes do.
while network
.nodes
.values()
.any(|node| network.adv_nodes.contains(&node.algo.our_id()) || node.algo.terminated())
{
let id = network.step();
if let Some(output) = network.nodes[&id].outputs().iter().next() {
assert_eq!(inputs, output);
debug!("Node {:?} decided: {:?}", id, output);
// Test uniqueness of output of the good nodes.
if !network.adv_nodes.contains(&id) {
assert!(!decided_nodes.insert(id));
}
while !network.nodes.values().all(TestNode::terminated) {
network.step();
}
// Verify that all instances output the same set.
let mut expected = None;
for node in network.nodes.values() {
if let Some(output) = expected.as_ref() {
assert!(iter::once(output).eq(node.outputs()));
continue;
}
assert_eq!(1, node.outputs().len());
expected = Some(node.outputs()[0].clone());
}
let output = expected.unwrap();
assert!(output.len() * 3 > inputs.len() * 2);
for (id, value) in output {
assert_eq!(inputs[&id], value);
}
}

View File

@ -72,9 +72,7 @@ where
.chain(iter::once(rng.gen_range(6, 10)))
.chain(iter::once(rng.gen_range(11, 15)));
for size in sizes {
// TODO: Fix `CommonSubset` to tolerate faulty nodes.
// let num_faulty_nodes = (size - 1) / 3;
let num_faulty_nodes = 0;
let num_faulty_nodes = (size - 1) / 3;
let num_good_nodes = size - num_faulty_nodes;
info!(
"Network size: {} good nodes, {} faulty nodes",

View File

@ -14,7 +14,7 @@ pub struct TestNode<D: DistAlgorithm> {
/// This node's own ID.
id: D::NodeUid,
/// The instance of the broadcast algorithm.
pub algo: D,
algo: D,
/// Incoming messages from other nodes that this node has not yet handled.
pub queue: VecDeque<(D::NodeUid, D::Message)>,
/// The values this node has output so far.
@ -27,6 +27,12 @@ impl<D: DistAlgorithm> TestNode<D> {
&self.outputs
}
/// Returns whether the algorithm has terminated.
#[allow(unused)] // Not used in all tests.
pub fn terminated(&self) -> bool {
self.algo.terminated()
}
/// Creates a new test node with the given broadcast instance.
fn new(mut algo: D) -> TestNode<D> {
let outputs = algo.output_iter().collect();