Added fault checking on every step in the net framework (#338)

* added fault checking in the net framework

* check that the node in the fault report is not faulty

* simplified a condition

* made error on fault a parameter of VirtualNet

* updated the BA test to error on fault

* explained errors and refactored an assignment

* typo fix
This commit is contained in:
Vladimir Komendantskiy 2018-12-11 08:12:38 +00:00 committed by GitHub
parent f297d84514
commit c1c7ffff49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 173 additions and 62 deletions

View File

@ -79,8 +79,8 @@ pub enum FaultKind {
/// A structure representing the context of a faulty node. This structure
/// describes which node is faulty (`node_id`) and which faulty behavior
/// the node exhibited ('kind').
#[derive(Debug, PartialEq)]
/// that the node exhibited ('kind').
#[derive(Clone, Debug, PartialEq)]
pub struct Fault<N> {
/// The faulty node's ID.
pub node_id: N,

View File

@ -78,9 +78,8 @@ proptest!{
}
type NodeId = u16;
type Algo = BinaryAgreement<NodeId, u8>;
impl VirtualNet<Algo> {
impl VirtualNet<BinaryAgreement<NodeId, u8>> {
fn test_binary_agreement<R>(&mut self, input: Option<bool>, mut rng: R)
where
R: Rng + 'static,
@ -121,7 +120,7 @@ fn binary_agreement(cfg: TestConfig) {
num_good_nodes, num_faulty_nodes, cfg.input
);
// Create a network with `size` validators and one observer.
let mut net: VirtualNet<Algo> = NetBuilder::new(0..size as u16)
let (mut net, _) = NetBuilder::new(0..size as u16)
.num_faulty(num_faulty_nodes as usize)
.message_limit(10_000 * size as usize)
.time_limit(time::Duration::from_secs(30 * size as u64))

View File

@ -439,7 +439,7 @@ fn reordering_attack() {
let _ = env_logger::try_init();
let ids: Vec<NodeId> = (0..NUM_NODES).collect();
let adversary_netinfo: Arc<Mutex<Option<Arc<NetworkInfo<NodeId>>>>> = Default::default();
let mut net = NetBuilder::new(ids.iter().cloned())
let (mut net, _) = NetBuilder::new(ids.iter().cloned())
.adversary(AbaCommonCoinAdversary::new(adversary_netinfo.clone()))
.crank_limit(10000)
.using(move |info| {

View File

@ -1,9 +1,12 @@
//! Test network errors
use std::{fmt, time};
use std::fmt::{self, Debug, Display};
use std::time;
use failure;
use hbbft::DistAlgorithm;
use threshold_crypto as crypto;
use hbbft::{DistAlgorithm, Fault};
use super::NetMessage;
@ -15,7 +18,12 @@ where
D: DistAlgorithm,
{
/// The algorithm run by the node produced a `DistAlgorithm::Error` while processing input.
AlgorithmError {
HandleInput(D::Error),
/// The algorithm run by the node produced a `DistAlgorithm::Error` while processing input to
/// all nodes.
HandleInputAll(D::Error),
/// The algorithm run by the node produced a `DistAlgorithm::Error` while processing a message.
HandleMessage {
/// Network message that triggered the error.
msg: NetMessage<D>,
err: D::Error,
@ -29,6 +37,10 @@ where
MessageLimitExceeded(usize),
/// The execution time limit has been reached or exceeded.
TimeLimitHit(time::Duration),
/// A `Fault` is encountered in a step of a `DistAlgorithm`.
Fault(Fault<D::NodeId>),
/// An error occurred while generating initial keys for threshold cryptography.
InitialKeyGeneration(crypto::error::Error),
}
// Note: Deriving [Debug](std::fmt::Debug), [Fail](failure::Fail) and through that,
@ -40,13 +52,21 @@ where
//
// * <https://github.com/rust-lang/rust/issues/26925>
// * <https://github.com/rust-lang/rust/issues/26925#issuecomment-405189266>
impl<D> fmt::Display for CrankError<D>
impl<D> Display for CrankError<D>
where
D: DistAlgorithm,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
CrankError::AlgorithmError { msg, err } => write!(
CrankError::HandleInput(err) => {
write!(f, "The algorithm could not process input: {:?}", err)
}
CrankError::HandleInputAll(err) => write!(
f,
"The algorithm could not process input to all nodes: {:?}",
err
),
CrankError::HandleMessage { msg, err } => write!(
f,
"The algorithm could not process network message {:?}. Error: {:?}",
msg, err
@ -65,18 +85,32 @@ where
CrankError::TimeLimitHit(lim) => {
write!(f, "Time limit of {} seconds exceeded.", lim.as_secs())
}
CrankError::Fault(fault) => {
write!(f, "Node {:?} is faulty: {:?}.", fault.node_id, fault.kind)
}
CrankError::InitialKeyGeneration(err) => write!(
f,
"An error occurred while generating initial keys for threshold cryptography: {:?}.",
err
),
}
}
}
impl<D> fmt::Debug for CrankError<D>
impl<D> Debug for CrankError<D>
where
D: DistAlgorithm,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
CrankError::AlgorithmError { msg, err } => f
.debug_struct("AlgorithmError")
CrankError::HandleInput(err) => {
f.debug_struct("HandleInput").field("err", err).finish()
}
CrankError::HandleInputAll(err) => {
f.debug_struct("HandleInputAll").field("err", err).finish()
}
CrankError::HandleMessage { msg, err } => f
.debug_struct("HandleMessage")
.field("msg", msg)
.field("err", err)
.finish(),
@ -88,6 +122,10 @@ where
f.debug_tuple("MessageLimitExceeded").field(max).finish()
}
CrankError::TimeLimitHit(lim) => f.debug_tuple("TimeLimitHit").field(lim).finish(),
CrankError::Fault(fault) => f.debug_tuple("Fault").field(fault).finish(),
CrankError::InitialKeyGeneration(err) => {
f.debug_tuple("InitialKeyGeneration").field(err).finish()
}
}
}
}
@ -98,7 +136,9 @@ where
{
fn cause(&self) -> Option<&failure::Fail> {
match self {
CrankError::AlgorithmError { err, .. } => Some(err),
CrankError::HandleInput(err) | CrankError::HandleInputAll(err) => Some(err),
CrankError::HandleMessage { err, .. } => Some(err),
CrankError::InitialKeyGeneration(err) => Some(err),
_ => None,
}
}

View File

@ -22,11 +22,10 @@ use std::{cmp, collections, env, fmt, fs, io, ops, process, time};
use rand;
use rand::{Rand, Rng};
use threshold_crypto as crypto;
use hbbft::dynamic_honey_badger::Batch;
use hbbft::util::SubRng;
use hbbft::{self, Contribution, DaStep, DistAlgorithm, NetworkInfo, NodeIdT, Step};
use hbbft::{self, Contribution, DaStep, DistAlgorithm, Fault, NetworkInfo, NodeIdT, Step};
use try_some;
@ -73,6 +72,8 @@ pub struct Node<D: DistAlgorithm> {
is_faulty: bool,
/// Captured algorithm outputs, in order.
outputs: Vec<D::Output>,
/// Collected fault log, in order.
faults: Vec<Fault<D::NodeId>>,
}
impl<D> fmt::Debug for Node<D>
@ -96,6 +97,7 @@ impl<D: DistAlgorithm> Node<D> {
algorithm,
is_faulty,
outputs: Vec::new(),
faults: Vec::new(),
}
}
@ -132,6 +134,23 @@ impl<D: DistAlgorithm> Node<D> {
pub fn outputs(&self) -> &[D::Output] {
self.outputs.as_slice()
}
/// List faults so far.
///
/// All faults are collected for reference purposes.
#[inline]
pub fn faults(&self) -> &[Fault<D::NodeId>] {
self.faults.as_slice()
}
/// Collects all outputs and faults (not required for network operation) for user convenience.
fn store_step(&mut self, step: &DaStep<D>)
where
D::Output: Clone,
{
self.outputs.extend(step.output.iter().cloned());
self.faults.extend(step.fault_log.0.iter().cloned());
}
}
/// A network message on the virtual network.
@ -199,7 +218,8 @@ fn process_step<'a, D>(
sender: D::NodeId,
step: &DaStep<D>,
dest: &mut collections::VecDeque<NetMessage<D>>,
) -> usize
error_on_fault: bool,
) -> Result<usize, CrankError<D>>
where
D: DistAlgorithm + 'a,
D::Message: Clone,
@ -244,14 +264,19 @@ where
}
}
// Collect all outputs (not required for network operation) as a convenience for the user.
nodes
.get_mut(&sender)
.expect("Trying to process a step with non-existing node ID")
.outputs
.extend(step.output.iter().cloned());
message_count
.store_step(step);
if error_on_fault {
// Verify that no correct node is reported as faulty.
for fault in &step.fault_log.0 {
if nodes.get(&fault.node_id).map_or(false, |n| !n.is_faulty()) {
return Err(CrankError::Fault(fault.clone()));
}
}
}
Ok(message_count)
}
/// New network node construction information.
@ -318,6 +343,9 @@ where
message_limit: Option<usize>,
/// Optional time limit.
time_limit: Option<time::Duration>,
/// Property to cause an error if a `Fault` is output from a correct node. By default,
/// encountering a fault leads to an error.
error_on_fault: bool,
/// Random number generator used to generate keys.
rng: Option<Box<dyn Rng>>,
}
@ -336,6 +364,7 @@ where
.field("crank_limit", &self.crank_limit)
.field("message_limit", &self.message_limit)
.field("time_limit", &self.time_limit)
.field("error_on_fault", &self.error_on_fault)
.field("rng", &"<RNG>")
.finish()
}
@ -367,6 +396,7 @@ where
crank_limit: None,
message_limit: None,
time_limit: DEFAULT_TIME_LIMIT,
error_on_fault: true,
rng: None,
}
}
@ -456,6 +486,16 @@ where
self
}
/// Property to cause an error if a `Fault` is output from a correct node. By default,
/// encountering a fault leads to an error.
///
/// The default setting `true` can be changed using this function.
#[inline]
pub fn error_on_fault(mut self, error_on_fault: bool) -> Self {
self.error_on_fault = error_on_fault;
self
}
/// Constructor function (with step).
///
/// The constructor function is used to construct each node in the network. Any step returned
@ -489,7 +529,7 @@ where
///
/// If the total number of nodes is not `> 3 * num_faulty`, construction will panic.
#[inline]
pub fn build(self) -> Result<VirtualNet<D>, crypto::error::Error> {
pub fn build(self) -> Result<(VirtualNet<D>, Vec<(D::NodeId, DaStep<D>)>), CrankError<D>> {
let rng: Box<dyn Rng> = self.rng.unwrap_or_else(|| Box::new(rand::thread_rng()));
// The time limit can be overriden through environment variables:
@ -513,9 +553,13 @@ where
// Note: Closure is not redundant, won't compile without it.
#[cfg_attr(feature = "cargo-clippy", allow(redundant_closure))]
let mut net = VirtualNet::new(self.node_ids, self.num_faulty as usize, rng, move |node| {
cons(node)
})?;
let (mut net, steps) = VirtualNet::new(
self.node_ids,
self.num_faulty as usize,
rng,
move |node| cons(node),
self.error_on_fault,
)?;
if self.adversary.is_some() {
net.adversary = self.adversary;
@ -537,7 +581,7 @@ where
net.message_limit = self.message_limit;
net.time_limit = time_limit;
Ok(net)
Ok((net, steps))
}
}
@ -569,6 +613,10 @@ where
time_limit: Option<time::Duration>,
/// The instant the network was created.
start_time: time::Instant,
/// Property to cause an error if a `Fault` is output from a correct node. Setting this to
/// `false` switches allows to carry on with the test despite `Fault`s reported for a correct
/// node.
error_on_fault: bool,
}
impl<D> fmt::Debug for VirtualNet<D>
@ -585,6 +633,7 @@ where
.field("crank_limit", &self.crank_limit)
.field("message_count", &self.message_count)
.field("message_limit", &self.message_limit)
.field("error_on_fault", &self.error_on_fault)
.finish()
}
}
@ -714,7 +763,8 @@ where
/// construct nodes, the `cons` function is passed the ID and the generated `NetworkInfo` and
/// expected to return a (`DistAlgorithm`, `Step`) tuple.
///
/// All messages from the resulting step are queued for delivery.
/// All messages from the resulting step are queued for delivery. The function outputs the
/// initial steps of the nodes in the constructed network for testing purposes.
///
/// This function is not used directly, instead the `NetBuilder` should be used.
///
@ -727,14 +777,16 @@ where
faulty: usize,
mut rng: R,
cons: F,
) -> Result<Self, crypto::error::Error>
error_on_fault: bool,
) -> Result<(Self, Vec<(D::NodeId, DaStep<D>)>), CrankError<D>>
where
F: Fn(NewNodeInfo<D>) -> (D, DaStep<D>),
I: IntoIterator<Item = D::NodeId>,
R: rand::Rng,
{
// Generate a new set of cryptographic keys for threshold cryptography.
let net_infos = NetworkInfo::generate_map(node_ids, &mut rng)?;
let net_infos = NetworkInfo::generate_map(node_ids, &mut rng)
.map_err(CrankError::InitialKeyGeneration)?;
assert!(
faulty * 3 < net_infos.len(),
@ -762,27 +814,33 @@ where
let mut message_count: usize = 0;
// For every recorded step, apply it.
for (sender, step) in steps {
message_count = message_count.saturating_add(process_step(
for (sender, step) in &steps {
let n = process_step(
&mut nodes,
sender,
&step,
sender.clone(),
step,
&mut messages,
));
error_on_fault,
)?;
message_count = message_count.saturating_add(n);
}
Ok(VirtualNet {
nodes,
messages,
adversary: Some(Box::new(adversary::NullAdversary::new())),
trace: None,
crank_count: 0,
crank_limit: None,
message_count,
message_limit: None,
time_limit: None,
start_time: time::Instant::now(),
})
Ok((
VirtualNet {
nodes,
messages,
adversary: Some(Box::new(adversary::NullAdversary::new())),
trace: None,
crank_count: 0,
crank_limit: None,
message_count,
message_limit: None,
time_limit: None,
start_time: time::Instant::now(),
error_on_fault: true,
},
steps.into_iter().collect(),
))
}
/// Helper function to dispatch messages.
@ -796,13 +854,13 @@ where
.ok_or_else(|| CrankError::NodeDisappeared(msg.to.clone()))?;
// Store a copy of the message, in case we need to pass it to the error variant.
// By reducing the information in `CrankError::AlgorithmError`, we could reduce overhead
// By reducing the information in `CrankError::HandleMessage`, we could reduce overhead
// here if necessary.
let msg_copy = msg.clone();
let step = node
.algorithm
.handle_message(&msg.from, msg.payload)
.map_err(move |err| CrankError::AlgorithmError { msg: msg_copy, err })?;
.map_err(move |err| CrankError::HandleMessage { msg: msg_copy, err })?;
Ok(step)
}
@ -816,20 +874,26 @@ where
///
/// Panics if `id` does not name a valid node.
#[inline]
pub fn send_input(&mut self, id: D::NodeId, input: D::Input) -> Result<DaStep<D>, D::Error> {
pub fn send_input(
&mut self,
id: D::NodeId,
input: D::Input,
) -> Result<DaStep<D>, CrankError<D>> {
let step = self
.nodes
.get_mut(&id)
.expect("cannot handle input on non-existing node")
.algorithm
.handle_input(input)?;
.handle_input(input)
.map_err(CrankError::HandleInput)?;
self.message_count = self.message_count.saturating_add(process_step(
&mut self.nodes,
id,
&step,
&mut self.messages,
));
self.error_on_fault,
)?);
Ok(step)
}
@ -915,12 +979,16 @@ where
// All messages are expanded and added to the queue. We opt for copying them, so we can
// return unaltered step later on for inspection.
self.message_count = self.message_count.saturating_add(process_step(
self.message_count = self.message_count.saturating_add(match process_step(
&mut self.nodes,
receiver.clone(),
&step,
&mut self.messages,
));
self.error_on_fault,
) {
Ok(n) => n,
Err(e) => return Some(Err(e)),
});
// Increase the crank count.
self.crank_count += 1;
@ -956,7 +1024,7 @@ where
pub fn broadcast_input<'a>(
&'a mut self,
input: &'a D::Input,
) -> Result<Vec<(D::NodeId, DaStep<D>)>, D::Error> {
) -> Result<Vec<(D::NodeId, DaStep<D>)>, CrankError<D>> {
// Note: The tricky lifetime annotation basically says that the input value given must
// live as long as the iterator returned lives (because it is cloned on every step,
// with steps only evaluated each time `next()` is called. For the same reason the
@ -970,19 +1038,23 @@ where
.map(move |node| {
Ok((
node.id().clone(),
node.algorithm.handle_input(input.clone())?,
node.algorithm
.handle_input(input.clone())
.map_err(CrankError::HandleInputAll)?,
))
}).collect::<Result<_, _>>()?;
// Process all messages from all steps in the queue.
steps.iter().for_each(|(id, step)| {
self.message_count = self.message_count.saturating_add(process_step(
for (id, step) in &steps {
let n = process_step(
&mut self.nodes,
id.clone(),
step,
&mut self.messages,
));
});
self.error_on_fault,
)?;
self.message_count = self.message_count.saturating_add(n);
}
Ok(steps)
}

View File

@ -93,7 +93,7 @@ fn do_drop_and_readd(cfg: TestConfig) {
let mut rng: TestRng = TestRng::from_seed(cfg.seed);
// First, we create a new test network with Honey Badger instances.
let mut net = NetBuilder::new(0..cfg.dimension.size())
let (mut net, _) = NetBuilder::new(0..cfg.dimension.size())
.num_faulty(cfg.dimension.faulty())
// Limited to 15k messages per node.
.message_limit(15_000 * cfg.dimension.size() as usize)