|
|
|
@ -33,7 +33,7 @@
|
|
|
|
|
//! In short: If _any_ correct node believes in `v`, _every_ correct node will.
|
|
|
|
|
//!
|
|
|
|
|
//! * Every correct node will eventually send exactly one `Aux`, so we will receive at least
|
|
|
|
|
//! _2 f + 1_ `Aux` messages with values we believe in. At that point, we define the set `vals`
|
|
|
|
|
//! _N - f_ `Aux` messages with values we believe in. At that point, we define the set `vals`
|
|
|
|
|
//! of _candidate values_: the set of values we believe in _and_ have received in an `Aux`.
|
|
|
|
|
//!
|
|
|
|
|
//! * Once we have the set of candidate values, we obtain a _coin value_ `s` (see below).
|
|
|
|
@ -57,7 +57,7 @@
|
|
|
|
|
//! * We multicast a `Conf` message containing our candidate values.
|
|
|
|
|
//!
|
|
|
|
|
//! * Since every good node believes in all values it puts into its `Conf` message, we will
|
|
|
|
|
//! eventually receive _2 f + 1_ `Conf` messages containing only values we believe in. Then we
|
|
|
|
|
//! eventually receive _N - f_ `Conf` messages containing only values we believe in. Then we
|
|
|
|
|
//! trigger the common coin.
|
|
|
|
|
//!
|
|
|
|
|
//! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it
|
|
|
|
@ -68,13 +68,13 @@ pub mod bin_values;
|
|
|
|
|
use rand;
|
|
|
|
|
use std::collections::{BTreeMap, BTreeSet};
|
|
|
|
|
use std::fmt::Debug;
|
|
|
|
|
use std::mem::replace;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
|
|
use itertools::Itertools;
|
|
|
|
|
|
|
|
|
|
use agreement::bin_values::BinValues;
|
|
|
|
|
use common_coin::{self, CommonCoin, CommonCoinMessage};
|
|
|
|
|
use fault_log::{Fault, FaultKind};
|
|
|
|
|
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
|
|
|
|
|
|
|
|
|
|
error_chain!{
|
|
|
|
@ -114,6 +114,14 @@ impl AgreementContent {
|
|
|
|
|
content: self,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns `true` if this message can be ignored if its epoch has already passed.
|
|
|
|
|
pub fn can_expire(&self) -> bool {
|
|
|
|
|
match *self {
|
|
|
|
|
AgreementContent::Term(_) => false,
|
|
|
|
|
_ => true,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Messages sent during the binary Byzantine agreement stage.
|
|
|
|
@ -143,13 +151,30 @@ impl rand::Rand for AgreementContent {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Possible values of the common coin schedule defining the method to derive the common coin in a
|
|
|
|
|
/// given epoch: as a constant value or a distributed computation.
|
|
|
|
|
/// The state of the current epoch's common coin. In some epochs this is fixed, in others it starts
|
|
|
|
|
/// in `InProgress`.
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
enum CoinSchedule {
|
|
|
|
|
False,
|
|
|
|
|
True,
|
|
|
|
|
Random,
|
|
|
|
|
enum CoinState<NodeUid> {
|
|
|
|
|
/// The value was fixed in the current epoch, or the coin has already terminated.
|
|
|
|
|
Decided(bool),
|
|
|
|
|
/// The coin value is not known yet.
|
|
|
|
|
InProgress(CommonCoin<NodeUid, Nonce>),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<NodeUid> CoinState<NodeUid> {
|
|
|
|
|
/// Returns the value, if this coin has already decided.
|
|
|
|
|
fn value(&self) -> Option<bool> {
|
|
|
|
|
match self {
|
|
|
|
|
CoinState::Decided(value) => Some(*value),
|
|
|
|
|
CoinState::InProgress(_) => None,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<NodeUid> From<bool> for CoinState<NodeUid> {
|
|
|
|
|
fn from(value: bool) -> Self {
|
|
|
|
|
CoinState::Decided(value)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Binary Agreement instance
|
|
|
|
@ -166,15 +191,16 @@ pub struct Agreement<NodeUid> {
|
|
|
|
|
/// Bin values. Reset on every epoch update.
|
|
|
|
|
bin_values: BinValues,
|
|
|
|
|
/// Values received in `BVal` messages. Reset on every epoch update.
|
|
|
|
|
received_bval: BTreeMap<NodeUid, BTreeSet<bool>>,
|
|
|
|
|
received_bval: BTreeMap<bool, BTreeSet<NodeUid>>,
|
|
|
|
|
/// Sent `BVal` values. Reset on every epoch update.
|
|
|
|
|
sent_bval: BTreeSet<bool>,
|
|
|
|
|
/// Values received in `Aux` messages. Reset on every epoch update.
|
|
|
|
|
received_aux: BTreeMap<NodeUid, bool>,
|
|
|
|
|
received_aux: BTreeMap<bool, BTreeSet<NodeUid>>,
|
|
|
|
|
/// Received `Conf` messages. Reset on every epoch update.
|
|
|
|
|
received_conf: BTreeMap<NodeUid, BinValues>,
|
|
|
|
|
/// Received `Term` messages. Kept throughout epoch updates.
|
|
|
|
|
received_term: BTreeMap<NodeUid, bool>,
|
|
|
|
|
/// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and
|
|
|
|
|
/// `Conf` messages for all future epochs.
|
|
|
|
|
received_term: BTreeMap<bool, BTreeSet<NodeUid>>,
|
|
|
|
|
/// The estimate of the decision value in the current epoch.
|
|
|
|
|
estimated: Option<bool>,
|
|
|
|
|
/// A permanent, latching copy of the output value. This copy is required because `output` can
|
|
|
|
@ -185,17 +211,11 @@ pub struct Agreement<NodeUid> {
|
|
|
|
|
decision: Option<bool>,
|
|
|
|
|
/// A cache for messages for future epochs that cannot be handled yet.
|
|
|
|
|
// TODO: Find a better solution for this; defend against spam.
|
|
|
|
|
incoming_queue: Vec<(NodeUid, AgreementMessage)>,
|
|
|
|
|
/// Termination flag. Once the instance determines that all the remote nodes have reached
|
|
|
|
|
/// agreement or have the necessary information to reach agreement, it sets the `terminated`
|
|
|
|
|
/// flag and accepts no more incoming messages.
|
|
|
|
|
terminated: bool,
|
|
|
|
|
/// Whether the `Conf` message round has started in the current epoch.
|
|
|
|
|
conf_round: bool,
|
|
|
|
|
/// A common coin instance. It is reset on epoch update.
|
|
|
|
|
common_coin: CommonCoin<NodeUid, Nonce>,
|
|
|
|
|
/// Common coin schedule computed at the start of each epoch.
|
|
|
|
|
coin_schedule: CoinSchedule,
|
|
|
|
|
incoming_queue: BTreeMap<u32, Vec<(NodeUid, AgreementContent)>>,
|
|
|
|
|
/// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`.
|
|
|
|
|
conf_values: Option<BinValues>,
|
|
|
|
|
/// The state of this epoch's common coin.
|
|
|
|
|
coin_state: CoinState<NodeUid>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub type Step<NodeUid> = messaging::Step<Agreement<NodeUid>>;
|
|
|
|
@ -215,29 +235,24 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
|
|
|
|
fn handle_message(
|
|
|
|
|
&mut self,
|
|
|
|
|
sender_id: &Self::NodeUid,
|
|
|
|
|
message: Self::Message,
|
|
|
|
|
AgreementMessage { epoch, content }: Self::Message,
|
|
|
|
|
) -> Result<Step<NodeUid>> {
|
|
|
|
|
if self.terminated || message.epoch < self.epoch {
|
|
|
|
|
if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) {
|
|
|
|
|
// Message is obsolete: We are already in a later epoch or terminated.
|
|
|
|
|
Ok(Step::default())
|
|
|
|
|
} else if message.epoch > self.epoch {
|
|
|
|
|
} else if epoch > self.epoch {
|
|
|
|
|
// Message is for a later epoch. We can't handle that yet.
|
|
|
|
|
self.incoming_queue.push((sender_id.clone(), message));
|
|
|
|
|
let queue = self.incoming_queue.entry(epoch).or_insert_with(Vec::new);
|
|
|
|
|
queue.push((sender_id.clone(), content));
|
|
|
|
|
Ok(Step::default())
|
|
|
|
|
} else {
|
|
|
|
|
match message.content {
|
|
|
|
|
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
|
|
|
|
|
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
|
|
|
|
|
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
|
|
|
|
AgreementContent::Term(v) => self.handle_term(sender_id, v),
|
|
|
|
|
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
|
|
|
|
}
|
|
|
|
|
self.handle_message_content(sender_id, content)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Whether the algorithm has terminated.
|
|
|
|
|
fn terminated(&self) -> bool {
|
|
|
|
|
self.terminated
|
|
|
|
|
self.decision.is_some()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn our_id(&self) -> &Self::NodeUid {
|
|
|
|
@ -251,10 +266,11 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|
|
|
|
session_id: u64,
|
|
|
|
|
proposer_id: NodeUid,
|
|
|
|
|
) -> Result<Self> {
|
|
|
|
|
let invocation_id = netinfo.invocation_id();
|
|
|
|
|
if let Some(proposer_i) = netinfo.node_index(&proposer_id) {
|
|
|
|
|
if !netinfo.is_node_validator(&proposer_id) {
|
|
|
|
|
return Err(ErrorKind::UnknownProposer.into());
|
|
|
|
|
}
|
|
|
|
|
Ok(Agreement {
|
|
|
|
|
netinfo: netinfo.clone(),
|
|
|
|
|
netinfo,
|
|
|
|
|
session_id,
|
|
|
|
|
proposer_id,
|
|
|
|
|
epoch: 0,
|
|
|
|
@ -266,18 +282,10 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|
|
|
|
received_term: BTreeMap::new(),
|
|
|
|
|
estimated: None,
|
|
|
|
|
decision: None,
|
|
|
|
|
incoming_queue: Vec::new(),
|
|
|
|
|
terminated: false,
|
|
|
|
|
conf_round: false,
|
|
|
|
|
common_coin: CommonCoin::new(
|
|
|
|
|
netinfo,
|
|
|
|
|
Nonce::new(invocation_id.as_ref(), session_id, proposer_i, 0),
|
|
|
|
|
),
|
|
|
|
|
coin_schedule: CoinSchedule::True,
|
|
|
|
|
incoming_queue: BTreeMap::new(),
|
|
|
|
|
conf_values: None,
|
|
|
|
|
coin_state: CoinState::Decided(true),
|
|
|
|
|
})
|
|
|
|
|
} else {
|
|
|
|
|
Err(ErrorKind::UnknownProposer.into())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Sets the input value for agreement.
|
|
|
|
@ -285,171 +293,105 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|
|
|
|
if self.epoch != 0 || self.estimated.is_some() {
|
|
|
|
|
return Err(ErrorKind::InputNotAccepted.into());
|
|
|
|
|
}
|
|
|
|
|
if self.netinfo.num_nodes() == 1 {
|
|
|
|
|
let mut step = self.send_bval(input)?;
|
|
|
|
|
step.extend(self.send_aux(input)?);
|
|
|
|
|
step.extend(self.decide(input));
|
|
|
|
|
Ok(step)
|
|
|
|
|
} else {
|
|
|
|
|
// Set the initial estimated value to the input value.
|
|
|
|
|
self.estimated = Some(input);
|
|
|
|
|
debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input);
|
|
|
|
|
// Record the input value as sent.
|
|
|
|
|
self.send_bval(input)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Acceptance check to be performed before setting the input value.
|
|
|
|
|
pub fn accepts_input(&self) -> bool {
|
|
|
|
|
self.epoch == 0 && self.estimated.is_none()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Dispatches the message content to the corresponding handling method.
|
|
|
|
|
fn handle_message_content(
|
|
|
|
|
&mut self,
|
|
|
|
|
sender_id: &NodeUid,
|
|
|
|
|
content: AgreementContent,
|
|
|
|
|
) -> Result<Step<NodeUid>> {
|
|
|
|
|
match content {
|
|
|
|
|
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
|
|
|
|
|
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
|
|
|
|
|
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
|
|
|
|
AgreementContent::Term(v) => self.handle_term(sender_id, v),
|
|
|
|
|
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handles a `BVal(b)` message.
|
|
|
|
|
///
|
|
|
|
|
/// Upon receiving _f + 1_ `BVal(b)`, multicast `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`,
|
|
|
|
|
/// update `bin_values`. When `bin_values` gets its first entry, multicast `Aux(b)`. If the
|
|
|
|
|
/// condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.)
|
|
|
|
|
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
|
|
|
|
self.received_bval
|
|
|
|
|
.entry(sender_id.clone())
|
|
|
|
|
.or_insert_with(BTreeSet::new)
|
|
|
|
|
.insert(b);
|
|
|
|
|
let count_bval = self
|
|
|
|
|
.received_bval
|
|
|
|
|
.values()
|
|
|
|
|
.filter(|values| values.contains(&b))
|
|
|
|
|
.count();
|
|
|
|
|
let count_bval = {
|
|
|
|
|
let entry = self.received_bval.entry(b).or_insert_with(BTreeSet::new);
|
|
|
|
|
if !entry.insert(sender_id.clone()) {
|
|
|
|
|
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateBVal).into());
|
|
|
|
|
}
|
|
|
|
|
entry.len()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut step = Step::default();
|
|
|
|
|
|
|
|
|
|
// upon receiving `BVal(b)` messages from 2f + 1 nodes,
|
|
|
|
|
// bin_values := bin_values ∪ {b}
|
|
|
|
|
if count_bval == 2 * self.netinfo.num_faulty() + 1 {
|
|
|
|
|
let previous_bin_values = self.bin_values;
|
|
|
|
|
let bin_values_changed = self.bin_values.insert(b);
|
|
|
|
|
self.bin_values.insert(b);
|
|
|
|
|
|
|
|
|
|
// wait until bin_values != 0, then multicast `Aux(w)`
|
|
|
|
|
// where w ∈ bin_values
|
|
|
|
|
if previous_bin_values == BinValues::None {
|
|
|
|
|
// Send an `Aux` message at most once per epoch.
|
|
|
|
|
step.extend(self.send_aux(b)?);
|
|
|
|
|
}
|
|
|
|
|
if bin_values_changed {
|
|
|
|
|
step.extend(self.on_bin_values_changed()?);
|
|
|
|
|
if self.bin_values != BinValues::Both {
|
|
|
|
|
step.extend(self.send(AgreementContent::Aux(b))?) // First entry: send `Aux(b)`.
|
|
|
|
|
} else {
|
|
|
|
|
step.extend(self.on_bval_or_aux()?); // Otherwise just check for `Conf` condition.
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if count_bval == self.netinfo.num_faulty() + 1 && !self.sent_bval.contains(&b) {
|
|
|
|
|
// upon receiving `BVal(b)` messages from f + 1 nodes, if
|
|
|
|
|
// `BVal(b)` has not been sent, multicast `BVal(b)`
|
|
|
|
|
if count_bval == self.netinfo.num_faulty() + 1 {
|
|
|
|
|
step.extend(self.send_bval(b)?);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update
|
|
|
|
|
/// the epoch.
|
|
|
|
|
fn on_bin_values_changed(&mut self) -> Result<Step<NodeUid>> {
|
|
|
|
|
match self.coin_schedule {
|
|
|
|
|
CoinSchedule::False => {
|
|
|
|
|
let (aux_count, aux_vals) = self.count_aux();
|
|
|
|
|
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
|
|
|
|
self.on_coin(false, aux_vals.definite())
|
|
|
|
|
} else {
|
|
|
|
|
Ok(Step::default())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
CoinSchedule::True => {
|
|
|
|
|
let (aux_count, aux_vals) = self.count_aux();
|
|
|
|
|
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
|
|
|
|
self.on_coin(true, aux_vals.definite())
|
|
|
|
|
} else {
|
|
|
|
|
Ok(Step::default())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
CoinSchedule::Random => {
|
|
|
|
|
// If the `Conf` round has already started, a change in `bin_values` can lead to its
|
|
|
|
|
// end. Try if it has indeed finished.
|
|
|
|
|
self.try_finish_conf_round()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn send_bval(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
|
|
|
|
if !self.netinfo.is_validator() {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
// Record the value `b` as sent.
|
|
|
|
|
self.sent_bval.insert(b);
|
|
|
|
|
// Multicast `BVal`.
|
|
|
|
|
let msg = AgreementContent::BVal(b).with_epoch(self.epoch);
|
|
|
|
|
let mut step: Step<NodeUid> = Target::All.message(msg).into();
|
|
|
|
|
// Receive the `BVal` message locally.
|
|
|
|
|
let our_uid = &self.netinfo.our_uid().clone();
|
|
|
|
|
step.extend(self.handle_bval(our_uid, b)?);
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn send_conf(&mut self) -> Result<Step<NodeUid>> {
|
|
|
|
|
if self.conf_round {
|
|
|
|
|
// Only one `Conf` message is allowed in an epoch.
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Trigger the start of the `Conf` round.
|
|
|
|
|
self.conf_round = true;
|
|
|
|
|
|
|
|
|
|
if !self.netinfo.is_validator() {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let v = self.bin_values;
|
|
|
|
|
// Multicast `Conf`.
|
|
|
|
|
let msg = AgreementContent::Conf(v).with_epoch(self.epoch);
|
|
|
|
|
let mut step: Step<NodeUid> = Target::All.message(msg).into();
|
|
|
|
|
// Receive the `Conf` message locally.
|
|
|
|
|
let our_uid = &self.netinfo.our_uid().clone();
|
|
|
|
|
step.extend(self.handle_conf(our_uid, v)?);
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Waits until at least (N − f) `Aux` messages have been received, such that
|
|
|
|
|
/// the set of values carried by these messages, vals, are a subset of
|
|
|
|
|
/// bin_values (note that bin_values_r may continue to change as `BVal`
|
|
|
|
|
/// messages are received, thus this condition may be triggered upon arrival
|
|
|
|
|
/// of either an `Aux` or a `BVal` message).
|
|
|
|
|
/// Handles an `Aux` message.
|
|
|
|
|
///
|
|
|
|
|
/// If the condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.)
|
|
|
|
|
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
|
|
|
|
// Perform the `Aux` message round only if a `Conf` round hasn't started yet.
|
|
|
|
|
if self.conf_round {
|
|
|
|
|
if self.conf_values.is_some() {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
self.received_aux.insert(sender_id.clone(), b);
|
|
|
|
|
if self.bin_values == BinValues::None {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
let (aux_count, aux_vals) = self.count_aux();
|
|
|
|
|
if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
|
|
|
|
// Continue waiting for the (N - f) `Aux` messages.
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
|
|
|
|
|
match self.coin_schedule {
|
|
|
|
|
CoinSchedule::False => self.on_coin(false, aux_vals.definite()),
|
|
|
|
|
CoinSchedule::True => self.on_coin(true, aux_vals.definite()),
|
|
|
|
|
CoinSchedule::Random => self.send_conf(), // Start the `Conf` message round.
|
|
|
|
|
// TODO: Detect duplicate `Aux` messages and report faults.
|
|
|
|
|
if !self
|
|
|
|
|
.received_aux
|
|
|
|
|
.entry(b)
|
|
|
|
|
.or_insert_with(BTreeSet::new)
|
|
|
|
|
.insert(sender_id.clone())
|
|
|
|
|
{
|
|
|
|
|
return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateAux).into());
|
|
|
|
|
}
|
|
|
|
|
self.on_bval_or_aux()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
|
|
|
|
|
/// been received, updates the epoch or decides.
|
|
|
|
|
fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> Result<Step<NodeUid>> {
|
|
|
|
|
self.received_conf.insert(sender_id.clone(), v);
|
|
|
|
|
self.try_finish_conf_round()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than
|
|
|
|
|
/// `num_faulty` such messages with the same value from different nodes, performs expedite
|
|
|
|
|
/// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
|
|
|
|
/// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than
|
|
|
|
|
/// _f_ such messages with the same value from different nodes, performs expedite termination:
|
|
|
|
|
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
|
|
|
|
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
|
|
|
|
self.received_term.insert(sender_id.clone(), b);
|
|
|
|
|
self.received_term
|
|
|
|
|
.entry(b)
|
|
|
|
|
.or_insert_with(BTreeSet::new)
|
|
|
|
|
.insert(sender_id.clone());
|
|
|
|
|
// Check for the expedite termination condition.
|
|
|
|
|
if self.decision.is_none()
|
|
|
|
|
&& self.received_term.iter().filter(|(_, &c)| b == c).count()
|
|
|
|
|
> self.netinfo.num_faulty()
|
|
|
|
|
{
|
|
|
|
|
if self.decision.is_some() {
|
|
|
|
|
Ok(Step::default())
|
|
|
|
|
} else if self.received_term[&b].len() > self.netinfo.num_faulty() {
|
|
|
|
|
Ok(self.decide(b))
|
|
|
|
|
} else {
|
|
|
|
|
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
|
|
|
|
@ -467,75 +409,138 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|
|
|
|
sender_id: &NodeUid,
|
|
|
|
|
msg: CommonCoinMessage,
|
|
|
|
|
) -> Result<Step<NodeUid>> {
|
|
|
|
|
let coin_step = self.common_coin.handle_message(sender_id, msg)?;
|
|
|
|
|
let coin_step = match self.coin_state {
|
|
|
|
|
CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided.
|
|
|
|
|
CoinState::InProgress(ref mut common_coin) => {
|
|
|
|
|
common_coin.handle_message(sender_id, msg)?
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
self.on_coin_step(coin_step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Checks whether there are _N - f_ `Aux` messages with values in `bin_values`. If so, starts
|
|
|
|
|
/// the `Conf` round or decides.
|
|
|
|
|
fn on_bval_or_aux(&mut self) -> Result<Step<NodeUid>> {
|
|
|
|
|
if self.bin_values == BinValues::None || self.conf_values.is_some() {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
let (aux_count, aux_vals) = self.count_aux();
|
|
|
|
|
if aux_count < self.netinfo.num_correct() {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
|
|
|
|
|
match self.coin_state {
|
|
|
|
|
CoinState::Decided(_) => {
|
|
|
|
|
self.conf_values = Some(aux_vals);
|
|
|
|
|
self.try_update_epoch()
|
|
|
|
|
}
|
|
|
|
|
CoinState::InProgress(_) => self.send_conf(aux_vals), // Start the `Conf` message round.
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Multicasts a `BVal(b)` message, and handles it.
|
|
|
|
|
fn send_bval(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
|
|
|
|
// Record the value `b` as sent. If it was already there, don't send it again.
|
|
|
|
|
if !self.sent_bval.insert(b) {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
self.send(AgreementContent::BVal(b))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Multicasts a `Conf(values)` message, and handles it.
|
|
|
|
|
fn send_conf(&mut self, values: BinValues) -> Result<Step<NodeUid>> {
|
|
|
|
|
if self.conf_values.is_some() {
|
|
|
|
|
// Only one `Conf` message is allowed in an epoch.
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Trigger the start of the `Conf` round.
|
|
|
|
|
self.conf_values = Some(values);
|
|
|
|
|
|
|
|
|
|
if !self.netinfo.is_validator() {
|
|
|
|
|
return Ok(self.try_finish_conf_round()?);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.send(AgreementContent::Conf(values))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Multicasts and handles a message. Does nothing if we are only an observer.
|
|
|
|
|
fn send(&mut self, content: AgreementContent) -> Result<Step<NodeUid>> {
|
|
|
|
|
if !self.netinfo.is_validator() {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
let mut step: Step<_> = Target::All
|
|
|
|
|
.message(content.clone().with_epoch(self.epoch))
|
|
|
|
|
.into();
|
|
|
|
|
let our_uid = &self.netinfo.our_uid().clone();
|
|
|
|
|
step.extend(self.handle_message_content(our_uid, content)?);
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handles a step returned from the `CommonCoin`.
|
|
|
|
|
fn on_coin_step(
|
|
|
|
|
&mut self,
|
|
|
|
|
coin_step: common_coin::Step<NodeUid, Nonce>,
|
|
|
|
|
) -> Result<Step<NodeUid>> {
|
|
|
|
|
let mut step = Step::default();
|
|
|
|
|
let epoch = self.epoch;
|
|
|
|
|
let coin_output = step.extend_with(coin_step, |c_msg| {
|
|
|
|
|
AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch)
|
|
|
|
|
});
|
|
|
|
|
let to_msg = |c_msg| AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch);
|
|
|
|
|
let coin_output = step.extend_with(coin_step, to_msg);
|
|
|
|
|
if let Some(coin) = coin_output.into_iter().next() {
|
|
|
|
|
let def_bin_value = self.count_conf().1.definite();
|
|
|
|
|
step.extend(self.on_coin(coin, def_bin_value)?);
|
|
|
|
|
self.coin_state = coin.into();
|
|
|
|
|
step.extend(self.try_update_epoch()?);
|
|
|
|
|
}
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// When the common coin has been computed, tries to decide on an output value, updates the
|
|
|
|
|
/// `Agreement` epoch and handles queued messages for the new epoch.
|
|
|
|
|
fn on_coin(&mut self, coin: bool, def_bin_value: Option<bool>) -> Result<Step<NodeUid>> {
|
|
|
|
|
if self.terminated {
|
|
|
|
|
/// If this epoch's coin value or conf values are not known yet, does nothing, otherwise
|
|
|
|
|
/// updates the epoch or decides.
|
|
|
|
|
///
|
|
|
|
|
/// With two conf values, the next epoch's estimate is the coin value. If there is only one conf
|
|
|
|
|
/// value and that disagrees with the coin, the conf value is the next epoch's estimate. If
|
|
|
|
|
/// the unique conf value agrees with the coin, terminates and decides on that value.
|
|
|
|
|
fn try_update_epoch(&mut self) -> Result<Step<NodeUid>> {
|
|
|
|
|
if self.decision.is_some() {
|
|
|
|
|
// Avoid an infinite regression without making an Agreement step.
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
let coin = match self.coin_state.value() {
|
|
|
|
|
None => return Ok(Step::default()), // Still waiting for coin value.
|
|
|
|
|
Some(coin) => coin,
|
|
|
|
|
};
|
|
|
|
|
let def_bin_value = match self.conf_values {
|
|
|
|
|
None => return Ok(Step::default()), // Still waiting for conf value.
|
|
|
|
|
Some(ref values) => values.definite(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if self.decision.is_none() && Some(coin) == def_bin_value {
|
|
|
|
|
return Ok(self.decide(coin));
|
|
|
|
|
if Some(coin) == def_bin_value {
|
|
|
|
|
Ok(self.decide(coin))
|
|
|
|
|
} else {
|
|
|
|
|
self.update_epoch(def_bin_value.unwrap_or(coin))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let b = def_bin_value.unwrap_or(coin);
|
|
|
|
|
|
|
|
|
|
self.update_epoch();
|
|
|
|
|
|
|
|
|
|
self.estimated = Some(b);
|
|
|
|
|
let mut step = self.send_bval(b)?;
|
|
|
|
|
// Create a temporary map of received TERM messages to avoid a second mutable access to
|
|
|
|
|
// `self`.
|
|
|
|
|
let received_term = replace(&mut self.received_term, BTreeMap::new());
|
|
|
|
|
for (sender_id, b) in received_term {
|
|
|
|
|
step.extend(self.handle_term(&sender_id, b)?);
|
|
|
|
|
if self.terminated {
|
|
|
|
|
return Ok(step);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
|
|
|
|
|
for (sender_id, msg) in queued_msgs {
|
|
|
|
|
step.extend(self.handle_message(&sender_id, msg)?);
|
|
|
|
|
if self.terminated {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Computes the coin schedule for the current `Agreement` epoch.
|
|
|
|
|
fn coin_schedule(&self) -> CoinSchedule {
|
|
|
|
|
/// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined
|
|
|
|
|
/// value, or initializes a `CommonCoin` instance.
|
|
|
|
|
fn coin_state(&self) -> CoinState<NodeUid> {
|
|
|
|
|
match self.epoch % 3 {
|
|
|
|
|
0 => CoinSchedule::True,
|
|
|
|
|
1 => CoinSchedule::False,
|
|
|
|
|
_ => CoinSchedule::Random,
|
|
|
|
|
0 => CoinState::Decided(true),
|
|
|
|
|
1 => CoinState::Decided(false),
|
|
|
|
|
_ => {
|
|
|
|
|
let nonce = Nonce::new(
|
|
|
|
|
self.netinfo.invocation_id().as_ref(),
|
|
|
|
|
self.session_id,
|
|
|
|
|
self.netinfo.node_index(&self.proposer_id).unwrap(),
|
|
|
|
|
self.epoch,
|
|
|
|
|
);
|
|
|
|
|
CoinState::InProgress(CommonCoin::new(self.netinfo.clone(), nonce))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Decides on a value and broadcasts a `Term` message with that value.
|
|
|
|
|
fn decide(&mut self, b: bool) -> Step<NodeUid> {
|
|
|
|
|
if self.terminated {
|
|
|
|
|
if self.decision.is_some() {
|
|
|
|
|
return Step::default();
|
|
|
|
|
}
|
|
|
|
|
// Output the agreement value.
|
|
|
|
@ -551,94 +556,87 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|
|
|
|
b
|
|
|
|
|
);
|
|
|
|
|
if self.netinfo.is_validator() {
|
|
|
|
|
let msg = AgreementContent::Term(b).with_epoch(self.epoch);
|
|
|
|
|
let msg = AgreementContent::Term(b).with_epoch(self.epoch + 1);
|
|
|
|
|
step.messages.push_back(Target::All.message(msg));
|
|
|
|
|
self.received_term.insert(self.netinfo.our_uid().clone(), b);
|
|
|
|
|
}
|
|
|
|
|
self.terminated = true;
|
|
|
|
|
step
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin.
|
|
|
|
|
fn try_finish_conf_round(&mut self) -> Result<Step<NodeUid>> {
|
|
|
|
|
if self.conf_round
|
|
|
|
|
&& self.count_conf().0 >= self.netinfo.num_nodes() - self.netinfo.num_faulty()
|
|
|
|
|
{
|
|
|
|
|
// Invoke the common coin.
|
|
|
|
|
let coin_step = self.common_coin.input(())?;
|
|
|
|
|
self.on_coin_step(coin_step)
|
|
|
|
|
} else {
|
|
|
|
|
// Continue waiting for (N - f) `Conf` messages
|
|
|
|
|
Ok(Step::default())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn send_aux(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
|
|
|
|
if !self.netinfo.is_validator() {
|
|
|
|
|
if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() {
|
|
|
|
|
return Ok(Step::default());
|
|
|
|
|
}
|
|
|
|
|
// Multicast `Aux`.
|
|
|
|
|
let mut step: Step<NodeUid> = Target::All
|
|
|
|
|
.message(AgreementContent::Aux(b).with_epoch(self.epoch))
|
|
|
|
|
.into();
|
|
|
|
|
// Receive the `Aux` message locally.
|
|
|
|
|
let our_uid = &self.netinfo.our_uid().clone();
|
|
|
|
|
step.extend(self.handle_aux(our_uid, b)?);
|
|
|
|
|
|
|
|
|
|
// Invoke the common coin.
|
|
|
|
|
let coin_step = match self.coin_state {
|
|
|
|
|
CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided.
|
|
|
|
|
CoinState::InProgress(ref mut common_coin) => common_coin.input(())?,
|
|
|
|
|
};
|
|
|
|
|
let mut step = self.on_coin_step(coin_step)?;
|
|
|
|
|
step.extend(self.try_update_epoch()?);
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Counts the number of received `Conf` messages with values in `bin_values`.
|
|
|
|
|
fn count_conf(&self) -> usize {
|
|
|
|
|
let is_bin_val = |conf: &&BinValues| conf.is_subset(self.bin_values);
|
|
|
|
|
self.received_conf.values().filter(is_bin_val).count()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// The count of `Aux` messages such that the set of values carried by those messages is a
|
|
|
|
|
/// subset of bin_values_r. The count of matching `Term` messages from terminated nodes is also
|
|
|
|
|
/// added to the count of `Aux` messages as witnesses of the terminated nodes' decision.
|
|
|
|
|
/// subset of `bin_values`.
|
|
|
|
|
///
|
|
|
|
|
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for N -
|
|
|
|
|
/// f agreeing messages would not always terminate. We can, however, expect every good node to
|
|
|
|
|
/// send an `Aux` value that will eventually end up in our `bin_values`.
|
|
|
|
|
/// In general, we can't expect every good node to send the same `Aux` value, so waiting for
|
|
|
|
|
/// _N - f_ agreeing messages would not always terminate. We can, however, expect every good
|
|
|
|
|
/// node to send an `Aux` value that will eventually end up in our `bin_values`.
|
|
|
|
|
fn count_aux(&self) -> (usize, BinValues) {
|
|
|
|
|
let aux: BTreeMap<_, _> = self
|
|
|
|
|
.received_aux
|
|
|
|
|
.iter()
|
|
|
|
|
.filter(|(_, &b)| self.bin_values.contains(b))
|
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
|
|
let bin: BinValues = aux.values().map(|&&v| BinValues::from_bool(v)).collect();
|
|
|
|
|
(aux.len(), bin)
|
|
|
|
|
let mut values = BinValues::None;
|
|
|
|
|
let mut count = 0;
|
|
|
|
|
for b in self.bin_values {
|
|
|
|
|
let b_count = self.received_aux.get(b).map_or(0, BTreeSet::len);
|
|
|
|
|
if b_count > 0 {
|
|
|
|
|
values.insert(*b);
|
|
|
|
|
count += b_count;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
(count, values)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Counts the number of received `Conf` messages.
|
|
|
|
|
fn count_conf(&self) -> (usize, BinValues) {
|
|
|
|
|
let (vals_cnt, vals) = self
|
|
|
|
|
.received_conf
|
|
|
|
|
.values()
|
|
|
|
|
.filter(|&conf| conf.is_subset(self.bin_values))
|
|
|
|
|
.tee();
|
|
|
|
|
|
|
|
|
|
(vals_cnt.count(), vals.cloned().collect())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn update_epoch(&mut self) {
|
|
|
|
|
/// Increments the epoch, sets the new estimate and handles queued messages.
|
|
|
|
|
fn update_epoch(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
|
|
|
|
self.bin_values.clear();
|
|
|
|
|
self.received_bval.clear();
|
|
|
|
|
self.received_bval = self.received_term.clone();
|
|
|
|
|
self.sent_bval.clear();
|
|
|
|
|
self.received_aux.clear();
|
|
|
|
|
self.received_aux = self.received_term.clone();
|
|
|
|
|
self.received_conf.clear();
|
|
|
|
|
self.conf_round = false;
|
|
|
|
|
for (v, ids) in &self.received_term {
|
|
|
|
|
for id in ids {
|
|
|
|
|
self.received_conf
|
|
|
|
|
.insert(id.clone(), BinValues::from_bool(*v));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
self.conf_values = None;
|
|
|
|
|
self.epoch += 1;
|
|
|
|
|
let nonce = Nonce::new(
|
|
|
|
|
self.netinfo.invocation_id().as_ref(),
|
|
|
|
|
self.session_id,
|
|
|
|
|
self.netinfo.node_index(&self.proposer_id).unwrap(),
|
|
|
|
|
self.epoch,
|
|
|
|
|
);
|
|
|
|
|
// TODO: Don't spend time creating a `CommonCoin` instance in epochs where the common coin
|
|
|
|
|
// is known.
|
|
|
|
|
self.common_coin = CommonCoin::new(self.netinfo.clone(), nonce);
|
|
|
|
|
self.coin_schedule = self.coin_schedule();
|
|
|
|
|
self.coin_state = self.coin_state();
|
|
|
|
|
debug!(
|
|
|
|
|
"{:?} Agreement instance {:?} started epoch {}",
|
|
|
|
|
"{:?} Agreement instance {:?} started epoch {}, {} terminated",
|
|
|
|
|
self.netinfo.our_uid(),
|
|
|
|
|
self.proposer_id,
|
|
|
|
|
self.epoch
|
|
|
|
|
self.epoch,
|
|
|
|
|
self.received_conf.len(),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
self.estimated = Some(b);
|
|
|
|
|
let mut step = self.send_bval(b)?;
|
|
|
|
|
let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter());
|
|
|
|
|
for (sender_id, content) in queued_msgs {
|
|
|
|
|
step.extend(self.handle_message_content(&sender_id, content)?);
|
|
|
|
|
if self.decision.is_some() {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(step)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|