Remove peers from sender queue (#352)

* impl. old validator removal from sender queue peer list

* provided current validators for sender queue peer removal

* renamed validators as nodes in the sender queue

* Revert "renamed validators as nodes in the sender queue"

This reverts commit 78e1e1569d5f624c469bf752a5bf874b434a61d2.

* cleaned up the SQ builder and moved removal of old validators to triggers

* computing participant transitions from batches in the sender queue

* added a missing comment

* removing old validators as soon as all messages are delivered up to the last epoch

* review comments

* rejoined Node 0 in the old DHB test

* DHB test uses the first step of the DHB algorithm on the restarted node

* changed test batch verification to account for node 0 removal

* updated net_dynamic_hb test to cope with the removal of node 0

* relaxed verification of batches to only check inclusion of node 0 transactions

* corrected test state transitions in DHB and QHB tests

* added a builder function for a joining QHB

* rejoin the pivot node instead of node 0

* changed VirtualNet::verify_batches to take a full node as an argument

* corrected a variable name

* correction: use the pivot node ID instead of indices

* corrected the pivot node ID

* simplified a find

* simplified a conditional statement

* corrected the inference of expected output in verify_batches

* WIP on DHB and QHB tests; VirtualNet::verify_batches made more general

* readded node 0 in the DHB test when InProgress change is output

* allowed node 0 to miss a few batches while it is removed in the QHB test

* edition and rebase fixes

* refactored the use of process_step

* added VirtualNet functionality of node insertion and removal

* restarting the pivot node after all validators add it as peer

* clippy lints in net_dynamic_hb

* added observer in the QHB test; removed the DHB TestNetwork test

* fixed rng usage in the QHB test

* check output length when verifying batches; comment correction
This commit is contained in:
Vladimir Komendantskiy 2018-12-17 13:27:46 +00:00 committed by GitHub
parent eafa77d5fc
commit 14960a148e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 570 additions and 304 deletions

View File

@ -144,7 +144,7 @@ pub struct JoinPlan<N: Ord> {
change: ChangeState<N>,
/// The current public key set for threshold cryptography.
pub_key_set: PublicKeySet,
/// The public keys of the nodes taking part in key generation.
/// The public keys of the current validators.
pub_keys: BTreeMap<N, PublicKey>,
/// Parameters controlling Honey Badger's behavior and performance.
params: Params,

View File

@ -25,13 +25,15 @@
use std::marker::PhantomData;
use std::{cmp, iter};
use crate::crypto::PublicKey;
use derivative::Derivative;
use failure::Fail;
use rand::{Rand, Rng};
use serde::{de::DeserializeOwned, Serialize};
use crate::dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message};
use crate::crypto::{PublicKey, SecretKey};
use crate::dynamic_honey_badger::{
self, Batch as DhbBatch, DynamicHoneyBadger, JoinPlan, Message, Step as DhbStep,
};
use crate::transaction_queue::TransactionQueue;
use crate::{Contribution, DistAlgorithm, NetworkInfo, NodeIdT};
@ -49,6 +51,9 @@ pub enum Error {
/// Failed to propose a contribution.
#[fail(display = "Propose error: {}", _0)]
Propose(dynamic_honey_badger::Error),
/// Failed to create a Dynamic Honey Badger instance according to a join plan.
#[fail(display = "New joining error: {}", _0)]
NewJoining(dynamic_honey_badger::Error),
}
/// The result of `QueueingHoneyBadger` handling an input or message.
@ -56,13 +61,19 @@ pub type Result<T> = ::std::result::Result<T, Error>;
/// A Queueing Honey Badger builder, to configure the parameters and create new instances of
/// `QueueingHoneyBadger`.
pub struct QueueingHoneyBadgerBuilder<T, N: Rand + Ord, Q> {
pub struct QueueingHoneyBadgerBuilder<T, N, Q>
where
T: Contribution + Serialize + DeserializeOwned + Clone,
N: NodeIdT + Serialize + DeserializeOwned + Rand,
{
/// Shared network data.
dyn_hb: DynamicHoneyBadger<Vec<T>, N>,
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The queue of pending transactions that haven't been output in a batch yet.
queue: Q,
/// The initial step of the managed `DynamicHoneyBadger` instance.
step: Option<DhbStep<Vec<T>, N>>,
_phantom: PhantomData<T>,
}
@ -74,20 +85,25 @@ where
N: NodeIdT + Serialize + DeserializeOwned + Rand,
Q: TransactionQueue<T>,
{
/// Returns a new `QueueingHoneyBadgerBuilder` configured to use the node IDs and cryptographic
/// keys specified by `netinfo`.
// TODO: Make it easier to build a `QueueingHoneyBadger` with a `JoinPlan`. Handle `Step`
// conversion internally.
/// Returns a new `QueueingHoneyBadgerBuilder` wrapping the given instance of
/// `DynamicHoneyBadger`.
pub fn new(dyn_hb: DynamicHoneyBadger<Vec<T>, N>) -> Self {
// TODO: Use the defaults from `HoneyBadgerBuilder`.
QueueingHoneyBadgerBuilder {
dyn_hb,
batch_size: 100,
queue: Default::default(),
step: None,
_phantom: PhantomData,
}
}
/// Sets the initial step of the `DynamicHoneyBadger` instance.
pub fn step(mut self, step: DhbStep<Vec<T>, N>) -> Self {
self.step = Some(step);
self
}
/// Sets the target number of transactions per batch.
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
@ -123,7 +139,10 @@ where
batch_size: self.batch_size,
queue: self.queue,
};
let step = qhb.propose(rng)?;
let mut step = qhb.propose(rng)?;
if let Some(dhb_step) = self.step {
step.extend(dhb_step);
}
Ok((qhb, step))
}
}
@ -196,6 +215,22 @@ where
QueueingHoneyBadgerBuilder::new(dyn_hb)
}
/// Creates a new `QueueingHoneyBadgerBuilder` for joining the network specified in the
/// `JoinPlan`.
///
/// Returns a `QueueingHoneyBadgerBuilder` or an error if creation of the managed
/// `DynamicHoneyBadger` instance has failed.
pub fn builder_joining<R: Rng>(
our_id: N,
secret_key: SecretKey,
join_plan: JoinPlan<N>,
rng: &mut R,
) -> Result<QueueingHoneyBadgerBuilder<T, N, Q>> {
let (dhb, step) = DynamicHoneyBadger::new_joining(our_id, secret_key, join_plan, rng)
.map_err(Error::NewJoining)?;
Ok(QueueingHoneyBadgerBuilder::new(dhb).step(step))
}
/// Adds a transaction to the queue.
///
/// This can be called at any time to append to the transaction queue. The new transaction will
@ -263,6 +298,11 @@ where
self.dyn_hb.netinfo()
}
/// Returns the current queue of the `QueueingHoneyBadger`.
pub fn queue(&self) -> &Q {
&self.queue
}
/// Applies a function `f` to the `DynamicHoneyBadger` instance and processes the step.
fn apply<R, F>(&mut self, f: F, rng: &mut R) -> Result<Step<T, N>>
where

