hbbft/src/binary_agreement/binary_agreement.rs

539 lines
20 KiB
Rust

use std::collections::BTreeMap;
use std::sync::Arc;
use std::{fmt, result};
use crate::crypto::SignatureShare;
use bincode;
use log::debug;
use rand::Rng;
use super::bool_multimap::BoolMultimap;
use super::bool_set::{self, BoolSet};
use super::sbv_broadcast::{self, Message as SbvMessage, SbvBroadcast};
use super::{Error, FaultKind, Message, MessageContent, Result, Step};
use crate::fault_log::Fault;
use crate::threshold_sign::{self, Message as TsMessage, ThresholdSign};
use crate::{ConsensusProtocol, NetworkInfo, NodeIdT, SessionIdT, Target};
/// The state of the current epoch's coin. In some epochs this is fixed, in others it starts
/// with in `InProgress`.
#[derive(Debug)]
enum CoinState<N> {
/// The value was fixed in the current epoch, or the coin has already terminated.
Decided(bool),
/// The coin value is not known yet.
InProgress(Box<ThresholdSign<N>>),
}
impl<N> CoinState<N> {
/// Returns the value, if this coin has already decided.
fn value(&self) -> Option<bool> {
match self {
CoinState::Decided(value) => Some(*value),
CoinState::InProgress(_) => None,
}
}
}
impl<N> From<bool> for CoinState<N> {
fn from(value: bool) -> Self {
CoinState::Decided(value)
}
}
/// Binary Agreeement messages received from other nodes for a particular Binary Agreement epoch.
#[derive(Debug)]
struct ReceivedMessages {
/// Received `BVal` messages.
bval: BoolSet,
/// Received `Aux` messages.
aux: BoolSet,
/// Received `Conf` message.
conf: Option<BoolSet>,
/// Received `Term` message.
term: Option<bool>,
/// Received `Coin` message, namely its `SignatureShare`.
coin: Option<SignatureShare>,
}
impl ReceivedMessages {
fn new() -> Self {
ReceivedMessages {
bval: bool_set::NONE,
aux: bool_set::NONE,
conf: None,
term: None,
coin: None,
}
}
/// Inserts new message content if it is accepted or returns a `FaultKind` indicating an issue
/// that prevented insertion of that new content.
fn insert(&mut self, content: MessageContent) -> Option<FaultKind> {
match content {
MessageContent::SbvBroadcast(sbv) => match sbv {
sbv_broadcast::Message::BVal(b) => {
if !self.bval.insert(b) {
return Some(FaultKind::DuplicateBVal);
}
}
sbv_broadcast::Message::Aux(b) => {
if !self.aux.insert(b) {
return Some(FaultKind::DuplicateAux);
}
}
},
MessageContent::Conf(bs) => {
if self.conf.is_none() {
self.conf = Some(bs);
} else {
return Some(FaultKind::MultipleConf);
}
}
MessageContent::Term(b) => {
if self.term.is_none() {
self.term = Some(b);
} else {
return Some(FaultKind::MultipleTerm);
}
}
MessageContent::Coin(msg) => {
if self.coin.is_none() {
self.coin = Some(msg.0);
} else {
return Some(FaultKind::AgreementEpoch);
}
}
}
None
}
/// Creates message content from `ReceivedMessages`. That message content can then be handled.
fn messages(self) -> Vec<MessageContent> {
let ReceivedMessages {
bval,
aux,
conf,
term,
coin,
} = self;
let mut messages = Vec::new();
for b in bval {
messages.push(MessageContent::SbvBroadcast(SbvMessage::BVal(b)));
}
for b in aux {
messages.push(MessageContent::SbvBroadcast(SbvMessage::Aux(b)));
}
if let Some(bs) = conf {
messages.push(MessageContent::Conf(bs));
}
if let Some(b) = term {
messages.push(MessageContent::Term(b));
}
if let Some(ss) = coin {
messages.push(MessageContent::Coin(Box::new(TsMessage(ss))));
}
messages
}
}
/// Binary Agreement instance
#[derive(Debug)]
pub struct BinaryAgreement<N, S> {
/// Shared network information.
netinfo: Arc<NetworkInfo<N>>,
/// Session identifier, to prevent replaying messages in other instances.
session_id: S,
/// Binary Agreement algorithm epoch.
epoch: u64,
/// Maximum number of future epochs for which incoming messages are accepted.
max_future_epochs: u64,
/// This epoch's Synchronized Binary Value Broadcast instance.
sbv_broadcast: SbvBroadcast<N>,
/// Received `Conf` messages. Reset on every epoch update.
received_conf: BTreeMap<N, BoolSet>,
/// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and
/// `Conf` messages for all future epochs.
received_term: BoolMultimap<N>,
/// The estimate of the decision value in the current epoch.
estimated: Option<bool>,
/// A permanent, latching copy of the output value. This copy is required because `output` can
/// be consumed using `ConsensusProtocol::next_output` immediately after the instance finishing to
/// handle a message, in which case it would otherwise be unknown whether the output value was
/// ever there at all. While the output value will still be required in a later epoch to decide
/// the termination state.
decision: Option<bool>,
/// A cache for messages for future epochs that cannot be handled yet.
incoming_queue: BTreeMap<u64, BTreeMap<N, ReceivedMessages>>,
/// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`.
conf_values: Option<BoolSet>,
/// The state of this epoch's coin.
coin_state: CoinState<N>,
}
impl<N: NodeIdT, S: SessionIdT> ConsensusProtocol for BinaryAgreement<N, S> {
type NodeId = N;
type Input = bool;
type Output = bool;
type Message = Message;
type Error = Error;
type FaultKind = FaultKind;
fn handle_input<R: Rng>(&mut self, input: Self::Input, _rng: &mut R) -> Result<Step<N>> {
self.propose(input)
}
/// Receive input from a remote node.
fn handle_message<R: Rng>(
&mut self,
sender_id: &Self::NodeId,
message: Message,
_rng: &mut R,
) -> Result<Step<N>> {
self.handle_message(sender_id, message)
}
/// Whether the algorithm has terminated.
fn terminated(&self) -> bool {
self.decision.is_some()
}
fn our_id(&self) -> &Self::NodeId {
self.netinfo.our_id()
}
}
impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
/// Creates a new `BinaryAgreement` instance with the given session identifier, to prevent
/// replaying messages in other instances.
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: S) -> Result<Self> {
Ok(BinaryAgreement {
netinfo: netinfo.clone(),
session_id,
epoch: 0,
max_future_epochs: 1000,
sbv_broadcast: SbvBroadcast::new(netinfo),
received_conf: BTreeMap::new(),
received_term: BoolMultimap::default(),
estimated: None,
decision: None,
incoming_queue: BTreeMap::new(),
conf_values: None,
coin_state: CoinState::Decided(true),
})
}
/// Proposes a boolean value for Binary Agreement.
///
/// If more than two thirds of validators propose the same value, that will eventually be
/// output. Otherwise either output is possible.
///
/// Note that if `can_propose` returns `false`, it is already too late to affect the outcome.
pub fn propose(&mut self, input: bool) -> Result<Step<N>> {
if !self.can_propose() {
return Ok(Step::default());
}
// Set the initial estimated value to the input value.
self.estimated = Some(input);
let sbvb_step = self.sbv_broadcast.send_bval(input)?;
self.handle_sbvb_step(sbvb_step)
}
/// Handles a message received from `sender_id`.
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, msg: Message) -> Result<Step<N>> {
let Message { epoch, content } = msg;
if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) {
// Message is obsolete: We are already in a later epoch or terminated.
Ok(Step::default())
} else if epoch > self.epoch + self.max_future_epochs {
Ok(Fault::new(sender_id.clone(), FaultKind::AgreementEpoch).into())
} else if epoch > self.epoch {
// Message is for a later epoch. We can't handle that yet.
let epoch_state = self
.incoming_queue
.entry(epoch)
.or_insert_with(BTreeMap::new);
let received = epoch_state
.entry(sender_id.clone())
.or_insert_with(ReceivedMessages::new);
Ok(received
.insert(content)
.map_or_else(Step::default, |fault| {
Fault::new(sender_id.clone(), fault).into()
}))
} else {
self.handle_message_content(sender_id, content)
}
}
/// Whether we can still input a value. It is not an error to input if this returns `false`,
/// but it will have no effect on the outcome.
pub fn can_propose(&self) -> bool {
self.epoch == 0 && self.estimated.is_none()
}
/// Dispatches the message content to the corresponding handling method.
fn handle_message_content(
&mut self,
sender_id: &N,
content: MessageContent,
) -> Result<Step<N>> {
match content {
MessageContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, &msg),
MessageContent::Conf(v) => self.handle_conf(sender_id, v),
MessageContent::Term(v) => self.handle_term(sender_id, v),
MessageContent::Coin(msg) => self.handle_coin(sender_id, *msg),
}
}
/// Handles a Synchroniced Binary Value Broadcast message.
fn handle_sbv_broadcast(
&mut self,
sender_id: &N,
msg: &sbv_broadcast::Message,
) -> Result<Step<N>> {
let sbvb_step = self.sbv_broadcast.handle_message(sender_id, &msg)?;
self.handle_sbvb_step(sbvb_step)
}
/// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or
/// decides.
fn handle_sbvb_step(&mut self, sbvb_step: sbv_broadcast::Step<N>) -> Result<Step<N>> {
let mut step = Step::default();
let output = step.extend_with(
sbvb_step,
|fault| fault,
|msg| MessageContent::SbvBroadcast(msg).with_epoch(self.epoch),
);
if self.conf_values.is_some() {
return Ok(step); // The `Conf` round has already started.
}
if let Some(aux_vals) = output.into_iter().next() {
// Execute the Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
match self.coin_state {
CoinState::Decided(_) => {
self.conf_values = Some(aux_vals);
step.extend(self.try_update_epoch()?)
}
CoinState::InProgress(_) => {
// Start the `Conf` message round.
step.extend(self.send_conf(aux_vals)?)
}
}
}
Ok(step)
}
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
/// been received, updates the epoch or decides.
fn handle_conf(&mut self, sender_id: &N, v: BoolSet) -> Result<Step<N>> {
self.received_conf.insert(sender_id.clone(), v);
self.try_finish_conf_round()
}
/// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than
/// _f_ such messages with the same value from different nodes, performs expedite termination:
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
fn handle_term(&mut self, sender_id: &N, b: bool) -> Result<Step<N>> {
self.received_term[b].insert(sender_id.clone());
// Check for the expedite termination condition.
if self.decision.is_some() {
Ok(Step::default())
} else if self.received_term[b].len() > self.netinfo.num_faulty() {
Ok(self.decide(b))
} else {
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?;
sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?);
let step = self.handle_sbvb_step(sbvb_step)?;
Ok(step.join(self.handle_conf(sender_id, BoolSet::from(b))?))
}
}
/// Handles a `ThresholdSign` message. If there is output, starts the next epoch. The function
/// may output a decision value.
fn handle_coin(&mut self, sender_id: &N, msg: threshold_sign::Message) -> Result<Step<N>> {
let ts_step = match self.coin_state {
CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided.
CoinState::InProgress(ref mut ts) => ts
.handle_message(sender_id, msg)
.map_err(Error::HandleThresholdSign)?,
};
self.on_coin_step(ts_step)
}
/// Multicasts a `Conf(values)` message, and handles it.
fn send_conf(&mut self, values: BoolSet) -> Result<Step<N>> {
if self.conf_values.is_some() {
// Only one `Conf` message is allowed in an epoch.
return Ok(Step::default());
}
// Trigger the start of the `Conf` round.
self.conf_values = Some(values);
if !self.netinfo.is_validator() {
return Ok(self.try_finish_conf_round()?);
}
self.send(MessageContent::Conf(values))
}
/// Multicasts and handles a message. Does nothing if we are only an observer.
fn send(&mut self, content: MessageContent) -> Result<Step<N>> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let step: Step<N> = Target::all()
.message(content.clone().with_epoch(self.epoch))
.into();
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_message_content(our_id, content)?))
}
/// Handles a step returned from the `ThresholdSign`.
fn on_coin_step(&mut self, ts_step: threshold_sign::Step<N>) -> Result<Step<N>> {
let mut step = Step::default();
let epoch = self.epoch;
let to_msg = |c_msg| MessageContent::Coin(Box::new(c_msg)).with_epoch(epoch);
let ts_output = step.extend_with(ts_step, FaultKind::CoinFault, to_msg);
if let Some(sig) = ts_output.into_iter().next() {
// Take the parity of the signature as the coin value.
self.coin_state = sig.parity().into();
step.extend(self.try_update_epoch()?);
}
Ok(step)
}
/// If this epoch's coin value or conf values are not known yet, does nothing, otherwise
/// updates the epoch or decides.
///
/// With two conf values, the next epoch's estimate is the coin value. If there is only one conf
/// value and that disagrees with the coin, the conf value is the next epoch's estimate. If
/// the unique conf value agrees with the coin, terminates and decides on that value.
fn try_update_epoch(&mut self) -> Result<Step<N>> {
if self.decision.is_some() {
// Avoid an infinite regression without making a Binary Agreement step.
return Ok(Step::default());
}
let coin = match self.coin_state.value() {
None => return Ok(Step::default()), // Still waiting for coin value.
Some(coin) => coin,
};
let def_bin_value = match self.conf_values {
None => return Ok(Step::default()), // Still waiting for conf value.
Some(ref values) => values.definite(),
};
if Some(coin) == def_bin_value {
Ok(self.decide(coin))
} else {
self.update_epoch(def_bin_value.unwrap_or(coin))
}
}
/// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined
/// value, or initializes a `ThresholdSign` instance.
fn coin_state(&self) -> Result<CoinState<N>> {
match self.epoch % 3 {
0 => Ok(CoinState::Decided(true)),
1 => Ok(CoinState::Decided(false)),
_ => {
let coin_id = bincode::serialize(&(&self.session_id, self.epoch))?;
let mut ts = ThresholdSign::new(self.netinfo.clone());
ts.set_document(coin_id).map_err(Error::InvokeCoin)?;
Ok(CoinState::InProgress(Box::new(ts)))
}
}
}
/// Decides on a value and broadcasts a `Term` message with that value.
fn decide(&mut self, b: bool) -> Step<N> {
if self.decision.is_some() {
return Step::default();
}
// Output the Binary Agreement value.
let mut step = Step::default();
step.output.push(b);
// Latch the decided state.
self.decision = Some(b);
debug!("{}: decision: {}", self, b);
if self.netinfo.is_validator() {
let msg = MessageContent::Term(b).with_epoch(self.epoch + 1);
step.messages.push(Target::all().message(msg));
}
step
}
/// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin.
fn try_finish_conf_round(&mut self) -> Result<Step<N>> {
if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() {
return Ok(Step::default());
}
// Invoke the coin.
let ts_step = match self.coin_state {
CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided.
CoinState::InProgress(ref mut ts) => ts.sign().map_err(Error::InvokeCoin)?,
};
Ok(self.on_coin_step(ts_step)?.join(self.try_update_epoch()?))
}
/// Counts the number of received `Conf` messages with values in `bin_values`.
fn count_conf(&self) -> usize {
let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values());
self.received_conf.values().filter(is_bin_val).count()
}
/// Increments the epoch, sets the new estimate and handles queued messages.
fn update_epoch(&mut self, b: bool) -> Result<Step<N>> {
self.sbv_broadcast.clear(&self.received_term);
self.received_conf.clear();
for (v, id) in &self.received_term {
self.received_conf.insert(id.clone(), BoolSet::from(v));
}
self.conf_values = None;
self.epoch += 1;
self.coin_state = self.coin_state()?;
debug!(
"{}: epoch started, {} terminated",
self,
self.received_conf.len(),
);
self.estimated = Some(b);
let sbvb_step = self.sbv_broadcast.send_bval(b)?;
let mut step = self.handle_sbvb_step(sbvb_step)?;
let epoch = self.epoch;
let epoch_state = self.incoming_queue.remove(&epoch).into_iter().flatten();
for (sender_id, received) in epoch_state {
for m in received.messages() {
step.extend(self.handle_message_content(&sender_id, m)?);
if self.decision.is_some() || self.epoch > epoch {
return Ok(step);
}
}
}
Ok(step)
}
}
impl<N: NodeIdT, S: SessionIdT> fmt::Display for BinaryAgreement<N, S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> {
write!(
f,
"{:?} BA {} epoch {} ({})",
self.our_id(),
self.session_id,
self.epoch,
if self.netinfo.is_validator() {
"validator"
} else {
"observer"
}
)
}
}