integration test of broadcast mostly ready; there is a thread termination issue however

This commit is contained in:
Vladimir Komendantskiy 2018-04-13 18:28:41 +01:00
parent efdb4467c5
commit d9bc81fe5f
8 changed files with 306 additions and 88 deletions

View File

@ -37,8 +37,8 @@ pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> {
num_faulty_nodes: usize num_faulty_nodes: usize
} }
impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> impl<'a, T: Clone + Debug + Hashable + Send + Sync
+ From<Vec<u8>>> + Into<Vec<u8>> + From<Vec<u8>>>
Instance<'a, T> Instance<'a, T>
{ {
pub fn new(tx: &'a Sender<TargetedMessage<T>>, pub fn new(tx: &'a Sender<TargetedMessage<T>>,
@ -96,17 +96,21 @@ pub enum Error<T: Clone + Debug + Send + Sync> {
Recv(RecvError) Recv(RecvError)
} }
impl<T: Clone + Debug + Send + Sync> From<rse::Error> for Error<T> { impl<T: Clone + Debug + Send + Sync>
From<rse::Error> for Error<T>
{
fn from(err: rse::Error) -> Error<T> { Error::ReedSolomon(err) } fn from(err: rse::Error) -> Error<T> { Error::ReedSolomon(err) }
} }
impl<T: Clone + Debug + Send + Sync> From<SendError<TargetedMessage<T>>> impl<T: Clone + Debug + Send + Sync>
for Error<T> From<SendError<TargetedMessage<T>>> for Error<T>
{ {
fn from(err: SendError<TargetedMessage<T>>) -> Error<T> { Error::Send(err) } fn from(err: SendError<TargetedMessage<T>>) -> Error<T> { Error::Send(err) }
} }
impl<T: Clone + Debug + Send + Sync> From<RecvError> for Error<T> { impl<T: Clone + Debug + Send + Sync>
From<RecvError> for Error<T>
{
fn from(err: RecvError) -> Error<T> { Error::Recv(err) } fn from(err: RecvError) -> Error<T> { Error::Recv(err) }
} }
@ -119,8 +123,7 @@ fn send_shards<'a, T>(value: T,
tx: &'a Sender<TargetedMessage<T>>, tx: &'a Sender<TargetedMessage<T>>,
coding: &ReedSolomon) -> coding: &ReedSolomon) ->
Result<Proof<T>, Error<T>> Result<Proof<T>, Error<T>>
where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
+ From<Vec<u8>>
{ {
let data_shard_num = coding.data_shard_count(); let data_shard_num = coding.data_shard_count();
let parity_shard_num = coding.parity_shard_count(); let parity_shard_num = coding.parity_shard_count();
@ -206,8 +209,7 @@ fn inner_run<'a, T>(tx: &'a Sender<TargetedMessage<T>>,
num_nodes: usize, num_nodes: usize,
num_faulty_nodes: usize) -> num_faulty_nodes: usize) ->
Result<T, Error<T>> Result<T, Error<T>>
where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
+ From<Vec<u8>>
{ {
// Erasure coding scheme: N - 2f value shards and 2f parity shards // Erasure coding scheme: N - 2f value shards and 2f parity shards
let parity_shard_num = 2 * num_faulty_nodes; let parity_shard_num = 2 * num_faulty_nodes;

View File

@ -12,7 +12,6 @@ use proto::Message;
use proto_io; use proto_io;
use proto_io::ProtoIo; use proto_io::ProtoIo;
use messaging::SourcedMessage; use messaging::SourcedMessage;
use stream_io::StreamIo;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -38,8 +37,7 @@ pub struct CommsTask
pub node_index: usize pub node_index: usize
} }
impl impl <'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask<'a, T> CommsTask<'a, T>
{ {
pub fn new(tx: &'a Sender<SourcedMessage<T>>, pub fn new(tx: &'a Sender<SourcedMessage<T>>,

View File

@ -50,11 +50,10 @@ extern crate reed_solomon_erasure;
mod connection; mod connection;
pub mod messaging; pub mod messaging;
mod stream_io;
pub mod proto; pub mod proto;
mod proto_io; mod proto_io;
mod commst; mod commst;
mod broadcast; pub mod broadcast;
mod agreement; pub mod agreement;
pub mod node; pub mod node;

View File

@ -1,7 +1,7 @@
//! The local message delivery system. //! The local message delivery system.
use std::fmt::Debug; use std::fmt::Debug;
use crossbeam::Scope; use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel::{unbounded, Sender, Receiver}; use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
use proto::Message; use proto::Message;
/// Message destination can be either of the two: /// Message destination can be either of the two:
@ -23,7 +23,8 @@ pub struct TargetedMessage<T: Clone + Debug + Send + Sync> {
pub message: Message<T> pub message: Message<T>
} }
impl<T: Clone + Debug + Send + Sync> TargetedMessage<T> { impl<T: Clone + Debug + Send + Sync> TargetedMessage<T>
{
/// Initialises a message while checking parameter preconditions. /// Initialises a message while checking parameter preconditions.
pub fn new(target: Target, message: Message<T>) -> Option<Self> { pub fn new(target: Target, message: Message<T>) -> Option<Self> {
match target { match target {
@ -70,6 +71,10 @@ pub struct Messaging<T: Clone + Debug + Send + Sync> {
to_algo_rxs: Vec<Receiver<SourcedMessage<T>>>, to_algo_rxs: Vec<Receiver<SourcedMessage<T>>>,
/// TX handle to be used by algo tasks. /// TX handle to be used by algo tasks.
from_algo_tx: Sender<TargetedMessage<T>>, from_algo_tx: Sender<TargetedMessage<T>>,
/// Control channel used to stop the listening thread.
stop_tx: Sender<()>,
stop_rx: Receiver<()>,
} }
impl<T: Clone + Debug + Send + Sync> Messaging<T> { impl<T: Clone + Debug + Send + Sync> Messaging<T> {
@ -101,20 +106,25 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
.collect(); .collect();
let (from_algo_tx, from_algo_rx) = unbounded(); let (from_algo_tx, from_algo_rx) = unbounded();
let (stop_tx, stop_rx) = bounded(1);
Messaging { Messaging {
num_nodes: num_nodes, num_nodes,
// internally used handles // internally used handles
to_comms_txs: to_comms_txs, to_comms_txs,
from_comms_rx: from_comms_rx, from_comms_rx,
to_algo_txs: to_algo_txs, to_algo_txs,
from_algo_rx: from_algo_rx, from_algo_rx,
// externally used handles // externally used handles
to_comms_rxs: to_comms_rxs, to_comms_rxs,
from_comms_tx: from_comms_tx, from_comms_tx,
to_algo_rxs: to_algo_rxs, to_algo_rxs,
from_algo_tx: from_algo_tx, from_algo_tx,
stop_tx,
stop_rx,
} }
} }
@ -138,8 +148,13 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
&self.from_algo_tx &self.from_algo_tx
} }
/// Gives the ownership of the handle to stop the message receive loop.
pub fn stop_tx(&self) -> Sender<()> {
self.stop_tx.to_owned()
}
/// Spawns the message delivery thread in a given thread scope. /// Spawns the message delivery thread in a given thread scope.
pub fn spawn<'a>(&self, scope: &Scope<'a>) pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<()>
where T: 'a where T: 'a
{ {
let to_comms_txs = self.to_comms_txs.to_owned(); let to_comms_txs = self.to_comms_txs.to_owned();
@ -147,9 +162,12 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
let to_algo_txs = self.to_algo_txs.to_owned(); let to_algo_txs = self.to_algo_txs.to_owned();
let from_algo_rx = self.from_algo_rx.to_owned(); let from_algo_rx = self.from_algo_rx.to_owned();
let stop_rx = self.stop_rx.to_owned();
let mut stop = false;
scope.spawn(move || { scope.spawn(move || {
// This loop forwards messages according to their metadata. // This loop forwards messages according to their metadata.
loop { select_loop! { while !stop { select_loop! {
recv(from_algo_rx, message) => { recv(from_algo_rx, message) => {
match message { match message {
TargetedMessage { TargetedMessage {
@ -183,8 +201,11 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
for tx in to_algo_txs.iter() { for tx in to_algo_txs.iter() {
tx.send(message.clone()).unwrap(); tx.send(message.clone()).unwrap();
} }
},
recv(stop_rx, _) => {
stop = true;
} }
}} // end of select_loop! }} // end of select_loop!
}); })
} }
} }

View File

@ -37,7 +37,8 @@ pub struct Node<T> {
value: Option<T> value: Option<T>
} }
impl<T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> impl<T: Clone + Debug + Hashable + PartialEq + Send + Sync
+ From<Vec<u8>> + Into<Vec<u8>>>
Node<T> Node<T>
{ {
/// Consensus node constructor. It only initialises initial parameters. /// Consensus node constructor. It only initialises initial parameters.
@ -61,6 +62,7 @@ impl<T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
let from_comms_tx = messaging.from_comms_tx(); let from_comms_tx = messaging.from_comms_tx();
let to_algo_rxs = messaging.to_algo_rxs(); let to_algo_rxs = messaging.to_algo_rxs();
let from_algo_tx = messaging.from_algo_tx(); let from_algo_tx = messaging.from_algo_tx();
let stop_tx = messaging.stop_tx();
// All spawned threads will have exited by the end of the scope. // All spawned threads will have exited by the end of the scope.
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
@ -132,6 +134,9 @@ impl<T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
}); });
} }
// Stop the messaging task.
stop_tx.send(()).unwrap();
// TODO: continue the implementation of the asynchronous common // TODO: continue the implementation of the asynchronous common
// subset algorithm. // subset algorithm.
Err(Error::NotImplemented) Err(Error::NotImplemented)

View File

@ -1,21 +0,0 @@
//! Abstract interface to serialised IO.
use std::io;
use std::io::{Read, Write};
use proto::Message; // FIXME: Message should be made independent of the
// protobuf type MessageProto.
/// Trait of types of streams carrying payload of type `Message<T>` and
/// returning errors of type `Error`.
///
/// This is a stream interface independent of the choice of serialisation
/// methods.
pub trait StreamIo<Stream, T, Error>
where Stream: Read + Write, T: Send + Sync
{
fn from_stream(stream: Stream) -> Self;
fn try_clone(&self) -> Result<Self, io::Error> where Self: Sized;
fn recv(&mut self) -> Result<Message<T>, Error>;
fn send(&mut self, m: Message<T>) -> Result<(), Error>;
}

View File

@ -1,29 +1,40 @@
//! Integration test of the reliable broadcast protocol. //! Integration test of the reliable broadcast protocol.
extern crate hbbft; extern crate hbbft;
#[macro_use]
extern crate log;
extern crate simple_logger;
extern crate crossbeam; extern crate crossbeam;
#[macro_use] #[macro_use]
extern crate crossbeam_channel; extern crate crossbeam_channel;
extern crate merkle; extern crate merkle;
mod netsim; mod netsim;
mod node_comms;
use std::sync::Arc;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::io; use std::io;
use crossbeam_channel::{Sender, Receiver}; use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel::{bounded, Sender, Receiver};
use hbbft::proto::*; use hbbft::proto::*;
use hbbft::messaging::{Messaging, SourcedMessage};
use hbbft::broadcast;
use netsim::NetSim; use netsim::NetSim;
use node_comms::CommsTask;
/// This is a structure to start a consensus node. /// This is a structure to start a consensus node.
pub struct TestNode<'a> { pub struct TestNode<'a> {
/// Node identifier /// Node identifier.
ident: usize, node_index: usize,
/// TX handles, one for each other node /// Total number of nodes.
num_nodes: usize,
/// TX handles, one for each other node.
txs: Vec<&'a Sender<Message<TestValue>>>, txs: Vec<&'a Sender<Message<TestValue>>>,
/// RX handle, one for each other node /// RX handle, one for each other node.
rxs: Vec<&'a Receiver<Message<TestValue>>>, rxs: Vec<&'a Receiver<Message<TestValue>>>,
/// Optionally, a value to be broadcast by this node. /// Optionally, a value to be broadcast by this node.
value: Option<TestValue> value: Option<TestValue>
@ -32,52 +43,143 @@ pub struct TestNode<'a> {
impl<'a> TestNode<'a> impl<'a> TestNode<'a>
{ {
/// Consensus node constructor. It only initialises initial parameters. /// Consensus node constructor. It only initialises initial parameters.
pub fn new(ident: usize, pub fn new(node_index: usize,
num_nodes: usize,
txs: Vec<&'a Sender<Message<TestValue>>>, txs: Vec<&'a Sender<Message<TestValue>>>,
rxs: Vec<&'a Receiver<Message<TestValue>>>, rxs: Vec<&'a Receiver<Message<TestValue>>>,
value: Option<TestValue>) -> Self value: Option<TestValue>) -> Self
{ {
TestNode { TestNode {
ident: ident, node_index: node_index,
num_nodes: num_nodes,
txs: txs, txs: txs,
rxs: rxs, rxs: rxs,
value: value value: value
} }
} }
pub fn run(&self) -> Result<HashSet<TestValue>, Error> { pub fn run(&self, messaging: Messaging<TestValue>) ->
Result<HashSet<TestValue>, Error<TestValue>>
{
assert_eq!(self.rxs.len(), 3); assert_eq!(self.rxs.len(), 3);
let mut result = None;
for n in 0..3 { let to_comms_rxs = messaging.to_comms_rxs();
self.txs[n].send(Message::Broadcast( let from_comms_tx = messaging.from_comms_tx();
BroadcastMessage::Ready(Vec::new())) let to_algo_rxs = messaging.to_algo_rxs();
).unwrap(); let from_algo_tx = messaging.from_algo_tx();
let ref to_algo_rx0 = to_algo_rxs[0];
let value = self.value.to_owned();
let num_nodes = self.num_nodes;
let mut values = HashSet::new();
crossbeam::scope(|scope| {
let mut handles = Vec::new();
// Spawn the 0-th instance corresponding to this node. The return
// value shall be equal to `value` if computation succeeded or error
// otherwise.
handles.push(scope.spawn(move || {
broadcast::Instance::new(from_algo_tx,
to_algo_rx0,
value,
num_nodes,
0)
.run()
}));
// Control TX handles to stop all comms threads.
let mut comms_stop_txs = Vec::new();
// Spawn instances 1 through num_nodes-1 together with simulated
// remote comms tasks.
for i in 1..num_nodes {
// Make a channel to be used to stop the comms task.
let (comms_stop_tx, comms_stop_rx): (Sender<()>, Receiver<()>) =
bounded(1);
// Record the TX handle for using it later.
comms_stop_txs.push(comms_stop_tx);
// Spawn the comms task.
scope.spawn(move || {
// Termination condition variable.
let mut stop = false;
// Receive messages from the simulated node or locally.
while !stop { select_loop! {
// Receive from the simulated remote node.
recv(self.rxs[i-1], message) => {
debug!("Node {}/{} received {:?}",
self.node_index, i, message);
from_comms_tx.send(
SourcedMessage {
source: i,
message
}).unwrap();
},
// Receive from an algorithm via local
// messaging. Forward the message to the simulated
// remote node.
recv(to_comms_rxs[i-1], message) => {
self.txs[i-1].send(message).unwrap();
} }
while result.is_none() { recv(comms_stop_rx, _) => {
select_loop! { stop = true;
recv(self.rxs[0], message) => {
println!("Node {}/0 received {:?}", self.ident, message);
result = Some(Err(Error::NotImplemented));
} }
recv(self.rxs[1], message) => { }}
println!("Node {}/1 received {:?}", self.ident, message); });
result = Some(Err(Error::NotImplemented));
let ref to_algo_rx = to_algo_rxs[i];
// Spawn a broadcast instance associated with the above comms
// task.
handles.push(scope.spawn(move || {
broadcast::Instance::new(from_algo_tx,
to_algo_rx,
None,
num_nodes,
i)
.run()
}));
} }
recv(self.rxs[2], message) => {
println!("Node {}/2 received {:?}", self.ident, message); let mut error = None;
result = Some(Err(Error::NotImplemented));
// Collect the values computed by broadcast instances.
for h in handles {
match h.join() {
Ok(v) => {
values.insert(v);
},
Err(e) => {
error = Some(Error::Broadcast(e));
} }
};
} }
// Stop the comms tasks.
for tx in comms_stop_txs {
tx.send(()).unwrap();
} }
result.unwrap()
if error.is_some() {
Err(error.unwrap())
}
else {
Ok(values)
}
})
} }
} }
#[derive(Debug, PartialEq)] #[derive(Clone, Debug)]
pub enum Error { pub enum Error<T: Clone + Debug + Send + Sync> {
Broadcast(broadcast::Error<T>),
NotImplemented NotImplemented
} }
impl<T: Clone + Debug + Send + Sync> From<broadcast::Error<T>> for Error<T> {
fn from(e: broadcast::Error<T>) -> Error<T> { Error::Broadcast(e) }
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)] #[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct TestValue { pub struct TestValue {
pub value: String pub value: String
@ -108,6 +210,7 @@ impl From<TestValue> for Vec<u8> {
} }
} }
/// Creates a vector of test nodes but does not run them.
fn create_test_nodes<'a>(num_nodes: usize, fn create_test_nodes<'a>(num_nodes: usize,
net: &'a NetSim<Message<TestValue>>) -> net: &'a NetSim<Message<TestValue>>) ->
Vec<TestNode<'a>> Vec<TestNode<'a>>
@ -119,6 +222,7 @@ fn create_test_nodes<'a>(num_nodes: usize,
}; };
let mut txs = Vec::new(); let mut txs = Vec::new();
let mut rxs = Vec::new(); let mut rxs = Vec::new();
// Set up comms channels to other nodes.
for m in 0..num_nodes { for m in 0..num_nodes {
if n == m { if n == m {
// Skip the channel back to the node itself. // Skip the channel back to the node itself.
@ -127,30 +231,48 @@ fn create_test_nodes<'a>(num_nodes: usize,
txs.push(net.tx(n, m)); txs.push(net.tx(n, m));
rxs.push(net.rx(m, n)); rxs.push(net.rx(m, n));
} }
nodes.push(TestNode::new(n, txs, rxs, Some(value))); nodes.push(TestNode::new(n, num_nodes, txs, rxs, Some(value)));
} }
nodes nodes
} }
#[test] #[test]
fn test_4_broadcast_nodes() { fn test_4_broadcast_nodes() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
const NUM_NODES: usize = 4; const NUM_NODES: usize = 4;
let net: NetSim<Message<TestValue>> = NetSim::new(NUM_NODES); let net: NetSim<Message<TestValue>> = NetSim::new(NUM_NODES);
let nodes = create_test_nodes(NUM_NODES, &net); let nodes = create_test_nodes(NUM_NODES, &net);
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
let mut handles = Vec::new(); let mut handles = Vec::new();
let mut messaging_stop_txs = Vec::new();
for node in nodes { for node in nodes {
// Start a local messaging service on the simulated node.
let messaging: Messaging<TestValue> =
Messaging::new(NUM_NODES);
messaging.spawn(scope);
// Take the thread control handle.
messaging_stop_txs.push(messaging.stop_tx());
handles.push(scope.spawn(move || { handles.push(scope.spawn(move || {
node.run() node.run(messaging)
})); }));
} }
// Compare the set of values returned by broadcast against the expected // Compare the set of values returned by broadcast against the expected
// set. // set.
for h in handles { for h in handles {
assert_eq!(h.join(), Err(Error::NotImplemented)); assert!(match h.join() {
Err(Error::NotImplemented) => true,
_ => false
});
}
// Stop all messaging tasks.
for tx in messaging_stop_txs {
tx.send(()).unwrap();
} }
}); });
} }