View File

@ -1,5 +1,6 @@
//! Convenience methods for a `SenderQueue` wrapping a `DynamicHoneyBadger`.
use std::collections::BTreeSet;
use std::result;
use crate::crypto::PublicKey;
@ -15,19 +16,30 @@ use crate::dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, Message,
};
impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N>
impl<C, N> SenderQueueableOutput<N, (u64, u64)> for Batch<C, N>
where
C: Contribution,
N: NodeIdT + Rand,
{
fn added_peers(&self) -> Vec<N> {
fn participant_change(&self) -> Option<BTreeSet<N>> {
if let ChangeState::InProgress(Change::NodeChange(pub_keys)) = self.change() {
// Register the new node to send broadcast messages to it from now on.
pub_keys.keys().cloned().collect()
let candidates = pub_keys.keys();
let current_validators: BTreeSet<&N> =
self.network_info().public_key_map().keys().collect();
let participants = candidates.chain(current_validators).cloned().collect();
Some(participants)
} else if let ChangeState::Complete(Change::NodeChange(pub_keys)) = self.change() {
let next_validators = pub_keys.keys().cloned().collect();
Some(next_validators)
} else {
Vec::new()
None
}
}
fn output_epoch(&self) -> (u64, u64) {
let hb_epoch = self.epoch() - self.era();
(self.era(), hb_epoch)
}
}
impl<N> SenderQueueableMessage for Message<N>

View File

@ -1,3 +1,5 @@
use std::collections::BTreeSet;
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
@ -5,13 +7,17 @@ use super::{SenderQueueableDistAlgorithm, SenderQueueableMessage, SenderQueueabl
use crate::honey_badger::{Batch, HoneyBadger, Message};
use crate::{Contribution, Epoched, NodeIdT};
impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N>
impl<C, N> SenderQueueableOutput<N, u64> for Batch<C, N>
where
C: Contribution,
N: NodeIdT + Rand,
{
fn added_peers(&self) -> Vec<N> {
Vec::new()
fn participant_change(&self) -> Option<BTreeSet<N>> {
None
}
fn output_epoch(&self) -> u64 {
self.epoch
}
}

View File

@ -11,9 +11,11 @@ mod message;
mod queueing_honey_badger;
use rand::Rng;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use log::debug;
use crate::traits::EpochT;
use crate::{DaStep, DistAlgorithm, Epoched, NodeIdT, Target};
@ -39,14 +41,22 @@ pub trait SenderQueueableMessage {
fn first_epoch(&self) -> Self::Epoch;
}
/// An output type that is suitable for use with a sender queue.
pub trait SenderQueueableOutput<N, M>
/// An output type compatible with the sender queue.
pub trait SenderQueueableOutput<N, E>
where
N: NodeIdT,
{
/// Returns the new set of validator that this batch is starting or completing key generation
/// for, including the existing ones. These should be added to the set of all nodes.
fn added_peers(&self) -> Vec<N>;
/// Returns the set of participants in the next epoch. New participants should be added to the
/// set of peers for tracking their epochs. Old participants - ones that appear only among
/// current participants - should be scheduled for removal from the set of peers in an orderly
/// manner making sure that all messages those participants are entitled to are delivered to
/// them.
///
/// The common case of no change in the set of participants is denoted by `None`.
fn participant_change(&self) -> Option<BTreeSet<N>>;
/// The epoch in which the output was produced.
fn output_epoch(&self) -> E;
}
/// A `DistAlgorithm` that can be wrapped by a sender queue.
@ -82,6 +92,17 @@ where
/// The set of all remote nodes on the network including validator as well as non-validator
/// (observer) nodes together with their epochs as of the last communication.
peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
/// The set of previously participating nodes now removed from the network. Each node is marked
/// with an epoch after which it left. The node is a member of this set from the epoch when it
/// was voted to be removed and until all messages have been delivered to it for all epochs in
/// which it was still a participant.
last_epochs: BTreeMap<D::NodeId, D::Epoch>,
/// Participants of the managed algorithm after the latest change of the participant set. If the
/// set of participants never changes, this set remains empty and unused. If the algorithm
/// initiates a ballot to change the validators, the sender queue has to remember the new set of
/// participants (validators both current and proposed) in order to roll the ballot back if it
/// fails to progress.
participants_after_change: BTreeSet<D::NodeId>,
}
/// A `SenderQueue` step. The output corresponds to the wrapped algorithm.
@ -92,7 +113,7 @@ where
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
{
type NodeId = D::NodeId;
type Input = D::Input;
@ -131,7 +152,7 @@ where
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
{
/// Returns a new `SenderQueueBuilder` configured to manage a given `DynamicHoneyBadger`
/// instance.
@ -194,7 +215,11 @@ where
}
})
.or_insert(epoch);
self.process_new_epoch(sender_id, epoch)
if !self.remove_participant_if_old(sender_id) {
self.process_new_epoch(sender_id, epoch)
} else {
Step::<D>::default()
}
}
/// Processes an announcement of a new epoch update received from a remote node.
@ -233,9 +258,29 @@ where
return Step::<D>::default();
}
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
for node in step.output.iter().flat_map(|batch| batch.added_peers()) {
if &node != self.our_id() {
self.peer_epochs.entry(node).or_default();
for batch in &step.output {
if let Some(next_participants) = batch.participant_change() {
// Insert candidates.
for id in &next_participants {
if id != self.our_id() {
self.peer_epochs.entry(id.clone()).or_default();
}
}
debug!(
"Participants after the last change: {:?}",
self.participants_after_change
);
debug!("Next participants: {:?}", next_participants);
// Remove obsolete participants.
for id in self
.participants_after_change
.clone()
.difference(&next_participants)
{
// Begin the peer removal process.
self.remove_participant_after(&id, &batch.output_epoch());
}
self.participants_after_change = next_participants;
}
}
// Announce the new epoch.
@ -261,10 +306,61 @@ where
}
}
/// Removes a given old participant if it has been scheduled for removal as a result of being
/// superseded by a new set of participants of which it is not a member. Returns `true` if the
/// participant has been removed and `false` otherwise.
fn remove_participant_if_old(&mut self, id: &D::NodeId) -> bool {
self.last_epochs
.get(id)
.cloned()
.map_or(false, |last_epoch| self.remove_participant(id, &last_epoch))
}
/// Removes a given old participant after a specified epoch if that participant has become
/// superseded by a new set of participants of which it is not a member. Returns `true` if the
/// participant has been removed and `false` otherwise.
fn remove_participant_after(&mut self, id: &D::NodeId, last_epoch: &D::Epoch) -> bool {
self.last_epochs.insert(id.clone(), last_epoch.clone());
self.remove_participant(id, last_epoch)
}
/// Removes a participant after a specified last epoch. The participant is removed if
///
/// 1. its epoch is newer than its last epoch, or
///
/// 2. the epoch of the managed algorithm instance is newer than the last epoch and the sender
/// queue has sent all messages for all epochs up to the last epoch to the participant.
///
/// Returns `true` if the participant has been removed and `false` otherwise.
fn remove_participant(&mut self, id: &D::NodeId, last_epoch: &D::Epoch) -> bool {
if *last_epoch >= self.algo.epoch() {
return false;
}
if let Some(peer_epoch) = self.peer_epochs.get(id) {
if last_epoch >= peer_epoch {
return false;
}
if let Some(q) = self.outgoing_queue.get(id) {
if q.keys().any(|epoch| epoch <= last_epoch) {
return false;
}
}
}
self.peer_epochs.remove(&id);
self.last_epochs.remove(&id);
self.outgoing_queue.remove(&id);
true
}
/// Returns a reference to the managed algorithm.
pub fn algo(&self) -> &D {
&self.algo
}
/// Returns a mutable reference to the managed algorithm.
pub fn algo_mut(&mut self) -> &mut D {
&mut self.algo
}
}
/// A builder of a Honey Badger with a sender queue. It configures the parameters and creates a new
@ -274,7 +370,6 @@ where
D: SenderQueueableDistAlgorithm,
{
algo: D,
outgoing_queue: OutgoingQueue<D>,
peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
}
@ -283,7 +378,7 @@ where
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
{
/// Creates a new builder, with an empty outgoing queue and the specified known peers.
pub fn new<I>(algo: D, peer_ids: I) -> Self
@ -292,18 +387,11 @@ where
{
SenderQueueBuilder {
algo,
outgoing_queue: BTreeMap::default(),
peer_epochs: peer_ids.map(|id| (id, D::Epoch::default())).collect(),
}
}
/// Sets the outgoing queue, if pending messages are already known in advance.
pub fn outgoing_queue(mut self, outgoing_queue: OutgoingQueue<D>) -> Self {
self.outgoing_queue = outgoing_queue;
self
}
/// Sets the peer epochs that are already known in advance.
/// Sets the peer epochs.
pub fn peer_epochs(mut self, peer_epochs: BTreeMap<D::NodeId, D::Epoch>) -> Self {
self.peer_epochs = peer_epochs;
self
@ -315,8 +403,10 @@ where
let sq = SenderQueue {
algo: self.algo,
our_id,
outgoing_queue: self.outgoing_queue,
outgoing_queue: BTreeMap::new(),
peer_epochs: self.peer_epochs,
last_epochs: BTreeMap::new(),
participants_after_change: BTreeSet::new(),
};
let step = Target::All.message(Message::EpochStarted(epoch)).into();
(sq, step)

View File

@ -1,153 +0,0 @@
#![deny(unused_must_use)]
//! Network tests for Dynamic Honey Badger.
mod network;
use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;
use itertools::Itertools;
use log::info;
use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::{Batch, Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::sender_queue::{SenderQueue, Step};
use hbbft::transaction_queue::TransactionQueue;
use hbbft::{util, NetworkInfo};
use crate::network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};
type UsizeDhb = SenderQueue<DynamicHoneyBadger<Vec<usize>, NodeId>>;
/// Proposes `num_txs` values and expects nodes to output and order them.
fn test_dynamic_honey_badger<A>(mut network: TestNetwork<A, UsizeDhb>, num_txs: usize)
where
A: Adversary<UsizeDhb>,
{
let mut rng = rand::thread_rng().gen::<Isaac64Rng>();
let new_queue = |id: &NodeId| (*id, (0..num_txs).collect::<Vec<usize>>());
let mut queues: BTreeMap<_, _> = network.nodes.keys().map(new_queue).collect();
for (id, queue) in &mut queues {
network.input(*id, Input::User(queue.choose(&mut rng, 3, 10)));
}
let netinfo = network.observer.instance().algo().netinfo().clone();
let pub_keys_add = netinfo.public_key_map().clone();
let mut pub_keys_rm = pub_keys_add.clone();
pub_keys_rm.remove(&NodeId(0));
network.input_all(Input::Change(Change::NodeChange(pub_keys_rm.clone())));
let has_remove = |node: &TestNode<UsizeDhb>| {
node.outputs().iter().any(|batch| match batch.change() {
ChangeState::Complete(Change::NodeChange(pub_keys)) => pub_keys == &pub_keys_rm,
_ => false,
})
};
let has_add = |node: &TestNode<UsizeDhb>| {
node.outputs().iter().any(|batch| match batch.change() {
ChangeState::Complete(Change::NodeChange(pub_keys)) => pub_keys == &pub_keys_add,
_ => false,
})
};
// Returns `true` if the node has not output all transactions yet.
let node_busy = |node: &TestNode<UsizeDhb>| {
if !has_remove(node) || !has_add(node) {
return true;
}
node.outputs().iter().flat_map(Batch::iter).unique().count() < num_txs
};
let mut rng = rand::thread_rng();
let mut input_add = false; // Whether the vote to add node 0 has already been input.
// Handle messages in random order until all nodes have output all transactions.
while network.nodes.values().any(node_busy) {
// If a node is expecting input, take it from the queue. Otherwise handle a message.
let input_ids: Vec<_> = network
.nodes
.iter()
.filter(|(_, node)| {
node_busy(*node)
&& !node.instance().algo().has_input()
&& node.instance().algo().netinfo().is_validator()
// Wait until all nodes have completed removing 0, before inputting `Add`.
&& (input_add || !has_remove(node))
// If there's only one node, it will immediately output on input. Make sure we
// first process all incoming messages before providing input again.
&& (network.nodes.len() > 2 || node.queue.is_empty())
})
.map(|(id, _)| *id)
.collect();
if let Some(id) = rng.choose(&input_ids) {
let queue = queues.get_mut(id).unwrap();
queue.remove_multiple(network.nodes[id].outputs().iter().flat_map(Batch::iter));
network.input(*id, Input::User(queue.choose(&mut rng, 3, 10)));
}
network.step();
// Once all nodes have processed the removal of node 0, add it again.
if !input_add && network.nodes.values().all(has_remove) {
network.input_all(Input::Change(Change::NodeChange(pub_keys_add.clone())));
input_add = true;
}
}
network.verify_batches();
}
// Allow passing `netinfo` by value. `TestNetwork` expects this function signature.
#[allow(clippy::needless_pass_by_value)]
fn new_dynamic_hb(
netinfo: Arc<NetworkInfo<NodeId>>,
) -> (UsizeDhb, Step<DynamicHoneyBadger<Vec<usize>, NodeId>>) {
let observer = NodeId(netinfo.num_nodes());
let our_id = *netinfo.our_id();
let peer_ids = netinfo
.all_ids()
.filter(|&&them| them != our_id)
.cloned()
.chain(iter::once(observer));
SenderQueue::builder(
DynamicHoneyBadger::builder().build((*netinfo).clone()),
peer_ids,
)
.build(our_id)
}
fn test_dynamic_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
where
A: Adversary<UsizeDhb>,
F: Fn(usize, usize, BTreeMap<NodeId, Arc<NetworkInfo<NodeId>>>) -> A,
{
// This returns an error in all but the first test.
let _ = env_logger::try_init();
let mut rng = rand::thread_rng();
let sizes = vec![2, 3, 5, rng.gen_range(6, 10)];
for size in sizes {
// The test is removing one correct node, so we allow fewer faulty ones.
let num_adv_nodes = util::max_faulty(size - 1);
let num_good_nodes = size - num_adv_nodes;
info!(
"Network size: {} good nodes, {} faulty nodes",
num_good_nodes, num_adv_nodes
);
let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes);
let network =
TestNetwork::new_with_step(num_good_nodes, num_adv_nodes, adversary, new_dynamic_hb);
test_dynamic_honey_badger(network, num_txs);
}
}
#[test]
fn test_dynamic_honey_badger_random_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::Random);
test_dynamic_honey_badger_different_sizes(new_adversary, 10);
}
#[test]
fn test_dynamic_honey_badger_first_delivery_silent() {
let new_adversary = |_: usize, _: usize, _| SilentAdversary::new(MessageScheduler::First);
test_dynamic_honey_badger_different_sizes(new_adversary, 10);
}

