Minor improvements to the Step API. (#292)

* Minor improvements to the Step API.

* Make use of DistAlgorithm::our_id.

* Rename Step::and to join.
This commit is contained in:
Andreas Fackler 2018-10-25 14:44:28 +02:00 committed by GitHub
parent c6e0406596
commit dda2f54a06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 93 additions and 93 deletions

View File

@ -232,9 +232,8 @@ impl<N: NodeIdT> BinaryAgreement<N> {
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?;
sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?);
let mut step = self.handle_sbvb_step(sbvb_step)?;
step.extend(self.handle_conf(sender_id, BoolSet::from(b))?);
Ok(step)
let step = self.handle_sbvb_step(sbvb_step)?;
Ok(step.join(self.handle_conf(sender_id, BoolSet::from(b))?))
}
}
@ -272,12 +271,11 @@ impl<N: NodeIdT> BinaryAgreement<N> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let mut step: Step<_> = Target::All
let step: Step<_> = Target::All
.message(content.clone().with_epoch(self.epoch))
.into();
let our_id = &self.netinfo.our_id().clone();
step.extend(self.handle_message_content(our_id, content)?);
Ok(step)
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_message_content(our_id, content)?))
}
/// Handles a step returned from the `ThresholdSign`.
@ -346,19 +344,19 @@ impl<N: NodeIdT> BinaryAgreement<N> {
}
// Output the Binary Agreement value.
let mut step = Step::default();
step.output.push_back(b);
step.output.push(b);
// Latch the decided state.
self.decision = Some(b);
debug!(
"{:?}/{:?} (is_validator: {}) decision: {}",
self.netinfo.our_id(),
self.our_id(),
self.proposer_id,
self.netinfo.is_validator(),
b
);
if self.netinfo.is_validator() {
let msg = MessageContent::Term(b).with_epoch(self.epoch + 1);
step.messages.push_back(Target::All.message(msg));
step.messages.push(Target::All.message(msg));
}
step
}
@ -374,9 +372,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided.
CoinState::InProgress(ref mut ts) => ts.sign().map_err(Error::InvokeCoin)?,
};
let mut step = self.on_coin_step(ts_step)?;
step.extend(self.try_update_epoch()?);
Ok(step)
Ok(self.on_coin_step(ts_step)?.join(self.try_update_epoch()?))
}
/// Counts the number of received `Conf` messages with values in `bin_values`.
@ -397,7 +393,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
self.coin_state = self.coin_state();
debug!(
"{:?} BinaryAgreement instance {:?} started epoch {}, {} terminated",
self.netinfo.our_id(),
self.our_id(),
self.proposer_id,
self.epoch,
self.received_conf.len(),

View File

@ -147,10 +147,9 @@ impl<N: NodeIdT> SbvBroadcast<N> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let mut step: Step<_> = Target::All.message(msg.clone()).into();
let our_id = &self.netinfo.our_id().clone();
step.extend(self.handle_message(our_id, msg)?);
Ok(step)
let step: Step<_> = Target::All.message(msg.clone()).into();
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_message(our_id, msg)?))
}
/// Multicasts a `BVal(b)` message, and handles it.

View File

