changed code according to review comments

This commit is contained in:
Vladimir Komendantskiy 2018-05-08 17:25:57 +01:00
parent 2205f9083e
commit 394462c88b
5 changed files with 242 additions and 214 deletions

View File

@ -9,10 +9,10 @@ log = "0.4.1"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
ring = "^0.12"
rand = "*"
protobuf = "1.4.4"
crossbeam = "0.3.2"
crossbeam-channel = "0.1"
rand = "0.3"
[build-dependencies]
protoc-rust = "1.4.4"

View File

@ -1,9 +1,51 @@
//! Binary Byzantine agreement protocol from a common coin protocol.
use rand::random;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::hash::Hash;
use proto::AgreementMessage;
use proto::message;
type AgreementOutput = (Option<bool>, VecDeque<AgreementMessage>);
/// Messages sent during the binary Byzantine agreement stage.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum AgreementMessage {
/// BVAL message with an epoch.
BVal((u32, bool)),
/// AUX message with an epoch.
Aux((u32, bool)),
}
impl AgreementMessage {
pub fn into_proto(self) -> message::AgreementProto {
let mut p = message::AgreementProto::new();
match self {
AgreementMessage::BVal((e, b)) => {
p.set_epoch(e);
p.set_bval(b);
}
AgreementMessage::Aux((e, b)) => {
p.set_epoch(e);
p.set_aux(b);
}
}
p
}
// TODO: Re-enable lint once implemented.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn from_proto(mp: message::AgreementProto) -> Option<Self> {
let epoch = mp.get_epoch();
if mp.has_bval() {
Some(AgreementMessage::BVal((epoch, mp.get_bval())))
} else if mp.has_aux() {
Some(AgreementMessage::Aux((epoch, mp.get_aux())))
} else {
None
}
}
}
pub struct Agreement<NodeUid> {
/// The UID of the corresponding node.
@ -21,7 +63,7 @@ pub struct Agreement<NodeUid> {
/// Values received in AUX messages. Reset on every epoch update.
received_aux: HashMap<NodeUid, BTreeSet<bool>>,
/// All the output values in all epochs.
outputs: BTreeMap<u32, bool>,
estimated: BTreeMap<u32, bool>,
/// Termination flag.
terminated: bool,
}
@ -40,7 +82,7 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
received_bval: HashMap::new(),
sent_bval: BTreeSet::new(),
received_aux: HashMap::new(),
outputs: BTreeMap::new(),
estimated: BTreeMap::new(),
terminated: false,
}
}
@ -53,7 +95,10 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
pub fn set_input(&mut self, input: bool) -> AgreementMessage {
self.input = Some(input);
// Receive the BVAL message locally.
update_map_of_sets(&mut self.received_bval, self.uid.clone(), input);
self.received_bval
.entry(self.uid.clone())
.or_insert_with(BTreeSet::new)
.insert(input);
// Multicast BVAL
AgreementMessage::BVal((self.epoch, input))
}
@ -70,71 +115,85 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
&mut self,
uid: NodeUid,
message: &AgreementMessage,
) -> Result<(Option<bool>, VecDeque<AgreementMessage>), Error> {
) -> Result<AgreementOutput, Error> {
match *message {
// The algorithm instance has already terminated.
_ if self.terminated => Err(Error::Terminated),
AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => self.on_bval(uid, b),
AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => self.on_aux(uid, b),
// Epoch does not match. Ignore the message.
_ => Ok((None, VecDeque::new())),
}
}
fn on_bval(&mut self, uid: NodeUid, b: bool) -> Result<AgreementOutput, Error> {
let mut outgoing = VecDeque::new();
match *message {
_ if self.terminated => {
// The algorithm instance has already terminated.
Err(Error::Terminated)
self.received_bval
.entry(uid)
.or_insert_with(BTreeSet::new)
.insert(b);
let count_bval = self.received_bval
.values()
.filter(|values| values.contains(&b))
.count();
// upon receiving BVAL_r(b) messages from 2f + 1 nodes,
// bin_values_r := bin_values_r {b}
if count_bval == 2 * self.num_faulty_nodes + 1 {
self.bin_values.insert(b);
// wait until bin_values_r != 0, then multicast AUX_r(w)
// where w ∈ bin_values_r
if self.bin_values.len() == 1 {
// Send an AUX message at most once per epoch.
outgoing.push_back(AgreementMessage::Aux((self.epoch, b)));
// Receive the AUX message locally.
self.received_aux
.entry(self.uid.clone())
.or_insert_with(BTreeSet::new)
.insert(b);
}
AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => {
update_map_of_sets(&mut self.received_bval, uid, b);
let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| {
if values.contains(&b) {
count + 1
} else {
count
}
});
// upon receiving BVAL_r(b) messages from 2f + 1 nodes,
// bin_values_r := bin_values_r {b}
if count_bval == 2 * self.num_faulty_nodes + 1 {
self.bin_values.insert(b);
// wait until bin_values_r /= 0, then multicast AUX_r(w)
// where w ∈ bin_values_r
outgoing.push_back(AgreementMessage::Aux((self.epoch, b)));
// Receive the AUX message locally.
update_map_of_sets(&mut self.received_aux, self.uid.clone(), b);
let coin_result = self.try_coin();
if let Some(output_message) = coin_result.1 {
outgoing.push_back(output_message);
}
Ok((coin_result.0, outgoing))
}
// upon receiving BVAL_r(b) messages from f + 1 nodes, if
// BVAL_r(b) has not been sent, multicast BVAL_r(b)
else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
outgoing.push_back(AgreementMessage::BVal((self.epoch, b)));
// Receive the BVAL message locally.
update_map_of_sets(&mut self.received_bval, self.uid.clone(), b);
Ok((None, outgoing))
} else {
Ok((None, outgoing))
}
let coin_result = self.try_coin();
if let Some(output_message) = coin_result.1 {
outgoing.push_back(output_message);
}
Ok((coin_result.0, outgoing))
}
// upon receiving BVAL_r(b) messages from f + 1 nodes, if
// BVAL_r(b) has not been sent, multicast BVAL_r(b)
else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
outgoing.push_back(AgreementMessage::BVal((self.epoch, b)));
// Receive the BVAL message locally.
self.received_bval
.entry(self.uid.clone())
.or_insert_with(BTreeSet::new)
.insert(b);
Ok((None, outgoing))
} else {
Ok((None, outgoing))
}
}
AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => {
update_map_of_sets(&mut self.received_aux, uid, b);
if !self.bin_values.is_empty() {
let coin_result = self.try_coin();
if let Some(output_message) = coin_result.1 {
outgoing.push_back(output_message);
}
Ok((coin_result.0, outgoing))
} else {
Ok((None, outgoing))
}
}
fn on_aux(&mut self, uid: NodeUid, b: bool) -> Result<AgreementOutput, Error> {
let mut outgoing = VecDeque::new();
_ => {
// Epoch does not match. Ignore the message.
Ok((None, outgoing))
self.received_aux
.entry(uid)
.or_insert_with(BTreeSet::new)
.insert(b);
if !self.bin_values.is_empty() {
let coin_result = self.try_coin();
if let Some(output_message) = coin_result.1 {
outgoing.push_back(output_message);
}
Ok((coin_result.0, outgoing))
} else {
Ok((None, outgoing))
}
}
@ -143,47 +202,54 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
///
/// FIXME: Clarify whether the values of AUX messages should be the same or
/// not. It is assumed in `count_aux` that they can differ.
///
/// In general, we can't expect every good node to send the same AUX value,
/// so waiting for N - f agreeing messages would not always terminate. We
/// can, however, expect every good node to send an AUX value that will
/// eventually end up in our bin_values.
fn count_aux(&self) -> (usize, BTreeSet<bool>) {
let vals = BTreeSet::new();
(
self.received_aux.iter().fold(0, |count, (_, values)| {
if values.is_subset(&self.bin_values) {
vals.union(values);
count + 1
} else {
count
}
}),
self.received_aux
.values()
.filter(|values| values.is_subset(&self.bin_values))
.map(|values| vals.union(values))
.count(),
vals,
)
}
/// Wait until at least (N f) AUX_r messages have been received, such that
/// Waits until at least (N f) AUX_r messages have been received, such that
/// the set of values carried by these messages, vals, are a subset of
/// bin_values_r (note that bin_values_r may continue to change as BVAL_r
/// messages are received, thus this condition may be triggered upon arrival
/// of either an AUX_r or a BVAL_r message).
///
/// `try_coin` output an optional combination of the agreement value and the
/// agreement broadcast message.
/// `try_coin` outputs an optional combination of the agreement value and
/// the agreement broadcast message.
fn try_coin(&mut self) -> (Option<bool>, Option<AgreementMessage>) {
let (count_aux, vals) = self.count_aux();
if count_aux >= self.num_nodes - self.num_faulty_nodes {
if count_aux < self.num_nodes - self.num_faulty_nodes {
// Continue waiting for the (N - f) AUX messages.
(None, None)
} else {
// FIXME: Implement the Common Coin algorithm. At the moment the
// coin value is constant `true`.
let coin: u64 = 1;
let coin2 = coin % 2 != 0;
// coin value is random and local to each instance of Agreement.
let coin2 = random::<bool>();
// Check the termination condition: "continue looping until both a
// value b is output in some round r, and the value Coin_r' = b for
// some round r' > r."
self.terminated = self.terminated || self.outputs.values().any(|b| *b == coin2);
self.terminated = self.terminated || self.estimated.values().any(|b| *b == coin2);
// Prepare to start the next epoch.
self.bin_values.clear();
if vals.len() == 1 {
if vals.len() != 1 {
// Start the next epoch.
self.epoch += 1;
(None, Some(self.set_input(coin2)))
} else {
let mut message = None;
// NOTE: `vals` has exactly one element due to `vals.len() == 1`
let output: Vec<Option<bool>> = vals.into_iter()
@ -193,7 +259,7 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
if b == coin2 {
// Record the output to perform a termination check later.
self.outputs.insert(self.epoch, b);
self.estimated.insert(self.epoch, b);
// Output the agreement value.
Some(b)
} else {
@ -205,35 +271,11 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
// Start the next epoch.
self.epoch += 1;
(output[0], message)
} else {
// Start the next epoch.
self.epoch += 1;
(None, Some(self.set_input(coin2)))
}
} else {
// Continue waiting for the (N - f) AUX messages.
(None, None)
}
}
}
// Insert an element into a hash map of sets of values of type `Elt`.
fn update_map_of_sets<Key, Elt>(map: &mut HashMap<Key, BTreeSet<Elt>>, key: Key, elt: Elt)
where
Key: Eq + Hash,
Elt: Copy + Ord,
{
map.entry(key)
.and_modify(|values| {
values.insert(elt);
})
.or_insert({
let mut values = BTreeSet::new();
values.insert(elt);
values
});
}
#[derive(Clone, Debug)]
pub enum Error {
Terminated,

View File

@ -8,13 +8,11 @@ use std::fmt::{Debug, Display};
use std::hash::Hash;
use agreement;
use agreement::Agreement;
use agreement::{Agreement, AgreementMessage};
use broadcast;
use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use proto::AgreementMessage;
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
// Type of output from the Common Subset message handler.
@ -24,7 +22,7 @@ type CommonSubsetOutput<NodeUid> = (Option<HashSet<ProposedValue>>, VecDeque<Out
pub enum Input<NodeUid> {
/// Message from a remote node `uid` to the broadcast instance `uid`.
Broadcast(NodeUid, BroadcastMessage<ProposedValue>),
/// Message from a remote node `uid` to the agreement instance `uid`.
/// Message from a remote node `uid` to all agreement instances.
Agreement(NodeUid, AgreementMessage),
}
@ -45,8 +43,6 @@ pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
broadcast_instances: HashMap<NodeUid, Broadcast<NodeUid>>,
agreement_instances: HashMap<NodeUid, Agreement<NodeUid>>,
broadcast_results: HashMap<NodeUid, ProposedValue>,
/// FIXME: The result may be a set of bool rather than a single bool due to
/// the ability of Agreement to output multiple values.
agreement_results: HashMap<NodeUid, bool>,
}
@ -125,68 +121,98 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
message: Input<NodeUid>,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
match message {
Input::Broadcast(uid, bmessage) => {
let mut instance_result = None;
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {
if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) {
broadcast_instance
.handle_broadcast_message(&uid, bmessage)
.map(|(opt_value, queue)| {
instance_result = opt_value;
queue.into_iter().map(Output::Broadcast).collect()
})
.map_err(Error::from)
} else {
Err(Error::NoSuchBroadcastInstance)
}
};
let mut opt_message: Option<AgreementMessage> = None;
if let Some(value) = instance_result {
self.broadcast_results.insert(uid.clone(), value);
opt_message = self.on_broadcast_result(&uid)?;
}
input_result.map(|mut queue| {
if let Some(agreement_message) = opt_message {
// Append the message to agreement nodes to the common output queue.
queue.push_back(Output::Agreement(agreement_message))
}
(None, queue)
})
}
Input::Broadcast(uid, bmessage) => self.on_input_broadcast(&uid, bmessage),
Input::Agreement(uid, amessage) => {
// The result defaults to error.
let mut result = Err(Error::NoSuchAgreementInstance);
Input::Agreement(uid, amessage) => self.on_input_agreement(&uid, &amessage),
}
}
if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) {
// Optional output of agreement and outgoing agreement
// messages to remote nodes.
result = if agreement_instance.terminated() {
// This instance has terminated and does not accept input.
Ok((None, VecDeque::new()))
} else {
// Send the message to the agreement instance.
agreement_instance
.on_input(uid.clone(), &amessage)
.map_err(Error::from)
}
}
if let Ok((output, mut outgoing)) = result {
if let Some(b) = output {
outgoing.append(&mut self.on_agreement_result(uid, b));
}
Ok((
self.try_agreement_completion(),
outgoing.into_iter().map(Output::Agreement).collect(),
))
} else {
// error
result.map(|(_, messages)| {
(None, messages.into_iter().map(Output::Agreement).collect())
fn on_input_broadcast(
&mut self,
uid: &NodeUid,
bmessage: BroadcastMessage<ProposedValue>,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
let mut instance_result = None;
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {
if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) {
broadcast_instance
.handle_broadcast_message(&uid, bmessage)
.map(|(opt_value, queue)| {
instance_result = opt_value;
queue.into_iter().map(Output::Broadcast).collect()
})
}
.map_err(Error::from)
} else {
Err(Error::NoSuchBroadcastInstance)
}
};
let mut opt_message: Option<AgreementMessage> = None;
if let Some(value) = instance_result {
self.broadcast_results.insert(uid.clone(), value);
opt_message = self.on_broadcast_result(&uid)?;
}
input_result.map(|mut queue| {
if let Some(agreement_message) = opt_message {
// Append the message to agreement nodes to the common output queue.
queue.push_back(Output::Agreement(agreement_message))
}
(None, queue)
})
}
fn on_input_agreement(
&mut self,
uid: &NodeUid,
amessage: &AgreementMessage,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
// Send the message to all local instances of Agreement
let on_input_result: Result<
(HashMap<NodeUid, bool>, VecDeque<AgreementMessage>),
Error,
> = self.agreement_instances.iter_mut().fold(
Ok((HashMap::new(), VecDeque::new())),
|accum, (instance_uid, instance)| {
match accum {
Err(_) => accum,
Ok((mut outputs, mut outgoing)) => {
// Optional output of agreement and outgoing
// agreement messages to remote nodes.
if instance.terminated() {
// This instance has terminated and does not accept input.
Ok((outputs, outgoing))
} else {
// Send the message to the agreement instance.
instance
.on_input(uid.clone(), &amessage)
.map_err(Error::from)
.map(|(output, mut messages)| {
if let Some(b) = output {
outputs.insert(instance_uid.clone(), b);
}
outgoing.append(&mut messages);
(outputs, outgoing)
})
}
}
}
},
);
if let Ok((outputs, mut outgoing)) = on_input_result {
// Process Agreement outputs.
outputs.iter().map(|(output_uid, &output_value)| {
outgoing.append(&mut self.on_agreement_result(output_uid.clone(), output_value));
});
// Check whether Agreement has completed.
Ok((
self.try_agreement_completion(),
outgoing.into_iter().map(Output::Agreement).collect(),
))
} else {
// error
on_input_result
.map(|(_, messages)| (None, messages.into_iter().map(Output::Agreement).collect()))
}
}
@ -199,10 +225,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
if result {
self.agreement_results.insert(uid, result);
// The number of instances of BA that output 1.
let results1: usize =
self.agreement_results
.iter()
.fold(0, |count, (_, v)| if *v { count + 1 } else { count });
let results1 = self.agreement_results.values().filter(|v| **v).count();
if results1 >= self.num_nodes - self.num_faulty_nodes {
for instance in self.agreement_instances.values_mut() {
@ -220,8 +243,8 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
// the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output j∈C v_j.
if self.agreement_instances
.iter()
.all(|(_, instance)| instance.terminated())
.values()
.all(|instance| instance.terminated())
{
// All instances of Agreement that delivered `true` (or "1" in the paper).
let delivered_1: HashSet<&NodeUid> = self.agreement_results

View File

@ -10,6 +10,7 @@ extern crate crossbeam;
extern crate crossbeam_channel;
extern crate merkle;
extern crate protobuf;
extern crate rand;
extern crate reed_solomon_erasure;
extern crate ring;

View File

@ -1,6 +1,7 @@
//! Construction of messages from protobuf buffers.
pub mod message;
use agreement::AgreementMessage;
use broadcast::BroadcastMessage;
use merkle::proof::{Lemma, Positioned, Proof};
use proto::message::*;
@ -66,15 +67,6 @@ impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> {
}
}
/// Messages sent during the binary Byzantine agreement stage.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum AgreementMessage {
/// BVAL message with an epoch.
BVal((u32, bool)),
/// AUX message with an epoch.
Aux((u32, bool)),
}
impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> Message<T> {
/// Translation from protobuf to the regular type.
///
@ -173,36 +165,6 @@ impl<T: Send + Sync + AsRef<[u8]> + From<Vec<u8>>> BroadcastMessage<T> {
}
}
impl AgreementMessage {
pub fn into_proto(self) -> AgreementProto {
let mut p = AgreementProto::new();
match self {
AgreementMessage::BVal((e, b)) => {
p.set_epoch(e);
p.set_bval(b);
}
AgreementMessage::Aux((e, b)) => {
p.set_epoch(e);
p.set_aux(b);
}
}
p
}
// TODO: Re-enable lint once implemented.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn from_proto(mp: AgreementProto) -> Option<Self> {
let epoch = mp.get_epoch();
if mp.has_bval() {
Some(AgreementMessage::BVal((epoch, mp.get_bval())))
} else if mp.has_aux() {
Some(AgreementMessage::Aux((epoch, mp.get_aux())))
} else {
None
}
}
}
/// Serialisation of `Proof` defined against its protobuf interface to work
/// around the restriction of not being allowed to extend the implementation of
/// `Proof` outside its crate.