View File

@ -28,9 +28,12 @@ where
msg: NetMessage<D>,
err: D::Error,
},
/// A node unexpectly disappeared from the list of nodes. Note that this is likely a bug in
/// the network framework code.
NodeDisappeared(D::NodeId),
/// As spotted during cranking, a node unexpectly disappeared from the list of nodes. Note that
/// this is likely a bug in the network framework code.
NodeDisappearedInCrank(D::NodeId),
/// As spotted during message dispatch, a node unexpectly disappeared from the list of
/// nodes. Note that this is likely a bug in the network framework code.
NodeDisappearedInDispatch(D::NodeId),
/// The configured maximum number of cranks has been reached or exceeded.
CrankLimitExceeded(usize),
/// The configured maximum number of messages has been reached or exceeded.
@ -71,7 +74,12 @@ where
"The algorithm could not process network message {:?}. Error: {:?}",
msg, err
),
CrankError::NodeDisappeared(id) => write!(
CrankError::NodeDisappearedInCrank(id) => write!(
f,
"Node {:?} disappeared or never existed, while it was cranked.",
id
),
CrankError::NodeDisappearedInDispatch(id) => write!(
f,
"Node {:?} disappeared or never existed, while it still had incoming messages.",
id
@ -114,7 +122,13 @@ where
.field("msg", msg)
.field("err", err)
.finish(),
CrankError::NodeDisappeared(id) => f.debug_tuple("NodeDisappeared").field(id).finish(),
CrankError::NodeDisappearedInCrank(id) => {
f.debug_tuple("NodeDisappearedInCrank").field(id).finish()
}
CrankError::NodeDisappearedInDispatch(id) => f
.debug_tuple("NodeDisappearedInDispatch")
.field(id)
.finish(),
CrankError::CrankLimitExceeded(max) => {
f.debug_tuple("CrankLimitExceeded").field(max).finish()
}

View File

@ -19,13 +19,15 @@ pub mod err;
pub mod proptest;
pub mod util;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::io::Write;
use std::{cmp, collections, env, fmt, fs, io, ops, process, time};
use std::{cmp, env, fmt, fs, io, ops, process, time};
use rand;
use rand::{Rand, Rng};
use hbbft::dynamic_honey_badger::Batch;
use hbbft::sender_queue::SenderQueueableOutput;
use hbbft::{self, Contribution, DaStep, DistAlgorithm, Fault, NetworkInfo, NodeIdT, Step};
use crate::try_some;
@ -194,7 +196,7 @@ impl<M, N> NetworkMessage<M, N> {
}
/// Mapping from node IDs to actual node instances.
pub type NodeMap<D> = collections::BTreeMap<<D as DistAlgorithm>::NodeId, Node<D>>;
pub type NodeMap<D> = BTreeMap<<D as DistAlgorithm>::NodeId, Node<D>>;
/// A virtual network message tied to a distributed algorithm.
pub type NetMessage<D> =
@ -215,10 +217,10 @@ pub type NetMessage<D> =
// borrow-checker restrictions.
#[allow(clippy::needless_pass_by_value)]
fn process_step<'a, D>(
nodes: &'a mut collections::BTreeMap<D::NodeId, Node<D>>,
nodes: &'a mut BTreeMap<D::NodeId, Node<D>>,
sender: D::NodeId,
step: &DaStep<D>,
dest: &mut collections::VecDeque<NetMessage<D>>,
dest: &mut VecDeque<NetMessage<D>>,
error_on_fault: bool,
) -> Result<usize, CrankError<D>>
where
@ -566,7 +568,7 @@ where
/// Maps node IDs to actual node instances.
nodes: NodeMap<D>,
/// A collection of all network messages queued up for delivery.
messages: collections::VecDeque<NetMessage<D>>,
messages: VecDeque<NetMessage<D>>,
/// An optional `Adversary` that controls the network delivery schedule and all faulty nodes.
adversary: Option<A>,
/// Trace output; if active, writes out a log of all messages.
@ -632,6 +634,21 @@ where
self.nodes_mut().filter(|n| !n.is_faulty())
}
/// Inserts a new node into the network. Returns the old node with the same ID if it existed on
/// the network at the time of insertion.
#[inline]
pub fn insert_node(&mut self, node: Node<D>) -> Option<Node<D>> {
self.nodes.insert(node.id().clone(), node)
}
/// Removes a node with the given ID from the network. Returns the removed node if there was a
/// node with this ID at the time of removal.
#[inline]
pub fn remove_node(&mut self, id: &D::NodeId) -> Option<Node<D>> {
self.messages.retain(|msg| msg.to != *id);
self.nodes.remove(id)
}
/// Retrieve a node by ID.
///
/// Returns `None` if the node ID is not part of the network.
@ -740,8 +757,8 @@ where
"Too many faulty nodes requested, `f` must satisfy `3f < total_nodes`."
);
let mut steps = collections::BTreeMap::new();
let mut messages = collections::VecDeque::new();
let mut steps = BTreeMap::new();
let mut messages = VecDeque::new();
let mut nodes = net_infos
.into_iter()
@ -802,7 +819,7 @@ where
let node = self
.nodes
.get_mut(&msg.to)
.ok_or_else(|| CrankError::NodeDisappeared(msg.to.clone()))?;
.ok_or_else(|| CrankError::NodeDisappearedInDispatch(msg.to.clone()))?;
// Store a copy of the message, in case we need to pass it to the error variant.
// By reducing the information in `CrankError::HandleMessage`, we could reduce overhead
@ -838,16 +855,22 @@ where
.algorithm
.handle_input(input, rng)
.map_err(CrankError::HandleInput)?;
self.process_step(id, &step)?;
Ok(step)
}
/// Processes a step of a given node. The results of the processing are stored internally in the
/// test network.
#[must_use = "The result of processing a step must be used."]
pub fn process_step(&mut self, id: D::NodeId, step: &DaStep<D>) -> Result<(), CrankError<D>> {
self.message_count = self.message_count.saturating_add(process_step(
&mut self.nodes,
id,
&step,
step,
&mut self.messages,
self.error_on_fault,
)?);
Ok(step)
Ok(())
}
/// Advance the network.
@ -909,7 +932,7 @@ where
let is_faulty = try_some!(self
.nodes
.get(&msg.to)
.ok_or_else(|| CrankError::NodeDisappeared(msg.to.clone())))
.ok_or_else(|| CrankError::NodeDisappearedInCrank(msg.to.clone())))
.is_faulty();
let step: Step<_, _, _> = if is_faulty {
@ -934,18 +957,7 @@ where
// All messages are expanded and added to the queue. We opt for copying them, so we can
// return unaltered step later on for inspection.
self.message_count = self.message_count.saturating_add(
match process_step(
&mut self.nodes,
receiver.clone(),
&step,
&mut self.messages,
self.error_on_fault,
) {
Ok(n) => n,
Err(e) => return Some(Err(e)),
},
);
try_some!(self.process_step(receiver.clone(), &step));
// Increase the crank count.
self.crank_count += 1;
@ -999,14 +1011,7 @@ where
// Process all messages from all steps in the queue.
for (id, step) in &steps {
let n = process_step(
&mut self.nodes,
id.clone(),
step,
&mut self.messages,
self.error_on_fault,
)?;
self.message_count = self.message_count.saturating_add(n);
self.process_step(id.clone(), step)?;
}
Ok(steps)
@ -1015,26 +1020,49 @@ where
impl<C, D, N, A> VirtualNet<D, A>
where
D: DistAlgorithm<Output = Batch<C, N>>,
D: DistAlgorithm<NodeId = N, Output = Batch<C, N>>,
D::Message: Clone,
A: Adversary<D>,
C: Contribution + Clone,
N: NodeIdT,
{
/// Verifies that all nodes' outputs agree, and returns the output.
pub fn verify_batches(&self) -> &[Batch<C, N>] {
let first = self.correct_nodes().nth(0).unwrap().outputs();
let pub_eq = |(b0, b1): (&Batch<C, _>, &Batch<C, _>)| b0.public_eq(b1);
for (i, node) in self.correct_nodes().enumerate().skip(0) {
assert!(
first.iter().zip(node.outputs()).all(pub_eq),
"Outputs of nodes 0 and {} differ: {:?} != {:?}",
i,
first,
node.outputs()
);
/// Verifies that all nodes' outputs agree, given a correct "full" node that output all
/// batches in a total order and with no gaps.
///
/// The output of the full node is used to derive in expected output of other nodes in every
/// epoch. After that the check ensures that correct nodes output the same batches in epochs
/// when those nodes were participants (either validators or candidates).
pub fn verify_batches<E>(&self, full_node: &Node<D>)
where
Batch<C, N>: SenderQueueableOutput<N, E>,
{
let mut participants: BTreeSet<N> = self.nodes().map(Node::id).cloned().collect();
let mut expected: BTreeMap<N, Vec<_>> = BTreeMap::new();
for batch in &full_node.outputs {
for id in &participants {
expected.entry(id.clone()).or_default().push(batch);
}
if let Some(new_participants) = batch.participant_change() {
participants = new_participants;
}
}
for node in self.correct_nodes().filter(|n| n.id() != full_node.id()) {
assert_eq!(
node.outputs.len(),
expected[node.id()].len(),
"The output length of node {:?} is incorrect",
node.id()
);
assert!(node
.outputs
.iter()
.zip(
expected
.get(node.id())
.expect("outputs don't match the expectation")
)
.all(|(a, b)| a.public_eq(b)));
}
first
}
}

View File

@ -2,14 +2,17 @@ pub mod net;
use std::{collections, time};
use crate::net::adversary::ReorderingAdversary;
use crate::net::proptest::{gen_seed, NetworkDimension, TestRng, TestRngSeed};
use crate::net::{NetBuilder, NewNodeInfo};
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::sender_queue::SenderQueue;
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input, JoinPlan};
use hbbft::sender_queue::{Message, SenderQueue, Step};
use proptest::{prelude::ProptestConfig, prop_compose, proptest, proptest_helper};
use rand::SeedableRng;
use crate::net::adversary::{Adversary, ReorderingAdversary};
use crate::net::proptest::{gen_seed, NetworkDimension, TestRng, TestRngSeed};
use crate::net::{NetBuilder, NewNodeInfo, Node, VirtualNet};
type DHB = SenderQueue<DynamicHoneyBadger<Vec<usize>, usize>>;
/// Choose a node's contribution for an epoch.
///
/// Selects randomly out of a slice, according to chosen batch and contribution sizes. The function
@ -61,7 +64,7 @@ prop_compose! {
contribution_size in 1..10usize,
seed in gen_seed())
-> TestConfig {
TestConfig{
TestConfig {
dimension, total_txs, batch_size, contribution_size, seed
}
}
@ -81,7 +84,8 @@ proptest! {
/// Dynamic honey badger: Drop a validator node, demoting it to observer, then re-add it, all while
/// running a regular honey badger network.
#[allow(clippy::needless_pass_by_value)]
// TODO: Add an observer node to the test network.
#[allow(clippy::needless_pass_by_value, clippy::cyclomatic_complexity)]
fn do_drop_and_readd(cfg: TestConfig) {
let mut rng: TestRng = TestRng::from_seed(cfg.seed);
@ -153,17 +157,27 @@ fn do_drop_and_readd(cfg: TestConfig) {
// We are tracking (correct) nodes' state through the process by ticking them off individually.
let mut awaiting_removal: collections::BTreeSet<_> =
net.correct_nodes().map(|n| *n.id()).collect();
let mut awaiting_addition: collections::BTreeSet<_> =
net.correct_nodes().map(|n| *n.id()).collect();
let mut awaiting_addition: collections::BTreeSet<_> = net
.correct_nodes()
.map(|n| *n.id())
.filter(|id| *id != pivot_node_id)
.collect();
let mut expected_outputs: collections::BTreeMap<_, collections::BTreeSet<_>> = net
.correct_nodes()
.map(|n| (*n.id(), (0..10).collect()))
.collect();
let mut received_batches: collections::BTreeMap<u64, _> = collections::BTreeMap::new();
// Whether node 0 was rejoined as a validator.
let mut rejoined_pivot_node = false;
// The removed pivot node which is to be restarted as soon as all remaining validators agree to
// add it back.
let mut saved_node: Option<Node<DHB>> = None;
// Run the network:
loop {
let (node_id, step) = net.crank_expect(&mut rng);
// A flag telling whether the cranked node has been removed from the network.
let mut removed_ourselves = false;
if !net[node_id].is_faulty() {
for batch in &step.output {
// Check that correct nodes don't output different batches for the same epoch.
@ -213,14 +227,27 @@ fn do_drop_and_readd(cfg: TestConfig) {
// Removal complete, tally:
awaiting_removal.remove(&node_id);
// Now we can add the node again. Public keys will be reused.
let _ = net
.send_input(
node_id,
Input::Change(Change::NodeChange(pub_keys_add.clone())),
&mut rng,
)
.expect("failed to send `Add` input");
if awaiting_removal.is_empty() {
println!(
"Removing the pivot node {} from the test network",
pivot_node_id
);
saved_node = net.remove_node(&pivot_node_id);
if node_id == pivot_node_id {
removed_ourselves = true;
}
}
if node_id != pivot_node_id {
// Now we can add the node again. Public keys will be reused.
let _ = net
.send_input(
node_id,
Input::Change(Change::NodeChange(pub_keys_add.clone())),
&mut rng,
)
.expect("failed to send `Add` input");
}
}
ChangeState::Complete(Change::NodeChange(ref pub_keys))
@ -241,6 +268,11 @@ fn do_drop_and_readd(cfg: TestConfig) {
}
}
}
if removed_ourselves {
// Further operations on the cranked node are not possible. Continue with processing
// other nodes.
continue;
}
// Record whether or not we received some output.
let has_output = !step.output.is_empty();
@ -273,6 +305,22 @@ fn do_drop_and_readd(cfg: TestConfig) {
.remove(tx);
}
}
// If this is the first batch from a correct node with a vote to add node 0 back, take
// the join plan of the batch and use it to restart node 0.
if awaiting_addition.is_empty() && !net[node_id].is_faulty() && !rejoined_pivot_node {
if let ChangeState::InProgress(Change::NodeChange(pub_keys)) = batch.change() {
if *pub_keys == pub_keys_add {
let join_plan = batch
.join_plan()
.expect("failed to get the join plan of the batch");
let node = saved_node.take().expect("the pivot node wasn't saved");
let step = restart_node_for_add(&mut net, node, join_plan, &mut rng);
net.process_step(pivot_node_id, &step)
.expect("processing a step failed");
rejoined_pivot_node = true;
}
}
}
}
// Check if we are done.
@ -296,8 +344,41 @@ fn do_drop_and_readd(cfg: TestConfig) {
}
}
// As a final step, we verify that all nodes have arrived at the same conclusion.
let out = net.verify_batches();
println!("End result: {:?}", out);
// As a final step, we verify that all nodes have arrived at the same conclusion. The pivot node
// can miss some batches while it was removed.
let full_node = net
.correct_nodes()
.find(|node| *node.id() != pivot_node_id)
.expect("Could not find a full node");
net.verify_batches(&full_node);
println!("End result: {:?}", full_node.outputs());
}
/// Restarts node 0 on the test network for adding it back as a validator.
fn restart_node_for_add<R, A>(
net: &mut VirtualNet<DHB, A>,
mut node: Node<DHB>,
join_plan: JoinPlan<usize>,
rng: &mut R,
) -> Step<DynamicHoneyBadger<Vec<usize>, usize>>
where
R: rand::Rng,
A: Adversary<DHB>,
{
println!("Restarting node {} with {:?}", node.id(), join_plan);
// TODO: When an observer node is added to the network, it should also be added to peer_ids.
let peer_ids: Vec<usize> = net
.nodes()
.map(|node| *node.id())
.filter(|id| id != node.id())
.collect();
let secret_key = node.algorithm().algo().netinfo().secret_key().clone();
let id = *node.id();
let (dhb, dhb_step) = DynamicHoneyBadger::new_joining(id, secret_key, join_plan, rng)
.expect("failed to reconstruct the pivot node");
let (sq, mut sq_step) = SenderQueue::builder(dhb, peer_ids.into_iter()).build(id);
*node.algorithm_mut() = sq;
sq_step.extend(dhb_step.map(|output| output, Message::from));
net.insert_node(node);
sq_step
}

View File

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::mem;
use std::sync::Arc;
@ -9,6 +9,7 @@ use rand_derive::Rand;
use serde_derive::{Deserialize, Serialize};
use hbbft::dynamic_honey_badger::Batch;
use hbbft::sender_queue::SenderQueueableOutput;
use hbbft::{
Contribution, DaStep, DistAlgorithm, Fault, NetworkInfo, Step, Target, TargetedMessage,
};
@ -59,6 +60,12 @@ impl<D: DistAlgorithm> TestNode<D> {
&self.algo
}
/// Returns the internal algorithm's mutable instance.
#[allow(unused)] // Not used in all tests.
pub fn instance_mut(&mut self) -> &mut D {
&mut self.algo
}
/// Creates a new test node with the given broadcast instance.
fn new((algo, step): (D, DaStep<D>)) -> TestNode<D> {
TestNode {
@ -459,7 +466,7 @@ where
}
/// Pushes the messages into the queues of the corresponding recipients.
fn dispatch_messages<Q>(&mut self, sender_id: NodeId, msgs: Q)
pub fn dispatch_messages<Q>(&mut self, sender_id: NodeId, msgs: Q)
where
Q: IntoIterator<Item = TargetedMessage<D::Message, NodeId>> + Debug,
{
@ -601,21 +608,65 @@ where
D: DistAlgorithm<Output = Batch<C, NodeId>, NodeId = NodeId>,
C: Contribution + Clone,
{
/// Verifies that all nodes' outputs agree.
/// Verifies that all nodes' outputs agree, given a correct "full" node that output all
/// batches with no gaps.
///
/// The output of the full node is used to derive in expected output of other nodes in every
/// epoch. After that the check ensures that correct nodes output the same batches in epochs
/// when those nodes were participants (either validators or candidates).
#[allow(unused)] // Not used in all tests.
pub fn verify_batches(&self) {
let expected = self.nodes[&NodeId(0)].outputs().to_vec();
assert!(!expected.is_empty());
let pub_eq = |(b0, b1): (&Batch<C, _>, &Batch<C, _>)| b0.public_eq(b1);
for node in self.nodes.values() {
assert_eq!(expected.len(), node.outputs().len());
pub fn verify_batches<E>(&self, full_node: &TestNode<D>)
where
Batch<C, NodeId>: SenderQueueableOutput<NodeId, E>,
{
// Participants of epoch 0 are all validators in the test network.
let mut participants: BTreeSet<NodeId> = self
.nodes
.keys()
.cloned()
.chain(self.adv_nodes.keys().cloned())
.collect();
let mut expected: BTreeMap<NodeId, Vec<_>> = BTreeMap::new();
for batch in &full_node.outputs {
for id in &participants {
expected.entry(id.clone()).or_default().push(batch);
}
if let Some(new_participants) = batch.participant_change() {
participants = new_participants;
}
}
for (id, node) in self.nodes.iter().filter(|(&id, _)| id != full_node.id) {
let actual_epochs: BTreeSet<_> =
node.outputs.iter().map(|batch| batch.epoch()).collect();
let expected_epochs: BTreeSet<_> =
expected[id].iter().map(|batch| batch.epoch()).collect();
assert_eq!(
expected_epochs, actual_epochs,
"Output epochs of {:?} don't match the expectation.",
id
);
assert_eq!(
node.outputs.len(),
expected[id].len(),
"Output length of {:?} doesn't match the expectation",
id
);
assert!(
expected.iter().zip(node.outputs()).all(pub_eq),
"Outputs of nodes 0 and {} differ: {:?} != {:?}",
node.instance().our_id().0,
expected,
node.outputs()
node.outputs
.iter()
.zip(expected.get(id).expect("node is not expected"))
.all(|(a, b)| a.public_eq(b)),
"Outputs of {:?} don't match the expectation",
id
);
}
assert!(
self.observer
.outputs
.iter()
.zip(full_node.outputs.iter())
.all(|(a, b)| a.public_eq(b)),
"Observer outputs don't match the expectation."
);
}
}

View File

@ -3,16 +3,15 @@
mod network;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use std::sync::Arc;
use itertools::Itertools;
use log::info;
use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::DynamicHoneyBadger;
use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger};
use hbbft::dynamic_honey_badger::{DynamicHoneyBadger, JoinPlan};
use hbbft::queueing_honey_badger::{Change, ChangeState, Input, QueueingHoneyBadger};
use hbbft::sender_queue::{Message, SenderQueue, Step};
use hbbft::{util, NetworkInfo};
@ -31,11 +30,18 @@ where
pub_keys_rm.remove(&NodeId(0));
network.input_all(Input::Change(Change::NodeChange(pub_keys_rm.clone())));
// The second half of the transactions will be input only after a node has been removed.
// The second half of the transactions will be input only after a node has been removed and
// readded.
for tx in 0..(num_txs / 2) {
network.input_all(Input::User(tx));
}
let input_second_half = |network: &mut TestNetwork<_, _>, id: NodeId| {
for tx in (num_txs / 2)..num_txs {
network.input(id, Input::User(tx));
}
};
let has_remove = |node: &TestNode<QHB>| {
node.outputs().iter().any(|batch| match batch.change() {
ChangeState::Complete(Change::NodeChange(pub_keys)) => pub_keys == &pub_keys_rm,
@ -50,31 +56,122 @@ where
})
};
// Returns `true` if the node has not output all transactions yet.
// If it has, and has advanced another epoch, it clears all messages for later epochs.
let node_busy = |node: &mut TestNode<QHB>| {
if !has_remove(node) || !has_add(node) {
return true;
}
if node.outputs().iter().flat_map(Batch::iter).unique().count() < num_txs {
return true;
}
false
// Returns `true` if the node has not output all changes or transactions yet.
let node_busy = |node: &TestNode<QHB>| {
!has_remove(node) || !has_add(node) || !node.instance().algo().queue().is_empty()
};
let mut input_add = false;
let mut awaiting_removal: BTreeSet<_> = network.nodes.iter().map(|(id, _)| *id).collect();
let mut awaiting_addition: BTreeSet<_> = network
.nodes
.iter()
.map(|(id, _)| *id)
.filter(|id| *id != NodeId(0))
.collect();
// The set of nodes awaiting the second half of user transactions.
let mut awaiting_second_half: BTreeSet<_> = awaiting_removal.clone();
// Whether node 0 was rejoined as a validator.
let mut rejoined_node0 = false;
// The removed node 0 which is to be restarted as soon as all remaining validators agree to add
// it back.
let mut saved_node0: Option<TestNode<QHB>> = None;
// Handle messages in random order until all nodes have output all transactions.
while network.nodes.values_mut().any(node_busy) {
network.step();
if !input_add && network.nodes.values().all(has_remove) {
for tx in (num_txs / 2)..num_txs {
network.input_all(Input::User(tx));
while network.nodes.values().any(node_busy) || !has_add(&network.observer) {
let stepped_id = network.step();
if awaiting_removal.contains(&stepped_id) && has_remove(&network.nodes[&stepped_id]) {
awaiting_removal.remove(&stepped_id);
info!(
"{:?} has finished waiting for node removal; still waiting: {:?}",
stepped_id, awaiting_removal
);
if awaiting_removal.is_empty() {
info!("Removing node 0 from the test network");
saved_node0 = network.nodes.remove(&NodeId(0));
}
network.input_all(Input::Change(Change::NodeChange(pub_keys_add.clone())));
input_add = true;
// Vote to add node 0 back.
if stepped_id != NodeId(0) {
network.input(
stepped_id,
Input::Change(Change::NodeChange(pub_keys_add.clone())),
);
info!(
"Input the vote to add node 0 into {:?} with netinfo {:?}",
stepped_id,
network.nodes[&stepped_id].instance().algo().netinfo()
);
}
}
if awaiting_removal.is_empty() && awaiting_addition.contains(&stepped_id) {
// If the stepped node started voting to add node 0 back, take a note of that and rejoin
// node 0.
if let Some(join_plan) = network.nodes[&stepped_id]
.outputs()
.iter()
.find_map(|batch| match batch.change() {
ChangeState::InProgress(Change::NodeChange(pub_keys))
if pub_keys == &pub_keys_add =>
{
batch.join_plan()
}
_ => None,
})
{
awaiting_addition.remove(&stepped_id);
info!(
"{:?} has finished waiting for node addition; still waiting: {:?}",
stepped_id, awaiting_addition
);
if awaiting_addition.is_empty() && !rejoined_node0 {
let node = saved_node0.take().expect("node 0 wasn't saved");
let step = restart_node_for_add(&mut network, node, join_plan);
network.dispatch_messages(NodeId(0), step.messages);
rejoined_node0 = true;
}
}
}
if rejoined_node0 && awaiting_second_half.contains(&stepped_id) {
// Input the second half of user transactions into the stepped node.
input_second_half(&mut network, stepped_id);
awaiting_second_half.remove(&stepped_id);
}
}
network.verify_batches();
let node1 = network.nodes.get(&NodeId(1)).expect("node 1 is missing");
network.verify_batches(&node1);
}
/// Restarts a stopped and removed node with a given join plan and adds the node back on the test
/// network.
fn restart_node_for_add<A>(
network: &mut TestNetwork<A, QHB>,
mut node: TestNode<QHB>,
join_plan: JoinPlan<NodeId>,
) -> Step<QueueingHoneyBadger<usize, NodeId, Vec<usize>>>
where
A: Adversary<QHB>,
{
let our_id = node.id;
info!("Restarting {:?} with {:?}", our_id, join_plan);
let observer = network.observer.id;
let peer_ids: Vec<NodeId> = network
.nodes
.keys()
.cloned()
.filter(|id| *id != our_id)
.chain(iter::once(observer))
.collect();
let secret_key = node.instance().algo().netinfo().secret_key().clone();
let mut rng = rand::thread_rng().gen::<Isaac64Rng>();
let (qhb, qhb_step) =
QueueingHoneyBadger::builder_joining(our_id, secret_key, join_plan, &mut rng)
.expect("failed to rebuild the node with a join plan")
.batch_size(3)
.build(&mut rng);
let (sq, mut sq_step) = SenderQueue::builder(qhb, peer_ids.into_iter()).build(our_id);
*node.instance_mut() = sq;
sq_step.extend(qhb_step.map(|output| output, Message::from));
network.nodes.insert(our_id, node);
sq_step
}
// Allow passing `netinfo` by value. `TestNetwork` expects this function signature.
@ -95,7 +192,7 @@ fn new_queueing_hb(
.batch_size(3)
.build(&mut rng);
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id);
step.extend_with(qhb_step, Message::from);
step.extend(qhb_step.map(|output| output, Message::from));
(sq, step)
}