fixed a starvation issue in Agreement when num_faulty=0

This commit is contained in:
Vladimir Komendantskiy 2018-07-10 12:23:50 +01:00
parent 0ba06fdb76
commit 1254d40147
4 changed files with 52 additions and 30 deletions

View File

@ -74,8 +74,8 @@ use itertools::Itertools;
use agreement::bin_values::BinValues;
use common_coin;
use common_coin::{CommonCoin, CommonCoinMessage};
use fault_log::FaultLog;
use common_coin::{CommonCoin, CommonCoinMessage, CommonCoinStep};
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
error_chain!{
@ -173,11 +173,9 @@ pub struct Agreement<NodeUid> {
/// A cache for messages for future epochs that cannot be handled yet.
// TODO: Find a better solution for this; defend against spam.
incoming_queue: Vec<(NodeUid, AgreementMessage)>,
/// Termination flag. The Agreement instance doesn't terminate immediately
/// upon deciding on the agreed value. This is done in order to help other
/// nodes decide despite asynchrony of communication. Once the instance
/// determines that all the remote nodes have reached agreement, it sets the
/// `terminated` flag and accepts no more incoming messages.
/// Termination flag. Once the instance determines that all the remote nodes have reached
/// agreement or have the necessary information to reach agreement, it sets the `terminated`
/// flag and accepts no more incoming messages.
terminated: bool,
/// The outgoing message queue.
messages: VecDeque<AgreementMessage>,
@ -283,7 +281,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
fn step(&mut self) -> AgreementResult<AgreementStep<NodeUid>> {
Ok(Step::new(replace(&mut self.output, None)))
Ok(Step::new(self.output.take()))
}
/// Sets the input value for agreement.
@ -471,16 +469,18 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
sender_id: &NodeUid,
msg: CommonCoinMessage,
) -> AgreementResult<FaultLog<NodeUid>> {
let mut coin_step = self.common_coin.handle_message(sender_id, msg)?;
let coin_step = self.common_coin.handle_message(sender_id, msg)?;
self.extend_common_coin();
self.on_coin_step(coin_step)
}
fn on_coin_step(&mut self, coin_step: CommonCoinStep<NodeUid>) -> AgreementResult<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
if let Some(coin) = coin_step.output {
let def_bin_value = self.count_conf().1.definite();
self.on_coin(coin, def_bin_value)?
.merge_into(&mut coin_step.fault_log);
fault_log.extend(self.on_coin(coin, def_bin_value)?);
}
Ok(coin_step.fault_log)
Ok(fault_log)
}
/// When the common coin has been computed, tries to decide on an output value, updates the
@ -490,6 +490,12 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
coin: bool,
def_bin_value: Option<bool>,
) -> AgreementResult<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
if self.terminated {
// Avoid an infinite regression without making an Agreement step.
return Ok(fault_log);
}
let b = if let Some(b) = def_bin_value {
// Outputting a value is allowed only once.
if self.decision.is_none() && b == coin {
@ -503,7 +509,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.update_epoch();
self.estimated = Some(b);
let mut fault_log = self.send_bval(b)?;
fault_log.extend(self.send_bval(b)?);
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {
let step = self.handle_message(&sender_id, msg)?;
@ -545,17 +551,19 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.output = Some(b);
// Latch the decided state.
self.decision = Some(b);
debug!(
"{:?}/{:?} (is_validator: {}) decision: {}",
self.netinfo.our_uid(),
self.proposer_id,
self.netinfo.is_validator(),
b
);
if self.netinfo.is_validator() {
self.messages
.push_back(AgreementContent::Term(b).with_epoch(self.epoch));
self.received_term.insert(self.netinfo.our_uid().clone(), b);
}
self.terminated = true;
debug!(
"Agreement instance {:?} decided: {}",
self.netinfo.our_uid(),
b
);
}
fn try_finish_conf_round(&mut self) -> AgreementResult<FaultLog<NodeUid>> {
@ -566,8 +574,11 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
// Continue waiting for (N - f) `Conf` messages
return Ok(fault_log);
}
self.common_coin.input(())?.merge_into(&mut fault_log);
// Invoke the comon coin.
let coin_step = self.common_coin.input(())?;
fault_log.extend(coin_step.fault_log);
self.extend_common_coin();
self.on_coin_step(coin_step)?;
}
Ok(fault_log)
}

View File

@ -23,7 +23,6 @@
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::mem::replace;
use std::sync::Arc;
use crypto::error as cerror;
@ -153,7 +152,7 @@ where
}
fn step(&mut self) -> Result<CommonCoinStep<NodeUid>> {
Ok(Step::new(replace(&mut self.output, None)))
Ok(Step::new(self.output.take()))
}
fn get_coin(&mut self) -> Result<FaultLog<NodeUid>> {
@ -185,10 +184,17 @@ where
fn try_output(&mut self) -> Result<()> {
let received_shares = &self.received_shares;
debug!(
"{:?} received {} shares, had_input = {}",
self.netinfo.our_uid(),
received_shares.len(),
self.had_input
);
if self.had_input && received_shares.len() > self.netinfo.num_faulty() {
let sig = self.combine_and_verify_sig()?;
// Output the parity of the verified signature.
let parity = sig.parity();
debug!("{:?} output {}", self.netinfo.our_uid(), parity);
self.output = Some(parity);
self.terminated = true;
}

View File

@ -129,7 +129,7 @@ where
// broadcast.
let fault_log = match input {
Input::User(contrib) => self.propose(contrib)?,
Input::Change(change) => self.vote_for(change)?,
Input::Change(change) => self.vote_for(change).map(|()| FaultLog::new())?,
};
self.step().with_fault_log(fault_log)
}
@ -183,7 +183,7 @@ where
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash,
{
fn step(&mut self) -> Result<DynamicHoneyBadgerStep<C, NodeUid>> {
Ok(Step::new(self.output.take()))
Ok(Step::new(self.output.drain(0..).collect()))
}
/// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic
@ -258,7 +258,7 @@ where
/// Processes all pending batches output by Honey Badger.
fn process_output(
&mut self,
step: HoneyBadgerStep<NodeUid, InternalContrib<C, NodeUid>>,
step: HoneyBadgerStep<InternalContrib<C, NodeUid>, NodeUid>,
) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
fault_log.extend(step.fault_log);
@ -314,8 +314,9 @@ where
if start_epoch < self.start_epoch {
let queue = mem::replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queue {
self.handle_message(&sender_id, msg)?
.merge_into(&mut fault_log);
let rec_step = self.handle_message(&sender_id, msg)?;
self.output.extend(rec_step.output);
fault_log.extend(rec_step.fault_log);
}
}
Ok(fault_log)

View File

@ -75,11 +75,11 @@ where
for size in sizes {
let num_faulty_nodes = (size - 1) / 3;
let num_good_nodes = size - num_faulty_nodes;
info!(
"Network size: {} good nodes, {} faulty nodes",
num_good_nodes, num_faulty_nodes
);
for &input in &[None, Some(false), Some(true)] {
info!(
"Test start: {} good nodes and {} faulty nodes, input: {:?}",
num_good_nodes, num_faulty_nodes, input
);
let adversary = |_| new_adversary(num_good_nodes, num_faulty_nodes);
let new_agreement = |netinfo: Arc<NetworkInfo<NodeUid>>| {
Agreement::new(netinfo, 0, NodeUid(0)).expect("agreement instance")
@ -87,6 +87,10 @@ where
let network =
TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_agreement);
test_agreement(network, input);
info!(
"Test success: {} good nodes and {} faulty nodes, input: {:?}",
num_good_nodes, num_faulty_nodes, input
);
}
}
}