Fixes the net_dynamic_hb test (#372)

* started waiting for a full epoch after node removal in net_dynamic_hb

* clarified the use of the stored join plan

* go back to rejoining the node in the same epoch it was removed

* cleanup of debug prints

* clippy lints and more cleanup

* cleaned up unused methods

* review comments; cleaned up net_dynamic_hb

* relaxed the condition on the readd input epoch

* updated the fault error in tests
Vladimir Komendantskiy 2019-01-03 09:22:44 +00:00 committed by GitHub
parent 5bfcd6c692
commit 742ad7b83a
8 changed files with 380 additions and 162 deletions

@ -8,12 +8,14 @@ use rand::Rng;
use serde::{de::DeserializeOwned, Serialize};
use super::{
SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableMessage, SenderQueueableOutput,
Error, Message, SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableMessage,
SenderQueueableOutput,
};
use crate::{Contribution, DaStep, NodeIdT};
use crate::dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, Message,
Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, JoinPlan,
Message as DhbMessage,
};
impl<C, N> SenderQueueableOutput<N, (u64, u64)> for Batch<C, N>
@ -42,34 +44,34 @@ where
}
}
impl<N: Ord> SenderQueueableMessage for Message<N> {
impl<N: Ord> SenderQueueableMessage for DhbMessage<N> {
type Epoch = (u64, u64);
fn is_premature(&self, (them_era, them): (u64, u64), max_future_epochs: u64) -> bool {
match *self {
Message::HoneyBadger(era, ref msg) => {
DhbMessage::HoneyBadger(era, ref msg) => {
era > them_era || (era == them_era && msg.epoch() > them + max_future_epochs)
}
Message::KeyGen(era, _, _) => era > them_era,
Message::SignedVote(ref signed_vote) => signed_vote.era() > them_era,
DhbMessage::KeyGen(era, _, _) => era > them_era,
DhbMessage::SignedVote(ref signed_vote) => signed_vote.era() > them_era,
}
}
fn is_obsolete(&self, (them_era, them): (u64, u64)) -> bool {
match *self {
Message::HoneyBadger(era, ref msg) => {
DhbMessage::HoneyBadger(era, ref msg) => {
era < them_era || (era == them_era && msg.epoch() < them)
}
Message::KeyGen(era, _, _) => era < them_era,
Message::SignedVote(ref signed_vote) => signed_vote.era() < them_era,
DhbMessage::KeyGen(era, _, _) => era < them_era,
DhbMessage::SignedVote(ref signed_vote) => signed_vote.era() < them_era,
}
}
fn first_epoch(&self) -> (u64, u64) {
match *self {
Message::HoneyBadger(era, ref msg) => (era, msg.epoch()),
Message::KeyGen(era, _, _) => (era, 0),
Message::SignedVote(ref signed_vote) => (signed_vote.era(), 0),
DhbMessage::HoneyBadger(era, ref msg) => (era, msg.epoch()),
DhbMessage::KeyGen(era, _, _) => (era, 0),
DhbMessage::SignedVote(ref signed_vote) => (signed_vote.era(), 0),
}
}
}
@ -84,7 +86,7 @@ where
}
}
type Result<C, N> = result::Result<DaStep<SenderQueue<DynamicHoneyBadger<C, N>>>, DhbError>;
type Result<C, N> = result::Result<DaStep<SenderQueue<DynamicHoneyBadger<C, N>>>, Error<DhbError>>;
impl<C, N> SenderQueue<DynamicHoneyBadger<C, N>>
where
@ -124,4 +126,31 @@ where
pub fn vote_to_remove(&mut self, node_id: &N) -> Result<C, N> {
self.apply(|algo| algo.vote_to_remove(node_id))
}
/// Restarts the managed algorithm with the given join plan, a new list of peers and the same
/// secret key. Before it can be restarted, the node must have completed the process of removing
/// itself from the network. If the node was not properly removed, it may fail to output batches.
pub fn restart<I, R: Rng>(
&mut self,
join_plan: JoinPlan<N>,
peer_ids: I,
rng: &mut R,
) -> Result<C, N>
where
I: Iterator<Item = N>,
{
if !self.is_removed {
return Err(Error::DynamicHoneyBadgerNotRemoved);
}
let secret_key = self.algo().netinfo().secret_key().clone();
let id = self.algo().netinfo().our_id().clone();
let (dhb, dhb_step) =
DynamicHoneyBadger::new_joining(id.clone(), secret_key, join_plan, rng)
.map_err(Error::DynamicHoneyBadgerNewJoining)?;
let (sq, mut sq_step) = SenderQueue::builder(dhb, peer_ids).build(id);
sq_step.extend(dhb_step.map(|output| output, |fault| fault, Message::from));
*self = sq;
Ok(sq_step)
}
}
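
For orientation, a minimal usage sketch of the new `restart` method. The bindings `sq`,
`join_plan`, `peer_ids` and `rng` are illustrative, and `Error` is the type added in the new
error.rs below; this is a sketch, not part of the change:

    // Sketch only: `sq` is a SenderQueue<DynamicHoneyBadger<C, N>> whose node has already
    // observed its own removal, `join_plan` was taken from a batch output by the remaining
    // validators, and `peer_ids` lists the current peers.
    match sq.restart(join_plan, peer_ids.into_iter(), &mut rng) {
        Ok(step) => {
            // `step` carries the rejoining node's initial messages; hand them to the
            // network layer as usual.
        }
        Err(Error::DynamicHoneyBadgerNotRemoved) => {
            // The node must first observe its own removal, i.e. `sq.is_removed()`.
        }
        Err(err) => panic!("restart failed: {:?}", err),
    }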

src/sender_queue/error.rs (new file, 19 lines added)

@ -0,0 +1,19 @@
use failure::Fail;
use std::fmt::Debug;
/// Sender queue error variants.
#[derive(Debug, Fail)]
pub enum Error<E>
where
E: Debug + Fail,
{
/// Failed to apply a function to the managed algorithm.
#[fail(display = "Function application failure: {}", _0)]
Apply(E),
/// Failed to restart `DynamicHoneyBadger` because it had not been removed.
#[fail(display = "DynamicHoneyBadger was not removed before restarting")]
DynamicHoneyBadgerNotRemoved,
/// Failed to start a new joining `DynamicHoneyBadger`.
#[fail(display = "Failed to start a new joining DynamicHoneyBadger: {}", _0)]
DynamicHoneyBadgerNewJoining(E),
}

@ -6,6 +6,7 @@
//! any incoming messages with non-matching epochs can be safely discarded.
mod dynamic_honey_badger;
mod error;
mod honey_badger;
mod message;
mod queueing_honey_badger;
@ -19,6 +20,7 @@ use log::debug;
use crate::traits::EpochT;
use crate::{DaStep, DistAlgorithm, Epoched, NodeIdT, Target};
pub use self::error::Error;
pub use self::message::Message;
/// A message type that is suitable for use with a sender queue.
@ -103,6 +105,12 @@ where
/// participants (validators both current and proposed) in order to roll the ballot back if it
/// fails to progress.
participants_after_change: BTreeSet<D::NodeId>,
/// A flag that gets set when this node is removed from the set of participants. When it is set,
/// the node broadcasts the last `EpochStarted` and then no more messages. The flag can be reset
/// to `false` only by restarting the node. In case of `DynamicHoneyBadger` or
/// `QueueingHoneyBadger`, it can be restarted on receipt of a join plan where this node is a
/// validator.
is_removed: bool,
}
/// A `SenderQueue` step. The output corresponds to the wrapped algorithm.
@ -119,14 +127,14 @@ where
type Input = D::Input;
type Output = D::Output;
type Message = Message<D::Message>;
type Error = D::Error;
type Error = Error<D::Error>;
type FaultKind = D::FaultKind;
fn handle_input<R: Rng>(
&mut self,
input: Self::Input,
rng: &mut R,
) -> Result<DaStep<Self>, D::Error> {
) -> Result<DaStep<Self>, Error<D::Error>> {
self.handle_input(input, rng)
}
@ -135,12 +143,12 @@ where
sender_id: &D::NodeId,
message: Self::Message,
rng: &mut R,
) -> Result<DaStep<Self>, D::Error> {
) -> Result<DaStep<Self>, Error<D::Error>> {
self.handle_message(sender_id, message, rng)
}
fn terminated(&self) -> bool {
false
self.is_removed
}
fn our_id(&self) -> &D::NodeId {
@ -169,7 +177,10 @@ where
&mut self,
input: D::Input,
rng: &mut R,
) -> Result<DaStep<Self>, D::Error> {
) -> Result<DaStep<Self>, Error<D::Error>> {
if self.is_removed {
return Ok(Step::<D>::default());
}
self.apply(|algo| algo.handle_input(input, rng))
}
@ -181,7 +192,10 @@ where
sender_id: &D::NodeId,
message: Message<D::Message>,
rng: &mut R,
) -> Result<DaStep<Self>, D::Error> {
) -> Result<DaStep<Self>, Error<D::Error>> {
if self.is_removed {
return Ok(Step::<D>::default());
}
match message {
Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)),
Message::Algo(msg) => self.handle_message_content(sender_id, msg, rng),
@ -193,13 +207,18 @@ where
&self.algo
}
/// Returns `true` iff the node has been removed from the list of participants.
pub fn is_removed(&self) -> bool {
self.is_removed
}
/// Applies `f` to the wrapped algorithm and converts the step in the result to a sender queue
/// step, deferring or dropping messages, where necessary.
fn apply<F>(&mut self, f: F) -> Result<DaStep<Self>, D::Error>
fn apply<F>(&mut self, f: F) -> Result<DaStep<Self>, Error<D::Error>>
where
F: FnOnce(&mut D) -> Result<DaStep<D>, D::Error>,
{
let mut step = f(&mut self.algo)?;
let mut step = f(&mut self.algo).map_err(Error::Apply)?;
let mut sender_queue_step = self.update_epoch(&step);
self.defer_messages(&mut step);
sender_queue_step.extend(step.map(|output| output, |fault| fault, Message::from));
@ -249,7 +268,7 @@ where
sender_id: &D::NodeId,
content: D::Message,
rng: &mut R,
) -> Result<DaStep<Self>, D::Error> {
) -> Result<DaStep<Self>, Error<D::Error>> {
self.apply(|algo| algo.handle_message(sender_id, content, rng))
}
@ -258,6 +277,9 @@ where
if step.output.is_empty() {
return Step::<D>::default();
}
// If this node removes itself after this epoch, it should send an `EpochStarted` with the
// next epoch and then go offline.
let mut send_last_epoch_started = false;
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
for batch in &step.output {
if let Some(next_participants) = batch.participant_change() {
@ -278,16 +300,27 @@ where
.clone()
.difference(&next_participants)
{
// Begin the peer removal process.
// Begin the participant removal process.
self.remove_participant_after(&id, &batch.output_epoch());
}
if self.participants_after_change.contains(&self.our_id)
&& !next_participants.contains(&self.our_id)
{
send_last_epoch_started = true;
}
self.participants_after_change = next_participants;
}
}
// Announce the new epoch.
Target::All
.message(Message::EpochStarted(self.algo.epoch()))
.into()
if !self.is_removed || send_last_epoch_started {
// Announce the new epoch.
Target::All
.message(Message::EpochStarted(self.algo.epoch()))
.into()
} else {
// If removed, do not announce the new epoch to prevent peers from sending messages to
// this node.
Step::<D>::default()
}
}
/// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve
@ -307,25 +340,17 @@ where
}
}
/// Removes a given old participant if it has been scheduled for removal as a result of being
/// superseded by a new set of participants of which it is not a member. Returns `true` if the
/// participant has been removed and `false` otherwise.
fn remove_participant_if_old(&mut self, id: &D::NodeId) -> bool {
self.last_epochs
.get(id)
.cloned()
.map_or(false, |last_epoch| self.remove_participant(id, &last_epoch))
}
/// Removes a given old participant after a specified epoch if that participant has become
/// superseded by a new set of participants of which it is not a member. Returns `true` if the
/// participant has been removed and `false` otherwise.
fn remove_participant_after(&mut self, id: &D::NodeId, last_epoch: &D::Epoch) -> bool {
self.last_epochs.insert(id.clone(), last_epoch.clone());
self.remove_participant(id, last_epoch)
self.remove_participant_if_old(id)
}
/// Removes a participant after a specified last epoch. The participant is removed if
/// Removes a given old participant if it has been scheduled for removal as a result of being
/// superseded by a new set of participants of which it is not a member. The participant is
/// removed if
///
/// 1. its epoch is newer than its last epoch, or
///
@ -333,23 +358,27 @@ where
/// queue has sent all messages for all epochs up to the last epoch to the participant.
///
/// Returns `true` if the participant has been removed and `false` otherwise.
fn remove_participant(&mut self, id: &D::NodeId, last_epoch: &D::Epoch) -> bool {
if *last_epoch >= self.algo.epoch() {
fn remove_participant_if_old(&mut self, id: &D::NodeId) -> bool {
let last_epoch = if let Some(epoch) = self.last_epochs.get(id) {
*epoch
} else {
return false;
};
if last_epoch >= self.algo.epoch() {
return false;
}
if let Some(peer_epoch) = self.peer_epochs.get(id) {
if last_epoch >= peer_epoch {
return false;
}
if let Some(q) = self.outgoing_queue.get(id) {
if q.keys().any(|epoch| epoch <= last_epoch) {
if id == self.our_id() {
self.is_removed = true;
} else {
if let Some(peer_epoch) = self.peer_epochs.get(id) {
if last_epoch >= *peer_epoch {
return false;
}
}
self.peer_epochs.remove(&id);
self.outgoing_queue.remove(&id);
}
self.peer_epochs.remove(&id);
self.last_epochs.remove(&id);
self.outgoing_queue.remove(&id);
true
}
@ -408,6 +437,7 @@ where
peer_epochs: self.peer_epochs,
last_epochs: BTreeMap::new(),
participants_after_change: BTreeSet::new(),
is_removed: false,
};
let step = Target::All.message(Message::EpochStarted(epoch)).into();
(sq, step)
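
A short sketch of how the new `is_removed` flag is meant to be consumed by a caller; `sq` and
`saved_state` are illustrative bindings, and the test changes below do the equivalent through
`VirtualNet::remove_node`:

    // Sketch only: once `is_removed()` reports true, the instance has broadcast its final
    // `EpochStarted` and `terminated()` also returns true.
    if sq.is_removed() {
        // Stop cranking this instance and keep it aside until a `JoinPlan` naming this node
        // as a validator arrives, then call `restart` as shown earlier.
        saved_state = Some(sq);
    }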

@ -7,7 +7,7 @@ use rand::distributions::{Distribution, Standard};
use rand::Rng;
use serde::{de::DeserializeOwned, Serialize};
use super::{SenderQueue, SenderQueueableDistAlgorithm};
use super::{Error, SenderQueue, SenderQueueableDistAlgorithm};
use crate::queueing_honey_badger::{Change, Error as QhbError, QueueingHoneyBadger};
use crate::transaction_queue::TransactionQueue;
use crate::{Contribution, DaStep, Epoched, NodeIdT};
@ -38,7 +38,8 @@ where
}
}
type Result<T, N, Q> = result::Result<DaStep<SenderQueue<QueueingHoneyBadger<T, N, Q>>>, QhbError>;
type Result<T, N, Q> =
result::Result<DaStep<SenderQueue<QueueingHoneyBadger<T, N, Q>>>, Error<QhbError>>;
impl<T, N, Q> SenderQueue<QueueingHoneyBadger<T, N, Q>>
where

@ -265,18 +265,18 @@ where
}
}
Target::All => {
if peer_epochs
.values()
.all(|&them| msg.message.is_accepted(them, max_future_epochs))
{
let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs);
let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
let is_obsolete = |&them| msg.message.is_obsolete(them);
if peer_epochs.values().all(is_accepted) {
passed_msgs.push(msg);
} else {
// The `Target::All` message is split into two sets of point messages: those
// which can be sent without delay and those which should be postponed.
for (id, &them) in peer_epochs {
if msg.message.is_premature(them, max_future_epochs) {
for (id, them) in peer_epochs {
if is_premature(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
} else if !msg.message.is_obsolete(them) {
} else if !is_obsolete(them) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
}

@ -6,7 +6,7 @@ use std::time;
use failure;
use threshold_crypto as crypto;
use hbbft::{DistAlgorithm, Fault};
use hbbft::DistAlgorithm;
use super::NetMessage;
@ -40,8 +40,15 @@ where
MessageLimitExceeded(usize),
/// The execution time limit has been reached or exceeded.
TimeLimitHit(time::Duration),
/// A `Fault` is encountered in a step of a `DistAlgorithm`.
Fault(Fault<D::NodeId, D::FaultKind>),
/// A `Fault` was reported by a correct node in a step of a `DistAlgorithm`.
Fault {
/// The ID of the node that reported the fault.
reported_by: D::NodeId,
/// The ID of the faulty node.
faulty_id: D::NodeId,
/// The reported fault.
fault_kind: D::FaultKind,
},
/// An error occurred while generating initial keys for threshold cryptography.
InitialKeyGeneration(crypto::error::Error),
}
@ -93,9 +100,15 @@ where
CrankError::TimeLimitHit(lim) => {
write!(f, "Time limit of {} seconds exceeded.", lim.as_secs())
}
CrankError::Fault(fault) => {
write!(f, "Node {:?} is faulty: {:?}.", fault.node_id, fault.kind)
}
CrankError::Fault {
reported_by,
faulty_id,
fault_kind,
} => write!(
f,
"Correct node {:?} reported node {:?} as faulty: {:?}.",
reported_by, faulty_id, fault_kind
),
CrankError::InitialKeyGeneration(err) => write!(
f,
"An error occurred while generating initial keys for threshold cryptography: {:?}.",
@ -136,7 +149,16 @@ where
f.debug_tuple("MessageLimitExceeded").field(max).finish()
}
CrankError::TimeLimitHit(lim) => f.debug_tuple("TimeLimitHit").field(lim).finish(),
CrankError::Fault(fault) => f.debug_tuple("Fault").field(fault).finish(),
CrankError::Fault {
reported_by,
faulty_id,
fault_kind,
} => f
.debug_struct("Fault")
.field("reported_by", reported_by)
.field("faulty_id", faulty_id)
.field("fault_kind", fault_kind)
.finish(),
CrankError::InitialKeyGeneration(err) => {
f.debug_tuple("InitialKeyGeneration").field(err).finish()
}

@ -217,7 +217,7 @@ pub type NetMessage<D> =
#[allow(clippy::needless_pass_by_value)]
fn process_step<'a, D>(
nodes: &'a mut BTreeMap<D::NodeId, Node<D>>,
sender: D::NodeId,
stepped_id: D::NodeId,
step: &DaStep<D>,
dest: &mut VecDeque<NetMessage<D>>,
error_on_fault: bool,
@ -229,7 +229,7 @@ where
{
// For non-faulty nodes, we count the number of messages.
let faulty = nodes
.get(&sender)
.get(&stepped_id)
.expect("Trying to process a step with non-existing node ID")
.is_faulty();
let mut message_count: usize = 0;
@ -244,20 +244,20 @@ where
}
dest.push_back(NetworkMessage::new(
sender.clone(),
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
// Broadcast messages get expanded into multiple direct messages.
hbbft::Target::All => {
for to in nodes.keys().filter(|&to| to != &sender) {
for to in nodes.keys().filter(|&to| to != &stepped_id) {
if !faulty {
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
sender.clone(),
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
@ -267,14 +267,18 @@ where
}
nodes
.get_mut(&sender)
.get_mut(&stepped_id)
.expect("Trying to process a step with non-existing node ID")
.store_step(step);
if error_on_fault {
// Verify that no correct node is reported as faulty.
// Verify that no correct node is reported as faulty by a correct node.
if error_on_fault && !nodes[&stepped_id].is_faulty() {
for fault in &step.fault_log.0 {
if nodes.get(&fault.node_id).map_or(false, |n| !n.is_faulty()) {
return Err(CrankError::Fault(fault.clone()));
return Err(CrankError::Fault {
reported_by: stepped_id.clone(),
faulty_id: fault.node_id.clone(),
fault_kind: fault.kind.clone(),
});
}
}
}
@ -588,6 +592,9 @@ where
/// `false` allows the test to carry on despite `Fault`s reported for a correct
/// node.
error_on_fault: bool,
/// IDs of nodes that have been removed from the network. This is used to cleanly discard
/// messages that may still be addressed to those nodes.
removed_nodes: BTreeSet<D::NodeId>,
}
impl<D, A> VirtualNet<D, A>
@ -637,13 +644,15 @@ where
/// the network at the time of insertion.
#[inline]
pub fn insert_node(&mut self, node: Node<D>) -> Option<Node<D>> {
self.removed_nodes.remove(node.id());
self.nodes.insert(node.id().clone(), node)
}
/// Removes a node with the given ID from the network. Returns the removed node if there was a
/// node with this ID at the time of removal.
/// Removes a node with the given ID from the network and all messages addressed to
/// it. Returns the removed node if there was a node with this ID at the time of removal.
#[inline]
pub fn remove_node(&mut self, id: &D::NodeId) -> Option<Node<D>> {
self.removed_nodes.insert(id.clone());
self.messages.retain(|msg| msg.to != *id);
self.nodes.remove(id)
}
@ -777,10 +786,10 @@ where
let mut message_count: usize = 0;
// For every recorded step, apply it.
for (sender, step) in &steps {
for (stepped_id, step) in &steps {
let n = process_step(
&mut nodes,
sender.clone(),
stepped_id.clone(),
step,
&mut messages,
error_on_fault,
@ -801,6 +810,7 @@ where
time_limit: None,
start_time: time::Instant::now(),
error_on_fault: true,
removed_nodes: BTreeSet::new(),
},
steps.into_iter().collect(),
))
@ -913,8 +923,14 @@ where
}
self.adversary = adv;
// Step 1: Pick a message from the queue and deliver it; returns `None` if queue is empty.
let msg = self.messages.pop_front()?;
// Step 1: Pick the first message from the queue addressed to a node that is not removed and
// deliver it. Return `None` if the queue is empty.
let msg = loop {
let msg = self.messages.pop_front()?;
if !self.removed_nodes.contains(&msg.to) {
break msg;
}
};
net_trace!(
self,
@ -923,14 +939,14 @@ where
msg.to,
msg.payload
);
let receiver = msg.to.clone();
let stepped_id = msg.to.clone();
// Unfortunately, we have to re-borrow the target node further down to make the borrow
// checker happy. First, we check if the receiving node is faulty, so we can dispatch
// through the adversary if it is.
let is_faulty = try_some!(self
.nodes
.get(&msg.to)
.get(&stepped_id)
.ok_or_else(|| CrankError::NodeDisappearedInCrank(msg.to.clone())))
.is_faulty();
@ -956,12 +972,12 @@ where
// All messages are expanded and added to the queue. We opt for copying them, so we can
// return unaltered step later on for inspection.
try_some!(self.process_step(receiver.clone(), &step));
try_some!(self.process_step(stepped_id.clone(), &step));
// Increase the crank count.
self.crank_count += 1;
Some(Ok((receiver, step)))
Some(Ok((stepped_id, step)))
}
/// Convenience function for cranking.
@ -1046,6 +1062,16 @@ where
}
}
for node in self.correct_nodes().filter(|n| n.id() != full_node.id()) {
let id = node.id();
let actual_epochs: BTreeSet<_> =
node.outputs.iter().map(|batch| batch.epoch()).collect();
let expected_epochs: BTreeSet<_> =
expected[id].iter().map(|batch| batch.epoch()).collect();
assert_eq!(
expected_epochs, actual_epochs,
"Output epochs of {:?} don't match the expectation.",
id
);
assert_eq!(
node.outputs.len(),
expected[node.id()].len(),

@ -1,9 +1,11 @@
pub mod net;
use std::{collections, time};
use std::collections::{BTreeMap, BTreeSet};
use std::time;
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input, JoinPlan};
use hbbft::sender_queue::{Message, SenderQueue, Step};
use hbbft::sender_queue::{SenderQueue, Step};
use hbbft::Epoched;
use proptest::{prelude::ProptestConfig, prop_compose, proptest, proptest_helper};
use rand::{seq::SliceRandom, SeedableRng};
@ -90,8 +92,9 @@ fn do_drop_and_readd(cfg: TestConfig) {
let mut rng: TestRng = TestRng::from_seed(cfg.seed);
// First, we create a new test network with Honey Badger instances.
let (mut net, _) = NetBuilder::new(0..cfg.dimension.size())
.num_faulty(cfg.dimension.faulty())
let num_faulty = cfg.dimension.faulty();
let (net, _) = NetBuilder::new(0..cfg.dimension.size())
.num_faulty(num_faulty)
// Limited to 15k messages per node.
.message_limit(15_000 * cfg.dimension.size() as usize)
// 30 secs per node.
@ -99,7 +102,11 @@ fn do_drop_and_readd(cfg: TestConfig) {
.adversary(ReorderingAdversary::new())
.using_step(move |node: NewNodeInfo<SenderQueue<_>>| {
let id = node.id;
println!("Constructing new dynamic honey badger node #{}", id);
println!(
"Constructing new {} dynamic honey badger node #{}",
if id < num_faulty { "faulty" } else { "correct" },
id
);
let dhb = DynamicHoneyBadger::builder().build(node.netinfo.clone());
SenderQueue::builder(
dhb,
@ -110,9 +117,12 @@ fn do_drop_and_readd(cfg: TestConfig) {
.build(&mut rng)
.expect("could not construct test network");
let mut state = TestState::new(net);
// We will use the first correct node as the node we will remove from and re-add to the network.
// Note: This should be randomized using proptest.
let pivot_node_id: usize = *(net
let pivot_node_id: usize = *(state
.net
.correct_nodes()
.nth(0)
.expect("expected at least one correct node")
@ -121,7 +131,8 @@ fn do_drop_and_readd(cfg: TestConfig) {
// We generate a list of transactions we want to propose, for each node. All nodes will propose
// a number between 0..total_txs, chosen randomly.
let mut queues: collections::BTreeMap<_, Vec<usize>> = net
let mut queues: BTreeMap<_, Vec<usize>> = state
.net
.nodes()
.map(|node| (*node.id(), (0..cfg.total_txs).collect()))
.collect();
@ -132,13 +143,15 @@ fn do_drop_and_readd(cfg: TestConfig) {
println!("Node {:?} will propose: {:?}", id, proposal);
// The step will have its messages added to the queue automatically, we ignore the output.
let _ = net
let _ = state
.net
.send_input(*id, Input::User(proposal), &mut rng)
.expect("could not send initial transaction");
}
// Afterwards, remove a specific node from the dynamic honey badger network.
let netinfo = net
let netinfo = state
.net
.get(pivot_node_id)
.expect("pivot node missing")
.algorithm()
@ -148,25 +161,31 @@ fn do_drop_and_readd(cfg: TestConfig) {
let pub_keys_add = netinfo.public_key_map().clone();
let mut pub_keys_rm = pub_keys_add.clone();
pub_keys_rm.remove(&pivot_node_id);
net.broadcast_input(
&Input::Change(Change::NodeChange(pub_keys_rm.clone())),
&mut rng,
)
.expect("broadcasting failed");
state
.net
.broadcast_input(
&Input::Change(Change::NodeChange(pub_keys_rm.clone())),
&mut rng,
)
.expect("broadcasting failed");
// We are tracking (correct) nodes' state through the process by ticking them off individually.
let mut awaiting_removal: collections::BTreeSet<_> =
net.correct_nodes().map(|n| *n.id()).collect();
let mut awaiting_addition: collections::BTreeSet<_> = net
let non_pivot_nodes: BTreeSet<_> = state
.net
.correct_nodes()
.map(|n| *n.id())
.filter(|id| *id != pivot_node_id)
.collect();
let mut expected_outputs: collections::BTreeMap<_, collections::BTreeSet<_>> = net
let mut awaiting_removal: BTreeSet<_> = state.net.correct_nodes().map(|n| *n.id()).collect();
let mut awaiting_addition_input: BTreeSet<_> = non_pivot_nodes.clone();
let mut awaiting_addition_in_progress: BTreeSet<_> = non_pivot_nodes.clone();
let mut awaiting_addition: BTreeSet<_> = awaiting_removal.clone();
let mut expected_outputs: BTreeMap<_, BTreeSet<_>> = state
.net
.correct_nodes()
.map(|n| (*n.id(), (0..10).collect()))
.collect();
let mut received_batches: collections::BTreeMap<u64, _> = collections::BTreeMap::new();
let mut received_batches: BTreeMap<u64, _> = BTreeMap::new();
// Whether the pivot node was rejoined as a validator.
let mut rejoined_pivot_node = false;
// The removed pivot node which is to be restarted as soon as all remaining validators agree to
@ -175,10 +194,8 @@ fn do_drop_and_readd(cfg: TestConfig) {
// Run the network:
loop {
let (node_id, step) = net.crank_expect(&mut rng);
// A flag telling whether the cranked node has been removed from the network.
let mut removed_ourselves = false;
if !net[node_id].is_faulty() {
let (node_id, step) = state.net.crank_expect(&mut rng);
if !state.net[node_id].is_faulty() {
for batch in &step.output {
// Check that correct nodes don't output different batches for the same epoch.
if let Some(b) = received_batches.insert(batch.epoch(), batch.clone()) {
@ -212,10 +229,10 @@ fn do_drop_and_readd(cfg: TestConfig) {
batch_participants
.iter()
.all(|id| expected_participants.contains(id)),
"The batch at node {} contains a contribution from an unexpected participant: \
{:?}",
"The batch at node {} contains an unexpected participant: {:?} (expected {:?})",
node_id,
batch
batch_participants,
expected_participants,
);
}
}
@ -224,41 +241,26 @@ fn do_drop_and_readd(cfg: TestConfig) {
ChangeState::Complete(Change::NodeChange(ref pub_keys))
if *pub_keys == pub_keys_rm =>
{
println!("Node {:?} done removing.", node_id);
println!("Node {} done removing.", node_id);
// Removal complete, tally:
awaiting_removal.remove(&node_id);
}
if awaiting_removal.is_empty() {
println!(
"Removing the pivot node {} from the test network",
pivot_node_id
);
saved_node = net.remove_node(&pivot_node_id);
if node_id == pivot_node_id {
removed_ourselves = true;
}
}
if node_id != pivot_node_id {
// Now we can add the node again. Public keys will be reused.
let _ = net
.send_input(
node_id,
Input::Change(Change::NodeChange(pub_keys_add.clone())),
&mut rng,
)
.expect("failed to send `Add` input");
}
ChangeState::InProgress(Change::NodeChange(ref pub_keys))
if *pub_keys == pub_keys_add =>
{
println!("Node {} is progressing with readding.", node_id);
awaiting_addition_in_progress.remove(&node_id);
}
ChangeState::Complete(Change::NodeChange(ref pub_keys))
if *pub_keys == pub_keys_add =>
{
println!("Node {:?} done adding.", node_id);
println!("Node {} done adding.", node_id);
// Node added, ensure it has been removed first.
if awaiting_removal.contains(&node_id) {
panic!(
"Node {:?} reported a success `Add({}, _)` before `Remove({})`",
"Node {} reported a success `Add({}, _)` before `Remove({})`",
node_id, pivot_node_id, pivot_node_id
);
}
@ -269,10 +271,25 @@ fn do_drop_and_readd(cfg: TestConfig) {
}
}
}
if removed_ourselves {
// Further operations on the cranked node are not possible. Continue with processing
// other nodes.
continue;
let (era, hb_epoch) = state.net[node_id].algorithm().algo().epoch();
if node_id != pivot_node_id
&& awaiting_addition_input.contains(&node_id)
&& state.shutdown_epoch.is_some()
&& era + hb_epoch >= state.shutdown_epoch.unwrap()
{
// Now we can add the node again. Public keys will be reused.
let step = state
.net
.send_input(
node_id,
Input::Change(Change::NodeChange(pub_keys_add.clone())),
&mut rng,
)
.expect("failed to send `Add` input");
assert!(step.output.is_empty());
awaiting_addition_input.remove(&node_id);
println!("Node {} started readding.", node_id);
}
// Record whether or not we received some output.
@ -291,6 +308,14 @@ fn do_drop_and_readd(cfg: TestConfig) {
node_id,
);
// If this is a batch removing the pivot node, record the epoch in which the pivot node
// will shut down.
if let ChangeState::Complete(Change::NodeChange(ref pub_keys)) = batch.change() {
if *pub_keys == pub_keys_rm {
state.shutdown_epoch = Some(batch.epoch() + 1);
}
}
for tx in batch.iter() {
// Remove the confirmed contribution from the input queue.
let index = queue.iter().position(|v| v == tx);
@ -299,29 +324,68 @@ fn do_drop_and_readd(cfg: TestConfig) {
}
// Add it to the set of received outputs.
if !net[node_id].is_faulty() {
if !state.net[node_id].is_faulty() {
expected_outputs
.get_mut(&node_id)
.expect("output set disappeared")
.remove(tx);
// Also delete expected output from the pivot node if that node is currently
// removed. It does not output any values in epochs in which it is not a
// participant.
if node_id != pivot_node_id
&& awaiting_removal.is_empty()
&& !rejoined_pivot_node
{
expected_outputs
.get_mut(&pivot_node_id)
.expect("pivot node output set disappeared")
.remove(tx);
}
}
}
// If this is the first batch from a correct node with a vote to add the pivot node back,
// take its join plan and use it to restart the pivot node.
if awaiting_addition.is_empty() && !net[node_id].is_faulty() && !rejoined_pivot_node {
if !rejoined_pivot_node && !state.net[node_id].is_faulty() && state.join_plan.is_none()
{
if let ChangeState::InProgress(Change::NodeChange(pub_keys)) = batch.change() {
if *pub_keys == pub_keys_add {
let join_plan = batch
.join_plan()
.expect("failed to get the join plan of the batch");
let node = saved_node.take().expect("the pivot node wasn't saved");
let step = restart_node_for_add(&mut net, node, join_plan, &mut rng);
net.process_step(pivot_node_id, &step)
.expect("processing a step failed");
rejoined_pivot_node = true;
state.join_plan = Some(
batch
.join_plan()
.expect("failed to get the join plan of the batch"),
);
}
}
}
// Restart the pivot node having checked that it can be correctly restarted.
if !rejoined_pivot_node && awaiting_addition_in_progress.is_empty() {
if let Some(join_plan) = state.join_plan.take() {
let node = saved_node.take().expect("the pivot node wasn't saved");
let step = restart_node_for_add(&mut state.net, node, join_plan, &mut rng);
state
.net
.process_step(pivot_node_id, &step)
.expect("processing a step failed");
rejoined_pivot_node = true;
}
}
}
// Decide - from the point of view of the pivot node - whether it is ready to go offline.
if !rejoined_pivot_node
&& saved_node.is_none()
&& state.net[pivot_node_id].algorithm().is_removed()
{
println!(
"Removing the pivot node {} from the test network.",
pivot_node_id
);
saved_node = state.net.remove_node(&pivot_node_id);
if node_id == pivot_node_id {
// Further operations on the cranked node are not possible. Continue with
// processing other nodes.
continue;
}
}
// Check if we are done.
@ -339,7 +403,8 @@ fn do_drop_and_readd(cfg: TestConfig) {
let proposal =
choose_contribution(&mut rng, queue, cfg.batch_size, cfg.contribution_size);
let _ = net
let _ = state
.net
.send_input(node_id, Input::User(proposal), &mut rng)
.expect("could not send follow-up transaction");
}
@ -347,11 +412,12 @@ fn do_drop_and_readd(cfg: TestConfig) {
// As a final step, we verify that all nodes have arrived at the same conclusion. The pivot node
// may have missed some batches while it was removed.
let full_node = net
let full_node = state
.net
.correct_nodes()
.find(|node| *node.id() != pivot_node_id)
.expect("Could not find a full node");
net.verify_batches(&full_node);
state.net.verify_batches(&full_node);
println!("End result: {:?}", full_node.outputs());
}
@ -368,18 +434,43 @@ where
{
println!("Restarting node {} with {:?}", node.id(), join_plan);
// TODO: When an observer node is added to the network, it should also be added to peer_ids.
let peer_ids: Vec<usize> = net
let peer_ids: Vec<_> = net
.nodes()
.map(|node| *node.id())
.filter(|id| id != node.id())
.map(|node| node.id())
.filter(|id| *id != node.id())
.cloned()
.collect();
let secret_key = node.algorithm().algo().netinfo().secret_key().clone();
let id = *node.id();
let (dhb, dhb_step) = DynamicHoneyBadger::new_joining(id, secret_key, join_plan, rng)
.expect("failed to reconstruct the pivot node");
let (sq, mut sq_step) = SenderQueue::builder(dhb, peer_ids.into_iter()).build(id);
*node.algorithm_mut() = sq;
sq_step.extend(dhb_step.map(|output| output, |fault| fault, Message::from));
let step = node
.algorithm_mut()
.restart(join_plan, peer_ids.into_iter(), rng)
.expect("failed to restart pivot node");
net.insert_node(node);
sq_step
step
}
/// Internal state of the test.
struct TestState<A>
where
A: Adversary<DHB>,
{
/// The test network.
net: VirtualNet<DHB, A>,
/// The join plan for readding the pivot node.
join_plan: Option<JoinPlan<usize>>,
/// The epoch in which the pivot node should go offline.
shutdown_epoch: Option<u64>,
}
impl<A> TestState<A>
where
A: Adversary<DHB>,
{
/// Constructs a new `TestState`.
fn new(net: VirtualNet<DHB, A>) -> Self {
TestState {
net,
join_plan: None,
shutdown_epoch: None,
}
}
}