Fix Agreement and re-enable HoneyBadger test.

This commit is contained in:
Andreas Fackler 2018-05-17 17:38:45 +02:00
parent 50d007b954
commit d5f9c4d40d
4 changed files with 114 additions and 193 deletions

View File

@ -4,6 +4,7 @@ use itertools::Itertools;
use std::collections::{BTreeSet, HashMap, VecDeque}; use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
use std::mem;
use messaging::{DistAlgorithm, Target, TargetedMessage}; use messaging::{DistAlgorithm, Target, TargetedMessage};
@ -17,6 +18,15 @@ pub enum AgreementMessage {
Aux(u32, bool), Aux(u32, bool),
} }
impl AgreementMessage {
fn epoch(&self) -> u32 {
match *self {
AgreementMessage::BVal(epoch, _) => epoch,
AgreementMessage::Aux(epoch, _) => epoch,
}
}
}
/// Binary Agreement instance /// Binary Agreement instance
pub struct Agreement<NodeUid> { pub struct Agreement<NodeUid> {
/// The UID of the corresponding proposer node. /// The UID of the corresponding proposer node.
@ -44,6 +54,9 @@ pub struct Agreement<NodeUid> {
/// ever there at all. While the output value will still be required in a later epoch to decide /// ever there at all. While the output value will still be required in a later epoch to decide
/// the termination state. /// the termination state.
decision: Option<bool>, decision: Option<bool>,
/// A cache for messages for future epochs that cannot be handled yet.
// TODO: Find a better solution for this; defend against spam.
incoming_queue: Vec<(NodeUid, AgreementMessage)>,
/// Termination flag. The Agreement instance doesn't terminate immediately /// Termination flag. The Agreement instance doesn't terminate immediately
/// upon deciding on the agreed value. This is done in order to help other /// upon deciding on the agreed value. This is done in order to help other
/// nodes decide despite asynchrony of communication. Once the instance /// nodes decide despite asynchrony of communication. Once the instance
@ -71,18 +84,20 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
sender_id: &Self::NodeUid, sender_id: &Self::NodeUid,
message: Self::Message, message: Self::Message,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
if self.terminated {
return Err(Error::Terminated);
}
if message.epoch() < self.epoch {
return Ok(()); // Message is obsolete: We are already in a later epoch.
}
if message.epoch() > self.epoch {
// Message is for a later epoch. We can't handle that yet.
self.incoming_queue.push((sender_id.clone(), message));
return Ok(());
}
match message { match message {
// The algorithm instance has already terminated. AgreementMessage::BVal(_, b) => self.handle_bval(sender_id, b),
_ if self.terminated => Err(Error::Terminated), AgreementMessage::Aux(_, b) => self.handle_aux(sender_id, b),
AgreementMessage::BVal(epoch, b) if epoch == self.epoch => {
self.handle_bval(sender_id, b)
}
AgreementMessage::Aux(epoch, b) if epoch == self.epoch => self.handle_aux(sender_id, b),
// Epoch does not match. Ignore the message.
_ => Ok(()),
} }
} }
@ -108,7 +123,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
} }
} }
impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
pub fn new(uid: NodeUid, num_nodes: usize) -> Self { pub fn new(uid: NodeUid, num_nodes: usize) -> Self {
let num_faulty_nodes = (num_nodes - 1) / 3; let num_faulty_nodes = (num_nodes - 1) / 3;
@ -124,6 +139,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
estimated: None, estimated: None,
output: None, output: None,
decision: None, decision: None,
incoming_queue: Vec::new(),
terminated: false, terminated: false,
messages: VecDeque::new(), messages: VecDeque::new(),
} }
@ -134,20 +150,16 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
if self.epoch != 0 || self.estimated.is_some() { if self.epoch != 0 || self.estimated.is_some() {
return Err(Error::InputNotAccepted); return Err(Error::InputNotAccepted);
} }
if self.num_nodes == 1 {
self.decision = Some(input);
self.output = Some(input);
self.terminated = true;
}
// Set the initial estimated value to the input value. // Set the initial estimated value to the input value.
self.estimated = Some(input); self.estimated = Some(input);
// Record the input value as sent. // Record the input value as sent.
self.sent_bval.insert(input); self.send_bval(input)
// Receive the BVAL message locally.
self.received_bval
.entry(self.uid.clone())
.or_insert_with(BTreeSet::new)
.insert(input);
// Multicast BVAL
self.messages
.push_back(AgreementMessage::BVal(self.epoch, input));
Ok(())
} }
/// Acceptance check to be performed before setting the input value. /// Acceptance check to be performed before setting the input value.
@ -178,32 +190,35 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
self.messages self.messages
.push_back(AgreementMessage::Aux(self.epoch, b)); .push_back(AgreementMessage::Aux(self.epoch, b));
// Receive the AUX message locally. // Receive the AUX message locally.
self.received_aux.insert(self.uid.clone(), b); let our_uid = self.uid.clone();
self.handle_aux(&our_uid, b)?;
} }
self.try_coin(); self.try_coin()?;
} }
// upon receiving BVAL_r(b) messages from f + 1 nodes, if // upon receiving BVAL_r(b) messages from f + 1 nodes, if
// BVAL_r(b) has not been sent, multicast BVAL_r(b) // BVAL_r(b) has not been sent, multicast BVAL_r(b)
else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
// Record the value `b` as sent. self.send_bval(b)?;
self.sent_bval.insert(b);
// Receive the BVAL message locally.
self.received_bval
.entry(self.uid.clone())
.or_insert_with(BTreeSet::new)
.insert(b);
// Multicast BVAL.
self.messages
.push_back(AgreementMessage::BVal(self.epoch, b));
} }
Ok(()) Ok(())
} }
fn send_bval(&mut self, b: bool) -> Result<(), Error> {
// Record the value `b` as sent.
self.sent_bval.insert(b);
// Multicast BVAL.
self.messages
.push_back(AgreementMessage::BVal(self.epoch, b));
// Receive the BVAL message locally.
let our_uid = self.uid.clone();
self.handle_bval(&our_uid, b)
}
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> { fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> {
self.received_aux.insert(sender_id.clone(), b); self.received_aux.insert(sender_id.clone(), b);
if !self.bin_values.is_empty() { if !self.bin_values.is_empty() {
self.try_coin(); self.try_coin()?;
} }
Ok(()) Ok(())
} }
@ -237,11 +252,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
/// to compute the next decision estimate and outputs the optional decision /// to compute the next decision estimate and outputs the optional decision
/// value. The function may start the next epoch. In that case, it also /// value. The function may start the next epoch. In that case, it also
/// returns a message for broadcast. /// returns a message for broadcast.
fn try_coin(&mut self) { fn try_coin(&mut self) -> Result<(), Error> {
let (count_aux, vals) = self.count_aux(); let (count_aux, vals) = self.count_aux();
if count_aux < self.num_nodes - self.num_faulty_nodes { if count_aux < self.num_nodes - self.num_faulty_nodes {
// Continue waiting for the (N - f) AUX messages. // Continue waiting for the (N - f) AUX messages.
return; return Ok(());
} }
debug!("{:?} try_coin in epoch {}", self.uid, self.epoch); debug!("{:?} try_coin in epoch {}", self.uid, self.epoch);
@ -255,6 +270,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
self.terminated = self.terminated || self.decision == Some(coin); self.terminated = self.terminated || self.decision == Some(coin);
if self.terminated { if self.terminated {
debug!("Agreement instance {:?} terminated", self.uid); debug!("Agreement instance {:?} terminated", self.uid);
return Ok(());
} }
// Start the next epoch. // Start the next epoch.
@ -286,9 +302,12 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
}; };
let b = self.estimated.unwrap(); let b = self.estimated.unwrap();
self.sent_bval.insert(b); self.send_bval(b)?;
self.messages let queued_msgs = mem::replace(&mut self.incoming_queue, Vec::new());
.push_back(AgreementMessage::BVal(self.epoch, b)); for (sender_id, msg) in queued_msgs {
self.handle_message(&sender_id, msg)?;
}
Ok(())
} }
} }

