hbbft/tests/broadcast.rs

118 lines
3.4 KiB
Rust
Raw Normal View History

//! Integration test of the reliable broadcast protocol.
extern crate hbbft;
#[macro_use]
extern crate log;
extern crate crossbeam;
extern crate crossbeam_channel;
extern crate merkle;
2018-05-01 09:32:01 -07:00
extern crate rand;
2018-04-30 08:55:51 -07:00
extern crate simple_logger;
2018-05-01 09:32:01 -07:00
use rand::Rng;
use std::collections::{HashSet, VecDeque};
2018-05-01 09:32:01 -07:00
use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
use hbbft::messaging::ProposedValue;
use hbbft::proto::BroadcastMessage;
2018-05-01 09:32:01 -07:00
struct TestNode {
broadcast: Broadcast<usize>,
queue: VecDeque<(usize, BroadcastMessage<ProposedValue>)>,
}
2018-05-01 09:32:01 -07:00
impl TestNode {
fn new(broadcast: Broadcast<usize>) -> TestNode {
TestNode {
2018-05-01 09:32:01 -07:00
broadcast,
queue: VecDeque::new(),
}
}
}
2018-05-01 09:32:01 -07:00
/// Creates `num` nodes, and returns the set of node IDs as well as the new `TestNode`s.
fn create_test_nodes(num: usize) -> Vec<TestNode> {
let node_ids: HashSet<usize> = (0..num).collect();
(0..num)
.map(|id| {
TestNode::new(Broadcast::new(id, node_ids.clone(), num).expect("Instantiate broadcast"))
})
.collect()
}
2018-05-01 09:32:01 -07:00
/// Pushes the messages into the queues of the corresponding recipients.
fn dispatch_messages(
nodes: &mut Vec<TestNode>,
sender_id: usize,
msgs: VecDeque<TargetedBroadcastMessage<usize>>,
) {
for msg in msgs {
match msg {
TargetedBroadcastMessage {
target: BroadcastTarget::All,
message,
} => {
for (i, node) in nodes.iter_mut().enumerate() {
if i != sender_id {
node.queue.push_back((sender_id, message.clone()))
}
}
}
2018-05-01 09:32:01 -07:00
TargetedBroadcastMessage {
target: BroadcastTarget::Node(to_id),
message,
} => nodes[to_id].queue.push_back((sender_id, message)),
}
}
2018-05-01 09:32:01 -07:00
}
/// Handles a queued message in a randomly selected node.
fn handle_message(nodes: &mut Vec<TestNode>) -> (usize, Option<ProposedValue>) {
let ids: Vec<usize> = nodes
.iter()
.enumerate()
.filter(|(_, node)| !node.queue.is_empty())
.map(|(id, _)| id)
.collect();
let id = *rand::thread_rng()
.choose(&ids)
.expect("no more messages in queue");
let (from_id, msg) = nodes[id].queue.pop_front().expect("message not found");
debug!("Handling {} -> {}: {:?}", from_id, id, msg);
let (output, msgs) = nodes[id]
.broadcast
.handle_broadcast_message(&id, &msg)
.expect("handling message");
debug!("Sending: {:?}", msgs);
dispatch_messages(nodes, id, msgs);
(id, output)
}
#[test]
2018-05-01 09:32:01 -07:00
fn test_16_broadcast_nodes() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
2018-05-01 09:32:01 -07:00
// Create 4 nodes.
const NUM_NODES: usize = 16;
let mut nodes = create_test_nodes(NUM_NODES);
// Make node 0 propose a value.
let proposed_value = b"Foo";
let msgs = nodes[0]
.broadcast
.propose_value(proposed_value.to_vec())
.expect("propose");
dispatch_messages(&mut nodes, 0, msgs);
// Handle messages in random order until all nodes have output the proposed value.
let mut received = 0;
while received < NUM_NODES {
let (id, output) = handle_message(&mut nodes);
if let Some(value) = output {
assert_eq!(value, proposed_value);
received += 1;
debug!("Node {} received", id);
}
2018-05-01 09:32:01 -07:00
}
}