Make sure Term messages are handled as BVal, Aux, Conf.

This commit is contained in:
Andreas Fackler 2018-07-26 14:19:01 +02:00
parent 35edde9007
commit 08d6abf6b4
1 changed files with 20 additions and 24 deletions

View File

@ -229,7 +229,7 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
AgreementContent::BVal(b) => self.handle_bval(sender_id, b), AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
AgreementContent::Aux(b) => self.handle_aux(sender_id, b), AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
AgreementContent::Conf(v) => self.handle_conf(sender_id, v), AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
AgreementContent::Term(v) => Ok(self.handle_term(sender_id, v)), AgreementContent::Term(v) => self.handle_term(sender_id, v),
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg), AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
} }
} }
@ -443,16 +443,20 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
/// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than /// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than
/// `num_faulty` such messages with the same value from different nodes, performs expedite /// `num_faulty` such messages with the same value from different nodes, performs expedite
/// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance. /// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance.
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Step<NodeUid> { fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
self.received_term.insert(sender_id.clone(), b); self.received_term.insert(sender_id.clone(), b);
// Check for the expedite termination condition. // Check for the expedite termination condition.
if self.decision.is_none() if self.decision.is_none()
&& self.received_term.iter().filter(|(_, &c)| b == c).count() && self.received_term.iter().filter(|(_, &c)| b == c).count()
> self.netinfo.num_faulty() > self.netinfo.num_faulty()
{ {
self.decide(b) Ok(self.decide(b))
} else { } else {
Step::default() // Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
let mut step = self.handle_bval(sender_id, b)?;
step.extend(self.handle_aux(sender_id, b)?);
step.extend(self.handle_conf(sender_id, BinValues::from_bool(b))?);
Ok(step)
} }
} }
@ -491,22 +495,22 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
return Ok(Step::default()); return Ok(Step::default());
} }
let mut step = Step::default(); if self.decision.is_none() && Some(coin) == def_bin_value {
return Ok(self.decide(coin));
}
let b = if let Some(b) = def_bin_value { let b = def_bin_value.unwrap_or(coin);
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
step.extend(self.decide(b));
}
b
} else {
coin
};
self.update_epoch(); self.update_epoch();
self.estimated = Some(b); self.estimated = Some(b);
step.extend(self.send_bval(b)?); let mut step = self.send_bval(b)?;
for (sender_id, b) in self.received_term.clone() {
step.extend(self.handle_term(&sender_id, b)?);
if self.terminated {
return Ok(step);
}
}
let queued_msgs = replace(&mut self.incoming_queue, Vec::new()); let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs { for (sender_id, msg) in queued_msgs {
step.extend(self.handle_message(&sender_id, msg)?); step.extend(self.handle_message(&sender_id, msg)?);
@ -587,20 +591,12 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
/// f agreeing messages would not always terminate. We can, however, expect every good node to /// f agreeing messages would not always terminate. We can, however, expect every good node to
/// send an `Aux` value that will eventually end up in our `bin_values`. /// send an `Aux` value that will eventually end up in our `bin_values`.
fn count_aux(&self) -> (usize, BinValues) { fn count_aux(&self) -> (usize, BinValues) {
let mut aux: BTreeMap<_, _> = self let aux: BTreeMap<_, _> = self
.received_aux .received_aux
.iter() .iter()
.filter(|(_, &b)| self.bin_values.contains(b)) .filter(|(_, &b)| self.bin_values.contains(b))
.collect(); .collect();
let term: BTreeMap<_, _> = self
.received_term
.iter()
.filter(|(_, &b)| self.bin_values.contains(b))
.collect();
// Ensure that nodes are not counted twice.
aux.extend(term);
let bin: BinValues = aux.values().map(|&&v| BinValues::from_bool(v)).collect(); let bin: BinValues = aux.values().map(|&&v| BinValues::from_bool(v)).collect();
(aux.len(), bin) (aux.len(), bin)
} }