added broadcast integration test concept

This commit is contained in:
Vladimir Komendantskiy 2018-04-12 17:17:33 +01:00
parent 40575402c8
commit efdb4467c5
7 changed files with 236 additions and 28 deletions

View File

@ -10,7 +10,7 @@ use crossbeam_channel::{Sender, Receiver};
use proto::Message;
use proto_io;
use proto_io::CodecIo;
use proto_io::ProtoIo;
use messaging::SourcedMessage;
use stream_io::StreamIo;
@ -25,20 +25,21 @@ impl From<io::Error> for Error {
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync +
From<Vec<u8>> + Into<Vec<u8>>>
pub struct CommsTask
<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
{
/// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<T>>,
/// The receive side of the channel to the comms thread.
rx: &'a Receiver<Message<T>>,
/// The socket IO task.
io: CodecIo<T>,
io: ProtoIo,
/// The index of this comms task for identification against its remote node.
pub node_index: usize
}
impl<'a, T: Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
impl
<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask<'a, T>
{
pub fn new(tx: &'a Sender<SourcedMessage<T>>,
@ -53,7 +54,7 @@ impl<'a, T: Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask {
tx: tx,
rx: rx,
io: StreamIo::from_stream(stream),
io: ProtoIo::from_stream(stream),
node_index: node_index
}
}

View File

@ -49,9 +49,9 @@ extern crate crossbeam_channel;
extern crate reed_solomon_erasure;
mod connection;
mod messaging;
pub mod messaging;
mod stream_io;
mod proto;
pub mod proto;
mod proto_io;
mod commst;
mod broadcast;

View File

@ -1,6 +1,5 @@
//! The local message delivery system.
use std::fmt::Debug;
use std::sync::Arc;
use crossbeam::Scope;
use crossbeam_channel::{unbounded, Sender, Receiver};
use proto::Message;
@ -40,7 +39,7 @@ impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> {
/// Message sent by a given source. The sources are consensus nodes indexed 1
/// through N where N is the total number of nodes. Sourced messages are
/// required when it is essential to know the message origin but the set of
/// recepients is unkown without further computation which is irrelevant to the
/// recepients is unknown without further computation which is irrelevant to the
/// message delivery task.
#[derive(Clone, Debug)]
pub struct SourcedMessage<T: Clone + Debug + Send + Sync> {

View File

@ -6,7 +6,6 @@ use std::net::TcpStream;
use protobuf;
use protobuf::Message as ProtobufMessage;
use proto::*;
use stream_io::StreamIo;
/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
@ -58,34 +57,31 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
Ok(result)
}
pub struct CodecIo<T> {
pub struct ProtoIo {
stream: TcpStream,
buffer: [u8; 1024 * 4],
/// FIXME: remove this dependent argument
phant: T
}
/// A message handling task.
impl<T> StreamIo<TcpStream, T, Error> for CodecIo<T>
where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
impl ProtoIo
//where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
{
fn from_stream(stream: TcpStream) -> Self {
CodecIo {
pub fn from_stream(stream: TcpStream) -> Self {
ProtoIo {
stream,
buffer: [0; 1024 * 4],
phant: T::from(Vec::new())
}
}
fn try_clone(&self) -> Result<CodecIo<T>, ::std::io::Error> {
Ok(CodecIo {
pub fn try_clone(&self) -> Result<ProtoIo, ::std::io::Error> {
Ok(ProtoIo {
stream: self.stream.try_clone()?,
buffer: self.buffer.clone(),
phant: T::from(Vec::new())
})
}
fn recv(&mut self) -> Result<Message<T>, Error>
pub fn recv<T>(&mut self) -> Result<Message<T>, Error>
where T: Clone + Send + Sync + From<Vec<u8>> // + Into<Vec<u8>>
{
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
@ -109,7 +105,8 @@ where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
.map_err(|e| Error::ProtobufError(e))
}
fn send(&mut self, message: Message<T>) -> Result<(), Error>
pub fn send<T>(&mut self, message: Message<T>) -> Result<(), Error>
where T: Clone + Send + Sync + Into<Vec<u8>>
{
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream

View File

@ -3,18 +3,19 @@
use std::io;
use std::io::{Read, Write};
use proto::*;
use proto::Message; // FIXME: Message should be made independent of the
// protobuf type MessageProto.
/// Trait of types of streams carrying payload of type `Message<T>` and
/// returning errors of type `Error`.
///
/// This is a stream interface independent of the choice of serialisation
/// methods.
pub trait StreamIo<Stream, T, Error>: Sized
where Stream: Read + Write, T: Send + Sync // From implies Into
pub trait StreamIo<Stream, T, Error>
where Stream: Read + Write, T: Send + Sync
{
fn from_stream(stream: Stream) -> Self;
fn try_clone(&self) -> Result<Self, io::Error>;
fn try_clone(&self) -> Result<Self, io::Error> where Self: Sized;
fn recv(&mut self) -> Result<Message<T>, Error>;
fn send(&mut self, m: Message<T>) -> Result<(), Error>;
}

156
tests/broadcast.rs Normal file
View File

@ -0,0 +1,156 @@
//! Integration test of the reliable broadcast protocol.
extern crate hbbft;
extern crate crossbeam;
#[macro_use]
extern crate crossbeam_channel;
extern crate merkle;
mod netsim;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io;
use crossbeam_channel::{Sender, Receiver};
use hbbft::proto::*;
use netsim::NetSim;
/// This is a structure to start a consensus node.
pub struct TestNode<'a> {
/// Node identifier
ident: usize,
/// TX handles, one for each other node
txs: Vec<&'a Sender<Message<TestValue>>>,
/// RX handle, one for each other node
rxs: Vec<&'a Receiver<Message<TestValue>>>,
/// Optionally, a value to be broadcast by this node.
value: Option<TestValue>
}
impl<'a> TestNode<'a>
{
/// Consensus node constructor. It only initialises initial parameters.
pub fn new(ident: usize,
txs: Vec<&'a Sender<Message<TestValue>>>,
rxs: Vec<&'a Receiver<Message<TestValue>>>,
value: Option<TestValue>) -> Self
{
TestNode {
ident: ident,
txs: txs,
rxs: rxs,
value: value
}
}
pub fn run(&self) -> Result<HashSet<TestValue>, Error> {
assert_eq!(self.rxs.len(), 3);
let mut result = None;
for n in 0..3 {
self.txs[n].send(Message::Broadcast(
BroadcastMessage::Ready(Vec::new()))
).unwrap();
}
while result.is_none() {
select_loop! {
recv(self.rxs[0], message) => {
println!("Node {}/0 received {:?}", self.ident, message);
result = Some(Err(Error::NotImplemented));
}
recv(self.rxs[1], message) => {
println!("Node {}/1 received {:?}", self.ident, message);
result = Some(Err(Error::NotImplemented));
}
recv(self.rxs[2], message) => {
println!("Node {}/2 received {:?}", self.ident, message);
result = Some(Err(Error::NotImplemented));
}
}
}
result.unwrap()
}
}
#[derive(Debug, PartialEq)]
pub enum Error {
NotImplemented
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct TestValue {
pub value: String
}
/// `TestValue: merkle::Hashable` is derived from `TestValue: AsRef<[u8]>`.
impl AsRef<[u8]> for TestValue {
fn as_ref(&self) -> &[u8] {
self.value.as_ref()
}
}
impl From<Vec<u8>> for TestValue {
fn from(bytes: Vec<u8>) -> TestValue {
TestValue {
value: String::from_utf8(bytes).expect("Found invalid UTF-8")
}
}
}
impl From<TestValue> for Vec<u8> {
fn from(v: TestValue) -> Vec<u8> {
match v {
TestValue { value } => {
value.as_bytes().to_vec()
}
}
}
}
fn create_test_nodes<'a>(num_nodes: usize,
net: &'a NetSim<Message<TestValue>>) ->
Vec<TestNode<'a>>
{
let mut nodes = Vec::new();
for n in 0..num_nodes {
let value = TestValue {
value: format!("-{}-{}-{}-", n, n, n)
};
let mut txs = Vec::new();
let mut rxs = Vec::new();
for m in 0..num_nodes {
if n == m {
// Skip the channel back to the node itself.
continue;
}
txs.push(net.tx(n, m));
rxs.push(net.rx(m, n));
}
nodes.push(TestNode::new(n, txs, rxs, Some(value)));
}
nodes
}
#[test]
fn test_4_broadcast_nodes() {
const NUM_NODES: usize = 4;
let net: NetSim<Message<TestValue>> = NetSim::new(NUM_NODES);
let nodes = create_test_nodes(NUM_NODES, &net);
crossbeam::scope(|scope| {
let mut handles = Vec::new();
for node in nodes {
handles.push(scope.spawn(move || {
node.run()
}));
}
// Compare the set of values returned by broadcast against the expected
// set.
for h in handles {
assert_eq!(h.join(), Err(Error::NotImplemented));
}
});
}

54
tests/netsim.rs Normal file
View File

@ -0,0 +1,54 @@
//! Network simulator for testing without message serialisation. Socket
//! connections between nodes are simulated using
//! `crossbeam_channel::unbounded`.
extern crate crossbeam_channel;
use crossbeam_channel::{Sender, Receiver, unbounded};
pub struct NetSim<Message: Clone + Send + Sync> {
/// The number of simulated nodes.
num_nodes: usize,
/// All TX handles
txs: Vec<Sender<Message>>,
/// All RX handles
rxs: Vec<Receiver<Message>>,
}
impl<Message: Clone + Send + Sync> NetSim<Message> {
pub fn new(num_nodes: usize) -> Self {
assert!(num_nodes > 1);
// All channels of a totally conected network of size `num_nodes`.
let channels: Vec<(Sender<Message>, Receiver<Message>)> =
(0 .. num_nodes * num_nodes)
.map(|_| unbounded())
.collect();
let txs = channels.iter()
.map(|(tx, _)| tx.to_owned())
.collect();
let rxs = channels.iter()
.map(|(_, rx)| rx.to_owned())
.collect();
NetSim {
num_nodes: num_nodes,
txs: txs,
rxs: rxs
}
}
/// The TX side of a channel from node `src` to node `dst`.
pub fn tx(&self, src: usize, dst: usize) -> &Sender<Message> {
assert!(src < self.num_nodes);
assert!(dst < self.num_nodes);
&self.txs[src * self.num_nodes + dst]
}
/// The RX side of a channel from node `src` to node `dst`.
pub fn rx(&self, src: usize, dst: usize) -> &Receiver<Message> {
assert!(src < self.num_nodes);
assert!(dst < self.num_nodes);
&self.rxs[src * self.num_nodes + dst]
}
}