92
tests/node_comms.rs Normal file
View File

@ -0,0 +1,92 @@
//! Simulated comms task structure. A simulated comms task communicates with a
//! simulated remote node through a channel. Local communication with
//! coordinating threads is also made via a channel.
extern crate hbbft;
extern crate crossbeam;
extern crate crossbeam_channel;
use std::io;
use std::fmt::Debug;
use std::sync::Arc;
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel::{Sender, Receiver};
use hbbft::proto::Message;
use hbbft::messaging::SourcedMessage;
#[derive(Debug)]
pub enum Error {
IoError(io::Error),
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::IoError(err) }
}
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask
<'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
{
/// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<T>>,
/// The receive side of the channel to the comms thread.
rx: &'a Receiver<Message<T>>,
/// TX to the remote node.
remote_tx: &'a Sender<Message<T>>,
/// RX from the remote node.
remote_rx: &'a Receiver<Message<T>>,
/// The index of this comms task for identification against its remote node.
pub node_index: usize
}
impl <'a, T: 'a + Clone + Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask<'a, T>
{
pub fn new(tx: &'a Sender<SourcedMessage<T>>,
rx: &'a Receiver<Message<T>>,
remote_tx: &'a Sender<Message<T>>,
remote_rx: &'a Receiver<Message<T>>,
node_index: usize) ->
Self
{
CommsTask {
tx: tx,
rx: rx,
remote_tx: remote_tx,
remote_rx: remote_rx,
node_index: node_index
}
}
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn spawn(&mut self, scope: &Scope<'a>) -> ScopedJoinHandle<()> {
// Borrow parts of `self` before entering the thread binding scope.
let tx = Arc::new(self.tx);
let rx = Arc::new(self.rx);
let remote_tx = Arc::new(self.remote_tx);
let remote_rx = Arc::new(self.remote_rx);
let node_index = self.node_index;
scope.spawn(move || {
// FIXME: refactor to a while loop with clean termination
loop { select_loop! {
recv(rx, message) => {
println!("Node {} <- {:?}", node_index, message);
// Forward the message to the remote node.
remote_tx.send(message).unwrap();
},
recv(remote_rx, message) => {
println!("Node {} -> {:?}", node_index, message);
tx.send(
SourcedMessage {
source: node_index,
message
}).unwrap();
}
}}
})
}
}