corrected the CommonSubset module and test to decide on a map of values insetead of a set

This commit is contained in:
Vladimir Komendantskiy 2018-05-15 18:18:05 +01:00
parent ff3e819a4f
commit c8034da332
4 changed files with 42 additions and 19 deletions

View File

@ -2,6 +2,7 @@
use itertools::Itertools;
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
/// Type of output from the Agreement message handler. The first component is
@ -20,7 +21,7 @@ pub enum AgreementMessage {
Aux(u32, bool),
}
/// Binary Agreement instance.
/// Binary Agreement instance
pub struct Agreement<NodeUid> {
/// The UID of the corresponding proposer node.
uid: NodeUid,
@ -49,7 +50,7 @@ pub struct Agreement<NodeUid> {
terminated: bool,
}
impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
pub fn new(uid: NodeUid, num_nodes: usize) -> Self {
let num_faulty_nodes = (num_nodes - 1) / 3;
@ -222,6 +223,7 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
return (None, Vec::new());
}
debug!("{:?} try_coin in epoch {}", self.uid, self.epoch);
// FIXME: Implement the Common Coin algorithm. At the moment the
// coin value is common across different nodes but not random.
let coin = (self.epoch % 2) == 0;
@ -230,13 +232,17 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
// value b is output in some round r, and the value Coin_r' = b for
// some round r' > r."
self.terminated = self.terminated || self.output == Some(coin);
if self.terminated {
debug!("Agreement instance {:?} terminated", self.uid);
}
// Start the next epoch.
self.bin_values.clear();
self.received_bval.clear();
self.sent_bval.clear();
self.received_aux.clear();
self.sent_bval.clear();
self.epoch += 1;
debug!("Agreement instance {:?} started epoch {}", self.uid, self.epoch);
let decision = if vals.len() != 1 {
self.estimated = Some(coin);
@ -250,6 +256,7 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
if self.output.is_none() && b == coin {
// Output the agreement value.
self.output = Some(b);
debug!("Agreement instance {:?} output: {}", self.uid, b);
self.output
} else {
None

View File

@ -19,7 +19,7 @@ use messaging::{DistAlgorithm, Target, TargetedMessage};
type ProposedValue = Vec<u8>;
// Type of output from the Common Subset message handler.
type CommonSubsetOutput<NodeUid> = (
Option<HashSet<ProposedValue>>,
Option<HashMap<NodeUid, ProposedValue>>,
VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
);
@ -225,7 +225,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
// Process Agreement outputs.
if let Some(b) = output {
outgoing.append(&mut self.on_agreement_result(proposer_id, b)?);
outgoing.append(&mut self.on_agreement_output(proposer_id, b)?);
}
// Check whether Agreement has completed.
@ -236,15 +236,16 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
))
}
/// Callback to be invoked on receipt of a returned value of the Agreement
/// Callback to be invoked on receipt of the decision value of the Agreement
/// instance `uid`.
fn on_agreement_result(
fn on_agreement_output(
&mut self,
element_proposer_id: &NodeUid,
result: bool,
) -> Result<VecDeque<AgreementMessage>, Error> {
self.agreement_results
.insert(element_proposer_id.clone(), result);
debug!("Updated Agreement results: {:?}", self.agreement_results);
if !result || self.count_true() < self.num_nodes - self.num_faulty_nodes {
return Ok(VecDeque::new());
}
@ -265,7 +266,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
self.agreement_results.values().filter(|v| **v).count()
}
fn try_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
fn try_agreement_completion(&self) -> Option<HashMap<NodeUid, ProposedValue>> {
// Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output j∈C v_j.
@ -273,20 +274,26 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
.values()
.all(|instance| instance.terminated())
{
debug!("All Agreement instances have terminated");
// All instances of Agreement that delivered `true` (or "1" in the paper).
let delivered_1: HashSet<&NodeUid> = self.agreement_results
.iter()
.filter(|(_, v)| **v)
.map(|(k, _)| k)
.collect();
debug!("Agreement instances that delivered 1: {:?}", delivered_1);
// Results of Broadcast instances in `delivered_1`
let broadcast_results: HashSet<ProposedValue> = self.broadcast_results
let broadcast_results: HashMap<NodeUid, ProposedValue> = self.broadcast_results
.iter()
.filter(|(k, _)| delivered_1.get(k).is_some())
.map(|(_, v)| v.clone())
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
debug!("Broadcast results among the Agreement instances that delivered 1: {:?}",
broadcast_results);
if delivered_1.len() == broadcast_results.len() {
debug!("Agreement instances completed with {:?}", broadcast_results);
Some(broadcast_results)
} else {
None

View File

@ -138,14 +138,16 @@ where
return Ok(());
}
let (cs_out, cs_msgs) = self.common_subset.handle_message(sender_id, message)?;
for targeted_msg in cs_msgs {
let msg = targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg));
self.messages.push_back(msg);
}
// FIXME: Handle the node IDs in `ser_batches`.
let batches: Vec<Vec<T>> = if let Some(ser_batches) = cs_out {
ser_batches
.into_iter()
.map(|ser_batch| bincode::deserialize(&ser_batch))
.map(|(_, ser_batch)| bincode::deserialize(&ser_batch))
.collect::<Result<_, _>>()?
} else {
return Ok(());

View File

@ -5,7 +5,7 @@ extern crate hbbft;
extern crate log;
extern crate env_logger;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::collections::{BTreeMap, HashSet, HashMap, VecDeque};
use hbbft::common_subset;
use hbbft::common_subset::CommonSubset;
@ -29,7 +29,7 @@ struct TestNode {
/// Queue of tuples of a sender ID and a message.
queue: VecDeque<(NodeUid, common_subset::Message<NodeUid>)>,
/// The output of the Common Subset algorithm, if there is one.
decision: Option<HashSet<ProposedValue>>,
decision: Option<HashMap<NodeUid, ProposedValue>>,
}
impl TestNode {
@ -42,7 +42,7 @@ impl TestNode {
}
}
fn handle_message(&mut self) -> (Option<HashSet<Vec<u8>>>, OutputQueue) {
fn handle_message(&mut self) -> (Option<HashMap<NodeUid, Vec<u8>>>, OutputQueue) {
let (sender_id, message) = self.queue
.pop_front()
.expect("popping a message off the queue");
@ -114,7 +114,7 @@ impl TestNetwork {
id
}
fn step(&mut self) -> (NodeUid, Option<HashSet<ProposedValue>>) {
fn step(&mut self) -> (NodeUid, Option<HashMap<NodeUid, ProposedValue>>) {
let sender_id = self.pick_node();
let (output, messages) = self.nodes.get_mut(&sender_id).unwrap().handle_message();
let messages = messages
@ -126,8 +126,7 @@ impl TestNetwork {
}
/// Make Node 0 propose a value.
fn send_proposed_value(&mut self, value: ProposedValue) {
let sender_id = NodeUid(0);
fn send_proposed_value(&mut self, sender_id: NodeUid, value: ProposedValue) {
let messages = self.nodes
.get_mut(&sender_id)
.unwrap()
@ -164,14 +163,22 @@ fn test_common_subset_4_nodes() {
let proposed_value = Vec::from("Fake news");
let all_ids: HashSet<NodeUid> = (0..4).map(NodeUid).collect();
let mut network = TestNetwork::new(&all_ids);
let expected_node_decision: HashMap<NodeUid, ProposedValue> = all_ids
.iter()
.map(|id| (id.clone(), proposed_value.clone()))
.collect();
network.send_proposed_value(NodeUid(0), proposed_value.clone());
network.send_proposed_value(NodeUid(1), proposed_value.clone());
network.send_proposed_value(NodeUid(2), proposed_value.clone());
network.send_proposed_value(NodeUid(3), proposed_value.clone());
network.send_proposed_value(proposed_value.clone());
let nodes = test_common_subset(network);
for node in nodes.values() {
assert_eq!(
node.decision,
Some(vec![proposed_value.clone()].into_iter().collect())
Some(expected_node_decision.clone())
);
}
}