View File

@ -13,176 +13,80 @@
//! //!
//! TODO: Implement adversaries and send BVAL messages at different times. //! TODO: Implement adversaries and send BVAL messages at different times.
extern crate env_logger;
extern crate hbbft; extern crate hbbft;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate env_logger; extern crate rand;
use std::collections::{BTreeMap, VecDeque}; mod network;
use hbbft::agreement::{Agreement, AgreementMessage}; use std::collections::BTreeSet;
use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage}; use std::iter;
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] use rand::Rng;
struct NodeUid(usize);
/// The queue of messages of a particular Agreement instance. use hbbft::agreement::Agreement;
type MessageQueue = VecDeque<TargetedMessage<AgreementMessage, NodeUid>>; use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork};
struct TestNode { fn test_agreement<A: Adversary<Agreement<NodeUid>>>(
/// Sender ID. mut network: TestNetwork<A, Agreement<NodeUid>>,
id: NodeUid, input: Option<bool>,
/// The only agreement instance. ) {
agreement: Agreement<NodeUid>, let ids: Vec<NodeUid> = network.nodes.keys().cloned().collect();
/// Queue of tuples of a sender ID and a message. for id in ids {
queue: VecDeque<(NodeUid, AgreementMessage)>, network.input(id, input.unwrap_or_else(rand::random));
/// All outputs
outputs: Vec<bool>,
}
impl TestNode {
fn new(id: NodeUid, agreement: Agreement<NodeUid>) -> TestNode {
TestNode {
id,
agreement,
queue: VecDeque::new(),
outputs: Vec::new(),
}
} }
fn handle_message(&mut self) -> (Option<bool>, MessageQueue) { // Handle messages in random order until all nodes have output the proposed value.
let output; while network.nodes.values().any(|node| node.outputs().is_empty()) {
let (sender_id, message) = self.queue let id = network.step();
.pop_front() if let Some(&b) = network.nodes[&id].outputs().iter().next() {
.expect("popping a message off the queue"); if let Some(expected) = input {
self.agreement assert_eq!(expected, b);
.handle_message(&sender_id, message)
.expect("handling an agreement message");
if let Some(b) = self.agreement.next_output() {
self.outputs.push(b);
output = Some(b);
} else {
output = None;
}
let messages = self.agreement.message_iter().collect();
debug!("{:?} produced messages: {:?}", self.id, messages);
(output, messages)
}
}
struct TestNetwork {
nodes: BTreeMap<NodeUid, TestNode>,
/// The next node to handle a message in its queue.
scheduled_node_id: NodeUid,
}
impl TestNetwork {
fn new(num_nodes: usize) -> TestNetwork {
// Make a node with an Agreement instance associated with the proposer node 0.
let make_node =
|id: NodeUid| (id, TestNode::new(id, Agreement::new(NodeUid(0), num_nodes)));
TestNetwork {
nodes: (0..num_nodes).map(NodeUid).map(make_node).collect(),
scheduled_node_id: NodeUid(0),
}
}
fn dispatch_messages(&mut self, sender_id: NodeUid, messages: MessageQueue) {
for message in messages {
match message {
TargetedMessage {
target: Target::Node(id),
message,
} => {
let node = self.nodes.get_mut(&id).expect("finding recipient node");
node.queue.push_back((sender_id, message));
}
TargetedMessage {
target: Target::All,
message,
} => {
// Multicast the message to other nodes.
let _: Vec<()> = self.nodes
.iter_mut()
.filter(|(id, _)| **id != sender_id)
.map(|(_, node)| node.queue.push_back((sender_id, message.clone())))
.collect();
}
} }
debug!("Node {:?} decided: {}", id, b);
} }
} }
// Gets a node for receiving a message and picks the next node with a
// non-empty message queue in a cyclic order.
fn pick_node(&mut self) -> NodeUid {
let id = self.scheduled_node_id;
// Try a node with a higher ID for fairness.
if let Some(next_id) = self.nodes
.iter()
.find(|(&next_id, node)| id < next_id && !node.queue.is_empty())
.map(|(id, _)| *id)
{
self.scheduled_node_id = next_id;
} else {
// Fall back to nodes up to the currently scheduled ID.
self.scheduled_node_id = self.nodes
.iter()
.find(|(&next_id, node)| id >= next_id && !node.queue.is_empty())
.map(|(id, _)| *id)
.expect("no more messages in any node's queue")
}
debug!("Picked node {:?}", self.scheduled_node_id);
id
}
fn step(&mut self) -> (NodeUid, Option<bool>) {
let sender_id = self.pick_node();
let (output, messages) = self.nodes.get_mut(&sender_id).unwrap().handle_message();
self.dispatch_messages(sender_id, messages);
(sender_id, output)
}
fn set_input(&mut self, sender_id: NodeUid, input: bool) {
let messages = {
let instance = &mut self.nodes.get_mut(&sender_id).unwrap().agreement;
instance.set_input(input).expect("set input");
instance.message_iter().collect()
};
self.dispatch_messages(sender_id, messages);
}
} }
fn test_agreement(mut network: TestNetwork) -> BTreeMap<NodeUid, TestNode> { fn test_agreement_different_sizes<A, F>(new_adversary: F)
where
A: Adversary<Agreement<NodeUid>>,
F: Fn(usize, usize) -> A,
{
// This returns an error in all but the first test.
let _ = env_logger::try_init(); let _ = env_logger::try_init();
// Pick the first node with a non-empty queue. let mut rng = rand::thread_rng();
network.pick_node(); let sizes = (1..6)
.chain(iter::once(rng.gen_range(6, 20)))
while network.nodes.values().any(|node| node.outputs.is_empty()) { .chain(iter::once(rng.gen_range(30, 50)));
let (NodeUid(id), output) = network.step(); for size in sizes {
if let Some(value) = output { let num_faulty_nodes = (size - 1) / 3;
debug!("Node {} output {}", id, value); let num_good_nodes = size - num_faulty_nodes;
info!(
"Network size: {} good nodes, {} faulty nodes",
num_good_nodes, num_faulty_nodes
);
for &input in &[None, Some(false), Some(true)] {
let adversary = new_adversary(num_good_nodes, num_faulty_nodes);
let new_agreement = |id, all_ids: BTreeSet<_>| Agreement::new(id, all_ids.len());
let network =
TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_agreement);
test_agreement(network, input);
} }
} }
network.nodes
} }
/// Test 4 correct nodes. One of the nodes, #3, hasn't finished broadcast yet
/// and gets an input 0 as a result.
#[test] #[test]
fn test_agreement_and_validity_with_1_late_node() { fn test_agreement_random_silent_all_true() {
let mut network = TestNetwork::new(4); let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::Random);
test_agreement_different_sizes(new_adversary);
network.set_input(NodeUid(0), true); }
network.set_input(NodeUid(1), true);
network.set_input(NodeUid(2), true); #[test]
network.set_input(NodeUid(3), false); fn test_agreement_first_silent_all_true() {
let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First);
let nodes = test_agreement(network); test_agreement_different_sizes(new_adversary);
for node in nodes.values() {
assert_eq!(node.outputs, vec![true]);
}
} }

View File

@ -107,7 +107,7 @@ where
for size in sizes { for size in sizes {
let num_faulty_nodes = (size - 1) / 3; let num_faulty_nodes = (size - 1) / 3;
let num_good_nodes = size - num_faulty_nodes; let num_good_nodes = size - num_faulty_nodes;
println!( info!(
"Network size: {} good nodes, {} faulty nodes", "Network size: {} good nodes, {} faulty nodes",
num_good_nodes, num_faulty_nodes num_good_nodes, num_faulty_nodes
); );

View File

@ -97,8 +97,6 @@ fn test_honey_badger_random_delivery_silent() {
test_honey_badger_different_sizes(new_adversary, 10); test_honey_badger_different_sizes(new_adversary, 10);
} }
// TODO: This test is flaky. Debug.
#[ignore]
#[test] #[test]
fn test_honey_badger_first_delivery_silent() { fn test_honey_badger_first_delivery_silent() {
let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First); let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First);