Merge pull request #127 from poanetwork/afck-dhb-votes

Minor fixes and simplifications.
This commit is contained in:
Andreas Fackler 2018-07-15 11:24:00 +02:00 committed by GitHub
commit 01ad256363
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 193 deletions

View File

@ -139,7 +139,7 @@ where
sender_id: &NodeUid, sender_id: &NodeUid,
message: Self::Message, message: Self::Message,
) -> Result<FaultLog<NodeUid>> { ) -> Result<FaultLog<NodeUid>> {
let epoch = message.epoch(); let epoch = message.start_epoch();
if epoch < self.start_epoch { if epoch < self.start_epoch {
return Ok(FaultLog::new()); // Obsolete message. return Ok(FaultLog::new()); // Obsolete message.
} }
@ -301,8 +301,7 @@ where
if start_epoch < self.start_epoch { if start_epoch < self.start_epoch {
let queue = mem::replace(&mut self.incoming_queue, Vec::new()); let queue = mem::replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queue { for (sender_id, msg) in queue {
self.handle_message(&sender_id, msg)? fault_log.extend(self.handle_message(&sender_id, msg)?);
.merge_into(&mut fault_log);
} }
} }
Ok(fault_log) Ok(fault_log)
@ -364,6 +363,7 @@ where
self.messages self.messages
.extend_with_epoch(self.start_epoch, &mut self.honey_badger); .extend_with_epoch(self.start_epoch, &mut self.honey_badger);
self.start_epoch = epoch; self.start_epoch = epoch;
self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch);
let netinfo = Arc::new(self.netinfo.clone()); let netinfo = Arc::new(self.netinfo.clone());
let counter = VoteCounter::new(netinfo.clone(), epoch); let counter = VoteCounter::new(netinfo.clone(), epoch);
mem::replace(&mut self.vote_counter, counter); mem::replace(&mut self.vote_counter, counter);
@ -495,13 +495,21 @@ pub enum Message<NodeUid: Rand> {
} }
impl<NodeUid: Rand> Message<NodeUid> { impl<NodeUid: Rand> Message<NodeUid> {
pub fn epoch(&self) -> u64 { fn start_epoch(&self) -> u64 {
match *self { match *self {
Message::HoneyBadger(epoch, _) => epoch, Message::HoneyBadger(epoch, _) => epoch,
Message::KeyGen(epoch, _, _) => epoch, Message::KeyGen(epoch, _, _) => epoch,
Message::SignedVote(ref signed_vote) => signed_vote.era(), Message::SignedVote(ref signed_vote) => signed_vote.era(),
} }
} }
pub fn epoch(&self) -> u64 {
match *self {
Message::HoneyBadger(start_epoch, ref msg) => start_epoch + msg.epoch(),
Message::KeyGen(epoch, _, _) => epoch,
Message::SignedVote(ref signed_vote) => signed_vote.era(),
}
}
} }
/// The queue of outgoing messages in a `HoneyBadger` instance. /// The queue of outgoing messages in a `HoneyBadger` instance.

View File

