Support a global RNG in transaction queue (#257)

* adds an RNG argument to the transaction queue

* minimal support of transaction queue RNG in tests

* added the TransactionQueue trait object

* review comments and streamlining of trait bounds

* removed the RNG from the transaction Q and placed it into QueueingHoneyBadger

* formatting fix
This commit is contained in:
Vladimir Komendantskiy 2018-10-11 14:33:03 +01:00 committed by Andreas Fackler
parent 7dc92ed34e
commit 59444fcf7b
6 changed files with 133 additions and 68 deletions

View File

@ -19,7 +19,7 @@ use std::{cmp, u64};
use colored::*;
use docopt::Docopt;
use itertools::Itertools;
use rand::Rng;
use rand::{Isaac64Rng, Rng};
use serde::de::DeserializeOwned;
use serde::Serialize;
use signifix::{metric, TryFrom};
@ -354,7 +354,7 @@ impl EpochInfo {
id: NodeId,
time: Duration,
batch: &Batch<Transaction, NodeId>,
network: &TestNetwork<QueueingHoneyBadger<Transaction, NodeId>>,
network: &TestNetwork<QueueingHoneyBadger<Transaction, NodeId, Vec<Transaction>>>,
) {
if self.nodes.contains_key(&id) {
return;
@ -385,7 +385,9 @@ impl EpochInfo {
}
/// Proposes `num_txs` values and expects nodes to output and order them.
fn simulate_honey_badger(mut network: TestNetwork<QueueingHoneyBadger<Transaction, NodeId>>) {
fn simulate_honey_badger(
mut network: TestNetwork<QueueingHoneyBadger<Transaction, NodeId, Vec<Transaction>>>,
) {
// Handle messages until all nodes have output all transactions.
println!(
"{}",
@ -437,7 +439,7 @@ fn main() {
let dyn_hb = DynamicHoneyBadger::builder().build(netinfo);
QueueingHoneyBadger::builder(dyn_hb)
.batch_size(args.flag_b)
.build_with_transactions(txs.clone())
.build_with_transactions(txs.clone(), rand::thread_rng().gen::<Isaac64Rng>())
.expect("instantiate QueueingHoneyBadger")
};
let hw_quality = HwQuality {

View File

@ -27,7 +27,7 @@ use std::fmt::{self, Display};
use std::marker::PhantomData;
use failure::{Backtrace, Context, Fail};
use rand::Rand;
use rand::{Rand, Rng};
use serde::{Deserialize, Serialize};
use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message};
@ -93,18 +93,23 @@ pub type Result<T> = ::std::result::Result<T, Error>;
/// A Queueing Honey Badger builder, to configure the parameters and create new instances of
/// `QueueingHoneyBadger`.
pub struct QueueingHoneyBadgerBuilder<T, N: Rand> {
pub struct QueueingHoneyBadgerBuilder<T, N: Rand, Q> {
/// Shared network data.
dyn_hb: DynamicHoneyBadger<Vec<T>, N>,
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The queue of pending transactions that haven't been output in a batch yet.
queue: Q,
_phantom: PhantomData<T>,
}
impl<T, N> QueueingHoneyBadgerBuilder<T, N>
pub type QueueingHoneyBadgerWithStep<T, N, Q> = (QueueingHoneyBadger<T, N, Q>, Step<T, N, Q>);
impl<T, N, Q> QueueingHoneyBadgerBuilder<T, N, Q>
where
T: Contribution + Serialize + for<'r> Deserialize<'r> + Clone,
N: NodeIdT + Serialize + for<'r> Deserialize<'r> + Rand,
Q: TransactionQueue<T>,
{
/// Returns a new `QueueingHoneyBadgerBuilder` configured to use the node IDs and cryptographic
/// keys specified by `netinfo`.
@ -115,6 +120,7 @@ where
QueueingHoneyBadgerBuilder {
dyn_hb,
batch_size: 100,
queue: Default::default(),
_phantom: PhantomData,
}
}
@ -125,30 +131,40 @@ where
self
}
/// Sets the transaction queue object.
pub fn queue(mut self, queue: Q) -> Self {
self.queue = queue;
self
}
/// Creates a new Queueing Honey Badger instance with an empty buffer.
pub fn build(self) -> (QueueingHoneyBadger<T, N>, Step<T, N>)
pub fn build<R>(self, rng: R) -> QueueingHoneyBadgerWithStep<T, N, Q>
where
T: Contribution + Serialize + for<'r> Deserialize<'r>,
R: 'static + Rng + Send + Sync,
{
self.build_with_transactions(None)
self.build_with_transactions(None, rng)
.expect("building without transactions cannot fail")
}
/// Returns a new Queueing Honey Badger instance that starts with the given transactions in its
/// buffer.
pub fn build_with_transactions<TI>(
self,
pub fn build_with_transactions<TI, R>(
mut self,
txs: TI,
) -> Result<(QueueingHoneyBadger<T, N>, Step<T, N>)>
rng: R,
) -> Result<QueueingHoneyBadgerWithStep<T, N, Q>>
where
TI: IntoIterator<Item = T>,
T: Contribution + Serialize + for<'r> Deserialize<'r>,
R: 'static + Rng + Send + Sync,
{
let queue = TransactionQueue(txs.into_iter().collect());
self.queue.extend(txs);
let mut qhb = QueueingHoneyBadger {
dyn_hb: self.dyn_hb,
queue,
batch_size: self.batch_size,
queue: self.queue,
rng: Box::new(rng),
};
let step = qhb.propose()?;
Ok((qhb, step))
@ -157,26 +173,45 @@ where
/// A Honey Badger instance that can handle adding and removing nodes and manages a transaction
/// queue.
#[derive(Debug)]
pub struct QueueingHoneyBadger<T, N>
pub struct QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + for<'r> Deserialize<'r>,
N: NodeIdT + Serialize + for<'r> Deserialize<'r> + Rand,
Q: TransactionQueue<T>,
{
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The internal `DynamicHoneyBadger` instance.
dyn_hb: DynamicHoneyBadger<Vec<T>, N>,
/// The queue of pending transactions that haven't been output in a batch yet.
queue: TransactionQueue<T>,
queue: Q,
/// Random number generator used for choosing transactions from the queue.
rng: Box<dyn Rng + Send + Sync>,
}
pub type Step<T, N> = ::Step<QueueingHoneyBadger<T, N>>;
impl<T, N, Q> fmt::Debug for QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + for<'r> Deserialize<'r>,
N: NodeIdT + Serialize + for<'r> Deserialize<'r> + Rand,
Q: TransactionQueue<T>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("QueueingHoneyBadger")
.field("batch_size", &self.batch_size)
.field("dyn_hb", &self.dyn_hb)
.field("queue", &self.queue)
.field("rng", &"<RNG>")
.finish()
}
}
impl<T, N> DistAlgorithm for QueueingHoneyBadger<T, N>
pub type Step<T, N, Q> = ::Step<QueueingHoneyBadger<T, N, Q>>;
impl<T, N, Q> DistAlgorithm for QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + for<'r> Deserialize<'r> + Clone,
N: NodeIdT + Serialize + for<'r> Deserialize<'r> + Rand,
Q: TransactionQueue<T>,
{
type NodeId = N;
type Input = Input<T, N>;
@ -184,12 +219,12 @@ where
type Message = Message<N>;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<T, N>> {
fn handle_input(&mut self, input: Self::Input) -> Result<Step<T, N, Q>> {
// User transactions are forwarded to `HoneyBadger` right away. Internal messages are
// in addition signed and broadcast.
let mut step = match input {
Input::User(tx) => {
self.queue.0.push_back(tx);
self.queue.push(tx);
Step::default()
}
Input::Change(change) => self
@ -202,14 +237,14 @@ where
Ok(step)
}
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<Step<T, N>> {
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<Step<T, N, Q>> {
let mut step = self
.dyn_hb
.handle_message(sender_id, message)
.map_err(ErrorKind::HandleMessage)?
.convert::<Self>();
for batch in &step.output {
self.queue.remove_all(batch.iter());
self.queue.remove_multiple(batch.iter());
}
step.extend(self.propose()?);
Ok(step)
@ -224,14 +259,15 @@ where
}
}
impl<T, N> QueueingHoneyBadger<T, N>
impl<T, N, Q> QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + for<'r> Deserialize<'r> + Clone,
N: NodeIdT + Serialize + for<'r> Deserialize<'r> + Rand,
Q: TransactionQueue<T>,
{
/// Returns a new `QueueingHoneyBadgerBuilder` configured to use the node IDs and cryptographic
/// keys specified by `netinfo`.
pub fn builder(dyn_hb: DynamicHoneyBadger<Vec<T>, N>) -> QueueingHoneyBadgerBuilder<T, N> {
pub fn builder(dyn_hb: DynamicHoneyBadger<Vec<T>, N>) -> QueueingHoneyBadgerBuilder<T, N, Q> {
QueueingHoneyBadgerBuilder::new(dyn_hb)
}
@ -247,15 +283,15 @@ where
if self.dyn_hb.has_input() {
return false; // Previous epoch is still in progress.
}
!self.queue.0.is_empty() || self.dyn_hb.should_propose()
!self.queue.is_empty() || self.dyn_hb.should_propose()
}
/// Initiates the next epoch by proposing a batch from the queue.
fn propose(&mut self) -> Result<Step<T, N>> {
fn propose(&mut self) -> Result<Step<T, N, Q>> {
let mut step = Step::default();
while self.can_propose() {
let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes());
let proposal = self.queue.choose(amount, self.batch_size);
let proposal = self.queue.choose(&mut self.rng, amount, self.batch_size);
step.extend(
self.dyn_hb
.handle_input(Input::User(proposal))

View File

@ -1,32 +1,56 @@
use std::cmp;
use std::collections::{HashSet, VecDeque};
use std::collections::HashSet;
use std::{cmp, fmt};
use rand;
use rand::{self, Rng};
use Contribution;
/// A wrapper providing a few convenience methods for a queue of pending transactions.
#[derive(Debug)]
pub struct TransactionQueue<T>(pub VecDeque<T>);
impl<T: Clone> TransactionQueue<T> {
/// An interface to the transaction queue. A transaction queue is a structural part of
/// `QueueingHoneyBadger` that manages enqueueing of transactions for a future batch and dequeueing
/// of transactions to become part of a current batch.
pub trait TransactionQueue<T>: fmt::Debug + Default + Extend<T> + Sync + Send {
/// Checks whether the queue is empty.
fn is_empty(&self) -> bool;
/// Appends an element at the end of the queue.
fn push(&mut self, t: T);
/// Returns a new set of `amount` transactions, randomly chosen from the first `batch_size`.
/// No transactions are removed from the queue.
// TODO: Return references, once the `HoneyBadger` API accepts them.
fn choose<R: Rng>(&mut self, rng: &mut R, amount: usize, batch_size: usize) -> Vec<T>;
/// Removes the given transactions from the queue.
pub fn remove_all<'a, I>(&mut self, txs: I)
fn remove_multiple<'a, I>(&mut self, txs: I)
where
I: IntoIterator<Item = &'a T>,
T: 'a + Contribution;
}
impl<T> TransactionQueue<T> for Vec<T>
where
T: Clone + fmt::Debug + Sync + Send,
{
#[inline]
fn is_empty(&self) -> bool {
self.is_empty()
}
#[inline]
fn push(&mut self, t: T) {
self.push(t);
}
fn remove_multiple<'a, I>(&mut self, txs: I)
where
I: IntoIterator<Item = &'a T>,
T: 'a + Contribution,
{
let tx_set: HashSet<_> = txs.into_iter().collect();
self.0.retain(|tx| !tx_set.contains(tx));
self.retain(|tx| !tx_set.contains(tx));
}
/// Returns a new set of `amount` transactions, randomly chosen from the first `batch_size`.
/// No transactions are removed from the queue.
// TODO: Return references, once the `HoneyBadger` API accepts them. Remove `Clone` bound.
pub fn choose(&self, amount: usize, batch_size: usize) -> Vec<T> {
let mut rng = rand::thread_rng();
let limit = cmp::min(batch_size, self.0.len());
let sample = match rand::seq::sample_iter(&mut rng, self.0.iter().take(limit), amount) {
fn choose<R: Rng>(&mut self, rng: &mut R, amount: usize, batch_size: usize) -> Vec<T> {
let limit = cmp::min(batch_size, self.len());
let sample = match rand::seq::sample_iter(rng, self.iter().take(limit), amount) {
Ok(choice) => choice,
Err(choice) => choice, // Fewer than `amount` were available, which is fine.
};

View File

@ -19,7 +19,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use itertools::Itertools;
use rand::Rng;
use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::{Batch, Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::transaction_queue::TransactionQueue;
@ -34,10 +34,11 @@ fn test_dynamic_honey_badger<A>(mut network: TestNetwork<A, UsizeDhb>, num_txs:
where
A: Adversary<UsizeDhb>,
{
let new_queue = |id: &NodeId| (*id, TransactionQueue((0..num_txs).collect()));
let mut rng = rand::thread_rng().gen::<Isaac64Rng>();
let new_queue = |id: &NodeId| (*id, (0..num_txs).collect::<Vec<usize>>());
let mut queues: BTreeMap<_, _> = network.nodes.keys().map(new_queue).collect();
for (id, queue) in &queues {
network.input(*id, Input::User(queue.choose(3, 10)));
for (id, queue) in &mut queues {
network.input(*id, Input::User(queue.choose(&mut rng, 3, 10)));
}
network.input_all(Input::Change(Change::Remove(NodeId(0))));
@ -85,8 +86,8 @@ where
.collect();
if let Some(id) = rng.choose(&input_ids) {
let queue = queues.get_mut(id).unwrap();
queue.remove_all(network.nodes[id].outputs().iter().flat_map(Batch::iter));
network.input(*id, Input::User(queue.choose(3, 10)));
queue.remove_multiple(network.nodes[id].outputs().iter().flat_map(Batch::iter));
network.input(*id, Input::User(queue.choose(&mut rng, 3, 10)));
}
network.step();
// Once all nodes have processed the removal of node 0, add it again.

View File

@ -127,7 +127,7 @@ fn test_honey_badger<A>(mut network: TestNetwork<A, UsizeHoneyBadger>, num_txs:
where
A: Adversary<UsizeHoneyBadger>,
{
let new_queue = |id: &NodeId| (*id, TransactionQueue((0..num_txs).collect()));
let new_queue = |id: &NodeId| (*id, (0..num_txs).collect::<Vec<usize>>());
let mut queues: BTreeMap<_, _> = network.nodes.keys().map(new_queue).collect();
// Returns `true` if the node has not output all transactions yet.
@ -149,8 +149,8 @@ where
.collect();
if let Some(id) = rng.choose(&input_ids) {
let queue = queues.get_mut(id).unwrap();
queue.remove_all(network.nodes[id].outputs().iter().flat_map(Batch::iter));
network.input(*id, queue.choose(3, 10));
queue.remove_multiple(network.nodes[id].outputs().iter().flat_map(Batch::iter));
network.input(*id, queue.choose(&mut rng, 3, 10));
} else {
network.step();
}

View File

@ -18,20 +18,21 @@ mod network;
use std::collections::BTreeMap;
use std::sync::Arc;
use itertools::Itertools;
use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::DynamicHoneyBadger;
use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger, Step};
use hbbft::NetworkInfo;
use itertools::Itertools;
use rand::Rng;
use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};
type QHB = QueueingHoneyBadger<usize, NodeId, Vec<usize>>;
/// Proposes `num_txs` values and expects nodes to output and order them.
fn test_queueing_honey_badger<A>(
mut network: TestNetwork<A, QueueingHoneyBadger<usize, NodeId>>,
num_txs: usize,
) where
A: Adversary<QueueingHoneyBadger<usize, NodeId>>,
fn test_queueing_honey_badger<A>(mut network: TestNetwork<A, QHB>, num_txs: usize)
where
A: Adversary<QHB>,
{
// The second half of the transactions will be input only after a node has been removed.
network.input_all(Input::Change(Change::Remove(NodeId(0))));
@ -39,13 +40,13 @@ fn test_queueing_honey_badger<A>(
network.input_all(Input::User(tx));
}
fn has_remove(node: &TestNode<QueueingHoneyBadger<usize, NodeId>>) -> bool {
fn has_remove(node: &TestNode<QHB>) -> bool {
node.outputs()
.iter()
.any(|batch| *batch.change() == ChangeState::Complete(Change::Remove(NodeId(0))))
}
fn has_add(node: &TestNode<QueueingHoneyBadger<usize, NodeId>>) -> bool {
fn has_add(node: &TestNode<QHB>) -> bool {
node.outputs().iter().any(|batch| match *batch.change() {
ChangeState::Complete(Change::Add(ref id, _)) => *id == NodeId(0),
_ => false,
@ -54,7 +55,7 @@ fn test_queueing_honey_badger<A>(
// Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs.
let node_busy = |node: &mut TestNode<QueueingHoneyBadger<usize, NodeId>>| {
let node_busy = |node: &mut TestNode<QHB>| {
if !has_remove(node) || !has_add(node) {
return true;
}
@ -87,16 +88,17 @@ fn test_queueing_honey_badger<A>(
// Allow passing `netinfo` by value. `TestNetwork` expects this function signature.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn new_queueing_hb(
netinfo: Arc<NetworkInfo<NodeId>>,
) -> (QueueingHoneyBadger<usize, NodeId>, Step<usize, NodeId>) {
fn new_queueing_hb(netinfo: Arc<NetworkInfo<NodeId>>) -> (QHB, Step<usize, NodeId, Vec<usize>>) {
let dyn_hb = DynamicHoneyBadger::builder().build((*netinfo).clone());
QueueingHoneyBadger::builder(dyn_hb).batch_size(3).build()
let rng = rand::thread_rng().gen::<Isaac64Rng>();
QueueingHoneyBadger::builder(dyn_hb)
.batch_size(3)
.build(rng)
}
fn test_queueing_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
where
A: Adversary<QueueingHoneyBadger<usize, NodeId>>,
A: Adversary<QHB>,
F: Fn(usize, usize, BTreeMap<NodeId, Arc<NetworkInfo<NodeId>>>) -> A,
{
// This returns an error in all but the first test.