mirror of https://github.com/poanetwork/hbbft.git
Merge pull request #161 from poanetwork/afck-queue-fields
Remove output and message queue from HoneyBadger.
This commit is contained in:
commit
c23aebffb4
|
@ -251,7 +251,12 @@ impl SecretKeyShare {
|
|||
if !ct.verify() {
|
||||
return None;
|
||||
}
|
||||
Some(DecryptionShare(ct.0.into_affine().mul((self.0).0)))
|
||||
Some(self.decrypt_share_no_verify(ct))
|
||||
}
|
||||
|
||||
/// Returns a decryption share, without validating the ciphertext.
|
||||
pub fn decrypt_share_no_verify(&self, ct: &Ciphertext) -> DecryptionShare {
|
||||
DecryptionShare(ct.0.into_affine().mul((self.0).0))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,10 +24,11 @@
|
|||
|
||||
use rand::Rand;
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bincode;
|
||||
|
@ -36,8 +37,8 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use common_subset::{self, CommonSubset};
|
||||
use crypto::{Ciphertext, DecryptionShare};
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
use fault_log::{Fault, FaultKind, FaultLog};
|
||||
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
|
||||
|
||||
error_chain!{
|
||||
links {
|
||||
|
@ -91,12 +92,11 @@ where
|
|||
has_input: false,
|
||||
common_subsets: BTreeMap::new(),
|
||||
max_future_epochs: self.max_future_epochs as u64,
|
||||
messages: MessageQueue(VecDeque::new()),
|
||||
output: Vec::new(),
|
||||
incoming_queue: BTreeMap::new(),
|
||||
received_shares: BTreeMap::new(),
|
||||
decrypted_contributions: BTreeMap::new(),
|
||||
ciphertexts: BTreeMap::new(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -114,10 +114,6 @@ pub struct HoneyBadger<C, NodeUid: Rand> {
|
|||
common_subsets: BTreeMap<u64, CommonSubset<NodeUid>>,
|
||||
/// The maximum number of `CommonSubset` instances that we run simultaneously.
|
||||
max_future_epochs: u64,
|
||||
/// The messages that need to be sent to other nodes.
|
||||
messages: MessageQueue<NodeUid>,
|
||||
/// The outputs from completed epochs.
|
||||
output: Vec<Batch<C, NodeUid>>,
|
||||
/// Messages for future epochs that couldn't be handled yet.
|
||||
incoming_queue: BTreeMap<u64, Vec<(NodeUid, MessageContent<NodeUid>)>>,
|
||||
/// Received decryption shares for an epoch. Each decryption share has a sender and a
|
||||
|
@ -128,6 +124,7 @@ pub struct HoneyBadger<C, NodeUid: Rand> {
|
|||
decrypted_contributions: BTreeMap<NodeUid, Vec<u8>>,
|
||||
/// Ciphertexts output by Common Subset in an epoch.
|
||||
ciphertexts: BTreeMap<u64, BTreeMap<NodeUid, Ciphertext>>,
|
||||
_phantom: PhantomData<C>,
|
||||
}
|
||||
|
||||
pub type Step<C, NodeUid> = messaging::Step<HoneyBadger<C, NodeUid>>;
|
||||
|
@ -144,8 +141,7 @@ where
|
|||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> Result<Step<C, NodeUid>> {
|
||||
let fault_log = self.propose(&input)?;
|
||||
self.step(fault_log)
|
||||
self.propose(&input)
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
|
@ -157,7 +153,6 @@ where
|
|||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
let Message { epoch, content } = message;
|
||||
let mut fault_log = FaultLog::new();
|
||||
if epoch > self.epoch + self.max_future_epochs {
|
||||
// Postpone handling this message.
|
||||
self.incoming_queue
|
||||
|
@ -165,9 +160,9 @@ where
|
|||
.or_insert_with(Vec::new)
|
||||
.push((sender_id.clone(), content));
|
||||
} else if epoch == self.epoch {
|
||||
fault_log.extend(self.handle_message_content(sender_id, epoch, content)?);
|
||||
return self.handle_message_content(sender_id, epoch, content);
|
||||
} // And ignore all messages from past epochs.
|
||||
self.step(fault_log)
|
||||
Ok(Step::default())
|
||||
}
|
||||
|
||||
fn terminated(&self) -> bool {
|
||||
|
@ -190,24 +185,17 @@ where
|
|||
HoneyBadgerBuilder::new(netinfo)
|
||||
}
|
||||
|
||||
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<Step<C, NodeUid>> {
|
||||
Ok(Step::new(
|
||||
self.output.drain(..).collect(),
|
||||
fault_log,
|
||||
self.messages.drain(..).collect(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Proposes a new item in the current epoch.
|
||||
pub fn propose(&mut self, proposal: &C) -> Result<FaultLog<NodeUid>> {
|
||||
pub fn propose(&mut self, proposal: &C) -> Result<Step<C, NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(FaultLog::new());
|
||||
return Ok(Step::default());
|
||||
}
|
||||
let step = {
|
||||
let cs = match self.common_subsets.entry(self.epoch) {
|
||||
let epoch = self.epoch;
|
||||
let cs_step = {
|
||||
let cs = match self.common_subsets.entry(epoch) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(CommonSubset::new(self.netinfo.clone(), self.epoch)?)
|
||||
entry.insert(CommonSubset::new(self.netinfo.clone(), epoch)?)
|
||||
}
|
||||
};
|
||||
let ser_prop = bincode::serialize(&proposal)?;
|
||||
|
@ -215,7 +203,7 @@ where
|
|||
self.has_input = true;
|
||||
cs.input(bincode::serialize(&ciphertext).unwrap())?
|
||||
};
|
||||
Ok(self.process_output(step, None)?)
|
||||
self.process_output(cs_step, epoch)
|
||||
}
|
||||
|
||||
/// Returns `true` if input for the current epoch has already been provided.
|
||||
|
@ -229,7 +217,7 @@ where
|
|||
sender_id: &NodeUid,
|
||||
epoch: u64,
|
||||
content: MessageContent<NodeUid>,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
) -> Result<Step<C, NodeUid>> {
|
||||
match content {
|
||||
MessageContent::CommonSubset(cs_msg) => {
|
||||
self.handle_common_subset_message(sender_id, epoch, cs_msg)
|
||||
|
@ -246,16 +234,15 @@ where
|
|||
sender_id: &NodeUid,
|
||||
epoch: u64,
|
||||
message: common_subset::Message<NodeUid>,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
let step = {
|
||||
) -> Result<Step<C, NodeUid>> {
|
||||
let cs_step = {
|
||||
// Borrow the instance for `epoch`, or create it.
|
||||
let cs = match self.common_subsets.entry(epoch) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
Entry::Vacant(entry) => {
|
||||
if epoch < self.epoch {
|
||||
// Epoch has already terminated. Message is obsolete.
|
||||
return Ok(fault_log);
|
||||
return Ok(Step::default());
|
||||
} else {
|
||||
entry.insert(CommonSubset::new(self.netinfo.clone(), epoch)?)
|
||||
}
|
||||
|
@ -263,9 +250,9 @@ where
|
|||
};
|
||||
cs.handle_message(sender_id, message)?
|
||||
};
|
||||
fault_log.extend(self.process_output(step, Some(epoch))?);
|
||||
let step = self.process_output(cs_step, epoch)?;
|
||||
self.remove_terminated(epoch);
|
||||
Ok(fault_log)
|
||||
Ok(step)
|
||||
}
|
||||
|
||||
/// Handles decryption shares sent by `HoneyBadger` instances.
|
||||
|
@ -275,9 +262,7 @@ where
|
|||
epoch: u64,
|
||||
proposer_id: NodeUid,
|
||||
share: DecryptionShare,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
|
||||
) -> Result<Step<C, NodeUid>> {
|
||||
if let Some(ciphertext) = self
|
||||
.ciphertexts
|
||||
.get(&epoch)
|
||||
|
@ -285,8 +270,7 @@ where
|
|||
{
|
||||
if !self.verify_decryption_share(sender_id, &share, ciphertext) {
|
||||
let fault_kind = FaultKind::UnverifiedDecryptionShareSender;
|
||||
fault_log.append(sender_id.clone(), fault_kind);
|
||||
return Ok(fault_log);
|
||||
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -294,16 +278,15 @@ where
|
|||
self.received_shares
|
||||
.entry(epoch)
|
||||
.or_insert_with(BTreeMap::new)
|
||||
.entry(proposer_id.clone())
|
||||
.entry(proposer_id)
|
||||
.or_insert_with(BTreeMap::new)
|
||||
.insert(sender_id.clone(), share);
|
||||
|
||||
if epoch == self.epoch {
|
||||
self.try_decrypt_proposer_contribution(proposer_id);
|
||||
fault_log.extend(self.try_decrypt_and_output_batch()?);
|
||||
self.try_output_batches()
|
||||
} else {
|
||||
Ok(Step::default())
|
||||
}
|
||||
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Verifies a given decryption share using the sender's public key and the proposer's
|
||||
|
@ -324,29 +307,38 @@ where
|
|||
|
||||
/// When contributions of transactions have been decrypted for all valid proposers in this
|
||||
/// epoch, moves those contributions into a batch, outputs the batch and updates the epoch.
|
||||
fn try_output_batch(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
// Wait until contributions have been successfully decoded for all proposer nodes with correct
|
||||
// ciphertext outputs.
|
||||
if !self.all_contributions_decrypted() {
|
||||
return Ok(FaultLog::new());
|
||||
fn try_output_batch(&mut self) -> Result<Option<Step<C, NodeUid>>> {
|
||||
// Return if we don't have ciphertexts yet.
|
||||
let proposer_ids = match self.ciphertexts.get(&self.epoch) {
|
||||
Some(cts) => cts.keys().cloned().collect_vec(),
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
// Try to decrypt all contributions. If some are still missing, return.
|
||||
if !proposer_ids
|
||||
.into_iter()
|
||||
.all(|id| self.try_decrypt_proposer_contribution(id))
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut step = Step::default();
|
||||
|
||||
// Deserialize the output.
|
||||
let mut fault_log = FaultLog::new();
|
||||
let contributions: BTreeMap<NodeUid, C> = self
|
||||
.decrypted_contributions
|
||||
.iter()
|
||||
.flat_map(|(proposer_id, ser_contrib)| {
|
||||
// If deserialization fails, the proposer of that item is faulty. Ignore it.
|
||||
if let Ok(contrib) = bincode::deserialize::<C>(&ser_contrib) {
|
||||
Some((proposer_id.clone(), contrib))
|
||||
} else {
|
||||
let fault_kind = FaultKind::BatchDeserializationFailed;
|
||||
fault_log.append(proposer_id.clone(), fault_kind);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let contributions: BTreeMap<NodeUid, C> =
|
||||
mem::replace(&mut self.decrypted_contributions, BTreeMap::new())
|
||||
.into_iter()
|
||||
.flat_map(|(proposer_id, ser_contrib)| {
|
||||
// If deserialization fails, the proposer of that item is faulty. Ignore it.
|
||||
if let Ok(contrib) = bincode::deserialize::<C>(&ser_contrib) {
|
||||
Some((proposer_id, contrib))
|
||||
} else {
|
||||
let fault_kind = FaultKind::BatchDeserializationFailed;
|
||||
step.fault_log.append(proposer_id, fault_kind);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let batch = Batch {
|
||||
epoch: self.epoch,
|
||||
contributions,
|
||||
|
@ -358,64 +350,44 @@ where
|
|||
batch.contributions.keys().collect::<Vec<_>>()
|
||||
);
|
||||
// Queue the output and advance the epoch.
|
||||
self.output.push(batch);
|
||||
fault_log.extend(self.update_epoch()?);
|
||||
Ok(fault_log)
|
||||
step.output.push_back(batch);
|
||||
step.extend(self.update_epoch()?);
|
||||
Ok(Some(step))
|
||||
}
|
||||
|
||||
/// Increments the epoch number and clears any state that is local to the finished epoch.
|
||||
fn update_epoch(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
fn update_epoch(&mut self) -> Result<Step<C, NodeUid>> {
|
||||
// Clear the state of the old epoch.
|
||||
self.ciphertexts.remove(&self.epoch);
|
||||
self.decrypted_contributions.clear();
|
||||
self.received_shares.remove(&self.epoch);
|
||||
self.epoch += 1;
|
||||
self.has_input = false;
|
||||
let max_epoch = self.epoch + self.max_future_epochs;
|
||||
let mut fault_log = FaultLog::new();
|
||||
let mut step = Step::default();
|
||||
// TODO: Once stable, use `Iterator::flatten`.
|
||||
for (sender_id, content) in
|
||||
Itertools::flatten(self.incoming_queue.remove(&max_epoch).into_iter())
|
||||
{
|
||||
self.handle_message_content(&sender_id, max_epoch, content)?
|
||||
.merge_into(&mut fault_log);
|
||||
step.extend(self.handle_message_content(&sender_id, max_epoch, content)?);
|
||||
}
|
||||
// Handle any decryption shares received for the new epoch.
|
||||
self.try_decrypt_and_output_batch()?
|
||||
.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
step.extend(self.try_output_batches()?);
|
||||
Ok(step)
|
||||
}
|
||||
|
||||
/// Tries to decrypt contributions from all proposers and output those in a batch.
|
||||
fn try_decrypt_and_output_batch(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
// Return if we don't have ciphertexts yet.
|
||||
let proposer_ids: Vec<_> = match self.ciphertexts.get(&self.epoch) {
|
||||
Some(cts) => cts.keys().cloned().collect(),
|
||||
None => {
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
};
|
||||
|
||||
// Try to output a batch if all contributions have been decrypted.
|
||||
for proposer_id in proposer_ids {
|
||||
self.try_decrypt_proposer_contribution(proposer_id);
|
||||
}
|
||||
self.try_output_batch()
|
||||
}
|
||||
|
||||
/// Returns true if and only if contributions have been decrypted for all selected proposers in
|
||||
/// this epoch.
|
||||
fn all_contributions_decrypted(&mut self) -> bool {
|
||||
match self.ciphertexts.get(&self.epoch) {
|
||||
None => false, // No ciphertexts yet.
|
||||
Some(ciphertexts) => ciphertexts.keys().eq(self.decrypted_contributions.keys()),
|
||||
fn try_output_batches(&mut self) -> Result<Step<C, NodeUid>> {
|
||||
let mut step = Step::default();
|
||||
while let Some(new_step) = self.try_output_batch()? {
|
||||
step.extend(new_step);
|
||||
}
|
||||
Ok(step)
|
||||
}
|
||||
|
||||
/// Tries to decrypt the contribution from a given proposer.
|
||||
fn try_decrypt_proposer_contribution(&mut self, proposer_id: NodeUid) {
|
||||
fn try_decrypt_proposer_contribution(&mut self, proposer_id: NodeUid) -> bool {
|
||||
if self.decrypted_contributions.contains_key(&proposer_id) {
|
||||
return; // Already decrypted.
|
||||
return true; // Already decrypted.
|
||||
}
|
||||
let shares = if let Some(shares) = self
|
||||
.received_shares
|
||||
|
@ -424,10 +396,10 @@ where
|
|||
{
|
||||
shares
|
||||
} else {
|
||||
return;
|
||||
return false; // No shares yet.
|
||||
};
|
||||
if shares.len() <= self.netinfo.num_faulty() {
|
||||
return;
|
||||
return false; // Not enough shares yet.
|
||||
}
|
||||
|
||||
if let Some(ciphertext) = self
|
||||
|
@ -454,70 +426,76 @@ where
|
|||
Err(err) => error!("{:?} Decryption failed: {:?}.", self.our_id(), err),
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
fn send_decryption_shares(
|
||||
&mut self,
|
||||
cs_output: BTreeMap<NodeUid, Vec<u8>>,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
epoch: u64,
|
||||
) -> Result<Step<C, NodeUid>> {
|
||||
let mut step = Step::default();
|
||||
let mut ciphertexts = BTreeMap::new();
|
||||
for (proposer_id, v) in cs_output {
|
||||
let mut ciphertext: Ciphertext;
|
||||
if let Ok(ct) = bincode::deserialize(&v) {
|
||||
ciphertext = ct;
|
||||
} else {
|
||||
warn!("Invalid ciphertext from proposer {:?} ignored", proposer_id);
|
||||
let fault_kind = FaultKind::InvalidCiphertext;
|
||||
fault_log.append(proposer_id.clone(), fault_kind);
|
||||
let ciphertext: Ciphertext = match bincode::deserialize(&v) {
|
||||
Ok(ciphertext) => ciphertext,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Cannot deserialize ciphertext from {:?}: {:?}",
|
||||
proposer_id, err
|
||||
);
|
||||
let fault_kind = FaultKind::InvalidCiphertext;
|
||||
step.fault_log.append(proposer_id, fault_kind);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if !ciphertext.verify() {
|
||||
warn!("Invalid ciphertext from {:?}", proposer_id);
|
||||
let fault_kind = FaultKind::ShareDecryptionFailed;
|
||||
step.fault_log.append(proposer_id.clone(), fault_kind);
|
||||
continue;
|
||||
}
|
||||
let (incorrect_senders, faults) =
|
||||
self.verify_pending_decryption_shares(&proposer_id, &ciphertext);
|
||||
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders);
|
||||
fault_log.extend(faults);
|
||||
let (valid, dec_fl) = self.send_decryption_share(&proposer_id, &ciphertext)?;
|
||||
fault_log.extend(dec_fl);
|
||||
if valid {
|
||||
ciphertexts.insert(proposer_id.clone(), ciphertext);
|
||||
self.try_decrypt_proposer_contribution(proposer_id);
|
||||
} else {
|
||||
warn!("Share decryption failed for proposer {:?}", proposer_id);
|
||||
let fault_kind = FaultKind::ShareDecryptionFailed;
|
||||
fault_log.append(proposer_id.clone(), fault_kind);
|
||||
self.verify_pending_decryption_shares(&proposer_id, &ciphertext, epoch);
|
||||
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders, epoch);
|
||||
step.fault_log.extend(faults);
|
||||
if self.netinfo.is_validator() {
|
||||
step.extend(self.send_decryption_share(&proposer_id, &ciphertext, epoch)?);
|
||||
}
|
||||
ciphertexts.insert(proposer_id, ciphertext);
|
||||
}
|
||||
self.ciphertexts.insert(self.epoch, ciphertexts);
|
||||
fault_log.extend(self.try_decrypt_and_output_batch()?);
|
||||
Ok(fault_log)
|
||||
self.ciphertexts.insert(epoch, ciphertexts);
|
||||
if epoch == self.epoch {
|
||||
step.extend(self.try_output_batches()?);
|
||||
}
|
||||
Ok(step)
|
||||
}
|
||||
|
||||
/// Verifies the ciphertext and sends decryption shares. Returns whether it is valid.
|
||||
/// Sends decryption shares without verifying the ciphertext.
|
||||
fn send_decryption_share(
|
||||
&mut self,
|
||||
proposer_id: &NodeUid,
|
||||
ciphertext: &Ciphertext,
|
||||
) -> Result<(bool, FaultLog<NodeUid>)> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok((ciphertext.verify(), FaultLog::new()));
|
||||
}
|
||||
let share = match self.netinfo.secret_key_share().decrypt_share(&ciphertext) {
|
||||
None => return Ok((false, FaultLog::new())),
|
||||
Some(share) => share,
|
||||
};
|
||||
epoch: u64,
|
||||
) -> Result<Step<C, NodeUid>> {
|
||||
let share = self
|
||||
.netinfo
|
||||
.secret_key_share()
|
||||
.decrypt_share_no_verify(&ciphertext);
|
||||
// Send the share to remote nodes.
|
||||
let our_id = self.netinfo.our_uid().clone();
|
||||
// Insert the share.
|
||||
self.received_shares
|
||||
.entry(epoch)
|
||||
.or_insert_with(BTreeMap::new)
|
||||
.entry(proposer_id.clone())
|
||||
.or_insert_with(BTreeMap::new)
|
||||
.insert(our_id, share.clone());
|
||||
let content = MessageContent::DecryptionShare {
|
||||
proposer_id: proposer_id.clone(),
|
||||
share: share.clone(),
|
||||
share,
|
||||
};
|
||||
let message = Target::All.message(content.with_epoch(self.epoch));
|
||||
self.messages.0.push_back(message);
|
||||
let epoch = self.epoch;
|
||||
let our_id = self.netinfo.our_uid().clone();
|
||||
// Receive the share locally.
|
||||
let fault_log =
|
||||
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)?;
|
||||
Ok((true, fault_log))
|
||||
Ok(Target::All.message(content.with_epoch(epoch)).into())
|
||||
}
|
||||
|
||||
/// Verifies the shares of the current epoch that are pending verification. Returned are the
|
||||
|
@ -526,12 +504,13 @@ where
|
|||
&self,
|
||||
proposer_id: &NodeUid,
|
||||
ciphertext: &Ciphertext,
|
||||
epoch: u64,
|
||||
) -> (BTreeSet<NodeUid>, FaultLog<NodeUid>) {
|
||||
let mut incorrect_senders = BTreeSet::new();
|
||||
let mut fault_log = FaultLog::new();
|
||||
if let Some(sender_shares) = self
|
||||
.received_shares
|
||||
.get(&self.epoch)
|
||||
.get(&epoch)
|
||||
.and_then(|e| e.get(proposer_id))
|
||||
{
|
||||
for (sender_id, share) in sender_shares {
|
||||
|
@ -549,10 +528,11 @@ where
|
|||
&mut self,
|
||||
proposer_id: &NodeUid,
|
||||
incorrect_senders: BTreeSet<NodeUid>,
|
||||
epoch: u64,
|
||||
) {
|
||||
if let Some(sender_shares) = self
|
||||
.received_shares
|
||||
.get_mut(&self.epoch)
|
||||
.get_mut(&epoch)
|
||||
.and_then(|e| e.get_mut(proposer_id))
|
||||
{
|
||||
for sender_id in incorrect_senders {
|
||||
|
@ -567,23 +547,21 @@ where
|
|||
/// `epoch == Some(given_epoch)`.
|
||||
fn process_output(
|
||||
&mut self,
|
||||
step: common_subset::Step<NodeUid>,
|
||||
epoch: Option<u64>,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let common_subset::Step {
|
||||
output,
|
||||
mut fault_log,
|
||||
mut messages,
|
||||
} = step;
|
||||
self.messages.extend_with_epoch(self.epoch, &mut messages);
|
||||
// If this is the current epoch, the message could cause a new output.
|
||||
if epoch.is_none() || epoch == Some(self.epoch) {
|
||||
for cs_output in output {
|
||||
fault_log.extend(self.send_decryption_shares(cs_output)?);
|
||||
// TODO: May also check that there is no further output from Common Subset.
|
||||
}
|
||||
cs_step: common_subset::Step<NodeUid>,
|
||||
epoch: u64,
|
||||
) -> Result<Step<C, NodeUid>> {
|
||||
let mut step = Step::default();
|
||||
let mut cs_outputs = step.extend_with(cs_step, |cs_msg| {
|
||||
MessageContent::CommonSubset(cs_msg).with_epoch(epoch)
|
||||
});
|
||||
if let Some(cs_output) = cs_outputs.pop_front() {
|
||||
// There is at most one output.
|
||||
step.extend(self.send_decryption_shares(cs_output, epoch)?);
|
||||
}
|
||||
Ok(fault_log)
|
||||
if !cs_outputs.is_empty() {
|
||||
error!("Multiple outputs from a single Common Subset instance.");
|
||||
}
|
||||
Ok(step)
|
||||
}
|
||||
|
||||
/// Removes all `CommonSubset` instances from _past_ epochs that have terminated.
|
||||
|
@ -686,21 +664,3 @@ impl<NodeUid: Rand> Message<NodeUid> {
|
|||
self.epoch
|
||||
}
|
||||
}
|
||||
|
||||
/// The queue of outgoing messages in a `HoneyBadger` instance.
|
||||
#[derive(Deref, DerefMut)]
|
||||
struct MessageQueue<NodeUid: Rand>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>);
|
||||
|
||||
impl<NodeUid: Clone + Debug + Ord + Rand> MessageQueue<NodeUid> {
|
||||
/// Appends to the queue the messages from `cs`, wrapped with `epoch`.
|
||||
fn extend_with_epoch(
|
||||
&mut self,
|
||||
epoch: u64,
|
||||
msgs: &mut VecDeque<TargetedMessage<common_subset::Message<NodeUid>, NodeUid>>,
|
||||
) {
|
||||
let convert = |msg: TargetedMessage<common_subset::Message<NodeUid>, NodeUid>| {
|
||||
msg.map(|cs_msg| MessageContent::CommonSubset(cs_msg).with_epoch(epoch))
|
||||
};
|
||||
self.extend(msgs.drain(..).map(convert));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::fmt::Debug;
|
|||
use std::iter::once;
|
||||
|
||||
use crypto::{PublicKey, PublicKeySet, PublicKeyShare, SecretKey, SecretKeyShare};
|
||||
use fault_log::FaultLog;
|
||||
use fault_log::{Fault, FaultLog};
|
||||
|
||||
/// Message sent by a given source.
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -156,6 +156,15 @@ impl<D: DistAlgorithm> From<FaultLog<D::NodeUid>> for Step<D> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<D: DistAlgorithm> From<Fault<D::NodeUid>> for Step<D> {
|
||||
fn from(fault: Fault<D::NodeUid>) -> Self {
|
||||
Step {
|
||||
fault_log: fault.into(),
|
||||
..Step::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: DistAlgorithm> From<TargetedMessage<D::Message, D::NodeUid>> for Step<D> {
|
||||
fn from(msg: TargetedMessage<D::Message, D::NodeUid>) -> Self {
|
||||
Step {
|
||||
|
|
Loading…
Reference in New Issue