@ -1,4 +1,4 @@
use std::collections::{btree_map, BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
use std::sync::Arc; use std::sync::Arc;
@ -65,8 +65,13 @@ where
sender_id: &NodeUid, sender_id: &NodeUid,
signed_vote: SignedVote<NodeUid>, signed_vote: SignedVote<NodeUid>,
) -> Result<FaultLog<NodeUid>> { ) -> Result<FaultLog<NodeUid>> {
if signed_vote.vote.era != self.era { if signed_vote.vote.era != self.era
return Ok(FaultLog::new()); // The vote is obsolete. || self
.pending
.get(&signed_vote.voter)
.map_or(false, |sv| sv.vote.num >= signed_vote.vote.num)
{
return Ok(FaultLog::new()); // The vote is obsolete or already exists.
} }
if !self.validate(&signed_vote)? { if !self.validate(&signed_vote)? {
return Ok(FaultLog::init( return Ok(FaultLog::init(
@ -74,16 +79,7 @@ where
FaultKind::InvalidVoteSignature, FaultKind::InvalidVoteSignature,
)); ));
} }
match self.pending.entry(signed_vote.voter.clone()) { self.pending.insert(signed_vote.voter.clone(), signed_vote);
btree_map::Entry::Vacant(entry) => {
entry.insert(signed_vote);
}
btree_map::Entry::Occupied(mut entry) => {
if entry.get().vote.num < signed_vote.vote.num {
entry.insert(signed_vote);
}
}
}
Ok(FaultLog::new()) Ok(FaultLog::new())
} }
@ -119,22 +115,20 @@ where
proposer_id: &NodeUid, proposer_id: &NodeUid,
signed_vote: SignedVote<NodeUid>, signed_vote: SignedVote<NodeUid>,
) -> Result<FaultLog<NodeUid>> { ) -> Result<FaultLog<NodeUid>> {
if !self.validate(&signed_vote)? || signed_vote.vote.era != self.era { if self
.committed
.get(&signed_vote.voter)
.map_or(false, |vote| vote.num >= signed_vote.vote.num)
{
return Ok(FaultLog::new()); // The vote is obsolete or already exists.
}
if signed_vote.vote.era != self.era || !self.validate(&signed_vote)? {
return Ok(FaultLog::init( return Ok(FaultLog::init(
proposer_id.clone(), proposer_id.clone(),
FaultKind::InvalidCommittedVote, FaultKind::InvalidCommittedVote,
)); ));
} }
match self.committed.entry(signed_vote.voter.clone()) { self.committed.insert(signed_vote.voter, signed_vote.vote);
btree_map::Entry::Vacant(entry) => {
entry.insert(signed_vote.vote);
}
btree_map::Entry::Occupied(mut entry) => {
if entry.get().num < signed_vote.vote.num {
entry.insert(signed_vote.vote);
}
}
}
Ok(FaultLog::new()) Ok(FaultLog::new())
} }

View File