@ -87,7 +87,7 @@ impl<N: NodeIdT> Broadcast<N> {
/// Initiates the broadcast. This must only be called in the proposer node.
pub fn broadcast(&mut self, input: Vec<u8>) -> Result<Step<N>> {
if *self.netinfo.our_id() != self.proposer_id {
if *self.our_id() != self.proposer_id {
return Err(Error::InstanceCannotPropose);
}
if self.value_sent {
@ -97,10 +97,9 @@ impl<N: NodeIdT> Broadcast<N> {
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
let (proof, mut step) = self.send_shards(input)?;
let our_id = &self.netinfo.our_id().clone();
step.extend(self.handle_value(our_id, proof)?);
Ok(step)
let (proof, step) = self.send_shards(input)?;
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_value(our_id, proof)?))
}
/// Handles a message received from `sender_id`.
@ -172,13 +171,13 @@ impl<N: NodeIdT> Broadcast<N> {
// Send each proof to a node.
for (index, id) in self.netinfo.all_ids().enumerate() {
let proof = mtree.proof(index).ok_or(Error::ProofConstructionFailed)?;
if *id == *self.netinfo.our_id() {
if *id == *self.our_id() {
// The proof is addressed to this node.
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
let msg = Target::Node(id.clone()).message(Message::Value(proof));
step.messages.push_back(msg);
step.messages.push(msg);
}
}
@ -191,7 +190,7 @@ impl<N: NodeIdT> Broadcast<N> {
if *sender_id != self.proposer_id {
info!(
"Node {:?} received Value from {:?} instead of {:?}.",
self.netinfo.our_id(),
self.our_id(),
sender_id,
self.proposer_id
);
@ -199,7 +198,7 @@ impl<N: NodeIdT> Broadcast<N> {
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
}
if self.echo_sent {
info!("Node {:?} received multiple Values.", self.netinfo.our_id());
info!("Node {:?} received multiple Values.", self.our_id());
if self.echos.get(self.our_id()) == Some(&p) {
return Ok(Step::default());
} else {
@ -208,7 +207,7 @@ impl<N: NodeIdT> Broadcast<N> {
}
// If the proof is invalid, log the faulty node behavior and ignore.
if !self.validate_proof(&p, &self.netinfo.our_id()) {
if !self.validate_proof(&p, &self.our_id()) {
return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into());
}
@ -222,7 +221,7 @@ impl<N: NodeIdT> Broadcast<N> {
if self.echos.contains_key(sender_id) {
info!(
"Node {:?} received multiple Echos from {:?}.",
self.netinfo.our_id(),
self.our_id(),
sender_id,
);
return Ok(Step::default());
@ -252,7 +251,7 @@ impl<N: NodeIdT> Broadcast<N> {
if self.readys.contains_key(sender_id) {
info!(
"Node {:?} received multiple Readys from {:?}.",
self.netinfo.our_id(),
self.our_id(),
sender_id
);
return Ok(Step::default());
@ -267,8 +266,7 @@ impl<N: NodeIdT> Broadcast<N> {
// Enqueue a broadcast of a Ready message.
step.extend(self.send_ready(hash)?);
}
step.extend(self.compute_output(hash)?);
Ok(step)
Ok(step.join(self.compute_output(hash)?))
}
/// Sends an `Echo` message and handles it. Does nothing if we are only an observer.
@ -278,10 +276,9 @@ impl<N: NodeIdT> Broadcast<N> {
return Ok(Step::default());
}
let echo_msg = Message::Echo(p.clone());
let mut step: Step<_> = Target::All.message(echo_msg).into();
let our_id = &self.netinfo.our_id().clone();
step.extend(self.handle_echo(our_id, p)?);
Ok(step)
let step: Step<_> = Target::All.message(echo_msg).into();
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_echo(our_id, p)?))
}
/// Sends a `Ready` message and handles it. Does nothing if we are only an observer.
@ -291,10 +288,9 @@ impl<N: NodeIdT> Broadcast<N> {
return Ok(Step::default());
}
let ready_msg = Message::Ready(*hash);
let mut step: Step<_> = Target::All.message(ready_msg).into();
let our_id = &self.netinfo.our_id().clone();
step.extend(self.handle_ready(our_id, hash)?);
Ok(step)
let step: Step<_> = Target::All.message(ready_msg).into();
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_ready(our_id, hash)?))
}
/// Checks whether the conditions for output are met for this hash, and if so, sets the output
@ -336,14 +332,14 @@ impl<N: NodeIdT> Broadcast<N> {
if !p.validate(self.netinfo.num_nodes()) {
info!(
"Node {:?} received invalid proof: {:?}",
self.netinfo.our_id(),
self.our_id(),
HexProof(&p)
);
false
} else if self.netinfo.node_index(id) != Some(p.index()) {
info!(
"Node {:?} received proof for wrong position: {:?}.",
self.netinfo.our_id(),
self.our_id(),
HexProof(&p)
);
false

View File

@ -297,7 +297,7 @@ where
} else {
ChangeState::None
};
step.output.push_back(Batch {
step.output.push(Batch {
epoch: batch_epoch,
change,
netinfo: Arc::new(self.netinfo.clone()),
@ -390,7 +390,7 @@ where
bincode::serialize(&kg_msg).map_err(|err| ErrorKind::SendTransactionBincode(*err))?;
let sig = Box::new(self.netinfo.secret_key().sign(ser));
if self.netinfo.is_validator() {
let our_id = self.netinfo.our_id().clone();
let our_id = self.our_id().clone();
let signed_msg =
SignedKeyGenMsg(self.start_epoch, our_id, kg_msg.clone(), *sig.clone());
self.key_gen_msg_buffer.push(signed_msg);

View File

@ -1,5 +1,5 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet};
use std::marker::PhantomData;
use std::mem::replace;
use std::sync::Arc;
@ -292,7 +292,7 @@ where
/// Checks whether the subset has output, and if it does, sends out our decryption shares.
fn process_subset(&mut self, cs_step: cs::Step<N>) -> Result<Step<C, N>> {
let mut step = Step::default();
let cs_outputs: VecDeque<_> = step.extend_with(cs_step, |cs_msg| {
let cs_outputs = step.extend_with(cs_step, |cs_msg| {
MessageContent::Subset(cs_msg).with_epoch(self.epoch)
});
let mut has_seen_done = false;

View File

@ -96,9 +96,8 @@ where
.public_key()
.encrypt_with_rng(&mut self.rng, ser_prop);
let epoch = self.epoch;
let mut step = self.epoch_state_mut(epoch)?.propose(&ciphertext)?;
step.extend(self.try_output_batches()?);
Ok(step)
let step = self.epoch_state_mut(epoch)?.propose(&ciphertext)?;
Ok(step.join(self.try_output_batches()?))
}
/// Handles a message received from `sender_id`.
@ -116,11 +115,10 @@ where
.or_insert_with(Vec::new)
.push((sender_id.clone(), content));
} else if self.epoch <= epoch {
let mut step = self
let step = self
.epoch_state_mut(epoch)?
.handle_message_content(sender_id, content)?;
step.extend(self.try_output_batches()?);
return Ok(step);
return Ok(step.join(self.try_output_batches()?));
} // And ignore all messages from past epochs.
Ok(Step::default())
}
@ -164,7 +162,7 @@ where
.and_then(EpochState::try_output_batch)
{
// Queue the output and advance the epoch.
step.output.push_back(batch);
step.output.push(batch);
step.fault_log.extend(fault_log);
step.extend(self.update_epoch()?);
}

View File

@ -55,68 +55,81 @@ impl<N: NodeIdT> NetworkInfo<N> {
}
/// The ID of the node the algorithm runs on.
#[inline]
pub fn our_id(&self) -> &N {
&self.our_id
}
/// ID of all nodes in the network.
#[inline]
pub fn all_ids(&self) -> impl Iterator<Item = &N> {
self.public_keys.keys()
}
/// The total number _N_ of nodes.
#[inline]
pub fn num_nodes(&self) -> usize {
self.num_nodes
}
/// The maximum number _f_ of faulty, Byzantine nodes up to which Honey Badger is guaranteed to
/// be correct.
#[inline]
pub fn num_faulty(&self) -> usize {
self.num_faulty
}
/// The minimum number _N - f_ of correct nodes with which Honey Badger is guaranteed to be
/// correct.
#[inline]
pub fn num_correct(&self) -> usize {
self.num_nodes - self.num_faulty
}
/// Returns our secret key share for threshold cryptography.
#[inline]
pub fn secret_key_share(&self) -> &SecretKeyShare {
&self.secret_key_share
}
/// Returns our secret key for encryption and signing.
#[inline]
pub fn secret_key(&self) -> &SecretKey {
&self.secret_key
}
/// Returns the public key set for threshold cryptography.
#[inline]
pub fn public_key_set(&self) -> &PublicKeySet {
&self.public_key_set
}
/// Returns the public key share if a node with that ID exists, otherwise `None`.
#[inline]
pub fn public_key_share(&self, id: &N) -> Option<&PublicKeyShare> {
self.public_key_shares.get(id)
}
/// Returns a map of all node IDs to their public key shares.
#[inline]
pub fn public_key_share_map(&self) -> &BTreeMap<N, PublicKeyShare> {
&self.public_key_shares
}
/// Returns a map of all node IDs to their public keys.
#[inline]
pub fn public_key(&self, id: &N) -> Option<&PublicKey> {
self.public_keys.get(id)
}
/// Returns a map of all node IDs to their public keys.
#[inline]
pub fn public_key_map(&self) -> &BTreeMap<N, PublicKey> {
&self.public_keys
}
/// The index of a node in a canonical numbering of all nodes.
#[inline]
pub fn node_index(&self, id: &N) -> Option<usize> {
self.node_indices.get(id).cloned()
}
@ -127,18 +140,21 @@ impl<N: NodeIdT> NetworkInfo<N> {
/// each invocation, or makes it unsafe to reuse keys for different invocations. A better
/// invocation ID would be one that is distributed to all nodes on each invocation and would be
/// independent from the public key, so that reusing keys would be safer.
#[inline]
pub fn invocation_id(&self) -> Vec<u8> {
self.public_key_set.public_key().to_bytes()
}
/// Returns `true` if this node takes part in the consensus itself. If not, it is only an
/// observer.
#[inline]
pub fn is_validator(&self) -> bool {
self.is_validator
}
/// Returns `true` if the given node takes part in the consensus itself. If not, it is only an
/// observer.
#[inline]
pub fn is_node_validator(&self, id: &N) -> bool {
self.public_keys.contains_key(id)
}

View File

@ -251,20 +251,19 @@ where
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_for(&mut self, change: Change<N>) -> Result<Step<T, N, Q>> {
let mut step = self
Ok(self
.dyn_hb
.handle_input(Input::Change(change))
.map_err(ErrorKind::Input)?
.convert();
step.extend(self.propose()?);
Ok(step)
.convert()
.join(self.propose()?))
}
/// Handles a message received from `sender_id`.
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<T, N, Q>> {
let mut step = self
let step = self
.dyn_hb
.handle_message(sender_id, message)
.map_err(ErrorKind::HandleMessage)?
@ -272,8 +271,7 @@ where
for batch in &step.output {
self.queue.remove_multiple(batch.iter());
}
step.extend(self.propose()?);
Ok(step)
Ok(step.join(self.propose()?))
}
/// Returns a reference to the internal `DynamicHoneyBadger` instance.

View File

@ -160,7 +160,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let id = self.netinfo.our_id().clone();
let id = self.our_id().clone();
debug!("{:?} Proposing {:0.10}", id, HexFmt(&value));
self.process_broadcast(&id, |bc| bc.handle_input(value))
}
@ -234,7 +234,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
let val_to_insert = if let Some(true) = self.ba_results.get(proposer_id) {
debug!(" {:?} → {:0.10}", proposer_id, HexFmt(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
.push(SubsetOutput::Contribution(proposer_id.clone(), value));
None
} else {
Some(value)
@ -247,8 +247,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
error!("Duplicate insert in broadcast_results: {:?}", inval)
}
let set_binary_agreement_input = |ba: &mut BinaryAgreement<N>| ba.handle_input(true);
step.extend(self.process_binary_agreement(proposer_id, set_binary_agreement_input)?);
Ok(step)
Ok(step.join(self.process_binary_agreement(proposer_id, set_binary_agreement_input)?))
}
/// Callback to be invoked on receipt of the decision value of the Binary Agreement
@ -289,7 +288,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
debug!(
"{:?} Updated Binary Agreement results: {:?}",
self.netinfo.our_id(),
self.our_id(),
self.ba_results
);
@ -318,12 +317,11 @@ impl<N: NodeIdT + Rand> Subset<N> {
{
debug!(" {:?} → {:0.10}", proposer_id, HexFmt(&value));
step.output
.extend(Some(SubsetOutput::Contribution(proposer_id.clone(), value)));
.push(SubsetOutput::Contribution(proposer_id.clone(), value));
}
}
step.output.extend(self.try_binary_agreement_completion());
Ok(step)
Ok(step.with_output(self.try_binary_agreement_completion()))
}
/// Returns the number of Binary Agreement instances that have decided "yes".
@ -343,7 +341,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
}
debug!(
"{:?} All Binary Agreement instances have terminated",
self.netinfo.our_id()
self.our_id()
);
// All instances of BinaryAgreement that delivered `true` (or "1" in the paper).
let delivered_1: BTreeSet<&N> = self
@ -366,10 +364,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
.collect();
if delivered_1.len() == broadcast_results.len() {
debug!(
"{:?} Binary Agreement instances completed:",
self.netinfo.our_id()
);
debug!("{:?} Binary Agreement instances completed:", self.our_id());
self.decided = true;
Some(SubsetOutput::Done)
} else {

View File

@ -104,7 +104,7 @@ impl<N: NodeIdT> ThresholdDecryption<N> {
step.fault_log.extend(self.remove_invalid_shares());
if self.netinfo.is_validator() {
let msg = Target::All.message(Message(share.clone()));
step.messages.push_back(msg);
step.messages.push(msg);
self.shares.insert(our_id, share);
}
step.extend(self.try_output()?);

View File

@ -115,7 +115,7 @@ impl<N: NodeIdT> ThresholdSign<N> {
}
let msg = Message(self.netinfo.secret_key_share().sign_g2(self.msg_hash));
let mut step: Step<_> = Target::All.message(msg.clone()).into();
let id = self.netinfo.our_id().clone();
let id = self.our_id().clone();
step.extend(self.handle_message(&id, msg)?);
Ok(step)
}
@ -146,13 +146,13 @@ impl<N: NodeIdT> ThresholdSign<N> {
fn try_output(&mut self) -> Result<Step<N>> {
debug!(
"{:?} received {} shares, had_input = {}",
self.netinfo.our_id(),
self.our_id(),
self.received_shares.len(),
self.had_input
);
if self.had_input && self.received_shares.len() > self.netinfo.num_faulty() {
let sig = self.combine_and_verify_sig()?;
debug!("{:?} output {:?}", self.netinfo.our_id(), sig);
debug!("{:?} output {:?}", self.our_id(), sig);
self.terminated = true;
let step = self.handle_input(())?; // Before terminating, make sure we sent our share.
Ok(step.with_output(sig))
@ -177,10 +177,7 @@ impl<N: NodeIdT> ThresholdSign<N> {
.verify_g2(&sig, self.msg_hash)
{
// Abort
error!(
"{:?} main public key verification failed",
self.netinfo.our_id()
);
error!("{:?} main public key verification failed", self.our_id());
Err(Error::VerificationFailed)
} else {
Ok(sig)

View File

@ -1,6 +1,5 @@
//! Common supertraits for distributed algorithms.
use std::collections::VecDeque;
use std::fmt::Debug;
use std::hash::Hash;
use std::iter::once;
@ -31,9 +30,9 @@ where
D: DistAlgorithm,
<D as DistAlgorithm>::NodeId: NodeIdT,
{
pub output: VecDeque<D::Output>,
pub output: Vec<D::Output>,
pub fault_log: FaultLog<D::NodeId>,
pub messages: VecDeque<TargetedMessage<D::Message, D::NodeId>>,
pub messages: Vec<TargetedMessage<D::Message, D::NodeId>>,
}
impl<D> Default for Step<D>
@ -43,9 +42,9 @@ where
{
fn default() -> Step<D> {
Step {
output: VecDeque::default(),
output: Vec::default(),
fault_log: FaultLog::default(),
messages: VecDeque::default(),
messages: Vec::default(),
}
}
}
@ -56,9 +55,9 @@ where
{
/// Creates a new `Step` from the given collections.
pub fn new(
output: VecDeque<D::Output>,
output: Vec<D::Output>,
fault_log: FaultLog<D::NodeId>,
messages: VecDeque<TargetedMessage<D::Message, D::NodeId>>,
messages: Vec<TargetedMessage<D::Message, D::NodeId>>,
) -> Self {
Step {
output,
@ -68,8 +67,8 @@ where
}
/// Returns the same step, with the given additional output.
pub fn with_output(mut self, output: D::Output) -> Self {
self.output.push_back(output);
pub fn with_output<T: Into<Option<D::Output>>>(mut self, output: T) -> Self {
self.output.extend(output.into());
self
}
@ -89,7 +88,7 @@ where
}
/// Extends `self` with `other`s messages and fault logs, and returns `other.output`.
pub fn extend_with<D2, FM>(&mut self, other: Step<D2>, f_msg: FM) -> VecDeque<D2::Output>
pub fn extend_with<D2, FM>(&mut self, other: Step<D2>, f_msg: FM) -> Vec<D2::Output>
where
D2: DistAlgorithm<NodeId = D::NodeId>,
FM: Fn(D2::Message) -> D::Message,
@ -107,6 +106,12 @@ where
self.messages.extend(other.messages);
}
/// Extends this step with `other` and returns the result.
pub fn join(mut self, other: Self) -> Self {
self.extend(other);
self
}
/// Converts this step into an equivalent step for a different `DistAlgorithm`.
// This cannot be a `From` impl, because it would conflict with `impl From<T> for T`.
pub fn convert<D2>(self) -> Step<D2>

View File

@ -671,7 +671,7 @@ where
// Perform sorting and drain `Vec` back into `VecDeque`.
msgs.sort_by(f);
self.messages.extend(msgs.into_iter());
self.messages.extend(msgs);
}
}

View File

@ -24,7 +24,7 @@ pub struct TestNode<D: DistAlgorithm> {
/// The values this node has output so far.
outputs: Vec<D::Output>,
/// Outgoing messages to be sent to other nodes.
messages: VecDeque<TargetedMessage<D::Message, D::NodeId>>,
messages: Vec<TargetedMessage<D::Message, D::NodeId>>,
/// Collected fault logs.
faults: Vec<Fault<D::NodeId>>,
}