Allow arbitrary validator set changes in DHB. (#339)

* Allow arbitrary validator set changes in DHB.

This replaces `NodeChange` with a full list of IDs and public keys,
instead of just a single to-be-added or to-be-removed node, to allow
completely replacing the set of validators by any arbitrary new set in a
single key generation step.

* Address review comments: added_nodes, comments.

* Fix MessageScheduler::First.

Make sure every node eventually gets to handle its messages.
This commit is contained in:
Andreas Fackler 2018-11-18 10:17:33 +01:00 committed by GitHub
parent 767944c0f6
commit e89688bbd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 189 additions and 245 deletions

View File

@ -9,7 +9,7 @@ use {NetworkInfo, NodeIdT};
/// A batch of transactions the algorithm has output.
#[derive(Clone, Debug)]
pub struct Batch<C, N> {
pub struct Batch<C, N: Ord> {
/// The sequence number: there is exactly one batch in each epoch.
pub(super) epoch: u64,
/// The current `DynamicHoneyBadger` era.

View File

@ -1,49 +1,25 @@
use std::collections::BTreeMap;
use crypto::PublicKey;
use serde_derive::{Deserialize, Serialize};
use super::EncryptionSchedule;
#[derive(Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash, Debug)]
pub enum NodeChange<N> {
/// Add a node. The public key is used only temporarily, for key generation.
Add(N, PublicKey),
/// Remove a node.
Remove(N),
}
impl<N> NodeChange<N> {
/// Returns the ID of the current candidate for being added, if any.
pub fn candidate(&self) -> Option<&N> {
match *self {
NodeChange::Add(ref id, _) => Some(id),
NodeChange::Remove(_) => None,
}
}
}
/// A node change action: adding or removing a node.
#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Hash, Debug)]
pub enum Change<N> {
// Add or Remove a node from the set of validators
NodeChange(NodeChange<N>),
pub enum Change<N: Ord> {
/// Change the set of validators to the one in the provided map. There are no restrictions on
/// the new set of validators. In particular, it can be disjoint with the current set of
/// validators.
NodeChange(BTreeMap<N, PublicKey>),
/// Change the threshold encryption schedule.
/// Increase frequency to prevent censorship or decrease frequency for increased throughput.
EncryptionSchedule(EncryptionSchedule),
}
impl<N> Change<N> {
/// Returns the ID of the current candidate for being added, if any.
pub fn candidate(&self) -> Option<&N> {
match self {
Change::NodeChange(node_change) => node_change.candidate(),
_ => None,
}
}
}
/// A change status: whether a change to the network is currently in progress or completed.
#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Hash, Debug)]
pub enum ChangeState<N> {
pub enum ChangeState<N: Ord> {
/// No change is currently being considered.
None,
/// A change is currently in progress. If it is a node addition, all broadcast messages must be

View File

@ -5,15 +5,14 @@ use std::{fmt, result};
use bincode;
use crypto::{PublicKey, Signature};
use derivative::Derivative;
use log::{debug, warn};
use log::debug;
use rand::{self, Rand};
use serde::{de::DeserializeOwned, Serialize};
use super::votes::{SignedVote, VoteCounter};
use super::{
Batch, Change, ChangeState, DynamicHoneyBadgerBuilder, EncryptionSchedule, Error, ErrorKind,
Input, InternalContrib, KeyGenMessage, KeyGenState, Message, NodeChange, Result,
SignedKeyGenMsg, Step,
Input, InternalContrib, KeyGenMessage, KeyGenState, Message, Result, SignedKeyGenMsg, Step,
};
use fault_log::{Fault, FaultKind, FaultLog};
use honey_badger::{self, HoneyBadger, Message as HbMessage};
@ -25,7 +24,7 @@ use {Contribution, DistAlgorithm, Epoched, NetworkInfo, NodeIdT, Target};
/// A Honey Badger instance that can handle adding and removing nodes.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct DynamicHoneyBadger<C, N: Rand> {
pub struct DynamicHoneyBadger<C, N: Rand + Ord> {
/// Shared network data.
pub(super) netinfo: NetworkInfo<N>,
/// The maximum number of future epochs for which we handle messages simultaneously.
@ -135,15 +134,19 @@ where
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_add(&mut self, node_id: N, pub_key: PublicKey) -> Result<Step<C, N>> {
self.vote_for(Change::NodeChange(NodeChange::Add(node_id, pub_key)))
let mut pub_keys = self.netinfo.public_key_map().clone();
pub_keys.insert(node_id, pub_key);
self.vote_for(Change::NodeChange(pub_keys))
}
/// Casts a vote to demote a validator to observer.
///
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_remove(&mut self, node_id: N) -> Result<Step<C, N>> {
self.vote_for(Change::NodeChange(NodeChange::Remove(node_id)))
pub fn vote_to_remove(&mut self, node_id: &N) -> Result<Step<C, N>> {
let mut pub_keys = self.netinfo.public_key_map().clone();
pub_keys.remove(node_id);
self.vote_for(Change::NodeChange(pub_keys))
}
/// Handles a message received from `sender_id`.
@ -193,15 +196,8 @@ where
if self.vote_counter.pending_votes().any(is_our_vote) {
return true; // We have pending input to vote for a validator change.
}
let kgs = match self.key_gen_state {
None => return false, // No ongoing key generation.
Some(ref kgs) => kgs,
};
// If either we or the candidate have a pending key gen message, we should propose.
let ours_or_candidates = |msg: &SignedKeyGenMsg<_>| {
msg.1 == *self.our_id() || Some(&msg.1) == kgs.change.candidate()
};
self.key_gen_msg_buffer.iter().any(ours_or_candidates)
// If we have a pending key gen message, we should propose.
!self.key_gen_msg_buffer.is_empty()
}
/// Handles a message for the `HoneyBadger` instance.
@ -293,16 +289,18 @@ where
}
let change = if let Some(kgs) = self.take_ready_key_gen() {
// If DKG completed, apply the change, restart Honey Badger, and inform the user.
debug!("{}: DKG for {:?} complete!", self, kgs.change);
debug!("{}: DKG for complete for: {:?}", self, kgs.public_keys());
self.netinfo = kgs.key_gen.into_network_info()?;
self.restart_honey_badger(batch_epoch + 1, None);
ChangeState::Complete(Change::NodeChange(kgs.change))
ChangeState::Complete(Change::NodeChange(self.netinfo.public_key_map().clone()))
} else if let Some(change) = self.vote_counter.compute_winner().cloned() {
// If there is a new change, restart DKG. Inform the user about the current change.
step.extend(match &change {
Change::NodeChange(change) => self.update_key_gen(batch_epoch + 1, &change)?,
step.extend(match change {
Change::NodeChange(ref pub_keys) => {
self.update_key_gen(batch_epoch + 1, pub_keys)?
}
Change::EncryptionSchedule(schedule) => {
self.update_encryption_schedule(batch_epoch + 1, *schedule)?
self.update_encryption_schedule(batch_epoch + 1, schedule)?
}
});
match change {
@ -339,28 +337,21 @@ where
pub(super) fn update_key_gen(
&mut self,
era: u64,
change: &NodeChange<N>,
pub_keys: &BTreeMap<N, PublicKey>,
) -> Result<Step<C, N>> {
if self.key_gen_state.as_ref().map(|kgs| &kgs.change) == Some(change) {
if self.key_gen_state.as_ref().map(KeyGenState::public_keys) == Some(pub_keys) {
return Ok(Step::default()); // The change is the same as before. Continue DKG as is.
}
debug!("{}: Restarting DKG for {:?}.", self, change);
// Use the existing key shares - with the change applied - as keys for DKG.
let mut pub_keys = self.netinfo.public_key_map().clone();
if match *change {
NodeChange::Remove(ref id) => pub_keys.remove(id).is_none(),
NodeChange::Add(ref id, ref pk) => pub_keys.insert(id.clone(), pk.clone()).is_some(),
} {
warn!("{}: No-op change: {:?}", self, change);
}
debug!("{}: Restarting DKG for {:?}.", self, pub_keys);
self.restart_honey_badger(era, None);
// TODO: This needs to be the same as `num_faulty` will be in the _new_
// `NetworkInfo` if the change goes through. It would be safer to deduplicate.
let threshold = (pub_keys.len() - 1) / 3;
let sk = self.netinfo.secret_key().clone();
let our_id = self.our_id().clone();
let (key_gen, part) = SyncKeyGen::new(&mut self.rng, our_id, sk, pub_keys, threshold)?;
self.key_gen_state = Some(KeyGenState::new(key_gen, change.clone()));
let (key_gen, part) =
SyncKeyGen::new(&mut self.rng, our_id, sk, pub_keys.clone(), threshold)?;
self.key_gen_state = Some(KeyGenState::new(key_gen));
if let Some(part) = part {
self.send_transaction(KeyGenMessage::Part(part))
} else {
@ -460,7 +451,7 @@ where
/// Returns `true` if the signature of `kg_msg` by the node with the specified ID is valid.
/// Returns an error if the payload fails to serialize.
///
/// This accepts signatures from both validators and the currently joining candidate, if any.
/// This accepts signatures from both validators and currently joining candidates, if any.
fn verify_signature(
&self,
node_id: &N,
@ -469,13 +460,11 @@ where
) -> Result<bool> {
let ser =
bincode::serialize(kg_msg).map_err(|err| ErrorKind::VerifySignatureBincode(*err))?;
let get_candidate_key = || {
self.key_gen_state
.as_ref()
.and_then(|kgs| kgs.candidate_key(node_id))
};
let pk_opt = self.netinfo.public_key(node_id).or_else(get_candidate_key);
Ok(pk_opt.map_or(false, |pk| pk.verify(&sig, ser)))
let verify = |opt_pk: Option<&PublicKey>| opt_pk.map_or(false, |pk| pk.verify(&sig, &ser));
let kgs = self.key_gen_state.as_ref();
let current_key = self.netinfo.public_key(node_id);
let candidate_key = kgs.and_then(|kgs| kgs.public_keys().get(node_id));
Ok(verify(current_key) || verify(candidate_key))
}
/// Returns the maximum future epochs of the Honey Badger algorithm instance.

View File

@ -85,7 +85,7 @@ use NodeIdT;
pub use self::batch::Batch;
pub use self::builder::DynamicHoneyBadgerBuilder;
pub use self::change::{Change, ChangeState, NodeChange};
pub use self::change::{Change, ChangeState};
pub use self::dynamic_honey_badger::DynamicHoneyBadger;
pub use self::error::{Error, ErrorKind, Result};
@ -93,7 +93,7 @@ pub type Step<C, N> = ::DaStep<DynamicHoneyBadger<C, N>>;
/// The user input for `DynamicHoneyBadger`.
#[derive(Clone, Debug)]
pub enum Input<C, N> {
pub enum Input<C, N: Ord> {
/// A user-defined contribution for the next epoch.
User(C),
/// A vote to change the set of validators.
@ -113,7 +113,7 @@ pub enum KeyGenMessage {
/// A message sent to or received from another node's Honey Badger instance.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Message<N: Rand> {
pub enum Message<N: Rand + Ord> {
/// A message belonging to the `HoneyBadger` algorithm started in the given epoch.
HoneyBadger(u64, HbMessage<N>),
/// A transaction to be committed, signed by a node.
@ -122,7 +122,7 @@ pub enum Message<N: Rand> {
SignedVote(SignedVote<N>),
}
impl<N: Rand> Message<N> {
impl<N: Rand + Ord> Message<N> {
fn era(&self) -> u64 {
match *self {
Message::HoneyBadger(era, _) => era,
@ -153,38 +153,31 @@ pub struct JoinPlan<N: Ord> {
/// The ongoing key generation, together with information about the validator change.
#[derive(Debug)]
struct KeyGenState<N> {
struct KeyGenState<N: Ord> {
/// The key generation instance.
key_gen: SyncKeyGen<N>,
/// The change for which key generation is performed.
change: NodeChange<N>,
/// The number of key generation messages received from each peer. At most _N + 1_ are
/// accepted.
msg_count: BTreeMap<N, usize>,
}
impl<N: NodeIdT> KeyGenState<N> {
fn new(key_gen: SyncKeyGen<N>, change: NodeChange<N>) -> Self {
fn new(key_gen: SyncKeyGen<N>) -> Self {
KeyGenState {
key_gen,
change,
msg_count: BTreeMap::new(),
}
}
/// Returns `true` if the candidate's, if any, as well as enough validators' key generation
/// parts have been completed.
/// Returns `true` if enough validators' key generation parts have been completed.
fn is_ready(&self) -> bool {
let candidate_ready = |id: &N| self.key_gen.is_node_ready(id);
self.key_gen.is_ready() && self.change.candidate().map_or(true, candidate_ready)
let kg = &self.key_gen;
kg.is_ready() && kg.count_complete() * 3 > 2 * kg.public_keys().len()
}
/// If the node `node_id` is the currently joining candidate, returns its public key.
fn candidate_key(&self, node_id: &N) -> Option<&PublicKey> {
match self.change {
NodeChange::Add(ref id, ref pk) if id == node_id => Some(pk),
NodeChange::Add(_, _) | NodeChange::Remove(_) => None,
}
/// Returns the map of new validators and their public keys.
fn public_keys(&self) -> &BTreeMap<N, PublicKey> {
self.key_gen.public_keys()
}
/// Increments the message count for the given node, and returns the new count.
@ -198,7 +191,7 @@ impl<N: NodeIdT> KeyGenState<N> {
/// The contribution for the internal `HoneyBadger` instance: this includes a user-defined
/// application-level contribution as well as internal signed messages.
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
struct InternalContrib<C, N> {
struct InternalContrib<C, N: Ord> {
/// A user-defined contribution.
contrib: C,
/// Key generation messages that get committed via Honey Badger to communicate synchronously.

View File

@ -15,7 +15,7 @@ use {NetworkInfo, NodeIdT};
/// This is reset whenever the set of validators changes or a change reaches _f + 1_ votes. We call
/// the epochs since the last reset the current _era_.
#[derive(Debug)]
pub struct VoteCounter<N> {
pub struct VoteCounter<N: Ord> {
/// Shared network data.
netinfo: Arc<NetworkInfo<N>>,
/// The epoch when voting was reset.
@ -158,7 +158,7 @@ where
/// A vote fore removing or adding a validator.
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Clone)]
struct Vote<N> {
struct Vote<N: Ord> {
/// The change this vote is for.
change: Change<N>,
/// The epoch in which the current era began.
@ -169,13 +169,13 @@ struct Vote<N> {
/// A signed vote for removing or adding a validator.
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Clone)]
pub struct SignedVote<N> {
pub struct SignedVote<N: Ord> {
vote: Vote<N>,
voter: N,
sig: Signature,
}
impl<N> SignedVote<N> {
impl<N: Ord> SignedVote<N> {
pub fn era(&self) -> u64 {
self.vote.era
}
@ -187,9 +187,9 @@ impl<N> SignedVote<N> {
#[cfg(test)]
mod tests {
use std::iter;
use std::sync::Arc;
use super::super::NodeChange;
use super::{Change, SignedVote, VoteCounter};
use fault_log::{FaultKind, FaultLog};
use rand;
@ -198,12 +198,14 @@ mod tests {
/// Returns a vector of `node_num` `VoteCounter`s, and some signed example votes.
///
/// If `signed_votes` is the second entry of the return value, then `signed_votes[i][j]` is the
/// the vote for `Remove(j)` by node `i`. Each node signed `Remove(0)`, `Remove(1)`, ... in
/// order.
/// the vote by node `i` for making `j` the only validator. Each node signed this for nodes
/// `0`, `1`, ... in order.
fn setup(node_num: usize, era: u64) -> (Vec<VoteCounter<usize>>, Vec<Vec<SignedVote<usize>>>) {
let mut rng = rand::thread_rng();
// Create keys for threshold cryptography.
let netinfos = NetworkInfo::generate_map(0..node_num, &mut rand::thread_rng())
let netinfos = NetworkInfo::generate_map(0..node_num, &mut rng)
.expect("Failed to generate `NetworkInfo` map");
let pub_keys = netinfos[&0].public_key_map().clone();
// Create a `VoteCounter` instance for each node.
let create_counter =
@ -213,13 +215,9 @@ mod tests {
// Sign a few votes.
let sign_votes = |counter: &mut VoteCounter<usize>| {
(0..node_num)
.map(NodeChange::Remove)
.map(|change| {
counter
.sign_vote_for(Change::NodeChange(change))
.expect("sign vote")
.clone()
}).collect::<Vec<_>>()
.map(|j| Change::NodeChange(iter::once((j, pub_keys[&j])).collect()))
.map(|change| counter.sign_vote_for(change).expect("sign vote").clone())
.collect::<Vec<_>>()
};
let signed_votes: Vec<_> = counters.iter_mut().map(sign_votes).collect();
(counters, signed_votes)
@ -305,9 +303,9 @@ mod tests {
.add_committed_vote(&1, sv[2][1].clone())
.expect("add committed");
assert!(faults.is_empty());
assert_eq!(
ct.compute_winner(),
Some(&Change::NodeChange(NodeChange::Remove(1)))
);
match ct.compute_winner() {
Some(Change::NodeChange(pub_keys)) => assert!(pub_keys.keys().eq(iter::once(&1))),
winner => panic!("Unexpected winner: {:?}", winner),
}
}
}

View File

@ -34,9 +34,9 @@ use serde::{de::DeserializeOwned, Serialize};
use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message};
use transaction_queue::TransactionQueue;
use {util, Contribution, DistAlgorithm, NodeIdT};
use {util, Contribution, DistAlgorithm, NetworkInfo, NodeIdT};
pub use dynamic_honey_badger::{Change, ChangeState, Input, NodeChange};
pub use dynamic_honey_badger::{Change, ChangeState, Input};
/// Queueing honey badger error variants.
#[derive(Debug, Fail)]
@ -95,7 +95,7 @@ pub type Result<T> = ::std::result::Result<T, Error>;
/// A Queueing Honey Badger builder, to configure the parameters and create new instances of
/// `QueueingHoneyBadger`.
pub struct QueueingHoneyBadgerBuilder<T, N: Rand, Q> {
pub struct QueueingHoneyBadgerBuilder<T, N: Rand + Ord, Q> {
/// Shared network data.
dyn_hb: DynamicHoneyBadger<Vec<T>, N>,
/// The target number of transactions to be included in each batch.
@ -175,7 +175,7 @@ where
/// queue.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct QueueingHoneyBadger<T, N: Rand, Q> {
pub struct QueueingHoneyBadger<T, N: Rand + Ord, Q> {
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The internal managed `DynamicHoneyBadger` instance.
@ -253,11 +253,7 @@ where
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_for(&mut self, change: Change<N>) -> Result<Step<T, N>> {
Ok(self
.dyn_hb
.handle_input(Input::Change(change))
.map_err(ErrorKind::Input)?
.join(self.propose()?))
self.apply(|dyn_hb| dyn_hb.vote_for(change))
}
/// Casts a vote to add a node as a validator.
@ -265,29 +261,22 @@ where
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_add(&mut self, node_id: N, pub_key: PublicKey) -> Result<Step<T, N>> {
self.vote_for(Change::NodeChange(NodeChange::Add(node_id, pub_key)))
self.apply(|dyn_hb| dyn_hb.vote_to_add(node_id, pub_key))
}
/// Casts a vote to demote a validator to observer.
///
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_remove(&mut self, node_id: N) -> Result<Step<T, N>> {
self.vote_for(Change::NodeChange(NodeChange::Remove(node_id)))
pub fn vote_to_remove(&mut self, node_id: &N) -> Result<Step<T, N>> {
self.apply(|dyn_hb| dyn_hb.vote_to_remove(node_id))
}
/// Handles a message received from `sender_id`.
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<T, N>> {
let step = self
.dyn_hb
.handle_message(sender_id, message)
.map_err(ErrorKind::HandleMessage)?;
for batch in &step.output {
self.queue.remove_multiple(batch.iter());
}
Ok(step.join(self.propose()?))
self.apply(|dyn_hb| dyn_hb.handle_message(sender_id, message))
}
/// Returns a reference to the internal managed `DynamicHoneyBadger` instance.
@ -295,6 +284,22 @@ where
&self.dyn_hb
}
/// Returns the information about the node IDs in the network, and the cryptographic keys.
pub fn netinfo(&self) -> &NetworkInfo<N> {
self.dyn_hb.netinfo()
}
/// Applies a function `f` to the `DynamicHoneyBadger` instance and processes the step.
fn apply<F>(&mut self, f: F) -> Result<Step<T, N>>
where
F: FnOnce(&mut DynamicHoneyBadger<Vec<T>, N>) -> dynamic_honey_badger::Result<Step<T, N>>,
{
let step = f(&mut self.dyn_hb).map_err(ErrorKind::Input)?;
self.queue
.remove_multiple(step.output.iter().flat_map(Batch::iter));
Ok(step.join(self.propose()?))
}
/// Returns `true` if we are ready to propose our contribution for the next epoch, i.e. if the
/// previous epoch has completed and we have either pending transactions or we are required to
/// make a proposal to avoid stalling the network.

View File

@ -12,7 +12,7 @@ use super::{
use {Contribution, DaStep, NodeIdT};
use dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, Message, NodeChange,
Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, Message,
};
impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N>
@ -20,21 +20,19 @@ where
C: Contribution,
N: NodeIdT + Rand,
{
fn added_node(&self) -> Option<N> {
if let ChangeState::InProgress(Change::NodeChange(NodeChange::Add(ref id, _))) =
self.change()
{
fn added_peers(&self) -> Vec<N> {
if let ChangeState::InProgress(Change::NodeChange(pub_keys)) = self.change() {
// Register the new node to send broadcast messages to it from now on.
Some(id.clone())
pub_keys.keys().cloned().collect()
} else {
None
Vec::new()
}
}
}
impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
N: Rand + Ord,
{
type Epoch = (u64, u64);
@ -114,7 +112,7 @@ where
///
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_remove(&mut self, node_id: N) -> Result<C, N> {
pub fn vote_to_remove(&mut self, node_id: &N) -> Result<C, N> {
self.apply(|algo| algo.vote_to_remove(node_id))
}
}

View File

@ -10,8 +10,8 @@ where
C: Contribution,
N: NodeIdT + Rand,
{
fn added_node(&self) -> Option<N> {
None
fn added_peers(&self) -> Vec<N> {
Vec::new()
}
}

View File

@ -41,9 +41,9 @@ pub trait SenderQueueableOutput<N, M>
where
N: NodeIdT,
{
/// Returns an optional new node added with the batch. This node should be added to the set of
/// all nodes.
fn added_node(&self) -> Option<N>;
/// Returns the new set of validator that this batch is starting or completing key generation
/// for, including the existing ones. These should be added to the set of all nodes.
fn added_peers(&self) -> Vec<N>;
}
pub trait SenderQueueableDistAlgorithm: Epoched + DistAlgorithm {
@ -206,7 +206,7 @@ where
return Step::<D>::default();
}
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
for node in step.output.iter().filter_map(|batch| batch.added_node()) {
for node in step.output.iter().flat_map(|batch| batch.added_peers()) {
if &node != self.our_id() {
self.peer_epochs.entry(node).or_default();
}

View File

@ -75,7 +75,7 @@ where
///
/// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_remove(&mut self, node_id: N) -> Result<T, N, Q> {
pub fn vote_to_remove(&mut self, node_id: &N) -> Result<T, N, Q> {
self.apply(|algo| algo.vote_to_remove(node_id))
}
}

View File

@ -351,6 +351,11 @@ impl<N: NodeIdT> SyncKeyGen<N> {
Ok((key_gen, Some(Part(commit, rows))))
}
/// Returns the map of participating nodes and their public keys.
pub fn public_keys(&self) -> &BTreeMap<N, PublicKey> {
&self.pub_keys
}
/// Handles a `Part` message. If it is valid, returns an `Ack` message to be broadcast.
///
/// If we are only an observer, `None` is returned instead and no messages need to be sent.

View File

@ -20,9 +20,7 @@ use itertools::Itertools;
use log::info;
use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Input, NodeChange,
};
use hbbft::dynamic_honey_badger::{Batch, Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::sender_queue::{SenderQueue, Step};
use hbbft::transaction_queue::TransactionQueue;
use hbbft::NetworkInfo;
@ -43,25 +41,25 @@ where
network.input(*id, Input::User(queue.choose(&mut rng, 3, 10)));
}
network.input_all(Input::Change(Change::NodeChange(NodeChange::Remove(
NodeId(0),
))));
let netinfo = network.observer.instance().algo().netinfo().clone();
let pub_keys_add = netinfo.public_key_map().clone();
let mut pub_keys_rm = pub_keys_add.clone();
pub_keys_rm.remove(&NodeId(0));
network.input_all(Input::Change(Change::NodeChange(pub_keys_rm.clone())));
fn has_remove(node: &TestNode<UsizeDhb>) -> bool {
node.outputs().iter().any(|batch| {
*batch.change()
== ChangeState::Complete(Change::NodeChange(NodeChange::Remove(NodeId(0))))
})
}
fn has_add(node: &TestNode<UsizeDhb>) -> bool {
node.outputs().iter().any(|batch| match *batch.change() {
ChangeState::Complete(Change::NodeChange(NodeChange::Add(ref id, _))) => {
*id == NodeId(0)
}
let has_remove = |node: &TestNode<UsizeDhb>| {
node.outputs().iter().any(|batch| match batch.change() {
ChangeState::Complete(Change::NodeChange(pub_keys)) => pub_keys == &pub_keys_rm,
_ => false,
})
}
};
let has_add = |node: &TestNode<UsizeDhb>| {
node.outputs().iter().any(|batch| match batch.change() {
ChangeState::Complete(Change::NodeChange(pub_keys)) => pub_keys == &pub_keys_add,
_ => false,
})
};
// Returns `true` if the node has not output all transactions yet.
let node_busy = |node: &TestNode<UsizeDhb>| {
@ -99,16 +97,7 @@ where
network.step();
// Once all nodes have processed the removal of node 0, add it again.
if !input_add && network.nodes.values().all(has_remove) {
let pk = network.nodes[&NodeId(0)]
.instance()
.algo()
.netinfo()
.secret_key()
.public_key();
network.input_all(Input::Change(Change::NodeChange(NodeChange::Add(
NodeId(0),
pk,
))));
network.input_all(Input::Change(Change::NodeChange(pub_keys_add.clone())));
input_add = true;
}
}

View File

@ -9,7 +9,7 @@ pub mod net;
use std::{collections, time};
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input, NodeChange};
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::sender_queue::SenderQueue;
use net::adversary::ReorderingAdversary;
use net::proptest::{gen_seed, NetworkDimension, TestRng, TestRngSeed};
@ -143,9 +143,18 @@ fn do_drop_and_readd(cfg: TestConfig) {
}
// Afterwards, remove a specific node from the dynamic honey badger network.
net.broadcast_input(&Input::Change(Change::NodeChange(NodeChange::Remove(
pivot_node_id,
)))).expect("broadcasting failed");
let netinfo = net
.get(pivot_node_id)
.expect("pivot node missing")
.algorithm()
.algo()
.netinfo()
.clone();
let pub_keys_add = netinfo.public_key_map().clone();
let mut pub_keys_rm = pub_keys_add.clone();
pub_keys_rm.remove(&pivot_node_id);
net.broadcast_input(&Input::Change(Change::NodeChange(pub_keys_rm.clone())))
.expect("broadcasting failed");
// We are tracking (correct) nodes' state through the process by ticking them off individually.
let mut awaiting_removal: collections::BTreeSet<_> =
@ -163,27 +172,24 @@ fn do_drop_and_readd(cfg: TestConfig) {
for change in step.output.iter().map(|output| output.change()) {
match change {
ChangeState::Complete(Change::NodeChange(NodeChange::Remove(pivot_node_id))) => {
ChangeState::Complete(Change::NodeChange(ref pub_keys))
if *pub_keys == pub_keys_rm =>
{
println!("Node {:?} done removing.", node_id);
// Removal complete, tally:
awaiting_removal.remove(&node_id);
// Now we can add the node again. Public keys will be reused.
let pk = net[*pivot_node_id]
.algorithm()
.algo()
.netinfo()
.secret_key()
.public_key();
let _ = net[node_id]
.algorithm_mut()
.handle_input(Input::Change(Change::NodeChange(NodeChange::Add(
*pivot_node_id,
pk,
)))).expect("failed to send `Add` input");
let _ = net
.send_input(
node_id,
Input::Change(Change::NodeChange(pub_keys_add.clone())),
).expect("failed to send `Add` input");
}
ChangeState::Complete(Change::NodeChange(NodeChange::Add(pivot_node_id, _))) => {
ChangeState::Complete(Change::NodeChange(ref pub_keys))
if *pub_keys == pub_keys_add =>
{
println!("Node {:?} done adding.", node_id);
// Node added, ensure it has been removed first.
if awaiting_removal.contains(&node_id) {

View File

@ -105,24 +105,20 @@ impl MessageScheduler {
&self,
nodes: &BTreeMap<D::NodeId, TestNode<D>>,
) -> D::NodeId {
match *self {
MessageScheduler::First => nodes
.iter()
.find(|(_, node)| !node.queue.is_empty())
.map(|(id, _)| id.clone())
.expect("no more messages in queue"),
MessageScheduler::Random => {
let ids: Vec<D::NodeId> = nodes
.iter()
.filter(|(_, node)| !node.queue.is_empty())
.map(|(id, _)| id.clone())
.collect();
rand::thread_rng()
.choose(&ids)
.expect("no more messages in queue")
.clone()
}
}
let mut ids = nodes
.iter()
.filter(|(_, node)| !node.queue.is_empty())
.map(|(id, _)| id.clone());
let rand_node = match *self {
MessageScheduler::First => rand::thread_rng().gen_weighted_bool(10),
MessageScheduler::Random => true,
};
if rand_node {
let ids: Vec<D::NodeId> = ids.collect();
rand::thread_rng().choose(&ids).cloned()
} else {
ids.next()
}.expect("no more messages in queue")
}
}

View File

@ -21,9 +21,7 @@ use log::info;
use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::DynamicHoneyBadger;
use hbbft::queueing_honey_badger::{
Batch, Change, ChangeState, Input, NodeChange, QueueingHoneyBadger,
};
use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger};
use hbbft::sender_queue::{Message, SenderQueue, Step};
use hbbft::NetworkInfo;
@ -36,29 +34,30 @@ fn test_queueing_honey_badger<A>(mut network: TestNetwork<A, QHB>, num_txs: usiz
where
A: Adversary<QHB>,
{
let netinfo = network.observer.instance().algo().netinfo().clone();
let pub_keys_add = netinfo.public_key_map().clone();
let mut pub_keys_rm = pub_keys_add.clone();
pub_keys_rm.remove(&NodeId(0));
network.input_all(Input::Change(Change::NodeChange(pub_keys_rm.clone())));
// The second half of the transactions will be input only after a node has been removed.
network.input_all(Input::Change(Change::NodeChange(NodeChange::Remove(
NodeId(0),
))));
for tx in 0..(num_txs / 2) {
network.input_all(Input::User(tx));
}
fn has_remove(node: &TestNode<QHB>) -> bool {
node.outputs().iter().any(|batch| {
*batch.change()
== ChangeState::Complete(Change::NodeChange(NodeChange::Remove(NodeId(0))))
})
}
fn has_add(node: &TestNode<QHB>) -> bool {
node.outputs().iter().any(|batch| match *batch.change() {
ChangeState::Complete(Change::NodeChange(NodeChange::Add(ref id, _))) => {
*id == NodeId(0)
}
let has_remove = |node: &TestNode<QHB>| {
node.outputs().iter().any(|batch| match batch.change() {
ChangeState::Complete(Change::NodeChange(pub_keys)) => pub_keys == &pub_keys_rm,
_ => false,
})
}
};
let has_add = |node: &TestNode<QHB>| {
node.outputs().iter().any(|batch| match batch.change() {
ChangeState::Complete(Change::NodeChange(pub_keys)) => pub_keys == &pub_keys_add,
_ => false,
})
};
// Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs.
@ -80,17 +79,7 @@ where
for tx in (num_txs / 2)..num_txs {
network.input_all(Input::User(tx));
}
let pk = network.nodes[&NodeId(0)]
.instance()
.algo()
.dyn_hb()
.netinfo()
.secret_key()
.public_key();
network.input_all(Input::Change(Change::NodeChange(NodeChange::Add(
NodeId(0),
pk,
))));
network.input_all(Input::Change(Change::NodeChange(pub_keys_add.clone())));
input_add = true;
}
}