@ -4,7 +4,6 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::Not;
use std::sync::Arc; use std::sync::Arc;
use bincode; use bincode;
@ -183,6 +182,8 @@ where
if !self.netinfo.is_validator() { if !self.netinfo.is_validator() {
return Ok(FaultLog::new()); return Ok(FaultLog::new());
} }
let mut fault_log = FaultLog::new();
{
let cs = match self.common_subsets.entry(self.epoch) { let cs = match self.common_subsets.entry(self.epoch) {
Entry::Occupied(entry) => entry.into_mut(), Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
@ -191,15 +192,17 @@ where
}; };
let ser_prop = bincode::serialize(&proposal)?; let ser_prop = bincode::serialize(&proposal)?;
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop); let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop);
let fault_log = cs.input(bincode::serialize(&ciphertext).unwrap())?;
self.has_input = true; self.has_input = true;
fault_log.extend(cs.input(bincode::serialize(&ciphertext).unwrap())?);
self.messages.extend_with_epoch(self.epoch, cs); self.messages.extend_with_epoch(self.epoch, cs);
}
fault_log.extend(self.process_output()?);
Ok(fault_log) Ok(fault_log)
} }
/// Returns `true` if input for the current epoch has already been provided. /// Returns `true` if input for the current epoch has already been provided.
pub fn has_input(&self) -> bool { pub fn has_input(&self) -> bool {
self.has_input !self.netinfo.is_validator() || self.has_input
} }
/// Handles a message for the given epoch. /// Handles a message for the given epoch.
@ -265,7 +268,7 @@ where
if let Some(ciphertext) = self if let Some(ciphertext) = self
.ciphertexts .ciphertexts
.get(&self.epoch) .get(&epoch)
.and_then(|cts| cts.get(&proposer_id)) .and_then(|cts| cts.get(&proposer_id))
{ {
if !self.verify_decryption_share(sender_id, &share, ciphertext) { if !self.verify_decryption_share(sender_id, &share, ciphertext) {
@ -275,19 +278,17 @@ where
} }
} }
{
// Insert the share. // Insert the share.
let proposer_shares = self self.received_shares
.received_shares
.entry(epoch) .entry(epoch)
.or_insert_with(BTreeMap::new) .or_insert_with(BTreeMap::new)
.entry(proposer_id.clone()) .entry(proposer_id.clone())
.or_insert_with(BTreeMap::new); .or_insert_with(BTreeMap::new)
proposer_shares.insert(sender_id.clone(), share); .insert(sender_id.clone(), share);
}
if epoch == self.epoch && self.try_decrypt_proposer_contribution(proposer_id) { if epoch == self.epoch {
self.try_output_batch()?.merge_into(&mut fault_log); self.try_decrypt_proposer_contribution(proposer_id);
fault_log.extend(self.try_decrypt_and_output_batch()?);
} }
Ok(fault_log) Ok(fault_log)
@ -309,8 +310,8 @@ where
} }
} }
/// When contributions of transactions have been decrypted for all valid proposers in this epoch, /// When contributions of transactions have been decrypted for all valid proposers in this
/// moves those transactions into a batch, outputs the batch and updates the epoch. /// epoch, moves those contributions into a batch, outputs the batch and updates the epoch.
fn try_output_batch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> { fn try_output_batch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
// Wait until contributions have been successfully decoded for all proposer nodes with correct // Wait until contributions have been successfully decoded for all proposer nodes with correct
// ciphertext outputs. // ciphertext outputs.
@ -342,7 +343,7 @@ where
"{:?} Epoch {} output {:?}", "{:?} Epoch {} output {:?}",
self.netinfo.our_uid(), self.netinfo.our_uid(),
self.epoch, self.epoch,
batch.contributions batch.contributions.keys().collect::<Vec<_>>()
); );
// Queue the output and advance the epoch. // Queue the output and advance the epoch.
self.output.push_back(batch); self.output.push_back(batch);
@ -373,46 +374,48 @@ where
Ok(fault_log) Ok(fault_log)
} }
/// Tries to decrypt transaction contributions from all proposers and output those transactions in /// Tries to decrypt contributions from all proposers and output those in a batch.
/// a batch.
fn try_decrypt_and_output_batch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> { fn try_decrypt_and_output_batch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
if let Some(proposer_ids) = self // Return if we don't have ciphertexts yet.
.received_shares let proposer_ids: Vec<_> = match self.ciphertexts.get(&self.epoch) {
.get(&self.epoch) Some(cts) => cts.keys().cloned().collect(),
.map(|shares| shares.keys().cloned().collect::<BTreeSet<NodeUid>>()) None => {
{ return Ok(FaultLog::new());
// Try to output a batch if there is a non-empty set of proposers for which we have
// already received decryption shares.
if !proposer_ids.is_empty()
&& proposer_ids
.iter()
.all(|proposer_id| self.try_decrypt_proposer_contribution(proposer_id.clone()))
{
return self.try_output_batch();
} }
};
// Try to output a batch if all contributions have been decrypted.
for proposer_id in proposer_ids {
self.try_decrypt_proposer_contribution(proposer_id);
} }
Ok(FaultLog::new()) self.try_output_batch()
} }
/// Returns true if and only if contributions have been decrypted for all selected proposers in /// Returns true if and only if contributions have been decrypted for all selected proposers in
/// this epoch. /// this epoch.
fn all_contributions_decrypted(&mut self) -> bool { fn all_contributions_decrypted(&mut self) -> bool {
let ciphertexts = self match self.ciphertexts.get(&self.epoch) {
.ciphertexts None => false, // No ciphertexts yet.
.entry(self.epoch) Some(ciphertexts) => ciphertexts.keys().eq(self.decrypted_contributions.keys()),
.or_insert_with(BTreeMap::new); }
let all_ciphertext_proposers: BTreeSet<_> = ciphertexts.keys().collect();
let all_decrypted_contribution_proposers: BTreeSet<_> =
self.decrypted_contributions.keys().collect();
all_ciphertext_proposers == all_decrypted_contribution_proposers
} }
/// Tries to decrypt the contribution from a given proposer. Outputs `true` if and only if /// Tries to decrypt the contribution from a given proposer.
/// decryption finished without errors. fn try_decrypt_proposer_contribution(&mut self, proposer_id: NodeUid) {
fn try_decrypt_proposer_contribution(&mut self, proposer_id: NodeUid) -> bool { if self.decrypted_contributions.contains_key(&proposer_id) {
let shares = &self.received_shares[&self.epoch][&proposer_id]; return; // Already decrypted.
}
let shares = if let Some(shares) = self
.received_shares
.get(&self.epoch)
.and_then(|sh| sh.get(&proposer_id))
{
shares
} else {
return;
};
if shares.len() <= self.netinfo.num_faulty() { if shares.len() <= self.netinfo.num_faulty() {
return false; return;
} }
if let Some(ciphertext) = self if let Some(ciphertext) = self
@ -428,17 +431,17 @@ where
.into_iter() .into_iter()
.map(|(id, share)| (&ids_u64[id], share)) .map(|(id, share)| (&ids_u64[id], share))
.collect(); .collect();
if let Ok(decrypted_contribution) = self match self
.netinfo .netinfo
.public_key_set() .public_key_set()
.decrypt(indexed_shares, ciphertext) .decrypt(indexed_shares, ciphertext)
{ {
self.decrypted_contributions Ok(contrib) => {
.insert(proposer_id, decrypted_contribution); self.decrypted_contributions.insert(proposer_id, contrib);
return true; }
Err(err) => error!("{:?} Decryption failed: {:?}.", self.our_id(), err),
} }
} }
false
} }
fn send_decryption_shares( fn send_decryption_shares(
@ -446,6 +449,7 @@ where
cs_output: BTreeMap<NodeUid, Vec<u8>>, cs_output: BTreeMap<NodeUid, Vec<u8>>,
) -> HoneyBadgerResult<FaultLog<NodeUid>> { ) -> HoneyBadgerResult<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new(); let mut fault_log = FaultLog::new();
let mut ciphertexts = BTreeMap::new();
for (proposer_id, v) in cs_output { for (proposer_id, v) in cs_output {
let mut ciphertext: Ciphertext; let mut ciphertext: Ciphertext;
if let Ok(ct) = bincode::deserialize(&v) { if let Ok(ct) = bincode::deserialize(&v) {
@ -460,19 +464,19 @@ where
self.verify_pending_decryption_shares(&proposer_id, &ciphertext); self.verify_pending_decryption_shares(&proposer_id, &ciphertext);
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders); self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders);
fault_log.extend(faults); fault_log.extend(faults);
let (valid, dec_fl) = self.send_decryption_share(&proposer_id, &ciphertext)?;
if !self.send_decryption_share(&proposer_id, &ciphertext)? { fault_log.extend(dec_fl);
if valid {
ciphertexts.insert(proposer_id.clone(), ciphertext);
self.try_decrypt_proposer_contribution(proposer_id);
} else {
warn!("Share decryption failed for proposer {:?}", proposer_id); warn!("Share decryption failed for proposer {:?}", proposer_id);
let fault_kind = FaultKind::ShareDecryptionFailed; let fault_kind = FaultKind::ShareDecryptionFailed;
fault_log.append(proposer_id.clone(), fault_kind); fault_log.append(proposer_id.clone(), fault_kind);
continue;
} }
let ciphertexts = self
.ciphertexts
.entry(self.epoch)
.or_insert_with(BTreeMap::new);
ciphertexts.insert(proposer_id, ciphertext);
} }
self.ciphertexts.insert(self.epoch, ciphertexts);
fault_log.extend(self.try_decrypt_and_output_batch()?);
Ok(fault_log) Ok(fault_log)
} }
@ -481,12 +485,12 @@ where
&mut self, &mut self,
proposer_id: &NodeUid, proposer_id: &NodeUid,
ciphertext: &Ciphertext, ciphertext: &Ciphertext,
) -> HoneyBadgerResult<BoolWithFaultLog<NodeUid>> { ) -> HoneyBadgerResult<(bool, FaultLog<NodeUid>)> {
if !self.netinfo.is_validator() { if !self.netinfo.is_validator() {
return Ok(ciphertext.verify().into()); return Ok((ciphertext.verify(), FaultLog::new()));
} }
let share = match self.netinfo.secret_key().decrypt_share(&ciphertext) { let share = match self.netinfo.secret_key().decrypt_share(&ciphertext) {
None => return Ok(BoolWithFaultLog::False), None => return Ok((false, FaultLog::new())),
Some(share) => share, Some(share) => share,
}; };
// Send the share to remote nodes. // Send the share to remote nodes.
@ -496,11 +500,12 @@ where
}; };
let message = Target::All.message(content.with_epoch(self.epoch)); let message = Target::All.message(content.with_epoch(self.epoch));
self.messages.0.push_back(message); self.messages.0.push_back(message);
let our_id = self.netinfo.our_uid().clone();
let epoch = self.epoch; let epoch = self.epoch;
let our_id = self.netinfo.our_uid().clone();
// Receive the share locally. // Receive the share locally.
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share) let fault_log =
.map(|fault_log| fault_log.into()) self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)?;
Ok((true, fault_log))
} }
/// Verifies the shares of the current epoch that are pending verification. Returned are the /// Verifies the shares of the current epoch that are pending verification. Returned are the
@ -582,7 +587,7 @@ where
} }
/// A batch of contributions the algorithm has output. /// A batch of contributions the algorithm has output.
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct Batch<C, NodeUid> { pub struct Batch<C, NodeUid> {
pub epoch: u64, pub epoch: u64,
pub contributions: BTreeMap<NodeUid, C>, pub contributions: BTreeMap<NodeUid, C>,
@ -676,37 +681,3 @@ impl<NodeUid: Clone + Debug + Ord + Rand> MessageQueue<NodeUid> {
self.extend(cs.message_iter().map(convert)); self.extend(cs.message_iter().map(convert));
} }
} }
// The return type for `HoneyBadger` methods that return a boolean and a
// fault log.
enum BoolWithFaultLog<NodeUid: Clone> {
True(FaultLog<NodeUid>),
False,
}
impl<NodeUid: Clone> Into<BoolWithFaultLog<NodeUid>> for bool {
fn into(self) -> BoolWithFaultLog<NodeUid> {
if self {
BoolWithFaultLog::True(FaultLog::new())
} else {
BoolWithFaultLog::False
}
}
}
impl<NodeUid: Clone> Into<BoolWithFaultLog<NodeUid>> for FaultLog<NodeUid> {
fn into(self) -> BoolWithFaultLog<NodeUid> {
BoolWithFaultLog::True(self)
}
}
impl<NodeUid: Clone> Not for BoolWithFaultLog<NodeUid> {
type Output = bool;
fn not(self) -> Self::Output {
match self {
BoolWithFaultLog::False => true,
_ => false,
}
}
}

