Merge pull request #63 from poanetwork/afck-hb-whose-batch

Return proposer info from HoneyBadger.
This commit is contained in:
Vladimir Komendantskiy 2018-06-19 09:53:52 +01:00 committed by GitHub
commit 1436d85455
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 126 additions and 102 deletions

View File

@ -329,7 +329,7 @@ where
/// The timestamped batches for a particular epoch that have already been output. /// The timestamped batches for a particular epoch that have already been output.
#[derive(Clone, Default)] #[derive(Clone, Default)]
struct EpochInfo { struct EpochInfo {
nodes: BTreeMap<NodeUid, (Duration, Batch<Transaction>)>, nodes: BTreeMap<NodeUid, (Duration, Batch<Transaction, NodeUid>)>,
} }
impl EpochInfo { impl EpochInfo {
@ -338,7 +338,7 @@ impl EpochInfo {
&mut self, &mut self,
id: NodeUid, id: NodeUid,
time: Duration, time: Duration,
batch: &Batch<Transaction>, batch: &Batch<Transaction, NodeUid>,
network: &TestNetwork<HoneyBadger<Transaction, NodeUid>>, network: &TestNetwork<HoneyBadger<Transaction, NodeUid>>,
) { ) {
if self.nodes.contains_key(&id) { if self.nodes.contains_key(&id) {
@ -355,7 +355,7 @@ impl EpochInfo {
.minmax() .minmax()
.into_option() .into_option()
.unwrap(); .unwrap();
let txs = batch.transactions.len(); let txs = batch.len();
println!( println!(
"{:>5} {:6} {:6} {:5} {:9} {:>9}B", "{:>5} {:6} {:6} {:5} {:9} {:>9}B",
batch.epoch.to_string().cyan(), batch.epoch.to_string().cyan(),
@ -379,7 +379,7 @@ fn simulate_honey_badger(
let node_busy = |node: &mut TestNode<HoneyBadger<Transaction, NodeUid>>| { let node_busy = |node: &mut TestNode<HoneyBadger<Transaction, NodeUid>>| {
node.outputs node.outputs
.iter() .iter()
.map(|&(_, ref batch)| batch.transactions.len()) .map(|&(_, ref batch)| batch.len())
.sum::<usize>() < num_txs .sum::<usize>() < num_txs
}; };

View File

@ -4,7 +4,6 @@ pub mod bin_values;
use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash;
use std::mem::replace; use std::mem::replace;
use std::rc::Rc; use std::rc::Rc;
@ -68,10 +67,7 @@ pub struct AgreementMessage {
} }
/// Binary Agreement instance /// Binary Agreement instance
pub struct Agreement<NodeUid> pub struct Agreement<NodeUid> {
where
NodeUid: Clone + Debug + Eq + Hash,
{
/// Shared network information. /// Shared network information.
netinfo: Rc<NetworkInfo<NodeUid>>, netinfo: Rc<NetworkInfo<NodeUid>>,
/// Session ID, e.g, the Honey Badger algorithm epoch. /// Session ID, e.g, the Honey Badger algorithm epoch.
@ -121,7 +117,7 @@ where
common_coin: CommonCoin<NodeUid, Nonce>, common_coin: CommonCoin<NodeUid, Nonce>,
} }
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeUid> { impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
type NodeUid = NodeUid; type NodeUid = NodeUid;
type Input = bool; type Input = bool;
type Output = bool; type Output = bool;
@ -177,7 +173,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
} }
} }
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> { impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
pub fn new( pub fn new(
netinfo: Rc<NetworkInfo<NodeUid>>, netinfo: Rc<NetworkInfo<NodeUid>>,
session_id: u64, session_id: u64,

View File

@ -1,6 +1,5 @@
use std::collections::{BTreeMap, VecDeque}; use std::collections::{BTreeMap, VecDeque};
use std::fmt::{self, Debug}; use std::fmt::{self, Debug};
use std::hash::Hash;
use std::iter::once; use std::iter::once;
use std::rc::Rc; use std::rc::Rc;
@ -92,11 +91,11 @@ impl Debug for BroadcastMessage {
/// eventually be able to decode (i.e. receive at least `f + 1` `Echo` messages). /// eventually be able to decode (i.e. receive at least `f + 1` `Echo` messages).
/// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value, /// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value,
/// knowing that every other good node will eventually do the same. /// knowing that every other good node will eventually do the same.
pub struct Broadcast<N: Clone + Eq + Hash> { pub struct Broadcast<NodeUid> {
/// Shared network data. /// Shared network data.
netinfo: Rc<NetworkInfo<N>>, netinfo: Rc<NetworkInfo<NodeUid>>,
/// The UID of the sending node. /// The UID of the sending node.
proposer_id: N, proposer_id: NodeUid,
data_shard_num: usize, data_shard_num: usize,
coding: Coding, coding: Coding,
/// Whether we have already multicast `Echo`. /// Whether we have already multicast `Echo`.
@ -106,17 +105,17 @@ pub struct Broadcast<N: Clone + Eq + Hash> {
/// Whether we have already output a value. /// Whether we have already output a value.
decided: bool, decided: bool,
/// The proofs we have received via `Echo` messages, by sender ID. /// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<N, Proof<Vec<u8>>>, echos: BTreeMap<NodeUid, Proof<Vec<u8>>>,
/// The root hashes we received via `Ready` messages, by sender ID. /// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<N, Vec<u8>>, readys: BTreeMap<NodeUid, Vec<u8>>,
/// The outgoing message queue. /// The outgoing message queue.
messages: VecDeque<TargetedMessage<BroadcastMessage, N>>, messages: VecDeque<TargetedMessage<BroadcastMessage, NodeUid>>,
/// The output, if any. /// The output, if any.
output: Option<Vec<u8>>, output: Option<Vec<u8>>,
} }
impl<N: Eq + Debug + Clone + Hash + Ord> DistAlgorithm for Broadcast<N> { impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
type NodeUid = N; type NodeUid = NodeUid;
// TODO: Allow anything serializable and deserializable, i.e. make this a type parameter // TODO: Allow anything serializable and deserializable, i.e. make this a type parameter
// T: Serialize + DeserializeOwned // T: Serialize + DeserializeOwned
type Input = Vec<u8>; type Input = Vec<u8>;
@ -136,7 +135,11 @@ impl<N: Eq + Debug + Clone + Hash + Ord> DistAlgorithm for Broadcast<N> {
self.handle_value(our_uid, proof) self.handle_value(our_uid, proof)
} }
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> BroadcastResult<()> { fn handle_message(
&mut self,
sender_id: &NodeUid,
message: Self::Message,
) -> BroadcastResult<()> {
if !self.netinfo.all_uids().contains(sender_id) { if !self.netinfo.all_uids().contains(sender_id) {
return Err(ErrorKind::UnknownSender.into()); return Err(ErrorKind::UnknownSender.into());
} }
@ -147,7 +150,7 @@ impl<N: Eq + Debug + Clone + Hash + Ord> DistAlgorithm for Broadcast<N> {
} }
} }
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, N>> { fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
self.messages.pop_front() self.messages.pop_front()
} }
@ -159,15 +162,15 @@ impl<N: Eq + Debug + Clone + Hash + Ord> DistAlgorithm for Broadcast<N> {
self.decided self.decided
} }
fn our_id(&self) -> &N { fn our_id(&self) -> &NodeUid {
self.netinfo.our_uid() self.netinfo.our_uid()
} }
} }
impl<N: Eq + Debug + Clone + Hash + Ord> Broadcast<N> { impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
/// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal /// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
/// from node `proposer_id`. /// from node `proposer_id`.
pub fn new(netinfo: Rc<NetworkInfo<N>>, proposer_id: N) -> BroadcastResult<Self> { pub fn new(netinfo: Rc<NetworkInfo<NodeUid>>, proposer_id: NodeUid) -> BroadcastResult<Self> {
let parity_shard_num = 2 * netinfo.num_faulty(); let parity_shard_num = 2 * netinfo.num_faulty();
let data_shard_num = netinfo.num_nodes() - parity_shard_num; let data_shard_num = netinfo.num_nodes() - parity_shard_num;
let coding = Coding::new(data_shard_num, parity_shard_num)?; let coding = Coding::new(data_shard_num, parity_shard_num)?;
@ -266,7 +269,7 @@ impl<N: Eq + Debug + Clone + Hash + Ord> Broadcast<N> {
} }
/// Handles a received echo and verifies the proof it contains. /// Handles a received echo and verifies the proof it contains.
fn handle_value(&mut self, sender_id: &N, p: Proof<Vec<u8>>) -> BroadcastResult<()> { fn handle_value(&mut self, sender_id: &NodeUid, p: Proof<Vec<u8>>) -> BroadcastResult<()> {
// If the sender is not the proposer, this is not the first `Value` or the proof is invalid, // If the sender is not the proposer, this is not the first `Value` or the proof is invalid,
// ignore. // ignore.
if *sender_id != self.proposer_id { if *sender_id != self.proposer_id {
@ -299,7 +302,7 @@ impl<N: Eq + Debug + Clone + Hash + Ord> Broadcast<N> {
} }
/// Handles a received `Echo` message. /// Handles a received `Echo` message.
fn handle_echo(&mut self, sender_id: &N, p: Proof<Vec<u8>>) -> BroadcastResult<()> { fn handle_echo(&mut self, sender_id: &NodeUid, p: Proof<Vec<u8>>) -> BroadcastResult<()> {
// If the proof is invalid or the sender has already sent `Echo`, ignore. // If the proof is invalid or the sender has already sent `Echo`, ignore.
if self.echos.contains_key(sender_id) { if self.echos.contains_key(sender_id) {
info!( info!(
@ -333,7 +336,7 @@ impl<N: Eq + Debug + Clone + Hash + Ord> Broadcast<N> {
} }
/// Handles a received `Ready` message. /// Handles a received `Ready` message.
fn handle_ready(&mut self, sender_id: &N, hash: &[u8]) -> BroadcastResult<()> { fn handle_ready(&mut self, sender_id: &NodeUid, hash: &[u8]) -> BroadcastResult<()> {
// If the sender has already sent a `Ready` before, ignore. // If the sender has already sent a `Ready` before, ignore.
if self.readys.contains_key(sender_id) { if self.readys.contains_key(sender_id) {
info!( info!(
@ -389,13 +392,13 @@ impl<N: Eq + Debug + Clone + Hash + Ord> Broadcast<N> {
} }
/// Returns `i` if `node_id` is the `i`-th ID among all participating nodes. /// Returns `i` if `node_id` is the `i`-th ID among all participating nodes.
fn index_of_node(&self, node_id: &N) -> Option<usize> { fn index_of_node(&self, node_id: &NodeUid) -> Option<usize> {
self.netinfo.all_uids().iter().position(|id| id == node_id) self.netinfo.all_uids().iter().position(|id| id == node_id)
} }
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise /// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
/// logs an info message. /// logs an info message.
fn validate_proof(&self, p: &Proof<Vec<u8>>, id: &N) -> bool { fn validate_proof(&self, p: &Proof<Vec<u8>>, id: &NodeUid) -> bool {
if !p.validate(&p.root_hash) { if !p.validate(&p.root_hash) {
info!( info!(
"Node {:?} received invalid proof: {:?}", "Node {:?} received invalid proof: {:?}",

View File

@ -2,7 +2,6 @@
use std::collections::{BTreeMap, VecDeque}; use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash;
use std::rc::Rc; use std::rc::Rc;
use pairing::bls12_381::Bls12; use pairing::bls12_381::Bls12;
@ -44,11 +43,8 @@ impl CommonCoinMessage {
/// receiving at least `num_faulty + 1` shares, attempts to combine them into a signature. If that /// receiving at least `num_faulty + 1` shares, attempts to combine them into a signature. If that
/// signature is valid, the instance outputs it and terminates; otherwise the instance aborts. /// signature is valid, the instance outputs it and terminates; otherwise the instance aborts.
#[derive(Debug)] #[derive(Debug)]
pub struct CommonCoin<N, T> pub struct CommonCoin<NodeUid, T> {
where netinfo: Rc<NetworkInfo<NodeUid>>,
N: Clone + Debug + Eq + Hash,
{
netinfo: Rc<NetworkInfo<N>>,
/// The name of this common coin. It is required to be unique for each common coin round. /// The name of this common coin. It is required to be unique for each common coin round.
nonce: T, nonce: T,
/// The result of combination of at least `num_faulty + 1` threshold signature shares. /// The result of combination of at least `num_faulty + 1` threshold signature shares.
@ -56,19 +52,19 @@ where
/// Outgoing message queue. /// Outgoing message queue.
messages: VecDeque<CommonCoinMessage>, messages: VecDeque<CommonCoinMessage>,
/// All received threshold signature shares. /// All received threshold signature shares.
received_shares: BTreeMap<N, Signature<Bls12>>, received_shares: BTreeMap<NodeUid, Signature<Bls12>>,
/// Whether we provided input to the common coin. /// Whether we provided input to the common coin.
had_input: bool, had_input: bool,
/// Termination flag. /// Termination flag.
terminated: bool, terminated: bool,
} }
impl<N, T> DistAlgorithm for CommonCoin<N, T> impl<NodeUid, T> DistAlgorithm for CommonCoin<NodeUid, T>
where where
N: Clone + Debug + Hash + Ord, NodeUid: Clone + Debug + Ord,
T: Clone + AsRef<[u8]>, T: Clone + AsRef<[u8]>,
{ {
type NodeUid = N; type NodeUid = NodeUid;
type Input = (); type Input = ();
type Output = bool; type Output = bool;
type Message = CommonCoinMessage; type Message = CommonCoinMessage;
@ -115,12 +111,12 @@ where
} }
} }
impl<N, T> CommonCoin<N, T> impl<NodeUid, T> CommonCoin<NodeUid, T>
where where
N: Clone + Debug + Hash + Ord, NodeUid: Clone + Debug + Ord,
T: Clone + AsRef<[u8]>, T: Clone + AsRef<[u8]>,
{ {
pub fn new(netinfo: Rc<NetworkInfo<N>>, nonce: T) -> Self { pub fn new(netinfo: Rc<NetworkInfo<NodeUid>>, nonce: T) -> Self {
CommonCoin { CommonCoin {
netinfo, netinfo,
nonce, nonce,
@ -139,7 +135,7 @@ where
self.handle_share(&id, share) self.handle_share(&id, share)
} }
fn handle_share(&mut self, sender_id: &N, share: Signature<Bls12>) -> Result<()> { fn handle_share(&mut self, sender_id: &NodeUid, share: Signature<Bls12>) -> Result<()> {
if let Some(i) = self.netinfo.node_index(sender_id) { if let Some(i) = self.netinfo.node_index(sender_id) {
let pk_i = self.netinfo.public_key_set().public_key_share(*i as u64); let pk_i = self.netinfo.public_key_set().public_key_share(*i as u64);
if !pk_i.verify(&share, &self.nonce) { if !pk_i.verify(&share, &self.nonce) {
@ -163,8 +159,9 @@ where
fn combine_and_verify_sig(&self) -> Result<Signature<Bls12>> { fn combine_and_verify_sig(&self) -> Result<Signature<Bls12>> {
// Pass the indices of sender nodes to `combine_signatures`. // Pass the indices of sender nodes to `combine_signatures`.
let ids_shares: BTreeMap<&N, &Signature<Bls12>> = self.received_shares.iter().collect(); let ids_shares: BTreeMap<&NodeUid, &Signature<Bls12>> =
let ids_u64: BTreeMap<&N, u64> = ids_shares self.received_shares.iter().collect();
let ids_u64: BTreeMap<&NodeUid, u64> = ids_shares
.keys() .keys()
.map(|&id| (id, *self.netinfo.node_index(id).unwrap() as u64)) .map(|&id| (id, *self.netinfo.node_index(id).unwrap() as u64))
.collect(); .collect();

View File

@ -2,7 +2,6 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash;
use std::rc::Rc; use std::rc::Rc;
use agreement; use agreement;
@ -47,7 +46,7 @@ pub enum Message<NodeUid> {
#[derive(Deref, DerefMut)] #[derive(Deref, DerefMut)]
struct MessageQueue<NodeUid>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>); struct MessageQueue<NodeUid>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>);
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> MessageQueue<NodeUid> { impl<NodeUid: Clone + Debug + Ord> MessageQueue<NodeUid> {
/// Appends to the queue the messages from `agr`, wrapped with `proposer_id`. /// Appends to the queue the messages from `agr`, wrapped with `proposer_id`.
fn extend_agreement(&mut self, proposer_id: &NodeUid, agr: &mut Agreement<NodeUid>) { fn extend_agreement(&mut self, proposer_id: &NodeUid, agr: &mut Agreement<NodeUid>) {
let convert = |msg: TargetedMessage<AgreementMessage, NodeUid>| { let convert = |msg: TargetedMessage<AgreementMessage, NodeUid>| {
@ -88,7 +87,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> MessageQueue<NodeUid> {
/// remaining ones, where we haven't provided input yet. /// remaining ones, where we haven't provided input yet.
/// * Once all `Agreement` instances have decided, `CommonSubset` returns the set of all proposed /// * Once all `Agreement` instances have decided, `CommonSubset` returns the set of all proposed
/// values for which the decision was "yes". /// values for which the decision was "yes".
pub struct CommonSubset<NodeUid: Clone + Debug + Eq + Hash + Ord> { pub struct CommonSubset<NodeUid> {
/// Shared network information. /// Shared network information.
netinfo: Rc<NetworkInfo<NodeUid>>, netinfo: Rc<NetworkInfo<NodeUid>>,
broadcast_instances: BTreeMap<NodeUid, Broadcast<NodeUid>>, broadcast_instances: BTreeMap<NodeUid, Broadcast<NodeUid>>,
@ -103,7 +102,7 @@ pub struct CommonSubset<NodeUid: Clone + Debug + Eq + Hash + Ord> {
decided: bool, decided: bool,
} }
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<NodeUid> { impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for CommonSubset<NodeUid> {
type NodeUid = NodeUid; type NodeUid = NodeUid;
type Input = ProposedValue; type Input = ProposedValue;
type Output = BTreeMap<NodeUid, ProposedValue>; type Output = BTreeMap<NodeUid, ProposedValue>;
@ -147,7 +146,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<No
} }
} }
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> { impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
pub fn new(netinfo: Rc<NetworkInfo<NodeUid>>, session_id: u64) -> CommonSubsetResult<Self> { pub fn new(netinfo: Rc<NetworkInfo<NodeUid>>, session_id: u64) -> CommonSubsetResult<Self> {
// Create all broadcast instances. // Create all broadcast instances.
let mut broadcast_instances: BTreeMap<NodeUid, Broadcast<NodeUid>> = BTreeMap::new(); let mut broadcast_instances: BTreeMap<NodeUid, Broadcast<NodeUid>> = BTreeMap::new();

View File

@ -1,5 +1,5 @@
use std::collections::btree_map::Entry; use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::collections::{BTreeMap, HashSet, VecDeque};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
use std::rc::Rc; use std::rc::Rc;
@ -32,42 +32,46 @@ error_chain!{
} }
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm. /// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
pub struct HoneyBadger<T, N: Clone + Debug + Eq + Hash + Ord + Clone> { pub struct HoneyBadger<Tx, NodeUid> {
/// Shared network data. /// Shared network data.
netinfo: Rc<NetworkInfo<N>>, netinfo: Rc<NetworkInfo<NodeUid>>,
/// The buffer of transactions that have not yet been included in any output batch. /// The buffer of transactions that have not yet been included in any output batch.
buffer: Vec<T>, buffer: Vec<Tx>,
/// The earliest epoch from which we have not yet received output. /// The earliest epoch from which we have not yet received output.
epoch: u64, epoch: u64,
/// The Asynchronous Common Subset instance that decides which nodes' transactions to include, /// The Asynchronous Common Subset instance that decides which nodes' transactions to include,
/// indexed by epoch. /// indexed by epoch.
common_subsets: BTreeMap<u64, CommonSubset<N>>, common_subsets: BTreeMap<u64, CommonSubset<NodeUid>>,
/// The target number of transactions to be included in each batch. /// The target number of transactions to be included in each batch.
// TODO: Do experiments and recommend a batch size. It should be proportional to // TODO: Do experiments and recommend a batch size. It should be proportional to
// `num_nodes * num_nodes * log(num_nodes)`. // `num_nodes * num_nodes * log(num_nodes)`.
batch_size: usize, batch_size: usize,
/// The messages that need to be sent to other nodes. /// The messages that need to be sent to other nodes.
messages: MessageQueue<N>, messages: MessageQueue<NodeUid>,
/// The outputs from completed epochs. /// The outputs from completed epochs.
output: VecDeque<Batch<T>>, output: VecDeque<Batch<Tx, NodeUid>>,
} }
impl<T, N> DistAlgorithm for HoneyBadger<T, N> impl<Tx, NodeUid> DistAlgorithm for HoneyBadger<Tx, NodeUid>
where where
T: Ord + Serialize + DeserializeOwned + Debug, Tx: Eq + Hash + Serialize + DeserializeOwned + Debug,
N: Eq + Hash + Ord + Clone + Debug, NodeUid: Ord + Clone + Debug,
{ {
type NodeUid = N; type NodeUid = NodeUid;
type Input = T; type Input = Tx;
type Output = Batch<T>; type Output = Batch<Tx, NodeUid>;
type Message = Message<N>; type Message = Message<NodeUid>;
type Error = Error; type Error = Error;
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<()> { fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<()> {
self.add_transactions(iter::once(input)) self.add_transactions(iter::once(input))
} }
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> HoneyBadgerResult<()> { fn handle_message(
&mut self,
sender_id: &NodeUid,
message: Self::Message,
) -> HoneyBadgerResult<()> {
if !self.netinfo.all_uids().contains(sender_id) { if !self.netinfo.all_uids().contains(sender_id) {
return Err(ErrorKind::UnknownSender.into()); return Err(ErrorKind::UnknownSender.into());
} }
@ -78,7 +82,7 @@ where
} }
} }
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, N>> { fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
self.messages.pop_front() self.messages.pop_front()
} }
@ -90,25 +94,25 @@ where
false false
} }
fn our_id(&self) -> &N { fn our_id(&self) -> &NodeUid {
self.netinfo.our_uid() self.netinfo.our_uid()
} }
} }
// TODO: Use a threshold encryption scheme to encrypt the proposed transactions. // TODO: Use a threshold encryption scheme to encrypt the proposed transactions.
impl<T, N> HoneyBadger<T, N> impl<Tx, NodeUid> HoneyBadger<Tx, NodeUid>
where where
T: Ord + Serialize + DeserializeOwned + Debug, Tx: Eq + Hash + Serialize + DeserializeOwned + Debug,
N: Eq + Hash + Ord + Clone + Debug, NodeUid: Ord + Clone + Debug,
{ {
/// Returns a new Honey Badger instance with the given parameters, starting at epoch `0`. /// Returns a new Honey Badger instance with the given parameters, starting at epoch `0`.
pub fn new<TI>( pub fn new<TI>(
netinfo: Rc<NetworkInfo<N>>, netinfo: Rc<NetworkInfo<NodeUid>>,
batch_size: usize, batch_size: usize,
txs: TI, txs: TI,
) -> HoneyBadgerResult<Self> ) -> HoneyBadgerResult<Self>
where where
TI: IntoIterator<Item = T>, TI: IntoIterator<Item = Tx>,
{ {
let mut honey_badger = HoneyBadger { let mut honey_badger = HoneyBadger {
netinfo, netinfo,
@ -124,7 +128,10 @@ where
} }
/// Adds transactions into the buffer. /// Adds transactions into the buffer.
pub fn add_transactions<I: IntoIterator<Item = T>>(&mut self, txs: I) -> HoneyBadgerResult<()> { pub fn add_transactions<I: IntoIterator<Item = Tx>>(
&mut self,
txs: I,
) -> HoneyBadgerResult<()> {
self.buffer.extend(txs); self.buffer.extend(txs);
Ok(()) Ok(())
} }
@ -165,9 +172,9 @@ where
/// Handles a message for the common subset sub-algorithm. /// Handles a message for the common subset sub-algorithm.
fn handle_common_subset_message( fn handle_common_subset_message(
&mut self, &mut self,
sender_id: &N, sender_id: &NodeUid,
epoch: u64, epoch: u64,
message: common_subset::Message<N>, message: common_subset::Message<NodeUid>,
) -> HoneyBadgerResult<()> { ) -> HoneyBadgerResult<()> {
{ {
// Borrow the instance for `epoch`, or create it. // Borrow the instance for `epoch`, or create it.
@ -199,26 +206,32 @@ where
let old_epoch = self.epoch; let old_epoch = self.epoch;
while let Some(ser_batches) = self.take_current_output() { while let Some(ser_batches) = self.take_current_output() {
// Deserialize the output. // Deserialize the output.
let transactions: BTreeSet<T> = ser_batches let transactions: BTreeMap<NodeUid, Vec<Tx>> = ser_batches
.into_iter() .into_iter()
.flat_map(|(_, ser_batch)| { .filter_map(|(proposer_id, ser_batch)| {
// If serialization fails, the proposer of that batch is faulty. Ignore it. // If serialization fails, the proposer of that batch is faulty. Ignore it.
bincode::deserialize::<Vec<T>>(&ser_batch).unwrap_or_else(|_| Vec::new()) bincode::deserialize::<Vec<Tx>>(&ser_batch)
.ok()
.map(|proposed| (proposer_id, proposed))
}) })
.collect(); .collect();
// Remove the output transactions from our buffer. let batch = Batch {
self.buffer.retain(|tx| !transactions.contains(tx)); epoch: self.epoch,
transactions,
};
{
let tx_set: HashSet<&Tx> = batch.iter().collect();
// Remove the output transactions from our buffer.
self.buffer.retain(|tx| !tx_set.contains(&tx));
}
debug!( debug!(
"{:?} Epoch {} output {:?}", "{:?} Epoch {} output {:?}",
self.netinfo.our_uid(), self.netinfo.our_uid(),
self.epoch, self.epoch,
transactions batch.transactions,
); );
// Queue the output and advance the epoch. // Queue the output and advance the epoch.
self.output.push_back(Batch { self.output.push_back(batch);
epoch: self.epoch,
transactions,
});
self.epoch += 1; self.epoch += 1;
} }
// If we have moved to a new epoch, propose a new batch of transactions. // If we have moved to a new epoch, propose a new batch of transactions.
@ -229,7 +242,7 @@ where
} }
/// Returns the output of the current epoch's `CommonSubset` instance, if any. /// Returns the output of the current epoch's `CommonSubset` instance, if any.
fn take_current_output(&mut self) -> Option<BTreeMap<N, Vec<u8>>> { fn take_current_output(&mut self) -> Option<BTreeMap<NodeUid, Vec<u8>>> {
self.common_subsets self.common_subsets
.get_mut(&self.epoch) .get_mut(&self.epoch)
.and_then(CommonSubset::next_output) .and_then(CommonSubset::next_output)
@ -256,28 +269,45 @@ where
/// A batch of transactions the algorithm has output. /// A batch of transactions the algorithm has output.
#[derive(Clone)] #[derive(Clone)]
pub struct Batch<T> { pub struct Batch<Tx, NodeUid> {
pub epoch: u64, pub epoch: u64,
pub transactions: BTreeSet<T>, pub transactions: BTreeMap<NodeUid, Vec<Tx>>,
}
impl<Tx, NodeUid: Ord> Batch<Tx, NodeUid> {
/// Returns an iterator over all transactions included in the batch.
pub fn iter(&self) -> impl Iterator<Item = &Tx> {
self.transactions.values().flat_map(|vec| vec)
}
/// Returns the number of transactions in the batch (without detecting duplicates).
pub fn len(&self) -> usize {
self.transactions.values().map(Vec::len).sum()
}
/// Returns `true` if the batch contains no transactions.
pub fn is_empty(&self) -> bool {
self.transactions.values().all(Vec::is_empty)
}
} }
/// A message sent to or received from another node's Honey Badger instance. /// A message sent to or received from another node's Honey Badger instance.
#[cfg_attr(feature = "serialization-serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "serialization-serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Message<N> { pub enum Message<NodeUid> {
/// A message belonging to the common subset algorithm in the given epoch. /// A message belonging to the common subset algorithm in the given epoch.
CommonSubset(u64, common_subset::Message<N>), CommonSubset(u64, common_subset::Message<NodeUid>),
// TODO: Decryption share. // TODO: Decryption share.
} }
/// The queue of outgoing messages in a `HoneyBadger` instance. /// The queue of outgoing messages in a `HoneyBadger` instance.
#[derive(Deref, DerefMut)] #[derive(Deref, DerefMut)]
struct MessageQueue<N>(VecDeque<TargetedMessage<Message<N>, N>>); struct MessageQueue<NodeUid>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>);
impl<N: Clone + Debug + Eq + Hash + Ord> MessageQueue<N> { impl<NodeUid: Clone + Debug + Ord> MessageQueue<NodeUid> {
/// Appends to the queue the messages from `cs`, wrapped with `epoch`. /// Appends to the queue the messages from `cs`, wrapped with `epoch`.
fn extend_with_epoch(&mut self, epoch: u64, cs: &mut CommonSubset<N>) { fn extend_with_epoch(&mut self, epoch: u64, cs: &mut CommonSubset<NodeUid>) {
let convert = |msg: TargetedMessage<common_subset::Message<N>, N>| { let convert = |msg: TargetedMessage<common_subset::Message<NodeUid>, NodeUid>| {
msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg)) msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg))
}; };
self.extend(cs.message_iter().map(convert)); self.extend(cs.message_iter().map(convert));

View File

@ -1,6 +1,5 @@
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash;
use pairing::bls12_381::Bls12; use pairing::bls12_381::Bls12;
@ -132,17 +131,17 @@ impl<'a, D: DistAlgorithm + 'a> Iterator for OutputIter<'a, D> {
/// Common data shared between algorithms. /// Common data shared between algorithms.
#[derive(Debug)] #[derive(Debug)]
pub struct NetworkInfo<NodeUid: Clone + Eq + Hash> { pub struct NetworkInfo<NodeUid> {
our_uid: NodeUid, our_uid: NodeUid,
all_uids: BTreeSet<NodeUid>, all_uids: BTreeSet<NodeUid>,
num_nodes: usize, num_nodes: usize,
num_faulty: usize, num_faulty: usize,
secret_key: SecretKey<Bls12>, secret_key: SecretKey<Bls12>,
public_key_set: PublicKeySet<Bls12>, public_key_set: PublicKeySet<Bls12>,
node_indices: HashMap<NodeUid, usize>, node_indices: BTreeMap<NodeUid, usize>,
} }
impl<NodeUid: Clone + Hash + Ord> NetworkInfo<NodeUid> { impl<NodeUid: Clone + Ord> NetworkInfo<NodeUid> {
pub fn new( pub fn new(
our_uid: NodeUid, our_uid: NodeUid,
all_uids: BTreeSet<NodeUid>, all_uids: BTreeSet<NodeUid>,

View File

@ -33,7 +33,7 @@ where
let node_busy = |node: &mut TestNode<HoneyBadger<usize, NodeUid>>| { let node_busy = |node: &mut TestNode<HoneyBadger<usize, NodeUid>>| {
let mut min_missing = 0; let mut min_missing = 0;
for batch in node.outputs() { for batch in node.outputs() {
for tx in &batch.transactions { for tx in batch.iter() {
if *tx >= min_missing { if *tx >= min_missing {
min_missing = tx + 1; min_missing = tx + 1;
} }
@ -42,7 +42,7 @@ where
if min_missing < num_txs { if min_missing < num_txs {
return true; return true;
} }
if node.outputs().last().unwrap().transactions.is_empty() { if node.outputs().last().unwrap().is_empty() {
let last = node.outputs().last().unwrap().epoch; let last = node.outputs().last().unwrap().epoch;
node.queue.retain(|(_, ref msg)| match msg { node.queue.retain(|(_, ref msg)| match msg {
honey_badger::Message::CommonSubset(e, _) => *e < last, honey_badger::Message::CommonSubset(e, _) => *e < last,