Separate queue from Honey Badger.

This makes Honey Badger a bit simpler and a lot more flexible:
It is now unaware of transactions and basically just runs one Subset
instance per epoch, with the caller's contribution as its input.

That way, users can plug in any kind of external queue and control
throttling and prioritization themselves.
Andreas Fackler 2018-07-09 14:29:01 +02:00
parent b546c16a5e
commit c1b4381753
10 changed files with 717 additions and 376 deletions
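For orientation, here is a minimal sketch of the calling pattern after this change. The `netinfo`/`netinfo2` values, the transaction type `MyTx`, the node ID type `NodeUid`, and the names `pick_from_my_queue`, `initial_txs` and `tx` are placeholders supplied by the application; error handling is reduced to `expect`.

use std::rc::Rc;
use hbbft::honey_badger::HoneyBadger;
use hbbft::messaging::DistAlgorithm;
use hbbft::queueing_honey_badger::{Input, QueueingHoneyBadger};

// Honey Badger no longer buffers transactions itself: in each epoch the caller
// decides what to contribute, e.g. by sampling its own queue with its own
// throttling and prioritization policy.
let mut hb = HoneyBadger::<Vec<MyTx>, NodeUid>::builder(Rc::new(netinfo)).build();
let contribution: Vec<MyTx> = pick_from_my_queue(100); // hypothetical helper
hb.input(contribution).expect("propose contribution for the current epoch");

// Callers who want the old behaviour can use the new wrapper, which keeps a
// built-in queue and proposes a random sample from it in every epoch.
let mut qhb = QueueingHoneyBadger::builder(netinfo2)
    .batch_size(100)
    .build_with_transactions(initial_txs)
    .expect("instantiate QueueingHoneyBadger");
qhb.input(Input::User(tx)).expect("enqueue a transaction");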


@ -12,7 +12,6 @@ extern crate serde_derive;
extern crate signifix;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::rc::Rc;
use std::time::{Duration, Instant};
use std::{cmp, u64};
@ -25,8 +24,8 @@ use serde::Serialize;
use signifix::{metric, TryFrom};
use hbbft::crypto::SecretKeySet;
use hbbft::honey_badger::{Batch, HoneyBadger};
use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target};
use hbbft::queueing_honey_badger::{Batch, QueueingHoneyBadger};
const VERSION: &str = env!("CARGO_PKG_VERSION");
const USAGE: &str = "
@ -338,7 +337,7 @@ impl EpochInfo {
id: NodeUid,
time: Duration,
batch: &Batch<Transaction, NodeUid>,
network: &TestNetwork<HoneyBadger<Transaction, NodeUid>>,
network: &TestNetwork<QueueingHoneyBadger<Transaction, NodeUid>>,
) {
if self.nodes.contains_key(&id) {
return;
@ -370,12 +369,12 @@ impl EpochInfo {
/// Proposes `num_txs` values and expects nodes to output and order them.
fn simulate_honey_badger(
mut network: TestNetwork<HoneyBadger<Transaction, NodeUid>>,
mut network: TestNetwork<QueueingHoneyBadger<Transaction, NodeUid>>,
num_txs: usize,
) {
// Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs.
let node_busy = |node: &mut TestNode<HoneyBadger<Transaction, NodeUid>>| {
let node_busy = |node: &mut TestNode<QueueingHoneyBadger<Transaction, NodeUid>>| {
node.outputs
.iter()
.map(|&(_, ref batch)| batch.len())
@ -430,17 +429,16 @@ fn main() {
let sk_set = SecretKeySet::random(args.flag_f, &mut rand::thread_rng());
let pk_set = sk_set.public_keys();
let new_honey_badger = |id: NodeUid, all_ids: BTreeSet<NodeUid>| {
let netinfo = Rc::new(NetworkInfo::new(
let netinfo = NetworkInfo::new(
id,
all_ids,
sk_set.secret_key_share(id.0 as u64),
pk_set.clone(),
));
HoneyBadger::builder(netinfo)
);
QueueingHoneyBadger::builder(netinfo)
.batch_size(args.flag_b)
.build_with_transactions(txs.clone())
.expect("Instantiate honey_badger")
.0
};
let hw_quality = HwQuality {
latency: Duration::from_millis(args.flag_lag),


@ -1,10 +1,10 @@
//! # Dynamic Honey Badger
//!
//! Like Honey Badger, this protocol allows a network of _N_ nodes with at most _f_ faulty ones,
//! where _3 f < N_, to input "transactions" - any kind of data -, and to agree on a sequence of
//! _batches_ of transactions. The protocol proceeds in _epochs_, starting at number 0, and outputs
//! where _3 f < N_, to input "contributions" - any kind of data -, and to agree on a sequence of
//! _batches_ of contributions. The protocol proceeds in _epochs_, starting at number 0, and outputs
//! one batch in each epoch. It never terminates: It handles a continuous stream of incoming
//! transactions and keeps producing new batches from them. All correct nodes will output the same
//! contributions and keeps producing new batches from them. All correct nodes will output the same
//! batch for each epoch.
//!
//! Unlike Honey Badger, this algorithm allows dynamically adding new validators from the pool of
@ -29,14 +29,13 @@
//! ## How it works
//!
//! Dynamic Honey Badger runs a regular Honey Badger instance internally, which in addition to the
//! user's transactions contains special transactions for the change votes and the key generation
//! messages: Running votes and key generation "on-chain", as transactions, greatly simplifies
//! these processes, since it is guaranteed that every node will see the same sequence of votes and
//! messages.
//! user's contributions contains special transactions for the change votes and the key generation
//! messages: Running votes and key generation "on-chain" greatly simplifies these processes, since
//! it is guaranteed that every node will see the same sequence of votes and messages.
//!
//! Every time Honey Badger outputs a new batch, Dynamic Honey Badger outputs the user transactions
//! in its own batch. The other transactions are processed: votes are counted and key generation
//! messages are passed into a `SyncKeyGen` instance.
//! Every time Honey Badger outputs a new batch, Dynamic Honey Badger outputs the user
//! contributions in its own batch. The other transactions are processed: votes are counted and key
//! generation messages are passed into a `SyncKeyGen` instance.
//!
//! If after an epoch key generation has completed, the Honey Badger instance (including all
//! pending batches) is dropped, and replaced by a new one with the new set of participants.
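The mechanism described above can be condensed into a small, self-contained model. The names mirror the types introduced further down in this diff (`InternalContrib`, `NodeTransaction`, `SignedTransaction`), but the payloads are simplified stand-ins, so this is only a sketch of the idea, not the actual implementation.

use std::collections::BTreeMap;

// Stand-ins for illustration only.
type NodeUid = u64;
enum NodeTransaction {
    Change(String),  // a vote to add or remove a validator
    Propose(String), // a key generation `Propose` message
    Accept(String),  // a key generation `Accept` message
}

// Per-epoch contribution to the *internal* Honey Badger: the user's payload plus
// the signed internal transactions that ride along "on-chain".
struct InternalContrib<C> {
    contrib: C,
    transactions: Vec<(u64, NodeUid, NodeTransaction)>, // (epoch, signer, transaction)
}

// Processing one internal batch: the user payloads become the Dynamic Honey Badger
// batch, while votes are tallied and key generation messages go to `SyncKeyGen`.
fn process_batch<C>(hb_batch: BTreeMap<NodeUid, InternalContrib<C>>) -> BTreeMap<NodeUid, C> {
    let mut out = BTreeMap::new();
    for (id, ic) in hb_batch {
        out.insert(id, ic.contrib);
        for (_epoch, _signer, node_tx) in ic.transactions {
            match node_tx {
                NodeTransaction::Change(_vote) => { /* tally the vote for this signer */ }
                NodeTransaction::Propose(_msg) | NodeTransaction::Accept(_msg) => { /* feed into key generation */ }
            }
        }
    }
    out
}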
@ -57,8 +56,8 @@ use clear_on_drop::ClearOnDrop;
use serde::{Deserialize, Serialize};
use crypto::{PublicKey, PublicKeySet, SecretKey, Signature};
use fault_log::{FaultKind, FaultLog};
use honey_badger::{self, Batch as HbBatch, HoneyBadger, Message as HbMessage};
use fault_log::FaultLog;
use honey_badger::{self, HoneyBadger, Message as HbMessage};
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
use sync_key_gen::{Accept, Propose, ProposeOutcome, SyncKeyGen};
@ -112,30 +111,28 @@ pub enum ChangeState<NodeUid> {
/// The user input for `DynamicHoneyBadger`.
#[derive(Clone, Debug)]
pub enum Input<Tx, NodeUid> {
/// A user-defined transaction.
User(Tx),
/// A vote to change the set of nodes.
pub enum Input<C, NodeUid> {
/// A user-defined contribution for the next epoch.
User(C),
/// A vote to change the set of validators.
Change(Change<NodeUid>),
}
/// A Dynamic Honey Badger builder, to configure the parameters and create new instances of
/// `DynamicHoneyBadger`.
pub struct DynamicHoneyBadgerBuilder<Tx, NodeUid> {
pub struct DynamicHoneyBadgerBuilder<C, NodeUid> {
/// Shared network data.
netinfo: NetworkInfo<NodeUid>,
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The epoch at which to join the network.
start_epoch: u64,
/// The maximum number of future epochs for which we handle messages simultaneously.
max_future_epochs: usize,
_phantom: PhantomData<Tx>,
_phantom: PhantomData<C>,
}
impl<Tx, NodeUid> DynamicHoneyBadgerBuilder<Tx, NodeUid>
impl<C, NodeUid> DynamicHoneyBadgerBuilder<C, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash,
{
/// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic
@ -144,19 +141,12 @@ where
// TODO: Use the defaults from `HoneyBadgerBuilder`.
DynamicHoneyBadgerBuilder {
netinfo,
batch_size: 100,
start_epoch: 0,
max_future_epochs: 3,
_phantom: PhantomData,
}
}
/// Sets the target number of transactions per batch.
pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
self.batch_size = batch_size;
self
}
/// Sets the maximum number of future epochs for which we handle messages simultaneously.
pub fn max_future_epochs(&mut self, max_future_epochs: usize) -> &mut Self {
self.max_future_epochs = max_future_epochs;
@ -171,40 +161,33 @@ where
}
/// Creates a new Dynamic Honey Badger instance with an empty buffer.
pub fn build(&self) -> Result<(DynamicHoneyBadger<Tx, NodeUid>, FaultLog<NodeUid>)>
where
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
{
let (honey_badger, fault_log) = HoneyBadger::builder(Rc::new(self.netinfo.clone()))
.batch_size(self.batch_size)
pub fn build(&self) -> DynamicHoneyBadger<C, NodeUid> {
let honey_badger = HoneyBadger::builder(Rc::new(self.netinfo.clone()))
.max_future_epochs(self.max_future_epochs)
.build()?;
let dyn_hb = DynamicHoneyBadger {
.build();
DynamicHoneyBadger {
netinfo: self.netinfo.clone(),
batch_size: self.batch_size,
max_future_epochs: self.max_future_epochs,
start_epoch: self.start_epoch,
votes: BTreeMap::new(),
node_tx_buffer: Vec::new(),
honey_badger,
key_gen: None,
incoming_queue: Vec::new(),
messages: MessageQueue(VecDeque::new()),
output: VecDeque::new(),
};
Ok((dyn_hb, fault_log))
}
}
}
/// A Honey Badger instance that can handle adding and removing nodes.
pub struct DynamicHoneyBadger<Tx, NodeUid>
pub struct DynamicHoneyBadger<C, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
NodeUid: Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug,
{
/// Shared network data.
netinfo: NetworkInfo<NodeUid>,
/// The target number of transactions per batch.
batch_size: usize,
/// The maximum number of future epochs for which we handle messages simultaneously.
max_future_epochs: usize,
/// The first epoch after the latest node change.
@ -213,8 +196,10 @@ where
/// vote revokes the previous one. Resets whenever the set of validators is successfully
/// changed.
votes: BTreeMap<NodeUid, Change<NodeUid>>,
/// Pending node transactions that we will propose in the next epoch.
node_tx_buffer: Vec<SignedTransaction<NodeUid>>,
/// The `HoneyBadger` instance with the current set of nodes.
honey_badger: HoneyBadger<Transaction<Tx, NodeUid>, NodeUid>,
honey_badger: HoneyBadger<InternalContrib<C, NodeUid>, NodeUid>,
/// The current key generation process, and the change it applies to.
key_gen: Option<(SyncKeyGen<NodeUid>, Change<NodeUid>)>,
/// A queue for messages from future epochs that cannot be handled yet.
@ -222,30 +207,36 @@ where
/// The messages that need to be sent to other nodes.
messages: MessageQueue<NodeUid>,
/// The outputs from completed epochs.
output: VecDeque<Batch<Tx, NodeUid>>,
output: VecDeque<Batch<C, NodeUid>>,
}
impl<Tx, NodeUid> DistAlgorithm for DynamicHoneyBadger<Tx, NodeUid>
impl<C, NodeUid> DistAlgorithm for DynamicHoneyBadger<C, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
NodeUid: Eq + Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
{
type NodeUid = NodeUid;
type Input = Input<Tx, NodeUid>;
type Output = Batch<Tx, NodeUid>;
type Input = Input<C, NodeUid>;
type Output = Batch<C, NodeUid>;
type Message = Message<NodeUid>;
type Error = Error;
fn input(&mut self, input: Self::Input) -> Result<FaultLog<NodeUid>> {
// User transactions are forwarded to `HoneyBadger` right away. Internal messages are
// in addition signed and broadcast.
// User contributions are forwarded to `HoneyBadger` right away. Votes are signed and
// broadcast.
match input {
Input::User(tx) => {
let mut fault_log = self.honey_badger.input(Transaction::User(tx))?;
Input::User(contrib) => {
let mut fault_log = self.honey_badger.input(InternalContrib {
contrib,
transactions: self.node_tx_buffer.clone(),
})?;
self.process_output()?.merge_into(&mut fault_log);
Ok(fault_log)
}
Input::Change(change) => self.send_transaction(NodeTransaction::Change(change)),
Input::Change(change) => {
self.send_transaction(NodeTransaction::Change(change))?;
Ok(FaultLog::new())
}
}
}
@ -266,7 +257,9 @@ where
}
match message {
Message::HoneyBadger(_, hb_msg) => self.handle_honey_badger_message(sender_id, hb_msg),
Message::Signed(_, node_tx, sig) => self.handle_signed_message(sender_id, node_tx, sig),
Message::Signed(_, node_tx, sig) => {
self.handle_signed_message(sender_id, node_tx, *sig)
}
}
}
@ -298,6 +291,16 @@ where
DynamicHoneyBadgerBuilder::new(netinfo)
}
/// Returns `true` if input for the current epoch has already been provided.
pub fn has_input(&self) -> bool {
self.honey_badger.has_input()
}
/// Returns the information about the node IDs in the network, and the cryptographic keys.
pub fn netinfo(&self) -> &NetworkInfo<NodeUid> {
&self.netinfo
}
/// Handles a message for the `HoneyBadger` instance.
fn handle_honey_badger_message(
&mut self,
@ -320,13 +323,12 @@ where
&mut self,
sender_id: &NodeUid,
node_tx: NodeTransaction<NodeUid>,
sig: Box<Signature>,
sig: Signature,
) -> Result<FaultLog<NodeUid>> {
self.verify_signature(sender_id, &*sig, &node_tx)?;
let tx = Transaction::Signed(self.start_epoch, sender_id.clone(), node_tx, sig);
let mut fault_log = self.honey_badger.input(tx)?;
self.process_output()?.merge_into(&mut fault_log);
Ok(fault_log)
self.verify_signature(sender_id, &sig, &node_tx)?;
let tx = SignedTransaction(self.start_epoch, sender_id.clone(), node_tx, sig);
self.node_tx_buffer.push(tx);
self.process_output()
}
/// Processes all pending batches output by Honey Badger.
@ -338,34 +340,26 @@ where
// `hb_batch`, and the current change state.
let mut batch = Batch::new(hb_batch.epoch + self.start_epoch);
// Add the user transactions to `batch` and handle votes and DKG messages.
for (id, tx_vec) in hb_batch.transactions {
let entry = batch.transactions.entry(id);
let id_txs = entry.or_insert_with(Vec::new);
for tx in tx_vec {
match tx {
Transaction::User(tx) => id_txs.push(tx),
Transaction::Signed(epoch, s_id, node_tx, sig) => {
if epoch < self.start_epoch {
info!("Obsolete node transaction: {:?}.", node_tx);
continue;
}
if !self.verify_signature(&s_id, &sig, &node_tx)? {
info!("Invalid signature from {:?} for: {:?}.", s_id, node_tx);
let fault_kind = FaultKind::InvalidNodeTransactionSignature;
fault_log.append(s_id.clone(), fault_kind);
continue;
}
use self::NodeTransaction::*;
match node_tx {
Change(change) => self.handle_vote(s_id, change),
Propose(propose) => self
.handle_propose(&s_id, propose)?
.merge_into(&mut fault_log),
Accept(accept) => self
.handle_accept(&s_id, accept)?
.merge_into(&mut fault_log),
}
}
for (id, int_contrib) in hb_batch.contributions {
batch.contributions.insert(id, int_contrib.contrib);
for SignedTransaction(epoch, s_id, node_tx, sig) in int_contrib.transactions {
if epoch < self.start_epoch {
info!("Obsolete node transaction: {:?}.", node_tx);
continue;
}
if !self.verify_signature(&s_id, &sig, &node_tx)? {
info!("Invalid signature from {:?} for: {:?}.", s_id, node_tx);
continue;
}
use self::NodeTransaction::*;
match node_tx {
Change(change) => self.handle_vote(s_id, change),
Propose(propose) => self
.handle_propose(&s_id, propose)?
.merge_into(&mut fault_log),
Accept(accept) => self
.handle_accept(&s_id, accept)?
.merge_into(&mut fault_log),
}
}
}
@ -377,13 +371,11 @@ where
ClearOnDrop::new(Box::new(self.netinfo.secret_key().clone()))
});
// Restart Honey Badger in the next epoch, and inform the user about the change.
self.apply_change(&change, pub_key_set, sk, batch.epoch + 1)?
.merge_into(&mut fault_log);
self.apply_change(&change, pub_key_set, sk, batch.epoch + 1)?;
batch.change = ChangeState::Complete(change);
} else {
// If the majority changed, restart DKG. Inform the user about the current change.
self.update_key_gen(batch.epoch + 1)?
.merge_into(&mut fault_log);
self.update_key_gen(batch.epoch + 1)?;
if let Some((_, ref change)) = self.key_gen {
batch.change = ChangeState::InProgress(change.clone());
}
@ -410,7 +402,7 @@ where
pub_key_set: PublicKeySet,
sk: ClearOnDrop<Box<SecretKey>>,
epoch: u64,
) -> Result<FaultLog<NodeUid>> {
) -> Result<()> {
self.votes.clear();
self.key_gen = None;
let mut all_uids = self.netinfo.all_uids().clone();
@ -427,16 +419,15 @@ where
/// If the majority of votes has changed, restarts Key Generation for the set of nodes implied
/// by the current change.
fn update_key_gen(&mut self, epoch: u64) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
fn update_key_gen(&mut self, epoch: u64) -> Result<()> {
let change = match current_majority(&self.votes, &self.netinfo) {
None => {
self.key_gen = None;
return Ok(fault_log);
return Ok(());
}
Some(change) => {
if self.key_gen.as_ref().map(|&(_, ref ch)| ch) == Some(change) {
return Ok(fault_log); // The change is the same as last epoch. Continue DKG as is.
return Ok(()); // The change is the same as last epoch. Continue DKG as is.
}
change.clone()
}
@ -451,7 +442,7 @@ where
info!("{:?} No-op change: {:?}", self.our_id(), change);
}
if change.candidate().is_some() {
self.restart_honey_badger(epoch)?.merge_into(&mut fault_log);
self.restart_honey_badger(epoch)?;
}
// TODO: This needs to be the same as `num_faulty` will be in the _new_
// `NetworkInfo` if the change goes through. It would be safer to deduplicate.
@ -461,31 +452,22 @@ where
let (key_gen, propose) = SyncKeyGen::new(&our_uid, sk, pub_keys, threshold);
self.key_gen = Some((key_gen, change));
if let Some(propose) = propose {
self.send_transaction(NodeTransaction::Propose(propose))?
.merge_into(&mut fault_log);
self.send_transaction(NodeTransaction::Propose(propose))?;
}
Ok(fault_log)
Ok(())
}
/// Starts a new `HoneyBadger` instance and inputs the user transactions from the existing
/// one's buffer and pending outputs.
fn restart_honey_badger(&mut self, epoch: u64) -> Result<FaultLog<NodeUid>> {
fn restart_honey_badger(&mut self, epoch: u64) -> Result<()> {
// TODO: Filter out the messages for `epoch` and later.
self.messages
.extend_with_epoch(self.start_epoch, &mut self.honey_badger);
self.start_epoch = epoch;
let (honey_badger, fault_log) = {
let netinfo = Rc::new(self.netinfo.clone());
let old_buf = self.honey_badger.drain_buffer();
let outputs = (self.honey_badger.output_iter()).flat_map(HbBatch::into_tx_iter);
let buffer = outputs.chain(old_buf).filter(Transaction::is_user);
HoneyBadger::builder(netinfo)
.batch_size(self.batch_size)
.max_future_epochs(self.max_future_epochs)
.build_with_transactions(buffer)?
};
self.honey_badger = honey_badger;
Ok(fault_log)
self.honey_badger = HoneyBadger::builder(Rc::new(self.netinfo.clone()))
.max_future_epochs(self.max_future_epochs)
.build();
Ok(())
}
/// Handles a `Propose` message that was output by Honey Badger.
@ -499,7 +481,8 @@ where
};
match self.key_gen.as_mut().and_then(handle) {
Some(ProposeOutcome::Valid(accept)) => {
self.send_transaction(NodeTransaction::Accept(accept))
self.send_transaction(NodeTransaction::Accept(accept))?;
Ok(FaultLog::new())
}
Some(ProposeOutcome::Invalid(fault_log)) => Ok(fault_log),
None => Ok(FaultLog::new()),
@ -516,18 +499,17 @@ where
}
/// Signs and sends a `NodeTransaction` and also tries to commit it.
fn send_transaction(&mut self, node_tx: NodeTransaction<NodeUid>) -> Result<FaultLog<NodeUid>> {
fn send_transaction(&mut self, node_tx: NodeTransaction<NodeUid>) -> Result<()> {
let sig = self.sign(&node_tx)?;
let msg = Message::Signed(self.start_epoch, node_tx.clone(), sig.clone());
self.messages.push_back(Target::All.message(msg));
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
return Ok(());
}
let our_uid = self.netinfo.our_uid().clone();
let hb_tx = Transaction::Signed(self.start_epoch, our_uid, node_tx, sig);
let mut fault_log = self.honey_badger.input(hb_tx)?;
self.process_output()?.merge_into(&mut fault_log);
Ok(fault_log)
self.node_tx_buffer
.push(SignedTransaction(self.start_epoch, our_uid, node_tx, *sig));
Ok(())
}
/// If the current Key Generation process is ready, returns the generated key set.
@ -603,68 +585,86 @@ fn current_majority<'a, NodeUid: Ord + Clone + Hash + Eq>(
None
}
/// The transactions for the internal `HoneyBadger` instance: this includes both user-defined
/// "regular" transactions as well as internal signed messages.
/// The contribution for the internal `HoneyBadger` instance: this includes a user-defined
/// application-level contribution as well as internal signed messages.
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
enum Transaction<Tx, NodeUid> {
/// A user-defined transaction.
User(Tx),
/// A signed internal message that gets committed via Honey Badger to communicate synchronously.
Signed(u64, NodeUid, NodeTransaction<NodeUid>, Box<Signature>),
struct InternalContrib<C, NodeUid> {
/// A user-defined contribution.
contrib: C,
/// Signed internal messages that get committed via Honey Badger to communicate synchronously.
transactions: Vec<SignedTransaction<NodeUid>>,
}
impl<Tx, NodeUid> Transaction<Tx, NodeUid> {
/// Returns `true` if this is a user transaction.
fn is_user(&self) -> bool {
match *self {
Transaction::User(_) => true,
Transaction::Signed(_, _, _, _) => false,
}
}
}
/// A signed internal message.
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Clone)]
struct SignedTransaction<NodeUid>(u64, NodeUid, NodeTransaction<NodeUid>, Signature);
/// A batch of transactions the algorithm has output.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Batch<Tx, NodeUid> {
pub struct Batch<C, NodeUid> {
/// The sequence number: there is exactly one batch in each epoch.
pub epoch: u64,
/// The user transactions committed in this epoch.
pub transactions: BTreeMap<NodeUid, Vec<Tx>>,
/// The user contributions committed in this epoch.
pub contributions: BTreeMap<NodeUid, C>,
/// The current state of adding or removing a node: whether any is in progress, or completed
/// this epoch.
pub change: ChangeState<NodeUid>,
}
impl<Tx, NodeUid: Ord> Batch<Tx, NodeUid> {
impl<C, NodeUid: Ord> Batch<C, NodeUid> {
/// Returns a new, empty batch with the given epoch.
pub fn new(epoch: u64) -> Self {
Batch {
epoch,
transactions: BTreeMap::new(),
contributions: BTreeMap::new(),
change: ChangeState::None,
}
}
/// Returns an iterator over all transactions included in the batch.
pub fn iter(&self) -> impl Iterator<Item = &Tx> {
self.transactions.values().flat_map(|vec| vec)
}
/// Returns the number of transactions in the batch (without detecting duplicates).
pub fn len(&self) -> usize {
self.transactions.values().map(Vec::len).sum()
}
/// Returns `true` if the batch contains no transactions.
pub fn is_empty(&self) -> bool {
self.transactions.values().all(Vec::is_empty)
}
/// Returns whether any change to the set of participating nodes is in progress or was
/// completed in this epoch.
pub fn change(&self) -> &ChangeState<NodeUid> {
&self.change
}
/// Returns an iterator over references to all transactions included in the batch.
pub fn iter<'a>(&'a self) -> impl Iterator<Item = <&'a C as IntoIterator>::Item>
where
&'a C: IntoIterator,
{
self.contributions.values().flat_map(|item| item)
}
/// Returns an iterator over all transactions included in the batch. Consumes the batch.
pub fn into_tx_iter(self) -> impl Iterator<Item = <C as IntoIterator>::Item>
where
C: IntoIterator,
{
self.contributions.into_iter().flat_map(|(_, vec)| vec)
}
/// Returns the number of transactions in the batch (without detecting duplicates).
pub fn len<Tx>(&self) -> usize
where
C: AsRef<[Tx]>,
{
self.contributions
.values()
.map(C::as_ref)
.map(<[Tx]>::len)
.sum()
}
/// Returns `true` if the batch contains no transactions.
pub fn is_empty<Tx>(&self) -> bool
where
C: AsRef<[Tx]>,
{
self.contributions
.values()
.map(C::as_ref)
.all(<[Tx]>::is_empty)
}
}
/// An internal message containing a vote for adding or removing a validator, or a message for key
@ -681,8 +681,7 @@ pub enum NodeTransaction<NodeUid> {
}
/// A message sent to or received from another node's Honey Badger instance.
#[cfg_attr(feature = "serialization-serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Message<NodeUid> {
/// A message belonging to the `HoneyBadger` algorithm started in the given epoch.
HoneyBadger(u64, HbMessage<NodeUid>),


@ -1,15 +1,13 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::Not;
use std::rc::Rc;
use std::{cmp, iter, mem};
use bincode;
use itertools::Itertools;
use rand;
use serde::{Deserialize, Serialize};
use common_subset::{self, CommonSubset};
@ -36,19 +34,17 @@ error_chain!{
}
/// A Honey Badger builder, to configure the parameters and create new instances of `HoneyBadger`.
pub struct HoneyBadgerBuilder<Tx, NodeUid> {
pub struct HoneyBadgerBuilder<C, NodeUid> {
/// Shared network data.
netinfo: Rc<NetworkInfo<NodeUid>>,
/// The target number of transactions to be included in each batch.
// TODO: Do experiments and pick a suitable default.
batch_size: usize,
/// The maximum number of future epochs for which we handle messages simultaneously.
max_future_epochs: usize,
_phantom: PhantomData<Tx>,
_phantom: PhantomData<C>,
}
impl<Tx, NodeUid> HoneyBadgerBuilder<Tx, NodeUid>
impl<C, NodeUid> HoneyBadgerBuilder<C, NodeUid>
where
C: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
NodeUid: Ord + Clone + Debug,
{
/// Returns a new `HoneyBadgerBuilder` configured to use the node IDs and cryptographic keys
@ -56,82 +52,52 @@ where
pub fn new(netinfo: Rc<NetworkInfo<NodeUid>>) -> Self {
HoneyBadgerBuilder {
netinfo,
batch_size: 100,
max_future_epochs: 3,
_phantom: PhantomData,
}
}
/// Sets the target number of transactions per batch.
pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
self.batch_size = batch_size;
self
}
/// Sets the maximum number of future epochs for which we handle messages simultaneously.
pub fn max_future_epochs(&mut self, max_future_epochs: usize) -> &mut Self {
self.max_future_epochs = max_future_epochs;
self
}
/// Creates a new Honey Badger instance with an empty buffer.
pub fn build(&self) -> HoneyBadgerResult<(HoneyBadger<Tx, NodeUid>, FaultLog<NodeUid>)>
where
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
{
self.build_with_transactions(None)
}
/// Returns a new Honey Badger instance that starts with the given transactions in its buffer.
pub fn build_with_transactions<TI>(
&self,
txs: TI,
) -> HoneyBadgerResult<(HoneyBadger<Tx, NodeUid>, FaultLog<NodeUid>)>
where
TI: IntoIterator<Item = Tx>,
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
{
let mut honey_badger = HoneyBadger {
/// Creates a new Honey Badger instance.
pub fn build(&self) -> HoneyBadger<C, NodeUid> {
HoneyBadger {
netinfo: self.netinfo.clone(),
buffer: Vec::new(),
epoch: 0,
has_input: false,
common_subsets: BTreeMap::new(),
batch_size: self.batch_size,
max_future_epochs: self.max_future_epochs as u64,
messages: MessageQueue(VecDeque::new()),
output: VecDeque::new(),
incoming_queue: BTreeMap::new(),
received_shares: BTreeMap::new(),
decrypted_selections: BTreeMap::new(),
decrypted_contributions: BTreeMap::new(),
ciphertexts: BTreeMap::new(),
};
honey_badger.buffer.extend(txs);
let fault_log = honey_badger.propose()?;
Ok((honey_badger, fault_log))
}
}
}
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
pub struct HoneyBadger<Tx, NodeUid> {
pub struct HoneyBadger<C, NodeUid> {
/// Shared network data.
netinfo: Rc<NetworkInfo<NodeUid>>,
/// The buffer of transactions that have not yet been included in any output batch.
buffer: Vec<Tx>,
/// The earliest epoch from which we have not yet received output.
epoch: u64,
/// Whether we have already submitted a proposal for the current epoch.
has_input: bool,
/// The Asynchronous Common Subset instance that decides which nodes' transactions to include,
/// indexed by epoch.
common_subsets: BTreeMap<u64, CommonSubset<NodeUid>>,
/// The target number of transactions to be included in each batch.
// TODO: Do experiments and recommend a batch size. It should be proportional to
// `num_nodes * num_nodes * log(num_nodes)`.
batch_size: usize,
/// The maximum number of `CommonSubset` instances that we run simultaneously.
max_future_epochs: u64,
/// The messages that need to be sent to other nodes.
messages: MessageQueue<NodeUid>,
/// The outputs from completed epochs.
output: VecDeque<Batch<Tx, NodeUid>>,
output: VecDeque<Batch<C, NodeUid>>,
/// Messages for future epochs that couldn't be handled yet.
incoming_queue: BTreeMap<u64, Vec<(NodeUid, MessageContent<NodeUid>)>>,
/// Received decryption shares for an epoch. Each decryption share has a sender and a
@ -139,25 +105,24 @@ pub struct HoneyBadger<Tx, NodeUid> {
/// its key. The inner `BTreeMap` has the sender as its key.
received_shares: BTreeMap<u64, BTreeMap<NodeUid, BTreeMap<NodeUid, DecryptionShare>>>,
/// Decoded accepted proposals.
decrypted_selections: BTreeMap<NodeUid, Vec<u8>>,
decrypted_contributions: BTreeMap<NodeUid, Vec<u8>>,
/// Ciphertexts output by Common Subset in an epoch.
ciphertexts: BTreeMap<u64, BTreeMap<NodeUid, Ciphertext>>,
}
impl<Tx, NodeUid> DistAlgorithm for HoneyBadger<Tx, NodeUid>
impl<C, NodeUid> DistAlgorithm for HoneyBadger<C, NodeUid>
where
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
C: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
NodeUid: Ord + Clone + Debug,
{
type NodeUid = NodeUid;
type Input = Tx;
type Output = Batch<Tx, NodeUid>;
type Input = C;
type Output = Batch<C, NodeUid>;
type Message = Message<NodeUid>;
type Error = Error;
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<FaultLog<NodeUid>> {
self.add_transactions(iter::once(input));
Ok(FaultLog::new())
Ok(self.propose(&input)?)
}
fn handle_message(
@ -201,62 +166,39 @@ where
}
}
impl<Tx, NodeUid> HoneyBadger<Tx, NodeUid>
impl<C, NodeUid> HoneyBadger<C, NodeUid>
where
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
C: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
NodeUid: Ord + Clone + Debug,
{
/// Returns a new `HoneyBadgerBuilder` configured to use the node IDs and cryptographic keys
/// specified by `netinfo`.
pub fn builder(netinfo: Rc<NetworkInfo<NodeUid>>) -> HoneyBadgerBuilder<Tx, NodeUid> {
pub fn builder(netinfo: Rc<NetworkInfo<NodeUid>>) -> HoneyBadgerBuilder<C, NodeUid> {
HoneyBadgerBuilder::new(netinfo)
}
/// Adds transactions into the buffer.
pub fn add_transactions<I: IntoIterator<Item = Tx>>(&mut self, txs: I) {
self.buffer.extend(txs);
}
/// Empties and returns the transaction buffer.
pub fn drain_buffer(&mut self) -> Vec<Tx> {
mem::replace(&mut self.buffer, Vec::new())
}
/// Proposes a new batch in the current epoch.
fn propose(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
/// Proposes a new item in the current epoch.
pub fn propose(&mut self, proposal: &C) -> HoneyBadgerResult<FaultLog<NodeUid>> {
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
}
let proposal = self.choose_transactions()?;
let cs = match self.common_subsets.entry(self.epoch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
entry.insert(CommonSubset::new(self.netinfo.clone(), self.epoch)?)
}
};
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(proposal);
let ser_prop = bincode::serialize(&proposal)?;
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop);
let fault_log = cs.input(bincode::serialize(&ciphertext).unwrap())?;
self.has_input = true;
self.messages.extend_with_epoch(self.epoch, cs);
Ok(fault_log)
}
/// Returns a random choice of `batch_size / all_uids.len()` buffered transactions, and
/// serializes them.
fn choose_transactions(&self) -> HoneyBadgerResult<Vec<u8>> {
let mut rng = rand::thread_rng();
let amount = cmp::max(1, self.batch_size / self.netinfo.all_uids().len());
let batch_size = cmp::min(self.batch_size, self.buffer.len());
let sample = match rand::seq::sample_iter(&mut rng, &self.buffer[..batch_size], amount) {
Ok(choice) => choice,
Err(choice) => choice, // Fewer than `amount` were available, which is fine.
};
debug!(
"{:?} Proposing in epoch {}: {:?}",
self.netinfo.our_uid(),
self.epoch,
sample
);
Ok(bincode::serialize(&sample)?)
/// Returns `true` if input for the current epoch has already been provided.
pub fn has_input(&self) -> bool {
self.has_input
}
/// Handles a message for the given epoch.
@ -343,10 +285,8 @@ where
proposer_shares.insert(sender_id.clone(), share);
}
if epoch == self.epoch && self.try_decrypt_proposer_selection(proposer_id) {
if let BoolWithFaultLog::True(faults) = self.try_output_batch()? {
fault_log.extend(faults);
}
if epoch == self.epoch && self.try_decrypt_proposer_contribution(proposer_id) {
self.try_output_batch()?.merge_into(&mut fault_log);
}
Ok(fault_log)
@ -368,25 +308,24 @@ where
}
}
/// When selections of transactions have been decrypted for all valid proposers in this epoch,
/// When contributions of transactions have been decrypted for all valid proposers in this epoch,
/// moves those transactions into a batch, outputs the batch and updates the epoch.
fn try_output_batch(&mut self) -> HoneyBadgerResult<BoolWithFaultLog<NodeUid>> {
// Wait until selections have been successfully decoded for all proposer nodes with correct
fn try_output_batch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
// Wait until contributions have been successfully decoded for all proposer nodes with correct
// ciphertext outputs.
if !self.all_selections_decrypted() {
return Ok(BoolWithFaultLog::False);
if !self.all_contributions_decrypted() {
return Ok(FaultLog::new());
}
// Deserialize the output.
let mut fault_log = FaultLog::new();
let transactions: BTreeMap<NodeUid, Vec<Tx>> = self
.decrypted_selections
let contributions: BTreeMap<NodeUid, C> = self
.decrypted_contributions
.iter()
.flat_map(|(proposer_id, ser_batch)| {
// If deserialization fails, the proposer of that batch is
// faulty. Log the faulty proposer and ignore the batch.
if let Ok(proposed) = bincode::deserialize::<Vec<Tx>>(&ser_batch) {
Some((proposer_id.clone(), proposed))
.flat_map(|(proposer_id, ser_contrib)| {
// If deserialization fails, the proposer of that item is faulty. Ignore it.
if let Ok(contrib) = bincode::deserialize::<C>(&ser_contrib) {
Some((proposer_id.clone(), contrib))
} else {
let fault_kind = FaultKind::BatchDeserializationFailed;
fault_log.append(proposer_id.clone(), fault_kind);
@ -396,32 +335,28 @@ where
.collect();
let batch = Batch {
epoch: self.epoch,
transactions,
contributions,
};
{
let tx_set: HashSet<&Tx> = batch.iter().collect();
// Remove the output transactions from our buffer.
self.buffer.retain(|tx| !tx_set.contains(&tx));
}
debug!(
"{:?} Epoch {} output {:?}",
self.netinfo.our_uid(),
self.epoch,
batch.transactions
batch.contributions
);
// Queue the output and advance the epoch.
self.output.push_back(batch);
self.update_epoch()?.merge_into(&mut fault_log);
Ok(BoolWithFaultLog::True(fault_log))
Ok(fault_log)
}
/// Increments the epoch number and clears any state that is local to the finished epoch.
fn update_epoch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
// Clear the state of the old epoch.
self.ciphertexts.remove(&self.epoch);
self.decrypted_selections.clear();
self.decrypted_contributions.clear();
self.received_shares.remove(&self.epoch);
self.epoch += 1;
self.has_input = false;
let max_epoch = self.epoch + self.max_future_epochs;
let mut fault_log = FaultLog::new();
// TODO: Once stable, use `Iterator::flatten`.
@ -432,56 +367,48 @@ where
.merge_into(&mut fault_log);
}
// Handle any decryption shares received for the new epoch.
if let BoolWithFaultLog::True(faults) = self.try_decrypt_and_output_batch()? {
fault_log.extend(faults);
} else {
// Continue with this epoch if a batch is not output by
// `try_decrypt_and_output_batch`.
self.propose()?.merge_into(&mut fault_log);
}
self.try_decrypt_and_output_batch()?
.merge_into(&mut fault_log);
Ok(fault_log)
}
/// Tries to decrypt transaction selections from all proposers and output those transactions in
/// Tries to decrypt transaction contributions from all proposers and output those transactions in
/// a batch.
fn try_decrypt_and_output_batch(&mut self) -> HoneyBadgerResult<BoolWithFaultLog<NodeUid>> {
fn try_decrypt_and_output_batch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
if let Some(proposer_ids) = self
.received_shares
.get(&self.epoch)
.map(|shares| shares.keys().cloned().collect::<BTreeSet<NodeUid>>())
{
// Try to output a batch if there is a non-empty set of proposers for which we have already received
// decryption shares.
// Try to output a batch if there is a non-empty set of proposers for which we have
// already received decryption shares.
if !proposer_ids.is_empty()
&& proposer_ids
.iter()
.all(|proposer_id| self.try_decrypt_proposer_selection(proposer_id.clone()))
.all(|proposer_id| self.try_decrypt_proposer_contribution(proposer_id.clone()))
{
self.try_output_batch()
} else {
Ok(BoolWithFaultLog::False)
return self.try_output_batch();
}
} else {
Ok(BoolWithFaultLog::False)
}
Ok(FaultLog::new())
}
/// Returns true if and only if transaction selections have been decrypted for all proposers in
/// Returns true if and only if contributions have been decrypted for all selected proposers in
/// this epoch.
fn all_selections_decrypted(&mut self) -> bool {
fn all_contributions_decrypted(&mut self) -> bool {
let ciphertexts = self
.ciphertexts
.entry(self.epoch)
.or_insert_with(BTreeMap::new);
let all_ciphertext_proposers: BTreeSet<_> = ciphertexts.keys().collect();
let all_decrypted_selection_proposers: BTreeSet<_> =
self.decrypted_selections.keys().collect();
all_ciphertext_proposers == all_decrypted_selection_proposers
let all_decrypted_contribution_proposers: BTreeSet<_> =
self.decrypted_contributions.keys().collect();
all_ciphertext_proposers == all_decrypted_contribution_proposers
}
/// Tries to decrypt the selection of transactions from a given proposer. Outputs `true` if and
/// only if decryption finished without errors.
fn try_decrypt_proposer_selection(&mut self, proposer_id: NodeUid) -> bool {
/// Tries to decrypt the contribution from a given proposer. Outputs `true` if and only if
/// decryption finished without errors.
fn try_decrypt_proposer_contribution(&mut self, proposer_id: NodeUid) -> bool {
let shares = &self.received_shares[&self.epoch][&proposer_id];
if shares.len() <= self.netinfo.num_faulty() {
return false;
@ -500,13 +427,13 @@ where
.into_iter()
.map(|(id, share)| (&ids_u64[id], share))
.collect();
if let Ok(decrypted_selection) = self
if let Ok(decrypted_contribution) = self
.netinfo
.public_key_set()
.decrypt(indexed_shares, ciphertext)
{
self.decrypted_selections
.insert(proposer_id, decrypted_selection);
self.decrypted_contributions
.insert(proposer_id, decrypted_contribution);
return true;
}
}
@ -653,34 +580,51 @@ where
}
}
/// A batch of transactions the algorithm has output.
///
/// TODO: Consider adding a `faulty_nodes` field to describe and report failures detected by `HoneyBadger`.
/// A batch of contributions the algorithm has output.
#[derive(Clone)]
pub struct Batch<Tx, NodeUid> {
pub struct Batch<C, NodeUid> {
pub epoch: u64,
pub transactions: BTreeMap<NodeUid, Vec<Tx>>,
pub contributions: BTreeMap<NodeUid, C>,
}
impl<Tx, NodeUid: Ord> Batch<Tx, NodeUid> {
impl<C, NodeUid: Ord> Batch<C, NodeUid> {
/// Returns an iterator over references to all transactions included in the batch.
pub fn iter(&self) -> impl Iterator<Item = &Tx> {
self.transactions.values().flat_map(|vec| vec)
pub fn iter<'a>(&'a self) -> impl Iterator<Item = <&'a C as IntoIterator>::Item>
where
&'a C: IntoIterator,
{
self.contributions.values().flat_map(|item| item)
}
/// Returns an iterator over all transactions included in the batch. Consumes the batch.
pub fn into_tx_iter(self) -> impl Iterator<Item = Tx> {
self.transactions.into_iter().flat_map(|(_, vec)| vec)
pub fn into_tx_iter(self) -> impl Iterator<Item = <C as IntoIterator>::Item>
where
C: IntoIterator,
{
self.contributions.into_iter().flat_map(|(_, vec)| vec)
}
/// Returns the number of transactions in the batch (without detecting duplicates).
pub fn len(&self) -> usize {
self.transactions.values().map(Vec::len).sum()
pub fn len<Tx>(&self) -> usize
where
C: AsRef<[Tx]>,
{
self.contributions
.values()
.map(C::as_ref)
.map(<[Tx]>::len)
.sum()
}
/// Returns `true` if the batch contains no transactions.
pub fn is_empty(&self) -> bool {
self.transactions.values().all(Vec::is_empty)
pub fn is_empty<Tx>(&self) -> bool
where
C: AsRef<[Tx]>,
{
self.contributions
.values()
.map(C::as_ref)
.all(<[Tx]>::is_empty)
}
}
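Since a batch now stores one opaque contribution `C` per proposer instead of a `Vec<Tx>`, these inspection helpers are only available when `C` can be viewed as a sequence. A small sketch of how they are used with `C = Vec<u64>`; the `Batch` value is assumed to come from a `HoneyBadger<Vec<u64>, NodeUid>` instance.

use hbbft::honey_badger::Batch;

// For illustration only: any ordered node ID type works here.
type NodeUid = u64;

fn summarize(batch: &Batch<Vec<u64>, NodeUid>) -> (usize, bool) {
    // `&Vec<u64>` is `IntoIterator` and `Vec<u64>` is `AsRef<[u64]>`, so all helpers apply.
    let _first = batch.iter().next();             // Option<&u64>: borrows the transactions
    (batch.len::<u64>(), batch.is_empty::<u64>())
}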


@ -134,4 +134,6 @@ pub mod messaging;
pub mod proto;
#[cfg(feature = "serialization-protobuf")]
pub mod proto_io;
pub mod queueing_honey_badger;
pub mod sync_key_gen;
pub mod transaction_queue;


@ -0,0 +1,200 @@
//! # Queueing Honey Badger
//!
//! This works exactly like Dynamic Honey Badger, but it has a transaction queue built in.
use std::collections::VecDeque;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use serde::{Deserialize, Serialize};
use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message};
use fault_log::FaultLog;
use messaging::{DistAlgorithm, NetworkInfo, TargetedMessage};
pub use dynamic_honey_badger::{Change, ChangeState, Input};
use transaction_queue::TransactionQueue;
error_chain!{
links {
DynamicHoneyBadger(dynamic_honey_badger::Error, dynamic_honey_badger::ErrorKind);
}
}
/// A Queueing Honey Badger builder, to configure the parameters and create new instances of
/// `QueueingHoneyBadger`.
pub struct QueueingHoneyBadgerBuilder<Tx, NodeUid> {
/// Shared network data.
netinfo: NetworkInfo<NodeUid>,
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The epoch at which to join the network.
start_epoch: u64,
/// The maximum number of future epochs for which we handle messages simultaneously.
max_future_epochs: usize,
_phantom: PhantomData<Tx>,
}
impl<Tx, NodeUid> QueueingHoneyBadgerBuilder<Tx, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Clone,
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash,
{
/// Returns a new `QueueingHoneyBadgerBuilder` configured to use the node IDs and cryptographic
/// keys specified by `netinfo`.
pub fn new(netinfo: NetworkInfo<NodeUid>) -> Self {
// TODO: Use the defaults from `HoneyBadgerBuilder`.
QueueingHoneyBadgerBuilder {
netinfo,
batch_size: 100,
start_epoch: 0,
max_future_epochs: 3,
_phantom: PhantomData,
}
}
/// Sets the target number of transactions per batch.
pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
self.batch_size = batch_size;
self
}
/// Sets the maximum number of future epochs for which we handle messages simultaneously.
pub fn max_future_epochs(&mut self, max_future_epochs: usize) -> &mut Self {
self.max_future_epochs = max_future_epochs;
self
}
/// Sets the epoch at which to join the network as an observer. This requires the node to
/// receive all broadcast messages for `start_epoch` and later.
pub fn start_epoch(&mut self, start_epoch: u64) -> &mut Self {
self.start_epoch = start_epoch;
self
}
/// Creates a new Queueing Honey Badger instance with an empty buffer.
pub fn build(&self) -> QueueingHoneyBadger<Tx, NodeUid>
where
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
{
self.build_with_transactions(None)
.expect("building without transactions cannot fail")
}
/// Returns a new Queueing Honey Badger instance that starts with the given transactions in its
/// buffer.
pub fn build_with_transactions<TI>(&self, txs: TI) -> Result<QueueingHoneyBadger<Tx, NodeUid>>
where
TI: IntoIterator<Item = Tx>,
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
{
let dyn_hb = DynamicHoneyBadger::builder(self.netinfo.clone())
.max_future_epochs(self.max_future_epochs)
.build();
let queue = TransactionQueue(txs.into_iter().collect());
let mut qhb = QueueingHoneyBadger {
dyn_hb,
queue,
batch_size: self.batch_size,
output: VecDeque::new(),
};
let _ = qhb.propose()?; // Fault log is empty: no contact with other nodes yet.
Ok(qhb)
}
}
/// A Honey Badger instance that can handle adding and removing nodes and manages a transaction
/// queue.
pub struct QueueingHoneyBadger<Tx, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
NodeUid: Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug,
{
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The internal `DynamicHoneyBadger` instance.
dyn_hb: DynamicHoneyBadger<Vec<Tx>, NodeUid>,
/// The queue of pending transactions that haven't been output in a batch yet.
queue: TransactionQueue<Tx>,
/// The outputs from completed epochs.
output: VecDeque<Batch<Tx, NodeUid>>,
}
impl<Tx, NodeUid> DistAlgorithm for QueueingHoneyBadger<Tx, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Clone,
NodeUid: Eq + Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
{
type NodeUid = NodeUid;
type Input = Input<Tx, NodeUid>;
type Output = Batch<Tx, NodeUid>;
type Message = Message<NodeUid>;
type Error = Error;
fn input(&mut self, input: Self::Input) -> Result<FaultLog<NodeUid>> {
// User transactions are added to the queue and proposed in a later batch. Change
// votes are forwarded to the internal `DynamicHoneyBadger` right away.
match input {
Input::User(tx) => {
self.queue.0.push_back(tx);
Ok(FaultLog::new())
}
Input::Change(change) => Ok(self.dyn_hb.input(Input::Change(change))?),
}
}
fn handle_message(
&mut self,
sender_id: &NodeUid,
message: Self::Message,
) -> Result<FaultLog<NodeUid>> {
let mut fault_log = self.dyn_hb.handle_message(sender_id, message)?;
while let Some(batch) = self.dyn_hb.next_output() {
self.queue.remove_all(batch.iter());
self.output.push_back(batch);
}
if !self.dyn_hb.has_input() {
self.propose()?.merge_into(&mut fault_log);
}
Ok(fault_log)
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
self.dyn_hb.next_message()
}
fn next_output(&mut self) -> Option<Self::Output> {
self.output.pop_front()
}
fn terminated(&self) -> bool {
false
}
fn our_id(&self) -> &NodeUid {
self.dyn_hb.our_id()
}
}
impl<Tx, NodeUid> QueueingHoneyBadger<Tx, NodeUid>
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Clone,
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash,
{
/// Returns a new `QueueingHoneyBadgerBuilder` configured to use the node IDs and cryptographic
/// keys specified by `netinfo`.
pub fn builder(netinfo: NetworkInfo<NodeUid>) -> QueueingHoneyBadgerBuilder<Tx, NodeUid> {
QueueingHoneyBadgerBuilder::new(netinfo)
}
/// Initiates the next epoch by proposing a batch from the queue.
fn propose(&mut self) -> Result<FaultLog<NodeUid>> {
let amount = self.batch_size / self.dyn_hb.netinfo().num_nodes();
let proposal = self.queue.choose(amount, self.batch_size);
Ok(self.dyn_hb.input(Input::User(proposal))?)
}
}
pub type Batch<Tx, NodeUid> = DhbBatch<Vec<Tx>, NodeUid>;
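A minimal usage sketch for the wrapper defined above. The `netinfo` value and the `node_to_remove` ID are assumed to be set up by the caller (`NodeUid` is whatever ID type the application uses); error handling is reduced to `expect`, and the transaction `42` is a stand-in.

use hbbft::messaging::DistAlgorithm;
use hbbft::queueing_honey_badger::{Change, Input, QueueingHoneyBadger};

let mut qhb: QueueingHoneyBadger<u64, NodeUid> = QueueingHoneyBadger::builder(netinfo)
    .batch_size(100)
    .build();

// Transactions only enter the built-in queue here; batches are proposed
// automatically once the inner Dynamic Honey Badger accepts input again.
qhb.input(Input::User(42)).expect("enqueue transaction");
// Validator set changes are votes and go straight to Dynamic Honey Badger.
qhb.input(Input::Change(Change::Remove(node_to_remove))).expect("vote for removal");

// Drain outputs and outgoing messages as with any `DistAlgorithm`.
while let Some(batch) = qhb.next_output() {
    let txs: Vec<&u64> = batch.iter().collect();
    println!("epoch {}: {} transactions", batch.epoch, txs.len());
}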

src/transaction_queue.rs (new file, 34 lines)

@ -0,0 +1,34 @@
use std::cmp;
use std::collections::{HashSet, VecDeque};
use std::hash::Hash;
use rand;
/// A wrapper providing a few convenience methods for a queue of pending transactions.
#[derive(Debug)]
pub struct TransactionQueue<Tx>(pub VecDeque<Tx>);
impl<Tx: Clone> TransactionQueue<Tx> {
/// Removes the given transactions from the queue.
pub fn remove_all<'a, I>(&mut self, txs: I)
where
I: IntoIterator<Item = &'a Tx>,
Tx: Eq + Hash + 'a,
{
let tx_set: HashSet<_> = txs.into_iter().collect();
self.0.retain(|tx| !tx_set.contains(tx));
}
/// Returns a new set of `amount` transactions, randomly chosen from the first `batch_size`.
/// No transactions are removed from the queue.
// TODO: Return references, once the `HoneyBadger` API accepts them. Remove `Clone` bound.
pub fn choose(&self, amount: usize, batch_size: usize) -> Vec<Tx> {
let mut rng = rand::thread_rng();
let limit = cmp::min(batch_size, self.0.len());
let sample = match rand::seq::sample_iter(&mut rng, self.0.iter().take(limit), amount) {
Ok(choice) => choice,
Err(choice) => choice, // Fewer than `amount` were available, which is fine.
};
sample.into_iter().cloned().collect()
}
}
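A short sketch of the intended usage, matching what the updated tests below do: propose a random sample from the front of the queue, and once those transactions appear in an output batch, remove them. The numeric transactions are stand-ins.

use hbbft::transaction_queue::TransactionQueue;

// A queue of 1000 pending (here: numeric) transactions.
let mut queue = TransactionQueue((0..1000u64).collect());

// Sample 3 of the first 10 transactions; nothing is removed from the queue yet.
let proposal: Vec<u64> = queue.choose(3, 10);

// Once the proposal has been committed in some output batch, drop it from the queue.
queue.remove_all(proposal.iter());
assert_eq!(queue.0.len(), 997);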


@ -18,31 +18,34 @@ use std::rc::Rc;
use rand::Rng;
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::dynamic_honey_badger::{Batch, Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::messaging::NetworkInfo;
use hbbft::transaction_queue::TransactionQueue;
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
type UsizeDhb = DynamicHoneyBadger<Vec<usize>, NodeUid>;
/// Proposes `num_txs` values and expects nodes to output and order them.
fn test_dynamic_honey_badger<A>(
mut network: TestNetwork<A, DynamicHoneyBadger<usize, NodeUid>>,
num_txs: usize,
) where
A: Adversary<DynamicHoneyBadger<usize, NodeUid>>,
fn test_dynamic_honey_badger<A>(mut network: TestNetwork<A, UsizeDhb>, num_txs: usize)
where
A: Adversary<UsizeDhb>,
{
// The second half of the transactions will be input only after a node has been removed.
network.input_all(Input::Change(Change::Remove(NodeUid(0))));
for tx in 0..(num_txs / 2) {
network.input_all(Input::User(tx));
let new_queue = |id: &NodeUid| (*id, TransactionQueue((0..num_txs).collect()));
let mut queues: BTreeMap<_, _> = network.nodes.keys().map(new_queue).collect();
for (id, queue) in &queues {
network.input(*id, Input::User(queue.choose(3, 10)));
}
fn has_remove(node: &TestNode<DynamicHoneyBadger<usize, NodeUid>>) -> bool {
network.input_all(Input::Change(Change::Remove(NodeUid(0))));
fn has_remove(node: &TestNode<UsizeDhb>) -> bool {
node.outputs()
.iter()
.any(|batch| batch.change == ChangeState::Complete(Change::Remove(NodeUid(0))))
}
fn has_add(node: &TestNode<DynamicHoneyBadger<usize, NodeUid>>) -> bool {
fn has_add(node: &TestNode<UsizeDhb>) -> bool {
node.outputs().iter().any(|batch| match batch.change {
ChangeState::Complete(Change::Add(ref id, _)) => *id == NodeUid(0),
_ => false,
@ -51,7 +54,7 @@ fn test_dynamic_honey_badger<A>(
// Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs.
let node_busy = |node: &mut TestNode<DynamicHoneyBadger<usize, NodeUid>>| {
let node_busy = |node: &mut TestNode<UsizeDhb>| {
if !has_remove(node) || !has_add(node) {
return true;
}
@ -76,11 +79,15 @@ fn test_dynamic_honey_badger<A>(
let mut input_add = false;
// Handle messages in random order until all nodes have output all transactions.
while network.nodes.values_mut().any(node_busy) {
network.step();
let id = network.step();
if !network.nodes[&id].instance().has_input() {
queues
.get_mut(&id)
.unwrap()
.remove_all(network.nodes[&id].outputs().iter().flat_map(Batch::iter));
network.input(id, Input::User(queues[&id].choose(3, 10)));
}
if !input_add && network.nodes.values().all(has_remove) {
for tx in (num_txs / 2)..num_txs {
network.input_all(Input::User(tx));
}
let pk = network.pk_set.public_key_share(0);
network.input_all(Input::Change(Change::Add(NodeUid(0), pk)));
info!("Input!");
@ -93,9 +100,9 @@ fn test_dynamic_honey_badger<A>(
/// Verifies that all instances output the same sequence of batches. We already know that all of
/// them have output all transactions and events, but some may have advanced a few empty batches
/// more than others, so we ignore those.
fn verify_output_sequence<A>(network: &TestNetwork<A, DynamicHoneyBadger<usize, NodeUid>>)
fn verify_output_sequence<A>(network: &TestNetwork<A, UsizeDhb>)
where
A: Adversary<DynamicHoneyBadger<usize, NodeUid>>,
A: Adversary<UsizeDhb>,
{
let expected = network.nodes[&NodeUid(0)].outputs().to_vec();
assert!(!expected.is_empty());
@ -107,17 +114,13 @@ where
// Allow passing `netinfo` by value. `TestNetwork` expects this function signature.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn new_dynamic_hb(netinfo: Rc<NetworkInfo<NodeUid>>) -> DynamicHoneyBadger<usize, NodeUid> {
DynamicHoneyBadger::builder((*netinfo).clone())
.batch_size(12)
.build()
.expect("Instantiate dynamic_honey_badger")
.0
fn new_dynamic_hb(netinfo: Rc<NetworkInfo<NodeUid>>) -> UsizeDhb {
DynamicHoneyBadger::builder((*netinfo).clone()).build()
}
fn test_dynamic_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
where
A: Adversary<DynamicHoneyBadger<usize, NodeUid>>,
A: Adversary<UsizeDhb>,
F: Fn(usize, usize, BTreeMap<NodeUid, Rc<NetworkInfo<NodeUid>>>) -> A,
{
// This returns an error in all but the first test.


@ -20,11 +20,14 @@ use rand::Rng;
use hbbft::honey_badger::{self, Batch, HoneyBadger, MessageContent};
use hbbft::messaging::{NetworkInfo, Target, TargetedMessage};
use hbbft::transaction_queue::TransactionQueue;
use network::{
Adversary, MessageScheduler, MessageWithSender, NodeUid, SilentAdversary, TestNetwork, TestNode,
};
type UsizeHoneyBadger = HoneyBadger<Vec<usize>, NodeUid>;
/// An adversary whose nodes only send messages with incorrect decryption shares.
pub struct FaultyShareAdversary {
num_good: usize,
@ -52,11 +55,8 @@ impl FaultyShareAdversary {
}
}
impl Adversary<HoneyBadger<usize, NodeUid>> for FaultyShareAdversary {
fn pick_node(
&self,
nodes: &BTreeMap<NodeUid, TestNode<HoneyBadger<usize, NodeUid>>>,
) -> NodeUid {
impl Adversary<UsizeHoneyBadger> for FaultyShareAdversary {
fn pick_node(&self, nodes: &BTreeMap<NodeUid, TestNode<UsizeHoneyBadger>>) -> NodeUid {
self.scheduler.pick_node(nodes)
}
@ -79,7 +79,7 @@ impl Adversary<HoneyBadger<usize, NodeUid>> for FaultyShareAdversary {
}
}
fn step(&mut self) -> Vec<MessageWithSender<HoneyBadger<usize, NodeUid>>> {
fn step(&mut self) -> Vec<MessageWithSender<UsizeHoneyBadger>> {
let mut outgoing = vec![];
let fake_proposal = &Vec::from("X marks the spot");
@ -118,17 +118,19 @@ impl Adversary<HoneyBadger<usize, NodeUid>> for FaultyShareAdversary {
}
/// Proposes `num_txs` values and expects nodes to output and order them.
fn test_honey_badger<A>(mut network: TestNetwork<A, HoneyBadger<usize, NodeUid>>, num_txs: usize)
fn test_honey_badger<A>(mut network: TestNetwork<A, UsizeHoneyBadger>, num_txs: usize)
where
A: Adversary<HoneyBadger<usize, NodeUid>>,
A: Adversary<UsizeHoneyBadger>,
{
for tx in 0..num_txs {
network.input_all(tx);
let new_queue = |id: &NodeUid| (*id, TransactionQueue((0..num_txs).collect()));
let mut queues: BTreeMap<_, _> = network.nodes.keys().map(new_queue).collect();
for (id, queue) in &queues {
network.input(*id, queue.choose(3, 10));
}
// Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs.
let node_busy = |node: &mut TestNode<HoneyBadger<usize, NodeUid>>| {
let node_busy = |node: &mut TestNode<UsizeHoneyBadger>| {
let mut min_missing = 0;
for batch in node.outputs() {
for tx in batch.iter() {
@ -149,15 +151,22 @@ where
// Handle messages in random order until all nodes have output all transactions.
while network.nodes.values_mut().any(node_busy) {
network.step();
let id = network.step();
if !network.nodes[&id].instance().has_input() {
queues
.get_mut(&id)
.unwrap()
.remove_all(network.nodes[&id].outputs().iter().flat_map(Batch::iter));
network.input(id, queues[&id].choose(3, 10));
}
}
verify_output_sequence(&network);
}
/// Verifies that all instances output the same sequence of batches.
fn verify_output_sequence<A>(network: &TestNetwork<A, HoneyBadger<usize, NodeUid>>)
fn verify_output_sequence<A>(network: &TestNetwork<A, UsizeHoneyBadger>)
where
A: Adversary<HoneyBadger<usize, NodeUid>>,
A: Adversary<UsizeHoneyBadger>,
{
let mut expected: Option<BTreeMap<&_, &_>> = None;
for node in network.nodes.values() {
@ -168,8 +177,8 @@ where
.map(
|Batch {
epoch,
transactions,
}| (epoch, transactions),
contributions,
}| (epoch, contributions),
)
.collect();
if expected.is_none() {
@ -180,17 +189,13 @@ where
}
}
fn new_honey_badger(netinfo: Rc<NetworkInfo<NodeUid>>) -> HoneyBadger<usize, NodeUid> {
HoneyBadger::builder(netinfo)
.batch_size(12)
.build_with_transactions(0..5)
.expect("Instantiate honey_badger")
.0
fn new_honey_badger(netinfo: Rc<NetworkInfo<NodeUid>>) -> UsizeHoneyBadger {
HoneyBadger::builder(netinfo).build()
}
fn test_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
where
A: Adversary<HoneyBadger<usize, NodeUid>>,
A: Adversary<UsizeHoneyBadger>,
F: Fn(usize, usize, BTreeMap<NodeUid, Rc<NetworkInfo<NodeUid>>>) -> A,
{
// This returns an error in all but the first test.


@ -42,6 +42,12 @@ impl<D: DistAlgorithm> TestNode<D> {
self.outputs.extend(self.algo.output_iter());
}
/// Returns the internal algorithm's instance.
#[allow(unused)] // Not used in all tests.
pub fn instance(&self) -> &D {
&self.algo
}
/// Creates a new test node with the given broadcast instance.
fn new(mut algo: D) -> TestNode<D> {
let outputs = algo.output_iter().collect();


@ -0,0 +1,150 @@
//! Network tests for Queueing Honey Badger.
extern crate hbbft;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate pairing;
extern crate rand;
#[macro_use]
extern crate serde_derive;
mod network;
use std::cmp;
use std::collections::BTreeMap;
use std::iter::once;
use std::rc::Rc;
use rand::Rng;
use hbbft::messaging::NetworkInfo;
use hbbft::queueing_honey_badger::{Change, ChangeState, Input, QueueingHoneyBadger};
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
/// Proposes `num_txs` values and expects nodes to output and order them.
fn test_queueing_honey_badger<A>(
mut network: TestNetwork<A, QueueingHoneyBadger<usize, NodeUid>>,
num_txs: usize,
) where
A: Adversary<QueueingHoneyBadger<usize, NodeUid>>,
{
// The second half of the transactions will be input only after a node has been removed.
network.input_all(Input::Change(Change::Remove(NodeUid(0))));
for tx in 0..(num_txs / 2) {
network.input_all(Input::User(tx));
}
fn has_remove(node: &TestNode<QueueingHoneyBadger<usize, NodeUid>>) -> bool {
node.outputs()
.iter()
.any(|batch| batch.change == ChangeState::Complete(Change::Remove(NodeUid(0))))
}
fn has_add(node: &TestNode<QueueingHoneyBadger<usize, NodeUid>>) -> bool {
node.outputs().iter().any(|batch| match batch.change {
ChangeState::Complete(Change::Add(ref id, _)) => *id == NodeUid(0),
_ => false,
})
}
// Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs.
let node_busy = |node: &mut TestNode<QueueingHoneyBadger<usize, NodeUid>>| {
if !has_remove(node) || !has_add(node) {
return true;
}
let mut min_missing = 0;
for batch in node.outputs() {
for tx in batch.iter() {
if *tx >= min_missing {
min_missing = tx + 1;
}
}
}
if min_missing < num_txs {
return true;
}
if node.outputs().last().unwrap().is_empty() {
let last = node.outputs().last().unwrap().epoch;
node.queue.retain(|(_, ref msg)| msg.epoch() < last);
}
false
};
let mut input_add = false;
// Handle messages in random order until all nodes have output all transactions.
while network.nodes.values_mut().any(node_busy) {
network.step();
if !input_add && network.nodes.values().all(has_remove) {
for tx in (num_txs / 2)..num_txs {
network.input_all(Input::User(tx));
}
let pk = network.pk_set.public_key_share(0);
network.input_all(Input::Change(Change::Add(NodeUid(0), pk)));
info!("Input!");
input_add = true;
}
}
verify_output_sequence(&network);
}
/// Verifies that all instances output the same sequence of batches. We already know that all of
/// them have output all transactions and events, but some may have advanced a few empty batches
/// more than others, so we ignore those.
fn verify_output_sequence<A>(network: &TestNetwork<A, QueueingHoneyBadger<usize, NodeUid>>)
where
A: Adversary<QueueingHoneyBadger<usize, NodeUid>>,
{
let expected = network.nodes[&NodeUid(0)].outputs().to_vec();
assert!(!expected.is_empty());
for node in network.nodes.values() {
let len = cmp::min(expected.len(), node.outputs().len());
assert_eq!(&expected[..len], &node.outputs()[..len]);
}
}
// Allow passing `netinfo` by value. `TestNetwork` expects this function signature.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn new_queueing_hb(netinfo: Rc<NetworkInfo<NodeUid>>) -> QueueingHoneyBadger<usize, NodeUid> {
QueueingHoneyBadger::builder((*netinfo).clone())
.batch_size(12)
.build()
}
fn test_queueing_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
where
A: Adversary<QueueingHoneyBadger<usize, NodeUid>>,
F: Fn(usize, usize, BTreeMap<NodeUid, Rc<NetworkInfo<NodeUid>>>) -> A,
{
// This returns an error in all but the first test.
let _ = env_logger::try_init();
let mut rng = rand::thread_rng();
let sizes = (3..5).chain(once(rng.gen_range(6, 10)));
for size in sizes {
// The test is removing one correct node, so we allow fewer faulty ones.
let num_adv_nodes = (size - 2) / 3;
let num_good_nodes = size - num_adv_nodes;
info!(
"Network size: {} good nodes, {} faulty nodes",
num_good_nodes, num_adv_nodes
);
let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes);
let network = TestNetwork::new(num_good_nodes, num_adv_nodes, adversary, new_queueing_hb);
test_queueing_honey_badger(network, num_txs);
}
}
#[test]
fn test_queueing_honey_badger_random_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::Random);
test_queueing_honey_badger_different_sizes(new_adversary, 10);
}
#[test]
fn test_queueing_honey_badger_first_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::First);
test_queueing_honey_badger_different_sizes(new_adversary, 10);
}