View File

@ -2,12 +2,13 @@
//! //!
//! This works exactly like Dynamic Honey Badger, but it has a transaction queue built in. //! This works exactly like Dynamic Honey Badger, but it has a transaction queue built in.
use rand::Rand; use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
use std::marker::PhantomData; use std::marker::PhantomData;
use rand::Rand;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message}; use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message};
@ -155,9 +156,7 @@ where
self.queue.remove_all(batch.iter()); self.queue.remove_all(batch.iter());
self.output.push_back(batch); self.output.push_back(batch);
} }
if !self.dyn_hb.has_input() {
self.propose()?.merge_into(&mut fault_log); self.propose()?.merge_into(&mut fault_log);
}
Ok(fault_log) Ok(fault_log)
} }
@ -196,9 +195,18 @@ where
/// Initiates the next epoch by proposing a batch from the queue. /// Initiates the next epoch by proposing a batch from the queue.
fn propose(&mut self) -> Result<FaultLog<NodeUid>> { fn propose(&mut self) -> Result<FaultLog<NodeUid>> {
let amount = self.batch_size / self.dyn_hb.netinfo().num_nodes(); let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes());
// TODO: This will loop forever if we are the only validator.
let mut fault_log = FaultLog::new();
while !self.dyn_hb.has_input() {
let proposal = self.queue.choose(amount, self.batch_size); let proposal = self.queue.choose(amount, self.batch_size);
Ok(self.dyn_hb.input(Input::User(proposal))?) fault_log.extend(self.dyn_hb.input(Input::User(proposal))?);
while let Some(batch) = self.dyn_hb.next_output() {
self.queue.remove_all(batch.iter());
self.output.push_back(batch);
}
}
Ok(fault_log)
} }
} }

View File

@ -15,7 +15,6 @@ mod network;
use std::cmp; use std::cmp;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::iter::once;
use std::sync::Arc; use std::sync::Arc;
use rand::Rng; use rand::Rng;
@ -61,13 +60,11 @@ where
return true; return true;
} }
let mut min_missing = 0; let mut min_missing = 0;
for batch in node.outputs() { for tx in node.outputs().iter().flat_map(Batch::iter) {
for tx in batch.iter() {
if *tx >= min_missing { if *tx >= min_missing {
min_missing = tx + 1; min_missing = tx + 1;
} }
} }
}
if min_missing < num_txs { if min_missing < num_txs {
return true; return true;
} }
@ -78,17 +75,28 @@ where
false false
}; };
let mut rng = rand::thread_rng();
let mut input_add = false; let mut input_add = false;
// Handle messages in random order until all nodes have output all transactions. // Handle messages in random order until all nodes have output all transactions.
while network.nodes.values_mut().any(node_busy) { while network.nodes.values_mut().any(node_busy) {
let id = network.step(); // If a node is expecting input, take it from the queue. Otherwise handle a message.
if !network.nodes[&id].instance().has_input() { let input_ids: Vec<_> = network
queues .nodes
.get_mut(&id) .iter()
.unwrap() .filter(|(_, node)| {
.remove_all(network.nodes[&id].outputs().iter().flat_map(Batch::iter)); !node.instance().has_input() && node.instance().netinfo().is_validator()
network.input(id, Input::User(queues[&id].choose(3, 10))); })
.map(|(id, _)| *id)
.collect();
if let Some(id) = rng.choose(&input_ids) {
let queue = queues.get_mut(id).unwrap();
queue.remove_all(network.nodes[id].outputs().iter().flat_map(Batch::iter));
network.input(*id, Input::User(queue.choose(3, 10)));
} else {
network.step();
} }
// Once all nodes have processed the removal of node 0, add it again.
if !input_add && network.nodes.values().all(has_remove) { if !input_add && network.nodes.values().all(has_remove) {
let pk = network.pk_set.public_key_share(0); let pk = network.pk_set.public_key_share(0);
network.input_all(Input::Change(Change::Add(NodeUid(0), pk))); network.input_all(Input::Change(Change::Add(NodeUid(0), pk)));
@ -129,7 +137,8 @@ where
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let sizes = (3..5).chain(once(rng.gen_range(6, 10))); // TODO: This should also work with two nodes.
let sizes = vec![3, 5, rng.gen_range(6, 10)];
for size in sizes { for size in sizes {
// The test is removing one correct node, so we allow fewer faulty ones. // The test is removing one correct node, so we allow fewer faulty ones.
let num_adv_nodes = (size - 2) / 3; let num_adv_nodes = (size - 2) / 3;

View File

@ -15,7 +15,6 @@ extern crate serde_derive;
mod network; mod network;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::iter::once;
use std::sync::Arc; use std::sync::Arc;
use rand::Rng; use rand::Rng;
@ -127,21 +126,16 @@ where
{ {
let new_queue = |id: &NodeUid| (*id, TransactionQueue((0..num_txs).collect())); let new_queue = |id: &NodeUid| (*id, TransactionQueue((0..num_txs).collect()));
let mut queues: BTreeMap<_, _> = network.nodes.keys().map(new_queue).collect(); let mut queues: BTreeMap<_, _> = network.nodes.keys().map(new_queue).collect();
for (id, queue) in &queues {
network.input(*id, queue.choose(3, 10));
}
// Returns `true` if the node has not output all transactions yet. // Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs. // If it has, and has advanced another epoch, it clears all messages for later epochs.
let node_busy = |node: &mut TestNode<UsizeHoneyBadger>| { let node_busy = |node: &mut TestNode<UsizeHoneyBadger>| {
let mut min_missing = 0; let mut min_missing = 0;
for batch in node.outputs() { for tx in node.outputs().iter().flat_map(Batch::iter) {
for tx in batch.iter() {
if *tx >= min_missing { if *tx >= min_missing {
min_missing = tx + 1; min_missing = tx + 1;
} }
} }
}
if min_missing < num_txs { if min_missing < num_txs {
return true; return true;
} }
@ -152,15 +146,23 @@ where
false false
}; };
let mut rng = rand::thread_rng();
// Handle messages in random order until all nodes have output all transactions. // Handle messages in random order until all nodes have output all transactions.
while network.nodes.values_mut().any(node_busy) { while network.nodes.values_mut().any(node_busy) {
let id = network.step(); // If a node is expecting input, take it from the queue. Otherwise handle a message.
if !network.nodes[&id].instance().has_input() { let input_ids: Vec<_> = network
queues .nodes
.get_mut(&id) .iter()
.unwrap() .filter(|(_, node)| !node.instance().has_input())
.remove_all(network.nodes[&id].outputs().iter().flat_map(Batch::iter)); .map(|(id, _)| *id)
network.input(id, queues[&id].choose(3, 10)); .collect();
if let Some(id) = rng.choose(&input_ids) {
let queue = queues.get_mut(id).unwrap();
queue.remove_all(network.nodes[id].outputs().iter().flat_map(Batch::iter));
network.input(*id, queue.choose(3, 10));
} else {
network.step();
} }
} }
verify_output_sequence(&network); verify_output_sequence(&network);
@ -205,9 +207,7 @@ where
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let sizes = (2..5) let sizes = vec![1, 2, 3, 5, rng.gen_range(6, 10)];
.chain(once(rng.gen_range(6, 10)))
.chain(once(rng.gen_range(11, 15)));
for size in sizes { for size in sizes {
let num_adv_nodes = (size - 1) / 3; let num_adv_nodes = (size - 1) / 3;
let num_good_nodes = size - num_adv_nodes; let num_good_nodes = size - num_adv_nodes;
@ -224,13 +224,13 @@ where
#[test] #[test]
fn test_honey_badger_random_delivery_silent() { fn test_honey_badger_random_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::Random); let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::Random);
test_honey_badger_different_sizes(new_adversary, 10); test_honey_badger_different_sizes(new_adversary, 30);
} }
#[test] #[test]
fn test_honey_badger_first_delivery_silent() { fn test_honey_badger_first_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::First); let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::First);
test_honey_badger_different_sizes(new_adversary, 10); test_honey_badger_different_sizes(new_adversary, 30);
} }
#[test] #[test]

View File

@ -15,11 +15,10 @@ mod network;
use std::cmp; use std::cmp;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::iter::once;
use std::sync::Arc; use std::sync::Arc;
use hbbft::messaging::NetworkInfo; use hbbft::messaging::NetworkInfo;
use hbbft::queueing_honey_badger::{Change, ChangeState, Input, QueueingHoneyBadger}; use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger};
use rand::Rng; use rand::Rng;
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode}; use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
@ -57,13 +56,11 @@ fn test_queueing_honey_badger<A>(
return true; return true;
} }
let mut min_missing = 0; let mut min_missing = 0;
for batch in node.outputs() { for tx in node.outputs().iter().flat_map(Batch::iter) {
for tx in batch.iter() {
if *tx >= min_missing { if *tx >= min_missing {
min_missing = tx + 1; min_missing = tx + 1;
} }
} }
}
if min_missing < num_txs { if min_missing < num_txs {
return true; return true;
} }
@ -110,7 +107,7 @@ where
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn new_queueing_hb(netinfo: Arc<NetworkInfo<NodeUid>>) -> QueueingHoneyBadger<usize, NodeUid> { fn new_queueing_hb(netinfo: Arc<NetworkInfo<NodeUid>>) -> QueueingHoneyBadger<usize, NodeUid> {
QueueingHoneyBadger::builder((*netinfo).clone()) QueueingHoneyBadger::builder((*netinfo).clone())
.batch_size(12) .batch_size(3)
.build() .build()
} }
@ -123,7 +120,7 @@ where
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let sizes = (3..5).chain(once(rng.gen_range(6, 10))); let sizes = vec![3, 5, rng.gen_range(6, 10)];
for size in sizes { for size in sizes {
// The test is removing one correct node, so we allow fewer faulty ones. // The test is removing one correct node, so we allow fewer faulty ones.
let num_adv_nodes = (size - 2) / 3; let num_adv_nodes = (size - 2) / 3;
@ -141,11 +138,11 @@ where
#[test] #[test]
fn test_queueing_honey_badger_random_delivery_silent() { fn test_queueing_honey_badger_random_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::Random); let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::Random);
test_queueing_honey_badger_different_sizes(new_adversary, 10); test_queueing_honey_badger_different_sizes(new_adversary, 30);
} }
#[test] #[test]
fn test_queueing_honey_badger_first_delivery_silent() { fn test_queueing_honey_badger_first_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::First); let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::First);
test_queueing_honey_badger_different_sizes(new_adversary, 10); test_queueing_honey_badger_different_sizes(new_adversary, 30);
} }