mirror of https://github.com/poanetwork/hbbft.git
Merge pull request #182 from poanetwork/afck-agreement
Extract SBV broadcast from agreement.
This commit is contained in:
commit
4b854568f6
|
@ -0,0 +1,67 @@
|
||||||
|
use std::collections::{btree_set, BTreeSet};
|
||||||
|
use std::ops::{Index, IndexMut};
|
||||||
|
|
||||||
|
/// A map from `bool` to `BTreeSet<N>`.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct BoolMultimap<N>([BTreeSet<N>; 2]);
|
||||||
|
|
||||||
|
impl<N: Ord> Default for BoolMultimap<N> {
|
||||||
|
fn default() -> Self {
|
||||||
|
BoolMultimap([BTreeSet::default(), BTreeSet::default()])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N: Ord> Index<bool> for BoolMultimap<N> {
|
||||||
|
type Output = BTreeSet<N>;
|
||||||
|
|
||||||
|
fn index(&self, index: bool) -> &BTreeSet<N> {
|
||||||
|
&self.0[if index { 1 } else { 0 }]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N: Ord> IndexMut<bool> for BoolMultimap<N> {
|
||||||
|
fn index_mut(&mut self, index: bool) -> &mut BTreeSet<N> {
|
||||||
|
&mut self.0[if index { 1 } else { 0 }]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, N: Ord> IntoIterator for &'a BoolMultimap<N> {
|
||||||
|
type Item = (bool, &'a N);
|
||||||
|
type IntoIter = Iter<'a, N>;
|
||||||
|
|
||||||
|
fn into_iter(self) -> Iter<'a, N> {
|
||||||
|
Iter::new(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Iter<'a, N: 'a> {
|
||||||
|
key: bool,
|
||||||
|
set_iter: btree_set::Iter<'a, N>,
|
||||||
|
map: &'a BoolMultimap<N>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, N: 'a + Ord> Iter<'a, N> {
|
||||||
|
fn new(map: &'a BoolMultimap<N>) -> Self {
|
||||||
|
Iter {
|
||||||
|
key: false,
|
||||||
|
set_iter: map[false].iter(),
|
||||||
|
map,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, N: 'a + Ord> Iterator for Iter<'a, N> {
|
||||||
|
type Item = (bool, &'a N);
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<(bool, &'a N)> {
|
||||||
|
if let Some(n) = self.set_iter.next() {
|
||||||
|
Some((self.key, n))
|
||||||
|
} else if self.key {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
self.key = true;
|
||||||
|
self.set_iter = self.map[true].iter();
|
||||||
|
self.next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -63,18 +63,21 @@
|
||||||
//! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it
|
//! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it
|
||||||
//! to `s`.
|
//! to `s`.
|
||||||
|
|
||||||
|
mod bool_multimap;
|
||||||
pub mod bool_set;
|
pub mod bool_set;
|
||||||
|
mod sbv_broadcast;
|
||||||
|
|
||||||
use rand;
|
use rand;
|
||||||
use std::collections::{BTreeMap, BTreeSet};
|
use std::collections::BTreeMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
|
||||||
|
use self::bool_multimap::BoolMultimap;
|
||||||
|
use self::sbv_broadcast::SbvBroadcast;
|
||||||
use agreement::bool_set::BoolSet;
|
use agreement::bool_set::BoolSet;
|
||||||
use common_coin::{self, CommonCoin, CommonCoinMessage};
|
use common_coin::{self, CommonCoin, CommonCoinMessage};
|
||||||
use fault_log::{Fault, FaultKind};
|
|
||||||
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
|
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
|
||||||
|
|
||||||
/// An agreement error.
|
/// An agreement error.
|
||||||
|
@ -95,10 +98,8 @@ pub type Result<T> = ::std::result::Result<T, Error>;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
pub enum AgreementContent {
|
pub enum AgreementContent {
|
||||||
/// `BVal` message.
|
/// Synchronized Binary Value Broadcast message.
|
||||||
BVal(bool),
|
SbvBroadcast(sbv_broadcast::Message),
|
||||||
/// `Aux` message.
|
|
||||||
Aux(bool),
|
|
||||||
/// `Conf` message.
|
/// `Conf` message.
|
||||||
Conf(BoolSet),
|
Conf(BoolSet),
|
||||||
/// `Term` message.
|
/// `Term` message.
|
||||||
|
@ -137,13 +138,10 @@ pub struct AgreementMessage {
|
||||||
// with no replacement in sight.
|
// with no replacement in sight.
|
||||||
impl rand::Rand for AgreementContent {
|
impl rand::Rand for AgreementContent {
|
||||||
fn rand<R: rand::Rng>(rng: &mut R) -> Self {
|
fn rand<R: rand::Rng>(rng: &mut R) -> Self {
|
||||||
let message_type = *rng
|
let message_type = *rng.choose(&["sbvb", "conf", "term", "coin"]).unwrap();
|
||||||
.choose(&["bval", "aux", "conf", "term", "coin"])
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
match message_type {
|
match message_type {
|
||||||
"bval" => AgreementContent::BVal(rand::random()),
|
"sbvb" => AgreementContent::SbvBroadcast(rand::random()),
|
||||||
"aux" => AgreementContent::Aux(rand::random()),
|
|
||||||
"conf" => AgreementContent::Conf(rand::random()),
|
"conf" => AgreementContent::Conf(rand::random()),
|
||||||
"term" => AgreementContent::Term(rand::random()),
|
"term" => AgreementContent::Term(rand::random()),
|
||||||
"coin" => AgreementContent::Coin(Box::new(rand::random())),
|
"coin" => AgreementContent::Coin(Box::new(rand::random())),
|
||||||
|
@ -189,19 +187,13 @@ pub struct Agreement<NodeUid> {
|
||||||
proposer_id: NodeUid,
|
proposer_id: NodeUid,
|
||||||
/// Agreement algorithm epoch.
|
/// Agreement algorithm epoch.
|
||||||
epoch: u32,
|
epoch: u32,
|
||||||
/// Bin values. Reset on every epoch update.
|
/// This epoch's Synchronized Binary Value Broadcast instance.
|
||||||
bin_values: BoolSet,
|
sbv_broadcast: SbvBroadcast<NodeUid>,
|
||||||
/// Values received in `BVal` messages. Reset on every epoch update.
|
|
||||||
received_bval: BTreeMap<bool, BTreeSet<NodeUid>>,
|
|
||||||
/// Sent `BVal` values. Reset on every epoch update.
|
|
||||||
sent_bval: BoolSet,
|
|
||||||
/// Values received in `Aux` messages. Reset on every epoch update.
|
|
||||||
received_aux: BTreeMap<bool, BTreeSet<NodeUid>>,
|
|
||||||
/// Received `Conf` messages. Reset on every epoch update.
|
/// Received `Conf` messages. Reset on every epoch update.
|
||||||
received_conf: BTreeMap<NodeUid, BoolSet>,
|
received_conf: BTreeMap<NodeUid, BoolSet>,
|
||||||
/// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and
|
/// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and
|
||||||
/// `Conf` messages for all future epochs.
|
/// `Conf` messages for all future epochs.
|
||||||
received_term: BTreeMap<bool, BTreeSet<NodeUid>>,
|
received_term: BoolMultimap<NodeUid>,
|
||||||
/// The estimate of the decision value in the current epoch.
|
/// The estimate of the decision value in the current epoch.
|
||||||
estimated: Option<bool>,
|
estimated: Option<bool>,
|
||||||
/// A permanent, latching copy of the output value. This copy is required because `output` can
|
/// A permanent, latching copy of the output value. This copy is required because `output` can
|
||||||
|
@ -271,16 +263,13 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
return Err(Error::UnknownProposer);
|
return Err(Error::UnknownProposer);
|
||||||
}
|
}
|
||||||
Ok(Agreement {
|
Ok(Agreement {
|
||||||
netinfo,
|
netinfo: netinfo.clone(),
|
||||||
session_id,
|
session_id,
|
||||||
proposer_id,
|
proposer_id,
|
||||||
epoch: 0,
|
epoch: 0,
|
||||||
bin_values: bool_set::NONE,
|
sbv_broadcast: SbvBroadcast::new(netinfo),
|
||||||
received_bval: BTreeMap::new(),
|
|
||||||
sent_bval: bool_set::NONE,
|
|
||||||
received_aux: BTreeMap::new(),
|
|
||||||
received_conf: BTreeMap::new(),
|
received_conf: BTreeMap::new(),
|
||||||
received_term: BTreeMap::new(),
|
received_term: BoolMultimap::default(),
|
||||||
estimated: None,
|
estimated: None,
|
||||||
decision: None,
|
decision: None,
|
||||||
incoming_queue: BTreeMap::new(),
|
incoming_queue: BTreeMap::new(),
|
||||||
|
@ -297,8 +286,8 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
// Set the initial estimated value to the input value.
|
// Set the initial estimated value to the input value.
|
||||||
self.estimated = Some(input);
|
self.estimated = Some(input);
|
||||||
debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input);
|
debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input);
|
||||||
// Record the input value as sent.
|
let sbvb_step = self.sbv_broadcast.input(input)?;
|
||||||
self.send_bval(input)
|
self.handle_sbvb_step(sbvb_step)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Acceptance check to be performed before setting the input value.
|
/// Acceptance check to be performed before setting the input value.
|
||||||
|
@ -313,65 +302,50 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
content: AgreementContent,
|
content: AgreementContent,
|
||||||
) -> Result<Step<NodeUid>> {
|
) -> Result<Step<NodeUid>> {
|
||||||
match content {
|
match content {
|
||||||
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
|
AgreementContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg),
|
||||||
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
|
|
||||||
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
||||||
AgreementContent::Term(v) => self.handle_term(sender_id, v),
|
AgreementContent::Term(v) => self.handle_term(sender_id, v),
|
||||||
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles a `BVal(b)` message.
|
/// Handles a Synchroniced Binary Value Broadcast message.
|
||||||
///
|
fn handle_sbv_broadcast(
|
||||||
/// Upon receiving _f + 1_ `BVal(b)`, multicast `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`,
|
&mut self,
|
||||||
/// update `bin_values`. When `bin_values` gets its first entry, multicast `Aux(b)`. If the
|
sender_id: &NodeUid,
|
||||||
/// condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.)
|
msg: sbv_broadcast::Message,
|
||||||
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
) -> Result<Step<NodeUid>> {
|
||||||
let count_bval = {
|
let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?;
|
||||||
let entry = self.received_bval.entry(b).or_insert_with(BTreeSet::new);
|
self.handle_sbvb_step(sbvb_step)
|
||||||
if !entry.insert(sender_id.clone()) {
|
|
||||||
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateBVal).into());
|
|
||||||
}
|
|
||||||
entry.len()
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut step = Step::default();
|
|
||||||
|
|
||||||
if count_bval == 2 * self.netinfo.num_faulty() + 1 {
|
|
||||||
self.bin_values.insert(b);
|
|
||||||
|
|
||||||
if self.bin_values != bool_set::BOTH {
|
|
||||||
step.extend(self.send(AgreementContent::Aux(b))?) // First entry: send `Aux(b)`.
|
|
||||||
} else {
|
|
||||||
step.extend(self.on_bval_or_aux()?); // Otherwise just check for `Conf` condition.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if count_bval == self.netinfo.num_faulty() + 1 {
|
|
||||||
step.extend(self.send_bval(b)?);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(step)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles an `Aux` message.
|
/// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or
|
||||||
///
|
/// decides.
|
||||||
/// If the condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.)
|
fn handle_sbvb_step(
|
||||||
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
&mut self,
|
||||||
// Perform the `Aux` message round only if a `Conf` round hasn't started yet.
|
sbvb_step: sbv_broadcast::Step<NodeUid>,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
let mut step = Step::default();
|
||||||
|
let output = step.extend_with(sbvb_step, |msg| {
|
||||||
|
AgreementContent::SbvBroadcast(msg).with_epoch(self.epoch)
|
||||||
|
});
|
||||||
if self.conf_values.is_some() {
|
if self.conf_values.is_some() {
|
||||||
return Ok(Step::default());
|
return Ok(step); // The `Conf` round has already started.
|
||||||
}
|
}
|
||||||
// TODO: Detect duplicate `Aux` messages and report faults.
|
if let Some(aux_vals) = output.into_iter().next() {
|
||||||
if !self
|
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
|
||||||
.received_aux
|
match self.coin_state {
|
||||||
.entry(b)
|
CoinState::Decided(_) => {
|
||||||
.or_insert_with(BTreeSet::new)
|
self.conf_values = Some(aux_vals);
|
||||||
.insert(sender_id.clone())
|
step.extend(self.try_update_epoch()?)
|
||||||
{
|
}
|
||||||
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateAux).into());
|
CoinState::InProgress(_) => {
|
||||||
|
// Start the `Conf` message round.
|
||||||
|
step.extend(self.send_conf(aux_vals)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.on_bval_or_aux()
|
Ok(step)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
|
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
|
||||||
|
@ -385,19 +359,17 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
/// _f_ such messages with the same value from different nodes, performs expedite termination:
|
/// _f_ such messages with the same value from different nodes, performs expedite termination:
|
||||||
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
||||||
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
||||||
self.received_term
|
self.received_term[b].insert(sender_id.clone());
|
||||||
.entry(b)
|
|
||||||
.or_insert_with(BTreeSet::new)
|
|
||||||
.insert(sender_id.clone());
|
|
||||||
// Check for the expedite termination condition.
|
// Check for the expedite termination condition.
|
||||||
if self.decision.is_some() {
|
if self.decision.is_some() {
|
||||||
Ok(Step::default())
|
Ok(Step::default())
|
||||||
} else if self.received_term[&b].len() > self.netinfo.num_faulty() {
|
} else if self.received_term[b].len() > self.netinfo.num_faulty() {
|
||||||
Ok(self.decide(b))
|
Ok(self.decide(b))
|
||||||
} else {
|
} else {
|
||||||
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
|
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
|
||||||
let mut step = self.handle_bval(sender_id, b)?;
|
let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?;
|
||||||
step.extend(self.handle_aux(sender_id, b)?);
|
sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?);
|
||||||
|
let mut step = self.handle_sbvb_step(sbvb_step)?;
|
||||||
step.extend(self.handle_conf(sender_id, BoolSet::from(b))?);
|
step.extend(self.handle_conf(sender_id, BoolSet::from(b))?);
|
||||||
Ok(step)
|
Ok(step)
|
||||||
}
|
}
|
||||||
|
@ -419,35 +391,6 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
self.on_coin_step(coin_step)
|
self.on_coin_step(coin_step)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks whether there are _N - f_ `Aux` messages with values in `bin_values`. If so, starts
|
|
||||||
/// the `Conf` round or decides.
|
|
||||||
fn on_bval_or_aux(&mut self) -> Result<Step<NodeUid>> {
|
|
||||||
if self.bin_values == bool_set::NONE || self.conf_values.is_some() {
|
|
||||||
return Ok(Step::default());
|
|
||||||
}
|
|
||||||
let (aux_count, aux_vals) = self.count_aux();
|
|
||||||
if aux_count < self.netinfo.num_correct() {
|
|
||||||
return Ok(Step::default());
|
|
||||||
}
|
|
||||||
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
|
|
||||||
match self.coin_state {
|
|
||||||
CoinState::Decided(_) => {
|
|
||||||
self.conf_values = Some(aux_vals);
|
|
||||||
self.try_update_epoch()
|
|
||||||
}
|
|
||||||
CoinState::InProgress(_) => self.send_conf(aux_vals), // Start the `Conf` message round.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Multicasts a `BVal(b)` message, and handles it.
|
|
||||||
fn send_bval(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
|
||||||
// Record the value `b` as sent. If it was already there, don't send it again.
|
|
||||||
if !self.sent_bval.insert(b) {
|
|
||||||
return Ok(Step::default());
|
|
||||||
}
|
|
||||||
self.send(AgreementContent::BVal(b))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Multicasts a `Conf(values)` message, and handles it.
|
/// Multicasts a `Conf(values)` message, and handles it.
|
||||||
fn send_conf(&mut self, values: BoolSet) -> Result<Step<NodeUid>> {
|
fn send_conf(&mut self, values: BoolSet) -> Result<Step<NodeUid>> {
|
||||||
if self.conf_values.is_some() {
|
if self.conf_values.is_some() {
|
||||||
|
@ -583,40 +526,16 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
|
|
||||||
/// Counts the number of received `Conf` messages with values in `bin_values`.
|
/// Counts the number of received `Conf` messages with values in `bin_values`.
|
||||||
fn count_conf(&self) -> usize {
|
fn count_conf(&self) -> usize {
|
||||||
let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.bin_values);
|
let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values());
|
||||||
self.received_conf.values().filter(is_bin_val).count()
|
self.received_conf.values().filter(is_bin_val).count()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The count of `Aux` messages such that the set of values carried by those messages is a
|
|
||||||
/// subset of `bin_values`.
|
|
||||||
///
|
|
||||||
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for
|
|
||||||
/// _N - f_ agreeing messages would not always terminate. We can, however, expect every good
|
|
||||||
/// node to send an `Aux` value that will eventually end up in our `bin_values`.
|
|
||||||
fn count_aux(&self) -> (usize, BoolSet) {
|
|
||||||
let mut values = bool_set::NONE;
|
|
||||||
let mut count = 0;
|
|
||||||
for b in self.bin_values {
|
|
||||||
let b_count = self.received_aux.get(&b).map_or(0, BTreeSet::len);
|
|
||||||
if b_count > 0 {
|
|
||||||
values.insert(b);
|
|
||||||
count += b_count;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(count, values)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Increments the epoch, sets the new estimate and handles queued messages.
|
/// Increments the epoch, sets the new estimate and handles queued messages.
|
||||||
fn update_epoch(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
fn update_epoch(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
||||||
self.bin_values = bool_set::NONE;
|
self.sbv_broadcast.clear(&self.received_term);
|
||||||
self.received_bval = self.received_term.clone();
|
|
||||||
self.sent_bval = bool_set::NONE;
|
|
||||||
self.received_aux = self.received_term.clone();
|
|
||||||
self.received_conf.clear();
|
self.received_conf.clear();
|
||||||
for (v, ids) in &self.received_term {
|
for (v, id) in &self.received_term {
|
||||||
for id in ids {
|
self.received_conf.insert(id.clone(), BoolSet::from(v));
|
||||||
self.received_conf.insert(id.clone(), BoolSet::from(*v));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
self.conf_values = None;
|
self.conf_values = None;
|
||||||
self.epoch += 1;
|
self.epoch += 1;
|
||||||
|
@ -630,7 +549,8 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
);
|
);
|
||||||
|
|
||||||
self.estimated = Some(b);
|
self.estimated = Some(b);
|
||||||
let mut step = self.send_bval(b)?;
|
let sbvb_step = self.sbv_broadcast.input(b)?;
|
||||||
|
let mut step = self.handle_sbvb_step(sbvb_step)?;
|
||||||
let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter());
|
let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter());
|
||||||
for (sender_id, content) in queued_msgs {
|
for (sender_id, content) in queued_msgs {
|
||||||
step.extend(self.handle_message_content(&sender_id, content)?);
|
step.extend(self.handle_message_content(&sender_id, content)?);
|
||||||
|
|
|
@ -0,0 +1,208 @@
|
||||||
|
//! # Synchronized Binary Value Broadcast
|
||||||
|
//!
|
||||||
|
//! This performs the `BVal` and `Aux` steps for `Agreement`.
|
||||||
|
//!
|
||||||
|
//! Validators input binary values, and each node outputs a set of one or two binary values.
|
||||||
|
//! These outputs are not necessarily the same in each node, but it is guaranteed that whenever two
|
||||||
|
//! nodes output singletons _{v}_ and _{w}_, then _v = w_.
|
||||||
|
//!
|
||||||
|
//! It will only output once, but can continue handling messages and will keep track of the set
|
||||||
|
//! `bin_values` of values for which _2 f + 1_ `BVal`s were received.
|
||||||
|
|
||||||
|
use rand;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use super::bool_multimap::BoolMultimap;
|
||||||
|
use super::bool_set::{self, BoolSet};
|
||||||
|
use super::{Error, Result};
|
||||||
|
use fault_log::{Fault, FaultKind};
|
||||||
|
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
|
||||||
|
|
||||||
|
pub type Step<NodeUid> = messaging::Step<SbvBroadcast<NodeUid>>;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
|
pub enum Message {
|
||||||
|
BVal(bool),
|
||||||
|
Aux(bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: Extending rand_derive to correctly generate random values from boxes would make this
|
||||||
|
// implementation obsolete; however at the time of this writing, `rand::Rand` is already deprecated
|
||||||
|
// with no replacement in sight.
|
||||||
|
impl rand::Rand for Message {
|
||||||
|
fn rand<R: rand::Rng>(rng: &mut R) -> Self {
|
||||||
|
let message_type = *rng.choose(&["bval", "aux"]).unwrap();
|
||||||
|
|
||||||
|
match message_type {
|
||||||
|
"bval" => Message::BVal(rand::random()),
|
||||||
|
"aux" => Message::Aux(rand::random()),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SbvBroadcast<NodeUid> {
|
||||||
|
/// Shared network information.
|
||||||
|
netinfo: Arc<NetworkInfo<NodeUid>>,
|
||||||
|
/// The set of values for which _2 f + 1_ `BVal`s have been received.
|
||||||
|
bin_values: BoolSet,
|
||||||
|
/// The nodes that sent us a `BVal(b)`, by `b`.
|
||||||
|
received_bval: BoolMultimap<NodeUid>,
|
||||||
|
/// The values `b` for which we already sent `BVal(b)`.
|
||||||
|
sent_bval: BoolSet,
|
||||||
|
/// The nodes that sent us an `Aux(b)`, by `b`.
|
||||||
|
received_aux: BoolMultimap<NodeUid>,
|
||||||
|
/// Whether we have already output.
|
||||||
|
terminated: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for SbvBroadcast<NodeUid> {
|
||||||
|
type NodeUid = NodeUid;
|
||||||
|
type Input = bool;
|
||||||
|
type Output = BoolSet;
|
||||||
|
type Message = Message;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn input(&mut self, input: Self::Input) -> Result<Step<NodeUid>> {
|
||||||
|
self.send_bval(input)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_message(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &Self::NodeUid,
|
||||||
|
msg: Self::Message,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
match msg {
|
||||||
|
Message::BVal(b) => self.handle_bval(sender_id, b),
|
||||||
|
Message::Aux(b) => self.handle_aux(sender_id, b),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn terminated(&self) -> bool {
|
||||||
|
self.terminated
|
||||||
|
}
|
||||||
|
|
||||||
|
fn our_id(&self) -> &Self::NodeUid {
|
||||||
|
self.netinfo.our_uid()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NodeUid: Clone + Debug + Ord> SbvBroadcast<NodeUid> {
|
||||||
|
pub fn new(netinfo: Arc<NetworkInfo<NodeUid>>) -> Self {
|
||||||
|
SbvBroadcast {
|
||||||
|
netinfo,
|
||||||
|
bin_values: bool_set::NONE,
|
||||||
|
received_bval: BoolMultimap::default(),
|
||||||
|
sent_bval: bool_set::NONE,
|
||||||
|
received_aux: BoolMultimap::default(),
|
||||||
|
terminated: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resets the algorithm, but assumes the given `init` values have already been received as
|
||||||
|
/// both `BVal` and `Aux` messages.
|
||||||
|
pub fn clear(&mut self, init: &BoolMultimap<NodeUid>) {
|
||||||
|
self.bin_values = bool_set::NONE;
|
||||||
|
self.received_bval = init.clone();
|
||||||
|
self.sent_bval = bool_set::NONE;
|
||||||
|
self.received_aux = init.clone();
|
||||||
|
self.terminated = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a `BVal(b)` message.
|
||||||
|
///
|
||||||
|
/// Upon receiving _f + 1_ `BVal(b)`, multicasts `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`,
|
||||||
|
/// updates `bin_values`. When `bin_values` gets its first entry, multicasts `Aux(b)`.
|
||||||
|
pub fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
||||||
|
let count_bval = {
|
||||||
|
if !self.received_bval[b].insert(sender_id.clone()) {
|
||||||
|
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateBVal).into());
|
||||||
|
}
|
||||||
|
self.received_bval[b].len()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut step = Step::default();
|
||||||
|
|
||||||
|
if count_bval == 2 * self.netinfo.num_faulty() + 1 {
|
||||||
|
self.bin_values.insert(b);
|
||||||
|
|
||||||
|
if self.bin_values != bool_set::BOTH {
|
||||||
|
step.extend(self.send(Message::Aux(b))?) // First entry: send `Aux(b)`.
|
||||||
|
} else {
|
||||||
|
step.extend(self.try_output()?); // Otherwise just check for `Conf` condition.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if count_bval == self.netinfo.num_faulty() + 1 {
|
||||||
|
step.extend(self.send_bval(b)?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the current `bin_values`: the set of `b` for which _2 f + 1_ `BVal`s were received.
|
||||||
|
pub fn bin_values(&self) -> BoolSet {
|
||||||
|
self.bin_values
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multicasts and handles a message. Does nothing if we are only an observer.
|
||||||
|
fn send(&mut self, msg: Message) -> Result<Step<NodeUid>> {
|
||||||
|
if !self.netinfo.is_validator() {
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
let mut step: Step<_> = Target::All.message(msg.clone()).into();
|
||||||
|
let our_uid = &self.netinfo.our_uid().clone();
|
||||||
|
step.extend(self.handle_message(our_uid, msg)?);
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multicasts a `BVal(b)` message, and handles it.
|
||||||
|
fn send_bval(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
||||||
|
// Record the value `b` as sent. If it was already there, don't send it again.
|
||||||
|
if !self.sent_bval.insert(b) {
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
self.send(Message::BVal(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles an `Aux` message.
|
||||||
|
pub fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
||||||
|
if !self.received_aux[b].insert(sender_id.clone()) {
|
||||||
|
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateAux).into());
|
||||||
|
}
|
||||||
|
self.try_output()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks whether there are _N - f_ `Aux` messages with values in `bin_values`, and outputs.
|
||||||
|
fn try_output(&mut self) -> Result<Step<NodeUid>> {
|
||||||
|
if self.terminated || self.bin_values == bool_set::NONE {
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
let (aux_count, aux_vals) = self.count_aux();
|
||||||
|
if aux_count < self.netinfo.num_correct() {
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
self.terminated = true;
|
||||||
|
Ok(Step::default().with_output(aux_vals))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The count of `Aux` messages such that the set of values carried by those messages is a
|
||||||
|
/// subset of `bin_values`.
|
||||||
|
///
|
||||||
|
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for
|
||||||
|
/// _N - f_ agreeing messages would not always terminate. We can, however, expect every good
|
||||||
|
/// node to send an `Aux` value that will eventually end up in our `bin_values`.
|
||||||
|
fn count_aux(&self) -> (usize, BoolSet) {
|
||||||
|
let mut values = bool_set::NONE;
|
||||||
|
let mut count = 0;
|
||||||
|
for b in self.bin_values {
|
||||||
|
if !self.received_aux[b].is_empty() {
|
||||||
|
values.insert(b);
|
||||||
|
count += self.received_aux[b].len();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(count, values)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue