mirror of https://github.com/poanetwork/hbbft.git
extended tests, make broadcast output only once
This commit is contained in:
parent
7b04d5e084
commit
7096251b91
|
@ -7,7 +7,7 @@ use proto::*;
|
|||
use reed_solomon_erasure as rse;
|
||||
use reed_solomon_erasure::ReedSolomon;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::marker::{Send, Sync};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
@ -17,7 +17,7 @@ use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState,
|
|||
QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target, TargetedMessage};
|
||||
|
||||
/// A `BroadcastMessage` to be sent out, together with a target.
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TargetedBroadcastMessage<NodeUid> {
|
||||
pub target: BroadcastTarget<NodeUid>,
|
||||
pub message: BroadcastMessage<ProposedValue>,
|
||||
|
@ -36,7 +36,7 @@ impl TargetedBroadcastMessage<messaging::NodeUid> {
|
|||
}
|
||||
|
||||
/// A target node for a `BroadcastMessage`.
|
||||
#[derive(Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum BroadcastTarget<NodeUid> {
|
||||
All,
|
||||
Node(NodeUid),
|
||||
|
@ -50,6 +50,7 @@ struct BroadcastState {
|
|||
readys: HashMap<Vec<u8>, usize>,
|
||||
ready_sent: bool,
|
||||
ready_to_decode: bool,
|
||||
has_output: bool,
|
||||
}
|
||||
|
||||
/// Reliable Broadcast algorithm instance.
|
||||
|
@ -157,7 +158,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
||||
impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
|
||||
pub fn new(uid: NodeUid, all_uids: HashSet<NodeUid>, num_nodes: usize) -> Result<Self, Error> {
|
||||
let num_faulty_nodes = (num_nodes - 1) / 3;
|
||||
let parity_shard_num = 2 * num_faulty_nodes;
|
||||
|
@ -179,6 +180,7 @@ impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
|||
readys: HashMap::new(),
|
||||
ready_sent: false,
|
||||
ready_to_decode: false,
|
||||
has_output: false,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
@ -210,6 +212,10 @@ impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn uid(&self) -> &NodeUid {
|
||||
&self.uid
|
||||
}
|
||||
|
||||
/// Breaks the input value into shards of equal length and encodes them --
|
||||
/// and some extra parity shards -- with a Reed-Solomon erasure coding
|
||||
/// scheme. The returned value contains the shard assigned to this
|
||||
|
@ -322,7 +328,7 @@ impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
|||
if state.root_hash.is_none() {
|
||||
state.root_hash = Some(p.root_hash.clone());
|
||||
debug!(
|
||||
"Node {} Value root hash {:?}",
|
||||
"Node {:?} Value root hash {:?}",
|
||||
self.uid,
|
||||
HexBytes(&p.root_hash)
|
||||
);
|
||||
|
@ -351,7 +357,7 @@ impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
|||
BroadcastMessage::Echo(p) => {
|
||||
if state.root_hash.is_none() && *uid == self.uid {
|
||||
state.root_hash = Some(p.root_hash.clone());
|
||||
debug!("Node {} Echo root hash {:?}", self.uid, state.root_hash);
|
||||
debug!("Node {:?} Echo root hash {:?}", self.uid, state.root_hash);
|
||||
}
|
||||
|
||||
// Call validate with the root hash as argument.
|
||||
|
@ -404,11 +410,11 @@ impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
|||
Ok((None, no_outgoing))
|
||||
}
|
||||
} else {
|
||||
debug!("Broadcast/{} cannot validate Echo {:?}", self.uid, p);
|
||||
debug!("Broadcast/{:?} cannot validate Echo {:?}", self.uid, p);
|
||||
Ok((None, no_outgoing))
|
||||
}
|
||||
} else {
|
||||
error!("Broadcast/{} root hash not initialised", self.uid);
|
||||
error!("Broadcast/{:?} root hash not initialised", self.uid);
|
||||
Ok((None, no_outgoing))
|
||||
}
|
||||
}
|
||||
|
@ -446,7 +452,10 @@ impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
|||
h,
|
||||
)?;
|
||||
|
||||
if !state.has_output {
|
||||
output = Some(value);
|
||||
state.has_output = true;
|
||||
}
|
||||
} else {
|
||||
state.ready_to_decode = true;
|
||||
}
|
||||
|
|
|
@ -10,108 +10,232 @@ extern crate rand;
|
|||
extern crate simple_logger;
|
||||
|
||||
use rand::Rng;
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
|
||||
|
||||
use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
|
||||
use hbbft::messaging::ProposedValue;
|
||||
use hbbft::proto::BroadcastMessage;
|
||||
|
||||
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
|
||||
struct NodeId(usize);
|
||||
|
||||
type MessageQueue = VecDeque<TargetedBroadcastMessage<NodeId>>;
|
||||
|
||||
/// A "node" running a broadcast instance.
|
||||
struct TestNode {
|
||||
broadcast: Broadcast<usize>,
|
||||
queue: VecDeque<(usize, BroadcastMessage<ProposedValue>)>,
|
||||
/// This node's own ID.
|
||||
id: NodeId,
|
||||
/// The instance of the broadcast algorithm.
|
||||
broadcast: Broadcast<NodeId>,
|
||||
/// Incoming messages from other nodes that this node has not yet handled.
|
||||
queue: VecDeque<(NodeId, BroadcastMessage<ProposedValue>)>,
|
||||
/// The values this node has output so far.
|
||||
outputs: Vec<ProposedValue>,
|
||||
}
|
||||
|
||||
impl TestNode {
|
||||
fn new(broadcast: Broadcast<usize>) -> TestNode {
|
||||
/// Creates a new test node with the given broadcast instance.
|
||||
fn new(broadcast: Broadcast<NodeId>) -> TestNode {
|
||||
TestNode {
|
||||
id: *broadcast.uid(),
|
||||
broadcast,
|
||||
queue: VecDeque::new(),
|
||||
outputs: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the first message in the node's queue.
|
||||
fn handle_message(&mut self) -> (Option<ProposedValue>, MessageQueue) {
|
||||
let (from_id, msg) = self.queue.pop_front().expect("message not found");
|
||||
debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg);
|
||||
let (output, msgs) = self.broadcast
|
||||
.handle_broadcast_message(&self.id, &msg)
|
||||
.expect("handling message");
|
||||
if let Some(output) = output.clone() {
|
||||
self.outputs.push(output);
|
||||
}
|
||||
(output, msgs)
|
||||
}
|
||||
}
|
||||
|
||||
/// A strategy for picking the next good node to handle a message.
|
||||
enum MessageScheduler {
|
||||
/// Picks a random node.
|
||||
Random,
|
||||
/// Picks the first non-idle node.
|
||||
First,
|
||||
}
|
||||
|
||||
impl MessageScheduler {
|
||||
/// Chooses a node to be the next one to handle a message.
|
||||
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId {
|
||||
match *self {
|
||||
MessageScheduler::First => nodes
|
||||
.iter()
|
||||
.find(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| *id)
|
||||
.expect("no more messages in queue"),
|
||||
MessageScheduler::Random => {
|
||||
let ids: Vec<NodeId> = nodes
|
||||
.iter()
|
||||
.filter(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| *id)
|
||||
.collect();
|
||||
*rand::thread_rng()
|
||||
.choose(&ids)
|
||||
.expect("no more messages in queue")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates `num` nodes, and returns the set of node IDs as well as the new `TestNode`s.
|
||||
fn create_test_nodes(num: usize) -> Vec<TestNode> {
|
||||
let node_ids: HashSet<usize> = (0..num).collect();
|
||||
(0..num)
|
||||
.map(|id| {
|
||||
TestNode::new(Broadcast::new(id, node_ids.clone(), num).expect("Instantiate broadcast"))
|
||||
})
|
||||
.collect()
|
||||
/// An adversary that can control a set of nodes and pick the next good node to receive a message.
|
||||
trait Adversary {
|
||||
/// Chooses a node to be the next one to handle a message.
|
||||
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId;
|
||||
|
||||
/// Adds a message sent to one of the adversary's nodes.
|
||||
fn push_message(&mut self, sender_id: NodeId, msg: TargetedBroadcastMessage<NodeId>);
|
||||
|
||||
/// Produces a list of messages to be sent from the adversary's nodes.
|
||||
fn step(&mut self) -> Vec<(NodeId, TargetedBroadcastMessage<NodeId>)>;
|
||||
}
|
||||
|
||||
/// An adversary whose nodes never send any messages.
|
||||
struct SilentAdversary {
|
||||
scheduler: MessageScheduler,
|
||||
}
|
||||
|
||||
impl SilentAdversary {
|
||||
/// Creates a new silent adversary with the given message scheduler.
|
||||
fn new(scheduler: MessageScheduler) -> SilentAdversary {
|
||||
SilentAdversary { scheduler }
|
||||
}
|
||||
}
|
||||
|
||||
impl Adversary for SilentAdversary {
|
||||
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId {
|
||||
self.scheduler.pick_node(nodes)
|
||||
}
|
||||
|
||||
fn push_message(&mut self, _: NodeId, _: TargetedBroadcastMessage<NodeId>) {
|
||||
// All messages are ignored.
|
||||
}
|
||||
|
||||
fn step(&mut self) -> Vec<(NodeId, TargetedBroadcastMessage<NodeId>)> {
|
||||
vec![] // No messages are sent.
|
||||
}
|
||||
}
|
||||
|
||||
/// A collection of `TestNode`s representing a network.
|
||||
struct TestNetwork<A: Adversary> {
|
||||
nodes: BTreeMap<NodeId, TestNode>,
|
||||
adv_nodes: BTreeSet<NodeId>,
|
||||
adversary: A,
|
||||
}
|
||||
|
||||
impl<A: Adversary> TestNetwork<A> {
|
||||
/// Creates a new network with `good_num` good nodes, and the given `adversary` controlling
|
||||
/// `adv_num` nodes.
|
||||
fn new(good_num: usize, adv_num: usize, adversary: A) -> TestNetwork<A> {
|
||||
let node_ids: HashSet<NodeId> = (0..(good_num + adv_num)).map(NodeId).collect();
|
||||
let new_broadcast = |id: NodeId| {
|
||||
let bc = Broadcast::new(id, node_ids.clone(), node_ids.len())
|
||||
.expect("Instantiate broadcast");
|
||||
(id, TestNode::new(bc))
|
||||
};
|
||||
TestNetwork {
|
||||
nodes: (0..good_num).map(NodeId).map(new_broadcast).collect(),
|
||||
adversary,
|
||||
adv_nodes: (good_num..(good_num + adv_num)).map(NodeId).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Pushes the messages into the queues of the corresponding recipients.
|
||||
fn dispatch_messages(
|
||||
nodes: &mut Vec<TestNode>,
|
||||
sender_id: usize,
|
||||
msgs: VecDeque<TargetedBroadcastMessage<usize>>,
|
||||
) {
|
||||
fn dispatch_messages(&mut self, sender_id: NodeId, msgs: MessageQueue) {
|
||||
debug!("Sending: {:?}", msgs);
|
||||
for msg in msgs {
|
||||
match msg {
|
||||
TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::All,
|
||||
message,
|
||||
ref message,
|
||||
} => {
|
||||
for (i, node) in nodes.iter_mut().enumerate() {
|
||||
if i != sender_id {
|
||||
for node in self.nodes.values_mut() {
|
||||
// TODO: `Broadcast` currently assumes that messages to `All` also reach
|
||||
// ourselves.
|
||||
//if node.id != sender_id {
|
||||
node.queue.push_back((sender_id, message.clone()))
|
||||
//}
|
||||
}
|
||||
}
|
||||
self.adversary.push_message(sender_id, msg.clone());
|
||||
}
|
||||
TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::Node(to_id),
|
||||
message,
|
||||
} => nodes[to_id].queue.push_back((sender_id, message)),
|
||||
ref message,
|
||||
} => {
|
||||
if self.adv_nodes.contains(&to_id) {
|
||||
self.adversary.push_message(sender_id, msg.clone());
|
||||
} else {
|
||||
self.nodes
|
||||
.get_mut(&to_id)
|
||||
.unwrap()
|
||||
.queue
|
||||
.push_back((sender_id, message.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a queued message in a randomly selected node.
|
||||
fn handle_message(nodes: &mut Vec<TestNode>) -> (usize, Option<ProposedValue>) {
|
||||
let ids: Vec<usize> = nodes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| id)
|
||||
.collect();
|
||||
let id = *rand::thread_rng()
|
||||
.choose(&ids)
|
||||
.expect("no more messages in queue");
|
||||
let (from_id, msg) = nodes[id].queue.pop_front().expect("message not found");
|
||||
debug!("Handling {} -> {}: {:?}", from_id, id, msg);
|
||||
let (output, msgs) = nodes[id]
|
||||
.broadcast
|
||||
.handle_broadcast_message(&id, &msg)
|
||||
.expect("handling message");
|
||||
debug!("Sending: {:?}", msgs);
|
||||
dispatch_messages(nodes, id, msgs);
|
||||
/// Handles a queued message in a randomly selected node and returns the selected node's ID and
|
||||
/// its output value, if any.
|
||||
fn step(&mut self) -> (NodeId, Option<ProposedValue>) {
|
||||
// TODO: `self.adversary.step()`
|
||||
// Pick a random non-idle node..
|
||||
let id = self.adversary.pick_node(&self.nodes);
|
||||
let (output, msgs) = self.nodes.get_mut(&id).unwrap().handle_message();
|
||||
self.dispatch_messages(id, msgs);
|
||||
(id, output)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_16_broadcast_nodes() {
|
||||
simple_logger::init_with_level(log::Level::Debug).unwrap();
|
||||
/// Makes the node `proposer_id` propose a value.
|
||||
fn propose_value(&mut self, proposer_id: NodeId, value: ProposedValue) {
|
||||
let msgs = self.nodes[&proposer_id]
|
||||
.broadcast
|
||||
.propose_value(value)
|
||||
.expect("propose");
|
||||
self.dispatch_messages(proposer_id, msgs);
|
||||
}
|
||||
}
|
||||
|
||||
// Create 4 nodes.
|
||||
const NUM_NODES: usize = 16;
|
||||
let mut nodes = create_test_nodes(NUM_NODES);
|
||||
/// Broadcasts a value from node 0 and expects all good nodes to receive it.
|
||||
fn test_broadcast<A: Adversary>(mut network: TestNetwork<A>) {
|
||||
// TODO: This returns an error in all but the first test.
|
||||
let _ = simple_logger::init_with_level(log::Level::Debug);
|
||||
|
||||
// Make node 0 propose a value.
|
||||
let proposed_value = b"Foo";
|
||||
let msgs = nodes[0]
|
||||
.broadcast
|
||||
.propose_value(proposed_value.to_vec())
|
||||
.expect("propose");
|
||||
dispatch_messages(&mut nodes, 0, msgs);
|
||||
network.propose_value(NodeId(0), proposed_value.to_vec());
|
||||
|
||||
// Handle messages in random order until all nodes have output the proposed value.
|
||||
let mut received = 0;
|
||||
while received < NUM_NODES {
|
||||
let (id, output) = handle_message(&mut nodes);
|
||||
while network.nodes.values().any(|node| node.outputs.is_empty()) {
|
||||
let (id, output) = network.step();
|
||||
if let Some(value) = output {
|
||||
assert_eq!(value, proposed_value);
|
||||
received += 1;
|
||||
debug!("Node {} received", id);
|
||||
assert_eq!(1, network.nodes[&id].outputs.len());
|
||||
debug!("Node {:?} received", id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_11_5_broadcast_nodes_random_delivery() {
|
||||
let adversary = SilentAdversary::new(MessageScheduler::Random);
|
||||
test_broadcast(TestNetwork::new(11, 5, adversary));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_11_5_broadcast_nodes_first_delivery() {
|
||||
let adversary = SilentAdversary::new(MessageScheduler::First);
|
||||
test_broadcast(TestNetwork::new(11, 5, adversary));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue