hbbft/src/honey_badger/honey_badger.rs

169 lines
5.9 KiB
Rust
Raw Normal View History

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::Arc;
use bincode;
use itertools::Itertools;
2018-07-31 13:27:22 -07:00
use rand::Rand;
use serde::{Deserialize, Serialize};
use super::epoch_state::EpochState;
2018-07-31 13:27:22 -07:00
use super::{Batch, Error, ErrorKind, HoneyBadgerBuilder, Message, MessageContent, Result};
use messaging::{self, DistAlgorithm, NetworkInfo};
use traits::{Contribution, NodeUidT};
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
#[derive(Debug)]
pub struct HoneyBadger<C, N: Rand> {
/// Shared network data.
pub(super) netinfo: Arc<NetworkInfo<N>>,
/// The earliest epoch from which we have not yet received output.
2018-07-31 13:27:22 -07:00
pub(super) epoch: u64,
/// Whether we have already submitted a proposal for the current epoch.
2018-07-31 13:27:22 -07:00
pub(super) has_input: bool,
/// The subalgorithms for ongoing epochs.
pub(super) epochs: BTreeMap<u64, EpochState<C, N>>,
/// The maximum number of `CommonSubset` instances that we run simultaneously.
2018-07-31 13:27:22 -07:00
pub(super) max_future_epochs: u64,
/// Messages for future epochs that couldn't be handled yet.
pub(super) incoming_queue: BTreeMap<u64, Vec<(N, MessageContent<N>)>>,
}
/// The `messaging::Step` type specialized to `HoneyBadger`; this is what the input and
/// message-handling methods of `HoneyBadger` return on success.
pub type Step<C, N> = messaging::Step<HoneyBadger<C, N>>;
2018-07-09 04:35:26 -07:00
impl<C, N> DistAlgorithm for HoneyBadger<C, N>
where
C: Contribution + Serialize + for<'r> Deserialize<'r>,
N: NodeUidT + Rand,
{
type NodeUid = N;
type Input = C;
type Output = Batch<C, N>;
type Message = Message<N>;
type Error = Error;
fn input(&mut self, input: Self::Input) -> Result<Step<C, N>> {
self.propose(&input)
}
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<Step<C, N>> {
2018-07-17 06:54:12 -07:00
if !self.netinfo.is_node_validator(sender_id) {
2018-05-20 04:51:33 -07:00
return Err(ErrorKind::UnknownSender.into());
}
let Message { epoch, content } = message;
if epoch > self.epoch + self.max_future_epochs {
// Postpone handling this message.
self.incoming_queue
.entry(epoch)
.or_insert_with(Vec::new)
.push((sender_id.clone(), content));
2018-07-09 04:35:26 -07:00
} else if epoch == self.epoch {
return self.handle_message_content(sender_id, epoch, content);
2018-07-09 04:35:26 -07:00
} // And ignore all messages from past epochs.
Ok(Step::default())
}
fn terminated(&self) -> bool {
false
}
fn our_id(&self) -> &N {
self.netinfo.our_uid()
}
}
impl<C, N> HoneyBadger<C, N>
where
C: Contribution + Serialize + for<'r> Deserialize<'r>,
N: NodeUidT + Rand,
{
/// Returns a new `HoneyBadgerBuilder` configured to use the node IDs and cryptographic keys
2018-06-28 14:07:11 -07:00
/// specified by `netinfo`.
pub fn builder(netinfo: Arc<NetworkInfo<N>>) -> HoneyBadgerBuilder<C, N> {
2018-06-28 14:07:11 -07:00
HoneyBadgerBuilder::new(netinfo)
}
/// Proposes a new item in the current epoch.
pub fn propose(&mut self, proposal: &C) -> Result<Step<C, N>> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
self.has_input = true;
let epoch = self.epoch;
let ser_prop =
bincode::serialize(&proposal).map_err(|err| ErrorKind::ProposeBincode(*err))?;
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop);
let mut step = match self.epochs.entry(epoch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(EpochState::new(self.netinfo.clone(), epoch)?),
}.propose(&ciphertext)?;
step.extend(self.try_output_batches()?);
Ok(step)
}
/// Returns `true` if input for the current epoch has already been provided.
pub fn has_input(&self) -> bool {
!self.netinfo.is_validator() || self.has_input
}
/// Returns the number of validators from which we have already received a proposal for the
/// current epoch.
pub(crate) fn received_proposals(&self) -> usize {
self.epochs
.get(&self.epoch)
.map_or(0, EpochState::received_proposals)
}
/// Handles a message for the given epoch.
fn handle_message_content(
&mut self,
sender_id: &N,
epoch: u64,
content: MessageContent<N>,
) -> Result<Step<C, N>> {
let mut step = match self.epochs.entry(epoch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
entry.insert(EpochState::new(self.netinfo.clone(), self.epoch)?)
}
}.handle_message_content(sender_id, content)?;
step.extend(self.try_output_batches()?);
Ok(step)
}
/// Increments the epoch number and clears any state that is local to the finished epoch.
fn update_epoch(&mut self) -> Result<Step<C, N>> {
// Clear the state of the old epoch.
self.epochs.remove(&self.epoch);
self.epoch += 1;
self.has_input = false;
let max_epoch = self.epoch + self.max_future_epochs;
let mut step = Step::default();
// TODO: Once stable, use `Iterator::flatten`.
for (sender_id, content) in
Itertools::flatten(self.incoming_queue.remove(&max_epoch).into_iter())
{
step.extend(self.handle_message_content(&sender_id, max_epoch, content)?);
}
// Handle any decryption shares received for the new epoch.
step.extend(self.try_output_batches()?);
Ok(step)
}
/// Tries to decrypt contributions from all proposers and output those in a batch.
fn try_output_batches(&mut self) -> Result<Step<C, N>> {
let mut step = Step::default();
while let Some((batch, fault_log)) = self
.epochs
.get(&self.epoch)
.and_then(EpochState::try_output_batch)
{
// Queue the output and advance the epoch.
step.output.push_back(batch);
step.fault_log.extend(fault_log);
step.extend(self.update_epoch()?);
}
Ok(step)
}
}