mirror of https://github.com/poanetwork/hbbft.git
Merge pull request #11 from poanetwork/vk-agreement
The ACS and BA algorithms
This commit is contained in:
commit
d9febca3c3
|
@ -9,13 +9,14 @@ log = "0.4.1"
|
|||
reed-solomon-erasure = "3.0"
|
||||
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
|
||||
ring = "^0.12"
|
||||
rand = "*"
|
||||
protobuf = "1.4.4"
|
||||
crossbeam = "0.3.2"
|
||||
crossbeam-channel = "0.1"
|
||||
itertools = "0.7"
|
||||
|
||||
[build-dependencies]
|
||||
protoc-rust = "1.4.4"
|
||||
|
||||
[dev-dependencies]
|
||||
docopt = "0.8"
|
||||
rand = "0.3"
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
TODO
|
||||
====
|
||||
|
||||
* Fix the inappropriate use of Common Coin in the Byzantine Agreement protocol
|
||||
|
||||
This bug is explained in https://github.com/amiller/HoneyBadgerBFT/issues/59
|
||||
where a solution is suggested introducing an additional type of message,
|
||||
CONF.
|
||||
|
||||
There may be alternative solutions, such as using a different Byzantine
|
||||
Agreement protocol altogether, for example,
|
||||
https://people.csail.mit.edu/silvio/Selected%20Scientific%20Papers/Distributed%20Computation/BYZANTYNE%20AGREEMENT%20MADE%20TRIVIAL.pdf
|
|
@ -41,12 +41,12 @@ message LemmaProto {
|
|||
bytes left_sibling_hash = 3;
|
||||
bytes right_sibling_hash = 4;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
message AgreementProto {
|
||||
uint32 epoch = 1;
|
||||
oneof payload {
|
||||
bool bval = 1;
|
||||
bool aux = 2;
|
||||
bool bval = 2;
|
||||
bool aux = 3;
|
||||
}
|
||||
}
|
297
src/agreement.rs
297
src/agreement.rs
|
@ -1,42 +1,295 @@
|
|||
//! Binary Byzantine agreement protocol from a common coin protocol.
|
||||
|
||||
use proto::AgreementMessage;
|
||||
use std::collections::{BTreeSet, VecDeque};
|
||||
use itertools::Itertools;
|
||||
use std::collections::{BTreeSet, HashMap, VecDeque};
|
||||
use std::hash::Hash;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Agreement {
|
||||
input: Option<bool>,
|
||||
_bin_values: BTreeSet<bool>,
|
||||
use proto::message;
|
||||
|
||||
/// Type of output from the Agreement message handler. The first component is
|
||||
/// the value on which the Agreement has decided, also called "output" in the
|
||||
/// HoneyadgerBFT paper. The second component is a queue of messages to be sent
|
||||
/// to remote nodes as a result of handling the incomming message.
|
||||
type AgreementOutput = (Option<bool>, VecDeque<AgreementMessage>);
|
||||
|
||||
/// Messages sent during the binary Byzantine agreement stage.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum AgreementMessage {
|
||||
/// BVAL message with an epoch.
|
||||
BVal((u32, bool)),
|
||||
/// AUX message with an epoch.
|
||||
Aux((u32, bool)),
|
||||
}
|
||||
|
||||
impl Agreement {
|
||||
pub fn new() -> Self {
|
||||
impl AgreementMessage {
|
||||
pub fn into_proto(self) -> message::AgreementProto {
|
||||
let mut p = message::AgreementProto::new();
|
||||
match self {
|
||||
AgreementMessage::BVal((e, b)) => {
|
||||
p.set_epoch(e);
|
||||
p.set_bval(b);
|
||||
}
|
||||
AgreementMessage::Aux((e, b)) => {
|
||||
p.set_epoch(e);
|
||||
p.set_aux(b);
|
||||
}
|
||||
}
|
||||
p
|
||||
}
|
||||
|
||||
// TODO: Re-enable lint once implemented.
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
|
||||
pub fn from_proto(mp: message::AgreementProto) -> Option<Self> {
|
||||
let epoch = mp.get_epoch();
|
||||
if mp.has_bval() {
|
||||
Some(AgreementMessage::BVal((epoch, mp.get_bval())))
|
||||
} else if mp.has_aux() {
|
||||
Some(AgreementMessage::Aux((epoch, mp.get_aux())))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Binary Agreement instance.
|
||||
pub struct Agreement<NodeUid> {
|
||||
/// The UID of the corresponding proposer node.
|
||||
uid: NodeUid,
|
||||
num_nodes: usize,
|
||||
num_faulty_nodes: usize,
|
||||
epoch: u32,
|
||||
/// Bin values. Reset on every epoch update.
|
||||
bin_values: BTreeSet<bool>,
|
||||
/// Values received in BVAL messages. Reset on every epoch update.
|
||||
received_bval: HashMap<NodeUid, BTreeSet<bool>>,
|
||||
/// Sent BVAL values. Reset on every epoch update.
|
||||
sent_bval: BTreeSet<bool>,
|
||||
/// Values received in AUX messages. Reset on every epoch update.
|
||||
received_aux: HashMap<NodeUid, bool>,
|
||||
/// The estimate of the decision value in the current epoch.
|
||||
estimated: Option<bool>,
|
||||
/// The value output by the agreement instance. It is set once to `Some(b)`
|
||||
/// and then never changed. That is, no instance of Binary Agreement can
|
||||
/// decide on two different values of output.
|
||||
output: Option<bool>,
|
||||
/// Termination flag. The Agreement instance doesn't terminate immediately
|
||||
/// upon deciding on the agreed value. This is done in order to help other
|
||||
/// nodes decide despite asynchrony of communication. Once the instance
|
||||
/// determines that all the remote nodes have reached agreement, it sets the
|
||||
/// `terminated` flag and accepts no more incoming messages.
|
||||
terminated: bool,
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
|
||||
pub fn new(uid: NodeUid, num_nodes: usize) -> Self {
|
||||
let num_faulty_nodes = (num_nodes - 1) / 3;
|
||||
|
||||
Agreement {
|
||||
input: None,
|
||||
_bin_values: BTreeSet::new(),
|
||||
uid,
|
||||
num_nodes,
|
||||
num_faulty_nodes,
|
||||
epoch: 0,
|
||||
bin_values: BTreeSet::new(),
|
||||
received_bval: HashMap::new(),
|
||||
sent_bval: BTreeSet::new(),
|
||||
received_aux: HashMap::new(),
|
||||
estimated: None,
|
||||
output: None,
|
||||
terminated: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_input(&mut self, input: bool) -> AgreementMessage {
|
||||
self.input = Some(input);
|
||||
// Multicast BVAL
|
||||
AgreementMessage::BVal(input)
|
||||
/// Algorithm has terminated.
|
||||
pub fn terminated(&self) -> bool {
|
||||
self.terminated
|
||||
}
|
||||
|
||||
pub fn has_input(&self) -> bool {
|
||||
self.input.is_some()
|
||||
/// Sets the input value for agreement.
|
||||
pub fn set_input(&mut self, input: bool) -> Result<AgreementMessage, Error> {
|
||||
if self.epoch != 0 {
|
||||
return Err(Error::InputNotAccepted);
|
||||
}
|
||||
|
||||
// Set the initial estimated value to the input value.
|
||||
self.estimated = Some(input);
|
||||
// Receive the BVAL message locally.
|
||||
self.received_bval
|
||||
.entry(self.uid.clone())
|
||||
.or_insert_with(BTreeSet::new)
|
||||
.insert(input);
|
||||
// Multicast BVAL
|
||||
Ok(AgreementMessage::BVal((self.epoch, input)))
|
||||
}
|
||||
|
||||
/// Acceptance check to be performed before setting the input value.
|
||||
pub fn accepts_input(&self) -> bool {
|
||||
self.epoch == 0 && self.estimated.is_none()
|
||||
}
|
||||
|
||||
/// Receive input from a remote node.
|
||||
pub fn on_input(
|
||||
&self,
|
||||
_message: &AgreementMessage,
|
||||
) -> Result<VecDeque<AgreementMessage>, Error> {
|
||||
Err(Error::NotImplemented)
|
||||
///
|
||||
/// Outputs an optional agreement result and a queue of agreement messages
|
||||
/// to remote nodes. There can be up to 2 messages.
|
||||
pub fn handle_agreement_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: &AgreementMessage,
|
||||
) -> Result<AgreementOutput, Error> {
|
||||
match *message {
|
||||
// The algorithm instance has already terminated.
|
||||
_ if self.terminated => Err(Error::Terminated),
|
||||
|
||||
AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => {
|
||||
self.handle_bval(sender_id, b)
|
||||
}
|
||||
|
||||
AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => {
|
||||
self.handle_aux(sender_id, b)
|
||||
}
|
||||
|
||||
// Epoch does not match. Ignore the message.
|
||||
_ => Ok((None, VecDeque::new())),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result<AgreementOutput, Error> {
|
||||
let mut outgoing = VecDeque::new();
|
||||
|
||||
self.received_bval
|
||||
.entry(sender_id.clone())
|
||||
.or_insert_with(BTreeSet::new)
|
||||
.insert(b);
|
||||
let count_bval = self.received_bval
|
||||
.values()
|
||||
.filter(|values| values.contains(&b))
|
||||
.count();
|
||||
|
||||
// upon receiving BVAL_r(b) messages from 2f + 1 nodes,
|
||||
// bin_values_r := bin_values_r ∪ {b}
|
||||
if count_bval == 2 * self.num_faulty_nodes + 1 {
|
||||
self.bin_values.insert(b);
|
||||
|
||||
// wait until bin_values_r != 0, then multicast AUX_r(w)
|
||||
// where w ∈ bin_values_r
|
||||
if self.bin_values.len() == 1 {
|
||||
// Send an AUX message at most once per epoch.
|
||||
outgoing.push_back(AgreementMessage::Aux((self.epoch, b)));
|
||||
// Receive the AUX message locally.
|
||||
self.received_aux.insert(self.uid.clone(), b);
|
||||
}
|
||||
|
||||
let (decision, maybe_message) = self.try_coin();
|
||||
outgoing.extend(maybe_message);
|
||||
Ok((decision, outgoing))
|
||||
}
|
||||
// upon receiving BVAL_r(b) messages from f + 1 nodes, if
|
||||
// BVAL_r(b) has not been sent, multicast BVAL_r(b)
|
||||
else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
|
||||
outgoing.push_back(AgreementMessage::BVal((self.epoch, b)));
|
||||
// Receive the BVAL message locally.
|
||||
self.received_bval
|
||||
.entry(self.uid.clone())
|
||||
.or_insert_with(BTreeSet::new)
|
||||
.insert(b);
|
||||
Ok((None, outgoing))
|
||||
} else {
|
||||
Ok((None, outgoing))
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<AgreementOutput, Error> {
|
||||
self.received_aux.insert(sender_id.clone(), b);
|
||||
let mut outgoing = VecDeque::new();
|
||||
if !self.bin_values.is_empty() {
|
||||
let (decision, maybe_message) = self.try_coin();
|
||||
outgoing.extend(maybe_message);
|
||||
Ok((decision, outgoing))
|
||||
} else {
|
||||
Ok((None, outgoing))
|
||||
}
|
||||
}
|
||||
|
||||
/// AUX_r messages such that the set of values carried by those messages is
|
||||
/// a subset of bin_values_r. Outputs this subset.
|
||||
///
|
||||
/// FIXME: Clarify whether the values of AUX messages should be the same or
|
||||
/// not. It is assumed in `count_aux` that they can differ.
|
||||
///
|
||||
/// In general, we can't expect every good node to send the same AUX value,
|
||||
/// so waiting for N - f agreeing messages would not always terminate. We
|
||||
/// can, however, expect every good node to send an AUX value that will
|
||||
/// eventually end up in our bin_values.
|
||||
fn count_aux(&self) -> (usize, BTreeSet<bool>) {
|
||||
let (vals_cnt, vals) = self.received_aux
|
||||
.values()
|
||||
.filter(|b| self.bin_values.contains(b))
|
||||
.tee();
|
||||
|
||||
(vals_cnt.count(), vals.cloned().collect())
|
||||
}
|
||||
|
||||
/// Waits until at least (N − f) AUX_r messages have been received, such that
|
||||
/// the set of values carried by these messages, vals, are a subset of
|
||||
/// bin_values_r (note that bin_values_r may continue to change as BVAL_r
|
||||
/// messages are received, thus this condition may be triggered upon arrival
|
||||
/// of either an AUX_r or a BVAL_r message).
|
||||
///
|
||||
/// Once the (N - f) messages are received, gets a common coin and uses it
|
||||
/// to compute the next decision estimate and outputs the optional decision
|
||||
/// value. The function may start the next epoch. In that case, it also
|
||||
/// returns a message for broadcast.
|
||||
fn try_coin(&mut self) -> (Option<bool>, VecDeque<AgreementMessage>) {
|
||||
let (count_aux, vals) = self.count_aux();
|
||||
if count_aux < self.num_nodes - self.num_faulty_nodes {
|
||||
// Continue waiting for the (N - f) AUX messages.
|
||||
return (None, VecDeque::new());
|
||||
}
|
||||
|
||||
// FIXME: Implement the Common Coin algorithm. At the moment the
|
||||
// coin value is common across different nodes but not random.
|
||||
let coin = (self.epoch % 2) == 0;
|
||||
|
||||
// Check the termination condition: "continue looping until both a
|
||||
// value b is output in some round r, and the value Coin_r' = b for
|
||||
// some round r' > r."
|
||||
self.terminated = self.terminated || self.output == Some(coin);
|
||||
|
||||
// Start the next epoch.
|
||||
self.bin_values.clear();
|
||||
self.received_aux.clear();
|
||||
self.epoch += 1;
|
||||
|
||||
let decision = if vals.len() != 1 {
|
||||
self.estimated = Some(coin);
|
||||
None
|
||||
} else {
|
||||
// NOTE: `vals` has exactly one element due to `vals.len() == 1`
|
||||
let v: Vec<bool> = vals.into_iter().collect();
|
||||
let b = v[0];
|
||||
self.estimated = Some(b);
|
||||
// Outputting a value is allowed only once.
|
||||
if self.output.is_none() && b == coin {
|
||||
// Output the agreement value.
|
||||
self.output = Some(b);
|
||||
self.output
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
(
|
||||
decision,
|
||||
vec![AgreementMessage::BVal((
|
||||
self.epoch,
|
||||
self.estimated.unwrap(),
|
||||
))].into_iter()
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Error {
|
||||
NotImplemented,
|
||||
Terminated,
|
||||
InputNotAccepted,
|
||||
}
|
||||
|
|
|
@ -8,28 +8,17 @@ use std::fmt::{Debug, Display};
|
|||
use std::hash::Hash;
|
||||
|
||||
use agreement;
|
||||
use agreement::Agreement;
|
||||
use agreement::{Agreement, AgreementMessage};
|
||||
|
||||
use broadcast;
|
||||
use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
|
||||
|
||||
use proto::AgreementMessage;
|
||||
|
||||
// TODO: Make this a generic argument of `Broadcast`.
|
||||
type ProposedValue = Vec<u8>;
|
||||
|
||||
/// Input from a remote node to Common Subset.
|
||||
pub enum Input<NodeUid> {
|
||||
/// Message from a remote node `uid` to the broadcast instance `uid`.
|
||||
Broadcast(NodeUid, BroadcastMessage<ProposedValue>),
|
||||
/// Message from a remote node `uid` to the agreement instance `uid`.
|
||||
Agreement(NodeUid, AgreementMessage),
|
||||
}
|
||||
// Type of output from the Common Subset message handler.
|
||||
type CommonSubsetOutput<NodeUid> = (Option<HashSet<ProposedValue>>, VecDeque<Output<NodeUid>>);
|
||||
|
||||
/// Output from Common Subset to remote nodes.
|
||||
///
|
||||
/// FIXME: We can do an interface that doesn't need this type and instead works
|
||||
/// directly with the `TargetBroadcastMessage` and `AgreementMessage`.
|
||||
pub enum Output<NodeUid> {
|
||||
/// A broadcast message to be sent to the destination set in the
|
||||
/// `TargetedBroadcastMessage`.
|
||||
|
@ -43,15 +32,15 @@ pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
|
|||
uid: NodeUid,
|
||||
num_nodes: usize,
|
||||
num_faulty_nodes: usize,
|
||||
agreement_true_outputs: HashSet<NodeUid>,
|
||||
broadcast_instances: HashMap<NodeUid, Broadcast<NodeUid>>,
|
||||
agreement_instances: HashMap<NodeUid, Agreement>,
|
||||
agreement_instances: HashMap<NodeUid, Agreement<NodeUid>>,
|
||||
broadcast_results: HashMap<NodeUid, ProposedValue>,
|
||||
agreement_results: HashMap<NodeUid, bool>,
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
||||
pub fn new(uid: NodeUid, all_uids: &HashSet<NodeUid>, num_nodes: usize) -> Result<Self, Error> {
|
||||
pub fn new(uid: NodeUid, all_uids: &HashSet<NodeUid>) -> Result<Self, Error> {
|
||||
let num_nodes = all_uids.len();
|
||||
let num_faulty_nodes = (num_nodes - 1) / 3;
|
||||
|
||||
// Create all broadcast instances.
|
||||
|
@ -68,18 +57,17 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
}
|
||||
|
||||
// Create all agreement instances.
|
||||
let mut agreement_instances: HashMap<NodeUid, Agreement> = HashMap::new();
|
||||
let mut agreement_instances: HashMap<NodeUid, Agreement<NodeUid>> = HashMap::new();
|
||||
for uid0 in all_uids {
|
||||
agreement_instances.insert(uid0.clone(), Agreement::new());
|
||||
agreement_instances.insert(uid0.clone(), Agreement::new(uid0.clone(), num_nodes));
|
||||
}
|
||||
|
||||
Ok(CommonSubset {
|
||||
uid,
|
||||
num_nodes,
|
||||
num_faulty_nodes,
|
||||
agreement_true_outputs: HashSet::new(),
|
||||
broadcast_instances,
|
||||
agreement_instances: HashMap::new(),
|
||||
agreement_instances,
|
||||
broadcast_results: HashMap::new(),
|
||||
agreement_results: HashMap::new(),
|
||||
})
|
||||
|
@ -105,13 +93,10 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
|
||||
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
|
||||
/// BA_j, then provide input 1 to BA_j. See Figure 11.
|
||||
pub fn on_broadcast_result(
|
||||
&mut self,
|
||||
uid: &NodeUid,
|
||||
) -> Result<Option<AgreementMessage>, Error> {
|
||||
if let Some(agreement_instance) = self.agreement_instances.get_mut(uid) {
|
||||
if !agreement_instance.has_input() {
|
||||
Ok(Some(agreement_instance.set_input(true)))
|
||||
fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result<Option<AgreementMessage>, Error> {
|
||||
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
|
||||
if agreement_instance.accepts_input() {
|
||||
Ok(Some(agreement_instance.set_input(true)?))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
@ -120,84 +105,132 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Receive input from a remote node.
|
||||
pub fn on_input(
|
||||
/// Receives a broadcast message from a remote node `sender_id` concerning a
|
||||
/// value proposed by the node `proposer_id`. The output contains an
|
||||
/// optional result of the Common Subset algorithm - a set of proposed
|
||||
/// values - and a queue of messages to be sent to remote nodes, or an
|
||||
/// error.
|
||||
pub fn handle_broadcast(
|
||||
&mut self,
|
||||
message: Input<NodeUid>,
|
||||
) -> Result<VecDeque<Output<NodeUid>>, Error> {
|
||||
match message {
|
||||
Input::Broadcast(uid, bmessage) => {
|
||||
let mut instance_result = None;
|
||||
let input_result = {
|
||||
if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) {
|
||||
broadcast_instance
|
||||
.handle_broadcast_message(&uid, bmessage)
|
||||
.map(|(value, queue)| {
|
||||
instance_result = value;
|
||||
queue.into_iter().map(Output::Broadcast).collect()
|
||||
})
|
||||
.map_err(Error::from)
|
||||
} else {
|
||||
Err(Error::NoSuchBroadcastInstance)
|
||||
}
|
||||
};
|
||||
if instance_result.is_some() {
|
||||
self.on_broadcast_result(&uid)?;
|
||||
}
|
||||
input_result
|
||||
sender_id: &NodeUid,
|
||||
proposer_id: &NodeUid,
|
||||
bmessage: BroadcastMessage<ProposedValue>,
|
||||
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
|
||||
let mut instance_result = None;
|
||||
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {
|
||||
if let Some(broadcast_instance) = self.broadcast_instances.get(proposer_id) {
|
||||
broadcast_instance
|
||||
.handle_broadcast_message(sender_id, bmessage)
|
||||
.map(|(opt_value, queue)| {
|
||||
instance_result = opt_value;
|
||||
queue.into_iter().map(Output::Broadcast).collect()
|
||||
})
|
||||
.map_err(Error::from)
|
||||
} else {
|
||||
Err(Error::NoSuchBroadcastInstance)
|
||||
}
|
||||
Input::Agreement(_uid, _message) => {
|
||||
// FIXME: send the message to the Agreement instance and
|
||||
// conditionally call `on_agreement_output`
|
||||
};
|
||||
let mut opt_message: Option<AgreementMessage> = None;
|
||||
if let Some(value) = instance_result {
|
||||
self.broadcast_results.insert(proposer_id.clone(), value);
|
||||
opt_message = self.on_broadcast_result(proposer_id)?;
|
||||
}
|
||||
input_result.map(|mut queue| {
|
||||
if let Some(agreement_message) = opt_message {
|
||||
// Append the message to agreement nodes to the common output queue.
|
||||
queue.push_back(Output::Agreement(agreement_message))
|
||||
}
|
||||
(None, queue)
|
||||
})
|
||||
}
|
||||
|
||||
Err(Error::NotImplemented)
|
||||
/// Receives an agreement message from a remote node `sender_id` concerning
|
||||
/// a value proposed by the node `proposer_id`. The output contains an
|
||||
/// optional result of the Common Subset algorithm - a set of proposed
|
||||
/// values - and a queue of messages to be sent to remote nodes, or an
|
||||
/// error.
|
||||
pub fn handle_agreement(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
proposer_id: &NodeUid,
|
||||
amessage: &AgreementMessage,
|
||||
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
|
||||
// The result defaults to error.
|
||||
let mut result = Err(Error::NoSuchAgreementInstance);
|
||||
|
||||
// Send the message to the local instance of Agreement
|
||||
if let Some(agreement_instance) = self.agreement_instances.get_mut(proposer_id) {
|
||||
// Optional output of agreement and outgoing agreement
|
||||
// messages to remote nodes.
|
||||
result = if agreement_instance.terminated() {
|
||||
// This instance has terminated and does not accept input.
|
||||
Ok((None, VecDeque::new()))
|
||||
} else {
|
||||
// Send the message to the agreement instance.
|
||||
agreement_instance
|
||||
.handle_agreement_message(sender_id, &amessage)
|
||||
.map_err(Error::from)
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok((output, mut outgoing)) = result {
|
||||
// Process Agreement outputs.
|
||||
if let Some(b) = output {
|
||||
outgoing.append(&mut self.on_agreement_result(proposer_id, b)?);
|
||||
}
|
||||
|
||||
// Check whether Agreement has completed.
|
||||
Ok((
|
||||
self.try_agreement_completion(),
|
||||
outgoing.into_iter().map(Output::Agreement).collect(),
|
||||
))
|
||||
} else {
|
||||
// error
|
||||
result
|
||||
.map(|(_, messages)| (None, messages.into_iter().map(Output::Agreement).collect()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Callback to be invoked on receipt of a returned value of the Agreement
|
||||
/// instance `uid`.
|
||||
///
|
||||
/// FIXME: It is likely that only one `AgreementMessage` is required because
|
||||
/// Figure 11 does not count the number of messages but the number of nodes
|
||||
/// that sent messages.
|
||||
fn on_agreement_result(&mut self, uid: NodeUid, result: bool) -> VecDeque<AgreementMessage> {
|
||||
fn on_agreement_result(
|
||||
&mut self,
|
||||
element_proposer_id: &NodeUid,
|
||||
result: bool,
|
||||
) -> Result<VecDeque<AgreementMessage>, Error> {
|
||||
let mut outgoing = VecDeque::new();
|
||||
// Upon delivery of value 1 from at least N − f instances of BA, provide
|
||||
// input 0 to each instance of BA that has not yet been provided input.
|
||||
if result {
|
||||
self.agreement_true_outputs.insert(uid);
|
||||
self.agreement_results
|
||||
.insert(element_proposer_id.clone(), result);
|
||||
// The number of instances of BA that output 1.
|
||||
let results1 = self.agreement_results.values().filter(|v| **v).count();
|
||||
|
||||
if self.agreement_true_outputs.len() >= self.num_nodes - self.num_faulty_nodes {
|
||||
let instances = &mut self.agreement_instances;
|
||||
for (_uid0, instance) in instances.iter_mut() {
|
||||
if !instance.has_input() {
|
||||
outgoing.push_back(instance.set_input(false));
|
||||
if results1 >= self.num_nodes - self.num_faulty_nodes {
|
||||
for instance in self.agreement_instances.values_mut() {
|
||||
if instance.accepts_input() {
|
||||
outgoing.push_back(instance.set_input(false)?);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
outgoing
|
||||
Ok(outgoing)
|
||||
}
|
||||
|
||||
pub fn on_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
|
||||
fn try_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
|
||||
// Once all instances of BA have completed, let C ⊂ [1..N] be
|
||||
// the indexes of each BA that delivered 1. Wait for the output
|
||||
// v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j.
|
||||
let instance_uids: HashSet<NodeUid> = self.agreement_instances
|
||||
.iter()
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
let completed_uids: HashSet<NodeUid> = self.agreement_results
|
||||
.iter()
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
if instance_uids == completed_uids {
|
||||
// All instances of Agreement that delivered `true`.
|
||||
let delivered_1: HashSet<NodeUid> = self.agreement_results
|
||||
if self.agreement_instances
|
||||
.values()
|
||||
.all(|instance| instance.terminated())
|
||||
{
|
||||
// All instances of Agreement that delivered `true` (or "1" in the paper).
|
||||
let delivered_1: HashSet<&NodeUid> = self.agreement_results
|
||||
.iter()
|
||||
.filter(|(_, v)| **v)
|
||||
.map(|(k, _)| k.clone())
|
||||
.map(|(k, _)| k)
|
||||
.collect();
|
||||
// Results of Broadcast instances in `delivered_1`
|
||||
let broadcast_results: HashSet<ProposedValue> = self.broadcast_results
|
||||
|
@ -222,6 +255,7 @@ pub enum Error {
|
|||
UnexpectedMessage,
|
||||
NotImplemented,
|
||||
NoSuchBroadcastInstance,
|
||||
NoSuchAgreementInstance,
|
||||
Broadcast(broadcast::Error),
|
||||
Agreement(agreement::Error),
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
extern crate log;
|
||||
extern crate crossbeam;
|
||||
extern crate crossbeam_channel;
|
||||
extern crate itertools;
|
||||
extern crate merkle;
|
||||
extern crate protobuf;
|
||||
extern crate reed_solomon_erasure;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
//! Construction of messages from protobuf buffers.
|
||||
pub mod message;
|
||||
|
||||
use agreement::AgreementMessage;
|
||||
use broadcast::BroadcastMessage;
|
||||
use merkle::proof::{Lemma, Positioned, Proof};
|
||||
use proto::message::*;
|
||||
|
@ -66,13 +67,6 @@ impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Messages sent during the binary Byzantine agreement stage.
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
pub enum AgreementMessage {
|
||||
BVal(bool),
|
||||
Aux(bool),
|
||||
}
|
||||
|
||||
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> Message<T> {
|
||||
/// Translation from protobuf to the regular type.
|
||||
///
|
||||
|
@ -171,29 +165,6 @@ impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> BroadcastMessage<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl AgreementMessage {
|
||||
pub fn into_proto(self) -> AgreementProto {
|
||||
let mut p = AgreementProto::new();
|
||||
match self {
|
||||
AgreementMessage::BVal(b) => p.set_bval(b),
|
||||
AgreementMessage::Aux(b) => p.set_aux(b),
|
||||
}
|
||||
p
|
||||
}
|
||||
|
||||
// TODO: Re-enable lint once implemented.
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
|
||||
pub fn from_proto(mp: AgreementProto) -> Option<Self> {
|
||||
if mp.has_bval() {
|
||||
Some(AgreementMessage::BVal(mp.get_bval()))
|
||||
} else if mp.has_aux() {
|
||||
Some(AgreementMessage::Aux(mp.get_aux()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Serialisation of `Proof` defined against its protobuf interface to work
|
||||
/// around the restriction of not being allowed to extend the implementation of
|
||||
/// `Proof` outside its crate.
|
||||
|
|
Loading…
Reference in New Issue