Output Subset messages as received (#233)

* Outputing subset messages as received

This outputs subset messages as they are received.  All tests pass.

* Fix test suite, while still outputing results early

This fixes the test suite, while still outputting results early.

* Actually do the optimization

There is a testsuite failure in the `dynamic_honey_badger` tests.  Is
this a testsuite bug?

* Respond to code review

* Document the meaning of `None` in Subset::broadcast_results

* Fix adding Contributions and fault check

* Fix clippy

* Keep track of nodes that have sent us valid messages

Otherwise, we reject all nodes as faulty.

* Remove excessive debug logging

There is no need to log a quadratic amount of data.

* Re-add check that the observer’s values match

the rest of the nodes.  Also `panic!` if `Done` is ever not the last
value in a series of `SubsetOutput`s.

* Respond to review

* Rename field
This commit is contained in:
Demi Marie Obenour 2018-09-20 08:34:40 -04:00 committed by Andreas Fackler
parent 70fe9cd506
commit 679f5784b9
4 changed files with 147 additions and 96 deletions

View File

@ -1,5 +1,5 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::sync::Arc;
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use super::{Batch, ErrorKind, MessageContent, Result, Step};
use fault_log::{Fault, FaultKind, FaultLog};
use messaging::{DistAlgorithm, NetworkInfo};
use subset::{self as cs, Subset};
use subset::{self as cs, Subset, SubsetOutput};
use threshold_decryption::{self as td, ThresholdDecryption};
use traits::{Contribution, NodeIdT};
@ -116,6 +116,8 @@ pub struct EpochState<C, N: Rand> {
subset: SubsetState<N>,
/// The status of threshold decryption, by proposer.
decryption: BTreeMap<N, DecryptionState<N>>,
/// Nodes found so far in `Subset` output.
accepted_proposers: BTreeSet<N>,
_phantom: PhantomData<C>,
}
@ -132,6 +134,7 @@ where
netinfo,
subset: SubsetState::Ongoing(cs),
decryption: BTreeMap::default(),
accepted_proposers: Default::default(),
_phantom: PhantomData,
})
}
@ -219,15 +222,39 @@ where
/// Checks whether the subset has output, and if it does, sends out our decryption shares.
fn process_subset(&mut self, cs_step: cs::Step<N>) -> Result<Step<C, N>> {
let mut step = Step::default();
let mut cs_outputs = step.extend_with(cs_step, |cs_msg| {
let cs_outputs: VecDeque<_> = step.extend_with(cs_step, |cs_msg| {
MessageContent::Subset(cs_msg).with_epoch(self.epoch)
});
if let Some(cs_output) = cs_outputs.pop_front() {
self.subset = SubsetState::Complete(cs_output.keys().cloned().collect());
step.extend(self.send_decryption_shares(cs_output)?);
}
if !cs_outputs.is_empty() {
error!("Multiple outputs from a single Subset instance.");
let mut has_seen_done = false;
for cs_output in cs_outputs {
if has_seen_done {
error!("`SubsetOutput::Done` was not the last `SubsetOutput`");
}
match cs_output {
SubsetOutput::Contribution(k, v) => {
step.extend(self.send_decryption_share(k.clone(), &v)?);
self.accepted_proposers.insert(k);
}
SubsetOutput::Done => {
self.subset = SubsetState::Complete(self.accepted_proposers.clone());
let faulty_shares: Vec<_> = self
.decryption
.keys()
.filter(|id| !self.accepted_proposers.contains(id))
.cloned()
.collect();
for id in faulty_shares {
if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) {
for id in td.sender_ids() {
let fault_kind = FaultKind::UnexpectedDecryptionShare;
step.fault_log.append(id.clone(), fault_kind);
}
}
}
has_seen_done = true
}
}
}
Ok(step)
}
@ -250,49 +277,28 @@ where
/// Given the output of the Subset algorithm, inputs the ciphertexts into the Threshold
/// Decryption instances and sends our own decryption shares.
fn send_decryption_shares(&mut self, cs_output: BTreeMap<N, Vec<u8>>) -> Result<Step<C, N>> {
let mut step = Step::default();
let faulty_shares: Vec<_> = self
.decryption
.keys()
.filter(|id| !cs_output.contains_key(id))
.cloned()
.collect();
for id in faulty_shares {
if let Some(DecryptionState::Ongoing(td)) = self.decryption.remove(&id) {
for id in td.sender_ids() {
let fault_kind = FaultKind::UnexpectedDecryptionShare;
step.fault_log.append(id.clone(), fault_kind);
}
fn send_decryption_share(&mut self, proposer_id: N, v: &[u8]) -> Result<Step<C, N>> {
let ciphertext: Ciphertext = match bincode::deserialize(v) {
Ok(ciphertext) => ciphertext,
Err(err) => {
warn!(
"Cannot deserialize ciphertext from {:?}: {:?}",
proposer_id, err
);
return Ok(Fault::new(proposer_id, FaultKind::InvalidCiphertext).into());
}
}
for (proposer_id, v) in cs_output {
let ciphertext: Ciphertext = match bincode::deserialize(&v) {
Ok(ciphertext) => ciphertext,
Err(err) => {
warn!(
"Cannot deserialize ciphertext from {:?}: {:?}",
proposer_id, err
);
let fault_kind = FaultKind::InvalidCiphertext;
step.fault_log.append(proposer_id, fault_kind);
continue;
}
};
let td_result = match self.decryption.entry(proposer_id.clone()) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())),
}.set_ciphertext(ciphertext);
match td_result {
Ok(td_step) => step.extend(self.process_decryption(proposer_id, td_step)?),
Err(td::Error::InvalidCiphertext(_)) => {
warn!("Invalid ciphertext from {:?}", proposer_id);
let fault_kind = FaultKind::ShareDecryptionFailed;
step.fault_log.append(proposer_id.clone(), fault_kind);
}
Err(err) => return Err(ErrorKind::ThresholdDecryption(err).into()),
};
let td_result = match self.decryption.entry(proposer_id.clone()) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(DecryptionState::new(self.netinfo.clone())),
}.set_ciphertext(ciphertext);
match td_result {
Ok(td_step) => self.process_decryption(proposer_id, td_step),
Err(td::Error::InvalidCiphertext(_)) => {
warn!("Invalid ciphertext from {:?}", proposer_id);
Ok(Fault::new(proposer_id.clone(), FaultKind::ShareDecryptionFailed).into())
}
Err(err) => Err(ErrorKind::ThresholdDecryption(err).into()),
}
Ok(step)
}
}

View File

@ -78,7 +78,8 @@ pub struct Subset<N: Rand> {
netinfo: Arc<NetworkInfo<N>>,
broadcast_instances: BTreeMap<N, Broadcast<N>>,
ba_instances: BTreeMap<N, BinaryAgreement<N>>,
broadcast_results: BTreeMap<N, ProposedValue>,
/// `None` means that that item has already been output.
broadcast_results: BTreeMap<N, Option<ProposedValue>>,
ba_results: BTreeMap<N, bool>,
/// Whether the instance has decided on a value.
decided: bool,
@ -89,7 +90,7 @@ pub type Step<N> = messaging::Step<Subset<N>>;
impl<N: NodeIdT + Rand> DistAlgorithm for Subset<N> {
type NodeId = N;
type Input = ProposedValue;
type Output = BTreeMap<N, ProposedValue>;
type Output = SubsetOutput<N>;
type Message = Message<N>;
type Error = Error;
@ -124,6 +125,12 @@ impl<N: NodeIdT + Rand> DistAlgorithm for Subset<N> {
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SubsetOutput<N> {
Contribution(N, Vec<u8>),
Done,
}
impl<N: NodeIdT + Rand> Subset<N> {
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: u64) -> Result<Self> {
// Create all broadcast instances.
@ -220,7 +227,22 @@ impl<N: NodeIdT + Rand> Subset<N> {
return Ok(step);
}
};
self.broadcast_results.insert(proposer_id.clone(), value);
let val_to_insert = if let Some(true) = self.ba_results.get(proposer_id) {
debug!(" {:?} → {:?}", proposer_id, HexBytes(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
None
} else {
Some(value)
};
if let Some(inval) = self
.broadcast_results
.insert(proposer_id.clone(), val_to_insert)
{
error!("Duplicate insert in broadcast_results: {:?}", inval)
}
let set_binary_agreement_input = |ba: &mut BinaryAgreement<N>| {
if ba.accepts_input() {
ba.handle_input(true)
@ -239,7 +261,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
F: FnOnce(&mut BinaryAgreement<N>) -> binary_agreement::Result<binary_agreement::Step<N>>,
{
let mut step = Step::default();
let value = {
let accepted = {
let binary_agreement = self
.ba_instances
.get_mut(proposer_id)
@ -252,40 +274,55 @@ impl<N: NodeIdT + Rand> Subset<N> {
f(binary_agreement).map_err(Error::ProcessBinaryAgreement0)?,
to_msg,
);
if let Some(output) = output.into_iter().next() {
output
if let Some(accepted) = output.into_iter().next() {
accepted
} else {
return Ok(step);
}
};
if self.ba_results.insert(proposer_id.clone(), value).is_some() {
// Binary agreement result accepted.
if self
.ba_results
.insert(proposer_id.clone(), accepted)
.is_some()
{
return Err(Error::MultipleBinaryAgreementResults);
}
debug!(
"{:?} Updated Binary Agreement results: {:?}",
self.netinfo.our_id(),
self.ba_results
);
if value && self.count_true() == self.netinfo.num_correct() {
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (id, binary_agreement) in &mut self.ba_instances {
if binary_agreement.accepts_input() {
let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg);
for output in step.extend_with(
binary_agreement
.handle_input(false)
.map_err(Error::ProcessBinaryAgreement1)?,
to_msg,
) {
if self.ba_results.insert(id.clone(), output).is_some() {
return Err(Error::MultipleBinaryAgreementResults);
if accepted {
if self.count_true() == self.netinfo.num_correct() {
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (id, binary_agreement) in &mut self.ba_instances {
if binary_agreement.accepts_input() {
let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg);
for output in step.extend_with(
binary_agreement
.handle_input(false)
.map_err(Error::ProcessBinaryAgreement1)?,
to_msg,
) {
if self.ba_results.insert(id.clone(), output).is_some() {
return Err(Error::MultipleBinaryAgreementResults);
}
}
}
}
}
if let Some(Some(value)) = self.broadcast_results.insert(proposer_id.clone(), None) {
debug!(" {:?} → {:?}", proposer_id, HexBytes(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
}
}
step.output.extend(self.try_binary_agreement_completion());
Ok(step)
}
@ -295,7 +332,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
self.ba_results.values().filter(|v| **v).count()
}
fn try_binary_agreement_completion(&mut self) -> Option<BTreeMap<N, ProposedValue>> {
fn try_binary_agreement_completion(&mut self) -> Option<SubsetOutput<N>> {
if self.decided || self.count_true() < self.netinfo.num_correct() {
return None;
}
@ -322,11 +359,11 @@ impl<N: NodeIdT + Rand> Subset<N> {
);
// Results of Broadcast instances in `delivered_1`
let broadcast_results: BTreeMap<N, ProposedValue> = self
let broadcast_results: BTreeSet<&N> = self
.broadcast_results
.iter()
.filter(|(k, _)| delivered_1.contains(k))
.map(|(k, v)| (k.clone(), v.clone()))
.map(|(k, _)| k)
.collect();
if delivered_1.len() == broadcast_results.len() {
@ -334,11 +371,8 @@ impl<N: NodeIdT + Rand> Subset<N> {
"{:?} Binary Agreement instances completed:",
self.netinfo.our_id()
);
for (id, result) in &broadcast_results {
debug!(" {:?} → {:?}", id, HexBytes(&result));
}
self.decided = true;
Some(broadcast_results)
Some(SubsetOutput::Done)
} else {
None
}

View File

@ -90,10 +90,12 @@ fn do_drop_and_readd(cfg: TestConfig) {
// First, we create a new test network with Honey Badger instances.
let mut net = NetBuilder::new(0..cfg.dimension.size)
.num_faulty(cfg.dimension.faulty)
.message_limit(200_000) // Limited to 200k messages for now.
.message_limit(200_000) // Limited to 200k messages for now.
.using_step(move |node| {
println!("Constructing new dynamic honey badger node #{}", node.id);
DynamicHoneyBadger::builder().build(node.netinfo).expect("cannot build instance")
DynamicHoneyBadger::builder()
.build(node.netinfo)
.expect("cannot build instance")
}).build()
.expect("could not construct test network");

View File

@ -20,7 +20,7 @@ use std::iter::once;
use std::sync::Arc;
use hbbft::messaging::NetworkInfo;
use hbbft::subset::Subset;
use hbbft::subset::{Subset, SubsetOutput};
use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};
@ -42,24 +42,33 @@ fn test_subset<A: Adversary<Subset<NodeId>>>(
while !network.nodes.values().all(TestNode::terminated) {
network.step();
}
// Verify that all instances output the same set.
let mut expected = None;
let observer: BTreeSet<_> = network.observer.outputs().iter().cloned().collect();
for node in network.nodes.values() {
if let Some(output) = expected.as_ref() {
assert!(once(output).eq(node.outputs()));
continue;
let mut outputs = node.outputs();
let mut actual = BTreeMap::default();
let mut has_seen_done = false;
for i in outputs {
assert!(!has_seen_done);
match i {
SubsetOutput::Contribution(k, v) => {
assert!(actual.insert(k, v).is_none());
}
SubsetOutput::Done => has_seen_done = true,
}
}
assert_eq!(1, node.outputs().len());
expected = Some(node.outputs()[0].clone());
}
let output = expected.unwrap();
assert!(once(&output).eq(network.observer.outputs()));
// The Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set.
assert!(output.len() * 3 > inputs.len() * 2);
// Verify that the set's elements match the proposed values.
for (id, value) in output {
assert_eq!(inputs[&id], value);
assert_eq!(outputs.len(), actual.len() + 1);
// The Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set.
assert!(actual.len() * 3 > inputs.len() * 2);
for (id, value) in actual {
assert_eq!(&inputs[id], value);
}
assert_eq!(outputs.iter().cloned().collect::<BTreeSet<_>>(), observer);
}
}