Remove output and message queue from CommonSubset.

This commit is contained in:
Andreas Fackler 2018-07-24 13:12:06 +02:00
parent 990899327e
commit 9d0f1b3d15
11 changed files with 50 additions and 93 deletions

View File

@ -19,7 +19,6 @@ travis-ci = { repository = "poanetwork/hbbft" }
[dependencies]
bincode = "1.0.0"
byteorder = "1.2.3"
derive_deref = "1.0.1"
env_logger = "0.5.10"
error-chain = "0.11.0"
init_with = "1.1.0"

View File

@ -145,6 +145,7 @@ impl rand::Rand for AgreementContent {
/// Possible values of the common coin schedule defining the method to derive the common coin in a
/// given epoch: as a constant value or a distributed computation.
#[derive(Debug)]
enum CoinSchedule {
False,
True,
@ -152,6 +153,7 @@ enum CoinSchedule {
}
/// Binary Agreement instance
#[derive(Debug)]
pub struct Agreement<NodeUid> {
/// Shared network information.
netinfo: Arc<NetworkInfo<NodeUid>>,
@ -639,7 +641,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
struct Nonce(Vec<u8>);
impl Nonce {

View File

@ -201,6 +201,7 @@ impl Debug for BroadcastMessage {
}
/// Reliable Broadcast algorithm instance.
#[derive(Debug)]
pub struct Broadcast<NodeUid> {
/// Shared network data.
netinfo: Arc<NetworkInfo<NodeUid>>,
@ -559,6 +560,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
}
/// A wrapper for `ReedSolomon` that doesn't panic if there are no parity shards.
#[derive(Debug)]
enum Coding {
/// A `ReedSolomon` instance with at least one parity shard.
ReedSolomon(Box<ReedSolomon>),

View File

@ -23,16 +23,15 @@
//! * Once all `Agreement` instances have decided, `CommonSubset` returns the set of all proposed
//! values for which the decision was "yes".
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::result;
use std::sync::Arc;
use agreement::{self, Agreement, AgreementMessage};
use broadcast::{self, Broadcast, BroadcastMessage};
use fault_log::FaultLog;
use fmt::HexBytes;
use messaging::{self, DistAlgorithm, NetworkInfo, TargetedMessage};
use messaging::{self, DistAlgorithm, NetworkInfo};
use rand::Rand;
error_chain!{
@ -61,37 +60,8 @@ pub enum Message<NodeUid: Rand> {
Agreement(NodeUid, AgreementMessage),
}
/// The queue of outgoing messages in a `CommonSubset` instance.
#[derive(Deref, DerefMut)]
struct MessageQueue<NodeUid: Rand>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>);
impl<NodeUid: Clone + Debug + Ord + Rand> MessageQueue<NodeUid> {
/// Appends to the queue the messages from `agr`, wrapped with `proposer_id`.
fn extend_agreement(
&mut self,
proposer_id: &NodeUid,
msgs: &mut VecDeque<TargetedMessage<AgreementMessage, NodeUid>>,
) {
let convert = |msg: TargetedMessage<AgreementMessage, NodeUid>| {
msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg))
};
self.extend(msgs.drain(..).map(convert));
}
/// Appends to the queue the messages from `bc`, wrapped with `proposer_id`.
fn extend_broadcast(
&mut self,
proposer_id: &NodeUid,
msgs: &mut VecDeque<TargetedMessage<BroadcastMessage, NodeUid>>,
) {
let convert = |msg: TargetedMessage<BroadcastMessage, NodeUid>| {
msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))
};
self.extend(msgs.drain(..).map(convert));
}
}
/// Asynchronous Common Subset algorithm instance
#[derive(Debug)]
pub struct CommonSubset<NodeUid: Rand> {
/// Shared network information.
netinfo: Arc<NetworkInfo<NodeUid>>,
@ -99,10 +69,6 @@ pub struct CommonSubset<NodeUid: Rand> {
agreement_instances: BTreeMap<NodeUid, Agreement<NodeUid>>,
broadcast_results: BTreeMap<NodeUid, ProposedValue>,
agreement_results: BTreeMap<NodeUid, bool>,
/// Outgoing message queue.
messages: MessageQueue<NodeUid>,
/// The output value of the algorithm.
output: Option<BTreeMap<NodeUid, ProposedValue>>,
/// Whether the instance has decided on a value.
decided: bool,
}
@ -122,8 +88,7 @@ impl<NodeUid: Clone + Debug + Ord + Rand> DistAlgorithm for CommonSubset<NodeUid
self.netinfo.our_uid(),
HexBytes(&input)
);
let fault_log = self.send_proposed_value(input)?;
self.step(fault_log)
self.send_proposed_value(input)
}
fn handle_message(
@ -131,15 +96,14 @@ impl<NodeUid: Clone + Debug + Ord + Rand> DistAlgorithm for CommonSubset<NodeUid
sender_id: &Self::NodeUid,
message: Self::Message,
) -> Result<Step<NodeUid>> {
let fault_log = match message {
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg)?,
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg)?,
};
self.step(fault_log)
match message {
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg),
}
}
fn terminated(&self) -> bool {
self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated)
self.agreement_instances.values().all(Agreement::terminated)
}
fn our_id(&self) -> &Self::NodeUid {
@ -173,25 +137,15 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
agreement_instances,
broadcast_results: BTreeMap::new(),
agreement_results: BTreeMap::new(),
messages: MessageQueue(VecDeque::new()),
output: None,
decided: false,
})
}
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<Step<NodeUid>> {
Ok(Step::new(
self.output.take().into_iter().collect(),
fault_log,
self.messages.drain(..).collect(),
))
}
/// Common Subset input message handler. It receives a value for broadcast
/// and redirects it to the corresponding broadcast instance.
pub fn send_proposed_value(&mut self, value: ProposedValue) -> Result<FaultLog<NodeUid>> {
pub fn send_proposed_value(&mut self, value: ProposedValue) -> Result<Step<NodeUid>> {
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
return Ok(Step::default());
}
let uid = self.netinfo.our_uid().clone();
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
@ -205,7 +159,7 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
sender_id: &NodeUid,
proposer_id: &NodeUid,
bmessage: BroadcastMessage,
) -> Result<FaultLog<NodeUid>> {
) -> Result<Step<NodeUid>> {
self.process_broadcast(proposer_id, |bc| bc.handle_message(sender_id, bmessage))
}
@ -216,7 +170,7 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
sender_id: &NodeUid,
proposer_id: &NodeUid,
amessage: AgreementMessage,
) -> Result<FaultLog<NodeUid>> {
) -> Result<Step<NodeUid>> {
// Send the message to the local instance of Agreement
self.process_agreement(proposer_id, |agreement| {
agreement.handle_message(sender_id, amessage)
@ -225,25 +179,23 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
fn process_broadcast<F>(&mut self, proposer_id: &NodeUid, f: F) -> Result<FaultLog<NodeUid>>
fn process_broadcast<F>(&mut self, proposer_id: &NodeUid, f: F) -> Result<Step<NodeUid>>
where
F: FnOnce(&mut Broadcast<NodeUid>)
-> result::Result<broadcast::Step<NodeUid>, broadcast::Error>,
{
let mut fault_log = FaultLog::new();
let mut step = Step::default();
let value = {
let broadcast = self
.broadcast_instances
.get_mut(proposer_id)
.ok_or(ErrorKind::NoSuchBroadcastInstance)?;
let mut step = f(broadcast)?;
fault_log.extend(step.fault_log);
self.messages
.extend_broadcast(&proposer_id, &mut step.messages);
if let Some(output) = step.output.into_iter().next() {
let to_msg = |b_msg| Message::Broadcast(proposer_id.clone(), b_msg);
let output = step.extend_with(f(broadcast)?, to_msg);
if let Some(output) = output.into_iter().next() {
output
} else {
return Ok(fault_log);
return Ok(step);
}
};
self.broadcast_results.insert(proposer_id.clone(), value);
@ -254,35 +206,32 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
Ok(agreement::Step::default())
}
};
self.process_agreement(proposer_id, set_agreement_input)?
.merge_into(&mut fault_log);
Ok(fault_log)
step.extend(self.process_agreement(proposer_id, set_agreement_input)?);
Ok(step)
}
/// Callback to be invoked on receipt of the decision value of the Agreement
/// instance `uid`.
fn process_agreement<F>(&mut self, proposer_id: &NodeUid, f: F) -> Result<FaultLog<NodeUid>>
fn process_agreement<F>(&mut self, proposer_id: &NodeUid, f: F) -> Result<Step<NodeUid>>
where
F: FnOnce(&mut Agreement<NodeUid>)
-> result::Result<agreement::Step<NodeUid>, agreement::Error>,
{
let mut fault_log = FaultLog::new();
let mut step = Step::default();
let value = {
let agreement = self
.agreement_instances
.get_mut(proposer_id)
.ok_or(ErrorKind::NoSuchAgreementInstance)?;
if agreement.terminated() {
return Ok(fault_log);
return Ok(step);
}
let mut step = f(agreement)?;
fault_log.extend(step.fault_log);
self.messages
.extend_agreement(proposer_id, &mut step.messages);
if let Some(output) = step.output.into_iter().next() {
let to_msg = |a_msg| Message::Agreement(proposer_id.clone(), a_msg);
let output = step.extend_with(f(agreement)?, to_msg);
if let Some(output) = output.into_iter().next() {
output
} else {
return Ok(fault_log);
return Ok(step);
}
};
if self
@ -303,10 +252,8 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
// input 0 to each instance of BA that has not yet been provided input.
for (uid, agreement) in &mut self.agreement_instances {
if agreement.accepts_input() {
let mut step = agreement.input(false)?;
fault_log.extend(step.fault_log);
self.messages.extend_agreement(uid, &mut step.messages);
if let Some(output) = step.output.into_iter().next() {
let to_msg = |a_msg| Message::Agreement(uid.clone(), a_msg);
for output in step.extend_with(agreement.input(false)?, to_msg) {
if self.agreement_results.insert(uid.clone(), output).is_some() {
return Err(ErrorKind::MultipleAgreementResults.into());
}
@ -314,8 +261,8 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
}
}
}
self.try_agreement_completion();
Ok(fault_log)
step.output.extend(self.try_agreement_completion());
Ok(step)
}
/// Returns the number of agreement instances that have decided "yes".
@ -323,16 +270,16 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
self.agreement_results.values().filter(|v| **v).count()
}
fn try_agreement_completion(&mut self) {
fn try_agreement_completion(&mut self) -> Option<BTreeMap<NodeUid, ProposedValue>> {
if self.decided || self.count_true() < self.netinfo.num_nodes() - self.netinfo.num_faulty()
{
return;
return None;
}
// Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output j∈C v_j.
if self.agreement_results.len() < self.netinfo.num_nodes() {
return;
return None;
}
debug!(
"{:?} All Agreement instances have terminated",
@ -364,7 +311,9 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
debug!(" {:?} → {:?}", uid, HexBytes(&result));
}
self.decided = true;
self.output = Some(broadcast_results)
Some(broadcast_results)
} else {
None
}
}
}

View File

@ -93,6 +93,7 @@ pub enum Input<C, NodeUid> {
}
/// A Honey Badger instance that can handle adding and removing nodes.
#[derive(Debug)]
pub struct DynamicHoneyBadger<C, NodeUid: Rand> {
/// Shared network data.
netinfo: NetworkInfo<NodeUid>,

View File

@ -15,6 +15,7 @@ use messaging::NetworkInfo;
///
/// This is reset whenever the set of validators changes or a change reaches a majority. We call
/// the epochs since the last reset the current _era_.
#[derive(Debug)]
pub struct VoteCounter<NodeUid> {
/// Shared network data.
netinfo: Arc<NetworkInfo<NodeUid>>,

View File

@ -102,6 +102,7 @@ where
}
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
#[derive(Debug)]
pub struct HoneyBadger<C, NodeUid: Rand> {
/// Shared network data.
netinfo: Arc<NetworkInfo<NodeUid>>,

View File

@ -106,8 +106,6 @@
extern crate bincode;
extern crate byteorder;
#[macro_use(Deref, DerefMut)]
extern crate derive_deref;
#[macro_use]
extern crate error_chain;
extern crate init_with;

View File

@ -53,6 +53,7 @@ impl<M, N> TargetedMessage<M, N> {
/// Result of one step of the local state machine of a distributed algorithm. Such a result should
/// be used and never discarded by the client of the algorithm.
#[must_use = "The algorithm step result must be used."]
#[derive(Debug)]
pub struct Step<D>
where
D: DistAlgorithm,

View File

@ -106,6 +106,7 @@ where
/// A Honey Badger instance that can handle adding and removing nodes and manages a transaction
/// queue.
#[derive(Debug)]
pub struct QueueingHoneyBadger<Tx, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,

View File

@ -200,6 +200,7 @@ impl Debug for Ack {
}
/// The information needed to track a single proposer's secret sharing process.
#[derive(Debug)]
struct ProposalState {
/// The proposer's commitment.
commit: BivarCommitment,
@ -240,6 +241,7 @@ pub enum PartOutcome<NodeUid: Clone> {
/// A synchronous algorithm for dealerless distributed key generation.
///
/// It requires that all nodes handle all messages in the exact same order.
#[derive(Debug)]
pub struct SyncKeyGen<NodeUid> {
/// Our node ID.
our_uid: NodeUid,