Remove output and message queue from Agreement.

This commit is contained in:
Andreas Fackler 2018-07-24 11:43:35 +02:00
parent c23aebffb4
commit 102fa0e01d
1 changed files with 79 additions and 105 deletions

View File

@ -66,7 +66,7 @@
pub mod bin_values;
use rand;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::mem::replace;
use std::sync::Arc;
@ -75,8 +75,7 @@ use itertools::Itertools;
use agreement::bin_values::BinValues;
use common_coin::{self, CommonCoin, CommonCoinMessage};
use fault_log::FaultLog;
use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage};
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
error_chain!{
links {
@ -176,10 +175,6 @@ pub struct Agreement<NodeUid> {
received_term: BTreeMap<NodeUid, bool>,
/// The estimate of the decision value in the current epoch.
estimated: Option<bool>,
/// The value output by the agreement instance. It is set once to `Some(b)`
/// and then never changed. That is, no instance of Binary Agreement can
/// decide on two different values of output.
output: Option<bool>,
/// A permanent, latching copy of the output value. This copy is required because `output` can
/// be consumed using `DistAlgorithm::next_output` immediately after the instance finishing to
/// handle a message, in which case it would otherwise be unknown whether the output value was
@ -193,8 +188,6 @@ pub struct Agreement<NodeUid> {
/// agreement or have the necessary information to reach agreement, it sets the `terminated`
/// flag and accepts no more incoming messages.
terminated: bool,
/// The outgoing message queue.
messages: VecDeque<AgreementMessage>,
/// Whether the `Conf` message round has started in the current epoch.
conf_round: bool,
/// A common coin instance. It is reset on epoch update.
@ -213,8 +206,7 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
type Error = Error;
fn input(&mut self, input: Self::Input) -> Result<Step<NodeUid>> {
let fault_log = self.set_input(input)?;
self.step(fault_log)
self.set_input(input)
}
/// Receive input from a remote node.
@ -223,23 +215,22 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
sender_id: &Self::NodeUid,
message: Self::Message,
) -> Result<Step<NodeUid>> {
let fault_log = if self.terminated || message.epoch < self.epoch {
if self.terminated || message.epoch < self.epoch {
// Message is obsolete: We are already in a later epoch or terminated.
FaultLog::new()
Ok(Step::default())
} else if message.epoch > self.epoch {
// Message is for a later epoch. We can't handle that yet.
self.incoming_queue.push((sender_id.clone(), message));
FaultLog::new()
Ok(Step::default())
} else {
match message.content {
AgreementContent::BVal(b) => self.handle_bval(sender_id, b)?,
AgreementContent::Aux(b) => self.handle_aux(sender_id, b)?,
AgreementContent::Conf(v) => self.handle_conf(sender_id, v)?,
AgreementContent::Term(v) => self.handle_term(sender_id, v)?,
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg)?,
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
AgreementContent::Term(v) => Ok(self.handle_term(sender_id, v)),
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
}
}
};
self.step(fault_log)
}
/// Whether the algorithm has terminated.
@ -272,11 +263,9 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
received_conf: BTreeMap::new(),
received_term: BTreeMap::new(),
estimated: None,
output: None,
decision: None,
incoming_queue: Vec::new(),
terminated: false,
messages: VecDeque::new(),
conf_round: false,
common_coin: CommonCoin::new(
netinfo,
@ -289,27 +278,16 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
}
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<Step<NodeUid>> {
Ok(Step::new(
self.output.take().into_iter().collect(),
fault_log,
self.messages
.drain(..)
.map(|msg| Target::All.message(msg))
.collect(),
))
}
/// Sets the input value for agreement.
fn set_input(&mut self, input: bool) -> Result<FaultLog<NodeUid>> {
fn set_input(&mut self, input: bool) -> Result<Step<NodeUid>> {
if self.epoch != 0 || self.estimated.is_some() {
return Err(ErrorKind::InputNotAccepted.into());
}
if self.netinfo.num_nodes() == 1 {
let mut fault_log = self.send_bval(input)?;
self.send_aux(input)?.merge_into(&mut fault_log);
self.decide(input);
Ok(fault_log)
let mut step = self.send_bval(input)?;
step.extend(self.send_aux(input)?);
step.extend(self.decide(input));
Ok(step)
} else {
// Set the initial estimated value to the input value.
self.estimated = Some(input);
@ -323,7 +301,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.epoch == 0 && self.estimated.is_none()
}
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result<FaultLog<NodeUid>> {
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
self.received_bval
.entry(sender_id.clone())
.or_insert_with(BTreeSet::new)
@ -348,27 +326,27 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
} else if bin_values_changed {
self.on_bin_values_changed()
} else {
Ok(FaultLog::new())
Ok(Step::default())
}
} else if count_bval == self.netinfo.num_faulty() + 1 && !self.sent_bval.contains(&b) {
// upon receiving `BVal(b)` messages from f + 1 nodes, if
// `BVal(b)` has not been sent, multicast `BVal(b)`
self.send_bval(b)
} else {
Ok(FaultLog::new())
Ok(Step::default())
}
}
/// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update
/// the epoch.
fn on_bin_values_changed(&mut self) -> Result<FaultLog<NodeUid>> {
fn on_bin_values_changed(&mut self) -> Result<Step<NodeUid>> {
match self.coin_schedule {
CoinSchedule::False => {
let (aux_count, aux_vals) = self.count_aux();
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
self.on_coin(false, aux_vals.definite())
} else {
Ok(FaultLog::new())
Ok(Step::default())
}
}
CoinSchedule::True => {
@ -376,7 +354,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
self.on_coin(true, aux_vals.definite())
} else {
Ok(FaultLog::new())
Ok(Step::default())
}
}
CoinSchedule::Random => {
@ -387,40 +365,42 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
}
fn send_bval(&mut self, b: bool) -> Result<FaultLog<NodeUid>> {
fn send_bval(&mut self, b: bool) -> Result<Step<NodeUid>> {
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
return Ok(Step::default());
}
// Record the value `b` as sent.
self.sent_bval.insert(b);
// Multicast `BVal`.
self.messages
.push_back(AgreementContent::BVal(b).with_epoch(self.epoch));
let msg = AgreementContent::BVal(b).with_epoch(self.epoch);
let mut step: Step<NodeUid> = Target::All.message(msg).into();
// Receive the `BVal` message locally.
let our_uid = &self.netinfo.our_uid().clone();
self.handle_bval(our_uid, b)
step.extend(self.handle_bval(our_uid, b)?);
Ok(step)
}
fn send_conf(&mut self) -> Result<FaultLog<NodeUid>> {
fn send_conf(&mut self) -> Result<Step<NodeUid>> {
if self.conf_round {
// Only one `Conf` message is allowed in an epoch.
return Ok(FaultLog::new());
return Ok(Step::default());
}
// Trigger the start of the `Conf` round.
self.conf_round = true;
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
return Ok(Step::default());
}
let v = self.bin_values;
// Multicast `Conf`.
self.messages
.push_back(AgreementContent::Conf(v).with_epoch(self.epoch));
let msg = AgreementContent::Conf(v).with_epoch(self.epoch);
let mut step: Step<NodeUid> = Target::All.message(msg).into();
// Receive the `Conf` message locally.
let our_uid = &self.netinfo.our_uid().clone();
self.handle_conf(our_uid, v)
step.extend(self.handle_conf(our_uid, v)?);
Ok(step)
}
/// Waits until at least (N f) `Aux` messages have been received, such that
@ -428,33 +408,30 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
/// bin_values (note that bin_values_r may continue to change as `BVal`
/// messages are received, thus this condition may be triggered upon arrival
/// of either an `Aux` or a `BVal` message).
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<FaultLog<NodeUid>> {
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
// Perform the `Aux` message round only if a `Conf` round hasn't started yet.
if self.conf_round {
return Ok(FaultLog::new());
return Ok(Step::default());
}
self.received_aux.insert(sender_id.clone(), b);
if self.bin_values == BinValues::None {
return Ok(FaultLog::new());
return Ok(Step::default());
}
let (aux_count, aux_vals) = self.count_aux();
if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
// Continue waiting for the (N - f) `Aux` messages.
return Ok(FaultLog::new());
return Ok(Step::default());
}
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
match self.coin_schedule {
CoinSchedule::False => self.on_coin(false, aux_vals.definite()),
CoinSchedule::True => self.on_coin(true, aux_vals.definite()),
CoinSchedule::Random => {
// Start the `Conf` message round.
self.send_conf()
}
CoinSchedule::Random => self.send_conf(), // Start the `Conf` message round.
}
}
fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> Result<FaultLog<NodeUid>> {
fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> Result<Step<NodeUid>> {
self.received_conf.insert(sender_id.clone(), v);
self.try_finish_conf_round()
}
@ -462,16 +439,17 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
/// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than
/// `num_faulty` such messages with the same value from different nodes, performs expedite
/// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance.
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<FaultLog<NodeUid>> {
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Step<NodeUid> {
self.received_term.insert(sender_id.clone(), b);
// Check for the expedite termination condition.
if self.decision.is_none()
&& self.received_term.iter().filter(|(_, &c)| b == c).count()
> self.netinfo.num_faulty()
{
self.decide(b);
self.decide(b)
} else {
Step::default()
}
Ok(FaultLog::new())
}
/// Handles a Common Coin message. If there is output from Common Coin, starts the next
@ -480,7 +458,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
&mut self,
sender_id: &NodeUid,
msg: CommonCoinMessage,
) -> Result<FaultLog<NodeUid>> {
) -> Result<Step<NodeUid>> {
let coin_step = self.common_coin.handle_message(sender_id, msg)?;
self.on_coin_step(coin_step)
}
@ -488,38 +466,33 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
fn on_coin_step(
&mut self,
coin_step: common_coin::Step<NodeUid, Nonce>,
) -> Result<FaultLog<NodeUid>> {
let common_coin::Step {
output,
mut fault_log,
messages,
} = coin_step;
) -> Result<Step<NodeUid>> {
let mut step = Step::default();
let epoch = self.epoch;
self.messages.extend(messages.into_iter().map(
|msg: TargetedMessage<CommonCoinMessage, NodeUid>| {
AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch)
},
));
if let Some(coin) = output.into_iter().next() {
let coin_output = step.extend_with(coin_step, |c_msg| {
AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch)
});
if let Some(coin) = coin_output.into_iter().next() {
let def_bin_value = self.count_conf().1.definite();
fault_log.extend(self.on_coin(coin, def_bin_value)?);
step.extend(self.on_coin(coin, def_bin_value)?);
}
Ok(fault_log)
Ok(step)
}
/// When the common coin has been computed, tries to decide on an output value, updates the
/// `Agreement` epoch and handles queued messages for the new epoch.
fn on_coin(&mut self, coin: bool, def_bin_value: Option<bool>) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
fn on_coin(&mut self, coin: bool, def_bin_value: Option<bool>) -> Result<Step<NodeUid>> {
if self.terminated {
// Avoid an infinite regression without making an Agreement step.
return Ok(fault_log);
return Ok(Step::default());
}
let mut step = Step::default();
let b = if let Some(b) = def_bin_value {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
self.decide(b);
step.extend(self.decide(b));
}
b
} else {
@ -529,18 +502,15 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.update_epoch();
self.estimated = Some(b);
fault_log.extend(self.send_bval(b)?);
step.extend(self.send_bval(b)?);
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {
let step = self.handle_message(&sender_id, msg)?;
fault_log.extend(step.fault_log);
// Save the output of the internal call.
self.output = step.output.into_iter().next();
step.extend(self.handle_message(&sender_id, msg)?);
if self.terminated {
break;
}
}
Ok(fault_log)
Ok(step)
}
/// Computes the coin schedule for the current `Agreement` epoch.
@ -553,12 +523,13 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
/// Decides on a value and broadcasts a `Term` message with that value.
fn decide(&mut self, b: bool) {
fn decide(&mut self, b: bool) -> Step<NodeUid> {
if self.terminated {
return;
return Step::default();
}
// Output the agreement value.
self.output = Some(b);
let mut step = Step::default();
step.output.push_back(b);
// Latch the decided state.
self.decision = Some(b);
debug!(
@ -569,14 +540,15 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
b
);
if self.netinfo.is_validator() {
self.messages
.push_back(AgreementContent::Term(b).with_epoch(self.epoch));
let msg = AgreementContent::Term(b).with_epoch(self.epoch);
step.messages.push_back(Target::All.message(msg));
self.received_term.insert(self.netinfo.our_uid().clone(), b);
}
self.terminated = true;
step
}
fn try_finish_conf_round(&mut self) -> Result<FaultLog<NodeUid>> {
fn try_finish_conf_round(&mut self) -> Result<Step<NodeUid>> {
if self.conf_round
&& self.count_conf().0 >= self.netinfo.num_nodes() - self.netinfo.num_faulty()
{
@ -585,20 +557,22 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.on_coin_step(coin_step)
} else {
// Continue waiting for (N - f) `Conf` messages
Ok(FaultLog::new())
Ok(Step::default())
}
}
fn send_aux(&mut self, b: bool) -> Result<FaultLog<NodeUid>> {
fn send_aux(&mut self, b: bool) -> Result<Step<NodeUid>> {
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
return Ok(Step::default());
}
// Multicast `Aux`.
self.messages
.push_back(AgreementContent::Aux(b).with_epoch(self.epoch));
let mut step: Step<NodeUid> = Target::All
.message(AgreementContent::Aux(b).with_epoch(self.epoch))
.into();
// Receive the `Aux` message locally.
let our_uid = &self.netinfo.our_uid().clone();
self.handle_aux(our_uid, b)
step.extend(self.handle_aux(our_uid, b)?);
Ok(step)
}
/// The count of `Aux` messages such that the set of values carried by those messages is a