added a latch for the decided value in Agreement to remember the output value

This commit is contained in:
Vladimir Komendantskiy 2018-05-17 10:43:56 +01:00
parent 0c386276b1
commit c8c8e1bb1f
4 changed files with 51 additions and 33 deletions

View File

@ -38,6 +38,12 @@ pub struct Agreement<NodeUid> {
/// and then never changed. That is, no instance of Binary Agreement can /// and then never changed. That is, no instance of Binary Agreement can
/// decide on two different values of output. /// decide on two different values of output.
output: Option<bool>, output: Option<bool>,
/// A permanent, latching copy of the output value. This copy is required because `output` can
/// be consumed using `DistAlgorithm::next_output` immediately after the instance finishing to
/// handle a message, in which case it would otherwise be unknown whether the output value was
/// ever there at all. While the output value will still be required in a later epoch to decide
/// the termination state.
decision: Option<bool>,
/// Termination flag. The Agreement instance doesn't terminate immediately /// Termination flag. The Agreement instance doesn't terminate immediately
/// upon deciding on the agreed value. This is done in order to help other /// upon deciding on the agreed value. This is done in order to help other
/// nodes decide despite asynchrony of communication. Once the instance /// nodes decide despite asynchrony of communication. Once the instance
@ -117,6 +123,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
received_aux: HashMap::new(), received_aux: HashMap::new(),
estimated: None, estimated: None,
output: None, output: None,
decision: None,
terminated: false, terminated: false,
messages: VecDeque::new(), messages: VecDeque::new(),
} }
@ -174,9 +181,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
self.received_aux.insert(self.uid.clone(), b); self.received_aux.insert(self.uid.clone(), b);
} }
let (decision, maybe_message) = self.try_coin(); self.try_coin();
self.messages.extend(maybe_message);
self.output = decision;
} }
// upon receiving BVAL_r(b) messages from f + 1 nodes, if // upon receiving BVAL_r(b) messages from f + 1 nodes, if
// BVAL_r(b) has not been sent, multicast BVAL_r(b) // BVAL_r(b) has not been sent, multicast BVAL_r(b)
@ -198,9 +203,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> { fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> {
self.received_aux.insert(sender_id.clone(), b); self.received_aux.insert(sender_id.clone(), b);
if !self.bin_values.is_empty() { if !self.bin_values.is_empty() {
let (decision, maybe_message) = self.try_coin(); self.try_coin();
self.messages.extend(maybe_message);
self.output = decision;
} }
Ok(()) Ok(())
} }
@ -234,11 +237,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
/// to compute the next decision estimate and outputs the optional decision /// to compute the next decision estimate and outputs the optional decision
/// value. The function may start the next epoch. In that case, it also /// value. The function may start the next epoch. In that case, it also
/// returns a message for broadcast. /// returns a message for broadcast.
fn try_coin(&mut self) -> (Option<bool>, Vec<AgreementMessage>) { fn try_coin(&mut self) {
let (count_aux, vals) = self.count_aux(); let (count_aux, vals) = self.count_aux();
if count_aux < self.num_nodes - self.num_faulty_nodes { if count_aux < self.num_nodes - self.num_faulty_nodes {
// Continue waiting for the (N - f) AUX messages. // Continue waiting for the (N - f) AUX messages.
return (None, Vec::new()); return;
} }
debug!("{:?} try_coin in epoch {}", self.uid, self.epoch); debug!("{:?} try_coin in epoch {}", self.uid, self.epoch);
@ -249,7 +252,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
// Check the termination condition: "continue looping until both a // Check the termination condition: "continue looping until both a
// value b is output in some round r, and the value Coin_r' = b for // value b is output in some round r, and the value Coin_r' = b for
// some round r' > r." // some round r' > r."
self.terminated = self.terminated || self.output == Some(coin); self.terminated = self.terminated || self.decision == Some(coin);
if self.terminated { if self.terminated {
debug!("Agreement instance {:?} terminated", self.uid); debug!("Agreement instance {:?} terminated", self.uid);
} }
@ -286,8 +289,13 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
let b = self.estimated.unwrap(); let b = self.estimated.unwrap();
self.sent_bval.insert(b); self.sent_bval.insert(b);
let bval_msg = AgreementMessage::BVal(self.epoch, b); self.messages
(decision, vec![bval_msg]) .push_back(AgreementMessage::BVal(self.epoch, b));
self.output = decision;
// Latch the decided state.
if decision.is_some() {
self.decision = decision;
}
} }
} }

View File

