Remove output and message queue from HoneyBadger.

This commit is contained in:
Andreas Fackler 2018-07-23 18:11:45 +02:00
parent 9d43e8df59
commit 4327744976
3 changed files with 153 additions and 182 deletions

View File

@ -251,7 +251,12 @@ impl SecretKeyShare {
if !ct.verify() {
return None;
}
Some(DecryptionShare(ct.0.into_affine().mul((self.0).0)))
Some(self.decrypt_share_no_verify(ct))
}
/// Returns a decryption share, without validating the ciphertext.
pub fn decrypt_share_no_verify(&self, ct: &Ciphertext) -> DecryptionShare {
DecryptionShare(ct.0.into_affine().mul((self.0).0))
}
}

View File

@ -24,10 +24,11 @@
use rand::Rand;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use bincode;
@ -36,8 +37,8 @@ use serde::{Deserialize, Serialize};
use common_subset::{self, CommonSubset};
use crypto::{Ciphertext, DecryptionShare};
use fault_log::{FaultKind, FaultLog};
use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage};
use fault_log::{Fault, FaultKind, FaultLog};
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
error_chain!{
links {
@ -91,12 +92,11 @@ where
has_input: false,
common_subsets: BTreeMap::new(),
max_future_epochs: self.max_future_epochs as u64,
messages: MessageQueue(VecDeque::new()),
output: Vec::new(),
incoming_queue: BTreeMap::new(),
received_shares: BTreeMap::new(),
decrypted_contributions: BTreeMap::new(),
ciphertexts: BTreeMap::new(),
_phantom: PhantomData,
}
}
}
@ -114,10 +114,6 @@ pub struct HoneyBadger<C, NodeUid: Rand> {
common_subsets: BTreeMap<u64, CommonSubset<NodeUid>>,
/// The maximum number of `CommonSubset` instances that we run simultaneously.
max_future_epochs: u64,
/// The messages that need to be sent to other nodes.
messages: MessageQueue<NodeUid>,
/// The outputs from completed epochs.
output: Vec<Batch<C, NodeUid>>,
/// Messages for future epochs that couldn't be handled yet.
incoming_queue: BTreeMap<u64, Vec<(NodeUid, MessageContent<NodeUid>)>>,
/// Received decryption shares for an epoch. Each decryption share has a sender and a
@ -128,6 +124,7 @@ pub struct HoneyBadger<C, NodeUid: Rand> {
decrypted_contributions: BTreeMap<NodeUid, Vec<u8>>,
/// Ciphertexts output by Common Subset in an epoch.
ciphertexts: BTreeMap<u64, BTreeMap<NodeUid, Ciphertext>>,
_phantom: PhantomData<C>,
}
pub type Step<C, NodeUid> = messaging::Step<HoneyBadger<C, NodeUid>>;
@ -144,8 +141,7 @@ where
type Error = Error;
fn input(&mut self, input: Self::Input) -> Result<Step<C, NodeUid>> {
let fault_log = self.propose(&input)?;
self.step(fault_log)
self.propose(&input)
}
fn handle_message(
@ -157,7 +153,6 @@ where
return Err(ErrorKind::UnknownSender.into());
}
let Message { epoch, content } = message;
let mut fault_log = FaultLog::new();
if epoch > self.epoch + self.max_future_epochs {
// Postpone handling this message.
self.incoming_queue
@ -165,9 +160,9 @@ where
.or_insert_with(Vec::new)
.push((sender_id.clone(), content));
} else if epoch == self.epoch {
fault_log.extend(self.handle_message_content(sender_id, epoch, content)?);
return self.handle_message_content(sender_id, epoch, content);
} // And ignore all messages from past epochs.
self.step(fault_log)
Ok(Step::default())
}
fn terminated(&self) -> bool {
@ -190,24 +185,17 @@ where
HoneyBadgerBuilder::new(netinfo)
}
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<Step<C, NodeUid>> {
Ok(Step::new(
self.output.drain(..).collect(),
fault_log,
self.messages.drain(..).collect(),
))
}
/// Proposes a new item in the current epoch.
pub fn propose(&mut self, proposal: &C) -> Result<FaultLog<NodeUid>> {
pub fn propose(&mut self, proposal: &C) -> Result<Step<C, NodeUid>> {
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
return Ok(Step::default());
}
let step = {
let cs = match self.common_subsets.entry(self.epoch) {
let epoch = self.epoch;
let cs_step = {
let cs = match self.common_subsets.entry(epoch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
entry.insert(CommonSubset::new(self.netinfo.clone(), self.epoch)?)
entry.insert(CommonSubset::new(self.netinfo.clone(), epoch)?)
}
};
let ser_prop = bincode::serialize(&proposal)?;
@ -215,7 +203,7 @@ where
self.has_input = true;
cs.input(bincode::serialize(&ciphertext).unwrap())?
};
Ok(self.process_output(step, None)?)
self.process_output(cs_step, epoch)
}
/// Returns `true` if input for the current epoch has already been provided.
@ -229,7 +217,7 @@ where
sender_id: &NodeUid,
epoch: u64,
content: MessageContent<NodeUid>,
) -> Result<FaultLog<NodeUid>> {
) -> Result<Step<C, NodeUid>> {
match content {
MessageContent::CommonSubset(cs_msg) => {
self.handle_common_subset_message(sender_id, epoch, cs_msg)
@ -246,16 +234,15 @@ where
sender_id: &NodeUid,
epoch: u64,
message: common_subset::Message<NodeUid>,
) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
let step = {
) -> Result<Step<C, NodeUid>> {
let cs_step = {
// Borrow the instance for `epoch`, or create it.
let cs = match self.common_subsets.entry(epoch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
if epoch < self.epoch {
// Epoch has already terminated. Message is obsolete.
return Ok(fault_log);
return Ok(Step::default());
} else {
entry.insert(CommonSubset::new(self.netinfo.clone(), epoch)?)
}
@ -263,9 +250,9 @@ where
};
cs.handle_message(sender_id, message)?
};
fault_log.extend(self.process_output(step, Some(epoch))?);
let step = self.process_output(cs_step, epoch)?;
self.remove_terminated(epoch);
Ok(fault_log)
Ok(step)
}
/// Handles decryption shares sent by `HoneyBadger` instances.
@ -275,9 +262,7 @@ where
epoch: u64,
proposer_id: NodeUid,
share: DecryptionShare,
) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
) -> Result<Step<C, NodeUid>> {
if let Some(ciphertext) = self
.ciphertexts
.get(&epoch)
@ -285,8 +270,7 @@ where
{
if !self.verify_decryption_share(sender_id, &share, ciphertext) {
let fault_kind = FaultKind::UnverifiedDecryptionShareSender;
fault_log.append(sender_id.clone(), fault_kind);
return Ok(fault_log);
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
}
}
@ -294,16 +278,15 @@ where
self.received_shares
.entry(epoch)
.or_insert_with(BTreeMap::new)
.entry(proposer_id.clone())
.entry(proposer_id)
.or_insert_with(BTreeMap::new)
.insert(sender_id.clone(), share);
if epoch == self.epoch {
self.try_decrypt_proposer_contribution(proposer_id);
fault_log.extend(self.try_decrypt_and_output_batch()?);
self.try_decrypt_and_output_batch()
} else {
Ok(Step::default())
}
Ok(fault_log)
}
/// Verifies a given decryption share using the sender's public key and the proposer's
@ -324,29 +307,38 @@ where
/// When contributions of transactions have been decrypted for all valid proposers in this
/// epoch, moves those contributions into a batch, outputs the batch and updates the epoch.
fn try_output_batch(&mut self) -> Result<FaultLog<NodeUid>> {
// Wait until contributions have been successfully decoded for all proposer nodes with correct
// ciphertext outputs.
if !self.all_contributions_decrypted() {
return Ok(FaultLog::new());
fn try_output_batch(&mut self) -> Result<Option<Step<C, NodeUid>>> {
// Return if we don't have ciphertexts yet.
let proposer_ids = match self.ciphertexts.get(&self.epoch) {
Some(cts) => cts.keys().cloned().collect_vec(),
None => return Ok(None),
};
// Try to decrypt all contributions. If some are still missing, return.
if !proposer_ids
.into_iter()
.all(|id| self.try_decrypt_proposer_contribution(id))
{
return Ok(None);
}
let mut step = Step::default();
// Deserialize the output.
let mut fault_log = FaultLog::new();
let contributions: BTreeMap<NodeUid, C> = self
.decrypted_contributions
.iter()
.flat_map(|(proposer_id, ser_contrib)| {
// If deserialization fails, the proposer of that item is faulty. Ignore it.
if let Ok(contrib) = bincode::deserialize::<C>(&ser_contrib) {
Some((proposer_id.clone(), contrib))
} else {
let fault_kind = FaultKind::BatchDeserializationFailed;
fault_log.append(proposer_id.clone(), fault_kind);
None
}
})
.collect();
let contributions: BTreeMap<NodeUid, C> =
mem::replace(&mut self.decrypted_contributions, BTreeMap::new())
.into_iter()
.flat_map(|(proposer_id, ser_contrib)| {
// If deserialization fails, the proposer of that item is faulty. Ignore it.
if let Ok(contrib) = bincode::deserialize::<C>(&ser_contrib) {
Some((proposer_id, contrib))
} else {
let fault_kind = FaultKind::BatchDeserializationFailed;
step.fault_log.append(proposer_id, fault_kind);
None
}
})
.collect();
let batch = Batch {
epoch: self.epoch,
contributions,
@ -358,64 +350,44 @@ where
batch.contributions.keys().collect::<Vec<_>>()
);
// Queue the output and advance the epoch.
self.output.push(batch);
fault_log.extend(self.update_epoch()?);
Ok(fault_log)
step.output.push_back(batch);
step.extend(self.update_epoch()?);
Ok(Some(step))
}
/// Increments the epoch number and clears any state that is local to the finished epoch.
fn update_epoch(&mut self) -> Result<FaultLog<NodeUid>> {
fn update_epoch(&mut self) -> Result<Step<C, NodeUid>> {
// Clear the state of the old epoch.
self.ciphertexts.remove(&self.epoch);
self.decrypted_contributions.clear();
self.received_shares.remove(&self.epoch);
self.epoch += 1;
self.has_input = false;
let max_epoch = self.epoch + self.max_future_epochs;
let mut fault_log = FaultLog::new();
let mut step = Step::default();
// TODO: Once stable, use `Iterator::flatten`.
for (sender_id, content) in
Itertools::flatten(self.incoming_queue.remove(&max_epoch).into_iter())
{
self.handle_message_content(&sender_id, max_epoch, content)?
.merge_into(&mut fault_log);
step.extend(self.handle_message_content(&sender_id, max_epoch, content)?);
}
// Handle any decryption shares received for the new epoch.
self.try_decrypt_and_output_batch()?
.merge_into(&mut fault_log);
Ok(fault_log)
step.extend(self.try_decrypt_and_output_batch()?);
Ok(step)
}
/// Tries to decrypt contributions from all proposers and output those in a batch.
fn try_decrypt_and_output_batch(&mut self) -> Result<FaultLog<NodeUid>> {
// Return if we don't have ciphertexts yet.
let proposer_ids: Vec<_> = match self.ciphertexts.get(&self.epoch) {
Some(cts) => cts.keys().cloned().collect(),
None => {
return Ok(FaultLog::new());
}
};
// Try to output a batch if all contributions have been decrypted.
for proposer_id in proposer_ids {
self.try_decrypt_proposer_contribution(proposer_id);
}
self.try_output_batch()
}
/// Returns true if and only if contributions have been decrypted for all selected proposers in
/// this epoch.
fn all_contributions_decrypted(&mut self) -> bool {
match self.ciphertexts.get(&self.epoch) {
None => false, // No ciphertexts yet.
Some(ciphertexts) => ciphertexts.keys().eq(self.decrypted_contributions.keys()),
fn try_decrypt_and_output_batch(&mut self) -> Result<Step<C, NodeUid>> {
let mut step = Step::default();
while let Some(new_step) = self.try_output_batch()? {
step.extend(new_step);
}
Ok(step)
}
/// Tries to decrypt the contribution from a given proposer.
fn try_decrypt_proposer_contribution(&mut self, proposer_id: NodeUid) {
fn try_decrypt_proposer_contribution(&mut self, proposer_id: NodeUid) -> bool {
if self.decrypted_contributions.contains_key(&proposer_id) {
return; // Already decrypted.
return true; // Already decrypted.
}
let shares = if let Some(shares) = self
.received_shares
@ -424,10 +396,10 @@ where
{
shares
} else {
return;
return false; // No shares yet.
};
if shares.len() <= self.netinfo.num_faulty() {
return;
return false; // Not enough shares yet.
}
if let Some(ciphertext) = self
@ -454,70 +426,76 @@ where
Err(err) => error!("{:?} Decryption failed: {:?}.", self.our_id(), err),
}
}
true
}
fn send_decryption_shares(
&mut self,
cs_output: BTreeMap<NodeUid, Vec<u8>>,
) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
epoch: u64,
) -> Result<Step<C, NodeUid>> {
let mut step = Step::default();
let mut ciphertexts = BTreeMap::new();
for (proposer_id, v) in cs_output {
let mut ciphertext: Ciphertext;
if let Ok(ct) = bincode::deserialize(&v) {
ciphertext = ct;
} else {
warn!("Invalid ciphertext from proposer {:?} ignored", proposer_id);
let fault_kind = FaultKind::InvalidCiphertext;
fault_log.append(proposer_id.clone(), fault_kind);
let ciphertext: Ciphertext = match bincode::deserialize(&v) {
Ok(ciphertext) => ciphertext,
Err(err) => {
warn!(
"Cannot deserialize ciphertext from {:?}: {:?}",
proposer_id, err
);
let fault_kind = FaultKind::InvalidCiphertext;
step.fault_log.append(proposer_id, fault_kind);
continue;
}
};
if !ciphertext.verify() {
warn!("Invalid ciphertext from {:?}", proposer_id);
let fault_kind = FaultKind::ShareDecryptionFailed;
step.fault_log.append(proposer_id.clone(), fault_kind);
continue;
}
let (incorrect_senders, faults) =
self.verify_pending_decryption_shares(&proposer_id, &ciphertext);
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders);
fault_log.extend(faults);
let (valid, dec_fl) = self.send_decryption_share(&proposer_id, &ciphertext)?;
fault_log.extend(dec_fl);
if valid {
ciphertexts.insert(proposer_id.clone(), ciphertext);
self.try_decrypt_proposer_contribution(proposer_id);
} else {
warn!("Share decryption failed for proposer {:?}", proposer_id);
let fault_kind = FaultKind::ShareDecryptionFailed;
fault_log.append(proposer_id.clone(), fault_kind);
self.verify_pending_decryption_shares(&proposer_id, &ciphertext, epoch);
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders, epoch);
step.fault_log.extend(faults);
if self.netinfo.is_validator() {
step.extend(self.send_decryption_share(&proposer_id, &ciphertext, epoch)?);
}
ciphertexts.insert(proposer_id, ciphertext);
}
self.ciphertexts.insert(self.epoch, ciphertexts);
fault_log.extend(self.try_decrypt_and_output_batch()?);
Ok(fault_log)
self.ciphertexts.insert(epoch, ciphertexts);
if epoch == self.epoch {
step.extend(self.try_decrypt_and_output_batch()?);
}
Ok(step)
}
/// Verifies the ciphertext and sends decryption shares. Returns whether it is valid.
/// Sends decryption shares without verifying the ciphertext.
fn send_decryption_share(
&mut self,
proposer_id: &NodeUid,
ciphertext: &Ciphertext,
) -> Result<(bool, FaultLog<NodeUid>)> {
if !self.netinfo.is_validator() {
return Ok((ciphertext.verify(), FaultLog::new()));
}
let share = match self.netinfo.secret_key_share().decrypt_share(&ciphertext) {
None => return Ok((false, FaultLog::new())),
Some(share) => share,
};
epoch: u64,
) -> Result<Step<C, NodeUid>> {
let share = self
.netinfo
.secret_key_share()
.decrypt_share_no_verify(&ciphertext);
// Send the share to remote nodes.
let our_id = self.netinfo.our_uid().clone();
// Insert the share.
self.received_shares
.entry(epoch)
.or_insert_with(BTreeMap::new)
.entry(proposer_id.clone())
.or_insert_with(BTreeMap::new)
.insert(our_id, share.clone());
let content = MessageContent::DecryptionShare {
proposer_id: proposer_id.clone(),
share: share.clone(),
share,
};
let message = Target::All.message(content.with_epoch(self.epoch));
self.messages.0.push_back(message);
let epoch = self.epoch;
let our_id = self.netinfo.our_uid().clone();
// Receive the share locally.
let fault_log =
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)?;
Ok((true, fault_log))
Ok(Target::All.message(content.with_epoch(epoch)).into())
}
/// Verifies the shares of the current epoch that are pending verification. Returned are the
@ -526,12 +504,13 @@ where
&self,
proposer_id: &NodeUid,
ciphertext: &Ciphertext,
epoch: u64,
) -> (BTreeSet<NodeUid>, FaultLog<NodeUid>) {
let mut incorrect_senders = BTreeSet::new();
let mut fault_log = FaultLog::new();
if let Some(sender_shares) = self
.received_shares
.get(&self.epoch)
.get(&epoch)
.and_then(|e| e.get(proposer_id))
{
for (sender_id, share) in sender_shares {
@ -549,10 +528,11 @@ where
&mut self,
proposer_id: &NodeUid,
incorrect_senders: BTreeSet<NodeUid>,
epoch: u64,
) {
if let Some(sender_shares) = self
.received_shares
.get_mut(&self.epoch)
.get_mut(&epoch)
.and_then(|e| e.get_mut(proposer_id))
{
for sender_id in incorrect_senders {
@ -567,23 +547,18 @@ where
/// `epoch == Some(given_epoch)`.
fn process_output(
&mut self,
step: common_subset::Step<NodeUid>,
epoch: Option<u64>,
) -> Result<FaultLog<NodeUid>> {
let common_subset::Step {
output,
mut fault_log,
mut messages,
} = step;
self.messages.extend_with_epoch(self.epoch, &mut messages);
// If this is the current epoch, the message could cause a new output.
if epoch.is_none() || epoch == Some(self.epoch) {
for cs_output in output {
fault_log.extend(self.send_decryption_shares(cs_output)?);
// TODO: May also check that there is no further output from Common Subset.
}
cs_step: common_subset::Step<NodeUid>,
epoch: u64,
) -> Result<Step<C, NodeUid>> {
let mut step = Step::default();
let cs_outputs = step.extend_with(cs_step, |cs_msg| {
MessageContent::CommonSubset(cs_msg).with_epoch(epoch)
});
for cs_output in cs_outputs {
// There is at most one output.
step.extend(self.send_decryption_shares(cs_output, epoch)?);
}
Ok(fault_log)
Ok(step)
}
/// Removes all `CommonSubset` instances from _past_ epochs that have terminated.
@ -686,21 +661,3 @@ impl<NodeUid: Rand> Message<NodeUid> {
self.epoch
}
}
/// The queue of outgoing messages in a `HoneyBadger` instance.
#[derive(Deref, DerefMut)]
struct MessageQueue<NodeUid: Rand>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>);
impl<NodeUid: Clone + Debug + Ord + Rand> MessageQueue<NodeUid> {
/// Appends to the queue the messages from `cs`, wrapped with `epoch`.
fn extend_with_epoch(
&mut self,
epoch: u64,
msgs: &mut VecDeque<TargetedMessage<common_subset::Message<NodeUid>, NodeUid>>,
) {
let convert = |msg: TargetedMessage<common_subset::Message<NodeUid>, NodeUid>| {
msg.map(|cs_msg| MessageContent::CommonSubset(cs_msg).with_epoch(epoch))
};
self.extend(msgs.drain(..).map(convert));
}
}

View File

@ -3,7 +3,7 @@ use std::fmt::Debug;
use std::iter::once;
use crypto::{PublicKey, PublicKeySet, PublicKeyShare, SecretKey, SecretKeyShare};
use fault_log::FaultLog;
use fault_log::{Fault, FaultLog};
/// Message sent by a given source.
#[derive(Clone, Debug)]
@ -156,6 +156,15 @@ impl<D: DistAlgorithm> From<FaultLog<D::NodeUid>> for Step<D> {
}
}
impl<D: DistAlgorithm> From<Fault<D::NodeUid>> for Step<D> {
fn from(fault: Fault<D::NodeUid>) -> Self {
Step {
fault_log: fault.into(),
..Step::default()
}
}
}
impl<D: DistAlgorithm> From<TargetedMessage<D::Message, D::NodeUid>> for Step<D> {
fn from(msg: TargetedMessage<D::Message, D::NodeUid>) -> Self {
Step {