@ -99,6 +99,10 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<No
} }
fn terminated(&self) -> bool { fn terminated(&self) -> bool {
debug!(
"Termination check. Terminated Agreement instances: {:?}",
self.agreement_instances.values().all(Agreement::terminated)
);
self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated) self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated)
} }
@ -169,10 +173,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
if agreement_instance.accepts_input() { if agreement_instance.accepts_input() {
agreement_instance.set_input(true)?; agreement_instance.set_input(true)?;
if let Some(msg) = agreement_instance.next_message() { self.messages.extend(
self.messages agreement_instance
.push_back(msg.map(|a_msg| Message::Agreement(uid.clone(), a_msg))); .message_iter()
} .map(|msg| msg.map(|a_msg| Message::Agreement(uid.clone(), a_msg))),
);
} }
} else { } else {
return Err(Error::NoSuchBroadcastInstance); return Err(Error::NoSuchBroadcastInstance);
@ -227,14 +232,15 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
} }
// Send the message to the agreement instance. // Send the message to the agreement instance.
agreement_instance.handle_message(sender_id, amessage.clone())?; agreement_instance.handle_message(sender_id, amessage.clone())?;
while let Some(msg) = agreement_instance.next_message() { self.messages.extend(
self.messages agreement_instance
.push_back(msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg))); .message_iter()
} .map(|msg| msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg))),
);
input_result = agreement_instance.next_output(); input_result = agreement_instance.next_output();
} else { } else {
debug!("Proposer {:?} does not exist.", proposer_id); debug!("Proposer {:?} does not exist.", proposer_id);
return Ok(()); return Err(Error::NoSuchAgreementInstance);
} }
if let Some(output) = input_result { if let Some(output) = input_result {
@ -268,11 +274,12 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
for agreement_instance in self.agreement_instances.values_mut() { for agreement_instance in self.agreement_instances.values_mut() {
if agreement_instance.accepts_input() { if agreement_instance.accepts_input() {
agreement_instance.set_input(false)?; agreement_instance.set_input(false)?;
if let Some(msg) = agreement_instance.next_message() { let uid = agreement_instance.our_id().clone();
self.messages.push_back(msg.map(|a_msg| { self.messages.extend(
Message::Agreement(agreement_instance.our_id().clone(), a_msg) agreement_instance
})); .message_iter()
} .map(|msg| msg.map(|a_msg| Message::Agreement(uid.clone(), a_msg))),
);
} }
} }
Ok(()) Ok(())

View File

@ -53,8 +53,8 @@ impl TestNode {
let (sender_id, message) = self.queue let (sender_id, message) = self.queue
.pop_front() .pop_front()
.expect("popping a message off the queue"); .expect("popping a message off the queue");
let (output, messages) = self.agreement self.agreement
.handle_agreement_message(&sender_id, &message) .handle_message(&sender_id, &message)
.expect("handling an agreement message"); .expect("handling an agreement message");
debug!("{:?} produced messages: {:?}", self.id, messages); debug!("{:?} produced messages: {:?}", self.id, messages);
if let Some(output) = output { if let Some(output) = output {

View File

@ -160,19 +160,22 @@ fn test_common_subset(mut network: TestNetwork) -> BTreeMap<NodeUid, TestNode> {
} }
#[test] #[test]
fn test_common_subset_4_nodes_same_proposed_value() { fn test_common_subset_3_out_of_4_nodes_propose() {
let proposed_value = Vec::from("Fake news"); let proposed_value = Vec::from("Fake news");
let all_ids: HashSet<NodeUid> = (0..4).map(NodeUid).collect(); let all_ids: HashSet<NodeUid> = (0..4).map(NodeUid).collect();
let mut network = TestNetwork::new(&all_ids); let mut network = TestNetwork::new(&all_ids);
let expected_node_decision: HashMap<NodeUid, ProposedValue> = all_ids
let proposing_ids: HashSet<NodeUid> = (0..3).map(NodeUid).collect();
let expected_node_decision: HashMap<NodeUid, ProposedValue> = proposing_ids
.iter() .iter()
.map(|id| (*id, proposed_value.clone())) .map(|id| (*id, proposed_value.clone()))
.collect(); .collect();
network.send_proposed_value(NodeUid(0), proposed_value.clone()); // Nodes propose values.
network.send_proposed_value(NodeUid(1), proposed_value.clone()); let _: Vec<()> = proposing_ids
network.send_proposed_value(NodeUid(2), proposed_value.clone()); .iter()
network.send_proposed_value(NodeUid(3), proposed_value.clone()); .map(|id| network.send_proposed_value(*id, proposed_value.clone()))
.collect();
let nodes = test_common_subset(network); let nodes = test_common_subset(network);